Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/respect ioxcolumn typefield metadata #131

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions Client.Test/InfluxDBClientQueryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Linq;
using System.Threading.Tasks;
using Apache.Arrow;
using Apache.Arrow.Types;
using InfluxDB3.Client.Internal;
using InfluxDB3.Client.Query;
using Moq;
Expand Down Expand Up @@ -130,4 +131,61 @@ public async Task PassHeadersToFlightClient()
.ToListAsync();
mockFlightSqlClient.Verify(m => m.Execute(query, "my-db", queryType, new Dictionary<string, object>(), headers), Times.Exactly(1));
}

// todo: remove
// [Test]
// public async Task bar()
// {
// var hostCloud = "https://us-east-1-1.aws.cloud2.influxdata.com";
// var adminTokenCloud =
// "xWh3VQCb3pMJPw7T2lnEwFLXO-pb4OWzfNN76UTpmRKtlg83yJlz6maLC3AL0B6M6gMWWZY2QApzSdEeEopWlQ==";
// using var client = new InfluxDBClient(
// hostCloud,
// token: adminTokenCloud,
// organization: "admin",
// database: "admin");
// const string query = "select * from host10";
// var a = await client.QueryPoints(query).ToListAsync();
// var meta = new Dictionary<string, string>
// {
// {
// "iox::column::type", "iox::column_type::field::integer"
// }
// };
//
// Field intField = new Field("column2", Int32Type.Default, true, meta);
//
// }

// todo: remove
// [Test]
// public void fooTest()
// {
// var meta = new Dictionary<string, string>
// {
// {
// "iox::column::type", "iox::column_type::field::integer"
// }
// };
//
// Field intField = new Field("column2", Int32Type.Default, true, meta);
// Int32Array intArray = new Int32Array.Builder().Append(1).Append(2).AppendNull().Append(4).Build();
// Schema schema1 = new Schema(new[] { intField, }, null);
// RecordBatch recordBatch = new RecordBatch(schema1, new IArrowArray[] { intArray }, intArray.Length);
//
// var rowCount = recordBatch.Column(0).Length;
// for (var i = 0; i < rowCount; i++)
// {
// for (var j = 0; j < recordBatch.ColumnCount; j++)
// {
// var schema = recordBatch.Schema.FieldsList[j];
// var type = schema.Metadata["iox::column::type"];
// var parts = type.Split(new[] { ':' }, StringSplitOptions.RemoveEmptyEntries);
// var valueType = parts[2];
//
// Console.WriteLine(valueType);
// }
// }
//
// }
}
138 changes: 129 additions & 9 deletions Client/InfluxDBClient.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Numerics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow;
using Apache.Arrow.Types;
using InfluxDB3.Client.Config;
using InfluxDB3.Client.Internal;
using InfluxDB3.Client.Query;
Expand Down Expand Up @@ -465,7 +468,10 @@
{
if (batch.Column(j) is ArrowArray array)
{
row.Add(array.GetObjectValue(i));
var mappedValue = GetMappedValue(
batch.Schema.FieldsList[j],
array.GetObjectValue(i));
row.Add(mappedValue);
}
}

Expand Down Expand Up @@ -538,10 +544,13 @@
var objectValue = array.GetObjectValue(i);
if (objectValue is null)
continue;

if ((fullName == "measurement" || fullName == "iox::measurement") && objectValue is string)
if (objectValue is StringType arrowString)
objectValue = arrowString.ToString();

if (fullName is "measurement" or "iox::measurement" &&
objectValue is string value)
{
point = point.SetMeasurement((string)objectValue);
point = point.SetMeasurement(value);
continue;
}

Expand All @@ -562,18 +571,21 @@
var parts = type.Split(new[] { ':' }, StringSplitOptions.RemoveEmptyEntries);
var valueType = parts[2];
// string fieldType = parts.Length > 3 ? parts[3] : "";
var mappedValue = GetMappedValue(schema, objectValue);
if (mappedValue is null)
continue;

if (valueType == "field")
{
point = point.SetField(fullName, objectValue);
point = point.SetField(fullName, mappedValue);
}
else if (valueType == "tag")
{
point = point.SetTag(fullName, (string)objectValue);
point = point.SetTag(fullName, (string)mappedValue);
}
else if (valueType == "timestamp" && objectValue is DateTimeOffset timestamp)
else if (valueType == "timestamp")
{
point = point.SetTimestamp(timestamp);
point = point.SetTimestamp((BigInteger)mappedValue);
}
}

Expand Down Expand Up @@ -853,7 +865,7 @@
if (handler.SupportsAutomaticDecompression)
{
handler.AutomaticDecompression =
System.Net.DecompressionMethods.GZip | System.Net.DecompressionMethods.Deflate;
DecompressionMethods.GZip | DecompressionMethods.Deflate;
}

