Browse code

Add rsync daemon copy strategy for windows support

Cesar Wong authored on 2015/10/21 05:01:29
Showing 16 changed files
... ...
@@ -1178,7 +1178,7 @@ _oc_rsync()
1178 1178
     flags+=("--delete")
1179 1179
     flags+=("--quiet")
1180 1180
     flags+=("-q")
1181
-    flags+=("--use-tar")
1181
+    flags+=("--strategy=")
1182 1182
 
1183 1183
     must_have_one_flag=()
1184 1184
     must_have_one_noun=()
... ...
@@ -2986,7 +2986,7 @@ _openshift_cli_rsync()
2986 2986
     flags+=("--delete")
2987 2987
     flags+=("--quiet")
2988 2988
     flags+=("-q")
2989
-    flags+=("--use-tar")
2989
+    flags+=("--strategy=")
2990 2990
 
2991 2991
     must_have_one_flag=()
2992 2992
     must_have_one_noun=()
... ...
@@ -762,7 +762,7 @@ Start a shell session in a pod
762 762
 
763 763
 
764 764
 == oc rsync
765
-Copy local files to a pod
765
+Copy files between local filesystem and a pod
766 766
 
767 767
 ====
768 768
 
... ...
@@ -771,6 +771,9 @@ Copy local files to a pod
771 771
 
772 772
   # Synchronize a local directory with a pod directory
773 773
   $ openshift cli rsync ./local/dir/ POD:/remote/dir
774
+  
775
+  # Synchronize a pod directory with a local directory
776
+  $ openshift cli rsync POD:/remote/dir/ ./local/dir
774 777
 ----
775 778
 ====
776 779
 
