package volumequeue

import (
	"sync"
	"time"
)

// baseRetryInterval is the base interval to retry volume operations. each
// subsequent attempt is exponential from this one
const baseRetryInterval = 100 * time.Millisecond

// maxRetryInterval is the maximum amount of time we will wait between retrying
// volume operations.
const maxRetryInterval = 10 * time.Minute

// vqTimerSource is an interface for creating timers for the volumeQueue
type vqTimerSource interface {
	// NewTimer takes an attempt number and returns a vqClockTrigger which will
	// trigger after a set period based on that attempt number.
	NewTimer(attempt uint) vqTimer
}

// vqTimer is an interface representing a timer. However, the timer
// trigger channel, C, is instead wrapped in a Done method, so that in testing,
// the timer can be substituted for a different object.
type vqTimer interface {
	Done() <-chan time.Time
	Stop() bool
}

// timerSource is an empty struct type which is used to represent the default
// vqTimerSource, which uses time.Timer.
type timerSource struct{}

// NewTimer creates a new timer.
func (timerSource) NewTimer(attempt uint) vqTimer {
	var waitFor time.Duration
	if attempt == 0 {
		waitFor = 0
	} else {
		// bit-shifting the base retry interval will raise it by 2 to the power
		// of attempt. this is an easy way to do an exponent solely with
		// integers
		waitFor = baseRetryInterval << attempt
		if waitFor > maxRetryInterval {
			waitFor = maxRetryInterval
		}
	}
	return timer{Timer: time.NewTimer(waitFor)}
}

// timer wraps a time.Timer to provide a Done method.
type timer struct {
	*time.Timer
}

// Done returns the timer's C channel, which triggers in response to the timer
// expiring.
func (t timer) Done() <-chan time.Time {
	return t.C
}

// VolumeQueue manages the exponential backoff of retrying volumes. it behaves
// somewhat like a priority queue. however, the key difference is that volumes
// which are ready to process or reprocess are read off of an unbuffered
// channel, meaning the order in which ready volumes are processed is at the
// mercy of the golang scheduler. in practice, this does not matter.
type VolumeQueue struct {
	sync.Mutex
	// next returns the next volumeQueueEntry when it is ready.
	next chan *volumeQueueEntry
	// outstanding is the set of all pending volumeQueueEntries, mapped by
	// volume ID.
	outstanding map[string]*volumeQueueEntry
	// stopChan stops the volumeQueue and cancels all entries.
	stopChan chan struct{}

	// timerSource is an object which is used to create the timer for a
	// volumeQueueEntry. it exists so that in testing, the timer can be
	// substituted for an object that we control.
	timerSource vqTimerSource
}

// volumeQueueEntry represents one entry in the volumeQueue
type volumeQueueEntry struct {
	// id is the id of the volume this entry represents. we only need the ID,
	// because the CSI manager will look up the latest revision of the volume
	// before doing any work on it.
	id string
	// attempt is the current retry attempt of the entry.
	attempt uint
	// cancel is a function which is called to abort the retry attempt.
	cancel func()
}

// NewVolumeQueue returns a new VolumeQueue with the default timerSource.
func NewVolumeQueue() *VolumeQueue {
	return &VolumeQueue{
		next:        make(chan *volumeQueueEntry),
		outstanding: make(map[string]*volumeQueueEntry),
		stopChan:    make(chan struct{}),
		timerSource: timerSource{},
	}
}

// Enqueue adds an entry to the VolumeQueue with the specified retry attempt.
// if an entry for the specified id already exists, enqueue will remove it and
// create a new entry.
func (vq *VolumeQueue) Enqueue(id string, attempt uint) {
	// we must lock the volumeQueue when we add entries, because we will be
	// accessing the outstanding map
	vq.Lock()
	defer vq.Unlock()

	if entry, ok := vq.outstanding[id]; ok {
		entry.cancel()
		delete(vq.outstanding, id)
	}

	cancelChan := make(chan struct{})
	v := &volumeQueueEntry{
		id:      id,
		attempt: attempt,
		cancel: func() {
			close(cancelChan)
		},
	}

	t := vq.timerSource.NewTimer(attempt)

	// this goroutine is the meat of the volumeQueue. when the timer triggers,
	// the volume queue entry is written out to the next channel.
	//
	// the nature of the select statement, and of goroutines and of
	// ansynchronous operations means that this is not actually strictly
	// ordered. if several entries are ready, then the one that actually gets
	// dequeued is at the mercy of the golang scheduler.
	//
	// however, the flip side of this is that canceling an entry truly cancels
	// it. because we're blocking on a write attempt, if we cancel, we don't
	// do that write attempt, and there's no need to try to remove from the
	// queue a ready-but-now-canceled entry before it is processed.
	go func() {
		select {
		case <-t.Done():
			// once the timer fires, we will try to write this entry to the
			// next channel. however, because next is unbuffered, if we ended
			// up in a situation where no read occurred, we would be
			// deadlocked. to avoid this, we select on both a vq.next write and
			// on a read from cancelChan, which allows us to abort our write
			// attempt.
			select {
			case vq.next <- v:
			case <-cancelChan:
			}
		case <-cancelChan:
			// the documentation for timer recommends draining the channel like
			// this.
			if !t.Stop() {
				<-t.Done()
			}
		}
	}()

	vq.outstanding[id] = v
}

// Wait returns the ID and attempt number of the next Volume ready to process.
// If no volume is ready, wait blocks until one is ready. if the volumeQueue
// is stopped, wait returns "", 0
func (vq *VolumeQueue) Wait() (string, uint) {
	select {
	case v := <-vq.next:
		vq.Lock()
		defer vq.Unlock()
		// we need to be certain that this entry is the same entry that we
		// read, because otherwise there may be a race.
		//
		// it would be possible for the read from next to succeed, but before
		// the lock is acquired, a new attempt is enqueued. enqueuing the new
		// attempt deletes the old entry before replacing it with the new entry
		// and releasing the lock. then, this routine may acquire the lock, and
		// delete a new entry.
		//
		// in practice, it is unclear if this race could happen or would matter
		// if it did, but always better safe than sorry.
		e, ok := vq.outstanding[v.id]
		if ok && e == v {
			delete(vq.outstanding, v.id)
		}

		return v.id, v.attempt
	case <-vq.stopChan:
		// if the volumeQueue is stopped, then there may be no more writes, so
		// we should return an empty result from wait
		return "", 0
	}
}

// Outstanding returns the number of items outstanding in this queue
func (vq *VolumeQueue) Outstanding() int {
	return len(vq.outstanding)
}

// Stop stops the volumeQueue and cancels all outstanding entries. stop may
// only be called once.
func (vq *VolumeQueue) Stop() {
	vq.Lock()
	defer vq.Unlock()
	close(vq.stopChan)
	for _, entry := range vq.outstanding {
		entry.cancel()
	}
	return
}