Skip to content

Commit

Permalink
Kerem/fetch service logging (#729)
Browse files Browse the repository at this point in the history
this pr introduces a custom pino logger and removes all references to
console

Authors: Kerem + Tak

---------

Co-authored-by: Tak Wai Wong <[email protected]>
  • Loading branch information
mechanical-turk and tak-hntlabs authored Aug 14, 2024
1 parent 1cb0ff9 commit 74e4556
Show file tree
Hide file tree
Showing 12 changed files with 259 additions and 65 deletions.
8 changes: 5 additions & 3 deletions packages/stream-metadata/esbuild.config.mjs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import { build } from "esbuild";
import esbuildPluginPino from "esbuild-plugin-pino";

build({
bundle: true,
entryPoints: ["./src/node.ts"],
entryPoints: { node_esbuild: "./src/node.ts" }, // Rename the entry point to control the output file name
format: "cjs",
logLevel: "info",
loader: {
".ts": "ts",
".wasm": "file",
},
outfile: "dist/node_esbuild.cjs",
outExtension: { ".js": ".cjs" },
outdir: "dist",
outExtension: { ".js": ".cjs" }, // Ensure the output file has .cjs extension
platform: "node",
plugins: [esbuildPluginPino({ transports: ["pino-pretty"] })],
sourcemap: "inline",
target: "es2022",
}).catch((e) => {
Expand Down
3 changes: 3 additions & 0 deletions packages/stream-metadata/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
"@river-build/sdk": "workspace:^",
"@river-build/web3": "workspace:^",
"dotenv": "^16.4.5",
"esbuild-plugin-pino": "^2.2.0",
"ethers": "^5.7.2",
"fastify": "^4.28.1",
"magic-bytes.js": "^1.10.0",
"pino": "^8.17.1",
"pino-pretty": "^10.2.3",
"zod": "^3.21.4"
},
"devDependencies": {
Expand Down
45 changes: 27 additions & 18 deletions packages/stream-metadata/src/environment.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import * as dotenv from 'dotenv'
import { getWeb3Deployment } from '@river-build/web3'
import { z } from 'zod'
import { getLogger } from './logger'

import { Config } from './types'
dotenv.config({
path: ['.env', '.env.local'],
})

const IntStringSchema = z.string().regex(/^[0-9]+$/)
const NumberFromIntStringSchema = IntStringSchema.transform((str) => parseInt(str, 10))
Expand All @@ -11,31 +14,37 @@ const envSchema = z.object({
RIVER_ENV: z.string(),
RIVER_CHAIN_RPC_URL: z.string().url(),
PORT: NumberFromIntStringSchema,
LOG_LEVEL: z.string().optional().default('info'),
LOG_PRETTY: z.boolean().optional().default(true),
})

dotenv.config({
path: ['.env', '.env.local'],
})
function makeConfig() {
// eslint-disable-next-line no-process-env -- this is the only line where we're allowed to use process.env
const env = envSchema.parse(process.env)
const web3Config = getWeb3Deployment(env.RIVER_ENV)

// eslint-disable-next-line no-process-env -- this is the only line where we're allowed to use process.env
const env = envSchema.parse(process.env)

export const config = makeConfig(env.RIVER_ENV, env.RIVER_CHAIN_RPC_URL, env.PORT)

function makeConfig(riverEnv: string, riverChainRpcUrl: string, port: number): Config {
const web3Config = getWeb3Deployment(riverEnv)
return {
...web3Config,
port,
riverEnv,
riverChainRpcUrl,
web3Config,
log: {
level: env.LOG_LEVEL,
pretty: env.LOG_PRETTY,
},
port: env.PORT,
riverEnv: env.RIVER_ENV,
riverChainRpcUrl: env.RIVER_CHAIN_RPC_URL,
}
}

console.log('config', {
export type Config = ReturnType<typeof makeConfig>

export const config = makeConfig()

const logger = getLogger('environment')

logger.info('config', {
riverEnv: config.riverEnv,
chainId: config.river.chainId,
chainId: config.web3Config.river.chainId,
port: config.port,
riverRegistry: config.river.addresses.riverRegistry,
riverRegistry: config.web3Config.river.addresses.riverRegistry,
riverChainRpcUrl: config.riverChainRpcUrl,
})
4 changes: 2 additions & 2 deletions packages/stream-metadata/src/evmRpcClient.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { RiverRegistry } from '@river-build/web3'
import { ethers } from 'ethers'
import { Config } from './types'
import { Config } from './environment'

let riverRegistry: ReturnType<typeof createRiverRegistry> | undefined

function createRiverRegistry(config: Config) {
const provider = new ethers.providers.JsonRpcProvider(config.riverChainRpcUrl)
const riverRegistry = new RiverRegistry(config.river, provider)
const riverRegistry = new RiverRegistry(config.web3Config.river, provider)

if (!riverRegistry) {
throw new Error('Failed to create river registry')
Expand Down
11 changes: 9 additions & 2 deletions packages/stream-metadata/src/handleImageRequest.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { FastifyReply, FastifyRequest } from 'fastify'
import { StreamPrefix, StreamStateView, makeStreamId } from '@river-build/sdk'
import { StreamPrefix, StreamStateView, makeStreamId, deriveKeyAndIV } from '@river-build/sdk'

import { getMediaStreamContent, getStream } from './riverStreamRpcClient'
import { isBytes32String, isValidEthereumAddress } from './validators'
import { getLogger } from './logger'

const logger = getLogger('handleImageRequest')

import { ChunkedMedia } from '@river-build/proto'
import { StreamIdHex } from './types'
Expand All @@ -27,7 +31,10 @@ export async function handleImageRequest(request: FastifyRequest, reply: Fastify
const streamId = makeStreamId(StreamPrefix.Space, spaceAddress)
stream = await getStream(config, streamId)
} catch (e) {
console.error(`Failed to get stream for space ${spaceAddress}: ${e}`)
logger.error(`Failed to get stream`, {
error: e,
spaceAddress,
})
return reply.code(404).send('Stream not found')
}

Expand Down
1 change: 1 addition & 0 deletions packages/stream-metadata/src/handleMetadataRequest.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { FastifyRequest, FastifyReply } from 'fastify'

import { isValidEthereumAddress } from './validators'

export function handleMetadataRequest(
Expand Down
18 changes: 18 additions & 0 deletions packages/stream-metadata/src/logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { pino } from 'pino'
import { config } from './environment'

const pretty = {
target: 'pino-pretty',
options: {
colorize: true,
colorizeObjects: true,
},
}

const baseLogger = pino({
transport: config.log.pretty ? pretty : undefined,
level: config.log.level,
})

export const getLogger = (name: string, meta: Record<string, unknown> = {}) =>
baseLogger.child({ name, ...meta })
29 changes: 18 additions & 11 deletions packages/stream-metadata/src/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ import cors from '@fastify/cors'
import { handleImageRequest } from './handleImageRequest'
import { handleMetadataRequest } from './handleMetadataRequest'
import { config } from './environment'
import { getLogger } from './logger'

// Set the process title to 'fetch-image' so it can be easily identified
// or killed with `pkill fetch-image`
process.title = 'fetch-image'
process.title = 'stream-metadata'

const logger = getLogger('server')

const server = Fastify({
logger: true,
logger,
})

async function registerPlugins() {
Expand All @@ -21,9 +24,9 @@ async function registerPlugins() {
origin: '*', // Allow any origin
methods: ['GET'], // Allowed HTTP methods
})
console.info('CORS registered successfully')
logger.info('CORS registered successfully')
} catch (err) {
console.error('Error registering CORS', err)
logger.error('Error registering CORS', err)
process.exit(1) // Exit the process if registration fails
}
}
Expand All @@ -32,15 +35,17 @@ registerPlugins()

server.get('/space/:spaceAddress', async (request, reply) => {
const { spaceAddress } = request.params as { spaceAddress?: string }
console.log(`GET /space/${spaceAddress}`)
logger.info(`GET /space`, { spaceAddress })

const { protocol, serverAddress } = getServerInfo()
return handleMetadataRequest(request, reply, `${protocol}://${serverAddress}`)
})

server.get('/space/:spaceAddress/image', async (request, reply) => {
const { spaceAddress } = request.params as { spaceAddress?: string }
console.log(`GET /space/${spaceAddress}/image`)
logger.info(`GET /space/../image`, {
spaceAddress,
})

return handleImageRequest(request, reply)
})
Expand Down Expand Up @@ -87,20 +92,22 @@ async function startServer(port: number) {
process.on('SIGTERM', async () => {
try {
await server.close()
console.log('Server closed gracefully')
logger.info('Server closed gracefully')
process.exit(0)
} catch (err) {
console.error('Error during server shutdown', err)
logger.info('Error during server shutdown', err)
process.exit(1)
}
})

// Start the server on the port set in the .env, or the next available port
startServer(config.port)
.then(() => {
console.log('Server started')
logger.info('Server started')
})
.catch((err) => {
console.error('Error starting server', err)
.catch((err: unknown) => {
logger.error('Error starting server', {
err,
})
process.exit(1)
})
42 changes: 31 additions & 11 deletions packages/stream-metadata/src/riverStreamRpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ import { PromiseClient, createPromiseClient } from '@connectrpc/connect'
import { BigNumber } from 'ethers'
import { StreamService } from '@river-build/proto'
import { filetypemime } from 'magic-bytes.js'

import { getNodeForStream } from './streamRegistry'
import { getLogger } from './logger'

const logger = getLogger('riverStreamRpcClient')

const clients = new Map<string, StreamRpcClient>()

Expand All @@ -22,7 +26,9 @@ const contentCache: Record<string, MediaContent | undefined> = {}
export type StreamRpcClient = PromiseClient<typeof StreamService> & { url?: string }

function makeStreamRpcClient(url: string): StreamRpcClient {
console.log(`makeStreamRpcClient: Connecting to url=${url}`)
logger.info(`makeStreamRpcClient: Connecting`, {
url,
})

const options: ConnectTransportOptions = {
baseUrl: url,
Expand All @@ -42,7 +48,7 @@ async function getStreamClient(config: Config, streamId: `0x${string}`) {
clients.set(client.url!, client)
url = client.url!
}
console.log(`getStreamClient: url=${url}`)
logger.info('getStreamClient: client url', url)

const client = clients.get(url)
if (!client) {
Expand Down Expand Up @@ -78,7 +84,9 @@ async function mediaContentFromStreamView(
): Promise<MediaContent> {
const mediaInfo = streamView.mediaContent.info
if (mediaInfo) {
console.log(`mediaContentFromStreamView: mediaInfo.spaceId=${mediaInfo.spaceId}`)
logger.info(`mediaContentFromStreamView`, {
spaceId: mediaInfo.spaceId,
})

// Aggregate data chunks into a single Uint8Array
const data = new Uint8Array(
Expand All @@ -96,7 +104,9 @@ async function mediaContentFromStreamView(
// Determine the MIME type
const mimeType = filetypemime(decrypted)
if (mimeType?.length > 0) {
console.log(`mediaContentFromStreamView: type=${JSON.stringify(mimeType[0])}`)
logger.info(`mediaContentFromStreamView`, {
mimeType,
})

// Return decrypted data and MIME type
return {
Expand Down Expand Up @@ -128,26 +138,33 @@ export async function getStream(
client = result.client
lastMiniblockNum = result.lastMiniblockNum
} catch (e) {
console.error(`Failed to get client for stream ${streamId}: ${e}`)
logger.error('Failed to get client for stream', {
err: e,
streamId,
})
return undefined
}

if (!client) {
console.error(`Failed to get client for stream ${streamId}`)
logger.error(`Failed to get client for stream`, { streamId })
return undefined
}

console.log(
`getStream: client=${client.url}; streamId=${streamId}; lastMiniblockNum=${lastMiniblockNum}`,
)
logger.info(`getStream`, {
clientUrl: client.url,
streamId,
lastMiniblockNum: lastMiniblockNum.toString(),
})

const start = Date.now()

const response = await client.getStream({
streamId: streamIdAsBytes(streamId),
})

console.log(`getStream: getStream took ${Date.now() - start}ms`)
logger.info(`getStream finished`, {
duration: Date.now() - start,
})

const unpackedResponse = await unpackStream(response.stream)
return streamViewFromUnpackedResponse(streamId, unpackedResponse)
Expand Down Expand Up @@ -183,7 +200,10 @@ export async function getMediaStreamContent(
try {
result = await mediaContentFromStreamView(sv, secret, iv)
} catch (e) {
console.error(`Failed to get media content for stream ${fullStreamId}: ${e}`)
logger.error(`Failed to get media content for stream`, {
err: e,
streamId: fullStreamId,
})
return { data: null, mimeType: null }
}

Expand Down
Loading

0 comments on commit 74e4556

Please sign in to comment.