package auth

import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	kapi "k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/auth/user"
	"k8s.io/kubernetes/pkg/client/cache"
	kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/types"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
	"k8s.io/kubernetes/pkg/util/sets"
	utilwait "k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/pkg/watch"

	authorizationapi "github.com/openshift/origin/pkg/authorization/api"
	"github.com/openshift/origin/pkg/authorization/authorizer/scope"
	"github.com/openshift/origin/pkg/client"
)

// Lister enforces ability to enumerate a resource based on policy
type Lister interface {
	// List returns the list of Namespace items that the user can access
	List(user user.Info) (*kapi.NamespaceList, error)
}

// subjectRecord is a cache record for the set of namespaces a subject can access
type subjectRecord struct {
	subject    string
	namespaces sets.String
}

// reviewRequest is the resource we want to review
type reviewRequest struct {
	namespace string
	// the resource version of the namespace that was observed to make this request
	namespaceResourceVersion string
	// the map of policy uid to resource version that was observed to make this request
	policyUIDToResourceVersion map[types.UID]string
	// the map of policy binding uid to resource version that was observed to make this request
	policyBindingUIDToResourceVersion map[types.UID]string
}

// reviewRecord is a cache record for the result of a resource access review
type reviewRecord struct {
	*reviewRequest
	users  []string
	groups []string
}

// reviewRecordKeyFn is a key func for reviewRecord objects; records are keyed by namespace.
// It returns an error when obj is not a *reviewRecord.
func reviewRecordKeyFn(obj interface{}) (string, error) {
	record, ok := obj.(*reviewRecord)
	if !ok {
		return "", fmt.Errorf("expected reviewRecord")
	}
	return record.namespace, nil
}

// subjectRecordKeyFn is a key func for subjectRecord objects; records are keyed by subject.
// It returns an error when obj is not a *subjectRecord.
func subjectRecordKeyFn(obj interface{}) (string, error) {
	record, ok := obj.(*subjectRecord)
	if !ok {
		return "", fmt.Errorf("expected subjectRecord")
	}
	return record.subject, nil
}

// skipSynchronizer decides whether a synchronization pass can be skipped.
type skipSynchronizer interface {
	// SkipSynchronize returns true if it's safe to skip synchronization of the cache based on
	// the provided token from a previous observation. It also returns the token describing
	// the current observation.
	SkipSynchronize(prevState string, versionedObjects ...LastSyncResourceVersioner) (skip bool, currentState string)
}

// LastSyncResourceVersioner is any object that can divulge a LastSyncResourceVersion
type LastSyncResourceVersioner interface {
	LastSyncResourceVersion() string
}

// unchangingLastSyncResourceVersioner always reports the same resource version.
type unchangingLastSyncResourceVersioner struct{}

// LastSyncResourceVersion always returns "0".
func (u unchangingLastSyncResourceVersioner) LastSyncResourceVersion() string {
	return "0"
}

// unionLastSyncResourceVersioner combines several versioners into a single one.
type unionLastSyncResourceVersioner []LastSyncResourceVersioner

// LastSyncResourceVersion returns the resource versions of every member, in order, joined
// with a comma. The separator keeps distinct version tuples distinct: without it "1"+"23"
// and "12"+"3" would both produce "123" and a real change could go unnoticed.
func (u unionLastSyncResourceVersioner) LastSyncResourceVersion() string {
	resourceVersions := make([]string, 0, len(u))
	for _, versioner := range u {
		resourceVersions = append(resourceVersions, versioner.LastSyncResourceVersion())
	}
	return strings.Join(resourceVersions, ",")
}

// statelessSkipSynchronizer skips synchronization when the joined resource versions of the
// supplied objects equal the previously returned state.
type statelessSkipSynchronizer struct{}

// SkipSynchronize builds the current state as the comma-joined resource versions of
// versionedObjects and reports skip == true when it equals prevState.
func (rs *statelessSkipSynchronizer) SkipSynchronize(prevState string, versionedObjects ...LastSyncResourceVersioner) (skip bool, currentState string) {
	resourceVersions := make([]string, 0, len(versionedObjects))
	for i := range versionedObjects {
		resourceVersions = append(resourceVersions, versionedObjects[i].LastSyncResourceVersion())
	}
	currentState = strings.Join(resourceVersions, ",")
	skip = currentState == prevState

	return skip, currentState
}

