Browse code

Docker pull/push with max concurrency limits.

This fix tries to address issues raised in #20936 and #22443
where `docker pull` or `docker push` fails because of
concurrent connection failures.
Currently, the maximum number of concurrent connections is
controlled by `maxDownloadConcurrency` and `maxUploadConcurrency`,
which are hardcoded to 3 and 5 respectively. Therefore, in
situations where network connections don't support multiple
downloads/uploads, `docker push` or `docker pull` may fail.

This fix changes `maxDownloadConcurrency` and
`maxUploadConcurrency` to be adjustable by passing
`--max-concurrent-uploads` and `--max-concurrent-downloads` to
the `docker daemon` command.

The documentation related to docker daemon has been updated.

Additional test cases have been added to cover the changes in this fix.

This fix fixes #20936. This fix fixes #22443.

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

Yong Tang authored on 2016/05/06 13:45:55
Showing 8 changed files
... ...
@@ -18,6 +18,17 @@ import (
18 18
 )
19 19
 
20 20
 const (
21
+	// defaultMaxConcurrentDownloads is the default value for
22
+	// maximum number of downloads that
23
+	// may take place at a time for each pull.
24
+	defaultMaxConcurrentDownloads = 3
25
+	// defaultMaxConcurrentUploads is the default value for
26
+	// maximum number of uploads that
27
+	// may take place at a time for each push.
28
+	defaultMaxConcurrentUploads = 5
29
+)
30
+
31
+const (
21 32
 	defaultNetworkMtu    = 1500
22 33
 	disableNetworkBridge = "none"
23 34
 )
