694df3ff |
package distribution
import ( |
c8d277d2 |
"errors" |
694df3ff |
"fmt"
"io" |
572ce802 |
"sync" |
694df3ff |
"github.com/Sirupsen/logrus"
"github.com/docker/distribution"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest/schema1" |
f33fa1b8 |
"github.com/docker/distribution/manifest/schema2" |
63099477 |
distreference "github.com/docker/distribution/reference" |
f33fa1b8 |
"github.com/docker/distribution/registry/client" |
694df3ff |
"github.com/docker/docker/distribution/metadata" |
572ce802 |
"github.com/docker/docker/distribution/xfer" |
c8d277d2 |
"github.com/docker/docker/image" |
694df3ff |
"github.com/docker/docker/layer" |
572ce802 |
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/progress" |
694df3ff |
"github.com/docker/docker/pkg/stringid" |
2655954c |
"github.com/docker/docker/reference" |
694df3ff |
"github.com/docker/docker/registry"
"golang.org/x/net/context"
)
|
65370be8 |
// PushResult contains the tag, manifest digest, and manifest size from the
// push. It's used to signal this information to the trust code in the client
// so it can sign the manifest if necessary.
type PushResult struct {
Tag string
Digest digest.Digest
Size int
}
|
694df3ff |
type v2Pusher struct { |
63099477 |
v2MetadataService *metadata.V2MetadataService
ref reference.Named
endpoint registry.APIEndpoint
repoInfo *registry.RepositoryInfo
config *ImagePushConfig
repo distribution.Repository |
694df3ff |
|
5c99eebe |
// pushState is state built by the Upload functions. |
f33fa1b8 |
pushState pushState |
572ce802 |
}
|
f33fa1b8 |
type pushState struct { |
572ce802 |
sync.Mutex |
f33fa1b8 |
// remoteLayers is the set of layers known to exist on the remote side.
// This avoids redundant queries when pushing multiple tags that
// involve the same layers. It is also used to fill in digest and size
// information when building the manifest.
remoteLayers map[layer.DiffID]distribution.Descriptor
// confirmedV2 is set to true if we confirm we're talking to a v2
// registry. This is used to limit fallbacks to the v1 protocol.
confirmedV2 bool |
694df3ff |
}
|
a57478d6 |
func (p *v2Pusher) Push(ctx context.Context) (err error) { |
f33fa1b8 |
p.pushState.remoteLayers = make(map[layer.DiffID]distribution.Descriptor)
p.repo, p.pushState.confirmedV2, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull") |
694df3ff |
if err != nil {
logrus.Debugf("Error getting v2 registry: %v", err) |
5e8af46f |
return err |
694df3ff |
}
|
a57478d6 |
if err = p.pushV2Repository(ctx); err != nil { |
8f26fe4f |
if continueOnError(err) { |
5e8af46f |
return fallbackError{
err: err,
confirmedV2: p.pushState.confirmedV2,
transportOK: true,
} |
a57478d6 |
}
}
return err
}
func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) { |
c8d277d2 |
if namedTagged, isNamedTagged := p.ref.(reference.NamedTagged); isNamedTagged { |
2655954c |
imageID, err := p.config.ReferenceStore.Get(p.ref) |
694df3ff |
if err != nil { |
a57478d6 |
return fmt.Errorf("tag does not exist: %s", p.ref.String()) |
694df3ff |
}
|
c8d277d2 |
return p.pushV2Tag(ctx, namedTagged, imageID) |
694df3ff |
} |
c8d277d2 |
if !reference.IsNameOnly(p.ref) {
return errors.New("cannot push a digest reference") |
694df3ff |
}
|
c8d277d2 |
// Pull all tags
pushed := 0
for _, association := range p.config.ReferenceStore.ReferencesByName(p.ref) {
if namedTagged, isNamedTagged := association.Ref.(reference.NamedTagged); isNamedTagged {
pushed++
if err := p.pushV2Tag(ctx, namedTagged, association.ImageID); err != nil {
return err
} |
694df3ff |
}
}
|
c8d277d2 |
if pushed == 0 {
return fmt.Errorf("no tags to push for %s", p.repoInfo.Name())
}
|
a57478d6 |
return nil |
694df3ff |
}
|
c8d277d2 |
func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, imageID image.ID) error { |
694df3ff |
logrus.Debugf("Pushing repository: %s", ref.String())
|
c8d277d2 |
img, err := p.config.ImageStore.Get(imageID) |
694df3ff |
if err != nil {
return fmt.Errorf("could not find image from tag %s: %v", ref.String(), err)
}
var l layer.Layer
topLayerID := img.RootFS.ChainID()
if topLayerID == "" {
l = layer.EmptyLayer
} else {
l, err = p.config.LayerStore.Get(topLayerID)
if err != nil {
return fmt.Errorf("failed to get top layer from image: %v", err)
}
defer layer.ReleaseAndLog(p.config.LayerStore, l)
}
|
572ce802 |
var descriptors []xfer.UploadDescriptor |
694df3ff |
|
a57478d6 |
descriptorTemplate := v2PushDescriptor{ |
63099477 |
v2MetadataService: p.v2MetadataService,
repoInfo: p.repoInfo,
repo: p.repo,
pushState: &p.pushState, |
694df3ff |
}
|
572ce802 |
// Loop bounds condition is to avoid pushing the base layer on Windows. |
694df3ff |
for i := 0; i < len(img.RootFS.DiffIDs); i++ { |
a57478d6 |
descriptor := descriptorTemplate
descriptor.layer = l
descriptors = append(descriptors, &descriptor) |
694df3ff |
l = l.Parent()
}
|
f33fa1b8 |
if err := p.config.UploadManager.Upload(ctx, descriptors, p.config.ProgressOutput); err != nil { |
572ce802 |
return err
}
|
c8d277d2 |
// Try schema2 first
builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), img.RawJSON())
manifest, err := manifestFromBuilder(ctx, builder, descriptors) |
694df3ff |
if err != nil {
return err
}
|
c8d277d2 |
manSvc, err := p.repo.Manifests(ctx) |
694df3ff |
if err != nil {
return err
} |
c8d277d2 |
|
9d99e360 |
putOptions := []distribution.ManifestServiceOption{distribution.WithTag(ref.Tag())} |
c8d277d2 |
if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
logrus.Warnf("failed to upload schema2 manifest: %v - falling back to schema1", err)
|
4d437a29 |
manifestRef, err := distreference.WithTag(p.repo.Named(), ref.Tag())
if err != nil {
return err
}
builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), p.config.TrustKey, manifestRef, img.RawJSON()) |
c8d277d2 |
manifest, err = manifestFromBuilder(ctx, builder, descriptors)
if err != nil {
return err
}
if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
return err |
694df3ff |
}
}
|
c8d277d2 |
var canonicalManifest []byte
switch v := manifest.(type) {
case *schema1.SignedManifest:
canonicalManifest = v.Canonical
case *schema2.DeserializedManifest:
_, canonicalManifest, err = v.Payload()
if err != nil {
return err
} |
694df3ff |
} |
f33fa1b8 |
|
c8d277d2 |
manifestDigest := digest.FromBytes(canonicalManifest)
progress.Messagef(p.config.ProgressOutput, "", "%s: digest: %s size: %d", ref.Tag(), manifestDigest, len(canonicalManifest))
// Signal digest to the trust client so it can sign the
// push, if appropriate.
progress.Aux(p.config.ProgressOutput, PushResult{Tag: ref.Tag(), Digest: manifestDigest, Size: len(canonicalManifest)})
return nil
}
func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuilder, descriptors []xfer.UploadDescriptor) (distribution.Manifest, error) {
// descriptors is in reverse order; iterate backwards to get references
// appended in the right order.
for i := len(descriptors) - 1; i >= 0; i-- {
if err := builder.AppendReference(descriptors[i].(*v2PushDescriptor)); err != nil {
return nil, err
} |
f33fa1b8 |
} |
c8d277d2 |
return builder.Build(ctx) |
694df3ff |
}
|
572ce802 |
type v2PushDescriptor struct { |
63099477 |
layer layer.Layer
v2MetadataService *metadata.V2MetadataService
repoInfo reference.Named
repo distribution.Repository
pushState *pushState |
5c99eebe |
remoteDescriptor distribution.Descriptor |
572ce802 |
}
func (pd *v2PushDescriptor) Key() string { |
4d437a29 |
return "v2push:" + pd.repo.Named().Name() + " " + pd.layer.DiffID().String() |
572ce802 |
}
func (pd *v2PushDescriptor) ID() string {
return stringid.TruncateID(pd.layer.DiffID().String())
}
func (pd *v2PushDescriptor) DiffID() layer.DiffID {
return pd.layer.DiffID()
}
|
5c99eebe |
func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) { |
572ce802 |
diffID := pd.DiffID()
|
f33fa1b8 |
pd.pushState.Lock() |
5c99eebe |
if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok { |
f33fa1b8 |
// it is already known that the push is not needed and
// therefore doing a stat is unnecessary
pd.pushState.Unlock()
progress.Update(progressOutput, pd.ID(), "Layer already exists") |
5c99eebe |
return descriptor, nil |
f33fa1b8 |
}
pd.pushState.Unlock() |
694df3ff |
|
63099477 |
// Do we have any metadata associated with this layer's DiffID?
v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID) |
694df3ff |
if err == nil { |
63099477 |
descriptor, exists, err := layerAlreadyExists(ctx, v2Metadata, pd.repoInfo, pd.repo, pd.pushState) |
694df3ff |
if err != nil { |
572ce802 |
progress.Update(progressOutput, pd.ID(), "Image push failed") |
5c99eebe |
return distribution.Descriptor{}, retryOnError(err) |
694df3ff |
}
if exists { |
572ce802 |
progress.Update(progressOutput, pd.ID(), "Layer already exists") |
f33fa1b8 |
pd.pushState.Lock()
pd.pushState.remoteLayers[diffID] = descriptor
pd.pushState.Unlock() |
5c99eebe |
return descriptor, nil |
694df3ff |
}
}
|
f33fa1b8 |
logrus.Debugf("Pushing layer: %s", diffID)
|
694df3ff |
// if digest was empty or not saved, or if blob does not exist on the remote repository,
// then push the blob. |
572ce802 |
bs := pd.repo.Blobs(ctx)
|
1d3480f9 |
var layerUpload distribution.BlobWriter
mountAttemptsRemaining := 3
// Attempt to find another repository in the same registry to mount the layer
// from to avoid an unnecessary upload.
// Note: metadata is stored from oldest to newest, so we iterate through this
// slice in reverse to maximize our chances of the blob still existing in the
// remote repository.
for i := len(v2Metadata) - 1; i >= 0 && mountAttemptsRemaining > 0; i-- {
mountFrom := v2Metadata[i]
sourceRepo, err := reference.ParseNamed(mountFrom.SourceRepository) |
7289c721 |
if err != nil {
continue
} |
1d3480f9 |
if pd.repoInfo.Hostname() != sourceRepo.Hostname() {
// don't mount blobs from another registry
continue |
63099477 |
} |
7289c721 |
|
63099477 |
namedRef, err := reference.WithName(mountFrom.SourceRepository)
if err != nil { |
1d3480f9 |
continue |
63099477 |
} |
7289c721 |
|
63099477 |
// TODO (brianbland): We need to construct a reference where the Name is
// only the full remote name, so clean this up when distribution has a
// richer reference package
remoteRef, err := distreference.WithName(namedRef.RemoteName())
if err != nil { |
1d3480f9 |
continue |
63099477 |
} |
7289c721 |
|
63099477 |
canonicalRef, err := distreference.WithDigest(remoteRef, mountFrom.Digest)
if err != nil { |
1d3480f9 |
continue |
7289c721 |
} |
63099477 |
|
1d3480f9 |
logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountFrom.Digest, sourceRepo.FullName()) |
7289c721 |
|
1d3480f9 |
layerUpload, err = bs.Create(ctx, client.WithMountFrom(canonicalRef))
switch err := err.(type) {
case distribution.ErrBlobMounted:
progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name()) |
63099477 |
|
1d3480f9 |
err.Descriptor.MediaType = schema2.MediaTypeLayer |
06e9a056 |
|
1d3480f9 |
pd.pushState.Lock()
pd.pushState.confirmedV2 = true
pd.pushState.remoteLayers[diffID] = err.Descriptor
pd.pushState.Unlock() |
63099477 |
|
1d3480f9 |
// Cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: mountFrom.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil { |
5c99eebe |
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err} |
1d3480f9 |
} |
5c99eebe |
return err.Descriptor, nil |
1d3480f9 |
case nil:
// blob upload session created successfully, so begin the upload
mountAttemptsRemaining = 0
default:
// unable to mount layer from this repository, so this source mapping is no longer valid
logrus.Debugf("unassociating layer %s (%s) with %s", diffID, mountFrom.Digest, mountFrom.SourceRepository)
pd.v2MetadataService.Remove(mountFrom)
mountAttemptsRemaining-- |
63099477 |
}
}
|
1d3480f9 |
if layerUpload == nil {
layerUpload, err = bs.Create(ctx)
if err != nil { |
5c99eebe |
return distribution.Descriptor{}, retryOnError(err) |
1d3480f9 |
} |
572ce802 |
}
defer layerUpload.Close()
arch, err := pd.layer.TarStream() |
694df3ff |
if err != nil { |
5c99eebe |
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err} |
694df3ff |
} |
572ce802 |
// don't care if this fails; best effort
size, _ := pd.layer.DiffSize()
reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), progressOutput, size, pd.ID(), "Pushing") |
e273445d |
compressedReader, compressionDone := compress(reader)
defer func() {
reader.Close()
<-compressionDone
}() |
572ce802 |
digester := digest.Canonical.New()
tee := io.TeeReader(compressedReader, digester.Hash())
nn, err := layerUpload.ReadFrom(tee)
compressedReader.Close()
if err != nil { |
5c99eebe |
return distribution.Descriptor{}, retryOnError(err) |
572ce802 |
}
pushDigest := digester.Digest()
if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil { |
5c99eebe |
return distribution.Descriptor{}, retryOnError(err) |
572ce802 |
}
logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
progress.Update(progressOutput, pd.ID(), "Pushed")
|
694df3ff |
// Cache mapping from this layer's DiffID to the blobsum |
63099477 |
if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil { |
5c99eebe |
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err} |
694df3ff |
}
|
f33fa1b8 |
pd.pushState.Lock()
|
99a39690 |
// If Commit succeeded, that's an indication that the remote registry |
f33fa1b8 |
// speaks the v2 protocol.
pd.pushState.confirmedV2 = true
|
5c99eebe |
descriptor := distribution.Descriptor{ |
f33fa1b8 |
Digest: pushDigest,
MediaType: schema2.MediaTypeLayer,
Size: nn,
} |
5c99eebe |
pd.pushState.remoteLayers[diffID] = descriptor |
f33fa1b8 |
pd.pushState.Unlock()
|
5c99eebe |
return descriptor, nil
}
func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
pd.remoteDescriptor = descriptor |
f33fa1b8 |
} |
572ce802 |
|
f33fa1b8 |
func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor { |
5c99eebe |
return pd.remoteDescriptor |
694df3ff |
}
|
63099477 |
// layerAlreadyExists checks if the registry already know about any of the
// metadata passed in the "metadata" slice. If it finds one that the registry |
694df3ff |
// knows about, it returns the known digest and "true". |
63099477 |
func layerAlreadyExists(ctx context.Context, metadata []metadata.V2Metadata, repoInfo reference.Named, repo distribution.Repository, pushState *pushState) (distribution.Descriptor, bool, error) {
for _, meta := range metadata { |
7289c721 |
// Only check blobsums that are known to this repository or have an unknown source |
63099477 |
if meta.SourceRepository != "" && meta.SourceRepository != repoInfo.FullName() { |
7289c721 |
continue
} |
63099477 |
descriptor, err := repo.Blobs(ctx).Stat(ctx, meta.Digest) |
694df3ff |
switch err {
case nil: |
f33fa1b8 |
descriptor.MediaType = schema2.MediaTypeLayer
return descriptor, true, nil |
694df3ff |
case distribution.ErrBlobUnknown:
// nop
default: |
f33fa1b8 |
return distribution.Descriptor{}, false, err |
694df3ff |
}
} |
f33fa1b8 |
return distribution.Descriptor{}, false, nil |
694df3ff |
} |