Browse code

Add pubsub package to handle robust publisher

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>

Michael Crosby authored on 2015/01/20 08:29:42
Showing 6 changed files
... ...
@@ -1,11 +1,8 @@
1
+// Package stats defines the types used in responses from the API stats
2
+// endpoint, kept separate so the API stays stable for its consumers.
1 3
 package stats
2 4
 
3
-import (
4
-	"time"
5
-
6
-	"github.com/docker/libcontainer"
7
-	"github.com/docker/libcontainer/cgroups"
8
-)
5
+import "time"
9 6
 
10 7
 type ThrottlingData struct {
11 8
 	// Number of periods with throttling active
... ...
@@ -88,69 +85,3 @@ type Stats struct {
88 88
 	MemoryStats MemoryStats `json:"memory_stats,omitempty"`
89 89
 	BlkioStats  BlkioStats  `json:"blkio_stats,omitempty"`
90 90
 }
91
-
92
-// ToStats converts the libcontainer.ContainerStats to the api specific
93
-// structs.  This is done to preserve API compatibility and versioning.
94
-func ToStats(ls *libcontainer.ContainerStats) *Stats {
95
-	s := &Stats{}
96
-	if ls.NetworkStats != nil {
97
-		s.Network = Network{
98
-			RxBytes:   ls.NetworkStats.RxBytes,
99
-			RxPackets: ls.NetworkStats.RxPackets,
100
-			RxErrors:  ls.NetworkStats.RxErrors,
101
-			RxDropped: ls.NetworkStats.RxDropped,
102
-			TxBytes:   ls.NetworkStats.TxBytes,
103
-			TxPackets: ls.NetworkStats.TxPackets,
104
-			TxErrors:  ls.NetworkStats.TxErrors,
105
-			TxDropped: ls.NetworkStats.TxDropped,
106
-		}
107
-	}
108
-	cs := ls.CgroupStats
109
-	if cs != nil {
110
-		s.BlkioStats = BlkioStats{
111
-			IoServiceBytesRecursive: copyBlkioEntry(cs.BlkioStats.IoServiceBytesRecursive),
112
-			IoServicedRecursive:     copyBlkioEntry(cs.BlkioStats.IoServicedRecursive),
113
-			IoQueuedRecursive:       copyBlkioEntry(cs.BlkioStats.IoQueuedRecursive),
114
-			IoServiceTimeRecursive:  copyBlkioEntry(cs.BlkioStats.IoServiceTimeRecursive),
115
-			IoWaitTimeRecursive:     copyBlkioEntry(cs.BlkioStats.IoWaitTimeRecursive),
116
-			IoMergedRecursive:       copyBlkioEntry(cs.BlkioStats.IoMergedRecursive),
117
-			IoTimeRecursive:         copyBlkioEntry(cs.BlkioStats.IoTimeRecursive),
118
-			SectorsRecursive:        copyBlkioEntry(cs.BlkioStats.SectorsRecursive),
119
-		}
120
-		cpu := cs.CpuStats
121
-		s.CpuStats = CpuStats{
122
-			CpuUsage: CpuUsage{
123
-				TotalUsage:        cpu.CpuUsage.TotalUsage,
124
-				PercpuUsage:       cpu.CpuUsage.PercpuUsage,
125
-				UsageInKernelmode: cpu.CpuUsage.UsageInKernelmode,
126
-				UsageInUsermode:   cpu.CpuUsage.UsageInUsermode,
127
-			},
128
-			ThrottlingData: ThrottlingData{
129
-				Periods:          cpu.ThrottlingData.Periods,
130
-				ThrottledPeriods: cpu.ThrottlingData.ThrottledPeriods,
131
-				ThrottledTime:    cpu.ThrottlingData.ThrottledTime,
132
-			},
133
-		}
134
-		mem := cs.MemoryStats
135
-		s.MemoryStats = MemoryStats{
136
-			Usage:    mem.Usage,
137
-			MaxUsage: mem.MaxUsage,
138
-			Stats:    mem.Stats,
139
-			Failcnt:  mem.Failcnt,
140
-		}
141
-	}
142
-	return s
143
-}
144
-
145
-func copyBlkioEntry(entries []cgroups.BlkioStatEntry) []BlkioStatEntry {
146
-	out := make([]BlkioStatEntry, len(entries))
147
-	for i, re := range entries {
148
-		out[i] = BlkioStatEntry{
149
-			Major: re.Major,
150
-			Minor: re.Minor,
151
-			Op:    re.Op,
152
-			Value: re.Value,
153
-		}
154
-	}
155
-	return out
156
-}
... ...
@@ -1099,7 +1099,7 @@ func (daemon *Daemon) Stats(c *Container) (*execdriver.ResourceStats, error) {
1099 1099
 	return daemon.execDriver.Stats(c.ID)
1100 1100
 }
1101 1101
 
1102
-func (daemon *Daemon) SubscribeToContainerStats(name string) (chan *execdriver.ResourceStats, error) {
1102
+func (daemon *Daemon) SubscribeToContainerStats(name string) (chan interface{}, error) {
1103 1103
 	c := daemon.Get(name)
1104 1104
 	if c == nil {
1105 1105
 		return nil, fmt.Errorf("no such container")
... ...
@@ -1108,7 +1108,7 @@ func (daemon *Daemon) SubscribeToContainerStats(name string) (chan *execdriver.R
1108 1108
 	return ch, nil
1109 1109
 }
1110 1110
 
1111
-func (daemon *Daemon) UnsubscribeToContainerStats(name string, ch chan *execdriver.ResourceStats) error {
1111
+func (daemon *Daemon) UnsubscribeToContainerStats(name string, ch chan interface{}) error {
1112 1112
 	c := daemon.Get(name)
1113 1113
 	if c == nil {
1114 1114
 		return fmt.Errorf("no such container")
... ...
@@ -4,25 +4,95 @@ import (
4 4
 	"encoding/json"
5 5
 
6 6
 	"github.com/docker/docker/api/stats"
7
+	"github.com/docker/docker/daemon/execdriver"
7 8
 	"github.com/docker/docker/engine"
9
+	"github.com/docker/libcontainer"
10
+	"github.com/docker/libcontainer/cgroups"
8 11
 )
9 12
 
10 13
 func (daemon *Daemon) ContainerStats(job *engine.Job) engine.Status {
11
-	s, err := daemon.SubscribeToContainerStats(job.Args[0])
14
+	updates, err := daemon.SubscribeToContainerStats(job.Args[0])
12 15
 	if err != nil {
13 16
 		return job.Error(err)
14 17
 	}
15 18
 	enc := json.NewEncoder(job.Stdout)
16
-	for update := range s {
17
-		ss := stats.ToStats(update.ContainerStats)
19
+	for v := range updates {
20
+		update := v.(*execdriver.ResourceStats)
21
+		ss := convertToAPITypes(update.ContainerStats)
18 22
 		ss.MemoryStats.Limit = uint64(update.MemoryLimit)
19 23
 		ss.Read = update.Read
20 24
 		ss.CpuStats.SystemUsage = update.SystemUsage
21 25
 		if err := enc.Encode(ss); err != nil {
22 26
 			// TODO: handle the specific broken pipe
23
-			daemon.UnsubscribeToContainerStats(job.Args[0], s)
27
+			daemon.UnsubscribeToContainerStats(job.Args[0], updates)
24 28
 			return job.Error(err)
25 29
 		}
26 30
 	}
27 31
 	return engine.StatusOK
28 32
 }
33
+
34
+// convertToAPITypes converts the libcontainer.ContainerStats to the api specific
35
+// structs.  This is done to preserve API compatibility and versioning.
36
+func convertToAPITypes(ls *libcontainer.ContainerStats) *stats.Stats {
37
+	s := &stats.Stats{}
38
+	if ls.NetworkStats != nil {
39
+		s.Network = stats.Network{
40
+			RxBytes:   ls.NetworkStats.RxBytes,
41
+			RxPackets: ls.NetworkStats.RxPackets,
42
+			RxErrors:  ls.NetworkStats.RxErrors,
43
+			RxDropped: ls.NetworkStats.RxDropped,
44
+			TxBytes:   ls.NetworkStats.TxBytes,
45
+			TxPackets: ls.NetworkStats.TxPackets,
46
+			TxErrors:  ls.NetworkStats.TxErrors,
47
+			TxDropped: ls.NetworkStats.TxDropped,
48
+		}
49
+	}
50
+	cs := ls.CgroupStats
51
+	if cs != nil {
52
+		s.BlkioStats = stats.BlkioStats{
53
+			IoServiceBytesRecursive: copyBlkioEntry(cs.BlkioStats.IoServiceBytesRecursive),
54
+			IoServicedRecursive:     copyBlkioEntry(cs.BlkioStats.IoServicedRecursive),
55
+			IoQueuedRecursive:       copyBlkioEntry(cs.BlkioStats.IoQueuedRecursive),
56
+			IoServiceTimeRecursive:  copyBlkioEntry(cs.BlkioStats.IoServiceTimeRecursive),
57
+			IoWaitTimeRecursive:     copyBlkioEntry(cs.BlkioStats.IoWaitTimeRecursive),
58
+			IoMergedRecursive:       copyBlkioEntry(cs.BlkioStats.IoMergedRecursive),
59
+			IoTimeRecursive:         copyBlkioEntry(cs.BlkioStats.IoTimeRecursive),
60
+			SectorsRecursive:        copyBlkioEntry(cs.BlkioStats.SectorsRecursive),
61
+		}
62
+		cpu := cs.CpuStats
63
+		s.CpuStats = stats.CpuStats{
64
+			CpuUsage: stats.CpuUsage{
65
+				TotalUsage:        cpu.CpuUsage.TotalUsage,
66
+				PercpuUsage:       cpu.CpuUsage.PercpuUsage,
67
+				UsageInKernelmode: cpu.CpuUsage.UsageInKernelmode,
68
+				UsageInUsermode:   cpu.CpuUsage.UsageInUsermode,
69
+			},
70
+			ThrottlingData: stats.ThrottlingData{
71
+				Periods:          cpu.ThrottlingData.Periods,
72
+				ThrottledPeriods: cpu.ThrottlingData.ThrottledPeriods,
73
+				ThrottledTime:    cpu.ThrottlingData.ThrottledTime,
74
+			},
75
+		}
76
+		mem := cs.MemoryStats
77
+		s.MemoryStats = stats.MemoryStats{
78
+			Usage:    mem.Usage,
79
+			MaxUsage: mem.MaxUsage,
80
+			Stats:    mem.Stats,
81
+			Failcnt:  mem.Failcnt,
82
+		}
83
+	}
84
+	return s
85
+}
86
+
87
+func copyBlkioEntry(entries []cgroups.BlkioStatEntry) []stats.BlkioStatEntry {
88
+	out := make([]stats.BlkioStatEntry, len(entries))
89
+	for i, re := range entries {
90
+		out[i] = stats.BlkioStatEntry{
91
+			Major: re.Major,
92
+			Minor: re.Minor,
93
+			Op:    re.Op,
94
+			Value: re.Value,
95
+		}
96
+	}
97
+	return out
98
+}
... ...
@@ -11,6 +11,7 @@ import (
11 11
 
12 12
 	log "github.com/Sirupsen/logrus"
13 13
 	"github.com/docker/docker/daemon/execdriver"
14
+	"github.com/docker/docker/pkg/pubsub"
14 15
 	"github.com/docker/libcontainer/system"
15 16
 )
16 17
 
... ...
@@ -21,114 +22,75 @@ import (
21 21
 func newStatsCollector(interval time.Duration) *statsCollector {
22 22
 	s := &statsCollector{
23 23
 		interval:   interval,
24
-		containers: make(map[string]*statsData),
24
+		publishers: make(map[*Container]*pubsub.Publisher),
25 25
 		clockTicks: uint64(system.GetClockTicks()),
26 26
 	}
27
-	s.start()
27
+	go s.run()
28 28
 	return s
29 29
 }
30 30
 
31
-type statsData struct {
32
-	c         *Container
33
-	lastStats *execdriver.ResourceStats
34
-	subs      []chan *execdriver.ResourceStats
35
-}
36
-
37 31
 // statsCollector manages and provides container resource stats
38 32
 type statsCollector struct {
39 33
 	m          sync.Mutex
40 34
 	interval   time.Duration
41 35
 	clockTicks uint64
42
-	containers map[string]*statsData
36
+	publishers map[*Container]*pubsub.Publisher
43 37
 }
44 38
 
45 39
 // collect registers the container with the collector and adds it to
46 40
 // the event loop for collection on the specified interval returning
47 41
 // a channel for the subscriber to receive on.
48
-func (s *statsCollector) collect(c *Container) chan *execdriver.ResourceStats {
42
+func (s *statsCollector) collect(c *Container) chan interface{} {
49 43
 	s.m.Lock()
50 44
 	defer s.m.Unlock()
51
-	ch := make(chan *execdriver.ResourceStats, 1024)
52
-	if _, exists := s.containers[c.ID]; exists {
53
-		s.containers[c.ID].subs = append(s.containers[c.ID].subs, ch)
54
-		return ch
45
+	publisher, exists := s.publishers[c]
46
+	if !exists {
47
+		publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
48
+		s.publishers[c] = publisher
55 49
 	}
56
-	s.containers[c.ID] = &statsData{
57
-		c: c,
58
-		subs: []chan *execdriver.ResourceStats{
59
-			ch,
60
-		},
61
-	}
62
-	return ch
50
+	return publisher.Subscribe()
63 51
 }
64 52
 
65 53
 // stopCollection closes the channels for all subscribers and removes
66 54
 // the container from metrics collection.
67 55
 func (s *statsCollector) stopCollection(c *Container) {
68 56
 	s.m.Lock()
69
-	defer s.m.Unlock()
70
-	d := s.containers[c.ID]
71
-	if d == nil {
72
-		return
57
+	if publisher, exists := s.publishers[c]; exists {
58
+		publisher.Close()
59
+		delete(s.publishers, c)
73 60
 	}
74
-	for _, sub := range d.subs {
75
-		close(sub)
76
-	}
77
-	delete(s.containers, c.ID)
61
+	s.m.Unlock()
78 62
 }
79 63
 
80
-// unsubscribe removes a specific subscriber from receiving updates for a
81
-// container's stats.
82
-func (s *statsCollector) unsubscribe(c *Container, ch chan *execdriver.ResourceStats) {
64
+// unsubscribe removes a specific subscriber from receiving updates for a container's stats.
65
+func (s *statsCollector) unsubscribe(c *Container, ch chan interface{}) {
83 66
 	s.m.Lock()
84
-	cd := s.containers[c.ID]
85
-	for i, sub := range cd.subs {
86
-		if ch == sub {
87
-			cd.subs = append(cd.subs[:i], cd.subs[i+1:]...)
88
-			close(ch)
89
-		}
90
-	}
91
-	// if there are no more subscribers then remove the entire container
92
-	// from collection.
93
-	if len(cd.subs) == 0 {
94
-		delete(s.containers, c.ID)
67
+	publisher := s.publishers[c]
68
+	if publisher != nil {
69
+		publisher.Evict(ch)
95 70
 	}
96 71
 	s.m.Unlock()
97 72
 }
98 73
 
99
-func (s *statsCollector) start() {
100
-	go func() {
101
-		for _ = range time.Tick(s.interval) {
102
-			s.m.Lock()
103
-			for id, d := range s.containers {
104
-				systemUsage, err := s.getSystemCpuUsage()
105
-				if err != nil {
106
-					log.Errorf("collecting system cpu usage for %s: %v", id, err)
107
-					continue
108
-				}
109
-				stats, err := d.c.Stats()
110
-				if err != nil {
111
-					if err == execdriver.ErrNotRunning {
112
-						continue
113
-					}
114
-					// if the error is not because the container is currently running then
115
-					// evict the container from the collector and close the channel for
116
-					// any subscribers currently waiting on changes.
117
-					log.Errorf("collecting stats for %s: %v", id, err)
118
-					for _, sub := range s.containers[id].subs {
119
-						close(sub)
120
-					}
121
-					delete(s.containers, id)
122
-					continue
123
-				}
124
-				stats.SystemUsage = systemUsage
125
-				for _, sub := range s.containers[id].subs {
126
-					sub <- stats
74
+func (s *statsCollector) run() {
75
+	for _ = range time.Tick(s.interval) {
76
+		for container, publisher := range s.publishers {
77
+			systemUsage, err := s.getSystemCpuUsage()
78
+			if err != nil {
79
+				log.Errorf("collecting system cpu usage for %s: %v", container.ID, err)
80
+				continue
81
+			}
82
+			stats, err := container.Stats()
83
+			if err != nil {
84
+				if err != execdriver.ErrNotRunning {
85
+					log.Errorf("collecting stats for %s: %v", container.ID, err)
127 86
 				}
87
+				continue
128 88
 			}
129
-			s.m.Unlock()
89
+			stats.SystemUsage = systemUsage
90
+			publisher.Publish(stats)
130 91
 		}
131
-	}()
92
+	}
132 93
 }
133 94
 
134 95
 const nanoSeconds = 1e9
135 96
new file mode 100644
... ...
@@ -0,0 +1,66 @@
0
+package pubsub
1
+
2
+import (
3
+	"sync"
4
+	"time"
5
+)
6
+
7
// NewPublisher creates a new pub/sub publisher to broadcast messages.
// publishTimeout is the longest Publish waits on a single subscriber whose
// channel is full, so one slow or unresponsive client cannot block delivery
// to the others indefinitely. buffer is the capacity of every channel handed
// out by Subscribe.
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
	return &Publisher{
		buffer:      buffer,
		timeout:     publishTimeout,
		subscribers: make(map[subscriber]struct{}),
	}
}

// subscriber is the channel on which one subscriber receives messages.
type subscriber chan interface{}

// Publisher broadcasts values to a set of subscriber channels. All methods
// take the internal lock and may be called from multiple goroutines.
type Publisher struct {
	m           sync.RWMutex
	buffer      int
	timeout     time.Duration
	subscribers map[subscriber]struct{}
}

// Subscribe adds a new subscriber to the publisher returning the channel.
func (p *Publisher) Subscribe() chan interface{} {
	ch := make(chan interface{}, p.buffer)
	p.m.Lock()
	p.subscribers[ch] = struct{}{}
	p.m.Unlock()
	return ch
}

// Evict removes the specified subscriber from receiving any more messages
// and closes its channel. Evicting a channel that is not registered (never
// subscribed, already evicted, or already closed by Close) is a no-op, so the
// channel is never closed twice.
func (p *Publisher) Evict(sub chan interface{}) {
	p.m.Lock()
	defer p.m.Unlock()

	if _, registered := p.subscribers[sub]; !registered {
		return
	}
	delete(p.subscribers, sub)
	close(sub)
}

// Publish sends the data in v to all subscribers currently registered with
// the publisher. A subscriber that cannot accept the value within the
// publisher's timeout is skipped for this message.
func (p *Publisher) Publish(v interface{}) {
	p.m.RLock()
	defer p.m.RUnlock()

	for sub := range p.subscribers {
		// Send under a select so an unavailable receiver cannot block the
		// publisher; the timer is stopped right away instead of lingering
		// until it fires.
		timer := time.NewTimer(p.timeout)
		select {
		case sub <- v:
		case <-timer.C:
		}
		timer.Stop()
	}
}

// Close closes the channels to all subscribers registered with the publisher
// and unregisters them. Removing them matters: a channel left in the set
// would make a later Publish send on a closed channel and a later Close or
// Evict close it a second time, both of which panic.
func (p *Publisher) Close() {
	p.m.Lock()
	defer p.m.Unlock()

	for sub := range p.subscribers {
		delete(p.subscribers, sub)
		close(sub)
	}
}
0 66
new file mode 100644
... ...
@@ -0,0 +1,63 @@
0
+package pubsub
1
+
2
+import (
3
+	"testing"
4
+	"time"
5
+)
6
+
7
+func TestSendToOneSub(t *testing.T) {
8
+	p := NewPublisher(100*time.Millisecond, 10)
9
+	c := p.Subscribe()
10
+
11
+	p.Publish("hi")
12
+
13
+	msg := <-c
14
+	if msg.(string) != "hi" {
15
+		t.Fatalf("expected message hi but received %v", msg)
16
+	}
17
+}
18
+
19
+func TestSendToMultipleSubs(t *testing.T) {
20
+	p := NewPublisher(100*time.Millisecond, 10)
21
+	subs := []chan interface{}{}
22
+	subs = append(subs, p.Subscribe(), p.Subscribe(), p.Subscribe())
23
+
24
+	p.Publish("hi")
25
+
26
+	for _, c := range subs {
27
+		msg := <-c
28
+		if msg.(string) != "hi" {
29
+			t.Fatalf("expected message hi but received %v", msg)
30
+		}
31
+	}
32
+}
33
+
34
+func TestEvictOneSub(t *testing.T) {
35
+	p := NewPublisher(100*time.Millisecond, 10)
36
+	s1 := p.Subscribe()
37
+	s2 := p.Subscribe()
38
+
39
+	p.Evict(s1)
40
+	p.Publish("hi")
41
+	if _, ok := <-s1; ok {
42
+		t.Fatal("expected s1 to not receive the published message")
43
+	}
44
+
45
+	msg := <-s2
46
+	if msg.(string) != "hi" {
47
+		t.Fatalf("expected message hi but received %v", msg)
48
+	}
49
+}
50
+
51
+func TestClosePublisher(t *testing.T) {
52
+	p := NewPublisher(100*time.Millisecond, 10)
53
+	subs := []chan interface{}{}
54
+	subs = append(subs, p.Subscribe(), p.Subscribe(), p.Subscribe())
55
+	p.Close()
56
+
57
+	for _, c := range subs {
58
+		if _, ok := <-c; ok {
59
+			t.Fatal("expected all subscriber channels to be closed")
60
+		}
61
+	}
62
+}