From 8a52b2c52402df8e0a174c5a44507237e3055aba Mon Sep 17 00:00:00 2001 From: Adam Lefevre Date: Tue, 9 Apr 2024 23:28:43 -0500 Subject: [PATCH] make tests actually check for equality and general clean up --- core/Cargo.toml | 1 - core/src/decoders.rs | 296 +++++++++++------- core/src/error.rs | 14 +- core/src/lib.rs | 90 +++--- core/src/pg_schema.rs | 255 +++++++-------- core/tests/decode_integration_tests.rs | 138 +++++--- core/tests/decoding/binary.schema | 1 - core/tests/decoding/binary_nullable.schema | 1 - core/tests/decoding/bool.schema | 2 +- core/tests/decoding/bool_nullable.schema | 2 +- core/tests/decoding/date32.schema | 2 +- core/tests/decoding/date32_nullable.schema | 2 +- core/tests/decoding/duration_us.schema | 2 +- .../decoding/duration_us_nullable.schema | 2 +- core/tests/decoding/float32.schema | 2 +- core/tests/decoding/float32_nullable.schema | 2 +- core/tests/decoding/float64.schema | 2 +- core/tests/decoding/float64_nullable.schema | 2 +- core/tests/decoding/int16.schema | 2 +- core/tests/decoding/int16_nullable.schema | 2 +- core/tests/decoding/int32.schema | 2 +- core/tests/decoding/int32_nullable.schema | 2 +- core/tests/decoding/int64.schema | 2 +- core/tests/decoding/int64_nullable | 2 +- core/tests/decoding/int64_nullable.schema | 2 +- core/tests/decoding/large_binary.schema | 1 + .../decoding/large_binary_nullable.schema | 1 + core/tests/decoding/large_string.schema | 1 + .../decoding/large_string_nullable.schema | 1 + core/tests/decoding/numeric.schema | 2 +- core/tests/decoding/numeric_nullable.schema | 2 +- core/tests/decoding/string.schema | 1 - core/tests/decoding/string_nullable.schema | 1 - core/tests/decoding/time_us.schema | 2 +- core/tests/decoding/time_us_nullable.schema | 2 +- core/tests/decoding/timestamp_us_notz.schema | 2 +- .../timestamp_us_notz_nullable.schema | 2 +- core/tests/decoding/timestamp_us_tz.schema | 2 +- .../decoding/timestamp_us_tz_nullable.schema | 2 +- py/src/pg_schema.rs | 1 + 40 files changed, 451 insertions(+), 402 deletions(-) delete mode 100644 core/tests/decoding/binary.schema delete mode 100644 core/tests/decoding/binary_nullable.schema create mode 100644 core/tests/decoding/large_binary.schema create mode 100644 core/tests/decoding/large_binary_nullable.schema create mode 100644 core/tests/decoding/large_string.schema create mode 100644 core/tests/decoding/large_string_nullable.schema delete mode 100644 core/tests/decoding/string.schema delete mode 100644 core/tests/decoding/string_nullable.schema diff --git a/core/Cargo.toml b/core/Cargo.toml index e458a8a..2e6d606 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -14,7 +14,6 @@ authors = ["Adrian Garcia Badaracco "] [dependencies] bytes = "^1.4.0" arrow-schema = ">=46.0.0" -arrow = ">=51.0.0" enum_dispatch = "0.3.11" anyhow = "1.0.70" thiserror = "1.0.40" diff --git a/core/src/decoders.rs b/core/src/decoders.rs index f8dcd93..36245f9 100644 --- a/core/src/decoders.rs +++ b/core/src/decoders.rs @@ -1,51 +1,44 @@ #![allow(clippy::redundant_closure_call)] -use arrow_array::{self, ArrayRef}; -use std::fmt::Debug; -use std::sync::Arc; use crate::encoders::{PG_BASE_DATE_OFFSET, PG_BASE_TIMESTAMP_OFFSET_US}; -use arrow_array::types::GenericBinaryType; use arrow_array::builder::GenericByteBuilder; +use arrow_array::types::GenericBinaryType; +use arrow_array::{self, ArrayRef}; use arrow_array::{ BooleanArray, Date32Array, DurationMicrosecondArray, Float32Array, Float64Array, - GenericStringArray, Int16Array, Int32Array, Int64Array, - Time64MicrosecondArray, 
TimestampMicrosecondArray
+    GenericStringArray, Int16Array, Int32Array, Int64Array, Time64MicrosecondArray,
+    TimestampMicrosecondArray,
 };
+use std::fmt::Debug;
+use std::sync::Arc;
 
 use crate::error::ErrorKind;
 use crate::pg_schema::{PostgresSchema, PostgresType};
 
-pub(crate) struct ConsumableBuf<'a> {
+pub(crate) struct BufferView<'a> {
     inner: &'a [u8],
     consumed: usize,
 }
 
-impl Debug for ConsumableBuf<'_> {
+impl Debug for BufferView<'_> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "{:?}", &self.inner[self.consumed..])
     }
 }
 
