diff --git a/bittensor/chain_data.py b/bittensor/chain_data.py index 9140415506..49f92e5ce6 100644 --- a/bittensor/chain_data.py +++ b/bittensor/chain_data.py @@ -14,17 +14,24 @@ # THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. -import bittensor + +""" +This module provides data structures and functions for working with the Bittensor network, +including neuron and subnet information, SCALE encoding/decoding, and custom RPC type registry. +""" + import json -from enum import Enum from dataclasses import dataclass, asdict -from scalecodec.types import GenericCall +from enum import Enum from typing import List, Tuple, Dict, Optional, Any, TypedDict, Union + from scalecodec.base import RuntimeConfiguration, ScaleBytes from scalecodec.type_registry import load_type_registry_preset +from scalecodec.types import GenericCall from scalecodec.utils.ss58 import ss58_encode -from .utils import networking as net, U16_MAX, U16_NORMALIZED_FLOAT +import bittensor +from .utils import networking as net, RAOPERTAO, U16_NORMALIZED_FLOAT from .utils.balance import Balance from .utils.registration import torch, use_torch @@ -200,18 +207,16 @@ class AxonInfo: @property def is_serving(self) -> bool: """True if the endpoint is serving.""" - if self.ip == "0.0.0.0": - return False - else: - return True + return self.ip != "0.0.0.0" def ip_str(self) -> str: """Return the whole IP as string""" return net.ip__str__(self.ip_type, self.ip, self.port) def __eq__(self, other: "AxonInfo"): - if other == None: + if other is None: return False + if ( self.version == other.version and self.ip == other.ip @@ -221,8 +226,8 @@ def __eq__(self, other: "AxonInfo"): and self.hotkey == other.hotkey ): return True - else: - return False + + return False def __str__(self): return "AxonInfo( {}, {}, {}, {} )".format( @@ -241,10 +246,23 @@ def to_string(self) -> str: return AxonInfo(0, "", 0, 0, "", "").to_string() @classmethod - def from_string(cls, s: str) -> "AxonInfo": - """Creates an AxonInfo object from its string representation using JSON.""" + def from_string(cls, json_string: str) -> "AxonInfo": + """ + Creates an AxonInfo object from its string representation using JSON. + + Args: + json_string (str): The JSON string representation of the AxonInfo object. + + Returns: + AxonInfo: An instance of AxonInfo created from the JSON string. If decoding fails, returns a default AxonInfo object with default values. + + Raises: + json.JSONDecodeError: If there is an error in decoding the JSON string. + TypeError: If there is a type error when creating the AxonInfo object. + ValueError: If there is a value error when creating the AxonInfo object. + """ try: - data = json.loads(s) + data = json.loads(json_string) return cls(**data) except json.JSONDecodeError as e: bittensor.logging.error(f"Error decoding JSON: {e}") @@ -256,7 +274,15 @@ def from_string(cls, s: str) -> "AxonInfo": @classmethod def from_neuron_info(cls, neuron_info: dict) -> "AxonInfo": - """Converts a dictionary to an axon_info object.""" + """ + Converts a dictionary to an AxonInfo object. + + Args: + neuron_info (dict): A dictionary containing the neuron information. + + Returns: + instance (AxonInfo): An instance of AxonInfo created from the dictionary. + """ return cls( version=neuron_info["axon_info"]["version"], ip=net.int_to_ip(int(neuron_info["axon_info"]["ip"])), @@ -266,33 +292,14 @@ def from_neuron_info(cls, neuron_info: dict) -> "AxonInfo": coldkey=neuron_info["coldkey"], ) - def _to_parameter_dict( - self, return_type: str - ) -> Union[dict[str, Union[int, str]], "torch.nn.ParameterDict"]: - if return_type == "torch": - return torch.nn.ParameterDict(self.__dict__) - else: - return self.__dict__ - def to_parameter_dict( self, ) -> Union[dict[str, Union[int, str]], "torch.nn.ParameterDict"]: - """Returns a torch tensor or dict of the subnet info, depending on the USE_TORCH flag set""" + """Returns a torch tensor or dict of the subnet info, depending on the USE_TORCH flag set.""" if use_torch(): - return self._to_parameter_dict("torch") - else: - return self._to_parameter_dict("numpy") - - @classmethod - def _from_parameter_dict( - cls, - parameter_dict: Union[dict[str, Any], "torch.nn.ParameterDict"], - return_type: str, - ) -> "AxonInfo": - if return_type == "torch": - return cls(**dict(parameter_dict)) + return torch.nn.ParameterDict(self.__dict__) else: - return cls(**parameter_dict) + return self.__dict__ @classmethod def from_parameter_dict( @@ -300,9 +307,9 @@ def from_parameter_dict( ) -> "AxonInfo": """Returns an axon_info object from a torch parameter_dict or a parameter dict.""" if use_torch(): - return cls._from_parameter_dict(parameter_dict, "torch") + return cls(**dict(parameter_dict)) else: - return cls._from_parameter_dict(parameter_dict, "numpy") + return cls(**parameter_dict) class ChainDataType(Enum): @@ -316,18 +323,24 @@ class ChainDataType(Enum): SubnetHyperparameters = 8 -# Constants -RAOPERTAO = 1e9 -U16_MAX = 65535 -U64_MAX = 18446744073709551615 - - def from_scale_encoding( - input: Union[List[int], bytes, ScaleBytes], + input_: Union[List[int], bytes, ScaleBytes], type_name: ChainDataType, is_vec: bool = False, is_option: bool = False, ) -> Optional[Dict]: + """ + Decodes input_ data from SCALE encoding based on the specified type name and modifiers. + + Args: + input_ (Union[List[int], bytes, ScaleBytes]): The input_ data to decode. + type_name (ChainDataType): The type of data being decoded. + is_vec (bool, optional): Whether the data is a vector of the specified type. Default is ``False``. + is_option (bool, optional): Whether the data is an optional value of the specified type. Default is ``False``. + + Returns: + Optional[Dict]: The decoded data as a dictionary, or ``None`` if the decoding fails. + """ type_string = type_name.name if type_name == ChainDataType.DelegatedInfo: # DelegatedInfo is a tuple of (DelegateInfo, Compact) @@ -337,22 +350,22 @@ def from_scale_encoding( if is_vec: type_string = f"Vec<{type_string}>" - return from_scale_encoding_using_type_string(input, type_string) + return from_scale_encoding_using_type_string(input_, type_string) def from_scale_encoding_using_type_string( - input: Union[List[int], bytes, ScaleBytes], type_string: str + input_: Union[List[int], bytes, ScaleBytes], type_string: str ) -> Optional[Dict]: - if isinstance(input, ScaleBytes): - as_scale_bytes = input + if isinstance(input_, ScaleBytes): + as_scale_bytes = input_ else: - if isinstance(input, list) and all([isinstance(i, int) for i in input]): - vec_u8 = input + if isinstance(input_, list) and all([isinstance(i, int) for i in input_]): + vec_u8 = input_ as_bytes = bytes(vec_u8) - elif isinstance(input, bytes): - as_bytes = input + elif isinstance(input_, bytes): + as_bytes = input_ else: - raise TypeError("input must be a List[int], bytes, or ScaleBytes") + raise TypeError("input_ must be a List[int], bytes, or ScaleBytes") as_scale_bytes = ScaleBytes(as_bytes) @@ -368,9 +381,7 @@ def from_scale_encoding_using_type_string( # Dataclasses for chain data. @dataclass class NeuronInfo: - r""" - Dataclass for neuron metadata. - """ + """Dataclass for neuron metadata.""" hotkey: str coldkey: str @@ -399,7 +410,7 @@ class NeuronInfo: @classmethod def fix_decoded_values(cls, neuron_info_decoded: Any) -> "NeuronInfo": - r"""Fixes the values of the NeuronInfo object.""" + """Fixes the values of the NeuronInfo object.""" neuron_info_decoded["hotkey"] = ss58_encode( neuron_info_decoded["hotkey"], bittensor.__ss58_format__ ) @@ -445,26 +456,23 @@ def fix_decoded_values(cls, neuron_info_decoded: Any) -> "NeuronInfo": neuron_info_decoded["axon_info"] = AxonInfo.from_neuron_info( neuron_info_decoded ) - return cls(**neuron_info_decoded) @classmethod def from_vec_u8(cls, vec_u8: List[int]) -> "NeuronInfo": - r"""Returns a NeuronInfo object from a ``vec_u8``.""" + """Returns a NeuronInfo object from a ``vec_u8``.""" if len(vec_u8) == 0: - return NeuronInfo._null_neuron() + return NeuronInfo.get_null_neuron() decoded = from_scale_encoding(vec_u8, ChainDataType.NeuronInfo) if decoded is None: - return NeuronInfo._null_neuron() - - decoded = NeuronInfo.fix_decoded_values(decoded) + return NeuronInfo.get_null_neuron() - return decoded + return NeuronInfo.fix_decoded_values(decoded) @classmethod def list_from_vec_u8(cls, vec_u8: List[int]) -> List["NeuronInfo"]: - r"""Returns a list of NeuronInfo objects from a ``vec_u8``.""" + """Returns a list of NeuronInfo objects from a ``vec_u8``""" decoded_list = from_scale_encoding( vec_u8, ChainDataType.NeuronInfo, is_vec=True @@ -478,7 +486,7 @@ def list_from_vec_u8(cls, vec_u8: List[int]) -> List["NeuronInfo"]: return decoded_list @staticmethod - def _null_neuron() -> "NeuronInfo": + def get_null_neuron() -> "NeuronInfo": neuron = NeuronInfo( uid=0, netuid=0, @@ -519,34 +527,10 @@ def from_weights_bonds_and_neuron_lite( return cls(**n_dict) - @staticmethod - def _neuron_dict_to_namespace(neuron_dict) -> "NeuronInfo": - # TODO: Legacy: remove? - if neuron_dict["hotkey"] == "5C4hrfjw9DjXZTzV3MwzrrAr9P1MJhSrvWGWqi1eSuyUpnhM": - return NeuronInfo._null_neuron() - else: - neuron = NeuronInfo(**neuron_dict) - neuron.stake_dict = { - hk: Balance.from_rao(stake) for hk, stake in neuron.stake.items() - } - neuron.stake = Balance.from_rao(neuron.total_stake) - neuron.total_stake = neuron.stake - neuron.rank = neuron.rank / U16_MAX - neuron.trust = neuron.trust / U16_MAX - neuron.consensus = neuron.consensus / U16_MAX - neuron.validator_trust = neuron.validator_trust / U16_MAX - neuron.incentive = neuron.incentive / U16_MAX - neuron.dividends = neuron.dividends / U16_MAX - neuron.emission = neuron.emission / RAOPERTAO - - return neuron - @dataclass class NeuronInfoLite: - r""" - Dataclass for neuron metadata, but without the weights and bonds. - """ + """Dataclass for neuron metadata, but without the weights and bonds.""" hotkey: str coldkey: str @@ -566,16 +550,14 @@ class NeuronInfoLite: dividends: float last_update: int validator_permit: bool - # weights: List[List[int]] - # bonds: List[List[int]] No weights or bonds in lite version - prometheus_info: "PrometheusInfo" + prometheus_info: Optional["PrometheusInfo"] axon_info: "axon_info" pruning_score: int is_null: bool = False @classmethod def fix_decoded_values(cls, neuron_info_decoded: Any) -> "NeuronInfoLite": - r"""Fixes the values of the NeuronInfoLite object.""" + """Fixes the values of the NeuronInfoLite object.""" neuron_info_decoded["hotkey"] = ss58_encode( neuron_info_decoded["hotkey"], bittensor.__ss58_format__ ) @@ -591,9 +573,6 @@ def fix_decoded_values(cls, neuron_info_decoded: Any) -> "NeuronInfoLite": neuron_info_decoded["stake_dict"] = stake_dict neuron_info_decoded["stake"] = sum(stake_dict.values()) neuron_info_decoded["total_stake"] = neuron_info_decoded["stake"] - # Don't need weights and bonds in lite version - # neuron_info_decoded['weights'] = [[int(weight[0]), int(weight[1])] for weight in neuron_info_decoded['weights']] - # neuron_info_decoded['bonds'] = [[int(bond[0]), int(bond[1])] for bond in neuron_info_decoded['bonds']] neuron_info_decoded["rank"] = U16_NORMALIZED_FLOAT(neuron_info_decoded["rank"]) neuron_info_decoded["emission"] = neuron_info_decoded["emission"] / RAOPERTAO neuron_info_decoded["incentive"] = U16_NORMALIZED_FLOAT( @@ -621,21 +600,19 @@ def fix_decoded_values(cls, neuron_info_decoded: Any) -> "NeuronInfoLite": @classmethod def from_vec_u8(cls, vec_u8: List[int]) -> "NeuronInfoLite": - r"""Returns a NeuronInfoLite object from a ``vec_u8``.""" + """Returns a NeuronInfoLite object from a ``vec_u8``.""" if len(vec_u8) == 0: - return NeuronInfoLite._null_neuron() + return NeuronInfoLite.get_null_neuron() decoded = from_scale_encoding(vec_u8, ChainDataType.NeuronInfoLite) if decoded is None: - return NeuronInfoLite._null_neuron() + return NeuronInfoLite.get_null_neuron() - decoded = NeuronInfoLite.fix_decoded_values(decoded) - - return decoded + return NeuronInfoLite.fix_decoded_values(decoded) @classmethod def list_from_vec_u8(cls, vec_u8: List[int]) -> List["NeuronInfoLite"]: - r"""Returns a list of NeuronInfoLite objects from a ``vec_u8``.""" + """Returns a list of NeuronInfoLite objects from a ``vec_u8``.""" decoded_list = from_scale_encoding( vec_u8, ChainDataType.NeuronInfoLite, is_vec=True @@ -649,7 +626,7 @@ def list_from_vec_u8(cls, vec_u8: List[int]) -> List["NeuronInfoLite"]: return decoded_list @staticmethod - def _null_neuron() -> "NeuronInfoLite": + def get_null_neuron() -> "NeuronInfoLite": neuron = NeuronInfoLite( uid=0, netuid=0, @@ -666,8 +643,6 @@ def _null_neuron() -> "NeuronInfoLite": dividends=0, last_update=0, validator_permit=False, - # weights = [], // No weights or bonds in lite version - # bonds = [], prometheus_info=None, axon_info=None, is_null=True, @@ -677,34 +652,10 @@ def _null_neuron() -> "NeuronInfoLite": ) return neuron - @staticmethod - def _neuron_dict_to_namespace(neuron_dict) -> "NeuronInfoLite": - # TODO: Legacy: remove? - if neuron_dict["hotkey"] == "5C4hrfjw9DjXZTzV3MwzrrAr9P1MJhSrvWGWqi1eSuyUpnhM": - return NeuronInfoLite._null_neuron() - else: - neuron = NeuronInfoLite(**neuron_dict) - neuron.stake = Balance.from_rao(neuron.total_stake) - neuron.stake_dict = { - hk: Balance.from_rao(stake) for hk, stake in neuron.stake.items() - } - neuron.total_stake = neuron.stake - neuron.rank = neuron.rank / U16_MAX - neuron.trust = neuron.trust / U16_MAX - neuron.consensus = neuron.consensus / U16_MAX - neuron.validator_trust = neuron.validator_trust / U16_MAX - neuron.incentive = neuron.incentive / U16_MAX - neuron.dividends = neuron.dividends / U16_MAX - neuron.emission = neuron.emission / RAOPERTAO - - return neuron - @dataclass class PrometheusInfo: - r""" - Dataclass for prometheus info. - """ + """Dataclass for prometheus info.""" block: int version: int @@ -714,7 +665,7 @@ class PrometheusInfo: @classmethod def fix_decoded_values(cls, prometheus_info_decoded: Dict) -> "PrometheusInfo": - r"""Returns a PrometheusInfo object from a prometheus_info_decoded dictionary.""" + """Returns a PrometheusInfo object from a prometheus_info_decoded dictionary.""" prometheus_info_decoded["ip"] = net.int_to_ip( int(prometheus_info_decoded["ip"]) ) @@ -736,7 +687,6 @@ class DelegateInfoLite: validator_permits (list[int]): List of subnets that the delegate is allowed to validate on. return_per_1000 (int): Return per 1000 TAO, for the delegate over a day. total_daily_return (int): Total daily return of the delegate. - """ delegate_ss58: str # Hotkey of delegate @@ -753,7 +703,7 @@ class DelegateInfoLite: @dataclass class DelegateInfo: - r""" + """ Dataclass for delegate information. For a lighter version of this class, see :func:`DelegateInfoLite`. Args: @@ -785,7 +735,7 @@ class DelegateInfo: @classmethod def fix_decoded_values(cls, decoded: Any) -> "DelegateInfo": - r"""Fixes the decoded values.""" + """Fixes the decoded values.""" return cls( hotkey_ss58=ss58_encode( @@ -811,57 +761,47 @@ def fix_decoded_values(cls, decoded: Any) -> "DelegateInfo": @classmethod def from_vec_u8(cls, vec_u8: List[int]) -> Optional["DelegateInfo"]: - r"""Returns a DelegateInfo object from a ``vec_u8``.""" + """Returns a DelegateInfo object from a ``vec_u8``.""" if len(vec_u8) == 0: return None decoded = from_scale_encoding(vec_u8, ChainDataType.DelegateInfo) - if decoded is None: return None - decoded = DelegateInfo.fix_decoded_values(decoded) - - return decoded + return DelegateInfo.fix_decoded_values(decoded) @classmethod def list_from_vec_u8(cls, vec_u8: List[int]) -> List["DelegateInfo"]: - r"""Returns a list of DelegateInfo objects from a ``vec_u8``.""" + """Returns a list of DelegateInfo objects from a ``vec_u8``.""" decoded = from_scale_encoding(vec_u8, ChainDataType.DelegateInfo, is_vec=True) if decoded is None: return [] - decoded = [DelegateInfo.fix_decoded_values(d) for d in decoded] - - return decoded + return [DelegateInfo.fix_decoded_values(d) for d in decoded] @classmethod def delegated_list_from_vec_u8( cls, vec_u8: List[int] ) -> List[Tuple["DelegateInfo", Balance]]: - r"""Returns a list of Tuples of DelegateInfo objects, and Balance, from a ``vec_u8``. + """Returns a list of Tuples of DelegateInfo objects, and Balance, from a ``vec_u8``. This is the list of delegates that the user has delegated to, and the amount of stake delegated. """ decoded = from_scale_encoding(vec_u8, ChainDataType.DelegatedInfo, is_vec=True) - if decoded is None: return [] - decoded = [ + return [ (DelegateInfo.fix_decoded_values(d), Balance.from_rao(s)) for d, s in decoded ] - return decoded - @dataclass class StakeInfo: - r""" - Dataclass for stake info. - """ + """Dataclass for stake info.""" hotkey_ss58: str # Hotkey address coldkey_ss58: str # Coldkey address @@ -869,8 +809,7 @@ class StakeInfo: @classmethod def fix_decoded_values(cls, decoded: Any) -> "StakeInfo": - r"""Fixes the decoded values.""" - + """Fixes the decoded values.""" return cls( hotkey_ss58=ss58_encode(decoded["hotkey"], bittensor.__ss58_format__), coldkey_ss58=ss58_encode(decoded["coldkey"], bittensor.__ss58_format__), @@ -879,60 +818,50 @@ def fix_decoded_values(cls, decoded: Any) -> "StakeInfo": @classmethod def from_vec_u8(cls, vec_u8: List[int]) -> Optional["StakeInfo"]: - r"""Returns a StakeInfo object from a ``vec_u8``.""" + """Returns a StakeInfo object from a ``vec_u8``.""" if len(vec_u8) == 0: return None decoded = from_scale_encoding(vec_u8, ChainDataType.StakeInfo) - if decoded is None: return None - decoded = StakeInfo.fix_decoded_values(decoded) - - return decoded + return StakeInfo.fix_decoded_values(decoded) @classmethod def list_of_tuple_from_vec_u8( cls, vec_u8: List[int] ) -> Dict[str, List["StakeInfo"]]: - r"""Returns a list of StakeInfo objects from a ``vec_u8``.""" + """Returns a list of StakeInfo objects from a ``vec_u8``.""" decoded: Optional[ list[tuple[str, list[object]]] ] = from_scale_encoding_using_type_string( - input=vec_u8, type_string="Vec<(AccountId, Vec)>" + input_=vec_u8, type_string="Vec<(AccountId, Vec)>" ) if decoded is None: return {} - stake_map = { + return { ss58_encode(address=account_id, ss58_format=bittensor.__ss58_format__): [ StakeInfo.fix_decoded_values(d) for d in stake_info ] for account_id, stake_info in decoded } - return stake_map - @classmethod def list_from_vec_u8(cls, vec_u8: List[int]) -> List["StakeInfo"]: - r"""Returns a list of StakeInfo objects from a ``vec_u8``.""" + """Returns a list of StakeInfo objects from a ``vec_u8``.""" decoded = from_scale_encoding(vec_u8, ChainDataType.StakeInfo, is_vec=True) - if decoded is None: return [] - decoded = [StakeInfo.fix_decoded_values(d) for d in decoded] - - return decoded + return [StakeInfo.fix_decoded_values(d) for d in decoded] @dataclass class SubnetInfo: - r""" - Dataclass for subnet info. - """ + """Dataclass for subnet info.""" netuid: int rho: int @@ -953,16 +882,14 @@ class SubnetInfo: emission_value: float burn: Balance owner_ss58: str - # adjustment_alpha: int @classmethod def from_vec_u8(cls, vec_u8: List[int]) -> Optional["SubnetInfo"]: - r"""Returns a SubnetInfo object from a ``vec_u8``.""" + """Returns a SubnetInfo object from a ``vec_u8``.""" if len(vec_u8) == 0: return None decoded = from_scale_encoding(vec_u8, ChainDataType.SubnetInfo) - if decoded is None: return None @@ -978,13 +905,11 @@ def list_from_vec_u8(cls, vec_u8: List[int]) -> List["SubnetInfo"]: if decoded is None: return [] - decoded = [SubnetInfo.fix_decoded_values(d) for d in decoded] - - return decoded + return [SubnetInfo.fix_decoded_values(d) for d in decoded] @classmethod def fix_decoded_values(cls, decoded: Dict) -> "SubnetInfo": - r"""Returns a SubnetInfo object from a decoded SubnetInfo dictionary.""" + """Returns a SubnetInfo object from a decoded SubnetInfo dictionary.""" return SubnetInfo( netuid=decoded["netuid"], rho=decoded["rho"], @@ -1011,48 +936,26 @@ def fix_decoded_values(cls, decoded: Dict) -> "SubnetInfo": owner_ss58=ss58_encode(decoded["owner"], bittensor.__ss58_format__), ) - def _to_parameter_dict( - self, return_type: str - ) -> Union[dict[str, Any], "torch.nn.ParameterDict"]: - if return_type == "torch": - return torch.nn.ParameterDict(self.__dict__) - else: - return self.__dict__ - def to_parameter_dict(self) -> Union[dict[str, Any], "torch.nn.ParameterDict"]: """Returns a torch tensor or dict of the subnet info.""" if use_torch(): - return self._to_parameter_dict("torch") + return torch.nn.ParameterDict(self.__dict__) else: - return self._to_parameter_dict("numpy") - - @classmethod - def _from_parameter_dict_torch( - cls, parameter_dict: "torch.nn.ParameterDict" - ) -> "SubnetInfo": - """Returns a SubnetInfo object from a torch parameter_dict.""" - return cls(**dict(parameter_dict)) - - @classmethod - def _from_parameter_dict_numpy(cls, parameter_dict: dict[str, Any]) -> "SubnetInfo": - r"""Returns a SubnetInfo object from a parameter_dict.""" - return cls(**parameter_dict) + return self.__dict__ @classmethod def from_parameter_dict( cls, parameter_dict: Union[dict[str, Any], "torch.nn.ParameterDict"] ) -> "SubnetInfo": if use_torch(): - return cls._from_parameter_dict_torch(parameter_dict) + return cls(**dict(parameter_dict)) else: - return cls._from_parameter_dict_numpy(parameter_dict) + return cls(**parameter_dict) @dataclass class SubnetHyperparameters: - r""" - Dataclass for subnet hyperparameters. - """ + """Dataclass for subnet hyperparameters.""" rho: int kappa: int @@ -1081,12 +984,11 @@ class SubnetHyperparameters: @classmethod def from_vec_u8(cls, vec_u8: List[int]) -> Optional["SubnetHyperparameters"]: - r"""Returns a SubnetHyperparameters object from a ``vec_u8``.""" + """Returns a SubnetHyperparameters object from a ``vec_u8``.""" if len(vec_u8) == 0: return None decoded = from_scale_encoding(vec_u8, ChainDataType.SubnetHyperparameters) - if decoded is None: return None @@ -1094,21 +996,18 @@ def from_vec_u8(cls, vec_u8: List[int]) -> Optional["SubnetHyperparameters"]: @classmethod def list_from_vec_u8(cls, vec_u8: List[int]) -> List["SubnetHyperparameters"]: - r"""Returns a list of SubnetHyperparameters objects from a ``vec_u8``.""" + """Returns a list of SubnetHyperparameters objects from a ``vec_u8``.""" decoded = from_scale_encoding( vec_u8, ChainDataType.SubnetHyperparameters, is_vec=True, is_option=True ) - if decoded is None: return [] - decoded = [SubnetHyperparameters.fix_decoded_values(d) for d in decoded] - - return decoded + return [SubnetHyperparameters.fix_decoded_values(d) for d in decoded] @classmethod def fix_decoded_values(cls, decoded: Dict) -> "SubnetHyperparameters": - r"""Returns a SubnetInfo object from a decoded SubnetInfo dictionary.""" + """Returns a SubnetInfo object from a decoded SubnetInfo dictionary.""" return SubnetHyperparameters( rho=decoded["rho"], kappa=decoded["kappa"], @@ -1136,59 +1035,35 @@ def fix_decoded_values(cls, decoded: Dict) -> "SubnetHyperparameters": commit_reveal_weights_enabled=decoded["commit_reveal_weights_enabled"], ) - def _to_parameter_dict_torch( - self, return_type: str - ) -> Union[dict[str, Union[int, float, bool]], "torch.nn.ParameterDict"]: - if return_type == "torch": - return torch.nn.ParameterDict(self.__dict__) - else: - return self.__dict__ - def to_parameter_dict( self, ) -> Union[dict[str, Union[int, float, bool]], "torch.nn.ParameterDict"]: """Returns a torch tensor or dict of the subnet hyperparameters.""" if use_torch(): - return self._to_parameter_dict_torch("torch") + return torch.nn.ParameterDict(self.__dict__) else: - return self._to_parameter_dict_torch("numpy") - - @classmethod - def _from_parameter_dict_torch( - cls, parameter_dict: "torch.nn.ParameterDict" - ) -> "SubnetHyperparameters": - """Returns a SubnetHyperparameters object from a torch parameter_dict.""" - return cls(**dict(parameter_dict)) - - @classmethod - def _from_parameter_dict_numpy( - cls, parameter_dict: dict[str, Any] - ) -> "SubnetHyperparameters": - """Returns a SubnetHyperparameters object from a parameter_dict.""" - return cls(**parameter_dict) + return self.__dict__ @classmethod def from_parameter_dict( cls, parameter_dict: Union[dict[str, Any], "torch.nn.ParameterDict"] ) -> "SubnetHyperparameters": if use_torch(): - return cls._from_parameter_dict_torch(parameter_dict) + return cls(**dict(parameter_dict)) else: - return cls._from_parameter_dict_numpy(parameter_dict) + return cls(**parameter_dict) @dataclass class IPInfo: - r""" - Dataclass for associated IP Info. - """ + """Dataclass for associated IP Info.""" ip: str ip_type: int protocol: int def encode(self) -> Dict[str, Any]: - r"""Returns a dictionary of the IPInfo object that can be encoded.""" + """Returns a dictionary of the IPInfo object that can be encoded.""" return { "ip": net.ip_to_int( self.ip @@ -1198,12 +1073,11 @@ def encode(self) -> Dict[str, Any]: @classmethod def from_vec_u8(cls, vec_u8: List[int]) -> Optional["IPInfo"]: - r"""Returns a IPInfo object from a ``vec_u8``.""" + """Returns a IPInfo object from a ``vec_u8``.""" if len(vec_u8) == 0: return None decoded = from_scale_encoding(vec_u8, ChainDataType.IPInfo) - if decoded is None: return None @@ -1217,62 +1091,37 @@ def list_from_vec_u8(cls, vec_u8: List[int]) -> List["IPInfo"]: if decoded is None: return [] - decoded = [IPInfo.fix_decoded_values(d) for d in decoded] - - return decoded + return [IPInfo.fix_decoded_values(d) for d in decoded] @classmethod def fix_decoded_values(cls, decoded: Dict) -> "IPInfo": - r"""Returns a SubnetInfo object from a decoded IPInfo dictionary.""" + """Returns a SubnetInfo object from a decoded IPInfo dictionary.""" return IPInfo( - ip=bittensor.utils.networking.int_to_ip(decoded["ip"]), + ip=net.int_to_ip(decoded["ip"]), ip_type=decoded["ip_type_and_protocol"] >> 4, protocol=decoded["ip_type_and_protocol"] & 0xF, ) - def _to_parameter_dict( - self, return_type: str - ) -> Union[dict[str, Union[str, int]], "torch.nn.ParameterDict"]: - """Returns a torch tensor of the subnet info.""" - if return_type == "torch": - return torch.nn.ParameterDict(self.__dict__) - else: - return self.__dict__ - def to_parameter_dict( self, ) -> Union[dict[str, Union[str, int]], "torch.nn.ParameterDict"]: """Returns a torch tensor or dict of the subnet IP info.""" if use_torch(): - return self._to_parameter_dict("torch") + return torch.nn.ParameterDict(self.__dict__) else: - return self._to_parameter_dict("numpy") - - @classmethod - def _from_parameter_dict_torch( - cls, parameter_dict: "torch.nn.ParameterDict" - ) -> "IPInfo": - """Returns a IPInfo object from a torch parameter_dict.""" - return cls(**dict(parameter_dict)) - - @classmethod - def _from_parameter_dict_numpy(cls, parameter_dict: dict[str, Any]) -> "IPInfo": - """Returns a IPInfo object from a parameter_dict.""" - return cls(**parameter_dict) + return self.__dict__ @classmethod def from_parameter_dict( cls, parameter_dict: Union[dict[str, Any], "torch.nn.ParameterDict"] ) -> "IPInfo": if use_torch(): - return cls._from_parameter_dict_torch(parameter_dict) + return cls(**dict(parameter_dict)) else: - return cls._from_parameter_dict_numpy(parameter_dict) + return cls(**parameter_dict) # Senate / Proposal data - - class ProposalVoteData(TypedDict): index: int threshold: int diff --git a/bittensor/commands/overview.py b/bittensor/commands/overview.py index 477ad9f01a..b35fd85596 100644 --- a/bittensor/commands/overview.py +++ b/bittensor/commands/overview.py @@ -317,7 +317,7 @@ def _run(cli: "bittensor.cli", subtensor: "bittensor.subtensor"): de_registered_neurons = [] for hotkey_addr, our_stake in de_registered_stake: # Make a neuron info lite for this hotkey and coldkey. - de_registered_neuron = bittensor.NeuronInfoLite._null_neuron() + de_registered_neuron = bittensor.NeuronInfoLite.get_null_neuron() de_registered_neuron.hotkey = hotkey_addr de_registered_neuron.coldkey = ( coldkey_wallet.coldkeypub.ss58_address diff --git a/bittensor/mock/subtensor_mock.py b/bittensor/mock/subtensor_mock.py index b6be74095b..a0efe48d74 100644 --- a/bittensor/mock/subtensor_mock.py +++ b/bittensor/mock/subtensor_mock.py @@ -756,7 +756,7 @@ def neuron_for_uid( self, uid: int, netuid: int, block: Optional[int] = None ) -> Optional[NeuronInfo]: if uid is None: - return NeuronInfo._null_neuron() + return NeuronInfo.get_null_neuron() if block: if self.block_number < block: diff --git a/bittensor/subtensor.py b/bittensor/subtensor.py index ff48a50288..46f999eb97 100644 --- a/bittensor/subtensor.py +++ b/bittensor/subtensor.py @@ -942,7 +942,6 @@ def commit_weights( This function allows neurons to create a tamper-proof record of their weight distribution at a specific point in time, enhancing transparency and accountability within the Bittensor network. """ - uid = self.get_uid_for_hotkey_on_subnet(wallet.hotkey.ss58_address, netuid) retries = 0 success = False message = "No attempt made. Perhaps it is too soon to commit weights!" @@ -1081,7 +1080,6 @@ def reveal_weights( This function allows neurons to reveal their previously committed weight distribution, ensuring transparency and accountability within the Bittensor network. """ - uid = self.get_uid_for_hotkey_on_subnet(wallet.hotkey.ss58_address, netuid) retries = 0 success = False message = "No attempt made. Perhaps it is too soon to reveal weights!" @@ -4742,8 +4740,7 @@ def neuron_for_uid( subnet, offering insights into their roles in the network's consensus and validation mechanisms. """ if uid is None: - # TODO: fix `Access to a protected member _null_neuron of a class` error when chane_data.py refactoring. - return NeuronInfo._null_neuron() + return NeuronInfo.get_null_neuron() @retry(delay=1, tries=3, backoff=2, max_delay=4, logger=_logger) def make_substrate_call_with_retry(): @@ -4758,8 +4755,7 @@ def make_substrate_call_with_retry(): json_body = make_substrate_call_with_retry() if not (result := json_body.get("result", None)): - # TODO: fix `Access to a protected member _null_neuron of a class` error when chane_data.py refactoring. - return NeuronInfo._null_neuron() + return NeuronInfo.get_null_neuron() return NeuronInfo.from_vec_u8(result) @@ -4814,8 +4810,7 @@ def neuron_for_uid_lite( subnet without the need for comprehensive data retrieval. """ if uid is None: - # TODO: fix `Access to a protected member _null_neuron of a class` error when chane_data.py refactoring. - return NeuronInfoLite._null_neuron() + return NeuronInfoLite.get_null_neuron() hex_bytes_result = self.query_runtime_api( runtime_api="NeuronInfoRuntimeApi", @@ -4828,8 +4823,7 @@ def neuron_for_uid_lite( ) if hex_bytes_result is None: - # TODO: fix `Access to a protected member _null_neuron of a class` error when chane_data.py refactoring. - return NeuronInfoLite._null_neuron() + return NeuronInfoLite.get_null_neuron() if hex_bytes_result.startswith("0x"): bytes_result = bytes.fromhex(hex_bytes_result[2:]) diff --git a/tests/integration_tests/test_subtensor_integration.py b/tests/integration_tests/test_subtensor_integration.py index 8849aeb9db..e3661210bc 100644 --- a/tests/integration_tests/test_subtensor_integration.py +++ b/tests/integration_tests/test_subtensor_integration.py @@ -734,7 +734,7 @@ def is_registered_side_effect(*args, **kwargs): ) self.subtensor.get_neuron_for_pubkey_and_subnet = MagicMock( - return_value=bittensor.NeuronInfo._null_neuron() + return_value=bittensor.NeuronInfo.get_null_neuron() ) self.subtensor.is_hotkey_registered = MagicMock( side_effect=is_registered_side_effect @@ -816,7 +816,7 @@ class ExitEarly(Exception): # then should create a new pow and check if it is stale # then should enter substrate and exit early because of test self.subtensor.get_neuron_for_pubkey_and_subnet = MagicMock( - return_value=bittensor.NeuronInfo._null_neuron() + return_value=bittensor.NeuronInfo.get_null_neuron() ) with pytest.raises(ExitEarly): bittensor.subtensor.register(mock_subtensor_self, mock_wallet, netuid=3)