// neverSkipSynchronizer never allows a synchronization to be skipped.
type neverSkipSynchronizer struct{}

// SkipSynchronize always returns false and an empty state. Because synchronize stores the
// returned state in lastState, ReadyForAccess keeps reporting false while this synchronizer
// is installed.
func (s *neverSkipSynchronizer) SkipSynchronize(prevState string, versionedObjects ...LastSyncResourceVersioner) (bool, string) {
	return false, ""
}

// AuthorizationCache maintains a cache on the set of namespaces a user or group can access.
type AuthorizationCache struct {
	// allKnownNamespaces we track all the known namespaces, so we can detect deletes.
	// TODO remove this in favor of a list/watch mechanism for projects
	allKnownNamespaces        sets.String
	namespaceStore            cache.Store
	namespaceInterface        kcoreclient.NamespaceInterface
	lastSyncResourceVersioner LastSyncResourceVersioner

	// listers for cluster-scoped and namespaced policy and policy bindings
	clusterPolicyLister             client.SyncedClusterPoliciesListerInterface
	clusterPolicyBindingLister      client.SyncedClusterPolicyBindingsListerInterface
	policyNamespacer                client.SyncedPoliciesListerNamespacer
	policyBindingNamespacer         client.SyncedPolicyBindingsListerNamespacer
	policyLastSyncResourceVersioner LastSyncResourceVersioner

	// reviewRecordStore is keyed by namespace, the subject record stores are keyed by subject
	reviewRecordStore       cache.Store
	userSubjectRecordStore  cache.Store
	groupSubjectRecordStore cache.Store

	// resource versions of cluster policy and bindings seen by the last invalidateCache call
	clusterBindingResourceVersions sets.String
	clusterPolicyResourceVersions  sets.String

	// skip decides whether synchronize may return early; lastState is the state token
	// recorded by the last completed synchronize
	skip      skipSynchronizer
	lastState string

	reviewer    Reviewer
	syncHandler func(request *reviewRequest, userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) error

	// watchers is guarded by watcherLock
	watchers    []CacheWatcher
	watcherLock sync.Mutex
}

// NewAuthorizationCache creates a new AuthorizationCache.
// The returned cache starts with empty stores, a neverSkipSynchronizer and syncRequest as its
// sync handler; nothing is synchronized until Run (or synchronize) is called.
func NewAuthorizationCache(reviewer Reviewer, namespaceInterface kcoreclient.NamespaceInterface,
	clusterPolicyLister client.SyncedClusterPoliciesListerInterface, clusterPolicyBindingLister client.SyncedClusterPolicyBindingsListerInterface,
	policyNamespacer client.SyncedPoliciesListerNamespacer, policyBindingNamespacer client.SyncedPolicyBindingsListerNamespacer,
) *AuthorizationCache {
	result := &AuthorizationCache{
		allKnownNamespaces:        sets.String{},
		namespaceStore:            cache.NewStore(cache.MetaNamespaceKeyFunc),
		namespaceInterface:        namespaceInterface,
		lastSyncResourceVersioner: &unchangingLastSyncResourceVersioner{},

		clusterPolicyResourceVersions:  sets.NewString(),
		clusterBindingResourceVersions: sets.NewString(),

		clusterPolicyLister:             clusterPolicyLister,
		clusterPolicyBindingLister:      clusterPolicyBindingLister,
		policyNamespacer:                policyNamespacer,
		policyBindingNamespacer:         policyBindingNamespacer,
		policyLastSyncResourceVersioner: unionLastSyncResourceVersioner{clusterPolicyLister, clusterPolicyBindingLister, policyNamespacer, policyBindingNamespacer},

		reviewRecordStore:       cache.NewStore(reviewRecordKeyFn),
		userSubjectRecordStore:  cache.NewStore(subjectRecordKeyFn),
		groupSubjectRecordStore: cache.NewStore(subjectRecordKeyFn),

		reviewer: reviewer,
		skip:     &neverSkipSynchronizer{},

		watchers: []CacheWatcher{},
	}
	result.syncHandler = result.syncRequest
	return result
}

