Skip to content

Commit

Permalink
feat: copy support option COLUMN_MATCH_MODE (databendlabs#16963)
Browse files Browse the repository at this point in the history
* rm unused code.

* remove dup code

* feat: parquet support copy option `COLUMN_MATCH_MODE`
  • Loading branch information
youngsofun authored Nov 29, 2024
1 parent ae0cdc3 commit cc4d3f2
Show file tree
Hide file tree
Showing 24 changed files with 359 additions and 168 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions src/meta/app/src/principal/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use std::fmt::Display;
use std::fmt::Formatter;
use std::str::FromStr;

use databend_common_ast::ast::ColumnMatchMode;
use databend_common_ast::ast::CopyIntoTableOptions;
use databend_common_ast::ast::FileFormatOptions;
use databend_common_ast::ast::FileFormatValue;
use databend_common_exception::ErrorCode;
Expand Down Expand Up @@ -107,6 +109,26 @@ impl FileFormatParams {
}
}

pub fn check_copy_options(&self, options: &mut CopyIntoTableOptions) -> Result<()> {
if let Some(m) = &options.column_match_mode {
match self {
FileFormatParams::Parquet(_) => {
if let ColumnMatchMode::Position = m {
return Err(ErrorCode::BadArguments(
"COLUMN_MATCH_MODE=POSITION not supported yet.",
));
}
}
_ => {
return Err(ErrorCode::BadArguments(
"COLUMN_MATCH_MODE can only apply to Parquet for now.",
));
}
}
}
Ok(())
}

pub fn need_field_default(&self) -> bool {
match self {
FileFormatParams::Parquet(v) => v.missing_field_as == NullAs::FieldDefault,
Expand Down
90 changes: 1 addition & 89 deletions src/meta/app/src/principal/user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::str::FromStr;

use chrono::DateTime;
use chrono::Utc;
pub use databend_common_ast::ast::OnErrorMode;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_io::constants::NAN_BYTES_SNAKE;
Expand Down Expand Up @@ -400,95 +401,6 @@ pub struct StageParams {
pub storage: StorageParams,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Copy)]
pub enum OnErrorMode {
Continue,
SkipFileNum(u64),
AbortNum(u64),
}

impl Default for OnErrorMode {
fn default() -> Self {
Self::AbortNum(1)
}
}

impl FromStr for OnErrorMode {
type Err = String;

fn from_str(s: &str) -> std::result::Result<Self, String> {
match s.to_uppercase().as_str() {
"" | "ABORT" => Ok(OnErrorMode::AbortNum(1)),
"CONTINUE" => Ok(OnErrorMode::Continue),
"SKIP_FILE" => Ok(OnErrorMode::SkipFileNum(1)),
v => {
if v.starts_with("ABORT_") {
let num_str = v.replace("ABORT_", "");
let nums = num_str.parse::<u64>();
match nums {
Ok(n) if n < 1 => {
Err("OnError mode `ABORT_<num>` num must be greater than 0".to_string())
}
Ok(n) => Ok(OnErrorMode::AbortNum(n)),
Err(_) => Err(format!(
"Unknown OnError mode:{:?}, must one of {{ CONTINUE | SKIP_FILE | SKIP_FILE_<num> | ABORT | ABORT_<num> }}",
v
)),
}
} else {
let num_str = v.replace("SKIP_FILE_", "");
let nums = num_str.parse::<u64>();
match nums {
Ok(n) if n < 1 => {
Err("OnError mode `SKIP_FILE_<num>` num must be greater than 0"
.to_string())
}
Ok(n) => Ok(OnErrorMode::SkipFileNum(n)),
Err(_) => Err(format!(
"Unknown OnError mode:{:?}, must one of {{ CONTINUE | SKIP_FILE | SKIP_FILE_<num> | ABORT | ABORT_<num> }}",
v
)),
}
}
}
}
}
}

impl Display for OnErrorMode {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
OnErrorMode::Continue => {
write!(f, "continue")
}
OnErrorMode::SkipFileNum(n) => {
if *n <= 1 {
write!(f, "skipfile")
} else {
write!(f, "skipfile_{}", n)
}
}
OnErrorMode::AbortNum(n) => {
if *n <= 1 {
write!(f, "abort")
} else {
write!(f, "abort_{}", n)
}
}
}
}
}

