-
Notifications
You must be signed in to change notification settings - Fork 51
/
session.go
161 lines (128 loc) · 3.44 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT
package srtp
import (
"errors"
"io"
"net"
"sync"
"time"
"github.com/pion/logging"
"github.com/pion/transport/v3/packetio"
)
type streamSession interface {
Close() error
write([]byte) (int, error)
decrypt([]byte) error
}
type session struct {
localContextMutex sync.Mutex
localContext, remoteContext *Context
localOptions, remoteOptions []ContextOption
newStream chan readStream
acceptStreamTimeout time.Time
started chan interface{}
closed chan interface{}
readStreamsClosed bool
readStreams map[uint32]readStream
readStreamsLock sync.Mutex
log logging.LeveledLogger
bufferFactory func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser
nextConn net.Conn
}
// Config is used to configure a session.
// You can provide either a KeyingMaterialExporter to export keys
// or directly pass the keys themselves.
// After a Config is passed to a session it must not be modified.
type Config struct {
Keys SessionKeys
Profile ProtectionProfile
BufferFactory func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser
LoggerFactory logging.LoggerFactory
AcceptStreamTimeout time.Time
// List of local/remote context options.
// ReplayProtection is enabled on remote context by default.
// Default replay protection window size is 64.
LocalOptions, RemoteOptions []ContextOption
}
// SessionKeys bundles the keys required to setup an SRTP session
type SessionKeys struct {
LocalMasterKey []byte
LocalMasterSalt []byte
RemoteMasterKey []byte
RemoteMasterSalt []byte
}
func (s *session) getOrCreateReadStream(ssrc uint32, child streamSession, proto func() readStream) (readStream, bool) {
s.readStreamsLock.Lock()
defer s.readStreamsLock.Unlock()
if s.readStreamsClosed {
return nil, false
}
r, ok := s.readStreams[ssrc]
if ok {
return r, false
}
// Create the readStream.
r = proto()
if err := r.init(child, ssrc); err != nil {
return nil, false
}
s.readStreams[ssrc] = r
return r, true
}
func (s *session) removeReadStream(ssrc uint32) {
s.readStreamsLock.Lock()
defer s.readStreamsLock.Unlock()
if s.readStreamsClosed {
return
}
delete(s.readStreams, ssrc)
}
func (s *session) close() error {
if s.nextConn == nil {
return nil
} else if err := s.nextConn.Close(); err != nil {
return err
}
<-s.closed
return nil
}
func (s *session) start(localMasterKey, localMasterSalt, remoteMasterKey, remoteMasterSalt []byte, profile ProtectionProfile, child streamSession) error {
var err error
s.localContext, err = CreateContext(localMasterKey, localMasterSalt, profile, s.localOptions...)
if err != nil {
return err
}
s.remoteContext, err = CreateContext(remoteMasterKey, remoteMasterSalt, profile, s.remoteOptions...)
if err != nil {
return err
}
if err = s.nextConn.SetReadDeadline(s.acceptStreamTimeout); err != nil {
return err
}
go func() {
defer func() {
close(s.newStream)
s.readStreamsLock.Lock()
s.readStreamsClosed = true
s.readStreamsLock.Unlock()
close(s.closed)
}()
b := make([]byte, 8192)
for {
var i int
i, err = s.nextConn.Read(b)
if err != nil {
if !errors.Is(err, io.EOF) {
s.log.Error(err.Error())
}
return
}
if err = child.decrypt(b[:i]); err != nil {
s.log.Info(err.Error())
}
}
}()
close(s.started)
return nil
}