Add timeouts for volume plugin ops
| ... | ... |
@@ -2,6 +2,7 @@ package plugins |
| 2 | 2 |
|
| 3 | 3 |
import ( |
| 4 | 4 |
"bytes" |
| 5 |
+ "context" |
|
| 5 | 6 |
"encoding/json" |
| 6 | 7 |
"io" |
| 7 | 8 |
"io/ioutil" |
| ... | ... |
@@ -9,6 +10,7 @@ import ( |
| 9 | 9 |
"net/url" |
| 10 | 10 |
"time" |
| 11 | 11 |
|
| 12 |
+ "github.com/docker/docker/pkg/ioutils" |
|
| 12 | 13 |
"github.com/docker/docker/pkg/plugins/transport" |
| 13 | 14 |
"github.com/docker/go-connections/sockets" |
| 14 | 15 |
"github.com/docker/go-connections/tlsconfig" |
| ... | ... |
@@ -82,16 +84,33 @@ type Client struct {
|
| 82 | 82 |
requestFactory transport.RequestFactory |
| 83 | 83 |
} |
| 84 | 84 |
|
| 85 |
+// RequestOpts is the set of options that can be passed into a request |
|
| 86 |
+type RequestOpts struct {
|
|
| 87 |
+ Timeout time.Duration |
|
| 88 |
+} |
|
| 89 |
+ |
|
| 90 |
+// WithRequestTimeout sets a timeout duration for plugin requests |
|
| 91 |
+func WithRequestTimeout(t time.Duration) func(*RequestOpts) {
|
|
| 92 |
+ return func(o *RequestOpts) {
|
|
| 93 |
+ o.Timeout = t |
|
| 94 |
+ } |
|
| 95 |
+} |
|
| 96 |
+ |
|
| 85 | 97 |
// Call calls the specified method with the specified arguments for the plugin. |
| 86 | 98 |
// It will retry for 30 seconds if a failure occurs when calling. |
| 87 |
-func (c *Client) Call(serviceMethod string, args interface{}, ret interface{}) error {
|
|
| 99 |
+func (c *Client) Call(serviceMethod string, args, ret interface{}) error {
|
|
| 100 |
+ return c.CallWithOptions(serviceMethod, args, ret) |
|
| 101 |
+} |
|
| 102 |
+ |
|
| 103 |
+// CallWithOptions is just like call except it takes options |
|
| 104 |
+func (c *Client) CallWithOptions(serviceMethod string, args interface{}, ret interface{}, opts ...func(*RequestOpts)) error {
|
|
| 88 | 105 |
var buf bytes.Buffer |
| 89 | 106 |
if args != nil {
|
| 90 | 107 |
if err := json.NewEncoder(&buf).Encode(args); err != nil {
|
| 91 | 108 |
return err |
| 92 | 109 |
} |
| 93 | 110 |
} |
| 94 |
- body, err := c.callWithRetry(serviceMethod, &buf, true) |
|
| 111 |
+ body, err := c.callWithRetry(serviceMethod, &buf, true, opts...) |
|
| 95 | 112 |
if err != nil {
|
| 96 | 113 |
return err |
| 97 | 114 |
} |
| ... | ... |
@@ -128,18 +147,31 @@ func (c *Client) SendFile(serviceMethod string, data io.Reader, ret interface{})
|
| 128 | 128 |
return nil |
| 129 | 129 |
} |
| 130 | 130 |
|
| 131 |
-func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool) (io.ReadCloser, error) {
|
|
| 131 |
+func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool, reqOpts ...func(*RequestOpts)) (io.ReadCloser, error) {
|
|
| 132 | 132 |
var retries int |
| 133 | 133 |
start := time.Now() |
| 134 | 134 |
|
| 135 |
+ var opts RequestOpts |
|
| 136 |
+ for _, o := range reqOpts {
|
|
| 137 |
+ o(&opts) |
|
| 138 |
+ } |
|
| 139 |
+ |
|
| 135 | 140 |
for {
|
| 136 | 141 |
req, err := c.requestFactory.NewRequest(serviceMethod, data) |
| 137 | 142 |
if err != nil {
|
| 138 | 143 |
return nil, err |
| 139 | 144 |
} |
| 140 | 145 |
|
| 146 |
+ cancelRequest := func() {}
|
|
| 147 |
+ if opts.Timeout > 0 {
|
|
| 148 |
+ var ctx context.Context |
|
| 149 |
+ ctx, cancelRequest = context.WithTimeout(req.Context(), opts.Timeout) |
|
| 150 |
+ req = req.WithContext(ctx) |
|
| 151 |
+ } |
|
| 152 |
+ |
|
| 141 | 153 |
resp, err := c.http.Do(req) |
| 142 | 154 |
if err != nil {
|
| 155 |
+ cancelRequest() |
|
| 143 | 156 |
if !retry {
|
| 144 | 157 |
return nil, err |
| 145 | 158 |
} |
| ... | ... |
@@ -157,6 +189,7 @@ func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool) |
| 157 | 157 |
if resp.StatusCode != http.StatusOK {
|
| 158 | 158 |
b, err := ioutil.ReadAll(resp.Body) |
| 159 | 159 |
resp.Body.Close() |
| 160 |
+ cancelRequest() |
|
| 160 | 161 |
if err != nil {
|
| 161 | 162 |
return nil, &statusError{resp.StatusCode, serviceMethod, err.Error()}
|
| 162 | 163 |
} |
| ... | ... |
@@ -176,7 +209,11 @@ func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool) |
| 176 | 176 |
// old way... |
| 177 | 177 |
return nil, &statusError{resp.StatusCode, serviceMethod, string(b)}
|
| 178 | 178 |
} |
| 179 |
- return resp.Body, nil |
|
| 179 |
+ return ioutils.NewReadCloserWrapper(resp.Body, func() error {
|
|
| 180 |
+ err := resp.Body.Close() |
|
| 181 |
+ cancelRequest() |
|
| 182 |
+ return err |
|
| 183 |
+ }), nil |
|
| 180 | 184 |
} |
| 181 | 185 |
} |
| 182 | 186 |
|
| ... | ... |
@@ -4,12 +4,19 @@ package volumedrivers |
| 4 | 4 |
|
| 5 | 5 |
import ( |
| 6 | 6 |
"errors" |
| 7 |
+ "time" |
|
| 7 | 8 |
|
| 9 |
+ "github.com/docker/docker/pkg/plugins" |
|
| 8 | 10 |
"github.com/docker/docker/volume" |
| 9 | 11 |
) |
| 10 | 12 |
|
| 13 |
+const ( |
|
| 14 |
+ longTimeout = 2 * time.Minute |
|
| 15 |
+ shortTimeout = 1 * time.Minute |
|
| 16 |
+) |
|
| 17 |
+ |
|
| 11 | 18 |
type client interface {
|
| 12 |
- Call(string, interface{}, interface{}) error
|
|
| 19 |
+ CallWithOptions(string, interface{}, interface{}, ...func(*plugins.RequestOpts)) error
|
|
| 13 | 20 |
} |
| 14 | 21 |
|
| 15 | 22 |
type volumeDriverProxy struct {
|
| ... | ... |
@@ -33,7 +40,8 @@ func (pp *volumeDriverProxy) Create(name string, opts map[string]string) (err er |
| 33 | 33 |
|
| 34 | 34 |
req.Name = name |
| 35 | 35 |
req.Opts = opts |
| 36 |
- if err = pp.Call("VolumeDriver.Create", req, &ret); err != nil {
|
|
| 36 |
+ |
|
| 37 |
+ if err = pp.CallWithOptions("VolumeDriver.Create", req, &ret, plugins.WithRequestTimeout(longTimeout)); err != nil {
|
|
| 37 | 38 |
return |
| 38 | 39 |
} |
| 39 | 40 |
|
| ... | ... |
@@ -59,7 +67,8 @@ func (pp *volumeDriverProxy) Remove(name string) (err error) {
|
| 59 | 59 |
) |
| 60 | 60 |
|
| 61 | 61 |
req.Name = name |
| 62 |
- if err = pp.Call("VolumeDriver.Remove", req, &ret); err != nil {
|
|
| 62 |
+ |
|
| 63 |
+ if err = pp.CallWithOptions("VolumeDriver.Remove", req, &ret, plugins.WithRequestTimeout(shortTimeout)); err != nil {
|
|
| 63 | 64 |
return |
| 64 | 65 |
} |
| 65 | 66 |
|
| ... | ... |
@@ -86,7 +95,8 @@ func (pp *volumeDriverProxy) Path(name string) (mountpoint string, err error) {
|
| 86 | 86 |
) |
| 87 | 87 |
|
| 88 | 88 |
req.Name = name |
| 89 |
- if err = pp.Call("VolumeDriver.Path", req, &ret); err != nil {
|
|
| 89 |
+ |
|
| 90 |
+ if err = pp.CallWithOptions("VolumeDriver.Path", req, &ret, plugins.WithRequestTimeout(shortTimeout)); err != nil {
|
|
| 90 | 91 |
return |
| 91 | 92 |
} |
| 92 | 93 |
|
| ... | ... |
@@ -117,7 +127,8 @@ func (pp *volumeDriverProxy) Mount(name string, id string) (mountpoint string, e |
| 117 | 117 |
|
| 118 | 118 |
req.Name = name |
| 119 | 119 |
req.ID = id |
| 120 |
- if err = pp.Call("VolumeDriver.Mount", req, &ret); err != nil {
|
|
| 120 |
+ |
|
| 121 |
+ if err = pp.CallWithOptions("VolumeDriver.Mount", req, &ret, plugins.WithRequestTimeout(longTimeout)); err != nil {
|
|
| 121 | 122 |
return |
| 122 | 123 |
} |
| 123 | 124 |
|
| ... | ... |
@@ -147,7 +158,8 @@ func (pp *volumeDriverProxy) Unmount(name string, id string) (err error) {
|
| 147 | 147 |
|
| 148 | 148 |
req.Name = name |
| 149 | 149 |
req.ID = id |
| 150 |
- if err = pp.Call("VolumeDriver.Unmount", req, &ret); err != nil {
|
|
| 150 |
+ |
|
| 151 |
+ if err = pp.CallWithOptions("VolumeDriver.Unmount", req, &ret, plugins.WithRequestTimeout(shortTimeout)); err != nil {
|
|
| 151 | 152 |
return |
| 152 | 153 |
} |
| 153 | 154 |
|
| ... | ... |
@@ -172,7 +184,7 @@ func (pp *volumeDriverProxy) List() (volumes []*proxyVolume, err error) {
|
| 172 | 172 |
ret volumeDriverProxyListResponse |
| 173 | 173 |
) |
| 174 | 174 |
|
| 175 |
- if err = pp.Call("VolumeDriver.List", req, &ret); err != nil {
|
|
| 175 |
+ if err = pp.CallWithOptions("VolumeDriver.List", req, &ret, plugins.WithRequestTimeout(shortTimeout)); err != nil {
|
|
| 176 | 176 |
return |
| 177 | 177 |
} |
| 178 | 178 |
|
| ... | ... |
@@ -201,7 +213,8 @@ func (pp *volumeDriverProxy) Get(name string) (volume *proxyVolume, err error) {
|
| 201 | 201 |
) |
| 202 | 202 |
|
| 203 | 203 |
req.Name = name |
| 204 |
- if err = pp.Call("VolumeDriver.Get", req, &ret); err != nil {
|
|
| 204 |
+ |
|
| 205 |
+ if err = pp.CallWithOptions("VolumeDriver.Get", req, &ret, plugins.WithRequestTimeout(shortTimeout)); err != nil {
|
|
| 205 | 206 |
return |
| 206 | 207 |
} |
| 207 | 208 |
|
| ... | ... |
@@ -228,7 +241,7 @@ func (pp *volumeDriverProxy) Capabilities() (capabilities volume.Capability, err |
| 228 | 228 |
ret volumeDriverProxyCapabilitiesResponse |
| 229 | 229 |
) |
| 230 | 230 |
|
| 231 |
- if err = pp.Call("VolumeDriver.Capabilities", req, &ret); err != nil {
|
|
| 231 |
+ if err = pp.CallWithOptions("VolumeDriver.Capabilities", req, &ret, plugins.WithRequestTimeout(shortTimeout)); err != nil {
|
|
| 232 | 232 |
return |
| 233 | 233 |
} |
| 234 | 234 |
|