Skip to content

Commit

Permalink
Limit the number of reconfig reset requests enqueued
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Apr 3, 2024
1 parent f0386f2 commit ff4675b
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 1 deletion.
28 changes: 27 additions & 1 deletion association.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ const (
// irrespective of the receive buffer size
// see Association.getMaxTSNOffset
maxTSNOffset = 40000
// maxReconfigRequests is the maximum number of reconfig requests we will keep outstanding
maxReconfigRequests = 1000
)

func getAssociationStateString(a uint32) string {
Expand Down Expand Up @@ -2138,15 +2140,39 @@ func (a *Association) handleReconfigParam(raw param) (*packet, error) {
switch p := raw.(type) {
case *paramOutgoingResetRequest:
a.log.Tracef("[%s] handleReconfigParam (OutgoingResetRequest)", a.name)
if a.peerLastTSN < p.senderLastTSN && len(a.reconfigRequests) >= maxReconfigRequests {
// We have too many reconfig requests outstanding. Drop the request and let
// the peer retransmit. A well behaved peer should only have 1 outstanding
// reconfig request.
//
// RFC 6525: https://www.rfc-editor.org/rfc/rfc6525.html#section-5.1.1
// At any given time, there MUST NOT be more than one request in flight.
// So, if the Re-configuration Timer is running and the RE-CONFIG chunk
// contains at least one request parameter, the chunk MUST be buffered.
// chrome: https://chromium.googlesource.com/external/webrtc/+/refs/heads/main/net/dcsctp/socket/stream_reset_handler.cc#271
return nil, fmt.Errorf("too many outstanding reconfig requests: %d", len(a.reconfigRequests))
}
a.reconfigRequests[p.reconfigRequestSequenceNumber] = p
resp := a.resetStreamsIfAny(p)
if resp != nil {
return resp, nil
}
return nil, nil //nolint:nilnil

case *paramReconfigResponse:
a.log.Tracef("[%s] handleReconfigParam (ReconfigResponse)", a.name)
if p.result == reconfigResultInProgress {
// RFC 6525: https://www.rfc-editor.org/rfc/rfc6525.html#section-5.2.7
//
// If the Result field indicates "In progress", the timer for the
// Re-configuration Request Sequence Number is started again. If
// the timer runs out, the RE-CONFIG chunk MUST be retransmitted
// but the corresponding error counters MUST NOT be incremented.
if len(a.reconfigs) == 0 {
a.tReconfig.stop()
a.tReconfig.start(a.rtoMgr.getRTO())
}
return nil, nil
}
delete(a.reconfigs, p.reconfigResponseSequenceNumber)
if len(a.reconfigs) == 0 {
a.tReconfig.stop()
Expand Down
75 changes: 75 additions & 0 deletions association_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3393,3 +3393,78 @@ func TestDataChunkBundlingIntoPacket(t *testing.T) {
}
}
}

func TestAssociation_ReconfigRequestsLimited(t *testing.T) {
checkGoroutineLeaks(t)

lim := test.TimeOut(time.Second * 10)
defer lim.Stop()

a1chan, a2chan := make(chan *Association), make(chan *Association)

udp1, udp2 := createUDPConnPair()

go func() {
a1, err := Client(Config{
NetConn: udp1,
LoggerFactory: logging.NewDefaultLoggerFactory(),
})
assert.NoError(t, err)
a1chan <- a1
}()

go func() {
a2, err := Server(Config{
NetConn: udp2,
LoggerFactory: logging.NewDefaultLoggerFactory(),
})
assert.NoError(t, err)
a2chan <- a2
}()

a1, a2 := <-a1chan, <-a2chan
defer a1.Close()
defer a2.Close()

writeStream, err := a1.OpenStream(1, PayloadTypeWebRTCString)
require.NoError(t, err)

readStream, err := a2.OpenStream(1, PayloadTypeWebRTCString)
require.NoError(t, err)

// exchange some data
testData := []byte("test")
_, err = writeStream.Write(testData)
require.NoError(t, err)

buf := make([]byte, len(testData))
_, err = readStream.Read(buf)
assert.NoError(t, err)
assert.Equal(t, testData, buf)

a1.lock.RLock()
tsn := a1.myNextTSN
a1.lock.RUnlock()
for i := 0; i < maxReconfigRequests+100; i++ {
c := &chunkReconfig{
paramA: &paramOutgoingResetRequest{
reconfigRequestSequenceNumber: 10 + uint32(i),
senderLastTSN: tsn + 10, // has to be enqueued
streamIdentifiers: []uint16{uint16(i)},
},
}
p := a1.createPacket([]chunk{c})
buf, err := p.marshal(true)
require.NoError(t, err)
_, err = a1.netConn.Write(buf)
require.NoError(t, err)
if i%100 == 0 {
time.Sleep(100 * time.Millisecond)
}
}
// Let a2 process the requests
time.Sleep(2 * time.Second)
a2.lock.RLock()
require.LessOrEqual(t, len(a2.reconfigRequests), maxReconfigRequests)
a2.lock.RUnlock()
}

0 comments on commit ff4675b

Please sign in to comment.