Skip to content

Commit

Permalink
Use ServerTimestamp from the envelope as StreamOrder
Browse files Browse the repository at this point in the history
  • Loading branch information
smweber committed Sep 13, 2024
1 parent 1dc480b commit 693bc61
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 26 deletions.
23 changes: 14 additions & 9 deletions pkg/connector/handlesignal.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,16 @@ type Bv2ChatEvent struct {
}

var (
_ bridgev2.RemoteMessage = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteEdit = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteEventWithTimestamp = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteReaction = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteReactionRemove = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteMessageRemove = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteTyping = (*Bv2ChatEvent)(nil)
_ bridgev2.RemotePreHandler = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteChatInfoChange = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteMessage = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteEdit = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteEventWithTimestamp = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteReaction = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteReactionRemove = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteMessageRemove = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteTyping = (*Bv2ChatEvent)(nil)
_ bridgev2.RemotePreHandler = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteChatInfoChange = (*Bv2ChatEvent)(nil)
_ bridgev2.RemoteEventWithStreamOrder = (*Bv2ChatEvent)(nil)
)

func (evt *Bv2ChatEvent) GetType() bridgev2.RemoteEventType {
Expand Down Expand Up @@ -337,6 +338,10 @@ func (evt *Bv2ChatEvent) ConvertEdit(ctx context.Context, portal *bridgev2.Porta
}, nil
}

func (evt *Bv2ChatEvent) GetStreamOrder() int64 {
return int64(evt.Info.ServerTimestamp)
}

type Bv2Receipt struct {
Type signalpb.ReceiptMessage_Type
Chat networkid.PortalKey
Expand Down
3 changes: 2 additions & 1 deletion pkg/signalmeow/events/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ type MessageInfo struct {
Sender uuid.UUID
ChatID string

GroupRevision uint32
GroupRevision uint32
ServerTimestamp uint64
}

type ChatEvent struct {
Expand Down
36 changes: 20 additions & 16 deletions pkg/signalmeow/receiving.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,9 +744,9 @@ func (cli *Client) handleDecryptedResult(
log.Warn().Msg("sync message sent destination is nil")
} else if content.SyncMessage.Sent.Message != nil {
// TODO handle expiration start ts, and maybe the sync message ts?
cli.incomingDataMessage(ctx, content.SyncMessage.Sent.Message, cli.Store.ACI, syncDestinationServiceID)
cli.incomingDataMessage(ctx, content.SyncMessage.Sent.Message, cli.Store.ACI, syncDestinationServiceID, envelope.GetServerTimestamp())
} else if content.SyncMessage.Sent.EditMessage != nil {
cli.incomingEditMessage(ctx, content.SyncMessage.Sent.EditMessage, cli.Store.ACI, syncDestinationServiceID)
cli.incomingEditMessage(ctx, content.SyncMessage.Sent.EditMessage, cli.Store.ACI, syncDestinationServiceID, envelope.GetServerTimestamp())
}
}
if content.SyncMessage.Contacts != nil {
Expand Down Expand Up @@ -794,9 +794,9 @@ func (cli *Client) handleDecryptedResult(

var sendDeliveryReceipt bool
if content.DataMessage != nil {
sendDeliveryReceipt = cli.incomingDataMessage(ctx, content.DataMessage, theirServiceID.UUID, theirServiceID)
sendDeliveryReceipt = cli.incomingDataMessage(ctx, content.DataMessage, theirServiceID.UUID, theirServiceID, envelope.GetServerTimestamp())
} else if content.EditMessage != nil {
sendDeliveryReceipt = cli.incomingEditMessage(ctx, content.EditMessage, theirServiceID.UUID, theirServiceID)
sendDeliveryReceipt = cli.incomingEditMessage(ctx, content.EditMessage, theirServiceID.UUID, theirServiceID, envelope.GetServerTimestamp())
}
if sendDeliveryReceipt {
// TODO send delivery receipts after actually bridging instead of here
Expand All @@ -814,8 +814,9 @@ func (cli *Client) handleDecryptedResult(
}
cli.handleEvent(&events.ChatEvent{
Info: events.MessageInfo{
Sender: theirServiceID.UUID,
ChatID: groupOrUserID(groupID, theirServiceID),
Sender: theirServiceID.UUID,
ChatID: groupOrUserID(groupID, theirServiceID),
ServerTimestamp: envelope.GetServerTimestamp(),
},
Event: content.TypingMessage,
})
Expand All @@ -825,8 +826,9 @@ func (cli *Client) handleDecryptedResult(
if content.CallMessage != nil && (content.CallMessage.Offer != nil || content.CallMessage.Hangup != nil) {
cli.handleEvent(&events.Call{
Info: events.MessageInfo{
Sender: theirServiceID.UUID,
ChatID: theirServiceID.String(),
Sender: theirServiceID.UUID,
ChatID: theirServiceID.String(),
ServerTimestamp: envelope.GetServerTimestamp(),
},
IsRinging: content.CallMessage.Offer != nil,
})
Expand Down Expand Up @@ -954,7 +956,7 @@ func (cli *Client) handlePNISignatureMessage(ctx context.Context, sender libsign
return nil
}

func (cli *Client) incomingEditMessage(ctx context.Context, editMessage *signalpb.EditMessage, messageSenderACI uuid.UUID, chatRecipient libsignalgo.ServiceID) bool {
func (cli *Client) incomingEditMessage(ctx context.Context, editMessage *signalpb.EditMessage, messageSenderACI uuid.UUID, chatRecipient libsignalgo.ServiceID, serverTimestamp uint64) bool {
// If it's a group message, get the ID and invalidate cache if necessary
var groupID types.GroupIdentifier
var groupRevision uint32
Expand All @@ -972,16 +974,17 @@ func (cli *Client) incomingEditMessage(ctx context.Context, editMessage *signalp
}
cli.handleEvent(&events.ChatEvent{
Info: events.MessageInfo{
Sender: messageSenderACI,
ChatID: groupOrUserID(groupID, chatRecipient),
GroupRevision: groupRevision,
Sender: messageSenderACI,
ChatID: groupOrUserID(groupID, chatRecipient),
GroupRevision: groupRevision,
ServerTimestamp: serverTimestamp,
},
Event: editMessage,
})
return true
}

func (cli *Client) incomingDataMessage(ctx context.Context, dataMessage *signalpb.DataMessage, messageSenderACI uuid.UUID, chatRecipient libsignalgo.ServiceID) bool {
func (cli *Client) incomingDataMessage(ctx context.Context, dataMessage *signalpb.DataMessage, messageSenderACI uuid.UUID, chatRecipient libsignalgo.ServiceID, serverTimestamp uint64) bool {
// If there's a profile key, save it
if dataMessage.ProfileKey != nil {
profileKey := libsignalgo.ProfileKey(dataMessage.ProfileKey)
Expand Down Expand Up @@ -1009,9 +1012,10 @@ func (cli *Client) incomingDataMessage(ctx context.Context, dataMessage *signalp
}

evtInfo := events.MessageInfo{
Sender: messageSenderACI,
ChatID: groupOrUserID(groupID, chatRecipient),
GroupRevision: groupRevision,
Sender: messageSenderACI,
ChatID: groupOrUserID(groupID, chatRecipient),
GroupRevision: groupRevision,
ServerTimestamp: serverTimestamp,
}
// Hacky special case for group calls to cache the state
if dataMessage.GroupCallUpdate != nil {
Expand Down

0 comments on commit 693bc61

Please sign in to comment.