package taskinit

import (
	"sort"
	"time"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/api/defaults"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/orchestrator"
	"github.com/docker/swarmkit/manager/orchestrator/restart"
	"github.com/docker/swarmkit/manager/state/store"
	gogotypes "github.com/gogo/protobuf/types"
	"golang.org/x/net/context"
)

// InitHandler defines orchestrator's action to fix tasks at start.
// CheckTasks calls these methods while walking the tasks found in the store.
type InitHandler interface {
	// IsRelatedService reports whether tasks belonging to service should be
	// processed; CheckTasks skips every task for which this returns false.
	IsRelatedService(service *api.Service) bool
	// FixTask is called by CheckTasks once for each task of a related
	// service, with the open store batch, to handle task updates from the
	// agent that should have been triggered by task update events.
	FixTask(ctx context.Context, batch *store.Batch, t *api.Task)
	// SlotTuple returns the key under which CheckTasks groups t with other
	// tasks when it reconstructs restart history.
	SlotTuple(t *api.Task) orchestrator.SlotTuple
}

// CheckTasks fixes tasks in the store before orchestrator runs. The previous leader might
// not have finished processing their updates and left them in an inconsistent state.
//
// Every task returned by store.FindTasks(readTx, store.All) that has a ServiceID is
// visited inside a single store batch:
//   - a task whose service can no longer be found is deleted;
//   - a task whose service is rejected by initHandler.IsRelatedService is skipped;
//   - every other task is grouped by initHandler.SlotTuple and passed to
//     initHandler.FixTask. If its desired state is READY and its observed state is not
//     past RUNNING, it is handed to startSupervisor.DelayStart (when part of its restart
//     delay is still pending) or to startSupervisor.StartNow.
//
// After the batch, the grouped tasks are used to record restart history on
// startSupervisor.
//
// The result is the error returned by s.Batch; a store.FindTasks failure is returned
// from the batch callback. Failures of individual batch updates are logged and do not
// stop the pass.
func CheckTasks(ctx context.Context, s *store.MemoryStore, readTx store.ReadTx, initHandler InitHandler, startSupervisor *restart.Supervisor) error {
	instances := make(map[orchestrator.SlotTuple][]*api.Task)
	err := s.Batch(func(batch *store.Batch) error {
		tasks, err := store.FindTasks(readTx, store.All)
		if err != nil {
			return err
		}
		for _, t := range tasks {
			if t.ServiceID == "" {
				continue
			}

			// TODO(aluzzardi): We should NOT retrieve the service here.
			service := store.GetService(readTx, t.ServiceID)
			if service == nil {
				// Service was deleted
				err := batch.Update(func(tx store.Tx) error {
					return store.DeleteTask(tx, t.ID)
				})
				if err != nil {
					log.G(ctx).WithError(err).WithField("task.id", t.ID).Error("failed to delete task")
				}
				continue
			}
			if !initHandler.IsRelatedService(service) {
				continue
			}

			tuple := initHandler.SlotTuple(t)
			instances[tuple] = append(instances[tuple], t)

			// handle task updates from agent which should have been triggered by task update events
			initHandler.FixTask(ctx, batch, t)

			// Desired state READY is a transient state: the task is meant to be
			// started. The previous leader may not have started it, so retry here.
			if t.DesiredState != api.TaskStateReady || t.Status.State > api.TaskStateRunning {
				continue
			}

			if delay := remainingRestartDelay(ctx, t); delay > 0 {
				err := batch.Update(func(tx store.Tx) error {
					current := store.GetTask(tx, t.ID)
					// TODO(aluzzardi): This is shady as well. We should have a more generic condition.
					if current == nil || current.DesiredState != api.TaskStateReady {
						return nil
					}
					startSupervisor.DelayStart(ctx, tx, nil, current.ID, delay, true)
					return nil
				})
				if err != nil {
					log.G(ctx).WithError(err).WithField("task.id", t.ID).Error("scheduling delayed task start failed")
				}
				continue
			}

			// Start now
			err := batch.Update(func(tx store.Tx) error {
				return startSupervisor.StartNow(tx, t.ID)
			})
			if err != nil {
				log.G(ctx).WithError(err).WithField("task.id", t.ID).Error("moving task out of delayed state failed")
			}
		}
		return nil
	})
	if err != nil {
		return err
	}

	recordRestartHistory(instances, startSupervisor)
	return nil
}

