Browse code

Subsystems can register cleanup handlers with Engine.OnShutdown

Signed-off-by: Solomon Hykes <solomon@docker.com>

Solomon Hykes authored on 2014/08/05 19:00:05
Showing 2 changed files
... ...
@@ -7,6 +7,8 @@ import (
7 7
 	"os"
8 8
 	"sort"
9 9
 	"strings"
10
+	"sync"
11
+	"time"
10 12
 
11 13
 	"github.com/docker/docker/utils"
12 14
 )
... ...
@@ -43,14 +45,18 @@ func unregister(name string) {
43 43
 // It acts as a store for *containers*, and allows manipulation of these
44 44
 // containers by executing *jobs*.
45 45
 type Engine struct {
46
-	handlers map[string]Handler
47
-	catchall Handler
48
-	hack     Hack // data for temporary hackery (see hack.go)
49
-	id       string
50
-	Stdout   io.Writer
51
-	Stderr   io.Writer
52
-	Stdin    io.Reader
53
-	Logging  bool
46
+	handlers   map[string]Handler
47
+	catchall   Handler
48
+	hack       Hack // data for temporary hackery (see hack.go)
49
+	id         string
50
+	Stdout     io.Writer
51
+	Stderr     io.Writer
52
+	Stdin      io.Reader
53
+	Logging    bool
54
+	tasks      sync.WaitGroup
55
+	l          sync.RWMutex // lock for shutdown
56
+	shutdown   bool
57
+	onShutdown []func() // shutdown handlers
54 58
 }
55 59
 
56 60
 func (eng *Engine) Register(name string, handler Handler) error {
... ...
@@ -130,6 +136,68 @@ func (eng *Engine) Job(name string, args ...string) *Job {
130 130
 	return job
131 131
 }
132 132
 
133
+// OnShutdown registers a new callback to be called by Shutdown.
134
+// This is typically used by services to perform cleanup.
135
+func (eng *Engine) OnShutdown(h func()) {
136
+	eng.l.Lock()
137
+	eng.onShutdown = append(eng.onShutdown, h)
138
+	eng.l.Unlock()
139
+}
140
+
141
+// Shutdown permanently shuts down eng as follows:
142
+// - It refuses all new jobs, permanently.
143
+// - It waits for all active jobs to complete (with no timeout)
144
+// - It calls all shutdown handlers concurrently (if any)
145
+// - It returns when all handlers complete, or after 15 seconds,
146
+//	whichever happens first.
147
+func (eng *Engine) Shutdown() {
148
+	eng.l.Lock()
149
+	if eng.shutdown {
150
+		eng.l.Unlock()
151
+		return
152
+	}
153
+	eng.shutdown = true
154
+	eng.l.Unlock()
155
+	// We don't need to protect the rest with a lock, to allow
156
+	// for other calls to immediately fail with "shutdown" instead
157
+	// of hanging for 15 seconds.
158
+	// This requires all concurrent calls to check for shutdown, otherwise
159
+	// it might cause a race.
160
+
161
+	// Wait for all jobs to complete
162
+	eng.tasks.Wait()
163
+
164
+	// Call shutdown handlers, if any.
165
+	// Timeout after 15 seconds.
166
+	var wg sync.WaitGroup
167
+	for _, h := range eng.onShutdown {
168
+		wg.Add(1)
169
+		go func(h func()) {
170
+			defer wg.Done()
171
+			h()
172
+		}(h)
173
+	}
174
+	done := make(chan struct{})
175
+	go func() {
176
+		wg.Wait()
177
+		close(done)
178
+	}()
179
+	select {
180
+	case <-time.After(time.Second * 15):
181
+	case <-done:
182
+	}
183
+	return
184
+}
185
+
186
+// IsShutdown returns true if the engine is in the process
187
+// of shutting down, or already shut down.
188
+// Otherwise it returns false.
189
+func (eng *Engine) IsShutdown() bool {
190
+	eng.l.RLock()
191
+	defer eng.l.RUnlock()
192
+	return eng.shutdown
193
+}
194
+
133 195
 // ParseJob creates a new job from a text description using a shell-like syntax.
134 196
 //
135 197
 // The following syntax is used to parse `input`:
... ...
@@ -47,6 +47,13 @@ const (
47 47
 // If the job returns a failure status, an error is returned
48 48
 // which includes the status.
49 49
 func (job *Job) Run() error {
50
+	if job.Eng.IsShutdown() {
51
+		return fmt.Errorf("engine is shutdown")
52
+	}
53
+	job.Eng.l.Lock()
54
+	job.Eng.tasks.Add(1)
55
+	job.Eng.l.Unlock()
56
+	defer job.Eng.tasks.Done()
50 57
 	// FIXME: make this thread-safe
51 58
 	// FIXME: implement wait
52 59
 	if !job.end.IsZero() {