distribution/push_v2.go
694df3ff
 package distribution
 
 import (
c8d277d2
 	"errors"
694df3ff
 	"fmt"
 	"io"
572ce802
 	"sync"
694df3ff
 
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/distribution"
 	"github.com/docker/distribution/digest"
 	"github.com/docker/distribution/manifest/schema1"
f33fa1b8
 	"github.com/docker/distribution/manifest/schema2"
63099477
 	distreference "github.com/docker/distribution/reference"
f33fa1b8
 	"github.com/docker/distribution/registry/client"
694df3ff
 	"github.com/docker/docker/distribution/metadata"
572ce802
 	"github.com/docker/docker/distribution/xfer"
c8d277d2
 	"github.com/docker/docker/image"
694df3ff
 	"github.com/docker/docker/layer"
572ce802
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/progress"
694df3ff
 	"github.com/docker/docker/pkg/stringid"
2655954c
 	"github.com/docker/docker/reference"
694df3ff
 	"github.com/docker/docker/registry"
 	"golang.org/x/net/context"
 )
 
65370be8
 // PushResult contains the tag, manifest digest, and manifest size from the
 // push. It's used to signal this information to the trust code in the client
 // so it can sign the manifest if necessary.
 type PushResult struct {
 	// Tag is the tag the manifest was pushed under.
 	Tag    string
 	// Digest is the digest of the canonical manifest bytes.
 	Digest digest.Digest
 	// Size is the length, in bytes, of the canonical manifest.
 	Size   int
 }
 
694df3ff
 // v2Pusher carries everything needed to push one reference to a single
 // registry endpoint using the v2 protocol.
 type v2Pusher struct {
63099477
 	// v2MetadataService is handed to every layer descriptor, which uses it
 	// to look up and record DiffID-to-digest mappings.
 	v2MetadataService *metadata.V2MetadataService
 	// ref is the reference being pushed (a single tag or a bare name).
 	ref               reference.Named
 	// endpoint and repoInfo identify the registry endpoint and repository
 	// that Push connects to.
 	endpoint          registry.APIEndpoint
 	repoInfo          *registry.RepositoryInfo
 	config            *ImagePushConfig
 	// repo is the repository client; it is assigned by Push.
 	repo              distribution.Repository
694df3ff
 
5c99eebe
 	// pushState is state built by the Upload functions.
f33fa1b8
 	pushState pushState
572ce802
 }
 
f33fa1b8
 // pushState is the mutable state shared by all layer uploads of one push.
 // Upload takes the embedded mutex whenever it reads or writes the fields
 // below.
 type pushState struct {
572ce802
 	sync.Mutex
f33fa1b8
 	// remoteLayers is the set of layers known to exist on the remote side.
 	// This avoids redundant queries when pushing multiple tags that
 	// involve the same layers. It is also used to fill in digest and size
 	// information when building the manifest.
 	remoteLayers map[layer.DiffID]distribution.Descriptor
 	// confirmedV2 is set to true if we confirm we're talking to a v2
 	// registry. This is used to limit fallbacks to the v1 protocol.
 	confirmedV2 bool
694df3ff
 }
 
a57478d6
 func (p *v2Pusher) Push(ctx context.Context) (err error) {
f33fa1b8
 	p.pushState.remoteLayers = make(map[layer.DiffID]distribution.Descriptor)
 
 	p.repo, p.pushState.confirmedV2, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
694df3ff
 	if err != nil {
 		logrus.Debugf("Error getting v2 registry: %v", err)
5e8af46f
 		return err
694df3ff
 	}
 
a57478d6
 	if err = p.pushV2Repository(ctx); err != nil {
8f26fe4f
 		if continueOnError(err) {
5e8af46f
 			return fallbackError{
 				err:         err,
 				confirmedV2: p.pushState.confirmedV2,
 				transportOK: true,
 			}
a57478d6
 		}
 	}
 	return err
 }
 
 func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
c8d277d2
 	if namedTagged, isNamedTagged := p.ref.(reference.NamedTagged); isNamedTagged {
2655954c
 		imageID, err := p.config.ReferenceStore.Get(p.ref)
694df3ff
 		if err != nil {
a57478d6
 			return fmt.Errorf("tag does not exist: %s", p.ref.String())
694df3ff
 		}
 
c8d277d2
 		return p.pushV2Tag(ctx, namedTagged, imageID)
694df3ff
 	}
