Skip to content

Commit

Permalink
feat: add publish_interval logic (#121)
Browse files Browse the repository at this point in the history
* feat: add publish_interval logic

* refactor: rename PublisherPermissions to PricePublishingMetadata

* fix: pre-commit

* chore: bump pyth-agent version

* feat: use NativeTime instead of unix timestamp to have more accurate price timestamps

* refactor: use match to avoid unwrap
  • Loading branch information
keyvankhademi authored May 13, 2024
1 parent 3d35c1b commit 8754c47
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 74 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-agent"
version = "2.6.2"
version = "2.7.0"
edition = "2021"

[[bin]]
Expand Down
66 changes: 65 additions & 1 deletion integration-tests/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@
},
"metadata": {"jump_id": "78876711", "jump_symbol": "SOLUSD", "price_exp": -8, "min_publishers": 1},
}
PYTH_USD = {
"account": "",
"attr_dict": {
"symbol": "Crypto.PYTH/USD",
"asset_type": "Crypto",
"base": "PYTH",
"quote_currency": "USD",
"generic_symbol": "PYTHUSD",
"description": "PYTH/USD",
"publish_interval": "2",
},
"metadata": {"jump_id": "78876712", "jump_symbol": "PYTHUSD", "price_exp": -8, "min_publishers": 1},
}
AAPL_USD = {
"account": "",
"attr_dict": {
Expand Down Expand Up @@ -110,7 +123,7 @@
},
"metadata": {"jump_id": "78876710", "jump_symbol": "ETHUSD", "price_exp": -8, "min_publishers": 1},
}
ALL_PRODUCTS=[BTC_USD, AAPL_USD, ETH_USD, SOL_USD]
ALL_PRODUCTS=[BTC_USD, AAPL_USD, ETH_USD, SOL_USD, PYTH_USD]

asyncio.set_event_loop(asyncio.new_event_loop())

Expand Down Expand Up @@ -293,6 +306,7 @@ def refdata_permissions(self, refdata_path):
"BTCUSD": {"price": ["some_publisher_b", "some_publisher_a"]}, # Reversed order helps ensure permission discovery works correctly for publisher A
"ETHUSD": {"price": ["some_publisher_b"]},
"SOLUSD": {"price": ["some_publisher_a"]},
"PYTHUSD": {"price": ["some_publisher_a"]},
}))
f.flush()
yield f.name
Expand Down Expand Up @@ -820,3 +834,53 @@ async def test_agent_respects_holiday_hours(self, client: PythAgentClient):
assert final_price_account["price"] == 0
assert final_price_account["conf"] == 0
assert final_price_account["status"] == "unknown"

@pytest.mark.asyncio
async def test_agent_respects_publish_interval(self, client: PythAgentClient):
'''
Similar to test_agent_respects_market_hours, but using PYTH_USD.
This test asserts that consecutive price updates will only get published
if it's after the specified publish interval.
'''

# Fetch all products
products = {product["attr_dict"]["symbol"]: product for product in await client.get_all_products()}

# Find the product account ID corresponding to the AAPL/USD symbol
product = products[PYTH_USD["attr_dict"]["symbol"]]
product_account = product["account"]

# Get the price account with which to send updates
price_account = product["price_accounts"][0]["account"]

# Send an "update_price" request
await client.update_price(price_account, 42, 2, "trading")
time.sleep(1)

# Send another update_price request to "trigger" aggregation
# (aggregation would happen if publish interval were to fail, but
# we want to catch that happening if there's a problem)
await client.update_price(price_account, 81, 1, "trading")
time.sleep(2)

# Confirm that the price account has not been updated
final_product_state = await client.get_product(product_account)

final_price_account = final_product_state["price_accounts"][0]
assert final_price_account["price"] == 0
assert final_price_account["conf"] == 0
assert final_price_account["status"] == "unknown"


# Send another update_price request to "trigger" aggregation
# Now it is after the publish interval, so the price should be updated
await client.update_price(price_account, 81, 1, "trading")
time.sleep(2)

# Confirm that the price account has been updated
final_product_state = await client.get_product(product_account)

final_price_account = final_product_state["price_accounts"][0]
assert final_price_account["price"] == 42
assert final_price_account["conf"] == 2
assert final_price_account["status"] == "trading"
6 changes: 1 addition & 5 deletions src/agent/dashboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,7 @@ impl MetricsServer {
};

