Skip to content

Commit

Permalink
Add a ringing state to participant attributes. (#223)
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc authored Nov 14, 2024
1 parent 855c338 commit 156a785
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 21 deletions.
15 changes: 10 additions & 5 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
if ok, err := c.waitMedia(ctx); !ok {
return false, err
}
c.setStatus(CallActive)
return true, nil
}

Expand Down Expand Up @@ -464,7 +465,11 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
}
ctx, cancel := context.WithTimeout(ctx, disp.MaxCallDuration)
defer cancel()
if err := c.joinRoom(ctx, disp.Room); err != nil {
status := CallRinging
if pinPrompt {
status = CallActive
}
if err := c.joinRoom(ctx, disp.Room, status); err != nil {
return errors.Wrap(err, "failed joining room")
}
// Publish our own track.
Expand Down Expand Up @@ -759,7 +764,7 @@ func (c *inboundCall) setStatus(v CallStatus) {
})
}

func (c *inboundCall) createLiveKitParticipant(ctx context.Context, rconf RoomConfig) error {
func (c *inboundCall) createLiveKitParticipant(ctx context.Context, rconf RoomConfig, status CallStatus) error {
ctx, span := tracer.Start(ctx, "inboundCall.createLiveKitParticipant")
defer span.End()
partConf := &rconf.Participant
Expand All @@ -769,7 +774,7 @@ func (c *inboundCall) createLiveKitParticipant(ctx context.Context, rconf RoomCo
for k, v := range c.extraAttrs {
partConf.Attributes[k] = v
}
partConf.Attributes[livekit.AttrSIPCallStatus] = CallActive.Attribute()
partConf.Attributes[livekit.AttrSIPCallStatus] = status.Attribute()
c.forwardDTMF.Store(true)
select {
case <-ctx.Done():
Expand Down Expand Up @@ -799,7 +804,7 @@ func (c *inboundCall) publishTrack() error {
return nil
}

func (c *inboundCall) joinRoom(ctx context.Context, rconf RoomConfig) error {
func (c *inboundCall) joinRoom(ctx context.Context, rconf RoomConfig, status CallStatus) error {
if c.joinDur != nil {
c.joinDur()
}
Expand All @@ -810,7 +815,7 @@ func (c *inboundCall) joinRoom(ctx context.Context, rconf RoomConfig) error {
"participantName", rconf.Participant.Name,
)
c.log.Infow("Joining room")
if err := c.createLiveKitParticipant(ctx, rconf); err != nil {
if err := c.createLiveKitParticipant(ctx, rconf, status); err != nil {
c.log.Errorw("Cannot create LiveKit participant", err)
c.close(true, callDropped, "participant-failed")
return errors.Wrap(err, "cannot create LiveKit participant")
Expand Down
36 changes: 22 additions & 14 deletions pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (c *outboundCall) connectMedia() {
c.media.HandleDTMF(c.handleDTMF)
}

func sipResponse(ctx context.Context, tx sip.ClientTransaction, stop <-chan struct{}) (*sip.Response, error) {
func sipResponse(ctx context.Context, tx sip.ClientTransaction, stop <-chan struct{}, setState func(code sip.StatusCode)) (*sip.Response, error) {
cnt := 0
for {
select {
Expand All @@ -337,13 +337,15 @@ func sipResponse(ctx context.Context, tx sip.ClientTransaction, stop <-chan stru
case <-tx.Done():
return nil, psrpc.NewErrorf(psrpc.Canceled, "transaction failed to complete (%d intermediate responses)", cnt)
case res := <-tx.Responses():
switch res.StatusCode {
default:
status := res.StatusCode
if status/100 != 1 { // != 1xx
return res, nil
case 100, 180, 183:
// continue
cnt++
}
if setState != nil {
setState(res.StatusCode)
}
// continue
cnt++
}
}
}
Expand Down Expand Up @@ -402,7 +404,13 @@ func (c *outboundCall) sipSignal(ctx context.Context) error {

toUri := CreateURIFromUserAndAddress(c.sipConf.to, c.sipConf.address, TransportFrom(c.sipConf.transport))

sdpResp, err := c.cc.Invite(ctx, toUri, c.sipConf.user, c.sipConf.pass, c.sipConf.headers, sdpOffer)
ringing := false
sdpResp, err := c.cc.Invite(ctx, toUri, c.sipConf.user, c.sipConf.pass, c.sipConf.headers, sdpOffer, func(code sip.StatusCode) {
if !ringing && code >= sip.StatusRinging {
ringing = true
c.setStatus(CallRinging)
}
})
if err != nil {
// TODO: should we retry? maybe new offer will work
var e *ErrorStatus
Expand Down Expand Up @@ -434,7 +442,7 @@ func (c *outboundCall) sipSignal(ctx context.Context) error {
c.c.cmu.Unlock()

c.mon.InviteAccept()
err = c.cc.AckInvite(ctx)
err = c.cc.AckInviteOK(ctx)
if err != nil {
c.log.Infow("SIP accept failed", "error", err)
return err
Expand Down Expand Up @@ -577,7 +585,7 @@ func (c *sipOutbound) RemoteHeaders() Headers {
return c.inviteOk.Headers()
}

func (c *sipOutbound) Invite(ctx context.Context, to URI, user, pass string, headers map[string]string, sdpOffer []byte) ([]byte, error) {
func (c *sipOutbound) Invite(ctx context.Context, to URI, user, pass string, headers map[string]string, sdpOffer []byte, setState func(code sip.StatusCode)) ([]byte, error) {
ctx, span := tracer.Start(ctx, "sipOutbound.Invite")
defer span.End()
c.mu.Lock()
Expand All @@ -602,7 +610,7 @@ func (c *sipOutbound) Invite(ctx context.Context, to URI, user, pass string, hea
}
authLoop:
for {
req, resp, err = c.attemptInvite(ctx, dest, toHeader, sdpOffer, authHeaderRespName, authHeader, sipHeaders)
req, resp, err = c.attemptInvite(ctx, dest, toHeader, sdpOffer, authHeaderRespName, authHeader, sipHeaders, setState)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -692,15 +700,15 @@ authLoop:
return c.inviteOk.Body(), nil
}

func (c *sipOutbound) AckInvite(ctx context.Context) error {
ctx, span := tracer.Start(ctx, "sipOutbound.AckInvite")
func (c *sipOutbound) AckInviteOK(ctx context.Context) error {
ctx, span := tracer.Start(ctx, "sipOutbound.AckInviteOK")
defer span.End()
c.mu.Lock()
defer c.mu.Unlock()
return c.c.sipCli.WriteRequest(sip.NewAckRequest(c.invite, c.inviteOk, nil))
}

func (c *sipOutbound) attemptInvite(ctx context.Context, dest string, to *sip.ToHeader, offer []byte, authHeaderName, authHeader string, headers Headers) (*sip.Request, *sip.Response, error) {
func (c *sipOutbound) attemptInvite(ctx context.Context, dest string, to *sip.ToHeader, offer []byte, authHeaderName, authHeader string, headers Headers, setState func(code sip.StatusCode)) (*sip.Request, *sip.Response, error) {
ctx, span := tracer.Start(ctx, "sipOutbound.attemptInvite")
defer span.End()
req := sip.NewRequest(sip.INVITE, &to.Address)
Expand All @@ -726,7 +734,7 @@ func (c *sipOutbound) attemptInvite(ctx context.Context, dest string, to *sip.To
}
defer tx.Terminate()

resp, err := sipResponse(ctx, tx, c.c.closing.Watch())
resp, err := sipResponse(ctx, tx, c.c.closing.Watch(), setState)
return req, resp, err
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/sip/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func (v CallStatus) Attribute() string {
return "" // no attribute for these statuses
case CallDialing:
return "dialing"
case CallRinging:
return "ringing"
case CallAutomation:
return "automation"
case CallActive:
Expand Down Expand Up @@ -82,6 +84,7 @@ const (
callDropped = CallStatus(iota)
callFlood
CallDialing
CallRinging
CallAutomation
CallActive
CallHangup
Expand Down
4 changes: 2 additions & 2 deletions pkg/sip/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func sendAndACK(ctx context.Context, c Signaling, req *sip.Request) {
return
}
defer tx.Terminate()
r, err := sipResponse(ctx, tx, nil)
r, err := sipResponse(ctx, tx, nil, nil)
if err != nil {
return
}
Expand Down Expand Up @@ -180,7 +180,7 @@ func sendRefer(ctx context.Context, c Signaling, req *sip.Request, stop <-chan s
defer tx.Terminate()

ctx = context.WithoutCancel(ctx)
resp, err := sipResponse(ctx, tx, stop)
resp, err := sipResponse(ctx, tx, stop, nil)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 156a785

Please sign in to comment.