Skip to content

Commit

Permalink
Add “optional” to addEvent to allow error to be passed back to client (
Browse files Browse the repository at this point in the history
…#74)

Sometimes we try to add an event that has a good chance of failing.
Currently when we repond to KeySolicitations we first post a
KeyFulfillment to the channel. Lots of users can post this fulfillment
at the same time, but the node will only accept one of them

After this change, instead of throwing tons of errors polluting all
logs, we can now pass “optional: true” and check the error in the
calling code
  • Loading branch information
texuf authored May 27, 2024
1 parent 63db0cc commit 0ae84c7
Show file tree
Hide file tree
Showing 6 changed files with 469 additions and 326 deletions.
689 changes: 397 additions & 292 deletions core/node/protocol/protocol.pb.go

Large diffs are not rendered by default.

18 changes: 15 additions & 3 deletions core/node/rpc/add_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,24 @@ func (s *Service) localAddEvent(
log.Debug("localAddEvent", "parsedEvent", parsedEvent)

err = s.addParsedEvent(ctx, streamId, parsedEvent, nodes)
if err == nil {
if err != nil && req.Msg.Optional {
// aellis 5/2024 - we only want to wrap errors from canAddEvent,
// currently this is catching all errors, which is not ideal
addEventRequests.PassInc()
return connect.NewResponse(&AddEventResponse{}), nil
} else {
riverError := AsRiverError(err).Func("localAddEvent")
return connect.NewResponse(&AddEventResponse{
Error: &AddEventResponse_Error{
Code: riverError.Code,
Msg: riverError.Msg,
Funcs: riverError.Funcs,
},
}), nil
} else if err != nil {
addEventRequests.FailInc()
return nil, AsRiverError(err).Func("localAddEvent")
} else {
addEventRequests.PassInc()
return connect.NewResponse(&AddEventResponse{}), nil
}
}

Expand Down
17 changes: 16 additions & 1 deletion core/node/rpc/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,21 @@ func testRiverDeviceId(tester *serviceTester) {
),
)
require.Error(err) // expected error when calling AddEvent

// send it optionally
resp, err := client.AddEvent(
ctx,
connect.NewRequest(
&protocol.AddEventRequest{
StreamId: channelId[:],
Event: msg,
Optional: true,
},
),
)
require.NoError(err) // expected error when calling AddEvent
require.NotNil(resp.Msg.Error, "expected error")

}

