87992aef |
package server
import ( |
1afd938a |
"io" |
87992aef |
"net/http" |
1afd938a |
"sync" |
12457268 |
"time" |
87992aef |
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"k8s.io/kubernetes/pkg/api/errors"
imageapi "github.com/openshift/origin/pkg/image/api"
"github.com/openshift/origin/pkg/image/importer"
)
// pullthroughBlobStore wraps a distribution.BlobStore and allows remote repositories to serve blobs from remote
// repositories.
type pullthroughBlobStore struct {
distribution.BlobStore
|
8e1c3ef3 |
repo *repository
digestToStore map[string]distribution.BlobStore
pullFromInsecureRegistries bool |
1afd938a |
mirror bool |
87992aef |
}
var _ distribution.BlobStore = &pullthroughBlobStore{}
// Stat makes a local check for the blob, then falls through to the other servers referenced by
// the image stream and looks for those that have the layer.
func (r *pullthroughBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
// check the local store for the blob
desc, err := r.BlobStore.Stat(ctx, dgst)
switch {
case err == distribution.ErrBlobUnknown:
// continue on to the code below and look up the blob in a remote store since it is not in
// the local store
case err != nil: |
7fa0fa3d |
context.GetLogger(ctx).Errorf("Failed to find blob %q: %#v", dgst.String(), err) |
87992aef |
fallthrough
default:
return desc, err
}
|
550daa36 |
return r.remoteStat(ctx, dgst)
}
// remoteStat attempts to find requested blob in candidate remote repositories and if found, it updates
// digestToRepository store. ErrBlobUnknown will be returned if not found.
func (r *pullthroughBlobStore) remoteStat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { |
87992aef |
// look up the potential remote repositories that this blob could be part of (at this time,
// we don't know which image in the image stream surfaced the content).
is, err := r.repo.getImageStream()
if err != nil {
if errors.IsNotFound(err) || errors.IsForbidden(err) {
return distribution.Descriptor{}, distribution.ErrBlobUnknown
} |
7fa0fa3d |
context.GetLogger(ctx).Errorf("Error retrieving image stream for blob: %v", err) |
87992aef |
return distribution.Descriptor{}, err
}
|
0b3b478f |
r.pullFromInsecureRegistries = false
if insecure, ok := is.Annotations[imageapi.InsecureRepositoryAnnotation]; ok {
r.pullFromInsecureRegistries = insecure == "true"
}
|
87992aef |
var localRegistry string
if local, err := imageapi.ParseDockerImageReference(is.Status.DockerImageRepository); err == nil {
// TODO: normalize further?
localRegistry = local.Registry
}
retriever := r.repo.importContext()
cached := r.repo.cachedLayers.RepositoriesForDigest(dgst)
// look at the first level of tagged repositories first
search := identifyCandidateRepositories(is, localRegistry, true)
if desc, err := r.findCandidateRepository(ctx, search, cached, dgst, retriever); err == nil {
return desc, nil
}
// look at all other repositories tagged by the server
secondary := identifyCandidateRepositories(is, localRegistry, false)
for k := range search {
delete(secondary, k)
}
if desc, err := r.findCandidateRepository(ctx, secondary, cached, dgst, retriever); err == nil {
return desc, nil
}
return distribution.Descriptor{}, distribution.ErrBlobUnknown
}
// proxyStat attempts to locate the digest in the provided remote repository or returns an error. If the digest is found,
// r.digestToStore saves the store.
func (r *pullthroughBlobStore) proxyStat(ctx context.Context, retriever importer.RepositoryRetriever, ref imageapi.DockerImageReference, dgst digest.Digest) (distribution.Descriptor, error) { |
7fa0fa3d |
context.GetLogger(ctx).Infof("Trying to stat %q from %q", dgst, ref.Exact()) |
8e1c3ef3 |
repo, err := retriever.Repository(ctx, ref.RegistryURL(), ref.RepositoryName(), r.pullFromInsecureRegistries) |
87992aef |
if err != nil { |
7fa0fa3d |
context.GetLogger(ctx).Errorf("Error getting remote repository for image %q: %v", ref.Exact(), err) |
87992aef |
return distribution.Descriptor{}, err
}
pullthroughBlobStore := repo.Blobs(ctx) |
dc591ac8 |
desc, err := pullthroughBlobStore.Stat(ctx, dgst) |
87992aef |
if err != nil {
if err != distribution.ErrBlobUnknown { |
7fa0fa3d |
context.GetLogger(ctx).Errorf("Error getting pullthroughBlobStore for image %q: %v", ref.Exact(), err) |
87992aef |
}
return distribution.Descriptor{}, err
}
r.digestToStore[dgst.String()] = pullthroughBlobStore
return desc, nil
}
// ServeBlob attempts to serve the requested digest onto w, using a remote proxy store if necessary. |
1afd938a |
func (pbs *pullthroughBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, req *http.Request, dgst digest.Digest) error {
store, ok := pbs.digestToStore[dgst.String()] |
87992aef |
if !ok { |
1afd938a |
return pbs.BlobStore.ServeBlob(ctx, w, req, dgst) |
87992aef |
}
|
1afd938a |
// store the content locally if requested, but ensure only one instance at a time
// is storing to avoid excessive local writes
if pbs.mirror {
mu.Lock()
if _, ok = inflight[dgst]; ok {
mu.Unlock()
context.GetLogger(ctx).Infof("Serving %q while mirroring in background", dgst)
_, err := pbs.copyContent(store, ctx, dgst, w, req)
return err
}
inflight[dgst] = struct{}{}
mu.Unlock() |
87992aef |
|
1afd938a |
go func(dgst digest.Digest) {
context.GetLogger(ctx).Infof("Start background mirroring of %q", dgst)
if err := pbs.storeLocal(store, ctx, dgst); err != nil {
context.GetLogger(ctx).Errorf("Error committing to storage: %s", err.Error())
}
context.GetLogger(ctx).Infof("Completed mirroring of %q", dgst)
}(dgst) |
87992aef |
}
|
1afd938a |
_, err := pbs.copyContent(store, ctx, dgst, w, req)
return err |
87992aef |
}
|
550daa36 |
// Get attempts to fetch the requested blob by digest using a remote proxy store if necessary.
func (r *pullthroughBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
store, ok := r.digestToStore[dgst.String()]
if ok {
return store.Get(ctx, dgst)
}
data, originalErr := r.BlobStore.Get(ctx, dgst)
if originalErr == nil {
return data, nil
}
desc, err := r.remoteStat(ctx, dgst)
if err != nil {
context.GetLogger(ctx).Errorf("failed to stat blob %q in remote repositories: %v", dgst.String(), err)
return nil, originalErr
}
store, ok = r.digestToStore[desc.Digest.String()]
if !ok {
return nil, originalErr
}
return store.Get(ctx, desc.Digest)
}
|
87992aef |
// findCandidateRepository looks in search for a particular blob, referring to previously cached items
func (r *pullthroughBlobStore) findCandidateRepository(ctx context.Context, search map[string]*imageapi.DockerImageReference, cachedLayers []string, dgst digest.Digest, retriever importer.RepositoryRetriever) (distribution.Descriptor, error) {
// no possible remote locations to search, exit early
if len(search) == 0 {
return distribution.Descriptor{}, distribution.ErrBlobUnknown
}
// see if any of the previously located repositories containing this digest are in this
// image stream
for _, repo := range cachedLayers {
ref, ok := search[repo]
if !ok {
continue
}
desc, err := r.proxyStat(ctx, retriever, *ref, dgst)
if err != nil {
delete(search, repo)
continue
} |
7fa0fa3d |
context.GetLogger(ctx).Infof("Found digest location from cache %q in %q", dgst, repo) |
87992aef |
return desc, nil
}
// search the remaining registries for this digest
for repo, ref := range search {
desc, err := r.proxyStat(ctx, retriever, *ref, dgst)
if err != nil {
continue
} |
ba185fe8 |
r.repo.cachedLayers.RememberDigest(dgst, r.repo.blobrepositorycachettl, repo) |
7fa0fa3d |
context.GetLogger(ctx).Infof("Found digest location by search %q in %q", dgst, repo) |
87992aef |
return desc, nil
}
return distribution.Descriptor{}, distribution.ErrBlobUnknown
}
// identifyCandidateRepositories returns a map of remote repositories referenced by this image stream.
func identifyCandidateRepositories(is *imageapi.ImageStream, localRegistry string, primary bool) map[string]*imageapi.DockerImageReference {
// identify the canonical location of referenced registries to search
search := make(map[string]*imageapi.DockerImageReference)
for _, tagEvent := range is.Status.Tags {
var candidates []imageapi.TagEvent
if primary {
if len(tagEvent.Items) == 0 {
continue
}
candidates = tagEvent.Items[:1]
} else {
if len(tagEvent.Items) <= 1 {
continue
}
candidates = tagEvent.Items[1:]
}
for _, event := range candidates {
ref, err := imageapi.ParseDockerImageReference(event.DockerImageReference)
if err != nil {
continue
}
// skip anything that matches the innate registry
// TODO: there may be a better way to make this determination
if len(localRegistry) != 0 && localRegistry == ref.Registry {
continue
}
ref = ref.DockerClientDefaults()
search[ref.AsRepository().Exact()] = &ref
}
}
return search
}
// setResponseHeaders sets the appropriate content serving headers
func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) {
w.Header().Set("Content-Type", mediaType)
w.Header().Set("Docker-Content-Digest", digest.String())
w.Header().Set("Etag", digest.String())
} |
1afd938a |
// inflight tracks currently downloading blobs
var inflight = make(map[digest.Digest]struct{})
// mu protects inflight
var mu sync.Mutex
// copyContent attempts to load and serve the provided blob. If req != nil and writer is an instance of http.ResponseWriter,
// response headers will be set and range requests honored.
func (pbs *pullthroughBlobStore) copyContent(store distribution.BlobStore, ctx context.Context, dgst digest.Digest, writer io.Writer, req *http.Request) (distribution.Descriptor, error) {
desc, err := store.Stat(ctx, dgst)
if err != nil {
return distribution.Descriptor{}, err
}
remoteReader, err := store.Open(ctx, dgst)
if err != nil {
return distribution.Descriptor{}, err
}
rw, ok := writer.(http.ResponseWriter)
if ok {
setResponseHeaders(rw, desc.Size, desc.MediaType, dgst)
// serve range requests
if req != nil {
http.ServeContent(rw, req, desc.Digest.String(), time.Time{}, remoteReader)
return desc, nil
}
}
if _, err = io.CopyN(writer, remoteReader, desc.Size); err != nil {
return distribution.Descriptor{}, err
}
return desc, nil
}
// storeLocal retrieves the named blob from the provided store and writes it into the local store.
func (pbs *pullthroughBlobStore) storeLocal(store distribution.BlobStore, ctx context.Context, dgst digest.Digest) error {
defer func() {
mu.Lock()
delete(inflight, dgst)
mu.Unlock()
}()
var desc distribution.Descriptor
var err error
var bw distribution.BlobWriter
bw, err = pbs.BlobStore.Create(ctx)
if err != nil {
return err
}
desc, err = pbs.copyContent(store, ctx, dgst, bw, nil)
if err != nil {
return err
}
_, err = bw.Commit(ctx, desc)
if err != nil {
return err
}
return nil
} |