Skip to content

Commit

Permalink
[ISSUE mxsm#1627]🚀Replace Client consumer's WeakArcMut<DefaultMQPushC…
Browse files Browse the repository at this point in the history
…onsumerImpl> with ArcMut<DefaultMQPushConsumerImpl>🔥 (mxsm#1629)
  • Loading branch information
mxsm authored Dec 7, 2024
1 parent c2a14f4 commit 01c9508
Show file tree
Hide file tree
Showing 12 changed files with 236 additions and 395 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeM
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
use rocketmq_runtime::RocketMQRuntime;
use rocketmq_rust::ArcMut;
use rocketmq_rust::WeakArcMut;
use tracing::info;
use tracing::warn;

Expand All @@ -49,7 +48,7 @@ use crate::consumer::listener::message_listener_concurrently::ArcBoxMessageListe
use crate::hook::consume_message_context::ConsumeMessageContext;

pub struct ConsumeMessageConcurrentlyService {
pub(crate) default_mqpush_consumer_impl: Option<WeakArcMut<DefaultMQPushConsumerImpl>>,
pub(crate) default_mqpush_consumer_impl: Option<ArcMut<DefaultMQPushConsumerImpl>>,
pub(crate) client_config: ArcMut<ClientConfig>,
pub(crate) consumer_config: ArcMut<ConsumerConfig>,
pub(crate) consumer_group: CheetahString,
Expand All @@ -63,7 +62,7 @@ impl ConsumeMessageConcurrentlyService {
consumer_config: ArcMut<ConsumerConfig>,
consumer_group: CheetahString,
message_listener: ArcBoxMessageListenerConcurrently,
default_mqpush_consumer_impl: Option<WeakArcMut<DefaultMQPushConsumerImpl>>,
default_mqpush_consumer_impl: Option<ArcMut<DefaultMQPushConsumerImpl>>,
) -> Self {
let consume_thread = consumer_config.consume_thread_max;
let consumer_group_tag = format!("{}_{}", "ConsumeMessageThread_", consumer_group);
Expand All @@ -83,29 +82,24 @@ impl ConsumeMessageConcurrentlyService {

impl ConsumeMessageConcurrentlyService {
async fn clean_expire_msg(&mut self) {
if let Some(default_mqpush_consumer_impl) = self
.default_mqpush_consumer_impl
.as_ref()
.unwrap()
.upgrade()
{
let process_queue_table = default_mqpush_consumer_impl
.rebalance_impl
.rebalance_impl_inner
.process_queue_table
.read()
let default_mqpush_consumer_impl = self.default_mqpush_consumer_impl.clone().unwrap();

let process_queue_table = default_mqpush_consumer_impl
.rebalance_impl
.rebalance_impl_inner
.process_queue_table
.read()
.await;
for (_, process_queue) in process_queue_table.iter() {
process_queue
.clean_expired_msg(self.default_mqpush_consumer_impl.clone())
.await;
for (_, process_queue) in process_queue_table.iter() {
process_queue
.clean_expired_msg(self.default_mqpush_consumer_impl.clone())
.await;
}
}
}

async fn process_consume_result(
&mut self,
this: WeakArcMut<Self>,
this: ArcMut<Self>,
status: ConsumeConcurrentlyStatus,
context: &ConsumeConcurrentlyContext,
consume_request: &mut ConsumeRequest,
Expand Down Expand Up @@ -175,36 +169,29 @@ impl ConsumeMessageConcurrentlyService {
.remove_message(&consume_request.msgs)
.await;
if offset >= 0 && !consume_request.process_queue.is_dropped() {
if let Some(mut default_mqpush_consumer_impl) = self
.default_mqpush_consumer_impl
.as_ref()
let default_mqpush_consumer_impl = self.default_mqpush_consumer_impl.as_mut().unwrap();
default_mqpush_consumer_impl
.offset_store
.as_mut()
.unwrap()
.upgrade()
{
default_mqpush_consumer_impl
.offset_store
.as_mut()
.unwrap()
.update_offset(&consume_request.message_queue, offset, true)
.await;
}
.update_offset(&consume_request.message_queue, offset, true)
.await;
}
}

fn submit_consume_request_later(
&self,
msgs: Vec<ArcMut<MessageClientExt>>,
this: WeakArcMut<Self>,
this: ArcMut<Self>,
process_queue: Arc<ProcessQueue>,
message_queue: MessageQueue,
) {
self.consume_runtime.get_handle().spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
let this_ = this.clone();
if let Some(this) = this.upgrade() {
this.submit_consume_request(this_, msgs, process_queue, message_queue, true)
.await;
}

this.submit_consume_request(this_, msgs, process_queue, message_queue, true)
.await;
});
}

Expand All @@ -215,38 +202,31 @@ impl ConsumeMessageConcurrentlyService {
) -> bool {
let delay_level = context.delay_level_when_next_consume;
msg.set_topic(self.client_config.with_namespace(msg.get_topic().as_str()));
match self
.default_mqpush_consumer_impl
.as_ref()

self.default_mqpush_consumer_impl
.as_mut()
.unwrap()
.upgrade()
{
None => false,
Some(mut default_mqpush_consumer_impl) => default_mqpush_consumer_impl
.send_message_back(
msg,
delay_level,
&self
.client_config
.queue_with_namespace(context.get_message_queue().clone()),
)
.await
.is_ok(),
}
.send_message_back(
msg,
delay_level,
&self
.client_config
.queue_with_namespace(context.get_message_queue().clone()),
)
.await
.is_ok()
}
}

impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {
fn start(&mut self, this: WeakArcMut<Self>) {
fn start(&mut self, mut this: ArcMut<Self>) {
self.consume_runtime.get_handle().spawn(async move {
if let Some(mut this) = this.upgrade() {
let timeout = this.consumer_config.consume_timeout;
let mut interval = tokio::time::interval(Duration::from_secs(timeout * 60));
let timeout = this.consumer_config.consume_timeout;
let mut interval = tokio::time::interval(Duration::from_secs(timeout * 60));
interval.tick().await;
loop {
interval.tick().await;
loop {
interval.tick().await;
this.clean_expire_msg().await;
}
this.clean_expire_msg().await;
}
});
}
Expand Down Expand Up @@ -285,8 +265,7 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {
self.default_mqpush_consumer_impl
.as_ref()
.unwrap()
.upgrade()
.unwrap()
.mut_from_ref()
.reset_retry_and_namespace(msgs.as_mut_slice(), self.consumer_group.as_str());

let begin_timestamp = Instant::now();
Expand Down Expand Up @@ -322,7 +301,7 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {

async fn submit_consume_request(
&self,
this: WeakArcMut<Self>,
this: ArcMut<Self>,
msgs: Vec<ArcMut<MessageClientExt>>,
process_queue: Arc<ProcessQueue>,
message_queue: MessageQueue,
Expand Down Expand Up @@ -383,13 +362,13 @@ struct ConsumeRequest {
message_queue: MessageQueue,
dispatch_to_consume: bool,
consumer_group: CheetahString,
default_mqpush_consumer_impl: Option<WeakArcMut<DefaultMQPushConsumerImpl>>,
default_mqpush_consumer_impl: Option<ArcMut<DefaultMQPushConsumerImpl>>,
}

impl ConsumeRequest {
async fn run(
&mut self,
consume_message_concurrently_service: WeakArcMut<ConsumeMessageConcurrentlyService>,
mut consume_message_concurrently_service: ArcMut<ConsumeMessageConcurrentlyService>,
) {
if self.process_queue.is_dropped() {
info!(
Expand All @@ -404,15 +383,8 @@ impl ConsumeRequest {
ack_index: i32::MAX,
};

let default_mqpush_consumer_impl = self
.default_mqpush_consumer_impl
.as_ref()
.unwrap()
.upgrade();
if default_mqpush_consumer_impl.is_none() {
return;
}
let mut default_mqpush_consumer_impl = default_mqpush_consumer_impl.unwrap();
let mut default_mqpush_consumer_impl =
self.default_mqpush_consumer_impl.as_ref().unwrap().clone();
let consumer_group = self.consumer_group.clone();
DefaultMQPushConsumerImpl::try_reset_pop_retry_topic(
&mut self.msgs,
Expand Down Expand Up @@ -511,11 +483,10 @@ impl ConsumeRequest {
);
} else {
let this = consume_message_concurrently_service.clone();
if let Some(mut consume_message_concurrently_service) = this.upgrade() {
consume_message_concurrently_service
.process_consume_result(this, status.unwrap(), &context, self)
.await;
}

consume_message_concurrently_service
.process_consume_result(this, status.unwrap(), &context, self)
.await;
}
}
}
Loading

0 comments on commit 01c9508

Please sign in to comment.