daemon/logger/copier.go
14887e2e
 package logger
 
 import (
 	"bufio"
f779cfc5
 	"bytes"
14887e2e
 	"io"
b6a42673
 	"sync"
14887e2e
 	"time"
 
 	"github.com/Sirupsen/logrus"
 )
 
 // Copier can copy logs from specified sources to Logger and attach
 // ContainerID and Timestamp.
 // Writes are concurrent, so you need implement some sync in your logger
 type Copier struct {
41d85c01
 	// cid is the container id for which we are copying logs
14887e2e
 	cid string
 	// srcs is map of name -> reader pairs, for example "stdout", "stderr"
b6a42673
 	srcs     map[string]io.Reader
 	dst      Logger
 	copyJobs sync.WaitGroup
bfa80edf
 	closed   chan struct{}
14887e2e
 }
 
41d85c01
 // NewCopier creates a new Copier
 func NewCopier(cid string, srcs map[string]io.Reader, dst Logger) *Copier {
14887e2e
 	return &Copier{
bfa80edf
 		cid:    cid,
 		srcs:   srcs,
 		dst:    dst,
 		closed: make(chan struct{}),
41d85c01
 	}
14887e2e
 }
 
 // Run starts logs copying
 func (c *Copier) Run() {
 	for src, w := range c.srcs {
b6a42673
 		c.copyJobs.Add(1)
14887e2e
 		go c.copySrc(src, w)
 	}
 }
 
 func (c *Copier) copySrc(name string, src io.Reader) {
b6a42673
 	defer c.copyJobs.Done()
f779cfc5
 	reader := bufio.NewReader(src)
 
 	for {
bfa80edf
 		select {
 		case <-c.closed:
 			return
 		default:
 			line, err := reader.ReadBytes('\n')
 			line = bytes.TrimSuffix(line, []byte{'\n'})
f779cfc5
 
bfa80edf
 			// ReadBytes can return full or partial output even when it failed.
 			// e.g. it can return a full entry and EOF.
 			if err == nil || len(line) > 0 {
 				if logErr := c.dst.Log(&Message{ContainerID: c.cid, Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
 					logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
 				}
f779cfc5
 			}
 
bfa80edf
 			if err != nil {
 				if err != io.EOF {
 					logrus.Errorf("Error scanning log stream: %s", err)
 				}
 				return
f779cfc5
 			}
 		}
14887e2e
 	}
 }
b6a42673
 
 // Wait blocks until every copy goroutine started by Run has returned.
 func (c *Copier) Wait() {
 	c.copyJobs.Wait()
 }
bfa80edf
 
 // Close tells the copy goroutines to stop by closing the closed channel.
 // Calling Close again after it has returned does nothing. The check and the
 // close are not atomic, so Close should not be called from several
 // goroutines at the same time.
 func (c *Copier) Close() {
 	select {
 	case <-c.closed:
 		// Already closed; closing the channel a second time would panic.
 		return
 	default:
 	}
 	close(c.closed)
 }