-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add popup with unprocessed messages #603
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ | |
from microchain.functions import Reasoning, Stop | ||
from prediction_market_agent_tooling.tools.balances import get_balances | ||
from prediction_market_agent_tooling.tools.utils import check_not_none | ||
from prediction_market_agent_tooling.tools.web3_utils import wei_to_xdai | ||
from streamlit_extras.stylable_container import stylable_container | ||
|
||
from prediction_market_agent.agents.identifiers import AgentIdentifier | ||
|
@@ -27,6 +28,9 @@ | |
DEPLOYED_NFT_AGENTS, | ||
DeployableAgentNFTGameAbstract, | ||
) | ||
from prediction_market_agent.db.blockchain_transaction_fetcher import ( | ||
BlockchainTransactionFetcher, | ||
) | ||
from prediction_market_agent.db.long_term_memory_table_handler import ( | ||
LongTermMemories, | ||
LongTermMemoryTableHandler, | ||
|
@@ -43,6 +47,11 @@ class DummyFunctionName(str, Enum): | |
RESPONSE_FUNCTION_NAME = "Response" | ||
|
||
|
||
@st.cache_resource | ||
def blockchain_transaction_fetcher() -> BlockchainTransactionFetcher: | ||
return BlockchainTransactionFetcher() | ||
|
||
|
||
@st.cache_resource | ||
def long_term_memory_table_handler( | ||
identifier: AgentIdentifier, | ||
|
@@ -191,6 +200,24 @@ def show_about_agent_part(nft_agent: type[DeployableAgentNFTGameAbstract]) -> No | |
value=system_prompt, | ||
disabled=True, | ||
) | ||
st.markdown("---") | ||
with st.popover("Show unprocessed incoming messages"): | ||
transactions = blockchain_transaction_fetcher().fetch_unseen_transactions( | ||
nft_agent.wallet_address | ||
) | ||
|
||
if not transactions: | ||
st.info("No unprocessed messages") | ||
else: | ||
for transaction in transactions: | ||
st.markdown( | ||
f""" | ||
**From:** {transaction.sender_address} | ||
**Message:** {transaction.data_field} | ||
**Value:** {wei_to_xdai(transaction.value_wei_parsed)} xDai | ||
""" | ||
) | ||
Comment on lines
+213
to
+219
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure proper sanitization when displaying user-generated content Displaying Verify that import html
st.markdown(
f"""
**From:** {transaction.sender_address}
**Message:** {html.escape(transaction.data_field)}
**Value:** {wei_to_xdai(transaction.value_wei_parsed)} xDai
"""
) This ensures that any HTML or special characters in |
||
st.divider() | ||
|
||
|
||
@st.fragment(run_every=timedelta(seconds=5)) | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,3 +1,5 @@ | ||||||||||||||||||||||||||||||||||||||||||||||
from typing import Any | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
import polars as pl | ||||||||||||||||||||||||||||||||||||||||||||||
import spice | ||||||||||||||||||||||||||||||||||||||||||||||
from eth_typing import ChecksumAddress | ||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -27,9 +29,21 @@ def unzip_message_else_do_nothing(self, data_field: str) -> str: | |||||||||||||||||||||||||||||||||||||||||||||
except: | ||||||||||||||||||||||||||||||||||||||||||||||
return data_field | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
def fetch_unseen_transactions_df( | ||||||||||||||||||||||||||||||||||||||||||||||
def blockchain_message_from_dune_df_row( | ||||||||||||||||||||||||||||||||||||||||||||||
self, consumer_address: ChecksumAddress, x: dict[str, Any] | ||||||||||||||||||||||||||||||||||||||||||||||
) -> BlockchainMessage: | ||||||||||||||||||||||||||||||||||||||||||||||
return BlockchainMessage( | ||||||||||||||||||||||||||||||||||||||||||||||
consumer_address=consumer_address, | ||||||||||||||||||||||||||||||||||||||||||||||
transaction_hash=x["hash"], | ||||||||||||||||||||||||||||||||||||||||||||||
value_wei=str(x["value"]), | ||||||||||||||||||||||||||||||||||||||||||||||
block=str(x["block_number"]), | ||||||||||||||||||||||||||||||||||||||||||||||
sender_address=x["from"], | ||||||||||||||||||||||||||||||||||||||||||||||
data_field=self.unzip_message_else_do_nothing(x["data"]), | ||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+32
to
+42
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handle potential In Consider modifying the method to handle missing keys gracefully: def blockchain_message_from_dune_df_row(
self, consumer_address: ChecksumAddress, x: dict[str, Any]
) -> BlockchainMessage:
return BlockchainMessage(
consumer_address=consumer_address,
- transaction_hash=x["hash"],
- value_wei=str(x["value"]),
- block=str(x["block_number"]),
- sender_address=x["from"],
- data_field=self.unzip_message_else_do_nothing(x["data"]),
+ transaction_hash=x.get("hash", ""),
+ value_wei=str(x.get("value", 0)),
+ block=str(x.get("block_number", 0)),
+ sender_address=x.get("from", ""),
+ data_field=self.unzip_message_else_do_nothing(x.get("data", "")),
) This ensures that missing keys default to safe values, preventing 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
def fetch_unseen_transactions( | ||||||||||||||||||||||||||||||||||||||||||||||
self, consumer_address: ChecksumAddress | ||||||||||||||||||||||||||||||||||||||||||||||
) -> pl.DataFrame: | ||||||||||||||||||||||||||||||||||||||||||||||
) -> list[BlockchainMessage]: | ||||||||||||||||||||||||||||||||||||||||||||||
keys = APIKeys() | ||||||||||||||||||||||||||||||||||||||||||||||
latest_blockchain_message = ( | ||||||||||||||||||||||||||||||||||||||||||||||
self.blockchain_table_handler.fetch_latest_blockchain_message( | ||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -50,13 +64,18 @@ def fetch_unseen_transactions_df( | |||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||
# Filter out existing hashes - hashes are by default lowercase | ||||||||||||||||||||||||||||||||||||||||||||||
df = df.filter(~pl.col("hash").is_in([i.hex() for i in existing_hashes])) | ||||||||||||||||||||||||||||||||||||||||||||||
return df | ||||||||||||||||||||||||||||||||||||||||||||||
return [ | ||||||||||||||||||||||||||||||||||||||||||||||
self.blockchain_message_from_dune_df_row(consumer_address, x).model_copy( | ||||||||||||||||||||||||||||||||||||||||||||||
deep=True # To prevent `is not bound to a Session` error. | ||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||
for x in df.iter_rows(named=True) | ||||||||||||||||||||||||||||||||||||||||||||||
] | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
def fetch_count_unprocessed_transactions( | ||||||||||||||||||||||||||||||||||||||||||||||
self, consumer_address: ChecksumAddress | ||||||||||||||||||||||||||||||||||||||||||||||
) -> int: | ||||||||||||||||||||||||||||||||||||||||||||||
df = self.fetch_unseen_transactions_df(consumer_address=consumer_address) | ||||||||||||||||||||||||||||||||||||||||||||||
return len(df) | ||||||||||||||||||||||||||||||||||||||||||||||
transactions = self.fetch_unseen_transactions(consumer_address=consumer_address) | ||||||||||||||||||||||||||||||||||||||||||||||
return len(transactions) | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+75
to
77
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Avoid fetching all transactions just to get the count Calling Refactor def fetch_count_unprocessed_transactions(
self, consumer_address: ChecksumAddress
) -> int:
- transactions = self.fetch_unseen_transactions(consumer_address=consumer_address)
- return len(transactions)
+ keys = APIKeys()
+ latest_blockchain_message = (
+ self.blockchain_table_handler.fetch_latest_blockchain_message(
+ consumer_address
+ )
+ )
+ min_block_number = (
+ 0 if not latest_blockchain_message else latest_blockchain_message.block
+ )
+ query = f'SELECT COUNT(*) AS count FROM gnosis.transactions WHERE "to" = {Web3.to_checksum_address(consumer_address)} AND block_number >= {min_block_number} AND value >= {xdai_to_wei(MicrochainAgentKeys().RECEIVER_MINIMUM_AMOUNT)}'
+ df = spice.query(query, api_key=keys.dune_api_key.get_secret_value())
+ existing_hashes = self.blockchain_table_handler.fetch_all_transaction_hashes(
+ consumer_address=consumer_address
+ )
+ total_count = df.select("count")[0, 0]
+ unseen_count = total_count - len(existing_hashes)
+ return max(unseen_count, 0) This change reduces unnecessary data retrieval and improves performance.
|
||||||||||||||||||||||||||||||||||||||||||||||
def fetch_one_unprocessed_blockchain_message_and_store_as_processed( | ||||||||||||||||||||||||||||||||||||||||||||||
self, consumer_address: ChecksumAddress | ||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -65,25 +84,13 @@ def fetch_one_unprocessed_blockchain_message_and_store_as_processed( | |||||||||||||||||||||||||||||||||||||||||||||
Method for fetching oldest unprocessed transaction sent to the consumer address. | ||||||||||||||||||||||||||||||||||||||||||||||
After being fetched, it is stored in the DB as processed. | ||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||
df = self.fetch_unseen_transactions_df(consumer_address=consumer_address) | ||||||||||||||||||||||||||||||||||||||||||||||
if df.is_empty(): | ||||||||||||||||||||||||||||||||||||||||||||||
transactions = self.fetch_unseen_transactions(consumer_address=consumer_address) | ||||||||||||||||||||||||||||||||||||||||||||||
if not transactions: | ||||||||||||||||||||||||||||||||||||||||||||||
return None | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
# We only want the oldest non-processed message. | ||||||||||||||||||||||||||||||||||||||||||||||
oldest_non_processed_message = df.row(0, named=True) | ||||||||||||||||||||||||||||||||||||||||||||||
blockchain_message = BlockchainMessage( | ||||||||||||||||||||||||||||||||||||||||||||||
consumer_address=consumer_address, | ||||||||||||||||||||||||||||||||||||||||||||||
transaction_hash=oldest_non_processed_message["hash"], | ||||||||||||||||||||||||||||||||||||||||||||||
value_wei=str(oldest_non_processed_message["value"]), | ||||||||||||||||||||||||||||||||||||||||||||||
block=str(oldest_non_processed_message["block_number"]), | ||||||||||||||||||||||||||||||||||||||||||||||
sender_address=oldest_non_processed_message["from"], | ||||||||||||||||||||||||||||||||||||||||||||||
data_field=self.unzip_message_else_do_nothing( | ||||||||||||||||||||||||||||||||||||||||||||||
oldest_non_processed_message["data"] | ||||||||||||||||||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||
blockchain_message = transactions[0] | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
# Store here to avoid having to refresh after session was closed. | ||||||||||||||||||||||||||||||||||||||||||||||
item = blockchain_message.model_copy(deep=True) | ||||||||||||||||||||||||||||||||||||||||||||||
# mark unseen transaction as processed in DB | ||||||||||||||||||||||||||||||||||||||||||||||
self.blockchain_table_handler.save_multiple([blockchain_message]) | ||||||||||||||||||||||||||||||||||||||||||||||
return item | ||||||||||||||||||||||||||||||||||||||||||||||
return blockchain_message | ||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+85
to
+94
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Optimize fetching of a single unprocessed transaction Fetching all unseen transactions and selecting the first one is inefficient, especially when only one transaction is needed. Modify the method to fetch only the oldest unseen transaction: def fetch_one_unprocessed_blockchain_message_and_store_as_processed(
self, consumer_address: ChecksumAddress
) -> BlockchainMessage | None:
"""
Method for fetching the oldest unprocessed transaction sent to the consumer address.
After being fetched, it is stored in the DB as processed.
"""
- transactions = self.fetch_unseen_transactions(consumer_address=consumer_address)
- if not transactions:
+ transactions = self.fetch_unseen_transactions(consumer_address=consumer_address, limit=1)
+ if not transactions:
return None
blockchain_message = transactions[0]
# Mark unseen transaction as processed in DB
self.blockchain_table_handler.save_multiple([blockchain_message])
return blockchain_message Adjust
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I click 2x on the button, does it trigger
fetch_unseen_transactions
2x?spice
is caching results, but still I would suggest placing thetransactions
inside ast.session_state
, updating that throughst.fragment
and just reading from that inside the popover. Wdyt?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This whole thing is in the fragment already:
I don't see how saving it manually to state would help?
As for the button, I tested it and code inside of with block is executed regardless of clicking the button:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ow, understood - so button simply toggles the
disabled
property and shows stuff, but doesn't trigger any fetching logic. Fine.