Browse code

Update containerd dependencies to match 1.0.2

- https://github.com/containerd/go-runc/compare/ed1cbe1fc31f5fb2359d3a54b6330d1a097858b7...4f6e87ae043f859a38255247b49c9abc262d002f
- https://github.com/containerd/cgroups/compare/29da22c6171a4316169f9205ab6c49f59b5b852f...c0710c92e8b3a44681d1321dcfd1360fc5c6c089
- runc (already ahead)
- https://github.com/stevvooe/ttrpc/compare/76e68349ad9ab4d03d764c713826d31216715e4f...d4528379866b0ce7e9d71f3eb96f0582fc374577

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>

Sebastiaan van Stijn authored on 2018/02/14 10:03:03
Showing 8 changed files
... ...
@@ -107,12 +107,12 @@ google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944
107 107
 github.com/containerd/containerd 3fa104f843ec92328912e042b767d26825f202aa
108 108
 github.com/containerd/fifo fbfb6a11ec671efbe94ad1c12c2e98773f19e1e6
109 109
 github.com/containerd/continuity 992a5f112bd2211d0983a1cc8562d2882848f3a3
110
-github.com/containerd/cgroups 29da22c6171a4316169f9205ab6c49f59b5b852f
110
+github.com/containerd/cgroups c0710c92e8b3a44681d1321dcfd1360fc5c6c089
111 111
 github.com/containerd/console 84eeaae905fa414d03e07bcd6c8d3f19e7cf180e
112
-github.com/containerd/go-runc ed1cbe1fc31f5fb2359d3a54b6330d1a097858b7
112
+github.com/containerd/go-runc 4f6e87ae043f859a38255247b49c9abc262d002f
113 113
 github.com/containerd/typeurl f6943554a7e7e88b3c14aad190bf05932da84788
114 114
 github.com/dmcgowan/go-tar go1.10
115
-github.com/stevvooe/ttrpc 76e68349ad9ab4d03d764c713826d31216715e4f
115
+github.com/stevvooe/ttrpc d4528379866b0ce7e9d71f3eb96f0582fc374577
116 116
 
117 117
 # cluster
118 118
 github.com/docker/swarmkit 68a376dc30d8c4001767c39456b990dbd821371b
... ...
@@ -3,12 +3,12 @@ package cgroups
3 3
 import (
4 4
 	"bufio"
5 5
 	"fmt"
6
+	"io"
6 7
 	"io/ioutil"
7 8
 	"os"
8 9
 	"path/filepath"
9 10
 	"strconv"
10 11
 	"strings"
11
-	"syscall"
12 12
 
13 13
 	specs "github.com/opencontainers/runtime-spec/specs-go"
14 14
 )
... ...
@@ -105,8 +105,13 @@ func (b *blkioController) Stat(path string, stats *Metrics) error {
105 105
 			},
106 106
 		)
107 107
 	}
108
+	f, err := os.Open("/proc/diskstats")
109
+	if err != nil {
110
+		return err
111
+	}
112
+	defer f.Close()
108 113
 
109
-	devices, err := getDevices("/dev")
114
+	devices, err := getDevices(f)
110 115
 	if err != nil {
111 116
 		return err
112 117
 	}
