package raft

import (
	"fmt"
	"math"
	"math/rand"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"github.com/coreos/etcd/pkg/idutil"
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/docker/docker/pkg/signal"
	"github.com/docker/go-events"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/ca"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/raftselector"
	"github.com/docker/swarmkit/manager/state"
	"github.com/docker/swarmkit/manager/state/raft/membership"
	"github.com/docker/swarmkit/manager/state/raft/storage"
	"github.com/docker/swarmkit/manager/state/raft/transport"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/watch"
	"github.com/gogo/protobuf/proto"
	"github.com/pivotal-golang/clock"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/net/context"
	"golang.org/x/time/rate"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/peer"
)

// Sentinel errors returned by the raft node. Callers compare against these
// values to distinguish failure modes.
var (
	// ErrNoRaftMember is returned when the node is not yet part of a raft cluster
	ErrNoRaftMember = errors.New("raft: node is not yet part of a raft cluster")
	// ErrConfChangeRefused is returned when there is an issue with the configuration change
	ErrConfChangeRefused = errors.New("raft: propose configuration change refused")
	// ErrApplyNotSpecified is returned during the creation of a raft node when no apply method was provided
	ErrApplyNotSpecified = errors.New("raft: apply method was not specified")
	// ErrSetHardState is returned when the node fails to set the hard state
	ErrSetHardState = errors.New("raft: failed to set the hard state for log append entry")
	// ErrStopped is returned when an operation was submitted but the node was stopped in the meantime
	ErrStopped = errors.New("raft: failed to process the request: node is stopped")
	// ErrLostLeadership is returned when an operation was submitted but the node lost leader status before it became committed
	ErrLostLeadership = errors.New("raft: failed to process the request: node lost leader status")
	// ErrRequestTooLarge is returned when a raft internal message is too large to be sent
	ErrRequestTooLarge = errors.New("raft: raft message is too large and can't be sent")
	// ErrCannotRemoveMember is returned when we try to remove a member from the cluster but this would result in a loss of quorum
	ErrCannotRemoveMember = errors.New("raft: member cannot be removed, because removing it may result in loss of quorum")
	// ErrNoClusterLeader is returned when the cluster has no elected leader
	ErrNoClusterLeader = errors.New("raft: no elected cluster leader")
	// ErrMemberUnknown is sent in response to a message from an
	// unrecognized peer.
	ErrMemberUnknown = errors.New("raft: member unknown")

	// lostQuorumMessage is kept in its own variable to work around lint;
	// presumably the linter would otherwise reject a capitalized,
	// punctuated string passed directly to errors.New.
	lostQuorumMessage = "The swarm does not have a leader. It's possible that too few managers are online. Make sure more than half of the managers are online."
	// errLostQuorum is the error carrying the user-facing lost-quorum message.
	errLostQuorum = errors.New(lostQuorumMessage)
)

// LeadershipState indicates whether the node is a leader or follower.
// Values of this type are published on the node's leadership broadcast
// queue when the node's role changes.
type LeadershipState int

const (
	// IsLeader indicates that the node is a raft leader.
	IsLeader LeadershipState = iota
	// IsFollower indicates that the node is a raft follower.
	IsFollower

	// lostQuorumTimeout is the number of ticks that can elapse with no
	// leader before LeaderConn starts returning an error right away.
	// Note that it is declared in the same const block after the iota
	// values but has an explicit untyped value of 10.
	lostQuorumTimeout = 10
)

// EncryptionKeys are the current and, if necessary, pending DEKs (data
// encryption keys) with which to encrypt raft data.
type EncryptionKeys struct {
	// CurrentDEK is the key currently in use.
	CurrentDEK []byte
	// PendingDEK is the key being rotated to; nil when no rotation is
	// pending.
	PendingDEK []byte
}

// EncryptionKeyRotator is an interface to find out if any keys need rotating.
type EncryptionKeyRotator interface {
	// GetKeys returns the current and pending DEKs.
	GetKeys() EncryptionKeys
	// UpdateKeys replaces the stored keys; the node calls it with only
	// CurrentDEK set once a rotation has completed, and with an empty
	// value when clearing its data.
	UpdateKeys(EncryptionKeys) error
	// NeedsRotation reports whether a key rotation has been requested.
	NeedsRotation() bool
	// RotationNotify returns a channel the node's run loop selects on to
	// learn about a requested rotation.
	RotationNotify() chan struct{}
}

// Node represents the Raft Node useful
// configuration. It bundles the underlying etcd raft node with the cluster
// membership view, the peer transport, storage, and the bookkeeping needed
// by the run loop.
type Node struct {
	// raftNode is the underlying etcd raft node; it is set by JoinAndStart.
	raftNode raft.Node
	// cluster tracks the known and removed members.
	cluster *membership.Cluster
	// transport sends raft messages to peers; created by initTransport.
	transport *transport.Transport

	// raftStore is the in-memory raft log storage handed to etcd raft.
	raftStore *raft.MemoryStorage
	// memoryStore is the object store kept in sync with the raft log.
	memoryStore *store.MemoryStore
	// Config is the etcd raft configuration built in NewNode.
	Config *raft.Config
	// opts holds the options the node was created with.
	opts NodeOptions
	// reqIDGen generates request IDs.
	reqIDGen *idutil.Generator
	// wait tracks in-flight requests so they can be cancelled.
	wait *wait
	// campaignWhenAble makes the run loop campaign for leadership once
	// membership is known.
	campaignWhenAble bool
	// signalledLeadership is 1 once leadership has been broadcast and 0
	// otherwise; accessed atomically.
	signalledLeadership uint32
	// isMember is 1 while the node is part of the cluster; accessed
	// atomically.
	isMember uint32
	// bootstrapMembers are the members received from a Join response and
	// registered at the start of Run.
	bootstrapMembers []*api.RaftMember

	// waitProp waits for all the proposals to be terminated before
	// shutting down the node.
	waitProp sync.WaitGroup

	// confState, appliedIndex, snapshotMeta and writtenWALIndex are
	// initialized from the stored snapshot and log in JoinAndStart.
	confState       raftpb.ConfState
	appliedIndex    uint64
	snapshotMeta    raftpb.SnapshotMetadata
	writtenWALIndex uint64

	// ticker drives raft ticks in the run loop.
	ticker clock.Ticker
	// doneCh is closed by done() when the node is fully stopped.
	doneCh chan struct{}
	// RemovedFromRaft notifies about node deletion from raft cluster
	RemovedFromRaft chan struct{}
	// cancelFunc closes the stopped channel exactly once.
	cancelFunc func()
	// removeRaftCh notifies about node deletion from raft cluster
	// NOTE(review): not initialized by NewNode in the code visible here —
	// confirm where it is created and used.
	removeRaftCh chan struct{}
	// removeRaftOnce guards the one-time close of RemovedFromRaft.
	removeRaftOnce sync.Once
	// leadershipBroadcast publishes IsLeader/IsFollower transitions.
	leadershipBroadcast *watch.Queue

	// used to coordinate shutdown
	// Lock should be used only in stop(), all other functions should use RLock.
	stopMu sync.RWMutex
	// used for membership management checks
	membershipLock sync.Mutex
	// synchronizes access to n.opts.Addr, and makes sure the address is not
	// updated concurrently with JoinAndStart.
	addrLock sync.Mutex

	// snapshotInProgress delivers the metadata of a finished snapshot to
	// the run loop; nil when no snapshot is being taken.
	snapshotInProgress chan raftpb.SnapshotMetadata
	// asyncTasks is waited on during stop().
	asyncTasks sync.WaitGroup

	// stopped chan is used for notifying grpc handlers that raft node going
	// to stop.
	stopped chan struct{}

	// raftLogger persists (encrypted) WALs and snapshots.
	raftLogger *storage.EncryptedRaftLogger
	// keyRotator supplies and receives the encryption keys.
	keyRotator EncryptionKeyRotator
	// rotationQueued records that a rotation arrived while a snapshot
	// was in progress.
	rotationQueued bool
	// clearData makes Run delete WALs, snapshots and keys on shutdown.
	clearData bool
	// waitForAppliedIndex is the index a snapshot must reach before a key
	// rotation can be completed; 0 when no rotation is under way.
	waitForAppliedIndex uint64
	// ticksWithNoLeader counts consecutive ticks observed without a
	// leader; accessed atomically.
	ticksWithNoLeader uint32
}

// NodeOptions provides node-level options. It is passed by value to NewNode,
// which fills in defaults for unset fields.
type NodeOptions struct {
	// ID is the node's ID, from its certificate's CN field.
	ID string
	// Addr is the address of this node's listener
	Addr string
	// ForceNewCluster defines if we have to force a new cluster
	// because we are recovering from a backup data directory.
	ForceNewCluster bool
	// JoinAddr is the cluster to join. May be an empty string to create
	// a standalone cluster.
	JoinAddr string
	// ForceJoin tells us to join even if already part of a cluster.
	ForceJoin bool
	// Config is the raft config. When nil, NewNode uses
	// DefaultNodeConfig().
	Config *raft.Config
	// StateDir is the directory to store durable state.
	StateDir string
	// TickInterval is the time interval between raft ticks. When zero,
	// NewNode uses one second.
	TickInterval time.Duration
	// ClockSource is a Clock interface to use as a time base.
	// Leave this nil except for tests that are designed not to run in real
	// time.
	ClockSource clock.Clock
	// SendTimeout is the timeout on the sending messages to other raft
	// nodes. Leave this as 0 to get the default value (two seconds).
	SendTimeout time.Duration
	// TLSCredentials are the transport credentials used when dialing
	// other members and when configuring the transport.
	TLSCredentials credentials.TransportCredentials
	// KeyRotator supplies the raft data encryption keys.
	KeyRotator EncryptionKeyRotator
	// DisableStackDump prevents Run from dumping goroutine stacks when the
	// store becomes stuck.
	DisableStackDump bool
}

// init seeds the global math/rand source from the wall clock so that the
// randomly chosen raft IDs differ between process runs.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}

