package controller import ( "errors" "github.com/golang/glog" kapi "k8s.io/kubernetes/pkg/api" apierrs "k8s.io/kubernetes/pkg/api/errors" "github.com/openshift/origin/pkg/client" "github.com/openshift/origin/pkg/image/api" ) var ErrNotImportable = errors.New("the specified stream cannot be imported") type ImportController struct { streams client.ImageStreamsNamespacer } // Notifier provides information about when the controller makes a decision type Notifier interface { // Importing is invoked when the controller is going to import an image stream Importing(stream *api.ImageStream) } // NotifierFunc implements Notifier type NotifierFunc func(stream *api.ImageStream) // Importing adapts NotifierFunc to Notifier func (fn NotifierFunc) Importing(stream *api.ImageStream) { fn(stream) } // tagImportable is true if the given TagReference is importable by this controller func tagImportable(tagRef api.TagReference) bool { if tagRef.From == nil { return false } if tagRef.From.Kind != "DockerImage" || tagRef.Reference { return false } return true } // tagNeedsImport is true if the observed tag generation for this tag is older than the // specified tag generation (if no tag generation is specified, the controller does not // need to import this tag). func tagNeedsImport(stream *api.ImageStream, tag string, tagRef api.TagReference, importWhenGenerationNil bool) bool { if !tagImportable(tagRef) { return false } if tagRef.Generation == nil { return importWhenGenerationNil } return *tagRef.Generation > api.LatestObservedTagGeneration(stream, tag) } // needsImport returns true if the provided image stream should have tags imported. Partial is returned // as true if the spec.dockerImageRepository does not need to be refreshed (if only tags have to be imported). func needsImport(stream *api.ImageStream) (ok bool, partial bool) { if stream.Annotations == nil || len(stream.Annotations[api.DockerImageRepositoryCheckAnnotation]) == 0 { if len(stream.Spec.DockerImageRepository) > 0 { return true, false } // for backwards compatibility, if any of our tags are importable, trigger a partial import when the // annotation is cleared. for _, tagRef := range stream.Spec.Tags { if tagImportable(tagRef) { return true, true } } } // find any tags with a newer generation than their status for tag, tagRef := range stream.Spec.Tags { if tagNeedsImport(stream, tag, tagRef, false) { return true, true } } return false, false } // needsScheduling returns true if this image stream has any scheduled tags func needsScheduling(stream *api.ImageStream) bool { for _, tagRef := range stream.Spec.Tags { if tagImportable(tagRef) && tagRef.ImportPolicy.Scheduled { return true } } return false } // resetScheduledTags artificially increments the generation on the tags that should be imported. func resetScheduledTags(stream *api.ImageStream) { next := stream.Generation + 1 for tag, tagRef := range stream.Spec.Tags { if tagImportable(tagRef) && tagRef.ImportPolicy.Scheduled { tagRef.Generation = &next stream.Spec.Tags[tag] = tagRef } } } // Next processes the given image stream, looking for streams that have DockerImageRepository // set but have not yet been marked as "ready". If transient errors occur, err is returned but // the image stream is not modified (so it will be tried again later). If a permanent // failure occurs the image is marked with an annotation and conditions are set on the status // tags. The tags of the original spec image are left as is (those are updated through status). // // There are 3 scenarios: // // 1. spec.DockerImageRepository defined without any tags results in all tags being imported // from upstream image repository // // 2. spec.DockerImageRepository + tags defined - import all tags from upstream image repository, // and all the specified which (if name matches) will overwrite the default ones. // Additionally: // for kind == DockerImage import or reference underlying image, exact tag (not provided means latest), // for kind != DockerImage reference tag from the same or other ImageStream // // 3. spec.DockerImageRepository not defined - import tags per each definition. // // Notifier, if passed, will be invoked if the stream is going to be imported. func (c *ImportController) Next(stream *api.ImageStream, notifier Notifier) error { ok, partial := needsImport(stream) if !ok { return nil } glog.V(3).Infof("Importing stream %s/%s partial=%t...", stream.Namespace, stream.Name, partial) if notifier != nil { notifier.Importing(stream) } isi := &api.ImageStreamImport{ ObjectMeta: kapi.ObjectMeta{ Name: stream.Name, Namespace: stream.Namespace, ResourceVersion: stream.ResourceVersion, UID: stream.UID, }, Spec: api.ImageStreamImportSpec{Import: true}, } for tag, tagRef := range stream.Spec.Tags { if !(partial && tagImportable(tagRef)) && !tagNeedsImport(stream, tag, tagRef, true) { continue } isi.Spec.Images = append(isi.Spec.Images, api.ImageImportSpec{ From: kapi.ObjectReference{Kind: "DockerImage", Name: tagRef.From.Name}, To: &kapi.LocalObjectReference{Name: tag}, ImportPolicy: tagRef.ImportPolicy, }) } if repo := stream.Spec.DockerImageRepository; !partial && len(repo) > 0 { insecure := stream.Annotations[api.InsecureRepositoryAnnotation] == "true" isi.Spec.Repository = &api.RepositoryImportSpec{ From: kapi.ObjectReference{Kind: "DockerImage", Name: repo}, ImportPolicy: api.TagImportPolicy{Insecure: insecure}, } } result, err := c.streams.ImageStreams(stream.Namespace).Import(isi) if err != nil { if apierrs.IsNotFound(err) && client.IsStatusErrorKind(err, "imageStream") { return ErrNotImportable } glog.V(4).Infof("Import stream %s/%s partial=%t error: %v", stream.Namespace, stream.Name, partial, err) } else { glog.V(5).Infof("Import stream %s/%s partial=%t import: %#v", stream.Namespace, stream.Name, partial, result.Status.Import) } return err } func (c *ImportController) NextTimedByName(namespace, name string) error { stream, err := c.streams.ImageStreams(namespace).Get(name) if err != nil { if apierrs.IsNotFound(err) { return ErrNotImportable } return err } return c.NextTimed(stream) } func (c *ImportController) NextTimed(stream *api.ImageStream) error { if !needsScheduling(stream) { return ErrNotImportable } resetScheduledTags(stream) glog.V(3).Infof("Scheduled import of stream %s/%s...", stream.Namespace, stream.Name) return c.Next(stream, nil) }