package manager

import (
	"context"
	"time"

	"code.cloudfoundry.org/clock"
	"github.com/moby/swarmkit/v2/api"
	"github.com/moby/swarmkit/v2/log"
	"github.com/moby/swarmkit/v2/manager/state/raft"
	"github.com/moby/swarmkit/v2/manager/state/raft/membership"
	"github.com/moby/swarmkit/v2/manager/state/store"
)

const (
	// roleReconcileInterval is how often to retry removing a node, if a reconciliation or
	// removal failed
	roleReconcileInterval = 5 * time.Second

	// removalTimeout is how long to wait before a raft member removal fails to be applied
	// to the store
	removalTimeout = 5 * time.Second
)

// roleManager reconciles the raft member list with desired role changes.
type roleManager struct {
	ctx    context.Context
	cancel func()

	store    *store.MemoryStore
	raft     *raft.Node
	doneChan chan struct{}

	// pendingReconciliation contains changed nodes that have not yet been reconciled in
	// the raft member list.
	pendingReconciliation map[string]*api.Node

	// pendingRemoval contains the IDs of nodes that have been deleted - if these correspond
	// to members in the raft cluster, those members need to be removed from raft
	pendingRemoval map[string]struct{}

	// leave this nil except for tests which need to inject a fake time source
	clocksource clock.Clock
}

// newRoleManager creates a new roleManager.
func newRoleManager(store *store.MemoryStore, raftNode *raft.Node) *roleManager {
	ctx, cancel := context.WithCancel(context.Background())
	return &roleManager{
		ctx:                   ctx,
		cancel:                cancel,
		store:                 store,
		raft:                  raftNode,
		doneChan:              make(chan struct{}),
		pendingReconciliation: make(map[string]*api.Node),
		pendingRemoval:        make(map[string]struct{}),
	}
}

// getTicker returns a ticker based on the configured clock source
func (rm *roleManager) getTicker(interval time.Duration) clock.Ticker {
	if rm.clocksource == nil {
		return clock.NewClock().NewTicker(interval)
	}
	return rm.clocksource.NewTicker(interval)

}

// Run is roleManager's main loop.  On startup, it looks at every node object in the cluster and
// attempts to reconcile the raft member list with all the nodes' desired roles.  If any nodes
// need to be demoted or promoted, it will add them to a reconciliation queue, and if any raft
// members' node have been deleted, it will add them to a removal queue.

// These queues are processed immediately, and any nodes that failed to be processed are
// processed again in the next reconciliation interval, so that nodes will hopefully eventually
// be reconciled.  As node updates come in, any promotions or demotions are also added to the
// reconciliation queue and reconciled.  As node removals come in, they are added to the removal
// queue to be removed from the raft cluster.

// Removal from a raft cluster is idempotent (and it's the only raft cluster change that will occur
// during reconciliation or removal), so it's fine if a node is in both the removal and reconciliation
// queues.

// The ctx param is only used for logging.
func (rm *roleManager) Run(ctx context.Context) {
	defer close(rm.doneChan)

	var (
		nodes []*api.Node

		// ticker and tickerCh are used to time the reconciliation interval, which will
		// periodically attempt to re-reconcile nodes that failed to reconcile the first
		// time through
		ticker   clock.Ticker
		tickerCh <-chan time.Time
	)

	watcher, cancelWatch, err := store.ViewAndWatch(rm.store,
		func(readTx store.ReadTx) error {
			var err error
			nodes, err = store.FindNodes(readTx, store.All)
			return err
		},
		api.EventUpdateNode{},
		api.EventDeleteNode{})
	defer cancelWatch()

	if err != nil {
		log.G(ctx).WithError(err).Error("failed to check nodes for role changes")
	} else {
		// Assume all raft members have been deleted from the cluster, until the node list
		// tells us otherwise.  We can make this assumption because the node object must
		// exist first before the raft member object.

		// Background life-cycle for a manager: it joins the cluster, getting a new TLS
		// certificate. To get a TLS certificate, it makes an RPC call to the CA server,
		// which on successful join adds its information to the cluster node list and
		// eventually generates a TLS certificate for it. Once it has a TLS certificate,
		// it can contact the other nodes, and makes an RPC call to request to join the
		// raft cluster.  The node it contacts will add the node to the raft membership.
		for _, member := range rm.raft.GetMemberlist() {
			rm.pendingRemoval[member.NodeID] = struct{}{}
		}
		for _, node := range nodes {
			// if the node exists, we don't want it removed from the raft membership cluster
			// necessarily
			delete(rm.pendingRemoval, node.ID)

			// reconcile each existing node
			rm.pendingReconciliation[node.ID] = node
			rm.reconcileRole(ctx, node)
		}
		for nodeID := range rm.pendingRemoval {
			rm.evictRemovedNode(ctx, nodeID)
		}
		// If any reconciliations or member removals failed, we want to try again, so
		// make sure that we start the ticker so we can try again and again every
		// roleReconciliationInterval seconds until the queues are both empty.
		if len(rm.pendingReconciliation) != 0 || len(rm.pendingRemoval) != 0 {
			ticker = rm.getTicker(roleReconcileInterval)
			tickerCh = ticker.C()
		}
	}

	for {
		select {
		case event := <-watcher:
			switch ev := event.(type) {
			case api.EventUpdateNode:
				rm.pendingReconciliation[ev.Node.ID] = ev.Node
				rm.reconcileRole(ctx, ev.Node)
			case api.EventDeleteNode:
				rm.pendingRemoval[ev.Node.ID] = struct{}{}
				rm.evictRemovedNode(ctx, ev.Node.ID)
			}
			// If any reconciliations or member removals failed, we want to try again, so
			// make sure that we start the ticker so we can try again and again every
			// roleReconciliationInterval seconds until the queues are both empty.
			if (len(rm.pendingReconciliation) != 0 || len(rm.pendingRemoval) != 0) && ticker == nil {
				ticker = rm.getTicker(roleReconcileInterval)
				tickerCh = ticker.C()
			}
		case <-tickerCh:
			for _, node := range rm.pendingReconciliation {
				rm.reconcileRole(ctx, node)
			}
			for nodeID := range rm.pendingRemoval {
				rm.evictRemovedNode(ctx, nodeID)
			}
			if len(rm.pendingReconciliation) == 0 && len(rm.pendingRemoval) == 0 {
				ticker.Stop()
				ticker = nil
				tickerCh = nil
			}
		case <-rm.ctx.Done():
			if ticker != nil {
				ticker.Stop()
			}
			return
		}
	}
}

