package deploymentconfig import ( "fmt" "time" kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" kutil "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" osclient "github.com/openshift/origin/pkg/client" controller "github.com/openshift/origin/pkg/controller" deployapi "github.com/openshift/origin/pkg/deploy/api" deployutil "github.com/openshift/origin/pkg/deploy/util" ) // DeploymentConfigControllerFactory can create a DeploymentConfigController which obtains // DeploymentConfigs from a queue populated from a watch of all DeploymentConfigs. type DeploymentConfigControllerFactory struct { // Client is an OpenShift client. Client osclient.Interface // KubeClient is a Kubernetes client. KubeClient kclient.Interface // Codec is used to encode/decode. Codec runtime.Codec } // Create creates a DeploymentConfigController. func (factory *DeploymentConfigControllerFactory) Create() controller.RunnableController { deploymentConfigLW := &deployutil.ListWatcherImpl{ ListFunc: func() (runtime.Object, error) { return factory.Client.DeploymentConfigs(kapi.NamespaceAll).List(labels.Everything(), fields.Everything()) }, WatchFunc: func(resourceVersion string) (watch.Interface, error) { return factory.Client.DeploymentConfigs(kapi.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion) }, } queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) cache.NewReflector(deploymentConfigLW, &deployapi.DeploymentConfig{}, queue, 2*time.Minute).Run() eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartRecordingToSink(factory.KubeClient.Events("")) configController := &DeploymentConfigController{ deploymentClient: &deploymentClientImpl{ createDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { return factory.KubeClient.ReplicationControllers(namespace).Create(deployment) }, listDeploymentsForConfigFunc: func(namespace, configName string) (*kapi.ReplicationControllerList, error) { selector, err := labels.Parse(fmt.Sprintf("%s=%s", deployapi.DeploymentConfigLabel, configName)) if err != nil { return nil, err } return factory.KubeClient.ReplicationControllers(namespace).List(selector) }, updateDeploymentFunc: func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) { return factory.KubeClient.ReplicationControllers(namespace).Update(deployment) }, }, makeDeployment: func(config *deployapi.DeploymentConfig) (*kapi.ReplicationController, error) { return deployutil.MakeDeployment(config, factory.Codec) }, recorder: eventBroadcaster.NewRecorder(kapi.EventSource{Component: "deployer"}), } return &controller.RetryController{ Queue: queue, RetryManager: controller.NewQueueRetryManager( queue, cache.MetaNamespaceKeyFunc, func(obj interface{}, err error, count int) bool { kutil.HandleError(err) // no retries for a fatal error if _, isFatal := err.(fatalError); isFatal { return false } // infinite retries for a transient error if _, isTransient := err.(transientError); isTransient { return true } // no retries for anything else if count > 0 { return false } return true }, kutil.NewTokenBucketRateLimiter(1, 10), ), Handle: func(obj interface{}) error { config := obj.(*deployapi.DeploymentConfig) return configController.Handle(config) }, } }