Skip to content

Commit

Permalink
Use dumbConn2 in createAssocs
Browse files Browse the repository at this point in the history
This is avoid the issue with unexpected errors occur on Ubuntu linux.
Relates to #270
  • Loading branch information
enobufs committed Jan 1, 2024
1 parent f8e83fa commit 6aa7423
Showing 1 changed file with 77 additions and 62 deletions.
139 changes: 77 additions & 62 deletions association_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2564,88 +2564,104 @@ func TestAssocMaxMessageSize(t *testing.T) {
})
}

// udpConnWrapper wraps a *net.UDPConn and implements net.Conn interface.
type udpConnWrapper struct {
conn *net.UDPConn
remoteAddr net.Addr
type dumbConnInboundHandler func([]byte)

type dumbConn2 struct {
net.Conn
packets [][]byte
closed bool
localAddr net.Addr
remoteAddr net.Addr
remoteInboundHandler dumbConnInboundHandler
mutex sync.Mutex
cond *sync.Cond
}

func newUDPConnWrapper(conn *net.UDPConn, remoteAddr net.Addr) net.Conn {
return &udpConnWrapper{
conn: conn,
func newDumbConn2(localAddr, remoteAddr net.Addr) *dumbConn2 {
c := &dumbConn2{
packets: [][]byte{},
localAddr: localAddr,
remoteAddr: remoteAddr,
}
c.cond = sync.NewCond(&c.mutex)
return c
}

// Implement the net.Conn interface methods
func (w *udpConnWrapper) Read(b []byte) (n int, err error) {
// w.conn.ReadFrom(b)
n, _, err = w.conn.ReadFrom(b)
return n, err
}
func (c *dumbConn2) Read(b []byte) (n int, err error) {
c.mutex.Lock()
defer c.mutex.Unlock()

func (w *udpConnWrapper) Write(b []byte) (n int, err error) {
return w.conn.WriteTo(b, w.remoteAddr)
}
for {
if len(c.packets) > 0 {
packet := c.packets[0]
c.packets = c.packets[1:]
n := copy(b, packet)
return n, nil
}

func (w *udpConnWrapper) Close() error {
return w.conn.Close()
}
if c.closed {
return 0, io.EOF
}

func (w *udpConnWrapper) LocalAddr() net.Addr {
return w.conn.LocalAddr()
c.cond.Wait()
}
}

func (w *udpConnWrapper) RemoteAddr() net.Addr {
return w.remoteAddr
}
func (c *dumbConn2) Write(b []byte) (int, error) {
c.mutex.Lock()
closed := c.closed
c.mutex.Unlock()

func (w *udpConnWrapper) SetDeadline(t time.Time) error {
return w.conn.SetDeadline(t)
if closed {
return 0, &net.OpError{Op: "write", Net: "udp", Addr: c.remoteAddr, Err: net.ErrClosed}
}
c.remoteInboundHandler(b)
return len(b), nil
}

func (w *udpConnWrapper) SetReadDeadline(t time.Time) error {
return w.conn.SetReadDeadline(t)
}
func (c *dumbConn2) Close() error {
c.mutex.Lock()
defer c.mutex.Unlock()

func (w *udpConnWrapper) SetWriteDeadline(t time.Time) error {
return w.conn.SetWriteDeadline(t)
c.closed = true
c.cond.Signal()
return nil
}

// crateUDPConnPair creates a pair of net.UDPConn objects that are connected with each other
func createUDPConnPair(t *testing.T) (net.Conn, net.Conn, error) {
udp1, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1")})
if err != nil {
return nil, nil, err
}
addr1, ok := udp1.LocalAddr().(*net.UDPAddr)
require.True(t, ok)
func (c *dumbConn2) LocalAddr() net.Addr {
// Unused by Association
return c.localAddr
}

udp2, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1")})
if err != nil {
return nil, nil, err
}
addr2, ok := udp2.LocalAddr().(*net.UDPAddr)
require.True(t, ok)
func (c *dumbConn2) RemoteAddr() net.Addr {
// Unused by Association
return c.remoteAddr
}

conn1 := newUDPConnWrapper(udp1, addr2)
if err != nil {
return nil, nil, err
}
func (c *dumbConn2) inboundHandler(packet []byte) {
c.mutex.Lock()
defer c.mutex.Unlock()

conn2 := newUDPConnWrapper(udp2, addr1)
if err != nil {
return nil, nil, err
if !c.closed {
c.packets = append(c.packets, packet)
c.cond.Signal()
}
}

return conn1, conn2, nil
// crateUDPConnPair creates a pair of net.UDPConn objects that are connected with each other
func createUDPConnPair() (net.Conn, net.Conn) {
addr1 := &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 1234}
addr2 := &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 5678}
conn1 := newDumbConn2(addr1, addr2)
conn2 := newDumbConn2(addr2, addr1)
conn1.remoteInboundHandler = conn2.inboundHandler
conn2.remoteInboundHandler = conn1.inboundHandler
return conn1, conn2
}

func createAssocs(t *testing.T) (*Association, *Association, error) {
udp1, udp2, err := createUDPConnPair(t)
if err != nil {
return nil, nil, err
}
func createAssocs() (*Association, *Association, error) {
udp1, udp2 := createUDPConnPair()

loggerFactory := logging.NewDefaultLoggerFactory()

Expand Down Expand Up @@ -2713,7 +2729,7 @@ loop:
func TestAssociation_Shutdown(t *testing.T) {
checkGoroutineLeaks(t)

a1, a2, err := createAssocs(t)
a1, a2, err := createAssocs()
require.NoError(t, err)

s11, err := a1.OpenStream(1, PayloadTypeWebRTCString)
Expand Down Expand Up @@ -2751,7 +2767,7 @@ func TestAssociation_Shutdown(t *testing.T) {
func TestAssociation_ShutdownDuringWrite(t *testing.T) {
checkGoroutineLeaks(t)

a1, a2, err := createAssocs(t)
a1, a2, err := createAssocs()
require.NoError(t, err)

s11, err := a1.OpenStream(1, PayloadTypeWebRTCString)
Expand Down Expand Up @@ -2962,7 +2978,7 @@ func TestAssociation_HandlePacketInCookieWaitState(t *testing.T) {
func TestAssociation_Abort(t *testing.T) {
checkGoroutineLeaks(t)

a1, a2, err := createAssocs(t)
a1, a2, err := createAssocs()
require.NoError(t, err)

s11, err := a1.OpenStream(1, PayloadTypeWebRTCString)
Expand Down Expand Up @@ -3001,8 +3017,7 @@ func TestAssociation_Abort(t *testing.T) {
func TestAssociation_createClientWithContext(t *testing.T) {
checkGoroutineLeaks(t)

udp1, udp2, err := createUDPConnPair(t)
require.NoError(t, err)
udp1, udp2 := createUDPConnPair()

loggerFactory := logging.NewDefaultLoggerFactory()

Expand Down

0 comments on commit 6aa7423

Please sign in to comment.