Browse code

Extract daemon statsCollector to its own package

Signed-off-by: Vincent Demeester <vincent@sbr.pm>

Vincent Demeester authored on 2017/01/05 02:01:59
Showing 13 changed files
... ...
@@ -26,14 +26,13 @@ import (
26 26
 	"github.com/docker/docker/container"
27 27
 	"github.com/docker/docker/daemon/events"
28 28
 	"github.com/docker/docker/daemon/exec"
29
-	"github.com/docker/docker/daemon/initlayer"
30
-	"github.com/docker/docker/dockerversion"
31
-	"github.com/docker/docker/plugin"
32
-	"github.com/docker/libnetwork/cluster"
33 29
 	// register graph drivers
34 30
 	_ "github.com/docker/docker/daemon/graphdriver/register"
31
+	"github.com/docker/docker/daemon/initlayer"
32
+	"github.com/docker/docker/daemon/stats"
35 33
 	dmetadata "github.com/docker/docker/distribution/metadata"
36 34
 	"github.com/docker/docker/distribution/xfer"
35
+	"github.com/docker/docker/dockerversion"
37 36
 	"github.com/docker/docker/image"
38 37
 	"github.com/docker/docker/layer"
39 38
 	"github.com/docker/docker/libcontainerd"
... ...
@@ -46,6 +45,7 @@ import (
46 46
 	"github.com/docker/docker/pkg/sysinfo"
47 47
 	"github.com/docker/docker/pkg/system"
48 48
 	"github.com/docker/docker/pkg/truncindex"
49
+	"github.com/docker/docker/plugin"
49 50
 	"github.com/docker/docker/reference"
50 51
 	"github.com/docker/docker/registry"
51 52
 	"github.com/docker/docker/runconfig"
... ...
@@ -53,6 +53,7 @@ import (
53 53
 	"github.com/docker/docker/volume/local"
54 54
 	"github.com/docker/docker/volume/store"
55 55
 	"github.com/docker/libnetwork"
56
+	"github.com/docker/libnetwork/cluster"
56 57
 	nwconfig "github.com/docker/libnetwork/config"
57 58
 	"github.com/docker/libtrust"
58 59
 	"github.com/pkg/errors"
... ...
@@ -82,7 +83,7 @@ type Daemon struct {
82 82
 	trustKey                  libtrust.PrivateKey
83 83
 	idIndex                   *truncindex.TruncIndex
84 84
 	configStore               *Config
85
-	statsCollector            *statsCollector
85
+	statsCollector            *stats.Collector
86 86
 	defaultLogConfig          containertypes.LogConfig
87 87
 	RegistryService           registry.Service
88 88
 	EventsService             *events.Events
... ...
@@ -106,6 +107,8 @@ type Daemon struct {
106 106
 	clusterProvider           cluster.Provider
107 107
 	cluster                   Cluster
108 108
 
109
+	machineMemory uint64
110
+
109 111
 	seccompProfile     []byte
110 112
 	seccompProfilePath string
111 113
 }
... ...
@@ -1125,8 +1125,8 @@ func (daemon *Daemon) stats(c *container.Container) (*types.StatsJSON, error) {
1125 1125
 			Limit:    mem.Limit,
1126 1126
 		}
1127 1127
 		// if the container does not set memory limit, use the machineMemory
1128
-		if mem.Limit > daemon.statsCollector.machineMemory && daemon.statsCollector.machineMemory > 0 {
1129
-			s.MemoryStats.Limit = daemon.statsCollector.machineMemory
1128
+		if mem.Limit > daemon.machineMemory && daemon.machineMemory > 0 {
1129
+			s.MemoryStats.Limit = daemon.machineMemory
1130 1130
 		}
1131 1131
 		if cgs.PidsStats != nil {
1132 1132
 			s.PidsStats = types.PidsStats{
... ...
@@ -89,7 +89,7 @@ func (daemon *Daemon) cleanupContainer(container *container.Container, forceRemo
89 89
 
90 90
 	// stop collection of stats for the container regardless
91 91
 	// if stats are currently getting collected.
92
-	daemon.statsCollector.stopCollection(container)
92
+	daemon.statsCollector.StopCollection(container)
93 93
 
94 94
 	if err = daemon.containerStop(container, 3); err != nil {
95 95
 		return err
... ...
@@ -133,11 +133,11 @@ func (daemon *Daemon) ContainerStats(ctx context.Context, prefixOrName string, c
133 133
 }
134 134
 
135 135
 func (daemon *Daemon) subscribeToContainerStats(c *container.Container) chan interface{} {
136
-	return daemon.statsCollector.collect(c)
136
+	return daemon.statsCollector.Collect(c)
137 137
 }
138 138
 
139 139
 func (daemon *Daemon) unsubscribeToContainerStats(c *container.Container, ch chan interface{}) {
140
-	daemon.statsCollector.unsubscribe(c, ch)
140
+	daemon.statsCollector.Unsubscribe(c, ch)
141 141
 }
142 142
 
143 143
 // GetContainerStats collects all the stats published by a container
144 144
new file mode 100644
... ...
@@ -0,0 +1,101 @@
0
+// +build !solaris
1
+
2
+package stats
3
+
4
+import (
5
+	"time"
6
+
7
+	"github.com/Sirupsen/logrus"
8
+	"github.com/docker/docker/container"
9
+	"github.com/docker/docker/pkg/pubsub"
10
+)
11
+
12
+// Collect registers the container with the collector and adds it to
13
+// the event loop for collection on the specified interval returning
14
+// a channel for the subscriber to receive on.
15
+func (s *Collector) Collect(c *container.Container) chan interface{} {
16
+	s.m.Lock()
17
+	defer s.m.Unlock()
18
+	publisher, exists := s.publishers[c]
19
+	if !exists {
20
+		publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
21
+		s.publishers[c] = publisher
22
+	}
23
+	return publisher.Subscribe()
24
+}
25
+
26
+// StopCollection closes the channels for all subscribers and removes
27
+// the container from metrics collection.
28
+func (s *Collector) StopCollection(c *container.Container) {
29
+	s.m.Lock()
30
+	if publisher, exists := s.publishers[c]; exists {
31
+		publisher.Close()
32
+		delete(s.publishers, c)
33
+	}
34
+	s.m.Unlock()
35
+}
36
+
37
+// Unsubscribe removes a specific subscriber from receiving updates for a container's stats.
38
+func (s *Collector) Unsubscribe(c *container.Container, ch chan interface{}) {
39
+	s.m.Lock()
40
+	publisher := s.publishers[c]
41
+	if publisher != nil {
42
+		publisher.Evict(ch)
43
+		if publisher.Len() == 0 {
44
+			delete(s.publishers, c)
45
+		}
46
+	}
47
+	s.m.Unlock()
48
+}
49
+
50
+// Run starts the collectors and will indefinitely collect stats from the supervisor
51
+func (s *Collector) Run() {
52
+	type publishersPair struct {
53
+		container *container.Container
54
+		publisher *pubsub.Publisher
55
+	}
56
+	// we cannot determine the capacity here.
57
+	// it will grow enough in first iteration
58
+	var pairs []publishersPair
59
+
60
+	for range time.Tick(s.interval) {
61
+		// it does not make sense in the first iteration,
62
+		// but saves allocations in further iterations
63
+		pairs = pairs[:0]
64
+
65
+		s.m.Lock()
66
+		for container, publisher := range s.publishers {
67
+			// copy pointers here to release the lock ASAP
68
+			pairs = append(pairs, publishersPair{container, publisher})
69
+		}
70
+		s.m.Unlock()
71
+		if len(pairs) == 0 {
72
+			continue
73
+		}
74
+
75
+		systemUsage, err := s.getSystemCPUUsage()
76
+		if err != nil {
77
+			logrus.Errorf("collecting system cpu usage: %v", err)
78
+			continue
79
+		}
80
+
81
+		for _, pair := range pairs {
82
+			stats, err := s.supervisor.GetContainerStats(pair.container)
83
+			if err != nil {
84
+				if _, ok := err.(notRunningErr); !ok {
85
+					logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err)
86
+				}
87
+				continue
88
+			}
89
+			// FIXME: move to containerd on Linux (not Windows)
90
+			stats.CPUStats.SystemUsage = systemUsage
91
+
92
+			pair.publisher.Publish(*stats)
93
+		}
94
+	}
95
+}
96
+
97
+type notRunningErr interface {
98
+	error
99
+	ContainerIsRunning() bool
100
+}
0 101
new file mode 100644
... ...
@@ -0,0 +1,29 @@
0
+package stats
1
+
2
+import (
3
+	"github.com/docker/docker/container"
4
+)
5
+
6
+// platformNewStatsCollector performs platform specific initialisation of the
7
+// Collector structure. This is a no-op on Windows.
8
+func platformNewStatsCollector(s *Collector) {
9
+}
10
+
11
+// Collect registers the container with the collector and adds it to
12
+// the event loop for collection on the specified interval returning
13
+// a channel for the subscriber to receive on.
14
+// Currently not supported on Solaris
15
+func (s *Collector) Collect(c *container.Container) chan interface{} {
16
+	return nil
17
+}
18
+
19
+// StopCollection closes the channels for all subscribers and removes
20
+// the container from metrics collection.
21
+// Currently not supported on Solaris
22
+func (s *Collector) StopCollection(c *container.Container) {
23
+}
24
+
25
+// Unsubscribe removes a specific subscriber from receiving updates for a container's stats.
26
+// Currently not supported on Solaris
27
+func (s *Collector) Unsubscribe(c *container.Container, ch chan interface{}) {
28
+}
0 29
new file mode 100644
... ...
@@ -0,0 +1,66 @@
0
+// +build !windows,!solaris
1
+
2
+package stats
3
+
4
+import (
5
+	"fmt"
6
+	"os"
7
+	"strconv"
8
+	"strings"
9
+
10
+	"github.com/opencontainers/runc/libcontainer/system"
11
+)
12
+
13
+// platformNewStatsCollector performs platform specific initialisation of the
14
+// Collector structure.
15
+func platformNewStatsCollector(s *Collector) {
16
+	s.clockTicksPerSecond = uint64(system.GetClockTicks())
17
+}
18
+
19
+const nanoSecondsPerSecond = 1e9
20
+
21
+// getSystemCPUUsage returns the host system's cpu usage in
22
+// nanoseconds. An error is returned if the format of the underlying
23
+// file does not match.
24
+//
25
+// Uses /proc/stat defined by POSIX. Looks for the cpu
26
+// statistics line and then sums up the first seven fields
27
+// provided. See `man 5 proc` for details on specific field
28
+// information.
29
+func (s *Collector) getSystemCPUUsage() (uint64, error) {
30
+	var line string
31
+	f, err := os.Open("/proc/stat")
32
+	if err != nil {
33
+		return 0, err
34
+	}
35
+	defer func() {
36
+		s.bufReader.Reset(nil)
37
+		f.Close()
38
+	}()
39
+	s.bufReader.Reset(f)
40
+	err = nil
41
+	for err == nil {
42
+		line, err = s.bufReader.ReadString('\n')
43
+		if err != nil {
44
+			break
45
+		}
46
+		parts := strings.Fields(line)
47
+		switch parts[0] {
48
+		case "cpu":
49
+			if len(parts) < 8 {
50
+				return 0, fmt.Errorf("invalid number of cpu fields")
51
+			}
52
+			var totalClockTicks uint64
53
+			for _, i := range parts[1:8] {
54
+				v, err := strconv.ParseUint(i, 10, 64)
55
+				if err != nil {
56
+					return 0, fmt.Errorf("Unable to convert value %s to int: %s", i, err)
57
+				}
58
+				totalClockTicks += v
59
+			}
60
+			return (totalClockTicks * nanoSecondsPerSecond) /
61
+				s.clockTicksPerSecond, nil
62
+		}
63
+	}
64
+	return 0, fmt.Errorf("invalid stat format. Error trying to parse the '/proc/stat' file")
65
+}
0 66
new file mode 100644
... ...
@@ -0,0 +1,15 @@
0
+// +build windows
1
+
2
+package stats
3
+
4
+// platformNewStatsCollector performs platform specific initialisation of the
5
+// Collector structure. This is a no-op on Windows.
6
+func platformNewStatsCollector(s *Collector) {
7
+}
8
+
9
+// getSystemCPUUsage returns the host system's cpu usage in
10
+// nanoseconds. An error is returned if the format of the underlying
11
+// file does not match. This is a no-op on Windows.
12
+func (s *Collector) getSystemCPUUsage() (uint64, error) {
13
+	return 0, nil
14
+}
0 15
new file mode 100644
... ...
@@ -0,0 +1,42 @@
0
+package stats
1
+
2
+import (
3
+	"bufio"
4
+	"sync"
5
+	"time"
6
+
7
+	"github.com/docker/docker/api/types"
8
+	"github.com/docker/docker/container"
9
+	"github.com/docker/docker/pkg/pubsub"
10
+)
11
+
12
+type supervisor interface {
13
+	// GetContainerStats collects all the stats related to a container
14
+	GetContainerStats(container *container.Container) (*types.StatsJSON, error)
15
+}
16
+
17
+// NewCollector creates a stats collector that will poll the supervisor with the specified interval
18
+func NewCollector(supervisor supervisor, interval time.Duration) *Collector {
19
+	s := &Collector{
20
+		interval:   interval,
21
+		supervisor: supervisor,
22
+		publishers: make(map[*container.Container]*pubsub.Publisher),
23
+		bufReader:  bufio.NewReaderSize(nil, 128),
24
+	}
25
+
26
+	platformNewStatsCollector(s)
27
+
28
+	return s
29
+}
30
+
31
+// Collector manages and provides container resource stats
32
+type Collector struct {
33
+	m          sync.Mutex
34
+	supervisor supervisor
35
+	interval   time.Duration
36
+	publishers map[*container.Container]*pubsub.Publisher
37
+	bufReader  *bufio.Reader
38
+
39
+	// The following fields are not set on Windows currently.
40
+	clockTicksPerSecond uint64
41
+}
... ...
@@ -1,132 +1,26 @@
1
-// +build !solaris
2
-
3 1
 package daemon
4 2
 
5 3
 import (
6
-	"bufio"
7
-	"sync"
4
+	"runtime"
8 5
 	"time"
9 6
 
10
-	"github.com/Sirupsen/logrus"
11
-	"github.com/docker/docker/api/types"
12
-	"github.com/docker/docker/container"
13
-	"github.com/docker/docker/pkg/pubsub"
7
+	"github.com/docker/docker/daemon/stats"
8
+	"github.com/docker/docker/pkg/system"
14 9
 )
15 10
 
16
-type statsSupervisor interface {
17
-	// GetContainerStats collects all the stats related to a container
18
-	GetContainerStats(container *container.Container) (*types.StatsJSON, error)
19
-}
20
-
21 11
 // newStatsCollector returns a new statsCollector that collections
22 12
 // stats for a registered container at the specified interval.
23 13
 // The collector allows non-running containers to be added
24 14
 // and will start processing stats when they are started.
25
-func (daemon *Daemon) newStatsCollector(interval time.Duration) *statsCollector {
26
-	s := &statsCollector{
27
-		interval:   interval,
28
-		supervisor: daemon,
29
-		publishers: make(map[*container.Container]*pubsub.Publisher),
30
-		bufReader:  bufio.NewReaderSize(nil, 128),
31
-	}
32
-	platformNewStatsCollector(s)
33
-	go s.run()
34
-	return s
35
-}
36
-
37
-// statsCollector manages and provides container resource stats
38
-type statsCollector struct {
39
-	m          sync.Mutex
40
-	supervisor statsSupervisor
41
-	interval   time.Duration
42
-	publishers map[*container.Container]*pubsub.Publisher
43
-	bufReader  *bufio.Reader
44
-
45
-	// The following fields are not set on Windows currently.
46
-	clockTicksPerSecond uint64
47
-	machineMemory       uint64
48
-}
49
-
50
-// collect registers the container with the collector and adds it to
51
-// the event loop for collection on the specified interval returning
52
-// a channel for the subscriber to receive on.
53
-func (s *statsCollector) collect(c *container.Container) chan interface{} {
54
-	s.m.Lock()
55
-	defer s.m.Unlock()
56
-	publisher, exists := s.publishers[c]
57
-	if !exists {
58
-		publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
59
-		s.publishers[c] = publisher
60
-	}
61
-	return publisher.Subscribe()
62
-}
63
-
64
-// stopCollection closes the channels for all subscribers and removes
65
-// the container from metrics collection.
66
-func (s *statsCollector) stopCollection(c *container.Container) {
67
-	s.m.Lock()
68
-	if publisher, exists := s.publishers[c]; exists {
69
-		publisher.Close()
70
-		delete(s.publishers, c)
71
-	}
72
-	s.m.Unlock()
73
-}
74
-
75
-// unsubscribe removes a specific subscriber from receiving updates for a container's stats.
76
-func (s *statsCollector) unsubscribe(c *container.Container, ch chan interface{}) {
77
-	s.m.Lock()
78
-	publisher := s.publishers[c]
79
-	if publisher != nil {
80
-		publisher.Evict(ch)
81
-		if publisher.Len() == 0 {
82
-			delete(s.publishers, c)
83
-		}
84
-	}
85
-	s.m.Unlock()
86
-}
87
-
88
-func (s *statsCollector) run() {
89
-	type publishersPair struct {
90
-		container *container.Container
91
-		publisher *pubsub.Publisher
92
-	}
93
-	// we cannot determine the capacity here.
94
-	// it will grow enough in first iteration
95
-	var pairs []publishersPair
96
-
97
-	for range time.Tick(s.interval) {
98
-		// it does not make sense in the first iteration,
99
-		// but saves allocations in further iterations
100
-		pairs = pairs[:0]
101
-
102
-		s.m.Lock()
103
-		for container, publisher := range s.publishers {
104
-			// copy pointers here to release the lock ASAP
105
-			pairs = append(pairs, publishersPair{container, publisher})
106
-		}
107
-		s.m.Unlock()
108
-		if len(pairs) == 0 {
109
-			continue
110
-		}
111
-
112
-		systemUsage, err := s.getSystemCPUUsage()
113
-		if err != nil {
114
-			logrus.Errorf("collecting system cpu usage: %v", err)
115
-			continue
116
-		}
117
-
118
-		for _, pair := range pairs {
119
-			stats, err := s.supervisor.GetContainerStats(pair.container)
120
-			if err != nil {
121
-				if _, ok := err.(errNotRunning); !ok {
122
-					logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err)
123
-				}
124
-				continue
125
-			}
126
-			// FIXME: move to containerd on Linux (not Windows)
127
-			stats.CPUStats.SystemUsage = systemUsage
128
-
129
-			pair.publisher.Publish(*stats)
15
+func (daemon *Daemon) newStatsCollector(interval time.Duration) *stats.Collector {
16
+	// FIXME(vdemeester) move this elsewhere
17
+	if runtime.GOOS == "linux" {
18
+		meminfo, err := system.ReadMemInfo()
19
+		if err == nil && meminfo.MemTotal > 0 {
20
+			daemon.machineMemory = uint64(meminfo.MemTotal)
130 21
 		}
131 22
 	}
23
+	s := stats.NewCollector(daemon, interval)
24
+	go s.Run()
25
+	return s
132 26
 }
133 27
deleted file mode 100644
... ...
@@ -1,34 +0,0 @@
1
-package daemon
2
-
3
-import (
4
-	"github.com/docker/docker/container"
5
-	"time"
6
-)
7
-
8
-// newStatsCollector returns a new statsCollector for collection stats
9
-// for a registered container at the specified interval. The collector allows
10
-// non-running containers to be added and will start processing stats when
11
-// they are started.
12
-func (daemon *Daemon) newStatsCollector(interval time.Duration) *statsCollector {
13
-	return &statsCollector{}
14
-}
15
-
16
-// statsCollector manages and provides container resource stats
17
-type statsCollector struct {
18
-}
19
-
20
-// collect registers the container with the collector and adds it to
21
-// the event loop for collection on the specified interval returning
22
-// a channel for the subscriber to receive on.
23
-func (s *statsCollector) collect(c *container.Container) chan interface{} {
24
-	return nil
25
-}
26
-
27
-// stopCollection closes the channels for all subscribers and removes
28
-// the container from metrics collection.
29
-func (s *statsCollector) stopCollection(c *container.Container) {
30
-}
31
-
32
-// unsubscribe removes a specific subscriber from receiving updates for a container's stats.
33
-func (s *statsCollector) unsubscribe(c *container.Container, ch chan interface{}) {
34
-}
35 1
deleted file mode 100644
... ...
@@ -1,71 +0,0 @@
1
-// +build !windows,!solaris
2
-
3
-package daemon
4
-
5
-import (
6
-	"fmt"
7
-	"os"
8
-	"strconv"
9
-	"strings"
10
-
11
-	sysinfo "github.com/docker/docker/pkg/system"
12
-	"github.com/opencontainers/runc/libcontainer/system"
13
-)
14
-
15
-// platformNewStatsCollector performs platform specific initialisation of the
16
-// statsCollector structure.
17
-func platformNewStatsCollector(s *statsCollector) {
18
-	s.clockTicksPerSecond = uint64(system.GetClockTicks())
19
-	meminfo, err := sysinfo.ReadMemInfo()
20
-	if err == nil && meminfo.MemTotal > 0 {
21
-		s.machineMemory = uint64(meminfo.MemTotal)
22
-	}
23
-}
24
-
25
-const nanoSecondsPerSecond = 1e9
26
-
27
-// getSystemCPUUsage returns the host system's cpu usage in
28
-// nanoseconds. An error is returned if the format of the underlying
29
-// file does not match.
30
-//
31
-// Uses /proc/stat defined by POSIX. Looks for the cpu
32
-// statistics line and then sums up the first seven fields
33
-// provided. See `man 5 proc` for details on specific field
34
-// information.
35
-func (s *statsCollector) getSystemCPUUsage() (uint64, error) {
36
-	var line string
37
-	f, err := os.Open("/proc/stat")
38
-	if err != nil {
39
-		return 0, err
40
-	}
41
-	defer func() {
42
-		s.bufReader.Reset(nil)
43
-		f.Close()
44
-	}()
45
-	s.bufReader.Reset(f)
46
-	err = nil
47
-	for err == nil {
48
-		line, err = s.bufReader.ReadString('\n')
49
-		if err != nil {
50
-			break
51
-		}
52
-		parts := strings.Fields(line)
53
-		switch parts[0] {
54
-		case "cpu":
55
-			if len(parts) < 8 {
56
-				return 0, fmt.Errorf("invalid number of cpu fields")
57
-			}
58
-			var totalClockTicks uint64
59
-			for _, i := range parts[1:8] {
60
-				v, err := strconv.ParseUint(i, 10, 64)
61
-				if err != nil {
62
-					return 0, fmt.Errorf("Unable to convert value %s to int: %s", i, err)
63
-				}
64
-				totalClockTicks += v
65
-			}
66
-			return (totalClockTicks * nanoSecondsPerSecond) /
67
-				s.clockTicksPerSecond, nil
68
-		}
69
-	}
70
-	return 0, fmt.Errorf("invalid stat format. Error trying to parse the '/proc/stat' file")
71
-}
72 1
deleted file mode 100644
... ...
@@ -1,15 +0,0 @@
1
-// +build windows
2
-
3
-package daemon
4
-
5
-// platformNewStatsCollector performs platform specific initialisation of the
6
-// statsCollector structure. This is a no-op on Windows.
7
-func platformNewStatsCollector(s *statsCollector) {
8
-}
9
-
10
-// getSystemCPUUsage returns the host system's cpu usage in
11
-// nanoseconds. An error is returned if the format of the underlying
12
-// file does not match. This is a no-op on Windows.
13
-func (s *statsCollector) getSystemCPUUsage() (uint64, error) {
14
-	return 0, nil
15
-}