distribution/push.go
694df3ff
 package distribution
 
 import (
 	"bufio"
 	"compress/gzip"
 	"fmt"
 	"io"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/distribution/metadata"
572ce802
 	"github.com/docker/docker/pkg/progress"
2655954c
 	"github.com/docker/docker/reference"
694df3ff
 	"github.com/docker/docker/registry"
572ce802
 	"golang.org/x/net/context"
694df3ff
 )
 
 // Pusher is an interface that abstracts pushing for different API versions.
 type Pusher interface {
 	// Push tries to push the image configured at the creation of Pusher.
 	// Push returns an error if any, as well as a boolean that determines whether to retry Push on the next configured endpoint.
 	//
 	// TODO(tiborvass): have Push() take a reference to repository + tag, so that the pusher itself is repository-agnostic.
a57478d6
 	Push(ctx context.Context) error
694df3ff
 }
 
 const compressionBufSize = 32768
 
 // NewPusher creates a new Pusher interface that will push to either a v1 or v2
 // registry. The endpoint argument contains a Version field that determines
 // whether a v1 or v2 pusher will be created. The other parameters are passed
 // through to the underlying pusher implementation for use during the actual
 // push operation.
572ce802
 func NewPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePushConfig *ImagePushConfig) (Pusher, error) {
694df3ff
 	switch endpoint.Version {
 	case registry.APIVersion2:
 		return &v2Pusher{
63099477
 			v2MetadataService: metadata.NewV2MetadataService(imagePushConfig.MetadataStore),
 			ref:               ref,
 			endpoint:          endpoint,
 			repoInfo:          repoInfo,
 			config:            imagePushConfig,
694df3ff
 		}, nil
 	case registry.APIVersion1:
 		return &v1Pusher{
 			v1IDService: metadata.NewV1IDService(imagePushConfig.MetadataStore),
 			ref:         ref,
 			endpoint:    endpoint,
 			repoInfo:    repoInfo,
 			config:      imagePushConfig,
 		}, nil
 	}
 	return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)
 }
 
ccaea227
 // Push initiates a push operation on ref.
694df3ff
 // ref is the specific variant of the image to be pushed.
 // If no tag is provided, all tags will be pushed.
572ce802
 func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushConfig) error {
694df3ff
 	// FIXME: Allow to interrupt current push when new push of same image is done.
 
 	// Resolve the Repository name from fqn to RepositoryInfo
 	repoInfo, err := imagePushConfig.RegistryService.ResolveRepository(ref)
 	if err != nil {
 		return err
 	}
 
f2d481a2
 	endpoints, err := imagePushConfig.RegistryService.LookupPushEndpoints(repoInfo.Hostname())
694df3ff
 	if err != nil {
 		return err
 	}
 
ffded61d
 	progress.Messagef(imagePushConfig.ProgressOutput, "", "The push refers to a repository [%s]", repoInfo.FullName())
694df3ff
 
ffded61d
 	associations := imagePushConfig.ReferenceStore.ReferencesByName(repoInfo)
694df3ff
 	if len(associations) == 0 {
e57900a2
 		return fmt.Errorf("An image does not exist locally with the tag: %s", repoInfo.Name())
694df3ff
 	}
 
a57478d6
 	var (
 		lastErr error
 
 		// confirmedV2 is set to true if a push attempt managed to
 		// confirm that it was talking to a v2 registry. This will
 		// prevent fallback to the v1 protocol.
 		confirmedV2 bool
5e8af46f
 
 		// confirmedTLSRegistries is a map indicating which registries
 		// are known to be using TLS. There should never be a plaintext
 		// retry for any of these.
 		confirmedTLSRegistries = make(map[string]struct{})
a57478d6
 	)
 
694df3ff
 	for _, endpoint := range endpoints {
bb37c67a
 		if imagePushConfig.RequireSchema2 && endpoint.Version == registry.APIVersion1 {
 			continue
 		}
a57478d6
 		if confirmedV2 && endpoint.Version == registry.APIVersion1 {
 			logrus.Debugf("Skipping v1 endpoint %s because v2 registry was detected", endpoint.URL)
 			continue
 		}
 
79db131a
 		if endpoint.URL.Scheme != "https" {
 			if _, confirmedTLS := confirmedTLSRegistries[endpoint.URL.Host]; confirmedTLS {
5e8af46f
 				logrus.Debugf("Skipping non-TLS endpoint %s for host/port that appears to use TLS", endpoint.URL)
 				continue
 			}
 		}
 
ffded61d
 		logrus.Debugf("Trying to push %s to %s %s", repoInfo.FullName(), endpoint.URL, endpoint.Version)
694df3ff
 
572ce802
 		pusher, err := NewPusher(ref, endpoint, repoInfo, imagePushConfig)
694df3ff
 		if err != nil {
 			lastErr = err
 			continue
 		}
a57478d6
 		if err := pusher.Push(ctx); err != nil {
572ce802
 			// Was this push cancelled? If so, don't try to fall
 			// back.
 			select {
 			case <-ctx.Done():
 			default:
a57478d6
 				if fallbackErr, ok := err.(fallbackError); ok {
 					confirmedV2 = confirmedV2 || fallbackErr.confirmedV2
79db131a
 					if fallbackErr.transportOK && endpoint.URL.Scheme == "https" {
 						confirmedTLSRegistries[endpoint.URL.Host] = struct{}{}
5e8af46f
 					}
a57478d6
 					err = fallbackErr.err
 					lastErr = err
8f26fe4f
 					logrus.Errorf("Attempting next endpoint for push after error: %v", err)
a57478d6
 					continue
 				}
572ce802
 			}
 
8f26fe4f
 			logrus.Errorf("Not continuing with push after error: %v", err)
694df3ff
 			return err
 		}
 
72f1881d
 		imagePushConfig.ImageEventLogger(ref.String(), repoInfo.Name(), "push")
694df3ff
 		return nil
 	}
 
 	if lastErr == nil {
ffded61d
 		lastErr = fmt.Errorf("no endpoints found for %s", repoInfo.FullName())
694df3ff
 	}
 	return lastErr
 }
 
 // compress returns an io.ReadCloser which will supply a compressed version of
 // the provided Reader. The caller must close the ReadCloser after reading the
 // compressed data.
 //
 // Note that this function returns a reader instead of taking a writer as an
 // argument so that it can be used with httpBlobWriter's ReadFrom method.
 // Using httpBlobWriter's Write method would send a PATCH request for every
 // Write call.
e273445d
 //
 // The second return value is a channel that gets closed when the goroutine
 // is finished. This allows the caller to make sure the goroutine finishes
 // before it releases any resources connected with the reader that was
 // passed in.
 func compress(in io.Reader) (io.ReadCloser, chan struct{}) {
 	compressionDone := make(chan struct{})
 
694df3ff
 	pipeReader, pipeWriter := io.Pipe()
 	// Use a bufio.Writer to avoid excessive chunking in HTTP request.
 	bufWriter := bufio.NewWriterSize(pipeWriter, compressionBufSize)
 	compressor := gzip.NewWriter(bufWriter)
 
 	go func() {
 		_, err := io.Copy(compressor, in)
 		if err == nil {
 			err = compressor.Close()
 		}
 		if err == nil {
 			err = bufWriter.Flush()
 		}
 		if err != nil {
 			pipeWriter.CloseWithError(err)
 		} else {
 			pipeWriter.Close()
 		}
e273445d
 		close(compressionDone)
694df3ff
 	}()
 
e273445d
 	return pipeReader, compressionDone
694df3ff
 }