Skip to content

Commit

Permalink
[ISSUE mxsm#334]🚧Optimize commitlog recover-2 (mxsm#335)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored May 1, 2024
1 parent 523ce23 commit fa79f26
Show file tree
Hide file tree
Showing 20 changed files with 1,416 additions and 636 deletions.
2 changes: 1 addition & 1 deletion rocketmq-cli/src/content_show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub fn print_content(from: Option<u32>, to: Option<u32>, path: Option<PathBuf>)
let path_buf = path.unwrap().into_os_string();
let file_metadata = fs::metadata(path_buf.clone()).unwrap();
println!("file size: {}B", file_metadata.len());
let mut mapped_file = LocalMappedFile::new(
let mapped_file = LocalMappedFile::new(
path_buf.to_os_string().to_string_lossy().to_string(),
file_metadata.len(),
);
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-common/src/common/attribute/cq_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::str::FromStr;

use anyhow::anyhow;

#[derive(PartialEq, Default, Debug)]
#[derive(PartialEq, Default, Debug, Copy, Clone)]
pub enum CQType {
#[default]
SimpleCQ,
Expand Down
2 changes: 2 additions & 0 deletions rocketmq-common/src/common/broker/broker_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ pub struct BrokerConfig {
pub is_in_broker_container: bool,
pub commercial_size_per_msg: i32,
pub recover_concurrently: bool,
pub duplication_enable: bool,
}

impl Default for BrokerConfig {
Expand Down Expand Up @@ -148,6 +149,7 @@ impl Default for BrokerConfig {
is_in_broker_container: false,
commercial_size_per_msg: 4 * 1024,
recover_concurrently: false,
duplication_enable: false,
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions rocketmq-store/src/base/commit_log_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

use crate::base::dispatch_request::DispatchRequest;

#[trait_variant::make(CommitLogDispatcher:Send)]
pub trait RocketMQCommitLogDispatcher: Clone {
async fn dispatch(&mut self, dispatch_request: &DispatchRequest);
pub trait CommitLogDispatcher: Send + Clone {
fn dispatch(&mut self, dispatch_request: &DispatchRequest);
}
37 changes: 36 additions & 1 deletion rocketmq-store/src/base/dispatch_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
* limitations under the License.
*/

use std::collections::HashMap;
use std::{
collections::HashMap,
fmt::{Display, Formatter},
};

#[derive(Debug, Default)]
pub struct DispatchRequest {
Expand All @@ -39,3 +42,35 @@ pub struct DispatchRequest {
pub next_reput_from_offset: i64,
pub offset_id: String,
}

impl Display for DispatchRequest {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"DispatchRequest {{ topic: {}, queue_id: {}, commit_log_offset: {}, msg_size: {}, \
tags_code: {}, store_timestamp: {}, consume_queue_offset: {}, keys: {}, success: {}, \
uniq_key: {:?}, sys_flag: {}, prepared_transaction_offset: {}, properties_map: {:?}, \
bit_map: {:?}, buffer_size: {}, msg_base_offset: {}, batch_size: {}, \
next_reput_from_offset: {}, offset_id: {} }}",
self.topic,
self.queue_id,
self.commit_log_offset,
self.msg_size,
self.tags_code,
self.store_timestamp,
self.consume_queue_offset,
self.keys,
self.success,
self.uniq_key,
self.sys_flag,
self.prepared_transaction_offset,
self.properties_map,
self.bit_map,
self.buffer_size,
self.msg_base_offset,
self.batch_size,
self.next_reput_from_offset,
self.offset_id
)
}
}
11 changes: 9 additions & 2 deletions rocketmq-store/src/config/message_store_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use serde::Deserialize;
use crate::{
base::store_enum::StoreType,
config::{broker_role::BrokerRole, flush_disk_type::FlushDiskType},
queue::single_consume_queue::CQ_STORE_UNIT_SIZE,
};

lazy_static! {
Expand Down Expand Up @@ -251,9 +252,9 @@ impl Default for MessageStoreConfig {
store_type: Default::default(),
mapped_file_size_consume_queue: 0,
enable_consume_queue_ext: false,
mapped_file_size_consume_queue_ext: 0,
mapped_file_size_consume_queue_ext: 48 * 1024 * 1024,
mapper_file_size_batch_consume_queue: 0,
bit_map_length_consume_queue_ext: 0,
bit_map_length_consume_queue_ext: 64,
flush_interval_commit_log: 0,
commit_interval_commit_log: 0,
max_recovery_commit_log_files: 0,
Expand Down Expand Up @@ -404,4 +405,10 @@ impl MessageStoreConfig {
pub fn is_enable_rocksdb_store(&self) -> bool {
self.store_type == StoreType::RocksDB
}

pub fn get_mapped_file_size_consume_queue(&self) -> i32 {
let factor = (self.mapped_file_size_consume_queue as f64 / (CQ_STORE_UNIT_SIZE as f64))
.ceil() as i32;
factor * CQ_STORE_UNIT_SIZE
}
}
53 changes: 39 additions & 14 deletions rocketmq-store/src/consume_queue/mapped_file_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::{

use log::warn;
use rocketmq_common::UtilAll::offset_to_file_name;
use tokio::sync::Mutex;
use tracing::info;

use crate::{
Expand All @@ -37,7 +36,8 @@ pub struct MappedFileQueue {

pub(crate) mapped_file_size: u64,
//pub(crate) mapped_files: Arc<Mutex<Vec<LocalMappedFile>>>,
pub(crate) mapped_files: Vec<Arc<Mutex<LocalMappedFile>>>,
//pub(crate) mapped_files: Vec<Arc<Mutex<LocalMappedFile>>>,
pub(crate) mapped_files: Vec<Arc<LocalMappedFile>>,
// pub(crate) mapped_files: Vec<LocalMappedFile>,
pub(crate) allocate_mapped_file_service: Option<AllocateMappedFileService>,

Expand Down Expand Up @@ -130,7 +130,7 @@ impl MappedFileQueue {
LocalMappedFile::new(file.to_string_lossy().to_string(), self.mapped_file_size);
// Set wrote, flushed, committed positions for mapped_file

self.mapped_files.push(Arc::new(Mutex::new(mapped_file)));
self.mapped_files.push(Arc::new(mapped_file));
// self.mapped_files
// .push(mapped_file);
info!("load {} OK", file.display());
Expand All @@ -153,7 +153,7 @@ impl MappedFileQueue {
// self.mapped_files.last()
// }

pub fn get_last_mapped_file(&self) -> Option<Arc<Mutex<LocalMappedFile>>> {
pub fn get_last_mapped_file(&self) -> Option<Arc<LocalMappedFile>> {
if self.mapped_files.is_empty() {
return None;
}
Expand All @@ -164,7 +164,7 @@ impl MappedFileQueue {
&mut self,
start_offset: u64,
need_create: bool,
) -> Option<Arc<Mutex<LocalMappedFile>>> {
) -> Option<Arc<LocalMappedFile>> {
let mut create_offset = -1i64;
let file_size = self.mapped_file_size as i64;
let mapped_file_last = self.get_last_mapped_file();
Expand All @@ -173,8 +173,8 @@ impl MappedFileQueue {
create_offset = start_offset as i64 - (start_offset as i64 % file_size);
}
Some(ref value) => {
if value.lock().await.is_full() {
create_offset = value.lock().await.get_file_from_offset() as i64 + file_size
if value.is_full() {
create_offset = value.get_file_from_offset() as i64 + file_size
}
}
}
Expand All @@ -184,10 +184,7 @@ impl MappedFileQueue {
mapped_file_last
}

pub fn try_create_mapped_file(
&mut self,
create_offset: u64,
) -> Option<Arc<Mutex<LocalMappedFile>>> {
pub fn try_create_mapped_file(&mut self, create_offset: u64) -> Option<Arc<LocalMappedFile>> {
let next_file_path =
PathBuf::from(self.store_path.clone()).join(offset_to_file_name(create_offset));
let next_next_file_path = PathBuf::from(self.store_path.clone())
Expand All @@ -199,7 +196,7 @@ impl MappedFileQueue {
&mut self,
next_file_path: PathBuf,
_next_next_file_path: PathBuf,
) -> Option<Arc<Mutex<LocalMappedFile>>> {
) -> Option<Arc<LocalMappedFile>> {
let mut mapped_file = match self.allocate_mapped_file_service {
None => LocalMappedFile::new(
next_file_path.to_string_lossy().to_string(),
Expand All @@ -213,12 +210,12 @@ impl MappedFileQueue {
if self.mapped_files.is_empty() {
mapped_file.set_first_create_in_queue(true);
}
let inner = Arc::new(Mutex::new(mapped_file));
let inner = Arc::new(mapped_file);
self.mapped_files.push(inner.clone());
Some(inner)
}

pub fn get_mapped_files(&self) -> Vec<Arc<Mutex<LocalMappedFile>>> {
pub fn get_mapped_files(&self) -> Vec<Arc<LocalMappedFile>> {
self.mapped_files.to_vec()
}

Expand All @@ -231,6 +228,34 @@ impl MappedFileQueue {
}

pub fn truncate_dirty_files(&mut self, offset: i64) {}

pub fn get_max_offset(&self) -> i64 {
/*let handle = Handle::current();
let mapped_file = self.get_last_mapped_file();
std::thread::spawn(move || {
handle.block_on(async move {
match mapped_file {
None => 0,
Some(value) => {
let file = value.lock().await;
file.get_file_from_offset() as i64 + file.get_read_position() as i64
}
}
})
})
.join()
.unwrap()*/
match self.get_last_mapped_file() {
None => 0,
Some(file) => file.get_file_from_offset() as i64 + file.get_read_position() as i64,
}
}

pub fn delete_last_mapped_file(&self) {
unimplemented!()
}

pub(crate) fn delete_expired_file(&self, files: Vec<Option<Arc<LocalMappedFile>>>) {}
}

#[cfg(test)]
Expand Down
16 changes: 4 additions & 12 deletions rocketmq-store/src/index/index_dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

use std::sync::Arc;

use tokio::sync::Mutex;

use crate::{
base::{commit_log_dispatcher::CommitLogDispatcher, dispatch_request::DispatchRequest},
config::message_store_config::MessageStoreConfig,
Expand All @@ -27,15 +25,12 @@ use crate::{

#[derive(Clone)]
pub struct CommitLogDispatcherBuildIndex {
index_service: Arc<Mutex<IndexService>>,
index_service: IndexService,
message_store_config: Arc<MessageStoreConfig>,
}

impl CommitLogDispatcherBuildIndex {
pub fn new(
index_service: Arc<Mutex<IndexService>>,
message_store_config: Arc<MessageStoreConfig>,
) -> Self {
pub fn new(index_service: IndexService, message_store_config: Arc<MessageStoreConfig>) -> Self {
Self {
index_service,
message_store_config,
Expand All @@ -44,12 +39,9 @@ impl CommitLogDispatcherBuildIndex {
}

impl CommitLogDispatcher for CommitLogDispatcherBuildIndex {
async fn dispatch(&mut self, dispatch_request: &DispatchRequest) {
fn dispatch(&mut self, dispatch_request: &DispatchRequest) {
if self.message_store_config.message_index_enable {
self.index_service
.lock()
.await
.build_index(dispatch_request);
self.index_service.build_index(dispatch_request);
}
}
}
Loading

0 comments on commit fa79f26

Please sign in to comment.