// evictRemovedNode evicts a removed node from the raft cluster membership.  This is to cover an edge case in which
// a node might have been removed, but somehow the role was not reconciled (possibly a demotion and a removal happened
// in rapid succession before the raft membership configuration went through).
func (rm *roleManager) evictRemovedNode(ctx context.Context, nodeID string) {
	// Check if the member still exists in the membership
	member := rm.raft.GetMemberByNodeID(nodeID)
	if member != nil {
		// We first try to remove the raft node from the raft cluster.  On the next tick, if the node
		// has been removed from the cluster membership, we then delete it from the removed list
		rm.removeMember(ctx, member)
		return
	}
	delete(rm.pendingRemoval, nodeID)
}

// removeMember removes a member from the raft cluster membership
func (rm *roleManager) removeMember(ctx context.Context, member *membership.Member) {
	// Quorum safeguard - quorum should have been checked before a node was allowed to be demoted, but if in the
	// intervening time some other node disconnected, removing this node would result in a loss of cluster quorum.
	// We leave it
	if !rm.raft.CanRemoveMember(member.RaftID) {
		// TODO(aaronl): Retry later
		log.G(ctx).Debugf("can't demote node %s at this time: removing member from raft would result in a loss of quorum", member.NodeID)
		return
	}

	rmCtx, rmCancel := context.WithTimeout(rm.ctx, removalTimeout)
	defer rmCancel()

	if member.RaftID == rm.raft.Config.ID {
		// Don't use rmCtx, because we expect to lose
		// leadership, which will cancel this context.
		log.G(ctx).Info("demoted; transferring leadership")
		err := rm.raft.TransferLeadership(context.Background())
		if err == nil {
			return
		}
		log.G(ctx).WithError(err).Info("failed to transfer leadership")
	}
	if err := rm.raft.RemoveMember(rmCtx, member.RaftID); err != nil {
		// TODO(aaronl): Retry later
		log.G(ctx).WithError(err).Debugf("can't demote node %s at this time", member.NodeID)
	}
}

// reconcileRole looks at the desired role for a node, and if it is being demoted or promoted, updates the
// node role accordingly.   If the node is being demoted, it also removes the node from the raft cluster membership.
func (rm *roleManager) reconcileRole(ctx context.Context, node *api.Node) {
	if node.Role == node.Spec.DesiredRole {
		// Nothing to do.
		delete(rm.pendingReconciliation, node.ID)
		return
	}

	// Promotion can proceed right away.
	if node.Spec.DesiredRole == api.NodeRoleManager && node.Role == api.NodeRoleWorker {
		err := rm.store.Update(func(tx store.Tx) error {
			updatedNode := store.GetNode(tx, node.ID)
			if updatedNode == nil || updatedNode.Spec.DesiredRole != node.Spec.DesiredRole || updatedNode.Role != node.Role {
				return nil
			}
			updatedNode.Role = api.NodeRoleManager
			return store.UpdateNode(tx, updatedNode)
		})
		if err != nil {
			log.G(ctx).WithError(err).Errorf("failed to promote node %s", node.ID)
		} else {
			delete(rm.pendingReconciliation, node.ID)
		}
	} else if node.Spec.DesiredRole == api.NodeRoleWorker && node.Role == api.NodeRoleManager {
		// Check for node in memberlist
		member := rm.raft.GetMemberByNodeID(node.ID)
		if member != nil {
			// We first try to remove the raft node from the raft cluster.  On the next tick, if the node
			// has been removed from the cluster membership, we then update the store to reflect the fact
			// that it has been successfully demoted, and if that works, remove it from the pending list.
			rm.removeMember(ctx, member)
			return
		}

		err := rm.store.Update(func(tx store.Tx) error {
			updatedNode := store.GetNode(tx, node.ID)
			if updatedNode == nil || updatedNode.Spec.DesiredRole != node.Spec.DesiredRole || updatedNode.Role != node.Role {
				return nil
			}
			updatedNode.Role = api.NodeRoleWorker

			return store.UpdateNode(tx, updatedNode)
		})
		if err != nil {
			log.G(ctx).WithError(err).Errorf("failed to demote node %s", node.ID)
		} else {
			delete(rm.pendingReconciliation, node.ID)
		}
	}
}

// Stop stops the roleManager and waits for the main loop to exit.
func (rm *roleManager) Stop() {
	rm.cancel()
	<-rm.doneChan
}