engine/streams.go
a4f8a249
 package engine
 
 import (
e304e3a6
 	"bytes"
a4f8a249
 	"fmt"
 	"io"
f41e0cf0
 	"io/ioutil"
a4f8a249
 	"sync"
 )
 
 // Output is a writer that duplicates every Write to a list of registered
 // destinations and tracks background tasks that consume its data.
 type Output struct {
 	// Mutex is locked by Used, Add, Set, Write and Close while they touch
 	// dests and used.
 	sync.Mutex
 	// dests holds the destinations that receive every Write.
 	dests []io.Writer
 	// tasks counts the background goroutines started by AddEnv,
 	// AddListTable and AddTable; Close waits on it.
 	tasks sync.WaitGroup
a33bc301
 	// used is set to true by Write and reported by Used.
 	used  bool
a4f8a249
 }
 
e304e3a6
 // Tail returns the last n lines held in buffer as a single string.
 // A single trailing '\n', if present, is dropped before lines are counted.
 // When n <= 0 the result is the empty string; when the buffer holds fewer
 // than n lines, the whole (trimmed) content is returned.
 func Tail(buffer *bytes.Buffer, n int) string {
 	if n <= 0 {
 		return ""
 	}
 	data := buffer.Bytes()
 	if last := len(data) - 1; last >= 0 && data[last] == '\n' {
 		data = data[:last]
 	}
 	// Scan backwards for the n-th line break. Starting at Len()-2 is always
 	// in range for data: either the trailing newline was trimmed, or the
 	// final byte is known not to be a newline and can be skipped.
 	remaining := n
 	for pos := buffer.Len() - 2; pos >= 0; pos-- {
 		if data[pos] != '\n' {
 			continue
 		}
 		remaining--
 		if remaining == 0 {
 			return string(data[pos+1:])
 		}
 	}
 	return string(data)
 }
 
a4f8a249
 // NewOutput allocates an Output that has no destinations registered yet.
 // Until a destination is added, anything written to it is discarded.
 func NewOutput() *Output {
 	return new(Output)
 }
 
a33bc301
 // Return true if something was written on this output
 func (o *Output) Used() bool {
42e35ecf
 	o.Lock()
 	defer o.Unlock()
a33bc301
 	return o.used
 }
 
a4f8a249
 // Add attaches a new destination to the Output. Any data subsequently written
 // to the output will be written to the new destination in addition to all the others.
 // This method is thread-safe.
e7a9d236
 func (o *Output) Add(dst io.Writer) {
42e35ecf
 	o.Lock()
 	defer o.Unlock()
a4f8a249
 	o.dests = append(o.dests, dst)
e7a9d236
 }
 
 // Set closes and remove existing destination and then attaches a new destination to
 // the Output. Any data subsequently written to the output will be written to the new
 // destination in addition to all the others. This method is thread-safe.
 func (o *Output) Set(dst io.Writer) {
 	o.Close()
42e35ecf
 	o.Lock()
 	defer o.Unlock()
e7a9d236
 	o.dests = []io.Writer{dst}
a4f8a249
 }
 
 // AddPipe builds an in-memory pipe with io.Pipe(), registers the pipe's
 // write end as a destination, and hands the read end back to the caller.
 // It plays a role comparable to Cmd.StdoutPipe() in os/exec.
 // The returned error is always nil.
 func (o *Output) AddPipe() (io.Reader, error) {
 	reader, writer := io.Pipe()
 	o.Add(writer)
 	return reader, nil
 }
 
 // Write writes the same data to all registered destinations.
 // This method is thread-safe.
 func (o *Output) Write(p []byte) (n int, err error) {
42e35ecf
 	o.Lock()
 	defer o.Unlock()
a33bc301
 	o.used = true
a4f8a249
 	var firstErr error
 	for _, dst := range o.dests {
 		_, err := dst.Write(p)
 		if err != nil && firstErr == nil {
 			firstErr = err
 		}
 	}
35d54c66
 	return len(p), firstErr
a4f8a249
 }
 
 // Close unregisters all destinations and waits for all background
 // AddTail and AddString tasks to complete.
 // The Close method of each destination is called if it exists.
 func (o *Output) Close() error {
42e35ecf
 	o.Lock()
 	defer o.Unlock()
a4f8a249
 	var firstErr error
 	for _, dst := range o.dests {
170e4d2e
 		if closer, ok := dst.(io.Closer); ok {
a4f8a249
 			err := closer.Close()
 			if err != nil && firstErr == nil {
 				firstErr = err
 			}
 		}
 	}
 	o.tasks.Wait()
 	return firstErr
 }
 
 // Input is a reader wrapper around a single, optional source.
 type Input struct {
 	// src is the source installed by Add; nil until Add is called.
 	src io.Reader
 	// Mutex is locked by Read and Add; Close does not take it.
 	sync.Mutex
 }
 
 // NewInput allocates an Input that has no source attached.
 // Reading from it before a source is added yields io.EOF.
 func NewInput() *Input {
 	return new(Input)
 }
 
 // Read reads from the attached source while holding the Input's lock.
 // If no source has been attached, it returns 0 and io.EOF.
 func (i *Input) Read(p []byte) (n int, err error) {
 	i.Lock()
 	defer i.Unlock()
 	if source := i.src; source != nil {
 		return source.Read(p)
 	}
 	return 0, io.EOF
 }
 
