Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow requesting a ringtone during SIP transfers #207

Merged
merged 7 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/jfreymuth/oggvorbis v1.0.5
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598
github.com/livekit/protocol v1.26.1-0.20241016113321-d16f740cf07b
github.com/livekit/protocol v1.26.1-0.20241022031344-538889e5de0a
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b
github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.26.1-0.20241016113321-d16f740cf07b h1:kbGOwqbLMPTw8zMn8vluHoJIR+NjLDWbsMBI8GWJwzQ=
github.com/livekit/protocol v1.26.1-0.20241016113321-d16f740cf07b/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/protocol v1.26.1-0.20241022031344-538889e5de0a h1:31YXXJLEwCflp7KEe9rRAwmONyCwHFujTl4MdxegTxw=
github.com/livekit/protocol v1.26.1-0.20241022031344-538889e5de0a/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b h1:R1GpKwVbSYsG08k5sIkNCukvnrkOE18R8IO1YeujR8o=
Expand Down
2 changes: 1 addition & 1 deletion pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea
user: req.Username,
pass: req.Password,
dtmf: req.Dtmf,
ringtone: req.PlayRingtone,
dialtone: req.PlayDialtone,
headers: req.Headers,
headersToAttrs: req.HeadersToAttributes,
ringingTimeout: req.RingingTimeout.AsDuration(),
Expand Down
37 changes: 35 additions & 2 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
"context"
"errors"
"fmt"
"math"
"net/netip"
"slices"
"sync"
"sync/atomic"
"time"

"github.com/emiago/sipgo/sip"
"github.com/frostbyte73/core"
"github.com/icholy/digest"

"github.com/livekit/protocol/livekit"
Expand All @@ -39,6 +41,7 @@ import (
"github.com/livekit/sip/pkg/media"
"github.com/livekit/sip/pkg/media/dtmf"
"github.com/livekit/sip/pkg/media/rtp"
"github.com/livekit/sip/pkg/media/tones"
"github.com/livekit/sip/pkg/stats"
"github.com/livekit/sip/res"
)
Expand Down Expand Up @@ -274,6 +277,7 @@ type inboundCall struct {
joinDur func() time.Duration
forwardDTMF atomic.Bool
done atomic.Bool
started core.Fuse
}

func (s *Server) newInboundCall(
Expand Down Expand Up @@ -427,6 +431,9 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
return // already sent a response
}
}

c.started.Break()

// Wait for the caller to terminate the call.
select {
case <-ctx.Done():
Expand Down Expand Up @@ -766,8 +773,34 @@ func (c *inboundCall) handleDTMF(tone dtmf.Event) {
}
}

