Skip to content

Commit

Permalink
fix(csharp/src/Drivers/BigQuery): add support for scopes (apache#1482)
Browse files Browse the repository at this point in the history
- Adds support for passing scopes to the BigQueryClient
- Removes many of the null warnings from the pipeline runs
- Improves serialization for complex structs
- Includes test updates

---------

Co-authored-by: David Coe <[email protected]>
  • Loading branch information
2 people authored and soumyadsanyal committed Jan 31, 2024
1 parent af3180e commit b22f183
Show file tree
Hide file tree
Showing 19 changed files with 487 additions and 223 deletions.
3 changes: 1 addition & 2 deletions csharp/src/Apache.Arrow.Adbc/AdbcStatement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,9 @@ public virtual void Dispose()
/// <param name="index">
/// The index in the array to get the value from.
/// </param>
public virtual object GetValue(IArrowArray arrowArray, Field field, int index)
public virtual object GetValue(IArrowArray arrowArray, int index)
{
if (arrowArray == null) throw new ArgumentNullException(nameof(arrowArray));
if (field == null) throw new ArgumentNullException(nameof(field));
if (index < 0) throw new ArgumentOutOfRangeException(nameof(index));

switch (arrowArray)
Expand Down
2 changes: 1 addition & 1 deletion csharp/src/Client/AdbcDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public ReadOnlyCollection<AdbcColumn> GetAdbcColumnSchema()
public object GetValue(IArrowArray arrowArray, int ordinal)
{
    // The statement-level GetValue takes only the array and the row index, so the
    // field itself is not passed on. The lookup is retained so this method still
    // touches the schema for the requested ordinal exactly as before.
    // NOTE(review): presumably GetFieldByIndex rejects an invalid ordinal - confirm,
    // and drop this line if callers do not rely on that.
    _ = this.schema.GetFieldByIndex(ordinal);

    // Read the value for the row the reader is currently positioned on.
    return this.adbcCommand.AdbcStatement.GetValue(arrowArray, this.currentRowInRecordBatch);
}

/// <summary>
Expand Down
10 changes: 10 additions & 0 deletions csharp/src/Client/SchemaConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ public static DataTable ConvertArrowSchema(Schema schema, AdbcStatement adbcStat
row[SchemaTableColumn.NumericPrecision] = Convert.ToInt32(f.Metadata["precision"]);
row[SchemaTableColumn.NumericScale] = Convert.ToInt32(f.Metadata["scale"]);
}
else if (f.DataType is Decimal128Type decimal128Type)
{
row[SchemaTableColumn.NumericPrecision] = decimal128Type.Precision;
row[SchemaTableColumn.NumericScale] = decimal128Type.Scale;
}
else if (f.DataType is Decimal256Type decimal256Type)
{
row[SchemaTableColumn.NumericPrecision] = decimal256Type.Precision;
row[SchemaTableColumn.NumericScale] = decimal256Type.Scale;
}
else
{
row[SchemaTableColumn.NumericPrecision] = DBNull.Value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net6.0</TargetFrameworks>
<PackageReadmeFile>readme.md</PackageReadmeFile>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="System.Net.Http.WinHttpHandler" Version="7.0.0" Condition="'$(TargetFrameworkIdentifier)' == '.NETStandard'" />
Expand Down
350 changes: 206 additions & 144 deletions csharp/src/Drivers/BigQuery/BigQueryConnection.cs

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions csharp/src/Drivers/BigQuery/BigQueryInfoArrowStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
internal class BigQueryInfoArrowStream : IArrowArrayStream
{
private Schema schema;
private RecordBatch batch;
private RecordBatch? batch;

public BigQueryInfoArrowStream(Schema schema, List<IArrowArray> data)
{
Expand All @@ -38,11 +38,11 @@ public BigQueryInfoArrowStream(Schema schema, List<IArrowArray> data)

public Schema Schema { get { return this.schema; } }

/// <summary>
/// Hands out the stored record batch once; every later call yields <c>null</c>.
/// </summary>
/// <param name="cancellationToken">Not used by this implementation.</param>
/// <returns>
/// A completed <see cref="ValueTask{TResult}"/> wrapping the stored batch, or
/// <c>null</c> when the batch has already been handed out.
/// </returns>
public ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
{
    // Take the batch and clear the field so it is only returned a single time.
    RecordBatch? batch = this.batch;
    this.batch = null;
    return new ValueTask<RecordBatch?>(batch);
}

public void Dispose()
Expand Down
1 change: 1 addition & 0 deletions csharp/src/Drivers/BigQuery/BigQueryParameters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class BigQueryParameters
public const string AllowLargeResults = "adbc.bigquery.allow_large_results";
public const string UseLegacySQL = "adbc.bigquery.use_legacy_sql";
public const string LargeDecimalsAsString = "adbc.bigquery.large_decimals_as_string";
public const string Scopes = "adbc.bigquery.scopes";
}

/// <summary>
Expand Down
137 changes: 130 additions & 7 deletions csharp/src/Drivers/BigQuery/BigQueryStatement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/

using System;
using System.Collections;
using System.Collections.Generic;
using System.Data.SqlTypes;
using System.IO;
using System.Linq;
using System.Text.Json;
Expand Down Expand Up @@ -87,16 +89,16 @@ private Field TranslateField(TableFieldSchema field)
return new Field(field.Name, TranslateType(field), field.Mode == "NULLABLE");
}

/// <summary>
/// Gets the value at <paramref name="index"/> from <paramref name="arrowArray"/>.
/// Struct arrays are returned as a JSON string and list arrays as the sliced
/// values for that index; every other array type is delegated to the base
/// implementation.
/// </summary>
/// <param name="arrowArray">The Arrow array to read from.</param>
/// <param name="index">The index in the array to get the value from.</param>
public override object GetValue(IArrowArray arrowArray, int index)
{
    switch (arrowArray)
    {
        case StructArray structArray:
            return SerializeToJson(structArray, index);
        case ListArray listArray:
            return listArray.GetSlicedValues(index);
        default:
            return base.GetValue(arrowArray, index);
    }
}

Expand Down Expand Up @@ -141,7 +143,10 @@ private IArrowType TranslateType(TableFieldSchema field)
return GetType(field, new Decimal128Type(38, 9));

case "BIGNUMERIC" or "BIGDECIMAL":
return bool.Parse(this.Options[BigQueryParameters.LargeDecimalsAsString]) ? GetType(field, StringType.Default) : GetType(field, new Decimal256Type(76, 38));
if (this.Options != null)
return bool.Parse(this.Options[BigQueryParameters.LargeDecimalsAsString]) ? GetType(field, StringType.Default) : GetType(field, new Decimal256Type(76, 38));
else
return GetType(field, StringType.Default);

default: throw new InvalidOperationException($"{field.Type} cannot be translated");
}
Expand Down Expand Up @@ -189,17 +194,135 @@ static IArrowReader ReadChunk(BigQueryReadClient readClient, string streamName)
}

/// <summary>
/// Serializes the struct entry at <paramref name="index"/> to a JSON string by
/// first flattening it into a name/value dictionary.
/// </summary>
/// <param name="structArray">The struct array to read from.</param>
/// <param name="index">The index of the entry to serialize.</param>
/// <returns>The JSON text for the entry.</returns>
private string SerializeToJson(StructArray structArray, int index)
    => JsonSerializer.Serialize(ParseStructArray(structArray, index));

/// <summary>
/// Builds a dictionary of field name to value for one entry of a struct array,
/// in a shape that can be handed to the JSON serializer.
/// </summary>
/// <param name="structArray">The struct array to read from.</param>
/// <param name="index">The index of the entry to read.</param>
/// <returns>
/// A dictionary with one entry per struct field. Nested struct arrays become a
/// nested dictionary (or a list of dictionaries), other Arrow arrays become a
/// list of their values, and everything else is stored as returned by
/// <c>GetValue</c>.
/// </returns>
private Dictionary<String, object> ParseStructArray(StructArray structArray, int index)
{
    Dictionary<String, object> jsonDictionary = new Dictionary<String, object>();
    StructType structType = (StructType)structArray.Data.DataType;

    for (int i = 0; i < structArray.Data.Children.Length; i++)
    {
        // Each field is added exactly once, under its own name.
        string name = structType.Fields[i].Name;
        object value = GetValue(structArray.Fields[i], index);

        if (value is StructArray structArray1)
        {
            if (structArray1.Length > 1)
            {
                // Several struct entries: emit them as a list of dictionaries.
                List<Dictionary<string, object>> children = new List<Dictionary<string, object>>();

                for (int j = 0; j < structArray1.Length; j++)
                {
                    children.Add(ParseStructArray(structArray1, j));
                }

                jsonDictionary.Add(name, children);
            }
            else
            {
                // NOTE(review): the nested array is read with the caller's index
                // rather than 0, and this branch is also taken when Length is 0 -
                // confirm this is intended for sliced arrays.
                jsonDictionary.Add(name, ParseStructArray(structArray1, index));
            }
        }
        else if (value is IArrowArray arrowArray)
        {
            IList? values = CreateList(arrowArray);

            if (values != null)
            {
                for (int j = 0; j < arrowArray.Length; j++)
                {
                    values.Add(base.GetValue(arrowArray, j));
                }

                jsonDictionary.Add(name, values);
            }
            else
            {
                // Array types CreateList has no list for are emitted as an empty list.
                jsonDictionary.Add(name, new List<object>());
            }
        }
        else
        {
            jsonDictionary.Add(name, value);
        }
    }

    return jsonDictionary;
}

/// <summary>
/// Creates an empty, typed list for collecting the values of the given Arrow array.
/// </summary>
/// <param name="arrowArray">The Arrow array whose element type selects the list type.</param>
/// <returns>
/// A new empty list for the array's element type, or <c>null</c> when the
/// array type is not one of the handled cases.
/// </returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="arrowArray"/> is null.
/// </exception>
private IList? CreateList(IArrowArray arrowArray)
{
    if (arrowArray == null) throw new ArgumentNullException(nameof(arrowArray));

    // Case order is kept as-is: it matters wherever one array type derives from another.
    switch (arrowArray)
    {
        case BooleanArray:
            return new List<bool>();
        case Date32Array:
        case Date64Array:
            return new List<DateTime>();
        case Decimal128Array:
            return new List<SqlDecimal>();
        case Decimal256Array:
            return new List<string>();
        case DoubleArray:
            return new List<double>();
        case FloatArray:
            return new List<float>();
#if NET5_0_OR_GREATER
        case PrimitiveArray<Half>:
            return new List<Half>();
#endif
        case Int8Array:
            return new List<sbyte>();
        case Int16Array:
            return new List<short>();
        case Int32Array:
            return new List<int>();
        case Int64Array:
            return new List<long>();
        case StringArray:
            return new List<string>();
        case Time32Array:
        case Time64Array:
#if NET6_0_OR_GREATER
            return new List<TimeOnly>();
#else
            return new List<TimeSpan>();
#endif
        case TimestampArray:
            return new List<DateTimeOffset>();
        case UInt8Array:
            return new List<byte>();
        case UInt16Array:
            return new List<ushort>();
        case UInt32Array:
            return new List<uint>();
        case UInt64Array:
            return new List<ulong>();

        // NOTE(review): a list of single bytes is used for binary arrays - confirm
        // this matches the element type the base GetValue returns for them.
        case BinaryArray:
            return new List<byte>();

        // not covered:
        // -- struct array
        // -- dictionary array
        // -- fixed size binary
        // -- list array
        // -- union array
    }

    return null;
}


class MultiArrowReader : IArrowArrayStream
{
readonly Schema schema;
Expand Down
6 changes: 3 additions & 3 deletions csharp/src/Drivers/BigQuery/BigQueryTokenResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
internal class BigQueryTokenResponse
{
    /// <summary>
    /// Value of the <c>access_token</c> JSON property; null when the property is absent.
    /// </summary>
    [JsonPropertyName("access_token")]
    public string? AccessToken { get; set; }

    /// <summary>
    /// Value of the <c>expires_in</c> JSON property.
    /// </summary>
    [JsonPropertyName("expires_in")]
    public int ExpiresIn { get; set; }

    /// <summary>
    /// Value of the <c>scope</c> JSON property; null when the property is absent.
    /// </summary>
    [JsonPropertyName("scope")]
    public string? Scope { get; set; }

    /// <summary>
    /// Value of the <c>token_type</c> JSON property; null when the property is absent.
    /// </summary>
    [JsonPropertyName("token_type")]
    public string? TokenType { get; set; }
}
}
3 changes: 3 additions & 0 deletions csharp/src/Drivers/BigQuery/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ https://cloud.google.com/dotnet/docs/reference/Google.Cloud.BigQuery.V2/latest/G
**adbc.bigquery.refresh_token**<br>
&nbsp;&nbsp;&nbsp;&nbsp;The refresh token used when the generated OAuth token expires. Required for `user` authentication.

**adbc.bigquery.scopes**<br>
&nbsp;&nbsp;&nbsp;&nbsp;Optional. Comma-separated list of scopes to include for the credential.

**adbc.bigquery.use_legacy_sql**<br>
&nbsp;&nbsp;&nbsp;&nbsp;Sets the [UseLegacySql](https://cloud.google.com/dotnet/docs/reference/Google.Cloud.BigQuery.V2/latest/Google.Cloud.BigQuery.V2.QueryOptions#Google_Cloud_BigQuery_V2_QueryOptions_UseLegacySql) value of the QueryOptions to `true` if configured; otherwise, the default is `false`.

Expand Down
2 changes: 1 addition & 1 deletion csharp/test/Apache.Arrow.Adbc.Tests/Client/ClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private AdbcDataReader GetMoqDataReader(DecimalBehavior decimalBehavior, string

Mock<AdbcStatement> mockStatement = new Mock<AdbcStatement>();
mockStatement.Setup(x => x.ExecuteQuery()).Returns(queryResult); ;
mockStatement.Setup(x => x.GetValue(It.IsAny<IArrowArray>(), It.IsAny<Field>(), It.IsAny<int>())).Returns(sqlDecimal);
mockStatement.Setup(x => x.GetValue(It.IsAny<IArrowArray>(), It.IsAny<int>())).Returns(sqlDecimal);

Adbc.Client.AdbcConnection mockConnection = new Adbc.Client.AdbcConnection();
mockConnection.DecimalBehavior = decimalBehavior;
Expand Down
4 changes: 2 additions & 2 deletions csharp/test/Apache.Arrow.Adbc.Tests/ClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ public static void CanClientExecuteQuery(Adbc.Client.AdbcConnection adbcConnecti
/// <param name="sampleDataBuilder">The <see cref="SampleDataBuilder"/> to use</param>
public static void VerifyTypesAndValues(Adbc.Client.AdbcConnection adbcConnection, SampleDataBuilder sampleDataBuilder)
{
if(adbcConnection == null) throw new ArgumentNullException(nameof(adbcConnection));
if(sampleDataBuilder == null) throw new ArgumentNullException(nameof(sampleDataBuilder));
if (adbcConnection == null) throw new ArgumentNullException(nameof(adbcConnection));
if (sampleDataBuilder == null) throw new ArgumentNullException(nameof(sampleDataBuilder));

adbcConnection.Open();

Expand Down
4 changes: 2 additions & 2 deletions csharp/test/Apache.Arrow.Adbc.Tests/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static bool CanExecuteTest(string environmentVariable, out string environ
public static T LoadTestConfiguration<T>(string environmentVariable)
    where T : TestConfiguration
{
    // CanExecuteTest supplies the value that is then passed to
    // GetTestConfiguration as the configuration file name.
    if (CanExecuteTest(environmentVariable, out string environmentValue))
    {
        return GetTestConfiguration<T>(environmentValue);
    }

    throw new InvalidOperationException($"Cannot execute test configuration from environment variable `{environmentVariable}`");
}
Expand All @@ -92,7 +92,7 @@ public static T LoadTestConfiguration<T>(string environmentVariable)
public static T GetTestConfiguration<T>(string fileName)
where T : TestConfiguration
{
if(!File.Exists(fileName))
if (!File.Exists(fileName))
throw new FileNotFoundException(fileName);

// use a JSON file for the various settings
Expand Down
56 changes: 56 additions & 0 deletions csharp/test/Drivers/BigQuery/BigQueryData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,62 @@ public static SampleDataBuilder GetSampleData()
}
});

// complex struct
sampleDataBuilder.Samples.Add(
new SampleData()
{
Query = "SELECT " +
"STRUCT(" +
"\"Iron Man\" as name," +
"\"Avengers\" as team," +
"[\"Genius\", \"Billionaire\", \"Playboy\", \"Philanthropist\"] as powers," +
"[" +
" STRUCT(" +
" \"Captain America\" as name, " +
" \"Avengers\" as team, " +
" [\"Super Soldier Serum\", \"Vibranium Shield\"] as powers, " +
" [" +
" STRUCT(" +
" \"Thanos\" as name, " +
" \"Black Order\" as team, " +
" [\"Infinity Gauntlet\", \"Super Strength\", \"Teleportation\"] as powers, " +
" [" +
" STRUCT(" +
" \"Loki\" as name, " +
" \"Asgard\" as team, " +
" [\"Magic\", \"Shapeshifting\", \"Trickery\"] as powers " +
" )" +
" ] as allies" +
" )" +
" ] as enemies" +
" )," +
" STRUCT(" +
" \"Spider-Man\" as name, " +
" \"Avengers\" as team, " +
" [\"Spider-Sense\", \"Web-Shooting\", \"Wall-Crawling\"] as powers, " +
" [" +
" STRUCT(" +
" \"Green Goblin\" as name, " +
" \"Sinister Six\" as team, " +
" [\"Glider\", \"Pumpkin Bombs\", \"Super Strength\"] as powers, " +
" [" +
" STRUCT(" +
" \"Doctor Octopus\" as name, " +
" \"Sinister Six\" as team, " +
" [\"Mechanical Arms\", \"Genius\", \"Madness\"] as powers " +
" )" +
" ] as allies" +
" )" +
" ] as enemies" +
" )" +
" ] as friends" +
") as iron_man",
ExpectedValues = new List<ColumnNetTypeArrowTypeValue>()
{
new ColumnNetTypeArrowTypeValue("iron_man", typeof(string), typeof(StringType), "{\"name\":\"Iron Man\",\"team\":\"Avengers\",\"powers\":[\"Genius\",\"Billionaire\",\"Playboy\",\"Philanthropist\"],\"friends\":[{\"name\":\"Captain America\",\"team\":\"Avengers\",\"powers\":[\"Super Soldier Serum\",\"Vibranium Shield\"],\"enemies\":{\"name\":\"Thanos\",\"team\":\"Black Order\",\"powers\":[\"Infinity Gauntlet\",\"Super Strength\",\"Teleportation\"],\"allies\":{\"name\":\"Loki\",\"team\":\"Asgard\",\"powers\":[\"Magic\",\"Shapeshifting\",\"Trickery\"]}}},{\"name\":\"Spider-Man\",\"team\":\"Avengers\",\"powers\":[\"Spider-Sense\",\"Web-Shooting\",\"Wall-Crawling\"],\"enemies\":{\"name\":\"Green Goblin\",\"team\":\"Sinister Six\",\"powers\":[\"Glider\",\"Pumpkin Bombs\",\"Super Strength\"],\"allies\":{\"name\":\"Doctor Octopus\",\"team\":\"Sinister Six\",\"powers\":[\"Mechanical Arms\",\"Genius\",\"Madness\"]}}}]}")
}
});

return sampleDataBuilder;
}
}
Expand Down
Loading

0 comments on commit b22f183

Please sign in to comment.