forked from Haivision/srtgo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rw_test.go
122 lines (109 loc) · 2.55 KB
/
rw_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package srtgo
import (
"context"
"fmt"
"sync"
"testing"
"time"
)
// tester holds the state shared between a benchmark driver and the
// sender/receiver goroutines it spawns.
type tester struct {
	// blocking is handed to the sockets as their "blocking" option.
	blocking string

	// ctx signals senders that they should stop.
	ctx context.Context

	// cancel cancels the tester.
	cancel func()

	// err carries failures reported by senders and receivers.
	err chan error

	// started synchronises the goroutines on start.
	started sync.WaitGroup
}
// send creates a caller socket, connects it to 127.0.0.1:port and keeps
// writing 1316-byte buffers to it until the tester's context is cancelled.
//
// Failures are reported on t.err. The start barrier is released on every
// path, including socket creation failure, so goroutines blocked in
// t.started.Wait are never left hanging.
func (t *tester) send(port uint16) {
	// report hands err to the driver, but gives up once the run has been
	// cancelled so this goroutine cannot block forever on a full channel.
	report := func(err error) {
		select {
		case t.err <- err:
		case <-t.ctx.Done():
		}
	}

	buf := make([]byte, 1316)
	sock := NewSrtSocket("127.0.0.1", port, map[string]string{"blocking": t.blocking, "mode": "caller", "transtype": "file", "latency": "300"})
	if sock == nil {
		// Release the barrier first: everyone else is waiting on it and
		// would otherwise never get to see the error.
		t.started.Done()
		report(fmt.Errorf("failed to create caller socket"))
		return
	}
	defer sock.Close()

	// Wait until all sockets are ready
	t.started.Done()
	t.started.Wait()

	if err := sock.Connect(); err != nil {
		report(fmt.Errorf("connect: %w", err))
		return
	}

	for {
		select {
		case <-t.ctx.Done():
			return
		default:
		}

		if _, err := sock.Write(buf); err != nil {
			// A write that fails after cancellation is a side effect of
			// teardown, not a benchmark failure.
			if t.ctx.Err() != nil {
				return
			}
			report(fmt.Errorf("write: %w", err))
			return
		}
		time.Sleep(1 * time.Millisecond) // 1316 bytes every 1 ms, roughly 1.3 MB/s
	}
}
// receive creates a listener socket on 127.0.0.1:port, accepts one connection
// and performs iterations reads on it.
//
// Failures are reported on t.err. Once the listener socket exists, the tester
// is cancelled when receive returns, on success and on failure alike. The
// start barrier is released on every path, including socket creation failure,
// so goroutines blocked in t.started.Wait are never left hanging.
func (t *tester) receive(port uint16, iterations int) {
	sock := NewSrtSocket("127.0.0.1", port, map[string]string{"blocking": t.blocking, "mode": "listener", "rcvbuf": "22937600", "transtype": "file", "latency": "300"})
	if sock == nil {
		// Release the barrier first: everyone else is waiting on it and
		// would otherwise never get to see the error.
		t.started.Done()
		t.err <- fmt.Errorf("failed to create listener socket")
		return
	}
	buf := make([]byte, 1316)

	// closeRemote is swapped in once a connection has been accepted, so the
	// accepted socket is released on return instead of being leaked.
	closeRemote := func() {}
	defer sock.Close()
	defer func() {
		// Cancel before closing the connection so that the sender sees the
		// stop signal rather than a failing write.
		t.cancel()
		closeRemote()
	}()

	// Wait until all sockets are ready
	t.started.Done()
	t.started.Wait()

	if err := sock.Listen(1); err != nil {
		t.err <- fmt.Errorf("listen: %w", err)
		return
	}

	remote, _, err := sock.Accept()
	if err != nil {
		t.err <- fmt.Errorf("accept: %w", err)
		return
	}
	closeRemote = func() { remote.Close() }

	for i := 0; i < iterations; i++ {
		if _, err := remote.Read(buf); err != nil {
			t.err <- fmt.Errorf("read: %w", err)
			return
		}
	}
}
// runTransmitBench times a receiver performing b.N reads from a sender over a
// local connection on a random port.
//
// blocking selects the value of the sockets' "blocking" option: "1" when
// true, "0" otherwise. The benchmark is marked as failed if the sender or the
// receiver reports an error.
func runTransmitBench(b *testing.B, blocking bool) {
	blockStr := "0"
	if blocking {
		blockStr = "1"
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	t := &tester{
		ctx:      ctx,
		blocking: blockStr,
		cancel:   cancel,
		err:      make(chan error, 1),
	}
	t.started.Add(2)

	port := randomPort()
	go t.receive(port, b.N)
	go t.send(port)

	// Keep socket creation out of the measurement.
	t.started.Wait()
	b.ResetTimer()

	select {
	case <-ctx.Done():
		// The receiver cancels the context on failure as well as on success,
		// and queues its error before doing so. When both cases are ready the
		// select picks one at random, so check for a pending error here
		// instead of silently reporting a failed run as a success.
		select {
		case err := <-t.err:
			b.Error(err)
		default:
		}
	case err := <-t.err:
		b.Error(err)
	}
}
// BenchmarkRWBlocking runs the transmit benchmark with blocking enabled.
func BenchmarkRWBlocking(b *testing.B) {
	const blocking = true
	runTransmitBench(b, blocking)
}
// BenchmarkRWNonBlocking runs the transmit benchmark with blocking disabled.
func BenchmarkRWNonBlocking(b *testing.B) {
	const blocking = false
	runTransmitBench(b, blocking)
}