package raft

import (
	"errors"
	"fmt"
	"math"
	"math/rand"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"

	"golang.org/x/net/context"

	"github.com/Sirupsen/logrus"
	"github.com/coreos/etcd/pkg/idutil"
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/snap"
	"github.com/coreos/etcd/wal"
	"github.com/docker/go-events"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/ca"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/state/raft/membership"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/gogo/protobuf/proto"
	"github.com/pivotal-golang/clock"
)

var (
	// ErrHealthCheckFailure is returned when the initial handshake with a
	// prospective member fails, which means the advertised address is invalid
	// or there are connectivity issues at join time.
	ErrHealthCheckFailure = errors.New("raft: could not connect to prospective new cluster member using its advertised address")
	// ErrNoRaftMember is returned when the node is not yet part of a raft cluster
	ErrNoRaftMember = errors.New("raft: node is not yet part of a raft cluster")
	// ErrConfChangeRefused is returned when there is an issue with the configuration change
	ErrConfChangeRefused = errors.New("raft: propose configuration change refused")
	// ErrApplyNotSpecified is returned during the creation of a raft node when no apply method was provided
	ErrApplyNotSpecified = errors.New("raft: apply method was not specified")
	// ErrAppendEntry is returned when the node fails to append an entry to the logs
	ErrAppendEntry = errors.New("raft: failed to append entry to logs")
	// ErrSetHardState is returned when the node fails to set the hard state
	ErrSetHardState = errors.New("raft: failed to set the hard state for log append entry")
	// ErrApplySnapshot is returned when the node fails to apply a snapshot
	ErrApplySnapshot = errors.New("raft: failed to apply snapshot on raft node")
	// ErrStopped is returned when an operation was submitted but the node was stopped in the meantime
	ErrStopped = errors.New("raft: failed to process the request: node is stopped")
	// ErrLostLeadership is returned when an operation was submitted but the node lost leader status before it became committed
	ErrLostLeadership = errors.New("raft: failed to process the request: node lost leader status")
	// ErrRequestTooLarge is returned when a raft internal message is too large to be sent
	ErrRequestTooLarge = errors.New("raft: raft message is too large and can't be sent")
	// ErrCannotRemoveMember is returned when we try to remove a member from the cluster but this would result in a loss of quorum
	ErrCannotRemoveMember = errors.New("raft: member cannot be removed, because removing it may result in loss of quorum")
	// ErrMemberRemoved is returned when a node was removed from the cluster
	ErrMemberRemoved = errors.New("raft: member was removed from the cluster")
	// ErrNoClusterLeader is returned when the cluster has no elected leader
	ErrNoClusterLeader = errors.New("raft: no elected cluster leader")
)

// LeadershipState indicates whether the node is a leader or follower.
type LeadershipState int

const (
	// IsLeader indicates that the node is a raft leader.
	IsLeader LeadershipState = iota
	// IsFollower indicates that the node is a raft follower.
	IsFollower
)

// Node represents a raft node, wrapping the underlying raft.Node along with
// its configuration and runtime state.
type Node struct {
	raft.Node
	cluster *membership.Cluster

	Server         *grpc.Server
	Ctx            context.Context
	cancel         func()
	tlsCredentials credentials.TransportAuthenticator

	Address  string
	StateDir string
	Error    error

	raftStore           *raft.MemoryStorage
	memoryStore         *store.MemoryStore
	Config              *raft.Config
	opts                NewNodeOptions
	reqIDGen            *idutil.Generator
	wait                *wait
	wal                 *wal.WAL
	snapshotter         *snap.Snapshotter
	campaignWhenAble    bool
	signalledLeadership uint32
	isMember            uint32
	joinAddr            string

	// waitProp waits for all the proposals to be terminated before
	// shutting down the node.
	waitProp sync.WaitGroup

	confState     raftpb.ConfState
	appliedIndex  uint64
	snapshotIndex uint64

	ticker      clock.Ticker
	sendTimeout time.Duration
	stopCh      chan struct{}
	doneCh      chan struct{}
	// removeRaftCh is closed to signal that this node has been removed
	// from the raft cluster
	removeRaftCh        chan struct{}
	removeRaftOnce      sync.Once
	leadershipBroadcast *events.Broadcaster

	// used to coordinate shutdown
	stopMu sync.RWMutex
	// used for membership management checks
	membershipLock sync.Mutex

	snapshotInProgress chan uint64
	asyncTasks         sync.WaitGroup
}

// NewNodeOptions provides arguments for NewNode
type NewNodeOptions struct {
	// ID is the node's ID, from its certificate's CN field.
	ID string
	// Addr is the address of this node's listener
	Addr string
	// ForceNewCluster defines if we have to force a new cluster
	// because we are recovering from a backup data directory.
	ForceNewCluster bool
	// JoinAddr is the address of a cluster member to join. May be an empty
	// string to create a standalone cluster.
	JoinAddr string
	// Config is the raft config.
	Config *raft.Config
	// StateDir is the directory to store durable state.
	StateDir string
	// TickInterval is the time interval between raft ticks.
	TickInterval time.Duration
	// ClockSource is a Clock interface to use as a time base.
	// Leave this nil except for tests that are designed not to run in real
	// time.
	ClockSource clock.Clock
	// SendTimeout is the timeout for sending messages to other raft
	// nodes. Leave this as 0 to get the default value.
	SendTimeout    time.Duration
	TLSCredentials credentials.TransportAuthenticator
}
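
// A minimal sketch of filling in NewNodeOptions (not taken from this
// package's tests): the ID, addresses, and state directory below are
// placeholders, and tlsCreds is assumed to be prepared by the caller.
//
//	opts := NewNodeOptions{
//		ID:             "node-cn",         // hypothetical CN from the node certificate
//		Addr:           "172.17.0.2:4242", // this node's listener address
//		JoinAddr:       "172.17.0.1:4242", // leave empty to bootstrap a new cluster
//		StateDir:       "/var/lib/example/raft",
//		TLSCredentials: tlsCreds,
//	}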

func init() {
	rand.Seed(time.Now().UnixNano())
}

// NewNode generates a new Raft node
func NewNode(ctx context.Context, opts NewNodeOptions) *Node {
	cfg := opts.Config
	if cfg == nil {
		cfg = DefaultNodeConfig()
	}
	if opts.TickInterval == 0 {
		opts.TickInterval = time.Second
	}

	raftStore := raft.NewMemoryStorage()

	ctx, cancel := context.WithCancel(ctx)

	n := &Node{
		Ctx:            ctx,
		cancel:         cancel,
		cluster:        membership.NewCluster(),
		tlsCredentials: opts.TLSCredentials,
		raftStore:      raftStore,
		Address:        opts.Addr,
		opts:           opts,
		Config: &raft.Config{
			ElectionTick:    cfg.ElectionTick,
			HeartbeatTick:   cfg.HeartbeatTick,
			Storage:         raftStore,
			MaxSizePerMsg:   cfg.MaxSizePerMsg,
			MaxInflightMsgs: cfg.MaxInflightMsgs,
			Logger:          cfg.Logger,
		},
		stopCh:              make(chan struct{}),
		doneCh:              make(chan struct{}),
		removeRaftCh:        make(chan struct{}),
		StateDir:            opts.StateDir,
		joinAddr:            opts.JoinAddr,
		sendTimeout:         2 * time.Second,
		leadershipBroadcast: events.NewBroadcaster(),
	}
	n.memoryStore = store.NewMemoryStore(n)

	if opts.ClockSource == nil {
		n.ticker = clock.NewClock().NewTicker(opts.TickInterval)
	} else {
		n.ticker = opts.ClockSource.NewTicker(opts.TickInterval)
	}
	if opts.SendTimeout != 0 {
		n.sendTimeout = opts.SendTimeout
	}

	n.reqIDGen = idutil.NewGenerator(uint16(n.Config.ID), time.Now())
	n.wait = newWait()

	return n
}

// JoinAndStart joins and starts the raft server
func (n *Node) JoinAndStart() error {
	loadAndStartErr := n.loadAndStart(n.Ctx, n.opts.ForceNewCluster)
	if loadAndStartErr != nil && loadAndStartErr != errNoWAL {
		n.ticker.Stop()
		return loadAndStartErr
	}

	snapshot, err := n.raftStore.Snapshot()
	// Snapshot never returns an error
	if err != nil {
		panic("could not get snapshot of raft store")
	}

	n.confState = snapshot.Metadata.ConfState
	n.appliedIndex = snapshot.Metadata.Index
	n.snapshotIndex = snapshot.Metadata.Index

	if loadAndStartErr == errNoWAL {
		if n.joinAddr != "" {
			c, err := n.ConnectToMember(n.joinAddr, 10*time.Second)
			if err != nil {
				return err
			}
			client := api.NewRaftMembershipClient(c.Conn)
			defer func() {
				_ = c.Conn.Close()
			}()

			ctx, cancel := context.WithTimeout(n.Ctx, 10*time.Second)
			defer cancel()
			resp, err := client.Join(ctx, &api.JoinRequest{
				Addr: n.Address,
			})
			if err != nil {
				return err
			}

			n.Config.ID = resp.RaftID

			if _, err := n.createWAL(n.opts.ID); err != nil {
				return err
			}

			n.Node = raft.StartNode(n.Config, []raft.Peer{})

			if err := n.registerNodes(resp.Members); err != nil {
				return err
			}
		} else {
			// First member in the cluster, self-assign ID
			n.Config.ID = uint64(rand.Int63()) + 1
			peer, err := n.createWAL(n.opts.ID)
			if err != nil {
				return err
			}
			n.Node = raft.StartNode(n.Config, []raft.Peer{peer})
			if err := n.Campaign(n.Ctx); err != nil {
				return err
			}
		}
		atomic.StoreUint32(&n.isMember, 1)
		return nil
	}

	if n.joinAddr != "" {
		n.Config.Logger.Warning("ignoring request to join cluster, because raft state already exists")
	}
	n.campaignWhenAble = true
	n.Node = raft.RestartNode(n.Config)
	atomic.StoreUint32(&n.isMember, 1)
	return nil
}
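
// A minimal lifecycle sketch, assuming opts is a NewNodeOptions prepared as
// above, ctx is a caller-provided context, and errors are handled by the
// caller: create the node, join or bootstrap the cluster, drive the main
// loop, and shut it down when done.
//
//	node := NewNode(ctx, opts)
//	if err := node.JoinAndStart(); err != nil {
//		// handle join/bootstrap error
//	}
//	go func() {
//		// Run returns ErrMemberRemoved if this node is removed from the cluster.
//		_ = node.Run(ctx)
//	}()
//	// ...
//	node.Shutdown()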

// DefaultNodeConfig returns the default config for a
// raft node that can be modified and customized
func DefaultNodeConfig() *raft.Config {
	return &raft.Config{
		HeartbeatTick:   1,
		ElectionTick:    3,
		MaxSizePerMsg:   math.MaxUint16,
		MaxInflightMsgs: 256,
		Logger:          log.L,
	}
}

// DefaultRaftConfig returns a default api.RaftConfig.
func DefaultRaftConfig() api.RaftConfig {
	return api.RaftConfig{
		KeepOldSnapshots:           0,
		SnapshotInterval:           10000,
		LogEntriesForSlowFollowers: 500,
		ElectionTick:               3,
		HeartbeatTick:              1,
	}
}
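
// Note that ElectionTick and HeartbeatTick are counted in multiples of
// NewNodeOptions.TickInterval (one second by default). As a rough sketch of
// the resulting timing under the defaults above:
//
//	// heartbeat interval ~ HeartbeatTick * TickInterval = 1 * time.Second
//	// election timeout   ~ ElectionTick  * TickInterval = 3 * time.Second
//
// Overriding TickInterval scales both values accordingly.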

// MemoryStore returns the memory store that is kept in sync with the raft log.
func (n *Node) MemoryStore() *store.MemoryStore {
	return n.memoryStore
}

// Run is the main loop for a Raft node. It steps through the state machine,
// acting on the messages received from other Raft nodes in the cluster.
//
// Before running the main loop, it first starts the raft node based on saved
// cluster state. If no saved state exists, it starts a single-node cluster.
func (n *Node) Run(ctx context.Context) error {
	defer func() {
		close(n.doneCh)
	}()

	wasLeader := false

	for {
		select {
		case <-n.ticker.C():
			n.Tick()

		case rd := <-n.Ready():
			raftConfig := DefaultRaftConfig()
			n.memoryStore.View(func(readTx store.ReadTx) {
				clusters, err := store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
				if err == nil && len(clusters) == 1 {
					raftConfig = clusters[0].Spec.Raft
				}
			})

			// Save entries to storage
			if err := n.saveToStorage(&raftConfig, rd.HardState, rd.Entries, rd.Snapshot); err != nil {
				n.Config.Logger.Error(err)
			}

			// Send raft messages to peers
			if err := n.send(rd.Messages); err != nil {
				n.Config.Logger.Error(err)
			}

			// Apply snapshot to memory store. The snapshot
			// was applied to the raft store in
			// saveToStorage.
			if !raft.IsEmptySnap(rd.Snapshot) {
				// Load the snapshot data into the store
				if err := n.restoreFromSnapshot(rd.Snapshot.Data, false); err != nil {
					n.Config.Logger.Error(err)
				}
				n.appliedIndex = rd.Snapshot.Metadata.Index
				n.snapshotIndex = rd.Snapshot.Metadata.Index
				n.confState = rd.Snapshot.Metadata.ConfState
			}

			// If we cease to be the leader, we must cancel any
			// proposals that are currently waiting for a quorum to
			// acknowledge them. It is still possible for these to
			// become committed, but if that happens we will apply
			// them as any follower would.

			// It is important that we cancel these proposals before
			// calling processCommitted, so processCommitted does
			// not deadlock.

			if rd.SoftState != nil {
				if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
					wasLeader = false
					if atomic.LoadUint32(&n.signalledLeadership) == 1 {
						atomic.StoreUint32(&n.signalledLeadership, 0)
						n.leadershipBroadcast.Write(IsFollower)
					}

					// It is important that we set n.signalledLeadership to 0
					// before calling n.wait.cancelAll. When a new raft
					// request is registered, it checks n.signalledLeadership
					// afterwards, and cancels the registration if it is 0.
					// If cancelAll was called first, this call might run
					// before the new request registers, but
					// signalledLeadership would be set after the check.
					// Setting signalledLeadership before calling cancelAll
					// ensures that if a new request is registered during
					// this transition, it will either be cancelled by
					// cancelAll, or by its own check of signalledLeadership.
					n.wait.cancelAll()
				} else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
					wasLeader = true
				}
			}

			// Process committed entries
			for _, entry := range rd.CommittedEntries {
				if err := n.processCommitted(entry); err != nil {
					n.Config.Logger.Error(err)
				}
			}

			// Trigger a snapshot every once in a while
			if n.snapshotInProgress == nil &&
				raftConfig.SnapshotInterval > 0 &&
				n.appliedIndex-n.snapshotIndex >= raftConfig.SnapshotInterval {
				n.doSnapshot(&raftConfig)
			}

			if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
				// If all the entries in the log have become
				// committed, broadcast our leadership status.
				if n.caughtUp() {
					atomic.StoreUint32(&n.signalledLeadership, 1)
					n.leadershipBroadcast.Write(IsLeader)
				}
			}

			// If we are the only registered member after
			// restoring from the state, campaign to be the
			// leader.
			if n.campaignWhenAble {
				members := n.cluster.Members()
				if len(members) >= 1 {
					n.campaignWhenAble = false
				}
				if len(members) == 1 && members[n.Config.ID] != nil {
					if err := n.Campaign(n.Ctx); err != nil {
						panic("raft: cannot campaign to be the leader on node restore")
					}
				}
			}

			// Advance the state machine
			n.Advance()

		case snapshotIndex := <-n.snapshotInProgress:
			if snapshotIndex > n.snapshotIndex {
				n.snapshotIndex = snapshotIndex
			}
			n.snapshotInProgress = nil
		case <-n.removeRaftCh:
			// If this node was removed by other cluster members,
			// return an error to the caller to start the shutdown
			// process.
			n.stop()

			// Move WAL and snapshot out of the way, since
			// they are no longer usable.
			if err := n.moveWALAndSnap(); err != nil {
				n.Config.Logger.Error(err)
			}

			return ErrMemberRemoved
		case <-n.stopCh:
			n.stop()
			return nil
		}
	}
}

// Shutdown stops the raft node processing loop.
// Calling Shutdown on an already stopped node
// will result in a panic.
func (n *Node) Shutdown() {
	select {
	case <-n.doneCh:
	default:
		close(n.stopCh)
		<-n.doneCh
	}
}

// isShutdown indicates if node was shut down.
// This method should be called under n.stopMu to avoid races with n.stop().
func (n *Node) isShutdown() bool {
	select {
	case <-n.Ctx.Done():
		return true
	default:
		return false
	}
}

func (n *Node) stop() {
	n.stopMu.Lock()
	defer n.stopMu.Unlock()

	n.cancel()
	n.waitProp.Wait()
	n.asyncTasks.Wait()

	members := n.cluster.Members()
	for _, member := range members {
		if member.Conn != nil {
			_ = member.Conn.Close()
		}
	}
	n.Stop()
	n.ticker.Stop()
	if err := n.wal.Close(); err != nil {
		n.Config.Logger.Errorf("raft: error closing WAL: %v", err)
	}
	// TODO(stevvooe): Handle ctx.Done()
}

// IsLeader checks if we are the leader or not
func (n *Node) IsLeader() bool {
	if !n.IsMember() {
		return false
	}

	return n.Node.Status().Lead == n.Config.ID
}

// Leader returns the id of the leader
func (n *Node) Leader() uint64 {
	if !n.IsMember() {
		return 0
	}
	return n.Node.Status().Lead
}

// ReadyForProposals returns true if the node has broadcasted a message
// saying that it has become the leader. This means it is ready to accept
// proposals.
func (n *Node) ReadyForProposals() bool {
	return atomic.LoadUint32(&n.signalledLeadership) == 1
}

func (n *Node) caughtUp() bool {
	// obnoxious function that always returns a nil error
	lastIndex, _ := n.raftStore.LastIndex()
	return n.appliedIndex >= lastIndex
}

// Join asks a member of the raft cluster to propose a configuration change and
// add us as a member, thus beginning the log replication process. This method
// is called from an aspiring member to an existing member.
func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinResponse, error) {
	nodeInfo, err := ca.RemoteNode(ctx)
	if err != nil {
		return nil, err
	}

	fields := logrus.Fields{
		"node.id": nodeInfo.NodeID,
		"method":  "(*Node).Join",
	}
	if nodeInfo.ForwardedBy != nil {
		fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
	}
	log := log.G(ctx).WithFields(fields)

	// can't stop the raft node while an async RPC is in progress
	n.stopMu.RLock()
	defer n.stopMu.RUnlock()

	n.membershipLock.Lock()
	defer n.membershipLock.Unlock()

	if !n.IsMember() {
		return nil, ErrNoRaftMember
	}

	if n.IsStopped() {
		log.WithError(ErrStopped).Error(ErrStopped.Error())
		return nil, ErrStopped
	}

	if !n.IsLeader() {
		return nil, ErrLostLeadership
	}

	// Find a unique ID for the joining member.
	var raftID uint64
	for {
		raftID = uint64(rand.Int63()) + 1
		if n.cluster.GetMember(raftID) == nil && !n.cluster.IsIDRemoved(raftID) {
			break
		}
	}

	remoteAddr := req.Addr

	// If the joining node sent an address like 0.0.0.0:4242, automatically
	// determine its actual address based on the GRPC connection. This
	// avoids the need for a prospective member to know its own address.

	requestHost, requestPort, err := net.SplitHostPort(remoteAddr)
	if err != nil {
		return nil, fmt.Errorf("invalid address %s in raft join request", remoteAddr)
	}

	requestIP := net.ParseIP(requestHost)
	if requestIP != nil && requestIP.IsUnspecified() {
		remoteHost, _, err := net.SplitHostPort(nodeInfo.RemoteAddr)
		if err != nil {
			return nil, err
		}
		remoteAddr = net.JoinHostPort(remoteHost, requestPort)
	}

	// We do not bother submitting a configuration change for the
	// new member if we can't contact it back using its address
	if err := n.checkHealth(ctx, remoteAddr, 5*time.Second); err != nil {
		return nil, err
	}

	err = n.addMember(ctx, remoteAddr, raftID, nodeInfo.NodeID)
	if err != nil {
		log.WithError(err).Errorf("failed to add member")
		return nil, err
	}

	var nodes []*api.RaftMember
	for _, node := range n.cluster.Members() {
		nodes = append(nodes, &api.RaftMember{
			RaftID: node.RaftID,
			NodeID: node.NodeID,
			Addr:   node.Addr,
		})
	}
	log.Debugf("node joined")

	return &api.JoinResponse{Members: nodes, RaftID: raftID}, nil
}

// checkHealth tries to contact an aspiring member through its advertised address
// and checks if its raft server is running.
func (n *Node) checkHealth(ctx context.Context, addr string, timeout time.Duration) error {
	conn, err := dial(addr, "tcp", n.tlsCredentials, timeout)
	if err != nil {
		return err
	}

	client := api.NewHealthClient(conn)
	defer conn.Close()

	resp, err := client.Check(ctx, &api.HealthCheckRequest{Service: "Raft"})
	if err != nil {
		return ErrHealthCheckFailure
	}
	if resp != nil && resp.Status != api.HealthCheckResponse_SERVING {
		return ErrHealthCheckFailure
	}

	return nil
}

// addMember submits a configuration change to add a new member on the raft cluster.
func (n *Node) addMember(ctx context.Context, addr string, raftID uint64, nodeID string) error {
	node := api.RaftMember{
		RaftID: raftID,
		NodeID: nodeID,
		Addr:   addr,
	}

	meta, err := node.Marshal()
	if err != nil {
		return err
	}

	cc := raftpb.ConfChange{
		Type:    raftpb.ConfChangeAddNode,
		NodeID:  raftID,
		Context: meta,
	}

	// Wait for a raft round to process the configuration change
	return n.configure(ctx, cc)
}

// Leave asks a member of the raft cluster to remove us from the raft cluster.
// This method is called by a member that is willing to give up its raft
// membership, and is addressed to an active member of the raft cluster.
func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResponse, error) {
	nodeInfo, err := ca.RemoteNode(ctx)
	if err != nil {
		return nil, err
	}

	fields := logrus.Fields{
		"node.id": nodeInfo.NodeID,
		"method":  "(*Node).Leave",
	}
	if nodeInfo.ForwardedBy != nil {
		fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
	}
	log.G(ctx).WithFields(fields).Debugf("")

	// can't stop the raft node while an async RPC is in progress
	n.stopMu.RLock()
	defer n.stopMu.RUnlock()

	if !n.IsMember() {
		return nil, ErrNoRaftMember
	}

	if n.IsStopped() {
		return nil, ErrStopped
	}

	if !n.IsLeader() {
		return nil, ErrLostLeadership
	}

	err = n.RemoveMember(ctx, req.Node.RaftID)
	if err != nil {
		return nil, err
	}

	return &api.LeaveResponse{}, nil
}

// CanRemoveMember checks if a member can be removed from
// the context of the current node.
func (n *Node) CanRemoveMember(id uint64) bool {
	return n.cluster.CanRemoveMember(n.Config.ID, id)
}

// RemoveMember submits a configuration change to remove a member from the raft cluster
// after checking that the operation will not result in a loss of quorum.
func (n *Node) RemoveMember(ctx context.Context, id uint64) error {
	n.membershipLock.Lock()
	defer n.membershipLock.Unlock()

	if n.cluster.CanRemoveMember(n.Config.ID, id) {
		cc := raftpb.ConfChange{
			ID:      id,
			Type:    raftpb.ConfChangeRemoveNode,
			NodeID:  id,
			Context: []byte(""),
		}

		err := n.configure(ctx, cc)
		return err
	}

	return ErrCannotRemoveMember
}
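
// A minimal sketch of removing a member from the leader, guarding against
// quorum loss first; node and raftID are assumed to come from the caller.
//
//	if !node.CanRemoveMember(raftID) {
//		return ErrCannotRemoveMember
//	}
//	if err := node.RemoveMember(ctx, raftID); err != nil {
//		// ErrCannotRemoveMember can still be returned here if the
//		// membership changed between the two calls.
//	}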

// ProcessRaftMessage calls 'Step' which advances the
// raft state machine with the provided message on the
// receiving node
func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessageRequest) (*api.ProcessRaftMessageResponse, error) {
	if msg == nil || msg.Message == nil {
		return nil, grpc.Errorf(codes.InvalidArgument, "no message provided")
	}

	// Don't process the message if this comes from
	// a node in the remove set
	if n.cluster.IsIDRemoved(msg.Message.From) {
		return nil, ErrMemberRemoved
	}

	if msg.Message.Type == raftpb.MsgProp {
		// We don't accept forwarded proposals. Our
		// current architecture depends on only the leader
		// making proposals, so in-flight proposals can be
		// guaranteed not to conflict.
		return nil, grpc.Errorf(codes.InvalidArgument, "proposals not accepted")
	}

	// can't stop the raft node while an async RPC is in progress
	n.stopMu.RLock()
	defer n.stopMu.RUnlock()

	if !n.IsMember() {
		return nil, ErrNoRaftMember
	}

	if n.IsStopped() {
		return nil, ErrStopped
	}

	if err := n.Step(n.Ctx, *msg.Message); err != nil {
		return nil, err
	}

	return &api.ProcessRaftMessageResponse{}, nil
}

// ResolveAddress returns the address at which the raft member with the given ID can be reached.
func (n *Node) ResolveAddress(ctx context.Context, msg *api.ResolveAddressRequest) (*api.ResolveAddressResponse, error) {
	if !n.IsMember() {
		return nil, ErrNoRaftMember
	}

	nodeInfo, err := ca.RemoteNode(ctx)
	if err != nil {
		return nil, err
	}

	fields := logrus.Fields{
		"node.id": nodeInfo.NodeID,
		"method":  "(*Node).ResolveAddress",
	}
	if nodeInfo.ForwardedBy != nil {
		fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
	}
	log.G(ctx).WithFields(fields).Debugf("")

	member := n.cluster.GetMember(msg.RaftID)
	if member == nil {
		return nil, grpc.Errorf(codes.NotFound, "member %x not found", msg.RaftID)
	}
	return &api.ResolveAddressResponse{Addr: member.Addr}, nil
}

// LeaderAddr returns the address of the current cluster leader.
// With this method Node satisfies raftpicker.AddrSelector interface.
func (n *Node) LeaderAddr() (string, error) {
	n.stopMu.RLock()
	defer n.stopMu.RUnlock()
	if n.isShutdown() {
		return "", fmt.Errorf("raft node is shut down")
	}
	ctx, cancel := context.WithTimeout(n.Ctx, 10*time.Second)
	defer cancel()
	if err := WaitForLeader(ctx, n); err != nil {
		return "", ErrNoClusterLeader
	}
	if n.IsStopped() {
		return "", ErrStopped
	}
	ms := n.cluster.Members()
	l := ms[n.Leader()]
	if l == nil {
		return "", ErrNoClusterLeader
	}
	return l.Addr, nil
}
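
// A minimal sketch of using LeaderAddr, for example from a proxying layer
// that needs to forward requests to the current leader; error handling is
// left to the caller.
//
//	addr, err := node.LeaderAddr()
//	if err != nil {
//		// no leader was elected within the timeout, or the node is shut down
//	}
//	// dial addr and forward the request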

// registerNode registers a new node on the cluster memberlist
func (n *Node) registerNode(node *api.RaftMember) error {
	if n.cluster.IsIDRemoved(node.RaftID) {
		return nil
	}

	member := &membership.Member{}

	existingMember := n.cluster.GetMember(node.RaftID)
	if existingMember != nil {
		// Member already exists

		// If the address is different from what we thought it was,
		// update it. This can happen if we just joined a cluster
		// and are adding ourselves now with the remotely-reachable
		// address.
		if existingMember.Addr != node.Addr {
			member.RaftMember = node
			member.RaftClient = existingMember.RaftClient
			member.Conn = existingMember.Conn
			n.cluster.AddMember(member)
		}

		return nil
	}

	// Avoid opening a connection to the local node
	if node.RaftID != n.Config.ID {
		// We don't want to impose a timeout on the grpc connection. It
		// should keep retrying as long as necessary, in case the peer
		// is temporarily unavailable.
		var err error
		if member, err = n.ConnectToMember(node.Addr, 0); err != nil {
			return err
		}
	}

	member.RaftMember = node
	err := n.cluster.AddMember(member)
	if err != nil {
		if member.Conn != nil {
			_ = member.Conn.Close()
		}
		return err
	}

	return nil
}

// registerNodes registers a set of nodes in the cluster
func (n *Node) registerNodes(nodes []*api.RaftMember) error {
	for _, node := range nodes {
		if err := n.registerNode(node); err != nil {
			return err
		}
	}

	return nil
}

// ProposeValue calls Propose on the raft node and waits for the proposal to be
// committed and applied before returning a result.
func (n *Node) ProposeValue(ctx context.Context, storeAction []*api.StoreAction, cb func()) error {
	_, err := n.processInternalRaftRequest(ctx, &api.InternalRaftRequest{Action: storeAction}, cb)
	return err
}
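
// A minimal sketch of proposing a store transaction through raft; actions is
// assumed to be a []*api.StoreAction prepared by the memory store layer.
//
//	if !node.ReadyForProposals() {
//		return ErrLostLeadership
//	}
//	err := node.ProposeValue(ctx, actions, func() {
//		// callback invoked once the proposal has been committed
//	})
//	if err == ErrLostLeadership {
//		// leadership changed while the proposal was in flight
//	}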

// GetVersion returns the sequence information for the current raft round.
func (n *Node) GetVersion() *api.Version {
	status := n.Node.Status()
	return &api.Version{Index: status.Commit}
}

// GetMemberlist returns the current list of raft members in the cluster.
func (n *Node) GetMemberlist() map[uint64]*api.RaftMember {
	memberlist := make(map[uint64]*api.RaftMember)
	members := n.cluster.Members()
	leaderID := n.Leader()

	for id, member := range members {
		reachability := api.RaftMemberStatus_REACHABLE
		leader := false

		if member.RaftID != n.Config.ID {
			connState, err := member.Conn.State()
			if err != nil || connState != grpc.Ready {
				reachability = api.RaftMemberStatus_UNREACHABLE
			}
		}

		if member.RaftID == leaderID {
			leader = true
		}

		memberlist[id] = &api.RaftMember{
			RaftID: member.RaftID,
			NodeID: member.NodeID,
			Addr:   member.Addr,
			Status: api.RaftMemberStatus{
				Leader:       leader,
				Reachability: reachability,
			},
		}
	}

	return memberlist
}
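
// A minimal sketch of inspecting the member list, for example to report
// cluster health; node is assumed to be a running *Node.
//
//	for id, m := range node.GetMemberlist() {
//		if m.Status.Leader {
//			fmt.Printf("member %x (%s) is the leader\n", id, m.Addr)
//		}
//		if m.Status.Reachability != api.RaftMemberStatus_REACHABLE {
//			fmt.Printf("member %x (%s) is unreachable\n", id, m.Addr)
//		}
//	}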

// GetMemberByNodeID returns member information based
// on its generic Node ID.
func (n *Node) GetMemberByNodeID(nodeID string) *membership.Member {
	members := n.cluster.Members()
	for _, member := range members {
		if member.NodeID == nodeID {
			return member
		}
	}
	return nil
}

// IsMember checks if the raft node has effectively joined
// a cluster of existing members.
func (n *Node) IsMember() bool {
	return atomic.LoadUint32(&n.isMember) == 1
}

// IsStopped checks if the raft node is stopped or not
func (n *Node) IsStopped() bool {
	return n.Node == nil
}

// canSubmitProposal defines if any more proposals
// could be submitted and processed.
func (n *Node) canSubmitProposal() bool {
	select {
	case <-n.Ctx.Done():
		return false
	default:
		return true
	}
}

// saveToStorage saves the hard state, log entries, and snapshot to the WAL and the raft memory store.
func (n *Node) saveToStorage(raftConfig *api.RaftConfig, hardState raftpb.HardState, entries []raftpb.Entry, snapshot raftpb.Snapshot) (err error) {
	if !raft.IsEmptySnap(snapshot) {
		if err := n.saveSnapshot(snapshot, raftConfig.KeepOldSnapshots); err != nil {
			return ErrApplySnapshot
		}
		if err = n.raftStore.ApplySnapshot(snapshot); err != nil {
			return ErrApplySnapshot
		}
	}

	if err := n.wal.Save(hardState, entries); err != nil {
		// TODO(aaronl): These error types should really wrap more
		// detailed errors.
		return ErrApplySnapshot
	}

	if err = n.raftStore.Append(entries); err != nil {
		return ErrAppendEntry
	}

	return nil
}

// send sends a series of raft messages to other members of the cluster.
func (n *Node) send(messages []raftpb.Message) error {
	members := n.cluster.Members()

	n.stopMu.RLock()
	defer n.stopMu.RUnlock()

	for _, m := range messages {
		// Process locally
		if m.To == n.Config.ID {
			if err := n.Step(n.Ctx, m); err != nil {
				return err
			}
			continue
		}

		if m.Type == raftpb.MsgProp {
			// We don't forward proposals to the leader. Our
			// current architecture depends on only the leader
			// making proposals, so in-flight proposals can be
			// guaranteed not to conflict.
			continue
		}

		n.asyncTasks.Add(1)
		go n.sendToMember(members, m)
	}

	return nil
}

func (n *Node) sendToMember(members map[uint64]*membership.Member, m raftpb.Message) {
	defer n.asyncTasks.Done()

	if n.cluster.IsIDRemoved(m.To) {
		// Should not send to removed members
		return
	}

	ctx, cancel := context.WithTimeout(n.Ctx, n.sendTimeout)
	defer cancel()

	var (
		conn *membership.Member
	)
	if toMember, ok := members[m.To]; ok {
		conn = toMember
	} else {
		// If we are being asked to send to a member that's not in
		// our member list, that could indicate that the current leader
		// was added while we were offline. Try to resolve its address.
		n.Config.Logger.Warningf("sending message to an unrecognized member ID %x", m.To)

		// Choose a random member
		var (
			queryMember *membership.Member
			id          uint64
		)
		for id, queryMember = range members {
			if id != n.Config.ID {
				break
			}
		}

		if queryMember == nil || queryMember.RaftID == n.Config.ID {
			n.Config.Logger.Error("could not find cluster member to query for leader address")
			return
		}

		resp, err := queryMember.ResolveAddress(ctx, &api.ResolveAddressRequest{RaftID: m.To})
		if err != nil {
			n.Config.Logger.Errorf("could not resolve address of member ID %x: %v", m.To, err)
			return
		}
		conn, err = n.ConnectToMember(resp.Addr, n.sendTimeout)
		if err != nil {
			n.Config.Logger.Errorf("could connect to member ID %x at %s: %v", m.To, resp.Addr, err)
			return
		}
		// The temporary connection is only used for this message.
		// Eventually, we should catch up and add a long-lived
		// connection to the member list.
		defer conn.Conn.Close()
	}

	_, err := conn.ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m})
	if err != nil {
		if grpc.ErrorDesc(err) == ErrMemberRemoved.Error() {
			n.removeRaftOnce.Do(func() {
				close(n.removeRaftCh)
			})
		}
		if m.Type == raftpb.MsgSnap {
			n.ReportSnapshot(m.To, raft.SnapshotFailure)
		}
		if n.IsStopped() {
			panic("node is nil")
		}
		n.ReportUnreachable(m.To)

		// Bounce the connection
		newConn, err := n.ConnectToMember(conn.Addr, 0)
		if err != nil {
			n.Config.Logger.Errorf("could connect to member ID %x at %s: %v", m.To, conn.Addr, err)
		} else {
			n.cluster.ReplaceMemberConnection(m.To, conn, newConn)
		}
	} else if m.Type == raftpb.MsgSnap {
		n.ReportSnapshot(m.To, raft.SnapshotFinish)
	}
}

type applyResult struct {
	resp proto.Message
	err  error
}

// processInternalRaftRequest sends a message to nodes participating
// in the raft to apply a log entry and then waits for it to be applied
// on the server. It will block until the update is performed, there is
// an error, or the raft node finalizes all the proposals on node
// shutdown.
func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRaftRequest, cb func()) (proto.Message, error) {
	n.waitProp.Add(1)
	defer n.waitProp.Done()

	if !n.canSubmitProposal() {
		return nil, ErrStopped
	}

	r.ID = n.reqIDGen.Next()

	// This must be derived from the context which is cancelled by stop()
	// to avoid a deadlock on shutdown.
	waitCtx, cancel := context.WithCancel(n.Ctx)

	ch := n.wait.register(r.ID, cb, cancel)

	// Do this check after calling register to avoid a race.
	if atomic.LoadUint32(&n.signalledLeadership) != 1 {
		n.wait.cancel(r.ID)
		return nil, ErrLostLeadership
	}

	data, err := r.Marshal()
	if err != nil {
		n.wait.cancel(r.ID)
		return nil, err
	}

	if len(data) > store.MaxTransactionBytes {
		n.wait.cancel(r.ID)
		return nil, ErrRequestTooLarge
	}

	err = n.Propose(waitCtx, data)
	if err != nil {
		n.wait.cancel(r.ID)
		return nil, err
	}

	select {
	case x := <-ch:
		res := x.(*applyResult)
		return res.resp, res.err
	case <-waitCtx.Done():
		n.wait.cancel(r.ID)
		return nil, ErrLostLeadership
	case <-ctx.Done():
		n.wait.cancel(r.ID)
		return nil, ctx.Err()
	}
}

// configure sends a configuration change through consensus and
// then waits for it to be applied to the server. It will block
// until the change is performed or there is an error.
func (n *Node) configure(ctx context.Context, cc raftpb.ConfChange) error {
	cc.ID = n.reqIDGen.Next()

	ctx, cancel := context.WithCancel(ctx)
	ch := n.wait.register(cc.ID, nil, cancel)

	if err := n.ProposeConfChange(ctx, cc); err != nil {
		n.wait.cancel(cc.ID)
		return err
	}

	select {
	case x := <-ch:
		if err, ok := x.(error); ok {
			return err
		}
		if x != nil {
			log.G(ctx).Panic("raft: configuration change error, return type should always be error")
		}
		return nil
	case <-ctx.Done():
		n.wait.cancel(cc.ID)
		return ctx.Err()
	case <-n.Ctx.Done():
		return ErrStopped
	}
}

func (n *Node) processCommitted(entry raftpb.Entry) error {
	// Process a normal entry
	if entry.Type == raftpb.EntryNormal && entry.Data != nil {
		if err := n.processEntry(entry); err != nil {
			return err
		}
	}

	// Process a configuration change (add/remove node)
	if entry.Type == raftpb.EntryConfChange {
		n.processConfChange(entry)
	}

	n.appliedIndex = entry.Index
	return nil
}

func (n *Node) processEntry(entry raftpb.Entry) error {
	r := &api.InternalRaftRequest{}
	err := proto.Unmarshal(entry.Data, r)
	if err != nil {
		return err
	}

	if r.Action == nil {
		return nil
	}

	if !n.wait.trigger(r.ID, &applyResult{resp: r, err: nil}) {
		// There was no wait on this ID, meaning we don't have a
		// transaction in progress that would be committed to the
		// memory store by the "trigger" call. Either a different node
		// wrote this to raft, or we wrote it before losing the leader
		// position and cancelling the transaction. Create a new
		// transaction to commit the data.

		// It should not be possible for processInternalRaftRequest
		// to be running in this situation, but out of caution we
		// cancel any current invocations to avoid a deadlock.
		n.wait.cancelAll()

		err := n.memoryStore.ApplyStoreActions(r.Action)
		if err != nil {
			log.G(context.Background()).Errorf("error applying actions from raft: %v", err)
		}
	}
	return nil
}

func (n *Node) processConfChange(entry raftpb.Entry) {
	var (
		err error
		cc  raftpb.ConfChange
	)

	if err := proto.Unmarshal(entry.Data, &cc); err != nil {
		n.wait.trigger(cc.ID, err)
	}

	if err := n.cluster.ValidateConfigurationChange(cc); err != nil {
		n.wait.trigger(cc.ID, err)
	}

	switch cc.Type {
	case raftpb.ConfChangeAddNode:
		err = n.applyAddNode(cc)
	case raftpb.ConfChangeRemoveNode:
		err = n.applyRemoveNode(cc)
	}

	if err != nil {
		n.wait.trigger(cc.ID, err)
	}

	n.confState = *n.ApplyConfChange(cc)
	n.wait.trigger(cc.ID, nil)
}

// applyAddNode is called when we receive a ConfChange from a member in the
// raft cluster. It adds a new node to the existing raft cluster.
func (n *Node) applyAddNode(cc raftpb.ConfChange) error {
	member := &api.RaftMember{}
	err := proto.Unmarshal(cc.Context, member)
	if err != nil {
		return err
	}

	// ID must be non-zero
	if member.RaftID == 0 {
		return nil
	}

	return n.registerNode(member)
}

// applyRemoveNode is called when we receive a ConfChange from a member in the
// raft cluster. It removes a node from the existing raft cluster.
func (n *Node) applyRemoveNode(cc raftpb.ConfChange) (err error) {
	// If the member being removed is the current leader and this node
	// is a follower, campaign to become the new leader.

	if cc.NodeID == n.Leader() && !n.IsLeader() {
		if err = n.Campaign(n.Ctx); err != nil {
			return err
		}
	}

	if cc.NodeID == n.Config.ID {
		// wait for the commit ack to be sent before closing the connection
		n.asyncTasks.Wait()

		// If there are only 2 nodes in the cluster and the leader is the
		// one leaving, it has to make sure the follower learns about this
		// conf change commit before the connection is closed. Otherwise,
		// the follower would still assume there are 2 nodes in the cluster
		// and would never be elected leader, since it could not acquire a
		// majority (2 nodes).
		//
		// While n.asyncTasks.Wait() helps in this case, it is only a
		// best-effort strategy, because the send can still fail (for
		// example, if the time limit is exceeded).
		// TODO(Runshen Zhu): use leadership transfer to solve this case,
		// after vendoring raft 3.0+
	}

	return n.cluster.RemoveMember(cc.NodeID)
}

// ConnectToMember returns a member object with an initialized
// connection to communicate with other raft members
func (n *Node) ConnectToMember(addr string, timeout time.Duration) (*membership.Member, error) {
	conn, err := dial(addr, "tcp", n.tlsCredentials, timeout)
	if err != nil {
		return nil, err
	}

	return &membership.Member{
		RaftClient: api.NewRaftClient(conn),
		Conn:       conn,
	}, nil
}

// SubscribeLeadership returns a channel to which events about leadership
// changes will be sent in the form of raft.LeadershipState. It also returns a
// cancel func, which should be called when the listener is no longer
// interested in events.
func (n *Node) SubscribeLeadership() (q chan events.Event, cancel func()) {
	ch := events.NewChannel(0)
	sink := events.Sink(events.NewQueue(ch))
	n.leadershipBroadcast.Add(sink)
	return ch.C, func() {
		n.leadershipBroadcast.Remove(sink)
		ch.Close()
		sink.Close()
	}
}
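
// A minimal sketch of consuming leadership events; ctx is assumed to be a
// caller-provided context used to stop the listener.
//
//	ch, cancel := node.SubscribeLeadership()
//	defer cancel()
//	for {
//		select {
//		case e := <-ch:
//			switch e.(LeadershipState) {
//			case IsLeader:
//				// start leader-only tasks
//			case IsFollower:
//				// stop leader-only tasks
//			}
//		case <-ctx.Done():
//			return
//		}
//	}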

// createConfigChangeEnts creates a series of Raft entries (i.e.
// EntryConfChange) to remove the set of given IDs from the cluster. The ID
// `self` is _not_ removed, even if present in the set.
// If `self` is not inside the given ids, it creates a Raft entry to add a
// default member with the given `self`.
func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
	var ents []raftpb.Entry
	next := index + 1
	found := false
	for _, id := range ids {
		if id == self {
			found = true
			continue
		}
		cc := &raftpb.ConfChange{
			Type:   raftpb.ConfChangeRemoveNode,
			NodeID: id,
		}
		data, err := cc.Marshal()
		if err != nil {
			log.G(context.Background()).Panicf("marshal configuration change should never fail: %v", err)
		}
		e := raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Data:  data,
			Term:  term,
			Index: next,
		}
		ents = append(ents, e)
		next++
	}
	if !found {
		node := &api.RaftMember{RaftID: self}
		meta, err := node.Marshal()
		if err != nil {
			log.G(context.Background()).Panicf("marshal member should never fail: %v", err)
		}
		cc := &raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  self,
			Context: meta,
		}
		data, err := cc.Marshal()
		if err != nil {
			log.G(context.Background()).Panicf("marshal configuration change should never fail: %v", err)
		}
		e := raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Data:  data,
			Term:  term,
			Index: next,
		}
		ents = append(ents, e)
	}
	return ents
}

// getIDs returns an ordered set of IDs included in the given snapshot and
// the entries. The given snapshot/entries can contain two kinds of
// ID-related entry:
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
	ids := make(map[uint64]bool)
	if snap != nil {
		for _, id := range snap.Metadata.ConfState.Nodes {
			ids[id] = true
		}
	}
	for _, e := range ents {
		if e.Type != raftpb.EntryConfChange {
			continue
		}
		if snap != nil && e.Index < snap.Metadata.Index {
			continue
		}
		var cc raftpb.ConfChange
		if err := cc.Unmarshal(e.Data); err != nil {
			log.G(context.Background()).Panicf("unmarshal configuration change should never fail: %v", err)
		}
		switch cc.Type {
		case raftpb.ConfChangeAddNode:
			ids[cc.NodeID] = true
		case raftpb.ConfChangeRemoveNode:
			delete(ids, cc.NodeID)
		case raftpb.ConfChangeUpdateNode:
			// do nothing
		default:
			log.G(context.Background()).Panic("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!")
		}
	}
	var sids []uint64
	for id := range ids {
		sids = append(sids, id)
	}
	return sids
}