// NewNode generates a new Raft node from opts.
//
// Unset options are defaulted: a nil Config falls back to
// DefaultNodeConfig(), a zero TickInterval becomes one second and a zero
// SendTimeout becomes two seconds. The returned node has its storage,
// channels, ticker and cancel function initialized, but the underlying raft
// node is not started until JoinAndStart is called.
func NewNode(opts NodeOptions) *Node {
	base := opts.Config
	if base == nil {
		base = DefaultNodeConfig()
	}
	if opts.TickInterval == 0 {
		opts.TickInterval = time.Second
	}
	if opts.SendTimeout == 0 {
		opts.SendTimeout = 2 * time.Second
	}

	memStorage := raft.NewMemoryStorage()

	// Copy only the tunables from the supplied config; the storage is
	// always the node's own in-memory storage.
	raftCfg := &raft.Config{
		ElectionTick:    base.ElectionTick,
		HeartbeatTick:   base.HeartbeatTick,
		Storage:         memStorage,
		MaxSizePerMsg:   base.MaxSizePerMsg,
		MaxInflightMsgs: base.MaxInflightMsgs,
		Logger:          base.Logger,
		CheckQuorum:     base.CheckQuorum,
	}

	n := &Node{
		cluster:             membership.NewCluster(),
		raftStore:           memStorage,
		opts:                opts,
		Config:              raftCfg,
		doneCh:              make(chan struct{}),
		RemovedFromRaft:     make(chan struct{}),
		stopped:             make(chan struct{}),
		leadershipBroadcast: watch.NewQueue(),
		keyRotator:          opts.KeyRotator,
	}
	n.memoryStore = store.NewMemoryStore(n)

	// Use the real clock unless a clock source was injected.
	timeSource := opts.ClockSource
	if timeSource == nil {
		timeSource = clock.NewClock()
	}
	n.ticker = timeSource.NewTicker(opts.TickInterval)

	n.reqIDGen = idutil.NewGenerator(uint16(n.Config.ID), time.Now())
	n.wait = newWait()

	// cancelFunc may be invoked from several places; the Once makes sure
	// the stopped channel is closed a single time.
	var closeStopped sync.Once
	n.cancelFunc = func() {
		closeStopped.Do(func() {
			close(n.stopped)
		})
	}

	return n
}

// IsIDRemoved reports whether the member with the given raft id has been
// removed from the cluster, as recorded by the membership cluster.
// Part of transport.Raft interface.
func (n *Node) IsIDRemoved(id uint64) bool {
	removed := n.cluster.IsIDRemoved(id)
	return removed
}

// NodeRemoved signals that this node was removed from the cluster and should
// stop. The first call clears the membership flag and closes
// RemovedFromRaft; later calls are no-ops.
// Part of transport.Raft interface.
func (n *Node) NodeRemoved() {
	markRemoved := func() {
		atomic.StoreUint32(&n.isMember, 0)
		close(n.RemovedFromRaft)
	}
	n.removeRaftOnce.Do(markRemoved)
}

// ReportSnapshot forwards the snapshot status for member id to the
// underlying raft node.
// Part of transport.Raft interface.
func (n *Node) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
	underlying := n.raftNode
	underlying.ReportSnapshot(id, status)
}

// ReportUnreachable tells the underlying raft node that the member with the
// given id could not be reached.
// Part of transport.Raft interface.
func (n *Node) ReportUnreachable(id uint64) {
	underlying := n.raftNode
	underlying.ReportUnreachable(id)
}

