Browse code

create policy cache

deads2k authored on 2015/02/24 00:08:53
Showing 6 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,149 @@
0
+package cache
1
+
2
+import (
3
+	"errors"
4
+	"fmt"
5
+
6
+	kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
7
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
8
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
9
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
10
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
11
+
12
+	authorizationapi "github.com/openshift/origin/pkg/authorization/api"
13
+	policyregistry "github.com/openshift/origin/pkg/authorization/registry/policy"
14
+	bindingregistry "github.com/openshift/origin/pkg/authorization/registry/policybinding"
15
+)
16
+
17
+// PolicyCache maintains a cache of PolicyRules
18
+type PolicyCache struct {
19
+	policyBindingIndexer cache.Indexer
20
+	policyIndexer        cache.Indexer
21
+
22
+	bindingRegistry bindingregistry.Registry
23
+	policyRegistry  policyregistry.Registry
24
+
25
+	keyFunc cache.KeyFunc
26
+}
27
+
28
+// TODO: Eliminate listWatch when this merges upstream: https://github.com/GoogleCloudPlatform/kubernetes/pull/4453
29
+type listFunc func() (runtime.Object, error)
30
+type watchFunc func(resourceVersion string) (watch.Interface, error)
31
+type listWatch struct {
32
+	listFunc  listFunc
33
+	watchFunc watchFunc
34
+}
35
+
36
+func (lw *listWatch) List() (runtime.Object, error) {
37
+	return lw.listFunc()
38
+}
39
+
40
+func (lw *listWatch) Watch(resourceVersion string) (watch.Interface, error) {
41
+	return lw.watchFunc(resourceVersion)
42
+}
43
+
44
+// NewPolicyCache creates a new PolicyCache.  You cannot use a normal client, because you don't want policy guarding the policy from the authorizer
45
+func NewPolicyCache(bindingRegistry bindingregistry.Registry, policyRegistry policyregistry.Registry) *PolicyCache {
46
+	result := &PolicyCache{
47
+		policyIndexer:        cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}),
48
+		policyBindingIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}),
49
+
50
+		keyFunc: cache.MetaNamespaceKeyFunc,
51
+
52
+		bindingRegistry: bindingRegistry,
53
+		policyRegistry:  policyRegistry,
54
+	}
55
+	return result
56
+}
57
+
58
+// Run begins watching and synchronizing the cache
59
+func (c *PolicyCache) Run() {
60
+	policyBindingReflector, policyReflector := c.configureReflectors()
61
+
62
+	policyBindingReflector.Run()
63
+	policyReflector.Run()
64
+}
65
+
66
+// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
67
+// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
68
+func (c *PolicyCache) RunUntil(bindingStopCh <-chan struct{}, policyStopCh <-chan struct{}) {
69
+	policyBindingReflector, policyReflector := c.configureReflectors()
70
+
71
+	policyBindingReflector.RunUntil(bindingStopCh)
72
+	policyReflector.RunUntil(policyStopCh)
73
+}
74
+
75
+func (c *PolicyCache) configureReflectors() (*cache.Reflector, *cache.Reflector) {
76
+	ctx := kapi.WithNamespace(kapi.NewContext(), kapi.NamespaceAll)
77
+
78
+	policyBindingReflector := cache.NewReflector(
79
+		&listWatch{
80
+			listFunc: func() (runtime.Object, error) {
81
+				return c.bindingRegistry.ListPolicyBindings(ctx, labels.Everything(), labels.Everything())
82
+			},
83
+			watchFunc: func(resourceVersion string) (watch.Interface, error) {
84
+				return c.bindingRegistry.WatchPolicyBindings(ctx, labels.Everything(), labels.Everything(), resourceVersion)
85
+			},
86
+		},
87
+		&authorizationapi.PolicyBinding{},
88
+		c.policyBindingIndexer,
89
+	)
90
+
91
+	policyReflector := cache.NewReflector(
92
+		&listWatch{
93
+			listFunc: func() (runtime.Object, error) {
94
+				return c.policyRegistry.ListPolicies(ctx, labels.Everything(), labels.Everything())
95
+			},
96
+			watchFunc: func(resourceVersion string) (watch.Interface, error) {
97
+				return c.policyRegistry.WatchPolicies(ctx, labels.Everything(), labels.Everything(), resourceVersion)
98
+			},
99
+		},
100
+		&authorizationapi.Policy{},
101
+		c.policyIndexer,
102
+	)
103
+
104
+	return policyBindingReflector, policyReflector
105
+}
106
+
107
+// GetPolicy retrieves a specific policy.  It conforms to rulevalidation.PolicyGetter.
108
+func (c *PolicyCache) GetPolicy(ctx kapi.Context, name string) (*authorizationapi.Policy, error) {
109
+	namespace, exists := kapi.NamespaceFrom(ctx)
110
+	if !exists {
111
+		return nil, errors.New("no namespace found")
112
+	}
113
+
114
+	keyObj := &authorizationapi.Policy{ObjectMeta: kapi.ObjectMeta{Namespace: namespace, Name: name}}
115
+	key, _ := c.keyFunc(keyObj)
116
+
117
+	policy, exists, err := c.policyIndexer.GetByKey(key)
118
+	if err != nil {
119
+		return nil, err
120
+	}
121
+	if !exists {
122
+		return nil, fmt.Errorf("%v not found", key)
123
+	}
124
+
125
+	return policy.(*authorizationapi.Policy), nil
126
+}
127
+
128
+// ListPolicyBindings obtains list of policyBindings that match a selector.  It conforms to rulevalidation.BindingLister
129
+func (c *PolicyCache) ListPolicyBindings(ctx kapi.Context, labels, fields labels.Selector) (*authorizationapi.PolicyBindingList, error) {
130
+	namespace, exists := kapi.NamespaceFrom(ctx)
131
+	if !exists {
132
+		return nil, errors.New("no namespace found")
133
+	}
134
+
135
+	bindings, err := c.policyBindingIndexer.Index("namespace", &authorizationapi.PolicyBinding{ObjectMeta: kapi.ObjectMeta{Namespace: namespace}})
136
+	if err != nil {
137
+		return nil, err
138
+	}
139
+
140
+	ret := &authorizationapi.PolicyBindingList{
141
+		Items: make([]authorizationapi.PolicyBinding, 0, len(bindings)),
142
+	}
143
+	for i := range bindings {
144
+		ret.Items = append(ret.Items, *bindings[i].(*authorizationapi.PolicyBinding))
145
+	}
146
+
147
+	return ret, nil
148
+}
0 149
new file mode 100644
... ...
@@ -0,0 +1,95 @@
0
+package cache
1
+
2
+import (
3
+	"testing"
4
+	"time"
5
+
6
+	kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
7
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
8
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
9
+
10
+	authorizationapi "github.com/openshift/origin/pkg/authorization/api"
11
+	testregistry "github.com/openshift/origin/pkg/authorization/registry/test"
12
+)
13
+
14
+func TestPolicyGet(t *testing.T) {
15
+	policyStop := make(chan struct{})
16
+	bindingStop := make(chan struct{})
17
+	defer close(policyStop)
18
+	defer close(bindingStop)
19
+
20
+	policyRegistry := testregistry.NewPolicyRegistry(testPolicies(), nil)
21
+	bindingRegistry := testregistry.NewPolicyBindingRegistry(testBindings(), nil)
22
+
23
+	policyCache := NewPolicyCache(bindingRegistry, policyRegistry)
24
+	policyCache.RunUntil(bindingStop, policyStop)
25
+
26
+	testStop := make(chan struct{})
27
+
28
+	util.Until(func() {
29
+		ctx := kapi.WithNamespace(kapi.NewContext(), "mallet")
30
+		policy, policyErr := policyCache.GetPolicy(ctx, authorizationapi.PolicyName)
31
+
32
+		bindings, bindingErr := policyCache.ListPolicyBindings(ctx, labels.Everything(), labels.Everything())
33
+		if (policyErr == nil) && (bindingErr == nil) && (policy != nil) && (len(bindings.Items) == 1) {
34
+			close(testStop)
35
+		}
36
+
37
+	}, 1*time.Millisecond, testStop)
38
+}
39
+
40
+func testPolicies() []authorizationapi.Policy {
41
+	return []authorizationapi.Policy{
42
+		{
43
+			ObjectMeta: kapi.ObjectMeta{
44
+				Name:      authorizationapi.PolicyName,
45
+				Namespace: "mallet",
46
+			},
47
+			Roles: map[string]authorizationapi.Role{},
48
+		}}
49
+}
50
+func testBindings() []authorizationapi.PolicyBinding {
51
+	return []authorizationapi.PolicyBinding{
52
+		{
53
+			ObjectMeta: kapi.ObjectMeta{
54
+				Name:      "mallet",
55
+				Namespace: "mallet",
56
+			},
57
+			RoleBindings: map[string]authorizationapi.RoleBinding{
58
+				"projectAdmins": {
59
+					ObjectMeta: kapi.ObjectMeta{
60
+						Name:      "projectAdmins",
61
+						Namespace: "mallet",
62
+					},
63
+					RoleRef: kapi.ObjectReference{
64
+						Name:      "admin",
65
+						Namespace: "mallet",
66
+					},
67
+					Users: util.NewStringSet("Matthew"),
68
+				},
69
+				"viewers": {
70
+					ObjectMeta: kapi.ObjectMeta{
71
+						Name:      "viewers",
72
+						Namespace: "mallet",
73
+					},
74
+					RoleRef: kapi.ObjectReference{
75
+						Name:      "view",
76
+						Namespace: "mallet",
77
+					},
78
+					Users: util.NewStringSet("Victor"),
79
+				},
80
+				"editors": {
81
+					ObjectMeta: kapi.ObjectMeta{
82
+						Name:      "editors",
83
+						Namespace: "mallet",
84
+					},
85
+					RoleRef: kapi.ObjectReference{
86
+						Name:      "edit",
87
+						Namespace: "mallet",
88
+					},
89
+					Users: util.NewStringSet("Edgar"),
90
+				},
91
+			},
92
+		},
93
+	}
94
+}
... ...
@@ -34,16 +34,21 @@ func (r *PolicyRegistry) ListPolicies(ctx kapi.Context, labels, fields klabels.S
34 34
 	}
