Browse code

Merge pull request #39074 from thaJeztah/bump_fluentd

Bump fluent/fluent-logger-golang v1.4.0

Yong Tang authored on 2019/04/19 11:00:02
Showing 17 changed files
... ...
@@ -133,7 +133,7 @@ func New(info logger.Info) (logger.Logger, error) {
133 133
 		BufferLimit:        bufferLimit,
134 134
 		RetryWait:          retryWait,
135 135
 		MaxRetry:           maxRetries,
136
-		AsyncConnect:       asyncConnect,
136
+		Async:              asyncConnect,
137 137
 		SubSecondPrecision: subSecondPrecision,
138 138
 	}
139 139
 
... ...
@@ -95,9 +95,9 @@ github.com/golang/protobuf                          aa810b61a9c79d51363740d207bb
95 95
 github.com/Graylog2/go-gelf                         4143646226541087117ff2f83334ea48b3201841
96 96
 
97 97
 # fluent-logger-golang deps
98
-github.com/fluent/fluent-logger-golang              8bbc2356beaf021b04c9bd5cdc76ea5a7ccb40ec # v1.3.0
98
+github.com/fluent/fluent-logger-golang              7a6c9dcd7f14c2ed5d8c55c11b894e5455ee311b # v1.4.0
99 99
 github.com/philhofer/fwd                            bb6d471dc95d4fe11e432687f8b70ff496cf3136 # v1.0.0
100
-github.com/tinylib/msgp                             3b556c64540842d4f82967be066a7f7fffc3adad
100
+github.com/tinylib/msgp                             af6442a0fcf6e2a1b824f70dd0c734f01e817751 # v1.1.0
101 101
 
102 102
 # fsnotify
103 103
 github.com/fsnotify/fsnotify                        1485a34d5d5723fea214f5710708e19a831720e4 # v1.4.7-11-g1485a34
... ...
@@ -64,6 +64,16 @@ f := fluent.New(fluent.Config{FluentPort: 80, FluentHost: "example.com"})
64 64
 Sets the timeout for Write call of logger.Post.
65 65
 Since the default is zero value, Write will not time out.
66 66
 
67
+### Async
68
+
69
+Enable asynchronous I/O (connect and write) for sending events to Fluentd.
70
+The default is false.
71
+
72
+### RequestAck
73
+
74
+Sets whether to request acknowledgment from Fluentd to increase the reliability
75
+of the connection. The default is false.
76
+
67 77
 ## Tests
