Browse code

Refactor events publishing

Events subscription (/events API endpoint) attributes pseudo-unique identifiers to incoming subscribers: originally its host, then its subscription time. This is unecessary and leads to code complexity.

Introduce a JSONMessagePublisher to provide simple pub/sub mechanism for JSONMessage, and rely on this new type to publish events to all subscribed listeners. The original logic is kept for the 'since' and 'until' parameters, and for client disconnection handling.

Docker-DCO-1.1-Signed-off-by: Arnaud Porterie <icecrime@gmail.com> (github: icecrime)

Arnaud Porterie authored on 2014/06/05 03:47:09
Showing 4 changed files
... ...
@@ -248,85 +248,63 @@ func (srv *Server) ContainerKill(job *engine.Job) engine.Status {
248 248
 	return engine.StatusOK
249 249
 }
250 250
 
251
-func (srv *Server) EvictListener(from int64) {
252
-	srv.Lock()
253
-	if old, ok := srv.listeners[from]; ok {
254
-		delete(srv.listeners, from)
255
-		close(old)
256
-	}
257
-	srv.Unlock()
258
-}
259
-
260 251
 func (srv *Server) Events(job *engine.Job) engine.Status {
261 252
 	if len(job.Args) != 0 {
262 253
 		return job.Errorf("Usage: %s", job.Name)
263 254
 	}
264 255
 
265 256
 	var (
266
-		from    = time.Now().UTC().UnixNano()
267 257
 		since   = job.GetenvInt64("since")
268 258
 		until   = job.GetenvInt64("until")
269 259
 		timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now()))
270 260
 	)
