Skip to content

Commit

Permalink
add reqId so that all logs for the same request can be traced togethe…
Browse files Browse the repository at this point in the history
…r in DataDog's Log Explorer (#818)

Added a uuid reqId so that all the log statements for each request are
traced together with the same id.
  • Loading branch information
tak-hntlabs authored Aug 20, 2024
1 parent 99e7633 commit 88c9eae
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 49 deletions.
2 changes: 2 additions & 0 deletions packages/stream-metadata/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
"magic-bytes.js": "^1.10.0",
"pino": "^8.17.1",
"pino-pretty": "^10.2.3",
"uuid": "^8.3.2",
"zod": "^3.21.4"
},
"devDependencies": {
"@river-build/eslint-config": "workspace:^",
"@river-build/prettier-config": "workspace:^",
"@types/node": "^20.5.0",
"@types/uuid": "^10.0.0",
"@typescript-eslint/parser": "^7.14.1",
"esbuild": "^0.21.5",
"eslint": "^8.53.0",
Expand Down
10 changes: 8 additions & 2 deletions packages/stream-metadata/src/logger.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { pino } from 'pino'
import { FastifyBaseLogger } from 'fastify'

import { config } from './environment'

Expand All @@ -15,5 +16,10 @@ const baseLogger = pino({
level: config.log.level,
})

export const getLogger = (name: string, meta: Record<string, unknown> = {}) =>
baseLogger.child({ name, ...meta })
export function getLogger(name: string, meta: Record<string, unknown> = {}) {
return baseLogger.child({ name, ...meta })
}

export function getFunctionLogger(logger: FastifyBaseLogger, functionName: string) {
return logger.child({ functionName })
}
37 changes: 16 additions & 21 deletions packages/stream-metadata/src/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Server as HTTPSServer } from 'https'

import Fastify, { FastifyInstance } from 'fastify'
import cors from '@fastify/cors'
import { v4 as uuidv4 } from 'uuid'

import { config } from './environment'
import { getLogger } from './logger'
Expand Down Expand Up @@ -39,6 +40,13 @@ export type Server = FastifyInstance<

const server = Fastify({
logger,
genReqId: () => uuidv4(),
})

server.addHook('onRequest', (request, reply, done) => {
const reqId = request.id // Use Fastify's generated reqId, which is now a UUID
request.log = request.log.child({ reqId })
done()
})