let last_local_update_string = if let Some(local_data) = price_data.local_data {
if let Some(datetime) = DateTime::from_timestamp(local_data.timestamp, 0) {
datetime.format("%Y-%m-%d %H:%M:%S").to_string()
} else {
format!("Invalid timestamp {}", local_data.timestamp)
}
local_data.timestamp.format("%Y-%m-%d %H:%M:%S").to_string()
} else {
"no data".to_string()
};
Expand Down
6 changes: 2 additions & 4 deletions src/agent/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ use {
},
warp::{
hyper::StatusCode,
reply::{
self,
},
reply,
Filter,
Rejection,
Reply,
Expand Down Expand Up @@ -428,7 +426,7 @@ impl PriceLocalMetrics {
.get_or_create(&PriceLocalLabels {
pubkey: price_key.to_string(),
})
.set(price_info.timestamp);
.set(price_info.timestamp.and_utc().timestamp());
update_count
.get_or_create(&PriceLocalLabels {
pubkey: price_key.to_string(),
Expand Down
14 changes: 8 additions & 6 deletions src/agent/pythd/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ mod tests {
)
.unwrap(),
solana::oracle::ProductEntry {
account_data: pyth_sdk_solana::state::ProductAccount {
account_data: pyth_sdk_solana::state::ProductAccount {
magic: 0xa1b2c3d4,
ver: 6,
atype: 4,
Expand Down Expand Up @@ -499,8 +499,9 @@ mod tests {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
],
},
schedule: Default::default(),
price_accounts: vec![
schedule: Default::default(),
publish_interval: None,
price_accounts: vec![
solana_sdk::pubkey::Pubkey::from_str(
"GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU",
)
Expand All @@ -522,7 +523,7 @@ mod tests {
)
.unwrap(),
solana::oracle::ProductEntry {
account_data: pyth_sdk_solana::state::ProductAccount {
account_data: pyth_sdk_solana::state::ProductAccount {
magic: 0xa1b2c3d4,
ver: 5,
atype: 3,
Expand Down Expand Up @@ -559,8 +560,9 @@ mod tests {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
],
},
schedule: Default::default(),
price_accounts: vec![
schedule: Default::default(),
publish_interval: None,
price_accounts: vec![
solana_sdk::pubkey::Pubkey::from_str(
"GG3FTE7xhc9Diy7dn9P6BWzoCrAEE4D3p5NBYrDAm5DD",
)
Expand Down
2 changes: 1 addition & 1 deletion src/agent/pythd/adapter/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ impl AdapterApi for Adapter {
status: Adapter::map_status(&status)?,
price,
conf,
timestamp: Utc::now().timestamp(),
timestamp: Utc::now().naive_utc(),
},
})
.await
Expand Down
99 changes: 60 additions & 39 deletions src/agent/solana/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ use {
},
key_store,
network::Network,
oracle::PricePublishingMetadata,
},
crate::agent::{
market_schedule::MarketSchedule,
remote_keypair_loader::{
KeypairRequest,
RemoteKeypairLoader,
},
crate::agent::remote_keypair_loader::{
KeypairRequest,
RemoteKeypairLoader,
},
anyhow::{
anyhow,
Expand Down Expand Up @@ -68,7 +66,6 @@ use {
sync::{
mpsc::{
self,
error::TryRecvError,
Sender,
},
oneshot,
Expand Down Expand Up @@ -174,7 +171,9 @@ pub fn spawn_exporter(
network: Network,
rpc_url: &str,
rpc_timeout: Duration,
publisher_permissions_rx: watch::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
publisher_permissions_rx: watch::Receiver<
HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>,
>,
key_store: KeyStore,
local_store_tx: Sender<store::local::Message>,
global_store_tx: Sender<store::global::Lookup>,
Expand Down Expand Up @@ -262,10 +261,11 @@ pub struct Exporter {
inflight_transactions_tx: Sender<Signature>,

/// publisher => { permissioned_price => market hours } as read by the oracle module
publisher_permissions_rx: watch::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
publisher_permissions_rx:
watch::Receiver<HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>>,

/// Currently known permissioned prices of this publisher along with their market hours
our_prices: HashMap<Pubkey, MarketSchedule>,
our_prices: HashMap<Pubkey, PricePublishingMetadata>,

/// Interval to update the dynamic price (if enabled)
dynamic_compute_unit_price_update_interval: Interval,
Expand All @@ -289,7 +289,9 @@ impl Exporter {
global_store_tx: Sender<store::global::Lookup>,
network_state_rx: watch::Receiver<NetworkState>,
inflight_transactions_tx: Sender<Signature>,
publisher_permissions_rx: watch::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
publisher_permissions_rx: watch::Receiver<
HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>,
>,
keypair_request_tx: mpsc::Sender<KeypairRequest>,
logger: Logger,
) -> Self {
Expand Down Expand Up @@ -432,22 +434,30 @@ impl Exporter {
async fn get_permissioned_updates(&mut self) -> Result<Vec<(PriceIdentifier, PriceInfo)>> {
let local_store_contents = self.fetch_local_store_contents().await?;

let now = Utc::now().timestamp();
let publish_keypair = self.get_publish_keypair().await?;
self.update_our_prices(&publish_keypair.pubkey());

let now = Utc::now().naive_utc();

debug!(self.logger, "Exporter: filtering prices permissioned to us";
"our_prices" => format!("{:?}", self.our_prices.keys()),
"publish_pubkey" => publish_keypair.pubkey().to_string(),
);

// Filter the contents to only include information we haven't already sent,
// and to ignore stale information.
let fresh_updates = local_store_contents
Ok(local_store_contents
.into_iter()
.filter(|(_identifier, info)| {
// Filter out timestamps that are old
(now - info.timestamp) < self.config.staleness_threshold.as_secs() as i64
now < info.timestamp + self.config.staleness_threshold
})
.filter(|(identifier, info)| {
// Filter out unchanged price data if the max delay wasn't reached

if let Some(last_info) = self.last_published_state.get(identifier) {
if info.timestamp.saturating_sub(last_info.timestamp)
> self.config.unchanged_publish_threshold.as_secs() as i64
if info.timestamp
> last_info.timestamp + self.config.unchanged_publish_threshold
{
true // max delay since last published state reached, we publish anyway
} else {
Expand All @@ -457,33 +467,17 @@ impl Exporter {
true // No prior data found, letting the price through
}
})
.collect::<Vec<_>>();

let publish_keypair = self.get_publish_keypair().await?;

self.update_our_prices(&publish_keypair.pubkey());

debug!(self.logger, "Exporter: filtering prices permissioned to us";
"our_prices" => format!("{:?}", self.our_prices.keys()),
"publish_pubkey" => publish_keypair.pubkey().to_string(),
);

// Get a fresh system time
let now = Utc::now();

// Filter out price accounts we're not permissioned to update
Ok(fresh_updates
.into_iter()
.filter(|(id, _data)| {
let key_from_id = Pubkey::from((*id).clone().to_bytes());
if let Some(schedule) = self.our_prices.get(&key_from_id) {
let ret = schedule.can_publish_at(&now);
if let Some(publisher_permission) = self.our_prices.get(&key_from_id) {
let now_utc = Utc::now();
let ret = publisher_permission.schedule.can_publish_at(&now_utc);

if !ret {
debug!(self.logger, "Exporter: Attempted to publish price outside market hours";
"price_account" => key_from_id.to_string(),
"schedule" => format!("{:?}", schedule),
"utc_time" => now.format("%c").to_string(),
"schedule" => format!("{:?}", publisher_permission.schedule),
"utc_time" => now_utc.format("%c").to_string(),
);
}

Expand All @@ -501,6 +495,33 @@ impl Exporter {
false
}
})
.filter(|(id, info)| {
// Filtering out prices that are being updated too frequently according to publisher_permission.publish_interval
let last_info = match self.last_published_state.get(id) {
Some(last_info) => last_info,
None => {
// No prior data found, letting the price through
return true;
}
};

let key_from_id = Pubkey::from((*id).clone().to_bytes());
let publisher_metadata = match self.our_prices.get(&key_from_id) {
Some(metadata) => metadata,
None => {
// Should never happen since we have filtered out the price above
return false;
}
};

if let Some(publish_interval) = publisher_metadata.publish_interval {
if info.timestamp < last_info.timestamp + publish_interval {
// Updating the price too soon after the last update, skipping
return false;
}
}
true
})
.collect::<Vec<_>>())
}

Expand Down Expand Up @@ -623,9 +644,9 @@ impl Exporter {
let network_state = *self.network_state_rx.borrow();
for (identifier, price_info_result) in refreshed_batch {
let price_info = price_info_result?;
let now = Utc::now().naive_utc();

let stale_price = (Utc::now().timestamp() - price_info.timestamp)
> self.config.staleness_threshold.as_secs() as i64;
let stale_price = now > price_info.timestamp + self.config.staleness_threshold;
if stale_price {
continue;
}
Expand Down
Loading

0 comments on commit 8754c47

Please sign in to comment.