Skip to content

Commit

Permalink
Add async exporting for OTLP gRPC exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
owent committed Nov 15, 2023
1 parent 3e840b0 commit 9b3b057
Show file tree
Hide file tree
Showing 16 changed files with 787 additions and 68 deletions.
7 changes: 4 additions & 3 deletions api/include/opentelemetry/common/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
// GCC 9 has likely attribute but do not support declare it at the beginning of statement
# if defined(__has_cpp_attribute) && (defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 9)
# if __has_cpp_attribute(likely)
# define OPENTELEMETRY_LIKELY_IF(...) \
if (__VA_ARGS__) \
[[likely]]
# define OPENTELEMETRY_LIKELY_IF(...) if (__VA_ARGS__) [[likely]]

# endif
# endif
Expand Down Expand Up @@ -164,16 +162,19 @@ point.
#if defined(__clang__)

# define OPENTELEMETRY_API_SINGLETON __attribute__((visibility("default")))
# define OPENTELEMETRY_LOCAL_SYMBOL __attribute__((visibility("hidden")))

#elif defined(__GNUC__)

# define OPENTELEMETRY_API_SINGLETON __attribute__((visibility("default")))
# define OPENTELEMETRY_LOCAL_SYMBOL __attribute__((visibility("hidden")))

#else

/* Add support for other compilers here. */

# define OPENTELEMETRY_API_SINGLETON
# define OPENTELEMETRY_LOCAL_SYMBOL

#endif

Expand Down
112 changes: 106 additions & 6 deletions exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@

#pragma once

#include <grpcpp/completion_queue.h>
#include <grpcpp/grpcpp.h>

#include <memory>

#include "opentelemetry/sdk/common/exporter_utils.h"

#include "opentelemetry/exporters/otlp/otlp_grpc_client_options.h"

#include "opentelemetry/exporters/otlp/protobuf_include_prefix.h"
Expand All @@ -26,12 +29,21 @@ namespace otlp

struct OtlpGrpcClientOptions;

#ifdef ENABLE_ASYNC_EXPORT
struct OtlpGrpcClientAsyncData;
#endif

/**
* The OTLP gRPC client contains utility functions of gRPC.
*/
class OtlpGrpcClient
{
public:
#ifdef ENABLE_ASYNC_EXPORT
OtlpGrpcClient();
~OtlpGrpcClient();
#endif

/**
* Create gRPC channel from the exporter options.
*/
Expand Down Expand Up @@ -68,21 +80,109 @@ class OtlpGrpcClient

static grpc::Status DelegateExport(
proto::collector::trace::v1::TraceService::StubInterface *stub,
grpc::ClientContext *context,
const proto::collector::trace::v1::ExportTraceServiceRequest &request,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::trace::v1::ExportTraceServiceRequest &&request,
proto::collector::trace::v1::ExportTraceServiceResponse *response);

static grpc::Status DelegateExport(
proto::collector::metrics::v1::MetricsService::StubInterface *stub,
grpc::ClientContext *context,
const proto::collector::metrics::v1::ExportMetricsServiceRequest &request,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::metrics::v1::ExportMetricsServiceRequest &&request,
proto::collector::metrics::v1::ExportMetricsServiceResponse *response);

static grpc::Status DelegateExport(
proto::collector::logs::v1::LogsService::StubInterface *stub,
grpc::ClientContext *context,
const proto::collector::logs::v1::ExportLogsServiceRequest &request,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::logs::v1::ExportLogsServiceRequest &&request,
proto::collector::logs::v1::ExportLogsServiceResponse *response);

#ifdef ENABLE_ASYNC_EXPORT

/**
* Async export
* @param options Options used to message to create gRPC context and stub(if necessary)
* @param arena Protobuf arena to hold lifetime of all messages
* @param request Request for this RPC
* @param result_callback callback to call when the exporting is done
* @return return the status of this operation
*/
sdk::common::ExportResult DelegateAsyncExport(
const OtlpGrpcClientOptions &options,
proto::collector::trace::v1::TraceService::StubInterface *stub,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::trace::v1::ExportTraceServiceRequest &&request,
std::function<bool(opentelemetry::sdk::common::ExportResult,
std::unique_ptr<google::protobuf::Arena> &&,
const proto::collector::trace::v1::ExportTraceServiceRequest &,
proto::collector::trace::v1::ExportTraceServiceResponse *)>
&&result_callback) noexcept;

/**
* Async export
* @param options Options used to message to create gRPC context and stub(if necessary)
* @param arena Protobuf arena to hold lifetime of all messages
* @param request Request for this RPC
* @param result_callback callback to call when the exporting is done
* @return return the status of this operation
*/
sdk::common::ExportResult DelegateAsyncExport(
const OtlpGrpcClientOptions &options,
proto::collector::metrics::v1::MetricsService::StubInterface *stub,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::metrics::v1::ExportMetricsServiceRequest &&request,
std::function<bool(opentelemetry::sdk::common::ExportResult,
std::unique_ptr<google::protobuf::Arena> &&,
const proto::collector::metrics::v1::ExportMetricsServiceRequest &,
proto::collector::metrics::v1::ExportMetricsServiceResponse *)>
&&result_callback) noexcept;

/**
* Async export
* @param options Options used to message to create gRPC context and stub(if necessary)
* @param arena Protobuf arena to hold lifetime of all messages
* @param request Request for this RPC
* @param result_callback callback to call when the exporting is done
* @return return the status of this operation
*/
sdk::common::ExportResult DelegateAsyncExport(
const OtlpGrpcClientOptions &options,
proto::collector::logs::v1::LogsService::StubInterface *stub,
std::unique_ptr<grpc::ClientContext> &&context,
std::unique_ptr<google::protobuf::Arena> &&arena,
proto::collector::logs::v1::ExportLogsServiceRequest &&request,
std::function<bool(opentelemetry::sdk::common::ExportResult,
std::unique_ptr<google::protobuf::Arena> &&,
const proto::collector::logs::v1::ExportLogsServiceRequest &,
proto::collector::logs::v1::ExportLogsServiceResponse *)>
&&result_callback) noexcept;

/**
* Force flush the gRPC client.
*/
bool ForceFlush(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;

/**
* Shut down the gRPC client.
* @param timeout an optional timeout, the default timeout of 0 means that no
* timeout is applied.
* @return return the status of this operation
*/
bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept;

std::shared_ptr<OtlpGrpcClientAsyncData> MutableAsyncData(const OtlpGrpcClientOptions &options);

private:
// Stores if this gRPC client had its Shutdown() method called
bool is_shutdown_;

// Stores shared data between threads of this gRPC client
std::shared_ptr<OtlpGrpcClientAsyncData> async_data_;
#endif
};
} // namespace otlp
} // namespace exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ struct OtlpGrpcClientOptions

