package client
import (
"context"
"encoding/json"
"errors"
"io"
"iter"
"net/url"
"strings"
"sync"
cerrdefs "github.com/containerd/errdefs"
"github.com/distribution/reference"
"github.com/moby/moby/client/pkg/jsonmessage"
)
func newImagePullResponse(rc io.ReadCloser) ImagePullResponse {
if rc == nil {
panic("nil io.ReadCloser")
}
return ImagePullResponse{
rc: rc,
close: sync.OnceValue(rc.Close),
}
}
type ImagePullResponse struct {
rc io.ReadCloser
close func() error
}
// Read implements io.ReadCloser
func (r ImagePullResponse) Read(p []byte) (n int, err error) {
if r.rc == nil {
return 0, io.EOF
}
return r.rc.Read(p)
}
// Close implements io.ReadCloser
func (r ImagePullResponse) Close() error {
if r.close == nil {
return nil
}
return r.close()
}
// JSONMessages decodes the response stream as a sequence of JSONMessages.
// if stream ends or context is cancelled, the underlying [io.Reader] is closed.
func (r ImagePullResponse) JSONMessages(ctx context.Context) iter.Seq2[jsonmessage.JSONMessage, error] {
context.AfterFunc(ctx, func() {
_ = r.Close()
})
dec := json.NewDecoder(r)
return func(yield func(jsonmessage.JSONMessage, error) bool) {
defer r.Close()
for {
var jm jsonmessage.JSONMessage
err := dec.Decode(&jm)
if errors.Is(err, io.EOF) {
break
}
if ctx.Err() != nil {
yield(jm, ctx.Err())
return
}
if !yield(jm, err) {
return
}
}
}
}
// ImagePull requests the docker host to pull an image from a remote registry.
// It executes the privileged function if the operation is unauthorized
// and it tries one more time.
// Callers can use [ImagePullResponse.JSONMessages] to monitor pull progress as
// a sequence of JSONMessages, [ImagePullResponse.Close] does not need to be
// called in this case. Or, use the [io.Reader] interface and call
// [ImagePullResponse.Close] after processing.
func (cli *Client) ImagePull(ctx context.Context, refStr string, options ImagePullOptions) (ImagePullResponse, error) {
// FIXME(vdemeester): there is currently used in a few way in docker/docker
// - if not in trusted content, ref is used to pass the whole reference, and tag is empty
// - if in trusted content, ref is used to pass the reference name, and tag for the digest
//
// ref; https://github.com/docker-archive-public/docker.engine-api/pull/162
ref, err := reference.ParseNormalizedNamed(refStr)
if err != nil {
return ImagePullResponse{}, err
}
query := url.Values{}
query.Set("fromImage", ref.Name())
if !options.All {
query.Set("tag", getAPITagFromNamedRef(ref))
}
if options.Platform != "" {
query.Set("platform", strings.ToLower(options.Platform))
}
resp, err := cli.tryImageCreate(ctx, query, staticAuth(options.RegistryAuth))
if cerrdefs.IsUnauthorized(err) && options.PrivilegeFunc != nil {
resp, err = cli.tryImageCreate(ctx, query, options.PrivilegeFunc)
}
if err != nil {
return ImagePullResponse{}, err
}
return newImagePullResponse(resp.Body), nil
}
// getAPITagFromNamedRef returns a tag from the specified reference.
// This function is necessary as long as the docker "server" api expects
// digests to be sent as tags and makes a distinction between the name
// and tag/digest part of a reference.
func getAPITagFromNamedRef(ref reference.Named) string {
if digested, ok := ref.(reference.Digested); ok {
return digested.Digest().String()
}
ref = reference.TagNameOnly(ref)
if tagged, ok := ref.(reference.Tagged); ok {
return tagged.Tag()
}
return ""
}