Browse code

move resumablerequestreader to pkg

Docker-DCO-1.1-Signed-off-by: Cristian Staretu <cristian.staretu@gmail.com> (github: unclejack)

unclejack authored on 2014/07/29 00:01:21
Showing 4 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1 @@
0
+Cristian Staretu <cristian.staretu@gmail.com> (@unclejack)
0 1
new file mode 100644
... ...
@@ -0,0 +1,92 @@
0
+package httputils
1
+
2
+import (
3
+	"fmt"
4
+	"io"
5
+	"log"
6
+	"net/http"
7
+	"time"
8
+)
9
+
10
+type resumableRequestReader struct {
11
+	client          *http.Client
12
+	request         *http.Request
13
+	lastRange       int64
14
+	totalSize       int64
15
+	currentResponse *http.Response
16
+	failures        uint32
17
+	maxFailures     uint32
18
+}
19
+
20
+// ResumableRequestReader makes it possible to resume reading a request's body transparently
21
+// maxfail is the number of times we retry to make requests again (not resumes)
22
+// totalsize is the total length of the body; auto detect if not provided
23
+func ResumableRequestReader(c *http.Client, r *http.Request, maxfail uint32, totalsize int64) io.ReadCloser {
24
+	return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize}
25
+}
26
+
27
+func ResumableRequestReaderWithInitialResponse(c *http.Client, r *http.Request, maxfail uint32, totalsize int64, initialResponse *http.Response) io.ReadCloser {
28
+	return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, currentResponse: initialResponse}
29
+}
30
+
31
+func (r *resumableRequestReader) Read(p []byte) (n int, err error) {
32
+	if r.client == nil || r.request == nil {
33
+		return 0, fmt.Errorf("client and request can't be nil\n")
34
+	}
35
+	isFreshRequest := false
36
+	if r.lastRange != 0 && r.currentResponse == nil {
37
+		readRange := fmt.Sprintf("bytes=%d-%d", r.lastRange, r.totalSize)
38
+		r.request.Header.Set("Range", readRange)
39
+		time.Sleep(5 * time.Second)
40
+	}
41
+	if r.currentResponse == nil {
42
+		r.currentResponse, err = r.client.Do(r.request)
43
+		isFreshRequest = true
44
+	}
45
+	if err != nil && r.failures+1 != r.maxFailures {
46
+		r.cleanUpResponse()
47
+		r.failures += 1
48
+		time.Sleep(5 * time.Duration(r.failures) * time.Second)
49
+		return 0, nil
50
+	} else if err != nil {
51
+		r.cleanUpResponse()
52
+		return 0, err
53
+	}
54
+	if r.currentResponse.StatusCode == 416 && r.lastRange == r.totalSize && r.currentResponse.ContentLength == 0 {
55
+		r.cleanUpResponse()
56
+		return 0, io.EOF
57
+	} else if r.currentResponse.StatusCode != 206 && r.lastRange != 0 && isFreshRequest {
58
+		r.cleanUpResponse()
59
+		return 0, fmt.Errorf("the server doesn't support byte ranges")
60
+	}
61
+	if r.totalSize == 0 {
62
+		r.totalSize = r.currentResponse.ContentLength
63
+	} else if r.totalSize <= 0 {
64
+		r.cleanUpResponse()
65
+		return 0, fmt.Errorf("failed to auto detect content length")
66
+	}
67
+	n, err = r.currentResponse.Body.Read(p)
68
+	r.lastRange += int64(n)
69
+	if err != nil {
70
+		r.cleanUpResponse()
71
+	}
72
+	if err != nil && err != io.EOF {
73
+		log.Printf("encountered error during pull and clearing it before resume: %s", err)
74
+		err = nil
75
+	}
76
+	return n, err
77
+}
78
+
79
+func (r *resumableRequestReader) Close() error {
80
+	r.cleanUpResponse()
81
+	r.client = nil
82
+	r.request = nil
83
+	return nil
84
+}
85
+
86
+func (r *resumableRequestReader) cleanUpResponse() {
87
+	if r.currentResponse != nil {
88
+		r.currentResponse.Body.Close()
89
+		r.currentResponse = nil
90
+	}
91
+}
... ...
@@ -24,6 +24,7 @@ import (
24 24
 	"time"
25 25
 
26 26
 	"github.com/docker/docker/dockerversion"
27
+	"github.com/docker/docker/pkg/httputils"
27 28
 	"github.com/docker/docker/utils"
28 29
 )
