Skip to content

Commit

Permalink
Merge branch 'master' into c_w
Browse files Browse the repository at this point in the history
  • Loading branch information
daonb authored Mar 6, 2024
2 parents af23cc1 + 3f9fba4 commit 5de2b1c
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 50 deletions.
64 changes: 49 additions & 15 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ func getAssociationStateString(a uint32) string {
//
// Note: No "CLOSED" state is illustrated since if a
// association is "CLOSED" its TCB SHOULD be removed.
// Note: By nature of an Association being constructed with one net.Conn,
// it is not a multi-home supporting implementation of SCTP.
type Association struct {
bytesReceived uint64
bytesSent uint64
Expand Down Expand Up @@ -238,6 +240,7 @@ type Association struct {
// Config collects the arguments to createAssociation construction into
// a single structure
type Config struct {
Name string
NetConn net.Conn
MaxReceiveBufferSize uint32
MaxMessageSize uint32
Expand Down Expand Up @@ -304,11 +307,17 @@ func createAssociation(config Config) *Association {

tsn := globalMathRandomGenerator.Uint32()
a := &Association{
netConn: config.NetConn,
maxReceiveBufferSize: maxReceiveBufferSize,
maxMessageSize: maxMessageSize,
netConn: config.NetConn,
maxReceiveBufferSize: maxReceiveBufferSize,
maxMessageSize: maxMessageSize,

// These two max values have us not need to follow
// 5.1.1 where this peer may be incapable of supporting
// the requested amount of outbound streams from the other
// peer.
myMaxNumOutboundStreams: math.MaxUint16,
myMaxNumInboundStreams: math.MaxUint16,

payloadQueue: newPayloadQueue(),
inflightQueue: newPayloadQueue(),
pendingQueue: newPendingQueue(),
Expand All @@ -335,9 +344,12 @@ func createAssociation(config Config) *Association {
silentError: ErrSilentlyDiscard,
stats: &associationStats{},
log: config.LoggerFactory.NewLogger("sctp"),
name: config.Name,
}

a.name = fmt.Sprintf("%p", a)
if a.name == "" {
a.name = fmt.Sprintf("%p", a)
}

// RFC 4690 Sec 7.2.1
// o The initial cwnd before DATA transmission or after a sufficiently
Expand Down Expand Up @@ -477,8 +489,11 @@ func (a *Association) Close() error {
<-a.readLoopCloseCh

a.log.Debugf("[%s] association closed", a.name)
a.log.Debugf("[%s] stats nPackets (in) : %d", a.name, a.stats.getNumPacketsReceived())
a.log.Debugf("[%s] stats nPackets (out) : %d", a.name, a.stats.getNumPacketsSent())
a.log.Debugf("[%s] stats nDATAs (in) : %d", a.name, a.stats.getNumDATAs())
a.log.Debugf("[%s] stats nSACKs (in) : %d", a.name, a.stats.getNumSACKs())
a.log.Debugf("[%s] stats nSACKs (in) : %d", a.name, a.stats.getNumSACKsReceived())
a.log.Debugf("[%s] stats nSACKs (out) : %d\n", a.name, a.stats.getNumSACKsSent())
a.log.Debugf("[%s] stats nT3Timeouts : %d", a.name, a.stats.getNumT3Timeouts())
a.log.Debugf("[%s] stats nAckTimeouts: %d", a.name, a.stats.getNumAckTimeouts())
a.log.Debugf("[%s] stats nFastRetrans: %d", a.name, a.stats.getNumFastRetrans())
Expand Down Expand Up @@ -548,7 +563,7 @@ func (a *Association) readLoop() {

a.log.Debugf("[%s] association closed", a.name)
a.log.Debugf("[%s] stats nDATAs (in) : %d", a.name, a.stats.getNumDATAs())
a.log.Debugf("[%s] stats nSACKs (in) : %d", a.name, a.stats.getNumSACKs())
a.log.Debugf("[%s] stats nSACKs (in) : %d", a.name, a.stats.getNumSACKsReceived())
a.log.Debugf("[%s] stats nT3Timeouts : %d", a.name, a.stats.getNumT3Timeouts())
a.log.Debugf("[%s] stats nAckTimeouts: %d", a.name, a.stats.getNumAckTimeouts())
a.log.Debugf("[%s] stats nFastRetrans: %d", a.name, a.stats.getNumFastRetrans())
Expand Down Expand Up @@ -597,6 +612,7 @@ loop:
break loop
}
atomic.AddUint64(&a.bytesSent, uint64(len(raw)))
a.stats.incPacketsSent()
}

if !ok {
Expand Down Expand Up @@ -671,15 +687,15 @@ func (a *Association) handleInbound(raw []byte) error {
return nil
}

a.handleChunkStart()
a.handleChunksStart()

for _, c := range p.chunks {
if err := a.handleChunk(p, c); err != nil {
return err
}
}

a.handleChunkEnd()
a.handleChunksEnd()

return nil
}
Expand Down Expand Up @@ -826,6 +842,7 @@ func (a *Association) gatherOutboundSackPackets(rawPackets [][]byte) [][]byte {
if a.ackState == ackStateImmediate {
a.ackState = ackStateIdle
sack := a.createSelectiveAckChunk()
a.stats.incSACKsSent()
a.log.Debugf("[%s] sending SACK: %s", a.name, sack)
raw, err := a.marshalPacket(a.createPacket([]chunk{sack}))
if err != nil {
Expand Down Expand Up @@ -1119,7 +1136,10 @@ func (a *Association) handleInit(p *packet, i *chunkInit) ([]*packet, error) {
return nil, fmt.Errorf("%w: %s", ErrHandleInitState, getAssociationStateString(state))
}

// Should we be setting any of these permanently until we've ACKed further?
// NOTE: Setting these prior to a reception of a COOKIE ECHO chunk containing
// our cookie is not compliant with https://www.rfc-editor.org/rfc/rfc9260#section-5.1-2.2.3.
// It makes us more vulnerable to resource attacks, albeit minimally so.
// https://www.rfc-editor.org/rfc/rfc9260#sec_handle_stream_parameters
a.myMaxNumInboundStreams = min16(i.numInboundStreams, a.myMaxNumInboundStreams)
a.myMaxNumOutboundStreams = min16(i.numOutboundStreams, a.myMaxNumOutboundStreams)
a.peerVerificationTag = i.initiateTag
Expand Down Expand Up @@ -1166,6 +1186,8 @@ func (a *Association) handleInit(p *packet, i *chunkInit) ([]*packet, error) {

if a.myCookie == nil {
var err error
// NOTE: This generation process is not compliant with
// 5.1.3. Generating State Cookie (https://www.rfc-editor.org/rfc/rfc4960#section-5.1.3)
if a.myCookie, err = newRandomStateCookie(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1305,13 +1327,16 @@ func (a *Association) handleCookieEcho(c *chunkCookieEcho) []*packet {
return nil
}

// RFC wise, these do not seem to belong here, but removing them
// causes TestCookieEchoRetransmission to break
a.t1Init.stop()
a.storedInit = nil

a.t1Cookie.stop()
a.storedCookieEcho = nil

a.setState(established)
// Note: This is a future place where the user could be notified (COMMUNICATION UP)
a.handshakeCompletedCh <- nil
}

Expand Down Expand Up @@ -1340,6 +1365,7 @@ func (a *Association) handleCookieAck() {
a.storedCookieEcho = nil

a.setState(established)
// Note: This is a future place where the user could be notified (COMMUNICATION UP)
a.handshakeCompletedCh <- nil
}

Expand All @@ -1353,9 +1379,9 @@ func (a *Association) handleData(d *chunkPayloadData) []*packet {
if canPush {
s := a.getOrCreateStream(d.streamIdentifier, true, PayloadTypeUnknown)
if s == nil {
// silentely discard the data. (sender will retry on T3-rtx timeout)
// silently discard the data. (sender will retry on T3-rtx timeout)
// see pion/sctp#30
a.log.Debugf("discard %d", d.streamSequenceNumber)
a.log.Debugf("[%s] discard %d", a.name, d.streamSequenceNumber)
return nil
}

Expand Down Expand Up @@ -1720,7 +1746,7 @@ func (a *Association) handleSack(d *chunkSelectiveAck) error {
return nil
}

a.stats.incSACKs()
a.stats.incSACKsReceived()

if sna32GT(a.cumulativeTSNAckPoint, d.cumulativeTSNAck) {
// RFC 4960 sec 6.2.1. Processing a Received SACK
Expand Down Expand Up @@ -2379,15 +2405,17 @@ func pack(p *packet) []*packet {
return []*packet{p}
}

func (a *Association) handleChunkStart() {
func (a *Association) handleChunksStart() {
a.lock.Lock()
defer a.lock.Unlock()

a.stats.incPacketsReceived()

a.delayedAckTriggered = false
a.immediateAckTriggered = false
}

func (a *Association) handleChunkEnd() {
func (a *Association) handleChunksEnd() {
a.lock.Lock()
defer a.lock.Unlock()

Expand All @@ -2410,13 +2438,18 @@ func (a *Association) handleChunk(p *packet, c chunk) error {
var err error

if _, err = c.check(); err != nil {
a.log.Errorf("[ %s ] failed validating chunk: %s ", a.name, err)
a.log.Errorf("[%s] failed validating chunk: %s ", a.name, err)
return nil
}

isAbort := false

switch c := c.(type) {
// Note: We do not do the following for chunkInit, chunkInitAck, and chunkCookieEcho:
// If an endpoint receives an INIT, INIT ACK, or COOKIE ECHO chunk but decides not to establish the
// new association due to missing mandatory parameters in the received INIT or INIT ACK chunk, invalid
// parameter values, or lack of local resources, it SHOULD respond with an ABORT chunk.

case *chunkInit:
packets, err = a.handleInit(p, c)

Expand All @@ -2434,6 +2467,7 @@ func (a *Association) handleChunk(p *packet, c chunk) error {
}
a.log.Debugf("[%s] Error chunk, with following errors: %s", a.name, errStr)

// Note: chunkHeartbeatAck not handled?
case *chunkHeartbeat:
packets = a.handleHeartbeat(c)

Expand Down
50 changes: 40 additions & 10 deletions association_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,30 @@ import (
)

type associationStats struct {
nDATAs uint64
nSACKs uint64
nT3Timeouts uint64
nAckTimeouts uint64
nFastRetrans uint64
nPacketsReceived uint64
nPacketsSent uint64
nDATAs uint64
nSACKsReceived uint64
nSACKsSent uint64
nT3Timeouts uint64
nAckTimeouts uint64
nFastRetrans uint64
}

func (s *associationStats) incPacketsReceived() {
atomic.AddUint64(&s.nPacketsReceived, 1)
}

func (s *associationStats) getNumPacketsReceived() uint64 {
return atomic.LoadUint64(&s.nPacketsReceived)
}

func (s *associationStats) incPacketsSent() {
atomic.AddUint64(&s.nPacketsSent, 1)
}

func (s *associationStats) getNumPacketsSent() uint64 {
return atomic.LoadUint64(&s.nPacketsSent)
}

func (s *associationStats) incDATAs() {
Expand All @@ -23,12 +42,20 @@ func (s *associationStats) getNumDATAs() uint64 {
return atomic.LoadUint64(&s.nDATAs)
}

func (s *associationStats) incSACKs() {
atomic.AddUint64(&s.nSACKs, 1)
func (s *associationStats) incSACKsReceived() {
atomic.AddUint64(&s.nSACKsReceived, 1)
}

func (s *associationStats) getNumSACKsReceived() uint64 {
return atomic.LoadUint64(&s.nSACKsReceived)
}

func (s *associationStats) incSACKsSent() {
atomic.AddUint64(&s.nSACKsSent, 1)
}

func (s *associationStats) getNumSACKs() uint64 {
return atomic.LoadUint64(&s.nSACKs)
func (s *associationStats) getNumSACKsSent() uint64 {
return atomic.LoadUint64(&s.nSACKsSent)
}

func (s *associationStats) incT3Timeouts() {
Expand Down Expand Up @@ -56,8 +83,11 @@ func (s *associationStats) getNumFastRetrans() uint64 {
}

func (s *associationStats) reset() {
atomic.StoreUint64(&s.nPacketsReceived, 0)
atomic.StoreUint64(&s.nPacketsSent, 0)
atomic.StoreUint64(&s.nDATAs, 0)
atomic.StoreUint64(&s.nSACKs, 0)
atomic.StoreUint64(&s.nSACKsReceived, 0)
atomic.StoreUint64(&s.nSACKsSent, 0)
atomic.StoreUint64(&s.nT3Timeouts, 0)
atomic.StoreUint64(&s.nAckTimeouts, 0)
atomic.StoreUint64(&s.nFastRetrans, 0)
Expand Down
Loading

0 comments on commit 5de2b1c

Please sign in to comment.