package cache
import (
"errors"
"fmt"
"time"
kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
authorizationapi "github.com/openshift/origin/pkg/authorization/api"
policyregistry "github.com/openshift/origin/pkg/authorization/registry/policy"
bindingregistry "github.com/openshift/origin/pkg/authorization/registry/policybinding"
)
// PolicyCache maintains a cache of PolicyRules
type PolicyCache struct {
policyBindingIndexer cache.Indexer
policyIndexer cache.Indexer
bindingRegistry bindingregistry.Registry
policyRegistry policyregistry.Registry
keyFunc cache.KeyFunc
}
// TODO: Eliminate listWatch when this merges upstream: https://github.com/GoogleCloudPlatform/kubernetes/pull/4453
type listFunc func() (runtime.Object, error)
type watchFunc func(resourceVersion string) (watch.Interface, error)
type listWatch struct {
listFunc listFunc
watchFunc watchFunc
}
func (lw *listWatch) List() (runtime.Object, error) {
return lw.listFunc()
}
func (lw *listWatch) Watch(resourceVersion string) (watch.Interface, error) {
return lw.watchFunc(resourceVersion)
}
// NewPolicyCache creates a new PolicyCache. You cannot use a normal client, because you don't want policy guarding the policy from the authorizer
func NewPolicyCache(bindingRegistry bindingregistry.Registry, policyRegistry policyregistry.Registry) *PolicyCache {
result := &PolicyCache{
policyIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}),
policyBindingIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}),
keyFunc: cache.MetaNamespaceKeyFunc,
bindingRegistry: bindingRegistry,
policyRegistry: policyRegistry,
}
return result
}
// Run begins watching and synchronizing the cache
func (c *PolicyCache) Run() {
policyBindingReflector, policyReflector := c.configureReflectors()
policyBindingReflector.Run()
policyReflector.Run()
}
// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
func (c *PolicyCache) RunUntil(bindingStopCh <-chan struct{}, policyStopCh <-chan struct{}) {
policyBindingReflector, policyReflector := c.configureReflectors()
policyBindingReflector.RunUntil(bindingStopCh)
policyReflector.RunUntil(policyStopCh)
}
func (c *PolicyCache) configureReflectors() (*cache.Reflector, *cache.Reflector) {
ctx := kapi.WithNamespace(kapi.NewContext(), kapi.NamespaceAll)
policyBindingReflector := cache.NewReflector(
&listWatch{
listFunc: func() (runtime.Object, error) {
return c.bindingRegistry.ListPolicyBindings(ctx, labels.Everything(), fields.Everything())
},
watchFunc: func(resourceVersion string) (watch.Interface, error) {
return c.bindingRegistry.WatchPolicyBindings(ctx, labels.Everything(), fields.Everything(), resourceVersion)
},
},
&authorizationapi.PolicyBinding{},
c.policyBindingIndexer,
2*time.Minute,
)
policyReflector := cache.NewReflector(
&listWatch{
listFunc: func() (runtime.Object, error) {
return c.policyRegistry.ListPolicies(ctx, labels.Everything(), fields.Everything())
},
watchFunc: func(resourceVersion string) (watch.Interface, error) {
return c.policyRegistry.WatchPolicies(ctx, labels.Everything(), fields.Everything(), resourceVersion)
},
},
&authorizationapi.Policy{},
c.policyIndexer,
2*time.Minute,
)
return policyBindingReflector, policyReflector
}
// GetPolicy retrieves a specific policy. It conforms to rulevalidation.PolicyGetter.
func (c *PolicyCache) GetPolicy(ctx kapi.Context, name string) (*authorizationapi.Policy, error) {
namespace, exists := kapi.NamespaceFrom(ctx)
if !exists {
return nil, errors.New("no namespace found")
}
keyObj := &authorizationapi.Policy{ObjectMeta: kapi.ObjectMeta{Namespace: namespace, Name: name}}
key, _ := c.keyFunc(keyObj)
policy, exists, err := c.policyIndexer.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("%v not found", key)
}
return policy.(*authorizationapi.Policy), nil
}
// ListPolicyBindings obtains list of policyBindings that match a selector. It conforms to rulevalidation.BindingLister
func (c *PolicyCache) ListPolicyBindings(ctx kapi.Context, label labels.Selector, field fields.Selector) (*authorizationapi.PolicyBindingList, error) {
namespace, exists := kapi.NamespaceFrom(ctx)
if !exists {
return nil, errors.New("no namespace found")
}
bindings, err := c.policyBindingIndexer.Index("namespace", &authorizationapi.PolicyBinding{ObjectMeta: kapi.ObjectMeta{Namespace: namespace}})
if err != nil {
return nil, err
}
ret := &authorizationapi.PolicyBindingList{
Items: make([]authorizationapi.PolicyBinding, 0, len(bindings)),
}
for i := range bindings {
ret.Items = append(ret.Items, *bindings[i].(*authorizationapi.PolicyBinding))
}
return ret, nil
}