diff --git a/snapshotter/modules/pooler/uniswapv2/aggregate/single_uniswap_trade_volume_7d.py b/snapshotter/modules/pooler/uniswapv2/aggregate/single_uniswap_trade_volume_7d.py index 7e9831b0..8ac8442d 100644 --- a/snapshotter/modules/pooler/uniswapv2/aggregate/single_uniswap_trade_volume_7d.py +++ b/snapshotter/modules/pooler/uniswapv2/aggregate/single_uniswap_trade_volume_7d.py @@ -4,7 +4,6 @@ from ipfs_client.main import AsyncIPFSClient from redis import asyncio as aioredis -from ..utils.helpers import get_pair_metadata from ..utils.models.message_models import UniswapTradesAggregateSnapshot from snapshotter.utils.callback_helpers import GenericProcessorAggregate from snapshotter.utils.data_utils import get_project_epoch_snapshot @@ -67,19 +66,13 @@ async def compute( contract = project_id.split(':')[-2] - pair_metadata = await get_pair_metadata( - pair_address=contract, - redis_conn=redis, - rpc_helper=rpc_helper, - ) aggregate_snapshot = UniswapTradesAggregateSnapshot( epochId=msg_obj.epochId, ) # 24h snapshots fetches snapshot_tasks = list() self._logger.debug('fetching 24hour aggregates spaced out by 1 day over 7 days...') - # 1. find one day tail epoch - count = 0 + count = 1 self._logger.debug( 'fetch # {}: queueing task for 24h aggregate snapshot for project ID {}' ' at currently received epoch ID {} with snasphot CID {}', @@ -91,32 +84,23 @@ async def compute( redis, msg_obj.snapshotCid, ipfs_reader, msg_obj.projectId, ), ) + seek_stop_flag = False head_epoch = msg_obj.epochId # 2. if not extrapolated, attempt to seek further back - while not seek_stop_flag or count < 7: + while not seek_stop_flag and count < 7: tail_epoch_id, seek_stop_flag = await get_tail_epoch_id( redis, protocol_state_contract, anchor_rpc_helper, head_epoch, 86400, msg_obj.projectId, ) count += 1 - if not seek_stop_flag or count > 1: - self._logger.debug( - 'fetch # {}: for 7d aggregated trade volume calculations: ' - 'queueing task for 24h aggregate snapshot for project ID {} at rewinded epoch ID {}', - count, msg_obj.projectId, tail_epoch_id, - ) - snapshot_tasks.append( - get_project_epoch_snapshot( - redis, protocol_state_contract, anchor_rpc_helper, - ipfs_reader, tail_epoch_id, msg_obj.projectId, - ), - ) - head_epoch = tail_epoch_id - 1 - if count == 7: - self._logger.info( - 'fetch # {}: reached 7 day limit for 24h aggregate snapshots for project ID {} at rewinded epoch ID {}', - count, msg_obj.projectId, tail_epoch_id, + snapshot_tasks.append( + get_project_epoch_snapshot( + redis, protocol_state_contract, anchor_rpc_helper, + ipfs_reader, tail_epoch_id, msg_obj.projectId, + ), ) + head_epoch = tail_epoch_id - 1 + all_snapshots = await asyncio.gather(*snapshot_tasks, return_exceptions=True) self._logger.debug( 'for 7d aggregated trade volume calculations: fetched {} ' @@ -136,4 +120,6 @@ async def compute( if not all(complete_flags) or count < 7: aggregate_snapshot.complete = False + else: + aggregate_snapshot.complete = True return aggregate_snapshot