Browse code

vendor buildkit to 8f4dff0d16ea91cb43315d5f5aa4b27f4fe4e1f2

Signed-off-by: Tibor Vass <tibor@docker.com>

Tibor Vass authored on 2018/09/28 06:21:47
Showing 13 changed files
... ...
@@ -26,7 +26,7 @@ github.com/imdario/mergo v0.3.6
26 26
 golang.org/x/sync 1d60e4601c6fd243af51cc01ddf169918a5407ca
27 27
 
28 28
 # buildkit
29
-github.com/moby/buildkit 39404586a50d1b9d0fb1c578cf0f4de7bdb7afe5
29
+github.com/moby/buildkit 8f4dff0d16ea91cb43315d5f5aa4b27f4fe4e1f2
30 30
 github.com/tonistiigi/fsutil b19464cd1b6a00773b4f2eb7acf9c30426f9df42
31 31
 github.com/grpc-ecosystem/grpc-opentracing 8e809c8a86450a29b90dcc9efbf062d0fe6d9746
32 32
 github.com/opentracing/opentracing-go 1361b9cd60be79c4c3a7fa9841b3c132e40066a7
... ...
@@ -10,7 +10,6 @@ import (
10 10
 	"path/filepath"
11 11
 	"sync"
12 12
 
13
-	"github.com/containerd/continuity/fs"
14 13
 	"github.com/docker/docker/pkg/locker"
15 14
 	iradix "github.com/hashicorp/go-immutable-radix"
16 15
 	"github.com/hashicorp/golang-lru/simplelru"
... ...
@@ -400,7 +399,11 @@ func (cc *cacheContext) commitActiveTransaction() {
400 400
 
401 401
 func (cc *cacheContext) lazyChecksum(ctx context.Context, m *mount, p string) (*CacheRecord, error) {
402 402
 	root := cc.tree.Root()
403
-	if cc.needsScan(root, p) {
403
+	scan, err := cc.needsScan(root, p)
404
+	if err != nil {
405
+		return nil, err
406
+	}
407
+	if scan {
404 408
 		if err := cc.scanPath(ctx, m, p); err != nil {
405 409
 			return nil, err
406 410
 		}
... ...
@@ -418,13 +421,13 @@ func (cc *cacheContext) lazyChecksum(ctx context.Context, m *mount, p string) (*
418 418
 }
419 419
 
420 420
 func (cc *cacheContext) checksum(ctx context.Context, root *iradix.Node, txn *iradix.Txn, m *mount, k []byte) (*CacheRecord, bool, error) {
421
-	v, ok := root.Get(k)
422
-
423
-	if !ok {
421
+	k, cr, err := getFollowLinks(root, k)
422
+	if err != nil {
423
+		return nil, false, err
424
+	}
425
+	if cr == nil {
424 426
 		return nil, false, errors.Wrapf(errNotFound, "%s not found", convertKeyToPath(k))
425 427
 	}
426
-	cr := v.(*CacheRecord)
427
-
428 428
 	if cr.Digest != "" {
429 429
 		return cr, false, nil
430 430
 	}
... ...
@@ -491,17 +494,37 @@ func (cc *cacheContext) checksum(ctx context.Context, root *iradix.Node, txn *ir
491 491
 	return cr2, true, nil
492 492
 }
493 493
 
494
-func (cc *cacheContext) needsScan(root *iradix.Node, p string) bool {
494
+// needsScan returns false if path is in the tree or a parent path is in tree
495
+// and subpath is missing
496
+func (cc *cacheContext) needsScan(root *iradix.Node, p string) (bool, error) {
497
+	var linksWalked int
498
+	return cc.needsScanFollow(root, p, &linksWalked)
499
+}
500
+
501
+func (cc *cacheContext) needsScanFollow(root *iradix.Node, p string, linksWalked *int) (bool, error) {
495 502
 	if p == "/" {
496 503
 		p = ""
497 504
 	}
498
-	if _, ok := root.Get(convertPathToKey([]byte(p))); !ok {
505
+	if v, ok := root.Get(convertPathToKey([]byte(p))); !ok {
499 506
 		if p == "" {
500
-			return true
507
+			return true, nil
508
+		}
509
+		return cc.needsScanFollow(root, path.Clean(path.Dir(p)), linksWalked)
510
+	} else {
511
+		cr := v.(*CacheRecord)
512
+		if cr.Type == CacheRecordTypeSymlink {
513
+			if *linksWalked > 255 {
514
+				return false, errTooManyLinks
515
+			}
516
+			*linksWalked++
517
+			link := path.Clean(cr.Linkname)
518
+			if !path.IsAbs(cr.Linkname) {
519
+				link = path.Join("/", path.Dir(p), link)
520
+			}
521
+			return cc.needsScanFollow(root, link, linksWalked)
501 522
 		}
502
-		return cc.needsScan(root, path.Clean(path.Dir(p)))
503 523
 	}
504
-	return false
524
+	return false, nil
505 525
 }
506 526
 
507 527
 func (cc *cacheContext) scanPath(ctx context.Context, m *mount, p string) (retErr error) {
... ...
@@ -513,14 +536,23 @@ func (cc *cacheContext) scanPath(ctx context.Context, m *mount, p string) (retEr
513 513
 		return err
514 514
 	}
515 515
 
516
-	parentPath, err := fs.RootPath(mp, filepath.FromSlash(d))
516
+	n := cc.tree.Root()
517
+	txn := cc.tree.Txn()
518
+
519
+	parentPath, err := rootPath(mp, filepath.FromSlash(d), func(p, link string) error {
520
+		cr := &CacheRecord{
521
+			Type:     CacheRecordTypeSymlink,
522
+			Linkname: filepath.ToSlash(link),
523
+		}
524
+		k := []byte(filepath.Join("/", filepath.ToSlash(p)))
525
+		k = convertPathToKey(k)
526
+		txn.Insert(k, cr)
527
+		return nil
528
+	})
517 529
 	if err != nil {
518 530
 		return err
519 531
 	}
520 532
 
521
-	n := cc.tree.Root()
522
-	txn := cc.tree.Txn()
523
-
524 533
 	err = filepath.Walk(parentPath, func(path string, fi os.FileInfo, err error) error {
525 534
 		if err != nil {
526 535
 			return errors.Wrapf(err, "failed to walk %s", path)
... ...
@@ -566,6 +598,45 @@ func (cc *cacheContext) scanPath(ctx context.Context, m *mount, p string) (retEr
566 566
 	return nil
567 567
 }
568 568
 
569
+func getFollowLinks(root *iradix.Node, k []byte) ([]byte, *CacheRecord, error) {
570
+	var linksWalked int
571
+	return getFollowLinksWalk(root, k, &linksWalked)
572
+}
573
+
574
+func getFollowLinksWalk(root *iradix.Node, k []byte, linksWalked *int) ([]byte, *CacheRecord, error) {
575
+	v, ok := root.Get(k)
576
+	if ok {
577
+		return k, v.(*CacheRecord), nil
578
+	}
579
+	if len(k) == 0 {
580
+		return nil, nil, nil
581
+	}
582
+
583
+	dir, file := splitKey(k)
584
+
585
+	_, parent, err := getFollowLinksWalk(root, dir, linksWalked)
586
+	if err != nil {
587
+		return nil, nil, err
588
+	}
589
+	if parent != nil && parent.Type == CacheRecordTypeSymlink {
590
+		*linksWalked++
591
+		if *linksWalked > 255 {
592
+			return nil, nil, errors.Errorf("too many links")
593
+		}
594
+		dirPath := path.Clean(string(convertKeyToPath(dir)))
595
+		if dirPath == "." || dirPath == "/" {
596
+			dirPath = ""
597
+		}
598
+		link := parent.Linkname
599
+		if !path.IsAbs(link) {
600
+			link = path.Join("/", path.Join(path.Dir(dirPath), link))
601
+		}
602
+		return getFollowLinksWalk(root, append(convertPathToKey([]byte(link)), file...), linksWalked)
603
+	}
604
+
605
+	return nil, nil, nil
606
+}
607
+
569 608
 func prepareDigest(fp, p string, fi os.FileInfo) (digest.Digest, error) {
570 609
 	h, err := NewFileHash(fp, fi)
571 610
 	if err != nil {
... ...
@@ -632,3 +703,18 @@ func convertPathToKey(p []byte) []byte {
632 632
 func convertKeyToPath(p []byte) []byte {
633 633
 	return bytes.Replace([]byte(p), []byte{0}, []byte("/"), -1)
634 634
 }
635
+
636
+func splitKey(k []byte) ([]byte, []byte) {
637
+	foundBytes := false
638
+	i := len(k) - 1
639
+	for {
640
+		if i <= 0 || foundBytes && k[i] == 0 {
641
+			break
642
+		}
643
+		if k[i] != 0 {
644
+			foundBytes = true
645
+		}
646
+		i--
647
+	}
648
+	return append([]byte{}, k[:i]...), k[i:]
649
+}
635 650
new file mode 100644
... ...
@@ -0,0 +1,107 @@
0
+package contenthash
1
+
2
+import (
3
+	"errors"
4
+	"os"
5
+	"path/filepath"
6
+)
7
+
8
+var (
9
+	errTooManyLinks = errors.New("too many links")
10
+)
11
+
12
+type onSymlinkFunc func(string, string) error
13
+
14
+// rootPath joins a path with a root, evaluating and bounding any
15
+// symlink to the root directory.
16
+// This is containerd/continuity/fs RootPath implementation with a callback on
17
+// resolving the symlink.
18
+func rootPath(root, path string, cb onSymlinkFunc) (string, error) {
19
+	if path == "" {
20
+		return root, nil
21
+	}
22
+	var linksWalked int // to protect against cycles
23
+	for {
24
+		i := linksWalked
25
+		newpath, err := walkLinks(root, path, &linksWalked, cb)
26
+		if err != nil {
27
+			return "", err
28
+		}
29
+		path = newpath
30
+		if i == linksWalked {
31
+			newpath = filepath.Join("/", newpath)
32
+			if path == newpath {
33
+				return filepath.Join(root, newpath), nil
34
+			}
35
+			path = newpath
36
+		}
37
+	}
38
+}
39
+
40
+func walkLink(root, path string, linksWalked *int, cb onSymlinkFunc) (newpath string, islink bool, err error) {
41
+	if *linksWalked > 255 {
42
+		return "", false, errTooManyLinks
43
+	}
44
+
45
+	path = filepath.Join("/", path)
46
+	if path == "/" {
47
+		return path, false, nil
48
+	}
49
+	realPath := filepath.Join(root, path)
50
+
51
+	fi, err := os.Lstat(realPath)
52
+	if err != nil {
53
+		// If path does not yet exist, treat as non-symlink
54
+		if os.IsNotExist(err) {
55
+			return path, false, nil
56
+		}
57
+		return "", false, err
58
+	}
59
+	if fi.Mode()&os.ModeSymlink == 0 {
60
+		return path, false, nil
61
+	}
62
+	newpath, err = os.Readlink(realPath)
63
+	if err != nil {
64
+		return "", false, err
65
+	}
66
+	if cb != nil {
67
+		if err := cb(path, newpath); err != nil {
68
+			return "", false, err
69
+		}
70
+	}
71
+	*linksWalked++
72
+	return newpath, true, nil
73
+}
74
+
75
+func walkLinks(root, path string, linksWalked *int, cb onSymlinkFunc) (string, error) {
76
+	switch dir, file := filepath.Split(path); {
77
+	case dir == "":
78
+		newpath, _, err := walkLink(root, file, linksWalked, cb)
79
+		return newpath, err
80
+	case file == "":
81
+		if os.IsPathSeparator(dir[len(dir)-1]) {
82
+			if dir == "/" {
83
+				return dir, nil
84
+			}
85
+			return walkLinks(root, dir[:len(dir)-1], linksWalked, cb)
86
+		}
87
+		newpath, _, err := walkLink(root, dir, linksWalked, cb)
88
+		return newpath, err
89
+	default:
90
+		newdir, err := walkLinks(root, dir, linksWalked, cb)
91
+		if err != nil {
92
+			return "", err
93
+		}
94
+		newpath, islink, err := walkLink(root, filepath.Join(newdir, file), linksWalked, cb)
95
+		if err != nil {
96
+			return "", err
97
+		}
98
+		if !islink {
99
+			return newpath, nil
100
+		}
101
+		if filepath.IsAbs(newpath) {
102
+			return newpath, nil
103
+		}
104
+		return filepath.Join(newdir, newpath), nil
105
+	}
106
+}
... ...
@@ -411,6 +411,13 @@ func WithoutDefaultExportCache() ConstraintsOpt {
411 411
 	})
412 412
 }
413 413
 
414
+// WithCaps exposes supported LLB caps to the marshaler
415
+func WithCaps(caps apicaps.CapSet) ConstraintsOpt {
416
+	return constraintsOptFunc(func(c *Constraints) {
417
+		c.Caps = &caps
418
+	})
419
+}
420
+
414 421
 type constraintsWrapper struct {
415 422
 	Constraints
416 423
 }
... ...
@@ -424,6 +431,7 @@ type Constraints struct {
424 424
 	WorkerConstraints []string
425 425
 	Metadata          pb.OpMetadata
426 426
 	LocalUniqueID     string
427
+	Caps              *apicaps.CapSet
427 428
 }
428 429
 
429 430
 func Platform(p specs.Platform) ConstraintsOpt {
... ...
@@ -11,6 +11,7 @@ import (
11 11
 	"github.com/containerd/containerd/mount"
12 12
 	"github.com/containerd/containerd/namespaces"
13 13
 	"github.com/containerd/containerd/oci"
14
+	"github.com/containerd/continuity/fs"
14 15
 	"github.com/mitchellh/hashstructure"
15 16
 	"github.com/moby/buildkit/executor"
16 17
 	"github.com/moby/buildkit/snapshot"
... ...
@@ -114,7 +115,11 @@ func (s *submounts) subMount(m mount.Mount, subPath string) (mount.Mount, error)
114 114
 		return mount.Mount{}, nil
115 115
 	}
116 116
 	if mr, ok := s.m[h]; ok {
117
-		return sub(mr.mount, subPath), nil
117
+		sm, err := sub(mr.mount, subPath)
118
+		if err != nil {
119
+			return mount.Mount{}, nil
120
+		}
121
+		return sm, nil
118 122
 	}
119 123
 
120 124
 	lm := snapshot.LocalMounterWithMounts([]mount.Mount{m})
... ...
@@ -140,7 +145,11 @@ func (s *submounts) subMount(m mount.Mount, subPath string) (mount.Mount, error)
140 140
 		unmount: lm.Unmount,
141 141
 	}
142 142
 
143
-	return sub(s.m[h].mount, subPath), nil
143
+	sm, err := sub(s.m[h].mount, subPath)
144
+	if err != nil {
145
+		return mount.Mount{}, err
146
+	}
147
+	return sm, nil
144 148
 }
145 149
 
146 150
 func (s *submounts) cleanup() {
... ...
@@ -157,7 +166,11 @@ func (s *submounts) cleanup() {
157 157
 	wg.Wait()
158 158
 }
159 159
 
160
-func sub(m mount.Mount, subPath string) mount.Mount {
161
-	m.Source = path.Join(m.Source, subPath)
162
-	return m
160
+func sub(m mount.Mount, subPath string) (mount.Mount, error) {
161
+	src, err := fs.RootPath(m.Source, subPath)
162
+	if err != nil {
163
+		return mount.Mount{}, err
164
+	}
165
+	m.Source = src
166
+	return m, nil
163 167
 }
... ...
@@ -263,7 +263,13 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
263 263
 	}
264 264
 
265 265
 	if status != 0 {
266
-		return errors.Errorf("exit code: %d", status)
266
+		err := errors.Errorf("exit code: %d", status)
267
+		select {
268
+		case <-ctx.Done():
269
+			return errors.Wrapf(ctx.Err(), err.Error())
270
+		default:
271
+			return err
272
+		}
267 273
 	}
268 274
 
269 275
 	return nil
... ...
@@ -48,6 +48,9 @@ var gitUrlPathWithFragmentSuffix = regexp.MustCompile("\\.git(?:#.+)?$")
48 48
 
49 49
 func Build(ctx context.Context, c client.Client) (*client.Result, error) {
50 50
 	opts := c.BuildOpts().Opts
51
+	caps := c.BuildOpts().LLBCaps
52
+
53
+	marshalOpts := []llb.ConstraintsOpt{llb.WithCaps(caps)}
51 54
 
52 55
 	defaultBuildPlatform := platforms.DefaultSpec()
53 56
 	if workers := c.BuildOpts().Workers; len(workers) > 0 && len(workers[0].Platforms) > 0 {
... ...
@@ -111,7 +114,7 @@ func Build(ctx context.Context, c client.Client) (*client.Result, error) {
111 111
 		buildContext = &src
112 112
 	} else if httpPrefix.MatchString(opts[LocalNameContext]) {
113 113
 		httpContext := llb.HTTP(opts[LocalNameContext], llb.Filename("context"), dockerfile2llb.WithInternalName("load remote build context"))
114
-		def, err := httpContext.Marshal()
114
+		def, err := httpContext.Marshal(marshalOpts...)
115 115
 		if err != nil {
116 116
 			return nil, errors.Wrapf(err, "failed to marshal httpcontext")
117 117
 		}
... ...
@@ -154,7 +157,7 @@ func Build(ctx context.Context, c client.Client) (*client.Result, error) {
154 154
 		}
155 155
 	}
156 156
 
157
-	def, err := src.Marshal()
157
+	def, err := src.Marshal(marshalOpts...)
158 158
 	if err != nil {
159 159
 		return nil, errors.Wrapf(err, "failed to marshal local source")
160 160
 	}
... ...
@@ -195,7 +198,7 @@ func Build(ctx context.Context, c client.Client) (*client.Result, error) {
195 195
 				)
196 196
 				dockerignoreState = &st
197 197
 			}
198
-			def, err := dockerignoreState.Marshal()
198
+			def, err := dockerignoreState.Marshal(marshalOpts...)
199 199
 			if err != nil {
200 200
 				return err
201 201
 			}
... ...
@@ -272,6 +275,7 @@ func Build(ctx context.Context, c client.Client) (*client.Result, error) {
272 272
 					ExtraHosts:        extraHosts,
273 273
 					ForceNetMode:      defaultNetMode,
274 274
 					OverrideCopyImage: opts[keyOverrideCopyImage],
275
+					LLBCaps:           &caps,
275 276
 				})
276 277
 
277 278
 				if err != nil {
... ...
@@ -23,6 +23,7 @@ import (
23 23
 	"github.com/moby/buildkit/frontend/dockerfile/shell"
24 24
 	gw "github.com/moby/buildkit/frontend/gateway/client"
25 25
 	"github.com/moby/buildkit/solver/pb"
26
+	"github.com/moby/buildkit/util/apicaps"
26 27
 	"github.com/moby/buildkit/util/system"
27 28
 	specs "github.com/opencontainers/image-spec/specs-go/v1"
28 29
 	"github.com/pkg/errors"
... ...
@@ -34,7 +35,7 @@ const (
34 34
 	localNameContext = "context"
35 35
 	historyComment   = "buildkit.dockerfile.v0"
36 36
 
37
-	DefaultCopyImage = "tonistiigi/copy:v0.1.4@sha256:d9d49bedbbe2b27df88115e6aff7b9cd11ed2fbd8d9013f02d3da735c08c92e5"
37
+	DefaultCopyImage = "tonistiigi/copy:v0.1.5@sha256:eab89b76ffbb3c807663a67a41e8be31b8a0e362d7fb074a55bddace563a28bb"
38 38
 )
39 39
 
40 40
 type ConvertOpt struct {
... ...
@@ -57,6 +58,7 @@ type ConvertOpt struct {
57 57
 	ExtraHosts        []llb.HostIP
58 58
 	ForceNetMode      pb.NetMode
59 59
 	OverrideCopyImage string
60
+	LLBCaps           *apicaps.CapSet
60 61
 }
61 62
 
62 63
 func Dockerfile2LLB(ctx context.Context, dt []byte, opt ConvertOpt) (*llb.State, *Image, error) {
... ...
@@ -368,7 +370,13 @@ func Dockerfile2LLB(ctx context.Context, dt []byte, opt ConvertOpt) (*llb.State,
368 368
 	}
369 369
 	buildContext.Output = bc.Output()
370 370
 
371
-	st := target.state.SetMarshalDefaults(llb.Platform(platformOpt.targetPlatform))
371
+	defaults := []llb.ConstraintsOpt{
372
+		llb.Platform(platformOpt.targetPlatform),
373
+	}
374
+	if opt.LLBCaps != nil {
375
+		defaults = append(defaults, llb.WithCaps(*opt.LLBCaps))
376
+	}
377
+	st := target.state.SetMarshalDefaults(defaults...)
372 378
 
373 379
 	if !platformOpt.implicitTarget {
374 380
 		target.image.OS = platformOpt.targetPlatform.OS
... ...
@@ -4,6 +4,7 @@ import (
4 4
 	"context"
5 5
 
6 6
 	"github.com/moby/buildkit/solver/pb"
7
+	"github.com/moby/buildkit/util/apicaps"
7 8
 	digest "github.com/opencontainers/go-digest"
8 9
 	specs "github.com/opencontainers/image-spec/specs-go/v1"
9 10
 )
... ...
@@ -49,6 +50,8 @@ type BuildOpts struct {
49 49
 	SessionID string
50 50
 	Workers   []WorkerInfo
51 51
 	Product   string
52
+	LLBCaps   apicaps.CapSet
53
+	Caps      apicaps.CapSet
52 54
 }
53 55
 
54 56
 type ResolveImageConfigOpt struct {
... ...
@@ -8,8 +8,10 @@ import (
8 8
 	clienttypes "github.com/moby/buildkit/client"
9 9
 	"github.com/moby/buildkit/frontend"
10 10
 	"github.com/moby/buildkit/frontend/gateway/client"
11
+	gwpb "github.com/moby/buildkit/frontend/gateway/pb"
11 12
 	"github.com/moby/buildkit/session"
12 13
 	"github.com/moby/buildkit/solver"
14
+	opspb "github.com/moby/buildkit/solver/pb"
13 15
 	"github.com/moby/buildkit/util/apicaps"
14 16
 	"github.com/moby/buildkit/worker"
15 17
 	"github.com/pkg/errors"
... ...
@@ -79,6 +81,8 @@ func (c *bridgeClient) BuildOpts() client.BuildOpts {
79 79
 		SessionID: c.sid,
80 80
 		Workers:   workers,
81 81
 		Product:   apicaps.ExportedProduct,
82
+		Caps:      gwpb.Caps.CapSet(gwpb.Caps.All()),
83
+		LLBCaps:   opspb.Caps.CapSet(opspb.Caps.All()),
82 84
 	}
83 85
 }
84 86
 
... ...
@@ -329,21 +329,11 @@ func (c *grpcClient) BuildOpts() client.BuildOpts {
329 329
 		SessionID: c.sessionID,
330 330
 		Workers:   c.workers,
331 331
 		Product:   c.product,
332
+		LLBCaps:   c.llbCaps,
333
+		Caps:      c.caps,
332 334
 	}
333 335
 }
334 336
 
335
-func (c *grpcClient) Opts() map[string]string {
336
-	return c.opts
337
-}
338
-
339
-func (c *grpcClient) SessionID() string {
340
-	return c.sessionID
341
-}
342
-
343
-func (c *grpcClient) WorkerInfos() []client.WorkerInfo {
344
-	return c.workers
345
-}
346
-
347 337
 type reference struct {
348 338
 	id string
349 339
 	c  *grpcClient
... ...
@@ -271,7 +271,7 @@ func (e *edge) currentIndexKey() *CacheKey {
271 271
 func (e *edge) skipPhase2SlowCache(dep *dep) bool {
272 272
 	isPhase1 := false
273 273
 	for _, dep := range e.deps {
274
-		if !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.keyMap) == 0 {
274
+		if (!dep.slowCacheComplete && e.slowCacheFunc(dep) != nil || dep.state < edgeStatusCacheSlow) && len(dep.keyMap) == 0 {
275 275
 			isPhase1 = true
276 276
 			break
277 277
 		}
... ...
@@ -3,6 +3,7 @@ package solver
3 3
 import (
4 4
 	"context"
5 5
 	"fmt"
6
+	"strings"
6 7
 	"sync"
7 8
 	"time"
8 9
 
... ...
@@ -560,15 +561,14 @@ func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, f ResultBased
560 560
 		key, err := f(ctx, res)
561 561
 		complete := true
562 562
 		if err != nil {
563
-			canceled := false
564 563
 			select {
565 564
 			case <-ctx.Done():
566
-				canceled = true
565
+				if strings.Contains(err.Error(), context.Canceled.Error()) {
566
+					complete = false
567
+					err = errors.Wrap(ctx.Err(), err.Error())
568
+				}
567 569
 			default:
568 570
 			}
569
-			if canceled && errors.Cause(err) == context.Canceled {
570
-				complete = false
571
-			}
572 571
 		}
573 572
 		s.slowMu.Lock()
574 573
 		defer s.slowMu.Unlock()
... ...
@@ -615,15 +615,14 @@ func (s *sharedOp) CacheMap(ctx context.Context, index int) (*cacheMapResp, erro
615 615
 		res, done, err := op.CacheMap(ctx, len(s.cacheRes))
616 616
 		complete := true
617 617
 		if err != nil {
618
-			canceled := false
619 618
 			select {
620 619
 			case <-ctx.Done():
621
-				canceled = true
620
+				if strings.Contains(err.Error(), context.Canceled.Error()) {
621
+					complete = false
622
+					err = errors.Wrap(ctx.Err(), err.Error())
623
+				}
622 624
 			default:
623 625
 			}
624
-			if canceled && errors.Cause(err) == context.Canceled {
625
-				complete = false
626
-			}
627 626
 		}
628 627
 		if complete {
629 628
 			if err == nil {
... ...
@@ -669,15 +668,14 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result,
669 669
 		res, err := op.Exec(ctx, inputs)
670 670
 		complete := true
671 671
 		if err != nil {
672
-			canceled := false
673 672
 			select {
674 673
 			case <-ctx.Done():
675
-				canceled = true
674
+				if strings.Contains(err.Error(), context.Canceled.Error()) {
675
+					complete = false
676
+					err = errors.Wrap(ctx.Err(), err.Error())
677
+				}
676 678
 			default:
677 679
 			}
678
-			if canceled && errors.Cause(err) == context.Canceled {
679
-				complete = false
680
-			}
681 680
 		}
682 681
 		if complete {
683 682
 			if res != nil {