func testSyncStreams(tester *serviceTester) {
Expand Down Expand Up @@ -917,7 +932,7 @@ func TestSingleAndMulti(t *testing.T) {
{"testRiverDeviceId", testRiverDeviceId},
{"testSyncStreams", testSyncStreams},
{"testAddStreamsToSync", testAddStreamsToSync},
{"testRemoveStreamsFromSync", testRemoveStreamsFromSync},
{"testRemoveStreamsFromSync", testRemoveStreamsFromSync},
}

t.Run("single", func(t *testing.T) {
Expand Down
59 changes: 31 additions & 28 deletions core/sdk/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
ChannelMessage_Post_Attachment,
MemberPayload_Nft,
CreateStreamRequest,
AddEventResponse_Error,
} from '@river-build/proto'
import {
bin_fromHexString,
Expand Down Expand Up @@ -1105,7 +1106,7 @@ export class Client
body: string,
mentions?: ChannelMessage_Post_Mention[],
attachments: ChannelMessage_Post_Attachment[] = [],
): Promise<string> {
): Promise<{ eventId: string }> {
return this.sendChannelMessage_Text(streamId, {
content: {
body,
Expand All @@ -1119,7 +1120,7 @@ export class Client
streamId: string,
payload: ChannelMessage,
opts: { beforeSendEventHook?: Promise<void> } = {},
): Promise<string> {
): Promise<{ eventId: string }> {
const stream = this.stream(streamId)
check(stream !== undefined, 'stream not found')
const localId = stream.view.appendLocalEvent(payload, 'sending', this)
Expand Down Expand Up @@ -1172,7 +1173,7 @@ export class Client
content: PlainMessage<ChannelMessage_Post_Content_Text>
},
opts: { beforeSendEventHook?: Promise<void> } = {},
): Promise<string> {
): Promise<{ eventId: string }> {
const { content, ...options } = payload
return this.sendChannelMessage(
streamId,
Expand All @@ -1198,7 +1199,7 @@ export class Client
content: PlainMessage<ChannelMessage_Post_Content_Image>
},
opts: { beforeSendEventHook?: Promise<void> } = {},
): Promise<string> {
): Promise<{ eventId: string }> {
const { content, ...options } = payload
return this.sendChannelMessage(
streamId,
Expand All @@ -1224,7 +1225,7 @@ export class Client
content: PlainMessage<ChannelMessage_Post_Content_GM>
},
opts: { beforeSendEventHook?: Promise<void> } = {},
): Promise<string> {
): Promise<{ eventId: string }> {
const { content, ...options } = payload
return this.sendChannelMessage(
streamId,
Expand Down Expand Up @@ -1254,20 +1255,14 @@ export class Client
data: data,
chunkIndex: chunkIndex,
})
return this.makeEventWithHashAndAddToStream(
streamId,
payload,
prevMiniblockHash,
undefined,
undefined,
)
return this.makeEventWithHashAndAddToStream(streamId, payload, prevMiniblockHash)
}

async sendChannelMessage_Reaction(
streamId: string,
payload: PlainMessage<ChannelMessage_Reaction>,
opts: { beforeSendEventHook?: Promise<void> } = {},
): Promise<string> {
): Promise<{ eventId: string }> {
return this.sendChannelMessage(
streamId,
new ChannelMessage({
Expand All @@ -1283,7 +1278,7 @@ export class Client
async sendChannelMessage_Redaction(
streamId: string,
payload: PlainMessage<ChannelMessage_Redaction>,
): Promise<string> {
): Promise<{ eventId: string }> {
const stream = this.stream(streamId)
if (!stream) {
throw new Error(`stream not found: ${streamId}`)
Expand All @@ -1306,7 +1301,7 @@ export class Client
streamId: string,
refEventId: string,
newPost: PlainMessage<ChannelMessage_Post>,
): Promise<string> {
): Promise<{ eventId: string }> {
return this.sendChannelMessage(
streamId,
new ChannelMessage({
Expand All @@ -1327,7 +1322,7 @@ export class Client
payload: Omit<PlainMessage<ChannelMessage_Post>, 'content'> & {
content: PlainMessage<ChannelMessage_Post_Content_Text>
},
): Promise<string> {
): Promise<{ eventId: string }> {
const { content, ...options } = payload
return this.sendChannelMessage_Edit(streamId, refEventId, {
...options,
Expand All @@ -1338,7 +1333,7 @@ export class Client
})
}

async redactMessage(streamId: string, eventId: string): Promise<string> {
async redactMessage(streamId: string, eventId: string): Promise<{ eventId: string }> {
const stream = this.stream(streamId)
check(isDefined(stream), 'stream not found')

Expand All @@ -1365,7 +1360,7 @@ export class Client
)
}

async inviteUser(streamId: string | Uint8Array, userId: string): Promise<string> {
async inviteUser(streamId: string | Uint8Array, userId: string): Promise<{ eventId: string }> {
await this.initStream(streamId)
check(isDefined(this.userStreamId))
return this.makeEventAndAddToStream(
Expand All @@ -1379,7 +1374,7 @@ export class Client
)
}

async joinUser(streamId: string | Uint8Array, userId: string): Promise<string> {
async joinUser(streamId: string | Uint8Array, userId: string): Promise<{ eventId: string }> {
await this.initStream(streamId)
check(isDefined(this.userStreamId))
return this.makeEventAndAddToStream(
Expand Down Expand Up @@ -1437,7 +1432,7 @@ export class Client
return stream
}

async leaveStream(streamId: string | Uint8Array): Promise<string> {
async leaveStream(streamId: string | Uint8Array): Promise<{ eventId: string }> {
this.logCall('leaveStream', streamId)
check(isDefined(this.userStreamId))

Expand Down Expand Up @@ -1466,7 +1461,7 @@ export class Client
)
}

async removeUser(streamId: string | Uint8Array, userId: string): Promise<string> {
async removeUser(streamId: string | Uint8Array, userId: string): Promise<{ eventId: string }> {
check(isDefined(this.userStreamId))
this.logCall('removeUser', streamId, userId)

Expand Down Expand Up @@ -1698,15 +1693,16 @@ export class Client
async makeEventAndAddToStream(
streamId: string | Uint8Array,
payload: PlainMessage<StreamEvent>['payload'],
options: { method?: string; localId?: string; cleartext?: string } = {},
): Promise<string> {
options: { method?: string; localId?: string; cleartext?: string; optional?: boolean } = {},
): Promise<{ eventId: string; error?: AddEventResponse_Error }> {
// TODO: filter this.logged payload for PII reasons
this.logCall(
'await makeEventAndAddToStream',
options.method,
streamId,
payload,
options.localId,
options.optional,
)
assert(this.userStreamId !== undefined, 'userStreamId must be set')

Expand All @@ -1718,24 +1714,26 @@ export class Client
isDefined(prevHash),
'no prev miniblock hash for stream ' + streamIdAsString(streamId),
)
const { eventId } = await this.makeEventWithHashAndAddToStream(
const { eventId, error } = await this.makeEventWithHashAndAddToStream(
streamId,
payload,
prevHash,
options.optional,
options.localId,
options.cleartext,
)
return eventId
return { eventId, error }
}

async makeEventWithHashAndAddToStream(
streamId: string | Uint8Array,
payload: PlainMessage<StreamEvent>['payload'],
prevMiniblockHash: Uint8Array,
optional?: boolean,
localId?: string,
cleartext?: string,
retryCount?: number,
): Promise<{ prevMiniblockHash: Uint8Array; eventId: string }> {
): Promise<{ prevMiniblockHash: Uint8Array; eventId: string; error?: AddEventResponse_Error }> {
const event = await makeEvent(this.signerContext, payload, prevMiniblockHash)
const eventId = bin_toHexString(event.hash)
if (localId) {
Expand All @@ -1751,11 +1749,16 @@ export class Client
}

try {
await this.rpcClient.addEvent({ streamId: streamIdAsBytes(streamId), event })
const { error } = await this.rpcClient.addEvent({
streamId: streamIdAsBytes(streamId),
event,
optional,
})
if (localId) {
const stream = this.streams.get(streamId)
stream?.view.updateLocalEvent(localId, eventId, 'sent', this)
}
return { prevMiniblockHash, eventId, error }
} catch (err) {
// custom retry logic for addEvent
// if we send up a stale prevMiniblockHash, the server will return a BAD_PREV_MINIBLOCK_HASH
Expand All @@ -1777,6 +1780,7 @@ export class Client
streamId,
payload,
bin_fromHexString(expectedHash),
optional,
isDefined(localId) ? eventId : undefined,
cleartext,
retryCount + 1,
Expand All @@ -1789,7 +1793,6 @@ export class Client
throw err
}
}
return { prevMiniblockHash, eventId }
}

async getStreamLastMiniblockHash(streamId: string | Uint8Array): Promise<Uint8Array> {
Expand Down
2 changes: 1 addition & 1 deletion core/stress/src/mode/chat/kickoffChat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export async function kickoffChat(rootClient: StressClient, cfg: ChatConfig) {
const shareKeysDuration = Date.now() - shareKeysStart

logger.log('send message')
const eventId = await rootClient.sendMessage(
const { eventId } = await rootClient.sendMessage(
announceChannelId,
`hello, we're starting the stress test now!, containers: ${cfg.containerCount} ppc: ${cfg.processesPerContainer} clients: ${cfg.clientsCount} sessionId: ${sessionId}`,
)
Expand Down
10 changes: 9 additions & 1 deletion protocol/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -653,9 +653,17 @@ message GetLastMiniblockHashResponse {
message AddEventRequest {
bytes stream_id = 1;
Envelope event = 2;
bool optional = 3; // if true, response will contain non nil error if event didn't pass validation
}

message AddEventResponse {}
message AddEventResponse {
message Error {
Err code = 1;
string msg = 2;
repeated string funcs = 3;
}
Error error = 1; // only set if AddEventRequest.optional is true
}

message SyncStreamsRequest {
repeated SyncCookie sync_pos = 1;
Expand Down

0 comments on commit 0ae84c7

Please sign in to comment.