Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: connect to chainhook #1

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
node_modules
.env
.git-info
tmp/
20 changes: 20 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,26 @@
{
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "Run: writeonly",
"runtimeArgs": [
"-r",
"ts-node/register"
],
"args": [
"${workspaceFolder}/src/index.ts"
],
"outputCapture": "std",
"internalConsoleOptions": "openOnSessionStart",
"env": {
"NODE_ENV": "development",
"TS_NODE_SKIP_IGNORE": "true",
"RUN_MODE": "writeonly"
},
"killBehavior": "polite",
},
{
"name": "Launch Program",
"type": "node",
Expand Down
9 changes: 4 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"@fastify/swagger": "^7.6.1",
"@fastify/type-provider-typebox": "^3.2.0",
"@hirosystems/api-toolkit": "^1.7.1",
"@hirosystems/chainhook-client": "^2.0.0",
"@hirosystems/chainhook-client": "^2.1.0",
"@sinclair/typebox": "^0.28.17",
"@stacks/transactions": "^6.1.0",
"@types/node": "^20.16.1",
Expand Down
58 changes: 26 additions & 32 deletions src/chainhook/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,37 @@ export async function startChainhookServer(args: { db: PgStore }): Promise<Chain
const blockHeight = await args.db.getChainTipBlockHeight();
logger.info(`ChainhookServer is at block ${blockHeight}`);

const network = ENV.NETWORK as 'mainnet' | 'testnet';
const predicates: EventObserverPredicate[] = [];
if (ENV.CHAINHOOK_AUTO_PREDICATE_REGISTRATION) {
const header = {
name: 'signer-monitor-api-blocks',
// predicates.push({
// name: 'signer-monitor-api-blocks',
// version: 1,
// chain: 'stacks',
// networks: {
// [network]: {
// startBlock: 1,
// if_this: {
// scope: 'block_height',
// higher_than: 1,
// },
// }
// }
// });
predicates.push({
name: 'signer-monitor-api-messages',
version: 1,
chain: 'stacks',
};
switch (ENV.NETWORK) {
case 'mainnet':
predicates.push({
...header,
networks: {
mainnet: {
start_block: blockHeight,
if_this: {
scope: 'block_height',
higher_than: 1,
},
},
networks: {
[network]: {
startBlock: 1,
if_this: {
scope: 'signer_message',
after_timestamp: 1,
},
});
break;
case 'testnet':
predicates.push({
...header,
networks: {
testnet: {
start_block: blockHeight,
if_this: {
scope: 'block_height',
higher_than: 1,
},
},
},
});
break;
}
}
}
});
}

const observer: EventObserverOptions = {
Expand Down
85 changes: 30 additions & 55 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
import { PgStore } from './pg/pg-store';
import { JobQueue } from './token-processor/queue/job-queue';
import { buildApiServer, buildPromServer } from './api/init';
import { TokenProcessorMetrics } from './token-processor/token-processor-metrics';
import { ENV } from './env';
import { buildAdminRpcServer } from './admin-rpc/init';
import { isProdEnv } from './api/util/helpers';
import { buildProfilerServer, logger, registerShutdownConfig } from '@hirosystems/api-toolkit';
import { isProdEnv, logger, registerShutdownConfig } from '@hirosystems/api-toolkit';
import { closeChainhookServer, startChainhookServer } from './chainhook/server';

/**
Expand All @@ -15,16 +10,6 @@ import { closeChainhookServer, startChainhookServer } from './chainhook/server';
async function initBackgroundServices(db: PgStore) {
logger.info('Initializing background services...');

const jobQueue = new JobQueue({ db });
registerShutdownConfig({
name: 'Job Queue',
forceKillable: false,
handler: async () => {
await jobQueue.stop();
},
});
if (ENV.JOB_QUEUE_AUTO_START) jobQueue.start();

const server = await startChainhookServer({ db });
registerShutdownConfig({
name: 'Chainhook Server',
Expand All @@ -33,47 +18,37 @@ async function initBackgroundServices(db: PgStore) {
await closeChainhookServer(server);
},
});

const adminRpcServer = await buildAdminRpcServer({ db, jobQueue });
registerShutdownConfig({
name: 'Admin RPC Server',
forceKillable: false,
handler: async () => {
await adminRpcServer.close();
},
});
await adminRpcServer.listen({ host: ENV.API_HOST, port: ENV.ADMIN_RPC_PORT });
}

/**
* Initializes API service. Only for `default` and `readonly` run modes.
* @param db - PgStore
*/
async function initApiService(db: PgStore) {
logger.info('Initializing API service...');
const apiServer = await buildApiServer({ db });
registerShutdownConfig({
name: 'API Server',
forceKillable: false,
handler: async () => {
await apiServer.close();
},
});
// logger.info('Initializing API service...');
// const apiServer = await buildApiServer({ db });
// registerShutdownConfig({
// name: 'API Server',
// forceKillable: false,
// handler: async () => {
// await apiServer.close();
// },
// });

await apiServer.listen({ host: ENV.API_HOST, port: ENV.API_PORT });
// await apiServer.listen({ host: ENV.API_HOST, port: ENV.API_PORT });

if (isProdEnv) {
const promServer = await buildPromServer({ metrics: apiServer.metrics });
registerShutdownConfig({
name: 'Prometheus Server',
forceKillable: false,
handler: async () => {
await promServer.close();
},
});
// const promServer = await buildPromServer({ metrics: apiServer.metrics });
// registerShutdownConfig({
// name: 'Prometheus Server',
// forceKillable: false,
// handler: async () => {
// await promServer.close();
// },
// });

TokenProcessorMetrics.configure(db);
await promServer.listen({ host: ENV.API_HOST, port: ENV.PROMETHEUS_PORT });
// TokenProcessorMetrics.configure(db);
// await promServer.listen({ host: ENV.API_HOST, port: ENV.PROMETHEUS_PORT });
}
}

Expand All @@ -88,15 +63,15 @@ async function initApp() {
await initApiService(db);
}

const profilerServer = await buildProfilerServer();
registerShutdownConfig({
name: 'Profiler Server',
forceKillable: false,
handler: async () => {
await profilerServer.close();
},
});
await profilerServer.listen({ host: ENV.API_HOST, port: ENV.PROFILER_PORT });
// const profilerServer = await buildProfilerServer();
// registerShutdownConfig({
// name: 'Profiler Server',
// forceKillable: false,
// handler: async () => {
// await profilerServer.close();
// },
// });
// await profilerServer.listen({ host: ENV.API_HOST, port: ENV.PROFILER_PORT });

registerShutdownConfig({
name: 'DB',
Expand Down
63 changes: 32 additions & 31 deletions src/pg/chainhook/chainhook-pg-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,37 +29,38 @@ type TodoStacksEvent = StacksEvent & {
export class ChainhookPgStore extends BasePgStoreModule {
async processPayload(payload: StacksPayload): Promise<void> {
await this.sqlWriteTransaction(async sql => {
for (const block of payload.rollback) {
logger.info(`ChainhookPgStore rollback block ${block.block_identifier.index}`);
const time = stopwatch();
await this.updateStacksBlock(sql, block, 'rollback');
logger.info(
`ChainhookPgStore rollback block ${
block.block_identifier.index
} finished in ${time.getElapsedSeconds()}s`
);
}
if (payload.rollback.length) {
const earliestRolledBack = Math.min(...payload.rollback.map(r => r.block_identifier.index));
await this.updateChainTipBlockHeight(earliestRolledBack - 1);
}
for (const block of payload.apply) {
if (block.block_identifier.index <= (await this.getLastIngestedBlockHeight())) {
logger.info(
`ChainhookPgStore skipping previously ingested block ${block.block_identifier.index}`
);
continue;
}
logger.info(`ChainhookPgStore apply block ${block.block_identifier.index}`);
const time = stopwatch();
await this.updateStacksBlock(sql, block, 'apply');
await this.updateChainTipBlockHeight(block.block_identifier.index);
logger.info(
`ChainhookPgStore apply block ${
block.block_identifier.index
} finished in ${time.getElapsedSeconds()}s`
);
}
logger.info({payload: payload}, 'Received payload');
// for (const block of payload.rollback) {
// logger.info(`ChainhookPgStore rollback block ${block.block_identifier.index}`);
// const time = stopwatch();
// await this.updateStacksBlock(sql, block, 'rollback');
// logger.info(
// `ChainhookPgStore rollback block ${
// block.block_identifier.index
// } finished in ${time.getElapsedSeconds()}s`
// );
// }
// if (payload.rollback.length) {
// const earliestRolledBack = Math.min(...payload.rollback.map(r => r.block_identifier.index));
// await this.updateChainTipBlockHeight(earliestRolledBack - 1);
// }
// for (const block of payload.apply) {
// if (block.block_identifier.index <= (await this.getLastIngestedBlockHeight())) {
// logger.info(
// `ChainhookPgStore skipping previously ingested block ${block.block_identifier.index}`
// );
// continue;
// }
// logger.info(`ChainhookPgStore apply block ${block.block_identifier.index}`);
// const time = stopwatch();
// await this.updateStacksBlock(sql, block, 'apply');
// await this.updateChainTipBlockHeight(block.block_identifier.index);
// logger.info(
// `ChainhookPgStore apply block ${
// block.block_identifier.index
// } finished in ${time.getElapsedSeconds()}s`
// );
// }
});
}

Expand Down