package networkdb

import (
	"net"
	"strings"

	"github.com/docker/go-events"
)

// WatchEvent describes a single change to a table entry as delivered to
// watchers created with Watch. The kind of change is derived from which of
// Value and Prev are nil.
type WatchEvent struct {
	Table     string
	NetworkID string
	Key       string
	Value     []byte // Current value of the entry, or nil if deleted
	Prev      []byte // Previous value of the entry, or nil if created
}

// IsCreate reports whether the event carries a current value but no previous
// value.
func (e WatchEvent) IsCreate() bool {
	return e.Value != nil && e.Prev == nil
}

// IsUpdate reports whether the event carries both a previous and a current
// value.
func (e WatchEvent) IsUpdate() bool {
	return e.Value != nil && e.Prev != nil
}

// IsDelete reports whether the event carries a previous value but no current
// value.
func (e WatchEvent) IsDelete() bool {
	return e.Value == nil && e.Prev != nil
}

// String renders the event as "<Kind>(<table>/<network>/<key>)", where Kind is
// Create, Update or Delete, or Unknown when both Value and Prev are nil.
func (e WatchEvent) String() string {
	hadPrev := e.Prev != nil
	hasValue := e.Value != nil

	var kind string
	switch {
	case hasValue && !hadPrev:
		kind = "Create"
	case hasValue && hadPrev:
		kind = "Update"
	case hadPrev:
		kind = "Delete"
	default:
		kind = "Unknown"
	}

	path := strings.Join([]string{e.Table, e.NetworkID, e.Key}, "/")
	return kind + "(" + path + ")"
}

// NodeTable is the table name used for node join and leave events.
const NodeTable = "NodeTable"

// NodeAddr is the value carried by node events in NodeTable.
type NodeAddr struct {
	Addr net.IP
}

// Watch creates a watcher filtered by table name, network ID, or both. An
// empty string for either filter acts as a wildcard for that field. Watch
// returns the channel on which matching events will be sent. The channel is
// primed with synthetic create events for all existing table entries that
// match the filters and are not owned by this node.
func (nDB *NetworkDB) Watch(tname, nid string) (*events.Channel, func()) { var matcher events.Matcher if tname != "" || nid != "" { matcher = events.MatcherFunc(func(ev events.Event) bool { evt := ev.(WatchEvent) if tname != "" && evt.Table != tname { return false } if nid != "" && evt.NetworkID != nid { return false } return true }) } ch := events.NewChannel(0) sink := events.Sink(events.NewQueue(ch)) if matcher != nil { sink = events.NewFilter(sink, matcher) } // Synthesize events for all the existing table entries not owned by // this node so that the watcher receives all state without racing with // any concurrent mutations to the table. nDB.RLock() defer nDB.RUnlock() if tname == "" { var prefix []byte if nid != "" { prefix = []byte("/" + nid + "/") } else { prefix = []byte("/") } nDB.indexes[byNetwork].Root().WalkPrefix(prefix, func(path []byte, v *entry) bool { if !v.deleting && v.node != nDB.config.NodeID { tuple := strings.SplitN(string(path[1:]), "/", 3) if len(tuple) == 3 { entryNid, entryTname, key := tuple[0], tuple[1], tuple[2] sink.Write(WatchEvent{ Table: entryTname, NetworkID: entryNid, Key: key, Value: v.value, }) } } return false }) } else { prefix := []byte("/" + tname + "/") if nid != "" { prefix = append(prefix, []byte(nid+"/")...) } nDB.indexes[byTable].Root().WalkPrefix(prefix, func(path []byte, v *entry) bool { if !v.deleting && v.node != nDB.config.NodeID { tuple := strings.SplitN(string(path[1:]), "/", 3) if len(tuple) == 3 { entryTname, entryNid, key := tuple[0], tuple[1], tuple[2] sink.Write(WatchEvent{ Table: entryTname, NetworkID: entryNid, Key: key, Value: v.value, }) } } return false }) } nDB.broadcaster.Add(sink) return ch, func() { nDB.broadcaster.Remove(sink) ch.Close() sink.Close() } }