Browse code

wait on pull already in progress

Victor Vieux authored on 2013/11/21 06:51:05
Showing 2 changed files
... ...
@@ -732,9 +732,9 @@ func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoin
732 732
 		id := history[i]
733 733
 
734 734
 		// ensure no two downloads of the same layer happen at the same time
735
-		if err := srv.poolAdd("pull", "layer:"+id); err != nil {
735
+		if c, err := srv.poolAdd("pull", "layer:"+id); err != nil {
736 736
 			utils.Errorf("Image (id: %s) pull is already running, skipping: %v", id, err)
737
-			return nil
737
+			<-c
738 738
 		}
739 739
 		defer srv.poolRemove("pull", "layer:"+id)
740 740
 
... ...
@@ -829,7 +829,7 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName
829 829
 			}
830 830
 
831 831
 			// ensure no two downloads of the same image happen at the same time
832
-			if err := srv.poolAdd("pull", "img:"+img.ID); err != nil {
832
+			if _, err := srv.poolAdd("pull", "img:"+img.ID); err != nil {
833 833
 				utils.Errorf("Image (id: %s) pull is already running, skipping: %v", img.ID, err)
834 834
 				if parallel {
835 835
 					errors <- nil
... ...
@@ -900,38 +900,41 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName
900 900
 	return nil
901 901
 }
902 902
 
903
-func (srv *Server) poolAdd(kind, key string) error {
903
+func (srv *Server) poolAdd(kind, key string) (chan struct{}, error) {
904 904
 	srv.Lock()
905 905
 	defer srv.Unlock()
906 906
 
907
-	if _, exists := srv.pullingPool[key]; exists {
908
-		return fmt.Errorf("pull %s is already in progress", key)
907
+	if c, exists := srv.pullingPool[key]; exists {
908
+		return c, fmt.Errorf("pull %s is already in progress", key)
909 909
 	}
910
-	if _, exists := srv.pushingPool[key]; exists {
911
-		return fmt.Errorf("push %s is already in progress", key)
910
+	if c, exists := srv.pushingPool[key]; exists {
911
+		return c, fmt.Errorf("push %s is already in progress", key)
912 912
 	}
913 913
 
914
+	c := make(chan struct{})
914 915
 	switch kind {
915 916
 	case "pull":
916
-		srv.pullingPool[key] = struct{}{}
917
-		break
917
+		srv.pullingPool[key] = c
918 918
 	case "push":
919
-		srv.pushingPool[key] = struct{}{}
920
-		break
919
+		srv.pushingPool[key] = c
921 920
 	default:
922
-		return fmt.Errorf("Unknown pool type")
921
+		return nil, fmt.Errorf("Unknown pool type")
923 922
 	}
924
-	return nil
923
+	return c, nil
925 924
 }
926 925
 
927 926
 func (srv *Server) poolRemove(kind, key string) error {
928 927
 	switch kind {
929 928
 	case "pull":
930
-		delete(srv.pullingPool, key)
931
-		break
929
+		if c, exists := srv.pullingPool[key]; exists {
930
+			close(c)
931
+			delete(srv.pullingPool, key)
932
+		}
932 933
 	case "push":
933
-		delete(srv.pushingPool, key)
934
-		break
934
+		if c, exists := srv.pushingPool[key]; exists {
935
+			close(c)
936
+			delete(srv.pushingPool, key)
937
+		}
935 938
 	default:
936 939
 		return fmt.Errorf("Unknown pool type")
937 940
 	}
... ...
@@ -943,7 +946,7 @@ func (srv *Server) ImagePull(localName string, tag string, out io.Writer, sf *ut
943 943
 	if err != nil {
944 944
 		return err
945 945
 	}
946
-	if err := srv.poolAdd("pull", localName+":"+tag); err != nil {
946
+	if _, err := srv.poolAdd("pull", localName+":"+tag); err != nil {
947 947
 		return err
948 948
 	}
949 949
 	defer srv.poolRemove("pull", localName+":"+tag)
... ...
@@ -1138,7 +1141,7 @@ func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID,
1138 1138
 
1139 1139
 // FIXME: Allow to interrupt current push when new push of same image is done.
1140 1140
 func (srv *Server) ImagePush(localName string, out io.Writer, sf *utils.StreamFormatter, authConfig *auth.AuthConfig, metaHeaders map[string][]string) error {
1141
-	if err := srv.poolAdd("push", localName); err != nil {
1141
+	if _, err := srv.poolAdd("push", localName); err != nil {
1142 1142
 		return err
1143 1143
 	}
1144 1144
 	defer srv.poolRemove("push", localName)
... ...
@@ -1769,8 +1772,8 @@ func NewServer(eng *engine.Engine, config *DaemonConfig) (*Server, error) {
1769 1769
 	srv := &Server{
1770 1770
 		Eng:         eng,
1771 1771
 		runtime:     runtime,
1772
-		pullingPool: make(map[string]struct{}),
1773
-		pushingPool: make(map[string]struct{}),
1772
+		pullingPool: make(map[string]chan struct{}),
1773
+		pushingPool: make(map[string]chan struct{}),
1774 1774
 		events:      make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events
1775 1775
 		listeners:   make(map[string]chan utils.JSONMessage),
1776 1776
 		reqFactory:  nil,
... ...
@@ -1807,8 +1810,8 @@ func (srv *Server) LogEvent(action, id, from string) *utils.JSONMessage {
1807 1807
 type Server struct {
1808 1808
 	sync.Mutex
1809 1809
 	runtime     *Runtime
1810
-	pullingPool map[string]struct{}
1811
-	pushingPool map[string]struct{}
1810
+	pullingPool map[string]chan struct{}
1811
+	pushingPool map[string]chan struct{}
1812 1812
 	events      []utils.JSONMessage
1813 1813
 	listeners   map[string]chan utils.JSONMessage
1814 1814
 	reqFactory  *utils.HTTPRequestFactory
... ...
@@ -8,49 +8,38 @@ import (
8 8
 
9 9
 func TestPools(t *testing.T) {
10 10
 	srv := &Server{
11
-		pullingPool: make(map[string]struct{}),
12
-		pushingPool: make(map[string]struct{}),
11
+		pullingPool: make(map[string]chan struct{}),
12
+		pushingPool: make(map[string]chan struct{}),
13 13
 	}
14 14
 
15
-	err := srv.poolAdd("pull", "test1")
16
-	if err != nil {
15
+	if _, err := srv.poolAdd("pull", "test1"); err != nil {
17 16
 		t.Fatal(err)
18 17
 	}
19
-	err = srv.poolAdd("pull", "test2")
20
-	if err != nil {
18
+	if _, err := srv.poolAdd("pull", "test2"); err != nil {
21 19
 		t.Fatal(err)
22 20
 	}
23
-	err = srv.poolAdd("push", "test1")
24
-	if err == nil || err.Error() != "pull test1 is already in progress" {
21
+	if _, err := srv.poolAdd("push", "test1"); err == nil || err.Error() != "pull test1 is already in progress" {
25 22
 		t.Fatalf("Expected `pull test1 is already in progress`")
26 23
 	}
27
-	err = srv.poolAdd("pull", "test1")
28
-	if err == nil || err.Error() != "pull test1 is already in progress" {
24
+	if _, err := srv.poolAdd("pull", "test1"); err == nil || err.Error() != "pull test1 is already in progress" {
29 25
 		t.Fatalf("Expected `pull test1 is already in progress`")
30 26
 	}
31
-	err = srv.poolAdd("wait", "test3")
32
-	if err == nil || err.Error() != "Unknown pool type" {
27
+	if _, err := srv.poolAdd("wait", "test3"); err == nil || err.Error() != "Unknown pool type" {
33 28
 		t.Fatalf("Expected `Unknown pool type`")
34 29
 	}
35
-
36
-	err = srv.poolRemove("pull", "test2")
37
-	if err != nil {
30
+	if err := srv.poolRemove("pull", "test2"); err != nil {
38 31
 		t.Fatal(err)
39 32
 	}
40
-	err = srv.poolRemove("pull", "test2")
41
-	if err != nil {
33
+	if err := srv.poolRemove("pull", "test2"); err != nil {
42 34
 		t.Fatal(err)
43 35
 	}
44
-	err = srv.poolRemove("pull", "test1")
45
-	if err != nil {
36
+	if err := srv.poolRemove("pull", "test1"); err != nil {
46 37
 		t.Fatal(err)
47 38
 	}
48
-	err = srv.poolRemove("push", "test1")
49
-	if err != nil {
39
+	if err := srv.poolRemove("push", "test1"); err != nil {
50 40
 		t.Fatal(err)
51 41
 	}
52
-	err = srv.poolRemove("wait", "test3")
53
-	if err == nil || err.Error() != "Unknown pool type" {
42
+	if err := srv.poolRemove("wait", "test3"); err == nil || err.Error() != "Unknown pool type" {
54 43
 		t.Fatalf("Expected `Unknown pool type`")
55 44
 	}
56 45
 }