29 30
 
... ...
@@ -423,7 +424,7 @@ func (r *Registry) GetRemoteImageLayer(imgID, registry string, token []string, i
423 423
 
424 424
 	if res.Header.Get("Accept-Ranges") == "bytes" && imgSize > 0 {
425 425
 		utils.Debugf("server supports resume")
426
-		return utils.ResumableRequestReaderWithInitialResponse(client, req, 5, imgSize, res), nil
426
+		return httputils.ResumableRequestReaderWithInitialResponse(client, req, 5, imgSize, res), nil
427 427
 	}
428 428
 	utils.Debugf("server doesn't support resume")
429 429
 	return res.Body, nil
430 430
deleted file mode 100644
... ...
@@ -1,91 +0,0 @@
1
-package utils
2
-
3
-import (
4
-	"fmt"
5
-	"io"
6
-	"net/http"
7
-	"time"
8
-)
9
-
10
-type resumableRequestReader struct {
11
-	client          *http.Client
12
-	request         *http.Request
13
-	lastRange       int64
14
-	totalSize       int64
15
-	currentResponse *http.Response
16
-	failures        uint32
17
-	maxFailures     uint32
18
-}
19
-
20
-// ResumableRequestReader makes it possible to resume reading a request's body transparently
21
-// maxfail is the number of times we retry to make requests again (not resumes)
22
-// totalsize is the total length of the body; auto detect if not provided
23
-func ResumableRequestReader(c *http.Client, r *http.Request, maxfail uint32, totalsize int64) io.ReadCloser {
24
-	return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize}
25
-}
26
-
27
-func ResumableRequestReaderWithInitialResponse(c *http.Client, r *http.Request, maxfail uint32, totalsize int64, initialResponse *http.Response) io.ReadCloser {
28
-	return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, currentResponse: initialResponse}
29
-}
30
-
31
-func (r *resumableRequestReader) Read(p []byte) (n int, err error) {
32
-	if r.client == nil || r.request == nil {
33
-		return 0, fmt.Errorf("client and request can't be nil\n")
34
-	}
35
-	isFreshRequest := false
36
-	if r.lastRange != 0 && r.currentResponse == nil {
37
-		readRange := fmt.Sprintf("bytes=%d-%d", r.lastRange, r.totalSize)
38
-		r.request.Header.Set("Range", readRange)
39
-		time.Sleep(5 * time.Second)
40
-	}
41
-	if r.currentResponse == nil {
42
-		r.currentResponse, err = r.client.Do(r.request)
43
-		isFreshRequest = true
44
-	}
45
-	if err != nil && r.failures+1 != r.maxFailures {
46
-		r.cleanUpResponse()
47
-		r.failures += 1
48
-		time.Sleep(5 * time.Duration(r.failures) * time.Second)
49
-		return 0, nil
50
-	} else if err != nil {
51
-		r.cleanUpResponse()
52
-		return 0, err
53
-	}
54
-	if r.currentResponse.StatusCode == 416 && r.lastRange == r.totalSize && r.currentResponse.ContentLength == 0 {
55
-		r.cleanUpResponse()
56
-		return 0, io.EOF
57
-	} else if r.currentResponse.StatusCode != 206 && r.lastRange != 0 && isFreshRequest {
58
-		r.cleanUpResponse()
59
-		return 0, fmt.Errorf("the server doesn't support byte ranges")
60
-	}
61
-	if r.totalSize == 0 {
62
-		r.totalSize = r.currentResponse.ContentLength
63
-	} else if r.totalSize <= 0 {
64
-		r.cleanUpResponse()
65
-		return 0, fmt.Errorf("failed to auto detect content length")
66
-	}
67
-	n, err = r.currentResponse.Body.Read(p)
68
-	r.lastRange += int64(n)
69
-	if err != nil {
70
-		r.cleanUpResponse()
71
-	}
72
-	if err != nil && err != io.EOF {
73
-		Debugf("encountered error during pull and clearing it before resume: %s", err)
74
-		err = nil
75
-	}
76
-	return n, err
77
-}
78
-
79
-func (r *resumableRequestReader) Close() error {
80
-	r.cleanUpResponse()
81
-	r.client = nil
82
-	r.request = nil
83
-	return nil
84
-}
85
-
86
-func (r *resumableRequestReader) cleanUpResponse() {
87
-	if r.currentResponse != nil {
88
-		r.currentResponse.Body.Close()
89
-		r.currentResponse = nil
90
-	}
91
-}