Browse code

create a new file services.go and move service part codes from cluster.go into services.go

Signed-off-by: allencloud <allen.sun@daocloud.io>

allencloud authored on 2017/02/12 03:40:14
Showing 2 changed files
... ...
@@ -40,10 +40,7 @@ package cluster
40 40
 
41 41
 import (
42 42
 	"crypto/x509"
43
-	"encoding/base64"
44
-	"encoding/json"
45 43
 	"fmt"
46
-	"io"
47 44
 	"net"
48 45
 	"os"
49 46
 	"path/filepath"
... ...
@@ -52,25 +49,19 @@ import (
52 52
 	"time"
53 53
 
54 54
 	"github.com/Sirupsen/logrus"
55
-	"github.com/docker/distribution/reference"
56 55
 	apierrors "github.com/docker/docker/api/errors"
57 56
 	apitypes "github.com/docker/docker/api/types"
58
-	"github.com/docker/docker/api/types/backend"
59 57
 	"github.com/docker/docker/api/types/filters"
60 58
 	"github.com/docker/docker/api/types/network"
61 59
 	types "github.com/docker/docker/api/types/swarm"
62 60
 	"github.com/docker/docker/daemon/cluster/convert"
63 61
 	executorpkg "github.com/docker/docker/daemon/cluster/executor"
64
-	"github.com/docker/docker/daemon/logger"
65 62
 	"github.com/docker/docker/opts"
66
-	"github.com/docker/docker/pkg/ioutils"
67 63
 	"github.com/docker/docker/pkg/signal"
68
-	"github.com/docker/docker/pkg/stdcopy"
69 64
 	"github.com/docker/docker/runconfig"
70 65
 	swarmapi "github.com/docker/swarmkit/api"
71 66
 	"github.com/docker/swarmkit/manager/encryption"
72 67
 	swarmnode "github.com/docker/swarmkit/node"
73
-	gogotypes "github.com/gogo/protobuf/types"
74 68
 	"github.com/pkg/errors"
75 69
 	"golang.org/x/net/context"
76 70
 )
... ...
@@ -793,370 +784,6 @@ func (c *Cluster) errNoManager(st nodeState) error {
793 793
 	return errors.New("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
794 794
 }
795 795
 
796
-// GetServices returns all services of a managed swarm cluster.
797
-func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
798
-	c.mu.RLock()
799
-	defer c.mu.RUnlock()
800
-
801
-	state := c.currentNodeState()
802
-	if !state.IsActiveManager() {
803
-		return nil, c.errNoManager(state)
804
-	}
805
-
806
-	filters, err := newListServicesFilters(options.Filters)
807
-	if err != nil {
808
-		return nil, err
809
-	}
810
-	ctx, cancel := c.getRequestContext()
811
-	defer cancel()
812
-
813
-	r, err := state.controlClient.ListServices(
814
-		ctx,
815
-		&swarmapi.ListServicesRequest{Filters: filters})
816
-	if err != nil {
817
-		return nil, err
818
-	}
819
-
820
-	services := []types.Service{}
821
-
822
-	for _, service := range r.Services {
823
-		services = append(services, convert.ServiceFromGRPC(*service))
824
-	}
825
-
826
-	return services, nil
827
-}
828
-
829
-// imageWithDigestString takes an image such as name or name:tag
830
-// and returns the image pinned to a digest, such as name@sha256:34234
831
-func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *apitypes.AuthConfig) (string, error) {
832
-	ref, err := reference.ParseAnyReference(image)
833
-	if err != nil {
834
-		return "", err
835
-	}
836
-	namedRef, ok := ref.(reference.Named)
837
-	if !ok {
838
-		if _, ok := ref.(reference.Digested); ok {
839
-			return "", errors.New("image reference is an image ID")
840
-		}
841
-		return "", errors.Errorf("unknown image reference format: %s", image)
842
-	}
843
-	// only query registry if not a canonical reference (i.e. with digest)
844
-	if _, ok := namedRef.(reference.Canonical); !ok {
845
-		namedRef = reference.TagNameOnly(namedRef)
846
-
847
-		taggedRef, ok := namedRef.(reference.NamedTagged)
848
-		if !ok {
849
-			return "", errors.Errorf("image reference not tagged: %s", image)
850
-		}
851
-
852
-		repo, _, err := c.config.Backend.GetRepository(ctx, taggedRef, authConfig)
853
-		if err != nil {
854
-			return "", err
855
-		}
856
-		dscrptr, err := repo.Tags(ctx).Get(ctx, taggedRef.Tag())
857
-		if err != nil {
858
-			return "", err
859
-		}
860
-
861
-		namedDigestedRef, err := reference.WithDigest(taggedRef, dscrptr.Digest)
862
-		if err != nil {
863
-			return "", err
864
-		}
865
-		// return familiar form until interface updated to return type
866
-		return reference.FamiliarString(namedDigestedRef), nil
867
-	}
868
-	// reference already contains a digest, so just return it
869
-	return reference.FamiliarString(ref), nil
870
-}
871
-
872
-// CreateService creates a new service in a managed swarm cluster.
873
-func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apitypes.ServiceCreateResponse, error) {
874
-	c.mu.RLock()
875
-	defer c.mu.RUnlock()
876
-
877
-	state := c.currentNodeState()
878
-	if !state.IsActiveManager() {
879
-		return nil, c.errNoManager(state)
880
-	}
881
-
882
-	ctx, cancel := c.getRequestContext()
883
-	defer cancel()
884
-
885
-	err := c.populateNetworkID(ctx, state.controlClient, &s)
886
-	if err != nil {
887
-		return nil, err
888
-	}
889
-
890
-	serviceSpec, err := convert.ServiceSpecToGRPC(s)
891
-	if err != nil {
892
-		return nil, apierrors.NewBadRequestError(err)
893
-	}
894
-
895
-	ctnr := serviceSpec.Task.GetContainer()
896
-	if ctnr == nil {
897
-		return nil, errors.New("service does not use container tasks")
898
-	}
899
-
900
-	if encodedAuth != "" {
901
-		ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
902
-	}
903
-
904
-	// retrieve auth config from encoded auth
905
-	authConfig := &apitypes.AuthConfig{}
906
-	if encodedAuth != "" {
907
-		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
908
-			logrus.Warnf("invalid authconfig: %v", err)
909
-		}
910
-	}
911
-
912
-	resp := &apitypes.ServiceCreateResponse{}
913
-
914
-	// pin image by digest
915
-	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
916
-		digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
917
-		if err != nil {
918
-			logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
919
-			resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", ctnr.Image, err.Error()))
920
-		} else if ctnr.Image != digestImage {
921
-			logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
922
-			ctnr.Image = digestImage
923
-		} else {
924
-			logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image)
925
-		}
926
-	}
927
-
928
-	r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
929
-	if err != nil {
930
-		return nil, err
931
-	}
932
-
933
-	resp.ID = r.Service.ID
934
-	return resp, nil
935
-}
936
-
937
-// GetService returns a service based on an ID or name.
938
-func (c *Cluster) GetService(input string) (types.Service, error) {
939
-	c.mu.RLock()
940
-	defer c.mu.RUnlock()
941
-
942
-	state := c.currentNodeState()
943
-	if !state.IsActiveManager() {
944
-		return types.Service{}, c.errNoManager(state)
945
-	}
946
-
947
-	ctx, cancel := c.getRequestContext()
948
-	defer cancel()
949
-
950
-	service, err := getService(ctx, state.controlClient, input)
951
-	if err != nil {
952
-		return types.Service{}, err
953
-	}
954
-	return convert.ServiceFromGRPC(*service), nil
955
-}
956
-
957
-// UpdateService updates existing service to match new properties.
958
-func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string, registryAuthFrom string) (*apitypes.ServiceUpdateResponse, error) {
959
-	c.mu.RLock()
960
-	defer c.mu.RUnlock()
961
-
962
-	state := c.currentNodeState()
963
-	if !state.IsActiveManager() {
964
-		return nil, c.errNoManager(state)
965
-	}
966
-
967
-	ctx, cancel := c.getRequestContext()
968
-	defer cancel()
969
-
970
-	err := c.populateNetworkID(ctx, state.controlClient, &spec)
971
-	if err != nil {
972
-		return nil, err
973
-	}
974
-
975
-	serviceSpec, err := convert.ServiceSpecToGRPC(spec)
976
-	if err != nil {
977
-		return nil, apierrors.NewBadRequestError(err)
978
-	}
979
-
980
-	currentService, err := getService(ctx, state.controlClient, serviceIDOrName)
981
-	if err != nil {
982
-		return nil, err
983
-	}
984
-
985
-	newCtnr := serviceSpec.Task.GetContainer()
986
-	if newCtnr == nil {
987
-		return nil, errors.New("service does not use container tasks")
988
-	}
989
-
990
-	if encodedAuth != "" {
991
-		newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
992
-	} else {
993
-		// this is needed because if the encodedAuth isn't being updated then we
994
-		// shouldn't lose it, and continue to use the one that was already present
995
-		var ctnr *swarmapi.ContainerSpec
996
-		switch registryAuthFrom {
997
-		case apitypes.RegistryAuthFromSpec, "":
998
-			ctnr = currentService.Spec.Task.GetContainer()
999
-		case apitypes.RegistryAuthFromPreviousSpec:
1000
-			if currentService.PreviousSpec == nil {
1001
-				return nil, errors.New("service does not have a previous spec")
1002
-			}
1003
-			ctnr = currentService.PreviousSpec.Task.GetContainer()
1004
-		default:
1005
-			return nil, errors.New("unsupported registryAuthFrom value")
1006
-		}
1007
-		if ctnr == nil {
1008
-			return nil, errors.New("service does not use container tasks")
1009
-		}
1010
-		newCtnr.PullOptions = ctnr.PullOptions
1011
-		// update encodedAuth so it can be used to pin image by digest
1012
-		if ctnr.PullOptions != nil {
1013
-			encodedAuth = ctnr.PullOptions.RegistryAuth
1014
-		}
1015
-	}
1016
-
1017
-	// retrieve auth config from encoded auth
1018
-	authConfig := &apitypes.AuthConfig{}
1019
-	if encodedAuth != "" {
1020
-		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
1021
-			logrus.Warnf("invalid authconfig: %v", err)
1022
-		}
1023
-	}
1024
-
1025
-	resp := &apitypes.ServiceUpdateResponse{}
1026
-
1027
-	// pin image by digest
1028
-	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
1029
-		digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
1030
-		if err != nil {
1031
-			logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
1032
-			resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error()))
1033
-		} else if newCtnr.Image != digestImage {
1034
-			logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
1035
-			newCtnr.Image = digestImage
1036
-		} else {
1037
-			logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image)
1038
-		}
1039
-	}
1040
-
1041
-	_, err = state.controlClient.UpdateService(
1042
-		ctx,
1043
-		&swarmapi.UpdateServiceRequest{
1044
-			ServiceID: currentService.ID,
1045
-			Spec:      &serviceSpec,
1046
-			ServiceVersion: &swarmapi.Version{
1047
-				Index: version,
1048
-			},
1049
-		},
1050
-	)
1051
-
1052
-	return resp, err
1053
-}
1054
-
1055
-// RemoveService removes a service from a managed swarm cluster.
1056
-func (c *Cluster) RemoveService(input string) error {
1057
-	c.mu.RLock()
1058
-	defer c.mu.RUnlock()
1059
-
1060
-	state := c.currentNodeState()
1061
-	if !state.IsActiveManager() {
1062
-		return c.errNoManager(state)
1063
-	}
1064
-
1065
-	ctx, cancel := c.getRequestContext()
1066
-	defer cancel()
1067
-
1068
-	service, err := getService(ctx, state.controlClient, input)
1069
-	if err != nil {
1070
-		return err
1071
-	}
1072
-
1073
-	_, err = state.controlClient.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID})
1074
-	return err
1075
-}
1076
-
1077
-// ServiceLogs collects service logs and writes them back to `config.OutStream`
1078
-func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error {
1079
-	c.mu.RLock()
1080
-	state := c.currentNodeState()
1081
-	if !state.IsActiveManager() {
1082
-		c.mu.RUnlock()
1083
-		return c.errNoManager(state)
1084
-	}
1085
-
1086
-	service, err := getService(ctx, state.controlClient, input)
1087
-	if err != nil {
1088
-		c.mu.RUnlock()
1089
-		return err
1090
-	}
1091
-
1092
-	stream, err := state.logsClient.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
1093
-		Selector: &swarmapi.LogSelector{
1094
-			ServiceIDs: []string{service.ID},
1095
-		},
1096
-		Options: &swarmapi.LogSubscriptionOptions{
1097
-			Follow: config.Follow,
1098
-		},
1099
-	})
1100
-	if err != nil {
1101
-		c.mu.RUnlock()
1102
-		return err
1103
-	}
1104
-
1105
-	wf := ioutils.NewWriteFlusher(config.OutStream)
1106
-	defer wf.Close()
1107
-	close(started)
1108
-	wf.Flush()
1109
-
1110
-	outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout)
1111
-	errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr)
1112
-
1113
-	// Release the lock before starting the stream.
1114
-	c.mu.RUnlock()
1115
-	for {
1116
-		// Check the context before doing anything.
1117
-		select {
1118
-		case <-ctx.Done():
1119
-			return ctx.Err()
1120
-		default:
1121
-		}
1122
-
1123
-		subscribeMsg, err := stream.Recv()
1124
-		if err == io.EOF {
1125
-			return nil
1126
-		}
1127
-		if err != nil {
1128
-			return err
1129
-		}
1130
-
1131
-		for _, msg := range subscribeMsg.Messages {
1132
-			data := []byte{}
1133
-
1134
-			if config.Timestamps {
1135
-				ts, err := gogotypes.TimestampFromProto(msg.Timestamp)
1136
-				if err != nil {
1137
-					return err
1138
-				}
1139
-				data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...)
1140
-			}
1141
-
1142
-			data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ",
1143
-				contextPrefix, msg.Context.NodeID,
1144
-				contextPrefix, msg.Context.ServiceID,
1145
-				contextPrefix, msg.Context.TaskID,
1146
-			))...)
1147
-
1148
-			data = append(data, msg.Data...)
1149
-
1150
-			switch msg.Stream {
1151
-			case swarmapi.LogStreamStdout:
1152
-				outStream.Write(data)
1153
-			case swarmapi.LogStreamStderr:
1154
-				errStream.Write(data)
1155
-			}
1156
-		}
1157
-	}
1158
-}
1159
-
1160 796
 // GetTasks returns a list of tasks matching the filter options.
