package importer import ( "encoding/json" "fmt" "net/http" "net/url" "path" "strings" "time" "github.com/golang/glog" gocontext "golang.org/x/net/context" "github.com/docker/distribution" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" "github.com/docker/distribution/manifest/schema1" "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/api/errcode" "github.com/docker/distribution/registry/api/v2" registryclient "github.com/docker/distribution/registry/client" "github.com/docker/distribution/registry/client/auth" "github.com/docker/distribution/registry/client/transport" kapi "k8s.io/kubernetes/pkg/api" kapierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/validation/field" "github.com/openshift/origin/pkg/dockerregistry" "github.com/openshift/origin/pkg/image/api" "github.com/openshift/origin/pkg/image/api/dockerpre012" ) // Add a dockerregistry.Client to the passed context with this key to support v1 Docker registry importing const ContextKeyV1RegistryClient = "v1-registry-client" // Interface loads images into an image stream import request. type Interface interface { Import(ctx gocontext.Context, isi *api.ImageStreamImport) error } // RepositoryRetriever fetches a Docker distribution.Repository. type RepositoryRetriever interface { // Repository returns a properly authenticated distribution.Repository for the given registry, repository // name, and insecure toleration behavior. Repository(ctx gocontext.Context, registry *url.URL, repoName string, insecure bool) (distribution.Repository, error) } // ErrNotV2Registry is returned when the server does not report itself as a V2 Docker registry type ErrNotV2Registry struct { Registry string } func (e *ErrNotV2Registry) Error() string { return fmt.Sprintf("endpoint %q does not support v2 API", e.Registry) } // ImageStreamImport implements an import strategy for Docker images. It keeps a cache of images // per distinct auth context to reduce duplicate loads. This type is not thread safe. type ImageStreamImporter struct { maximumTagsPerRepo int retriever RepositoryRetriever limiter util.RateLimiter digestToRepositoryCache map[gocontext.Context]map[manifestKey]*api.Image } // NewImageStreamImport creates an importer that will load images from a remote Docker registry into an // ImageStreamImport object. Limiter may be nil. func NewImageStreamImporter(retriever RepositoryRetriever, maximumTagsPerRepo int, limiter util.RateLimiter) *ImageStreamImporter { if limiter == nil { limiter = util.NewFakeAlwaysRateLimiter() } return &ImageStreamImporter{ maximumTagsPerRepo: maximumTagsPerRepo, retriever: retriever, limiter: limiter, digestToRepositoryCache: make(map[gocontext.Context]map[manifestKey]*api.Image), } } // contextImageCache returns the image cache entry for a context. func (i *ImageStreamImporter) contextImageCache(ctx gocontext.Context) map[manifestKey]*api.Image { cache := i.digestToRepositoryCache[ctx] if cache == nil { cache = make(map[manifestKey]*api.Image) i.digestToRepositoryCache[ctx] = cache } return cache } // Import tries to complete the provided isi object with images loaded from remote registries. func (i *ImageStreamImporter) Import(ctx gocontext.Context, isi *api.ImageStreamImport) error { cache := i.contextImageCache(ctx) importImages(ctx, i.retriever, isi, cache, i.limiter) importFromRepository(ctx, i.retriever, isi, i.maximumTagsPerRepo, cache, i.limiter) return nil } // importImages updates the passed ImageStreamImport object and sets Status for each image based on whether the import // succeeded or failed. Cache is updated with any loaded images. Limiter is optional and controls how fast images are updated. func importImages(ctx gocontext.Context, retriever RepositoryRetriever, isi *api.ImageStreamImport, cache map[manifestKey]*api.Image, limiter util.RateLimiter) { tags := make(map[manifestKey][]int) ids := make(map[manifestKey][]int) repositories := make(map[repositoryKey]*importRepository) isi.Status.Images = make([]api.ImageImportStatus, len(isi.Spec.Images)) for i := range isi.Spec.Images { spec := &isi.Spec.Images[i] from := spec.From if from.Kind != "DockerImage" { continue } ref, err := api.ParseDockerImageReference(from.Name) if err != nil { isi.Status.Images[i].Status = invalidStatus("", field.Invalid(field.NewPath("from", "name"), from.Name, fmt.Sprintf("invalid name: %v", err))) continue } defaultRef := ref.DockerClientDefaults() repoName := defaultRef.RepositoryName() registryURL := defaultRef.RegistryURL() key := repositoryKey{url: *registryURL, name: repoName} repo, ok := repositories[key] if !ok { repo = &importRepository{ Ref: ref, Registry: &key.url, Name: key.name, Insecure: spec.ImportPolicy.Insecure, } repositories[key] = repo } if len(defaultRef.ID) > 0 { id := manifestKey{repositoryKey: key} id.value = defaultRef.ID ids[id] = append(ids[id], i) if len(ids[id]) == 1 { repo.Digests = append(repo.Digests, importDigest{ Name: defaultRef.ID, Image: cache[id], }) } } else { tag := manifestKey{repositoryKey: key} tag.value = defaultRef.Tag tags[tag] = append(tags[tag], i) if len(tags[tag]) == 1 { repo.Tags = append(repo.Tags, importTag{ Name: defaultRef.Tag, Image: cache[tag], }) } } } // for each repository we found, import all tags and digests for key, repo := range repositories { importRepositoryFromDocker(ctx, retriever, repo, limiter) for _, tag := range repo.Tags { j := manifestKey{repositoryKey: key} j.value = tag.Name if tag.Image != nil { cache[j] = tag.Image } for _, index := range tags[j] { if tag.Err != nil { setImageImportStatus(isi, index, tag.Name, tag.Err) continue } copied := *tag.Image image := &isi.Status.Images[index] ref := repo.Ref ref.Tag, ref.ID = tag.Name, copied.Name copied.DockerImageReference = ref.MostSpecific().Exact() image.Tag = tag.Name image.Image = &copied image.Status.Status = unversioned.StatusSuccess } } for _, digest := range repo.Digests { j := manifestKey{repositoryKey: key} j.value = digest.Name if digest.Image != nil { cache[j] = digest.Image } for _, index := range ids[j] { if digest.Err != nil { setImageImportStatus(isi, index, "", digest.Err) continue } image := &isi.Status.Images[index] copied := *digest.Image ref := repo.Ref ref.Tag, ref.ID = "", copied.Name copied.DockerImageReference = ref.MostSpecific().Exact() image.Image = &copied image.Status.Status = unversioned.StatusSuccess } } } } // importFromRepository imports the repository named on the ImageStreamImport, if any, importing up to maximumTags, and reporting // status on each image that is attempted to be imported. If the repository cannot be found or tags cannot be retrieved, the repository // status field is set. func importFromRepository(ctx gocontext.Context, retriever RepositoryRetriever, isi *api.ImageStreamImport, maximumTags int, cache map[manifestKey]*api.Image, limiter util.RateLimiter) { if isi.Spec.Repository == nil { return } isi.Status.Repository = &api.RepositoryImportStatus{} status := isi.Status.Repository spec := isi.Spec.Repository from := spec.From if from.Kind != "DockerImage" { return } ref, err := api.ParseDockerImageReference(from.Name) if err != nil { status.Status = invalidStatus("", field.Invalid(field.NewPath("from", "name"), from.Name, fmt.Sprintf("invalid name: %v", err))) return } defaultRef := ref.DockerClientDefaults() repoName := defaultRef.RepositoryName() registryURL := defaultRef.RegistryURL() key := repositoryKey{url: *registryURL, name: repoName} repo := &importRepository{ Ref: ref, Registry: &key.url, Name: key.name, Insecure: spec.ImportPolicy.Insecure, MaximumTags: maximumTags, } importRepositoryFromDocker(ctx, retriever, repo, limiter) if repo.Err != nil { status.Status = imageImportStatus(repo.Err, "", "repository") return } additional := []string{} tagKey := manifestKey{repositoryKey: key} for _, s := range repo.AdditionalTags { tagKey.value = s if image, ok := cache[tagKey]; ok { repo.Tags = append(repo.Tags, importTag{ Name: s, Image: image, }) } else { additional = append(additional, s) } } status.AdditionalTags = additional failures := 0 status.Status.Status = unversioned.StatusSuccess status.Images = make([]api.ImageImportStatus, len(repo.Tags)) for i, tag := range repo.Tags { status.Images[i].Tag = tag.Name if tag.Err != nil { failures++ status.Images[i].Status = imageImportStatus(tag.Err, "", "repository") continue } status.Images[i].Status.Status = unversioned.StatusSuccess copied := *tag.Image ref.Tag, ref.ID = tag.Name, copied.Name copied.DockerImageReference = ref.MostSpecific().Exact() status.Images[i].Image = &copied } if failures > 0 { status.Status.Status = unversioned.StatusFailure status.Status.Reason = unversioned.StatusReason("ImportFailed") switch failures { case 1: status.Status.Message = "one of the images from this repository failed to import" default: status.Status.Message = fmt.Sprintf("%d of the images from this repository failed to import", failures) } } } func applyErrorToRepository(repository *importRepository, err error) { repository.Err = err for i := range repository.Tags { repository.Tags[i].Err = err } for i := range repository.Digests { repository.Digests[i].Err = err } } // importRepositoryFromDocker loads the tags and images requested in the passed importRepository, obeying the // optional rate limiter. Errors are set onto the individual tags and digest objects. func importRepositoryFromDocker(ctx gocontext.Context, retriever RepositoryRetriever, repository *importRepository, limiter util.RateLimiter) { glog.V(5).Infof("importing remote Docker repository registry=%s repository=%s insecure=%t", repository.Registry, repository.Name, repository.Insecure) // retrieve the repository repo, err := retriever.Repository(ctx, repository.Registry, repository.Name, repository.Insecure) if err != nil { glog.V(5).Infof("unable to access repository %#v: %#v", repository, err) switch { case err == reference.ErrReferenceInvalidFormat: err = field.Invalid(field.NewPath("from", "name"), repository.Name, "the provided repository name is not valid") case isDockerError(err, v2.ErrorCodeNameUnknown): err = kapierrors.NewNotFound(api.Resource("dockerimage"), repository.Ref.Exact()) case isDockerError(err, errcode.ErrorCodeUnauthorized): err = kapierrors.NewUnauthorized(fmt.Sprintf("you may not have access to the Docker image %q", repository.Ref.Exact())) case strings.Contains(err.Error(), "tls: oversized record received with length") && !repository.Insecure: err = kapierrors.NewBadRequest("this repository is HTTP only and requires the insecure flag to import") case strings.HasSuffix(err.Error(), "no basic auth credentials"): err = kapierrors.NewUnauthorized(fmt.Sprintf("you may not have access to the Docker image %q and did not have credentials to the repository", repository.Ref.Exact())) case strings.HasSuffix(err.Error(), "does not support v2 API"): importRepositoryFromDockerV1(ctx, repository, limiter) return } applyErrorToRepository(repository, err) return } // get a manifest context s, err := repo.Manifests(ctx) if err != nil { glog.V(5).Infof("unable to access manifests for repository %#v: %#v", repository, err) switch { case isDockerError(err, v2.ErrorCodeNameUnknown): err = kapierrors.NewNotFound(api.Resource("dockerimage"), repository.Ref.Exact()) case isDockerError(err, errcode.ErrorCodeUnauthorized): err = kapierrors.NewUnauthorized(fmt.Sprintf("you may not have access to the Docker image %q", repository.Ref.Exact())) case strings.HasSuffix(err.Error(), "no basic auth credentials"): err = kapierrors.NewUnauthorized(fmt.Sprintf("you may not have access to the Docker image %q and did not have credentials to the repository", repository.Ref.Exact())) } applyErrorToRepository(repository, err) return } // if repository import is requested (MaximumTags), attempt to load the tags, sort them, and request the first N if count := repository.MaximumTags; count > 0 || count == -1 { tags, err := s.Tags() if err != nil { glog.V(5).Infof("unable to access tags for repository %#v: %#v", repository, err) switch { case isDockerError(err, v2.ErrorCodeNameUnknown): err = kapierrors.NewNotFound(api.Resource("dockerimage"), repository.Ref.Exact()) case isDockerError(err, errcode.ErrorCodeUnauthorized): err = kapierrors.NewUnauthorized(fmt.Sprintf("you may not have access to the Docker image %q", repository.Ref.Exact())) } repository.Err = err return } // some images on the Hub have empty tags - treat those as "latest" set := sets.NewString(tags...) if set.Has("") { set.Delete("") set.Insert(api.DefaultImageTag) } tags = set.List() // include only the top N tags in the result, put the rest in AdditionalTags api.PrioritizeTags(tags) for _, s := range tags { if count <= 0 && repository.MaximumTags != -1 { repository.AdditionalTags = append(repository.AdditionalTags, s) continue } count-- repository.Tags = append(repository.Tags, importTag{ Name: s, }) } } // load digests for i := range repository.Digests { importDigest := &repository.Digests[i] if importDigest.Err != nil || importDigest.Image != nil { continue } d, err := digest.ParseDigest(importDigest.Name) if err != nil { importDigest.Err = err continue } limiter.Accept() m, err := s.Get(d) if err != nil { glog.V(5).Infof("unable to access digest %q for repository %#v: %#v", d, repository, err) switch { case isDockerError(err, v2.ErrorCodeManifestUnknown): ref := repository.Ref ref.Tag, ref.ID = "", importDigest.Name err = kapierrors.NewNotFound(api.Resource("dockerimage"), ref.Exact()) case isDockerError(err, errcode.ErrorCodeUnauthorized): err = kapierrors.NewUnauthorized(fmt.Sprintf("you may not have access to the Docker image %q", repository.Ref.Exact())) case strings.HasSuffix(err.Error(), "no basic auth credentials"): err = kapierrors.NewUnauthorized(fmt.Sprintf("you may not have access to the Docker image %q", repository.Ref.Exact())) } importDigest.Err = err continue } importDigest.Image, err = schema1ToImage(m, d) if err != nil { importDigest.Err = err continue } if err := api.ImageWithMetadata(importDigest.Image); err != nil { importDigest.Err = err continue } } for i := range repository.Tags { importTag := &repository.Tags[i] if importTag.Err != nil || importTag.Image != nil { continue } limiter.Accept() m, err := s.GetByTag(importTag.Name) if err != nil { glog.V(5).Infof("unable to access tag %q for repository %#v: %#v", importTag.Name, repository, err) switch { case isDockerError(err, v2.ErrorCodeManifestUnknown): ref := repository.Ref ref.Tag = importTag.Name err = kapierrors.NewNotFound(api.Resource("dockerimage"), ref.Exact()) case isDockerError(err, errcode.ErrorCodeUnauthorized): err = kapierrors.NewUnauthorized(fmt.Sprintf("you may not have access to the Docker image %q", repository.Ref.Exact())) case strings.HasSuffix(err.Error(), "no basic auth credentials"): err = kapierrors.NewUnauthorized(fmt.Sprintf("you may not have access to the Docker image %q", repository.Ref.Exact())) } importTag.Err = err continue } importTag.Image, err = schema1ToImage(m, "") if err != nil { importTag.Err = err continue } if err := api.ImageWithMetadata(importTag.Image); err != nil { importTag.Err = err continue } } } func importRepositoryFromDockerV1(ctx gocontext.Context, repository *importRepository, limiter util.RateLimiter) { value := ctx.Value(ContextKeyV1RegistryClient) if value == nil { err := kapierrors.NewForbidden(api.Resource(""), "", fmt.Errorf("registry %q does not support the v2 Registry API", repository.Registry.Host)).(*kapierrors.StatusError) err.ErrStatus.Reason = "NotV2Registry" applyErrorToRepository(repository, err) return } client, ok := value.(dockerregistry.Client) if !ok { err := kapierrors.NewForbidden(api.Resource(""), "", fmt.Errorf("registry %q does not support the v2 Registry API", repository.Registry.Host)).(*kapierrors.StatusError) err.ErrStatus.Reason = "NotV2Registry" return } conn, err := client.Connect(repository.Registry.Host, repository.Insecure) if err != nil { applyErrorToRepository(repository, err) return } // if repository import is requested (MaximumTags), attempt to load the tags, sort them, and request the first N if count := repository.MaximumTags; count > 0 { tagMap, err := conn.ImageTags(repository.Ref.Namespace, repository.Ref.Name) if err != nil { repository.Err = err return } tags := make([]string, 0, len(tagMap)) for tag := range tagMap { tags = append(tags, tag) } // some images on the Hub have empty tags - treat those as "latest" set := sets.NewString(tags...) if set.Has("") { set.Delete("") set.Insert(api.DefaultImageTag) } tags = set.List() // include only the top N tags in the result, put the rest in AdditionalTags api.PrioritizeTags(tags) for _, s := range tags { if count <= 0 { repository.AdditionalTags = append(repository.AdditionalTags, s) continue } count-- repository.Tags = append(repository.Tags, importTag{ Name: s, }) } } // load digests for i := range repository.Digests { importDigest := &repository.Digests[i] if importDigest.Err != nil || importDigest.Image != nil { continue } limiter.Accept() image, err := conn.ImageByID(repository.Ref.Namespace, repository.Ref.Name, importDigest.Name) if err != nil { importDigest.Err = err continue } // we do not preserve manifests of legacy images importDigest.Image, err = schema0ToImage(image, importDigest.Name) if err != nil { importDigest.Err = err continue } } for i := range repository.Tags { importTag := &repository.Tags[i] if importTag.Err != nil || importTag.Image != nil { continue } limiter.Accept() image, err := conn.ImageByTag(repository.Ref.Namespace, repository.Ref.Name, importTag.Name) if err != nil { importTag.Err = err continue } // we do not preserve manifests of legacy images importTag.Image, err = schema0ToImage(image, "") if err != nil { importTag.Err = err continue } } } type importTag struct { Name string Image *api.Image Err error } type importDigest struct { Name string Image *api.Image Err error } type importRepository struct { Ref api.DockerImageReference Registry *url.URL Name string Insecure bool Tags []importTag Digests []importDigest MaximumTags int AdditionalTags []string Err error } // repositoryKey is the key used to cache information loaded from a remote Docker repository. type repositoryKey struct { // The URL of the server url url.URL // The name of the image repository (contains both namespace and path) name string } // manifestKey is a key for a map between a Docker image tag or image ID and a retrieved api.Image, used // to ensure we don't fetch the same image multiple times. type manifestKey struct { repositoryKey // The tag or ID of the image, not used within the same map value string } func imageImportStatus(err error, kind, position string) unversioned.Status { switch t := err.(type) { case kapierrors.APIStatus: return t.Status() case *field.Error: return kapierrors.NewInvalid(api.Kind(kind), position, field.ErrorList{t}).(kapierrors.APIStatus).Status() default: return kapierrors.NewInternalError(err).(kapierrors.APIStatus).Status() } } func setImageImportStatus(images *api.ImageStreamImport, i int, tag string, err error) { images.Status.Images[i].Tag = tag images.Status.Images[i].Status = imageImportStatus(err, "", "") } func invalidStatus(position string, errs ...*field.Error) unversioned.Status { return kapierrors.NewInvalid(api.Kind(""), position, errs).(kapierrors.APIStatus).Status() } // NewContext is capable of creating RepositoryRetrievers. func NewContext(transport, insecureTransport http.RoundTripper) Context { return Context{ Transport: transport, InsecureTransport: insecureTransport, Challenges: auth.NewSimpleChallengeManager(), } } type Context struct { Transport http.RoundTripper InsecureTransport http.RoundTripper Challenges auth.ChallengeManager } func (c Context) WithCredentials(credentials auth.CredentialStore) RepositoryRetriever { return &repositoryRetriever{ context: c, credentials: credentials, pings: make(map[url.URL]error), redirect: make(map[url.URL]*url.URL), } } type repositoryRetriever struct { context Context credentials auth.CredentialStore pings map[url.URL]error redirect map[url.URL]*url.URL } func (r *repositoryRetriever) Repository(ctx gocontext.Context, registry *url.URL, repoName string, insecure bool) (distribution.Repository, error) { t := r.context.Transport if insecure && r.context.InsecureTransport != nil { t = r.context.InsecureTransport } src := *registry // ping the registry to get challenge headers if err, ok := r.pings[src]; ok { if err != nil { return nil, err } if redirect, ok := r.redirect[src]; ok { src = *redirect } } else { redirect, err := r.ping(src, insecure, t) r.pings[src] = err if err != nil { return nil, err } if redirect != nil { r.redirect[src] = redirect src = *redirect } } rt := transport.NewTransport( t, // TODO: slightly smarter authorizer that retries unauthenticated requests // TODO: make multiple attempts if the first credential fails auth.NewAuthorizer( r.context.Challenges, auth.NewTokenHandler(t, r.credentials, repoName, "pull"), auth.NewBasicHandler(r.credentials), ), ) return registryclient.NewRepository(context.Context(ctx), repoName, src.String(), rt) } func (r *repositoryRetriever) ping(registry url.URL, insecure bool, transport http.RoundTripper) (*url.URL, error) { pingClient := &http.Client{ Transport: transport, Timeout: 15 * time.Second, } target := registry target.Path = path.Join(target.Path, "v2") + "/" req, err := http.NewRequest("GET", target.String(), nil) if err != nil { return nil, err } resp, err := pingClient.Do(req) if err != nil { if insecure && registry.Scheme == "https" { glog.V(5).Infof("Falling back to an HTTP check for an insecure registry %s: %v", registry, err) registry.Scheme = "http" _, nErr := r.ping(registry, true, transport) if nErr != nil { return nil, nErr } return ®istry, nil } return nil, err } defer resp.Body.Close() versions := auth.APIVersions(resp, "Docker-Distribution-API-Version") if len(versions) == 0 { glog.V(5).Infof("Registry responded to v2 Docker endpoint, but has no header for Docker Distribution %s: %d, %#v", req.URL, resp.StatusCode, resp.Header) return nil, &ErrNotV2Registry{Registry: registry.String()} } r.context.Challenges.AddResponse(resp) return nil, nil } func schema1ToImage(manifest *schema1.SignedManifest, d digest.Digest) (*api.Image, error) { if len(manifest.History) == 0 { return nil, fmt.Errorf("image has no v1Compatibility history and cannot be used") } dockerImage, err := unmarshalDockerImage([]byte(manifest.History[0].V1Compatibility)) if err != nil { return nil, err } if len(d) > 0 { dockerImage.ID = d.String() } else { if p, err := manifest.Payload(); err == nil { d, err := digest.FromBytes(p) if err != nil { return nil, fmt.Errorf("unable to create digest from image payload: %v", err) } dockerImage.ID = d.String() } else { d, err := digest.FromBytes(manifest.Raw) if err != nil { return nil, fmt.Errorf("unable to create digest from image bytes: %v", err) } dockerImage.ID = d.String() } } image := &api.Image{ ObjectMeta: kapi.ObjectMeta{ Name: dockerImage.ID, }, DockerImageMetadata: *dockerImage, DockerImageManifest: string(manifest.Raw), DockerImageMetadataVersion: "1.0", } return image, nil } func schema0ToImage(dockerImage *dockerregistry.Image, id string) (*api.Image, error) { var baseImage api.DockerImage if err := kapi.Scheme.Convert(&dockerImage.Image, &baseImage); err != nil { return nil, fmt.Errorf("could not convert image: %#v", err) } image := &api.Image{ ObjectMeta: kapi.ObjectMeta{ Name: dockerImage.ID, }, DockerImageMetadata: baseImage, DockerImageMetadataVersion: "1.0", } return image, nil } func unmarshalDockerImage(body []byte) (*api.DockerImage, error) { var image dockerpre012.DockerImage if err := json.Unmarshal(body, &image); err != nil { return nil, err } dockerImage := &api.DockerImage{} if err := kapi.Scheme.Convert(&image, dockerImage); err != nil { return nil, err } return dockerImage, nil } func isDockerError(err error, code errcode.ErrorCode) bool { switch t := err.(type) { case errcode.Errors: for _, err := range t { if isDockerError(err, code) { return true } } case errcode.ErrorCode: if code == t { return true } case errcode.Error: if t.ErrorCode() == code { return true } } return false }