feat: add retry logic for MetaPeerClient (#991)
* add retry logic in meta_peer_client

* impl need_retry function

* create meta_peer_client using the builder pattern

* cr
fengys1996 authored Feb 15, 2023
1 parent de0b8aa commit e2904b9
Showing 6 changed files with 142 additions and 41 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
@@ -20,6 +20,7 @@ common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
dashmap = "5.4"
derive_builder = "0.12"
etcd-client = "0.10"
futures.workspace = true
h2 = "0.3"
10 changes: 8 additions & 2 deletions src/meta-srv/src/bootstrap.rs
@@ -25,7 +25,7 @@ use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::server::Router;

use crate::cluster::MetaPeerClient;
use crate::cluster::MetaPeerClientBuilder;
use crate::election::etcd::EtcdElection;
use crate::lock::etcd::EtcdLock;
use crate::metasrv::builder::MetaSrvBuilder;
@@ -91,7 +91,13 @@ pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result<MetaSrv> {
};

let in_memory = Arc::new(MemStore::default()) as ResetableKvStoreRef;
let meta_peer_client = MetaPeerClient::new(in_memory.clone(), election.clone());

let meta_peer_client = MetaPeerClientBuilder::default()
.election(election.clone())
.in_memory(in_memory.clone())
.build()
// Safety: all required fields are set at initialization
.unwrap();

let selector = match opts.selector {
SelectorType::LoadBased => Arc::new(LoadBasedSelector {
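The builder carries the two retry knobs this commit introduces, so callers can tune them at construction time. A minimal sketch of overriding the defaults (setter names follow derive_builder's one-setter-per-field convention; the defaults of 3 retries and 1000 ms come from the `#[builder(default = ...)]` attributes in cluster.rs below):

```rust
// Sketch: tuning the retry behavior through the generated builder.
// Without these two calls, max_retry_count falls back to 3 and
// retry_interval_ms to 1000.
let meta_peer_client = MetaPeerClientBuilder::default()
    .election(election.clone())
    .in_memory(in_memory.clone())
    .max_retry_count(5)       // retry up to 5 times
    .retry_interval_ms(500)   // sleep 500 ms between attempts
    .build()
    .unwrap();
```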
144 changes: 105 additions & 39 deletions src/meta-srv/src/cluster.rs
@@ -13,52 +13,93 @@
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use api::v1::meta::cluster_client::ClusterClient;
use api::v1::meta::{
BatchGetRequest, BatchGetResponse, KeyValue, RangeRequest, RangeResponse, ResponseHeader,
};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::warn;
use derive_builder::Builder;
use snafu::{ensure, OptionExt, ResultExt};

use crate::error::Result;
use crate::error::{match_for_io_error, Result};
use crate::keys::{StatKey, StatValue, DN_STAT_PREFIX};
use crate::metasrv::ElectionRef;
use crate::service::store::ext::KvStoreExt;
use crate::service::store::kv::ResetableKvStoreRef;
use crate::{error, util};

#[derive(Clone)]
#[derive(Builder, Clone)]
pub struct MetaPeerClient {
election: Option<ElectionRef>,
in_memory: ResetableKvStoreRef,
#[builder(default = "ChannelManager::default()")]
channel_manager: ChannelManager,
#[builder(default = "3")]
max_retry_count: usize,
#[builder(default = "1000")]
retry_interval_ms: u64,
}

impl MetaPeerClient {
pub fn new(in_mem: ResetableKvStoreRef, election: Option<ElectionRef>) -> Self {
Self {
election,
in_memory: in_mem,
channel_manager: ChannelManager::default(),
}
}

// Get all datanode stat kvs from leader meta.
pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<StatKey, StatValue>> {
let stat_prefix = format!("{DN_STAT_PREFIX}-").into_bytes();
let range_end = util::get_prefix_end_key(&stat_prefix);
let req = RangeRequest {
key: stat_prefix.clone(),
range_end,
..Default::default()
};
let key = format!("{DN_STAT_PREFIX}-").into_bytes();
let range_end = util::get_prefix_end_key(&key);

let kvs = self.range(key, range_end).await?;

to_stat_kv_map(kvs)
}

// Get datanode stat kvs from leader meta by input keys.
pub async fn get_dn_stat_kvs(&self, keys: Vec<StatKey>) -> Result<HashMap<StatKey, StatValue>> {
let stat_keys = keys.into_iter().map(|key| key.into()).collect();

let kvs = self.batch_get(stat_keys).await?;

to_stat_kv_map(kvs)
}

// Range kv information from the leader's in_mem kv store
pub async fn range(&self, key: Vec<u8>, range_end: Vec<u8>) -> Result<Vec<KeyValue>> {
if self.is_leader() {
let kvs = self.in_memory.range(req).await?.kvs;
return to_stat_kv_map(kvs);
let request = RangeRequest {
key,
range_end,
..Default::default()
};

return self.in_memory.range(request).await.map(|resp| resp.kvs);
}

let max_retry_count = self.max_retry_count;
let retry_interval_ms = self.retry_interval_ms;

for _ in 0..max_retry_count {
match self.remote_range(key.clone(), range_end.clone()).await {
Ok(kvs) => return Ok(kvs),
Err(e) => {
if need_retry(&e) {
warn!("Encountered an error that need to retry, err: {:?}", e);
tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
} else {
return Err(e);
}
}
}
}

error::ExceededRetryLimitSnafu {
func_name: "range",
retry_num: max_retry_count,
}
.fail()
}

async fn remote_range(&self, key: Vec<u8>, range_end: Vec<u8>) -> Result<Vec<KeyValue>> {
// Safety: when self.is_leader() == false, election must not be empty.
let election = self.election.as_ref().unwrap();

@@ -69,39 +110,54 @@ impl MetaPeerClient {
.get(&leader_addr)
.context(error::CreateChannelSnafu)?;

let request = tonic::Request::new(req);
let request = tonic::Request::new(RangeRequest {
key,
range_end,
..Default::default()
});

let response: RangeResponse = ClusterClient::new(channel)
.range(request)
.await
.context(error::BatchGetSnafu)?
.context(error::RangeSnafu)?
.into_inner();

check_resp_header(&response.header, Context { addr: &leader_addr })?;

to_stat_kv_map(response.kvs)
}

// Get datanode stat kvs from leader meta by input keys.
pub async fn get_dn_stat_kvs(&self, keys: Vec<StatKey>) -> Result<HashMap<StatKey, StatValue>> {
let stat_keys = keys.into_iter().map(|key| key.into()).collect();
let stat_kvs = self.batch_get(stat_keys).await?;

let mut result = HashMap::with_capacity(stat_kvs.len());
for stat_kv in stat_kvs {
let stat_key = stat_kv.key.try_into()?;
let stat_val = stat_kv.value.try_into()?;
result.insert(stat_key, stat_val);
}
Ok(result)
Ok(response.kvs)
}

// Get kv information from the leader's in_mem kv store
async fn batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
pub async fn batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
if self.is_leader() {
return self.in_memory.batch_get(keys).await;
}

let max_retry_count = self.max_retry_count;
let retry_interval_ms = self.retry_interval_ms;

for _ in 0..max_retry_count {
match self.remote_batch_get(keys.clone()).await {
Ok(kvs) => return Ok(kvs),
Err(e) => {
if need_retry(&e) {
warn!("Encountered an error that need to retry, err: {:?}", e);
tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
} else {
return Err(e);
}
}
}
}

error::ExceededRetryLimitSnafu {
func_name: "batch_get",
retry_num: max_retry_count,
}
.fail()
}

async fn remote_batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
// Safety: when self.is_leader() == false, election must not be empty.
let election = self.election.as_ref().unwrap();

Expand All @@ -113,11 +169,11 @@ impl MetaPeerClient {
.context(error::CreateChannelSnafu)?;

let request = tonic::Request::new(BatchGetRequest {
keys: keys.clone(),
keys,
..Default::default()
});

let response: BatchGetResponse = ClusterClient::new(channel.clone())
let response: BatchGetResponse = ClusterClient::new(channel)
.batch_get(request)
.await
.context(error::BatchGetSnafu)?
@@ -165,6 +221,16 @@ fn check_resp_header(header: &Option<ResponseHeader>, ctx: Context) -> Result<()
Ok(())
}

fn need_retry(error: &error::Error) -> bool {
match error {
error::Error::IsNotLeader { .. } => true,
error::Error::Range { source, .. } | error::Error::BatchGet { source, .. } => {
match_for_io_error(source).is_some()
}
_ => false,
}
}

#[cfg(test)]
mod tests {
use api::v1::meta::{Error, ErrorCode, KeyValue, ResponseHeader};
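Note that `range` and `batch_get` inline the same retry loop. A hypothetical refactor (not part of this commit) could factor the pattern into a generic helper, reusing the `Result`, `need_retry`, and `ExceededRetryLimitSnafu` items defined in this file:

```rust
use std::future::Future;
use std::time::Duration;

use common_telemetry::warn;

// Hypothetical helper: run `op` up to `max_retry_count` times,
// sleeping `retry_interval_ms` between attempts and retrying only
// the errors that `need_retry` classifies as transient (leadership
// changes and I/O failures).
async fn with_retry<T, F, Fut>(
    func_name: &str,
    max_retry_count: usize,
    retry_interval_ms: u64,
    op: F,
) -> Result<T>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T>>,
{
    for _ in 0..max_retry_count {
        match op().await {
            Ok(v) => return Ok(v),
            Err(e) if need_retry(&e) => {
                warn!("Encountered an error that needs to be retried, err: {:?}", e);
                tokio::time::sleep(Duration::from_millis(retry_interval_ms)).await;
            }
            Err(e) => return Err(e),
        }
    }

    error::ExceededRetryLimitSnafu {
        func_name,
        retry_num: max_retry_count,
    }
    .fail()
}
```

With this in place, `batch_get` would reduce to `with_retry("batch_get", self.max_retry_count, self.retry_interval_ms, || self.remote_batch_get(keys.clone())).await`.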
22 changes: 22 additions & 0 deletions src/meta-srv/src/error.rs
@@ -195,6 +195,15 @@
backtrace: Backtrace,
},

#[snafu(display(
"Failed to batch range KVs from leader's in_memory kv store, source: {}",
source
))]
Range {
source: tonic::Status,
backtrace: Backtrace,
},

#[snafu(display("Response header not found"))]
ResponseHeaderNotFound { backtrace: Backtrace },

@@ -213,6 +222,17 @@
backtrace: Backtrace,
},

#[snafu(display(
"The number of retries for the grpc call {} exceeded the limit, {}",
func_name,
retry_num
))]
ExceededRetryLimit {
func_name: String,
retry_num: usize,
backtrace: Backtrace,
},

#[snafu(display("An error occurred in Meta, source: {}", source))]
MetaInternal {
#[snafu(backtrace)]
@@ -271,6 +291,7 @@ impl ErrorExt for Error {
| Error::NoLeader { .. }
| Error::CreateChannel { .. }
| Error::BatchGet { .. }
| Error::Range { .. }
| Error::ResponseHeaderNotFound { .. }
| Error::IsNotLeader { .. }
| Error::NoMetaPeerClient { .. }
Expand All @@ -279,6 +300,7 @@ impl ErrorExt for Error {
| Error::Unlock { .. }
| Error::LeaseGrant { .. }
| Error::LockNotConfig { .. }
| Error::ExceededRetryLimit { .. }
| Error::StartGrpc { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::EmptyTableName { .. }
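For readers unfamiliar with snafu, the retry loops above construct this error through the context selector that `#[derive(Snafu)]` generates. A standalone sketch of the mechanism, with names mirroring the new variant (illustrative only, not the meta-srv code):

```rust
use snafu::{Backtrace, Snafu};

#[derive(Debug, Snafu)]
#[snafu(display(
    "The number of retries for the grpc call {} exceeded the limit: {}",
    func_name,
    retry_num
))]
struct ExceededRetryLimit {
    func_name: String,
    retry_num: usize,
    backtrace: Backtrace,
}

// The derive produces an `ExceededRetryLimitSnafu` selector whose
// `fail()` captures a backtrace, fills in the remaining fields, and
// returns the error wrapped in `Err`.
fn give_up(retries: usize) -> Result<(), ExceededRetryLimit> {
    ExceededRetryLimitSnafu {
        func_name: "range",
        retry_num: retries,
    }
    .fail()
}
```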
5 changes: 5 additions & 0 deletions src/meta-srv/src/service/cluster.rs
@@ -16,6 +16,7 @@ use api::v1::meta::{
cluster_server, BatchGetRequest, BatchGetResponse, Error, RangeRequest, RangeResponse,
ResponseHeader,
};
use common_telemetry::warn;
use tonic::{Request, Response};

use crate::metasrv::MetaSrv;
@@ -31,6 +32,8 @@ impl cluster_server::Cluster for MetaSrv {
header: Some(is_not_leader),
..Default::default()
};

warn!("The current meta is not leader, but a batch_get request have reached the meta. Detail: {:?}.", req);
return Ok(Response::new(resp));
}

@@ -53,6 +56,8 @@
header: Some(is_not_leader),
..Default::default()
};

warn!("The current meta is not leader, but a range request have reached the meta. Detail: {:?}.", req);
return Ok(Response::new(resp));
}