impl From<databend_common_ast::ast::OnErrorMode> for OnErrorMode {
fn from(opt: databend_common_ast::ast::OnErrorMode) -> Self {
match opt {
databend_common_ast::ast::OnErrorMode::Continue => OnErrorMode::Continue,
databend_common_ast::ast::OnErrorMode::SkipFileNum(n) => OnErrorMode::SkipFileNum(n),
databend_common_ast::ast::OnErrorMode::AbortNum(n) => OnErrorMode::AbortNum(n),
}
}
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Default, Debug, Eq, PartialEq)]
#[serde(default)]
pub struct CopyOptions {
Expand Down
108 changes: 70 additions & 38 deletions src/query/ast/src/ast/statements/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ impl CopyIntoTableStmt {
CopyIntoTableOption::DisableVariantCheck(v) => self.options.disable_variant_check = v,
CopyIntoTableOption::ReturnFailedOnly(v) => self.options.return_failed_only = v,
CopyIntoTableOption::OnError(v) => self.options.on_error = OnErrorMode::from_str(&v)?,
CopyIntoTableOption::ColumnMatchMode(v) => {
self.options.column_match_mode = Some(ColumnMatchMode::from_str(&v)?)
}
}
Ok(())
}
Expand Down Expand Up @@ -111,37 +114,7 @@ impl Display for CopyIntoTableStmt {
if !self.file_format.is_empty() {
write!(f, " FILE_FORMAT = ({})", self.file_format)?;
}

if !self.options.validation_mode.is_empty() {
write!(f, "VALIDATION_MODE = {}", self.options.validation_mode)?;
}

if self.options.size_limit != 0 {
write!(f, " SIZE_LIMIT = {}", self.options.size_limit)?;
}

if self.options.max_files != 0 {
write!(f, " MAX_FILES = {}", self.options.max_files)?;
}

if self.options.split_size != 0 {
write!(f, " SPLIT_SIZE = {}", self.options.split_size)?;
}

write!(f, " PURGE = {}", self.options.purge)?;
write!(f, " FORCE = {}", self.options.force)?;
write!(
f,
" DISABLE_VARIANT_CHECK = {}",
self.options.disable_variant_check
)?;
write!(f, " ON_ERROR = {}", self.options.on_error)?;
write!(
f,
" RETURN_FAILED_ONLY = {}",
self.options.return_failed_only
)?;

write!(f, " {}", self.options)?;
Ok(())
}
}
Expand All @@ -159,6 +132,7 @@ pub struct CopyIntoTableOptions {
pub disable_variant_check: bool,
pub return_failed_only: bool,
pub validation_mode: String,
pub column_match_mode: Option<ColumnMatchMode>,
}

impl CopyIntoTableOptions {
Expand All @@ -169,6 +143,10 @@ impl CopyIntoTableOptions {
bool::from_str(v).map_err(|e| format!("can not parse {}={} as bool: {}", k, v, e))
}

pub fn set_column_match_mode(&mut self, mode: ColumnMatchMode) {
self.column_match_mode = Some(mode);
}

pub fn apply(
&mut self,
opts: &BTreeMap<String, String>,
Expand All @@ -183,6 +161,10 @@ impl CopyIntoTableOptions {
let on_error = OnErrorMode::from_str(v)?;
self.on_error = on_error;
}
"column_match_mode" => {
let column_match_mode = ColumnMatchMode::from_str(v)?;
self.column_match_mode = Some(column_match_mode);
}
"size_limit" => {
self.size_limit = Self::parse_uint(k, v)?;
}
Expand Down Expand Up @@ -214,13 +196,30 @@ impl CopyIntoTableOptions {

impl Display for CopyIntoTableOptions {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "OnErrorMode {}", self.on_error)?;
write!(f, "SizeLimit {}", self.size_limit)?;
write!(f, "MaxFiles {}", self.max_files)?;
write!(f, "SplitSize {}", self.split_size)?;
write!(f, "Purge {}", self.purge)?;
write!(f, "DisableVariantCheck {}", self.disable_variant_check)?;
write!(f, "ReturnFailedOnly {}", self.return_failed_only)?;
if !self.validation_mode.is_empty() {
write!(f, "VALIDATION_MODE = {}", self.validation_mode)?;
}

if self.size_limit != 0 {
write!(f, " SIZE_LIMIT = {}", self.size_limit)?;
}

if self.max_files != 0 {
write!(f, " MAX_FILES = {}", self.max_files)?;
}

if self.split_size != 0 {
write!(f, " SPLIT_SIZE = {}", self.split_size)?;
}

write!(f, " PURGE = {}", self.purge)?;
write!(f, " FORCE = {}", self.force)?;
write!(f, " DISABLE_VARIANT_CHECK = {}", self.disable_variant_check)?;
write!(f, " ON_ERROR = {}", self.on_error)?;
write!(f, " RETURN_FAILED_ONLY = {}", self.return_failed_only)?;
if let Some(mode) = &self.column_match_mode {
write!(f, " COLUMN_MATCH_MODE = {}", mode)?;
}
Ok(())
}
}
Expand Down Expand Up @@ -572,6 +571,7 @@ pub enum CopyIntoTableOption {
DisableVariantCheck(bool),
ReturnFailedOnly(bool),
OnError(String),
ColumnMatchMode(String),
}

pub enum CopyIntoLocationOption {
Expand Down Expand Up @@ -714,3 +714,35 @@ impl FromStr for OnErrorMode {
}
}
}

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Drive, DriveMut, Eq)]
pub enum ColumnMatchMode {
CaseSensitive,
CaseInsensitive,
Position,
}

