package auth
import (
"errors"
"fmt"
"strings"
"sync"
"time"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/auth/user"
"k8s.io/kubernetes/pkg/client/cache"
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
utilwait "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
authorizationapi "github.com/openshift/origin/pkg/authorization/api"
"github.com/openshift/origin/pkg/authorization/authorizer/scope"
"github.com/openshift/origin/pkg/client"
)
// Lister enforces ability to enumerate a resource based on policy
type Lister interface {
// List returns the list of Namespace items that the user can access
List(user user.Info) (*kapi.NamespaceList, error)
}
// subjectRecord is a cache record for the set of namespaces a subject can access
type subjectRecord struct {
subject string
namespaces sets.String
}
// reviewRequest is the resource we want to review
type reviewRequest struct {
namespace string
// the resource version of the namespace that was observed to make this request
namespaceResourceVersion string
// the map of policy uid to resource version that was observed to make this request
policyUIDToResourceVersion map[types.UID]string
// the map of policy binding uid to resource version that was observed to make this request
policyBindingUIDToResourceVersion map[types.UID]string
}
// reviewRecord is a cache record for the result of a resource access review
type reviewRecord struct {
*reviewRequest
users []string
groups []string
}
// reviewRecordKeyFn is a key func for reviewRecord objects
func reviewRecordKeyFn(obj interface{}) (string, error) {
reviewRecord, ok := obj.(*reviewRecord)
if !ok {
return "", fmt.Errorf("expected reviewRecord")
}
return reviewRecord.namespace, nil
}
// subjectRecordKeyFn is a key func for subjectRecord objects
func subjectRecordKeyFn(obj interface{}) (string, error) {
subjectRecord, ok := obj.(*subjectRecord)
if !ok {
return "", fmt.Errorf("expected subjectRecord")
}
return subjectRecord.subject, nil
}
type skipSynchronizer interface {
// SkipSynchronize returns true if if its safe to skip synchronization of the cache based on provided token from previous observation
SkipSynchronize(prevState string, versionedObjects ...LastSyncResourceVersioner) (skip bool, currentState string)
}
// LastSyncResourceVersioner is any object that can divulge a LastSyncResourceVersion
type LastSyncResourceVersioner interface {
LastSyncResourceVersion() string
}
type unchangingLastSyncResourceVersioner struct{}
func (u unchangingLastSyncResourceVersioner) LastSyncResourceVersion() string {
return "0"
}
type unionLastSyncResourceVersioner []LastSyncResourceVersioner
func (u unionLastSyncResourceVersioner) LastSyncResourceVersion() string {
resourceVersions := []string{}
for _, versioner := range u {
resourceVersions = append(resourceVersions, versioner.LastSyncResourceVersion())
}
return strings.Join(resourceVersions, "")
}
type statelessSkipSynchronizer struct{}
func (rs *statelessSkipSynchronizer) SkipSynchronize(prevState string, versionedObjects ...LastSyncResourceVersioner) (skip bool, currentState string) {
resourceVersions := []string{}
for i := range versionedObjects {
resourceVersions = append(resourceVersions, versionedObjects[i].LastSyncResourceVersion())
}
currentState = strings.Join(resourceVersions, ",")
skip = currentState == prevState
return skip, currentState
}
type neverSkipSynchronizer struct{}
func (s *neverSkipSynchronizer) SkipSynchronize(prevState string, versionedObjects ...LastSyncResourceVersioner) (bool, string) {
return false, ""
}
// AuthorizationCache maintains a cache on the set of namespaces a user or group can access.
type AuthorizationCache struct {
// allKnownNamespaces we track all the known namespaces, so we can detect deletes.
// TODO remove this in favor of a list/watch mechanism for projects
allKnownNamespaces sets.String
namespaceStore cache.Store
namespaceInterface kcoreclient.NamespaceInterface
lastSyncResourceVersioner LastSyncResourceVersioner
clusterPolicyLister client.SyncedClusterPoliciesListerInterface
clusterPolicyBindingLister client.SyncedClusterPolicyBindingsListerInterface
policyNamespacer client.SyncedPoliciesListerNamespacer
policyBindingNamespacer client.SyncedPolicyBindingsListerNamespacer
policyLastSyncResourceVersioner LastSyncResourceVersioner
reviewRecordStore cache.Store
userSubjectRecordStore cache.Store
groupSubjectRecordStore cache.Store
clusterBindingResourceVersions sets.String
clusterPolicyResourceVersions sets.String
skip skipSynchronizer
lastState string
reviewer Reviewer
syncHandler func(request *reviewRequest, userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) error
watchers []CacheWatcher
watcherLock sync.Mutex
}
// NewAuthorizationCache creates a new AuthorizationCache
func NewAuthorizationCache(reviewer Reviewer, namespaceInterface kcoreclient.NamespaceInterface,
clusterPolicyLister client.SyncedClusterPoliciesListerInterface, clusterPolicyBindingLister client.SyncedClusterPolicyBindingsListerInterface,
policyNamespacer client.SyncedPoliciesListerNamespacer, policyBindingNamespacer client.SyncedPolicyBindingsListerNamespacer,
) *AuthorizationCache {
result := &AuthorizationCache{
allKnownNamespaces: sets.String{},
namespaceStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
namespaceInterface: namespaceInterface,
lastSyncResourceVersioner: &unchangingLastSyncResourceVersioner{},
clusterPolicyResourceVersions: sets.NewString(),
clusterBindingResourceVersions: sets.NewString(),
clusterPolicyLister: clusterPolicyLister,
clusterPolicyBindingLister: clusterPolicyBindingLister,
policyNamespacer: policyNamespacer,
policyBindingNamespacer: policyBindingNamespacer,
policyLastSyncResourceVersioner: unionLastSyncResourceVersioner{clusterPolicyLister, clusterPolicyBindingLister, policyNamespacer, policyBindingNamespacer},
reviewRecordStore: cache.NewStore(reviewRecordKeyFn),
userSubjectRecordStore: cache.NewStore(subjectRecordKeyFn),
groupSubjectRecordStore: cache.NewStore(subjectRecordKeyFn),
reviewer: reviewer,
skip: &neverSkipSynchronizer{},
watchers: []CacheWatcher{},
}
result.syncHandler = result.syncRequest
return result
}
// Run begins watching and synchronizing the cache
func (ac *AuthorizationCache) Run(period time.Duration) {
namespaceReflector := cache.NewReflector(
&cache.ListWatch{
ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
return ac.namespaceInterface.List(options)
},
WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
return ac.namespaceInterface.Watch(options)
},
},
&kapi.Namespace{},
ac.namespaceStore,
2*time.Minute,
)
namespaceReflector.Run()
ac.lastSyncResourceVersioner = namespaceReflector
ac.skip = &statelessSkipSynchronizer{}
go utilwait.Forever(func() { ac.synchronize() }, period)
}
func (ac *AuthorizationCache) AddWatcher(watcher CacheWatcher) {
ac.watcherLock.Lock()
defer ac.watcherLock.Unlock()
ac.watchers = append(ac.watchers, watcher)
}
func (ac *AuthorizationCache) RemoveWatcher(watcher CacheWatcher) {
ac.watcherLock.Lock()
defer ac.watcherLock.Unlock()
lastIndex := len(ac.watchers) - 1
for i := 0; i < len(ac.watchers); i++ {
if ac.watchers[i] == watcher {
if i < lastIndex {
// if we're not the last element, shift
copy(ac.watchers[i:], ac.watchers[i+1:])
}
ac.watchers = ac.watchers[:lastIndex]
break
}
}
}
func (ac *AuthorizationCache) GetClusterPolicyLister() client.SyncedClusterPoliciesListerInterface {
return ac.clusterPolicyLister
}
// synchronizeNamespaces synchronizes access over each namespace and returns a set of namespace names that were looked at in last sync
func (ac *AuthorizationCache) synchronizeNamespaces(userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) sets.String {
namespaceSet := sets.NewString()
items := ac.namespaceStore.List()
for i := range items {
namespace := items[i].(*kapi.Namespace)
namespaceSet.Insert(namespace.Name)
reviewRequest := &reviewRequest{
namespace: namespace.Name,
namespaceResourceVersion: namespace.ResourceVersion,
}
if err := ac.syncHandler(reviewRequest, userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore); err != nil {
utilruntime.HandleError(fmt.Errorf("error synchronizing: %v", err))
}
}
return namespaceSet
}
// synchronizePolicies synchronizes access over each policy
func (ac *AuthorizationCache) synchronizePolicies(userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) {
policyList, err := ac.policyNamespacer.Policies(kapi.NamespaceAll).List(kapi.ListOptions{})
if err != nil {
utilruntime.HandleError(err)
return
}
for _, policy := range policyList.Items {
reviewRequest := &reviewRequest{
namespace: policy.Namespace,
policyUIDToResourceVersion: map[types.UID]string{policy.UID: policy.ResourceVersion},
}
if err := ac.syncHandler(reviewRequest, userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore); err != nil {
utilruntime.HandleError(fmt.Errorf("error synchronizing: %v", err))
}
}
}
// synchronizePolicyBindings synchronizes access over each policy binding
func (ac *AuthorizationCache) synchronizePolicyBindings(userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) {
policyBindingList, err := ac.policyBindingNamespacer.PolicyBindings(kapi.NamespaceAll).List(kapi.ListOptions{})
if err != nil {
utilruntime.HandleError(err)
return
}
for _, policyBinding := range policyBindingList.Items {
reviewRequest := &reviewRequest{
namespace: policyBinding.Namespace,
policyBindingUIDToResourceVersion: map[types.UID]string{policyBinding.UID: policyBinding.ResourceVersion},
}
if err := ac.syncHandler(reviewRequest, userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore); err != nil {
utilruntime.HandleError(fmt.Errorf("error synchronizing: %v", err))
}
}
}
// purgeDeletedNamespaces will remove all namespaces enumerated in a reviewRecordStore that are not in the namespace set
func (ac *AuthorizationCache) purgeDeletedNamespaces(oldNamespaces, newNamespaces sets.String, userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) {
reviewRecordItems := reviewRecordStore.List()
for i := range reviewRecordItems {
reviewRecord := reviewRecordItems[i].(*reviewRecord)
if !newNamespaces.Has(reviewRecord.namespace) {
deleteNamespaceFromSubjects(userSubjectRecordStore, reviewRecord.users, reviewRecord.namespace)
deleteNamespaceFromSubjects(groupSubjectRecordStore, reviewRecord.groups, reviewRecord.namespace)
reviewRecordStore.Delete(reviewRecord)
}
}
for namespace := range oldNamespaces.Difference(newNamespaces) {
ac.notifyWatchers(namespace, nil, sets.String{}, sets.String{})
}
}
// invalidateCache returns true if there was a change in the cluster namespace that holds cluster policy and policy bindings
func (ac *AuthorizationCache) invalidateCache() bool {
invalidateCache := false
clusterPolicyList, err := ac.clusterPolicyLister.ClusterPolicies().List(kapi.ListOptions{})
if err != nil {
utilruntime.HandleError(err)
return invalidateCache
}
temporaryVersions := sets.NewString()
for _, clusterPolicy := range clusterPolicyList.Items {
temporaryVersions.Insert(clusterPolicy.ResourceVersion)
}
if (len(ac.clusterPolicyResourceVersions) != len(temporaryVersions)) || !ac.clusterPolicyResourceVersions.HasAll(temporaryVersions.List()...) {
invalidateCache = true
ac.clusterPolicyResourceVersions = temporaryVersions
}
clusterPolicyBindingList, err := ac.clusterPolicyBindingLister.ClusterPolicyBindings().List(kapi.ListOptions{})
if err != nil {
utilruntime.HandleError(err)
return invalidateCache
}
temporaryVersions.Delete(temporaryVersions.List()...)
for _, clusterPolicyBinding := range clusterPolicyBindingList.Items {
temporaryVersions.Insert(clusterPolicyBinding.ResourceVersion)
}
if (len(ac.clusterBindingResourceVersions) != len(temporaryVersions)) || !ac.clusterBindingResourceVersions.HasAll(temporaryVersions.List()...) {
invalidateCache = true
ac.clusterBindingResourceVersions = temporaryVersions
}
return invalidateCache
}
// synchronize runs a a full synchronization over the cache data. it must be run in a single-writer model, it's not thread-safe by design.
func (ac *AuthorizationCache) synchronize() {
// if none of our internal reflectors changed, then we can skip reviewing the cache
skip, currentState := ac.skip.SkipSynchronize(ac.lastState, ac.lastSyncResourceVersioner, ac.policyLastSyncResourceVersioner)
if skip {
return
}
// by default, we update our current caches and do an incremental change
userSubjectRecordStore := ac.userSubjectRecordStore
groupSubjectRecordStore := ac.groupSubjectRecordStore
reviewRecordStore := ac.reviewRecordStore
// if there was a global change that forced complete invalidation, we rebuild our cache and do a fast swap at end
invalidateCache := ac.invalidateCache()
if invalidateCache {
userSubjectRecordStore = cache.NewStore(subjectRecordKeyFn)
groupSubjectRecordStore = cache.NewStore(subjectRecordKeyFn)
reviewRecordStore = cache.NewStore(reviewRecordKeyFn)
}
// iterate over caches and synchronize our three caches
newKnownNamespaces := ac.synchronizeNamespaces(userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
ac.synchronizePolicies(userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
ac.synchronizePolicyBindings(userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
ac.purgeDeletedNamespaces(ac.allKnownNamespaces, newKnownNamespaces, userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
// if we did a full rebuild, now we swap the fully rebuilt cache
if invalidateCache {
ac.userSubjectRecordStore = userSubjectRecordStore
ac.groupSubjectRecordStore = groupSubjectRecordStore
ac.reviewRecordStore = reviewRecordStore
}
ac.allKnownNamespaces = newKnownNamespaces
// we were able to update our cache since this last observation period
ac.lastState = currentState
}
// syncRequest takes a reviewRequest and determines if it should update the caches supplied, it is not thread-safe
func (ac *AuthorizationCache) syncRequest(request *reviewRequest, userSubjectRecordStore cache.Store, groupSubjectRecordStore cache.Store, reviewRecordStore cache.Store) error {
lastKnownValue, err := lastKnown(reviewRecordStore, request.namespace)
if err != nil {
return err
}
if skipReview(request, lastKnownValue) {
return nil
}
namespace := request.namespace
review, err := ac.reviewer.Review(namespace)
if err != nil {
return err
}
usersToRemove := sets.NewString()
groupsToRemove := sets.NewString()
if lastKnownValue != nil {
usersToRemove.Insert(lastKnownValue.users...)
usersToRemove.Delete(review.Users()...)
groupsToRemove.Insert(lastKnownValue.groups...)
groupsToRemove.Delete(review.Groups()...)
}
deleteNamespaceFromSubjects(userSubjectRecordStore, usersToRemove.List(), namespace)
deleteNamespaceFromSubjects(groupSubjectRecordStore, groupsToRemove.List(), namespace)
addSubjectsToNamespace(userSubjectRecordStore, review.Users(), namespace)
addSubjectsToNamespace(groupSubjectRecordStore, review.Groups(), namespace)
cacheReviewRecord(request, lastKnownValue, review, reviewRecordStore)
ac.notifyWatchers(namespace, lastKnownValue, sets.NewString(review.Users()...), sets.NewString(review.Groups()...))
if errMsg := review.EvaluationError(); len(errMsg) > 0 {
return errors.New(errMsg)
}
return nil
}
// List returns the set of namespace names the user has access to view
func (ac *AuthorizationCache) List(userInfo user.Info) (*kapi.NamespaceList, error) {
keys := sets.String{}
user := userInfo.GetName()
groups := userInfo.GetGroups()
obj, exists, _ := ac.userSubjectRecordStore.GetByKey(user)
if exists {
subjectRecord := obj.(*subjectRecord)
keys.Insert(subjectRecord.namespaces.List()...)
}
for _, group := range groups {
obj, exists, _ := ac.groupSubjectRecordStore.GetByKey(group)
if exists {
subjectRecord := obj.(*subjectRecord)
keys.Insert(subjectRecord.namespaces.List()...)
}
}
allowedNamespaces, err := scope.ScopesToVisibleNamespaces(userInfo.GetExtra()[authorizationapi.ScopesKey], ac.clusterPolicyLister.ClusterPolicies())
if err != nil {
return nil, err
}
namespaceList := &kapi.NamespaceList{}
for key := range keys {
namespaceObj, exists, err := ac.namespaceStore.GetByKey(key)
if err != nil {
return nil, err
}
if exists {
namespace := *namespaceObj.(*kapi.Namespace)
if allowedNamespaces.Has("*") || allowedNamespaces.Has(namespace.Name) {
namespaceList.Items = append(namespaceList.Items, namespace)
}
}
}
return namespaceList, nil
}
func (ac *AuthorizationCache) ReadyForAccess() bool {
return len(ac.lastState) > 0
}
// skipReview returns true if the request was satisfied by the lastKnown
func skipReview(request *reviewRequest, lastKnownValue *reviewRecord) bool {
// if your request is nil, you have no reason to make a review
if request == nil {
return true
}
// if you know nothing from a prior review, you better make a request
if lastKnownValue == nil {
return false
}
// if you are asking about a specific namespace, and you think you knew about a different one, you better check again
if request.namespace != lastKnownValue.namespace {
return false
}
// if you are making your request relative to a specific resource version, only make it if its different
if len(request.namespaceResourceVersion) > 0 && request.namespaceResourceVersion != lastKnownValue.namespaceResourceVersion {
return false
}
// if you see a new policy binding, or a newer version, we need to do a review
for k, v := range request.policyBindingUIDToResourceVersion {
oldValue, exists := lastKnownValue.policyBindingUIDToResourceVersion[k]
if !exists || v != oldValue {
return false
}
}
// if you see a new policy, or a newer version, we need to do a review
for k, v := range request.policyUIDToResourceVersion {
oldValue, exists := lastKnownValue.policyUIDToResourceVersion[k]
if !exists || v != oldValue {
return false
}
}
return true
}
// deleteNamespaceFromSubjects removes the namespace from each subject
// if no other namespaces are active to that subject, it will also delete the subject from the cache entirely
func deleteNamespaceFromSubjects(subjectRecordStore cache.Store, subjects []string, namespace string) {
for _, subject := range subjects {
obj, exists, _ := subjectRecordStore.GetByKey(subject)
if exists {
subjectRecord := obj.(*subjectRecord)
delete(subjectRecord.namespaces, namespace)
if len(subjectRecord.namespaces) == 0 {
subjectRecordStore.Delete(subjectRecord)
}
}
}
}
// addSubjectsToNamespace adds the specified namespace to each subject
func addSubjectsToNamespace(subjectRecordStore cache.Store, subjects []string, namespace string) {
for _, subject := range subjects {
var item *subjectRecord
obj, exists, _ := subjectRecordStore.GetByKey(subject)
if exists {
item = obj.(*subjectRecord)
} else {
item = &subjectRecord{subject: subject, namespaces: sets.NewString()}
subjectRecordStore.Add(item)
}
item.namespaces.Insert(namespace)
}
}
func (ac *AuthorizationCache) notifyWatchers(namespace string, exists *reviewRecord, users, groups sets.String) {
ac.watcherLock.Lock()
defer ac.watcherLock.Unlock()
for _, watcher := range ac.watchers {
watcher.GroupMembershipChanged(namespace, users, groups)
}
}
// cacheReviewRecord updates the cache based on the request processed
func cacheReviewRecord(request *reviewRequest, lastKnownValue *reviewRecord, review Review, reviewRecordStore cache.Store) {
reviewRecord := &reviewRecord{
reviewRequest: &reviewRequest{namespace: request.namespace, policyUIDToResourceVersion: map[types.UID]string{}, policyBindingUIDToResourceVersion: map[types.UID]string{}},
groups: review.Groups(),
users: review.Users(),
}
// keep what we last believe we knew by default
if lastKnownValue != nil {
reviewRecord.namespaceResourceVersion = lastKnownValue.namespaceResourceVersion
for k, v := range lastKnownValue.policyUIDToResourceVersion {
reviewRecord.policyUIDToResourceVersion[k] = v
}
for k, v := range lastKnownValue.policyBindingUIDToResourceVersion {
reviewRecord.policyBindingUIDToResourceVersion[k] = v
}
}
// update the review record relative to what drove this request
if len(request.namespaceResourceVersion) > 0 {
reviewRecord.namespaceResourceVersion = request.namespaceResourceVersion
}
for k, v := range request.policyUIDToResourceVersion {
reviewRecord.policyUIDToResourceVersion[k] = v
}
for k, v := range request.policyBindingUIDToResourceVersion {
reviewRecord.policyBindingUIDToResourceVersion[k] = v
}
// update the cache record
reviewRecordStore.Add(reviewRecord)
}
func lastKnown(reviewRecordStore cache.Store, namespace string) (*reviewRecord, error) {
obj, exists, err := reviewRecordStore.GetByKey(namespace)
if err != nil {
return nil, err
}
if exists {
return obj.(*reviewRecord), nil
}
return nil, nil
}