35 35
 
36 36
 	namespace := kapi.NamespaceValue(ctx)
37
-	if len(namespace) == 0 {
38
-		return nil, errors.New("invalid request.  Namespace parameter required.")
39
-	}
40
-
41 37
 	list := make([]authorizationapi.Policy, 0)
42
-	if namespacedPolicies, ok := r.Policies[namespace]; ok {
43
-		for _, curr := range namespacedPolicies {
44
-			list = append(list, curr)
38
+
39
+	if namespace == kapi.NamespaceAll {
40
+		for _, curr := range r.Policies {
41
+			for _, policy := range curr {
42
+				list = append(list, policy)
43
+			}
45 44
 		}
46 45
 
46
+	} else {
47
+		if namespacedPolicies, ok := r.Policies[namespace]; ok {
48
+			for _, curr := range namespacedPolicies {
49
+				list = append(list, curr)
50
+			}
51
+		}
47 52
 	}
48 53
 
49 54
 	return &authorizationapi.PolicyList{
... ...
@@ -34,16 +34,21 @@ func (r *PolicyBindingRegistry) ListPolicyBindings(ctx kapi.Context, labels, fie
34 34
 	}
35 35
 
36 36
 	namespace := kapi.NamespaceValue(ctx)
37
-	if len(namespace) == 0 {
38
-		return nil, errors.New("invalid request.  Namespace parameter required.")
39
-	}
40
-
41 37
 	list := make([]authorizationapi.PolicyBinding, 0)
42
-	if namespacedBindings, ok := r.PolicyBindings[namespace]; ok {
43
-		for _, curr := range namespacedBindings {
44
-			list = append(list, curr)
38
+
39
+	if namespace == kapi.NamespaceAll {
40
+		for _, curr := range r.PolicyBindings {
41
+			for _, binding := range curr {
42
+				list = append(list, binding)
43
+			}
45 44
 		}
46 45
 
46
+	} else {
47
+		if namespacedBindings, ok := r.PolicyBindings[namespace]; ok {
48
+			for _, curr := range namespacedBindings {
49
+				list = append(list, curr)
50
+			}
51
+		}
47 52
 	}
48 53
 
49 54
 	return &authorizationapi.PolicyBindingList{
... ...
@@ -75,6 +75,7 @@ import (
75 75
 
76 76
 	authorizationapi "github.com/openshift/origin/pkg/authorization/api"
77 77
 	"github.com/openshift/origin/pkg/authorization/authorizer"
78
+	policycache "github.com/openshift/origin/pkg/authorization/cache"
78 79
 	authorizationetcd "github.com/openshift/origin/pkg/authorization/registry/etcd"
79 80
 	policyregistry "github.com/openshift/origin/pkg/authorization/registry/policy"
80 81
 	policybindingregistry "github.com/openshift/origin/pkg/authorization/registry/policybinding"
... ...
@@ -115,6 +116,7 @@ type MasterConfig struct {
115 115
 	AuthorizationAttributeBuilder authorizer.AuthorizationAttributeBuilder
116 116
 	MasterAuthorizationNamespace  string
117 117
 
118
+	PolicyCache               *policycache.PolicyCache
118 119
 	ProjectAuthorizationCache *projectauth.AuthorizationCache
119 120
 
120 121
 	// Map requests to contexts
... ...
@@ -559,6 +561,11 @@ func (c *MasterConfig) RunProjectAuthorizationCache() {
559 559
 	c.ProjectAuthorizationCache.Run(period)
560 560
 }
561 561
 
562
+// RunPolicyCache starts the policy cache
563
+func (c *MasterConfig) RunPolicyCache() {
564
+	c.PolicyCache.Run()
565
+}
566
+
562 567
 // RunAssetServer starts the asset server for the OpenShift UI.
563 568
 func (c *MasterConfig) RunAssetServer() {
564 569
 	// TODO use	version.Get().GitCommit as an etag cache header
... ...
@@ -38,6 +38,7 @@ import (
38 38
 	"github.com/openshift/origin/pkg/auth/authenticator/request/unionrequest"
39 39
 	"github.com/openshift/origin/pkg/auth/authenticator/request/x509request"
40 40
 	"github.com/openshift/origin/pkg/authorization/authorizer"
41
+	policycache "github.com/openshift/origin/pkg/authorization/cache"
41 42
 	authorizationetcd "github.com/openshift/origin/pkg/authorization/registry/etcd"
42 43
 	"github.com/openshift/origin/pkg/authorization/rulevalidation"
43 44
 	projectauth "github.com/openshift/origin/pkg/project/auth"
... ...
@@ -367,11 +368,9 @@ func start(cfg *config, args []string) error {
367 367
 
368 368
 			EtcdHelper: etcdHelper,
369 369
 
370
-			AdmissionControl:              admit.NewAlwaysAdmit(),
371
-			Authorizer:                    newAuthorizer(etcdHelper, masterAuthorizationNamespace),
372
-			AuthorizationAttributeBuilder: newAuthorizationAttributeBuilder(requestContextMapper),
373
-			MasterAuthorizationNamespace:  masterAuthorizationNamespace,
374
-			RequestContextMapper:          requestContextMapper,
370
+			AdmissionControl:             admit.NewAlwaysAdmit(),
371
+			MasterAuthorizationNamespace: masterAuthorizationNamespace,
372
+			RequestContextMapper:         requestContextMapper,
375 373
 
376 374
 			ImageFor: imageResolverFn,
377 375
 		}
... ...
@@ -502,6 +501,13 @@ func start(cfg *config, args []string) error {
502 502
 
503 503
 		osmaster.BuildClients()
504 504
 
505
+		authorizationEtcd := authorizationetcd.New(etcdHelper)
506
+		osmaster.PolicyCache = policycache.NewPolicyCache(authorizationEtcd, authorizationEtcd)
507
+		osmaster.Authorizer = newAuthorizer(osmaster.PolicyCache, masterAuthorizationNamespace)
508
+		osmaster.AuthorizationAttributeBuilder = newAuthorizationAttributeBuilder(requestContextMapper)
509
+		// the policy cache must start before you attempt to start any other components
510
+		osmaster.RunPolicyCache()
511
+
505 512
 		osmaster.ProjectAuthorizationCache = projectauth.NewAuthorizationCache(
506 513
 			projectauth.NewReviewer(osmaster.PolicyClient()),
507 514
 			osmaster.KubeClient().Namespaces(),
... ...
@@ -653,9 +659,8 @@ func start(cfg *config, args []string) error {
653 653
 	return nil
654 654
 }
655 655
 
656
-func newAuthorizer(etcdHelper tools.EtcdHelper, masterAuthorizationNamespace string) authorizer.Authorizer {
657
-	authorizationEtcd := authorizationetcd.New(etcdHelper)
658
-	authorizer := authorizer.NewAuthorizer(masterAuthorizationNamespace, rulevalidation.NewDefaultRuleResolver(authorizationEtcd, authorizationEtcd))
656
+func newAuthorizer(policyCache *policycache.PolicyCache, masterAuthorizationNamespace string) authorizer.Authorizer {
657
+	authorizer := authorizer.NewAuthorizer(masterAuthorizationNamespace, rulevalidation.NewDefaultRuleResolver(policyCache, policyCache))
659 658
 	return authorizer
660 659
 }
661 660