diff --git a/go.mod b/go.mod index f7d6fd69..a54825b6 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7 github.com/livekit/psrpc v0.5.1 github.com/livekit/server-sdk-go v1.1.1 + github.com/pion/interceptor v0.1.25 github.com/pion/rtp v1.8.3 github.com/pion/sdp/v2 v2.4.0 github.com/pion/webrtc/v3 v3.2.23 @@ -58,7 +59,6 @@ require ( github.com/pion/datachannel v1.5.5 // indirect github.com/pion/dtls/v2 v2.2.7 // indirect github.com/pion/ice/v2 v2.3.11 // indirect - github.com/pion/interceptor v0.1.25 // indirect github.com/pion/logging v0.2.2 // indirect github.com/pion/mdns v0.0.8 // indirect github.com/pion/randutil v0.1.0 // indirect diff --git a/pkg/media/media.go b/pkg/media/media.go new file mode 100644 index 00000000..e090e48f --- /dev/null +++ b/pkg/media/media.go @@ -0,0 +1,11 @@ +package media + +type Writer[T any] interface { + WriteSample(sample T) error +} + +type WriterFunc[T any] func(in T) error + +func (fnc WriterFunc[T]) WriteSample(in T) error { + return fnc(in) +} diff --git a/pkg/media/opus/opus.go b/pkg/media/opus/opus.go new file mode 100644 index 00000000..c0cd3be6 --- /dev/null +++ b/pkg/media/opus/opus.go @@ -0,0 +1,39 @@ +package opus + +import ( + "gopkg.in/hraban/opus.v2" + + "github.com/livekit/sip/pkg/media" +) + +type Sample []byte + +func Decode(w media.Writer[media.PCM16Sample], sampleRate int, channels int) (media.Writer[Sample], error) { + dec, err := opus.NewDecoder(sampleRate, channels) + if err != nil { + return nil, err + } + buf := make([]int16, 1000) + return media.WriterFunc[Sample](func(in Sample) error { + n, err := dec.Decode(in, buf) + if err != nil { + return err + } + return w.WriteSample(buf[:n]) + }), nil +} + +func Encode(w media.Writer[Sample], sampleRate int, channels int) (media.Writer[media.PCM16Sample], error) { + enc, err := opus.NewEncoder(sampleRate, channels, opus.AppVoIP) + if err != nil { + return nil, err + } + buf := make([]byte, 1024) + return media.WriterFunc[media.PCM16Sample](func(in media.PCM16Sample) error { + n, err := enc.Encode(in, buf) + if err != nil { + return err + } + return w.WriteSample(buf[:n]) + }), nil +} diff --git a/pkg/media/pcm.go b/pkg/media/pcm.go new file mode 100644 index 00000000..2d46f8ce --- /dev/null +++ b/pkg/media/pcm.go @@ -0,0 +1,68 @@ +package media + +import ( + "encoding/binary" + "time" + + "github.com/pion/webrtc/v3/pkg/media" +) + +func PlayAudio[T any](w Writer[T], sampleDur time.Duration, frames []T) error { + tick := time.NewTicker(sampleDur) + defer tick.Stop() + for range tick.C { + if len(frames) == 0 { + break + } + samples := frames[0] + frames = frames[1:] + if err := w.WriteSample(samples); err != nil { + return err + } + } + return nil +} + +type LPCM16Sample []byte + +func (s LPCM16Sample) Decode() PCM16Sample { + out := make(PCM16Sample, len(s)/2) + for i := 0; i < len(s); i += 2 { + out[i/2] = int16(binary.LittleEndian.Uint16(s[i:])) + } + return out +} + +type PCM16Sample []int16 + +func (s PCM16Sample) Encode() LPCM16Sample { + out := make(LPCM16Sample, len(s)*2) + for i, v := range s { + binary.LittleEndian.PutUint16(out[2*i:], uint16(v)) + } + return out +} + +func DecodePCM(w Writer[PCM16Sample]) Writer[LPCM16Sample] { + return WriterFunc[LPCM16Sample](func(in LPCM16Sample) error { + return w.WriteSample(in.Decode()) + }) +} + +func EncodePCM(w Writer[LPCM16Sample]) Writer[PCM16Sample] { + return WriterFunc[PCM16Sample](func(in PCM16Sample) error { + return w.WriteSample(in.Encode()) + }) +} + +type MediaSampleWriter interface { + WriteSample(sample media.Sample) error +} + +func FromSampleWriter[T ~[]byte](w MediaSampleWriter, sampleDur time.Duration) Writer[T] { + return WriterFunc[T](func(in T) error { + data := make([]byte, len(in)) + copy(data, in) + return w.WriteSample(media.Sample{Data: data, Duration: sampleDur}) + }) +} diff --git a/pkg/media/rtp/rtp.go b/pkg/media/rtp/rtp.go new file mode 100644 index 00000000..423df183 --- /dev/null +++ b/pkg/media/rtp/rtp.go @@ -0,0 +1,94 @@ +package rtp + +import ( + "github.com/pion/interceptor" + "github.com/pion/rtp" + + "github.com/livekit/sip/pkg/media" +) + +type Writer interface { + WriteRTP(p *rtp.Packet) error +} + +type Reader interface { + ReadRTP() (*rtp.Packet, interceptor.Attributes, error) +} + +type Handler interface { + HandleRTP(p *rtp.Packet) error +} + +type HandlerFunc func(p *rtp.Packet) error + +func (fnc HandlerFunc) HandleRTP(p *rtp.Packet) error { + return fnc(p) +} + +func HandleLoop(r Reader, h Handler) error { + for { + p, _, err := r.ReadRTP() + if err != nil { + return err + } + err = h.HandleRTP(p) + if err != nil { + return err + } + } +} + +func NewStream(w Writer, packetDur uint32) *Stream { + s := &Stream{w: w, packetDur: packetDur} + s.p = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + SSRC: 5000, // TODO: why this magic number? + Timestamp: 0, + SequenceNumber: 0, + }, + } + return s +} + +type Packet = rtp.Packet + +type Stream struct { + w Writer + p Packet + packetDur uint32 +} + +func (s *Stream) WritePayload(data []byte) error { + s.p.Payload = data + if err := s.w.WriteRTP(&s.p); err != nil { + return err + } + s.p.Header.Timestamp += s.packetDur + s.p.Header.SequenceNumber++ + return nil +} + +func NewMediaStreamOut[T ~[]byte](w Writer, packetDur uint32) *MediaStreamOut[T] { + return &MediaStreamOut[T]{s: NewStream(w, packetDur)} +} + +type MediaStreamOut[T ~[]byte] struct { + s *Stream +} + +func (s *MediaStreamOut[T]) WriteSample(sample T) error { + return s.s.WritePayload([]byte(sample)) +} + +func NewMediaStreamIn[T ~[]byte](w media.Writer[T]) *MediaStreamIn[T] { + return &MediaStreamIn[T]{w: w} +} + +type MediaStreamIn[T ~[]byte] struct { + w media.Writer[T] +} + +func (s *MediaStreamIn[T]) HandleRTP(p *rtp.Packet) error { + return s.w.WriteSample(T(p.Payload)) +} diff --git a/pkg/media/ulaw/ulaw.go b/pkg/media/ulaw/ulaw.go new file mode 100644 index 00000000..a288bb14 --- /dev/null +++ b/pkg/media/ulaw/ulaw.go @@ -0,0 +1,32 @@ +package ulaw + +import ( + "github.com/zaf/g711" + + "github.com/livekit/sip/pkg/media" +) + +type Sample []byte + +func (s Sample) Decode() media.LPCM16Sample { + return g711.DecodeUlaw(s) +} + +func (s *Sample) Encode(data media.LPCM16Sample) { + *s = g711.EncodeUlaw(data) +} + +func Encode(w media.Writer[media.LPCM16Sample]) media.Writer[Sample] { + return media.WriterFunc[Sample](func(in Sample) error { + out := in.Decode() + return w.WriteSample(out) + }) +} + +func Decode(w media.Writer[Sample]) media.Writer[media.LPCM16Sample] { + return media.WriterFunc[media.LPCM16Sample](func(in media.LPCM16Sample) error { + var s Sample + s.Encode(in) + return w.WriteSample(s) + }) +} diff --git a/pkg/mixer/mixer.go b/pkg/mixer/mixer.go index 7d84a952..3feeb4fe 100644 --- a/pkg/mixer/mixer.go +++ b/pkg/mixer/mixer.go @@ -20,6 +20,8 @@ import ( "time" "github.com/frostbyte73/core" + + "github.com/livekit/sip/pkg/media" ) const ( @@ -176,3 +178,8 @@ func (i *Input) Push(sample []int16) { i.hasBuffered = true } } + +func (i *Input) WriteSample(sample media.PCM16Sample) error { + i.Push(sample) + return nil +} diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 6b3c1a6e..01666144 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -9,6 +9,8 @@ import ( "github.com/emiago/sipgo/sip" "github.com/icholy/digest" "github.com/pion/sdp/v2" + + "github.com/livekit/sip/pkg/media/rtp" ) func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) { @@ -107,13 +109,16 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) { } type inboundCall struct { - s *Server - tag string - from *sip.FromHeader - to *sip.ToHeader - src string - media mediaData - done atomic.Bool + s *Server + tag string + from *sip.FromHeader + to *sip.ToHeader + src string + rtpConn *MediaConn + audioHandler atomic.Pointer[rtp.Handler] + dtmf chan byte // buffered; DTMF digits as characters + lkRoom atomic.Pointer[Room] // LiveKit room; only populated after correct pin is entered + done atomic.Bool } func (s *Server) newInboundCall(tag string, from *sip.FromHeader, to *sip.ToHeader, src string) *inboundCall { @@ -123,14 +128,14 @@ func (s *Server) newInboundCall(tag string, from *sip.FromHeader, to *sip.ToHead from: from, to: to, src: src, + dtmf: make(chan byte, 10), } - c.initMedia() return c } func (c *inboundCall) handleInvite(req *sip.Request, tx sip.ServerTransaction) { // Send initial request. In the best case scenario, we will immediately get a room name to join. - // Otherwise, we could ever learn that this number is not allowed and reject the call, or ask for pin if required. + // Otherwise, we could even learn that this number is not allowed and reject the call, or ask for pin if required. roomName, identity, requirePin, rejectInvite := c.s.dispatchRuleHandler(c.from.Address.User, c.to.Address.User, c.src, "", false) if rejectInvite { sipErrorResponse(tx, req) @@ -138,7 +143,7 @@ func (c *inboundCall) handleInvite(req *sip.Request, tx sip.ServerTransaction) { } // We need to start media first, otherwise we won't be able to send audio prompts to the caller, or receive DTMF. - answerData, err := c.runMedia(req.Body()) + answerData, err := c.runMediaConn(req.Body()) if err != nil { sipErrorResponse(tx, req) return @@ -163,16 +168,18 @@ func (c *inboundCall) handleInvite(req *sip.Request, tx sip.ServerTransaction) { } } -func (c *inboundCall) runMedia(offerData []byte) ([]byte, error) { - addr, err := c.createMediaSession() - if err != nil { +func (c *inboundCall) runMediaConn(offerData []byte) (answerData []byte, _ error) { + conn := NewMediaConn() + conn.OnRTP(c) + if err := conn.Start("0.0.0.0"); err != nil { return nil, err } + c.rtpConn = conn offer := sdp.SessionDescription{} if err := offer.Unmarshal(offerData); err != nil { return nil, err } - return generateAnswer(offer, c.s.publicIp, addr.Port) + return generateAnswer(offer, c.s.publicIp, conn.LocalAddr().Port) } func (c *inboundCall) pinPrompt() { @@ -183,7 +190,7 @@ func (c *inboundCall) pinPrompt() { noPin := false for { select { - case b, ok := <-c.media.dtmf: + case b, ok := <-c.dtmf: if !ok { c.Close() return diff --git a/pkg/sip/media.go b/pkg/sip/media.go index 70b01bd0..12225509 100644 --- a/pkg/sip/media.go +++ b/pkg/sip/media.go @@ -15,168 +15,55 @@ package sip import ( - "bytes" - "encoding/binary" "log" - "net" - "sync/atomic" "time" - "github.com/at-wat/ebml-go" - "github.com/at-wat/ebml-go/webm" - "github.com/pion/rtp" - "github.com/pion/webrtc/v3" - "github.com/pion/webrtc/v3/pkg/media" - "github.com/zaf/g711" - "gopkg.in/hraban/opus.v2" - - "github.com/livekit/sip/pkg/mixer" + "github.com/livekit/sip/pkg/media" + "github.com/livekit/sip/pkg/media/rtp" + "github.com/livekit/sip/pkg/media/ulaw" "github.com/livekit/sip/res" - - lksdk "github.com/livekit/server-sdk-go" ) const ( - channels = 1 - sampleRate = 8000 + channels = 1 + sampleRate = 8000 + sampleDur = 20 * time.Millisecond + sampleDurPart = int(time.Second / sampleDur) + rtpPacketDur = uint32(sampleRate / sampleDurPart) ) type mediaRes struct { - enterPin [][]int16 - roomJoin [][]int16 - wrongPin [][]int16 + enterPin []media.PCM16Sample + roomJoin []media.PCM16Sample + wrongPin []media.PCM16Sample } func (s *Server) initMediaRes() { - s.res.enterPin = audioFileToFrames(res.EnterPinMkv) - s.res.roomJoin = audioFileToFrames(res.RoomJoinMkv) - s.res.wrongPin = audioFileToFrames(res.WrongPinMkv) -} - -func audioFileToFrames(data []byte) [][]int16 { - var ret struct { - Header webm.EBMLHeader `ebml:"EBML"` - Segment webm.Segment `ebml:"Segment"` - } - if err := ebml.Unmarshal(bytes.NewReader(data), &ret); err != nil { - panic(err) - } - - var frames [][]int16 - for _, cluster := range ret.Segment.Cluster { - for _, block := range cluster.SimpleBlock { - for _, data := range block.Data { - decoded := g711.DecodeUlaw(data) - pcm := make([]int16, 0, len(decoded)/2) - for i := 0; i < len(decoded); i += 2 { - sample := binary.LittleEndian.Uint16(decoded[i:]) - pcm = append(pcm, int16(sample)) - } - frames = append(frames, pcm) - } - } - } - return frames -} - -type mediaData struct { - conn *net.UDPConn - mix *mixer.Mixer - enc *opus.Encoder - dest atomic.Pointer[net.UDPAddr] - track atomic.Pointer[webrtc.TrackLocalStaticSample] - room atomic.Pointer[lksdk.Room] - dtmf chan byte -} - -func (c *inboundCall) initMedia() { - c.media.dtmf = make(chan byte, 10) + s.res.enterPin = readMkvAudioFile(res.EnterPinMkv) + s.res.roomJoin = readMkvAudioFile(res.RoomJoinMkv) + s.res.wrongPin = readMkvAudioFile(res.WrongPinMkv) } func (c *inboundCall) closeMedia() { - if p := c.media.room.Load(); p != nil { - p.Disconnect() - c.media.room.Store(nil) - } - if p := c.media.track.Load(); p != nil { - c.media.track.Store(nil) - } - c.media.mix.Stop() - c.media.conn.Close() - close(c.media.dtmf) -} - -func (c *inboundCall) createMediaSession() (*net.UDPAddr, error) { - conn, err := net.ListenUDP("udp", &net.UDPAddr{ - Port: 0, - IP: net.ParseIP("0.0.0.0"), - }) - if err != nil { - return nil, err - } - c.media.conn = conn - - mixerRtpPkt := &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - SSRC: 5000, - }, + c.audioHandler.Store(nil) + if p := c.lkRoom.Load(); p != nil { + p.Close() + c.lkRoom.Store(nil) } - c.media.mix = mixer.NewMixer(func(audioSample []byte) { - dstAddr := c.media.dest.Load() - if dstAddr == nil { - return - } - - mixerRtpPkt.Payload = g711.EncodeUlaw(audioSample) - - raw, err := mixerRtpPkt.Marshal() - if err != nil { - return - } - - if _, err = c.media.conn.WriteTo(raw, dstAddr); err != nil { - return - } - - mixerRtpPkt.Header.Timestamp += 160 - mixerRtpPkt.Header.SequenceNumber += 1 - }, 8000) - - enc, err := opus.NewEncoder(sampleRate, channels, opus.AppVoIP) - if err != nil { - return nil, err - } - c.media.enc = enc - - go c.readMedia() - return conn.LocalAddr().(*net.UDPAddr), nil + c.rtpConn.Close() + close(c.dtmf) } -func (c *inboundCall) readMedia() { - buff := make([]byte, 1500) - var rtpPkt rtp.Packet - for { - n, srcAddr, err := c.media.conn.ReadFromUDP(buff) - if err != nil { - return - } - c.media.dest.Store(srcAddr) - - if err := rtpPkt.Unmarshal(buff[:n]); err != nil { - continue - } - c.handleRTP(&rtpPkt) - } -} - -func (c *inboundCall) handleRTP(p *rtp.Packet) { +func (c *inboundCall) HandleRTP(p *rtp.Packet) error { if p.Marker && p.PayloadType == 101 { c.handleDTMF(p.Payload) - return + return nil } // TODO: Audio data appears to be coming with PayloadType=0, so maybe enforce it? - c.handleAudio(p.Payload) + if h := c.audioHandler.Load(); h != nil { + return (*h).HandleRTP(p) + } + return nil } var dtmfEventToChar = [256]byte{ @@ -194,95 +81,33 @@ func (c *inboundCall) handleDTMF(data []byte) { // RFC2833 b := dtmfEventToChar[ev] // We should have enough buffer here. select { - case c.media.dtmf <- b: + case c.dtmf <- b: default: } } -func (c *inboundCall) handleAudio(audioData []byte) { - track := c.media.track.Load() - if track == nil { - return - } - decoded := g711.DecodeUlaw(audioData) - - var pcm []int16 - for i := 0; i < len(decoded); i += 2 { - sample := binary.LittleEndian.Uint16(decoded[i:]) - pcm = append(pcm, int16(sample)) - } - - data := make([]byte, 1000) - n, err := c.media.enc.Encode(pcm, data) - if err != nil { - return - } - if err = track.WriteSample(media.Sample{Data: data[:n], Duration: time.Millisecond * 20}); err != nil { - return - } -} - func (c *inboundCall) createLiveKitParticipant(roomName, participantIdentity string) error { - roomCB := &lksdk.RoomCallback{ - ParticipantCallback: lksdk.ParticipantCallback{ - OnTrackSubscribed: func(track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) { - if track.Kind() == webrtc.RTPCodecTypeVideo { - if err := publication.SetSubscribed(false); err != nil { - log.Println(err) - } - return - } - - decoder, err := opus.NewDecoder(8000, 1) - if err != nil { - return - } - - input := c.media.mix.AddInput() - samples := make([]int16, 1000) - for { - rtpPkt, _, err := track.ReadRTP() - if err != nil { - break - } - - n, err := decoder.Decode(rtpPkt.Payload, samples) - if err != nil { - break - } - - input.Push(samples[:n]) - } - c.media.mix.RemoveInput(input) - }, - }, - } - - room, err := lksdk.ConnectToRoom(c.s.conf.WsUrl, - lksdk.ConnectInfo{ - APIKey: c.s.conf.ApiKey, - APISecret: c.s.conf.ApiSecret, - RoomName: roomName, - ParticipantIdentity: participantIdentity, - }, - roomCB, - ) + room, err := ConnectToRoom(c.s.conf, roomName, participantIdentity) if err != nil { return err } - - track, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion") + local, err := room.NewParticipant() if err != nil { + _ = room.Close() return err } + c.lkRoom.Store(room) + + // Decoding pipeline (SIP -> LK) + lpcm := media.DecodePCM(local) + law := ulaw.Encode(lpcm) + var h rtp.Handler = rtp.NewMediaStreamIn(law) + c.audioHandler.Store(&h) + + // Encoding pipeline (LK -> SIP) + s := rtp.NewMediaStreamOut[ulaw.Sample](c.rtpConn, rtpPacketDur) + room.SetOutput(ulaw.Decode(s)) - if _, err = room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{ - Name: participantIdentity, - }); err != nil { - return err - } - c.media.track.Store(track) - c.media.room.Store(room) return nil } @@ -294,18 +119,9 @@ func (c *inboundCall) joinRoom(roomName, identity string) { } } -func (c *inboundCall) playAudio(frames [][]int16) { - input := c.media.mix.AddInput() - defer c.media.mix.RemoveInput(input) - - tick := time.NewTicker(20 * time.Millisecond) - defer tick.Stop() - for range tick.C { - if len(frames) == 0 { - break - } - samples := frames[0] - frames = frames[1:] - input.Push(samples) - } +func (c *inboundCall) playAudio(frames []media.PCM16Sample) { + r := c.lkRoom.Load() + t := r.NewTrack() + defer t.Close() + t.PlayAudio(frames) } diff --git a/pkg/sip/media_file.go b/pkg/sip/media_file.go new file mode 100644 index 00000000..c5ce522b --- /dev/null +++ b/pkg/sip/media_file.go @@ -0,0 +1,33 @@ +package sip + +import ( + "bytes" + + "github.com/at-wat/ebml-go" + "github.com/at-wat/ebml-go/webm" + + "github.com/livekit/sip/pkg/media" + "github.com/livekit/sip/pkg/media/ulaw" +) + +func readMkvAudioFile(data []byte) []media.PCM16Sample { + var ret struct { + Header webm.EBMLHeader `ebml:"EBML"` + Segment webm.Segment `ebml:"Segment"` + } + if err := ebml.Unmarshal(bytes.NewReader(data), &ret); err != nil { + panic(err) + } + + var frames []media.PCM16Sample + for _, cluster := range ret.Segment.Cluster { + for _, block := range cluster.SimpleBlock { + for _, data := range block.Data { + lpcm := ulaw.Sample(data).Decode() + pcm := lpcm.Decode() + frames = append(frames, pcm) + } + } + } + return frames +} diff --git a/pkg/sip/media_sip.go b/pkg/sip/media_sip.go new file mode 100644 index 00000000..4b1a16e9 --- /dev/null +++ b/pkg/sip/media_sip.go @@ -0,0 +1,98 @@ +package sip + +import ( + "net" + "sync" + "sync/atomic" + + "github.com/livekit/sip/pkg/media/rtp" +) + +var _ rtp.Writer = (*MediaConn)(nil) + +func NewMediaConn() *MediaConn { + return &MediaConn{} +} + +type MediaConn struct { + wmu sync.Mutex + conn *net.UDPConn + + dest atomic.Pointer[net.UDPAddr] + onRTP rtp.Handler +} + +func (c *MediaConn) LocalAddr() *net.UDPAddr { + return c.conn.LocalAddr().(*net.UDPAddr) +} + +func (c *MediaConn) DestAddr() *net.UDPAddr { + return c.dest.Load() +} + +func (c *MediaConn) SetDestAddr(addr *net.UDPAddr) { + c.dest.Store(addr) +} + +func (c *MediaConn) OnRTP(h rtp.Handler) { + c.onRTP = h +} + +func (c *MediaConn) Close() error { + if c.conn != nil { + c.conn.Close() + c.conn = nil + } + return nil +} + +func (c *MediaConn) Start(listenAddr string) error { + if listenAddr == "" { + listenAddr = "0.0.0.0" + } + var err error + c.conn, err = net.ListenUDP("udp", &net.UDPAddr{ + IP: net.ParseIP(listenAddr), + Port: 0, + }) + if err != nil { + return err + } + go c.readLoop() + return nil +} + +func (c *MediaConn) readLoop() { + buf := make([]byte, 1500) // MTU + var p rtp.Packet + for { + n, srcAddr, err := c.conn.ReadFromUDP(buf) + if err != nil { + return + } + c.dest.Store(srcAddr) + + p = rtp.Packet{} + if err := p.Unmarshal(buf[:n]); err != nil { + continue + } + if c.onRTP != nil { + _ = c.onRTP.HandleRTP(&p) + } + } +} + +func (c *MediaConn) WriteRTP(p *rtp.Packet) error { + addr := c.dest.Load() + if addr == nil { + return nil + } + data, err := p.Marshal() + if err != nil { + return err + } + c.wmu.Lock() + defer c.wmu.Unlock() + _, err = c.conn.WriteTo(data, addr) + return err +} diff --git a/pkg/sip/room.go b/pkg/sip/room.go new file mode 100644 index 00000000..0eb11543 --- /dev/null +++ b/pkg/sip/room.go @@ -0,0 +1,139 @@ +package sip + +import ( + "log" + "sync/atomic" + + lksdk "github.com/livekit/server-sdk-go" + "github.com/pion/webrtc/v3" + + "github.com/livekit/sip/pkg/config" + "github.com/livekit/sip/pkg/media" + "github.com/livekit/sip/pkg/media/opus" + "github.com/livekit/sip/pkg/media/rtp" + "github.com/livekit/sip/pkg/mixer" +) + +type Room struct { + room *lksdk.Room + mix *mixer.Mixer + out atomic.Pointer[media.Writer[media.LPCM16Sample]] + identity string +} + +func ConnectToRoom(conf *config.Config, roomName string, identity string) (*Room, error) { + r := &Room{ + identity: identity, + } + r.mix = mixer.NewMixer(func(data []byte) { + sample := media.LPCM16Sample(data) + if out := r.Output(); out == nil { + _ = out.WriteSample(sample) + } + }, sampleRate) + + room, err := lksdk.ConnectToRoom(conf.WsUrl, + lksdk.ConnectInfo{ + APIKey: conf.ApiKey, + APISecret: conf.ApiSecret, + RoomName: roomName, + ParticipantIdentity: identity, + }, + &lksdk.RoomCallback{ + ParticipantCallback: lksdk.ParticipantCallback{ + OnTrackSubscribed: func(track *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) { + if track.Kind() != webrtc.RTPCodecTypeAudio { + if err := pub.SetSubscribed(false); err != nil { + log.Println(err) + } + return + } + + mtrack := r.NewTrack() + defer mtrack.Close() + + odec, err := opus.Decode(mtrack, sampleRate, channels) + if err != nil { + return + } + h := rtp.NewMediaStreamIn[opus.Sample](odec) + _ = rtp.HandleLoop(track, h) + }, + }, + }, + ) + if err != nil { + return nil, err + } + r.room = room + return r, nil +} + +func (r *Room) Output() media.Writer[media.LPCM16Sample] { + ptr := r.out.Load() + if ptr == nil { + return nil + } + return *ptr +} + +func (r *Room) SetOutput(out media.Writer[media.LPCM16Sample]) { + if out == nil { + r.out.Store(nil) + } else { + r.out.Store(&out) + } +} + +func (r *Room) Close() error { + if r.room == nil { + r.room.Disconnect() + r.room = nil + } + if r.mix != nil { + r.mix.Stop() + r.mix = nil + } + return nil +} + +func (r *Room) NewParticipant() (media.Writer[media.PCM16Sample], error) { + track, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion") + if err != nil { + return nil, err + } + if _, err = r.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{ + Name: r.identity, + }); err != nil { + return nil, err + } + ow := media.FromSampleWriter[opus.Sample](track, sampleDur) + pw, err := opus.Encode(ow, sampleRate, channels) + if err != nil { + return nil, err + } + return pw, nil +} + +func (r *Room) NewTrack() *Track { + inp := r.mix.AddInput() + return &Track{room: r, inp: inp} +} + +type Track struct { + room *Room + inp *mixer.Input +} + +func (t *Track) Close() error { + t.room.mix.RemoveInput(t.inp) + return nil +} + +func (t *Track) PlayAudio(frames []media.PCM16Sample) { + _ = media.PlayAudio[media.PCM16Sample](t, sampleDur, frames) +} + +func (t *Track) WriteSample(pcm media.PCM16Sample) error { + return t.inp.WriteSample(pcm) +} diff --git a/pkg/sip/signaling.go b/pkg/sip/signaling.go index cff1caa7..13b0a932 100644 --- a/pkg/sip/signaling.go +++ b/pkg/sip/signaling.go @@ -62,5 +62,8 @@ func generateAnswer(offer sdp.SessionDescription, publicIp string, rtpListenerPo }, } + // Static compiler check for sample rate hardcoded above. + var _ = [1]struct{}{}[8000-sampleRate] + return answer.Marshal() }