Skip to content

Commit

Permalink
Merge pull request #60 from freespek/igor/fetcher-loop-really
Browse files Browse the repository at this point in the history
working fetcher CLI
  • Loading branch information
konnov authored May 17, 2024
2 parents 1ec07e5 + 10d04dc commit c900229
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 45 deletions.
106 changes: 73 additions & 33 deletions solarkraft/src/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,64 +10,104 @@
*/

import { Horizon } from '@stellar/stellar-sdk'
import JSONbigint from 'json-bigint'
import { extractContractCall } from './fetcher/callDecoder.js'
import {
loadFetcherState,
saveContractCallEntry,
saveFetcherState,
} from './fetcher/storage.js'

// how often to query for the latest synchronized height
const HEIGHT_FETCHING_PERIOD = 100

/**
* Fetch transactions from the ledger
* @param args the arguments parsed by yargs
*/
export async function fetch(args: any) {
const server = new Horizon.Server(args.rpc)

const contractId = args.id
console.log(`Target contract: ${contractId}...`)
let fetcherState = loadFetcherState(args.home)
const cachedHeight = fetcherState.heights.get(contractId, 1)
let lastHeight = args.height
console.log(`Last cached height: ${cachedHeight}`)
if (args.height < 0) {
// how to fetch the latest height?
console.log(`not implemented yet, starting with ${cachedHeight}`)
lastHeight = cachedHeight
} else if (args.height === 0) {
lastHeight = cachedHeight
} else {
lastHeight = args.height
}

console.log(`Fetching fresh transactions from: ${args.rpc}...`)
/*
// read the latest height cached from the latest invocation of fetch
const cachedHeight = 0
// fetch the actual height from the RPC endpoint
const currentHeight = 12345
let startHeight =
args.height < 0 ? currentHeight + args.height : args.height
startHeight = Math.max(startHeight, cachedHeight)
console.log(`| ledger | cached | start |`)
console.log(`| ${currentHeight} | ${cachedHeight} | ${startHeight} |\n`)
*/
const startHeight = args.height

const DURATION = 30
const server = new Horizon.Server(args.rpc)
console.log(`Fetching the ledger for ${startHeight}`)
const response = await server.ledgers().ledger(startHeight).call()
//console.log(response)
console.log(`Fetching the ledger for ${lastHeight}`)
const response = await server.ledgers().ledger(lastHeight).call()
const startCursor = (response as any).paging_token
// timeout id, if a timeout is set below
let timeout

// See:
// https://github.com/stellar/js-stellar-sdk/blob/master/test/integration/streaming_test.js
const finish = (err) => {
const done = (err) => {
// Closing handler:
// https://github.com/stellar/js-stellar-sdk/blob/master/test/integration/streaming_test.js
clearTimeout(timeout)
closeHandler()
console.error(err)
}

// TODO: work in progress
// the number of received events
let nEvents = 0

// initiate the streaming loop
const closeHandler = server
.operations()
.cursor(startCursor)
.stream({
onmessage: async (msg: any) => {
if (msg.transaction_successful === true) {
;(
await extractContractCall(
msg,
(id) => contractId === id
)
).map((e) => {
console.log(`call => ${JSONbigint.stringify(e)}`)
})
if (msg.transaction_successful) {
const callEntryMaybe = await extractContractCall(
msg,
(id) => contractId === id
)
if (callEntryMaybe.isJust()) {
const entry = callEntryMaybe.value
console.log(`+ save: ${entry.height}`)
saveContractCallEntry(args.home, entry)
}
} // else: shall we also store reverted transactions?

nEvents++
if (nEvents % HEIGHT_FETCHING_PERIOD === 0) {
// Fetch the height of the current message and persist it for the future runs.
// Note that messages may come slightly out of order, so the heights are not precise.
const tx = await msg.transaction()
lastHeight = Math.max(lastHeight, tx.ledger_attr)
console.log(`= at: ${lastHeight}`)
// Load and save the state. Other fetchers may work concurrently,
// so there is a possibility of overwriting an updated height.
// This will result in a fetcher doing additional work on the next run.
fetcherState = loadFetcherState(args.home)
fetcherState = {
...fetcherState,
heights: fetcherState.heights.set(
contractId,
lastHeight
),
}
saveFetcherState(args.home, fetcherState)
}
},
onerror: finish,
onerror: done,
})

const timeout = setTimeout(finish, DURATION * 1000)
if (args.timeout > 0) {
console.log(`Fetching transactions for ${args.timeout} seconds.`)
timeout = setTimeout(done, args.timeout * 1000)
} else {
console.log('Fetching transactions indefinitely. Close with Ctrl-C.')
}
}
77 changes: 73 additions & 4 deletions solarkraft/src/fetcher/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import JSONbigint from 'json-bigint'
import { OrderedMap } from 'immutable'
import { join } from 'node:path/posix'
import { mkdirSync, readFileSync, writeFileSync } from 'node:fs'
import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'node:fs'

const JSONbig = JSONbigint({ useNativeBigInt: true })

Expand Down Expand Up @@ -68,13 +68,34 @@ export interface ContractCallEntry {
oldFields: FieldsMap
}

/**
* Serializable fetcher state.
*/
export interface FetcherState {
/**
* For every contract id, store the ledger height,
* up to which the transactions were fetched.
* Similar to Stellar SDK, we are using number instead of bigint.
*/
heights: OrderedMap<string, number>
}

/**
* Given the solarkraft home, construct the path to store the transactions.
* @param solarkraftHome path to solarkraft home (or project directory)
* @returns path to the transactions storage
*/
export function storagePath(solarkraftHome: string): string {
return join(solarkraftHome, '.stor')
}

/**
* Store a contract call entry in the file storage.
* @param root the storage root directory
* @param home the storage root directory
* @param entry a call entry
*/
export function saveContractCallEntry(root: string, entry: ContractCallEntry) {
const filename = getEntryFilename(root, entry)
export function saveContractCallEntry(home: string, entry: ContractCallEntry) {
const filename = getEntryFilename(storagePath(home), entry)
// convert OrderedMaps to arrays
const simplified = {
...entry,
Expand Down Expand Up @@ -107,6 +128,43 @@ export function loadContractCallEntry(filename: string): ContractCallEntry {
}
}

/**
* Load fetcher state from the storage.
* @param root the storage root directory
* @returns the loaded state
*/
export function loadFetcherState(home: string): FetcherState {
const filename = getFetcherStateFilename(home)
if (!existsSync(filename)) {
// just return an empty map
return {
heights: OrderedMap<string, number>(),
}
} else {
const contents = readFileSync(filename)
const loaded = JSONbig.parse(contents)
return {
heights: OrderedMap<string, number>(loaded.heights),
}
}
}

/**
* Store the fetcher config.
* @param home the storage root directory
* @param state fetcher state
*/
export function saveFetcherState(home: string, state: FetcherState): string {
const filename = getFetcherStateFilename(home)
mkdirSync(home, { recursive: true })
const simplified = {
heights: state.heights.toArray(),
}
const contents = JSONbig.stringify(simplified)
writeFileSync(filename, contents)
return filename
}

/**
* Get the filename for a contract call entry. Create the parent directory, if required.
*
Expand All @@ -119,6 +177,17 @@ function getEntryFilename(root: string, entry: ContractCallEntry) {
return join(dir, `entry-${entry.txHash}.json`)
}

/**
* Get the filename for the fetcher state.
*
* @param root storage root
* @param entry call entry
* @returns the filename
*/
function getFetcherStateFilename(root: string) {
return join(root, 'fetcher-state.json')
}

/**
* Return the parent directory for an entry.
* If this directory does not exist, create it recursively.
Expand Down
27 changes: 20 additions & 7 deletions solarkraft/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,22 @@ import { version } from './version.js'
import { fetch } from './fetch.js'
import { verify } from './verify.js'
import { list } from './list.js'
import { join } from 'node:path'
import { homedir } from 'node:os'

// The default options present in every command
const defaultOpts = (yargs: any) =>
yargs.option('color', {
desc: 'color output',
type: 'boolean',
default: true,
})
yargs
.option('color', {
desc: 'color output',
type: 'boolean',
default: true,
})
.option('home', {
desc: 'Solarkraft home directory (or project directory)',
type: 'string',
default: join(homedir(), '.solarkraft'),
})

// fetch: transaction extractor
const fetchCmd = {
Expand All @@ -31,14 +39,19 @@ const fetchCmd = {
require: true,
})
.option('rpc', {
desc: 'URL of the Stellar RPC',
desc: 'URL of a Horizon endpoint',
type: 'string',
default: 'http://localhost:8000',
default: 'https://horizon-testnet.stellar.org',
})
.option('height', {
desc: 'The height to start with (a negative value -n goes from the latest block - n)',
type: 'number',
default: -10,
})
.option('timeout', {
desc: 'Fetcher timeout in seconds (when 0, fetch indefinitely long)',
type: 'number',
default: 0,
}),
handler: fetch,
}
Expand Down
8 changes: 7 additions & 1 deletion solarkraft/test/unit/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { join } from 'node:path'
import {
loadContractCallEntry,
saveContractCallEntry,
storagePath,
} from '../../src/fetcher/storage.js'
import { OrderedMap } from 'immutable'

Expand Down Expand Up @@ -48,7 +49,12 @@ describe('storage tests', () => {

assert.equal(
filename,
join(root, CONTRACT_ID, '1000', `entry-${TX_HASH}.json`)
join(
storagePath(root),
CONTRACT_ID,
'1000',
`entry-${TX_HASH}.json`
)
)
})

Expand Down

0 comments on commit c900229

Please sign in to comment.