package pullprogress
import (
"context"
"io"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/remotes"
cerrdefs "github.com/containerd/errdefs"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/progress"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type PullManager interface {
content.IngestManager
content.Manager
}
type ProviderWithProgress struct {
Provider content.Provider
Manager PullManager
}
func (p *ProviderWithProgress) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {
ra, err := p.Provider.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancelCause(ctx)
doneCh := make(chan struct{})
go trackProgress(ctx, desc, p.Manager, doneCh)
return readerAtWithCancel{ReaderAt: ra, cancel: cancel, doneCh: doneCh, logger: bklog.G(ctx)}, nil
}
type readerAtWithCancel struct {
content.ReaderAt
cancel func(error)
doneCh <-chan struct{}
logger *logrus.Entry
}
func (ra readerAtWithCancel) Close() error {
ra.cancel(errors.WithStack(context.Canceled))
select {
case <-ra.doneCh:
case <-time.After(time.Second):
ra.logger.Warn("timeout waiting for pull progress to complete")
}
return ra.ReaderAt.Close()
}
type FetcherWithProgress struct {
Fetcher remotes.Fetcher
Manager PullManager
}
func (f *FetcherWithProgress) Fetch(ctx context.Context, desc ocispecs.Descriptor) (io.ReadCloser, error) {
rc, err := f.Fetcher.Fetch(ctx, desc)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancelCause(ctx)
doneCh := make(chan struct{})
go trackProgress(ctx, desc, f.Manager, doneCh)
return readerWithCancel{ReadCloser: rc, cancel: cancel, doneCh: doneCh, logger: bklog.G(ctx)}, nil
}
type readerWithCancel struct {
io.ReadCloser
cancel func(error)
doneCh <-chan struct{}
logger *logrus.Entry
}
func (r readerWithCancel) Close() error {
r.cancel(errors.WithStack(context.Canceled))
select {
case <-r.doneCh:
case <-time.After(time.Second):
r.logger.Warn("timeout waiting for pull progress to complete")
}
return r.ReadCloser.Close()
}
func trackProgress(ctx context.Context, desc ocispecs.Descriptor, manager PullManager, doneCh chan<- struct{}) {
defer close(doneCh)
ticker := time.NewTicker(150 * time.Millisecond)
defer ticker.Stop()
go func(ctx context.Context) {
<-ctx.Done()
ticker.Stop()
}(ctx)
pw, _, _ := progress.NewFromContext(ctx)
defer pw.Close()
ingestRef := remotes.MakeRefKey(ctx, desc)
started := time.Now()
onFinalStatus := false
for !onFinalStatus {
select {
case <-ctx.Done():
onFinalStatus = true
// we need a context for the manager.Status() calls to pass once. after that this function will exit
ctx = context.TODO()
case <-ticker.C:
}
status, err := manager.Status(ctx, ingestRef)
if err == nil {
pw.Write(desc.Digest.String(), progress.Status{
Current: int(status.Offset),
Total: int(status.Total),
Started: &started,
})
continue
} else if !errors.Is(err, cerrdefs.ErrNotFound) {
bklog.G(ctx).Errorf("unexpected error getting ingest status of %q: %v", ingestRef, err)
return
}
info, err := manager.Info(ctx, desc.Digest)
if err == nil {
// info.CreatedAt could be before started if parallel pull just completed
if info.CreatedAt.Before(started) {
started = info.CreatedAt
}
pw.Write(desc.Digest.String(), progress.Status{
Current: int(info.Size),
Total: int(info.Size),
Started: &started,
Completed: &info.CreatedAt,
})
return
}
}
}