Skip to content

Commit

Permalink
Improve matching of langfuse traces with resolved bets. Add example (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
evangriffiths authored Sep 19, 2024
1 parent 45601b6 commit 3794ce2
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 25 deletions.
57 changes: 57 additions & 0 deletions examples/monitor/match_bets_with_langfuse_traces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from datetime import datetime

from langfuse import Langfuse
from web3 import Web3

from prediction_market_agent_tooling.config import APIKeys
from prediction_market_agent_tooling.markets.data_models import ResolvedBet
from prediction_market_agent_tooling.markets.omen.omen import OmenAgentMarket
from prediction_market_agent_tooling.tools.langfuse_client_utils import (
ProcessMarketTrace,
ResolvedBetWithTrace,
get_trace_for_bet,
get_traces_for_agent,
)

if __name__ == "__main__":
api_keys = APIKeys()
assert api_keys.bet_from_address == Web3.to_checksum_address(
"0xe7aa88a1d044e5c987ecce55ae8d2b562a41b72d" # prophetgpt4
)
start_time = datetime(2024, 9, 13)
langfuse = Langfuse(
secret_key=api_keys.langfuse_secret_key.get_secret_value(),
public_key=api_keys.langfuse_public_key,
host=api_keys.langfuse_host,
)

traces = get_traces_for_agent(
agent_name="DeployablePredictionProphetGPT4TurboFinalAgent",
trace_name="process_market",
from_timestamp=start_time,
has_output=True,
client=langfuse,
)
print(f"All traces: {len(traces)}")
process_market_traces = []
for trace in traces:
if process_market_trace := ProcessMarketTrace.from_langfuse_trace(trace):
process_market_traces.append(process_market_trace)
print(f"All process_market_traces: {len(process_market_traces)}")

bets: list[ResolvedBet] = OmenAgentMarket.get_resolved_bets_made_since(
better_address=api_keys.bet_from_address,
start_time=start_time,
end_time=None,
)
print(f"All bets: {len(bets)}")

# All bets should have a trace, but not all traces should have a bet
# (e.g. if all markets are deemed unpredictable), so iterate over bets
bets_with_traces: list[ResolvedBetWithTrace] = []
for bet in bets:
trace = get_trace_for_bet(bet, process_market_traces)
if trace:
bets_with_traces.append(ResolvedBetWithTrace(bet=bet, trace=trace))

print(f"Matched bets with traces: {len(bets_with_traces)}")
107 changes: 82 additions & 25 deletions prediction_market_agent_tooling/tools/langfuse_client_utils.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,58 @@
import typing as t
from datetime import datetime

import numpy as np
from langfuse import Langfuse
from langfuse.client import TraceWithDetails
from pydantic import BaseModel

from prediction_market_agent_tooling.loggers import logger
from prediction_market_agent_tooling.markets.data_models import (
ProbabilisticAnswer,
ResolvedBet,
Trade,
TradeType,
)
from prediction_market_agent_tooling.markets.omen.omen import OmenAgentMarket
from prediction_market_agent_tooling.tools.utils import add_utc_timezone_validator


class ProcessMarketTrace(BaseModel):
timestamp: datetime
market: OmenAgentMarket
answer: ProbabilisticAnswer
trades: list[Trade]

@property
def buy_trade(self) -> Trade:
buy_trades = [t for t in self.trades if t.trade_type == TradeType.BUY]
if len(buy_trades) == 1:
return buy_trades[0]
raise ValueError("No buy trade found")

@staticmethod
def from_langfuse_trace(
trace: TraceWithDetails,
) -> t.Optional["ProcessMarketTrace"]:
market = trace_to_omen_agent_market(trace)
answer = trace_to_answer(trace)
trades = trace_to_trades(trace)

if not market or not answer or not trades:
return None

return ProcessMarketTrace(
market=market,
answer=answer,
trades=trades,
timestamp=trace.timestamp,
)


class ResolvedBetWithTrace(BaseModel):
bet: ResolvedBet
trace: ProcessMarketTrace


def get_traces_for_agent(
agent_name: str,
trace_name: str,
Expand Down Expand Up @@ -47,11 +87,18 @@ def get_traces_for_agent(
return all_agent_traces


def trace_to_omen_agent_market(trace: TraceWithDetails) -> OmenAgentMarket:
assert trace.input is not None, "Trace input is None"
assert trace.input["args"] is not None, "Trace input args is None"
def trace_to_omen_agent_market(trace: TraceWithDetails) -> OmenAgentMarket | None:
if not trace.input:
return None
if not trace.input["args"]:
return None
assert len(trace.input["args"]) == 2 and trace.input["args"][0] == "omen"
return OmenAgentMarket.model_validate(trace.input["args"][1])
try:
# If the market model is invalid (e.g. outdated), it will raise an exception
market = OmenAgentMarket.model_validate(trace.input["args"][1])
return market
except Exception:
return None


def trace_to_answer(trace: TraceWithDetails) -> ProbabilisticAnswer:
Expand All @@ -78,25 +125,35 @@ def get_closest_datetime_from_list(


def get_trace_for_bet(
bet: ResolvedBet, traces: list[TraceWithDetails]
) -> TraceWithDetails | None:
# Get traces with the same market id
traces_for_bet = [
t for t in traces if trace_to_omen_agent_market(t).id == bet.market_id
]

# In-case there are multiple traces for the same market, get the closest trace to the bet
closest_trace_index = get_closest_datetime_from_list(
add_utc_timezone_validator(bet.created_time),
[t.timestamp for t in traces_for_bet],
)
# Sanity check - the trace should be after the bet
if traces_for_bet[closest_trace_index].timestamp < add_utc_timezone_validator(
bet.created_time
):
logger.warning(
f"No trace for bet on market {bet.market_id} at time {bet.created_time} found"
)
bet: ResolvedBet, traces: list[ProcessMarketTrace]
) -> ProcessMarketTrace | None:
# Filter for traces with the same market id
traces = [t for t in traces if t.market.id == bet.market_id]

# Filter for traces with the same bet outcome and amount
traces_for_bet: list[ProcessMarketTrace] = []
for t in traces:
# Cannot use exact comparison due to gas fees
if t.buy_trade.outcome == bet.outcome and np.isclose(
t.buy_trade.amount.amount, bet.amount.amount
):
traces_for_bet.append(t)

if not traces_for_bet:
return None
elif len(traces_for_bet) == 1:
return traces_for_bet[0]
else:
# In-case there are multiple traces for the same market, get the closest
# trace to the bet
closest_trace_index = get_closest_datetime_from_list(
add_utc_timezone_validator(bet.created_time),
[t.timestamp for t in traces_for_bet],
)
# Sanity check - the trace should be after the bet
if traces_for_bet[closest_trace_index].timestamp < add_utc_timezone_validator(
bet.created_time
):
return None

return traces_for_bet[closest_trace_index]
return traces_for_bet[closest_trace_index]

0 comments on commit 3794ce2

Please sign in to comment.