By default the router binds ports 80 and 443 even if no routing
configuration is available. This may be desirable when haproxy is
serving traffic directly to clients. However, the f5 loadbalancer will
treat bound ports as indicating that a backend is ready to receive
traffic. This means that router pods that are starting or restarting
may be put into rotation before they are ready to serve the current
route state and clients may see 503s for perfectly healthy backend
endpoints. To avoid this possibility, this change adds an
option (BIND_ONLY_WHEN_READY/--bind-only-when-ready) to the router that
ensures that ports are bound only when a router instance has route and
endpoint state available.
Reference: bz1383663
| ... | ... |
@@ -20094,6 +20094,8 @@ _openshift_infra_router() |
| 20094 | 20094 |
local_nonpersistent_flags+=("--allowed-domains=")
|
| 20095 | 20095 |
flags+=("--as=")
|
| 20096 | 20096 |
local_nonpersistent_flags+=("--as=")
|
| 20097 |
+ flags+=("--bind-ports-after-sync")
|
|
| 20098 |
+ local_nonpersistent_flags+=("--bind-ports-after-sync")
|
|
| 20097 | 20099 |
flags+=("--certificate-authority=")
|
| 20098 | 20100 |
flags_with_completion+=("--certificate-authority")
|
| 20099 | 20101 |
flags_completion+=("_filedir")
|
| ... | ... |
@@ -20255,6 +20255,8 @@ _openshift_infra_router() |
| 20255 | 20255 |
local_nonpersistent_flags+=("--allowed-domains=")
|
| 20256 | 20256 |
flags+=("--as=")
|
| 20257 | 20257 |
local_nonpersistent_flags+=("--as=")
|
| 20258 |
+ flags+=("--bind-ports-after-sync")
|
|
| 20259 |
+ local_nonpersistent_flags+=("--bind-ports-after-sync")
|
|
| 20258 | 20260 |
flags+=("--certificate-authority=")
|
| 20259 | 20261 |
flags_with_completion+=("--certificate-authority")
|
| 20260 | 20262 |
flags_completion+=("_filedir")
|
| ... | ... |
@@ -46,6 +46,10 @@ You may restrict the set of routes exposed to a single project (with \-\-namespa |
| 46 | 46 |
Username to impersonate for the operation |
| 47 | 47 |
|
| 48 | 48 |
.PP |
| 49 |
+\fB\-\-bind\-ports\-after\-sync\fP=false |
|
| 50 |
+ Bind ports only after route state has been synchronized |
|
| 51 |
+ |
|
| 52 |
+.PP |
|
| 49 | 53 |
\fB\-\-certificate\-authority\fP="" |
| 50 | 54 |
Path to a cert. file for the certificate authority |
| 51 | 55 |
|
| ... | ... |
@@ -94,6 +94,7 @@ listen stats :1936 |
| 94 | 94 |
stats auth {{.StatsUser}}:{{.StatsPassword}}
|
| 95 | 95 |
{{ end }}
|
| 96 | 96 |
|
| 97 |
+{{ if .BindPorts }}
|
|
| 97 | 98 |
frontend public |
| 98 | 99 |
bind :{{env "ROUTER_SERVICE_HTTP_PORT" "80"}}
|
| 99 | 100 |
mode http |
| ... | ... |
@@ -523,6 +524,9 @@ backend be_secure_{{$cfgIdx}}
|
| 523 | 523 |
{{ end }}{{/* end range over serviceUnitNames */}}
|
| 524 | 524 |
{{ end }}{{/* end tls==reencrypt */}}
|
| 525 | 525 |
{{ end }}{{/* end loop over routes */}}
|
| 526 |
+{{ else }}
|
|
| 527 |
+# Avoiding binding ports until routing configuration has been synchronized. |
|
| 528 |
+{{ end }}{{/* end bind ports after sync */}}
|
|
| 526 | 529 |
{{ end }}{{/* end haproxy config template */}}
|
| 527 | 530 |
|
| 528 | 531 |
{{/*--------------------------------- END OF HAPROXY CONFIG, BELOW ARE MAPPING FILES ------------------------*/}}
|
| ... | ... |
@@ -49,6 +49,10 @@ type EventQueue struct {
|
| 49 | 49 |
// item it refers to is explicitly deleted from the store or the |
| 50 | 50 |
// event is read via Pop(). |
| 51 | 51 |
lastReplaceKey string |
| 52 |
+ // Tracks whether the Replace() method has been called at least once. |
|
| 53 |
+ replaceCalled bool |
|
| 54 |
+ // Tracks the number of items queued by the last Replace() call. |
|
| 55 |
+ replaceCount int |
|
| 52 | 56 |
} |
| 53 | 57 |
|
| 54 | 58 |
// EventQueue implements kcache.Store |
| ... | ... |
@@ -322,6 +326,9 @@ func (eq *EventQueue) Replace(objects []interface{}, resourceVersion string) err
|
| 322 | 322 |
eq.lock.Lock() |
| 323 | 323 |
defer eq.lock.Unlock() |
| 324 | 324 |
|
| 325 |
+ eq.replaceCalled = true |
|
| 326 |
+ eq.replaceCount = len(objects) |
|
| 327 |
+ |
|
| 325 | 328 |
eq.events = map[string]watch.EventType{}
|
| 326 | 329 |
eq.queue = eq.queue[:0] |
| 327 | 330 |
|
| ... | ... |
@@ -346,6 +353,23 @@ func (eq *EventQueue) Replace(objects []interface{}, resourceVersion string) err
|
| 346 | 346 |
return nil |
| 347 | 347 |
} |
| 348 | 348 |
|
| 349 |
+// ListSuccessfulAtLeastOnce indicates whether a List operation was |
|
| 350 |
+// successfully completed regardless of whether any items were queued. |
|
| 351 |
+func (eq *EventQueue) ListSuccessfulAtLeastOnce() bool {
|
|
| 352 |
+ eq.lock.Lock() |
|
| 353 |
+ defer eq.lock.Unlock() |
|
| 354 |
+ |
|
| 355 |
+ return eq.replaceCalled |
|
| 356 |
+} |
|
| 357 |
+ |
|
| 358 |
+// ListCount returns how many objects were queued by the most recent List operation. |
|
| 359 |
+func (eq *EventQueue) ListCount() int {
|
|
| 360 |
+ eq.lock.Lock() |
|
| 361 |
+ defer eq.lock.Unlock() |
|
| 362 |
+ |
|
| 363 |
+ return eq.replaceCount |
|
| 364 |
+} |
|
| 365 |
+ |
|
| 349 | 366 |
// ListConsumed indicates whether the items queued by a List/Relist |
| 350 | 367 |
// operation have been consumed. |
| 351 | 368 |
func (eq *EventQueue) ListConsumed() bool {
|
| ... | ... |
@@ -62,6 +62,7 @@ type TemplateRouter struct {
|
| 62 | 62 |
DefaultCertificateDir string |
| 63 | 63 |
ExtendedValidation bool |
| 64 | 64 |
RouterService *ktypes.NamespacedName |
| 65 |
+ BindPortsAfterSync bool |
|
| 65 | 66 |
} |
| 66 | 67 |
|
| 67 | 68 |
// reloadInterval returns how often to run the router reloads. The interval |
| ... | ... |
@@ -86,6 +87,7 @@ func (o *TemplateRouter) Bind(flag *pflag.FlagSet) {
|
| 86 | 86 |
flag.StringVar(&o.ReloadScript, "reload", util.Env("RELOAD_SCRIPT", ""), "The path to the reload script to use")
|
| 87 | 87 |
flag.DurationVar(&o.ReloadInterval, "interval", reloadInterval(), "Controls how often router reloads are invoked. Mutiple router reload requests are coalesced for the duration of this interval since the last reload time.") |
| 88 | 88 |
flag.BoolVar(&o.ExtendedValidation, "extended-validation", util.Env("EXTENDED_VALIDATION", "true") == "true", "If set, then an additional extended validation step is performed on all routes admitted in by this router. Defaults to true and enables the extended validation checks.")
|
| 89 |
+ flag.BoolVar(&o.BindPortsAfterSync, "bind-ports-after-sync", util.Env("ROUTER_BIND_PORTS_AFTER_SYNC", "") == "true", "Bind ports only after route state has been synchronized")
|
|
| 89 | 90 |
} |
| 90 | 91 |
|
| 91 | 92 |
type RouterStats struct {
|
| ... | ... |
@@ -188,6 +190,7 @@ func (o *TemplateRouterOptions) Run() error {
|
| 188 | 188 |
StatsUsername: o.StatsUsername, |
| 189 | 189 |
StatsPassword: o.StatsPassword, |
| 190 | 190 |
PeerService: o.RouterService, |
| 191 |
+ BindPortsAfterSync: o.BindPortsAfterSync, |
|
| 191 | 192 |
IncludeUDP: o.RouterSelection.IncludeUDP, |
| 192 | 193 |
AllowWildcardRoutes: o.RouterSelection.AllowWildcardRoutes, |
| 193 | 194 |
} |
| ... | ... |
@@ -37,6 +37,11 @@ type RouterController struct {
|
| 37 | 37 |
endpointsListConsumed bool |
| 38 | 38 |
filteredByNamespace bool |
| 39 | 39 |
|
| 40 |
+ RoutesListSuccessfulAtLeastOnce func() bool |
|
| 41 |
+ EndpointsListSuccessfulAtLeastOnce func() bool |
|
| 42 |
+ RoutesListCount func() int |
|
| 43 |
+ EndpointsListCount func() int |
|
| 44 |
+ |
|
| 40 | 45 |
WatchNodes bool |
| 41 | 46 |
|
| 42 | 47 |
Namespaces NamespaceLister |
| ... | ... |
@@ -57,6 +62,51 @@ func (c *RouterController) Run() {
|
| 57 | 57 |
if c.WatchNodes {
|
| 58 | 58 |
go utilwait.Forever(c.HandleNode, 0) |
| 59 | 59 |
} |
| 60 |
+ go c.watchForFirstSync() |
|
| 61 |
+} |
|
| 62 |
+ |
|
| 63 |
+// handleFirstSync signals the router when it sees that the various |
|
| 64 |
+// watchers have successfully listed data from the api. |
|
| 65 |
+func (c *RouterController) handleFirstSync() bool {
|
|
| 66 |
+ c.lock.Lock() |
|
| 67 |
+ defer c.lock.Unlock() |
|
| 68 |
+ |
|
| 69 |
+ synced := c.RoutesListSuccessfulAtLeastOnce() && |
|
| 70 |
+ c.EndpointsListSuccessfulAtLeastOnce() && |
|
| 71 |
+ (c.Namespaces == nil || c.filteredByNamespace) |
|
| 72 |
+ if !synced {
|
|
| 73 |
+ return false |
|
| 74 |
+ } |
|
| 75 |
+ |
|
| 76 |
+ // If either of the event queues were empty after the initial |
|
| 77 |
+ // List, the tracking listConsumed variable's default value of |
|
| 78 |
+ // 'false' may prevent the router from reloading to indicate the |
|
| 79 |
+ // readiness status. Set the value to 'true' to ensure that a |
|
| 80 |
+ // reload will be performed if necessary. |
|
| 81 |
+ if c.RoutesListCount() == 0 {
|
|
| 82 |
+ c.routesListConsumed = true |
|
| 83 |
+ } |
|
| 84 |
+ if c.EndpointsListCount() == 0 {
|
|
| 85 |
+ c.endpointsListConsumed = true |
|
| 86 |
+ } |
|
| 87 |
+ c.updateLastSyncProcessed() |
|
| 88 |
+ |
|
| 89 |
+ err := c.Plugin.SetSyncedAtLeastOnce() |
|
| 90 |
+ if err == nil {
|
|
| 91 |
+ return true |
|
| 92 |
+ } |
|
| 93 |
+ utilruntime.HandleError(err) |
|
| 94 |
+ return false |
|
| 95 |
+} |
|
| 96 |
+ |
|
| 97 |
+// watchForFirstSync loops until the first sync has been handled. |
|
| 98 |
+func (c *RouterController) watchForFirstSync() {
|
|
| 99 |
+ for {
|
|
| 100 |
+ if c.handleFirstSync() {
|
|
| 101 |
+ return |
|
| 102 |
+ } |
|
| 103 |
+ time.Sleep(50 * time.Millisecond) |
|
| 104 |
+ } |
|
| 60 | 105 |
} |
| 61 | 106 |
|
| 62 | 107 |
func (c *RouterController) HandleNamespaces() {
|
| ... | ... |
@@ -12,6 +12,7 @@ import ( |
| 12 | 12 |
|
| 13 | 13 |
type fakeRouterPlugin struct {
|
| 14 | 14 |
lastSyncProcessed bool |
| 15 |
+ syncedAtLeastOnce bool |
|
| 15 | 16 |
} |
| 16 | 17 |
|
| 17 | 18 |
func (p *fakeRouterPlugin) HandleRoute(t watch.EventType, route *routeapi.Route) error {
|
| ... | ... |
@@ -26,11 +27,17 @@ func (p *fakeRouterPlugin) HandleEndpoints(watch.EventType, *kapi.Endpoints) err |
| 26 | 26 |
func (p *fakeRouterPlugin) HandleNamespaces(namespaces sets.String) error {
|
| 27 | 27 |
return nil |
| 28 | 28 |
} |
| 29 |
+ |
|
| 29 | 30 |
func (p *fakeRouterPlugin) SetLastSyncProcessed(processed bool) error {
|
| 30 | 31 |
p.lastSyncProcessed = processed |
| 31 | 32 |
return nil |
| 32 | 33 |
} |
| 33 | 34 |
|
| 35 |
+func (p *fakeRouterPlugin) SetSyncedAtLeastOnce() error {
|
|
| 36 |
+ p.syncedAtLeastOnce = true |
|
| 37 |
+ return nil |
|
| 38 |
+} |
|
| 39 |
+ |
|
| 34 | 40 |
type fakeNamespaceLister struct {
|
| 35 | 41 |
} |
| 36 | 42 |
|
| ... | ... |
@@ -82,3 +82,7 @@ func (p *ExtendedValidator) HandleNamespaces(namespaces sets.String) error {
|
| 82 | 82 |
func (p *ExtendedValidator) SetLastSyncProcessed(processed bool) error {
|
| 83 | 83 |
return p.plugin.SetLastSyncProcessed(processed) |
| 84 | 84 |
} |
| 85 |
+ |
|
| 86 |
+func (p *ExtendedValidator) SetSyncedAtLeastOnce() error {
|
|
| 87 |
+ return p.plugin.SetSyncedAtLeastOnce() |
|
| 88 |
+} |
| ... | ... |
@@ -99,6 +99,18 @@ func (factory *RouterControllerFactory) Create(plugin router.Plugin, watchNodes |
| 99 | 99 |
} |
| 100 | 100 |
return eventType, obj.(*kapi.Node), nil |
| 101 | 101 |
}, |
| 102 |
+ EndpointsListCount: func() int {
|
|
| 103 |
+ return endpointsEventQueue.ListCount() |
|
| 104 |
+ }, |
|
| 105 |
+ RoutesListCount: func() int {
|
|
| 106 |
+ return routeEventQueue.ListCount() |
|
| 107 |
+ }, |
|
| 108 |
+ EndpointsListSuccessfulAtLeastOnce: func() bool {
|
|
| 109 |
+ return endpointsEventQueue.ListSuccessfulAtLeastOnce() |
|
| 110 |
+ }, |
|
| 111 |
+ RoutesListSuccessfulAtLeastOnce: func() bool {
|
|
| 112 |
+ return routeEventQueue.ListSuccessfulAtLeastOnce() |
|
| 113 |
+ }, |
|
| 102 | 114 |
EndpointsListConsumed: func() bool {
|
| 103 | 115 |
return endpointsEventQueue.ListConsumed() |
| 104 | 116 |
}, |
| ... | ... |
@@ -152,6 +152,10 @@ func (p *HostAdmitter) SetLastSyncProcessed(processed bool) error {
|
| 152 | 152 |
return p.plugin.SetLastSyncProcessed(processed) |
| 153 | 153 |
} |
| 154 | 154 |
|
| 155 |
+func (p *HostAdmitter) SetSyncedAtLeastOnce() error {
|
|
| 156 |
+ return p.plugin.SetSyncedAtLeastOnce() |
|
| 157 |
+} |
|
| 158 |
+ |
|
| 155 | 159 |
// addRoute admits routes based on subdomain ownership - returns errors if the route is not admitted. |
| 156 | 160 |
func (p *HostAdmitter) addRoute(route *routeapi.Route) error {
|
| 157 | 161 |
// Find displaced routes (or error if an existing route displaces us) |
| ... | ... |
@@ -315,3 +315,7 @@ func (a *StatusAdmitter) HandleNamespaces(namespaces sets.String) error {
|
| 315 | 315 |
func (a *StatusAdmitter) SetLastSyncProcessed(processed bool) error {
|
| 316 | 316 |
return a.plugin.SetLastSyncProcessed(processed) |
| 317 | 317 |
} |
| 318 |
+ |
|
| 319 |
+func (a *StatusAdmitter) SetSyncedAtLeastOnce() error {
|
|
| 320 |
+ return a.plugin.SetSyncedAtLeastOnce() |
|
| 321 |
+} |
| ... | ... |
@@ -43,6 +43,10 @@ func (p *fakePlugin) SetLastSyncProcessed(processed bool) error {
|
| 43 | 43 |
return fmt.Errorf("not expected")
|
| 44 | 44 |
} |
| 45 | 45 |
|
| 46 |
+func (p *fakePlugin) SetSyncedAtLeastOnce() error {
|
|
| 47 |
+ return fmt.Errorf("not expected")
|
|
| 48 |
+} |
|
| 49 |
+ |
|
| 46 | 50 |
func TestStatusNoOp(t *testing.T) {
|
| 47 | 51 |
now := nowFn() |
| 48 | 52 |
touched := unversioned.Time{Time: now.Add(-time.Minute)}
|
| ... | ... |
@@ -259,6 +259,10 @@ func (p *UniqueHost) SetLastSyncProcessed(processed bool) error {
|
| 259 | 259 |
return p.plugin.SetLastSyncProcessed(processed) |
| 260 | 260 |
} |
| 261 | 261 |
|
| 262 |
+func (p *UniqueHost) SetSyncedAtLeastOnce() error {
|
|
| 263 |
+ return p.plugin.SetSyncedAtLeastOnce() |
|
| 264 |
+} |
|
| 265 |
+ |
|
| 262 | 266 |
// routeKeys returns the internal router key to use for the given Route. |
| 263 | 267 |
func routeKeys(route *routeapi.Route) []string {
|
| 264 | 268 |
keys := make([]string, 1+len(route.Spec.AlternateBackends)) |
| ... | ... |
@@ -618,3 +618,8 @@ func (p *F5Plugin) HandleRoute(eventType watch.EventType, |
| 618 | 618 |
func (p *F5Plugin) SetLastSyncProcessed(processed bool) error {
|
| 619 | 619 |
return nil |
| 620 | 620 |
} |
| 621 |
+ |
|
| 622 |
+// No-op since f5 has its own concept of what 'ready' means |
|
| 623 |
+func (p *F5Plugin) SetSyncedAtLeastOnce() error {
|
|
| 624 |
+ return nil |
|
| 625 |
+} |
| ... | ... |
@@ -50,6 +50,7 @@ type TemplatePluginConfig struct {
|
| 50 | 50 |
IncludeUDP bool |
| 51 | 51 |
AllowWildcardRoutes bool |
| 52 | 52 |
PeerService *ktypes.NamespacedName |
| 53 |
+ BindPortsAfterSync bool |
|
| 53 | 54 |
} |
| 54 | 55 |
|
| 55 | 56 |
// routerInterface controls the interaction of the plugin with the underlying router implementation |
| ... | ... |
@@ -89,6 +90,9 @@ type routerInterface interface {
|
| 89 | 89 |
|
| 90 | 90 |
// SetSkipCommit indicates to the router whether commits should be skipped |
| 91 | 91 |
SetSkipCommit(skipCommit bool) |
| 92 |
+ |
|
| 93 |
+ // SetSyncedAtLeastOnce indicates to the router that state has been read from the api at least once |
|
| 94 |
+ SetSyncedAtLeastOnce() |
|
| 92 | 95 |
} |
| 93 | 96 |
|
| 94 | 97 |
func env(name, defaultValue string) string {
|
| ... | ... |
@@ -143,6 +147,7 @@ func NewTemplatePlugin(cfg TemplatePluginConfig, lookupSvc ServiceLookup) (*Temp |
| 143 | 143 |
statsPort: cfg.StatsPort, |
| 144 | 144 |
allowWildcardRoutes: cfg.AllowWildcardRoutes, |
| 145 | 145 |
peerEndpointsKey: peerKey, |
| 146 |
+ bindPortsAfterSync: cfg.BindPortsAfterSync, |
|
| 146 | 147 |
} |
| 147 | 148 |
router, err := newTemplateRouter(templateRouterCfg) |
| 148 | 149 |
return newDefaultTemplatePlugin(router, cfg.IncludeUDP, lookupSvc), err |
| ... | ... |
@@ -239,6 +244,12 @@ func (p *TemplatePlugin) SetLastSyncProcessed(processed bool) error {
|
| 239 | 239 |
return nil |
| 240 | 240 |
} |
| 241 | 241 |
|
| 242 |
+func (p *TemplatePlugin) SetSyncedAtLeastOnce() error {
|
|
| 243 |
+ p.Router.SetSyncedAtLeastOnce() |
|
| 244 |
+ p.Router.Commit() |
|
| 245 |
+ return nil |
|
| 246 |
+} |
|
| 247 |
+ |
|
| 242 | 248 |
// routeKeys returns the internal router keys to use for the given Route. |
| 243 | 249 |
// A route can have several services that it can point to, now |
| 244 | 250 |
func routeKeys(route *routeapi.Route) ([]string, []int32) {
|
| ... | ... |
@@ -86,6 +86,10 @@ type templateRouter struct {
|
| 86 | 86 |
lock sync.Mutex |
| 87 | 87 |
// the router should only reload when the value is false |
| 88 | 88 |
skipCommit bool |
| 89 |
+ // If true, haproxy should only bind ports when it has route and endpoint state |
|
| 90 |
+ bindPortsAfterSync bool |
|
| 91 |
+ // whether the router state has been read from the api at least once |
|
| 92 |
+ syncedAtLeastOnce bool |
|
| 89 | 93 |
} |
| 90 | 94 |
|
| 91 | 95 |
// templateRouterCfg holds all configuration items required to initialize the template router |
| ... | ... |
@@ -103,6 +107,7 @@ type templateRouterCfg struct {
|
| 103 | 103 |
allowWildcardRoutes bool |
| 104 | 104 |
peerEndpointsKey string |
| 105 | 105 |
includeUDP bool |
| 106 |
+ bindPortsAfterSync bool |
|
| 106 | 107 |
} |
| 107 | 108 |
|
| 108 | 109 |
// templateConfig is a subset of the templateRouter information that should be passed to the template for generating |
| ... | ... |
@@ -124,6 +129,8 @@ type templateData struct {
|
| 124 | 124 |
StatsPassword string |
| 125 | 125 |
//port to expose stats with (if the template supports it) |
| 126 | 126 |
StatsPort int |
| 127 |
+ // whether the router should bind the default ports |
|
| 128 |
+ BindPorts bool |
|
| 127 | 129 |
} |
| 128 | 130 |
|
| 129 | 131 |
func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) {
|
| ... | ... |
@@ -162,6 +169,7 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) {
|
| 162 | 162 |
allowWildcardRoutes: cfg.allowWildcardRoutes, |
| 163 | 163 |
peerEndpointsKey: cfg.peerEndpointsKey, |
| 164 | 164 |
peerEndpoints: []Endpoint{},
|
| 165 |
+ bindPortsAfterSync: cfg.bindPortsAfterSync, |
|
| 165 | 166 |
|
| 166 | 167 |
rateLimitedCommitFunction: nil, |
| 167 | 168 |
rateLimitedCommitStopChannel: make(chan struct{}),
|
| ... | ... |
@@ -394,6 +402,7 @@ func (r *templateRouter) writeConfig() error {
|
| 394 | 394 |
StatsUser: r.statsUser, |
| 395 | 395 |
StatsPassword: r.statsPassword, |
| 396 | 396 |
StatsPort: r.statsPort, |
| 397 |
+ BindPorts: !r.bindPortsAfterSync || r.syncedAtLeastOnce, |
|
| 397 | 398 |
} |
| 398 | 399 |
if err := template.Execute(file, data); err != nil {
|
| 399 | 400 |
file.Close() |
| ... | ... |
@@ -729,6 +738,13 @@ func (r *templateRouter) SetSkipCommit(skipCommit bool) {
|
| 729 | 729 |
} |
| 730 | 730 |
} |
| 731 | 731 |
|
| 732 |
+// SetSyncedAtLeastOnce indicates to the router that state has been |
|
| 733 |
+// read from the api. |
|
| 734 |
+func (r *templateRouter) SetSyncedAtLeastOnce() {
|
|
| 735 |
+ r.syncedAtLeastOnce = true |
|
| 736 |
+ glog.V(4).Infof("Router state synchronized for the first time")
|
|
| 737 |
+} |
|
| 738 |
+ |
|
| 732 | 739 |
// HasServiceUnit attempts to retrieve a service unit for the given |
| 733 | 740 |
// key, returning a boolean indication of whether the key is known. |
| 734 | 741 |
func (r *templateRouter) HasServiceUnit(key string) bool {
|
| ... | ... |
@@ -257,6 +257,10 @@ func (p *DelayPlugin) SetLastSyncProcessed(processed bool) error {
|
| 257 | 257 |
return p.plugin.SetLastSyncProcessed(processed) |
| 258 | 258 |
} |
| 259 | 259 |
|
| 260 |
+func (p *DelayPlugin) SetSyncedAtLeastOnce() error {
|
|
| 261 |
+ return p.plugin.SetSyncedAtLeastOnce() |
|
| 262 |
+} |
|
| 263 |
+ |
|
| 260 | 264 |
// launchRouter launches a template router that communicates with the |
| 261 | 265 |
// api via the provided clients. |
| 262 | 266 |
func launchRouter(oc osclient.Interface, kc kclient.Interface, maxDelay int32, name string, reloadInterval int, reloadCounts map[string]int) (templatePlugin *templateplugin.TemplatePlugin) {
|
| ... | ... |
@@ -1256,6 +1256,10 @@ u3YLAbyW/lHhOCiZu2iAI8AbmXem9lW6Tr7p/97s0w== |
| 1256 | 1256 |
// createAndStartRouterContainer is responsible for deploying the router image in docker. It assumes that all router images |
| 1257 | 1257 |
// will use a command line flag that can take --master which points to the master url |
| 1258 | 1258 |
func createAndStartRouterContainer(dockerCli *dockerClient.Client, masterIp string, routerStatsPort int, reloadInterval int) (containerId string, err error) {
|
| 1259 |
+ return createAndStartRouterContainerBindAfterSync(dockerCli, masterIp, routerStatsPort, reloadInterval, false) |
|
| 1260 |
+} |
|
| 1261 |
+ |
|
| 1262 |
+func createAndStartRouterContainerBindAfterSync(dockerCli *dockerClient.Client, masterIp string, routerStatsPort int, reloadInterval int, bindPortsAfterSync bool) (containerId string, err error) {
|
|
| 1259 | 1263 |
ports := []string{"80", "443"}
|
| 1260 | 1264 |
if routerStatsPort > 0 {
|
| 1261 | 1265 |
ports = append(ports, fmt.Sprintf("%d", routerStatsPort))
|
| ... | ... |
@@ -1291,6 +1295,7 @@ func createAndStartRouterContainer(dockerCli *dockerClient.Client, masterIp stri |
| 1291 | 1291 |
fmt.Sprintf("STATS_USERNAME=%s", statsUser),
|
| 1292 | 1292 |
fmt.Sprintf("STATS_PASSWORD=%s", statsPassword),
|
| 1293 | 1293 |
fmt.Sprintf("DEFAULT_CERTIFICATE=%s", defaultCert),
|
| 1294 |
+ fmt.Sprintf("ROUTER_BIND_PORTS_AFTER_SYNC=%s", strconv.FormatBool(bindPortsAfterSync)),
|
|
| 1294 | 1295 |
} |
| 1295 | 1296 |
|
| 1296 | 1297 |
reloadIntVar := fmt.Sprintf("RELOAD_INTERVAL=%ds", reloadInterval)
|
| ... | ... |
@@ -1582,8 +1587,80 @@ func TestRouterReloadCoalesce(t *testing.T) {
|
| 1582 | 1582 |
|
| 1583 | 1583 |
// waitForRouterToBecomeAvailable checks for the router start up and waits |
| 1584 | 1584 |
// till it becomes available. |
| 1585 |
-func waitForRouterToBecomeAvailable(host string, port int) {
|
|
| 1585 |
+func waitForRouterToBecomeAvailable(host string, port int) error {
|
|
| 1586 | 1586 |
hostAndPort := fmt.Sprintf("%s:%d", host, port)
|
| 1587 | 1587 |
uri := fmt.Sprintf("%s/healthz", hostAndPort)
|
| 1588 |
- waitForRoute(uri, hostAndPort, "http", nil, "") |
|
| 1588 |
+ return waitForRoute(uri, hostAndPort, "http", nil, "") |
|
| 1589 |
+} |
|
| 1590 |
+ |
|
| 1591 |
+// Ensure that when configured with ROUTER_BIND_PORTS_AFTER_SYNC=true, |
|
| 1592 |
+// haproxy binds ports only when an initial sync has been performed. |
|
| 1593 |
+func TestRouterBindsPortsAfterSync(t *testing.T) {
|
|
| 1594 |
+ // Create a new master but do not start it yet to simulate a router without api access. |
|
| 1595 |
+ fakeMasterAndPod := tr.NewTestHttpService() |
|
| 1596 |
+ |
|
| 1597 |
+ dockerCli, err := testutil.NewDockerClient() |
|
| 1598 |
+ if err != nil {
|
|
| 1599 |
+ t.Fatalf("Unable to get docker client: %v", err)
|
|
| 1600 |
+ } |
|
| 1601 |
+ |
|
| 1602 |
+ bindPortsAfterSync := true |
|
| 1603 |
+ reloadInterval := 1 |
|
| 1604 |
+ routerId, err := createAndStartRouterContainerBindAfterSync(dockerCli, fakeMasterAndPod.MasterHttpAddr, statsPort, reloadInterval, bindPortsAfterSync) |
|
| 1605 |
+ if err != nil {
|
|
| 1606 |
+ t.Fatalf("Error starting container %s : %v", getRouterImage(), err)
|
|
| 1607 |
+ } |
|
| 1608 |
+ defer cleanUp(t, dockerCli, routerId) |
|
| 1609 |
+ |
|
| 1610 |
+ routerIP := "127.0.0.1" |
|
| 1611 |
+ |
|
| 1612 |
+ if err = waitForRouterToBecomeAvailable(routerIP, statsPort); err != nil {
|
|
| 1613 |
+ t.Fatalf("Unexpected error while waiting for the router to become available: %v", err)
|
|
| 1614 |
+ } |
|
| 1615 |
+ |
|
| 1616 |
+ routeAddress := getRouteAddress() |
|
| 1617 |
+ |
|
| 1618 |
+ // Validate that the default ports are not yet bound |
|
| 1619 |
+ schemes := []string{"http", "https"}
|
|
| 1620 |
+ for _, scheme := range schemes {
|
|
| 1621 |
+ _, err = getRoute(routeAddress, routeAddress, scheme, nil, "") |
|
| 1622 |
+ if err == nil {
|
|
| 1623 |
+ t.Fatalf("Router is unexpectedly accepting connections via %v", scheme)
|
|
| 1624 |
+ } else if !strings.HasSuffix(fmt.Sprintf("%v", err), "connection refused") {
|
|
| 1625 |
+ t.Fatalf("Unexpected error when dispatching %v request: %v", scheme, err)
|
|
| 1626 |
+ } |
|
| 1627 |
+ } |
|
| 1628 |
+ |
|
| 1629 |
+ // Start the master |
|
| 1630 |
+ defer fakeMasterAndPod.Stop() |
|
| 1631 |
+ err = fakeMasterAndPod.Start() |
|
| 1632 |
+ validateServer(fakeMasterAndPod, t) |
|
| 1633 |
+ if err != nil {
|
|
| 1634 |
+ t.Fatalf("Unable to start http server: %v", err)
|
|
| 1635 |
+ } |
|
| 1636 |
+ |
|
| 1637 |
+ // Validate that the default ports are now bound |
|
| 1638 |
+ var lastErr error |
|
| 1639 |
+ for _, scheme := range schemes {
|
|
| 1640 |
+ err := wait.Poll(time.Millisecond*100, time.Duration(reloadInterval)*2*time.Second, func() (bool, error) {
|
|
| 1641 |
+ _, err := getRoute(routeAddress, routeAddress, scheme, nil, "") |
|
| 1642 |
+ lastErr = nil |
|
| 1643 |
+ switch err {
|
|
| 1644 |
+ case ErrUnavailable: |
|
| 1645 |
+ return true, nil |
|
| 1646 |
+ case nil: |
|
| 1647 |
+ return false, fmt.Errorf("Router is unexpectedly accepting connections via %v", scheme)
|
|
| 1648 |
+ default: |
|
| 1649 |
+ lastErr = fmt.Errorf("Unexpected error when dispatching %v request: %v", scheme, err)
|
|
| 1650 |
+ return false, nil |
|
| 1651 |
+ } |
|
| 1652 |
+ |
|
| 1653 |
+ }) |
|
| 1654 |
+ if err == wait.ErrWaitTimeout && lastErr != nil {
|
|
| 1655 |
+ err = lastErr |
|
| 1656 |
+ } |
|
| 1657 |
+ if err != nil {
|
|
| 1658 |
+ t.Fatalf(err.Error()) |
|
| 1659 |
+ } |
|
| 1660 |
+ } |
|
| 1589 | 1661 |
} |