1161 797
 func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
1162 798
 	c.mu.RLock()
1163 799
new file mode 100644
... ...
@@ -0,0 +1,389 @@
0
+package cluster
1
+
2
+import (
3
+	"encoding/base64"
4
+	"encoding/json"
5
+	"fmt"
6
+	"io"
7
+	"os"
8
+	"strings"
9
+
10
+	"github.com/Sirupsen/logrus"
11
+	"github.com/docker/distribution/reference"
12
+	apierrors "github.com/docker/docker/api/errors"
13
+	apitypes "github.com/docker/docker/api/types"
14
+	"github.com/docker/docker/api/types/backend"
15
+	types "github.com/docker/docker/api/types/swarm"
16
+	"github.com/docker/docker/daemon/cluster/convert"
17
+	"github.com/docker/docker/daemon/logger"
18
+	"github.com/docker/docker/pkg/ioutils"
19
+	"github.com/docker/docker/pkg/stdcopy"
20
+	swarmapi "github.com/docker/swarmkit/api"
21
+	gogotypes "github.com/gogo/protobuf/types"
22
+	"github.com/pkg/errors"
23
+	"golang.org/x/net/context"
24
+)
25
+
26
+// GetServices returns all services of a managed swarm cluster.
27
+func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
28
+	c.mu.RLock()
29
+	defer c.mu.RUnlock()
30
+
31
+	state := c.currentNodeState()
32
+	if !state.IsActiveManager() {
33
+		return nil, c.errNoManager(state)
34
+	}
35
+
36
+	filters, err := newListServicesFilters(options.Filters)
37
+	if err != nil {
38
+		return nil, err
39
+	}
40
+	ctx, cancel := c.getRequestContext()
41
+	defer cancel()
42
+
43
+	r, err := state.controlClient.ListServices(
44
+		ctx,
45
+		&swarmapi.ListServicesRequest{Filters: filters})
46
+	if err != nil {
47
+		return nil, err
48
+	}
49
+
50
+	services := []types.Service{}
51
+
52
+	for _, service := range r.Services {
53
+		services = append(services, convert.ServiceFromGRPC(*service))
54
+	}
55
+
56
+	return services, nil
57
+}
58
+
59
+// GetService returns a service based on an ID or name.
60
+func (c *Cluster) GetService(input string) (types.Service, error) {
61
+	c.mu.RLock()
62
+	defer c.mu.RUnlock()
63
+
64
+	state := c.currentNodeState()
65
+	if !state.IsActiveManager() {
66
+		return types.Service{}, c.errNoManager(state)
67
+	}
68
+
69
+	ctx, cancel := c.getRequestContext()
70
+	defer cancel()
71
+
72
+	service, err := getService(ctx, state.controlClient, input)
73
+	if err != nil {
74
+		return types.Service{}, err
75
+	}
76
+	return convert.ServiceFromGRPC(*service), nil
77
+}
78
+
79
+// CreateService creates a new service in a managed swarm cluster.
80
+func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apitypes.ServiceCreateResponse, error) {
81
+	c.mu.RLock()
82
+	defer c.mu.RUnlock()
83
+
84
+	state := c.currentNodeState()
85
+	if !state.IsActiveManager() {
86
+		return nil, c.errNoManager(state)
87
+	}
88
+
89
+	ctx, cancel := c.getRequestContext()
90
+	defer cancel()
91
+
92
+	err := c.populateNetworkID(ctx, state.controlClient, &s)
93
+	if err != nil {
94
+		return nil, err
95
+	}
96
+
97
+	serviceSpec, err := convert.ServiceSpecToGRPC(s)
98
+	if err != nil {
99
+		return nil, apierrors.NewBadRequestError(err)
100
+	}
101
+
102
+	ctnr := serviceSpec.Task.GetContainer()
103
+	if ctnr == nil {
104
+		return nil, errors.New("service does not use container tasks")
105
+	}
106
+
107
+	if encodedAuth != "" {
108
+		ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
109
+	}
110
+
111
+	// retrieve auth config from encoded auth
112
+	authConfig := &apitypes.AuthConfig{}
113
+	if encodedAuth != "" {
114
+		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
115
+			logrus.Warnf("invalid authconfig: %v", err)
116
+		}
117
+	}
118
+
119
+	resp := &apitypes.ServiceCreateResponse{}
120
+
121
+	// pin image by digest
122
+	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
123
+		digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
124
+		if err != nil {
125
+			logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
126
+			resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", ctnr.Image, err.Error()))
127
+		} else if ctnr.Image != digestImage {
128
+			logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
129
+			ctnr.Image = digestImage
130
+		} else {
131
+			logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image)
132
+		}
133
+	}
134
+
135
+	r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
136
+	if err != nil {
137
+		return nil, err
138
+	}
139
+
140
+	resp.ID = r.Service.ID
141
+	return resp, nil
142
+}
143
+
144
+// UpdateService updates existing service to match new properties.
145
+func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string, registryAuthFrom string) (*apitypes.ServiceUpdateResponse, error) {
146
+	c.mu.RLock()
147
+	defer c.mu.RUnlock()
148
+
149
+	state := c.currentNodeState()
150
+	if !state.IsActiveManager() {
151
+		return nil, c.errNoManager(state)
152
+	}
153
+
154
+	ctx, cancel := c.getRequestContext()
155
+	defer cancel()
156
+
157
+	err := c.populateNetworkID(ctx, state.controlClient, &spec)
158
+	if err != nil {
159
+		return nil, err
160
+	}
161
+
162
+	serviceSpec, err := convert.ServiceSpecToGRPC(spec)
163
+	if err != nil {
164
+		return nil, apierrors.NewBadRequestError(err)
165
+	}
166
+
167
+	currentService, err := getService(ctx, state.controlClient, serviceIDOrName)
168
+	if err != nil {
169
+		return nil, err
170
+	}
171
+
172
+	newCtnr := serviceSpec.Task.GetContainer()
173
+	if newCtnr == nil {
174
+		return nil, errors.New("service does not use container tasks")
175
+	}
176
+
177
+	if encodedAuth != "" {
178
+		newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
179
+	} else {
180
+		// this is needed because if the encodedAuth isn't being updated then we
181
+		// shouldn't lose it, and continue to use the one that was already present
182
+		var ctnr *swarmapi.ContainerSpec
183
+		switch registryAuthFrom {
184
+		case apitypes.RegistryAuthFromSpec, "":
185
+			ctnr = currentService.Spec.Task.GetContainer()
186
+		case apitypes.RegistryAuthFromPreviousSpec:
187
+			if currentService.PreviousSpec == nil {
188
+				return nil, errors.New("service does not have a previous spec")
189
+			}
190
+			ctnr = currentService.PreviousSpec.Task.GetContainer()
191
+		default:
192
+			return nil, errors.New("unsupported registryAuthFrom value")
193
+		}
194
+		if ctnr == nil {
195
+			return nil, errors.New("service does not use container tasks")
196
+		}
197
+		newCtnr.PullOptions = ctnr.PullOptions
198
+		// update encodedAuth so it can be used to pin image by digest
199
+		if ctnr.PullOptions != nil {
200
+			encodedAuth = ctnr.PullOptions.RegistryAuth
201
+		}
202
+	}
203
+
204
+	// retrieve auth config from encoded auth
205
+	authConfig := &apitypes.AuthConfig{}
206
+	if encodedAuth != "" {
207
+		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
208
+			logrus.Warnf("invalid authconfig: %v", err)
209
+		}
210
+	}
211
+
212
+	resp := &apitypes.ServiceUpdateResponse{}
213
+
214
+	// pin image by digest
215
+	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
216
+		digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
217
+		if err != nil {
218
+			logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
219
+			resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error()))
220
+		} else if newCtnr.Image != digestImage {
221
+			logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
222
+			newCtnr.Image = digestImage
223
+		} else {
224
+			logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image)
225
+		}
226
+	}
227
+
228
+	_, err = state.controlClient.UpdateService(
229
+		ctx,
230
+		&swarmapi.UpdateServiceRequest{
231
+			ServiceID: currentService.ID,
232
+			Spec:      &serviceSpec,
233
+			ServiceVersion: &swarmapi.Version{
234
+				Index: version,
235
+			},
236
+		},
237
+	)
238
+
239
+	return resp, err
240
+}
241
+
242
+// RemoveService removes a service from a managed swarm cluster.
243
+func (c *Cluster) RemoveService(input string) error {
244
+	c.mu.RLock()
245
+	defer c.mu.RUnlock()
246
+
247
+	state := c.currentNodeState()
248
+	if !state.IsActiveManager() {
249
+		return c.errNoManager(state)
250
+	}
251
+
252
+	ctx, cancel := c.getRequestContext()
253
+	defer cancel()
254
+
255
+	service, err := getService(ctx, state.controlClient, input)
256
+	if err != nil {
257
+		return err
258
+	}
259
+
260
+	_, err = state.controlClient.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID})
261
+	return err
262
+}
263
+
264
+// ServiceLogs collects service logs and writes them back to `config.OutStream`
265
+func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error {
266
+	c.mu.RLock()
267
+	state := c.currentNodeState()
268
+	if !state.IsActiveManager() {
269
+		c.mu.RUnlock()
270
+		return c.errNoManager(state)
271
+	}
272
+
273
+	service, err := getService(ctx, state.controlClient, input)
274
+	if err != nil {
275
+		c.mu.RUnlock()
276
+		return err
277
+	}
278
+
279
+	stream, err := state.logsClient.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
280
+		Selector: &swarmapi.LogSelector{
281
+			ServiceIDs: []string{service.ID},
282
+		},
283
+		Options: &swarmapi.LogSubscriptionOptions{
284
+			Follow: config.Follow,
285
+		},
286
+	})
287
+	if err != nil {
288
+		c.mu.RUnlock()
289
+		return err
290
+	}
291
+
292
+	wf := ioutils.NewWriteFlusher(config.OutStream)
293
+	defer wf.Close()
294
+	close(started)
295
+	wf.Flush()
296
+
297
+	outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout)
298
+	errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr)
299
+
300
+	// Release the lock before starting the stream.
301
+	c.mu.RUnlock()
302
+	for {
303
+		// Check the context before doing anything.
304
+		select {
305
+		case <-ctx.Done():
306
+			return ctx.Err()
307
+		default:
308
+		}
309
+
310
+		subscribeMsg, err := stream.Recv()
311
+		if err == io.EOF {
312
+			return nil
313
+		}
314
+		if err != nil {
315
+			return err
316
+		}
317
+
318
+		for _, msg := range subscribeMsg.Messages {
319
+			data := []byte{}
320
+
321
+			if config.Timestamps {
322
+				ts, err := gogotypes.TimestampFromProto(msg.Timestamp)
323
+				if err != nil {
324
+					return err
325
+				}
326
+				data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...)
327
+			}
328
+
329
+			data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ",
330
+				contextPrefix, msg.Context.NodeID,
331
+				contextPrefix, msg.Context.ServiceID,
332
+				contextPrefix, msg.Context.TaskID,
333
+			))...)
334
+
335
+			data = append(data, msg.Data...)
336
+
337
+			switch msg.Stream {
338
+			case swarmapi.LogStreamStdout:
339
+				outStream.Write(data)
340
+			case swarmapi.LogStreamStderr:
341
+				errStream.Write(data)
342
+			}
343
+		}
344
+	}
345
+}
346
+
347
+// imageWithDigestString takes an image such as name or name:tag
348
+// and returns the image pinned to a digest, such as name@sha256:34234
349
+func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *apitypes.AuthConfig) (string, error) {
350
+	ref, err := reference.ParseAnyReference(image)
351
+	if err != nil {
352
+		return "", err
353
+	}
354
+	namedRef, ok := ref.(reference.Named)
355
+	if !ok {
356
+		if _, ok := ref.(reference.Digested); ok {
357
+			return "", errors.New("image reference is an image ID")
358
+		}
359
+		return "", errors.Errorf("unknown image reference format: %s", image)
360
+	}
361
+	// only query registry if not a canonical reference (i.e. with digest)
362
+	if _, ok := namedRef.(reference.Canonical); !ok {
363
+		namedRef = reference.TagNameOnly(namedRef)
364
+
365
+		taggedRef, ok := namedRef.(reference.NamedTagged)
366
+		if !ok {
367
+			return "", errors.Errorf("image reference not tagged: %s", image)
368
+		}
369
+
370
+		repo, _, err := c.config.Backend.GetRepository(ctx, taggedRef, authConfig)
371
+		if err != nil {
372
+			return "", err
373
+		}
374
+		dscrptr, err := repo.Tags(ctx).Get(ctx, taggedRef.Tag())
375
+		if err != nil {
376
+			return "", err
377
+		}
378
+
379
+		namedDigestedRef, err := reference.WithDigest(taggedRef, dscrptr.Digest)
380
+		if err != nil {
381
+			return "", err
382
+		}
383
+		// return familiar form until interface updated to return type
384
+		return reference.FamiliarString(namedDigestedRef), nil
385
+	}
386
+	// reference already contains a digest, so just return it
387
+	return reference.FamiliarString(ref), nil
388
+}