func (c *inboundCall) transferCall(ctx context.Context, transferTo string) error {
err := c.cc.TransferCall(ctx, transferTo)
func (c *inboundCall) transferCall(ctx context.Context, transferTo string, dialtone bool) error {
var err error

if dialtone && c.started.IsBroken() && !c.done.Load() {
const ringVolume = math.MaxInt16 / 2
rctx, rcancel := context.WithCancel(ctx)
defer rcancel()

// mute the room audio to the SIP participant
w := c.lkRoom.SwapOutput(nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since w may contain C resampler, we need to make sure it's either set back via SwapOutput, or closed if it's no longer needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for confusing, but this still is not closed. The c.media.GetAudioWriter() is fine, it will be closed when c.media is closed. The problem is with w - you detach it from the room, so it won't be closed if the transfer succeeds. It should probably be in the defer below - it either swaps w back to the room, or should close it.


defer func() {
if err != nil && !c.done.Load() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally this should check an actual return var of the function. I usually declare it with a different name to make sure it's the right one:

func (c *inboundCall) transferCall(...) (gerr error) {
	// ...
	defer func() {
		if gerr != nil {
			// ...
		}
	}()

c.lkRoom.SwapOutput(w)
} else {
w.Close()
}
}()

go func() {
aw := c.media.GetAudioWriter()

tones.Play(rctx, aw, ringVolume, tones.ETSIRinging)
aw.Close()
}()
}

err = c.cc.TransferCall(ctx, transferTo)
if err != nil {
c.log.Infow("inbound call failed to transfer", "error", err, "transferTo", transferTo)
return err
Expand Down
53 changes: 39 additions & 14 deletions pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type sipOutboundConfig struct {
user string
pass string
dtmf string
ringtone bool
dialtone bool
headers map[string]string
headersToAttrs map[string]string
ringingTimeout time.Duration
Expand All @@ -63,15 +63,15 @@ type outboundCall struct {
log logger.Logger
cc *sipOutbound
media *MediaPort
started core.Fuse
stopped core.Fuse
closing core.Fuse

mu sync.RWMutex
mon *stats.CallMonitor
lkRoom *Room
lkRoomIn media.PCM16Writer // output to room; OPUS at 48k
sipConf sipOutboundConfig
sipRunning bool
mu sync.RWMutex
mon *stats.CallMonitor
lkRoom *Room
lkRoomIn media.PCM16Writer // output to room; OPUS at 48k
sipConf sipOutboundConfig
}

func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Logger, id LocalTag, room RoomConfig, sipConf sipOutboundConfig) (*outboundCall, error) {
Expand Down Expand Up @@ -223,6 +223,7 @@ func (c *outboundCall) ConnectSIP(ctx context.Context) error {
return fmt.Errorf("update SIP failed: %w", err)
}
c.connectMedia()
c.started.Break()
c.lkRoom.Subscribe()
c.log.Infow("Outbound SIP call established")
return nil
Expand All @@ -247,7 +248,7 @@ func (c *outboundCall) connectToRoom(ctx context.Context, lkNew RoomConfig) erro
if err := r.Connect(c.c.conf, lkNew); err != nil {
return err
}
// We have to create the track early because we might play a ringtone while SIP connects.
// We have to create the track early because we might play a dialtone while SIP connects.
// Thus, we are forced to set full sample rate here instead of letting the codec adapt to the SIP source sample rate.
local, err := r.NewParticipantTrack(RoomSampleRate)
if err != nil {
Expand All @@ -260,12 +261,12 @@ func (c *outboundCall) connectToRoom(ctx context.Context, lkNew RoomConfig) erro
}

func (c *outboundCall) dialSIP(ctx context.Context) error {
if c.sipConf.ringtone {
if c.sipConf.dialtone {
const ringVolume = math.MaxInt16 / 2
rctx, rcancel := context.WithCancel(ctx)
defer rcancel()

// Play a ringtone to the room while participant connects
// Play dialtone to the room while participant connects
go func() {
rctx, span := tracer.Start(rctx, "tones.Play")
defer span.End()
Expand All @@ -286,7 +287,6 @@ func (c *outboundCall) dialSIP(ctx context.Context) error {
}
c.setStatus(CallActive)

c.sipRunning = true
return nil
}

Expand Down Expand Up @@ -327,7 +327,6 @@ func sipResponse(ctx context.Context, tx sip.ClientTransaction, stop <-chan stru
func (c *outboundCall) stopSIP(reason string) {
c.mon.CallTerminate(reason)
c.cc.Close()
c.sipRunning = false
}

func (c *outboundCall) setStatus(v CallStatus) {
Expand Down Expand Up @@ -433,8 +432,34 @@ func (c *outboundCall) handleDTMF(ev dtmf.Event) {
}, lksdk.WithDataPublishReliable(true))
}

func (c *outboundCall) transferCall(ctx context.Context, transferTo string) error {
err := c.cc.transferCall(ctx, transferTo)
func (c *outboundCall) transferCall(ctx context.Context, transferTo string, dialtone bool) error {
var err error

if dialtone && c.started.IsBroken() && !c.stopped.IsBroken() {
const ringVolume = math.MaxInt16 / 2
rctx, rcancel := context.WithCancel(ctx)
defer rcancel()

// mute the room audio to the SIP participant
w := c.lkRoom.SwapOutput(nil)

defer func() {
if err != nil && !c.stopped.IsBroken() {
c.lkRoom.SwapOutput(w)
} else {
w.Close()
}
}()

go func() {
aw := c.media.GetAudioWriter()

tones.Play(rctx, aw, ringVolume, tones.ETSIRinging)
aw.Close()
}()
}

err = c.cc.transferCall(ctx, transferTo)
if err != nil {
c.log.Infow("outound call failed to transfer", "error", err, "transferTo", transferTo)
return err
Expand Down
8 changes: 4 additions & 4 deletions pkg/sip/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalT
ctx, cdone := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
defer cdone()

err := s.processParticipantTransfer(ctx, req.SipCallId, req.TransferTo)
err := s.processParticipantTransfer(ctx, req.SipCallId, req.TransferTo, req.PlayDialtone)
transfetResult.Store(&err)
close(done)

Expand All @@ -177,14 +177,14 @@ func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalT
}
}

func (s *Service) processParticipantTransfer(ctx context.Context, callID string, transferTo string) error {
func (s *Service) processParticipantTransfer(ctx context.Context, callID string, transferTo string, dialtone bool) error {
// Look for call both in client (outbound) and server (inbound)
s.cli.cmu.Lock()
out := s.cli.activeCalls[LocalTag(callID)]
s.cli.cmu.Unlock()

if out != nil {
err := out.transferCall(ctx, transferTo)
err := out.transferCall(ctx, transferTo, dialtone)
if err != nil {
return err
}
Expand All @@ -197,7 +197,7 @@ func (s *Service) processParticipantTransfer(ctx context.Context, callID string,
s.srv.cmu.Unlock()

if in != nil {
err := in.transferCall(ctx, transferTo)
err := in.transferCall(ctx, transferTo, dialtone)
if err != nil {
return err
}
Expand Down
Loading