Skip to content

Commit

Permalink
make nextTransactionID an atomic variable
Browse files Browse the repository at this point in the history
This commit has 2 main benefits:
1) It makes it impossible to access nextTransactionID un-atomically
2) It fixes a small bug where we would have racy (albeit atomic)
   accesses to nextTransactionID. Consider the following interleaving:
   Dispactcher Call #1 and #2:
        Read nextTransactioID

   Dispactcher Call #1 and #2:
        Bump nextTransactionId *locally* and then write it back. The
        same value is written back twice.

   Dispactcher Call #1 and #2:
        Send a message with the newly minted transaction ID, x. Note,
        *two* messages are sent with x! So two responses will come back.

   First response arrives:
        Entry is deleted from dispatcher's waited hash map.

   Second response arrives:
        Received message with id x, but no record of it, because the
        entry was deleted when the first message arrived.

   The solution is just to use an atomic read-modify-write operation in
   the form of .Add(1)
  • Loading branch information
fprasx committed Aug 21, 2023
1 parent 890af59 commit bceab88
Showing 1 changed file with 3 additions and 4 deletions.
7 changes: 3 additions & 4 deletions pkg/agent/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Dispatcher struct {

// nextTransactionID is the current transaction id. When we need a new one
// we simply bump it and take the new number.
nextTransactionID uint64
nextTransactionID *atomic.Uint64

logger *zap.Logger

Expand Down Expand Up @@ -96,7 +96,7 @@ func NewDispatcher(logger *zap.Logger, addr string, parent *InformantServer) (di
disp = Dispatcher{
conn: c,
waiters: make(map[uint64]util.SignalSender[*MonitorResult]),
nextTransactionID: 0,
nextTransactionID: &atomic.Uint64{},
logger: logger.Named("dispatcher"),
protoVersion: version.Version,
server: parent,
Expand All @@ -123,8 +123,7 @@ func (disp *Dispatcher) send(ctx context.Context, id uint64, message any) error
// on the provided SignalSender. The value passed into message must be a valid value
// to send to the monitor. See the docs for SerializeInformantMessage.
func (disp *Dispatcher) Call(ctx context.Context, sender util.SignalSender[*MonitorResult], message any) error {
id := atomic.LoadUint64(&disp.nextTransactionID)
atomic.AddUint64(&disp.nextTransactionID, 1)
id := disp.nextTransactionID.Add(1)
err := disp.send(ctx, id, message)
if err != nil {
disp.logger.Error("failed to send message", zap.Any("message", message), zap.Error(err))
Expand Down

0 comments on commit bceab88

Please sign in to comment.