From f8c03922068da0e720eeb86f22559ebcffc280f7 Mon Sep 17 00:00:00 2001 From: Miguel Gomes Date: Fri, 9 Aug 2024 18:01:53 -0300 Subject: [PATCH] stress: send a message with `n_clients/total clients` in channel (#668) Closes #654 image ## Summary by CodeRabbit - **New Features** - Enhanced messaging capabilities during chat stress tests, including real-time updates on connected clients. - Introduced statistics reporting to monitor client interactions and engagement in the chat. - **Improvements** - Clarified variable naming for event IDs related to chat messages for better readability. - Expanded the `ChatConfig` interface, allowing for flexible event handling. - **Bug Fixes** - Managed lifecycle of statistics reporting to ensure accurate performance monitoring during chat sessions. --------- Co-authored-by: texuf --- packages/stress/src/mode/chat/kickoffChat.ts | 11 ++- packages/stress/src/mode/chat/root_chat.ts | 9 ++ .../stress/src/mode/chat/statsReporter.ts | 95 +++++++++++++++++++ packages/stress/src/mode/chat/types.ts | 2 + 4 files changed, 115 insertions(+), 2 deletions(-) create mode 100644 packages/stress/src/mode/chat/statsReporter.ts diff --git a/packages/stress/src/mode/chat/kickoffChat.ts b/packages/stress/src/mode/chat/kickoffChat.ts index 05f370368..771ee7e70 100644 --- a/packages/stress/src/mode/chat/kickoffChat.ts +++ b/packages/stress/src/mode/chat/kickoffChat.ts @@ -26,10 +26,17 @@ export async function kickoffChat(rootClient: StressClient, cfg: ChatConfig) { const shareKeysDuration = Date.now() - shareKeysStart logger.log('send message') - const { eventId } = await rootClient.sendMessage( + const { eventId: kickoffMessageEventId } = await rootClient.sendMessage( announceChannelId, `hello, we're starting the stress test now!, containers: ${cfg.containerCount} ppc: ${cfg.processesPerContainer} clients: ${cfg.clientsCount} randomNewClients: ${cfg.randomClients.length} sessionId: ${sessionId}`, ) + const { eventId: countClientsMessageEventId } = await rootClient.sendMessage( + cfg.announceChannelId, + `Clients: 0/${cfg.clientsCount} 🤖`, + ) + + cfg.kickoffMessageEventId = kickoffMessageEventId + cfg.countClientsMessageEventId = countClientsMessageEventId const initialStats = { timeToShareKeys: shareKeysDuration + 'ms', @@ -44,7 +51,7 @@ export async function kickoffChat(rootClient: StressClient, cfg: ChatConfig) { `System Info: ${makeCodeBlock(getSystemInfo())} Initial Stats: ${makeCodeBlock( initialStats, )}`, - { threadId: eventId }, + { threadId: kickoffMessageEventId }, ) const mintMembershipForWallet = async (wallet: Wallet, i: number) => { diff --git a/packages/stress/src/mode/chat/root_chat.ts b/packages/stress/src/mode/chat/root_chat.ts index 8eafb6e1d..69b332835 100644 --- a/packages/stress/src/mode/chat/root_chat.ts +++ b/packages/stress/src/mode/chat/root_chat.ts @@ -11,6 +11,7 @@ import { joinChat } from './joinChat' import { updateProfile } from './updateProfile' import { chitChat } from './chitChat' import { sumarizeChat } from './sumarizeChat' +import { statsReporter } from './statsReporter' function getStressDuration(): number { check(isSet(process.env.STRESS_DURATION), 'process.env.STRESS_DURATION') @@ -56,6 +57,8 @@ function getChatConfig(opts: { processIndex: number; rootWallet: Wallet }): Chat throw new Error('clientStartIndex >= clientEndIndex') } return { + kickoffMessageEventId: undefined, + countClientsMessageEventId: undefined, containerIndex, containerCount, processIndex: opts.processIndex, @@ -106,7 +109,11 @@ export async function startStressChat(opts: { `clients.length !== chatConfig.clientsPerProcess ${clients.length} !== ${chatConfig.clientsPerProcess}`, ) + let cancelStatsReporting: (() => void) | undefined + if (chatConfig.processIndex === 0) { + cancelStatsReporting = statsReporter(clients[0], chatConfig) + for ( let i = chatConfig.clientsCount; i < chatConfig.clientsCount + chatConfig.randomClientsCount; @@ -164,6 +171,8 @@ export async function startStressChat(opts: { logger.log('done', { summary }) + cancelStatsReporting?.() + for (let i = 0; i < clients.length; i += 1) { const client = clients[i] logger.log(`stopping ${client.logId}`) diff --git a/packages/stress/src/mode/chat/statsReporter.ts b/packages/stress/src/mode/chat/statsReporter.ts new file mode 100644 index 000000000..f5fa481bd --- /dev/null +++ b/packages/stress/src/mode/chat/statsReporter.ts @@ -0,0 +1,95 @@ +import { dlogger } from '@river-build/dlog' +import type { StressClient } from '../../utils/stressClient' +import { ChatConfig } from './types' + +const logger = dlogger('stress:statsReporter') + +export function statsReporter(rootClient: StressClient, chatConfig: ChatConfig) { + let canceled = false + let lastReactionCount = 0 + const interval = setInterval(() => { + if (canceled) { + return + } + void (async () => { + if (chatConfig.kickoffMessageEventId && chatConfig.countClientsMessageEventId) { + const reactionCount = countReactions( + rootClient, + chatConfig.announceChannelId, + chatConfig.kickoffMessageEventId, + ) + if (canceled) { + return + } + if (lastReactionCount === reactionCount) { + return + } + lastReactionCount = reactionCount + await updateCountClients( + rootClient, + chatConfig.announceChannelId, + chatConfig.countClientsMessageEventId, + chatConfig.clientsCount, + reactionCount, + ) + } + })() + }, 5000) + + return () => { + logger.info('canceled') + clearInterval(interval) + canceled = true + } +} + +export const updateCountClients = async ( + client: StressClient, + announceChannelId: string, + countClientsMessageEventId: string, + totalClients: number, + reactionCounts: number, +) => { + logger.info(`Clients: ${reactionCounts}/${totalClients} 🤖`) + return await client.streamsClient.sendChannelMessage_Edit_Text( + announceChannelId, + countClientsMessageEventId, + { + content: { + body: `Clients: ${reactionCounts}/${totalClients} 🤖`, + mentions: [], + attachments: [], + }, + }, + ) +} + +export const countReactions = ( + client: StressClient, + announceChannelId: string, + rootMessageId: string, +) => { + const channel = client.streamsClient.stream(announceChannelId) + if (!channel) { + return 0 + } + const message = channel.view.events.get(rootMessageId) + if (!message) { + return 0 + } + + const reactions = channel.view.timeline.filter((event) => { + if (event.localEvent?.channelMessage.payload.case === 'reaction') { + return event.localEvent?.channelMessage.payload.value.refEventId === rootMessageId + } + if ( + event.decryptedContent?.kind === 'channelMessage' && + event.decryptedContent?.content.payload.case === 'reaction' + ) { + return event.decryptedContent?.content.payload.value.refEventId === rootMessageId + } + return + }) + + return reactions.length +} diff --git a/packages/stress/src/mode/chat/types.ts b/packages/stress/src/mode/chat/types.ts index 597785958..79e88b1b3 100644 --- a/packages/stress/src/mode/chat/types.ts +++ b/packages/stress/src/mode/chat/types.ts @@ -12,6 +12,8 @@ export interface ChatConfig { sessionId: string spaceId: string announceChannelId: string + kickoffMessageEventId: string | undefined + countClientsMessageEventId: string | undefined channelIds: string[] allWallets: Wallet[] randomClientsCount: number