-// const used for stringifying postgres decimals
-const DEC_DIGITS: i16 = 4;
-// const used for determining sign of numeric
-const NUMERIC_NEG: i16 = 0x4000;
-
-impl ConsumableBuf<'_> {
-    pub fn new(inner: &'_ [u8]) -> ConsumableBuf<'_> {
-        ConsumableBuf { inner, consumed: 0 }
+impl BufferView<'_> {
+    pub fn new(inner: &'_ [u8]) -> BufferView<'_> {
+        BufferView { inner, consumed: 0 }
     }
 
     pub fn consume_into_u32(&mut self) -> Result<u32, ErrorKind> {
         if self.consumed + 4 > self.inner.len() {
-            return Err(ErrorKind::DataSize {
-                found: self.inner.len() - self.consumed,
-                expected: 4,
-            });
+            return Err(ErrorKind::IncompleteData);
         }
         let res = u32::from_be_bytes(
             self.inner[self.consumed..self.consumed + 4]
-                .try_into().unwrap()
+                .try_into()
+                .unwrap(),
         );
         self.consumed += 4;
         Ok(res)
@@ -53,14 +46,12 @@ impl ConsumableBuf<'_> {
 
     pub fn consume_into_u16(&mut self) -> Result<u16, ErrorKind> {
         if self.consumed + 2 > self.inner.len() {
-            return Err(ErrorKind::DataSize {
-                found: self.inner.len() - self.consumed,
-                expected: 2,
-            });
+            return Err(ErrorKind::IncompleteData);
         }
         let res = u16::from_be_bytes(
             self.inner[self.consumed..self.consumed + 2]
-                .try_into().unwrap()
+                .try_into()
+                .unwrap(),
         );
         self.consumed += 2;
         Ok(res)
@@ -68,18 +59,12 @@ impl ConsumableBuf<'_> {
 
     pub fn consume_into_vec_n(&mut self, n: usize) -> Result<Vec<u8>, ErrorKind> {
         if self.consumed + n > self.inner.len() {
-            return Err(ErrorKind::DataSize {
-                found: self.inner.len() - self.consumed,
-                expected: n,
-            });
+            return Err(ErrorKind::IncompleteData);
         }
         let data = self.inner[self.consumed..self.consumed + n].to_vec();
         self.consumed += n;
         if data.len() != n {
-            return Err(ErrorKind::DataSize {
-                found: data.len(),
-                expected: n,
-            });
+            return Err(ErrorKind::IncompleteData);
        }
         Ok(data)
     }
@@ -98,29 +83,25 @@
 }
 
 pub(crate) trait Decode {
-    fn decode(&mut self, buf: &mut ConsumableBuf) -> Result<(), ErrorKind>;
-    fn finish(&mut self, column_len: usize) -> Result<ArrayRef, ErrorKind>;
+    fn decode(&mut self, buf: &mut BufferView) -> Result<(), ErrorKind>;
+    fn finish(&mut self, column_len: usize) -> ArrayRef;
     fn column_len(&self) -> usize;
-    fn name(&self) -> &str;
+    fn name(&self) -> String;
 }
 
 macro_rules! impl_decode {
     ($struct_name:ident, $size:expr, $transform:expr, $array_kind:ident) => {
         impl Decode for $struct_name {
-            fn decode(&mut self, buf: &mut ConsumableBuf<'_>) -> Result<(), ErrorKind> {
+            fn decode(&mut self, buf: &mut BufferView<'_>) -> Result<(), ErrorKind> {
                 let field_size = buf.consume_into_u32()?;
                 if field_size == u32::MAX {
                     self.arr.push(None);
                     return Ok(());
                 }
 
                 if field_size != $size {
-                    return Err(ErrorKind::DataSize {
-                        found: field_size as usize,
-                        expected: $size,
-                    });
+                    return Err(ErrorKind::IncompleteData);
                 }
-
                 let data = buf.consume_into_vec_n(field_size as usize)?;
                 // Unwrap is safe here because we have checked the field size is the expected size
                 // above
@@ -131,19 +112,19 @@ macro_rules! impl_decode {
                 Ok(())
             }
 
-            fn name(&self) -> &str {
-                &self.name
-            }
-
             fn column_len(&self) -> usize {
                 self.arr.len()
             }
 
-            fn finish(&mut self, column_len: usize) -> Result<ArrayRef, ErrorKind> {
+            fn name(&self) -> String {
+                self.name.to_string()
+            }
+
+            fn finish(&mut self, column_len: usize) -> ArrayRef {
                 let mut data = std::mem::take(&mut self.arr);
                 data.resize(column_len, None);
                 let array = Arc::new($array_kind::from(data));
-                Ok(array as ArrayRef)
+                array as ArrayRef
             }
         }
     };
 }
@@ -152,7 +133,7 @@ macro_rules! impl_decode_fallible {
     ($struct_name:ident, $size:expr, $transform:expr, $array_kind:ident) => {
         impl Decode for $struct_name {
-            fn decode(&mut self, buf: &mut ConsumableBuf<'_>) -> Result<(), ErrorKind> {
+            fn decode(&mut self, buf: &mut BufferView<'_>) -> Result<(), ErrorKind> {
                 let field_size = buf.consume_into_u32()?;
 
                 if field_size == u32::MAX {
@@ -161,10 +142,7 @@ macro_rules! impl_decode_fallible {
                 }
 
                 if field_size != $size {
-                    return Err(ErrorKind::DataSize {
-                        found: field_size as usize,
-                        expected: $size,
-                    });
+                    return Err(ErrorKind::IncompleteData);
                 }
 
                 let data = buf.consume_into_vec_n(field_size as usize)?;
@@ -189,19 +167,19 @@ macro_rules! impl_decode_fallible {
                 Ok(())
             }
 
-            fn name(&self) -> &str {
-                &self.name
-            }
-
             fn column_len(&self) -> usize {
                 self.arr.len()
             }
 
-            fn finish(&mut self, column_len: usize) -> Result<ArrayRef, ErrorKind> {
+            fn name(&self) -> String {
+                self.name.to_string()
+            }
+
+            fn finish(&mut self, column_len: usize) -> ArrayRef {
                 let mut data = std::mem::take(&mut self.arr);
                 data.resize(column_len, None);
                 let array = Arc::new($array_kind::from(data));
-                Ok(array as ArrayRef)
+                array as ArrayRef
             }
         }
     };
 }
@@ -210,7 +188,7 @@ macro_rules! impl_decode_variable_size {
     ($struct_name:ident, $transform:expr, $extra_bytes:expr, $array_kind:ident, $offset_size:ident) => {
         impl Decode for $struct_name {
-            fn decode(&mut self, buf: &mut ConsumableBuf<'_>) -> Result<(), ErrorKind> {
+            fn decode(&mut self, buf: &mut BufferView<'_>) -> Result<(), ErrorKind> {
                 let field_size = buf.consume_into_u32()?;
                 if field_size == u32::MAX {
                     self.arr.push(None);
@@ -218,10 +196,7 @@ macro_rules! impl_decode_variable_size {
                 }
 
                 if field_size > buf.remaining() as u32 {
-                    return Err(ErrorKind::DataSize {
-                        found: buf.remaining() as usize,
-                        expected: field_size as usize,
-                    });
+                    return Err(ErrorKind::IncompleteData);
                 }
 
                 // Consume any extra data that is not part of the field
@@ -251,24 +226,25 @@ macro_rules! impl_decode_variable_size {
                 Ok(())
             }
 
-            fn name(&self) -> &str {
-                &self.name
-            }
-
             fn column_len(&self) -> usize {
                 self.arr.len()
             }
 
-            fn finish(&mut self, column_len: usize) -> Result<ArrayRef, ErrorKind> {
+            fn name(&self) -> String {
+                self.name.to_string()
+            }
+
+            fn finish(&mut self, column_len: usize) -> ArrayRef {
                 let mut data = std::mem::take(&mut self.arr);
                 data.resize(column_len, None);
                 let array = Arc::new($array_kind::<$offset_size>::from(data));
-                Ok(array as ArrayRef)
+                array as ArrayRef
             }
         }
     };
 }
 
+#[allow(dead_code)]
 pub struct BooleanDecoder {
     name: String,
     arr: Vec<Option<bool>>,
 }
@@ -341,6 +317,50 @@ impl_decode_fallible!(
     TimestampMicrosecondArray
 );
 
+pub struct TimestampTzMicrosecondDecoder {
+    name: String,
+    arr: Vec<Option<i64>>,
+    timezone: String,
+}
+
+impl Decode for TimestampTzMicrosecondDecoder {
+    fn decode(&mut self, buf: &mut BufferView<'_>) -> Result<(), ErrorKind> {
+        let field_size = buf.consume_into_u32()?;
+        if field_size == u32::MAX {
+            self.arr.push(None);
+            return Ok(());
+        }
+
+        if field_size != 8 {
+            return Err(ErrorKind::IncompleteData);
+        }
+
+        let data = buf.consume_into_vec_n(field_size as usize)?;
+        let timestamp_us = i64::from_be_bytes(data.try_into().unwrap());
+        let timestamp_us = convert_pg_timestamp_to_arrow_timestamp_microseconds(timestamp_us)?;
+        self.arr.push(Some(timestamp_us));
+
+        Ok(())
+    }
+
+    fn column_len(&self) -> usize {
+        self.arr.len()
+    }
+
+    fn name(&self) -> String {
+        self.name.to_string()
+    }
+
+    fn finish(&mut self, column_len: usize) -> ArrayRef {
+        let mut data = std::mem::take(&mut self.arr);
+        data.resize(column_len, None);
+        let array = Arc::new(
+            TimestampMicrosecondArray::from(data).with_timezone(self.timezone.to_string()),
+        );
+        array as ArrayRef
+    }
+}
+
 /// Convert Postgres dates (days since 2000-01-01) to Arrow dates (days since 1970-01-01)
 #[inline(always)]
 fn convert_pg_date_to_arrow_date(date: i32) -> Result<i32, ErrorKind> {
@@ -453,7 +473,7 @@ pub struct BinaryDecoder {
 }
 
 impl Decode for BinaryDecoder {
-    fn decode(&mut self, buf: &mut ConsumableBuf<'_>) -> Result<(), ErrorKind> {
+    fn decode(&mut self, buf: &mut BufferView<'_>) -> Result<(), ErrorKind> {
         let field_size = buf.consume_into_u32()?;
         if field_size == u32::MAX {
             self.arr.push(None);
@@ -466,27 +486,26 @@ impl Decode for BinaryDecoder {
         Ok(())
     }
 
-    fn name(&self) -> &str {
-        &self.name
-    }
-
     fn column_len(&self) -> usize {
         self.arr.len()
     }
 
-    fn finish(&mut self, column_len: usize) -> Result<ArrayRef, ErrorKind> {
+    fn name(&self) -> String {
+        self.name.to_string()
+    }
+
+    fn finish(&mut self, column_len: usize) -> ArrayRef {
         let mut data = std::mem::take(&mut self.arr);
         data.resize(column_len, None);
-        let mut builder: GenericByteBuilder<GenericBinaryType<i32>> = GenericByteBuilder::new();
+        let mut builder: GenericByteBuilder<GenericBinaryType<i64>> = GenericByteBuilder::new();
         for v in data {
             match v {
                 Some(v) => builder.append_value(v),
                 None => builder.append_null(),
             }
         }
-        let array = Arc::new(builder.finish());
-        Ok(array as ArrayRef)
+        Arc::new(builder.finish()) as ArrayRef
     }
 }
@@ -510,6 +529,11 @@ impl_decode_variable_size!(
     i64
 );
 
+// const used for stringifying postgres decimals
+const DEC_DIGITS: i16 = 4;
+// const used for determining sign of numeric
+const NUMERIC_NEG: i16 = 0x4000;
+
 /// Logic ported from src/backend/utils/adt/numeric.c:get_str_from_var
 fn parse_pg_decimal_to_string(data: Vec<u8>) -> Result<String, ErrorKind> {
     // Decimals will be decoded to strings since Rust does not have a ubiquitous
@@ -520,7 +544,10 @@ fn parse_pg_decimal_to_string(data: Vec<u8>) -> Result<String, ErrorKind> {
     let weight = i16::from_be_bytes(data[2..4].try_into().unwrap());
     let sign = 
i16::from_be_bytes(data[4..6].try_into().unwrap());
     let scale = i16::from_be_bytes(data[6..8].try_into().unwrap());
-    let digits: Vec<i16> = data[8..8 + ndigits as usize].chunks(2).map(|c| i16::from_be_bytes(c.try_into().unwrap())).collect();
+    let digits: Vec<i16> = data[8..8 + ndigits as usize]
+        .chunks(2)
+        .map(|c| i16::from_be_bytes(c.try_into().unwrap()))
+        .collect();
 
     // the number of digits before the decimal place
     let pre_decimal = (weight + 1) * DEC_DIGITS;
@@ -545,26 +572,30 @@ fn parse_pg_decimal_to_string(data: Vec<u8>) -> Result<String, ErrorKind> {
     // Otherwise put digits in the decimal string by computing the value for each place in decimal
     } else {
         while digits_index <= weight {
-            let mut dig = if digits_index < ndigits { digits[digits_index as usize] } else { 0 };
-            let mut putit = digits_index > 0;
-
-            /* below unwraps too:
+            let mut dig = if digits_index < ndigits {
+                digits[digits_index as usize]
+            } else {
+                0
+            };
+            let mut putit = digits_index > 0;
+
+            /* below unwraps too:
            d1 = dig / 1000;
-            dig -= d1 * 1000;
-            putit |= (d1 > 0);
-            if (putit)
-                *cp++ = d1 + '0';
-            d1 = dig / 100;
-            dig -= d1 * 100;
-            putit |= (d1 > 0);
-            if (putit)
-                *cp++ = d1 + '0';
-            d1 = dig / 10;
-            dig -= d1 * 10;
-            putit |= (d1 > 0);
-            if (putit)
-                *cp++ = d1 + '0';
-            *cp++ = dig + '0';
+           dig -= d1 * 1000;
+           putit |= (d1 > 0);
+           if (putit)
+               *cp++ = d1 + '0';
+           d1 = dig / 100;
+           dig -= d1 * 100;
+           putit |= (d1 > 0);
+           if (putit)
+               *cp++ = d1 + '0';
+           d1 = dig / 10;
+           dig -= d1 * 10;
+           putit |= (d1 > 0);
+           if (putit)
+               *cp++ = d1 + '0';
+           *cp++ = dig + '0';
            */
 
            let mut place = 1000;
@@ -572,7 +603,9 @@ fn parse_pg_decimal_to_string(data: Vec<u8>) -> Result<String, ErrorKind> {
                 let d1 = dig / place;
                 dig -= d1 * place;
                 putit |= d1 > 0;
-                if putit { decimal.push(d1 as u8 + b'0') }
+                if putit {
+                    decimal.push(d1 as u8 + b'0')
+                }
                 place /= 10;
             }
             decimal.push(dig as u8 + b'0');
@@ -586,8 +619,12 @@ fn parse_pg_decimal_to_string(data: Vec<u8>) -> Result<String, ErrorKind> {
     }
 
     let mut i = 0;
-    while i < scale {
-        let mut dig = if digits_index >= 0 && digits_index < ndigits { digits[digits_index as usize] } else { 0 };
+    while i < scale {
+        let mut dig = if digits_index >= 0 && digits_index < ndigits {
+            digits[digits_index as usize]
+        } else {
+            0
+        };
         let mut place = 1000;
         // Same as the loop above but no putit since all digits prior to the
         // scale-TH digit are significant
@@ -618,7 +655,6 @@ impl_decode_variable_size!(
     i64
 );
 
-//
 pub enum Decoder {
     Boolean(BooleanDecoder),
@@ -629,6 +665,7 @@ pub enum Decoder {
     Float64(Float64Decoder),
     Decimal(DecimalDecoder),
     TimestampMicrosecond(TimestampMicrosecondDecoder),
+    TimestampTzMicrosecond(TimestampTzMicrosecondDecoder),
     Date32(Date32Decoder),
     Time64Microsecond(Time64MicrosecondDecoder),
     DurationMicrosecond(DurationMicrosecondDecoder),
@@ -666,15 +703,23 @@ impl Decoder {
                 name: name.to_string(),
                 arr: vec![],
             }),
-            PostgresType::Decimal => {
-                Decoder::Decimal(DecimalDecoder { name: name.to_string(), arr: vec![] })
-            }
+            PostgresType::Decimal => Decoder::Decimal(DecimalDecoder {
+                name: name.to_string(),
+                arr: vec![],
+            }),
             PostgresType::Timestamp => {
                 Decoder::TimestampMicrosecond(TimestampMicrosecondDecoder {
                     name: name.to_string(),
                     arr: vec![],
                 })
             }
+            PostgresType::TimestampTz(ref timezone) => {
+                Decoder::TimestampTzMicrosecond(TimestampTzMicrosecondDecoder {
+                    name: name.to_string(),
+                    arr: vec![],
+                    timezone: timezone.to_string(),
+                })
+            }
             PostgresType::Date => Decoder::Date32(Date32Decoder {
                 name: name.to_string(),
                 arr: vec![],
@@ -689,10 +734,12 @@ impl Decoder {
                 arr: vec![],
             })
         }
-            PostgresType::Text | PostgresType::Char 
| PostgresType::Json => Decoder::String(StringDecoder {
-                name: name.to_string(),
-                arr: vec![],
-            }),
+            PostgresType::Text | PostgresType::Char | PostgresType::Json => {
+                Decoder::String(StringDecoder {
+                    name: name.to_string(),
+                    arr: vec![],
+                })
+            }
             PostgresType::Bytea => Decoder::Binary(BinaryDecoder {
                 name: name.to_string(),
                 arr: vec![],
@@ -702,7 +749,7 @@
             .collect()
     }
 
-    pub(crate) fn apply(&mut self, buf: &mut ConsumableBuf) -> Result<(), ErrorKind> {
+    pub(crate) fn apply(&mut self, buf: &mut BufferView) -> Result<(), ErrorKind> {
         match *self {
             Decoder::Boolean(ref mut decoder) => decoder.decode(buf),
             Decoder::Int16(ref mut decoder) => decoder.decode(buf),
@@ -712,6 +759,7 @@ impl Decoder {
             Decoder::Float64(ref mut decoder) => decoder.decode(buf),
             Decoder::Decimal(ref mut decoder) => decoder.decode(buf),
             Decoder::TimestampMicrosecond(ref mut decoder) => decoder.decode(buf),
+            Decoder::TimestampTzMicrosecond(ref mut decoder) => decoder.decode(buf),
             Decoder::Date32(ref mut decoder) => decoder.decode(buf),
             Decoder::Time64Microsecond(ref mut decoder) => decoder.decode(buf),
             Decoder::DurationMicrosecond(ref mut decoder) => decoder.decode(buf),
@@ -721,6 +769,26 @@ impl Decoder {
         }
     }
 
+    pub(crate) fn name(&self) -> String {
+        match *self {
+            Decoder::Boolean(ref decoder) => decoder.name(),
+            Decoder::Int16(ref decoder) => decoder.name(),
+            Decoder::Int32(ref decoder) => decoder.name(),
+            Decoder::Int64(ref decoder) => decoder.name(),
+            Decoder::Float32(ref decoder) => decoder.name(),
+            Decoder::Float64(ref decoder) => decoder.name(),
+            Decoder::Decimal(ref decoder) => decoder.name(),
+            Decoder::TimestampMicrosecond(ref decoder) => decoder.name(),
+            Decoder::TimestampTzMicrosecond(ref decoder) => decoder.name(),
+            Decoder::Date32(ref decoder) => decoder.name(),
+            Decoder::Time64Microsecond(ref decoder) => decoder.name(),
+            Decoder::DurationMicrosecond(ref decoder) => decoder.name(),
+            Decoder::String(ref decoder) => decoder.name(),
+            Decoder::Binary(ref decoder) => decoder.name(),
+            Decoder::Jsonb(ref decoder) => decoder.name(),
+        }
+    }
+
     pub(crate) fn column_len(&self) -> usize {
         match *self {
             Decoder::Boolean(ref decoder) => decoder.column_len(),
@@ -731,6 +799,7 @@ impl Decoder {
             Decoder::Float64(ref decoder) => decoder.column_len(),
             Decoder::Decimal(ref decoder) => decoder.column_len(),
             Decoder::TimestampMicrosecond(ref decoder) => decoder.column_len(),
+            Decoder::TimestampTzMicrosecond(ref decoder) => decoder.column_len(),
             Decoder::Date32(ref decoder) => decoder.column_len(),
             Decoder::Time64Microsecond(ref decoder) => decoder.column_len(),
             Decoder::DurationMicrosecond(ref decoder) => decoder.column_len(),
@@ -740,7 +809,7 @@ impl Decoder {
         }
     }
 
-    pub(crate) fn finish(&mut self, column_len: usize) -> Result<ArrayRef, ErrorKind> {
+    pub(crate) fn finish(&mut self, column_len: usize) -> ArrayRef {
         match *self {
             Decoder::Boolean(ref mut decoder) => decoder.finish(column_len),
             Decoder::Int16(ref mut decoder) => decoder.finish(column_len),
@@ -750,6 +819,7 @@ impl Decoder {
             Decoder::Float64(ref mut decoder) => decoder.finish(column_len),
             Decoder::Decimal(ref mut decoder) => decoder.finish(column_len),
             Decoder::TimestampMicrosecond(ref mut decoder) => decoder.finish(column_len),
+            Decoder::TimestampTzMicrosecond(ref mut decoder) => decoder.finish(column_len),
             Decoder::Date32(ref mut decoder) => decoder.finish(column_len),
             Decoder::Time64Microsecond(ref mut decoder) => decoder.finish(column_len),
             Decoder::DurationMicrosecond(ref mut decoder) => decoder.finish(column_len),
diff --git 
a/core/src/error.rs b/core/src/error.rs
index 209875a..dbfcc57 100644
--- a/core/src/error.rs
+++ b/core/src/error.rs
@@ -1,4 +1,4 @@
-use arrow_schema::{DataType, ArrowError};
+use arrow_schema::{ArrowError, DataType};
 use thiserror::Error;
 
 use crate::pg_schema::PostgresType;
@@ -46,17 +46,13 @@ pub enum ErrorKind {
     Decode { reason: String, name: String },
     #[error("Got invalid binary file header {bytes:?}")]
     InvalidBinaryHeader { bytes: [u8; 11] },
-    #[error("Got invalid binary file trailer {bytes:?}")]
-    InvalidBinaryTrailer { bytes: [u8; 2] },
-    #[error("Reached EOF before with incomplete data. Extra data: {remaining_bytes:?}")]
+    #[error("Reached EOF in the middle of a tuple. Partial tuple: {remaining_bytes:?}")]
     IncompleteDecode { remaining_bytes: Vec<u8> },
-    #[error("Got data of the wrong size: got: {found}, expected: {expected}")]
-    DataSize { found: usize, expected: usize },
+    #[error("Expected data size was not found")]
+    IncompleteData,
     #[error("Invalid column specification: {spec}")]
     InvalidColumnSpec { spec: String },
-    #[error("Extra data found at the end of the file")]
-    ExtraDataFound,
-    #[error("Invalid column type: {typ}")]
+    #[error("Invalid column type found while parsing schema: {typ}")]
     UnsupportedColumnType { typ: String },
     #[error("Got an error in an IO Operation: {io_error:?}")]
     IOError { io_error: std::io::Error },
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 59839ea..f107497 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -1,19 +1,19 @@
 use std::collections::HashMap;
-use std::io::{Seek, BufRead};
+use std::io::{BufRead, Seek};
 
 use arrow_array::{Array, RecordBatch};
 use arrow_schema::Fields;
 use arrow_schema::Schema;
-use bytes::{BufMut, BytesMut, Buf};
+use bytes::{Buf, BufMut, BytesMut};
 use error::ErrorKind;
 
+pub mod decoders;
 pub mod encoders;
 pub mod error;
 pub mod pg_schema;
-pub mod decoders;
 
+use crate::decoders::{BufferView, Decoder};
 use crate::encoders::{BuildEncoder, Encode, EncoderBuilder};
-use crate::decoders::{Decoder, ConsumableBuf};
 use crate::pg_schema::PostgresSchema;
 
 const HEADER_MAGIC_BYTES: &[u8] = b"PGCOPY\n\xff\r\n\0";
@@ -155,9 +155,9 @@ impl ArrowToPostgresBinaryEncoder {
 
 enum BatchDecodeResult {
     Batch(RecordBatch),
-    Incomplete,
+    Incomplete(usize),
     Error(ErrorKind),
-    PartialConsume{batch: RecordBatch, consumed: usize},
+    PartialConsume { batch: RecordBatch, consumed: usize },
 }
 
 pub struct PostgresBinaryToArrowDecoder {
@@ -195,9 +195,7 @@ impl PostgresBinaryToArrowDecoder {
         Ok(())
     }
 
-    pub fn decode_batches(
-        &mut self,
-    ) -> Result<Vec<RecordBatch>, ErrorKind> {
+    pub fn decode_batches(&mut self) -> Result<Vec<RecordBatch>, ErrorKind> {
         let mut batches = Vec::new();
         let mut buf = BytesMut::with_capacity(self.capacity);
         let mut eof = false;
@@ -243,15 +241,13 @@ impl PostgresBinaryToArrowDecoder {
             if !buf.is_empty() {
                 match self.decode_batch(&mut buf) {
                     BatchDecodeResult::Batch(batch) => batches.push(batch),
-                    BatchDecodeResult::Incomplete => {
+                    BatchDecodeResult::Error(e) => return Err(e),
+                    BatchDecodeResult::Incomplete(consumed)
+                    | BatchDecodeResult::PartialConsume { batch: _, consumed } => {
                         return Err(ErrorKind::IncompleteDecode {
-                            remaining_bytes: buf.to_vec(),
+                            remaining_bytes: buf[consumed..].to_vec(),
                         })
                     }
-                    BatchDecodeResult::Error(e) => return Err(e),
-                    BatchDecodeResult::PartialConsume{ batch: _, consumed: _} => {
-                        return Err(ErrorKind::ExtraDataFound)
-                    }
                 }
             }
             self.state = EncoderState::Finished;
@@ -261,27 +257,23 @@ impl PostgresBinaryToArrowDecoder {
                 BatchDecodeResult::Batch(batch) => {
                     batches.push(batch);
                     buf.clear()
-                },
+                }
                 // If we 
receive a PartialConsume BatchDecodeResult, store the batches we did
                 // manage to decode and continue reading from the source with the remaining
                 // data from the previous read in the buffer.
-                BatchDecodeResult::PartialConsume{ batch, consumed } => {
+                BatchDecodeResult::PartialConsume { batch, consumed } => {
                     batches.push(batch);
                     let old_buf = buf;
                     buf = BytesMut::with_capacity(self.capacity);
                     buf.put(&old_buf[consumed..]);
-                },
+                }
                 // If we receive an Incomplete BatchDecodeResult, increase the capacity of the
                 // buffer, read more data from the source, and try to decode the batch again.
-                BatchDecodeResult::Incomplete => {
+                BatchDecodeResult::Incomplete(_) => {
                     // println!("Increasing capacity to {}", self.capacity * 2);
                     // increase the capacity attribute of the decoder by a factor of 2.
+                    buf.reserve(self.capacity);
                     self.capacity *= 2;
-                    let old_buf = buf;
-                    // create a new buffer with the new capacity.
-                    buf = BytesMut::with_capacity(self.capacity);
-                    // put all the data from the old buffer into the new buffer.
-                    buf.put(old_buf);
                 }
                 BatchDecodeResult::Error(e) => return Err(e),
             }
@@ -294,8 +286,8 @@ impl PostgresBinaryToArrowDecoder {
         // ensure that the decoder is in the correct state before proceeding.
         assert_eq!(self.state, EncoderState::Encoding);
 
-        // create a new ConsumableBuf from the buffer.
-        let mut local_buf = ConsumableBuf::new(buf);
+        // create a new BufferView from the buffer.
+        let mut local_buf = BufferView::new(buf);
 
         // Keep track of the number of rows in the batch.
         let mut rows = 0;
@@ -321,7 +313,7 @@ impl PostgresBinaryToArrowDecoder {
             // If the tuple length is 0xffff we have reached the end of the
             // snapshot and we can break the loop and finish the batch.
             if tuple_len == 0xffff {
-                break
+                break;
             }
 
             // Each iteration of the loop reads a column from the tuple using the
@@ -331,7 +323,7 @@ impl PostgresBinaryToArrowDecoder {
                 // return a BatchDecodeResult::Incomplete.
                 if local_buf.remaining() == 0 && rows == 0 {
                     // println!("Incomplete no data remaining");
-                    return BatchDecodeResult::Incomplete;
+                    return BatchDecodeResult::Incomplete(local_buf.consumed());
                 // If local_buf has been fully consumed and we have read some rows,
                 // return a BatchDecodeResult::PartialConsume, passing the number of bytes
                 // consumed before reading the tuple to the caller so it can know how much data
@@ -340,32 +332,33 @@ impl PostgresBinaryToArrowDecoder {
                     // println!("Partial consume no data remaining");
                     // println!("Finishing batch. Rows: {}", rows);
                     return match self.finish_batch() {
-                        Ok(batch) => BatchDecodeResult::PartialConsume{ batch, consumed},
+                        Ok(batch) => BatchDecodeResult::PartialConsume { batch, consumed },
                         Err(e) => BatchDecodeResult::Error(e),
-                    }
+                    };
                 }
 
                 // Apply the decoder to the local_buf. Consume the data from the buffer as needed
                 match decoder.apply(&mut local_buf) {
                     // If the decoder was able to decode the data, continue to the next column.
                     Ok(_) => {}
-                    // If we receive a DataSize error, we have reached the end of the data in
-                    // the buffer. If we have not finished the current tuple
-                    Err(ErrorKind::DataSize { found: _, expected: _ }) => {
+                    // If we receive an IncompleteData error, we have reached the end of the data in
+                    // the buffer. If we have decoded some tuples, return a BatchDecodeResult::PartialConsume,
+                    // otherwise return a BatchDecodeResult::Incomplete.
+                    Err(ErrorKind::IncompleteData) => {
                         // If we have not read any rows, return a BatchDecodeResult::Incomplete.
                        if rows == 0 {
                            // println!("Incomplete mid-batch");
-                            return BatchDecodeResult::Incomplete;
+                            return BatchDecodeResult::Incomplete(local_buf.consumed());
                        } else {
                            // If we have read some rows, return a BatchDecodeResult::PartialConsume,
                            // println!("Partial consume mid-batch");
                            // println!("Finishing batch. Rows: {}", rows);
                            return match self.finish_batch() {
-                                Ok(batch) => BatchDecodeResult::PartialConsume{ batch, consumed},
+                                Ok(batch) => BatchDecodeResult::PartialConsume { batch, consumed },
                                Err(e) => BatchDecodeResult::Error(e),
-                            }
+                            };
                        }
-                    },
-                    Err(e) => return BatchDecodeResult::Error(e)
+                    }
+                    Err(e) => return BatchDecodeResult::Error(e),
                }
            }
            // Increment the number of rows in the batch.
@@ -378,34 +371,27 @@ impl PostgresBinaryToArrowDecoder {
            Ok(batch) => BatchDecodeResult::Batch(batch),
            Err(e) => BatchDecodeResult::Error(e),
        }
-
    }
 
-    fn finish_batch(
-        &mut self,
-    ) -> Result<RecordBatch, ErrorKind> {
-        let mut columns: Vec<Arc<dyn Array>> = Vec::new();
+    fn finish_batch(&mut self) -> Result<RecordBatch, ErrorKind> {
        // Find the minimum length column in the decoders. These can be different if
        // we are in a partial consume state. We will truncate the columns to the length
        // of the shortest column and pick up the lost data in the next batch.
        let column_len = self.decoders.iter().map(|d| d.column_len()).min().unwrap();
        // For each decoder call its finish method to coerce the data into an Arrow array
        // and append the array to the columns vector.
-        for decoder in self.decoders.iter_mut() {
-            match decoder.finish(column_len) {
-                Ok(array) => columns.push(array),
-                Err(e) => return Err(e)
-            }
-        }
+        let columns = self
+            .decoders
+            .iter_mut()
+            .map(|decoder| decoder.finish(column_len))
+            .collect();
        // Create a new RecordBatch from the columns vector and return it.
-        let record_batch =
-            RecordBatch::try_new(self.schema.clone().into(), columns)?;
+        let record_batch = RecordBatch::try_new(self.schema.clone().into(), columns)?;
        Ok(record_batch)
    }
}
 
-
 #[cfg(test)]
 mod tests {
     use std::{collections::HashMap, sync::Arc};
diff --git a/core/src/pg_schema.rs b/core/src/pg_schema.rs
index 515dd7d..5a00c0a 100644
--- a/core/src/pg_schema.rs
+++ b/core/src/pg_schema.rs
@@ -1,6 +1,6 @@
-use arrow_schema::{Schema, Field, DataType, TimeUnit, SchemaRef};
-use std::sync::Arc;
 use crate::error::ErrorKind;
+use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
+use std::sync::Arc;
 
 #[derive(Debug, Clone, PartialEq)]
 pub enum TypeSize {
@@ -25,6 +25,7 @@ pub enum PostgresType {
     Date,
     Time,
     Timestamp,
+    TimestampTz(String),
     Interval,
     List(Box<PostgresType>),
 }
@@ -46,6 +47,7 @@ impl PostgresType {
             PostgresType::Date => TypeSize::Fixed(4),
             PostgresType::Time => TypeSize::Fixed(8),
             PostgresType::Timestamp => TypeSize::Fixed(8),
+            PostgresType::TimestampTz(_) => TypeSize::Fixed(8),
             PostgresType::Interval => TypeSize::Fixed(12),
             PostgresType::Decimal => TypeSize::Variable,
             PostgresType::List(_) => TypeSize::Variable,
@@ -68,6 +70,7 @@ impl PostgresType {
             PostgresType::Date => Some(1082),
             PostgresType::Time => Some(1083),
             PostgresType::Timestamp => Some(1114),
+            PostgresType::TimestampTz(_) => Some(1184),
             PostgresType::Interval => Some(1186),
             PostgresType::List(_) => None,
         }
@@ -89,6 +92,7 @@ impl PostgresType {
             PostgresType::Date => "DATE".to_string(),
             PostgresType::Time => "TIME".to_string(),
             PostgresType::Timestamp => "TIMESTAMP".to_string(),
+            PostgresType::TimestampTz(_) => "TIMESTAMP WITH TIME ZONE".to_string(),
             PostgresType::Interval => "INTERVAL".to_string(),
             PostgresType::List(inner) => {
                 // arrays of structs and such are not supported
@@ -98,28 +102,6 @@ impl PostgresType {
         };
         Some(v)
     }
-
-    pub fn from_name(name: &str) -> Option<PostgresType> {
-        match name {
-            "BOOL" => Some(PostgresType::Bool),
-            "BYTEA" => Some(PostgresType::Bytea),
-            "INT8" => Some(PostgresType::Int8),
-            "INT2" => Some(PostgresType::Int2),
-            "INT4" => Some(PostgresType::Int4),
-            "CHAR" => Some(PostgresType::Char),
-            "TEXT" => Some(PostgresType::Text),
-            "JSON" => Some(PostgresType::Json),
-            "JSONB" => Some(PostgresType::Jsonb),
-            "FLOAT4" => Some(PostgresType::Float4),
-            "FLOAT8" => Some(PostgresType::Float8),
-            "DECIMAL" => Some(PostgresType::Decimal),
-            "DATE" => Some(PostgresType::Date),
-            "TIME" => Some(PostgresType::Time),
-            "TIMESTAMP" => Some(PostgresType::Timestamp),
-            "INTERVAL" => Some(PostgresType::Interval),
-            _ => None,
-        }
-    }
 }
 
 impl From<PostgresType> for DataType {
@@ -140,13 +122,15 @@ impl From<PostgresType> for DataType {
             PostgresType::Date => DataType::Date32,
             PostgresType::Time => DataType::Time64(TimeUnit::Microsecond),
             PostgresType::Timestamp => DataType::Timestamp(TimeUnit::Microsecond, None),
+            PostgresType::TimestampTz(timezone) => {
+                DataType::Timestamp(TimeUnit::Microsecond, Some(timezone.into()))
+            }
             PostgresType::Interval => DataType::Duration(TimeUnit::Microsecond),
             PostgresType::List(_) => unimplemented!(),
         }
     }
 }
 
-
 #[derive(Debug, Clone, PartialEq)]
 pub struct Column {
     pub data_type: PostgresType,
@@ -154,109 +138,80 @@ pub struct Column {
 }
 
 impl Column {
-    pub fn from_parts(type_str: &str, nullable: &str) -> Result<Column, ErrorKind> {
+    pub fn from_parts(type_str: &str, nullable: &str, timezone: String) -> Result<Column, ErrorKind> {
         match type_str {
-            "boolean" => {
-                Ok(Column {
-                    data_type: PostgresType::Bool,
-                    nullable: nullable == "t",
-                })
-            },
-            "bytea" => {
-                Ok(Column {
-                    data_type: PostgresType::Bytea,
-                    nullable: nullable == "t",
-                })
-            },
-            "bigint" => {
-                Ok(Column {
-                    data_type: PostgresType::Int8,
-                    nullable: nullable == "t",
-                })
-            },
-            "smallint" => {
-                Ok(Column {
-                    data_type: PostgresType::Int2,
-                    nullable: nullable == "t",
-                })
-            },
-            "integer" => {
-                Ok(Column {
-                    data_type: PostgresType::Int4,
-                    nullable: nullable == "t",
-                })
-            },
-            "character" | "character varying" => {
-                Ok(Column {
-                    data_type: PostgresType::Char,
-                    nullable: nullable == "t",
-                })
-            },
-            "text" => {
-                Ok(Column {
-                    data_type: PostgresType::Text,
-                    nullable: nullable == "t",
-                })
-            },
-            "json" => {
-                Ok(Column {
-                    data_type: PostgresType::Json,
-                    nullable: nullable == "t",
-                })
-            },
-            "jsonb" => {
-                Ok(Column {
-                    data_type: PostgresType::Jsonb,
-                    nullable: nullable == "t",
-                })
-            },
-            "real" => {
-                Ok(Column {
-                    data_type: PostgresType::Float4,
-                    nullable: nullable == "t",
-                })
-            },
-            "double precision" => {
-                Ok(Column {
-                    data_type: PostgresType::Float8,
-                    nullable: nullable == "t",
-                })
-            },
-            "numeric" => {
-                Ok(Column {
-                    data_type: PostgresType::Decimal,
-                    nullable: nullable == "t",
-                })
-            },
-            "date" => {
-                Ok(Column {
-                    data_type: PostgresType::Date,
-                    nullable: nullable == "t",
-                })
-            },
-            "time" => {
-                Ok(Column {
-                    data_type: PostgresType::Time,
-                    nullable: nullable == "t",
-                })
-            },
-            "timestamp with time zone" | "timestamp without time zone" => {
-                Ok(Column {
-                    data_type: PostgresType::Timestamp,
-                    nullable: nullable == "t",
-                })
-            },
-            "interval" => {
-                Ok(Column {
-                    data_type: PostgresType::Interval,
-                    nullable: nullable == "t",
-                })
-            },
-            _ => {
-                Err(ErrorKind::UnsupportedColumnType { typ: type_str.to_string() })
-            }
+            "boolean" => Ok(Column {
+                data_type: PostgresType::Bool,
+                nullable: nullable == "t",
+            }),
+            "bytea" => Ok(Column {
+                data_type: PostgresType::Bytea,
+                nullable: nullable == "t",
+            }),
+            
"bigint" => Ok(Column { + data_type: PostgresType::Int8, + nullable: nullable == "t", + }), + "smallint" => Ok(Column { + data_type: PostgresType::Int2, + nullable: nullable == "t", + }), + "integer" => Ok(Column { + data_type: PostgresType::Int4, + nullable: nullable == "t", + }), + "character" | "character varying" => Ok(Column { + data_type: PostgresType::Char, + nullable: nullable == "t", + }), + "text" => Ok(Column { + data_type: PostgresType::Text, + nullable: nullable == "t", + }), + "json" => Ok(Column { + data_type: PostgresType::Json, + nullable: nullable == "t", + }), + "jsonb" => Ok(Column { + data_type: PostgresType::Jsonb, + nullable: nullable == "t", + }), + "real" => Ok(Column { + data_type: PostgresType::Float4, + nullable: nullable == "t", + }), + "double precision" => Ok(Column { + data_type: PostgresType::Float8, + nullable: nullable == "t", + }), + "numeric" => Ok(Column { + data_type: PostgresType::Decimal, + nullable: nullable == "t", + }), + "date" => Ok(Column { + data_type: PostgresType::Date, + nullable: nullable == "t", + }), + "time" => Ok(Column { + data_type: PostgresType::Time, + nullable: nullable == "t", + }), + "timestamp without time zone" => Ok(Column { + data_type: PostgresType::Timestamp, + nullable: nullable == "t", + }), + "timestamp with time zone" => Ok(Column { + data_type: PostgresType::TimestampTz(timezone), + nullable: nullable == "t", + }), + "interval" => Ok(Column { + data_type: PostgresType::Interval, + nullable: nullable == "t", + }), + _ => Err(ErrorKind::UnsupportedColumnType { + typ: type_str.to_string(), + }), } - } } @@ -265,38 +220,46 @@ pub struct PostgresSchema { pub columns: Vec<(String, Column)>, } - impl From for SchemaRef { fn from(pg_schema: PostgresSchema) -> Self { - let fields: Vec = pg_schema.columns.iter().map(|(name, col)| { - Field::new(name, col.data_type.clone().into(), col.nullable) - }).collect(); + let fields: Vec = pg_schema + .columns + .iter() + .map(|(name, col)| Field::new(name, col.data_type.clone().into(), col.nullable)) + .collect(); Arc::new(Schema::new(fields)) } } impl PostgresSchema { - pub fn from_reader(mut reader: R, delim: char) -> Result { + pub fn from_reader( + mut reader: R, + delim: char, + timezone: String, + ) -> Result { let mut schema_str = String::new(); reader.read_to_string(&mut schema_str)?; - let schema = schema_str.split('\n').filter(|s| !s.is_empty()).map(|s|{ - let parts: Vec<&str> = s.splitn(3, delim).collect(); - if parts.len() != 3 { - return Err(ErrorKind::InvalidColumnSpec{spec: s.to_string()}); - } - let name = parts[0]; - let typ = parts[1]; - let nullable = parts[2]; - let col = Column::from_parts(typ, nullable)?; - Ok((name.to_string(), col)) - }).collect::, ErrorKind>>().map(|columns| { - PostgresSchema { columns } - - })?; + let schema = schema_str + .split('\n') + .filter(|s| !s.is_empty()) + .map(|s| { + let parts: Vec<&str> = s.splitn(3, delim).collect(); + if parts.len() != 3 { + return Err(ErrorKind::InvalidColumnSpec { + spec: s.to_string(), + }); + } + let name = parts[0]; + let typ = parts[1]; + let nullable = parts[2]; + let col = Column::from_parts(typ, nullable, timezone.to_string())?; + Ok((name.to_string(), col)) + }) + .collect::, ErrorKind>>() + .map(|columns| PostgresSchema { columns })?; Ok(schema) - } pub fn iter(&self) -> impl Iterator { diff --git a/core/tests/decode_integration_tests.rs b/core/tests/decode_integration_tests.rs index ce6a294..05406eb 100644 --- a/core/tests/decode_integration_tests.rs +++ b/core/tests/decode_integration_tests.rs @@ 
-1,17 +1,26 @@
+use arrow_array::RecordBatch;
+use arrow_ipc::reader::FileReader;
+use arrow_schema::{Field, Schema};
+use pgpq::error::ErrorKind;
 use std::fs::File;
-use std::path::PathBuf;
 use std::io::BufReader;
-use pgpq::error::ErrorKind;
-
-use pgpq::{PostgresBinaryToArrowDecoder, pg_schema::PostgresSchema};
+use std::path::PathBuf;
+use std::sync::Arc;
 
+use pgpq::{pg_schema::PostgresSchema, PostgresBinaryToArrowDecoder};
 
-const KB: usize = 1024* 512;
+const READ_CHUNK_SIZE: usize = 1024 * 8;
 
 fn read_schema_file(path: PathBuf) -> PostgresSchema {
     let file = File::open(path).unwrap();
     let reader = BufReader::new(file);
-    PostgresSchema::from_reader(reader, ',').unwrap()
+    PostgresSchema::from_reader(reader, ',', "America/New_York".to_string()).unwrap()
+}
+
+fn read_arrow_file(path: PathBuf) -> Vec<RecordBatch> {
+    let file = File::open(path).unwrap();
+    let reader = FileReader::try_new(file, None).unwrap();
+    reader.collect::<Result<Vec<_>, _>>().unwrap()
 }
 
 fn run_test_case(case: &str) -> Result<(), ErrorKind> {
@@ -19,177 +28,202 @@ fn run_test_case(case: &str) -> Result<(), ErrorKind> {
         PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(format!("tests/snapshots/{case}.bin"));
     let schema_path =
         PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(format!("tests/decoding/{case}.schema"));
+    let arrow_path =
+        PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(format!("tests/testdata/{case}.arrow"));
+
     let file = File::open(path).unwrap();
-    let reader = BufReader::with_capacity(KB, file);
+    let reader = BufReader::with_capacity(READ_CHUNK_SIZE, file);
     let schema = read_schema_file(schema_path);
-    println!("{:?}", schema);
-    let mut decoder = PostgresBinaryToArrowDecoder::new(schema, reader, KB).unwrap();
+
+    let mut decoder = PostgresBinaryToArrowDecoder::new(schema, reader, READ_CHUNK_SIZE).unwrap();
     decoder.read_header()?;
-    let batches = decoder.decode_batches()?;
-    println!("{:?}", batches);
+    let batches = decoder.decode_batches()?;
+
+    let mut expected_batches = read_arrow_file(arrow_path);
+
+    // All testdata currently has nullable set where it should not be.
+    // This is a workaround to make the test pass.
+    if !case.contains("nullable") {
+        expected_batches = expected_batches
+            .into_iter()
+            .map(|batch| {
+                let new_fields: Vec<Arc<Field>> = (*(*batch.schema()).clone().fields)
+                    .to_vec()
+                    .clone()
+                    .into_iter()
+                    .map(|f| Arc::new((*f).clone().with_nullable(false)))
+                    .collect();
+                let new_schema = Schema::new(new_fields);
+                println!("{:?}", new_schema);
+                RecordBatch::try_new(Arc::new(new_schema), batch.columns().to_vec()).unwrap()
+            })
+            .collect::<Vec<_>>();
+    }
+
+    assert_eq!(batches.len(), expected_batches.len());
+    assert_eq!(batches, expected_batches);
+
     Ok(())
 }
 
-
 #[test]
 fn test_bool() -> Result<(), ErrorKind> {
     run_test_case("bool")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_int16() -> Result<(), ErrorKind> {
     run_test_case("int16")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_int32() -> Result<(), ErrorKind> {
     run_test_case("int32")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_int64() -> Result<(), ErrorKind> {
     run_test_case("int64")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_float32() -> Result<(), ErrorKind> {
     run_test_case("float32")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_float64() -> Result<(), ErrorKind> {
     run_test_case("float64")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_timestamp_us_notz() -> Result<(), ErrorKind> {
     run_test_case("timestamp_us_notz")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_timestamp_us_tz() -> Result<(), ErrorKind> {
     run_test_case("timestamp_us_tz")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_time_us() -> Result<(), ErrorKind> {
     run_test_case("time_us")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_date32() -> Result<(), ErrorKind> {
     run_test_case("date32")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_duration_us() -> Result<(), ErrorKind> {
     run_test_case("duration_us")?;
-    Ok(())
+    Ok(())
 }
 
-
 #[test]
-fn test_binary() -> Result<(), ErrorKind> {
-    run_test_case("binary")?;
-    Ok(())
+fn test_large_binary() -> Result<(), ErrorKind> {
+    run_test_case("large_binary")?;
+    Ok(())
 }
 
 #[test]
-fn test_string() -> Result<(), ErrorKind> {
-    run_test_case("string")?;
-    Ok(())
+fn test_large_string() -> Result<(), ErrorKind> {
+    run_test_case("large_string")?;
+    Ok(())
 }
 
 #[test]
 fn test_bool_nullable() -> Result<(), ErrorKind> {
     run_test_case("bool_nullable")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_int16_nullable() -> Result<(), ErrorKind> {
     run_test_case("int16_nullable")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_int32_nullable() -> Result<(), ErrorKind> {
     run_test_case("int32_nullable")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_int64_nullable() -> Result<(), ErrorKind> {
     run_test_case("int64_nullable")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_float32_nullable() -> Result<(), ErrorKind> {
     run_test_case("float32_nullable")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_float64_nullable() -> Result<(), ErrorKind> {
     run_test_case("float64_nullable")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_timestamp_us_notz_nullable() -> Result<(), ErrorKind> {
     run_test_case("timestamp_us_notz_nullable")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_timestamp_us_tz_nullable() -> Result<(), ErrorKind> {
     run_test_case("timestamp_us_tz_nullable")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_time_us_nullable() -> Result<(), ErrorKind> {
     run_test_case("time_us_nullable")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_date32_nullable() -> Result<(), ErrorKind> {
     run_test_case("date32_nullable")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
 fn test_duration_us_nullable() -> Result<(), ErrorKind> {
     run_test_case("duration_us_nullable")?;
-    Ok(())
+    Ok(())
 }
 
 #[test]
-fn test_binary_nullable() -> Result<(), ErrorKind> {
-    run_test_case("binary_nullable")?;
-    Ok(())
+fn test_large_binary_nullable() -> Result<(), ErrorKind> {
+    
run_test_case("large_binary_nullable")?; + Ok(()) } #[test] -fn test_string_nullable() -> Result<(), ErrorKind> { - run_test_case("string_nullable")?; - Ok(()) +fn test_large_string_nullable() -> Result<(), ErrorKind> { + run_test_case("large_string_nullable")?; + Ok(()) } -// #[test] -// fn test_profile() -> Result<(), ErrorKind> { -// run_test_case("profile")?; -// Ok(()) -// } +//#[test] +//fn test_profile() -> Result<(), ErrorKind> { +// run_test_case("profile")?; +// Ok(()) +//} diff --git a/core/tests/decoding/binary.schema b/core/tests/decoding/binary.schema deleted file mode 100644 index a1647a6..0000000 --- a/core/tests/decoding/binary.schema +++ /dev/null @@ -1 +0,0 @@ -binary_column,bytea,f \ No newline at end of file diff --git a/core/tests/decoding/binary_nullable.schema b/core/tests/decoding/binary_nullable.schema deleted file mode 100644 index 0dcd30f..0000000 --- a/core/tests/decoding/binary_nullable.schema +++ /dev/null @@ -1 +0,0 @@ -binary_column,bytea,t \ No newline at end of file diff --git a/core/tests/decoding/bool.schema b/core/tests/decoding/bool.schema index 7894e7f..a0e6dbf 100644 --- a/core/tests/decoding/bool.schema +++ b/core/tests/decoding/bool.schema @@ -1 +1 @@ -bool_column,boolean,f \ No newline at end of file +bool,boolean,f \ No newline at end of file diff --git a/core/tests/decoding/bool_nullable.schema b/core/tests/decoding/bool_nullable.schema index 320990b..5e11eac 100644 --- a/core/tests/decoding/bool_nullable.schema +++ b/core/tests/decoding/bool_nullable.schema @@ -1 +1 @@ -bool_column,boolean,t \ No newline at end of file +bool_nullable,boolean,t diff --git a/core/tests/decoding/date32.schema b/core/tests/decoding/date32.schema index 953ab7d..9fb0ee6 100644 --- a/core/tests/decoding/date32.schema +++ b/core/tests/decoding/date32.schema @@ -1 +1 @@ -date32_column,date,f \ No newline at end of file +date32,date,f \ No newline at end of file diff --git a/core/tests/decoding/date32_nullable.schema b/core/tests/decoding/date32_nullable.schema index 0211de2..f0082d7 100644 --- a/core/tests/decoding/date32_nullable.schema +++ b/core/tests/decoding/date32_nullable.schema @@ -1 +1 @@ -date32_column,date,t \ No newline at end of file +date32_nullable,date,t diff --git a/core/tests/decoding/duration_us.schema b/core/tests/decoding/duration_us.schema index af9e599..64fd6f8 100644 --- a/core/tests/decoding/duration_us.schema +++ b/core/tests/decoding/duration_us.schema @@ -1 +1 @@ -duration_us_column,interval,f \ No newline at end of file +duration_us,interval,f \ No newline at end of file diff --git a/core/tests/decoding/duration_us_nullable.schema b/core/tests/decoding/duration_us_nullable.schema index f251662..8a013b7 100644 --- a/core/tests/decoding/duration_us_nullable.schema +++ b/core/tests/decoding/duration_us_nullable.schema @@ -1 +1 @@ -duration_us_column,interval,t \ No newline at end of file +duration_us_nullable,interval,t diff --git a/core/tests/decoding/float32.schema b/core/tests/decoding/float32.schema index fce1061..1905873 100644 --- a/core/tests/decoding/float32.schema +++ b/core/tests/decoding/float32.schema @@ -1 +1 @@ -float32_column,real,f \ No newline at end of file +float32,real,f \ No newline at end of file diff --git a/core/tests/decoding/float32_nullable.schema b/core/tests/decoding/float32_nullable.schema index 38f5f7e..b3708a9 100644 --- a/core/tests/decoding/float32_nullable.schema +++ b/core/tests/decoding/float32_nullable.schema @@ -1 +1 @@ -float32_column,real,t \ No newline at end of file +float32_nullable,real,t diff --git 
a/core/tests/decoding/float64.schema b/core/tests/decoding/float64.schema index 9178796..9c68dd4 100644 --- a/core/tests/decoding/float64.schema +++ b/core/tests/decoding/float64.schema @@ -1 +1 @@ -float64_column,double precision,f \ No newline at end of file +float64,double precision,f \ No newline at end of file diff --git a/core/tests/decoding/float64_nullable.schema b/core/tests/decoding/float64_nullable.schema index 78c59d5..a8e547c 100644 --- a/core/tests/decoding/float64_nullable.schema +++ b/core/tests/decoding/float64_nullable.schema @@ -1 +1 @@ -float64_column,double precision,t \ No newline at end of file +float64_nullable,double precision,t diff --git a/core/tests/decoding/int16.schema b/core/tests/decoding/int16.schema index fcd83d9..fa3f2bd 100644 --- a/core/tests/decoding/int16.schema +++ b/core/tests/decoding/int16.schema @@ -1 +1 @@ -int16_column,smallint,f \ No newline at end of file +int16,smallint,f \ No newline at end of file diff --git a/core/tests/decoding/int16_nullable.schema b/core/tests/decoding/int16_nullable.schema index 06fc20a..9eb14de 100644 --- a/core/tests/decoding/int16_nullable.schema +++ b/core/tests/decoding/int16_nullable.schema @@ -1 +1 @@ -int16_column,smallint,t \ No newline at end of file +int16_nullable,smallint,t diff --git a/core/tests/decoding/int32.schema b/core/tests/decoding/int32.schema index d464e55..6409207 100644 --- a/core/tests/decoding/int32.schema +++ b/core/tests/decoding/int32.schema @@ -1 +1 @@ -int32_column,integer,f \ No newline at end of file +int32,integer,f \ No newline at end of file diff --git a/core/tests/decoding/int32_nullable.schema b/core/tests/decoding/int32_nullable.schema index 83a6a32..e88df62 100644 --- a/core/tests/decoding/int32_nullable.schema +++ b/core/tests/decoding/int32_nullable.schema @@ -1 +1 @@ -int32_column,integer,t \ No newline at end of file +int32_nullable,integer,t diff --git a/core/tests/decoding/int64.schema b/core/tests/decoding/int64.schema index 17b12d4..fc3fd4c 100644 --- a/core/tests/decoding/int64.schema +++ b/core/tests/decoding/int64.schema @@ -1 +1 @@ -int64_column,bigint,f \ No newline at end of file +int64,bigint,f \ No newline at end of file diff --git a/core/tests/decoding/int64_nullable b/core/tests/decoding/int64_nullable index 22fad18..4215f43 100644 --- a/core/tests/decoding/int64_nullable +++ b/core/tests/decoding/int64_nullable @@ -1 +1 @@ -int64_column,bigint,t +int64,bigint,t diff --git a/core/tests/decoding/int64_nullable.schema b/core/tests/decoding/int64_nullable.schema index 5809adb..7f0fdab 100644 --- a/core/tests/decoding/int64_nullable.schema +++ b/core/tests/decoding/int64_nullable.schema @@ -1 +1 @@ -int64_column,bigint,t \ No newline at end of file +int64_nullable,bigint,t diff --git a/core/tests/decoding/large_binary.schema b/core/tests/decoding/large_binary.schema new file mode 100644 index 0000000..1f89ade --- /dev/null +++ b/core/tests/decoding/large_binary.schema @@ -0,0 +1 @@ +large_binary,bytea,f diff --git a/core/tests/decoding/large_binary_nullable.schema b/core/tests/decoding/large_binary_nullable.schema new file mode 100644 index 0000000..678b1d1 --- /dev/null +++ b/core/tests/decoding/large_binary_nullable.schema @@ -0,0 +1 @@ +large_binary_nullable,bytea,t diff --git a/core/tests/decoding/large_string.schema b/core/tests/decoding/large_string.schema new file mode 100644 index 0000000..8862a5f --- /dev/null +++ b/core/tests/decoding/large_string.schema @@ -0,0 +1 @@ +large_string,text,f diff --git a/core/tests/decoding/large_string_nullable.schema 
b/core/tests/decoding/large_string_nullable.schema new file mode 100644 index 0000000..c9e5b40 --- /dev/null +++ b/core/tests/decoding/large_string_nullable.schema @@ -0,0 +1 @@ +large_string_nullable,text,t diff --git a/core/tests/decoding/numeric.schema b/core/tests/decoding/numeric.schema index 0ea6b82..ce2108f 100644 --- a/core/tests/decoding/numeric.schema +++ b/core/tests/decoding/numeric.schema @@ -1 +1 @@ -numeric_column,numeric,f \ No newline at end of file +numeric,numeric,f \ No newline at end of file diff --git a/core/tests/decoding/numeric_nullable.schema b/core/tests/decoding/numeric_nullable.schema index 9d7cb11..32445c4 100644 --- a/core/tests/decoding/numeric_nullable.schema +++ b/core/tests/decoding/numeric_nullable.schema @@ -1 +1 @@ -numeric_column,numeric,t \ No newline at end of file +numeric_nullable,numeric,t diff --git a/core/tests/decoding/string.schema b/core/tests/decoding/string.schema deleted file mode 100644 index 9ebee6f..0000000 --- a/core/tests/decoding/string.schema +++ /dev/null @@ -1 +0,0 @@ -string_column,text,f \ No newline at end of file diff --git a/core/tests/decoding/string_nullable.schema b/core/tests/decoding/string_nullable.schema deleted file mode 100644 index 924fcc9..0000000 --- a/core/tests/decoding/string_nullable.schema +++ /dev/null @@ -1 +0,0 @@ -string_column,text,t \ No newline at end of file diff --git a/core/tests/decoding/time_us.schema b/core/tests/decoding/time_us.schema index 2ef9fa4..e327d79 100644 --- a/core/tests/decoding/time_us.schema +++ b/core/tests/decoding/time_us.schema @@ -1 +1 @@ -time_us_column,time,f \ No newline at end of file +time_us,time,f \ No newline at end of file diff --git a/core/tests/decoding/time_us_nullable.schema b/core/tests/decoding/time_us_nullable.schema index 029177e..8e6d71f 100644 --- a/core/tests/decoding/time_us_nullable.schema +++ b/core/tests/decoding/time_us_nullable.schema @@ -1 +1 @@ -time_us_column,time,t \ No newline at end of file +time_us_nullable,time,t diff --git a/core/tests/decoding/timestamp_us_notz.schema b/core/tests/decoding/timestamp_us_notz.schema index fee451f..705cf01 100644 --- a/core/tests/decoding/timestamp_us_notz.schema +++ b/core/tests/decoding/timestamp_us_notz.schema @@ -1 +1 @@ -timestamp_us_no_tz_column,timestamp without time zone,f \ No newline at end of file +timestamp_us_notz,timestamp without time zone,f diff --git a/core/tests/decoding/timestamp_us_notz_nullable.schema b/core/tests/decoding/timestamp_us_notz_nullable.schema index 4aebe00..1a3360a 100644 --- a/core/tests/decoding/timestamp_us_notz_nullable.schema +++ b/core/tests/decoding/timestamp_us_notz_nullable.schema @@ -1 +1 @@ -timestamp_us_no_tz_column,timestamp without time zone,t \ No newline at end of file +timestamp_us_notz_nullable,timestamp without time zone,t diff --git a/core/tests/decoding/timestamp_us_tz.schema b/core/tests/decoding/timestamp_us_tz.schema index 56a23d1..64ec24b 100644 --- a/core/tests/decoding/timestamp_us_tz.schema +++ b/core/tests/decoding/timestamp_us_tz.schema @@ -1 +1 @@ -timestamp_us_tz_column,timestamp with time zone,f \ No newline at end of file +timestamp_us_tz,timestamp with time zone,f \ No newline at end of file diff --git a/core/tests/decoding/timestamp_us_tz_nullable.schema b/core/tests/decoding/timestamp_us_tz_nullable.schema index d18fd98..e16aede 100644 --- a/core/tests/decoding/timestamp_us_tz_nullable.schema +++ b/core/tests/decoding/timestamp_us_tz_nullable.schema @@ -1 +1 @@ -timestamp_us_tz_column,timestamp with time zone,t \ No newline at end of file 
+timestamp_us_tz_nullable,timestamp with time zone,t
diff --git a/py/src/pg_schema.rs b/py/src/pg_schema.rs
index c5f5a41..5588a58 100644
--- a/py/src/pg_schema.rs
+++ b/py/src/pg_schema.rs
@@ -229,6 +229,7 @@ impl From<pgpq::pg_schema::PostgresType> for PostgresType {
             pgpq::pg_schema::PostgresType::Time => PostgresType::Time(Time),
             pgpq::pg_schema::PostgresType::Timestamp => PostgresType::Timestamp(Timestamp),
             pgpq::pg_schema::PostgresType::Interval => PostgresType::Interval(Interval),
+            pgpq::pg_schema::PostgresType::Decimal => todo!(),
             pgpq::pg_schema::PostgresType::List(inner) => {
                 PostgresType::List(List::new((*inner).into()))
             }