Browse code

Windows: Factor out stat collector

Signed-off-by: John Howard <jhoward@microsoft.com>

John Howard authored on 2015/05/29 04:21:32
Showing 3 changed files
1 1
deleted file mode 100644
... ...
@@ -1,160 +0,0 @@
1
-package daemon
2
-
3
-import (
4
-	"bufio"
5
-	"fmt"
6
-	"os"
7
-	"strconv"
8
-	"strings"
9
-	"sync"
10
-	"time"
11
-
12
-	"github.com/Sirupsen/logrus"
13
-	"github.com/docker/docker/daemon/execdriver"
14
-	"github.com/docker/docker/pkg/pubsub"
15
-	"github.com/docker/libcontainer/system"
16
-)
17
-
18
-// newStatsCollector returns a new statsCollector that collections
19
-// network and cgroup stats for a registered container at the specified
20
-// interval.  The collector allows non-running containers to be added
21
-// and will start processing stats when they are started.
22
-func newStatsCollector(interval time.Duration) *statsCollector {
23
-	s := &statsCollector{
24
-		interval:   interval,
25
-		publishers: make(map[*Container]*pubsub.Publisher),
26
-		clockTicks: uint64(system.GetClockTicks()),
27
-		bufReader:  bufio.NewReaderSize(nil, 128),
28
-	}
29
-	go s.run()
30
-	return s
31
-}
32
-
33
-// statsCollector manages and provides container resource stats
34
-type statsCollector struct {
35
-	m          sync.Mutex
36
-	interval   time.Duration
37
-	clockTicks uint64
38
-	publishers map[*Container]*pubsub.Publisher
39
-	bufReader  *bufio.Reader
40
-}
41
-
42
-// collect registers the container with the collector and adds it to
43
-// the event loop for collection on the specified interval returning
44
-// a channel for the subscriber to receive on.
45
-func (s *statsCollector) collect(c *Container) chan interface{} {
46
-	s.m.Lock()
47
-	defer s.m.Unlock()
48
-	publisher, exists := s.publishers[c]
49
-	if !exists {
50
-		publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
51
-		s.publishers[c] = publisher
52
-	}
53
-	return publisher.Subscribe()
54
-}
55
-
56
-// stopCollection closes the channels for all subscribers and removes
57
-// the container from metrics collection.
58
-func (s *statsCollector) stopCollection(c *Container) {
59
-	s.m.Lock()
60
-	if publisher, exists := s.publishers[c]; exists {
61
-		publisher.Close()
62
-		delete(s.publishers, c)
63
-	}
64
-	s.m.Unlock()
65
-}
66
-
67
-// unsubscribe removes a specific subscriber from receiving updates for a container's stats.
68
-func (s *statsCollector) unsubscribe(c *Container, ch chan interface{}) {
69
-	s.m.Lock()
70
-	publisher := s.publishers[c]
71
-	if publisher != nil {
72
-		publisher.Evict(ch)
73
-		if publisher.Len() == 0 {
74
-			delete(s.publishers, c)
75
-		}
76
-	}
77
-	s.m.Unlock()
78
-}
79
-
80
-func (s *statsCollector) run() {
81
-	type publishersPair struct {
82
-		container *Container
83
-		publisher *pubsub.Publisher
84
-	}
85
-	// we cannot determine the capacity here.
86
-	// it will grow enough in first iteration
87
-	var pairs []publishersPair
88
-
89
-	for range time.Tick(s.interval) {
90
-		systemUsage, err := s.getSystemCpuUsage()
91
-		if err != nil {
92
-			logrus.Errorf("collecting system cpu usage: %v", err)
93
-			continue
94
-		}
95
-
96
-		// it does not make sense in the first iteration,
97
-		// but saves allocations in further iterations
98
-		pairs = pairs[:0]
99
-
100
-		s.m.Lock()
101
-		for container, publisher := range s.publishers {
102
-			// copy pointers here to release the lock ASAP
103
-			pairs = append(pairs, publishersPair{container, publisher})
104
-		}
105
-		s.m.Unlock()
106
-
107
-		for _, pair := range pairs {
108
-			stats, err := pair.container.Stats()
109
-			if err != nil {
110
-				if err != execdriver.ErrNotRunning {
111
-					logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err)
112
-				}
113
-				continue
114
-			}
115
-			stats.SystemUsage = systemUsage
116
-			pair.publisher.Publish(stats)
117
-		}
118
-	}
119
-}
120
-
121
-const nanoSeconds = 1e9
122
-
123
-// getSystemCpuUSage returns the host system's cpu usage in nanoseconds
124
-// for the system to match the cgroup readings are returned in the same format.
125
-func (s *statsCollector) getSystemCpuUsage() (uint64, error) {
126
-	var line string
127
-	f, err := os.Open("/proc/stat")
128
-	if err != nil {
129
-		return 0, err
130
-	}
131
-	defer func() {
132
-		s.bufReader.Reset(nil)
133
-		f.Close()
134
-	}()
135
-	s.bufReader.Reset(f)
136
-	err = nil
137
-	for err == nil {
138
-		line, err = s.bufReader.ReadString('\n')
139
-		if err != nil {
140
-			break
141
-		}
142
-		parts := strings.Fields(line)
143
-		switch parts[0] {
144
-		case "cpu":
145
-			if len(parts) < 8 {
146
-				return 0, fmt.Errorf("invalid number of cpu fields")
147
-			}
148
-			var sum uint64
149
-			for _, i := range parts[1:8] {
150
-				v, err := strconv.ParseUint(i, 10, 64)
151
-				if err != nil {
152
-					return 0, fmt.Errorf("Unable to convert value %s to int: %s", i, err)
153
-				}
154
-				sum += v
155
-			}
156
-			return (sum * nanoSeconds) / s.clockTicks, nil
157
-		}
158
-	}
159
-	return 0, fmt.Errorf("invalid stat format")
160
-}
161 1
new file mode 100644
... ...
@@ -0,0 +1,162 @@
0
+// +build !windows
1
+
2
+package daemon
3
+
4
+import (
5
+	"bufio"
6
+	"fmt"
7
+	"os"
8
+	"strconv"
9
+	"strings"
10
+	"sync"
11
+	"time"
12
+
13
+	"github.com/Sirupsen/logrus"
14
+	"github.com/docker/docker/daemon/execdriver"
15
+	"github.com/docker/docker/pkg/pubsub"
16
+	"github.com/docker/libcontainer/system"
17
+)
18
+
19
+// newStatsCollector returns a new statsCollector that collections
20
+// network and cgroup stats for a registered container at the specified
21
+// interval.  The collector allows non-running containers to be added
22
+// and will start processing stats when they are started.
23
+func newStatsCollector(interval time.Duration) *statsCollector {
24
+	s := &statsCollector{
25
+		interval:   interval,
26
+		publishers: make(map[*Container]*pubsub.Publisher),
27
+		clockTicks: uint64(system.GetClockTicks()),
28
+		bufReader:  bufio.NewReaderSize(nil, 128),
29
+	}
30
+	go s.run()
31
+	return s
32
+}
33
+
34
+// statsCollector manages and provides container resource stats
35
+type statsCollector struct {
36
+	m          sync.Mutex
37
+	interval   time.Duration
38
+	clockTicks uint64
39
+	publishers map[*Container]*pubsub.Publisher
40
+	bufReader  *bufio.Reader
41
+}
42
+
43
+// collect registers the container with the collector and adds it to
44
+// the event loop for collection on the specified interval returning
45
+// a channel for the subscriber to receive on.
46
+func (s *statsCollector) collect(c *Container) chan interface{} {
47
+	s.m.Lock()
48
+	defer s.m.Unlock()
49
+	publisher, exists := s.publishers[c]
50
+	if !exists {
51
+		publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
52
+		s.publishers[c] = publisher
53
+	}
54
+	return publisher.Subscribe()
55
+}
56
+
57
+// stopCollection closes the channels for all subscribers and removes
58
+// the container from metrics collection.
59
+func (s *statsCollector) stopCollection(c *Container) {
60
+	s.m.Lock()
61
+	if publisher, exists := s.publishers[c]; exists {
62
+		publisher.Close()
63
+		delete(s.publishers, c)
64
+	}
65
+	s.m.Unlock()
66
+}
67
+
68
+// unsubscribe removes a specific subscriber from receiving updates for a container's stats.
69
+func (s *statsCollector) unsubscribe(c *Container, ch chan interface{}) {
70
+	s.m.Lock()
71
+	publisher := s.publishers[c]
72
+	if publisher != nil {
73
+		publisher.Evict(ch)
74
+		if publisher.Len() == 0 {
75
+			delete(s.publishers, c)
76
+		}
77
+	}
78
+	s.m.Unlock()
79
+}
80
+
81
+func (s *statsCollector) run() {
82
+	type publishersPair struct {
83
+		container *Container
84
+		publisher *pubsub.Publisher
85
+	}
86
+	// we cannot determine the capacity here.
87
+	// it will grow enough in first iteration
88
+	var pairs []publishersPair
89
+
90
+	for range time.Tick(s.interval) {
91
+		systemUsage, err := s.getSystemCpuUsage()
92
+		if err != nil {
93
+			logrus.Errorf("collecting system cpu usage: %v", err)
94
+			continue
95
+		}
96
+
97
+		// it does not make sense in the first iteration,
98
+		// but saves allocations in further iterations
99
+		pairs = pairs[:0]
100
+
101
+		s.m.Lock()
102
+		for container, publisher := range s.publishers {
103
+			// copy pointers here to release the lock ASAP
104
+			pairs = append(pairs, publishersPair{container, publisher})
105
+		}
106
+		s.m.Unlock()
107
+
108
+		for _, pair := range pairs {
109
+			stats, err := pair.container.Stats()
110
+			if err != nil {
111
+				if err != execdriver.ErrNotRunning {
112
+					logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err)
113
+				}
114
+				continue
115
+			}
116
+			stats.SystemUsage = systemUsage
117
+			pair.publisher.Publish(stats)
118
+		}
119
+	}
120
+}
121
+
122
+const nanoSeconds = 1e9
123
+
124
+// getSystemCpuUSage returns the host system's cpu usage in nanoseconds
125
+// for the system to match the cgroup readings are returned in the same format.
126
+func (s *statsCollector) getSystemCpuUsage() (uint64, error) {
127
+	var line string
128
+	f, err := os.Open("/proc/stat")
129
+	if err != nil {
130
+		return 0, err
131
+	}
132
+	defer func() {
133
+		s.bufReader.Reset(nil)
134
+		f.Close()
135
+	}()
136
+	s.bufReader.Reset(f)
137
+	err = nil
138
+	for err == nil {
139
+		line, err = s.bufReader.ReadString('\n')
140
+		if err != nil {
141
+			break
142
+		}
143
+		parts := strings.Fields(line)
144
+		switch parts[0] {
145
+		case "cpu":
146
+			if len(parts) < 8 {
147
+				return 0, fmt.Errorf("invalid number of cpu fields")
148
+			}
149
+			var sum uint64
150
+			for _, i := range parts[1:8] {
151
+				v, err := strconv.ParseUint(i, 10, 64)
152
+				if err != nil {
153
+					return 0, fmt.Errorf("Unable to convert value %s to int: %s", i, err)
154
+				}
155
+				sum += v
156
+			}
157
+			return (sum * nanoSeconds) / s.clockTicks, nil
158
+		}
159
+	}
160
+	return 0, fmt.Errorf("invalid stat format")
161
+}
0 162
new file mode 100644
... ...
@@ -0,0 +1,31 @@
0
+package daemon
1
+
2
+import "time"
3
+
4
+// newStatsCollector returns a new statsCollector for collection stats
5
+// for a registered container at the specified interval. The collector allows
6
+// non-running containers to be added and will start processing stats when
7
+// they are started.
8
+func newStatsCollector(interval time.Duration) *statsCollector {
9
+	return &statsCollector{}
10
+}
11
+
12
+// statsCollector manages and provides container resource stats
13
+type statsCollector struct {
14
+}
15
+
16
+// collect registers the container with the collector and adds it to
17
+// the event loop for collection on the specified interval returning
18
+// a channel for the subscriber to receive on.
19
+func (s *statsCollector) collect(c *Container) chan interface{} {
20
+	return nil
21
+}
22
+
23
+// stopCollection closes the channels for all subscribers and removes
24
+// the container from metrics collection.
25
+func (s *statsCollector) stopCollection(c *Container) {
26
+}
27
+
28
+// unsubscribe removes a specific subscriber from receiving updates for a container's stats.
29
+func (s *statsCollector) unsubscribe(c *Container, ch chan interface{}) {
30
+}