e1d8543c
 // Closes the src
 // Not thread safe on purpose
 func (i *Input) Close() error {
 	if i.src != nil {
170e4d2e
 		if closer, ok := i.src.(io.Closer); ok {
e1d8543c
 			return closer.Close()
 		}
 	}
 	return nil
 }
 
a4f8a249
 // Add installs src as the source of the Input while holding its lock.
 // Only one source is allowed: once a non-nil source is in place, further
 // calls leave it untouched and return an error.
 func (i *Input) Add(src io.Reader) error {
 	i.Lock()
 	defer i.Unlock()
 	if i.src == nil {
 		i.src = src
 		return nil
 	}
 	return fmt.Errorf("Maximum number of sources reached: 1")
 }
 
a7a171b6
 // AddEnv starts a new goroutine which will decode all subsequent data
 // as a stream of json-encoded objects, and point `dst` to the last
 // decoded object.
bef8de93
 // The result `env` can be queried using the type-neutral Env interface.
a7a171b6
 // It is not safe to query `env` until the Output is closed.
 func (o *Output) AddEnv() (dst *Env, err error) {
 	src, err := o.AddPipe()
 	if err != nil {
 		return nil, err
 	}
 	dst = &Env{}
 	o.tasks.Add(1)
 	go func() {
 		defer o.tasks.Done()
 		decoder := NewDecoder(src)
 		for {
 			env, err := decoder.Decode()
 			if err != nil {
 				return
 			}
bef8de93
 			*dst = *env
a7a171b6
 		}
 	}()
 	return dst, nil
 }
17a806c8
 
e3461bc8
 // AddListTable attaches a pipe to the Output and starts a goroutine that
 // reads everything written to it until the pipe ends, then passes the
 // collected bytes to ReadListFrom on the returned Table.
 // Read and parse errors are silently dropped. The goroutine is registered
 // with the Output's task group, so Close waits for it.
 // The returned error comes from AddPipe.
 func (o *Output) AddListTable() (dst *Table, err error) {
 	pipe, err := o.AddPipe()
 	if err != nil {
 		return nil, err
 	}
 	dst = NewTable("", 0)
 	o.tasks.Add(1)
 	go func() {
 		defer o.tasks.Done()
 		raw, readErr := ioutil.ReadAll(pipe)
 		if readErr == nil {
 			// Best effort: the parse result and error are intentionally ignored.
 			dst.ReadListFrom(raw)
 		}
 	}()
 	return dst, nil
 }
 
17a806c8
 // AddTable attaches a pipe to the Output and starts a goroutine that feeds
 // the pipe's read end to ReadFrom on the returned Table.
 // Errors from ReadFrom are not reported. The goroutine is registered with
 // the Output's task group, so Close waits for it.
 // The returned error comes from AddPipe.
 func (o *Output) AddTable() (dst *Table, err error) {
 	src, err := o.AddPipe()
 	if err != nil {
 		return nil, err
 	}
 	dst = NewTable("", 0)
 	o.tasks.Add(1)
 	go func() {
 		defer o.tasks.Done()
 		if _, err := dst.ReadFrom(src); err != nil {
 			// ReadFrom may have stopped before the end of the stream. Keep
 			// draining the pipe so later writes to the Output do not block
 			// forever on an unread io.Pipe. If the pipe is already finished
 			// this returns immediately.
 			_, _ = io.Copy(ioutil.Discard, src)
 		}
 	}()
 	return dst, nil
 }