package rsync

import (
	"errors"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/fsnotify/fsnotify"
	"github.com/golang/glog"
	"github.com/spf13/cobra"
	kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"

	"github.com/openshift/origin/pkg/cmd/templates"
	"github.com/openshift/origin/pkg/cmd/util/clientcmd"
	"github.com/openshift/origin/pkg/util/fsnotification"
)

const (
	// RsyncRecommendedName is the recommended name for the rsync command
	RsyncRecommendedName = "rsync"

	noRsyncUnixWarning    = "WARNING: rsync command not found in path. Please use your package manager to install it.\n"
	noRsyncWindowsWarning = "WARNING: rsync command not found in path. Download cwRsync for Windows and add it to your PATH.\n"
)

var (
	rsyncLong = templates.LongDesc(` Copy local files to or from a pod container This command will copy local files to or from a remote container. It only copies the changed files using the rsync command from your OS. To ensure optimum performance, install rsync locally. In UNIX systems, use your package manager. In Windows, install cwRsync from https://www.itefix.net/cwrsync. 
If no container is specified, the first container of the pod is used for the copy.`)

	rsyncExample = templates.Examples(` # Synchronize a local directory with a pod directory %[1]s ./local/dir/ POD:/remote/dir # Synchronize a pod directory with a local directory %[1]s POD:/remote/dir/ ./local/dir`)
)

// copyStrategy is a method of copying files between a source and a
// destination path spec. Validate reports whether the strategy is usable and
// String returns a printable name for it.
type copyStrategy interface {
	Copy(source, destination *pathSpec, out, errOut io.Writer) error
	Validate() error
	String() string
}

// executor executes commands
type executor interface {
	Execute(command []string, in io.Reader, out, err io.Writer) error
}

// forwarder forwards pod ports to the local machine
type forwarder interface {
	ForwardPorts(ports []string, stopChan <-chan struct{}) error
}

// podChecker can check if pods are valid (exists, etc)
type podChecker interface {
	CheckPod() error
}

// RsyncOptions holds the options to execute the sync command
type RsyncOptions struct {
	Namespace     string
	ContainerName string
	Source        *pathSpec
	Destination   *pathSpec
	Strategy      copyStrategy
	StrategyName  string
	Quiet         bool
	Delete        bool
	Watch         bool

	RsyncInclude  []string
	RsyncExclude  []string
	RsyncProgress bool
	RsyncNoPerms  bool

	Out    io.Writer
	ErrOut io.Writer
}

// NewCmdRsync creates a new rsync command. name is the command name used in
// the usage line, parent is the parent command path used when rendering the
// examples, and out/errOut receive the command's regular and error output.
func NewCmdRsync(name, parent string, f *clientcmd.Factory, out, errOut io.Writer) *cobra.Command {
	o := RsyncOptions{
		Out:    out,
		ErrOut: errOut,
	}
	cmd := &cobra.Command{
		Use:     fmt.Sprintf("%s SOURCE DESTINATION", name),
		Short:   "Copy files between local filesystem and a pod",
		Long:    rsyncLong,
		Example: fmt.Sprintf(rsyncExample, parent+" "+name),
		Run: func(c *cobra.Command, args []string) {
			kcmdutil.CheckErr(o.Complete(f, c, args))
			kcmdutil.CheckErr(o.Validate())
			kcmdutil.CheckErr(o.RunRsync())
		},
	}

	// NOTE: When adding new flags to the command, please update the rshExcludeFlags in copyrsync.go
	// if those flags should not be passed to the rsh command.
	cmd.Flags().StringVarP(&o.ContainerName, "container", "c", "", "Container within the pod")
	cmd.Flags().StringVar(&o.StrategyName, "strategy", "", "Specify which strategy to use for copy: rsync, rsync-daemon, or tar")

	// Flags for rsync options. These must match the rsync flag names.
	cmd.Flags().BoolVarP(&o.Quiet, "quiet", "q", false, "Suppress non-error messages")
	cmd.Flags().BoolVar(&o.Delete, "delete", false, "Delete files not present in source")
	cmd.Flags().StringSliceVar(&o.RsyncExclude, "exclude", nil, "rsync - exclude files matching specified pattern")
	cmd.Flags().StringSliceVar(&o.RsyncInclude, "include", nil, "rsync - include files matching specified pattern")
	cmd.Flags().BoolVar(&o.RsyncProgress, "progress", false, "rsync - show progress during transfer")
	cmd.Flags().BoolVar(&o.RsyncNoPerms, "no-perms", false, "rsync - do not transfer permissions")
	cmd.Flags().BoolVarP(&o.Watch, "watch", "w", false, "Watch directory for changes and resync automatically")
	return cmd
}

