package networkdb import ( "fmt" "net" "testing" "time" "github.com/docker/go-events" "github.com/hashicorp/memberlist" "github.com/hashicorp/serf/serf" "gotest.tools/v3/assert" is "gotest.tools/v3/assert/cmp" ) func TestWatch_out_of_order(t *testing.T) { nDB := newNetworkDB(DefaultConfig()) nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{} nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{} assert.Assert(t, nDB.JoinNetwork("network1")) (&eventDelegate{nDB}).NotifyJoin(&memberlist.Node{ Name: "node1", Addr: net.IPv4(1, 2, 3, 4), }) d := &delegate{nDB} msgs := messageBuffer{t: t} appendTableEvent := tableEventHelper(&msgs, "node1", "network1", "table1") msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{ Type: NetworkEventTypeJoin, LTime: 1, NodeName: "node1", NetworkID: "network1", }) appendTableEvent(1, TableEventTypeCreate, "tombstone1", []byte("a")) appendTableEvent(2, TableEventTypeDelete, "tombstone1", []byte("b")) appendTableEvent(3, TableEventTypeCreate, "key1", []byte("value1")) d.NotifyMsg(msgs.Compound()) msgs.Reset() nDB.CreateEntry("table1", "network1", "local1", []byte("should not see me in watch events")) watch, cancel := nDB.Watch("table1", "network1") defer cancel() got := drainChannel(watch.C) assert.Check(t, is.DeepEqual(got, []events.Event{ WatchEvent{Table: "table1", NetworkID: "network1", Key: "key1", Value: []byte("value1")}, })) // Receive events from node1, with events not received or received out of order // Create, (hidden update), delete appendTableEvent(4, TableEventTypeCreate, "key2", []byte("a")) appendTableEvent(6, TableEventTypeDelete, "key2", []byte("b")) // (Hidden recreate), delete appendTableEvent(8, TableEventTypeDelete, "key2", []byte("c")) // (Hidden recreate), update appendTableEvent(10, TableEventTypeUpdate, "key2", []byte("d")) // Update, create appendTableEvent(11, TableEventTypeUpdate, "key3", []byte("b")) appendTableEvent(10, TableEventTypeCreate, "key3", []byte("a")) // (Hidden create), update, update appendTableEvent(13, TableEventTypeUpdate, "key4", []byte("b")) appendTableEvent(14, TableEventTypeUpdate, "key4", []byte("c")) // Delete, create appendTableEvent(16, TableEventTypeDelete, "key5", []byte("a")) appendTableEvent(15, TableEventTypeCreate, "key5", []byte("a")) // (Hidden recreate), delete appendTableEvent(18, TableEventTypeDelete, "key5", []byte("b")) d.NotifyMsg(msgs.Compound()) msgs.Reset() got = drainChannel(watch.C) assert.Check(t, is.DeepEqual(got, []events.Event{ WatchEvent{Table: "table1", NetworkID: "network1", Key: "key2", Value: []byte("a")}, // Delete value should match last observed value, // irrespective of the content of the delete event over the wire. WatchEvent{Table: "table1", NetworkID: "network1", Key: "key2", Prev: []byte("a")}, // Updates to previously-deleted keys should be observed as creates. WatchEvent{Table: "table1", NetworkID: "network1", Key: "key2", Value: []byte("d")}, // Out-of-order update events should be observed as creates. WatchEvent{Table: "table1", NetworkID: "network1", Key: "key3", Value: []byte("b")}, WatchEvent{Table: "table1", NetworkID: "network1", Key: "key4", Value: []byte("b")}, WatchEvent{Table: "table1", NetworkID: "network1", Key: "key4", Prev: []byte("b"), Value: []byte("c")}, // key5 should not appear in the events. })) } func TestWatch_filters(t *testing.T) { nDB := newNetworkDB(DefaultConfig()) nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{} nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{} assert.Assert(t, nDB.JoinNetwork("network1")) assert.Assert(t, nDB.JoinNetwork("network2")) (&eventDelegate{nDB}).NotifyJoin(&memberlist.Node{ Name: "node1", Addr: net.IPv4(1, 2, 3, 4), }) var ltime serf.LamportClock msgs := messageBuffer{t: t} msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{ Type: NetworkEventTypeJoin, LTime: ltime.Increment(), NodeName: "node1", NetworkID: "network1", }) msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{ Type: NetworkEventTypeJoin, LTime: ltime.Increment(), NodeName: "node1", NetworkID: "network2", }) for _, nid := range []string{"network1", "network2"} { for _, tname := range []string{"table1", "table2"} { msgs.Append(MessageTypeTableEvent, &TableEvent{ Type: TableEventTypeCreate, LTime: ltime.Increment(), NodeName: "node1", NetworkID: nid, TableName: tname, Key: nid + "." + tname + ".dead", Value: []byte("deaddead"), }) msgs.Append(MessageTypeTableEvent, &TableEvent{ Type: TableEventTypeDelete, LTime: ltime.Increment(), NodeName: "node1", NetworkID: nid, TableName: tname, Key: nid + "." + tname + ".dead", Value: []byte("deaddead"), }) msgs.Append(MessageTypeTableEvent, &TableEvent{ Type: TableEventTypeCreate, LTime: ltime.Increment(), NodeName: "node1", NetworkID: nid, TableName: tname, Key: nid + "." + tname + ".update", Value: []byte("initial"), }) msgs.Append(MessageTypeTableEvent, &TableEvent{ Type: TableEventTypeCreate, LTime: ltime.Increment(), NodeName: "node1", NetworkID: nid, TableName: tname, Key: nid + "." + tname, Value: []byte("a"), }) msgs.Append(MessageTypeTableEvent, &TableEvent{ Type: TableEventTypeUpdate, LTime: ltime.Increment(), NodeName: "node1", NetworkID: nid, TableName: tname, Key: nid + "." + tname + ".update", Value: []byte("updated"), }) } } (&delegate{nDB}).NotifyMsg(msgs.Compound()) watchAll, cancel := nDB.Watch("", "") defer cancel() watchNetwork1Tables, cancel := nDB.Watch("", "network1") defer cancel() watchTable1AllNetworks, cancel := nDB.Watch("table1", "") defer cancel() watchTable1Network1, cancel := nDB.Watch("table1", "network1") defer cancel() var gotAll, gotNetwork1Tables, gotTable1AllNetworks, gotTable1Network1 []events.Event L: for { select { case ev := <-watchAll.C: gotAll = append(gotAll, ev) case ev := <-watchNetwork1Tables.C: gotNetwork1Tables = append(gotNetwork1Tables, ev) case ev := <-watchTable1AllNetworks.C: gotTable1AllNetworks = append(gotTable1AllNetworks, ev) case ev := <-watchTable1Network1.C: gotTable1Network1 = append(gotTable1Network1, ev) case <-time.After(time.Second): break L } } assert.Check(t, is.DeepEqual(gotAll, []events.Event{ WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")}, WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")}, WatchEvent{Table: "table2", NetworkID: "network1", Key: "network1.table2", Value: []byte("a")}, WatchEvent{Table: "table2", NetworkID: "network1", Key: "network1.table2.update", Value: []byte("updated")}, WatchEvent{Table: "table1", NetworkID: "network2", Key: "network2.table1", Value: []byte("a")}, WatchEvent{Table: "table1", NetworkID: "network2", Key: "network2.table1.update", Value: []byte("updated")}, WatchEvent{Table: "table2", NetworkID: "network2", Key: "network2.table2", Value: []byte("a")}, WatchEvent{Table: "table2", NetworkID: "network2", Key: "network2.table2.update", Value: []byte("updated")}, })) assert.Check(t, is.DeepEqual(gotNetwork1Tables, []events.Event{ WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")}, WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")}, WatchEvent{Table: "table2", NetworkID: "network1", Key: "network1.table2", Value: []byte("a")}, WatchEvent{Table: "table2", NetworkID: "network1", Key: "network1.table2.update", Value: []byte("updated")}, })) assert.Check(t, is.DeepEqual(gotTable1AllNetworks, []events.Event{ WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")}, WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")}, WatchEvent{Table: "table1", NetworkID: "network2", Key: "network2.table1", Value: []byte("a")}, WatchEvent{Table: "table1", NetworkID: "network2", Key: "network2.table1.update", Value: []byte("updated")}, })) assert.Check(t, is.DeepEqual(gotTable1Network1, []events.Event{ WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")}, WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")}, })) } func TestLeaveRejoinOutOfOrder(t *testing.T) { // Regression test for https://github.com/moby/moby/issues/47728 nDB := newNetworkDB(DefaultConfig()) nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{} nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{} assert.Assert(t, nDB.JoinNetwork("network1")) (&eventDelegate{nDB}).NotifyJoin(&memberlist.Node{ Name: "node1", Addr: net.IPv4(1, 2, 3, 4), }) d := &delegate{nDB} msgs := messageBuffer{t: t} appendTableEvent := tableEventHelper(&msgs, "node1", "network1", "table1") msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{ Type: NetworkEventTypeJoin, LTime: 1, NodeName: "node1", NetworkID: "network1", }) // Simulate node1 leaving, rejoining, and creating an entry, // but the table events are broadcast before the network events. appendTableEvent(1, TableEventTypeCreate, "key1", []byte("a")) msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{ Type: NetworkEventTypeLeave, LTime: 2, NodeName: "node1", NetworkID: "network1", }) msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{ Type: NetworkEventTypeJoin, LTime: 3, NodeName: "node1", NetworkID: "network1", }) // Simulate a bulk sync or receiving a rebroadcasted copy of the table // event from another node. appendTableEvent(1, TableEventTypeCreate, "key1", []byte("a")) d.NotifyMsg(msgs.Compound()) got := make(map[string]string) nDB.WalkTable("table1", func(nw, key string, value []byte, deleted bool) bool { got[nw+"/"+key] = fmt.Sprintf("%s (deleted=%t)", value, deleted) return false }) want := map[string]string{ "network1/key1": "a (deleted=false)", } assert.Check(t, is.DeepEqual(got, want)) } func drainChannel(ch <-chan events.Event) []events.Event { var events []events.Event for { select { case ev := <-ch: events = append(events, ev) case <-time.After(time.Second): return events } } } type messageBuffer struct { t *testing.T msgs [][]byte } func (mb *messageBuffer) Append(typ MessageType, msg any) { mb.t.Helper() buf, err := encodeMessage(typ, msg) if err != nil { mb.t.Fatalf("failed to encode message: %v", err) } mb.msgs = append(mb.msgs, buf) } func (mb *messageBuffer) Compound() []byte { return makeCompoundMessage(mb.msgs) } func (mb *messageBuffer) Reset() { mb.msgs = nil } func tableEventHelper(mb *messageBuffer, nodeName, networkID, tableName string) func(ltime serf.LamportTime, typ TableEvent_Type, key string, value []byte) { return func(ltime serf.LamportTime, typ TableEvent_Type, key string, value []byte) { mb.t.Helper() mb.Append(MessageTypeTableEvent, &TableEvent{ Type: typ, LTime: ltime, NodeName: nodeName, NetworkID: networkID, TableName: tableName, Key: key, Value: value, }) } }