package stream

import (
	"fmt"
	"io"
	"io/ioutil"
	"strings"
	"sync"

	"github.com/docker/docker/libcontainerd"
	"github.com/docker/docker/pkg/broadcaster"
	"github.com/docker/docker/pkg/ioutils"
	"github.com/docker/docker/pkg/pools"
	"github.com/sirupsen/logrus"
)

// Config holds information about I/O streams managed together.
//
// config.StdinPipe returns a WriteCloser which can be used to feed data
// to the standard input of the streamConfig's active process.
// config.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser
// which can be used to retrieve the standard output (and error) generated
// by the container's active process. The output (and error) are actually
// copied and delivered to all StdoutPipe and StderrPipe consumers, using
// a kind of "broadcaster".
type Config struct {
	sync.WaitGroup
	stdout    *broadcaster.Unbuffered
	stderr    *broadcaster.Unbuffered
	stdin     io.ReadCloser
	stdinPipe io.WriteCloser
}

// NewConfig creates a stream config and initializes
// the standard err and standard out to new unbuffered broadcasters.
func NewConfig() *Config {
	return &Config{
		stderr: new(broadcaster.Unbuffered),
		stdout: new(broadcaster.Unbuffered),
	}
}

// Stdout returns the standard output in the configuration.
func (c *Config) Stdout() *broadcaster.Unbuffered {
	return c.stdout
}

// Stderr returns the standard error in the configuration.
func (c *Config) Stderr() *broadcaster.Unbuffered {
	return c.stderr
}

// Stdin returns the standard input in the configuration.
func (c *Config) Stdin() io.ReadCloser {
	return c.stdin
}

// StdinPipe returns an input writer pipe as an io.WriteCloser.
func (c *Config) StdinPipe() io.WriteCloser {
	return c.stdinPipe
}

// StdoutPipe creates a new io.ReadCloser with an empty bytes pipe.
// It adds this new out pipe to the Stdout broadcaster.
// This will block stdout if unconsumed.
func (c *Config) StdoutPipe() io.ReadCloser {
	bytesPipe := ioutils.NewBytesPipe()
	c.stdout.Add(bytesPipe)
	return bytesPipe
}

// StderrPipe creates a new io.ReadCloser with an empty bytes pipe.
// It adds this new err pipe to the Stderr broadcaster.
// This will block stderr if unconsumed.
func (c *Config) StderrPipe() io.ReadCloser {
	bytesPipe := ioutils.NewBytesPipe()
	c.stderr.Add(bytesPipe)
	return bytesPipe
}

// NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe.
func (c *Config) NewInputPipes() {
	c.stdin, c.stdinPipe = io.Pipe()
}

// NewNopInputPipe creates a new input pipe that will silently drop all messages in the input.
func (c *Config) NewNopInputPipe() {
	c.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard)
}

// CloseStreams ensures that the configured streams are properly closed.
func (c *Config) CloseStreams() error {
	var errors []string

	if c.stdin != nil {
		if err := c.stdin.Close(); err != nil {
			errors = append(errors, fmt.Sprintf("error close stdin: %s", err))
		}
	}

	if err := c.stdout.Clean(); err != nil {
		errors = append(errors, fmt.Sprintf("error close stdout: %s", err))
	}

	if err := c.stderr.Clean(); err != nil {
		errors = append(errors, fmt.Sprintf("error close stderr: %s", err))
	}

	if len(errors) > 0 {
		return fmt.Errorf(strings.Join(errors, "\n"))
	}

	return nil
}

// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
func (c *Config) CopyToPipe(iop *libcontainerd.IOPipe) {
	copyFunc := func(w io.Writer, r io.ReadCloser) {
		c.Add(1)
		go func() {
			if _, err := pools.Copy(w, r); err != nil {
				logrus.Errorf("stream copy error: %v", err)
			}
			r.Close()
			c.Done()
		}()
	}

	if iop.Stdout != nil {
		copyFunc(c.Stdout(), iop.Stdout)
	}
	if iop.Stderr != nil {
		copyFunc(c.Stderr(), iop.Stderr)
	}

	if stdin := c.Stdin(); stdin != nil {
		if iop.Stdin != nil {
			go func() {
				pools.Copy(iop.Stdin, stdin)
				if err := iop.Stdin.Close(); err != nil {
					logrus.Warnf("failed to close stdin: %v", err)
				}
			}()
		}
	}
}