package content import ( "context" "io" contentapi "github.com/containerd/containerd/api/services/content/v1" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" protobuftypes "github.com/gogo/protobuf/types" digest "github.com/opencontainers/go-digest" ) type remoteStore struct { client contentapi.ContentClient } // NewStoreFromClient returns a new content store func NewStoreFromClient(client contentapi.ContentClient) content.Store { return &remoteStore{ client: client, } } func (rs *remoteStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { resp, err := rs.client.Info(ctx, &contentapi.InfoRequest{ Digest: dgst, }) if err != nil { return content.Info{}, errdefs.FromGRPC(err) } return infoFromGRPC(resp.Info), nil } func (rs *remoteStore) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error { session, err := rs.client.List(ctx, &contentapi.ListContentRequest{ Filters: filters, }) if err != nil { return errdefs.FromGRPC(err) } for { msg, err := session.Recv() if err != nil { if err != io.EOF { return errdefs.FromGRPC(err) } break } for _, info := range msg.Info { if err := fn(infoFromGRPC(info)); err != nil { return err } } } return nil } func (rs *remoteStore) Delete(ctx context.Context, dgst digest.Digest) error { if _, err := rs.client.Delete(ctx, &contentapi.DeleteContentRequest{ Digest: dgst, }); err != nil { return errdefs.FromGRPC(err) } return nil } func (rs *remoteStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { i, err := rs.Info(ctx, dgst) if err != nil { return nil, err } return &remoteReaderAt{ ctx: ctx, digest: dgst, size: i.Size, client: rs.client, }, nil } func (rs *remoteStore) Status(ctx context.Context, ref string) (content.Status, error) { resp, err := rs.client.Status(ctx, &contentapi.StatusRequest{ Ref: ref, }) if err != nil { return content.Status{}, errdefs.FromGRPC(err) } status := resp.Status return content.Status{ Ref: status.Ref, StartedAt: status.StartedAt, UpdatedAt: status.UpdatedAt, Offset: status.Offset, Total: status.Total, Expected: status.Expected, }, nil } func (rs *remoteStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) { resp, err := rs.client.Update(ctx, &contentapi.UpdateRequest{ Info: infoToGRPC(info), UpdateMask: &protobuftypes.FieldMask{ Paths: fieldpaths, }, }) if err != nil { return content.Info{}, errdefs.FromGRPC(err) } return infoFromGRPC(resp.Info), nil } func (rs *remoteStore) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) { resp, err := rs.client.ListStatuses(ctx, &contentapi.ListStatusesRequest{ Filters: filters, }) if err != nil { return nil, errdefs.FromGRPC(err) } var statuses []content.Status for _, status := range resp.Statuses { statuses = append(statuses, content.Status{ Ref: status.Ref, StartedAt: status.StartedAt, UpdatedAt: status.UpdatedAt, Offset: status.Offset, Total: status.Total, Expected: status.Expected, }) } return statuses, nil } func (rs *remoteStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { wrclient, offset, err := rs.negotiate(ctx, ref, size, expected) if err != nil { return nil, errdefs.FromGRPC(err) } return &remoteWriter{ ref: ref, client: wrclient, offset: offset, }, nil } // Abort implements asynchronous abort. It starts a new write session on the ref l func (rs *remoteStore) Abort(ctx context.Context, ref string) error { if _, err := rs.client.Abort(ctx, &contentapi.AbortRequest{ Ref: ref, }); err != nil { return errdefs.FromGRPC(err) } return nil } func (rs *remoteStore) negotiate(ctx context.Context, ref string, size int64, expected digest.Digest) (contentapi.Content_WriteClient, int64, error) { wrclient, err := rs.client.Write(ctx) if err != nil { return nil, 0, err } if err := wrclient.Send(&contentapi.WriteContentRequest{ Action: contentapi.WriteActionStat, Ref: ref, Total: size, Expected: expected, }); err != nil { return nil, 0, err } resp, err := wrclient.Recv() if err != nil { return nil, 0, err } return wrclient, resp.Offset, nil } func infoToGRPC(info content.Info) contentapi.Info { return contentapi.Info{ Digest: info.Digest, Size_: info.Size, CreatedAt: info.CreatedAt, UpdatedAt: info.UpdatedAt, Labels: info.Labels, } } func infoFromGRPC(info contentapi.Info) content.Info { return content.Info{ Digest: info.Digest, Size: info.Size_, CreatedAt: info.CreatedAt, UpdatedAt: info.UpdatedAt, Labels: info.Labels, } }