Browse code

Avoid delay on node rejoin, avoid useless witness

Avoid waiting for a double notification once a node rejoin, just
put it back to active state. Waiting for a further message does not
really add anything to the safety of the operation, the source of truth
for the node status resided inside memberlist.

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

Flavio Crisciani authored on 2017/12/22 07:42:47
Showing 4 changed files
... ...
@@ -165,16 +165,19 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
165 165
 		}
166 166
 	}
167 167
 	nDB.RUnlock()
168
+
168 169
 	if !ok || network.leaving || !nodePresent {
169 170
 		// I'm out of the network OR the event owner is not anymore part of the network so do not propagate
170 171
 		return false
171 172
 	}
172 173
 
174
+	nDB.Lock()
173 175
 	e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
174 176
 	if err == nil {
175 177
 		// We have the latest state. Ignore the event
176 178
 		// since it is stale.
177 179
 		if e.ltime >= tEvent.LTime {
180
+			nDB.Unlock()
178 181
 			return false
179 182
 		}
180 183
 	}
... ...
@@ -195,8 +198,6 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
195 195
 			nDB.config.Hostname, nDB.config.NodeID, tEvent)
196 196
 		e.reapTime = nDB.config.reapEntryInterval
197 197
 	}
198
-
199
-	nDB.Lock()
200 198
 	nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
201 199
 	nDB.Unlock()
202 200
 
... ...
@@ -26,13 +26,10 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
26 26
 	e.broadcastNodeEvent(mn.Addr, opCreate)
27 27
 	e.nDB.Lock()
28 28
 	defer e.nDB.Unlock()
29
+
29 30
 	// In case the node is rejoining after a failure or leave,
30
-	// wait until an explicit join message arrives before adding
31
-	// it to the nodes just to make sure this is not a stale
32
-	// join. If you don't know about this node add it immediately.
33
-	_, fOk := e.nDB.failedNodes[mn.Name]
34
-	_, lOk := e.nDB.leftNodes[mn.Name]
35
-	if fOk || lOk {
31
+	// just add the node back to active
32
+	if moved, _ := e.nDB.changeNodeState(mn.Name, nodeActiveState); moved {
36 33
 		return
37 34
 	}
38 35
 
... ...
@@ -322,6 +322,8 @@ func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
322 322
 // GetEntry retrieves the value of a table entry in a given (network,
323 323
 // table, key) tuple
324 324
 func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
325
+	nDB.RLock()
326
+	defer nDB.RUnlock()
325 327
 	entry, err := nDB.getEntry(tname, nid, key)
326 328
 	if err != nil {
327 329
 		return nil, err
... ...
@@ -331,9 +333,6 @@ func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
331 331
 }
332 332
 
333 333
 func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
334
-	nDB.RLock()
335
-	defer nDB.RUnlock()
336
-
337 334
 	e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
338 335
 	if !ok {
339 336
 		return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
... ...
@@ -348,13 +347,10 @@ func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
348 348
 // entry for the same tuple for which there is already an existing
349 349
 // entry unless the current entry is deleting state.
350 350
 func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
351
+	nDB.Lock()
351 352
 	oldEntry, err := nDB.getEntry(tname, nid, key)
352
-	if err != nil {
353
-		if _, ok := err.(types.NotFoundError); !ok {
354
-			return fmt.Errorf("cannot create entry in table %s with network id %s and key %s: %v", tname, nid, key, err)
355
-		}
356
-	}
357
-	if oldEntry != nil && !oldEntry.deleting {
353
+	if err == nil || (oldEntry != nil && !oldEntry.deleting) {
354
+		nDB.Unlock()
358 355
 		return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key)
359 356
 	}
360 357
 
... ...
@@ -364,14 +360,13 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
364 364
 		value: value,
365 365
 	}
366 366
 
367
+	nDB.createOrUpdateEntry(nid, tname, key, entry)
368
+	nDB.Unlock()
369
+
367 370
 	if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil {
368 371
 		return fmt.Errorf("cannot send create event for table %s, %v", tname, err)
369 372
 	}
370 373
 
371
-	nDB.Lock()
372
-	nDB.createOrUpdateEntry(nid, tname, key, entry)
373
-	nDB.Unlock()
374
-
375 374
 	return nil
376 375
 }
377 376
 
... ...
@@ -380,7 +375,9 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
380 380
 // propagates this event to the cluster. It is an error to update a
381 381
 // non-existent entry.
382 382
 func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
