Browse code

Remove publisher if no one is listening

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Michael Crosby authored on 2015/01/21 04:37:50
Showing 3 changed files
... ...
@@ -68,6 +68,9 @@ func (s *statsCollector) unsubscribe(c *Container, ch chan interface{}) {
68 68
 	publisher := s.publishers[c]
69 69
 	if publisher != nil {
70 70
 		publisher.Evict(ch)
71
+		if publisher.Len() == 0 {
72
+			delete(s.publishers, c)
73
+		}
71 74
 	}
72 75
 	s.m.Unlock()
73 76
 }
... ...
@@ -274,10 +274,10 @@ func TestGetContainerStats(t *testing.T) {
274 274
 		t.Fatalf("GET containers/stats sockRequest failed: %v", err)
275 275
 	}
276 276
 
277
+	dec := json.NewDecoder(bytes.NewBuffer(body))
277 278
 	var s *stats.Stats
278
-	if err := json.Unmarshal(body, &s); err != nil {
279
+	if err := dec.Decode(&s); err != nil {
279 280
 		t.Fatal(err)
280 281
 	}
281
-
282 282
 	logDone("container REST API - check GET containers/stats")
283 283
 }
... ...
@@ -26,6 +26,14 @@ type Publisher struct {
26 26
 	subscribers map[subscriber]struct{}
27 27
 }
28 28
 
29
+// Len returns the number of subscribers for the publisher
30
+func (p *Publisher) Len() int {
31
+	p.m.RLock()
32
+	i := len(p.subscribers)
33
+	p.m.RUnlock()
34
+	return i
35
+}
36
+
29 37
 // Subscribe adds a new subscriber to the publisher returning the channel.
30 38
 func (p *Publisher) Subscribe() chan interface{} {
31 39
 	ch := make(chan interface{}, p.buffer)