// remainingRestartDelay returns how much longer the start of t should be postponed.
// A zero or negative result means the task should be started right away.
func remainingRestartDelay(ctx context.Context, t *api.Task) time.Duration {
	// The conversion error for the built-in default delay is deliberately ignored.
	restartDelay, _ := gogotypes.DurationFromProto(defaults.Service.Task.Restart.Delay)
	if t.Spec.Restart != nil && t.Spec.Restart.Delay != nil {
		var err error
		restartDelay, err = gogotypes.DurationFromProto(t.Spec.Restart.Delay)
		if err != nil {
			// Fall back to the default delay when the task's own delay is malformed.
			log.G(ctx).WithError(err).Error("invalid restart delay")
			restartDelay, _ = gogotypes.DurationFromProto(defaults.Service.Task.Restart.Delay)
		}
	}
	if restartDelay == 0 {
		return 0
	}

	// Measure the delay from AppliedAt when it is set, otherwise from the
	// status timestamp.
	statusTime := t.Status.Timestamp
	if t.Status.AppliedAt != nil {
		statusTime = t.Status.AppliedAt
	}
	timestamp, err := gogotypes.TimestampFromProto(statusTime)
	if err != nil {
		// Without a usable timestamp the remaining delay is unknown; start now.
		log.G(ctx).WithError(err).Error("invalid status timestamp")
		return 0
	}

	// Never wait longer than the configured delay, even if the timestamp lies
	// in the future.
	remaining := timestamp.Add(restartDelay).Sub(time.Now())
	if remaining < restartDelay {
		return remaining
	}
	return restartDelay
}

// recordRestartHistory records, for each slot, every task of the most current spec
// version except the oldest one as a restart on startSupervisor.
func recordRestartHistory(instances map[orchestrator.SlotTuple][]*api.Task, startSupervisor *restart.Supervisor) {
	for tuple, instance := range instances {
		// Find the most current spec version. That's the only one
		// we care about for the purpose of reconstructing restart
		// history.
		maxVersion := uint64(0)
		for _, t := range instance {
			if t.SpecVersion != nil && t.SpecVersion.Index > maxVersion {
				maxVersion = t.SpecVersion.Index
			}
		}

		// Create a new slice with just the current spec version tasks.
		var upToDate []*api.Task
		for _, t := range instance {
			if t.SpecVersion != nil && t.SpecVersion.Index == maxVersion {
				upToDate = append(upToDate, t)
			}
		}

		// All up-to-date tasks in this instance except the first one
		// should be considered restarted.
		if len(upToDate) < 2 {
			continue
		}

		// Sort by creation timestamp
		sort.Sort(tasksByCreationTimestamp(upToDate))

		for _, t := range upToDate[1:] {
			startSupervisor.RecordRestartHistory(tuple, t)
		}
	}
}

// tasksByCreationTimestamp implements sort.Interface, ordering tasks by
// Meta.CreatedAt from oldest to newest. Tasks without a creation timestamp
// sort before all tasks that have one.
type tasksByCreationTimestamp []*api.Task

// Len returns the number of tasks.
func (t tasksByCreationTimestamp) Len() int {
	return len(t)
}

// Swap exchanges the tasks at positions i and j.
func (t tasksByCreationTimestamp) Swap(i, j int) {
	t[i], t[j] = t[j], t[i]
}

// Less reports whether the task at i was created strictly before the task at j,
// comparing seconds first and nanoseconds second.
func (t tasksByCreationTimestamp) Less(i, j int) bool {
	a, b := t[i].Meta.CreatedAt, t[j].Meta.CreatedAt
	if a == nil {
		// A task without a timestamp precedes only tasks that have one. Two
		// tasks without timestamps are equivalent, so neither is "less";
		// returning true for both directions would not be a valid ordering.
		return b != nil
	}
	if b == nil {
		return false
	}
	if a.Seconds != b.Seconds {
		return a.Seconds < b.Seconds
	}
	return a.Nanos < b.Nanos
}