Browse code

in-memory ACID store for containers

This can be used by readers/queries so they don't need locks.

Signed-off-by: Fabio Kung <fabio.kung@gmail.com>

Fabio Kung authored on 2017/02/23 03:00:50
Showing 3 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,152 @@
0
+package container
1
+
2
+import (
3
+	"fmt"
4
+	"strings"
5
+	"time"
6
+
7
+	"github.com/Sirupsen/logrus"
8
+	"github.com/docker/docker/api/types"
9
+	"github.com/docker/docker/api/types/network"
10
+	"github.com/docker/go-connections/nat"
11
+)
12
+
13
+// Snapshot is a read only view for Containers
14
+type Snapshot struct {
15
+	ID           string `json:"Id"`
16
+	Name         string
17
+	Pid          int
18
+	Managed      bool
19
+	Image        string
20
+	ImageID      string
21
+	Command      string
22
+	Ports        []types.Port
23
+	ExposedPorts nat.PortSet
24
+	PublishPorts nat.PortSet
25
+	Labels       map[string]string
26
+	State        string
27
+	Status       string
28
+	Health       string
29
+	HostConfig   struct {
30
+		NetworkMode string
31
+		Isolation   string
32
+	}
33
+	NetworkSettings types.SummaryNetworkSettings
34
+	Mounts          []types.MountPoint
35
+	Created         time.Time
36
+	StartedAt       time.Time
37
+	Running         bool
38
+	Paused          bool
39
+	ExitCode        int
40
+}
41
+
42
+// Snapshot provides a read only view of a Container. Callers must hold a Lock on the container object.
43
+func (container *Container) Snapshot() *Snapshot {
44
+	snapshot := &Snapshot{
45
+		ID:           container.ID,
46
+		Name:         container.Name,
47
+		Pid:          container.Pid,
48
+		Managed:      container.Managed,
49
+		ImageID:      container.ImageID.String(),
50
+		Ports:        []types.Port{},
51
+		ExposedPorts: make(nat.PortSet),
52
+		PublishPorts: make(nat.PortSet),
53
+		State:        container.State.StateString(),
54
+		Status:       container.State.String(),
55
+		Health:       container.State.HealthString(),
56
+		Mounts:       container.GetMountPoints(),
57
+		Created:      container.Created,
58
+		StartedAt:    container.StartedAt,
59
+		Running:      container.Running,
60
+		Paused:       container.Paused,
61
+		ExitCode:     container.ExitCode(),
62
+	}
63
+
64
+	if container.HostConfig != nil {
65
+		snapshot.HostConfig.Isolation = string(container.HostConfig.Isolation)
66
+		snapshot.HostConfig.NetworkMode = string(container.HostConfig.NetworkMode)
67
+		for publish := range container.HostConfig.PortBindings {
68
+			snapshot.PublishPorts[publish] = struct{}{}
69
+		}
70
+	}
71
+
72
+	if container.Config != nil {
73
+		snapshot.Image = container.Config.Image
74
+		snapshot.Labels = container.Config.Labels
75
+		for exposed := range container.Config.ExposedPorts {
76
+			snapshot.ExposedPorts[exposed] = struct{}{}
77
+		}
78
+	}
79
+
80
+	if len(container.Args) > 0 {
81
+		args := []string{}
82
+		for _, arg := range container.Args {
83
+			if strings.Contains(arg, " ") {
84
+				args = append(args, fmt.Sprintf("'%s'", arg))
85
+			} else {
86
+				args = append(args, arg)
87
+			}
88
+		}
89
+		argsAsString := strings.Join(args, " ")
90
+		snapshot.Command = fmt.Sprintf("%s %s", container.Path, argsAsString)
91
+	} else {
92
+		snapshot.Command = container.Path
93
+	}
94
+
95
+	if container.NetworkSettings != nil {
96
+		networks := make(map[string]*network.EndpointSettings)
97
+		for name, netw := range container.NetworkSettings.Networks {
98
+			if netw == nil || netw.EndpointSettings == nil {
99
+				continue
100
+			}
101
+			networks[name] = &network.EndpointSettings{
102
+				EndpointID:          netw.EndpointID,
103
+				Gateway:             netw.Gateway,
104
+				IPAddress:           netw.IPAddress,
105
+				IPPrefixLen:         netw.IPPrefixLen,
106
+				IPv6Gateway:         netw.IPv6Gateway,
107
+				GlobalIPv6Address:   netw.GlobalIPv6Address,
108
+				GlobalIPv6PrefixLen: netw.GlobalIPv6PrefixLen,
109
+				MacAddress:          netw.MacAddress,
110
+				NetworkID:           netw.NetworkID,
111
+			}
112
+			if netw.IPAMConfig != nil {
113
+				networks[name].IPAMConfig = &network.EndpointIPAMConfig{
114
+					IPv4Address: netw.IPAMConfig.IPv4Address,
115
+					IPv6Address: netw.IPAMConfig.IPv6Address,
116
+				}
117
+			}
118
+		}
119
+		snapshot.NetworkSettings = types.SummaryNetworkSettings{Networks: networks}
120
+		for port, bindings := range container.NetworkSettings.Ports {
121
+			p, err := nat.ParsePort(port.Port())
122
+			if err != nil {
123
+				logrus.Warnf("invalid port map %+v", err)
124
+				continue
125
+			}
126
+			if len(bindings) == 0 {
127
+				snapshot.Ports = append(snapshot.Ports, types.Port{
128
+					PrivatePort: uint16(p),
129
+					Type:        port.Proto(),
130
+				})
131
+				continue
132
+			}
133
+			for _, binding := range bindings {
134
+				h, err := nat.ParsePort(binding.HostPort)
135
+				if err != nil {
136
+					logrus.Warnf("invalid host port map %+v", err)
137
+					continue
138
+				}
139
+				snapshot.Ports = append(snapshot.Ports, types.Port{
140
+					PrivatePort: uint16(p),
141
+					PublicPort:  uint16(h),
142
+					Type:        port.Proto(),
143
+					IP:          binding.HostIP,
144
+				})
145
+			}
146
+		}
147
+
148
+	}
149
+
150
+	return snapshot
151
+}
0 152
new file mode 100644
... ...
@@ -0,0 +1,90 @@
0
+package container
1
+
2
+import "github.com/hashicorp/go-memdb"
3
+
4
+const (
5
+	memdbTable   = "containers"
6
+	memdbIDField = "ID"
7
+	memdbIDIndex = "id"
8
+)
9
+
10
+var schema = &memdb.DBSchema{
11
+	Tables: map[string]*memdb.TableSchema{
12
+		memdbTable: {
13
+			Name: memdbTable,
14
+			Indexes: map[string]*memdb.IndexSchema{
15
+				memdbIDIndex: {
16
+					Name:    memdbIDIndex,
17
+					Unique:  true,
18
+					Indexer: &memdb.StringFieldIndex{Field: memdbIDField},
19
+				},
20
+			},
21
+		},
22
+	},
23
+}
24
+
25
+// MemDB provides an in-memory transactional (ACID) container Store
26
+type MemDB struct {
27
+	store *memdb.MemDB
28
+}
29
+
30
+// NewMemDB provides the default implementation, with the default schema
31
+func NewMemDB() (*MemDB, error) {
32
+	store, err := memdb.NewMemDB(schema)
33
+	if err != nil {
34
+		return nil, err
35
+	}
36
+	return &MemDB{store: store}, nil
37
+}
38
+
39
+// Snapshot provides a consistent read-only View of the database
40
+func (db *MemDB) Snapshot() *View {
41
+	return &View{db.store.Txn(false)}
42
+}
43
+
44
+// Save atomically updates the in-memory store
45
+func (db *MemDB) Save(snapshot *Snapshot) error {
46
+	txn := db.store.Txn(true)
47
+	defer txn.Commit()
48
+	return txn.Insert(memdbTable, snapshot)
49
+}
50
+
51
+// Delete removes an item by ID
52
+func (db *MemDB) Delete(id string) error {
53
+	txn := db.store.Txn(true)
54
+	defer txn.Commit()
55
+	return txn.Delete(memdbTable, &Snapshot{ID: id})
56
+}
57
+
58
+// View can be used by readers to avoid locking
59
+type View struct {
60
+	txn *memdb.Txn
61
+}
62
+
63
+// All returns a all items in this snapshot
64
+func (v *View) All() ([]Snapshot, error) {
65
+	var all []Snapshot
66
+	iter, err := v.txn.Get(memdbTable, memdbIDIndex)
67
+	if err != nil {
68
+		return nil, err
69
+	}
70
+	for {
71
+		item := iter.Next()
72
+		if item == nil {
73
+			break
74
+		}
75
+		snapshot := *(item.(*Snapshot)) // force a copy
76
+		all = append(all, snapshot)
77
+	}
78
+	return all, nil
79
+}
80
+
81
+//Get returns an item by id
82
+func (v *View) Get(id string) (*Snapshot, error) {
83
+	s, err := v.txn.First(memdbTable, memdbIDIndex, id)
84
+	if err != nil {
85
+		return nil, err
86
+	}
87
+	snapshot := *(s.(*Snapshot)) // force a copy
88
+	return &snapshot, nil
89
+}
0 90
new file mode 100644
... ...
@@ -0,0 +1,58 @@
0
+package container
1
+
2
+import "testing"
3
+
4
+func TestViewSave(t *testing.T) {
5
+	db, err := NewMemDB()
6
+	if err != nil {
7
+		t.Fatal(err)
8
+	}
9
+	snapshot := NewBaseContainer("id", "root").Snapshot()
10
+	if err := db.Save(snapshot); err != nil {
11
+		t.Fatal(err)
12
+
13
+	}
14
+}
15
+
16
+func TestViewAll(t *testing.T) {
17
+	var (
18
+		db, _ = NewMemDB()
19
+		one   = NewBaseContainer("id1", "root1").Snapshot()
20
+		two   = NewBaseContainer("id2", "root2").Snapshot()
21
+	)
22
+	one.Pid = 10
23
+	two.Pid = 20
24
+	db.Save(one)
25
+	db.Save(two)
26
+	all, err := db.Snapshot().All()
27
+	if err != nil {
28
+		t.Fatal(err)
29
+	}
30
+	if l := len(all); l != 2 {
31
+		t.Fatalf("expected 2 items, got %d", l)
32
+	}
33
+	byID := make(map[string]Snapshot)
34
+	for i := range all {
35
+		byID[all[i].ID] = all[i]
36
+	}
37
+	if s, ok := byID["id1"]; !ok || s.Pid != 10 {
38
+		t.Fatalf("expected something different with for id1: %v", s)
39
+	}
40
+	if s, ok := byID["id2"]; !ok || s.Pid != 20 {
41
+		t.Fatalf("expected something different with for id1: %v", s)
42
+	}
43
+}
44
+
45
+func TestViewGet(t *testing.T) {
46
+	db, _ := NewMemDB()
47
+	one := NewBaseContainer("id", "root")
48
+	one.ImageID = "some-image-123"
49
+	db.Save(one.Snapshot())
50
+	s, err := db.Snapshot().Get("id")
51
+	if err != nil {
52
+		t.Fatal(err)
53
+	}
54
+	if s == nil || s.ImageID != "some-image-123" {
55
+		t.Fatalf("expected something different. Got: %v", s)
56
+	}
57
+}