Skip to content

Commit

Permalink
Problem: could not aggregate event-level data in a stats manager, bad…
Browse files Browse the repository at this point in the history
… wording of stats classes

Solution: add tickStats as an event-level aggregator without TimeFrames and integrate with accountTimeSeries.ts; rename timeSeries.ts to timeFrameStats.ts and related classes; also add docs
  • Loading branch information
MHHukiewitz committed Jan 30, 2023
1 parent 552eb71 commit 39419ed
Show file tree
Hide file tree
Showing 14 changed files with 326 additions and 133 deletions.
15 changes: 8 additions & 7 deletions packages/framework/src/utils/domain/indexer/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ import {
IndexerMainDomainContext,
} from '../../../services/indexer/src/types.js'
import { Blockchain } from '../../../types.js'
import {
AccountTimeSeriesStats,
AccountStatsFilters,
AccountStats,
} from '../../stats/types.js'
import { AccountStats, AccountTimeSeriesStats, TimeSeriesStatsFilters } from '../../stats/types.js'

/**
* Describes the main indexer domain class capable of calculating stats.
Expand All @@ -24,6 +20,7 @@ export type IndexerMainDomainWithStats = {
updateStats(now: number): Promise<void>
/**
* Returns the time-series stats for the given account.
* @param blockchainId The blockchain to get the time-series stats from.
* @param accounts The accounts to get the time-series stats from.
* @param type The type of time-series to get.
* @param filters The transformations and clipping to apply to the time-series.
Expand All @@ -32,10 +29,11 @@ export type IndexerMainDomainWithStats = {
blockchainId: Blockchain,
accounts: string[],
type: string,
filters: AccountStatsFilters,
filters: TimeSeriesStatsFilters,
): Promise<AccountTimeSeriesStats[]>
/**
* Returns the global stats for the given accounts.
* @param blockchainId The blockchain to get the summary from.
* @param accounts The accounts to get the summary from.
*/
getAccountStats(
Expand Down Expand Up @@ -146,6 +144,7 @@ export abstract class IndexerMainDomain {

/**
* Gets the indexing state of the given accounts.
* @param blockchainId The blockchain to get the state from.
* @param accounts The accounts to get the state from.
*/
async getAccountState(
Expand All @@ -165,6 +164,7 @@ export abstract class IndexerMainDomain {

/**
* Returns the time-series stats for the given account.
* @param blockchainId The blockchain to get the time-series stats from.
* @param accounts The accounts to get the time-series stats from.
* @param type The type of time-series to get.
* @param filters The transformations and clipping to apply to the time-series.
Expand All @@ -173,7 +173,7 @@ export abstract class IndexerMainDomain {
blockchainId: Blockchain,
accounts: string[] = [],
type: string,
filters: AccountStatsFilters,
filters: TimeSeriesStatsFilters,
): Promise<AccountTimeSeriesStats<V>[]> {
this.checkStats()

Expand All @@ -196,6 +196,7 @@ export abstract class IndexerMainDomain {

/**
* Returns the global stats for the given accounts.
* @param blockchainId The blockchain to get the summary from.
* @param accounts The accounts to get the summary from.
*/
async getAccountStats<V>(
Expand Down
8 changes: 2 additions & 6 deletions packages/framework/src/utils/domain/indexer/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@ import {
TransactionDateRangeResponse,
} from '../../../services/indexer/src/types.js'
import { Blockchain, ParsedTransaction } from '../../../types.js'
import {
AccountTimeSeriesStats,
AccountStatsFilters,
AccountStats,
} from '../../stats/index.js'
import { AccountStats, AccountTimeSeriesStats, TimeSeriesStatsFilters } from '../../stats/index.js'
import { WorkerKind } from '../../workers.js'
import { importBlockchainWorkerIndexerDomain } from '../common.js'

Expand All @@ -21,7 +17,7 @@ export type IndexerWorkerDomainWithStats = {
getTimeSeriesStats(
account: string,
type: string,
filters: AccountStatsFilters,
filters: TimeSeriesStatsFilters,
): Promise<AccountTimeSeriesStats>
getStats(account: string): Promise<AccountStats>
}
Expand Down
41 changes: 16 additions & 25 deletions packages/framework/src/utils/stats/accountTimeSeries.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
import { Utils } from '@aleph-indexer/core'
import { IndexerMsClient } from '../../services/indexer/index.js'
import { DateRange, getDateRangeFromInterval, mergeDateRangesFromIterable, TimeFrame } from '../time.js'
import {
DateRange,
getDateRangeFromInterval,
mergeDateRangesFromIterable,
} from '../time.js'
import {
StatsStateStorage,
StatsStateState,
StatsStateDALIndex,
StatsState,
} from './dal/statsState.js'
import { StatsTimeSeriesStorage } from './dal/statsTimeSeries.js'
import {
AccountTimeSeriesStats,
AccountStatsFilters,
AccountTimeSeriesStatsConfig,
AccountStats,
} from './types.js'
TimeFrameState,
TimeFrameStateCode,
TimeFrameStateDALIndex,
TimeFrameStateStorage,
} from './dal/timeFrameState.js'
import { TimeFrameStatsStorage } from './dal/timeFrameEntity.js'
import { AccountStats, AccountTimeSeriesStats, AccountTimeSeriesStatsConfig, TimeSeriesStatsFilters } from './types.js'

const { JobRunner } = Utils

Expand All @@ -31,8 +22,8 @@ export class AccountTimeSeriesStatsManager<V> {
constructor(
public config: AccountTimeSeriesStatsConfig<V>,
protected indexerClient: IndexerMsClient,
protected stateDAL: StatsStateStorage,
protected timeSeriesDAL: StatsTimeSeriesStorage,
protected stateDAL: TimeFrameStateStorage,
protected timeSeriesDAL: TimeFrameStatsStorage,
) {
this.compactionJob = new JobRunner({
name: `stats-compactor ${config.account}`,
Expand All @@ -49,7 +40,7 @@ export class AccountTimeSeriesStatsManager<V> {

async getTimeSeriesStats(
type: string,
filters: AccountStatsFilters,
filters: TimeSeriesStatsFilters,
): Promise<AccountTimeSeriesStats> {
const { account } = this.config

Expand All @@ -61,7 +52,7 @@ export class AccountTimeSeriesStatsManager<V> {
throw new Error(`Stats for ${account} of type "${type}" not found`)

const series = await timeSeries.getStats(account, filters)
const { timeFrame } = filters
const timeFrame = filters.timeFrame ?? TimeFrame.Tick

return {
account,
Expand Down Expand Up @@ -137,10 +128,10 @@ export class AccountTimeSeriesStatsManager<V> {

protected async compactStates(): Promise<void> {
const { account } = this.config
const { Processed } = StatsStateState
const { Processed } = TimeFrameStateCode

const fetchedRanges = await this.stateDAL
.useIndex(StatsStateDALIndex.AccountTypeState)
.useIndex(TimeFrameStateDALIndex.AccountTypeState)
.getAllValuesFromTo([account, Processed], [account, Processed], {
reverse: false,
})
Expand All @@ -150,14 +141,14 @@ export class AccountTimeSeriesStatsManager<V> {
)

const newStates = newRanges.map((range) => {
const newState = range as StatsState
const newState = range as TimeFrameState
newState.account = account
newState.state = Processed
return newState
})

const oldStates = oldRanges.map((range) => {
const oldState = range as StatsState
const oldState = range as TimeFrameState
oldState.account = account
oldState.state = Processed
return oldState
Expand Down
4 changes: 2 additions & 2 deletions packages/framework/src/utils/stats/dal/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export * from './statsState.js'
export * from './statsTimeSeries.js'
export * from './timeFrameState.js'
export * from './timeFrameEntity.js'
33 changes: 33 additions & 0 deletions packages/framework/src/utils/stats/dal/tickEntity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { EntityStorage } from '@aleph-indexer/core'

export type TickEntity<T> = {
account: string
type: string
date: number
data: T
}

export type TickStatsStorage = EntityStorage<TickEntity<any>>

const accountKey = {
get: (e: TickEntity<unknown>) => e.account,
length: EntityStorage.AddressLength,
}

const typeKey = {
get: (e: TickEntity<unknown>) => e.type,
length: EntityStorage.VariableLength,
}

const dateKey = {
get: (e: TickEntity<unknown>) => e.date,
length: EntityStorage.TimestampLength,
}

export function createStatsTimeSeriesDAL(path: string): TickStatsStorage {
return new EntityStorage<TickEntity<any>>({
name: 'stats_time_series',
path,
key: [accountKey, typeKey, dateKey],
})
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { EntityStorage } from '@aleph-indexer/core'
import { TimeFrame } from '../../time.js'

export type StatsTimeSeries<T> = {
export type TimeFrameEntity<T> = {
account: string
type: string
timeFrame: TimeFrame
Expand All @@ -13,34 +13,34 @@ export type StatsTimeSeries<T> = {
/**
* Stats Time Series Entity Storage.
*/
export type StatsTimeSeriesStorage = EntityStorage<StatsTimeSeries<any>>
export type TimeFrameStatsStorage = EntityStorage<TimeFrameEntity<any>>

const accountKey = {
get: (e: StatsTimeSeries<unknown>) => e.account,
get: (e: TimeFrameEntity<unknown>) => e.account,
length: EntityStorage.AddressLength,
}

const typeKey = {
get: (e: StatsTimeSeries<unknown>) => e.type,
get: (e: TimeFrameEntity<unknown>) => e.type,
length: EntityStorage.VariableLength,
}

const timeFrameKey = {
get: (e: StatsTimeSeries<unknown>) => e.timeFrame,
get: (e: TimeFrameEntity<unknown>) => e.timeFrame,
length: 3,
}

// @note: start date in millis of the interval
const startDateKey = {
get: (e: StatsTimeSeries<unknown>) => e.startDate,
get: (e: TimeFrameEntity<unknown>) => e.startDate,
length: EntityStorage.TimestampLength,
}

/**
* Creates a stats time series Entity Storage.
*/
export function createStatsTimeSeriesDAL(path: string): StatsTimeSeriesStorage {
return new EntityStorage<StatsTimeSeries<any>>({
export function createStatsTimeSeriesDAL(path: string): TimeFrameStatsStorage {
return new EntityStorage<TimeFrameEntity<any>>({
name: 'stats_time_series',
path,
key: [accountKey, typeKey, timeFrameKey, startDateKey],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,66 +1,66 @@
import { EntityStorage } from '@aleph-indexer/core'
import { TimeFrame } from '../../time.js'

export enum StatsStateState {
export enum TimeFrameStateCode {
Processing = 0,
Processed = 1,
}

export type StatsState = {
export type TimeFrameState = {
account: string
type: string
timeFrame: TimeFrame
startDate: number
endDate: number
state: StatsStateState
state: TimeFrameStateCode
}

/**
* Stats Entity Storage.
*/
export type StatsStateStorage = EntityStorage<StatsState>
export type TimeFrameStateStorage = EntityStorage<TimeFrameState>

export enum StatsStateDALIndex {
export enum TimeFrameStateDALIndex {
AccountTypeState = 'account_type_state',
}

const accountKey = {
get: (e: StatsState) => e.account,
get: (e: TimeFrameState) => e.account,
length: EntityStorage.AddressLength,
}

const typeKey = {
get: (e: StatsState) => e.type,
get: (e: TimeFrameState) => e.type,
length: EntityStorage.VariableLength,
}

const timeFrameKey = {
get: (e: StatsState) => e.timeFrame,
get: (e: TimeFrameState) => e.timeFrame,
length: 2,
}

// @note: start date in millis of the interval
const startDateKey = {
get: (e: StatsState) => e.startDate,
get: (e: TimeFrameState) => e.startDate,
length: EntityStorage.TimestampLength,
}

const stateKey = {
get: (e: StatsState) => e.state || StatsStateState.Processing,
get: (e: TimeFrameState) => e.state || TimeFrameStateCode.Processing,
length: 1,
}

/**
* Creates a stats Entity Storage.
*/
export function createStatsStateDAL(path: string): StatsStateStorage {
return new EntityStorage<StatsState>({
name: 'stats_state',
export function createTimeFrameStateDAL(path: string): TimeFrameStateStorage {
return new EntityStorage<TimeFrameState>({
name: 'time_frame_state',
path,
key: [accountKey, typeKey, timeFrameKey, startDateKey],
indexes: [
{
name: StatsStateDALIndex.AccountTypeState,
name: TimeFrameStateDALIndex.AccountTypeState,
key: [accountKey, typeKey, stateKey],
},
],
Expand Down
2 changes: 1 addition & 1 deletion packages/framework/src/utils/stats/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export * from './accountTimeSeries.js'
export * from './timeSeries.js'
export * from './timeFrameStats.js'
export * from './dal/index.js'
export * from './types.js'
26 changes: 26 additions & 0 deletions packages/framework/src/utils/stats/interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { TimeSeries, TimeSeriesStatsConfig, TimeSeriesStatsFilters } from './types.js'
import { DateRange } from '../time.js'
import { EventBase } from '../../types.js'

/**
* Provides outward-facing methods of the stats aggregator.
*/
export interface StatsI<I extends EventBase<any>, O> {
config: TimeSeriesStatsConfig<I, O>;

/**
* Get the stats for a given interval, possibly time frame size and account.
* @param args The interval, time frame size, limits and whether to reverse the order of the time series.
* @param account The account to get the stats for.
*/
getStats(account: string, args: TimeSeriesStatsFilters): Promise<TimeSeries<O>>;

/**
* Process the events for a given account into time frames.
* @param account The account to process the events for.
* @param now The current unix timestamp.
* @param pendingDateRanges The requested time frames to process.
* @param minDate
*/
process(account: string, now: number, pendingDateRanges: DateRange[], minDate: number | undefined): Promise<void>;
}
Loading

0 comments on commit 39419ed

Please sign in to comment.