From a85b40a756c466550f701bb760c2b2f5bc11773c Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Thu, 3 Oct 2024 15:15:19 -0700 Subject: [PATCH] Transfer participant integration test (#186) This also makes sure that we do not cancel the transfer request if the RPC handler gets cancelled, and avoids situation where we may return an old transfer result to a 2nd transfer request if one failed before. --- pkg/sip/inbound.go | 8 +- pkg/sip/outbound.go | 9 +- pkg/sip/protocol.go | 16 +++- pkg/sip/service.go | 2 +- pkg/siptest/client.go | 145 ++++++++++++++++++++++++++++---- test/integration/sip_test.go | 156 +++++++++++++++++++++++++++++++---- 6 files changed, 292 insertions(+), 44 deletions(-) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 51a75f2..a44b489 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -746,6 +746,8 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string) error return err } + c.log.Infow("inbound call tranferred", "transferTo", transferTo) + // This is needed to actually terminate the session before a media timeout c.Close() @@ -760,7 +762,7 @@ func (s *Server) newInbound(id LocalTag, invite *sip.Request, inviteTx sip.Serve invite: invite, inviteTx: inviteTx, cancelled: make(chan struct{}), - referDone: make(chan error, 1), + referDone: make(chan error), // Do not buffer the channel to avoid reading a result for an old request } c.from, _ = invite.From() if c.from != nil { @@ -1096,14 +1098,14 @@ func (c *sipInbound) handleNotify(req *sip.Request, tx sip.ServerTransaction) er // Success select { case c.referDone <- nil: - default: + case <-time.After(notifyAckTimeout): } default: // Failure select { // TODO be more specific in the reported error case c.referDone <- psrpc.NewErrorf(psrpc.Canceled, "call transfer failed"): - default: + case <-time.After(notifyAckTimeout): } } } diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 847702b..65d613f 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -22,6 +22,7 @@ import ( "net/netip" "sort" "sync" + "time" "github.com/emiago/sipgo/sip" "github.com/frostbyte73/core" @@ -405,6 +406,8 @@ func (c *outboundCall) transferCall(ctx context.Context, transferTo string) erro return err } + c.log.Infow("outbound l tranferred", "transferTo", transferTo) + // This is needed to actually terminate the session before a media timeout c.CloseWithReason(CallHangup, "call transferred") @@ -423,7 +426,7 @@ func (c *Client) newOutbound(id LocalTag, from URI) *sipOutbound { c: c, id: id, from: fromHeader, - referDone: make(chan error, 1), + referDone: make(chan error), // Do not buffer the channel to avoid reading a result for an old request } } @@ -742,14 +745,14 @@ func (c *sipOutbound) handleNotify(req *sip.Request, tx sip.ServerTransaction) e // Success select { case c.referDone <- nil: - default: + case <-time.After(notifyAckTimeout): } default: // Failure select { // TODO be more specific in the reported error case c.referDone <- psrpc.NewErrorf(psrpc.Canceled, "call transfer failed"): - default: + case <-time.After(notifyAckTimeout): } } } diff --git a/pkg/sip/protocol.go b/pkg/sip/protocol.go index 83d503c..b7d9b01 100644 --- a/pkg/sip/protocol.go +++ b/pkg/sip/protocol.go @@ -19,13 +19,20 @@ import ( "regexp" "strconv" "strings" + "time" "github.com/emiago/sipgo/sip" "github.com/livekit/psrpc" "github.com/pkg/errors" ) -var referIdRegexp = regexp.MustCompile(`^refer(;id=(\d+))?$`) +const ( + notifyAckTimeout = 5 * time.Second +) + +var ( + referIdRegexp = regexp.MustCompile(`^refer(;id=(\d+))?$`) +) type ErrorStatus struct { StatusCode int @@ -171,6 +178,13 @@ func parseNotifyBody(body string) (int, error) { func handleNotify(req *sip.Request) (method sip.RequestMethod, cseq uint32, status int, err error) { event := req.GetHeader("Event") + if event == nil { + event = req.GetHeader("o") + } + if event == nil { + return "", 0, 0, psrpc.NewErrorf(psrpc.MalformedRequest, "no event in NOTIFY request") + } + var cseq64 uint64 if m := referIdRegexp.FindStringSubmatch(strings.ToLower(event.Value())); len(m) > 0 { diff --git a/pkg/sip/service.go b/pkg/sip/service.go index 4126f1e..239ccb6 100644 --- a/pkg/sip/service.go +++ b/pkg/sip/service.go @@ -124,7 +124,7 @@ func (s *Service) CreateSIPParticipantAffinity(ctx context.Context, req *rpc.Int func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalTransferSIPParticipantRequest) (*emptypb.Empty, error) { s.log.Infow("transfering SIP call", "callID", req.SipCallId, "transferTo", req.TransferTo) - ctx, done := context.WithTimeout(ctx, 30*time.Second) + ctx, done := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) defer done() // Look for call both in client (outbound) and server (inbound) diff --git a/pkg/siptest/client.go b/pkg/siptest/client.go index 296d24e..a0b0a2c 100644 --- a/pkg/siptest/client.go +++ b/pkg/siptest/client.go @@ -34,6 +34,7 @@ import ( "github.com/at-wat/ebml-go/webm" "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" + "github.com/frostbyte73/core" "github.com/icholy/digest" "github.com/pion/sdp/v3" @@ -56,6 +57,7 @@ type ClientConfig struct { OnBye func() OnMediaTimeout func() OnDTMF func(ev dtmf.Event) + OnRefer func(req *sip.Request) Codec string } @@ -146,6 +148,14 @@ func NewClient(id string, conf ClientConfig) (*Client, error) { default: } }) + cli.sipServer.OnRefer(func(req *sip.Request, tx sip.ServerTransaction) { + if conf.OnRefer != nil { + conf.OnRefer(req) + } + + err = tx.Respond(sip.NewResponseFromRequest(req, 202, "Accepted", nil)) + tx.Terminate() + }) return cli, nil } @@ -168,7 +178,8 @@ type Client struct { inviteReq *sip.Request inviteResp *sip.Response recordHandler atomic.Pointer[rtp.Handler] - closed atomic.Bool + lastCSeq atomic.Uint32 + closed core.Fuse } func (c *Client) LocalIP() string { @@ -183,23 +194,22 @@ func (c *Client) RemoteHeaders() []sip.Header { } func (c *Client) Close() { - if !c.closed.CompareAndSwap(false, true) { - return - } - if c.mediaConn != nil { - c.mediaConn.Close() - } - if c.inviteResp != nil { - c.sendBye() - c.inviteReq = nil - c.inviteResp = nil - } - if c.sipClient != nil { - c.sipClient.Close() - } - if c.sipServer != nil { - c.sipServer.Close() - } + c.closed.Once(func() { + if c.mediaConn != nil { + c.mediaConn.Close() + } + if c.inviteResp != nil { + c.sendBye() + c.inviteReq = nil + c.inviteResp = nil + } + if c.sipClient != nil { + c.sipClient.Close() + } + if c.sipServer != nil { + c.sipServer.Close() + } + }) } func (c *Client) setupRTPReceiver() { @@ -322,6 +332,11 @@ func (c *Client) Dial(ip string, uri string, number string, headers map[string]s } c.inviteReq = req c.inviteResp = resp + + if h, ok := req.CSeq(); ok { + c.lastCSeq.Store(h.SeqNo) + } + c.mediaConn.SetDestAddr(dstAddr) c.log.Debug("client connected", "media-dst", dstAddr) return nil @@ -361,6 +376,10 @@ func (c *Client) sendBye() { req := sip.NewByeRequest(c.inviteReq, c.inviteResp, nil) req.AppendHeader(sip.NewHeader("User-Agent", "LiveKit")) + cseq := c.lastCSeq.Add(1) + cseqH, _ := req.CSeq() + cseqH.SeqNo = cseq + tx, err := c.sipClient.TransactionRequest(req) if err != nil { return @@ -381,6 +400,96 @@ func (c *Client) SendDTMF(digits string) error { return dtmf.Write(context.Background(), c.audioOut, c.mediaDTMF, c.mediaAudio.GetCurrentTimestamp(), digits) } +func (c *Client) SendNotify(eventReq *sip.Request, notifyStatus string) error { + var recipient *sip.Uri + + if contact, ok := eventReq.Contact(); ok { + recipient = &contact.Address + } else if from, ok := eventReq.From(); ok { + recipient = &from.Address + } else { + return errors.New("missing destination address") + } + + req := sip.NewRequest(sip.NOTIFY, recipient) + + req.SipVersion = eventReq.SipVersion + sip.CopyHeaders("Via", eventReq, req) + + if len(eventReq.GetHeaders("Route")) > 0 { + sip.CopyHeaders("Route", eventReq, req) + } else { + hdrs := c.inviteResp.GetHeaders("Record-Route") + for i := len(hdrs) - 1; i >= 0; i-- { + rrh, ok := hdrs[i].(*sip.RecordRouteHeader) + if !ok { + continue + } + + h := rrh.Clone() + req.AppendHeader(h) + } + } + + maxForwardsHeader := sip.MaxForwardsHeader(70) + req.AppendHeader(&maxForwardsHeader) + + if to, ok := eventReq.To(); ok { + req.AppendHeader((*sip.FromHeader)(to)) + } else { + return errors.New("missing To header in REFER request") + } + + if from, ok := eventReq.From(); ok { + req.AppendHeader((*sip.ToHeader)(from)) + } else { + return errors.New("missing From header in REFER request") + } + + if callId, ok := eventReq.CallID(); ok { + req.AppendHeader(callId) + } + + ct := sip.ContentTypeHeader("message/sipfrag") + req.AppendHeader(&ct) + + cseq := c.lastCSeq.Add(1) + cseqH := &sip.CSeqHeader{ + SeqNo: cseq, + MethodName: sip.NOTIFY, + } + req.AppendHeader(cseqH) + + req.SetTransport(eventReq.Transport()) + req.SetSource(eventReq.Destination()) + req.SetDestination(eventReq.Source()) + + if eventCSeq, ok := eventReq.CSeq(); ok { + req.AppendHeader(sip.NewHeader("Event", fmt.Sprintf("refer;id=%d", eventCSeq.SeqNo))) + } else { + return errors.New("missing CSeq header in REFER request") + } + + req.SetBody([]byte(notifyStatus)) + + tx, err := c.sipClient.TransactionRequest(req) + if err != nil { + return err + } + defer tx.Terminate() + + resp, err := getResponse(tx) + if err != nil { + return err + } + + if resp.StatusCode != sip.StatusOK { + return fmt.Errorf("NOTIFY failed with status %d", resp.StatusCode) + } + + return nil +} + func (c *Client) createOffer() ([]byte, error) { sessionId := rand.Uint64() diff --git a/test/integration/sip_test.go b/test/integration/sip_test.go index fea1e72..6e194e6 100644 --- a/test/integration/sip_test.go +++ b/test/integration/sip_test.go @@ -11,6 +11,9 @@ import ( "testing" "time" + sipgo "github.com/emiago/sipgo/sip" + "github.com/stretchr/testify/require" + "github.com/livekit/mediatransportutil/pkg/rtcconfig" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -20,10 +23,6 @@ import ( "github.com/livekit/psrpc" lksdk "github.com/livekit/server-sdk-go/v2" - "github.com/livekit/sip/pkg/stats" - - "github.com/stretchr/testify/require" - "github.com/livekit/sip/pkg/config" "github.com/livekit/sip/pkg/media/dtmf" "github.com/livekit/sip/pkg/media/g711" @@ -31,6 +30,7 @@ import ( "github.com/livekit/sip/pkg/service" "github.com/livekit/sip/pkg/sip" "github.com/livekit/sip/pkg/siptest" + "github.com/livekit/sip/pkg/stats" "github.com/livekit/sip/test/lktest" ) @@ -225,11 +225,11 @@ func (s *SIPServer) DeleteDispatch(t testing.TB, id string) { } } -func runClient(t testing.TB, conf *NumberConfig, id string, number string, forcePin bool, headers map[string]string, onDTMF func(ev dtmf.Event)) *siptest.Client { - return runClientWithCodec(t, conf, id, number, "", forcePin, headers, onDTMF) +func runClient(t testing.TB, conf *NumberConfig, id string, number string, forcePin bool, headers map[string]string, onDTMF func(ev dtmf.Event), onBye func(), onRefer func(req *sipgo.Request)) *siptest.Client { + return runClientWithCodec(t, conf, id, number, "", forcePin, headers, onDTMF, onBye, onRefer) } -func runClientWithCodec(t testing.TB, conf *NumberConfig, id string, number string, codec string, forcePin bool, headers map[string]string, onDTMF func(ev dtmf.Event)) *siptest.Client { +func runClientWithCodec(t testing.TB, conf *NumberConfig, id string, number string, codec string, forcePin bool, headers map[string]string, onDTMF func(ev dtmf.Event), onBye func(), onRefer func(req *sipgo.Request)) *siptest.Client { cconf := siptest.ClientConfig{ // IP: dockerBridgeIP, Number: number, @@ -240,7 +240,9 @@ func runClientWithCodec(t testing.TB, conf *NumberConfig, id string, number stri OnMediaTimeout: func() { t.Fatal("media timeout from server to test client") }, - OnDTMF: onDTMF, + OnDTMF: onDTMF, + OnBye: onBye, + OnRefer: onRefer, } cli, err := siptest.NewClient(id, cconf) @@ -265,18 +267,21 @@ func runClientWithCodec(t testing.TB, conf *NumberConfig, id string, number stri const ( serverNumber = "+000000000" clientNumber = "+111111111" + transferNumber = "+222222222" participantsJoinTimeout = 5 * time.Second participantsJoinWithPinTimeout = participantsJoinTimeout + 5*time.Second participantsLeaveTimeout = 3 * time.Second webrtcSetupDelay = 5 * time.Second + notifyIntervalDelay = 100 * time.Millisecond ) func TestSIPJoinOpenRoom(t *testing.T) { lk := runLiveKit(t) var ( - dmu sync.Mutex - dtmfOut string - dtmfIn string + dmu sync.Mutex + dtmfOut string + dtmfIn string + referRequest *sipgo.Request ) const ( clientID = "test-cli" @@ -311,12 +316,21 @@ func TestSIPJoinOpenRoom(t *testing.T) { customAttr: customVal, }) + transferDone := make(chan struct{}) + byeReceived := make(chan struct{}) + cli := runClient(t, nc, clientID, clientNumber, false, map[string]string{ "X-LK-Inbound": "1", }, func(ev dtmf.Event) { dmu.Lock() defer dmu.Unlock() dtmfIn += string(ev.Digit) + }, func() { + close(byeReceived) + }, func(req *sipgo.Request) { + dmu.Lock() + defer dmu.Unlock() + referRequest = req }) h := sip.Headers(cli.RemoteHeaders()).GetHeader("X-LK-Accepted") @@ -376,10 +390,53 @@ func TestSIPJoinOpenRoom(t *testing.T) { return dtmfIn == "4567" }, 5*time.Second, time.Second/2) + go func() { + // TransferSIPParticipant is synchronous + _, err = lk.SIP.TransferSIPParticipant(context.Background(), &livekit.TransferSIPParticipantRequest{ + RoomName: roomName, + ParticipantIdentity: "sip_" + clientNumber, + TransferTo: "tel:" + transferNumber, + }) + require.NoError(t, err) + close(transferDone) + }() + + require.Eventually(t, func() bool { + dmu.Lock() + defer dmu.Unlock() + + return referRequest != nil + + }, 5*time.Second, time.Second/2) + + require.Equal(t, sipgo.REFER, referRequest.Method) + transferTo := referRequest.GetHeader("Refer-To") + require.Equal(t, "tel:"+transferNumber, transferTo.Value()) + + time.Sleep(notifyIntervalDelay) + err = cli.SendNotify(referRequest, "SIP/2.0 100 Trying") + require.NoError(t, err) + + time.Sleep(notifyIntervalDelay) + err = cli.SendNotify(referRequest, "SIP/2.0 200 OK") + require.NoError(t, err) + + select { + case <-transferDone: + case <-time.After(participantsLeaveTimeout): + t.Fatal("participant transfer call never completed") + } + + select { + case <-byeReceived: + case <-time.After(participantsLeaveTimeout): + t.Fatal("did not receive bye after notify") + } + cli.Close() r.Disconnect() - // SIP participant must disconnect from LK room on hangup. + // SIP participant should have left ctx, cancel = context.WithTimeout(context.Background(), participantsLeaveTimeout) defer cancel() lk.ExpectRoomWithParticipants(t, ctx, roomName, nil) @@ -388,8 +445,9 @@ func TestSIPJoinOpenRoom(t *testing.T) { func TestSIPJoinPinRoom(t *testing.T) { lk := runLiveKit(t) var ( - dmu sync.Mutex - dtmf string + dmu sync.Mutex + dtmf string + referRequest *sipgo.Request ) const ( clientID = "test-cli" @@ -424,9 +482,15 @@ func TestSIPJoinPinRoom(t *testing.T) { customAttr: customVal, }) + transferDone := make(chan struct{}) + cli := runClient(t, nc, clientID, clientNumber, false, map[string]string{ "X-LK-Inbound": "1", - }, nil) + }, nil, nil, func(req *sipgo.Request) { + dmu.Lock() + defer dmu.Unlock() + referRequest = req + }) // Even though we set this header in the dispatch rule, PIN forces us to send response earlier. // Because of this, we can no longer attach attributes from a selected dispatch rule later. @@ -482,6 +546,62 @@ func TestSIPJoinPinRoom(t *testing.T) { return dtmf == dtmfDigits }, 5*time.Second, time.Second/2) + go func() { + // TransferSIPParticipant is synchronous + _, err = lk.SIP.TransferSIPParticipant(context.Background(), &livekit.TransferSIPParticipantRequest{ + RoomName: "test-priv", + ParticipantIdentity: "sip_" + clientNumber, + TransferTo: "tel:" + transferNumber, + }) + require.Error(t, err) + close(transferDone) + }() + + require.Eventually(t, func() bool { + dmu.Lock() + defer dmu.Unlock() + + return referRequest != nil + + }, 5*time.Second, time.Second/2) + + require.Equal(t, sipgo.REFER, referRequest.Method) + transferTo := referRequest.GetHeader("Refer-To") + require.Equal(t, "tel:"+transferNumber, transferTo.Value()) + + time.Sleep(notifyIntervalDelay) + err = cli.SendNotify(referRequest, "SIP/2.0 403 Fobidden") + require.NoError(t, err) + + select { + case <-transferDone: + case <-time.After(participantsLeaveTimeout): + t.Fatal("participant transfer call never completed") + } + + // Participants should all still be there + time.Sleep(time.Second) + lk.ExpectRoomWithParticipants(t, ctx, roomName, []lktest.ParticipantInfo{ + {Identity: "test"}, + { + Identity: "sip_" + clientNumber, + Name: "Phone " + clientNumber, + Kind: livekit.ParticipantInfo_SIP, + Metadata: meta, + Attributes: map[string]string{ + "sip.callID": "", // special case + "sip.callStatus": "active", + "sip.trunkPhoneNumber": serverNumber, + "sip.phoneNumber": clientNumber, + "sip.ruleID": nc.RuleID, + "sip.trunkID": nc.TrunkID, + "lktest.id": clientID, + "test.lk.inbound": "1", // from SIP headers + customAttr: customVal, + }, + }, + }) + cli.Close() r.Disconnect() @@ -509,7 +629,7 @@ func TestSIPJoinOpenRoomWithPin(t *testing.T) { }) srv.CreateDirectDispatch(t, "test-priv", "1234", "", nil) - cli := runClient(t, nc, clientID, clientNumber, true, nil, nil) + cli := runClient(t, nc, clientID, clientNumber, true, nil, nil, nil, nil) // Send audio, so that we don't trigger media timeout. mctx, mcancel := context.WithCancel(context.Background()) @@ -571,7 +691,7 @@ func TestSIPJoinRoomIndividual(t *testing.T) { rch <- room }() - cli := runClient(t, nc, clientID, clientNumber, false, nil, nil) + cli := runClient(t, nc, clientID, clientNumber, false, nil, nil, nil, nil) // Send audio, so that we don't trigger media timeout. mctx, mcancel := context.WithCancel(context.Background()) @@ -651,7 +771,7 @@ func TestSIPAudio(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - cli := runClientWithCodec(t, nc, strconv.Itoa(i+1), fmt.Sprintf("+%d", 111111111*(i+1)), codec, false, nil, nil) + cli := runClientWithCodec(t, nc, strconv.Itoa(i+1), fmt.Sprintf("+%d", 111111111*(i+1)), codec, false, nil, nil, nil, nil) mu.Lock() clients[i] = cli audios[i] = cli