c8d277d2
 
 	if !reference.IsNameOnly(p.ref) {
 		return errors.New("cannot push a digest reference")
694df3ff
 	}
 
c8d277d2
 	// Pull all tags
 	pushed := 0
 	for _, association := range p.config.ReferenceStore.ReferencesByName(p.ref) {
 		if namedTagged, isNamedTagged := association.Ref.(reference.NamedTagged); isNamedTagged {
 			pushed++
 			if err := p.pushV2Tag(ctx, namedTagged, association.ImageID); err != nil {
 				return err
 			}
694df3ff
 		}
 	}
 
c8d277d2
 	if pushed == 0 {
 		return fmt.Errorf("no tags to push for %s", p.repoInfo.Name())
 	}
 
a57478d6
 	return nil
694df3ff
 }
 
c8d277d2
 func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, imageID image.ID) error {
694df3ff
 	logrus.Debugf("Pushing repository: %s", ref.String())
 
c8d277d2
 	img, err := p.config.ImageStore.Get(imageID)
694df3ff
 	if err != nil {
 		return fmt.Errorf("could not find image from tag %s: %v", ref.String(), err)
 	}
 
 	var l layer.Layer
 
 	topLayerID := img.RootFS.ChainID()
 	if topLayerID == "" {
 		l = layer.EmptyLayer
 	} else {
 		l, err = p.config.LayerStore.Get(topLayerID)
 		if err != nil {
 			return fmt.Errorf("failed to get top layer from image: %v", err)
 		}
 		defer layer.ReleaseAndLog(p.config.LayerStore, l)
 	}
 
572ce802
 	var descriptors []xfer.UploadDescriptor
694df3ff
 
a57478d6
 	descriptorTemplate := v2PushDescriptor{
63099477
 		v2MetadataService: p.v2MetadataService,
 		repoInfo:          p.repoInfo,
 		repo:              p.repo,
 		pushState:         &p.pushState,
694df3ff
 	}
 
572ce802
 	// Loop bounds condition is to avoid pushing the base layer on Windows.
694df3ff
 	for i := 0; i < len(img.RootFS.DiffIDs); i++ {
a57478d6
 		descriptor := descriptorTemplate
 		descriptor.layer = l
 		descriptors = append(descriptors, &descriptor)
694df3ff
 
 		l = l.Parent()
 	}
 
f33fa1b8
 	if err := p.config.UploadManager.Upload(ctx, descriptors, p.config.ProgressOutput); err != nil {
572ce802
 		return err
 	}
 
c8d277d2
 	// Try schema2 first
 	builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), img.RawJSON())
 	manifest, err := manifestFromBuilder(ctx, builder, descriptors)
694df3ff
 	if err != nil {
 		return err
 	}
 
c8d277d2
 	manSvc, err := p.repo.Manifests(ctx)
694df3ff
 	if err != nil {
 		return err
 	}
c8d277d2
 
9d99e360
 	putOptions := []distribution.ManifestServiceOption{distribution.WithTag(ref.Tag())}
c8d277d2
 	if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
 		logrus.Warnf("failed to upload schema2 manifest: %v - falling back to schema1", err)
 
4d437a29
 		manifestRef, err := distreference.WithTag(p.repo.Named(), ref.Tag())
 		if err != nil {
 			return err
 		}
 		builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), p.config.TrustKey, manifestRef, img.RawJSON())
c8d277d2
 		manifest, err = manifestFromBuilder(ctx, builder, descriptors)
 		if err != nil {
 			return err
 		}
 
 		if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
 			return err
694df3ff
 		}
 	}
 
c8d277d2
 	var canonicalManifest []byte
 
 	switch v := manifest.(type) {
 	case *schema1.SignedManifest:
 		canonicalManifest = v.Canonical
 	case *schema2.DeserializedManifest:
 		_, canonicalManifest, err = v.Payload()
 		if err != nil {
 			return err
 		}
694df3ff
 	}
