Browse code

Merge pull request #6199 from icecrime/event_subscription_refactoring

Event subscription refactoring

Tibor Vass authored on 2014/06/18 06:39:28
Showing 4 changed files
... ...
@@ -248,85 +248,63 @@ func (srv *Server) ContainerKill(job *engine.Job) engine.Status {
248 248
 	return engine.StatusOK
249 249
 }
250 250
 
251
-func (srv *Server) EvictListener(from int64) {
252
-	srv.Lock()
253
-	if old, ok := srv.listeners[from]; ok {
254
-		delete(srv.listeners, from)
255
-		close(old)
256
-	}
257
-	srv.Unlock()
258
-}
259
-
260 251
 func (srv *Server) Events(job *engine.Job) engine.Status {
261 252
 	if len(job.Args) != 0 {
262 253
 		return job.Errorf("Usage: %s", job.Name)
263 254
 	}
264 255
 
265 256
 	var (
266
-		from    = time.Now().UTC().UnixNano()
267 257
 		since   = job.GetenvInt64("since")
268 258
 		until   = job.GetenvInt64("until")
269 259
 		timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now()))
270 260
 	)
271
-	sendEvent := func(event *utils.JSONMessage) error {
272
-		b, err := json.Marshal(event)
273
-		if err != nil {
274
-			return fmt.Errorf("JSON error")
275
-		}
276
-		_, err = job.Stdout.Write(b)
277
-		return err
261
+
262
+	// If no until, disable timeout
263
+	if until == 0 {
264
+		timeout.Stop()
278 265
 	}
279 266
 
280 267
 	listener := make(chan utils.JSONMessage)
281
-	srv.Lock()
282
-	if old, ok := srv.listeners[from]; ok {
283
-		delete(srv.listeners, from)
284
-		close(old)
268
+	srv.eventPublisher.Subscribe(listener)
269
+	defer srv.eventPublisher.Unsubscribe(listener)
270
+
271
+	// When sending an event JSON serialization errors are ignored, but all
272
+	// other errors lead to the eviction of the listener.
273
+	sendEvent := func(event *utils.JSONMessage) error {
274
+		if b, err := json.Marshal(event); err == nil {
275
+			if _, err = job.Stdout.Write(b); err != nil {
276
+				return err
277
+			}
278
+		}
279
+		return nil
285 280
 	}
286
-	srv.listeners[from] = listener
287
-	srv.Unlock()
288
-	job.Stdout.Write(nil) // flush
281
+
282
+	job.Stdout.Write(nil)
283
+
284
+	// Resend every event in the [since, until] time interval.
289 285
 	if since != 0 {
290
-		// If since, send previous events that happened after the timestamp and until timestamp
291 286
 		for _, event := range srv.GetEvents() {
292 287
 			if event.Time >= since && (event.Time <= until || until == 0) {
293
-				err := sendEvent(&event)
294
-				if err != nil && err.Error() == "JSON error" {
295
-					continue
296
-				}
297
-				if err != nil {
298
-					// On error, evict the listener
299
-					srv.EvictListener(from)
288
+				if err := sendEvent(&event); err != nil {
300 289
 					return job.Error(err)
301 290
 				}
302 291
 			}
303 292
 		}
304 293
 	}
305 294
 
306
-	// If no until, disable timeout
307
-	if until == 0 {
308
-		timeout.Stop()
309
-	}
310 295
 	for {
311 296
 		select {
312 297
 		case event, ok := <-listener:
313
-			if !ok { // Channel is closed: listener was evicted
298
+			if !ok {
314 299
 				return engine.StatusOK
315 300
 			}
316
-			err := sendEvent(&event)
317
-			if err != nil && err.Error() == "JSON error" {
318
-				continue
319
-			}
320
-			if err != nil {
321
-				// On error, evict the listener
322
-				srv.EvictListener(from)
301
+			if err := sendEvent(&event); err != nil {
323 302
 				return job.Error(err)
324 303
 			}
325 304
 		case <-timeout.C:
326 305
 			return engine.StatusOK
327 306
 		}
328 307
 	}
329
-	return engine.StatusOK
330 308
 }
331 309
 
332 310
 func (srv *Server) ContainerExport(job *engine.Job) engine.Status {
... ...
@@ -797,7 +775,7 @@ func (srv *Server) DockerInfo(job *engine.Job) engine.Status {
797 797
 	v.SetInt("NFd", utils.GetTotalUsedFds())
798 798
 	v.SetInt("NGoroutines", runtime.NumGoroutine())
799 799
 	v.Set("ExecutionDriver", srv.daemon.ExecutionDriver().Name())
800
-	v.SetInt("NEventsListener", len(srv.listeners))
800
+	v.SetInt("NEventsListener", srv.eventPublisher.SubscribersCount())
801 801
 	v.Set("KernelVersion", kernelVersion)
802 802
 	v.Set("IndexServerAddress", registry.IndexServerAddress())
803 803
 	v.Set("InitSha1", dockerversion.INITSHA1)
... ...
@@ -2387,12 +2365,12 @@ func NewServer(eng *engine.Engine, config *daemonconfig.Config) (*Server, error)
2387 2387
 		return nil, err
2388 2388
 	}
2389 2389
 	srv := &Server{
2390
-		Eng:         eng,
2391
-		daemon:      daemon,
2392
-		pullingPool: make(map[string]chan struct{}),
2393
-		pushingPool: make(map[string]chan struct{}),
2394
-		events:      make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events
2395
-		listeners:   make(map[int64]chan utils.JSONMessage),
2390
+		Eng:            eng,
2391
+		daemon:         daemon,
2392
+		pullingPool:    make(map[string]chan struct{}),
2393
+		pushingPool:    make(map[string]chan struct{}),
2394
+		events:         make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events
2395
+		eventPublisher: utils.NewJSONMessagePublisher(),
2396 2396
 	}
2397 2397
 	daemon.SetServer(srv)
2398 2398
 	return srv, nil
... ...
@@ -2402,14 +2380,7 @@ func (srv *Server) LogEvent(action, id, from string) *utils.JSONMessage {
2402 2402
 	now := time.Now().UTC().Unix()
2403 2403
 	jm := utils.JSONMessage{Status: action, ID: id, From: from, Time: now}
2404 2404
 	srv.AddEvent(jm)
2405
-	srv.Lock()
2406
-	for _, c := range srv.listeners {
2407
-		select { // non blocking channel
2408
-		case c <- jm:
2409
-		default:
2410
-		}
2411
-	}
2412
-	srv.Unlock()
2405
+	srv.eventPublisher.Publish(jm)
2413 2406
 	return &jm
2414 2407
 }
2415 2408
 
... ...
@@ -2461,12 +2432,12 @@ func (srv *Server) Close() error {
2461 2461
 
2462 2462
 type Server struct {
2463 2463
 	sync.RWMutex
2464
-	daemon      *daemon.Daemon
2465
-	pullingPool map[string]chan struct{}
2466
-	pushingPool map[string]chan struct{}
2467
-	events      []utils.JSONMessage
2468
-	listeners   map[int64]chan utils.JSONMessage
2469
-	Eng         *engine.Engine
2470
-	running     bool
2471
-	tasks       sync.WaitGroup
2464
+	daemon         *daemon.Daemon
2465
+	pullingPool    map[string]chan struct{}
2466
+	pushingPool    map[string]chan struct{}
2467
+	events         []utils.JSONMessage
2468
+	eventPublisher *utils.JSONMessagePublisher
2469
+	Eng            *engine.Engine
2470
+	running        bool
2471
+	tasks          sync.WaitGroup
2472 2472
 }
... ...
@@ -47,16 +47,14 @@ func TestPools(t *testing.T) {
47 47
 
48 48
 func TestLogEvent(t *testing.T) {
49 49
 	srv := &Server{
50
-		events:    make([]utils.JSONMessage, 0, 64),
51
-		listeners: make(map[int64]chan utils.JSONMessage),
50
+		events:         make([]utils.JSONMessage, 0, 64),
51
+		eventPublisher: utils.NewJSONMessagePublisher(),
52 52
 	}
53 53
 
54 54
 	srv.LogEvent("fakeaction", "fakeid", "fakeimage")
55 55
 
56 56
 	listener := make(chan utils.JSONMessage)
57
-	srv.Lock()
58
-	srv.listeners[1337] = listener
59
-	srv.Unlock()
57
+	srv.eventPublisher.Subscribe(listener)
60 58
 
61 59
 	srv.LogEvent("fakeaction2", "fakeid", "fakeimage")
62 60
 
63 61
new file mode 100644
... ...
@@ -0,0 +1,61 @@
0
+package utils
1
+
2
+import (
3
+	"sync"
4
+	"time"
5
+)
6
+
7
+func NewJSONMessagePublisher() *JSONMessagePublisher {
8
+	return &JSONMessagePublisher{}
9
+}
10
+
11
+type JSONMessageListener chan<- JSONMessage
12
+
13
+type JSONMessagePublisher struct {
14
+	m           sync.RWMutex
15
+	subscribers []JSONMessageListener
16
+}
17
+
18
+func (p *JSONMessagePublisher) Subscribe(l JSONMessageListener) {
19
+	p.m.Lock()
20
+	p.subscribers = append(p.subscribers, l)
21
+	p.m.Unlock()
22
+}
23
+
24
+func (p *JSONMessagePublisher) SubscribersCount() int {
25
+	p.m.RLock()
26
+	count := len(p.subscribers)
27
+	p.m.RUnlock()
28
+	return count
29
+}
30
+
31
+// Unsubscribe closes and removes the specified listener from the list of
32
+// previously registed ones.
33
+// It returns a boolean value indicating if the listener was successfully
34
+// found, closed and unregistered.
35
+func (p *JSONMessagePublisher) Unsubscribe(l JSONMessageListener) bool {
36
+	p.m.Lock()
37
+	defer p.m.Unlock()
38
+
39
+	for i, subscriber := range p.subscribers {
40
+		if subscriber == l {
41
+			close(l)
42
+			p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
43
+			return true
44
+		}
45
+	}
46
+	return false
47
+}
48
+
49
+func (p *JSONMessagePublisher) Publish(m JSONMessage) {
50
+	p.m.RLock()
51
+	for _, subscriber := range p.subscribers {
52
+		// We give each subscriber a 100ms time window to receive the event,
53
+		// after which we move to the next.
54
+		select {
55
+		case subscriber <- m:
56
+		case <-time.After(100 * time.Millisecond):
57
+		}
58
+	}
59
+	p.m.RUnlock()
60
+}
0 61
new file mode 100644
... ...
@@ -0,0 +1,73 @@
0
+package utils
1
+
2
+import (
3
+	"testing"
4
+	"time"
5
+)
6
+
7
+func assertSubscribersCount(t *testing.T, q *JSONMessagePublisher, expected int) {
8
+	if q.SubscribersCount() != expected {
9
+		t.Fatalf("Expected %d registered subscribers, got %d", expected, q.SubscribersCount())
10
+	}
11
+}
12
+
13
+func TestJSONMessagePublisherSubscription(t *testing.T) {
14
+	q := NewJSONMessagePublisher()
15
+	l1 := make(chan JSONMessage)
16
+	l2 := make(chan JSONMessage)
17
+
18
+	assertSubscribersCount(t, q, 0)
19
+	q.Subscribe(l1)
20
+	assertSubscribersCount(t, q, 1)
21
+	q.Subscribe(l2)
22
+	assertSubscribersCount(t, q, 2)
23
+
24
+	q.Unsubscribe(l1)
25
+	q.Unsubscribe(l2)
26
+	assertSubscribersCount(t, q, 0)
27
+}
28
+
29
+func TestJSONMessagePublisherPublish(t *testing.T) {
30
+	q := NewJSONMessagePublisher()
31
+	l1 := make(chan JSONMessage)
32
+	l2 := make(chan JSONMessage)
33
+
34
+	go func() {
35
+		for {
36
+			select {
37
+			case <-l1:
38
+				close(l1)
39
+				l1 = nil
40
+			case <-l2:
41
+				close(l2)
42
+				l2 = nil
43
+			case <-time.After(1 * time.Second):
44
+				q.Unsubscribe(l1)
45
+				q.Unsubscribe(l2)
46
+				t.Fatal("Timeout waiting for broadcasted message")
47
+			}
48
+		}
49
+	}()
50
+
51
+	q.Subscribe(l1)
52
+	q.Subscribe(l2)
53
+	q.Publish(JSONMessage{})
54
+}
55
+
56
+func TestJSONMessagePublishTimeout(t *testing.T) {
57
+	q := NewJSONMessagePublisher()
58
+	l := make(chan JSONMessage)
59
+	q.Subscribe(l)
60
+
61
+	c := make(chan struct{})
62
+	go func() {
63
+		q.Publish(JSONMessage{})
64
+		close(c)
65
+	}()
66
+
67
+	select {
68
+	case <-c:
69
+	case <-time.After(time.Second):
70
+		t.Fatal("Timeout publishing message")
71
+	}
72
+}