package support

import (
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"github.com/golang/glog"

	kapi "k8s.io/kubernetes/pkg/api"
	kerrors "k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/client/cache"
	kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
	kdeployutil "k8s.io/kubernetes/pkg/controller/deployment/util"
	"k8s.io/kubernetes/pkg/fields"
	"k8s.io/kubernetes/pkg/labels"
	"k8s.io/kubernetes/pkg/runtime"
	utilerrors "k8s.io/kubernetes/pkg/util/errors"
	"k8s.io/kubernetes/pkg/util/sets"
	"k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/pkg/watch"

	"github.com/openshift/origin/pkg/client"
	deployapi "github.com/openshift/origin/pkg/deploy/api"
	strategyutil "github.com/openshift/origin/pkg/deploy/strategy/util"
	deployutil "github.com/openshift/origin/pkg/deploy/util"
	imageapi "github.com/openshift/origin/pkg/image/api"
	"github.com/openshift/origin/pkg/util"
	namer "github.com/openshift/origin/pkg/util/namer"
)

// hookContainerName is the name used for the container that runs inside hook pods.
const hookContainerName = "lifecycle"

// HookExecutor knows how to execute a deployment lifecycle hook.
type HookExecutor interface {
	Execute(hook *deployapi.LifecycleHook, rc *kapi.ReplicationController, suffix, label string) error
}

// hookExecutor implements the HookExecutor interface.
var _ HookExecutor = &hookExecutor{}

// hookExecutor executes a deployment lifecycle hook.
type hookExecutor struct {
	// pods is used to create and look up pods and to stream their logs.
	pods kcoreclient.PodsGetter
	// tags allows setting image stream tags
	tags client.ImageStreamTagsNamespacer
	// out is where hook pod logs and status messages are written.
	out io.Writer
	// decoder is used to decode embedded objects such as the deployment config.
	decoder runtime.Decoder
	// events is used to emit events from hooks.
	events kcoreclient.EventsGetter
	// getPodLogs streams logs from a pod; it is a field so tests can override it.
	getPodLogs func(*kapi.Pod) (io.ReadCloser, error)
}

// NewHookExecutor makes a HookExecutor from the given clients.
func NewHookExecutor(pods kcoreclient.PodsGetter, tags client.ImageStreamTagsNamespacer, events kcoreclient.EventsGetter, out io.Writer, decoder runtime.Decoder) HookExecutor {
	executor := &hookExecutor{
		tags:    tags,
		pods:    pods,
		events:  events,
		out:     out,
		decoder: decoder,
	}
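	// By default, stream (follow) the hook container's logs from the given pod.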
	executor.getPodLogs = func(pod *kapi.Pod) (io.ReadCloser, error) {
		opts := &kapi.PodLogOptions{
			Container:  hookContainerName,
			Follow:     true,
			Timestamps: false,
		}
		return executor.pods.Pods(pod.Namespace).GetLogs(pod.Name, opts).Stream()
	}
	return executor
}

// Execute executes the hook in the context of the deployment represented by
// rc. The suffix is used to distinguish the kind of hook (e.g. pre, post).
func (e *hookExecutor) Execute(hook *deployapi.LifecycleHook, rc *kapi.ReplicationController, suffix, label string) error {
	var err error
	switch {
	case len(hook.TagImages) > 0:
		tagEventMessages := []string{}
		for _, t := range hook.TagImages {
			image, ok := findContainerImage(rc, t.ContainerName)
			if ok {
				tagEventMessages = append(tagEventMessages, fmt.Sprintf("image %q as %q", image, t.To.Name))
			}
		}
		strategyutil.RecordConfigEvent(e.events, rc, e.decoder, kapi.EventTypeNormal, "Started",
			fmt.Sprintf("Running %s-hook (TagImages) %s for rc %s/%s", label, strings.Join(tagEventMessages, ","), rc.Namespace, rc.Name))
		err = e.tagImages(hook, rc, suffix, label)
	case hook.ExecNewPod != nil:
		strategyutil.RecordConfigEvent(e.events, rc, e.decoder, kapi.EventTypeNormal, "Started",
			fmt.Sprintf("Running %s-hook (%q) for rc %s/%s", label, strings.Join(hook.ExecNewPod.Command, " "), rc.Namespace, rc.Name))
		err = e.executeExecNewPod(hook, rc, suffix, label)
	}

	if err == nil {
		strategyutil.RecordConfigEvent(e.events, rc, e.decoder, kapi.EventTypeNormal, "Completed",
			fmt.Sprintf("The %s-hook for rc %s/%s completed successfully", label, rc.Namespace, rc.Name))
		return nil
	}

	// Retry failures are treated the same as Abort.
	switch hook.FailurePolicy {
	case deployapi.LifecycleHookFailurePolicyAbort, deployapi.LifecycleHookFailurePolicyRetry:
		strategyutil.RecordConfigEvent(e.events, rc, e.decoder, kapi.EventTypeWarning, "Failed",
			fmt.Sprintf("The %s-hook failed: %v, aborting rollout of %s/%s", label, err, rc.Namespace, rc.Name))
		return fmt.Errorf("the %s hook failed: %v, aborting rollout of %s/%s", label, err, rc.Namespace, rc.Name)
	case deployapi.LifecycleHookFailurePolicyIgnore:
		strategyutil.RecordConfigEvent(e.events, rc, e.decoder, kapi.EventTypeWarning, "Failed",
			fmt.Sprintf("The %s-hook failed: %v (ignore), rollout of %s/%s will continue", label, err, rc.Namespace, rc.Name))
		return nil
	default:
		return err
	}
}

// findContainerImage returns the image of the named container from the
// replication controller's pod template.
func findContainerImage(rc *kapi.ReplicationController, containerName string) (string, bool) {
	if rc.Spec.Template == nil {
		return "", false
	}
	for _, container := range rc.Spec.Template.Spec.Containers {
		if container.Name == containerName {
			return container.Image, true
		}
	}
	return "", false
}

// tagImages tags images as part of the lifecycle of an rc. It uses an
// ImageStreamTag client which will provision an ImageStream if one does not
// already exist.
func (e *hookExecutor) tagImages(hook *deployapi.LifecycleHook, rc *kapi.ReplicationController, suffix, label string) error {
	var errs []error
	for _, action := range hook.TagImages {
		value, ok := findContainerImage(rc, action.ContainerName)
		if !ok {
			errs = append(errs, fmt.Errorf("unable to find image for container %q: no such container in the rc template", action.ContainerName))
			continue
		}
		namespace := action.To.Namespace
		if len(namespace) == 0 {
			namespace = rc.Namespace
		}
		if _, err := e.tags.ImageStreamTags(namespace).Update(&imageapi.ImageStreamTag{
			ObjectMeta: kapi.ObjectMeta{
				Name:      action.To.Name,
				Namespace: namespace,
			},
			Tag: &imageapi.TagReference{
				From: &kapi.ObjectReference{
					Kind: "DockerImage",
					Name: value,
				},
			},
		}); err != nil {
			errs = append(errs, err)
			continue
		}
		fmt.Fprintf(e.out, "--> %s: Tagged %q into %s/%s\n", label, value, namespace, action.To.Name)
	}

	return utilerrors.NewAggregate(errs)
}

// executeExecNewPod executes an ExecNewPod hook by creating a new pod based on
// the hook parameters and replication controller. The pod is then watched
// synchronously until it completes; if the pod fails, an error is returned.
//
// The hook pod inherits the following from the container the hook refers to:
//
//   * Environment (hook keys take precedence)
//   * Working directory
//   * Resources
func (e *hookExecutor) executeExecNewPod(hook *deployapi.LifecycleHook, rc *kapi.ReplicationController, suffix, label string) error {
	config, err := deployutil.DecodeDeploymentConfig(rc, e.decoder)
	if err != nil {
		return err
	}

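	// Fetch the deployer pod; its start time is used to compute how much of
	// the active deadline remains for the hook pod.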
	deployerPod, err := e.pods.Pods(rc.Namespace).Get(deployutil.DeployerPodNameForDeployment(rc.Name))
	if err != nil {
		return err
	}

	// Build a pod spec from the hook config and replication controller.
	podSpec, err := makeHookPod(hook, rc, deployerPod, &config.Spec.Strategy, suffix)
	if err != nil {
		return err
	}

	// Track whether the pod has already run to completion and avoid showing logs
	// or the Success message twice.
	completed, created := false, false

	// Try to create the pod.
	pod, err := e.pods.Pods(rc.Namespace).Create(podSpec)
	if err != nil {
		if !kerrors.IsAlreadyExists(err) {
			return fmt.Errorf("couldn't create lifecycle pod for %s: %v", rc.Name, err)
		}
		completed = true
		pod = podSpec
		pod.Namespace = rc.Namespace
	} else {
		created = true
		fmt.Fprintf(e.out, "--> %s: Running hook pod ...\n", label)
	}

	stopChannel := make(chan struct{})
	defer close(stopChannel)
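	// nextPod blocks until the next observed update of the hook pod arrives.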
	nextPod := newPodWatch(e.pods.Pods(pod.Namespace), pod.Namespace, pod.Name, pod.ResourceVersion, stopChannel)

	// Wait for the hook pod to reach a terminal phase. Begin streaming logs as
	// soon as the pod is observed running.
	var updatedPod *kapi.Pod
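	// wg tracks the goroutines streaming the hook pod's logs. The initial Add
	// is balanced either by a readPodLogs call or by an explicit Done when the
	// pod had already completed before any logs were read.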
	wg := &sync.WaitGroup{}
	wg.Add(1)
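	// restarts tracks the hook container's restart count observed so far;
	// alreadyRead records whether log streaming has started at least once.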
	restarts := int32(0)
	alreadyRead := false
waitLoop:
	for {
		updatedPod = nextPod()
		switch updatedPod.Status.Phase {
		case kapi.PodRunning:
			completed = false

			// Stream logs only on the first observation of the running pod, or
			// again after a container restart when retries are allowed.
			canRetry, restartCount := canRetryReading(updatedPod, restarts)
			if alreadyRead && !canRetry {
				break
			}
			// The hook container has restarted; note the retry in the output.
			// TODO: Maybe log the container id
			if restarts != restartCount {
				wg.Add(1)
				restarts = restartCount
				fmt.Fprintf(e.out, "--> %s: Retrying hook pod (retry #%d)\n", label, restartCount)
			}
			alreadyRead = true
			go e.readPodLogs(pod, wg)

		case kapi.PodSucceeded, kapi.PodFailed:
			if completed {
				if updatedPod.Status.Phase == kapi.PodSucceeded {
					fmt.Fprintf(e.out, "--> %s: Hook pod already succeeded\n", label)
				}
				wg.Done()
				break waitLoop
			}
			if !created {
				fmt.Fprintf(e.out, "--> %s: Hook pod is already running ...\n", label)
			}
			if !alreadyRead {
				go e.readPodLogs(pod, wg)
			}
			break waitLoop
		default:
			completed = false
		}
	}
	// The pod is finished, wait for all logs to be consumed before returning.
	wg.Wait()
	if updatedPod.Status.Phase == kapi.PodFailed {
		fmt.Fprintf(e.out, "--> %s: Failed\n", label)
		return fmt.Errorf("%s", updatedPod.Status.Message)
	}
	// Only show this message if we created the pod ourselves, or we saw
	// the pod in a running or pending state.
	if !completed {
		fmt.Fprintf(e.out, "--> %s: Success\n", label)
	}
	return nil
}

// readPodLogs streams logs from pod to out. It signals wg when
// done.
func (e *hookExecutor) readPodLogs(pod *kapi.Pod, wg *sync.WaitGroup) {
	defer wg.Done()
	logStream, err := e.getPodLogs(pod)
	if err != nil || logStream == nil {
		fmt.Fprintf(e.out, "warning: Unable to retrieve hook logs from %s: %v\n", pod.Name, err)
		return
	}
	// Read logs.
	defer logStream.Close()
	if _, err := io.Copy(e.out, logStream); err != nil {
		fmt.Fprintf(e.out, "\nwarning: Unable to read all logs from %s, continuing: %v\n", pod.Name, err)
	}
}

// makeHookPod makes a pod spec from a hook and replication controller.
func makeHookPod(hook *deployapi.LifecycleHook, rc *kapi.ReplicationController, deployerPod *kapi.Pod, strategy *deployapi.DeploymentStrategy, suffix string) (*kapi.Pod, error) {
	exec := hook.ExecNewPod
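	// Find the container the hook refers to; it supplies the image, environment,
	// working directory, resources, and volume mounts for the hook pod.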
	var baseContainer *kapi.Container
	for i := range rc.Spec.Template.Spec.Containers {
		container := &rc.Spec.Template.Spec.Containers[i]
		if container.Name == exec.ContainerName {
			baseContainer = container
			break
		}
	}
	if baseContainer == nil {
		return nil, fmt.Errorf("no container named %q found in rc template", exec.ContainerName)
	}

	// Build a merged environment; hook environment takes precedence over base
	// container environment
	envMap := map[string]kapi.EnvVar{}
	mergedEnv := []kapi.EnvVar{}
	for _, env := range baseContainer.Env {
		envMap[env.Name] = env
	}
	for _, env := range exec.Env {
		envMap[env.Name] = env
	}
	for k, v := range envMap {
		mergedEnv = append(mergedEnv, kapi.EnvVar{Name: k, Value: v.Value, ValueFrom: v.ValueFrom})
	}
	mergedEnv = append(mergedEnv, kapi.EnvVar{Name: "OPENSHIFT_DEPLOYMENT_NAME", Value: rc.Name})
	mergedEnv = append(mergedEnv, kapi.EnvVar{Name: "OPENSHIFT_DEPLOYMENT_NAMESPACE", Value: rc.Namespace})

	// Inherit resources from the base container
	resources := kapi.ResourceRequirements{}
	if err := kapi.Scheme.Convert(&baseContainer.Resources, &resources, nil); err != nil {
		return nil, fmt.Errorf("couldn't clone ResourceRequirements: %v", err)
	}

	// Compute how much of the active deadline remains for the hook pod: start
	// from the strategy's ActiveDeadlineSeconds (or the global default) and
	// subtract how long the deployer pod has already been running. The result
	// is assigned to a variable because its address is required below.
	defaultActiveDeadline := deployapi.MaxDeploymentDurationSeconds
	if strategy.ActiveDeadlineSeconds != nil {
		defaultActiveDeadline = *(strategy.ActiveDeadlineSeconds)
	}
	maxDeploymentDurationSeconds := defaultActiveDeadline - int64(time.Since(deployerPod.Status.StartTime.Time).Seconds())

	// Let the kubelet manage retries if requested
	restartPolicy := kapi.RestartPolicyNever
	if hook.FailurePolicy == deployapi.LifecycleHookFailurePolicyRetry {
		restartPolicy = kapi.RestartPolicyOnFailure
	}

	// Transfer any requested volumes to the hook pod.
	volumes := []kapi.Volume{}
	volumeNames := sets.NewString()
	for _, volume := range rc.Spec.Template.Spec.Volumes {
		for _, name := range exec.Volumes {
			if volume.Name == name {
				volumes = append(volumes, volume)
				volumeNames.Insert(volume.Name)
			}
		}
	}
	// Transfer any volume mounts associated with transferred volumes.
	volumeMounts := []kapi.VolumeMount{}
	for _, mount := range baseContainer.VolumeMounts {
		if volumeNames.Has(mount.Name) {
			volumeMounts = append(volumeMounts, kapi.VolumeMount{
				Name:      mount.Name,
				ReadOnly:  mount.ReadOnly,
				MountPath: mount.MountPath,
			})
		}
	}

	// Transfer image pull secrets from the pod spec.
	imagePullSecrets := []kapi.LocalObjectReference{}
	for _, pullSecret := range rc.Spec.Template.Spec.ImagePullSecrets {
		imagePullSecrets = append(imagePullSecrets, kapi.LocalObjectReference{Name: pullSecret.Name})
	}

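	// Use a short termination grace period so the hook pod is cleaned up promptly.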
	gracePeriod := int64(10)

	pod := &kapi.Pod{
		ObjectMeta: kapi.ObjectMeta{
			Name: namer.GetPodName(rc.Name, suffix),
			Annotations: map[string]string{
				deployapi.DeploymentAnnotation: rc.Name,
			},
			Labels: map[string]string{
				deployapi.DeploymentPodTypeLabel:        suffix,
				deployapi.DeployerPodForDeploymentLabel: rc.Name,
			},
		},
		Spec: kapi.PodSpec{
			Containers: []kapi.Container{
				{
					Name:            hookContainerName,
					Image:           baseContainer.Image,
					ImagePullPolicy: baseContainer.ImagePullPolicy,
					Command:         exec.Command,
					WorkingDir:      baseContainer.WorkingDir,
					Env:             mergedEnv,
					Resources:       resources,
					VolumeMounts:    volumeMounts,
				},
			},
			Volumes:               volumes,
			ActiveDeadlineSeconds: &maxDeploymentDurationSeconds,
			// Setting the node selector on the hook pod so that it is created
			// on the same set of nodes as the rc pods.
			NodeSelector:                  rc.Spec.Template.Spec.NodeSelector,
			RestartPolicy:                 restartPolicy,
			ImagePullSecrets:              imagePullSecrets,
			TerminationGracePeriodSeconds: &gracePeriod,
		},
	}

	util.MergeInto(pod.Labels, strategy.Labels, 0)
	util.MergeInto(pod.Annotations, strategy.Annotations, 0)

	return pod, nil
}

// canRetryReading returns whether log reading can be retried for the given
// hook pod, along with the pod's current container restart count.
func canRetryReading(pod *kapi.Pod, restarts int32) (bool, int32) {
	if len(pod.Status.ContainerStatuses) == 0 {
		return false, int32(0)
	}
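	// The hook pod runs a single container, so its status is at index 0.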
	restartCount := pod.Status.ContainerStatuses[0].RestartCount
	return pod.Spec.RestartPolicy == kapi.RestartPolicyOnFailure && restartCount > restarts, restartCount
}

// newPodWatch creates a pod watching function which is backed by a
// FIFO/reflector pair. This avoids managing watches directly. The reflector
// runs until the provided stop channel is closed; it is the caller's
// responsibility to close it (typically with defer) to avoid leaking
// resources.
func newPodWatch(client kcoreclient.PodInterface, namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod {
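	// Restrict the list/watch to the single named pod.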
	fieldSelector := fields.OneTermEqualSelector("metadata.name", name)
	podLW := &cache.ListWatch{
		ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
			options.FieldSelector = fieldSelector
			return client.List(options)
		},
		WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
			options.FieldSelector = fieldSelector
			return client.Watch(options)
		},
	}

	queue := cache.NewResyncableFIFO(cache.MetaNamespaceKeyFunc)
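	// Run a reflector that feeds pod updates into the queue until stopChannel
	// is closed.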
	cache.NewReflector(podLW, &kapi.Pod{}, queue, 1*time.Minute).RunUntil(stopChannel)

	return func() *kapi.Pod {
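		// Block until the next pod update is popped from the queue.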
		obj := cache.Pop(queue)
		return obj.(*kapi.Pod)
	}
}

// NewAcceptAvailablePods makes a new acceptAvailablePods from a real client.
func NewAcceptAvailablePods(
	out io.Writer,
	kclient kcoreclient.PodsGetter,
	timeout time.Duration,
	interval time.Duration,
	minReadySeconds int32,
) *acceptAvailablePods {

	return &acceptAvailablePods{
		out:             out,
		timeout:         timeout,
		interval:        interval,
		minReadySeconds: minReadySeconds,
		acceptedPods:    sets.NewString(),
		getRcPodStore: func(rc *kapi.ReplicationController) (cache.Store, chan struct{}) {
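			// Mirror all pods matching the rc's selector into a local store;
			// the returned channel stops the backing reflector.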
			selector := labels.Set(rc.Spec.Selector).AsSelector()
			store := cache.NewStore(cache.MetaNamespaceKeyFunc)
			lw := &cache.ListWatch{
				ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
					options.LabelSelector = selector
					return kclient.Pods(rc.Namespace).List(options)
				},
				WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
					options.LabelSelector = selector
					return kclient.Pods(rc.Namespace).Watch(options)
				},
			}
			stop := make(chan struct{})
			cache.NewReflector(lw, &kapi.Pod{}, store, 10*time.Second).RunUntil(stop)
			return store, stop
		},
	}
}

// acceptAvailablePods will accept a replication controller if all the pods
// for the replication controller become available.
//
// acceptAvailablePods keeps track of the pods it has accepted for a
// replication controller so that the acceptor can be reused across multiple
// batches of updates to a single controller. For example, if during the first
// acceptance call the replication controller has 3 pods, the acceptor will
// validate those 3 pods. If the same acceptor instance is used again for the
// same replication controller which now has 6 pods, only the latest 3 pods
// will be considered for acceptance. The status of the original 3 pods becomes
// irrelevant.
//
// Note that this struct is stateful: it is intended for use with a single
// replication controller and should be discarded and recreated between
// rollouts.
type acceptAvailablePods struct {
	out io.Writer
	// getRcPodStore should return a Store containing all the pods for the
	// replication controller, and a channel to stop whatever process is
	// feeding the store.
	getRcPodStore func(*kapi.ReplicationController) (cache.Store, chan struct{})
	// timeout is how long to wait for pod readiness.
	timeout time.Duration
	// interval is how often to check for pod readiness
	interval time.Duration
	// minReadySeconds is the minimum number of seconds for which a newly created
	// pod should be ready without any of its containers crashing, for it to be
	// considered available.
	minReadySeconds int32
	// acceptedPods keeps track of pods which have been previously accepted for
	// a replication controller.
	acceptedPods sets.String
}

// Accept waits until all pods for the replication controller become available,
// or returns an error if they fail to do so within the configured timeout.
func (c *acceptAvailablePods) Accept(rc *kapi.ReplicationController) error {
	// Make a pod store to poll and ensure it gets cleaned up.
	podStore, stopStore := c.getRcPodStore(rc)
	defer close(stopStore)

	// Start checking for pod updates.
	if c.acceptedPods.Len() > 0 {
		fmt.Fprintf(c.out, "--> Waiting up to %s for pods in rc %s to become ready (%d pods previously accepted)\n", c.timeout, rc.Name, c.acceptedPods.Len())
	} else {
		fmt.Fprintf(c.out, "--> Waiting up to %s for pods in rc %s to become ready\n", c.timeout, rc.Name)
	}
	err := wait.Poll(c.interval, c.timeout, func() (done bool, err error) {
		// Check for pod readiness.
		unready := sets.NewString()
		for _, obj := range podStore.List() {
			pod := obj.(*kapi.Pod)
			// Skip previously accepted pods; we only want to verify newly observed
			// and unaccepted pods.
			if c.acceptedPods.Has(pod.Name) {
				continue
			}
			if kdeployutil.IsPodAvailable(pod, c.minReadySeconds, time.Now()) {
				// If the pod is ready, track it as accepted.
				c.acceptedPods.Insert(pod.Name)
			} else {
				// Otherwise, track it as unready.
				unready.Insert(pod.Name)
			}
		}
		// Check to see if we're done.
		if unready.Len() == 0 {
			return true, nil
		}
		// Otherwise, try again later.
		glog.V(4).Infof("Still waiting for %d pods to become ready for rc %s", unready.Len(), rc.Name)
		return false, nil
	})

	// Handle acceptance failure.
	if err != nil {
		if err == wait.ErrWaitTimeout {
			return fmt.Errorf("pods for rc %q took longer than %.f seconds to become ready", rc.Name, c.timeout.Seconds())
		}
		return fmt.Errorf("pod readiness check failed for rc %q: %v", rc.Name, err)
	}
	return nil
}