Stop multiple containers in parallel to speed up stop process, allow
maximum 50 parallel stops.
Signed-off-by: Abhinav Dahiya <abhinavdtu2012@gmail.com>
Signed-off-by: Zhang Wei <zhangwei555@huawei.com>
| ... | ... |
@@ -39,11 +39,15 @@ func NewStopCommand(dockerCli *command.DockerCli) *cobra.Command {
|
| 39 | 39 |
|
| 40 | 40 |
func runStop(dockerCli *command.DockerCli, opts *stopOptions) error {
|
| 41 | 41 |
ctx := context.Background() |
| 42 |
+ timeout := time.Duration(opts.time) * time.Second |
|
| 42 | 43 |
|
| 43 | 44 |
var errs []string |
| 45 |
+ |
|
| 46 |
+ errChan := parallelOperation(ctx, opts.containers, func(ctx context.Context, id string) error {
|
|
| 47 |
+ return dockerCli.Client().ContainerStop(ctx, id, &timeout) |
|
| 48 |
+ }) |
|
| 44 | 49 |
for _, container := range opts.containers {
|
| 45 |
- timeout := time.Duration(opts.time) * time.Second |
|
| 46 |
- if err := dockerCli.Client().ContainerStop(ctx, container, &timeout); err != nil {
|
|
| 50 |
+ if err := <-errChan; err != nil {
|
|
| 47 | 51 |
errs = append(errs, err.Error()) |
| 48 | 52 |
} else {
|
| 49 | 53 |
fmt.Fprintf(dockerCli.Out(), "%s\n", container) |
| ... | ... |
@@ -90,3 +90,35 @@ func getExitCode(dockerCli *command.DockerCli, ctx context.Context, containerID |
| 90 | 90 |
} |
| 91 | 91 |
return c.State.Running, c.State.ExitCode, nil |
| 92 | 92 |
} |
| 93 |
+ |
|
| 94 |
+func parallelOperation(ctx context.Context, cids []string, op func(ctx context.Context, id string) error) chan error {
|
|
| 95 |
+ if len(cids) == 0 {
|
|
| 96 |
+ return nil |
|
| 97 |
+ } |
|
| 98 |
+ const defaultParallel int = 50 |
|
| 99 |
+ sem := make(chan struct{}, defaultParallel)
|
|
| 100 |
+ errChan := make(chan error) |
|
| 101 |
+ |
|
| 102 |
+ // make sure result is printed in correct order |
|
| 103 |
+ output := map[string]chan error{}
|
|
| 104 |
+ for _, c := range cids {
|
|
| 105 |
+ output[c] = make(chan error, 1) |
|
| 106 |
+ } |
|
| 107 |
+ go func() {
|
|
| 108 |
+ for _, c := range cids {
|
|
| 109 |
+ err := <-output[c] |
|
| 110 |
+ errChan <- err |
|
| 111 |
+ } |
|
| 112 |
+ }() |
|
| 113 |
+ |
|
| 114 |
+ go func() {
|
|
| 115 |
+ for _, c := range cids {
|
|
| 116 |
+ sem <- struct{}{} // Wait for active queue sem to drain.
|
|
| 117 |
+ go func(container string) {
|
|
| 118 |
+ output[container] <- op(ctx, container) |
|
| 119 |
+ <-sem |
|
| 120 |
+ }(c) |
|
| 121 |
+ } |
|
| 122 |
+ }() |
|
| 123 |
+ return errChan |
|
| 124 |
+} |