68 78
 ```
69 79
 go test
... ...
@@ -6,12 +6,17 @@ import (
6 6
 	"fmt"
7 7
 	"math"
8 8
 	"net"
9
+	"os"
9 10
 	"reflect"
10 11
 	"strconv"
11 12
 	"sync"
12 13
 	"time"
13 14
 
15
+	"bytes"
16
+	"encoding/base64"
17
+	"encoding/binary"
14 18
 	"github.com/tinylib/msgp/msgp"
19
+	"math/rand"
15 20
 )
16 21
 
17 22
 const (
... ...
@@ -21,8 +26,9 @@ const (
21 21
 	defaultPort                   = 24224
22 22
 	defaultTimeout                = 3 * time.Second
23 23
 	defaultWriteTimeout           = time.Duration(0) // Write() will not time out
24
-	defaultBufferLimit            = 8 * 1024 * 1024
24
+	defaultBufferLimit            = 8 * 1024
25 25
 	defaultRetryWait              = 500
26
+	defaultMaxRetryWait           = 60000
26 27
 	defaultMaxRetry               = 13
27 28
 	defaultReconnectWaitIncreRate = 1.5
28 29
 	// Default sub-second precision value to false since it is only compatible
... ...
@@ -40,24 +46,36 @@ type Config struct {
40 40
 	BufferLimit      int           `json:"buffer_limit"`
41 41
 	RetryWait        int           `json:"retry_wait"`
42 42
 	MaxRetry         int           `json:"max_retry"`
43
+	MaxRetryWait     int           `json:"max_retry_wait"`
43 44
 	TagPrefix        string        `json:"tag_prefix"`
44
-	AsyncConnect     bool          `json:"async_connect"`
45
-	MarshalAsJSON    bool          `json:"marshal_as_json"`
45
+	Async            bool          `json:"async"`
46
+	// Deprecated: Use Async instead
47
+	AsyncConnect  bool `json:"async_connect"`
48
+	MarshalAsJSON bool `json:"marshal_as_json"`
46 49
 
47 50
 	// Sub-second precision timestamps are only possible for those using fluentd
48 51
 	// v0.14+ and serializing their messages with msgpack.
49 52
 	SubSecondPrecision bool `json:"sub_second_precision"`
53
+
54
+	// RequestAck sends the chunk option with a unique ID. The server will
55
+	// respond with an acknowledgement. This option improves the reliability
56
+	// of the message transmission.
57
+	RequestAck bool `json:"request_ack"`
58
+}
59
+
60
+type msgToSend struct {
61
+	data []byte
62
+	ack  string
50 63
 }
51 64
 
52 65
 type Fluent struct {
53 66
 	Config
54 67
 
55
-	mubuff  sync.Mutex
56
-	pending []byte
68
+	pending chan *msgToSend
69
+	wg      sync.WaitGroup
57 70
 
58
-	muconn       sync.Mutex
59
-	conn         net.Conn
60
-	reconnecting bool
71
+	muconn sync.Mutex
72
+	conn   net.Conn
61 73
 }
62 74
 
63 75
 // New creates a new Logger.
... ...
@@ -89,11 +107,22 @@ func New(config Config) (f *Fluent, err error) {
89 89
 	if config.MaxRetry == 0 {
90 90
 		config.MaxRetry = defaultMaxRetry
91 91
 	}
92
+	if config.MaxRetryWait == 0 {
93
+		config.MaxRetryWait = defaultMaxRetryWait
94
+	}
92 95
 	if config.AsyncConnect {
93
-		f = &Fluent{Config: config, reconnecting: true}
94
-		go f.reconnect()
96
+		fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead")
97
+		config.Async = config.Async || config.AsyncConnect
98
+	}
99
+	if config.Async {
100
+		f = &Fluent{
101
+			Config:  config,
102
+			pending: make(chan *msgToSend, config.BufferLimit),
103
+		}
104
+		f.wg.Add(1)
105
+		go f.run()
95 106
 	} else {
96
-		f = &Fluent{Config: config, reconnecting: false}
107
+		f = &Fluent{Config: config}
97 108
 		err = f.connect()
98 109
 	}
99 110
 	return
... ...
@@ -173,28 +202,25 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err
173 173
 }
174 174
 
175 175
 func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}) error {
176
-	var data []byte
176
+	var msg *msgToSend
177 177
 	var err error
178
-	if data, err = f.EncodeData(tag, tm, message); err != nil {
178
+	if msg, err = f.EncodeData(tag, tm, message); err != nil {
179 179
 		return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v", message, err)
180 180
 	}
181
-	return f.postRawData(data)
181
+	return f.postRawData(msg)
182 182
 }
183 183
 
184 184
 // Deprecated: Use EncodeAndPostData instead
185
-func (f *Fluent) PostRawData(data []byte) {
186
-	f.postRawData(data)
185
+func (f *Fluent) PostRawData(msg *msgToSend) {
186
+	f.postRawData(msg)
187 187
 }
188 188
 
189
-func (f *Fluent) postRawData(data []byte) error {
190
-	if err := f.appendBuffer(data); err != nil {
191
-		return err
192
-	}
193
-	if err := f.send(); err != nil {
194
-		f.close()
195
-		return err
189
+func (f *Fluent) postRawData(msg *msgToSend) error {
190
+	if f.Config.Async {
191
+		return f.appendBuffer(msg)
196 192
 	}
197
-	return nil
193
+	// Synchronous write
194
+	return f.write(msg)
198 195
 }
199 196
 
200 197
 // For sending forward protocol adopted JSON
... ...
@@ -207,43 +233,80 @@ type MessageChunk struct {
207 207
 // So, it should write JSON marshaler by hand.
208 208
 func (chunk *MessageChunk) MarshalJSON() ([]byte, error) {
209 209
 	data, err := json.Marshal(chunk.message.Record)
210
-	return []byte(fmt.Sprintf("[\"%s\",%d,%s,null]", chunk.message.Tag,
211
-		chunk.message.Time, data)), err
210
+	if err != nil {
211
+		return nil, err
212
+	}
213
+	option, err := json.Marshal(chunk.message.Option)
214
+	if err != nil {
215
+		return nil, err
216
+	}
217
+	return []byte(fmt.Sprintf("[\"%s\",%d,%s,%s]", chunk.message.Tag,
218
+		chunk.message.Time, data, option)), err
212 219
 }
213 220
 
214
-func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data []byte, err error) {
221
+// getUniqueID returns a base64 encoded unique ID that can be used for chunk/ack
222
+// mechanism, see
223
+// https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#option
224
+func getUniqueID(timeUnix int64) (string, error) {
225
+	buf := bytes.NewBuffer(nil)
226
+	enc := base64.NewEncoder(base64.StdEncoding, buf)
227
+	if err := binary.Write(enc, binary.LittleEndian, timeUnix); err != nil {
228
+		enc.Close()
229
+		return "", err
230
+	}
231
+	if err := binary.Write(enc, binary.LittleEndian, rand.Uint64()); err != nil {
232
+		enc.Close()
233
+		return "", err
234
+	}
235
+	// encoder needs to be closed before buf.String(), defer does not work
236
+	// here
237
+	enc.Close()
238
+	return buf.String(), nil
239
+}
240
+
241
+func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg *msgToSend, err error) {
242
+	option := make(map[string]string)
243
+	msg = &msgToSend{}
215 244
 	timeUnix := tm.Unix()
245
+	if f.Config.RequestAck {
246
+		var err error
247
+		msg.ack, err = getUniqueID(timeUnix)
248
+		if err != nil {
249
+			return nil, err
250
+		}
251
+		option["chunk"] = msg.ack
252
+	}
216 253
 	if f.Config.MarshalAsJSON {
217
-		msg := Message{Tag: tag, Time: timeUnix, Record: message}
218
-		chunk := &MessageChunk{message: msg}
219
-		data, err = json.Marshal(chunk)
254
+		m := Message{Tag: tag, Time: timeUnix, Record: message, Option: option}
255
+		chunk := &MessageChunk{message: m}
256
+		msg.data, err = json.Marshal(chunk)
220 257
 	} else if f.Config.SubSecondPrecision {
221
-		msg := &MessageExt{Tag: tag, Time: EventTime(tm), Record: message}
222
-		data, err = msg.MarshalMsg(nil)
258
+		m := &MessageExt{Tag: tag, Time: EventTime(tm), Record: message, Option: option}
259
+		msg.data, err = m.MarshalMsg(nil)
223 260
 	} else {
224
-		msg := &Message{Tag: tag, Time: timeUnix, Record: message}
225
-		data, err = msg.MarshalMsg(nil)
261
+		m := &Message{Tag: tag, Time: timeUnix, Record: message, Option: option}
262
+		msg.data, err = m.MarshalMsg(nil)
226 263
 	}
227 264
 	return
228 265
 }
229 266
 
230
-// Close closes the connection.
267
+// Close closes the connection, waiting for pending logs to be sent
231 268
 func (f *Fluent) Close() (err error) {
232
-	if len(f.pending) > 0 {
233
-		err = f.send()
269
+	if f.Config.Async {
270
+		close(f.pending)
271
+		f.wg.Wait()
234 272
 	}
235 273
 	f.close()
236 274
 	return
237 275
 }
238 276
 
239 277
 // appendBuffer appends data to buffer with lock.
240
-func (f *Fluent) appendBuffer(data []byte) error {
241
-	f.mubuff.Lock()
242
-	defer f.mubuff.Unlock()
243
-	if len(f.pending)+len(data) > f.Config.BufferLimit {
244
-		return errors.New(fmt.Sprintf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit))
278
+func (f *Fluent) appendBuffer(msg *msgToSend) error {
279
+	select {
280
+	case f.pending <- msg:
281
+	default:
282
+		return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit)
245 283
 	}
246
-	f.pending = append(f.pending, data...)
247 284
 	return nil
248 285
 }
249 286
 
... ...
@@ -259,8 +322,6 @@ func (f *Fluent) close() {
259 259
 
260 260
 // connect establishes a new connection using the specified transport.
261 261
 func (f *Fluent) connect() (err error) {
262
-	f.muconn.Lock()
263
-	defer f.muconn.Unlock()
264 262
 
265 263
 	switch f.Config.FluentNetwork {
266 264
 	case "tcp":
... ...
@@ -270,63 +331,78 @@ func (f *Fluent) connect() (err error) {
270 270
 	default:
271 271
 		err = net.UnknownNetworkError(f.Config.FluentNetwork)
272 272
 	}
273
+	return err
274
+}
273 275
 
274
-	if err == nil {
275
-		f.reconnecting = false
276
+func (f *Fluent) run() {
277
+	for {
278
+		select {
279
+		case entry, ok := <-f.pending:
280
+			if !ok {
281
+				f.wg.Done()
282
+				return
283
+			}
284
+			err := f.write(entry)
285
+			if err != nil {
286
+				fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
287
+			}
288
+		}
276 289
 	}
277
-	return
278 290
 }
279 291
 
280 292
 func e(x, y float64) int {
281 293
 	return int(math.Pow(x, y))
282 294
 }
283 295
 
284
-func (f *Fluent) reconnect() {
285
-	for i := 0; ; i++ {
286
-		err := f.connect()
287
-		if err == nil {
288
-			f.send()
289
-			return
290
-		}
291
-		if i == f.Config.MaxRetry {
292
-			// TODO: What we can do when connection failed MaxRetry times?
293
-			panic("fluent#reconnect: failed to reconnect!")
294
-		}
295
-		waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
296
-		time.Sleep(time.Duration(waitTime) * time.Millisecond)
297
-	}
298
-}
299
-
300
-func (f *Fluent) send() error {
301
-	f.muconn.Lock()
302
-	defer f.muconn.Unlock()
303
-
304
-	if f.conn == nil {
305
-		if f.reconnecting == false {
306
-			f.reconnecting = true
307
-			go f.reconnect()
296
+func (f *Fluent) write(msg *msgToSend) error {
297
+
298
+	for i := 0; i < f.Config.MaxRetry; i++ {
299
+
300
+		// Connect if needed
301
+		f.muconn.Lock()
302
+		if f.conn == nil {
303
+			err := f.connect()
304
+			if err != nil {
305
+				f.muconn.Unlock()
306
+				waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
307
+				if waitTime > f.Config.MaxRetryWait {
308
+					waitTime = f.Config.MaxRetryWait
309
+				}
310
+				time.Sleep(time.Duration(waitTime) * time.Millisecond)
311
+				continue
312
+			}
308 313
 		}
309
-		return errors.New("fluent#send: can't send logs, client is reconnecting")
310
-	}
311
-
312
-	f.mubuff.Lock()
313
-	defer f.mubuff.Unlock()
314
+		f.muconn.Unlock()
314 315
 
315
-	var err error
316
-	if len(f.pending) > 0 {
316
+		// We're connected, write msg
317 317
 		t := f.Config.WriteTimeout
318 318
 		if time.Duration(0) < t {
319 319
 			f.conn.SetWriteDeadline(time.Now().Add(t))
320 320
 		} else {
321 321
 			f.conn.SetWriteDeadline(time.Time{})
322 322
 		}
323
-		_, err = f.conn.Write(f.pending)
323
+		_, err := f.conn.Write(msg.data)
324 324
 		if err != nil {
325
-			f.conn.Close()
326
-			f.conn = nil
325
+			f.close()
327 326
 		} else {
328
-			f.pending = f.pending[:0]
327
+			// Acknowledgment check
328
+			if msg.ack != "" {
329
+				resp := &AckResp{}
330
+				if f.Config.MarshalAsJSON {
331
+					dec := json.NewDecoder(f.conn)
332
+					err = dec.Decode(resp)
333
+				} else {
334
+					r := msgp.NewReader(f.conn)
335
+					err = resp.DecodeMsg(r)
336
+				}
337
+				if err != nil || resp.Ack != msg.ack {
338
+					f.close()
339
+					continue
340
+				}
341
+			}
342
+			return err
329 343
 		}
330 344
 	}
331
-	return err
345
+
346
+	return fmt.Errorf("fluent#write: failed to reconnect, max retry: %v", f.Config.MaxRetry)
332 347
 }
... ...
@@ -16,9 +16,9 @@ type Entry struct {
16 16
 
17 17
 //msgp:tuple Forward
18 18
 type Forward struct {
19
-	Tag     string      `msg:"tag"`
20
-	Entries []Entry     `msg:"entries"`
21
-	Option  interface{} `msg:"option"`
19
+	Tag     string  `msg:"tag"`
20
+	Entries []Entry `msg:"entries"`
21
+	Option  map[string]string
22 22
 }
23 23
 
24 24
 //msgp:tuple Message
... ...
@@ -26,7 +26,7 @@ type Message struct {
26 26
 	Tag    string      `msg:"tag"`
27 27
 	Time   int64       `msg:"time"`
28 28
 	Record interface{} `msg:"record"`
29
-	Option interface{} `msg:"option"`
29
+	Option map[string]string
30 30
 }
31 31
 
32 32
 //msgp:tuple MessageExt
... ...
@@ -34,7 +34,11 @@ type MessageExt struct {
34 34
 	Tag    string      `msg:"tag"`
35 35
 	Time   EventTime   `msg:"time,extension"`
36 36
 	Record interface{} `msg:"record"`
37
-	Option interface{} `msg:"option"`
37
+	Option map[string]string
38
+}
39
+
40
+type AckResp struct {
41
+	Ack string `json:"ack" msg:"ack"`
38 42
 }
39 43
 
40 44
 // EventTime is an extension to the serialized time value. It builds in support
... ...
@@ -1,30 +1,134 @@
1 1
 package fluent
2 2
 
3
-// NOTE: THIS FILE WAS PRODUCED BY THE
4
-// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp)
5
-// DO NOT EDIT
3
+// Code generated by github.com/tinylib/msgp DO NOT EDIT.
6 4
 
7 5
 import (
8 6
 	"github.com/tinylib/msgp/msgp"
9 7
 )
10 8
 
11 9
 // DecodeMsg implements msgp.Decodable
10
+func (z *AckResp) DecodeMsg(dc *msgp.Reader) (err error) {
11
+	var field []byte
12
+	_ = field
13
+	var zb0001 uint32
14
+	zb0001, err = dc.ReadMapHeader()
15
+	if err != nil {
16
+		err = msgp.WrapError(err)
17
+		return
18
+	}
19
+	for zb0001 > 0 {
20
+		zb0001--
21
+		field, err = dc.ReadMapKeyPtr()
22
+		if err != nil {
23
+			err = msgp.WrapError(err)
24
+			return
25
+		}
26
+		switch msgp.UnsafeString(field) {
27
+		case "ack":
28
+			z.Ack, err = dc.ReadString()
29
+			if err != nil {
30
+				err = msgp.WrapError(err, "Ack")
31
+				return
32
+			}
33
+		default:
34
+			err = dc.Skip()
35
+			if err != nil {
36
+				err = msgp.WrapError(err)
37
+				return
38
+			}
39
+		}
40
+	}
41
+	return
42
+}
43
+
44
+// EncodeMsg implements msgp.Encodable
45
+func (z AckResp) EncodeMsg(en *msgp.Writer) (err error) {
46
+	// map header, size 1
47
+	// write "ack"
48
+	err = en.Append(0x81, 0xa3, 0x61, 0x63, 0x6b)
49
+	if err != nil {
50
+		return
51
+	}
52
+	err = en.WriteString(z.Ack)
53
+	if err != nil {
54
+		err = msgp.WrapError(err, "Ack")
55
+		return
56
+	}
57
+	return
58
+}
59
+
60
+// MarshalMsg implements msgp.Marshaler
61
+func (z AckResp) MarshalMsg(b []byte) (o []byte, err error) {
62
+	o = msgp.Require(b, z.Msgsize())
63
+	// map header, size 1
64
+	// string "ack"
65
+	o = append(o, 0x81, 0xa3, 0x61, 0x63, 0x6b)
66
+	o = msgp.AppendString(o, z.Ack)
67
+	return
68
+}
69
+
70
+// UnmarshalMsg implements msgp.Unmarshaler
71
+func (z *AckResp) UnmarshalMsg(bts []byte) (o []byte, err error) {
72
+	var field []byte
73
+	_ = field
74
+	var zb0001 uint32
75
+	zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
76
+	if err != nil {
77
+		err = msgp.WrapError(err)
78
+		return
79
+	}
80
+	for zb0001 > 0 {
81
+		zb0001--
82
+		field, bts, err = msgp.ReadMapKeyZC(bts)
83
+		if err != nil {
84
+			err = msgp.WrapError(err)
85
+			return
86
+		}
87
+		switch msgp.UnsafeString(field) {
88
+		case "ack":
89
+			z.Ack, bts, err = msgp.ReadStringBytes(bts)
90
+			if err != nil {
91
+				err = msgp.WrapError(err, "Ack")
92
+				return
93
+			}
94
+		default:
95
+			bts, err = msgp.Skip(bts)
96
+			if err != nil {
97
+				err = msgp.WrapError(err)
98
+				return
99
+			}
100
+		}
101
+	}
102
+	o = bts
103
+	return
104
+}
105
+
106
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
107
+func (z AckResp) Msgsize() (s int) {
108
+	s = 1 + 4 + msgp.StringPrefixSize + len(z.Ack)
109
+	return
110
+}
111
+
112
+// DecodeMsg implements msgp.Decodable
12 113
 func (z *Entry) DecodeMsg(dc *msgp.Reader) (err error) {
13
-	var zxvk uint32
14
-	zxvk, err = dc.ReadArrayHeader()
114
+	var zb0001 uint32
115
+	zb0001, err = dc.ReadArrayHeader()
15 116
 	if err != nil {
117
+		err = msgp.WrapError(err)
16 118
 		return
17 119
 	}
18
-	if zxvk != 2 {
19
-		err = msgp.ArrayError{Wanted: 2, Got: zxvk}
120
+	if zb0001 != 2 {
121
+		err = msgp.ArrayError{Wanted: 2, Got: zb0001}
20 122
 		return
21 123
 	}
22 124
 	z.Time, err = dc.ReadInt64()
23 125
 	if err != nil {
126
+		err = msgp.WrapError(err, "Time")
24 127
 		return
25 128
 	}
26 129
 	z.Record, err = dc.ReadIntf()
27 130
 	if err != nil {
131
+		err = msgp.WrapError(err, "Record")
28 132
 		return
29 133
 	}
30 134
 	return
... ...
@@ -35,14 +139,16 @@ func (z Entry) EncodeMsg(en *msgp.Writer) (err error) {
35 35
 	// array header, size 2
36 36
 	err = en.Append(0x92)
37 37
 	if err != nil {
38
-		return err
38
+		return
39 39
 	}
40 40
 	err = en.WriteInt64(z.Time)
41 41
 	if err != nil {
42
+		err = msgp.WrapError(err, "Time")
42 43
 		return
43 44
 	}
44 45
 	err = en.WriteIntf(z.Record)
45 46
 	if err != nil {
47
+		err = msgp.WrapError(err, "Record")
46 48
 		return
47 49
 	}
48 50
 	return
... ...
@@ -56,6 +162,7 @@ func (z Entry) MarshalMsg(b []byte) (o []byte, err error) {
56 56
 	o = msgp.AppendInt64(o, z.Time)
57 57
 	o, err = msgp.AppendIntf(o, z.Record)
58 58
 	if err != nil {
59
+		err = msgp.WrapError(err, "Record")
59 60
 		return
60 61
 	}
61 62
 	return
... ...
@@ -63,21 +170,24 @@ func (z Entry) MarshalMsg(b []byte) (o []byte, err error) {
63 63
 
64 64
 // UnmarshalMsg implements msgp.Unmarshaler
65 65
 func (z *Entry) UnmarshalMsg(bts []byte) (o []byte, err error) {
66
-	var zbzg uint32
67
-	zbzg, bts, err = msgp.ReadArrayHeaderBytes(bts)
66
+	var zb0001 uint32
67
+	zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts)
68 68
 	if err != nil {
69
+		err = msgp.WrapError(err)
69 70
 		return
70 71
 	}
71
-	if zbzg != 2 {
72
-		err = msgp.ArrayError{Wanted: 2, Got: zbzg}
72
+	if zb0001 != 2 {
73
+		err = msgp.ArrayError{Wanted: 2, Got: zb0001}
73 74
 		return
74 75
 	}
75 76
 	z.Time, bts, err = msgp.ReadInt64Bytes(bts)
76 77
 	if err != nil {
78
+		err = msgp.WrapError(err, "Time")
77 79
 		return
78 80
 	}
79 81
 	z.Record, bts, err = msgp.ReadIntfBytes(bts)
80 82
 	if err != nil {
83
+		err = msgp.WrapError(err, "Record")
81 84
 		return
82 85
 	}
83 86
 	o = bts
... ...
@@ -92,52 +202,83 @@ func (z Entry) Msgsize() (s int) {
92 92
 
93 93
 // DecodeMsg implements msgp.Decodable
94 94
 func (z *Forward) DecodeMsg(dc *msgp.Reader) (err error) {
95
-	var zcmr uint32
96
-	zcmr, err = dc.ReadArrayHeader()
95
+	var zb0001 uint32
96
+	zb0001, err = dc.ReadArrayHeader()
97 97
 	if err != nil {
98
+		err = msgp.WrapError(err)
98 99
 		return
99 100
 	}
100
-	if zcmr != 3 {
101
-		err = msgp.ArrayError{Wanted: 3, Got: zcmr}
101
+	if zb0001 != 3 {
102
+		err = msgp.ArrayError{Wanted: 3, Got: zb0001}
102 103
 		return
103 104
 	}
104 105
 	z.Tag, err = dc.ReadString()
105 106
 	if err != nil {
107
+		err = msgp.WrapError(err, "Tag")
106 108
 		return
107 109
 	}
108
-	var zajw uint32
109
-	zajw, err = dc.ReadArrayHeader()
110
+	var zb0002 uint32
111
+	zb0002, err = dc.ReadArrayHeader()
110 112
 	if err != nil {
113
+		err = msgp.WrapError(err, "Entries")
111 114
 		return
112 115
 	}
113
-	if cap(z.Entries) >= int(zajw) {
114
-		z.Entries = (z.Entries)[:zajw]
116
+	if cap(z.Entries) >= int(zb0002) {
117
+		z.Entries = (z.Entries)[:zb0002]
115 118
 	} else {
116
-		z.Entries = make([]Entry, zajw)
119
+		z.Entries = make([]Entry, zb0002)
117 120
 	}
118
-	for zbai := range z.Entries {
119
-		var zwht uint32
120
-		zwht, err = dc.ReadArrayHeader()
121
+	for za0001 := range z.Entries {
122
+		var zb0003 uint32
123
+		zb0003, err = dc.ReadArrayHeader()
121 124
 		if err != nil {
125
+			err = msgp.WrapError(err, "Entries", za0001)
122 126
 			return
123 127
 		}
124
-		if zwht != 2 {
125
-			err = msgp.ArrayError{Wanted: 2, Got: zwht}
128
+		if zb0003 != 2 {
129
+			err = msgp.ArrayError{Wanted: 2, Got: zb0003}
126 130
 			return
127 131
 		}
128
-		z.Entries[zbai].Time, err = dc.ReadInt64()
132
+		z.Entries[za0001].Time, err = dc.ReadInt64()
129 133
 		if err != nil {
134
+			err = msgp.WrapError(err, "Entries", za0001, "Time")
130 135
 			return
131 136
 		}
132
-		z.Entries[zbai].Record, err = dc.ReadIntf()
137
+		z.Entries[za0001].Record, err = dc.ReadIntf()
133 138
 		if err != nil {
139
+			err = msgp.WrapError(err, "Entries", za0001, "Record")
134 140
 			return
135 141
 		}
136 142
 	}
137
-	z.Option, err = dc.ReadIntf()
143
+	var zb0004 uint32
144
+	zb0004, err = dc.ReadMapHeader()
138 145
 	if err != nil {
146
+		err = msgp.WrapError(err, "Option")
139 147
 		return
140 148
 	}
149
+	if z.Option == nil {
150
+		z.Option = make(map[string]string, zb0004)
151
+	} else if len(z.Option) > 0 {
152
+		for key := range z.Option {
153
+			delete(z.Option, key)
154
+		}
155
+	}
156
+	for zb0004 > 0 {
157
+		zb0004--
158
+		var za0002 string
159
+		var za0003 string
160
+		za0002, err = dc.ReadString()
161
+		if err != nil {
162
+			err = msgp.WrapError(err, "Option")
163
+			return
164
+		}
165
+		za0003, err = dc.ReadString()
166
+		if err != nil {
167
+			err = msgp.WrapError(err, "Option", za0002)
168
+			return
169
+		}
170
+		z.Option[za0002] = za0003
171
+	}
141 172
 	return
142 173
 }
143 174
 
... ...
@@ -146,35 +287,52 @@ func (z *Forward) EncodeMsg(en *msgp.Writer) (err error) {
146 146
 	// array header, size 3
147 147
 	err = en.Append(0x93)
148 148
 	if err != nil {
149
-		return err
149
+		return
150 150
 	}
151 151
 	err = en.WriteString(z.Tag)
152 152
 	if err != nil {
153
+		err = msgp.WrapError(err, "Tag")
153 154
 		return
154 155
 	}
155 156
 	err = en.WriteArrayHeader(uint32(len(z.Entries)))
156 157
 	if err != nil {
158
+		err = msgp.WrapError(err, "Entries")
157 159
 		return
158 160
 	}
159
-	for zbai := range z.Entries {
161
+	for za0001 := range z.Entries {
160 162
 		// array header, size 2
161 163
 		err = en.Append(0x92)
162 164
 		if err != nil {
163
-			return err
165
+			return
164 166
 		}
165
-		err = en.WriteInt64(z.Entries[zbai].Time)
167
+		err = en.WriteInt64(z.Entries[za0001].Time)
166 168
 		if err != nil {
169
+			err = msgp.WrapError(err, "Entries", za0001, "Time")
167 170
 			return
168 171
 		}
169
-		err = en.WriteIntf(z.Entries[zbai].Record)
172
+		err = en.WriteIntf(z.Entries[za0001].Record)
170 173
 		if err != nil {
174
+			err = msgp.WrapError(err, "Entries", za0001, "Record")
171 175
 			return
172 176
 		}
173 177
 	}
174
-	err = en.WriteIntf(z.Option)
178
+	err = en.WriteMapHeader(uint32(len(z.Option)))
175 179
 	if err != nil {
180
+		err = msgp.WrapError(err, "Option")
176 181
 		return
177 182
 	}
183
+	for za0002, za0003 := range z.Option {
184
+		err = en.WriteString(za0002)
185
+		if err != nil {
186
+			err = msgp.WrapError(err, "Option")
187
+			return
188
+		}
189
+		err = en.WriteString(za0003)
190
+		if err != nil {
191
+			err = msgp.WrapError(err, "Option", za0002)
192
+			return
193
+		}
194
+	}
178 195
 	return
179 196
 }
180 197
 
... ...
@@ -185,70 +343,103 @@ func (z *Forward) MarshalMsg(b []byte) (o []byte, err error) {
185 185
 	o = append(o, 0x93)
186 186
 	o = msgp.AppendString(o, z.Tag)
187 187
 	o = msgp.AppendArrayHeader(o, uint32(len(z.Entries)))
188
-	for zbai := range z.Entries {
188
+	for za0001 := range z.Entries {
189 189
 		// array header, size 2
190 190
 		o = append(o, 0x92)
191
-		o = msgp.AppendInt64(o, z.Entries[zbai].Time)
192
-		o, err = msgp.AppendIntf(o, z.Entries[zbai].Record)
191
+		o = msgp.AppendInt64(o, z.Entries[za0001].Time)
192
+		o, err = msgp.AppendIntf(o, z.Entries[za0001].Record)
193 193
 		if err != nil {
194
+			err = msgp.WrapError(err, "Entries", za0001, "Record")
194 195
 			return
195 196
 		}
196 197
 	}
197
-	o, err = msgp.AppendIntf(o, z.Option)
198
-	if err != nil {
199
-		return
198
+	o = msgp.AppendMapHeader(o, uint32(len(z.Option)))
199
+	for za0002, za0003 := range z.Option {
200
+		o = msgp.AppendString(o, za0002)
201
+		o = msgp.AppendString(o, za0003)
200 202
 	}
201 203
 	return
202 204
 }
203 205
 
204 206
 // UnmarshalMsg implements msgp.Unmarshaler
205 207
 func (z *Forward) UnmarshalMsg(bts []byte) (o []byte, err error) {
206
-	var zhct uint32
207
-	zhct, bts, err = msgp.ReadArrayHeaderBytes(bts)
208
+	var zb0001 uint32
209
+	zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts)
208 210
 	if err != nil {
211
+		err = msgp.WrapError(err)
209 212
 		return
210 213
 	}
211
-	if zhct != 3 {
212
-		err = msgp.ArrayError{Wanted: 3, Got: zhct}
214
+	if zb0001 != 3 {
215
+		err = msgp.ArrayError{Wanted: 3, Got: zb0001}
213 216
 		return
214 217
 	}
215 218
 	z.Tag, bts, err = msgp.ReadStringBytes(bts)
216 219
 	if err != nil {
220
+		err = msgp.WrapError(err, "Tag")
217 221
 		return
218 222
 	}
219
-	var zcua uint32
220
-	zcua, bts, err = msgp.ReadArrayHeaderBytes(bts)
223
+	var zb0002 uint32
224
+	zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
221 225
 	if err != nil {
226
+		err = msgp.WrapError(err, "Entries")
222 227
 		return
223 228
 	}
224
-	if cap(z.Entries) >= int(zcua) {
225
-		z.Entries = (z.Entries)[:zcua]
229
+	if cap(z.Entries) >= int(zb0002) {
230
+		z.Entries = (z.Entries)[:zb0002]
226 231
 	} else {
227
-		z.Entries = make([]Entry, zcua)
232
+		z.Entries = make([]Entry, zb0002)
228 233
 	}
229
-	for zbai := range z.Entries {
230
-		var zxhx uint32
231
-		zxhx, bts, err = msgp.ReadArrayHeaderBytes(bts)
234
+	for za0001 := range z.Entries {
235
+		var zb0003 uint32
236
+		zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts)
232 237
 		if err != nil {
238
+			err = msgp.WrapError(err, "Entries", za0001)
233 239
 			return
234 240
 		}
235
-		if zxhx != 2 {
236
-			err = msgp.ArrayError{Wanted: 2, Got: zxhx}
241
+		if zb0003 != 2 {
242
+			err = msgp.ArrayError{Wanted: 2, Got: zb0003}
237 243
 			return
238 244
 		}
239
-		z.Entries[zbai].Time, bts, err = msgp.ReadInt64Bytes(bts)
245
+		z.Entries[za0001].Time, bts, err = msgp.ReadInt64Bytes(bts)
240 246
 		if err != nil {
247
+			err = msgp.WrapError(err, "Entries", za0001, "Time")
241 248
 			return
242 249
 		}
243
-		z.Entries[zbai].Record, bts, err = msgp.ReadIntfBytes(bts)
250
+		z.Entries[za0001].Record, bts, err = msgp.ReadIntfBytes(bts)
244 251
 		if err != nil {
252
+			err = msgp.WrapError(err, "Entries", za0001, "Record")
245 253
 			return
246 254
 		}
247 255
 	}
248
-	z.Option, bts, err = msgp.ReadIntfBytes(bts)
256
+	var zb0004 uint32
257
+	zb0004, bts, err = msgp.ReadMapHeaderBytes(bts)
249 258
 	if err != nil {
259
+		err = msgp.WrapError(err, "Option")
250 260
 		return
251 261
 	}
262
+	if z.Option == nil {
263
+		z.Option = make(map[string]string, zb0004)
264
+	} else if len(z.Option) > 0 {
265
+		for key := range z.Option {
266
+			delete(z.Option, key)
267
+		}
268
+	}
269
+	for zb0004 > 0 {
270
+		var za0002 string
271
+		var za0003 string
272
+		zb0004--
273
+		za0002, bts, err = msgp.ReadStringBytes(bts)
274
+		if err != nil {
275
+			err = msgp.WrapError(err, "Option")
276
+			return
277
+		}
278
+		za0003, bts, err = msgp.ReadStringBytes(bts)
279
+		if err != nil {
280
+			err = msgp.WrapError(err, "Option", za0002)
281
+			return
282
+		}
283
+		z.Option[za0002] = za0003
284
+	}
252 285
 	o = bts
253 286
 	return
254 287
 }
... ...
@@ -256,40 +447,75 @@ func (z *Forward) UnmarshalMsg(bts []byte) (o []byte, err error) {
256 256
 // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
257 257
 func (z *Forward) Msgsize() (s int) {
258 258
 	s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.ArrayHeaderSize
259
-	for zbai := range z.Entries {
260
-		s += 1 + msgp.Int64Size + msgp.GuessSize(z.Entries[zbai].Record)
259
+	for za0001 := range z.Entries {
260
+		s += 1 + msgp.Int64Size + msgp.GuessSize(z.Entries[za0001].Record)
261
+	}
262
+	s += msgp.MapHeaderSize
263
+	if z.Option != nil {
264
+		for za0002, za0003 := range z.Option {
265
+			_ = za0003
266
+			s += msgp.StringPrefixSize + len(za0002) + msgp.StringPrefixSize + len(za0003)
267
+		}
261 268
 	}
262
-	s += msgp.GuessSize(z.Option)
263 269
 	return
264 270
 }
265 271
 
266 272
 // DecodeMsg implements msgp.Decodable
267 273
 func (z *Message) DecodeMsg(dc *msgp.Reader) (err error) {
268
-	var zlqf uint32
269
-	zlqf, err = dc.ReadArrayHeader()
274
+	var zb0001 uint32
275
+	zb0001, err = dc.ReadArrayHeader()
270 276
 	if err != nil {
277
+		err = msgp.WrapError(err)
271 278
 		return
272 279
 	}
273
-	if zlqf != 4 {
274
-		err = msgp.ArrayError{Wanted: 4, Got: zlqf}
280
+	if zb0001 != 4 {
281
+		err = msgp.ArrayError{Wanted: 4, Got: zb0001}
275 282
 		return
276 283
 	}
277 284
 	z.Tag, err = dc.ReadString()
278 285
 	if err != nil {
286
+		err = msgp.WrapError(err, "Tag")
279 287
 		return
280 288
 	}
281 289
 	z.Time, err = dc.ReadInt64()
282 290
 	if err != nil {
291
+		err = msgp.WrapError(err, "Time")
283 292
 		return
284 293
 	}
285 294
 	z.Record, err = dc.ReadIntf()
286 295
 	if err != nil {
296
+		err = msgp.WrapError(err, "Record")
287 297
 		return
288 298
 	}
289
-	z.Option, err = dc.ReadIntf()
299
+	var zb0002 uint32
300
+	zb0002, err = dc.ReadMapHeader()
290 301
 	if err != nil {
302
+		err = msgp.WrapError(err, "Option")
291 303
 		return
292 304
 	}
305
+	if z.Option == nil {
306
+		z.Option = make(map[string]string, zb0002)
307
+	} else if len(z.Option) > 0 {
308
+		for key := range z.Option {
309
+			delete(z.Option, key)
310
+		}
311
+	}
312
+	for zb0002 > 0 {
313
+		zb0002--
314
+		var za0001 string
315
+		var za0002 string
316
+		za0001, err = dc.ReadString()
317
+		if err != nil {
318
+			err = msgp.WrapError(err, "Option")
319
+			return
320
+		}
321
+		za0002, err = dc.ReadString()
322
+		if err != nil {
323
+			err = msgp.WrapError(err, "Option", za0001)
324
+			return
325
+		}
326
+		z.Option[za0001] = za0002
327
+	}
293 328
 	return
294 329
 }
295 330
 
... ...
@@ -298,24 +524,40 @@ func (z *Message) EncodeMsg(en *msgp.Writer) (err error) {
298 298
 	// array header, size 4
299 299
 	err = en.Append(0x94)
300 300
 	if err != nil {
301
-		return err
301
+		return
302 302
 	}
303 303
 	err = en.WriteString(z.Tag)
304 304
 	if err != nil {
305
+		err = msgp.WrapError(err, "Tag")
305 306
 		return
306 307
 	}
307 308
 	err = en.WriteInt64(z.Time)
308 309
 	if err != nil {
310
+		err = msgp.WrapError(err, "Time")
309 311
 		return
310 312
 	}
311 313
 	err = en.WriteIntf(z.Record)
312 314
 	if err != nil {
315
+		err = msgp.WrapError(err, "Record")
313 316
 		return
314 317
 	}
315
-	err = en.WriteIntf(z.Option)
318
+	err = en.WriteMapHeader(uint32(len(z.Option)))
316 319
 	if err != nil {
320
+		err = msgp.WrapError(err, "Option")
317 321
 		return
318 322
 	}
323
+	for za0001, za0002 := range z.Option {
324
+		err = en.WriteString(za0001)
325
+		if err != nil {
326
+			err = msgp.WrapError(err, "Option")
327
+			return
328
+		}
329
+		err = en.WriteString(za0002)
330
+		if err != nil {
331
+			err = msgp.WrapError(err, "Option", za0001)
332
+			return
333
+		}
334
+	}
319 335
 	return
320 336
 }
321 337
 
... ...
@@ -328,79 +570,145 @@ func (z *Message) MarshalMsg(b []byte) (o []byte, err error) {
328 328
 	o = msgp.AppendInt64(o, z.Time)
329 329
 	o, err = msgp.AppendIntf(o, z.Record)
330 330
 	if err != nil {
331
+		err = msgp.WrapError(err, "Record")
331 332
 		return
332 333
 	}
333
-	o, err = msgp.AppendIntf(o, z.Option)
334
-	if err != nil {
335
-		return
334
+	o = msgp.AppendMapHeader(o, uint32(len(z.Option)))
335
+	for za0001, za0002 := range z.Option {
336
+		o = msgp.AppendString(o, za0001)
337
+		o = msgp.AppendString(o, za0002)
336 338
 	}
337 339
 	return
338 340
 }
339 341
 
340 342
 // UnmarshalMsg implements msgp.Unmarshaler
341 343
 func (z *Message) UnmarshalMsg(bts []byte) (o []byte, err error) {
342
-	var zdaf uint32
343
-	zdaf, bts, err = msgp.ReadArrayHeaderBytes(bts)
344
+	var zb0001 uint32
345
+	zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts)
344 346
 	if err != nil {
347
+		err = msgp.WrapError(err)
345 348
 		return
346 349
 	}
347
-	if zdaf != 4 {
348
-		err = msgp.ArrayError{Wanted: 4, Got: zdaf}
350
+	if zb0001 != 4 {
351
+		err = msgp.ArrayError{Wanted: 4, Got: zb0001}
349 352
 		return
350 353
 	}
351 354
 	z.Tag, bts, err = msgp.ReadStringBytes(bts)
352 355
 	if err != nil {
356
+		err = msgp.WrapError(err, "Tag")
353 357
 		return
354 358
 	}
355 359
 	z.Time, bts, err = msgp.ReadInt64Bytes(bts)
356 360
 	if err != nil {
361
+		err = msgp.WrapError(err, "Time")
357 362
 		return
358 363
 	}
359 364
 	z.Record, bts, err = msgp.ReadIntfBytes(bts)
360 365
 	if err != nil {
366
+		err = msgp.WrapError(err, "Record")
361 367
 		return
362 368
 	}
363
-	z.Option, bts, err = msgp.ReadIntfBytes(bts)
369
+	var zb0002 uint32
370
+	zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
364 371
 	if err != nil {
372
+		err = msgp.WrapError(err, "Option")
365 373
 		return
366 374
 	}
375
+	if z.Option == nil {
376
+		z.Option = make(map[string]string, zb0002)
377
+	} else if len(z.Option) > 0 {
378
+		for key := range z.Option {
379
+			delete(z.Option, key)
380
+		}
381
+	}
382
+	for zb0002 > 0 {
383
+		var za0001 string
384
+		var za0002 string
385
+		zb0002--
386
+		za0001, bts, err = msgp.ReadStringBytes(bts)
387
+		if err != nil {
388
+			err = msgp.WrapError(err, "Option")
389
+			return
390
+		}
391
+		za0002, bts, err = msgp.ReadStringBytes(bts)
392
+		if err != nil {
393
+			err = msgp.WrapError(err, "Option", za0001)
394
+			return
395
+		}
396
+		z.Option[za0001] = za0002
397
+	}
367 398
 	o = bts
368 399
 	return
369 400
 }
370 401
 
371 402
 // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
372 403
 func (z *Message) Msgsize() (s int) {
373
-	s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.Int64Size + msgp.GuessSize(z.Record) + msgp.GuessSize(z.Option)
404
+	s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.Int64Size + msgp.GuessSize(z.Record) + msgp.MapHeaderSize
405
+	if z.Option != nil {
406
+		for za0001, za0002 := range z.Option {
407
+			_ = za0002
408
+			s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002)
409
+		}
410
+	}
374 411
 	return
375 412
 }
376 413
 
377 414
 // DecodeMsg implements msgp.Decodable
378 415
 func (z *MessageExt) DecodeMsg(dc *msgp.Reader) (err error) {
379
-	var zpks uint32
380
-	zpks, err = dc.ReadArrayHeader()
416
+	var zb0001 uint32
417
+	zb0001, err = dc.ReadArrayHeader()
381 418
 	if err != nil {
419
+		err = msgp.WrapError(err)
382 420
 		return
383 421
 	}
384
-	if zpks != 4 {
385
-		err = msgp.ArrayError{Wanted: 4, Got: zpks}
422
+	if zb0001 != 4 {
423
+		err = msgp.ArrayError{Wanted: 4, Got: zb0001}
386 424
 		return
387 425
 	}
388 426
 	z.Tag, err = dc.ReadString()
389 427
 	if err != nil {
428
+		err = msgp.WrapError(err, "Tag")
390 429
 		return
391 430
 	}
392 431
 	err = dc.ReadExtension(&z.Time)
393 432
 	if err != nil {
433
+		err = msgp.WrapError(err, "Time")
394 434
 		return
395 435
 	}
396 436
 	z.Record, err = dc.ReadIntf()
397 437
 	if err != nil {
438
+		err = msgp.WrapError(err, "Record")
398 439
 		return
399 440
 	}
400
-	z.Option, err = dc.ReadIntf()
441
+	var zb0002 uint32
442
+	zb0002, err = dc.ReadMapHeader()
401 443
 	if err != nil {
444
+		err = msgp.WrapError(err, "Option")
402 445
 		return
403 446
 	}
447
+	if z.Option == nil {
448
+		z.Option = make(map[string]string, zb0002)
449
+	} else if len(z.Option) > 0 {
450
+		for key := range z.Option {
451
+			delete(z.Option, key)
452
+		}
453
+	}
454
+	for zb0002 > 0 {
455
+		zb0002--
456
+		var za0001 string
457
+		var za0002 string
458
+		za0001, err = dc.ReadString()
459
+		if err != nil {
460
+			err = msgp.WrapError(err, "Option")
461
+			return
462
+		}
463
+		za0002, err = dc.ReadString()
464
+		if err != nil {
465
+			err = msgp.WrapError(err, "Option", za0001)
466
+			return
467
+		}
468
+		z.Option[za0001] = za0002
469
+	}
404 470
 	return
405 471
 }
406 472
 
... ...
@@ -409,24 +717,40 @@ func (z *MessageExt) EncodeMsg(en *msgp.Writer) (err error) {
409 409
 	// array header, size 4
410 410
 	err = en.Append(0x94)
411 411
 	if err != nil {
412
-		return err
412
+		return
413 413
 	}
414 414
 	err = en.WriteString(z.Tag)
415 415
 	if err != nil {
416
+		err = msgp.WrapError(err, "Tag")
416 417
 		return
417 418
 	}
418 419
 	err = en.WriteExtension(&z.Time)
419 420
 	if err != nil {
421
+		err = msgp.WrapError(err, "Time")
420 422
 		return
421 423
 	}
422 424
 	err = en.WriteIntf(z.Record)
423 425
 	if err != nil {
426
+		err = msgp.WrapError(err, "Record")
424 427
 		return
425 428
 	}
426
-	err = en.WriteIntf(z.Option)
429
+	err = en.WriteMapHeader(uint32(len(z.Option)))
427 430
 	if err != nil {
431
+		err = msgp.WrapError(err, "Option")
428 432
 		return
429 433
 	}
434
+	for za0001, za0002 := range z.Option {
435
+		err = en.WriteString(za0001)
436
+		if err != nil {
437
+			err = msgp.WrapError(err, "Option")
438
+			return
439
+		}
440
+		err = en.WriteString(za0002)
441
+		if err != nil {
442
+			err = msgp.WrapError(err, "Option", za0001)
443
+			return
444
+		}
445
+	}
430 446
 	return
431 447
 }
432 448
 
... ...
@@ -438,52 +762,90 @@ func (z *MessageExt) MarshalMsg(b []byte) (o []byte, err error) {
438 438
 	o = msgp.AppendString(o, z.Tag)
439 439
 	o, err = msgp.AppendExtension(o, &z.Time)
440 440
 	if err != nil {
441
+		err = msgp.WrapError(err, "Time")
441 442
 		return
442 443
 	}
443 444
 	o, err = msgp.AppendIntf(o, z.Record)
444 445
 	if err != nil {
446
+		err = msgp.WrapError(err, "Record")
445 447
 		return
446 448
 	}
447
-	o, err = msgp.AppendIntf(o, z.Option)
448
-	if err != nil {
449
-		return
449
+	o = msgp.AppendMapHeader(o, uint32(len(z.Option)))
450
+	for za0001, za0002 := range z.Option {
451
+		o = msgp.AppendString(o, za0001)
452
+		o = msgp.AppendString(o, za0002)
450 453
 	}
451 454
 	return
452 455
 }
453 456
 
454 457
 // UnmarshalMsg implements msgp.Unmarshaler
455 458
 func (z *MessageExt) UnmarshalMsg(bts []byte) (o []byte, err error) {
456
-	var zjfb uint32
457
-	zjfb, bts, err = msgp.ReadArrayHeaderBytes(bts)
459
+	var zb0001 uint32
460
+	zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts)
458 461
 	if err != nil {
462
+		err = msgp.WrapError(err)
459 463
 		return
460 464
 	}
461
-	if zjfb != 4 {
462
-		err = msgp.ArrayError{Wanted: 4, Got: zjfb}
465
+	if zb0001 != 4 {
466
+		err = msgp.ArrayError{Wanted: 4, Got: zb0001}
463 467
 		return
464 468
 	}
465 469
 	z.Tag, bts, err = msgp.ReadStringBytes(bts)
466 470
 	if err != nil {
471
+		err = msgp.WrapError(err, "Tag")
467 472
 		return
468 473
 	}
469 474
 	bts, err = msgp.ReadExtensionBytes(bts, &z.Time)
470 475
 	if err != nil {
476
+		err = msgp.WrapError(err, "Time")
471 477
 		return
472 478
 	}
473 479
 	z.Record, bts, err = msgp.ReadIntfBytes(bts)
474 480
 	if err != nil {
481
+		err = msgp.WrapError(err, "Record")
475 482
 		return
476 483
 	}
477
-	z.Option, bts, err = msgp.ReadIntfBytes(bts)
484
+	var zb0002 uint32
485
+	zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
478 486
 	if err != nil {
487
+		err = msgp.WrapError(err, "Option")
479 488
 		return
480 489
 	}
490
+	if z.Option == nil {
491
+		z.Option = make(map[string]string, zb0002)
492
+	} else if len(z.Option) > 0 {
493
+		for key := range z.Option {
494
+			delete(z.Option, key)
495
+		}
496
+	}
497
+	for zb0002 > 0 {
498
+		var za0001 string
499
+		var za0002 string
500
+		zb0002--
501
+		za0001, bts, err = msgp.ReadStringBytes(bts)
502
+		if err != nil {
503
+			err = msgp.WrapError(err, "Option")
504
+			return
505
+		}
506
+		za0002, bts, err = msgp.ReadStringBytes(bts)
507
+		if err != nil {
508
+			err = msgp.WrapError(err, "Option", za0001)
509
+			return
510
+		}
511
+		z.Option[za0001] = za0002
512
+	}
481 513
 	o = bts
482 514
 	return
483 515
 }
484 516
 
485 517
 // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
486 518
 func (z *MessageExt) Msgsize() (s int) {
487
-	s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.ExtensionPrefixSize + z.Time.Len() + msgp.GuessSize(z.Record) + msgp.GuessSize(z.Option)
519
+	s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.ExtensionPrefixSize + z.Time.Len() + msgp.GuessSize(z.Record) + msgp.MapHeaderSize
520
+	if z.Option != nil {
521
+		for za0001, za0002 := range z.Option {
522
+			_ = za0002
523
+			s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002)
524
+		}
525
+	}
488 526
 	return
489 527
 }
... ...
@@ -1,8 +1,6 @@
1 1
 package fluent
2 2
 
3
-// NOTE: THIS FILE WAS PRODUCED BY THE
4
-// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp)
5
-// DO NOT EDIT
3
+// Code generated by github.com/tinylib/msgp DO NOT EDIT.
6 4
 
7 5
 import (
8 6
 	"github.com/tinylib/msgp/msgp"
... ...
@@ -12,31 +10,36 @@ import (
12 12
 func (z *TestMessage) DecodeMsg(dc *msgp.Reader) (err error) {
13 13
 	var field []byte
14 14
 	_ = field
15
-	var zxvk uint32
16
-	zxvk, err = dc.ReadMapHeader()
15
+	var zb0001 uint32
16
+	zb0001, err = dc.ReadMapHeader()
17 17
 	if err != nil {
18
+		err = msgp.WrapError(err)
18 19
 		return
19 20
 	}
20
-	for zxvk > 0 {
21
-		zxvk--
21
+	for zb0001 > 0 {
22
+		zb0001--
22 23
 		field, err = dc.ReadMapKeyPtr()
23 24
 		if err != nil {
25
+			err = msgp.WrapError(err)
24 26
 			return
25 27
 		}
26 28
 		switch msgp.UnsafeString(field) {
27 29
 		case "foo":
28 30
 			z.Foo, err = dc.ReadString()
29 31
 			if err != nil {
32
+				err = msgp.WrapError(err, "Foo")
30 33
 				return
31 34
 			}
32 35
 		case "hoge":
33 36
 			z.Hoge, err = dc.ReadString()
34 37
 			if err != nil {
38
+				err = msgp.WrapError(err, "Hoge")
35 39
 				return
36 40
 			}
37 41
 		default:
38 42
 			err = dc.Skip()
39 43
 			if err != nil {
44
+				err = msgp.WrapError(err)
40 45
 				return
41 46
 			}
42 47
 		}
... ...
@@ -50,19 +53,21 @@ func (z TestMessage) EncodeMsg(en *msgp.Writer) (err error) {
50 50
 	// write "foo"
51 51
 	err = en.Append(0x82, 0xa3, 0x66, 0x6f, 0x6f)
52 52
 	if err != nil {
53
-		return err
53
+		return
54 54
 	}
55 55
 	err = en.WriteString(z.Foo)
56 56
 	if err != nil {
57
+		err = msgp.WrapError(err, "Foo")
57 58
 		return
58 59
 	}
59 60
 	// write "hoge"
60 61
 	err = en.Append(0xa4, 0x68, 0x6f, 0x67, 0x65)
61 62
 	if err != nil {
62
-		return err
63
+		return
63 64
 	}
64 65
 	err = en.WriteString(z.Hoge)
65 66
 	if err != nil {
67
+		err = msgp.WrapError(err, "Hoge")
66 68
 		return
67 69
 	}
68 70
 	return
... ...
@@ -85,31 +90,36 @@ func (z TestMessage) MarshalMsg(b []byte) (o []byte, err error) {
85 85
 func (z *TestMessage) UnmarshalMsg(bts []byte) (o []byte, err error) {
86 86
 	var field []byte
87 87
 	_ = field
88
-	var zbzg uint32
89
-	zbzg, bts, err = msgp.ReadMapHeaderBytes(bts)
88
+	var zb0001 uint32
89
+	zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
90 90
 	if err != nil {
91
+		err = msgp.WrapError(err)
91 92
 		return
92 93
 	}
93
-	for zbzg > 0 {
94
-		zbzg--
94
+	for zb0001 > 0 {
95
+		zb0001--
95 96
 		field, bts, err = msgp.ReadMapKeyZC(bts)
96 97
 		if err != nil {
98
+			err = msgp.WrapError(err)
97 99
 			return
98 100
 		}
99 101
 		switch msgp.UnsafeString(field) {
100 102
 		case "foo":
101 103
 			z.Foo, bts, err = msgp.ReadStringBytes(bts)
102 104
 			if err != nil {
105
+				err = msgp.WrapError(err, "Foo")
103 106
 				return
104 107
 			}
105 108
 		case "hoge":
106 109
 			z.Hoge, bts, err = msgp.ReadStringBytes(bts)
107 110
 			if err != nil {
111
+				err = msgp.WrapError(err, "Hoge")
108 112
 				return
109 113
 			}
110 114
 		default:
111 115
 			bts, err = msgp.Skip(bts)
112 116
 			if err != nil {
117
+				err = msgp.WrapError(err)
113 118
 				return
114 119
 			}
115 120
 		}
... ...
@@ -1,3 +1,3 @@
1 1
 package fluent
2 2
 
3
-const Version = "1.3.0"
3
+const Version = "1.4.0"
4 4
deleted file mode 100644
... ...
@@ -1,15 +0,0 @@
1
-// +build appengine
2
-
3
-package msgp
4
-
5
-// let's just assume appengine
6
-// uses 64-bit hardware...
7
-const smallint = false
8
-
9
-func UnsafeString(b []byte) string {
10
-	return string(b)
11
-}
12
-
13
-func UnsafeBytes(s string) []byte {
14
-	return []byte(s)
15
-}
... ...
@@ -198,6 +198,7 @@ func resizeMap(raw []byte, delta int64) []byte {
198 198
 		if cap(raw)-len(raw) >= 2 {
199 199
 			raw = raw[0 : len(raw)+2]
200 200
 			copy(raw[5:], raw[3:])
201
+			raw[0] = mmap32
201 202
 			big.PutUint32(raw[1:], uint32(sz+delta))
202 203
 			return raw
203 204
 		}
... ...
@@ -5,6 +5,8 @@ import (
5 5
 	"reflect"
6 6
 )
7 7
 
8
+const resumableDefault = false
9
+
8 10
 var (
9 11
 	// ErrShortBytes is returned when the
10 12
 	// slice being decoded is too short to
... ...
@@ -26,84 +28,240 @@ type Error interface {
26 26
 	// Resumable returns whether
27 27
 	// or not the error means that
28 28
 	// the stream of data is malformed
29
-	// and  the information is unrecoverable.
29
+	// and the information is unrecoverable.
30 30
 	Resumable() bool
31 31
 }
32 32
 
33
+// contextError allows msgp Error instances to be enhanced with additional
34
+// context about their origin.
35
+type contextError interface {
36
+	Error
37
+
38
+	// withContext must not modify the error instance - it must clone and
39
+	// return a new error with the context added.
40
+	withContext(ctx string) error
41
+}
42
+
43
+// Cause returns the underlying cause of an error that has been wrapped
44
+// with additional context.
45
+func Cause(e error) error {
46
+	out := e
47
+	if e, ok := e.(errWrapped); ok && e.cause != nil {
48
+		out = e.cause
49
+	}
50
+	return out
51
+}
52
+
53
+// Resumable returns whether or not the error means that the stream of data is
54
+// malformed and the information is unrecoverable.
55
+func Resumable(e error) bool {
56
+	if e, ok := e.(Error); ok {
57
+		return e.Resumable()
58
+	}
59
+	return resumableDefault
60
+}
61
+
62
+// WrapError wraps an error with additional context that allows the part of the
63
+// serialized type that caused the problem to be identified. Underlying errors
64
+// can be retrieved using Cause()
65
+//
66
+// The input error is not modified - a new error should be returned.
67
+//
68
+// ErrShortBytes is not wrapped with any context due to backward compatibility
69
+// issues with the public API.
70
+//
71
+func WrapError(err error, ctx ...interface{}) error {
72
+	switch e := err.(type) {
73
+	case errShort:
74
+		return e
75
+	case contextError:
76
+		return e.withContext(ctxString(ctx))
77
+	default:
78
+		return errWrapped{cause: err, ctx: ctxString(ctx)}
79
+	}
80
+}
81
+
82
+// ctxString converts the incoming interface{} slice into a single string.
83
+func ctxString(ctx []interface{}) string {
84
+	out := ""
85
+	for idx, cv := range ctx {
86
+		if idx > 0 {
87
+			out += "/"
88
+		}
89
+		out += fmt.Sprintf("%v", cv)
90
+	}
91
+	return out
92
+}
93
+
94
+func addCtx(ctx, add string) string {
95
+	if ctx != "" {
96
+		return add + "/" + ctx
97
+	} else {
98
+		return add
99
+	}
100
+}
101
+
102
+// errWrapped allows arbitrary errors passed to WrapError to be enhanced with
103
+// context and unwrapped with Cause()
104
+type errWrapped struct {
105
+	cause error
106
+	ctx   string
107
+}
108
+
109
+func (e errWrapped) Error() string {
110
+	if e.ctx != "" {
111
+		return fmt.Sprintf("%s at %s", e.cause, e.ctx)
112
+	} else {
113
+		return e.cause.Error()
114
+	}
115
+}
116
+
117
+func (e errWrapped) Resumable() bool {
118
+	if e, ok := e.cause.(Error); ok {
119
+		return e.Resumable()
120
+	}
121
+	return resumableDefault
122
+}
123
+
33 124
 type errShort struct{}
34 125
 
35 126
 func (e errShort) Error() string   { return "msgp: too few bytes left to read object" }
36 127
 func (e errShort) Resumable() bool { return false }
37 128
 
38
-type errFatal struct{}
129
+type errFatal struct {
130
+	ctx string
131
+}
132
+
133
+func (f errFatal) Error() string {
134
+	out := "msgp: fatal decoding error (unreachable code)"
135
+	if f.ctx != "" {
136
+		out += " at " + f.ctx
137
+	}
138
+	return out
139
+}
39 140
 
40
-func (f errFatal) Error() string   { return "msgp: fatal decoding error (unreachable code)" }
41 141
 func (f errFatal) Resumable() bool { return false }
42 142
 
143
+func (f errFatal) withContext(ctx string) error { f.ctx = addCtx(f.ctx, ctx); return f }
144
+
43 145
 // ArrayError is an error returned
44 146
 // when decoding a fix-sized array
45 147
 // of the wrong size
46 148
 type ArrayError struct {
47 149
 	Wanted uint32
48 150
 	Got    uint32
151
+	ctx    string
49 152
 }
50 153
 
51 154
 // Error implements the error interface
52 155
 func (a ArrayError) Error() string {
53
-	return fmt.Sprintf("msgp: wanted array of size %d; got %d", a.Wanted, a.Got)
156
+	out := fmt.Sprintf("msgp: wanted array of size %d; got %d", a.Wanted, a.Got)
157
+	if a.ctx != "" {
158
+		out += " at " + a.ctx
159
+	}
160
+	return out
54 161
 }
55 162
 
56 163
 // Resumable is always 'true' for ArrayErrors
57 164
 func (a ArrayError) Resumable() bool { return true }
58 165
 
166
+func (a ArrayError) withContext(ctx string) error { a.ctx = addCtx(a.ctx, ctx); return a }
167
+
59 168
 // IntOverflow is returned when a call
60 169
 // would downcast an integer to a type
61 170
 // with too few bits to hold its value.
62 171
 type IntOverflow struct {
63 172
 	Value         int64 // the value of the integer
64 173
 	FailedBitsize int   // the bit size that the int64 could not fit into
174
+	ctx           string
65 175
 }
66 176
 
67 177
 // Error implements the error interface
68 178
 func (i IntOverflow) Error() string {
69
-	return fmt.Sprintf("msgp: %d overflows int%d", i.Value, i.FailedBitsize)
179
+	str := fmt.Sprintf("msgp: %d overflows int%d", i.Value, i.FailedBitsize)
180
+	if i.ctx != "" {
181
+		str += " at " + i.ctx
182
+	}
183
+	return str
70 184
 }
71 185
 
72 186
 // Resumable is always 'true' for overflows
73 187
 func (i IntOverflow) Resumable() bool { return true }
74 188
 
189
+func (i IntOverflow) withContext(ctx string) error { i.ctx = addCtx(i.ctx, ctx); return i }
190
+
75 191
 // UintOverflow is returned when a call
76 192
 // would downcast an unsigned integer to a type
77 193
 // with too few bits to hold its value
78 194
 type UintOverflow struct {
79 195
 	Value         uint64 // value of the uint
80 196
 	FailedBitsize int    // the bit size that couldn't fit the value
197
+	ctx           string
81 198
 }
82 199
 
83 200
 // Error implements the error interface
84 201
 func (u UintOverflow) Error() string {
85
-	return fmt.Sprintf("msgp: %d overflows uint%d", u.Value, u.FailedBitsize)
202
+	str := fmt.Sprintf("msgp: %d overflows uint%d", u.Value, u.FailedBitsize)
203
+	if u.ctx != "" {
204
+		str += " at " + u.ctx
205
+	}
206
+	return str
86 207
 }
87 208
 
88 209
 // Resumable is always 'true' for overflows
89 210
 func (u UintOverflow) Resumable() bool { return true }
90 211
 
212
+func (u UintOverflow) withContext(ctx string) error { u.ctx = addCtx(u.ctx, ctx); return u }
213
+
214
+// UintBelowZero is returned when a call
215
+// would cast a signed integer below zero
216
+// to an unsigned integer.
217
+type UintBelowZero struct {
218
+	Value int64 // value of the incoming int
219
+	ctx   string
220
+}
221
+
222
+// Error implements the error interface
223
+func (u UintBelowZero) Error() string {
224
+	str := fmt.Sprintf("msgp: attempted to cast int %d to unsigned", u.Value)
225
+	if u.ctx != "" {
226
+		str += " at " + u.ctx
227
+	}
228
+	return str
229
+}
230
+
231
+// Resumable is always 'true' for overflows
232
+func (u UintBelowZero) Resumable() bool { return true }
233
+
234
+func (u UintBelowZero) withContext(ctx string) error {
235
+	u.ctx = ctx
236
+	return u
237
+}
238
+
91 239
 // A TypeError is returned when a particular
92 240
 // decoding method is unsuitable for decoding
93 241
 // a particular MessagePack value.
94 242
 type TypeError struct {
95 243
 	Method  Type // Type expected by method
96 244
 	Encoded Type // Type actually encoded
245
+
246
+	ctx string
97 247
 }
98 248
 
99 249
 // Error implements the error interface
100 250
 func (t TypeError) Error() string {
101
-	return fmt.Sprintf("msgp: attempted to decode type %q with method for %q", t.Encoded, t.Method)
251
+	out := fmt.Sprintf("msgp: attempted to decode type %q with method for %q", t.Encoded, t.Method)
252
+	if t.ctx != "" {
253
+		out += " at " + t.ctx
254
+	}
255
+	return out
102 256
 }
103 257
 
104 258
 // Resumable returns 'true' for TypeErrors
105 259
 func (t TypeError) Resumable() bool { return true }
106 260
 
261
+func (t TypeError) withContext(ctx string) error { t.ctx = addCtx(t.ctx, ctx); return t }
262
+
107 263
 // returns either InvalidPrefixError or
108 264
 // TypeError depending on whether or not
109 265
 // the prefix is recognized
... ...
@@ -133,10 +291,24 @@ func (i InvalidPrefixError) Resumable() bool { return false }
133 133
 // to a function that takes `interface{}`.
134 134
 type ErrUnsupportedType struct {
135 135
 	T reflect.Type
136
+
137
+	ctx string
136 138
 }
137 139
 
138 140
 // Error implements error
139
-func (e *ErrUnsupportedType) Error() string { return fmt.Sprintf("msgp: type %q not supported", e.T) }
141
+func (e *ErrUnsupportedType) Error() string {
142
+	out := fmt.Sprintf("msgp: type %q not supported", e.T)
143
+	if e.ctx != "" {
144
+		out += " at " + e.ctx
145
+	}
146
+	return out
147
+}
140 148
 
141 149
 // Resumable returns 'true' for ErrUnsupportedType
142 150
 func (e *ErrUnsupportedType) Resumable() bool { return true }
151
+
152
+func (e *ErrUnsupportedType) withContext(ctx string) error {
153
+	o := *e
154
+	o.ctx = addCtx(o.ctx, ctx)
155
+	return &o
156
+}
... ...
@@ -445,26 +445,27 @@ func AppendExtension(b []byte, e Extension) ([]byte, error) {
445 445
 		o[n] = mfixext16
446 446
 		o[n+1] = byte(e.ExtensionType())
447 447
 		n += 2
448
-	}
449
-	switch {
450
-	case l < math.MaxUint8:
451
-		o, n = ensure(b, l+3)
452
-		o[n] = mext8
453
-		o[n+1] = byte(uint8(l))
454
-		o[n+2] = byte(e.ExtensionType())
455
-		n += 3
456
-	case l < math.MaxUint16:
457
-		o, n = ensure(b, l+4)
458
-		o[n] = mext16
459
-		big.PutUint16(o[n+1:], uint16(l))
460
-		o[n+3] = byte(e.ExtensionType())
461
-		n += 4
462 448
 	default:
463
-		o, n = ensure(b, l+6)
464
-		o[n] = mext32
465
-		big.PutUint32(o[n+1:], uint32(l))
466
-		o[n+5] = byte(e.ExtensionType())
467
-		n += 6
449
+		switch {
450
+		case l < math.MaxUint8:
451
+			o, n = ensure(b, l+3)
452
+			o[n] = mext8
453
+			o[n+1] = byte(uint8(l))
454
+			o[n+2] = byte(e.ExtensionType())
455
+			n += 3
456
+		case l < math.MaxUint16:
457
+			o, n = ensure(b, l+4)
458
+			o[n] = mext16
459
+			big.PutUint16(o[n+1:], uint16(l))
460
+			o[n+3] = byte(e.ExtensionType())
461
+			n += 4
462
+		default:
463
+			o, n = ensure(b, l+6)
464
+			o[n] = mext32
465
+			big.PutUint32(o[n+1:], uint32(l))
466
+			o[n+5] = byte(e.ExtensionType())
467
+			n += 6
468
+		}
468 469
 	}
469 470
 	return o, e.MarshalBinaryTo(o[n:])
470 471
 }
471 472
new file mode 100644
... ...
@@ -0,0 +1,15 @@
0
+// +build purego appengine
1
+
2
+package msgp
3
+
4
+// let's just assume appengine
5
+// uses 64-bit hardware...
6
+const smallint = false
7
+
8
+func UnsafeString(b []byte) string {
9
+	return string(b)
10
+}
11
+
12
+func UnsafeBytes(s string) []byte {
13
+	return []byte(s)
14
+}
... ...
@@ -583,6 +583,14 @@ func (m *Reader) ReadInt64() (i int64, err error) {
583 583
 		i = int64(getMint8(p))
584 584
 		return
585 585
 
586
+	case muint8:
587
+		p, err = m.R.Next(2)
588
+		if err != nil {
589
+			return
590
+		}
591
+		i = int64(getMuint8(p))
592
+		return
593
+
586 594
 	case mint16:
587 595
 		p, err = m.R.Next(3)
588 596
 		if err != nil {
... ...
@@ -591,6 +599,14 @@ func (m *Reader) ReadInt64() (i int64, err error) {
591 591
 		i = int64(getMint16(p))
592 592
 		return
593 593
 
594
+	case muint16:
595
+		p, err = m.R.Next(3)
596
+		if err != nil {
597
+			return
598
+		}
599
+		i = int64(getMuint16(p))
600
+		return
601
+
594 602
 	case mint32:
595 603
 		p, err = m.R.Next(5)
596 604
 		if err != nil {
... ...
@@ -599,6 +615,14 @@ func (m *Reader) ReadInt64() (i int64, err error) {
599 599
 		i = int64(getMint32(p))
600 600
 		return
601 601
 
602
+	case muint32:
603
+		p, err = m.R.Next(5)
604
+		if err != nil {
605
+			return
606
+		}
607
+		i = int64(getMuint32(p))
608
+		return
609
+
602 610
 	case mint64:
603 611
 		p, err = m.R.Next(9)
604 612
 		if err != nil {
... ...
@@ -607,6 +631,19 @@ func (m *Reader) ReadInt64() (i int64, err error) {
607 607
 		i = getMint64(p)
608 608
 		return
609 609
 
610
+	case muint64:
611
+		p, err = m.R.Next(9)
612
+		if err != nil {
613
+			return
614
+		}
615
+		u := getMuint64(p)
616
+		if u > math.MaxInt64 {
617
+			err = UintOverflow{Value: u, FailedBitsize: 64}
618
+			return
619
+		}
620
+		i = int64(u)
621
+		return
622
+
610 623
 	default:
611 624
 		err = badPrefix(IntType, lead)
612 625
 		return
... ...
@@ -678,6 +715,19 @@ func (m *Reader) ReadUint64() (u uint64, err error) {
678 678
 		return
679 679
 	}
680 680
 	switch lead {
681
+	case mint8:
682
+		p, err = m.R.Next(2)
683
+		if err != nil {
684
+			return
685
+		}
686
+		v := int64(getMint8(p))
687
+		if v < 0 {
688
+			err = UintBelowZero{Value: v}
689
+			return
690
+		}
691
+		u = uint64(v)
692
+		return
693
+
681 694
 	case muint8:
682 695
 		p, err = m.R.Next(2)
683 696
 		if err != nil {
... ...
@@ -686,6 +736,19 @@ func (m *Reader) ReadUint64() (u uint64, err error) {
686 686
 		u = uint64(getMuint8(p))
687 687
 		return
688 688
 
689
+	case mint16:
690
+		p, err = m.R.Next(3)
691
+		if err != nil {
692
+			return
693
+		}
694
+		v := int64(getMint16(p))
695
+		if v < 0 {
696
+			err = UintBelowZero{Value: v}
697
+			return
698
+		}
699
+		u = uint64(v)
700
+		return
701
+
689 702
 	case muint16:
690 703
 		p, err = m.R.Next(3)
691 704
 		if err != nil {
... ...
@@ -694,6 +757,19 @@ func (m *Reader) ReadUint64() (u uint64, err error) {
694 694
 		u = uint64(getMuint16(p))
695 695
 		return
696 696
 
697
+	case mint32:
698
+		p, err = m.R.Next(5)
699
+		if err != nil {
700
+			return
701
+		}
702
+		v := int64(getMint32(p))
703
+		if v < 0 {
704
+			err = UintBelowZero{Value: v}
705
+			return
706
+		}
707
+		u = uint64(v)
708
+		return
709
+
697 710
 	case muint32:
698 711
 		p, err = m.R.Next(5)
699 712
 		if err != nil {
... ...
@@ -702,6 +778,19 @@ func (m *Reader) ReadUint64() (u uint64, err error) {
702 702
 		u = uint64(getMuint32(p))
703 703
 		return
704 704
 
705
+	case mint64:
706
+		p, err = m.R.Next(9)
707
+		if err != nil {
708
+			return
709
+		}
710
+		v := int64(getMint64(p))
711
+		if v < 0 {
712
+			err = UintBelowZero{Value: v}
713
+			return
714
+		}
715
+		u = uint64(v)
716
+		return
717
+
705 718
 	case muint64:
706 719
 		p, err = m.R.Next(9)
707 720
 		if err != nil {
... ...
@@ -711,7 +800,11 @@ func (m *Reader) ReadUint64() (u uint64, err error) {
711 711
 		return
712 712
 
713 713
 	default:
714
-		err = badPrefix(UintType, lead)
714
+		if isnfixint(lead) {
715
+			err = UintBelowZero{Value: int64(rnfixint(lead))}
716
+		} else {
717
+			err = badPrefix(UintType, lead)
718
+		}
715 719
 		return
716 720
 
717 721
 	}
... ...
@@ -79,6 +79,9 @@ func (r *Raw) UnmarshalMsg(b []byte) ([]byte, error) {
79 79
 		return b, err
80 80
 	}
81 81
 	rlen := l - len(out)
82
+	if IsNil(b[:rlen]) {
83
+		rlen = 0
84
+	}
82 85
 	if cap(*r) < rlen {
83 86
 		*r = make(Raw, rlen)
84 87
 	} else {
... ...
@@ -104,7 +107,11 @@ func (r Raw) EncodeMsg(w *Writer) error {
104 104
 // next object on the wire.
105 105
 func (r *Raw) DecodeMsg(f *Reader) error {
106 106
 	*r = (*r)[:0]
107
-	return appendNext(f, (*[]byte)(r))
107
+	err := appendNext(f, (*[]byte)(r))
108
+	if IsNil(*r) {
109
+		*r = (*r)[:0]
110
+	}
111
+	return err
108 112
 }
109 113
 
110 114
 // Msgsize implements msgp.Sizer
... ...
@@ -368,6 +375,15 @@ func ReadInt64Bytes(b []byte) (i int64, o []byte, err error) {
368 368
 		o = b[2:]
369 369
 		return
370 370
 
371
+	case muint8:
372
+		if l < 2 {
373
+			err = ErrShortBytes
374
+			return
375
+		}
376
+		i = int64(getMuint8(b))
377
+		o = b[2:]
378
+		return
379
+
371 380
 	case mint16:
372 381
 		if l < 3 {
373 382
 			err = ErrShortBytes
... ...
@@ -377,6 +393,15 @@ func ReadInt64Bytes(b []byte) (i int64, o []byte, err error) {
377 377
 		o = b[3:]
378 378
 		return
379 379
 
380
+	case muint16:
381
+		if l < 3 {
382
+			err = ErrShortBytes
383
+			return
384
+		}
385
+		i = int64(getMuint16(b))
386
+		o = b[3:]
387
+		return
388
+
380 389
 	case mint32:
381 390
 		if l < 5 {
382 391
 			err = ErrShortBytes
... ...
@@ -386,12 +411,35 @@ func ReadInt64Bytes(b []byte) (i int64, o []byte, err error) {
386 386
 		o = b[5:]
387 387
 		return
388 388
 
389
+	case muint32:
390
+		if l < 5 {
391
+			err = ErrShortBytes
392
+			return
393
+		}
394
+		i = int64(getMuint32(b))
395
+		o = b[5:]
396
+		return
397
+
389 398
 	case mint64:
390 399
 		if l < 9 {
391 400
 			err = ErrShortBytes
392 401
 			return
393 402
 		}
394
-		i = getMint64(b)
403
+		i = int64(getMint64(b))
404
+		o = b[9:]
405
+		return
406
+
407
+	case muint64:
408
+		if l < 9 {
409
+			err = ErrShortBytes
410
+			return
411
+		}
412
+		u := getMuint64(b)
413
+		if u > math.MaxInt64 {
414
+			err = UintOverflow{Value: u, FailedBitsize: 64}
415
+			return
416
+		}
417
+		i = int64(u)
395 418
 		o = b[9:]
396 419
 		return
397 420
 
... ...
@@ -477,6 +525,20 @@ func ReadUint64Bytes(b []byte) (u uint64, o []byte, err error) {
477 477
 	}
478 478
 
479 479
 	switch lead {
480
+	case mint8:
481
+		if l < 2 {
482
+			err = ErrShortBytes
483
+			return
484
+		}
485
+		v := int64(getMint8(b))
486
+		if v < 0 {
487
+			err = UintBelowZero{Value: v}
488
+			return
489
+		}
490
+		u = uint64(v)
491
+		o = b[2:]
492
+		return
493
+
480 494
 	case muint8:
481 495
 		if l < 2 {
482 496
 			err = ErrShortBytes
... ...
@@ -486,6 +548,20 @@ func ReadUint64Bytes(b []byte) (u uint64, o []byte, err error) {
486 486
 		o = b[2:]
487 487
 		return
488 488
 
489
+	case mint16:
490
+		if l < 3 {
491
+			err = ErrShortBytes
492
+			return
493
+		}
494
+		v := int64(getMint16(b))
495
+		if v < 0 {
496
+			err = UintBelowZero{Value: v}
497
+			return
498
+		}
499
+		u = uint64(v)
500
+		o = b[3:]
501
+		return
502
+
489 503
 	case muint16:
490 504
 		if l < 3 {
491 505
 			err = ErrShortBytes
... ...
@@ -495,6 +571,20 @@ func ReadUint64Bytes(b []byte) (u uint64, o []byte, err error) {
495 495
 		o = b[3:]
496 496
 		return
497 497
 
498
+	case mint32:
499
+		if l < 5 {
500
+			err = ErrShortBytes
501
+			return
502
+		}
503
+		v := int64(getMint32(b))
504
+		if v < 0 {
505
+			err = UintBelowZero{Value: v}
506
+			return
507
+		}
508
+		u = uint64(v)
509
+		o = b[5:]
510
+		return
511
+
498 512
 	case muint32:
499 513
 		if l < 5 {
500 514
 			err = ErrShortBytes
... ...
@@ -504,6 +594,20 @@ func ReadUint64Bytes(b []byte) (u uint64, o []byte, err error) {
504 504
 		o = b[5:]
505 505
 		return
506 506
 
507
+	case mint64:
508
+		if l < 9 {
509
+			err = ErrShortBytes
510
+			return
511
+		}
512
+		v := int64(getMint64(b))
513
+		if v < 0 {
514
+			err = UintBelowZero{Value: v}
515
+			return
516
+		}
517
+		u = uint64(v)
518
+		o = b[9:]
519
+		return
520
+
507 521
 	case muint64:
508 522
 		if l < 9 {
509 523
 			err = ErrShortBytes
... ...
@@ -514,7 +618,11 @@ func ReadUint64Bytes(b []byte) (u uint64, o []byte, err error) {
514 514
 		return
515 515
 
516 516
 	default:
517
-		err = badPrefix(UintType, lead)
517
+		if isnfixint(lead) {
518
+			err = UintBelowZero{Value: int64(rnfixint(lead))}
519
+		} else {
520
+			err = badPrefix(UintType, lead)
521
+		}
518 522
 		return
519 523
 	}
520 524
 }
... ...
@@ -1,4 +1,4 @@
1
-// +build !appengine
1
+// +build !purego,!appengine
2 2
 
3 3
 package msgp
4 4
 
... ...
@@ -685,7 +685,7 @@ func (mw *Writer) WriteIntf(v interface{}) error {
685 685
 	case reflect.Map:
686 686
 		return mw.writeMap(val)
687 687
 	}
688
-	return &ErrUnsupportedType{val.Type()}
688
+	return &ErrUnsupportedType{T: val.Type()}
689 689
 }
690 690
 
691 691
 func (mw *Writer) writeMap(v reflect.Value) (err error) {