package utils
import (
"context"
"errors"
"io"
"syscall"
"github.com/containerd/log"
"github.com/moby/moby/v2/daemon/internal/progress"
"github.com/moby/moby/v2/daemon/internal/streamformatter"
)
// WriteDistributionProgress is a helper for writing progress from chan to JSON
// stream with an optional cancel function.
func WriteDistributionProgress(cancelFunc func(), outStream io.Writer, progressChan <-chan progress.Progress) {
progressOutput := streamformatter.NewJSONProgressOutput(outStream, false)
operationCancelled := false
for prog := range progressChan {
if err := progressOutput.WriteProgress(prog); err != nil && !operationCancelled {
// don't log broken pipe errors as this is the normal case when a client aborts
if errors.Is(err, syscall.EPIPE) {
log.G(context.TODO()).Info("Pull session cancelled")
} else {
log.G(context.TODO()).Errorf("error writing progress to client: %v", err)
}
cancelFunc()
operationCancelled = true
// Don't return, because we need to continue draining
// progressChan until it's closed to avoid a deadlock.
}
}
}