package deployerpod

import (
	"fmt"
	"sort"

	"github.com/golang/glog"

	kapi "k8s.io/kubernetes/pkg/api"
	kerrors "k8s.io/kubernetes/pkg/api/errors"
	kutil "k8s.io/kubernetes/pkg/util"

	deployapi "github.com/openshift/origin/pkg/deploy/api"
	deployutil "github.com/openshift/origin/pkg/deploy/util"
)

// DeployerPodController keeps a deployment's status in sync with the deployer pod
// handling the deployment.
//
// Use the DeployerPodControllerFactory to create this controller.
type DeployerPodController struct {
	// deploymentClient provides access to deployments. Handle uses it to
	// fetch and update the deployment that owns a deployer pod, and to list
	// the deployments of the same config when cleaning up after a failure.
	deploymentClient deploymentClient
	// deployerPodsFor returns all deployer pods for the named deployment. It
	// is called when the owning deployment can no longer be found, so that
	// the orphaned deployer pods can be deleted.
	deployerPodsFor func(namespace, name string) (*kapi.PodList, error)
	// deletePod deletes the pod identified by namespace and name.
	deletePod func(namespace, name string) error
}

// transientError marks a failure that is meant to be retried indefinitely.
// The retry itself is performed by whoever consumes the error, not by this
// type.
type transientError string

// Error implements the error interface by prefixing the wrapped message with
// a fixed description of where the failure happened.
func (e transientError) Error() string {
	const prefix = "transient error handling deployer pod: "
	return prefix + string(e)
}

