Skip to content

Commit

Permalink
Fix unit test for OTLP gRPC async exporting.
Browse files Browse the repository at this point in the history
  • Loading branch information
owent committed Nov 16, 2023
1 parent 25419f6 commit e7f119a
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 12 deletions.
2 changes: 1 addition & 1 deletion exporters/otlp/src/otlp_grpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ OtlpGrpcClient::~OtlpGrpcClient()
std::shared_ptr<OtlpGrpcClientAsyncData> async_data;
async_data.swap(async_data_);

while (async_data->running_requests.load(std::memory_order_acquire) > 0)
while (async_data && async_data->running_requests.load(std::memory_order_acquire) > 0)
{
std::unique_lock<std::mutex> lock{async_data->session_waker_lock};
async_data->session_waker.wait_for(lock, async_data->export_timeout, [async_data]() {
Expand Down
81 changes: 78 additions & 3 deletions exporters/otlp/test/otlp_grpc_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
# include "opentelemetry/sdk/trace/tracer_provider.h"
# include "opentelemetry/trace/provider.h"

# include <grpcpp/version_info.h>
# include <gtest/gtest.h>

# if defined(_MSC_VER)
Expand All @@ -45,6 +46,73 @@ namespace exporter
namespace otlp
{

namespace
{
class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceServiceStub
{
public:
# if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039
using async_interface_base =
proto::collector::trace::v1::TraceService::StubInterface::async_interface;
# else
using async_interface_base =
proto::collector::trace::v1::TraceService::StubInterface::experimental_async_interface;
# endif

OtlpMockTraceServiceStub() : async_interface_(this) {}

class async_interface : public async_interface_base
{
public:
async_interface(OtlpMockTraceServiceStub *owner) : stub_(owner) {}

virtual ~async_interface() {}

void Export(
::grpc::ClientContext *context,
const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest *request,
::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse *response,
std::function<void(::grpc::Status)> callback) override
{
stub_->last_async_status_ = stub_->Export(context, *request, response);
callback(stub_->last_async_status_);
}

# if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \
defined(GRPC_CALLBACK_API_NONEXPERIMENTAL)
void Export(
::grpc::ClientContext * /*context*/,
const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/,
::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/,
::grpc::ClientUnaryReactor * /*reactor*/) override
{}
# else
void Export(
::grpc::ClientContext * /*context*/,
const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/,
::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/,
::grpc::experimental::ClientUnaryReactor * /*reactor*/)
{}
# endif

private:
OtlpMockTraceServiceStub *stub_;
};

# if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039
async_interface_base *async() override { return &async_interface_; }
# else
async_interface_base *experimental_async() override { return &async_interface_; }
# endif

::grpc::Status GetLastAsyncStatus() const noexcept { return last_async_status_; }

private:
::grpc::Status last_async_status_;
async_interface async_interface_;
};
} // namespace

class OtlpGrpcExporterTestPeer : public ::testing::Test
{
public:
Expand All @@ -64,7 +132,7 @@ class OtlpGrpcExporterTestPeer : public ::testing::Test

TEST_F(OtlpGrpcExporterTestPeer, ShutdownTest)
{
auto mock_stub = new proto::collector::trace::v1::MockTraceServiceStub();
auto mock_stub = new OtlpMockTraceServiceStub();
std::unique_ptr<proto::collector::trace::v1::TraceService::StubInterface> stub_interface(
mock_stub);
auto exporter = GetExporter(stub_interface);
Expand All @@ -78,6 +146,7 @@ TEST_F(OtlpGrpcExporterTestPeer, ShutdownTest)
nostd::span<std::unique_ptr<sdk::trace::Recordable>> batch_1(&recordable_1, 1);
EXPECT_CALL(*mock_stub, Export(_, _, _)).Times(Exactly(1)).WillOnce(Return(grpc::Status::OK));
auto result = exporter->Export(batch_1);
exporter->ForceFlush();
EXPECT_EQ(sdk::common::ExportResult::kSuccess, result);

exporter->Shutdown();
Expand All @@ -90,7 +159,7 @@ TEST_F(OtlpGrpcExporterTestPeer, ShutdownTest)
// Call Export() directly
TEST_F(OtlpGrpcExporterTestPeer, ExportUnitTest)
{
auto mock_stub = new proto::collector::trace::v1::MockTraceServiceStub();
auto mock_stub = new OtlpMockTraceServiceStub();
std::unique_ptr<proto::collector::trace::v1::TraceService::StubInterface> stub_interface(
mock_stub);
auto exporter = GetExporter(stub_interface);
Expand All @@ -112,13 +181,19 @@ TEST_F(OtlpGrpcExporterTestPeer, ExportUnitTest)
.Times(Exactly(1))
.WillOnce(Return(grpc::Status::CANCELLED));
result = exporter->Export(batch_2);
exporter->ForceFlush();
# if defined(ENABLE_ASYNC_EXPORT)
EXPECT_EQ(sdk::common::ExportResult::kSuccess, result);
EXPECT_FALSE(mock_stub->GetLastAsyncStatus().ok());
# else
EXPECT_EQ(sdk::common::ExportResult::kFailure, result);
# endif
}

// Create spans, let processor call Export()
TEST_F(OtlpGrpcExporterTestPeer, ExportIntegrationTest)
{
auto mock_stub = new proto::collector::trace::v1::MockTraceServiceStub();
auto mock_stub = new OtlpMockTraceServiceStub();
std::unique_ptr<proto::collector::trace::v1::TraceService::StubInterface> stub_interface(
mock_stub);

Expand Down
153 changes: 145 additions & 8 deletions exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "opentelemetry/sdk/trace/tracer_provider_factory.h"
#include "opentelemetry/trace/provider.h"

#include <grpcpp/version_info.h>
#include <gtest/gtest.h>

#if defined(_MSC_VER)
Expand All @@ -41,6 +42,137 @@ namespace exporter
namespace otlp
{

namespace
{
class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceServiceStub
{
public:
#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039
using async_interface_base =
proto::collector::trace::v1::TraceService::StubInterface::async_interface;
#else
using async_interface_base =
proto::collector::trace::v1::TraceService::StubInterface::experimental_async_interface;
#endif

OtlpMockTraceServiceStub() : async_interface_(this) {}

class async_interface : public async_interface_base
{
public:
async_interface(OtlpMockTraceServiceStub *owner) : stub_(owner) {}

virtual ~async_interface() {}

void Export(
::grpc::ClientContext *context,
const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest *request,
::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse *response,
std::function<void(::grpc::Status)> callback) override
{
stub_->last_async_status_ = stub_->Export(context, *request, response);
callback(stub_->last_async_status_);
}

#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \
defined(GRPC_CALLBACK_API_NONEXPERIMENTAL)
void Export(
::grpc::ClientContext * /*context*/,
const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/,
::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/,
::grpc::ClientUnaryReactor * /*reactor*/) override
{}
#else
void Export(
::grpc::ClientContext * /*context*/,
const ::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest * /*request*/,
::opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse * /*response*/,
::grpc::experimental::ClientUnaryReactor * /*reactor*/)
{}
#endif

private:
OtlpMockTraceServiceStub *stub_;
};

#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039
async_interface_base *async() override { return &async_interface_; }
#else
async_interface_base *experimental_async() override { return &async_interface_; }
#endif

::grpc::Status GetLastAsyncStatus() const noexcept { return last_async_status_; }

private:
::grpc::Status last_async_status_;
async_interface async_interface_;
};

class OtlpMockLogsServiceStub : public proto::collector::logs::v1::MockLogsServiceStub
{
public:
#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039
using async_interface_base =
proto::collector::logs::v1::LogsService::StubInterface::async_interface;
#else
using async_interface_base =
proto::collector::logs::v1::LogsService::StubInterface::experimental_async_interface;
#endif

OtlpMockLogsServiceStub() : async_interface_(this) {}

class async_interface : public async_interface_base
{
public:
async_interface(OtlpMockLogsServiceStub *owner) : stub_(owner) {}

virtual ~async_interface() {}

void Export(
::grpc::ClientContext *context,
const ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest *request,
::opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse *response,
std::function<void(::grpc::Status)> callback) override
{
stub_->last_async_status_ = stub_->Export(context, *request, response);
callback(stub_->last_async_status_);
}

#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \
defined(GRPC_CALLBACK_API_NONEXPERIMENTAL)
void Export(
::grpc::ClientContext * /*context*/,
const ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest * /*request*/,
::opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse * /*response*/,
::grpc::ClientUnaryReactor * /*reactor*/) override
{}
#else
void Export(
::grpc::ClientContext * /*context*/,
const ::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest * /*request*/,
::opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse * /*response*/,
::grpc::experimental::ClientUnaryReactor * /*reactor*/)
{}
#endif

private:
OtlpMockLogsServiceStub *stub_;
};

#if (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039
async_interface_base *async() override { return &async_interface_; }
#else
async_interface_base *experimental_async() override { return &async_interface_; }
#endif

::grpc::Status GetLastAsyncStatus() const noexcept { return last_async_status_; }

private:
::grpc::Status last_async_status_;
async_interface async_interface_;
};
} // namespace

class OtlpGrpcLogRecordExporterTestPeer : public ::testing::Test
{
public:
Expand Down Expand Up @@ -68,7 +200,7 @@ class OtlpGrpcLogRecordExporterTestPeer : public ::testing::Test

TEST_F(OtlpGrpcLogRecordExporterTestPeer, ShutdownTest)
{
auto mock_stub = new proto::collector::logs::v1::MockLogsServiceStub();
auto mock_stub = new OtlpMockLogsServiceStub();
std::unique_ptr<proto::collector::logs::v1::LogsService::StubInterface> stub_interface(mock_stub);
auto exporter = GetExporter(stub_interface);

Expand All @@ -77,12 +209,10 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ShutdownTest)

// exporter shuold not be shutdown by default
nostd::span<std::unique_ptr<sdk::logs::Recordable>> batch_1(&recordable_1, 1);
EXPECT_CALL(*mock_stub, Export(_, _, _))
.Times(Exactly(1))
.WillOnce(Return(grpc::Status::OK))
.WillOnce(Return(grpc::Status::CANCELLED));
EXPECT_CALL(*mock_stub, Export(_, _, _)).Times(Exactly(1)).WillOnce(Return(grpc::Status::OK));

auto result = exporter->Export(batch_1);
exporter->ForceFlush();
EXPECT_EQ(sdk::common::ExportResult::kSuccess, result);

exporter->Shutdown();
Expand All @@ -95,7 +225,7 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ShutdownTest)
// Call Export() directly
TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportUnitTest)
{
auto mock_stub = new proto::collector::logs::v1::MockLogsServiceStub();
auto mock_stub = new OtlpMockLogsServiceStub();
std::unique_ptr<proto::collector::logs::v1::LogsService::StubInterface> stub_interface(mock_stub);
auto exporter = GetExporter(stub_interface);

Expand All @@ -106,6 +236,7 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportUnitTest)
nostd::span<std::unique_ptr<sdk::logs::Recordable>> batch_1(&recordable_1, 1);
EXPECT_CALL(*mock_stub, Export(_, _, _)).Times(Exactly(1)).WillOnce(Return(grpc::Status::OK));
auto result = exporter->Export(batch_1);
exporter->ForceFlush();
EXPECT_EQ(sdk::common::ExportResult::kSuccess, result);

// Test failed RPC
Expand All @@ -114,13 +245,19 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportUnitTest)
.Times(Exactly(1))
.WillOnce(Return(grpc::Status::CANCELLED));
result = exporter->Export(batch_2);
exporter->ForceFlush();
#if defined(ENABLE_ASYNC_EXPORT)
EXPECT_EQ(sdk::common::ExportResult::kSuccess, result);
EXPECT_FALSE(mock_stub->GetLastAsyncStatus().ok());
#else
EXPECT_EQ(sdk::common::ExportResult::kFailure, result);
#endif
}

// Create spans, let processor call Export()
TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportIntegrationTest)
{
auto mock_stub = new proto::collector::logs::v1::MockLogsServiceStub();
auto mock_stub = new OtlpMockLogsServiceStub();
std::unique_ptr<proto::collector::logs::v1::LogsService::StubInterface> stub_interface(mock_stub);

auto exporter = GetExporter(stub_interface);
Expand Down Expand Up @@ -149,7 +286,7 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportIntegrationTest)
'3', '2', '1', '0'};
opentelemetry::trace::SpanId span_id{span_id_bin};

auto trace_mock_stub = new proto::collector::trace::v1::MockTraceServiceStub();
auto trace_mock_stub = new OtlpMockTraceServiceStub();
std::unique_ptr<proto::collector::trace::v1::TraceService::StubInterface> trace_stub_interface(
trace_mock_stub);

Expand Down
1 change: 1 addition & 0 deletions exporters/otlp/test/otlp_grpc_metric_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
# include "opentelemetry/sdk/trace/tracer_provider.h"
# include "opentelemetry/trace/provider.h"

# include <grpcpp/version_info.h>
# include <gtest/gtest.h>

# if defined(_MSC_VER)
Expand Down

0 comments on commit e7f119a

Please sign in to comment.