From 5f47edb8c6349e4a794b2c0a5ae9361823f9ffe6 Mon Sep 17 00:00:00 2001 From: vaultec <47548474+vaultec81@users.noreply.github.com> Date: Wed, 27 Mar 2024 13:58:40 -0700 Subject: [PATCH] refactor: structure vm runner into class structure --- src/services/new/vm/vm-runner.ts | 550 +++++++++++++++++++------------ 1 file changed, 335 insertions(+), 215 deletions(-) diff --git a/src/services/new/vm/vm-runner.ts b/src/services/new/vm/vm-runner.ts index e10c416..e89ae31 100644 --- a/src/services/new/vm/vm-runner.ts +++ b/src/services/new/vm/vm-runner.ts @@ -1,17 +1,15 @@ import * as IPFS from 'kubo-rpc-client' +import { Collection, MongoClient } from 'mongodb' import { addLink } from '../../../ipfs-utils/add-link' import { removeLink } from '../../../ipfs-utils/rm-link' import { ContractErrorType, instantiate } from './utils' - //Crypto imports import { ripemd160, sha256 } from 'bitcoinjs-lib/src/crypto' - const CID = IPFS.CID -const ipfs = IPFS.create({ url: process.env.IPFS_HOST || 'http://127.0.0.1:5001' }) - +const ipfs = IPFS.create({ url: process.env.IPFS_HOST || 'http://127.0.0.1:5001' }) export class WasmRunner { stateCache: Map @@ -224,246 +222,368 @@ export class WasmRunner { registerBindings() {} } -void (async () => { - console.log('init') - - let modules = {} - for(let [contract_id, code] of Object.entries(JSON.parse(process.env.modules))) { - const binaryData = await ipfs.block.get(IPFS.CID.parse(code)) - modules[contract_id] = await WebAssembly.compile(binaryData) +/** + * Container class for VM execution + */ +class VmRunner { + balanceDb: Collection + ledgerDb: Collection + + ledgerStack: any[] + outputStack: any[] + balanceSnapshots: Map + + state: any + modules: any + + constructor(args) { + this.state = args.state + this.modules = args.modules } - let state = {} - for(let [contract_id, stateCid] of Object.entries(JSON.parse(process.env.state))) { - const wasmRunner = new WasmRunner(); - const stateAccess = await wasmRunner.contractStateRaw(contract_id, stateCid) - state[contract_id] = { - wasmRunner, - stateAccess + async getBalanceSnapshot(account: string, block_height: number) { + const lastBalance = await this.balanceDb.findOne({ account: account }) + + const balanceTemplate = lastBalance + ? { + account: account, + tokens: { + HIVE: lastBalance.tokens.HIVE, + HBD: lastBalance.tokens.HBD, + }, + block_height: block_height, + } + : { + account: account, + tokens: { + HIVE: 0, + HBD: 0, + }, + block_height: block_height, + } + + const hiveDeposits = await this.ledgerDb + .find( + { + unit: 'HIVE', + from: account, + }, + { + sort: { + block_height: 1, + }, + }, + ) + .toArray() + + const hbdDeposits = await this.ledgerDb + .find( + { + unit: 'HBD', + from: account, + }, + { + sort: { + block_height: 1, + }, + }, + ) + .toArray() + + const hiveAmount = hiveDeposits + .map((e) => e.amount) + .reduce((acc, cur) => { + return acc + cur + }, balanceTemplate.tokens.HIVE) + + const hbdAmount = hbdDeposits + .map((e) => e.amount) + .reduce((acc, cur) => { + return acc + cur + }, balanceTemplate.tokens.HBD) + + return { + account: account, + tokens: { + HIVE: hiveAmount, + HBD: hbdAmount, + }, + block_height: block_height, } } - - - process.send({ - type: 'ready', - }) - process.on('message', async (message: any) => { - if (message.type === 'call') { - const contract_id = message.contract_id - const memory = new WebAssembly.Memory({ - initial: 10, - maximum: 128, - }) - - let IOGas = 0; - let error; - const logs = [] - const {wasmRunner, stateAccess} = state[contract_id] - - const contractEnv = { - 'block.included_in': null, - 'sender.id': null, - 'sender.type': null + + /** + * Init should only be called once + */ + async init() { + // const connection = new MongoClient(process.env.MONGO_URI) + // await connection.connect() + // const db = connection.db('vsc-new') + // this.balanceDb = db.collection('bridge_balances') + // this.ledgerDb = db.collection('bridge_ledeger') + + let modules = {} + for (let [contract_id, code] of Object.entries(this.modules)) { + const binaryData = await ipfs.block.get(IPFS.CID.parse(code)) + modules[contract_id] = await WebAssembly.compile(binaryData) + } + + let state = {} + for (let [contract_id, stateCid] of Object.entries(this.state)) { + const wasmRunner = new WasmRunner() + const stateAccess = await wasmRunner.contractStateRaw(contract_id, stateCid) + state[contract_id] = { + wasmRunner, + stateAccess, } + } + this.state = state; + this.modules = modules; + } + + /** + * Executes a smart contract operation + */ + async executeCall(args: { contract_id: string; action: string; payload: string }) { + const contract_id = args.contract_id + const memory = new WebAssembly.Memory({ + initial: 10, + maximum: 128, + }) + + let IOGas = 0 + let error + const logs = [] + const { wasmRunner, stateAccess } = this.state[contract_id] + + const contractEnv = { + 'block.included_in': null, + 'sender.id': null, + 'sender.type': null, + } + + /** + * Contract System calls + */ + const contractCalls = { + 'crypto.sha256': (value) => { + return sha256(Buffer.from(value, 'hex')).toString('hex') + }, + 'crypto.ripemd160': (value) => { + return ripemd160(Buffer.from(value, 'hex')).toString('hex') + }, + } - /** - * Contract System calls - */ - const contractCalls = { - 'crypto.sha256': (value) => { - return sha256(Buffer.from(value, 'hex')).toString('hex') + try { + const insta = await instantiate(this.modules[contract_id], { + env: { + memory, + abort(msg, file, line, colm) { + error = { + msg: insta.exports.__getString(msg), + file: insta.exports.__getString(file), + line, + colm, + } + }, + //Prevent AS loader from allowing any non-deterministic data in. + //TODO: Load in VRF seed for use in contract + seed: () => { + return 0 + }, }, - 'crypto.ripemd160': (value) => { - return ripemd160(Buffer.from(value, 'hex')).toString('hex') - } - } + //Same here + Date: {}, + Math: {}, + sdk: { + 'console.log': (keyPtr) => { + const logMsg = (insta as any).exports.__getString(keyPtr) + logs.push(logMsg) + IOGas = IOGas + logMsg.length + }, + 'console.logNumber': (val) => { + logs.push(val) + }, + 'console.logBool': (val) => { + logs.push(Boolean(val)) + }, + 'db.setObject': (keyPtr, valPtr) => { + const key = (insta as any).exports.__getString(keyPtr) + const val = (insta as any).exports.__getString(valPtr) - try { - const insta = await instantiate(modules[contract_id], { - env: { - memory, - abort(msg, file, line, colm) { - - error = { - msg: insta.exports.__getString(msg), - file: insta.exports.__getString(file), - line, - colm - } - }, - //Prevent AS loader from allowing any non-deterministic data in. - //TODO: Load in VRF seed for use in contract - seed: () => { - return 0; - }, + IOGas = IOGas + key.length + val.length + + wasmRunner.stateCache.set(key, val) + return 1 }, - //Same here - Date: {}, - Math: {}, - sdk: { - 'console.log': (keyPtr) => { - const logMsg = (insta as any).exports.__getString(keyPtr) - logs.push(logMsg) - IOGas = IOGas + logMsg.length - }, - 'console.logNumber': (val) => { - logs.push(val) - }, - 'console.logBool': (val) => { - logs.push(Boolean(val)) - }, - 'db.setObject': (keyPtr, valPtr) => { - const key = (insta as any).exports.__getString(keyPtr) - const val = (insta as any).exports.__getString(valPtr) - - IOGas = IOGas + key.length + val.length - - - wasmRunner.stateCache.set(key, val) - return 1 - }, - 'db.getObject': async (keyPtr) => { - const key = (insta as any).exports.__getString(keyPtr) - let value; - if(wasmRunner.stateCache.has(key)) { - value = wasmRunner.stateCache.get(key) - } else { - value = await stateAccess.client.pull(key) - wasmRunner.stateCache.set(key, value) - } - - const val = JSON.stringify(value) - - IOGas = IOGas + val.length; // Total serialized length of gas - - - return insta.exports.__newString(val) - }, - 'db.delObject': async (keyPtr) => { - const key = (insta as any).exports.__getString(keyPtr) - wasmRunner.stateCache.set(key, null) - }, - 'system.call': async (callPtr, valPtr) => { - const callArg = insta.exports.__getString(callPtr) - const valArg = JSON.parse(insta.exports.__getString(valPtr)) - let resultData; - if(typeof contractCalls[callArg] === 'function') { - resultData = JSON.stringify({ - result: contractCalls[callArg](valArg.arg0) - }) - } else { - resultData = JSON.stringify({ - err: 'INVALID_CALL' - }) - } + 'db.getObject': async (keyPtr) => { + const key = (insta as any).exports.__getString(keyPtr) + let value + if (wasmRunner.stateCache.has(key)) { + value = wasmRunner.stateCache.get(key) + } else { + value = await stateAccess.client.pull(key) + wasmRunner.stateCache.set(key, value) + } - return insta.exports.__newString(resultData); - }, - 'system.getEnv': async (envPtr) => { - const envArg = insta.exports.__getString(envPtr) - - return insta.exports.__newString(contractEnv[envArg]) + const val = JSON.stringify(value) + + IOGas = IOGas + val.length // Total serialized length of gas + + return insta.exports.__newString(val) + }, + 'db.delObject': async (keyPtr) => { + const key = (insta as any).exports.__getString(keyPtr) + wasmRunner.stateCache.set(key, null) + }, + 'system.call': async (callPtr, valPtr) => { + const callArg = insta.exports.__getString(callPtr) + const valArg = JSON.parse(insta.exports.__getString(valPtr)) + let resultData + if (typeof contractCalls[callArg] === 'function') { + resultData = JSON.stringify({ + result: contractCalls[callArg](valArg.arg0), + }) + } else { + resultData = JSON.stringify({ + err: 'INVALID_CALL', + }) } + + return insta.exports.__newString(resultData) }, - } as any) - + 'system.getEnv': async (envPtr) => { + const envArg = insta.exports.__getString(envPtr) - if(!insta.instance.exports[message.action]) { - process.send({ - type: 'execute-stop', - ret: null, - errorType: ContractErrorType.INVALID_ACTION, - logs, - reqId: message.reqId, - IOGas: 0, - }) - return; + return insta.exports.__newString(contractEnv[envArg]) + }, + }, + } as any) + + if (!insta.instance.exports[args.action]) { + return { + type: 'execute-stop', + ret: null, + errorType: ContractErrorType.INVALID_ACTION, + logs, + // reqId: message.reqId, + IOGas: 0, } - let ptr; - try { - ptr = await (insta.instance.exports[message.action] as any)( - (insta as any).exports.__newString(message.payload), - ) + return + } + let ptr + try { + ptr = await (insta.instance.exports[args.action] as any)( + (insta as any).exports.__newString(args.payload), + ) - const str = (insta as any).exports.__getString(ptr) - process.send({ + const str = (insta as any).exports.__getString(ptr) + process.send({ + type: 'execute-stop', + ret: str, + logs, + // reqId: message.reqId, + IOGas, + }) + } catch (ex) { + if (ex.name === 'RuntimeError' && ex.message === 'unreachable') { + console.log(`RuntimeError: unreachable ${JSON.stringify(error)}`, error) + return { type: 'execute-stop', - ret: str, + ret: null, + error: error, + errorType: ContractErrorType.RUNTIME_EXCEPTION, logs, - reqId: message.reqId, + // reqId: message.reqId, IOGas, - }) - } catch (ex) { - if(ex.name === "RuntimeError" && ex.message === "unreachable") { - console.log(`RuntimeError: unreachable ${JSON.stringify(error)}`, error) - process.send({ - type: 'execute-stop', - ret: null, - error: error, - errorType: ContractErrorType.RUNTIME_EXCEPTION, - logs, - reqId: message.reqId, - IOGas, - }); - } else { - console.log(ex) - process.send({ - type: 'execute-stop', - ret: null, - errorType: ContractErrorType.RUNTIME_UNKNOWN, - logs, - reqId: message.reqId, - IOGas, - }); } - } - - } catch (ex) { - console.log('failed runtime setup', ex) - process.send({ + } else { + return { type: 'execute-stop', ret: null, + errorType: ContractErrorType.RUNTIME_UNKNOWN, logs, - errorType: ContractErrorType.RUNTIME_SETUP, - reqId: message.reqId, + // reqId: message.reqId, IOGas, - }) + } } + } + } catch (ex) { + console.log('failed runtime setup', ex) + return { + type: 'execute-stop', + ret: null, + logs, + errorType: ContractErrorType.RUNTIME_SETUP, + // reqId: message.reqId, + IOGas, + } } + } - //Finalization when VM is done - if(message.type === "finish") { - let entries = Object.entries<{ - wasmRunner: any - stateAccess: any - }>(state) - for(let index in entries) { - const [contract_id, entry] = entries[index] - const {wasmRunner, stateAccess} = entry - for(let [key, value] of wasmRunner.stateCache.entries()) { - //Try catch safety - try { - if(value === null) { - //Assume deleted - await stateAccess.client.del(key) - } else { - await stateAccess.client.update(key, JSON.parse(value)) - } - } catch { - + async *finish() { + let entries = Object.entries<{ + wasmRunner: any + stateAccess: any + }>(this.state) + for (let index in entries) { + const [contract_id, entry] = entries[index] + const { wasmRunner, stateAccess } = entry + for (let [key, value] of wasmRunner.stateCache.entries()) { + //Try catch safety + try { + if (value === null) { + //Assume deleted + await stateAccess.client.del(key) + } else { + await stateAccess.client.update(key, JSON.parse(value)) } - } - console.log('sending result') - process.send({ - type: 'partial-result', - contract_id, - index, - stateMerkle: stateAccess.finish().stateMerkle.toString(), - }) + } catch {} } + console.log('sending result') + yield { + type: 'partial-result', + contract_id, + index, + stateMerkle: stateAccess.finish().stateMerkle.toString(), + } + } + yield { + type: 'finish-result', + } + } +} + +void (async () => { + const vmRunner = new VmRunner({ + state: JSON.parse(process.env.state), + modules: JSON.parse(process.env.modules), + }) + + await vmRunner.init() + + process.send({ + type: 'ready', + }) + + process.on('message', async (message: any) => { + if (message.type === 'call') { + const executeResult = await vmRunner.executeCall({ + contract_id: message.contract_id, + payload: message.payload, + action: message.action + }) process.send({ - type: 'finish-result', + ...executeResult, + reqId: message.reqId, }) } + + //Finalization when VM is done + if (message.type === 'finish') { + for await (let result of vmRunner.finish()) { + process.send(result) + } + } }) -})() \ No newline at end of file +})()