| ... | ... |
@@ -101,7 +101,7 @@ github.com/docker/containerd 4ab9917febca54791c5f071a9d1f404867857fcc |
| 101 | 101 |
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4 |
| 102 | 102 |
|
| 103 | 103 |
# cluster |
| 104 |
-github.com/docker/swarmkit 17756457ad6dc4d8a639a1f0b7a85d1b65a617bb |
|
| 104 |
+github.com/docker/swarmkit e68072200ebbba6ce9745b3a3e49fdba3eb71ff8 |
|
| 105 | 105 |
github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9 |
| 106 | 106 |
github.com/gogo/protobuf v0.3 |
| 107 | 107 |
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a |
| ... | ... |
@@ -264,8 +264,8 @@ func (a *Agent) run(ctx context.Context) {
|
| 264 | 264 |
sessionq = a.sessionq |
| 265 | 265 |
case err := <-session.errs: |
| 266 | 266 |
// TODO(stevvooe): This may actually block if a session is closed |
| 267 |
- // but no error was sent. Session.close must only be called here |
|
| 268 |
- // for this to work. |
|
| 267 |
+ // but no error was sent. This must be the only place |
|
| 268 |
+ // session.close is called in response to errors, for this to work. |
|
| 269 | 269 |
if err != nil {
|
| 270 | 270 |
log.G(ctx).WithError(err).Error("agent: session failed")
|
| 271 | 271 |
backoff = initialSessionFailureBackoff + 2*backoff |
| ... | ... |
@@ -315,7 +315,11 @@ func (a *Agent) run(ctx context.Context) {
|
| 315 | 315 |
nodeDescription = newNodeDescription |
| 316 | 316 |
// close the session |
| 317 | 317 |
log.G(ctx).Info("agent: found node update")
|
| 318 |
- session.sendError(nil) |
|
| 318 |
+ if err := session.close(); err != nil {
|
|
| 319 |
+ log.G(ctx).WithError(err).Error("agent: closing session failed")
|
|
| 320 |
+ } |
|
| 321 |
+ sessionq = nil |
|
| 322 |
+ registered = nil |
|
| 319 | 323 |
} |
| 320 | 324 |
case <-a.stopped: |
| 321 | 325 |
// TODO(stevvooe): Wait on shutdown and cleanup. May need to pump |
| ... | ... |
@@ -165,7 +165,7 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit |
| 165 | 165 |
// responding properly (for example, it may have just been demoted). |
| 166 | 166 |
var signedCert []byte |
| 167 | 167 |
for i := 0; i != 5; i++ {
|
| 168 |
- signedCert, err = GetRemoteSignedCertificate(ctx, csr, token, rca.Pool, r, transport, nodeInfo) |
|
| 168 |
+ signedCert, err = GetRemoteSignedCertificate(ctx, csr, token, rca.Pool, r, transport, nodeInfo, 0) |
|
| 169 | 169 |
if err == nil {
|
| 170 | 170 |
break |
| 171 | 171 |
} |
| ... | ... |
@@ -545,7 +545,7 @@ func CreateRootCA(rootCN string, paths CertPaths) (RootCA, error) {
|
| 545 | 545 |
|
| 546 | 546 |
// GetRemoteSignedCertificate submits a CSR to a remote CA server address, |
| 547 | 547 |
// and that is part of a CA identified by a specific certificate pool. |
| 548 |
-func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, rootCAPool *x509.CertPool, r remotes.Remotes, creds credentials.TransportCredentials, nodeInfo chan<- api.IssueNodeCertificateResponse) ([]byte, error) {
|
|
| 548 |
+func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, rootCAPool *x509.CertPool, r remotes.Remotes, creds credentials.TransportCredentials, nodeInfo chan<- api.IssueNodeCertificateResponse, nodeCertificateStatusRequestTimeout time.Duration) ([]byte, error) {
|
|
| 549 | 549 |
if rootCAPool == nil {
|
| 550 | 550 |
return nil, errors.New("valid root CA pool required")
|
| 551 | 551 |
} |
| ... | ... |
@@ -560,7 +560,6 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r |
| 560 | 560 |
if err != nil {
|
| 561 | 561 |
return nil, err |
| 562 | 562 |
} |
| 563 |
- defer conn.Close() |
|
| 564 | 563 |
|
| 565 | 564 |
// Create a CAClient to retrieve a new Certificate |
| 566 | 565 |
caClient := api.NewNodeCAClient(conn) |
| ... | ... |
@@ -570,6 +569,7 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r |
| 570 | 570 |
issueResponse, err := caClient.IssueNodeCertificate(ctx, issueRequest) |
| 571 | 571 |
if err != nil {
|
| 572 | 572 |
r.Observe(peer, -remotes.DefaultObservationWeight) |
| 573 |
+ conn.Close() |
|
| 573 | 574 |
return nil, err |
| 574 | 575 |
} |
| 575 | 576 |
|
| ... | ... |
@@ -587,18 +587,31 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r |
| 587 | 587 |
|
| 588 | 588 |
// Exponential backoff with Max of 30 seconds to wait for a new retry |
| 589 | 589 |
for {
|
| 590 |
+ timeout := 5 * time.Second |
|
| 591 |
+ if nodeCertificateStatusRequestTimeout > 0 {
|
|
| 592 |
+ timeout = nodeCertificateStatusRequestTimeout |
|
| 593 |
+ } |
|
| 590 | 594 |
// Send the Request and retrieve the certificate |
| 591 |
- ctx, cancel := context.WithTimeout(ctx, 5*time.Second) |
|
| 595 |
+ stateCtx, cancel := context.WithTimeout(ctx, timeout) |
|
| 592 | 596 |
defer cancel() |
| 593 |
- statusResponse, err := caClient.NodeCertificateStatus(ctx, statusRequest) |
|
| 594 |
- if err != nil {
|
|
| 597 |
+ statusResponse, err := caClient.NodeCertificateStatus(stateCtx, statusRequest) |
|
| 598 |
+ switch {
|
|
| 599 |
+ case err != nil && grpc.Code(err) != codes.DeadlineExceeded: |
|
| 600 |
+ // Because IssueNodeCertificate succeeded, if this call failed likely it is due to an issue with this |
|
| 601 |
+ // particular connection, so we need to get another. |
|
| 595 | 602 |
r.Observe(peer, -remotes.DefaultObservationWeight) |
| 596 |
- return nil, err |
|
| 597 |
- } |
|
| 603 |
+ conn.Close() |
|
| 604 |
+ conn, peer, err = getGRPCConnection(creds, r) |
|
| 605 |
+ if err != nil {
|
|
| 606 |
+ return nil, err |
|
| 607 |
+ } |
|
| 608 |
+ caClient = api.NewNodeCAClient(conn) |
|
| 598 | 609 |
|
| 599 |
- // If the certificate was issued, return |
|
| 600 |
- if statusResponse.Status.State == api.IssuanceStateIssued {
|
|
| 610 |
+ // If there was no deadline exceeded error, and the certificate was issued, return |
|
| 611 |
+ case err == nil && statusResponse.Status.State == api.IssuanceStateIssued: |
|
| 601 | 612 |
if statusResponse.Certificate == nil {
|
| 613 |
+ r.Observe(peer, -remotes.DefaultObservationWeight) |
|
| 614 |
+ conn.Close() |
|
| 602 | 615 |
return nil, errors.New("no certificate in CertificateStatus response")
|
| 603 | 616 |
} |
| 604 | 617 |
|
| ... | ... |
@@ -609,14 +622,20 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r |
| 609 | 609 |
// current request. |
| 610 | 610 |
if bytes.Equal(statusResponse.Certificate.CSR, csr) {
|
| 611 | 611 |
r.Observe(peer, remotes.DefaultObservationWeight) |
| 612 |
+ conn.Close() |
|
| 612 | 613 |
return statusResponse.Certificate.Certificate, nil |
| 613 | 614 |
} |
| 614 | 615 |
} |
| 615 | 616 |
|
| 616 | 617 |
// If we're still pending, the issuance failed, or the state is unknown |
| 617 |
- // let's continue trying. |
|
| 618 |
+ // let's continue trying after an exponential backoff |
|
| 618 | 619 |
expBackoff.Failure(nil, nil) |
| 619 |
- time.Sleep(expBackoff.Proceed(nil)) |
|
| 620 |
+ select {
|
|
| 621 |
+ case <-ctx.Done(): |
|
| 622 |
+ conn.Close() |
|
| 623 |
+ return nil, err |
|
| 624 |
+ case <-time.After(expBackoff.Proceed(nil)): |
|
| 625 |
+ } |
|
| 620 | 626 |
} |
| 621 | 627 |
} |
| 622 | 628 |
|
| ... | ... |
@@ -7,6 +7,7 @@ import ( |
| 7 | 7 |
"io/ioutil" |
| 8 | 8 |
"net/http" |
| 9 | 9 |
"sync" |
| 10 |
+ "time" |
|
| 10 | 11 |
|
| 11 | 12 |
"github.com/Sirupsen/logrus" |
| 12 | 13 |
"github.com/cloudflare/cfssl/api" |
| ... | ... |
@@ -23,6 +24,8 @@ var ErrNoExternalCAURLs = errors.New("no external CA URLs")
|
| 23 | 23 |
// ExternalCA is able to make certificate signing requests to one of a list |
| 24 | 24 |
// remote CFSSL API endpoints. |
| 25 | 25 |
type ExternalCA struct {
|
| 26 |
+ ExternalRequestTimeout time.Duration |
|
| 27 |
+ |
|
| 26 | 28 |
mu sync.Mutex |
| 27 | 29 |
rootCA *RootCA |
| 28 | 30 |
urls []string |
| ... | ... |
@@ -33,8 +36,9 @@ type ExternalCA struct {
|
| 33 | 33 |
// authenticate to any of the given URLS of CFSSL API endpoints. |
| 34 | 34 |
func NewExternalCA(rootCA *RootCA, tlsConfig *tls.Config, urls ...string) *ExternalCA {
|
| 35 | 35 |
return &ExternalCA{
|
| 36 |
- rootCA: rootCA, |
|
| 37 |
- urls: urls, |
|
| 36 |
+ ExternalRequestTimeout: 5 * time.Second, |
|
| 37 |
+ rootCA: rootCA, |
|
| 38 |
+ urls: urls, |
|
| 38 | 39 |
client: &http.Client{
|
| 39 | 40 |
Transport: &http.Transport{
|
| 40 | 41 |
TLSClientConfig: tlsConfig, |
| ... | ... |
@@ -87,7 +91,9 @@ func (eca *ExternalCA) Sign(ctx context.Context, req signer.SignRequest) (cert [ |
| 87 | 87 |
// Try each configured proxy URL. Return after the first success. If |
| 88 | 88 |
// all fail then the last error will be returned. |
| 89 | 89 |
for _, url := range urls {
|
| 90 |
- cert, err = makeExternalSignRequest(ctx, client, url, csrJSON) |
|
| 90 |
+ requestCtx, cancel := context.WithTimeout(ctx, eca.ExternalRequestTimeout) |
|
| 91 |
+ cert, err = makeExternalSignRequest(requestCtx, client, url, csrJSON) |
|
| 92 |
+ cancel() |
|
| 91 | 93 |
if err == nil {
|
| 92 | 94 |
return eca.rootCA.AppendFirstRootPEM(cert) |
| 93 | 95 |
} |
| ... | ... |
@@ -243,8 +243,9 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin |
| 243 | 243 |
updates := make(map[*api.Service][]orchestrator.Slot) |
| 244 | 244 |
|
| 245 | 245 |
_, err := g.store.Batch(func(batch *store.Batch) error {
|
| 246 |
- var updateTasks []orchestrator.Slot |
|
| 247 | 246 |
for _, serviceID := range serviceIDs {
|
| 247 |
+ var updateTasks []orchestrator.Slot |
|
| 248 |
+ |
|
| 248 | 249 |
if _, exists := nodeTasks[serviceID]; !exists {
|
| 249 | 250 |
continue |
| 250 | 251 |
} |
| ... | ... |
@@ -298,7 +299,6 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin |
| 298 | 298 |
for service, updateTasks := range updates {
|
| 299 | 299 |
g.updater.Update(ctx, g.cluster, service, updateTasks) |
| 300 | 300 |
} |
| 301 |
- |
|
| 302 | 301 |
} |
| 303 | 302 |
|
| 304 | 303 |
// updateNode updates g.nodes based on the current node value |
| ... | ... |
@@ -406,7 +406,11 @@ func (u *Updater) updateTask(ctx context.Context, slot orchestrator.Slot, update |
| 406 | 406 |
} |
| 407 | 407 |
|
| 408 | 408 |
if delayStartCh != nil {
|
| 409 |
- <-delayStartCh |
|
| 409 |
+ select {
|
|
| 410 |
+ case <-delayStartCh: |
|
| 411 |
+ case <-u.stopChan: |
|
| 412 |
+ return nil |
|
| 413 |
+ } |
|
| 410 | 414 |
} |
| 411 | 415 |
|
| 412 | 416 |
// Wait for the new task to come up. |
| ... | ... |
@@ -456,7 +460,11 @@ func (u *Updater) useExistingTask(ctx context.Context, slot orchestrator.Slot, e |
| 456 | 456 |
} |
| 457 | 457 |
|
| 458 | 458 |
if delayStartCh != nil {
|
| 459 |
- <-delayStartCh |
|
| 459 |
+ select {
|
|
| 460 |
+ case <-delayStartCh: |
|
| 461 |
+ case <-u.stopChan: |
|
| 462 |
+ return nil |
|
| 463 |
+ } |
|
| 460 | 464 |
} |
| 461 | 465 |
} |
| 462 | 466 |
|