Browse code

vendor: github.com/containerd/ttrpc v1.0.2

full diff: https://github.com/containerd/ttrpc/compare/v1.0.1...v1.0.2

- fix bug, failed to assert net error due to error wrap
- fixes: ttrpc client receive "read: connection reset by peer: unknown"
- client: add UserOnCloseWait function
- travis: add go 1.15

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>

Sebastiaan van Stijn authored on 2020/11/18 18:51:09
Showing 3 changed files
... ...
@@ -137,7 +137,7 @@ github.com/containerd/cgroups                       318312a373405e5e91134d8063d0
137 137
 github.com/containerd/console                       5d7e1412f07b502a01029ea20e20e0d2be31fa7c # v1.0.1
138 138
 github.com/containerd/go-runc                       16b287bc67d069a60fa48db15f330b790b74365b
139 139
 github.com/containerd/typeurl                       cd3ce7159eae562a4f60ceff37dada11a939d247 # v1.0.1
140
-github.com/containerd/ttrpc                         72bb1b21c5b0a4a107f59dd85f6ab58e564b68d6 # v1.0.1
140
+github.com/containerd/ttrpc                         bfba540dc45464586c106b1f31c8547933c1eb41 # v1.0.2
141 141
 github.com/gogo/googleapis                          01e0f9cca9b92166042241267ee2a5cdf5cff46c # v1.3.2
142 142
 github.com/cilium/ebpf                              1c8d4c9ef7759622653a1d319284a44652333b28
143 143
 
144 144
new file mode 100644
... ...
@@ -0,0 +1,53 @@
0
+/*
1
+   Copyright The containerd Authors.
2
+
3
+   Licensed under the Apache License, Version 2.0 (the "License");
4
+   you may not use this file except in compliance with the License.
5
+   You may obtain a copy of the License at
6
+
7
+       http://www.apache.org/licenses/LICENSE-2.0
8
+
9
+   Unless required by applicable law or agreed to in writing, software
10
+   distributed under the License is distributed on an "AS IS" BASIS,
11
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
+   See the License for the specific language governing permissions and
13
+   limitations under the License.
14
+*/
15
+
16
+package runtime
17
+
18
+import (
19
+	"net/url"
20
+	"os"
21
+	"os/exec"
22
+)
23
+
24
+// NewBinaryCmd returns a Cmd to be used to start a logging binary.
25
+// The Cmd is generated from the provided uri, and the container ID and
26
+// namespace are appended to the Cmd environment.
27
+func NewBinaryCmd(binaryURI *url.URL, id, ns string) *exec.Cmd {
28
+	var args []string
29
+	for k, vs := range binaryURI.Query() {
30
+		args = append(args, k)
31
+		if len(vs) > 0 {
32
+			args = append(args, vs[0])
33
+		}
34
+	}
35
+
36
+	cmd := exec.Command(binaryURI.Path, args...)
37
+
38
+	cmd.Env = append(cmd.Env,
39
+		"CONTAINER_ID="+id,
40
+		"CONTAINER_NAMESPACE="+ns,
41
+	)
42
+
43
+	return cmd
44
+}
45
+
46
+// CloseFiles closes any files passed in.
47
+// It it used for cleanup in the event of unexpected errors.
48
+func CloseFiles(files ...*os.File) {
49
+	for _, file := range files {
50
+		file.Close()
51
+	}
52
+}
... ...
@@ -47,8 +47,9 @@ type Client struct {
47 47
 	ctx    context.Context
48 48
 	closed func()
49 49
 
50
-	closeOnce     sync.Once
51
-	userCloseFunc func()
50
+	closeOnce       sync.Once
51
+	userCloseFunc   func()
52
+	userCloseWaitCh chan struct{}
52 53
 
53 54
 	errOnce     sync.Once
54 55
 	err         error
... ...
@@ -75,14 +76,15 @@ func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts {
75 75
 func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
76 76
 	ctx, cancel := context.WithCancel(context.Background())
77 77
 	c := &Client{
78
-		codec:         codec{},
79
-		conn:          conn,
80
-		channel:       newChannel(conn),
81
-		calls:         make(chan *callRequest),
82
-		closed:        cancel,
83
-		ctx:           ctx,
84
-		userCloseFunc: func() {},
85
-		interceptor:   defaultClientInterceptor,
78
+		codec:           codec{},
79
+		conn:            conn,
80
+		channel:         newChannel(conn),
81
+		calls:           make(chan *callRequest),
82
+		closed:          cancel,
83
+		ctx:             ctx,
84
+		userCloseFunc:   func() {},
85
+		userCloseWaitCh: make(chan struct{}),
86
+		interceptor:     defaultClientInterceptor,
86 87
 	}
87 88
 
88 89
 	for _, o := range opts {
... ...
@@ -175,6 +177,17 @@ func (c *Client) Close() error {
175 175
 	return nil
176 176
 }
177 177
 
178
+// UserOnCloseWait is used to blocks untils the user's on-close callback
179
+// finishes.
180
+func (c *Client) UserOnCloseWait(ctx context.Context) error {
181
+	select {
182
+	case <-c.userCloseWaitCh:
183
+		return nil
184
+	case <-ctx.Done():
185
+		return ctx.Err()
186
+	}
187
+}
188
+
178 189
 type message struct {
179 190
 	messageHeader
180 191
 	p   []byte
... ...
@@ -251,6 +264,7 @@ func (c *Client) run() {
251 251
 	defer func() {
252 252
 		c.conn.Close()
253 253
 		c.userCloseFunc()
254
+		close(c.userCloseWaitCh)
254 255
 	}()
255 256
 
256 257
 	for {
... ...
@@ -339,7 +353,8 @@ func filterCloseErr(err error) error {
339 339
 		return ErrClosed
340 340
 	default:
341 341
 		// if we have an epipe on a write or econnreset on a read , we cast to errclosed
342
-		if oerr, ok := err.(*net.OpError); ok && (oerr.Op == "write" || oerr.Op == "read") {
342
+		var oerr *net.OpError
343
+		if errors.As(err, &oerr) && (oerr.Op == "write" || oerr.Op == "read") {
343 344
 			serr, sok := oerr.Err.(*os.SyscallError)
344 345
 			if sok && ((serr.Err == syscall.EPIPE && oerr.Op == "write") ||
345 346
 				(serr.Err == syscall.ECONNRESET && oerr.Op == "read")) {