From 89a94ba6482f158d9b44e1ed9c9893e402e70e0f Mon Sep 17 00:00:00 2001 From: Attila Szakacs Date: Sun, 10 Nov 2024 11:28:29 +0100 Subject: [PATCH 01/10] grpc/protos: rename BIGQUERY_PROTO to GOOGLEAPIS_PROTO Signed-off-by: Attila Szakacs --- modules/grpc/protos/CMakeLists.txt | 32 +++++++++++++++--------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/modules/grpc/protos/CMakeLists.txt b/modules/grpc/protos/CMakeLists.txt index 4a5319bc9..5331a848a 100644 --- a/modules/grpc/protos/CMakeLists.txt +++ b/modules/grpc/protos/CMakeLists.txt @@ -6,9 +6,9 @@ set(LOKI_PROTO_SRCDIR "${CMAKE_CURRENT_SOURCE_DIR}/grafana-loki") set(LOKI_PROTO_BUILDDIR "${CMAKE_CURRENT_BINARY_DIR}/grafana-loki") set(LOKI_PROTO_BUILDDIR ${LOKI_PROTO_BUILDDIR} PARENT_SCOPE) -set(BIGQUERY_PROTO_SRCDIR "${CMAKE_CURRENT_SOURCE_DIR}/googleapis-proto") -set(BIGQUERY_PROTO_BUILDDIR "${CMAKE_CURRENT_BINARY_DIR}/googleapis-proto") -set(BIGQUERY_PROTO_BUILDDIR ${BIGQUERY_PROTO_BUILDDIR} PARENT_SCOPE) +set(GOOGLEAPIS_PROTO_SRCDIR "${CMAKE_CURRENT_SOURCE_DIR}/googleapis-proto") +set(GOOGLEAPIS_PROTO_BUILDDIR "${CMAKE_CURRENT_BINARY_DIR}/googleapis-proto") +set(GOOGLEAPIS_PROTO_BUILDDIR ${GOOGLEAPIS_PROTO_BUILDDIR} PARENT_SCOPE) set(CLICKHOUSE_PROTO_SRCDIR "${CMAKE_CURRENT_SOURCE_DIR}/clickhouse-proto") set(CLICKHOUSE_PROTO_BUILDDIR "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-proto") @@ -26,7 +26,7 @@ set(OTEL_PROTO_GRPC_SOURCES opentelemetry/proto/collector/metrics/v1/metrics_service.proto opentelemetry/proto/collector/trace/v1/trace_service.proto) -set(BIGQUERY_PROTO_SOURCES +set(GOOGLEAPIS_PROTO_SOURCES google/api/annotations.proto google/api/client.proto google/api/field_behavior.proto @@ -42,7 +42,7 @@ set(BIGQUERY_PROTO_SOURCES google/rpc/status.proto ) -set(BIGQUERY_PROTO_GRPC_SOURCES +set(GOOGLEAPIS_PROTO_GRPC_SOURCES google/cloud/bigquery/storage/v1/storage.proto ) @@ -72,16 +72,16 @@ protobuf_generate_cpp_grpc( PROTOS ${LOKI_PROTO_GRPC_SOURCES}) protobuf_generate_cpp( - PROTO_PATH ${BIGQUERY_PROTO_SRCDIR} - CPP_OUT ${BIGQUERY_PROTO_BUILDDIR} - OUT_SRCS BIGQUERY_PROTO_GENERATED_SOURCES - PROTOS ${BIGQUERY_PROTO_SOURCES}) + PROTO_PATH ${GOOGLEAPIS_PROTO_SRCDIR} + CPP_OUT ${GOOGLEAPIS_PROTO_BUILDDIR} + OUT_SRCS GOOGLEAPIS_PROTO_GENERATED_SOURCES + PROTOS ${GOOGLEAPIS_PROTO_SOURCES}) protobuf_generate_cpp_grpc( - PROTO_PATH ${BIGQUERY_PROTO_SRCDIR} - CPP_OUT ${BIGQUERY_PROTO_BUILDDIR} - OUT_SRCS BIGQUERY_PROTO_GENERATED_GRPC_SOURCES - PROTOS ${BIGQUERY_PROTO_GRPC_SOURCES}) + PROTO_PATH ${GOOGLEAPIS_PROTO_SRCDIR} + CPP_OUT ${GOOGLEAPIS_PROTO_BUILDDIR} + OUT_SRCS GOOGLEAPIS_PROTO_GENERATED_GRPC_SOURCES + PROTOS ${GOOGLEAPIS_PROTO_GRPC_SOURCES}) protobuf_generate_cpp( PROTO_PATH ${CLICKHOUSE_PROTO_SRCDIR} @@ -99,8 +99,8 @@ add_library(grpc-protos SHARED ${OTEL_PROTO_GENERATED_SOURCES} ${OTEL_PROTO_GENERATED_GRPC_SOURCES} ${LOKI_PROTO_GENERATED_GRPC_SOURCES} - ${BIGQUERY_PROTO_GENERATED_SOURCES} - ${BIGQUERY_PROTO_GENERATED_GRPC_SOURCES} + ${GOOGLEAPIS_PROTO_GENERATED_SOURCES} + ${GOOGLEAPIS_PROTO_GENERATED_GRPC_SOURCES} ${CLICKHOUSE_PROTO_GENERATED_SOURCES} ${CLICKHOUSE_PROTO_GENERATED_GRPC_SOURCES} apphook.cpp @@ -108,7 +108,7 @@ add_library(grpc-protos SHARED target_link_libraries(grpc-protos PRIVATE ${MODULE_GRPC_LIBS} syslog-ng) target_include_directories(grpc-protos - PUBLIC ${OTEL_PROTO_BUILDDIR} ${LOKI_PROTO_BUILDDIR} ${BIGQUERY_PROTO_BUILDDIR} ${CLICKHOUSE_PROTO_BUILDDIR} + PUBLIC ${OTEL_PROTO_BUILDDIR} ${LOKI_PROTO_BUILDDIR} ${GOOGLEAPIS_PROTO_BUILDDIR} ${CLICKHOUSE_PROTO_BUILDDIR} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ${PROJECT_SOURCE_DIR}/modules/grpc PRIVATE ${CMAKE_CURRENT_BINARY_DIR} ${PROJECT_SOURCE_DIR} ) From 264527e17d5243fe97f41ec7d4c94e23cdcb022f Mon Sep 17 00:00:00 2001 From: Attila Szakacs Date: Sun, 10 Nov 2024 11:28:51 +0100 Subject: [PATCH 02/10] grpc/protos: add google pubsub proto Copied from: https://github.com/googleapis/googleapis/blob/c72f219fedbb57d3f83c10550e135c4824b670eb/google/pubsub/v1/ Signed-off-by: Attila Szakacs --- modules/grpc/protos/CMakeLists.txt | 2 + modules/grpc/protos/Makefile.am | 8 + .../google/pubsub/v1/pubsub.proto | 1978 +++++++++++++++++ .../google/pubsub/v1/schema.proto | 410 ++++ 4 files changed, 2398 insertions(+) create mode 100644 modules/grpc/protos/googleapis-proto/google/pubsub/v1/pubsub.proto create mode 100644 modules/grpc/protos/googleapis-proto/google/pubsub/v1/schema.proto diff --git a/modules/grpc/protos/CMakeLists.txt b/modules/grpc/protos/CMakeLists.txt index 5331a848a..e08fd48ab 100644 --- a/modules/grpc/protos/CMakeLists.txt +++ b/modules/grpc/protos/CMakeLists.txt @@ -44,6 +44,8 @@ set(GOOGLEAPIS_PROTO_SOURCES set(GOOGLEAPIS_PROTO_GRPC_SOURCES google/cloud/bigquery/storage/v1/storage.proto + google/pubsub/v1/pubsub.proto + google/pubsub/v1/schema.proto ) set(LOKI_PROTO_GRPC_SOURCES diff --git a/modules/grpc/protos/Makefile.am b/modules/grpc/protos/Makefile.am index 3646f58f9..220368af8 100644 --- a/modules/grpc/protos/Makefile.am +++ b/modules/grpc/protos/Makefile.am @@ -115,6 +115,14 @@ GRPC_PROTOS_BUILT_SOURCES = \ $(GOOGLEAPIS_PROTO_BUILDDIR)/google/cloud/bigquery/storage/v1/stream.pb.h \ $(GOOGLEAPIS_PROTO_BUILDDIR)/google/cloud/bigquery/storage/v1/table.pb.cc \ $(GOOGLEAPIS_PROTO_BUILDDIR)/google/cloud/bigquery/storage/v1/table.pb.h \ + $(GOOGLEAPIS_PROTO_BUILDDIR)/google/pubsub/v1/pubsub.grpc.pb.cc \ + $(GOOGLEAPIS_PROTO_BUILDDIR)/google/pubsub/v1/pubsub.grpc.pb.h \ + $(GOOGLEAPIS_PROTO_BUILDDIR)/google/pubsub/v1/pubsub.pb.cc \ + $(GOOGLEAPIS_PROTO_BUILDDIR)/google/pubsub/v1/pubsub.pb.h \ + $(GOOGLEAPIS_PROTO_BUILDDIR)/google/pubsub/v1/schema.grpc.pb.cc \ + $(GOOGLEAPIS_PROTO_BUILDDIR)/google/pubsub/v1/schema.grpc.pb.h \ + $(GOOGLEAPIS_PROTO_BUILDDIR)/google/pubsub/v1/schema.pb.cc \ + $(GOOGLEAPIS_PROTO_BUILDDIR)/google/pubsub/v1/schema.pb.h \ $(GOOGLEAPIS_PROTO_BUILDDIR)/google/rpc/status.pb.cc \ $(GOOGLEAPIS_PROTO_BUILDDIR)/google/rpc/status.pb.h \ \ diff --git a/modules/grpc/protos/googleapis-proto/google/pubsub/v1/pubsub.proto b/modules/grpc/protos/googleapis-proto/google/pubsub/v1/pubsub.proto new file mode 100644 index 000000000..0f269f525 --- /dev/null +++ b/modules/grpc/protos/googleapis-proto/google/pubsub/v1/pubsub.proto @@ -0,0 +1,1978 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package google.pubsub.v1; + +import "google/api/annotations.proto"; +import "google/api/client.proto"; +import "google/api/field_behavior.proto"; +import "google/api/resource.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/empty.proto"; +import "google/protobuf/field_mask.proto"; +import "google/protobuf/timestamp.proto"; +import "google/pubsub/v1/schema.proto"; + +option cc_enable_arenas = true; +option csharp_namespace = "Google.Cloud.PubSub.V1"; +option go_package = "cloud.google.com/go/pubsub/apiv1/pubsubpb;pubsubpb"; +option java_multiple_files = true; +option java_outer_classname = "PubsubProto"; +option java_package = "com.google.pubsub.v1"; +option php_namespace = "Google\\Cloud\\PubSub\\V1"; +option ruby_package = "Google::Cloud::PubSub::V1"; + +// The service that an application uses to manipulate topics, and to send +// messages to a topic. +service Publisher { + option (google.api.default_host) = "pubsub.googleapis.com"; + option (google.api.oauth_scopes) = + "https://www.googleapis.com/auth/cloud-platform," + "https://www.googleapis.com/auth/pubsub"; + + // Creates the given topic with the given name. See the [resource name rules] + // (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names). + rpc CreateTopic(Topic) returns (Topic) { + option (google.api.http) = { + put: "/v1/{name=projects/*/topics/*}" + body: "*" + }; + option (google.api.method_signature) = "name"; + } + + // Updates an existing topic by updating the fields specified in the update + // mask. Note that certain properties of a topic are not modifiable. + rpc UpdateTopic(UpdateTopicRequest) returns (Topic) { + option (google.api.http) = { + patch: "/v1/{topic.name=projects/*/topics/*}" + body: "*" + }; + option (google.api.method_signature) = "topic,update_mask"; + } + + // Adds one or more messages to the topic. Returns `NOT_FOUND` if the topic + // does not exist. + rpc Publish(PublishRequest) returns (PublishResponse) { + option (google.api.http) = { + post: "/v1/{topic=projects/*/topics/*}:publish" + body: "*" + }; + option (google.api.method_signature) = "topic,messages"; + } + + // Gets the configuration of a topic. + rpc GetTopic(GetTopicRequest) returns (Topic) { + option (google.api.http) = { + get: "/v1/{topic=projects/*/topics/*}" + }; + option (google.api.method_signature) = "topic"; + } + + // Lists matching topics. + rpc ListTopics(ListTopicsRequest) returns (ListTopicsResponse) { + option (google.api.http) = { + get: "/v1/{project=projects/*}/topics" + }; + option (google.api.method_signature) = "project"; + } + + // Lists the names of the attached subscriptions on this topic. + rpc ListTopicSubscriptions(ListTopicSubscriptionsRequest) + returns (ListTopicSubscriptionsResponse) { + option (google.api.http) = { + get: "/v1/{topic=projects/*/topics/*}/subscriptions" + }; + option (google.api.method_signature) = "topic"; + } + + // Lists the names of the snapshots on this topic. Snapshots are used in + // [Seek](https://cloud.google.com/pubsub/docs/replay-overview) operations, + // which allow you to manage message acknowledgments in bulk. That is, you can + // set the acknowledgment state of messages in an existing subscription to the + // state captured by a snapshot. + rpc ListTopicSnapshots(ListTopicSnapshotsRequest) + returns (ListTopicSnapshotsResponse) { + option (google.api.http) = { + get: "/v1/{topic=projects/*/topics/*}/snapshots" + }; + option (google.api.method_signature) = "topic"; + } + + // Deletes the topic with the given name. Returns `NOT_FOUND` if the topic + // does not exist. After a topic is deleted, a new topic may be created with + // the same name; this is an entirely new topic with none of the old + // configuration or subscriptions. Existing subscriptions to this topic are + // not deleted, but their `topic` field is set to `_deleted-topic_`. + rpc DeleteTopic(DeleteTopicRequest) returns (google.protobuf.Empty) { + option (google.api.http) = { + delete: "/v1/{topic=projects/*/topics/*}" + }; + option (google.api.method_signature) = "topic"; + } + + // Detaches a subscription from this topic. All messages retained in the + // subscription are dropped. Subsequent `Pull` and `StreamingPull` requests + // will return FAILED_PRECONDITION. If the subscription is a push + // subscription, pushes to the endpoint will stop. + rpc DetachSubscription(DetachSubscriptionRequest) + returns (DetachSubscriptionResponse) { + option (google.api.http) = { + post: "/v1/{subscription=projects/*/subscriptions/*}:detach" + }; + } +} + +// A policy constraining the storage of messages published to the topic. +message MessageStoragePolicy { + // Optional. A list of IDs of Google Cloud regions where messages that are + // published to the topic may be persisted in storage. Messages published by + // publishers running in non-allowed Google Cloud regions (or running outside + // of Google Cloud altogether) are routed for storage in one of the allowed + // regions. An empty list means that no regions are allowed, and is not a + // valid configuration. + repeated string allowed_persistence_regions = 1 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. If true, `allowed_persistence_regions` is also used to enforce + // in-transit guarantees for messages. That is, Pub/Sub will fail + // Publish operations on this topic and subscribe operations + // on any subscription attached to this topic in any region that is + // not in `allowed_persistence_regions`. + bool enforce_in_transit = 2 [(google.api.field_behavior) = OPTIONAL]; +} + +// Settings for validating messages published against a schema. +message SchemaSettings { + // Required. The name of the schema that messages published should be + // validated against. Format is `projects/{project}/schemas/{schema}`. The + // value of this field will be `_deleted-schema_` if the schema has been + // deleted. + string schema = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Schema" } + ]; + + // Optional. The encoding of messages validated against `schema`. + Encoding encoding = 2 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The minimum (inclusive) revision allowed for validating messages. + // If empty or not present, allow any revision to be validated against + // last_revision or any revision created before. + string first_revision_id = 3 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The maximum (inclusive) revision allowed for validating messages. + // If empty or not present, allow any revision to be validated against + // first_revision or any revision created after. + string last_revision_id = 4 [(google.api.field_behavior) = OPTIONAL]; +} + +// Settings for an ingestion data source on a topic. +message IngestionDataSourceSettings { + // Ingestion settings for Amazon Kinesis Data Streams. + message AwsKinesis { + // Possible states for ingestion from Amazon Kinesis Data Streams. + enum State { + // Default value. This value is unused. + STATE_UNSPECIFIED = 0; + + // Ingestion is active. + ACTIVE = 1; + + // Permission denied encountered while consuming data from Kinesis. + // This can happen if: + // - The provided `aws_role_arn` does not exist or does not have the + // appropriate permissions attached. + // - The provided `aws_role_arn` is not set up properly for Identity + // Federation using `gcp_service_account`. + // - The Pub/Sub SA is not granted the + // `iam.serviceAccounts.getOpenIdToken` permission on + // `gcp_service_account`. + KINESIS_PERMISSION_DENIED = 2; + + // Permission denied encountered while publishing to the topic. This can + // happen if the Pub/Sub SA has not been granted the [appropriate publish + // permissions](https://cloud.google.com/pubsub/docs/access-control#pubsub.publisher) + PUBLISH_PERMISSION_DENIED = 3; + + // The Kinesis stream does not exist. + STREAM_NOT_FOUND = 4; + + // The Kinesis consumer does not exist. + CONSUMER_NOT_FOUND = 5; + } + + // Output only. An output-only field that indicates the state of the Kinesis + // ingestion source. + State state = 1 [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Required. The Kinesis stream ARN to ingest data from. + string stream_arn = 2 [(google.api.field_behavior) = REQUIRED]; + + // Required. The Kinesis consumer ARN to used for ingestion in Enhanced + // Fan-Out mode. The consumer must be already created and ready to be used. + string consumer_arn = 3 [(google.api.field_behavior) = REQUIRED]; + + // Required. AWS role ARN to be used for Federated Identity authentication + // with Kinesis. Check the Pub/Sub docs for how to set up this role and the + // required permissions that need to be attached to it. + string aws_role_arn = 4 [(google.api.field_behavior) = REQUIRED]; + + // Required. The GCP service account to be used for Federated Identity + // authentication with Kinesis (via a `AssumeRoleWithWebIdentity` call for + // the provided role). The `aws_role_arn` must be set up with + // `accounts.google.com:sub` equals to this service account number. + string gcp_service_account = 5 [(google.api.field_behavior) = REQUIRED]; + } + + // Ingestion settings for Cloud Storage. + message CloudStorage { + // Possible states for ingestion from Cloud Storage. + enum State { + // Default value. This value is unused. + STATE_UNSPECIFIED = 0; + + // Ingestion is active. + ACTIVE = 1; + + // Permission denied encountered while calling the Cloud Storage API. This + // can happen if the Pub/Sub SA has not been granted the + // [appropriate + // permissions](https://cloud.google.com/storage/docs/access-control/iam-permissions): + // - storage.objects.list: to list the objects in a bucket. + // - storage.objects.get: to read the objects in a bucket. + // - storage.buckets.get: to verify the bucket exists. + CLOUD_STORAGE_PERMISSION_DENIED = 2; + + // Permission denied encountered while publishing to the topic. This can + // happen if the Pub/Sub SA has not been granted the [appropriate publish + // permissions](https://cloud.google.com/pubsub/docs/access-control#pubsub.publisher) + PUBLISH_PERMISSION_DENIED = 3; + + // The provided Cloud Storage bucket doesn't exist. + BUCKET_NOT_FOUND = 4; + + // The Cloud Storage bucket has too many objects, ingestion will be + // paused. + TOO_MANY_OBJECTS = 5; + } + + // Configuration for reading Cloud Storage data in text format. Each line of + // text as specified by the delimiter will be set to the `data` field of a + // Pub/Sub message. + message TextFormat { + // Optional. When unset, '\n' is used. + optional string delimiter = 1 [(google.api.field_behavior) = OPTIONAL]; + } + + // Configuration for reading Cloud Storage data in Avro binary format. The + // bytes of each object will be set to the `data` field of a Pub/Sub + // message. + message AvroFormat {} + + // Configuration for reading Cloud Storage data written via [Cloud Storage + // subscriptions](https://cloud.google.com/pubsub/docs/cloudstorage). The + // data and attributes fields of the originally exported Pub/Sub message + // will be restored when publishing. + message PubSubAvroFormat {} + + // Output only. An output-only field that indicates the state of the Cloud + // Storage ingestion source. + State state = 1 [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Optional. Cloud Storage bucket. The bucket name must be without any + // prefix like "gs://". See the [bucket naming requirements] + // (https://cloud.google.com/storage/docs/buckets#naming). + string bucket = 2 [(google.api.field_behavior) = OPTIONAL]; + + // Defaults to text format. + oneof input_format { + // Optional. Data from Cloud Storage will be interpreted as text. + TextFormat text_format = 3 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Data from Cloud Storage will be interpreted in Avro format. + AvroFormat avro_format = 4 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. It will be assumed data from Cloud Storage was written via + // [Cloud Storage + // subscriptions](https://cloud.google.com/pubsub/docs/cloudstorage). + PubSubAvroFormat pubsub_avro_format = 5 + [(google.api.field_behavior) = OPTIONAL]; + } + + // Optional. Only objects with a larger or equal creation timestamp will be + // ingested. + google.protobuf.Timestamp minimum_object_create_time = 6 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Glob pattern used to match objects that will be ingested. If + // unset, all objects will be ingested. See the [supported + // patterns](https://cloud.google.com/storage/docs/json_api/v1/objects/list#list-objects-and-prefixes-using-glob). + string match_glob = 9 [(google.api.field_behavior) = OPTIONAL]; + } + + // Only one source type can have settings set. + oneof source { + // Optional. Amazon Kinesis Data Streams. + AwsKinesis aws_kinesis = 1 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Cloud Storage. + CloudStorage cloud_storage = 2 [(google.api.field_behavior) = OPTIONAL]; + } + + // Optional. Platform Logs settings. If unset, no Platform Logs will be + // generated. + PlatformLogsSettings platform_logs_settings = 4 + [(google.api.field_behavior) = OPTIONAL]; +} + +// Settings for Platform Logs produced by Pub/Sub. +message PlatformLogsSettings { + // Severity levels of Platform Logs. + enum Severity { + // Default value. Logs level is unspecified. Logs will be disabled. + SEVERITY_UNSPECIFIED = 0; + + // Logs will be disabled. + DISABLED = 1; + + // Debug logs and higher-severity logs will be written. + DEBUG = 2; + + // Info logs and higher-severity logs will be written. + INFO = 3; + + // Warning logs and higher-severity logs will be written. + WARNING = 4; + + // Only error logs will be written. + ERROR = 5; + } + + // Optional. The minimum severity level of Platform Logs that will be written. + Severity severity = 1 [(google.api.field_behavior) = OPTIONAL]; +} + +// Payload of the Platform Log entry sent when a failure is encountered while +// ingesting. +message IngestionFailureEvent { + // Specifies the reason why some data may have been left out of + // the desired Pub/Sub message due to the API message limits + // (https://cloud.google.com/pubsub/quotas#resource_limits). For example, + // when the number of attributes is larger than 100, the number of + // attributes is truncated to 100 to respect the limit on the attribute count. + // Other attribute limits are treated similarly. When the size of the desired + // message would've been larger than 10MB, the message won't be published at + // all, and ingestion of the subsequent messages will proceed as normal. + message ApiViolationReason {} + + // Set when an Avro file is unsupported or its format is not valid. When this + // occurs, one or more Avro objects won't be ingested. + message AvroFailureReason {} + + // Failure when ingesting from a Cloud Storage source. + message CloudStorageFailure { + // Optional. Name of the Cloud Storage bucket used for ingestion. + string bucket = 1 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Name of the Cloud Storage object which contained the section + // that couldn't be ingested. + string object_name = 2 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Generation of the Cloud Storage object which contained the + // section that couldn't be ingested. + int64 object_generation = 3 [(google.api.field_behavior) = OPTIONAL]; + + // Reason why ingestion failed for the specified object. + oneof reason { + // Optional. Failure encountered when parsing an Avro file. + AvroFailureReason avro_failure_reason = 5 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The Pub/Sub API limits prevented the desired message from + // being published. + ApiViolationReason api_violation_reason = 6 + [(google.api.field_behavior) = OPTIONAL]; + } + } + + // Required. Name of the import topic. Format is: + // projects/{project_name}/topics/{topic_name}. + string topic = 1 [(google.api.field_behavior) = REQUIRED]; + + // Required. Error details explaining why ingestion to Pub/Sub has failed. + string error_message = 2 [(google.api.field_behavior) = REQUIRED]; + + oneof failure { + // Optional. Failure when ingesting from Cloud Storage. + CloudStorageFailure cloud_storage_failure = 3 + [(google.api.field_behavior) = OPTIONAL]; + } +} + +// A topic resource. +message Topic { + option (google.api.resource) = { + type: "pubsub.googleapis.com/Topic" + pattern: "projects/{project}/topics/{topic}" + pattern: "_deleted-topic_" + }; + + // The state of the topic. + enum State { + // Default value. This value is unused. + STATE_UNSPECIFIED = 0; + + // The topic does not have any persistent errors. + ACTIVE = 1; + + // Ingestion from the data source has encountered a permanent error. + // See the more detailed error state in the corresponding ingestion + // source configuration. + INGESTION_RESOURCE_ERROR = 2; + } + + // Required. The name of the topic. It must have the format + // `"projects/{project}/topics/{topic}"`. `{topic}` must start with a letter, + // and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`), + // underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent + // signs (`%`). It must be between 3 and 255 characters in length, and it + // must not start with `"goog"`. + string name = 1 [(google.api.field_behavior) = REQUIRED]; + + // Optional. See [Creating and managing labels] + // (https://cloud.google.com/pubsub/docs/labels). + map labels = 2 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Policy constraining the set of Google Cloud Platform regions + // where messages published to the topic may be stored. If not present, then + // no constraints are in effect. + MessageStoragePolicy message_storage_policy = 3 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The resource name of the Cloud KMS CryptoKey to be used to + // protect access to messages published on this topic. + // + // The expected format is `projects/*/locations/*/keyRings/*/cryptoKeys/*`. + string kms_key_name = 5 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Settings for validating messages published against a schema. + SchemaSettings schema_settings = 6 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Reserved for future use. This field is set only in responses from + // the server; it is ignored if it is set in any requests. + bool satisfies_pzs = 7 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Indicates the minimum duration to retain a message after it is + // published to the topic. If this field is set, messages published to the + // topic in the last `message_retention_duration` are always available to + // subscribers. For instance, it allows any attached subscription to [seek to + // a + // timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time) + // that is up to `message_retention_duration` in the past. If this field is + // not set, message retention is controlled by settings on individual + // subscriptions. Cannot be more than 31 days or less than 10 minutes. + google.protobuf.Duration message_retention_duration = 8 + [(google.api.field_behavior) = OPTIONAL]; + + // Output only. An output-only field indicating the state of the topic. + State state = 9 [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Optional. Settings for ingestion from a data source into this topic. + IngestionDataSourceSettings ingestion_data_source_settings = 10 + [(google.api.field_behavior) = OPTIONAL]; +} + +// A message that is published by publishers and consumed by subscribers. The +// message must contain either a non-empty data field or at least one attribute. +// Note that client libraries represent this object differently +// depending on the language. See the corresponding [client library +// documentation](https://cloud.google.com/pubsub/docs/reference/libraries) for +// more information. See [quotas and limits] +// (https://cloud.google.com/pubsub/quotas) for more information about message +// limits. +message PubsubMessage { + // Optional. The message data field. If this field is empty, the message must + // contain at least one attribute. + bytes data = 1 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Attributes for this message. If this field is empty, the message + // must contain non-empty data. This can be used to filter messages on the + // subscription. + map attributes = 2 [(google.api.field_behavior) = OPTIONAL]; + + // ID of this message, assigned by the server when the message is published. + // Guaranteed to be unique within the topic. This value may be read by a + // subscriber that receives a `PubsubMessage` via a `Pull` call or a push + // delivery. It must not be populated by the publisher in a `Publish` call. + string message_id = 3; + + // The time at which the message was published, populated by the server when + // it receives the `Publish` call. It must not be populated by the + // publisher in a `Publish` call. + google.protobuf.Timestamp publish_time = 4; + + // Optional. If non-empty, identifies related messages for which publish order + // should be respected. If a `Subscription` has `enable_message_ordering` set + // to `true`, messages published with the same non-empty `ordering_key` value + // will be delivered to subscribers in the order in which they are received by + // the Pub/Sub system. All `PubsubMessage`s published in a given + // `PublishRequest` must specify the same `ordering_key` value. For more + // information, see [ordering + // messages](https://cloud.google.com/pubsub/docs/ordering). + string ordering_key = 5 [(google.api.field_behavior) = OPTIONAL]; +} + +// Request for the GetTopic method. +message GetTopicRequest { + // Required. The name of the topic to get. + // Format is `projects/{project}/topics/{topic}`. + string topic = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Topic" } + ]; +} + +// Request for the UpdateTopic method. +message UpdateTopicRequest { + // Required. The updated topic object. + Topic topic = 1 [(google.api.field_behavior) = REQUIRED]; + + // Required. Indicates which fields in the provided topic to update. Must be + // specified and non-empty. Note that if `update_mask` contains + // "message_storage_policy" but the `message_storage_policy` is not set in + // the `topic` provided above, then the updated value is determined by the + // policy configured at the project or organization level. + google.protobuf.FieldMask update_mask = 2 + [(google.api.field_behavior) = REQUIRED]; +} + +// Request for the Publish method. +message PublishRequest { + // Required. The messages in the request will be published on this topic. + // Format is `projects/{project}/topics/{topic}`. + string topic = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Topic" } + ]; + + // Required. The messages to publish. + repeated PubsubMessage messages = 2 [(google.api.field_behavior) = REQUIRED]; +} + +// Response for the `Publish` method. +message PublishResponse { + // Optional. The server-assigned ID of each published message, in the same + // order as the messages in the request. IDs are guaranteed to be unique + // within the topic. + repeated string message_ids = 1 [(google.api.field_behavior) = OPTIONAL]; +} + +// Request for the `ListTopics` method. +message ListTopicsRequest { + // Required. The name of the project in which to list topics. + // Format is `projects/{project-id}`. + string project = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "cloudresourcemanager.googleapis.com/Project" + } + ]; + + // Optional. Maximum number of topics to return. + int32 page_size = 2 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The value returned by the last `ListTopicsResponse`; indicates + // that this is a continuation of a prior `ListTopics` call, and that the + // system should return the next page of data. + string page_token = 3 [(google.api.field_behavior) = OPTIONAL]; +} + +// Response for the `ListTopics` method. +message ListTopicsResponse { + // Optional. The resulting topics. + repeated Topic topics = 1 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. If not empty, indicates that there may be more topics that match + // the request; this value should be passed in a new `ListTopicsRequest`. + string next_page_token = 2 [(google.api.field_behavior) = OPTIONAL]; +} + +// Request for the `ListTopicSubscriptions` method. +message ListTopicSubscriptionsRequest { + // Required. The name of the topic that subscriptions are attached to. + // Format is `projects/{project}/topics/{topic}`. + string topic = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Topic" } + ]; + + // Optional. Maximum number of subscription names to return. + int32 page_size = 2 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The value returned by the last `ListTopicSubscriptionsResponse`; + // indicates that this is a continuation of a prior `ListTopicSubscriptions` + // call, and that the system should return the next page of data. + string page_token = 3 [(google.api.field_behavior) = OPTIONAL]; +} + +// Response for the `ListTopicSubscriptions` method. +message ListTopicSubscriptionsResponse { + // Optional. The names of subscriptions attached to the topic specified in the + // request. + repeated string subscriptions = 1 [ + (google.api.field_behavior) = OPTIONAL, + (google.api.resource_reference) = { + type: "pubsub.googleapis.com/Subscription" + } + ]; + + // Optional. If not empty, indicates that there may be more subscriptions that + // match the request; this value should be passed in a new + // `ListTopicSubscriptionsRequest` to get more subscriptions. + string next_page_token = 2 [(google.api.field_behavior) = OPTIONAL]; +} + +// Request for the `ListTopicSnapshots` method. +message ListTopicSnapshotsRequest { + // Required. The name of the topic that snapshots are attached to. + // Format is `projects/{project}/topics/{topic}`. + string topic = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Topic" } + ]; + + // Optional. Maximum number of snapshot names to return. + int32 page_size = 2 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The value returned by the last `ListTopicSnapshotsResponse`; + // indicates that this is a continuation of a prior `ListTopicSnapshots` call, + // and that the system should return the next page of data. + string page_token = 3 [(google.api.field_behavior) = OPTIONAL]; +} + +// Response for the `ListTopicSnapshots` method. +message ListTopicSnapshotsResponse { + // Optional. The names of the snapshots that match the request. + repeated string snapshots = 1 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. If not empty, indicates that there may be more snapshots that + // match the request; this value should be passed in a new + // `ListTopicSnapshotsRequest` to get more snapshots. + string next_page_token = 2 [(google.api.field_behavior) = OPTIONAL]; +} + +// Request for the `DeleteTopic` method. +message DeleteTopicRequest { + // Required. Name of the topic to delete. + // Format is `projects/{project}/topics/{topic}`. + string topic = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Topic" } + ]; +} + +// Request for the DetachSubscription method. +message DetachSubscriptionRequest { + // Required. The subscription to detach. + // Format is `projects/{project}/subscriptions/{subscription}`. + string subscription = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "pubsub.googleapis.com/Subscription" + } + ]; +} + +// Response for the DetachSubscription method. +// Reserved for future use. +message DetachSubscriptionResponse {} + +// The service that an application uses to manipulate subscriptions and to +// consume messages from a subscription via the `Pull` method or by +// establishing a bi-directional stream using the `StreamingPull` method. +service Subscriber { + option (google.api.default_host) = "pubsub.googleapis.com"; + option (google.api.oauth_scopes) = + "https://www.googleapis.com/auth/cloud-platform," + "https://www.googleapis.com/auth/pubsub"; + + // Creates a subscription to a given topic. See the [resource name rules] + // (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names). + // If the subscription already exists, returns `ALREADY_EXISTS`. + // If the corresponding topic doesn't exist, returns `NOT_FOUND`. + // + // If the name is not provided in the request, the server will assign a random + // name for this subscription on the same project as the topic, conforming + // to the [resource name format] + // (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names). The + // generated name is populated in the returned Subscription object. Note that + // for REST API requests, you must specify a name in the request. + rpc CreateSubscription(Subscription) returns (Subscription) { + option (google.api.http) = { + put: "/v1/{name=projects/*/subscriptions/*}" + body: "*" + }; + option (google.api.method_signature) = + "name,topic,push_config,ack_deadline_seconds"; + } + + // Gets the configuration details of a subscription. + rpc GetSubscription(GetSubscriptionRequest) returns (Subscription) { + option (google.api.http) = { + get: "/v1/{subscription=projects/*/subscriptions/*}" + }; + option (google.api.method_signature) = "subscription"; + } + + // Updates an existing subscription by updating the fields specified in the + // update mask. Note that certain properties of a subscription, such as its + // topic, are not modifiable. + rpc UpdateSubscription(UpdateSubscriptionRequest) returns (Subscription) { + option (google.api.http) = { + patch: "/v1/{subscription.name=projects/*/subscriptions/*}" + body: "*" + }; + option (google.api.method_signature) = "subscription,update_mask"; + } + + // Lists matching subscriptions. + rpc ListSubscriptions(ListSubscriptionsRequest) + returns (ListSubscriptionsResponse) { + option (google.api.http) = { + get: "/v1/{project=projects/*}/subscriptions" + }; + option (google.api.method_signature) = "project"; + } + + // Deletes an existing subscription. All messages retained in the subscription + // are immediately dropped. Calls to `Pull` after deletion will return + // `NOT_FOUND`. After a subscription is deleted, a new one may be created with + // the same name, but the new one has no association with the old + // subscription or its topic unless the same topic is specified. + rpc DeleteSubscription(DeleteSubscriptionRequest) + returns (google.protobuf.Empty) { + option (google.api.http) = { + delete: "/v1/{subscription=projects/*/subscriptions/*}" + }; + option (google.api.method_signature) = "subscription"; + } + + // Modifies the ack deadline for a specific message. This method is useful + // to indicate that more time is needed to process a message by the + // subscriber, or to make the message available for redelivery if the + // processing was interrupted. Note that this does not modify the + // subscription-level `ackDeadlineSeconds` used for subsequent messages. + rpc ModifyAckDeadline(ModifyAckDeadlineRequest) + returns (google.protobuf.Empty) { + option (google.api.http) = { + post: "/v1/{subscription=projects/*/subscriptions/*}:modifyAckDeadline" + body: "*" + }; + option (google.api.method_signature) = + "subscription,ack_ids,ack_deadline_seconds"; + } + + // Acknowledges the messages associated with the `ack_ids` in the + // `AcknowledgeRequest`. The Pub/Sub system can remove the relevant messages + // from the subscription. + // + // Acknowledging a message whose ack deadline has expired may succeed, + // but such a message may be redelivered later. Acknowledging a message more + // than once will not result in an error. + rpc Acknowledge(AcknowledgeRequest) returns (google.protobuf.Empty) { + option (google.api.http) = { + post: "/v1/{subscription=projects/*/subscriptions/*}:acknowledge" + body: "*" + }; + option (google.api.method_signature) = "subscription,ack_ids"; + } + + // Pulls messages from the server. + rpc Pull(PullRequest) returns (PullResponse) { + option (google.api.http) = { + post: "/v1/{subscription=projects/*/subscriptions/*}:pull" + body: "*" + }; + option (google.api.method_signature) = + "subscription,return_immediately,max_messages"; + option (google.api.method_signature) = "subscription,max_messages"; + } + + // Establishes a stream with the server, which sends messages down to the + // client. The client streams acknowledgements and ack deadline modifications + // back to the server. The server will close the stream and return the status + // on any error. The server may close the stream with status `UNAVAILABLE` to + // reassign server-side resources, in which case, the client should + // re-establish the stream. Flow control can be achieved by configuring the + // underlying RPC channel. + rpc StreamingPull(stream StreamingPullRequest) + returns (stream StreamingPullResponse) {} + + // Modifies the `PushConfig` for a specified subscription. + // + // This may be used to change a push subscription to a pull one (signified by + // an empty `PushConfig`) or vice versa, or change the endpoint URL and other + // attributes of a push subscription. Messages will accumulate for delivery + // continuously through the call regardless of changes to the `PushConfig`. + rpc ModifyPushConfig(ModifyPushConfigRequest) + returns (google.protobuf.Empty) { + option (google.api.http) = { + post: "/v1/{subscription=projects/*/subscriptions/*}:modifyPushConfig" + body: "*" + }; + option (google.api.method_signature) = "subscription,push_config"; + } + + // Gets the configuration details of a snapshot. Snapshots are used in + // [Seek](https://cloud.google.com/pubsub/docs/replay-overview) operations, + // which allow you to manage message acknowledgments in bulk. That is, you can + // set the acknowledgment state of messages in an existing subscription to the + // state captured by a snapshot. + rpc GetSnapshot(GetSnapshotRequest) returns (Snapshot) { + option (google.api.http) = { + get: "/v1/{snapshot=projects/*/snapshots/*}" + }; + option (google.api.method_signature) = "snapshot"; + } + + // Lists the existing snapshots. Snapshots are used in [Seek]( + // https://cloud.google.com/pubsub/docs/replay-overview) operations, which + // allow you to manage message acknowledgments in bulk. That is, you can set + // the acknowledgment state of messages in an existing subscription to the + // state captured by a snapshot. + rpc ListSnapshots(ListSnapshotsRequest) returns (ListSnapshotsResponse) { + option (google.api.http) = { + get: "/v1/{project=projects/*}/snapshots" + }; + option (google.api.method_signature) = "project"; + } + + // Creates a snapshot from the requested subscription. Snapshots are used in + // [Seek](https://cloud.google.com/pubsub/docs/replay-overview) operations, + // which allow you to manage message acknowledgments in bulk. That is, you can + // set the acknowledgment state of messages in an existing subscription to the + // state captured by a snapshot. + // If the snapshot already exists, returns `ALREADY_EXISTS`. + // If the requested subscription doesn't exist, returns `NOT_FOUND`. + // If the backlog in the subscription is too old -- and the resulting snapshot + // would expire in less than 1 hour -- then `FAILED_PRECONDITION` is returned. + // See also the `Snapshot.expire_time` field. If the name is not provided in + // the request, the server will assign a random + // name for this snapshot on the same project as the subscription, conforming + // to the [resource name format] + // (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names). The + // generated name is populated in the returned Snapshot object. Note that for + // REST API requests, you must specify a name in the request. + rpc CreateSnapshot(CreateSnapshotRequest) returns (Snapshot) { + option (google.api.http) = { + put: "/v1/{name=projects/*/snapshots/*}" + body: "*" + }; + option (google.api.method_signature) = "name,subscription"; + } + + // Updates an existing snapshot by updating the fields specified in the update + // mask. Snapshots are used in + // [Seek](https://cloud.google.com/pubsub/docs/replay-overview) operations, + // which allow you to manage message acknowledgments in bulk. That is, you can + // set the acknowledgment state of messages in an existing subscription to the + // state captured by a snapshot. + rpc UpdateSnapshot(UpdateSnapshotRequest) returns (Snapshot) { + option (google.api.http) = { + patch: "/v1/{snapshot.name=projects/*/snapshots/*}" + body: "*" + }; + option (google.api.method_signature) = "snapshot,update_mask"; + } + + // Removes an existing snapshot. Snapshots are used in [Seek] + // (https://cloud.google.com/pubsub/docs/replay-overview) operations, which + // allow you to manage message acknowledgments in bulk. That is, you can set + // the acknowledgment state of messages in an existing subscription to the + // state captured by a snapshot. + // When the snapshot is deleted, all messages retained in the snapshot + // are immediately dropped. After a snapshot is deleted, a new one may be + // created with the same name, but the new one has no association with the old + // snapshot or its subscription, unless the same subscription is specified. + rpc DeleteSnapshot(DeleteSnapshotRequest) returns (google.protobuf.Empty) { + option (google.api.http) = { + delete: "/v1/{snapshot=projects/*/snapshots/*}" + }; + option (google.api.method_signature) = "snapshot"; + } + + // Seeks an existing subscription to a point in time or to a given snapshot, + // whichever is provided in the request. Snapshots are used in [Seek] + // (https://cloud.google.com/pubsub/docs/replay-overview) operations, which + // allow you to manage message acknowledgments in bulk. That is, you can set + // the acknowledgment state of messages in an existing subscription to the + // state captured by a snapshot. Note that both the subscription and the + // snapshot must be on the same topic. + rpc Seek(SeekRequest) returns (SeekResponse) { + option (google.api.http) = { + post: "/v1/{subscription=projects/*/subscriptions/*}:seek" + body: "*" + }; + } +} + +// A subscription resource. If none of `push_config`, `bigquery_config`, or +// `cloud_storage_config` is set, then the subscriber will pull and ack messages +// using API methods. At most one of these fields may be set. +message Subscription { + option (google.api.resource) = { + type: "pubsub.googleapis.com/Subscription" + pattern: "projects/{project}/subscriptions/{subscription}" + }; + + // Possible states for a subscription. + enum State { + // Default value. This value is unused. + STATE_UNSPECIFIED = 0; + + // The subscription can actively receive messages + ACTIVE = 1; + + // The subscription cannot receive messages because of an error with the + // resource to which it pushes messages. See the more detailed error state + // in the corresponding configuration. + RESOURCE_ERROR = 2; + } + + // Information about an associated Analytics Hub subscription + // (https://cloud.google.com/bigquery/docs/analytics-hub-manage-subscriptions). + message AnalyticsHubSubscriptionInfo { + // Optional. The name of the associated Analytics Hub listing resource. + // Pattern: + // "projects/{project}/locations/{location}/dataExchanges/{data_exchange}/listings/{listing}" + string listing = 1 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The name of the associated Analytics Hub subscription resource. + // Pattern: + // "projects/{project}/locations/{location}/subscriptions/{subscription}" + string subscription = 2 [(google.api.field_behavior) = OPTIONAL]; + } + + // Required. The name of the subscription. It must have the format + // `"projects/{project}/subscriptions/{subscription}"`. `{subscription}` must + // start with a letter, and contain only letters (`[A-Za-z]`), numbers + // (`[0-9]`), dashes (`-`), underscores (`_`), periods (`.`), tildes (`~`), + // plus (`+`) or percent signs (`%`). It must be between 3 and 255 characters + // in length, and it must not start with `"goog"`. + string name = 1 [(google.api.field_behavior) = REQUIRED]; + + // Required. The name of the topic from which this subscription is receiving + // messages. Format is `projects/{project}/topics/{topic}`. The value of this + // field will be `_deleted-topic_` if the topic has been deleted. + string topic = 2 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Topic" } + ]; + + // Optional. If push delivery is used with this subscription, this field is + // used to configure it. + PushConfig push_config = 4 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. If delivery to BigQuery is used with this subscription, this + // field is used to configure it. + BigQueryConfig bigquery_config = 18 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. If delivery to Google Cloud Storage is used with this + // subscription, this field is used to configure it. + CloudStorageConfig cloud_storage_config = 22 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The approximate amount of time (on a best-effort basis) Pub/Sub + // waits for the subscriber to acknowledge receipt before resending the + // message. In the interval after the message is delivered and before it is + // acknowledged, it is considered to be _outstanding_. During that time + // period, the message will not be redelivered (on a best-effort basis). + // + // For pull subscriptions, this value is used as the initial value for the ack + // deadline. To override this value for a given message, call + // `ModifyAckDeadline` with the corresponding `ack_id` if using + // non-streaming pull or send the `ack_id` in a + // `StreamingModifyAckDeadlineRequest` if using streaming pull. + // The minimum custom deadline you can specify is 10 seconds. + // The maximum custom deadline you can specify is 600 seconds (10 minutes). + // If this parameter is 0, a default value of 10 seconds is used. + // + // For push delivery, this value is also used to set the request timeout for + // the call to the push endpoint. + // + // If the subscriber never acknowledges the message, the Pub/Sub + // system will eventually redeliver the message. + int32 ack_deadline_seconds = 5 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Indicates whether to retain acknowledged messages. If true, then + // messages are not expunged from the subscription's backlog, even if they are + // acknowledged, until they fall out of the `message_retention_duration` + // window. This must be true if you would like to [`Seek` to a timestamp] + // (https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time) in + // the past to replay previously-acknowledged messages. + bool retain_acked_messages = 7 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. How long to retain unacknowledged messages in the subscription's + // backlog, from the moment a message is published. If `retain_acked_messages` + // is true, then this also configures the retention of acknowledged messages, + // and thus configures how far back in time a `Seek` can be done. Defaults to + // 7 days. Cannot be more than 31 days or less than 10 minutes. + google.protobuf.Duration message_retention_duration = 8 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. See [Creating and managing + // labels](https://cloud.google.com/pubsub/docs/labels). + map labels = 9 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. If true, messages published with the same `ordering_key` in + // `PubsubMessage` will be delivered to the subscribers in the order in which + // they are received by the Pub/Sub system. Otherwise, they may be delivered + // in any order. + bool enable_message_ordering = 10 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. A policy that specifies the conditions for this subscription's + // expiration. A subscription is considered active as long as any connected + // subscriber is successfully consuming messages from the subscription or is + // issuing operations on the subscription. If `expiration_policy` is not set, + // a *default policy* with `ttl` of 31 days will be used. The minimum allowed + // value for `expiration_policy.ttl` is 1 day. If `expiration_policy` is set, + // but `expiration_policy.ttl` is not set, the subscription never expires. + ExpirationPolicy expiration_policy = 11 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. An expression written in the Pub/Sub [filter + // language](https://cloud.google.com/pubsub/docs/filtering). If non-empty, + // then only `PubsubMessage`s whose `attributes` field matches the filter are + // delivered on this subscription. If empty, then no messages are filtered + // out. + string filter = 12 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. A policy that specifies the conditions for dead lettering + // messages in this subscription. If dead_letter_policy is not set, dead + // lettering is disabled. + // + // The Pub/Sub service account associated with this subscriptions's + // parent project (i.e., + // service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) must have + // permission to Acknowledge() messages on this subscription. + DeadLetterPolicy dead_letter_policy = 13 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. A policy that specifies how Pub/Sub retries message delivery for + // this subscription. + // + // If not set, the default retry policy is applied. This generally implies + // that messages will be retried as soon as possible for healthy subscribers. + // RetryPolicy will be triggered on NACKs or acknowledgement deadline + // exceeded events for a given message. + RetryPolicy retry_policy = 14 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Indicates whether the subscription is detached from its topic. + // Detached subscriptions don't receive messages from their topic and don't + // retain any backlog. `Pull` and `StreamingPull` requests will return + // FAILED_PRECONDITION. If the subscription is a push subscription, pushes to + // the endpoint will not be made. + bool detached = 15 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. If true, Pub/Sub provides the following guarantees for the + // delivery of a message with a given value of `message_id` on this + // subscription: + // + // * The message sent to a subscriber is guaranteed not to be resent + // before the message's acknowledgement deadline expires. + // * An acknowledged message will not be resent to a subscriber. + // + // Note that subscribers may still receive multiple copies of a message + // when `enable_exactly_once_delivery` is true if the message was published + // multiple times by a publisher client. These copies are considered distinct + // by Pub/Sub and have distinct `message_id` values. + bool enable_exactly_once_delivery = 16 + [(google.api.field_behavior) = OPTIONAL]; + + // Output only. Indicates the minimum duration for which a message is retained + // after it is published to the subscription's topic. If this field is set, + // messages published to the subscription's topic in the last + // `topic_message_retention_duration` are always available to subscribers. See + // the `message_retention_duration` field in `Topic`. This field is set only + // in responses from the server; it is ignored if it is set in any requests. + google.protobuf.Duration topic_message_retention_duration = 17 + [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Output only. An output-only field indicating whether or not the + // subscription can receive messages. + State state = 19 [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Output only. Information about the associated Analytics Hub subscription. + // Only set if the subscritpion is created by Analytics Hub. + AnalyticsHubSubscriptionInfo analytics_hub_subscription_info = 23 + [(google.api.field_behavior) = OUTPUT_ONLY]; +} + +// A policy that specifies how Pub/Sub retries message delivery. +// +// Retry delay will be exponential based on provided minimum and maximum +// backoffs. https://en.wikipedia.org/wiki/Exponential_backoff. +// +// RetryPolicy will be triggered on NACKs or acknowledgement deadline exceeded +// events for a given message. +// +// Retry Policy is implemented on a best effort basis. At times, the delay +// between consecutive deliveries may not match the configuration. That is, +// delay can be more or less than configured backoff. +message RetryPolicy { + // Optional. The minimum delay between consecutive deliveries of a given + // message. Value should be between 0 and 600 seconds. Defaults to 10 seconds. + google.protobuf.Duration minimum_backoff = 1 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The maximum delay between consecutive deliveries of a given + // message. Value should be between 0 and 600 seconds. Defaults to 600 + // seconds. + google.protobuf.Duration maximum_backoff = 2 + [(google.api.field_behavior) = OPTIONAL]; +} + +// Dead lettering is done on a best effort basis. The same message might be +// dead lettered multiple times. +// +// If validation on any of the fields fails at subscription creation/updation, +// the create/update subscription request will fail. +message DeadLetterPolicy { + // Optional. The name of the topic to which dead letter messages should be + // published. Format is `projects/{project}/topics/{topic}`.The Pub/Sub + // service account associated with the enclosing subscription's parent project + // (i.e., service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) must + // have permission to Publish() to this topic. + // + // The operation will fail if the topic does not exist. + // Users should ensure that there is a subscription attached to this topic + // since messages published to a topic with no subscriptions are lost. + string dead_letter_topic = 1 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The maximum number of delivery attempts for any message. The + // value must be between 5 and 100. + // + // The number of delivery attempts is defined as 1 + (the sum of number of + // NACKs and number of times the acknowledgement deadline has been exceeded + // for the message). + // + // A NACK is any call to ModifyAckDeadline with a 0 deadline. Note that + // client libraries may automatically extend ack_deadlines. + // + // This field will be honored on a best effort basis. + // + // If this parameter is 0, a default value of 5 is used. + int32 max_delivery_attempts = 2 [(google.api.field_behavior) = OPTIONAL]; +} + +// A policy that specifies the conditions for resource expiration (i.e., +// automatic resource deletion). +message ExpirationPolicy { + // Optional. Specifies the "time-to-live" duration for an associated resource. + // The resource expires if it is not active for a period of `ttl`. The + // definition of "activity" depends on the type of the associated resource. + // The minimum and maximum allowed values for `ttl` depend on the type of the + // associated resource, as well. If `ttl` is not set, the associated resource + // never expires. + google.protobuf.Duration ttl = 1 [(google.api.field_behavior) = OPTIONAL]; +} + +// Configuration for a push delivery endpoint. +message PushConfig { + // Contains information needed for generating an + // [OpenID Connect + // token](https://developers.google.com/identity/protocols/OpenIDConnect). + message OidcToken { + // Optional. [Service account + // email](https://cloud.google.com/iam/docs/service-accounts) + // used for generating the OIDC token. For more information + // on setting up authentication, see + // [Push subscriptions](https://cloud.google.com/pubsub/docs/push). + string service_account_email = 1 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Audience to be used when generating OIDC token. The audience + // claim identifies the recipients that the JWT is intended for. The + // audience value is a single case-sensitive string. Having multiple values + // (array) for the audience field is not supported. More info about the OIDC + // JWT token audience here: + // https://tools.ietf.org/html/rfc7519#section-4.1.3 Note: if not specified, + // the Push endpoint URL will be used. + string audience = 2 [(google.api.field_behavior) = OPTIONAL]; + } + + // The payload to the push endpoint is in the form of the JSON representation + // of a PubsubMessage + // (https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage). + message PubsubWrapper {} + + // Sets the `data` field as the HTTP body for delivery. + message NoWrapper { + // Optional. When true, writes the Pub/Sub message metadata to + // `x-goog-pubsub-:` headers of the HTTP request. Writes the + // Pub/Sub message attributes to `:` headers of the HTTP request. + bool write_metadata = 1 [(google.api.field_behavior) = OPTIONAL]; + } + + // Optional. A URL locating the endpoint to which messages should be pushed. + // For example, a Webhook endpoint might use `https://example.com/push`. + string push_endpoint = 1 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Endpoint configuration attributes that can be used to control + // different aspects of the message delivery. + // + // The only currently supported attribute is `x-goog-version`, which you can + // use to change the format of the pushed message. This attribute + // indicates the version of the data expected by the endpoint. This + // controls the shape of the pushed message (i.e., its fields and metadata). + // + // If not present during the `CreateSubscription` call, it will default to + // the version of the Pub/Sub API used to make such call. If not present in a + // `ModifyPushConfig` call, its value will not be changed. `GetSubscription` + // calls will always return a valid version, even if the subscription was + // created without this attribute. + // + // The only supported values for the `x-goog-version` attribute are: + // + // * `v1beta1`: uses the push format defined in the v1beta1 Pub/Sub API. + // * `v1` or `v1beta2`: uses the push format defined in the v1 Pub/Sub API. + // + // For example: + // `attributes { "x-goog-version": "v1" }` + map attributes = 2 [(google.api.field_behavior) = OPTIONAL]; + + // An authentication method used by push endpoints to verify the source of + // push requests. This can be used with push endpoints that are private by + // default to allow requests only from the Pub/Sub system, for example. + // This field is optional and should be set only by users interested in + // authenticated push. + oneof authentication_method { + // Optional. If specified, Pub/Sub will generate and attach an OIDC JWT + // token as an `Authorization` header in the HTTP request for every pushed + // message. + OidcToken oidc_token = 3 [(google.api.field_behavior) = OPTIONAL]; + } + + // The format of the delivered message to the push endpoint is defined by + // the chosen wrapper. When unset, `PubsubWrapper` is used. + oneof wrapper { + // Optional. When set, the payload to the push endpoint is in the form of + // the JSON representation of a PubsubMessage + // (https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage). + PubsubWrapper pubsub_wrapper = 4 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. When set, the payload to the push endpoint is not wrapped. + NoWrapper no_wrapper = 5 [(google.api.field_behavior) = OPTIONAL]; + } +} + +// Configuration for a BigQuery subscription. +message BigQueryConfig { + // Possible states for a BigQuery subscription. + enum State { + // Default value. This value is unused. + STATE_UNSPECIFIED = 0; + + // The subscription can actively send messages to BigQuery + ACTIVE = 1; + + // Cannot write to the BigQuery table because of permission denied errors. + // This can happen if + // - Pub/Sub SA has not been granted the [appropriate BigQuery IAM + // permissions](https://cloud.google.com/pubsub/docs/create-subscription#assign_bigquery_service_account) + // - bigquery.googleapis.com API is not enabled for the project + // ([instructions](https://cloud.google.com/service-usage/docs/enable-disable)) + PERMISSION_DENIED = 2; + + // Cannot write to the BigQuery table because it does not exist. + NOT_FOUND = 3; + + // Cannot write to the BigQuery table due to a schema mismatch. + SCHEMA_MISMATCH = 4; + + // Cannot write to the destination because enforce_in_transit is set to true + // and the destination locations are not in the allowed regions. + IN_TRANSIT_LOCATION_RESTRICTION = 5; + } + + // Optional. The name of the table to which to write data, of the form + // {projectId}.{datasetId}.{tableId} + string table = 1 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. When true, use the topic's schema as the columns to write to in + // BigQuery, if it exists. `use_topic_schema` and `use_table_schema` cannot be + // enabled at the same time. + bool use_topic_schema = 2 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. When true, write the subscription name, message_id, publish_time, + // attributes, and ordering_key to additional columns in the table. The + // subscription name, message_id, and publish_time fields are put in their own + // columns while all other message properties (other than data) are written to + // a JSON object in the attributes column. + bool write_metadata = 3 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. When true and use_topic_schema is true, any fields that are a + // part of the topic schema that are not part of the BigQuery table schema are + // dropped when writing to BigQuery. Otherwise, the schemas must be kept in + // sync and any messages with extra fields are not written and remain in the + // subscription's backlog. + bool drop_unknown_fields = 4 [(google.api.field_behavior) = OPTIONAL]; + + // Output only. An output-only field that indicates whether or not the + // subscription can receive messages. + State state = 5 [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Optional. When true, use the BigQuery table's schema as the columns to + // write to in BigQuery. `use_table_schema` and `use_topic_schema` cannot be + // enabled at the same time. + bool use_table_schema = 6 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The service account to use to write to BigQuery. The subscription + // creator or updater that specifies this field must have + // `iam.serviceAccounts.actAs` permission on the service account. If not + // specified, the Pub/Sub [service + // agent](https://cloud.google.com/iam/docs/service-agents), + // service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com, is used. + string service_account_email = 7 [(google.api.field_behavior) = OPTIONAL]; +} + +// Configuration for a Cloud Storage subscription. +message CloudStorageConfig { + // Configuration for writing message data in text format. + // Message payloads will be written to files as raw text, separated by a + // newline. + message TextConfig {} + + // Configuration for writing message data in Avro format. + // Message payloads and metadata will be written to files as an Avro binary. + message AvroConfig { + // Optional. When true, write the subscription name, message_id, + // publish_time, attributes, and ordering_key as additional fields in the + // output. The subscription name, message_id, and publish_time fields are + // put in their own fields while all other message properties other than + // data (for example, an ordering_key, if present) are added as entries in + // the attributes map. + bool write_metadata = 1 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. When true, the output Cloud Storage file will be serialized + // using the topic schema, if it exists. + bool use_topic_schema = 2 [(google.api.field_behavior) = OPTIONAL]; + } + + // Possible states for a Cloud Storage subscription. + enum State { + // Default value. This value is unused. + STATE_UNSPECIFIED = 0; + + // The subscription can actively send messages to Cloud Storage. + ACTIVE = 1; + + // Cannot write to the Cloud Storage bucket because of permission denied + // errors. + PERMISSION_DENIED = 2; + + // Cannot write to the Cloud Storage bucket because it does not exist. + NOT_FOUND = 3; + + // Cannot write to the destination because enforce_in_transit is set to true + // and the destination locations are not in the allowed regions. + IN_TRANSIT_LOCATION_RESTRICTION = 4; + + // Cannot write to the Cloud Storage bucket due to an incompatibility + // between the topic schema and subscription settings. + SCHEMA_MISMATCH = 5; + } + + // Required. User-provided name for the Cloud Storage bucket. + // The bucket must be created by the user. The bucket name must be without + // any prefix like "gs://". See the [bucket naming + // requirements] (https://cloud.google.com/storage/docs/buckets#naming). + string bucket = 1 [(google.api.field_behavior) = REQUIRED]; + + // Optional. User-provided prefix for Cloud Storage filename. See the [object + // naming requirements](https://cloud.google.com/storage/docs/objects#naming). + string filename_prefix = 2 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. User-provided suffix for Cloud Storage filename. See the [object + // naming requirements](https://cloud.google.com/storage/docs/objects#naming). + // Must not end in "/". + string filename_suffix = 3 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. User-provided format string specifying how to represent datetimes + // in Cloud Storage filenames. See the [datetime format + // guidance](https://cloud.google.com/pubsub/docs/create-cloudstorage-subscription#file_names). + string filename_datetime_format = 10 [(google.api.field_behavior) = OPTIONAL]; + + // Defaults to text format. + oneof output_format { + // Optional. If set, message data will be written to Cloud Storage in text + // format. + TextConfig text_config = 4 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. If set, message data will be written to Cloud Storage in Avro + // format. + AvroConfig avro_config = 5 [(google.api.field_behavior) = OPTIONAL]; + } + + // Optional. The maximum duration that can elapse before a new Cloud Storage + // file is created. Min 1 minute, max 10 minutes, default 5 minutes. May not + // exceed the subscription's acknowledgement deadline. + google.protobuf.Duration max_duration = 6 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The maximum bytes that can be written to a Cloud Storage file + // before a new file is created. Min 1 KB, max 10 GiB. The max_bytes limit may + // be exceeded in cases where messages are larger than the limit. + int64 max_bytes = 7 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The maximum number of messages that can be written to a Cloud + // Storage file before a new file is created. Min 1000 messages. + int64 max_messages = 8 [(google.api.field_behavior) = OPTIONAL]; + + // Output only. An output-only field that indicates whether or not the + // subscription can receive messages. + State state = 9 [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Optional. The service account to use to write to Cloud Storage. The + // subscription creator or updater that specifies this field must have + // `iam.serviceAccounts.actAs` permission on the service account. If not + // specified, the Pub/Sub + // [service agent](https://cloud.google.com/iam/docs/service-agents), + // service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com, is used. + string service_account_email = 11 [(google.api.field_behavior) = OPTIONAL]; +} + +// A message and its corresponding acknowledgment ID. +message ReceivedMessage { + // Optional. This ID can be used to acknowledge the received message. + string ack_id = 1 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The message. + PubsubMessage message = 2 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The approximate number of times that Pub/Sub has attempted to + // deliver the associated message to a subscriber. + // + // More precisely, this is 1 + (number of NACKs) + + // (number of ack_deadline exceeds) for this message. + // + // A NACK is any call to ModifyAckDeadline with a 0 deadline. An ack_deadline + // exceeds event is whenever a message is not acknowledged within + // ack_deadline. Note that ack_deadline is initially + // Subscription.ackDeadlineSeconds, but may get extended automatically by + // the client library. + // + // Upon the first delivery of a given message, `delivery_attempt` will have a + // value of 1. The value is calculated at best effort and is approximate. + // + // If a DeadLetterPolicy is not set on the subscription, this will be 0. + int32 delivery_attempt = 3 [(google.api.field_behavior) = OPTIONAL]; +} + +// Request for the GetSubscription method. +message GetSubscriptionRequest { + // Required. The name of the subscription to get. + // Format is `projects/{project}/subscriptions/{sub}`. + string subscription = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "pubsub.googleapis.com/Subscription" + } + ]; +} + +// Request for the UpdateSubscription method. +message UpdateSubscriptionRequest { + // Required. The updated subscription object. + Subscription subscription = 1 [(google.api.field_behavior) = REQUIRED]; + + // Required. Indicates which fields in the provided subscription to update. + // Must be specified and non-empty. + google.protobuf.FieldMask update_mask = 2 + [(google.api.field_behavior) = REQUIRED]; +} + +// Request for the `ListSubscriptions` method. +message ListSubscriptionsRequest { + // Required. The name of the project in which to list subscriptions. + // Format is `projects/{project-id}`. + string project = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "cloudresourcemanager.googleapis.com/Project" + } + ]; + + // Optional. Maximum number of subscriptions to return. + int32 page_size = 2 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The value returned by the last `ListSubscriptionsResponse`; + // indicates that this is a continuation of a prior `ListSubscriptions` call, + // and that the system should return the next page of data. + string page_token = 3 [(google.api.field_behavior) = OPTIONAL]; +} + +// Response for the `ListSubscriptions` method. +message ListSubscriptionsResponse { + // Optional. The subscriptions that match the request. + repeated Subscription subscriptions = 1 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. If not empty, indicates that there may be more subscriptions that + // match the request; this value should be passed in a new + // `ListSubscriptionsRequest` to get more subscriptions. + string next_page_token = 2 [(google.api.field_behavior) = OPTIONAL]; +} + +// Request for the DeleteSubscription method. +message DeleteSubscriptionRequest { + // Required. The subscription to delete. + // Format is `projects/{project}/subscriptions/{sub}`. + string subscription = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "pubsub.googleapis.com/Subscription" + } + ]; +} + +// Request for the ModifyPushConfig method. +message ModifyPushConfigRequest { + // Required. The name of the subscription. + // Format is `projects/{project}/subscriptions/{sub}`. + string subscription = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "pubsub.googleapis.com/Subscription" + } + ]; + + // Required. The push configuration for future deliveries. + // + // An empty `pushConfig` indicates that the Pub/Sub system should + // stop pushing messages from the given subscription and allow + // messages to be pulled and acknowledged - effectively pausing + // the subscription if `Pull` or `StreamingPull` is not called. + PushConfig push_config = 2 [(google.api.field_behavior) = REQUIRED]; +} + +// Request for the `Pull` method. +message PullRequest { + // Required. The subscription from which messages should be pulled. + // Format is `projects/{project}/subscriptions/{sub}`. + string subscription = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "pubsub.googleapis.com/Subscription" + } + ]; + + // Optional. If this field set to true, the system will respond immediately + // even if it there are no messages available to return in the `Pull` + // response. Otherwise, the system may wait (for a bounded amount of time) + // until at least one message is available, rather than returning no messages. + // Warning: setting this field to `true` is discouraged because it adversely + // impacts the performance of `Pull` operations. We recommend that users do + // not set this field. + bool return_immediately = 2 + [deprecated = true, (google.api.field_behavior) = OPTIONAL]; + + // Required. The maximum number of messages to return for this request. Must + // be a positive integer. The Pub/Sub system may return fewer than the number + // specified. + int32 max_messages = 3 [(google.api.field_behavior) = REQUIRED]; +} + +// Response for the `Pull` method. +message PullResponse { + // Optional. Received Pub/Sub messages. The list will be empty if there are no + // more messages available in the backlog, or if no messages could be returned + // before the request timeout. For JSON, the response can be entirely + // empty. The Pub/Sub system may return fewer than the `maxMessages` requested + // even if there are more messages available in the backlog. + repeated ReceivedMessage received_messages = 1 + [(google.api.field_behavior) = OPTIONAL]; +} + +// Request for the ModifyAckDeadline method. +message ModifyAckDeadlineRequest { + // Required. The name of the subscription. + // Format is `projects/{project}/subscriptions/{sub}`. + string subscription = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "pubsub.googleapis.com/Subscription" + } + ]; + + // Required. List of acknowledgment IDs. + repeated string ack_ids = 4 [(google.api.field_behavior) = REQUIRED]; + + // Required. The new ack deadline with respect to the time this request was + // sent to the Pub/Sub system. For example, if the value is 10, the new ack + // deadline will expire 10 seconds after the `ModifyAckDeadline` call was + // made. Specifying zero might immediately make the message available for + // delivery to another subscriber client. This typically results in an + // increase in the rate of message redeliveries (that is, duplicates). + // The minimum deadline you can specify is 0 seconds. + // The maximum deadline you can specify in a single request is 600 seconds + // (10 minutes). + int32 ack_deadline_seconds = 3 [(google.api.field_behavior) = REQUIRED]; +} + +// Request for the Acknowledge method. +message AcknowledgeRequest { + // Required. The subscription whose message is being acknowledged. + // Format is `projects/{project}/subscriptions/{sub}`. + string subscription = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "pubsub.googleapis.com/Subscription" + } + ]; + + // Required. The acknowledgment ID for the messages being acknowledged that + // was returned by the Pub/Sub system in the `Pull` response. Must not be + // empty. + repeated string ack_ids = 2 [(google.api.field_behavior) = REQUIRED]; +} + +// Request for the `StreamingPull` streaming RPC method. This request is used to +// establish the initial stream as well as to stream acknowledgements and ack +// deadline modifications from the client to the server. +message StreamingPullRequest { + // Required. The subscription for which to initialize the new stream. This + // must be provided in the first request on the stream, and must not be set in + // subsequent requests from client to server. + // Format is `projects/{project}/subscriptions/{sub}`. + string subscription = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "pubsub.googleapis.com/Subscription" + } + ]; + + // Optional. List of acknowledgement IDs for acknowledging previously received + // messages (received on this stream or a different stream). If an ack ID has + // expired, the corresponding message may be redelivered later. Acknowledging + // a message more than once will not result in an error. If the + // acknowledgement ID is malformed, the stream will be aborted with status + // `INVALID_ARGUMENT`. + repeated string ack_ids = 2 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The list of new ack deadlines for the IDs listed in + // `modify_deadline_ack_ids`. The size of this list must be the same as the + // size of `modify_deadline_ack_ids`. If it differs the stream will be aborted + // with `INVALID_ARGUMENT`. Each element in this list is applied to the + // element in the same position in `modify_deadline_ack_ids`. The new ack + // deadline is with respect to the time this request was sent to the Pub/Sub + // system. Must be >= 0. For example, if the value is 10, the new ack deadline + // will expire 10 seconds after this request is received. If the value is 0, + // the message is immediately made available for another streaming or + // non-streaming pull request. If the value is < 0 (an error), the stream will + // be aborted with status `INVALID_ARGUMENT`. + repeated int32 modify_deadline_seconds = 3 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. List of acknowledgement IDs whose deadline will be modified based + // on the corresponding element in `modify_deadline_seconds`. This field can + // be used to indicate that more time is needed to process a message by the + // subscriber, or to make the message available for redelivery if the + // processing was interrupted. + repeated string modify_deadline_ack_ids = 4 + [(google.api.field_behavior) = OPTIONAL]; + + // Required. The ack deadline to use for the stream. This must be provided in + // the first request on the stream, but it can also be updated on subsequent + // requests from client to server. The minimum deadline you can specify is 10 + // seconds. The maximum deadline you can specify is 600 seconds (10 minutes). + int32 stream_ack_deadline_seconds = 5 + [(google.api.field_behavior) = REQUIRED]; + + // Optional. A unique identifier that is used to distinguish client instances + // from each other. Only needs to be provided on the initial request. When a + // stream disconnects and reconnects for the same stream, the client_id should + // be set to the same value so that state associated with the old stream can + // be transferred to the new stream. The same client_id should not be used for + // different client instances. + string client_id = 6 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Flow control settings for the maximum number of outstanding + // messages. When there are `max_outstanding_messages` currently sent to the + // streaming pull client that have not yet been acked or nacked, the server + // stops sending more messages. The sending of messages resumes once the + // number of outstanding messages is less than this value. If the value is + // <= 0, there is no limit to the number of outstanding messages. This + // property can only be set on the initial StreamingPullRequest. If it is set + // on a subsequent request, the stream will be aborted with status + // `INVALID_ARGUMENT`. + int64 max_outstanding_messages = 7 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Flow control settings for the maximum number of outstanding + // bytes. When there are `max_outstanding_bytes` or more worth of messages + // currently sent to the streaming pull client that have not yet been acked or + // nacked, the server will stop sending more messages. The sending of messages + // resumes once the number of outstanding bytes is less than this value. If + // the value is <= 0, there is no limit to the number of outstanding bytes. + // This property can only be set on the initial StreamingPullRequest. If it is + // set on a subsequent request, the stream will be aborted with status + // `INVALID_ARGUMENT`. + int64 max_outstanding_bytes = 8 [(google.api.field_behavior) = OPTIONAL]; +} + +// Response for the `StreamingPull` method. This response is used to stream +// messages from the server to the client. +message StreamingPullResponse { + // Acknowledgement IDs sent in one or more previous requests to acknowledge a + // previously received message. + message AcknowledgeConfirmation { + // Optional. Successfully processed acknowledgement IDs. + repeated string ack_ids = 1 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. List of acknowledgement IDs that were malformed or whose + // acknowledgement deadline has expired. + repeated string invalid_ack_ids = 2 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. List of acknowledgement IDs that were out of order. + repeated string unordered_ack_ids = 3 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. List of acknowledgement IDs that failed processing with + // temporary issues. + repeated string temporary_failed_ack_ids = 4 + [(google.api.field_behavior) = OPTIONAL]; + } + + // Acknowledgement IDs sent in one or more previous requests to modify the + // deadline for a specific message. + message ModifyAckDeadlineConfirmation { + // Optional. Successfully processed acknowledgement IDs. + repeated string ack_ids = 1 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. List of acknowledgement IDs that were malformed or whose + // acknowledgement deadline has expired. + repeated string invalid_ack_ids = 2 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. List of acknowledgement IDs that failed processing with + // temporary issues. + repeated string temporary_failed_ack_ids = 3 + [(google.api.field_behavior) = OPTIONAL]; + } + + // Subscription properties sent as part of the response. + message SubscriptionProperties { + // Optional. True iff exactly once delivery is enabled for this + // subscription. + bool exactly_once_delivery_enabled = 1 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. True iff message ordering is enabled for this subscription. + bool message_ordering_enabled = 2 [(google.api.field_behavior) = OPTIONAL]; + } + + // Optional. Received Pub/Sub messages. This will not be empty. + repeated ReceivedMessage received_messages = 1 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. This field will only be set if `enable_exactly_once_delivery` is + // set to `true`. + AcknowledgeConfirmation acknowledge_confirmation = 5 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. This field will only be set if `enable_exactly_once_delivery` is + // set to `true`. + ModifyAckDeadlineConfirmation modify_ack_deadline_confirmation = 3 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Properties associated with this subscription. + SubscriptionProperties subscription_properties = 4 + [(google.api.field_behavior) = OPTIONAL]; +} + +// Request for the `CreateSnapshot` method. +message CreateSnapshotRequest { + // Required. User-provided name for this snapshot. If the name is not provided + // in the request, the server will assign a random name for this snapshot on + // the same project as the subscription. Note that for REST API requests, you + // must specify a name. See the [resource name + // rules](https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names). + // Format is `projects/{project}/snapshots/{snap}`. + string name = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Snapshot" } + ]; + + // Required. The subscription whose backlog the snapshot retains. + // Specifically, the created snapshot is guaranteed to retain: + // (a) The existing backlog on the subscription. More precisely, this is + // defined as the messages in the subscription's backlog that are + // unacknowledged upon the successful completion of the + // `CreateSnapshot` request; as well as: + // (b) Any messages published to the subscription's topic following the + // successful completion of the CreateSnapshot request. + // Format is `projects/{project}/subscriptions/{sub}`. + string subscription = 2 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "pubsub.googleapis.com/Subscription" + } + ]; + + // Optional. See [Creating and managing + // labels](https://cloud.google.com/pubsub/docs/labels). + map labels = 3 [(google.api.field_behavior) = OPTIONAL]; +} + +// Request for the UpdateSnapshot method. +message UpdateSnapshotRequest { + // Required. The updated snapshot object. + Snapshot snapshot = 1 [(google.api.field_behavior) = REQUIRED]; + + // Required. Indicates which fields in the provided snapshot to update. + // Must be specified and non-empty. + google.protobuf.FieldMask update_mask = 2 + [(google.api.field_behavior) = REQUIRED]; +} + +// A snapshot resource. Snapshots are used in +// [Seek](https://cloud.google.com/pubsub/docs/replay-overview) +// operations, which allow you to manage message acknowledgments in bulk. That +// is, you can set the acknowledgment state of messages in an existing +// subscription to the state captured by a snapshot. +message Snapshot { + option (google.api.resource) = { + type: "pubsub.googleapis.com/Snapshot" + pattern: "projects/{project}/snapshots/{snapshot}" + }; + + // Optional. The name of the snapshot. + string name = 1 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The name of the topic from which this snapshot is retaining + // messages. + string topic = 2 [ + (google.api.field_behavior) = OPTIONAL, + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Topic" } + ]; + + // Optional. The snapshot is guaranteed to exist up until this time. + // A newly-created snapshot expires no later than 7 days from the time of its + // creation. Its exact lifetime is determined at creation by the existing + // backlog in the source subscription. Specifically, the lifetime of the + // snapshot is `7 days - (age of oldest unacked message in the subscription)`. + // For example, consider a subscription whose oldest unacked message is 3 days + // old. If a snapshot is created from this subscription, the snapshot -- which + // will always capture this 3-day-old backlog as long as the snapshot + // exists -- will expire in 4 days. The service will refuse to create a + // snapshot that would expire in less than 1 hour after creation. + google.protobuf.Timestamp expire_time = 3 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. See [Creating and managing labels] + // (https://cloud.google.com/pubsub/docs/labels). + map labels = 4 [(google.api.field_behavior) = OPTIONAL]; +} + +// Request for the GetSnapshot method. +message GetSnapshotRequest { + // Required. The name of the snapshot to get. + // Format is `projects/{project}/snapshots/{snap}`. + string snapshot = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Snapshot" } + ]; +} + +// Request for the `ListSnapshots` method. +message ListSnapshotsRequest { + // Required. The name of the project in which to list snapshots. + // Format is `projects/{project-id}`. + string project = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "cloudresourcemanager.googleapis.com/Project" + } + ]; + + // Optional. Maximum number of snapshots to return. + int32 page_size = 2 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The value returned by the last `ListSnapshotsResponse`; indicates + // that this is a continuation of a prior `ListSnapshots` call, and that the + // system should return the next page of data. + string page_token = 3 [(google.api.field_behavior) = OPTIONAL]; +} + +// Response for the `ListSnapshots` method. +message ListSnapshotsResponse { + // Optional. The resulting snapshots. + repeated Snapshot snapshots = 1 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. If not empty, indicates that there may be more snapshot that + // match the request; this value should be passed in a new + // `ListSnapshotsRequest`. + string next_page_token = 2 [(google.api.field_behavior) = OPTIONAL]; +} + +// Request for the `DeleteSnapshot` method. +message DeleteSnapshotRequest { + // Required. The name of the snapshot to delete. + // Format is `projects/{project}/snapshots/{snap}`. + string snapshot = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Snapshot" } + ]; +} + +// Request for the `Seek` method. +message SeekRequest { + // Required. The subscription to affect. + string subscription = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "pubsub.googleapis.com/Subscription" + } + ]; + + oneof target { + // Optional. The time to seek to. + // Messages retained in the subscription that were published before this + // time are marked as acknowledged, and messages retained in the + // subscription that were published after this time are marked as + // unacknowledged. Note that this operation affects only those messages + // retained in the subscription (configured by the combination of + // `message_retention_duration` and `retain_acked_messages`). For example, + // if `time` corresponds to a point before the message retention + // window (or to a point before the system's notion of the subscription + // creation time), only retained messages will be marked as unacknowledged, + // and already-expunged messages will not be restored. + google.protobuf.Timestamp time = 2 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The snapshot to seek to. The snapshot's topic must be the same + // as that of the provided subscription. Format is + // `projects/{project}/snapshots/{snap}`. + string snapshot = 3 [ + (google.api.field_behavior) = OPTIONAL, + (google.api.resource_reference) = { + type: "pubsub.googleapis.com/Snapshot" + } + ]; + } +} + +// Response for the `Seek` method (this response is empty). +message SeekResponse {} diff --git a/modules/grpc/protos/googleapis-proto/google/pubsub/v1/schema.proto b/modules/grpc/protos/googleapis-proto/google/pubsub/v1/schema.proto new file mode 100644 index 000000000..d52c678c5 --- /dev/null +++ b/modules/grpc/protos/googleapis-proto/google/pubsub/v1/schema.proto @@ -0,0 +1,410 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package google.pubsub.v1; + +import "google/api/annotations.proto"; +import "google/api/client.proto"; +import "google/api/field_behavior.proto"; +import "google/api/resource.proto"; +import "google/protobuf/empty.proto"; +import "google/protobuf/timestamp.proto"; + +option cc_enable_arenas = true; +option csharp_namespace = "Google.Cloud.PubSub.V1"; +option go_package = "cloud.google.com/go/pubsub/apiv1/pubsubpb;pubsubpb"; +option java_multiple_files = true; +option java_outer_classname = "SchemaProto"; +option java_package = "com.google.pubsub.v1"; +option php_namespace = "Google\\Cloud\\PubSub\\V1"; +option ruby_package = "Google::Cloud::PubSub::V1"; + +// Service for doing schema-related operations. +service SchemaService { + option (google.api.default_host) = "pubsub.googleapis.com"; + option (google.api.oauth_scopes) = + "https://www.googleapis.com/auth/cloud-platform," + "https://www.googleapis.com/auth/pubsub"; + + // Creates a schema. + rpc CreateSchema(CreateSchemaRequest) returns (Schema) { + option (google.api.http) = { + post: "/v1/{parent=projects/*}/schemas" + body: "schema" + }; + option (google.api.method_signature) = "parent,schema,schema_id"; + } + + // Gets a schema. + rpc GetSchema(GetSchemaRequest) returns (Schema) { + option (google.api.http) = { + get: "/v1/{name=projects/*/schemas/*}" + }; + option (google.api.method_signature) = "name"; + } + + // Lists schemas in a project. + rpc ListSchemas(ListSchemasRequest) returns (ListSchemasResponse) { + option (google.api.http) = { + get: "/v1/{parent=projects/*}/schemas" + }; + option (google.api.method_signature) = "parent"; + } + + // Lists all schema revisions for the named schema. + rpc ListSchemaRevisions(ListSchemaRevisionsRequest) + returns (ListSchemaRevisionsResponse) { + option (google.api.http) = { + get: "/v1/{name=projects/*/schemas/*}:listRevisions" + }; + option (google.api.method_signature) = "name"; + } + + // Commits a new schema revision to an existing schema. + rpc CommitSchema(CommitSchemaRequest) returns (Schema) { + option (google.api.http) = { + post: "/v1/{name=projects/*/schemas/*}:commit" + body: "*" + }; + option (google.api.method_signature) = "name,schema"; + } + + // Creates a new schema revision that is a copy of the provided revision_id. + rpc RollbackSchema(RollbackSchemaRequest) returns (Schema) { + option (google.api.http) = { + post: "/v1/{name=projects/*/schemas/*}:rollback" + body: "*" + }; + option (google.api.method_signature) = "name,revision_id"; + } + + // Deletes a specific schema revision. + rpc DeleteSchemaRevision(DeleteSchemaRevisionRequest) returns (Schema) { + option (google.api.http) = { + delete: "/v1/{name=projects/*/schemas/*}:deleteRevision" + }; + option (google.api.method_signature) = "name,revision_id"; + } + + // Deletes a schema. + rpc DeleteSchema(DeleteSchemaRequest) returns (google.protobuf.Empty) { + option (google.api.http) = { + delete: "/v1/{name=projects/*/schemas/*}" + }; + option (google.api.method_signature) = "name"; + } + + // Validates a schema. + rpc ValidateSchema(ValidateSchemaRequest) returns (ValidateSchemaResponse) { + option (google.api.http) = { + post: "/v1/{parent=projects/*}/schemas:validate" + body: "*" + }; + option (google.api.method_signature) = "parent,schema"; + } + + // Validates a message against a schema. + rpc ValidateMessage(ValidateMessageRequest) + returns (ValidateMessageResponse) { + option (google.api.http) = { + post: "/v1/{parent=projects/*}/schemas:validateMessage" + body: "*" + }; + } +} + +// A schema resource. +message Schema { + option (google.api.resource) = { + type: "pubsub.googleapis.com/Schema" + pattern: "projects/{project}/schemas/{schema}" + }; + + // Possible schema definition types. + enum Type { + // Default value. This value is unused. + TYPE_UNSPECIFIED = 0; + + // A Protocol Buffer schema definition. + PROTOCOL_BUFFER = 1; + + // An Avro schema definition. + AVRO = 2; + } + + // Required. Name of the schema. + // Format is `projects/{project}/schemas/{schema}`. + string name = 1 [(google.api.field_behavior) = REQUIRED]; + + // The type of the schema definition. + Type type = 2; + + // The definition of the schema. This should contain a string representing + // the full definition of the schema that is a valid schema definition of + // the type specified in `type`. + string definition = 3; + + // Output only. Immutable. The revision ID of the schema. + string revision_id = 4 [ + (google.api.field_behavior) = IMMUTABLE, + (google.api.field_behavior) = OUTPUT_ONLY + ]; + + // Output only. The timestamp that the revision was created. + google.protobuf.Timestamp revision_create_time = 6 + [(google.api.field_behavior) = OUTPUT_ONLY]; +} + +// View of Schema object fields to be returned by GetSchema and ListSchemas. +enum SchemaView { + // The default / unset value. + // The API will default to the BASIC view. + SCHEMA_VIEW_UNSPECIFIED = 0; + + // Include the name and type of the schema, but not the definition. + BASIC = 1; + + // Include all Schema object fields. + FULL = 2; +} + +// Request for the CreateSchema method. +message CreateSchemaRequest { + // Required. The name of the project in which to create the schema. + // Format is `projects/{project-id}`. + string parent = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + child_type: "pubsub.googleapis.com/Schema" + } + ]; + + // Required. The schema object to create. + // + // This schema's `name` parameter is ignored. The schema object returned + // by CreateSchema will have a `name` made using the given `parent` and + // `schema_id`. + Schema schema = 2 [(google.api.field_behavior) = REQUIRED]; + + // The ID to use for the schema, which will become the final component of + // the schema's resource name. + // + // See https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names for + // resource name constraints. + string schema_id = 3; +} + +// Request for the GetSchema method. +message GetSchemaRequest { + // Required. The name of the schema to get. + // Format is `projects/{project}/schemas/{schema}`. + string name = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Schema" } + ]; + + // The set of fields to return in the response. If not set, returns a Schema + // with all fields filled out. Set to `BASIC` to omit the `definition`. + SchemaView view = 2; +} + +// Request for the `ListSchemas` method. +message ListSchemasRequest { + // Required. The name of the project in which to list schemas. + // Format is `projects/{project-id}`. + string parent = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "cloudresourcemanager.googleapis.com/Project" + } + ]; + + // The set of Schema fields to return in the response. If not set, returns + // Schemas with `name` and `type`, but not `definition`. Set to `FULL` to + // retrieve all fields. + SchemaView view = 2; + + // Maximum number of schemas to return. + int32 page_size = 3; + + // The value returned by the last `ListSchemasResponse`; indicates that + // this is a continuation of a prior `ListSchemas` call, and that the + // system should return the next page of data. + string page_token = 4; +} + +// Response for the `ListSchemas` method. +message ListSchemasResponse { + // The resulting schemas. + repeated Schema schemas = 1; + + // If not empty, indicates that there may be more schemas that match the + // request; this value should be passed in a new `ListSchemasRequest`. + string next_page_token = 2; +} + +// Request for the `ListSchemaRevisions` method. +message ListSchemaRevisionsRequest { + // Required. The name of the schema to list revisions for. + string name = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Schema" } + ]; + + // The set of Schema fields to return in the response. If not set, returns + // Schemas with `name` and `type`, but not `definition`. Set to `FULL` to + // retrieve all fields. + SchemaView view = 2; + + // The maximum number of revisions to return per page. + int32 page_size = 3; + + // The page token, received from a previous ListSchemaRevisions call. + // Provide this to retrieve the subsequent page. + string page_token = 4; +} + +// Response for the `ListSchemaRevisions` method. +message ListSchemaRevisionsResponse { + // The revisions of the schema. + repeated Schema schemas = 1; + + // A token that can be sent as `page_token` to retrieve the next page. + // If this field is empty, there are no subsequent pages. + string next_page_token = 2; +} + +// Request for CommitSchema method. +message CommitSchemaRequest { + // Required. The name of the schema we are revising. + // Format is `projects/{project}/schemas/{schema}`. + string name = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Schema" } + ]; + + // Required. The schema revision to commit. + Schema schema = 2 [(google.api.field_behavior) = REQUIRED]; +} + +// Request for the `RollbackSchema` method. +message RollbackSchemaRequest { + // Required. The schema being rolled back with revision id. + string name = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Schema" } + ]; + + // Required. The revision ID to roll back to. + // It must be a revision of the same schema. + // + // Example: c7cfa2a8 + string revision_id = 2 [(google.api.field_behavior) = REQUIRED]; +} + +// Request for the `DeleteSchemaRevision` method. +message DeleteSchemaRevisionRequest { + // Required. The name of the schema revision to be deleted, with a revision ID + // explicitly included. + // + // Example: `projects/123/schemas/my-schema@c7cfa2a8` + string name = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Schema" } + ]; + + // Optional. This field is deprecated and should not be used for specifying + // the revision ID. The revision ID should be specified via the `name` + // parameter. + string revision_id = 2 + [deprecated = true, (google.api.field_behavior) = OPTIONAL]; +} + +// Request for the `DeleteSchema` method. +message DeleteSchemaRequest { + // Required. Name of the schema to delete. + // Format is `projects/{project}/schemas/{schema}`. + string name = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Schema" } + ]; +} + +// Request for the `ValidateSchema` method. +message ValidateSchemaRequest { + // Required. The name of the project in which to validate schemas. + // Format is `projects/{project-id}`. + string parent = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "cloudresourcemanager.googleapis.com/Project" + } + ]; + + // Required. The schema object to validate. + Schema schema = 2 [(google.api.field_behavior) = REQUIRED]; +} + +// Response for the `ValidateSchema` method. +// Empty for now. +message ValidateSchemaResponse {} + +// Request for the `ValidateMessage` method. +message ValidateMessageRequest { + // Required. The name of the project in which to validate schemas. + // Format is `projects/{project-id}`. + string parent = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "cloudresourcemanager.googleapis.com/Project" + } + ]; + + oneof schema_spec { + // Name of the schema against which to validate. + // + // Format is `projects/{project}/schemas/{schema}`. + string name = 2 [ + (google.api.resource_reference) = { type: "pubsub.googleapis.com/Schema" } + ]; + + // Ad-hoc schema against which to validate + Schema schema = 3; + } + + // Message to validate against the provided `schema_spec`. + bytes message = 4; + + // The encoding expected for messages + Encoding encoding = 5; +} + +// Response for the `ValidateMessage` method. +// Empty for now. +message ValidateMessageResponse {} + +// Possible encoding types for messages. +enum Encoding { + // Unspecified + ENCODING_UNSPECIFIED = 0; + + // JSON encoding + JSON = 1; + + // Binary encoding, as defined by the schema type. For some schema types, + // binary encoding may not be available. + BINARY = 2; +} From f4b42ef753e8d1d15930bc522da71a5fe7eacb74 Mon Sep 17 00:00:00 2001 From: Attila Szakacs Date: Sun, 10 Nov 2024 11:35:26 +0100 Subject: [PATCH 03/10] grpc/pubsub: add boilerplates Signed-off-by: Attila Szakacs --- configure.ac | 2 +- docker/apkbuild/axoflow/axosyslog/APKBUILD | 1 + modules/grpc/CMakeLists.txt | 1 + modules/grpc/Makefile.am | 1 + modules/grpc/pubsub/CMakeLists.txt | 36 ++++++++ modules/grpc/pubsub/Makefile.am | 71 +++++++++++++++ modules/grpc/pubsub/pubsub-dest-worker.cpp | 52 +++++++++++ modules/grpc/pubsub/pubsub-dest-worker.hpp | 50 +++++++++++ modules/grpc/pubsub/pubsub-dest.cpp | 95 +++++++++++++++++++++ modules/grpc/pubsub/pubsub-dest.h | 37 ++++++++ modules/grpc/pubsub/pubsub-dest.hpp | 56 ++++++++++++ modules/grpc/pubsub/pubsub-grammar.ym | 82 ++++++++++++++++++ modules/grpc/pubsub/pubsub-parser.c | 51 +++++++++++ modules/grpc/pubsub/pubsub-parser.h | 34 ++++++++ modules/grpc/pubsub/pubsub-plugin.c | 56 ++++++++++++ packaging/debian/axosyslog-mod-grpc.install | 1 + packaging/debian/control | 1 + packaging/rhel/axosyslog.spec | 3 +- tests/copyright/policy | 1 + 19 files changed, 629 insertions(+), 2 deletions(-) create mode 100644 modules/grpc/pubsub/CMakeLists.txt create mode 100644 modules/grpc/pubsub/Makefile.am create mode 100644 modules/grpc/pubsub/pubsub-dest-worker.cpp create mode 100644 modules/grpc/pubsub/pubsub-dest-worker.hpp create mode 100644 modules/grpc/pubsub/pubsub-dest.cpp create mode 100644 modules/grpc/pubsub/pubsub-dest.h create mode 100644 modules/grpc/pubsub/pubsub-dest.hpp create mode 100644 modules/grpc/pubsub/pubsub-grammar.ym create mode 100644 modules/grpc/pubsub/pubsub-parser.c create mode 100644 modules/grpc/pubsub/pubsub-parser.h create mode 100644 modules/grpc/pubsub/pubsub-plugin.c diff --git a/configure.ac b/configure.ac index 8fe929ff0..f906a0dfa 100644 --- a/configure.ac +++ b/configure.ac @@ -313,7 +313,7 @@ AC_ARG_WITH(libpaho-mqtt, [use libpaho-mqtt library from (prefix) directory DIR]),,) AC_ARG_ENABLE(grpc, - [ --enable-grpc Enable GRPC based modules support (OpenTelemetry, Loki, BigQuery, ClickHouse) (default: auto)] + [ --enable-grpc Enable GRPC based modules support (OpenTelemetry, Loki, BigQuery, Pub/Sub, ClickHouse) (default: auto)] ,,enable_grpc="auto") AC_ARG_WITH(protoc, diff --git a/docker/apkbuild/axoflow/axosyslog/APKBUILD b/docker/apkbuild/axoflow/axosyslog/APKBUILD index df1f13ff4..f165ef901 100644 --- a/docker/apkbuild/axoflow/axosyslog/APKBUILD +++ b/docker/apkbuild/axoflow/axosyslog/APKBUILD @@ -155,6 +155,7 @@ _grpc() { _submv usr/lib/syslog-ng/libotel.so \ usr/lib/syslog-ng/libloki.so \ usr/lib/syslog-ng/libbigquery.so \ + usr/lib/syslog-ng/libpubsub.so \ usr/lib/syslog-ng/libclickhouse.so } diff --git a/modules/grpc/CMakeLists.txt b/modules/grpc/CMakeLists.txt index 620468ebb..27c6db27a 100644 --- a/modules/grpc/CMakeLists.txt +++ b/modules/grpc/CMakeLists.txt @@ -80,4 +80,5 @@ endif() add_subdirectory(loki) add_subdirectory(otel) add_subdirectory(bigquery) +add_subdirectory(pubsub) add_subdirectory(clickhouse) diff --git a/modules/grpc/Makefile.am b/modules/grpc/Makefile.am index 7d07de79d..817eb1c3d 100644 --- a/modules/grpc/Makefile.am +++ b/modules/grpc/Makefile.am @@ -5,6 +5,7 @@ include modules/grpc/common/Makefile.am include modules/grpc/otel/Makefile.am include modules/grpc/loki/Makefile.am include modules/grpc/bigquery/Makefile.am +include modules/grpc/pubsub/Makefile.am include modules/grpc/clickhouse/Makefile.am if ENABLE_GRPC diff --git a/modules/grpc/pubsub/CMakeLists.txt b/modules/grpc/pubsub/CMakeLists.txt new file mode 100644 index 000000000..2287409b4 --- /dev/null +++ b/modules/grpc/pubsub/CMakeLists.txt @@ -0,0 +1,36 @@ +if(NOT ENABLE_GRPC) + return() +endif() + +set(PUBSUB_CPP_SOURCES + ${GRPC_METRICS_SOURCES} + pubsub-dest.hpp + pubsub-dest.cpp + pubsub-dest.h + pubsub-dest-worker.hpp + pubsub-dest-worker.cpp +) + +set(PUBSUB_SOURCES + pubsub-plugin.c + pubsub-parser.c + pubsub-parser.h +) + +add_module( + TARGET pubsub-cpp + SOURCES ${PUBSUB_CPP_SOURCES} + DEPENDS ${MODULE_GRPC_LIBS} grpc-protos grpc-common-cpp + INCLUDES ${PUBSUB_PROTO_BUILDDIR} ${PROJECT_SOURCE_DIR}/modules/grpc + LIBRARY_TYPE STATIC +) + +add_module( + TARGET pubsub + GRAMMAR pubsub-grammar + DEPENDS pubsub-cpp grpc-common-cpp + INCLUDES ${PROJECT_SOURCE_DIR}/modules/grpc + SOURCES ${PUBSUB_SOURCES} +) + +set_target_properties(pubsub PROPERTIES INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/lib;${CMAKE_INSTALL_PREFIX}/lib/syslog-ng") diff --git a/modules/grpc/pubsub/Makefile.am b/modules/grpc/pubsub/Makefile.am new file mode 100644 index 000000000..bded8bcd6 --- /dev/null +++ b/modules/grpc/pubsub/Makefile.am @@ -0,0 +1,71 @@ +if ENABLE_GRPC + +noinst_LTLIBRARIES += modules/grpc/pubsub/libpubsub_cpp.la + +modules_grpc_pubsub_libpubsub_cpp_la_SOURCES = \ + modules/grpc/pubsub/pubsub-dest.h \ + modules/grpc/pubsub/pubsub-dest.hpp \ + modules/grpc/pubsub/pubsub-dest.cpp \ + modules/grpc/pubsub/pubsub-dest-worker.hpp \ + modules/grpc/pubsub/pubsub-dest-worker.cpp + +modules_grpc_pubsub_libpubsub_cpp_la_CXXFLAGS = \ + $(AM_CXXFLAGS) \ + $(PROTOBUF_CFLAGS) \ + $(GRPCPP_CFLAGS) \ + $(GRPC_COMMON_CFLAGS) \ + -I$(GOOGLEAPIS_PROTO_BUILDDIR) \ + -I$(top_srcdir)/modules/grpc \ + -I$(top_srcdir)/modules/grpc/pubsub \ + -I$(top_builddir)/modules/grpc/pubsub + +modules_grpc_pubsub_libpubsub_cpp_la_LIBADD = $(MODULE_DEPS_LIBS) $(PROTOBUF_LIBS) $(GRPCPP_LIBS) +modules_grpc_pubsub_libpubsub_cpp_la_LDFLAGS = $(MODULE_LDFLAGS) +EXTRA_modules_grpc_pubsub_libpubsub_cpp_la_DEPENDENCIES = $(MODULE_DEPS_LIBS) + +module_LTLIBRARIES += modules/grpc/pubsub/libpubsub.la + +modules_grpc_pubsub_libpubsub_la_SOURCES = \ + modules/grpc/pubsub/pubsub-grammar.y \ + modules/grpc/pubsub/pubsub-parser.c \ + modules/grpc/pubsub/pubsub-parser.h \ + modules/grpc/pubsub/pubsub-plugin.c + +modules_grpc_pubsub_libpubsub_la_CPPFLAGS = \ + $(AM_CPPFLAGS) \ + $(GRPC_COMMON_CFLAGS) \ + -I$(top_srcdir)/modules/grpc/pubsub \ + -I$(top_builddir)/modules/grpc/pubsub \ + -I$(top_srcdir)/modules/grpc + +modules_grpc_pubsub_libpubsub_la_LIBADD = \ + $(MODULE_DEPS_LIBS) \ + $(GRPC_COMMON_LIBS) \ + $(top_builddir)/modules/grpc/protos/libgrpc-protos.la \ + $(top_builddir)/modules/grpc/pubsub/libpubsub_cpp.la + +nodist_EXTRA_modules_grpc_pubsub_libpubsub_la_SOURCES = force-cpp-linker-with-default-stdlib.cpp + +modules_grpc_pubsub_libpubsub_la_LDFLAGS = $(MODULE_LDFLAGS) +EXTRA_modules_grpc_pubsub_libpubsub_la_DEPENDENCIES = \ + $(MODULE_DEPS_LIBS) \ + $(GRPC_COMMON_LIBS) \ + $(top_builddir)/modules/grpc/protos/libgrpc-protos.la \ + $(top_builddir)/modules/grpc/pubsub/libpubsub_cpp.la + +modules/grpc/pubsub modules/grpc/pubsub/ mod-pubsub: modules/grpc/pubsub/libpubsub.la + +else +modules/grpc/pubsub modules/grpc/pubsub/ mod-pubsub: +endif + +BUILT_SOURCES += \ + modules/grpc/pubsub/pubsub-grammar.y \ + modules/grpc/pubsub/pubsub-grammar.c \ + modules/grpc/pubsub/pubsub-grammar.h + +EXTRA_DIST += \ + modules/grpc/pubsub/pubsub-grammar.ym \ + modules/grpc/pubsub/CMakeLists.txt + +.PHONY: modules/grpc/pubsub/ mod-pubsub diff --git a/modules/grpc/pubsub/pubsub-dest-worker.cpp b/modules/grpc/pubsub/pubsub-dest-worker.cpp new file mode 100644 index 000000000..b57ffbc70 --- /dev/null +++ b/modules/grpc/pubsub/pubsub-dest-worker.cpp @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2024 Axoflow + * Copyright (c) 2024 Attila Szakacs + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#include "pubsub-dest-worker.hpp" +#include "pubsub-dest.hpp" + + +using syslogng::grpc::pubsub::DestWorker; +using syslogng::grpc::pubsub::DestDriver; + +DestWorker::DestWorker(GrpcDestWorker *s) + : syslogng::grpc::DestWorker(s) +{ +} + +LogThreadedResult +DestWorker::insert(LogMessage *msg) +{ + return LTR_NOT_CONNECTED; +} + +LogThreadedResult +DestWorker::flush(LogThreadedFlushMode mode) +{ + return LTR_ERROR; +} + +DestDriver * +DestWorker::get_owner() +{ + return pubsub_dd_get_cpp(this->owner.super); +} diff --git a/modules/grpc/pubsub/pubsub-dest-worker.hpp b/modules/grpc/pubsub/pubsub-dest-worker.hpp new file mode 100644 index 000000000..2a46d2a2c --- /dev/null +++ b/modules/grpc/pubsub/pubsub-dest-worker.hpp @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2024 Axoflow + * Copyright (c) 2024 Attila Szakacs + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#ifndef PUBSUB_DEST_WORKER_HPP +#define PUBSUB_DEST_WORKER_HPP + +#include "pubsub-dest.hpp" +#include "grpc-dest-worker.hpp" + +namespace syslogng { +namespace grpc { +namespace pubsub { + +class DestWorker final : public syslogng::grpc::DestWorker +{ +public: + DestWorker(GrpcDestWorker *s); + + LogThreadedResult insert(LogMessage *msg); + LogThreadedResult flush(LogThreadedFlushMode mode); + +private: + DestDriver *get_owner(); +}; + +} +} +} + +#endif diff --git a/modules/grpc/pubsub/pubsub-dest.cpp b/modules/grpc/pubsub/pubsub-dest.cpp new file mode 100644 index 000000000..7cd47fd94 --- /dev/null +++ b/modules/grpc/pubsub/pubsub-dest.cpp @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2024 Axoflow + * Copyright (c) 2024 Attila Szakacs + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#include "pubsub-dest.hpp" +#include "pubsub-dest-worker.hpp" +#include "grpc-dest-worker.hpp" + +#include "compat/cpp-start.h" +#include "messages.h" +#include "compat/cpp-end.h" + +using syslogng::grpc::pubsub::DestDriver; + +DestDriver::DestDriver(GrpcDestDriver *s) + : syslogng::grpc::DestDriver(s) +{ +} + +bool +DestDriver::init() +{ + return syslogng::grpc::DestDriver::init(); +} + +const gchar * +DestDriver::generate_persist_name() +{ + static gchar persist_name[1024]; + + // TODO: update when options are available + + LogPipe *s = &this->super->super.super.super.super; + if (s->persist_name) + g_snprintf(persist_name, sizeof(persist_name), "google_pubsub_grpc.%s", s->persist_name); + else + g_snprintf(persist_name, sizeof(persist_name), "google_pubsub_grpc(%s)", this->url.c_str()); + + return persist_name; +} + +const gchar * +DestDriver::format_stats_key(StatsClusterKeyBuilder *kb) +{ + // TODO: update when options are available + + stats_cluster_key_builder_add_legacy_label(kb, stats_cluster_label("driver", "pubsub")); + stats_cluster_key_builder_add_legacy_label(kb, stats_cluster_label("url", this->url.c_str())); + + return nullptr; +} + +LogThreadedDestWorker * +DestDriver::construct_worker(int worker_index) +{ + GrpcDestWorker *worker = grpc_dw_new(this->super, worker_index); + worker->cpp = new DestWorker(worker); + return &worker->super; +} + + +/* C Wrappers */ + +DestDriver * +pubsub_dd_get_cpp(GrpcDestDriver *self) +{ + return (DestDriver *) self->cpp; +} + +LogDriver * +pubsub_dd_new(GlobalConfig *cfg) +{ + GrpcDestDriver *self = grpc_dd_new(cfg, "google_pubsub_grpc"); + self->cpp = new DestDriver(self); + return &self->super.super.super; +} diff --git a/modules/grpc/pubsub/pubsub-dest.h b/modules/grpc/pubsub/pubsub-dest.h new file mode 100644 index 000000000..91a91a40a --- /dev/null +++ b/modules/grpc/pubsub/pubsub-dest.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2024 Axoflow + * Copyright (c) 2024 Attila Szakacs + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#ifndef PUBSUB_DEST_H +#define PUBSUB_DEST_H + +#include "syslog-ng.h" + +#include "compat/cpp-start.h" +#include "driver.h" +#include "template/templates.h" + +LogDriver *pubsub_dd_new(GlobalConfig *cfg); + +#include "compat/cpp-end.h" + +#endif diff --git a/modules/grpc/pubsub/pubsub-dest.hpp b/modules/grpc/pubsub/pubsub-dest.hpp new file mode 100644 index 000000000..36ff9488b --- /dev/null +++ b/modules/grpc/pubsub/pubsub-dest.hpp @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2024 Axoflow + * Copyright (c) 2024 Attila Szakacs + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#ifndef PUBSUB_DEST_HPP +#define PUBSUB_DEST_HPP + +#include "pubsub-dest.h" +#include "grpc-dest.hpp" + +#include + +namespace syslogng { +namespace grpc { +namespace pubsub { + +class DestDriver final : public syslogng::grpc::DestDriver +{ +public: + DestDriver(GrpcDestDriver *s); + bool init(); + const gchar *generate_persist_name(); + const gchar *format_stats_key(StatsClusterKeyBuilder *kb); + LogThreadedDestWorker *construct_worker(int worker_index); + +private: + friend class DestWorker; +}; + + +} +} +} + +syslogng::grpc::pubsub::DestDriver *pubsub_dd_get_cpp(GrpcDestDriver *self); + +#endif diff --git a/modules/grpc/pubsub/pubsub-grammar.ym b/modules/grpc/pubsub/pubsub-grammar.ym new file mode 100644 index 000000000..aed605c22 --- /dev/null +++ b/modules/grpc/pubsub/pubsub-grammar.ym @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2024 Axoflow + * Copyright (c) 2024 Attila Szakacs + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +/* REQUIRE modules/grpc/common/grpc-grammar.ym */ + +%code top { +#include "pubsub-parser.h" + +} + + +%code { + +#include "cfg-grammar-internal.h" +#include "plugin.h" +#include "syslog-names.h" + +#include "pubsub-dest.h" + +#include + +} + +%define api.prefix {pubsub_} + +%lex-param {CfgLexer *lexer} +%parse-param {CfgLexer *lexer} +%parse-param {LogDriver **instance} +%parse-param {gpointer arg} + +/* INCLUDE_DECLS */ + +%token KW_GOOGLE_PUBSUB_GRPC + +%type pubsub_dest + +%% + +start + : LL_CONTEXT_DESTINATION pubsub_dest { YYACCEPT; } + ; + +pubsub_dest + : KW_GOOGLE_PUBSUB_GRPC + { + last_driver = *instance = pubsub_dd_new(configuration); + } + '(' _inner_dest_context_push pubsub_dest_options _inner_dest_context_pop ')' { $$ = last_driver; } + ; + +pubsub_dest_options + : pubsub_dest_option pubsub_dest_options + | + ; + +pubsub_dest_option + : grpc_dest_general_option + ; + +/* INCLUDE_RULES */ + +%% diff --git a/modules/grpc/pubsub/pubsub-parser.c b/modules/grpc/pubsub/pubsub-parser.c new file mode 100644 index 000000000..a1cbe0403 --- /dev/null +++ b/modules/grpc/pubsub/pubsub-parser.c @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2024 Axoflow + * Copyright (c) 2024 Attila Szakacs + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#include "driver.h" +#include "cfg-parser.h" +#include "pubsub-grammar.h" +#include "grpc-parser.h" + +extern int pubsub_debug; + +int pubsub_parse(CfgLexer *lexer, LogDriver **instance, gpointer arg); + +static CfgLexerKeyword pubsub_keywords[] = +{ + GRPC_KEYWORDS, + { "google_pubsub_grpc", KW_GOOGLE_PUBSUB_GRPC }, + { NULL } +}; + +CfgParser pubsub_parser = +{ +#if SYSLOG_NG_ENABLE_DEBUG + .debug_flag = &pubsub_debug, +#endif + .name = "google_pubsub_grpc", + .keywords = pubsub_keywords, + .parse = (gint (*)(CfgLexer *, gpointer *, gpointer)) pubsub_parse, + .cleanup = (void (*)(gpointer)) log_pipe_unref, +}; + +CFG_PARSER_IMPLEMENT_LEXER_BINDING(pubsub_, PUBSUB_, LogDriver **) diff --git a/modules/grpc/pubsub/pubsub-parser.h b/modules/grpc/pubsub/pubsub-parser.h new file mode 100644 index 000000000..a03db9b45 --- /dev/null +++ b/modules/grpc/pubsub/pubsub-parser.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2024 Axoflow + * Copyright (c) 2024 Attila Szakacs + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#ifndef PUBSUB_PARSER_H_INCLUDED +#define PUBSUB_PARSER_H_INCLUDED + +#include "cfg-parser.h" +#include "driver.h" + +extern CfgParser pubsub_parser; + +CFG_PARSER_DECLARE_LEXER_BINDING(pubsub_, PUBSUB_, LogDriver **) + +#endif diff --git a/modules/grpc/pubsub/pubsub-plugin.c b/modules/grpc/pubsub/pubsub-plugin.c new file mode 100644 index 000000000..f87c3a656 --- /dev/null +++ b/modules/grpc/pubsub/pubsub-plugin.c @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2024 Axoflow + * Copyright (c) 2024 Attila Szakacs + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#include "cfg-parser.h" +#include "plugin.h" +#include "plugin-types.h" +#include "protos/apphook.h" + +extern CfgParser pubsub_parser; + +static Plugin pubsub_plugins[] = +{ + { + .type = LL_CONTEXT_DESTINATION, + .name = "google_pubsub_grpc", + .parser = &pubsub_parser, + }, +}; + +gboolean +google_pubsub_grpc_module_init(PluginContext *context, CfgArgs *args) +{ + plugin_register(context, pubsub_plugins, G_N_ELEMENTS(pubsub_plugins)); + grpc_register_global_initializers(); + return TRUE; +} + +const ModuleInfo module_info = +{ + .canonical_name = "google_pubsub_grpc", + .version = SYSLOG_NG_VERSION, + .description = "Google Pub/Sub gRPC plugins", + .core_revision = SYSLOG_NG_SOURCE_REVISION, + .plugins = pubsub_plugins, + .plugins_len = G_N_ELEMENTS(pubsub_plugins), +}; diff --git a/packaging/debian/axosyslog-mod-grpc.install b/packaging/debian/axosyslog-mod-grpc.install index ec14a3342..fba3c2a8d 100644 --- a/packaging/debian/axosyslog-mod-grpc.install +++ b/packaging/debian/axosyslog-mod-grpc.install @@ -3,4 +3,5 @@ usr/lib/syslog-ng/libgrpc-protos.so.* usr/lib/syslog-ng/*/libotel.so usr/lib/syslog-ng/*/libloki.so usr/lib/syslog-ng/*/libbigquery.so +usr/lib/syslog-ng/*/libpubsub.so usr/lib/syslog-ng/*/libclickhouse.so diff --git a/packaging/debian/control b/packaging/debian/control index d94da87c9..1e7ec5754 100644 --- a/packaging/debian/control +++ b/packaging/debian/control @@ -884,6 +884,7 @@ Description: Enhanced system logging daemon (GRPC plugins) . * OpenTelemetry logs, traces and metrics * Google BigQuery + * Google Pub/Sub * Grafana Loki * ClickHouse diff --git a/packaging/rhel/axosyslog.spec b/packaging/rhel/axosyslog.spec index 08b48bd08..b498e051d 100644 --- a/packaging/rhel/axosyslog.spec +++ b/packaging/rhel/axosyslog.spec @@ -308,7 +308,7 @@ Requires: %{name}%{?_isa} = %{version}-%{release} %description grpc This module provides gRPC plugins that allows receiving and sending logs from/to -OpenTelemetry, Google BigQuery, Grafana Loki and ClickHouse. +OpenTelemetry, Google BigQuery, Google Pub/Sub, Grafana Loki and ClickHouse. %package bpf Summary: BPF support for %{name} @@ -590,6 +590,7 @@ fi %{_libdir}/syslog-ng/libotel.so %{_libdir}/syslog-ng/libloki.so %{_libdir}/syslog-ng/libbigquery.so +%{_libdir}/syslog-ng/libpubsub.so %{_libdir}/syslog-ng/libclickhouse.so %endif diff --git a/tests/copyright/policy b/tests/copyright/policy index b15229128..73724bbb2 100644 --- a/tests/copyright/policy +++ b/tests/copyright/policy @@ -212,6 +212,7 @@ modules/grpc/otel/tests/test-syslog-ng-otlp\.cpp$ modules/grpc/otel/tests/test-otel-filterx\.cpp$ modules/grpc/loki modules/grpc/bigquery +modules/grpc/pubsub modules/grpc/clickhouse modules/grpc/common modules/grpc/protos/apphook\.(cpp|h)$ From f6d7a930ad199ebd8913e2a8ff046f62cd081516 Mon Sep 17 00:00:00 2001 From: Attila Szakacs Date: Sun, 10 Nov 2024 12:07:23 +0100 Subject: [PATCH 04/10] grpc/pubsub: add project() and topic() options Signed-off-by: Attila Szakacs --- modules/grpc/pubsub/pubsub-dest.cpp | 45 ++++++++++++++++++++++++--- modules/grpc/pubsub/pubsub-dest.h | 3 ++ modules/grpc/pubsub/pubsub-dest.hpp | 17 ++++++++++ modules/grpc/pubsub/pubsub-grammar.ym | 6 +++- modules/grpc/pubsub/pubsub-parser.c | 2 ++ 5 files changed, 67 insertions(+), 6 deletions(-) diff --git a/modules/grpc/pubsub/pubsub-dest.cpp b/modules/grpc/pubsub/pubsub-dest.cpp index 7cd47fd94..a9b4ff614 100644 --- a/modules/grpc/pubsub/pubsub-dest.cpp +++ b/modules/grpc/pubsub/pubsub-dest.cpp @@ -29,16 +29,36 @@ #include "messages.h" #include "compat/cpp-end.h" +#include + using syslogng::grpc::pubsub::DestDriver; DestDriver::DestDriver(GrpcDestDriver *s) : syslogng::grpc::DestDriver(s) { + this->flush_on_key_change = true; +} + +DestDriver::~DestDriver() +{ + log_template_unref(this->project); + log_template_unref(this->topic); } bool DestDriver::init() { + if ((!this->project || strlen(this->project->template_str) == 0) || + (!this->topic || strlen(this->topic->template_str) == 0)) + { + msg_error("Error initializing Google Pub/Sub destination, project() and topic() are mandatory options", + log_pipe_location_tag(&this->super->super.super.super.super)); + return false; + } + + this->extend_worker_partition_key(std::string("project=") + this->project->template_str); + this->extend_worker_partition_key(std::string("topic=") + this->topic->template_str); + return syslogng::grpc::DestDriver::init(); } @@ -47,13 +67,12 @@ DestDriver::generate_persist_name() { static gchar persist_name[1024]; - // TODO: update when options are available - LogPipe *s = &this->super->super.super.super.super; if (s->persist_name) g_snprintf(persist_name, sizeof(persist_name), "google_pubsub_grpc.%s", s->persist_name); else - g_snprintf(persist_name, sizeof(persist_name), "google_pubsub_grpc(%s)", this->url.c_str()); + g_snprintf(persist_name, sizeof(persist_name), "google_pubsub_grpc(%s,%s,%s)", + this->url.c_str(), this->project->template_str, this->topic->template_str); return persist_name; } @@ -61,10 +80,10 @@ DestDriver::generate_persist_name() const gchar * DestDriver::format_stats_key(StatsClusterKeyBuilder *kb) { - // TODO: update when options are available - stats_cluster_key_builder_add_legacy_label(kb, stats_cluster_label("driver", "pubsub")); stats_cluster_key_builder_add_legacy_label(kb, stats_cluster_label("url", this->url.c_str())); + stats_cluster_key_builder_add_legacy_label(kb, stats_cluster_label("project", this->project->template_str)); + stats_cluster_key_builder_add_legacy_label(kb, stats_cluster_label("topic", this->topic->template_str)); return nullptr; } @@ -86,6 +105,22 @@ pubsub_dd_get_cpp(GrpcDestDriver *self) return (DestDriver *) self->cpp; } +void +pubsub_dd_set_project(LogDriver *d, LogTemplate *project) +{ + GrpcDestDriver *self = (GrpcDestDriver *) d; + DestDriver *cpp = pubsub_dd_get_cpp(self); + cpp->set_project(project); +} + +void +pubsub_dd_set_topic(LogDriver *d, LogTemplate *topic) +{ + GrpcDestDriver *self = (GrpcDestDriver *) d; + DestDriver *cpp = pubsub_dd_get_cpp(self); + cpp->set_topic(topic); +} + LogDriver * pubsub_dd_new(GlobalConfig *cfg) { diff --git a/modules/grpc/pubsub/pubsub-dest.h b/modules/grpc/pubsub/pubsub-dest.h index 91a91a40a..d996d96d5 100644 --- a/modules/grpc/pubsub/pubsub-dest.h +++ b/modules/grpc/pubsub/pubsub-dest.h @@ -32,6 +32,9 @@ LogDriver *pubsub_dd_new(GlobalConfig *cfg); +void pubsub_dd_set_project(LogDriver *d, LogTemplate *project); +void pubsub_dd_set_topic(LogDriver *d, LogTemplate *topic); + #include "compat/cpp-end.h" #endif diff --git a/modules/grpc/pubsub/pubsub-dest.hpp b/modules/grpc/pubsub/pubsub-dest.hpp index 36ff9488b..5841707dc 100644 --- a/modules/grpc/pubsub/pubsub-dest.hpp +++ b/modules/grpc/pubsub/pubsub-dest.hpp @@ -37,13 +37,30 @@ class DestDriver final : public syslogng::grpc::DestDriver { public: DestDriver(GrpcDestDriver *s); + ~DestDriver(); bool init(); const gchar *generate_persist_name(); const gchar *format_stats_key(StatsClusterKeyBuilder *kb); LogThreadedDestWorker *construct_worker(int worker_index); + void set_project(LogTemplate *p) + { + log_template_unref(this->project); + this->project = log_template_ref(p); + } + + void set_topic(LogTemplate *t) + { + log_template_unref(this->topic); + this->topic = log_template_ref(t); + } + private: friend class DestWorker; + +private: + LogTemplate *project = nullptr; + LogTemplate *topic = nullptr; }; diff --git a/modules/grpc/pubsub/pubsub-grammar.ym b/modules/grpc/pubsub/pubsub-grammar.ym index aed605c22..2c6315f5c 100644 --- a/modules/grpc/pubsub/pubsub-grammar.ym +++ b/modules/grpc/pubsub/pubsub-grammar.ym @@ -51,6 +51,8 @@ /* INCLUDE_DECLS */ %token KW_GOOGLE_PUBSUB_GRPC +%token KW_PROJECT +%token KW_TOPIC %type pubsub_dest @@ -74,7 +76,9 @@ pubsub_dest_options ; pubsub_dest_option - : grpc_dest_general_option + : KW_PROJECT '(' template_name_or_content ')' { pubsub_dd_set_project(last_driver, $3); log_template_unref($3); } + | KW_TOPIC '(' template_name_or_content ')' { pubsub_dd_set_topic(last_driver, $3); log_template_unref($3); } + | grpc_dest_general_option ; /* INCLUDE_RULES */ diff --git a/modules/grpc/pubsub/pubsub-parser.c b/modules/grpc/pubsub/pubsub-parser.c index a1cbe0403..1b610180b 100644 --- a/modules/grpc/pubsub/pubsub-parser.c +++ b/modules/grpc/pubsub/pubsub-parser.c @@ -34,6 +34,8 @@ static CfgLexerKeyword pubsub_keywords[] = { GRPC_KEYWORDS, { "google_pubsub_grpc", KW_GOOGLE_PUBSUB_GRPC }, + { "project", KW_PROJECT }, + { "topic", KW_TOPIC }, { NULL } }; From e82ddaa83209d4f20be25efdfa5b1190a5b9debd Mon Sep 17 00:00:00 2001 From: Attila Szakacs Date: Sun, 10 Nov 2024 12:08:08 +0100 Subject: [PATCH 05/10] grpc/pubsub: add service_endpoint() alias to url() To match what we have in the HTTP based SCL. Also set its default value. Signed-off-by: Attila Szakacs --- modules/grpc/pubsub/pubsub-dest.cpp | 1 + modules/grpc/pubsub/pubsub-grammar.ym | 4 +++- modules/grpc/pubsub/pubsub-parser.c | 1 + 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/modules/grpc/pubsub/pubsub-dest.cpp b/modules/grpc/pubsub/pubsub-dest.cpp index a9b4ff614..3331c4010 100644 --- a/modules/grpc/pubsub/pubsub-dest.cpp +++ b/modules/grpc/pubsub/pubsub-dest.cpp @@ -36,6 +36,7 @@ using syslogng::grpc::pubsub::DestDriver; DestDriver::DestDriver(GrpcDestDriver *s) : syslogng::grpc::DestDriver(s) { + this->url = "pubsub.googleapis.com:443"; this->flush_on_key_change = true; } diff --git a/modules/grpc/pubsub/pubsub-grammar.ym b/modules/grpc/pubsub/pubsub-grammar.ym index 2c6315f5c..7e5510e61 100644 --- a/modules/grpc/pubsub/pubsub-grammar.ym +++ b/modules/grpc/pubsub/pubsub-grammar.ym @@ -51,6 +51,7 @@ /* INCLUDE_DECLS */ %token KW_GOOGLE_PUBSUB_GRPC +%token KW_SERVICE_ENDPOINT %token KW_PROJECT %token KW_TOPIC @@ -76,7 +77,8 @@ pubsub_dest_options ; pubsub_dest_option - : KW_PROJECT '(' template_name_or_content ')' { pubsub_dd_set_project(last_driver, $3); log_template_unref($3); } + : KW_SERVICE_ENDPOINT '(' string ')' { grpc_dd_set_url(last_driver, $3); free($3); } + | KW_PROJECT '(' template_name_or_content ')' { pubsub_dd_set_project(last_driver, $3); log_template_unref($3); } | KW_TOPIC '(' template_name_or_content ')' { pubsub_dd_set_topic(last_driver, $3); log_template_unref($3); } | grpc_dest_general_option ; diff --git a/modules/grpc/pubsub/pubsub-parser.c b/modules/grpc/pubsub/pubsub-parser.c index 1b610180b..5b97f2837 100644 --- a/modules/grpc/pubsub/pubsub-parser.c +++ b/modules/grpc/pubsub/pubsub-parser.c @@ -34,6 +34,7 @@ static CfgLexerKeyword pubsub_keywords[] = { GRPC_KEYWORDS, { "google_pubsub_grpc", KW_GOOGLE_PUBSUB_GRPC }, + { "service_endpoint", KW_SERVICE_ENDPOINT }, { "project", KW_PROJECT }, { "topic", KW_TOPIC }, { NULL } From 7509ff59b6bc4b0615db6ff216e08f38a1386bc6 Mon Sep 17 00:00:00 2001 From: Attila Szakacs Date: Sun, 10 Nov 2024 14:39:18 +0100 Subject: [PATCH 06/10] grpc/pubsub: use ADC for authentication by default Signed-off-by: Attila Szakacs --- modules/grpc/pubsub/pubsub-dest.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/grpc/pubsub/pubsub-dest.cpp b/modules/grpc/pubsub/pubsub-dest.cpp index 3331c4010..25659c884 100644 --- a/modules/grpc/pubsub/pubsub-dest.cpp +++ b/modules/grpc/pubsub/pubsub-dest.cpp @@ -37,6 +37,7 @@ DestDriver::DestDriver(GrpcDestDriver *s) : syslogng::grpc::DestDriver(s) { this->url = "pubsub.googleapis.com:443"; + this->credentials_builder.set_mode(GCAM_ADC); this->flush_on_key_change = true; } From 5386c6697b4bb2957e07890291a6f1aeda0eaa3a Mon Sep 17 00:00:00 2001 From: Attila Szakacs Date: Sun, 10 Nov 2024 12:33:49 +0100 Subject: [PATCH 07/10] grpc/pubsub: add data() and attributes() options Signed-off-by: Attila Szakacs --- modules/grpc/pubsub/pubsub-dest.cpp | 23 +++++++++++++++++++++++ modules/grpc/pubsub/pubsub-dest.h | 2 ++ modules/grpc/pubsub/pubsub-dest.hpp | 14 ++++++++++++++ modules/grpc/pubsub/pubsub-grammar.ym | 18 ++++++++++++++++++ modules/grpc/pubsub/pubsub-parser.c | 2 ++ 5 files changed, 59 insertions(+) diff --git a/modules/grpc/pubsub/pubsub-dest.cpp b/modules/grpc/pubsub/pubsub-dest.cpp index 25659c884..1d170f976 100644 --- a/modules/grpc/pubsub/pubsub-dest.cpp +++ b/modules/grpc/pubsub/pubsub-dest.cpp @@ -39,12 +39,19 @@ DestDriver::DestDriver(GrpcDestDriver *s) this->url = "pubsub.googleapis.com:443"; this->credentials_builder.set_mode(GCAM_ADC); this->flush_on_key_change = true; + + GlobalConfig *cfg = log_pipe_get_config(&s->super.super.super.super); + LogTemplate *default_data_template = log_template_new(cfg, NULL); + g_assert(log_template_compile(default_data_template, "$MESSAGE", NULL)); + this->set_data(default_data_template); + log_template_unref(default_data_template); } DestDriver::~DestDriver() { log_template_unref(this->project); log_template_unref(this->topic); + log_template_unref(this->data); } bool @@ -123,6 +130,22 @@ pubsub_dd_set_topic(LogDriver *d, LogTemplate *topic) cpp->set_topic(topic); } +void +pubsub_dd_set_data(LogDriver *d, LogTemplate *data) +{ + GrpcDestDriver *self = (GrpcDestDriver *) d; + DestDriver *cpp = pubsub_dd_get_cpp(self); + cpp->set_data(data); +} + +void +pubsub_dd_add_attribute(LogDriver *d, const gchar *name, LogTemplate *value) +{ + GrpcDestDriver *self = (GrpcDestDriver *) d; + DestDriver *cpp = pubsub_dd_get_cpp(self); + cpp->add_attribute(name, value); +} + LogDriver * pubsub_dd_new(GlobalConfig *cfg) { diff --git a/modules/grpc/pubsub/pubsub-dest.h b/modules/grpc/pubsub/pubsub-dest.h index d996d96d5..12ae04880 100644 --- a/modules/grpc/pubsub/pubsub-dest.h +++ b/modules/grpc/pubsub/pubsub-dest.h @@ -34,6 +34,8 @@ LogDriver *pubsub_dd_new(GlobalConfig *cfg); void pubsub_dd_set_project(LogDriver *d, LogTemplate *project); void pubsub_dd_set_topic(LogDriver *d, LogTemplate *topic); +void pubsub_dd_set_data(LogDriver *d, LogTemplate *data); +void pubsub_dd_add_attribute(LogDriver *d, const gchar *name, LogTemplate *value); #include "compat/cpp-end.h" diff --git a/modules/grpc/pubsub/pubsub-dest.hpp b/modules/grpc/pubsub/pubsub-dest.hpp index 5841707dc..22ff14fd4 100644 --- a/modules/grpc/pubsub/pubsub-dest.hpp +++ b/modules/grpc/pubsub/pubsub-dest.hpp @@ -28,6 +28,7 @@ #include "grpc-dest.hpp" #include +#include namespace syslogng { namespace grpc { @@ -55,12 +56,25 @@ class DestDriver final : public syslogng::grpc::DestDriver this->topic = log_template_ref(t); } + void set_data(LogTemplate *d) + { + log_template_unref(this->data); + this->data = log_template_ref(d); + } + + void add_attribute(const std::string &name, LogTemplate *value) + { + this->attributes.push_back(NameValueTemplatePair{name, value}); + } + private: friend class DestWorker; private: LogTemplate *project = nullptr; LogTemplate *topic = nullptr; + LogTemplate *data = nullptr; + std::vector attributes; }; diff --git a/modules/grpc/pubsub/pubsub-grammar.ym b/modules/grpc/pubsub/pubsub-grammar.ym index 7e5510e61..7991bde78 100644 --- a/modules/grpc/pubsub/pubsub-grammar.ym +++ b/modules/grpc/pubsub/pubsub-grammar.ym @@ -54,6 +54,8 @@ %token KW_SERVICE_ENDPOINT %token KW_PROJECT %token KW_TOPIC +%token KW_DATA +%token KW_ATTRIBUTES %type pubsub_dest @@ -80,9 +82,25 @@ pubsub_dest_option : KW_SERVICE_ENDPOINT '(' string ')' { grpc_dd_set_url(last_driver, $3); free($3); } | KW_PROJECT '(' template_name_or_content ')' { pubsub_dd_set_project(last_driver, $3); log_template_unref($3); } | KW_TOPIC '(' template_name_or_content ')' { pubsub_dd_set_topic(last_driver, $3); log_template_unref($3); } + | KW_DATA '(' template_name_or_content ')' { pubsub_dd_set_data(last_driver, $3); log_template_unref($3); } + | KW_ATTRIBUTES '(' pubsub_dest_attributes ')' | grpc_dest_general_option ; +pubsub_dest_attributes + : pubsub_dest_attribute pubsub_dest_attributes + | + ; + +pubsub_dest_attribute + : string LL_ARROW template_name_or_content + { + pubsub_dd_add_attribute(last_driver, $1, $3); + free($1); + log_template_unref($3); + } + ; + /* INCLUDE_RULES */ %% diff --git a/modules/grpc/pubsub/pubsub-parser.c b/modules/grpc/pubsub/pubsub-parser.c index 5b97f2837..5dd4b627f 100644 --- a/modules/grpc/pubsub/pubsub-parser.c +++ b/modules/grpc/pubsub/pubsub-parser.c @@ -37,6 +37,8 @@ static CfgLexerKeyword pubsub_keywords[] = { "service_endpoint", KW_SERVICE_ENDPOINT }, { "project", KW_PROJECT }, { "topic", KW_TOPIC }, + { "data", KW_DATA }, + { "attributes", KW_ATTRIBUTES }, { NULL } }; From a727ba7abeeccd8fb38c6a69f8d09676c4d5932b Mon Sep 17 00:00:00 2001 From: Attila Szakacs Date: Sun, 10 Nov 2024 13:50:18 +0100 Subject: [PATCH 08/10] grpc/pubsub: implement worker Signed-off-by: Attila Szakacs --- modules/grpc/pubsub/pubsub-dest-worker.cpp | 184 ++++++++++++++++++++- modules/grpc/pubsub/pubsub-dest-worker.hpp | 23 +++ modules/grpc/pubsub/pubsub-dest.cpp | 2 +- 3 files changed, 207 insertions(+), 2 deletions(-) diff --git a/modules/grpc/pubsub/pubsub-dest-worker.cpp b/modules/grpc/pubsub/pubsub-dest-worker.cpp index b57ffbc70..4781d6a4c 100644 --- a/modules/grpc/pubsub/pubsub-dest-worker.cpp +++ b/modules/grpc/pubsub/pubsub-dest-worker.cpp @@ -24,6 +24,9 @@ #include "pubsub-dest-worker.hpp" #include "pubsub-dest.hpp" +#include "compat/cpp-start.h" +#include "scratch-buffers.h" +#include "compat/cpp-end.h" using syslogng::grpc::pubsub::DestWorker; using syslogng::grpc::pubsub::DestDriver; @@ -31,18 +34,197 @@ using syslogng::grpc::pubsub::DestDriver; DestWorker::DestWorker(GrpcDestWorker *s) : syslogng::grpc::DestWorker(s) { + std::shared_ptr<::grpc::ChannelCredentials> credentials = this->create_credentials(); + if (!credentials) + { + msg_error("Error querying Google Pub/Sub credentials", + evt_tag_str("url", this->owner.get_url().c_str()), + log_pipe_location_tag(&this->super->super.owner->super.super.super)); + throw std::runtime_error("Error querying Google Pub/Sub credentials"); + } + + ::grpc::ChannelArguments args = this->create_channel_args(); + + this->channel = ::grpc::CreateCustomChannel(this->owner.get_url(), credentials, args); + this->stub = ::google::pubsub::v1::Publisher::NewStub(this->channel); +} + +bool +DestWorker::should_initiate_flush() +{ + return this->current_batch_bytes >= this->get_owner()->batch_bytes; +} + +const std::string +DestWorker::format_topic(LogMessage *msg) +{ + ScratchBuffersMarker m; + scratch_buffers_mark(&m); + + GString *project_buf = scratch_buffers_alloc(); + GString *topic_buf = scratch_buffers_alloc(); + + Slice project_slice = this->format_template(this->get_owner()->project, msg, project_buf, NULL, 0); + Slice topic_slice = this->format_template(this->get_owner()->topic, msg, topic_buf, NULL, 0); + + std::string topic = std::string("projects/") + project_slice.str + "/topics/" + topic_slice.str; + + scratch_buffers_reclaim_marked(m); + return topic; +} + +/* TODO: create a C++ Template class, usefor all gRPC drivers. */ +DestWorker::Slice +DestWorker::format_template(LogTemplate *tmpl, LogMessage *msg, GString *value, LogMessageValueType *type, + gint seq_num) const +{ + if (log_template_is_trivial(tmpl)) + { + gssize trivial_value_len; + const gchar *trivial_value = log_template_get_trivial_value_and_type(tmpl, msg, &trivial_value_len, type); + + if (trivial_value_len < 0) + return Slice{"", 0}; + + return Slice{trivial_value, (std::size_t) trivial_value_len}; + } + + LogTemplateEvalOptions options = {&this->owner.get_template_options(), LTZ_SEND, seq_num, NULL, LM_VT_STRING}; + log_template_format_value_and_type(tmpl, msg, &options, value, type); + return Slice{value->str, value->len}; } LogThreadedResult DestWorker::insert(LogMessage *msg) { + DestDriver *owner_ = this->get_owner(); + + ScratchBuffersMarker m; + GString *buf = scratch_buffers_alloc_and_mark(&m); + Slice buf_slice; + size_t message_bytes = 0; + + ::google::pubsub::v1::PubsubMessage *message = this->request.add_messages(); + + buf_slice = this->format_template(owner_->data, msg, buf, NULL, this->super->super.seq_num); + message->set_data(buf_slice.str, buf_slice.len); + message_bytes += buf_slice.len; + + auto attributes = message->mutable_attributes(); + for (const auto &attribute : owner_->attributes) + { + buf_slice = this->format_template(attribute.value, msg, buf, NULL, this->super->super.seq_num); + attributes->insert({attribute.name, buf_slice.str}); + message_bytes += buf_slice.len; + } + + scratch_buffers_reclaim_marked(m); + + this->current_batch_bytes += message_bytes; + log_threaded_dest_driver_insert_msg_length_stats(this->super->super.owner, message_bytes); + + this->batch_size++; + + if (!this->client_context.get()) + { + this->client_context = std::make_unique<::grpc::ClientContext>(); + prepare_context_dynamic(*this->client_context, msg); + this->request.set_topic(this->format_topic(msg)); + } + + msg_trace("Message added to Google Pub/Sub batch", + evt_tag_str("project/topic", this->request.topic().c_str()), + log_pipe_location_tag(&this->super->super.owner->super.super.super)); + + if (this->should_initiate_flush()) + return log_threaded_dest_worker_flush(&this->super->super, LTF_FLUSH_NORMAL); + + return LTR_QUEUED; +} + +static LogThreadedResult +_map_grpc_status_to_log_threaded_result(const ::grpc::Status &status) +{ + // TODO: this is based on OTLP, we should check how the Google Pub/Sub gRPC server behaves + + switch (status.error_code()) + { + case ::grpc::StatusCode::OK: + return LTR_SUCCESS; + case ::grpc::StatusCode::UNAVAILABLE: + case ::grpc::StatusCode::CANCELLED: + case ::grpc::StatusCode::DEADLINE_EXCEEDED: + case ::grpc::StatusCode::ABORTED: + case ::grpc::StatusCode::OUT_OF_RANGE: + case ::grpc::StatusCode::DATA_LOSS: + goto temporary_error; + case ::grpc::StatusCode::UNKNOWN: + case ::grpc::StatusCode::INVALID_ARGUMENT: + case ::grpc::StatusCode::NOT_FOUND: + case ::grpc::StatusCode::ALREADY_EXISTS: + case ::grpc::StatusCode::PERMISSION_DENIED: + case ::grpc::StatusCode::UNAUTHENTICATED: + case ::grpc::StatusCode::FAILED_PRECONDITION: + case ::grpc::StatusCode::UNIMPLEMENTED: + case ::grpc::StatusCode::INTERNAL: + goto permanent_error; + case ::grpc::StatusCode::RESOURCE_EXHAUSTED: + if (status.error_details().length() > 0) + goto temporary_error; + goto permanent_error; + default: + g_assert_not_reached(); + } + +temporary_error: + msg_debug("Google Pub/Sub server responded with a temporary error status code, retrying after time-reopen() seconds", + evt_tag_int("error_code", status.error_code()), + evt_tag_str("error_message", status.error_message().c_str()), + evt_tag_str("error_details", status.error_details().c_str())); return LTR_NOT_CONNECTED; + +permanent_error: + msg_error("Google Pub/Sub server responded with a permanent error status code, dropping batch", + evt_tag_int("error_code", status.error_code()), + evt_tag_str("error_message", status.error_message().c_str()), + evt_tag_str("error_details", status.error_details().c_str())); + return LTR_DROP; +} + +void +DestWorker::prepare_batch() +{ + this->request.clear_topic(); + this->request.clear_messages(); + this->batch_size = 0; + this->current_batch_bytes = 0; + this->client_context.reset(); } LogThreadedResult DestWorker::flush(LogThreadedFlushMode mode) { - return LTR_ERROR; + if (this->batch_size == 0) + return LTR_SUCCESS; + + ::google::pubsub::v1::PublishResponse response; + + ::grpc::Status status = this->stub->Publish(this->client_context.get(), this->request, &response); + LogThreadedResult result = _map_grpc_status_to_log_threaded_result(status); + if (result != LTR_SUCCESS) + goto exit; + + log_threaded_dest_worker_written_bytes_add(&this->super->super, this->current_batch_bytes); + log_threaded_dest_driver_insert_batch_length_stats(this->super->super.owner, this->current_batch_bytes); + + msg_debug("Google Pub/Sub batch delivered", + evt_tag_str("project/topic", this->request.topic().c_str()), + log_pipe_location_tag(&this->super->super.owner->super.super.super)); + +exit: + this->get_owner()->metrics.insert_grpc_request_stats(status); + this->prepare_batch(); + return result; } DestDriver * diff --git a/modules/grpc/pubsub/pubsub-dest-worker.hpp b/modules/grpc/pubsub/pubsub-dest-worker.hpp index 2a46d2a2c..848696ba4 100644 --- a/modules/grpc/pubsub/pubsub-dest-worker.hpp +++ b/modules/grpc/pubsub/pubsub-dest-worker.hpp @@ -27,12 +27,21 @@ #include "pubsub-dest.hpp" #include "grpc-dest-worker.hpp" +#include "google/pubsub/v1/pubsub.grpc.pb.h" + namespace syslogng { namespace grpc { namespace pubsub { class DestWorker final : public syslogng::grpc::DestWorker { +private: + struct Slice + { + const char *str; + std::size_t len; + }; + public: DestWorker(GrpcDestWorker *s); @@ -40,7 +49,21 @@ class DestWorker final : public syslogng::grpc::DestWorker LogThreadedResult flush(LogThreadedFlushMode mode); private: + bool should_initiate_flush(); + void prepare_batch(); + const std::string format_topic(LogMessage *msg); + DestWorker::Slice format_template(LogTemplate *tmpl, LogMessage *msg, GString *value, LogMessageValueType *type, + gint seq_num) const; DestDriver *get_owner(); + +private: + std::shared_ptr<::grpc::Channel> channel; + std::unique_ptr<::google::pubsub::v1::Publisher::Stub> stub; + std::unique_ptr<::grpc::ClientContext> client_context; + + ::google::pubsub::v1::PublishRequest request; + size_t batch_size = 0; + size_t current_batch_bytes = 0; }; } diff --git a/modules/grpc/pubsub/pubsub-dest.cpp b/modules/grpc/pubsub/pubsub-dest.cpp index 1d170f976..126d47d33 100644 --- a/modules/grpc/pubsub/pubsub-dest.cpp +++ b/modules/grpc/pubsub/pubsub-dest.cpp @@ -38,7 +38,7 @@ DestDriver::DestDriver(GrpcDestDriver *s) { this->url = "pubsub.googleapis.com:443"; this->credentials_builder.set_mode(GCAM_ADC); - this->flush_on_key_change = true; + this->enable_dynamic_headers(); GlobalConfig *cfg = log_pipe_get_config(&s->super.super.super.super); LogTemplate *default_data_template = log_template_new(cfg, NULL); From fd3628bc74a298ed76cba807945ca3c6f6be1915 Mon Sep 17 00:00:00 2001 From: Attila Szakacs Date: Sun, 10 Nov 2024 14:43:29 +0100 Subject: [PATCH 09/10] grpc/pubsub: set 10 MB batch limit Signed-off-by: Attila Szakacs --- modules/grpc/pubsub/pubsub-dest.cpp | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/modules/grpc/pubsub/pubsub-dest.cpp b/modules/grpc/pubsub/pubsub-dest.cpp index 126d47d33..5399bfa03 100644 --- a/modules/grpc/pubsub/pubsub-dest.cpp +++ b/modules/grpc/pubsub/pubsub-dest.cpp @@ -33,6 +33,8 @@ using syslogng::grpc::pubsub::DestDriver; +const size_t MAX_BATCH_BYTES = 10 * 1000 * 1000; + DestDriver::DestDriver(GrpcDestDriver *s) : syslogng::grpc::DestDriver(s) { @@ -40,6 +42,9 @@ DestDriver::DestDriver(GrpcDestDriver *s) this->credentials_builder.set_mode(GCAM_ADC); this->enable_dynamic_headers(); + /* https://cloud.google.com/pubsub/quotas#resource_limits */ + this->batch_bytes = MAX_BATCH_BYTES; + GlobalConfig *cfg = log_pipe_get_config(&s->super.super.super.super); LogTemplate *default_data_template = log_template_new(cfg, NULL); g_assert(log_template_compile(default_data_template, "$MESSAGE", NULL)); @@ -57,6 +62,14 @@ DestDriver::~DestDriver() bool DestDriver::init() { + if (this->batch_bytes > MAX_BATCH_BYTES) + { + msg_error("Error initializing Google Pub/Sub destination, batch-bytes() cannot be larger than 10 MB. " + "For more info see https://cloud.google.com/pubsub/quotas#resource_limits", + log_pipe_location_tag(&this->super->super.super.super.super)); + return false; + } + if ((!this->project || strlen(this->project->template_str) == 0) || (!this->topic || strlen(this->topic->template_str) == 0)) { From 2c49846a8a103d5ba0e48b0a6ae0c6d359f79b1b Mon Sep 17 00:00:00 2001 From: Attila Szakacs Date: Sun, 10 Nov 2024 14:50:44 +0100 Subject: [PATCH 10/10] news: add entry for #373 Signed-off-by: Attila Szakacs --- news/feature-373.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 news/feature-373.md diff --git a/news/feature-373.md b/news/feature-373.md new file mode 100644 index 000000000..31c37e677 --- /dev/null +++ b/news/feature-373.md @@ -0,0 +1,22 @@ +`google-pubsub-grpc()`: Added a new destination that sends logs to Google Pub/Sub via the gRPC interface. + +Example config: +``` +google-pubsub-grpc( + project("my_project") + topic($topic) + + data($MESSAGE) + attributes( + timestamp => $S_ISODATE, + host => $HOST, + ) + + workers(4) + batch-timeout(1000) # ms + batch-lines(1000) +); +``` + +The `project()` and `topic()` options are templatable. +The default service endpoint can be changed with the `service_endpoint()` option.