/** User agent. */
std::string user_agent;

/** max number of threads that can be allocated from this */
std::size_t max_threads;

#ifdef ENABLE_ASYNC_EXPORT
// Concurrent requests
std::size_t max_concurrent_requests;
#endif
};

} // namespace otlp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ namespace exporter
namespace otlp
{

class OtlpGrpcClient;

/**
* The OTLP exporter exports span data in OpenTelemetry Protocol (OTLP) format.
*/
Expand Down Expand Up @@ -73,6 +75,10 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter
// The configuration options associated with this exporter.
const OtlpGrpcExporterOptions options_;

#ifdef ENABLE_ASYNC_EXPORT
std::shared_ptr<OtlpGrpcClient> client_;
#endif

// For testing
friend class OtlpGrpcExporterTestPeer;
friend class OtlpGrpcLogRecordExporterTestPeer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace exporter
namespace otlp
{

class OtlpGrpcClient;

/**
* The OTLP exporter exports log data in OpenTelemetry Protocol (OTLP) format in gRPC.
*/
Expand Down Expand Up @@ -73,6 +75,10 @@ class OtlpGrpcLogRecordExporter : public opentelemetry::sdk::logs::LogRecordExpo
// Configuration options for the exporter
const OtlpGrpcLogRecordExporterOptions options_;

#ifdef ENABLE_ASYNC_EXPORT
std::shared_ptr<OtlpGrpcClient> client_;
#endif

// For testing
friend class OtlpGrpcLogRecordExporterTestPeer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace exporter
namespace otlp
{

class OtlpGrpcClient;

/**
* The OTLP exporter exports metrics data in OpenTelemetry Protocol (OTLP) format in gRPC.
*/
Expand Down Expand Up @@ -59,6 +61,10 @@ class OtlpGrpcMetricExporter : public opentelemetry::sdk::metrics::PushMetricExp
// The configuration options associated with this exporter.
const OtlpGrpcMetricExporterOptions options_;

#ifdef ENABLE_ASYNC_EXPORT
std::shared_ptr<OtlpGrpcClient> client_;
#endif

// Aggregation Temporality selector
const sdk::metrics::AggregationTemporalitySelector aggregation_temporality_selector_;

Expand Down
Loading

0 comments on commit 9b3b057

Please sign in to comment.