// Run begins watching and synchronizing the cache.
// It starts a namespace reflector that fills namespaceStore, switches the cache to the
// stateless skip synchronizer and then calls synchronize every period in a new goroutine.
func (ac *AuthorizationCache) Run(period time.Duration) {
	namespaceReflector := cache.NewReflector(
		&cache.ListWatch{
			ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
				return ac.namespaceInterface.List(options)
			},
			WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
				return ac.namespaceInterface.Watch(options)
			},
		},
		&kapi.Namespace{},
		ac.namespaceStore,
		2*time.Minute,
	)
	// NOTE(review): presumably Reflector.Run returns immediately and reflects in the
	// background - confirm against the vendored kubernetes version.
	namespaceReflector.Run()

	// both fields are assigned before the synchronize goroutine is started
	ac.lastSyncResourceVersioner = namespaceReflector
	ac.skip = &statelessSkipSynchronizer{}

	go utilwait.Forever(func() { ac.synchronize() }, period)
}

// AddWatcher registers watcher to be notified by notifyWatchers.
func (ac *AuthorizationCache) AddWatcher(watcher CacheWatcher) {
	ac.watcherLock.Lock()
	defer ac.watcherLock.Unlock()

	ac.watchers = append(ac.watchers, watcher)
}

// RemoveWatcher unregisters the first registration equal to watcher; it is a no-op when the
// watcher is not registered.
func (ac *AuthorizationCache) RemoveWatcher(watcher CacheWatcher) {
	ac.watcherLock.Lock()
	defer ac.watcherLock.Unlock()

	lastIndex := len(ac.watchers) - 1
	for i := range ac.watchers {
		if ac.watchers[i] != watcher {
			continue
		}
		// shift the tail down over the removed element (a no-op for the last element)
		copy(ac.watchers[i:], ac.watchers[i+1:])
		// clear the vacated slot so the backing array does not keep the watcher alive
		var zero CacheWatcher
		ac.watchers[lastIndex] = zero
		ac.watchers = ac.watchers[:lastIndex]
		break
	}
}

// GetClusterPolicyLister returns the cluster policy lister this cache was created with.
func (ac *AuthorizationCache) GetClusterPolicyLister() client.SyncedClusterPoliciesListerInterface {
	return ac.clusterPolicyLister
}

// synchronizeNamespaces synchronizes access over each namespace and returns a set of namespace names that were looked at in last sync
func (ac *AuthorizationCache) synchronizeNamespaces(userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) sets.String {
	namespaceSet := sets.NewString()
	items := ac.namespaceStore.List()
	for i := range items {
		namespace := items[i].(*kapi.Namespace)
		namespaceSet.Insert(namespace.Name)
		request := &reviewRequest{
			namespace:                namespace.Name,
			namespaceResourceVersion: namespace.ResourceVersion,
		}
		// a failure for one namespace is reported and must not stop the remaining ones
		if err := ac.syncHandler(request, userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore); err != nil {
			utilruntime.HandleError(fmt.Errorf("error synchronizing: %v", err))
		}
	}
	return namespaceSet
}

// synchronizePolicies synchronizes access over each policy
func (ac *AuthorizationCache) synchronizePolicies(userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) {
	policyList, err := ac.policyNamespacer.Policies(kapi.NamespaceAll).List(kapi.ListOptions{})
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	for _, policy := range policyList.Items {
		request := &reviewRequest{
			namespace:                  policy.Namespace,
			policyUIDToResourceVersion: map[types.UID]string{policy.UID: policy.ResourceVersion},
		}
		if err := ac.syncHandler(request, userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore); err != nil {
			utilruntime.HandleError(fmt.Errorf("error synchronizing: %v", err))
		}
	}
}

// synchronizePolicyBindings synchronizes access over each policy binding
func (ac *AuthorizationCache) synchronizePolicyBindings(userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) {
	policyBindingList, err := ac.policyBindingNamespacer.PolicyBindings(kapi.NamespaceAll).List(kapi.ListOptions{})
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	for _, policyBinding := range policyBindingList.Items {
		request := &reviewRequest{
			namespace:                         policyBinding.Namespace,
			policyBindingUIDToResourceVersion: map[types.UID]string{policyBinding.UID: policyBinding.ResourceVersion},
		}
		if err := ac.syncHandler(request, userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore); err != nil {
			utilruntime.HandleError(fmt.Errorf("error synchronizing: %v", err))
		}
	}
}

