Skip to content

Commit

Permalink
Completely refactor media pipeline.
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc committed Nov 30, 2023
1 parent 9f41c76 commit 4310291
Show file tree
Hide file tree
Showing 13 changed files with 593 additions and 246 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7
github.com/livekit/psrpc v0.5.1
github.com/livekit/server-sdk-go v1.1.1
github.com/pion/interceptor v0.1.25
github.com/pion/rtp v1.8.3
github.com/pion/sdp/v2 v2.4.0
github.com/pion/webrtc/v3 v3.2.23
Expand Down Expand Up @@ -58,7 +59,6 @@ require (
github.com/pion/datachannel v1.5.5 // indirect
github.com/pion/dtls/v2 v2.2.7 // indirect
github.com/pion/ice/v2 v2.3.11 // indirect
github.com/pion/interceptor v0.1.25 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/mdns v0.0.8 // indirect
github.com/pion/randutil v0.1.0 // indirect
Expand Down
11 changes: 11 additions & 0 deletions pkg/media/media.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package media

// Writer is a sink that accepts media samples of type T, one sample per call.
type Writer[T any] interface {
	// WriteSample passes a single sample to the sink and reports any failure.
	WriteSample(sample T) error
}

// WriterFunc adapts an ordinary function to the Writer interface, so a
// closure can be used wherever a Writer[T] is expected.
type WriterFunc[T any] func(in T) error

// WriteSample implements Writer by calling the function itself with in and
// returning whatever error it produces.
func (f WriterFunc[T]) WriteSample(in T) error {
	err := f(in)
	return err
}
39 changes: 39 additions & 0 deletions pkg/media/opus/opus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package opus

import (
"gopkg.in/hraban/opus.v2"

"github.com/livekit/sip/pkg/media"
)

// Sample is a byte slice holding Opus-encoded audio: it is the input type of
// the writer returned by Decode and the output type of the writer wrapped by
// Encode.
type Sample []byte

// Decode returns a writer that accepts Opus packets, decodes each one to
// 16-bit PCM and forwards the decoded samples to w.
//
// sampleRate and channels configure the underlying Opus decoder; if the
// decoder cannot be created, its error is returned and the writer is nil.
// Errors from decoding a packet, or from w itself, are returned by the
// resulting writer's WriteSample.
//
// The slice handed to w is backed by a scratch buffer that is overwritten on
// the next call, so w must not retain it after WriteSample returns.
func Decode(w media.Writer[media.PCM16Sample], sampleRate int, channels int) (media.Writer[Sample], error) {
	dec, err := opus.NewDecoder(sampleRate, channels)
	if err != nil {
		return nil, err
	}
	// A single Opus packet can carry up to 120 ms of audio. Size the scratch
	// buffer for that worst case across all interleaved channels, instead of
	// a fixed length that is too small for stereo or long packets.
	const maxPacketMs = 120
	buf := make([]int16, sampleRate*maxPacketMs/1000*channels)
	return media.WriterFunc[Sample](func(in Sample) error {
		n, err := dec.Decode(in, buf)
		if err != nil {
			return err
		}
		// The decoder reports samples per channel; the interleaved output
		// therefore occupies n*channels entries of the buffer.
		return w.WriteSample(buf[:n*channels])
	}), nil
}

// Encode returns a writer that accepts 16-bit PCM frames, compresses each one
// with an Opus encoder created in VoIP application mode, and forwards the
// resulting packet to w.
//
// sampleRate and channels configure the encoder; if it cannot be created, its
// error is returned and the writer is nil. The packet passed to w is a view
// into a 1024-byte scratch buffer that is reused on every call.
func Encode(w media.Writer[Sample], sampleRate int, channels int) (media.Writer[media.PCM16Sample], error) {
	encoder, err := opus.NewEncoder(sampleRate, channels, opus.AppVoIP)
	if err != nil {
		return nil, err
	}
	packet := make([]byte, 1024)
	var out media.WriterFunc[media.PCM16Sample] = func(pcm media.PCM16Sample) error {
		size, encErr := encoder.Encode(pcm, packet)
		if encErr != nil {
			return encErr
		}
		return w.WriteSample(packet[:size])
	}
	return out, nil
}
68 changes: 68 additions & 0 deletions pkg/media/pcm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package media

import (
"encoding/binary"
"time"

"github.com/pion/webrtc/v3/pkg/media"
)

// PlayAudio writes frames to w in order, one frame per tick of a ticker that
// fires every sampleDur.
//
// The function waits for a tick before each write, and for one more tick
// after the final frame before it returns nil. If w reports an error,
// playback stops and that error is returned.
func PlayAudio[T any](w Writer[T], sampleDur time.Duration, frames []T) error {
	ticker := time.NewTicker(sampleDur)
	defer ticker.Stop()
	for next := 0; ; next++ {
		<-ticker.C
		if next >= len(frames) {
			return nil
		}
		if err := w.WriteSample(frames[next]); err != nil {
			return err
		}
	}
}

// LPCM16Sample is audio stored as a byte stream of little-endian 16-bit
// values, two bytes per sample.
type LPCM16Sample []byte

// Decode converts the byte stream into a slice of 16-bit samples.
//
// Only complete two-byte pairs are decoded: if the input has an odd length,
// the trailing byte is ignored rather than causing an out-of-range panic.
// A nil or empty input yields an empty result.
func (s LPCM16Sample) Decode() PCM16Sample {
	out := make(PCM16Sample, len(s)/2)
	// Iterate over output samples, not input bytes, so a dangling odd byte
	// can never be indexed.
	for i := range out {
		out[i] = int16(binary.LittleEndian.Uint16(s[2*i:]))
	}
	return out
}

// PCM16Sample is audio stored as a slice of signed 16-bit samples.
type PCM16Sample []int16

// Encode serializes the samples into a little-endian byte stream, producing
// exactly two bytes for every sample.
func (s PCM16Sample) Encode() LPCM16Sample {
	out := make(LPCM16Sample, len(s)*2)
	for i, v := range s {
		binary.LittleEndian.PutUint16(out[2*i:], uint16(v))
	}
	return out
}

// DecodePCM returns a writer that accepts little-endian PCM byte streams,
// converts each one to 16-bit samples with LPCM16Sample.Decode, and forwards
// the result to w. Errors from w are returned unchanged.
func DecodePCM(w Writer[PCM16Sample]) Writer[LPCM16Sample] {
	var out WriterFunc[LPCM16Sample] = func(raw LPCM16Sample) error {
		samples := raw.Decode()
		return w.WriteSample(samples)
	}
	return out
}

// EncodePCM returns a writer that accepts 16-bit samples, serializes each
// frame to a little-endian byte stream with PCM16Sample.Encode, and forwards
// the result to w. Errors from w are returned unchanged.
func EncodePCM(w Writer[LPCM16Sample]) Writer[PCM16Sample] {
	var out WriterFunc[PCM16Sample] = func(samples PCM16Sample) error {
		raw := samples.Encode()
		return w.WriteSample(raw)
	}
	return out
}

// MediaSampleWriter is a sink for media.Sample values from the pion webrtc
// media package. FromSampleWriter wraps such a sink as a generic Writer.
type MediaSampleWriter interface {
// WriteSample delivers one media.Sample to the sink and reports any failure.
WriteSample(sample media.Sample) error
}

// FromSampleWriter adapts a MediaSampleWriter to a Writer of any byte-slice
// sample type T.
//
// Every sample written is copied into a freshly allocated slice before being
// wrapped in a media.Sample, so the caller may reuse its buffer afterwards.
// Each media.Sample is given the fixed duration sampleDur.
func FromSampleWriter[T ~[]byte](w MediaSampleWriter, sampleDur time.Duration) Writer[T] {
	var out WriterFunc[T] = func(in T) error {
		payload := make([]byte, len(in))
		copy(payload, in)
		sample := media.Sample{
			Data:     payload,
			Duration: sampleDur,
		}
		return w.WriteSample(sample)
	}
	return out
}
94 changes: 94 additions & 0 deletions pkg/media/rtp/rtp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package rtp

import (
"github.com/pion/interceptor"
"github.com/pion/rtp"

"github.com/livekit/sip/pkg/media"
)

// Writer is a destination for outgoing RTP packets.
type Writer interface {
// WriteRTP sends a single RTP packet and reports any failure.
WriteRTP(p *rtp.Packet) error
}

// Reader is a source of incoming RTP packets.
type Reader interface {
// ReadRTP returns the next RTP packet together with its interceptor
// attributes, or an error if no packet could be read.
ReadRTP() (*rtp.Packet, interceptor.Attributes, error)
}

// Handler processes RTP packets one at a time; HandleLoop feeds it every
// packet obtained from a Reader.
type Handler interface {
// HandleRTP processes a single packet and reports any failure.
HandleRTP(p *rtp.Packet) error
}

// HandlerFunc adapts an ordinary function to the Handler interface.
type HandlerFunc func(p *rtp.Packet) error

// HandleRTP implements Handler by calling the function itself with p and
// returning its error.
func (f HandlerFunc) HandleRTP(p *rtp.Packet) error {
	err := f(p)
	return err
}

// HandleLoop repeatedly reads packets from r and passes each one to h.
//
// The interceptor attributes returned by the reader are discarded. The loop
// only ends when either the reader or the handler reports an error, and that
// error is returned; HandleLoop never returns nil.
func HandleLoop(r Reader, h Handler) error {
	for {
		pkt, _, readErr := r.ReadRTP()
		if readErr != nil {
			return readErr
		}
		if handleErr := h.HandleRTP(pkt); handleErr != nil {
			return handleErr
		}
	}
}

// NewStream creates a Stream that sends packets to w and advances the RTP
// timestamp by packetDur after every successfully written payload.
//
// The packet header starts with RTP version 2, a fixed SSRC of 5000, and a
// timestamp and sequence number of zero.
func NewStream(w Writer, packetDur uint32) *Stream {
	return &Stream{
		w:         w,
		packetDur: packetDur,
		p: rtp.Packet{
			Header: rtp.Header{
				Version:        2,
				SSRC:           5000, // TODO: why this magic number?
				Timestamp:      0,
				SequenceNumber: 0,
			},
		},
	}
}

// Packet is an alias for rtp.Packet from the pion rtp package.
type Packet = rtp.Packet

// Stream writes a sequence of payloads as RTP packets to a Writer, keeping
// the header state (timestamp and sequence number) between writes.
type Stream struct {
// w is the destination every packet is written to.
w Writer
// p is the single packet reused for all writes; its header carries the
// running timestamp and sequence number.
p Packet
// packetDur is the amount added to the RTP timestamp after each
// successfully written payload.
packetDur uint32
}

// WritePayload sends data as the payload of the stream's next RTP packet.
//
// The timestamp and sequence number are advanced only after the underlying
// writer accepts the packet; if it fails, its error is returned and the
// header is left untouched, so the next call reuses the same values.
func (s *Stream) WritePayload(data []byte) error {
	s.p.Payload = data
	err := s.w.WriteRTP(&s.p)
	if err == nil {
		s.p.Header.Timestamp += s.packetDur
		s.p.Header.SequenceNumber++
	}
	return err
}

// NewMediaStreamOut creates a MediaStreamOut that packetizes samples of the
// byte-slice type T into RTP packets written to w. packetDur is passed on to
// NewStream as the per-packet timestamp increment.
func NewMediaStreamOut[T ~[]byte](w Writer, packetDur uint32) *MediaStreamOut[T] {
	stream := NewStream(w, packetDur)
	return &MediaStreamOut[T]{s: stream}
}

// MediaStreamOut turns samples of type T into RTP packets by handing each
// sample to an underlying Stream.
type MediaStreamOut[T ~[]byte] struct {
	s *Stream
}

// WriteSample sends sample as the payload of the next RTP packet and returns
// any error from the underlying stream.
func (s *MediaStreamOut[T]) WriteSample(sample T) error {
	payload := []byte(sample)
	return s.s.WritePayload(payload)
}

// NewMediaStreamIn creates a MediaStreamIn that forwards the payload of every
// handled RTP packet to w as a sample of the byte-slice type T.
func NewMediaStreamIn[T ~[]byte](w media.Writer[T]) *MediaStreamIn[T] {
	in := &MediaStreamIn[T]{w: w}
	return in
}

// MediaStreamIn is an RTP handler that extracts packet payloads and passes
// them to a media writer.
type MediaStreamIn[T ~[]byte] struct {
	w media.Writer[T]
}

// HandleRTP converts the packet's payload to T and writes it to the wrapped
// writer, returning that writer's error.
func (s *MediaStreamIn[T]) HandleRTP(p *rtp.Packet) error {
	sample := T(p.Payload)
	return s.w.WriteSample(sample)
}
32 changes: 32 additions & 0 deletions pkg/media/ulaw/ulaw.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package ulaw

import (
"github.com/zaf/g711"

"github.com/livekit/sip/pkg/media"
)

// Sample is a byte slice holding u-law encoded audio.
type Sample []byte

// Decode converts the u-law data to a media.LPCM16Sample using
// g711.DecodeUlaw.
func (s Sample) Decode() media.LPCM16Sample {
	return media.LPCM16Sample(g711.DecodeUlaw([]byte(s)))
}

// Encode replaces the contents of s with the u-law encoding of data, as
// produced by g711.EncodeUlaw.
func (s *Sample) Encode(data media.LPCM16Sample) {
	*s = Sample(g711.EncodeUlaw([]byte(data)))
}

// Encode returns a writer that accepts u-law samples, converts each one to
// LPCM with Sample.Decode, and forwards the result to w.
//
// NOTE(review): despite its name, the returned writer decodes u-law into
// LPCM; the naming looks swapped compared with the opus package, where the
// function returning a Writer[Sample] is called Decode. Confirm the intended
// naming before relying on it.
func Encode(w media.Writer[media.LPCM16Sample]) media.Writer[Sample] {
	var out media.WriterFunc[Sample] = func(encoded Sample) error {
		return w.WriteSample(encoded.Decode())
	}
	return out
}

// Decode returns a writer that accepts LPCM samples, converts each one to
// u-law with Sample.Encode, and forwards the result to w.
//
// NOTE(review): despite its name, the returned writer encodes LPCM into
// u-law; the naming looks swapped compared with the opus package, where the
// function returning a Writer of PCM samples is called Encode. Confirm the
// intended naming before relying on it.
func Decode(w media.Writer[Sample]) media.Writer[media.LPCM16Sample] {
	var out media.WriterFunc[media.LPCM16Sample] = func(pcm media.LPCM16Sample) error {
		encoded := new(Sample)
		encoded.Encode(pcm)
		return w.WriteSample(*encoded)
	}
	return out
}
7 changes: 7 additions & 0 deletions pkg/mixer/mixer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"time"

"github.com/frostbyte73/core"

"github.com/livekit/sip/pkg/media"
)