// Handle syncs the status of the deployment associated with a deployer pod
// so that it reflects the pod's current phase.
//
// Pods that carry no deployment name, or that are not the main deployer pod
// of their deployment, are ignored. If the owning deployment no longer
// exists, every deployer pod for it is deleted on a best-effort basis and nil
// is returned. Otherwise the pod phase determines the next deployment status:
// a running pod yields Running; a succeeded pod yields Complete, or Failed if
// any container terminated with a non-zero exit code; a failed pod triggers
// cleanupFailedDeployment unless the deployment is already marked Failed.
//
// Handle returns a transientError when cleaning up a failed deployment does
// not succeed, a plain error when the deployment or its deployer pods cannot
// be retrieved or the status update fails, and nil otherwise.
func (c *DeployerPodController) Handle(pod *kapi.Pod) error {
	// Find the deployment associated with the deployer pod.
	deploymentName := deployutil.DeploymentNameFor(pod)
	if len(deploymentName) == 0 {
		return nil
	}
	// Reject updates to anything but the main deployer pod
	// TODO: Find a way to filter this on the watch side.
	if pod.Name != deployutil.DeployerPodNameForDeployment(deploymentName) {
		return nil
	}

	deployment, err := c.deploymentClient.getDeployment(pod.Namespace, deploymentName)
	// If the deployment for this pod has disappeared, we should clean up this
	// and any other deployer pods, then bail out.
	if err != nil {
		// Some retrieval error occurred. Retry.
		if !kerrors.IsNotFound(err) {
			// Report the pod as namespace/name and keep the underlying cause
			// so the failure can be diagnosed.
			return fmt.Errorf("couldn't get deployment %s/%s which owns deployer pod %s/%s: %v", pod.Namespace, deploymentName, pod.Namespace, pod.Name, err)
		}
		// Find all the deployer pods for the deployment (including this one).
		deployers, listErr := c.deployerPodsFor(pod.Namespace, deploymentName)
		if listErr != nil {
			// Retry. The deployment lookup failed, so the deployment value
			// must not be used here; identify it by namespace and name.
			return fmt.Errorf("couldn't get deployer pods for %s/%s: %v", pod.Namespace, deploymentName, listErr)
		}
		// Delete all deployers. Failures are only logged: deletion is best
		// effort and a pod that is already gone is not worth reporting.
		for _, deployer := range deployers.Items {
			deleteErr := c.deletePod(deployer.Namespace, deployer.Name)
			if deleteErr != nil {
				if !kerrors.IsNotFound(deleteErr) {
					// TODO: Should this fire an event?
					glog.V(2).Infof("Couldn't delete orphaned deployer pod %s/%s: %v", deployer.Namespace, deployer.Name, deleteErr)
				}
			} else {
				// TODO: Should this fire an event?
				glog.V(2).Infof("Deleted orphaned deployer pod %s/%s", deployer.Namespace, deployer.Name)
			}
		}
		return nil
	}

	currentStatus := deployutil.DeploymentStatusFor(deployment)
	nextStatus := currentStatus

	switch pod.Status.Phase {
	case kapi.PodRunning:
		nextStatus = deployapi.DeploymentStatusRunning
	case kapi.PodSucceeded:
		// Detect failure based on the container state
		nextStatus = deployapi.DeploymentStatusComplete
		for _, info := range pod.Status.ContainerStatuses {
			if info.State.Terminated != nil && info.State.Terminated.ExitCode != 0 {
				nextStatus = deployapi.DeploymentStatusFailed
			}
		}
		// A completed deployment no longer needs the desired replicas hint.
		if nextStatus == deployapi.DeploymentStatusComplete {
			delete(deployment.Annotations, deployapi.DesiredReplicasAnnotation)
		}
	case kapi.PodFailed:
		// if the deployment is already marked Failed, do not attempt clean up again
		if currentStatus != deployapi.DeploymentStatusFailed {
			// clean up will also update the deployment status to Failed
			// failure to clean up will result in retries and
			// the deployment will not be marked Failed
			// Note: this will prevent new deployments from being created for this config
			if cleanupErr := c.cleanupFailedDeployment(deployment); cleanupErr != nil {
				return transientError(fmt.Sprintf("couldn't clean up failed deployment: %v", cleanupErr))
			}
		}
	}

	// nextStatus is left equal to currentStatus for the PodFailed case (the
	// cleanup already persisted the Failed status) and for unhandled phases,
	// so no update is issued for them here.
	if currentStatus != nextStatus {
		deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(nextStatus)
		if _, updateErr := c.deploymentClient.updateDeployment(deployment.Namespace, deployment); updateErr != nil {
			// The deployment vanished in the meantime; nothing left to sync.
			if kerrors.IsNotFound(updateErr) {
				return nil
			}
			return fmt.Errorf("couldn't update Deployment %s to status %s: %v", deployutil.LabelForDeployment(deployment), nextStatus, updateErr)
		}
		glog.V(4).Infof("Updated Deployment %s status from %s to %s", deployutil.LabelForDeployment(deployment), currentStatus, nextStatus)
	}

	return nil
}

