Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increment missing indications from SACK #353

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -1755,7 +1755,7 @@
}

// The caller should hold the lock.
func (a *Association) processFastRetransmission(cumTSNAckPoint, htna uint32, cumTSNAckPointAdvanced bool) error {
func (a *Association) processFastRetransmission(cumTSNAckPoint uint32, gapAckBlocks []gapAckBlock, htna uint32, cumTSNAckPointAdvanced bool) error {
// HTNA algorithm - RFC 4960 Sec 7.2.4
// Increment missIndicator of each chunks that the SACK reported missing
// when either of the following is met:
Expand All @@ -1772,7 +1772,10 @@
maxTSN = htna
} else {
// b) increment for all TSNs reported missing
maxTSN = cumTSNAckPoint + uint32(a.inflightQueue.size()) + 1
maxTSN = cumTSNAckPoint
if len(gapAckBlocks) > 0 {
maxTSN += uint32(gapAckBlocks[len(gapAckBlocks)-1].end)
}

Check warning on line 1778 in association.go

View check run for this annotation

Codecov / codecov/patch

association.go#L1775-L1778

Added lines #L1775 - L1778 were not covered by tests
}

for tsn := cumTSNAckPoint + 1; sna32LT(tsn, maxTSN); tsn++ {
Expand Down Expand Up @@ -1882,7 +1885,7 @@
a.setRWND(d.advertisedReceiverWindowCredit - bytesOutstanding)
}

err = a.processFastRetransmission(d.cumulativeTSNAck, htna, cumTSNAckPointAdvanced)
err = a.processFastRetransmission(d.cumulativeTSNAck, d.gapAckBlocks, htna, cumTSNAckPointAdvanced)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't know the protocol, but just reading this, can this just pass in the end of the last entry? Any reason for passing in the slice?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guess, you will have to do the check for length here rather than inside the function. Maybe, six of one and half-a-dozen of other case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No difference but I prefer to put the code and comment together to understand all TSNs reported missing from the SACK (gapAckBlocks)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @cnderrauber

if err != nil {
return err
}
Expand Down
82 changes: 56 additions & 26 deletions association_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2921,49 +2921,79 @@ func TestAssociationFastRtxWnd(t *testing.T) {
}
}

// intercept SACK
var lastSACK atomic.Pointer[chunkSelectiveAck]
dbConn2.remoteInboundHandler = func(buf []byte) {
p := &packet{}
require.NoError(t, p.unmarshal(true, buf))
for _, c := range p.chunks {
if ack, aok := c.(*chunkSelectiveAck); aok {
lastSACK.Store(ack)
}
}
dbConn1.inboundHandler(buf)
}

_, err = s1.WriteSCTP([]byte("hello"), PayloadTypeWebRTCBinary)
require.NoError(t, err)
require.Eventually(t, func() bool { return lastSACK.Load() != nil }, 1*time.Second, 10*time.Millisecond)

shouldDrop.Store(true)
// send packets and dropped
buf := make([]byte, 1000)
for i := 0; i < 10; i++ {
buf := make([]byte, 700)
for i := 0; i < 20; i++ {
_, err = s1.WriteSCTP(buf, PayloadTypeWebRTCBinary)
require.NoError(t, err)
}

require.Eventually(t, func() bool { return dropCounter.Load() >= 10 }, 5*time.Second, 10*time.Millisecond, "drop %d", dropCounter.Load())
// send packets to trigger fast retransmit
shouldDrop.Store(false)
require.Eventually(t, func() bool { return dropCounter.Load() >= 15 }, 5*time.Second, 10*time.Millisecond, "drop %d", dropCounter.Load())

require.Zero(t, a1.stats.getNumFastRetrans())
require.False(t, a1.inFastRecovery)

// wait SACK
sackCh := make(chan []byte, 1)
dbConn2.remoteInboundHandler = func(buf []byte) {
p := &packet{}
require.NoError(t, p.unmarshal(true, buf))
for _, c := range p.chunks {
if _, ok := c.(*chunkSelectiveAck); ok {
select {
case sackCh <- buf:
default:
}
return
}
}
}
// wait sack to trigger fast retransmit
for i := 0; i < 3; i++ {
_, err = s1.WriteSCTP(buf, PayloadTypeWebRTCBinary)
require.NoError(t, err)
dbConn1.inboundHandler(<-sackCh)
// sack to trigger fast retransmit
ack := *(lastSACK.Load())
ack.gapAckBlocks = []gapAckBlock{{start: 11}}
for i := 11; i < 14; i++ {
ack.gapAckBlocks[0].end = uint16(i)
pkt := a1.createPacket([]chunk{&ack})
pktBuf, err1 := pkt.marshal(true)
require.NoError(t, err1)
dbConn1.inboundHandler(pktBuf)
}
// fast retransmit and new sack sent

require.Eventually(t, func() bool {
a1.lock.RLock()
defer a1.lock.RUnlock()
return a1.inFastRecovery
}, 5*time.Second, 10*time.Millisecond)
require.GreaterOrEqual(t, uint64(10), a1.stats.getNumFastRetrans())

// 7.2.4 b) In fast-recovery AND the Cumulative TSN Ack Point advanced
// the miss indications are incremented for all TSNs reported missing
// in the SACK.
a1.lock.Lock()
lastTSN := a1.inflightQueue.chunks.Back().tsn
lastTSNMinusTwo := lastTSN - 2
lastChunk := a1.inflightQueue.chunks.Back()
lastChunkMinusTwo, ok := a1.inflightQueue.get(lastTSNMinusTwo)
a1.lock.Unlock()
require.True(t, ok)
require.True(t, lastTSN > ack.cumulativeTSNAck+uint32(ack.gapAckBlocks[0].end)+3)

// sack with cumAckPoint advanced, lastTSN should not be marked as missing
ack.cumulativeTSNAck++
end := lastTSN - 1 - ack.cumulativeTSNAck
ack.gapAckBlocks = append(ack.gapAckBlocks, gapAckBlock{start: uint16(end), end: uint16(end)})
pkt := a1.createPacket([]chunk{&ack})
pktBuf, err := pkt.marshal(true)
require.NoError(t, err)
dbConn1.inboundHandler(pktBuf)
require.Eventually(t, func() bool {
a1.lock.Lock()
defer a1.lock.Unlock()
return lastChunkMinusTwo.missIndicator == 1 && lastChunk.missIndicator == 0
}, 5*time.Second, 10*time.Millisecond)
}

func TestAssociationMaxTSNOffset(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
return q.buf[q.head]
}

func (q *queue[T]) Back() T {
return q.buf[(q.tail-1+len(q.buf))%len(q.buf)]

Check warning on line 51 in queue.go

View check run for this annotation

Codecov / codecov/patch

queue.go#L50-L51

Added lines #L50 - L51 were not covered by tests
}

func (q *queue[T]) At(i int) T {
return q.buf[(q.head+i)%(len(q.buf))]
}
Expand Down
Loading