f33fa1b8
 
c8d277d2
 	manifestDigest := digest.FromBytes(canonicalManifest)
 	progress.Messagef(p.config.ProgressOutput, "", "%s: digest: %s size: %d", ref.Tag(), manifestDigest, len(canonicalManifest))
 	// Signal digest to the trust client so it can sign the
 	// push, if appropriate.
 	progress.Aux(p.config.ProgressOutput, PushResult{Tag: ref.Tag(), Digest: manifestDigest, Size: len(canonicalManifest)})
 
 	return nil
 }
 
 func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuilder, descriptors []xfer.UploadDescriptor) (distribution.Manifest, error) {
 	// descriptors is in reverse order; iterate backwards to get references
 	// appended in the right order.
 	for i := len(descriptors) - 1; i >= 0; i-- {
 		if err := builder.AppendReference(descriptors[i].(*v2PushDescriptor)); err != nil {
 			return nil, err
 		}
f33fa1b8
 	}
c8d277d2
 
 	return builder.Build(ctx)
694df3ff
 }
 
572ce802
 // v2PushDescriptor describes one layer to be pushed to a v2 repository. It
 // is handed to the upload manager as an xfer.UploadDescriptor and later to
 // the manifest builder as a reference.
 type v2PushDescriptor struct {
63099477
 	// layer is the layer this descriptor uploads.
 	layer             layer.Layer
 	v2MetadataService *metadata.V2MetadataService
 	repoInfo          reference.Named
 	repo              distribution.Repository
 	// pushState points at the state shared with the other descriptors of
 	// the same push.
 	pushState         *pushState
5c99eebe
 	// remoteDescriptor is stored by SetRemoteDescriptor and returned by
 	// Descriptor.
 	remoteDescriptor  distribution.Descriptor
572ce802
 }
 
 func (pd *v2PushDescriptor) Key() string {
4d437a29
 	return "v2push:" + pd.repo.Named().Name() + " " + pd.layer.DiffID().String()
572ce802
 }
 
 // ID returns the layer's DiffID passed through stringid.TruncateID; Upload
 // uses it to label this layer's progress updates.
 func (pd *v2PushDescriptor) ID() string {
 	fullID := pd.layer.DiffID().String()
 	return stringid.TruncateID(fullID)
 }
 
 // DiffID returns the DiffID of the layer this descriptor wraps.
 func (pd *v2PushDescriptor) DiffID() layer.DiffID {
 	id := pd.layer.DiffID()
 	return id
 }
 
5c99eebe
 // Upload makes the layer available in the target repository and returns the
 // descriptor of the remote blob. It works through these steps, stopping at
 // the first one that succeeds:
 //
 //  1. If pushState.remoteLayers already has an entry for the layer's DiffID,
 //     that descriptor is returned without contacting the registry.
 //  2. If the metadata service has entries for the DiffID, layerAlreadyExists
 //     is asked whether one of them is already present in the repository.
 //  3. The metadata entries are walked newest first, trying to mount the blob
 //     from another repository on the same registry host. At most three
 //     failed mount attempts are made.
 //  4. Otherwise the layer's tar stream is compressed, streamed into a blob
 //     upload and committed under the digest of the compressed bytes.
 //
 // After a mount or an upload, the DiffID-to-digest mapping is added to the
 // metadata service, and the descriptor is recorded in pushState together
 // with confirmedV2 = true. Errors are returned either through retryOnError
 // or wrapped in xfer.DoNotRetry, always with a zero Descriptor.
 func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
572ce802
 	diffID := pd.DiffID()
 
f33fa1b8
 	pd.pushState.Lock()
5c99eebe
 	if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok {
f33fa1b8
 		// it is already known that the push is not needed and
 		// therefore doing a stat is unnecessary
 		pd.pushState.Unlock()
 		progress.Update(progressOutput, pd.ID(), "Layer already exists")
5c99eebe
 		return descriptor, nil
f33fa1b8
 	}
 	pd.pushState.Unlock()
694df3ff
 
63099477
 	// Do we have any metadata associated with this layer's DiffID?
 	// A lookup error is not fatal: the existence check is skipped and the
 	// mount loop below simply sees no candidates.
 	v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID)