271
-	sendEvent := func(event *utils.JSONMessage) error {
272
-		b, err := json.Marshal(event)
273
-		if err != nil {
274
-			return fmt.Errorf("JSON error")
275
-		}
276
-		_, err = job.Stdout.Write(b)
277
-		return err
261
+
262
+	// If no until, disable timeout
263
+	if until == 0 {
264
+		timeout.Stop()
278 265
 	}
279 266
 
280 267
 	listener := make(chan utils.JSONMessage)
281
-	srv.Lock()
282
-	if old, ok := srv.listeners[from]; ok {
283
-		delete(srv.listeners, from)
284
-		close(old)
268
+	srv.eventPublisher.Subscribe(listener)
269
+	defer srv.eventPublisher.Unsubscribe(listener)
270
+
271
+	// When sending an event JSON serialization errors are ignored, but all
272
+	// other errors lead to the eviction of the listener.
273
+	sendEvent := func(event *utils.JSONMessage) error {
274
+		if b, err := json.Marshal(event); err == nil {
275
+			if _, err = job.Stdout.Write(b); err != nil {
276
+				return err
277
+			}
278
+		}
279
+		return nil
285 280
 	}
286
-	srv.listeners[from] = listener
287
-	srv.Unlock()
288
-	job.Stdout.Write(nil) // flush
281
+
282
+	job.Stdout.Write(nil)
283
+
284
+	// Resend every event in the [since, until] time interval.
289 285
 	if since != 0 {
290
-		// If since, send previous events that happened after the timestamp and until timestamp
291 286
 		for _, event := range srv.GetEvents() {
292 287
 			if event.Time >= since && (event.Time <= until || until == 0) {
293
-				err := sendEvent(&event)
294
-				if err != nil && err.Error() == "JSON error" {
295
-					continue
296
-				}
297
-				if err != nil {
298
-					// On error, evict the listener
299
-					srv.EvictListener(from)
288
+				if err := sendEvent(&event); err != nil {
300 289
 					return job.Error(err)
301 290
 				}
302 291
 			}
303 292
 		}
304 293
 	}
305 294
 
306
-	// If no until, disable timeout
307
-	if until == 0 {
308
-		timeout.Stop()
309
-	}
310 295
 	for {
311 296
 		select {
312 297
 		case event, ok := <-listener:
313
-			if !ok { // Channel is closed: listener was evicted
298
+			if !ok {
314 299
 				return engine.StatusOK
315 300
 			}
316
-			err := sendEvent(&event)
317
-			if err != nil && err.Error() == "JSON error" {
318
-				continue
319
-			}
320
-			if err != nil {
321
-				// On error, evict the listener
322
-				srv.EvictListener(from)
301
+			if err := sendEvent(&event); err != nil {
323 302
 				return job.Error(err)
324 303
 			}
325 304
 		case <-timeout.C:
326 305
 			return engine.StatusOK
327 306
 		}
328 307
 	}
329
-	return engine.StatusOK
330 308
 }
331 309
 
332 310
 func (srv *Server) ContainerExport(job *engine.Job) engine.Status {
... ...
@@ -797,7 +775,7 @@ func (srv *Server) DockerInfo(job *engine.Job) engine.Status {
797 797
 	v.SetInt("NFd", utils.GetTotalUsedFds())
798 798
 	v.SetInt("NGoroutines", runtime.NumGoroutine())
799 799
 	v.Set("ExecutionDriver", srv.daemon.ExecutionDriver().Name())
800
-	v.SetInt("NEventsListener", len(srv.listeners))
800
+	v.SetInt("NEventsListener", srv.eventPublisher.SubscribersCount())
801 801
 	v.Set("KernelVersion", kernelVersion)
802 802
 	v.Set("IndexServerAddress", registry.IndexServerAddress())
803 803
 	v.Set("InitSha1", dockerversion.INITSHA1)
... ...
@@ -2387,12 +2365,12 @@ func NewServer(eng *engine.Engine, config *daemonconfig.Config) (*Server, error)
2387 2387
 		return nil, err
2388 2388
 	}
2389 2389
 	srv := &Server{
2390
-		Eng:         eng,
2391
-		daemon:      daemon,
2392
-		pullingPool: make(map[string]chan struct{}),
2393
-		pushingPool: make(map[string]chan struct{}),
2394
-		events:      make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events
2395
-		listeners:   make(map[int64]chan utils.JSONMessage),
2390
+		Eng:            eng,
2391
+		daemon:         daemon,
2392
+		pullingPool:    make(map[string]chan struct{}),
2393
+		pushingPool:    make(map[string]chan struct{}),
2394
+		events:         make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events
2395
+		eventPublisher: utils.NewJSONMessagePublisher(),
2396 2396
 	}
2397 2397
 	daemon.SetServer(srv)
2398 2398
 	return srv, nil
... ...
@@ -2402,14 +2380,7 @@ func (srv *Server) LogEvent(action, id, from string) *utils.JSONMessage {
2402 2402
 	now := time.Now().UTC().Unix()
2403 2403
 	jm := utils.JSONMessage{Status: action, ID: id, From: from, Time: now}
2404 2404
 	srv.AddEvent(jm)
2405
-	srv.Lock()
2406
-	for _, c := range srv.listeners {
2407
-		select { // non blocking channel
2408
-		case c <- jm:
2409
-		default:
2410
-		}
2411
-	}
2412
-	srv.Unlock()
2405
+	srv.eventPublisher.Publish(jm)
2413 2406
 	return &jm
2414 2407
 }
2415 2408
 
... ...
@@ -2461,12 +2432,12 @@ func (srv *Server) Close() error {
2461 2461
 
2462 2462
 type Server struct {
2463 2463
 	sync.RWMutex
2464
-	daemon      *daemon.Daemon
2465
-	pullingPool map[string]chan struct{}
2466
-	pushingPool map[string]chan struct{}
2467
-	events      []utils.JSONMessage
2468
-	listeners   map[int64]chan utils.JSONMessage
2469
-	Eng         *engine.Engine
2470
-	running     bool
2471
-	tasks       sync.WaitGroup
2464
+	daemon         *daemon.Daemon
2465
+	pullingPool    map[string]chan struct{}
2466
+	pushingPool    map[string]chan struct{}
2467
+	events         []utils.JSONMessage
2468
+	eventPublisher *utils.JSONMessagePublisher
2469
+	Eng            *engine.Engine
2470
+	running        bool
2471
+	tasks          sync.WaitGroup
2472 2472
 }
... ...
@@ -47,16 +47,14 @@ func TestPools(t *testing.T) {
47 47
 
48 48
 func TestLogEvent(t *testing.T) {
49 49
 	srv := &Server{
50
-		events:    make([]utils.JSONMessage, 0, 64),
51
-		listeners: make(map[int64]chan utils.JSONMessage),
50
+		events:         make([]utils.JSONMessage, 0, 64),
51
+		eventPublisher: utils.NewJSONMessagePublisher(),
52 52
 	}
53 53
 
54 54
 	srv.LogEvent("fakeaction", "fakeid", "fakeimage")
55 55
 
56 56
 	listener := make(chan utils.JSONMessage)
57
-	srv.Lock()
58
-	srv.listeners[1337] = listener
59
-	srv.Unlock()
57
+	srv.eventPublisher.Subscribe(listener)
60 58
 
61 59
 	srv.LogEvent("fakeaction2", "fakeid", "fakeimage")
62 60
 
63 61
new file mode 100644
... ...
@@ -0,0 +1,61 @@
0
+package utils
1
+
2
+import (
3
+	"sync"
4
+	"time"
5
+)
6
+
7
+func NewJSONMessagePublisher() *JSONMessagePublisher {
8
+	return &JSONMessagePublisher{}
9
+}
10
+
11
+type JSONMessageListener chan<- JSONMessage
12
+
13
+type JSONMessagePublisher struct {
14
+	m           sync.RWMutex
15
+	subscribers []JSONMessageListener
16
+}
17
+
18
+func (p *JSONMessagePublisher) Subscribe(l JSONMessageListener) {
19
+	p.m.Lock()
20
+	p.subscribers = append(p.subscribers, l)
21
+	p.m.Unlock()
22
+}
23
+
24
+func (p *JSONMessagePublisher) SubscribersCount() int {
25
+	p.m.RLock()
26
+	count := len(p.subscribers)
27
+	p.m.RUnlock()
28
+	return count
29
+}
30
+
31
+// Unsubscribe closes and removes the specified listener from the list of
32
+// previously registed ones.
33
+// It returns a boolean value indicating if the listener was successfully
34
+// found, closed and unregistered.
35
+func (p *JSONMessagePublisher) Unsubscribe(l JSONMessageListener) bool {
36
+	p.m.Lock()
37
+	defer p.m.Unlock()
38
+
39
+	for i, subscriber := range p.subscribers {
40
+		if subscriber == l {
41
+			close(l)
42
+			p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
43
+			return true
44
+		}
45
+	}
46
+	return false
47
+}
48
+
49
+func (p *JSONMessagePublisher) Publish(m JSONMessage) {
50
+	p.m.RLock()
51
+	for _, subscriber := range p.subscribers {
52
+		// We give each subscriber a 100ms time window to receive the event,
53
+		// after which we move to the next.
54
+		select {
55
+		case subscriber <- m:
56
+		case <-time.After(100 * time.Millisecond):
57
+		}
58
+	}
59
+	p.m.RUnlock()
60
+}
0 61
new file mode 100644
... ...
@@ -0,0 +1,73 @@
0
+package utils
1
+
2
+import (
3
+	"testing"
4
+	"time"
5
+)
6
+
7
+func assertSubscribersCount(t *testing.T, q *JSONMessagePublisher, expected int) {
8
+	if q.SubscribersCount() != expected {
9
+		t.Fatalf("Expected %d registered subscribers, got %d", expected, q.SubscribersCount())
10
+	}
11
+}
12
+
13
+func TestJSONMessagePublisherSubscription(t *testing.T) {
14
+	q := NewJSONMessagePublisher()
15
+	l1 := make(chan JSONMessage)
16
+	l2 := make(chan JSONMessage)
17
+
18
+	assertSubscribersCount(t, q, 0)
19
+	q.Subscribe(l1)
20
+	assertSubscribersCount(t, q, 1)
21
+	q.Subscribe(l2)
22
+	assertSubscribersCount(t, q, 2)
23
+
24
+	q.Unsubscribe(l1)
25
+	q.Unsubscribe(l2)
26
+	assertSubscribersCount(t, q, 0)
27
+}
28
+
29
+func TestJSONMessagePublisherPublish(t *testing.T) {
30
+	q := NewJSONMessagePublisher()
31
+	l1 := make(chan JSONMessage)
32
+	l2 := make(chan JSONMessage)
33
+
34
+	go func() {
35
+		for {
36
+			select {
37
+			case <-l1:
38
+				close(l1)
39
+				l1 = nil
40
+			case <-l2:
41
+				close(l2)
42
+				l2 = nil
43
+			case <-time.After(1 * time.Second):
44
+				q.Unsubscribe(l1)
45
+				q.Unsubscribe(l2)
46
+				t.Fatal("Timeout waiting for broadcasted message")
47
+			}
48
+		}
49
+	}()
50
+
51
+	q.Subscribe(l1)
52
+	q.Subscribe(l2)
53
+	q.Publish(JSONMessage{})
54
+}
55
+
56
+func TestJSONMessagePublishTimeout(t *testing.T) {
57
+	q := NewJSONMessagePublisher()
58
+	l := make(chan JSONMessage)
59
+	q.Subscribe(l)
60
+
61
+	c := make(chan struct{})
62
+	go func() {
63
+		q.Publish(JSONMessage{})
64
+		close(c)
65
+	}()
66
+
67
+	select {
68
+	case <-c:
69
+	case <-time.After(time.Second):
70
+		t.Fatal("Timeout publishing message")
71
+	}
72
+}