-
Notifications
You must be signed in to change notification settings - Fork 3
/
server.go
121 lines (105 loc) · 2.41 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package rudp
import (
"net"
"sync/atomic"
"time"
dcep "github.com/pion/datachannel"
"github.com/pion/logging"
"github.com/pion/sctp"
)
// Server is the server-side endpoint of an rudp connection. It is created by
// newServer, which also starts the SCTP handshake that populates assoc.
type Server struct {
	// conn is the packet transport for this peer; it is handed to SCTP as
	// the NetConn and receives inbound datagrams via handleInbound.
	conn *serverConn

	// bufferSize is forwarded to SCTP as MaxReceiveBufferSize.
	bufferSize int

	// assoc is the SCTP association. It is assigned from the handshake
	// goroutine started in newServer and is nil until that happens.
	assoc *sctp.Association

	// closed holds a bool: false after construction, true once Close ran.
	closed atomic.Value

	// onClosed is invoked from a timer that Close schedules.
	onClosed func()

	// loggerFactory is passed on to the SCTP and data-channel layers.
	loggerFactory logging.LoggerFactory

	// log is the logger created from loggerFactory in newServer.
	log logging.LeveledLogger
}
// serverConfig bundles the arguments consumed by newServer.
type serverConfig struct {
	// conn and remAddr are passed to newServerConn to build the server's
	// transport.
	conn net.PacketConn

	remAddr net.Addr

	// bufferSize becomes Server.bufferSize (SCTP MaxReceiveBufferSize).
	bufferSize int

	// onHandshakeComplete is called by newServer's background goroutine
	// after sctp.Server returns without error.
	onHandshakeComplete func()

	// onClosed becomes Server.onClosed.
	onClosed func()

	// loggerFactory creates the server's logger and is stored on the Server.
	loggerFactory logging.LoggerFactory
}
// newServer builds a Server on top of config.conn / config.remAddr and starts
// the server side of the SCTP handshake in a background goroutine.
//
// The call does not wait for the handshake: it returns the Server right away
// and, as written, always returns a nil error. When sctp.Server succeeds the
// association is stored in the Server and config.onHandshakeComplete is
// invoked (if it is set) from that goroutine. When sctp.Server fails the error
// is only logged and the association stays nil.
//
// config and config.loggerFactory must be non-nil.
func newServer(config *serverConfig) (*Server, error) {
	log := config.loggerFactory.NewLogger("rudp-s")

	s := &Server{
		conn:          newServerConn(config.conn, config.remAddr, log),
		bufferSize:    config.bufferSize,
		onClosed:      config.onClosed,
		loggerFactory: config.loggerFactory,
		log:           log,
	}
	s.closed.Store(false)

	// Capture the callback now so the goroutine does not touch config after
	// newServer has returned to the caller.
	onHandshakeComplete := config.onHandshakeComplete

	go func() {
		s.log.Debug("handshake started")

		// NOTE(review): a negative bufferSize wraps around in this uint32
		// conversion — confirm callers only pass non-negative sizes.
		assoc, err := sctp.Server(sctp.Config{
			LoggerFactory:        s.loggerFactory,
			MaxReceiveBufferSize: uint32(s.bufferSize),
			NetConn:              s.conn,
		})
		if err != nil {
			s.log.Error(err.Error())
			return
		}
		s.assoc = assoc

		// The callback is optional; calling a nil func here would panic in
		// this goroutine and take the whole process down.
		if onHandshakeComplete != nil {
			onHandshakeComplete()
		}
	}()

	return s, nil
}
// handleInbound forwards one received datagram to the server's connection.
// Data arriving after the server has been marked closed is dropped silently.
func (s *Server) handleInbound(data []byte) {
	if isClosed := s.closed.Load().(bool); isClosed {
		return
	}

	size := len(data)
	s.log.Debugf("Server: handleInboud: %d bytes", size)

	s.conn.handleInbound(data)
}
// AcceptChannel accepts a data channel on the server's SCTP association via
// dcep.Accept and returns it wrapped as a Channel.
//
// If dcep.Accept fails, its error is returned unchanged together with a nil
// Channel.
func (s *Server) AcceptChannel() (Channel, error) {
	s.log.Debug("accept stream")

	dcepCfg := &dcep.Config{LoggerFactory: s.loggerFactory}
	accepted, err := dcep.Accept(s.assoc, dcepCfg)
	if err != nil {
		return nil, err
	}

	// The channel parameters are read from dcepCfg only after Accept has
	// returned; presumably Accept fills them in from the peer's request —
	// confirm against the datachannel package.
	chCfg := Config{
		ChannelType:          dcepCfg.ChannelType,
		Negotiated:           dcepCfg.Negotiated,
		Priority:             dcepCfg.Priority,
		ReliabilityParameter: dcepCfg.ReliabilityParameter,
		Label:                dcepCfg.Label,
		Protocol:             dcepCfg.Protocol,
	}

	return &dataChannel{dc: accepted, config: chCfg}, nil
}
// Close shuts the server down. The first call marks the server closed, closes
// the underlying connection and schedules the onClosed callback (when one was
// configured) to run after a fixed delay. Later calls do nothing and return
// nil.
//
// It returns the error reported by closing the connection.
func (s *Server) Close() error {
	// closeNotifyDelay is how long after Close the onClosed callback fires.
	const closeNotifyDelay = 8 * time.Second

	if s.closed.Load().(bool) {
		return nil
	}

	// Set the flag before tearing the connection down so that handleInbound
	// stops forwarding packets to a connection that is being closed.
	s.closed.Store(true)
	err := s.conn.Close()

	// The callback is optional; a nil func run by the timer would panic in
	// the timer's goroutine and crash the process.
	if onClosed := s.onClosed; onClosed != nil {
		time.AfterFunc(closeNotifyDelay, onClosed)
	}

	return err
}
// LocalAddr reports the local address of the server's underlying connection.
func (s *Server) LocalAddr() net.Addr {
	local := s.conn.LocalAddr()
	return local
}
// RemoteAddr reports the remote address of the server's underlying connection.
func (s *Server) RemoteAddr() net.Addr {
	remote := s.conn.RemoteAddr()
	return remote
}