... ...
@@ -268,50 +273,32 @@ type deviceKey struct {
268 268
 // getDevices makes a best effort attempt to read all the devices into a map
269 269
 // keyed by major and minor number. Since devices may be mapped multiple times,
270 270
 // we err on taking the first occurrence.
271
-func getDevices(path string) (map[deviceKey]string, error) {
272
-	// TODO(stevvooe): We are ignoring lots of errors. It might be kind of
273
-	// challenging to debug this if we aren't mapping devices correctly.
274
-	// Consider logging these errors.
275
-	devices := map[deviceKey]string{}
276
-	if err := filepath.Walk(path, func(p string, fi os.FileInfo, err error) error {
271
+func getDevices(r io.Reader) (map[deviceKey]string, error) {
272
+
273
+	var (
274
+		s       = bufio.NewScanner(r)
275
+		devices = make(map[deviceKey]string)
276
+	)
277
+	for s.Scan() {
278
+		fields := strings.Fields(s.Text())
279
+		major, err := strconv.Atoi(fields[0])
277 280
 		if err != nil {
278
-			return err
281
+			return nil, err
279 282
 		}
280
-		switch {
281
-		case fi.IsDir():
282
-			switch fi.Name() {
283
-			case "pts", "shm", "fd", "mqueue", ".lxc", ".lxd-mounts":
284
-				return filepath.SkipDir
285
-			default:
286
-				return nil
287
-			}
288
-		case fi.Name() == "console":
289
-			return nil
290
-		default:
291
-			if fi.Mode()&os.ModeDevice == 0 {
292
-				// skip non-devices
293
-				return nil
294
-			}
295
-
296
-			st, ok := fi.Sys().(*syscall.Stat_t)
297
-			if !ok {
298
-				return fmt.Errorf("%s: unable to convert to system stat", p)
299
-			}
300
-
301
-			key := deviceKey{major(st.Rdev), minor(st.Rdev)}
302
-			if _, ok := devices[key]; ok {
303
-				return nil // skip it if we have already populated the path.
304
-			}
305
-
306
-			devices[key] = p
283
+		minor, err := strconv.Atoi(fields[1])
284
+		if err != nil {
285
+			return nil, err
307 286
 		}
308
-
309
-		return nil
310
-	}); err != nil {
311
-		return nil, err
287
+		key := deviceKey{
288
+			major: uint64(major),
289
+			minor: uint64(minor),
290
+		}
291
+		if _, ok := devices[key]; ok {
292
+			continue
293
+		}
294
+		devices[key] = filepath.Join("/dev", fields[2])
312 295
 	}
313
-
314
-	return devices, nil
296
+	return devices, s.Err()
315 297
 }
316 298
 
317 299
 func major(devNumber uint64) uint64 {
... ...
@@ -12,7 +12,7 @@ var (
12 12
 	ErrFreezerNotSupported      = errors.New("cgroups: freezer cgroup not supported on this system")
13 13
 	ErrMemoryNotSupported       = errors.New("cgroups: memory cgroup not supported on this system")
14 14
 	ErrCgroupDeleted            = errors.New("cgroups: cgroup deleted")
15
-	ErrNoCgroupMountDestination = errors.New("cgroups: cannot found cgroup mount destination")
15
+	ErrNoCgroupMountDestination = errors.New("cgroups: cannot find cgroup mount destination")
16 16
 )
17 17
 
18 18
 // ErrorHandler is a function that handles and acts on errors
... ...
@@ -1,7 +1,6 @@
1 1
 package runc
2 2
 
3 3
 import (
4
-	"bytes"
5 4
 	"context"
6 5
 	"encoding/json"
7 6
 	"errors"
... ...
@@ -532,7 +531,9 @@ func (r *Runc) Restore(context context.Context, id, bundle string, opts *Restore
532 532
 
533 533
 // Update updates the current container with the provided resource spec
534 534
 func (r *Runc) Update(context context.Context, id string, resources *specs.LinuxResources) error {
535
-	buf := bytes.NewBuffer(nil)
535
+	buf := getBuf()
536
+	defer putBuf(buf)
537
+
536 538
 	if err := json.NewEncoder(buf).Encode(resources); err != nil {
537 539
 		return err
538 540
 	}
... ...
@@ -638,11 +639,12 @@ func (r *Runc) runOrError(cmd *exec.Cmd) error {
638 638
 }
639 639
 
640 640
 func cmdOutput(cmd *exec.Cmd, combined bool) ([]byte, error) {
641
-	var b bytes.Buffer
641
+	b := getBuf()
642
+	defer putBuf(b)
642 643
 
643
-	cmd.Stdout = &b
644
+	cmd.Stdout = b
644 645
 	if combined {
645
-		cmd.Stderr = &b
646
+		cmd.Stderr = b
646 647
 	}
647 648
 	ec, err := Monitor.Start(cmd)
648 649
 	if err != nil {
... ...
@@ -1,8 +1,10 @@
1 1
 package runc
2 2
 
3 3
 import (
4
+	"bytes"
4 5
 	"io/ioutil"
5 6
 	"strconv"
7
+	"sync"
6 8
 	"syscall"
7 9
 )
8 10
 
... ...
@@ -26,3 +28,18 @@ func exitStatus(status syscall.WaitStatus) int {
26 26
 	}
27 27
 	return status.ExitStatus()
28 28
 }
29
+
30
+var bytesBufferPool = sync.Pool{
31
+	New: func() interface{} {
32
+		return bytes.NewBuffer(nil)
33
+	},
34
+}
35
+
36
+func getBuf() *bytes.Buffer {
37
+	return bytesBufferPool.Get().(*bytes.Buffer)
38
+}
39
+
40
+func putBuf(b *bytes.Buffer) {
41
+	b.Reset()
42
+	bytesBufferPool.Put(b)
43
+}
... ...
@@ -5,6 +5,7 @@ import (
5 5
 	"context"
6 6
 	"encoding/binary"
7 7
 	"io"
8
+	"net"
8 9
 	"sync"
9 10
 
10 11
 	"github.com/pkg/errors"
... ...
@@ -60,16 +61,18 @@ func writeMessageHeader(w io.Writer, p []byte, mh messageHeader) error {
60 60
 var buffers sync.Pool
61 61
 
62 62
 type channel struct {
63
+	conn  net.Conn
63 64
 	bw    *bufio.Writer
64 65
 	br    *bufio.Reader
65 66
 	hrbuf [messageHeaderLength]byte // avoid alloc when reading header
66 67
 	hwbuf [messageHeaderLength]byte
67 68
 }
68 69
 
69
-func newChannel(w io.Writer, r io.Reader) *channel {
70
+func newChannel(conn net.Conn) *channel {
70 71
 	return &channel{
71
-		bw: bufio.NewWriter(w),
72
-		br: bufio.NewReader(r),
72
+		conn: conn,
73
+		bw:   bufio.NewWriter(conn),
74
+		br:   bufio.NewReader(conn),
73 75
 	}
74 76
 }
75 77
 
... ...
@@ -2,8 +2,12 @@ package ttrpc
2 2
 
3 3
 import (
4 4
 	"context"
5
+	"io"
5 6
 	"net"
7
+	"os"
8
+	"strings"
6 9
 	"sync"
10
+	"syscall"
7 11
 
8 12
 	"github.com/containerd/containerd/log"
9 13
 	"github.com/gogo/protobuf/proto"
... ...
@@ -11,6 +15,10 @@ import (
11 11
 	"google.golang.org/grpc/status"
12 12
 )
13 13
 
14
+// ErrClosed is returned by client methods when the underlying connection is
15
+// closed.
16
+var ErrClosed = errors.New("ttrpc: closed")
17
+
14 18
 type Client struct {
15 19
 	codec   codec
16 20
 	conn    net.Conn
... ...
@@ -19,18 +27,20 @@ type Client struct {
19 19
 
20 20
 	closed    chan struct{}
21 21
 	closeOnce sync.Once
22
+	closeFunc func()
22 23
 	done      chan struct{}
23 24
 	err       error
24 25
 }
25 26
 
26 27
 func NewClient(conn net.Conn) *Client {
27 28
 	c := &Client{
28
-		codec:   codec{},
29
-		conn:    conn,
30
-		channel: newChannel(conn, conn),
31
-		calls:   make(chan *callRequest),
32
-		closed:  make(chan struct{}),
33
-		done:    make(chan struct{}),
29
+		codec:     codec{},
30
+		conn:      conn,
31
+		channel:   newChannel(conn),
32
+		calls:     make(chan *callRequest),
33
+		closed:    make(chan struct{}),
34
+		done:      make(chan struct{}),
35
+		closeFunc: func() {},
34 36
 	}
35 37
 
36 38
 	go c.run()
... ...
@@ -91,7 +101,7 @@ func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) err
91 91
 
92 92
 	select {
93 93
 	case err := <-errs:
94
-		return err
94
+		return filterCloseErr(err)
95 95
 	case <-c.done:
96 96
 		return c.err
97 97
 	}
... ...
@@ -105,6 +115,11 @@ func (c *Client) Close() error {
105 105
 	return nil
106 106
 }
107 107
 
108
+// OnClose allows a close func to be called when the server is closed
109
+func (c *Client) OnClose(closer func()) {
110
+	c.closeFunc = closer
111
+}
112
+
108 113
 type message struct {
109 114
 	messageHeader
110 115
 	p   []byte
... ...
@@ -150,6 +165,7 @@ func (c *Client) run() {
150 150
 
151 151
 	defer c.conn.Close()
152 152
 	defer close(c.done)
153
+	defer c.closeFunc()
153 154
 
154 155
 	for {
155 156
 		select {
... ...
@@ -171,7 +187,14 @@ func (c *Client) run() {
171 171
 			call.errs <- c.recv(call.resp, msg)
172 172
 			delete(waiters, msg.StreamID)
173 173
 		case <-shutdown:
174
+			if shutdownErr != nil {
175
+				shutdownErr = filterCloseErr(shutdownErr)
176
+			} else {
177
+				shutdownErr = ErrClosed
178
+			}
179
+
174 180
 			shutdownErr = errors.Wrapf(shutdownErr, "ttrpc: client shutting down")
181
+
175 182
 			c.err = shutdownErr
176 183
 			for _, waiter := range waiters {
177 184
 				waiter.errs <- shutdownErr
... ...
@@ -179,9 +202,12 @@ func (c *Client) run() {
179 179
 			c.Close()
180 180
 			return
181 181
 		case <-c.closed:
182
+			if c.err == nil {
183
+				c.err = ErrClosed
184
+			}
182 185
 			// broadcast the shutdown error to the remaining waiters.
183 186
 			for _, waiter := range waiters {
184
-				waiter.errs <- shutdownErr
187
+				waiter.errs <- c.err
185 188
 			}
186 189
 			return
187 190
 		}
... ...
@@ -209,3 +235,30 @@ func (c *Client) recv(resp *Response, msg *message) error {
209 209
 	defer c.channel.putmbuf(msg.p)
210 210
 	return proto.Unmarshal(msg.p, resp)
211 211
 }
212
+
213
+// filterCloseErr rewrites EOF and EPIPE errors to ErrClosed. Use when
214
+// returning from call or handling errors from main read loop.
215
+//
216
+// This purposely ignores errors with a wrapped cause.
217
+func filterCloseErr(err error) error {
218
+	if err == nil {
219
+		return nil
220
+	}
221
+
222
+	if err == io.EOF {
223
+		return ErrClosed
224
+	}
225
+
226
+	if strings.Contains(err.Error(), "use of closed network connection") {
227
+		return ErrClosed
228
+	}
229
+
230
+	// if we have an epipe on a write, we cast to errclosed
231
+	if oerr, ok := err.(*net.OpError); ok && oerr.Op == "write" {
232
+		if serr, ok := oerr.Err.(*os.SyscallError); ok && serr.Err == syscall.EPIPE {
233
+			return ErrClosed
234
+		}
235
+	}
236
+
237
+	return err
238
+}
... ...
@@ -16,7 +16,7 @@ import (
16 16
 )
17 17
 
18 18
 var (
19
-	ErrServerClosed = errors.New("ttrpc: server close")
19
+	ErrServerClosed = errors.New("ttrpc: server closed")
20 20
 )
21 21
 
22 22
 type Server struct {
... ...
@@ -281,7 +281,7 @@ func (c *serverConn) run(sctx context.Context) {
281 281
 	)
282 282
 
283 283
 	var (
284
-		ch          = newChannel(c.conn, c.conn)
284
+		ch          = newChannel(c.conn)
285 285
 		ctx, cancel = context.WithCancel(sctx)
286 286
 		active      int
287 287
 		state       connState = connStateIdle