package migrate import ( "fmt" "io" "strings" "github.com/golang/glog" "github.com/spf13/cobra" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl/resource" "k8s.io/kubernetes/pkg/util/sets" cmdutil "github.com/openshift/origin/pkg/cmd/util" "github.com/openshift/origin/pkg/cmd/util/clientcmd" ) // MigrateVisitFunc is invoked for each returned object, and may return a // Reporter that can contain info to be used by save. type MigrateVisitFunc func(info *resource.Info) (Reporter, error) // MigrateActionFunc is expected to persist the altered info.Object. The // Reporter returned from Visit is passed to this function and may be used // to carry additional information about what to save on an object. type MigrateActionFunc func(info *resource.Info, reporter Reporter) error // MigrateFilterFunc can return false to skip an item, or an error. type MigrateFilterFunc func(info *resource.Info) (bool, error) // Reporter indicates whether a resource requires migration. type Reporter interface { // Changed returns true if the resource requires migration. Changed() bool } // ResourceOptions assists in performing migrations on any object that // can be retrieved via the API. type ResourceOptions struct { In io.Reader Out, ErrOut io.Writer AllNamespaces bool Include []string Filenames []string Confirm bool Output string FromKey string ToKey string OverlappingResources []sets.String DefaultExcludes []unversioned.GroupResource Builder *resource.Builder SaveFn MigrateActionFunc PrintFn MigrateActionFunc FilterFn MigrateFilterFunc DryRun bool Summarize bool } func (o *ResourceOptions) Bind(c *cobra.Command) { c.Flags().StringVarP(&o.Output, "output", "o", o.Output, "Output the modified objects instead of saving them, valid values are 'yaml' or 'json'") c.Flags().StringSliceVar(&o.Include, "include", o.Include, "Resource types to migrate. Passing --filename will override this flag.") c.Flags().BoolVar(&o.AllNamespaces, "all-namespaces", true, "Migrate objects in all namespaces. Defaults to true.") c.Flags().BoolVar(&o.Confirm, "confirm", false, "If true, all requested objects will be migrated. Defaults to false.") c.Flags().StringVar(&o.FromKey, "from-key", o.FromKey, "If specified, only migrate items with a key (namespace/name or name) greater than or equal to this value") c.Flags().StringVar(&o.ToKey, "to-key", o.ToKey, "If specified, only migrate items with a key (namespace/name or name) less than this value") usage := "Filename, directory, or URL to docker-compose.yml file to use" kubectl.AddJsonFilenameFlag(c, &o.Filenames, usage) c.MarkFlagRequired("filename") } func (o *ResourceOptions) Complete(f *clientcmd.Factory, c *cobra.Command) error { switch { case len(o.Output) > 0: printer, _, err := kubectl.GetPrinter(o.Output, "", false) if err != nil { return err } first := true o.PrintFn = func(info *resource.Info, _ Reporter) error { obj, err := info.Mapping.ConvertToVersion(info.Object, info.Mapping.GroupVersionKind.GroupVersion()) if err != nil { return err } // TODO: PrintObj is not correct for YAML - it should inject document separators itself if o.Output == "yaml" && !first { fmt.Fprintln(o.Out, "---") } first = false printer.PrintObj(obj, o.Out) return nil } o.DryRun = true case o.Confirm: o.DryRun = false default: o.DryRun = true } namespace, explicitNamespace, err := f.Factory.DefaultNamespace() if err != nil { return err } allNamespaces := !explicitNamespace && o.AllNamespaces if len(o.FromKey) > 0 || len(o.ToKey) > 0 { o.FilterFn = func(info *resource.Info) (bool, error) { var key string if info.Mapping.Scope.Name() == meta.RESTScopeNameNamespace { key = info.Namespace + "/" + info.Name } else { if !allNamespaces { return false, nil } key = info.Name } if len(o.FromKey) > 0 && o.FromKey > key { return false, nil } if len(o.ToKey) > 0 && o.ToKey <= key { return false, nil } return true, nil } } oclient, _, _, err := f.Clients() if err != nil { return err } mapper, _ := f.Object(false) resourceNames := sets.NewString() for i, s := range o.Include { if resourceNames.Has(s) { continue } if s != "*" { resourceNames.Insert(s) break } all, err := clientcmd.FindAllCanonicalResources(oclient.Discovery(), mapper) if err != nil { return fmt.Errorf("could not calculate the list of available resources: %v", err) } exclude := sets.NewString() for _, gr := range o.DefaultExcludes { exclude.Insert(gr.String()) } candidate := sets.NewString() for _, gr := range all { // if the user specifies a resource that matches resource or resource+group, skip it if resourceNames.Has(gr.Resource) || resourceNames.Has(gr.String()) || exclude.Has(gr.String()) { continue } candidate.Insert(gr.String()) } candidate.Delete(exclude.List()...) include := candidate if len(o.OverlappingResources) > 0 { include = sets.NewString() for _, k := range candidate.List() { reduce := k for _, others := range o.OverlappingResources { if !others.Has(k) { continue } reduce = others.List()[0] break } include.Insert(reduce) } } glog.V(4).Infof("Found the following resources from the server: %v", include.List()) last := o.Include[i+1:] o.Include = append([]string{}, o.Include[:i]...) o.Include = append(o.Include, include.List()...) o.Include = append(o.Include, last...) break } o.Builder = f.Factory.NewBuilder(false). AllNamespaces(allNamespaces). FilenameParam(false, false, o.Filenames...). ContinueOnError(). DefaultNamespace(). RequireObject(true). SelectAllParam(true). Flatten() if !allNamespaces { o.Builder.NamespaceParam(namespace) } if len(o.Filenames) == 0 { o.Builder.ResourceTypes(o.Include...) } return nil } func (o *ResourceOptions) Validate() error { if len(o.Filenames) == 0 && len(o.Include) == 0 { return fmt.Errorf("you must specify at least one resource or resource type to migrate with --include or --filenames") } return nil } func (o *ResourceOptions) Visitor() *ResourceVisitor { return &ResourceVisitor{ Out: o.Out, Builder: o.Builder, SaveFn: o.SaveFn, PrintFn: o.PrintFn, FilterFn: o.FilterFn, DryRun: o.DryRun, } } type ResourceVisitor struct { Out io.Writer Builder *resource.Builder SaveFn MigrateActionFunc PrintFn MigrateActionFunc FilterFn MigrateFilterFunc DryRun bool } func (o *ResourceVisitor) Visit(fn MigrateVisitFunc) error { dryRun := o.DryRun summarize := true actionFn := o.SaveFn switch { case o.PrintFn != nil: actionFn = o.PrintFn dryRun = true summarize = false case dryRun: actionFn = nil } out := o.Out result := o.Builder.Do() if result.Err() != nil { return result.Err() } // Ignore any resource that does not support GET result.IgnoreErrors(errors.IsMethodNotSupported, errors.IsNotFound) t := migrateTracker{ out: out, migrateFn: fn, actionFn: actionFn, dryRun: dryRun, resourcesWithErrors: sets.NewString(), } err := result.Visit(func(info *resource.Info, err error) error { if err == nil && o.FilterFn != nil { var ok bool t.found++ if ok, err = o.FilterFn(info); err == nil && !ok { t.ignored++ if glog.V(2) { t.report("ignored:", info, nil) } return nil } } if err != nil { t.resourcesWithErrors.Insert(info.Mapping.Resource) t.errors++ t.report("error:", info, err) return nil } t.attempt(info, 10) return nil }) if summarize { if dryRun { fmt.Fprintf(out, "summary (dry run): total=%d errors=%d ignored=%d unchanged=%d migrated=%d\n", t.found, t.errors, t.ignored, t.unchanged, t.found-t.errors-t.unchanged-t.ignored) } else { fmt.Fprintf(out, "summary: total=%d errors=%d ignored=%d unchanged=%d migrated=%d\n", t.found, t.errors, t.ignored, t.unchanged, t.found-t.errors-t.unchanged-t.ignored) } } if t.resourcesWithErrors.Len() > 0 { fmt.Fprintf(out, "info: to rerun only failing resources, add --include=%s\n", strings.Join(t.resourcesWithErrors.List(), ",")) } switch { case err != nil: fmt.Fprintf(out, "error: exited without processing all resources: %v\n", err) err = cmdutil.ErrExit case t.errors > 0: fmt.Fprintf(out, "error: %d resources failed to migrate\n", t.errors) err = cmdutil.ErrExit } return err } // ErrUnchanged may be returned by MigrateActionFunc to indicate that the object // did not need migration (but that could only be determined when the action was taken). var ErrUnchanged = fmt.Errorf("migration was not necessary") // ErrRecalculate may be returned by MigrateActionFunc to indicate that the object // has changed and needs to have its information recalculated prior to being saved. // Use when a resource requries multiple API operations to persist (for instance, // both status and spec must be changed). var ErrRecalculate = fmt.Errorf("recalculate migration") // ErrRetriable is a wrapper for an error that a migrator may use to indicate the // specific error can be retried. type ErrRetriable struct { error } func (ErrRetriable) Temporary() bool { return true } // ErrNotRetriable is a wrapper for an error that a migrator may use to indicate the // specific error cannot be retried. type ErrNotRetriable struct { error } func (ErrNotRetriable) Temporary() bool { return false } type temporary interface { // Temporary should return true if this is a temporary error Temporary() bool } // attemptResult is an enumeration of the result of a migration type attemptResult int const ( attemptResultSuccess attemptResult = iota attemptResultError attemptResultUnchanged attemptResultIgnore ) // migrateTracker abstracts transforming and saving resources and can be used to keep track // of how many total resources have been updated. type migrateTracker struct { out io.Writer migrateFn MigrateVisitFunc actionFn MigrateActionFunc dryRun bool found, ignored, unchanged, errors int retries int resourcesWithErrors sets.String } // report prints a message to out that includes info about the current resource. If the optional error is // provided it will be written as well. func (t *migrateTracker) report(prefix string, info *resource.Info, err error) { ns := info.Namespace if len(ns) > 0 { ns = "-n " + ns } if err != nil { fmt.Fprintf(t.out, "%-10s %s/%s %s: %v\n", prefix, info.Mapping.Resource, info.Name, ns, err) } else { fmt.Fprintf(t.out, "%-10s %s/%s %s\n", prefix, info.Mapping.Resource, info.Name, ns) } } // attempt will try to invoke the migrateFn and saveFn on info, retrying any recalculation requests up // to retries times. func (t *migrateTracker) attempt(info *resource.Info, retries int) { t.found++ t.retries = retries result, err := t.try(info) switch { case err != nil: t.resourcesWithErrors.Insert(info.Mapping.Resource) t.errors++ t.report("error:", info, err) case result == attemptResultIgnore: t.ignored++ if glog.V(2) { t.report("ignored:", info, nil) } case result == attemptResultUnchanged: t.unchanged++ if glog.V(2) { t.report("unchanged:", info, nil) } case result == attemptResultSuccess: if glog.V(1) { if t.dryRun { t.report("migrated (dry run):", info, nil) } else { t.report("migrated:", info, nil) } } } } // try will mutate the info and attempt to save, recalculating if there are any retries left. // The result of the attempt or an error will be returned. func (t *migrateTracker) try(info *resource.Info) (attemptResult, error) { reporter, err := t.migrateFn(info) if err != nil { return attemptResultError, err } if reporter == nil { return attemptResultIgnore, nil } if !reporter.Changed() { return attemptResultUnchanged, nil } if t.actionFn != nil { if err := t.actionFn(info, reporter); err != nil { if err == ErrUnchanged { return attemptResultUnchanged, nil } if canRetry(err) { if t.retries > 0 { if glog.V(1) && err != ErrRecalculate { t.report("retry:", info, err) } result, err := t.try(info) switch result { case attemptResultUnchanged, attemptResultIgnore: result = attemptResultSuccess } return result, err } } return attemptResultError, err } } return attemptResultSuccess, nil } // canRetry returns true if the provided error indicates a retry is possible. func canRetry(err error) bool { if temp, ok := err.(temporary); ok && temp.Temporary() { return true } return err == ErrRecalculate } // DefaultRetriable adds retry information to the provided error, and will refresh the // info if the client info is stale. If the refresh fails the error is made fatal. // All other errors are left in their natural state - they will not be retried unless // they define a Temporary() method that returns true. func DefaultRetriable(info *resource.Info, err error) error { if err == nil { return nil } switch { case errors.IsMethodNotSupported(err): return ErrNotRetriable{err} case errors.IsConflict(err): if refreshErr := info.Get(); refreshErr != nil { return ErrNotRetriable{err} } return ErrRetriable{err} case errors.IsServerTimeout(err): return ErrRetriable{err} } return err }