Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow requesting a ringtone during SIP transfers #207

Merged
merged 7 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go 1.22.7

toolchain go1.22.8

replace github.com/livekit/protocol => ../protocol

require (
github.com/at-wat/ebml-go v0.17.1
github.com/emiago/sipgo v0.13.1
Expand Down
32 changes: 30 additions & 2 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
"context"
"errors"
"fmt"
"math"
"net/netip"
"slices"
"sync"
"sync/atomic"
"time"

"github.com/emiago/sipgo/sip"
"github.com/frostbyte73/core"
"github.com/icholy/digest"

"github.com/livekit/protocol/livekit"
Expand All @@ -39,6 +41,7 @@ import (
"github.com/livekit/sip/pkg/media"
"github.com/livekit/sip/pkg/media/dtmf"
"github.com/livekit/sip/pkg/media/rtp"
"github.com/livekit/sip/pkg/media/tones"
"github.com/livekit/sip/pkg/stats"
"github.com/livekit/sip/res"
)
Expand Down Expand Up @@ -274,6 +277,7 @@ type inboundCall struct {
joinDur func() time.Duration
forwardDTMF atomic.Bool
done atomic.Bool
started core.Fuse
}

func (s *Server) newInboundCall(
Expand Down Expand Up @@ -427,6 +431,9 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
return // already sent a response
}
}

c.started.Break()

// Wait for the caller to terminate the call.
select {
case <-ctx.Done():
Expand Down Expand Up @@ -766,8 +773,29 @@ func (c *inboundCall) handleDTMF(tone dtmf.Event) {
}
}

func (c *inboundCall) transferCall(ctx context.Context, transferTo string) error {
err := c.cc.TransferCall(ctx, transferTo)
func (c *inboundCall) transferCall(ctx context.Context, transferTo string, ringtone bool) error {
var err error

if ringtone && c.started.IsBroken() && !c.done.Load() {
const ringVolume = math.MaxInt16 / 2
rctx, rcancel := context.WithCancel(ctx)
defer rcancel()

// mute the room audio to the SIP participant
w := c.lkRoom.SwapOutput(nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since w may contain C resampler, we need to make sure it's either set back via SwapOutput, or closed if it's no longer needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for confusing, but this still is not closed. The c.media.GetAudioWriter() is fine, it will be closed when c.media is closed. The problem is with w - you detach it from the room, so it won't be closed if the transfer succeeds. It should probably be in the defer below - it either swaps w back to the room, or should close it.


defer func() {
if err != nil && !c.done.Load() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally this should check an actual return var of the function. I usually declare it with a different name to make sure it's the right one:

func (c *inboundCall) transferCall(...) (gerr error) {
	// ...
	defer func() {
		if gerr != nil {
			// ...
		}
	}()

c.lkRoom.SwapOutput(w)
}
}()

go func() {
tones.Play(rctx, c.media.GetAudioWriter(), ringVolume, tones.ETSIRinging)
}()
}

err = c.cc.TransferCall(ctx, transferTo)
if err != nil {
c.log.Infow("inbound call failed to transfer", "error", err, "transferTo", transferTo)
return err
Expand Down
40 changes: 30 additions & 10 deletions pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ type outboundCall struct {
log logger.Logger
cc *sipOutbound
media *MediaPort
started core.Fuse
stopped core.Fuse
closing core.Fuse

mu sync.RWMutex
mon *stats.CallMonitor
lkRoom *Room
lkRoomIn media.PCM16Writer // output to room; OPUS at 48k
sipConf sipOutboundConfig
sipRunning bool
mu sync.RWMutex
mon *stats.CallMonitor
lkRoom *Room
lkRoomIn media.PCM16Writer // output to room; OPUS at 48k
sipConf sipOutboundConfig
}

func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Logger, id LocalTag, room RoomConfig, sipConf sipOutboundConfig) (*outboundCall, error) {
Expand Down Expand Up @@ -223,6 +223,7 @@ func (c *outboundCall) ConnectSIP(ctx context.Context) error {
return fmt.Errorf("update SIP failed: %w", err)
}
c.connectMedia()
c.started.Break()
c.lkRoom.Subscribe()
c.log.Infow("Outbound SIP call established")
return nil
Expand Down Expand Up @@ -286,7 +287,6 @@ func (c *outboundCall) dialSIP(ctx context.Context) error {
}
c.setStatus(CallActive)

c.sipRunning = true
return nil
}

Expand Down Expand Up @@ -327,7 +327,6 @@ func sipResponse(ctx context.Context, tx sip.ClientTransaction, stop <-chan stru
func (c *outboundCall) stopSIP(reason string) {
c.mon.CallTerminate(reason)
c.cc.Close()
c.sipRunning = false
}

func (c *outboundCall) setStatus(v CallStatus) {
Expand Down Expand Up @@ -433,8 +432,29 @@ func (c *outboundCall) handleDTMF(ev dtmf.Event) {
}, lksdk.WithDataPublishReliable(true))
}

func (c *outboundCall) transferCall(ctx context.Context, transferTo string) error {
err := c.cc.transferCall(ctx, transferTo)
func (c *outboundCall) transferCall(ctx context.Context, transferTo string, ringtone bool) error {
var err error

if ringtone && c.started.IsBroken() && !c.stopped.IsBroken() {
const ringVolume = math.MaxInt16 / 2
rctx, rcancel := context.WithCancel(ctx)
defer rcancel()

// mute the room audio to the SIP participant
w := c.lkRoom.SwapOutput(nil)

defer func() {
if err != nil && !c.stopped.IsBroken() {
c.lkRoom.SwapOutput(w)
}
}()

go func() {
tones.Play(rctx, c.media.GetAudioWriter(), ringVolume, tones.ETSIRinging)
}()
}

err = c.cc.transferCall(ctx, transferTo)
if err != nil {
c.log.Infow("outound call failed to transfer", "error", err, "transferTo", transferTo)
return err
Expand Down
8 changes: 4 additions & 4 deletions pkg/sip/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalT
ctx, cdone := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
defer cdone()

err := s.processParticipantTransfer(ctx, req.SipCallId, req.TransferTo)
err := s.processParticipantTransfer(ctx, req.SipCallId, req.TransferTo, req.PlayRingtone)
transfetResult.Store(&err)
close(done)

Expand All @@ -177,14 +177,14 @@ func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalT
}
}

func (s *Service) processParticipantTransfer(ctx context.Context, callID string, transferTo string) error {
func (s *Service) processParticipantTransfer(ctx context.Context, callID string, transferTo string, ringtone bool) error {
// Look for call both in client (outbound) and server (inbound)
s.cli.cmu.Lock()
out := s.cli.activeCalls[LocalTag(callID)]
s.cli.cmu.Unlock()

if out != nil {
err := out.transferCall(ctx, transferTo)
err := out.transferCall(ctx, transferTo, ringtone)
if err != nil {
return err
}
Expand All @@ -197,7 +197,7 @@ func (s *Service) processParticipantTransfer(ctx context.Context, callID string,
s.srv.cmu.Unlock()

if in != nil {
err := in.transferCall(ctx, transferTo)
err := in.transferCall(ctx, transferTo, ringtone)
if err != nil {
return err
}
Expand Down
Loading