package networkdb
import (
"fmt"
"net"
"testing"
"time"
"github.com/docker/go-events"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
)
func TestWatch_out_of_order(t *testing.T) {
nDB := newNetworkDB(DefaultConfig())
nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{}
nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{}
assert.Assert(t, nDB.JoinNetwork("network1"))
(&eventDelegate{nDB}).NotifyJoin(&memberlist.Node{
Name: "node1",
Addr: net.IPv4(1, 2, 3, 4),
})
d := &delegate{nDB}
msgs := messageBuffer{t: t}
appendTableEvent := tableEventHelper(&msgs, "node1", "network1", "table1")
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
Type: NetworkEventTypeJoin,
LTime: 1,
NodeName: "node1",
NetworkID: "network1",
})
appendTableEvent(1, TableEventTypeCreate, "tombstone1", []byte("a"))
appendTableEvent(2, TableEventTypeDelete, "tombstone1", []byte("b"))
appendTableEvent(3, TableEventTypeCreate, "key1", []byte("value1"))
d.NotifyMsg(msgs.Compound())
msgs.Reset()
nDB.CreateEntry("table1", "network1", "local1", []byte("should not see me in watch events"))
watch, cancel := nDB.Watch("table1", "network1")
defer cancel()
got := drainChannel(watch.C)
assert.Check(t, is.DeepEqual(got, []events.Event{
WatchEvent{Table: "table1", NetworkID: "network1", Key: "key1", Value: []byte("value1")},
}))
// Receive events from node1, with events not received or received out of order
// Create, (hidden update), delete
appendTableEvent(4, TableEventTypeCreate, "key2", []byte("a"))
appendTableEvent(6, TableEventTypeDelete, "key2", []byte("b"))
// (Hidden recreate), delete
appendTableEvent(8, TableEventTypeDelete, "key2", []byte("c"))
// (Hidden recreate), update
appendTableEvent(10, TableEventTypeUpdate, "key2", []byte("d"))
// Update, create
appendTableEvent(11, TableEventTypeUpdate, "key3", []byte("b"))
appendTableEvent(10, TableEventTypeCreate, "key3", []byte("a"))
// (Hidden create), update, update
appendTableEvent(13, TableEventTypeUpdate, "key4", []byte("b"))
appendTableEvent(14, TableEventTypeUpdate, "key4", []byte("c"))
// Delete, create
appendTableEvent(16, TableEventTypeDelete, "key5", []byte("a"))
appendTableEvent(15, TableEventTypeCreate, "key5", []byte("a"))
// (Hidden recreate), delete
appendTableEvent(18, TableEventTypeDelete, "key5", []byte("b"))
d.NotifyMsg(msgs.Compound())
msgs.Reset()
got = drainChannel(watch.C)
assert.Check(t, is.DeepEqual(got, []events.Event{
WatchEvent{Table: "table1", NetworkID: "network1", Key: "key2", Value: []byte("a")},
// Delete value should match last observed value,
// irrespective of the content of the delete event over the wire.
WatchEvent{Table: "table1", NetworkID: "network1", Key: "key2", Prev: []byte("a")},
// Updates to previously-deleted keys should be observed as creates.
WatchEvent{Table: "table1", NetworkID: "network1", Key: "key2", Value: []byte("d")},
// Out-of-order update events should be observed as creates.
WatchEvent{Table: "table1", NetworkID: "network1", Key: "key3", Value: []byte("b")},
WatchEvent{Table: "table1", NetworkID: "network1", Key: "key4", Value: []byte("b")},
WatchEvent{Table: "table1", NetworkID: "network1", Key: "key4", Prev: []byte("b"), Value: []byte("c")},
// key5 should not appear in the events.
}))
}
func TestWatch_filters(t *testing.T) {
nDB := newNetworkDB(DefaultConfig())
nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{}
nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{}
assert.Assert(t, nDB.JoinNetwork("network1"))
assert.Assert(t, nDB.JoinNetwork("network2"))
(&eventDelegate{nDB}).NotifyJoin(&memberlist.Node{
Name: "node1",
Addr: net.IPv4(1, 2, 3, 4),
})
var ltime serf.LamportClock
msgs := messageBuffer{t: t}
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
Type: NetworkEventTypeJoin,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: "network1",
})
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
Type: NetworkEventTypeJoin,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: "network2",
})
for _, nid := range []string{"network1", "network2"} {
for _, tname := range []string{"table1", "table2"} {
msgs.Append(MessageTypeTableEvent, &TableEvent{
Type: TableEventTypeCreate,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: nid,
TableName: tname,
Key: nid + "." + tname + ".dead",
Value: []byte("deaddead"),
})
msgs.Append(MessageTypeTableEvent, &TableEvent{
Type: TableEventTypeDelete,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: nid,
TableName: tname,
Key: nid + "." + tname + ".dead",
Value: []byte("deaddead"),
})
msgs.Append(MessageTypeTableEvent, &TableEvent{
Type: TableEventTypeCreate,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: nid,
TableName: tname,
Key: nid + "." + tname + ".update",
Value: []byte("initial"),
})
msgs.Append(MessageTypeTableEvent, &TableEvent{
Type: TableEventTypeCreate,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: nid,
TableName: tname,
Key: nid + "." + tname,
Value: []byte("a"),
})
msgs.Append(MessageTypeTableEvent, &TableEvent{
Type: TableEventTypeUpdate,
LTime: ltime.Increment(),
NodeName: "node1",
NetworkID: nid,
TableName: tname,
Key: nid + "." + tname + ".update",
Value: []byte("updated"),
})
}
}
(&delegate{nDB}).NotifyMsg(msgs.Compound())
watchAll, cancel := nDB.Watch("", "")
defer cancel()
watchNetwork1Tables, cancel := nDB.Watch("", "network1")
defer cancel()
watchTable1AllNetworks, cancel := nDB.Watch("table1", "")
defer cancel()
watchTable1Network1, cancel := nDB.Watch("table1", "network1")
defer cancel()
var gotAll, gotNetwork1Tables, gotTable1AllNetworks, gotTable1Network1 []events.Event
L:
for {
select {
case ev := <-watchAll.C:
gotAll = append(gotAll, ev)
case ev := <-watchNetwork1Tables.C:
gotNetwork1Tables = append(gotNetwork1Tables, ev)
case ev := <-watchTable1AllNetworks.C:
gotTable1AllNetworks = append(gotTable1AllNetworks, ev)
case ev := <-watchTable1Network1.C:
gotTable1Network1 = append(gotTable1Network1, ev)
case <-time.After(time.Second):
break L
}
}
assert.Check(t, is.DeepEqual(gotAll, []events.Event{
WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")},
WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")},
WatchEvent{Table: "table2", NetworkID: "network1", Key: "network1.table2", Value: []byte("a")},
WatchEvent{Table: "table2", NetworkID: "network1", Key: "network1.table2.update", Value: []byte("updated")},
WatchEvent{Table: "table1", NetworkID: "network2", Key: "network2.table1", Value: []byte("a")},
WatchEvent{Table: "table1", NetworkID: "network2", Key: "network2.table1.update", Value: []byte("updated")},
WatchEvent{Table: "table2", NetworkID: "network2", Key: "network2.table2", Value: []byte("a")},
WatchEvent{Table: "table2", NetworkID: "network2", Key: "network2.table2.update", Value: []byte("updated")},
}))
assert.Check(t, is.DeepEqual(gotNetwork1Tables, []events.Event{
WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")},
WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")},
WatchEvent{Table: "table2", NetworkID: "network1", Key: "network1.table2", Value: []byte("a")},
WatchEvent{Table: "table2", NetworkID: "network1", Key: "network1.table2.update", Value: []byte("updated")},
}))
assert.Check(t, is.DeepEqual(gotTable1AllNetworks, []events.Event{
WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")},
WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")},
WatchEvent{Table: "table1", NetworkID: "network2", Key: "network2.table1", Value: []byte("a")},
WatchEvent{Table: "table1", NetworkID: "network2", Key: "network2.table1.update", Value: []byte("updated")},
}))
assert.Check(t, is.DeepEqual(gotTable1Network1, []events.Event{
WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1", Value: []byte("a")},
WatchEvent{Table: "table1", NetworkID: "network1", Key: "network1.table1.update", Value: []byte("updated")},
}))
}
func TestLeaveRejoinOutOfOrder(t *testing.T) {
// Regression test for https://github.com/moby/moby/issues/47728
nDB := newNetworkDB(DefaultConfig())
nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{}
nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{}
assert.Assert(t, nDB.JoinNetwork("network1"))
(&eventDelegate{nDB}).NotifyJoin(&memberlist.Node{
Name: "node1",
Addr: net.IPv4(1, 2, 3, 4),
})
d := &delegate{nDB}
msgs := messageBuffer{t: t}
appendTableEvent := tableEventHelper(&msgs, "node1", "network1", "table1")
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
Type: NetworkEventTypeJoin,
LTime: 1,
NodeName: "node1",
NetworkID: "network1",
})
// Simulate node1 leaving, rejoining, and creating an entry,
// but the table events are broadcast before the network events.
appendTableEvent(1, TableEventTypeCreate, "key1", []byte("a"))
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
Type: NetworkEventTypeLeave,
LTime: 2,
NodeName: "node1",
NetworkID: "network1",
})
msgs.Append(MessageTypeNetworkEvent, &NetworkEvent{
Type: NetworkEventTypeJoin,
LTime: 3,
NodeName: "node1",
NetworkID: "network1",
})
// Simulate a bulk sync or receiving a rebroadcasted copy of the table
// event from another node.
appendTableEvent(1, TableEventTypeCreate, "key1", []byte("a"))
d.NotifyMsg(msgs.Compound())
got := make(map[string]string)
nDB.WalkTable("table1", func(nw, key string, value []byte, deleted bool) bool {
got[nw+"/"+key] = fmt.Sprintf("%s (deleted=%t)", value, deleted)
return false
})
want := map[string]string{
"network1/key1": "a (deleted=false)",
}
assert.Check(t, is.DeepEqual(got, want))
}
func drainChannel(ch <-chan events.Event) []events.Event {
var events []events.Event
for {
select {
case ev := <-ch:
events = append(events, ev)
case <-time.After(time.Second):
return events
}
}
}
type messageBuffer struct {
t *testing.T
msgs [][]byte
}
func (mb *messageBuffer) Append(typ MessageType, msg any) {
mb.t.Helper()
buf, err := encodeMessage(typ, msg)
if err != nil {
mb.t.Fatalf("failed to encode message: %v", err)
}
mb.msgs = append(mb.msgs, buf)
}
func (mb *messageBuffer) Compound() []byte {
return makeCompoundMessage(mb.msgs)
}
func (mb *messageBuffer) Reset() {
mb.msgs = nil
}
func tableEventHelper(mb *messageBuffer, nodeName, networkID, tableName string) func(ltime serf.LamportTime, typ TableEvent_Type, key string, value []byte) {
return func(ltime serf.LamportTime, typ TableEvent_Type, key string, value []byte) {
mb.t.Helper()
mb.Append(MessageTypeTableEvent, &TableEvent{
Type: typ,
LTime: ltime,
NodeName: nodeName,
NetworkID: networkID,
TableName: tableName,
Key: key,
Value: value,
})
}
}