package factory import ( "fmt" "sort" "time" kapi "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" utilwait "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" osclient "github.com/openshift/origin/pkg/client" oscache "github.com/openshift/origin/pkg/client/cache" routeapi "github.com/openshift/origin/pkg/route/api" "github.com/openshift/origin/pkg/router" "github.com/openshift/origin/pkg/router/controller" ) // RouterControllerFactory initializes and manages the watches that drive a router // controller. It supports optional scoping on Namespace, Labels, and Fields of routes. // If Namespace is empty, it means "all namespaces". type RouterControllerFactory struct { KClient kcoreclient.EndpointsGetter OSClient osclient.RoutesNamespacer NodeClient kcoreclient.NodesGetter Namespaces controller.NamespaceLister ResyncInterval time.Duration Namespace string Labels labels.Selector Fields fields.Selector } // NewDefaultRouterControllerFactory initializes a default router controller factory. func NewDefaultRouterControllerFactory(oc osclient.RoutesNamespacer, kc kclientset.Interface) *RouterControllerFactory { return &RouterControllerFactory{ KClient: kc.Core(), OSClient: oc, NodeClient: kc.Core(), ResyncInterval: 10 * time.Minute, Namespace: kapi.NamespaceAll, Labels: labels.Everything(), Fields: fields.Everything(), } } // Create begins listing and watching against the API server for the desired route and endpoint // resources. It spawns child goroutines that cannot be terminated. func (factory *RouterControllerFactory) Create(plugin router.Plugin, watchNodes bool) *controller.RouterController { routeEventQueue := oscache.NewEventQueue(cache.MetaNamespaceKeyFunc) cache.NewReflector(&routeLW{ client: factory.OSClient, namespace: factory.Namespace, field: factory.Fields, label: factory.Labels, }, &routeapi.Route{}, routeEventQueue, factory.ResyncInterval).Run() endpointsEventQueue := oscache.NewEventQueue(cache.MetaNamespaceKeyFunc) cache.NewReflector(&endpointsLW{ client: factory.KClient, namespace: factory.Namespace, // we do not scope endpoints by labels or fields because the route labels != endpoints labels }, &kapi.Endpoints{}, endpointsEventQueue, factory.ResyncInterval).Run() nodeEventQueue := oscache.NewEventQueue(cache.MetaNamespaceKeyFunc) if watchNodes { cache.NewReflector(&nodeLW{ client: factory.NodeClient, field: fields.Everything(), label: labels.Everything(), }, &kapi.Node{}, nodeEventQueue, factory.ResyncInterval).Run() } return &controller.RouterController{ Plugin: plugin, NextEndpoints: func() (watch.EventType, *kapi.Endpoints, error) { eventType, obj, err := endpointsEventQueue.Pop() if err != nil { return watch.Error, nil, err } return eventType, obj.(*kapi.Endpoints), nil }, NextRoute: func() (watch.EventType, *routeapi.Route, error) { eventType, obj, err := routeEventQueue.Pop() if err != nil { return watch.Error, nil, err } return eventType, obj.(*routeapi.Route), nil }, NextNode: func() (watch.EventType, *kapi.Node, error) { eventType, obj, err := nodeEventQueue.Pop() if err != nil { return watch.Error, nil, err } return eventType, obj.(*kapi.Node), nil }, EndpointsListConsumed: func() bool { return endpointsEventQueue.ListConsumed() }, RoutesListConsumed: func() bool { return routeEventQueue.ListConsumed() }, Namespaces: factory.Namespaces, // check namespaces a bit more often than we resync events, so that we aren't always waiting // the maximum interval for new items to come into the list // TODO: trigger a reflector resync after every namespace sync? NamespaceSyncInterval: factory.ResyncInterval - 10*time.Second, NamespaceWaitInterval: 10 * time.Second, NamespaceRetries: 5, WatchNodes: watchNodes, } } // CreateNotifier begins listing and watching against the API server for the desired route and endpoint // resources. It spawns child goroutines that cannot be terminated. It is a more efficient store of a // route system. func (factory *RouterControllerFactory) CreateNotifier(changed func()) RoutesByHost { keyFn := cache.MetaNamespaceKeyFunc routeStore := cache.NewIndexer(keyFn, cache.Indexers{"host": hostIndexFunc}) routeEventQueue := oscache.NewEventQueueForStore(keyFn, routeStore) cache.NewReflector(&routeLW{ client: factory.OSClient, namespace: factory.Namespace, field: factory.Fields, label: factory.Labels, }, &routeapi.Route{}, routeEventQueue, factory.ResyncInterval).Run() endpointStore := cache.NewStore(keyFn) endpointsEventQueue := oscache.NewEventQueueForStore(keyFn, endpointStore) cache.NewReflector(&endpointsLW{ client: factory.KClient, namespace: factory.Namespace, // we do not scope endpoints by labels or fields because the route labels != endpoints labels }, &kapi.Endpoints{}, endpointsEventQueue, factory.ResyncInterval).Run() go utilwait.Until(func() { for { if _, _, err := routeEventQueue.Pop(); err != nil { return } changed() } }, time.Second, utilwait.NeverStop) go utilwait.Until(func() { for { if _, _, err := endpointsEventQueue.Pop(); err != nil { return } changed() } }, time.Second, utilwait.NeverStop) return &routesByHost{ routes: routeStore, endpoints: endpointStore, } } type RoutesByHost interface { Hosts() []string Route(host string) (*routeapi.Route, bool) Endpoints(namespace, name string) *kapi.Endpoints } type routesByHost struct { routes cache.Indexer endpoints cache.Store } func (r *routesByHost) Hosts() []string { return r.routes.ListIndexFuncValues("host") } func (r *routesByHost) Route(host string) (*routeapi.Route, bool) { arr, err := r.routes.ByIndex("host", host) if err != nil || len(arr) == 0 { return nil, false } return oldestRoute(arr), true } func (r *routesByHost) Endpoints(namespace, name string) *kapi.Endpoints { obj, ok, err := r.endpoints.GetByKey(fmt.Sprintf("%s/%s", namespace, name)) if !ok || err != nil { return &kapi.Endpoints{} } return obj.(*kapi.Endpoints) } // routeAge sorts routes from oldest to newest and is stable for all routes. type routeAge []routeapi.Route func (r routeAge) Len() int { return len(r) } func (r routeAge) Swap(i, j int) { r[i], r[j] = r[j], r[i] } func (r routeAge) Less(i, j int) bool { return routeapi.RouteLessThan(&r[i], &r[j]) } func oldestRoute(routes []interface{}) *routeapi.Route { var oldest *routeapi.Route for i := range routes { route := routes[i].(*routeapi.Route) if oldest == nil || route.CreationTimestamp.Before(oldest.CreationTimestamp) { oldest = route } } return oldest } func hostIndexFunc(obj interface{}) ([]string, error) { route := obj.(*routeapi.Route) hosts := []string{ fmt.Sprintf("%s-%s%s", route.Name, route.Namespace, ".generated.local"), } if len(route.Spec.Host) > 0 { hosts = append(hosts, route.Spec.Host) } return hosts, nil } // routeLW is a ListWatcher for routes that can be filtered to a label, field, or // namespace. type routeLW struct { client osclient.RoutesNamespacer label labels.Selector field fields.Selector namespace string } func (lw *routeLW) List(options kapi.ListOptions) (runtime.Object, error) { opts := kapi.ListOptions{ LabelSelector: lw.label, FieldSelector: lw.field, } routes, err := lw.client.Routes(lw.namespace).List(opts) if err != nil { return nil, err } // return routes in order of age to avoid rejections during resync sort.Sort(routeAge(routes.Items)) return routes, nil } func (lw *routeLW) Watch(options kapi.ListOptions) (watch.Interface, error) { opts := kapi.ListOptions{ LabelSelector: lw.label, FieldSelector: lw.field, ResourceVersion: options.ResourceVersion, } return lw.client.Routes(lw.namespace).Watch(opts) } // endpointsLW is a list watcher for routes. type endpointsLW struct { client kcoreclient.EndpointsGetter label labels.Selector field fields.Selector namespace string } func (lw *endpointsLW) List(options kapi.ListOptions) (runtime.Object, error) { return lw.client.Endpoints(lw.namespace).List(options) } func (lw *endpointsLW) Watch(options kapi.ListOptions) (watch.Interface, error) { opts := kapi.ListOptions{ LabelSelector: lw.label, FieldSelector: lw.field, ResourceVersion: options.ResourceVersion, } return lw.client.Endpoints(lw.namespace).Watch(opts) } // nodeLW is a list watcher for nodes. type nodeLW struct { client kcoreclient.NodesGetter label labels.Selector field fields.Selector } func (lw *nodeLW) List(options kapi.ListOptions) (runtime.Object, error) { return lw.client.Nodes().List(options) } func (lw *nodeLW) Watch(options kapi.ListOptions) (watch.Interface, error) { opts := kapi.ListOptions{ LabelSelector: lw.label, FieldSelector: lw.field, ResourceVersion: options.ResourceVersion, } return lw.client.Nodes().Watch(opts) }