Browse code

Remove engine usage for events

Signed-off-by: Alexander Morozov <lk4d4@docker.com>

Alexander Morozov authored on 2015/04/04 07:17:49
Showing 11 changed files
... ...
@@ -3,6 +3,7 @@ package server
3 3
 import (
4 4
 	"bufio"
5 5
 	"bytes"
6
+	"time"
6 7
 
7 8
 	"encoding/base64"
8 9
 	"encoding/json"
... ...
@@ -23,7 +24,9 @@ import (
23 23
 	"github.com/docker/docker/daemon"
24 24
 	"github.com/docker/docker/daemon/networkdriver/bridge"
25 25
 	"github.com/docker/docker/engine"
26
+	"github.com/docker/docker/pkg/jsonmessage"
26 27
 	"github.com/docker/docker/pkg/parsers"
28
+	"github.com/docker/docker/pkg/parsers/filters"
27 29
 	"github.com/docker/docker/pkg/stdcopy"
28 30
 	"github.com/docker/docker/pkg/streamformatter"
29 31
 	"github.com/docker/docker/pkg/version"
... ...
@@ -324,13 +327,104 @@ func getEvents(eng *engine.Engine, version version.Version, w http.ResponseWrite
324 324
 	if err := parseForm(r); err != nil {
325 325
 		return err
326 326
 	}
327
+	var since int64 = -1
328
+	if r.Form.Get("since") != "" {
329
+		s, err := strconv.ParseInt(r.Form.Get("since"), 10, 64)
330
+		if err != nil {
331
+			return err
332
+		}
333
+		since = s
334
+	}
327 335
 
328
-	var job = eng.Job("events")
329
-	streamJSON(job, w, true)
330
-	job.Setenv("since", r.Form.Get("since"))
331
-	job.Setenv("until", r.Form.Get("until"))
332
-	job.Setenv("filters", r.Form.Get("filters"))
333
-	return job.Run()
336
+	var until int64 = -1
337
+	if r.Form.Get("until") != "" {
338
+		u, err := strconv.ParseInt(r.Form.Get("until"), 10, 64)
339
+		if err != nil {
340
+			return err
341
+		}
342
+		until = u
343
+	}
344
+	timer := time.NewTimer(0)
345
+	timer.Stop()
346
+	if until > 0 {
347
+		dur := time.Unix(until, 0).Sub(time.Now())
348
+		timer = time.NewTimer(dur)
349
+	}
350
+
351
+	ef, err := filters.FromParam(r.Form.Get("filters"))
352
+	if err != nil {
353
+		return err
354
+	}
355
+
356
+	isFiltered := func(field string, filter []string) bool {
357
+		if len(filter) == 0 {
358
+			return false
359
+		}
360
+		for _, v := range filter {
361
+			if v == field {
362
+				return false
363
+			}
364
+			if strings.Contains(field, ":") {
365
+				image := strings.Split(field, ":")
366
+				if image[0] == v {
367
+					return false
368
+				}
369
+			}
370
+		}
371
+		return true
372
+	}
373
+
374
+	d := getDaemon(eng)
375
+	es := d.EventsService
376
+	w.Header().Set("Content-Type", "application/json")
377
+	enc := json.NewEncoder(utils.NewWriteFlusher(w))
378
+
379
+	getContainerId := func(cn string) string {
380
+		c, err := d.Get(cn)
381
+		if err != nil {
382
+			return ""
383
+		}
384
+		return c.ID
385
+	}
386
+
387
+	sendEvent := func(ev *jsonmessage.JSONMessage) error {
388
+		//incoming container filter can be name,id or partial id, convert and replace as a full container id
389
+		for i, cn := range ef["container"] {
390
+			ef["container"][i] = getContainerId(cn)
391
+		}
392
+
393
+		if isFiltered(ev.Status, ef["event"]) || isFiltered(ev.From, ef["image"]) ||
394
+			isFiltered(ev.ID, ef["container"]) {
395
+			return nil
396
+		}
397
+
398
+		return enc.Encode(ev)
399
+	}
400
+
401
+	current, l := es.Subscribe()
402
+	defer es.Evict(l)
403
+	for _, ev := range current {
404
+		if ev.Time < since {
405
+			continue
406
+		}
407
+		if err := sendEvent(ev); err != nil {
408
+			return err
409
+		}
410
+	}
411
+	for {
412
+		select {
413
+		case ev := <-l:
414
+			jev, ok := ev.(*jsonmessage.JSONMessage)
415
+			if !ok {
416
+				continue
417
+			}
418
+			if err := sendEvent(jev); err != nil {
419
+				return err
420
+			}
421
+		case <-timer.C:
422
+			return nil
423
+		}
424
+	}
334 425
 }
335 426
 
336 427
 func getImagesHistory(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
... ...
@@ -250,47 +250,6 @@ func TestGetContainersByName(t *testing.T) {
250 250
 	}
251 251
 }
252 252
 
253
-func TestGetEvents(t *testing.T) {
254
-	eng := engine.New()
255
-	var called bool
256
-	eng.Register("events", func(job *engine.Job) error {
257
-		called = true
258
-		since := job.Getenv("since")
259
-		if since != "1" {
260
-			t.Fatalf("'since' should be 1, found %#v instead", since)
261
-		}
262
-		until := job.Getenv("until")
263
-		if until != "0" {
264
-			t.Fatalf("'until' should be 0, found %#v instead", until)
265
-		}
266
-		v := &engine.Env{}
267
-		v.Set("since", since)
268
-		v.Set("until", until)
269
-		if _, err := v.WriteTo(job.Stdout); err != nil {
270
-			return err
271
-		}
272
-		return nil
273
-	})
274
-	r := serveRequest("GET", "/events?since=1&until=0", nil, eng, t)
275
-	if !called {
276
-		t.Fatal("handler was not called")
277
-	}
278
-	assertContentType(r, "application/json", t)
279
-	var stdoutJSON struct {
280
-		Since int
281
-		Until int
282
-	}
283
-	if err := json.Unmarshal(r.Body.Bytes(), &stdoutJSON); err != nil {
284
-		t.Fatal(err)
285
-	}
286
-	if stdoutJSON.Since != 1 {
287
-		t.Errorf("since != 1: %#v", stdoutJSON.Since)
288
-	}
289
-	if stdoutJSON.Until != 0 {
290
-		t.Errorf("until != 0: %#v", stdoutJSON.Until)
291
-	}
292
-}
293
-
294 253
 func TestLogs(t *testing.T) {
295 254
 	eng := engine.New()
296 255
 	var inspect bool
... ...
@@ -8,7 +8,6 @@ import (
8 8
 	"github.com/docker/docker/autogen/dockerversion"
9 9
 	"github.com/docker/docker/daemon/networkdriver/bridge"
10 10
 	"github.com/docker/docker/engine"
11
-	"github.com/docker/docker/events"
12 11
 	"github.com/docker/docker/pkg/parsers/kernel"
13 12
 )
14 13
 
... ...
@@ -19,9 +18,6 @@ func Register(eng *engine.Engine) error {
19 19
 	if err := remote(eng); err != nil {
20 20
 		return err
21 21
 	}
22
-	if err := events.New().Install(eng); err != nil {
23
-		return err
24
-	}
25 22
 	if err := eng.Register("version", dockerVersion); err != nil {
26 23
 		return err
27 24
 	}
... ...
@@ -200,9 +200,11 @@ func (container *Container) WriteHostConfig() error {
200 200
 
201 201
 func (container *Container) LogEvent(action string) {
202 202
 	d := container.daemon
203
-	if err := d.eng.Job("log", action, container.ID, d.Repositories().ImageName(container.ImageID)).Run(); err != nil {
204
-		logrus.Errorf("Error logging event %s for %s: %s", action, container.ID, err)
205
-	}
203
+	d.EventsService.Log(
204
+		action,
205
+		container.ID,
206
+		d.Repositories().ImageName(container.ImageID),
207
+	)
206 208
 }
207 209
 
208 210
 func (container *Container) getResourcePath(path string) (string, error) {
... ...
@@ -19,6 +19,7 @@ import (
19 19
 	"github.com/Sirupsen/logrus"
20 20
 	"github.com/docker/docker/api"
21 21
 	"github.com/docker/docker/autogen/dockerversion"
22
+	"github.com/docker/docker/daemon/events"
22 23
 	"github.com/docker/docker/daemon/execdriver"
23 24
 	"github.com/docker/docker/daemon/execdriver/execdrivers"
24 25
 	"github.com/docker/docker/daemon/execdriver/lxc"
... ...
@@ -109,6 +110,7 @@ type Daemon struct {
109 109
 	statsCollector   *statsCollector
110 110
 	defaultLogConfig runconfig.LogConfig
111 111
 	RegistryService  *registry.Service
112
+	EventsService    *events.Events
112 113
 }
113 114
 
114 115
 // Install installs daemon capabilities to eng.
... ...
@@ -932,8 +934,9 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine, registryService
932 932
 		return nil, err
933 933
 	}
934 934
 
935
+	eventsService := events.New()
935 936
 	logrus.Debug("Creating repository list")
936
-	repositories, err := graph.NewTagStore(path.Join(config.Root, "repositories-"+driver.String()), g, trustKey, registryService)
937
+	repositories, err := graph.NewTagStore(path.Join(config.Root, "repositories-"+driver.String()), g, trustKey, registryService, eventsService)
937 938
 	if err != nil {
938 939
 		return nil, fmt.Errorf("Couldn't create Tag store: %s", err)
939 940
 	}
... ...
@@ -1025,6 +1028,7 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine, registryService
1025 1025
 		statsCollector:   newStatsCollector(1 * time.Second),
1026 1026
 		defaultLogConfig: config.LogConfig,
1027 1027
 		RegistryService:  registryService,
1028
+		EventsService:    eventsService,
1028 1029
 	}
1029 1030
 
1030 1031
 	eng.OnShutdown(func() {
... ...
@@ -108,7 +108,7 @@ func (daemon *Daemon) DeleteImage(eng *engine.Engine, name string, list *[]types
108 108
 			*list = append(*list, types.ImageDelete{
109 109
 				Untagged: utils.ImageReference(repoName, tag),
110 110
 			})
111
-			eng.Job("log", "untag", img.ID, "").Run()
111
+			daemon.EventsService.Log("untag", img.ID, "")
112 112
 		}
113 113
 	}
114 114
 	tags = daemon.Repositories().ByID()[img.ID]
... ...
@@ -123,6 +123,7 @@ func (daemon *Daemon) DeleteImage(eng *engine.Engine, name string, list *[]types
123 123
 			*list = append(*list, types.ImageDelete{
124 124
 				Deleted: img.ID,
125 125
 			})
126
+			daemon.EventsService.Log("delete", img.ID, "")
126 127
 			eng.Job("log", "delete", img.ID, "").Run()
127 128
 			if img.Parent != "" && !noprune {
128 129
 				err := daemon.DeleteImage(eng, img.Parent, list, false, force, noprune)
... ...
@@ -51,11 +51,6 @@ func (daemon *Daemon) CmdInfo(job *engine.Job) error {
51 51
 		initPath = daemon.SystemInitPath()
52 52
 	}
53 53
 
54
-	cjob := job.Eng.Job("subscribers_count")
55
-	env, _ := cjob.Stdout.AddEnv()
56
-	if err := cjob.Run(); err != nil {
57
-		return err
58
-	}
59 54
 	v := &engine.Env{}
60 55
 	v.SetJson("ID", daemon.ID)
61 56
 	v.SetInt("Containers", len(daemon.List()))
... ...
@@ -71,7 +66,7 @@ func (daemon *Daemon) CmdInfo(job *engine.Job) error {
71 71
 	v.Set("SystemTime", time.Now().Format(time.RFC3339Nano))
72 72
 	v.Set("ExecutionDriver", daemon.ExecutionDriver().Name())
73 73
 	v.Set("LoggingDriver", daemon.defaultLogConfig.Type)
74
-	v.SetInt("NEventsListener", env.GetInt("count"))
74
+	v.SetInt("NEventsListener", daemon.EventsService.SubscribersCount())
75 75
 	v.Set("KernelVersion", kernelVersion)
76 76
 	v.Set("OperatingSystem", operatingSystem)
77 77
 	v.Set("IndexServerAddress", registry.IndexServerAddress())
... ...
@@ -7,7 +7,6 @@ import (
7 7
 	"net/http"
8 8
 	"net/url"
9 9
 
10
-	"github.com/Sirupsen/logrus"
11 10
 	"github.com/docker/docker/engine"
12 11
 	"github.com/docker/docker/pkg/archive"
13 12
 	"github.com/docker/docker/pkg/progressreader"
... ...
@@ -92,8 +91,7 @@ func (s *TagStore) CmdImport(job *engine.Job) error {
92 92
 	if tag != "" {
93 93
 		logID = utils.ImageReference(logID, tag)
94 94
 	}
95
-	if err = job.Eng.Job("log", "import", logID, "").Run(); err != nil {
96
-		logrus.Errorf("Error logging event 'import' for %s: %s", logID, err)
97
-	}
95
+
96
+	s.eventsService.Log("import", logID, "")
98 97
 	return nil
99 98
 }
... ...
@@ -85,9 +85,7 @@ func (s *TagStore) CmdPull(job *engine.Job) error {
85 85
 
86 86
 		logrus.Debugf("pulling v2 repository with local name %q", repoInfo.LocalName)
87 87
 		if err := s.pullV2Repository(job.Eng, r, job.Stdout, repoInfo, tag, sf, job.GetenvBool("parallel")); err == nil {
88
-			if err = job.Eng.Job("log", "pull", logName, "").Run(); err != nil {
89
-				logrus.Errorf("Error logging event 'pull' for %s: %s", logName, err)
90
-			}
88
+			s.eventsService.Log("pull", logName, "")
91 89
 			return nil
92 90
 		} else if err != registry.ErrDoesNotExist && err != ErrV2RegistryUnavailable {
93 91
 			logrus.Errorf("Error from V2 registry: %s", err)
... ...
@@ -101,9 +99,7 @@ func (s *TagStore) CmdPull(job *engine.Job) error {
101 101
 		return err
102 102
 	}
103 103
 
104
-	if err = job.Eng.Job("log", "pull", logName, "").Run(); err != nil {
105
-		logrus.Errorf("Error logging event 'pull' for %s: %s", logName, err)
106
-	}
104
+	s.eventsService.Log("pull", logName, "")
107 105
 
108 106
 	return nil
109 107
 }
... ...
@@ -13,6 +13,7 @@ import (
13 13
 	"strings"
14 14
 	"sync"
15 15
 
16
+	"github.com/docker/docker/daemon/events"
16 17
 	"github.com/docker/docker/image"
17 18
 	"github.com/docker/docker/pkg/parsers"
18 19
 	"github.com/docker/docker/pkg/stringid"
... ...
@@ -40,6 +41,7 @@ type TagStore struct {
40 40
 	pullingPool     map[string]chan struct{}
41 41
 	pushingPool     map[string]chan struct{}
42 42
 	registryService *registry.Service
43
+	eventsService   *events.Events
43 44
 }
44 45
 
45 46
 type Repository map[string]string
... ...
@@ -62,7 +64,7 @@ func (r Repository) Contains(u Repository) bool {
62 62
 	return true
63 63
 }
64 64
 
65
-func NewTagStore(path string, graph *Graph, key libtrust.PrivateKey, registryService *registry.Service) (*TagStore, error) {
65
+func NewTagStore(path string, graph *Graph, key libtrust.PrivateKey, registryService *registry.Service, eventsService *events.Events) (*TagStore, error) {
66 66
 	abspath, err := filepath.Abs(path)
67 67
 	if err != nil {
68 68
 		return nil, err
... ...
@@ -76,6 +78,7 @@ func NewTagStore(path string, graph *Graph, key libtrust.PrivateKey, registrySer
76 76
 		pullingPool:     make(map[string]chan struct{}),
77 77
 		pushingPool:     make(map[string]chan struct{}),
78 78
 		registryService: registryService,
79
+		eventsService:   eventsService,
79 80
 	}
80 81
 	// Load the json file if it exists, otherwise create it.
81 82
 	if err := store.reload(); os.IsNotExist(err) {
... ...
@@ -7,6 +7,7 @@ import (
7 7
 	"path"
8 8
 	"testing"
9 9
 
10
+	"github.com/docker/docker/daemon/events"
10 11
 	"github.com/docker/docker/daemon/graphdriver"
11 12
 	_ "github.com/docker/docker/daemon/graphdriver/vfs" // import the vfs driver so it is used in the tests
12 13
 	"github.com/docker/docker/image"
... ...
@@ -59,7 +60,7 @@ func mkTestTagStore(root string, t *testing.T) *TagStore {
59 59
 	if err != nil {
60 60
 		t.Fatal(err)
61 61
 	}
62
-	store, err := NewTagStore(path.Join(root, "tags"), graph, nil, nil)
62
+	store, err := NewTagStore(path.Join(root, "tags"), graph, nil, nil, events.New())
63 63
 	if err != nil {
64 64
 		t.Fatal(err)
65 65
 	}