// SetAddr provides the raft node's address. This can be used in cases where
// opts.Addr was not provided to NewNode, for example when a port was not bound
// until after the raft node was created.
//
// The address is always recorded in the node options. If the node is already
// a cluster member, the local membership record is updated as well and, once
// this node has signalled leadership, a configuration change carrying the new
// address is submitted. The call returns ctx.Err() if ctx is cancelled or the
// node stops while waiting for leadership.
func (n *Node) SetAddr(ctx context.Context, addr string) error {
	n.addrLock.Lock()
	defer n.addrLock.Unlock()

	n.opts.Addr = addr

	if !n.IsMember() {
		return nil
	}

	self := &api.RaftMember{
		RaftID: n.Config.ID,
		NodeID: n.opts.ID,
		Addr:   addr,
	}
	if err := n.cluster.UpdateMember(n.Config.ID, self); err != nil {
		return err
	}

	// If the raft node is running, submit a configuration change
	// with the new address.

	// TODO(aaronl): Currently, this node must be the leader to
	// submit this configuration change. This works for the initial
	// use cases (single-node cluster late binding ports, or calling
	// SetAddr before joining a cluster). In the future, we may want
	// to support having a follower proactively change its remote
	// address.

	leadershipCh, cancelWatch := n.SubscribeLeadership()
	defer cancelWatch()

	ctx, cancelCtx := n.WithContext(ctx)
	defer cancelCtx()

	// Block until leadership has been signalled, or give up when the
	// context ends.
	for leading := atomic.LoadUint32(&n.signalledLeadership) == 1; !leading; {
		select {
		case change := <-leadershipCh:
			leading = change == IsLeader
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	return n.updateNodeBlocking(ctx, n.Config.ID, addr)
}

// WithContext derives a cancellable context from ctx that is additionally
// cancelled when the node's stopped channel is closed. The caller must invoke
// the returned cancel function to release the watcher goroutine.
func (n *Node) WithContext(ctx context.Context) (context.Context, context.CancelFunc) {
	derived, cancel := context.WithCancel(ctx)

	go func() {
		select {
		case <-n.stopped:
			// Node is stopping: propagate to everything using derived.
			cancel()
		case <-derived.Done():
			// Already cancelled by the parent or the caller.
		}
	}()
	return derived, cancel
}

// initTransport creates the peer transport from the node's options and
// stores it on the node. The transport's heartbeat interval is the election
// tick count multiplied by the tick interval.
func (n *Node) initTransport() {
	heartbeat := time.Duration(n.Config.ElectionTick) * n.opts.TickInterval
	n.transport = transport.New(&transport.Config{
		HeartbeatInterval: heartbeat,
		SendTimeout:       n.opts.SendTimeout,
		Credentials:       n.opts.TLSCredentials,
		Raft:              n,
	})
}

// JoinAndStart joins and starts the raft server.
//
// It loads any persisted state and then starts the underlying raft node in
// one of three ways:
//   - persisted state was found: restart from it (re-joining first when both
//     JoinAddr and ForceJoin are set);
//   - no persisted state and no JoinAddr: bootstrap a new single-member
//     cluster with a self-assigned random ID;
//   - no persisted state and a JoinAddr: join the existing cluster.
//
// On success the node is marked as a member. On failure the node is
// cancelled and torn down via done().
func (n *Node) JoinAndStart(ctx context.Context) (err error) {
	ctx, cancel := n.WithContext(ctx)
	defer func() {
		cancel()
		if err == nil {
			atomic.StoreUint32(&n.isMember, 1)
			return
		}
		n.stopMu.Lock()
		// to shutdown transport
		n.cancelFunc()
		n.stopMu.Unlock()
		n.done()
	}()

	loadErr := n.loadAndStart(ctx, n.opts.ForceNewCluster)
	// A missing WAL is not a failure: it just means there is no state to
	// restore from.
	if loadErr != nil && loadErr != storage.ErrNoWAL {
		return loadErr
	}

	snapshot, err := n.raftStore.Snapshot()
	// Snapshot never returns an error
	if err != nil {
		panic("could not get snapshot of raft store")
	}

	meta := snapshot.Metadata
	n.confState = meta.ConfState
	n.appliedIndex = meta.Index
	n.snapshotMeta = meta
	n.writtenWALIndex, _ = n.raftStore.LastIndex() // lastIndex always returns nil as an error

	n.addrLock.Lock()
	defer n.addrLock.Unlock()

	// override the module field entirely, since etcd/raft is not exactly a submodule
	n.Config.Logger = log.G(ctx).WithField("module", "raft")

	switch {
	case loadErr == nil:
		// restore from snapshot
		if n.opts.JoinAddr != "" && n.opts.ForceJoin {
			if joinErr := n.joinCluster(ctx); joinErr != nil {
				return errors.Wrap(joinErr, "failed to rejoin cluster")
			}
		}
		n.campaignWhenAble = true
		n.initTransport()
		n.raftNode = raft.RestartNode(n.Config)

	case n.opts.JoinAddr == "":
		// First member in the cluster, self-assign ID
		n.Config.ID = uint64(rand.Int63()) + 1
		self, logsErr := n.newRaftLogs(n.opts.ID)
		if logsErr != nil {
			return logsErr
		}
		n.campaignWhenAble = true
		n.initTransport()
		n.raftNode = raft.StartNode(n.Config, []raft.Peer{self})

	default:
		// join to existing cluster
		if joinErr := n.joinCluster(ctx); joinErr != nil {
			return joinErr
		}
		if _, logsErr := n.newRaftLogs(n.opts.ID); logsErr != nil {
			return logsErr
		}
		n.initTransport()
		n.raftNode = raft.StartNode(n.Config, nil)
	}

	return nil
}

// joinCluster asks the member at opts.JoinAddr to add this node to the
// cluster. On success it records the assigned raft ID in the config and
// keeps the returned member list for registration when Run starts. It fails
// immediately when the node's own address is not known.
func (n *Node) joinCluster(ctx context.Context) error {
	ownAddr := n.opts.Addr
	if ownAddr == "" {
		return errors.New("attempted to join raft cluster without knowing own address")
	}

	conn, err := dial(n.opts.JoinAddr, "tcp", n.opts.TLSCredentials, 10*time.Second)
	if err != nil {
		return err
	}
	defer conn.Close()
	membershipClient := api.NewRaftMembershipClient(conn)

	// Bound the join RPC by the node's request timeout.
	joinCtx, joinCancel := context.WithTimeout(ctx, n.reqTimeout())
	defer joinCancel()

	request := &api.JoinRequest{Addr: ownAddr}
	resp, err := membershipClient.Join(joinCtx, request)
	if err != nil {
		return err
	}

	n.Config.ID, n.bootstrapMembers = resp.RaftID, resp.Members
	return nil
}

// DefaultNodeConfig returns a freshly allocated default raft config for a
// node. Callers may modify the returned value before passing it to NewNode.
func DefaultNodeConfig() *raft.Config {
	cfg := new(raft.Config)
	cfg.HeartbeatTick = 1
	cfg.ElectionTick = 3
	cfg.MaxSizePerMsg = math.MaxUint16
	cfg.MaxInflightMsgs = 256
	cfg.Logger = log.L
	cfg.CheckQuorum = true
	return cfg
}

// DefaultRaftConfig returns the api.RaftConfig used when the cluster object
// does not provide one.
func DefaultRaftConfig() api.RaftConfig {
	var cfg api.RaftConfig
	cfg.KeepOldSnapshots = 0
	cfg.SnapshotInterval = 10000
	cfg.LogEntriesForSlowFollowers = 500
	cfg.ElectionTick = 3
	cfg.HeartbeatTick = 1
	return cfg
}

// MemoryStore returns the node's in-memory store, which is kept in sync with
// the raft log.
func (n *Node) MemoryStore() *store.MemoryStore {
	memStore := n.memoryStore
	return memStore
}

// done releases the node's resources: it clears the membership view, stops
// the ticker, closes the broadcast queues and the memory store, stops the
// transport if one was created, and finally closes doneCh to signal that the
// node is fully stopped.
func (n *Node) done() {
	n.cluster.Clear()

	n.ticker.Stop()
	n.leadershipBroadcast.Close()
	n.cluster.PeersBroadcast.Close()
	n.memoryStore.Close()
	// The transport only exists if JoinAndStart got far enough to create it.
	if tr := n.transport; tr != nil {
		tr.Stop()
	}

	close(n.doneCh)
}

// ClearData marks the node so that Run deletes its WALs, snapshots, and keys
// during shutdown.
func (n *Node) ClearData() {
	const wipeOnShutdown = true
	n.clearData = wipeOnShutdown
}

// Run is the main loop for a Raft node, it goes along the state machine,
// acting on the messages received from other Raft nodes in the cluster.
//
// Before entering the loop it registers the members obtained while joining
// (bootstrapMembers). The loop then reacts to ticker ticks, raft Ready
// batches, finished snapshots, key-rotation notifications and context
// cancellation.
//
// Run returns nil when ctx is cancelled and a wrapped error when entries
// cannot be saved to storage. On return it stops the node, optionally clears
// persisted data (see ClearData), and calls done().
func (n *Node) Run(ctx context.Context) error {
	ctx = log.WithLogger(ctx, logrus.WithField("raft_id", fmt.Sprintf("%x", n.Config.ID)))
	ctx, cancel := context.WithCancel(ctx)

	for _, node := range n.bootstrapMembers {
		if err := n.registerNode(node); err != nil {
			log.G(ctx).WithError(err).Errorf("failed to register member %x", node.RaftID)
		}
	}

	// Shutdown sequence, executed however the loop exits.
	defer func() {
		cancel()
		n.stop(ctx)
		if n.clearData {
			// Delete WAL and snapshots, since they are no longer
			// usable.
			if err := n.raftLogger.Clear(ctx); err != nil {
				log.G(ctx).WithError(err).Error("failed to move wal after node removal")
			}
			// clear out the DEKs
			if err := n.keyRotator.UpdateKeys(EncryptionKeys{}); err != nil {
				log.G(ctx).WithError(err).Error("could not remove DEKs")
			}
		}
		n.done()
	}()

	// wasLeader tracks the leadership state seen in the previous Ready
	// batches so that transitions can be detected.
	wasLeader := false
	// Allow at most one wedged-store leadership transfer per minute.
	transferLeadershipLimit := rate.NewLimiter(rate.Every(time.Minute), 1)

	for {
		select {
		case <-n.ticker.C():
			n.raftNode.Tick()

			// Count consecutive ticks without a leader; reset as soon
			// as one is known.
			if n.leader() == raft.None {
				atomic.AddUint32(&n.ticksWithNoLeader, 1)
			} else {
				atomic.StoreUint32(&n.ticksWithNoLeader, 0)
			}
		case rd := <-n.raftNode.Ready():
			raftConfig := n.getCurrentRaftConfig()

			// Save entries to storage
			if err := n.saveToStorage(ctx, &raftConfig, rd.HardState, rd.Entries, rd.Snapshot); err != nil {
				return errors.Wrap(err, "failed to save entries to storage")
			}

			// If we are (still) the leader and the memory store reports
			// being wedged, dump stacks (unless disabled) and hand
			// leadership to the longest-active member, subject to the
			// rate limit above.
			if wasLeader &&
				(rd.SoftState == nil || rd.SoftState.RaftState == raft.StateLeader) &&
				n.memoryStore.Wedged() &&
				transferLeadershipLimit.Allow() {
				if !n.opts.DisableStackDump {
					signal.DumpStacks("")
				}
				transferee, err := n.transport.LongestActive()
				if err != nil {
					log.G(ctx).WithError(err).Error("failed to get longest-active member")
				} else {
					log.G(ctx).Error("data store lock held too long - transferring leadership")
					n.raftNode.TransferLeadership(ctx, n.Config.ID, transferee)
				}
			}

			for _, msg := range rd.Messages {
				// Send raft messages to peers
				if err := n.transport.Send(msg); err != nil {
					log.G(ctx).WithError(err).Error("failed to send message to member")
				}
			}

			// Apply snapshot to memory store. The snapshot
			// was applied to the raft store in
			// saveToStorage.
			if !raft.IsEmptySnap(rd.Snapshot) {
				// Load the snapshot data into the store
				if err := n.restoreFromSnapshot(ctx, rd.Snapshot.Data); err != nil {
					log.G(ctx).WithError(err).Error("failed to restore cluster from snapshot")
				}
				n.appliedIndex = rd.Snapshot.Metadata.Index
				n.snapshotMeta = rd.Snapshot.Metadata
				n.confState = rd.Snapshot.Metadata.ConfState
			}

			// If we cease to be the leader, we must cancel any
			// proposals that are currently waiting for a quorum to
			// acknowledge them. It is still possible for these to
			// become committed, but if that happens we will apply
			// them as any follower would.

			// It is important that we cancel these proposals before
			// calling processCommitted, so processCommitted does
			// not deadlock.

			if rd.SoftState != nil {
				if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
					wasLeader = false
					if atomic.LoadUint32(&n.signalledLeadership) == 1 {
						atomic.StoreUint32(&n.signalledLeadership, 0)
						n.leadershipBroadcast.Publish(IsFollower)
					}

					// It is important that we set n.signalledLeadership to 0
					// before calling n.wait.cancelAll. When a new raft
					// request is registered, it checks n.signalledLeadership
					// afterwards, and cancels the registration if it is 0.
					// If cancelAll was called first, this call might run
					// before the new request registers, but
					// signalledLeadership would be set after the check.
					// Setting signalledLeadership before calling cancelAll
					// ensures that if a new request is registered during
					// this transition, it will either be cancelled by
					// cancelAll, or by its own check of signalledLeadership.
					n.wait.cancelAll()
				} else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
					wasLeader = true
				}
			}

			// Process committed entries
			for _, entry := range rd.CommittedEntries {
				if err := n.processCommitted(ctx, entry); err != nil {
					log.G(ctx).WithError(err).Error("failed to process committed entries")
				}
			}

			// in case the previous attempt to update the key failed
			n.maybeMarkRotationFinished(ctx)

			// Trigger a snapshot every once in awhile: either because a
			// key rotation needs one, or because SnapshotInterval entries
			// have been applied since the last snapshot. Never start one
			// while another is still in progress.
			if n.snapshotInProgress == nil &&
				(n.needsSnapshot(ctx) || raftConfig.SnapshotInterval > 0 &&
					n.appliedIndex-n.snapshotMeta.Index >= raftConfig.SnapshotInterval) {
				n.doSnapshot(ctx, raftConfig)
			}

			if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
				// If all the entries in the log have become
				// committed, broadcast our leadership status.
				if n.caughtUp() {
					atomic.StoreUint32(&n.signalledLeadership, 1)
					n.leadershipBroadcast.Publish(IsLeader)
				}
			}

			// Advance the state machine
			n.raftNode.Advance()

			// On the first startup, or if we are the only
			// registered member after restoring from the state,
			// campaign to be the leader.
			if n.campaignWhenAble {
				members := n.cluster.Members()
				// The flag is cleared as soon as any member is known,
				// whether or not we end up campaigning.
				if len(members) >= 1 {
					n.campaignWhenAble = false
				}
				if len(members) == 1 && members[n.Config.ID] != nil {
					n.raftNode.Campaign(ctx)
				}
			}

		case snapshotMeta := <-n.snapshotInProgress:
			// A snapshot finished. Record it if it is newer than the
			// one we know about and garbage-collect older data.
			raftConfig := n.getCurrentRaftConfig()
			if snapshotMeta.Index > n.snapshotMeta.Index {
				n.snapshotMeta = snapshotMeta
				if err := n.raftLogger.GC(snapshotMeta.Index, snapshotMeta.Term, raftConfig.KeepOldSnapshots); err != nil {
					log.G(ctx).WithError(err).Error("failed to clean up old snapshots and WALs")
				}
			}
			n.snapshotInProgress = nil
			n.maybeMarkRotationFinished(ctx)
			if n.rotationQueued && n.needsSnapshot(ctx) {
				// there was a key rotation that took place before while the snapshot
				// was in progress - we have to take another snapshot and encrypt with the new key
				n.rotationQueued = false
				n.doSnapshot(ctx, raftConfig)
			}
		case <-n.keyRotator.RotationNotify():
			// There are 2 separate checks:  rotationQueued, and n.needsSnapshot().
			// We set rotationQueued so that when we are notified of a rotation, we try to
			// do a snapshot as soon as possible.  However, if there is an error while doing
			// the snapshot, we don't want to hammer the node attempting to do snapshots over
			// and over.  So if doing a snapshot fails, wait until the next entry comes in to
			// try again.
			switch {
			case n.snapshotInProgress != nil:
				n.rotationQueued = true
			case n.needsSnapshot(ctx):
				n.doSnapshot(ctx, n.getCurrentRaftConfig())
			}
		case <-ctx.Done():
			return nil
		}
	}
}

