package support

import (
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"github.com/golang/glog"

	kapi "k8s.io/kubernetes/pkg/api"
	kerrors "k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/client/record"
	kclient "k8s.io/kubernetes/pkg/client/unversioned"
	"k8s.io/kubernetes/pkg/fields"
	"k8s.io/kubernetes/pkg/labels"
	"k8s.io/kubernetes/pkg/runtime"
	utilerrors "k8s.io/kubernetes/pkg/util/errors"
	"k8s.io/kubernetes/pkg/util/sets"
	"k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/pkg/watch"

	"github.com/openshift/origin/pkg/client"
	deployapi "github.com/openshift/origin/pkg/deploy/api"
	deployutil "github.com/openshift/origin/pkg/deploy/util"
	imageapi "github.com/openshift/origin/pkg/image/api"
	"github.com/openshift/origin/pkg/util"
	namer "github.com/openshift/origin/pkg/util/namer"
)

// HookContainerName is the name of the container in a deployment hook pod.
const HookContainerName = "lifecycle"

// HookExecutor executes a deployment lifecycle hook.
type HookExecutor struct {
	// podClient provides access to pods.
	podClient HookExecutorPodClient
	// tags allows setting image stream tags.
	tags client.ImageStreamTagsNamespacer
	// out is where hook pod logs are written.
	out io.Writer
	// podLogStream provides a reader for a pod's logs.
	podLogStream func(namespace, name string, opts *kapi.PodLogOptions) (io.ReadCloser, error)
	// decoder is used to decode the deployment config from the deployment.
	decoder runtime.Decoder
	// events is used to emit events from hooks.
	events record.EventSink
}

// NewHookExecutor makes a HookExecutor from the given clients, event sink,
// output writer, and decoder.
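//
// A minimal construction sketch (illustrative only; kubeClient, osClient,
// eventSink, and decoder are assumed to be configured by the caller):
//
//	executor := NewHookExecutor(kubeClient, osClient, eventSink, os.Stdout, decoder)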
func NewHookExecutor(client kclient.PodsNamespacer, tags client.ImageStreamTagsNamespacer, events record.EventSink, out io.Writer, decoder runtime.Decoder) *HookExecutor {
	return &HookExecutor{
		tags:   tags,
		events: events,
		podClient: &HookExecutorPodClientImpl{
			CreatePodFunc: func(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
				return client.Pods(namespace).Create(pod)
			},
			PodWatchFunc: func(namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod {
				return NewPodWatch(client, namespace, name, resourceVersion, stopChannel)
			},
		},
		podLogStream: func(namespace, name string, opts *kapi.PodLogOptions) (io.ReadCloser, error) {
			return client.Pods(namespace).GetLogs(name, opts).Stream()
		},
		out:     out,
		decoder: decoder,
	}
}

func (e *HookExecutor) emitEvent(deployment *kapi.ReplicationController, eventType, reason, msg string) {
	t := unversioned.Time{Time: time.Now()}
	var ref *kapi.ObjectReference
	if config, err := deployutil.DecodeDeploymentConfig(deployment, e.decoder); err != nil {
		glog.Errorf("Unable to decode deployment %s/%s to replication contoller: %v", deployment.Namespace, deployment.Name, err)
		if ref, err = kapi.GetReference(deployment); err != nil {
			glog.Errorf("Unable to get reference for %#v: %v", deployment, err)
			return
		}
	} else {
		if ref, err = kapi.GetReference(config); err != nil {
			glog.Errorf("Unable to get reference for %#v: %v", config, err)
			return
		}
	}
	event := &kapi.Event{
		ObjectMeta: kapi.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
			Namespace: ref.Namespace,
		},
		InvolvedObject: *ref,
		Reason:         reason,
		Message:        msg,
		FirstTimestamp: t,
		LastTimestamp:  t,
		Count:          1,
		Type:           eventType,
	}
	if _, err := e.events.Create(event); err != nil {
		glog.Errorf("Could not send event '%#v': %v", event, err)
	}
}

// Execute executes the hook in the context of the deployment. The suffix is
// used to distinguish the kind of hook (e.g. pre, post).
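//
// An illustrative invocation for a pre-deployment hook, assuming a Recreate
// strategy whose RecreateParams.Pre hook is set (the suffix and label strings
// are example values chosen by the caller, not required constants):
//
//	err := executor.Execute(config.Spec.Strategy.RecreateParams.Pre, deployment, "hook-pre", "pre")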
func (e *HookExecutor) Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error {
	var err error
	switch {
	case len(hook.TagImages) > 0:
		tagEventMessages := []string{}
		for _, t := range hook.TagImages {
			image, ok := findContainerImage(deployment, t.ContainerName)
			if ok {
				tagEventMessages = append(tagEventMessages, fmt.Sprintf("image %q as %q", image, t.To.Name))
			}
		}
		e.emitEvent(deployment, kapi.EventTypeNormal, "Started", fmt.Sprintf("Running %s-hook (TagImages) %s for deployment %s/%s", label, strings.Join(tagEventMessages, ","), deployment.Namespace, deployment.Name))
		err = e.tagImages(hook, deployment, suffix, label)
	case hook.ExecNewPod != nil:
		e.emitEvent(deployment, kapi.EventTypeNormal, "Started", fmt.Sprintf("Running %s-hook (%q) for deployment %s/%s", label, strings.Join(hook.ExecNewPod.Command, " "), deployment.Namespace, deployment.Name))
		err = e.executeExecNewPod(hook, deployment, suffix, label)
	}

	if err == nil {
		e.emitEvent(deployment, kapi.EventTypeNormal, "Completed", fmt.Sprintf("The %s-hook for deployment %s/%s completed successfully", label, deployment.Namespace, deployment.Name))
		return nil
	}

	// Retry failures are treated the same as Abort.
	switch hook.FailurePolicy {
	case deployapi.LifecycleHookFailurePolicyAbort, deployapi.LifecycleHookFailurePolicyRetry:
		e.emitEvent(deployment, kapi.EventTypeWarning, "Failed", fmt.Sprintf("The %s-hook failed: %v, aborting deployment %s/%s", label, err, deployment.Namespace, deployment.Name))
		return fmt.Errorf("the %s hook failed: %v, aborting deployment: %s/%s", label, err, deployment.Namespace, deployment.Name)
	case deployapi.LifecycleHookFailurePolicyIgnore:
		e.emitEvent(deployment, kapi.EventTypeWarning, "Failed", fmt.Sprintf("The %s-hook failed: %v (ignore), deployment %s/%s will continue", label, err, deployment.Namespace, deployment.Name))
		fmt.Fprintf(e.out, "the %s hook failed: %v (ignore), deployment %s/%s will continue", label, err, deployment.Namespace, deployment.Name)
		return nil
	default:
		return err
	}
}

func findContainerImage(rc *kapi.ReplicationController, containerName string) (string, bool) {
	if rc.Spec.Template == nil {
		return "", false
	}
	for _, container := range rc.Spec.Template.Spec.Containers {
		if container.Name == containerName {
			return container.Image, true
		}
	}
	return "", false
}

// tagImages tags the current image of each referenced container in the
// deployment into the image stream tag specified by the hook.
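//
// An illustrative TagImages hook for this path (field values are examples,
// not defaults):
//
//	hook := &deployapi.LifecycleHook{
//		FailurePolicy: deployapi.LifecycleHookFailurePolicyIgnore,
//		TagImages: []deployapi.TagImageHook{{
//			ContainerName: "app",
//			To:            kapi.ObjectReference{Kind: "ImageStreamTag", Name: "app:latest"},
//		}},
//	}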
func (e *HookExecutor) tagImages(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error {
	var errs []error
	for _, action := range hook.TagImages {
		value, ok := findContainerImage(deployment, action.ContainerName)
		if !ok {
			errs = append(errs, fmt.Errorf("unable to find image for container %q, container could not be found", action.ContainerName))
			continue
		}
		namespace := action.To.Namespace
		if len(namespace) == 0 {
			namespace = deployment.Namespace
		}
		if _, err := e.tags.ImageStreamTags(namespace).Update(&imageapi.ImageStreamTag{
			ObjectMeta: kapi.ObjectMeta{
				Name:      action.To.Name,
				Namespace: namespace,
			},
			Tag: &imageapi.TagReference{
				From: &kapi.ObjectReference{
					Kind: "DockerImage",
					Name: value,
				},
			},
		}); err != nil {
			errs = append(errs, err)
			continue
		}
		fmt.Fprintf(e.out, "--> %s: Tagged %q into %s/%s\n", label, value, action.To.Namespace, action.To.Name)
	}

	return utilerrors.NewAggregate(errs)
}

// executeExecNewPod executes an ExecNewPod hook by creating a new pod based on
// the hook parameters and deployment. The pod is then synchronously watched
// until the pod completes, and if the pod failed, an error is returned.
//
// The hook pod inherits the following from the container the hook refers to:
//
//   * Environment (hook keys take precedence)
//   * Working directory
//   * Resources
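//
// An illustrative ExecNewPod hook for this path (field values are examples,
// not defaults):
//
//	hook := &deployapi.LifecycleHook{
//		FailurePolicy: deployapi.LifecycleHookFailurePolicyAbort,
//		ExecNewPod: &deployapi.ExecNewPodHook{
//			ContainerName: "app",
//			Command:       []string{"/bin/sh", "-c", "echo migrating"},
//			Env:           []kapi.EnvVar{{Name: "STAGE", Value: "pre"}},
//			Volumes:       []string{"data"},
//		},
//	}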
func (e *HookExecutor) executeExecNewPod(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error {
	config, err := deployutil.DecodeDeploymentConfig(deployment, e.decoder)
	if err != nil {
		return err
	}

	// Build a pod spec from the hook config and deployment
	podSpec, err := makeHookPod(hook, deployment, &config.Spec.Strategy, suffix)
	if err != nil {
		return err
	}

	// Track whether the pod has already run to completion and avoid showing logs
	// or the Success message twice.
	completed, created := false, false

	// Try to create the pod.
	pod, err := e.podClient.CreatePod(deployment.Namespace, podSpec)
	if err != nil {
		if !kerrors.IsAlreadyExists(err) {
			return fmt.Errorf("couldn't create lifecycle pod for %s: %v", deployment.Name, err)
		}
		completed = true
		pod = podSpec
		pod.Namespace = deployment.Namespace
	} else {
		created = true
		fmt.Fprintf(e.out, "--> %s: Running hook pod ...\n", label)
	}

	stopChannel := make(chan struct{})
	defer close(stopChannel)
	nextPod := e.podClient.PodWatch(pod.Namespace, pod.Name, pod.ResourceVersion, stopChannel)

	// Wait for the hook pod to reach a terminal phase. Start reading logs as
	// soon as the pod enters a usable phase.
	var updatedPod *kapi.Pod
	wg := &sync.WaitGroup{}
	wg.Add(1)
	restarts := int32(0)
	alreadyRead := false
waitLoop:
	for {
		updatedPod = nextPod()
		switch updatedPod.Status.Phase {
		case kapi.PodRunning:
			completed = false

			// Read logs only on the first pass, or again after a container
			// restart when the failure policy allows retries.
			canRetry, restartCount := canRetryReading(updatedPod, restarts)
			if alreadyRead && !canRetry {
				break
			}
			// The hook container has restarted; note the retry in the output.
			// TODO: Maybe log the container id
			if restarts != restartCount {
				wg.Add(1)
				restarts = restartCount
				fmt.Fprintf(e.out, "--> %s: Retrying hook pod (retry #%d)\n", label, restartCount)
			}
			alreadyRead = true
			go e.readPodLogs(pod, wg)

		case kapi.PodSucceeded, kapi.PodFailed:
			if completed {
				if updatedPod.Status.Phase == kapi.PodSucceeded {
					fmt.Fprintf(e.out, "--> %s: Hook pod already succeeded\n", label)
				}
				wg.Done()
				break waitLoop
			}
			if !created {
				fmt.Fprintf(e.out, "--> %s: Hook pod is already running ...\n", label)
			}
			if !alreadyRead {
				go e.readPodLogs(pod, wg)
			}
			break waitLoop
		default:
			completed = false
		}
	}
	// The pod is finished, wait for all logs to be consumed before returning.
	wg.Wait()
	if updatedPod.Status.Phase == kapi.PodFailed {
		fmt.Fprintf(e.out, "--> %s: Failed\n", label)
		return fmt.Errorf("%s", updatedPod.Status.Message)
	}
	// Only show this message if we created the pod ourselves, or we saw
	// the pod in a running or pending state.
	if !completed {
		fmt.Fprintf(e.out, "--> %s: Success\n", label)
	}
	return nil
}

// readPodLogs streams logs from pod to out. It signals wg when
// done.
func (e *HookExecutor) readPodLogs(pod *kapi.Pod, wg *sync.WaitGroup) {
	defer wg.Done()
	opts := &kapi.PodLogOptions{
		Container:  HookContainerName,
		Follow:     true,
		Timestamps: false,
	}
	logStream, err := e.podLogStream(pod.Namespace, pod.Name, opts)
	if err != nil || logStream == nil {
		fmt.Fprintf(e.out, "warning: Unable to retrieve hook logs from %s: %v\n", pod.Name, err)
		return
	}
	// Read logs.
	defer logStream.Close()
	if _, err := io.Copy(e.out, logStream); err != nil {
		fmt.Fprintf(e.out, "\nwarning: Unable to read all logs from %s, continuing: %v\n", pod.Name, err)
	}
}

// makeHookPod makes a pod spec from a hook and deployment.
func makeHookPod(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, strategy *deployapi.DeploymentStrategy, suffix string) (*kapi.Pod, error) {
	exec := hook.ExecNewPod
	var baseContainer *kapi.Container
	for i, container := range deployment.Spec.Template.Spec.Containers {
		if container.Name == exec.ContainerName {
			baseContainer = &deployment.Spec.Template.Spec.Containers[i]
			break
		}
	}
	if baseContainer == nil {
		return nil, fmt.Errorf("no container named '%s' found in deployment template", exec.ContainerName)
	}

	// Build a merged environment; hook environment takes precedence over base
	// container environment
	envMap := map[string]string{}
	mergedEnv := []kapi.EnvVar{}
	for _, env := range baseContainer.Env {
		envMap[env.Name] = env.Value
	}
	for _, env := range exec.Env {
		envMap[env.Name] = env.Value
	}
	for k, v := range envMap {
		mergedEnv = append(mergedEnv, kapi.EnvVar{Name: k, Value: v})
	}
	mergedEnv = append(mergedEnv, kapi.EnvVar{Name: "OPENSHIFT_DEPLOYMENT_NAME", Value: deployment.Name})
	mergedEnv = append(mergedEnv, kapi.EnvVar{Name: "OPENSHIFT_DEPLOYMENT_NAMESPACE", Value: deployment.Namespace})

	// Inherit resources from the base container
	resources := kapi.ResourceRequirements{}
	if err := kapi.Scheme.Convert(&baseContainer.Resources, &resources); err != nil {
		return nil, fmt.Errorf("couldn't clone ResourceRequirements: %v", err)
	}

	// Copy to a local variable since its address is required below
	maxDeploymentDurationSeconds := deployapi.MaxDeploymentDurationSeconds

	// Let the kubelet manage retries if requested
	restartPolicy := kapi.RestartPolicyNever
	if hook.FailurePolicy == deployapi.LifecycleHookFailurePolicyRetry {
		restartPolicy = kapi.RestartPolicyOnFailure
	}

	// Transfer any requested volumes to the hook pod.
	volumes := []kapi.Volume{}
	volumeNames := sets.NewString()
	for _, volume := range deployment.Spec.Template.Spec.Volumes {
		for _, name := range exec.Volumes {
			if volume.Name == name {
				volumes = append(volumes, volume)
				volumeNames.Insert(volume.Name)
			}
		}
	}
	// Transfer any volume mounts associated with transferred volumes.
	volumeMounts := []kapi.VolumeMount{}
	for _, mount := range baseContainer.VolumeMounts {
		if volumeNames.Has(mount.Name) {
			volumeMounts = append(volumeMounts, kapi.VolumeMount{
				Name:      mount.Name,
				ReadOnly:  mount.ReadOnly,
				MountPath: mount.MountPath,
			})
		}
	}

	// Transfer image pull secrets from the pod spec.
	imagePullSecrets := []kapi.LocalObjectReference{}
	for _, pullSecret := range deployment.Spec.Template.Spec.ImagePullSecrets {
		imagePullSecrets = append(imagePullSecrets, kapi.LocalObjectReference{Name: pullSecret.Name})
	}

	pod := &kapi.Pod{
		ObjectMeta: kapi.ObjectMeta{
			Name: namer.GetPodName(deployment.Name, suffix),
			Annotations: map[string]string{
				deployapi.DeploymentAnnotation: deployment.Name,
			},
			Labels: map[string]string{
				deployapi.DeploymentPodTypeLabel:        suffix,
				deployapi.DeployerPodForDeploymentLabel: deployment.Name,
			},
		},
		Spec: kapi.PodSpec{
			Containers: []kapi.Container{
				{
					Name:         HookContainerName,
					Image:        baseContainer.Image,
					Command:      exec.Command,
					WorkingDir:   baseContainer.WorkingDir,
					Env:          mergedEnv,
					Resources:    resources,
					VolumeMounts: volumeMounts,
				},
			},
			Volumes:               volumes,
			ActiveDeadlineSeconds: &maxDeploymentDurationSeconds,
			// Setting the node selector on the hook pod so that it is created
			// on the same set of nodes as the deployment pods.
			NodeSelector:     deployment.Spec.Template.Spec.NodeSelector,
			RestartPolicy:    restartPolicy,
			ImagePullSecrets: imagePullSecrets,
		},
	}

	util.MergeInto(pod.Labels, strategy.Labels, 0)
	util.MergeInto(pod.Annotations, strategy.Annotations, 0)

	return pod, nil
}

// canRetryReading reports whether the hook pod's logs may be read again
// following a container restart, and returns the pod's current restart count.
func canRetryReading(pod *kapi.Pod, restarts int32) (bool, int32) {
	if len(pod.Status.ContainerStatuses) == 0 {
		return false, int32(0)
	}
	restartCount := pod.Status.ContainerStatuses[0].RestartCount
	return pod.Spec.RestartPolicy == kapi.RestartPolicyOnFailure && restartCount > restarts, restartCount
}

// HookExecutorPodClient abstracts access to pods.
type HookExecutorPodClient interface {
	CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error)
	PodWatch(namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod
}

// HookExecutorPodClientImpl is a pluggable HookExecutorPodClient.
type HookExecutorPodClientImpl struct {
	CreatePodFunc func(namespace string, pod *kapi.Pod) (*kapi.Pod, error)
	PodWatchFunc  func(namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod
}

// CreatePod creates a pod in the given namespace using the pluggable CreatePodFunc.
func (i *HookExecutorPodClientImpl) CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error) {
	return i.CreatePodFunc(namespace, pod)
}

// PodWatch returns a blocking pod-watching function using the pluggable PodWatchFunc.
func (i *HookExecutorPodClientImpl) PodWatch(namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod {
	return i.PodWatchFunc(namespace, name, resourceVersion, stopChannel)
}

// NewPodWatch creates a pod watching function which is backed by a
// FIFO/reflector pair. This avoids managing watches directly.
// The returned function blocks until the next pod state is observed.
// The caller is responsible for closing the provided stop channel (typically
// with defer) to stop the reflector and avoid leaking resources.
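//
// A usage sketch (illustrative; kubeClient, namespace, and podName are assumed
// to be defined by the caller):
//
//	stop := make(chan struct{})
//	defer close(stop)
//	next := NewPodWatch(kubeClient, namespace, podName, "", stop)
//	pod := next() // blocks until the next observed state of the pod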
func NewPodWatch(client kclient.PodsNamespacer, namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod {
	fieldSelector := fields.OneTermEqualSelector("metadata.name", name)
	podLW := &cache.ListWatch{
		ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
			opts := kapi.ListOptions{FieldSelector: fieldSelector}
			return client.Pods(namespace).List(opts)
		},
		WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
			opts := kapi.ListOptions{FieldSelector: fieldSelector, ResourceVersion: options.ResourceVersion}
			return client.Pods(namespace).Watch(opts)
		},
	}

	queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
	cache.NewReflector(podLW, &kapi.Pod{}, queue, 1*time.Minute).RunUntil(stopChannel)

	return func() *kapi.Pod {
		obj := cache.Pop(queue)
		return obj.(*kapi.Pod)
	}
}

// NewAcceptNewlyObservedReadyPods makes a new AcceptNewlyObservedReadyPods
// from a real client.
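//
// A usage sketch (illustrative; kubeClient and deployment are assumed to be
// defined by the caller):
//
//	acceptor := NewAcceptNewlyObservedReadyPods(os.Stdout, kubeClient, 120*time.Second, time.Second)
//	if err := acceptor.Accept(deployment); err != nil {
//		// the deployment's new pods did not all become ready in time
//	}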
func NewAcceptNewlyObservedReadyPods(out io.Writer, kclient kclient.PodsNamespacer, timeout time.Duration, interval time.Duration) *AcceptNewlyObservedReadyPods {
	return &AcceptNewlyObservedReadyPods{
		out:          out,
		timeout:      timeout,
		interval:     interval,
		acceptedPods: sets.NewString(),
		getDeploymentPodStore: func(deployment *kapi.ReplicationController) (cache.Store, chan struct{}) {
			selector := labels.Set(deployment.Spec.Selector).AsSelector()
			store := cache.NewStore(cache.MetaNamespaceKeyFunc)
			lw := &cache.ListWatch{
				ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
					opts := kapi.ListOptions{LabelSelector: selector}
					return kclient.Pods(deployment.Namespace).List(opts)
				},
				WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
					opts := kapi.ListOptions{LabelSelector: selector, ResourceVersion: options.ResourceVersion}
					return kclient.Pods(deployment.Namespace).Watch(opts)
				},
			}
			stop := make(chan struct{})
			cache.NewReflector(lw, &kapi.Pod{}, store, 10*time.Second).RunUntil(stop)
			return store, stop
		},
	}
}

// AcceptNewlyObservedReadyPods is a kubectl.UpdateAcceptor which will accept
// a deployment if all the containers in all of the pods for the deployment
// are observed to be ready at least once.
//
// AcceptNewlyObservedReadyPods keeps track of the pods it has accepted for a
// deployment so that the acceptor can be reused across multiple batches of
// updates to a single controller. For example, if during the first acceptance
// call the deployment has 3 pods, the acceptor will validate those 3 pods. If
// the same acceptor instance is used again for the same deployment which now
// has 6 pods, only the latest 3 pods will be considered for acceptance. The
// status of the original 3 pods becomes irrelevant.
//
// Note that this struct is stateful: it is intended for use with a single
// deployment and should be discarded and recreated between deployments.
type AcceptNewlyObservedReadyPods struct {
	out io.Writer
	// getDeploymentPodStore should return a Store containing all the pods for
	// the given deployment, and a channel to stop whatever process is feeding
	// the store.
	getDeploymentPodStore func(deployment *kapi.ReplicationController) (cache.Store, chan struct{})
	// timeout is how long to wait for pod readiness.
	timeout time.Duration
	// interval is how often to check for pod readiness
	interval time.Duration
	// acceptedPods keeps track of pods which have been previously accepted for
	// a deployment.
	acceptedPods sets.String
}

// Accept implements UpdateAcceptor.
func (c *AcceptNewlyObservedReadyPods) Accept(deployment *kapi.ReplicationController) error {
	// Make a pod store to poll and ensure it gets cleaned up.
	podStore, stopStore := c.getDeploymentPodStore(deployment)
	defer close(stopStore)

	// Start checking for pod updates.
	if c.acceptedPods.Len() > 0 {
		fmt.Fprintf(c.out, "--> Waiting up to %s for pods in deployment %s to become ready (%d pods previously accepted)\n", c.timeout, deployment.Name, c.acceptedPods.Len())
	} else {
		fmt.Fprintf(c.out, "--> Waiting up to %s for pods in deployment %s to become ready\n", c.timeout, deployment.Name)
	}
	err := wait.Poll(c.interval, c.timeout, func() (done bool, err error) {
		// Check for pod readiness.
		unready := sets.NewString()
		for _, obj := range podStore.List() {
			pod := obj.(*kapi.Pod)
			// Skip previously accepted pods; we only want to verify newly observed
			// and unaccepted pods.
			if c.acceptedPods.Has(pod.Name) {
				continue
			}
			if kapi.IsPodReady(pod) {
				// If the pod is ready, track it as accepted.
				c.acceptedPods.Insert(pod.Name)
			} else {
				// Otherwise, track it as unready.
				unready.Insert(pod.Name)
			}
		}
		// Check to see if we're done.
		if unready.Len() == 0 {
			return true, nil
		}
		// Otherwise, try again later.
		glog.V(4).Infof("Still waiting for %d pods to become ready for deployment %s", unready.Len(), deployment.Name)
		return false, nil
	})

	// Handle acceptance failure.
	if err != nil {
		if err == wait.ErrWaitTimeout {
			return fmt.Errorf("pods for deployment %q took longer than %.f seconds to become ready", deployment.Name, c.timeout.Seconds())
		}
		return fmt.Errorf("pod readiness check failed for deployment %q: %v", deployment.Name, err)
	}
	return nil
}