diff --git a/src/Elastic.Apm/Ingest/ApmChannel.cs b/src/Elastic.Apm/Ingest/ApmChannel.cs index c9ba16295..f8d080c98 100644 --- a/src/Elastic.Apm/Ingest/ApmChannel.cs +++ b/src/Elastic.Apm/Ingest/ApmChannel.cs @@ -5,13 +5,16 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; using System.IO; +using System.Linq; using System.Text.Encodings.Web; using System.Text.Json; using System.Text.Json.Serialization; using System.Threading; using System.Threading.Tasks; using Elastic.Apm.Api; +using Elastic.Apm.Logging; using Elastic.Apm.Report; using Elastic.Channels; using Elastic.Ingest.Transport; @@ -19,6 +22,7 @@ namespace Elastic.Apm.Ingest; +#nullable enable internal static class ApmChannelStatics { public static readonly byte[] LineFeed = { (byte)'\n' }; @@ -40,18 +44,56 @@ internal static class ApmChannelStatics /// public class ApmChannel : TransportChannelBase - , IPayloadSender + , IPayloadSender { + private readonly List> _transactionFilters = new(); + private readonly List> _spanFilters = new(); + private readonly List> _errorFilters = new(); + /// - public ApmChannel(ApmChannelOptions options) : base(options) { } + public ApmChannel(ApmChannelOptions options, IApmLogger? logger = null) : base(options) => + PayloadSenderV2.SetUpFilters(_transactionFilters, _spanFilters, _errorFilters, null, logger ?? new TraceLogger(LogLevel.Trace)); + + public IError? Filter(IError error) => _errorFilters.Aggregate(error, (current, filter) => filter(current)!); + + public ISpan? Filter(ISpan span) => _spanFilters.Aggregate(span, (current, filter) => filter(current)!); - void IPayloadSender.QueueError(IError error) => TryWrite(error); + public ITransaction? Filter(ITransaction span) => _transactionFilters.Aggregate(span, (current, filter) => filter(current)!); + + public bool TryFilter(IError error, [NotNullWhen(true)] out IError? filtered) + { + filtered = _errorFilters.Select(f => f(error)).TakeWhile(e => e != null).LastOrDefault(); + return filtered != null; + } - void IPayloadSender.QueueMetrics(IMetricSet metrics) => TryWrite(metrics); + public bool TryFilter(ISpan span, [NotNullWhen(true)] out ISpan? filtered) + { + filtered = _spanFilters.Select(f => f(span)).TakeWhile(e => e != null).LastOrDefault(); + return filtered != null; + } - void IPayloadSender.QueueSpan(ISpan span) => TryWrite(span); + public bool TryFilter(ITransaction transaction, [NotNullWhen(true)] out ITransaction? filtered) + { + filtered = _transactionFilters.Select(f => f(transaction)).TakeWhile(e => e != null).LastOrDefault(); + return filtered != null; + } - void IPayloadSender.QueueTransaction(ITransaction transaction) => TryWrite(transaction); + public virtual void QueueMetrics(IMetricSet metrics) => TryWrite(metrics); + + public virtual void QueueError(IError error) + { + if (TryFilter(error, out var e)) TryWrite(e); + } + + public virtual void QueueSpan(ISpan span) + { + if (TryFilter(span, out var s)) TryWrite(s); + } + + public virtual void QueueTransaction(ITransaction transaction) + { + if (TryFilter(transaction, out var t)) TryWrite(t); + } //retry if APM server returns 429 /// @@ -74,7 +116,8 @@ public ApmChannel(ApmChannelOptions options) : base(options) { } protected override bool RejectEvent((IIntakeRoot, IntakeErrorItem) @event) => false; /// - protected override Task ExportAsync(HttpTransport transport, ArraySegment page, CancellationToken ctx = default) => + protected override Task + ExportAsync(HttpTransport transport, ArraySegment page, CancellationToken ctx = default) => transport.RequestAsync(HttpMethod.POST, "/intake/v2/events", PostData.StreamHandler(page, (_, _) => diff --git a/src/Elastic.Apm/Ingest/ApmChannelOptions.cs b/src/Elastic.Apm/Ingest/ApmChannelOptions.cs index 835b24ca2..f86e52490 100644 --- a/src/Elastic.Apm/Ingest/ApmChannelOptions.cs +++ b/src/Elastic.Apm/Ingest/ApmChannelOptions.cs @@ -2,6 +2,7 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information +using System; using Elastic.Apm.Api; using Elastic.Ingest.Transport; using Elastic.Transport; @@ -14,5 +15,11 @@ namespace Elastic.Apm.Ingest; public class ApmChannelOptions : TransportChannelOptionsBase { /// - public ApmChannelOptions(HttpTransport transport) : base(transport) { } + private ApmChannelOptions(HttpTransport transport) : base(transport) { } + + public ApmChannelOptions(Uri serverEndpoint, TransportClient transportClient = null) + : this(new DefaultHttpTransport(new TransportConfiguration(new SingleNodePool(serverEndpoint), connection: transportClient!))) + { + + } } diff --git a/src/Elastic.Apm/Libraries/Newtonsoft.Json/Utilities/NullableAttributes.cs b/src/Elastic.Apm/Libraries/Newtonsoft.Json/Utilities/NullableAttributes.cs index 0816c1e8c..560d01ade 100644 --- a/src/Elastic.Apm/Libraries/Newtonsoft.Json/Utilities/NullableAttributes.cs +++ b/src/Elastic.Apm/Libraries/Newtonsoft.Json/Utilities/NullableAttributes.cs @@ -26,6 +26,7 @@ #endregion #nullable enable +#if !NET6_0_OR_GREATER namespace System.Diagnostics.CodeAnalysis { /// Specifies that an output will not be null even if the corresponding type allows it. @@ -78,3 +79,4 @@ internal class DoesNotReturnIfAttribute : Attribute public bool ParameterValue { get; } } } +#endif diff --git a/test/Elastic.Apm.Tests.Utilities/MockPayloadSender.cs b/test/Elastic.Apm.Tests.Utilities/MockPayloadSender.cs index 715ecf737..e3cfe610f 100644 --- a/test/Elastic.Apm.Tests.Utilities/MockPayloadSender.cs +++ b/test/Elastic.Apm.Tests.Utilities/MockPayloadSender.cs @@ -10,33 +10,34 @@ using System.Linq; using System.Threading; using Elastic.Apm.Api; +using Elastic.Apm.Ingest; using Elastic.Apm.Libraries.Newtonsoft.Json.Linq; using Elastic.Apm.Logging; using Elastic.Apm.Metrics; using Elastic.Apm.Model; using Elastic.Apm.Report; +using Elastic.Transport; using FluentAssertions; +#nullable enable namespace Elastic.Apm.Tests.Utilities { - internal class MockPayloadSender : IPayloadSender + internal class MockPayloadSender : ApmChannel { private static readonly JObject JsonSpanTypesData = JObject.Parse(File.ReadAllText("./TestResources/json-specs/span_types.json")); - private readonly List _errors = new List(); - private readonly List> _errorFilters = new List>(); - private readonly object _spanLock = new object(); - private readonly object _transactionLock = new object(); - private readonly object _metricsLock = new object(); - private readonly object _errorLock = new object(); - private readonly List _metrics = new List(); - private readonly List> _spanFilters = new List>(); - private readonly List _spans = new List(); - private readonly List> _transactionFilters = new List>(); - private readonly List _transactions = new List(); - - public MockPayloadSender(IApmLogger logger = null) + private readonly object _spanLock = new(); + private readonly object _transactionLock = new(); + private readonly object _metricsLock = new(); + private readonly object _errorLock = new(); + private readonly List _metrics = new(); + private readonly List _errors = new(); + private readonly List _spans = new(); + private readonly List _transactions = new(); + + public MockPayloadSender(IApmLogger? logger = null) + : base(new ApmChannelOptions(new Uri("http://localhost:8080"), transportClient: new InMemoryConnection()), logger) { _waitHandles = new[] { new AutoResetEvent(false), new AutoResetEvent(false), new AutoResetEvent(false), new AutoResetEvent(false) }; @@ -45,7 +46,6 @@ public MockPayloadSender(IApmLogger logger = null) _errorWaitHandle = _waitHandles[2]; _metricSetWaitHandle = _waitHandles[3]; - PayloadSenderV2.SetUpFilters(_transactionFilters, _spanFilters, _errorFilters, MockApmServerInfo.Version710, logger ?? new NoopLogger()); } /// @@ -61,6 +61,54 @@ public MockPayloadSender(IApmLogger logger = null) private readonly AutoResetEvent[] _waitHandles; private static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(1); + public override bool TryWrite(IIntakeRoot item) + { + var written = base.TryWrite(item); + switch (item) + { + case IError error: + _errors.Add(error); + _errorWaitHandle.Set(); + break; + case ITransaction transaction: + _transactions.Add(transaction); + _transactionWaitHandle.Set(); + break; + case ISpan span: + _spans.Add(span); + _spanWaitHandle.Set(); + break; + case IMetricSet metricSet: + _metrics.Add(metricSet); + _metricSetWaitHandle.Set(); + break; + } + return written; + } + + public override void QueueError(IError error) + { + lock (_errorLock) base.QueueError(error); + } + + public override void QueueTransaction(ITransaction transaction) + { + lock (_transactionLock) base.QueueTransaction(transaction); + } + + public override void QueueSpan(ISpan span) + { + VerifySpan(span); + lock (_spanLock) base.QueueSpan(span); + } + + public override void QueueMetrics(IMetricSet metricSet) + { + lock (_metricsLock) base.QueueMetrics(metricSet); + } + + + /// /// Waits for any events to be queued /// @@ -191,19 +239,19 @@ public IReadOnlyList Errors get { lock (_errorLock) - return CreateImmutableSnapshot(_errors); + return CreateImmutableSnapshot(_errors); } } - public Error FirstError => Errors.FirstOrDefault() as Error; - public MetricSet FirstMetric => Metrics.FirstOrDefault() as MetricSet; + public Error? FirstError => Errors.FirstOrDefault() as Error; + public MetricSet? FirstMetric => Metrics.FirstOrDefault() as MetricSet; /// /// The 1. Span on the 1. Transaction /// - public Span FirstSpan => Spans.FirstOrDefault() as Span; + public Span? FirstSpan => Spans.FirstOrDefault() as Span; - public Transaction FirstTransaction => + public Transaction? FirstTransaction => Transactions.FirstOrDefault() as Transaction; public IReadOnlyList Metrics @@ -211,7 +259,7 @@ public IReadOnlyList Metrics get { lock (_metricsLock) - return CreateImmutableSnapshot(_metrics); + return CreateImmutableSnapshot(_metrics); } } @@ -220,7 +268,7 @@ public IReadOnlyList Spans get { lock (_spanLock) - return CreateImmutableSnapshot(_spans); + return CreateImmutableSnapshot(_spans); } } @@ -229,45 +277,15 @@ public IReadOnlyList Transactions get { lock (_transactionLock) - return CreateImmutableSnapshot(_transactions); + return CreateImmutableSnapshot(_transactions); } } public Span[] SpansOnFirstTransaction => - Spans.Where(n => n.TransactionId == Transactions.First().Id).Select(n => n as Span).ToArray(); - - public void QueueError(IError error) - { - lock (_errorLock) - { - error = _errorFilters.Aggregate(error, - (current, filter) => filter(current)); - _errors.Add(error); - _errorWaitHandle.Set(); - } - } - - public virtual void QueueTransaction(ITransaction transaction) - { - lock (_transactionLock) - { - transaction = _transactionFilters.Aggregate(transaction, - (current, filter) => filter(current)); - _transactions.Add(transaction); - _transactionWaitHandle.Set(); - } - } - - public void QueueSpan(ISpan span) - { - VerifySpan(span); - lock (_spanLock) - { - span = _spanFilters.Aggregate(span, (current, filter) => filter(current)); - _spans.Add(span); - _spanWaitHandle.Set(); - } - } + Spans + .Where(n => n.TransactionId == Transactions.First().Id) + .Select(n => (Span)n) + .ToArray(); private void VerifySpan(ISpan span) { @@ -279,7 +297,7 @@ private void VerifySpan(ISpan span) var spanTypeInfo = JsonSpanTypesData[type] as JObject; spanTypeInfo.Should().NotBeNull($"span type '{type}' is not allowed by the spec"); - var allowNullSubtype = spanTypeInfo["allow_null_subtype"]?.Value(); + var allowNullSubtype = spanTypeInfo!["allow_null_subtype"]?.Value(); var allowUnlistedSubtype = spanTypeInfo["allow_unlisted_subtype"]?.Value(); var subTypes = spanTypeInfo["subtypes"]; var hasSubtypes = subTypes != null && subTypes.Any(); @@ -289,7 +307,7 @@ private void VerifySpan(ISpan span) { if (!allowUnlistedSubtype.GetValueOrDefault() && hasSubtypes) { - var subTypeInfo = subTypes[subType]; + var subTypeInfo = subTypes![subType]; subTypeInfo.Should() .NotBeNull($"span subtype '{subType}' is not allowed by the spec for type '{type}'"); } @@ -305,15 +323,6 @@ private void VerifySpan(ISpan span) } } - public void QueueMetrics(IMetricSet metricSet) - { - lock (_metricsLock) - { - _metrics.Add(metricSet); - _metricSetWaitHandle.Set(); - } - } - public void Clear() { lock (_spanLock) diff --git a/test/opentelemetry/Elastic.Apm.OpenTelemetry.Tests/OpenTelemetryTests.cs b/test/opentelemetry/Elastic.Apm.OpenTelemetry.Tests/OpenTelemetryTests.cs index 1537eb25d..da3c2103c 100644 --- a/test/opentelemetry/Elastic.Apm.OpenTelemetry.Tests/OpenTelemetryTests.cs +++ b/test/opentelemetry/Elastic.Apm.OpenTelemetry.Tests/OpenTelemetryTests.cs @@ -24,10 +24,10 @@ public void MixApisTest1() configuration: new MockConfiguration(openTelemetryBridgeEnabled: "true")))) OTSamples.Sample2(agent.Tracer); - payloadSender.FirstTransaction.Name.Should().Be("Sample2"); + payloadSender.FirstTransaction!.Name.Should().Be("Sample2"); payloadSender.Spans.Should().HaveCount(2); - payloadSender.FirstSpan.Name.Should().Be("foo"); + payloadSender.FirstSpan!.Name.Should().Be("foo"); payloadSender.Spans.ElementAt(1).Name.Should().Be("ElasticApmSpan"); payloadSender.FirstSpan.ParentId.Should().Be(payloadSender.FirstTransaction.Id); @@ -39,7 +39,7 @@ public void MixApisTest1() private void AssertOnTraceIds(MockPayloadSender payloadSender) { foreach (var span in payloadSender.Spans) - span.TraceId.Should().Be(payloadSender.FirstTransaction.TraceId); + span.TraceId.Should().Be(payloadSender.FirstTransaction!.TraceId); } [Fact] @@ -50,10 +50,10 @@ public void MixApisTest2() configuration: new MockConfiguration(openTelemetryBridgeEnabled: "true")))) OTSamples.Sample3(agent.Tracer); - payloadSender.FirstTransaction.Name.Should().Be("Sample3"); + payloadSender.FirstTransaction!.Name.Should().Be("Sample3"); payloadSender.Spans.Should().HaveCount(2); - payloadSender.FirstSpan.Name.Should().Be("ElasticApmSpan"); + payloadSender.FirstSpan!.Name.Should().Be("ElasticApmSpan"); payloadSender.Spans.ElementAt(1).Name.Should().Be("foo"); payloadSender.Spans.ElementAt(1).ParentId.Should().Be(payloadSender.FirstTransaction.Id); @@ -72,10 +72,10 @@ public void MixApisTest3() configuration: new MockConfiguration(openTelemetryBridgeEnabled: "true")))) OTSamples.Sample4(agent.Tracer); - payloadSender.FirstTransaction.Name.Should().Be("Sample4"); + payloadSender.FirstTransaction!.Name.Should().Be("Sample4"); payloadSender.Spans.Should().HaveCount(2); - payloadSender.FirstSpan.Name.Should().Be("ElasticApmSpan"); + payloadSender.FirstSpan!.Name.Should().Be("ElasticApmSpan"); payloadSender.Spans.ElementAt(1).Name.Should().Be("foo"); payloadSender.Spans.ElementAt(1).ParentId.Should().Be(payloadSender.FirstTransaction.Id); @@ -92,7 +92,7 @@ public void TestOtelFieldsWith1Span() configuration: new MockConfiguration(openTelemetryBridgeEnabled: "true")))) OTSamples.OneSpanWithAttributes(); - payloadSender.FirstTransaction.Name.Should().Be("foo"); + payloadSender.FirstTransaction!.Name.Should().Be("foo"); payloadSender.FirstTransaction.Otel.Should().NotBeNull(); payloadSender.FirstTransaction.Otel.SpanKind.Should().Be("Server"); payloadSender.FirstTransaction.Otel.Attributes.Should().NotBeNull(); @@ -107,13 +107,13 @@ public void TestOtelFieldsWith3Spans() configuration: new MockConfiguration(openTelemetryBridgeEnabled: "true")))) OTSamples.TwoSpansWithAttributes(); - payloadSender.FirstTransaction.Name.Should().Be("foo"); + payloadSender.FirstTransaction!.Name.Should().Be("foo"); payloadSender.FirstTransaction.Otel.Should().NotBeNull(); payloadSender.FirstTransaction.Otel.SpanKind.Should().Be("Server"); payloadSender.FirstTransaction.Otel.Attributes.Should().NotBeNull(); payloadSender.FirstTransaction.Otel.Attributes.Should().Contain("foo1", "bar1"); - payloadSender.FirstSpan.Name.Should().Be("bar"); + payloadSender.FirstSpan!.Name.Should().Be("bar"); payloadSender.FirstSpan.Otel.Should().NotBeNull(); payloadSender.FirstSpan.Otel.SpanKind.Should().Be("Internal"); payloadSender.FirstSpan.Otel.Attributes.Should().NotBeNull(); @@ -128,7 +128,7 @@ public void SpanKindTests() configuration: new MockConfiguration(openTelemetryBridgeEnabled: "true")))) OTSamples.SpanKindSample(); - payloadSender.FirstSpan.Type.Should().Be(ApiConstants.TypeExternal); + payloadSender.FirstSpan!.Type.Should().Be(ApiConstants.TypeExternal); payloadSender.FirstSpan.Subtype.Should().Be(ApiConstants.SubtypeHttp); payloadSender.Spans.ElementAt(1).Type.Should().Be(ApiConstants.TypeDb); @@ -178,7 +178,7 @@ public void DistributedTracingTest() payloadSender.WaitForTransactions(count: 2); - payloadSender.FirstTransaction.TraceId.Should() + payloadSender.FirstTransaction!.TraceId.Should() .Be(payloadSender.Transactions[1].TraceId, because: "The transactions should be under the same trace."); } @@ -196,7 +196,7 @@ public void ActivityAndTransactionTraceIdSynced() using (var activity = src.StartActivity("foo", ActivityKind.Server)) traceId = activity?.TraceId.ToString(); traceId.Should().NotBeNull(); - payloadSender.FirstTransaction.TraceId.Should().Be(traceId); + payloadSender.FirstTransaction!.TraceId.Should().Be(traceId); } [Fact]