From 52ed008750ee5cf8b8b76804529162694b30b733 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Wed, 27 Mar 2024 14:38:48 +0100 Subject: [PATCH 1/5] feat: custom headers can be specified for queries --- .github/ISSUE_TEMPLATE/SUPPORT.yml | 2 +- Client.Test/InfluxDBClientQueryTest.cs | 35 ++++- Client.Test/Internal/FlightSqlClientTest.cs | 77 ++++++++++- Client.Test/MockServerTest.cs | 2 +- Client/Config/ClientConfig.cs | 19 ++- Client/InfluxDBClient.cs | 134 +++++++++++++++++--- Client/Internal/FlightSqlClient.cs | 46 +++++-- 7 files changed, 283 insertions(+), 32 deletions(-) diff --git a/.github/ISSUE_TEMPLATE/SUPPORT.yml b/.github/ISSUE_TEMPLATE/SUPPORT.yml index ca228bf..b37d9fe 100644 --- a/.github/ISSUE_TEMPLATE/SUPPORT.yml +++ b/.github/ISSUE_TEMPLATE/SUPPORT.yml @@ -8,7 +8,7 @@ body: WOAHH, hold up. This isn't the best place for support questions. You can get a faster response on slack or forums: - Please redirect any QUESTIONS about Telegraf usage to + Please redirect any QUESTIONS about Client usage to - InfluxData Slack Channel: https://app.slack.com/huddle/TH8RGQX5Z/C02UDUPLQKA - InfluxData Community Site: https://community.influxdata.com diff --git a/Client.Test/InfluxDBClientQueryTest.cs b/Client.Test/InfluxDBClientQueryTest.cs index 976d2ee..e34008d 100644 --- a/Client.Test/InfluxDBClientQueryTest.cs +++ b/Client.Test/InfluxDBClientQueryTest.cs @@ -54,7 +54,7 @@ public async Task PassNamedParametersToFlightClient() var mockFlightSqlClient = new Mock(); mockFlightSqlClient .Setup(m => m.Execute(It.IsAny(), It.IsAny(), It.IsAny(), - It.IsAny>())) + It.IsAny>(), It.IsAny>())) .Returns(new List().ToAsyncEnumerable()); // @@ -76,7 +76,7 @@ public async Task PassNamedParametersToFlightClient() _ = await _client.QueryPoints(query, database: "my-db", queryType: queryType, namedParameters: namedParameters) .ToListAsync(); - mockFlightSqlClient.Verify(m => m.Execute(query, "my-db", queryType, namedParameters), Times.Exactly(1)); + mockFlightSqlClient.Verify(m => m.Execute(query, "my-db", queryType, namedParameters, new Dictionary()), Times.Exactly(1)); } [Test] @@ -99,4 +99,35 @@ public void NotSupportedQueryParameterType() Is.EqualTo( "The parameter 'location' has unsupported type 'System.DateTime'. The supported types are 'string', 'bool', 'int' and 'float'.")); } + + [Test] + public async Task PassHeadersToFlightClient() + { + // + // Mock the FlightSqlClient + // + var mockFlightSqlClient = new Mock(); + mockFlightSqlClient + .Setup(m => m.Execute(It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny>(), It.IsAny>())) + .Returns(new List().ToAsyncEnumerable()); + + // + // Setup the client with the mocked FlightSqlClient + // + _client = new InfluxDBClient(MockServerUrl); + _client.FlightSqlClient.Dispose(); + _client.FlightSqlClient = mockFlightSqlClient.Object; + + const string query = "select * from cpu"; + const QueryType queryType = QueryType.SQL; + + var headers = new Dictionary{ + { + "X-Tracing-Id", "123" + }}; + _ = await _client.QueryPoints(query, database: "my-db", queryType: queryType, headers: headers) + .ToListAsync(); + mockFlightSqlClient.Verify(m => m.Execute(query, "my-db", queryType, new Dictionary(), headers), Times.Exactly(1)); + } } \ No newline at end of file diff --git a/Client.Test/Internal/FlightSqlClientTest.cs b/Client.Test/Internal/FlightSqlClientTest.cs index ac0c575..282d5e4 100644 --- a/Client.Test/Internal/FlightSqlClientTest.cs +++ b/Client.Test/Internal/FlightSqlClientTest.cs @@ -23,8 +23,8 @@ public void SetUp() _flightSqlClient = new FlightSqlClient(config, InfluxDBClient.CreateAndConfigureHttpClient(config)); } - [TearDownAttribute] - public void TearDownAttribute() + [TearDown] + public new void TearDown() { _flightSqlClient.Dispose(); } @@ -73,4 +73,77 @@ public void PrepareFlightTicketNamedParameters() Assert.That(Encoding.UTF8.GetString(prepareFlightTicket.Ticket.ToByteArray()), Is.EqualTo(ticket)); }); } + + [Test] + public void HeadersMetadataFromRequest() + { + var prepareHeadersMetadata = + _flightSqlClient.PrepareHeadersMetadata(new Dictionary { { "X-Tracing-Id", "987" } }); + + Assert.Multiple(() => + { + Assert.That(prepareHeadersMetadata, Is.Not.Null); + Assert.That(prepareHeadersMetadata, Has.Count.EqualTo(1)); + Assert.That(prepareHeadersMetadata[0].Key, Is.EqualTo("x-tracing-id")); + Assert.That(prepareHeadersMetadata[0].Value, Is.EqualTo("987")); + }); + } + + [Test] + public void HeadersMetadataFromConfig() + { + _flightSqlClient.Dispose(); + + var config = new ClientConfig + { + Host = MockServerUrl, + Timeout = TimeSpan.FromSeconds(45), + Headers = new Dictionary + { + { "X-Global-Tracing-Id", "123" } + } + }; + + _flightSqlClient = new FlightSqlClient(config, InfluxDBClient.CreateAndConfigureHttpClient(config)); + + var prepareHeadersMetadata = + _flightSqlClient.PrepareHeadersMetadata(new Dictionary()); + + Assert.Multiple(() => + { + Assert.That(prepareHeadersMetadata, Is.Not.Null); + Assert.That(prepareHeadersMetadata, Has.Count.EqualTo(1)); + Assert.That(prepareHeadersMetadata[0].Key, Is.EqualTo("x-global-tracing-id")); + Assert.That(prepareHeadersMetadata[0].Value, Is.EqualTo("123")); + }); + } + + [Test] + public void HeadersMetadataFromRequestArePreferred() + { + _flightSqlClient.Dispose(); + + var config = new ClientConfig + { + Host = MockServerUrl, + Timeout = TimeSpan.FromSeconds(45), + Headers = new Dictionary + { + { "X-Tracing-Id", "ABC" } + } + }; + + _flightSqlClient = new FlightSqlClient(config, InfluxDBClient.CreateAndConfigureHttpClient(config)); + + var prepareHeadersMetadata = + _flightSqlClient.PrepareHeadersMetadata(new Dictionary { { "X-Tracing-Id", "258" } }); + + Assert.Multiple(() => + { + Assert.That(prepareHeadersMetadata, Is.Not.Null); + Assert.That(prepareHeadersMetadata, Has.Count.EqualTo(1)); + Assert.That(prepareHeadersMetadata[0].Key, Is.EqualTo("x-tracing-id")); + Assert.That(prepareHeadersMetadata[0].Value, Is.EqualTo("258")); + }); + } } \ No newline at end of file diff --git a/Client.Test/MockServerTest.cs b/Client.Test/MockServerTest.cs index 7391620..99fcec0 100644 --- a/Client.Test/MockServerTest.cs +++ b/Client.Test/MockServerTest.cs @@ -36,7 +36,7 @@ public void OneTimeSetUp() MockProxyUrl = MockProxy.Urls[0]; } - [OneTimeTearDownAttribute] + [OneTimeTearDown] public void OneTimeTearDownAttribute() { MockServer.Dispose(); diff --git a/Client/Config/ClientConfig.cs b/Client/Config/ClientConfig.cs index 4be4f45..fb55595 100644 --- a/Client/Config/ClientConfig.cs +++ b/Client/Config/ClientConfig.cs @@ -111,9 +111,24 @@ public string Host public string? Database { get; set; } /// - /// The set of HTTP headers to be included in requests. + /// The custom headers that will be added to requests. This is useful for adding custom headers to requests, + /// such as tracing headers. To add custom headers use following code: + /// + /// + /// using var client = new InfluxDBClient(new ClientConfig + /// { + /// Host = "https://us-east-1-1.aws.cloud2.influxdata.com", + /// Token = "my-token", + /// Organization = "my-org", + /// Database = "my-database", + /// Headers = new Dictionary<string, string> + /// { + /// { "X-Tracing-Id", "123" }, + /// } + /// }); + /// /// - public Dictionary? Headers { get; set; } + public Dictionary? Headers { get; set; } /// /// Timeout to wait before the HTTP request times out. Default to '10 seconds'. diff --git a/Client/InfluxDBClient.cs b/Client/InfluxDBClient.cs index b0dd7cd..77c4056 100644 --- a/Client/InfluxDBClient.cs +++ b/Client/InfluxDBClient.cs @@ -19,7 +19,8 @@ public interface IInfluxDBClient : IDisposable { /// /// Query data from InfluxDB IOx using FlightSQL. - /// + /// + /// /// /// The following example shows how to use SQL query with named parameters: /// @@ -32,7 +33,19 @@ public interface IInfluxDBClient : IDisposable /// ); /// /// - /// + /// + /// + /// The following example shows how to use custom request headers: + /// + /// + /// using var client = new InfluxDBClient("http://localhost:8086", token: "my-token", organization: "my-org", database: "my-database"); + /// + /// var results = client.Query( + /// query: "SELECT a, b, c FROM my_table", + /// headers: new Dictionary<string, string> { { "X-Tracing-Id", "123" } } + /// ); + /// + /// /// The SQL query string to execute. /// The type of query sent to InfluxDB. Default to 'SQL'. /// The database to be used for InfluxDB operations. @@ -41,18 +54,22 @@ public interface IInfluxDBClient : IDisposable /// '$placeholder' syntax in the query will be substituted with the values provided. /// The supported types are: string, bool, int, float. /// + /// + /// The headers to be added to query request. The headers specified here are preferred over + /// the headers specified in the client configuration. + /// /// Batches of rows /// The client is already disposed IAsyncEnumerable Query(string query, QueryType? queryType = null, string? database = null, - Dictionary? namedParameters = null); + Dictionary? namedParameters = null, Dictionary? headers = null); /// /// Query data from InfluxDB IOx using FlightSQL. /// - /// + /// /// /// The following example shows how to use SQL query with named parameters: - /// + /// /// /// using var client = new InfluxDBClient("http://localhost:8086", token: "my-token", organization: "my-org", database: "my-database"); /// @@ -62,6 +79,19 @@ public interface IInfluxDBClient : IDisposable /// ); /// /// + /// + /// + /// The following example shows how to use custom request headers: + /// + /// + /// using var client = new InfluxDBClient("http://localhost:8086", token: "my-token", organization: "my-org", database: "my-database"); + /// + /// var results = client.QueryBatches( + /// query: "SELECT a, b, c FROM my_table", + /// headers: new Dictionary<string, string> { { "X-Tracing-Id", "123" } } + /// ); + /// + /// /// The SQL query string to execute. /// The type of query sent to InfluxDB. Default to 'SQL'. /// The database to be used for InfluxDB operations. @@ -70,10 +100,14 @@ public interface IInfluxDBClient : IDisposable /// '$placeholder' syntax in the query will be substituted with the values provided. /// The supported types are: string, bool, int, float. /// + /// + /// The headers to be added to query request. The headers specified here are preferred over + /// the headers specified in the client configuration. + /// /// Batches of rows /// The client is already disposed IAsyncEnumerable QueryBatches(string query, QueryType? queryType = null, string? database = null, - Dictionary? namedParameters = null); + Dictionary? namedParameters = null, Dictionary? headers = null); /// /// Query data from InfluxDB IOx into PointData structure using FlightSQL. @@ -91,6 +125,18 @@ IAsyncEnumerable QueryBatches(string query, QueryType? queryType = /// ); /// /// + /// + /// The following example shows how to use custom request headers: + /// + /// + /// using var client = new InfluxDBClient("http://localhost:8086", token: "my-token", organization: "my-org", database: "my-database"); + /// + /// var results = client.QueryPoints( + /// query: "SELECT a, b, c FROM my_table", + /// headers: new Dictionary<string, string> { { "X-Tracing-Id", "123" } } + /// ); + /// + /// /// The SQL query string to execute. /// The type of query sent to InfluxDB. Default to 'SQL'. /// The database to be used for InfluxDB operations. @@ -99,10 +145,15 @@ IAsyncEnumerable QueryBatches(string query, QueryType? queryType = /// '$placeholder' syntax in the query will be substituted with the values provided. /// The supported types are: string, bool, int, float. /// + /// + /// The headers to be added to query request. The headers specified here are preferred over + /// the headers specified in the client configuration. + /// /// Batches of rows /// The client is already disposed IAsyncEnumerable QueryPoints(string query, QueryType? queryType = null, - string? database = null, Dictionary? namedParameters = null); + string? database = null, Dictionary? namedParameters = null, + Dictionary? headers = null); /// /// Write data to InfluxDB. @@ -294,6 +345,19 @@ public InfluxDBClient() : this( /// ); /// /// + /// + /// + /// The following example shows how to use custom request headers: + /// + /// + /// using var client = new InfluxDBClient("http://localhost:8086", token: "my-token", organization: "my-org", database: "my-database"); + /// + /// var results = client.Query( + /// query: "SELECT a, b, c FROM my_table", + /// headers: new Dictionary<string, string> { { "X-Tracing-Id", "123" } } + /// ); + /// + /// /// The SQL query string to execute. /// The type of query sent to InfluxDB. Default to 'SQL'. /// The database to be used for InfluxDB operations. @@ -302,12 +366,17 @@ public InfluxDBClient() : this( /// '$placeholder' syntax in the query will be substituted with the values provided. /// The supported types are: string, bool, int, float. /// + /// + /// The headers to be added to query request. The headers specified here are preferred over + /// the headers specified in the client configuration. + /// /// Batches of rows /// The client is already disposed public async IAsyncEnumerable Query(string query, QueryType? queryType = null, - string? database = null, Dictionary? namedParameters = null) + string? database = null, Dictionary? namedParameters = null, + Dictionary? headers = null) { - await foreach (var batch in QueryBatches(query, queryType, database, namedParameters).ConfigureAwait(false)) + await foreach (var batch in QueryBatches(query, queryType, database, namedParameters, headers).ConfigureAwait(false)) { var rowCount = batch.Column(0).Length; for (var i = 0; i < rowCount; i++) @@ -342,6 +411,18 @@ public InfluxDBClient() : this( /// ); /// /// + /// + /// The following example shows how to use custom request headers: + /// + /// + /// using var client = new InfluxDBClient("http://localhost:8086", token: "my-token", organization: "my-org", database: "my-database"); + /// + /// var results = client.QueryPoints( + /// query: "SELECT a, b, c FROM my_table", + /// headers: new Dictionary<string, string> { { "X-Tracing-Id", "123" } } + /// ); + /// + /// /// The SQL query string to execute. /// The type of query sent to InfluxDB. Default to 'SQL'. /// The database to be used for InfluxDB operations. @@ -350,12 +431,16 @@ public InfluxDBClient() : this( /// '$placeholder' syntax in the query will be substituted with the values provided. /// The supported types are: string, bool, int, float. /// + /// + /// The headers to be added to query request. The headers specified here are preferred over + /// the headers specified in the client configuration. + /// /// Batches of rows /// The client is already disposed public async IAsyncEnumerable QueryPoints(string query, QueryType? queryType = null, - string? database = null, Dictionary? namedParameters = null) + string? database = null, Dictionary? namedParameters = null, Dictionary? headers = null) { - await foreach (var batch in QueryBatches(query, queryType, database, namedParameters).ConfigureAwait(false)) + await foreach (var batch in QueryBatches(query, queryType, database, namedParameters, headers).ConfigureAwait(false)) { var rowCount = batch.Column(0).Length; for (var i = 0; i < rowCount; i++) @@ -419,10 +504,10 @@ public async IAsyncEnumerable QueryPoints(string query, QueryTy /// /// Query data from InfluxDB IOx using FlightSQL. /// - /// + /// /// /// The following example shows how to use SQL query with named parameters: - /// + /// /// /// using var client = new InfluxDBClient("http://localhost:8086", token: "my-token", organization: "my-org", database: "my-database"); /// @@ -432,6 +517,19 @@ public async IAsyncEnumerable QueryPoints(string query, QueryTy /// ); /// /// + /// + /// + /// The following example shows how to use custom request headers: + /// + /// + /// using var client = new InfluxDBClient("http://localhost:8086", token: "my-token", organization: "my-org", database: "my-database"); + /// + /// var results = client.QueryBatches( + /// query: "SELECT a, b, c FROM my_table", + /// headers: new Dictionary<string, string> { { "X-Tracing-Id", "123" } } + /// ); + /// + /// /// The SQL query string to execute. /// The type of query sent to InfluxDB. Default to 'SQL'. /// The database to be used for InfluxDB operations. @@ -440,10 +538,15 @@ public async IAsyncEnumerable QueryPoints(string query, QueryTy /// '$placeholder' syntax in the query will be substituted with the values provided. /// The supported types are: string, bool, int, float. /// + /// + /// The headers to be added to query request. The headers specified here are preferred over + /// the headers specified in the client configuration. + /// /// Batches of rows /// The client is already disposed public IAsyncEnumerable QueryBatches(string query, QueryType? queryType = null, - string? database = null, Dictionary? namedParameters = null) + string? database = null, Dictionary? namedParameters = null, + Dictionary? headers = null) { if (_disposed) { @@ -453,7 +556,8 @@ public IAsyncEnumerable QueryBatches(string query, QueryType? query return FlightSqlClient.Execute(query, (database ?? _config.Database) ?? throw new InvalidOperationException(OptionMessage("database")), queryType ?? QueryType.SQL, - namedParameters ?? new Dictionary()); + namedParameters ?? new Dictionary(), + headers ?? new Dictionary()); } /// diff --git a/Client/Internal/FlightSqlClient.cs b/Client/Internal/FlightSqlClient.cs index dfb84b2..b7c6f7a 100644 --- a/Client/Internal/FlightSqlClient.cs +++ b/Client/Internal/FlightSqlClient.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.IO; +using System.Linq; using System.Net.Http; using System.Runtime.Serialization; using System.Runtime.Serialization.Json; @@ -24,13 +25,20 @@ internal interface IFlightSqlClient : IDisposable /// Execute the query and return the result as a sequence of record batches. /// internal IAsyncEnumerable Execute(string query, string database, QueryType queryType, - Dictionary namedParameters); + Dictionary namedParameters, Dictionary headers); /// /// Prepare the FlightTicket for the query. /// internal FlightTicket PrepareFlightTicket(string query, string database, QueryType queryType, Dictionary namedParameters); + + /// + /// Prepare the headers metadata. + /// + /// The invocation headers + /// + internal Metadata PrepareHeadersMetadata(Dictionary headers); } internal class FlightSqlClient : IFlightSqlClient @@ -66,7 +74,7 @@ internal FlightSqlClient(ClientConfig config, HttpClient httpClient) } async IAsyncEnumerable IFlightSqlClient.Execute(string query, string database, QueryType queryType, - Dictionary namedParameters) + Dictionary namedParameters, Dictionary headers) { // // verify that values of namedParameters is supported type @@ -82,16 +90,11 @@ async IAsyncEnumerable IFlightSqlClient.Execute(string query, strin } } - var headers = new Metadata(); - // authorization by token - if (!string.IsNullOrEmpty(_config.Token)) - { - headers.Add("Authorization", $"Bearer {_config.Token}"); - } + var metadata = ((IFlightSqlClient)this).PrepareHeadersMetadata(headers); var ticket = ((IFlightSqlClient)this).PrepareFlightTicket(query, database, queryType, namedParameters); - using var stream = _flightClient.GetStream(ticket, headers); + using var stream = _flightClient.GetStream(ticket, metadata); while (await stream.ResponseStream.MoveNext().ConfigureAwait(false)) { yield return stream.ResponseStream.Current; @@ -125,6 +128,31 @@ FlightTicket IFlightSqlClient.PrepareFlightTicket(string query, string database, return flightTicket; } + Metadata IFlightSqlClient.PrepareHeadersMetadata(Dictionary headers) + { + var metadata = new Metadata(); + // authorization by token + if (!string.IsNullOrEmpty(_config.Token)) + { + metadata.Add("Authorization", $"Bearer {_config.Token}"); + } + + // add request headers + foreach (var header in headers) + { + metadata.Add(header.Key, header.Value); + } + // add global headers + if (_config.Headers != null) + { + foreach (var header in _config.Headers.Where(header => !headers.ContainsKey(header.Key))) + { + metadata.Add(header.Key, header.Value); + } + } + return metadata; + } + private string SerializeDictionary(Dictionary ticketData) { using var memoryStream = new MemoryStream(); From c9a503b05afe07bb919b29915b288ca3b3c4e671 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 28 Mar 2024 06:03:08 +0100 Subject: [PATCH 2/5] feat: custom headers can be specified for writes --- Client.Test/InfluxDBClientWriteTest.cs | 71 ++++++++++++++++++++- Client/InfluxDBClient.cs | 86 +++++++++++++++++--------- Client/Internal/FlightSqlClient.cs | 2 +- Client/Internal/RestClient.cs | 22 ++++++- 4 files changed, 148 insertions(+), 33 deletions(-) diff --git a/Client.Test/InfluxDBClientWriteTest.cs b/Client.Test/InfluxDBClientWriteTest.cs index a941264..f7290c3 100644 --- a/Client.Test/InfluxDBClientWriteTest.cs +++ b/Client.Test/InfluxDBClientWriteTest.cs @@ -286,7 +286,7 @@ public async Task Proxy() Token = "my-token", Organization = "my-org", Database = "my-database", - Proxy = new System.Net.WebProxy + Proxy = new WebProxy { Address = new Uri(MockProxyUrl), BypassProxyOnLocal = false @@ -333,7 +333,74 @@ public async Task CustomHeader() await _client.WritePointAsync(point); var requests = MockServer.LogEntries.ToList(); - Assert.That(requests[0].RequestMessage.BodyData?.BodyAsString, Is.EqualTo("h2o,location=europe level=2i 123000000000")); + Assert.That(requests, Has.Count.EqualTo(1)); + Assert.Multiple(() => + { + Assert.That(requests[0].RequestMessage.BodyData?.BodyAsString, Is.EqualTo("h2o,location=europe level=2i 123000000000")); + Assert.That(requests[0].RequestMessage.Headers?["X-device"].First(), Is.EqualTo("ab-01")); + }); + } + + [Test] + public async Task CustomHeaderFromRequest() + { + _client = new InfluxDBClient(new ClientConfig + { + Host = MockServerUrl, + Token = "my-token", + Organization = "my-org", + Database = "my-database" + }); + MockServer + .Given(Request.Create().WithPath("/api/v2/write").WithHeader("X-Tracing-ID", "123").UsingPost()) + .RespondWith(Response.Create().WithStatusCode(204)); + + var point = PointData.Measurement("h2o") + .SetTag("location", "europe") + .SetField("level", 2) + .SetTimestamp(123_000_000_000L); + + await _client.WritePointAsync(point, headers: new Dictionary + { + { "X-Tracing-ID", "123" }, + }); + + var requests = MockServer.LogEntries.ToList(); + Assert.That(requests, Has.Count.EqualTo(1)); + Assert.That(requests[0].RequestMessage.Headers?["X-Tracing-ID"].First(), Is.EqualTo("123")); + } + + [Test] + public async Task CustomHeaderFromRequestArePreferred() + { + _client = new InfluxDBClient(new ClientConfig + { + Host = MockServerUrl, + Token = "my-token", + Organization = "my-org", + Database = "my-database", + Headers = new Dictionary + { + { "X-Client-ID", "123" }, + } + }); + MockServer + .Given(Request.Create().WithPath("/api/v2/write").WithHeader("X-Client-ID", "456").UsingPost()) + .RespondWith(Response.Create().WithStatusCode(204)); + + var point = PointData.Measurement("h2o") + .SetTag("location", "europe") + .SetField("level", 2) + .SetTimestamp(123_000_000_000L); + + await _client.WritePointAsync(point, headers: new Dictionary + { + { "X-Client-ID", "456" }, + }); + + var requests = MockServer.LogEntries.ToList(); + Assert.That(requests, Has.Count.EqualTo(1)); + Assert.That(requests[0].RequestMessage.Headers?["X-Client-ID"].First(), Is.EqualTo("456")); } private async Task WriteData() diff --git a/Client/InfluxDBClient.cs b/Client/InfluxDBClient.cs index 77c4056..3095d1c 100644 --- a/Client/InfluxDBClient.cs +++ b/Client/InfluxDBClient.cs @@ -161,9 +161,13 @@ IAsyncEnumerable QueryPoints(string query, QueryType? queryType /// Specifies the record in InfluxDB Line Protocol. The is considered as one batch unit. /// The database to be used for InfluxDB operations. /// The to use for the timestamp in the write API call. + /// + /// The headers to be added to write request. The headers specified here are preferred over + /// the headers specified in the client configuration. + /// /// specifies the token to monitor for cancellation requests. - Task WriteRecordAsync(string record, string? database = null, - WritePrecision? precision = null, CancellationToken cancellationToken = default); + Task WriteRecordAsync(string record, string? database = null, WritePrecision? precision = null, + Dictionary? headers = null, CancellationToken cancellationToken = default); /// /// Write data to InfluxDB. @@ -171,9 +175,13 @@ Task WriteRecordAsync(string record, string? database = null, /// Specifies the records in InfluxDB Line Protocol. The is considered as one batch unit. /// The database to be used for InfluxDB operations. /// The to use for the timestamp in the write API call. + /// + /// The headers to be added to write request. The headers specified here are preferred over + /// the headers specified in the client configuration. + /// /// specifies the token to monitor for cancellation requests. - Task WriteRecordsAsync(IEnumerable records, string? database = null, - WritePrecision? precision = null, CancellationToken cancellationToken = default); + Task WriteRecordsAsync(IEnumerable records, string? database = null, WritePrecision? precision = null, + Dictionary? headers = null, CancellationToken cancellationToken = default); /// /// Write data to InfluxDB. @@ -181,9 +189,13 @@ Task WriteRecordsAsync(IEnumerable records, string? database = null, /// Specifies the Data point to write into InfluxDB. The is considered as one batch unit. /// The database to be used for InfluxDB operations. /// The to use for the timestamp in the write API call. + /// + /// The headers to be added to write request. The headers specified here are preferred over + /// the headers specified in the client configuration. + /// /// specifies the token to monitor for cancellation requests. - Task WritePointAsync(PointData point, string? database = null, - WritePrecision? precision = null, CancellationToken cancellationToken = default); + Task WritePointAsync(PointData point, string? database = null, WritePrecision? precision = null, + Dictionary? headers = null, CancellationToken cancellationToken = default); /// /// Write data to InfluxDB. @@ -191,9 +203,13 @@ Task WritePointAsync(PointData point, string? database = null, /// Specifies the Data points to write into InfluxDB. The is considered as one batch unit. /// The database to be used for InfluxDB operations. /// The to use for the timestamp in the write API call. + /// + /// The headers to be added to write request. The headers specified here are preferred over + /// the headers specified in the client configuration. + /// /// specifies the token to monitor for cancellation requests. - Task WritePointsAsync(IEnumerable points, string? database = null, - WritePrecision? precision = null, CancellationToken cancellationToken = default); + Task WritePointsAsync(IEnumerable points, string? database = null, WritePrecision? precision = null, + Dictionary? headers = null, CancellationToken cancellationToken = default); } public class InfluxDBClient : IInfluxDBClient @@ -566,11 +582,15 @@ public IAsyncEnumerable QueryBatches(string query, QueryType? query /// Specifies the record in InfluxDB Line Protocol. The is considered as one batch unit. /// The database to be used for InfluxDB operations. /// The to use for the timestamp in the write API call. + /// + /// The headers to be added to write request. The headers specified here are preferred over + /// the headers specified in the client configuration. + /// /// specifies the token to monitor for cancellation requests. - public Task WriteRecordAsync(string record, string? database = null, - WritePrecision? precision = null, CancellationToken cancellationToken = default) + public Task WriteRecordAsync(string record, string? database = null, WritePrecision? precision = null, + Dictionary? headers = null, CancellationToken cancellationToken = default) { - return WriteRecordsAsync(new[] { record }, database, precision, cancellationToken); + return WriteRecordsAsync(new[] { record }, database, precision, headers, cancellationToken); } /// @@ -579,11 +599,16 @@ public Task WriteRecordAsync(string record, string? database = null, /// Specifies the records in InfluxDB Line Protocol. The is considered as one batch unit. /// The database to be used for InfluxDB operations. /// The to use for the timestamp in the write API call. + /// + /// The headers to be added to write request. The headers specified here are preferred over + /// the headers specified in the client configuration. + /// /// specifies the token to monitor for cancellation requests. - public Task WriteRecordsAsync(IEnumerable records, string? database = null, - WritePrecision? precision = null, CancellationToken cancellationToken = default) + public Task WriteRecordsAsync(IEnumerable records, string? database = null, + WritePrecision? precision = null, Dictionary? headers = null, + CancellationToken cancellationToken = default) { - return WriteData(records, database, precision, cancellationToken); + return WriteData(records, database, precision, headers, cancellationToken); } /// @@ -592,11 +617,15 @@ public Task WriteRecordsAsync(IEnumerable records, string? database = nu /// Specifies the Data point to write into InfluxDB. The is considered as one batch unit. /// The database to be used for InfluxDB operations. /// The to use for the timestamp in the write API call. + /// + /// The headers to be added to write request. The headers specified here are preferred over + /// the headers specified in the client configuration. + /// /// specifies the token to monitor for cancellation requests. - public Task WritePointAsync(PointData point, string? database = null, - WritePrecision? precision = null, CancellationToken cancellationToken = default) + public Task WritePointAsync(PointData point, string? database = null, WritePrecision? precision = null, + Dictionary? headers = null, CancellationToken cancellationToken = default) { - return WritePointsAsync(new[] { point }, database, precision, cancellationToken); + return WritePointsAsync(new[] { point }, database, precision, headers, cancellationToken); } /// @@ -605,15 +634,21 @@ public Task WritePointAsync(PointData point, string? database = null, /// Specifies the Data points to write into InfluxDB. The is considered as one batch unit. /// The database to be used for InfluxDB operations. /// The to use for the timestamp in the write API call. + /// + /// The headers to be added to write request. The headers specified here are preferred over + /// the headers specified in the client configuration. + /// /// specifies the token to monitor for cancellation requests. public Task WritePointsAsync(IEnumerable points, string? database = null, - WritePrecision? precision = null, CancellationToken cancellationToken = default) + WritePrecision? precision = null, Dictionary? headers = null, + CancellationToken cancellationToken = default) { - return WriteData(points, database, precision, cancellationToken); + return WriteData(points, database, precision, headers, cancellationToken); } - private async Task WriteData(IEnumerable data, string? database = null, - WritePrecision? precision = null, CancellationToken cancellationToken = default) + private async Task WriteData(IEnumerable data, string? database = null, + WritePrecision? precision = null, Dictionary? headers = null, + CancellationToken cancellationToken = default) { if (_disposed) { @@ -645,7 +680,7 @@ private async Task WriteData(IEnumerable data, string? database = null, }; await _restClient - .Request("api/v2/write", HttpMethod.Post, content, queryParams, cancellationToken) + .Request("api/v2/write", HttpMethod.Post, content, queryParams, headers, cancellationToken) .ConfigureAwait(false); } @@ -715,13 +750,6 @@ internal static HttpClient CreateAndConfigureHttpClient(ClientConfig config) { Timeout = config.Timeout }; - if (config.Headers != null) - { - foreach (var header in config.Headers) - { - client.DefaultRequestHeaders.Add(header.Key, header.Value); - } - } client.DefaultRequestHeaders.UserAgent.ParseAdd($"influxdb3-csharp/{AssemblyHelper.GetVersion()}"); if (!string.IsNullOrEmpty(config.Token)) diff --git a/Client/Internal/FlightSqlClient.cs b/Client/Internal/FlightSqlClient.cs index b7c6f7a..e31428d 100644 --- a/Client/Internal/FlightSqlClient.cs +++ b/Client/Internal/FlightSqlClient.cs @@ -142,7 +142,7 @@ Metadata IFlightSqlClient.PrepareHeadersMetadata(Dictionary head { metadata.Add(header.Key, header.Value); } - // add global headers + // add config headers if (_config.Headers != null) { foreach (var header in _config.Headers.Where(header => !headers.ContainsKey(header.Key))) diff --git a/Client/Internal/RestClient.cs b/Client/Internal/RestClient.cs index 4451c80..fc8abc8 100644 --- a/Client/Internal/RestClient.cs +++ b/Client/Internal/RestClient.cs @@ -29,7 +29,7 @@ internal RestClient(ClientConfig config, HttpClient httpClient) } internal async Task Request(string path, HttpMethod method, HttpContent? content = null, - Dictionary? queryParams = null, + Dictionary? queryParams = null, Dictionary? headers = null, CancellationToken cancellationToken = default) { var builder = new UriBuilder(new Uri($"{_config.Host}{path}")); @@ -58,6 +58,26 @@ internal async Task Request(string path, HttpMethod method, HttpContent? content RequestUri = builder.Uri, Content = content, }; + // add request headers + if (headers is not null) + { + foreach (var header in headers) + { + request.Headers.Add(header.Key, header.Value); + } + } + + // add config headers + if (_config.Headers != null) + { + foreach (var header in _config.Headers) + { + if (headers == null || !headers.ContainsKey(header.Key)) + { + request.Headers.Add(header.Key, header.Value); + } + } + } var result = await _httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false); if (!result.IsSuccessStatusCode) From e14919c0121c6d65f643b1ecab6575eb893545d2 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 28 Mar 2024 06:22:36 +0100 Subject: [PATCH 3/5] docs: add examples how to use headers in write --- Client/InfluxDBClient.cs | 134 +++++++++++++++++++++++++++++++++++---- 1 file changed, 123 insertions(+), 11 deletions(-) diff --git a/Client/InfluxDBClient.cs b/Client/InfluxDBClient.cs index 3095d1c..af5651d 100644 --- a/Client/InfluxDBClient.cs +++ b/Client/InfluxDBClient.cs @@ -158,6 +158,19 @@ IAsyncEnumerable QueryPoints(string query, QueryType? queryType /// /// Write data to InfluxDB. /// + /// + /// + /// The following example shows how to write a single record with custom headers: + /// + /// + /// using var client = new InfluxDBClient("http://localhost:8086", token: "my-token", organization: "my-org", database: "my-database"); + /// + /// await client.WriteRecordAsync( + /// record: "stat,unit=temperature value=24.5", + /// headers: new Dictionary<string, string> { { "X-Tracing-Id", "123" } } + /// ); + /// + /// /// Specifies the record in InfluxDB Line Protocol. The is considered as one batch unit. /// The database to be used for InfluxDB operations. /// The to use for the timestamp in the write API call. @@ -166,12 +179,25 @@ IAsyncEnumerable QueryPoints(string query, QueryType? queryType /// the headers specified in the client configuration. /// /// specifies the token to monitor for cancellation requests. - Task WriteRecordAsync(string record, string? database = null, WritePrecision? precision = null, + Task WriteRecordAsync(string record, string? database = null, WritePrecision? precision = null, Dictionary? headers = null, CancellationToken cancellationToken = default); /// /// Write data to InfluxDB. /// + /// + /// + /// The following example shows how to write multiple records with custom headers: + /// + /// + /// using var client = new InfluxDBClient("http://localhost:8086", token: "my-token", organization: "my-org", database: "my-database"); + /// + /// await client.WriteRecordsAsync( + /// records: new[] { "stat,unit=temperature value=24.5", "stat,unit=temperature value=25.5" }, + /// headers: new Dictionary<string, string> { { "X-Tracing-Id", "123" } } + /// ); + /// + /// /// Specifies the records in InfluxDB Line Protocol. The is considered as one batch unit. /// The database to be used for InfluxDB operations. /// The to use for the timestamp in the write API call. @@ -180,12 +206,24 @@ Task WriteRecordAsync(string record, string? database = null, WritePrecision? pr /// the headers specified in the client configuration. /// /// specifies the token to monitor for cancellation requests. - Task WriteRecordsAsync(IEnumerable records, string? database = null, WritePrecision? precision = null, + Task WriteRecordsAsync(IEnumerable records, string? database = null, WritePrecision? precision = null, Dictionary? headers = null, CancellationToken cancellationToken = default); /// /// Write data to InfluxDB. /// + /// + /// + /// The following example shows how to write a single point with custom headers: + /// + /// using var client = new InfluxDBClient("http://localhost:8086", token: "my-token", organization: "my-org", database: "my-database"); + /// + /// await client.WritePointAsync( + /// point: PointData.Measurement("h2o").SetTag("location", "europe").SetField("level", 2), + /// headers: new Dictionary<string, string> { { "X-Tracing-Id", "123" } } + /// ); + /// + /// /// Specifies the Data point to write into InfluxDB. The is considered as one batch unit. /// The database to be used for InfluxDB operations. /// The to use for the timestamp in the write API call. @@ -200,6 +238,22 @@ Task WritePointAsync(PointData point, string? database = null, WritePrecision? p /// /// Write data to InfluxDB. /// + /// + /// + /// The following example shows how to write multiple points with custom headers: + /// + /// + /// using var client = new InfluxDBClient("http://localhost:8086", token: "my-token", organization: "my-org", database: "my-database"); + /// + /// await client.WritePointsAsync( + /// points: new[]{ + /// PointData.Measurement("h2o").SetTag("location", "europe").SetField("level", 2), + /// PointData.Measurement("h2o").SetTag("location", "us-west").SetField("level", 4), + /// }, + /// headers: new Dictionary<string, string> { { "X-Tracing-Id", "123" } } + /// ); + /// + /// /// Specifies the Data points to write into InfluxDB. The is considered as one batch unit. /// The database to be used for InfluxDB operations. /// The to use for the timestamp in the write API call. @@ -208,7 +262,7 @@ Task WritePointAsync(PointData point, string? database = null, WritePrecision? p /// the headers specified in the client configuration. /// /// specifies the token to monitor for cancellation requests. - Task WritePointsAsync(IEnumerable points, string? database = null, WritePrecision? precision = null, + Task WritePointsAsync(IEnumerable points, string? database = null, WritePrecision? precision = null, Dictionary? headers = null, CancellationToken cancellationToken = default); } @@ -392,7 +446,8 @@ public InfluxDBClient() : this( string? database = null, Dictionary? namedParameters = null, Dictionary? headers = null) { - await foreach (var batch in QueryBatches(query, queryType, database, namedParameters, headers).ConfigureAwait(false)) + await foreach (var batch in QueryBatches(query, queryType, database, namedParameters, headers) + .ConfigureAwait(false)) { var rowCount = batch.Column(0).Length; for (var i = 0; i < rowCount; i++) @@ -454,9 +509,11 @@ public InfluxDBClient() : this( /// Batches of rows /// The client is already disposed public async IAsyncEnumerable QueryPoints(string query, QueryType? queryType = null, - string? database = null, Dictionary? namedParameters = null, Dictionary? headers = null) + string? database = null, Dictionary? namedParameters = null, + Dictionary? headers = null) { - await foreach (var batch in QueryBatches(query, queryType, database, namedParameters, headers).ConfigureAwait(false)) + await foreach (var batch in QueryBatches(query, queryType, database, namedParameters, headers) + .ConfigureAwait(false)) { var rowCount = batch.Column(0).Length; for (var i = 0; i < rowCount; i++) @@ -579,6 +636,20 @@ public IAsyncEnumerable QueryBatches(string query, QueryType? query /// /// Write data to InfluxDB. /// + /// + /// + /// The following example shows how to write a single record with custom headers: + /// + /// + /// using var client = new InfluxDBClient("http://localhost:8086", token: "my-token", organization: "my-org", database: "my-database"); + /// + /// await client.WriteRecordAsync( + /// record: "stat,unit=temperature value=24.5", + /// headers: new Dictionary<string, string> { { "X-Tracing-Id", "123" } } + /// ); + /// + /// + /// /// Specifies the record in InfluxDB Line Protocol. The is considered as one batch unit. /// The database to be used for InfluxDB operations. /// The to use for the timestamp in the write API call. @@ -587,7 +658,7 @@ public IAsyncEnumerable QueryBatches(string query, QueryType? query /// the headers specified in the client configuration. /// /// specifies the token to monitor for cancellation requests. - public Task WriteRecordAsync(string record, string? database = null, WritePrecision? precision = null, + public Task WriteRecordAsync(string record, string? database = null, WritePrecision? precision = null, Dictionary? headers = null, CancellationToken cancellationToken = default) { return WriteRecordsAsync(new[] { record }, database, precision, headers, cancellationToken); @@ -596,6 +667,19 @@ public Task WriteRecordAsync(string record, string? database = null, WritePrecis /// /// Write data to InfluxDB. /// + /// + /// + /// The following example shows how to write multiple records with custom headers: + /// + /// + /// using var client = new InfluxDBClient("http://localhost:8086", token: "my-token", organization: "my-org", database: "my-database"); + /// + /// await client.WriteRecordsAsync( + /// records: new[] { "stat,unit=temperature value=24.5", "stat,unit=temperature value=25.5" }, + /// headers: new Dictionary<string, string> { { "X-Tracing-Id", "123" } } + /// ); + /// + /// /// Specifies the records in InfluxDB Line Protocol. The is considered as one batch unit. /// The database to be used for InfluxDB operations. /// The to use for the timestamp in the write API call. @@ -604,7 +688,7 @@ public Task WriteRecordAsync(string record, string? database = null, WritePrecis /// the headers specified in the client configuration. /// /// specifies the token to monitor for cancellation requests. - public Task WriteRecordsAsync(IEnumerable records, string? database = null, + public Task WriteRecordsAsync(IEnumerable records, string? database = null, WritePrecision? precision = null, Dictionary? headers = null, CancellationToken cancellationToken = default) { @@ -614,6 +698,18 @@ public Task WriteRecordsAsync(IEnumerable records, string? database = nu /// /// Write data to InfluxDB. /// + /// + /// + /// The following example shows how to write a single point with custom headers: + /// + /// using var client = new InfluxDBClient("http://localhost:8086", token: "my-token", organization: "my-org", database: "my-database"); + /// + /// await client.WritePointAsync( + /// point: PointData.Measurement("h2o").SetTag("location", "europe").SetField("level", 2), + /// headers: new Dictionary<string, string> { { "X-Tracing-Id", "123" } } + /// ); + /// + /// /// Specifies the Data point to write into InfluxDB. The is considered as one batch unit. /// The database to be used for InfluxDB operations. /// The to use for the timestamp in the write API call. @@ -622,7 +718,7 @@ public Task WriteRecordsAsync(IEnumerable records, string? database = nu /// the headers specified in the client configuration. /// /// specifies the token to monitor for cancellation requests. - public Task WritePointAsync(PointData point, string? database = null, WritePrecision? precision = null, + public Task WritePointAsync(PointData point, string? database = null, WritePrecision? precision = null, Dictionary? headers = null, CancellationToken cancellationToken = default) { return WritePointsAsync(new[] { point }, database, precision, headers, cancellationToken); @@ -631,6 +727,22 @@ public Task WritePointAsync(PointData point, string? database = null, WritePreci /// /// Write data to InfluxDB. /// + /// + /// + /// The following example shows how to write multiple points with custom headers: + /// + /// + /// using var client = new InfluxDBClient("http://localhost:8086", token: "my-token", organization: "my-org", database: "my-database"); + /// + /// await client.WritePointsAsync( + /// points: new[]{ + /// PointData.Measurement("h2o").SetTag("location", "europe").SetField("level", 2), + /// PointData.Measurement("h2o").SetTag("location", "us-west").SetField("level", 4), + /// }, + /// headers: new Dictionary<string, string> { { "X-Tracing-Id", "123" } } + /// ); + /// + /// /// Specifies the Data points to write into InfluxDB. The is considered as one batch unit. /// The database to be used for InfluxDB operations. /// The to use for the timestamp in the write API call. @@ -646,8 +758,8 @@ public Task WritePointsAsync(IEnumerable points, string? database = n return WriteData(points, database, precision, headers, cancellationToken); } - private async Task WriteData(IEnumerable data, string? database = null, - WritePrecision? precision = null, Dictionary? headers = null, + private async Task WriteData(IEnumerable data, string? database = null, + WritePrecision? precision = null, Dictionary? headers = null, CancellationToken cancellationToken = default) { if (_disposed) From 6991154ec24dcd1d013b96723e6dc579982d3649 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 28 Mar 2024 06:50:58 +0100 Subject: [PATCH 4/5] fix: formatting --- Client.Test/InfluxDBClientWriteTest.cs | 4 ++-- Client/Internal/RestClient.cs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Client.Test/InfluxDBClientWriteTest.cs b/Client.Test/InfluxDBClientWriteTest.cs index f7290c3..d9caa4f 100644 --- a/Client.Test/InfluxDBClientWriteTest.cs +++ b/Client.Test/InfluxDBClientWriteTest.cs @@ -340,7 +340,7 @@ public async Task CustomHeader() Assert.That(requests[0].RequestMessage.Headers?["X-device"].First(), Is.EqualTo("ab-01")); }); } - + [Test] public async Task CustomHeaderFromRequest() { @@ -369,7 +369,7 @@ public async Task CustomHeaderFromRequest() Assert.That(requests, Has.Count.EqualTo(1)); Assert.That(requests[0].RequestMessage.Headers?["X-Tracing-ID"].First(), Is.EqualTo("123")); } - + [Test] public async Task CustomHeaderFromRequestArePreferred() { diff --git a/Client/Internal/RestClient.cs b/Client/Internal/RestClient.cs index fc8abc8..328c2c9 100644 --- a/Client/Internal/RestClient.cs +++ b/Client/Internal/RestClient.cs @@ -66,7 +66,7 @@ internal async Task Request(string path, HttpMethod method, HttpContent? content request.Headers.Add(header.Key, header.Value); } } - + // add config headers if (_config.Headers != null) { From 27f4e5bad8459ed21b0174c35e52aa4160b824b1 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 28 Mar 2024 07:22:54 +0100 Subject: [PATCH 5/5] docs: update CHANGELOG.md --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5460253..c25fac4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 0.6.0 [unreleased] +### Features + +1. [#90](https://github.com/InfluxCommunity/influxdb3-csharp/pull/90): Custom `HTTP/gRPC` headers can be specified globally by config or per request + ## 0.5.0 [2024-03-01] ### Features