// restoreFromSnapshot reconciles the cluster membership and the transport
// peers with the membership encoded in the snapshot data.
//
// Members listed as removed in the snapshot are removed; members known
// locally but absent from the snapshot are cleared; every member in the
// snapshot is (re-)registered. Only a failure to decode the snapshot is
// returned as an error; individual peer failures are logged.
func (n *Node) restoreFromSnapshot(ctx context.Context, data []byte) error {
	snapCluster, err := n.clusterSnapshot(data)
	if err != nil {
		return err
	}

	// Start from everything we currently know and strike out what the
	// snapshot still contains; what remains is stale.
	stale := n.cluster.Members()
	for _, kept := range snapCluster.Members {
		delete(stale, kept.RaftID)
	}

	for _, removedID := range snapCluster.Removed {
		n.cluster.RemoveMember(removedID)
		n.transport.RemovePeer(removedID)
		delete(stale, removedID)
	}

	for staleID, staleMember := range stale {
		n.cluster.ClearMember(staleID)
		if err := n.transport.RemovePeer(staleMember.RaftID); err != nil {
			log.G(ctx).WithError(err).Errorf("failed to remove peer %x from transport", staleMember.RaftID)
		}
	}

	for _, snapMember := range snapCluster.Members {
		record := &api.RaftMember{
			RaftID: snapMember.RaftID,
			NodeID: snapMember.NodeID,
			Addr:   snapMember.Addr,
		}
		if err := n.registerNode(record); err != nil {
			log.G(ctx).WithError(err).Error("failed to register node from snapshot")
		}
	}
	return nil
}

// needsSnapshot reports whether a snapshot should be taken now in order to
// complete a DEK rotation.
//
// If no rotation is being tracked and the key rotator requests one with a
// pending key, the logger is switched to the pending key and the index the
// next snapshot must reach is recorded. The function returns true once the
// applied index has reached that recorded index.
func (n *Node) needsSnapshot(ctx context.Context) bool {
	if n.waitForAppliedIndex == 0 && n.keyRotator.NeedsRotation() {
		if pending := n.keyRotator.GetKeys().PendingDEK; pending != nil {
			n.raftLogger.RotateEncryptionKey(pending)
			// we want to wait for the last index written with the old DEK to be committed, else a snapshot taken
			// may have an index less than the index of a WAL written with an old DEK.  We want the next snapshot
			// written with the new key to supercede any WAL written with an old DEK.
			target := n.writtenWALIndex
			// if there is already a snapshot at this index or higher, bump the wait index up to 1 higher than the current
			// snapshot index, because the rotation cannot be completed until the next snapshot
			if target <= n.snapshotMeta.Index {
				target = n.snapshotMeta.Index + 1
			}
			n.waitForAppliedIndex = target
			log.G(ctx).Debugf(
				"beginning raft DEK rotation - last indices written with the old key are (snapshot: %d, WAL: %d) - waiting for snapshot of index %d to be written before rotation can be completed", n.snapshotMeta.Index, n.writtenWALIndex, n.waitForAppliedIndex)
		}
	}

	// No rotation under way, or the required index has not been applied yet.
	if n.waitForAppliedIndex == 0 || n.waitForAppliedIndex > n.appliedIndex {
		return false
	}

	log.G(ctx).Debugf(
		"a snapshot at index %d is needed in order to complete raft DEK rotation - a snapshot with index >= %d can now be triggered",
		n.waitForAppliedIndex, n.appliedIndex)
	return true
}

// maybeMarkRotationFinished completes a DEK rotation once a snapshot at or
// beyond the awaited index exists. It promotes the logger's current key via
// the key rotator, resets the awaited index, and garbage-collects older
// snapshots and WALs. If updating the keys fails, the rotation stays pending
// so that a later call can retry.
func (n *Node) maybeMarkRotationFinished(ctx context.Context) {
	awaited := n.waitForAppliedIndex
	if awaited == 0 || awaited > n.snapshotMeta.Index {
		return
	}

	// this means we tried to rotate - so finish the rotation
	finalKeys := EncryptionKeys{CurrentDEK: n.raftLogger.EncryptionKey}
	if err := n.keyRotator.UpdateKeys(finalKeys); err != nil {
		log.G(ctx).WithError(err).Error("failed to update encryption keys after a successful rotation")
		return
	}

	log.G(ctx).Debugf(
		"a snapshot with index %d is available, which completes the DEK rotation requiring a snapshot of at least index %d - throwing away DEK and older snapshots encrypted with the old key",
		n.snapshotMeta.Index, n.waitForAppliedIndex)
	n.waitForAppliedIndex = 0

	if err := n.raftLogger.GC(n.snapshotMeta.Index, n.snapshotMeta.Term, 0); err != nil {
		log.G(ctx).WithError(err).Error("failed to remove old snapshots and WALs that were written with the previous raft DEK")
	}
}

// getCurrentRaftConfig returns the raft configuration stored on the default
// cluster object in the memory store. It falls back to DefaultRaftConfig()
// when the lookup fails or does not yield exactly one cluster.
func (n *Node) getCurrentRaftConfig() api.RaftConfig {
	current := DefaultRaftConfig()

	lookup := func(readTx store.ReadTx) {
		found, err := store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
		if err != nil || len(found) != 1 {
			return
		}
		current = found[0].Spec.Raft
	}
	n.memoryStore.View(lookup)

	return current
}

// Cancel interrupts all ongoing proposals, and prevents new ones from
// starting. This is useful for the shutdown sequence because it allows
// the manager to shut down raft-dependent services that might otherwise
// block on shutdown if quorum isn't met. Then the raft node can be completely
// shut down once no more code is using it.
//
// It is safe to call more than once; the stopped channel is closed only on
// the first call.
func (n *Node) Cancel() {
	signalStop := n.cancelFunc
	signalStop()
}

// Done returns a receive-only channel that is closed once the raft node is
// fully stopped.
func (n *Node) Done() <-chan struct{} {
	var finished <-chan struct{} = n.doneCh
	return finished
}

// stop shuts the node down while holding the stop lock exclusively: it
// signals cancellation, waits for outstanding proposals and async tasks,
// stops the raft node and the ticker, closes the raft logger, and clears the
// membership flag.
func (n *Node) stop(ctx context.Context) {
	n.stopMu.Lock()
	defer n.stopMu.Unlock()

	// Same effect as Cancel(): close the stopped channel (once).
	n.cancelFunc()

	// Nothing may still be using the raft node when it is stopped.
	n.waitProp.Wait()
	n.asyncTasks.Wait()

	n.raftNode.Stop()
	n.ticker.Stop()
	n.raftLogger.Close(ctx)

	atomic.StoreUint32(&n.isMember, 0)
	// TODO(stevvooe): Handle ctx.Done()
}

// isLeader reports whether this node is a cluster member and the raft status
// names it as the leader. It does not take the stop lock; callers are
// expected to hold it.
func (n *Node) isLeader() bool {
	return n.IsMember() && n.Status().Lead == n.Config.ID
}

// IsLeader reports whether this node is currently the leader. It holds the
// stop lock for reading while checking.
func (n *Node) IsLeader() bool {
	n.stopMu.RLock()
	defer n.stopMu.RUnlock()

	leading := n.isLeader()
	return leading
}

// leader returns the raft ID of the leader as reported by the node status.
// It neither takes the stop lock nor checks membership; both are the
// caller's responsibility.
func (n *Node) leader() uint64 {
	status := n.Status()
	return status.Lead
}

// Leader returns the raft ID of the current leader while holding the stop
// lock for reading.
//
// It returns ErrNoRaftMember when this node is not a cluster member and
// ErrNoClusterLeader when no leader is known; in both cases the returned ID
// is raft.None.
func (n *Node) Leader() (uint64, error) {
	n.stopMu.RLock()
	defer n.stopMu.RUnlock()

	if !n.IsMember() {
		return raft.None, ErrNoRaftMember
	}

	switch id := n.leader(); id {
	case raft.None:
		return raft.None, ErrNoClusterLeader
	default:
		return id, nil
	}
}

// ReadyForProposals reports whether the node has broadcast that it became
// the leader, which means it is ready to accept proposals.
func (n *Node) ReadyForProposals() bool {
	signalled := atomic.LoadUint32(&n.signalledLeadership)
	return signalled == 1
}

// caughtUp reports whether the applied index has reached the last index in
// the raft store.
func (n *Node) caughtUp() bool {
	// LastIndex always returns a nil error, so it is deliberately ignored.
	last, _ := n.raftStore.LastIndex()
	return last <= n.appliedIndex
}