const (
Expand Down Expand Up @@ -176,3 +178,8 @@ func (i *Input) Push(sample []int16) {
i.hasBuffered = true
}
}

// WriteSample hands sample to Push and always returns nil. Its signature
// matches the WriteSample method of media.Writer[media.PCM16Sample].
func (i *Input) WriteSample(sample media.PCM16Sample) error {
i.Push(sample)
return nil
}
37 changes: 22 additions & 15 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/emiago/sipgo/sip"
"github.com/icholy/digest"
"github.com/pion/sdp/v2"

"github.com/livekit/sip/pkg/media/rtp"
)

func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) {
Expand Down Expand Up @@ -107,13 +109,16 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) {
}

type inboundCall struct {
s *Server
tag string
from *sip.FromHeader
to *sip.ToHeader
src string
media mediaData
done atomic.Bool
s *Server
tag string
from *sip.FromHeader
to *sip.ToHeader
src string
rtpConn *MediaConn
audioHandler atomic.Pointer[rtp.Handler]
dtmf chan byte // buffered; DTMF digits as characters
lkRoom atomic.Pointer[Room] // LiveKit room; only populated after correct pin is entered
done atomic.Bool
}

func (s *Server) newInboundCall(tag string, from *sip.FromHeader, to *sip.ToHeader, src string) *inboundCall {
Expand All @@ -123,22 +128,22 @@ func (s *Server) newInboundCall(tag string, from *sip.FromHeader, to *sip.ToHead
from: from,
to: to,
src: src,
dtmf: make(chan byte, 10),
}
c.initMedia()
return c
}

