Also there is aux datastructure Copier which can copy lines from streams
to Loggers
Signed-off-by: Alexander Morozov <lk4d4@docker.com>
| 1 | 1 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,48 @@ |
| 0 |
+package logger |
|
| 1 |
+ |
|
| 2 |
+import ( |
|
| 3 |
+ "bufio" |
|
| 4 |
+ "io" |
|
| 5 |
+ "time" |
|
| 6 |
+ |
|
| 7 |
+ "github.com/Sirupsen/logrus" |
|
| 8 |
+) |
|
| 9 |
+ |
|
| 10 |
+// Copier can copy logs from specified sources to Logger and attach |
|
| 11 |
+// ContainerID and Timestamp. |
|
| 12 |
+// Writes are concurrent, so you need implement some sync in your logger |
|
| 13 |
+type Copier struct {
|
|
| 14 |
+ // cid is container id for which we copying logs |
|
| 15 |
+ cid string |
|
| 16 |
+ // srcs is map of name -> reader pairs, for example "stdout", "stderr" |
|
| 17 |
+ srcs map[string]io.Reader |
|
| 18 |
+ dst Logger |
|
| 19 |
+} |
|
| 20 |
+ |
|
| 21 |
+// NewCopier creates new Copier |
|
| 22 |
+func NewCopier(cid string, srcs map[string]io.Reader, dst Logger) (*Copier, error) {
|
|
| 23 |
+ return &Copier{
|
|
| 24 |
+ cid: cid, |
|
| 25 |
+ srcs: srcs, |
|
| 26 |
+ dst: dst, |
|
| 27 |
+ }, nil |
|
| 28 |
+} |
|
| 29 |
+ |
|
| 30 |
+// Run starts logs copying |
|
| 31 |
+func (c *Copier) Run() {
|
|
| 32 |
+ for src, w := range c.srcs {
|
|
| 33 |
+ go c.copySrc(src, w) |
|
| 34 |
+ } |
|
| 35 |
+} |
|
| 36 |
+ |
|
| 37 |
+func (c *Copier) copySrc(name string, src io.Reader) {
|
|
| 38 |
+ scanner := bufio.NewScanner(src) |
|
| 39 |
+ for scanner.Scan() {
|
|
| 40 |
+ if err := c.dst.Log(&Message{ContainerID: c.cid, Line: scanner.Bytes(), Source: name, Timestamp: time.Now().UTC()}); err != nil {
|
|
| 41 |
+ logrus.Errorf("Failed to log msg %q for logger %s: %s", scanner.Bytes(), c.dst.Name(), err)
|
|
| 42 |
+ } |
|
| 43 |
+ } |
|
| 44 |
+ if err := scanner.Err(); err != nil {
|
|
| 45 |
+ logrus.Errorf("Error scanning log stream: %s", err)
|
|
| 46 |
+ } |
|
| 47 |
+} |
| 0 | 48 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,100 @@ |
| 0 |
+package logger |
|
| 1 |
+ |
|
| 2 |
+import ( |
|
| 3 |
+ "bytes" |
|
| 4 |
+ "encoding/json" |
|
| 5 |
+ "io" |
|
| 6 |
+ "testing" |
|
| 7 |
+ "time" |
|
| 8 |
+) |
|
| 9 |
+ |
|
| 10 |
+type TestLoggerJSON struct {
|
|
| 11 |
+ *json.Encoder |
|
| 12 |
+} |
|
| 13 |
+ |
|
| 14 |
+func (l *TestLoggerJSON) Log(m *Message) error {
|
|
| 15 |
+ return l.Encode(m) |
|
| 16 |
+} |
|
| 17 |
+ |
|
| 18 |
+func (l *TestLoggerJSON) Close() error {
|
|
| 19 |
+ return nil |
|
| 20 |
+} |
|
| 21 |
+ |
|
| 22 |
+func (l *TestLoggerJSON) Name() string {
|
|
| 23 |
+ return "json" |
|
| 24 |
+} |
|
| 25 |
+ |
|
| 26 |
+type TestLoggerText struct {
|
|
| 27 |
+ *bytes.Buffer |
|
| 28 |
+} |
|
| 29 |
+ |
|
| 30 |
+func (l *TestLoggerText) Log(m *Message) error {
|
|
| 31 |
+ _, err := l.WriteString(m.ContainerID + " " + m.Source + " " + string(m.Line) + "\n") |
|
| 32 |
+ return err |
|
| 33 |
+} |
|
| 34 |
+ |
|
| 35 |
+func (l *TestLoggerText) Close() error {
|
|
| 36 |
+ return nil |
|
| 37 |
+} |
|
| 38 |
+ |
|
| 39 |
+func (l *TestLoggerText) Name() string {
|
|
| 40 |
+ return "text" |
|
| 41 |
+} |
|
| 42 |
+ |
|
| 43 |
+func TestCopier(t *testing.T) {
|
|
| 44 |
+ stdoutLine := "Line that thinks that it is log line from docker stdout" |
|
| 45 |
+ stderrLine := "Line that thinks that it is log line from docker stderr" |
|
| 46 |
+ var stdout bytes.Buffer |
|
| 47 |
+ var stderr bytes.Buffer |
|
| 48 |
+ for i := 0; i < 30; i++ {
|
|
| 49 |
+ if _, err := stdout.WriteString(stdoutLine + "\n"); err != nil {
|
|
| 50 |
+ t.Fatal(err) |
|
| 51 |
+ } |
|
| 52 |
+ if _, err := stderr.WriteString(stderrLine + "\n"); err != nil {
|
|
| 53 |
+ t.Fatal(err) |
|
| 54 |
+ } |
|
| 55 |
+ } |
|
| 56 |
+ |
|
| 57 |
+ var jsonBuf bytes.Buffer |
|
| 58 |
+ |
|
| 59 |
+ jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}
|
|
| 60 |
+ |
|
| 61 |
+ cid := "a7317399f3f857173c6179d44823594f8294678dea9999662e5c625b5a1c7657" |
|
| 62 |
+ c, err := NewCopier(cid, |
|
| 63 |
+ map[string]io.Reader{
|
|
| 64 |
+ "stdout": &stdout, |
|
| 65 |
+ "stderr": &stderr, |
|
| 66 |
+ }, |
|
| 67 |
+ jsonLog) |
|
| 68 |
+ if err != nil {
|
|
| 69 |
+ t.Fatal(err) |
|
| 70 |
+ } |
|
| 71 |
+ c.Run() |
|
| 72 |
+ time.Sleep(100 * time.Millisecond) |
|
| 73 |
+ dec := json.NewDecoder(&jsonBuf) |
|
| 74 |
+ for {
|
|
| 75 |
+ var msg Message |
|
| 76 |
+ if err := dec.Decode(&msg); err != nil {
|
|
| 77 |
+ if err == io.EOF {
|
|
| 78 |
+ break |
|
| 79 |
+ } |
|
| 80 |
+ t.Fatal(err) |
|
| 81 |
+ } |
|
| 82 |
+ if msg.Source != "stdout" && msg.Source != "stderr" {
|
|
| 83 |
+ t.Fatalf("Wrong Source: %q, should be %q or %q", msg.Source, "stdout", "stderr")
|
|
| 84 |
+ } |
|
| 85 |
+ if msg.ContainerID != cid {
|
|
| 86 |
+ t.Fatalf("Wrong ContainerID: %q, expected %q", msg.ContainerID, cid)
|
|
| 87 |
+ } |
|
| 88 |
+ if msg.Source == "stdout" {
|
|
| 89 |
+ if string(msg.Line) != stdoutLine {
|
|
| 90 |
+ t.Fatalf("Wrong Line: %q, expected %q", msg.Line, stdoutLine)
|
|
| 91 |
+ } |
|
| 92 |
+ } |
|
| 93 |
+ if msg.Source == "stderr" {
|
|
| 94 |
+ if string(msg.Line) != stderrLine {
|
|
| 95 |
+ t.Fatalf("Wrong Line: %q, expected %q", msg.Line, stderrLine)
|
|
| 96 |
+ } |
|
| 97 |
+ } |
|
| 98 |
+ } |
|
| 99 |
+} |
| 0 | 100 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,18 @@ |
| 0 |
+package logger |
|
| 1 |
+ |
|
| 2 |
+import "time" |
|
| 3 |
+ |
|
| 4 |
+// Message is datastructure that represents record from some container |
|
| 5 |
+type Message struct {
|
|
| 6 |
+ ContainerID string |
|
| 7 |
+ Line []byte |
|
| 8 |
+ Source string |
|
| 9 |
+ Timestamp time.Time |
|
| 10 |
+} |
|
| 11 |
+ |
|
| 12 |
+// Logger is interface for docker logging drivers |
|
| 13 |
+type Logger interface {
|
|
| 14 |
+ Log(*Message) error |
|
| 15 |
+ Name() string |
|
| 16 |
+ Close() error |
|
| 17 |
+} |