package csi

import (
	"context"
	"errors"
	"fmt"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/container-storage-interface/spec/lib/go/csi"
	"github.com/moby/swarmkit/v2/api"
	"github.com/moby/swarmkit/v2/internal/csi/capability"
	"github.com/moby/swarmkit/v2/log"
	mobyplugin "github.com/moby/swarmkit/v2/node/plugin"
)

// Plugin is the interface for a CSI controller plugin.
//
// In this package, the word "plugin" is unfortunately overused. This
// particular "Plugin" is the interface used by volume Manager to interact with
// CSI controller plugins. It should not be confused with the "plugin" returned
// from the plugingetter interface, which is the interface that gives us the
// information we need to create this Plugin.
type Plugin interface {
	// CreateVolume creates the given Volume through the plugin and returns
	// the resulting VolumeInfo, or an error.
	CreateVolume(context.Context, *api.Volume) (*api.VolumeInfo, error)
	// DeleteVolume deletes the given Volume through the plugin.
	DeleteVolume(context.Context, *api.Volume) error
	// PublishVolume publishes the given Volume to the node with the given
	// swarm node ID, and returns the publish context for that Volume on that
	// node.
	PublishVolume(context.Context, *api.Volume, string) (map[string]string, error)
	// UnpublishVolume unpublishes the given Volume from the node with the
	// given swarm node ID.
	UnpublishVolume(context.Context, *api.Volume, string) error
	// AddNode records the CSI node ID that corresponds to a swarm node ID.
	AddNode(swarmID, csiID string)
	// RemoveNode forgets the mapping recorded for the given swarm node ID.
	RemoveNode(swarmID string)
	// Addr returns the address of the plugin.
	Addr() net.Addr
}

// plugin represents an individual CSI controller plugin. It is the
// implementation of the Plugin interface returned by NewPlugin.
type plugin struct {
	// name is the name of the plugin, which is also the name used as the
	// Driver.Name field
	name string

	// socket is the gRPC dial target for this plugin, built by NewPlugin as
	// "<network>://<address>" from the plugin's net.Addr. addr is that same
	// net.Addr, kept so that it can be handed back from Addr().
	socket string
	addr   net.Addr

	// provider is the SecretProvider, which allows retrieving secrets for CSI
	// calls.
	provider SecretProvider

	// cc is the grpc client connection. It is nil until connect has been
	// called.
	// TODO(dperny): the client is never closed. it may be closed when it goes
	// out of scope, but this should be verified.
	cc *grpc.ClientConn
	// idClient is the identity service client
	idClient csi.IdentityClient
	// controllerClient is the controller service client. Client() treats a
	// nil controllerClient as "not yet connected".
	controllerClient csi.ControllerClient

	// controller indicates that the plugin has controller capabilities. It is
	// only set by init, so it is false until the plugin has been connected.
	controller bool

	// publisher indicates that the controller plugin has
	// PUBLISH_UNPUBLISH_VOLUME capability. Like controller, it is only set by
	// init.
	publisher bool

	// swarmToCSI maps a swarm node ID to the corresponding CSI node ID
	swarmToCSI map[string]string

	// csiToSwarm maps a CSI node ID back to the swarm node ID.
	csiToSwarm map[string]string
}

// NewPlugin creates a new Plugin object for the given AddrPlugin.
//
// The plugin's name and address are read from p, and provider is stored so
// that secrets can be looked up when building CSI requests. NewPlugin does not
// open a connection to the plugin; the gRPC connection is established lazily
// the first time Client is called.
func NewPlugin(p mobyplugin.AddrPlugin, provider SecretProvider) Plugin {
	// Read the address once, so that the dial target and the stored net.Addr
	// are guaranteed to describe the same endpoint.
	addr := p.Addr()

	return &plugin{
		name: p.Name(),
		// TODO(dperny): verify that we do not need to include the Network()
		// portion of the Addr.
		socket:     fmt.Sprintf("%s://%s", addr.Network(), addr.String()),
		addr:       addr,
		provider:   provider,
		swarmToCSI: map[string]string{},
		csiToSwarm: map[string]string{},
	}
}

