package servingcert
import (
"errors"
"fmt"
"strconv"
"time"
"github.com/golang/glog"
kapi "k8s.io/kubernetes/pkg/api"
kapierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
"github.com/openshift/origin/pkg/cmd/server/crypto"
)
const (
// ServingCertSecretAnnotation is set on a service and stores the name of the secret to generate the serving cert into.
ServingCertSecretAnnotation = "service.alpha.openshift.io/serving-cert-secret-name"
// ServingCertCreatedByAnnotation stores the common name of the signer. This could be used later to see if the
// services need to have the serving certs regenerated. The presence and matching of this annotation prevents
// regeneration.
ServingCertCreatedByAnnotation = "service.alpha.openshift.io/serving-cert-signed-by"
// ServingCertErrorAnnotation stores the error that caused the most recent cert generation failure.
ServingCertErrorAnnotation = "service.alpha.openshift.io/serving-cert-generation-error"
// ServingCertErrorNumAnnotation stores how many consecutive errors we've hit. A value of maxRetries or more will
// prevent the controller from reattempting until the annotation is cleared.
ServingCertErrorNumAnnotation = "service.alpha.openshift.io/serving-cert-generation-error-num"
// ServiceUIDAnnotation is an annotation on a secret that indicates which service created it, by UID.
ServiceUIDAnnotation = "service.alpha.openshift.io/originating-service-uid"
// ServiceNameAnnotation is an annotation on a secret that indicates which service created it, by name, to allow
// reverse lookups on services for comparison against UIDs.
ServiceNameAnnotation = "service.alpha.openshift.io/originating-service-name"
// ServingCertExpiryAnnotation is an annotation on a secret that holds the expiry time of the certificate. The value
// is written in the RFC3339 format, e.g. 2018-11-29T17:44:39Z
ServingCertExpiryAnnotation = "service.alpha.openshift.io/expiry"
)
// ServiceServingCertController watches Service objects and, for every service that names a
// secret via ServingCertSecretAnnotation, generates a serving certificate signed by ca and
// stores it in that secret.
type ServiceServingCertController struct {
// serviceClient is used to list, watch and update services.
serviceClient kcoreclient.ServicesGetter
// secretClient is used to create and read the secrets that hold the generated certificates.
secretClient kcoreclient.SecretsGetter
// Services that need to be checked
queue workqueue.RateLimitingInterface
// maxRetries is the number of recorded failures at which a service stops being retried.
maxRetries int
// serviceCache is the local store of services kept up to date by serviceController.
serviceCache cache.Store
// serviceController is the informer controller that fills serviceCache and fires the enqueueing event handlers.
serviceController *framework.Controller
// ca signs the generated serving certificates.
ca *crypto.CA
// NOTE(review): publicCert is neither set nor read in the visible code — confirm whether it is still needed.
publicCert string
// dnsSuffix is appended to "<service>.<namespace>.svc" to form the fully qualified DNS name placed in the certificate.
dnsSuffix string
// syncHandler does the work. It's factored out for unit testing
syncHandler func(serviceKey string) error
}
// NewServiceServingCertController builds a ServiceServingCertController that signs serving
// certificates with ca. It wires up an informer over services in all namespaces, resynced
// every resyncInterval, whose add and update events enqueue the service for syncing.
// The returned controller does nothing until Run is called.
// TODO this should accept a shared informer
func NewServiceServingCertController(serviceClient kcoreclient.ServicesGetter, secretClient kcoreclient.SecretsGetter, ca *crypto.CA, dnsSuffix string, resyncInterval time.Duration) *ServiceServingCertController {
	sc := &ServiceServingCertController{
		serviceClient: serviceClient,
		secretClient:  secretClient,

		queue:      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		maxRetries: 10,

		ca:        ca,
		dnsSuffix: dnsSuffix,
	}

	// Source of truth for the informer: every service in every namespace.
	allServices := &cache.ListWatch{
		ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
			return sc.serviceClient.Services(kapi.NamespaceAll).List(options)
		},
		WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
			return sc.serviceClient.Services(kapi.NamespaceAll).Watch(options)
		},
	}

	// Both new and changed services are queued; updates also fire on relist,
	// which gives previously failed services another chance.
	handlers := framework.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			glog.V(4).Infof("Adding service %s", obj.(*kapi.Service).Name)
			sc.enqueueService(obj)
		},
		UpdateFunc: func(old, cur interface{}) {
			glog.V(4).Infof("Updating service %s", cur.(*kapi.Service).Name)
			sc.enqueueService(cur)
		},
	}

	sc.serviceCache, sc.serviceController = framework.NewInformer(allServices, &kapi.Service{}, resyncInterval, handlers)
	sc.syncHandler = sc.syncService

	return sc
}
// Run starts the service informer and the requested number of worker goroutines, then
// blocks until stopCh is closed, at which point the work queue is shut down.
func (sc *ServiceServingCertController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	go sc.serviceController.Run(stopCh)

	// Each worker is restarted one second after it returns, until stopCh closes.
	for remaining := workers; remaining > 0; remaining-- {
		go wait.Until(sc.worker, time.Second, stopCh)
	}

	<-stopCh

	glog.Infof("Shutting down service signing cert controller")
	sc.queue.ShutDown()
}
// enqueueService adds the key of obj to the work queue. Objects for which no key can be
// computed are logged and dropped.
func (sc *ServiceServingCertController) enqueueService(obj interface{}) {
	if key, err := controller.KeyFunc(obj); err != nil {
		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
	} else {
		sc.queue.Add(key)
	}
}
// worker is the body of a single worker goroutine: it keeps processing queue items, one
// at a time, until work reports that it should stop.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (sc *ServiceServingCertController) worker() {
	for sc.work() {
	}
}
// work takes one key off the queue and hands it to syncHandler. It returns false only
// when the queue reports that it is shutting down; otherwise it returns true so the
// worker keeps going.
func (sc *ServiceServingCertController) work() bool {
	item, shuttingDown := sc.queue.Get()
	if shuttingDown {
		return false
	}
	defer sc.queue.Done(item)

	err := sc.syncHandler(item.(string))
	if err != nil {
		// The item was not handled: report the error and requeue it with rate limiting.
		utilruntime.HandleError(fmt.Errorf("error syncing service, it will be retried: %v", err))
		sc.queue.AddRateLimited(item)
		return true
	}

	// Handled successfully: forget the item so its retry history is reset.
	sc.queue.Forget(item)
	return true
}
// syncService will sync the service with the given key.
// This function is not meant to be invoked concurrently with the same key.
//
// For a service that requires cert generation it creates a serving certificate for
// "<name>.<namespace>.svc" and "<name>.<namespace>.svc.<dnsSuffix>", stores it in the secret
// named by ServingCertSecretAnnotation, and then marks the service with
// ServingCertCreatedByAnnotation. If the secret already exists it must carry this service's
// UID in ServiceUIDAnnotation; otherwise the sync is treated as a failure.
//
// It returns nil when there is nothing to do, when the service was updated successfully, or
// when a failure was recorded and the retry limit has been reached. Any other failure is
// returned so the caller can requeue the key.
func (sc *ServiceServingCertController) syncService(key string) error {
	obj, exists, err := sc.serviceCache.GetByKey(key)
	if err != nil {
		glog.V(4).Infof("Unable to retrieve service %v from store: %v", key, err)
		return err
	}
	if !exists {
		glog.V(4).Infof("Service has been deleted %v", key)
		return nil
	}

	if !sc.requiresCertGeneration(obj.(*kapi.Service)) {
		return nil
	}

	// make a copy to avoid mutating cache state
	t, err := kapi.Scheme.DeepCopy(obj)
	if err != nil {
		return err
	}
	service := t.(*kapi.Service)
	if service.Annotations == nil {
		service.Annotations = map[string]string{}
	}

	dnsName := service.Name + "." + service.Namespace + ".svc"
	fqDNSName := dnsName + "." + sc.dnsSuffix
	certificateLifetime := 365 * 2 // 2 years
	servingCert, err := sc.ca.MakeServerCert(sets.NewString(dnsName, fqDNSName), certificateLifetime)
	if err != nil {
		return err
	}
	certBytes, keyBytes, err := servingCert.GetPEMBytes()
	if err != nil {
		return err
	}

	secret := &kapi.Secret{
		ObjectMeta: kapi.ObjectMeta{
			Namespace: service.Namespace,
			Name:      service.Annotations[ServingCertSecretAnnotation],
			Annotations: map[string]string{
				ServiceUIDAnnotation:        string(service.UID),
				ServiceNameAnnotation:       service.Name,
				ServingCertExpiryAnnotation: servingCert.Certs[0].NotAfter.Format(time.RFC3339),
			},
		},
		Type: kapi.SecretTypeTLS,
		Data: map[string][]byte{
			kapi.TLSCertKey:       certBytes,
			kapi.TLSPrivateKeyKey: keyBytes,
		},
	}

	if _, createErr := sc.secretClient.Secrets(service.Namespace).Create(secret); createErr != nil {
		if !kapierrors.IsAlreadyExists(createErr) {
			return sc.recordGenerationFailure(service, createErr)
		}

		// The secret is already there. Only accept it if it was generated for this very
		// service; a secret that belongs to a different service UID is a failure.
		actualSecret, getErr := sc.secretClient.Secrets(service.Namespace).Get(secret.Name)
		if getErr != nil {
			return sc.recordGenerationFailure(service, getErr)
		}
		if actualSecret.Annotations[ServiceUIDAnnotation] != string(service.UID) {
			mismatch := fmt.Sprintf("secret/%v references serviceUID %v, which does not match %v", actualSecret.Name, actualSecret.Annotations[ServiceUIDAnnotation], service.UID)
			return sc.recordGenerationFailure(service, errors.New(mismatch))
		}
	}

	// Success: remember which signer produced the cert and clear any previous failure state.
	service.Annotations[ServingCertCreatedByAnnotation] = sc.ca.Config.Certs[0].Subject.CommonName
	delete(service.Annotations, ServingCertErrorAnnotation)
	delete(service.Annotations, ServingCertErrorNumAnnotation)
	_, err = sc.serviceClient.Services(service.Namespace).Update(service)

	return err
}

