Browse code

Engine: integer job status, improved stream API

* Jobs return an integer status instead of a string
* Status convention mimics unix process execution: 0=success, 1=generic error, 127="no such command"
* Stdout and Stderr support multiple thread-safe data receivers and ring buffer filtering

Solomon Hykes authored on 2013/11/20 16:37:03
Showing 12 changed files
... ...
@@ -1,6 +1,8 @@
1 1
 package docker
2 2
 
3 3
 import (
4
+	"bufio"
5
+	"bytes"
4 6
 	"code.google.com/p/go.net/websocket"
5 7
 	"encoding/base64"
6 8
 	"encoding/json"
... ...
@@ -565,12 +567,18 @@ func postContainersCreate(srv *Server, version float64, w http.ResponseWriter, r
565 565
 		job.SetenvList("Dns", defaultDns)
566 566
 	}
567 567
 	// Read container ID from the first line of stdout
568
-	job.StdoutParseString(&out.ID)
568
+	job.Stdout.AddString(&out.ID)
569 569
 	// Read warnings from stderr
570
-	job.StderrParseLines(&out.Warnings, 0)
570
+	warnings := &bytes.Buffer{}
571
+	job.Stderr.Add(warnings)
571 572
 	if err := job.Run(); err != nil {
572 573
 		return err
573 574
 	}
575
+	// Parse warnings from stderr
576
+	scanner := bufio.NewScanner(warnings)
577
+	for scanner.Scan() {
578
+		out.Warnings = append(out.Warnings, scanner.Text())
579
+	}
574 580
 	if job.GetenvInt("Memory") > 0 && !srv.runtime.capabilities.MemoryLimit {
575 581
 		log.Println("WARNING: Your kernel does not support memory limit capabilities. Limitation discarded.")
576 582
 		out.Warnings = append(out.Warnings, "Your kernel does not support memory limit capabilities. Limitation discarded.")
... ...
@@ -9,7 +9,7 @@ import (
9 9
 	"strings"
10 10
 )
11 11
 
12
-type Handler func(*Job) string
12
+type Handler func(*Job) Status
13 13
 
14 14
 var globalHandlers map[string]Handler
15 15
 
... ...
@@ -99,10 +99,12 @@ func (eng *Engine) Job(name string, args ...string) *Job {
99 99
 		Eng:    eng,
100 100
 		Name:   name,
101 101
 		Args:   args,
102
-		Stdin:  os.Stdin,
103
-		Stdout: os.Stdout,
104
-		Stderr: os.Stderr,
102
+		Stdin:  NewInput(),
103
+		Stdout: NewOutput(),
104
+		Stderr: NewOutput(),
105 105
 	}
106
+	job.Stdout.Add(utils.NopWriteCloser(os.Stdout))
107
+	job.Stderr.Add(utils.NopWriteCloser(os.Stderr))
106 108
 	handler, exists := eng.handlers[name]
107 109
 	if exists {
108 110
 		job.handler = handler
... ...
@@ -38,8 +38,9 @@ func TestJob(t *testing.T) {
38 38
 		t.Fatalf("job1.handler should be empty")
39 39
 	}
40 40
 
41
-	h := func(j *Job) string {
42
-		return j.Name
41
+	h := func(j *Job) Status {
42
+		j.Printf("%s\n", j.Name)
43
+		return 42
43 44
 	}
44 45
 
45 46
 	eng.Register("dummy2", h)
... ...
@@ -49,7 +50,7 @@ func TestJob(t *testing.T) {
49 49
 		t.Fatalf("job2.handler shouldn't be nil")
50 50
 	}
51 51
 
52
-	if job2.handler(job2) != job2.Name {
52
+	if job2.handler(job2) != 42 {
53 53
 		t.Fatalf("handler dummy2 was not found in job2")
54 54
 	}
55 55
 }
... ...
@@ -1,32 +1,18 @@
1 1
 package engine
2 2
 
3 3
 import (
4
-	"fmt"
5 4
 	"github.com/dotcloud/docker/utils"
6
-	"io/ioutil"
7
-	"runtime"
8
-	"strings"
9 5
 	"testing"
10 6
 )
11 7
 
12 8
 var globalTestID string
13 9
 
14 10
 func newTestEngine(t *testing.T) *Engine {
15
-	// Use the caller function name as a prefix.
16
-	// This helps trace temp directories back to their test.
17
-	pc, _, _, _ := runtime.Caller(1)
18
-	callerLongName := runtime.FuncForPC(pc).Name()
19
-	parts := strings.Split(callerLongName, ".")
20
-	callerShortName := parts[len(parts)-1]
21
-	if globalTestID == "" {
22
-		globalTestID = utils.RandomString()[:4]
23
-	}
24
-	prefix := fmt.Sprintf("docker-test%s-%s-", globalTestID, callerShortName)
25
-	root, err := ioutil.TempDir("", prefix)
11
+	tmp, err := utils.TestDirectory("")
26 12
 	if err != nil {
27 13
 		t.Fatal(err)
28 14
 	}
29
-	eng, err := New(root)
15
+	eng, err := New(tmp)
30 16
 	if err != nil {
31 17
 		t.Fatal(err)
32 18
 	}
... ...
@@ -1,16 +1,13 @@
1 1
 package engine
2 2
 
3 3
 import (
4
-	"bufio"
5 4
 	"bytes"
6 5
 	"encoding/json"
7 6
 	"fmt"
8 7
 	"io"
9
-	"io/ioutil"
10
-	"os"
11 8
 	"strconv"
12 9
 	"strings"
13
-	"sync"
10
+	"time"
14 11
 )
15 12
 
16 13
 // A job is the fundamental unit of work in the docker engine.
... ...
@@ -31,126 +28,75 @@ type Job struct {
31 31
 	Name    string
32 32
 	Args    []string
33 33
 	env     []string
34
-	Stdin   io.Reader
35
-	Stdout  io.Writer
36
-	Stderr  io.Writer
37
-	handler func(*Job) string
38
-	status  string
34
+	Stdout  *Output
35
+	Stderr  *Output
36
+	Stdin   *Input
37
+	handler Handler
38
+	status  Status
39
+	end     time.Time
39 40
 	onExit  []func()
40 41
 }
41 42
 
43
+type Status int
44
+
45
+const (
46
+	StatusOK       Status = 0
47
+	StatusErr      Status = 1
48
+	StatusNotFound Status = 127
49
+)
50
+
42 51
 // Run executes the job and blocks until the job completes.
43 52
 // If the job returns a failure status, an error is returned
44 53
 // which includes the status.
45 54
 func (job *Job) Run() error {
46
-	defer func() {
47
-		var wg sync.WaitGroup
48
-		for _, f := range job.onExit {
49
-			wg.Add(1)
50
-			go func(f func()) {
51
-				f()
52
-				wg.Done()
53
-			}(f)
54
-		}
55
-		wg.Wait()
56
-	}()
57
-	if job.Stdout != nil && job.Stdout != os.Stdout {
58
-		job.Stdout = io.MultiWriter(job.Stdout, os.Stdout)
59
-	}
60
-	if job.Stderr != nil && job.Stderr != os.Stderr {
61
-		job.Stderr = io.MultiWriter(job.Stderr, os.Stderr)
55
+	// FIXME: make this thread-safe
56
+	// FIXME: implement wait
57
+	if !job.end.IsZero() {
58
+		return fmt.Errorf("%s: job has already completed", job.Name)
62 59
 	}
60
+	// Log beginning and end of the job
63 61
 	job.Eng.Logf("+job %s", job.CallString())
64 62
 	defer func() {
65 63
 		job.Eng.Logf("-job %s%s", job.CallString(), job.StatusString())
66 64
 	}()
65
+	var errorMessage string
66
+	job.Stderr.AddString(&errorMessage)
67 67
 	if job.handler == nil {
68
-		job.status = "command not found"
68
+		job.Errorf("%s: command not found", job.Name)
69
+		job.status = 127
69 70
 	} else {
70 71
 		job.status = job.handler(job)
72
+		job.end = time.Now()
71 73
 	}
72
-	if job.status != "0" {
73
-		return fmt.Errorf("%s: %s", job.Name, job.status)
74
+	// Wait for all background tasks to complete
75
+	if err := job.Stdout.Close(); err != nil {
76
+		return err
77
+	}
78
+	if err := job.Stderr.Close(); err != nil {
79
+		return err
80
+	}
81
+	if job.status != 0 {
82
+		return fmt.Errorf("%s: %s", job.Name, errorMessage)
74 83
 	}
75 84
 	return nil
76 85
 }
77 86
 
78
-func (job *Job) StdoutParseLines(dst *[]string, limit int) {
79
-	job.parseLines(job.StdoutPipe(), dst, limit)
80
-}
81
-
82
-func (job *Job) StderrParseLines(dst *[]string, limit int) {
83
-	job.parseLines(job.StderrPipe(), dst, limit)
84
-}
85
-
86
-func (job *Job) parseLines(src io.Reader, dst *[]string, limit int) {
87
-	var wg sync.WaitGroup
88
-	wg.Add(1)
89
-	go func() {
90
-		defer wg.Done()
91
-		scanner := bufio.NewScanner(src)
92
-		for scanner.Scan() {
93
-			// If the limit is reached, flush the rest of the source and return
94
-			if limit > 0 && len(*dst) >= limit {
95
-				io.Copy(ioutil.Discard, src)
96
-				return
97
-			}
98
-			line := scanner.Text()
99
-			// Append the line (with delimitor removed)
100
-			*dst = append(*dst, line)
101
-		}
102
-	}()
103
-	job.onExit = append(job.onExit, wg.Wait)
104
-}
105
-
106
-func (job *Job) StdoutParseString(dst *string) {
107
-	lines := make([]string, 0, 1)
108
-	job.StdoutParseLines(&lines, 1)
109
-	job.onExit = append(job.onExit, func() {
110
-		if len(lines) >= 1 {
111
-			*dst = lines[0]
112
-		}
113
-	})
114
-}
115
-
116
-func (job *Job) StderrParseString(dst *string) {
117
-	lines := make([]string, 0, 1)
118
-	job.StderrParseLines(&lines, 1)
119
-	job.onExit = append(job.onExit, func() { *dst = lines[0] })
120
-}
121
-
122
-func (job *Job) StdoutPipe() io.ReadCloser {
123
-	r, w := io.Pipe()
124
-	job.Stdout = w
125
-	job.onExit = append(job.onExit, func() { w.Close() })
126
-	return r
127
-}
128
-
129
-func (job *Job) StderrPipe() io.ReadCloser {
130
-	r, w := io.Pipe()
131
-	job.Stderr = w
132
-	job.onExit = append(job.onExit, func() { w.Close() })
133
-	return r
134
-}
135
-
136 87
 func (job *Job) CallString() string {
137 88
 	return fmt.Sprintf("%s(%s)", job.Name, strings.Join(job.Args, ", "))
138 89
 }
139 90
 
140 91
 func (job *Job) StatusString() string {
141
-	// FIXME: if a job returns the empty string, it will be printed
142
-	// as not having returned.
143
-	// (this only affects String which is a convenience function).
144
-	if job.status != "" {
145
-		var okerr string
146
-		if job.status == "0" {
147
-			okerr = "OK"
148
-		} else {
149
-			okerr = "ERR"
150
-		}
151
-		return fmt.Sprintf(" = %s (%s)", okerr, job.status)
92
+	// If the job hasn't completed, status string is empty
93
+	if job.end.IsZero() {
94
+		return ""
95
+	}
96
+	var okerr string
97
+	if job.status == StatusOK {
98
+		okerr = "OK"
99
+	} else {
100
+		okerr = "ERR"
152 101
 	}
153
-	return ""
102
+	return fmt.Sprintf(" = %s (%d)", okerr, job.status)
154 103
 }
155 104
 
156 105
 // String returns a human-readable description of `job`
... ...
@@ -338,5 +284,8 @@ func (job *Job) Printf(format string, args ...interface{}) (n int, err error) {
338 338
 
339 339
 func (job *Job) Errorf(format string, args ...interface{}) (n int, err error) {
340 340
 	return fmt.Fprintf(job.Stderr, format, args...)
341
+}
341 342
 
343
+func (job *Job) Error(err error) (int, error) {
344
+	return fmt.Fprintf(job.Stderr, "%s", err)
342 345
 }
343 346
new file mode 100644
... ...
@@ -0,0 +1,166 @@
0
+package engine
1
+
2
+import (
3
+	"bufio"
4
+	"container/ring"
5
+	"fmt"
6
+	"io"
7
+	"sync"
8
+)
9
+
10
+type Output struct {
11
+	sync.Mutex
12
+	dests []io.Writer
13
+	tasks sync.WaitGroup
14
+}
15
+
16
+// NewOutput returns a new Output object with no destinations attached.
17
+// Writing to an empty Output will cause the written data to be discarded.
18
+func NewOutput() *Output {
19
+	return &Output{}
20
+}
21
+
22
+// Add attaches a new destination to the Output. Any data subsequently written
23
+// to the output will be written to the new destination in addition to all the others.
24
+// This method is thread-safe.
25
+// FIXME: Add cannot fail
26
+func (o *Output) Add(dst io.Writer) error {
27
+	o.Mutex.Lock()
28
+	defer o.Mutex.Unlock()
29
+	o.dests = append(o.dests, dst)
30
+	return nil
31
+}
32
+
33
+// AddPipe creates an in-memory pipe with io.Pipe(), adds its writing end as a destination,
34
+// and returns its reading end for consumption by the caller.
35
+// This is a rough equivalent similar to Cmd.StdoutPipe() in the standard os/exec package.
36
+// This method is thread-safe.
37
+func (o *Output) AddPipe() (io.Reader, error) {
38
+	r, w := io.Pipe()
39
+	o.Add(w)
40
+	return r, nil
41
+}
42
+
43
+// AddTail starts a new goroutine which will read all subsequent data written to the output,
44
+// line by line, and append the last `n` lines to `dst`.
45
+func (o *Output) AddTail(dst *[]string, n int) error {
46
+	src, err := o.AddPipe()
47
+	if err != nil {
48
+		return err
49
+	}
50
+	o.tasks.Add(1)
51
+	go func() {
52
+		defer o.tasks.Done()
53
+		Tail(src, n, dst)
54
+	}()
55
+	return nil
56
+}
57
+
58
+// AddString starts a new goroutine which will read all subsequent data written to the output,
59
+// line by line, and store the last line into `dst`.
60
+func (o *Output) AddString(dst *string) error {
61
+	src, err := o.AddPipe()
62
+	if err != nil {
63
+		return err
64
+	}
65
+	o.tasks.Add(1)
66
+	go func() {
67
+		defer o.tasks.Done()
68
+		lines := make([]string, 0, 1)
69
+		Tail(src, 1, &lines)
70
+		if len(lines) == 0 {
71
+			*dst = ""
72
+		} else {
73
+			*dst = lines[0]
74
+		}
75
+	}()
76
+	return nil
77
+}
78
+
79
+// Write writes the same data to all registered destinations.
80
+// This method is thread-safe.
81
+func (o *Output) Write(p []byte) (n int, err error) {
82
+	o.Mutex.Lock()
83
+	defer o.Mutex.Unlock()
84
+	var firstErr error
85
+	for _, dst := range o.dests {
86
+		_, err := dst.Write(p)
87
+		if err != nil && firstErr == nil {
88
+			firstErr = err
89
+		}
90
+	}
91
+	return len(p), err
92
+}
93
+
94
+// Close unregisters all destinations and waits for all background
95
+// AddTail and AddString tasks to complete.
96
+// The Close method of each destination is called if it exists.
97
+func (o *Output) Close() error {
98
+	o.Mutex.Lock()
99
+	defer o.Mutex.Unlock()
100
+	var firstErr error
101
+	for _, dst := range o.dests {
102
+		if closer, ok := dst.(io.WriteCloser); ok {
103
+			err := closer.Close()
104
+			if err != nil && firstErr == nil {
105
+				firstErr = err
106
+			}
107
+		}
108
+	}
109
+	o.tasks.Wait()
110
+	return firstErr
111
+}
112
+
113
+type Input struct {
114
+	src io.Reader
115
+	sync.Mutex
116
+}
117
+
118
+// NewInput returns a new Input object with no source attached.
119
+// Reading to an empty Input will return io.EOF.
120
+func NewInput() *Input {
121
+	return &Input{}
122
+}
123
+
124
+// Read reads from the input in a thread-safe way.
125
+func (i *Input) Read(p []byte) (n int, err error) {
126
+	i.Mutex.Lock()
127
+	defer i.Mutex.Unlock()
128
+	if i.src == nil {
129
+		return 0, io.EOF
130
+	}
131
+	return i.src.Read(p)
132
+}
133
+
134
+// Add attaches a new source to the input.
135
+// Add can only be called once per input. Subsequent calls will
136
+// return an error.
137
+func (i *Input) Add(src io.Reader) error {
138
+	i.Mutex.Lock()
139
+	defer i.Mutex.Unlock()
140
+	if i.src != nil {
141
+		return fmt.Errorf("Maximum number of sources reached: 1")
142
+	}
143
+	i.src = src
144
+	return nil
145
+}
146
+
147
+// Tail reads from `src` line per line, and returns the last `n` lines as an array.
148
+// A ring buffer is used to only store `n` lines at any time.
149
+func Tail(src io.Reader, n int, dst *[]string) {
150
+	scanner := bufio.NewScanner(src)
151
+	r := ring.New(n)
152
+	for scanner.Scan() {
153
+		if n == 0 {
154
+			continue
155
+		}
156
+		r.Value = scanner.Text()
157
+		r = r.Next()
158
+	}
159
+	r.Do(func(v interface{}) {
160
+		if v == nil {
161
+			return
162
+		}
163
+		*dst = append(*dst, v.(string))
164
+	})
165
+}
... ...
@@ -304,6 +304,10 @@ func TestGetContainersJSON(t *testing.T) {
304 304
 		Cmd:   []string{"echo", "test"},
305 305
 	}, t)
306 306
 
307
+	if containerID == "" {
308
+		t.Fatalf("Received empty container ID")
309
+	}
310
+
307 311
 	req, err := http.NewRequest("GET", "/containers/json?all=1", nil)
308 312
 	if err != nil {
309 313
 		t.Fatal(err)
... ...
@@ -499,7 +499,7 @@ func TestCreateVolume(t *testing.T) {
499 499
 		t.Fatal(err)
500 500
 	}
501 501
 	var id string
502
-	jobCreate.StdoutParseString(&id)
502
+	jobCreate.Stdout.AddString(&id)
503 503
 	if err := jobCreate.Run(); err != nil {
504 504
 		t.Fatal(err)
505 505
 	}
... ...
@@ -1502,7 +1502,7 @@ func TestOnlyLoopbackExistsWhenUsingDisableNetworkOption(t *testing.T) {
1502 1502
 		t.Fatal(err)
1503 1503
 	}
1504 1504
 	var id string
1505
-	jobCreate.StdoutParseString(&id)
1505
+	jobCreate.Stdout.AddString(&id)
1506 1506
 	if err := jobCreate.Run(); err != nil {
1507 1507
 		t.Fatal(err)
1508 1508
 	}
... ...
@@ -390,7 +390,7 @@ func startEchoServerContainer(t *testing.T, proto string) (*docker.Runtime, *doc
390 390
 		jobCreate.SetenvList("Cmd", []string{"sh", "-c", cmd})
391 391
 		jobCreate.SetenvList("PortSpecs", []string{fmt.Sprintf("%s/%s", strPort, proto)})
392 392
 		jobCreate.SetenvJson("ExposedPorts", ep)
393
-		jobCreate.StdoutParseString(&id)
393
+		jobCreate.Stdout.AddString(&id)
394 394
 		if err := jobCreate.Run(); err != nil {
395 395
 			t.Fatal(err)
396 396
 		}
... ...
@@ -224,7 +224,7 @@ func TestRunWithTooLowMemoryLimit(t *testing.T) {
224 224
 	job.Setenv("CpuShares", "1000")
225 225
 	job.SetenvList("Cmd", []string{"/bin/cat"})
226 226
 	var id string
227
-	job.StdoutParseString(&id)
227
+	job.Stdout.AddString(&id)
228 228
 	if err := job.Run(); err == nil {
229 229
 		t.Errorf("Memory limit is smaller than the allowed limit. Container creation should've failed!")
230 230
 	}
... ...
@@ -46,7 +46,7 @@ func createNamedTestContainer(eng *engine.Engine, config *docker.Config, f utils
46 46
 	if err := job.ImportEnv(config); err != nil {
47 47
 		f.Fatal(err)
48 48
 	}
49
-	job.StdoutParseString(&shortId)
49
+	job.Stdout.AddString(&shortId)
50 50
 	if err := job.Run(); err != nil {
51 51
 		f.Fatal(err)
52 52
 	}
... ...
@@ -39,15 +39,18 @@ func init() {
39 39
 // jobInitApi runs the remote api server `srv` as a daemon,
40 40
 // Only one api server can run at the same time - this is enforced by a pidfile.
41 41
 // The signals SIGINT and SIGTERM are intercepted for cleanup.
42
-func jobInitApi(job *engine.Job) string {
42
+func jobInitApi(job *engine.Job) engine.Status {
43 43
 	job.Logf("Creating server")
44
+	// FIXME: ImportEnv deprecates ConfigFromJob
44 45
 	srv, err := NewServer(job.Eng, ConfigFromJob(job))
45 46
 	if err != nil {
46
-		return err.Error()
47
+		job.Error(err)
48
+		return engine.StatusErr
47 49
 	}
48 50
 	if srv.runtime.config.Pidfile != "" {
49 51
 		job.Logf("Creating pidfile")
50 52
 		if err := utils.CreatePidFile(srv.runtime.config.Pidfile); err != nil {
53
+			// FIXME: do we need fatal here instead of returning a job error?
51 54
 			log.Fatal(err)
52 55
 		}
53 56
 	}
... ...
@@ -68,18 +71,21 @@ func jobInitApi(job *engine.Job) string {
68 68
 		job.Eng.Hack_SetGlobalVar("httpapi.bridgeIP", srv.runtime.networkManager.bridgeNetwork.IP)
69 69
 	}
70 70
 	if err := job.Eng.Register("create", srv.ContainerCreate); err != nil {
71
-		return err.Error()
71
+		job.Error(err)
72
+		return engine.StatusErr
72 73
 	}
73 74
 	if err := job.Eng.Register("start", srv.ContainerStart); err != nil {
74
-		return err.Error()
75
+		job.Error(err)
76
+		return engine.StatusErr
75 77
 	}
76 78
 	if err := job.Eng.Register("serveapi", srv.ListenAndServe); err != nil {
77
-		return err.Error()
79
+		job.Error(err)
80
+		return engine.StatusErr
78 81
 	}
79
-	return "0"
82
+	return engine.StatusOK
80 83
 }
81 84
 
82
-func (srv *Server) ListenAndServe(job *engine.Job) string {
85
+func (srv *Server) ListenAndServe(job *engine.Job) engine.Status {
83 86
 	protoAddrs := job.Args
84 87
 	chErrors := make(chan error, len(protoAddrs))
85 88
 	for _, protoAddr := range protoAddrs {
... ...
@@ -94,7 +100,8 @@ func (srv *Server) ListenAndServe(job *engine.Job) string {
94 94
 				log.Println("/!\\ DON'T BIND ON ANOTHER IP ADDRESS THAN 127.0.0.1 IF YOU DON'T KNOW WHAT YOU'RE DOING /!\\")
95 95
 			}
96 96
 		default:
97
-			return "Invalid protocol format."
97
+			job.Errorf("Invalid protocol format.")
98
+			return engine.StatusErr
98 99
 		}
99 100
 		go func() {
100 101
 			// FIXME: merge Server.ListenAndServe with ListenAndServe
... ...
@@ -104,10 +111,11 @@ func (srv *Server) ListenAndServe(job *engine.Job) string {
104 104
 	for i := 0; i < len(protoAddrs); i += 1 {
105 105
 		err := <-chErrors
106 106
 		if err != nil {
107
-			return err.Error()
107
+			job.Error(err)
108
+			return engine.StatusErr
108 109
 		}
109 110
 	}
110
-	return "0"
111
+	return engine.StatusOK
111 112
 }
112 113
 
113 114
 func (srv *Server) DockerVersion() APIVersion {
... ...
@@ -1260,19 +1268,22 @@ func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Write
1260 1260
 	return nil
1261 1261
 }
1262 1262
 
1263
-func (srv *Server) ContainerCreate(job *engine.Job) string {
1263
+func (srv *Server) ContainerCreate(job *engine.Job) engine.Status {
1264 1264
 	var name string
1265 1265
 	if len(job.Args) == 1 {
1266 1266
 		name = job.Args[0]
1267 1267
 	} else if len(job.Args) > 1 {
1268
-		return fmt.Sprintf("Usage: %s ", job.Name)
1268
+		job.Printf("Usage: %s", job.Name)
1269
+		return engine.StatusErr
1269 1270
 	}
1270 1271
 	var config Config
1271 1272
 	if err := job.ExportEnv(&config); err != nil {
1272
-		return err.Error()
1273
+		job.Error(err)
1274
+		return engine.StatusErr
1273 1275
 	}
1274 1276
 	if config.Memory != 0 && config.Memory < 524288 {
1275
-		return "Minimum memory limit allowed is 512k"
1277
+		job.Errorf("Minimum memory limit allowed is 512k")
1278
+		return engine.StatusErr
1276 1279
 	}
1277 1280
 	if config.Memory > 0 && !srv.runtime.capabilities.MemoryLimit {
1278 1281
 		config.Memory = 0
... ...
@@ -1287,9 +1298,11 @@ func (srv *Server) ContainerCreate(job *engine.Job) string {
1287 1287
 			if tag == "" {
1288 1288
 				tag = DEFAULTTAG
1289 1289
 			}
1290
-			return fmt.Sprintf("No such image: %s (tag: %s)", config.Image, tag)
1290
+			job.Errorf("No such image: %s (tag: %s)", config.Image, tag)
1291
+			return engine.StatusErr
1291 1292
 		}
1292
-		return err.Error()
1293
+		job.Error(err)
1294
+		return engine.StatusErr
1293 1295
 	}
1294 1296
 	srv.LogEvent("create", container.ID, srv.runtime.repositories.ImageName(container.Image))
1295 1297
 	// FIXME: this is necessary because runtime.Create might return a nil container
... ...
@@ -1301,7 +1314,7 @@ func (srv *Server) ContainerCreate(job *engine.Job) string {
1301 1301
 	for _, warning := range buildWarnings {
1302 1302
 		job.Errorf("%s\n", warning)
1303 1303
 	}
1304
-	return "0"
1304
+	return engine.StatusOK
1305 1305
 }
1306 1306
 
1307 1307
 func (srv *Server) ContainerRestart(name string, t int) error {
... ...
@@ -1619,22 +1632,25 @@ func (srv *Server) RegisterLinks(name string, hostConfig *HostConfig) error {
1619 1619
 	return nil
1620 1620
 }
1621 1621
 
1622
-func (srv *Server) ContainerStart(job *engine.Job) string {
1622
+func (srv *Server) ContainerStart(job *engine.Job) engine.Status {
1623 1623
 	if len(job.Args) < 1 {
1624
-		return fmt.Sprintf("Usage: %s container_id", job.Name)
1624
+		job.Errorf("Usage: %s container_id", job.Name)
1625
+		return engine.StatusErr
1625 1626
 	}
1626 1627
 	name := job.Args[0]
1627 1628
 	runtime := srv.runtime
1628 1629
 	container := runtime.Get(name)
1629 1630
 
1630 1631
 	if container == nil {
1631
-		return fmt.Sprintf("No such container: %s", name)
1632
+		job.Errorf("No such container: %s", name)
1633
+		return engine.StatusErr
1632 1634
 	}
1633 1635
 	// If no environment was set, then no hostconfig was passed.
1634 1636
 	if len(job.Environ()) > 0 {
1635 1637
 		var hostConfig HostConfig
1636 1638
 		if err := job.ExportEnv(&hostConfig); err != nil {
1637
-			return err.Error()
1639
+			job.Error(err)
1640
+			return engine.StatusErr
1638 1641
 		}
1639 1642
 		// Validate the HostConfig binds. Make sure that:
1640 1643
 		// 1) the source of a bind mount isn't /
... ...
@@ -1647,29 +1663,33 @@ func (srv *Server) ContainerStart(job *engine.Job) string {
1647 1647
 
1648 1648
 			// refuse to bind mount "/" to the container
1649 1649
 			if source == "/" {
1650
-				return fmt.Sprintf("Invalid bind mount '%s' : source can't be '/'", bind)
1650
+				job.Errorf("Invalid bind mount '%s' : source can't be '/'", bind)
1651
+				return engine.StatusErr
1651 1652
 			}
1652 1653
 
1653 1654
 			// ensure the source exists on the host
1654 1655
 			_, err := os.Stat(source)
1655 1656
 			if err != nil && os.IsNotExist(err) {
1656
-				return fmt.Sprintf("Invalid bind mount '%s' : source doesn't exist", bind)
1657
+				job.Errorf("Invalid bind mount '%s' : source doesn't exist", bind)
1658
+				return engine.StatusErr
1657 1659
 			}
1658 1660
 		}
1659 1661
 		// Register any links from the host config before starting the container
1660 1662
 		// FIXME: we could just pass the container here, no need to lookup by name again.
1661 1663
 		if err := srv.RegisterLinks(name, &hostConfig); err != nil {
1662
-			return err.Error()
1664
+			job.Error(err)
1665
+			return engine.StatusErr
1663 1666
 		}
1664 1667
 		container.hostConfig = &hostConfig
1665 1668
 		container.ToDisk()
1666 1669
 	}
1667 1670
 	if err := container.Start(); err != nil {
1668
-		return fmt.Sprintf("Cannot start container %s: %s", name, err)
1671
+		job.Errorf("Cannot start container %s: %s", name, err)
1672
+		return engine.StatusErr
1669 1673
 	}
1670 1674
 	srv.LogEvent("start", container.ID, runtime.repositories.ImageName(container.Image))
1671 1675
 
1672
-	return "0"
1676
+	return engine.StatusOK
1673 1677
 }
1674 1678
 
1675 1679
 func (srv *Server) ContainerStop(name string, t int) error {