package logger import ( "io" "os" "strings" "sync" "time" "github.com/docker/docker/api/types/plugins/logdriver" "github.com/docker/docker/pkg/plugingetter" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) // pluginAdapter takes a plugin and implements the Logger interface for logger // instances type pluginAdapter struct { driverName string id string plugin logPlugin basePath string fifoPath string capabilities Capability logInfo Info // synchronize access to the log stream and shared buffer mu sync.Mutex enc logdriver.LogEntryEncoder stream io.WriteCloser // buf is shared for each `Log()` call to reduce allocations. // buf must be protected by mutex buf logdriver.LogEntry } func (a *pluginAdapter) Log(msg *Message) error { a.mu.Lock() a.buf.Line = msg.Line a.buf.TimeNano = msg.Timestamp.UnixNano() a.buf.Partial = msg.Partial a.buf.Source = msg.Source err := a.enc.Encode(&a.buf) a.buf.Reset() a.mu.Unlock() PutMessage(msg) return err } func (a *pluginAdapter) Name() string { return a.driverName } func (a *pluginAdapter) Close() error { a.mu.Lock() defer a.mu.Unlock() if err := a.plugin.StopLogging(strings.TrimPrefix(a.fifoPath, a.basePath)); err != nil { return err } if err := a.stream.Close(); err != nil { logrus.WithError(err).Error("error closing plugin fifo") } if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) { logrus.WithError(err).Error("error cleaning up plugin fifo") } // may be nil, especially for unit tests if pluginGetter != nil { pluginGetter.Get(a.Name(), extName, plugingetter.Release) } return nil } type pluginAdapterWithRead struct { *pluginAdapter } func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher { watcher := NewLogWatcher() go func() { defer close(watcher.Msg) stream, err := a.plugin.ReadLogs(a.logInfo, config) if err != nil { watcher.Err <- errors.Wrap(err, "error getting log reader") return } defer stream.Close() dec := logdriver.NewLogEntryDecoder(stream) for { select { case <-watcher.WatchClose(): return default: } var buf logdriver.LogEntry if err := dec.Decode(&buf); err != nil { if err == io.EOF { return } select { case watcher.Err <- errors.Wrap(err, "error decoding log message"): case <-watcher.WatchClose(): } return } msg := &Message{ Timestamp: time.Unix(0, buf.TimeNano), Line: buf.Line, Source: buf.Source, } // plugin should handle this, but check just in case if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) { continue } if !config.Until.IsZero() && msg.Timestamp.After(config.Until) { return } select { case watcher.Msg <- msg: case <-watcher.WatchClose(): // make sure the message we consumed is sent watcher.Msg <- msg return } } }() return watcher }