694df3ff |
package distribution
import (
"bufio"
"compress/gzip"
"fmt"
"io"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/distribution/metadata" |
572ce802 |
"github.com/docker/docker/pkg/progress" |
2655954c |
"github.com/docker/docker/reference" |
694df3ff |
"github.com/docker/docker/registry" |
572ce802 |
"golang.org/x/net/context" |
694df3ff |
)
// Pusher is an interface that abstracts pushing for different API versions.
type Pusher interface {
// Push tries to push the image configured at the creation of Pusher.
// Push returns an error if any, as well as a boolean that determines whether to retry Push on the next configured endpoint.
//
// TODO(tiborvass): have Push() take a reference to repository + tag, so that the pusher itself is repository-agnostic. |
a57478d6 |
Push(ctx context.Context) error |
694df3ff |
}
const compressionBufSize = 32768
// NewPusher creates a new Pusher interface that will push to either a v1 or v2
// registry. The endpoint argument contains a Version field that determines
// whether a v1 or v2 pusher will be created. The other parameters are passed
// through to the underlying pusher implementation for use during the actual
// push operation. |
572ce802 |
func NewPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePushConfig *ImagePushConfig) (Pusher, error) { |
694df3ff |
switch endpoint.Version {
case registry.APIVersion2:
return &v2Pusher{ |
63099477 |
v2MetadataService: metadata.NewV2MetadataService(imagePushConfig.MetadataStore),
ref: ref,
endpoint: endpoint,
repoInfo: repoInfo,
config: imagePushConfig, |
694df3ff |
}, nil
case registry.APIVersion1:
return &v1Pusher{
v1IDService: metadata.NewV1IDService(imagePushConfig.MetadataStore),
ref: ref,
endpoint: endpoint,
repoInfo: repoInfo,
config: imagePushConfig,
}, nil
}
return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)
}
|
ccaea227 |
// Push initiates a push operation on ref. |
694df3ff |
// ref is the specific variant of the image to be pushed.
// If no tag is provided, all tags will be pushed. |
572ce802 |
func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushConfig) error { |
694df3ff |
// FIXME: Allow to interrupt current push when new push of same image is done.
// Resolve the Repository name from fqn to RepositoryInfo
repoInfo, err := imagePushConfig.RegistryService.ResolveRepository(ref)
if err != nil {
return err
}
|
f2d481a2 |
endpoints, err := imagePushConfig.RegistryService.LookupPushEndpoints(repoInfo.Hostname()) |
694df3ff |
if err != nil {
return err
}
|
ffded61d |
progress.Messagef(imagePushConfig.ProgressOutput, "", "The push refers to a repository [%s]", repoInfo.FullName()) |
694df3ff |
|
ffded61d |
associations := imagePushConfig.ReferenceStore.ReferencesByName(repoInfo) |
694df3ff |
if len(associations) == 0 { |
e57900a2 |
return fmt.Errorf("An image does not exist locally with the tag: %s", repoInfo.Name()) |
694df3ff |
}
|
a57478d6 |
var (
lastErr error
// confirmedV2 is set to true if a push attempt managed to
// confirm that it was talking to a v2 registry. This will
// prevent fallback to the v1 protocol.
confirmedV2 bool |
5e8af46f |
// confirmedTLSRegistries is a map indicating which registries
// are known to be using TLS. There should never be a plaintext
// retry for any of these.
confirmedTLSRegistries = make(map[string]struct{}) |
a57478d6 |
)
|
694df3ff |
for _, endpoint := range endpoints { |
bb37c67a |
if imagePushConfig.RequireSchema2 && endpoint.Version == registry.APIVersion1 {
continue
} |
a57478d6 |
if confirmedV2 && endpoint.Version == registry.APIVersion1 {
logrus.Debugf("Skipping v1 endpoint %s because v2 registry was detected", endpoint.URL)
continue
}
|
79db131a |
if endpoint.URL.Scheme != "https" {
if _, confirmedTLS := confirmedTLSRegistries[endpoint.URL.Host]; confirmedTLS { |
5e8af46f |
logrus.Debugf("Skipping non-TLS endpoint %s for host/port that appears to use TLS", endpoint.URL)
continue
}
}
|
ffded61d |
logrus.Debugf("Trying to push %s to %s %s", repoInfo.FullName(), endpoint.URL, endpoint.Version) |
694df3ff |
|
572ce802 |
pusher, err := NewPusher(ref, endpoint, repoInfo, imagePushConfig) |
694df3ff |
if err != nil {
lastErr = err
continue
} |
a57478d6 |
if err := pusher.Push(ctx); err != nil { |
572ce802 |
// Was this push cancelled? If so, don't try to fall
// back.
select {
case <-ctx.Done():
default: |
a57478d6 |
if fallbackErr, ok := err.(fallbackError); ok {
confirmedV2 = confirmedV2 || fallbackErr.confirmedV2 |
79db131a |
if fallbackErr.transportOK && endpoint.URL.Scheme == "https" {
confirmedTLSRegistries[endpoint.URL.Host] = struct{}{} |
5e8af46f |
} |
a57478d6 |
err = fallbackErr.err
lastErr = err |
8f26fe4f |
logrus.Errorf("Attempting next endpoint for push after error: %v", err) |
a57478d6 |
continue
} |
572ce802 |
}
|
8f26fe4f |
logrus.Errorf("Not continuing with push after error: %v", err) |
694df3ff |
return err
}
|
72f1881d |
imagePushConfig.ImageEventLogger(ref.String(), repoInfo.Name(), "push") |
694df3ff |
return nil
}
if lastErr == nil { |
ffded61d |
lastErr = fmt.Errorf("no endpoints found for %s", repoInfo.FullName()) |
694df3ff |
}
return lastErr
}
// compress returns an io.ReadCloser which will supply a compressed version of
// the provided Reader. The caller must close the ReadCloser after reading the
// compressed data.
//
// Note that this function returns a reader instead of taking a writer as an
// argument so that it can be used with httpBlobWriter's ReadFrom method.
// Using httpBlobWriter's Write method would send a PATCH request for every
// Write call. |
e273445d |
//
// The second return value is a channel that gets closed when the goroutine
// is finished. This allows the caller to make sure the goroutine finishes
// before it releases any resources connected with the reader that was
// passed in.
func compress(in io.Reader) (io.ReadCloser, chan struct{}) {
compressionDone := make(chan struct{})
|
694df3ff |
pipeReader, pipeWriter := io.Pipe()
// Use a bufio.Writer to avoid excessive chunking in HTTP request.
bufWriter := bufio.NewWriterSize(pipeWriter, compressionBufSize)
compressor := gzip.NewWriter(bufWriter)
go func() {
_, err := io.Copy(compressor, in)
if err == nil {
err = compressor.Close()
}
if err == nil {
err = bufWriter.Flush()
}
if err != nil {
pipeWriter.CloseWithError(err)
} else {
pipeWriter.Close()
} |
e273445d |
close(compressionDone) |
694df3ff |
}()
|
e273445d |
return pipeReader, compressionDone |
694df3ff |
} |