if (handler.SupportsProxy && config.Proxy != null)
Expand All @@ -874,7 +886,7 @@
client.DefaultRequestHeaders.UserAgent.ParseAdd(AssemblyHelper.GetUserAgent());
if (!string.IsNullOrEmpty(config.Token))
{
string authScheme = string.IsNullOrEmpty(config.AuthScheme) ? "Token" : config.AuthScheme;

Check warning on line 889 in Client/InfluxDBClient.cs

View workflow job for this annotation

GitHub Actions / CodeQL-Build

Converting null literal or possible null value to non-nullable type.

Check warning on line 889 in Client/InfluxDBClient.cs

View workflow job for this annotation

GitHub Actions / CodeQL-Build

Converting null literal or possible null value to non-nullable type.
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue(authScheme, config.Token);
}

Expand All @@ -885,6 +897,114 @@
{
return $"Please specify the '{property}' as a method parameter or use default configuration " +
$"at 'ClientConfig.{property[0].ToString().ToUpper()}{property.Substring(1)}'.";
}

/// <summary>
/// Function to cast value return base on metadata from InfluxDB.
/// </summary>
/// <param name="field">The Field object from Arrow</param>
/// <param name="value">The value to cast</param>
/// <returns>The casted value</returns>
private static object? GetMappedValue(Field field, object? value)
{
if (value is null)
{
return null;
}

var fieldName = field.Name;
if (fieldName is "measurement" or "iox::measurement")
{
return Convert.ToString(value);
}

var metaType = field.HasMetadata ? field.Metadata["iox::column::type"] : null;
if (metaType == null)
{
if (fieldName == "time" && value is DateTimeOffset timeOffset)
{
return TimestampConverter.GetNanoTime(timeOffset.UtcDateTime);
}

return value;
}

var parts = metaType.Split(new[] { ':' }, StringSplitOptions.RemoveEmptyEntries);
var valueType = parts[2];
if (valueType == "field")
{
switch (metaType)
{
case "iox::column_type::field::integer":
if (IsNumber(value))
{
return Convert.ToInt64(value);
}
Trace.TraceWarning($"The field [{fieldName}] is not an integer");
return value;
case "iox::column_type::field::uinteger":
if (IsNumber(value))
{
return Convert.ToUInt64(value);
}
Trace.TraceWarning($"The field [{fieldName}] is not an uinteger");
return value;
case "iox::column_type::field::float":
if (IsNumber(value))
{
return Convert.ToDouble(value);
}

Trace.TraceWarning($"The field [{fieldName}] is not a double");
return value;
case "iox::column_type::field::string":
if (value is string)
{
return value;
}

if (value is StringType v)
{
return v.ToString();
}

Trace.TraceWarning($"The field [{fieldName}] is not a string");
return value;
case "iox::column_type::field::boolean":
if (value is bool)
{
return Convert.ToBoolean(value);
}
Trace.TraceWarning($"The field [{fieldName}] is not a boolean");
return value;
default:
return value;
}
}

if (valueType == "timestamp" && value is DateTimeOffset dateTimeOffset)
{
return TimestampConverter.GetNanoTime(dateTimeOffset.UtcDateTime);
}

return value;
}

private static bool IsNumber(object? value)
{
return value is sbyte
or byte
or short
or ushort
or int
or uint
or long
or ulong
or float
or double
or decimal
or BigInteger;
}
}

}
25 changes: 25 additions & 0 deletions Client/Internal/TimestampConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System;
using System.Numerics;

namespace InfluxDB3.Client.Internal;

public class TimestampConverter
{
private static readonly DateTime EpochStart = new(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

/// <summary>
/// Get Nano time from Datetime and EpochStart time .
/// </summary>
/// <param name="timestamp">the Datetime object</param>
/// <returns>the time span object</returns>
public static BigInteger GetNanoTime(DateTime dateTime)
{
var utcTimestamp = dateTime.Kind switch
{
DateTimeKind.Local => dateTime.ToUniversalTime(),
DateTimeKind.Unspecified => DateTime.SpecifyKind(dateTime, DateTimeKind.Utc),
_ => dateTime
};
return utcTimestamp.Subtract(EpochStart).Ticks * 100;
}
}
11 changes: 11 additions & 0 deletions Client/Write/PointDataValues.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ public PointDataValues SetTimestamp(long timestamp, WritePrecision? timeUnit = n
return this;
}

/// <summary>
/// Updates the timestamp for the point.
/// </summary>
/// <param name="timestamp">the timestamp in nanosecond</param>
/// <returns></returns>
public PointDataValues SetTimestamp(BigInteger timestamp)
{
_time = timestamp;
return this;
}

/// <summary>
/// Updates the timestamp for the point represented by <see cref="TimeSpan"/>.
/// </summary>
Expand Down
Loading