694df3ff |
package distribution
import ( |
c8d277d2 |
"errors" |
694df3ff |
"fmt"
"io" |
adee2845 |
"runtime" |
0928f3f2 |
"sort"
"strings" |
572ce802 |
"sync" |
694df3ff |
|
0928f3f2 |
"golang.org/x/net/context"
|
694df3ff |
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema1" |
f33fa1b8 |
"github.com/docker/distribution/manifest/schema2" |
3a127939 |
"github.com/docker/distribution/reference" |
f33fa1b8 |
"github.com/docker/distribution/registry/client" |
13222160 |
apitypes "github.com/docker/docker/api/types" |
694df3ff |
"github.com/docker/docker/distribution/metadata" |
572ce802 |
"github.com/docker/docker/distribution/xfer" |
694df3ff |
"github.com/docker/docker/layer" |
572ce802 |
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/progress" |
694df3ff |
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/registry" |
709bf8b7 |
digest "github.com/opencontainers/go-digest" |
1009e6a4 |
"github.com/sirupsen/logrus" |
694df3ff |
)
|
81f7b1f1 |
// Size thresholds used to classify a layer as small, middle sized or big when
// deciding how many mount attempts and existence checks to perform.
const (
	// smallLayerMaximumSize is the largest size of a "small" layer (100KB).
	smallLayerMaximumSize = 100 << 10
	// middleLayerMaximumSize is the largest size of a "middle sized" layer (10MB).
	middleLayerMaximumSize = 10 << 20
)
type v2Pusher struct { |
d3bd14a4 |
v2MetadataService metadata.V2MetadataService |
63099477 |
ref reference.Named
endpoint registry.APIEndpoint
repoInfo *registry.RepositoryInfo
config *ImagePushConfig
repo distribution.Repository |
694df3ff |
|
5c99eebe |
// pushState is state built by the Upload functions. |
f33fa1b8 |
pushState pushState |
572ce802 |
}
|
f33fa1b8 |
type pushState struct { |
572ce802 |
sync.Mutex |
f33fa1b8 |
// remoteLayers is the set of layers known to exist on the remote side.
// This avoids redundant queries when pushing multiple tags that
// involve the same layers. It is also used to fill in digest and size
// information when building the manifest.
remoteLayers map[layer.DiffID]distribution.Descriptor
// confirmedV2 is set to true if we confirm we're talking to a v2
// registry. This is used to limit fallbacks to the v1 protocol.
confirmedV2 bool |
694df3ff |
}
|
a57478d6 |
func (p *v2Pusher) Push(ctx context.Context) (err error) { |
f33fa1b8 |
p.pushState.remoteLayers = make(map[layer.DiffID]distribution.Descriptor)
p.repo, p.pushState.confirmedV2, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull") |
694df3ff |
if err != nil {
logrus.Debugf("Error getting v2 registry: %v", err) |
5e8af46f |
return err |
694df3ff |
}
|
a57478d6 |
if err = p.pushV2Repository(ctx); err != nil { |
305801f5 |
if continueOnError(err, p.endpoint.Mirror) { |
5e8af46f |
return fallbackError{
err: err,
confirmedV2: p.pushState.confirmedV2,
transportOK: true,
} |
a57478d6 |
}
}
return err
}
func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) { |
c8d277d2 |
if namedTagged, isNamedTagged := p.ref.(reference.NamedTagged); isNamedTagged { |
2655954c |
imageID, err := p.config.ReferenceStore.Get(p.ref) |
694df3ff |
if err != nil { |
3a127939 |
return fmt.Errorf("tag does not exist: %s", reference.FamiliarString(p.ref)) |
694df3ff |
}
|
c8d277d2 |
return p.pushV2Tag(ctx, namedTagged, imageID) |
694df3ff |
} |
c8d277d2 |
if !reference.IsNameOnly(p.ref) {
return errors.New("cannot push a digest reference") |
694df3ff |
}
|
c8d277d2 |
// Pull all tags
pushed := 0
for _, association := range p.config.ReferenceStore.ReferencesByName(p.ref) {
if namedTagged, isNamedTagged := association.Ref.(reference.NamedTagged); isNamedTagged {
pushed++ |
80522398 |
if err := p.pushV2Tag(ctx, namedTagged, association.ID); err != nil { |
c8d277d2 |
return err
} |
694df3ff |
}
}
|
c8d277d2 |
if pushed == 0 { |
3a127939 |
return fmt.Errorf("no tags to push for %s", reference.FamiliarName(p.repoInfo.Name)) |
c8d277d2 |
}
|
a57478d6 |
return nil |
694df3ff |
}
|
80522398 |
func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error { |
3a127939 |
logrus.Debugf("Pushing repository: %s", reference.FamiliarString(ref)) |
694df3ff |
|
3c7676a0 |
imgConfig, err := p.config.ImageStore.Get(id) |
694df3ff |
if err != nil { |
3a127939 |
return fmt.Errorf("could not find image from tag %s: %v", reference.FamiliarString(ref), err) |
694df3ff |
}
|
afd305c4 |
rootfs, os, err := p.config.ImageStore.RootFSAndOSFromConfig(imgConfig) |
3c7676a0 |
if err != nil { |
3a127939 |
return fmt.Errorf("unable to get rootfs for image %s: %s", reference.FamiliarString(ref), err) |
3c7676a0 |
} |
694df3ff |
|
afd305c4 |
l, err := p.config.LayerStores[os].Get(rootfs.ChainID()) |
3c7676a0 |
if err != nil {
return fmt.Errorf("failed to get top layer from image: %v", err) |
694df3ff |
} |
3c7676a0 |
defer l.Release() |
694df3ff |
|
0928f3f2 |
hmacKey, err := metadata.ComputeV2MetadataHMACKey(p.config.AuthConfig)
if err != nil {
return fmt.Errorf("failed to compute hmac key of auth config: %v", err)
}
|
572ce802 |
var descriptors []xfer.UploadDescriptor |
694df3ff |
|
a57478d6 |
descriptorTemplate := v2PushDescriptor{ |
63099477 |
v2MetadataService: p.v2MetadataService, |
0928f3f2 |
hmacKey: hmacKey, |
3a127939 |
repoInfo: p.repoInfo.Name, |
1333ef3c |
ref: p.ref, |
67fdf574 |
endpoint: p.endpoint, |
63099477 |
repo: p.repo,
pushState: &p.pushState, |
694df3ff |
}
|
572ce802 |
// Loop bounds condition is to avoid pushing the base layer on Windows. |
80b2c326 |
for range rootfs.DiffIDs { |
a57478d6 |
descriptor := descriptorTemplate
descriptor.layer = l |
81f7b1f1 |
descriptor.checkedDigests = make(map[digest.Digest]struct{}) |
a57478d6 |
descriptors = append(descriptors, &descriptor) |
694df3ff |
l = l.Parent()
}
|
f33fa1b8 |
if err := p.config.UploadManager.Upload(ctx, descriptors, p.config.ProgressOutput); err != nil { |
572ce802 |
return err
}
|
c8d277d2 |
// Try schema2 first |
3c7676a0 |
builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), p.config.ConfigMediaType, imgConfig) |
c8d277d2 |
manifest, err := manifestFromBuilder(ctx, builder, descriptors) |
694df3ff |
if err != nil {
return err
}
|
c8d277d2 |
manSvc, err := p.repo.Manifests(ctx) |
694df3ff |
if err != nil {
return err
} |
c8d277d2 |
|
c18d03a7 |
putOptions := []distribution.ManifestServiceOption{distribution.WithTag(ref.Tag())} |
c8d277d2 |
if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil { |
3c7676a0 |
if runtime.GOOS == "windows" || p.config.TrustKey == nil || p.config.RequireSchema2 { |
adee2845 |
logrus.Warnf("failed to upload schema2 manifest: %v", err)
return err
}
|
c8d277d2 |
logrus.Warnf("failed to upload schema2 manifest: %v - falling back to schema1", err)
|
3a127939 |
manifestRef, err := reference.WithTag(p.repo.Named(), ref.Tag()) |
4d437a29 |
if err != nil {
return err
} |
3c7676a0 |
builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), p.config.TrustKey, manifestRef, imgConfig) |
c8d277d2 |
manifest, err = manifestFromBuilder(ctx, builder, descriptors)
if err != nil {
return err
}
if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
return err |
694df3ff |
}
}
|
c8d277d2 |
var canonicalManifest []byte
switch v := manifest.(type) {
case *schema1.SignedManifest:
canonicalManifest = v.Canonical
case *schema2.DeserializedManifest:
_, canonicalManifest, err = v.Payload()
if err != nil {
return err
} |
694df3ff |
} |
f33fa1b8 |
|
c8d277d2 |
manifestDigest := digest.FromBytes(canonicalManifest)
progress.Messagef(p.config.ProgressOutput, "", "%s: digest: %s size: %d", ref.Tag(), manifestDigest, len(canonicalManifest)) |
33984f25 |
|
80522398 |
if err := addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id); err != nil { |
33984f25 |
return err
}
|
c8d277d2 |
// Signal digest to the trust client so it can sign the
// push, if appropriate. |
13222160 |
progress.Aux(p.config.ProgressOutput, apitypes.PushResult{Tag: ref.Tag(), Digest: manifestDigest.String(), Size: len(canonicalManifest)}) |
c8d277d2 |
return nil
}
func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuilder, descriptors []xfer.UploadDescriptor) (distribution.Manifest, error) {
// descriptors is in reverse order; iterate backwards to get references
// appended in the right order.
for i := len(descriptors) - 1; i >= 0; i-- {
if err := builder.AppendReference(descriptors[i].(*v2PushDescriptor)); err != nil {
return nil, err
} |
f33fa1b8 |
} |
c8d277d2 |
return builder.Build(ctx) |
694df3ff |
}
|
572ce802 |
type v2PushDescriptor struct { |
3c7676a0 |
layer PushLayer |
d3bd14a4 |
v2MetadataService metadata.V2MetadataService |
0928f3f2 |
hmacKey []byte |
63099477 |
repoInfo reference.Named |
1333ef3c |
ref reference.Named |
67fdf574 |
endpoint registry.APIEndpoint |
63099477 |
repo distribution.Repository
pushState *pushState |
5c99eebe |
remoteDescriptor distribution.Descriptor |
81f7b1f1 |
// a set of digests whose presence has been checked in a target repository
checkedDigests map[digest.Digest]struct{} |
572ce802 |
}
func (pd *v2PushDescriptor) Key() string { |
3a127939 |
return "v2push:" + pd.ref.Name() + " " + pd.layer.DiffID().String() |
572ce802 |
}
// ID returns the truncated form of the layer's DiffID; it is the identifier
// used for this layer's progress updates.
func (pd *v2PushDescriptor) ID() string {
	fullID := pd.layer.DiffID().String()
	return stringid.TruncateID(fullID)
}
// DiffID returns the DiffID of the layer this descriptor uploads.
func (pd *v2PushDescriptor) DiffID() layer.DiffID {
	id := pd.layer.DiffID()
	return id
}
|
5c99eebe |
func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) { |
67fdf574 |
// Skip foreign layers unless this registry allows nondistributable artifacts.
if !pd.endpoint.AllowNondistributableArtifacts {
if fs, ok := pd.layer.(distribution.Describable); ok {
if d := fs.Descriptor(); len(d.URLs) > 0 {
progress.Update(progressOutput, pd.ID(), "Skipped foreign layer")
return d, nil
} |
05bd0435 |
}
}
|
572ce802 |
diffID := pd.DiffID()
|
f33fa1b8 |
pd.pushState.Lock() |
5c99eebe |
if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok { |
f33fa1b8 |
// it is already known that the push is not needed and
// therefore doing a stat is unnecessary
pd.pushState.Unlock()
progress.Update(progressOutput, pd.ID(), "Layer already exists") |
5c99eebe |
return descriptor, nil |
f33fa1b8 |
}
pd.pushState.Unlock() |
694df3ff |
|
81f7b1f1 |
maxMountAttempts, maxExistenceChecks, checkOtherRepositories := getMaxMountAndExistenceCheckAttempts(pd.layer)
|
63099477 |
// Do we have any metadata associated with this layer's DiffID?
v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID) |
694df3ff |
if err == nil { |
cdc46abd |
// check for blob existence in the target repository
descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, true, 1, v2Metadata) |
81f7b1f1 |
if exists || err != nil {
return descriptor, err |
694df3ff |
}
}
// if digest was empty or not saved, or if blob does not exist on the remote repository,
// then push the blob. |
572ce802 |
bs := pd.repo.Blobs(ctx)
|
1d3480f9 |
var layerUpload distribution.BlobWriter
|
0928f3f2 |
// Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload |
81f7b1f1 |
candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, v2Metadata) |
0928f3f2 |
for _, mountCandidate := range candidates {
logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountCandidate.Digest, mountCandidate.SourceRepository)
createOpts := []distribution.BlobCreateOption{}
if len(mountCandidate.SourceRepository) > 0 { |
3a127939 |
namedRef, err := reference.ParseNormalizedNamed(mountCandidate.SourceRepository) |
0928f3f2 |
if err != nil { |
3a127939 |
logrus.Errorf("failed to parse source repository reference %v: %v", reference.FamiliarString(namedRef), err) |
0928f3f2 |
pd.v2MetadataService.Remove(mountCandidate)
continue
} |
7289c721 |
|
3a127939 |
// Candidates are always under same domain, create remote reference
// with only path to set mount from with
remoteRef, err := reference.WithName(reference.Path(namedRef)) |
0928f3f2 |
if err != nil { |
3a127939 |
logrus.Errorf("failed to make remote reference out of %q: %v", reference.Path(namedRef), err) |
0928f3f2 |
continue
} |
7289c721 |
|
3a127939 |
canonicalRef, err := reference.WithDigest(reference.TrimNamed(remoteRef), mountCandidate.Digest) |
0928f3f2 |
if err != nil {
logrus.Errorf("failed to make canonical reference: %v", err)
continue
} |
7289c721 |
|
0928f3f2 |
createOpts = append(createOpts, client.WithMountFrom(canonicalRef)) |
7289c721 |
} |
63099477 |
|
0928f3f2 |
// send the layer
lu, err := bs.Create(ctx, createOpts...) |
1d3480f9 |
switch err := err.(type) { |
0928f3f2 |
case nil:
// noop |
1d3480f9 |
case distribution.ErrBlobMounted:
progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name()) |
63099477 |
|
1d3480f9 |
err.Descriptor.MediaType = schema2.MediaTypeLayer |
06e9a056 |
|
1d3480f9 |
pd.pushState.Lock()
pd.pushState.confirmedV2 = true
pd.pushState.remoteLayers[diffID] = err.Descriptor
pd.pushState.Unlock() |
63099477 |
|
1d3480f9 |
// Cache mapping from this layer's DiffID to the blobsum |
0928f3f2 |
if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
Digest: err.Descriptor.Digest, |
3a127939 |
SourceRepository: pd.repoInfo.Name(), |
0928f3f2 |
}); err != nil { |
5c99eebe |
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err} |
1d3480f9 |
} |
5c99eebe |
return err.Descriptor, nil |
1d3480f9 |
default: |
0928f3f2 |
logrus.Infof("failed to mount layer %s (%s) from %s: %v", diffID, mountCandidate.Digest, mountCandidate.SourceRepository, err)
}
if len(mountCandidate.SourceRepository) > 0 &&
(metadata.CheckV2MetadataHMAC(&mountCandidate, pd.hmacKey) ||
len(mountCandidate.HMAC) == 0) {
cause := "blob mount failure"
if err != nil {
cause = fmt.Sprintf("an error: %v", err.Error())
}
logrus.Debugf("removing association between layer %s and %s due to %s", mountCandidate.Digest, mountCandidate.SourceRepository, cause)
pd.v2MetadataService.Remove(mountCandidate)
}
|
c6dd51c3 |
if lu != nil {
// cancel previous upload
cancelLayerUpload(ctx, mountCandidate.Digest, layerUpload)
layerUpload = lu |
63099477 |
}
}
|
81f7b1f1 |
if maxExistenceChecks-len(pd.checkedDigests) > 0 {
// do additional layer existence checks with other known digests if any
descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), v2Metadata)
if exists || err != nil {
return descriptor, err |
63099477 |
}
}
|
81f7b1f1 |
logrus.Debugf("Pushing layer: %s", diffID) |
1d3480f9 |
if layerUpload == nil {
layerUpload, err = bs.Create(ctx)
if err != nil { |
5c99eebe |
return distribution.Descriptor{}, retryOnError(err) |
1d3480f9 |
} |
572ce802 |
}
defer layerUpload.Close()
|
0928f3f2 |
// upload the blob |
f7f101d5 |
return pd.uploadUsingSession(ctx, progressOutput, diffID, layerUpload) |
0928f3f2 |
}
// SetRemoteDescriptor stores descriptor so that a later call to Descriptor
// returns it.
func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
	pd.remoteDescriptor = descriptor
}
// Descriptor returns the descriptor last stored with SetRemoteDescriptor, or
// the zero value if none was stored.
func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
	stored := pd.remoteDescriptor
	return stored
}
func (pd *v2PushDescriptor) uploadUsingSession(
ctx context.Context,
progressOutput progress.Output,
diffID layer.DiffID,
layerUpload distribution.BlobWriter,
) (distribution.Descriptor, error) { |
3c7676a0 |
var reader io.ReadCloser
contentReader, err := pd.layer.Open() |
595901bd |
if err != nil {
return distribution.Descriptor{}, retryOnError(err)
}
|
3c7676a0 |
size, _ := pd.layer.Size()
reader = progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, contentReader), progressOutput, size, pd.ID(), "Pushing")
switch m := pd.layer.MediaType(); m {
case schema2.MediaTypeUncompressedLayer:
compressedReader, compressionDone := compress(reader)
defer func(closer io.Closer) {
closer.Close()
<-compressionDone
}(reader)
reader = compressedReader
case schema2.MediaTypeLayer:
default: |
e273445d |
reader.Close() |
3c7676a0 |
return distribution.Descriptor{}, fmt.Errorf("unsupported layer media type %s", m)
} |
572ce802 |
|
7a855799 |
digester := digest.Canonical.Digester() |
3c7676a0 |
tee := io.TeeReader(reader, digester.Hash()) |
572ce802 |
nn, err := layerUpload.ReadFrom(tee) |
3c7676a0 |
reader.Close() |
572ce802 |
if err != nil { |
5c99eebe |
return distribution.Descriptor{}, retryOnError(err) |
572ce802 |
}
pushDigest := digester.Digest()
if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil { |
5c99eebe |
return distribution.Descriptor{}, retryOnError(err) |
572ce802 |
}
logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
progress.Update(progressOutput, pd.ID(), "Pushed")
|
694df3ff |
// Cache mapping from this layer's DiffID to the blobsum |
0928f3f2 |
if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
Digest: pushDigest, |
3a127939 |
SourceRepository: pd.repoInfo.Name(), |
0928f3f2 |
}); err != nil { |
5c99eebe |
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err} |
694df3ff |
}
|
81f7b1f1 |
desc := distribution.Descriptor{ |
f33fa1b8 |
Digest: pushDigest,
MediaType: schema2.MediaTypeLayer,
Size: nn,
}
|
81f7b1f1 |
pd.pushState.Lock()
// If Commit succeeded, that's an indication that the remote registry speaks the v2 protocol.
pd.pushState.confirmedV2 = true
pd.pushState.remoteLayers[diffID] = desc |
f33fa1b8 |
pd.pushState.Unlock()
|
81f7b1f1 |
return desc, nil |
694df3ff |
}
|
81f7b1f1 |
// layerAlreadyExists checks if the registry already knows about any of the metadata passed in the "metadata"
// slice. If it finds one that the registry knows about, it returns the known digest and "true". If
// "checkOtherRepositories" is true, stat will be performed also with digests mapped to any other repository
// (not just the target one).
func (pd *v2PushDescriptor) layerAlreadyExists(
ctx context.Context,
progressOutput progress.Output,
diffID layer.DiffID,
checkOtherRepositories bool,
maxExistenceCheckAttempts int,
v2Metadata []metadata.V2Metadata,
) (desc distribution.Descriptor, exists bool, err error) {
// filter the metadata
candidates := []metadata.V2Metadata{}
for _, meta := range v2Metadata { |
3a127939 |
if len(meta.SourceRepository) > 0 && !checkOtherRepositories && meta.SourceRepository != pd.repoInfo.Name() { |
81f7b1f1 |
continue
}
candidates = append(candidates, meta)
}
// sort the candidates by similarity
sortV2MetadataByLikenessAndAge(pd.repoInfo, pd.hmacKey, candidates)
digestToMetadata := make(map[digest.Digest]*metadata.V2Metadata)
// an array of unique blob digests ordered from the best mount candidates to worst
layerDigests := []digest.Digest{}
for i := 0; i < len(candidates); i++ {
if len(layerDigests) >= maxExistenceCheckAttempts {
break
}
meta := &candidates[i]
if _, exists := digestToMetadata[meta.Digest]; exists {
// keep reference just to the first mapping (the best mount candidate) |
7289c721 |
continue
} |
81f7b1f1 |
if _, exists := pd.checkedDigests[meta.Digest]; exists {
// existence of this digest has already been tested |
7289c721 |
continue
} |
81f7b1f1 |
digestToMetadata[meta.Digest] = meta
layerDigests = append(layerDigests, meta.Digest)
}
|
e0702e9f |
attempts: |
81f7b1f1 |
for _, dgst := range layerDigests {
meta := digestToMetadata[dgst] |
3a127939 |
logrus.Debugf("Checking for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.Name()) |
81f7b1f1 |
desc, err = pd.repo.Blobs(ctx).Stat(ctx, dgst)
pd.checkedDigests[meta.Digest] = struct{}{} |
694df3ff |
switch err {
case nil: |
3a127939 |
if m, ok := digestToMetadata[desc.Digest]; !ok || m.SourceRepository != pd.repoInfo.Name() || !metadata.CheckV2MetadataHMAC(m, pd.hmacKey) { |
81f7b1f1 |
// cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
Digest: desc.Digest, |
3a127939 |
SourceRepository: pd.repoInfo.Name(), |
81f7b1f1 |
}); err != nil {
return distribution.Descriptor{}, false, xfer.DoNotRetry{Err: err}
}
}
desc.MediaType = schema2.MediaTypeLayer
exists = true |
e0702e9f |
break attempts |
694df3ff |
case distribution.ErrBlobUnknown: |
3a127939 |
if meta.SourceRepository == pd.repoInfo.Name() { |
81f7b1f1 |
// remove the mapping to the target repository
pd.v2MetadataService.Remove(*meta)
} |
694df3ff |
default: |
3a127939 |
logrus.WithError(err).Debugf("Failed to check for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.Name()) |
694df3ff |
}
} |
81f7b1f1 |
if exists {
progress.Update(progressOutput, pd.ID(), "Layer already exists")
pd.pushState.Lock()
pd.pushState.remoteLayers[diffID] = desc
pd.pushState.Unlock()
}
return desc, exists, nil
}
// getMaxMountAndExistenceCheckAttempts returns a maximum number of cross repository mount attempts from
// source repositories of target registry, maximum number of layer existence checks performed on the target
// repository and whether the check shall be done also with digests mapped to different repositories. The
// decision is based on layer size. The smaller the layer, the fewer attempts shall be made because the cost
// of upload does not outweigh a latency. |
3c7676a0 |
func getMaxMountAndExistenceCheckAttempts(layer PushLayer) (maxMountAttempts, maxExistenceCheckAttempts int, checkOtherRepositories bool) {
size, err := layer.Size() |
81f7b1f1 |
switch {
// big blob
case size > middleLayerMaximumSize:
// 1st attempt to mount the blob few times
// 2nd few existence checks with digests associated to any repository
// then fallback to upload
return 4, 3, true
// middle sized blobs; if we could not get the size, assume we deal with middle sized blob
case size > smallLayerMaximumSize, err != nil:
// 1st attempt to mount blobs of average size few times
// 2nd try at most 1 existence check if there's an existing mapping to the target repository
// then fallback to upload
return 3, 1, false
// small blobs, do a minimum number of checks
default:
return 1, 1, false
} |
694df3ff |
} |
0928f3f2 |
// getRepositoryMountCandidates returns an array of v2 metadata items belonging to the given registry. The |
e0577d5f |
// array is sorted from youngest to oldest. If requireRegistryMatch is true, the resulting array will contain |
0928f3f2 |
// only metadata entries having registry part of SourceRepository matching the part of repoInfo.
func getRepositoryMountCandidates(
repoInfo reference.Named,
hmacKey []byte,
max int,
v2Metadata []metadata.V2Metadata,
) []metadata.V2Metadata {
candidates := []metadata.V2Metadata{}
for _, meta := range v2Metadata {
sourceRepo, err := reference.ParseNamed(meta.SourceRepository) |
3a127939 |
if err != nil || reference.Domain(repoInfo) != reference.Domain(sourceRepo) { |
0928f3f2 |
continue
}
// target repository is not a viable candidate |
3a127939 |
if meta.SourceRepository == repoInfo.Name() { |
0928f3f2 |
continue
}
candidates = append(candidates, meta)
}
sortV2MetadataByLikenessAndAge(repoInfo, hmacKey, candidates)
if max >= 0 && len(candidates) > max {
// select the youngest metadata
candidates = candidates[:max]
}
return candidates
}
// byLikeness orders v2 metadata candidates for cross repository mount.
// Candidate "a" sorts before candidate "b":
//
//  1. if "a" was tagged with the hmacKey (derived from the AuthConfig used for
//     the target repository) and "b" was not
//  2. otherwise, if more leading path components of a's source repository
//     exactly match pathComponents than b's do
type byLikeness struct {
	// arr is the slice being sorted in place.
	arr []metadata.V2Metadata
	// hmacKey is the key entries are verified against.
	hmacKey []byte
	// pathComponents are the path components of the target repository name.
	pathComponents []string
}
// Less reports whether element i is a better mount candidate than element j:
// an HMAC match wins first, then the higher count of matching path components.
func (bla byLikeness) Less(i, j int) bool {
	left, right := &bla.arr[i], &bla.arr[j]

	leftMAC := metadata.CheckV2MetadataHMAC(left, bla.hmacKey)
	rightMAC := metadata.CheckV2MetadataHMAC(right, bla.hmacKey)
	if leftMAC != rightMAC {
		return leftMAC
	}

	leftScore := numOfMatchingPathComponents(left.SourceRepository, bla.pathComponents)
	rightScore := numOfMatchingPathComponents(right.SourceRepository, bla.pathComponents)
	return leftScore > rightScore
}
// Swap exchanges elements i and j of the underlying slice.
func (bla byLikeness) Swap(i, j int) {
	held := bla.arr[i]
	bla.arr[i] = bla.arr[j]
	bla.arr[j] = held
}
// Len returns the number of elements being sorted.
func (bla byLikeness) Len() int {
	return len(bla.arr)
}
|
709bf8b7 |
// nolint: interfacer |
0928f3f2 |
func sortV2MetadataByLikenessAndAge(repoInfo reference.Named, hmacKey []byte, marr []metadata.V2Metadata) {
// reverse the metadata array to shift the newest entries to the beginning
for i := 0; i < len(marr)/2; i++ {
marr[i], marr[len(marr)-i-1] = marr[len(marr)-i-1], marr[i]
}
// keep equal entries ordered from the youngest to the oldest
sort.Stable(byLikeness{
arr: marr,
hmacKey: hmacKey, |
3a127939 |
pathComponents: getPathComponents(repoInfo.Name()), |
0928f3f2 |
})
}
// numOfMatchingPathComponents counts how many leading path components of
// "pth" are equal to the components at the same positions in
// "matchComponents". Counting stops at the first mismatch or when either side
// runs out of components.
func numOfMatchingPathComponents(pth string, matchComponents []string) int {
	matched := 0
	for _, component := range getPathComponents(pth) {
		if matched >= len(matchComponents) || component != matchComponents[matched] {
			break
		}
		matched++
	}
	return matched
}
// getPathComponents splits path at every "/" and returns the pieces. An empty
// path yields a single empty component.
func getPathComponents(path string) []string {
	const separator = "/"
	components := strings.Split(path, separator)
	return components
}
c6dd51c3 |
func cancelLayerUpload(ctx context.Context, dgst digest.Digest, layerUpload distribution.BlobWriter) {
if layerUpload != nil {
logrus.Debugf("cancelling upload of blob %s", dgst)
err := layerUpload.Cancel(ctx)
if err != nil {
logrus.Warnf("failed to cancel upload: %v", err) |
694df3ff |
}
}
} |