Skip to content

Commit

Permalink
Various cleanup and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
edaniels committed Feb 24, 2024
1 parent 5359da5 commit dd83fe5
Show file tree
Hide file tree
Showing 12 changed files with 350 additions and 101 deletions.
195 changes: 132 additions & 63 deletions association.go

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions association_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,32 @@ import (
)

type associationStats struct {
nPackets uint64

Check failure on line 11 in association_stats.go

View workflow job for this annotation

GitHub Actions / lint / Go

field `nPackets` is unused (unused)
nPacketsSent uint64
nDATAs uint64
nSACKs uint64
nSACKsSent uint64

Check failure on line 15 in association_stats.go

View workflow job for this annotation

GitHub Actions / lint / Go

field `nSACKsSent` is unused (unused)
nT3Timeouts uint64
nAckTimeouts uint64
nFastRetrans uint64
}

func (s *associationStats) incPackets() {

Check failure on line 21 in association_stats.go

View workflow job for this annotation

GitHub Actions / lint / Go

func `(*associationStats).incPackets` is unused (unused)
atomic.AddUint64(&s.nPackets, 1)

Check warning on line 22 in association_stats.go

View check run for this annotation

Codecov / codecov/patch

association_stats.go#L21-L22

Added lines #L21 - L22 were not covered by tests
}

func (s *associationStats) getNumPackets() uint64 {

Check failure on line 25 in association_stats.go

View workflow job for this annotation

GitHub Actions / lint / Go

func `(*associationStats).getNumPackets` is unused (unused)
return atomic.LoadUint64(&s.nPackets)

Check warning on line 26 in association_stats.go

View check run for this annotation

Codecov / codecov/patch

association_stats.go#L25-L26

Added lines #L25 - L26 were not covered by tests
}

func (s *associationStats) incPacketsSent() {
atomic.AddUint64(&s.nPacketsSent, 1)
}

func (s *associationStats) getNumPacketsSent() uint64 {

Check failure on line 33 in association_stats.go

View workflow job for this annotation

GitHub Actions / lint / Go

func `(*associationStats).getNumPacketsSent` is unused (unused)
return atomic.LoadUint64(&s.nPacketsSent)

Check warning on line 34 in association_stats.go

View check run for this annotation

Codecov / codecov/patch

association_stats.go#L33-L34

Added lines #L33 - L34 were not covered by tests
}

func (s *associationStats) incDATAs() {
atomic.AddUint64(&s.nDATAs, 1)
}
Expand All @@ -31,6 +50,14 @@ func (s *associationStats) getNumSACKs() uint64 {
return atomic.LoadUint64(&s.nSACKs)
}

func (s *associationStats) incSACKsSent() {

Check failure on line 53 in association_stats.go

View workflow job for this annotation

GitHub Actions / lint / Go

func `(*associationStats).incSACKsSent` is unused (unused)
atomic.AddUint64(&s.nSACKsSent, 1)

Check warning on line 54 in association_stats.go

View check run for this annotation

Codecov / codecov/patch

association_stats.go#L53-L54

Added lines #L53 - L54 were not covered by tests
}

func (s *associationStats) getNumSACKsSent() uint64 {

Check failure on line 57 in association_stats.go

View workflow job for this annotation

GitHub Actions / lint / Go

func `(*associationStats).getNumSACKsSent` is unused (unused)
return atomic.LoadUint64(&s.nSACKsSent)

Check warning on line 58 in association_stats.go

View check run for this annotation

Codecov / codecov/patch

association_stats.go#L57-L58

Added lines #L57 - L58 were not covered by tests
}

func (s *associationStats) incT3Timeouts() {
atomic.AddUint64(&s.nT3Timeouts, 1)
}
Expand Down
9 changes: 7 additions & 2 deletions association_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,19 @@ func createNewAssociationPair(br *test.Bridge, ackMode int, recvBufSize uint32)