383
-	if _, err := nDB.GetEntry(tname, nid, key); err != nil {
383
+	nDB.Lock()
384
+	if _, err := nDB.getEntry(tname, nid, key); err != nil {
385
+		nDB.Unlock()
384 386
 		return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
385 387
 	}
386 388
 
... ...
@@ -390,14 +387,13 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
390 390
 		value: value,
391 391
 	}
392 392
 
393
+	nDB.createOrUpdateEntry(nid, tname, key, entry)
394
+	nDB.Unlock()
395
+
393 396
 	if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil {
394 397
 		return fmt.Errorf("cannot send table update event: %v", err)
395 398
 	}
396 399
 
397
-	nDB.Lock()
398
-	nDB.createOrUpdateEntry(nid, tname, key, entry)
399
-	nDB.Unlock()
400
-
401 400
 	return nil
402 401
 }
403 402
 
... ...
@@ -427,27 +423,29 @@ func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]*TableElem
427 427
 // table, key) tuple and if the NetworkDB is part of the cluster
428 428
 // propagates this event to the cluster.
429 429
 func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
430
-	value, err := nDB.GetEntry(tname, nid, key)
431
-	if err != nil {
432
-		return fmt.Errorf("cannot delete entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
430
+	nDB.Lock()
431
+	oldEntry, err := nDB.getEntry(tname, nid, key)
432
+	if err != nil || oldEntry == nil || oldEntry.deleting {
433
+		nDB.Unlock()
434
+		return fmt.Errorf("cannot delete entry %s with network id %s and key %s "+
435
+			"does not exist or is already being deleted", tname, nid, key)
433 436
 	}
434 437
 
435 438
 	entry := &entry{
436 439
 		ltime:    nDB.tableClock.Increment(),
437 440
 		node:     nDB.config.NodeID,
438
-		value:    value,
441
+		value:    oldEntry.value,
439 442
 		deleting: true,
440 443
 		reapTime: nDB.config.reapEntryInterval,
441 444
 	}
442 445
 
446
+	nDB.createOrUpdateEntry(nid, tname, key, entry)
447
+	nDB.Unlock()
448
+
443 449
 	if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
444 450
 		return fmt.Errorf("cannot send table delete event: %v", err)
445 451
 	}
446 452
 
447
-	nDB.Lock()
448
-	nDB.createOrUpdateEntry(nid, tname, key, entry)
449
-	nDB.Unlock()
450
-
451 453
 	return nil
452 454
 }
453 455
 
... ...
@@ -735,3 +735,64 @@ func TestNodeReincarnation(t *testing.T) {
735 735
 
736 736
 	closeNetworkDBInstances(dbs)
737 737
 }
738
+
739
+func TestParallelCreate(t *testing.T) {
740
+	dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
741
+
742
+	startCh := make(chan int)
743
+	doneCh := make(chan error)
744
+	var success int32
745
+	for i := 0; i < 20; i++ {
746
+		go func() {
747
+			<-startCh
748
+			err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
749
+			if err == nil {
750
+				atomic.AddInt32(&success, 1)
751
+			}
752
+			doneCh <- err
753
+		}()
754
+	}
755
+
756
+	close(startCh)
757
+
758
+	for i := 0; i < 20; i++ {
759
+		<-doneCh
760
+	}
761
+	close(doneCh)
762
+	// Only 1 write should have succeeded
763
+	assert.Equal(t, int32(1), success)
764
+
765
+	closeNetworkDBInstances(dbs)
766
+}
767
+
768
+func TestParallelDelete(t *testing.T) {
769
+	dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
770
+
771
+	err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
772
+	assert.NoError(t, err)
773
+
774
+	startCh := make(chan int)
775
+	doneCh := make(chan error)
776
+	var success int32
777
+	for i := 0; i < 20; i++ {
778
+		go func() {
779
+			<-startCh
780
+			err := dbs[0].DeleteEntry("testTable", "testNetwork", "key")
781
+			if err == nil {
782
+				atomic.AddInt32(&success, 1)
783
+			}
784
+			doneCh <- err
785
+		}()
786
+	}
787
+
788
+	close(startCh)
789
+
790
+	for i := 0; i < 20; i++ {
791
+		<-doneCh
792
+	}
793
+	close(doneCh)
794
+	// Only 1 write should have succeeded
795
+	assert.Equal(t, int32(1), success)
796
+
797
+	closeNetworkDBInstances(dbs)
798
+}