Skip to content

Commit

Permalink
add a working version of fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
konnov committed May 17, 2024
1 parent 1ec07e5 commit d3a42de
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 45 deletions.
94 changes: 60 additions & 34 deletions solarkraft/src/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,64 +10,90 @@
*/

import { Horizon } from '@stellar/stellar-sdk'
import JSONbigint from 'json-bigint'
import { extractContractCall } from './fetcher/callDecoder.js'
import { loadFetcherState, saveContractCallEntry, saveFetcherState } from './fetcher/storage.js'

// how often to query for the latest synchronized height
const HEIGHT_FETCHING_PERIOD = 100

/**
* Fetch transactions from the ledger
* @param args the arguments parsed by yargs
*/
export async function fetch(args: any) {
const server = new Horizon.Server(args.rpc)

const contractId = args.id
console.log(`Target contract: ${contractId}...`)
console.log(`Fetching fresh transactions from: ${args.rpc}...`)
/*
// read the latest height cached from the latest invocation of fetch
const cachedHeight = 0
// fetch the actual height from the RPC endpoint
const currentHeight = 12345
let startHeight =
args.height < 0 ? currentHeight + args.height : args.height
startHeight = Math.max(startHeight, cachedHeight)
console.log(`| ledger | cached | start |`)
console.log(`| ${currentHeight} | ${cachedHeight} | ${startHeight} |\n`)
*/
const startHeight = args.height
let fetcherState = loadFetcherState(args.home)
const cachedHeight = fetcherState.heights.get(contractId, 1)
let lastHeight = args.height
console.log(`Last cached height: ${cachedHeight}`)
if (args.height < 0) {
// how to fetch the latest height?
console.log('not implemented yet, starting with 0')
lastHeight = 0
} else if (args.height === 0) {
lastHeight = cachedHeight
} else {
lastHeight = args.height
}

const DURATION = 30
const server = new Horizon.Server(args.rpc)
console.log(`Fetching the ledger for ${startHeight}`)
const response = await server.ledgers().ledger(startHeight).call()
//console.log(response)
console.log(`Fetching fresh transactions from: ${args.rpc}...`)

console.log(`Fetching the ledger for ${lastHeight}`)
const response = await server.ledgers().ledger(lastHeight).call()
const startCursor = (response as any).paging_token
// timeout id, if a timeout is set below
let timeout

// See:
// https://github.com/stellar/js-stellar-sdk/blob/master/test/integration/streaming_test.js
const finish = (err) => {
const done = (err) => {
// Closing handler:
// https://github.com/stellar/js-stellar-sdk/blob/master/test/integration/streaming_test.js
clearTimeout(timeout)
closeHandler()
console.error(err)
}

// TODO: work in progress
// the number of received events
let nEvents = 0

// initiate the streaming loop
const closeHandler = server
.operations()
.cursor(startCursor)
.stream({
onmessage: async (msg: any) => {
if (msg.transaction_successful === true) {
;(
await extractContractCall(
msg,
(id) => contractId === id
)
).map((e) => {
console.log(`call => ${JSONbigint.stringify(e)}`)
})
if (msg.transaction_successful) {
const callEntryMaybe =
await extractContractCall(msg, (id) => contractId === id)
if (callEntryMaybe.isJust()) {
const entry = callEntryMaybe.value
console.log(`+ save: ${entry.height}`)
saveContractCallEntry(args.home, entry)
}
} // else: shall we also store reverted transactions?

nEvents++
if (nEvents % HEIGHT_FETCHING_PERIOD === 0) {
// Fetch the height of the current message.
// Note that messages may come slightly out of order, so the heights are not precise.
const tx = await msg.transaction()
lastHeight = Math.max(lastHeight, tx.ledger_attr)
console.log(`= at: ${lastHeight}`)
// load and save the state, other fetchers may work concurrently
fetcherState = loadFetcherState(args.home)
fetcherState = { ...fetcherState, heights: fetcherState.heights.set(contractId, lastHeight) }
saveFetcherState(args.home, fetcherState)
}
},
onerror: finish,
onerror: done,
})

const timeout = setTimeout(finish, DURATION * 1000)
if (args.timeout > 0) {
console.log(`Fetching transactions for ${args.timeout} seconds.`)
timeout = setTimeout(done, args.timeout * 1000)
} else {
console.log('Fetching transactions indefinitely. Close with Ctrl-C.')
}
}
76 changes: 72 additions & 4 deletions solarkraft/src/fetcher/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import JSONbigint from 'json-bigint'
import { OrderedMap } from 'immutable'
import { join } from 'node:path/posix'
import { mkdirSync, readFileSync, writeFileSync } from 'node:fs'
import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'node:fs'

const JSONbig = JSONbigint({ useNativeBigInt: true })

Expand Down Expand Up @@ -68,13 +68,33 @@ export interface ContractCallEntry {
oldFields: FieldsMap
}

/**
* Serializable fetcher state.
*/
export interface FetcherState {
/**
* For every contract id, store the ledger height,
* up to which the transactions were fetched.
*/
heights: OrderedMap<string, number>
}

/**
* Given the solarkraft home, construct the path to store the transactions.
* @param solarkraftHome path to solarkraft home (or project directory)
* @returns path to the transactions storage
*/
export function storagePath(solarkraftHome: string): string {
return join(solarkraftHome, ".stor")
}

/**
* Store a contract call entry in the file storage.
* @param root the storage root directory
* @param home the storage root directory
* @param entry a call entry
*/
export function saveContractCallEntry(root: string, entry: ContractCallEntry) {
const filename = getEntryFilename(root, entry)
export function saveContractCallEntry(home: string, entry: ContractCallEntry) {
const filename = getEntryFilename(storagePath(home), entry)
// convert OrderedMaps to arrays
const simplified = {
...entry,
Expand Down Expand Up @@ -107,6 +127,42 @@ export function loadContractCallEntry(filename: string): ContractCallEntry {
}
}

/**
* Load fetcher state from the storage.
* @param root the storage root directory
* @returns the loaded state
*/
export function loadFetcherState(home: string): FetcherState {
const filename = getFetcherStateFilename(home)
if (!existsSync(filename)) {
// just return an empty map
return {
heights: OrderedMap<string, any>()
}
} else {
const contents = readFileSync(filename)
const loaded = JSONbig.parse(contents)
return {
heights: OrderedMap<string, any>(loaded.heights),
}
}
}

/**
* Store the fetcher config.
* @param home the storage root directory
* @param state fetcher state
*/
export function saveFetcherState(home: string, state: FetcherState): string {
const filename = getFetcherStateFilename(home)
const simplified = {
heights: state.heights.toArray(),
}
const contents = JSONbig.stringify(simplified)
writeFileSync(filename, contents)
return filename
}

/**
* Get the filename for a contract call entry. Create the parent directory, if required.
*
Expand All @@ -119,6 +175,18 @@ function getEntryFilename(root: string, entry: ContractCallEntry) {
return join(dir, `entry-${entry.txHash}.json`)
}

/**
* Get the filename for a contract call entry. Create the parent directory, if required.
*
* @param root storage root
* @param entry call entry
* @returns the filename
*/
function getFetcherStateFilename(root: string) {
mkdirSync(root, { recursive: true })
return join(root, 'fetcher-state.json')
}

/**
* Return the parent directory for an entry.
* If this directory does not exist, create it recursively.
Expand Down
25 changes: 19 additions & 6 deletions solarkraft/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,22 @@ import { version } from './version.js'
import { fetch } from './fetch.js'
import { verify } from './verify.js'
import { list } from './list.js'
import { join } from 'node:path'
import { homedir } from 'node:os'

// The default options present in every command
const defaultOpts = (yargs: any) =>
yargs.option('color', {
desc: 'color output',
type: 'boolean',
default: true,
})
desc: 'color output',
type: 'boolean',
default: true,
})
.option('home', {
desc: 'Solarkraft home directory (or project directory)',
type: 'string',
default: join(homedir(), '.solarkraft'),
})


// fetch: transaction extractor
const fetchCmd = {
Expand All @@ -31,14 +39,19 @@ const fetchCmd = {
require: true,
})
.option('rpc', {
desc: 'URL of the Stellar RPC',
desc: 'URL of a Horizon endpoint',
type: 'string',
default: 'http://localhost:8000',
default: 'https://horizon-testnet.stellar.org',
})
.option('height', {
desc: 'The height to start with (a negative value -n goes from the latest block - n)',
type: 'number',
default: -10,
})
.option('timeout', {
desc: 'Fetcher timeout in seconds (when 0, fetch indefinitely long)',
type: 'number',
default: 0,
}),
handler: fetch,
}
Expand Down
3 changes: 2 additions & 1 deletion solarkraft/test/unit/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { join } from 'node:path'
import {
loadContractCallEntry,
saveContractCallEntry,
storagePath,
} from '../../src/fetcher/storage.js'
import { OrderedMap } from 'immutable'

Expand Down Expand Up @@ -48,7 +49,7 @@ describe('storage tests', () => {

assert.equal(
filename,
join(root, CONTRACT_ID, '1000', `entry-${TX_HASH}.json`)
join(storagePath(root), CONTRACT_ID, '1000', `entry-${TX_HASH}.json`)
)
})

Expand Down

0 comments on commit d3a42de

Please sign in to comment.