package rulevalidation import ( "reflect" "testing" "k8s.io/kubernetes/pkg/util/sets" authorizationapi "github.com/openshift/origin/pkg/authorization/api" ) type escalationTest struct { ownerRules []authorizationapi.PolicyRule servantRules []authorizationapi.PolicyRule expectedCovered bool expectedUncoveredRules []authorizationapi.PolicyRule } func TestExactMatch(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), Resources: sets.NewString("builds")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), Resources: sets.NewString("builds")}, }, expectedCovered: true, expectedUncoveredRules: []authorizationapi.PolicyRule{}, }.test(t) } func TestMultipleRulesCoveringSingleRule(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("delete"), Resources: sets.NewString("deployments")}, {Verbs: sets.NewString("delete"), Resources: sets.NewString("builds")}, {Verbs: sets.NewString("update"), Resources: sets.NewString("builds", "deployments")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("delete", "update"), Resources: sets.NewString("builds", "deployments")}, }, expectedCovered: true, expectedUncoveredRules: []authorizationapi.PolicyRule{}, }.test(t) } func TestMultipleAPIGroupsCoveringSingleRule(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {APIGroups: []string{"group1"}, Verbs: sets.NewString("delete"), Resources: sets.NewString("deployments")}, {APIGroups: []string{"group1"}, Verbs: sets.NewString("delete"), Resources: sets.NewString("builds")}, {APIGroups: []string{"group1"}, Verbs: sets.NewString("update"), Resources: sets.NewString("builds", "deployments")}, {APIGroups: []string{"group2"}, Verbs: sets.NewString("delete"), Resources: sets.NewString("deployments")}, {APIGroups: []string{"group2"}, Verbs: sets.NewString("delete"), Resources: sets.NewString("builds")}, {APIGroups: []string{"group2"}, Verbs: sets.NewString("update"), Resources: sets.NewString("builds", "deployments")}, }, servantRules: []authorizationapi.PolicyRule{ {APIGroups: []string{"group1", "group2"}, Verbs: sets.NewString("delete", "update"), Resources: sets.NewString("builds", "deployments")}, }, expectedCovered: true, expectedUncoveredRules: []authorizationapi.PolicyRule{}, }.test(t) } func TestSingleAPIGroupsCoveringMultiple(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {APIGroups: []string{"group1", "group2"}, Verbs: sets.NewString("delete", "update"), Resources: sets.NewString("builds", "deployments")}, }, servantRules: []authorizationapi.PolicyRule{ {APIGroups: []string{"group1"}, Verbs: sets.NewString("delete"), Resources: sets.NewString("deployments")}, {APIGroups: []string{"group1"}, Verbs: sets.NewString("delete"), Resources: sets.NewString("builds")}, {APIGroups: []string{"group1"}, Verbs: sets.NewString("update"), Resources: sets.NewString("builds", "deployments")}, {APIGroups: []string{"group2"}, Verbs: sets.NewString("delete"), Resources: sets.NewString("deployments")}, {APIGroups: []string{"group2"}, Verbs: sets.NewString("delete"), Resources: sets.NewString("builds")}, {APIGroups: []string{"group2"}, Verbs: sets.NewString("update"), Resources: sets.NewString("builds", "deployments")}, }, expectedCovered: true, expectedUncoveredRules: []authorizationapi.PolicyRule{}, }.test(t) } func TestMultipleRulesMissingSingleVerbResourceCombination(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("delete", "update"), Resources: sets.NewString("builds", "deployments")}, {Verbs: sets.NewString("delete"), Resources: sets.NewString("pods")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("delete", "update"), Resources: sets.NewString("builds", "deployments", "pods")}, }, expectedCovered: false, expectedUncoveredRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("update"), Resources: sets.NewString("pods")}, }, }.test(t) } func TestResourceGroupCoveringEnumerated(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("create", "delete", "update"), Resources: sets.NewString("resourcegroup:builds")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("delete", "update"), Resources: sets.NewString("builds", "buildconfigs")}, }, expectedCovered: true, expectedUncoveredRules: []authorizationapi.PolicyRule{}, }.test(t) } func TestEnumeratedCoveringResourceGroup(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("delete", "update"), Resources: sets.NewString("builds", "buildconfigs", "buildlogs", "buildconfigs/instantiate", "buildconfigs/instantiatebinary", "builds/log", "builds/clone", "buildconfigs/webhooks")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("delete", "update"), Resources: sets.NewString("resourcegroup:builds")}, }, expectedCovered: true, expectedUncoveredRules: []authorizationapi.PolicyRule{}, }.test(t) } func TestEnumeratedMissingPartOfResourceGroup(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("delete", "update"), Resources: sets.NewString("builds", "buildconfigs")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("delete", "update"), Resources: sets.NewString("resourcegroup:builds")}, }, expectedCovered: false, expectedUncoveredRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("delete"), Resources: sets.NewString("buildlogs")}, {Verbs: sets.NewString("update"), Resources: sets.NewString("buildlogs")}, {Verbs: sets.NewString("delete"), Resources: sets.NewString("buildconfigs/instantiate")}, {Verbs: sets.NewString("update"), Resources: sets.NewString("buildconfigs/instantiate")}, {Verbs: sets.NewString("delete"), Resources: sets.NewString("buildconfigs/instantiatebinary")}, {Verbs: sets.NewString("update"), Resources: sets.NewString("buildconfigs/instantiatebinary")}, {Verbs: sets.NewString("delete"), Resources: sets.NewString("builds/log")}, {Verbs: sets.NewString("update"), Resources: sets.NewString("builds/log")}, {Verbs: sets.NewString("delete"), Resources: sets.NewString("builds/clone")}, {Verbs: sets.NewString("update"), Resources: sets.NewString("builds/clone")}, {Verbs: sets.NewString("delete"), Resources: sets.NewString("buildconfigs/webhooks")}, {Verbs: sets.NewString("update"), Resources: sets.NewString("buildconfigs/webhooks")}, }, }.test(t) } func TestAPIGroupStarCoveringMultiple(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {APIGroups: []string{"*"}, Verbs: sets.NewString("get"), Resources: sets.NewString("roles")}, }, servantRules: []authorizationapi.PolicyRule{ {APIGroups: []string{"group1", "group2"}, Verbs: sets.NewString("get"), Resources: sets.NewString("roles")}, }, expectedCovered: true, expectedUncoveredRules: []authorizationapi.PolicyRule{}, }.test(t) } func TestEnumerationNotCoveringAPIGroupStar(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {APIGroups: []string{"dummy-group"}, Verbs: sets.NewString("get"), Resources: sets.NewString("roles")}, }, servantRules: []authorizationapi.PolicyRule{ {APIGroups: []string{"*"}, Verbs: sets.NewString("get"), Resources: sets.NewString("roles")}, }, expectedCovered: false, expectedUncoveredRules: []authorizationapi.PolicyRule{ {APIGroups: []string{"*"}, Verbs: sets.NewString("get"), Resources: sets.NewString("roles")}, }, }.test(t) } func TestAPIGroupStarCoveringStar(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {APIGroups: []string{"*"}, Verbs: sets.NewString("get"), Resources: sets.NewString("roles")}, }, servantRules: []authorizationapi.PolicyRule{ {APIGroups: []string{"*"}, Verbs: sets.NewString("get"), Resources: sets.NewString("roles")}, }, expectedCovered: true, expectedUncoveredRules: []authorizationapi.PolicyRule{}, }.test(t) } func TestVerbStarCoveringMultiple(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("*"), Resources: sets.NewString("roles")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("watch", "list"), Resources: sets.NewString("roles")}, }, expectedCovered: true, expectedUncoveredRules: []authorizationapi.PolicyRule{}, }.test(t) } func TestEnumerationNotCoveringVerbStar(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get", "list", "watch", "create", "update", "delete", "exec"), Resources: sets.NewString("roles")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("*"), Resources: sets.NewString("roles")}, }, expectedCovered: false, expectedUncoveredRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("*"), Resources: sets.NewString("roles")}, }, }.test(t) } func TestVerbStarCoveringStar(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("*"), Resources: sets.NewString("roles")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("*"), Resources: sets.NewString("roles")}, }, expectedCovered: true, expectedUncoveredRules: []authorizationapi.PolicyRule{}, }.test(t) } func TestResourceStarCoveringMultiple(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), Resources: sets.NewString("*")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), Resources: sets.NewString("resourcegroup:deployments")}, }, expectedCovered: true, expectedUncoveredRules: []authorizationapi.PolicyRule{}, }.test(t) } func TestEnumerationNotCoveringResourceStar(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), Resources: sets.NewString("roles", "resourcegroup:deployments")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), Resources: sets.NewString("*")}, }, expectedCovered: false, expectedUncoveredRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), Resources: sets.NewString("*")}, }, }.test(t) } func TestResourceStarCoveringStar(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), Resources: sets.NewString("*")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), Resources: sets.NewString("*")}, }, expectedCovered: true, expectedUncoveredRules: []authorizationapi.PolicyRule{}, }.test(t) } func TestResourceNameEmptyCoveringMultiple(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), Resources: sets.NewString("pods"), ResourceNames: sets.NewString()}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), Resources: sets.NewString("pods"), ResourceNames: sets.NewString("foo", "bar")}, }, expectedCovered: true, expectedUncoveredRules: []authorizationapi.PolicyRule{}, }.test(t) } func TestEnumerationNotCoveringResourceNameEmpty(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), Resources: sets.NewString("pods"), ResourceNames: sets.NewString("foo", "bar")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), Resources: sets.NewString("pods"), ResourceNames: sets.NewString()}, }, expectedCovered: false, expectedUncoveredRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), Resources: sets.NewString("pods")}, }, }.test(t) } func TestNonResourceCoversExactMatch(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), NonResourceURLs: sets.NewString("/foo")}, {Verbs: sets.NewString("get"), NonResourceURLs: sets.NewString("/bar")}, {Verbs: sets.NewString("post"), NonResourceURLs: sets.NewString("/baz")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), NonResourceURLs: sets.NewString("/foo", "/bar")}, {Verbs: sets.NewString("post"), NonResourceURLs: sets.NewString("/baz")}, }, expectedCovered: true, }.test(t) } func TestNonResourceCoversWildcard(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get", "post"), NonResourceURLs: sets.NewString("/foo/*", "/bar/*")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get", "post"), NonResourceURLs: sets.NewString("/foo/", "/bar/")}, {Verbs: sets.NewString("get", "post"), NonResourceURLs: sets.NewString("/foo/1", "/bar/1")}, {Verbs: sets.NewString("get", "post"), NonResourceURLs: sets.NewString("/foo/*", "/bar/*")}, }, expectedCovered: true, }.test(t) } func TestNonResourceUncovered(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), NonResourceURLs: sets.NewString("/foo")}, {Verbs: sets.NewString("post"), NonResourceURLs: sets.NewString("/bar/*")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get", "post"), NonResourceURLs: sets.NewString("/foo", "/foo/1", "/foo/*", "/bar/baz", "/bar/*")}, }, expectedCovered: false, expectedUncoveredRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("post"), NonResourceURLs: sets.NewString("/foo")}, {Verbs: sets.NewString("get"), NonResourceURLs: sets.NewString("/foo/1")}, {Verbs: sets.NewString("post"), NonResourceURLs: sets.NewString("/foo/1")}, {Verbs: sets.NewString("get"), NonResourceURLs: sets.NewString("/foo/*")}, {Verbs: sets.NewString("post"), NonResourceURLs: sets.NewString("/foo/*")}, {Verbs: sets.NewString("get"), NonResourceURLs: sets.NewString("/bar/baz")}, {Verbs: sets.NewString("get"), NonResourceURLs: sets.NewString("/bar/*")}, }, }.test(t) } func TestMixedResourceNonResourceCovered(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), Resources: sets.NewString("pods"), NonResourceURLs: sets.NewString("/api", "/api/*")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), Resources: sets.NewString("pods"), NonResourceURLs: sets.NewString("/api", "/api/v1")}, }, expectedCovered: true, }.test(t) } func TestMixedResourceNonResourceUncovered(t *testing.T) { escalationTest{ ownerRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get"), Resources: sets.NewString("pods"), NonResourceURLs: sets.NewString("/api", "/api/*")}, }, servantRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("get", "post"), Resources: sets.NewString("pods", "builds"), NonResourceURLs: sets.NewString("/api", "/apis")}, }, expectedCovered: false, expectedUncoveredRules: []authorizationapi.PolicyRule{ {Verbs: sets.NewString("post"), Resources: sets.NewString("pods")}, {Verbs: sets.NewString("get"), Resources: sets.NewString("builds")}, {Verbs: sets.NewString("post"), Resources: sets.NewString("builds")}, {Verbs: sets.NewString("post"), NonResourceURLs: sets.NewString("/api")}, {Verbs: sets.NewString("get"), NonResourceURLs: sets.NewString("/apis")}, {Verbs: sets.NewString("post"), NonResourceURLs: sets.NewString("/apis")}, }, }.test(t) } func (test escalationTest) test(t *testing.T) { actualCovered, actualUncoveredRules := Covers(test.ownerRules, test.servantRules) if actualCovered != test.expectedCovered { t.Errorf("expected %v, but got %v", test.expectedCovered, actualCovered) } if !rulesMatch(test.expectedUncoveredRules, actualUncoveredRules) { t.Errorf("expected %v, but got %v", test.expectedUncoveredRules, actualUncoveredRules) } } func rulesMatch(expectedRules, actualRules []authorizationapi.PolicyRule) bool { if len(expectedRules) != len(actualRules) { return false } for _, expectedRule := range expectedRules { found := false for _, actualRule := range actualRules { if reflect.DeepEqual(expectedRule, actualRule) { found = true break } } if !found { return false } } return true }