package dns import ( "fmt" "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/client/cache" kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/watch" ) // ServiceAccessor is the interface used by the ServiceResolver to access // services. type ServiceAccessor interface { kcoreclient.ServicesGetter ServiceByClusterIP(ip string) (*api.Service, error) } // cachedServiceAccessor provides a cache of services that can answer queries // about service lookups efficiently. type cachedServiceAccessor struct { store cache.Indexer } // cachedServiceAccessor implements ServiceAccessor var _ ServiceAccessor = &cachedServiceAccessor{} func NewCachedServiceAccessorAndStore() (ServiceAccessor, cache.Store) { store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, map[string]cache.IndexFunc{ "clusterIP": indexServiceByClusterIP, // for reverse lookups "namespace": cache.MetaNamespaceIndexFunc, }) return &cachedServiceAccessor{store: store}, store } // NewCachedServiceAccessor returns a service accessor that can answer queries about services. // It uses a backing cache to make ClusterIP lookups efficient. func NewCachedServiceAccessor(client cache.Getter, stopCh <-chan struct{}) ServiceAccessor { accessor, store := NewCachedServiceAccessorAndStore() lw := cache.NewListWatchFromClient(client, "services", api.NamespaceAll, fields.Everything()) reflector := cache.NewReflector(lw, &api.Service{}, store, 30*time.Minute) if stopCh != nil { reflector.RunUntil(stopCh) } else { reflector.Run() } return accessor } // ServiceByClusterIP returns the first service that matches the provided clusterIP value. // errors.IsNotFound(err) will be true if no such service exists. func (a *cachedServiceAccessor) ServiceByClusterIP(ip string) (*api.Service, error) { items, err := a.store.Index("clusterIP", &api.Service{Spec: api.ServiceSpec{ClusterIP: ip}}) if err != nil { return nil, err } if len(items) == 0 { return nil, errors.NewNotFound(api.Resource("service"), "clusterIP="+ip) } return items[0].(*api.Service), nil } // indexServiceByClusterIP creates an index between a clusterIP and the service that // uses it. func indexServiceByClusterIP(obj interface{}) ([]string, error) { return []string{obj.(*api.Service).Spec.ClusterIP}, nil } func (a *cachedServiceAccessor) Services(namespace string) kcoreclient.ServiceInterface { return cachedServiceNamespacer{a, namespace} } // TODO: needs to be unified with Registry interfaces once that work is done. type cachedServiceNamespacer struct { accessor *cachedServiceAccessor namespace string } var _ kcoreclient.ServiceInterface = cachedServiceNamespacer{} func (a cachedServiceNamespacer) Get(name string) (*api.Service, error) { item, ok, err := a.accessor.store.Get(&api.Service{ObjectMeta: api.ObjectMeta{Namespace: a.namespace, Name: name}}) if err != nil { return nil, err } if !ok { return nil, errors.NewNotFound(api.Resource("service"), name) } return item.(*api.Service), nil } func (a cachedServiceNamespacer) List(options api.ListOptions) (*api.ServiceList, error) { if !options.LabelSelector.Empty() { return nil, fmt.Errorf("label selection on the cache is not currently implemented") } items, err := a.accessor.store.Index("namespace", &api.Service{ObjectMeta: api.ObjectMeta{Namespace: a.namespace}}) if err != nil { return nil, err } services := make([]api.Service, 0, len(items)) for i := range items { services = append(services, *items[i].(*api.Service)) } return &api.ServiceList{ // TODO: set ResourceVersion so that we can make watch work. Items: services, }, nil } func (a cachedServiceNamespacer) Create(srv *api.Service) (*api.Service, error) { return nil, fmt.Errorf("not implemented") } func (a cachedServiceNamespacer) Update(srv *api.Service) (*api.Service, error) { return nil, fmt.Errorf("not implemented") } func (a cachedServiceNamespacer) UpdateStatus(srv *api.Service) (*api.Service, error) { return nil, fmt.Errorf("not implemented") } func (a cachedServiceNamespacer) Delete(name string, options *api.DeleteOptions) error { return fmt.Errorf("not implemented") } func (a cachedServiceNamespacer) DeleteCollection(options *api.DeleteOptions, listOptions api.ListOptions) error { return fmt.Errorf("not implemented") } func (a cachedServiceNamespacer) Watch(options api.ListOptions) (watch.Interface, error) { return nil, fmt.Errorf("not implemented") } func (a cachedServiceNamespacer) Patch(name string, pt api.PatchType, data []byte, subresources ...string) (*api.Service, error) { return nil, fmt.Errorf("not implemented") } func (a cachedServiceNamespacer) ProxyGet(scheme, name, port, path string, params map[string]string) restclient.ResponseWrapper { return nil } // cachedEndpointsAccessor provides a cache of services that can answer queries // about service lookups efficiently. type cachedEndpointsAccessor struct { store cache.Store } func NewCachedEndpointsAccessorAndStore() (kcoreclient.EndpointsGetter, cache.Store) { store := cache.NewStore(cache.MetaNamespaceKeyFunc) return &cachedEndpointsAccessor{store: store}, store } func (a *cachedEndpointsAccessor) Endpoints(namespace string) kcoreclient.EndpointsInterface { return cachedEndpointsNamespacer{accessor: a, namespace: namespace} } // TODO: needs to be unified with Registry interfaces once that work is done. type cachedEndpointsNamespacer struct { accessor *cachedEndpointsAccessor namespace string } var _ kcoreclient.EndpointsInterface = cachedEndpointsNamespacer{} func (a cachedEndpointsNamespacer) Get(name string) (*api.Endpoints, error) { item, ok, err := a.accessor.store.Get(&api.Endpoints{ObjectMeta: api.ObjectMeta{Namespace: a.namespace, Name: name}}) if err != nil { return nil, err } if !ok { return nil, errors.NewNotFound(api.Resource("endpoints"), name) } return item.(*api.Endpoints), nil } func (a cachedEndpointsNamespacer) List(options api.ListOptions) (*api.EndpointsList, error) { return nil, fmt.Errorf("not implemented") } func (a cachedEndpointsNamespacer) Create(srv *api.Endpoints) (*api.Endpoints, error) { return nil, fmt.Errorf("not implemented") } func (a cachedEndpointsNamespacer) Update(srv *api.Endpoints) (*api.Endpoints, error) { return nil, fmt.Errorf("not implemented") } func (a cachedEndpointsNamespacer) Delete(name string, options *api.DeleteOptions) error { return fmt.Errorf("not implemented") } func (a cachedEndpointsNamespacer) DeleteCollection(options *api.DeleteOptions, listOptions api.ListOptions) error { return fmt.Errorf("not implemented") } func (a cachedEndpointsNamespacer) Watch(options api.ListOptions) (watch.Interface, error) { return nil, fmt.Errorf("not implemented") } func (a cachedEndpointsNamespacer) Patch(name string, pt api.PatchType, data []byte, subresources ...string) (*api.Endpoints, error) { return nil, fmt.Errorf("not implemented") }