async function registerPlugins() {
Expand All @@ -53,43 +61,30 @@ function setupRoutes() {
/*
* Routes
*/
server.get('/health', async (request, reply) => {
logger.info(`GET /health`)
return checkHealth(request, reply)
})

server.get('/space/:spaceAddress', async (request, reply) => {
const { spaceAddress } = request.params as { spaceAddress?: string }
logger.info({ spaceAddress }, 'GET /space/../metadata')

const { protocol, serverAddress } = getServerInfo()
return fetchSpaceMetadata(request, reply, `${protocol}://${serverAddress}`)
})

server.get('/space/:spaceAddress/image', async (request, reply) => {
const { spaceAddress } = request.params as { spaceAddress?: string }
logger.info({ spaceAddress }, 'GET /space/../image')

return fetchSpaceImage(request, reply)
})
server.get('/health', checkHealth)
server.get('/space/:spaceAddress', async (request, reply) =>
fetchSpaceMetadata(request, reply, getServerUrl()),
)
server.get('/space/:spaceAddress/image', fetchSpaceImage)

// Generic / route to return 404
server.get('/', async (request, reply) => {
request.log.info(`GET /`)
return reply.code(404).send('Not found')
})
}

/*
* Start the server
*/
function getServerInfo() {
function getServerUrl() {
const addressInfo = server.server.address()
const protocol = server.server instanceof HTTPSServer ? 'https' : 'http'
const serverAddress =
typeof addressInfo === 'string'
? addressInfo
: `${addressInfo?.address}:${addressInfo?.port}`
return { protocol, serverAddress }
return `${protocol}://${serverAddress}`
}

process.on('SIGTERM', async () => {
Expand Down
33 changes: 21 additions & 12 deletions packages/stream-metadata/src/riverStreamRpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ import {
import { PromiseClient, createPromiseClient } from '@connectrpc/connect'
import { StreamService } from '@river-build/proto'
import { filetypemime } from 'magic-bytes.js'
import { FastifyBaseLogger } from 'fastify'

import { MediaContent, StreamIdHex } from './types'
import { getNodeForStream } from './streamRegistry'
import { getLogger } from './logger'

const logger = getLogger('riverStreamRpcClient')
import { getFunctionLogger } from './logger'

const clients = new Map<string, StreamRpcClient>()

const contentCache: Record<string, MediaContent | undefined> = {}

export type StreamRpcClient = PromiseClient<typeof StreamService> & { url?: string }

function makeStreamRpcClient(url: string): StreamRpcClient {
logger.info({ url }, 'makeStreamRpcClient: Connecting')
function makeStreamRpcClient(log: FastifyBaseLogger, url: string): StreamRpcClient {
const logger = getFunctionLogger(log, 'makeStreamRpcClient')
logger.info({ url }, 'Connecting')

const options: ConnectTransportOptions = {
baseUrl: url,
Expand All @@ -36,11 +36,12 @@ function makeStreamRpcClient(url: string): StreamRpcClient {
return client
}

async function getStreamClient(streamId: `0x${string}`) {
const node = await getNodeForStream(streamId)
async function getStreamClient(log: FastifyBaseLogger, streamId: `0x${string}`) {
const logger = getFunctionLogger(log, 'getStreamClient')
const node = await getNodeForStream(logger, streamId)
let url = node?.url
if (!clients.has(url)) {
const client = makeStreamRpcClient(url)
const client = makeStreamRpcClient(logger, url)
clients.set(client.url!, client)
url = client.url!
}
Expand Down Expand Up @@ -74,10 +75,12 @@ function streamViewFromUnpackedResponse(
}

async function mediaContentFromStreamView(
log: FastifyBaseLogger,
streamView: StreamStateView,
secret: Uint8Array,
iv: Uint8Array,
): Promise<MediaContent> {
const logger = getFunctionLogger(log, 'mediaContentFromStreamView')
const mediaInfo = streamView.mediaContent.info
if (!mediaInfo) {
logger.error(
Expand Down Expand Up @@ -147,8 +150,12 @@ function stripHexPrefix(hexString: string): string {
return hexString
}

export async function getStream(streamId: string): Promise<StreamStateView> {
const result = await getStreamClient(`0x${streamId}`)
export async function getStream(
log: FastifyBaseLogger,
streamId: string,
): Promise<StreamStateView> {
const logger = getFunctionLogger(log, 'getStream')
const result = await getStreamClient(logger, `0x${streamId}`)
const client = result.client
const lastMiniblockNum = result.lastMiniblockNum

Expand Down Expand Up @@ -184,10 +191,12 @@ export async function getStream(streamId: string): Promise<StreamStateView> {
}

export async function getMediaStreamContent(
log: FastifyBaseLogger,
fullStreamId: StreamIdHex,
secret: Uint8Array,
iv: Uint8Array,
): Promise<MediaContent | { data: null; mimeType: null }> {
const logger = getFunctionLogger(log, 'getMediaStreamContent')
const toHexString = (byteArray: Uint8Array) => {
return Array.from(byteArray, (byte) => byte.toString(16).padStart(2, '0')).join('')
}
Expand All @@ -202,14 +211,14 @@ export async function getMediaStreamContent(
*/

const streamId = stripHexPrefix(fullStreamId)
const sv = await getStream(streamId)
const sv = await getStream(logger, streamId)

if (!sv) {
logger.error({ streamId }, 'Failed to get stream')
throw new Error(`Failed to get stream ${streamId}`)
}

const result = await mediaContentFromStreamView(sv, secret, iv)
const result = await mediaContentFromStreamView(logger, sv, secret, iv)

// Cache the result
const concatenatedString = `${fullStreamId}${secretHex}${ivHex}`
Expand Down
5 changes: 2 additions & 3 deletions packages/stream-metadata/src/routes/health.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { FastifyReply, FastifyRequest } from 'fastify'

import { getLogger } from '../logger'
import { getRiverRegistry } from '../evmRpcClient'

const logger = getLogger('handleHealthCheckRequest')
import { getFunctionLogger } from '../logger'

export async function checkHealth(request: FastifyRequest, reply: FastifyReply) {
const logger = getFunctionLogger(request.log, 'checkHealth')
// Do a health check on the river registry
try {
await getRiverRegistry().getAllNodes()
Expand Down
18 changes: 11 additions & 7 deletions packages/stream-metadata/src/routes/spaceImage.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import { FastifyReply, FastifyRequest } from 'fastify'
import { FastifyBaseLogger, FastifyReply, FastifyRequest } from 'fastify'
import { ChunkedMedia } from '@river-build/proto'
import { StreamPrefix, StreamStateView, makeStreamId } from '@river-build/sdk'

import { StreamIdHex } from '../types'
import { getMediaStreamContent, getStream } from '../riverStreamRpcClient'
import { isBytes32String, isValidEthereumAddress } from '../validators'
import { getLogger } from '../logger'

const logger = getLogger('handleImageRequest')
import { getFunctionLogger } from '../logger'

export async function fetchSpaceImage(request: FastifyRequest, reply: FastifyReply) {
const logger = getFunctionLogger(request.log, 'fetchSpaceImage')
const { spaceAddress } = request.params as { spaceAddress?: string }

if (!spaceAddress) {
Expand All @@ -27,7 +26,7 @@ export async function fetchSpaceImage(request: FastifyRequest, reply: FastifyRep
let stream: StreamStateView | undefined
try {
const streamId = makeStreamId(StreamPrefix.Space, spaceAddress)
stream = await getStream(streamId)
stream = await getStream(logger, streamId)
} catch (error) {
logger.error(
{
Expand Down Expand Up @@ -58,7 +57,7 @@ export async function fetchSpaceImage(request: FastifyRequest, reply: FastifyRep
let key: Uint8Array | undefined
let iv: Uint8Array | undefined
try {
const { key: _key, iv: _iv } = getEncryption(spaceImage)
const { key: _key, iv: _iv } = getEncryption(logger, spaceImage)
key = _key
iv = _iv
if (key?.length === 0 || iv?.length === 0) {
Expand Down Expand Up @@ -89,6 +88,7 @@ export async function fetchSpaceImage(request: FastifyRequest, reply: FastifyRep
let mimeType: string | null
try {
const { data: _data, mimeType: _mimType } = await getMediaStreamContent(
logger,
fullStreamId,
key,
iv,
Expand Down Expand Up @@ -132,7 +132,11 @@ async function getSpaceImage(streamView: StreamStateView): Promise<ChunkedMedia
return spaceImage
}

function getEncryption(chunkedMedia: ChunkedMedia): { key: Uint8Array; iv: Uint8Array } {
function getEncryption(
log: FastifyBaseLogger,
chunkedMedia: ChunkedMedia,
): { key: Uint8Array; iv: Uint8Array } {
const logger = getFunctionLogger(log, 'getEncryption')
switch (chunkedMedia.encryption.case) {
case 'aesgcm': {
const key = new Uint8Array(chunkedMedia.encryption.value.secretKey)
Expand Down
11 changes: 9 additions & 2 deletions packages/stream-metadata/src/routes/spaceMetadata.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import { FastifyRequest, FastifyReply } from 'fastify'

import { isValidEthereumAddress } from '../validators'
import { getFunctionLogger } from '../logger'

export function fetchSpaceMetadata(request: FastifyRequest, reply: FastifyReply, baseUrl: string) {
export function fetchSpaceMetadata(
request: FastifyRequest,
reply: FastifyReply,
serverUrl: string,
) {
const logger = getFunctionLogger(request.log, 'fetchSpaceMetadata')
const { spaceAddress } = request.params as { spaceAddress?: string }
logger.info({ spaceAddress }, 'GET /space/../metadata')

if (!spaceAddress) {
return reply
Expand All @@ -23,7 +30,7 @@ export function fetchSpaceMetadata(request: FastifyRequest, reply: FastifyReply,
description: '....',
members: 99999,
fees: '0.001 eth',
image: `${baseUrl}/space/${spaceAddress}/image`,
image: `${serverUrl}/space/${spaceAddress}/image`,
}

return reply.header('Content-Type', 'application/json').send(dummyJson)
Expand Down
6 changes: 4 additions & 2 deletions packages/stream-metadata/src/streamRegistry.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { BigNumber } from 'ethers'
import { FastifyBaseLogger } from 'fastify'

import { StreamIdHex } from './types'
import { getLogger } from './logger'
import { getRiverRegistry } from './evmRpcClient'
import { getFunctionLogger } from './logger'

type CachedStreamData = {
url: string
Expand All @@ -11,12 +12,13 @@ type CachedStreamData = {
}

const cache: Record<string, CachedStreamData> = {}
const logger = getLogger('streamRegistry')

// TODO: remove this entire file
export async function getNodeForStream(
log: FastifyBaseLogger,
streamId: StreamIdHex,
): Promise<{ url: string; lastMiniblockNum: BigNumber }> {
const logger = getFunctionLogger(log, 'getNodeForStream')
logger.info({ streamId }, 'getNodeForStream')

const now = Date.now()
Expand Down
9 changes: 9 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3845,6 +3845,7 @@ __metadata:
"@river-build/sdk": "workspace:^"
"@river-build/web3": "workspace:^"
"@types/node": ^20.5.0
"@types/uuid": ^10.0.0
"@typescript-eslint/parser": ^7.14.1
dotenv: ^16.4.5
esbuild: ^0.21.5
Expand All @@ -3859,6 +3860,7 @@ __metadata:
rollup: ^4.18.1
ts-node: ^10.9.1
typescript: ^5.1.6
uuid: ^8.3.2
zod: ^3.21.4
languageName: unknown
linkType: soft
Expand Down Expand Up @@ -5190,6 +5192,13 @@ __metadata:
languageName: node
linkType: hard

"@types/uuid@npm:^10.0.0":
version: 10.0.0
resolution: "@types/uuid@npm:10.0.0"
checksum: e3958f8b0fe551c86c14431f5940c3470127293280830684154b91dc7eb3514aeb79fe3216968833cf79d4d1c67f580f054b5be2cd562bebf4f728913e73e944
languageName: node
linkType: hard

"@types/yargs-parser@npm:*":
version: 21.0.0
resolution: "@types/yargs-parser@npm:21.0.0"
Expand Down

0 comments on commit 88c9eae

Please sign in to comment.