package global

import (
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/constraint"
	"github.com/docker/swarmkit/manager/orchestrator"
	"github.com/docker/swarmkit/manager/orchestrator/restart"
	"github.com/docker/swarmkit/manager/orchestrator/taskinit"
	"github.com/docker/swarmkit/manager/orchestrator/update"
	"github.com/docker/swarmkit/manager/state/store"
	"golang.org/x/net/context"
)

type globalService struct {
	*api.Service

	// Compiled constraints
	constraints []constraint.Constraint
}

// Orchestrator runs a reconciliation loop to create and destroy tasks as
// necessary for global services.
type Orchestrator struct {
	store *store.MemoryStore
	// nodes is the set of non-drained nodes in the cluster, indexed by node ID
	nodes map[string]*api.Node
	// globalServices has all the global services in the cluster, indexed by ServiceID
	globalServices map[string]globalService
	restartTasks   map[string]struct{}

	// stopChan signals to the state machine to stop running.
	stopChan chan struct{}
	// doneChan is closed when the state machine terminates.
	doneChan chan struct{}

	updater  *update.Supervisor
	restarts *restart.Supervisor

	cluster *api.Cluster // local instance of the cluster
}

// NewGlobalOrchestrator creates a new global Orchestrator
func NewGlobalOrchestrator(store *store.MemoryStore) *Orchestrator {
	restartSupervisor := restart.NewSupervisor(store)
	updater := update.NewSupervisor(store, restartSupervisor)
	return &Orchestrator{
		store:          store,
		nodes:          make(map[string]*api.Node),
		globalServices: make(map[string]globalService),
		stopChan:       make(chan struct{}),
		doneChan:       make(chan struct{}),
		updater:        updater,
		restarts:       restartSupervisor,
		restartTasks:   make(map[string]struct{}),
	}
}

func (g *Orchestrator) initTasks(ctx context.Context, readTx store.ReadTx) error {
	return taskinit.CheckTasks(ctx, g.store, readTx, g, g.restarts)
}

// Run contains the global orchestrator event loop
func (g *Orchestrator) Run(ctx context.Context) error {
	defer close(g.doneChan)

	// Watch changes to services and tasks
	queue := g.store.WatchQueue()
	watcher, cancel := queue.Watch()
	defer cancel()

	// lookup the cluster
	var err error
	g.store.View(func(readTx store.ReadTx) {
		var clusters []*api.Cluster
		clusters, err = store.FindClusters(readTx, store.ByName("default"))

		if len(clusters) != 1 {
			return // just pick up the cluster when it is created.
		}
		g.cluster = clusters[0]
	})
	if err != nil {
		return err
	}

	// Get list of nodes
	var nodes []*api.Node
	g.store.View(func(readTx store.ReadTx) {
		nodes, err = store.FindNodes(readTx, store.All)
	})
	if err != nil {
		return err
	}
	for _, n := range nodes {
		g.updateNode(n)
	}

	// Lookup global services
	var existingServices []*api.Service
	g.store.View(func(readTx store.ReadTx) {
		existingServices, err = store.FindServices(readTx, store.All)
	})
	if err != nil {
		return err
	}

	var reconcileServiceIDs []string
	for _, s := range existingServices {
		if orchestrator.IsGlobalService(s) {
			g.updateService(s)
			reconcileServiceIDs = append(reconcileServiceIDs, s.ID)
		}
	}

	// fix tasks in store before reconciliation loop
	g.store.View(func(readTx store.ReadTx) {
		err = g.initTasks(ctx, readTx)
	})
	if err != nil {
		return err
	}

	g.tickTasks(ctx)
	g.reconcileServices(ctx, reconcileServiceIDs)

	for {
		select {
		case event := <-watcher:
			// TODO(stevvooe): Use ctx to limit running time of operation.
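			// Dispatch on the concrete event type: cluster updates refresh
			// the cached cluster object, service events adjust the tracked
			// set of global services and reconcile or clean up their tasks,
			// node events trigger per-node reconciliation, and task updates
			// are queued for restart handling in tickTasks.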
			switch v := event.(type) {
			case api.EventUpdateCluster:
				g.cluster = v.Cluster
			case api.EventCreateService:
				if !orchestrator.IsGlobalService(v.Service) {
					continue
				}
				g.updateService(v.Service)
				g.reconcileServices(ctx, []string{v.Service.ID})
			case api.EventUpdateService:
				if !orchestrator.IsGlobalService(v.Service) {
					continue
				}
				g.updateService(v.Service)
				g.reconcileServices(ctx, []string{v.Service.ID})
			case api.EventDeleteService:
				if !orchestrator.IsGlobalService(v.Service) {
					continue
				}
				orchestrator.DeleteServiceTasks(ctx, g.store, v.Service)
				// delete the service from service map
				delete(g.globalServices, v.Service.ID)
				g.restarts.ClearServiceHistory(v.Service.ID)
			case api.EventCreateNode:
				g.updateNode(v.Node)
				g.reconcileOneNode(ctx, v.Node)
			case api.EventUpdateNode:
				g.updateNode(v.Node)
				g.reconcileOneNode(ctx, v.Node)
			case api.EventDeleteNode:
				g.foreachTaskFromNode(ctx, v.Node, g.deleteTask)
				delete(g.nodes, v.Node.ID)
			case api.EventUpdateTask:
				g.handleTaskChange(ctx, v.Task)
			}
		case <-g.stopChan:
			return nil
		}
		g.tickTasks(ctx)
	}
}

// FixTask validates a task with the current cluster settings, and takes
// action to make it conform to the node state and service constraints.
// It is called at orchestrator initialization.
func (g *Orchestrator) FixTask(ctx context.Context, batch *store.Batch, t *api.Task) {
	if _, exists := g.globalServices[t.ServiceID]; !exists {
		return
	}
	// if a task's DesiredState has passed running, the task has been processed
	if t.DesiredState > api.TaskStateRunning {
		return
	}

	var node *api.Node
	if t.NodeID != "" {
		node = g.nodes[t.NodeID]
	}
	// if the node is no longer valid, remove the task
	if t.NodeID == "" || orchestrator.InvalidNode(node) {
		g.shutdownTask(ctx, batch, t)
		return
	}

	// restart a task if it fails
	if t.Status.State > api.TaskStateRunning {
		g.restartTasks[t.ID] = struct{}{}
	}
}

// handleTaskChange defines what the orchestrator does when a task is updated by an agent
func (g *Orchestrator) handleTaskChange(ctx context.Context, t *api.Task) {
	if _, exists := g.globalServices[t.ServiceID]; !exists {
		return
	}
	// if a task's DesiredState has passed running, it
	// means the task has been processed
	if t.DesiredState > api.TaskStateRunning {
		return
	}

	// if a task has passed running, restart it
	if t.Status.State > api.TaskStateRunning {
		g.restartTasks[t.ID] = struct{}{}
	}
}

// Stop stops the orchestrator.
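// It waits for the Run event loop to exit before canceling any in-flight
// updates and restarts. A minimal usage sketch (the store and context
// wiring around it are illustrative, not prescribed by this package):
//
//	o := NewGlobalOrchestrator(s)
//	go o.Run(ctx)
//	// ...
//	o.Stop()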
func (g *Orchestrator) Stop() {
	close(g.stopChan)
	<-g.doneChan
	g.updater.CancelAll()
	g.restarts.CancelAll()
}

func (g *Orchestrator) foreachTaskFromNode(ctx context.Context, node *api.Node, cb func(context.Context, *store.Batch, *api.Task)) {
	var (
		tasks []*api.Task
		err   error
	)
	g.store.View(func(tx store.ReadTx) {
		tasks, err = store.FindTasks(tx, store.ByNodeID(node.ID))
	})
	if err != nil {
		log.G(ctx).WithError(err).Errorf("global orchestrator: foreachTaskFromNode failed finding tasks")
		return
	}

	err = g.store.Batch(func(batch *store.Batch) error {
		for _, t := range tasks {
			// Global orchestrator only removes tasks from globalServices
			if _, exists := g.globalServices[t.ServiceID]; exists {
				cb(ctx, batch, t)
			}
		}
		return nil
	})
	if err != nil {
		log.G(ctx).WithError(err).Errorf("global orchestrator: foreachTaskFromNode failed batching tasks")
	}
}

func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []string) {
	nodeTasks := make(map[string]map[string][]*api.Task)

	g.store.View(func(tx store.ReadTx) {
		for _, serviceID := range serviceIDs {
			service := g.globalServices[serviceID].Service
			if service == nil {
				continue
			}

			tasks, err := store.FindTasks(tx, store.ByServiceID(serviceID))
			if err != nil {
				log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServices failed finding tasks for service %s", serviceID)
				continue
			}

			// nodeID -> task list
			nodeTasks[serviceID] = make(map[string][]*api.Task)

			for _, t := range tasks {
				nodeTasks[serviceID][t.NodeID] = append(nodeTasks[serviceID][t.NodeID], t)
			}

			// Keep all runnable instances of this service,
			// and instances that were not restarted due
			// to the restart policy but may be updated if the
			// service spec changed.
			for nodeID, slot := range nodeTasks[serviceID] {
				updatable := g.restarts.UpdatableTasksInSlot(ctx, slot, g.globalServices[serviceID].Service)

				if len(updatable) != 0 {
					nodeTasks[serviceID][nodeID] = updatable
				} else {
					delete(nodeTasks[serviceID], nodeID)
				}
			}
		}
	})

	updates := make(map[*api.Service][]orchestrator.Slot)

	err := g.store.Batch(func(batch *store.Batch) error {
		for _, serviceID := range serviceIDs {
			var updateTasks []orchestrator.Slot

			if _, exists := nodeTasks[serviceID]; !exists {
				continue
			}

			service := g.globalServices[serviceID]

			for nodeID, node := range g.nodes {
				meetsConstraints := constraint.NodeMatches(service.constraints, node)
				ntasks := nodeTasks[serviceID][nodeID]
				delete(nodeTasks[serviceID], nodeID)

				if !meetsConstraints {
					g.shutdownTasks(ctx, batch, ntasks)
					continue
				}

				if node.Spec.Availability == api.NodeAvailabilityPause {
					// the node is paused, so we won't add or update
					// any tasks
					continue
				}

				// this node needs to run 1 copy of the task
				if len(ntasks) == 0 {
					g.addTask(ctx, batch, service.Service, nodeID)
				} else {
					updateTasks = append(updateTasks, ntasks)
				}
			}

			if len(updateTasks) > 0 {
				updates[service.Service] = updateTasks
			}

			// Remove any tasks assigned to nodes not found in g.nodes.
			// These must be associated with nodes that are drained, or
			// nodes that no longer exist.
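			// (Entries for nodes handled above were deleted from the map,
			// so only tasks on unknown or drained nodes remain here.)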
			for _, ntasks := range nodeTasks[serviceID] {
				g.shutdownTasks(ctx, batch, ntasks)
			}
		}
		return nil
	})
	if err != nil {
		log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServices transaction failed")
	}

	for service, updateTasks := range updates {
		g.updater.Update(ctx, g.cluster, service, updateTasks)
	}
}

// updateNode updates g.nodes based on the current node value
func (g *Orchestrator) updateNode(node *api.Node) {
	if node.Spec.Availability == api.NodeAvailabilityDrain || node.Status.State == api.NodeStatus_DOWN {
		delete(g.nodes, node.ID)
	} else {
		g.nodes[node.ID] = node
	}
}

// updateService updates g.globalServices based on the current service value
func (g *Orchestrator) updateService(service *api.Service) {
	var constraints []constraint.Constraint

	if service.Spec.Task.Placement != nil && len(service.Spec.Task.Placement.Constraints) != 0 {
		constraints, _ = constraint.Parse(service.Spec.Task.Placement.Constraints)
	}

	g.globalServices[service.ID] = globalService{
		Service:     service,
		constraints: constraints,
	}
}

// reconcileOneNode checks all global services on one node
func (g *Orchestrator) reconcileOneNode(ctx context.Context, node *api.Node) {
	if node.Spec.Availability == api.NodeAvailabilityDrain {
		log.G(ctx).Debugf("global orchestrator: node %s in drain state, shutting down its tasks", node.ID)
		g.foreachTaskFromNode(ctx, node, g.shutdownTask)
		return
	}

	if node.Status.State == api.NodeStatus_DOWN {
		log.G(ctx).Debugf("global orchestrator: node %s is down, shutting down its tasks", node.ID)
		g.foreachTaskFromNode(ctx, node, g.shutdownTask)
		return
	}

	if node.Spec.Availability == api.NodeAvailabilityPause {
		// the node is paused, so we won't add or update tasks
		return
	}

	node, exists := g.nodes[node.ID]
	if !exists {
		return
	}

	// tasks by service
	tasks := make(map[string][]*api.Task)

	var (
		tasksOnNode []*api.Task
		err         error
	)

	g.store.View(func(tx store.ReadTx) {
		tasksOnNode, err = store.FindTasks(tx, store.ByNodeID(node.ID))
	})
	if err != nil {
		log.G(ctx).WithError(err).Errorf("global orchestrator: reconcile failed finding tasks on node %s", node.ID)
		return
	}

	for serviceID, service := range g.globalServices {
		for _, t := range tasksOnNode {
			if t.ServiceID != serviceID {
				continue
			}
			tasks[serviceID] = append(tasks[serviceID], t)
		}

		// Keep all runnable instances of this service,
		// and instances that were not restarted due
		// to the restart policy but may be updated if the
		// service spec changed.
		for serviceID, slot := range tasks {
			updatable := g.restarts.UpdatableTasksInSlot(ctx, slot, service.Service)

			if len(updatable) != 0 {
				tasks[serviceID] = updatable
			} else {
				delete(tasks, serviceID)
			}
		}
	}

	err = g.store.Batch(func(batch *store.Batch) error {
		for serviceID, service := range g.globalServices {
			if !constraint.NodeMatches(service.constraints, node) {
				continue
			}

			if len(tasks) == 0 {
				g.addTask(ctx, batch, service.Service, node.ID)
			} else {
				// If task is out of date, update it. This can happen
				// on node reconciliation if, for example, we pause a
				// node, update the service, and then activate the node
				// later.

				// We don't use g.updater here for two reasons:
				// - This is not a rolling update. Since it was not
				//   triggered directly by updating the service, it
				//   should not observe the rolling update parameters
				//   or show status in UpdateStatus.
				// - Calling Update cancels any current rolling updates
				//   for the service, such as one triggered by service
				//   reconciliation.
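				// Instead, out-of-date tasks are shut down directly below
				// and, if no up-to-date task remains on the node, a fresh
				// one is created outside the rolling-update machinery.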
				var (
					dirtyTasks []*api.Task
					cleanTasks []*api.Task
				)

				for _, t := range tasks[serviceID] {
					if orchestrator.IsTaskDirty(service.Service, t) {
						dirtyTasks = append(dirtyTasks, t)
					} else {
						cleanTasks = append(cleanTasks, t)
					}
				}

				if len(cleanTasks) == 0 {
					g.addTask(ctx, batch, service.Service, node.ID)
				} else {
					dirtyTasks = append(dirtyTasks, cleanTasks[1:]...)
				}
				g.shutdownTasks(ctx, batch, dirtyTasks)
			}
		}
		return nil
	})
	if err != nil {
		log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServiceOneNode batch failed")
	}
}

func (g *Orchestrator) tickTasks(ctx context.Context) {
	if len(g.restartTasks) == 0 {
		return
	}
	err := g.store.Batch(func(batch *store.Batch) error {
		for taskID := range g.restartTasks {
			err := batch.Update(func(tx store.Tx) error {
				t := store.GetTask(tx, taskID)
				if t == nil || t.DesiredState > api.TaskStateRunning {
					return nil
				}

				service := store.GetService(tx, t.ServiceID)
				if service == nil {
					return nil
				}

				node, nodeExists := g.nodes[t.NodeID]
				serviceEntry, serviceExists := g.globalServices[t.ServiceID]
				if !nodeExists || !serviceExists {
					return nil
				}
				if node.Spec.Availability == api.NodeAvailabilityPause || !constraint.NodeMatches(serviceEntry.constraints, node) {
					t.DesiredState = api.TaskStateShutdown
					return store.UpdateTask(tx, t)
				}

				return g.restarts.Restart(ctx, tx, g.cluster, service, *t)
			})
			if err != nil {
				log.G(ctx).WithError(err).Errorf("orchestrator restartTask transaction failed")
			}
		}
		return nil
	})
	if err != nil {
		log.G(ctx).WithError(err).Errorf("global orchestrator: restartTask transaction failed")
	}

	g.restartTasks = make(map[string]struct{})
}

func (g *Orchestrator) shutdownTask(ctx context.Context, batch *store.Batch, t *api.Task) {
	// set existing task DesiredState to TaskStateShutdown
	// TODO(aaronl): optimistic update?
	err := batch.Update(func(tx store.Tx) error {
		t = store.GetTask(tx, t.ID)
		if t != nil && t.DesiredState < api.TaskStateShutdown {
			t.DesiredState = api.TaskStateShutdown
			return store.UpdateTask(tx, t)
		}
		return nil
	})
	if err != nil {
		log.G(ctx).WithError(err).Errorf("global orchestrator: shutdownTask failed to shut down %s", t.ID)
	}
}

func (g *Orchestrator) addTask(ctx context.Context, batch *store.Batch, service *api.Service, nodeID string) {
	task := orchestrator.NewTask(g.cluster, service, 0, nodeID)

	err := batch.Update(func(tx store.Tx) error {
		if store.GetService(tx, service.ID) == nil {
			return nil
		}
		return store.CreateTask(tx, task)
	})
	if err != nil {
		log.G(ctx).WithError(err).Errorf("global orchestrator: failed to create task")
	}
}

func (g *Orchestrator) shutdownTasks(ctx context.Context, batch *store.Batch, tasks []*api.Task) {
	for _, t := range tasks {
		g.shutdownTask(ctx, batch, t)
	}
}

func (g *Orchestrator) deleteTask(ctx context.Context, batch *store.Batch, t *api.Task) {
	err := batch.Update(func(tx store.Tx) error {
		return store.DeleteTask(tx, t.ID)
	})
	if err != nil {
		log.G(ctx).WithError(err).Errorf("global orchestrator: deleteTask failed to delete %s", t.ID)
	}
}

// IsRelatedService returns true if the service should be governed by this orchestrator
func (g *Orchestrator) IsRelatedService(service *api.Service) bool {
	return orchestrator.IsGlobalService(service)
}

// SlotTuple returns a slot tuple for the global service task.
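// Global tasks are identified by (ServiceID, NodeID) rather than by a
// numeric slot, so the Slot field is left at its zero value.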
func (g *Orchestrator) SlotTuple(t *api.Task) orchestrator.SlotTuple {
	return orchestrator.SlotTuple{
		ServiceID: t.ServiceID,
		NodeID:    t.NodeID,
	}
}

// isTaskCompleted returns true if the task's desired state has moved past
// running and the restart policy does not call for restarting it: either the
// policy is RestartOnNone, or it is RestartOnFailure and the task completed
// successfully.
func isTaskCompleted(t *api.Task, restartPolicy api.RestartPolicy_RestartCondition) bool {
	if t == nil || t.DesiredState <= api.TaskStateRunning {
		return false
	}
	return restartPolicy == api.RestartOnNone || (restartPolicy == api.RestartOnFailure && t.Status.State == api.TaskStateCompleted)
}