- Make it possible to define a shorter waiting time of httputils
- Make a small hack to reduce the waiting time on distribution/xfer
Signed-off-by: Vincent Demeester <vincent@sbr.pm>
| ... | ... |
@@ -22,8 +22,9 @@ const maxDownloadAttempts = 5
 // registers and downloads those, taking into account dependencies between
 // layers.
 type LayerDownloadManager struct {
-	layerStore layer.Store
-	tm         TransferManager
+	layerStore   layer.Store
+	tm           TransferManager
+	waitDuration time.Duration
 }
 
 // SetConcurrency sets the max concurrent downloads for each pull
@@ -32,11 +33,16 @@ func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) {
 }
 
 // NewLayerDownloadManager returns a new LayerDownloadManager.
-func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int) *LayerDownloadManager {
-	return &LayerDownloadManager{
-		layerStore: layerStore,
-		tm:         NewTransferManager(concurrencyLimit),
+func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int, options ...func(*LayerDownloadManager)) *LayerDownloadManager {
+	manager := LayerDownloadManager{
+		layerStore:   layerStore,
+		tm:           NewTransferManager(concurrencyLimit),
+		waitDuration: time.Second,
+	}
+	for _, option := range options {
+		option(&manager)
 	}
+	return &manager
 }
 
 type downloadTransfer struct {
@@ -269,7 +275,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
 
 			logrus.Errorf("Download failed, retrying: %v", err)
 			delay := retries * 5
-			ticker := time.NewTicker(time.Second)
+			ticker := time.NewTicker(ldm.waitDuration)
 
 		selectLoop:
 			for {
@@ -265,8 +265,9 @@ func TestSuccessfulDownload(t *testing.T) {
 	if runtime.GOOS == "windows" {
 		t.Skip("Needs fixing on Windows")
 	}
+
 	layerStore := &mockLayerStore{make(map[layer.ChainID]*mockLayer)}
-	ldm := NewLayerDownloadManager(layerStore, maxDownloadConcurrency)
+	ldm := NewLayerDownloadManager(layerStore, maxDownloadConcurrency, func(m *LayerDownloadManager) { m.waitDuration = time.Millisecond })
 
 	progressChan := make(chan progress.Progress)
 	progressDone := make(chan struct{})
@@ -327,7 +328,7 @@ func TestSuccessfulDownload(t *testing.T) {
 }
 
 func TestCancelledDownload(t *testing.T) {
-	ldm := NewLayerDownloadManager(&mockLayerStore{make(map[layer.ChainID]*mockLayer)}, maxDownloadConcurrency)
+	ldm := NewLayerDownloadManager(&mockLayerStore{make(map[layer.ChainID]*mockLayer)}, maxDownloadConcurrency, func(m *LayerDownloadManager) { m.waitDuration = time.Millisecond })
 
 	progressChan := make(chan progress.Progress)
 	progressDone := make(chan struct{})
@@ -16,7 +16,8 @@ const maxUploadAttempts = 5
 // LayerUploadManager provides task management and progress reporting for
 // uploads.
 type LayerUploadManager struct {
-	tm TransferManager
+	tm           TransferManager
+	waitDuration time.Duration
 }
 
 // SetConcurrency sets the max concurrent uploads for each push
@@ -25,10 +26,15 @@ func (lum *LayerUploadManager) SetConcurrency(concurrency int) {
 }
 
 // NewLayerUploadManager returns a new LayerUploadManager.
-func NewLayerUploadManager(concurrencyLimit int) *LayerUploadManager {
-	return &LayerUploadManager{
-		tm: NewTransferManager(concurrencyLimit),
+func NewLayerUploadManager(concurrencyLimit int, options ...func(*LayerUploadManager)) *LayerUploadManager {
+	manager := LayerUploadManager{
+		tm:           NewTransferManager(concurrencyLimit),
+		waitDuration: time.Second,
 	}
+	for _, option := range options {
+		option(&manager)
+	}
+	return &manager
 }
 
 type uploadTransfer struct {
@@ -142,7 +148,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFunc {
 
 			logrus.Errorf("Upload failed, retrying: %v", err)
 			delay := retries * 5
-			ticker := time.NewTicker(time.Second)
+			ticker := time.NewTicker(lum.waitDuration)
 
 		selectLoop:
 			for {
@@ -79,7 +79,7 @@ func uploadDescriptors(currentUploads *int32) []UploadDescriptor {
 }
 
 func TestSuccessfulUpload(t *testing.T) {
-	lum := NewLayerUploadManager(maxUploadConcurrency)
+	lum := NewLayerUploadManager(maxUploadConcurrency, func(m *LayerUploadManager) { m.waitDuration = time.Millisecond })
 
 	progressChan := make(chan progress.Progress)
 	progressDone := make(chan struct{})
@@ -105,7 +105,7 @@ func TestSuccessfulUpload(t *testing.T) {
 }
 
 func TestCancelledUpload(t *testing.T) {
-	lum := NewLayerUploadManager(maxUploadConcurrency)
+	lum := NewLayerUploadManager(maxUploadConcurrency, func(m *LayerUploadManager) { m.waitDuration = time.Millisecond })
 
 	progressChan := make(chan progress.Progress)
 	progressDone := make(chan struct{})
@@ -17,19 +17,20 @@ type resumableRequestReader struct {
 	currentResponse *http.Response
 	failures        uint32
 	maxFailures     uint32
+	waitDuration    time.Duration
 }
 
 // ResumableRequestReader makes it possible to resume reading a request's body transparently
 // maxfail is the number of times we retry to make requests again (not resumes)
 // totalsize is the total length of the body; auto detect if not provided
 func ResumableRequestReader(c *http.Client, r *http.Request, maxfail uint32, totalsize int64) io.ReadCloser {
-	return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize}
+	return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, waitDuration: 5 * time.Second}
 }
 
 // ResumableRequestReaderWithInitialResponse makes it possible to resume
 // reading the body of an already initiated request.
 func ResumableRequestReaderWithInitialResponse(c *http.Client, r *http.Request, maxfail uint32, totalsize int64, initialResponse *http.Response) io.ReadCloser {
-	return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, currentResponse: initialResponse}
+	return &resumableRequestReader{client: c, request: r, maxFailures: maxfail, totalSize: totalsize, currentResponse: initialResponse, waitDuration: 5 * time.Second}
 }
 
 func (r *resumableRequestReader) Read(p []byte) (n int, err error) {
@@ -40,7 +41,7 @@ func (r *resumableRequestReader) Read(p []byte) (n int, err error) {
 	if r.lastRange != 0 && r.currentResponse == nil {
 		readRange := fmt.Sprintf("bytes=%d-%d", r.lastRange, r.totalSize)
 		r.request.Header.Set("Range", readRange)
-		time.Sleep(5 * time.Second)
+		time.Sleep(r.waitDuration)
 	}
 	if r.currentResponse == nil {
 		r.currentResponse, err = r.client.Do(r.request)
@@ -49,7 +50,7 @@ func (r *resumableRequestReader) Read(p []byte) (n int, err error) {
 	if err != nil && r.failures+1 != r.maxFailures {
 		r.cleanUpResponse()
 		r.failures++
-		time.Sleep(5 * time.Duration(r.failures) * time.Second)
+		time.Sleep(time.Duration(r.failures) * r.waitDuration)
 		return 0, nil
 	} else if err != nil {
 		r.cleanUpResponse()
@@ -8,6 +8,7 @@ import (
 	"net/http/httptest"
 	"strings"
 	"testing"
+	"time"
 )
 
 func TestResumableRequestHeaderSimpleErrors(t *testing.T) {
@@ -55,10 +56,11 @@ func TestResumableRequestHeaderNotTooMuchFailures(t *testing.T) {
 	}
 
 	resreq := &resumableRequestReader{
-		client:      client,
-		request:     badReq,
-		failures:    0,
-		maxFailures: 2,
+		client:       client,
+		request:      badReq,
+		failures:     0,
+		maxFailures:  2,
+		waitDuration: 10 * time.Millisecond,
 	}
 	read, err := resreq.Read([]byte{})
 	if err != nil || read != 0 {
@@ -234,7 +234,7 @@ func TestConsumeWithSpeed(t *testing.T) {
 	reader := strings.NewReader("1234567890")
 	chunksize := 2
 
-	bytes1, err := ConsumeWithSpeed(reader, chunksize, 1*time.Second, nil)
+	bytes1, err := ConsumeWithSpeed(reader, chunksize, 10*time.Millisecond, nil)
 	if err != nil {
 		t.Fatal(err)
 	}