694df3ff
 	if err == nil {
63099477
 		descriptor, exists, err := layerAlreadyExists(ctx, v2Metadata, pd.repoInfo, pd.repo, pd.pushState)
694df3ff
 		if err != nil {
572ce802
 			progress.Update(progressOutput, pd.ID(), "Image push failed")
5c99eebe
 			return distribution.Descriptor{}, retryOnError(err)
694df3ff
 		}
 		if exists {
572ce802
 			progress.Update(progressOutput, pd.ID(), "Layer already exists")
f33fa1b8
 			pd.pushState.Lock()
 			pd.pushState.remoteLayers[diffID] = descriptor
 			pd.pushState.Unlock()
5c99eebe
 			return descriptor, nil
694df3ff
 		}
 	}
 
f33fa1b8
 	logrus.Debugf("Pushing layer: %s", diffID)
 
694df3ff
 	// if digest was empty or not saved, or if blob does not exist on the remote repository,
 	// then push the blob.
572ce802
 	bs := pd.repo.Blobs(ctx)
 
1d3480f9
 	var layerUpload distribution.BlobWriter
 	// Only mount attempts that reach the registry and fail consume this
 	// budget; candidates skipped with "continue" below do not.
 	mountAttemptsRemaining := 3
 
 	// Attempt to find another repository in the same registry to mount the layer
 	// from to avoid an unnecessary upload.
 	// Note: metadata is stored from oldest to newest, so we iterate through this
 	// slice in reverse to maximize our chances of the blob still existing in the
 	// remote repository.
 	for i := len(v2Metadata) - 1; i >= 0 && mountAttemptsRemaining > 0; i-- {
 		mountFrom := v2Metadata[i]
 
 		sourceRepo, err := reference.ParseNamed(mountFrom.SourceRepository)
7289c721
 		if err != nil {
 			continue
 		}
1d3480f9
 		if pd.repoInfo.Hostname() != sourceRepo.Hostname() {
 			// don't mount blobs from another registry
 			continue
63099477
 		}
7289c721
 
63099477
 		namedRef, err := reference.WithName(mountFrom.SourceRepository)
 		if err != nil {
1d3480f9
 			continue
63099477
 		}
7289c721
 
63099477
 		// TODO (brianbland): We need to construct a reference where the Name is
 		// only the full remote name, so clean this up when distribution has a
 		// richer reference package
 		remoteRef, err := distreference.WithName(namedRef.RemoteName())
 		if err != nil {
1d3480f9
 			continue
63099477
 		}
7289c721
 
63099477
 		canonicalRef, err := distreference.WithDigest(remoteRef, mountFrom.Digest)
 		if err != nil {
1d3480f9
 			continue
7289c721
 		}
63099477
 
1d3480f9
 		logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountFrom.Digest, sourceRepo.FullName())
7289c721
 
1d3480f9
 		layerUpload, err = bs.Create(ctx, client.WithMountFrom(canonicalRef))
 		// Within the switch, err is rebound to the concrete error value of
 		// the matching case.
 		switch err := err.(type) {
 		case distribution.ErrBlobMounted:
 			progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name())
63099477
 
1d3480f9
 			err.Descriptor.MediaType = schema2.MediaTypeLayer
06e9a056
 
1d3480f9
 			pd.pushState.Lock()
 			pd.pushState.confirmedV2 = true
 			pd.pushState.remoteLayers[diffID] = err.Descriptor
 			pd.pushState.Unlock()
63099477
 
1d3480f9
 			// Cache mapping from this layer's DiffID to the blobsum
 			if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: mountFrom.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
5c99eebe
 				return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
1d3480f9
 			}
5c99eebe
 			return err.Descriptor, nil
1d3480f9
 		case nil:
 			// blob upload session created successfully, so begin the upload
 			mountAttemptsRemaining = 0
 		default:
 			// unable to mount layer from this repository, so this source mapping is no longer valid
 			logrus.Debugf("unassociating layer %s (%s) with %s", diffID, mountFrom.Digest, mountFrom.SourceRepository)
 			// The result of Remove is not checked.
 			pd.v2MetadataService.Remove(mountFrom)
 			mountAttemptsRemaining--
