Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.2.307 hotfix 20240430 #20

Open
wants to merge 28 commits into
base: 1.2.307-opendal
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5bb9eb4
hack need_stats_provider skip column_statistics_provider
yufan022 Mar 21, 2024
0dcc470
fix(query): fix attach_query_str
sundy-li Mar 23, 2024
6f503f0
fix(query): fix attach_query_str
yufan022 Mar 25, 2024
8dff2e0
fix(stateless-test): change mock_s3 to mock_aws
yufan022 Mar 25, 2024
e582031
refactor: StringType::Scalar: `Vec<u8>` -> `String` (#14349)
andylokandy Jan 22, 2024
2711d85
chore(query): support domain contains in string type (#15023)
sundy-li Mar 20, 2024
5d74107
feat: support outer to inner join (#14401)
xudong963 Jan 22, 2024
2363463
chore: Forbidden non equal merge into distributed (#14426)
JackTan25 Jan 24, 2024
c545e94
feat: allow to config task session parameters. (#14446)
ZhiHanZ Jan 24, 2024
ebf04fd
refactor(executor): refactor processor profiling (#14377)
zhang2014 Jan 24, 2024
fc8d81e
chore: support mark join commutation and fix left mark join (#14455)
xudong963 Jan 25, 2024
6020a34
refactor(executor): remove old profile framework (#14468)
zhang2014 Jan 25, 2024
390e924
feat: pushdown limit to single window (#14460)
ariesdevil Jan 25, 2024
ec9b545
refactor: replace `OneTable` with `DummyTableScan` (#14461)
xudong963 Jan 25, 2024
f805182
feat(planner): support predicates move around (#14338)
Dousir9 Jan 27, 2024
98a221f
feat: support single join to inner join (#14442)
xudong963 Feb 1, 2024
33b870a
chore: refactor pattern plan with `Matcher` (#14549)
leiysky Feb 1, 2024
9b1f1e7
fix: runtime filters add to wrong table (#14552)
xudong963 Feb 1, 2024
6e3fcb2
feat(query): support early filtering for more join types (#14525)
Dousir9 Feb 2, 2024
16726fe
chore: make runtime filter for broadcast rework (#14559)
xudong963 Feb 2, 2024
a01ee98
chore: refine hash join code (#14571)
Dousir9 Feb 5, 2024
30f4992
chore: add table index to explain join (#14632)
xudong963 Feb 6, 2024
840796b
chore(query): improve hash join code (#14614)
Dousir9 Feb 7, 2024
46dac82
chore: optimize q17 join order (#14624)
xudong963 Feb 19, 2024
d5eaebd
refactor: enables the bloom runtime filter to be turned on adaptively…
xudong963 Feb 19, 2024
cc47269
feat(planner): improve push down filter join (#14872)
Dousir9 Mar 27, 2024
3d8d72f
fix(query): fix decimal op loss precision (#15313)
sundy-li Apr 23, 2024
56bc747
chore(planner): fix can_filter_null (#15298)
Dousir9 Apr 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
22 changes: 9 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ members = [
"src/query/pipeline/sinks",
"src/query/pipeline/sources",
"src/query/pipeline/transforms",
"src/query/profile",
"src/query/settings",
"src/query/sql",
"src/query/storages/common/blocks",
Expand Down
5 changes: 5 additions & 0 deletions src/common/cloud_control/proto/task.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ message CreateTaskRequest {
// DAG
repeated string after = 11; //
optional string when_condition = 12;
map<string, string> session_parameters = 13;
}

message TaskError {
Expand Down Expand Up @@ -90,6 +91,7 @@ message Task {
optional string last_suspended_at = 16;
repeated string after = 17;
optional string when_condition = 18;
map<string, string> session_parameters = 19;
}


Expand Down Expand Up @@ -134,6 +136,8 @@ message AlterTaskRequest {
optional string when_condition = 11;
repeated string add_after = 12;
repeated string remove_after = 13;
bool set_session_parameters = 14;
map<string, string> session_parameters = 15;
}

message AlterTaskResponse {
Expand Down Expand Up @@ -179,6 +183,7 @@ message TaskRun {
string query_id = 17;
string condition_text = 18;
string root_task_id = 19;
map<string, string> session_parameters = 20;
}

message ShowTaskRunsResponse {
Expand Down
5 changes: 5 additions & 0 deletions src/common/cloud_control/src/task_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Display;
use std::fmt::Formatter;

Expand Down Expand Up @@ -77,6 +78,7 @@ pub struct Task {
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub last_suspended_at: Option<DateTime<Utc>>,
pub session_params: BTreeMap<String, String>,
}

pub fn format_schedule_options(s: &ScheduleOptions) -> Result<String> {
Expand Down Expand Up @@ -203,6 +205,7 @@ impl TryFrom<crate::pb::Task> for Task {
status,
created_at,
updated_at,
session_params: value.session_parameters,
};
Ok(t)
}
Expand All @@ -227,6 +230,7 @@ pub struct TaskRun {
pub error_code: i64,
pub error_message: Option<String>,
pub root_task_id: String,
pub session_params: BTreeMap<String, String>,
}

// Convert from `crate::pb::TaskRun` to the `TaskRun` struct.
Expand Down Expand Up @@ -305,6 +309,7 @@ impl TryFrom<crate::pb::TaskRun> for TaskRun {
scheduled_at,
completed_at,
root_task_id: value.root_task_id,
session_params: value.session_parameters,
};
Ok(tr)
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/cloud_control/tests/it/task_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl TaskService for MockTaskService {
last_suspended_at: None,
after: vec![],
when_condition: None,
session_parameters: Default::default(),
}),
error: None,
}))
Expand Down Expand Up @@ -196,6 +197,7 @@ async fn test_task_client_success_cases() -> Result<()> {
if_not_exist: false,
after: vec![],
when_condition: None,
session_parameters: Default::default(),
});

let response = client.create_task(request).await?;
Expand Down
66 changes: 45 additions & 21 deletions src/common/hashtable/src/hashjoin_hashtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,56 +209,80 @@ where
count
}

// Using hashes to probe hash table and converting them in-place to pointers for memory reuse.
fn early_filtering_probe(&self, hashes: &mut [u64], bitmap: Option<Bitmap>) -> usize {
// Performs an early filtering probe: stores matched indexes in `matched_selection` and unmatched indexes
// in `unmatched_selection`, then returns the number of matched and unmatched indexes, in that order.
fn early_filtering_probe(
&self,
hashes: &mut [u64],
bitmap: Option<Bitmap>,
matched_selection: &mut [u32],
unmatched_selection: &mut [u32],
) -> (usize, usize) {
let mut valids = None;
if let Some(bitmap) = bitmap {
if bitmap.unset_bits() == bitmap.len() {
hashes.iter_mut().for_each(|hash| {
*hash = 0;
});
return 0;
unmatched_selection
.iter_mut()
.enumerate()
.for_each(|(idx, val)| {
*val = idx as u32;
});
return (0, hashes.len());
} else if bitmap.unset_bits() > 0 {
valids = Some(bitmap);
}
}
let mut count = 0;
let mut matched_idx = 0;
let mut unmatched_idx = 0;
match valids {
Some(valids) => {
valids
.iter()
.zip(hashes.iter_mut())
.for_each(|(valid, hash)| {
valids.iter().zip(hashes.iter_mut().enumerate()).for_each(
|(valid, (idx, hash))| {
if valid {
let header = self.pointers[(*hash >> self.hash_shift) as usize];
if header != 0 && early_filtering(header, *hash) {
*hash = remove_header_tag(header);
count += 1;
unsafe {
*matched_selection.get_unchecked_mut(matched_idx) = idx as u32
};
matched_idx += 1;
} else {
*hash = 0;
unsafe {
*unmatched_selection.get_unchecked_mut(unmatched_idx) =
idx as u32
};
unmatched_idx += 1;
}
} else {
*hash = 0;
unsafe {
*unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32
};
unmatched_idx += 1;
}
});
},
);
}
None => {
hashes.iter_mut().for_each(|hash| {
hashes.iter_mut().enumerate().for_each(|(idx, hash)| {
let header = self.pointers[(*hash >> self.hash_shift) as usize];
if header != 0 && early_filtering(header, *hash) {
*hash = remove_header_tag(header);
count += 1;
unsafe { *matched_selection.get_unchecked_mut(matched_idx) = idx as u32 };
matched_idx += 1;
} else {
*hash = 0;
unsafe {
*unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32
};
unmatched_idx += 1;
}
});
}
}
count
(matched_idx, unmatched_idx)
}

// Using hashes to probe hash table and converting them in-place to pointers for memory reuse.
fn early_filtering_probe_with_selection(
// Performs an early filtering probe, stores matched indexes in `selection`, and returns the number of matched indexes.
fn early_filtering_matched_probe(
&self,
hashes: &mut [u64],
bitmap: Option<Bitmap>,
Expand Down
58 changes: 41 additions & 17 deletions src/common/hashtable/src/hashjoin_string_hashtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,53 +138,77 @@ where A: Allocator + Clone + 'static
count
}

// Using hashes to probe hash table and converting them in-place to pointers for memory reuse.
fn early_filtering_probe(&self, hashes: &mut [u64], bitmap: Option<Bitmap>) -> usize {
// Performs an early filtering probe: stores matched indexes in `matched_selection` and unmatched indexes
// in `unmatched_selection`, then returns the number of matched and unmatched indexes, in that order.
fn early_filtering_probe(
&self,
hashes: &mut [u64],
bitmap: Option<Bitmap>,
matched_selection: &mut [u32],
unmatched_selection: &mut [u32],
) -> (usize, usize) {
let mut valids = None;
if let Some(bitmap) = bitmap {
if bitmap.unset_bits() == bitmap.len() {
hashes.iter_mut().for_each(|hash| {
*hash = 0;
});
return 0;
unmatched_selection
.iter_mut()
.enumerate()
.for_each(|(idx, val)| {
*val = idx as u32;
});
return (0, hashes.len());
} else if bitmap.unset_bits() > 0 {
valids = Some(bitmap);
}
}
let mut count = 0;
let mut matched_idx = 0;
let mut unmatched_idx = 0;
match valids {
Some(valids) => {
hashes.iter_mut().enumerate().for_each(|(idx, hash)| {
if unsafe { valids.get_bit_unchecked(idx) } {
let header = self.pointers[(*hash >> self.hash_shift) as usize];
if header != 0 && early_filtering(header, *hash) {
*hash = remove_header_tag(header);
count += 1;
unsafe {
*matched_selection.get_unchecked_mut(matched_idx) = idx as u32
};
matched_idx += 1;
} else {
*hash = 0;
unsafe {
*unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32
};
unmatched_idx += 1;
}
} else {
*hash = 0;
};
unsafe {
*unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32
};
unmatched_idx += 1;
}
});
}
None => {
hashes.iter_mut().for_each(|hash| {
hashes.iter_mut().enumerate().for_each(|(idx, hash)| {
let header = self.pointers[(*hash >> self.hash_shift) as usize];
if header != 0 && early_filtering(header, *hash) {
*hash = remove_header_tag(header);
count += 1;
unsafe { *matched_selection.get_unchecked_mut(matched_idx) = idx as u32 };
matched_idx += 1;
} else {
*hash = 0;
unsafe {
*unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32
};
unmatched_idx += 1;
}
});
}
}
count
(matched_idx, unmatched_idx)
}

// Using hashes to probe hash table and converting them in-place to pointers for memory reuse.
fn early_filtering_probe_with_selection(
// Performs an early filtering probe, stores matched indexes in `selection`, and returns the number of matched indexes.
fn early_filtering_matched_probe(
&self,
hashes: &mut [u64],
bitmap: Option<Bitmap>,
Expand Down
Loading