Browse code

Merge pull request #12066 from LK4D4/split_events

Remove engine usage from events

Alexander Morozov authored on 2015/04/09 00:29:24
Showing 16 changed files
... ...
@@ -3,6 +3,7 @@ package server
3 3
 import (
4 4
 	"bufio"
5 5
 	"bytes"
6
+	"time"
6 7
 
7 8
 	"encoding/base64"
8 9
 	"encoding/json"
... ...
@@ -23,7 +24,9 @@ import (
23 23
 	"github.com/docker/docker/daemon"
24 24
 	"github.com/docker/docker/daemon/networkdriver/bridge"
25 25
 	"github.com/docker/docker/engine"
26
+	"github.com/docker/docker/pkg/jsonmessage"
26 27
 	"github.com/docker/docker/pkg/parsers"
28
+	"github.com/docker/docker/pkg/parsers/filters"
27 29
 	"github.com/docker/docker/pkg/stdcopy"
28 30
 	"github.com/docker/docker/pkg/streamformatter"
29 31
 	"github.com/docker/docker/pkg/version"
... ...
@@ -324,13 +327,104 @@ func getEvents(eng *engine.Engine, version version.Version, w http.ResponseWrite
324 324
 	if err := parseForm(r); err != nil {
325 325
 		return err
326 326
 	}
327
+	var since int64 = -1
328
+	if r.Form.Get("since") != "" {
329
+		s, err := strconv.ParseInt(r.Form.Get("since"), 10, 64)
330
+		if err != nil {
331
+			return err
332
+		}
333
+		since = s
334
+	}
327 335
 
328
-	var job = eng.Job("events")
329
-	streamJSON(job, w, true)
330
-	job.Setenv("since", r.Form.Get("since"))
331
-	job.Setenv("until", r.Form.Get("until"))
332
-	job.Setenv("filters", r.Form.Get("filters"))
333
-	return job.Run()
336
+	var until int64 = -1
337
+	if r.Form.Get("until") != "" {
338
+		u, err := strconv.ParseInt(r.Form.Get("until"), 10, 64)
339
+		if err != nil {
340
+			return err
341
+		}
342
+		until = u
343
+	}
344
+	timer := time.NewTimer(0)
345
+	timer.Stop()
346
+	if until > 0 {
347
+		dur := time.Unix(until, 0).Sub(time.Now())
348
+		timer = time.NewTimer(dur)
349
+	}
350
+
351
+	ef, err := filters.FromParam(r.Form.Get("filters"))
352
+	if err != nil {
353
+		return err
354
+	}
355
+
356
+	isFiltered := func(field string, filter []string) bool {
357
+		if len(filter) == 0 {
358
+			return false
359
+		}
360
+		for _, v := range filter {
361
+			if v == field {
362
+				return false
363
+			}
364
+			if strings.Contains(field, ":") {
365
+				image := strings.Split(field, ":")
366
+				if image[0] == v {
367
+					return false
368
+				}
369
+			}
370
+		}
371
+		return true
372
+	}
373
+
374
+	d := getDaemon(eng)
375
+	es := d.EventsService
376
+	w.Header().Set("Content-Type", "application/json")
377
+	enc := json.NewEncoder(utils.NewWriteFlusher(w))
378
+
379
+	getContainerId := func(cn string) string {
380
+		c, err := d.Get(cn)
381
+		if err != nil {
382
+			return ""
383
+		}
384
+		return c.ID
385
+	}
386
+
387
+	sendEvent := func(ev *jsonmessage.JSONMessage) error {
388
+		//incoming container filter can be name,id or partial id, convert and replace as a full container id
389
+		for i, cn := range ef["container"] {
390
+			ef["container"][i] = getContainerId(cn)
391
+		}
392
+
393
+		if isFiltered(ev.Status, ef["event"]) || isFiltered(ev.From, ef["image"]) ||
394
+			isFiltered(ev.ID, ef["container"]) {
395
+			return nil
396
+		}
397
+
398
+		return enc.Encode(ev)
399
+	}
400
+
401
+	current, l := es.Subscribe()
402
+	defer es.Evict(l)
403
+	for _, ev := range current {
404
+		if ev.Time < since {
405
+			continue
406
+		}
407
+		if err := sendEvent(ev); err != nil {
408
+			return err
409
+		}
410
+	}
411
+	for {
412
+		select {
413
+		case ev := <-l:
414
+			jev, ok := ev.(*jsonmessage.JSONMessage)
415
+			if !ok {
416
+				continue
417
+			}
418
+			if err := sendEvent(jev); err != nil {
419
+				return err
420
+			}
421
+		case <-timer.C:
422
+			return nil
423
+		}
424
+	}
334 425
 }
335 426
 
336 427
 func getImagesHistory(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
... ...
@@ -252,47 +252,6 @@ func TestGetContainersByName(t *testing.T) {
252 252
 	}
253 253
 }
254 254
 
255
-func TestGetEvents(t *testing.T) {
256
-	eng := engine.New()
257
-	var called bool
258
-	eng.Register("events", func(job *engine.Job) error {
259
-		called = true
260
-		since := job.Getenv("since")
261
-		if since != "1" {
262
-			t.Fatalf("'since' should be 1, found %#v instead", since)
263
-		}
264
-		until := job.Getenv("until")
265
-		if until != "0" {
266
-			t.Fatalf("'until' should be 0, found %#v instead", until)
267
-		}
268
-		v := &engine.Env{}
269
-		v.Set("since", since)
270
-		v.Set("until", until)
271
-		if _, err := v.WriteTo(job.Stdout); err != nil {
272
-			return err
273
-		}
274
-		return nil
275
-	})
276
-	r := serveRequest("GET", "/events?since=1&until=0", nil, eng, t)
277
-	if !called {
278
-		t.Fatal("handler was not called")
279
-	}
280
-	assertContentType(r, "application/json", t)
281
-	var stdoutJSON struct {
282
-		Since int
283
-		Until int
284
-	}
285
-	if err := json.Unmarshal(r.Body.Bytes(), &stdoutJSON); err != nil {
286
-		t.Fatal(err)
287
-	}
288
-	if stdoutJSON.Since != 1 {
289
-		t.Errorf("since != 1: %#v", stdoutJSON.Since)
290
-	}
291
-	if stdoutJSON.Until != 0 {
292
-		t.Errorf("until != 0: %#v", stdoutJSON.Until)
293
-	}
294
-}
295
-
296 255
 func TestLogs(t *testing.T) {
297 256
 	eng := engine.New()
298 257
 	var inspect bool
... ...
@@ -8,7 +8,6 @@ import (
8 8
 	"github.com/docker/docker/autogen/dockerversion"
9 9
 	"github.com/docker/docker/daemon/networkdriver/bridge"
10 10
 	"github.com/docker/docker/engine"
11
-	"github.com/docker/docker/events"
12 11
 	"github.com/docker/docker/pkg/parsers/kernel"
13 12
 )
14 13
 
... ...
@@ -19,9 +18,6 @@ func Register(eng *engine.Engine) error {
19 19
 	if err := remote(eng); err != nil {
20 20
 		return err
21 21
 	}
22
-	if err := events.New().Install(eng); err != nil {
23
-		return err
24
-	}
25 22
 	if err := eng.Register("version", dockerVersion); err != nil {
26 23
 		return err
27 24
 	}
... ...
@@ -200,9 +200,11 @@ func (container *Container) WriteHostConfig() error {
200 200
 
201 201
 func (container *Container) LogEvent(action string) {
202 202
 	d := container.daemon
203
-	if err := d.eng.Job("log", action, container.ID, d.Repositories().ImageName(container.ImageID)).Run(); err != nil {
204
-		logrus.Errorf("Error logging event %s for %s: %s", action, container.ID, err)
205
-	}
203
+	d.EventsService.Log(
204
+		action,
205
+		container.ID,
206
+		d.Repositories().ImageName(container.ImageID),
207
+	)
206 208
 }
207 209
 
208 210
 func (container *Container) getResourcePath(path string) (string, error) {
... ...
@@ -19,6 +19,7 @@ import (
19 19
 	"github.com/Sirupsen/logrus"
20 20
 	"github.com/docker/docker/api"
21 21
 	"github.com/docker/docker/autogen/dockerversion"
22
+	"github.com/docker/docker/daemon/events"
22 23
 	"github.com/docker/docker/daemon/execdriver"
23 24
 	"github.com/docker/docker/daemon/execdriver/execdrivers"
24 25
 	"github.com/docker/docker/daemon/execdriver/lxc"
... ...
@@ -109,6 +110,7 @@ type Daemon struct {
109 109
 	statsCollector   *statsCollector
110 110
 	defaultLogConfig runconfig.LogConfig
111 111
 	RegistryService  *registry.Service
112
+	EventsService    *events.Events
112 113
 }
113 114
 
114 115
 // Install installs daemon capabilities to eng.
... ...
@@ -930,8 +932,9 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine, registryService
930 930
 		return nil, err
931 931
 	}
932 932
 
933
+	eventsService := events.New()
933 934
 	logrus.Debug("Creating repository list")
934
-	repositories, err := graph.NewTagStore(path.Join(config.Root, "repositories-"+driver.String()), g, trustKey, registryService)
935
+	repositories, err := graph.NewTagStore(path.Join(config.Root, "repositories-"+driver.String()), g, trustKey, registryService, eventsService)
935 936
 	if err != nil {
936 937
 		return nil, fmt.Errorf("Couldn't create Tag store: %s", err)
937 938
 	}
... ...
@@ -1023,6 +1026,7 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine, registryService
1023 1023
 		statsCollector:   newStatsCollector(1 * time.Second),
1024 1024
 		defaultLogConfig: config.LogConfig,
1025 1025
 		RegistryService:  registryService,
1026
+		EventsService:    eventsService,
1026 1027
 	}
1027 1028
 
1028 1029
 	eng.OnShutdown(func() {
1029 1030
new file mode 100644
... ...
@@ -0,0 +1,66 @@
0
+package events
1
+
2
+import (
3
+	"sync"
4
+	"time"
5
+
6
+	"github.com/docker/docker/pkg/jsonmessage"
7
+	"github.com/docker/docker/pkg/pubsub"
8
+)
9
+
10
+const eventsLimit = 64
11
+
12
+// Events is pubsub channel for *jsonmessage.JSONMessage
13
+type Events struct {
14
+	mu     sync.Mutex
15
+	events []*jsonmessage.JSONMessage
16
+	pub    *pubsub.Publisher
17
+}
18
+
19
+// New returns new *Events instance
20
+func New() *Events {
21
+	return &Events{
22
+		events: make([]*jsonmessage.JSONMessage, 0, eventsLimit),
23
+		pub:    pubsub.NewPublisher(100*time.Millisecond, 1024),
24
+	}
25
+}
26
+
27
+// Subscribe adds new listener to events, returns slice of 64 stored last events
28
+// channel in which you can expect new events in form of interface{}, so you
29
+// need type assertion.
30
+func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}) {
31
+	e.mu.Lock()
32
+	current := make([]*jsonmessage.JSONMessage, len(e.events))
33
+	copy(current, e.events)
34
+	l := e.pub.Subscribe()
35
+	e.mu.Unlock()
36
+	return current, l
37
+}
38
+
39
+// Evict evicts listener from pubsub
40
+func (e *Events) Evict(l chan interface{}) {
41
+	e.pub.Evict(l)
42
+}
43
+
44
+// Log broadcasts event to listeners. Each listener has 100 millisecond for
45
+// receiving event or it will be skipped.
46
+func (e *Events) Log(action, id, from string) {
47
+	go func() {
48
+		e.mu.Lock()
49
+		jm := &jsonmessage.JSONMessage{Status: action, ID: id, From: from, Time: time.Now().UTC().Unix()}
50
+		if len(e.events) == cap(e.events) {
51
+			// discard oldest event
52
+			copy(e.events, e.events[1:])
53
+			e.events[len(e.events)-1] = jm
54
+		} else {
55
+			e.events = append(e.events, jm)
56
+		}
57
+		e.mu.Unlock()
58
+		e.pub.Publish(jm)
59
+	}()
60
+}
61
+
62
+// SubscribersCount returns number of event listeners
63
+func (e *Events) SubscribersCount() int {
64
+	return e.pub.Len()
65
+}
0 66
new file mode 100644
... ...
@@ -0,0 +1,135 @@
0
+package events
1
+
2
+import (
3
+	"fmt"
4
+	"testing"
5
+	"time"
6
+
7
+	"github.com/docker/docker/pkg/jsonmessage"
8
+)
9
+
10
+func TestEventsLog(t *testing.T) {
11
+	e := New()
12
+	_, l1 := e.Subscribe()
13
+	_, l2 := e.Subscribe()
14
+	defer e.Evict(l1)
15
+	defer e.Evict(l2)
16
+	count := e.SubscribersCount()
17
+	if count != 2 {
18
+		t.Fatalf("Must be 2 subscribers, got %d", count)
19
+	}
20
+	e.Log("test", "cont", "image")
21
+	select {
22
+	case msg := <-l1:
23
+		jmsg, ok := msg.(*jsonmessage.JSONMessage)
24
+		if !ok {
25
+			t.Fatalf("Unexpected type %T", msg)
26
+		}
27
+		if len(e.events) != 1 {
28
+			t.Fatalf("Must be only one event, got %d", len(e.events))
29
+		}
30
+		if jmsg.Status != "test" {
31
+			t.Fatalf("Status should be test, got %s", jmsg.Status)
32
+		}
33
+		if jmsg.ID != "cont" {
34
+			t.Fatalf("ID should be cont, got %s", jmsg.ID)
35
+		}
36
+		if jmsg.From != "image" {
37
+			t.Fatalf("From should be image, got %s", jmsg.From)
38
+		}
39
+	case <-time.After(1 * time.Second):
40
+		t.Fatal("Timeout waiting for broadcasted message")
41
+	}
42
+	select {
43
+	case msg := <-l2:
44
+		jmsg, ok := msg.(*jsonmessage.JSONMessage)
45
+		if !ok {
46
+			t.Fatalf("Unexpected type %T", msg)
47
+		}
48
+		if len(e.events) != 1 {
49
+			t.Fatalf("Must be only one event, got %d", len(e.events))
50
+		}
51
+		if jmsg.Status != "test" {
52
+			t.Fatalf("Status should be test, got %s", jmsg.Status)
53
+		}
54
+		if jmsg.ID != "cont" {
55
+			t.Fatalf("ID should be cont, got %s", jmsg.ID)
56
+		}
57
+		if jmsg.From != "image" {
58
+			t.Fatalf("From should be image, got %s", jmsg.From)
59
+		}
60
+	case <-time.After(1 * time.Second):
61
+		t.Fatal("Timeout waiting for broadcasted message")
62
+	}
63
+}
64
+
65
+func TestEventsLogTimeout(t *testing.T) {
66
+	e := New()
67
+	_, l := e.Subscribe()
68
+	defer e.Evict(l)
69
+
70
+	c := make(chan struct{})
71
+	go func() {
72
+		e.Log("test", "cont", "image")
73
+		close(c)
74
+	}()
75
+
76
+	select {
77
+	case <-c:
78
+	case <-time.After(time.Second):
79
+		t.Fatal("Timeout publishing message")
80
+	}
81
+}
82
+
83
+func TestLogEvents(t *testing.T) {
84
+	e := New()
85
+
86
+	for i := 0; i < eventsLimit+16; i++ {
87
+		action := fmt.Sprintf("action_%d", i)
88
+		id := fmt.Sprintf("cont_%d", i)
89
+		from := fmt.Sprintf("image_%d", i)
90
+		e.Log(action, id, from)
91
+	}
92
+	time.Sleep(50 * time.Millisecond)
93
+	current, l := e.Subscribe()
94
+	for i := 0; i < 10; i++ {
95
+		num := i + eventsLimit + 16
96
+		action := fmt.Sprintf("action_%d", num)
97
+		id := fmt.Sprintf("cont_%d", num)
98
+		from := fmt.Sprintf("image_%d", num)
99
+		e.Log(action, id, from)
100
+	}
101
+	if len(e.events) != eventsLimit {
102
+		t.Fatalf("Must be %d events, got %d", eventsLimit, len(e.events))
103
+	}
104
+
105
+	var msgs []*jsonmessage.JSONMessage
106
+	for len(msgs) < 10 {
107
+		m := <-l
108
+		jm, ok := (m).(*jsonmessage.JSONMessage)
109
+		if !ok {
110
+			t.Fatalf("Unexpected type %T", m)
111
+		}
112
+		msgs = append(msgs, jm)
113
+	}
114
+	if len(current) != eventsLimit {
115
+		t.Fatalf("Must be %d events, got %d", eventsLimit, len(current))
116
+	}
117
+	first := current[0]
118
+	if first.Status != "action_16" {
119
+		t.Fatalf("First action is %s, must be action_16", first.Status)
120
+	}
121
+	last := current[len(current)-1]
122
+	if last.Status != "action_79" {
123
+		t.Fatalf("Last action is %s, must be action_79", last.Status)
124
+	}
125
+
126
+	firstC := msgs[0]
127
+	if firstC.Status != "action_80" {
128
+		t.Fatalf("First action is %s, must be action_80", firstC.Status)
129
+	}
130
+	lastC := msgs[len(msgs)-1]
131
+	if lastC.Status != "action_89" {
132
+		t.Fatalf("Last action is %s, must be action_89", lastC.Status)
133
+	}
134
+}
... ...
@@ -108,7 +108,7 @@ func (daemon *Daemon) DeleteImage(eng *engine.Engine, name string, list *[]types
108 108
 			*list = append(*list, types.ImageDelete{
109 109
 				Untagged: utils.ImageReference(repoName, tag),
110 110
 			})
111
-			eng.Job("log", "untag", img.ID, "").Run()
111
+			daemon.EventsService.Log("untag", img.ID, "")
112 112
 		}
113 113
 	}
114 114
 	tags = daemon.Repositories().ByID()[img.ID]
... ...
@@ -123,6 +123,7 @@ func (daemon *Daemon) DeleteImage(eng *engine.Engine, name string, list *[]types
123 123
 			*list = append(*list, types.ImageDelete{
124 124
 				Deleted: img.ID,
125 125
 			})
126
+			daemon.EventsService.Log("delete", img.ID, "")
126 127
 			eng.Job("log", "delete", img.ID, "").Run()
127 128
 			if img.Parent != "" && !noprune {
128 129
 				err := daemon.DeleteImage(eng, img.Parent, list, false, force, noprune)
... ...
@@ -51,11 +51,6 @@ func (daemon *Daemon) CmdInfo(job *engine.Job) error {
51 51
 		initPath = daemon.SystemInitPath()
52 52
 	}
53 53
 
54
-	cjob := job.Eng.Job("subscribers_count")
55
-	env, _ := cjob.Stdout.AddEnv()
56
-	if err := cjob.Run(); err != nil {
57
-		return err
58
-	}
59 54
 	v := &engine.Env{}
60 55
 	v.SetJson("ID", daemon.ID)
61 56
 	v.SetInt("Containers", len(daemon.List()))
... ...
@@ -71,7 +66,7 @@ func (daemon *Daemon) CmdInfo(job *engine.Job) error {
71 71
 	v.Set("SystemTime", time.Now().Format(time.RFC3339Nano))
72 72
 	v.Set("ExecutionDriver", daemon.ExecutionDriver().Name())
73 73
 	v.Set("LoggingDriver", daemon.defaultLogConfig.Type)
74
-	v.SetInt("NEventsListener", env.GetInt("count"))
74
+	v.SetInt("NEventsListener", daemon.EventsService.SubscribersCount())
75 75
 	v.Set("KernelVersion", kernelVersion)
76 76
 	v.Set("OperatingSystem", operatingSystem)
77 77
 	v.Set("IndexServerAddress", registry.IndexServerAddress())
78 78
deleted file mode 100644
... ...
@@ -1,231 +0,0 @@
1
-package events
2
-
3
-import (
4
-	"bytes"
5
-	"encoding/json"
6
-	"fmt"
7
-	"io"
8
-	"strings"
9
-	"sync"
10
-	"time"
11
-
12
-	"github.com/docker/docker/engine"
13
-	"github.com/docker/docker/pkg/jsonmessage"
14
-	"github.com/docker/docker/pkg/parsers/filters"
15
-)
16
-
17
-const eventsLimit = 64
18
-
19
-type listener chan<- *jsonmessage.JSONMessage
20
-
21
-type Events struct {
22
-	mu          sync.RWMutex
23
-	events      []*jsonmessage.JSONMessage
24
-	subscribers []listener
25
-}
26
-
27
-func New() *Events {
28
-	return &Events{
29
-		events: make([]*jsonmessage.JSONMessage, 0, eventsLimit),
30
-	}
31
-}
32
-
33
-// Install installs events public api in docker engine
34
-func (e *Events) Install(eng *engine.Engine) error {
35
-	// Here you should describe public interface
36
-	jobs := map[string]engine.Handler{
37
-		"events":            e.Get,
38
-		"log":               e.Log,
39
-		"subscribers_count": e.SubscribersCount,
40
-	}
41
-	for name, job := range jobs {
42
-		if err := eng.Register(name, job); err != nil {
43
-			return err
44
-		}
45
-	}
46
-	return nil
47
-}
48
-
49
-func (e *Events) Get(job *engine.Job) error {
50
-	var (
51
-		since   = job.GetenvInt64("since")
52
-		until   = job.GetenvInt64("until")
53
-		timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now()))
54
-	)
55
-
56
-	eventFilters, err := filters.FromParam(job.Getenv("filters"))
57
-	if err != nil {
58
-		return err
59
-	}
60
-
61
-	// If no until, disable timeout
62
-	if job.Getenv("until") == "" {
63
-		timeout.Stop()
64
-	}
65
-
66
-	listener := make(chan *jsonmessage.JSONMessage)
67
-	e.subscribe(listener)
68
-	defer e.unsubscribe(listener)
69
-
70
-	job.Stdout.Write(nil)
71
-
72
-	// Resend every event in the [since, until] time interval.
73
-	if job.Getenv("since") != "" {
74
-		if err := e.writeCurrent(job, since, until, eventFilters); err != nil {
75
-			return err
76
-		}
77
-	}
78
-
79
-	for {
80
-		select {
81
-		case event, ok := <-listener:
82
-			if !ok {
83
-				return nil
84
-			}
85
-			if err := writeEvent(job, event, eventFilters); err != nil {
86
-				return err
87
-			}
88
-		case <-timeout.C:
89
-			return nil
90
-		}
91
-	}
92
-}
93
-
94
-func (e *Events) Log(job *engine.Job) error {
95
-	if len(job.Args) != 3 {
96
-		return fmt.Errorf("usage: %s ACTION ID FROM", job.Name)
97
-	}
98
-	// not waiting for receivers
99
-	go e.log(job.Args[0], job.Args[1], job.Args[2])
100
-	return nil
101
-}
102
-
103
-func (e *Events) SubscribersCount(job *engine.Job) error {
104
-	ret := &engine.Env{}
105
-	ret.SetInt("count", e.subscribersCount())
106
-	ret.WriteTo(job.Stdout)
107
-	return nil
108
-}
109
-
110
-func writeEvent(job *engine.Job, event *jsonmessage.JSONMessage, eventFilters filters.Args) error {
111
-	isFiltered := func(field string, filter []string) bool {
112
-		if len(filter) == 0 {
113
-			return false
114
-		}
115
-		for _, v := range filter {
116
-			if v == field {
117
-				return false
118
-			}
119
-			if strings.Contains(field, ":") {
120
-				image := strings.Split(field, ":")
121
-				if image[0] == v {
122
-					return false
123
-				}
124
-			}
125
-		}
126
-		return true
127
-	}
128
-
129
-	//incoming container filter can be name,id or partial id, convert and replace as a full container id
130
-	for i, cn := range eventFilters["container"] {
131
-		eventFilters["container"][i] = GetContainerId(job.Eng, cn)
132
-	}
133
-
134
-	if isFiltered(event.Status, eventFilters["event"]) || isFiltered(event.From, eventFilters["image"]) ||
135
-		isFiltered(event.ID, eventFilters["container"]) {
136
-		return nil
137
-	}
138
-
139
-	// When sending an event JSON serialization errors are ignored, but all
140
-	// other errors lead to the eviction of the listener.
141
-	if b, err := json.Marshal(event); err == nil {
142
-		if _, err = job.Stdout.Write(b); err != nil {
143
-			return err
144
-		}
145
-	}
146
-	return nil
147
-}
148
-
149
-func (e *Events) writeCurrent(job *engine.Job, since, until int64, eventFilters filters.Args) error {
150
-	e.mu.RLock()
151
-	for _, event := range e.events {
152
-		if event.Time >= since && (event.Time <= until || until == 0) {
153
-			if err := writeEvent(job, event, eventFilters); err != nil {
154
-				e.mu.RUnlock()
155
-				return err
156
-			}
157
-		}
158
-	}
159
-	e.mu.RUnlock()
160
-	return nil
161
-}
162
-
163
-func (e *Events) subscribersCount() int {
164
-	e.mu.RLock()
165
-	c := len(e.subscribers)
166
-	e.mu.RUnlock()
167
-	return c
168
-}
169
-
170
-func (e *Events) log(action, id, from string) {
171
-	e.mu.Lock()
172
-	now := time.Now().UTC().Unix()
173
-	jm := &jsonmessage.JSONMessage{Status: action, ID: id, From: from, Time: now}
174
-	if len(e.events) == cap(e.events) {
175
-		// discard oldest event
176
-		copy(e.events, e.events[1:])
177
-		e.events[len(e.events)-1] = jm
178
-	} else {
179
-		e.events = append(e.events, jm)
180
-	}
181
-	for _, s := range e.subscribers {
182
-		// We give each subscriber a 100ms time window to receive the event,
183
-		// after which we move to the next.
184
-		select {
185
-		case s <- jm:
186
-		case <-time.After(100 * time.Millisecond):
187
-		}
188
-	}
189
-	e.mu.Unlock()
190
-}
191
-
192
-func (e *Events) subscribe(l listener) {
193
-	e.mu.Lock()
194
-	e.subscribers = append(e.subscribers, l)
195
-	e.mu.Unlock()
196
-}
197
-
198
-// unsubscribe closes and removes the specified listener from the list of
199
-// previously registed ones.
200
-// It returns a boolean value indicating if the listener was successfully
201
-// found, closed and unregistered.
202
-func (e *Events) unsubscribe(l listener) bool {
203
-	e.mu.Lock()
204
-	for i, subscriber := range e.subscribers {
205
-		if subscriber == l {
206
-			close(l)
207
-			e.subscribers = append(e.subscribers[:i], e.subscribers[i+1:]...)
208
-			e.mu.Unlock()
209
-			return true
210
-		}
211
-	}
212
-	e.mu.Unlock()
213
-	return false
214
-}
215
-
216
-func GetContainerId(eng *engine.Engine, name string) string {
217
-	var buf bytes.Buffer
218
-	job := eng.Job("container_inspect", name)
219
-
220
-	var outStream io.Writer
221
-
222
-	outStream = &buf
223
-	job.Stdout.Set(outStream)
224
-
225
-	if err := job.Run(); err != nil {
226
-		return ""
227
-	}
228
-	var out struct{ ID string }
229
-	json.NewDecoder(&buf).Decode(&out)
230
-	return out.ID
231
-}
232 1
deleted file mode 100644
... ...
@@ -1,154 +0,0 @@
1
-package events
2
-
3
-import (
4
-	"bytes"
5
-	"encoding/json"
6
-	"fmt"
7
-	"io"
8
-	"testing"
9
-	"time"
10
-
11
-	"github.com/docker/docker/engine"
12
-	"github.com/docker/docker/pkg/jsonmessage"
13
-)
14
-
15
-func TestEventsPublish(t *testing.T) {
16
-	e := New()
17
-	l1 := make(chan *jsonmessage.JSONMessage)
18
-	l2 := make(chan *jsonmessage.JSONMessage)
19
-	e.subscribe(l1)
20
-	e.subscribe(l2)
21
-	count := e.subscribersCount()
22
-	if count != 2 {
23
-		t.Fatalf("Must be 2 subscribers, got %d", count)
24
-	}
25
-	go e.log("test", "cont", "image")
26
-	select {
27
-	case msg := <-l1:
28
-		if len(e.events) != 1 {
29
-			t.Fatalf("Must be only one event, got %d", len(e.events))
30
-		}
31
-		if msg.Status != "test" {
32
-			t.Fatalf("Status should be test, got %s", msg.Status)
33
-		}
34
-		if msg.ID != "cont" {
35
-			t.Fatalf("ID should be cont, got %s", msg.ID)
36
-		}
37
-		if msg.From != "image" {
38
-			t.Fatalf("From should be image, got %s", msg.From)
39
-		}
40
-	case <-time.After(1 * time.Second):
41
-		t.Fatal("Timeout waiting for broadcasted message")
42
-	}
43
-	select {
44
-	case msg := <-l2:
45
-		if len(e.events) != 1 {
46
-			t.Fatalf("Must be only one event, got %d", len(e.events))
47
-		}
48
-		if msg.Status != "test" {
49
-			t.Fatalf("Status should be test, got %s", msg.Status)
50
-		}
51
-		if msg.ID != "cont" {
52
-			t.Fatalf("ID should be cont, got %s", msg.ID)
53
-		}
54
-		if msg.From != "image" {
55
-			t.Fatalf("From should be image, got %s", msg.From)
56
-		}
57
-	case <-time.After(1 * time.Second):
58
-		t.Fatal("Timeout waiting for broadcasted message")
59
-	}
60
-}
61
-
62
-func TestEventsPublishTimeout(t *testing.T) {
63
-	e := New()
64
-	l := make(chan *jsonmessage.JSONMessage)
65
-	e.subscribe(l)
66
-
67
-	c := make(chan struct{})
68
-	go func() {
69
-		e.log("test", "cont", "image")
70
-		close(c)
71
-	}()
72
-
73
-	select {
74
-	case <-c:
75
-	case <-time.After(time.Second):
76
-		t.Fatal("Timeout publishing message")
77
-	}
78
-}
79
-
80
-func TestLogEvents(t *testing.T) {
81
-	e := New()
82
-	eng := engine.New()
83
-	if err := e.Install(eng); err != nil {
84
-		t.Fatal(err)
85
-	}
86
-
87
-	for i := 0; i < eventsLimit+16; i++ {
88
-		action := fmt.Sprintf("action_%d", i)
89
-		id := fmt.Sprintf("cont_%d", i)
90
-		from := fmt.Sprintf("image_%d", i)
91
-		job := eng.Job("log", action, id, from)
92
-		if err := job.Run(); err != nil {
93
-			t.Fatal(err)
94
-		}
95
-	}
96
-	time.Sleep(50 * time.Millisecond)
97
-	if len(e.events) != eventsLimit {
98
-		t.Fatalf("Must be %d events, got %d", eventsLimit, len(e.events))
99
-	}
100
-
101
-	job := eng.Job("events")
102
-	job.SetenvInt64("since", 1)
103
-	job.SetenvInt64("until", time.Now().Unix())
104
-	buf := bytes.NewBuffer(nil)
105
-	job.Stdout.Add(buf)
106
-	if err := job.Run(); err != nil {
107
-		t.Fatal(err)
108
-	}
109
-	buf = bytes.NewBuffer(buf.Bytes())
110
-	dec := json.NewDecoder(buf)
111
-	var msgs []jsonmessage.JSONMessage
112
-	for {
113
-		var jm jsonmessage.JSONMessage
114
-		if err := dec.Decode(&jm); err != nil {
115
-			if err == io.EOF {
116
-				break
117
-			}
118
-			t.Fatal(err)
119
-		}
120
-		msgs = append(msgs, jm)
121
-	}
122
-	if len(msgs) != eventsLimit {
123
-		t.Fatalf("Must be %d events, got %d", eventsLimit, len(msgs))
124
-	}
125
-	first := msgs[0]
126
-	if first.Status != "action_16" {
127
-		t.Fatalf("First action is %s, must be action_15", first.Status)
128
-	}
129
-	last := msgs[len(msgs)-1]
130
-	if last.Status != "action_79" {
131
-		t.Fatalf("First action is %s, must be action_79", first.Status)
132
-	}
133
-}
134
-
135
-func TestEventsCountJob(t *testing.T) {
136
-	e := New()
137
-	eng := engine.New()
138
-	if err := e.Install(eng); err != nil {
139
-		t.Fatal(err)
140
-	}
141
-	l1 := make(chan *jsonmessage.JSONMessage)
142
-	l2 := make(chan *jsonmessage.JSONMessage)
143
-	e.subscribe(l1)
144
-	e.subscribe(l2)
145
-	job := eng.Job("subscribers_count")
146
-	env, _ := job.Stdout.AddEnv()
147
-	if err := job.Run(); err != nil {
148
-		t.Fatal(err)
149
-	}
150
-	count := env.GetInt("count")
151
-	if count != 2 {
152
-		t.Fatalf("There must be 2 subscribers, got %d", count)
153
-	}
154
-}
... ...
@@ -7,7 +7,6 @@ import (
7 7
 	"net/http"
8 8
 	"net/url"
9 9
 
10
-	"github.com/Sirupsen/logrus"
11 10
 	"github.com/docker/docker/engine"
12 11
 	"github.com/docker/docker/pkg/archive"
13 12
 	"github.com/docker/docker/pkg/progressreader"
... ...
@@ -92,8 +91,7 @@ func (s *TagStore) CmdImport(job *engine.Job) error {
92 92
 	if tag != "" {
93 93
 		logID = utils.ImageReference(logID, tag)
94 94
 	}
95
-	if err = job.Eng.Job("log", "import", logID, "").Run(); err != nil {
96
-		logrus.Errorf("Error logging event 'import' for %s: %s", logID, err)
97
-	}
95
+
96
+	s.eventsService.Log("import", logID, "")
98 97
 	return nil
99 98
 }
... ...
@@ -85,9 +85,7 @@ func (s *TagStore) CmdPull(job *engine.Job) error {
85 85
 
86 86
 		logrus.Debugf("pulling v2 repository with local name %q", repoInfo.LocalName)
87 87
 		if err := s.pullV2Repository(job.Eng, r, job.Stdout, repoInfo, tag, sf, job.GetenvBool("parallel")); err == nil {
88
-			if err = job.Eng.Job("log", "pull", logName, "").Run(); err != nil {
89
-				logrus.Errorf("Error logging event 'pull' for %s: %s", logName, err)
90
-			}
88
+			s.eventsService.Log("pull", logName, "")
91 89
 			return nil
92 90
 		} else if err != registry.ErrDoesNotExist && err != ErrV2RegistryUnavailable {
93 91
 			logrus.Errorf("Error from V2 registry: %s", err)
... ...
@@ -101,9 +99,7 @@ func (s *TagStore) CmdPull(job *engine.Job) error {
101 101
 		return err
102 102
 	}
103 103
 
104
-	if err = job.Eng.Job("log", "pull", logName, "").Run(); err != nil {
105
-		logrus.Errorf("Error logging event 'pull' for %s: %s", logName, err)
106
-	}
104
+	s.eventsService.Log("pull", logName, "")
107 105
 
108 106
 	return nil
109 107
 }
... ...
@@ -13,6 +13,7 @@ import (
13 13
 	"strings"
14 14
 	"sync"
15 15
 
16
+	"github.com/docker/docker/daemon/events"
16 17
 	"github.com/docker/docker/image"
17 18
 	"github.com/docker/docker/pkg/parsers"
18 19
 	"github.com/docker/docker/pkg/stringid"
... ...
@@ -40,6 +41,7 @@ type TagStore struct {
40 40
 	pullingPool     map[string]chan struct{}
41 41
 	pushingPool     map[string]chan struct{}
42 42
 	registryService *registry.Service
43
+	eventsService   *events.Events
43 44
 }
44 45
 
45 46
 type Repository map[string]string
... ...
@@ -62,7 +64,7 @@ func (r Repository) Contains(u Repository) bool {
62 62
 	return true
63 63
 }
64 64
 
65
-func NewTagStore(path string, graph *Graph, key libtrust.PrivateKey, registryService *registry.Service) (*TagStore, error) {
65
+func NewTagStore(path string, graph *Graph, key libtrust.PrivateKey, registryService *registry.Service, eventsService *events.Events) (*TagStore, error) {
66 66
 	abspath, err := filepath.Abs(path)
67 67
 	if err != nil {
68 68
 		return nil, err
... ...
@@ -76,6 +78,7 @@ func NewTagStore(path string, graph *Graph, key libtrust.PrivateKey, registrySer
76 76
 		pullingPool:     make(map[string]chan struct{}),
77 77
 		pushingPool:     make(map[string]chan struct{}),
78 78
 		registryService: registryService,
79
+		eventsService:   eventsService,
79 80
 	}
80 81
 	// Load the json file if it exists, otherwise create it.
81 82
 	if err := store.reload(); os.IsNotExist(err) {
... ...
@@ -7,6 +7,7 @@ import (
7 7
 	"path"
8 8
 	"testing"
9 9
 
10
+	"github.com/docker/docker/daemon/events"
10 11
 	"github.com/docker/docker/daemon/graphdriver"
11 12
 	_ "github.com/docker/docker/daemon/graphdriver/vfs" // import the vfs driver so it is used in the tests
12 13
 	"github.com/docker/docker/image"
... ...
@@ -59,7 +60,7 @@ func mkTestTagStore(root string, t *testing.T) *TagStore {
59 59
 	if err != nil {
60 60
 		t.Fatal(err)
61 61
 	}
62
-	store, err := NewTagStore(path.Join(root, "tags"), graph, nil, nil)
62
+	store, err := NewTagStore(path.Join(root, "tags"), graph, nil, nil, events.New())
63 63
 	if err != nil {
64 64
 		t.Fatal(err)
65 65
 	}
... ...
@@ -237,7 +237,7 @@ func TestEventsImageImport(t *testing.T) {
237 237
 	event := strings.TrimSpace(events[len(events)-1])
238 238
 
239 239
 	if !strings.HasSuffix(event, ": import") {
240
-		t.Fatalf("Missing pull event - got:%q", event)
240
+		t.Fatalf("Missing import event - got:%q", event)
241 241
 	}
242 242
 
243 243
 	logDone("events - image import is logged")