
Library-Wide Schema improvements (#308)
- low-level: full logical types support in Parquet Thrift schema
- class serializer supports custom attributes to customize serialisation of dates, times, and decimals.
- added support for the .NET 6 DateOnly type (a usage sketch is included below).
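
For reference, a minimal write-side sketch of the new DateOnly support, pieced together from the TestBase.cs and EndToEndTypeTest.cs changes in this commit (requires .NET 6+); the variable names and values are illustrative only:

```csharp
using System;
using System.IO;
using Parquet;
using Parquet.Data;
using Parquet.Schema;

// DateOnly is now accepted as a data field type (see EndToEndTypeTest.cs below)
var field = new DataField<DateOnly>("dateOnly");

// option enabled in this commit's TestBase.cs when the value being written is a DateOnly
var options = new ParquetOptions { UseDateOnlyTypeForDates = true };

using var ms = new MemoryStream();
using(ParquetWriter writer = await ParquetWriter.CreateAsync(new ParquetSchema(field), ms, options)) {
    using ParquetRowGroupWriter rg = writer.CreateRowGroup();
    await rg.WriteColumnAsync(new DataColumn(field, new[] { DateOnly.FromDateTime(DateTime.UtcNow) }));
}
```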
aloneguid authored Apr 21, 2023
1 parent d6ef0b1 commit 26ea729
Showing 22 changed files with 576 additions and 300 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/full.yml
@@ -1,7 +1,7 @@
name: 'Full Workflow'

env:
VERSION: 4.8.1
VERSION: 4.9.0
ASM_VERSION: 4.0.0

on:
35 changes: 35 additions & 0 deletions docs/serialisation.md
@@ -53,6 +53,41 @@ Serialisation tries to fit into C# ecosystem like a ninja 🥷, including custom
- [`JsonIgnore`](https://learn.microsoft.com/en-us/dotnet/api/system.text.json.serialization.jsonignoreattribute?view=net-7.0) - ignores property when reading or writing.
- [`JsonPropertyOrder`](https://learn.microsoft.com/en-us/dotnet/api/system.text.json.serialization.jsonpropertyorderattribute?view=net-6.0) - allows reordering columns when writing to a file (by default they are written in class definition order). Only root properties and struct (class) properties can be reordered (it wouldn't make sense for the others).

Where the built-in JSON attributes are not sufficient, extra Parquet-specific attributes are provided.

### Dates

By default, dates (`DateTime`) are serialized as an `INT96` number, which includes nanoseconds within the day. In general, `INT96` is obsolete in Parquet; however, older systems such as Impala and Hive still actively use it to represent dates.

Therefore, when this library sees the `INT96` type, it automatically treats it as a date for both serialization and deserialization.

If you would rather use a normal, non-legacy date type, just annotate the property with `[ParquetTimestamp]`:

```csharp
[ParquetTimestamp]
public DateTime TimestampDate { get; set; }
```
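
To see what the class serializer will emit, you can inspect the reflected schema. A small sketch mirroring this commit's `SchemaReflectorTest` (the POCO below is illustrative):

```csharp
using System;
using Parquet.Schema;
using Parquet.Serialization;
using Parquet.Serialization.Attributes;

ParquetSchema s = typeof(Dates).GetParquetSchema(true);
// s.DataFields[0] is a DateTimeDataField with DateTimeFormat.Impala (legacy INT96)
// s.DataFields[1] is a DateTimeDataField with DateTimeFormat.DateAndTime

class Dates {
    public DateTime ImpalaDate { get; set; }      // default: INT96

    [ParquetTimestamp]
    public DateTime TimestampDate { get; set; }   // non-legacy timestamp
}
```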

### Times

By default, time (`TimeSpan`) is serialised with millisecond precision, but you can increase it by adding the `[ParquetMicroSecondsTime]` attribute:

```csharp
[ParquetMicroSecondsTime]
public TimeSpan MicroTime { get; set; }
```

### Decimal Numbers

By default, `decimal` is serialized with a precision (number of digits in the number) of `38` and a scale (number of digits to the right of the decimal point) of `18`. If you need a different precision/scale pair, use the `[ParquetDecimal]` attribute:

```csharp
[ParquetDecimal(40, 20)]
public decimal With_40_20 { get; set; }
```
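
Putting the attributes together, here is a hedged end-to-end sketch using the class serializer; `ParquetSerializer.SerializeAsync`/`DeserializeAsync` are the assumed entry points, and the POCO and its values are illustrative:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using Parquet.Serialization;
using Parquet.Serialization.Attributes;

var rows = new List<Measurement> {
    new Measurement {
        Taken = DateTime.UtcNow,
        Duration = TimeSpan.FromMilliseconds(1.5),
        Amount = 1.23m
    }
};

using var ms = new MemoryStream();
// attributes on the POCO drive the physical/logical types written to the file
await ParquetSerializer.SerializeAsync(rows, ms);

ms.Position = 0;
var back = await ParquetSerializer.DeserializeAsync<Measurement>(ms);

class Measurement {
    [ParquetTimestamp]
    public DateTime Taken { get; set; }

    [ParquetMicroSecondsTime]
    public TimeSpan Duration { get; set; }

    [ParquetDecimal(40, 20)]
    public decimal Amount { get; set; }
}
```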



## Nested Types

You can also serialize [more complex types](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types) supported by the Parquet format. Sometimes you might want to store more complex data in your parquet files, like lists or maps. These are called *nested types* and they can be useful for organizing your information. However, they also come with a trade-off: they make your code slower and use more CPU resources. That's why you should only use them when you really need them and not just because they look cool. Simple columns are faster and easier to work with, so stick to them whenever you can.
7 changes: 4 additions & 3 deletions src/Parquet.PerfRunner/Program.cs
@@ -1,6 +1,7 @@
// for performance tests only

using BenchmarkDotNet.Running;
using Parquet;
using Parquet.PerfRunner.Benchmarks;

if(args.Length == 1) {
@@ -18,7 +19,7 @@
} else {
//new VsParquetSharp().Main();
//await new DataTypes().NullableInts();
var c = new Classes();
c.SetUp();
c.Serialise();
//var c = new Classes();
//c.SetUp();
//c.Serialise();
}
24 changes: 9 additions & 15 deletions src/Parquet.Test/File/Values/Primitives/BigDecimalTest.cs
@@ -1,19 +1,13 @@
using System;
using System.Collections.Generic;
using System.Text;
using Parquet.File.Values.Primitives;
using Parquet.File.Values.Primitives;
using Xunit;

namespace Parquet.Test.File.Values.Primitives
{
public class BigDecimalTest
{
[Fact]
public void Valid_but_massive_bigdecimal()
{
var bd = new BigDecimal(83086059037282.54m, 38, 16);
namespace Parquet.Test.File.Values.Primitives {
public class BigDecimalTest {
[Fact]
public void Valid_but_massive_bigdecimal() {
var bd = new BigDecimal(83086059037282.54m, 38, 16);

//if exception is not thrown (overflow) we're OK
}
}
//if exception is not thrown (overflow) we're OK
}
}
}
26 changes: 0 additions & 26 deletions src/Parquet.Test/Schema/SchemaTest.cs
@@ -312,32 +312,6 @@ public async Task ReadSchemaActuallyEqualToWriteSchema() {
}
}

[Theory]
[InlineData(typeof(bool), TT.BOOLEAN, null)]
[InlineData(typeof(byte), TT.INT32, CT.UINT_8)]
[InlineData(typeof(sbyte), TT.INT32, CT.INT_8)]
[InlineData(typeof(short), TT.INT32, CT.INT_16)]
[InlineData(typeof(ushort), TT.INT32, CT.UINT_16)]
[InlineData(typeof(int), TT.INT32, CT.INT_32)]
[InlineData(typeof(uint), TT.INT32, CT.UINT_32)]
[InlineData(typeof(long), TT.INT64, CT.INT_64)]
[InlineData(typeof(ulong), TT.INT64, CT.UINT_64)]
[InlineData(typeof(BigInteger), TT.INT96, null)]
[InlineData(typeof(float), TT.FLOAT, null)]
[InlineData(typeof(double), TT.DOUBLE, null)]
[InlineData(typeof(byte[]), TT.BYTE_ARRAY, null)]
[InlineData(typeof(string), TT.BYTE_ARRAY, CT.UTF8)]
// decimal
[InlineData(typeof(DateTime), TT.INT96, null)]
// TimeSpan
// Interval
public void SystemTypeToThriftMapping(Type t, TT expectedTT, CT? expectedCT) {
Assert.True(SchemaEncoder.FindTypeTuple(t, out TT foundTT, out CT? foundCT));

Assert.Equal(expectedTT, foundTT);
Assert.Equal(expectedCT, foundCT);
}

[Fact]
public void Decode_list_normal() {
ParquetSchema schema = ThriftFooter.Parse(
74 changes: 74 additions & 0 deletions src/Parquet.Test/Serialisation/SchemaReflectorTest.cs
@@ -3,6 +3,7 @@
using System.Text.Json.Serialization;
using Parquet.Schema;
using Parquet.Serialization;
using Parquet.Serialization.Attributes;
using Xunit;

namespace Parquet.Test.Serialisation {
@@ -239,5 +240,78 @@ public void ListOfStructs() {
expectedSchema.Equals(actualSchema),
expectedSchema.GetNotEqualsMessage(actualSchema, "expected", "actual"));
}

class DatesPoco {

public DateTime ImpalaDate { get; set; }

[ParquetTimestamp]
public DateTime TimestampDate { get; set; }

public TimeSpan DefaultTime { get; set; }

[ParquetMicroSecondsTime]
public TimeSpan MicroTime { get; set; }
}

[Fact]
public void Date_default_impala() {
ParquetSchema s = typeof(DatesPoco).GetParquetSchema(true);

Assert.True(s.DataFields[0] is DateTimeDataField);
Assert.Equal(DateTimeFormat.Impala, ((DateTimeDataField)s.DataFields[0]).DateTimeFormat);
}

[Fact]
public void Date_timestamp() {
ParquetSchema s = typeof(DatesPoco).GetParquetSchema(true);

Assert.True(s.DataFields[1] is DateTimeDataField);
Assert.Equal(DateTimeFormat.DateAndTime, ((DateTimeDataField)s.DataFields[1]).DateTimeFormat);
}

[Fact]
public void Time_default() {
ParquetSchema s = typeof(DatesPoco).GetParquetSchema(true);

Assert.True(s.DataFields[2] is TimeSpanDataField);
Assert.Equal(TimeSpanFormat.MilliSeconds, ((TimeSpanDataField)s.DataFields[2]).TimeSpanFormat);
}

[Fact]
public void Time_micros() {
ParquetSchema s = typeof(DatesPoco).GetParquetSchema(true);

Assert.True(s.DataFields[3] is TimeSpanDataField);
Assert.Equal(TimeSpanFormat.MicroSeconds, ((TimeSpanDataField)s.DataFields[3]).TimeSpanFormat);
}



class DecimalPoco {
public decimal Default { get; set; }

[ParquetDecimal(40, 20)]
public decimal With_40_20 { get; set; }
}

[Fact]
public void Decimal_default() {
ParquetSchema s = typeof(DecimalPoco).GetParquetSchema(true);

Assert.True(s.DataFields[0] is DecimalDataField);
Assert.Equal(38, ((DecimalDataField)s.DataFields[0]).Precision);
Assert.Equal(18, ((DecimalDataField)s.DataFields[0]).Scale);
}

[Fact]
public void Decimal_override() {
ParquetSchema s = typeof(DecimalPoco).GetParquetSchema(true);

Assert.True(s.DataFields[1] is DecimalDataField);
Assert.Equal(40, ((DecimalDataField)s.DataFields[1]).Precision);
Assert.Equal(20, ((DecimalDataField)s.DataFields[1]).Scale);
}

}
}
8 changes: 7 additions & 1 deletion src/Parquet.Test/TestBase.cs
@@ -87,10 +87,16 @@ protected async Task<object> WriteReadSingle(DataField field, object? value, Com
//for sanity, use disconnected streams
byte[] data;

var options = new ParquetOptions();
#if !NETCOREAPP3_1
if(value is DateOnly)
options.UseDateOnlyTypeForDates = true;
#endif

using(var ms = new MemoryStream()) {
// write single value

using(ParquetWriter writer = await ParquetWriter.CreateAsync(new ParquetSchema(field), ms)) {
using(ParquetWriter writer = await ParquetWriter.CreateAsync(new ParquetSchema(field), ms, options)) {
writer.CompressionMethod = compressionMethod;

using ParquetRowGroupWriter rg = writer.CreateRowGroup();
18 changes: 14 additions & 4 deletions src/Parquet.Test/Types/EndToEndTypeTest.cs
@@ -48,6 +48,9 @@ public class EndToEndTypeTest : TestBase {
["dateDateAndTime local kind"] = (new DateTimeDataField("dateDateAndTime unknown kind", DateTimeFormat.DateAndTime), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Local)),
// don't want any excess info in the offset INT32 doesn't contain or care about this data
["dateDate"] = (new DateTimeDataField("dateDate", DateTimeFormat.Date), DateTime.UtcNow.RoundToDay()),
#if !NETCOREAPP3_1
["dateOnly"] = (new DataField<DateOnly>("dateOnly"), DateOnly.FromDateTime(DateTime.UtcNow)),
#endif
["interval"] = (new DataField<Interval>("interval"), new Interval(3, 2, 1)),
// time test(loses precision slightly)
["time_micros"] = (new TimeSpanDataField("timeMicros", TimeSpanFormat.MicroSeconds), new TimeSpan(DateTime.UtcNow.TimeOfDay.Ticks / 10 * 10)),
@@ -108,6 +111,9 @@ public class EndToEndTypeTest : TestBase {
[InlineData("impala date local kind")]
[InlineData("dateDateAndTime local kind")]
[InlineData("dateDate")]
#if !NETCOREAPP3_1
[InlineData("dateOnly")]
#endif
[InlineData("interval")]
[InlineData("time_micros")]
[InlineData("time_millis")]
@@ -143,17 +149,21 @@ public async Task Type_writes_and_reads_end_to_end(string name) {
object actual = await WriteReadSingle(input.field, input.expectedValue);

bool equal;
if(input.expectedValue == null && actual == null) equal = true;
else if(actual.GetType().IsArrayOf<byte>() && input.expectedValue != null) equal = ((byte[])actual).SequenceEqual((byte[])input.expectedValue);
else if(actual.GetType() == typeof(DateTime)) {
if(input.expectedValue == null && actual == null)
equal = true;
else if(actual.GetType().IsArrayOf<byte>() && input.expectedValue != null) {
equal = ((byte[])actual).SequenceEqual((byte[])input.expectedValue);
} else if(actual.GetType() == typeof(DateTime)) {
var dtActual = (DateTime)actual;
Assert.Equal(DateTimeKind.Utc, dtActual.Kind);
var dtExpected = (DateTime)input.expectedValue!;
dtExpected = dtExpected.Kind == DateTimeKind.Unspecified
? DateTime.SpecifyKind(dtExpected, DateTimeKind.Utc) // assumes value is UTC
: dtExpected.ToUniversalTime();
equal = dtActual.Equals(dtExpected);
} else equal = actual.Equals(input.expectedValue);
} else {
equal = actual.Equals(input.expectedValue);
}

Assert.True(equal, $"{name}| expected: [{input.expectedValue}], actual: [{actual}], schema element: {input.field}");
}
1 change: 1 addition & 0 deletions src/Parquet/Data/DecimalFormatDefaults.cs
@@ -10,6 +10,7 @@ public class DecimalFormatDefaults {
/// The Default Precision value used when not explicitly defined; this is the value used prior to parquet-dotnet v3.9.
/// </summary>
public const int DefaultPrecision = 38;

/// <summary>
/// The Default Scale value used when not explicitly defined; this is the value used prior to parquet-dotnet v3.9.
/// </summary>
57 changes: 57 additions & 0 deletions src/Parquet/Encodings/ParquetPlainEncoder.cs
@@ -100,6 +100,13 @@ public static void Encode(
Encode(span, destination, tse);
if(stats != null)
FillStats(span, stats);
#if NET6_0_OR_GREATER
} else if(t == typeof(DateOnly[])) {
Span<DateOnly> span = ((DateOnly[])data).AsSpan(offset, count);
Encode(span, destination, tse);
if(stats != null)
FillStats(span, stats);
#endif
} else if(t == typeof(TimeSpan[])) {
Span<TimeSpan> span = ((TimeSpan[])data).AsSpan(offset, count);
Encode(span, destination, tse);
@@ -171,6 +178,11 @@ public static void Decode(
} else if(t == typeof(DateTime[])) {
Span<DateTime> span = ((DateTime[])data).AsSpan(offset, count);
elementsRead = Decode(source, span, tse);
#if NET6_0_OR_GREATER
} else if(t == typeof(DateOnly[])) {
Span<DateOnly> span = ((DateOnly[])data).AsSpan(offset, count);
elementsRead = Decode(source, span, tse);
#endif
} else if(t == typeof(TimeSpan[])) {
Span<TimeSpan> span = ((TimeSpan[])data).AsSpan(offset, count);
elementsRead = Decode(source, span, tse);
@@ -238,6 +250,10 @@ public static bool TryEncode(object? value, SchemaElement tse, out byte[]? resul
return true;
} else if(t == typeof(DateTime))
return TryEncode((DateTime)value, tse, out result);
#if NET6_0_OR_GREATER
else if(t == typeof(DateOnly))
return TryEncode((DateOnly)value, tse, out result);
#endif
else if(t == typeof(TimeSpan))
return TryEncode((TimeSpan)value, tse, out result);
else if(t == typeof(Interval)) {
@@ -339,6 +355,14 @@ private static bool TryEncode(DateTime value, SchemaElement tse, out byte[] resu
}
}

#if NET6_0_OR_GREATER
private static bool TryEncode(DateOnly value, SchemaElement tse, out byte[] result) {
int days = value.ToUnixDays();
result = BitConverter.GetBytes(days);
return true;
}
#endif

private static bool TryEncode(decimal value, SchemaElement tse, out byte[] result) {
try {
switch(tse.Type) {
@@ -766,6 +790,16 @@ public static void Encode(ReadOnlySpan<DateTime> data, Stream destination, Schem
}
}

#if NET6_0_OR_GREATER
public static void Encode(ReadOnlySpan<DateOnly> data, Stream destination, SchemaElement tse) {
foreach(DateOnly element in data) {
int days = element.ToUnixDays();
byte[] raw = BitConverter.GetBytes(days);
destination.Write(raw, 0, raw.Length);
}
}
#endif

public static int Decode(Span<byte> source, Span<DateTime> data, SchemaElement tse) {
switch(tse.Type) {
case Thrift.Type.INT32:
@@ -816,6 +850,20 @@ public static int Decode(Span<byte> source, Span<DateTime> data, SchemaElement t
}
}

#if NET6_0_OR_GREATER
public static int Decode(Span<byte> source, Span<DateOnly> data, SchemaElement tse) {
int[] ints = ArrayPool<int>.Shared.Rent(data.Length);
try {
int intsRead = Decode(source, ints.AsSpan(0, data.Length));
for(int i = 0; i < intsRead; i++)
data[i] = DateOnly.FromDateTime(ints[i].AsUnixDaysInDateTime());
return intsRead;
} finally {
ArrayPool<int>.Shared.Return(ints);
}
}
#endif

public static void Encode(ReadOnlySpan<TimeSpan> data, Stream destination, SchemaElement tse) {
switch(tse.Type) {
case Thrift.Type.INT32:
@@ -1076,6 +1124,15 @@ public static void FillStats(ReadOnlySpan<DateTime> data, DataColumnStatistics s
stats.MaxValue = max;
}

#if NET6_0_OR_GREATER
public static void FillStats(ReadOnlySpan<DateOnly> data, DataColumnStatistics stats) {
data.MinMax(out DateOnly min, out DateOnly max);
stats.MinValue = min;
stats.MaxValue = max;
}

#endif

public static void FillStats(ReadOnlySpan<TimeSpan> data, DataColumnStatistics stats) {
data.MinMax(out TimeSpan min, out TimeSpan max);
stats.MinValue = min;