// purgeDeletedNamespaces will remove all namespaces enumerated in a reviewRecordStore that are not in the namespace set.
// Watchers are notified, with empty user and group sets, for every namespace that is in
// oldNamespaces but not in newNamespaces.
func (ac *AuthorizationCache) purgeDeletedNamespaces(oldNamespaces, newNamespaces sets.String, userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) {
	reviewRecordItems := reviewRecordStore.List()
	for i := range reviewRecordItems {
		record := reviewRecordItems[i].(*reviewRecord)
		if newNamespaces.Has(record.namespace) {
			continue
		}
		deleteNamespaceFromSubjects(userSubjectRecordStore, record.users, record.namespace)
		deleteNamespaceFromSubjects(groupSubjectRecordStore, record.groups, record.namespace)
		reviewRecordStore.Delete(record)
	}

	for namespace := range oldNamespaces.Difference(newNamespaces) {
		ac.notifyWatchers(namespace, nil, sets.String{}, sets.String{})
	}
}

// invalidateCache returns true if there was a change in the cluster namespace that holds cluster policy and policy bindings.
// It compares the resource versions of the currently listed cluster policies and cluster
// policy bindings with the sets remembered from the previous call and remembers the new sets
// when they differ. If a list call fails the error is reported and the result computed so far
// is returned.
func (ac *AuthorizationCache) invalidateCache() bool {
	invalidateCache := false

	clusterPolicyList, err := ac.clusterPolicyLister.ClusterPolicies().List(kapi.ListOptions{})
	if err != nil {
		utilruntime.HandleError(err)
		return invalidateCache
	}

	policyVersions := sets.NewString()
	for _, clusterPolicy := range clusterPolicyList.Items {
		policyVersions.Insert(clusterPolicy.ResourceVersion)
	}
	if (len(ac.clusterPolicyResourceVersions) != len(policyVersions)) || !ac.clusterPolicyResourceVersions.HasAll(policyVersions.List()...) {
		invalidateCache = true
		ac.clusterPolicyResourceVersions = policyVersions
	}

	clusterPolicyBindingList, err := ac.clusterPolicyBindingLister.ClusterPolicyBindings().List(kapi.ListOptions{})
	if err != nil {
		utilruntime.HandleError(err)
		return invalidateCache
	}

	// Use a separate set for the bindings. sets.String is a map, so reusing the set that
	// may just have been stored in clusterPolicyResourceVersions would overwrite the
	// remembered policy versions with binding versions.
	bindingVersions := sets.NewString()
	for _, clusterPolicyBinding := range clusterPolicyBindingList.Items {
		bindingVersions.Insert(clusterPolicyBinding.ResourceVersion)
	}
	if (len(ac.clusterBindingResourceVersions) != len(bindingVersions)) || !ac.clusterBindingResourceVersions.HasAll(bindingVersions.List()...) {
		invalidateCache = true
		ac.clusterBindingResourceVersions = bindingVersions
	}
	return invalidateCache
}

