Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various cleanup and fixes #302

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 137 additions & 63 deletions association.go

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions association_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,32 @@ import (
)

type associationStats struct {
nPackets uint64
nPacketsSent uint64
nDATAs uint64
nSACKs uint64
nSACKsSent uint64
nT3Timeouts uint64
nAckTimeouts uint64
nFastRetrans uint64
}

func (s *associationStats) incPackets() {
atomic.AddUint64(&s.nPackets, 1)
}

func (s *associationStats) getNumPackets() uint64 {
return atomic.LoadUint64(&s.nPackets)
}

func (s *associationStats) incPacketsSent() {
atomic.AddUint64(&s.nPacketsSent, 1)
}

func (s *associationStats) getNumPacketsSent() uint64 {
return atomic.LoadUint64(&s.nPacketsSent)
}

func (s *associationStats) incDATAs() {
atomic.AddUint64(&s.nDATAs, 1)
}
Expand All @@ -31,6 +50,14 @@ func (s *associationStats) getNumSACKs() uint64 {
return atomic.LoadUint64(&s.nSACKs)
}

func (s *associationStats) incSACKsSent() {
atomic.AddUint64(&s.nSACKsSent, 1)
}

func (s *associationStats) getNumSACKsSent() uint64 {
return atomic.LoadUint64(&s.nSACKsSent)
}

func (s *associationStats) incT3Timeouts() {
atomic.AddUint64(&s.nT3Timeouts, 1)
}
Expand All @@ -56,8 +83,11 @@ func (s *associationStats) getNumFastRetrans() uint64 {
}

