// Package streamformatter provides helper functions to format a stream.
package streamformatter
import (
"encoding/json"
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/docker/go-units"
"github.com/moby/moby/api/types/jsonstream"
"github.com/moby/moby/client/pkg/progress"
)
// streamNewline is the line terminator (carriage return + line feed) that the
// formatters in this file append to the messages they emit.
const streamNewline = "\r\n"
// appendNewline appends a carriage return and line feed to source and
// returns the resulting slice. As with the built-in append, the result may
// share its backing array with source.
func appendNewline(source []byte) []byte {
	return append(source, "\r\n"...)
}
// jsonProgressFormatter formats progress updates as JSON-encoded
// jsonstream.Message objects, each terminated by "\r\n".
type jsonProgressFormatter struct{}
// format encodes prog as a single JSON message. An update that carries a
// Message is emitted as a status message (ID and text only); any other update
// is emitted as a progress message with its counters and auxiliary data.
// It returns nil when called on a nil receiver.
func (sf *jsonProgressFormatter) format(prog progress.Progress) []byte {
	switch {
	case sf == nil:
		return nil
	case prog.Message != "":
		return sf.formatStatus(prog.ID, prog.Message)
	}

	return sf.formatProgress(prog.ID, prog.Action, &jsonstream.Progress{
		Current:    prog.Current,
		Total:      prog.Total,
		HideCounts: prog.HideCounts,
		Units:      prog.Units,
	}, prog.Aux)
}
// emptyMessage returns an empty JSON object followed by the stream newline.
func (sf *jsonProgressFormatter) emptyMessage() []byte {
	const emptyObject = "{}"
	return []byte(emptyObject + streamNewline)
}
// formatStatus encodes a status message carrying only the given id and status
// text, terminated by a newline. It returns nil if encoding fails.
func (sf *jsonProgressFormatter) formatStatus(id, status string) []byte {
	msg := jsonstream.Message{
		ID:     id,
		Status: status,
	}
	encoded, err := json.Marshal(&msg)
	if err != nil {
		// Not expected for a message that only carries two strings.
		return nil
	}
	return appendNewline(encoded)
}
// formatProgress encodes a progress message for the given id and action,
// terminated by a newline.
//
// A nil progress is replaced by an empty jsonstream.Progress. A non-nil aux is
// JSON-encoded and attached to the message as raw JSON. It returns nil if
// either aux or the message itself cannot be encoded.
func (sf *jsonProgressFormatter) formatProgress(id, action string, progress *jsonstream.Progress, aux any) []byte {
	var rawAux *json.RawMessage
	if aux != nil {
		encodedAux, err := json.Marshal(aux)
		if err != nil {
			return nil
		}
		rawAux = (*json.RawMessage)(&encodedAux)
	}

	detail := progress
	if detail == nil {
		detail = &jsonstream.Progress{}
	}

	msg := jsonstream.Message{
		ID:       id,
		Status:   action,
		Progress: detail,
		Aux:      rawAux,
	}
	encoded, err := json.Marshal(&msg)
	if err != nil {
		return nil
	}
	return appendNewline(encoded)
}
// rawProgressFormatter formats progress updates as plain text rather than
// JSON.
type rawProgressFormatter struct{}
// format renders prog as plain text. An update that carries a Message is
// emitted as that message followed by the stream newline; any other update is
// emitted as the action followed by a textual progress indicator.
// It returns nil when called on a nil receiver.
func (sf *rawProgressFormatter) format(prog progress.Progress) []byte {
	switch {
	case sf == nil:
		return nil
	case prog.Message != "":
		return []byte(prog.Message + streamNewline)
	}

	// TODO(thaJeztah): ID and Aux are not printed by the rawProgressFormatter; should they?
	detail := jsonstream.Progress{
		Current:    prog.Current,
		Total:      prog.Total,
		HideCounts: prog.HideCounts,
		Units:      prog.Units,
	}
	return sf.formatProgress(prog.Action, &detail)
}
// emptyMessage returns a bare stream newline.
func (sf *rawProgressFormatter) emptyMessage() []byte {
	blank := []byte(streamNewline)
	return blank
}
// rawProgressString renders p as a textual progress indicator.
//
// It returns an empty string when p is nil or when neither Current nor Total
// is positive. When Total is not positive, only the current amount is shown:
// as a human-readable size if Units is empty, or as "<current> <units>"
// otherwise. When Total is positive, the result is a 50-character progress
// bar followed by the counts (unless HideCounts is set) and, if Start is set
// and the bar is not yet full, an estimate of the time remaining.
func rawProgressString(p *jsonstream.Progress) string {
	if p == nil || (p.Current <= 0 && p.Total <= 0) {
		return ""
	}
	if p.Total <= 0 {
		switch p.Units {
		case "":
			return fmt.Sprintf("%8v", units.HumanSize(float64(p.Current)))
		default:
			return fmt.Sprintf("%d %s", p.Current, p.Units)
		}
	}

	// The bar is 50 characters wide, so each character represents 2%.
	//
	// Clamp on the floating-point ratio before converting to int: a negative
	// Current would otherwise yield a negative count and make strings.Repeat
	// panic, and converting a value too large for int has an
	// implementation-dependent result that can also be negative.
	var percentage int
	switch ratio := float64(p.Current) / float64(p.Total); {
	case ratio >= 1:
		percentage = 50
	case ratio > 0:
		percentage = min(int(ratio*100)/2, 50)
	}
	numSpaces := 50 - percentage
	pbBox := fmt.Sprintf("[%s>%s] ", strings.Repeat("=", percentage), strings.Repeat(" ", numSpaces))

	var numbersBox string
	switch {
	case p.HideCounts:
		// Counts were explicitly suppressed; show the bar only.
	case p.Units == "": // no units, use bytes
		current := units.HumanSize(float64(p.Current))
		total := units.HumanSize(float64(p.Total))
		numbersBox = fmt.Sprintf("%8v/%v", current, total)
		if p.Current > p.Total {
			// remove total display if the reported current is wonky.
			numbersBox = fmt.Sprintf("%8v", current)
		}
	default:
		numbersBox = fmt.Sprintf("%d/%d %s", p.Current, p.Total, p.Units)
		if p.Current > p.Total {
			// remove total display if the reported current is wonky.
			numbersBox = fmt.Sprintf("%d %s", p.Current, p.Units)
		}
	}

	// FIXME(thaJeztah): p.Start is never set, because progress.Progress doesn't have this field
	var timeLeftBox string
	if p.Current > 0 && p.Start > 0 && percentage < 50 {
		// Extrapolate linearly from the average time spent per unit so far.
		fromStart := time.Since(time.Unix(p.Start, 0))
		perEntry := fromStart / time.Duration(p.Current)
		left := time.Duration(p.Total-p.Current) * perEntry
		timeLeftBox = " " + left.Round(time.Second).String()
	}
	return pbBox + numbersBox + timeLeftBox
}
// formatProgress renders the action followed by a space and the textual
// progress indicator for progress.
//
// When there is an indicator to show, the line ends with a bare carriage
// return; when the indicator is empty, the line ends with "\r\n" instead.
func (sf *rawProgressFormatter) formatProgress(action string, progress *jsonstream.Progress) []byte {
	indicator := rawProgressString(progress)
	if indicator == "" {
		return []byte(action + " " + "\r\n")
	}
	return []byte(action + " " + indicator + "\r")
}
// NewProgressOutput returns a progress.Output that writes plain-text progress
// to out. The returned value can be passed to progress.NewProgressReader.
func NewProgressOutput(out io.Writer) progress.Output {
	po := new(progressOutput)
	po.out = out
	po.sf = &rawProgressFormatter{}
	po.newLines = true
	return po
}
// NewJSONProgressOutput returns a progress.Output that writes progress to out
// as JSON objects. When newLines is true, an additional empty message is
// written after an update that is flagged as the last one.
func NewJSONProgressOutput(out io.Writer, newLines bool) progress.Output {
	po := new(progressOutput)
	po.out = out
	po.sf = &jsonProgressFormatter{}
	po.newLines = newLines
	return po
}
// formatProgress is the formatting strategy used by progressOutput. It is
// implemented in this file by jsonProgressFormatter and rawProgressFormatter.
type formatProgress interface {
// format formats progress information from a ProgressReader and returns
// the bytes to write to the output.
format(prog progress.Progress) []byte
// emptyMessage returns the formatter's representation of an empty message.
// progressOutput writes it after the last update when it is configured to
// emit a trailing newline.
emptyMessage() []byte
}
// progressOutput formats progress updates with sf and writes them to out.
type progressOutput struct {
// sf converts each progress update into the bytes to write.
sf formatProgress
// out is the destination for the formatted output.
out io.Writer
// newLines controls whether the formatter's empty message is written after
// an update that is flagged as the last one.
//
// TODO(thaJeztah): investigate if this can be removed or replaced.
//
// It was a workaround for responses adding an extra (final) (aux) message
// progress; see https://github.com/moby/moby/pull/1425. When updating, also
// check for the similar implementation in daemon/internal/streamformatter.
newLines bool
// mu is held by WriteProgress while it writes to out.
mu sync.Mutex
}
// WriteProgress formats prog and writes the result to the underlying writer.
//
// Formatting happens before the lock is taken; the writes themselves are
// performed while holding the mutex. When the output is configured to emit
// trailing newlines and prog is flagged as the last update, the formatter's
// empty message is written as well. The first write error is returned.
func (out *progressOutput) WriteProgress(prog progress.Progress) error {
	payload := out.sf.format(prog)

	out.mu.Lock()
	defer out.mu.Unlock()

	if _, err := out.out.Write(payload); err != nil {
		return err
	}
	if !out.newLines || !prog.LastUpdate {
		return nil
	}
	_, err := out.out.Write(out.sf.emptyMessage())
	return err
}