63099477
 		}
 	}
 
1d3480f9
 	// No upload session came out of the mount attempts, so open a plain one.
 	if layerUpload == nil {
 		layerUpload, err = bs.Create(ctx)
 		if err != nil {
5c99eebe
 			return distribution.Descriptor{}, retryOnError(err)
1d3480f9
 		}
572ce802
 	}
 	defer layerUpload.Close()
 
 	arch, err := pd.layer.TarStream()
694df3ff
 	if err != nil {
5c99eebe
 		return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
694df3ff
 	}
572ce802
 
 	// don't care if this fails; best effort
 	size, _ := pd.layer.DiffSize()
 
 	reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), progressOutput, size, pd.ID(), "Pushing")
e273445d
 	compressedReader, compressionDone := compress(reader)
 	// On every return path: close the source reader, then block until
 	// compressionDone yields (presumably signalling that compress has
 	// finished with the reader; confirm against compress).
 	defer func() {
 		reader.Close()
 		<-compressionDone
 	}()
572ce802
 
 	// The tee feeds the digester with exactly the compressed bytes that are
 	// sent to the registry.
 	digester := digest.Canonical.New()
 	tee := io.TeeReader(compressedReader, digester.Hash())
 
 	nn, err := layerUpload.ReadFrom(tee)
 	compressedReader.Close()
 	if err != nil {
5c99eebe
 		return distribution.Descriptor{}, retryOnError(err)
572ce802
 	}
 
 	pushDigest := digester.Digest()
 	if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil {
5c99eebe
 		return distribution.Descriptor{}, retryOnError(err)
572ce802
 	}
 
 	logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
 	progress.Update(progressOutput, pd.ID(), "Pushed")
 
694df3ff
 	// Cache mapping from this layer's DiffID to the blobsum
63099477
 	if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
5c99eebe
 		return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
694df3ff
 	}
 
f33fa1b8
 	pd.pushState.Lock()
 
99a39690
 	// If Commit succeeded, that's an indication that the remote registry
f33fa1b8
 	// speaks the v2 protocol.
 	pd.pushState.confirmedV2 = true
 
5c99eebe
 	// Size is the number of bytes ReadFrom reported writing to the upload.
 	descriptor := distribution.Descriptor{
f33fa1b8
 		Digest:    pushDigest,
 		MediaType: schema2.MediaTypeLayer,
 		Size:      nn,
 	}
5c99eebe
 	pd.pushState.remoteLayers[diffID] = descriptor
f33fa1b8
 
 	pd.pushState.Unlock()
 
5c99eebe
 	return descriptor, nil
 }
 
 // SetRemoteDescriptor stores descriptor on pd so that a later call to
 // Descriptor returns it.
 func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
 	pd.remoteDescriptor = descriptor
f33fa1b8
 }
572ce802
 
f33fa1b8
 func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
5c99eebe
 	return pd.remoteDescriptor
694df3ff
 }
 
63099477
 // layerAlreadyExists checks if the registry already know about any of the
 // metadata passed in the "metadata" slice. If it finds one that the registry
694df3ff
 // knows about, it returns the known digest and "true".
63099477
 func layerAlreadyExists(ctx context.Context, metadata []metadata.V2Metadata, repoInfo reference.Named, repo distribution.Repository, pushState *pushState) (distribution.Descriptor, bool, error) {
 	for _, meta := range metadata {
7289c721
 		// Only check blobsums that are known to this repository or have an unknown source
63099477
 		if meta.SourceRepository != "" && meta.SourceRepository != repoInfo.FullName() {
7289c721
 			continue
 		}
63099477
 		descriptor, err := repo.Blobs(ctx).Stat(ctx, meta.Digest)
694df3ff
 		switch err {
 		case nil:
f33fa1b8
 			descriptor.MediaType = schema2.MediaTypeLayer
 			return descriptor, true, nil
694df3ff
 		case distribution.ErrBlobUnknown:
 			// nop
 		default:
f33fa1b8
 			return distribution.Descriptor{}, false, err
694df3ff
 		}
 	}
f33fa1b8
 	return distribution.Descriptor{}, false, nil
694df3ff
 }