Skip to content

Commit

Permalink
stress: send a message with n_clients/total clients in channel (#668)
Browse files Browse the repository at this point in the history
Closes #654

<img width="1081" alt="image"
src="https://github.com/user-attachments/assets/283b6330-132d-4db9-8f48-1e442e57b3ea">


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit


- **New Features**
- Enhanced messaging capabilities during chat stress tests, including
real-time updates on connected clients.
- Introduced statistics reporting to monitor client interactions and
engagement in the chat.

- **Improvements**
- Clarified variable naming for event IDs related to chat messages for
better readability.
- Expanded the `ChatConfig` interface, allowing for flexible event
handling.

- **Bug Fixes**
- Managed lifecycle of statistics reporting to ensure accurate
performance monitoring during chat sessions.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: texuf <[email protected]>
  • Loading branch information
miguel-nascimento and texuf authored Aug 9, 2024
1 parent db61ce6 commit f8c0392
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 2 deletions.
11 changes: 9 additions & 2 deletions packages/stress/src/mode/chat/kickoffChat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,17 @@ export async function kickoffChat(rootClient: StressClient, cfg: ChatConfig) {
const shareKeysDuration = Date.now() - shareKeysStart

logger.log('send message')
const { eventId } = await rootClient.sendMessage(
const { eventId: kickoffMessageEventId } = await rootClient.sendMessage(
announceChannelId,
`hello, we're starting the stress test now!, containers: ${cfg.containerCount} ppc: ${cfg.processesPerContainer} clients: ${cfg.clientsCount} randomNewClients: ${cfg.randomClients.length} sessionId: ${sessionId}`,
)
const { eventId: countClientsMessageEventId } = await rootClient.sendMessage(
cfg.announceChannelId,
`Clients: 0/${cfg.clientsCount} 🤖`,
)

cfg.kickoffMessageEventId = kickoffMessageEventId
cfg.countClientsMessageEventId = countClientsMessageEventId

const initialStats = {
timeToShareKeys: shareKeysDuration + 'ms',
Expand All @@ -44,7 +51,7 @@ export async function kickoffChat(rootClient: StressClient, cfg: ChatConfig) {
`System Info: ${makeCodeBlock(getSystemInfo())} Initial Stats: ${makeCodeBlock(
initialStats,
)}`,
{ threadId: eventId },
{ threadId: kickoffMessageEventId },
)

const mintMembershipForWallet = async (wallet: Wallet, i: number) => {
Expand Down
9 changes: 9 additions & 0 deletions packages/stress/src/mode/chat/root_chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { joinChat } from './joinChat'
import { updateProfile } from './updateProfile'
import { chitChat } from './chitChat'
import { sumarizeChat } from './sumarizeChat'
import { statsReporter } from './statsReporter'

function getStressDuration(): number {
check(isSet(process.env.STRESS_DURATION), 'process.env.STRESS_DURATION')
Expand Down Expand Up @@ -56,6 +57,8 @@ function getChatConfig(opts: { processIndex: number; rootWallet: Wallet }): Chat
throw new Error('clientStartIndex >= clientEndIndex')
}
return {
kickoffMessageEventId: undefined,
countClientsMessageEventId: undefined,
containerIndex,
containerCount,
processIndex: opts.processIndex,
Expand Down Expand Up @@ -106,7 +109,11 @@ export async function startStressChat(opts: {
`clients.length !== chatConfig.clientsPerProcess ${clients.length} !== ${chatConfig.clientsPerProcess}`,
)

let cancelStatsReporting: (() => void) | undefined

if (chatConfig.processIndex === 0) {
cancelStatsReporting = statsReporter(clients[0], chatConfig)

for (
let i = chatConfig.clientsCount;
i < chatConfig.clientsCount + chatConfig.randomClientsCount;
Expand Down Expand Up @@ -164,6 +171,8 @@ export async function startStressChat(opts: {

logger.log('done', { summary })

cancelStatsReporting?.()

for (let i = 0; i < clients.length; i += 1) {
const client = clients[i]
logger.log(`stopping ${client.logId}`)
Expand Down
95 changes: 95 additions & 0 deletions packages/stress/src/mode/chat/statsReporter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { dlogger } from '@river-build/dlog'
import type { StressClient } from '../../utils/stressClient'
import { ChatConfig } from './types'

const logger = dlogger('stress:statsReporter')

export function statsReporter(rootClient: StressClient, chatConfig: ChatConfig) {
let canceled = false
let lastReactionCount = 0
const interval = setInterval(() => {
if (canceled) {
return
}
void (async () => {
if (chatConfig.kickoffMessageEventId && chatConfig.countClientsMessageEventId) {
const reactionCount = countReactions(
rootClient,
chatConfig.announceChannelId,
chatConfig.kickoffMessageEventId,
)
if (canceled) {
return
}
if (lastReactionCount === reactionCount) {
return
}
lastReactionCount = reactionCount
await updateCountClients(
rootClient,
chatConfig.announceChannelId,
chatConfig.countClientsMessageEventId,
chatConfig.clientsCount,
reactionCount,
)
}
})()
}, 5000)

return () => {
logger.info('canceled')
clearInterval(interval)
canceled = true
}
}

export const updateCountClients = async (
client: StressClient,
announceChannelId: string,
countClientsMessageEventId: string,
totalClients: number,
reactionCounts: number,
) => {
logger.info(`Clients: ${reactionCounts}/${totalClients} 🤖`)
return await client.streamsClient.sendChannelMessage_Edit_Text(
announceChannelId,
countClientsMessageEventId,
{
content: {
body: `Clients: ${reactionCounts}/${totalClients} 🤖`,
mentions: [],
attachments: [],
},
},
)
}

export const countReactions = (
client: StressClient,
announceChannelId: string,
rootMessageId: string,
) => {
const channel = client.streamsClient.stream(announceChannelId)
if (!channel) {
return 0
}
const message = channel.view.events.get(rootMessageId)
if (!message) {
return 0
}

const reactions = channel.view.timeline.filter((event) => {
if (event.localEvent?.channelMessage.payload.case === 'reaction') {
return event.localEvent?.channelMessage.payload.value.refEventId === rootMessageId
}
if (
event.decryptedContent?.kind === 'channelMessage' &&
event.decryptedContent?.content.payload.case === 'reaction'
) {
return event.decryptedContent?.content.payload.value.refEventId === rootMessageId
}
return
})

return reactions.length
}
2 changes: 2 additions & 0 deletions packages/stress/src/mode/chat/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ export interface ChatConfig {
sessionId: string
spaceId: string
announceChannelId: string
kickoffMessageEventId: string | undefined
countClientsMessageEventId: string | undefined
channelIds: string[]
allWallets: Wallet[]
randomClientsCount: number
Expand Down

0 comments on commit f8c0392

Please sign in to comment.