package exchange import ( "context" "strings" "time" v1 "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" "github.com/containerd/containerd/filters" "github.com/containerd/containerd/identifiers" "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" "github.com/containerd/typeurl" goevents "github.com/docker/go-events" "github.com/gogo/protobuf/types" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) // Exchange broadcasts events type Exchange struct { broadcaster *goevents.Broadcaster } // NewExchange returns a new event Exchange func NewExchange() *Exchange { return &Exchange{ broadcaster: goevents.NewBroadcaster(), } } // Forward accepts an envelope to be direcly distributed on the exchange. // // This is useful when an event is forwaded on behalf of another namespace or // when the event is propagated on behalf of another publisher. func (e *Exchange) Forward(ctx context.Context, envelope *v1.Envelope) (err error) { if err := validateEnvelope(envelope); err != nil { return err } defer func() { logger := log.G(ctx).WithFields(logrus.Fields{ "topic": envelope.Topic, "ns": envelope.Namespace, "type": envelope.Event.TypeUrl, }) if err != nil { logger.WithError(err).Error("error forwarding event") } else { logger.Debug("event forwarded") } }() return e.broadcaster.Write(envelope) } // Publish packages and sends an event. The caller will be considered the // initial publisher of the event. This means the timestamp will be calculated // at this point and this method may read from the calling context. func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event) (err error) { var ( namespace string encoded *types.Any envelope v1.Envelope ) namespace, err = namespaces.NamespaceRequired(ctx) if err != nil { return errors.Wrapf(err, "failed publishing event") } if err := validateTopic(topic); err != nil { return errors.Wrapf(err, "envelope topic %q", topic) } encoded, err = typeurl.MarshalAny(event) if err != nil { return err } envelope.Timestamp = time.Now().UTC() envelope.Namespace = namespace envelope.Topic = topic envelope.Event = encoded defer func() { logger := log.G(ctx).WithFields(logrus.Fields{ "topic": envelope.Topic, "ns": envelope.Namespace, "type": envelope.Event.TypeUrl, }) if err != nil { logger.WithError(err).Error("error publishing event") } else { logger.Debug("event published") } }() return e.broadcaster.Write(&envelope) } // Subscribe to events on the exchange. Events are sent through the returned // channel ch. If an error is encountered, it will be sent on channel errs and // errs will be closed. To end the subscription, cancel the provided context. // // Zero or more filters may be provided as strings. Only events that match // *any* of the provided filters will be sent on the channel. The filters use // the standard containerd filters package syntax. func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *v1.Envelope, errs <-chan error) { var ( evch = make(chan *v1.Envelope) errq = make(chan error, 1) channel = goevents.NewChannel(0) queue = goevents.NewQueue(channel) dst goevents.Sink = queue ) closeAll := func() { defer close(errq) defer e.broadcaster.Remove(dst) defer queue.Close() defer channel.Close() } ch = evch errs = errq if len(fs) > 0 { filter, err := filters.ParseAll(fs...) if err != nil { errq <- errors.Wrapf(err, "failed parsing subscription filters") closeAll() return } dst = goevents.NewFilter(queue, goevents.MatcherFunc(func(gev goevents.Event) bool { return filter.Match(adapt(gev)) })) } e.broadcaster.Add(dst) go func() { defer closeAll() var err error loop: for { select { case ev := <-channel.C: env, ok := ev.(*v1.Envelope) if !ok { // TODO(stevvooe): For the most part, we are well protected // from this condition. Both Forward and Publish protect // from this. err = errors.Errorf("invalid envelope encountered %#v; please file a bug", ev) break } select { case evch <- env: case <-ctx.Done(): break loop } case <-ctx.Done(): break loop } } if err == nil { if cerr := ctx.Err(); cerr != context.Canceled { err = cerr } } errq <- err }() return } func validateTopic(topic string) error { if topic == "" { return errors.Wrap(errdefs.ErrInvalidArgument, "must not be empty") } if topic[0] != '/' { return errors.Wrapf(errdefs.ErrInvalidArgument, "must start with '/'") } if len(topic) == 1 { return errors.Wrapf(errdefs.ErrInvalidArgument, "must have at least one component") } components := strings.Split(topic[1:], "/") for _, component := range components { if err := identifiers.Validate(component); err != nil { return errors.Wrapf(err, "failed validation on component %q", component) } } return nil } func validateEnvelope(envelope *v1.Envelope) error { if err := namespaces.Validate(envelope.Namespace); err != nil { return errors.Wrapf(err, "event envelope has invalid namespace") } if err := validateTopic(envelope.Topic); err != nil { return errors.Wrapf(err, "envelope topic %q", envelope.Topic) } if envelope.Timestamp.IsZero() { return errors.Wrapf(errdefs.ErrInvalidArgument, "timestamp must be set on forwarded event") } return nil } func adapt(ev interface{}) filters.Adaptor { if adaptor, ok := ev.(filters.Adaptor); ok { return adaptor } return filters.AdapterFunc(func(fieldpath []string) (string, bool) { return "", false }) }