// Join asks a member of the raft cluster to propose a configuration change
// that adds the caller as a member, thus beginning the log replication
// process. It is called by an aspiring member on an existing member.
//
// The request is refused with FailedPrecondition when this node is not a
// member or not the leader, and with InvalidArgument when the advertised
// address cannot be parsed. If the caller is already a member, only its
// address is updated (when it changed). The response carries the caller's
// raft ID and the current member list.
func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinResponse, error) {
	nodeInfo, err := ca.RemoteNode(ctx)
	if err != nil {
		return nil, err
	}

	fields := logrus.Fields{
		"node.id": nodeInfo.NodeID,
		"method":  "(*Node).Join",
		"raft_id": fmt.Sprintf("%x", n.Config.ID),
	}
	if forwarder := nodeInfo.ForwardedBy; forwarder != nil {
		fields["forwarder.id"] = forwarder.NodeID
	}
	logger := log.G(ctx).WithFields(fields)
	logger.Debug("")

	// can't stop the raft node while an async RPC is in progress
	n.stopMu.RLock()
	defer n.stopMu.RUnlock()

	n.membershipLock.Lock()
	defer n.membershipLock.Unlock()

	switch {
	case !n.IsMember():
		return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrNoRaftMember.Error())
	case !n.isLeader():
		return nil, grpc.Errorf(codes.FailedPrecondition, "%s", ErrLostLeadership.Error())
	}

	remoteAddr := req.Addr

	// If the joining node sent an address like 0.0.0.0:4242, automatically
	// determine its actual address based on the GRPC connection. This
	// avoids the need for a prospective member to know its own address.

	requestHost, requestPort, err := net.SplitHostPort(remoteAddr)
	if err != nil {
		return nil, grpc.Errorf(codes.InvalidArgument, "invalid address %s in raft join request", remoteAddr)
	}

	if ip := net.ParseIP(requestHost); ip != nil && ip.IsUnspecified() {
		observedHost, _, err := net.SplitHostPort(nodeInfo.RemoteAddr)
		if err != nil {
			return nil, err
		}
		remoteAddr = net.JoinHostPort(observedHost, requestPort)
	}

	// We do not bother submitting a configuration change for the
	// new member if we can't contact it back using its address
	if err := n.checkHealth(ctx, remoteAddr, 5*time.Second); err != nil {
		return nil, err
	}

	// If the peer is already a member of the cluster, we will only update
	// its information, not add it as a new member. Adding it again would
	// cause the quorum to be computed incorrectly.
	for _, existing := range n.cluster.Members() {
		if existing.NodeID != nodeInfo.NodeID {
			continue
		}
		if existing.Addr == remoteAddr {
			// Nothing changed; just hand back the current view.
			return n.joinResponse(existing.RaftID), nil
		}

		refreshed := &api.RaftMember{
			RaftID: existing.RaftID,
			NodeID: existing.NodeID,
			Addr:   remoteAddr,
		}
		if err := n.cluster.UpdateMember(existing.RaftID, refreshed); err != nil {
			return nil, err
		}

		if err := n.updateNodeBlocking(ctx, existing.RaftID, remoteAddr); err != nil {
			logger.WithError(err).Error("failed to update node address")
			return nil, err
		}

		logger.Info("updated node address")
		return n.joinResponse(existing.RaftID), nil
	}

	// Find a unique ID for the joining member: keep drawing until the ID
	// is neither in use nor on the removed list.
	raftID := uint64(rand.Int63()) + 1
	for n.cluster.GetMember(raftID) != nil || n.cluster.IsIDRemoved(raftID) {
		raftID = uint64(rand.Int63()) + 1
	}

	if err := n.addMember(ctx, remoteAddr, raftID, nodeInfo.NodeID); err != nil {
		logger.WithError(err).Errorf("failed to add member %x", raftID)
		return nil, err
	}

	logger.Debug("node joined")

	return n.joinResponse(raftID), nil
}

// joinResponse builds the reply to a join request: the raft ID assigned to
// the joining node together with a copy of every currently known member.
func (n *Node) joinResponse(raftID uint64) *api.JoinResponse {
	var roster []*api.RaftMember
	for _, known := range n.cluster.Members() {
		entry := &api.RaftMember{
			RaftID: known.RaftID,
			NodeID: known.NodeID,
			Addr:   known.Addr,
		}
		roster = append(roster, entry)
	}

	resp := &api.JoinResponse{RaftID: raftID, Members: roster}
	return resp
}

// checkHealth dials an aspiring member at its advertised address and asks its
// health service whether the "Raft" service is serving. A non-zero timeout
// bounds both the dial and the health check RPC. It returns an error when the
// member cannot be dialed, the check fails, or the reported status is not
// SERVING.
func (n *Node) checkHealth(ctx context.Context, addr string, timeout time.Duration) error {
	conn, err := dial(addr, "tcp", n.opts.TLSCredentials, timeout)
	if err != nil {
		return err
	}

	defer conn.Close()

	if timeout != 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}

	request := &api.HealthCheckRequest{Service: "Raft"}
	resp, err := api.NewHealthClient(conn).Check(ctx, request)
	if err != nil {
		return errors.Wrap(err, "could not connect to prospective new cluster member using its advertised address")
	}
	if status := resp.Status; status != api.HealthCheckResponse_SERVING {
		return fmt.Errorf("health check returned status %s", status.String())
	}

	return nil
}

// addMember proposes a ConfChangeAddNode for a new member identified by
// raftID/nodeID and reachable at addr. The member description is marshalled
// into the change's Context. The call blocks in configure until the change
// has been processed or an error occurs.
func (n *Node) addMember(ctx context.Context, addr string, raftID uint64, nodeID string) error {
	member := &api.RaftMember{
		RaftID: raftID,
		NodeID: nodeID,
		Addr:   addr,
	}

	payload, err := member.Marshal()
	if err != nil {
		return err
	}

	// Wait for a raft round to process the configuration change
	return n.configure(ctx, raftpb.ConfChange{
		Type:    raftpb.ConfChangeAddNode,
		NodeID:  raftID,
		Context: payload,
	})
}

// updateNodeBlocking synchronously proposes a ConfChangeUpdateNode that sets
// the address of member id to addr. It fails if the member is not present in
// the local cluster view; otherwise it blocks in configure until the change
// has been processed or an error occurs.
func (n *Node) updateNodeBlocking(ctx context.Context, id uint64, addr string) error {
	current := n.cluster.GetMember(id)
	if current == nil {
		return errors.Errorf("member %x is not found for update", id)
	}

	updated := &api.RaftMember{
		RaftID: current.RaftID,
		NodeID: current.NodeID,
		Addr:   addr,
	}
	payload, err := updated.Marshal()
	if err != nil {
		return err
	}

	// Wait for a raft round to process the configuration change
	return n.configure(ctx, raftpb.ConfChange{
		Type:    raftpb.ConfChangeUpdateNode,
		NodeID:  id,
		Context: payload,
	})
}

// UpdateNode submits a configuration change to change a member's address.
//
// The update runs in a background goroutine so the caller (the transport) is
// not blocked; failures are only logged.
func (n *Node) UpdateNode(id uint64, addr string) {
	ctx, cancel := n.WithContext(context.Background())
	// spawn updating info in raft in background to unblock transport
	go func() {
		// The context must outlive UpdateNode itself: cancelling it when
		// UpdateNode returns would abort the configuration change before
		// the goroutine gets a chance to complete it. Release it only once
		// the blocking update has finished.
		defer cancel()
		if err := n.updateNodeBlocking(ctx, id, addr); err != nil {
			log.G(ctx).WithFields(logrus.Fields{"raft_id": n.Config.ID, "update_id": id}).WithError(err).Error("failed to update member address in cluster")
		}
	}()
}

// Leave is called by a member that wants to give up its raft membership. It
// is served by an active member of the cluster, which proposes the removal of
// req.Node.RaftID on the caller's behalf.
//
// It returns an InvalidArgument gRPC error when the request carries no node
// information, and otherwise propagates any error from identifying the remote
// node or from removing the member.
func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResponse, error) {
	if req.Node == nil {
		return nil, grpc.Errorf(codes.InvalidArgument, "no node information provided")
	}

	caller, err := ca.RemoteNode(ctx)
	if err != nil {
		return nil, err
	}

	ctx, cancel := n.WithContext(ctx)
	defer cancel()

	logFields := logrus.Fields{
		"node.id": caller.NodeID,
		"method":  "(*Node).Leave",
		"raft_id": fmt.Sprintf("%x", n.Config.ID),
	}
	if fwd := caller.ForwardedBy; fwd != nil {
		logFields["forwarder.id"] = fwd.NodeID
	}
	log.G(ctx).WithFields(logFields).Debug("")

	if err = n.removeMember(ctx, req.Node.RaftID); err != nil {
		return nil, err
	}
	return &api.LeaveResponse{}, nil
}

// CanRemoveMember reports whether member id can be removed, judged from the
// point of view of the current node: the members that would remain and that
// are reachable (the local node, plus every peer the transport considers
// active) must still form a quorum.
func (n *Node) CanRemoveMember(id uint64) bool {
	members := n.cluster.Members()

	// reachable managers after removal
	reachable := 0
	for _, m := range members {
		switch {
		case m.RaftID == id:
			// The member being removed does not count.
		case m.RaftID == n.Config.ID:
			// Local node from where the remove is issued
			reachable++
		case n.transport.Active(m.RaftID):
			reachable++
		}
	}

	quorum := (len(members)-1)/2 + 1
	return reachable >= quorum
}

// removeMember proposes the removal of member id from the cluster and waits
// in configure for the change to be processed.
//
// It returns ErrNoRaftMember if this node is not a member, ErrLostLeadership
// if it is not the leader, and ErrCannotRemoveMember if CanRemoveMember
// rejects the removal.
func (n *Node) removeMember(ctx context.Context, id uint64) error {
	// can't stop the raft node while an async RPC is in progress
	n.stopMu.RLock()
	defer n.stopMu.RUnlock()

	switch {
	case !n.IsMember():
		return ErrNoRaftMember
	case !n.isLeader():
		return ErrLostLeadership
	}

	n.membershipLock.Lock()
	defer n.membershipLock.Unlock()

	if removable := n.CanRemoveMember(id); !removable {
		return ErrCannotRemoveMember
	}

	return n.configure(ctx, raftpb.ConfChange{
		ID:      id,
		Type:    raftpb.ConfChangeRemoveNode,
		NodeID:  id,
		Context: []byte(""),
	})
}

// TransferLeadership asks raft to hand leadership over to the member the
// transport reports as longest active, then polls until a leader other than
// this node is observed.
//
// The whole operation is bounded by n.reqTimeout(). It returns
// ErrNoRaftMember or ErrLostLeadership when this node is not a member or not
// the leader, and the context error if no new leader shows up in time.
func (n *Node) TransferLeadership(ctx context.Context) error {
	ctx, cancelTransfer := context.WithTimeout(ctx, n.reqTimeout())
	defer cancelTransfer()

	n.stopMu.RLock()
	defer n.stopMu.RUnlock()

	switch {
	case !n.IsMember():
		return ErrNoRaftMember
	case !n.isLeader():
		return ErrLostLeadership
	}

	target, err := n.transport.LongestActive()
	if err != nil {
		return errors.Wrap(err, "failed to get longest-active member")
	}

	began := time.Now()
	n.raftNode.TransferLeadership(ctx, n.Config.ID, target)

	poll := time.NewTicker(n.opts.TickInterval / 10)
	defer poll.Stop()

	// Keep polling while there is no leader or we are still the leader.
	newLeader := n.leader()
	for newLeader == raft.None || newLeader == n.Config.ID {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-poll.C:
		}
		newLeader = n.leader()
	}

	log.G(ctx).Infof("raft: transfer leadership %x -> %x finished in %v", n.Config.ID, newLeader, time.Since(began))
	return nil
}