// warnNoRsync writes a platform-specific warning to out telling the user that
// no local rsync binary was found.
func warnNoRsync(out io.Writer) {
	// Fprint rather than Fprintf: the warnings are plain messages, not
	// format strings.
	if isWindows() {
		fmt.Fprint(out, noRsyncWindowsWarning)
		return
	}
	fmt.Fprint(out, noRsyncUnixWarning)
}

// determineStrategy returns the copy strategy selected by name. An empty name
// selects the default: an rsync-based strategy when a local rsync is found
// (the daemon variant on Windows), followed by tar as a fallback. An unknown
// name results in an error.
func (o *RsyncOptions) determineStrategy(f *clientcmd.Factory, cmd *cobra.Command, name string) (copyStrategy, error) {
	switch name {
	case "":
		// Default case, use an rsync strategy first and then fallback to Tar
		strategies := copyStrategies{}
		if hasLocalRsync() {
			if isWindows() {
				strategy, err := newRsyncDaemonStrategy(f, cmd, o)
				if err != nil {
					return nil, err
				}
				strategies = append(strategies, strategy)
			} else {
				strategy, err := newRsyncStrategy(f, cmd, o)
				if err != nil {
					return nil, err
				}
				strategies = append(strategies, strategy)
			}
		} else {
			warnNoRsync(o.ErrOut)
		}
		strategy, err := newTarStrategy(f, cmd, o)
		if err != nil {
			return nil, err
		}
		strategies = append(strategies, strategy)
		return strategies, nil
	case "rsync":
		return newRsyncStrategy(f, cmd, o)
	case "rsync-daemon":
		return newRsyncDaemonStrategy(f, cmd, o)
	case "tar":
		return newTarStrategy(f, cmd, o)
	default:
		return nil, fmt.Errorf("unknown strategy: %s", name)
	}
}

// Complete verifies command line arguments and loads data from the command environment
func (o *RsyncOptions) Complete(f *clientcmd.Factory, cmd *cobra.Command, args []string) error {
	switch n := len(args); {
	case n == 0:
		// With no arguments at all, print the help text before reporting
		// the usage error.
		cmd.Help()
		fallthrough
	case n < 2:
		return kcmdutil.UsageError(cmd, "SOURCE_DIR and POD:DESTINATION_DIR are required arguments")
	case n > 2:
		return kcmdutil.UsageError(cmd, "only SOURCE_DIR and POD:DESTINATION_DIR should be specified as arguments")
	}

	// Set main command arguments
	var err error
	o.Source, err = parsePathSpec(args[0])
	if err != nil {
		return err
	}
	o.Destination, err = parsePathSpec(args[1])
	if err != nil {
		return err
	}

	namespace, _, err := f.DefaultNamespace()
	if err != nil {
		return err
	}
	o.Namespace = namespace

	o.Strategy, err = o.determineStrategy(f, cmd, o.StrategyName)
	if err != nil {
		return err
	}

	return nil
}

// Validate checks that RsyncOptions has all necessary fields
func (o *RsyncOptions) Validate() error {
	if o.Out == nil || o.ErrOut == nil {
		return errors.New("output and error streams must be specified")
	}
	if o.Source == nil || o.Destination == nil {
		return errors.New("source and destination must be specified")
	}
	if err := o.Source.Validate(); err != nil {
		return err
	}
	if err := o.Destination.Validate(); err != nil {
		return err
	}
	// If source and destination are both local or both remote throw an error
	if o.Source.Local() == o.Destination.Local() {
		return errors.New("rsync is only valid between a local directory and a pod directory; " +
			"specify a pod directory as [PODNAME]:[DIR]")
	}
	if o.Destination.Local() && o.Watch {
		return errors.New("\"--watch\" can only be used with a local source directory")
	}
	// Report a missing strategy as an error instead of panicking on a nil
	// interface when Validate is called without a prior Complete.
	if o.Strategy == nil {
		return errors.New("a copy strategy must be specified")
	}
	if err := o.Strategy.Validate(); err != nil {
		return err
	}
	return nil
}

