Skip to content

Commit

Permalink
add impl PackType
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuxiujia committed Jul 29, 2024
1 parent e76750b commit ed45c7f
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions src/plugin/file_split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub trait Packer: Send + Sync {

/// is can do pack?
pub trait CanPack: Send {
fn is(&mut self, temp_size: usize, arg: &FastLogRecord) -> bool;
fn is(&mut self, temp_size: usize, arg: &FastLogRecord) -> Option<DateTime>;
}

/// keep logs, for example keep by log num or keep by log create time.
Expand Down Expand Up @@ -178,22 +178,23 @@ pub enum PackType {
}

impl CanPack for PackType {
fn is(&mut self, temp_size: usize, arg: &FastLogRecord) -> bool {
fn is(&mut self, temp_size: usize, arg: &FastLogRecord) -> Option<DateTime> {
return match self {
PackType::ByDate(date_time) => {
let dt = fastdate::DateTime::from_system_time(arg.now, fastdate::offset_sec());
if dt.day() > date_time.day() {
let last_time = date_time.clone();
*date_time = dt;
true
Some(last_time)
} else {
false
None
}
}
PackType::BySize(limit) => {
if temp_size >= limit.get_len() {
true
Some(DateTime::now())
} else {
false
None
}
}
};
Expand Down Expand Up @@ -266,13 +267,12 @@ impl FileSplitAppender {
})
}
/// send data make an pack,and truncate data when finish.
pub fn send_pack(&self, time: SystemTime, wg: Option<WaitGroup>) {
pub fn send_pack(&self, date: DateTime, wg: Option<WaitGroup>) {
let mut sp = "";
if !self.dir_path.is_empty() && !self.dir_path.ends_with("/") {
sp = "/";
}
let first_file_path = format!("{}{}{}", self.dir_path, sp, &self.temp_name);
let date = DateTime::from_system_time(time, fastdate::offset_sec());
let new_log_name = self
.packer
.new_data_log_name(&first_file_path, date);
Expand Down Expand Up @@ -400,7 +400,7 @@ impl LogAppender for FileSplitAppender {
let current_temp_size = self.temp_bytes.load(Ordering::Relaxed)
+ temp.as_bytes().len()
+ x.formated.as_bytes().len();
if self.is_pack.is(current_temp_size, x) {
if let Some(pack_time) = self.is_pack.is(current_temp_size, x) {
self.temp_bytes.fetch_add(
{
let w = self.file.write(temp.as_bytes());
Expand All @@ -413,14 +413,14 @@ impl LogAppender for FileSplitAppender {
Ordering::SeqCst,
);
temp.clear();
self.send_pack(x.now, None);
self.send_pack(pack_time, None);
}
temp.push_str(x.formated.as_str());
}
Command::CommandExit => {}
Command::CommandFlush(ref w) => {
let current_temp_size = self.temp_bytes.load(Ordering::Relaxed);
if self.is_pack.is(current_temp_size, x) {
if let Some(pack_time) = self.is_pack.is(current_temp_size, x) {
self.temp_bytes.fetch_add(
{
let w = self.file.write(temp.as_bytes());
Expand All @@ -433,7 +433,7 @@ impl LogAppender for FileSplitAppender {
Ordering::SeqCst,
);
temp.clear();
self.send_pack(x.now, Some(w.clone()));
self.send_pack(pack_time, Some(w.clone()));
}
}
}
Expand Down

0 comments on commit ed45c7f

Please sign in to comment.