From cdaa11e0e08f51745cb3c9a65ec4f9e4ff03f317 Mon Sep 17 00:00:00 2001 From: Peter Jung Date: Fri, 13 Dec 2024 13:38:23 +0100 Subject: [PATCH 1/4] Make nft web app nicer --- .../app_nft_treasury_game.py | 86 +++++++++++-------- .../db/long_term_memory_table_handler.py | 2 +- prediction_market_agent/db/sql_handler.py | 2 +- 3 files changed, 52 insertions(+), 38 deletions(-) diff --git a/prediction_market_agent/agents/microchain_agent/nft_treasury_game/app_nft_treasury_game.py b/prediction_market_agent/agents/microchain_agent/nft_treasury_game/app_nft_treasury_game.py index 21296f0b..5bf03757 100644 --- a/prediction_market_agent/agents/microchain_agent/nft_treasury_game/app_nft_treasury_game.py +++ b/prediction_market_agent/agents/microchain_agent/nft_treasury_game/app_nft_treasury_game.py @@ -11,7 +11,8 @@ import streamlit as st from microchain.functions import Reasoning from prediction_market_agent_tooling.tools.balances import get_balances -from prediction_market_agent_tooling.tools.datetime_utc import DatetimeUTC +from prediction_market_agent_tooling.tools.utils import check_not_none +from streamlit_extras.stylable_container import stylable_container from prediction_market_agent.agents.identifiers import AgentIdentifier from prediction_market_agent.agents.microchain_agent.messages_functions import ( @@ -27,6 +28,7 @@ DeployableAgentNFTGameAbstract, ) from prediction_market_agent.db.long_term_memory_table_handler import ( + LongTermMemories, LongTermMemoryTableHandler, ) from prediction_market_agent.db.prompt_table_handler import PromptTableHandler @@ -63,8 +65,8 @@ def send_message_part(nft_agent: type[DeployableAgentNFTGameAbstract]) -> None: def parse_function_and_body( - role: t.Literal["user", "assistant", "system"], message: str -) -> t.Tuple[str | None, str | None]: + role: t.Literal["user", "assistant"], message: str +) -> t.Tuple[str, str]: message = message.strip() if role == "assistant": @@ -75,10 +77,6 @@ def parse_function_and_body( # Responses from the individual functions are stored under `user` role. parsed_function = DummyFunctionName.RESPONSE_FUNCTION_NAME parsed_body = message - elif role == "system": - # System message isn't shown in the chat history, so ignore. - parsed_function = None - parsed_body = None else: raise ValueError(f"Unknown role: {role}") @@ -86,26 +84,23 @@ def parse_function_and_body( def customized_chat_message( - role: t.Literal["user", "assistant", "system"], - message: str, - created_at: DatetimeUTC, + function_call: LongTermMemories, + function_output: LongTermMemories, ) -> None: - parsed_function, parsed_body = parse_function_and_body(role, message) - if parsed_function is None: - return - # If the message is output from one of these functions, skip it, because it's not interesting to read `The reasoning has been recorded` and similar over and over again. - if parsed_body in ( - Reasoning()(""), - BroadcastPublicMessageToHumans.OUTPUT_TEXT, - SendPaidMessageToAnotherAgent.OUTPUT_TEXT, - ): - return + created_at = function_output.datetime_ + + parsed_function_call_name, parsed_function_call_body = parse_function_and_body( + check_not_none(function_call.metadata_dict)["role"], + check_not_none(function_call.metadata_dict)["content"], + ) + parsed_function_output_name, parsed_function_output_body = parse_function_and_body( + check_not_none(function_output.metadata_dict)["role"], + check_not_none(function_output.metadata_dict)["content"], + ) - match parsed_function: + match parsed_function_call_name: case Reasoning.__name__: icon = "🧠" - case DummyFunctionName.RESPONSE_FUNCTION_NAME: - icon = "✔️" case ReceiveMessage.__name__: icon = "👤" case BroadcastPublicMessageToHumans.__name__: @@ -116,11 +111,24 @@ def customized_chat_message( icon = "🤖" with st.chat_message(icon): - if parsed_function: - st.markdown(f"**{parsed_function}**") - st.write(created_at.strftime("%Y-%m-%d %H:%M:%S")) - if message: - st.markdown(parsed_body) + if parsed_function_call_name == Reasoning.__name__: + # Don't show reasoning as function call, to make it a bit nicer. + st.markdown( + parsed_function_call_body.replace("reasoning='", "").replace("')", "") + ) + else: + # Otherwise, show it as a normal function-response call, e.g. `ReceiveMessages() -> ...`. + st.markdown( + f"**{parsed_function_call_name}**({parsed_function_call_body}) *{created_at.strftime('%Y-%m-%d %H:%M:%S')}*" + ) + + # Only show the output if it's supposed to be interesting. + if parsed_function_call_name not in ( + Reasoning.__name__, + BroadcastPublicMessageToHumans.__name__, + SendPaidMessageToAnotherAgent.__name__, + ): + st.markdown(parsed_function_output_body) @st.fragment(run_every=timedelta(seconds=5)) @@ -134,14 +142,20 @@ def show_function_calls_part(nft_agent: type[DeployableAgentNFTGameAbstract]) -> st.markdown("No actions yet.") return - for item in calls: - if item.metadata_dict is None: - continue - customized_chat_message( - item.metadata_dict["role"], - item.metadata_dict["content"], - item.datetime_, - ) + # Filter out system calls, because they aren't supposed to be shown in the chat history itself. + calls = [ + call for call in calls if check_not_none(call.metadata_dict)["role"] != "system" + ] + + # Microchain works on `function call` - `funtion response` pairs, so we will process them together. + for index, (function_output, function_call) in enumerate( + zip(calls[::2], calls[1::2]) + ): + with stylable_container( + key=f"function_call_{index}", + css_styles=f"{{background-color: {'#f0f0f0' if (index % 2 == 0) else 'white'}; border-radius: 5px;}}", + ): + customized_chat_message(function_call, function_output) @st.fragment(run_every=timedelta(seconds=5)) diff --git a/prediction_market_agent/db/long_term_memory_table_handler.py b/prediction_market_agent/db/long_term_memory_table_handler.py index d9a7c3a7..463f2054 100644 --- a/prediction_market_agent/db/long_term_memory_table_handler.py +++ b/prediction_market_agent/db/long_term_memory_table_handler.py @@ -49,7 +49,7 @@ def search( from_: DatetimeUTC | None = None, to_: DatetimeUTC | None = None, limit: int | None = None, - ) -> t.Sequence[LongTermMemories]: + ) -> list[LongTermMemories]: """Searches the LongTermMemoryTableHandler for entries within a specified datetime range that match self.task_description.""" query_filters = [ diff --git a/prediction_market_agent/db/sql_handler.py b/prediction_market_agent/db/sql_handler.py index 35aee573..48094d14 100644 --- a/prediction_market_agent/db/sql_handler.py +++ b/prediction_market_agent/db/sql_handler.py @@ -35,7 +35,7 @@ def get_with_filter_and_order( order_by_column_name: str | None = None, order_desc: bool = True, limit: int | None = None, - ) -> t.Sequence[SQLModelType]: + ) -> list[SQLModelType]: with self.db_manager.get_session() as session: query = session.query(self.table) for exp in query_filters: From 9199a1283f0c1a1e9a844528d4354e791288e0fb Mon Sep 17 00:00:00 2001 From: Peter Jung Date: Fri, 13 Dec 2024 14:25:01 +0100 Subject: [PATCH 2/4] Add popup with unprocessed messages --- .../app_nft_treasury_game.py | 27 +++++++++++ .../db/blockchain_transaction_fetcher.py | 47 ++++++++++--------- 2 files changed, 53 insertions(+), 21 deletions(-) diff --git a/prediction_market_agent/agents/microchain_agent/nft_treasury_game/app_nft_treasury_game.py b/prediction_market_agent/agents/microchain_agent/nft_treasury_game/app_nft_treasury_game.py index 5bf03757..f086bf10 100644 --- a/prediction_market_agent/agents/microchain_agent/nft_treasury_game/app_nft_treasury_game.py +++ b/prediction_market_agent/agents/microchain_agent/nft_treasury_game/app_nft_treasury_game.py @@ -12,6 +12,7 @@ from microchain.functions import Reasoning from prediction_market_agent_tooling.tools.balances import get_balances from prediction_market_agent_tooling.tools.utils import check_not_none +from prediction_market_agent_tooling.tools.web3_utils import wei_to_xdai from streamlit_extras.stylable_container import stylable_container from prediction_market_agent.agents.identifiers import AgentIdentifier @@ -27,6 +28,9 @@ DEPLOYED_NFT_AGENTS, DeployableAgentNFTGameAbstract, ) +from prediction_market_agent.db.blockchain_transaction_fetcher import ( + BlockchainTransactionFetcher, +) from prediction_market_agent.db.long_term_memory_table_handler import ( LongTermMemories, LongTermMemoryTableHandler, @@ -43,6 +47,11 @@ class DummyFunctionName(str, Enum): RESPONSE_FUNCTION_NAME = "Response" +@st.cache_resource +def blockchain_transaction_fetcher() -> BlockchainTransactionFetcher: + return BlockchainTransactionFetcher() + + @st.cache_resource def long_term_memory_table_handler( identifier: AgentIdentifier, @@ -185,6 +194,24 @@ def show_about_agent_part(nft_agent: type[DeployableAgentNFTGameAbstract]) -> No value=system_prompt, disabled=True, ) + st.markdown("---") + with st.popover("Show unprocessed incoming messages"): + transactions = blockchain_transaction_fetcher().fetch_unseen_transactions( + nft_agent.wallet_address + ) + + if not transactions: + st.info("No unprocessed messages") + else: + for transaction in transactions: + st.markdown( + f""" + **From:** {transaction.sender_address} + **Message:** {transaction.data_field} + **Value:** {wei_to_xdai(transaction.value_wei_parsed)} xDai + """ + ) + st.divider() @st.fragment(run_every=timedelta(seconds=5)) diff --git a/prediction_market_agent/db/blockchain_transaction_fetcher.py b/prediction_market_agent/db/blockchain_transaction_fetcher.py index c1988d12..d2977b24 100644 --- a/prediction_market_agent/db/blockchain_transaction_fetcher.py +++ b/prediction_market_agent/db/blockchain_transaction_fetcher.py @@ -1,3 +1,5 @@ +from typing import Any + import polars as pl import spice from eth_typing import ChecksumAddress @@ -27,9 +29,21 @@ def unzip_message_else_do_nothing(self, data_field: str) -> str: except: return data_field - def fetch_unseen_transactions_df( + def blockchain_message_from_dune_df_row( + self, consumer_address: ChecksumAddress, x: dict[str, Any] + ) -> BlockchainMessage: + return BlockchainMessage( + consumer_address=consumer_address, + transaction_hash=x["hash"], + value_wei=str(x["value"]), + block=str(x["block_number"]), + sender_address=x["from"], + data_field=self.unzip_message_else_do_nothing(x["data"]), + ) + + def fetch_unseen_transactions( self, consumer_address: ChecksumAddress - ) -> pl.DataFrame: + ) -> list[BlockchainMessage]: keys = APIKeys() latest_blockchain_message = ( self.blockchain_table_handler.fetch_latest_blockchain_message( @@ -50,13 +64,16 @@ def fetch_unseen_transactions_df( ) # Filter out existing hashes - hashes are by default lowercase df = df.filter(~pl.col("hash").is_in([i.hex() for i in existing_hashes])) - return df + return [ + self.blockchain_message_from_dune_df_row(consumer_address, x) + for x in df.iter_rows(named=True) + ] def fetch_count_unprocessed_transactions( self, consumer_address: ChecksumAddress ) -> int: - df = self.fetch_unseen_transactions_df(consumer_address=consumer_address) - return len(df) + transactions = self.fetch_unseen_transactions(consumer_address=consumer_address) + return len(transactions) def fetch_one_unprocessed_blockchain_message_and_store_as_processed( self, consumer_address: ChecksumAddress @@ -65,25 +82,13 @@ def fetch_one_unprocessed_blockchain_message_and_store_as_processed( Method for fetching oldest unprocessed transaction sent to the consumer address. After being fetched, it is stored in the DB as processed. """ - df = self.fetch_unseen_transactions_df(consumer_address=consumer_address) - if df.is_empty(): + transactions = self.fetch_unseen_transactions(consumer_address=consumer_address) + if not transactions: return None # We only want the oldest non-processed message. - oldest_non_processed_message = df.row(0, named=True) - blockchain_message = BlockchainMessage( - consumer_address=consumer_address, - transaction_hash=oldest_non_processed_message["hash"], - value_wei=str(oldest_non_processed_message["value"]), - block=str(oldest_non_processed_message["block_number"]), - sender_address=oldest_non_processed_message["from"], - data_field=self.unzip_message_else_do_nothing( - oldest_non_processed_message["data"] - ), - ) + blockchain_message = transactions[0] - # Store here to avoid having to refresh after session was closed. - item = blockchain_message.model_copy(deep=True) # mark unseen transaction as processed in DB self.blockchain_table_handler.save_multiple([blockchain_message]) - return item + return blockchain_message From a86ba31d4cd6596e0c273a0cbac17d1f5506a7bc Mon Sep 17 00:00:00 2001 From: Peter Jung Date: Fri, 13 Dec 2024 15:34:18 +0100 Subject: [PATCH 3/4] forgoten deep copy --- prediction_market_agent/db/blockchain_transaction_fetcher.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/prediction_market_agent/db/blockchain_transaction_fetcher.py b/prediction_market_agent/db/blockchain_transaction_fetcher.py index d2977b24..9e718a56 100644 --- a/prediction_market_agent/db/blockchain_transaction_fetcher.py +++ b/prediction_market_agent/db/blockchain_transaction_fetcher.py @@ -65,7 +65,9 @@ def fetch_unseen_transactions( # Filter out existing hashes - hashes are by default lowercase df = df.filter(~pl.col("hash").is_in([i.hex() for i in existing_hashes])) return [ - self.blockchain_message_from_dune_df_row(consumer_address, x) + self.blockchain_message_from_dune_df_row(consumer_address, x).model_copy( + deep=True # To prevent `is not bound to a Session` error. + ) for x in df.iter_rows(named=True) ] From 642c48a42d61d5d22b2e734d1fc5edeefbf57ee6 Mon Sep 17 00:00:00 2001 From: Peter Jung Date: Mon, 16 Dec 2024 09:52:25 +0100 Subject: [PATCH 4/4] workaround --- .../db/blockchain_message_table_handler.py | 5 ++++- prediction_market_agent/db/blockchain_transaction_fetcher.py | 4 +--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/prediction_market_agent/db/blockchain_message_table_handler.py b/prediction_market_agent/db/blockchain_message_table_handler.py index 22950dc9..97080ae9 100644 --- a/prediction_market_agent/db/blockchain_message_table_handler.py +++ b/prediction_market_agent/db/blockchain_message_table_handler.py @@ -48,4 +48,7 @@ def fetch_all_transaction_hashes( return list(set(tx_hashes)) def save_multiple(self, items: t.Sequence[BlockchainMessage]) -> None: - return self.sql_handler.save_multiple(items) + return self.sql_handler.save_multiple( + # Re-create the items to avoid SQLModel errors. This is a workaround. It's weird, but it works. :shrug: + [BlockchainMessage(**i.model_dump()) for i in items] + ) diff --git a/prediction_market_agent/db/blockchain_transaction_fetcher.py b/prediction_market_agent/db/blockchain_transaction_fetcher.py index 9e718a56..d2977b24 100644 --- a/prediction_market_agent/db/blockchain_transaction_fetcher.py +++ b/prediction_market_agent/db/blockchain_transaction_fetcher.py @@ -65,9 +65,7 @@ def fetch_unseen_transactions( # Filter out existing hashes - hashes are by default lowercase df = df.filter(~pl.col("hash").is_in([i.hex() for i in existing_hashes])) return [ - self.blockchain_message_from_dune_df_row(consumer_address, x).model_copy( - deep=True # To prevent `is not bound to a Session` error. - ) + self.blockchain_message_from_dune_df_row(consumer_address, x) for x in df.iter_rows(named=True) ]