Browse code

New package daemon/events

Signed-off-by: Alexander Morozov <lk4d4@docker.com>

Alexander Morozov authored on 2015/04/04 05:58:56
Showing 2 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,66 @@
0
+package events
1
+
2
+import (
3
+	"sync"
4
+	"time"
5
+
6
+	"github.com/docker/docker/pkg/jsonmessage"
7
+	"github.com/docker/docker/pkg/pubsub"
8
+)
9
+
10
+const eventsLimit = 64
11
+
12
+// Events is pubsub channel for *jsonmessage.JSONMessage
13
+type Events struct {
14
+	mu     sync.Mutex
15
+	events []*jsonmessage.JSONMessage
16
+	pub    *pubsub.Publisher
17
+}
18
+
19
+// New returns new *Events instance
20
+func New() *Events {
21
+	return &Events{
22
+		events: make([]*jsonmessage.JSONMessage, 0, eventsLimit),
23
+		pub:    pubsub.NewPublisher(100*time.Millisecond, 1024),
24
+	}
25
+}
26
+
27
+// Subscribe adds new listener to events, returns slice of 64 stored last events
28
+// channel in which you can expect new events in form of interface{}, so you
29
+// need type assertion.
30
+func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}) {
31
+	e.mu.Lock()
32
+	current := make([]*jsonmessage.JSONMessage, len(e.events))
33
+	copy(current, e.events)
34
+	l := e.pub.Subscribe()
35
+	e.mu.Unlock()
36
+	return current, l
37
+}
38
+
39
+// Evict evicts listener from pubsub
40
+func (e *Events) Evict(l chan interface{}) {
41
+	e.pub.Evict(l)
42
+}
43
+
44
+// Log broadcasts event to listeners. Each listener has 100 millisecond for
45
+// receiving event or it will be skipped.
46
+func (e *Events) Log(action, id, from string) {
47
+	go func() {
48
+		e.mu.Lock()
49
+		jm := &jsonmessage.JSONMessage{Status: action, ID: id, From: from, Time: time.Now().UTC().Unix()}
50
+		if len(e.events) == cap(e.events) {
51
+			// discard oldest event
52
+			copy(e.events, e.events[1:])
53
+			e.events[len(e.events)-1] = jm
54
+		} else {
55
+			e.events = append(e.events, jm)
56
+		}
57
+		e.mu.Unlock()
58
+		e.pub.Publish(jm)
59
+	}()
60
+}
61
+
62
+// SubscribersCount returns number of event listeners
63
+func (e *Events) SubscribersCount() int {
64
+	return e.pub.Len()
65
+}
0 66
new file mode 100644
... ...
@@ -0,0 +1,135 @@
0
+package events
1
+
2
+import (
3
+	"fmt"
4
+	"testing"
5
+	"time"
6
+
7
+	"github.com/docker/docker/pkg/jsonmessage"
8
+)
9
+
10
+func TestEventsLog(t *testing.T) {
11
+	e := New()
12
+	_, l1 := e.Subscribe()
13
+	_, l2 := e.Subscribe()
14
+	defer e.Evict(l1)
15
+	defer e.Evict(l2)
16
+	count := e.SubscribersCount()
17
+	if count != 2 {
18
+		t.Fatalf("Must be 2 subscribers, got %d", count)
19
+	}
20
+	e.Log("test", "cont", "image")
21
+	select {
22
+	case msg := <-l1:
23
+		jmsg, ok := msg.(*jsonmessage.JSONMessage)
24
+		if !ok {
25
+			t.Fatalf("Unexpected type %T", msg)
26
+		}
27
+		if len(e.events) != 1 {
28
+			t.Fatalf("Must be only one event, got %d", len(e.events))
29
+		}
30
+		if jmsg.Status != "test" {
31
+			t.Fatalf("Status should be test, got %s", jmsg.Status)
32
+		}
33
+		if jmsg.ID != "cont" {
34
+			t.Fatalf("ID should be cont, got %s", jmsg.ID)
35
+		}
36
+		if jmsg.From != "image" {
37
+			t.Fatalf("From should be image, got %s", jmsg.From)
38
+		}
39
+	case <-time.After(1 * time.Second):
40
+		t.Fatal("Timeout waiting for broadcasted message")
41
+	}
42
+	select {
43
+	case msg := <-l2:
44
+		jmsg, ok := msg.(*jsonmessage.JSONMessage)
45
+		if !ok {
46
+			t.Fatalf("Unexpected type %T", msg)
47
+		}
48
+		if len(e.events) != 1 {
49
+			t.Fatalf("Must be only one event, got %d", len(e.events))
50
+		}
51
+		if jmsg.Status != "test" {
52
+			t.Fatalf("Status should be test, got %s", jmsg.Status)
53
+		}
54
+		if jmsg.ID != "cont" {
55
+			t.Fatalf("ID should be cont, got %s", jmsg.ID)
56
+		}
57
+		if jmsg.From != "image" {
58
+			t.Fatalf("From should be image, got %s", jmsg.From)
59
+		}
60
+	case <-time.After(1 * time.Second):
61
+		t.Fatal("Timeout waiting for broadcasted message")
62
+	}
63
+}
64
+
65
+func TestEventsLogTimeout(t *testing.T) {
66
+	e := New()
67
+	_, l := e.Subscribe()
68
+	defer e.Evict(l)
69
+
70
+	c := make(chan struct{})
71
+	go func() {
72
+		e.Log("test", "cont", "image")
73
+		close(c)
74
+	}()
75
+
76
+	select {
77
+	case <-c:
78
+	case <-time.After(time.Second):
79
+		t.Fatal("Timeout publishing message")
80
+	}
81
+}
82
+
83
+func TestLogEvents(t *testing.T) {
84
+	e := New()
85
+
86
+	for i := 0; i < eventsLimit+16; i++ {
87
+		action := fmt.Sprintf("action_%d", i)
88
+		id := fmt.Sprintf("cont_%d", i)
89
+		from := fmt.Sprintf("image_%d", i)
90
+		e.Log(action, id, from)
91
+	}
92
+	time.Sleep(50 * time.Millisecond)
93
+	current, l := e.Subscribe()
94
+	for i := 0; i < 10; i++ {
95
+		num := i + eventsLimit + 16
96
+		action := fmt.Sprintf("action_%d", num)
97
+		id := fmt.Sprintf("cont_%d", num)
98
+		from := fmt.Sprintf("image_%d", num)
99
+		e.Log(action, id, from)
100
+	}
101
+	if len(e.events) != eventsLimit {
102
+		t.Fatalf("Must be %d events, got %d", eventsLimit, len(e.events))
103
+	}
104
+
105
+	var msgs []*jsonmessage.JSONMessage
106
+	for len(msgs) < 10 {
107
+		m := <-l
108
+		jm, ok := (m).(*jsonmessage.JSONMessage)
109
+		if !ok {
110
+			t.Fatalf("Unexpected type %T", m)
111
+		}
112
+		msgs = append(msgs, jm)
113
+	}
114
+	if len(current) != eventsLimit {
115
+		t.Fatalf("Must be %d events, got %d", eventsLimit, len(current))
116
+	}
117
+	first := current[0]
118
+	if first.Status != "action_16" {
119
+		t.Fatalf("First action is %s, must be action_16", first.Status)
120
+	}
121
+	last := current[len(current)-1]
122
+	if last.Status != "action_79" {
123
+		t.Fatalf("Last action is %s, must be action_79", last.Status)
124
+	}
125
+
126
+	firstC := msgs[0]
127
+	if firstC.Status != "action_80" {
128
+		t.Fatalf("First action is %s, must be action_80", firstC.Status)
129
+	}
130
+	lastC := msgs[len(msgs)-1]
131
+	if lastC.Status != "action_89" {
132
+		t.Fatalf("Last action is %s, must be action_89", lastC.Status)
133
+	}
134
+}