package csi import ( "context" "errors" "fmt" "net" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/moby/swarmkit/v2/api" "github.com/moby/swarmkit/v2/internal/csi/capability" "github.com/moby/swarmkit/v2/log" mobyplugin "github.com/moby/swarmkit/v2/node/plugin" ) // Plugin is the interface for a CSI controller plugin. // // In this package, the word "plugin" is unfortunately overused. This // particular "Plugin" is the interface used by volume Manager to interact with // CSI controller plugins. It should not be confused with the "plugin" returned // from the plugingetter interface, which is the interface that gives us the // information we need to create this Plugin. type Plugin interface { CreateVolume(context.Context, *api.Volume) (*api.VolumeInfo, error) DeleteVolume(context.Context, *api.Volume) error PublishVolume(context.Context, *api.Volume, string) (map[string]string, error) UnpublishVolume(context.Context, *api.Volume, string) error AddNode(swarmID, csiID string) RemoveNode(swarmID string) Addr() net.Addr } // plugin represents an individual CSI controller plugin type plugin struct { // name is the name of the plugin, which is also the name used as the // Driver.Name field name string // socket is the unix socket to connect to this plugin at. socket string addr net.Addr // provider is the SecretProvider, which allows retrieving secrets for CSI // calls. provider SecretProvider // cc is the grpc client connection // TODO(dperny): the client is never closed. it may be closed when it goes // out of scope, but this should be verified. cc *grpc.ClientConn // idClient is the identity service client idClient csi.IdentityClient // controllerClient is the controller service client controllerClient csi.ControllerClient // controller indicates that the plugin has controller capabilities. controller bool // publisher indicates that the controller plugin has // PUBLISH_UNPUBLISH_VOLUME capability. publisher bool // swarmToCSI maps a swarm node ID to the corresponding CSI node ID swarmToCSI map[string]string // csiToSwarm maps a CSI node ID back to the swarm node ID. csiToSwarm map[string]string } // NewPlugin creates a new Plugin object. // // NewPlugin takes both the CompatPlugin and the PluginAddr. These should be // the same object. By taking both parts here, we can push off the work of // assuring that the given plugin implements the PluginAddr interface without // having to typecast in this constructor. func NewPlugin(p mobyplugin.AddrPlugin, provider SecretProvider) Plugin { return &plugin{ name: p.Name(), // TODO(dperny): verify that we do not need to include the Network() // portion of the Addr. socket: fmt.Sprintf("%s://%s", p.Addr().Network(), p.Addr().String()), addr: p.Addr(), provider: provider, swarmToCSI: map[string]string{}, csiToSwarm: map[string]string{}, } } // connect is a private method that initializes a gRPC ClientConn and creates // the IdentityClient and ControllerClient. func (p *plugin) connect(ctx context.Context) error { cc, err := grpc.DialContext(ctx, p.socket, grpc.WithInsecure()) if err != nil { return err } p.cc = cc // first, probe the plugin, to ensure that it exists and is ready to go idc := csi.NewIdentityClient(cc) p.idClient = idc // controllerClient may not do anything if the plugin does not support // the controller service, but it should not be an error to create it now // anyway p.controllerClient = csi.NewControllerClient(cc) return p.init(ctx) } // init checks uses the identity service to check the properties of the plugin, // most importantly, its capabilities. func (p *plugin) init(ctx context.Context) error { probe, err := p.idClient.Probe(ctx, &csi.ProbeRequest{}) if err != nil { return err } if probe.Ready != nil && !probe.Ready.Value { return errors.New("plugin not ready") } resp, err := p.idClient.GetPluginCapabilities(ctx, &csi.GetPluginCapabilitiesRequest{}) if err != nil { return err } if resp == nil { return nil } for _, c := range resp.Capabilities { if sc := c.GetService(); sc != nil { switch sc.Type { case csi.PluginCapability_Service_CONTROLLER_SERVICE: p.controller = true } } } if p.controller { cCapResp, err := p.controllerClient.ControllerGetCapabilities( ctx, &csi.ControllerGetCapabilitiesRequest{}, ) if err != nil { return err } for _, c := range cCapResp.Capabilities { rpc := c.GetRpc() if rpc.Type == csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME { p.publisher = true } } } return nil } // CreateVolume wraps and abstracts the CSI CreateVolume logic and returns // the volume info, or an error. func (p *plugin) CreateVolume(ctx context.Context, v *api.Volume) (*api.VolumeInfo, error) { c, err := p.Client(ctx) if err != nil { return nil, err } if !p.controller { // TODO(dperny): come up with a scheme to handle headless plugins // TODO(dperny): handle plugins without create volume capabilities return &api.VolumeInfo{VolumeID: v.Spec.Annotations.Name}, nil } createVolumeRequest := p.makeCreateVolume(v) resp, err := c.CreateVolume(ctx, createVolumeRequest) if err != nil { return nil, err } return makeVolumeInfo(resp.Volume), nil } func (p *plugin) DeleteVolume(ctx context.Context, v *api.Volume) error { if v.VolumeInfo == nil { return errors.New("VolumeInfo must not be nil") } // we won't use a fancy createDeleteVolumeRequest method because the // request is simple enough to not bother with it secrets := p.makeSecrets(v) req := &csi.DeleteVolumeRequest{ VolumeId: v.VolumeInfo.VolumeID, Secrets: secrets, } c, err := p.Client(ctx) if err != nil { return err } // response from RPC intentionally left blank _, err = c.DeleteVolume(ctx, req) return err } // PublishVolume calls ControllerPublishVolume to publish the given Volume to // the Node with the given swarmkit ID. It returns a map, which is the // PublishContext for this Volume on this Node. func (p *plugin) PublishVolume(ctx context.Context, v *api.Volume, nodeID string) (map[string]string, error) { if !p.publisher { return nil, nil } csiNodeID := p.swarmToCSI[nodeID] if csiNodeID == "" { log.L.Errorf("CSI node ID not found for given Swarm node ID. Plugin: %s , Swarm node ID: %s", p.name, nodeID) return nil, status.Error(codes.FailedPrecondition, "CSI node ID not found for given Swarm node ID") } req := p.makeControllerPublishVolumeRequest(v, nodeID) c, err := p.Client(ctx) if err != nil { return nil, err } resp, err := c.ControllerPublishVolume(ctx, req) if err != nil { return nil, err } return resp.PublishContext, nil } // UnpublishVolume calls ControllerUnpublishVolume to unpublish the given // Volume from the Node with the given swarmkit ID. It returns an error if the // unpublish does not succeed func (p *plugin) UnpublishVolume(ctx context.Context, v *api.Volume, nodeID string) error { if !p.publisher { return nil } req := p.makeControllerUnpublishVolumeRequest(v, nodeID) c, err := p.Client(ctx) if err != nil { return err } // response of the RPC intentionally left blank _, err = c.ControllerUnpublishVolume(ctx, req) return err } // AddNode adds a mapping for a node's swarm ID to the ID provided by this CSI // plugin. This allows future calls to the plugin to be done entirely in terms // of the swarm node ID. // // The CSI node ID is provided by the node as part of the NodeDescription. func (p *plugin) AddNode(swarmID, csiID string) { p.swarmToCSI[swarmID] = csiID p.csiToSwarm[csiID] = swarmID } // RemoveNode removes a node from this plugin's node mappings. func (p *plugin) RemoveNode(swarmID string) { csiID := p.swarmToCSI[swarmID] delete(p.swarmToCSI, swarmID) delete(p.csiToSwarm, csiID) } // Client retrieves a csi.ControllerClient for this plugin // // If this is the first time client has been called and no client yet exists, // it will initialize the gRPC connection to the remote plugin and create a new // ControllerClient. func (p *plugin) Client(ctx context.Context) (csi.ControllerClient, error) { if p.controllerClient == nil { if err := p.connect(ctx); err != nil { return nil, err } } return p.controllerClient, nil } // makeCreateVolume makes a csi.CreateVolumeRequest from the volume object and // spec. it uses the Plugin's SecretProvider to retrieve relevant secrets. func (p *plugin) makeCreateVolume(v *api.Volume) *csi.CreateVolumeRequest { secrets := p.makeSecrets(v) return &csi.CreateVolumeRequest{ Name: v.Spec.Annotations.Name, Parameters: v.Spec.Driver.Options, VolumeCapabilities: []*csi.VolumeCapability{ capability.MakeCapability(v.Spec.AccessMode), }, Secrets: secrets, AccessibilityRequirements: makeTopologyRequirement(v.Spec.AccessibilityRequirements), CapacityRange: makeCapacityRange(v.Spec.CapacityRange), } } // makeSecrets uses the plugin's SecretProvider to make the secrets map to pass // to CSI RPCs. func (p *plugin) makeSecrets(v *api.Volume) map[string]string { secrets := map[string]string{} for _, vs := range v.Spec.Secrets { // a secret should never be nil, but check just to be sure if vs != nil { secret := p.provider.GetSecret(vs.Secret) if secret != nil { // TODO(dperny): return an error, but this should never happen, // as secrets should be validated at volume creation time secrets[vs.Key] = string(secret.Spec.Data) } } } return secrets } func (p *plugin) makeControllerPublishVolumeRequest(v *api.Volume, nodeID string) *csi.ControllerPublishVolumeRequest { if v.VolumeInfo == nil { return nil } secrets := p.makeSecrets(v) capability := capability.MakeCapability(v.Spec.AccessMode) capability.AccessType = &csi.VolumeCapability_Mount{ Mount: &csi.VolumeCapability_MountVolume{}, } return &csi.ControllerPublishVolumeRequest{ VolumeId: v.VolumeInfo.VolumeID, NodeId: p.swarmToCSI[nodeID], Secrets: secrets, VolumeCapability: capability, VolumeContext: v.VolumeInfo.VolumeContext, } } func (p *plugin) makeControllerUnpublishVolumeRequest(v *api.Volume, nodeID string) *csi.ControllerUnpublishVolumeRequest { if v.VolumeInfo == nil { return nil } secrets := p.makeSecrets(v) return &csi.ControllerUnpublishVolumeRequest{ VolumeId: v.VolumeInfo.VolumeID, NodeId: p.swarmToCSI[nodeID], Secrets: secrets, } } func (p *plugin) Addr() net.Addr { return p.addr }