Skip to content

Commit

Permalink
[ISSUE mxsm#1616]🚀Implement ConsumeMessageOrderlyService#consumeMessa…
Browse files Browse the repository at this point in the history
…geDirectly🔥 (mxsm#1628)
  • Loading branch information
mxsm authored Dec 7, 2024
1 parent aef5f76 commit c2a14f4
Showing 1 changed file with 53 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ use rocketmq_common::common::message::MessageConst;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_common::common::mix_all;
use rocketmq_common::MessageAccessor::MessageAccessor;
use rocketmq_remoting::protocol::body::cm_result::CMResult;
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
use rocketmq_runtime::RocketMQRuntime;
use rocketmq_rust::ArcMut;
use rocketmq_rust::RocketMQTokioMutex;
use rocketmq_rust::WeakArcMut;
use tracing::info;
use tracing::warn;

use crate::base::client_config::ClientConfig;
Expand Down Expand Up @@ -435,12 +437,62 @@ impl ConsumeMessageServiceTrait for ConsumeMessageOrderlyService {
todo!()
}

#[allow(deprecated)]
async fn consume_message_directly(
&self,
msg: MessageExt,
broker_name: Option<CheetahString>,
) -> ConsumeMessageDirectlyResult {
todo!()
info!("consumeMessageDirectly receive new message: {}", msg);
let mq = MessageQueue::from_parts(
msg.topic().clone(),
broker_name.unwrap_or_default(),
msg.queue_id(),
);
let mut msgs = vec![ArcMut::new(MessageClientExt::new(msg))];
let mut context = ConsumeOrderlyContext::new(mq);
self.default_mqpush_consumer_impl
.as_ref()
.unwrap()
.upgrade()
.unwrap()
.reset_retry_and_namespace(msgs.as_mut_slice(), self.consumer_group.as_str());

let begin_timestamp = Instant::now();

let status = self.message_listener.consume_message(
&msgs
.iter()
.map(|msg| &msg.message_ext_inner)
.collect::<Vec<&MessageExt>>(),
&mut context,
);
let mut result = ConsumeMessageDirectlyResult::default();
result.set_order(true);
result.set_auto_commit(context.is_auto_commit());
match status {
Ok(status) => match status {
ConsumeOrderlyStatus::Success => {
result.set_consume_result(CMResult::CRSuccess);
}
ConsumeOrderlyStatus::Rollback => {
result.set_consume_result(CMResult::CRRollback);
}
ConsumeOrderlyStatus::Commit => {
result.set_consume_result(CMResult::CRCommit);
}
ConsumeOrderlyStatus::SuspendCurrentQueueAMoment => {
result.set_consume_result(CMResult::CRLater);
}
},
Err(e) => {
result.set_consume_result(CMResult::CRThrowException);
result.set_remark(CheetahString::from_string(e.to_string()))
}
}
result.set_spent_time_mills(begin_timestamp.elapsed().as_millis() as u64);
info!("consumeMessageDirectly Result: {}", result);
result
}

async fn submit_consume_request(
Expand Down

0 comments on commit c2a14f4

Please sign in to comment.