I am trying to write an integration test for bar aggregation using historical trades from Polygon.io.
I am getting a pytest discovery error and cannot figure out how to fix it or proceed, even with the help of the Cursor IDE. Can someone please help me?
Objectives
Test that trades can be aggregated into 1s bars without any gaps. If there are no trades in a period, the bar's OHLC values should all equal the last close and its volume (V) should be 0
Test that 1s bars can be aggregated into 5s bars without any gaps
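To make the two objectives concrete, here is a minimal pure-Python sketch of the behaviour I expect, independent of nautilus_trader. The names `SimpleBar`, `trades_to_1s_bars` and `merge_bars` are hypothetical helpers made up for illustration; they are not part of any library, and trades are assumed to be plain `(ts_ns, price, size)` tuples.

```python
from dataclasses import dataclass

NS_PER_SEC = 1_000_000_000


@dataclass(frozen=True)
class SimpleBar:
    ts_close: int  # bar close time in UNIX nanoseconds
    open: float
    high: float
    low: float
    close: float
    volume: float


def trades_to_1s_bars(trades, start_ns, end_ns):
    """Aggregate (ts_ns, price, size) trades into gap-free 1s bars over [start_ns, end_ns)."""
    trades = sorted(trades)
    bars = []
    last_close = None
    i = 0
    for bar_start in range(start_ns, end_ns, NS_PER_SEC):
        bar_end = bar_start + NS_PER_SEC
        bucket = []
        # Consume every trade that falls before the end of this bar.
        while i < len(trades) and trades[i][0] < bar_end:
            if trades[i][0] >= bar_start:
                bucket.append(trades[i])
            i += 1
        if bucket:
            prices = [price for _, price, _ in bucket]
            volume = sum(size for _, _, size in bucket)
            bar = SimpleBar(bar_end, prices[0], max(prices), min(prices), prices[-1], volume)
        elif last_close is not None:
            # No trades in this second: flat bar at the last close, zero volume.
            bar = SimpleBar(bar_end, last_close, last_close, last_close, last_close, 0.0)
        else:
            continue  # no price is known yet, so no bar can be built
        bars.append(bar)
        last_close = bar.close
    return bars


def merge_bars(bars, step):
    """Merge each complete run of `step` consecutive bars into one larger bar."""
    merged = []
    for j in range(0, len(bars) - step + 1, step):
        group = bars[j : j + step]
        merged.append(
            SimpleBar(
                group[-1].ts_close,
                group[0].open,
                max(b.high for b in group),
                min(b.low for b in group),
                group[-1].close,
                sum(b.volume for b in group),
            )
        )
    return merged
```

With this sketch, `merge_bars(trades_to_1s_bars(trades, start_ns, end_ns), 5)` would produce the 5s bars, and continuity can be checked by asserting that consecutive `ts_close` values differ by exactly one bar interval.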
Error
TypeError: cannot set '__pytest_asyncio_scoped_event_loop' attribute of immutable type 'nautilus_trader.common.component.TestClock'
platform linux -- Python 3.12.8, pytest-8.3.3, pluggy-1.5.0
2024-12-05 17:20:33.294 [info] rootdir: /home/user/Code/crypto_nautilus
configfile: pyproject.toml
plugins: hypothesis-6.119.1, asyncio-0.24.0, anyio-4.6.2.post1, timeout-2.3.1
asyncio: mode=Mode.STRICT, default_loop_scope=function
2024-12-05 17:20:33.804 [info] collected 0 items
2024-12-05 17:20:33.807 [info] INTERNALERROR> Traceback (most recent call last):
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/_pytest/main.py", line 283, in wrap_session
INTERNALERROR> session.exitstatus = doit(config, session) or 0
INTERNALERROR> ^^^^^^^^^^^^^^^^^^^^^
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/_pytest/main.py", line 336, in _main
INTERNALERROR> config.hook.pytest_collection(session=session)
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/pluggy/_hooks.py", line 513, in __call__
INTERNALERROR> return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
INTERNALERROR> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-12-05 17:20:33.809 [info]
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/pluggy/_manager.py", line 120, in _hookexec
INTERNALERROR> return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
INTERNALERROR> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/pluggy/_callers.py", line 139, in _multicall
INTERNALERROR> raise exception.with_traceback(exception.__traceback__)
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/pluggy/_callers.py", line 122, in _multicall
INTERNALERROR> teardown.throw(exception) # type: ignore[union-attr]
INTERNALERROR> ^^^^^^^^^^^^^^^^^^^^^^^^^
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/_pytest/logging.py", line 790, in pytest_collection
INTERNALERROR> return (yield)
INTERNALERROR> ^^^^^
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/pluggy/_callers.py", line 122, in _multicall
INTERNALERROR> teardown.throw(exception) # type: ignore[union-attr]
INTERNALERROR> ^^^^^^^^^^^^^^^^^^^^^^^^^
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/_pytest/warnings.py", line 121, in pytest_collection
INTERNALERROR> return (yield)
INTERNALERROR> ^^^^^
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/pluggy/_callers.py", line 122, in _multicall
INTERNALERROR> teardown.throw(exception) # type: ignore[union-attr]
INTERNALERROR> ^^^^^^^^^^^^^^^^^^^^^^^^^
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/_pytest/config/__init__.py", line 1417, in pytest_collection
INTERNALERROR> return (yield)
INTERNALERROR> ^^^^^
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/pluggy/_callers.py", line 103, in _multicall
INTERNALERROR> res = hook_impl.function(*args)
INTERNALERROR> ^^^^^^^^^^^^^^^^^^^^^^^^^
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/_pytest/main.py", line 347, in pytest_collection
INTERNALERROR> session.perform_collect()
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/_pytest/main.py", line 809, in perform_collect
INTERNALERROR> self.items.extend(self.genitems(node))
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/_pytest/main.py", line 975, in genitems
INTERNALERROR> yield from self.genitems(subnode)
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/_pytest/main.py", line 975, in genitems
INTERNALERROR> yield from self.genitems(subnode)
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/_pytest/main.py", line 970, in genitems
INTERNALERROR> rep, duplicate = self._collect_one_node(node, handle_dupes)
INTERNALERROR> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/_pytest/main.py", line 835, in _collect_one_node
INTERNALERROR> rep = collect_one_node(node)
INTERNALERROR> ^^^^^^^^^^^^^^^^^^^^^^
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/_pytest/runner.py", line 566, in collect_one_node
INTERNALERROR> ihook.pytest_collectstart(collector=collector)
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/pluggy/_hooks.py", line 513, in __call__
INTERNALERROR> return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
INTERNALERROR> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/pluggy/_manager.py", line 120, in _hookexec
INTERNALERROR> return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
INTERNALERROR> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/pluggy/_callers.py", line 139, in _multicall
INTERNALERROR> raise exception.with_traceback(exception.__traceback__)
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/pluggy/_callers.py", line 103, in _multicall
INTERNALERROR> res = hook_impl.function(*args)
INTERNALERROR> ^^^^^^^^^^^^^^^^^^^^^^^^^
INTERNALERROR> File "/home/user/Code/crypto_nautilus/.venv/lib/python3.12/site-packages/pytest_asyncio/plugin.py", line 680, in pytest_collectstart
INTERNALERROR> collector.obj.__pytest_asyncio_scoped_event_loop = scoped_event_loop
INTERNALERROR> ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
INTERNALERROR> TypeError: cannot set '__pytest_asyncio_scoped_event_loop' attribute of immutable type 'nautilus_trader.common.component.TestClock'
2024-12-05 17:20:33.810 [info]
========================= no tests collected in 0.52s ==========================
2024-12-05 17:20:33.812 [error] pytest test discovery error for workspace: /home/user/Code/crypto_nautilus
TypeError: cannot set '__pytest_asyncio_scoped_event_loop' attribute of immutable type 'nautilus_trader.common.component.TestClock'
Check Python Test Logs for more details.
2024-12-05 17:20:33.813 [error] pytest test discovery error for workspace: /home/user/Code/crypto_nautilus
TypeError: cannot set '__pytest_asyncio_scoped_event_loop' attribute of immutable type 'nautilus_trader.common.component.TestClock'
Check Python Test Logs for more details.
2024-12-05 17:20:34.137 [error] Subprocess exited unsuccessfully with exit code 3 and signal null on workspace /home/user/Code/crypto_nautilus.
2024-12-05 17:20:34.137 [error] Subprocess exited unsuccessfully with exit code 3 and signal null on workspace /home/user/Code/crypto_nautilus. Creating and sending error discovery payload
2024-12-05 17:20:34.137 [error] pytest test discovery error for workspace: /home/user/Code/crypto_nautilus
The python test process was terminated before it could exit on its own, the process errored with: Code: 3, Signal: null for workspace /home/user/Code/crypto_nautilus
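My reading of the traceback, which may be wrong: the failure happens in pytest-asyncio's `pytest_collectstart` hook while it assigns an attribute to `collector.obj`, and that object is `TestClock` itself. That suggests pytest is treating the imported `TestClock` as a test class, possibly because its name starts with `Test`, which matches pytest's default class naming pattern. The small sketch below only reproduces the Python-level mechanism, using the built-in `int` as a stand-in for a compiled extension type; `try_set_marker` and `PlainClass` are hypothetical names for illustration.

```python
class PlainClass:
    """An ordinary Python class: new class attributes can be set at runtime."""


def try_set_marker(cls):
    """Try to attach an attribute to a class; return the error text, or None on success."""
    try:
        cls.marker = "ok"
    except TypeError as exc:
        return str(exc)
    return None


# Works for a regular Python class.
print(try_set_marker(PlainClass))

# Fails for built-in or extension types, which do not allow new attributes.
print(try_set_marker(int))
```

If this reading is right, one thing that might be worth trying is importing the clock under a name that does not start with `Test`, so pytest does not try to collect it. I have not verified that this resolves the error.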
Relevant Files
Test file (tests/test_bar_aggregation.py):
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Optional
from zoneinfo import ZoneInfo

import pandas as pd
import pytest
from nautilus_trader.cache.cache import Cache
from nautilus_trader.common.component import MessageBus, TestClock
from nautilus_trader.common.providers import InstrumentProvider
from nautilus_trader.core.uuid import UUID4
from nautilus_trader.data.aggregation import TimeBarAggregator
from nautilus_trader.model.data import (
    Bar,
    BarAggregation,
    BarSpecification,
    BarType,
    TradeTick,
)
from nautilus_trader.model.enums import AggregationSource, PriceType
from nautilus_trader.model.identifiers import (
    ClientId,
    InstrumentId,
    Symbol,
    TraderId,
    Venue,
)
from nautilus_trader.model.instruments import CurrencyPair, Instrument
from nautilus_trader.model.objects import Currency, Price, Quantity
from nautilus_trader.portfolio import PortfolioFacade
from nautilus_trader.trading.strategy import Strategy, StrategyConfig

from crypto_nautilus.config.settings import settings
from crypto_nautilus.polygon_crypto_client import PolygonCryptoClient
from crypto_nautilus.polygon_crypto_data import PolygonCryptoData
from crypto_nautilus.utils.logging import configure_test_logging, unified_logger


class HistoricalBarAggregationStrategyConfig(StrategyConfig):
    """Configuration for HistoricalBarAggregationStrategy."""

    __msgspec_kw_only__ = True
    __msgspec_frozen__ = True
    instrument_id: InstrumentId


class HistoricalBarAggregationStrategy(Strategy):
    """Strategy for testing historical bar aggregation."""

    def __init__(self, config: HistoricalBarAggregationStrategyConfig):
        super().__init__(config=config)
        self._instrument_id = config.instrument_id
        self._initialized = False
        self.bars_1s: list[Bar] = []  # Store 1s bars
        self.bars_5s: list[Bar] = []  # Store 5s bars
        self._trades_received = False
        self._1s_aggregator: Optional[TimeBarAggregator] = None
        self._5s_aggregator: Optional[TimeBarAggregator] = None
        self._last_trade_price = None

    def on_start(self):
        """Called when strategy starts."""
        if not self._initialized:
            self.instrument = self.cache.instrument(self._instrument_id)
            # Define bar types
            self.bar_type_1s = BarType(
                self.instrument.id,
                BarSpecification(1, BarAggregation.SECOND, PriceType.LAST),
                aggregation_source=AggregationSource.EXTERNAL,
            )
            self.bar_type_5s = BarType(
                self.instrument.id,
                BarSpecification(5, BarAggregation.SECOND, PriceType.LAST),
                aggregation_source=AggregationSource.INTERNAL,
            )
            # Create aggregators with build_with_no_updates=True
            self._1s_aggregator = TimeBarAggregator(
                instrument=self.instrument,
                bar_type=self.bar_type_1s,
                handler=self._publish_bars,
                clock=self.clock,
                build_with_no_updates=True,  # Important: build bars even without updates
                timestamp_on_close=True,
            )
            self._5s_aggregator = TimeBarAggregator(
                instrument=self.instrument,
                bar_type=self.bar_type_5s,
                handler=self._publish_bars,
                clock=self.clock,
                build_with_no_updates=True,  # Important: build bars even without updates
                timestamp_on_close=True,
            )
            # Subscribe to bars
            self.subscribe_bars(self.bar_type_1s)
            self.subscribe_bars(self.bar_type_5s)
            # Subscribe to trade ticks
            self.subscribe_trade_ticks(self.instrument.id)
            self._initialized = True

    def on_trade_tick(self, tick: TradeTick):
        """Called when a trade tick is received."""
        self._trades_received = True
        self._last_trade_price = tick.price
        # Update 1s aggregator with trade
        self._1s_aggregator.handle_trade_tick(tick)

    def on_bar(self, bar: Bar):
        """Called when a bar is received."""
        if bar.bar_type == self.bar_type_1s:
            # Store 1s bar
            self.bars_1s.append(bar)
            self._log.debug(f"Received 1s bar: {bar}")
            # Update 5s aggregator with 1s bar
            self._5s_aggregator.handle_bar(bar)
        elif bar.bar_type == self.bar_type_5s:
            # Store 5s bar
            self.bars_5s.append(bar)
            self._log.debug(f"Received 5s bar: {bar}")

    def _publish_bars(self, bar: Bar):
        """Publish bar to message bus."""
        # If bar has no volume, use last trade price for OHLC
        if (
            bar.volume == Quantity.zero(self.instrument.size_precision)
            and self._last_trade_price
        ):
            bar = Bar(
                bar_type=bar.bar_type,
                open=self._last_trade_price,
                high=self._last_trade_price,
                low=self._last_trade_price,
                close=self._last_trade_price,
                volume=Quantity.zero(self.instrument.size_precision),
                ts_event=bar.ts_event,
                ts_init=bar.ts_init,
            )
        self._msgbus.publish(
            topic=f"data.bars.{bar.bar_type}",
            msg=bar,
        )


@pytest.fixture(autouse=True)
def setup_logging():
    """Configure logging for tests."""
    configure_test_logging()
    yield


@pytest.fixture
def btc_instrument() -> CurrencyPair:
    """Create BTC-USD test instrument."""
    venue = Venue("POLYGON")
    btc_symbol = Symbol("BTC-USD")
    btc_id = InstrumentId(symbol=btc_symbol, venue=venue)
    btc = Currency.from_str("BTC")
    usd = Currency.from_str("USD")
    return CurrencyPair(
        instrument_id=btc_id,
        raw_symbol=btc_symbol,
        base_currency=btc,
        quote_currency=usd,
        price_precision=2,
        size_precision=8,
        price_increment=Price.from_str("0.01"),
        size_increment=Quantity.from_str("0.00000001"),
        maker_fee=Decimal("0.0"),
        taker_fee=Decimal("0.0"),
        margin_init=Decimal("0.0"),
        margin_maint=Decimal("0.0"),
        ts_event=0,
        ts_init=0,
        lot_size=Quantity.from_str("0.00000001"),
        max_quantity=Quantity.from_str("1000.0"),
        min_quantity=Quantity.from_str("0.00000001"),
        max_notional=None,
        min_notional=None,
        max_price=None,
        min_price=None,
        tick_scheme_name=None,
        info=None,
    )


def verify_ticker_format(ticker: str) -> bool:
    """Verify that a ticker follows Polygon's format requirements."""
    # Should be in format "X:BTC-USD"
    if not ticker.startswith("X:"):
        return False
    if not ticker.endswith("-USD"):
        return False
    base = ticker[2:-4]  # Extract base currency
    if not base.isalpha():
        return False
    if "--" in ticker:
        return False
    return True


def test_bar_aggregation_from_historical_trades(btc_instrument, event_loop):
    """Test bar aggregation from historical trades."""
    logger = unified_logger.get_component_logger(__name__)
    # Setup components with TestClock instead of LiveClock
    clock = TestClock()
    msgbus = MessageBus(
        trader_id=TraderId("TESTER-001"),
        clock=clock,
    )
    cache = Cache(database=None)
    portfolio = PortfolioFacade(msgbus)

    # Create instrument provider
    class TestProvider(InstrumentProvider):
        def __init__(self):
            self._instruments = {btc_instrument.id: btc_instrument}

        def get(self, instrument_id: InstrumentId) -> Instrument | None:
            return self._instruments.get(instrument_id)

        def find(self, symbol: Symbol) -> Instrument | None:
            for instrument in self._instruments.values():
                if instrument.id.symbol == symbol:
                    return instrument
            return None

        def list_all(self) -> list[Instrument]:
            return list(self._instruments.values())

        def get_all(self) -> dict[InstrumentId, Instrument]:
            return self._instruments.copy()

        def count(self) -> int:
            return len(self._instruments)

    # Create PolygonCryptoData instance
    if not settings.POLYGONIO_API_KEY:
        pytest.skip("POLYGON_API_KEY environment variable not set")
    polygon_data = PolygonCryptoData(
        api_key=settings.POLYGONIO_API_KEY,
    )
    # Create PolygonCryptoClient
    client = PolygonCryptoClient(
        loop=event_loop,  # Use the fixture's event loop
        client_id=ClientId("POLYGON"),
        msgbus=msgbus,
        cache=cache,
        clock=clock,
        instrument_provider=TestProvider(),
        api_key=settings.POLYGONIO_API_KEY,
    )
    client._polygon_data = polygon_data
    # Add instrument to cache
    cache.add_instrument(btc_instrument)
    client.rebuild_instrument_mappings()
    # Verify ticker format
    ticker = client._instrument_id_to_ticker[btc_instrument.id]
    assert verify_ticker_format(ticker), f"Invalid ticker format: {ticker}"
    # Create and initialize strategy
    config = HistoricalBarAggregationStrategyConfig(
        strategy_id="HISTORICAL-BAR-TEST",
        instrument_id=btc_instrument.id,
    )
    strategy = HistoricalBarAggregationStrategy(config)
    # Register strategy
    strategy.register(
        trader_id=TraderId("TESTER-001"),
        portfolio=portfolio,
        msgbus=msgbus,
        cache=cache,
        clock=clock,
    )
    # Start strategy
    strategy.start()
    try:
        # Request historical trades for last 2 minutes
        end_time = datetime.now(ZoneInfo("UTC"))
        start_time = end_time - timedelta(minutes=2)
        # Set the test clock to start_time
        clock.set_time(int(pd.Timestamp(start_time).timestamp() * 1_000_000_000))
        # Create correlation ID for request
        correlation_id = UUID4()
        # Request historical trade ticks synchronously
        client.request_trade_ticks(
            instrument_id=btc_instrument.id,
            limit=1000,
            correlation_id=correlation_id,
            start=pd.Timestamp(start_time),
            end=pd.Timestamp(end_time),
        )
        # Instead of sleeping, advance the clock to end_time
        clock.set_time(int(pd.Timestamp(end_time).timestamp() * 1_000_000_000))
        # Process any events
        events = clock.advance_time(
            int(pd.Timestamp(end_time).timestamp() * 1_000_000_000)
        )
        for event in events:
            event.handle()
        # Verify we received trades
        assert (
            strategy._trades_received
        ), "No trades received from historical data request"
        len_1s_bars = len(strategy.bars_1s)
        len_5s_bars = len(strategy.bars_5s)
        logger.info(
            "Received {} 1s bars and {} 5s bars",
            len_1s_bars,
            len_5s_bars,
        )
        # Verify we got bars
        assert len_1s_bars > 0, "No 1-second bars generated"
        assert len_5s_bars > 0, "No 5-second bars generated"
        # Verify bar counts
        expected_1s_bars = 120  # 2 minutes = 120 seconds
        expected_5s_bars = 24  # 2 minutes = 24 5-second bars
        assert (
            len_1s_bars >= expected_1s_bars * 0.8
        ), f"Expected at least 80% of {expected_1s_bars} 1-second bars, got {len_1s_bars}"
        assert (
            len_5s_bars >= expected_5s_bars * 0.8
        ), f"Expected at least 80% of {expected_5s_bars} 5-second bars, got {len_5s_bars}"
        # Verify bar timestamps are continuous
        for i in range(1, len_1s_bars):
            assert (
                strategy.bars_1s[i].ts_init - strategy.bars_1s[i - 1].ts_init
                == 1_000_000_000
            ), "Gap detected in 1-second bars"
        for i in range(1, len_5s_bars):
            assert (
                strategy.bars_5s[i].ts_init - strategy.bars_5s[i - 1].ts_init
                == 5_000_000_000
            ), "Gap detected in 5-second bars"
        # Log some sample bars
        logger.info(
            "Sample 1s bars: {}",
            strategy.bars_1s[:3],
        )
        logger.info(
            "Sample 5s bars: {}",
            strategy.bars_5s[:3],
        )
    finally:
        # Cleanup
        try:
            # Unsubscribe from trade ticks first
            client.unsubscribe_trade_ticks(btc_instrument.id)
            # Stop strategy
            strategy.stop()
            # Disconnect client
            client.disconnect()
        except Exception as e:
            logger.error("Error during cleanup: {}", str(e))
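
A side note on the clock setup in the test above: it converts times with `int(pd.Timestamp(...).timestamp() * 1_000_000_000)`, which goes through a float. Present-day UNIX nanosecond values are larger than 2**53, so a float cannot represent every one of them exactly and the result may be off by a few hundred nanoseconds. The sketch below shows an integer-only conversion using just the standard library; `to_unix_nanos` is a hypothetical helper name, not part of nautilus_trader or pandas.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix_nanos(dt: datetime) -> int:
    """Convert a timezone-aware datetime to integer UNIX nanoseconds without using floats."""
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer division of timedeltas yields whole microseconds exactly.
    micros = (dt - EPOCH) // timedelta(microseconds=1)
    return micros * 1_000
```

Usage would look like `clock.set_time(to_unix_nanos(start_time))`. I believe `pd.Timestamp` also exposes the same integer through its `value` attribute, which might be simpler when pandas is already imported.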
tests/conftest.py:
import asyncio

import pytest


@pytest.fixture
def event_loop_policy():
    """Override event loop policy for tests."""
    policy = asyncio.get_event_loop_policy()
    return policy


@pytest.fixture
def event_loop(event_loop_policy):
    """Create an instance of the default event loop for each test case."""
    loop = event_loop_policy.new_event_loop()
    yield loop
    # Clean up
    if loop.is_running():
        loop.stop()
    loop.close()