Skip to content

Commit

Permalink
DataFrame integration now supports write operations
Browse files Browse the repository at this point in the history
  • Loading branch information
aloneguid authored Apr 19, 2023
1 parent cf9ff20 commit f2c0754
Show file tree
Hide file tree
Showing 10 changed files with 382 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/full.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: 'Full Workflow'

env:
VERSION: 4.8.0
VERSION: 4.8.1
ASM_VERSION: 4.0.0

on:
Expand Down
2 changes: 1 addition & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Whether you want to build apps for Linux, MacOS, Windows, iOS, Android, Tizen, X
- [Schema](schema.md)
- [DataColumn](column.md)
- [Nested Types](nested_types.md)
- Integration with [Microsoft.Data.Analysis](dataframe.md).
- [Integration with Microsoft.Data.Analysis](dataframe.md).

Make sure to check out [Apache Parquet Viewer](https://aloneguid.github.io/parquet-online/) built with this library:

Expand Down
36 changes: 28 additions & 8 deletions docs/dataframe.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,42 @@

Since v4.8 support for [`Microsoft.Data.Analysis`](https://www.nuget.org/packages/Microsoft.Data.Analysis) was added.

This is an early draft and you are welcome to create issues/discussions around it.

## What's Supported?

Due to `DataFrame` being in general less functional than Parquet, only primitive (atomic) columns are supported at the moment. If `DataFrame` supports more functionality in future, this integration can be extended.
Due to `DataFrame` being in general less functional than Parquet, only primitive (atomic) columns are supported at the moment. If `DataFrame` supports more functionality in future (see related links below), this integration can be extended.

When reading and writing, this integration will ignore any columns that are not atomic.

## Writing

In addition to this, structs are also supported, as long as they do not contain any maps or lists. Structs are flattened into separate flat columns.
There is magic happening under the hood, but as a user you only need to call `WriteAsync()` extension method on `DataFrame` and specify the destination stream to write it to, like so:

```csharp
DataFrame df;
await df.WriteAsync(stream);
```

## Reading

As with writing, the magic is already done under the hood, so you can use `System.IO.Stream`'s extension method to read from parquet stream into `DataFrame`

```csharp
DataFrame df = await fs.ReadParquetAsDataFrameAsync();
```

## Related Issues
## Samples

For your convenience, there is a [sample Jupyter notebook](../notebooks/read_dataframe.ipynb) available that demonstrates reading parquet files into `DataFrame` and displaying them:

[![](img/ipynb-preview.png)](../notebooks/read_dataframe.ipynb)


In order to run this notebook, you can use [VS Code](https://code.visualstudio.com/) with [Polyglot Notebooks extension](https://marketplace.visualstudio.com/items?itemName=ms-dotnettools.dotnet-interactive-vscode).

## Related Links

- https://github.com/dotnet/machinelearning/issues/6144
- https://github.com/dotnet/machinelearning/issues/6088
- https://github.com/dotnet/machinelearning/issues/5972
- Original blog post "[An Introduction to DataFrame](https://devblogs.microsoft.com/dotnet/an-introduction-to-dataframe/)".
- External GitHub Issues
- [DataFrame (Microsoft.Data.Analysis) Tracking Issue](https://github.com/dotnet/machinelearning/issues/6144).
- [DataFrame enhancements](https://github.com/dotnet/machinelearning/issues/6088).
- [Add parquet support for importing and exporting data to/from DataFrame](https://github.com/dotnet/machinelearning/issues/5972).
Binary file added docs/img/ipynb-preview.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
196 changes: 196 additions & 0 deletions notebooks/read_dataframe.ipynb

Large diffs are not rendered by default.

15 changes: 14 additions & 1 deletion src/Parquet.Test/DataAnalysis/DataFrameReaderTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class DataFrameReaderTest : TestBase {
[InlineData(typeof(bool?), true, null)]
[InlineData(typeof(string), "1", "2")]
[InlineData(typeof(string), null, "2")]
public async Task Read_all_types(Type t, object el1, object el2) {
public async Task Roundtrip_all_types(Type t, object el1, object el2) {

// arrange
using var ms = new MemoryStream();
Expand All @@ -44,6 +44,19 @@ public async Task Read_all_types(Type t, object el1, object el2) {
DataFrame df = await ms.ReadParquetAsDataFrameAsync();

Assert.Equal(data, df.Rows.Select(r => r[0]).ToArray());

// write DataFrame to file
using var ms1 = new MemoryStream();
await df.WriteAsync(ms1);

// validate both are the same
ms1.Position = 0;
DataFrame df1 = await ms1.ReadParquetAsDataFrameAsync();

Assert.Equal(df.Columns.Count, df1.Columns.Count);
for(int i = 0; i < df.Columns.Count; i++) {
Assert.Equal(df.Columns[i], df1.Columns[i]);
}
}

[Fact]
Expand Down
107 changes: 83 additions & 24 deletions src/Parquet/Data/Analysis/DataFrameMapper.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Numerics;
using Microsoft.Data.Analysis;

Expand All @@ -7,84 +8,84 @@ static class DataFrameMapper {
public static DataFrameColumn ToDataFrameColumn(DataColumn dc) {
string colName = string.Join("_", dc.Field.Path.ToList());

if(dc.Field.ClrType == typeof(bool)) {
if(dc.Field.ClrType == typeof(bool)) {
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
return new PrimitiveDataFrameColumn<bool>(colName, (bool[])dc.Data);
} else {
return new PrimitiveDataFrameColumn<bool>(colName, (bool?[])dc.Data);
}
}
if(dc.Field.ClrType == typeof(int)) {
if(dc.Field.ClrType == typeof(int)) {
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
return new PrimitiveDataFrameColumn<int>(colName, (int[])dc.Data);
} else {
return new PrimitiveDataFrameColumn<int>(colName, (int?[])dc.Data);
}
}
if(dc.Field.ClrType == typeof(uint)) {
if(dc.Field.ClrType == typeof(uint)) {
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
return new PrimitiveDataFrameColumn<uint>(colName, (uint[])dc.Data);
} else {
return new PrimitiveDataFrameColumn<uint>(colName, (uint?[])dc.Data);
}
}
if(dc.Field.ClrType == typeof(long)) {
if(dc.Field.ClrType == typeof(long)) {
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
return new PrimitiveDataFrameColumn<long>(colName, (long[])dc.Data);
} else {
return new PrimitiveDataFrameColumn<long>(colName, (long?[])dc.Data);
}
}
if(dc.Field.ClrType == typeof(ulong)) {
if(dc.Field.ClrType == typeof(ulong)) {
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
return new PrimitiveDataFrameColumn<ulong>(colName, (ulong[])dc.Data);
} else {
return new PrimitiveDataFrameColumn<ulong>(colName, (ulong?[])dc.Data);
}
}
if(dc.Field.ClrType == typeof(byte)) {
if(dc.Field.ClrType == typeof(byte)) {
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
return new PrimitiveDataFrameColumn<byte>(colName, (byte[])dc.Data);
} else {
return new PrimitiveDataFrameColumn<byte>(colName, (byte?[])dc.Data);
}
}
if(dc.Field.ClrType == typeof(sbyte)) {
if(dc.Field.ClrType == typeof(sbyte)) {
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
return new PrimitiveDataFrameColumn<sbyte>(colName, (sbyte[])dc.Data);
} else {
return new PrimitiveDataFrameColumn<sbyte>(colName, (sbyte?[])dc.Data);
}
}
if(dc.Field.ClrType == typeof(DateTime)) {
if(dc.Field.ClrType == typeof(DateTime)) {
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
return new PrimitiveDataFrameColumn<DateTime>(colName, (DateTime[])dc.Data);
} else {
return new PrimitiveDataFrameColumn<DateTime>(colName, (DateTime?[])dc.Data);
}
}
if(dc.Field.ClrType == typeof(TimeSpan)) {
if(dc.Field.ClrType == typeof(TimeSpan)) {
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
return new PrimitiveDataFrameColumn<TimeSpan>(colName, (TimeSpan[])dc.Data);
} else {
return new PrimitiveDataFrameColumn<TimeSpan>(colName, (TimeSpan?[])dc.Data);
}
}
if(dc.Field.ClrType == typeof(decimal)) {
if(dc.Field.ClrType == typeof(decimal)) {
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
return new PrimitiveDataFrameColumn<decimal>(colName, (decimal[])dc.Data);
} else {
return new PrimitiveDataFrameColumn<decimal>(colName, (decimal?[])dc.Data);
}
}
if(dc.Field.ClrType == typeof(float)) {
if(dc.Field.ClrType == typeof(float)) {
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
return new PrimitiveDataFrameColumn<float>(colName, (float[])dc.Data);
} else {
return new PrimitiveDataFrameColumn<float>(colName, (float?[])dc.Data);
}
}
if(dc.Field.ClrType == typeof(double)) {
if(dc.Field.ClrType == typeof(double)) {
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
return new PrimitiveDataFrameColumn<double>(colName, (double[])dc.Data);
} else {
Expand All @@ -100,7 +101,7 @@ public static DataFrameColumn ToDataFrameColumn(DataColumn dc) {
}

public static void AppendValues(DataFrameColumn dfc, DataColumn dc) {
if(dc.Field.ClrType == typeof(bool)) {
if(dc.Field.ClrType == typeof(bool)) {
var tdfc = (PrimitiveDataFrameColumn<bool>)dfc;
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
foreach(bool el in (bool[])dc.Data) {
Expand All @@ -113,7 +114,7 @@ public static void AppendValues(DataFrameColumn dfc, DataColumn dc) {
}
return;
}
if(dc.Field.ClrType == typeof(int)) {
if(dc.Field.ClrType == typeof(int)) {
var tdfc = (PrimitiveDataFrameColumn<int>)dfc;
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
foreach(int el in (int[])dc.Data) {
Expand All @@ -126,7 +127,7 @@ public static void AppendValues(DataFrameColumn dfc, DataColumn dc) {
}
return;
}
if(dc.Field.ClrType == typeof(uint)) {
if(dc.Field.ClrType == typeof(uint)) {
var tdfc = (PrimitiveDataFrameColumn<uint>)dfc;
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
foreach(uint el in (uint[])dc.Data) {
Expand All @@ -139,7 +140,7 @@ public static void AppendValues(DataFrameColumn dfc, DataColumn dc) {
}
return;
}
if(dc.Field.ClrType == typeof(long)) {
if(dc.Field.ClrType == typeof(long)) {
var tdfc = (PrimitiveDataFrameColumn<long>)dfc;
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
foreach(long el in (long[])dc.Data) {
Expand All @@ -152,7 +153,7 @@ public static void AppendValues(DataFrameColumn dfc, DataColumn dc) {
}
return;
}
if(dc.Field.ClrType == typeof(ulong)) {
if(dc.Field.ClrType == typeof(ulong)) {
var tdfc = (PrimitiveDataFrameColumn<ulong>)dfc;
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
foreach(ulong el in (ulong[])dc.Data) {
Expand All @@ -165,7 +166,7 @@ public static void AppendValues(DataFrameColumn dfc, DataColumn dc) {
}
return;
}
if(dc.Field.ClrType == typeof(byte)) {
if(dc.Field.ClrType == typeof(byte)) {
var tdfc = (PrimitiveDataFrameColumn<byte>)dfc;
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
foreach(byte el in (byte[])dc.Data) {
Expand All @@ -178,7 +179,7 @@ public static void AppendValues(DataFrameColumn dfc, DataColumn dc) {
}
return;
}
if(dc.Field.ClrType == typeof(sbyte)) {
if(dc.Field.ClrType == typeof(sbyte)) {
var tdfc = (PrimitiveDataFrameColumn<sbyte>)dfc;
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
foreach(sbyte el in (sbyte[])dc.Data) {
Expand All @@ -191,7 +192,7 @@ public static void AppendValues(DataFrameColumn dfc, DataColumn dc) {
}
return;
}
if(dc.Field.ClrType == typeof(DateTime)) {
if(dc.Field.ClrType == typeof(DateTime)) {
var tdfc = (PrimitiveDataFrameColumn<DateTime>)dfc;
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
foreach(DateTime el in (DateTime[])dc.Data) {
Expand All @@ -204,7 +205,7 @@ public static void AppendValues(DataFrameColumn dfc, DataColumn dc) {
}
return;
}
if(dc.Field.ClrType == typeof(TimeSpan)) {
if(dc.Field.ClrType == typeof(TimeSpan)) {
var tdfc = (PrimitiveDataFrameColumn<TimeSpan>)dfc;
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
foreach(TimeSpan el in (TimeSpan[])dc.Data) {
Expand All @@ -217,7 +218,7 @@ public static void AppendValues(DataFrameColumn dfc, DataColumn dc) {
}
return;
}
if(dc.Field.ClrType == typeof(decimal)) {
if(dc.Field.ClrType == typeof(decimal)) {
var tdfc = (PrimitiveDataFrameColumn<decimal>)dfc;
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
foreach(decimal el in (decimal[])dc.Data) {
Expand All @@ -230,7 +231,7 @@ public static void AppendValues(DataFrameColumn dfc, DataColumn dc) {
}
return;
}
if(dc.Field.ClrType == typeof(float)) {
if(dc.Field.ClrType == typeof(float)) {
var tdfc = (PrimitiveDataFrameColumn<float>)dfc;
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
foreach(float el in (float[])dc.Data) {
Expand All @@ -243,7 +244,7 @@ public static void AppendValues(DataFrameColumn dfc, DataColumn dc) {
}
return;
}
if(dc.Field.ClrType == typeof(double)) {
if(dc.Field.ClrType == typeof(double)) {
var tdfc = (PrimitiveDataFrameColumn<double>)dfc;
if(dc.Field.ClrType == dc.Field.ClrNullableIfHasNullsType) {
foreach(double el in (double[])dc.Data) {
Expand All @@ -268,5 +269,63 @@ public static void AppendValues(DataFrameColumn dfc, DataColumn dc) {
throw new NotSupportedException(dc.Field.ClrType.Name);

}

public static Array GetTypedDataFast(DataFrameColumn col) {

if(col.DataType == typeof(bool)) {
return ((PrimitiveDataFrameColumn<bool>)col).ToArray();
}

if(col.DataType == typeof(int)) {
return ((PrimitiveDataFrameColumn<int>)col).ToArray();
}

if(col.DataType == typeof(uint)) {
return ((PrimitiveDataFrameColumn<uint>)col).ToArray();
}

if(col.DataType == typeof(long)) {
return ((PrimitiveDataFrameColumn<long>)col).ToArray();
}

if(col.DataType == typeof(ulong)) {
return ((PrimitiveDataFrameColumn<ulong>)col).ToArray();
}

if(col.DataType == typeof(byte)) {
return ((PrimitiveDataFrameColumn<byte>)col).ToArray();
}

if(col.DataType == typeof(sbyte)) {
return ((PrimitiveDataFrameColumn<sbyte>)col).ToArray();
}

if(col.DataType == typeof(DateTime)) {
return ((PrimitiveDataFrameColumn<DateTime>)col).ToArray();
}

if(col.DataType == typeof(TimeSpan)) {
return ((PrimitiveDataFrameColumn<TimeSpan>)col).ToArray();
}

if(col.DataType == typeof(decimal)) {
return ((PrimitiveDataFrameColumn<decimal>)col).ToArray();
}

if(col.DataType == typeof(float)) {
return ((PrimitiveDataFrameColumn<float>)col).ToArray();
}

if(col.DataType == typeof(double)) {
return ((PrimitiveDataFrameColumn<double>)col).ToArray();
}

// special case
if(col.DataType == typeof(string)) {
return ((StringDataFrameColumn)col).ToArray();
}

throw new NotSupportedException($"type {col.DataType} is not supported (column: {col.Name})");
}
}
}
Loading

0 comments on commit f2c0754

Please sign in to comment.