Browse code

add UDP GELF logging-driver

allows to send container logs to Graylog or Logstash.

Signed-off-by: Marius Sturm <marius@graylog.com>

Marius Sturm authored on 2015/05/30 06:00:46
Showing 17 changed files
... ...
@@ -689,9 +689,14 @@ func (container *Container) getLogger() (logger.Logger, error) {
689 689
 		return nil, fmt.Errorf("Failed to get logging factory: %v", err)
690 690
 	}
691 691
 	ctx := logger.Context{
692
-		Config:        cfg.Config,
693
-		ContainerID:   container.ID,
694
-		ContainerName: container.Name,
692
+		Config:              cfg.Config,
693
+		ContainerID:         container.ID,
694
+		ContainerName:       container.Name,
695
+		ContainerEntrypoint: container.Path,
696
+		ContainerArgs:       container.Args,
697
+		ContainerImageID:    container.ImageID,
698
+		ContainerImageName:  container.Config.Image,
699
+		ContainerCreated:    container.Created,
695 700
 	}
696 701
 
697 702
 	// Set logging file for "json-logger"
... ...
@@ -3,6 +3,7 @@ package daemon
3 3
 // Importing packages here only to make sure their init gets called and
4 4
 // therefore they register themselves to the logdriver factory.
5 5
 import (
6
+	_ "github.com/docker/docker/daemon/logger/gelf"
6 7
 	_ "github.com/docker/docker/daemon/logger/journald"
7 8
 	_ "github.com/docker/docker/daemon/logger/jsonfilelog"
8 9
 	_ "github.com/docker/docker/daemon/logger/syslog"
... ...
@@ -2,7 +2,10 @@ package logger
2 2
 
3 3
 import (
4 4
 	"fmt"
5
+	"os"
6
+	"strings"
5 7
 	"sync"
8
+	"time"
6 9
 )
7 10
 
8 11
 // Creator is a method that builds a logging driver instance with given context
... ...
@@ -10,10 +13,32 @@ type Creator func(Context) (Logger, error)
10 10
 
11 11
 // Context provides enough information for a logging driver to do its function
12 12
 type Context struct {
13
-	Config        map[string]string
14
-	ContainerID   string
15
-	ContainerName string
16
-	LogPath       string
13
+	Config              map[string]string
14
+	ContainerID         string
15
+	ContainerName       string
16
+	ContainerEntrypoint string
17
+	ContainerArgs       []string
18
+	ContainerImageID    string
19
+	ContainerImageName  string
20
+	ContainerCreated    time.Time
21
+	LogPath             string
22
+}
23
+
24
+func (ctx *Context) Hostname() (string, error) {
25
+	hostname, err := os.Hostname()
26
+	if err != nil {
27
+		return "", fmt.Errorf("logger: can not resolve hostname: %v", err)
28
+	}
29
+	return hostname, nil
30
+}
31
+
32
+func (ctx *Context) Command() string {
33
+	terms := []string{ctx.ContainerEntrypoint}
34
+	for _, arg := range ctx.ContainerArgs {
35
+		terms = append(terms, arg)
36
+	}
37
+	command := strings.Join(terms, " ")
38
+	return command
17 39
 }
18 40
 
19 41
 type logdriverFactory struct {
20 42
new file mode 100644
... ...
@@ -0,0 +1,149 @@
0
+// +build linux
1
+
2
+package gelf
3
+
4
+import (
5
+	"bytes"
6
+	"fmt"
7
+	"io"
8
+	"net"
9
+	"net/url"
10
+	"time"
11
+
12
+	"github.com/Graylog2/go-gelf/gelf"
13
+	"github.com/Sirupsen/logrus"
14
+	"github.com/docker/docker/daemon/logger"
15
+	"github.com/docker/docker/pkg/urlutil"
16
+)
17
+
18
+const name = "gelf"
19
+
20
+type GelfLogger struct {
21
+	writer *gelf.Writer
22
+	ctx    logger.Context
23
+	fields GelfFields
24
+}
25
+
26
+type GelfFields struct {
27
+	hostname      string
28
+	containerId   string
29
+	containerName string
30
+	imageId       string
31
+	imageName     string
32
+	command       string
33
+	tag           string
34
+	created       time.Time
35
+}
36
+
37
+func init() {
38
+	if err := logger.RegisterLogDriver(name, New); err != nil {
39
+		logrus.Fatal(err)
40
+	}
41
+}
42
+
43
+func New(ctx logger.Context) (logger.Logger, error) {
44
+	// parse gelf address
45
+	address, err := parseAddress(ctx.Config["gelf-address"])
46
+	if err != nil {
47
+		return nil, err
48
+	}
49
+
50
+	// collect extra data for GELF message
51
+	hostname, err := ctx.Hostname()
52
+	if err != nil {
53
+		return nil, fmt.Errorf("gelf: cannot access hostname to set source field")
54
+	}
55
+
56
+	// remove trailing slash from container name
57
+	containerName := bytes.TrimLeft([]byte(ctx.ContainerName), "/")
58
+
59
+	fields := GelfFields{
60
+		hostname:      hostname,
61
+		containerId:   ctx.ContainerID,
62
+		containerName: string(containerName),
63
+		imageId:       ctx.ContainerImageID,
64
+		imageName:     ctx.ContainerImageName,
65
+		command:       ctx.Command(),
66
+		tag:           ctx.Config["gelf-tag"],
67
+		created:       ctx.ContainerCreated,
68
+	}
69
+
70
+	// create new gelfWriter
71
+	gelfWriter, err := gelf.NewWriter(address)
72
+	if err != nil {
73
+		return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err)
74
+	}
75
+
76
+	return &GelfLogger{
77
+		writer: gelfWriter,
78
+		ctx:    ctx,
79
+		fields: fields,
80
+	}, nil
81
+}
82
+
83
+func (s *GelfLogger) Log(msg *logger.Message) error {
84
+	// remove trailing and leading whitespace
85
+	short := bytes.TrimSpace([]byte(msg.Line))
86
+
87
+	level := gelf.LOG_INFO
88
+	if msg.Source == "stderr" {
89
+		level = gelf.LOG_ERR
90
+	}
91
+
92
+	m := gelf.Message{
93
+		Version:  "1.1",
94
+		Host:     s.fields.hostname,
95
+		Short:    string(short),
96
+		TimeUnix: float64(msg.Timestamp.UnixNano()/int64(time.Millisecond)) / 1000.0,
97
+		Level:    level,
98
+		Extra: map[string]interface{}{
99
+			"_container_id":   s.fields.containerId,
100
+			"_container_name": s.fields.containerName,
101
+			"_image_id":       s.fields.imageId,
102
+			"_image_name":     s.fields.imageName,
103
+			"_command":        s.fields.command,
104
+			"_tag":            s.fields.tag,
105
+			"_created":        s.fields.created,
106
+		},
107
+	}
108
+
109
+	if err := s.writer.WriteMessage(&m); err != nil {
110
+		return fmt.Errorf("gelf: cannot send GELF message: %v", err)
111
+	}
112
+	return nil
113
+}
114
+
115
+func (s *GelfLogger) GetReader() (io.Reader, error) {
116
+	return nil, logger.ReadLogsNotSupported
117
+}
118
+
119
+func (s *GelfLogger) Close() error {
120
+	return s.writer.Close()
121
+}
122
+
123
+func (s *GelfLogger) Name() string {
124
+	return name
125
+}
126
+
127
+func parseAddress(address string) (string, error) {
128
+	if urlutil.IsTransportURL(address) {
129
+		url, err := url.Parse(address)
130
+		if err != nil {
131
+			return "", err
132
+		}
133
+
134
+		// we support only udp
135
+		if url.Scheme != "udp" {
136
+			return "", fmt.Errorf("gelf: endpoint needs to be UDP")
137
+		}
138
+
139
+		// get host and port
140
+		if _, _, err = net.SplitHostPort(url.Host); err != nil {
141
+			return "", fmt.Errorf("gelf: please provide gelf-address as udp://host:port")
142
+		}
143
+
144
+		return url.Host, nil
145
+	}
146
+
147
+	return "", nil
148
+}
0 149
new file mode 100644
... ...
@@ -0,0 +1,3 @@
0
+// +build !linux
1
+
2
+package gelf
... ...
@@ -155,7 +155,7 @@ two memory nodes.
155 155
 **--lxc-conf**=[]
156 156
    (lxc exec-driver only) Add custom lxc options --lxc-conf="lxc.cgroup.cpuset.cpus = 0,1"
157 157
 
158
-**--log-driver**="|*json-file*|*syslog*|*journald*|*none*"
158
+**--log-driver**="|*json-file*|*syslog*|*journald*|*gelf*|*none*"
159 159
   Logging driver for container. Default is defined by daemon `--log-driver` flag.
160 160
   **Warning**: `docker logs` command works only for `json-file` logging driver.
161 161
 
... ...
@@ -252,7 +252,7 @@ which interface and port to use.
252 252
 **--lxc-conf**=[]
253 253
    (lxc exec-driver only) Add custom lxc options --lxc-conf="lxc.cgroup.cpuset.cpus = 0,1"
254 254
 
255
-**--log-driver**="|*json-file*|*syslog*|*journald*|*none*"
255
+**--log-driver**="|*json-file*|*syslog*|*journald*|*gelf*|*none*"
256 256
   Logging driver for container. Default is defined by daemon `--log-driver` flag.
257 257
   **Warning**: `docker logs` command works only for `json-file` logging driver.
258 258
 
... ...
@@ -103,7 +103,7 @@ unix://[/path/to/socket] to use.
103 103
 **--label**="[]"
104 104
   Set key=value labels to the daemon (displayed in `docker info`)
105 105
 
106
-**--log-driver**="*json-file*|*syslog*|*journald*|*none*"
106
+**--log-driver**="*json-file*|*syslog*|*journald*|*gelf*|*none*"
107 107
   Default driver for container logs. Default is `json-file`.
108 108
   **Warning**: `docker logs` command works only for `json-file` logging driver.
109 109
 
... ...
@@ -269,7 +269,7 @@ Json Parameters:
269 269
         systems, such as SELinux.
270 270
     -   **LogConfig** - Log configuration for the container, specified as a JSON object in the form
271 271
           `{ "Type": "<driver_name>", "Config": {"key1": "val1"}}`.
272
-          Available types: `json-file`, `syslog`, `journald`, `none`.
272
+          Available types: `json-file`, `syslog`, `journald`, `gelf`, `none`.
273 273
           `json-file` logging driver.
274 274
     -   **CgroupParent** - Path to `cgroups` under which the container's `cgroup` is created. If the path is not absolute, the path is considered to be relative to the `cgroups` path of the init process. Cgroups are created if they do not already exist.
275 275
 
... ...
@@ -906,6 +906,25 @@ reference documentation.
906 906
 
907 907
 The following logging options are supported for this logging driver: [none]
908 908
 
909
+#### Logging driver: gelf
910
+
911
+Graylog Extended Log Format (GELF) logging driver for Docker. Writes log messages to a GELF endpoint like
912
+Graylog or Logstash. The `docker logs` command is not available for this logging driver.
913
+
914
+The GELF logging driver supports the following options:
915
+
916
+    --log-opt gelf-address=udp://host:port
917
+    --log-opt gelf-tag="database"
918
+
919
+The `gelf-address` option specifies the remote GELF server address that the
920
+driver connects to. Currently, only `udp` is supported as the transport and you must
921
+specify a `port` value. The following example shows how to connect the `gelf`
922
+driver to a GELF remote server at `192.168.0.42` on port `12201`
923
+
924
+    $ docker run --log-driver=gelf --log-opt gelf-address=udp://192.168.0.42:12201
925
+
926
+The `gelf-tag` option specifies a tag for easy container identification.
927
+
909 928
 ## Overriding Dockerfile image defaults
910 929
 
911 930
 When a developer builds an image from a [*Dockerfile*](/reference/builder)
... ...
@@ -75,3 +75,4 @@ clone git github.com/coreos/go-systemd v2
75 75
 clone git github.com/godbus/dbus v2
76 76
 clone git github.com/syndtr/gocapability 66ef2aa7a23ba682594e2b6f74cf40c0692b49fb
77 77
 clone git github.com/golang/protobuf 655cdfa588ea
78
+clone git github.com/Graylog2/go-gelf 6c62a85f1d47a67f2a5144c0e745b325889a8120
78 79
new file mode 100644
... ...
@@ -0,0 +1,2 @@
0
+*~
1
+.#*
0 2
new file mode 100644
... ...
@@ -0,0 +1,21 @@
0
+Copyright 2012 SocialCode
1
+
2
+Permission is hereby granted, free of charge, to any person obtaining
3
+a copy of this software and associated documentation files (the
4
+"Software"), to deal in the Software without restriction, including
5
+without limitation the rights to use, copy, modify, merge, publish,
6
+distribute, sublicense, and/or sell copies of the Software, and to
7
+permit persons to whom the Software is furnished to do so, subject to
8
+the following conditions:
9
+
10
+The above copyright notice and this permission notice shall be
11
+included in all copies or substantial portions of the Software.
12
+
13
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
14
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
15
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
16
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
17
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
18
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
19
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20
+
0 21
new file mode 100644
... ...
@@ -0,0 +1,76 @@
0
+go-gelf - GELF library and writer for Go
1
+========================================
2
+
3
+GELF is graylog2's UDP logging format.  This library provides an API
4
+that applications can use to log messages directly to a graylog2
5
+server, along with an `io.Writer` that can be use to redirect the
6
+standard library's log messages (or `os.Stdout`), to a graylog2 server.
7
+
8
+Installing
9
+----------
10
+
11
+go-gelf is go get-able:
12
+
13
+	go get github.com/Graylog2/go-gelf/gelf
14
+
15
+Usage
16
+-----
17
+
18
+The easiest way to integrate graylog logging into your go app is by
19
+having your `main` function (or even `init`) call `log.SetOutput()`.
20
+By using an `io.MultiWriter`, we can log to both stdout and graylog -
21
+giving us both centralized and local logs.  (Redundancy is nice).
22
+
23
+	package main
24
+
25
+	import (
26
+		"flag"
27
+		"github.com/Graylog2/go-gelf/gelf"
28
+		"io"
29
+		"log"
30
+		"os"
31
+	)
32
+
33
+	func main() {
34
+		var graylogAddr string
35
+
36
+		flag.StringVar(&graylogAddr, "graylog", "", "graylog server addr")
37
+		flag.Parse()
38
+
39
+		if graylogAddr != "" {
40
+			gelfWriter, err := gelf.NewWriter(graylogAddr)
41
+			if err != nil {
42
+				log.Fatalf("gelf.NewWriter: %s", err)
43
+			}
44
+			// log to both stderr and graylog2
45
+			log.SetOutput(io.MultiWriter(os.Stderr, gelfWriter))
46
+			log.Printf("logging to stderr & graylog2@'%s'", graylogAddr)
47
+		}
48
+
49
+		// From here on out, any calls to log.Print* functions
50
+		// will appear on stdout, and be sent over UDP to the
51
+		// specified Graylog2 server.
52
+
53
+		log.Printf("Hello gray World")
54
+
55
+		// ...
56
+	}
57
+
58
+The above program can be invoked as:
59
+
60
+	go run test.go -graylog=localhost:12201
61
+
62
+Because GELF messages are sent over UDP, graylog server availability
63
+doesn't impact application performance or response time.  There is a
64
+small, fixed overhead per log call, regardless of whether the target
65
+server is reachable or not.
66
+
67
+To Do
68
+-----
69
+
70
+- WriteMessage example
71
+
72
+License
73
+-------
74
+
75
+go-gelf is offered under the MIT license, see LICENSE for details.
0 76
new file mode 100644
... ...
@@ -0,0 +1,142 @@
0
+// Copyright 2012 SocialCode. All rights reserved.
1
+// Use of this source code is governed by the MIT
2
+// license that can be found in the LICENSE file.
3
+
4
+package gelf
5
+
6
+import (
7
+	"bytes"
8
+	"compress/gzip"
9
+	"compress/zlib"
10
+	"encoding/json"
11
+	"fmt"
12
+	"io"
13
+	"net"
14
+	"strings"
15
+	"sync"
16
+)
17
+
18
+type Reader struct {
19
+	mu   sync.Mutex
20
+	conn net.Conn
21
+}
22
+
23
+func NewReader(addr string) (*Reader, error) {
24
+	var err error
25
+	udpAddr, err := net.ResolveUDPAddr("udp", addr)
26
+	if err != nil {
27
+		return nil, fmt.Errorf("ResolveUDPAddr('%s'): %s", addr, err)
28
+	}
29
+
30
+	conn, err := net.ListenUDP("udp", udpAddr)
31
+	if err != nil {
32
+		return nil, fmt.Errorf("ListenUDP: %s", err)
33
+	}
34
+
35
+	r := new(Reader)
36
+	r.conn = conn
37
+	return r, nil
38
+}
39
+
40
+func (r *Reader) Addr() string {
41
+	return r.conn.LocalAddr().String()
42
+}
43
+
44
+// FIXME: this will discard data if p isn't big enough to hold the
45
+// full message.
46
+func (r *Reader) Read(p []byte) (int, error) {
47
+	msg, err := r.ReadMessage()
48
+	if err != nil {
49
+		return -1, err
50
+	}
51
+
52
+	var data string
53
+
54
+	if msg.Full == "" {
55
+		data = msg.Short
56
+	} else {
57
+		data = msg.Full
58
+	}
59
+
60
+	return strings.NewReader(data).Read(p)
61
+}
62
+
63
+func (r *Reader) ReadMessage() (*Message, error) {
64
+	cBuf := make([]byte, ChunkSize)
65
+	var (
66
+		err        error
67
+		n, length  int
68
+		buf        bytes.Buffer
69
+		cid, ocid  []byte
70
+		seq, total uint8
71
+		cHead      []byte
72
+		cReader    io.Reader
73
+		chunks     [][]byte
74
+	)
75
+
76
+	for got := 0; got < 128 && (total == 0 || got < int(total)); got++ {
77
+		if n, err = r.conn.Read(cBuf); err != nil {
78
+			return nil, fmt.Errorf("Read: %s", err)
79
+		}
80
+		cHead, cBuf = cBuf[:2], cBuf[:n]
81
+
82
+		if bytes.Equal(cHead, magicChunked) {
83
+			//fmt.Printf("chunked %v\n", cBuf[:14])
84
+			cid, seq, total = cBuf[2:2+8], cBuf[2+8], cBuf[2+8+1]
85
+			if ocid != nil && !bytes.Equal(cid, ocid) {
86
+				return nil, fmt.Errorf("out-of-band message %v (awaited %v)", cid, ocid)
87
+			} else if ocid == nil {
88
+				ocid = cid
89
+				chunks = make([][]byte, total)
90
+			}
91
+			n = len(cBuf) - chunkedHeaderLen
92
+			//fmt.Printf("setting chunks[%d]: %d\n", seq, n)
93
+			chunks[seq] = append(make([]byte, 0, n), cBuf[chunkedHeaderLen:]...)
94
+			length += n
95
+		} else { //not chunked
96
+			if total > 0 {
97
+				return nil, fmt.Errorf("out-of-band message (not chunked)")
98
+			}
99
+			break
100
+		}
101
+	}
102
+	//fmt.Printf("\nchunks: %v\n", chunks)
103
+
104
+	if length > 0 {
105
+		if cap(cBuf) < length {
106
+			cBuf = append(cBuf, make([]byte, 0, length-cap(cBuf))...)
107
+		}
108
+		cBuf = cBuf[:0]
109
+		for i := range chunks {
110
+			//fmt.Printf("appending %d %v\n", i, chunks[i])
111
+			cBuf = append(cBuf, chunks[i]...)
112
+		}
113
+		cHead = cBuf[:2]
114
+	}
115
+
116
+	// the data we get from the wire is compressed
117
+	if bytes.Equal(cHead, magicGzip) {
118
+		cReader, err = gzip.NewReader(bytes.NewReader(cBuf))
119
+	} else if cHead[0] == magicZlib[0] &&
120
+		(int(cHead[0])*256+int(cHead[1]))%31 == 0 {
121
+		// zlib is slightly more complicated, but correct
122
+		cReader, err = zlib.NewReader(bytes.NewReader(cBuf))
123
+	} else {
124
+		return nil, fmt.Errorf("unknown magic: %x %v", cHead, cHead)
125
+	}
126
+
127
+	if err != nil {
128
+		return nil, fmt.Errorf("NewReader: %s", err)
129
+	}
130
+
131
+	if _, err = io.Copy(&buf, cReader); err != nil {
132
+		return nil, fmt.Errorf("io.Copy: %s", err)
133
+	}
134
+
135
+	msg := new(Message)
136
+	if err := json.Unmarshal(buf.Bytes(), &msg); err != nil {
137
+		return nil, fmt.Errorf("json.Unmarshal: %s", err)
138
+	}
139
+
140
+	return msg, nil
141
+}
0 142
new file mode 100644
... ...
@@ -0,0 +1,373 @@
0
+// Copyright 2012 SocialCode. All rights reserved.
1
+// Use of this source code is governed by the MIT
2
+// license that can be found in the LICENSE file.
3
+
4
+package gelf
5
+
6
+import (
7
+	"bytes"
8
+	"compress/flate"
9
+	"compress/gzip"
10
+	"compress/zlib"
11
+	"crypto/rand"
12
+	"encoding/json"
13
+	"fmt"
14
+	"io"
15
+	"net"
16
+	"os"
17
+	"path"
18
+	"runtime"
19
+	"strings"
20
+	"sync"
21
+	"time"
22
+)
23
+
24
+// Writer implements io.Writer and is used to send both discrete
25
+// messages to a graylog2 server, or data from a stream-oriented
26
+// interface (like the functions in log).
27
+type Writer struct {
28
+	mu               sync.Mutex
29
+	conn             net.Conn
30
+	hostname         string
31
+	Facility         string // defaults to current process name
32
+	CompressionLevel int    // one of the consts from compress/flate
33
+	CompressionType  CompressType
34
+}
35
+
36
+// What compression type the writer should use when sending messages
37
+// to the graylog2 server
38
+type CompressType int
39
+
40
+const (
41
+	CompressGzip CompressType = iota
42
+	CompressZlib
43
+)
44
+
45
+// Message represents the contents of the GELF message.  It is gzipped
46
+// before sending.
47
+type Message struct {
48
+	Version  string                 `json:"version"`
49
+	Host     string                 `json:"host"`
50
+	Short    string                 `json:"short_message"`
51
+	Full     string                 `json:"full_message"`
52
+	TimeUnix float64                `json:"timestamp"`
53
+	Level    int32                  `json:"level"`
54
+	Facility string                 `json:"facility"`
55
+	Extra    map[string]interface{} `json:"-"`
56
+}
57
+
58
+type innerMessage Message //against circular (Un)MarshalJSON
59
+
60
+// Used to control GELF chunking.  Should be less than (MTU - len(UDP
61
+// header)).
62
+//
63
+// TODO: generate dynamically using Path MTU Discovery?
64
+const (
65
+	ChunkSize        = 1420
66
+	chunkedHeaderLen = 12
67
+	chunkedDataLen   = ChunkSize - chunkedHeaderLen
68
+)
69
+
70
+var (
71
+	magicChunked = []byte{0x1e, 0x0f}
72
+	magicZlib    = []byte{0x78}
73
+	magicGzip    = []byte{0x1f, 0x8b}
74
+)
75
+
76
+// Syslog severity levels
77
+const (
78
+  LOG_EMERG   = int32(0)
79
+  LOG_ALERT   = int32(1)
80
+  LOG_CRIT    = int32(2)
81
+  LOG_ERR     = int32(3)
82
+  LOG_WARNING = int32(4)
83
+  LOG_NOTICE  = int32(5)
84
+  LOG_INFO    = int32(6)
85
+  LOG_DEBUG   = int32(7)
86
+)
87
+
88
+// numChunks returns the number of GELF chunks necessary to transmit
89
+// the given compressed buffer.
90
+func numChunks(b []byte) int {
91
+	lenB := len(b)
92
+	if lenB <= ChunkSize {
93
+		return 1
94
+	}
95
+	return len(b)/chunkedDataLen + 1
96
+}
97
+
98
+// New returns a new GELF Writer.  This writer can be used to send the
99
+// output of the standard Go log functions to a central GELF server by
100
+// passing it to log.SetOutput()
101
+func NewWriter(addr string) (*Writer, error) {
102
+	var err error
103
+	w := new(Writer)
104
+	w.CompressionLevel = flate.BestSpeed
105
+
106
+	if w.conn, err = net.Dial("udp", addr); err != nil {
107
+		return nil, err
108
+	}
109
+	if w.hostname, err = os.Hostname(); err != nil {
110
+		return nil, err
111
+	}
112
+
113
+	w.Facility = path.Base(os.Args[0])
114
+
115
+	return w, nil
116
+}
117
+
118
+// writes the gzip compressed byte array to the connection as a series
119
+// of GELF chunked messages.  The header format is documented at
120
+// https://github.com/Graylog2/graylog2-docs/wiki/GELF as:
121
+//
122
+//     2-byte magic (0x1e 0x0f), 8 byte id, 1 byte sequence id, 1 byte
123
+//     total, chunk-data
124
+func (w *Writer) writeChunked(zBytes []byte) (err error) {
125
+	b := make([]byte, 0, ChunkSize)
126
+	buf := bytes.NewBuffer(b)
127
+	nChunksI := numChunks(zBytes)
128
+	if nChunksI > 255 {
129
+		return fmt.Errorf("msg too large, would need %d chunks", nChunksI)
130
+	}
131
+	nChunks := uint8(nChunksI)
132
+	// use urandom to get a unique message id
133
+	msgId := make([]byte, 8)
134
+	n, err := io.ReadFull(rand.Reader, msgId)
135
+	if err != nil || n != 8 {
136
+		return fmt.Errorf("rand.Reader: %d/%s", n, err)
137
+	}
138
+
139
+	bytesLeft := len(zBytes)
140
+	for i := uint8(0); i < nChunks; i++ {
141
+		buf.Reset()
142
+		// manually write header.  Don't care about
143
+		// host/network byte order, because the spec only
144
+		// deals in individual bytes.
145
+		buf.Write(magicChunked) //magic
146
+		buf.Write(msgId)
147
+		buf.WriteByte(i)
148
+		buf.WriteByte(nChunks)
149
+		// slice out our chunk from zBytes
150
+		chunkLen := chunkedDataLen
151
+		if chunkLen > bytesLeft {
152
+			chunkLen = bytesLeft
153
+		}
154
+		off := int(i) * chunkedDataLen
155
+		chunk := zBytes[off : off+chunkLen]
156
+		buf.Write(chunk)
157
+
158
+		// write this chunk, and make sure the write was good
159
+		n, err := w.conn.Write(buf.Bytes())
160
+		if err != nil {
161
+			return fmt.Errorf("Write (chunk %d/%d): %s", i,
162
+				nChunks, err)
163
+		}
164
+		if n != len(buf.Bytes()) {
165
+			return fmt.Errorf("Write len: (chunk %d/%d) (%d/%d)",
166
+				i, nChunks, n, len(buf.Bytes()))
167
+		}
168
+
169
+		bytesLeft -= chunkLen
170
+	}
171
+
172
+	if bytesLeft != 0 {
173
+		return fmt.Errorf("error: %d bytes left after sending", bytesLeft)
174
+	}
175
+	return nil
176
+}
177
+
178
+// WriteMessage sends the specified message to the GELF server
179
+// specified in the call to New().  It assumes all the fields are
180
+// filled out appropriately.  In general, clients will want to use
181
+// Write, rather than WriteMessage.
182
+func (w *Writer) WriteMessage(m *Message) (err error) {
183
+	mBytes, err := json.Marshal(m)
184
+	if err != nil {
185
+		return
186
+	}
187
+
188
+	var zBuf bytes.Buffer
189
+	var zw io.WriteCloser
190
+	switch w.CompressionType {
191
+	case CompressGzip:
192
+		zw, err = gzip.NewWriterLevel(&zBuf, w.CompressionLevel)
193
+	case CompressZlib:
194
+		zw, err = zlib.NewWriterLevel(&zBuf, w.CompressionLevel)
195
+	default:
196
+		panic(fmt.Sprintf("unknown compression type %d",
197
+			w.CompressionType))
198
+	}
199
+	if err != nil {
200
+		return
201
+	}
202
+	if _, err = zw.Write(mBytes); err != nil {
203
+		return
204
+	}
205
+	zw.Close()
206
+
207
+	zBytes := zBuf.Bytes()
208
+	if numChunks(zBytes) > 1 {
209
+		return w.writeChunked(zBytes)
210
+	}
211
+
212
+	n, err := w.conn.Write(zBytes)
213
+	if err != nil {
214
+		return
215
+	}
216
+	if n != len(zBytes) {
217
+		return fmt.Errorf("bad write (%d/%d)", n, len(zBytes))
218
+	}
219
+
220
+	return nil
221
+}
222
+
223
+// Close connection and interrupt blocked Read or Write operations
224
+func (w *Writer) Close() (error) {
225
+  return w.conn.Close()
226
+}
227
+
228
+/*
229
+func (w *Writer) Alert(m string) (err error)
230
+func (w *Writer) Close() error
231
+func (w *Writer) Crit(m string) (err error)
232
+func (w *Writer) Debug(m string) (err error)
233
+func (w *Writer) Emerg(m string) (err error)
234
+func (w *Writer) Err(m string) (err error)
235
+func (w *Writer) Info(m string) (err error)
236
+func (w *Writer) Notice(m string) (err error)
237
+func (w *Writer) Warning(m string) (err error)
238
+*/
239
+
240
+// getCaller returns the filename and the line info of a function
241
+// further down in the call stack.  Passing 0 in as callDepth would
242
+// return info on the function calling getCallerIgnoringLog, 1 the
243
+// parent function, and so on.  Any suffixes passed to getCaller are
244
+// path fragments like "/pkg/log/log.go", and functions in the call
245
+// stack from that file are ignored.
246
+func getCaller(callDepth int, suffixesToIgnore ...string) (file string, line int) {
247
+	// bump by 1 to ignore the getCaller (this) stackframe
248
+	callDepth++
249
+outer:
250
+	for {
251
+		var ok bool
252
+		_, file, line, ok = runtime.Caller(callDepth)
253
+		if !ok {
254
+			file = "???"
255
+			line = 0
256
+			break
257
+		}
258
+
259
+		for _, s := range suffixesToIgnore {
260
+			if strings.HasSuffix(file, s) {
261
+				callDepth++
262
+				continue outer
263
+			}
264
+		}
265
+		break
266
+	}
267
+	return
268
+}
269
+
270
+func getCallerIgnoringLogMulti(callDepth int) (string, int) {
271
+	// the +1 is to ignore this (getCallerIgnoringLogMulti) frame
272
+	return getCaller(callDepth+1, "/pkg/log/log.go", "/pkg/io/multi.go")
273
+}
274
+
275
+// Write encodes the given string in a GELF message and sends it to
276
+// the server specified in New().
277
+func (w *Writer) Write(p []byte) (n int, err error) {
278
+
279
+	// 1 for the function that called us.
280
+	file, line := getCallerIgnoringLogMulti(1)
281
+
282
+	// remove trailing and leading whitespace
283
+	p = bytes.TrimSpace(p)
284
+
285
+	// If there are newlines in the message, use the first line
286
+	// for the short message and set the full message to the
287
+	// original input.  If the input has no newlines, stick the
288
+	// whole thing in Short.
289
+	short := p
290
+	full := []byte("")
291
+	if i := bytes.IndexRune(p, '\n'); i > 0 {
292
+		short = p[:i]
293
+		full = p
294
+	}
295
+
296
+	m := Message{
297
+		Version:  "1.1",
298
+		Host:     w.hostname,
299
+		Short:    string(short),
300
+		Full:     string(full),
301
+		TimeUnix: float64(time.Now().Unix()),
302
+		Level:    6, // info
303
+		Facility: w.Facility,
304
+		Extra: map[string]interface{}{
305
+			"_file": file,
306
+			"_line": line,
307
+		},
308
+	}
309
+
310
+	if err = w.WriteMessage(&m); err != nil {
311
+		return 0, err
312
+	}
313
+
314
+	return len(p), nil
315
+}
316
+
317
+func (m *Message) MarshalJSON() ([]byte, error) {
318
+	var err error
319
+	var b, eb []byte
320
+
321
+	extra := m.Extra
322
+	b, err = json.Marshal((*innerMessage)(m))
323
+	m.Extra = extra
324
+	if err != nil {
325
+		return nil, err
326
+	}
327
+
328
+	if len(extra) == 0 {
329
+		return b, nil
330
+	}
331
+
332
+	if eb, err = json.Marshal(extra); err != nil {
333
+		return nil, err
334
+	}
335
+
336
+	// merge serialized message + serialized extra map
337
+	b[len(b)-1] = ','
338
+	return append(b, eb[1:len(eb)]...), nil
339
+}
340
+
341
+func (m *Message) UnmarshalJSON(data []byte) error {
342
+	i := make(map[string]interface{}, 16)
343
+	if err := json.Unmarshal(data, &i); err != nil {
344
+		return err
345
+	}
346
+	for k, v := range i {
347
+		if k[0] == '_' {
348
+			if m.Extra == nil {
349
+				m.Extra = make(map[string]interface{}, 1)
350
+			}
351
+			m.Extra[k] = v
352
+			continue
353
+		}
354
+		switch k {
355
+		case "version":
356
+			m.Version = v.(string)
357
+		case "host":
358
+			m.Host = v.(string)
359
+		case "short_message":
360
+			m.Short = v.(string)
361
+		case "full_message":
362
+			m.Full = v.(string)
363
+		case "timestamp":
364
+			m.TimeUnix = v.(float64)
365
+		case "level":
366
+			m.Level = int32(v.(float64))
367
+		case "facility":
368
+			m.Facility = v.(string)
369
+		}
370
+	}
371
+	return nil
372
+}
0 373
new file mode 100644
... ...
@@ -0,0 +1,230 @@
0
+// Copyright 2012 SocialCode. All rights reserved.
1
+// Use of this source code is governed by the MIT
2
+// license that can be found in the LICENSE file.
3
+
4
+package gelf
5
+
6
+import (
7
+	"crypto/rand"
8
+	"encoding/base64"
9
+	"fmt"
10
+	"strings"
11
+	"testing"
12
+	"time"
13
+)
14
+
15
+func TestNewWriter(t *testing.T) {
16
+	w, err := NewWriter("")
17
+	if err == nil || w != nil {
18
+		t.Errorf("New didn't fail")
19
+		return
20
+	}
21
+}
22
+
23
+func sendAndRecv(msgData string, compress CompressType) (*Message, error) {
24
+	r, err := NewReader("127.0.0.1:0")
25
+	if err != nil {
26
+		return nil, fmt.Errorf("NewReader: %s", err)
27
+	}
28
+
29
+	w, err := NewWriter(r.Addr())
30
+	if err != nil {
31
+		return nil, fmt.Errorf("NewWriter: %s", err)
32
+	}
33
+	w.CompressionType = compress
34
+
35
+	if _, err = w.Write([]byte(msgData)); err != nil {
36
+		return nil, fmt.Errorf("w.Write: %s", err)
37
+	}
38
+
39
+	return r.ReadMessage()
40
+}
41
+
42
+func sendAndRecvMsg(msg *Message, compress CompressType) (*Message, error) {
43
+	r, err := NewReader("127.0.0.1:0")
44
+	if err != nil {
45
+		return nil, fmt.Errorf("NewReader: %s", err)
46
+	}
47
+
48
+	w, err := NewWriter(r.Addr())
49
+	if err != nil {
50
+		return nil, fmt.Errorf("NewWriter: %s", err)
51
+	}
52
+	w.CompressionType = compress
53
+
54
+	if err = w.WriteMessage(msg); err != nil {
55
+		return nil, fmt.Errorf("w.Write: %s", err)
56
+	}
57
+
58
+	return r.ReadMessage()
59
+}
60
+
61
+// tests single-message (non-chunked) messages that are split over
62
+// multiple lines
63
+func TestWriteSmallMultiLine(t *testing.T) {
64
+	for _, i := range []CompressType{CompressGzip, CompressZlib} {
65
+		msgData := "awesomesauce\nbananas"
66
+
67
+		msg, err := sendAndRecv(msgData, i)
68
+		if err != nil {
69
+			t.Errorf("sendAndRecv: %s", err)
70
+			return
71
+		}
72
+
73
+		if msg.Short != "awesomesauce" {
74
+			t.Errorf("msg.Short: expected %s, got %s", msgData, msg.Full)
75
+			return
76
+		}
77
+
78
+		if msg.Full != msgData {
79
+			t.Errorf("msg.Full: expected %s, got %s", msgData, msg.Full)
80
+			return
81
+		}
82
+	}
83
+}
84
+
85
+// tests single-message (non-chunked) messages that are a single line long
86
+func TestWriteSmallOneLine(t *testing.T) {
87
+	msgData := "some awesome thing\n"
88
+	msgDataTrunc := msgData[:len(msgData)-1]
89
+
90
+	msg, err := sendAndRecv(msgData, CompressGzip)
91
+	if err != nil {
92
+		t.Errorf("sendAndRecv: %s", err)
93
+		return
94
+	}
95
+
96
+	// we should remove the trailing newline
97
+	if msg.Short != msgDataTrunc {
98
+		t.Errorf("msg.Short: expected %s, got %s",
99
+			msgDataTrunc, msg.Short)
100
+		return
101
+	}
102
+
103
+	if msg.Full != "" {
104
+		t.Errorf("msg.Full: expected %s, got %s", msgData, msg.Full)
105
+		return
106
+	}
107
+
108
+	fileExpected := "/go-gelf/gelf/writer_test.go"
109
+	if !strings.HasSuffix(msg.Extra["_file"].(string), fileExpected) {
110
+		t.Errorf("msg.File: expected %s, got %s", fileExpected,
111
+			msg.Extra["_file"].(string))
112
+		return
113
+	}
114
+
115
+	if len(msg.Extra) != 2 {
116
+		t.Errorf("extra fields in %v (expect only file and line)", msg.Extra)
117
+		return
118
+	}
119
+}
120
+
121
+func TestGetCaller(t *testing.T) {
122
+	file, line := getCallerIgnoringLogMulti(1000)
123
+	if line != 0 || file != "???" {
124
+		t.Errorf("didn't fail 1 %s %d", file, line)
125
+		return
126
+	}
127
+
128
+	file, _ = getCaller(0)
129
+	if !strings.HasSuffix(file, "/gelf/writer_test.go") {
130
+		t.Errorf("not writer_test.go 1? %s", file)
131
+	}
132
+
133
+	file, _ = getCallerIgnoringLogMulti(0)
134
+	if !strings.HasSuffix(file, "/gelf/writer_test.go") {
135
+		t.Errorf("not writer_test.go 2? %s", file)
136
+	}
137
+}
138
+
139
+// tests single-message (chunked) messages
140
+func TestWriteBigChunked(t *testing.T) {
141
+	randData := make([]byte, 4096)
142
+	if _, err := rand.Read(randData); err != nil {
143
+		t.Errorf("cannot get random data: %s", err)
144
+		return
145
+	}
146
+	msgData := "awesomesauce\n" + base64.StdEncoding.EncodeToString(randData)
147
+
148
+	for _, i := range []CompressType{CompressGzip, CompressZlib} {
149
+		msg, err := sendAndRecv(msgData, i)
150
+		if err != nil {
151
+			t.Errorf("sendAndRecv: %s", err)
152
+			return
153
+		}
154
+
155
+		if msg.Short != "awesomesauce" {
156
+			t.Errorf("msg.Short: expected %s, got %s", msgData, msg.Full)
157
+			return
158
+		}
159
+
160
+		if msg.Full != msgData {
161
+			t.Errorf("msg.Full: expected %s, got %s", msgData, msg.Full)
162
+			return
163
+		}
164
+	}
165
+}
166
+
167
+// tests messages with extra data
168
+func TestExtraData(t *testing.T) {
169
+
170
+	// time.Now().Unix() seems fine, UnixNano() won't roundtrip
171
+	// through string -> float64 -> int64
172
+	extra := map[string]interface{}{
173
+		"_a":    10 * time.Now().Unix(),
174
+		"C":     9,
175
+		"_file": "writer_test.go",
176
+		"_line": 186,
177
+	}
178
+
179
+	short := "quick"
180
+	full := short + "\nwith more detail"
181
+	m := Message{
182
+		Version:  "1.0",
183
+		Host:     "fake-host",
184
+		Short:    string(short),
185
+		Full:     string(full),
186
+		TimeUnix: float64(time.Now().Unix()),
187
+		Level:    6, // info
188
+		Facility: "writer_test",
189
+		Extra:    extra,
190
+	}
191
+
192
+	for _, i := range []CompressType{CompressGzip, CompressZlib} {
193
+		msg, err := sendAndRecvMsg(&m, i)
194
+		if err != nil {
195
+			t.Errorf("sendAndRecv: %s", err)
196
+			return
197
+		}
198
+
199
+		if msg.Short != short {
200
+			t.Errorf("msg.Short: expected %s, got %s", short, msg.Full)
201
+			return
202
+		}
203
+
204
+		if msg.Full != full {
205
+			t.Errorf("msg.Full: expected %s, got %s", full, msg.Full)
206
+			return
207
+		}
208
+
209
+		if len(msg.Extra) != 3 {
210
+			t.Errorf("extra extra fields in %v", msg.Extra)
211
+			return
212
+		}
213
+
214
+		if int64(msg.Extra["_a"].(float64)) != extra["_a"].(int64) {
215
+			t.Errorf("_a didn't roundtrip (%v != %v)", int64(msg.Extra["_a"].(float64)), extra["_a"].(int64))
216
+			return
217
+		}
218
+
219
+		if string(msg.Extra["_file"].(string)) != extra["_file"] {
220
+			t.Errorf("_file didn't roundtrip (%v != %v)", msg.Extra["_file"].(string), extra["_file"].(string))
221
+			return
222
+		}
223
+
224
+		if int(msg.Extra["_line"].(float64)) != extra["_line"].(int) {
225
+			t.Errorf("_line didn't roundtrip (%v != %v)", int(msg.Extra["_line"].(float64)), extra["_line"].(int))
226
+			return
227
+		}
228
+	}
229
+}