Skip to content

Commit

Permalink
Configurable RTOMax - fix #181
Browse files Browse the repository at this point in the history
This change adds RTOMax to Config letting the user cap the
retransmission timer without breaking compatibility.
If RTOMax is 0 the current value of 60 seconds is used.
  • Loading branch information
daonb committed Feb 27, 2024
1 parent 76ae7f1 commit 1520f1c
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 34 deletions.
14 changes: 8 additions & 6 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ type Config struct {
MaxMessageSize uint32
EnableZeroChecksum bool
LoggerFactory logging.LoggerFactory
// RTOMax is the maximum retransmission timeout in milliseconds
RTOMax float64
}

// Server accepts a SCTP stream over a conn
Expand Down Expand Up @@ -312,7 +314,7 @@ func createAssociation(config Config) *Association {
myNextRSN: tsn,
minTSN2MeasureRTT: tsn,
state: closed,
rtoMgr: newRTOManager(),
rtoMgr: newRTOManager(config.RTOMax),
streams: map[uint16]*Stream{},
reconfigs: map[uint32]*chunkReconfig{},
reconfigRequests: map[uint32]*paramOutgoingResetRequest{},
Expand Down Expand Up @@ -340,11 +342,11 @@ func createAssociation(config Config) *Association {
a.name, a.CWND(), a.ssthresh, a.inflightQueue.getNumBytes())

a.srtt.Store(float64(0))
a.t1Init = newRTXTimer(timerT1Init, a, maxInitRetrans)
a.t1Cookie = newRTXTimer(timerT1Cookie, a, maxInitRetrans)
a.t2Shutdown = newRTXTimer(timerT2Shutdown, a, noMaxRetrans) // retransmit forever
a.t3RTX = newRTXTimer(timerT3RTX, a, noMaxRetrans) // retransmit forever
a.tReconfig = newRTXTimer(timerReconfig, a, noMaxRetrans) // retransmit forever
a.t1Init = newRTXTimer(timerT1Init, a, maxInitRetrans, config.RTOMax)
a.t1Cookie = newRTXTimer(timerT1Cookie, a, maxInitRetrans, config.RTOMax)
a.t2Shutdown = newRTXTimer(timerT2Shutdown, a, noMaxRetrans, config.RTOMax)
a.t3RTX = newRTXTimer(timerT3RTX, a, noMaxRetrans, config.RTOMax)
a.tReconfig = newRTXTimer(timerReconfig, a, noMaxRetrans, config.RTOMax)
a.ackTimer = newAckTimer(a)

return a
Expand Down
33 changes: 24 additions & 9 deletions rtx_timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
const (
rtoInitial float64 = 1.0 * 1000 // msec
rtoMin float64 = 1.0 * 1000 // msec
rtoMax float64 = 60.0 * 1000 // msec
defaultRTOMax float64 = 60.0 * 1000 // msec
rtoAlpha float64 = 0.125
rtoBeta float64 = 0.25
maxInitRetrans uint = 8
Expand All @@ -28,13 +28,20 @@ type rtoManager struct {
rto float64
noUpdate bool
mutex sync.RWMutex
rtoMax float64
}

// newRTOManager creates a new rtoManager.
func newRTOManager() *rtoManager {
return &rtoManager{
rto: rtoInitial,
func newRTOManager(rtoMax float64) *rtoManager {
mgr := rtoManager{
rto: rtoInitial,
rtoMax: rtoMax,
}
if mgr.rtoMax == 0 {
mgr.rtoMax = defaultRTOMax
}
return &mgr

Check failure on line 44 in rtx_timer.go

View workflow job for this annotation

GitHub Actions / lint / Go

File is not `gofumpt`-ed (gofumpt)
}

// setNewRTT takes a newly measured RTT then adjust the RTO in msec.
Expand All @@ -55,7 +62,7 @@ func (m *rtoManager) setNewRTT(rtt float64) float64 {
m.rttvar = (1-rtoBeta)*m.rttvar + rtoBeta*(math.Abs(m.srtt-rtt))
m.srtt = (1-rtoAlpha)*m.srtt + rtoAlpha*rtt
}
m.rto = math.Min(math.Max(m.srtt+4*m.rttvar, rtoMin), rtoMax)
m.rto = math.Min(math.Max(m.srtt+4*m.rttvar, rtoMin), m.rtoMax)
return m.srtt
}

Expand Down Expand Up @@ -106,19 +113,27 @@ type rtxTimer struct {
stopFunc stopTimerLoop
closed bool
mutex sync.RWMutex
rtoMax float64
}

type stopTimerLoop func()

// newRTXTimer creates a new retransmission timer.
// if maxRetrans is set to 0, it will keep retransmitting until stop() is called.
// (it will never make onRetransmissionFailure() callback.
func newRTXTimer(id int, observer rtxTimerObserver, maxRetrans uint) *rtxTimer {
return &rtxTimer{
func newRTXTimer(id int, observer rtxTimerObserver, maxRetrans uint,
rtoMax float64) *rtxTimer {

Check failure on line 125 in rtx_timer.go

View workflow job for this annotation

GitHub Actions / lint / Go

File is not `gofumpt`-ed (gofumpt)

timer := rtxTimer{
id: id,
observer: observer,
maxRetrans: maxRetrans,
rtoMax: rtoMax,
}
if timer.rtoMax == 0 {
timer.rtoMax = defaultRTOMax
}
return &timer
}

// start starts the timer.
Expand Down Expand Up @@ -148,7 +163,7 @@ func (t *rtxTimer) start(rto float64) bool {
canceling := false

for !canceling {
timeout := calculateNextTimeout(rto, nRtos)
timeout := calculateNextTimeout(rto, nRtos, t.rtoMax)
timer := time.NewTimer(time.Duration(timeout) * time.Millisecond)

select {
Expand Down Expand Up @@ -208,7 +223,7 @@ func (t *rtxTimer) isRunning() bool {
return (t.stopFunc != nil)
}

func calculateNextTimeout(rto float64, nRtos uint) float64 {
func calculateNextTimeout(rto float64, nRtos uint, rtoMax float64) float64 {
// RFC 4096 sec 6.3.3. Handle T3-rtx Expiration
// E2) For the destination address for which the timer expires, set RTO
// <- RTO * 2 ("back off the timer"). The maximum value discussed
Expand Down
49 changes: 30 additions & 19 deletions rtx_timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

func TestRTOManager(t *testing.T) {
t.Run("initial values", func(t *testing.T) {
m := newRTOManager()
m := newRTOManager(0)
assert.Equal(t, rtoInitial, m.rto, "should be rtoInitial")
assert.Equal(t, rtoInitial, m.getRTO(), "should be rtoInitial")
assert.Equal(t, float64(0), m.srtt, "should be 0")
Expand All @@ -23,7 +23,7 @@ func TestRTOManager(t *testing.T) {

t.Run("RTO calculation (small RTT)", func(t *testing.T) {
var rto float64
m := newRTOManager()
m := newRTOManager(0)
exp := []int32{
1800,
1500,
Expand All @@ -41,7 +41,7 @@ func TestRTOManager(t *testing.T) {

t.Run("RTO calculation (large RTT)", func(t *testing.T) {
var rto float64
m := newRTOManager()
m := newRTOManager(0)
exp := []int32{
60000, // capped at RTO.Max
60000, // capped at RTO.Max
Expand All @@ -59,22 +59,33 @@ func TestRTOManager(t *testing.T) {

t.Run("calculateNextTimeout", func(t *testing.T) {
var rto float64
rto = calculateNextTimeout(1.0, 0)
rto = calculateNextTimeout(1.0, 0, defaultRTOMax)
assert.Equal(t, float64(1), rto, "should match")
rto = calculateNextTimeout(1.0, 1)
rto = calculateNextTimeout(1.0, 1, defaultRTOMax)
assert.Equal(t, float64(2), rto, "should match")
rto = calculateNextTimeout(1.0, 2)
rto = calculateNextTimeout(1.0, 2, defaultRTOMax)
assert.Equal(t, float64(4), rto, "should match")
rto = calculateNextTimeout(1.0, 30)
rto = calculateNextTimeout(1.0, 30, defaultRTOMax)
assert.Equal(t, float64(60000), rto, "should match")
rto = calculateNextTimeout(1.0, 63)
rto = calculateNextTimeout(1.0, 63, defaultRTOMax)
assert.Equal(t, float64(60000), rto, "should match")
rto = calculateNextTimeout(1.0, 64)
rto = calculateNextTimeout(1.0, 64, defaultRTOMax)
assert.Equal(t, float64(60000), rto, "should match")
})
t.Run("calculateNextTimeout w/ RTOMax", func(t *testing.T) {
var rto float64
rto = calculateNextTimeout(1.0, 0, 2.0)
assert.Equal(t, 1.0, rto, "should match")
rto = calculateNextTimeout(1.5, 1, 2.0)
assert.Equal(t, 2.0, rto, "should match")
rto = calculateNextTimeout(1.0, 10, 2.0)
assert.Equal(t, 2.0, rto, "should match")
rto = calculateNextTimeout(1.0, 31, 1000.0)
assert.Equal(t, 1000.0, rto, "should match")
})

t.Run("reset", func(t *testing.T) {
m := newRTOManager()
m := newRTOManager(0)
for i := 0; i < 10; i++ {
m.setNewRTT(200)
}
Expand Down Expand Up @@ -118,7 +129,7 @@ func TestRtxTimer(t *testing.T) {
assert.Equal(t, timerID, id, "unexpted timer ID: %d", id)
},
onRtxFailure: func(id int) {},
}, pathMaxRetrans)
}, pathMaxRetrans, 0)

assert.False(t, rt.isRunning(), "should not be running")

Expand All @@ -144,7 +155,7 @@ func TestRtxTimer(t *testing.T) {
assert.Equal(t, timerID, id, "unexpted timer ID: %d", id)
},
onRtxFailure: func(id int) {},
}, pathMaxRetrans)
}, pathMaxRetrans, 0)

interval := float64(30.0)
ok := rt.start(interval)
Expand All @@ -171,7 +182,7 @@ func TestRtxTimer(t *testing.T) {
assert.Equal(t, timerID, id, "unexpted timer ID: %d", id)
},
onRtxFailure: func(id int) {},
}, pathMaxRetrans)
}, pathMaxRetrans, 0)

interval := float64(30.0)
ok := rt.start(interval)
Expand All @@ -194,7 +205,7 @@ func TestRtxTimer(t *testing.T) {
assert.Equal(t, timerID, id, "unexpted timer ID: %d", id)
},
onRtxFailure: func(id int) {},
}, pathMaxRetrans)
}, pathMaxRetrans, 0)

interval := float64(30.0)
ok := rt.start(interval)
Expand All @@ -221,7 +232,7 @@ func TestRtxTimer(t *testing.T) {
assert.Equal(t, timerID, id, "unexpted timer ID: %d", id)
},
onRtxFailure: func(id int) {},
}, pathMaxRetrans)
}, pathMaxRetrans, 0)

for i := 0; i < 1000; i++ {
ok := rt.start(30)
Expand Down Expand Up @@ -253,7 +264,7 @@ func TestRtxTimer(t *testing.T) {
t.Logf("onRtxFailure: elapsed=%.03f\n", elapsed)
doneCh <- true
},
}, pathMaxRetrans)
}, pathMaxRetrans, 0)

// RTO(msec) Total(msec)
// 10 10 1st RTO
Expand Down Expand Up @@ -297,7 +308,7 @@ func TestRtxTimer(t *testing.T) {
onRtxFailure: func(id int) {
assert.Fail(t, "timer should not fail")
},
}, 0)
}, 0, 0)

// RTO(msec) Total(msec)
// 10 10 1st RTO
Expand Down Expand Up @@ -332,7 +343,7 @@ func TestRtxTimer(t *testing.T) {
doneCh <- true
},
onRtxFailure: func(id int) {},
}, pathMaxRetrans)
}, pathMaxRetrans, 0)

for i := 0; i < 10; i++ {
rt.stop()
Expand All @@ -355,7 +366,7 @@ func TestRtxTimer(t *testing.T) {
rtoCount++
},
onRtxFailure: func(id int) {},
}, pathMaxRetrans)
}, pathMaxRetrans, 0)

ok := rt.start(20)
assert.True(t, ok, "should be accepted")
Expand Down

0 comments on commit 1520f1c

Please sign in to comment.