// connect is a private method that initializes a gRPC ClientConn, creates
// the IdentityClient and ControllerClient on it, and then calls init to probe
// the plugin and discover its capabilities.
//
// If init fails, the connection is closed and all connection state on the
// plugin is reset. Client uses a nil controllerClient to decide whether to
// connect, so leaving the client in place after a failed init would mean the
// plugin is never probed again and its capability flags are never discovered.
func (p *plugin) connect(ctx context.Context) error {
	cc, err := grpc.DialContext(ctx, p.socket, grpc.WithInsecure())
	if err != nil {
		return err
	}

	p.cc = cc
	p.idClient = csi.NewIdentityClient(cc)

	// controllerClient may not do anything if the plugin does not support
	// the controller service, but it should not be an error to create it now
	// anyway
	p.controllerClient = csi.NewControllerClient(cc)

	if err := p.init(ctx); err != nil {
		// Roll back to the "not connected" state so the next call to Client
		// retries the whole connect-and-init sequence.
		p.cc = nil
		p.idClient = nil
		p.controllerClient = nil
		p.controller = false
		p.publisher = false

		// The init error is the one the caller needs to see; a failure to
		// close a connection we are discarding anyway adds nothing to it.
		_ = cc.Close()
		return err
	}

	return nil
}

// init uses the identity service to check the properties of the plugin, most
// importantly, its capabilities.
//
// It probes the plugin and returns an error if the plugin explicitly reports
// that it is not ready. It then sets p.controller if the plugin advertises
// the controller service, and, for controller plugins, sets p.publisher if
// the controller advertises the PUBLISH_UNPUBLISH_VOLUME RPC capability.
//
// All response fields are read through the generated getters, which are safe
// to call on nil messages, so a capability entry that carries no RPC type is
// skipped instead of causing a nil pointer dereference.
func (p *plugin) init(ctx context.Context) error {
	probe, err := p.idClient.Probe(ctx, &csi.ProbeRequest{})
	if err != nil {
		return err
	}

	// An unset Ready value is not treated as "not ready"; only an explicit
	// false is.
	if ready := probe.GetReady(); ready != nil && !ready.GetValue() {
		return errors.New("plugin not ready")
	}

	resp, err := p.idClient.GetPluginCapabilities(ctx, &csi.GetPluginCapabilitiesRequest{})
	if err != nil {
		return err
	}

	if resp == nil {
		return nil
	}

	for _, c := range resp.GetCapabilities() {
		sc := c.GetService()
		if sc == nil {
			continue
		}
		if sc.GetType() == csi.PluginCapability_Service_CONTROLLER_SERVICE {
			p.controller = true
		}
	}

	if !p.controller {
		return nil
	}

	cCapResp, err := p.controllerClient.ControllerGetCapabilities(
		ctx, &csi.ControllerGetCapabilitiesRequest{},
	)
	if err != nil {
		return err
	}

	for _, c := range cCapResp.GetCapabilities() {
		// GetRpc returns nil when the capability is not an RPC capability;
		// GetType on that nil value yields the UNKNOWN type rather than
		// panicking.
		if c.GetRpc().GetType() == csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME {
			p.publisher = true
		}
	}

	return nil
}

// CreateVolume wraps and abstracts the CSI CreateVolume logic. It returns the
// VolumeInfo for the created volume, or an error.
//
// The client is obtained first, which connects to the plugin if that has not
// happened yet. If the plugin has no controller service, no RPC is made and a
// VolumeInfo whose VolumeID is the volume's name is returned instead.
func (p *plugin) CreateVolume(ctx context.Context, v *api.Volume) (*api.VolumeInfo, error) {
	client, err := p.Client(ctx)
	if err != nil {
		return nil, err
	}

	if p.controller {
		created, err := client.CreateVolume(ctx, p.makeCreateVolume(v))
		if err != nil {
			return nil, err
		}
		return makeVolumeInfo(created.Volume), nil
	}

	// TODO(dperny): come up with a scheme to handle headless plugins
	// TODO(dperny): handle plugins without create volume capabilities
	return &api.VolumeInfo{VolumeID: v.Spec.Annotations.Name}, nil
}

// DeleteVolume issues a CSI DeleteVolume call for the given Volume. It returns
// an error if the Volume has no VolumeInfo, if the client cannot be obtained,
// or if the RPC fails.
func (p *plugin) DeleteVolume(ctx context.Context, v *api.Volume) error {
	info := v.VolumeInfo
	if info == nil {
		return errors.New("VolumeInfo must not be nil")
	}

	// the request is small enough to build inline, without a dedicated
	// make*Request helper
	deleteReq := &csi.DeleteVolumeRequest{
		Secrets:  p.makeSecrets(v),
		VolumeId: info.VolumeID,
	}

	client, err := p.Client(ctx)
	if err != nil {
		return err
	}

	// the RPC response carries nothing we need, so only the error is kept
	if _, err := client.DeleteVolume(ctx, deleteReq); err != nil {
		return err
	}
	return nil
}

