package jsonmessage
import (
"encoding/json"
"errors"
"fmt"
"io"
"iter"
"strings"
"time"
"github.com/docker/go-units"
"github.com/moby/moby/api/types/jsonstream"
"github.com/moby/term"
)
var timeNow = time.Now // For overriding in tests.
// RFC3339NanoFixed is time.RFC3339Nano with nanoseconds padded using zeros to
// ensure the formatted time is always the same number of characters.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
// DisplayOpt configures behavior for [DisplayStream] and [DisplayMessages].
// Options are applied in order; if an option returns an error, processing
// stops and the error is returned to the caller.
type DisplayOpt func(*displayOpts) error
type displayOpts struct {
auxCallback func(jsonstream.Message)
}
// WithAuxCallback registers a callback that is invoked for auxiliary
// jsonstream messages as they are processed. The callback is optional;
// if not provided, auxiliary messages are ignored.
func WithAuxCallback(fn func(jsonstream.Message)) DisplayOpt {
return func(opts *displayOpts) error {
opts.auxCallback = fn
return nil
}
}
func RenderTUIProgress(p jsonstream.Progress, width uint16) string {
var (
pbBox string
numbersBox string
)
if p.Current <= 0 && p.Total <= 0 {
return ""
}
if p.Total <= 0 {
switch p.Units {
case "":
return fmt.Sprintf("%8v", units.HumanSize(float64(p.Current)))
default:
return fmt.Sprintf("%d %s", p.Current, p.Units)
}
}
percentage := min(int(float64(p.Current)/float64(p.Total)*100)/2, 50)
if width > 110 {
// this number can't be negative gh#7136
numSpaces := max(50-percentage, 0)
pbBox = fmt.Sprintf("[%s>%s] ", strings.Repeat("=", percentage), strings.Repeat(" ", numSpaces))
}
switch {
case p.HideCounts:
case p.Units == "": // no units, use bytes
current := units.HumanSize(float64(p.Current))
total := units.HumanSize(float64(p.Total))
numbersBox = fmt.Sprintf("%8v/%v", current, total)
if p.Current > p.Total {
// remove total display if the reported current is wonky.
numbersBox = fmt.Sprintf("%8v", current)
}
default:
numbersBox = fmt.Sprintf("%d/%d %s", p.Current, p.Total, p.Units)
if p.Current > p.Total {
// remove total display if the reported current is wonky.
numbersBox = fmt.Sprintf("%d %s", p.Current, p.Units)
}
}
// Show approximation of remaining time if there's enough width.
var timeLeftBox string
if width > 50 {
if p.Current > 0 && p.Start > 0 && percentage < 50 {
fromStart := timeNow().UTC().Sub(time.Unix(p.Start, 0))
perEntry := fromStart / time.Duration(p.Current)
left := time.Duration(p.Total-p.Current) * perEntry
timeLeftBox = " " + left.Round(time.Second).String()
}
}
return pbBox + numbersBox + timeLeftBox
}
// We can probably use [aec.EmptyBuilder] for managing the output, but
// currently we're doing it all manually, so defining some consts for
// the basics we use.
//
// [aec.EmptyBuilder]: https://pkg.go.dev/github.com/morikuni/aec#EmptyBuilder
const (
ansiEraseLine = "\x1b[2K" // Erase entire line
ansiCursorUpFmt = "\x1b[%dA" // Move cursor up N lines
ansiCursorDownFmt = "\x1b[%dB" // Move cursor down N lines
)
func clearLine(out io.Writer) {
_, _ = out.Write([]byte(ansiEraseLine))
}
func cursorUp(out io.Writer, l uint) {
if l == 0 {
return
}
_, _ = fmt.Fprintf(out, ansiCursorUpFmt, l)
}
func cursorDown(out io.Writer, l uint) {
if l == 0 {
return
}
_, _ = fmt.Fprintf(out, ansiCursorDownFmt, l)
}
// Display prints the JSONMessage to out. If isTerminal is true, it erases
// the entire current line when displaying the progressbar. It returns an
// error if the [JSONMessage.Error] field is non-nil.
func Display(jm jsonstream.Message, out io.Writer, isTerminal bool, width uint16) error {
if jm.Error != nil {
return jm.Error
}
var endl string
if isTerminal && jm.Stream == "" && jm.Progress != nil {
clearLine(out)
endl = "\r"
_, _ = fmt.Fprint(out, endl)
} else if jm.Progress != nil && (jm.Progress.Current > 0 || jm.Progress.Total > 0) { // disable progressbar in non-terminal
return nil
}
if jm.ID != "" {
_, _ = fmt.Fprintf(out, "%s: ", jm.ID)
}
if jm.Progress != nil && isTerminal {
if width == 0 {
width = 200
}
_, _ = fmt.Fprintf(out, "%s %s%s", jm.Status, RenderTUIProgress(*jm.Progress, width), endl)
} else if jm.Stream != "" {
_, _ = fmt.Fprintf(out, "%s%s", jm.Stream, endl)
} else {
_, _ = fmt.Fprintf(out, "%s%s\n", jm.Status, endl)
}
return nil
}
type JSONMessagesStream = iter.Seq2[jsonstream.Message, error]
// DisplayJSONMessagesStream is like [DisplayStream], but allows the caller to
// explicitly provide the terminal file descriptor and whether out is a terminal.
func DisplayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
var opts []DisplayOpt
if auxCallback != nil {
opts = append(opts, WithAuxCallback(auxCallback))
}
return displayJSONMessagesStream(in, out, terminalFd, isTerminal, opts...)
}
// DisplayStream reads a JSON message stream from in, and writes each
// [jsonstream.Message] to out. See [DisplayMessages] for details.
func DisplayStream(in io.Reader, out io.Writer, opts ...DisplayOpt) error {
terminalFd, isTerminal := term.GetFdInfo(out)
return displayJSONMessagesStream(in, out, terminalFd, isTerminal, opts...)
}
func displayJSONMessagesStream(in io.Reader, out io.Writer, terminalFd uintptr, isTerminal bool, opts ...DisplayOpt) error {
dec := json.NewDecoder(in)
f := func(yield func(jsonstream.Message, error) bool) {
for {
var jm jsonstream.Message
err := dec.Decode(&jm)
if errors.Is(err, io.EOF) {
break
}
if !yield(jm, err) {
return
}
}
}
return displayJSONMessages(f, out, terminalFd, isTerminal, opts...)
}
// DisplayJSONMessages is like [DisplayMessages], but allows the caller to
// explicitly provide the terminal file descriptor and whether out is a terminal.
func DisplayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, terminalFd uintptr, isTerminal bool, auxCallback func(jsonstream.Message)) error {
var opts []DisplayOpt
if auxCallback != nil {
opts = append(opts, WithAuxCallback(auxCallback))
}
return displayJSONMessages(messages, out, terminalFd, isTerminal, opts...)
}
// DisplayMessages writes each [jsonstream.Message] from stream to out.
// It returns an error if an invalid [jsonstream.Message] is received, or if
// a message contains a non-zero [jsonstream.Message.Error].
//
// Presentation of the message depends on whether out is a terminal, and on the
// terminal width. Progress bars ([jsonstream.Progress]) are suppressed on
// narrower terminals (< 110 characters). If out is a terminal, it prints a
// newline ("\n") at the end of each line and moves the cursor while displaying.
//
// auxCallback allows handling the [jsonstream.Message.Aux] field. It is called
// if a message contains an Aux field, in which case DisplayMessages does not
// present the message.
func DisplayMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, opts ...DisplayOpt) error {
terminalFd, isTerminal := term.GetFdInfo(out)
return displayJSONMessages(messages, out, terminalFd, isTerminal, opts...)
}
func displayJSONMessages(messages iter.Seq2[jsonstream.Message, error], out io.Writer, terminalFd uintptr, isTerminal bool, opts ...DisplayOpt) error {
var cfg displayOpts
for _, opt := range opts {
if opt == nil {
continue
}
if err := opt(&cfg); err != nil {
return err
}
}
auxCallback := cfg.auxCallback
ids := make(map[string]uint)
var width uint16 = 200
if isTerminal {
ws, err := term.GetWinsize(terminalFd)
if err == nil {
width = ws.Width
}
}
for jm, err := range messages {
var diff uint
if err != nil {
return err
}
if jm.Aux != nil {
if auxCallback != nil {
auxCallback(jm)
}
continue
}
if jm.ID != "" && jm.Progress != nil {
line, ok := ids[jm.ID]
if !ok {
// NOTE: This approach of using len(id) to
// figure out the number of lines of history
// only works as long as we clear the history
// when we output something that's not
// accounted for in the map, such as a line
// with no ID.
line = uint(len(ids))
ids[jm.ID] = line
if isTerminal {
_, _ = fmt.Fprintf(out, "\n")
}
}
diff = uint(len(ids)) - line
if isTerminal {
cursorUp(out, diff)
}
} else {
// When outputting something that isn't progress
// output, clear the history of previous lines. We
// don't want progress entries from some previous
// operation to be updated (for example, pull -a
// with multiple tags).
ids = make(map[string]uint)
}
err := Display(jm, out, isTerminal, width)
if jm.ID != "" && isTerminal {
cursorDown(out, diff)
}
if err != nil {
return err
}
}
return nil
}