// RemoveMember submits a configuration change that removes member id from
// the raft cluster, after removeMember has checked that doing so would not
// cost the cluster its quorum. The supplied context is first wrapped with
// n.WithContext.
func (n *Node) RemoveMember(ctx context.Context, id uint64) error {
	nodeCtx, release := n.WithContext(ctx)
	defer release()

	return n.removeMember(nodeCtx, id)
}

// processRaftMessageLogger builds the log entry used by ProcessRaftMessage.
// It is only called at the point where something is actually logged, which
// is rare, so the cost of formatting fields and allocating a logger is not
// paid on the common path.
//
// The entry always carries "method"; "raft_id" is added once this node is a
// member, and "from" when the message has a sender.
func (n *Node) processRaftMessageLogger(ctx context.Context, msg *api.ProcessRaftMessageRequest) *logrus.Entry {
	details := logrus.Fields{"method": "(*Node).ProcessRaftMessage"}

	if n.IsMember() {
		details["raft_id"] = fmt.Sprintf("%x", n.Config.ID)
	}

	if hasSender := msg != nil && msg.Message != nil; hasSender {
		details["from"] = fmt.Sprintf("%x", msg.Message.From)
	}

	return log.G(ctx).WithFields(details)
}

// reportNewAddress tells the transport that peer id is now reachable at the
// host the current RPC came from, keeping the port of the address already
// known for that peer.
//
// It is a no-op (returning nil) when this node is not yet a member, when the
// context carries no peer information, or when the transport has no address
// recorded for the peer.
func (n *Node) reportNewAddress(ctx context.Context, id uint64) error {
	// too early
	if !n.IsMember() {
		return nil
	}

	remote, found := peer.FromContext(ctx)
	if !found {
		return nil
	}

	knownAddr, err := n.transport.PeerAddr(id)
	if err != nil {
		return err
	}
	if knownAddr == "" {
		// Don't know the address of the peer yet, so can't report an
		// update.
		return nil
	}

	host, _, err := net.SplitHostPort(remote.Addr.String())
	if err != nil {
		return err
	}
	_, port, err := net.SplitHostPort(knownAddr)
	if err != nil {
		return err
	}

	return n.transport.UpdatePeerAddr(id, net.JoinHostPort(host, port))
}

// ProcessRaftMessage feeds a message received from a peer into the local
// raft state machine by calling Step.
//
// Messages are dropped, with an empty successful response, when they are
// empty, when they are vote requests from unknown members or from members
// failing a health check, when they are forwarded proposals, or when they are
// addressed to a different raft ID. Messages from removed members are
// rejected with a NotFound gRPC error. A failing Step is only logged.
func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessageRequest) (*api.ProcessRaftMessageResponse, error) {
	if msg == nil || msg.Message == nil {
		n.processRaftMessageLogger(ctx, msg).Debug("received empty message")
		return &api.ProcessRaftMessageResponse{}, nil
	}

	// Don't process the message if this comes from
	// a node in the remove set
	if n.cluster.IsIDRemoved(msg.Message.From) {
		n.processRaftMessageLogger(ctx, msg).Debug("received message from removed member")
		return nil, grpc.Errorf(codes.NotFound, "%s", membership.ErrMemberRemoved.Error())
	}

	ctx, cancel := n.WithContext(ctx)
	defer cancel()

	// TODO(aaronl): Address changes are temporarily disabled.
	// See https://github.com/docker/docker/issues/30455.
	// This should be reenabled in the future with additional
	// safeguards (perhaps storing multiple addresses per node).
	//if err := n.reportNewAddress(ctx, msg.Message.From); err != nil {
	//	log.G(ctx).WithError(err).Errorf("failed to report new address of %x to transport", msg.Message.From)
	//}

	switch msg.Message.Type {
	case raftpb.MsgVote:
		// Reject vote requests from unreachable peers
		if n.cluster.GetMember(msg.Message.From) == nil {
			n.processRaftMessageLogger(ctx, msg).Debug("received message from unknown member")
			return &api.ProcessRaftMessageResponse{}, nil
		}

		if err := n.transport.HealthCheck(ctx, msg.Message.From); err != nil {
			n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("member which sent vote request failed health check")
			return &api.ProcessRaftMessageResponse{}, nil
		}

	case raftpb.MsgProp:
		// We don't accept forwarded proposals. Our
		// current architecture depends on only the leader
		// making proposals, so in-flight proposals can be
		// guaranteed not to conflict.
		n.processRaftMessageLogger(ctx, msg).Debug("dropped forwarded proposal")
		return &api.ProcessRaftMessageResponse{}, nil
	}

	// can't stop the raft node while an async RPC is in progress
	n.stopMu.RLock()
	defer n.stopMu.RUnlock()

	if !n.IsMember() {
		return &api.ProcessRaftMessageResponse{}, nil
	}

	if msg.Message.To != n.Config.ID {
		n.processRaftMessageLogger(ctx, msg).Errorf("received message intended for raft_id %x", msg.Message.To)
		return &api.ProcessRaftMessageResponse{}, nil
	}

	if err := n.raftNode.Step(ctx, *msg.Message); err != nil {
		n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("raft Step failed")
	}

	return &api.ProcessRaftMessageResponse{}, nil
}

// ResolveAddress returns the address at which the member with the requested
// raft ID can be reached.
//
// It returns ErrNoRaftMember when this node is not a member of a cluster, and
// a NotFound gRPC error when the requested raft ID is unknown.
func (n *Node) ResolveAddress(ctx context.Context, msg *api.ResolveAddressRequest) (*api.ResolveAddressResponse, error) {
	if !n.IsMember() {
		return nil, ErrNoRaftMember
	}

	caller, err := ca.RemoteNode(ctx)
	if err != nil {
		return nil, err
	}

	logFields := logrus.Fields{
		"node.id": caller.NodeID,
		"method":  "(*Node).ResolveAddress",
		"raft_id": fmt.Sprintf("%x", n.Config.ID),
	}
	if fwd := caller.ForwardedBy; fwd != nil {
		logFields["forwarder.id"] = fwd.NodeID
	}
	log.G(ctx).WithFields(logFields).Debug("")

	if target := n.cluster.GetMember(msg.RaftID); target != nil {
		return &api.ResolveAddressResponse{Addr: target.Addr}, nil
	}
	return nil, grpc.Errorf(codes.NotFound, "member %x not found", msg.RaftID)
}

// getLeaderConn returns the transport's connection to the current leader. It
// propagates the error from n.Leader(), returns raftselector.ErrIsLeader when
// this node is itself the leader, and wraps any error from the transport.
func (n *Node) getLeaderConn() (*grpc.ClientConn, error) {
	leaderID, err := n.Leader()
	switch {
	case err != nil:
		return nil, err
	case leaderID == n.Config.ID:
		return nil, raftselector.ErrIsLeader
	}

	leaderConn, err := n.transport.PeerConn(leaderID)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get connection to leader")
	}
	return leaderConn, nil
}

