package server import ( "io" "net/http" "sync" "time" "github.com/docker/distribution" "github.com/docker/distribution/context" "github.com/docker/distribution/digest" "k8s.io/kubernetes/pkg/api/errors" imageapi "github.com/openshift/origin/pkg/image/api" "github.com/openshift/origin/pkg/image/importer" ) // pullthroughBlobStore wraps a distribution.BlobStore and allows remote repositories to serve blobs from remote // repositories. type pullthroughBlobStore struct { distribution.BlobStore repo *repository digestToStore map[string]distribution.BlobStore pullFromInsecureRegistries bool mirror bool } var _ distribution.BlobStore = &pullthroughBlobStore{} // Stat makes a local check for the blob, then falls through to the other servers referenced by // the image stream and looks for those that have the layer. func (r *pullthroughBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { // check the local store for the blob desc, err := r.BlobStore.Stat(ctx, dgst) switch { case err == distribution.ErrBlobUnknown: // continue on to the code below and look up the blob in a remote store since it is not in // the local store case err != nil: context.GetLogger(ctx).Errorf("Failed to find blob %q: %#v", dgst.String(), err) fallthrough default: return desc, err } return r.remoteStat(ctx, dgst) } // remoteStat attempts to find requested blob in candidate remote repositories and if found, it updates // digestToRepository store. ErrBlobUnknown will be returned if not found. func (r *pullthroughBlobStore) remoteStat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { // look up the potential remote repositories that this blob could be part of (at this time, // we don't know which image in the image stream surfaced the content). is, err := r.repo.getImageStream() if err != nil { if errors.IsNotFound(err) || errors.IsForbidden(err) { return distribution.Descriptor{}, distribution.ErrBlobUnknown } context.GetLogger(ctx).Errorf("Error retrieving image stream for blob: %v", err) return distribution.Descriptor{}, err } r.pullFromInsecureRegistries = false if insecure, ok := is.Annotations[imageapi.InsecureRepositoryAnnotation]; ok { r.pullFromInsecureRegistries = insecure == "true" } var localRegistry string if local, err := imageapi.ParseDockerImageReference(is.Status.DockerImageRepository); err == nil { // TODO: normalize further? localRegistry = local.Registry } retriever := r.repo.importContext() cached := r.repo.cachedLayers.RepositoriesForDigest(dgst) // look at the first level of tagged repositories first search := identifyCandidateRepositories(is, localRegistry, true) if desc, err := r.findCandidateRepository(ctx, search, cached, dgst, retriever); err == nil { return desc, nil } // look at all other repositories tagged by the server secondary := identifyCandidateRepositories(is, localRegistry, false) for k := range search { delete(secondary, k) } if desc, err := r.findCandidateRepository(ctx, secondary, cached, dgst, retriever); err == nil { return desc, nil } return distribution.Descriptor{}, distribution.ErrBlobUnknown } // proxyStat attempts to locate the digest in the provided remote repository or returns an error. If the digest is found, // r.digestToStore saves the store. func (r *pullthroughBlobStore) proxyStat(ctx context.Context, retriever importer.RepositoryRetriever, ref imageapi.DockerImageReference, dgst digest.Digest) (distribution.Descriptor, error) { context.GetLogger(ctx).Infof("Trying to stat %q from %q", dgst, ref.Exact()) repo, err := retriever.Repository(ctx, ref.RegistryURL(), ref.RepositoryName(), r.pullFromInsecureRegistries) if err != nil { context.GetLogger(ctx).Errorf("Error getting remote repository for image %q: %v", ref.Exact(), err) return distribution.Descriptor{}, err } pullthroughBlobStore := repo.Blobs(ctx) desc, err := pullthroughBlobStore.Stat(ctx, dgst) if err != nil { if err != distribution.ErrBlobUnknown { context.GetLogger(ctx).Errorf("Error getting pullthroughBlobStore for image %q: %v", ref.Exact(), err) } return distribution.Descriptor{}, err } r.digestToStore[dgst.String()] = pullthroughBlobStore return desc, nil } // ServeBlob attempts to serve the requested digest onto w, using a remote proxy store if necessary. func (pbs *pullthroughBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, req *http.Request, dgst digest.Digest) error { store, ok := pbs.digestToStore[dgst.String()] if !ok { return pbs.BlobStore.ServeBlob(ctx, w, req, dgst) } // store the content locally if requested, but ensure only one instance at a time // is storing to avoid excessive local writes if pbs.mirror { mu.Lock() if _, ok = inflight[dgst]; ok { mu.Unlock() context.GetLogger(ctx).Infof("Serving %q while mirroring in background", dgst) _, err := pbs.copyContent(store, ctx, dgst, w, req) return err } inflight[dgst] = struct{}{} mu.Unlock() go func(dgst digest.Digest) { context.GetLogger(ctx).Infof("Start background mirroring of %q", dgst) if err := pbs.storeLocal(store, ctx, dgst); err != nil { context.GetLogger(ctx).Errorf("Error committing to storage: %s", err.Error()) } context.GetLogger(ctx).Infof("Completed mirroring of %q", dgst) }(dgst) } _, err := pbs.copyContent(store, ctx, dgst, w, req) return err } // Get attempts to fetch the requested blob by digest using a remote proxy store if necessary. func (r *pullthroughBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { store, ok := r.digestToStore[dgst.String()] if ok { return store.Get(ctx, dgst) } data, originalErr := r.BlobStore.Get(ctx, dgst) if originalErr == nil { return data, nil } desc, err := r.remoteStat(ctx, dgst) if err != nil { context.GetLogger(ctx).Errorf("failed to stat blob %q in remote repositories: %v", dgst.String(), err) return nil, originalErr } store, ok = r.digestToStore[desc.Digest.String()] if !ok { return nil, originalErr } return store.Get(ctx, desc.Digest) } // findCandidateRepository looks in search for a particular blob, referring to previously cached items func (r *pullthroughBlobStore) findCandidateRepository(ctx context.Context, search map[string]*imageapi.DockerImageReference, cachedLayers []string, dgst digest.Digest, retriever importer.RepositoryRetriever) (distribution.Descriptor, error) { // no possible remote locations to search, exit early if len(search) == 0 { return distribution.Descriptor{}, distribution.ErrBlobUnknown } // see if any of the previously located repositories containing this digest are in this // image stream for _, repo := range cachedLayers { ref, ok := search[repo] if !ok { continue } desc, err := r.proxyStat(ctx, retriever, *ref, dgst) if err != nil { delete(search, repo) continue } context.GetLogger(ctx).Infof("Found digest location from cache %q in %q", dgst, repo) return desc, nil } // search the remaining registries for this digest for repo, ref := range search { desc, err := r.proxyStat(ctx, retriever, *ref, dgst) if err != nil { continue } r.repo.cachedLayers.RememberDigest(dgst, r.repo.blobrepositorycachettl, repo) context.GetLogger(ctx).Infof("Found digest location by search %q in %q", dgst, repo) return desc, nil } return distribution.Descriptor{}, distribution.ErrBlobUnknown } // identifyCandidateRepositories returns a map of remote repositories referenced by this image stream. func identifyCandidateRepositories(is *imageapi.ImageStream, localRegistry string, primary bool) map[string]*imageapi.DockerImageReference { // identify the canonical location of referenced registries to search search := make(map[string]*imageapi.DockerImageReference) for _, tagEvent := range is.Status.Tags { var candidates []imageapi.TagEvent if primary { if len(tagEvent.Items) == 0 { continue } candidates = tagEvent.Items[:1] } else { if len(tagEvent.Items) <= 1 { continue } candidates = tagEvent.Items[1:] } for _, event := range candidates { ref, err := imageapi.ParseDockerImageReference(event.DockerImageReference) if err != nil { continue } // skip anything that matches the innate registry // TODO: there may be a better way to make this determination if len(localRegistry) != 0 && localRegistry == ref.Registry { continue } ref = ref.DockerClientDefaults() search[ref.AsRepository().Exact()] = &ref } } return search } // setResponseHeaders sets the appropriate content serving headers func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) { w.Header().Set("Content-Type", mediaType) w.Header().Set("Docker-Content-Digest", digest.String()) w.Header().Set("Etag", digest.String()) } // inflight tracks currently downloading blobs var inflight = make(map[digest.Digest]struct{}) // mu protects inflight var mu sync.Mutex // copyContent attempts to load and serve the provided blob. If req != nil and writer is an instance of http.ResponseWriter, // response headers will be set and range requests honored. func (pbs *pullthroughBlobStore) copyContent(store distribution.BlobStore, ctx context.Context, dgst digest.Digest, writer io.Writer, req *http.Request) (distribution.Descriptor, error) { desc, err := store.Stat(ctx, dgst) if err != nil { return distribution.Descriptor{}, err } remoteReader, err := store.Open(ctx, dgst) if err != nil { return distribution.Descriptor{}, err } rw, ok := writer.(http.ResponseWriter) if ok { setResponseHeaders(rw, desc.Size, desc.MediaType, dgst) // serve range requests if req != nil { http.ServeContent(rw, req, desc.Digest.String(), time.Time{}, remoteReader) return desc, nil } } if _, err = io.CopyN(writer, remoteReader, desc.Size); err != nil { return distribution.Descriptor{}, err } return desc, nil } // storeLocal retrieves the named blob from the provided store and writes it into the local store. func (pbs *pullthroughBlobStore) storeLocal(store distribution.BlobStore, ctx context.Context, dgst digest.Digest) error { defer func() { mu.Lock() delete(inflight, dgst) mu.Unlock() }() var desc distribution.Descriptor var err error var bw distribution.BlobWriter bw, err = pbs.BlobStore.Create(ctx) if err != nil { return err } desc, err = pbs.copyContent(store, ctx, dgst, bw, nil) if err != nil { return err } _, err = bw.Commit(ctx, desc) if err != nil { return err } return nil }