Browse code

builder: add support for separate upload-request

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>

Tonis Tiigi authored on 2018/05/19 09:50:52
Showing 2 changed files
... ...
@@ -63,6 +63,10 @@ func (b *Backend) Build(ctx context.Context, config backend.BuildConfig) (string
63 63
 		}
64 64
 	}
65 65
 
66
+	if build == nil {
67
+		return "", nil
68
+	}
69
+
66 70
 	var imageID = build.ImageID
67 71
 	if options.Squash {
68 72
 		if imageID, err = squashBuild(build, b.imageComponent); err != nil {
... ...
@@ -6,6 +6,7 @@ import (
6 6
 	"io"
7 7
 	"strings"
8 8
 	"sync"
9
+	"time"
9 10
 
10 11
 	"github.com/containerd/containerd/content"
11 12
 	"github.com/docker/docker/api/types"
... ...
@@ -34,7 +35,7 @@ type Builder struct {
34 34
 	reqBodyHandler *reqBodyHandler
35 35
 
36 36
 	mu   sync.Mutex
37
-	jobs map[string]func()
37
+	jobs map[string]*buildJob
38 38
 }
39 39
 
40 40
 func New(opt Opt) (*Builder, error) {
... ...
@@ -47,15 +48,15 @@ func New(opt Opt) (*Builder, error) {
47 47
 	b := &Builder{
48 48
 		controller:     c,
49 49
 		reqBodyHandler: reqHandler,
50
-		jobs:           map[string]func(){},
50
+		jobs:           map[string]*buildJob{},
51 51
 	}
52 52
 	return b, nil
53 53
 }
54 54
 
55 55
 func (b *Builder) Cancel(ctx context.Context, id string) error {
56 56
 	b.mu.Lock()
57
-	if cancel, ok := b.jobs[id]; ok {
58
-		cancel()
57
+	if j, ok := b.jobs[id]; ok && j.cancel != nil {
58
+		j.cancel()
59 59
 	}
60 60
 	b.mu.Unlock()
61 61
 	return nil
... ...
@@ -114,10 +115,43 @@ func (b *Builder) Prune(ctx context.Context) (int64, error) {
114 114
 }
115 115
 
116 116
 func (b *Builder) Build(ctx context.Context, opt backend.BuildConfig) (*builder.Result, error) {
117
+	var rc = opt.Source
118
+
117 119
 	if buildID := opt.Options.BuildID; buildID != "" {
118 120
 		b.mu.Lock()
119
-		ctx, b.jobs[buildID] = context.WithCancel(ctx)
121
+
122
+		upload := false
123
+		if strings.HasPrefix(buildID, "upload-request:") {
124
+			upload = true
125
+			buildID = strings.TrimPrefix(buildID, "upload-request:")
126
+		}
127
+
128
+		if _, ok := b.jobs[buildID]; !ok {
129
+			b.jobs[buildID] = newBuildJob()
130
+		}
131
+		j := b.jobs[buildID]
132
+		ctx, cancel := context.WithCancel(ctx)
133
+		j.cancel = cancel
120 134
 		b.mu.Unlock()
135
+
136
+		if upload {
137
+			ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
138
+			defer cancel()
139
+			err := j.SetUpload(ctx2, rc)
140
+			return nil, err
141
+		}
142
+
143
+		if remoteContext := opt.Options.RemoteContext; remoteContext == "upload-request" {
144
+			ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
145
+			defer cancel()
146
+			var err error
147
+			rc, err = j.WaitUpload(ctx2)
148
+			if err != nil {
149
+				return nil, err
150
+			}
151
+			opt.Options.RemoteContext = ""
152
+		}
153
+
121 154
 		defer func() {
122 155
 			delete(b.jobs, buildID)
123 156
 		}()
... ...
@@ -142,7 +176,7 @@ func (b *Builder) Build(ctx context.Context, opt backend.BuildConfig) (*builder.
142 142
 			frontendAttrs["context"] = opt.Options.RemoteContext
143 143
 		}
144 144
 	} else {
145
-		url, cancel := b.reqBodyHandler.newRequest(opt.Source)
145
+		url, cancel := b.reqBodyHandler.newRequest(rc)
146 146
 		defer cancel()
147 147
 		frontendAttrs["context"] = url
148 148
 	}
... ...
@@ -295,3 +329,78 @@ type contentStoreNoLabels struct {
295 295
 func (c *contentStoreNoLabels) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
296 296
 	return content.Info{}, nil
297 297
 }
298
+
299
+type wrapRC struct {
300
+	io.ReadCloser
301
+	once   sync.Once
302
+	err    error
303
+	waitCh chan struct{}
304
+}
305
+
306
+func (w *wrapRC) Read(b []byte) (int, error) {
307
+	n, err := w.ReadCloser.Read(b)
308
+	if err != nil {
309
+		e := err
310
+		if e == io.EOF {
311
+			e = nil
312
+		}
313
+		w.close(e)
314
+	}
315
+	return n, err
316
+}
317
+
318
+func (w *wrapRC) Close() error {
319
+	err := w.ReadCloser.Close()
320
+	w.close(err)
321
+	return err
322
+}
323
+
324
+func (w *wrapRC) close(err error) {
325
+	w.once.Do(func() {
326
+		w.err = err
327
+		close(w.waitCh)
328
+	})
329
+}
330
+
331
+func (w *wrapRC) wait() error {
332
+	<-w.waitCh
333
+	return w.err
334
+}
335
+
336
+type buildJob struct {
337
+	cancel func()
338
+	waitCh chan func(io.ReadCloser) error
339
+}
340
+
341
+func newBuildJob() *buildJob {
342
+	return &buildJob{waitCh: make(chan func(io.ReadCloser) error)}
343
+}
344
+
345
+func (j *buildJob) WaitUpload(ctx context.Context) (io.ReadCloser, error) {
346
+	done := make(chan struct{})
347
+
348
+	var upload io.ReadCloser
349
+	fn := func(rc io.ReadCloser) error {
350
+		w := &wrapRC{ReadCloser: rc, waitCh: make(chan struct{})}
351
+		upload = w
352
+		close(done)
353
+		return w.wait()
354
+	}
355
+
356
+	select {
357
+	case <-ctx.Done():
358
+		return nil, ctx.Err()
359
+	case j.waitCh <- fn:
360
+		<-done
361
+		return upload, nil
362
+	}
363
+}
364
+
365
+func (j *buildJob) SetUpload(ctx context.Context, rc io.ReadCloser) error {
366
+	select {
367
+	case <-ctx.Done():
368
+		return ctx.Err()
369
+	case fn := <-j.waitCh:
370
+		return fn(rc)
371
+	}
372
+}