From e7f119a37b19fe80ff7bf32a46834ee32cbf6582 Mon Sep 17 00:00:00 2001 From: owent Date: Thu, 16 Nov 2023 19:14:20 +0800 Subject: [PATCH] Fix unit test for OTLP gRPC async exporting. --- exporters/otlp/src/otlp_grpc_client.cc | 2 +- .../otlp/test/otlp_grpc_exporter_test.cc | 81 +++++++++- .../otlp_grpc_log_record_exporter_test.cc | 153 +++++++++++++++++- .../test/otlp_grpc_metric_exporter_test.cc | 1 + 4 files changed, 225 insertions(+), 12 deletions(-) diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index fb48c77364..27fb170a0d 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -238,7 +238,7 @@ OtlpGrpcClient::~OtlpGrpcClient() std::shared_ptr async_data; async_data.swap(async_data_); - while (async_data->running_requests.load(std::memory_order_acquire) > 0) + while (async_data && async_data->running_requests.load(std::memory_order_acquire) > 0) { std::unique_lock lock{async_data->session_waker_lock}; async_data->session_waker.wait_for(lock, async_data->export_timeout, [async_data]() { diff --git a/exporters/otlp/test/otlp_grpc_exporter_test.cc b/exporters/otlp/test/otlp_grpc_exporter_test.cc index 3be2bcc653..a525175078 100644 --- a/exporters/otlp/test/otlp_grpc_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_exporter_test.cc @@ -29,6 +29,7 @@ # include "opentelemetry/sdk/trace/tracer_provider.h" # include "opentelemetry/trace/provider.h" +# include # include # if defined(_MSC_VER) @@ -45,6 +46,73 @@ namespace exporter namespace otlp { +namespace +{ +class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceServiceStub +{ +public: +# if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + using async_interface_base = + proto::collector::trace::v1::TraceService::StubInterface::async_interface; +# else + using async_interface_base = + proto::collector::trace::v1::TraceService::StubInterface::experimental_async_interface; +# endif + + OtlpMockTraceServiceStub() : async_interface_(this) {} + + class async_interface : public async_interface_base + { + public: + async_interface(OtlpMockTraceServiceStub *owner) : stub_(owner) {} + + virtual ~async_interface() {} + + void Export( + ::grpc::ClientContext *context, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest *request, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse *response, + std::function callback) override + { + stub_->last_async_status_ = stub_->Export(context, *request, response); + callback(stub_->last_async_status_); + } + +# if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ + defined(GRPC_CALLBACK_API_NONEXPERIMENTAL) + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/, + ::grpc::ClientUnaryReactor * /*reactor*/) override + {} +# else + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/, + ::grpc::experimental::ClientUnaryReactor * /*reactor*/) + {} +# endif + + private: + OtlpMockTraceServiceStub *stub_; + }; + +# if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + async_interface_base *async() override { return &async_interface_; } +# else + async_interface_base *experimental_async() override { return &async_interface_; } +# endif + + ::grpc::Status GetLastAsyncStatus() const noexcept { return last_async_status_; } + +private: + ::grpc::Status last_async_status_; + async_interface async_interface_; +}; +} // namespace + class OtlpGrpcExporterTestPeer : public ::testing::Test { public: @@ -64,7 +132,7 @@ class OtlpGrpcExporterTestPeer : public ::testing::Test TEST_F(OtlpGrpcExporterTestPeer, ShutdownTest) { - auto mock_stub = new proto::collector::trace::v1::MockTraceServiceStub(); + auto mock_stub = new OtlpMockTraceServiceStub(); std::unique_ptr stub_interface( mock_stub); auto exporter = GetExporter(stub_interface); @@ -78,6 +146,7 @@ TEST_F(OtlpGrpcExporterTestPeer, ShutdownTest) nostd::span> batch_1(&recordable_1, 1); EXPECT_CALL(*mock_stub, Export(_, _, _)).Times(Exactly(1)).WillOnce(Return(grpc::Status::OK)); auto result = exporter->Export(batch_1); + exporter->ForceFlush(); EXPECT_EQ(sdk::common::ExportResult::kSuccess, result); exporter->Shutdown(); @@ -90,7 +159,7 @@ TEST_F(OtlpGrpcExporterTestPeer, ShutdownTest) // Call Export() directly TEST_F(OtlpGrpcExporterTestPeer, ExportUnitTest) { - auto mock_stub = new proto::collector::trace::v1::MockTraceServiceStub(); + auto mock_stub = new OtlpMockTraceServiceStub(); std::unique_ptr stub_interface( mock_stub); auto exporter = GetExporter(stub_interface); @@ -112,13 +181,19 @@ TEST_F(OtlpGrpcExporterTestPeer, ExportUnitTest) .Times(Exactly(1)) .WillOnce(Return(grpc::Status::CANCELLED)); result = exporter->Export(batch_2); + exporter->ForceFlush(); +# if defined(ENABLE_ASYNC_EXPORT) + EXPECT_EQ(sdk::common::ExportResult::kSuccess, result); + EXPECT_FALSE(mock_stub->GetLastAsyncStatus().ok()); +# else EXPECT_EQ(sdk::common::ExportResult::kFailure, result); +# endif } // Create spans, let processor call Export() TEST_F(OtlpGrpcExporterTestPeer, ExportIntegrationTest) { - auto mock_stub = new proto::collector::trace::v1::MockTraceServiceStub(); + auto mock_stub = new OtlpMockTraceServiceStub(); std::unique_ptr stub_interface( mock_stub); diff --git a/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc b/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc index 6a31639219..143be576e6 100644 --- a/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc @@ -25,6 +25,7 @@ #include "opentelemetry/sdk/trace/tracer_provider_factory.h" #include "opentelemetry/trace/provider.h" +#include #include #if defined(_MSC_VER) @@ -41,6 +42,137 @@ namespace exporter namespace otlp { +namespace +{ +class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceServiceStub +{ +public: +#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + using async_interface_base = + proto::collector::trace::v1::TraceService::StubInterface::async_interface; +#else + using async_interface_base = + proto::collector::trace::v1::TraceService::StubInterface::experimental_async_interface; +#endif + + OtlpMockTraceServiceStub() : async_interface_(this) {} + + class async_interface : public async_interface_base + { + public: + async_interface(OtlpMockTraceServiceStub *owner) : stub_(owner) {} + + virtual ~async_interface() {} + + void Export( + ::grpc::ClientContext *context, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest *request, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse *response, + std::function callback) override + { + stub_->last_async_status_ = stub_->Export(context, *request, response); + callback(stub_->last_async_status_); + } + +#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ + defined(GRPC_CALLBACK_API_NONEXPERIMENTAL) + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/, + ::grpc::ClientUnaryReactor * /*reactor*/) override + {} +#else + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/, + ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/, + ::grpc::experimental::ClientUnaryReactor * /*reactor*/) + {} +#endif + + private: + OtlpMockTraceServiceStub *stub_; + }; + +#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + async_interface_base *async() override { return &async_interface_; } +#else + async_interface_base *experimental_async() override { return &async_interface_; } +#endif + + ::grpc::Status GetLastAsyncStatus() const noexcept { return last_async_status_; } + +private: + ::grpc::Status last_async_status_; + async_interface async_interface_; +}; + +class OtlpMockLogsServiceStub : public proto::collector::logs::v1::MockLogsServiceStub +{ +public: +#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + using async_interface_base = + proto::collector::logs::v1::LogsService::StubInterface::async_interface; +#else + using async_interface_base = + proto::collector::logs::v1::LogsService::StubInterface::experimental_async_interface; +#endif + + OtlpMockLogsServiceStub() : async_interface_(this) {} + + class async_interface : public async_interface_base + { + public: + async_interface(OtlpMockLogsServiceStub *owner) : stub_(owner) {} + + virtual ~async_interface() {} + + void Export( + ::grpc::ClientContext *context, + const ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest *request, + ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse *response, + std::function callback) override + { + stub_->last_async_status_ = stub_->Export(context, *request, response); + callback(stub_->last_async_status_); + } + +#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ + defined(GRPC_CALLBACK_API_NONEXPERIMENTAL) + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest * /*request*/, + ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse * /*response*/, + ::grpc::ClientUnaryReactor * /*reactor*/) override + {} +#else + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest * /*request*/, + ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse * /*response*/, + ::grpc::experimental::ClientUnaryReactor * /*reactor*/) + {} +#endif + + private: + OtlpMockLogsServiceStub *stub_; + }; + +#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + async_interface_base *async() override { return &async_interface_; } +#else + async_interface_base *experimental_async() override { return &async_interface_; } +#endif + + ::grpc::Status GetLastAsyncStatus() const noexcept { return last_async_status_; } + +private: + ::grpc::Status last_async_status_; + async_interface async_interface_; +}; +} // namespace + class OtlpGrpcLogRecordExporterTestPeer : public ::testing::Test { public: @@ -68,7 +200,7 @@ class OtlpGrpcLogRecordExporterTestPeer : public ::testing::Test TEST_F(OtlpGrpcLogRecordExporterTestPeer, ShutdownTest) { - auto mock_stub = new proto::collector::logs::v1::MockLogsServiceStub(); + auto mock_stub = new OtlpMockLogsServiceStub(); std::unique_ptr stub_interface(mock_stub); auto exporter = GetExporter(stub_interface); @@ -77,12 +209,10 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ShutdownTest) // exporter shuold not be shutdown by default nostd::span> batch_1(&recordable_1, 1); - EXPECT_CALL(*mock_stub, Export(_, _, _)) - .Times(Exactly(1)) - .WillOnce(Return(grpc::Status::OK)) - .WillOnce(Return(grpc::Status::CANCELLED)); + EXPECT_CALL(*mock_stub, Export(_, _, _)).Times(Exactly(1)).WillOnce(Return(grpc::Status::OK)); auto result = exporter->Export(batch_1); + exporter->ForceFlush(); EXPECT_EQ(sdk::common::ExportResult::kSuccess, result); exporter->Shutdown(); @@ -95,7 +225,7 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ShutdownTest) // Call Export() directly TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportUnitTest) { - auto mock_stub = new proto::collector::logs::v1::MockLogsServiceStub(); + auto mock_stub = new OtlpMockLogsServiceStub(); std::unique_ptr stub_interface(mock_stub); auto exporter = GetExporter(stub_interface); @@ -106,6 +236,7 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportUnitTest) nostd::span> batch_1(&recordable_1, 1); EXPECT_CALL(*mock_stub, Export(_, _, _)).Times(Exactly(1)).WillOnce(Return(grpc::Status::OK)); auto result = exporter->Export(batch_1); + exporter->ForceFlush(); EXPECT_EQ(sdk::common::ExportResult::kSuccess, result); // Test failed RPC @@ -114,13 +245,19 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportUnitTest) .Times(Exactly(1)) .WillOnce(Return(grpc::Status::CANCELLED)); result = exporter->Export(batch_2); + exporter->ForceFlush(); +#if defined(ENABLE_ASYNC_EXPORT) + EXPECT_EQ(sdk::common::ExportResult::kSuccess, result); + EXPECT_FALSE(mock_stub->GetLastAsyncStatus().ok()); +#else EXPECT_EQ(sdk::common::ExportResult::kFailure, result); +#endif } // Create spans, let processor call Export() TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportIntegrationTest) { - auto mock_stub = new proto::collector::logs::v1::MockLogsServiceStub(); + auto mock_stub = new OtlpMockLogsServiceStub(); std::unique_ptr stub_interface(mock_stub); auto exporter = GetExporter(stub_interface); @@ -149,7 +286,7 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportIntegrationTest) '3', '2', '1', '0'}; opentelemetry::trace::SpanId span_id{span_id_bin}; - auto trace_mock_stub = new proto::collector::trace::v1::MockTraceServiceStub(); + auto trace_mock_stub = new OtlpMockTraceServiceStub(); std::unique_ptr trace_stub_interface( trace_mock_stub); diff --git a/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc b/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc index dc6dde9d79..40587e5a52 100644 --- a/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc @@ -29,6 +29,7 @@ # include "opentelemetry/sdk/trace/tracer_provider.h" # include "opentelemetry/trace/provider.h" +# include # include # if defined(_MSC_VER)