Browse code

Add PubSub topics.

A TopicFunc is an interface to let the pubisher decide whether it needs
to send a message to a subscriber or not. It returns true if the
publisher must send the message and false otherwise.

Users of the pubsub package can create a subscriber with a topic
function by calling `pubsub.SubscribeTopic`.

Message delivery has also been modified to use concurrent channels per
subscriber. That way, topic verification and message delivery is not
o(N+M) anymore, based on the number of subscribers and topic verification
complexity.

Using pubsub topics, the API stops controlling the message delivery,
delegating that function to a topic generated with the filtering
provided by the user. The publisher sends every message to the
subscriber if there is no filter, but the api doesn't have to select
messages to return anymore.

Signed-off-by: David Calavera <david.calavera@gmail.com>

David Calavera authored on 2015/11/26 11:03:10
Showing 5 changed files
... ...
@@ -92,27 +92,11 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R
92 92
 
93 93
 	enc := json.NewEncoder(output)
94 94
 
95
-	current, l, cancel := s.daemon.SubscribeToEvents()
96
-	defer cancel()
95
+	buffered, l := s.daemon.SubscribeToEvents(since, sinceNano, ef)
96
+	defer s.daemon.UnsubscribeFromEvents(l)
97 97
 
98
-	eventFilter := s.daemon.GetEventFilter(ef)
99
-	handleEvent := func(ev *jsonmessage.JSONMessage) error {
100
-		if eventFilter.Include(ev) {
101
-			if err := enc.Encode(ev); err != nil {
102
-				return err
103
-			}
104
-		}
105
-		return nil
106
-	}
107
-
108
-	if since == -1 {
109
-		current = nil
110
-	}
111
-	for _, ev := range current {
112
-		if ev.Time < since || ((ev.Time == since) && (ev.TimeNano < sinceNano)) {
113
-			continue
114
-		}
115
-		if err := handleEvent(ev); err != nil {
98
+	for _, ev := range buffered {
99
+		if err := enc.Encode(ev); err != nil {
116 100
 			return err
117 101
 		}
118 102
 	}
... ...
@@ -129,7 +113,7 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R
129 129
 			if !ok {
130 130
 				continue
131 131
 			}
132
-			if err := handleEvent(jev); err != nil {
132
+			if err := enc.Encode(jev); err != nil {
133 133
 				return err
134 134
 			}
135 135
 		case <-timer.C:
... ...
@@ -532,8 +532,8 @@ func (daemon *Daemon) GetByName(name string) (*Container, error) {
532 532
 	return e, nil
533 533
 }
534 534
 
535
-// GetEventFilter returns a filters.Filter for a set of filters
536
-func (daemon *Daemon) GetEventFilter(filter filters.Args) *events.Filter {
535
+// getEventFilter returns a filters.Filter for a set of filters
536
+func (daemon *Daemon) getEventFilter(filter filters.Args) *events.Filter {
537 537
 	// incoming container filter can be name, id or partial id, convert to
538 538
 	// a full container id
539 539
 	for _, cn := range filter.Get("container") {
... ...
@@ -547,8 +547,15 @@ func (daemon *Daemon) GetEventFilter(filter filters.Args) *events.Filter {
547 547
 }
548 548
 
549 549
 // SubscribeToEvents returns the currently record of events, a channel to stream new events from, and a function to cancel the stream of events.
550
-func (daemon *Daemon) SubscribeToEvents() ([]*jsonmessage.JSONMessage, chan interface{}, func()) {
551
-	return daemon.EventsService.Subscribe()
550
+func (daemon *Daemon) SubscribeToEvents(since, sinceNano int64, filter filters.Args) ([]*jsonmessage.JSONMessage, chan interface{}) {
551
+	ef := daemon.getEventFilter(filter)
552
+	return daemon.EventsService.SubscribeTopic(since, sinceNano, ef)
553
+}
554
+
555
+// UnsubscribeFromEvents stops the event subscription for a client by closing the
556
+// channel where the daemon sends events to.
557
+func (daemon *Daemon) UnsubscribeFromEvents(listener chan interface{}) {
558
+	daemon.EventsService.Evict(listener)
552 559
 }
553 560
 
554 561
 // GetLabels for a container or image id
... ...
@@ -8,7 +8,10 @@ import (
8 8
 	"github.com/docker/docker/pkg/pubsub"
9 9
 )
10 10
 
11
-const eventsLimit = 64
11
+const (
12
+	eventsLimit = 64
13
+	bufferSize  = 1024
14
+)
12 15
 
13 16
 // Events is pubsub channel for *jsonmessage.JSONMessage
14 17
 type Events struct {
... ...
@@ -21,7 +24,7 @@ type Events struct {
21 21
 func New() *Events {
22 22
 	return &Events{
23 23
 		events: make([]*jsonmessage.JSONMessage, 0, eventsLimit),
24
-		pub:    pubsub.NewPublisher(100*time.Millisecond, 1024),
24
+		pub:    pubsub.NewPublisher(100*time.Millisecond, bufferSize),
25 25
 	}
26 26
 }
27 27
 
... ...
@@ -42,6 +45,41 @@ func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}, func
42 42
 	return current, l, cancel
43 43
 }
44 44
 
45
+// SubscribeTopic adds new listener to events, returns slice of 64 stored
46
+// last events, a channel in which you can expect new events (in form
47
+// of interface{}, so you need type assertion).
48
+func (e *Events) SubscribeTopic(since, sinceNano int64, ef *Filter) ([]*jsonmessage.JSONMessage, chan interface{}) {
49
+	e.mu.Lock()
50
+	defer e.mu.Unlock()
51
+
52
+	var buffered []*jsonmessage.JSONMessage
53
+	topic := func(m interface{}) bool {
54
+		return ef.Include(m.(*jsonmessage.JSONMessage))
55
+	}
56
+
57
+	if since != -1 {
58
+		for i := len(e.events) - 1; i >= 0; i-- {
59
+			ev := e.events[i]
60
+			if ev.Time < since || ((ev.Time == since) && (ev.TimeNano < sinceNano)) {
61
+				break
62
+			}
63
+			if ef.filter.Len() == 0 || topic(ev) {
64
+				buffered = append([]*jsonmessage.JSONMessage{ev}, buffered...)
65
+			}
66
+		}
67
+	}
68
+
69
+	var ch chan interface{}
70
+	if ef.filter.Len() > 0 {
71
+		ch = e.pub.SubscribeTopic(topic)
72
+	} else {
73
+		// Subscribe to all events if there are no filters
74
+		ch = e.pub.Subscribe()
75
+	}
76
+
77
+	return buffered, ch
78
+}
79
+
45 80
 // Evict evicts listener from pubsub
46 81
 func (e *Events) Evict(l chan interface{}) {
47 82
 	e.pub.Evict(l)
... ...
@@ -1891,9 +1891,7 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) {
1891 1891
 
1892 1892
 	startEpoch := daemonTime(c).Unix()
1893 1893
 	// Watch for events since epoch.
1894
-	eventsCmd := exec.Command(
1895
-		dockerBinary, "events",
1896
-		"--since", strconv.FormatInt(startEpoch, 10))
1894
+	eventsCmd := exec.Command(dockerBinary, "events", "--since", strconv.FormatInt(startEpoch, 10))
1897 1895
 	stdout, err := eventsCmd.StdoutPipe()
1898 1896
 	if err != nil {
1899 1897
 		c.Fatal(err)
... ...
@@ -1932,12 +1930,12 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) {
1932 1932
 		c.Fatalf("failed to run build: %s", err)
1933 1933
 	}
1934 1934
 
1935
-	matchCID := regexp.MustCompile("Running in ")
1935
+	matchCID := regexp.MustCompile("Running in (.+)")
1936 1936
 	scanner := bufio.NewScanner(stdoutBuild)
1937 1937
 	for scanner.Scan() {
1938 1938
 		line := scanner.Text()
1939
-		if ok := matchCID.MatchString(line); ok {
1940
-			containerID <- line[len(line)-12:]
1939
+		if matches := matchCID.FindStringSubmatch(line); len(matches) > 0 {
1940
+			containerID <- matches[1]
1941 1941
 			break
1942 1942
 		}
1943 1943
 	}
... ...
@@ -13,11 +13,12 @@ func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
13 13
 	return &Publisher{
14 14
 		buffer:      buffer,
15 15
 		timeout:     publishTimeout,
16
-		subscribers: make(map[subscriber]struct{}),
16
+		subscribers: make(map[subscriber]topicFunc),
17 17
 	}
18 18
 }
19 19
 
20 20
 type subscriber chan interface{}
21
+type topicFunc func(v interface{}) bool
21 22
 
22 23
 // Publisher is basic pub/sub structure. Allows to send events and subscribe
23 24
 // to them. Can be safely used from multiple goroutines.
... ...
@@ -25,7 +26,7 @@ type Publisher struct {
25 25
 	m           sync.RWMutex
26 26
 	buffer      int
27 27
 	timeout     time.Duration
28
-	subscribers map[subscriber]struct{}
28
+	subscribers map[subscriber]topicFunc
29 29
 }
30 30
 
31 31
 // Len returns the number of subscribers for the publisher
... ...
@@ -38,9 +39,14 @@ func (p *Publisher) Len() int {
38 38
 
39 39
 // Subscribe adds a new subscriber to the publisher returning the channel.
40 40
 func (p *Publisher) Subscribe() chan interface{} {
41
+	return p.SubscribeTopic(nil)
42
+}
43
+
44
+// SubscribeTopic adds a new subscriber that filters messages sent by a topic.
45
+func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
41 46
 	ch := make(chan interface{}, p.buffer)
42 47
 	p.m.Lock()
43
-	p.subscribers[ch] = struct{}{}
48
+	p.subscribers[ch] = topic
44 49
 	p.m.Unlock()
45 50
 	return ch
46 51
 }
... ...
@@ -56,20 +62,13 @@ func (p *Publisher) Evict(sub chan interface{}) {
56 56
 // Publish sends the data in v to all subscribers currently registered with the publisher.
57 57
 func (p *Publisher) Publish(v interface{}) {
58 58
 	p.m.RLock()
59
-	for sub := range p.subscribers {
60
-		// send under a select as to not block if the receiver is unavailable
61
-		if p.timeout > 0 {
62
-			select {
63
-			case sub <- v:
64
-			case <-time.After(p.timeout):
65
-			}
66
-			continue
67
-		}
68
-		select {
69
-		case sub <- v:
70
-		default:
71
-		}
59
+	wg := new(sync.WaitGroup)
60
+	for sub, topic := range p.subscribers {
61
+		wg.Add(1)
62
+
63
+		go p.sendTopic(sub, topic, v, wg)
72 64
 	}
65
+	wg.Wait()
73 66
 	p.m.RUnlock()
74 67
 }
75 68
 
... ...
@@ -82,3 +81,24 @@ func (p *Publisher) Close() {
82 82
 	}
83 83
 	p.m.Unlock()
84 84
 }
85
+
86
+func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
87
+	defer wg.Done()
88
+	if topic != nil && !topic(v) {
89
+		return
90
+	}
91
+
92
+	// send under a select as to not block if the receiver is unavailable
93
+	if p.timeout > 0 {
94
+		select {
95
+		case sub <- v:
96
+		case <-time.After(p.timeout):
97
+		}
98
+		return
99
+	}
100
+
101
+	select {
102
+	case sub <- v:
103
+	default:
104
+	}
105
+}