go func() {
a0, err0 = Client(Config{
Name: "a0",
NetConn: br.GetConn0(),
MaxReceiveBufferSize: recvBufSize,
LoggerFactory: loggerFactory,
})
handshake0Ch <- true
}()
go func() {
a1, err1 = Client(Config{
// we could have two "client"s here but it's more
// standard to have one peer starting initialization and
// another waiting for the initialization to be requested (INIT).
a1, err1 = Server(Config{
Name: "a1",
NetConn: br.GetConn1(),
MaxReceiveBufferSize: recvBufSize,
LoggerFactory: loggerFactory,
Expand Down Expand Up @@ -1752,7 +1757,7 @@ func TestAssocT3RtxTimer(t *testing.T) {
}

func TestAssocCongestionControl(t *testing.T) {
// sbuf - large enobh not to be bundled
// sbuf - large enough not to be bundled
sbuf := make([]byte, 1000)
for i := 0; i < len(sbuf); i++ {
sbuf[i] = byte(i & 0xcc)
Expand Down
4 changes: 4 additions & 0 deletions chunk_payload_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,7 @@ func (p *chunkPayloadData) setAllInflight() {
}
}
}

func (p *chunkPayloadData) isFragmented() bool {
return !(p.head == nil && p.beginningFragment && p.endingFragment)
}
16 changes: 15 additions & 1 deletion control_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@

package sctp

import (
"sync"
)

// control queue

type controlQueue struct {
mu sync.RWMutex
queue []*packet
}

Expand All @@ -14,19 +19,28 @@ func newControlQueue() *controlQueue {
}

func (q *controlQueue) push(c *packet) {
q.mu.Lock()
q.queue = append(q.queue, c)
q.mu.Unlock()
}

func (q *controlQueue) pushAll(packets []*packet) {
q.mu.Lock()
q.queue = append(q.queue, packets...)
q.mu.Unlock()
}

func (q *controlQueue) popAll() []*packet {
q.mu.Lock()
packets := q.queue
q.queue = []*packet{}
q.mu.Unlock()
return packets
}

func (q *controlQueue) size() int {
return len(q.queue)
q.mu.RLock()
size := len(q.queue)
q.mu.RUnlock()
return size
}
8 changes: 4 additions & 4 deletions packet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ func TestPacketUnmarshal(t *testing.T) {
switch {
case err != nil:
t.Errorf("Unmarshal failed for SCTP packet with no chunks: %v", err)
case pkt.sourcePort != 5000:
t.Errorf("Unmarshal passed for SCTP packet, but got incorrect source port exp: %d act: %d", 5000, pkt.sourcePort)
case pkt.destinationPort != 5000:
t.Errorf("Unmarshal passed for SCTP packet, but got incorrect destination port exp: %d act: %d", 5000, pkt.destinationPort)
case pkt.sourcePort != defaultSCTPSrcDstPort:
t.Errorf("Unmarshal passed for SCTP packet, but got incorrect source port exp: %d act: %d", defaultSCTPSrcDstPort, pkt.sourcePort)
case pkt.destinationPort != defaultSCTPSrcDstPort:
t.Errorf("Unmarshal passed for SCTP packet, but got incorrect destination port exp: %d act: %d", defaultSCTPSrcDstPort, pkt.destinationPort)
case pkt.verificationTag != 0:
t.Errorf("Unmarshal passed for SCTP packet, but got incorrect verification tag exp: %d act: %d", 0, pkt.verificationTag)
}
Expand Down
46 changes: 43 additions & 3 deletions payload_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ package sctp
import (
"fmt"
"sort"
"sync"
)

type payloadQueue struct {
mu sync.RWMutex
chunkMap map[uint32]*chunkPayloadData
sorted []uint32
dupTSN []uint32
Expand All @@ -20,6 +22,11 @@ func newPayloadQueue() *payloadQueue {
}

func (q *payloadQueue) updateSortedKeys() {

Check failure on line 24 in payload_queue.go

View workflow job for this annotation

GitHub Actions / lint / Go

func `(*payloadQueue).updateSortedKeys` is unused (unused)
q.mu.Lock()
defer q.mu.Unlock()

Check warning on line 26 in payload_queue.go

View check run for this annotation

Codecov / codecov/patch

payload_queue.go#L25-L26

Added lines #L25 - L26 were not covered by tests
}

func (q *payloadQueue) updateSortedKeysWithLock() {
if q.sorted != nil {
return
}
Expand All @@ -37,6 +44,9 @@ func (q *payloadQueue) updateSortedKeys() {
}

func (q *payloadQueue) canPush(p *chunkPayloadData, cumulativeTSN uint32) bool {
q.mu.RLock()
defer q.mu.RUnlock()

_, ok := q.chunkMap[p.tsn]
if ok || sna32LTE(p.tsn, cumulativeTSN) {
return false
Expand All @@ -45,6 +55,9 @@ func (q *payloadQueue) canPush(p *chunkPayloadData, cumulativeTSN uint32) bool {
}

func (q *payloadQueue) pushNoCheck(p *chunkPayloadData) {
q.mu.Lock()
defer q.mu.Unlock()

q.chunkMap[p.tsn] = p
q.nBytes += len(p.userData)
q.sorted = nil
Expand All @@ -54,6 +67,9 @@ func (q *payloadQueue) pushNoCheck(p *chunkPayloadData) {
// older than our cumulativeTSN marker, it will be recored as duplications,
// which can later be retrieved using popDuplicates.
func (q *payloadQueue) push(p *chunkPayloadData, cumulativeTSN uint32) bool {
q.mu.Lock()
defer q.mu.Unlock()

_, ok := q.chunkMap[p.tsn]
if ok || sna32LTE(p.tsn, cumulativeTSN) {
// Found the packet, log in dups
Expand All @@ -69,7 +85,10 @@ func (q *payloadQueue) push(p *chunkPayloadData, cumulativeTSN uint32) bool {

// pop pops only if the oldest chunk's TSN matches the given TSN.
func (q *payloadQueue) pop(tsn uint32) (*chunkPayloadData, bool) {
q.updateSortedKeys()
q.mu.Lock()
defer q.mu.Unlock()

q.updateSortedKeysWithLock()

if len(q.chunkMap) > 0 && tsn == q.sorted[0] {
q.sorted = q.sorted[1:]
Expand All @@ -85,25 +104,34 @@ func (q *payloadQueue) pop(tsn uint32) (*chunkPayloadData, bool) {

// get returns reference to chunkPayloadData with the given TSN value.
func (q *payloadQueue) get(tsn uint32) (*chunkPayloadData, bool) {
q.mu.RLock()
defer q.mu.RUnlock()

c, ok := q.chunkMap[tsn]
return c, ok
}

// popDuplicates returns an array of TSN values that were found duplicate.
func (q *payloadQueue) popDuplicates() []uint32 {
q.mu.Lock()
defer q.mu.Unlock()

dups := q.dupTSN
q.dupTSN = []uint32{}
return dups
}

func (q *payloadQueue) getGapAckBlocks(cumulativeTSN uint32) (gapAckBlocks []gapAckBlock) {
q.mu.Lock()
defer q.mu.Unlock()

var b gapAckBlock

if len(q.chunkMap) == 0 {
return []gapAckBlock{}
}

q.updateSortedKeys()
q.updateSortedKeysWithLock()

for i, tsn := range q.sorted {
if i == 0 {
Expand Down Expand Up @@ -155,7 +183,10 @@ func (q *payloadQueue) markAsAcked(tsn uint32) int {
}

func (q *payloadQueue) getLastTSNReceived() (uint32, bool) {
q.updateSortedKeys()
q.mu.Lock()
defer q.mu.Unlock()

q.updateSortedKeysWithLock()

qlen := len(q.sorted)
if qlen == 0 {
Expand All @@ -165,6 +196,9 @@ func (q *payloadQueue) getLastTSNReceived() (uint32, bool) {
}

func (q *payloadQueue) markAllToRetrasmit() {
q.mu.Lock()
defer q.mu.Unlock()

for _, c := range q.chunkMap {
if c.acked || c.abandoned() {
continue
Expand All @@ -174,9 +208,15 @@ func (q *payloadQueue) markAllToRetrasmit() {
}

func (q *payloadQueue) getNumBytes() int {
q.mu.RLock()
defer q.mu.RUnlock()

return q.nBytes
}

func (q *payloadQueue) size() int {
q.mu.RLock()
defer q.mu.RUnlock()

return len(q.chunkMap)
}
19 changes: 15 additions & 4 deletions pending_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ package sctp

import (
"errors"
"sync"
"sync/atomic"
)

// pendingBaseQueue

type pendingBaseQueue struct {
mu sync.RWMutex
queue []*chunkPayloadData
}

Expand All @@ -18,10 +21,14 @@ func newPendingBaseQueue() *pendingBaseQueue {
}

func (q *pendingBaseQueue) push(c *chunkPayloadData) {
q.mu.Lock()
q.queue = append(q.queue, c)
q.mu.Unlock()
}

func (q *pendingBaseQueue) pop() *chunkPayloadData {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.queue) == 0 {
return nil
}
Expand All @@ -31,13 +38,17 @@ func (q *pendingBaseQueue) pop() *chunkPayloadData {
}

func (q *pendingBaseQueue) get(i int) *chunkPayloadData {
q.mu.RLock()
defer q.mu.RUnlock()
if len(q.queue) == 0 || i < 0 || i >= len(q.queue) {
return nil
}
return q.queue[i]
}

func (q *pendingBaseQueue) size() int {
q.mu.RLock()
defer q.mu.RUnlock()
return len(q.queue)
}

Expand All @@ -46,7 +57,7 @@ func (q *pendingBaseQueue) size() int {
type pendingQueue struct {
unorderedQueue *pendingBaseQueue
orderedQueue *pendingBaseQueue
nBytes int
nBytes uint64
selected bool
unorderedIsSelected bool
}
Expand All @@ -71,7 +82,7 @@ func (q *pendingQueue) push(c *chunkPayloadData) {
} else {
q.orderedQueue.push(c)
}
q.nBytes += len(c.userData)
atomic.AddUint64(&q.nBytes, uint64(len(c.userData)))
}

func (q *pendingQueue) peek() *chunkPayloadData {
Expand Down Expand Up @@ -129,12 +140,12 @@ func (q *pendingQueue) pop(c *chunkPayloadData) error {
}
}
}
q.nBytes -= len(c.userData)
atomic.AddUint64(&q.nBytes, -uint64(len(c.userData)))
return nil
}

func (q *pendingQueue) getNumBytes() int {
return q.nBytes
return int(atomic.LoadUint64(&q.nBytes))
}

func (q *pendingQueue) size() int {
Expand Down
Loading

0 comments on commit dd83fe5

Please sign in to comment.