func (c *inboundCall) handleInvite(req *sip.Request, tx sip.ServerTransaction) {
// Send initial request. In the best case scenario, we will immediately get a room name to join.
// Otherwise, we could ever learn that this number is not allowed and reject the call, or ask for pin if required.
// Otherwise, we could even learn that this number is not allowed and reject the call, or ask for pin if required.
roomName, identity, requirePin, rejectInvite := c.s.dispatchRuleHandler(c.from.Address.User, c.to.Address.User, c.src, "", false)
if rejectInvite {
sipErrorResponse(tx, req)
return
}

// We need to start media first, otherwise we won't be able to send audio prompts to the caller, or receive DTMF.
answerData, err := c.runMedia(req.Body())
answerData, err := c.runMediaConn(req.Body())
if err != nil {
sipErrorResponse(tx, req)
return
Expand All @@ -163,16 +168,18 @@ func (c *inboundCall) handleInvite(req *sip.Request, tx sip.ServerTransaction) {
}
}

func (c *inboundCall) runMedia(offerData []byte) ([]byte, error) {
addr, err := c.createMediaSession()
if err != nil {
func (c *inboundCall) runMediaConn(offerData []byte) (answerData []byte, _ error) {
conn := NewMediaConn()
conn.OnRTP(c)
if err := conn.Start("0.0.0.0"); err != nil {
return nil, err
}
c.rtpConn = conn
offer := sdp.SessionDescription{}
if err := offer.Unmarshal(offerData); err != nil {
return nil, err
}
return generateAnswer(offer, c.s.publicIp, addr.Port)
return generateAnswer(offer, c.s.publicIp, conn.LocalAddr().Port)
}

func (c *inboundCall) pinPrompt() {
Expand All @@ -183,7 +190,7 @@ func (c *inboundCall) pinPrompt() {
noPin := false
for {
select {
case b, ok := <-c.media.dtmf:
case b, ok := <-c.dtmf:
if !ok {
c.Close()
return
Expand Down
Loading

0 comments on commit 4310291

Please sign in to comment.