Skip to content

Commit

Permalink
Add persistence to the stress tests (#703)
Browse files Browse the repository at this point in the history
closes #659
closes #603

Test plan:
run
./packages/stress/scripts/start_redis.sh

./packages/stress/scripts/localhost_chat_setup.sh
./packages/stress/scripts/localhost_chat.sh

then run
./packages/stress/scripts/localhost_chat_setup.sh
a few more times
see "globalRunIndex" increment up in the reply
see "timeToShareKeys": "2ms", drop to nearly 0

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit


- **New Features**
- Added a task for "Start Stress Testing Redis" in the development
environment.
- Introduced scripts for managing Redis containers (`start_redis.sh`,
`stop_redis.sh`).
- Implemented Redis storage capabilities in the chat stress testing
module.
  
- **Documentation**
- Improved clarity and usability in the README for the `packages/stress`
module.
  
- **Bug Fixes**
- Enhanced error handling and environment variable management in various
scripts.
  
- **Refactor**
- Updated functions and interfaces to support Redis integration and
improved state management.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
texuf authored Aug 12, 2024
1 parent ce98bdd commit 02326c9
Show file tree
Hide file tree
Showing 15 changed files with 190 additions and 28 deletions.
10 changes: 10 additions & 0 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,16 @@
"group": "local-servers"
}
},
{
"label": "Start Stress Testing Redis",
"type": "shell",
"command": "./packages/stress/scripts/start_redis.sh",
"isBackground": true,
"problemMatcher": [],
"presentation": {
"group": "local-servers"
}
},
{
"label": "Casablanca",
"type": "shell",
Expand Down
13 changes: 6 additions & 7 deletions packages/stress/README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
# Stress

Running things in jest comes with like a huge amount of overhead per process. No f'ing clue what it's doing. Running things in node with custom-loader.mjs takes less overhead. (from looking at Activity Monitor while running clients)

Times simpile test just printing out os stats:
## Run it locally

```
jest: 731.20s user 196.02s system 765% cpu 2:01.08 total
node: 119.62s user 17.02s system 779% cpu 17.533 total // 7x speedup!!!
./scripts/localhost_chat_setup.sh && ./scripts/localhost_chat.sh
```

# Directions
### Totally optional, if you want to test persistence

Use files in scripts dir to launch load tests
```
./scripts/start_redis.sh
```
6 changes: 6 additions & 0 deletions packages/stress/scripts/docker_compose_redis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
services:
redis:
image: redis:latest
container_name: stress-redis
ports:
- '6379:6379'
19 changes: 17 additions & 2 deletions packages/stress/scripts/localhost_chat.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,21 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")"
cd ..

# run scripts/localhost_chat_setup.sh to set up the environment variables
source scripts/.env.localhost_chat
# List of environment files to source
ENV_FILES=(
"./scripts/.env.localhost_chat"
"./scripts/.env.storage"
)

# Loop through each file in the list
for file in "${ENV_FILES[@]}"; do
if [ -f "$file" ]; then
source "$file"
echo "Sourced: $file"
else
echo "Skipped: $file file does not exist."
fi
done

echo "stress/scripts/localhost_chat.sh"

Expand All @@ -23,10 +37,11 @@ echo "stress/scripts/localhost_chat.sh"
export SPACE_ID="${SPACE_ID}"
export CHANNEL_IDS="${CHANNEL_IDS}"
export ANNOUNCE_CHANNEL_ID="${ANNOUNCE_CHANNEL_ID:-}"
export REDIS_HOST="${REDIS_HOST:-}"

export RIVER_ENV="${RIVER_ENV:-local_multi}"
export STRESS_MODE="${STRESS_MODE:-chat}"
export STRESS_DURATION="${STRESS_DURATION:-360}"
export STRESS_DURATION="${STRESS_DURATION:-180}"
export SESSION_ID="${SESSION_ID:-$(uuidgen)}"

export PROCESSES_PER_CONTAINER="${PROCESSES_PER_CONTAINER:-4}"
Expand Down
1 change: 1 addition & 0 deletions packages/stress/scripts/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export RIVER_ENV="${RIVER_ENV}"
export BASE_CHAIN_RPC_URL="${BASE_CHAIN_RPC_URL}"
export RIVER_CHAIN_RPC_URL="${RIVER_CHAIN_RPC_URL}"
export MNEMONIC="${MNEMONIC}"
export REDIS_HOST="${REDIS_HOST:-}"
# stress
export SPACE_ID="${SPACE_ID}"
export ANNOUNCE_CHANNEL_ID="${ANNOUNCE_CHANNEL_ID:-}"
Expand Down
25 changes: 25 additions & 0 deletions packages/stress/scripts/start_redis.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash
set -euo pipefail
cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")"
cd ..

echo "stress/scripts/start_redis.sh"

# Specify the name or ID of the Docker container you want to stop
container_name="stress-redis"

# Check if the container is running
if docker ps --filter "name=$container_name" --format '{{.ID}}' | grep -qE "^[0-9a-f]+$"; then
# The container is running, so stop it

echo "Container $container_name is already running."
else
echo "Starting $container_name."

# write the environment variables to a file so the tests can load it
FILE="./scripts/.env.storage"
ENV_VAR="REDIS_HOST=http://localhost:6379"
echo $ENV_VAR >> $FILE # hard coded port from the docker compose file

docker-compose -p "stress" -f ./scripts/docker_compose_redis.yml up
fi
21 changes: 21 additions & 0 deletions packages/stress/scripts/stop_redis.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/bin/bash
set -euo pipefail
cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")"
cd ..

echo "stress/scripts/stop_redis.sh"

# remove the .env.storage file
rm -f ./scripts/.env.storage

# Specify the name or ID of the Docker container you want to stop
container_name="stress-redis"

# Check if the container is running
if docker ps --filter "name=$container_name" --format '{{.ID}}' | grep -qE "^[0-9a-f]+$"; then
# The container is running, so stop it
docker stop "$container_name"
echo "Container $container_name stopped."
else
echo "Container $container_name is not running."
fi
4 changes: 2 additions & 2 deletions packages/stress/src/demo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ async function spamInfo(count: number) {

async function sendAMessage() {
logger.log('=======================send a message - start =======================')
const bob = await makeStressClient(config, 0, getRootWallet())
const bob = await makeStressClient(config, 0, getRootWallet(), undefined)
const { spaceId, defaultChannelId } = await bob.createSpace("bob's space")
await bob.sendMessage(defaultChannelId, 'hello')

logger.log('=======================send a message - make alice =======================')
const alice = await makeStressClient(config, 1)
const alice = await makeStressClient(config, 1, undefined, undefined)
await bob.spaceDapp.joinSpace(
spaceId,
alice.baseProvider.wallet.address,
Expand Down
7 changes: 7 additions & 0 deletions packages/stress/src/mode/chat/kickoffChat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ const logger = dlogger('stress:kickoffChat')
export async function kickoffChat(rootClient: StressClient, cfg: ChatConfig) {
logger.log('kickoffChat', rootClient.userId)
check(rootClient.clientIndex === 0, 'rootClient.clientIndex === 0')
const globalRunIndex = parseInt(
(await cfg.globalPersistedStore?.get('stress_global_run_index').catch(() => undefined)) ??
'0',
)
await cfg.globalPersistedStore?.set('stress_global_run_index', `${globalRunIndex + 1}`)

const { spaceId, sessionId } = cfg
const balance = await rootClient.baseProvider.wallet.getBalance()
const announceChannelId = cfg.announceChannelId
Expand Down Expand Up @@ -43,6 +49,7 @@ export async function kickoffChat(rootClient: StressClient, cfg: ChatConfig) {
walletBalance: balance.toString(),
testDuration: cfg.duration,
clientsCount: cfg.clientsCount,
globalRunIndex,
}

logger.log('start thread')
Expand Down
21 changes: 18 additions & 3 deletions packages/stress/src/mode/chat/root_chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { updateProfile } from './updateProfile'
import { chitChat } from './chitChat'
import { sumarizeChat } from './sumarizeChat'
import { statsReporter } from './statsReporter'
import { RedisStorage } from '../../utils/storage'

function getStressDuration(): number {
check(isSet(process.env.STRESS_DURATION), 'process.env.STRESS_DURATION')
Expand Down Expand Up @@ -53,6 +54,7 @@ function getChatConfig(opts: { processIndex: number; rootWallet: Wallet }): Chat
const randomClientsCount = process.env.RANDOM_CLIENTS_COUNT
? parseInt(process.env.RANDOM_CLIENTS_COUNT)
: 0
const storage = process.env.REDIS_HOST ? new RedisStorage(process.env.REDIS_HOST) : undefined
if (clientStartIndex >= clientEndIndex) {
throw new Error('clientStartIndex >= clientEndIndex')
}
Expand Down Expand Up @@ -81,6 +83,7 @@ function getChatConfig(opts: { processIndex: number; rootWallet: Wallet }): Chat
startedAtMs,
waitForSpaceMembershipTimeoutMs: Math.max(duration * 1000, 20000),
waitForChannelDecryptionTimeoutMs: Math.max(duration * 1000, 20000),
globalPersistedStore: storage,
} satisfies ChatConfig
}

Expand All @@ -100,7 +103,12 @@ export async function startStressChat(opts: {
logger.log('make clients')
const clients = await Promise.all(
chatConfig.localClients.wallets.map((wallet, i) =>
makeStressClient(opts.config, chatConfig.localClients.startIndex + i, wallet),
makeStressClient(
opts.config,
chatConfig.localClients.startIndex + i,
wallet,
chatConfig.globalPersistedStore,
),
),
)

Expand All @@ -119,7 +127,12 @@ export async function startStressChat(opts: {
i < chatConfig.clientsCount + chatConfig.randomClientsCount;
i++
) {
const rc = await makeStressClient(opts.config, i, ethers.Wallet.createRandom())
const rc = await makeStressClient(
opts.config,
i,
ethers.Wallet.createRandom(),
chatConfig.globalPersistedStore,
)
chatConfig.randomClients.push(rc)
}

Expand Down Expand Up @@ -179,6 +192,8 @@ export async function startStressChat(opts: {
await client.stop()
}

await chatConfig.globalPersistedStore?.close()

return { summary, chatConfig, opts }
}

Expand All @@ -190,7 +205,7 @@ export async function setupChat(opts: {
}) {
const logger = dlogger(`stress:setupChat`)
logger.log('setupChat')
const client = await makeStressClient(opts.config, 0, opts.rootWallet)
const client = await makeStressClient(opts.config, 0, opts.rootWallet, undefined)
// make a space
const { spaceId } = await client.createSpace('stress test space')
// make an announce channel
Expand Down
2 changes: 2 additions & 0 deletions packages/stress/src/mode/chat/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Wallet } from 'ethers'
import { StressClient } from '../../utils/stressClient'
import { IStorage } from '../../utils/storage'

export interface ChatConfig {
containerIndex: number
Expand All @@ -26,4 +27,5 @@ export interface ChatConfig {
startedAtMs: number
waitForSpaceMembershipTimeoutMs: number
waitForChannelDecryptionTimeoutMs: number
globalPersistedStore: IStorage | undefined
}
54 changes: 54 additions & 0 deletions packages/stress/src/utils/storage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { dlogger } from '@river-build/dlog'
import Redis from 'ioredis'

const logger = dlogger('stress:storage')

export interface IStorage {
get(key: string): Promise<string | null>
set(key: string, value: string): Promise<void>
remove(key: string): Promise<void>
close(): Promise<void>
}

export class RedisStorage implements IStorage {
private client: Redis

constructor(uri: string) {
const url = new URL(uri)
const host = `${url.protocol}//${url.hostname}`
const port = parseInt(url.port)
const opts = host.includes('localhost') ? { port } : { host, port }
this.client = new Redis(opts)
}
async get(key: string): Promise<string | null> {
try {
const r = await this.client.get(key)
return r
} catch (error) {
logger.error(`Failed to get key ${key}:`, error)
return null
}
}
async set(key: string, value: string): Promise<void> {
try {
await this.client.set(key, value)
} catch (error) {
logger.error(`Failed to set key ${key} with value ${value}:`, error)
}
}
async remove(key: string): Promise<void> {
try {
await this.client.del(key)
} catch (error) {
logger.error(`Failed to remove key ${key}:`, error)
}
}

async close() {
try {
await this.client.quit()
} catch (error) {
logger.error('Failed to close Redis connection:', error)
}
}
}
28 changes: 16 additions & 12 deletions packages/stress/src/utils/stressClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ import { Wallet } from 'ethers'
import { PlainMessage } from '@bufbuild/protobuf'
import { ChannelMessage_Post_Attachment, ChannelMessage_Post_Mention } from '@river-build/proto'
import { waitFor } from './waitFor'
import { promises as fs } from 'node:fs'
import * as path from 'node:path'
import { sha256 } from 'ethers/lib/utils'
import { IStorage } from './storage'
const logger = dlogger('stress:stressClient')

export async function makeStressClient(
config: RiverConfig,
clientIndex: number,
inWallet?: Wallet,
inWallet: Wallet | undefined,
globalPersistedStore: IStorage | undefined,
) {
const { userId, signerContext, baseProvider, riverProvider, rpcClient } = await makeConnection(
config,
Expand Down Expand Up @@ -74,6 +74,7 @@ export async function makeStressClient(
rpcClient,
spaceDapp,
streamsClient,
globalPersistedStore,
)
}

Expand All @@ -88,6 +89,7 @@ export class StressClient {
public rpcClient: StreamRpcClient,
public spaceDapp: SpaceDapp,
public streamsClient: StreamsClient,
public globalPersistedStore: IStorage | undefined,
) {
logger.log('StressClient', { clientIndex, userId, logId: this.logId })
}
Expand All @@ -96,11 +98,8 @@ export class StressClient {
return `client${this.clientIndex}:${shortenHexString(this.userId)}`
}

get deviceFilePath(): string {
const envSuffix =
this.config.environmentId === 'gamma' ? '' : `-${this.config.environmentId}`
const filename = `stress-${this.userId}${envSuffix}`
return path.resolve(`/tmp/${filename}.json`)
get storageKey(): string {
return `stressclient_${this.userId}_${this.config.environmentId}`
}

async fundWallet() {
Expand Down Expand Up @@ -205,11 +204,13 @@ export class StressClient {
return
}
let device: ExportedDevice | undefined
const rawDevice = await fs.readFile(this.deviceFilePath, 'utf8').catch(() => undefined)
const rawDevice = await this.globalPersistedStore
?.get(this.storageKey)
.catch(() => undefined)
if (rawDevice) {
device = JSON.parse(rawDevice) as ExportedDevice
logger.info(
`Device imported from ${this.deviceFilePath}, outboundSessions: ${device.outboundSessions.length} inboundSessions: ${device.inboundSessions.length}`,
`Device imported from ${this.storageKey}, outboundSessions: ${device.outboundSessions.length} inboundSessions: ${device.inboundSessions.length}`,
)
}
const botPrivateKey = this.baseProvider.wallet.privateKey
Expand Down Expand Up @@ -282,8 +283,11 @@ export class StressClient {
const device = await this.streamsClient.cryptoBackend?.encryptionDevice.exportDevice()
if (device) {
try {
await fs.writeFile(this.deviceFilePath, JSON.stringify(device, null, 2))
logger.log(`Device exported to ${this.deviceFilePath}`)
await this.globalPersistedStore?.set(
this.storageKey,
JSON.stringify(device, null, 2),
)
logger.log(`Device exported to ${this.storageKey}`)
} catch (e) {
logger.error('Failed to export device', e)
}
Expand Down
2 changes: 0 additions & 2 deletions packages/stress/src/utils/systemInfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ import os from 'os'

export function getSystemInfo() {
return {
OperatingSystem: `${os.type()} ${os.release()}`,
SystemUptime: `${os.uptime()} seconds`,
TotalMemory: `${os.totalmem() / 1024 / 1024} MB`,
FreeMemory: `${os.freemem() / 1024 / 1024} MB`,
CPUCount: `${os.cpus().length}`,
Expand Down
Loading

0 comments on commit 02326c9

Please sign in to comment.