From 0242f984d7e84a611a412bef96e6d02a85218713 Mon Sep 17 00:00:00 2001 From: Martin Date: Tue, 13 Dec 2022 03:13:46 +0300 Subject: [PATCH] Cache field types of the given model --- pydantic_redis/model.py | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/pydantic_redis/model.py b/pydantic_redis/model.py index ff99a3a0..13f950c5 100644 --- a/pydantic_redis/model.py +++ b/pydantic_redis/model.py @@ -19,6 +19,7 @@ class Model(_AbstractModel): _nested_model_tuple_fields = {} _nested_model_list_fields = {} _nested_model_fields = {} + _field_types = {} @classmethod def __get_primary_key(cls, primary_key_value: Any): @@ -37,13 +38,13 @@ def get_table_index_key(cls): @classmethod def initialize(cls): """Initializes class-wide variables for performance's reasons e.g. it caches the nested model fields""" - field_types = typing.get_type_hints(cls) + cls._field_types = typing.get_type_hints(cls) cls._nested_model_list_fields = {} cls._nested_model_tuple_fields = {} cls._nested_model_fields = {} - for field, field_type in field_types.items(): + for field, field_type in cls._field_types.items(): try: # In case the annotation is Optional, an alias of Union[X, None], extract the X is_generic = hasattr(field_type, "__origin__") @@ -199,23 +200,22 @@ def __parse_dict_list(cls, data: List[Dict[bytes, Any]]) -> List[Dict[str, Any]] cls.deserialize_partially(record) for record in data if record != {} ] if len(parsed_data) > 0: - field_types = typing.get_type_hints(cls) keys = [*parsed_data[0].keys()] for k in keys: if k.startswith(NESTED_MODEL_LIST_FIELD_PREFIX): cls.__eager_load_nested_model_lists( - prefixed_field=k, data=parsed_data, field_types=field_types + prefixed_field=k, data=parsed_data ) elif k.startswith(NESTED_MODEL_TUPLE_FIELD_PREFIX): cls.__eager_load_nested_model_tuples( - prefixed_field=k, data=parsed_data, field_types=field_types + prefixed_field=k, data=parsed_data ) elif k.startswith(NESTED_MODEL_PREFIX): cls.__eager_load_nested_models( - prefixed_field=k, data=parsed_data, field_types=field_types + prefixed_field=k, data=parsed_data, field_types=cls._field_types ) return parsed_data @@ -224,7 +224,6 @@ def __eager_load_nested_model_lists( cls, prefixed_field: str, data: List[Dict[str, Any]], - field_types: Dict[str, Any], ): """ Eagerly loads any properties that have `List[Model]` or Optional[List[Model]]` as their type annotations @@ -234,24 +233,17 @@ def __eager_load_nested_model_lists( [{"___books": ["id1", "id2"]}] becomes [{"books": [Book{"id": "id1", ...}, Book{"id": "id2", ...}]}] """ field = strip_leading(prefixed_field, NESTED_MODEL_LIST_FIELD_PREFIX) - field_type = field_types.get(field) - model_type = field_type.__args__[0] - - # in case the field is Optional e.g. books: Optional[List[Model]], an alias for Union[List[Model], None] - is_optional = field_type.__origin__ == Union - if is_optional: - model_type = model_type.__args__[0] + field_type = cls._nested_model_list_fields.get(field) for record in data: ids = record.pop(prefixed_field, None) - record[field] = model_type.select(ids=ids) + record[field] = field_type.select(ids=ids) @classmethod def __eager_load_nested_model_tuples( cls, prefixed_field: str, data: List[Dict[str, Any]], - field_types: Dict[str, Any], ): """ Eagerly loads any properties that have `Tuple[Model]` or `Optional[Tuple[Model]]` as their type annotations @@ -287,7 +279,7 @@ def __eager_load_nested_models( [{"__book": "id1"}] becomes [{"book": Book{"id": "id1", ...}}] """ field = strip_leading(prefixed_field, NESTED_MODEL_PREFIX) - model_type = field_types.get(field) + model_type = cls._nested_model_fields.get(field) ids: List[str] = [record.pop(prefixed_field, None) for record in data] # a bulk network request might be faster than eagerly loading for each record for many records @@ -400,7 +392,7 @@ def __get_select_fields(cls, columns: Optional[List[str]]) -> Optional[List[str] if columns is None: return None - field_types = typing.get_type_hints(cls) + field_types = cls._field_types fields = [] for col in columns: