Skip to content

Commit

Permalink
Add an OpenTelemetry gRPC access logger (just gRPC handling) (#14818)
Browse files Browse the repository at this point in the history
Signed-off-by: Itamar Kaminski <[email protected]>
  • Loading branch information
itamarkam authored Feb 10, 2021
1 parent 8488506 commit 684afd1
Show file tree
Hide file tree
Showing 12 changed files with 545 additions and 1 deletion.
1 change: 1 addition & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
/api/ @envoyproxy/api-shepherds
# access loggers
/*/extensions/access_loggers/common @auni53 @zuercher
/*/extensions/access_loggers/open_telemetry @itamarkam @yanavlasov @htuch
# compression extensions
/*/extensions/compression/common/compressor @rojkov @junr03
/*/extensions/compression/gzip/compressor @rojkov @junr03
Expand Down
26 changes: 26 additions & 0 deletions api/bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ def api_dependencies():
name = "com_github_openzipkin_zipkinapi",
build_file_content = ZIPKINAPI_BUILD_CONTENT,
)
external_http_archive(
name = "opentelemetry_proto",
build_file_content = OPENTELEMETRY_LOGS_BUILD_CONTENT,
)

PROMETHEUSMETRICS_BUILD_CONTENT = """
load("@envoy_api//bazel:api_build_system.bzl", "api_cc_py_proto_library")
Expand Down Expand Up @@ -101,3 +105,25 @@ go_proto_library(
visibility = ["//visibility:public"],
)
"""

OPENTELEMETRY_LOGS_BUILD_CONTENT = """
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@rules_cc//cc:defs.bzl", "cc_proto_library")
proto_library(
name = "logs",
srcs = [
"opentelemetry/proto/collector/logs/v1/logs_service.proto",
"opentelemetry/proto/common/v1/common.proto",
"opentelemetry/proto/logs/v1/logs.proto",
"opentelemetry/proto/resource/v1/resource.proto",
],
visibility = ["//visibility:public"],
)
cc_proto_library(
name = "logs_cc_proto",
deps = [":logs"],
visibility = ["//visibility:public"],
)
"""
11 changes: 11 additions & 0 deletions api/bazel/repository_locations.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,15 @@ REPOSITORY_LOCATIONS_SPEC = dict(
release_date = "2020-08-17",
use_category = ["api"],
),
opentelemetry_proto = dict(
project_name = "OpenTelemetry Proto",
project_desc = "Language Independent Interface Types For OpenTelemetry",
project_url = "https://github.com/open-telemetry/opentelemetry-proto",
version = "0.7.0",
sha256 = "39cc1fb45039c7687354ca497aff8a55c71d0f1e484f6b81124ba9d821c36441",
strip_prefix = "opentelemetry-proto-{version}",
urls = ["https://github.com/open-telemetry/opentelemetry-proto/archive/v{version}.tar.gz"],
release_date = "2020-12-09",
use_category = ["api"],
),
)
26 changes: 26 additions & 0 deletions generated_api_shadow/bazel/repositories.bzl

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions generated_api_shadow/bazel/repository_locations.bzl

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion source/extensions/access_loggers/common/grpc_access_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogPro
virtual void initMessage() PURE;
virtual void addEntry(HttpLogProto&& entry) PURE;
virtual void addEntry(TcpLogProto&& entry) PURE;
virtual void clearMessage() { message_.Clear(); }

void flush() {
if (isEmpty()) {
Expand All @@ -233,7 +234,7 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogPro
if (client_.log(message_)) {
// Clear the message regardless of the success.
approximate_message_size_bytes_ = 0;
message_.Clear();
clearMessage();
}
}

Expand Down
27 changes: 27 additions & 0 deletions source/extensions/access_loggers/open_telemetry/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_extension_package",
)

licenses(["notice"]) # Apache 2

envoy_extension_package()

envoy_cc_library(
name = "grpc_access_log_lib",
srcs = ["grpc_access_log_impl.cc"],
hdrs = ["grpc_access_log_impl.h"],
deps = [
"//include/envoy/event:dispatcher_interface",
"//include/envoy/grpc:async_client_manager_interface",
"//include/envoy/local_info:local_info_interface",
"//include/envoy/thread_local:thread_local_interface",
"//source/common/config:utility_lib",
"//source/common/grpc:typed_async_client_lib",
"//source/common/protobuf",
"//source/extensions/access_loggers/common:grpc_access_logger",
"@envoy_api//envoy/extensions/access_loggers/grpc/v3:pkg_cc_proto",
"@opentelemetry_proto//:logs_cc_proto",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#include "extensions/access_loggers/open_telemetry/grpc_access_log_impl.h"

#include "envoy/extensions/access_loggers/grpc/v3/als.pb.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/local_info/local_info.h"

#include "common/config/utility.h"
#include "common/grpc/typed_async_client.h"

#include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h"
#include "opentelemetry/proto/common/v1/common.pb.h"
#include "opentelemetry/proto/logs/v1/logs.pb.h"
#include "opentelemetry/proto/resource/v1/resource.pb.h"

const char GRPC_LOG_STATS_PREFIX[] = "access_logs.open_telemetry_access_log.";

namespace Envoy {
namespace Extensions {
namespace AccessLoggers {
namespace OpenTelemetry {

GrpcAccessLoggerImpl::GrpcAccessLoggerImpl(
Grpc::RawAsyncClientPtr&& client, std::string log_name,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope,
envoy::config::core::v3::ApiVersion transport_api_version)
: GrpcAccessLogger(
std::move(client), buffer_flush_interval_msec, max_buffer_size_bytes, dispatcher, scope,
GRPC_LOG_STATS_PREFIX,
Grpc::VersionedMethods("opentelemetry.proto.collector.logs.v1.LogsService.Export",
"opentelemetry.proto.collector.logs.v1.LogsService.Export")
.getMethodDescriptorForVersion(transport_api_version),
transport_api_version) {
initMessageRoot(log_name, local_info);
}

namespace {

opentelemetry::proto::common::v1::KeyValue getStringKeyValue(const std::string& key,
const std::string& value) {
opentelemetry::proto::common::v1::KeyValue keyValue;
keyValue.set_key(key);
keyValue.mutable_value()->set_string_value(value);
return keyValue;
}

} // namespace

// See comment about the structure of repeated fields in the header file.
// TODO(itamarkam): allow user configurable attributes.
void GrpcAccessLoggerImpl::initMessageRoot(const std::string& log_name,
const LocalInfo::LocalInfo& local_info) {
auto* resource_logs = message_.add_resource_logs();
root_ = resource_logs->add_instrumentation_library_logs();
auto* resource = resource_logs->mutable_resource();
*resource->add_attributes() = getStringKeyValue("log_name", log_name);
*resource->add_attributes() = getStringKeyValue("zone_name", local_info.zoneName());
*resource->add_attributes() = getStringKeyValue("cluster_name", local_info.clusterName());
*resource->add_attributes() = getStringKeyValue("node_name", local_info.nodeName());
}

void GrpcAccessLoggerImpl::addEntry(opentelemetry::proto::logs::v1::LogRecord&& entry) {
root_->mutable_logs()->Add(std::move(entry));
}

bool GrpcAccessLoggerImpl::isEmpty() { return root_->logs().empty(); }

// The message is already initialized in the c'tor, and only the logs are cleared.
void GrpcAccessLoggerImpl::initMessage() {}

void GrpcAccessLoggerImpl::clearMessage() { root_->clear_logs(); }

GrpcAccessLoggerCacheImpl::GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& async_client_manager,
Stats::Scope& scope,
ThreadLocal::SlotAllocator& tls,
const LocalInfo::LocalInfo& local_info)
: GrpcAccessLoggerCache(async_client_manager, scope, tls), local_info_(local_info) {}

GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger(
const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
Grpc::RawAsyncClientPtr&& client, std::chrono::milliseconds buffer_flush_interval_msec,
uint64_t max_buffer_size_bytes, Event::Dispatcher& dispatcher, Stats::Scope& scope) {
return std::make_shared<GrpcAccessLoggerImpl>(
std::move(client), config.log_name(), buffer_flush_interval_msec, max_buffer_size_bytes,
dispatcher, local_info_, scope, Config::Utility::getAndCheckTransportVersion(config));
}

} // namespace OpenTelemetry
} // namespace AccessLoggers
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#pragma once

#include <memory>

#include "envoy/event/dispatcher.h"
#include "envoy/extensions/access_loggers/grpc/v3/als.pb.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/local_info/local_info.h"
#include "envoy/thread_local/thread_local.h"

#include "common/protobuf/protobuf.h"

#include "extensions/access_loggers/common/grpc_access_logger.h"

#include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h"
#include "opentelemetry/proto/common/v1/common.pb.h"
#include "opentelemetry/proto/logs/v1/logs.pb.h"
#include "opentelemetry/proto/resource/v1/resource.pb.h"

namespace Envoy {
namespace Extensions {
namespace AccessLoggers {
namespace OpenTelemetry {

// Note: OpenTelemetry protos are extra flexible and used also in the OT collector for batching and
// so forth. As a result, some fields are repeated, but for our use case we assume the following
// structure:
// ExportLogsServiceRequest -> (single) ResourceLogs -> (single) InstrumentationLibraryLogs ->
// (repeated) LogRecord.
class GrpcAccessLoggerImpl
: public Common::GrpcAccessLogger<
opentelemetry::proto::logs::v1::LogRecord,
// OpenTelemetry logging uses LogRecord for both HTTP and TCP, so protobuf::Empty is used
// as an empty placeholder for the non-used addEntry method.
// TODO(itamarkam): Don't cache OpenTelemetry loggers by type (HTTP/TCP).
ProtobufWkt::Empty, opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest,
opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse> {
public:
GrpcAccessLoggerImpl(Grpc::RawAsyncClientPtr&& client, std::string log_name,
std::chrono::milliseconds buffer_flush_interval_msec,
uint64_t max_buffer_size_bytes, Event::Dispatcher& dispatcher,
const LocalInfo::LocalInfo& local_info, Stats::Scope& scope,
envoy::config::core::v3::ApiVersion transport_api_version);

private:
void initMessageRoot(const std::string& log_name, const LocalInfo::LocalInfo& local_info);
// Extensions::AccessLoggers::GrpcCommon::GrpcAccessLogger
void addEntry(opentelemetry::proto::logs::v1::LogRecord&& entry) override;
// Non used addEntry method (the above is used for both TCP and HTTP).
void addEntry(ProtobufWkt::Empty&& entry) override { (void)entry; };
bool isEmpty() override;
void initMessage() override;
void clearMessage() override;

opentelemetry::proto::logs::v1::InstrumentationLibraryLogs* root_;
};

class GrpcAccessLoggerCacheImpl
: public Common::GrpcAccessLoggerCache<
GrpcAccessLoggerImpl,
envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig> {
public:
GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& async_client_manager, Stats::Scope& scope,
ThreadLocal::SlotAllocator& tls,
const LocalInfo::LocalInfo& local_info);

private:
// Common::GrpcAccessLoggerCache
GrpcAccessLoggerImpl::SharedPtr
createLogger(const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
Grpc::RawAsyncClientPtr&& client,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, Stats::Scope& scope) override;

const LocalInfo::LocalInfo& local_info_;
};

/**
* Aliases for class interfaces for mock definitions.
*/
using GrpcAccessLogger = GrpcAccessLoggerImpl::Interface;
using GrpcAccessLoggerSharedPtr = GrpcAccessLogger::SharedPtr;

using GrpcAccessLoggerCache = GrpcAccessLoggerCacheImpl::Interface;
using GrpcAccessLoggerCacheSharedPtr = GrpcAccessLoggerCache::SharedPtr;

} // namespace OpenTelemetry
} // namespace AccessLoggers
} // namespace Extensions
} // namespace Envoy
Loading

0 comments on commit 684afd1

Please sign in to comment.