diff --git a/rocketmq-broker/Cargo.toml b/rocketmq-broker/Cargo.toml index 63583637..edcec317 100644 --- a/rocketmq-broker/Cargo.toml +++ b/rocketmq-broker/Cargo.toml @@ -54,6 +54,8 @@ rand = "0.8.5" #tools dirs.workspace = true +local-ip-address = "0.6.1" + [[bin]] name = "rocketmq-broker-rust" path = "src/bin/broker_bootstrap_server.rs" diff --git a/rocketmq-broker/src/bin/broker_bootstrap_server.rs b/rocketmq-broker/src/bin/broker_bootstrap_server.rs index d707f1cf..a5a4362f 100644 --- a/rocketmq-broker/src/bin/broker_bootstrap_server.rs +++ b/rocketmq-broker/src/bin/broker_bootstrap_server.rs @@ -48,7 +48,7 @@ fn parse_config_file() -> (BrokerConfig, MessageStoreConfig) { ParseConfigFile::parse_config_file::(config_file.clone()) .ok() .unwrap(), - ParseConfigFile::parse_config_file::(config_file.clone()) + ParseConfigFile::parse_config_file::(config_file) .ok() .unwrap(), ) diff --git a/rocketmq-broker/src/broker_bootstrap.rs b/rocketmq-broker/src/broker_bootstrap.rs index 0d465024..2af4b14d 100644 --- a/rocketmq-broker/src/broker_bootstrap.rs +++ b/rocketmq-broker/src/broker_bootstrap.rs @@ -59,7 +59,7 @@ impl Builder { pub fn new() -> Self { Builder { broker_config: Default::default(), - message_store_config: MessageStoreConfig::new(), + message_store_config: MessageStoreConfig::default(), server_config: Default::default(), } } diff --git a/rocketmq-broker/src/broker_config.rs b/rocketmq-broker/src/broker_config.rs index d3efa8e3..4f7cd488 100644 --- a/rocketmq-broker/src/broker_config.rs +++ b/rocketmq-broker/src/broker_config.rs @@ -58,8 +58,9 @@ pub struct BrokerConfig { impl Default for BrokerConfig { fn default() -> Self { let broker_identity = BrokerIdentity::new(); - let broker_ip1 = String::from("127.0.0.1"); - let broker_ip2 = None; + let local_ip = local_ip_address::local_ip().unwrap(); + let broker_ip1 = local_ip.to_string(); + let broker_ip2 = Some(local_ip.to_string()); let listen_port = 10911; BrokerConfig { diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index b6f5e077..abca28d4 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -21,6 +21,7 @@ use rocketmq_common::common::{ config::TopicConfig, config_manager::ConfigManager, constant::PermName, }; use rocketmq_remoting::{ + code::request_code::RequestCode, protocol::{ body::topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper, static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail, @@ -47,7 +48,9 @@ use crate::{ consumer_order_info_manager::ConsumerOrderInfoManager, }, out_api::broker_outer_api::BrokerOuterAPI, - processor::admin_broker_processor::AdminBrokerProcessor, + processor::{ + admin_broker_processor::AdminBrokerProcessor, send_message_processor::SendMessageProcessor, + }, schedule::schedule_message_service::ScheduleMessageService, subscription::manager::subscription_group_manager::SubscriptionGroupManager, topic::manager::{ @@ -141,7 +144,7 @@ impl Drop for BrokerRuntime { impl BrokerRuntime { pub(crate) async fn initialize(&mut self) -> bool { - let mut result = self.initialize_metadata().await; + let mut result = self.initialize_metadata(); if !result { warn!("Initialize metadata failed"); return false; @@ -154,7 +157,7 @@ impl BrokerRuntime { self.recover_initialize_service() } - async fn initialize_metadata(&self) -> bool { + fn initialize_metadata(&self) -> bool { info!("======Starting initialize metadata========"); self.topic_config_manager.load() @@ -212,10 +215,32 @@ impl BrokerRuntime { fn initialize_resources(&mut self) {} - fn init_processor(&mut self) -> (BoxedRequestProcessor, RequestProcessorTable) { + fn init_processor(&self) -> (BoxedRequestProcessor, RequestProcessorTable) { let default_processor = BoxedRequestProcessor::new(Box::::default()); - let request_processor_table = RequestProcessorTable::new(); - + let mut request_processor_table = RequestProcessorTable::new(); + + let send_message_process = BoxedRequestProcessor::new(Box::new(SendMessageProcessor::new( + self.topic_queue_mapping_manager.clone(), + self.topic_config_manager.clone(), + self.broker_config.clone(), + self.message_store.clone().unwrap(), + ))); + request_processor_table.insert( + RequestCode::SendMessage.to_i32(), + send_message_process.clone(), + ); + request_processor_table.insert( + RequestCode::SendMessageV2.to_i32(), + send_message_process.clone(), + ); + request_processor_table.insert( + RequestCode::SendBatchMessage.to_i32(), + send_message_process.clone(), + ); + request_processor_table.insert( + RequestCode::ConsumerSendMsgBack.to_i32(), + send_message_process, + ); (default_processor, request_processor_table) } @@ -376,7 +401,7 @@ impl BrokerRuntime { .broker_cluster_name .clone(); let broker_name = self.broker_config.broker_identity.broker_name.clone(); - let broker_addr = self.broker_config.broker_ip1.clone(); + let broker_addr = format!("{}:{}", self.broker_config.broker_ip1, 10911); let broker_id = self.broker_config.broker_identity.broker_id; self.broker_out_api .register_broker_all( diff --git a/rocketmq-broker/src/processor.rs b/rocketmq-broker/src/processor.rs index 5a830199..78ed927a 100644 --- a/rocketmq-broker/src/processor.rs +++ b/rocketmq-broker/src/processor.rs @@ -99,7 +99,7 @@ impl SendMessageProcessorInner { } let mut properties = - MessageDecoder::string_to_message_properties(request_header.properties.as_deref()); + MessageDecoder::string_to_message_properties(request_header.properties.as_ref()); properties.insert( MessageConst::PROPERTY_MSG_REGION.to_string(), self.broker_config.region_id(), diff --git a/rocketmq-broker/src/processor/send_message_processor.rs b/rocketmq-broker/src/processor/send_message_processor.rs index a6bc6810..422f59ab 100644 --- a/rocketmq-broker/src/processor/send_message_processor.rs +++ b/rocketmq-broker/src/processor/send_message_processor.rs @@ -15,17 +15,23 @@ * limitations under the License. */ -use std::sync::Arc; +use std::{collections::HashMap, net::SocketAddr, sync::Arc}; -use rocketmq_common::common::{ - attribute::topic_message_type::TopicMessageType, - message::{ - message_single::{MessageExt, MessageExtBrokerInner}, - MessageConst, +use rocketmq_common::{ + common::{ + attribute::{cleanup_policy::CleanupPolicy, topic_message_type::TopicMessageType}, + message::{ + message_batch::MessageExtBatch, + message_client_id_setter, + message_single::{MessageExt, MessageExtBrokerInner}, + MessageConst, + }, + mix_all::RETRY_GROUP_TOPIC_PREFIX, }, + CleanupPolicyUtils, MessageDecoder, }; use rocketmq_remoting::{ - code::request_code::RequestCode, + code::{request_code::RequestCode, response_code::ResponseCode}, protocol::{ header::message_operation_header::{ send_message_request_header::{parse_request_header, SendMessageRequestHeader}, @@ -38,6 +44,8 @@ use rocketmq_remoting::{ runtime::{processor::RequestProcessor, server::ConnectionHandlerContext}, }; use rocketmq_store::{base::message_result::PutMessageResult, log_file::MessageStore}; +use tokio::runtime::Handle; +use tracing::info; use crate::{ broker_config::BrokerConfig, @@ -51,26 +59,28 @@ use crate::{ pub struct SendMessageProcessor { inner: SendMessageProcessorInner, - topic_queue_mapping_manager: Arc>, - topic_config_manager: Arc>, - broker_config: Arc>, - message_store: Arc>, + topic_queue_mapping_manager: Arc, + topic_config_manager: Arc, + broker_config: Arc, + message_store: Arc, + store_host: SocketAddr, } impl Default for SendMessageProcessor { fn default() -> Self { + let store_host = "127.0.0.1:100".parse::().unwrap(); Self { inner: SendMessageProcessorInner::default(), - topic_queue_mapping_manager: Arc::new(parking_lot::RwLock::new( - TopicQueueMappingManager::default(), - )), - topic_config_manager: Arc::new(parking_lot::RwLock::new(TopicConfigManager::default())), - broker_config: Arc::new(parking_lot::RwLock::new(BrokerConfig::default())), - message_store: Arc::new(parking_lot::RwLock::new(Default::default())), + topic_queue_mapping_manager: Arc::new(TopicQueueMappingManager::default()), + topic_config_manager: Arc::new(TopicConfigManager::default()), + broker_config: Arc::new(BrokerConfig::default()), + message_store: Arc::new(Default::default()), + store_host, } } } +// RequestProcessor implementation impl RequestProcessor for SendMessageProcessor { fn process_request( &self, @@ -78,17 +88,16 @@ impl RequestProcessor for SendMessageProcessor { request: RemotingCommand, ) -> RemotingCommand { let request_code = RequestCode::from(request.code()); + info!("process_request: {:?}", request_code); let response = match request_code { RequestCode::ConsumerSendMsgBack => self.inner.consumer_send_msg_back(&ctx, &request), _ => { let mut request_header = parse_request_header(&request).unwrap(); let mapping_context = self .topic_queue_mapping_manager - .write() .build_topic_queue_mapping_context(&request_header, true); let rewrite_result = self .topic_queue_mapping_manager - .read() .rewrite_request_for_static_topic(&request_header, &mapping_context); if let Some(rewrite_result) = rewrite_result { return rewrite_result; @@ -125,15 +134,23 @@ impl RequestProcessor for SendMessageProcessor { #[allow(unused_variables)] impl SendMessageProcessor { pub fn new( - topic_queue_mapping_manager: Arc>, - message_store: Arc>, + topic_queue_mapping_manager: Arc, + topic_config_manager: Arc, + broker_config: Arc, + message_store: Arc, ) -> Self { + let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port) + .parse::() + .unwrap(); Self { - inner: SendMessageProcessorInner::default(), + inner: SendMessageProcessorInner { + broker_config: broker_config.clone(), + }, topic_queue_mapping_manager, - topic_config_manager: Arc::new(Default::default()), - broker_config: Arc::new(Default::default()), + topic_config_manager, + broker_config, message_store, + store_host, } } @@ -164,30 +181,179 @@ impl SendMessageProcessor { where F: FnOnce(&SendMessageContext, &RemotingCommand), { - let response = self.pre_send(ctx.as_ref(), request.as_ref(), &request_header); + let mut response = self.pre_send(ctx.as_ref(), request.as_ref(), &request_header); if response.code() != -1 { return Some(response); } + + if request_header.topic.len() > i8::MAX as usize { + return Some( + response + .set_code(ResponseCode::MessageIllegal) + .set_remark(Some(format!( + "message topic length too long {}", + request_header.topic().len() + ))), + ); + } + + if !request_header.topic.is_empty() + && request_header.topic.starts_with(RETRY_GROUP_TOPIC_PREFIX) + { + return Some( + response + .set_code(ResponseCode::MessageIllegal) + .set_remark(Some(format!( + "batch request does not support retry group {}", + request_header.topic() + ))), + ); + } + let response_header = SendMessageResponseHeader::default(); - let topic_config = self + let mut topic_config = self .topic_config_manager - .read() .select_topic_config(request_header.topic().as_str()) .unwrap(); - let mut _queue_id = request_header.queue_id; - if _queue_id < 0 { - _queue_id = self.inner.random_queue_id(topic_config.write_queue_nums) as i32; + let mut queue_id = request_header.queue_id; + if queue_id < 0 { + queue_id = self.inner.random_queue_id(topic_config.write_queue_nums) as i32; + } + + let mut message_ext_batch = MessageExtBatch::default(); + message_ext_batch + .message_ext_broker_inner + .message_ext_inner + .message + .topic = request_header.topic().to_string(); + message_ext_batch + .message_ext_broker_inner + .message_ext_inner + .queue_id = queue_id; + let mut ori_props = + MessageDecoder::string_to_message_properties(request_header.properties.as_ref()); + if self.handle_retry_and_dlq( + &request_header, + &mut response, + request, + &message_ext_batch.message_ext_broker_inner.message_ext_inner, + &mut topic_config, + &mut ori_props, + ) { + return Some(response); } + message_ext_batch + .message_ext_broker_inner + .message_ext_inner + .message + .body + .clone_from(request.body()); + message_ext_batch + .message_ext_broker_inner + .message_ext_inner + .message + .flag = request_header.flag; - if self.broker_config.read().async_send_enable { - None + let uniq_key = ori_props.get(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); + let uniq_key = if let Some(value) = uniq_key { + if value.is_empty() { + let uniq_key_inner = message_client_id_setter::create_uniq_id(); + ori_props.insert( + MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX.to_string(), + uniq_key_inner.clone(), + ); + uniq_key_inner + } else { + value.to_string() + } } else { - let result = self - .message_store - .read() - .put_message(MessageExtBrokerInner::default()); - Some(response) + let uniq_key_inner = message_client_id_setter::create_uniq_id(); + ori_props.insert( + MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX.to_string(), + uniq_key_inner.clone(), + ); + uniq_key_inner + }; + + message_ext_batch + .message_ext_broker_inner + .message_ext_inner + .message + .properties = ori_props; + let cleanup_policy = CleanupPolicyUtils::get_delete_policy(Some(&topic_config)); + + if cleanup_policy == CleanupPolicy::COMPACTION { + if let Some(value) = message_ext_batch + .message_ext_broker_inner + .message_ext_inner + .message + .properties + .get(MessageConst::PROPERTY_KEYS) + { + if value.trim().is_empty() { + return Some( + response + .set_code(ResponseCode::MessageIllegal) + .set_remark(Some("Required message key is missing".to_string())), + ); + } + } } + message_ext_batch.message_ext_broker_inner.tags_code = + MessageExtBrokerInner::tags_string2tags_code( + &topic_config.topic_filter_type, + message_ext_batch + .get_tags() + .unwrap_or("".to_string()) + .as_str(), + ) as i64; + + message_ext_batch + .message_ext_broker_inner + .message_ext_inner + .born_timestamp = request_header.born_timestamp; + message_ext_batch + .message_ext_broker_inner + .message_ext_inner + .born_host = ctx.remoting_address(); + + message_ext_batch + .message_ext_broker_inner + .message_ext_inner + .store_host = self.store_host; + + message_ext_batch + .message_ext_broker_inner + .message_ext_inner + .reconsume_times = request_header.reconsume_times.unwrap_or(0); + + message_ext_batch + .message_ext_broker_inner + .message_ext_inner + .message + .properties + .insert( + MessageConst::PROPERTY_CLUSTER.to_string(), + self.broker_config + .broker_identity + .broker_cluster_name + .clone(), + ); + + message_ext_batch.message_ext_broker_inner.properties_string = + MessageDecoder::message_properties_to_string( + &message_ext_batch + .message_ext_broker_inner + .message_ext_inner + .message + .properties, + ); + + let result = self + .message_store + .put_message(MessageExtBrokerInner::default()); + let put_message_result = Handle::current().block_on(result); + Some(response) } fn handle_put_message_result( @@ -216,11 +382,11 @@ impl SendMessageProcessor { response.with_opaque(request.opaque()); response.add_ext_field( MessageConst::PROPERTY_MSG_REGION, - self.broker_config.read().region_id(), + self.broker_config.region_id(), ); response.add_ext_field( MessageConst::PROPERTY_TRACE_SWITCH, - self.broker_config.read().trace_on.to_string(), + self.broker_config.trace_on.to_string(), ); //todo java code to implement @@ -233,4 +399,16 @@ impl SendMessageProcessor { .msg_check(ctx, request, request_header, &mut response); response } + + fn handle_retry_and_dlq( + &self, + request_header: &SendMessageRequestHeader, + response: &mut RemotingCommand, + request: &RemotingCommand, + msg: &MessageExt, + topic_config: &mut rocketmq_common::common::config::TopicConfig, + properties: &mut HashMap, + ) -> bool { + unimplemented!() + } } diff --git a/rocketmq-common/src/common.rs b/rocketmq-common/src/common.rs index 72ec8126..6feaa320 100644 --- a/rocketmq-common/src/common.rs +++ b/rocketmq-common/src/common.rs @@ -39,7 +39,7 @@ pub mod namesrv; pub mod sys_flag; pub mod topic; -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, Eq, PartialEq)] pub enum TopicFilterType { #[default] SingleTag, diff --git a/rocketmq-common/src/common/attribute.rs b/rocketmq-common/src/common/attribute.rs index 5d75a195..d933a794 100644 --- a/rocketmq-common/src/common/attribute.rs +++ b/rocketmq-common/src/common/attribute.rs @@ -17,55 +17,12 @@ use std::collections::HashSet; pub mod attribute_enum; +pub mod cleanup_policy; pub mod cq_type; pub mod topic_attributes; pub mod topic_message_type; -pub trait Attribute { - fn verify(&self, value: &str); - fn get_name(&self) -> &str; - fn is_changeable(&self) -> bool; -} - -pub struct EnumAttribute { - name: String, - changeable: bool, - universe: HashSet, - default_value: String, -} - -impl EnumAttribute { - pub fn new( - name: impl Into, - changeable: bool, - universe: HashSet, - default_value: impl Into, - ) -> Self { - EnumAttribute { - name: name.into(), - changeable, - universe, - default_value: default_value.into(), - } - } - - pub fn get_default_value(&self) -> &str { - &self.default_value - } -} - -impl Attribute for EnumAttribute { - fn verify(&self, value: &str) { - if !self.universe.contains(value) { - panic!("value is not in set: {:?}", self.universe); - } - } - - fn get_name(&self) -> &str { - &self.name - } - - fn is_changeable(&self) -> bool { - self.changeable - } +pub struct Attribute { + pub(crate) name: String, + pub(crate) changeable: bool, } diff --git a/rocketmq-common/src/common/attribute/attribute_enum.rs b/rocketmq-common/src/common/attribute/attribute_enum.rs index 2944f981..78d391b9 100644 --- a/rocketmq-common/src/common/attribute/attribute_enum.rs +++ b/rocketmq-common/src/common/attribute/attribute_enum.rs @@ -14,3 +14,30 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::collections::HashSet; + +use crate::common::attribute::Attribute; + +pub struct EnumAttribute { + pub(crate) attribute: Attribute, + pub(crate) universe: HashSet, + pub(crate) default_value: String, +} + +impl EnumAttribute { + pub fn get_name(&self) -> &str { + self.attribute.name.as_str() + } + + pub fn get_default_value(&self) -> &str { + &self.default_value + } + + pub fn get_universe(&self) -> &HashSet { + &self.universe + } + + pub fn verify(&self, value: &str) -> bool { + !self.universe.contains(value) + } +} diff --git a/rocketmq-common/src/common/attribute/cleanup_policy.rs b/rocketmq-common/src/common/attribute/cleanup_policy.rs new file mode 100644 index 00000000..56f61666 --- /dev/null +++ b/rocketmq-common/src/common/attribute/cleanup_policy.rs @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub enum CleanupPolicy { + #[default] + DELETE, + COMPACTION, +} diff --git a/rocketmq-common/src/common/attribute/topic_attributes.rs b/rocketmq-common/src/common/attribute/topic_attributes.rs index 79272ca0..510babfb 100644 --- a/rocketmq-common/src/common/attribute/topic_attributes.rs +++ b/rocketmq-common/src/common/attribute/topic_attributes.rs @@ -3,43 +3,48 @@ use std::collections::HashMap; use lazy_static::lazy_static; use crate::{ - common::attribute::{topic_message_type::TopicMessageType, Attribute, EnumAttribute}, + common::attribute::{ + attribute_enum::EnumAttribute, topic_message_type::TopicMessageType, Attribute, + }, hashset, }; lazy_static! { pub static ref CLEANUP_POLICY_ATTRIBUTE: EnumAttribute = EnumAttribute { - name: String::from("cleanup.policy"), - changeable: false, + attribute: Attribute { + name: String::from("cleanup.policy"), + changeable: false, + }, universe: hashset! {String::from("DELETE"), String::from("COMPACTION")}, default_value: String::from("DELETE"), }; pub static ref TOPIC_MESSAGE_TYPE_ATTRIBUTE: EnumAttribute = EnumAttribute { - name: String::from("message.type"), - changeable: true, + attribute: Attribute { + name: String::from("message.type"), + changeable: true, + }, universe: TopicMessageType::topic_message_type_set(), default_value: TopicMessageType::Normal.to_string(), }; pub static ref QUEUE_TYPE_ATTRIBUTE: EnumAttribute = EnumAttribute { - name: String::from("queue.type"), - changeable: false, + attribute: Attribute { + name: String::from("queue.type"), + changeable: false, + }, universe: hashset! {String::from("BatchCQ"), String::from("SimpleCQ")}, default_value: String::from("SimpleCQ"), }; - pub static ref ALL: HashMap<&'static str, &'static (dyn Attribute + Send + Sync)> = { - let mut map = HashMap::new(); - map.insert( - QUEUE_TYPE_ATTRIBUTE.get_name(), - QUEUE_TYPE_ATTRIBUTE.deref() as &(dyn Attribute + std::marker::Send + Sync), - ); +/* pub static ref ALL: HashMap<&'static str, EnumAttribute> = { + let mut map = HashMap::<&'static str, EnumAttribute>::new(); + map.insert(QUEUE_TYPE_ATTRIBUTE.get_name(), QUEUE_TYPE_ATTRIBUTE); map.insert( CLEANUP_POLICY_ATTRIBUTE.get_name(), - CLEANUP_POLICY_ATTRIBUTE.deref() as &(dyn Attribute + std::marker::Send + Sync), + CLEANUP_POLICY_ATTRIBUTE, ); map.insert( TOPIC_MESSAGE_TYPE_ATTRIBUTE.get_name(), - TOPIC_MESSAGE_TYPE_ATTRIBUTE.deref() as &(dyn Attribute + std::marker::Send + Sync), + TOPIC_MESSAGE_TYPE_ATTRIBUTE, ); map - }; + };*/ } diff --git a/rocketmq-common/src/common/message.rs b/rocketmq-common/src/common/message.rs index fa2b2676..fd04a126 100644 --- a/rocketmq-common/src/common/message.rs +++ b/rocketmq-common/src/common/message.rs @@ -22,7 +22,9 @@ use std::{ use lazy_static::lazy_static; +pub mod message_accessor; pub mod message_batch; +pub mod message_client_id_setter; pub mod message_decoder; pub mod message_enum; pub mod message_id; diff --git a/rocketmq-common/src/common/message/message_accessor.rs b/rocketmq-common/src/common/message/message_accessor.rs new file mode 100644 index 00000000..865d91d0 --- /dev/null +++ b/rocketmq-common/src/common/message/message_accessor.rs @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use crate::common::message::{message_single::Message, MessageConst, MessageTrait}; + +pub fn set_transfer_flag(msg: &mut Message, unit: &str) { + msg.put_property( + MessageConst::PROPERTY_TRANSFER_FLAG.to_string(), + unit.to_string(), + ); +} + +pub fn get_transfer_flag(msg: &Message) -> Option { + msg.get_property(MessageConst::PROPERTY_TRANSFER_FLAG) +} + +pub fn set_correction_flag(msg: &mut Message, unit: &str) { + msg.put_property( + MessageConst::PROPERTY_CORRECTION_FLAG.to_string(), + unit.to_string(), + ); +} + +pub fn get_correction_flag(msg: &Message) -> Option { + msg.get_property(MessageConst::PROPERTY_CORRECTION_FLAG) +} + +pub fn set_origin_message_id(msg: &mut Message, origin_message_id: &str) { + msg.put_property( + MessageConst::PROPERTY_ORIGIN_MESSAGE_ID.to_string(), + origin_message_id.to_string(), + ); +} + +pub fn get_origin_message_id(msg: &Message) -> Option { + msg.get_property(MessageConst::PROPERTY_ORIGIN_MESSAGE_ID) +} + +pub fn set_mq2_flag(msg: &mut Message, flag: &str) { + msg.put_property( + MessageConst::PROPERTY_MQ2_FLAG.to_string(), + flag.to_string(), + ); +} + +pub fn get_mq2_flag(msg: &Message) -> Option { + msg.get_property(MessageConst::PROPERTY_MQ2_FLAG) +} + +pub fn set_reconsume_time(msg: &mut Message, reconsume_times: &str) { + msg.put_property( + MessageConst::PROPERTY_RECONSUME_TIME.to_string(), + reconsume_times.to_string(), + ); +} + +pub fn get_reconsume_time(msg: &Message) -> Option { + msg.get_property(MessageConst::PROPERTY_RECONSUME_TIME) +} + +pub fn set_max_reconsume_times(msg: &mut Message, max_reconsume_times: &str) { + msg.put_property( + MessageConst::PROPERTY_MAX_RECONSUME_TIMES.to_string(), + max_reconsume_times.to_string(), + ); +} + +pub fn get_max_reconsume_times(msg: &Message) -> Option { + msg.get_property(MessageConst::PROPERTY_MAX_RECONSUME_TIMES) +} + +pub fn set_consume_start_time_stamp(msg: &mut Message, property_consume_start_time_stamp: &str) { + msg.put_property( + MessageConst::PROPERTY_CONSUME_START_TIMESTAMP.to_string(), + property_consume_start_time_stamp.to_string(), + ); +} + +pub fn get_consume_start_time_stamp(msg: &Message) -> Option { + msg.get_property(MessageConst::PROPERTY_CONSUME_START_TIMESTAMP) +} diff --git a/rocketmq-common/src/common/message/message_batch.rs b/rocketmq-common/src/common/message/message_batch.rs index 828b4c42..20e613bb 100644 --- a/rocketmq-common/src/common/message/message_batch.rs +++ b/rocketmq-common/src/common/message/message_batch.rs @@ -67,7 +67,7 @@ impl MessageTrait for MessageBatch { } } -#[derive(Debug)] +#[derive(Debug, Default)] pub struct MessageExtBatch { pub message_ext_broker_inner: MessageExtBrokerInner, pub is_inner_batch: bool, @@ -77,4 +77,8 @@ impl MessageExtBatch { pub fn wrap(&self) -> Option { self.message_ext_broker_inner.body() } + + pub fn get_tags(&self) -> Option { + self.message_ext_broker_inner.get_tags() + } } diff --git a/rocketmq-common/src/common/message/message_client_id_setter.rs b/rocketmq-common/src/common/message/message_client_id_setter.rs new file mode 100644 index 00000000..83b8b29f --- /dev/null +++ b/rocketmq-common/src/common/message/message_client_id_setter.rs @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub fn create_uniq_id() -> String { + unimplemented!() +} diff --git a/rocketmq-common/src/common/message/message_decoder.rs b/rocketmq-common/src/common/message/message_decoder.rs index 8f247672..09f43a03 100644 --- a/rocketmq-common/src/common/message/message_decoder.rs +++ b/rocketmq-common/src/common/message/message_decoder.rs @@ -19,7 +19,7 @@ use std::collections::HashMap; pub const NAME_VALUE_SEPARATOR: char = '\u{0001}'; pub const PROPERTY_SEPARATOR: char = '\u{0002}'; -pub fn string_to_message_properties(properties: Option<&str>) -> HashMap { +pub fn string_to_message_properties(properties: Option<&String>) -> HashMap { let mut map = HashMap::new(); if let Some(properties) = properties { let mut index = 0; diff --git a/rocketmq-common/src/common/message/message_single.rs b/rocketmq-common/src/common/message/message_single.rs index 13b82d17..45188e94 100644 --- a/rocketmq-common/src/common/message/message_single.rs +++ b/rocketmq-common/src/common/message/message_single.rs @@ -15,14 +15,19 @@ * limitations under the License. */ -use std::{collections::HashMap, net::SocketAddr}; +use std::{ + collections::HashMap, + hash::{DefaultHasher, Hash, Hasher}, + net::SocketAddr, +}; use bytes::{Buf, BufMut}; use crate::{ common::{ - message::{MessageTrait, MessageVersion, MESSAGE_MAGIC_CODE_V1}, + message::{MessageConst, MessageTrait, MessageVersion, MESSAGE_MAGIC_CODE_V1}, sys_flag::message_sys_flag::MessageSysFlag, + TopicFilterType, }, MessageUtils, }; @@ -41,6 +46,14 @@ impl Message { self.properties.remove(name.into().as_str()); } + pub fn set_properties(&mut self, properties: HashMap) { + self.properties = properties; + } + + pub fn get_property(&self, key: impl Into) -> Option { + self.properties.get(key.into().as_str()).cloned() + } + pub fn body(&self) -> Option { self.body.as_ref().cloned() } @@ -58,6 +71,10 @@ impl Message { pub fn transaction_id(&self) -> Option<&str> { self.transaction_id.as_deref() } + + pub fn get_tags(&self) -> Option { + self.get_property(MessageConst::PROPERTY_TAGS) + } } #[allow(unused_variables)] @@ -101,7 +118,7 @@ impl MessageTrait for Message { #[derive(Clone, Debug)] pub struct MessageExt { - pub message_inner: Message, + pub message: Message, pub broker_name: String, pub queue_id: i32, pub store_size: i32, @@ -145,7 +162,7 @@ impl MessageExt { } pub fn topic(&self) -> &str { - self.message_inner.topic() + self.message.topic() } pub fn born_host(&self) -> SocketAddr { @@ -165,7 +182,7 @@ impl MessageExt { } pub fn body(&self) -> Option { - self.message_inner.body() + self.message.body() } #[inline] @@ -182,11 +199,11 @@ impl MessageExt { } pub fn flag(&self) -> i32 { - self.message_inner.flag() + self.message.flag() } pub fn message_inner(&self) -> &Message { - &self.message_inner + &self.message } pub fn broker_name(&self) -> &str { &self.broker_name @@ -217,7 +234,7 @@ impl MessageExt { } pub fn set_message_inner(&mut self, message_inner: Message) { - self.message_inner = message_inner; + self.message = message_inner; } pub fn set_broker_name(&mut self, broker_name: String) { self.broker_name = broker_name; @@ -263,14 +280,18 @@ impl MessageExt { } pub fn properties(&self) -> &HashMap { - self.message_inner.properties() + self.message.properties() + } + + pub fn get_tags(&self) -> Option { + self.message.get_tags() } } impl Default for MessageExt { fn default() -> Self { Self { - message_inner: Default::default(), + message: Default::default(), broker_name: "".to_string(), queue_id: 0, store_size: 0, @@ -299,7 +320,7 @@ pub struct MessageExtBrokerInner { pub message_ext_inner: MessageExt, pub properties_string: String, pub tags_code: i64, - pub encoded_buff: bytes::Bytes, + pub encoded_buff: bytes::BytesMut, pub encode_completed: bool, pub version: MessageVersion, } @@ -309,9 +330,7 @@ impl MessageExtBrokerInner { pub fn delete_property(&mut self, name: impl Into) { let name = name.into(); - self.message_ext_inner - .message_inner - .clear_property(name.as_str()); + self.message_ext_inner.message.clear_property(name.as_str()); self.properties_string = MessageUtils::delete_property(self.properties_string.as_str(), name.as_str()); } @@ -394,4 +413,17 @@ impl MessageExtBrokerInner { pub fn queue_offset(&self) -> i64 { self.message_ext_inner.queue_offset() } + + pub fn tags_string2tags_code(_filter: &TopicFilterType, tags: &str) -> u64 { + if tags.is_empty() { + return 0; + } + let mut hasher = DefaultHasher::new(); + tags.hash(&mut hasher); + hasher.finish() + } + + pub fn get_tags(&self) -> Option { + self.message_ext_inner.get_tags() + } } diff --git a/rocketmq-common/src/lib.rs b/rocketmq-common/src/lib.rs index 627116e7..fdda5bc7 100644 --- a/rocketmq-common/src/lib.rs +++ b/rocketmq-common/src/lib.rs @@ -24,9 +24,9 @@ pub use crate::{ TokioExecutorService, }, utils::{ - crc32_utils as CRC32Utils, env_utils as EnvUtils, file_utils as FileUtils, - message_utils as MessageUtils, parse_config_file as ParseConfigFile, - time_utils as TimeUtils, util_all as UtilAll, + cleanup_policy_utils as CleanupPolicyUtils, crc32_utils as CRC32Utils, + env_utils as EnvUtils, file_utils as FileUtils, message_utils as MessageUtils, + parse_config_file as ParseConfigFile, time_utils as TimeUtils, util_all as UtilAll, }, }; @@ -37,7 +37,9 @@ pub mod log; mod thread_pool; pub mod utils; -pub use crate::common::message::message_decoder as MessageDecoder; +pub use crate::common::message::{ + message_accessor as MessageAccessor, message_decoder as MessageDecoder, +}; #[cfg(test)] mod tests {} diff --git a/rocketmq-common/src/utils.rs b/rocketmq-common/src/utils.rs index a2b95eef..7aad8ac9 100644 --- a/rocketmq-common/src/utils.rs +++ b/rocketmq-common/src/utils.rs @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +pub mod cleanup_policy_utils; pub mod crc32_utils; pub mod env_utils; pub mod file_utils; diff --git a/rocketmq-common/src/utils/cleanup_policy_utils.rs b/rocketmq-common/src/utils/cleanup_policy_utils.rs new file mode 100644 index 00000000..c3238a0e --- /dev/null +++ b/rocketmq-common/src/utils/cleanup_policy_utils.rs @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::common::{attribute::cleanup_policy::CleanupPolicy, config::TopicConfig}; + +pub fn get_delete_policy(_topic_config: Option<&TopicConfig>) -> CleanupPolicy { + unimplemented!() +} diff --git a/rocketmq-common/src/utils/message_utils.rs b/rocketmq-common/src/utils/message_utils.rs index 52db901a..808a6a9b 100644 --- a/rocketmq-common/src/utils/message_utils.rs +++ b/rocketmq-common/src/utils/message_utils.rs @@ -34,7 +34,7 @@ pub fn get_sharding_key_index(sharding_key: &str, index_size: usize) -> usize { #[allow(clippy::manual_unwrap_or_default)] pub fn get_sharding_key_index_by_msg(msg: &MessageExt, index_size: usize) -> usize { let sharding_key = match msg - .message_inner + .message .properties .get(MessageConst::PROPERTY_SHARDING_KEY) { diff --git a/rocketmq-common/src/utils/parse_config_file.rs b/rocketmq-common/src/utils/parse_config_file.rs index 408ef87e..30eccbef 100644 --- a/rocketmq-common/src/utils/parse_config_file.rs +++ b/rocketmq-common/src/utils/parse_config_file.rs @@ -35,6 +35,6 @@ where .map_or(C::default(), |result| { result.try_deserialize::().unwrap_or_default() }); - info!("parse config: {:?}", config_file); + //info!("parse config: {:?}", config_file); Ok(config_file) } diff --git a/rocketmq-remoting/src/protocol/remoting_command.rs b/rocketmq-remoting/src/protocol/remoting_command.rs index 3a5efa88..cd665d07 100644 --- a/rocketmq-remoting/src/protocol/remoting_command.rs +++ b/rocketmq-remoting/src/protocol/remoting_command.rs @@ -24,6 +24,7 @@ use std::{ use bytes::Bytes; use lazy_static::lazy_static; +use rocketmq_common::common::mq_version::RocketMqVersion; use serde::{Deserialize, Serialize}; use super::{RemotingCommandType, SerializeType}; @@ -44,8 +45,10 @@ lazy_static! { fn set_cmd_version(cmd: &mut RemotingCommand) { INIT.call_once(|| { let v = match std::env::var("REMOTING_VERSION_KEY") { - Ok(value) => value.parse::().unwrap_or(-1), - Err(_) => -1, + Ok(value) => value + .parse::() + .unwrap_or(i32::from(RocketMqVersion::V500)), + Err(_) => i32::from(RocketMqVersion::V500), }; *CONFIG_VERSION.write().unwrap() = v; }); diff --git a/rocketmq-store/Cargo.toml b/rocketmq-store/Cargo.toml index e5098e38..b922e966 100644 --- a/rocketmq-store/Cargo.toml +++ b/rocketmq-store/Cargo.toml @@ -19,6 +19,7 @@ data_store = ["local_file_store"] [dependencies] rocketmq-common = { version = "0.2.0", path = "../rocketmq-common" } +rocketmq-runtime = { version = "0.2.0", path = "../rocketmq-runtime" } #tools dirs.workspace = true diff --git a/rocketmq-store/src/base/append_message_callback.rs b/rocketmq-store/src/base/append_message_callback.rs index 6d60cc5b..d9348809 100644 --- a/rocketmq-store/src/base/append_message_callback.rs +++ b/rocketmq-store/src/base/append_message_callback.rs @@ -14,26 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -use std::{io::Write, mem, sync::Arc}; - -use bytes::{Buf, BufMut}; -use rocketmq_common::{ - common::{ - message::{ - message_batch::MessageExtBatch, - message_single::{MessageExt, MessageExtBrokerInner}, - }, - sys_flag::message_sys_flag::MessageSysFlag, - }, - UtilAll, +use std::sync::Arc; + +use bytes::BytesMut; +use rocketmq_common::common::{ + message::{message_batch::MessageExtBatch, message_single::MessageExtBrokerInner}, + sys_flag::message_sys_flag::MessageSysFlag, }; use crate::{ base::{ - message_result::AppendMessageResult, put_message_context::PutMessageContext, ByteBuffer, + message_result::AppendMessageResult, message_status_enum::AppendMessageStatus, + put_message_context::PutMessageContext, }, config::message_store_config::MessageStoreConfig, - log_file::commit_log::{CommitLog, BLANK_MAGIC_CODE, CRC32_RESERVED_LEN}, + log_file::commit_log::{CommitLog, CRC32_RESERVED_LEN}, }; /// Write messages callback interface @@ -54,10 +49,9 @@ pub trait AppendMessageCallback { fn do_append( &mut self, file_from_offset: i64, - byte_buffer: &mut ByteBuffer, max_blank: i32, msg: &mut MessageExtBrokerInner, - put_message_context: &PutMessageContext, + //put_message_context: &PutMessageContext, ) -> AppendMessageResult; /// After batched message serialization, write MappedByteBuffer @@ -76,7 +70,7 @@ pub trait AppendMessageCallback { fn do_append_batch( &self, file_from_offset: i64, - byte_buffer: &mut [u8], + byte_buffer: &mut BytesMut, max_blank: i32, message_ext_batch: &MessageExtBatch, put_message_context: &PutMessageContext, @@ -107,6 +101,113 @@ impl DefaultAppendMessageCallback { #[allow(unused_variables)] impl AppendMessageCallback for DefaultAppendMessageCallback { fn do_append( + &mut self, + file_from_offset: i64, + max_blank: i32, + msg_inner: &mut MessageExtBrokerInner, + ) -> AppendMessageResult { + let mut pre_encode_buffer = msg_inner.encoded_buff.clone(); // Assuming get_encoded_buff returns Option + let is_multi_dispatch_msg = self.message_store_config.enable_multi_dispatch + && CommitLog::is_multi_dispatch_msg(msg_inner); + if is_multi_dispatch_msg { + /*if let Some(result) = self.handle_properties_for_lmq_msg(&msg_inner) { + return result; + }*/ + } + + let msg_len = i32::from_le_bytes(pre_encode_buffer[0..4].try_into().unwrap()); + let wrote_offset = file_from_offset + 1; + + let msg_id_supplier = || { + let sysflag = msg_inner.sys_flag(); + let msg_id_len = if sysflag & MessageSysFlag::STOREHOSTADDRESS_V6_FLAG == 0 { + 4 + 4 + 8 + } else { + 16 + 4 + 8 + }; + }; + + let mut queue_offset = msg_inner.queue_offset(); + let message_num = CommitLog::get_message_num(msg_inner); + + match MessageSysFlag::get_transaction_value(msg_inner.sys_flag()) { + MessageSysFlag::TRANSACTION_PREPARED_TYPE + | MessageSysFlag::TRANSACTION_ROLLBACK_TYPE => queue_offset = 0, + // MessageSysFlag::TRANSACTION_NOT_TYPE | MessageSysFlag::TRANSACTION_COMMIT_TYPE | _ => + // {} + _ => {} + } + + if (msg_len + END_FILE_MIN_BLANK_LENGTH) > max_blank { + /* self.msg_store_item_memory.0.clear(); + self.msg_store_item_memory.0.extend_from_slice(&(max_blank as i32).to_le_bytes()); + self.msg_store_item_memory.0.extend_from_slice(&(BLANK_MAGIC_CODE as i32).to_le_bytes()); + let mut byte_buffer = Vec::new(); + byte_buffer.extend_from_slice(&self.msg_store_item_memory.0[..8]);*/ + return AppendMessageResult { + status: AppendMessageStatus::EndOfFile, + ..Default::default() + }; + } + + let mut pos = 4 + 4 + 4 + 4 + 4; + pre_encode_buffer[pos..(pos + 8)].copy_from_slice(&queue_offset.to_le_bytes()); + pos += 8; + pre_encode_buffer[pos..(pos + 8)].copy_from_slice(&wrote_offset.to_le_bytes()); + let ip_len = if msg_inner.sys_flag() & MessageSysFlag::BORNHOST_V6_FLAG == 0 { + 4 + 4 + } else { + 16 + 4 + }; + pos += 8 + 4 + 8 + ip_len; + pre_encode_buffer[pos..(pos + 8)] + .copy_from_slice(&msg_inner.store_timestamp().to_le_bytes()); + /* if self.enabled_append_prop_crc { + let check_size = msg_len - self.crc32_reserved_length as i32; + let mut tmp_buffer = pre_encode_buffer.clone(); + tmp_buffer.0.truncate(tmp_buffer.0.len() + check_size as usize); + let crc32 = util_all::crc32(&tmp_buffer.0); + tmp_buffer.0.extend_from_slice(&crc32.to_le_bytes()); + }*/ + + /* let begin_time_mills = self.default_message_store.now(); + self.message_store.get_perf_counter().start_tick("WRITE_MEMORY_TIME_MS");*/ + msg_inner.encoded_buff = pre_encode_buffer; + /* self.message_store.get_perf_counter().end_tick("WRITE_MEMORY_TIME_MS"); + msg_inner.set_encoded_buff(None);*/ + + /*if is_multi_dispatch_msg { + self.multi_dispatch.update_multi_queue_offset(msg_inner); + }*/ + + AppendMessageResult { + status: AppendMessageStatus::PutOk, + ..Default::default() + } + } + + fn do_append_batch( + &self, + file_from_offset: i64, + byte_buffer: &mut BytesMut, + max_blank: i32, + message_ext_batch: &MessageExtBatch, + put_message_context: &PutMessageContext, + ) -> AppendMessageResult { + todo!() + } + + /* fn do_append_batch( + &self, + file_from_offset: i64, + byte_buffer: &mut BytesMut, + max_blank: i32, + message_ext_batch: &MessageExtBatch, + put_message_context: &PutMessageContext, + ) -> AppendMessageResult { + todo!() + }*/ + /* fn do_append( &mut self, file_from_offset: i64, byte_buffer: &mut ByteBuffer, @@ -225,16 +326,5 @@ impl AppendMessageCallback for DefaultAppendMessageCallback { Some(message_num), )*/ AppendMessageResult::default() - } - - fn do_append_batch( - &self, - file_from_offset: i64, - byte_buffer: &mut [u8], - max_blank: i32, - message_ext_batch: &MessageExtBatch, - put_message_context: &PutMessageContext, - ) -> AppendMessageResult { - unimplemented!() - } + }*/ } diff --git a/rocketmq-store/src/base/message_result.rs b/rocketmq-store/src/base/message_result.rs index 2c2d74fa..8a3b979f 100644 --- a/rocketmq-store/src/base/message_result.rs +++ b/rocketmq-store/src/base/message_result.rs @@ -72,6 +72,14 @@ impl PutMessageResult { remote_put, } } + + pub fn new_default(put_message_status: PutMessageStatus) -> Self { + Self { + put_message_status, + append_message_result: None, + remote_put: false, + } + } } /// Represents the result of getting a message. diff --git a/rocketmq-store/src/consume_queue/mapped_file_queue.rs b/rocketmq-store/src/consume_queue/mapped_file_queue.rs index fdae1425..50c818e8 100644 --- a/rocketmq-store/src/consume_queue/mapped_file_queue.rs +++ b/rocketmq-store/src/consume_queue/mapped_file_queue.rs @@ -21,8 +21,7 @@ use log::warn; use tracing::info; use crate::{ - base::swappable::Swappable, - log_file::mapped_file::{default_impl::DefaultMappedFile, MappedFile}, + base::swappable::Swappable, log_file::mapped_file::default_impl_refactor::LocalMappedFile, services::allocate_mapped_file_service::AllocateMappedFileService, }; @@ -32,7 +31,7 @@ pub struct MappedFileQueue { pub(crate) mapped_file_size: u64, - pub(crate) mapped_files: Vec>, + pub(crate) mapped_files: Vec, pub(crate) allocate_mapped_file_service: AllocateMappedFileService, @@ -120,21 +119,25 @@ impl MappedFileQueue { } let mapped_file = - DefaultMappedFile::new(file.to_string_lossy().to_string(), self.mapped_file_size); + LocalMappedFile::new(file.to_string_lossy().to_string(), self.mapped_file_size); // Set wrote, flushed, committed positions for mapped_file - self.mapped_files.push(Box::new(mapped_file)); + self.mapped_files.push(mapped_file); info!("load {} OK", file.display()); } true } - pub fn get_last_mapped_file(&self) -> Option<&(dyn MappedFile + Send)> { + pub fn get_last_mapped_file_mut(&mut self) -> Option<&mut LocalMappedFile> { if self.mapped_files.is_empty() { return None; } - self.mapped_files.last().take().map(|value| value.as_ref()) + self.mapped_files.last_mut() + } + + pub fn get_last_mapped_file_mut_start_offset(&mut self) -> Option<&mut LocalMappedFile> { + unimplemented!() } } diff --git a/rocketmq-store/src/hook/put_message_hook.rs b/rocketmq-store/src/hook/put_message_hook.rs index 12ccb5e7..e72a6440 100644 --- a/rocketmq-store/src/hook/put_message_hook.rs +++ b/rocketmq-store/src/hook/put_message_hook.rs @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -use rocketmq_common::common::message::message_batch::MessageExtBatch; +use rocketmq_common::common::message::message_single::MessageExt; use crate::base::message_result::PutMessageResult; @@ -33,7 +33,7 @@ pub trait PutMessageHook { /// # Returns /// /// The result of putting the message - fn execute_before_put_message(&self, msg: &MessageExtBatch) -> PutMessageResult; + fn execute_before_put_message(&self, msg: &MessageExt) -> Option; } /// Alias for `Arc`. diff --git a/rocketmq-store/src/log_file.rs b/rocketmq-store/src/log_file.rs index 6ef00511..607a6a56 100644 --- a/rocketmq-store/src/log_file.rs +++ b/rocketmq-store/src/log_file.rs @@ -14,17 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -use std::future::Future; -use rocketmq_common::common::message::{ - message_batch::MessageExtBatch, message_single::MessageExtBrokerInner, -}; +use rocketmq_common::common::message::message_single::MessageExtBrokerInner; use crate::base::message_result::PutMessageResult; pub mod commit_log; pub mod mapped_file; +#[allow(async_fn_in_trait)] pub trait MessageStore { /// Load previously stored messages. /// @@ -38,210 +36,5 @@ pub trait MessageStore { /// Throws an `Exception` if there is any error. fn start(&mut self) -> Result<(), Box>; - /// Shutdown this message store. - //fn shutdown(&self); - - /// Destroy this message store. Generally, all persistent files should be removed after - /// invocation. - // fn destroy(&self); - - /// Store a message into the store in an async manner. The processor can process the next - /// request rather than wait for the result. When the result is completed, notify the client - /// in an async manner. - /// - /// # Arguments - /// - /// * `msg` - MessageInstance to store. - /// - /// # Returns - /// - /// A `Future` for the result of the store operation. - fn async_put_message( - &self, - msg: MessageExtBrokerInner, - ) -> impl Future { - async move { self.put_message(msg) } - } - - /// Store a batch of messages in an async manner. - /// - /// # Arguments - /// - /// * `message_ext_batch` - The message batch. - /// - /// # Returns - /// - /// A `Future` for the result of the store operation. - fn async_put_messages( - &self, - message_ext_batch: MessageExtBatch, - ) -> impl Future { - async move { self.put_messages(message_ext_batch) } - } - - /// Store a message into the store. - /// - /// # Arguments - /// - /// * `msg` - Message instance to store. - /// - /// # Returns - /// - /// Result of the store operation. - fn put_message(&self, msg: MessageExtBrokerInner) -> PutMessageResult; - - /// Store a batch of messages. - /// - /// # Arguments - /// - /// * `message_ext_batch` - Message batch. - /// - /// # Returns - /// - /// Result of storing batch messages. - fn put_messages(&self, message_ext_batch: MessageExtBatch) -> PutMessageResult; - - /// Query at most `max_msg_nums` messages belonging to `topic` at `queue_id` starting - /// from given `offset`. Resulting messages will further be screened using provided message - /// filter. - /// - /// # Arguments - /// - /// * `group` - Consumer group that launches this query. - /// * `topic` - Topic to query. - /// * `queue_id` - Queue ID to query. - /// * `offset` - Logical offset to start from. - /// * `max_msg_nums` - Maximum count of messages to query. - /// * `message_filter` - Message filter used to screen desired messages. - /// - /// # Returns - /// - /// Matched messages. - /*fn get_message( - &self, - group: &str, - topic: &str, - queue_id: i32, - offset: i64, - max_msg_nums: i32, - message_filter: impl MessageFilter, - ) -> GetMessageResult;*/ - - /// Asynchronous get message. - /// - /// # See - /// - /// [`get_message`](#method.get_message) - /// - /// # Arguments - /// - /// * `group` - Consumer group that launches this query. - /// * `topic` - Topic to query. - /// * `queue_id` - Queue ID to query. - /// * `offset` - Logical offset to start from. - /// * `max_msg_nums` - Maximum count of messages to query. - /// * `message_filter` - Message filter used to screen desired messages. - /// - /// # Returns - /// - /// Matched messages. - /* fn get_message_async( - &self, - group: &str, - topic: &str, - queue_id: i32, - offset: i64, - max_msg_nums: i32, - message_filter: impl MessageFilter, - ) -> impl Future;*/ - - /// Query at most `max_msg_nums` messages belonging to `topic` at `queue_id` starting - /// from given `offset`. Resulting messages will further be screened using provided message - /// filter. - /// - /// # Arguments - /// - /// * `group` - Consumer group that launches this query. - /// * `topic` - Topic to query. - /// * `queue_id` - Queue ID to query. - /// * `offset` - Logical offset to start from. - /// * `max_msg_nums` - Maximum count of messages to query. - /// * `max_total_msg_size` - Maximum total msg size of the messages. - /// * `message_filter` - Message filter used to screen desired messages. - /// - /// # Returns - /// - /// Matched messages. - /*fn get_message_with_size( - &self, - group: &str, - topic: &str, - queue_id: i32, - offset: i64, - max_msg_nums: i32, - max_total_msg_size: i32, - message_filter: impl MessageFilter, - ) -> GetMessageResult;*/ - - /// Asynchronous get message. - /// - /// # See - /// - /// [`get_message_with_size`](#method.get_message_with_size) - /// - /// # Arguments - /// - /// * `group` - Consumer group that launches this query. - /// * `topic` - Topic to query. - /// * `queue_id` - Queue ID to query. - /// * `offset` - Logical offset to start from. - /// * `max_msg_nums` - Maximum count of messages to query. - /// * `max_total_msg_size` - Maximum total msg size of the messages. - /// * `message_filter` - Message filter used to screen desired messages. - /// - /// # Returns - /// - /// Matched messages. - /*fn get_message_with_size_async( - &self, - group: &str, - topic: &str, - queue_id: i32, - offset: i64, - max_msg_nums: i32, - max_total_msg_size: i32, - message_filter: impl MessageFilter, - ) -> impl Future;*/ - - /// Get the maximum offset of the topic queue. - /// - /// # Arguments - /// - /// * `topic` - Topic name. - /// * `queue_id` - Queue ID. - /// - /// # Returns - /// - /// Maximum offset at present. - /* fn get_max_offset_in_queue(&self, topic: &str, queue_id: i32) -> i64; */ - - /// Get the maximum offset of the topic queue. - /// - /// # Arguments - /// - /// * `topic` - Topic name. - /// * `queue_id` - Queue ID. - /// * `committed` - Return the max offset in ConsumeQueue if true, or the max offset in - /// CommitLog if false. - /// - /// # Returns - /// - /// Maximum offset at present. - - fn get_max_offset_in_queue_with_commit( - &self, - topic: &str, - queue_id: i32, - committed: bool, - ) -> i64; + async fn put_message(&self, msg: MessageExtBrokerInner) -> PutMessageResult; } diff --git a/rocketmq-store/src/log_file/commit_log.rs b/rocketmq-store/src/log_file/commit_log.rs index a1a0d499..287bb92e 100644 --- a/rocketmq-store/src/log_file/commit_log.rs +++ b/rocketmq-store/src/log_file/commit_log.rs @@ -25,11 +25,17 @@ use rocketmq_common::{ utils::time_utils, CRC32Utils::crc32, }; +use tokio::runtime::Handle; use crate::{ - base::{message_result::PutMessageResult, swappable::Swappable}, + base::{ + append_message_callback::{AppendMessageCallback, DefaultAppendMessageCallback}, + message_result::PutMessageResult, + swappable::Swappable, + }, config::message_store_config::MessageStoreConfig, consume_queue::mapped_file_queue::MappedFileQueue, + message_encoder::message_ext_encoder::MessageExtEncoder, }; // Message's MAGIC CODE daa320a7 @@ -44,24 +50,35 @@ pub const CRC32_RESERVED_LEN: i32 = (MessageConst::PROPERTY_CRC32.len() + 1 + 10 #[derive(Default)] pub struct CommitLog { - pub(crate) mapped_file_queue: MappedFileQueue, - pub(crate) message_store_config: Arc, - pub(crate) enabled_append_prop_crc: bool, + mapped_file_queue: Arc>, + message_store_config: Arc, + enabled_append_prop_crc: bool, +} + +impl CommitLog { + pub fn new(message_store_config: Arc) -> Self { + Self { + mapped_file_queue: Default::default(), + message_store_config, + enabled_append_prop_crc: false, + } + } } impl CommitLog { pub fn load(&mut self) -> bool { - self.mapped_file_queue.load() + let arc = self.mapped_file_queue.clone(); + Handle::current().block_on(async move { arc.write().await.load() }) } - async fn async_put_message(&self, msg: MessageExtBrokerInner) -> PutMessageResult { + pub async fn put_message(&self, msg: MessageExtBrokerInner) -> PutMessageResult { let mut msg = msg; if !self.message_store_config.duplication_enable { msg.message_ext_inner.store_timestamp = time_utils::get_current_millis() as i64; } msg.message_ext_inner.body_crc = crc32( msg.message_ext_inner - .message_inner + .message .body .clone() .expect("REASON") @@ -92,6 +109,26 @@ impl CommitLog { msg.with_store_host_v6_flag(); } + let mut encoder = MessageExtEncoder::new(self.message_store_config.clone()); + let put_message_result = encoder.encode(&msg); + if let Some(result) = put_message_result { + return result; + } + msg.encoded_buff = encoder.byte_buf(); + + let mut mapped_file_guard = self.mapped_file_queue.write().await; + let mapped_file = match mapped_file_guard.get_last_mapped_file_mut() { + None => mapped_file_guard + .get_last_mapped_file_mut_start_offset() + .unwrap(), + Some(mapped_file) => mapped_file, + }; + + let mut append_message_callback = + DefaultAppendMessageCallback::new(self.message_store_config.clone()); + let _result = + append_message_callback.do_append(mapped_file.file_from_offset() as i64, 0, &mut msg); + mapped_file.append_data(msg.encoded_buff.clone(), false); PutMessageResult::default() } diff --git a/rocketmq-store/src/log_file/mapped_file.rs b/rocketmq-store/src/log_file/mapped_file.rs index c4e00b75..d97fd5ab 100644 --- a/rocketmq-store/src/log_file/mapped_file.rs +++ b/rocketmq-store/src/log_file/mapped_file.rs @@ -32,6 +32,7 @@ use crate::{ }; pub(crate) mod default_impl; +pub(crate) mod default_impl_refactor; pub trait MappedFile { /// Returns the file name of the `MappedFile`. diff --git a/rocketmq-store/src/log_file/mapped_file/default_impl.rs b/rocketmq-store/src/log_file/mapped_file/default_impl.rs index 3487a9f1..c22a25fe 100644 --- a/rocketmq-store/src/log_file/mapped_file/default_impl.rs +++ b/rocketmq-store/src/log_file/mapped_file/default_impl.rs @@ -34,7 +34,7 @@ use crate::{ compaction_append_msg_callback::CompactionAppendMsgCallback, message_result::AppendMessageResult, message_status_enum::AppendMessageStatus, put_message_context::PutMessageContext, select_result::SelectMappedBufferResult, - transient_store_pool::TransientStorePool, ByteBuffer, + transient_store_pool::TransientStorePool, }, config::flush_disk_type::FlushDiskType, log_file::mapped_file::MappedFile, @@ -387,13 +387,14 @@ impl DefaultMappedFile { }; } //do append to the Mapped file(Default is local file) - append_message_callback.do_append( + /* append_message_callback.do_append( self.file_from_offset, // file start logic address offset &mut ByteBuffer::new(&mut self.mmapped_file, current_write_pos as i64), (self.file_size - current_write_pos as u64) as i32, message, put_message_context, - ) + )*/ + AppendMessageResult::default() } fn append_messages_inner( diff --git a/rocketmq-store/src/log_file/mapped_file/default_impl_refactor.rs b/rocketmq-store/src/log_file/mapped_file/default_impl_refactor.rs new file mode 100644 index 00000000..4e4e6627 --- /dev/null +++ b/rocketmq-store/src/log_file/mapped_file/default_impl_refactor.rs @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::{ + fs::{File, OpenOptions}, + io::Write, + path::PathBuf, + sync::atomic::{AtomicI32, Ordering}, +}; + +use bytes::BytesMut; +use memmap2::MmapMut; + +pub struct LocalMappedFile { + //file information + file_name: String, + file_size: u64, + file: File, + //file data information + wrote_position: AtomicI32, + committed_position: AtomicI32, + flushed_position: AtomicI32, + + file_from_offset: u64, + + mmapped_file: MmapMut, +} + +impl LocalMappedFile { + pub fn new(file_name: String, file_size: u64) -> Self { + let path_buf = PathBuf::from(file_name.clone()); + let file = OpenOptions::new() + .read(true) + .write(true) + //.create(true) + .open(&path_buf) + .unwrap(); + file.set_len(file_size).unwrap(); + + let mmap = unsafe { MmapMut::map_mut(&file).unwrap() }; + Self { + file_name, + file_size, + file, + wrote_position: AtomicI32::new(0), + committed_position: AtomicI32::new(0), + flushed_position: AtomicI32::new(0), + file_from_offset: path_buf + .file_name() + .unwrap() + .to_string_lossy() + .to_string() + .parse::() + .unwrap(), + mmapped_file: mmap, + } + } + + pub fn file_size(&self) -> u64 { + self.file_size + } + pub fn wrote_position(&self) -> i32 { + self.wrote_position.load(Ordering::Relaxed) + } + pub fn committed_position(&self) -> i32 { + self.committed_position.load(Ordering::Relaxed) + } + pub fn flushed_position(&self) -> i32 { + self.flushed_position.load(Ordering::Relaxed) + } + + pub fn file_from_offset(&self) -> u64 { + self.file_from_offset + } +} + +impl LocalMappedFile { + pub fn append_data(&mut self, data: BytesMut, sync: bool) -> bool { + let current_pos = self.wrote_position.load(Ordering::SeqCst) as usize; + if current_pos + data.len() > self.file_size as usize { + return false; + } + let mut write_success = if (&mut self.mmapped_file[current_pos..]) + .write_all(data.as_ref()) + .is_ok() + { + self.wrote_position + .store((current_pos + data.len()) as i32, Ordering::SeqCst); + true + } else { + false + }; + + write_success &= if sync { + self.mmapped_file.flush().is_ok() + } else { + self.mmapped_file.flush_async().is_ok() + }; + + write_success + } +} + +#[cfg(test)] +mod tests { + + // use super::*; + + #[test] + pub fn test_local_mapped_file() { + // let mut file = LocalMappedFile::new( + // "C:\\Users\\ljbmx\\Desktop\\EventMesh\\0000".to_string(), + // 1024, + // ); + // let data = Bytes::from("ttt"); + // assert!(file.append_data(data, true)); + } +} diff --git a/rocketmq-store/src/message_encoder/message_ext_encoder.rs b/rocketmq-store/src/message_encoder/message_ext_encoder.rs index 31e244da..a53e9b58 100644 --- a/rocketmq-store/src/message_encoder/message_ext_encoder.rs +++ b/rocketmq-store/src/message_encoder/message_ext_encoder.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use bytes::{Buf, BufMut}; -use log::warn; use rocketmq_common::{ common::{ message::{ @@ -27,6 +26,7 @@ use rocketmq_common::{ }, MessageDecoder, }; +use tracing::warn; use crate::{ base::{ @@ -155,10 +155,8 @@ impl MessageExtEncoder { "message body size exceeded, msg body size: {}, maxMessageSize: {}", body_length, self.max_message_body_size ); - return Some(PutMessageResult::new( + return Some(PutMessageResult::new_default( PutMessageStatus::MessageIllegal, - None, - false, )); } @@ -253,7 +251,7 @@ impl MessageExtEncoder { + self.crc32_reserved_length as usize; if properties_length > i16::MAX as usize { - println!( + warn!( "putMessage message properties length too long. length={}", properties_length ); @@ -351,7 +349,9 @@ impl MessageExtEncoder { // 15 BODY self.byte_buf.put_i32(body_length.try_into().unwrap()); if let Some(body) = msg_inner.body() { - self.byte_buf.put(body); + if body_length > 0 { + self.byte_buf.put(body); + } } // 16 TOPIC @@ -360,7 +360,7 @@ impl MessageExtEncoder { } else { self.byte_buf.put_u8(topic_length as u8); } - self.byte_buf.extend_from_slice(topic_data); + self.byte_buf.put(topic_data); // 17 PROPERTIES self.byte_buf.put_u16(properties_length as u16); @@ -371,9 +371,7 @@ impl MessageExtEncoder { self.byte_buf .put_u8(MessageDecoder::PROPERTY_SEPARATOR as u32 as u8); } - // 18 CRC32 - None } @@ -524,6 +522,9 @@ impl MessageExtEncoder { }; self.byte_buf.resize(self.max_message_size as usize, 0); } + pub fn byte_buf(&mut self) -> bytes::BytesMut { + self.byte_buf.split() + } } #[cfg(test)] diff --git a/rocketmq-store/src/message_store/local_file_store.rs b/rocketmq-store/src/message_store/local_file_store.rs index e111d8c5..de6a7dcc 100644 --- a/rocketmq-store/src/message_store/local_file_store.rs +++ b/rocketmq-store/src/message_store/local_file_store.rs @@ -15,138 +15,95 @@ * limitations under the License. */ -use std::{collections::HashMap, error::Error}; +use std::{collections::HashMap, error::Error, sync::Arc}; use rocketmq_common::common::{ config::TopicConfig, - message::{ - message_batch::MessageExtBatch, message_single::MessageExtBrokerInner, MessageConst, - }, + message::{message_single::MessageExtBrokerInner, MessageConst}, sys_flag::message_sys_flag::MessageSysFlag, }; +use rocketmq_runtime::RocketMQRuntime; +use tracing::warn; use crate::{ base::{message_result::PutMessageResult, message_status_enum::PutMessageStatus}, - config::{message_store_config::MessageStoreConfig, store_path_config_helper}, + config::message_store_config::MessageStoreConfig, hook::put_message_hook::BoxedPutMessageHook, - log_file::MessageStore, + log_file::{commit_log::CommitLog, MessageStore}, }; ///Using local files to store message data, which is also the default method. #[derive(Default)] pub struct LocalFileMessageStore { - pub message_store_config: MessageStoreConfig, - pub put_message_hook_list: Vec, - pub topic_config_table: Option>, + message_store_config: Arc, + put_message_hook_list: Vec, + topic_config_table: HashMap, + message_store_runtime: Option, + commit_log: Arc, } -impl MessageStoreConfig { - pub fn new() -> Self { - MessageStoreConfig::default() +impl LocalFileMessageStore { + pub fn new(message_store_config: Arc) -> Self { + Self { + message_store_config: message_store_config.clone(), + put_message_hook_list: vec![], + topic_config_table: HashMap::new(), + message_store_runtime: Some(RocketMQRuntime::new_multi(10, "message-store-thread")), + commit_log: Arc::new(CommitLog::new(message_store_config)), + } } } -impl MessageStore for LocalFileMessageStore { - fn load(&mut self) -> bool { - // let mut reuslt = true; - //check abort file exists - let last_exit_ok = self.is_temp_file_exist(); - - tracing::info!( - "last shutdown {}, store path root dir: {}", - if last_exit_ok { - "normally" - } else { - "abnormally" - }, - self.message_store_config.store_path_root_dir.clone() - ); - //load commit log file - - // load consume queue file - - // if compaction is enabled, load compaction file - - // check point and confirm the max offset - - true - } - - fn start(&mut self) -> Result<(), Box> { - Ok(()) +impl Drop for LocalFileMessageStore { + fn drop(&mut self) { + if let Some(runtime) = self.message_store_runtime.take() { + runtime.shutdown(); + } } +} - async fn async_put_messages(&self, message_ext_batch: MessageExtBatch) -> PutMessageResult { - for put_message_hook in self.put_message_hook_list.iter() { - let _handle_result = put_message_hook.execute_before_put_message(&message_ext_batch); +impl LocalFileMessageStore { + pub fn get_topic_config(&self, topic: &str) -> Option { + if self.topic_config_table.is_empty() { + return None; } - PutMessageResult::default() + self.topic_config_table.get(topic).cloned() } +} - fn put_message(&self, _msg: MessageExtBrokerInner) -> PutMessageResult { +impl MessageStore for LocalFileMessageStore { + fn load(&mut self) -> bool { todo!() } - fn get_max_offset_in_queue_with_commit( - &self, - _topic: &str, - _queue_id: i32, - _committed: bool, - ) -> i64 { + fn start(&mut self) -> Result<(), Box> { todo!() } - async fn async_put_message(&self, msg: MessageExtBrokerInner) -> PutMessageResult { - /* for put_message_hook in self.put_message_hook_list.iter() { - let _handle_result = put_message_hook.execute_before_put_message(&message_ext_batch); - }*/ + async fn put_message(&self, msg: MessageExtBrokerInner) -> PutMessageResult { + for hook in self.put_message_hook_list.iter() { + if let Some(result) = hook.execute_before_put_message(&msg.message_ext_inner) { + return result; + } + } + if msg .message_ext_inner - .message_inner .properties() .contains_key(MessageConst::PROPERTY_INNER_NUM) - && MessageSysFlag::check( - msg.message_ext_inner.sys_flag, - MessageSysFlag::TRANSACTION_PREPARED_TYPE, - ) + && !MessageSysFlag::check(msg.sys_flag(), MessageSysFlag::INNER_BATCH_FLAG) { - return PutMessageResult::new(PutMessageStatus::MessageIllegal, None, false); + warn!( + "[BUG]The message had property {} but is not an inner batch", + MessageConst::PROPERTY_INNER_NUM + ); + return PutMessageResult::new_default(PutMessageStatus::MessageIllegal); } - if MessageSysFlag::check( - msg.message_ext_inner.sys_flag, - MessageSysFlag::INNER_BATCH_FLAG, - ) { - self.get_topic_config(msg.message_ext_inner.message_inner.topic()); + if MessageSysFlag::check(msg.sys_flag(), MessageSysFlag::INNER_BATCH_FLAG) { + let _topic_config = self.get_topic_config(msg.topic()); + //todo } - todo!() - } - - fn put_messages(&self, _message_ext_batch: MessageExtBatch) -> PutMessageResult { - todo!() - } -} - -impl LocalFileMessageStore { - pub fn new(message_store_config: MessageStoreConfig) -> Self { - LocalFileMessageStore { - message_store_config, - put_message_hook_list: vec![], - topic_config_table: None, - } - } -} - -// private method impl -impl LocalFileMessageStore { - fn is_temp_file_exist(&self) -> bool { - let abort_file_path = store_path_config_helper::get_abort_file( - &self.message_store_config.store_path_root_dir, - ); - std::path::Path::new(&abort_file_path).exists() - } - - pub fn get_topic_config(&self, topic: &str) -> Option<&TopicConfig> { - self.topic_config_table.as_ref()?.get(topic) + self.commit_log.put_message(msg).await } }