Skip to content

Commit

Permalink
[ISSUE mxsm#300]🚧Support send message and send message v2(request cod…
Browse files Browse the repository at this point in the history
…e:10,310)-3 (mxsm#305)

* 1111

* add

* add

* add

* add

* aaa

* add

* add

* add

* add

* [ISSUE mxsm#300]🚧Support send message and send message v2(request code:10,310)-3

* fix code style

* fix test case
  • Loading branch information
mxsm authored Apr 16, 2024
1 parent c917382 commit 56dda0b
Show file tree
Hide file tree
Showing 36 changed files with 923 additions and 501 deletions.
2 changes: 2 additions & 0 deletions rocketmq-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ rand = "0.8.5"
#tools
dirs.workspace = true

local-ip-address = "0.6.1"

[[bin]]
name = "rocketmq-broker-rust"
path = "src/bin/broker_bootstrap_server.rs"
2 changes: 1 addition & 1 deletion rocketmq-broker/src/bin/broker_bootstrap_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn parse_config_file() -> (BrokerConfig, MessageStoreConfig) {
ParseConfigFile::parse_config_file::<BrokerConfig>(config_file.clone())
.ok()
.unwrap(),
ParseConfigFile::parse_config_file::<MessageStoreConfig>(config_file.clone())
ParseConfigFile::parse_config_file::<MessageStoreConfig>(config_file)
.ok()
.unwrap(),
)
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-broker/src/broker_bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Builder {
pub fn new() -> Self {
Builder {
broker_config: Default::default(),
message_store_config: MessageStoreConfig::new(),
message_store_config: MessageStoreConfig::default(),
server_config: Default::default(),
}
}
Expand Down
5 changes: 3 additions & 2 deletions rocketmq-broker/src/broker_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ pub struct BrokerConfig {
impl Default for BrokerConfig {
fn default() -> Self {
let broker_identity = BrokerIdentity::new();
let broker_ip1 = String::from("127.0.0.1");
let broker_ip2 = None;
let local_ip = local_ip_address::local_ip().unwrap();
let broker_ip1 = local_ip.to_string();
let broker_ip2 = Some(local_ip.to_string());
let listen_port = 10911;

BrokerConfig {
Expand Down
39 changes: 32 additions & 7 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use rocketmq_common::common::{
config::TopicConfig, config_manager::ConfigManager, constant::PermName,
};
use rocketmq_remoting::{
code::request_code::RequestCode,
protocol::{
body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper,
static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail,
Expand All @@ -47,7 +48,9 @@ use crate::{
consumer_order_info_manager::ConsumerOrderInfoManager,
},
out_api::broker_outer_api::BrokerOuterAPI,
processor::admin_broker_processor::AdminBrokerProcessor,
processor::{
admin_broker_processor::AdminBrokerProcessor, send_message_processor::SendMessageProcessor,
},
schedule::schedule_message_service::ScheduleMessageService,
subscription::manager::subscription_group_manager::SubscriptionGroupManager,
topic::manager::{
Expand Down Expand Up @@ -141,7 +144,7 @@ impl Drop for BrokerRuntime {

impl BrokerRuntime {
pub(crate) async fn initialize(&mut self) -> bool {
let mut result = self.initialize_metadata().await;
let mut result = self.initialize_metadata();
if !result {
warn!("Initialize metadata failed");
return false;
Expand All @@ -154,7 +157,7 @@ impl BrokerRuntime {
self.recover_initialize_service()
}

async fn initialize_metadata(&self) -> bool {
fn initialize_metadata(&self) -> bool {
info!("======Starting initialize metadata========");

self.topic_config_manager.load()
Expand Down Expand Up @@ -212,10 +215,32 @@ impl BrokerRuntime {

fn initialize_resources(&mut self) {}

fn init_processor(&mut self) -> (BoxedRequestProcessor, RequestProcessorTable) {
fn init_processor(&self) -> (BoxedRequestProcessor, RequestProcessorTable) {
let default_processor = BoxedRequestProcessor::new(Box::<AdminBrokerProcessor>::default());
let request_processor_table = RequestProcessorTable::new();

let mut request_processor_table = RequestProcessorTable::new();

let send_message_process = BoxedRequestProcessor::new(Box::new(SendMessageProcessor::new(
self.topic_queue_mapping_manager.clone(),
self.topic_config_manager.clone(),
self.broker_config.clone(),
self.message_store.clone().unwrap(),
)));
request_processor_table.insert(
RequestCode::SendMessage.to_i32(),
send_message_process.clone(),
);
request_processor_table.insert(
RequestCode::SendMessageV2.to_i32(),
send_message_process.clone(),
);
request_processor_table.insert(
RequestCode::SendBatchMessage.to_i32(),
send_message_process.clone(),
);
request_processor_table.insert(
RequestCode::ConsumerSendMsgBack.to_i32(),
send_message_process,
);
(default_processor, request_processor_table)
}

Expand Down Expand Up @@ -376,7 +401,7 @@ impl BrokerRuntime {
.broker_cluster_name
.clone();
let broker_name = self.broker_config.broker_identity.broker_name.clone();
let broker_addr = self.broker_config.broker_ip1.clone();
let broker_addr = format!("{}:{}", self.broker_config.broker_ip1, 10911);
let broker_id = self.broker_config.broker_identity.broker_id;
self.broker_out_api
.register_broker_all(
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-broker/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl SendMessageProcessorInner {
}

let mut properties =
MessageDecoder::string_to_message_properties(request_header.properties.as_deref());
MessageDecoder::string_to_message_properties(request_header.properties.as_ref());
properties.insert(
MessageConst::PROPERTY_MSG_REGION.to_string(),
self.broker_config.region_id(),
Expand Down
Loading

0 comments on commit 56dda0b

Please sign in to comment.