// LeaderConn returns the current connection to the cluster leader, or
// raftselector.ErrIsLeader if this node is the leader.
//
// If no connection is available right away it returns errLostQuorum when the
// node has gone more than lostQuorumTimeout ticks without a leader; otherwise
// it retries once per second until it succeeds, this node becomes the leader,
// or ctx is done.
func (n *Node) LeaderConn(ctx context.Context) (*grpc.ClientConn, error) {
	switch conn, err := n.getLeaderConn(); err {
	case nil:
		return conn, nil
	case raftselector.ErrIsLeader:
		return nil, err
	}

	if atomic.LoadUint32(&n.ticksWithNoLeader) > lostQuorumTimeout {
		return nil, errLostQuorum
	}

	retry := time.NewTicker(1 * time.Second)
	defer retry.Stop()
	for {
		select {
		case <-retry.C:
			switch conn, err := n.getLeaderConn(); err {
			case nil:
				return conn, nil
			case raftselector.ErrIsLeader:
				return nil, err
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}

// registerNode registers a node on the cluster memberlist and, for remote
// nodes, with the transport.
//
// Nodes whose raft ID has been removed are ignored. If the node is already a
// member, only an address change is applied. Otherwise the peer is added to
// the transport and then to the memberlist; if the memberlist rejects it, the
// peer is removed from the transport again.
//
// The local node is never added to the transport. Any error from the
// transport or the memberlist is returned to the caller.
func (n *Node) registerNode(node *api.RaftMember) error {
	if n.cluster.IsIDRemoved(node.RaftID) {
		return nil
	}

	member := &membership.Member{RaftMember: node}
	// Avoid opening a connection to the local node
	isLocal := node.RaftID == n.Config.ID

	if existingMember := n.cluster.GetMember(node.RaftID); existingMember != nil {
		// Member already exists

		// If the address is different from what we thought it was,
		// update it. This can happen if we just joined a cluster
		// and are adding ourself now with the remotely-reachable
		// address.
		if existingMember.Addr == node.Addr {
			return nil
		}
		if !isLocal {
			if err := n.transport.UpdatePeer(node.RaftID, node.Addr); err != nil {
				return err
			}
		}
		// Report a failed memberlist update instead of silently dropping it.
		return n.cluster.AddMember(member)
	}

	if !isLocal {
		if err := n.transport.AddPeer(node.RaftID, node.Addr); err != nil {
			return err
		}
	}

	if err := n.cluster.AddMember(member); err != nil {
		// Roll back the transport peer, but only if one was added above:
		// the local node never had one, and trying to remove it could
		// replace the real error with an unrelated one.
		if !isLocal {
			if rerr := n.transport.RemovePeer(node.RaftID); rerr != nil {
				return errors.Wrapf(rerr, "failed to remove peer after error %v", err)
			}
		}
		return err
	}

	return nil
}

// ProposeValue proposes the given store actions through raft and blocks until
// processInternalRaftRequest reports the outcome. The callback cb is handed
// to processInternalRaftRequest together with the request.
func (n *Node) ProposeValue(ctx context.Context, storeAction []api.StoreAction, cb func()) error {
	ctx, cancel := n.WithContext(ctx)
	defer cancel()

	req := &api.InternalRaftRequest{Action: storeAction}
	_, err := n.processInternalRaftRequest(ctx, req, cb)
	return err
}

// GetVersion returns the sequence information for the current raft round,
// taken from the commit index of the raft status. It returns nil when this
// node is not a member of a cluster.
func (n *Node) GetVersion() *api.Version {
	n.stopMu.RLock()
	defer n.stopMu.RUnlock()

	if n.IsMember() {
		return &api.Version{Index: n.Status().Commit}
	}
	return nil
}

// ChangesBetween returns the changes recorded after version "from", up to and
// including version "to". Only normal log entries that carry data and a
// non-nil set of store actions produce a change.
//
// An error is returned when the versions are out of order, when this node is
// not a raft member, when "to" is beyond the last stored index, or when the
// entries cannot be read from the raft store (for example because the log has
// been compacted) or decoded.
func (n *Node) ChangesBetween(from, to api.Version) ([]state.Change, error) {
	n.stopMu.RLock()
	defer n.stopMu.RUnlock()

	if to.Index < from.Index {
		return nil, errors.New("versions are out of order")
	}

	if !n.IsMember() {
		return nil, ErrNoRaftMember
	}

	// never returns error
	lastIndex, _ := n.raftStore.LastIndex()
	if lastIndex < to.Index {
		return nil, errors.New("last version is out of bounds")
	}

	entries, err := n.raftStore.Entries(from.Index+1, to.Index+1, math.MaxUint64)
	if err != nil {
		return nil, err
	}

	var changes []state.Change
	for _, entry := range entries {
		if entry.Type != raftpb.EntryNormal || entry.Data == nil {
			continue
		}

		req := &api.InternalRaftRequest{}
		if err := proto.Unmarshal(entry.Data, req); err != nil {
			return nil, errors.Wrap(err, "error umarshalling internal raft request")
		}
		if req.Action == nil {
			continue
		}

		changes = append(changes, state.Change{
			StoreActions: req.Action,
			Version:      api.Version{Index: entry.Index},
		})
	}

	return changes, nil
}

// SubscribePeers subscribes to peer updates in the cluster by watching the
// cluster's PeersBroadcast queue. Each update always carries the full list of
// peers. The returned cancel function ends the subscription.
func (n *Node) SubscribePeers() (q chan events.Event, cancel func()) {
	q, cancel = n.cluster.PeersBroadcast.Watch()
	return q, cancel
}

// GetMemberlist returns a snapshot of the current raft members keyed by raft
// ID. Each entry is annotated with whether the member is the leader and
// whether it is reachable: the local node is always reported reachable, and a
// peer is unreachable when the transport does not consider it active. If the
// leader cannot be determined, no real member is flagged as leader.
func (n *Node) GetMemberlist() map[uint64]*api.RaftMember {
	result := make(map[uint64]*api.RaftMember)
	members := n.cluster.Members()

	leaderID, err := n.Leader()
	if err != nil {
		leaderID = raft.None
	}

	for id, m := range members {
		status := api.RaftMemberStatus{
			Leader:       m.RaftID == leaderID,
			Reachability: api.RaftMemberStatus_REACHABLE,
		}
		if m.RaftID != n.Config.ID && !n.transport.Active(m.RaftID) {
			status.Reachability = api.RaftMemberStatus_UNREACHABLE
		}

		result[id] = &api.RaftMember{
			RaftID: m.RaftID,
			NodeID: m.NodeID,
			Addr:   m.Addr,
			Status: status,
		}
	}

	return result
}

// Status reports the status of the underlying etcd raft node.
func (n *Node) Status() raft.Status {
	current := n.raftNode.Status()
	return current
}

// GetMemberByNodeID looks a member up by its generic node ID (as opposed to
// its raft ID). It returns nil when no member has that node ID.
func (n *Node) GetMemberByNodeID(nodeID string) *membership.Member {
	for _, candidate := range n.cluster.Members() {
		if candidate.NodeID != nodeID {
			continue
		}
		return candidate
	}
	return nil
}

// IsMember reports whether the raft node has effectively joined a cluster of
// existing members. The flag is read atomically.
func (n *Node) IsMember() bool {
	flag := atomic.LoadUint32(&n.isMember)
	return flag == 1
}

// saveToStorage persists one batch of raft state: an optional snapshot, the
// hard state and the log entries.
//
// A non-empty snapshot is saved through the raft logger and then applied to
// the raft store; old snapshots and WALs are cleaned up in between, keeping
// raftConfig.KeepOldSnapshots of them. The hard state and entries are then
// saved through the raft logger, n.writtenWALIndex is advanced to the index
// of the last entry, and finally the entries are appended to the raft store.
//
// Every failure is returned wrapped with a description of the failing step,
// except a failed cleanup, which is only logged.
func (n *Node) saveToStorage(
	ctx context.Context,
	raftConfig *api.RaftConfig,
	hardState raftpb.HardState,
	entries []raftpb.Entry,
	snapshot raftpb.Snapshot,
) (err error) {

	if !raft.IsEmptySnap(snapshot) {
		if err := n.raftLogger.SaveSnapshot(snapshot); err != nil {
			return errors.Wrap(err, "failed to save snapshot")
		}
		// Cleanup is best effort: a failure is logged and saving continues.
		if err := n.raftLogger.GC(snapshot.Metadata.Index, snapshot.Metadata.Term, raftConfig.KeepOldSnapshots); err != nil {
			log.G(ctx).WithError(err).Error("unable to clean old snapshots and WALs")
		}
		if err = n.raftStore.ApplySnapshot(snapshot); err != nil {
			return errors.Wrap(err, "failed to apply snapshot on raft node")
		}
	}

	if err := n.raftLogger.SaveEntries(hardState, entries); err != nil {
		return errors.Wrap(err, "failed to save raft log entries")
	}

	// Record the highest entry index handed to the raft logger so far; the
	// value never moves backwards.
	if len(entries) > 0 {
		lastIndex := entries[len(entries)-1].Index
		if lastIndex > n.writtenWALIndex {
			n.writtenWALIndex = lastIndex
		}
	}

	if err = n.raftStore.Append(entries); err != nil {
		return errors.Wrap(err, "failed to append raft log entries")
	}

	return nil
}

// processInternalRaftRequest sends a message to nodes participating
// in the raft to apply a log entry and then waits for it to be applied
// on the server. It will block until the update is performed, there is
// an error or until the raft node finalizes all the proposals on node
// shutdown.
//
// The request is assigned a fresh ID and registered with n.wait together
// with cb before it is proposed. It returns ErrStopped when the node is not a
// member, ErrLostLeadership when the node has not signalled leadership or the
// wait was canceled, ErrRequestTooLarge when the marshalled request exceeds
// store.MaxTransactionBytes, and the context error when ctx ends first.
func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRaftRequest, cb func()) (proto.Message, error) {
	// The membership check and the waitProp registration happen under the
	// stopMu read lock; the lock is released before proposing.
	n.stopMu.RLock()
	if !n.IsMember() {
		n.stopMu.RUnlock()
		return nil, ErrStopped
	}
	n.waitProp.Add(1)
	defer n.waitProp.Done()
	n.stopMu.RUnlock()

	r.ID = n.reqIDGen.Next()

	// This must be derived from the context which is cancelled by stop()
	// to avoid a deadlock on shutdown.
	waitCtx, cancel := context.WithCancel(ctx)

	// cancel is handed over to the wait registry along with the callback.
	// NOTE(review): presumably the registry invokes it when the wait is
	// canceled — confirm in the wait implementation.
	ch := n.wait.register(r.ID, cb, cancel)

	// Do this check after calling register to avoid a race.
	if atomic.LoadUint32(&n.signalledLeadership) != 1 {
		n.wait.cancel(r.ID)
		return nil, ErrLostLeadership
	}

	data, err := r.Marshal()
	if err != nil {
		n.wait.cancel(r.ID)
		return nil, err
	}

	// Refuse requests larger than the store's transaction size limit.
	if len(data) > store.MaxTransactionBytes {
		n.wait.cancel(r.ID)
		return nil, ErrRequestTooLarge
	}

	err = n.raftNode.Propose(waitCtx, data)
	if err != nil {
		n.wait.cancel(r.ID)
		return nil, err
	}

	// Wait for the result, or for either context to end. In the two context
	// cases the wait is canceled first and the channel is then read once
	// more, so a result that was already triggered is still returned.
	select {
	case x, ok := <-ch:
		if !ok {
			return nil, ErrLostLeadership
		}
		return x.(proto.Message), nil
	case <-waitCtx.Done():
		n.wait.cancel(r.ID)
		// if channel is closed, wait item was canceled, otherwise it was triggered
		x, ok := <-ch
		if !ok {
			return nil, ErrLostLeadership
		}
		return x.(proto.Message), nil
	case <-ctx.Done():
		n.wait.cancel(r.ID)
		// if channel is closed, wait item was canceled, otherwise it was triggered
		x, ok := <-ch
		if !ok {
			return nil, ctx.Err()
		}
		return x.(proto.Message), nil
	}
}

// configure sends a configuration change through consensus and
// then waits for it to be applied to the server. It will block
// until the change is performed or there is an error.
//
// The ID of cc is always replaced with a freshly generated request ID, so
// any ID set by the caller is ignored. The result delivered on the wait
// channel is expected to be either nil or an error; anything else panics.
func (n *Node) configure(ctx context.Context, cc raftpb.ConfChange) error {
	cc.ID = n.reqIDGen.Next()

	// The cancel function is handed to the wait registry; it is not called
	// directly here.
	// NOTE(review): presumably the registry calls it when the wait is
	// canceled — confirm in the wait implementation.
	ctx, cancel := context.WithCancel(ctx)
	ch := n.wait.register(cc.ID, nil, cancel)

	if err := n.raftNode.ProposeConfChange(ctx, cc); err != nil {
		n.wait.cancel(cc.ID)
		return err
	}

	select {
	case x := <-ch:
		// An error value means the change failed; nil means it succeeded.
		if err, ok := x.(error); ok {
			return err
		}
		if x != nil {
			log.G(ctx).Panic("raft: configuration change error, return type should always be error")
		}
		return nil
	case <-ctx.Done():
		n.wait.cancel(cc.ID)
		return ctx.Err()
	}
}

// processCommitted applies one committed raft entry. Normal entries that
// carry data are handed to processEntry, configuration changes to
// processConfChange. Unless processEntry fails, n.appliedIndex is then set to
// the entry's index.
func (n *Node) processCommitted(ctx context.Context, entry raftpb.Entry) error {
	switch entry.Type {
	case raftpb.EntryNormal:
		// Process a normal entry
		if entry.Data != nil {
			if err := n.processEntry(ctx, entry); err != nil {
				return err
			}
		}
	case raftpb.EntryConfChange:
		// Process a configuration change (add/remove node)
		n.processConfChange(ctx, entry)
	}

	n.appliedIndex = entry.Index
	return nil
}

// processEntry decodes a committed normal entry into an InternalRaftRequest
// and delivers it to whoever is waiting on its ID. If nobody is waiting, the
// request's store actions are applied to the memory store directly; a failure
// to apply them is logged, not returned. Only a decoding failure is returned.
func (n *Node) processEntry(ctx context.Context, entry raftpb.Entry) error {
	req := &api.InternalRaftRequest{}
	if err := proto.Unmarshal(entry.Data, req); err != nil {
		return err
	}

	if n.wait.trigger(req.ID, req) {
		return nil
	}

	// There was no wait on this ID, meaning we don't have a
	// transaction in progress that would be committed to the
	// memory store by the "trigger" call. Either a different node
	// wrote this to raft, or we wrote it before losing the leader
	// position and cancelling the transaction. Create a new
	// transaction to commit the data.

	// It should not be possible for processInternalRaftRequest
	// to be running in this situation, but out of caution we
	// cancel any current invocations to avoid a deadlock.
	n.wait.cancelAll()

	if applyErr := n.memoryStore.ApplyStoreActions(req.Action); applyErr != nil {
		log.G(ctx).WithError(applyErr).Error("failed to apply actions from raft")
	}
	return nil
}

// processConfChange applies a committed configuration-change entry. It
// decodes and validates the change, dispatches it to the add/update/remove
// handler for its type, records the configuration state returned by
// ApplyConfChange in n.confState, and notifies any waiter registered under
// the change ID.
func (n *Node) processConfChange(ctx context.Context, entry raftpb.Entry) {
	var (
		err error
		cc  raftpb.ConfChange
	)

	// NOTE(review): a decode or validation failure is reported to the
	// waiter, but processing does not stop; the change is still dispatched
	// and applied below. Confirm whether this is intentional.
	if err := proto.Unmarshal(entry.Data, &cc); err != nil {
		n.wait.trigger(cc.ID, err)
	}

	if err := n.cluster.ValidateConfigurationChange(cc); err != nil {
		n.wait.trigger(cc.ID, err)
	}

	switch cc.Type {
	case raftpb.ConfChangeAddNode:
		err = n.applyAddNode(cc)
	case raftpb.ConfChangeUpdateNode:
		err = n.applyUpdateNode(ctx, cc)
	case raftpb.ConfChangeRemoveNode:
		err = n.applyRemoveNode(ctx, cc)
	}

	if err != nil {
		n.wait.trigger(cc.ID, err)
	}

	// The change is handed to raft and a nil result is triggered even when
	// an error was triggered above.
	// NOTE(review): presumably only the first trigger for an ID reaches the
	// waiter — confirm in the wait implementation.
	n.confState = *n.raftNode.ApplyConfChange(cc)
	n.wait.trigger(cc.ID, nil)
}

// applyAddNode handles a ConfChange received from a member of the raft
// cluster that adds a new node. The member description is decoded from the
// change's Context and registered locally. A member with a zero raft ID is
// silently ignored.
func (n *Node) applyAddNode(cc raftpb.ConfChange) error {
	added := &api.RaftMember{}
	if err := proto.Unmarshal(cc.Context, added); err != nil {
		return err
	}

	// ID must be non zero
	if added.RaftID == 0 {
		return nil
	}

	return n.registerNode(added)
}

// applyUpdateNode handles a ConfChange received from a member of the raft
// cluster that updates the address of an existing node. The new member
// description is decoded from the change's Context. Updates about the local
// node are ignored; for any other node the transport peer is updated first
// and then the memberlist.
func (n *Node) applyUpdateNode(ctx context.Context, cc raftpb.ConfChange) error {
	updated := &api.RaftMember{}
	if err := proto.Unmarshal(cc.Context, updated); err != nil {
		return err
	}

	if updated.RaftID == n.Config.ID {
		return nil
	}

	if err := n.transport.UpdatePeer(updated.RaftID, updated.Addr); err != nil {
		return err
	}
	return n.cluster.UpdateMember(updated.RaftID, updated)
}

// applyRemoveNode handles a ConfChange received from a member of the raft
// cluster that removes a node. When the local node is the one being removed,
// it waits for pending async tasks and calls NodeRemoved; otherwise the peer
// is dropped from the transport. In both cases the member is then removed
// from the memberlist.
func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err error) {
	// If the node from where the remove is issued is
	// a follower and the leader steps down, Campaign
	// to be the leader.
	removingLeader := cc.NodeID == n.leader()
	if removingLeader && !n.isLeader() {
		if err = n.raftNode.Campaign(ctx); err != nil {
			return err
		}
	}

	if cc.NodeID != n.Config.ID {
		if rmErr := n.transport.RemovePeer(cc.NodeID); rmErr != nil {
			return rmErr
		}
	} else {
		// wait for the commit ack to be sent before closing connection
		n.asyncTasks.Wait()

		n.NodeRemoved()
	}

	return n.cluster.RemoveMember(cc.NodeID)
}

// SubscribeLeadership returns a channel on which leadership changes are
// delivered as raft.LeadershipState events, by watching the node's
// leadershipBroadcast queue. The returned cancel function should be called
// once the listener is no longer interested in events.
func (n *Node) SubscribeLeadership() (q chan events.Event, cancel func()) {
	q, cancel = n.leadershipBroadcast.Watch()
	return q, cancel
}

// createConfigChangeEnts builds a series of EntryConfChange raft entries that
// remove every ID in ids from the cluster, except `self`, which is never
// removed even if present. If `self` is not among ids, one more entry is
// appended that adds a default member with the raft ID `self`.
//
// All entries carry the given term and are numbered consecutively starting at
// index+1. Marshalling failures panic, as they are never expected.
func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
	var ents []raftpb.Entry
	next := index + 1

	// appendChange wraps a conf change into the next consecutive log entry.
	appendChange := func(cc *raftpb.ConfChange) {
		data, err := cc.Marshal()
		if err != nil {
			log.L.WithError(err).Panic("marshal configuration change should never fail")
		}
		ents = append(ents, raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Data:  data,
			Term:  term,
			Index: next,
		})
		next++
	}

	selfListed := false
	for _, id := range ids {
		if id == self {
			selfListed = true
			continue
		}
		appendChange(&raftpb.ConfChange{
			Type:   raftpb.ConfChangeRemoveNode,
			NodeID: id,
		})
	}

	if selfListed {
		return ents
	}

	self_ := &api.RaftMember{RaftID: self}
	meta, err := self_.Marshal()
	if err != nil {
		log.L.WithError(err).Panic("marshal member should never fail")
	}
	appendChange(&raftpb.ConfChange{
		Type:    raftpb.ConfChangeAddNode,
		NodeID:  self,
		Context: meta,
	})
	return ents
}