... ...
@@ -94,6 +105,14 @@ type CommonConfig struct {
94 94
 	// reachable by other hosts.
95 95
 	ClusterAdvertise string `json:"cluster-advertise,omitempty"`
96 96
 
97
+	// MaxConcurrentDownloads is the maximum number of downloads that
98
+	// may take place at a time for each pull.
99
+	MaxConcurrentDownloads *int `json:"max-concurrent-downloads,omitempty"`
100
+
101
+	// MaxConcurrentUploads is the maximum number of uploads that
102
+	// may take place at a time for each push.
103
+	MaxConcurrentUploads *int `json:"max-concurrent-uploads,omitempty"`
104
+
97 105
 	Debug     bool     `json:"debug,omitempty"`
98 106
 	Hosts     []string `json:"hosts,omitempty"`
99 107
 	LogLevel  string   `json:"log-level,omitempty"`
... ...
@@ -116,6 +135,8 @@ type CommonConfig struct {
116 116
 // Subsequent calls to `flag.Parse` will populate config with values parsed
117 117
 // from the command-line.
118 118
 func (config *Config) InstallCommonFlags(cmd *flag.FlagSet, usageFn func(string) string) {
119
+	var maxConcurrentDownloads, maxConcurrentUploads int
120
+
119 121
 	config.ServiceOptions.InstallCliFlags(cmd, usageFn)
120 122
 
121 123
 	cmd.Var(opts.NewNamedListOptsRef("storage-opts", &config.GraphOptions, nil), []string{"-storage-opt"}, usageFn("Set storage driver options"))
... ...
@@ -138,6 +159,11 @@ func (config *Config) InstallCommonFlags(cmd *flag.FlagSet, usageFn func(string)
138 138
 	cmd.StringVar(&config.ClusterStore, []string{"-cluster-store"}, "", usageFn("Set the cluster store"))
139 139
 	cmd.Var(opts.NewNamedMapOpts("cluster-store-opts", config.ClusterOpts, nil), []string{"-cluster-store-opt"}, usageFn("Set cluster store options"))
140 140
 	cmd.StringVar(&config.CorsHeaders, []string{"-api-cors-header"}, "", usageFn("Set CORS headers in the remote API"))
141
+	cmd.IntVar(&maxConcurrentDownloads, []string{"-max-concurrent-downloads"}, defaultMaxConcurrentDownloads, usageFn("Set the max concurrent downloads for each pull"))
142
+	cmd.IntVar(&maxConcurrentUploads, []string{"-max-concurrent-uploads"}, defaultMaxConcurrentUploads, usageFn("Set the max concurrent uploads for each push"))
143
+
144
+	config.MaxConcurrentDownloads = &maxConcurrentDownloads
145
+	config.MaxConcurrentUploads = &maxConcurrentUploads
141 146
 }
142 147
 
143 148
 // IsValueSet returns true if a configuration value
... ...
@@ -355,7 +381,8 @@ func findConfigurationConflicts(config map[string]interface{}, flags *flag.FlagS
355 355
 }
356 356
 
357 357
 // validateConfiguration validates some specific configs.
358
-// such as config.DNS, config.Labels, config.DNSSearch
358
+// such as config.DNS, config.Labels, config.DNSSearch,
359
+// as well as config.MaxConcurrentDownloads, config.MaxConcurrentUploads.
359 360
 func validateConfiguration(config *Config) error {
360 361
 	// validate DNS
361 362
 	for _, dns := range config.DNS {
... ...
@@ -378,5 +405,14 @@ func validateConfiguration(config *Config) error {
378 378
 		}
379 379
 	}
380 380
 
381
+	// validate MaxConcurrentDownloads
382
+	if config.IsValueSet("max-concurrent-downloads") && config.MaxConcurrentDownloads != nil && *config.MaxConcurrentDownloads < 0 {
383
+		return fmt.Errorf("invalid max concurrent downloads: %d", *config.MaxConcurrentDownloads)
384
+	}
385
+
386
+	// validate MaxConcurrentUploads
387
+	if config.IsValueSet("max-concurrent-uploads") && config.MaxConcurrentUploads != nil && *config.MaxConcurrentUploads < 0 {
388
+		return fmt.Errorf("invalid max concurrent uploads: %d", *config.MaxConcurrentUploads)
389
+	}
381 390
 	return nil
382 391
 }
... ...
@@ -71,15 +71,6 @@ import (
71 71
 	"golang.org/x/net/context"
72 72
 )
73 73
 
74
-const (
75
-	// maxDownloadConcurrency is the maximum number of downloads that
76
-	// may take place at a time for each pull.
77
-	maxDownloadConcurrency = 3
78
-	// maxUploadConcurrency is the maximum number of uploads that
79
-	// may take place at a time for each push.
80
-	maxUploadConcurrency = 5
81
-)
82
-
83 74
 var (
84 75
 	validContainerNameChars   = utils.RestrictedNameChars
85 76
 	validContainerNamePattern = utils.RestrictedNamePattern
... ...
@@ -719,8 +710,10 @@ func NewDaemon(config *Config, registryService *registry.Service, containerdRemo
719 719
 		return nil, err
720 720
 	}
721 721
 
722
-	d.downloadManager = xfer.NewLayerDownloadManager(d.layerStore, maxDownloadConcurrency)
723
-	d.uploadManager = xfer.NewLayerUploadManager(maxUploadConcurrency)
722
+	logrus.Debugf("Max Concurrent Downloads: %d", *config.MaxConcurrentDownloads)
723
+	d.downloadManager = xfer.NewLayerDownloadManager(d.layerStore, *config.MaxConcurrentDownloads)
724
+	logrus.Debugf("Max Concurrent Uploads: %d", *config.MaxConcurrentUploads)
725
+	d.uploadManager = xfer.NewLayerUploadManager(*config.MaxConcurrentUploads)
724 726
 
725 727
 	ifs, err := image.NewFSStoreBackend(filepath.Join(imageRoot, "imagedb"))
726 728
 	if err != nil {
... ...
@@ -1510,6 +1503,8 @@ func (daemon *Daemon) initDiscovery(config *Config) error {
1510 1510
 // These are the settings that Reload changes:
1511 1511
 // - Daemon labels.
1512 1512
 // - Daemon debug log level.
1513
+// - Daemon max concurrent downloads.
1514
+// - Daemon max concurrent uploads.
1513 1515
 // - Cluster discovery (reconfigure and restart).
1514 1516
 func (daemon *Daemon) Reload(config *Config) error {
1515 1517
 	daemon.configStore.reloadLock.Lock()
... ...
@@ -1520,6 +1515,33 @@ func (daemon *Daemon) Reload(config *Config) error {
1520 1520
 	if config.IsValueSet("debug") {
1521 1521
 		daemon.configStore.Debug = config.Debug
1522 1522
 	}
1523
+
1524
+	// If no value is set for max-concurrent-downloads we assume it is the default value
1525
+	// We always "reset" as the cost is lightweight and easy to maintain.
1526
+	if config.IsValueSet("max-concurrent-downloads") && config.MaxConcurrentDownloads != nil {
1527
+		*daemon.configStore.MaxConcurrentDownloads = *config.MaxConcurrentDownloads
1528
+	} else {
1529
+		maxConcurrentDownloads := defaultMaxConcurrentDownloads
1530
+		daemon.configStore.MaxConcurrentDownloads = &maxConcurrentDownloads
1531
+	}
1532
+	logrus.Debugf("Reset Max Concurrent Downloads: %d", *daemon.configStore.MaxConcurrentDownloads)
1533
+	if daemon.downloadManager != nil {
1534
+		daemon.downloadManager.SetConcurrency(*daemon.configStore.MaxConcurrentDownloads)
1535
+	}
1536
+
1537
+	// If no value is set for max-concurrent-upload we assume it is the default value
1538
+	// We always "reset" as the cost is lightweight and easy to maintain.
1539
+	if config.IsValueSet("max-concurrent-uploads") && config.MaxConcurrentUploads != nil {
1540
+		*daemon.configStore.MaxConcurrentUploads = *config.MaxConcurrentUploads
1541
+	} else {
1542
+		maxConcurrentUploads := defaultMaxConcurrentUploads
1543
+		daemon.configStore.MaxConcurrentUploads = &maxConcurrentUploads
1544
+	}
1545
+	logrus.Debugf("Reset Max Concurrent Uploads: %d", *daemon.configStore.MaxConcurrentUploads)
1546
+	if daemon.uploadManager != nil {
1547
+		daemon.uploadManager.SetConcurrency(*daemon.configStore.MaxConcurrentUploads)
1548
+	}
1549
+
1523 1550
 	return daemon.reloadClusterDiscovery(config)
1524 1551
 }
1525 1552
 
... ...
@@ -25,6 +25,11 @@ type LayerDownloadManager struct {
25 25
 	tm         TransferManager
26 26
 }
27 27
 
28
+// SetConcurrency sets the max concurrent downloads for each pull
29
+func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) {
30
+	ldm.tm.SetConcurrency(concurrency)
31
+}
32
+
28 33
 // NewLayerDownloadManager returns a new LayerDownloadManager.
29 34
 func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int) *LayerDownloadManager {
30 35
 	return &LayerDownloadManager{
... ...
@@ -279,6 +279,8 @@ type TransferManager interface {
279 279
 	// so, it returns progress and error output from that transfer.
280 280
 	// Otherwise, it will call xferFunc to initiate the transfer.
281 281
 	Transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (Transfer, *Watcher)
282
+	// SetConcurrency sets the concurrencyLimit so that it is adjustable on daemon reload
283
+	SetConcurrency(concurrency int)
282 284
 }
283 285
 
284 286
 type transferManager struct {
... ...
@@ -298,6 +300,13 @@ func NewTransferManager(concurrencyLimit int) TransferManager {
298 298
 	}
299 299
 }
300 300
 
301
+// SetConcurrency sets the concurrencyLimit
302
+func (tm *transferManager) SetConcurrency(concurrency int) {
303
+	tm.mu.Lock()
304
+	tm.concurrencyLimit = concurrency
305
+	tm.mu.Unlock()
306
+}
307
+
301 308
 // Transfer checks if a transfer matching the given key is in progress. If not,
302 309
 // it starts one by calling xferFunc. The caller supplies a channel which
303 310
 // receives progress output from the transfer.
... ...
@@ -337,7 +346,7 @@ func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput
337 337
 	start := make(chan struct{})
338 338
 	inactive := make(chan struct{})
339 339
 
340
-	if tm.activeTransfers < tm.concurrencyLimit {
340
+	if tm.concurrencyLimit == 0 || tm.activeTransfers < tm.concurrencyLimit {
341 341
 		close(start)
342 342
 		tm.activeTransfers++
343 343
 	} else {
... ...
@@ -19,6 +19,11 @@ type LayerUploadManager struct {
19 19
 	tm TransferManager
20 20
 }
21 21
 
22
+// SetConcurrency sets the max concurrent uploads for each push
23
+func (lum *LayerUploadManager) SetConcurrency(concurrency int) {
24
+	lum.tm.SetConcurrency(concurrency)
25
+}
26
+
22 27
 // NewLayerUploadManager returns a new LayerUploadManager.
23 28
 func NewLayerUploadManager(concurrencyLimit int) *LayerUploadManager {
24 29
 	return &LayerUploadManager{
... ...
@@ -54,6 +54,8 @@ weight = -1
54 54
       --log-driver="json-file"               Default driver for container logs
55 55
       --log-opt=[]                           Log driver specific options
56 56
       --mtu=0                                Set the containers network MTU
57
+      --max-concurrent-downloads=3           Set the max concurrent downloads for each pull
58
+      --max-concurrent-uploads=5             Set the max concurrent uploads for each push
57 59
       --disable-legacy-registry              Do not contact legacy registries
58 60
       -p, --pidfile="/var/run/docker.pid"    Path to use for daemon PID file
59 61
       --raw-logs                             Full timestamps without ANSI coloring
... ...
@@ -913,6 +915,8 @@ This is a full example of the allowed configuration options in the file:
913 913
 	"cluster-store": "",
914 914
 	"cluster-store-opts": [],
915 915
 	"cluster-advertise": "",
916
+	"max-concurrent-downloads": 3,
917
+	"max-concurrent-uploads": 5,
916 918
 	"debug": true,
917 919
 	"hosts": [],
918 920
 	"log-level": "",
... ...
@@ -963,6 +967,8 @@ The list of currently supported options that can be reconfigured is this:
963 963
 - `cluster-store-opts`: it uses the new options to reload the discovery store.
964 964
 - `cluster-advertise`: it modifies the address advertised after reloading.
965 965
 - `labels`: it replaces the daemon labels with a new set of labels.
966
+- `max-concurrent-downloads`: it updates the max concurrent downloads for each pull.
967
+- `max-concurrent-uploads`: it updates the max concurrent uploads for each push.
966 968
 
967 969
 Updating and reloading the cluster configurations such as `--cluster-store`,
968 970
 `--cluster-advertise` and `--cluster-store-opts` will take effect only if
... ...
@@ -2246,3 +2246,106 @@ func (s *DockerDaemonSuite) TestDaemonLogOptions(c *check.C) {
2246 2246
 	c.Assert(err, check.IsNil, check.Commentf(out))
2247 2247
 	c.Assert(out, checker.Contains, "{json-file map[]}")
2248 2248
 }
2249
+
2250
+// Test case for #20936, #22443
2251
+func (s *DockerDaemonSuite) TestDaemonMaxConcurrency(c *check.C) {
2252
+	c.Assert(s.d.Start("--max-concurrent-uploads=6", "--max-concurrent-downloads=8"), check.IsNil)
2253
+
2254
+	expectedMaxConcurrentUploads := `level=debug msg="Max Concurrent Uploads: 6"`
2255
+	expectedMaxConcurrentDownloads := `level=debug msg="Max Concurrent Downloads: 8"`
2256
+	content, _ := ioutil.ReadFile(s.d.logFile.Name())
2257
+	c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads)
2258
+	c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads)
2259
+}
2260
+
2261
+// Test case for #20936, #22443
2262
+func (s *DockerDaemonSuite) TestDaemonMaxConcurrencyWithConfigFile(c *check.C) {
2263
+	testRequires(c, SameHostDaemon, DaemonIsLinux)
2264
+
2265
+	// daemon config file
2266
+	configFilePath := "test.json"
2267
+	configFile, err := os.Create(configFilePath)
2268
+	c.Assert(err, checker.IsNil)
2269
+	defer os.Remove(configFilePath)
2270
+
2271
+	daemonConfig := `{ "max-concurrent-downloads" : 8 }`
2272
+	fmt.Fprintf(configFile, "%s", daemonConfig)
2273
+	configFile.Close()
2274
+	c.Assert(s.d.Start(fmt.Sprintf("--config-file=%s", configFilePath)), check.IsNil)
2275
+
2276
+	expectedMaxConcurrentUploads := `level=debug msg="Max Concurrent Uploads: 5"`
2277
+	expectedMaxConcurrentDownloads := `level=debug msg="Max Concurrent Downloads: 8"`
2278
+	content, _ := ioutil.ReadFile(s.d.logFile.Name())
2279
+	c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads)
2280
+	c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads)
2281
+
2282
+	configFile, err = os.Create(configFilePath)
2283
+	c.Assert(err, checker.IsNil)
2284
+	daemonConfig = `{ "max-concurrent-uploads" : 7, "max-concurrent-downloads" : 9 }`
2285
+	fmt.Fprintf(configFile, "%s", daemonConfig)
2286
+	configFile.Close()
2287
+
2288
+	syscall.Kill(s.d.cmd.Process.Pid, syscall.SIGHUP)
2289
+
2290
+	time.Sleep(3 * time.Second)
2291
+
2292
+	expectedMaxConcurrentUploads = `level=debug msg="Reset Max Concurrent Uploads: 7"`
2293
+	expectedMaxConcurrentDownloads = `level=debug msg="Reset Max Concurrent Downloads: 9"`
2294
+	content, _ = ioutil.ReadFile(s.d.logFile.Name())
2295
+	c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads)
2296
+	c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads)
2297
+}
2298
+
2299
+// Test case for #20936, #22443
2300
+func (s *DockerDaemonSuite) TestDaemonMaxConcurrencyWithConfigFileReload(c *check.C) {
2301
+	testRequires(c, SameHostDaemon, DaemonIsLinux)
2302
+
2303
+	// daemon config file
2304
+	configFilePath := "test.json"
2305
+	configFile, err := os.Create(configFilePath)
2306
+	c.Assert(err, checker.IsNil)
2307
+	defer os.Remove(configFilePath)
2308
+
2309
+	daemonConfig := `{ "max-concurrent-uploads" : null }`
2310
+	fmt.Fprintf(configFile, "%s", daemonConfig)
2311
+	configFile.Close()
2312
+	c.Assert(s.d.Start(fmt.Sprintf("--config-file=%s", configFilePath)), check.IsNil)
2313
+
2314
+	expectedMaxConcurrentUploads := `level=debug msg="Max Concurrent Uploads: 5"`
2315
+	expectedMaxConcurrentDownloads := `level=debug msg="Max Concurrent Downloads: 3"`
2316
+	content, _ := ioutil.ReadFile(s.d.logFile.Name())
2317
+	c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads)
2318
+	c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads)
2319
+
2320
+	configFile, err = os.Create(configFilePath)
2321
+	c.Assert(err, checker.IsNil)
2322
+	daemonConfig = `{ "max-concurrent-uploads" : 1, "max-concurrent-downloads" : null }`
2323
+	fmt.Fprintf(configFile, "%s", daemonConfig)
2324
+	configFile.Close()
2325
+
2326
+	syscall.Kill(s.d.cmd.Process.Pid, syscall.SIGHUP)
2327
+
2328
+	time.Sleep(3 * time.Second)
2329
+
2330
+	expectedMaxConcurrentUploads = `level=debug msg="Reset Max Concurrent Uploads: 1"`
2331
+	expectedMaxConcurrentDownloads = `level=debug msg="Reset Max Concurrent Downloads: 3"`
2332
+	content, _ = ioutil.ReadFile(s.d.logFile.Name())
2333
+	c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads)
2334
+	c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads)
2335
+
2336
+	configFile, err = os.Create(configFilePath)
2337
+	c.Assert(err, checker.IsNil)
2338
+	daemonConfig = `{ "labels":["foo=bar"] }`
2339
+	fmt.Fprintf(configFile, "%s", daemonConfig)
2340
+	configFile.Close()
2341
+
2342
+	syscall.Kill(s.d.cmd.Process.Pid, syscall.SIGHUP)
2343
+
2344
+	time.Sleep(3 * time.Second)
2345
+
2346
+	expectedMaxConcurrentUploads = `level=debug msg="Reset Max Concurrent Uploads: 5"`
2347
+	expectedMaxConcurrentDownloads = `level=debug msg="Reset Max Concurrent Downloads: 3"`
2348
+	content, _ = ioutil.ReadFile(s.d.logFile.Name())
2349
+	c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads)
2350
+	c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads)
2351
+}
... ...
@@ -44,6 +44,8 @@ dockerd - Enable daemon mode
44 44
 [**--log-driver**[=*json-file*]]
45 45
 [**--log-opt**[=*map[]*]]
46 46
 [**--mtu**[=*0*]]
47
+[**--max-concurrent-downloads**[=*3*]]
48
+[**--max-concurrent-uploads**[=*5*]]
47 49
 [**-p**|**--pidfile**[=*/var/run/docker.pid*]]
48 50
 [**--raw-logs**]
49 51
 [**--registry-mirror**[=*[]*]]
... ...
@@ -197,6 +199,12 @@ unix://[/path/to/socket] to use.
197 197
 **--mtu**=*0*
198 198
   Set the containers network mtu. Default is `0`.
199 199
 
200
+**--max-concurrent-downloads**=*3*
201
+  Set the max concurrent downloads for each pull. Default is `3`.
202
+
203
+**--max-concurrent-uploads**=*5*
204
+  Set the max concurrent uploads for each push. Default is `5`.
205
+
200 206
 **-p**, **--pidfile**=""
201 207
   Path to use for daemon PID file. Default is `/var/run/docker.pid`
202 208