diff --git a/poetry.lock b/poetry.lock
index e56e30e..f208c8b 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
[[package]]
name = "aiohttp"
@@ -964,6 +964,24 @@ files = [
{file = "keylimiter-0.1.5.tar.gz", hash = "sha256:6655240c15d1aee6acf26bd66bb1ae6c5dfd014e287f9c9c563dcfeacfc8fc81"},
]
+[[package]]
+name = "loguru"
+version = "0.7.2"
+description = "Python logging made (stupidly) simple"
+optional = false
+python-versions = ">=3.5"
+files = [
+ {file = "loguru-0.7.2-py3-none-any.whl", hash = "sha256:003d71e3d3ed35f0f8984898359d65b79e5b21943f78af86aa5491210429b8eb"},
+ {file = "loguru-0.7.2.tar.gz", hash = "sha256:e671a53522515f34fd406340ee968cb9ecafbc4b36c679da03c18fd8d0bd51ac"},
+]
+
+[package.dependencies]
+colorama = {version = ">=0.3.4", markers = "sys_platform == \"win32\""}
+win32-setctime = {version = ">=1.0.0", markers = "sys_platform == \"win32\""}
+
+[package.extras]
+dev = ["Sphinx (==7.2.5)", "colorama (==0.4.5)", "colorama (==0.4.6)", "exceptiongroup (==1.1.3)", "freezegun (==1.1.0)", "freezegun (==1.2.2)", "mypy (==v0.910)", "mypy (==v0.971)", "mypy (==v1.4.1)", "mypy (==v1.5.1)", "pre-commit (==3.4.0)", "pytest (==6.1.2)", "pytest (==7.4.0)", "pytest-cov (==2.12.1)", "pytest-cov (==4.1.0)", "pytest-mypy-plugins (==1.9.3)", "pytest-mypy-plugins (==3.0.0)", "sphinx-autobuild (==2021.3.14)", "sphinx-rtd-theme (==1.3.0)", "tox (==3.27.1)", "tox (==4.11.0)"]
+
[[package]]
name = "markdown-it-py"
version = "3.0.0"
@@ -1369,6 +1387,12 @@ files = [
{file = "py_bip39_bindings-0.1.11-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:4ee776f3b33b2d71fee48679951f117e3d1f052449ec2fcb184f3c64a4c77e4f"},
{file = "py_bip39_bindings-0.1.11-cp311-none-win32.whl", hash = "sha256:d8b722e49562810f94eb61c9efa172f327537c74c37da3e86b161f7f444c51bf"},
{file = "py_bip39_bindings-0.1.11-cp311-none-win_amd64.whl", hash = "sha256:be934052497f07605768e2c7184e4f4269b3e2e77930131dfc9bdbb791e6fdf4"},
+ {file = "py_bip39_bindings-0.1.11-cp312-cp312-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:afa9c5762cfaec01141f478a9c3132de01ec3890ff2e5a4013c79d3ba3aff8bb"},
+ {file = "py_bip39_bindings-0.1.11-cp312-cp312-macosx_10_12_x86_64.whl", hash = "sha256:a3af7c1f955c6bbd613c6b38d022f7c73896acaf0ecc972ac0dee4b952e14568"},
+ {file = "py_bip39_bindings-0.1.11-cp312-cp312-manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:6aed3e86f105a36676e8dd0c8bc3f611a81b7ba4309b22a77fdc0f63b260e094"},
+ {file = "py_bip39_bindings-0.1.11-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ae120b5542fecf97aa3fdb6a526bac1004cb641bc9cc0d0030c6735dc2156072"},
+ {file = "py_bip39_bindings-0.1.11-cp312-none-win32.whl", hash = "sha256:92abce265b0f2d8c5830441aff06b7b4f9426088a3de39624b12f3f9ff9fc2eb"},
+ {file = "py_bip39_bindings-0.1.11-cp312-none-win_amd64.whl", hash = "sha256:6794187229eb0b04d0770f0fba936f0c5c598f552848a398ed5af9a61638cacb"},
{file = "py_bip39_bindings-0.1.11-cp37-cp37m-macosx_10_7_x86_64.whl", hash = "sha256:76fc141ed154ccef9c36d5e2eb615565f2e272a43ed56edbdda538840b597187"},
{file = "py_bip39_bindings-0.1.11-cp37-cp37m-macosx_11_0_arm64.whl", hash = "sha256:3837f7040e732f7be49da5f595f147de2304e92a67267b12d5aa08a9bb02dd4b"},
{file = "py_bip39_bindings-0.1.11-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:82de90eabe531095d4e4721ea1546873f0161c101c30b43dcf0a7bbd9cdcce69"},
@@ -1428,6 +1452,11 @@ files = [
{file = "py_ed25519_zebra_bindings-1.0.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:2e715341b4927f6735ed7113644c0a5362310df4ddad1f938b5040c85884db15"},
{file = "py_ed25519_zebra_bindings-1.0.1-cp311-none-win32.whl", hash = "sha256:21498379d5e85d97a9633b7cf6362b4d187c7575ab8633c3ba6c99b1dcb83358"},
{file = "py_ed25519_zebra_bindings-1.0.1-cp311-none-win_amd64.whl", hash = "sha256:58e7d56a6f565fc044d313ec429b782150366a39ada973051dde60f1363abd9b"},
+ {file = "py_ed25519_zebra_bindings-1.0.1-cp312-cp312-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:e90d55655ba2837cba691cbcf7f14d3146f07716bc5e60a4032d1a933672c261"},
+ {file = "py_ed25519_zebra_bindings-1.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ffcf39d48380454c132a38ff6172045035067848beb8ea619465ab389ff46717"},
+ {file = "py_ed25519_zebra_bindings-1.0.1-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:c2d9aa959c0d84cbf741f1b0605f49468584226401ffc0f60f0ab8752ca3e255"},
+ {file = "py_ed25519_zebra_bindings-1.0.1-cp312-none-win32.whl", hash = "sha256:ca83b898458a701d31749a56c6c441f8f1bdb451f70df00eace68abf34dbb90c"},
+ {file = "py_ed25519_zebra_bindings-1.0.1-cp312-none-win_amd64.whl", hash = "sha256:68ae9d36ff0062b2cc90f47e19a364590151dac78a0c36a72786078051a3990e"},
{file = "py_ed25519_zebra_bindings-1.0.1-cp36-cp36m-macosx_10_7_x86_64.whl", hash = "sha256:ac9d378114ce16420f66fd990ff09156f1d056b993a6076edeae4f866f5fd67f"},
{file = "py_ed25519_zebra_bindings-1.0.1-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d5e884b347d4a8d821327bafbdfcc19b2c8997d6a78704db15ef1db33baea4c1"},
{file = "py_ed25519_zebra_bindings-1.0.1-cp36-cp36m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:3fcbe7677a8ba0888df9ce882402e13153b23dfdb97c8d0ec4e2ebd41d7c6b69"},
@@ -1497,6 +1526,12 @@ files = [
{file = "py_sr25519_bindings-0.2.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:7b56b5cbbfb36b41ddfa462989a03386590ac036f3a755ef64fffeb2fed88654"},
{file = "py_sr25519_bindings-0.2.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:8f06ea3237e06666e3a4ff4719b4fba415472943831b229428753c37d5ecd1b4"},
{file = "py_sr25519_bindings-0.2.0-cp311-none-win_amd64.whl", hash = "sha256:d62af30b2022f5fa787e46c06823c35a21abe791bf55012f498f9ba8e4baabc8"},
+ {file = "py_sr25519_bindings-0.2.0-cp312-cp312-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:ceafa0c31b49f2128461eb2c6ea18dc5d0bfae7218a100be7153f271e46bac49"},
+ {file = "py_sr25519_bindings-0.2.0-cp312-cp312-macosx_10_12_x86_64.whl", hash = "sha256:c8dedb8525556591738a64310875df70ea67886e5a40f2799bd96ef8848936cf"},
+ {file = "py_sr25519_bindings-0.2.0-cp312-cp312-manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:ce149796923696f5cfc6263f135674a14fe2d513fd35b2bfa73226b940aff648"},
+ {file = "py_sr25519_bindings-0.2.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fe8e20ee0856e8a60682566df955b81e7631670136607da627ab6892df34790d"},
+ {file = "py_sr25519_bindings-0.2.0-cp312-none-win32.whl", hash = "sha256:92382456c6f176c07e0d554c71d483853387885ce17714f8a4b50fdcf7552297"},
+ {file = "py_sr25519_bindings-0.2.0-cp312-none-win_amd64.whl", hash = "sha256:48ee4e14a77f815f3996beecb7d7abf422b756e9163ee4df739c1aded8a3e8ba"},
{file = "py_sr25519_bindings-0.2.0-cp36-cp36m-macosx_10_7_x86_64.whl", hash = "sha256:c3de899a1e911b8945f09e6389f8d2df68924c12c78e3e66fedb15f1e4ff56ad"},
{file = "py_sr25519_bindings-0.2.0-cp36-cp36m-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:758761b605f90e4238304df7520155a3358b13cc55ee18c5113632da17343163"},
{file = "py_sr25519_bindings-0.2.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1f63580a224607e68b861eb03421465091c3104b6309e5fca7448f5aa6dbda60"},
@@ -2112,6 +2147,20 @@ docs = ["Sphinx (>=6.0)", "sphinx-rtd-theme (>=1.1.0)"]
optional = ["python-socks", "wsaccel"]
test = ["websockets"]
+[[package]]
+name = "win32-setctime"
+version = "1.1.0"
+description = "A small Python utility to set file creation time on Windows"
+optional = false
+python-versions = ">=3.5"
+files = [
+ {file = "win32_setctime-1.1.0-py3-none-any.whl", hash = "sha256:231db239e959c2fe7eb1d7dc129f11172354f98361c4fa2d6d2d7e278baa8aad"},
+ {file = "win32_setctime-1.1.0.tar.gz", hash = "sha256:15cf5750465118d6929ae4de4eb46e8edae9a5634350c01ba582df868e932cb2"},
+]
+
+[package.extras]
+dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"]
+
[[package]]
name = "xxhash"
version = "3.4.1"
@@ -2335,4 +2384,4 @@ multidict = ">=4.0"
[metadata]
lock-version = "2.0"
python-versions = ">=3.10,<4.0"
-content-hash = "9317d80298d25997c1ef7fe9a233030de5ab31dd430ee7b9ac1c85e278c2ecd4"
+content-hash = "4c0a97d5166050becc637ccdcab1410c65291051604e702c156a6a60ce64b4e7"
diff --git a/pyproject.toml b/pyproject.toml
index 8e6583d..045f076 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -29,6 +29,7 @@ fastapi = "^0.110.0"
starlette = "^0.36.3"
uvicorn = "^0.29.0"
keylimiter = "^0.1.5"
+loguru = "^0.7.2"
[tool.poetry.group.dev]
diff --git a/src/communex/cli/module.py b/src/communex/cli/module.py
index 8595321..bd6e623 100644
--- a/src/communex/cli/module.py
+++ b/src/communex/cli/module.py
@@ -7,7 +7,7 @@
import communex.balance as c_balance
from communex.compat.key import classic_load_key
-from communex.errors import ChainTransactionError
+from communex.errors import ChainTransactionError, InvalidIPError, InvalidClassError, InvalidModuleError
from communex.misc import get_map_modules
from communex.module.server import ModuleServer
from communex.util import is_ip_valid
@@ -45,7 +45,7 @@ def register(
resolved_key = classic_load_key(key)
if not is_ip_valid(ip):
- raise ValueError("Invalid ip address")
+ raise InvalidIPError("Invalid ip address")
address = f"{ip}:{port}"
subnet = client.get_name(netuid)
@@ -70,7 +70,7 @@ def update(key: str, name: str, ip: str, port: int, delegation_fee: int = 20, ne
client = make_client()
if not is_ip_valid(ip):
- raise ValueError("Invalid ip address")
+ raise InvalidIPError("Invalid ip address")
address = f"{ip}:{port}"
@@ -107,24 +107,24 @@ def serve(
module_path = ".".join(module_parts)
if not module_path:
# This could do some kind of relative import somehow?
- raise ValueError(f"Invalid class path: `{class_path}`, module name is missing")
+ raise InvalidModuleError(f"Invalid class path: `{class_path}`, module name is missing")
if not class_name:
- raise ValueError(f"Invalid class path: `{class_path}`, class name is missing")
+ raise InvalidClassError(f"Invalid class path: `{class_path}`, class name is missing")
case _:
# This is impossible
- raise Exception(f"Invalid class path: `{class_path}`")
+ raise TypeError(f"Invalid class path: `{class_path}`")
try:
module = importlib.import_module(module_path)
- except ModuleNotFoundError:
+ except ModuleNotFoundError as e:
context.error(f"Module `{module_path}` not found")
- raise typer.Exit(code=1)
+ raise typer.Exit(code=1) from e
try:
class_obj = getattr(module, class_name)
- except AttributeError:
+ except AttributeError as e:
context.error(f"Class `{class_name}` not found in module `{module}`")
- raise typer.Exit(code=1)
+ raise typer.Exit(code=1) from e
keypair = classic_load_key(key)
server = ModuleServer(class_obj(), keypair, whitelist=whitelist,
@@ -150,7 +150,7 @@ def info(name: str, balance: bool = False, netuid: int = 0):
module = next((item for item in modules_to_list if item["name"] == name), None)
if module is None:
- raise ValueError("Module not found")
+ raise InvalidModuleError("Module not found")
general_module = cast(dict[str, Any], module)
print_table_from_plain_dict(general_module, ["Params", "Values"], console)
diff --git a/src/communex/cli/network.py b/src/communex/cli/network.py
index 3392f46..c4333e5 100644
--- a/src/communex/cli/network.py
+++ b/src/communex/cli/network.py
@@ -91,19 +91,21 @@ def propose_globally(
resolved_key = classic_load_key(key)
- proposal: NetworkParams = {"max_allowed_subnets": max_allowed_subnets,
- "max_allowed_modules": max_allowed_modules,
- "max_registrations_per_block": max_registrations_per_block,
- "unit_emission": unit_emission,
- "tx_rate_limit": tx_rate_limit,
- "vote_threshold": vote_threshold,
- "vote_mode": vote_mode,
- "max_proposals": max_proposals,
- "max_name_length": max_name_length,
- "burn_rate": burn_rate,
- "min_burn": min_burn,
- "min_stake": min_stake,
- "min_weight_stake": min_weight_stake}
+ proposal = NetworkParams(
+ max_allowed_subnets=max_allowed_subnets,
+ max_allowed_modules=max_allowed_modules,
+ max_registrations_per_block=max_registrations_per_block,
+ unit_emission=unit_emission,
+ tx_rate_limit=tx_rate_limit,
+ vote_threshold=vote_threshold,
+ vote_mode=vote_mode,
+ max_proposals=max_proposals,
+ max_name_length=max_name_length,
+ burn_rate=burn_rate,
+ min_burn=min_burn,
+ min_stake=min_stake,
+ min_weight_stake=min_weight_stake
+ )
with console.status("Adding a proposal..."):
client.add_global_proposal(resolved_key, proposal)
diff --git a/src/communex/client.py b/src/communex/client.py
index 7971f14..5b8c001 100644
--- a/src/communex/client.py
+++ b/src/communex/client.py
@@ -4,17 +4,15 @@
from contextlib import contextmanager
from copy import deepcopy
from dataclasses import dataclass
-from typing import Any, TypeVar, Mapping
+from typing import Any, TypeVar, Mapping, Union, Dict
from substrateinterface import (ExtrinsicReceipt, Keypair, # type: ignore
SubstrateInterface)
from substrateinterface.storage import StorageKey # type: ignore
-from communex.errors import ChainTransactionError, NetworkQueryError
+from communex.errors import ChainTransactionError, NetworkQueryError, QueryError, QueueEmptyError, SubstrateRequestException, InsufficientBalanceError, InsufficientStakeError, InvalidKeyFormatError, InvalidParameterError, AuthorizationError, MismatchedLengthError, InvalidProposalIDError
from communex.types import NetworkParams, Ss58Address, SubnetParams
-# TODO: InsufficientBalanceError, MismatchedLengthError etc
-
MAX_REQUEST_SIZE = 9_000_000
@@ -62,13 +60,16 @@ def __init__(
url: The URL of the network node to connect to.
num_connections: The number of websocket connections to be opened.
"""
- assert num_connections > 0
- self._num_connections = num_connections
- self.wait_for_finalization = wait_for_finalization
- self._connection_queue = queue.Queue(num_connections)
-
- for _ in range(num_connections):
- self._connection_queue.put(SubstrateInterface(url))
+ try:
+ assert num_connections > 0
+ self._num_connections = num_connections
+ self.wait_for_finalization = wait_for_finalization
+ self._connection_queue = queue.Queue(num_connections)
+ for _ in range(num_connections):
+ self._connection_queue.put(SubstrateInterface(url))
+ except AssertionError as e:
+ raise AssertionError("Maximum connections must be greater than 0") from e
+
@property
def connections(self) -> int:
@@ -102,6 +103,9 @@ def get_conn(self, timeout: float | None = None, init: bool = False):
conn.init_runtime() # type: ignore
try:
yield conn
+ except QueueEmptyError as e:
+ self._connection_queue.put(conn)
+ raise QueryError("No connection available") from e
finally:
self._connection_queue.put(conn)
@@ -203,13 +207,13 @@ def _send_batch(
if extract_result:
try:
results.append(message['result'])
- except Exception:
+ except NetworkQueryError as e:
raise (RuntimeError(
- f"Error extracting result from message: {message}"))
+ f"Error extracting result from message: {message}")) from e
else:
results.append(message)
if 'error' in message:
- raise NetworkQueryError(message['error'])
+ raise QueryError(message['error'])
return results
@@ -425,7 +429,7 @@ def _decode_response(
function_parameters: list[tuple[Any, Any, Any, Any, str]],
prefix_list: list[Any],
block_hash: str,
- ) -> dict[str, dict[Any, Any]]:
+ ) -> Union[Dict[str, dict[Any, Any]], None]:
"""
Decodes a response from the substrate interface and organizes the data into a dictionary.
@@ -455,8 +459,10 @@ def _decode_response(
)
{'storage_function_name': {decoded_key: decoded_value, ...}, ...}
"""
+ if len(response) == 0:
+ return {'storage_function_name': {decoded_key: decoded_value} for decoded_key, decoded_value in zip(response, function_parameters)}
- def concat_hash_len(key_hasher: str) -> int:
+ def concat_hash_len(key_hasher: str) -> Union[Dict[Any, Any], int, None]:
"""
Determines the length of the hash based on the given key hasher type.
@@ -467,71 +473,78 @@ def concat_hash_len(key_hasher: str) -> int:
The length of the hash corresponding to the given key hasher type.
Raises:
- ValueError: If the key hasher type is not supported.
+ InvalidKeyFormatError: If the key hasher type is not supported.
Example:
>>> concat_hash_len("Blake2_128Concat")
16
"""
-
- if key_hasher == "Blake2_128Concat":
- return 16
- elif key_hasher == "Twox64Concat":
- return 8
- elif key_hasher == "Identity":
- return 0
- else:
- raise ValueError('Unsupported hash type')
-
- assert len(response) == len(function_parameters) == len(prefix_list)
- result_dict: dict[str, dict[Any, Any]] = {}
- for res, fun_params_tuple, prefix in zip(
- response, function_parameters, prefix_list
- ):
- if not res:
- return {}
- res = res[0]
- changes = res["changes"] # type: ignore
- value_type, param_types, key_hashers, params, storage_function = fun_params_tuple
- with self.get_conn(init=True) as substrate:
- for item in changes:
- # Determine type string
+ try:
+ if key_hasher == "Blake2_128Concat":
+ return 16
+ elif key_hasher == "Twox64Concat":
+ return 8
+ elif key_hasher == "Identity":
+ return 0
+ except ValueError as e:
+ raise InvalidKeyFormatError('Unsupported hash type') from e
+ try:
+ assert len(response) == len(function_parameters) == len(prefix_list)
+ result_dict: dict[str, dict[Any, Any]] = {}
+ for res, fun_params_tuple, prefix in zip(
+ response, function_parameters, prefix_list
+ ):
+ item = []
key_type_string: list[Any] = []
- for n in range(len(params), len(param_types)):
- key_type_string.append(f'[u8; {concat_hash_len(key_hashers[n])}]')
- key_type_string.append(param_types[n])
-
- item_key_obj = substrate.decode_scale( # type: ignore
+ if not res:
+ return {}
+ res = res[0]
+ changes = res["changes"] # type: ignore
+ value_type, param_types, key_hashers, params, storage_function = fun_params_tuple
+ with self.get_conn(init=True) as substrate:
+ for item in changes:
+ # Determine type string
+ key_type_string: list[Any] = []
+ for n in range(len(params), len(param_types)):
+ key_type_string.append(f'[u8; {concat_hash_len(key_hashers[n])}]')
+ key_type_string.append(param_types[n])
+
+ item_key_obj = substrate.decode_scale( # type: ignore
type_string=f"({', '.join(key_type_string)})",
scale_bytes='0x' + item[0][len(prefix):],
return_scale_obj=True,
block_hash=block_hash
- )
- # strip key_hashers to use as item key
- if len(param_types) - len(params) == 1:
- item_key = item_key_obj.value_object[1] # type: ignore
- else:
- item_key = tuple( # type: ignore
- item_key_obj.value_object[key + 1] for key in range( # type: ignore
- len(params), len(param_types) + 1, 2
- )
)
+ item_key = None
+ # strip key_hashers to use as item key
+ if len(param_types) - len(params) == 1:
+ item_key = item_key_obj.value_object[1] # type: ignore
+ else:
+ item_key = tuple( # type: ignore
+ item_key_obj.value_object[key + 1] for key in range( # type: ignore
+ len(params), len(param_types) + 1, 2
+ )
+ )
- item_value = substrate.decode_scale( # type: ignore
- type_string=value_type,
- scale_bytes=item[1],
- return_scale_obj=True,
- block_hash=block_hash
- )
- result_dict.setdefault(storage_function, {})
- result_dict[storage_function][item_key.value] = item_value.value # type: ignore
- return result_dict
+
+ item_value = substrate.decode_scale( # type: ignore
+ type_string=value_type,
+ scale_bytes=item[1],
+ return_scale_obj=True,
+ block_hash=block_hash
+ )
+ result_dict.setdefault(storage_function, {})
+ result_dict[storage_function][item_key[1]] = item_value.value # type: ignore
+
+ except Exception as e:
+ raise InvalidKeyFormatError from e
+ return result_dict
def query_batch(
self,
functions: dict[str, list[tuple[str, list[Any]]]]
- ) -> dict[str, str]:
+ ) -> Union[Dict[Any, Any], None]:
"""
Executes batch queries on a substrate and returns results in a dictionary format.
@@ -550,31 +563,31 @@ def query_batch(
{'function_name': 'query_result', ...}
"""
- result = None
- with self.get_conn(init=True) as substrate:
- for module, queries in functions.items():
- storage_keys: list[Any] = []
- for fn, params in queries:
- storage_function = substrate.create_storage_key( # type: ignore
- pallet=module, storage_function=fn, params=params)
- storage_keys.append(storage_function)
-
- block_hash = substrate.get_block_hash()
- responses: list[Any] = substrate.query_multi( # type: ignore
- storage_keys=storage_keys, block_hash=block_hash)
-
- result: dict[str, str] | None = {}
-
- for item in responses:
- fun = item[0]
- query = item[1]
- storage_fun = fun.storage_function
- result[storage_fun] = query.value
-
- if result is None:
- raise Exception("No result")
-
- return result
+ try:
+ result = None
+ with self.get_conn(init=True) as substrate:
+ for module, queries in functions.items():
+ storage_keys: list[Any] = []
+ for fn, params in queries:
+ storage_function = substrate.create_storage_key( # type: ignore
+ pallet=module, storage_function=fn, params=params)
+ storage_keys.append(storage_function)
+
+ block_hash = substrate.get_block_hash()
+ responses: list[Any] = substrate.query_multi( # type: ignore
+ storage_keys=storage_keys, block_hash=block_hash)
+
+ result: dict[str, str] | None = {}
+
+ for item in responses:
+ fun = item[0]
+ query = item[1]
+ storage_fun = fun.storage_function
+ result[storage_fun] = query.value
+ return result
+ except Exception as e:
+ raise QueryError from e
+
def query_batch_map(
self,
@@ -595,18 +608,21 @@ def query_batch_map(
>>> query_batch_map(substrate_instance, {'module_name': [('function_name', ['param1', 'param2'])]})
# Returns the combined result of the map batch query
"""
- multi_result: dict[str, dict[Any, Any]] = {}
-
- def recursive_update(
- d: dict[str, dict[T1, T2] | dict[str, Any]],
- u: Mapping[str, dict[Any, Any] | str]
- ) -> dict[str, dict[T1, T2]]:
- for k, v in u.items():
- if isinstance(v, dict):
- d[k] = recursive_update(d.get(k, {}), v) # type: ignore
- else:
- d[k] = v # type: ignore
- return d # type: ignore
+ try:
+ multi_result: dict[str, dict[Any, Any]] = {}
+
+ def recursive_update(
+ d: dict[str, dict[T1, T2] | dict[str, Any]],
+ u: Mapping[str, dict[Any, Any] | str]
+ ) -> dict[str, dict[T1, T2]]:
+ for k, v in u.items():
+ if isinstance(v, dict):
+ d[k] = recursive_update(d.get(k, {}), v) # type: ignore
+ else:
+ d[k] = v # type: ignore
+ return d # type: ignore
+ except Exception as e:
+ raise QueryError from e
def get_page():
send, prefix_list = self._get_storage_keys(storage, queries, block_hash)
@@ -645,6 +661,8 @@ def get_page():
chunk_info.prefix_list,
block_hash
)
+ if not storage_result:
+ continue
multi_result = recursive_update(multi_result, storage_result)
return multi_result
@@ -652,9 +670,9 @@ def get_page():
def query(
self,
name: str,
- params: list[Any] = [],
+ params: list[Any],
module: str = 'SubspaceModule',
- ) -> Any:
+ ) -> Union[Dict[Any, Any], None]:
"""
Queries a storage function on the network.
@@ -672,15 +690,20 @@ def query(
Raises:
NetworkQueryError: If the query fails or is invalid.
"""
-
- result = self.query_batch({module: [(name, params)]})
+ result = {}
+ try:
+ result = self.query_batch({module: [(name, params)]})
+ except QueryError as e:
+ raise NetworkQueryError(e) from e
+ if not result:
+ return None
return result[name]
def query_map(
self,
name: str,
- params: list[Any] = [],
+ params: list[Any],
module: str = 'SubspaceModule',
extract_value: bool = True,
) -> dict[Any, Any]:
@@ -699,11 +722,13 @@ def query_map(
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ try:
+ result = self.query_batch_map({module: [(name, params)]})
- result = self.query_batch_map({module: [(name, params)]})
-
- if extract_value:
- return {k.value: v.value for k, v in result} # type: ignore
+ if extract_value:
+ return {k.value: v.value for k, v in result} # type: ignore
+ except QueryError as e:
+ raise QueryError(e) from e
return result
@@ -742,38 +767,40 @@ def compose_call(
Raises:
ChainTransactionError: If the transaction fails.
"""
+ try:
+ with self.get_conn() as substrate:
+ if wait_for_finalization is None:
+ wait_for_finalization = self.wait_for_finalization
- with self.get_conn() as substrate:
- if wait_for_finalization is None:
- wait_for_finalization = self.wait_for_finalization
-
- call = substrate.compose_call( # type: ignore
- call_module=module,
- call_function=fn,
- call_params=params
- )
- if sudo:
call = substrate.compose_call( # type: ignore
- call_module='Sudo',
- call_function='sudo',
- call_params={
- 'call': call.value, # type: ignore
- }
+ call_module=module,
+ call_function=fn,
+ call_params=params
)
+ if sudo:
+ call = substrate.compose_call( # type: ignore
+ call_module='Sudo',
+ call_function='sudo',
+ call_params={
+ 'call': call.value, # type: ignore
+ }
+ )
- extrinsic = substrate.create_signed_extrinsic( # type: ignore
- call=call, keypair=key # type: ignore
- ) # type: ignore
- response = substrate.submit_extrinsic(
- extrinsic=extrinsic,
- wait_for_inclusion=wait_for_inclusion,
- wait_for_finalization=wait_for_finalization,
- )
- if wait_for_inclusion:
- if not response.is_success:
- raise ChainTransactionError(
- response.error_message, response # type: ignore
+ extrinsic = substrate.create_signed_extrinsic( # type: ignore
+ call=call, keypair=key # type: ignore
+ ) # type: ignore
+ response = substrate.submit_extrinsic(
+ extrinsic=extrinsic,
+ wait_for_inclusion=wait_for_inclusion,
+ wait_for_finalization=wait_for_finalization,
)
+ if wait_for_inclusion:
+ if not response.is_success:
+ raise ChainTransactionError(
+ response.error_message, response # type: ignore
+ )
+ except SubstrateRequestException as e:
+ raise ChainTransactionError(e) from e
return response
@@ -824,43 +851,43 @@ def compose_call_multisig(self,
Raises:
ChainTransactionError: If the transaction fails.
"""
+ try:
+ # getting the call ready
+ with self.get_conn() as substrate:
+ if wait_for_finalization is None:
+ wait_for_finalization = self.wait_for_finalization
- # getting the call ready
- with self.get_conn() as substrate:
- if wait_for_finalization is None:
- wait_for_finalization = self.wait_for_finalization
-
- # prepares the `GenericCall` object
- call = substrate.compose_call( # type: ignore
- call_module=module,
- call_function=fn,
- call_params=params
- )
- if sudo:
+ # prepares the `GenericCall` object
call = substrate.compose_call( # type: ignore
- call_module='Sudo',
- call_function='sudo',
- call_params={
- 'call': call.value, # type: ignore
- }
+ call_module=module,
+ call_function=fn,
+ call_params=params
)
+ if sudo:
+ call = substrate.compose_call( # type: ignore
+ call_module='Sudo',
+ call_function='sudo',
+ call_params={
+ 'call': call.value, # type: ignore
+ }
+ )
- # modify the rpc methods at runtime, to allow for correct payment
- # fee calculation parity has a bug in this version,
- # where the method has to be removed
- rpc_methods = substrate.config.get('rpc_methods') # type: ignore
+ # modify the rpc methods at runtime, to allow for correct payment
+ # fee calculation parity has a bug in this version,
+ # where the method has to be removed
+ rpc_methods = substrate.config.get('rpc_methods') # type: ignore
- if "state_call" in rpc_methods: # type: ignore
- rpc_methods.remove("state_call") # type: ignore
+ if "state_call" in rpc_methods: # type: ignore
+ rpc_methods.remove("state_call") # type: ignore
- # create the multisig account
- multisig_acc = (substrate.generate_multisig_account( # type: ignore
- signatories, threshold))
+ # create the multisig account
+ multisig_acc = (substrate.generate_multisig_account( # type: ignore
+ signatories, threshold))
- # send the multisig extrinsic
- extrinsic = substrate.create_multisig_extrinsic( # type: ignore
- call=call, keypair=key, multisig_account=multisig_acc, # type: ignore
- era=era) # type: ignore
+ # send the multisig extrinsic
+ extrinsic = substrate.create_multisig_extrinsic( # type: ignore
+ call=call, keypair=key, multisig_account=multisig_acc, # type: ignore
+ era=era) # type: ignore
response = substrate.submit_extrinsic(
extrinsic=extrinsic,
@@ -868,11 +895,13 @@ def compose_call_multisig(self,
wait_for_finalization=wait_for_finalization,
)
- if wait_for_inclusion:
- if not response.is_success:
- raise ChainTransactionError(
- response.error_message, response # type: ignore
- )
+ if wait_for_inclusion:
+ if not response.is_success:
+ raise ChainTransactionError(
+ response.error_message, response # type: ignore
+ )
+ except SubstrateRequestException as e:
+ raise ChainTransactionError(e) from e
return response
@@ -899,17 +928,19 @@ def transfer(
enough balance.
ChainTransactionError: If the transaction fails.
"""
+ try:
+ amount = amount - self.get_existential_deposit()
- amount = amount - self.get_existential_deposit()
-
- params = {'dest': dest, 'value': amount}
+ params = {'dest': dest, 'value': amount}
- return self.compose_call(
- module='Balances',
- fn='transfer',
- params=params,
- key=key
- )
+ return self.compose_call(
+ module='Balances',
+ fn='transfer',
+ params=params,
+ key=key
+ )
+ except SubstrateRequestException as e:
+ raise ChainTransactionError(e) from e
def transfer_multiple(
self,
@@ -938,25 +969,27 @@ def transfer_multiple(
enough balance for all transfers.
ChainTransactionError: If the transaction fails.
"""
-
- assert len(destinations) == len(amounts)
-
- # extract existential deposit from amounts
- existential_deposit = self.get_existential_deposit()
- amounts = [a - existential_deposit for a in amounts]
-
- params = {
- "netuid": netuid,
- "destinations": destinations,
- "amounts": amounts,
- }
-
- return self.compose_call(
- module='SubspaceModule',
- fn='transfer_multiple',
- params=params,
- key=key
- )
+ try:
+ assert len(destinations) == len(amounts)
+
+ # extract existential deposit from amounts
+ existential_deposit = self.get_existential_deposit()
+ amounts = [a - existential_deposit for a in amounts]
+
+ params = {
+ "netuid": netuid,
+ "destinations": destinations,
+ "amounts": amounts,
+ }
+
+ return self.compose_call(
+ module='SubspaceModule',
+ fn='transfer_multiple',
+ params=params,
+ key=key
+ )
+ except SubstrateRequestException as e:
+ raise ChainTransactionError(e) from e
def stake(
self,
@@ -982,14 +1015,16 @@ def stake(
enough balance.
ChainTransactionError: If the transaction fails.
"""
+ try:
+ amount = amount - self.get_existential_deposit()
- amount = amount - self.get_existential_deposit()
-
- params = {
- 'amount': amount,
- 'netuid': netuid,
- 'module_key': dest
- }
+ params = {
+ 'amount': amount,
+ 'netuid': netuid,
+ 'module_key': dest
+ }
+ except InsufficientBalanceError as e:
+ raise ChainTransactionError(e) from e
return self.compose_call(fn='add_stake', params=params, key=key)
@@ -1017,15 +1052,17 @@ def unstake(
staked tokens by the signer key.
ChainTransactionError: If the transaction fails.
"""
+ try:
+ amount = amount - self.get_existential_deposit()
- amount = amount - self.get_existential_deposit()
-
- params = {
- 'amount': amount,
- 'netuid': netuid,
- 'module_key': dest
- }
- return self.compose_call(fn='remove_stake', params=params, key=key)
+ params = {
+ 'amount': amount,
+ 'netuid': netuid,
+ 'module_key': dest
+ }
+ return self.compose_call(fn='remove_stake', params=params, key=key)
+ except InsufficientStakeError as e:
+ raise ChainTransactionError(e) from e
def update_module(
self,
@@ -1056,22 +1093,23 @@ def update_module(
InvalidParameterError: If the provided parameters are invalid.
ChainTransactionError: If the transaction fails.
"""
-
- assert isinstance(delegation_fee, int)
-
- if not name:
- name = ''
- if not address:
- address = ''
- params = {
- 'netuid': netuid,
- 'name': name,
- 'address': address,
- 'delegation_fee': delegation_fee
- }
-
- response = self.compose_call('update_module', params=params, key=key)
-
+ try:
+ assert isinstance(delegation_fee, int)
+
+ if not name:
+ name = ''
+ if not address:
+ address = ''
+ params = {
+ 'netuid': netuid,
+ 'name': name,
+ 'address': address,
+ 'delegation_fee': delegation_fee
+ }
+
+ response = self.compose_call('update_module', params=params, key=key)
+ except InvalidParameterError as e:
+ raise ChainTransactionError(e) from e
return response
def register_module(
@@ -1102,20 +1140,24 @@ def register_module(
InvalidParameterError: If the provided parameters are invalid.
ChainTransactionError: If the transaction fails.
"""
+ try:
- stake = self.get_min_stake() if min_stake is None else min_stake
+ stake = self.get_min_stake() if min_stake is None else min_stake
- key_addr = key.ss58_address
+ key_addr = key.ss58_address
- params = {
- 'network': subnet,
- 'address': address,
- 'name': name,
- 'stake': stake,
- 'module_key': key_addr
- }
- response = self.compose_call('register', params=params, key=key)
+ params = {
+ 'network': subnet,
+ 'address': address,
+ 'name': name,
+ 'stake': stake,
+ 'module_key': key_addr
+ }
+ response = self.compose_call('register', params=params, key=key)
+ except InvalidParameterError as e:
+ raise ChainTransactionError(e) from e
return response
+
def vote(
self,
@@ -1144,17 +1186,19 @@ def vote(
do not match.
ChainTransactionError: If the transaction fails.
"""
+ try:
- assert len(uids) == len(weights)
-
- params = {
- 'uids': uids,
- 'weights': weights,
- 'netuid': netuid,
- }
+ assert len(uids) == len(weights)
- response = self.compose_call('set_weights', params=params, key=key)
+ params = {
+ 'uids': uids,
+ 'weights': weights,
+ 'netuid': netuid,
+ }
+ response = self.compose_call('set_weights', params=params, key=key)
+ except InvalidParameterError as e:
+ raise ChainTransactionError(e) from e
return response
def update_subnet(
@@ -1180,16 +1224,18 @@ def update_subnet(
AuthorizationError: If the key is not authorized.
ChainTransactionError: If the transaction fails.
"""
+ try:
- general_params = dict(params)
- general_params['netuid'] = netuid
-
- response = self.compose_call(
- fn='update_subnet',
- params=general_params,
- key=key,
- )
+ general_params = dict(params)
+ general_params['netuid'] = netuid
+ response = self.compose_call(
+ fn='update_subnet',
+ params=general_params,
+ key=key,
+ )
+ except AuthorizationError as e:
+ raise ChainTransactionError(e) from e
return response
def transfer_stake(
@@ -1218,18 +1264,19 @@ def transfer_stake(
enough staked tokens. ChainTransactionError: If the transaction
fails.
"""
-
- amount = amount - self.get_existential_deposit()
-
- params = {
- 'amount': amount,
- 'netuid': netuid,
- 'module_key': from_module_key,
- 'new_module_key': dest_module_address,
- }
-
- response = self.compose_call('transfer_stake', key=key, params=params)
-
+ try:
+ amount = amount - self.get_existential_deposit()
+
+ params = {
+ 'amount': amount,
+ 'netuid': netuid,
+ 'module_key': from_module_key,
+ 'new_module_key': dest_module_address,
+ }
+
+ response = self.compose_call('transfer_stake', key=key, params=params)
+ except InsufficientStakeError as e:
+ raise ChainTransactionError(e) from e
return response
def multiunstake(
@@ -1261,23 +1308,26 @@ def multiunstake(
have enough staked tokens. ChainTransactionError: If the transaction
fails.
"""
+ try:
+ assert len(keys) == len(amounts)
- assert len(keys) == len(amounts)
-
- # extract existential deposit from amounts
- amounts = [a - self.get_existential_deposit() for a in amounts]
-
- params = {
- "netuid": netuid,
- "module_keys": keys,
- "amounts": amounts
- }
+ # extract existential deposit from amounts
+ amounts = [a - self.get_existential_deposit() for a in amounts]
- response = self.compose_call(
- 'remove_stake_multiple',
- params=params, key=key
- )
+ params = {
+ "netuid": netuid,
+ "module_keys": keys,
+ "amounts": amounts
+ }
+ response = self.compose_call(
+ 'remove_stake_multiple',
+ params=params, key=key
+ )
+ except MismatchedLengthError as e:
+ raise ChainTransactionError(e) from e
+ except InsufficientStakeError as e:
+ raise ChainTransactionError(e) from e
return response
def multistake(
@@ -1308,18 +1358,21 @@ def multistake(
do not match.
ChainTransactionError: If the transaction fails.
"""
+ try:
- assert len(keys) == len(amounts)
+ assert len(keys) == len(amounts)
- params = {
- 'module_keys': keys,
- 'amounts': amounts,
- 'netuid': netuid,
- }
+ params = {
+ 'module_keys': keys,
+ 'amounts': amounts,
+ 'netuid': netuid,
+ }
- response = self.compose_call(
- 'add_stake_multiple', params=params, key=key
- )
+ response = self.compose_call(
+ 'add_stake_multiple', params=params, key=key
+ )
+ except MismatchedLengthError as e:
+ raise ChainTransactionError(e) from e
return response
@@ -1350,19 +1403,20 @@ def add_profit_shares(
lists do not match.
ChainTransactionError: If the transaction fails.
"""
+ try:
+ assert len(keys) == len(shares)
- assert len(keys) == len(shares)
-
- params = {
- 'keys': keys,
- 'shares': shares
- }
-
- response = self.compose_call(
- 'add_profit_shares',
- params=params, key=key
- )
+ params = {
+ 'keys': keys,
+ 'shares': shares
+ }
+ response = self.compose_call(
+ 'add_profit_shares',
+ params=params, key=key
+ )
+ except MismatchedLengthError as e:
+ raise ChainTransactionError(e) from e
return response
def add_subnet_proposal(self,
@@ -1390,14 +1444,16 @@ def add_subnet_proposal(self,
parameters are invalid.
ChainTransactionError: If the transaction fails.
"""
+ try:
+ general_params = dict(params)
+ general_params['netuid'] = netuid
- general_params = dict(params)
- general_params['netuid'] = netuid
-
- response = self.compose_call(fn='add_subnet_proposal',
- params=general_params,
- key=key,)
+ response = self.compose_call(fn='add_subnet_proposal',
+ params=general_params,
+ key=key,)
+ except InvalidParameterError as e:
+ raise ChainTransactionError(e) from e
return response
def add_global_proposal(self,
@@ -1427,12 +1483,14 @@ def add_global_proposal(self,
parameters are invalid.
ChainTransactionError: If the transaction fails.
"""
+ try:
+ general_params = dict(params)
+ response = self.compose_call(fn='add_global_proposal',
+ params=general_params,
+ key=key,)
- general_params = dict(params)
- response = self.compose_call(fn='add_global_proposal',
- params=general_params,
- key=key,)
-
+ except InvalidParameterError as e:
+ raise ChainTransactionError(e) from e
return response
def vote_on_proposal(self,
@@ -1454,13 +1512,15 @@ def vote_on_proposal(self,
exist or is invalid.
ChainTransactionError: If the transaction fails.
"""
+ try:
+ params = {
+ 'proposal_id': proposal_id
+ }
- params = {
- 'proposal_id': proposal_id
- }
-
- response = self.compose_call('vote_proposal', key=key, params=params)
+ response = self.compose_call('vote_proposal', key=key, params=params)
+ except InvalidProposalIDError as e:
+ raise ChainTransactionError(e) from e
return response
def unvote_on_proposal(self,
@@ -1484,16 +1544,18 @@ def unvote_on_proposal(self,
ChainTransactionError: If the transaction fails to be processed, or
if there was no prior vote to retract.
"""
+ try:
+ params = {
+ 'proposal_id': proposal_id
+ }
- params = {
- 'proposal_id': proposal_id
- }
-
- response = self.compose_call('unvote_proposal', key=key, params=params)
+ response = self.compose_call('unvote_proposal', key=key, params=params)
+ except InvalidProposalIDError as e:
+ raise ChainTransactionError(e) from e
return response
- def query_map_proposals(self) -> dict[int, dict[str, Any]]:
+ def query_map_proposals(self) -> Union[Dict[int, dict[str, Any]], None]:
"""
Retrieves a mappping of proposals from the network.
@@ -1507,8 +1569,14 @@ def query_map_proposals(self) -> dict[int, dict[str, Any]]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map('Proposals', extract_value=False)["Proposals"]
+ result = {}
+ try:
+ result = self.query_map('Proposals', params=[], extract_value=False)["Proposals"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ if not result:
+ return None
+ return result
def query_map_weights(self, netuid: int = 0) -> dict[int, list[int]]:
"""
@@ -1526,8 +1594,12 @@ def query_map_weights(self, netuid: int = 0) -> dict[int, list[int]]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map('Weights', [netuid], extract_value=False)["Weights"]
+ try:
+ result = self.query_map('Weights', [netuid], extract_value=False)["Weights"]
+ except QueryError as e:
+ raise QueryError(e) from e
+
+ return result
def query_map_key(
self,
@@ -1550,7 +1622,12 @@ def query_map_key(
Raises:
QueryError: If the query to the network fails or is invalid.
"""
- return self.query_map('Keys', [netuid], extract_value=extract_value)["Keys"]
+
+ try:
+ result = self.query_map('Keys', [netuid], extract_value=extract_value)["Keys"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_address(self, netuid: int = 0) -> dict[int, str]:
"""
@@ -1567,8 +1644,11 @@ def query_map_address(self, netuid: int = 0) -> dict[int, str]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map('Address', [netuid], extract_value=False)["Address"]
+ try:
+ result = self.query_map('Address', [netuid], extract_value=False)["Address"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_emission(self) -> dict[int, list[int]]:
"""
@@ -1583,8 +1663,11 @@ def query_map_emission(self) -> dict[int, list[int]]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map('Emission', extract_value=False)["Emission"]
+ try:
+ result = self.query_map('Emission', params=[], extract_value=False)["Emission"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_incentive(self) -> dict[int, list[int]]:
"""
@@ -1599,8 +1682,11 @@ def query_map_incentive(self) -> dict[int, list[int]]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map('Incentive', extract_value=False)["Incentive"]
+ try:
+ result = self.query_map('Incentives', params=[], extract_value=False)["Incentives"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_dividend(self) -> dict[int, list[int]]:
"""
@@ -1615,8 +1701,11 @@ def query_map_dividend(self) -> dict[int, list[int]]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map('Dividends', extract_value=False)["Dividends"]
+ try:
+ result = self.query_map('Dividends', params=[], extract_value=False)["Dividends"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_regblock(self, netuid: int = 0) -> dict[int, int]:
"""
@@ -1634,8 +1723,12 @@ def query_map_regblock(self, netuid: int = 0) -> dict[int, int]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ try:
+ result = self.query_map('RegistrationBlock', [netuid], extract_value=False)["RegistrationBlock"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query_map('RegistrationBlock', [netuid], extract_value=False)["RegistrationBlock"]
def query_map_lastupdate(self) -> dict[int, list[int]]:
"""
@@ -1649,8 +1742,11 @@ def query_map_lastupdate(self) -> dict[int, list[int]]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map('LastUpdate', extract_value=False)["LastUpdate"]
+ try:
+ result = self.query_map('LastUpdate', params=[], extract_value=False)["LastUpdate"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_total_stake(self, extract_value: bool = False) -> dict[int, int]:
"""
@@ -1664,8 +1760,11 @@ def query_map_total_stake(self, extract_value: bool = False) -> dict[int, int]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map('TotalStake', extract_value=extract_value)["TotalStake"]
+ try:
+ result = self.query_map('TotalStake', params=[], extract_value=extract_value)["TotalStake"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_stakefrom(self, netuid: int = 0, extract_value: bool = False) -> \
dict[str, list[tuple[str, int]]]:
@@ -1685,8 +1784,11 @@ def query_map_stakefrom(self, netuid: int = 0, extract_value: bool = False) -> \
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map('StakeFrom', [netuid], extract_value=extract_value)["StakeFrom"]
+ try:
+ result = self.query_map('StakeFrom', [netuid], extract_value=extract_value)["StakeFrom"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_staketo(self, netuid: int = 0, extract_value: bool = False) -> \
dict[str, list[tuple[str, int]]]:
@@ -1706,8 +1808,11 @@ def query_map_staketo(self, netuid: int = 0, extract_value: bool = False) -> \
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map('StakeTo', [netuid], extract_value=extract_value)
+ try:
+ result = self.query_map('StakeTo', [netuid], extract_value=extract_value)
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_stake(self, netuid: int = 0) -> dict[str, int]:
"""
@@ -1726,8 +1831,11 @@ def query_map_stake(self, netuid: int = 0) -> dict[str, int]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map('Stake', [netuid], extract_value=False)["Stake"]
+ try:
+ result = self.query_map('Stake', [netuid], extract_value=False)["Stake"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_delegationfee(self, netuid: int = 0) -> dict[str, int]:
"""
@@ -1745,9 +1853,12 @@ def query_map_delegationfee(self, netuid: int = 0) -> dict[str, int]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map('DelegationFee', [netuid], extract_value=False)["DelegationFee"]
-
+ try:
+ result = self.query_map('DelegationFee', [netuid], extract_value=False)["DelegationFee"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
+
def query_map_tempo(self) -> dict[int, int]:
"""
Retrieves a mapping of tempo settings for the network.
@@ -1761,8 +1872,11 @@ def query_map_tempo(self) -> dict[int, int]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map("Tempo", extract_value=False)["Tempo"]
+ try:
+ result = self.query_map("Tempo", params=[], extract_value=False)["Tempo"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_immunity_period(self) -> dict[int, int]:
"""
@@ -1778,8 +1892,12 @@ def query_map_immunity_period(self) -> dict[int, int]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map("ImmunityPeriod", extract_value=False)["ImmunityPeriod"]
+ try:
+ result = self.query_map("ImmunityPeriod", params=[], extract_value=False)["ImmunityPeriod"]
+ except QueryError as e:
+ raise QueryError(e) from e
+
+ return result
def query_map_min_allowed_weights(self) -> dict[int, int]:
"""
@@ -1796,8 +1914,11 @@ def query_map_min_allowed_weights(self) -> dict[int, int]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map("MinAllowedWeights", extract_value=False)["MinAllowedWeights"]
+ try:
+ result = self.query_map("MinAllowedWeights", params=[], extract_value=False)["MinAllowedWeights"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_max_allowed_weights(self) -> dict[int, int]:
"""
@@ -1814,8 +1935,11 @@ def query_map_max_allowed_weights(self) -> dict[int, int]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map("MaxAllowedWeights", extract_value=False)["MaxAllowedWeights"]
+ try:
+ result = self.query_map("MaxAllowedWeights", params=[], extract_value=False)["MaxAllowedWeights"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_max_allowed_uids(self) -> dict[int, int]:
"""
@@ -1834,8 +1958,11 @@ def query_map_max_allowed_uids(self) -> dict[int, int]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map("MaxAllowedUids", extract_value=False)["MaxAllowedUids"]
+ try:
+ result = self.query_map("MaxAllowedUids", params=[], extract_value=False)["MaxAllowedUids"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_min_stake(self) -> dict[int, int]:
"""
@@ -1851,8 +1978,11 @@ def query_map_min_stake(self) -> dict[int, int]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map("MinStake", extract_value=False)["MinStake"]
+ try:
+ result = self.query_map("MinStake", params=[], extract_value=False)["MinStake"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_max_stake(self) -> dict[int, int]:
"""
@@ -1867,8 +1997,11 @@ def query_map_max_stake(self) -> dict[int, int]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map("MaxStake", extract_value=False)["MaxStake"]
+ try:
+ result = self.query_map("MaxStake", params=[], extract_value=False)["MaxStake"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_founder(self) -> dict[int, str]:
"""
@@ -1883,8 +2016,12 @@ def query_map_founder(self) -> dict[int, str]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ try:
+ result = self.query_map("Founder", params=[], extract_value=False)["Founder"]
+ except QueryError as e:
+ raise QueryError(e) from e
- return self.query_map("Founder", extract_value=False)["Founder"]
+ return result
def query_map_founder_share(self) -> dict[int, int]:
"""
@@ -1897,10 +2034,14 @@ def query_map_founder_share(self) -> dict[int, int]:
A dictionary mapping network UIDs to their founder share percentages.
Raises:
- QueryError: If the query to the network fails or is invalid.
+ QueryError: If the query to the network fails or is inval
+ i d.
"""
-
- return self.query_map("FounderShare", extract_value=False)["FounderShare"]
+ try:
+ result = self.query_map("FounderShare", params=[], extract_value=False)["FounderShare"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_incentive_ratio(self) -> dict[int, int]:
"""
@@ -1914,10 +2055,13 @@ def query_map_incentive_ratio(self) -> dict[int, int]:
A dictionary mapping network UIDs to their incentive ratios.
Raises:
- QueryError: If the query to the network fails or is invalid.
+ QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map("IncentiveRatio", extract_value=False)["IncentiveRatio"]
+ try:
+ result = self.query_map("IncentiveRatio", params=[], extract_value=False)["IncentiveRatio"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_trust_ratio(self) -> dict[int, int]:
"""
@@ -1933,8 +2077,11 @@ def query_map_trust_ratio(self) -> dict[int, int]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map("TrustRatio", extract_value=False)["TrustRatio"]
+ try:
+ result = self.query_map("TrustRatio", params=[],extract_value=False)["TrustRatio"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_vote_threshold_subnet(self) -> dict[int, int]:
"""
@@ -1950,9 +2097,12 @@ def query_map_vote_threshold_subnet(self) -> dict[int, int]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map("VoteThresholdSubnet", extract_value=False)["VoteThresholdSubnet"]
-
+ try:
+ result = self.query_map("VoteThresholdSubnet", params=[],extract_value=False)["VoteThresholdSubnet"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
+
def query_map_vote_mode_subnet(self) -> dict[int, str]:
"""
Retrieves a mapping of vote modes for subnets within the network.
@@ -1966,10 +2116,13 @@ def query_map_vote_mode_subnet(self) -> dict[int, str]:
modes for subnets.
Raises:
- QueryError: If the query to the network fails or is invalid.
+ QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map("VoteModeSubnet", extract_value=False)["VoteModeSubnet"]
+ try:
+ result = self.query_map("VoteModeSubnet", params=[], extract_value=False)["VoteModeSubnet"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def query_map_subnet_names(self, extract_value: bool = False) -> dict[int, str]:
"""
@@ -1985,9 +2138,12 @@ def query_map_subnet_names(self, extract_value: bool = False) -> dict[int, str]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map("SubnetNames", extract_value=extract_value)["SubnetNames"]
-
+ try:
+ result = self.query_map("SubnetNames", params=[], extract_value=extract_value)["SubnetNames"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
+
def query_map_balances(self) -> \
dict[str, dict['str', int | dict[str, int]]]:
"""
@@ -2003,9 +2159,12 @@ def query_map_balances(self) -> \
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map('Account', module='System', extract_value=False)["Account"]
-
+ try:
+ result = self.query_map('Account', params=[], module='System', extract_value=False)["Account"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
+
def query_map_registration_blocks(self, netuid: int = 0) -> dict[int, int]:
"""
Retrieves a mapping of registration blocks for UIDs on the network.
@@ -2020,10 +2179,15 @@ def query_map_registration_blocks(self, netuid: int = 0) -> dict[int, int]:
A dictionary mapping UIDs to their registration block numbers.
Raises:
- QueryError: If the query to the network fails or is invalid.
+ QueryError: If the query to the network fails or is
+ i nvalid.
"""
-
- return self.query_map("RegistrationBlock", [netuid], extract_value=False)["RegistrationBlock"]
+ try:
+ result = self.query_map("RegistrationBlock", [netuid], extract_value=False)["RegistrationBlock"]
+ except QueryError as e:
+ raise QueryError(e) from e
+
+ return result
def query_map_name(self, netuid: int = 0) -> dict[int, str]:
"""
@@ -2041,12 +2205,15 @@ def query_map_name(self, netuid: int = 0) -> dict[int, str]:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query_map('Name', [netuid], extract_value=False)["Name "]
+ try:
+ result = self.query_map('Name', [netuid], extract_value=False)["Name"]
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
# == QUERY FUNCTIONS == #
- def get_immunity_period(self, netuid: int = 0) -> int:
+ def get_immunity_period(self, netuid: int = 0) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the immunity period setting.
@@ -2061,12 +2228,16 @@ def get_immunity_period(self, netuid: int = 0) -> int:
The immunity period setting for the specified network subnet.
Raises:
- QueryError: If the query to the network fails or is invalid.
+ QueryError: If the query to the network fails or is
+ i nvalid.
"""
+ try:
+ result = self.query("ImmunityPeriod", params=[netuid],)
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query("ImmunityPeriod", params=[netuid],)
-
- def get_min_allowed_weights(self, netuid: int = 0) -> int:
+ def get_min_allowed_weights(self, netuid: int = 0) -> Dict[Any, Any]:
"""
Queries the network for the minimum allowed weights setting.
@@ -2082,12 +2253,19 @@ def get_min_allowed_weights(self, netuid: int = 0) -> int:
subnet.
Raises:
- QueryError: If the query to the network fails or is invalid.
+ QueryError: If the query to the network fails or is inv
+ a lid.
"""
+ result = {}
+ try:
+ result = self.query("MinAllowedWeights", params=[netuid],)
+ except QueryError as e:
+ raise QueryError(e) from e
+ if not result:
+ return {}
+ return result
- return self.query("MinAllowedWeights", params=[netuid],)
-
- def get_max_allowed_weights(self, netuid: int = 0) -> int:
+ def get_max_allowed_weights(self, netuid: int = 0) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the maximum allowed weights setting.
@@ -2103,12 +2281,20 @@ def get_max_allowed_weights(self, netuid: int = 0) -> int:
subnet.
Raises:
- QueryError: If the query to the network fails or is invalid.
+ QueryError: If the query to the network fails or is in
+ v alid.
"""
+ result = {}
+ try:
+ result = self.query("MaxAllowedWeights", params=[netuid])
+ except QueryError as e:
+ raise QueryError(e) from e
+
+ if not result:
+ return {}
+ return result
- return self.query("MaxAllowedWeights", params=[netuid])
-
- def get_max_allowed_uids(self, netuid: int = 0) -> int:
+ def get_max_allowed_uids(self, netuid: int = 0) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the maximum allowed UIDs setting.
@@ -2122,12 +2308,19 @@ def get_max_allowed_uids(self, netuid: int = 0) -> int:
The maximum number of allowed UIDs for the specified network subnet.
Raises:
- QueryError: If the query to the network fails or is invalid.
+ QueryError: If the query to the network fails or is
+ invalid.
"""
+ try:
+ result = self.query("MaxAllowedUids", params=[netuid])
+ except QueryError as e:
+ raise QueryError(e) from e
- return self.query("MaxAllowedUids", params=[netuid])
+ if not result:
+ return {}
+ return result
- def get_name(self, netuid: int = 0) -> str:
+ def get_name(self, netuid: int = 0) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the name of a specific subnet.
@@ -2140,10 +2333,13 @@ def get_name(self, netuid: int = 0) -> str:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ try:
+ result = self.query("Name", params=[netuid])
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query("Name", params=[netuid])
-
- def get_n(self, netuid: int = 0) -> int:
+ def get_n(self, netuid: int = 0) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the 'N' hyperparameter, which represents how
many modules are on the network.
@@ -2156,12 +2352,16 @@ def get_n(self, netuid: int = 0) -> int:
subnet.
Raises:
- QueryError: If the query to the network fails or is invalid.
+ QueryError: If the query to the networ
+ try:k fails or is invalid.
"""
+ try:
+ result = self.query("N", params=[netuid])
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query("N", params=[netuid])
-
- def get_tempo(self, netuid: int = 0) -> int:
+ def get_tempo(self, netuid: int = 0) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the tempo setting, measured in blocks, for the
specified subnet.
@@ -2175,8 +2375,11 @@ def get_tempo(self, netuid: int = 0) -> int:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query("Tempo", params=[netuid])
+ try:
+ result = self.query("Tempo", params=[netuid])
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def get_total_stake(self, netuid: int = 0):
"""
@@ -2193,10 +2396,13 @@ def get_total_stake(self, netuid: int = 0):
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ try:
+ result = self.query("TotalStake", params=[netuid],)
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query("TotalStake", params=[netuid],)
-
- def get_registrations_per_block(self):
+ def get_registrations_per_block(self) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the number of registrations per block.
@@ -2209,10 +2415,16 @@ def get_registrations_per_block(self):
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ result = {}
+ try:
+ result = self.query("RegistrationsPerBlock", params=[])
+ except QueryError as e:
+ raise QueryError(e) from e
+ if not result:
+ return {}
+ return result
- return self.query("RegistrationsPerBlock",)
-
- def max_registrations_per_block(self, netuid: int = 0):
+ def max_registrations_per_block(self, netuid: int = 0) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the maximum number of registrations per block.
@@ -2229,10 +2441,13 @@ def max_registrations_per_block(self, netuid: int = 0):
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query("MaxRegistrationsPerBlock", params=[netuid],)
-
- def get_proposal(self, proposal_id: int = 0):
+ try:
+ result = self.query("MaxRegistrationsPerBlock", params=[netuid],)
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
+
+ def get_proposal(self, proposal_id: int = 0) -> Union[Dict[Any, Any], None]:
"""
Queries the network for a specific proposal.
@@ -2246,10 +2461,13 @@ def get_proposal(self, proposal_id: int = 0):
QueryError: If the query to the network fails, is invalid,
or if the proposal ID does not exist.
"""
-
- return self.query("Proposals", params=[proposal_id],)
-
- def get_trust(self, netuid: int = 0):
+ try:
+ result = self.query("Proposals", params=[proposal_id],)
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
+
+ def get_trust(self, netuid: int = 0) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the trust setting of a specific network subnet.
@@ -2266,10 +2484,13 @@ def get_trust(self, netuid: int = 0):
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ try:
+ result = self.query("Trust", params=[netuid],)
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query("Trust", params=[netuid],)
-
- def get_uids(self, key: Ss58Address, netuid: int = 0) -> bool | None:
+ def get_uids(self, key: Ss58Address, netuid: int = 0) -> Union[Dict[Any, Any], None]:
"""
Queries the network for module UIDs associated with a specific key.
@@ -2283,10 +2504,13 @@ def get_uids(self, key: Ss58Address, netuid: int = 0) -> bool | None:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ try:
+ result = self.query("Uids", params=[netuid, key],)
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query("Uids", params=[netuid, key],)
-
- def get_unit_emission(self) -> int:
+ def get_unit_emission(self) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the unit emission setting.
@@ -2299,10 +2523,16 @@ def get_unit_emission(self) -> int:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ result = {}
+ try:
+ result = self.query("UnitEmission", params=[])
+ except QueryError as e:
+ raise QueryError(e) from e
+ if result is None:
+ return {}
+ return result
- return self.query("UnitEmission")
-
- def get_tx_rate_limit(self) -> int:
+ def get_tx_rate_limit(self) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the transaction rate limit.
@@ -2316,10 +2546,13 @@ def get_tx_rate_limit(self) -> int:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ try:
+ result = self.query("TxRateLimit", params=[])
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query("TxRateLimit",)
-
- def get_burn_rate(self) -> int:
+ def get_burn_rate(self) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the burn rate setting.
@@ -2333,10 +2566,13 @@ def get_burn_rate(self) -> int:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ try:
+ result = self.query("BurnRate", params=[],)
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query("BurnRate", params=[],)
-
- def get_burn(self) -> int:
+ def get_burn(self) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the burn setting.
@@ -2348,12 +2584,16 @@ def get_burn(self) -> int:
The burn value for the network.
Raises:
- QueryError: If the query to the network fails or is invalid.
+ QueryError: If the query to the netw
+ try:o rk fails or is invalid.
"""
+ try:
+ result = self.query("Burn", params=[],)
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query("Burn", params=[],)
-
- def get_min_burn(self) -> int:
+ def get_min_burn(self) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the minimum burn setting.
@@ -2367,10 +2607,13 @@ def get_min_burn(self) -> int:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ try:
+ result = self.query("MinBurn", params=[],)
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query("MinBurn", params=[],)
-
- def get_min_weight_stake(self) -> int:
+ def get_min_weight_stake(self) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the minimum weight stake setting.
@@ -2382,12 +2625,15 @@ def get_min_weight_stake(self) -> int:
The minimum weight stake for the network.
Raises:
- QueryError: If the query to the network fails or is invalid.
+ QueryError: If the query to the network fails for is invalid.
"""
+ try:
+ result = self.query("MinWeightStake", params=[])
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query("MinWeightStake", params=[])
-
- def get_vote_mode_global(self) -> str:
+ def get_vote_mode_global(self) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the global vote mode setting.
@@ -2400,10 +2646,13 @@ def get_vote_mode_global(self) -> str:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ try:
+ result = self.query("VoteModeGlobal", params=[])
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query("VoteModeGlobal",)
-
- def get_max_proposals(self) -> int:
+ def get_max_proposals(self) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the maximum number of proposals allowed.
@@ -2416,10 +2665,13 @@ def get_max_proposals(self) -> int:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ try:
+ result = self.query("MaxProposals", params=[])
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query("MaxProposals",)
-
- def get_max_registrations_per_block(self) -> int:
+ def get_max_registrations_per_block(self) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the maximum number of registrations per block.
@@ -2432,10 +2684,13 @@ def get_max_registrations_per_block(self) -> int:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ try:
+ result = self.query("MaxRegistrationsPerBlock", params=[],)
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query("MaxRegistrationsPerBlock", params=[],)
-
- def get_max_name_length(self) -> int:
+ def get_max_name_length(self) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the maximum length allowed for names.
@@ -2448,10 +2703,13 @@ def get_max_name_length(self) -> int:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ try:
+ result = self.query("MaxNameLength", params=[],)
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query("MaxNameLength", params=[],)
-
- def get_global_vote_threshold(self) -> int:
+ def get_global_vote_threshold(self) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the global vote threshold.
@@ -2464,10 +2722,13 @@ def get_global_vote_threshold(self) -> int:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ try:
+ result = self.query("GlobalVoteThreshold",params=[])
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query("GlobalVoteThreshold",)
-
- def get_max_allowed_subnets(self) -> int:
+ def get_max_allowed_subnets(self) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the maximum number of allowed subnets.
@@ -2480,10 +2741,13 @@ def get_max_allowed_subnets(self) -> int:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
+ try:
+ result = self.query("MaxAllowedSubnets", params=[])
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
- return self.query("MaxAllowedSubnets", params=[],)
-
- def get_max_allowed_modules(self) -> int:
+ def get_max_allowed_modules(self) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the maximum number of allowed modules.
@@ -2496,11 +2760,14 @@ def get_max_allowed_modules(self) -> int:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query("MaxAllowedModules", params=[],)
+ try:
+ result = self.query("MaxAllowedModules", params=[])
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def get_min_stake(self,
- netuid: int = 0) -> int:
+ netuid: int = 0) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the minimum stake required to register a key.
@@ -2516,13 +2783,16 @@ def get_min_stake(self,
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query('MinStake', params=[netuid])
+ try:
+ result = self.query('MinStake', params=[netuid])
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def get_stake(self,
key: Ss58Address,
netuid: int = 0,
- ) -> int:
+ ) -> Union[Dict[Any, Any], None]:
"""
Queries the network for the stake delegated with a specific key.
@@ -2539,14 +2809,17 @@ def get_stake(self,
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- return self.query("Stake", params=[netuid, key],)
+ try:
+ result = self.query("Stake", params=[netuid, key],)
+ except QueryError as e:
+ raise QueryError(e) from e
+ return result
def get_stakefrom(
self,
key_addr: Ss58Address,
netuid: int = 0,
- ) -> dict[str, int]:
+ ) -> Union[Dict[Any, Any], None]:
"""
Retrieves a list of keys from which a specific key address is staked.
@@ -2565,9 +2838,13 @@ def get_stakefrom(
Raises:
QueryError: If the query to the network fails or is invalid.
"""
- result = self.query('StakeFrom', [netuid, key_addr])
-
- return {k: v for k, v in result}
+ try:
+ result = self.query('StakeFrom', [netuid, key_addr])
+ except QueryError as e:
+ raise QueryError(e) from e
+ if not result:
+ return {}
+ return {k: v for k, v in result}
def get_staketo(
self,
@@ -2592,15 +2869,18 @@ def get_staketo(
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- result = self.query('StakeTo', [netuid, key_addr])
-
+ try:
+ result = self.query('StakeTo', [netuid, key_addr])
+ except QueryError as e:
+ raise QueryError(e) from e
+ if not result:
+ return {}
return {k: v for k, v in result}
-
+
def get_balance(
self,
addr: Ss58Address,
- ) -> int:
+ ) -> Union[Dict[Any, Any], None]:
"""
Retrieves the balance of a specific key.
@@ -2613,9 +2893,12 @@ def get_balance(
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- result = self.query('Account', module='System', params=[addr])
-
+ try:
+ result = self.query('Account', module='System', params=[addr])
+ except QueryError as e:
+ raise QueryError(e) from e
+ if not result:
+ return None
return result["data"]["free"]
def get_block(self, block_hash: str | None = None) -> dict[Any, Any] | None:
@@ -2633,12 +2916,13 @@ def get_block(self, block_hash: str | None = None) -> dict[Any, Any] | None:
Raises:
QueryError: If the query to the network fails or is invalid.
"""
-
- with self.get_conn() as substrate:
- block: dict[Any, Any] | None = substrate.get_block( # type: ignore
+ try:
+ with self.get_conn() as substrate:
+ block: dict[Any, Any] | None = substrate.get_block( # type: ignore
block_hash # type: ignore
)
-
+ except QueryError as e:
+ raise QueryError(e) from e
return block
def get_existential_deposit(self, block_hash: str | None = None) -> int:
@@ -2654,9 +2938,10 @@ def get_existential_deposit(self, block_hash: str | None = None) -> int:
The value returned is a fixed value defined in the
client and may not reflect changes in the network's configuration.
"""
-
- with self.get_conn() as substrate:
- result: int = substrate.get_constant( # type: ignore
+ try:
+ with self.get_conn() as substrate:
+ result: int = substrate.get_constant( # type: ignore
"Balances", "ExistentialDeposit", block_hash).value # type: ignore
-
+ except QueryError as e:
+ raise QueryError(e) from e
return result
diff --git a/src/communex/compat/key.py b/src/communex/compat/key.py
index 87e0fd3..5d2114a 100644
--- a/src/communex/compat/key.py
+++ b/src/communex/compat/key.py
@@ -15,6 +15,7 @@
from communex.key import check_ss58_address, is_ss58_address
from communex.types import Ss58Address
from communex.util import bytes_to_hex, check_str
+from communex.errors import InvalidKeyFormatError
def check_key_dict(key_dict: Any) -> CommuneKeyDict:
@@ -73,18 +74,19 @@ def from_classic_dict(data: dict[Any, Any]) -> Keypair:
The reconstructed `Key` instance.
Raises:
- AssertionError: If `data` does not conform to the expected format.
+ InvalidKeyFormatError: If `data` does not conform to the expected format.
"""
+ try:
+ data_ = check_key_dict(data)
- data_ = check_key_dict(data)
-
- ss58_address = data_["ss58_address"]
- private_key = data_["private_key"]
- public_key = data_["public_key"]
- ss58_format = data_["ss58_format"]
-
- key = Keypair.create_from_private_key(private_key, public_key, ss58_address, ss58_format)
+ ss58_address = data_["ss58_address"]
+ private_key = data_["private_key"]
+ public_key = data_["public_key"]
+ ss58_format = data_["ss58_format"]
+ key = Keypair.create_from_private_key(private_key, public_key, ss58_address, ss58_format)
+ except AssertionError as e:
+ raise InvalidKeyFormatError(e) from e
return key
@@ -142,6 +144,9 @@ def local_key_addresses() -> dict[str, Ss58Address]:
addresses_map: dict[str, Ss58Address] = {}
for key_name in key_names:
+ # issue #11 https://github.com/agicommies/communex/issues/12 added check for key2address to stop error from being thrown by wrong key type.
+ if key_name == "key2address":
+ raise KeyFormatError(f"Key '{key_name}' is not a valid key name. Passing key.")
key_dict = classic_load_key(key_name)
addresses_map[key_name] = check_ss58_address(key_dict.ss58_address)
@@ -163,8 +168,8 @@ def resolve_key_ss58(key: Ss58Address | Keypair | str) -> Ss58Address:
try:
keypair = classic_load_key(key)
- except FileNotFoundError:
- raise ValueError(f"Key is not a valid SS58 address nor a valid key name: {key}")
+ except FileNotFoundError as e:
+ raise InvalidKeyFormatError(f"Key is not a valid SS58 address nor a valid key name: {key}") from e
address = keypair.ss58_address
diff --git a/src/communex/errors.py b/src/communex/errors.py
index b8bf2d6..3b23dd3 100644
--- a/src/communex/errors.py
+++ b/src/communex/errors.py
@@ -1,10 +1,70 @@
+
+
+class AuthorizationError(Exception):
+ """Access denied. Unathorized key."""
+
class ChainTransactionError(Exception):
- """Error for any chain transaction related errors."""
+ """Error processing chain transaction."""
+
+class InsufficientBalanceError(Exception):
+ """Insufficient balance on key."""
+
+class InsufficientStakeError(Exception):
+ """Insufficient stake."""
+class InvalidClassError(Exception):
+ """Invalid class."""
+class InvalidIPError(Exception):
+ """Invalid ip."""
+
+class InvalidModuleError(Exception):
+ """Invalid module."""
+
+class InvalidParameterError(Exception):
+ """Invalid parameter."""
+
+class InvalidProposalIDError(Exception):
+ """Invalid proposal id."""
+
+class InvalidKeyFormatError(Exception):
+ """Invalid key format."""
+
+class MismatchedLengthError(Exception):
+ """Mismatched length."""
+
class NetworkError(BaseException):
"""Base for any network related errors."""
-
class NetworkQueryError(NetworkError):
- """Network query related error."""
+ """Network query error."""
+
+class QueueEmptyError(NetworkError):
+ """Queue empty error."""
+
+class QueryError(NetworkError):
+ """Generic query error."""
+
+class SubstrateRequestException(Exception):
+ """Substrate request exception."""
+
+class CLIMenuError(Exception):
+ """Base for any cli menu related errors."""
+
+class CLIBalanceError(CLIMenuError):
+ """Wrong command in balance cli menu."""
+
+class CLIKeyError(CLIMenuError):
+ """Wrong command in key cli menu."""
+
+class CLIMiscError(CLIMenuError):
+ """Wrong command in misc cli menu."""
+
+class CLIModuleError(CLIMenuError):
+ """Wrong command in module cli menu."""
+
+class CLINetworkError(CLIMenuError):
+ """Wrong command in network cli menu."""
+
+class CLISubnetError(CLIMenuError):
+ """Wrong command in subnet cli menu."""
\ No newline at end of file
diff --git a/src/communex/logging.py b/src/communex/logging.py
new file mode 100644
index 0000000..5ef1a46
--- /dev/null
+++ b/src/communex/logging.py
@@ -0,0 +1,12 @@
+import sys
+from loguru import logger
+
+
+logger.level("INFO")
+
+logger.opt(colors=True).add(sys.stderr, format="{message}")
+logger.opt(colors=True).add(sys.stdout, format="{message}")
+logger.opt(colors=True).add("communex.log", format="{message}")
+
+def get_logger():
+ return logger
\ No newline at end of file
diff --git a/src/communex/types.py b/src/communex/types.py
index 4668e2c..04e7a6e 100644
--- a/src/communex/types.py
+++ b/src/communex/types.py
@@ -1,8 +1,8 @@
"""
Common types for the communex module.
"""
-
-from typing import NewType, TypedDict
+from pydantic import BaseModel
+from typing import NewType, TypedDict, Optional
Ss58Address = NewType("Ss58Address", str)
"""Substrate SS58 address.
@@ -18,13 +18,13 @@
# TODO: replace with dataclasses(?)
-
-class NetworkParams(TypedDict):
+# Made it into a pydantic base model so that I could assign none to the values not being passed to it in NetworkParams in src/communex/cli/network.py. The red lines made my brain upset.
+class NetworkParams(BaseModel):
max_allowed_subnets: int
max_allowed_modules: int
max_registrations_per_block: int
- target_registrations_interval: int # in blocks
- target_registrations_per_interval: int
+ target_registrations_interval: Optional[int]=None # in blocks
+ target_registrations_per_interval: Optional[int]=None
unit_emission: int
tx_rate_limit: int
vote_threshold: int
@@ -33,13 +33,12 @@ class NetworkParams(TypedDict):
max_name_length: int
burn_rate: int
min_burn: int # min burn to register
- max_burn: int # max burn to register
- burn: int # this is the actual burn to register
+ max_burn: Optional[int]=None # max burn to register
+ burn: Optional[int]=None # this is the actual burn to register
min_stake: int
min_weight_stake: int
- adjustment_alpha: int
- floor_delegation_fee: int
-
+ adjustment_alpha: Optional[int]=None
+ floor_delegation_fee: Optional[int]=None
class SubnetParams(TypedDict):
name: str