// getIDs returns the set of member IDs described by the given snapshot and
// entries. The set starts with the nodes in the snapshot's ConfState and is
// then adjusted by the configuration-change entries:
// - ConfChangeAddNode adds the contained ID to the set.
// - ConfChangeRemoveNode removes the contained ID from the set.
// - ConfChangeUpdateNode leaves the set untouched.
// Entries with an index lower than the snapshot's index are skipped.
//
// The IDs come from a map iteration, so the returned slice is in no
// particular order. The result is nil when the set is empty.
func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
	members := make(map[uint64]struct{})
	if snap != nil {
		for _, id := range snap.Metadata.ConfState.Nodes {
			members[id] = struct{}{}
		}
	}

	for i := range ents {
		entry := &ents[i]
		beforeSnapshot := snap != nil && entry.Index < snap.Metadata.Index
		if entry.Type != raftpb.EntryConfChange || beforeSnapshot {
			continue
		}

		var change raftpb.ConfChange
		if err := change.Unmarshal(entry.Data); err != nil {
			log.L.WithError(err).Panic("unmarshal configuration change should never fail")
		}

		switch change.Type {
		case raftpb.ConfChangeAddNode:
			members[change.NodeID] = struct{}{}
		case raftpb.ConfChangeRemoveNode:
			delete(members, change.NodeID)
		case raftpb.ConfChangeUpdateNode:
			// do nothing
		default:
			log.L.Panic("ConfChange Type should be either ConfChangeAddNode, or ConfChangeRemoveNode, or ConfChangeUpdateNode!")
		}
	}

	var result []uint64
	for id := range members {
		result = append(result, id)
	}
	return result
}

// reqTimeout returns the timeout applied to raft requests: five seconds plus
// twice the election timeout (ElectionTick ticks of TickInterval each).
func (n *Node) reqTimeout() time.Duration {
	electionTimeout := time.Duration(n.Config.ElectionTick) * n.opts.TickInterval
	return 5*time.Second + 2*electionTimeout
}