Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix submit_extrinsic timeout #2497

Merged
merged 9 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 56 additions & 55 deletions bittensor/core/extrinsics/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Module with helper functions for extrinsics."""

from concurrent.futures import ThreadPoolExecutor
import os
import threading
from typing import TYPE_CHECKING

Expand All @@ -12,14 +14,15 @@
from substrateinterface import SubstrateInterface, ExtrinsicReceipt
from scalecodec.types import GenericExtrinsic

try:
EXTRINSIC_SUBMISSION_TIMEOUT = float(os.getenv("EXTRINSIC_SUBMISSION_TIMEOUT", 200))
except ValueError:
raise ValueError(
"EXTRINSIC_SUBMISSION_TIMEOUT environment variable must be a float."
)

class _ThreadingTimeoutException(Exception):
"""
Exception raised for timeout. Different from TimeoutException because this also triggers
a websocket failure. This exception should only be used with `threading` timer..
"""

pass
if EXTRINSIC_SUBMISSION_TIMEOUT < 0:
raise ValueError("EXTRINSIC_SUBMISSION_TIMEOUT cannot be negative.")


def submit_extrinsic(
Expand Down Expand Up @@ -50,55 +53,53 @@ def submit_extrinsic(
extrinsic_hash = extrinsic.extrinsic_hash
starting_block = substrate.get_block()

def _handler():
"""
Timeout handler for threading. Will raise a TimeoutError if timeout is exceeded.
"""
logging.error("Timed out waiting for extrinsic submission.")
raise _ThreadingTimeoutException

# sets a timeout timer for the next call to 200 seconds
# will raise a _ThreadingTimeoutException if it reaches this point
timer = threading.Timer(200, _handler)

try:
timer.start()
response = substrate.submit_extrinsic(
extrinsic,
wait_for_inclusion=wait_for_inclusion,
wait_for_finalization=wait_for_finalization,
)
except SubstrateRequestException as e:
logging.error(format_error_message(e.args[0], substrate=substrate))
# Re-rise the exception for retrying of the extrinsic call. If we remove the retry logic, the raise will need
# to be removed.
raise

except _ThreadingTimeoutException:
after_timeout_block = substrate.get_block()

timeout = EXTRINSIC_SUBMISSION_TIMEOUT
event = threading.Event()

def submit():
try:
response_ = substrate.submit_extrinsic(
extrinsic,
wait_for_inclusion=wait_for_inclusion,
wait_for_finalization=wait_for_finalization,
)
except SubstrateRequestException as e:
logging.error(format_error_message(e.args[0], substrate=substrate))
# Re-raise the exception for retrying of the extrinsic call. If we remove the retry logic,
# the raise will need to be removed.
raise
finally:
event.set()
return response_

with ThreadPoolExecutor(max_workers=1) as executor:
response = None
for block_num in range(
starting_block["header"]["number"],
after_timeout_block["header"]["number"] + 1,
):
block_hash = substrate.get_block_hash(block_num)
try:
response = substrate.retrieve_extrinsic_by_hash(
block_hash, f"0x{extrinsic_hash.hex()}"
future = executor.submit(submit)
if not event.wait(timeout):
logging.error("Timed out waiting for extrinsic submission.")
after_timeout_block = substrate.get_block()

for block_num in range(
thewhaleking marked this conversation as resolved.
Show resolved Hide resolved
starting_block["header"]["number"],
after_timeout_block["header"]["number"] + 1,
):
block_hash = substrate.get_block_hash(block_num)
try:
response = substrate.retrieve_extrinsic_by_hash(
block_hash, f"0x{extrinsic_hash.hex()}"
)
except ExtrinsicNotFound:
continue
if response:
break
if response is None:
logging.error(
f"Extrinsic '0x{extrinsic_hash.hex()}' not submitted. "
f"Initially attempted to submit at block {starting_block['header']['number']}."
)
except ExtrinsicNotFound:
continue
if response:
break
finally:
timer.cancel()

if response is None:
logging.error(
f"Extrinsic '0x{extrinsic_hash.hex()}' not submitted. "
f"Initially attempted to submit at block {starting_block['header']['number']}."
)
raise SubstrateRequestException
raise SubstrateRequestException

else:
response = future.result()

return response
2 changes: 2 additions & 0 deletions bittensor/utils/networking.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ def get_formatted_ws_endpoint_url(endpoint_url: Optional[str]) -> Optional[str]:
def ensure_connected(func):
"""Decorator ensuring the function executes with an active substrate connection."""

# TODO we need to rethink the logic in this

def is_connected(substrate) -> bool:
"""Check if the substrate connection is active."""
sock = substrate.websocket.socket
Expand Down
113 changes: 113 additions & 0 deletions tests/unit_tests/extrinsics/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import time
from unittest.mock import MagicMock, patch
import importlib
import pytest
from substrateinterface.base import (
SubstrateInterface,
GenericExtrinsic,
SubstrateRequestException,
)

from bittensor.core.extrinsics import utils


@pytest.fixture
def set_extrinsics_timeout_env(monkeypatch):
monkeypatch.setenv("EXTRINSIC_SUBMISSION_TIMEOUT", "1")


def test_submit_extrinsic_timeout():
timeout = 1

def wait(extrinsic, wait_for_inclusion, wait_for_finalization):
time.sleep(timeout + 0.01)
return True

mock_substrate = MagicMock(autospec=SubstrateInterface)
mock_substrate.submit_extrinsic = wait
mock_extrinsic = MagicMock(autospec=GenericExtrinsic)
with patch.object(utils, "EXTRINSIC_SUBMISSION_TIMEOUT", timeout):
with pytest.raises(SubstrateRequestException):
utils.submit_extrinsic(mock_substrate, mock_extrinsic, True, True)


def test_submit_extrinsic_success():
mock_substrate = MagicMock(autospec=SubstrateInterface)
mock_substrate.submit_extrinsic.return_value = True
mock_extrinsic = MagicMock(autospec=GenericExtrinsic)
result = utils.submit_extrinsic(mock_substrate, mock_extrinsic, True, True)
assert result is True


def test_submit_extrinsic_timeout_env(set_extrinsics_timeout_env):
importlib.reload(utils)
timeout = utils.EXTRINSIC_SUBMISSION_TIMEOUT
assert timeout < 5 # should be less than 5 seconds as taken from test env var

def wait(extrinsic, wait_for_inclusion, wait_for_finalization):
time.sleep(timeout + 1)
return True

mock_substrate = MagicMock(autospec=SubstrateInterface)
mock_substrate.submit_extrinsic = wait
mock_extrinsic = MagicMock(autospec=GenericExtrinsic)
with pytest.raises(SubstrateRequestException):
utils.submit_extrinsic(mock_substrate, mock_extrinsic, True, True)


def test_submit_extrinsic_success_env(set_extrinsics_timeout_env):
importlib.reload(utils)
mock_substrate = MagicMock(autospec=SubstrateInterface)
mock_substrate.submit_extrinsic.return_value = True
mock_extrinsic = MagicMock(autospec=GenericExtrinsic)
result = utils.submit_extrinsic(mock_substrate, mock_extrinsic, True, True)
assert result is True


def test_submit_extrinsic_timeout_env_float(monkeypatch):
monkeypatch.setenv("EXTRINSIC_SUBMISSION_TIMEOUT", "1.45") # use float

importlib.reload(utils)
timeout = utils.EXTRINSIC_SUBMISSION_TIMEOUT

assert timeout == 1.45 # parsed correctly

def wait(extrinsic, wait_for_inclusion, wait_for_finalization):
time.sleep(timeout + 0.3) # sleep longer by float
return True

mock_substrate = MagicMock(autospec=SubstrateInterface)
mock_substrate.submit_extrinsic = wait
mock_extrinsic = MagicMock(autospec=GenericExtrinsic)
with pytest.raises(SubstrateRequestException):
utils.submit_extrinsic(mock_substrate, mock_extrinsic, True, True)


def test_import_timeout_env_parse(monkeypatch):
# int
monkeypatch.setenv("EXTRINSIC_SUBMISSION_TIMEOUT", "1")
importlib.reload(utils)
assert utils.EXTRINSIC_SUBMISSION_TIMEOUT == 1 # parsed correctly

# float
monkeypatch.setenv("EXTRINSIC_SUBMISSION_TIMEOUT", "1.45") # use float
importlib.reload(utils)
assert utils.EXTRINSIC_SUBMISSION_TIMEOUT == 1.45 # parsed correctly

# invalid
monkeypatch.setenv("EXTRINSIC_SUBMISSION_TIMEOUT", "not_an_int")
with pytest.raises(ValueError) as e:
importlib.reload(utils)
assert "must be a float" in str(e.value)

# negative
monkeypatch.setenv("EXTRINSIC_SUBMISSION_TIMEOUT", "-1")
with pytest.raises(ValueError) as e:
importlib.reload(utils)
assert "cannot be negative" in str(e.value)

# default (not checking exact value, just that it's a value)
monkeypatch.delenv("EXTRINSIC_SUBMISSION_TIMEOUT")
importlib.reload(utils)
assert isinstance(utils.EXTRINSIC_SUBMISSION_TIMEOUT, float) # has a default value
assert utils.EXTRINSIC_SUBMISSION_TIMEOUT > 0 # is positive
Loading