Skip to content

Commit

Permalink
add a field for internal indexer version
Browse files Browse the repository at this point in the history
  • Loading branch information
areshand committed Sep 17, 2024
1 parent 4dd94c6 commit 8b4d7e2
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 62 deletions.
23 changes: 4 additions & 19 deletions api/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl AccountsApi {

let context = self.context.clone();
api_spawn_blocking(move || {
let account = Account::new(context, address.0, ledger_version.0, None, None, false)?;
let account = Account::new(context, address.0, ledger_version.0, None, None)?;
account.account(&accept_type)
})
.await
Expand Down Expand Up @@ -118,7 +118,6 @@ impl AccountsApi {
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
true,
)?;
account.resources(&accept_type)
})
Expand Down Expand Up @@ -171,7 +170,6 @@ impl AccountsApi {
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
true,
)?;
account.modules(&accept_type)
})
Expand Down Expand Up @@ -201,24 +199,11 @@ impl Account {
requested_ledger_version: Option<U64>,
start: Option<StateKey>,
limit: Option<u16>,
require_state_indices: bool,
) -> Result<Self, BasicErrorWith404> {
let sharding_enabled = context
.node_config
.storage
.rocksdb_configs
.enable_storage_sharding;

let (latest_ledger_info, requested_version) = if sharding_enabled && require_state_indices {
context.get_latest_ledger_info_and_verify_internal_indexer_lookup_version(
let (latest_ledger_info, requested_version) = context
.get_latest_ledger_info_and_verify_lookup_version(
requested_ledger_version.map(|inner| inner.0),
)?
} else {
// Use the latest ledger version, or the requested associated version
context.get_latest_ledger_info_and_verify_lookup_version(
requested_ledger_version.map(|inner| inner.0),
)?
};
)?;

Ok(Self {
context,
Expand Down
86 changes: 47 additions & 39 deletions api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,20 +221,26 @@ impl Context {
.map_err(|e| e.into())
}

pub fn get_latest_ledger_info<E: ServiceUnavailableError>(&self) -> Result<LedgerInfo, E> {
pub fn get_oldest_version_and_block_height<E: ServiceUnavailableError>(
&self,
) -> Result<(Version, u64), E> {
self.db
.get_first_viable_block()
.context("Failed to retrieve oldest block information")
.map_err(|e| E::service_unavailable_with_code_no_info(e, AptosErrorCode::InternalError))
}

pub fn get_latest_storage_ledger_info<E: ServiceUnavailableError>(
&self,
) -> Result<LedgerInfo, E> {
let ledger_info = self
.get_latest_ledger_info_with_signatures()
.context("Failed to retrieve latest ledger info")
.map_err(|e| {
E::service_unavailable_with_code_no_info(e, AptosErrorCode::InternalError)
})?;
let (oldest_version, oldest_block_height) = self
.db
.get_first_viable_block()
.context("Failed to retrieve oldest block information")
.map_err(|e| {
E::service_unavailable_with_code_no_info(e, AptosErrorCode::InternalError)
})?;

let (oldest_version, oldest_block_height) = self.get_oldest_version_and_block_height()?;
let (_, _, newest_block_event) = self
.db
.get_block_info_by_version(ledger_info.ledger_info().version())
Expand All @@ -252,32 +258,12 @@ impl Context {
))
}

pub fn get_latest_ledger_info_and_verify_internal_indexer_lookup_version<E: StdApiError>(
&self,
requested_ledger_version: Option<Version>,
) -> Result<(LedgerInfo, Version), E> {
if self.indexer_reader.is_none() {
return Err(E::internal_with_code_no_info(
"Indexer reader doesn't exist",
AptosErrorCode::InternalError,
));
}

let (latest_ledger_info, latest_internal_indexer_ledger_version) =
self.get_latest_internal_indexer_ledger_version_and_main_db_info()?;
if let Some(version) = requested_ledger_version {
let request_ledger_version = Version::from(version);
if latest_internal_indexer_ledger_version < request_ledger_version {
return Err(version_not_found(
request_ledger_version,
&latest_ledger_info,
));
} else if request_ledger_version < latest_ledger_info.oldest_ledger_version.0 {
return Err(version_pruned(request_ledger_version, &latest_ledger_info));
}
Ok((latest_ledger_info, request_ledger_version))
pub fn get_latest_ledger_info<E: ServiceUnavailableError>(&self) -> Result<LedgerInfo, E> {
if self.indexer_reader.is_some() {
let ledger_info = self.get_latest_internal_indexer_ledger_version_and_ledger_info()?;
Ok(ledger_info)
} else {
Ok((latest_ledger_info, latest_internal_indexer_ledger_version))
self.get_latest_storage_ledger_info()
}
}

Expand Down Expand Up @@ -306,20 +292,42 @@ impl Context {
Ok((latest_ledger_info, requested_ledger_version))
}

pub fn get_latest_internal_indexer_ledger_version_and_main_db_info<E: StdApiError>(
pub fn get_latest_internal_indexer_ledger_version_and_ledger_info<
E: ServiceUnavailableError,
>(
&self,
) -> Result<(LedgerInfo, Version), E> {
) -> Result<LedgerInfo, E> {
if let Some(indexer_reader) = self.indexer_reader.as_ref() {
if let Some(latest_version) = indexer_reader
.get_latest_internal_indexer_ledger_version()
.map_err(|err| E::internal_with_code_no_info(err, AptosErrorCode::InternalError))?
.map_err(|err| {
E::service_unavailable_with_code_no_info(err, AptosErrorCode::InternalError)
})?
{
let latest_ledger_info = self.get_latest_ledger_info()?;
return Ok((latest_ledger_info, latest_version));
let (_, _, new_block_event) = self
.db
.get_block_info_by_version(latest_version)
.map_err(|_| {
E::service_unavailable_with_code_no_info(
"Failed to get block",
AptosErrorCode::InternalError,
)
})?;
let (oldest_version, oldest_block_height) =
self.get_oldest_version_and_block_height()?;
return Ok(LedgerInfo::new_ledger_info(
&self.chain_id(),
new_block_event.epoch(),
latest_version,
oldest_version,
oldest_block_height,
new_block_event.height(),
new_block_event.proposed_time(),
));
}
}

Err(E::internal_with_code_no_info(
Err(E::service_unavailable_with_code_no_info(
"Indexer reader doesn't exist, or doesn't have data.",
AptosErrorCode::InternalError,
))
Expand Down
4 changes: 2 additions & 2 deletions api/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl EventsApi {
// Ensure that account exists
let api = self.clone();
api_spawn_blocking(move || {
let account = Account::new(api.context.clone(), address.0, None, None, None, true)?;
let account = Account::new(api.context.clone(), address.0, None, None, None)?;
account.verify_account_or_object_resource()?;
api.list(
account.latest_ledger_info,
Expand Down Expand Up @@ -144,7 +144,7 @@ impl EventsApi {

let api = self.clone();
api_spawn_blocking(move || {
let account = Account::new(api.context.clone(), address.0, None, None, None, true)?;
let account = Account::new(api.context.clone(), address.0, None, None, None)?;
let key = account.find_event_key(event_handle.0, field_name.0.into())?;
api.list(account.latest_ledger_info, accept_type, page, key)
})
Expand Down
1 change: 0 additions & 1 deletion api/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ impl IndexApi {
self.context
.check_api_output_enabled("Get ledger info", &accept_type)?;
let ledger_info = self.context.get_latest_ledger_info()?;

let node_role = self.context.node_role();

api_spawn_blocking(move || match accept_type {
Expand Down
2 changes: 1 addition & 1 deletion api/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ impl TransactionsApi {
address: Address,
) -> BasicResultWith404<Vec<Transaction>> {
// Verify the account exists
let account = Account::new(self.context.clone(), address, None, None, None, true)?;
let account = Account::new(self.context.clone(), address, None, None, None)?;
account.get_account_resource()?;

let latest_ledger_info = account.latest_ledger_info;
Expand Down
20 changes: 20 additions & 0 deletions api/types/src/ledger_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@ impl LedgerInfo {
}
}

pub fn new_ledger_info(
chain_id: &ChainId,
epoch: u64,
ledger_version: u64,
oldest_ledger_version: u64,
oldest_block_height: u64,
block_height: u64,
ledger_timestamp: u64,
) -> Self {
Self {
chain_id: chain_id.id(),
epoch: epoch.into(),
ledger_version: ledger_version.into(),
oldest_ledger_version: oldest_ledger_version.into(),
block_height: block_height.into(),
oldest_block_height: oldest_block_height.into(),
ledger_timestamp: ledger_timestamp.into(),
}
}

pub fn epoch(&self) -> u64 {
self.epoch.into()
}
Expand Down

0 comments on commit 8b4d7e2

Please sign in to comment.