// PublishVolume calls ControllerPublishVolume to publish the given Volume to
// the Node with the given swarmkit ID. It returns a map, which is the
// PublishContext for this Volume on this Node.
//
// If the plugin does not have the PUBLISH_UNPUBLISH_VOLUME capability, nothing
// is done and (nil, nil) is returned. An error is returned if the plugin
// cannot be connected to, if no CSI node ID is known for nodeID, if the Volume
// has no VolumeInfo, or if the RPC itself fails.
func (p *plugin) PublishVolume(ctx context.Context, v *api.Volume, nodeID string) (map[string]string, error) {
	// The publisher flag is only discovered when the plugin is first
	// connected, so the client must be obtained before the flag is consulted.
	// Otherwise a publish that happens to be the first call on this plugin
	// would be silently skipped.
	c, err := p.Client(ctx)
	if err != nil {
		return nil, err
	}

	if !p.publisher {
		return nil, nil
	}

	if p.swarmToCSI[nodeID] == "" {
		log.L.Errorf("CSI node ID not found for given Swarm node ID. Plugin: %s , Swarm node ID: %s", p.name, nodeID)
		return nil, status.Error(codes.FailedPrecondition, "CSI node ID not found for given Swarm node ID")
	}

	req := p.makeControllerPublishVolumeRequest(v, nodeID)
	if req == nil {
		// the request builder returns nil when the Volume has no VolumeInfo;
		// fail here instead of sending a nil request to the plugin.
		return nil, errors.New("VolumeInfo must not be nil")
	}

	resp, err := c.ControllerPublishVolume(ctx, req)
	if err != nil {
		return nil, err
	}
	return resp.GetPublishContext(), nil
}

// UnpublishVolume calls ControllerUnpublishVolume to unpublish the given
// Volume from the Node with the given swarmkit ID. It returns an error if the
// unpublish does not succeed.
//
// If the plugin does not have the PUBLISH_UNPUBLISH_VOLUME capability, nothing
// is done and nil is returned. An error is returned if the plugin cannot be
// connected to, if the Volume has no VolumeInfo, or if the RPC itself fails.
func (p *plugin) UnpublishVolume(ctx context.Context, v *api.Volume, nodeID string) error {
	// The publisher flag is only discovered when the plugin is first
	// connected, so the client must be obtained before the flag is consulted.
	// Otherwise an unpublish that happens to be the first call on this plugin
	// would be silently skipped.
	c, err := p.Client(ctx)
	if err != nil {
		return err
	}

	if !p.publisher {
		return nil
	}

	// NOTE(review): unlike PublishVolume, no check is made that a CSI node ID
	// is known for nodeID, so the request may carry an empty NodeId. Confirm
	// against the CSI spec and the callers that this is intended.
	req := p.makeControllerUnpublishVolumeRequest(v, nodeID)
	if req == nil {
		// the request builder returns nil when the Volume has no VolumeInfo;
		// fail here instead of sending a nil request to the plugin.
		return errors.New("VolumeInfo must not be nil")
	}

	// response of the RPC intentionally left blank
	_, err = c.ControllerUnpublishVolume(ctx, req)
	return err
}

// AddNode adds a mapping for a node's swarm ID to the ID provided by this CSI
// plugin. This allows future calls to the plugin to be done entirely in terms
// of the swarm node ID.
//
// The CSI node ID is provided by the node as part of the NodeDescription.
//
// If the swarm node was already registered under a different CSI node ID, the
// reverse mapping for that old CSI node ID is removed, so that csiToSwarm does
// not keep pointing a CSI ID the node no longer uses at this swarm node.
func (p *plugin) AddNode(swarmID, csiID string) {
	if old, ok := p.swarmToCSI[swarmID]; ok && old != csiID {
		// only drop the old reverse entry if it still belongs to this node
		if p.csiToSwarm[old] == swarmID {
			delete(p.csiToSwarm, old)
		}
	}

	p.swarmToCSI[swarmID] = csiID
	p.csiToSwarm[csiID] = swarmID
}

// RemoveNode removes a node from this plugin's node mappings.
//
// If swarmID is not known, nothing is changed. The reverse (CSI ID to swarm
// ID) entry is only removed when it still points at swarmID, so removing one
// node cannot delete a reverse entry that now belongs to another node.
func (p *plugin) RemoveNode(swarmID string) {
	csiID, ok := p.swarmToCSI[swarmID]
	if !ok {
		return
	}

	delete(p.swarmToCSI, swarmID)
	if p.csiToSwarm[csiID] == swarmID {
		delete(p.csiToSwarm, csiID)
	}
}

