Browse code

Update layer store to sync transaction files before committing

Fixes case where shutdown occurs before content is synced to disked
on layer creation. This case can leave the layer store in an bad
state and require manual recovery. This change ensures all files
are synced to disk before a layer is committed. Any shutdown that
occurs will only cause the layer to not show up but will allow it to
be repulled or recreated without error.

Added generic io logic to ioutils package to abstract it out of
the layer store package.


Signed-off-by: Derek McGowan <derek@mcgstyle.net>

Derek McGowan authored on 2016/08/10 03:55:17
Showing 3 changed files
... ...
@@ -34,7 +34,7 @@ type fileMetadataStore struct {
34 34
 
35 35
 type fileMetadataTransaction struct {
36 36
 	store *fileMetadataStore
37
-	root  string
37
+	ws    *ioutils.AtomicWriteSet
38 38
 }
39 39
 
40 40
 // NewFSMetadataStore returns an instance of a metadata store
... ...
@@ -71,33 +71,32 @@ func (fms *fileMetadataStore) StartTransaction() (MetadataTransaction, error) {
71 71
 	if err := os.MkdirAll(tmpDir, 0755); err != nil {
72 72
 		return nil, err
73 73
 	}
74
-
75
-	td, err := ioutil.TempDir(tmpDir, "layer-")
74
+	ws, err := ioutils.NewAtomicWriteSet(tmpDir)
76 75
 	if err != nil {
77 76
 		return nil, err
78 77
 	}
79
-	// Create a new tempdir
78
+
80 79
 	return &fileMetadataTransaction{
81 80
 		store: fms,
82
-		root:  td,
81
+		ws:    ws,
83 82
 	}, nil
84 83
 }
85 84
 
86 85
 func (fm *fileMetadataTransaction) SetSize(size int64) error {
87 86
 	content := fmt.Sprintf("%d", size)
88
-	return ioutil.WriteFile(filepath.Join(fm.root, "size"), []byte(content), 0644)
87
+	return fm.ws.WriteFile("size", []byte(content), 0644)
89 88
 }
90 89
 
91 90
 func (fm *fileMetadataTransaction) SetParent(parent ChainID) error {
92
-	return ioutil.WriteFile(filepath.Join(fm.root, "parent"), []byte(digest.Digest(parent).String()), 0644)
91
+	return fm.ws.WriteFile("parent", []byte(digest.Digest(parent).String()), 0644)
93 92
 }
94 93
 
95 94
 func (fm *fileMetadataTransaction) SetDiffID(diff DiffID) error {
96
-	return ioutil.WriteFile(filepath.Join(fm.root, "diff"), []byte(digest.Digest(diff).String()), 0644)
95
+	return fm.ws.WriteFile("diff", []byte(digest.Digest(diff).String()), 0644)
97 96
 }
98 97
 
99 98
 func (fm *fileMetadataTransaction) SetCacheID(cacheID string) error {
100
-	return ioutil.WriteFile(filepath.Join(fm.root, "cache-id"), []byte(cacheID), 0644)
99
+	return fm.ws.WriteFile("cache-id", []byte(cacheID), 0644)
101 100
 }
102 101
 
103 102
 func (fm *fileMetadataTransaction) SetDescriptor(ref distribution.Descriptor) error {
... ...
@@ -105,11 +104,11 @@ func (fm *fileMetadataTransaction) SetDescriptor(ref distribution.Descriptor) er
105 105
 	if err != nil {
106 106
 		return err
107 107
 	}
108
-	return ioutil.WriteFile(filepath.Join(fm.root, "descriptor.json"), jsonRef, 0644)
108
+	return fm.ws.WriteFile("descriptor.json", jsonRef, 0644)
109 109
 }
110 110
 
111 111
 func (fm *fileMetadataTransaction) TarSplitWriter(compressInput bool) (io.WriteCloser, error) {
112
-	f, err := os.OpenFile(filepath.Join(fm.root, "tar-split.json.gz"), os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
112
+	f, err := fm.ws.FileWriter("tar-split.json.gz", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
113 113
 	if err != nil {
114 114
 		return nil, err
115 115
 	}
... ...
@@ -131,15 +130,16 @@ func (fm *fileMetadataTransaction) Commit(layer ChainID) error {
131 131
 	if err := os.MkdirAll(filepath.Dir(finalDir), 0755); err != nil {
132 132
 		return err
133 133
 	}
134
-	return os.Rename(fm.root, finalDir)
134
+
135
+	return fm.ws.Commit(finalDir)
135 136
 }
136 137
 
137 138
 func (fm *fileMetadataTransaction) Cancel() error {
138
-	return os.RemoveAll(fm.root)
139
+	return fm.ws.Cancel()
139 140
 }
140 141
 
141 142
 func (fm *fileMetadataTransaction) String() string {
142
-	return fm.root
143
+	return fm.ws.String()
143 144
 }
144 145
 
145 146
 func (fms *fileMetadataStore) GetSize(layer ChainID) (int64, error) {
... ...
@@ -80,3 +80,83 @@ func (w *atomicFileWriter) Close() (retErr error) {
80 80
 	}
81 81
 	return nil
82 82
 }
83
+
84
+// AtomicWriteSet is used to atomically write a set
85
+// of files and ensure they are visible at the same time.
86
+// Must be committed to a new directory.
87
+type AtomicWriteSet struct {
88
+	root string
89
+}
90
+
91
+// NewAtomicWriteSet creates a new atomic write set to
92
+// atomically create a set of files. The given directory
93
+// is used as the base directory for storing files before
94
+// commit. If no temporary directory is given the system
95
+// default is used.
96
+func NewAtomicWriteSet(tmpDir string) (*AtomicWriteSet, error) {
97
+	td, err := ioutil.TempDir(tmpDir, "write-set-")
98
+	if err != nil {
99
+		return nil, err
100
+	}
101
+
102
+	return &AtomicWriteSet{
103
+		root: td,
104
+	}, nil
105
+}
106
+
107
+// WriteFile writes a file to the set, guaranteeing the file
108
+// has been synced.
109
+func (ws *AtomicWriteSet) WriteFile(filename string, data []byte, perm os.FileMode) error {
110
+	f, err := ws.FileWriter(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
111
+	if err != nil {
112
+		return err
113
+	}
114
+	n, err := f.Write(data)
115
+	if err == nil && n < len(data) {
116
+		err = io.ErrShortWrite
117
+	}
118
+	if err1 := f.Close(); err == nil {
119
+		err = err1
120
+	}
121
+	return err
122
+}
123
+
124
+type syncFileCloser struct {
125
+	*os.File
126
+}
127
+
128
+func (w syncFileCloser) Close() error {
129
+	err := w.File.Sync()
130
+	if err1 := w.File.Close(); err == nil {
131
+		err = err1
132
+	}
133
+	return err
134
+}
135
+
136
+// FileWriter opens a file writer inside the set. The file
137
+// should be synced and closed before calling commit.
138
+func (ws *AtomicWriteSet) FileWriter(name string, flag int, perm os.FileMode) (io.WriteCloser, error) {
139
+	f, err := os.OpenFile(filepath.Join(ws.root, name), flag, perm)
140
+	if err != nil {
141
+		return nil, err
142
+	}
143
+	return syncFileCloser{f}, nil
144
+}
145
+
146
+// Cancel cancels the set and removes all temporary data
147
+// created in the set.
148
+func (ws *AtomicWriteSet) Cancel() error {
149
+	return os.RemoveAll(ws.root)
150
+}
151
+
152
+// Commit moves all created files to the target directory. The
153
+// target directory must not exist and the parent of the target
154
+// directory must exist.
155
+func (ws *AtomicWriteSet) Commit(target string) error {
156
+	return os.Rename(ws.root, target)
157
+}
158
+
159
+// String returns the location the set is writing to.
160
+func (ws *AtomicWriteSet) String() string {
161
+	return ws.root
162
+}
... ...
@@ -5,9 +5,21 @@ import (
5 5
 	"io/ioutil"
6 6
 	"os"
7 7
 	"path/filepath"
8
+	"runtime"
8 9
 	"testing"
9 10
 )
10 11
 
12
+var (
13
+	testMode os.FileMode = 0640
14
+)
15
+
16
+func init() {
17
+	// Windows does not support full Linux file mode
18
+	if runtime.GOOS == "windows" {
19
+		testMode = 0666
20
+	}
21
+}
22
+
11 23
 func TestAtomicWriteToFile(t *testing.T) {
12 24
 	tmpDir, err := ioutil.TempDir("", "atomic-writers-test")
13 25
 	if err != nil {
... ...
@@ -16,7 +28,7 @@ func TestAtomicWriteToFile(t *testing.T) {
16 16
 	defer os.RemoveAll(tmpDir)
17 17
 
18 18
 	expected := []byte("barbaz")
19
-	if err := AtomicWriteFile(filepath.Join(tmpDir, "foo"), expected, 0666); err != nil {
19
+	if err := AtomicWriteFile(filepath.Join(tmpDir, "foo"), expected, testMode); err != nil {
20 20
 		t.Fatalf("Error writing to file: %v", err)
21 21
 	}
22 22
 
... ...
@@ -33,7 +45,88 @@ func TestAtomicWriteToFile(t *testing.T) {
33 33
 	if err != nil {
34 34
 		t.Fatalf("Error statting file: %v", err)
35 35
 	}
36
-	if expected := os.FileMode(0666); st.Mode() != expected {
36
+	if expected := os.FileMode(testMode); st.Mode() != expected {
37
+		t.Fatalf("Mode mismatched, expected %o, got %o", expected, st.Mode())
38
+	}
39
+}
40
+
41
+func TestAtomicWriteSetCommit(t *testing.T) {
42
+	tmpDir, err := ioutil.TempDir("", "atomic-writerset-test")
43
+	if err != nil {
44
+		t.Fatalf("Error when creating temporary directory: %s", err)
45
+	}
46
+	defer os.RemoveAll(tmpDir)
47
+
48
+	if err := os.Mkdir(filepath.Join(tmpDir, "tmp"), 0700); err != nil {
49
+		t.Fatalf("Error creating tmp directory: %s", err)
50
+	}
51
+
52
+	targetDir := filepath.Join(tmpDir, "target")
53
+	ws, err := NewAtomicWriteSet(filepath.Join(tmpDir, "tmp"))
54
+	if err != nil {
55
+		t.Fatalf("Error creating atomic write set: %s", err)
56
+	}
57
+
58
+	expected := []byte("barbaz")
59
+	if err := ws.WriteFile("foo", expected, testMode); err != nil {
60
+		t.Fatalf("Error writing to file: %v", err)
61
+	}
62
+
63
+	if _, err := ioutil.ReadFile(filepath.Join(targetDir, "foo")); err == nil {
64
+		t.Fatalf("Expected error reading file where should not exist")
65
+	}
66
+
67
+	if err := ws.Commit(targetDir); err != nil {
68
+		t.Fatalf("Error committing file: %s", err)
69
+	}
70
+
71
+	actual, err := ioutil.ReadFile(filepath.Join(targetDir, "foo"))
72
+	if err != nil {
73
+		t.Fatalf("Error reading from file: %v", err)
74
+	}
75
+
76
+	if bytes.Compare(actual, expected) != 0 {
77
+		t.Fatalf("Data mismatch, expected %q, got %q", expected, actual)
78
+	}
79
+
80
+	st, err := os.Stat(filepath.Join(targetDir, "foo"))
81
+	if err != nil {
82
+		t.Fatalf("Error statting file: %v", err)
83
+	}
84
+	if expected := os.FileMode(testMode); st.Mode() != expected {
37 85
 		t.Fatalf("Mode mismatched, expected %o, got %o", expected, st.Mode())
38 86
 	}
87
+
88
+}
89
+
90
+func TestAtomicWriteSetCancel(t *testing.T) {
91
+	tmpDir, err := ioutil.TempDir("", "atomic-writerset-test")
92
+	if err != nil {
93
+		t.Fatalf("Error when creating temporary directory: %s", err)
94
+	}
95
+	defer os.RemoveAll(tmpDir)
96
+
97
+	if err := os.Mkdir(filepath.Join(tmpDir, "tmp"), 0700); err != nil {
98
+		t.Fatalf("Error creating tmp directory: %s", err)
99
+	}
100
+
101
+	ws, err := NewAtomicWriteSet(filepath.Join(tmpDir, "tmp"))
102
+	if err != nil {
103
+		t.Fatalf("Error creating atomic write set: %s", err)
104
+	}
105
+
106
+	expected := []byte("barbaz")
107
+	if err := ws.WriteFile("foo", expected, testMode); err != nil {
108
+		t.Fatalf("Error writing to file: %v", err)
109
+	}
110
+
111
+	if err := ws.Cancel(); err != nil {
112
+		t.Fatalf("Error committing file: %s", err)
113
+	}
114
+
115
+	if _, err := ioutil.ReadFile(filepath.Join(tmpDir, "target", "foo")); err == nil {
116
+		t.Fatalf("Expected error reading file where should not exist")
117
+	} else if !os.IsNotExist(err) {
118
+		t.Fatalf("Unexpected error reading file: %s", err)
119
+	}
39 120
 }