func (s *associationStats) reset() {
atomic.StoreUint64(&s.nPackets, 0)
atomic.StoreUint64(&s.nPacketsSent, 0)
atomic.StoreUint64(&s.nDATAs, 0)
atomic.StoreUint64(&s.nSACKs, 0)
atomic.StoreUint64(&s.nSACKsSent, 0)
atomic.StoreUint64(&s.nT3Timeouts, 0)
atomic.StoreUint64(&s.nAckTimeouts, 0)
atomic.StoreUint64(&s.nFastRetrans, 0)
Expand Down
9 changes: 7 additions & 2 deletions association_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,19 @@ func createNewAssociationPair(br *test.Bridge, ackMode int, recvBufSize uint32)

go func() {
a0, err0 = Client(Config{
Name: "a0",
NetConn: br.GetConn0(),
MaxReceiveBufferSize: recvBufSize,
LoggerFactory: loggerFactory,
})
handshake0Ch <- true
}()
go func() {
a1, err1 = Client(Config{
// we could have two "client"s here but it's more
// standard to have one peer starting initialization and
// another waiting for the initialization to be requested (INIT).
a1, err1 = Server(Config{
Name: "a1",
NetConn: br.GetConn1(),
MaxReceiveBufferSize: recvBufSize,
LoggerFactory: loggerFactory,
Expand Down Expand Up @@ -1752,7 +1757,7 @@ func TestAssocT3RtxTimer(t *testing.T) {
}

func TestAssocCongestionControl(t *testing.T) {
// sbuf - large enobh not to be bundled
// sbuf - large enough not to be bundled
sbuf := make([]byte, 1000)
for i := 0; i < len(sbuf); i++ {
sbuf[i] = byte(i & 0xcc)
Expand Down
4 changes: 4 additions & 0 deletions chunk_payload_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,7 @@ func (p *chunkPayloadData) setAllInflight() {
}
}
}

func (p *chunkPayloadData) isFragmented() bool {
return !(p.head == nil && p.beginningFragment && p.endingFragment)
}
16 changes: 15 additions & 1 deletion control_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@

package sctp

import (
"sync"
)

// control queue

type controlQueue struct {
mu sync.RWMutex
queue []*packet
}

Expand All @@ -14,19 +19,28 @@ func newControlQueue() *controlQueue {
}

func (q *controlQueue) push(c *packet) {
q.mu.Lock()
q.queue = append(q.queue, c)
q.mu.Unlock()
}

func (q *controlQueue) pushAll(packets []*packet) {
q.mu.Lock()
q.queue = append(q.queue, packets...)
q.mu.Unlock()
}

func (q *controlQueue) popAll() []*packet {
q.mu.Lock()
packets := q.queue
q.queue = []*packet{}
q.mu.Unlock()
return packets
}

func (q *controlQueue) size() int {
return len(q.queue)
q.mu.RLock()
size := len(q.queue)
q.mu.RUnlock()
return size
}
8 changes: 4 additions & 4 deletions packet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ func TestPacketUnmarshal(t *testing.T) {
switch {
case err != nil:
t.Errorf("Unmarshal failed for SCTP packet with no chunks: %v", err)
case pkt.sourcePort != 5000:
t.Errorf("Unmarshal passed for SCTP packet, but got incorrect source port exp: %d act: %d", 5000, pkt.sourcePort)
case pkt.destinationPort != 5000:
t.Errorf("Unmarshal passed for SCTP packet, but got incorrect destination port exp: %d act: %d", 5000, pkt.destinationPort)
case pkt.sourcePort != defaultSCTPSrcDstPort:
t.Errorf("Unmarshal passed for SCTP packet, but got incorrect source port exp: %d act: %d", defaultSCTPSrcDstPort, pkt.sourcePort)
case pkt.destinationPort != defaultSCTPSrcDstPort:
t.Errorf("Unmarshal passed for SCTP packet, but got incorrect destination port exp: %d act: %d", defaultSCTPSrcDstPort, pkt.destinationPort)
case pkt.verificationTag != 0:
t.Errorf("Unmarshal passed for SCTP packet, but got incorrect verification tag exp: %d act: %d", 0, pkt.verificationTag)
}
Expand Down
49 changes: 46 additions & 3 deletions payload_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import (
"fmt"
"sort"
"sync"
)

type payloadQueue struct {
mu sync.RWMutex
chunkMap map[uint32]*chunkPayloadData
sorted []uint32
dupTSN []uint32
Expand All @@ -19,7 +21,15 @@
return &payloadQueue{chunkMap: map[uint32]*chunkPayloadData{}}
}

//nolint:unused
func (q *payloadQueue) updateSortedKeys() {
q.mu.Lock()
defer q.mu.Unlock()

q.updateSortedKeysWithLock()

Check warning on line 29 in payload_queue.go

View check run for this annotation

Codecov / codecov/patch

payload_queue.go#L26-L29

Added lines #L26 - L29 were not covered by tests
}

func (q *payloadQueue) updateSortedKeysWithLock() {
if q.sorted != nil {
return
}
Expand All @@ -37,6 +47,9 @@
}

func (q *payloadQueue) canPush(p *chunkPayloadData, cumulativeTSN uint32) bool {
q.mu.RLock()
defer q.mu.RUnlock()

_, ok := q.chunkMap[p.tsn]
if ok || sna32LTE(p.tsn, cumulativeTSN) {
return false
Expand All @@ -45,6 +58,9 @@
}

func (q *payloadQueue) pushNoCheck(p *chunkPayloadData) {
q.mu.Lock()
defer q.mu.Unlock()

q.chunkMap[p.tsn] = p
q.nBytes += len(p.userData)
q.sorted = nil
Expand All @@ -54,6 +70,9 @@
// older than our cumulativeTSN marker, it will be recored as duplications,
// which can later be retrieved using popDuplicates.
func (q *payloadQueue) push(p *chunkPayloadData, cumulativeTSN uint32) bool {
q.mu.Lock()
defer q.mu.Unlock()

_, ok := q.chunkMap[p.tsn]
if ok || sna32LTE(p.tsn, cumulativeTSN) {
// Found the packet, log in dups
Expand All @@ -69,7 +88,10 @@

// pop pops only if the oldest chunk's TSN matches the given TSN.
func (q *payloadQueue) pop(tsn uint32) (*chunkPayloadData, bool) {
q.updateSortedKeys()
q.mu.Lock()
defer q.mu.Unlock()

q.updateSortedKeysWithLock()

if len(q.chunkMap) > 0 && tsn == q.sorted[0] {
q.sorted = q.sorted[1:]
Expand All @@ -85,25 +107,34 @@

// get returns reference to chunkPayloadData with the given TSN value.
func (q *payloadQueue) get(tsn uint32) (*chunkPayloadData, bool) {
q.mu.RLock()
defer q.mu.RUnlock()

c, ok := q.chunkMap[tsn]
return c, ok
}

// popDuplicates returns an array of TSN values that were found duplicate.
func (q *payloadQueue) popDuplicates() []uint32 {
q.mu.Lock()
defer q.mu.Unlock()

dups := q.dupTSN
q.dupTSN = []uint32{}
return dups
}

func (q *payloadQueue) getGapAckBlocks(cumulativeTSN uint32) (gapAckBlocks []gapAckBlock) {
q.mu.Lock()
defer q.mu.Unlock()

var b gapAckBlock

if len(q.chunkMap) == 0 {
return []gapAckBlock{}
}

q.updateSortedKeys()
q.updateSortedKeysWithLock()

for i, tsn := range q.sorted {
if i == 0 {
Expand Down Expand Up @@ -155,7 +186,10 @@
}

func (q *payloadQueue) getLastTSNReceived() (uint32, bool) {
q.updateSortedKeys()
q.mu.Lock()
defer q.mu.Unlock()

q.updateSortedKeysWithLock()

qlen := len(q.sorted)
if qlen == 0 {
Expand All @@ -165,6 +199,9 @@
}

func (q *payloadQueue) markAllToRetrasmit() {
q.mu.Lock()
defer q.mu.Unlock()

for _, c := range q.chunkMap {
if c.acked || c.abandoned() {
continue
Expand All @@ -174,9 +211,15 @@
}

func (q *payloadQueue) getNumBytes() int {
q.mu.RLock()
defer q.mu.RUnlock()

return q.nBytes
}

func (q *payloadQueue) size() int {
q.mu.RLock()
defer q.mu.RUnlock()

return len(q.chunkMap)
}
19 changes: 15 additions & 4 deletions pending_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ package sctp

import (
"errors"
"sync"
"sync/atomic"
)

// pendingBaseQueue

type pendingBaseQueue struct {
mu sync.RWMutex
queue []*chunkPayloadData
}

Expand All @@ -18,10 +21,14 @@ func newPendingBaseQueue() *pendingBaseQueue {
}

func (q *pendingBaseQueue) push(c *chunkPayloadData) {
q.mu.Lock()
q.queue = append(q.queue, c)
q.mu.Unlock()
}

func (q *pendingBaseQueue) pop() *chunkPayloadData {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.queue) == 0 {
return nil
}
Expand All @@ -31,13 +38,17 @@ func (q *pendingBaseQueue) pop() *chunkPayloadData {
}

func (q *pendingBaseQueue) get(i int) *chunkPayloadData {
q.mu.RLock()
defer q.mu.RUnlock()
if len(q.queue) == 0 || i < 0 || i >= len(q.queue) {
return nil
}
return q.queue[i]
}

func (q *pendingBaseQueue) size() int {
q.mu.RLock()
defer q.mu.RUnlock()
return len(q.queue)
}

Expand All @@ -46,7 +57,7 @@ func (q *pendingBaseQueue) size() int {
type pendingQueue struct {
unorderedQueue *pendingBaseQueue
orderedQueue *pendingBaseQueue
nBytes int
nBytes uint64
selected bool
unorderedIsSelected bool
}
Expand All @@ -71,7 +82,7 @@ func (q *pendingQueue) push(c *chunkPayloadData) {
} else {
q.orderedQueue.push(c)
}
q.nBytes += len(c.userData)
atomic.AddUint64(&q.nBytes, uint64(len(c.userData)))
}

func (q *pendingQueue) peek() *chunkPayloadData {
Expand Down Expand Up @@ -129,12 +140,12 @@ func (q *pendingQueue) pop(c *chunkPayloadData) error {
}
}
}
q.nBytes -= len(c.userData)
atomic.AddUint64(&q.nBytes, -uint64(len(c.userData)))
return nil
}

func (q *pendingQueue) getNumBytes() int {
return q.nBytes
return int(atomic.LoadUint64(&q.nBytes))
}

func (q *pendingQueue) size() int {
Expand Down
Loading
Loading