Browse code

refactor access to daemon member EventsService

Signed-off-by: Morgan Bauer <mbauer@us.ibm.com>

Morgan Bauer authored on 2015/10/13 03:54:46
Showing 4 changed files
... ...
@@ -88,12 +88,11 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R
88 88
 	}
89 89
 
90 90
 	enc := buildOutputEncoder(w)
91
-	d := s.daemon
92
-	es := d.EventsService
93
-	current, l := es.Subscribe()
94
-	defer es.Evict(l)
95 91
 
96
-	eventFilter := d.GetEventFilter(ef)
92
+	current, l, cancel := s.daemon.SubscribeToEvents()
93
+	defer cancel()
94
+
95
+	eventFilter := s.daemon.GetEventFilter(ef)
97 96
 	handleEvent := func(ev *jsonmessage.JSONMessage) error {
98 97
 		if eventFilter.Include(ev) {
99 98
 			if err := enc.Encode(ev); err != nil {
... ...
@@ -39,6 +39,7 @@ import (
39 39
 	"github.com/docker/docker/pkg/graphdb"
40 40
 	"github.com/docker/docker/pkg/idtools"
41 41
 	"github.com/docker/docker/pkg/ioutils"
42
+	"github.com/docker/docker/pkg/jsonmessage"
42 43
 	"github.com/docker/docker/pkg/namesgenerator"
43 44
 	"github.com/docker/docker/pkg/nat"
44 45
 	"github.com/docker/docker/pkg/parsers/filters"
... ...
@@ -548,6 +549,11 @@ func (daemon *Daemon) GetEventFilter(filter filters.Args) *events.Filter {
548 548
 	return events.NewFilter(filter, daemon.GetLabels)
549 549
 }
550 550
 
551
+// SubscribeToEvents returns the currently record of events, a channel to stream new events from, and a function to cancel the stream of events.
552
+func (daemon *Daemon) SubscribeToEvents() ([]*jsonmessage.JSONMessage, chan interface{}, func()) {
553
+	return daemon.EventsService.Subscribe()
554
+}
555
+
551 556
 // GetLabels for a container or image id
552 557
 func (daemon *Daemon) GetLabels(id string) map[string]string {
553 558
 	// TODO: TestCase
... ...
@@ -25,16 +25,21 @@ func New() *Events {
25 25
 	}
26 26
 }
27 27
 
28
-// Subscribe adds new listener to events, returns slice of 64 stored last events
29
-// channel in which you can expect new events in form of interface{}, so you
30
-// need type assertion.
31
-func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}) {
28
+// Subscribe adds new listener to events, returns slice of 64 stored
29
+// last events, a channel in which you can expect new events (in form
30
+// of interface{}, so you need type assertion), and a function to call
31
+// to stop the stream of events.
32
+func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}, func()) {
32 33
 	e.mu.Lock()
33 34
 	current := make([]*jsonmessage.JSONMessage, len(e.events))
34 35
 	copy(current, e.events)
35 36
 	l := e.pub.Subscribe()
36 37
 	e.mu.Unlock()
37
-	return current, l
38
+
39
+	cancel := func() {
40
+		e.Evict(l)
41
+	}
42
+	return current, l, cancel
38 43
 }
39 44
 
40 45
 // Evict evicts listener from pubsub
... ...
@@ -10,8 +10,8 @@ import (
10 10
 
11 11
 func TestEventsLog(t *testing.T) {
12 12
 	e := New()
13
-	_, l1 := e.Subscribe()
14
-	_, l2 := e.Subscribe()
13
+	_, l1, _ := e.Subscribe()
14
+	_, l2, _ := e.Subscribe()
15 15
 	defer e.Evict(l1)
16 16
 	defer e.Evict(l2)
17 17
 	count := e.SubscribersCount()
... ...
@@ -65,7 +65,7 @@ func TestEventsLog(t *testing.T) {
65 65
 
66 66
 func TestEventsLogTimeout(t *testing.T) {
67 67
 	e := New()
68
-	_, l := e.Subscribe()
68
+	_, l, _ := e.Subscribe()
69 69
 	defer e.Evict(l)
70 70
 
71 71
 	c := make(chan struct{})
... ...
@@ -91,7 +91,7 @@ func TestLogEvents(t *testing.T) {
91 91
 		e.Log(action, id, from)
92 92
 	}
93 93
 	time.Sleep(50 * time.Millisecond)
94
-	current, l := e.Subscribe()
94
+	current, l, _ := e.Subscribe()
95 95
 	for i := 0; i < 10; i++ {
96 96
 		num := i + eventsLimit + 16
97 97
 		action := fmt.Sprintf("action_%d", num)