full diff: https://github.com/moby/swarmkit/compare/48dd89375d0a...6341884e5fc9
Pulls in a set of fixes to SwarmKit's nascent Cluster Volumes support
discovered during subsequent development and testing.
Signed-off-by: Bjorn Neergaard <bneergaard@mirantis.com>
| ... | ... |
@@ -55,7 +55,7 @@ require ( |
| 55 | 55 |
github.com/moby/locker v1.0.1 |
| 56 | 56 |
github.com/moby/patternmatcher v0.5.0 |
| 57 | 57 |
github.com/moby/pubsub v1.0.0 |
| 58 |
- github.com/moby/swarmkit/v2 v2.0.0-20220721174824-48dd89375d0a |
|
| 58 |
+ github.com/moby/swarmkit/v2 v2.0.0-20221102165002-6341884e5fc9 |
|
| 59 | 59 |
github.com/moby/sys/mount v0.3.3 |
| 60 | 60 |
github.com/moby/sys/mountinfo v0.6.2 |
| 61 | 61 |
github.com/moby/sys/sequential v0.5.0 |
| ... | ... |
@@ -777,8 +777,8 @@ github.com/moby/patternmatcher v0.5.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YO |
| 777 | 777 |
github.com/moby/pubsub v1.0.0 h1:jkp/imWsmJz2f6LyFsk7EkVeN2HxR/HTTOY8kHrsxfA= |
| 778 | 778 |
github.com/moby/pubsub v1.0.0/go.mod h1:bXSO+3h5MNXXCaEG+6/NlAIk7MMZbySZlnB+cUQhKKc= |
| 779 | 779 |
github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= |
| 780 |
-github.com/moby/swarmkit/v2 v2.0.0-20220721174824-48dd89375d0a h1:gLcTxHH4egYVhMVFWRxvWsb79Ok4kfTt1/irZNyovUY= |
|
| 781 |
-github.com/moby/swarmkit/v2 v2.0.0-20220721174824-48dd89375d0a/go.mod h1:/so6Lct4y1x14UprW/loFsOe6xoXVTlvh25V36ULXNQ= |
|
| 780 |
+github.com/moby/swarmkit/v2 v2.0.0-20221102165002-6341884e5fc9 h1:d/XCmjx1zKZdzlBX90kSGDex7V2GE2jdGDr9nXYZg/Q= |
|
| 781 |
+github.com/moby/swarmkit/v2 v2.0.0-20221102165002-6341884e5fc9/go.mod h1:/so6Lct4y1x14UprW/loFsOe6xoXVTlvh25V36ULXNQ= |
|
| 782 | 782 |
github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs= |
| 783 | 783 |
github.com/moby/sys/mount v0.3.3/go.mod h1:PBaEorSNTLG5t/+4EgukEQVlAvVEc6ZjTySwKdqp5K0= |
| 784 | 784 |
github.com/moby/sys/mountinfo v0.4.0/go.mod h1:rEr8tzG/lsIZHBtN/JjGG+LMYx9eXgW2JI+6q0qou+A= |
| ... | ... |
@@ -4,6 +4,7 @@ import ( |
| 4 | 4 |
"context" |
| 5 | 5 |
"fmt" |
| 6 | 6 |
"sync" |
| 7 |
+ "time" |
|
| 7 | 8 |
|
| 8 | 9 |
"github.com/sirupsen/logrus" |
| 9 | 10 |
|
| ... | ... |
@@ -16,6 +17,8 @@ import ( |
| 16 | 16 |
"github.com/moby/swarmkit/v2/volumequeue" |
| 17 | 17 |
) |
| 18 | 18 |
|
| 19 |
+const CSI_CALL_TIMEOUT = 15 * time.Second |
|
| 20 |
+ |
|
| 19 | 21 |
// volumeState keeps track of the state of a volume on this node. |
| 20 | 22 |
type volumeState struct {
|
| 21 | 23 |
// volume is the actual VolumeAssignment for this volume |
| ... | ... |
@@ -87,14 +90,35 @@ func (r *volumes) tryVolume(ctx context.Context, id string, attempt uint) {
|
| 87 | 87 |
return |
| 88 | 88 |
} |
| 89 | 89 |
|
| 90 |
+ // create a sub-context with a timeout. because we can only process one |
|
| 91 |
+ // volume at a time, if we rely on the server-side or default timeout, we |
|
| 92 |
+ // may be waiting a very long time for a particular volume to fail. |
|
| 93 |
+ // |
|
| 94 |
+ // TODO(dperny): there is almost certainly a more intelligent way to do |
|
| 95 |
+ // this. For example, we could: |
|
| 96 |
+ // |
|
| 97 |
+ // * Change code such that we can service volumes managed by different |
|
| 98 |
+ // plugins at the same time. |
|
| 99 |
+ // * Take longer timeouts when we don't have any other volumes in the |
|
| 100 |
+ // queue |
|
| 101 |
+ // * Have interruptible attempts, so that if we're taking longer |
|
| 102 |
+ // timeouts, we can abort them to service new volumes. |
|
| 103 |
+ // |
|
| 104 |
+ // These are too complicated to be worth the engineering effort at this |
|
| 105 |
+ // time. |
|
| 106 |
+ |
|
| 107 |
+ timeoutCtx, cancel := context.WithTimeout(ctx, CSI_CALL_TIMEOUT) |
|
| 108 |
+ // always gotta call the WithTimeout cancel |
|
| 109 |
+ defer cancel() |
|
| 110 |
+ |
|
| 90 | 111 |
if !vs.remove {
|
| 91 |
- if err := r.publishVolume(ctx, vs.volume); err != nil {
|
|
| 92 |
- log.G(ctx).WithError(err).Info("publishing volume failed")
|
|
| 112 |
+ if err := r.publishVolume(timeoutCtx, vs.volume); err != nil {
|
|
| 113 |
+ log.G(timeoutCtx).WithError(err).Info("publishing volume failed")
|
|
| 93 | 114 |
r.pendingVolumes.Enqueue(id, attempt+1) |
| 94 | 115 |
} |
| 95 | 116 |
} else {
|
| 96 |
- if err := r.unpublishVolume(ctx, vs.volume); err != nil {
|
|
| 97 |
- log.G(ctx).WithError(err).Info("upublishing volume failed")
|
|
| 117 |
+ if err := r.unpublishVolume(timeoutCtx, vs.volume); err != nil {
|
|
| 118 |
+ log.G(timeoutCtx).WithError(err).Info("upublishing volume failed")
|
|
| 98 | 119 |
r.pendingVolumes.Enqueue(id, attempt+1) |
| 99 | 120 |
} else {
|
| 100 | 121 |
// if unpublishing was successful, then call the callback |
| ... | ... |
@@ -695,7 +695,7 @@ func (s *Server) UpdateRootCA(ctx context.Context, cluster *api.Cluster, reconci |
| 695 | 695 |
log.G(ctx).Warn("no certificate expiration specified, using default")
|
| 696 | 696 |
} |
| 697 | 697 |
// Attempt to update our local RootCA with the new parameters |
| 698 |
- updatedRootCA, err := RootCAFromAPI(ctx, rCA, expiry) |
|
| 698 |
+ updatedRootCA, err := RootCAFromAPI(rCA, expiry) |
|
| 699 | 699 |
if err != nil {
|
| 700 | 700 |
return errors.Wrap(err, "invalid Root CA object in cluster") |
| 701 | 701 |
} |
| ... | ... |
@@ -901,7 +901,7 @@ func isFinalState(status api.IssuanceStatus) bool {
|
| 901 | 901 |
} |
| 902 | 902 |
|
| 903 | 903 |
// RootCAFromAPI creates a RootCA object from an api.RootCA object |
| 904 |
-func RootCAFromAPI(ctx context.Context, apiRootCA *api.RootCA, expiry time.Duration) (RootCA, error) {
|
|
| 904 |
+func RootCAFromAPI(apiRootCA *api.RootCA, expiry time.Duration) (RootCA, error) {
|
|
| 905 | 905 |
var intermediates []byte |
| 906 | 906 |
signingCert := apiRootCA.CACert |
| 907 | 907 |
signingKey := apiRootCA.CAKey |
| ... | ... |
@@ -119,7 +119,7 @@ func (s *Server) UpdateCluster(ctx context.Context, request *api.UpdateClusterRe |
| 119 | 119 |
} |
| 120 | 120 |
// This ensures that we have the current rootCA with which to generate tokens (expiration doesn't matter |
| 121 | 121 |
// for generating the tokens) |
| 122 |
- rootCA, err := ca.RootCAFromAPI(ctx, &cluster.RootCA, ca.DefaultNodeCertExpiration) |
|
| 122 |
+ rootCA, err := ca.RootCAFromAPI(&cluster.RootCA, ca.DefaultNodeCertExpiration) |
|
| 123 | 123 |
if err != nil {
|
| 124 | 124 |
log.G(ctx).WithField( |
| 125 | 125 |
"method", "(*controlapi.Server).UpdateCluster").WithError(err).Error("invalid cluster root CA")
|
| ... | ... |
@@ -2,6 +2,7 @@ package controlapi |
| 2 | 2 |
|
| 3 | 3 |
import ( |
| 4 | 4 |
"context" |
| 5 |
+ "reflect" |
|
| 5 | 6 |
"strings" |
| 6 | 7 |
|
| 7 | 8 |
"github.com/moby/swarmkit/v2/api" |
| ... | ... |
@@ -94,17 +95,28 @@ func (s *Server) UpdateVolume(ctx context.Context, request *api.UpdateVolumeRequ |
| 94 | 94 |
if request.Spec.Group != volume.Spec.Group {
|
| 95 | 95 |
return status.Errorf(codes.InvalidArgument, "Group cannot be updated") |
| 96 | 96 |
} |
| 97 |
- if request.Spec.AccessibilityRequirements != volume.Spec.AccessibilityRequirements {
|
|
| 97 |
+ if !reflect.DeepEqual(request.Spec.AccessibilityRequirements, volume.Spec.AccessibilityRequirements) {
|
|
| 98 | 98 |
return status.Errorf(codes.InvalidArgument, "AccessibilityRequirements cannot be updated") |
| 99 | 99 |
} |
| 100 |
- if request.Spec.Driver == nil || request.Spec.Driver.Name != volume.Spec.Driver.Name {
|
|
| 100 |
+ if !reflect.DeepEqual(request.Spec.Driver, volume.Spec.Driver) {
|
|
| 101 | 101 |
return status.Errorf(codes.InvalidArgument, "Driver cannot be updated") |
| 102 | 102 |
} |
| 103 |
- if request.Spec.AccessMode.Scope != volume.Spec.AccessMode.Scope || request.Spec.AccessMode.Sharing != volume.Spec.AccessMode.Sharing {
|
|
| 103 |
+ if !reflect.DeepEqual(request.Spec.AccessMode, volume.Spec.AccessMode) {
|
|
| 104 | 104 |
return status.Errorf(codes.InvalidArgument, "AccessMode cannot be updated") |
| 105 | 105 |
} |
| 106 |
+ if !reflect.DeepEqual(request.Spec.Secrets, volume.Spec.Secrets) {
|
|
| 107 |
+ return status.Errorf(codes.InvalidArgument, "Secrets cannot be updated") |
|
| 108 |
+ } |
|
| 109 |
+ if !reflect.DeepEqual(request.Spec.CapacityRange, volume.Spec.CapacityRange) {
|
|
| 110 |
+ return status.Errorf(codes.InvalidArgument, "CapacityRange cannot be updated") |
|
| 111 |
+ } |
|
| 112 |
+ |
|
| 113 |
+ // to further guard against changing fields we're not allowed to, don't |
|
| 114 |
+ // replace the entire spec. just replace the fields we are allowed to |
|
| 115 |
+ // change |
|
| 116 |
+ volume.Spec.Annotations.Labels = request.Spec.Annotations.Labels |
|
| 117 |
+ volume.Spec.Availability = request.Spec.Availability |
|
| 106 | 118 |
|
| 107 |
- volume.Spec = *request.Spec |
|
| 108 | 119 |
volume.Meta.Version = *request.VolumeVersion |
| 109 | 120 |
if err := store.UpdateVolume(tx, volume); err != nil {
|
| 110 | 121 |
return err |
| ... | ... |
@@ -5,6 +5,7 @@ import ( |
| 5 | 5 |
"errors" |
| 6 | 6 |
"fmt" |
| 7 | 7 |
"sync" |
| 8 |
+ "time" |
|
| 8 | 9 |
|
| 9 | 10 |
"github.com/docker/go-events" |
| 10 | 11 |
"github.com/sirupsen/logrus" |
| ... | ... |
@@ -23,6 +24,10 @@ const ( |
| 23 | 23 |
// plugin interface is "docker.csicontroller/1.0". This gets only the CSI |
| 24 | 24 |
// plugins with Controller capability. |
| 25 | 25 |
DockerCSIPluginCap = "csicontroller" |
| 26 |
+ |
|
| 27 |
+ // CSIRPCTimeout is the client-side timeout duration for RPCs to the CSI |
|
| 28 |
+ // plugin. |
|
| 29 |
+ CSIRPCTimeout = 15 * time.Second |
|
| 26 | 30 |
) |
| 27 | 31 |
|
| 28 | 32 |
type Manager struct {
|
| ... | ... |
@@ -149,11 +154,17 @@ func (vm *Manager) run(pctx context.Context) {
|
| 149 | 149 |
// processVolumes encapuslates the logic for processing pending Volumes. |
| 150 | 150 |
func (vm *Manager) processVolume(ctx context.Context, id string, attempt uint) {
|
| 151 | 151 |
// set up log fields for a derrived context to pass to handleVolume. |
| 152 |
- dctx := log.WithFields(ctx, logrus.Fields{
|
|
| 152 |
+ logCtx := log.WithFields(ctx, logrus.Fields{
|
|
| 153 | 153 |
"volume.id": id, |
| 154 | 154 |
"attempt": attempt, |
| 155 | 155 |
}) |
| 156 | 156 |
|
| 157 |
+ // Set a client-side timeout. Without this, one really long server-side |
|
| 158 |
+ // timeout can block processing all volumes until it completes or fails. |
|
| 159 |
+ dctx, cancel := context.WithTimeout(logCtx, CSIRPCTimeout) |
|
| 160 |
+ // always gotta call the WithTimeout cancel |
|
| 161 |
+ defer cancel() |
|
| 162 |
+ |
|
| 157 | 163 |
err := vm.handleVolume(dctx, id) |
| 158 | 164 |
// TODO(dperny): differentiate between retryable and non-retryable |
| 159 | 165 |
// errors. |
| ... | ... |
@@ -600,7 +600,7 @@ github.com/moby/patternmatcher |
| 600 | 600 |
# github.com/moby/pubsub v1.0.0 |
| 601 | 601 |
## explicit; go 1.19 |
| 602 | 602 |
github.com/moby/pubsub |
| 603 |
-# github.com/moby/swarmkit/v2 v2.0.0-20220721174824-48dd89375d0a |
|
| 603 |
+# github.com/moby/swarmkit/v2 v2.0.0-20221102165002-6341884e5fc9 |
|
| 604 | 604 |
## explicit; go 1.17 |
| 605 | 605 |
github.com/moby/swarmkit/v2/agent |
| 606 | 606 |
github.com/moby/swarmkit/v2/agent/configs |