diff --git a/be/src/common/config.h b/be/src/common/config.h index 76b240818cb9f..9616f0ea4d92e 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1512,4 +1512,8 @@ CONF_mInt32(batch_write_rpc_request_retry_interval_ms, "500"); CONF_mInt32(batch_write_rpc_reqeust_timeout_ms, "10000"); CONF_mInt32(batch_write_poll_load_status_interval_ms, "200"); CONF_mBool(batch_write_trace_log_enable, "false"); + +// ignore union type tag in avro kafka routine load +CONF_mBool(avro_ignore_union_type_tag, "false"); + } // namespace starrocks::config diff --git a/be/src/formats/avro/binary_column.cpp b/be/src/formats/avro/binary_column.cpp index bdb44fc8b2300..60969b8b3a116 100644 --- a/be/src/formats/avro/binary_column.cpp +++ b/be/src/formats/avro/binary_column.cpp @@ -14,6 +14,10 @@ #include "binary_column.h" +#include +#include +#include + #include "column/binary_column.h" #include "column/json_column.h" #include "common/status.h" @@ -199,15 +203,238 @@ static Status add_column_with_boolean_value(BinaryColumn* column, const TypeDesc return Status::OK(); } +static Status avro_value_to_rapidjson(const avro_value_t& value, rapidjson::Document::AllocatorType& allocator, + rapidjson::Value& out) { + switch (avro_value_get_type(&value)) { + case AVRO_STRING: { + const char* in; + size_t size; + if (avro_value_get_string(&value, &in, &size) != 0) { + return Status::InvalidArgument(strings::Substitute("Get string value error $0", avro_strerror())); + } + out.SetString(in, allocator); + return Status::OK(); + } + case AVRO_BYTES: { + const char* in; + size_t size; + if (avro_value_get_fixed(&value, (const void**)&in, &size) != 0) { + return Status::InvalidArgument(strings::Substitute("Get string value error $0", avro_strerror())); + } + out.SetString(in, allocator); + return Status::OK(); + } + case AVRO_INT32: { + int32_t in; + if (avro_value_get_int(&value, &in) != 0) { + return Status::InvalidArgument(strings::Substitute("Get int32 value error $0", avro_strerror())); + } + out.SetInt(in); + return Status::OK(); + } + case AVRO_INT64: { + int64_t in; + if (avro_value_get_long(&value, &in) != 0) { + return Status::InvalidArgument(strings::Substitute("Get int64 value error $0", avro_strerror())); + } + out.SetInt64(in); + return Status::OK(); + } + case AVRO_FLOAT: { + float in; + if (avro_value_get_float(&value, &in) != 0) { + return Status::InvalidArgument(strings::Substitute("Get float value error $0", avro_strerror())); + } + out.SetFloat(in); + return Status::OK(); + } + case AVRO_DOUBLE: { + double in; + if (avro_value_get_double(&value, &in) != 0) { + return Status::InvalidArgument(strings::Substitute("Get double value error $0", avro_strerror())); + } + out.SetDouble(in); + return Status::OK(); + } + case AVRO_BOOLEAN: { + int in; + if (avro_value_get_boolean(&value, &in) != 0) { + return Status::InvalidArgument(strings::Substitute("Get boolean value error $0", avro_strerror())); + } + out.SetBool(in); + return Status::OK(); + } + case AVRO_NULL: { + out.SetNull(); + return Status::OK(); + } + case AVRO_RECORD: { + size_t field_count = 0; + if (avro_value_get_size(&value, &field_count) != 0) { + return Status::InvalidArgument(strings::Substitute("Get record field count error $0", avro_strerror())); + } + + out.SetObject(); + for (size_t i = 0; i < field_count; ++i) { + avro_value_t field_value; + const char* field_name; + if (avro_value_get_by_index(&value, i, &field_value, &field_name) != 0) { + return Status::InvalidArgument(strings::Substitute("Get record field error $0", avro_strerror())); + } + + rapidjson::Value field_name_val; + field_name_val.SetString(field_name, allocator); + rapidjson::Value field_value_val; + RETURN_IF_ERROR(avro_value_to_rapidjson(field_value, allocator, field_value_val)); + out.AddMember(field_name_val, field_value_val, allocator); + } + return Status::OK(); + } + case AVRO_ENUM: { + avro_schema_t enum_schema; + int symbol_value; + if (avro_value_get_enum(&value, &symbol_value) != 0) { + return Status::InvalidArgument(strings::Substitute("Get enum value error $0", avro_strerror())); + } + + enum_schema = avro_value_get_schema(&value); + const char* symbol_name; + symbol_name = avro_schema_enum_get(enum_schema, symbol_value); + out.SetString(symbol_name, allocator); + return Status::OK(); + } + case AVRO_FIXED: { + const char* in; + size_t size; + if (avro_value_get_fixed(&value, (const void**)&in, &size) != 0) { + return Status::InvalidArgument(strings::Substitute("Get fixed value error $0", avro_strerror())); + } + out.SetString(in, allocator); + return Status::OK(); + } + case AVRO_MAP: { + size_t map_size = 0; + if (avro_value_get_size(&value, &map_size) != 0) { + return Status::InvalidArgument(strings::Substitute("Get map size error $0", avro_strerror())); + } + + out.SetObject(); + for (int i = 0; i < map_size; ++i) { + const char* key; + avro_value_t map_value; + if (avro_value_get_by_index(&value, i, &map_value, &key) != 0) { + return Status::InvalidArgument(strings::Substitute("Get map key value error $0", avro_strerror())); + } + + rapidjson::Value key_val; + key_val.SetString(key, allocator); + rapidjson::Value value_val; + RETURN_IF_ERROR(avro_value_to_rapidjson(map_value, allocator, value_val)); + out.AddMember(key_val, value_val, allocator); + } + return Status::OK(); + } + case AVRO_ARRAY: { + size_t array_size = 0; + if (avro_value_get_size(&value, &array_size) != 0) { + return Status::InvalidArgument(strings::Substitute("Get array size error $0", avro_strerror())); + } + + out.SetArray(); + for (int i = 0; i < array_size; ++i) { + avro_value_t element; + if (avro_value_get_by_index(&value, i, &element, nullptr) != 0) { + return Status::InvalidArgument(strings::Substitute("Get array element error $0", avro_strerror())); + } + + rapidjson::Value element_value; + RETURN_IF_ERROR(avro_value_to_rapidjson(element, allocator, element_value)); + out.PushBack(element_value, allocator); + } + return Status::OK(); + } + case AVRO_UNION: { + avro_value_t union_value; + if (avro_value_get_current_branch(&value, &union_value) != 0) { + return Status::InvalidArgument(strings::Substitute("Get union value error $0", avro_strerror())); + } + RETURN_IF_ERROR(avro_value_to_rapidjson(union_value, allocator, out)); + return Status::OK(); + } + default: + return Status::InvalidArgument("Unsupported avro type"); + } +} + +// Convert an avro value to a json object using rapidjson. +// Different from avro `avro_value_to_json`, this function will ignore the union type tags. +// +// schema: +// { +// "type": "record", +// "name": "User", +// "fields": [ +// {"name": "id", "type": "int"}, +// {"name": "name", "type": "string"}, +// {"name": "email", "type": ["null", +// { +// "type": "record", +// "name": "email2", +// "fields": [ +// { +// "name": "x", +// "type" : ["null", "int"] +// }, +// { +// "name": "y", +// "type": ["null", "string"] +// } +// ] +// } +// ] +// } +// ] +// } +// +// avro `avro_value_to_json` result: +// {"id": 1, "name": "Alice", "email": {"email2": {"x": {"int": 1}, "y": {"string": "alice@example.com"}}}} +// +// this function result: +// {"id":1,"name":"Alice","email":{"x":1,"y":"alice@example.com"}} +static Status avro_value_to_json_str(const avro_value_t& value, std::string* json_str) { + rapidjson::Document doc; + auto& allocator = doc.GetAllocator(); + rapidjson::Value root; + RETURN_IF_ERROR(avro_value_to_rapidjson(value, allocator, root)); + + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + root.Accept(writer); + json_str->append(buffer.GetString(), buffer.GetSize()); + return Status::OK(); +} + static Status add_column_with_array_object_value(BinaryColumn* column, const TypeDescriptor& type_desc, const std::string& name, const avro_value_t& value) { - char* as_json; - if (avro_value_to_json(&value, 1, &as_json)) { - LOG(ERROR) << "avro to json failed: %s" << avro_strerror(); - return Status::InternalError("avro to json failed"); + if (config::avro_ignore_union_type_tag) { + std::string json_str; + auto st = avro_value_to_json_str(value, &json_str); + if (!st.ok()) { + return Status::InternalError( + strings::Substitute("avro to json failed. column=$0, err=$1", name, st.message())); + } + + column->append(Slice(json_str)); + } else { + char* as_json; + if (avro_value_to_json(&value, 1, &as_json)) { + LOG(WARNING) << "avro to json failed: %s" << avro_strerror(); + return Status::InternalError( + strings::Substitute("avro to json failed. column=$0, err=$1", name, avro_strerror())); + } + DeferOp json_deleter([&] { free(as_json); }); + column->append(Slice(as_json)); } - column->append(Slice(as_json)); - free(as_json); return Status::OK(); } @@ -251,18 +478,34 @@ Status add_binary_column(Column* column, const TypeDescriptor& type_desc, const Status add_native_json_column(Column* column, const TypeDescriptor& type_desc, const std::string& name, const avro_value_t& value) { - auto json_column = down_cast(column); - char* as_json; - if (avro_value_to_json(&value, 1, &as_json)) { - LOG(ERROR) << "avro to json failed: %s" << avro_strerror(); - return Status::InternalError("avro to json failed"); - } - DeferOp json_deleter([&] { free(as_json); }); JsonValue json_value; - Status s = JsonValue::parse(as_json, &json_value); - if (!s.ok()) { - return Status::InternalError("parse json failed"); + Status st; + if (config::avro_ignore_union_type_tag) { + std::string json_str; + st = avro_value_to_json_str(value, &json_str); + if (!st.ok()) { + return Status::InternalError( + strings::Substitute("avro to json failed. column=$0, err=$1", name, st.message())); + } + + st = JsonValue::parse(Slice(json_str), &json_value); + } else { + char* as_json; + if (avro_value_to_json(&value, 1, &as_json)) { + LOG(WARNING) << "avro to json failed: %s" << avro_strerror(); + return Status::InternalError( + strings::Substitute("avro to json failed. column=$0, err=$1", name, avro_strerror())); + } + + DeferOp json_deleter([&] { free(as_json); }); + st = JsonValue::parse(as_json, &json_value); + } + + if (!st.ok()) { + return Status::InternalError(strings::Substitute("parse json failed. column=$0, err=$1", name, st.message())); } + + auto json_column = down_cast(column); json_column->append(std::move(json_value)); return Status::OK(); } diff --git a/be/test/exec/avro_scanner_test.cpp b/be/test/exec/avro_scanner_test.cpp index eddccffb889d9..0b6556d7fd40d 100644 --- a/be/test/exec/avro_scanner_test.cpp +++ b/be/test/exec/avro_scanner_test.cpp @@ -109,7 +109,7 @@ class AvroScannerTest : public ::testing::Test { std::string starrocks_home = getenv("STARROCKS_HOME"); } - void TearDown() override {} + void TearDown() override { config::avro_ignore_union_type_tag = false; } void init_avro_value(std::string schema_path, AvroHelper& avro_helper) { std::ifstream infile_schema; @@ -169,6 +169,11 @@ TEST_F(AvroScannerTest, test_basic_type) { avro_value_set_boolean(&boolean_value, true); } + avro_value_t int_value; + if (avro_value_get_by_name(&avro_helper.avro_val, "inttype", &int_value, NULL) == 0) { + avro_value_set_int(&int_value, 10); + } + avro_value_t long_value; if (avro_value_get_by_name(&avro_helper.avro_val, "longtype", &long_value, NULL) == 0) { avro_value_set_long(&long_value, 4294967296); @@ -193,6 +198,7 @@ TEST_F(AvroScannerTest, test_basic_type) { std::vector types; types.emplace_back(TYPE_BOOLEAN); + types.emplace_back(TYPE_INT); types.emplace_back(TYPE_BIGINT); types.emplace_back(TYPE_DOUBLE); types.emplace_back(TypeDescriptor::create_varchar_type(20)); @@ -204,9 +210,9 @@ TEST_F(AvroScannerTest, test_basic_type) { range.__set_path(data_path); ranges.emplace_back(range); - auto scanner = - create_avro_scanner(types, ranges, {"booleantype", "longtype", "doubletype", "stringtype", "enumtype"}, - avro_helper.schema_text); + auto scanner = create_avro_scanner(types, ranges, + {"booleantype", "inttype", "longtype", "doubletype", "stringtype", "enumtype"}, + avro_helper.schema_text); Status st = scanner->open(); ASSERT_TRUE(st.ok()); @@ -215,13 +221,153 @@ TEST_F(AvroScannerTest, test_basic_type) { ASSERT_TRUE(st2.ok()); ChunkPtr chunk = st2.value(); - EXPECT_EQ(5, chunk->num_columns()); + EXPECT_EQ(6, chunk->num_columns()); EXPECT_EQ(1, chunk->num_rows()); EXPECT_EQ(1, chunk->get(0)[0].get_int8()); - EXPECT_EQ(4294967296, chunk->get(0)[1].get_int64()); - EXPECT_FLOAT_EQ(1.234567, chunk->get(0)[2].get_double()); - EXPECT_EQ("abcdefg", chunk->get(0)[3].get_slice()); - EXPECT_EQ("DIAMONDS", chunk->get(0)[4].get_slice()); + EXPECT_EQ(10, chunk->get(0)[1].get_int32()); + EXPECT_EQ(4294967296, chunk->get(0)[2].get_int64()); + EXPECT_FLOAT_EQ(1.234567, chunk->get(0)[3].get_double()); + EXPECT_EQ("abcdefg", chunk->get(0)[4].get_slice()); + EXPECT_EQ("DIAMONDS", chunk->get(0)[5].get_slice()); +} + +TEST_F(AvroScannerTest, test_basic_type_to_json_or_string) { + std::string schema_path = "./be/test/exec/test_data/avro_scanner/avro_basic_schema.json"; + AvroHelper avro_helper; + init_avro_value(schema_path, avro_helper); + DeferOp avro_helper_deleter([&] { + avro_schema_decref(avro_helper.schema); + avro_value_iface_decref(avro_helper.iface); + avro_value_decref(&avro_helper.avro_val); + }); + + avro_value_t boolean_value; + if (avro_value_get_by_name(&avro_helper.avro_val, "booleantype", &boolean_value, NULL) == 0) { + avro_value_set_boolean(&boolean_value, true); + } + + avro_value_t int_value; + if (avro_value_get_by_name(&avro_helper.avro_val, "inttype", &int_value, NULL) == 0) { + avro_value_set_int(&int_value, 10); + } + + avro_value_t long_value; + if (avro_value_get_by_name(&avro_helper.avro_val, "longtype", &long_value, NULL) == 0) { + avro_value_set_long(&long_value, 4294967296); + } + + avro_value_t double_value; + if (avro_value_get_by_name(&avro_helper.avro_val, "doubletype", &double_value, NULL) == 0) { + avro_value_set_double(&double_value, 1.234567); + } + + avro_value_t string_value; + if (avro_value_get_by_name(&avro_helper.avro_val, "stringtype", &string_value, NULL) == 0) { + avro_value_set_string(&string_value, "abcdefg"); + } + + avro_value_t enum_value; + if (avro_value_get_by_name(&avro_helper.avro_val, "enumtype", &enum_value, NULL) == 0) { + avro_value_set_enum(&enum_value, 2); + } + std::string data_path = "./be/test/exec/test_data/avro_scanner/tmp/avro_basic_data.json"; + write_avro_data(avro_helper, data_path); + + std::vector ranges; + TBrokerRangeDesc range; + range.format_type = TFileFormatType::FORMAT_AVRO; + range.__isset.jsonpaths = true; + range.jsonpaths = R"(["$"])"; + range.__set_path(data_path); + ranges.emplace_back(range); + + // json + { + config::avro_ignore_union_type_tag = false; + + auto scanner = create_avro_scanner({TypeDescriptor::create_json_type()}, ranges, {"jsontype"}, + avro_helper.schema_text); + + Status st = scanner->open(); + ASSERT_TRUE(st.ok()); + + auto st2 = scanner->get_next(); + ASSERT_TRUE(st2.ok()); + + ChunkPtr chunk = st2.value(); + EXPECT_EQ(1, chunk->num_columns()); + EXPECT_EQ(1, chunk->num_rows()); + const JsonValue* json = chunk->get(0)[0].get_json(); + EXPECT_EQ( + "{\"booleantype\": true, \"doubletype\": 1.234567, \"enumtype\": \"DIAMONDS\", \"inttype\": 10, " + "\"longtype\": 4294967296, \"stringtype\": \"abcdefg\"}", + json->to_string_uncheck()); + } + + { + config::avro_ignore_union_type_tag = true; + + auto scanner = create_avro_scanner({TypeDescriptor::create_json_type()}, ranges, {"jsontype"}, + avro_helper.schema_text); + + Status st = scanner->open(); + ASSERT_TRUE(st.ok()); + + auto st2 = scanner->get_next(); + ASSERT_TRUE(st2.ok()); + + ChunkPtr chunk = st2.value(); + EXPECT_EQ(1, chunk->num_columns()); + EXPECT_EQ(1, chunk->num_rows()); + const JsonValue* json = chunk->get(0)[0].get_json(); + EXPECT_EQ( + "{\"booleantype\": true, \"doubletype\": 1.234567, \"enumtype\": \"DIAMONDS\", \"inttype\": 10, " + "\"longtype\": 4294967296, \"stringtype\": \"abcdefg\"}", + json->to_string_uncheck()); + } + + // string + { + config::avro_ignore_union_type_tag = false; + + auto scanner = create_avro_scanner({TypeDescriptor::create_varchar_type(300)}, ranges, {"jsontype"}, + avro_helper.schema_text); + + Status st = scanner->open(); + ASSERT_TRUE(st.ok()); + + auto st2 = scanner->get_next(); + ASSERT_TRUE(st2.ok()); + + ChunkPtr chunk = st2.value(); + EXPECT_EQ(1, chunk->num_columns()); + EXPECT_EQ(1, chunk->num_rows()); + EXPECT_EQ( + "{\"booleantype\": true, \"inttype\": 10, \"longtype\": 4294967296, \"doubletype\": 1.234567, " + "\"stringtype\": \"abcdefg\", \"enumtype\": \"DIAMONDS\"}", + chunk->get(0)[0].get_slice()); + } + + { + config::avro_ignore_union_type_tag = true; + + auto scanner = create_avro_scanner({TypeDescriptor::create_varchar_type(300)}, ranges, {"jsontype"}, + avro_helper.schema_text); + + Status st = scanner->open(); + ASSERT_TRUE(st.ok()); + + auto st2 = scanner->get_next(); + ASSERT_TRUE(st2.ok()); + + ChunkPtr chunk = st2.value(); + EXPECT_EQ(1, chunk->num_columns()); + EXPECT_EQ(1, chunk->num_rows()); + EXPECT_EQ( + "{\"booleantype\":true,\"inttype\":10,\"longtype\":4294967296,\"doubletype\":1.234567,\"stringtype\":" + "\"abcdefg\",\"enumtype\":\"DIAMONDS\"}", + chunk->get(0)[0].get_slice()); + } } TEST_F(AvroScannerTest, test_preprocess_jsonpaths) { @@ -761,6 +907,114 @@ TEST_F(AvroScannerTest, test_complex_schema) { EXPECT_EQ("klj", chunk->get(0)[3].get_slice()); } +TEST_F(AvroScannerTest, test_complex_schema_to_json) { + std::string schema_path = "./be/test/exec/test_data/avro_scanner/avro_complex_schema.json"; + AvroHelper avro_helper; + init_avro_value(schema_path, avro_helper); + DeferOp avro_helper_deleter([&] { + avro_schema_decref(avro_helper.schema); + avro_value_iface_decref(avro_helper.iface); + avro_value_decref(&avro_helper.avro_val); + }); + + avro_value_t decoded_logs_value; + avro_value_set_branch(&avro_helper.avro_val, 1, &decoded_logs_value); + avro_value_t id_value; + if (avro_value_get_by_name(&decoded_logs_value, "id", &id_value, NULL) == 0) { + avro_value_set_string(&id_value, "12345"); + } + + avro_value_t event_signature_val; + if (avro_value_get_by_name(&decoded_logs_value, "eventsignature", &event_signature_val, NULL) == 0) { + avro_value_t null_vale; + avro_value_set_branch(&event_signature_val, 0, &null_vale); + avro_value_set_null(&null_vale); + } + + avro_value_t event_params_val; + if (avro_value_get_by_name(&decoded_logs_value, "eventparams", &event_params_val, NULL) == 0) { + avro_value_t array_value; + avro_value_set_branch(&event_params_val, 1, &array_value); + + avro_value_t ele1; + avro_value_append(&array_value, &ele1, NULL); + avro_value_set_string(&ele1, "abc"); + + avro_value_t ele2; + avro_value_append(&array_value, &ele2, NULL); + avro_value_set_string(&ele2, "def"); + } + + avro_value_t raw_log_val; + if (avro_value_get_by_name(&decoded_logs_value, "rawlog", &raw_log_val, NULL) == 0) { + avro_value_t record_value; + avro_value_set_branch(&raw_log_val, 1, &record_value); + + avro_value_t id_value; + if (avro_value_get_by_name(&record_value, "id", &id_value, NULL) == 0) { + avro_value_set_string(&id_value, "iop"); + } + avro_value_t data_value; + if (avro_value_get_by_name(&record_value, "data", &data_value, NULL) == 0) { + avro_value_set_string(&data_value, "klj"); + } + } + + std::string data_path = "./be/test/exec/test_data/avro_scanner/tmp/avro_complex_data.json"; + write_avro_data(avro_helper, data_path); + + std::vector types; + types.emplace_back(TYPE_JSON); + + std::vector ranges; + TBrokerRangeDesc range; + range.format_type = TFileFormatType::FORMAT_AVRO; + range.__isset.jsonpaths = true; + range.jsonpaths = R"(["$"])"; + range.__set_path(data_path); + ranges.emplace_back(range); + + { + config::avro_ignore_union_type_tag = false; + + auto scanner = create_avro_scanner(types, ranges, {"jsontype"}, avro_helper.schema_text); + Status st = scanner->open(); + ASSERT_TRUE(st.ok()); + + auto st2 = scanner->get_next(); + ASSERT_TRUE(st2.ok()); + + ChunkPtr chunk = st2.value(); + EXPECT_EQ(1, chunk->num_columns()); + EXPECT_EQ(1, chunk->num_rows()); + const JsonValue* json = chunk->get(0)[0].get_json(); + EXPECT_EQ( + "{\"eventparams\": {\"array\": [\"abc\", \"def\"]}, \"eventsignature\": null, \"id\": \"12345\", " + "\"rawlog\": {\"logs\": {\"data\": \"klj\", \"id\": \"iop\"}}}", + json->to_string_uncheck()); + } + + { + config::avro_ignore_union_type_tag = true; + + auto scanner = create_avro_scanner(types, ranges, {"jsontype"}, avro_helper.schema_text); + Status st = scanner->open(); + ASSERT_TRUE(st.ok()); + + auto st2 = scanner->get_next(); + ASSERT_TRUE(st2.ok()); + + ChunkPtr chunk = st2.value(); + EXPECT_EQ(1, chunk->num_columns()); + EXPECT_EQ(1, chunk->num_rows()); + const JsonValue* json = chunk->get(0)[0].get_json(); + EXPECT_EQ( + "{\"eventparams\": [\"abc\", \"def\"], \"eventsignature\": null, \"id\": \"12345\", \"rawlog\": " + "{\"data\": \"klj\", \"id\": \"iop\"}}", + json->to_string_uncheck()); + } +} + TEST_F(AvroScannerTest, test_complex_schema_null_data) { std::string schema_path = "./be/test/exec/test_data/avro_scanner/avro_complex_schema.json"; AvroHelper avro_helper; @@ -846,6 +1100,108 @@ TEST_F(AvroScannerTest, test_complex_schema_null_data) { EXPECT_TRUE(chunk->get(0)[3].is_null()); } +TEST_F(AvroScannerTest, test_complex_schema_null_data_to_json) { + std::string schema_path = "./be/test/exec/test_data/avro_scanner/avro_complex_schema.json"; + AvroHelper avro_helper; + init_avro_value(schema_path, avro_helper); + DeferOp avro_helper_deleter([&] { + avro_schema_decref(avro_helper.schema); + avro_value_iface_decref(avro_helper.iface); + avro_value_decref(&avro_helper.avro_val); + }); + + avro_value_t decoded_logs_value; + avro_value_set_branch(&avro_helper.avro_val, 1, &decoded_logs_value); + avro_value_t id_value; + if (avro_value_get_by_name(&decoded_logs_value, "id", &id_value, NULL) == 0) { + avro_value_set_string(&id_value, "12345"); + } + + avro_value_t event_signature_val; + if (avro_value_get_by_name(&decoded_logs_value, "eventsignature", &event_signature_val, NULL) == 0) { + avro_value_t null_vale; + avro_value_set_branch(&event_signature_val, 0, &null_vale); + avro_value_set_null(&null_vale); + } + + avro_value_t event_params_val; + if (avro_value_get_by_name(&decoded_logs_value, "eventparams", &event_params_val, NULL) == 0) { + avro_value_t array_value; + avro_value_set_branch(&event_params_val, 1, &array_value); + + avro_value_t ele1; + avro_value_append(&array_value, &ele1, NULL); + avro_value_set_string(&ele1, "abc"); + + avro_value_t ele2; + avro_value_append(&array_value, &ele2, NULL); + avro_value_set_string(&ele2, "def"); + } + + avro_value_t raw_log_val; + if (avro_value_get_by_name(&decoded_logs_value, "rawlog", &raw_log_val, NULL) == 0) { + avro_value_t null_vale; + avro_value_set_branch(&raw_log_val, 0, &null_vale); + avro_value_set_null(&null_vale); + } + + std::string data_path = "./be/test/exec/test_data/avro_scanner/tmp/avro_complex_data.json"; + write_avro_data(avro_helper, data_path); + + std::vector types; + types.emplace_back(TYPE_JSON); + + std::vector ranges; + TBrokerRangeDesc range; + range.format_type = TFileFormatType::FORMAT_AVRO; + range.__isset.strip_outer_array = false; + range.__isset.jsonpaths = true; + range.jsonpaths = R"(["$"])"; + range.__isset.json_root = false; + range.__set_path(data_path); + ranges.emplace_back(range); + + { + config::avro_ignore_union_type_tag = false; + + auto scanner = create_avro_scanner(types, ranges, {"jsontype"}, avro_helper.schema_text); + Status st = scanner->open(); + ASSERT_TRUE(st.ok()); + + auto st2 = scanner->get_next(); + ASSERT_TRUE(st2.ok()); + + ChunkPtr chunk = st2.value(); + EXPECT_EQ(1, chunk->num_columns()); + EXPECT_EQ(1, chunk->num_rows()); + const JsonValue* json = chunk->get(0)[0].get_json(); + EXPECT_EQ( + "{\"eventparams\": {\"array\": [\"abc\", \"def\"]}, \"eventsignature\": null, \"id\": \"12345\", " + "\"rawlog\": null}", + json->to_string_uncheck()); + } + + { + config::avro_ignore_union_type_tag = true; + + auto scanner = create_avro_scanner(types, ranges, {"jsontype"}, avro_helper.schema_text); + Status st = scanner->open(); + ASSERT_TRUE(st.ok()); + + auto st2 = scanner->get_next(); + ASSERT_TRUE(st2.ok()); + + ChunkPtr chunk = st2.value(); + EXPECT_EQ(1, chunk->num_columns()); + EXPECT_EQ(1, chunk->num_rows()); + const JsonValue* json = chunk->get(0)[0].get_json(); + EXPECT_EQ( + "{\"eventparams\": [\"abc\", \"def\"], \"eventsignature\": null, \"id\": \"12345\", \"rawlog\": " + "null}", + json->to_string_uncheck()); + } +} + TEST_F(AvroScannerTest, test_map_to_json) { std::string schema_path = "./be/test/exec/test_data/avro_scanner/avro_map_schema.json"; AvroHelper avro_helper; @@ -897,22 +1253,47 @@ TEST_F(AvroScannerTest, test_map_to_json) { range.__set_path(data_path); ranges.emplace_back(range); - auto scanner = create_avro_scanner(types, ranges, {"booleantype", "longtype", "doubletype", "maptype"}, - avro_helper.schema_text); - Status st = scanner->open(); - ASSERT_TRUE(st.ok()); - - auto st2 = scanner->get_next(); - ASSERT_TRUE(st2.ok()); + { + config::avro_ignore_union_type_tag = false; + + auto scanner = create_avro_scanner(types, ranges, {"booleantype", "longtype", "doubletype", "maptype"}, + avro_helper.schema_text); + Status st = scanner->open(); + ASSERT_TRUE(st.ok()); + + auto st2 = scanner->get_next(); + ASSERT_TRUE(st2.ok()); + + ChunkPtr chunk = st2.value(); + EXPECT_EQ(4, chunk->num_columns()); + EXPECT_EQ(1, chunk->num_rows()); + EXPECT_EQ(1, chunk->get(0)[0].get_int8()); + EXPECT_EQ(4294967296, chunk->get(0)[1].get_int64()); + EXPECT_FLOAT_EQ(1.234567, chunk->get(0)[2].get_double()); + const JsonValue* json = chunk->get(0)[3].get_json(); + EXPECT_EQ("{\"ele1\": 4294967297, \"ele2\": 4294967298}", json->to_string_uncheck()); + } - ChunkPtr chunk = st2.value(); - EXPECT_EQ(4, chunk->num_columns()); - EXPECT_EQ(1, chunk->num_rows()); - EXPECT_EQ(1, chunk->get(0)[0].get_int8()); - EXPECT_EQ(4294967296, chunk->get(0)[1].get_int64()); - EXPECT_FLOAT_EQ(1.234567, chunk->get(0)[2].get_double()); - const JsonValue* json = chunk->get(0)[3].get_json(); - EXPECT_EQ("{\"ele1\": 4294967297, \"ele2\": 4294967298}", json->to_string_uncheck()); + { + config::avro_ignore_union_type_tag = true; + + auto scanner = create_avro_scanner(types, ranges, {"booleantype", "longtype", "doubletype", "maptype"}, + avro_helper.schema_text); + Status st = scanner->open(); + ASSERT_TRUE(st.ok()); + + auto st2 = scanner->get_next(); + ASSERT_TRUE(st2.ok()); + + ChunkPtr chunk = st2.value(); + EXPECT_EQ(4, chunk->num_columns()); + EXPECT_EQ(1, chunk->num_rows()); + EXPECT_EQ(1, chunk->get(0)[0].get_int8()); + EXPECT_EQ(4294967296, chunk->get(0)[1].get_int64()); + EXPECT_FLOAT_EQ(1.234567, chunk->get(0)[2].get_double()); + const JsonValue* json = chunk->get(0)[3].get_json(); + EXPECT_EQ("{\"ele1\": 4294967297, \"ele2\": 4294967298}", json->to_string_uncheck()); + } } TEST_F(AvroScannerTest, test_root_array) { diff --git a/be/test/exec/test_data/avro_scanner/avro_basic_schema.json b/be/test/exec/test_data/avro_scanner/avro_basic_schema.json index a10c993811dc5..289c15ec8c06f 100644 --- a/be/test/exec/test_data/avro_scanner/avro_basic_schema.json +++ b/be/test/exec/test_data/avro_scanner/avro_basic_schema.json @@ -3,6 +3,7 @@ "name": "basic", "fields" : [ {"name": "booleantype", "type" : "boolean"}, + {"name": "inttype", "type": "int"}, {"name": "longtype", "type": "long"}, {"name": "doubletype", "type": "double"}, {"name": "stringtype", "type": "string"},