forked from asticode/go-astits
-
Notifications
You must be signed in to change notification settings - Fork 0
/
demuxer_test.go
102 lines (90 loc) · 2.91 KB
/
demuxer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package astits
import (
"bytes"
"context"
"fmt"
"testing"
"github.com/asticode/go-astitools/binary"
"github.com/stretchr/testify/assert"
)
func TestDemuxerNew(t *testing.T) {
ps := 1
pp := func(ps []*Packet) (ds []*Data, skip bool, err error) { return }
dmx := New(context.Background(), nil, OptPacketSize(ps), OptPacketsParser(pp))
assert.Equal(t, ps, dmx.optPacketSize)
assert.Equal(t, fmt.Sprintf("%p", pp), fmt.Sprintf("%p", dmx.optPacketsParser))
}
func TestDemuxerNextPacket(t *testing.T) {
// Ctx error
ctx, cancel := context.WithCancel(context.Background())
dmx := New(ctx, bytes.NewReader([]byte{}))
cancel()
_, err := dmx.NextPacket()
assert.Error(t, err)
// Valid
w := astibinary.New()
b1, p1 := packet(*packetHeader, *packetAdaptationField, []byte("1"))
w.Write(b1)
b2, p2 := packet(*packetHeader, *packetAdaptationField, []byte("2"))
w.Write(b2)
dmx = New(context.Background(), bytes.NewReader(w.Bytes()))
// First packet
p, err := dmx.NextPacket()
assert.NoError(t, err)
assert.Equal(t, p1, p)
assert.Equal(t, 192, dmx.packetBuffer.packetSize)
// Second packet
p, err = dmx.NextPacket()
assert.NoError(t, err)
assert.Equal(t, p2, p)
// EOF
_, err = dmx.NextPacket()
assert.EqualError(t, err, ErrNoMorePackets.Error())
}
func TestDemuxerNextData(t *testing.T) {
// Init
w := astibinary.New()
b := psiBytes()
b1, _ := packet(PacketHeader{ContinuityCounter: uint8(0), PayloadUnitStartIndicator: true, PID: PIDPAT}, PacketAdaptationField{}, b[:147])
w.Write(b1)
b2, _ := packet(PacketHeader{ContinuityCounter: uint8(1), PID: PIDPAT}, PacketAdaptationField{}, b[147:])
w.Write(b2)
// We write this additional empty packet since we don't dump the packet pool anymore
b3, _ := packet(PacketHeader{ContinuityCounter: uint8(2), PayloadUnitStartIndicator: true, PID: PIDPAT}, PacketAdaptationField{}, []byte{})
w.Write(b3)
dmx := New(context.Background(), bytes.NewReader(w.Bytes()))
p, err := dmx.NextPacket()
assert.NoError(t, err)
_, err = dmx.Rewind()
assert.NoError(t, err)
// Next data
var ds []*Data
for _, s := range psi.Sections {
if s.Header.TableType != PSITableTypeUnknown {
d, err := dmx.NextData()
assert.NoError(t, err)
ds = append(ds, d)
}
}
assert.Equal(t, psi.toData(p, PIDPAT), ds)
assert.Equal(t, map[uint16]uint16{0x3: 0x2, 0x5: 0x4}, dmx.programMap.p)
// No more packets
_, err = dmx.NextData()
assert.EqualError(t, err, ErrNoMorePackets.Error())
}
func TestDemuxerRewind(t *testing.T) {
r := bytes.NewReader([]byte("content"))
dmx := New(context.Background(), r)
dmx.packetPool.add(&Packet{Header: &PacketHeader{PID: 1}})
dmx.dataBuffer = append(dmx.dataBuffer, &Data{})
b := make([]byte, 2)
_, err := r.Read(b)
assert.NoError(t, err)
n, err := dmx.Rewind()
assert.NoError(t, err)
assert.Equal(t, int64(0), n)
assert.Equal(t, 7, r.Len())
assert.Equal(t, 0, len(dmx.dataBuffer))
assert.Equal(t, 0, len(dmx.packetPool.b))
assert.Nil(t, dmx.packetBuffer)
}