// cleanupFailedDeployment undoes the scaling effects of a failed deployment
// and marks that deployment as Failed.
//
// The deployments of the same config are listed and sorted with
// DeploymentsByLatestVersionDesc. If the desired replica count can be read
// from deployment and the first sorted entry is deployment itself, the first
// entry whose status is Complete is updated to that replica count (unless it
// already has it). Afterwards deployment is scaled to zero replicas and
// annotated with the Failed status.
//
// A NotFound response from either update ends the cleanup with a nil error.
// Any other list or update failure is returned to the caller. A missing
// desired replica count is reported through kutil.HandleError only and does
// not stop the failed deployment from being updated.
func (c *DeployerPodController) cleanupFailedDeployment(deployment *kapi.ReplicationController) error {
	configName := deployutil.DeploymentConfigNameFor(deployment)
	candidates, listErr := c.deploymentClient.listDeploymentsForConfig(deployment.Namespace, configName)
	if listErr != nil {
		return fmt.Errorf("couldn't list Deployments for DeploymentConfig %s: %v", configName, listErr)
	}

	target, haveTarget := deployutil.DeploymentDesiredReplicas(deployment)
	if !haveTarget {
		// Without a target there is nothing to scale back up to; this is
		// logged rather than treated as a transient error so that the failed
		// deployment is still updated below.
		kutil.HandleError(fmt.Errorf("Could not determine desired replicas from %s to reset replicas for last completed deployment", deployutil.LabelForDeployment(deployment)))
	} else if len(candidates.Items) > 0 {
		sort.Sort(deployutil.DeploymentsByLatestVersionDesc(candidates.Items))
		for i := range candidates.Items {
			// Operate on a copy so the listed item itself is left untouched.
			candidate := candidates.Items[i]

			// A different deployment sorts first, i.e. a newer one exists. It
			// will scale the older deployments itself, so only the failed
			// deployment needs to be scaled down.
			if i == 0 && candidate.Name != deployment.Name {
				break
			}

			// Keep looking until the most recent completed deployment is found.
			if deployutil.DeploymentStatusFor(&candidate) != deployapi.DeploymentStatusComplete {
				continue
			}

			// Already at the wanted size: no update required.
			if candidate.Spec.Replicas == target {
				break
			}

			// Scale the completed deployment back to the target of the failed one.
			candidate.Spec.Replicas = target
			_, updateErr := c.deploymentClient.updateDeployment(candidate.Namespace, &candidate)
			switch {
			case updateErr == nil:
				glog.V(4).Infof("Updated replicas to %d for deployment %s", target, deployutil.LabelForDeployment(&candidate))
			case kerrors.IsNotFound(updateErr):
				return nil
			default:
				return fmt.Errorf("couldn't update replicas to %d for deployment %s: %v", target, deployutil.LabelForDeployment(&candidate), updateErr)
			}
			break
		}
	}

	// Scale the failed deployment down to zero and record the Failed status.
	deployment.Spec.Replicas = 0
	deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusFailed)
	_, updateErr := c.deploymentClient.updateDeployment(deployment.Namespace, deployment)
	if updateErr == nil {
		glog.V(4).Infof("Scaled down the deployment %s and marked it as failed", deployutil.LabelForDeployment(deployment))
		return nil
	}
	if kerrors.IsNotFound(updateErr) {
		return nil
	}
	return fmt.Errorf("couldn't scale down the deployment %s and mark it as failed: %v", deployutil.LabelForDeployment(deployment), updateErr)
}

// deploymentClient abstracts access to deployments, which this package
// handles as replication controllers.
type deploymentClient interface {
	// getDeployment returns the deployment identified by namespace and name.
	getDeployment(namespace, name string) (*kapi.ReplicationController, error)
	// updateDeployment submits deployment for update within namespace. The
	// callers in this file only inspect the returned error and discard the
	// returned object.
	updateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error)
	// listDeploymentsForConfig should return deployments associated with the
	// provided config.
	listDeploymentsForConfig(namespace, configName string) (*kapi.ReplicationControllerList, error)
}

// deploymentClientImpl is a pluggable deploymentClient: every interface
// method forwards its arguments to the function field of the matching name.
// The fields are called without a nil check, so all of them must be set
// before the corresponding method is used.
type deploymentClientImpl struct {
	// getDeploymentFunc backs getDeployment.
	getDeploymentFunc func(namespace, name string) (*kapi.ReplicationController, error)
	// updateDeploymentFunc backs updateDeployment.
	updateDeploymentFunc func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error)
	// listDeploymentsForConfigFunc backs listDeploymentsForConfig.
	listDeploymentsForConfigFunc func(namespace, configName string) (*kapi.ReplicationControllerList, error)
}

// getDeployment delegates to getDeploymentFunc.
func (d *deploymentClientImpl) getDeployment(namespace, name string) (*kapi.ReplicationController, error) {
	get := d.getDeploymentFunc
	return get(namespace, name)
}

// updateDeployment delegates to updateDeploymentFunc.
func (d *deploymentClientImpl) updateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
	update := d.updateDeploymentFunc
	return update(namespace, deployment)
}

// listDeploymentsForConfig delegates to listDeploymentsForConfigFunc.
func (d *deploymentClientImpl) listDeploymentsForConfig(namespace, configName string) (*kapi.ReplicationControllerList, error) {
	list := d.listDeploymentsForConfigFunc
	return list(namespace, configName)
}