impl Display for ColumnMatchMode {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
ColumnMatchMode::CaseSensitive => write!(f, "CASE_SENSITIVE"),
ColumnMatchMode::CaseInsensitive => write!(f, "CASE_INSENSITIVE"),
ColumnMatchMode::Position => write!(f, "POSITION"),
}
}
}

const COLUMN_MATCH_MODE_MSG: &str =
"ColumnMatchMode must be one of {{ CASE_SENSITIVE | CASE_INSENSITIVE | POSITION }}";
impl FromStr for ColumnMatchMode {
type Err = &'static str;

fn from_str(s: &str) -> std::result::Result<Self, &'static str> {
match s.to_uppercase().as_str() {
"CASE_SENSITIVE" => Ok(Self::CaseSensitive),
"CASE_INSENSITIVE" => Ok(Self::CaseInsensitive),
"POSITION" => Ok(Self::Position),
_ => Err(COLUMN_MATCH_MODE_MSG),
}
}
}
8 changes: 8 additions & 0 deletions src/query/ast/src/ast/statements/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub enum SelectStageOption {
Pattern(LiteralStringOrVariable),
FileFormat(String),
Connection(BTreeMap<String, String>),
CaseSensitive(bool),
}

impl SelectStageOptions {
Expand All @@ -83,6 +84,7 @@ impl SelectStageOptions {
SelectStageOption::Pattern(v) => options.pattern = Some(v),
SelectStageOption::FileFormat(v) => options.file_format = Some(v),
SelectStageOption::Connection(v) => options.connection = v,
SelectStageOption::CaseSensitive(v) => options.case_sensitive = Some(v),
}
}
options
Expand All @@ -95,6 +97,7 @@ pub struct SelectStageOptions {
pub pattern: Option<LiteralStringOrVariable>,
pub file_format: Option<String>,
pub connection: BTreeMap<String, String>,
pub case_sensitive: Option<bool>,
}

impl SelectStageOptions {
Expand All @@ -103,6 +106,7 @@ impl SelectStageOptions {
&& self.pattern.is_none()
&& self.file_format.is_none()
&& self.connection.is_empty()
&& self.case_sensitive.is_none()
}
}

Expand Down Expand Up @@ -139,6 +143,10 @@ impl Display for SelectStageOptions {
write!(f, " PATTERN => {},", pattern)?;
}

if let Some(case_sensitive) = self.case_sensitive {
write!(f, " CASE_SENSITIVE => {},", case_sensitive)?;
}

if !self.connection.is_empty() {
write!(f, " CONNECTION => (")?;
write_comma_separated_string_map(f, &self.connection)?;
Expand Down
4 changes: 4 additions & 0 deletions src/query/ast/src/parser/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ fn copy_into_table_option(i: Input) -> IResult<CopyIntoTableOption> {
map(rule! { ON_ERROR ~ "=" ~ #ident }, |(_, _, on_error)| {
CopyIntoTableOption::OnError(on_error.to_string())
}),
map(
rule! { COLUMN_MATCH_MODE ~ "=" ~ #ident },
|(_, _, mode)| CopyIntoTableOption::ColumnMatchMode(mode.to_string()),
),
map(
rule! { DISABLE_VARIANT_CHECK ~ "=" ~ #literal_bool },
|(_, _, disable_variant_check)| {
Expand Down
4 changes: 4 additions & 0 deletions src/query/ast/src/parser/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,5 +262,9 @@ pub fn select_stage_option(i: Input) -> IResult<SelectStageOption> {
rule! { CONNECTION ~ ^"=>" ~ ^#connection_options },
|(_, _, file_format)| SelectStageOption::Connection(file_format),
),
map(
rule! { CASE_SENSITIVE ~ ^"=>" ~ ^#literal_bool },
|(_, _, case_sensitive)| SelectStageOption::CaseSensitive(case_sensitive),
),
))(i)
}
4 changes: 4 additions & 0 deletions src/query/ast/src/parser/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,8 @@ pub enum TokenKind {
CALL,
#[token("CASE", ignore(ascii_case))]
CASE,
#[token("CASE_SENSITIVE", ignore(ascii_case))]
CASE_SENSITIVE,
#[token("CAST", ignore(ascii_case))]
CAST,
#[token("CATALOG", ignore(ascii_case))]
Expand Down Expand Up @@ -455,6 +457,8 @@ pub enum TokenKind {
CHAR,
#[token("COLUMN", ignore(ascii_case))]
COLUMN,
#[token("COLUMN_MATCH_MODE", ignore(ascii_case))]
COLUMN_MATCH_MODE,
#[token("COLUMNS", ignore(ascii_case))]
COLUMNS,
#[token("CHARACTER", ignore(ascii_case))]
Expand Down
Loading

0 comments on commit cc4d3f2

Please sign in to comment.