// RunRsync copies files from source to destination. When Watch is set it then
// keeps watching the source path and re-syncing on changes.
func (o *RsyncOptions) RunRsync() error {
	if err := o.Strategy.Copy(o.Source, o.Destination, o.Out, o.ErrOut); err != nil {
		return err
	}

	if !o.Watch {
		return nil
	}
	return o.WatchAndSync()
}

// WatchAndSync sets up a recursive filesystem watch on the sync path
// and invokes rsync each time the path changes. It only returns when
// setting up the watch, watching the filesystem, or a copy fails.
func (o *RsyncOptions) WatchAndSync() error {
	// these variables must be accessed while holding the changeLock
	// mutex as they are shared between goroutines to communicate
	// sync state/events.
	var (
		changeLock sync.Mutex
		dirty      bool
		lastChange time.Time
		watchError error
	)

	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return fmt.Errorf("error setting up filesystem watcher: %v", err)
	}
	defer watcher.Close()

	// done is closed when this function returns so that the event
	// goroutine below terminates instead of outliving the watch.
	done := make(chan struct{})
	defer close(done)

	go func() {
		for {
			select {
			case <-done:
				return
			case event, ok := <-watcher.Events:
				if !ok {
					// channel closed, nothing more will be delivered
					return
				}
				changeLock.Lock()
				glog.V(5).Infof("filesystem watch event: %s", event)
				lastChange = time.Now()
				dirty = true
				if event.Op&fsnotify.Remove == fsnotify.Remove {
					if e := watcher.Remove(event.Name); e != nil {
						glog.V(5).Infof("error removing watch for %s: %v", event.Name, e)
					}
				} else if e := fsnotification.AddRecursiveWatch(watcher, event.Name); e != nil && watchError == nil {
					// only the first watch error is kept
					watchError = e
				}
				changeLock.Unlock()
			case err, ok := <-watcher.Errors:
				if !ok {
					return
				}
				changeLock.Lock()
				watchError = fmt.Errorf("error watching filesystem for changes: %v", err)
				changeLock.Unlock()
			}
		}
	}()

	err = fsnotification.AddRecursiveWatch(watcher, o.Source.Path)
	if err != nil {
		return fmt.Errorf("error watching source path %s: %v", o.Source.Path, err)
	}

	delay := 2 * time.Second
	ticker := time.NewTicker(delay)
	defer ticker.Stop()

	// syncIfDirty performs one pass of the sync loop. The lock is taken and
	// released in one place so that every return path unlocks it. It stays
	// held while copying, so the event goroutine records changes made
	// during a copy only after the copy has finished.
	syncIfDirty := func() error {
		changeLock.Lock()
		defer changeLock.Unlock()

		if watchError != nil {
			return watchError
		}
		// if a change happened more than 'delay' seconds ago, sync it now.
		// if a change happened less than 'delay' seconds ago, sleep for 'delay' seconds
		// and see if more changes happen, we don't want to sync when
		// the filesystem is in the middle of changing due to a massive
		// set of changes (such as a local build in progress).
		if dirty && time.Now().After(lastChange.Add(delay)) {
			glog.V(1).Info("Synchronizing filesystem changes...")
			if err := o.Strategy.Copy(o.Source, o.Destination, o.Out, o.ErrOut); err != nil {
				return err
			}
			glog.V(1).Info("Done.")
			dirty = false
		}
		return nil
	}

	for {
		if err := syncIfDirty(); err != nil {
			return err
		}
		<-ticker.C
	}
}

// PodName returns the name of the pod as specified in either the
// source or destination arguments. It returns an empty string when
// neither is set.
func (o *RsyncOptions) PodName() string {
	if o.Source != nil && len(o.Source.PodName) > 0 {
		return o.Source.PodName
	}
	if o.Destination != nil {
		return o.Destination.PodName
	}
	return ""
}