Browse code

- Runtime: Forbid parralel push/pull for a single image/repo. Fixes #311

Guillaume J. Charmes authored on 2013/06/18 08:10:00
Showing 2 changed files
... ...
@@ -65,7 +65,11 @@ func init() {
65 65
 
66 66
 	// Create the "Server"
67 67
 	srv := &Server{
68
-		runtime: runtime,
68
+		runtime:     runtime,
69
+		enableCors:  false,
70
+		lock:        &sync.Mutex{},
71
+		pullingPool: make(map[string]struct{}),
72
+		pushingPool: make(map[string]struct{}),
69 73
 	}
70 74
 	// Retrieve the Image
71 75
 	if err := srv.ImagePull(unitTestImageName, "", "", os.Stdout, utils.NewStreamFormatter(false), nil); err != nil {
... ...
@@ -15,6 +15,7 @@ import (
15 15
 	"path"
16 16
 	"runtime"
17 17
 	"strings"
18
+	"sync"
18 19
 )
19 20
 
20 21
 func (srv *Server) DockerVersion() APIVersion {
... ...
@@ -401,7 +402,47 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, local, re
401 401
 	return nil
402 402
 }
403 403
 
404
+func (srv *Server) poolAdd(kind, key string) error {
405
+	srv.lock.Lock()
406
+	defer srv.lock.Unlock()
407
+
408
+	if _, exists := srv.pullingPool[key]; exists {
409
+		return fmt.Errorf("%s %s is already in progress", key, kind)
410
+	}
411
+
412
+	switch kind {
413
+	case "pull":
414
+		srv.pullingPool[key] = struct{}{}
415
+		break
416
+	case "push":
417
+		srv.pushingPool[key] = struct{}{}
418
+		break
419
+	default:
420
+		return fmt.Errorf("Unkown pool type")
421
+	}
422
+	return nil
423
+}
424
+
425
+func (srv *Server) poolRemove(kind, key string) error {
426
+	switch kind {
427
+	case "pull":
428
+		delete(srv.pullingPool, key)
429
+		break
430
+	case "push":
431
+		delete(srv.pushingPool, key)
432
+		break
433
+	default:
434
+		return fmt.Errorf("Unkown pool type")
435
+	}
436
+	return nil
437
+}
438
+
404 439
 func (srv *Server) ImagePull(name, tag, endpoint string, out io.Writer, sf *utils.StreamFormatter, authConfig *auth.AuthConfig) error {
440
+	if err := srv.poolAdd("pull", name+":"+tag); err != nil {
441
+		return err
442
+	}
443
+	defer srv.poolRemove("pull", name+":"+tag)
444
+
405 445
 	r := registry.NewRegistry(srv.runtime.root, authConfig)
406 446
 	out = utils.NewWriteFlusher(out)
407 447
 	if endpoint != "" {
... ...
@@ -418,7 +459,6 @@ func (srv *Server) ImagePull(name, tag, endpoint string, out io.Writer, sf *util
418 418
 	if err := srv.pullRepository(r, out, name, remote, tag, sf); err != nil {
419 419
 		return err
420 420
 	}
421
-
422 421
 	return nil
423 422
 }
424 423
 
... ...
@@ -593,7 +633,13 @@ func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgId,
593 593
 	return nil
594 594
 }
595 595
 
596
+// FIXME: Allow to interupt current push when new push of same image is done.
596 597
 func (srv *Server) ImagePush(name, endpoint string, out io.Writer, sf *utils.StreamFormatter, authConfig *auth.AuthConfig) error {
598
+	if err := srv.poolAdd("push", name); err != nil {
599
+		return err
600
+	}
601
+	defer srv.poolRemove("push", name)
602
+
597 603
 	out = utils.NewWriteFlusher(out)
598 604
 	img, err := srv.runtime.graph.Get(name)
599 605
 	r := registry.NewRegistry(srv.runtime.root, authConfig)
... ...
@@ -991,14 +1037,20 @@ func NewServer(autoRestart, enableCors bool, dns ListOpts) (*Server, error) {
991 991
 		return nil, err
992 992
 	}
993 993
 	srv := &Server{
994
-		runtime:    runtime,
995
-		enableCors: enableCors,
994
+		runtime:     runtime,
995
+		enableCors:  enableCors,
996
+		lock:        &sync.Mutex{},
997
+		pullingPool: make(map[string]struct{}),
998
+		pushingPool: make(map[string]struct{}),
996 999
 	}
997 1000
 	runtime.srv = srv
998 1001
 	return srv, nil
999 1002
 }
1000 1003
 
1001 1004
 type Server struct {
1002
-	runtime    *Runtime
1003
-	enableCors bool
1005
+	runtime     *Runtime
1006
+	enableCors  bool
1007
+	lock        *sync.Mutex
1008
+	pullingPool map[string]struct{}
1009
+	pushingPool map[string]struct{}
1004 1010
 }