package support
import (
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/golang/glog"
kapi "k8s.io/kubernetes/pkg/api"
kerrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
kdeployutil "k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"github.com/openshift/origin/pkg/client"
deployapi "github.com/openshift/origin/pkg/deploy/api"
strategyutil "github.com/openshift/origin/pkg/deploy/strategy/util"
deployutil "github.com/openshift/origin/pkg/deploy/util"
imageapi "github.com/openshift/origin/pkg/image/api"
"github.com/openshift/origin/pkg/util"
namer "github.com/openshift/origin/pkg/util/namer"
)
// hookContainerName is the name used for the container that runs inside hook pods.
const hookContainerName = "lifecycle"
// HookExecutor knows how to execute a deployment lifecycle hook.
type HookExecutor interface {
Execute(hook *deployapi.LifecycleHook, rc *kapi.ReplicationController, suffix, label string) error
}
// hookExecutor implements the HookExecutor interface.
var _ HookExecutor = &hookExecutor{}
// hookExecutor executes a deployment lifecycle hook.
type hookExecutor struct {
// pods provides a client for creating and reading hook pods.
pods kcoreclient.PodsGetter
// tags allows setting image stream tags
tags client.ImageStreamTagsNamespacer
// out is where hook pod logs are written.
out io.Writer
// decoder is used to decode deployment configs stored on replication controllers.
decoder runtime.Decoder
// events is used to emit events from hooks.
events kcoreclient.EventsGetter
// getPodLogs knows how to get logs from a pod and is used for testing
getPodLogs func(*kapi.Pod) (io.ReadCloser, error)
}
// NewHookExecutor makes a HookExecutor from the given clients.
func NewHookExecutor(pods kcoreclient.PodsGetter, tags client.ImageStreamTagsNamespacer, events kcoreclient.EventsGetter, out io.Writer, decoder runtime.Decoder) HookExecutor {
executor := &hookExecutor{
tags: tags,
pods: pods,
events: events,
out: out,
decoder: decoder,
}
executor.getPodLogs = func(pod *kapi.Pod) (io.ReadCloser, error) {
opts := &kapi.PodLogOptions{
Container: hookContainerName,
Follow: true,
Timestamps: false,
}
return executor.pods.Pods(pod.Namespace).GetLogs(pod.Name, opts).Stream()
}
return executor
}
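// The following is a minimal usage sketch, not part of this package's API: it
// assumes the caller has a Kubernetes internalclientset (kclient), an OpenShift
// client (oc), a decoder, the decoded deployment config, and the target rc, and
// that the "hook-pre"/"pre" suffix and label follow the convention used by the
// deployment strategies.
//
//	executor := NewHookExecutor(kclient.Core(), oc, kclient.Core(), os.Stdout, decoder)
//	if params := config.Spec.Strategy.RecreateParams; params != nil && params.Pre != nil {
//		if err := executor.Execute(params.Pre, rc, "hook-pre", "pre"); err != nil {
//			return err // Abort/Retry failure policies surface the error here
//		}
//	}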
// Execute executes the hook in the context of the given deployment (rc). The
// suffix distinguishes the kind of hook (e.g. pre, post) and is used to name
// the hook pod; the label is used in events and log output.
func (e *hookExecutor) Execute(hook *deployapi.LifecycleHook, rc *kapi.ReplicationController, suffix, label string) error {
var err error
switch {
case len(hook.TagImages) > 0:
tagEventMessages := []string{}
for _, t := range hook.TagImages {
image, ok := findContainerImage(rc, t.ContainerName)
if ok {
tagEventMessages = append(tagEventMessages, fmt.Sprintf("image %q as %q", image, t.To.Name))
}
}
strategyutil.RecordConfigEvent(e.events, rc, e.decoder, kapi.EventTypeNormal, "Started",
fmt.Sprintf("Running %s-hook (TagImages) %s for rc %s/%s", label, strings.Join(tagEventMessages, ","), rc.Namespace, rc.Name))
err = e.tagImages(hook, rc, suffix, label)
case hook.ExecNewPod != nil:
strategyutil.RecordConfigEvent(e.events, rc, e.decoder, kapi.EventTypeNormal, "Started",
fmt.Sprintf("Running %s-hook (%q) for rc %s/%s", label, strings.Join(hook.ExecNewPod.Command, " "), rc.Namespace, rc.Name))
err = e.executeExecNewPod(hook, rc, suffix, label)
}
if err == nil {
strategyutil.RecordConfigEvent(e.events, rc, e.decoder, kapi.EventTypeNormal, "Completed",
fmt.Sprintf("The %s-hook for rc %s/%s completed successfully", label, rc.Namespace, rc.Name))
return nil
}
// Retry failures are treated the same as Abort.
switch hook.FailurePolicy {
case deployapi.LifecycleHookFailurePolicyAbort, deployapi.LifecycleHookFailurePolicyRetry:
strategyutil.RecordConfigEvent(e.events, rc, e.decoder, kapi.EventTypeWarning, "Failed",
fmt.Sprintf("The %s-hook failed: %v, aborting rollout of %s/%s", label, err, rc.Namespace, rc.Name))
return fmt.Errorf("the %s hook failed: %v, aborting rollout of %s/%s", label, err, rc.Namespace, rc.Name)
case deployapi.LifecycleHookFailurePolicyIgnore:
strategyutil.RecordConfigEvent(e.events, rc, e.decoder, kapi.EventTypeWarning, "Failed",
fmt.Sprintf("The %s-hook failed: %v (ignore), rollout of %s/%s will continue", label, err, rc.Namespace, rc.Name))
return nil
default:
return err
}
}
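// For illustration only, an ExecNewPod hook passed to Execute might be declared
// as follows (the container name, command, and env values are invented):
//
//	hook := &deployapi.LifecycleHook{
//		FailurePolicy: deployapi.LifecycleHookFailurePolicyAbort,
//		ExecNewPod: &deployapi.ExecNewPodHook{
//			ContainerName: "app",
//			Command:       []string{"/bin/sh", "-c", "migrate-database"},
//			Env:           []kapi.EnvVar{{Name: "STAGE", Value: "pre"}},
//			Volumes:       []string{"data"},
//		},
//	}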
// findContainerImage returns the image of the named container in the
// replication controller's pod template, and whether that container was found.
func findContainerImage(rc *kapi.ReplicationController, containerName string) (string, bool) {
if rc.Spec.Template == nil {
return "", false
}
for _, container := range rc.Spec.Template.Spec.Containers {
if container.Name == containerName {
return container.Image, true
}
}
return "", false
}
// tagImages tags images as part of the lifecycle of an rc. It uses an ImageStreamTag
// client which will provision an ImageStream if one doesn't already exist.
func (e *hookExecutor) tagImages(hook *deployapi.LifecycleHook, rc *kapi.ReplicationController, suffix, label string) error {
var errs []error
for _, action := range hook.TagImages {
value, ok := findContainerImage(rc, action.ContainerName)
if !ok {
errs = append(errs, fmt.Errorf("unable to find image for container %q, container could not be found", action.ContainerName))
continue
}
namespace := action.To.Namespace
if len(namespace) == 0 {
namespace = rc.Namespace
}
if _, err := e.tags.ImageStreamTags(namespace).Update(&imageapi.ImageStreamTag{
ObjectMeta: kapi.ObjectMeta{
Name: action.To.Name,
Namespace: namespace,
},
Tag: &imageapi.TagReference{
From: &kapi.ObjectReference{
Kind: "DockerImage",
Name: value,
},
},
}); err != nil {
errs = append(errs, err)
continue
}
fmt.Fprintf(e.out, "--> %s: Tagged %q into %s/%s\n", label, value, action.To.Namespace, action.To.Name)
}
return utilerrors.NewAggregate(errs)
}
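// Likewise, a TagImages hook that records the image currently running in the
// "app" container under an image stream tag could look like this (names are
// examples only):
//
//	hook := &deployapi.LifecycleHook{
//		FailurePolicy: deployapi.LifecycleHookFailurePolicyIgnore,
//		TagImages: []deployapi.TagImageHook{{
//			ContainerName: "app",
//			To:            kapi.ObjectReference{Kind: "ImageStreamTag", Name: "app:latest"},
//		}},
//	}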
// executeExecNewPod executes an ExecNewPod hook by creating a new pod based on
// the hook parameters and replication controller. The pod is then synchronously
// watched until the pod completes, and if the pod failed, an error is returned.
//
// The hook pod inherits the following from the container the hook refers to:
//
// * Environment (hook keys take precedence)
// * Working directory
// * Resources
func (e *hookExecutor) executeExecNewPod(hook *deployapi.LifecycleHook, rc *kapi.ReplicationController, suffix, label string) error {
config, err := deployutil.DecodeDeploymentConfig(rc, e.decoder)
if err != nil {
return err
}
deployerPod, err := e.pods.Pods(rc.Namespace).Get(deployutil.DeployerPodNameForDeployment(rc.Name))
if err != nil {
return err
}
// Build a pod spec from the hook config and replication controller.
podSpec, err := makeHookPod(hook, rc, deployerPod, &config.Spec.Strategy, suffix)
if err != nil {
return err
}
// Track whether the pod has already run to completion and avoid showing logs
// or the Success message twice.
completed, created := false, false
// Try to create the pod.
pod, err := e.pods.Pods(rc.Namespace).Create(podSpec)
if err != nil {
if !kerrors.IsAlreadyExists(err) {
return fmt.Errorf("couldn't create lifecycle pod for %s: %v", rc.Name, err)
}
completed = true
pod = podSpec
pod.Namespace = rc.Namespace
} else {
created = true
fmt.Fprintf(e.out, "--> %s: Running hook pod ...\n", label)
}
stopChannel := make(chan struct{})
defer close(stopChannel)
nextPod := newPodWatch(e.pods.Pods(pod.Namespace), pod.Namespace, pod.Name, pod.ResourceVersion, stopChannel)
// Wait for the hook pod to reach a terminal phase. Start reading logs as
// soon as the pod enters a usable phase.
var updatedPod *kapi.Pod
wg := &sync.WaitGroup{}
wg.Add(1)
restarts := int32(0)
alreadyRead := false
waitLoop:
for {
updatedPod = nextPod()
switch updatedPod.Status.Phase {
case kapi.PodRunning:
completed = false
// Read logs the first time the pod is seen running, and again after any
// container restart when the hook is being retried.
canRetry, restartCount := canRetryReading(updatedPod, restarts)
if alreadyRead && !canRetry {
break
}
// The hook container has restarted; note the retry in the output.
// TODO: Maybe log the container id
if restarts != restartCount {
wg.Add(1)
restarts = restartCount
fmt.Fprintf(e.out, "--> %s: Retrying hook pod (retry #%d)\n", label, restartCount)
}
alreadyRead = true
go e.readPodLogs(pod, wg)
case kapi.PodSucceeded, kapi.PodFailed:
if completed {
if updatedPod.Status.Phase == kapi.PodSucceeded {
fmt.Fprintf(e.out, "--> %s: Hook pod already succeeded\n", label)
}
wg.Done()
break waitLoop
}
if !created {
fmt.Fprintf(e.out, "--> %s: Hook pod is already running ...\n", label)
}
if !alreadyRead {
go e.readPodLogs(pod, wg)
}
break waitLoop
default:
completed = false
}
}
// The pod is finished, wait for all logs to be consumed before returning.
wg.Wait()
if updatedPod.Status.Phase == kapi.PodFailed {
fmt.Fprintf(e.out, "--> %s: Failed\n", label)
return fmt.Errorf("%s", updatedPod.Status.Message)
}
// Only show this message if we created the pod ourselves, or we saw
// the pod in a running or pending state.
if !completed {
fmt.Fprintf(e.out, "--> %s: Success\n", label)
}
return nil
}
// readPodLogs streams logs from the hook pod to e.out and signals wg when done.
func (e *hookExecutor) readPodLogs(pod *kapi.Pod, wg *sync.WaitGroup) {
defer wg.Done()
logStream, err := e.getPodLogs(pod)
if err != nil || logStream == nil {
fmt.Fprintf(e.out, "warning: Unable to retrieve hook logs from %s: %v\n", pod.Name, err)
return
}
// Read logs.
defer logStream.Close()
if _, err := io.Copy(e.out, logStream); err != nil {
fmt.Fprintf(e.out, "\nwarning: Unable to read all logs from %s, continuing: %v\n", pod.Name, err)
}
}
// makeHookPod makes a pod spec from a hook and replication controller.
func makeHookPod(hook *deployapi.LifecycleHook, rc *kapi.ReplicationController, deployerPod *kapi.Pod, strategy *deployapi.DeploymentStrategy, suffix string) (*kapi.Pod, error) {
exec := hook.ExecNewPod
var baseContainer *kapi.Container
for _, container := range rc.Spec.Template.Spec.Containers {
if container.Name == exec.ContainerName {
baseContainer = &container
break
}
}
if baseContainer == nil {
return nil, fmt.Errorf("no container named '%s' found in rc template", exec.ContainerName)
}
// Build a merged environment; hook environment takes precedence over base
// container environment
envMap := map[string]kapi.EnvVar{}
mergedEnv := []kapi.EnvVar{}
for _, env := range baseContainer.Env {
envMap[env.Name] = env
}
for _, env := range exec.Env {
envMap[env.Name] = env
}
for k, v := range envMap {
mergedEnv = append(mergedEnv, kapi.EnvVar{Name: k, Value: v.Value, ValueFrom: v.ValueFrom})
}
mergedEnv = append(mergedEnv, kapi.EnvVar{Name: "OPENSHIFT_DEPLOYMENT_NAME", Value: rc.Name})
mergedEnv = append(mergedEnv, kapi.EnvVar{Name: "OPENSHIFT_DEPLOYMENT_NAMESPACE", Value: rc.Namespace})
// Inherit resources from the base container
resources := kapi.ResourceRequirements{}
if err := kapi.Scheme.Convert(&baseContainer.Resources, &resources, nil); err != nil {
return nil, fmt.Errorf("couldn't clone ResourceRequirements: %v", err)
}
// Use the strategy's active deadline if set; otherwise fall back to the default
// maximum deployment duration. The hook pod gets whatever time remains after
// subtracting how long the deployer pod has already been running.
defaultActiveDeadline := deployapi.MaxDeploymentDurationSeconds
if strategy.ActiveDeadlineSeconds != nil {
defaultActiveDeadline = *(strategy.ActiveDeadlineSeconds)
}
maxDeploymentDurationSeconds := defaultActiveDeadline - int64(time.Since(deployerPod.Status.StartTime.Time).Seconds())
// Let the kubelet manage retries if requested
restartPolicy := kapi.RestartPolicyNever
if hook.FailurePolicy == deployapi.LifecycleHookFailurePolicyRetry {
restartPolicy = kapi.RestartPolicyOnFailure
}
// Transfer any requested volumes to the hook pod.
volumes := []kapi.Volume{}
volumeNames := sets.NewString()
for _, volume := range rc.Spec.Template.Spec.Volumes {
for _, name := range exec.Volumes {
if volume.Name == name {
volumes = append(volumes, volume)
volumeNames.Insert(volume.Name)
}
}
}
// Transfer any volume mounts associated with transferred volumes.
volumeMounts := []kapi.VolumeMount{}
for _, mount := range baseContainer.VolumeMounts {
if volumeNames.Has(mount.Name) {
volumeMounts = append(volumeMounts, kapi.VolumeMount{
Name: mount.Name,
ReadOnly: mount.ReadOnly,
MountPath: mount.MountPath,
})
}
}
// Transfer image pull secrets from the pod spec.
imagePullSecrets := []kapi.LocalObjectReference{}
for _, pullSecret := range rc.Spec.Template.Spec.ImagePullSecrets {
imagePullSecrets = append(imagePullSecrets, kapi.LocalObjectReference{Name: pullSecret.Name})
}
gracePeriod := int64(10)
pod := &kapi.Pod{
ObjectMeta: kapi.ObjectMeta{
Name: namer.GetPodName(rc.Name, suffix),
Annotations: map[string]string{
deployapi.DeploymentAnnotation: rc.Name,
},
Labels: map[string]string{
deployapi.DeploymentPodTypeLabel: suffix,
deployapi.DeployerPodForDeploymentLabel: rc.Name,
},
},
Spec: kapi.PodSpec{
Containers: []kapi.Container{
{
Name: hookContainerName,
Image: baseContainer.Image,
ImagePullPolicy: baseContainer.ImagePullPolicy,
Command: exec.Command,
WorkingDir: baseContainer.WorkingDir,
Env: mergedEnv,
Resources: resources,
VolumeMounts: volumeMounts,
},
},
Volumes: volumes,
ActiveDeadlineSeconds: &maxDeploymentDurationSeconds,
// Setting the node selector on the hook pod so that it is created
// on the same set of nodes as the rc pods.
NodeSelector: rc.Spec.Template.Spec.NodeSelector,
RestartPolicy: restartPolicy,
ImagePullSecrets: imagePullSecrets,
TerminationGracePeriodSeconds: &gracePeriod,
},
}
util.MergeInto(pod.Labels, strategy.Labels, 0)
util.MergeInto(pod.Annotations, strategy.Annotations, 0)
return pod, nil
}
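// To make the environment precedence above concrete, a minimal sketch with
// invented values: a base container declaring FOO=base and BAR=bar combined
// with a hook declaring FOO=hook results in the hook container seeing FOO=hook
// and BAR=bar, plus the two injected OPENSHIFT_DEPLOYMENT_* variables.
//
//	baseEnv := []kapi.EnvVar{{Name: "FOO", Value: "base"}, {Name: "BAR", Value: "bar"}}
//	hookEnv := []kapi.EnvVar{{Name: "FOO", Value: "hook"}}
//	// merged result (map iteration order is not guaranteed):
//	//   FOO=hook, BAR=bar, OPENSHIFT_DEPLOYMENT_NAME=<rc name>,
//	//   OPENSHIFT_DEPLOYMENT_NAMESPACE=<rc namespace>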
// canRetryReading returns whether log reading can be retried for the given hook
// pod (its restart policy is OnFailure and it has restarted since the last
// read), along with the pod's current restart count.
func canRetryReading(pod *kapi.Pod, restarts int32) (bool, int32) {
if len(pod.Status.ContainerStatuses) == 0 {
return false, int32(0)
}
restartCount := pod.Status.ContainerStatuses[0].RestartCount
return pod.Spec.RestartPolicy == kapi.RestartPolicyOnFailure && restartCount > restarts, restartCount
}
// newPodWatch creates a pod watching function which is backed by a
// FIFO/reflector pair. This avoids managing watches directly. The underlying
// reflector runs until the provided stop channel is closed; it is the caller's
// responsibility to close that channel (typically with defer) to avoid leaking
// resources.
func newPodWatch(client kcoreclient.PodInterface, namespace, name, resourceVersion string, stopChannel chan struct{}) func() *kapi.Pod {
fieldSelector := fields.OneTermEqualSelector("metadata.name", name)
podLW := &cache.ListWatch{
ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector
return client.List(options)
},
WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector
return client.Watch(options)
},
}
queue := cache.NewResyncableFIFO(cache.MetaNamespaceKeyFunc)
cache.NewReflector(podLW, &kapi.Pod{}, queue, 1*time.Minute).RunUntil(stopChannel)
return func() *kapi.Pod {
obj := cache.Pop(queue)
return obj.(*kapi.Pod)
}
}
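// A caller typically loops over the returned function to follow a single pod,
// closing the stop channel when done. A sketch, assuming pod and a pods client
// are available from the surrounding code:
//
//	stop := make(chan struct{})
//	defer close(stop)
//	next := newPodWatch(podsClient, pod.Namespace, pod.Name, pod.ResourceVersion, stop)
//	for {
//		p := next() // blocks until the next observed state of the pod
//		if p.Status.Phase == kapi.PodSucceeded || p.Status.Phase == kapi.PodFailed {
//			break
//		}
//	}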
// NewAcceptAvailablePods makes a new acceptAvailablePods from a real client.
func NewAcceptAvailablePods(
out io.Writer,
kclient kcoreclient.PodsGetter,
timeout time.Duration,
interval time.Duration,
minReadySeconds int32,
) *acceptAvailablePods {
return &acceptAvailablePods{
out: out,
timeout: timeout,
interval: interval,
minReadySeconds: minReadySeconds,
acceptedPods: sets.NewString(),
getRcPodStore: func(rc *kapi.ReplicationController) (cache.Store, chan struct{}) {
selector := labels.Set(rc.Spec.Selector).AsSelector()
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
lw := &cache.ListWatch{
ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
options.LabelSelector = selector
return kclient.Pods(rc.Namespace).List(options)
},
WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
options.LabelSelector = selector
return kclient.Pods(rc.Namespace).Watch(options)
},
}
stop := make(chan struct{})
cache.NewReflector(lw, &kapi.Pod{}, store, 10*time.Second).RunUntil(stop)
return store, stop
},
}
}
// acceptAvailablePods will accept a replication controller if all the pods
// for the replication controller become available.
//
// acceptAvailablePods keeps track of the pods it has accepted for a
// replication controller so that the acceptor can be reused across multiple
// batches of updates to a single controller. For example, if during the first
// acceptance call the replication controller has 3 pods, the acceptor will
// validate those 3 pods. If the same acceptor instance is used again for the
// same replication controller, which now has 6 pods, only the 3 pods that have
// not been accepted before are considered for acceptance; the status of the
// original 3 pods becomes irrelevant.
//
// Note that this struct is stateful: it is intended for use with a single
// replication controller and should be discarded and recreated between
// rollouts.
type acceptAvailablePods struct {
out io.Writer
// getRcPodStore should return a Store containing all the pods for the
// replication controller, and a channel to stop whatever process is
// feeding the store.
getRcPodStore func(*kapi.ReplicationController) (cache.Store, chan struct{})
// timeout is how long to wait for pod readiness.
timeout time.Duration
// interval is how often to check for pod readiness
interval time.Duration
// minReadySeconds is the minimum number of seconds for which a newly created
// pod should be ready without any of its container crashing, for it to be
// considered available.
minReadySeconds int32
// acceptedPods keeps track of pods which have been previously accepted for
// a replication controller.
acceptedPods sets.String
}
// Accept waits until all pods for the replication controller become available
// or the timeout expires.
func (c *acceptAvailablePods) Accept(rc *kapi.ReplicationController) error {
// Make a pod store to poll and ensure it gets cleaned up.
podStore, stopStore := c.getRcPodStore(rc)
defer close(stopStore)
// Start checking for pod updates.
if c.acceptedPods.Len() > 0 {
fmt.Fprintf(c.out, "--> Waiting up to %s for pods in rc %s to become ready (%d pods previously accepted)\n", c.timeout, rc.Name, c.acceptedPods.Len())
} else {
fmt.Fprintf(c.out, "--> Waiting up to %s for pods in rc %s to become ready\n", c.timeout, rc.Name)
}
err := wait.Poll(c.interval, c.timeout, func() (done bool, err error) {
// Check for pod readiness.
unready := sets.NewString()
for _, obj := range podStore.List() {
pod := obj.(*kapi.Pod)
// Skip previously accepted pods; we only want to verify newly observed
// and unaccepted pods.
if c.acceptedPods.Has(pod.Name) {
continue
}
if kdeployutil.IsPodAvailable(pod, c.minReadySeconds, time.Now()) {
// If the pod is ready, track it as accepted.
c.acceptedPods.Insert(pod.Name)
} else {
// Otherwise, track it as unready.
unready.Insert(pod.Name)
}
}
// Check to see if we're done.
if unready.Len() == 0 {
return true, nil
}
// Otherwise, try again later.
glog.V(4).Infof("Still waiting for %d pods to become ready for rc %s", unready.Len(), rc.Name)
return false, nil
})
// Handle acceptance failure.
if err != nil {
if err == wait.ErrWaitTimeout {
return fmt.Errorf("pods for rc %q took longer than %.f seconds to become ready", rc.Name, c.timeout.Seconds())
}
return fmt.Errorf("pod readiness check failed for rc %q: %v", rc.Name, err)
}
return nil
}
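// A hypothetical wiring of the acceptor from a deployment strategy (out,
// kclient, and rc are assumed to exist in the caller; the timeout, interval,
// and minReadySeconds values are examples only):
//
//	acceptor := NewAcceptAvailablePods(out, kclient.Core(), 10*time.Minute, 1*time.Second, 0)
//	if err := acceptor.Accept(rc); err != nil {
//		return err // some pods did not become available before the timeout
//	}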