Browse code

Move checkpointing to the Container object

Also hide ViewDB behind an inteface.

Signed-off-by: Fabio Kung <fabio.kung@gmail.com>

Fabio Kung authored on 2017/02/24 08:12:18
Showing 15 changed files
... ...
@@ -189,6 +189,21 @@ func (container *Container) ToDiskLocking() error {
189 189
 	return err
190 190
 }
191 191
 
192
+// CheckpointTo makes the Container's current state visible to queries.
193
+// Callers must hold a Container lock.
194
+func (container *Container) CheckpointTo(store ViewDB) error {
195
+	return store.Save(container.snapshot())
196
+}
197
+
198
+// CheckpointAndSaveToDisk is equivalent to calling CheckpointTo and ToDisk.
199
+// Callers must hold a Container lock.
200
+func (container *Container) CheckpointAndSaveToDisk(store ViewDB) error {
201
+	if err := container.CheckpointTo(store); err != nil {
202
+		return err
203
+	}
204
+	return container.ToDisk()
205
+}
206
+
192 207
 // readHostConfig reads the host configuration from disk for the container.
193 208
 func (container *Container) readHostConfig() error {
194 209
 	container.HostConfig = &containertypes.HostConfig{}
... ...
@@ -41,7 +41,7 @@ type Snapshot struct {
41 41
 }
42 42
 
43 43
 // Snapshot provides a read only view of a Container. Callers must hold a Lock on the container object.
44
-func (container *Container) Snapshot() *Snapshot {
44
+func (container *Container) snapshot() *Snapshot {
45 45
 	snapshot := &Snapshot{
46 46
 		ID:           container.ID,
47 47
 		Name:         container.Name,
... ...
@@ -8,6 +8,19 @@ const (
8 8
 	memdbIDIndex = "id"
9 9
 )
10 10
 
11
+// ViewDB provides an in-memory transactional (ACID) container Store
12
+type ViewDB interface {
13
+	Snapshot() View
14
+	Save(snapshot *Snapshot) error
15
+	Delete(id string) error
16
+}
17
+
18
+// View can be used by readers to avoid locking
19
+type View interface {
20
+	All() ([]Snapshot, error)
21
+	Get(id string) (*Snapshot, error)
22
+}
23
+
11 24
 var schema = &memdb.DBSchema{
12 25
 	Tables: map[string]*memdb.TableSchema{
13 26
 		memdbTable: {
... ...
@@ -23,46 +36,44 @@ var schema = &memdb.DBSchema{
23 23
 	},
24 24
 }
25 25
 
26
-// MemDB provides an in-memory transactional (ACID) container Store
27
-type MemDB struct {
26
+type memDB struct {
28 27
 	store *memdb.MemDB
29 28
 }
30 29
 
31
-// NewMemDB provides the default implementation, with the default schema
32
-func NewMemDB() (*MemDB, error) {
30
+// NewViewDB provides the default implementation, with the default schema
31
+func NewViewDB() (ViewDB, error) {
33 32
 	store, err := memdb.NewMemDB(schema)
34 33
 	if err != nil {
35 34
 		return nil, err
36 35
 	}
37
-	return &MemDB{store: store}, nil
36
+	return &memDB{store: store}, nil
38 37
 }
39 38
 
40 39
 // Snapshot provides a consistent read-only View of the database
41
-func (db *MemDB) Snapshot() *View {
42
-	return &View{db.store.Txn(false)}
40
+func (db *memDB) Snapshot() View {
41
+	return &memdbView{db.store.Txn(false)}
43 42
 }
44 43
 
45 44
 // Save atomically updates the in-memory store
46
-func (db *MemDB) Save(snapshot *Snapshot) error {
45
+func (db *memDB) Save(snapshot *Snapshot) error {
47 46
 	txn := db.store.Txn(true)
48 47
 	defer txn.Commit()
49 48
 	return txn.Insert(memdbTable, snapshot)
50 49
 }
51 50
 
52 51
 // Delete removes an item by ID
53
-func (db *MemDB) Delete(id string) error {
52
+func (db *memDB) Delete(id string) error {
54 53
 	txn := db.store.Txn(true)
55 54
 	defer txn.Commit()
56 55
 	return txn.Delete(memdbTable, &Snapshot{ID: id})
57 56
 }
58 57
 
59
-// View can be used by readers to avoid locking
60
-type View struct {
58
+type memdbView struct {
61 59
 	txn *memdb.Txn
62 60
 }
63 61
 
64 62
 // All returns a all items in this snapshot
65
-func (v *View) All() ([]Snapshot, error) {
63
+func (v *memdbView) All() ([]Snapshot, error) {
66 64
 	var all []Snapshot
67 65
 	iter, err := v.txn.Get(memdbTable, memdbIDIndex)
68 66
 	if err != nil {
... ...
@@ -80,7 +91,7 @@ func (v *View) All() ([]Snapshot, error) {
80 80
 }
81 81
 
82 82
 //Get returns an item by id
83
-func (v *View) Get(id string) (*Snapshot, error) {
83
+func (v *memdbView) Get(id string) (*Snapshot, error) {
84 84
 	s, err := v.txn.First(memdbTable, memdbIDIndex, id)
85 85
 	if err != nil {
86 86
 		return nil, err
... ...
@@ -3,27 +3,26 @@ package container
3 3
 import "testing"
4 4
 
5 5
 func TestViewSave(t *testing.T) {
6
-	db, err := NewMemDB()
6
+	db, err := NewViewDB()
7 7
 	if err != nil {
8 8
 		t.Fatal(err)
9 9
 	}
10
-	snapshot := NewBaseContainer("id", "root").Snapshot()
11
-	if err := db.Save(snapshot); err != nil {
10
+	c := NewBaseContainer("id", "root")
11
+	if err := c.CheckpointTo(db); err != nil {
12 12
 		t.Fatal(err)
13
-
14 13
 	}
15 14
 }
16 15
 
17 16
 func TestViewAll(t *testing.T) {
18 17
 	var (
19
-		db, _ = NewMemDB()
20
-		one   = NewBaseContainer("id1", "root1").Snapshot()
21
-		two   = NewBaseContainer("id2", "root2").Snapshot()
18
+		db, _ = NewViewDB()
19
+		one   = NewBaseContainer("id1", "root1")
20
+		two   = NewBaseContainer("id2", "root2")
22 21
 	)
23 22
 	one.Pid = 10
24 23
 	two.Pid = 20
25
-	db.Save(one)
26
-	db.Save(two)
24
+	one.CheckpointTo(db)
25
+	two.CheckpointTo(db)
27 26
 	all, err := db.Snapshot().All()
28 27
 	if err != nil {
29 28
 		t.Fatal(err)
... ...
@@ -44,10 +43,10 @@ func TestViewAll(t *testing.T) {
44 44
 }
45 45
 
46 46
 func TestViewGet(t *testing.T) {
47
-	db, _ := NewMemDB()
47
+	db, _ := NewViewDB()
48 48
 	one := NewBaseContainer("id", "root")
49 49
 	one.ImageID = "some-image-123"
50
-	db.Save(one.Snapshot())
50
+	one.CheckpointTo(db)
51 51
 	s, err := db.Snapshot().Get("id")
52 52
 	if err != nil {
53 53
 		t.Fatal(err)
... ...
@@ -108,13 +108,13 @@ func (daemon *Daemon) Register(c *container.Container) error {
108 108
 	}
109 109
 
110 110
 	// once in the memory store it is visible to other goroutines
111
-	// grab a Lock until it has been replicated to avoid races
111
+	// grab a Lock until it has been checkpointed to avoid races
112 112
 	c.Lock()
113 113
 	defer c.Unlock()
114 114
 
115 115
 	daemon.containers.Add(c.ID, c)
116 116
 	daemon.idIndex.Add(c.ID)
117
-	return daemon.containersReplica.Save(c.Snapshot())
117
+	return c.CheckpointTo(daemon.containersReplica)
118 118
 }
119 119
 
120 120
 func (daemon *Daemon) newContainer(name string, platform string, config *containertypes.Config, hostConfig *containertypes.HostConfig, imgID image.ID, managed bool) (*container.Container, error) {
... ...
@@ -218,10 +218,7 @@ func (daemon *Daemon) setHostConfig(container *container.Container, hostConfig *
218 218
 
219 219
 	runconfig.SetDefaultNetModeIfBlank(hostConfig)
220 220
 	container.HostConfig = hostConfig
221
-	if err := daemon.containersReplica.Save(container.Snapshot()); err != nil {
222
-		return err
223
-	}
224
-	return container.ToDisk()
221
+	return container.CheckpointAndSaveToDisk(daemon.containersReplica)
225 222
 }
226 223
 
227 224
 // verifyContainerSettings performs validation of the hostconfig and config
... ...
@@ -45,14 +45,11 @@ func (daemon *Daemon) getDNSSearchSettings(container *container.Container) []str
45 45
 	return nil
46 46
 }
47 47
 
48
-func (daemon *Daemon) saveAndReplicate(container *container.Container) error {
48
+func (daemon *Daemon) checkpointAndSave(container *container.Container) error {
49 49
 	container.Lock()
50 50
 	defer container.Unlock()
51
-	if err := daemon.containersReplica.Save(container.Snapshot()); err != nil {
52
-		return fmt.Errorf("Error replicating container state: %v", err)
53
-	}
54
-	if err := container.ToDisk(); err != nil {
55
-		return fmt.Errorf("Error saving container to disk: %v", err)
51
+	if err := container.CheckpointAndSaveToDisk(daemon.containersReplica); err != nil {
52
+		return fmt.Errorf("Error saving container state: %v", err)
56 53
 	}
57 54
 	return nil
58 55
 }
... ...
@@ -1018,10 +1015,8 @@ func (daemon *Daemon) ConnectToNetwork(container *container.Container, idOrName
1018 1018
 			return err
1019 1019
 		}
1020 1020
 	}
1021
-	if err := daemon.saveAndReplicate(container); err != nil {
1022
-		return fmt.Errorf("Error saving container to disk: %v", err)
1023
-	}
1024
-	return nil
1021
+
1022
+	return daemon.checkpointAndSave(container)
1025 1023
 }
1026 1024
 
1027 1025
 // DisconnectFromNetwork disconnects container from network n.
... ...
@@ -1057,8 +1052,8 @@ func (daemon *Daemon) DisconnectFromNetwork(container *container.Container, netw
1057 1057
 		return err
1058 1058
 	}
1059 1059
 
1060
-	if err := daemon.saveAndReplicate(container); err != nil {
1061
-		return fmt.Errorf("Error saving container to disk: %v", err)
1060
+	if err := daemon.checkpointAndSave(container); err != nil {
1061
+		return err
1062 1062
 	}
1063 1063
 
1064 1064
 	if n != nil {
... ...
@@ -83,7 +83,7 @@ type Daemon struct {
83 83
 	ID                    string
84 84
 	repository            string
85 85
 	containers            container.Store
86
-	containersReplica     *container.MemDB
86
+	containersReplica     container.ViewDB
87 87
 	execCommands          *exec.Store
88 88
 	downloadManager       *xfer.LayerDownloadManager
89 89
 	uploadManager         *xfer.LayerUploadManager
... ...
@@ -762,7 +762,7 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe
762 762
 	d.ID = trustKey.PublicKey().KeyID()
763 763
 	d.repository = daemonRepo
764 764
 	d.containers = container.NewMemoryStore()
765
-	if d.containersReplica, err = container.NewMemDB(); err != nil {
765
+	if d.containersReplica, err = container.NewViewDB(); err != nil {
766 766
 		return nil, err
767 767
 	}
768 768
 	d.execCommands = exec.NewStore()
... ...
@@ -105,7 +105,7 @@ func (daemon *Daemon) cleanupContainer(container *container.Container, forceRemo
105 105
 	// Mark container dead. We don't want anybody to be restarting it.
106 106
 	container.Lock()
107 107
 	container.Dead = true
108
-	if err = daemon.containersReplica.Save(container.Snapshot()); err != nil {
108
+	if err = container.CheckpointTo(daemon.containersReplica); err != nil {
109 109
 		container.Unlock()
110 110
 		return err
111 111
 	}
... ...
@@ -168,9 +168,9 @@ func handleProbeResult(d *Daemon, c *container.Container, result *types.Healthch
168 168
 	}
169 169
 
170 170
 	// replicate Health status changes
171
-	if err := d.containersReplica.Save(c.Snapshot()); err != nil {
171
+	if err := c.CheckpointTo(d.containersReplica); err != nil {
172 172
 		// queries will be inconsistent until the next probe runs or other state mutations
173
-		// trigger a replication
173
+		// checkpoint the container
174 174
 		logrus.Errorf("Error replicating health state for container %s: %v", c.ID, err)
175 175
 	}
176 176
 
... ...
@@ -29,7 +29,7 @@ func TestNoneHealthcheck(t *testing.T) {
29 29
 		},
30 30
 		State: &container.State{},
31 31
 	}
32
-	store, err := container.NewMemDB()
32
+	store, err := container.NewViewDB()
33 33
 	if err != nil {
34 34
 		t.Fatal(err)
35 35
 	}
... ...
@@ -69,7 +69,7 @@ func TestHealthStates(t *testing.T) {
69 69
 		},
70 70
 	}
71 71
 
72
-	store, err := container.NewMemDB()
72
+	store, err := container.NewViewDB()
73 73
 	if err != nil {
74 74
 		t.Fatal(err)
75 75
 	}
... ...
@@ -114,7 +114,7 @@ func (daemon *Daemon) Containers(config *types.ContainerListOptions) ([]*types.C
114 114
 	return daemon.reduceContainers(config, daemon.transformContainer)
115 115
 }
116 116
 
117
-func (daemon *Daemon) filterByNameIDMatches(view *container.View, ctx *listContext) ([]container.Snapshot, error) {
117
+func (daemon *Daemon) filterByNameIDMatches(view container.View, ctx *listContext) ([]container.Snapshot, error) {
118 118
 	idSearch := false
119 119
 	names := ctx.filters.Get("name")
120 120
 	ids := ctx.filters.Get("id")
... ...
@@ -240,7 +240,7 @@ func (daemon *Daemon) reducePsContainer(container *container.Snapshot, ctx *list
240 240
 }
241 241
 
242 242
 // foldFilter generates the container filter based on the user's filtering options.
243
-func (daemon *Daemon) foldFilter(view *container.View, config *types.ContainerListOptions) (*listContext, error) {
243
+func (daemon *Daemon) foldFilter(view container.View, config *types.ContainerListOptions) (*listContext, error) {
244 244
 	psFilters := config.Filters
245 245
 
246 246
 	if err := psFilters.Validate(acceptedPsFilterTags); err != nil {
... ...
@@ -90,10 +90,7 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
90 90
 		daemon.setStateCounter(c)
91 91
 
92 92
 		defer c.Unlock()
93
-		if err := daemon.containersReplica.Save(c.Snapshot()); err != nil {
94
-			return err
95
-		}
96
-		if err := c.ToDisk(); err != nil {
93
+		if err := c.CheckpointAndSaveToDisk(daemon.containersReplica); err != nil {
97 94
 			return err
98 95
 		}
99 96
 		return daemon.postRunProcessing(c, e)
... ...
@@ -122,11 +119,7 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
122 122
 		c.HasBeenStartedBefore = true
123 123
 		daemon.setStateCounter(c)
124 124
 
125
-		if err := daemon.containersReplica.Save(c.Snapshot()); err != nil {
126
-			c.Reset(false)
127
-			return err
128
-		}
129
-		if err := c.ToDisk(); err != nil {
125
+		if err := c.CheckpointAndSaveToDisk(daemon.containersReplica); err != nil {
130 126
 			c.Reset(false)
131 127
 			return err
132 128
 		}
... ...
@@ -137,10 +130,7 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
137 137
 		// Container is already locked in this case
138 138
 		c.Paused = true
139 139
 		daemon.setStateCounter(c)
140
-		if err := daemon.containersReplica.Save(c.Snapshot()); err != nil {
141
-			return err
142
-		}
143
-		if err := c.ToDisk(); err != nil {
140
+		if err := c.CheckpointAndSaveToDisk(daemon.containersReplica); err != nil {
144 141
 			return err
145 142
 		}
146 143
 		daemon.updateHealthMonitor(c)
... ...
@@ -149,10 +139,7 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
149 149
 		// Container is already locked in this case
150 150
 		c.Paused = false
151 151
 		daemon.setStateCounter(c)
152
-		if err := daemon.containersReplica.Save(c.Snapshot()); err != nil {
153
-			return err
154
-		}
155
-		if err := c.ToDisk(); err != nil {
152
+		if err := c.CheckpointAndSaveToDisk(daemon.containersReplica); err != nil {
156 153
 			return err
157 154
 		}
158 155
 		daemon.updateHealthMonitor(c)
... ...
@@ -82,10 +82,7 @@ func (daemon *Daemon) ContainerRename(oldName, newName string) error {
82 82
 		daemon.nameIndex.Release(oldName + k)
83 83
 	}
84 84
 	daemon.releaseName(oldName)
85
-	if err = daemon.containersReplica.Save(container.Snapshot()); err != nil {
86
-		return err
87
-	}
88
-	if err = container.ToDisk(); err != nil {
85
+	if err = container.CheckpointAndSaveToDisk(daemon.containersReplica); err != nil {
89 86
 		return err
90 87
 	}
91 88
 
... ...
@@ -102,10 +99,7 @@ func (daemon *Daemon) ContainerRename(oldName, newName string) error {
102 102
 		if err != nil {
103 103
 			container.Name = oldName
104 104
 			container.NetworkSettings.IsAnonymousEndpoint = oldIsAnonymousEndpoint
105
-			if e := daemon.containersReplica.Save(container.Snapshot()); err != nil {
106
-				logrus.Errorf("%s: Failed in replicating state on rename failure: %v", container.ID, e)
107
-			}
108
-			if e := container.ToDisk(); e != nil {
105
+			if e := container.CheckpointAndSaveToDisk(daemon.containersReplica); e != nil {
109 106
 				logrus.Errorf("%s: Failed in writing to Disk on rename failure: %v", container.ID, e)
110 107
 			}
111 108
 		}
... ...
@@ -117,11 +117,8 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint
117 117
 			if container.ExitCode() == 0 {
118 118
 				container.SetExitCode(128)
119 119
 			}
120
-			if err := daemon.containersReplica.Save(container.Snapshot()); err != nil {
121
-				logrus.Errorf("%s: failed replicating state on start failure: %v", container.ID, err)
122
-			}
123
-			if err := container.ToDisk(); err != nil {
124
-				logrus.Errorf("%s: failed writing to disk on start failure: %v", container.ID, err)
120
+			if err := container.CheckpointAndSaveToDisk(daemon.containersReplica); err != nil {
121
+				logrus.Errorf("%s: failed saving state on start failure: %v", container.ID, err)
125 122
 			}
126 123
 			container.Reset(false)
127 124
 
... ...
@@ -38,8 +38,7 @@ func (daemon *Daemon) update(name string, hostConfig *container.HostConfig) erro
38 38
 		if restoreConfig {
39 39
 			container.Lock()
40 40
 			container.HostConfig = &backupHostConfig
41
-			daemon.containersReplica.Save(container.Snapshot())
42
-			container.ToDisk()
41
+			container.CheckpointAndSaveToDisk(daemon.containersReplica)
43 42
 			container.Unlock()
44 43
 		}
45 44
 	}()
... ...
@@ -54,7 +53,7 @@ func (daemon *Daemon) update(name string, hostConfig *container.HostConfig) erro
54 54
 		container.Unlock()
55 55
 		return errCannotUpdate(container.ID, err)
56 56
 	}
57
-	if err := daemon.containersReplica.Save(container.Snapshot()); err != nil {
57
+	if err := container.CheckpointTo(daemon.containersReplica); err != nil {
58 58
 		restoreConfig = true
59 59
 		container.Unlock()
60 60
 		return errCannotUpdate(container.ID, err)