... ...
@@ -14,6 +14,7 @@ import (
14 14
 	kubecmd "k8s.io/kubernetes/pkg/kubectl/cmd"
15 15
 
16 16
 	"github.com/openshift/origin/pkg/cmd/cli/cmd"
17
+	"github.com/openshift/origin/pkg/cmd/cli/cmd/rsync"
17 18
 	"github.com/openshift/origin/pkg/cmd/cli/policy"
18 19
 	"github.com/openshift/origin/pkg/cmd/cli/secrets"
19 20
 	"github.com/openshift/origin/pkg/cmd/flagtypes"
... ...
@@ -111,7 +112,7 @@ func NewCommandCLI(name, fullName string, in io.Reader, out, errout io.Writer) *
111 111
 				cmd.NewCmdExplain(fullName, f, out),
112 112
 				cmd.NewCmdLogs(fullName, f, out),
113 113
 				cmd.NewCmdRsh(cmd.RshRecommendedName, fullName, f, in, out, errout),
114
-				cmd.NewCmdRsync(cmd.RsyncRecommendedName, fullName, f, out, errout),
114
+				rsync.NewCmdRsync(rsync.RsyncRecommendedName, fullName, f, out, errout),
115 115
 				cmd.NewCmdExec(fullName, f, in, out, errout),
116 116
 				cmd.NewCmdPortForward(fullName, f),
117 117
 				cmd.NewCmdProxy(fullName, f, out),
118 118
deleted file mode 100644
... ...
@@ -1,413 +0,0 @@
1
-package cmd
2
-
3
-import (
4
-	"bytes"
5
-	"fmt"
6
-	"io"
7
-	"io/ioutil"
8
-	"os"
9
-	"os/exec"
10
-	"path/filepath"
11
-	"runtime"
12
-	"strings"
13
-
14
-	"github.com/golang/glog"
15
-	"github.com/openshift/source-to-image/pkg/tar"
16
-	"github.com/spf13/cobra"
17
-	kvalidation "k8s.io/kubernetes/pkg/api/validation"
18
-	kclient "k8s.io/kubernetes/pkg/client/unversioned"
19
-	kubecmd "k8s.io/kubernetes/pkg/kubectl/cmd"
20
-	kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
21
-
22
-	"github.com/openshift/origin/pkg/cmd/util/clientcmd"
23
-)
24
-
25
-const (
26
-	// RsyncRecommendedName is the recommended name for the rsync command
27
-	RsyncRecommendedName = "rsync"
28
-
29
-	rsyncLong = `
30
-Copy local files to a container
31
-
32
-This command will attempt to copy local files to a remote container. It will default to the
33
-first container if none is specified, and will attempt to use 'rsync' if available locally and in the
34
-container. If 'rsync' is not present, it will attempt to use 'tar' to send files to the container.`
35
-
36
-	rsyncExample = `
37
-  # Synchronize a local directory with a pod directory
38
-  $ %[1]s ./local/dir/ POD:/remote/dir`
39
-
40
-	defaultRsyncExecutable = "rsync"
41
-	defaultTarExecutable   = "tar"
42
-)
43
-
44
-var (
45
-	testRsyncCommand = []string{"rsync", "--version"}
46
-	testTarCommand   = []string{"tar", "--version"}
47
-)
48
-
49
-// RsyncOptions holds the options to execute the sync command
50
-type RsyncOptions struct {
51
-	Namespace      string
52
-	PodName        string
53
-	ContainerName  string
54
-	Source         string
55
-	Destination    string
56
-	DestinationDir string
57
-	RshCommand     string
58
-	UseTar         bool
59
-	Quiet          bool
60
-	Delete         bool
61
-
62
-	Out    io.Writer
63
-	ErrOut io.Writer
64
-
65
-	LocalExecutor  executor
66
-	RemoteExecutor executor
67
-	Tar            tar.Tar
68
-	PodClient      kclient.PodInterface
69
-}
70
-
71
-// executor executes commands
72
-type executor interface {
73
-	Execute(command []string, in io.Reader, out, err io.Writer) error
74
-}
75
-
76
-// NewCmdRsync creates a new sync command
77
-func NewCmdRsync(name, parent string, f *clientcmd.Factory, out, errOut io.Writer) *cobra.Command {
78
-	tarHelper := tar.New()
79
-	tarHelper.SetExclusionPattern(nil)
80
-	o := RsyncOptions{
81
-		Out:           out,
82
-		ErrOut:        errOut,
83
-		LocalExecutor: &defaultLocalExecutor{},
84
-		Tar:           tarHelper,
85
-	}
86
-	cmd := &cobra.Command{
87
-		Use:     fmt.Sprintf("%s SOURCE_DIR POD:DESTINATION_DIR", name),
88
-		Short:   "Copy local files to a pod",
89
-		Long:    rsyncLong,
90
-		Example: fmt.Sprintf(rsyncExample, parent+" "+name),
91
-		Run: func(c *cobra.Command, args []string) {
92
-			kcmdutil.CheckErr(o.Complete(f, c, args))
93
-			kcmdutil.CheckErr(o.Validate())
94
-			kcmdutil.CheckErr(o.RunRsync())
95
-		},
96
-	}
97
-
98
-	cmd.Flags().StringVarP(&o.ContainerName, "container", "c", "", "Container within the pod")
99
-	cmd.Flags().BoolVarP(&o.Quiet, "quiet", "q", false, "Quiet copy")
100
-	cmd.Flags().BoolVar(&o.Delete, "delete", false, "Delete files not present in source")
101
-	cmd.Flags().BoolVar(&o.UseTar, "use-tar", false, "Use tar instead of rsync")
102
-	return cmd
103
-}
104
-
105
-func parseDestination(destination string) (string, string, error) {
106
-	parts := strings.SplitN(destination, ":", 2)
107
-	if len(parts) < 2 || len(parts[0]) == 0 || len(parts[1]) == 0 {
108
-		return "", "", fmt.Errorf("invalid destination %s: must be of the form PODNAME:DESTINATION_DIR", destination)
109
-	}
110
-	valid, msg := kvalidation.ValidatePodName(parts[0], false)
111
-	if !valid {
112
-		return "", "", fmt.Errorf("invalid pod name %s: %s", parts[0], msg)
113
-	}
114
-	return parts[0], parts[1], nil
115
-}
116
-
117
-// Complete verifies command line arguments and loads data from the command environment
118
-func (o *RsyncOptions) Complete(f *clientcmd.Factory, cmd *cobra.Command, args []string) error {
119
-	switch n := len(args); {
120
-	case n == 0:
121
-		cmd.Help()
122
-		fallthrough
123
-	case n < 2:
124
-		return kcmdutil.UsageError(cmd, "SOURCE_DIR and POD:DESTINATION_DIR are required arguments")
125
-	case n > 2:
126
-		return kcmdutil.UsageError(cmd, "only SOURCE_DIR and POD:DESTINATION_DIR should be specified as arguments")
127
-	}
128
-
129
-	// Set main command arguments
130
-	o.Source = args[0]
131
-	o.Destination = args[1]
132
-
133
-	// Determine pod name
134
-	var err error
135
-	o.PodName, o.DestinationDir, err = parseDestination(o.Destination)
136
-	if err != nil {
137
-		return kcmdutil.UsageError(cmd, err.Error())
138
-	}
139
-
140
-	// Use tar if running on windows
141
-	// TODO: Figure out how to use rsync in windows so that I/O can be
142
-	// redirected from the openshift native command to the cygwin rsync command
143
-	if runtime.GOOS == "windows" {
144
-		o.UseTar = true
145
-	}
146
-
147
-	namespace, _, err := f.DefaultNamespace()
148
-	if err != nil {
149
-		return err
150
-	}
151
-	o.Namespace = namespace
152
-
153
-	// Determine the Rsh command to use for rsync
154
-	if !o.UseTar {
155
-		rsh := siblingCommand(cmd, "rsh")
156
-		rshCmd := []string{rsh, "-n", o.Namespace}
157
-		if len(o.ContainerName) > 0 {
158
-			rshCmd = append(rshCmd, "-c", o.ContainerName)
159
-		}
160
-		o.RshCommand = strings.Join(rshCmd, " ")
161
-		glog.V(4).Infof("Rsh command: %s", o.RshCommand)
162
-	}
163
-
164
-	config, err := f.ClientConfig()
165
-	if err != nil {
166
-		return err
167
-	}
168
-
169
-	client, err := f.Client()
170
-	if err != nil {
171
-		return err
172
-	}
173
-
174
-	o.RemoteExecutor = &defaultRemoteExecutor{
175
-		Namespace:     o.Namespace,
176
-		PodName:       o.PodName,
177
-		ContainerName: o.ContainerName,
178
-		Config:        config,
179
-		Client:        client,
180
-	}
181
-
182
-	o.PodClient = client.Pods(namespace)
183
-
184
-	return nil
185
-}
186
-
187
-// sibling command returns a sibling command to the current command
188
-func siblingCommand(cmd *cobra.Command, name string) string {
189
-	c := cmd.Parent()
190
-	command := []string{}
191
-	for c != nil {
192
-		glog.V(5).Infof("Found parent command: %s", c.Name())
193
-		command = append([]string{c.Name()}, command...)
194
-		c = c.Parent()
195
-	}
196
-	// Replace the root command with what was actually used
197
-	// in the command line
198
-	glog.V(4).Infof("Setting root command to: %s", os.Args[0])
199
-	command[0] = os.Args[0]
200
-
201
-	// Append the sibling command
202
-	command = append(command, name)
203
-	glog.V(4).Infof("The sibling command is: %s", strings.Join(command, " "))
204
-
205
-	return strings.Join(command, " ")
206
-}
207
-
208
-// Validate checks that SyncOptions has all necessary fields
209
-func (o *RsyncOptions) Validate() error {
210
-	if len(o.PodName) == 0 {
211
-		return fmt.Errorf("pod name must be provided")
212
-	}
213
-	if len(o.Source) == 0 {
214
-		return fmt.Errorf("local source must be provided")
215
-	}
216
-	if len(o.Destination) == 0 {
217
-		return fmt.Errorf("remote destination must be provided")
218
-	}
219
-	if o.UseTar && len(o.DestinationDir) == 0 {
220
-		return fmt.Errorf("destination directory must be provided if using tar")
221
-	}
222
-	if !o.UseTar && len(o.RshCommand) == 0 {
223
-		return fmt.Errorf("rsh command must be provided when not using tar")
224
-	}
225
-	if o.Out == nil || o.ErrOut == nil {
226
-		return fmt.Errorf("output and error streams must be specified")
227
-	}
228
-	if o.LocalExecutor == nil || o.RemoteExecutor == nil {
229
-		return fmt.Errorf("local and remote executors must be provided")
230
-	}
231
-	if o.PodClient == nil {
232
-		return fmt.Errorf("pod client must be provided")
233
-	}
234
-	return nil
235
-}
236
-
237
-func (o *RsyncOptions) copyWithRsync(out, errOut io.Writer) error {
238
-	glog.V(3).Infof("Copying files with rsync")
239
-	flags := "-a"
240
-	if !o.Quiet {
241
-		flags += "v"
242
-	}
243
-	cmd := []string{defaultRsyncExecutable, flags}
244
-	if o.Delete {
245
-		cmd = append(cmd, "--delete")
246
-	}
247
-	cmd = append(cmd, "-e", o.RshCommand, o.Source, o.Destination)
248
-	glog.V(4).Infof("Local command: %s", strings.Join(cmd, " "))
249
-	return o.LocalExecutor.Execute(cmd, nil, out, errOut)
250
-}
251
-
252
-func (o *RsyncOptions) copyWithTar(out, errOut io.Writer) error {
253
-	glog.V(3).Infof("Copying files with tar")
254
-	if o.Delete {
255
-		// Implement the rsync --delete flag as a separate call to first delete directory contents
256
-		deleteCmd := []string{"sh", "-c", fmt.Sprintf("rm -rf %s", filepath.Join(o.DestinationDir, "*"))}
257
-		err := executeWithLogging(o.RemoteExecutor, deleteCmd)
258
-		if err != nil {
259
-			return fmt.Errorf("unable to delete files in destination: %v", err)
260
-		}
261
-	}
262
-	tmp, err := ioutil.TempFile("", "rsync")
263
-	if err != nil {
264
-		return fmt.Errorf("cannot create local temporary file for tar: %v", err)
265
-	}
266
-	defer os.Remove(tmp.Name())
267
-
268
-	err = tarLocal(o.Tar, o.Source, tmp)
269
-	if err != nil {
270
-		return fmt.Errorf("error creating tar of source directory: %v", err)
271
-	}
272
-	err = tmp.Close()
273
-	if err != nil {
274
-		return fmt.Errorf("error closing temporary tar file %s: %v", tmp.Name(), err)
275
-	}
276
-	tmp, err = os.Open(tmp.Name())
277
-	if err != nil {
278
-		return fmt.Errorf("cannot open temporary tar file %s: %v", tmp.Name(), err)
279
-	}
280
-	flags := "x"
281
-	if !o.Quiet {
282
-		flags += "v"
283
-	}
284
-	remoteCmd := []string{defaultTarExecutable, flags, "-C", o.DestinationDir}
285
-	errBuf := &bytes.Buffer{}
286
-	return o.RemoteExecutor.Execute(remoteCmd, tmp, out, errBuf)
287
-}
288
-
289
-func tarLocal(tar tar.Tar, sourceDir string, w io.Writer) error {
290
-	glog.V(4).Infof("Tarring %s locally", sourceDir)
291
-	// includeParent mimics rsync's behavior. When the source path ends in a path
292
-	// separator, then only the contents of the directory are copied. Otherwise,
293
-	// the directory itself is copied.
294
-	includeParent := true
295
-	if strings.HasSuffix(sourceDir, string(filepath.Separator)) {
296
-		includeParent = false
297
-		sourceDir = sourceDir[:len(sourceDir)-1]
298
-	}
299
-	return tar.CreateTarStream(sourceDir, includeParent, w)
300
-}
301
-
302
-// RunRsync copies files from source to destination
303
-func (o *RsyncOptions) RunRsync() error {
304
-	// If not going straight to tar and rsync exists locally, attempt to copy with rsync
305
-	if !o.UseTar && o.checkLocalRsync() {
306
-		errBuf := &bytes.Buffer{}
307
-		err := o.copyWithRsync(o.Out, errBuf)
308
-		// If no error occurred, we're done
309
-		if err == nil {
310
-			return nil
311
-		}
312
-		// If an error occurred, check whether rsync exists on the container.
313
-		// If it doesn't, fallback to tar
314
-		if o.checkRemoteRsync() {
315
-			// If remote rsync does exist, simply report the error
316
-			io.Copy(o.ErrOut, errBuf)
317
-			return err
318
-		}
319
-	}
320
-	return o.copyWithTar(o.Out, o.ErrOut)
321
-}
322
-
323
-func executeWithLogging(e executor, cmd []string) error {
324
-	w := &bytes.Buffer{}
325
-	err := e.Execute(cmd, nil, w, w)
326
-	glog.V(4).Infof("%s", w.String())
327
-	glog.V(4).Infof("error: %v", err)
328
-	return err
329
-}
330
-
331
-func (o *RsyncOptions) checkLocalRsync() bool {
332
-	_, err := exec.LookPath("rsync")
333
-	if err != nil {
334
-		glog.Warningf("rsync not found in local computer")
335
-		return false
336
-	}
337
-	return true
338
-}
339
-
340
-func (o *RsyncOptions) checkRemoteRsync() bool {
341
-	err := executeWithLogging(o.RemoteExecutor, testRsyncCommand)
342
-	if err != nil {
343
-		glog.Warningf("rsync not found in container %s of pod %s", o.containerName(), o.PodName)
344
-		return false
345
-	}
346
-	return true
347
-}
348
-
349
-func (o *RsyncOptions) containerName() string {
350
-	if len(o.ContainerName) > 0 {
351
-		return o.ContainerName
352
-	}
353
-	pod, err := o.PodClient.Get(o.PodName)
354
-	if err != nil {
355
-		glog.V(1).Infof("Error getting pod %s: %v", o.PodName, err)
356
-		return "[unknown]"
357
-	}
358
-	return pod.Spec.Containers[0].Name
359
-}
360
-
361
-// defaultLocalExecutor will execute commands on the local machine
362
-type defaultLocalExecutor struct{}
363
-
364
-// Execute will run a command locally
365
-func (*defaultLocalExecutor) Execute(command []string, in io.Reader, out, errOut io.Writer) error {
366
-	glog.V(3).Infof("Local executor running command: %s", strings.Join(command, " "))
367
-	cmd := exec.Command(command[0], command[1:]...)
368
-	cmd.Stdout = out
369
-	cmd.Stderr = errOut
370
-	cmd.Stdin = in
371
-	err := cmd.Run()
372
-	if err != nil {
373
-		glog.V(4).Infof("Error from local command execution: %v", err)
374
-	}
375
-	return err
376
-}
377
-
378
-// defaultRemoteExecutor will execute commands on a given pod/container by using the kube Exec command
379
-type defaultRemoteExecutor struct {
380
-	Namespace     string
381
-	PodName       string
382
-	ContainerName string
383
-	Client        *kclient.Client
384
-	Config        *kclient.Config
385
-}
386
-
387
-// Execute will run a command in a pod
388
-func (e *defaultRemoteExecutor) Execute(command []string, in io.Reader, out, errOut io.Writer) error {
389
-	glog.V(3).Infof("Remote executor running command: %s", strings.Join(command, " "))
390
-	execOptions := &kubecmd.ExecOptions{
391
-		In:            in,
392
-		Out:           out,
393
-		Err:           errOut,
394
-		Stdin:         in != nil,
395
-		Executor:      &kubecmd.DefaultRemoteExecutor{},
396
-		Client:        e.Client,
397
-		Config:        e.Config,
398
-		PodName:       e.PodName,
399
-		ContainerName: e.ContainerName,
400
-		Namespace:     e.Namespace,
401
-		Command:       command,
402
-	}
403
-	err := execOptions.Validate()
404
-	if err != nil {
405
-		glog.V(4).Infof("Error from remote command validation: %v", err)
406
-		return err
407
-	}
408
-	err = execOptions.Run()
409
-	if err != nil {
410
-		glog.V(4).Infof("Error from remote execution: %v", err)
411
-	}
412
-	return err
413
-}
414 1
new file mode 100644
... ...
@@ -0,0 +1,63 @@
0
+package rsync
1
+
2
+import (
3
+	"bytes"
4
+	"fmt"
5
+	"io"
6
+	"strings"
7
+
8
+	"github.com/golang/glog"
9
+	"k8s.io/kubernetes/pkg/util/errors"
10
+)
11
+
12
+// copyStrategies is an ordered list of copyStrategy objects that behaves as a single
13
+// strategy.
14
+type copyStrategies []copyStrategy
15
+
16
+// ensure copyStrategies implements the copyStrategy interface
17
+var _ copyStrategy = copyStrategies{}
18
+
19
+// Copy will call copy for strategies in list order. If a strategySetupError results from a copy,
20
+// the next strategy will be attempted. Otherwise the error is returned.
21
+func (ss copyStrategies) Copy(source, destination *pathSpec, out, errOut io.Writer) error {
22
+	var err error
23
+	for _, s := range ss {
24
+		errBuf := &bytes.Buffer{}
25
+		err = s.Copy(source, destination, out, errBuf)
26
+		if _, isSetupError := err.(strategySetupError); isSetupError {
27
+			glog.V(4).Infof("Error output:\n%s", errBuf.String())
28
+			glog.Warningf("Cannot use %s: %v", s.String(), err.Error())
29
+			continue
30
+		}
31
+		io.Copy(errOut, errBuf)
32
+		break
33
+	}
34
+	return err
35
+}
36
+
37
+// Validate will call Validate on all strategies and return an aggregate of their errors
38
+func (ss copyStrategies) Validate() error {
39
+	var errs []error
40
+	for _, s := range ss {
41
+		err := s.Validate()
42
+		if err != nil {
43
+			errs = append(errs, fmt.Errorf("invalid %v strategy: %v", s, err))
44
+		}
45
+	}
46
+	return errors.NewAggregate(errs)
47
+}
48
+
49
+// String will return a comma-separated list of strategy names
50
+func (ss copyStrategies) String() string {
51
+	names := []string{}
52
+	for _, s := range ss {
53
+		names = append(names, s.String())
54
+	}
55
+	return strings.Join(names, ",")
56
+}
57
+
58
+// strategySetupError is an error that occurred while setting up a strategy
59
+// (as opposed to actually executing a copy and getting an error from normal copy execution)
60
+type strategySetupError string
61
+
62
+func (e strategySetupError) Error() string { return string(e) }
0 63
new file mode 100644
... ...
@@ -0,0 +1,98 @@
0
+package rsync
1
+
2
+import (
3
+	"errors"
4
+	"io"
5
+	"strings"
6
+
7
+	"github.com/golang/glog"
8
+	"github.com/spf13/cobra"
9
+	kerrors "k8s.io/kubernetes/pkg/util/errors"
10
+
11
+	"github.com/openshift/origin/pkg/cmd/util/clientcmd"
12
+)
13
+
14
+var (
15
+	testRsyncCommand = []string{"rsync", "--version"}
16
+)
17
+
18
+type rsyncStrategy struct {
19
+	Flags          []string
20
+	RshCommand     string
21
+	LocalExecutor  executor
22
+	RemoteExecutor executor
23
+}
24
+
25
+func newRsyncStrategy(f *clientcmd.Factory, c *cobra.Command, o *RsyncOptions) (copyStrategy, error) {
26
+	// Determine the rsh command to pass to the local rsync command
27
+	rsh := siblingCommand(c, "rsh")
28
+	rshCmd := []string{rsh, "-n", o.Namespace}
29
+	if len(o.ContainerName) > 0 {
30
+		rshCmd = append(rshCmd, "-c", o.ContainerName)
31
+	}
32
+	rshCmdStr := strings.Join(rshCmd, " ")
33
+	glog.V(4).Infof("Rsh command: %s", rshCmdStr)
34
+
35
+	remoteExec, err := newRemoteExecutor(f, o)
36
+	if err != nil {
37
+		return nil, err
38
+	}
39
+
40
+	// TODO: Expose more flags to send to the rsync command
41
+	// either as a special argument or any unrecognized arguments.
42
+	// The blocking-io flag is used to resolve a sync issue when
43
+	// copying from the pod to the local machine
44
+	flags := []string{"-a", "--blocking-io", "--omit-dir-times", "--numeric-ids"}
45
+	if o.Quiet {
46
+		flags = append(flags, "-q")
47
+	} else {
48
+		flags = append(flags, "-v")
49
+	}
50
+	if o.Delete {
51
+		flags = append(flags, "--delete")
52
+	}
53
+
54
+	return &rsyncStrategy{
55
+		Flags:          flags,
56
+		RshCommand:     rshCmdStr,
57
+		RemoteExecutor: remoteExec,
58
+		LocalExecutor:  newLocalExecutor(),
59
+	}, nil
60
+}
61
+
62
+func (r *rsyncStrategy) Copy(source, destination *pathSpec, out, errOut io.Writer) error {
63
+	glog.V(3).Infof("Copying files with rsync")
64
+	cmd := append([]string{"rsync"}, r.Flags...)
65
+	cmd = append(cmd, "-e", r.RshCommand, source.RsyncPath(), destination.RsyncPath())
66
+	err := r.LocalExecutor.Execute(cmd, nil, out, errOut)
67
+	if isExitError(err) {
68
+		// Determine whether rsync is present in the pod container
69
+		testRsyncErr := executeWithLogging(r.RemoteExecutor, testRsyncCommand)
70
+		if testRsyncErr != nil {
71
+			glog.V(4).Infof("error testing whether rsync is available: %v", testRsyncErr)
72
+			return strategySetupError("rsync not available in container")
73
+		}
74
+	}
75
+	return err
76
+}
77
+
78
+func (r *rsyncStrategy) Validate() error {
79
+	errs := []error{}
80
+	if len(r.RshCommand) == 0 {
81
+		errs = append(errs, errors.New("rsh command must be provided"))
82
+	}
83
+	if r.LocalExecutor == nil {
84
+		errs = append(errs, errors.New("local executor must not be nil"))
85
+	}
86
+	if r.RemoteExecutor == nil {
87
+		errs = append(errs, errors.New("remote executor must not be nil"))
88
+	}
89
+	if len(errs) > 0 {
90
+		return kerrors.NewAggregate(errs)
91
+	}
92
+	return nil
93
+}
94
+
95
+func (r *rsyncStrategy) String() string {
96
+	return "rsync"
97
+}
0 98
new file mode 100644
... ...
@@ -0,0 +1,282 @@
0
+package rsync
1
+
2
+import (
3
+	"bytes"
4
+	"errors"
5
+	"fmt"
6
+	"io"
7
+	"math/rand"
8
+	"net"
9
+	"strconv"
10
+	"strings"
11
+	"time"
12
+
13
+	"github.com/golang/glog"
14
+	"github.com/spf13/cobra"
15
+	kerrors "k8s.io/kubernetes/pkg/util/errors"
16
+	krand "k8s.io/kubernetes/pkg/util/rand"
17
+
18
+	"github.com/openshift/origin/pkg/cmd/util/clientcmd"
19
+)
20
+
21
+const (
22
+	// startDaemonScript is the script that will be run on the container to start the
23
+	// rsync daemon. It takes 3 format parameters:
24
+	// 1 - alternate random name for config file
25
+	// 2 - alternate random name for pid file
26
+	// 3 - port number to listen on
27
+	// The output of the script is the name of a file containing the PID for the started daemon
28
+	startDaemonScript = `set -e
29
+TMPDIR=${TMPDIR:-/tmp}
30
+CONFIGFILE=$(mktemp 2> /dev/null || (echo -n "" > ${TMPDIR}/%[1]s && echo ${TMPDIR}/%[1]s))
31
+PIDFILE=$(mktemp 2> /dev/null || (echo -n "" > ${TMPDIR}/%[2]s && echo ${TMPDIR}/%[2]s))
32
+rm $PIDFILE
33
+printf "pid file = ${PIDFILE}\n[root]\n  path = /\n  use chroot = no\n  read only = no" > $CONFIGFILE
34
+rsync --daemon --config=${CONFIGFILE} --port=%[3]d
35
+echo ${PIDFILE}
36
+`
37
+	portRangeFrom = 30000
38
+	portRangeTo   = 60000
39
+	remoteLabel   = "root"
40
+)
41
+
42
+var (
43
+	random = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
44
+)
45
+
46
+type rsyncDaemonStrategy struct {
47
+	Flags          []string
48
+	RemoteExecutor executor
49
+	PortForwarder  forwarder
50
+	LocalExecutor  executor
51
+
52
+	daemonPIDFile   string
53
+	daemonPort      int
54
+	localPort       int
55
+	portForwardChan chan struct{}
56
+}
57
+
58
+func localRsyncURL(port int, label string, path string) string {
59
+	return fmt.Sprintf("rsync://127.0.0.1:%d/%s/%s", port, label, strings.TrimPrefix(path, "/"))
60
+}
61
+
62
+func getUsedPorts(data string) map[int]struct{} {
63
+	ports := map[int]struct{}{}
64
+	lines := strings.Split(data, "\n")
65
+	for _, line := range lines {
66
+		parts := strings.Fields(line)
67
+		if len(parts) < 2 {
68
+			continue
69
+		}
70
+		// discard lines that don't contain connection data
71
+		if !strings.Contains(parts[0], ":") {
72
+			continue
73
+		}
74
+		glog.V(5).Infof("Determining port in use from: %s", line)
75
+		localAddress := strings.Split(parts[1], ":")
76
+		if len(localAddress) < 2 {
77
+			continue
78
+		}
79
+		port, err := strconv.ParseInt(localAddress[1], 16, 0)
80
+		if err == nil {
81
+			ports[int(port)] = struct{}{}
82
+		}
83
+	}
84
+	glog.V(2).Infof("Used ports in container: %#v", ports)
85
+	return ports
86
+}
87
+
88
+func randomPort() int {
89
+	return portRangeFrom + random.Intn(portRangeTo-portRangeFrom)
90
+}
91
+
92
+func localPort() (int, error) {
93
+	l, err := net.Listen("tcp", ":0")
94
+	if err != nil {
95
+		glog.V(1).Infof("Could not determine local free port: %v", err)
96
+		return 0, err
97
+	}
98
+	defer l.Close()
99
+	glog.V(1).Infof("Found listener port at: %s", l.Addr().String())
100
+	addr := strings.Split(l.Addr().String(), ":")
101
+	port, err := strconv.Atoi(addr[len(addr)-1])
102
+	if err != nil {
103
+		glog.V(1).Infof("Could not parse listener address %#v: %v", addr, err)
104
+		return 0, err
105
+	}
106
+	return port, nil
107
+}
108
+
109
+func (s *rsyncDaemonStrategy) getFreePort() (int, error) {
110
+	cmd := []string{"cat", "/proc/net/tcp", "/proc/net/tcp6"}
111
+	tcpData := &bytes.Buffer{}
112
+	cmdErr := &bytes.Buffer{}
113
+	usedPorts := map[int]struct{}{}
114
+	err := s.RemoteExecutor.Execute(cmd, nil, tcpData, cmdErr)
115
+	if err == nil {
116
+		usedPorts = getUsedPorts(tcpData.String())
117
+	} else {
118
+		glog.V(4).Infof("Error getting free port data: %v, Err: %s", err, cmdErr.String())
119
+	}
120
+	tries := 0
121
+	for {
122
+		tries++
123
+		if tries > 20 {
124
+			glog.V(4).Infof("Too many attempts trying to find free port")
125
+			break
126
+		}
127
+		port := randomPort()
128
+		if _, used := usedPorts[port]; !used {
129
+			glog.V(4).Infof("Found free container port: %d", port)
130
+			return port, nil
131
+		}
132
+	}
133
+	return 0, fmt.Errorf("could not find a free port")
134
+
135
+}
136
+
137
+func (s *rsyncDaemonStrategy) startRemoteDaemon() error {
138
+	port, err := s.getFreePort()
139
+	if err != nil {
140
+		return err
141
+	}
142
+	cmdOut := &bytes.Buffer{}
143
+	cmdErr := &bytes.Buffer{}
144
+	cmdIn := bytes.NewBufferString(fmt.Sprintf(startDaemonScript, krand.String(32), krand.String(32), port))
145
+	err = s.RemoteExecutor.Execute([]string{"sh"}, cmdIn, cmdOut, cmdErr)
146
+	if err != nil {
147
+		glog.V(1).Infof("Error starting rsync daemon: %v. Out: %s, Err: %s\n", err, cmdOut.String(), cmdErr.String())
148
+		return err
149
+	}
150
+	s.daemonPort = port
151
+	s.daemonPIDFile = strings.TrimSpace(cmdOut.String())
152
+	return nil
153
+}
154
+
155
+func (s *rsyncDaemonStrategy) killRemoteDaemon() error {
156
+	cmd := []string{"sh", "-c", fmt.Sprintf("kill $(cat %s)", s.daemonPIDFile)}
157
+	cmdOut := &bytes.Buffer{}
158
+	cmdErr := &bytes.Buffer{}
159
+	err := s.RemoteExecutor.Execute(cmd, nil, cmdOut, cmdErr)
160
+	if err != nil {
161
+		glog.V(1).Infof("Error killing rsync daemon: %v. Out: %s, Err: %s\n", err, cmdOut.String(), cmdErr.String())
162
+	}
163
+	return err
164
+}
165
+
166
+func (s *rsyncDaemonStrategy) startPortForward() error {
167
+	var err error
168
+	s.localPort, err = localPort()
169
+	if err != nil {
170
+		// Attempt with a random port if other methods fail
171
+		s.localPort = randomPort()
172
+	}
173
+	s.portForwardChan = make(chan struct{})
174
+	return s.PortForwarder.ForwardPorts([]string{fmt.Sprintf("%d:%d", s.localPort, s.daemonPort)}, s.portForwardChan)
175
+}
176
+
177
+func (s *rsyncDaemonStrategy) stopPortForward() {
178
+	close(s.portForwardChan)
179
+}
180
+
181
+func (s *rsyncDaemonStrategy) copyUsingDaemon(source, destination *pathSpec, out, errOut io.Writer) error {
182
+	glog.V(3).Infof("Copying files with rsync daemon")
183
+	cmd := append([]string{"rsync"}, s.Flags...)
184
+	var sourceArg, destinationArg string
185
+	if source.Local() {
186
+		sourceArg = source.RsyncPath()
187
+	} else {
188
+		sourceArg = localRsyncURL(s.localPort, remoteLabel, source.Path)
189
+	}
190
+	if destination.Local() {
191
+		destinationArg = destination.RsyncPath()
192
+	} else {
193
+		destinationArg = localRsyncURL(s.localPort, remoteLabel, destination.Path)
194
+	}
195
+	cmd = append(cmd, sourceArg, destinationArg)
196
+	err := s.LocalExecutor.Execute(cmd, nil, out, errOut)
197
+	if err != nil {
198
+		// Determine whether rsync is present in the pod container
199
+		testRsyncErr := executeWithLogging(s.RemoteExecutor, testRsyncCommand)
200
+		if testRsyncErr != nil {
201
+			return strategySetupError("rsync not available in container")
202
+		}
203
+	}
204
+	return err
205
+}
206
+
207
+func (s *rsyncDaemonStrategy) Copy(source, destination *pathSpec, out, errOut io.Writer) error {
208
+	err := s.startRemoteDaemon()
209
+	if err != nil {
210
+		if isExitError(err) {
211
+			return strategySetupError(fmt.Sprintf("cannot start remote rsync daemon: %v", err))
212
+		} else {
213
+			return err
214
+		}
215
+	}
216
+	defer s.killRemoteDaemon()
217
+	err = s.startPortForward()
218
+	if err != nil {
219
+		if isExitError(err) {
220
+			return strategySetupError(fmt.Sprintf("cannot start port-forward: %v", err))
221
+		} else {
222
+			return err
223
+		}
224
+	}
225
+	defer s.stopPortForward()
226
+
227
+	err = s.copyUsingDaemon(source, destination, out, errOut)
228
+	return err
229
+}
230
+
231
+func (s *rsyncDaemonStrategy) Validate() error {
232
+	errs := []error{}
233
+	if s.PortForwarder == nil {
234
+		errs = append(errs, errors.New("port forwarder must be provided"))
235
+	}
236
+	if s.LocalExecutor == nil {
237
+		errs = append(errs, errors.New("local executor must be provided"))
238
+	}
239
+	if s.RemoteExecutor == nil {
240
+		errs = append(errs, errors.New("remote executor must be provided"))
241
+	}
242
+	if len(errs) > 0 {
243
+		return kerrors.NewAggregate(errs)
244
+	}
245
+	return nil
246
+}
247
+
248
+func newRsyncDaemonStrategy(f *clientcmd.Factory, c *cobra.Command, o *RsyncOptions) (copyStrategy, error) {
249
+	// TODO: Expose more flags to send to the rsync command
250
+	// either as a special argument or any unrecognized arguments.
251
+	flags := []string{"-a", "--omit-dir-times", "--numeric-ids"}
252
+	if o.Quiet {
253
+		flags = append(flags, "-q")
254
+	} else {
255
+		flags = append(flags, "-v")
256
+	}
257
+	if o.Delete {
258
+		flags = append(flags, "--delete")
259
+	}
260
+
261
+	remoteExec, err := newRemoteExecutor(f, o)
262
+	if err != nil {
263
+		return nil, err
264
+	}
265
+
266
+	forwarder, err := newPortForwarder(f, o)
267
+	if err != nil {
268
+		return nil, err
269
+	}
270
+
271
+	return &rsyncDaemonStrategy{
272
+		Flags:          flags,
273
+		RemoteExecutor: remoteExec,
274
+		LocalExecutor:  newLocalExecutor(),
275
+		PortForwarder:  forwarder,
276
+	}, nil
277
+}
278
+
279
+func (s *rsyncDaemonStrategy) String() string {
280
+	return "rsync-daemon"
281
+}
0 282
new file mode 100644
... ...
@@ -0,0 +1,177 @@
0
+package rsync
1
+
2
+import (
3
+	"errors"
4
+	"fmt"
5
+	"io"
6
+	"io/ioutil"
7
+	"os"
8
+	"path/filepath"
9
+	"strings"
10
+
11
+	"github.com/golang/glog"
12
+	"github.com/openshift/source-to-image/pkg/tar"
13
+	"github.com/spf13/cobra"
14
+	kerrors "k8s.io/kubernetes/pkg/util/errors"
15
+
16
+	"github.com/openshift/origin/pkg/cmd/util/clientcmd"
17
+)
18
+
19
+type tarStrategy struct {
20
+	Quiet          bool
21
+	Delete         bool
22
+	Tar            tar.Tar
23
+	RemoteExecutor executor
24
+}
25
+
26
+func newTarStrategy(f *clientcmd.Factory, c *cobra.Command, o *RsyncOptions) (copyStrategy, error) {
27
+
28
+	tarHelper := tar.New()
29
+	tarHelper.SetExclusionPattern(nil)
30
+
31
+	remoteExec, err := newRemoteExecutor(f, o)
32
+	if err != nil {
33
+		return nil, err
34
+	}
35
+
36
+	return &tarStrategy{
37
+		Quiet:          o.Quiet,
38
+		Delete:         o.Delete,
39
+		Tar:            tarHelper,
40
+		RemoteExecutor: remoteExec,
41
+	}, nil
42
+}
43
+
44
+func deleteContents(dir string) error {
45
+	files, err := ioutil.ReadDir(dir)
46
+	if err != nil {
47
+		return err
48
+	}
49
+	for _, f := range files {
50
+		if f.IsDir() {
51
+			err = os.RemoveAll(f.Name())
52
+		} else {
53
+			err = os.Remove(f.Name())
54
+		}
55
+		if err != nil {
56
+			return err
57
+		}
58
+	}
59
+	return nil
60
+
61
+}
62
+
63
+func deleteFiles(spec *pathSpec, remoteExecutor executor) error {
64
+	if spec.Local() {
65
+		return deleteContents(spec.Path)
66
+	}
67
+	deleteCmd := []string{"sh", "-c", fmt.Sprintf("rm -rf %s", filepath.Join(spec.Path, "*"))}
68
+	return executeWithLogging(remoteExecutor, deleteCmd)
69
+}
70
+
71
+func (r *tarStrategy) Copy(source, destination *pathSpec, out, errOut io.Writer) error {
72
+
73
+	glog.V(3).Infof("Copying files with tar")
74
+	if r.Delete {
75
+		// Implement the rsync --delete flag as a separate call to first delete directory contents
76
+		err := deleteFiles(destination, r.RemoteExecutor)
77
+		if err != nil {
78
+			return fmt.Errorf("unable to delete files in destination: %v", err)
79
+		}
80
+	}
81
+	tmp, err := ioutil.TempFile("", "rsync")
82
+	if err != nil {
83
+		return fmt.Errorf("cannot create local temporary file for tar: %v", err)
84
+	}
85
+	defer os.Remove(tmp.Name())
86
+
87
+	// Create tar
88
+	if source.Local() {
89
+		glog.V(4).Infof("Creating local tar file %s from local path %s", tmp.Name(), source.Path)
90
+		err = tarLocal(r.Tar, source.Path, tmp)
91
+		if err != nil {
92
+			return fmt.Errorf("error creating local tar of source directory: %v", err)
93
+		}
94
+	} else {
95
+		glog.V(4).Infof("Creating local tar file %s from remote path %s", tmp.Name(), source.Path)
96
+		err = tarRemote(r.RemoteExecutor, source.Path, tmp, errOut)
97
+		if err != nil {
98
+			return fmt.Errorf("error creating remote tar of source directory: %v", err)
99
+		}
100
+	}
101
+
102
+	err = tmp.Close()
103
+	if err != nil {
104
+		return fmt.Errorf("error closing temporary tar file %s: %v", tmp.Name(), err)
105
+	}
106
+	tmp, err = os.Open(tmp.Name())
107
+	if err != nil {
108
+		return fmt.Errorf("cannot open temporary tar file %s: %v", tmp.Name(), err)
109
+	}
110
+	defer tmp.Close()
111
+
112
+	// Extract tar
113
+	if destination.Local() {
114
+		glog.V(4).Infof("Untarring temp file %s to local directory %s", tmp.Name(), destination.Path)
115
+		err = untarLocal(r.Tar, destination.Path, tmp)
116
+	} else {
117
+		glog.V(4).Infof("Untarring temp file %s to remote directory %s", tmp.Name(), destination.Path)
118
+		err = untarRemote(r.RemoteExecutor, destination.Path, r.Quiet, tmp, out, errOut)
119
+	}
120
+	if err != nil {
121
+		return fmt.Errorf("error extracting tar at destination directory: %v", err)
122
+	}
123
+	return nil
124
+}
125
+
126
+func (r *tarStrategy) Validate() error {
127
+	errs := []error{}
128
+	if r.Tar == nil {
129
+		errs = append(errs, errors.New("tar helper must be provided"))
130
+	}
131
+	if r.RemoteExecutor == nil {
132
+		errs = append(errs, errors.New("remote executor must be provided"))
133
+	}
134
+	if len(errs) > 0 {
135
+		return kerrors.NewAggregate(errs)
136
+	}
137
+	return nil
138
+}
139
+
140
+func (r *tarStrategy) String() string {
141
+	return "tar"
142
+}
143
+
144
+func tarRemote(exec executor, sourceDir string, out, errOut io.Writer) error {
145
+	glog.V(4).Infof("Tarring %s remotely", sourceDir)
146
+	cmd := []string{"tar", "-C", sourceDir, "-c", "."}
147
+	glog.V(4).Infof("Remote tar command: %s", strings.Join(cmd, " "))
148
+	return exec.Execute(cmd, nil, out, errOut)
149
+}
150
+
151
+func tarLocal(tar tar.Tar, sourceDir string, w io.Writer) error {
152
+	glog.V(4).Infof("Tarring %s locally", sourceDir)
153
+	// includeParent mimics rsync's behavior. When the source path ends in a path
154
+	// separator, then only the contents of the directory are copied. Otherwise,
155
+	// the directory itself is copied.
156
+	includeParent := true
157
+	if strings.HasSuffix(sourceDir, string(filepath.Separator)) {
158
+		includeParent = false
159
+		sourceDir = sourceDir[:len(sourceDir)-1]
160
+	}
161
+	return tar.CreateTarStream(sourceDir, includeParent, w)
162
+}
163
+
164
+func untarLocal(tar tar.Tar, destinationDir string, r io.Reader) error {
165
+	glog.V(4).Infof("Extracting tar locally to %s", destinationDir)
166
+	return tar.ExtractTarStream(destinationDir, r)
167
+}
168
+
169
+func untarRemote(exec executor, destinationDir string, quiet bool, in io.Reader, out, errOut io.Writer) error {
170
+	cmd := []string{"tar", "-C", destinationDir, "-x"}
171
+	if !quiet {
172
+		cmd = append(cmd, "-v")
173
+	}
174
+	glog.V(4).Infof("Extracting tar remotely with command: %s", strings.Join(cmd, " "))
175
+	return exec.Execute(cmd, in, out, errOut)
176
+}
0 177
new file mode 100644
... ...
@@ -0,0 +1,34 @@
0
+package rsync
1
+
2
+import (
3
+	"io"
4
+	"os/exec"
5
+	"strings"
6
+
7
+	"github.com/golang/glog"
8
+)
9
+
10
+// localExecutor will execute commands on the local machine
11
+type localExecutor struct{}
12
+
13
+// ensure localExecutor implements the executor interface
14
+var _ executor = &localExecutor{}
15
+
16
+// Execute will run a command locally
17
+func (*localExecutor) Execute(command []string, in io.Reader, out, errOut io.Writer) error {
18
+	glog.V(3).Infof("Local executor running command: %s", strings.Join(command, " "))
19
+	cmd := exec.Command(command[0], command[1:]...)
20
+	cmd.Stdout = out
21
+	cmd.Stderr = errOut
22
+	cmd.Stdin = in
23
+	err := cmd.Run()
24
+	if err != nil {
25
+		glog.V(4).Infof("Error from local command execution: %v", err)
26
+	}
27
+	return err
28
+}
29
+
30
+// newLocalExecutor instantiates a local executor
31
+func newLocalExecutor() executor {
32
+	return &localExecutor{}
33
+}
0 34
new file mode 100644
... ...
@@ -0,0 +1,72 @@
0
+package rsync
1
+
2
+import (
3
+	"io"
4
+	"strings"
5
+
6
+	"github.com/golang/glog"
7
+	kclient "k8s.io/kubernetes/pkg/client/unversioned"
8
+	kubecmd "k8s.io/kubernetes/pkg/kubectl/cmd"
9
+
10
+	"github.com/openshift/origin/pkg/cmd/util/clientcmd"
11
+)
12
+
13
+// remoteExecutor will execute commands on a given pod/container by using the kube Exec command
14
+type remoteExecutor struct {
15
+	Namespace     string
16
+	PodName       string
17
+	ContainerName string
18
+	Client        *kclient.Client
19
+	Config        *kclient.Config
20
+}
21
+
22
+// Ensure it implements the executor interface
23
+var _ executor = &remoteExecutor{}
24
+
25
+// Execute will run a command in a pod
26
+func (e *remoteExecutor) Execute(command []string, in io.Reader, out, errOut io.Writer) error {
27
+	glog.V(3).Infof("Remote executor running command: %s", strings.Join(command, " "))
28
+	execOptions := &kubecmd.ExecOptions{
29
+		In:            in,
30
+		Out:           out,
31
+		Err:           errOut,
32
+		Stdin:         in != nil,
33
+		Executor:      &kubecmd.DefaultRemoteExecutor{},
34
+		Client:        e.Client,
35
+		Config:        e.Config,
36
+		PodName:       e.PodName,
37
+		ContainerName: e.ContainerName,
38
+		Namespace:     e.Namespace,
39
+		Command:       command,
40
+	}
41
+	err := execOptions.Validate()
42
+	if err != nil {
43
+		glog.V(4).Infof("Error from remote command validation: %v", err)
44
+		return err
45
+	}
46
+	err = execOptions.Run()
47
+	if err != nil {
48
+		glog.V(4).Infof("Error from remote execution: %v", err)
49
+	}
50
+	return err
51
+}
52
+
53
+func newRemoteExecutor(f *clientcmd.Factory, o *RsyncOptions) (executor, error) {
54
+	config, err := f.ClientConfig()
55
+	if err != nil {
56
+		return nil, err
57
+	}
58
+
59
+	client, err := f.Client()
60
+	if err != nil {
61
+		return nil, err
62
+	}
63
+
64
+	return &remoteExecutor{
65
+		Namespace:     o.Namespace,
66
+		PodName:       o.PodName(),
67
+		ContainerName: o.ContainerName,
68
+		Config:        config,
69
+		Client:        client,
70
+	}, nil
71
+}
0 72
new file mode 100644
... ...
@@ -0,0 +1,67 @@
0
+package rsync
1
+
2
+import (
3
+	kclient "k8s.io/kubernetes/pkg/client/unversioned"
4
+	"k8s.io/kubernetes/pkg/client/unversioned/portforward"
5
+	"k8s.io/kubernetes/pkg/client/unversioned/remotecommand"
6
+
7
+	"github.com/openshift/origin/pkg/cmd/util/clientcmd"
8
+)
9
+
10
+// portForwarder starts port forwarding to a given pod
11
+type portForwarder struct {
12
+	Namespace string
13
+	PodName   string
14
+	Client    *kclient.Client
15
+	Config    *kclient.Config
16
+}
17
+
18
+// ensure that portForwarder implements the forwarder interface
19
+var _ forwarder = &portForwarder{}
20
+
21
+// ForwardPorts will forward a set of ports from a pod, the stopChan will stop the forwarding
22
+// when it's closed or receives a struct{}
23
+func (f *portForwarder) ForwardPorts(ports []string, stopChan <-chan struct{}) error {
24
+	req := f.Client.RESTClient.Post().
25
+		Resource("pods").
26
+		Namespace(f.Namespace).
27
+		Name(f.PodName).
28
+		SubResource("portforward")
29
+
30
+	dialer, err := remotecommand.NewExecutor(f.Config, "POST", req.URL())
31
+	if err != nil {
32
+		return err
33
+	}
34
+	fw, err := portforward.New(dialer, ports, stopChan)
35
+	if err != nil {
36
+		return err
37
+	}
38
+	ready := make(chan struct{})
39
+	errChan := make(chan error)
40
+	fw.Ready = ready
41
+	go func() { errChan <- fw.ForwardPorts() }()
42
+	select {
43
+	case <-ready:
44
+		return nil
45
+	case err = <-errChan:
46
+		return err
47
+	}
48
+}
49
+
50
+// newPortForwarder creates a new forwarder for use with rsync
51
+func newPortForwarder(f *clientcmd.Factory, o *RsyncOptions) (forwarder, error) {
52
+	client, err := f.Client()
53
+	if err != nil {
54
+		return nil, err
55
+	}
56
+	config, err := f.ClientConfig()
57
+	if err != nil {
58
+		return nil, err
59
+	}
60
+	return &portForwarder{
61
+		Namespace: o.Namespace,
62
+		PodName:   o.PodName(),
63
+		Client:    client,
64
+		Config:    config,
65
+	}, nil
66
+}
0 67
new file mode 100644
... ...
@@ -0,0 +1,87 @@
0
+package rsync
1
+
2
+import (
3
+	"fmt"
4
+	"os"
5
+	"path/filepath"
6
+	"strings"
7
+
8
+	kvalidation "k8s.io/kubernetes/pkg/api/validation"
9
+)
10
+
11
+// pathSpec represents a path (remote or local) given as a source or destination
12
+// argument to the rsync command
13
+type pathSpec struct {
14
+	PodName string
15
+	Path    string
16
+}
17
+
18
+// Local returns true if the path is a local machine path
19
+func (s *pathSpec) Local() bool {
20
+	return len(s.PodName) == 0
21
+}
22
+
23
+// RsyncPath returns a pathSpec in the form that can be used directly by the OS rsync command
24
+func (s *pathSpec) RsyncPath() string {
25
+	if len(s.PodName) > 0 {
26
+		return fmt.Sprintf("%s:%s", s.PodName, s.Path)
27
+	}
28
+	if isWindows() {
29
+		return convertWindowsPath(s.Path)
30
+	}
31
+	return s.Path
32
+}
33
+
34
+// Validate returns an error if the pathSpec is not valid.
35
+func (s *pathSpec) Validate() error {
36
+	if s.Local() {
37
+		info, err := os.Stat(s.Path)
38
+		if err != nil {
39
+			return fmt.Errorf("invalid path %s: %v", s.Path, err)
40
+		}
41
+		if !info.IsDir() {
42
+			return fmt.Errorf("path %s must point to a directory", s.Path)
43
+		}
44
+	}
45
+	return nil
46
+}
47
+
48
+// parsePathSpec parses a string argument into a pathSpec object
49
+func parsePathSpec(path string) (*pathSpec, error) {
50
+	parts := strings.SplitN(path, ":", 2)
51
+	if len(parts) == 1 || (isWindows() && len(parts[0]) == 1) {
52
+		return &pathSpec{
53
+			Path: path,
54
+		}, nil
55
+	}
56
+	valid, msg := kvalidation.ValidatePodName(parts[0], false)
57
+	if !valid {
58
+		return nil, fmt.Errorf("invalid pod name %s: %s", parts[0], msg)
59
+	}
60
+	return &pathSpec{
61
+		PodName: parts[0],
62
+		Path:    parts[1],
63
+	}, nil
64
+}
65
+
66
+// convertWindowsPath converts a windows native path to a path that can be used by
67
+// the rsync command in windows.
68
+// It can take one of three forms:
69
+// 1 - relative to current dir or relative to current drive
70
+//     \mydir\subdir or subdir
71
+//     For these, it's only sufficient to change '\' to '/'
72
+// 2 - absolute path with drive
73
+//     d:\mydir\subdir
74
+//     These need to be converted to /cygdrive/<drive-letter>/rest/of/path
75
+// 3 - UNC path
76
+//     \\server\c$\mydir\subdir
77
+//     For these it should be sufficient to change '\' to '/'
78
+func convertWindowsPath(path string) string {
79
+	// If the path starts with a single letter followed by a ":", it needs to
80
+	// be converted /cygwin/<drive>/path form
81
+	parts := strings.SplitN(path, ":", 2)
82
+	if len(parts) > 1 && len(parts[0]) == 1 {
83
+		return fmt.Sprintf("/cygdrive/%s/%s", strings.ToLower(parts[0]), strings.TrimPrefix(filepath.ToSlash(parts[1]), "/"))
84
+	}
85
+	return filepath.ToSlash(path)
86
+}
0 87
new file mode 100644
... ...
@@ -0,0 +1,226 @@
0
+package rsync
1
+
2
+import (
3
+	"errors"
4
+	"fmt"
5
+	"io"
6
+
7
+	"github.com/golang/glog"
8
+	"github.com/spf13/cobra"
9
+	kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
10
+
11
+	"github.com/openshift/origin/pkg/cmd/util/clientcmd"
12
+)
13
+
14
+const (
15
+	// RsyncRecommendedName is the recommended name for the rsync command
16
+	RsyncRecommendedName = "rsync"
17
+
18
+	rsyncLong = `
19
+Copy local files to or from a pod container
20
+
21
+This command will copy local files to or from a remote container.
22
+It only copies the changed files using the rsync command from your OS.
23
+To ensure optimum performance, install rsync locally. In UNIX systems, 
24
+use your package manager. In Windows, install cwRsync from 
25
+https://www.itefix.net/cwrsync.
26
+
27
+If no container is specified, the first container of the pod is used 
28
+for the copy.`
29
+
30
+	rsyncExample = `
31
+  # Synchronize a local directory with a pod directory
32
+  $ %[1]s ./local/dir/ POD:/remote/dir
33
+  
34
+  # Synchronize a pod directory with a local directory
35
+  $ %[1]s POD:/remote/dir/ ./local/dir`
36
+
37
+	noRsyncUnixWarning    = "rsync command not found in path. Please use your package manager to install it."
38
+	noRsyncWindowsWarning = "rsync command not found in path. Download cwRsync for windows and add it to your path."
39
+)
40
+
41
+// copyStrategy
42
+type copyStrategy interface {
43
+	Copy(source, destination *pathSpec, out, errOut io.Writer) error
44
+	Validate() error
45
+	String() string
46
+}
47
+
48
+// executor executes commands
49
+type executor interface {
50
+	Execute(command []string, in io.Reader, out, err io.Writer) error
51
+}
52
+
53
+// forwarder forwards pod ports to the local machine
54
+type forwarder interface {
55
+	ForwardPorts(ports []string, stopChan <-chan struct{}) error
56
+}
57
+
58
+// RsyncOptions holds the options to execute the sync command
59
+type RsyncOptions struct {
60
+	Namespace     string
61
+	ContainerName string
62
+	Source        *pathSpec
63
+	Destination   *pathSpec
64
+	Strategy      copyStrategy
65
+	StrategyName  string
66
+	Quiet         bool
67
+	Delete        bool
68
+
69
+	Out    io.Writer
70
+	ErrOut io.Writer
71
+}
72
+
73
+// NewCmdRsync creates a new sync command
74
+func NewCmdRsync(name, parent string, f *clientcmd.Factory, out, errOut io.Writer) *cobra.Command {
75
+	o := RsyncOptions{
76
+		Out:    out,
77
+		ErrOut: errOut,
78
+	}
79
+	cmd := &cobra.Command{
80
+		Use:     fmt.Sprintf("%s SOURCE DESTINATION", name),
81
+		Short:   "Copy files between local filesystem and a pod",
82
+		Long:    rsyncLong,
83
+		Example: fmt.Sprintf(rsyncExample, parent+" "+name),
84
+		Run: func(c *cobra.Command, args []string) {
85
+			kcmdutil.CheckErr(o.Complete(f, c, args))
86
+			kcmdutil.CheckErr(o.Validate())
87
+			kcmdutil.CheckErr(o.RunRsync())
88
+		},
89
+	}
90
+
91
+	cmd.Flags().StringVarP(&o.ContainerName, "container", "c", "", "Container within the pod")
92
+	cmd.Flags().StringVar(&o.StrategyName, "strategy", "", "Specify which strategy to use for copy: rsync, rsync-daemon, or tar")
93
+
94
+	// Flags for rsync options, Must match rsync flag names
95
+	cmd.Flags().BoolVarP(&o.Quiet, "quiet", "q", false, "Suppress non-error messages")
96
+	cmd.Flags().BoolVar(&o.Delete, "delete", false, "Delete files not present in source")
97
+	return cmd
98
+}
99
+
100
+func warnNoRsync() {
101
+	if isWindows() {
102
+		glog.Warningf(noRsyncWindowsWarning)
103
+		return
104
+	}
105
+	glog.Warningf(noRsyncUnixWarning)
106
+}
107
+
108
+func (o *RsyncOptions) determineStrategy(f *clientcmd.Factory, cmd *cobra.Command, name string) (copyStrategy, error) {
109
+	switch name {
110
+	case "":
111
+		// Default case, use an rsync strategy first and then fallback to Tar
112
+		strategies := copyStrategies{}
113
+		if hasLocalRsync() {
114
+			if isWindows() {
115
+				strategy, err := newRsyncDaemonStrategy(f, cmd, o)
116
+				if err != nil {
117
+					return nil, err
118
+				}
119
+				strategies = append(strategies, strategy)
120
+			} else {
121
+				strategy, err := newRsyncStrategy(f, cmd, o)
122
+				if err != nil {
123
+					return nil, err
124
+				}
125
+				strategies = append(strategies, strategy)
126
+			}
127
+		} else {
128
+			warnNoRsync()
129
+		}
130
+		strategy, err := newTarStrategy(f, cmd, o)
131
+		if err != nil {
132
+			return nil, err
133
+		}
134
+		strategies = append(strategies, strategy)
135
+		return strategies, nil
136
+	case "rsync":
137
+		return newRsyncStrategy(f, cmd, o)
138
+
139
+	case "rsync-daemon":
140
+		return newRsyncDaemonStrategy(f, cmd, o)
141
+
142
+	case "tar":
143
+		return newTarStrategy(f, cmd, o)
144
+
145
+	default:
146
+		return nil, fmt.Errorf("unknown strategy: %s", name)
147
+	}
148
+}
149
+
150
+// Complete verifies command line arguments and loads data from the command environment
151
+func (o *RsyncOptions) Complete(f *clientcmd.Factory, cmd *cobra.Command, args []string) error {
152
+	switch n := len(args); {
153
+	case n == 0:
154
+		cmd.Help()
155
+		fallthrough
156
+	case n < 2:
157
+		return kcmdutil.UsageError(cmd, "SOURCE_DIR and POD:DESTINATION_DIR are required arguments")
158
+	case n > 2:
159
+		return kcmdutil.UsageError(cmd, "only SOURCE_DIR and POD:DESTINATION_DIR should be specified as arguments")
160
+	}
161
+
162
+	// Set main command arguments
163
+	var err error
164
+	o.Source, err = parsePathSpec(args[0])
165
+	if err != nil {
166
+		return err
167
+	}
168
+	o.Destination, err = parsePathSpec(args[1])
169
+	if err != nil {
170
+		return err
171
+	}
172
+
173
+	namespace, _, err := f.DefaultNamespace()
174
+	if err != nil {
175
+		return err
176
+	}
177
+	o.Namespace = namespace
178
+
179
+	o.Strategy, err = o.determineStrategy(f, cmd, o.StrategyName)
180
+	if err != nil {
181
+		return err
182
+	}
183
+
184
+	return nil
185
+}
186
+
187
+// Validate checks that SyncOptions has all necessary fields
188
+func (o *RsyncOptions) Validate() error {
189
+	if o.Out == nil || o.ErrOut == nil {
190
+		return errors.New("output and error streams must be specified")
191
+	}
192
+	if o.Source == nil || o.Destination == nil {
193
+		return errors.New("source and destination must be specified")
194
+	}
195
+	if err := o.Source.Validate(); err != nil {
196
+		return err
197
+	}
198
+	if err := o.Destination.Validate(); err != nil {
199
+		return err
200
+	}
201
+	// If source and destination are both local or both remote throw an error
202
+	if o.Source.Local() == o.Destination.Local() {
203
+		return errors.New("rsync is only valid between a local directory and a pod directory; " +
204
+			"specify a pod directory as [PODNAME]:[DIR]")
205
+	}
206
+	if err := o.Strategy.Validate(); err != nil {
207
+		return err
208
+	}
209
+
210
+	return nil
211
+}
212
+
213
+// RunRsync copies files from source to destination
214
+func (o *RsyncOptions) RunRsync() error {
215
+	return o.Strategy.Copy(o.Source, o.Destination, o.Out, o.ErrOut)
216
+}
217
+
218
+// PodName returns the name of the pod as specified in either the
219
+// the source or destination arguments
220
+func (o *RsyncOptions) PodName() string {
221
+	if len(o.Source.PodName) > 0 {
222
+		return o.Source.PodName
223
+	}
224
+	return o.Destination.PodName
225
+}
0 226
new file mode 100644
... ...
@@ -0,0 +1,64 @@
0
+package rsync
1
+
2
+import (
3
+	"bytes"
4
+	"os"
5
+	"os/exec"
6
+	"runtime"
7
+	"strings"
8
+
9
+	"github.com/golang/glog"
10
+	"github.com/spf13/cobra"
11
+)
12
+
13
+// executeWithLogging will execute a command and log its output
14
+func executeWithLogging(e executor, cmd []string) error {
15
+	w := &bytes.Buffer{}
16
+	err := e.Execute(cmd, nil, w, w)
17
+	glog.V(4).Infof("%s", w.String())
18
+	glog.V(4).Infof("error: %v", err)
19
+	return err
20
+}
21
+
22
+// isWindows returns true if the current platform is windows
23
+func isWindows() bool {
24
+	return runtime.GOOS == "windows"
25
+}
26
+
27
+// hasLocalRsync returns true if rsync is in current exec path
28
+func hasLocalRsync() bool {
29
+	_, err := exec.LookPath("rsync")
30
+	if err != nil {
31
+		return false
32
+	}
33
+	return true
34
+}
35
+
36
+// siblingCommand returns a sibling command to the current command
37
+func siblingCommand(cmd *cobra.Command, name string) string {
38
+	c := cmd.Parent()
39
+	command := []string{}
40
+	for c != nil {
41
+		glog.V(5).Infof("Found parent command: %s", c.Name())
42
+		command = append([]string{c.Name()}, command...)
43
+		c = c.Parent()
44
+	}
45
+	// Replace the root command with what was actually used
46
+	// in the command line
47
+	glog.V(4).Infof("Setting root command to: %s", os.Args[0])
48
+	command[0] = os.Args[0]
49
+
50
+	// Append the sibling command
51
+	command = append(command, name)
52
+	glog.V(4).Infof("The sibling command is: %s", strings.Join(command, " "))
53
+
54
+	return strings.Join(command, " ")
55
+}
56
+
57
+func isExitError(err error) bool {
58
+	if err == nil {
59
+		return false
60
+	}
61
+	_, exitErr := err.(*exec.ExitError)
62
+	return exitErr
63
+}
... ...
@@ -2,6 +2,8 @@ package cli
2 2
 
3 3
 import (
4 4
 	"fmt"
5
+	"io/ioutil"
6
+	"strings"
5 7
 
6 8
 	g "github.com/onsi/ginkgo"
7 9
 	o "github.com/onsi/gomega"
... ...
@@ -11,7 +13,7 @@ import (
11 11
 	exutil "github.com/openshift/origin/test/extended/util"
12 12
 )
13 13
 
14
-var _ = g.Describe("cli: parallel: rsync", func() {
14
+var _ = g.Describe("cli: parallel: oc rsync", func() {
15 15
 	defer g.GinkgoRecover()
16 16
 
17 17
 	var (
... ...
@@ -19,10 +21,13 @@ var _ = g.Describe("cli: parallel: rsync", func() {
19 19
 		templatePath = exutil.FixturePath("..", "..", "examples", "jenkins", "jenkins-ephemeral-template.json")
20 20
 		sourcePath1  = exutil.FixturePath("..", "..", "examples", "image-streams")
21 21
 		sourcePath2  = exutil.FixturePath("..", "..", "examples", "sample-app")
22
+		strategies   = []string{"rsync", "rsync-daemon", "tar"}
22 23
 	)
23 24
 
24
-	g.Describe("oc rsync", func() {
25
-		g.It("should copy files with rsync and tar to running container", func() {
25
+	g.Describe("copy by strategy", func() {
26
+		var podName string
27
+
28
+		g.JustBeforeEach(func() {
26 29
 			oc.SetOutputDir(exutil.TestContext.OutputDir)
27 30
 
28 31
 			g.By(fmt.Sprintf("calling oc new-app -f %q", templatePath))
... ...
@@ -38,31 +43,70 @@ var _ = g.Describe("cli: parallel: rsync", func() {
38 38
 			pods, err := oc.KubeREST().Pods(oc.Namespace()).List(selector, fields.Everything())
39 39
 			o.Expect(err).NotTo(o.HaveOccurred())
40 40
 			o.Expect(len(pods.Items)).ToNot(o.BeZero())
41
-			podName := pods.Items[0].Name
41
+			podName = pods.Items[0].Name
42
+		})
42 43
 
43
-			g.By(fmt.Sprintf("calling oc rsync %s %s:/tmp", sourcePath1, podName))
44
-			err = oc.Run("rsync").Args(sourcePath1, fmt.Sprintf("%s:/tmp", podName)).Execute()
45
-			o.Expect(err).NotTo(o.HaveOccurred())
44
+		testRsyncFunc := func(strategy string) func() {
45
+			return func() {
46
+				g.By(fmt.Sprintf("Calling oc rsync %s %s:/tmp --strategy=%s", sourcePath1, podName, strategy))
47
+				err := oc.Run("rsync").Args(
48
+					sourcePath1,
49
+					fmt.Sprintf("%s:/tmp", podName),
50
+					fmt.Sprintf("--strategy=%s", strategy)).Execute()
51
+				o.Expect(err).NotTo(o.HaveOccurred())
46 52
 
47
-			g.By("Verifying that files are copied to the container")
48
-			result, err := oc.Run("rsh").Args(podName, "ls", "/tmp/image-streams").Output()
49
-			o.Expect(err).NotTo(o.HaveOccurred())
50
-			o.Expect(result).To(o.ContainSubstring("image-streams-centos7.json"))
53
+				g.By("Verifying that files are copied to the container")
54
+				result, err := oc.Run("rsh").Args(podName, "ls", "/tmp/image-streams").Output()
55
+				o.Expect(err).NotTo(o.HaveOccurred())
56
+				o.Expect(result).To(o.ContainSubstring("image-streams-centos7.json"))
51 57
 
52
-			g.By(fmt.Sprintf("calling oc rsync --use-tar --delete %s/ %s:/tmp/image-streams", sourcePath2, podName))
53
-			err = oc.Run("rsync").Args("--use-tar", "--delete", sourcePath2+"/", fmt.Sprintf("%s:/tmp/image-streams", podName)).Execute()
54
-			o.Expect(err).NotTo(o.HaveOccurred())
58
+				g.By(fmt.Sprintf("Calling oc rsync %s/ %s:/tmp/image-streams --strategy=%s --delete", sourcePath2, podName, strategy))
59
+				err = oc.Run("rsync").Args(
60
+					sourcePath2+"/",
61
+					fmt.Sprintf("%s:/tmp/image-streams", podName),
62
+					fmt.Sprintf("--strategy=%s", strategy),
63
+					"--delete").Execute()
64
+				o.Expect(err).NotTo(o.HaveOccurred())
55 65
 
56
-			g.By("Verifying that the expected files are in the container")
57
-			result, err = oc.Run("rsh").Args(podName, "ls", "/tmp/image-streams").Output()
58
-			o.Expect(err).NotTo(o.HaveOccurred())
59
-			o.Expect(result).To(o.ContainSubstring("application-template-stibuild.json"))
60
-			o.Expect(result).NotTo(o.ContainSubstring("image-streams-centos7.json"))
66
+				g.By("Verifying that the expected files are in the container")
67
+				result, err = oc.Run("rsh").Args(podName, "ls", "/tmp/image-streams").Output()
68
+				o.Expect(err).NotTo(o.HaveOccurred())
69
+				o.Expect(result).To(o.ContainSubstring("application-template-stibuild.json"))
70
+				o.Expect(result).NotTo(o.ContainSubstring("image-streams-centos7.json"))
61 71
 
62
-			g.By("Getting an error if copying to a destination directory where there is no write permission")
63
-			result, err = oc.Run("rsync").Args(sourcePath1, fmt.Sprintf("%s:/", podName)).Output()
64
-			o.Expect(err).To(o.HaveOccurred())
65
-			o.Expect(result).To(o.ContainSubstring("Permission denied"))
66
-		})
72
+				g.By("Creating a local temporary directory")
73
+				tempDir, err := ioutil.TempDir("", "rsync")
74
+				o.Expect(err).NotTo(o.HaveOccurred())
75
+
76
+				g.By(fmt.Sprintf("Copying files from container to local directory: oc rsync %s:/tmp/image-streams/ %s --strategy=%s", podName, tempDir, strategy))
77
+				err = oc.Run("rsync").Args(
78
+					fmt.Sprintf("%s:/tmp/image-streams/", podName),
79
+					tempDir,
80
+					fmt.Sprintf("--strategy=%s", strategy)).Execute()
81
+
82
+				g.By(fmt.Sprintf("Verifying that files were copied to the local directory"))
83
+				files, err := ioutil.ReadDir(tempDir)
84
+				o.Expect(err).NotTo(o.HaveOccurred())
85
+				found := false
86
+				for _, f := range files {
87
+					if strings.Contains(f.Name(), "application-template-stibuild.json") {
88
+						found = true
89
+						break
90
+					}
91
+				}
92
+				o.Expect(found).To(o.BeTrue())
93
+
94
+				g.By("Getting an error if copying to a destination directory where there is no write permission")
95
+				result, err = oc.Run("rsync").Args(
96
+					sourcePath1,
97
+					fmt.Sprintf("%s:/", podName),
98
+					fmt.Sprintf("--strategy=%s", strategy)).Output()
99
+				o.Expect(err).To(o.HaveOccurred())
100
+			}
101
+		}
102
+
103
+		for _, strategy := range strategies {
104
+			g.It(fmt.Sprintf("should copy files with the %s strategy", strategy), testRsyncFunc(strategy))
105
+		}
67 106
 	})
68 107
 })