// Client returns the csi.ControllerClient for this plugin.
//
// When a ControllerClient already exists it is returned as-is. Otherwise the
// plugin is connected first, which dials the remote plugin over gRPC and
// creates the ControllerClient; a connection error is returned to the caller.
func (p *plugin) Client(ctx context.Context) (csi.ControllerClient, error) {
	if p.controllerClient != nil {
		return p.controllerClient, nil
	}

	if err := p.connect(ctx); err != nil {
		return nil, err
	}
	return p.controllerClient, nil
}

// makeCreateVolume builds the csi.CreateVolumeRequest for the given volume
// from its spec. Secrets are resolved through the Plugin's SecretProvider.
func (p *plugin) makeCreateVolume(v *api.Volume) *csi.CreateVolumeRequest {
	req := &csi.CreateVolumeRequest{Secrets: p.makeSecrets(v)}

	spec := &v.Spec
	req.Name = spec.Annotations.Name
	req.Parameters = spec.Driver.Options
	// the request carries exactly one capability, derived from the access mode
	req.VolumeCapabilities = []*csi.VolumeCapability{
		capability.MakeCapability(spec.AccessMode),
	}
	req.AccessibilityRequirements = makeTopologyRequirement(spec.AccessibilityRequirements)
	req.CapacityRange = makeCapacityRange(spec.CapacityRange)

	return req
}

// makeSecrets builds the secrets map passed to CSI RPCs. Each secret
// referenced by the volume spec is looked up through the plugin's
// SecretProvider and stored under the reference's key. References that are
// nil, or whose secret the provider does not return, are left out of the map.
func (p *plugin) makeSecrets(v *api.Volume) map[string]string {
	out := make(map[string]string)
	for _, ref := range v.Spec.Secrets {
		// a secret should never be nil, but check just to be sure
		if ref == nil {
			continue
		}

		found := p.provider.GetSecret(ref.Secret)
		if found == nil {
			// TODO(dperny): return an error, but this should never happen,
			// as secrets should be validated at volume creation time
			continue
		}
		out[ref.Key] = string(found.Spec.Data)
	}
	return out
}

// makeControllerPublishVolumeRequest builds the
// csi.ControllerPublishVolumeRequest for publishing the given volume to the
// node with the given swarm node ID. The CSI node ID is looked up in the
// plugin's swarmToCSI mapping. It returns nil if the volume has no
// VolumeInfo.
func (p *plugin) makeControllerPublishVolumeRequest(v *api.Volume, nodeID string) *csi.ControllerPublishVolumeRequest {
	info := v.VolumeInfo
	if info == nil {
		return nil
	}

	req := &csi.ControllerPublishVolumeRequest{Secrets: p.makeSecrets(v)}

	// The access type is always set to an empty Mount volume here, replacing
	// whatever access type MakeCapability filled in.
	// NOTE(review): confirm that overriding the access type is intended.
	volCap := capability.MakeCapability(v.Spec.AccessMode)
	volCap.AccessType = &csi.VolumeCapability_Mount{
		Mount: &csi.VolumeCapability_MountVolume{},
	}
	req.VolumeCapability = volCap

	req.VolumeId = info.VolumeID
	req.NodeId = p.swarmToCSI[nodeID]
	req.VolumeContext = info.VolumeContext
	return req
}

// makeControllerUnpublishVolumeRequest builds the
// csi.ControllerUnpublishVolumeRequest for unpublishing the given volume from
// the node with the given swarm node ID. The CSI node ID is looked up in the
// plugin's swarmToCSI mapping. It returns nil if the volume has no
// VolumeInfo.
func (p *plugin) makeControllerUnpublishVolumeRequest(v *api.Volume, nodeID string) *csi.ControllerUnpublishVolumeRequest {
	info := v.VolumeInfo
	if info == nil {
		return nil
	}

	req := &csi.ControllerUnpublishVolumeRequest{Secrets: p.makeSecrets(v)}
	req.VolumeId = info.VolumeID
	req.NodeId = p.swarmToCSI[nodeID]
	return req
}

// Addr returns the net.Addr of the plugin, as stored on the plugin by
// NewPlugin.
func (p *plugin) Addr() net.Addr {
	return p.addr
}