// synchronize runs a full synchronization over the cache data. it must be run in a single-writer model, it's not thread-safe by design.
func (ac *AuthorizationCache) synchronize() {
	// if none of our internal reflectors changed, then we can skip reviewing the cache
	skip, currentState := ac.skip.SkipSynchronize(ac.lastState, ac.lastSyncResourceVersioner, ac.policyLastSyncResourceVersioner)
	if skip {
		return
	}

	// by default, we update our current caches and do an incremental change
	userSubjectRecordStore := ac.userSubjectRecordStore
	groupSubjectRecordStore := ac.groupSubjectRecordStore
	reviewRecordStore := ac.reviewRecordStore

	// if there was a global change that forced complete invalidation, we rebuild our cache and do a fast swap at end
	invalidateCache := ac.invalidateCache()
	if invalidateCache {
		userSubjectRecordStore = cache.NewStore(subjectRecordKeyFn)
		groupSubjectRecordStore = cache.NewStore(subjectRecordKeyFn)
		reviewRecordStore = cache.NewStore(reviewRecordKeyFn)
	}

	// iterate over caches and synchronize our three caches
	newKnownNamespaces := ac.synchronizeNamespaces(userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
	ac.synchronizePolicies(userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
	ac.synchronizePolicyBindings(userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
	ac.purgeDeletedNamespaces(ac.allKnownNamespaces, newKnownNamespaces, userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)

	// if we did a full rebuild, now we swap the fully rebuilt cache
	if invalidateCache {
		ac.userSubjectRecordStore = userSubjectRecordStore
		ac.groupSubjectRecordStore = groupSubjectRecordStore
		ac.reviewRecordStore = reviewRecordStore
	}
	ac.allKnownNamespaces = newKnownNamespaces

	// we were able to update our cache since this last observation period
	ac.lastState = currentState
}

// syncRequest takes a reviewRequest and determines if it should update the caches supplied, it is not thread-safe.
// When the request is not already satisfied by the last known review record, the namespace is
// reviewed again, the subject stores and the review record are updated and watchers are
// notified. An evaluation error reported by the review is returned only after the caches were
// updated with the review's users and groups.
func (ac *AuthorizationCache) syncRequest(request *reviewRequest, userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) error {
	lastKnownValue, err := lastKnown(reviewRecordStore, request.namespace)
	if err != nil {
		return err
	}

	if skipReview(request, lastKnownValue) {
		return nil
	}

	namespace := request.namespace
	review, err := ac.reviewer.Review(namespace)
	if err != nil {
		return err
	}

	// subjects that had access according to the last review but are absent from this one
	usersToRemove := sets.NewString()
	groupsToRemove := sets.NewString()
	if lastKnownValue != nil {
		usersToRemove.Insert(lastKnownValue.users...)
		usersToRemove.Delete(review.Users()...)
		groupsToRemove.Insert(lastKnownValue.groups...)
		groupsToRemove.Delete(review.Groups()...)
	}

	deleteNamespaceFromSubjects(userSubjectRecordStore, usersToRemove.List(), namespace)
	deleteNamespaceFromSubjects(groupSubjectRecordStore, groupsToRemove.List(), namespace)
	addSubjectsToNamespace(userSubjectRecordStore, review.Users(), namespace)
	addSubjectsToNamespace(groupSubjectRecordStore, review.Groups(), namespace)
	cacheReviewRecord(request, lastKnownValue, review, reviewRecordStore)
	ac.notifyWatchers(namespace, lastKnownValue, sets.NewString(review.Users()...), sets.NewString(review.Groups()...))

	if errMsg := review.EvaluationError(); len(errMsg) > 0 {
		return errors.New(errMsg)
	}
	return nil
}

// List returns the set of namespace names the user has access to view.
// The result is the union of the namespaces cached for the user's name and for each of the
// user's groups, restricted to namespaces present in the namespace store and to those the
// user's scopes make visible ("*" allows every namespace).
func (ac *AuthorizationCache) List(userInfo user.Info) (*kapi.NamespaceList, error) {
	keys := sets.String{}
	userName := userInfo.GetName()
	groups := userInfo.GetGroups()

	// lookup errors are deliberately ignored: a subject that cannot be found simply
	// contributes no namespaces
	obj, exists, _ := ac.userSubjectRecordStore.GetByKey(userName)
	if exists {
		record := obj.(*subjectRecord)
		keys.Insert(record.namespaces.List()...)
	}

	for _, group := range groups {
		obj, exists, _ := ac.groupSubjectRecordStore.GetByKey(group)
		if exists {
			record := obj.(*subjectRecord)
			keys.Insert(record.namespaces.List()...)
		}
	}

	allowedNamespaces, err := scope.ScopesToVisibleNamespaces(userInfo.GetExtra()[authorizationapi.ScopesKey], ac.clusterPolicyLister.ClusterPolicies())
	if err != nil {
		return nil, err
	}

	namespaceList := &kapi.NamespaceList{}
	for key := range keys {
		namespaceObj, exists, err := ac.namespaceStore.GetByKey(key)
		if err != nil {
			return nil, err
		}
		if !exists {
			continue
		}
		// the namespace is copied by value into the result list
		namespace := *namespaceObj.(*kapi.Namespace)
		if allowedNamespaces.Has("*") || allowedNamespaces.Has(namespace.Name) {
			namespaceList.Items = append(namespaceList.Items, namespace)
		}
	}
	return namespaceList, nil
}

// ReadyForAccess reports whether a synchronize pass has recorded a non-empty state.
func (ac *AuthorizationCache) ReadyForAccess() bool {
	return len(ac.lastState) > 0
}

// skipReview returns true if the request was satisfied by the lastKnown
func skipReview(request *reviewRequest, lastKnownValue *reviewRecord) bool {
	// if your request is nil, you have no reason to make a review
	if request == nil {
		return true
	}

	// if you know nothing from a prior review, you better make a request
	if lastKnownValue == nil {
		return false
	}
	// if you are asking about a specific namespace, and you think you knew about a different one, you better check again
	if request.namespace != lastKnownValue.namespace {
		return false
	}

	// if you are making your request relative to a specific resource version, only make it if it's different
	if len(request.namespaceResourceVersion) > 0 && request.namespaceResourceVersion != lastKnownValue.namespaceResourceVersion {
		return false
	}

	// if you see a new policy binding, or a newer version, we need to do a review
	for k, v := range request.policyBindingUIDToResourceVersion {
		oldValue, exists := lastKnownValue.policyBindingUIDToResourceVersion[k]
		if !exists || v != oldValue {
			return false
		}
	}

	// if you see a new policy, or a newer version, we need to do a review
	for k, v := range request.policyUIDToResourceVersion {
		oldValue, exists := lastKnownValue.policyUIDToResourceVersion[k]
		if !exists || v != oldValue {
			return false
		}
	}
	return true
}

// deleteNamespaceFromSubjects removes the namespace from each subject
// if no other namespaces are active to that subject, it will also delete the subject from the cache entirely
func deleteNamespaceFromSubjects(subjectRecordStore cache.Store, subjects []string, namespace string) {
	for _, subject := range subjects {
		obj, exists, _ := subjectRecordStore.GetByKey(subject)
		if !exists {
			continue
		}
		record := obj.(*subjectRecord)
		// the stored record is modified in place
		delete(record.namespaces, namespace)
		if len(record.namespaces) == 0 {
			subjectRecordStore.Delete(record)
		}
	}
}

// addSubjectsToNamespace adds the specified namespace to each subject.
// A subject without a record gets a new record added to the store first.
func addSubjectsToNamespace(subjectRecordStore cache.Store, subjects []string, namespace string) {
	for _, subject := range subjects {
		var item *subjectRecord
		obj, exists, _ := subjectRecordStore.GetByKey(subject)
		if exists {
			item = obj.(*subjectRecord)
		} else {
			item = &subjectRecord{subject: subject, namespaces: sets.NewString()}
			subjectRecordStore.Add(item)
		}
		item.namespaces.Insert(namespace)
	}
}

// notifyWatchers tells every registered watcher that the users and groups with access to
// namespace changed. The exists argument is currently unused. The watcher lock is held while
// the watchers are called.
func (ac *AuthorizationCache) notifyWatchers(namespace string, exists *reviewRecord, users, groups sets.String) {
	ac.watcherLock.Lock()
	defer ac.watcherLock.Unlock()
	for _, watcher := range ac.watchers {
		watcher.GroupMembershipChanged(namespace, users, groups)
	}
}

// cacheReviewRecord updates the cache based on the request processed.
// The stored record starts from the versions remembered in lastKnownValue (if any), overlays
// the versions carried by request and holds the users and groups of review.
func cacheReviewRecord(request *reviewRequest, lastKnownValue *reviewRecord, review Review, reviewRecordStore cache.Store) {
	record := &reviewRecord{
		reviewRequest: &reviewRequest{namespace: request.namespace, policyUIDToResourceVersion: map[types.UID]string{}, policyBindingUIDToResourceVersion: map[types.UID]string{}},
		groups:        review.Groups(),
		users:         review.Users(),
	}
	// keep what we last believe we knew by default
	if lastKnownValue != nil {
		record.namespaceResourceVersion = lastKnownValue.namespaceResourceVersion
		for k, v := range lastKnownValue.policyUIDToResourceVersion {
			record.policyUIDToResourceVersion[k] = v
		}
		for k, v := range lastKnownValue.policyBindingUIDToResourceVersion {
			record.policyBindingUIDToResourceVersion[k] = v
		}
	}

	// update the review record relative to what drove this request
	if len(request.namespaceResourceVersion) > 0 {
		record.namespaceResourceVersion = request.namespaceResourceVersion
	}
	for k, v := range request.policyUIDToResourceVersion {
		record.policyUIDToResourceVersion[k] = v
	}
	for k, v := range request.policyBindingUIDToResourceVersion {
		record.policyBindingUIDToResourceVersion[k] = v
	}
	// update the cache record
	reviewRecordStore.Add(record)
}

// lastKnown returns the review record stored for namespace, or nil when there is none.
// An error from the store lookup is returned as is.
func lastKnown(reviewRecordStore cache.Store, namespace string) (*reviewRecord, error) {
	obj, exists, err := reviewRecordStore.GetByKey(namespace)
	if err != nil {
		return nil, err
	}
	if exists {
		return obj.(*reviewRecord), nil
	}
	return nil, nil
}