Signed-off-by: Daniel Dao <dqminh@cloudflare.com>
| ... | ... |
@@ -68,7 +68,7 @@ clone git github.com/syndtr/gocapability 2c00daeb6c3b45114c80ac44119e7b8801fdd85 |
| 68 | 68 |
clone git github.com/golang/protobuf f7137ae6b19afbfd61a94b746fda3b3fe0491874 |
| 69 | 69 |
|
| 70 | 70 |
# gelf logging driver deps |
| 71 |
-clone git github.com/Graylog2/go-gelf 6c62a85f1d47a67f2a5144c0e745b325889a8120 |
|
| 71 |
+clone git github.com/Graylog2/go-gelf aab2f594e4585d43468ac57287b0dece9d806883 |
|
| 72 | 72 |
|
| 73 | 73 |
clone git github.com/fluent/fluent-logger-golang v1.0.0 |
| 74 | 74 |
# fluent-logger-golang deps |
| ... | ... |
@@ -66,7 +66,6 @@ func (r *Reader) ReadMessage() (*Message, error) {
|
| 66 | 66 |
var ( |
| 67 | 67 |
err error |
| 68 | 68 |
n, length int |
| 69 |
- buf bytes.Buffer |
|
| 70 | 69 |
cid, ocid []byte |
| 71 | 70 |
seq, total uint8 |
| 72 | 71 |
cHead []byte |
| ... | ... |
@@ -122,19 +121,18 @@ func (r *Reader) ReadMessage() (*Message, error) {
|
| 122 | 122 |
// zlib is slightly more complicated, but correct |
| 123 | 123 |
cReader, err = zlib.NewReader(bytes.NewReader(cBuf)) |
| 124 | 124 |
} else {
|
| 125 |
- return nil, fmt.Errorf("unknown magic: %x %v", cHead, cHead)
|
|
| 125 |
+ // compliance with https://github.com/Graylog2/graylog2-server |
|
| 126 |
+ // treating all messages as uncompressed if they are not gzip, zlib or |
|
| 127 |
+ // chunked |
|
| 128 |
+ cReader = bytes.NewReader(cBuf) |
|
| 126 | 129 |
} |
| 127 | 130 |
|
| 128 | 131 |
if err != nil {
|
| 129 | 132 |
return nil, fmt.Errorf("NewReader: %s", err)
|
| 130 | 133 |
} |
| 131 | 134 |
|
| 132 |
- if _, err = io.Copy(&buf, cReader); err != nil {
|
|
| 133 |
- return nil, fmt.Errorf("io.Copy: %s", err)
|
|
| 134 |
- } |
|
| 135 |
- |
|
| 136 | 135 |
msg := new(Message) |
| 137 |
- if err := json.Unmarshal(buf.Bytes(), &msg); err != nil {
|
|
| 136 |
+ if err := json.NewDecoder(cReader).Decode(&msg); err != nil {
|
|
| 138 | 137 |
return nil, fmt.Errorf("json.Unmarshal: %s", err)
|
| 139 | 138 |
} |
| 140 | 139 |
|
| ... | ... |
@@ -41,6 +41,7 @@ type CompressType int |
| 41 | 41 |
const ( |
| 42 | 42 |
CompressGzip CompressType = iota |
| 43 | 43 |
CompressZlib |
| 44 |
+ CompressNone |
|
| 44 | 45 |
) |
| 45 | 46 |
|
| 46 | 47 |
// Message represents the contents of the GELF message. It is gzipped |
| ... | ... |
@@ -49,15 +50,14 @@ type Message struct {
|
| 49 | 49 |
Version string `json:"version"` |
| 50 | 50 |
Host string `json:"host"` |
| 51 | 51 |
Short string `json:"short_message"` |
| 52 |
- Full string `json:"full_message"` |
|
| 52 |
+ Full string `json:"full_message,omitempty"` |
|
| 53 | 53 |
TimeUnix float64 `json:"timestamp"` |
| 54 |
- Level int32 `json:"level"` |
|
| 55 |
- Facility string `json:"facility"` |
|
| 54 |
+ Level int32 `json:"level,omitempty"` |
|
| 55 |
+ Facility string `json:"facility,omitempty"` |
|
| 56 | 56 |
Extra map[string]interface{} `json:"-"`
|
| 57 |
+ RawExtra json.RawMessage `json:"-"` |
|
| 57 | 58 |
} |
| 58 | 59 |
|
| 59 |
-type innerMessage Message //against circular (Un)MarshalJSON |
|
| 60 |
- |
|
| 61 | 60 |
// Used to control GELF chunking. Should be less than (MTU - len(UDP |
| 62 | 61 |
// header)). |
| 63 | 62 |
// |
| ... | ... |
@@ -76,14 +76,14 @@ var ( |
| 76 | 76 |
|
| 77 | 77 |
// Syslog severity levels |
| 78 | 78 |
const ( |
| 79 |
- LOG_EMERG = int32(0) |
|
| 80 |
- LOG_ALERT = int32(1) |
|
| 81 |
- LOG_CRIT = int32(2) |
|
| 82 |
- LOG_ERR = int32(3) |
|
| 83 |
- LOG_WARNING = int32(4) |
|
| 84 |
- LOG_NOTICE = int32(5) |
|
| 85 |
- LOG_INFO = int32(6) |
|
| 86 |
- LOG_DEBUG = int32(7) |
|
| 79 |
+ LOG_EMERG = int32(0) |
|
| 80 |
+ LOG_ALERT = int32(1) |
|
| 81 |
+ LOG_CRIT = int32(2) |
|
| 82 |
+ LOG_ERR = int32(3) |
|
| 83 |
+ LOG_WARNING = int32(4) |
|
| 84 |
+ LOG_NOTICE = int32(5) |
|
| 85 |
+ LOG_INFO = int32(6) |
|
| 86 |
+ LOG_DEBUG = int32(7) |
|
| 87 | 87 |
) |
| 88 | 88 |
|
| 89 | 89 |
// numChunks returns the number of GELF chunks necessary to transmit |
| ... | ... |
@@ -176,40 +176,70 @@ func (w *Writer) writeChunked(zBytes []byte) (err error) {
|
| 176 | 176 |
return nil |
| 177 | 177 |
} |
| 178 | 178 |
|
| 179 |
+// 1k bytes buffer by default |
|
| 180 |
+var bufPool = sync.Pool{
|
|
| 181 |
+ New: func() interface{} {
|
|
| 182 |
+ return bytes.NewBuffer(make([]byte, 0, 1024)) |
|
| 183 |
+ }, |
|
| 184 |
+} |
|
| 185 |
+ |
|
| 186 |
+func newBuffer() *bytes.Buffer {
|
|
| 187 |
+ b := bufPool.Get().(*bytes.Buffer) |
|
| 188 |
+ if b != nil {
|
|
| 189 |
+ b.Reset() |
|
| 190 |
+ return b |
|
| 191 |
+ } |
|
| 192 |
+ return bytes.NewBuffer(nil) |
|
| 193 |
+} |
|
| 194 |
+ |
|
| 179 | 195 |
// WriteMessage sends the specified message to the GELF server |
| 180 | 196 |
// specified in the call to New(). It assumes all the fields are |
| 181 | 197 |
// filled out appropriately. In general, clients will want to use |
| 182 | 198 |
// Write, rather than WriteMessage. |
| 183 | 199 |
func (w *Writer) WriteMessage(m *Message) (err error) {
|
| 184 |
- mBytes, err := json.Marshal(m) |
|
| 185 |
- if err != nil {
|
|
| 186 |
- return |
|
| 200 |
+ mBuf := newBuffer() |
|
| 201 |
+ defer bufPool.Put(mBuf) |
|
| 202 |
+ if err = m.MarshalJSONBuf(mBuf); err != nil {
|
|
| 203 |
+ return err |
|
| 187 | 204 |
} |
| 205 |
+ mBytes := mBuf.Bytes() |
|
| 206 |
+ |
|
| 207 |
+ var ( |
|
| 208 |
+ zBuf *bytes.Buffer |
|
| 209 |
+ zBytes []byte |
|
| 210 |
+ ) |
|
| 188 | 211 |
|
| 189 |
- var zBuf bytes.Buffer |
|
| 190 | 212 |
var zw io.WriteCloser |
| 191 | 213 |
switch w.CompressionType {
|
| 192 | 214 |
case CompressGzip: |
| 193 |
- zw, err = gzip.NewWriterLevel(&zBuf, w.CompressionLevel) |
|
| 215 |
+ zBuf = newBuffer() |
|
| 216 |
+ defer bufPool.Put(zBuf) |
|
| 217 |
+ zw, err = gzip.NewWriterLevel(zBuf, w.CompressionLevel) |
|
| 194 | 218 |
case CompressZlib: |
| 195 |
- zw, err = zlib.NewWriterLevel(&zBuf, w.CompressionLevel) |
|
| 219 |
+ zBuf = newBuffer() |
|
| 220 |
+ defer bufPool.Put(zBuf) |
|
| 221 |
+ zw, err = zlib.NewWriterLevel(zBuf, w.CompressionLevel) |
|
| 222 |
+ case CompressNone: |
|
| 223 |
+ zBytes = mBytes |
|
| 196 | 224 |
default: |
| 197 | 225 |
panic(fmt.Sprintf("unknown compression type %d",
|
| 198 | 226 |
w.CompressionType)) |
| 199 | 227 |
} |
| 200 |
- if err != nil {
|
|
| 201 |
- return |
|
| 202 |
- } |
|
| 203 |
- if _, err = zw.Write(mBytes); err != nil {
|
|
| 204 |
- return |
|
| 228 |
+ if zw != nil {
|
|
| 229 |
+ if err != nil {
|
|
| 230 |
+ return |
|
| 231 |
+ } |
|
| 232 |
+ if _, err = zw.Write(mBytes); err != nil {
|
|
| 233 |
+ zw.Close() |
|
| 234 |
+ return |
|
| 235 |
+ } |
|
| 236 |
+ zw.Close() |
|
| 237 |
+ zBytes = zBuf.Bytes() |
|
| 205 | 238 |
} |
| 206 |
- zw.Close() |
|
| 207 | 239 |
|
| 208 |
- zBytes := zBuf.Bytes() |
|
| 209 | 240 |
if numChunks(zBytes) > 1 {
|
| 210 | 241 |
return w.writeChunked(zBytes) |
| 211 | 242 |
} |
| 212 |
- |
|
| 213 | 243 |
n, err := w.conn.Write(zBytes) |
| 214 | 244 |
if err != nil {
|
| 215 | 245 |
return |
| ... | ... |
@@ -222,8 +252,8 @@ func (w *Writer) WriteMessage(m *Message) (err error) {
|
| 222 | 222 |
} |
| 223 | 223 |
|
| 224 | 224 |
// Close connection and interrupt blocked Read or Write operations |
| 225 |
-func (w *Writer) Close() (error) {
|
|
| 226 |
- return w.conn.Close() |
|
| 225 |
+func (w *Writer) Close() error {
|
|
| 226 |
+ return w.conn.Close() |
|
| 227 | 227 |
} |
| 228 | 228 |
|
| 229 | 229 |
/* |
| ... | ... |
@@ -315,28 +345,43 @@ func (w *Writer) Write(p []byte) (n int, err error) {
|
| 315 | 315 |
return len(p), nil |
| 316 | 316 |
} |
| 317 | 317 |
|
| 318 |
-func (m *Message) MarshalJSON() ([]byte, error) {
|
|
| 319 |
- var err error |
|
| 320 |
- var b, eb []byte |
|
| 321 |
- |
|
| 322 |
- extra := m.Extra |
|
| 323 |
- b, err = json.Marshal((*innerMessage)(m)) |
|
| 324 |
- m.Extra = extra |
|
| 318 |
+func (m *Message) MarshalJSONBuf(buf *bytes.Buffer) error {
|
|
| 319 |
+ b, err := json.Marshal(m) |
|
| 325 | 320 |
if err != nil {
|
| 326 |
- return nil, err |
|
| 321 |
+ return err |
|
| 327 | 322 |
} |
| 328 |
- |
|
| 329 |
- if len(extra) == 0 {
|
|
| 330 |
- return b, nil |
|
| 323 |
+ // write up until the final } |
|
| 324 |
+ if _, err = buf.Write(b[:len(b)-1]); err != nil {
|
|
| 325 |
+ return err |
|
| 326 |
+ } |
|
| 327 |
+ if len(m.Extra) > 0 {
|
|
| 328 |
+ eb, err := json.Marshal(m.Extra) |
|
| 329 |
+ if err != nil {
|
|
| 330 |
+ return err |
|
| 331 |
+ } |
|
| 332 |
+ // merge serialized message + serialized extra map |
|
| 333 |
+ if err = buf.WriteByte(','); err != nil {
|
|
| 334 |
+ return err |
|
| 335 |
+ } |
|
| 336 |
+ // write serialized extra bytes, without enclosing quotes |
|
| 337 |
+ if _, err = buf.Write(eb[1 : len(eb)-1]); err != nil {
|
|
| 338 |
+ return err |
|
| 339 |
+ } |
|
| 331 | 340 |
} |
| 332 | 341 |
|
| 333 |
- if eb, err = json.Marshal(extra); err != nil {
|
|
| 334 |
- return nil, err |
|
| 342 |
+ if len(m.RawExtra) > 0 {
|
|
| 343 |
+ if err := buf.WriteByte(','); err != nil {
|
|
| 344 |
+ return err |
|
| 345 |
+ } |
|
| 346 |
+ |
|
| 347 |
+ // write serialized extra bytes, without enclosing quotes |
|
| 348 |
+ if _, err = buf.Write(m.RawExtra[1 : len(m.RawExtra)-1]); err != nil {
|
|
| 349 |
+ return err |
|
| 350 |
+ } |
|
| 335 | 351 |
} |
| 336 | 352 |
|
| 337 |
- // merge serialized message + serialized extra map |
|
| 338 |
- b[len(b)-1] = ',' |
|
| 339 |
- return append(b, eb[1:len(eb)]...), nil |
|
| 353 |
+ // write final closing quotes |
|
| 354 |
+ return buf.WriteByte('}')
|
|
| 340 | 355 |
} |
| 341 | 356 |
|
| 342 | 357 |
func (m *Message) UnmarshalJSON(data []byte) error {
|