Browse code

Use condition variable to wake stats collector.

Before the collection goroutine wakes up every 1 second (as configured).
This sleep interval is in case there are no stats to collect we don't
end up in a tight loop.

Instead use a condition variable to signal that a collection is needed.
This prevents us from waking the goroutine needlessly when there is no
one looking for stats.

For now I've kept the sleep just moved it to the end of the loop, which
gives some space between collections.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
(cherry picked from commit e75e6b0e31428c00047bc814746aff4b4c7c90ad)
Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2020/02/09 01:58:42
Showing 1 changed files
... ...
@@ -14,6 +14,7 @@ import (
14 14
 // Collector manages and provides container resource stats
15 15
 type Collector struct {
16 16
 	m          sync.Mutex
17
+	cond       *sync.Cond
17 18
 	supervisor supervisor
18 19
 	interval   time.Duration
19 20
 	publishers map[*container.Container]*pubsub.Publisher
... ...
@@ -31,6 +32,7 @@ func NewCollector(supervisor supervisor, interval time.Duration) *Collector {
31 31
 		publishers: make(map[*container.Container]*pubsub.Publisher),
32 32
 		bufReader:  bufio.NewReaderSize(nil, 128),
33 33
 	}
34
+	s.cond = sync.NewCond(&s.m)
34 35
 
35 36
 	platformNewStatsCollector(s)
36 37
 
... ...
@@ -46,13 +48,16 @@ type supervisor interface {
46 46
 // the event loop for collection on the specified interval returning
47 47
 // a channel for the subscriber to receive on.
48 48
 func (s *Collector) Collect(c *container.Container) chan interface{} {
49
-	s.m.Lock()
50
-	defer s.m.Unlock()
49
+	s.cond.L.Lock()
50
+	defer s.cond.L.Unlock()
51
+
51 52
 	publisher, exists := s.publishers[c]
52 53
 	if !exists {
53 54
 		publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
54 55
 		s.publishers[c] = publisher
55 56
 	}
57
+
58
+	s.cond.Broadcast()
56 59
 	return publisher.Subscribe()
57 60
 }
58 61
 
... ...
@@ -91,23 +96,21 @@ func (s *Collector) Run() {
91 91
 	var pairs []publishersPair
92 92
 
93 93
 	for {
94
-		// Put sleep at the start so that it will always be hit,
95
-		// preventing a tight loop if no stats are collected.
96
-		time.Sleep(s.interval)
94
+		s.cond.L.Lock()
95
+		for len(s.publishers) == 0 {
96
+			s.cond.Wait()
97
+		}
97 98
 
98 99
 		// it does not make sense in the first iteration,
99 100
 		// but saves allocations in further iterations
100 101
 		pairs = pairs[:0]
101 102
 
102
-		s.m.Lock()
103 103
 		for container, publisher := range s.publishers {
104 104
 			// copy pointers here to release the lock ASAP
105 105
 			pairs = append(pairs, publishersPair{container, publisher})
106 106
 		}
107
-		s.m.Unlock()
108
-		if len(pairs) == 0 {
109
-			continue
110
-		}
107
+
108
+		s.cond.L.Unlock()
111 109
 
112 110
 		onlineCPUs, err := s.getNumberOnlineCPUs()
113 111
 		if err != nil {
... ...
@@ -149,6 +152,8 @@ func (s *Collector) Run() {
149 149
 				})
150 150
 			}
151 151
 		}
152
+
153
+		time.Sleep(s.interval)
152 154
 	}
153 155
 }
154 156