Signed-off-by: David Calavera <david.calavera@gmail.com>
David Calavera authored on 2016/03/08 09:02:35... | ... |
@@ -50,33 +50,23 @@ func (e *Events) Subscribe() ([]eventtypes.Message, chan interface{}, func()) { |
50 | 50 |
// of interface{}, so you need type assertion). |
51 | 51 |
func (e *Events) SubscribeTopic(since, sinceNano int64, ef *Filter) ([]eventtypes.Message, chan interface{}) { |
52 | 52 |
e.mu.Lock() |
53 |
- defer e.mu.Unlock() |
|
54 | 53 |
|
55 |
- var buffered []eventtypes.Message |
|
56 |
- topic := func(m interface{}) bool { |
|
57 |
- return ef.Include(m.(eventtypes.Message)) |
|
54 |
+ var topic func(m interface{}) bool |
|
55 |
+ if ef != nil && ef.filter.Len() > 0 { |
|
56 |
+ topic = func(m interface{}) bool { return ef.Include(m.(eventtypes.Message)) } |
|
58 | 57 |
} |
59 | 58 |
|
60 |
- if since != -1 { |
|
61 |
- for i := len(e.events) - 1; i >= 0; i-- { |
|
62 |
- ev := e.events[i] |
|
63 |
- if ev.Time < since || ((ev.Time == since) && (ev.TimeNano < sinceNano)) { |
|
64 |
- break |
|
65 |
- } |
|
66 |
- if ef.filter.Len() == 0 || topic(ev) { |
|
67 |
- buffered = append([]eventtypes.Message{ev}, buffered...) |
|
68 |
- } |
|
69 |
- } |
|
70 |
- } |
|
59 |
+ buffered := e.loadBufferedEvents(since, sinceNano, topic) |
|
71 | 60 |
|
72 | 61 |
var ch chan interface{} |
73 |
- if ef.filter.Len() > 0 { |
|
62 |
+ if topic != nil { |
|
74 | 63 |
ch = e.pub.SubscribeTopic(topic) |
75 | 64 |
} else { |
76 | 65 |
// Subscribe to all events if there are no filters |
77 | 66 |
ch = e.pub.Subscribe() |
78 | 67 |
} |
79 | 68 |
|
69 |
+ e.mu.Unlock() |
|
80 | 70 |
return buffered, ch |
81 | 71 |
} |
82 | 72 |
|
... | ... |
@@ -124,3 +114,29 @@ func (e *Events) Log(action, eventType string, actor eventtypes.Actor) { |
124 | 124 |
func (e *Events) SubscribersCount() int { |
125 | 125 |
return e.pub.Len() |
126 | 126 |
} |
127 |
+ |
|
128 |
+// loadBufferedEvents iterates over the cached events in the buffer |
|
129 |
+// and returns those that were emitted before a specific date. |
|
130 |
+// The date is splitted in two values: |
|
131 |
+// - the `since` argument is a date timestamp without nanoseconds, or -1 to return an empty slice. |
|
132 |
+// - the `sinceNano` argument is the nanoseconds offset from the timestamp. |
|
133 |
+// It uses `time.Unix(seconds, nanoseconds)` to generate a valid date with those two first arguments. |
|
134 |
+// It filters those buffered messages with a topic function if it's not nil, otherwise it adds all messages. |
|
135 |
+func (e *Events) loadBufferedEvents(since, sinceNano int64, topic func(interface{}) bool) []eventtypes.Message { |
|
136 |
+ var buffered []eventtypes.Message |
|
137 |
+ if since == -1 { |
|
138 |
+ return buffered |
|
139 |
+ } |
|
140 |
+ |
|
141 |
+ sinceNanoUnix := time.Unix(since, sinceNano).UnixNano() |
|
142 |
+ for i := len(e.events) - 1; i >= 0; i-- { |
|
143 |
+ ev := e.events[i] |
|
144 |
+ if ev.TimeNano < sinceNanoUnix { |
|
145 |
+ break |
|
146 |
+ } |
|
147 |
+ if topic == nil || topic(ev) { |
|
148 |
+ buffered = append([]eventtypes.Message{ev}, buffered...) |
|
149 |
+ } |
|
150 |
+ } |
|
151 |
+ return buffered |
|
152 |
+} |
... | ... |
@@ -5,7 +5,9 @@ import ( |
5 | 5 |
"testing" |
6 | 6 |
"time" |
7 | 7 |
|
8 |
+ "github.com/docker/docker/daemon/events/testutils" |
|
8 | 9 |
"github.com/docker/engine-api/types/events" |
10 |
+ timetypes "github.com/docker/engine-api/types/time" |
|
9 | 11 |
) |
10 | 12 |
|
11 | 13 |
func TestEventsLog(t *testing.T) { |
... | ... |
@@ -150,3 +152,45 @@ func TestLogEvents(t *testing.T) { |
150 | 150 |
t.Fatalf("Last action is %s, must be action_89", lastC.Status) |
151 | 151 |
} |
152 | 152 |
} |
153 |
+ |
|
154 |
+// https://github.com/docker/docker/issues/20999 |
|
155 |
+// Fixtures: |
|
156 |
+// |
|
157 |
+//2016-03-07T17:28:03.022433271+02:00 container die 0b863f2a26c18557fc6cdadda007c459f9ec81b874780808138aea78a3595079 (image=ubuntu, name=small_hoover) |
|
158 |
+//2016-03-07T17:28:03.091719377+02:00 network disconnect 19c5ed41acb798f26b751e0035cd7821741ab79e2bbd59a66b5fd8abf954eaa0 (type=bridge, container=0b863f2a26c18557fc6cdadda007c459f9ec81b874780808138aea78a3595079, name=bridge) |
|
159 |
+//2016-03-07T17:28:03.129014751+02:00 container destroy 0b863f2a26c18557fc6cdadda007c459f9ec81b874780808138aea78a3595079 (image=ubuntu, name=small_hoover) |
|
160 |
+func TestLoadBufferedEvents(t *testing.T) { |
|
161 |
+ now := time.Now() |
|
162 |
+ f, err := timetypes.GetTimestamp("2016-03-07T17:28:03.100000000+02:00", now) |
|
163 |
+ if err != nil { |
|
164 |
+ t.Fatal(err) |
|
165 |
+ } |
|
166 |
+ since, sinceNano, err := timetypes.ParseTimestamps(f, -1) |
|
167 |
+ if err != nil { |
|
168 |
+ t.Fatal(err) |
|
169 |
+ } |
|
170 |
+ |
|
171 |
+ m1, err := eventstestutils.Scan("2016-03-07T17:28:03.022433271+02:00 container die 0b863f2a26c18557fc6cdadda007c459f9ec81b874780808138aea78a3595079 (image=ubuntu, name=small_hoover)") |
|
172 |
+ if err != nil { |
|
173 |
+ t.Fatal(err) |
|
174 |
+ } |
|
175 |
+ m2, err := eventstestutils.Scan("2016-03-07T17:28:03.091719377+02:00 network disconnect 19c5ed41acb798f26b751e0035cd7821741ab79e2bbd59a66b5fd8abf954eaa0 (type=bridge, container=0b863f2a26c18557fc6cdadda007c459f9ec81b874780808138aea78a3595079, name=bridge)") |
|
176 |
+ if err != nil { |
|
177 |
+ t.Fatal(err) |
|
178 |
+ } |
|
179 |
+ m3, err := eventstestutils.Scan("2016-03-07T17:28:03.129014751+02:00 container destroy 0b863f2a26c18557fc6cdadda007c459f9ec81b874780808138aea78a3595079 (image=ubuntu, name=small_hoover)") |
|
180 |
+ if err != nil { |
|
181 |
+ t.Fatal(err) |
|
182 |
+ } |
|
183 |
+ |
|
184 |
+ buffered := []events.Message{*m1, *m2, *m3} |
|
185 |
+ |
|
186 |
+ events := &Events{ |
|
187 |
+ events: buffered, |
|
188 |
+ } |
|
189 |
+ |
|
190 |
+ out := events.loadBufferedEvents(since, sinceNano, nil) |
|
191 |
+ if len(out) != 1 { |
|
192 |
+ t.Fatalf("expected 1 message, got %d: %v", len(out), out) |
|
193 |
+ } |
|
194 |
+} |
153 | 195 |
new file mode 100644 |
... | ... |
@@ -0,0 +1,76 @@ |
0 |
+package eventstestutils |
|
1 |
+ |
|
2 |
+import ( |
|
3 |
+ "fmt" |
|
4 |
+ "regexp" |
|
5 |
+ "strings" |
|
6 |
+ "time" |
|
7 |
+ |
|
8 |
+ "github.com/docker/engine-api/types/events" |
|
9 |
+ timetypes "github.com/docker/engine-api/types/time" |
|
10 |
+) |
|
11 |
+ |
|
12 |
+var ( |
|
13 |
+ reTimestamp = `(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{9}(:?(:?(:?-|\+)\d{2}:\d{2})|Z))` |
|
14 |
+ reEventType = `(?P<eventType>\w+)` |
|
15 |
+ reAction = `(?P<action>\w+)` |
|
16 |
+ reID = `(?P<id>[^\s]+)` |
|
17 |
+ reAttributes = `(\s\((?P<attributes>[^\)]+)\))?` |
|
18 |
+ reString = fmt.Sprintf(`\A%s\s%s\s%s\s%s%s\z`, reTimestamp, reEventType, reAction, reID, reAttributes) |
|
19 |
+ |
|
20 |
+ // eventCliRegexp is a regular expression that matches all possible event outputs in the cli |
|
21 |
+ eventCliRegexp = regexp.MustCompile(reString) |
|
22 |
+) |
|
23 |
+ |
|
24 |
+// ScanMap turns an event string like the default ones formatted in the cli output |
|
25 |
+// and turns it into map. |
|
26 |
+func ScanMap(text string) map[string]string { |
|
27 |
+ matches := eventCliRegexp.FindAllStringSubmatch(text, -1) |
|
28 |
+ md := map[string]string{} |
|
29 |
+ if len(matches) == 0 { |
|
30 |
+ return md |
|
31 |
+ } |
|
32 |
+ |
|
33 |
+ names := eventCliRegexp.SubexpNames() |
|
34 |
+ for i, n := range matches[0] { |
|
35 |
+ md[names[i]] = n |
|
36 |
+ } |
|
37 |
+ return md |
|
38 |
+} |
|
39 |
+ |
|
40 |
+// Scan turns an event string like the default ones formatted in the cli output |
|
41 |
+// and turns it into an event message. |
|
42 |
+func Scan(text string) (*events.Message, error) { |
|
43 |
+ md := ScanMap(text) |
|
44 |
+ if len(md) == 0 { |
|
45 |
+ return nil, fmt.Errorf("text is not an event: %s", text) |
|
46 |
+ } |
|
47 |
+ |
|
48 |
+ f, err := timetypes.GetTimestamp(md["timestamp"], time.Now()) |
|
49 |
+ if err != nil { |
|
50 |
+ return nil, err |
|
51 |
+ } |
|
52 |
+ |
|
53 |
+ t, tn, err := timetypes.ParseTimestamps(f, -1) |
|
54 |
+ if err != nil { |
|
55 |
+ return nil, err |
|
56 |
+ } |
|
57 |
+ |
|
58 |
+ attrs := make(map[string]string) |
|
59 |
+ for _, a := range strings.SplitN(md["attributes"], ", ", -1) { |
|
60 |
+ kv := strings.SplitN(a, "=", 2) |
|
61 |
+ attrs[kv[0]] = kv[1] |
|
62 |
+ } |
|
63 |
+ |
|
64 |
+ tu := time.Unix(t, tn) |
|
65 |
+ return &events.Message{ |
|
66 |
+ Time: t, |
|
67 |
+ TimeNano: tu.UnixNano(), |
|
68 |
+ Type: md["eventType"], |
|
69 |
+ Action: md["action"], |
|
70 |
+ Actor: events.Actor{ |
|
71 |
+ ID: md["id"], |
|
72 |
+ Attributes: attrs, |
|
73 |
+ }, |
|
74 |
+ }, nil |
|
75 |
+} |
... | ... |
@@ -12,6 +12,7 @@ import ( |
12 | 12 |
"sync" |
13 | 13 |
"time" |
14 | 14 |
|
15 |
+ "github.com/docker/docker/daemon/events/testutils" |
|
15 | 16 |
"github.com/docker/docker/pkg/integration/checker" |
16 | 17 |
"github.com/go-check/check" |
17 | 18 |
) |
... | ... |
@@ -152,7 +153,7 @@ func (s *DockerSuite) TestEventsContainerEventsAttrSort(c *check.C) { |
152 | 152 |
c.Assert(nEvents, checker.GreaterOrEqualThan, 3) //Missing expected event |
153 | 153 |
matchedEvents := 0 |
154 | 154 |
for _, event := range events { |
155 |
- matches := parseEventText(event) |
|
155 |
+ matches := eventstestutils.ScanMap(event) |
|
156 | 156 |
if matches["id"] != containerID { |
157 | 157 |
continue |
158 | 158 |
} |
... | ... |
@@ -201,7 +202,7 @@ func (s *DockerSuite) TestEventsImageTag(c *check.C) { |
201 | 201 |
c.Assert(events, checker.HasLen, 1, check.Commentf("was expecting 1 event. out=%s", out)) |
202 | 202 |
event := strings.TrimSpace(events[0]) |
203 | 203 |
|
204 |
- matches := parseEventText(event) |
|
204 |
+ matches := eventstestutils.ScanMap(event) |
|
205 | 205 |
c.Assert(matchEventID(matches, image), checker.True, check.Commentf("matches: %v\nout:\n%s", matches, out)) |
206 | 206 |
c.Assert(matches["action"], checker.Equals, "tag") |
207 | 207 |
} |
... | ... |
@@ -220,7 +221,7 @@ func (s *DockerSuite) TestEventsImagePull(c *check.C) { |
220 | 220 |
|
221 | 221 |
events := strings.Split(strings.TrimSpace(out), "\n") |
222 | 222 |
event := strings.TrimSpace(events[len(events)-1]) |
223 |
- matches := parseEventText(event) |
|
223 |
+ matches := eventstestutils.ScanMap(event) |
|
224 | 224 |
c.Assert(matches["id"], checker.Equals, "hello-world:latest") |
225 | 225 |
c.Assert(matches["action"], checker.Equals, "pull") |
226 | 226 |
|
... | ... |
@@ -245,7 +246,7 @@ func (s *DockerSuite) TestEventsImageImport(c *check.C) { |
245 | 245 |
out, _ = dockerCmd(c, "events", fmt.Sprintf("--since=%d", since), fmt.Sprintf("--until=%d", daemonTime(c).Unix()), "--filter", "event=import") |
246 | 246 |
events := strings.Split(strings.TrimSpace(out), "\n") |
247 | 247 |
c.Assert(events, checker.HasLen, 1) |
248 |
- matches := parseEventText(events[0]) |
|
248 |
+ matches := eventstestutils.ScanMap(events[0]) |
|
249 | 249 |
c.Assert(matches["id"], checker.Equals, imageRef, check.Commentf("matches: %v\nout:\n%s\n", matches, out)) |
250 | 250 |
c.Assert(matches["action"], checker.Equals, "import", check.Commentf("matches: %v\nout:\n%s\n", matches, out)) |
251 | 251 |
} |
... | ... |
@@ -370,7 +371,7 @@ func (s *DockerSuite) TestEventsFilterContainer(c *check.C) { |
370 | 370 |
return fmt.Errorf("expected 4 events, got %v", events) |
371 | 371 |
} |
372 | 372 |
for _, event := range events { |
373 |
- matches := parseEventText(event) |
|
373 |
+ matches := eventstestutils.ScanMap(event) |
|
374 | 374 |
if !matchEventID(matches, id) { |
375 | 375 |
return fmt.Errorf("expected event for container id %s: %s - parsed container id: %s", id, event, matches["id"]) |
376 | 376 |
} |
... | ... |
@@ -3,7 +3,6 @@ package main |
3 | 3 |
import ( |
4 | 4 |
"bufio" |
5 | 5 |
"bytes" |
6 |
- "fmt" |
|
7 | 6 |
"io" |
8 | 7 |
"os/exec" |
9 | 8 |
"regexp" |
... | ... |
@@ -11,22 +10,11 @@ import ( |
11 | 11 |
"strings" |
12 | 12 |
|
13 | 13 |
"github.com/Sirupsen/logrus" |
14 |
+ "github.com/docker/docker/daemon/events/testutils" |
|
14 | 15 |
"github.com/docker/docker/pkg/integration/checker" |
15 | 16 |
"github.com/go-check/check" |
16 | 17 |
) |
17 | 18 |
|
18 |
-var ( |
|
19 |
- reTimestamp = `\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{9}(:?(:?(:?-|\+)\d{2}:\d{2})|Z)` |
|
20 |
- reEventType = `(?P<eventType>\w+)` |
|
21 |
- reAction = `(?P<action>\w+)` |
|
22 |
- reID = `(?P<id>[^\s]+)` |
|
23 |
- reAttributes = `(\s\((?P<attributes>[^\)]+)\))?` |
|
24 |
- reString = fmt.Sprintf(`\A%s\s%s\s%s\s%s%s\z`, reTimestamp, reEventType, reAction, reID, reAttributes) |
|
25 |
- |
|
26 |
- // eventCliRegexp is a regular expression that matches all possible event outputs in the cli |
|
27 |
- eventCliRegexp = regexp.MustCompile(reString) |
|
28 |
-) |
|
29 |
- |
|
30 | 19 |
// eventMatcher is a function that tries to match an event input. |
31 | 20 |
// It returns true if the event matches and a map with |
32 | 21 |
// a set of key/value to identify the match. |
... | ... |
@@ -131,7 +119,7 @@ func (e *eventObserver) CheckEventError(c *check.C, id, event string, match even |
131 | 131 |
// It returns an empty map and false if there is no match. |
132 | 132 |
func matchEventLine(id, eventType string, actions map[string]chan bool) eventMatcher { |
133 | 133 |
return func(text string) (map[string]string, bool) { |
134 |
- matches := parseEventText(text) |
|
134 |
+ matches := eventstestutils.ScanMap(text) |
|
135 | 135 |
if len(matches) == 0 { |
136 | 136 |
return matches, false |
137 | 137 |
} |
... | ... |
@@ -154,26 +142,10 @@ func processEventMatch(actions map[string]chan bool) eventMatchProcessor { |
154 | 154 |
} |
155 | 155 |
} |
156 | 156 |
|
157 |
-// parseEventText parses a line of events coming from the cli and returns |
|
158 |
-// the matchers in a map. |
|
159 |
-func parseEventText(text string) map[string]string { |
|
160 |
- matches := eventCliRegexp.FindAllStringSubmatch(text, -1) |
|
161 |
- md := map[string]string{} |
|
162 |
- if len(matches) == 0 { |
|
163 |
- return md |
|
164 |
- } |
|
165 |
- |
|
166 |
- names := eventCliRegexp.SubexpNames() |
|
167 |
- for i, n := range matches[0] { |
|
168 |
- md[names[i]] = n |
|
169 |
- } |
|
170 |
- return md |
|
171 |
-} |
|
172 |
- |
|
173 | 157 |
// parseEventAction parses an event text and returns the action. |
174 | 158 |
// It fails if the text is not in the event format. |
175 | 159 |
func parseEventAction(c *check.C, text string) string { |
176 |
- matches := parseEventText(text) |
|
160 |
+ matches := eventstestutils.ScanMap(text) |
|
177 | 161 |
return matches["action"] |
178 | 162 |
} |
179 | 163 |
|
... | ... |
@@ -182,7 +154,7 @@ func parseEventAction(c *check.C, text string) string { |
182 | 182 |
func eventActionsByIDAndType(c *check.C, events []string, id, eventType string) []string { |
183 | 183 |
var filtered []string |
184 | 184 |
for _, event := range events { |
185 |
- matches := parseEventText(event) |
|
185 |
+ matches := eventstestutils.ScanMap(event) |
|
186 | 186 |
c.Assert(matches, checker.Not(checker.IsNil)) |
187 | 187 |
if matchIDAndEventType(matches, id, eventType) { |
188 | 188 |
filtered = append(filtered, matches["action"]) |
... | ... |
@@ -214,7 +186,7 @@ func matchEventID(matches map[string]string, id string) bool { |
214 | 214 |
func parseEvents(c *check.C, out, match string) { |
215 | 215 |
events := strings.Split(strings.TrimSpace(out), "\n") |
216 | 216 |
for _, event := range events { |
217 |
- matches := parseEventText(event) |
|
217 |
+ matches := eventstestutils.ScanMap(event) |
|
218 | 218 |
matched, err := regexp.MatchString(match, matches["action"]) |
219 | 219 |
c.Assert(err, checker.IsNil) |
220 | 220 |
c.Assert(matched, checker.True, check.Commentf("Matcher: %s did not match %s", match, matches["action"])) |
... | ... |
@@ -224,7 +196,7 @@ func parseEvents(c *check.C, out, match string) { |
224 | 224 |
func parseEventsWithID(c *check.C, out, match, id string) { |
225 | 225 |
events := strings.Split(strings.TrimSpace(out), "\n") |
226 | 226 |
for _, event := range events { |
227 |
- matches := parseEventText(event) |
|
227 |
+ matches := eventstestutils.ScanMap(event) |
|
228 | 228 |
c.Assert(matchEventID(matches, id), checker.True) |
229 | 229 |
|
230 | 230 |
matched, err := regexp.MatchString(match, matches["action"]) |