// recordGenerationFailure writes cause and an incremented failure count into the error
// annotations of service and tries to persist them. service must be a private copy with a
// non-nil Annotations map.
//
// If the update fails, we'll just try again later on re-list or because the service had
// already been updated and we'll get triggered again, so the update error is deliberately
// not returned.
//
// It returns nil when the annotations were persisted and the failure count has reached
// maxRetries (the sync loop is then done with this service and should forget it); in every
// other case it returns cause so the key gets requeued.
func (sc *ServiceServingCertController) recordGenerationFailure(service *kapi.Service, cause error) error {
	failures := getNumFailures(service) + 1
	service.Annotations[ServingCertErrorAnnotation] = cause.Error()
	service.Annotations[ServingCertErrorNumAnnotation] = strconv.Itoa(failures)

	_, updateErr := sc.serviceClient.Services(service.Namespace).Update(service)
	if updateErr == nil && failures >= sc.maxRetries {
		return nil
	}
	return cause
}
// getNumFailures reads the failure count stored in ServingCertErrorNumAnnotation on the
// service. A missing, empty or non-numeric annotation counts as zero failures.
func getNumFailures(service *kapi.Service) int {
	// Atoi rejects the empty string, so an absent annotation takes the same path as a malformed one.
	count, err := strconv.Atoi(service.Annotations[ServingCertErrorNumAnnotation])
	if err != nil {
		return 0
	}
	return count
}
// requiresCertGeneration reports whether a serving cert should be generated for the service.
// It returns false when the service does not name a target secret, when its recorded failure
// count has reached maxRetries, or when it is already marked as signed by the current CA.
func (sc *ServiceServingCertController) requiresCertGeneration(service *kapi.Service) bool {
	switch {
	case len(service.Annotations[ServingCertSecretAnnotation]) == 0:
		// no secret requested
		return false
	case getNumFailures(service) >= sc.maxRetries:
		// gave up until the error annotation is cleared
		return false
	case service.Annotations[ServingCertCreatedByAnnotation] == sc.ca.Config.Certs[0].Subject.CommonName:
		// already signed by this signer
		return false
	default:
		return true
	}
}