Browse code

libcontainerd: use cancellable context for events

The event subscriber can only be cancelled by cancelling the context.
In the case where we have to restart event processing we are never
cancelling the old subscribiption.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>

Brian Goff authored on 2020/07/18 03:39:32
Showing 1 changed files
... ...
@@ -722,9 +722,12 @@ func (c *client) waitServe(ctx context.Context) bool {
722 722
 	// `IsServing` will actually block until the service is ready.
723 723
 	// However it can return early, so we'll loop with a delay to handle it.
724 724
 	for {
725
-		serving, _ := c.client.IsServing(ctx)
726
-		if ctx.Err() != nil {
727
-			return false
725
+		serving, err := c.client.IsServing(ctx)
726
+		if err != nil {
727
+			if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
728
+				return false
729
+			}
730
+			logrus.WithError(err).Warn("Error while testing if containerd API is ready")
728 731
 		}
729 732
 
730 733
 		if serving {
... ...
@@ -748,9 +751,16 @@ func (c *client) processEventStream(ctx context.Context, ns string) {
748 748
 		ei  libcontainerdtypes.EventInfo
749 749
 	)
750 750
 
751
+	// Create a new context specifically for this subscription.
752
+	// The context must be cancelled to cancel the subscription.
753
+	// In cases where we have to restart event stream processing,
754
+	//   we'll need the original context b/c this one will be cancelled
755
+	subCtx, cancel := context.WithCancel(ctx)
756
+	defer cancel()
757
+
751 758
 	// Filter on both namespace *and* topic. To create an "and" filter,
752 759
 	// this must be a single, comma-separated string
753
-	eventStream, errC := c.client.EventService().Subscribe(ctx, "namespace=="+ns+",topic~=|^/tasks/|")
760
+	eventStream, errC := c.client.EventService().Subscribe(subCtx, "namespace=="+ns+",topic~=|^/tasks/|")
754 761
 
755 762
 	c.logger.Debug("processing event stream")
756 763