diff --git a/.gitignore b/.gitignore
index 7318728cc..9d1d59e5d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,4 +2,5 @@
\#*
*.prof
tmp-build
-.DS_Store
\ No newline at end of file
+.DS_Store
+.idea
\ No newline at end of file
diff --git a/.semaphore/project.yml b/.semaphore/project.yml
new file mode 100644
index 000000000..72ede6a3a
--- /dev/null
+++ b/.semaphore/project.yml
@@ -0,0 +1,23 @@
+apiVersion: v1alpha
+kind: Project
+metadata:
+ name: confluent-kafka-go
+ description: ""
+spec:
+ visibility: private
+ repository:
+ url: git@github.com:confluentinc/confluent-kafka-go.git
+ run_on:
+ - branches
+ - tags
+ - pull_requests
+ pipeline_file: .semaphore/semaphore.yml
+ integration_type: github_app
+ status:
+ pipeline_files:
+ - path: .semaphore/semaphore.yml
+ level: pipeline
+ whitelist:
+ branches:
+ - master
+ - "/^v\\d+\\.\\d+\\.x$/"
\ No newline at end of file
diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml
new file mode 100644
index 000000000..6648f62e9
--- /dev/null
+++ b/.semaphore/semaphore.yml
@@ -0,0 +1,81 @@
+version: v1.0
+name: build-test-release
+agent:
+ machine:
+ type: e1-standard-4
+ os_image: ubuntu1804
+
+auto_cancel:
+ running:
+ when: "branch != 'master'"
+
+execution_time_limit:
+ hours: 1
+
+global_job_config:
+ secrets:
+ - name: vault_sem2_approle
+ prologue:
+ commands:
+ - chmod 400 ~/.ssh/id_rsa
+ - sem-version go 1.18
+ - export "GOPATH=$(go env GOPATH)"
+ - export "SEMAPHORE_GIT_DIR=${GOPATH}/src/github.com/confluentinc/${SEMAPHORE_PROJECT_NAME}"
+ - export "PATH=${GOPATH}/bin:${PATH}"
+ - mkdir -vp "${SEMAPHORE_GIT_DIR}" "${GOPATH}/bin"
+ - git config --global url."git@github.com:".insteadOf "https://github.com/"
+ - sem-service start postgres 13
+ # This username/password is just to create test db to run tests. DB will be destroyed when build finishes.
+ - psql -U postgres -h localhost -c "CREATE USER runner WITH PASSWORD 'semaphoredb';"
+ - psql -U postgres -h localhost -c "ALTER USER runner WITH SUPERUSER;"
+ - export ADMIN_DB_URL=postgres://runner:semaphoredb@localhost:5432
+ # Let the app set the default but don't let cc-go.mk sets the default
+ - export DB_URL=""
+ - sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
+ - wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -
+ - sudo apt-get update
+ - sudo apt-get -y install postgresql-client-13
+ - checkout
+ - make install-vault
+ - . mk-include/bin/vault-setup
+ - . vault-sem-get-secret gitconfig
+ - . vault-sem-get-secret ssh_id_rsa
+ - . vault-sem-get-secret ssh_config
+ - . vault-sem-get-secret netrc
+ - . vault-sem-get-secret artifactory-docker-helm
+ - . vault-sem-get-secret maven-settings
+ - . vault-sem-get-secret cpd_gcloud
+ - . vault-sem-get-secret aws_credentials
+ - . vault-sem-get-secret testbreak-reporting
+ - . vault-sem-get-secret v1/ci/kv/service-foundations/cc-mk-include
+ - exec &> >(tee -a build.log)
+ - make init-ci
+ epilogue:
+ always:
+ commands:
+ - make epilogue-ci
+
+blocks:
+ - name: "Build, Test, Release"
+ run:
+ # don't run the build or unit tests on non-functional changes...
+ when: "change_in('/', {exclude: ['/.deployed-versions/', '.github/']})"
+ task:
+ # You can customize your CI job here
+# env_vars:
+# # custom env_vars
+# prologue:
+# commands:
+# # custom vault secrets
+# # custom prologue commands
+ jobs:
+ - name: "Build, Test, Release"
+ commands:
+ - make build
+ - make test
+ - make release-ci
+ epilogue:
+ always:
+ commands:
+ - make epilogue-ci
+ - make testbreak-after
diff --git a/.travis.yml b/.travis.yml
index eb94cd239..b552c2924 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -12,23 +12,14 @@ jobs:
os: osx
env:
- EXPECT_LINK_INFO="static"
- - name: "Go 1.14 OSX bundled librdkafka"
- go: "1.14"
- os: osx
- env:
- - EXPECT_LINK_INFO="static"
- name: "Go 1.16 Linux bundled librdkafka"
go: "1.16"
os: linux
env:
- EXPECT_LINK_INFO="static"
- - name: "Go 1.14 Linux bundled librdkafka"
- go: "1.14"
- os: linux
- env:
- - EXPECT_LINK_INFO="static"
- - name: "Go 1.14 OSX dynamic librdkafka"
- go: "1.14"
+ - name: "Go 1.16 OSX dynamic librdkafka"
+ if: tag is present
+ go: "1.16"
os: osx
env:
- EXPECT_LINK_INFO="dynamic"
@@ -37,8 +28,9 @@ jobs:
- LD_LIBRARY_PATH="$HOME/gopath/src/github.com/confluentinc/confluent-kafka-go/tmp-build/lib"
- DYLD_LIBRARY_PATH="$HOME/gopath/src/github.com/confluentinc/confluent-kafka-go/tmp-build/lib"
- LIBRDKAFKA_VERSION=master
- - name: "Go 1.14 Linux dynamic librdkafka"
- go: "1.14"
+ - name: "Go 1.16 Linux dynamic librdkafka"
+ if: tag is present
+ go: "1.16"
os: linux
env:
- EXPECT_LINK_INFO="dynamic"
@@ -52,11 +44,6 @@ jobs:
os: windows
env:
- EXPECT_LINK_INFO="static"
- - name: "Go 1.14 Windows bundled librdkafka"
- go: "1.14"
- os: windows
- env:
- - EXPECT_LINK_INFO="static"
before_install:
- if [[ $TRAVIS_OS_NAME == linux ]]; then wget -qO - https://packages.confluent.io/deb/5.4/archive.key | sudo apt-key add - ; fi
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 97aa2ae0b..2ba9a5e13 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,11 +1,47 @@
# Confluent's Golang client for Apache Kafka
-## v1.7.0
+## v1.9.0
-confluent-kafka-go is based on librdkafka v1.7.0, see the
-[librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v1.7.0)
+This is a feature release:
+
+ * OAUTHBEARER OIDC support
+ * KIP-140 Admin API ACL support
+ * Added MockCluster for functional testing of applications without the need
+ for a real Kafka cluster (by @SourceFellows and @kkoehler, #729).
+ See [examples/mock_cluster](examples/mock_cluster).
+
+
+### Fixes
+
+ * Fix Rebalance events behavior for static membership (@jliunyu, #757,
+ #798).
+ * Fix consumer close taking 10 seconds when there's no rebalance
+ needed (@jliunyu, #757).
+
+confluent-kafka-go is based on librdkafka v1.9.0, see the
+[librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v1.9.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.
+
+## v1.8.2
+
+This is a maintenance release:
+
+ * Bundles librdkafka v1.8.2
+ * Check termination channel while reading delivery reports (by @zjj)
+ * Added convenience method Consumer.StoreMessage() (@finncolman, #676)
+
+
+confluent-kafka-go is based on librdkafka v1.8.2, see the
+[librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v1.8.2)
+for a complete list of changes, enhancements, fixes and upgrade considerations.
+
+
+**Note**: There were no confluent-kafka-go v1.8.0 and v1.8.1 releases.
+
+
+## v1.7.0
+
### Enhancements
* Experimental Windows support (by @neptoess).
@@ -22,6 +58,10 @@ for a complete list of changes, enhancements, fixes and upgrade considerations.
ReplicationFactor without specifying an explicit ReplicaAssignment, this is
now fixed.
+confluent-kafka-go is based on librdkafka v1.7.0, see the
+[librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v1.7.0)
+for a complete list of changes, enhancements, fixes and upgrade considerations.
+
## v1.6.1
diff --git a/README.md b/README.md
index 86490883e..a5ab9896f 100644
--- a/README.md
+++ b/README.md
@@ -30,6 +30,9 @@ for the balanced consumer groups of Apache Kafka 0.9 and above.
See the [API documentation](http://docs.confluent.io/current/clients/confluent-kafka-go/index.html) for more information.
+For a step-by-step guide on using the client see [Getting Started with Apache Kafka and Golang](https://developer.confluent.io/get-started/go/).
+
+
Examples
========
@@ -123,7 +126,7 @@ for use with [Confluent Cloud](https://www.confluent.io/confluent-cloud/).
Getting Started
===============
-Supports Go 1.11+ and librdkafka 1.6.0+.
+Supports Go 1.11+ and librdkafka 1.9.0+.
Using Go Modules
----------------
@@ -155,17 +158,14 @@ your `go.mod` file.
Install the client
------------------
-If Go modules can't be used we recommend that you version pin the
-confluent-kafka-go import to `v1` using gopkg.in:
-
Manual install:
```bash
-go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
+go get -u github.com/confluentinc/confluent-kafka-go/kafka
```
Golang import:
```golang
-import "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
+import "github.com/confluentinc/confluent-kafka-go/kafka"
```
librdkafka
@@ -218,87 +218,34 @@ with `-tags dynamic`.
**Note:** If you use the `master` branch of the Go client, then you need to use
the `master` branch of librdkafka.
-**confluent-kafka-go requires librdkafka v1.6.0 or later.**
+**confluent-kafka-go requires librdkafka v1.9.0 or later.**
API Strands
===========
-There are two main API strands: function and channel-based.
+The recommended API strand is the Function-Based one,
+the Channel-Based one is documented in [examples/legacy](examples/legacy).
Function-Based Consumer
-----------------------
Messages, errors and events are polled through the `consumer.Poll()` function.
-Pros:
-
- * More direct mapping to underlying librdkafka functionality.
-
-Cons:
-
- * Makes it harder to read from multiple channels, but a go-routine easily
- solves that (see Cons in channel-based consumer below about outdated events).
- * Slower than the channel consumer.
+It has direct mapping to underlying librdkafka functionality.
See [examples/consumer_example](examples/consumer_example)
-Channel-Based Consumer (deprecated)
------------------------------------
-
-*Deprecated*: The channel-based consumer is deprecated due to the channel issues
- mentioned below. Use the function-based consumer.
-
-Messages, errors and events are posted on the `consumer.Events()` channel
-for the application to read.
-
-Pros:
-
- * Possibly more Golang:ish
- * Makes reading from multiple channels easy
- * Fast
-
-Cons:
-
- * Outdated events and messages may be consumed due to the buffering nature
- of channels. The extent is limited, but not remedied, by the Events channel
- buffer size (`go.events.channel.size`).
-
-See [examples/consumer_channel_example](examples/consumer_channel_example)
-
-Channel-Based Producer
-----------------------
-
-Application writes messages to the `producer.ProducerChannel()`.
-Delivery reports are emitted on the `producer.Events()` or specified private channel.
-
-Pros:
-
- * Go:ish
- * Proper channel backpressure if librdkafka internal queue is full.
-
-Cons:
-
- * Double queueing: messages are first queued in the channel (size is configurable)
- and then inside librdkafka.
-
-See [examples/producer_channel_example](examples/producer_channel_example)
-
Function-Based Producer
-----------------------
Application calls `producer.Produce()` to produce messages.
Delivery reports are emitted on the `producer.Events()` or specified private channel.
-Pros:
-
- * Go:ish
-
-Cons:
+_Warnings_
* `Produce()` is a non-blocking call, if the internal librdkafka queue is full
- the call will fail.
- * Somewhat slower than the channel producer.
+ the call will fail and can be retried.
See [examples/producer_example](examples/producer_example)
@@ -319,3 +266,8 @@ See [kafka/README](kafka/README.md)
Contributions to the code, examples, documentation, et.al, are very much appreciated.
Make your changes, run `gofmt`, tests, etc, push your branch, create a PR, and [sign the CLA](http://clabot.confluent.io/cla).
+
+Confluent Cloud
+===============
+
+For a step-by-step guide on using the Golang client with Confluent Cloud see [Getting Started with Apache Kafka and Golang](https://developer.confluent.io/get-started/go/) on [Confluent Developer](https://developer.confluent.io/).
diff --git a/examples/.gitignore b/examples/.gitignore
index 408c6ca7b..53c9c6450 100644
--- a/examples/.gitignore
+++ b/examples/.gitignore
@@ -1,10 +1,21 @@
-consumer_channel_example/consumer_channel_example
+admin_create_acls/admin_create_acls
+admin_create_topic/admin_create_topic
+admin_delete_acls/admin_delete_acls
+admin_delete_topics/admin_delete_topics
+admin_describe_acls/admin_describe_acls
+admin_describe_config/admin_describe_config
+confluent_cloud_example/confluent_cloud_example
consumer_example/consumer_example
consumer_offset_metadata/consumer_offset_metadata
-producer_channel_example/producer_channel_example
-producer_example/producer_example
+cooperative_consumer_example/cooperative_consumer_example
go-kafkacat/go-kafkacat
-admin_describe_config/admin_describe_config
-admin_delete_topics/admin_delete_topics
-admin_create_topic/admin_create_topic
+idempotent_producer_example/idempotent_producer_example
+legacy/consumer_channel_example/consumer_channel_example
+legacy/producer_channel_example/producer_channel_example
+library-version/library-version
+mockcluster_example/mockcluster_example
+oauthbearer_example/oauthbearer_example
+producer_custom_channel_example/producer_custom_channel_example
+producer_example/producer_example
stats_example/stats_example
+transactions_example/transactions_example
diff --git a/examples/README.md b/examples/README.md
index 98776bc34..c0ad634e5 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -1,24 +1,51 @@
-Examples:
+Examples
+--------
- consumer_channel_example - Channel based consumer
- consumer_example - Function & callback based consumer
- consumer_offset_metadata - Commit offset with metadata
+ [admin_create_acls](admin_create_acls) - Create Access Control Lists
+
+ [admin_create_topic](admin_create_topic) - Create a topic
- producer_channel_example - Channel based producer
- producer_example - Function based producer
+ [admin_delete_acls](admin_delete_acls) - Delete Access Control Lists using different filters
+
+ [admin_delete_topics](admin_delete_topics) - Delete some topics
+
+ [admin_describe_acls](admin_describe_acls) - Find Access Control Lists using a filter
+
+ [admin_describe_config](admin_describe_config) - Describe broker, topic or group configs
+
+ [consumer_example](consumer_example) - Function & callback based consumer
+
+ [consumer_offset_metadata](consumer_offset_metadata) - Commit offset with metadata
+
+ [cooperative_consumer_example](cooperative_consumer_example) - Using the cooperative incremental rebalancing protocol
- transactions_example - Showcasing a transactional consume-process-produce application
+ [confluent_cloud_example](confluent_cloud_example) - Usage example with Confluent Cloud
- go-kafkacat - Channel based kafkacat Go clone
+ [go-kafkacat](go-kafkacat) - Channel based kafkacat Go clone
- oauthbearer_example - Provides unsecured SASL/OAUTHBEARER example
+ [idempotent_producer_example](idempotent_producer_example) - Idempotent producer
+
+ [legacy](legacy) - Legacy examples
+
+ [library-version](library-version) - Show the library version
+ [mockcluster_example](mockcluster_example) - Use a mock cluster for testing
-Usage example:
+ [oauthbearer_example](oauthbearer_example) - Provides unsecured SASL/OAUTHBEARER example
+
+ [producer_custom_channel_example](producer_custom_channel_example) - Function based producer with a custom delivery channel
+
+ [producer_example](producer_example) - Function based producer
+
+ [stats_example](stats_example) - Receiving stats events
+
+ [transactions_example](transactions_example) - Showcasing a transactional consume-process-produce application
+
+Usage example
+-------------
$ cd consumer_example
$ go build (or 'go install')
$ ./consumer_example # see usage
$ ./consumer_example mybroker mygroup mytopic
-
diff --git a/examples/admin_create_acls/admin_create_acls.go b/examples/admin_create_acls/admin_create_acls.go
new file mode 100644
index 000000000..71b8bbf82
--- /dev/null
+++ b/examples/admin_create_acls/admin_create_acls.go
@@ -0,0 +1,147 @@
+// Create ACLs
+package main
+
+/**
+ * Copyright 2022 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/confluentinc/confluent-kafka-go/kafka"
+)
+
+// Parses a list of 7n arguments to a slice of n ACLBinding
+func parseACLBindings(args []string) (aclBindings kafka.ACLBindings, err error) {
+ nACLBindings := len(args) / 7
+ parsedACLBindings := make(kafka.ACLBindings, nACLBindings)
+
+ for i := 0; i < nACLBindings; i++ {
+ start := i * 7
+ resourceTypeString := args[start]
+ name := args[start+1]
+ resourcePatternTypeString := args[start+2]
+ principal := args[start+3]
+ host := args[start+4]
+ operationString := args[start+5]
+ permissionTypeString := args[start+6]
+
+ resourceType, errParse := kafka.ResourceTypeFromString(resourceTypeString)
+ if errParse != nil {
+ err = errParse
+ fmt.Printf("Invalid resource type: %s: %v\n", resourceTypeString, err)
+ return
+ }
+
+ resourcePatternType, errParse := kafka.ResourcePatternTypeFromString(resourcePatternTypeString)
+ if errParse != nil {
+ err = errParse
+ fmt.Printf("Invalid resource pattern type: %s: %v\n", resourcePatternTypeString, err)
+ return
+ }
+
+ operation, errParse := kafka.ACLOperationFromString(operationString)
+ if errParse != nil {
+ err = errParse
+ fmt.Printf("Invalid operation: %s: %v\n", operationString, err)
+ return
+ }
+
+ permissionType, errParse := kafka.ACLPermissionTypeFromString(permissionTypeString)
+ if errParse != nil {
+ err = errParse
+ fmt.Printf("Invalid permission type: %s: %v\n", permissionTypeString, err)
+ return
+ }
+
+ parsedACLBindings[i] = kafka.ACLBinding{
+ Type: resourceType,
+ Name: name,
+ ResourcePatternType: resourcePatternType,
+ Principal: principal,
+ Host: host,
+ Operation: operation,
+ PermissionType: permissionType,
+ }
+ }
+ aclBindings = parsedACLBindings
+ return
+}
+
+func main() {
+
+ // 2 + 7n arguments to create n ACL bindings
+ nArgs := len(os.Args)
+ aclBindingArgs := nArgs - 2
+ if aclBindingArgs <= 0 || aclBindingArgs%7 != 0 {
+ fmt.Fprintf(os.Stderr,
+ "Usage: %s
-
+
00version.go
-
+
adminapi.go
-
+
adminoptions.go
-
- build_glibc_linux.go
+
+ build_darwin.go
-
+
config.go
-
+
consumer.go
-
+
context.go
-
+
error.go
-
+
error_gen.go
-
+
event.go
-
+
generated_errors.go
-
+
handle.go
-
+
header.go
-
+
kafka.go
-
+
log.go
-
+
message.go
-
+
metadata.go
-
+
misc.go
-
+
+ mockcluster.go
+
+
offset.go
-
+
producer.go
-
+
testhelpers.go
-
+
time.go
@@ -1282,37 +1435,87 @@
LibrdkafkaLinkInfo explains how librdkafka was linked to the Go client
OffsetBeginning represents the earliest offset (logical)
OffsetEnd represents the latest offset (logical)
OffsetInvalid represents an invalid/unspecified offset
OffsetStored represents a stored offset
PartitionAny represents any partition (for partitioning),
or unspecified value (for all other cases)
LibraryVersion returns the underlying librdkafka library version as a
(version_int, version_str) tuple.
WriteErrorCodes writes Go error code constants to file from the
librdkafka error codes.
This function is not intended for public use.
+ ACLBinding specifies the operation and permission type for a specific principal
+over one or more resources of the same type. Used by `AdminClient.CreateACLs`,
+returned by `AdminClient.DescribeACLs` and `AdminClient.DeleteACLs`.
+
+ ACLBindingFilter specifies a filter used to return a list of ACL bindings matching some or all of its attributes.
+Used by `AdminClient.DescribeACLs` and `AdminClient.DeleteACLs`.
+
+ ACLBindingFilters is a slice of ACLBindingFilter that also implements
+the sort interface
+
+ ACLBindings is a slice of ACLBinding that also implements
+the sort interface
+
+ ACLOperation enumerates the different types of ACL operation.
+
+ ACLOperationFromString translates a ACL operation name to
+a ACLOperation value.
+
+ String returns the human-readable representation of an ACLOperation
+
+ ACLPermissionType enumerates the different types of ACL permission types.
+
+ ACLPermissionTypeFromString translates a ACL permission type name to
+a ACLPermissionType value.
+
+ String returns the human-readable representation of an ACLPermissionType
+
NewAdminClient creats a new AdminClient instance with a new underlying client instance
NewAdminClientFromConsumer derives a new AdminClient from an existing Consumer instance.
The AdminClient will use the same configuration and connections as the parent instance.
NewAdminClientFromProducer derives a new AdminClient from an existing Producer instance.
The AdminClient will use the same configuration and connections as the parent instance.
AlterConfigs alters/updates cluster resource configuration.
func WriteErrorCodes(f *os.File)
+
func (a *AdminClient) ControllerID(ctx context.Context) (controllerID int32, err error)
func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, options ...CreateTopicsAdminOption) (result []TopicResult, err error)
func (c *Consumer) SetOAuthBearerTokenFailure(errstr string) error
func NewTestConsumerGroupMetadata(groupID string) (*ConsumerGroupMetadata, error)
type CreateTopicsAdminOption
type Metadata
type RebalanceCb
const (
// ResourceUnknown - Unknown
- ResourceUnknown = ResourceType(C.RD_KAFKA_RESOURCE_UNKNOWN)
+ ResourceUnknown = ResourceType(C.RD_KAFKA_RESOURCE_UNKNOWN)
// ResourceAny - match any resource type (DescribeConfigs)
- ResourceAny = ResourceType(C.RD_KAFKA_RESOURCE_ANY)
+ ResourceAny = ResourceType(C.RD_KAFKA_RESOURCE_ANY)
// ResourceTopic - Topic
- ResourceTopic = ResourceType(C.RD_KAFKA_RESOURCE_TOPIC)
+ ResourceTopic = ResourceType(C.RD_KAFKA_RESOURCE_TOPIC)
// ResourceGroup - Group
- ResourceGroup = ResourceType(C.RD_KAFKA_RESOURCE_GROUP)
+ ResourceGroup = ResourceType(C.RD_KAFKA_RESOURCE_GROUP)
// ResourceBroker - Broker
- ResourceBroker = ResourceType(C.RD_KAFKA_RESOURCE_BROKER)
+ ResourceBroker = ResourceType(C.RD_KAFKA_RESOURCE_BROKER)
)
const (
// ConfigSourceUnknown is the default value
- ConfigSourceUnknown = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG)
+ ConfigSourceUnknown = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG)
// ConfigSourceDynamicTopic is dynamic topic config that is configured for a specific topic
- ConfigSourceDynamicTopic = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG)
+ ConfigSourceDynamicTopic = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG)
// ConfigSourceDynamicBroker is dynamic broker config that is configured for a specific broker
- ConfigSourceDynamicBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG)
+ ConfigSourceDynamicBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG)
// ConfigSourceDynamicDefaultBroker is dynamic broker config that is configured as default for all brokers in the cluster
- ConfigSourceDynamicDefaultBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG)
+ ConfigSourceDynamicDefaultBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG)
// ConfigSourceStaticBroker is static broker config provided as broker properties at startup (e.g. from server.properties file)
- ConfigSourceStaticBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG)
+ ConfigSourceStaticBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG)
// ConfigSourceDefault is built-in default configuration for configs that have a default value
- ConfigSourceDefault = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG)
+ ConfigSourceDefault = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG)
+)
+ const (
+ // ResourcePatternTypeUnknown is a resource pattern type not known or not set.
+ ResourcePatternTypeUnknown = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_UNKNOWN)
+ // ResourcePatternTypeAny matches any resource, used for lookups.
+ ResourcePatternTypeAny = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_ANY)
+ // ResourcePatternTypeMatch will perform pattern matching
+ ResourcePatternTypeMatch = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_MATCH)
+ // ResourcePatternTypeLiteral matches a literal resource name
+ ResourcePatternTypeLiteral = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_LITERAL)
+ // ResourcePatternTypePrefixed matches a prefixed resource name
+ ResourcePatternTypePrefixed = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_PREFIXED)
+)
+ const (
+ // ACLOperationUnknown represents an unknown or unset operation
+ ACLOperationUnknown = ACLOperation(C.RD_KAFKA_ACL_OPERATION_UNKNOWN)
+ // ACLOperationAny in a filter, matches any ACLOperation
+ ACLOperationAny = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ANY)
+ // ACLOperationAll represents all the operations
+ ACLOperationAll = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALL)
+ // ACLOperationRead a read operation
+ ACLOperationRead = ACLOperation(C.RD_KAFKA_ACL_OPERATION_READ)
+ // ACLOperationWrite represents a write operation
+ ACLOperationWrite = ACLOperation(C.RD_KAFKA_ACL_OPERATION_WRITE)
+ // ACLOperationCreate represents a create operation
+ ACLOperationCreate = ACLOperation(C.RD_KAFKA_ACL_OPERATION_CREATE)
+ // ACLOperationDelete represents a delete operation
+ ACLOperationDelete = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DELETE)
+ // ACLOperationAlter represents an alter operation
+ ACLOperationAlter = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALTER)
+ // ACLOperationDescribe represents a describe operation
+ ACLOperationDescribe = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DESCRIBE)
+ // ACLOperationClusterAction represents a cluster action operation
+ ACLOperationClusterAction = ACLOperation(C.RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION)
+ // ACLOperationDescribeConfigs represents a describe configs operation
+ ACLOperationDescribeConfigs = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS)
+ // ACLOperationAlterConfigs represents an alter configs operation
+ ACLOperationAlterConfigs = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS)
+ // ACLOperationIdempotentWrite represents an idempotent write operation
+ ACLOperationIdempotentWrite = ACLOperation(C.RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE)
+)
+ const (
+ // ACLPermissionTypeUnknown represents an unknown ACLPermissionType
+ ACLPermissionTypeUnknown = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN)
+ // ACLPermissionTypeAny in a filter, matches any ACLPermissionType
+ ACLPermissionTypeAny = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_ANY)
+ // ACLPermissionTypeDeny disallows access
+ ACLPermissionTypeDeny = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_DENY)
+ // ACLPermissionTypeAllow grants access
+ ACLPermissionTypeAllow = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW)
)
const (
// TimestampNotAvailable indicates no timestamp was set, or not available due to lacking broker support
- TimestampNotAvailable = TimestampType(C.RD_KAFKA_TIMESTAMP_NOT_AVAILABLE)
+ TimestampNotAvailable = TimestampType(C.RD_KAFKA_TIMESTAMP_NOT_AVAILABLE)
// TimestampCreateTime indicates timestamp set by producer (source time)
- TimestampCreateTime = TimestampType(C.RD_KAFKA_TIMESTAMP_CREATE_TIME)
+ TimestampCreateTime = TimestampType(C.RD_KAFKA_TIMESTAMP_CREATE_TIME)
// TimestampLogAppendTime indicates timestamp set set by broker (store time)
- TimestampLogAppendTime = TimestampType(C.RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
+ TimestampLogAppendTime = TimestampType(C.RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
)
const (
// PurgeInFlight purges messages in-flight to or from the broker.
@@ -1320,75 +1523,253 @@
// broker, making it impossible for the application to know if these
// messages were successfully delivered or not.
// Retrying these messages may lead to duplicates.
- PurgeInFlight = int(C.RD_KAFKA_PURGE_F_INFLIGHT)
+ PurgeInFlight = int(C.RD_KAFKA_PURGE_F_INFLIGHT)
// PurgeQueue Purge messages in internal queues.
- PurgeQueue = int(C.RD_KAFKA_PURGE_F_QUEUE)
+ PurgeQueue = int(C.RD_KAFKA_PURGE_F_QUEUE)
// PurgeNonBlocking Don't wait for background thread queue purging to finish.
- PurgeNonBlocking = int(C.RD_KAFKA_PURGE_F_NON_BLOCKING)
+ PurgeNonBlocking = int(C.RD_KAFKA_PURGE_F_NON_BLOCKING)
)
const (
// AlterOperationSet sets/overwrites the configuration setting.
- AlterOperationSet = iota
+ AlterOperationSet = iota
)
const LibrdkafkaLinkInfo = "static glibc_linux from librdkafka-static-bundle-v1.7.0.tgz"
+ const LibrdkafkaLinkInfo = "static darwin from librdkafka-static-bundle-v1.9.0.tgz"
const OffsetBeginning = Offset(C.RD_KAFKA_OFFSET_BEGINNING)
+ const OffsetBeginning = Offset(C.RD_KAFKA_OFFSET_BEGINNING)
const OffsetEnd = Offset(C.RD_KAFKA_OFFSET_END)
+ const OffsetEnd = Offset(C.RD_KAFKA_OFFSET_END)
const OffsetInvalid = Offset(C.RD_KAFKA_OFFSET_INVALID)
+ const OffsetInvalid = Offset(C.RD_KAFKA_OFFSET_INVALID)
const OffsetStored = Offset(C.RD_KAFKA_OFFSET_STORED)
+ const OffsetStored = Offset(C.RD_KAFKA_OFFSET_STORED)
const PartitionAny = int32(C.RD_KAFKA_PARTITION_UA)
+ const PartitionAny = int32(C.RD_KAFKA_PARTITION_UA)
func
-
+
LibraryVersion
¶
- func LibraryVersion() (int, string)
+ func LibraryVersion() (int, string)
func
-
+
WriteErrorCodes
¶
- func WriteErrorCodes(f *os.File)
+ func WriteErrorCodes(f *os.File)
+ type
+
+ ACLBinding
+
+
+ ¶
+
+
+ type ACLBinding struct {
+ Type ResourceType // The resource type.
+ // The resource name, which depends on the resource type.
+ // For ResourceBroker the resource name is the broker id.
+ Name string
+ ResourcePatternType ResourcePatternType // The resource pattern, relative to the name.
+ Principal string // The principal this ACLBinding refers to.
+ Host string // The host that the call is allowed to come from.
+ Operation ACLOperation // The operation/s specified by this binding.
+ PermissionType ACLPermissionType // The permission type for the specified operation.
+}
+
+
+ type
+
+ ACLBindingFilter
+
+
+ ¶
+
+
+ type ACLBindingFilter = ACLBinding
+
+ type
+
+ ACLBindingFilters
+
+
+ ¶
+
+
+ type ACLBindingFilters []ACLBindingFilter
+
+ type
+
+ ACLBindings
+
+
+ ¶
+
+
+ type ACLBindings []ACLBinding
+
+ func (ACLBindings)
+
+ Len
+
+
+ ¶
+
+
+ func (a ACLBindings) Len() int
+
+ func (ACLBindings)
+
+ Less
+
+
+ ¶
+
+
+ func (a ACLBindings) Less(i, j int) bool
+
+ func (ACLBindings)
+
+ Swap
+
+
+ ¶
+
+
+ func (a ACLBindings) Swap(i, j int)
+
+ type
+
+ ACLOperation
+
+
+ ¶
+
+
+ type ACLOperation int
+
+ func
+
+ ACLOperationFromString
+
+
+ ¶
+
+
+ func ACLOperationFromString(aclOperationString string) (ACLOperation, error)
+
+ func (ACLOperation)
+
+ String
+
+
+ ¶
+
+
+ func (o ACLOperation) String() string
+
+ type
+
+ ACLPermissionType
+
+
+ ¶
+
+
+ type ACLPermissionType int
+
+ func
+
+ ACLPermissionTypeFromString
+
+
+ ¶
+
+
+ func ACLPermissionTypeFromString(aclPermissionTypeString string) (ACLPermissionType, error)
+
+ func (ACLPermissionType)
+
+ String
+
+
+ ¶
+
+
+ func (o ACLPermissionType) String() string
+
type
-
+
AdminClient
@@ -1404,55 +1785,55 @@
func
-
+
NewAdminClient
¶
- func NewAdminClient(conf *ConfigMap) (*AdminClient, error)
+ func NewAdminClient(conf *ConfigMap) (*AdminClient, error)
func
-
+
NewAdminClientFromConsumer
¶
- func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error)
+ func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error)
func
-
+
NewAdminClientFromProducer
¶
- func NewAdminClientFromProducer(p *Producer) (a *AdminClient, err error)
+ func NewAdminClientFromProducer(p *Producer) (a *AdminClient, err error)
func (*AdminClient)
-
+
AlterConfigs
¶
- func (a *AdminClient) AlterConfigs(ctx context.Context, resources []ConfigResource, options ...AlterConfigsAdminOption) (result []ConfigResourceResult, err error)
+ func (a *AdminClient) AlterConfigs(ctx context.Context, resources []ConfigResource, options ...AlterConfigsAdminOption) (result []ConfigResourceResult, err error)
func (a *AdminClient) ClusterID(ctx context.Context) (clusterID string, err error)+
func (a *AdminClient) ClusterID(ctx context.Context) (clusterID string, err error)
ClusterID returns the cluster ID as reported in broker metadata.
@@ -1512,14 +1893,14 @@func (a *AdminClient) ControllerID(ctx context.Context) (controllerID int32, err error)+
func (a *AdminClient) ControllerID(ctx context.Context) (controllerID int32, err error)
ControllerID returns the broker ID of the current controller as reported in broker metadata. @@ -1532,29 +1913,53 @@
Requires broker version >= 0.10.0.
+func (a *AdminClient) CreateACLs(ctx context.Context, aclBindings ACLBindings, options ...CreateACLsAdminOption) (result []CreateACLResult, err error)+
+ CreateACLs creates one or more ACL bindings. +
++ Parameters: +
+* `ctx` - context with the maximum amount of time to block, or nil for indefinite. +* `aclBindings` - A slice of ACL binding specifications to create. +* `options` - Create ACLs options ++
+ Returns a slice of CreateACLResult with a ErrNoError ErrorCode when the operation was successful +plus an error that is not nil for client level errors +
func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, options ...CreatePartitionsAdminOption) (result []TopicResult, err error)+
func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, options ...CreatePartitionsAdminOption) (result []TopicResult, err error)
CreatePartitions creates additional partitions for topics.
func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, options ...CreateTopicsAdminOption) (result []TopicResult, err error)+
func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, options ...CreateTopicsAdminOption) (result []TopicResult, err error)
CreateTopics creates topics in cluster.
@@ -1568,16 +1973,45 @@Note: TopicSpecification is analogous to NewTopic in the Java Topic Admin API.
+func (a *AdminClient) DeleteACLs(ctx context.Context, aclBindingFilters ACLBindingFilters, options ...DeleteACLsAdminOption) (result []DeleteACLsResult, err error)+
+ DeleteACLs deletes ACL bindings matching one or more ACL binding filters. +
++ Parameters: +
+* `ctx` - context with the maximum amount of time to block, or nil for indefinite. +* `aclBindingFilters` - a slice of ACL binding filters to match ACLs to delete. + string attributes match exact values or any string if set to empty string. + Enum attributes match exact values or any value if ending with `Any`. + If `ResourcePatternType` is set to `ResourcePatternTypeMatch` returns all + the ACL bindings with `ResourcePatternTypeLiteral`, `ResourcePatternTypeWildcard` + or `ResourcePatternTypePrefixed` pattern type that match the resource name. +* `options` - Delete ACLs options ++
+ Returns a slice of ACLBinding for each filter when the operation was successful +plus an error that is not `nil` for client level errors +
func (a *AdminClient) DeleteTopics(ctx context.Context, topics []string, options ...DeleteTopicsAdminOption) (result []TopicResult, err error)+
func (a *AdminClient) DeleteTopics(ctx context.Context, topics []string, options ...DeleteTopicsAdminOption) (result []TopicResult, err error)
DeleteTopics deletes a batch of topics.
@@ -1591,16 +2025,45 @@Requires broker version >= 0.10.1.0
+func (a *AdminClient) DescribeACLs(ctx context.Context, aclBindingFilter ACLBindingFilter, options ...DescribeACLsAdminOption) (result *DescribeACLsResult, err error)+
+ DescribeACLs matches ACL bindings by filter. +
++ Parameters: +
+* `ctx` - context with the maximum amount of time to block, or nil for indefinite. +* `aclBindingFilter` - A filter with attributes that must match. + string attributes match exact values or any string if set to empty string. + Enum attributes match exact values or any value if ending with `Any`. + If `ResourcePatternType` is set to `ResourcePatternTypeMatch` returns all + the ACL bindings with `ResourcePatternTypeLiteral`, `ResourcePatternTypeWildcard` + or `ResourcePatternTypePrefixed` pattern type that match the resource name. +* `options` - Describe ACLs options ++
+ Returns a slice of ACLBindings when the operation was successful +plus an error that is not `nil` for client level errors +
func (a *AdminClient) DescribeConfigs(ctx context.Context, resources []ConfigResource, options ...DescribeConfigsAdminOption) (result []ConfigResourceResult, err error)+
func (a *AdminClient) DescribeConfigs(ctx context.Context, resources []ConfigResource, options ...DescribeConfigsAdminOption) (result []ConfigResourceResult, err error)
DescribeConfigs retrieves configuration for cluster resources.
@@ -1633,14 +2096,14 @@func (a *AdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)+
func (a *AdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if @@ -1650,14 +2113,14 @@
func (a *AdminClient) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error+
func (a *AdminClient) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
SetOAuthBearerToken sets the the data to be transmitted to a broker during SASL/OAUTHBEARER authentication. It will return nil @@ -1675,14 +2138,14 @@
func (a *AdminClient) SetOAuthBearerTokenFailure(errstr string) error+
func (a *AdminClient) SetOAuthBearerTokenFailure(errstr string) error
SetOAuthBearerTokenFailure sets the error message describing why token retrieval/setting failed; it also schedules a new token refresh event for 10 @@ -1694,20 +2157,20 @@
func (a *AdminClient) String() string+
func (a *AdminClient) String() string
String returns a human readable name for an AdminClient instance
func SetAdminOperationTimeout(t time.Duration) (ao AdminOptionOperationTimeout)+
func SetAdminOperationTimeout(t time.Duration) (ao AdminOptionOperationTimeout)
SetAdminOperationTimeout sets the broker's operation timeout, such as the timeout for CreateTopics to complete the creation of topics on the controller @@ -1782,7 +2245,7 @@
func SetAdminRequestTimeout(t time.Duration) (ao AdminOptionRequestTimeout)+
func SetAdminRequestTimeout(t time.Duration) (ao AdminOptionRequestTimeout)
SetAdminRequestTimeout sets the overall request timeout, including broker lookup, request transmission, operation time on broker, and response. @@ -1825,7 +2288,7 @@
func SetAdminValidateOnly(validateOnly bool) (ao AdminOptionValidateOnly)+
func SetAdminValidateOnly(validateOnly bool) (ao AdminOptionValidateOnly)
SetAdminValidateOnly tells the broker to only validate the request, without performing the requested operation (create topics, etc). @@ -1868,7 +2331,7 @@
type AlterOperation int+
type AlterOperation int
func (o AlterOperation) String() string+
func (o AlterOperation) String() string
String returns the human-readable representation of an AlterOperation
func (e AssignedPartitions) String() string+
func (e AssignedPartitions) String() string
type BrokerMetadata struct { - ID int32 - Host string - Port int + ID int32 + Host string + Port int }
type ConfigEntry struct { // Name of configuration entry, e.g., topic configuration property name. - Name string + Name string // Value of configuration entry. - Value string + Value string // Operation to perform on the entry. Operation AlterOperation }
func StringMapToConfigEntries(stringMap map[string]string, operation AlterOperation) []ConfigEntry+
func StringMapToConfigEntries(stringMap map[string]string, operation AlterOperation) []ConfigEntry
StringMapToConfigEntries creates a new map of ConfigEntry objects from the provided string map. The AlterOperation is set on each created entry.
func (c ConfigEntry) String() string+
func (c ConfigEntry) String() string
String returns a human-readable representation of a ConfigEntry.
type ConfigEntryResult struct { // Name of configuration entry, e.g., topic configuration property name. - Name string + Name string // Value of configuration entry. - Value string + Value string // Source indicates the configuration source. Source ConfigSource // IsReadOnly indicates whether the configuration entry can be altered. - IsReadOnly bool + IsReadOnly bool // IsSensitive indicates whether the configuration entry contains sensitive information, in which case the value will be unset. - IsSensitive bool + IsSensitive bool // IsSynonym indicates whether the configuration entry is a synonym for another configuration property. - IsSynonym bool + IsSynonym bool // Synonyms contains a map of configuration entries that are synonyms to this configuration entry. - Synonyms map[string]ConfigEntryResult + Synonyms map[string]ConfigEntryResult }
func (c ConfigEntryResult) String() string+
func (c ConfigEntryResult) String() string
String returns a human-readable representation of a ConfigEntryResult.
type ConfigMap map[string]ConfigValue+
type ConfigMap map[string]ConfigValue
func (m ConfigMap) Get(key string, defval ConfigValue) (ConfigValue, error)+
func (m ConfigMap) Get(key string, defval ConfigValue) (ConfigValue, error)
Get finds the given key in the ConfigMap and returns its value. If the key is not found `defval` is returned. @@ -2090,28 +2553,28 @@
func (m ConfigMap) Set(kv string) error+
func (m ConfigMap) Set(kv string) error
Set implements flag.Set (command line argument parser) as a convenience for `-X key=value` config.
func (m ConfigMap) SetKey(key string, value ConfigValue) error+
func (m ConfigMap) SetKey(key string, value ConfigValue) error
SetKey sets configuration property key to value.
@@ -2121,7 +2584,7 @@func (c ConfigResource) String() string+
func (c ConfigResource) String() string
String returns a human-readable representation of a ConfigResource
func (c ConfigResourceResult) String() string+
func (c ConfigResourceResult) String() string
String returns a human-readable representation of a ConfigResourceResult.
ConfigSource represents an Apache Kafka config source
-type ConfigSource int+
type ConfigSource int
func (t ConfigSource) String() string+
func (t ConfigSource) String() string
String returns the human-readable representation of a ConfigSource type
type ConfigValue interface{}
func NewConsumer(conf *ConfigMap) (*Consumer, error)+
func NewConsumer(conf *ConfigMap) (*Consumer, error)
NewConsumer creates a new high-level Consumer instance.
@@ -2288,14 +2751,14 @@func (c *Consumer) Assign(partitions []TopicPartition) (err error)+
func (c *Consumer) Assign(partitions []TopicPartition) (err error)
Assign an atomic set of partitions to consume.
@@ -2311,27 +2774,27 @@func (c *Consumer) Assignment() (partitions []TopicPartition, err error)+
func (c *Consumer) Assignment() (partitions []TopicPartition, err error)
Assignment returns the current partition assignments
func (c *Consumer) AssignmentLost() bool+
func (c *Consumer) AssignmentLost() bool
AssignmentLost returns true if current partition assignment has been lost. This method is only applicable for use with a subscribing consumer when @@ -2341,28 +2804,28 @@
func (c *Consumer) Close() (err error)+
func (c *Consumer) Close() (err error)
Close Consumer instance. The object is no longer usable after this call.
func (c *Consumer) Commit() ([]TopicPartition, error)+
func (c *Consumer) Commit() ([]TopicPartition, error)
Commit offsets for currently assigned partitions This is a blocking call. @@ -2370,14 +2833,14 @@
func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)+
func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)
CommitMessage commits offset based on the provided message. This is a blocking call. @@ -2385,14 +2848,14 @@
func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)+
func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
CommitOffsets commits the provided list of offsets This is a blocking call. @@ -2400,20 +2863,20 @@
func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)+
func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
Committed retrieves committed offsets for the given set of partitions
func (c *Consumer) GetConsumerGroupMetadata() (*ConsumerGroupMetadata, error)+
func (c *Consumer) GetConsumerGroupMetadata() (*ConsumerGroupMetadata, error)
GetConsumerGroupMetadata returns the consumer's current group metadata. This object should be passed to the transactional producer's @@ -2441,14 +2904,14 @@
func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)+
func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if @@ -2458,14 +2921,14 @@
func (c *Consumer) GetRebalanceProtocol() string+
func (c *Consumer) GetRebalanceProtocol() string
GetRebalanceProtocol returns the current consumer group rebalance protocol, which is either "EAGER" or "COOPERATIVE". @@ -2475,14 +2938,14 @@
func (c *Consumer) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)+
func (c *Consumer) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)
GetWatermarkOffsets returns the cached low and high offsets for the given topic and partition. The high offset is populated on every fetch response or via calling QueryWatermarkOffsets. @@ -2491,14 +2954,14 @@
func (c *Consumer) IncrementalAssign(partitions []TopicPartition) (err error)+
func (c *Consumer) IncrementalAssign(partitions []TopicPartition) (err error)
IncrementalAssign adds the specified partitions to the current set of partitions to consume. @@ -2515,14 +2978,14 @@
func (c *Consumer) IncrementalUnassign(partitions []TopicPartition) (err error)+
func (c *Consumer) IncrementalUnassign(partitions []TopicPartition) (err error)
IncrementalUnassign removes the specified partitions from the current set of partitions to consume. @@ -2535,7 +2998,7 @@
func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)+
func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
OffsetsForTimes looks up offsets by timestamp for the given partitions.
@@ -2579,14 +3042,14 @@func (c *Consumer) Pause(partitions []TopicPartition) (err error)+
func (c *Consumer) Pause(partitions []TopicPartition) (err error)
Pause consumption for the provided list of partitions
@@ -2597,14 +3060,14 @@func (c *Consumer) Poll(timeoutMs int) (event Event)+
func (c *Consumer) Poll(timeoutMs int) (event Event)
Poll the consumer for messages or events.
@@ -2621,14 +3084,14 @@func (c *Consumer) Position(partitions []TopicPartition) (offsets []TopicPartition, err error)+
func (c *Consumer) Position(partitions []TopicPartition) (offsets []TopicPartition, err error)
Position returns the current consume position for the given partitions. Typical use is to call Assignment() to get the partition list @@ -2639,27 +3102,27 @@
func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)+
func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
QueryWatermarkOffsets queries the broker for the low and high offsets for the given topic and partition.
func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error)+
func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error)
ReadMessage polls the consumer for a message.
@@ -2686,27 +3149,27 @@func (c *Consumer) Resume(partitions []TopicPartition) (err error)+
func (c *Consumer) Resume(partitions []TopicPartition) (err error)
Resume consumption for the provided list of partitions
func (c *Consumer) Seek(partition TopicPartition, timeoutMs int) error+
func (c *Consumer) Seek(partition TopicPartition, timeoutMs int) error
Seek seeks the given topic partitions using the offset from the TopicPartition.
@@ -2728,14 +3191,14 @@func (c *Consumer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error+
func (c *Consumer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
SetOAuthBearerToken sets the the data to be transmitted to a broker during SASL/OAUTHBEARER authentication. It will return nil @@ -2753,14 +3216,14 @@
func (c *Consumer) SetOAuthBearerTokenFailure(errstr string) error+
func (c *Consumer) SetOAuthBearerTokenFailure(errstr string) error
SetOAuthBearerTokenFailure sets the error message describing why token retrieval/setting failed; it also schedules a new token refresh event for 10 @@ -2769,17 +3232,31 @@
func (c *Consumer) StoreMessage(m *Message) (storedOffsets []TopicPartition, err error)+
+ StoreMessage stores offset based on the provided message. +This is a convenience method that uses StoreOffsets to do the actual work.
func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error)+
func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error)
StoreOffsets stores the provided list of offsets that will be committed to the offset store according to `auto.commit.interval.ms` or manual @@ -2792,87 +3269,87 @@
func (c *Consumer) String() string+
func (c *Consumer) String() string
Strings returns a human readable name for a Consumer instance
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error+
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
Subscribe to a single topic This replaces the current subscription
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)+
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)
SubscribeTopics subscribes to the provided list of topics. This replaces the current subscription.
func (c *Consumer) Subscription() (topics []string, err error)+
func (c *Consumer) Subscription() (topics []string, err error)
Subscription returns the current subscription as set by Subscribe()
func (c *Consumer) Unassign() (err error)+
func (c *Consumer) Unassign() (err error)
Unassign the current set of partitions to consume.
func (c *Consumer) Unsubscribe() (err error)+
func (c *Consumer) Unsubscribe() (err error)
Unsubscribe from the current subscription, if any.
func NewTestConsumerGroupMetadata(groupID string) (*ConsumerGroupMetadata, error)+
func NewTestConsumerGroupMetadata(groupID string) (*ConsumerGroupMetadata, error)
NewTestConsumerGroupMetadata creates a new consumer group metadata instance mainly for testing use. Use GetConsumerGroupMetadata() to retrieve the real metadata.
++ CreateACLResult provides create ACL error information. +
+type CreateACLResult struct {
+ // Error, if any, of result. Check with `Error.Code() != ErrNoError`.
+ Error Error
+}
+
+ + CreateACLsAdminOption - see setter. +
++ See SetAdminRequestTimeout +
+type CreateACLsAdminOption interface {
+ // contains filtered or unexported methods
+}
type CreateTopicsAdminOption interface {
// contains filtered or unexported methods
}
+ + DeleteACLsAdminOption - see setter. +
++ See SetAdminRequestTimeout +
+type DeleteACLsAdminOption interface {
+ // contains filtered or unexported methods
+}
+ + DeleteACLsResult provides delete ACLs result or error information. +
+type DeleteACLsResult = DescribeACLsResult
type DeleteTopicsAdminOption interface {
// contains filtered or unexported methods
}
+ + DescribeACLsAdminOption - see setter. +
++ See SetAdminRequestTimeout +
+type DescribeACLsAdminOption interface {
+ // contains filtered or unexported methods
+}
+ + DescribeACLsResult provides describe ACLs result or error information. +
+type DescribeACLsResult struct { + // Slice of ACL bindings matching the provided filter + ACLBindings ACLBindings + // Error, if any, of result. Check with `Error.Code() != ErrNoError`. + Error Error +} +
func NewError(code ErrorCode, str string, fatal bool) (err Error)+
func NewError(code ErrorCode, str string, fatal bool) (err Error)
NewError creates a new Error.
func (e Error) Error() string+
func (e Error) Error() string
Error returns a human readable representation of an Error Same as Error.String()
func (e Error) IsFatal() bool+
func (e Error) IsFatal() bool
IsFatal returns true if the error is a fatal error. A fatal error indicates the client instance is no longer operable and @@ -3047,14 +3627,14 @@
func (e Error) IsRetriable() bool+
func (e Error) IsRetriable() bool
IsRetriable returns true if the operation that caused this error may be retried. @@ -3062,27 +3642,27 @@
func (e Error) String() string+
func (e Error) String() string
String returns a human readable representation of an Error
func (e Error) TxnRequiresAbort() bool+
func (e Error) TxnRequiresAbort() bool
TxnRequiresAbort returns true if the error is an abortable transaction error that requires the application to abort the current transaction with @@ -3092,7 +3672,7 @@
ErrorCode is the integer representation of local and broker error codes
-type ErrorCode int+
type ErrorCode int
const ( // ErrBadMsg Local: Bad message format - ErrBadMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_MSG) + ErrBadMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_MSG) // ErrBadCompression Local: Invalid compressed data - ErrBadCompression ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_COMPRESSION) + ErrBadCompression ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_COMPRESSION) // ErrDestroy Local: Broker handle destroyed - ErrDestroy ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__DESTROY) + ErrDestroy ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__DESTROY) // ErrFail Local: Communication failure with broker - ErrFail ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FAIL) + ErrFail ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FAIL) // ErrTransport Local: Broker transport failure - ErrTransport ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TRANSPORT) + ErrTransport ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TRANSPORT) // ErrCritSysResource Local: Critical system resource failure - ErrCritSysResource ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE) + ErrCritSysResource ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE) // ErrResolve Local: Host resolution failure - ErrResolve ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__RESOLVE) + ErrResolve ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__RESOLVE) // ErrMsgTimedOut Local: Message timed out - ErrMsgTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__MSG_TIMED_OUT) + ErrMsgTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__MSG_TIMED_OUT) // ErrPartitionEOF Broker: No more messages - ErrPartitionEOF ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PARTITION_EOF) + ErrPartitionEOF ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PARTITION_EOF) // ErrUnknownPartition Local: Unknown partition - ErrUnknownPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION) + ErrUnknownPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION) // ErrFs Local: File or filesystem error - ErrFs ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FS) + ErrFs ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FS) // ErrUnknownTopic Local: Unknown topic - ErrUnknownTopic ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) + ErrUnknownTopic ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) // ErrAllBrokersDown Local: All broker connections are down - ErrAllBrokersDown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN) + ErrAllBrokersDown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN) // ErrInvalidArg Local: Invalid argument or configuration - ErrInvalidArg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INVALID_ARG) + ErrInvalidArg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INVALID_ARG) // ErrTimedOut Local: Timed out - ErrTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT) + ErrTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT) // ErrQueueFull Local: Queue full - ErrQueueFull ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__QUEUE_FULL) + ErrQueueFull ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__QUEUE_FULL) // ErrIsrInsuff Local: ISR count insufficient - ErrIsrInsuff ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ISR_INSUFF) + ErrIsrInsuff ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ISR_INSUFF) // ErrNodeUpdate Local: Broker node update - ErrNodeUpdate ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NODE_UPDATE) + ErrNodeUpdate ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NODE_UPDATE) // ErrSsl Local: SSL error - ErrSsl ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__SSL) + ErrSsl ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__SSL) // ErrWaitCoord Local: Waiting for coordinator - ErrWaitCoord ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__WAIT_COORD) + ErrWaitCoord ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__WAIT_COORD) // ErrUnknownGroup Local: Unknown group - ErrUnknownGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_GROUP) + ErrUnknownGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_GROUP) // ErrInProgress Local: Operation in progress - ErrInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__IN_PROGRESS) + ErrInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__IN_PROGRESS) // ErrPrevInProgress Local: Previous operation in progress - ErrPrevInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS) + ErrPrevInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS) // ErrExistingSubscription Local: Existing subscription - ErrExistingSubscription ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION) + ErrExistingSubscription ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION) // ErrAssignPartitions Local: Assign partitions - ErrAssignPartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) + ErrAssignPartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) // ErrRevokePartitions Local: Revoke partitions - ErrRevokePartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) + ErrRevokePartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) // ErrConflict Local: Conflicting use - ErrConflict ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CONFLICT) + ErrConflict ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CONFLICT) // ErrState Local: Erroneous state - ErrState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__STATE) + ErrState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__STATE) // ErrUnknownProtocol Local: Unknown protocol - ErrUnknownProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL) + ErrUnknownProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL) // ErrNotImplemented Local: Not implemented - ErrNotImplemented ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED) + ErrNotImplemented ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED) // ErrAuthentication Local: Authentication failure - ErrAuthentication ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__AUTHENTICATION) + ErrAuthentication ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__AUTHENTICATION) // ErrNoOffset Local: No offset stored - ErrNoOffset ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NO_OFFSET) + ErrNoOffset ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NO_OFFSET) // ErrOutdated Local: Outdated - ErrOutdated ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__OUTDATED) + ErrOutdated ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__OUTDATED) // ErrTimedOutQueue Local: Timed out in queue - ErrTimedOutQueue ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE) + ErrTimedOutQueue ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE) // ErrUnsupportedFeature Local: Required feature not supported by broker - ErrUnsupportedFeature ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE) + ErrUnsupportedFeature ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE) // ErrWaitCache Local: Awaiting cache update - ErrWaitCache ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__WAIT_CACHE) + ErrWaitCache ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__WAIT_CACHE) // ErrIntr Local: Operation interrupted - ErrIntr ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INTR) + ErrIntr ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INTR) // ErrKeySerialization Local: Key serialization error - ErrKeySerialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__KEY_SERIALIZATION) + ErrKeySerialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__KEY_SERIALIZATION) // ErrValueSerialization Local: Value serialization error - ErrValueSerialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION) + ErrValueSerialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION) // ErrKeyDeserialization Local: Key deserialization error - ErrKeyDeserialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION) + ErrKeyDeserialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION) // ErrValueDeserialization Local: Value deserialization error - ErrValueDeserialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION) + ErrValueDeserialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION) // ErrPartial Local: Partial response - ErrPartial ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PARTIAL) + ErrPartial ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PARTIAL) // ErrReadOnly Local: Read-only object - ErrReadOnly ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__READ_ONLY) + ErrReadOnly ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__READ_ONLY) // ErrNoent Local: No such entry - ErrNoent ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOENT) + ErrNoent ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOENT) // ErrUnderflow Local: Read underflow - ErrUnderflow ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNDERFLOW) + ErrUnderflow ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNDERFLOW) // ErrInvalidType Local: Invalid type - ErrInvalidType ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INVALID_TYPE) + ErrInvalidType ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INVALID_TYPE) // ErrRetry Local: Retry operation - ErrRetry ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__RETRY) + ErrRetry ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__RETRY) // ErrPurgeQueue Local: Purged in queue - ErrPurgeQueue ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PURGE_QUEUE) + ErrPurgeQueue ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PURGE_QUEUE) // ErrPurgeInflight Local: Purged in flight - ErrPurgeInflight ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PURGE_INFLIGHT) + ErrPurgeInflight ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PURGE_INFLIGHT) // ErrFatal Local: Fatal error - ErrFatal ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FATAL) + ErrFatal ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FATAL) // ErrInconsistent Local: Inconsistent state - ErrInconsistent ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INCONSISTENT) + ErrInconsistent ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INCONSISTENT) // ErrGaplessGuarantee Local: Gap-less ordering would not be guaranteed if proceeding - ErrGaplessGuarantee ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE) + ErrGaplessGuarantee ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE) // ErrMaxPollExceeded Local: Maximum application poll interval (max.poll.interval.ms) exceeded - ErrMaxPollExceeded ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED) + ErrMaxPollExceeded ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED) // ErrUnknownBroker Local: Unknown broker - ErrUnknownBroker ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_BROKER) + ErrUnknownBroker ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_BROKER) // ErrNotConfigured Local: Functionality not configured - ErrNotConfigured ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOT_CONFIGURED) + ErrNotConfigured ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOT_CONFIGURED) // ErrFenced Local: This instance has been fenced by a newer instance - ErrFenced ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FENCED) + ErrFenced ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FENCED) // ErrApplication Local: Application generated error - ErrApplication ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__APPLICATION) + ErrApplication ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__APPLICATION) // ErrAssignmentLost Local: Group partition assignment lost - ErrAssignmentLost ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ASSIGNMENT_LOST) + ErrAssignmentLost ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ASSIGNMENT_LOST) // ErrNoop Local: No operation performed - ErrNoop ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOOP) + ErrNoop ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOOP) // ErrAutoOffsetReset Local: No offset to automatically reset to - ErrAutoOffsetReset ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET) + ErrAutoOffsetReset ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET) // ErrUnknown Unknown broker error - ErrUnknown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN) + ErrUnknown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN) // ErrNoError Success - ErrNoError ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NO_ERROR) + ErrNoError ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NO_ERROR) // ErrOffsetOutOfRange Broker: Offset out of range - ErrOffsetOutOfRange ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE) + ErrOffsetOutOfRange ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE) // ErrInvalidMsg Broker: Invalid message - ErrInvalidMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG) + ErrInvalidMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG) // ErrUnknownTopicOrPart Broker: Unknown topic or partition - ErrUnknownTopicOrPart ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) + ErrUnknownTopicOrPart ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) // ErrInvalidMsgSize Broker: Invalid message size - ErrInvalidMsgSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE) + ErrInvalidMsgSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE) // ErrLeaderNotAvailable Broker: Leader not available - ErrLeaderNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE) + ErrLeaderNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE) // ErrNotLeaderForPartition Broker: Not leader for partition - ErrNotLeaderForPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION) + ErrNotLeaderForPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION) // ErrRequestTimedOut Broker: Request timed out - ErrRequestTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT) + ErrRequestTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT) // ErrBrokerNotAvailable Broker: Broker not available - ErrBrokerNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE) + ErrBrokerNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE) // ErrReplicaNotAvailable Broker: Replica not available - ErrReplicaNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE) + ErrReplicaNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE) // ErrMsgSizeTooLarge Broker: Message size too large - ErrMsgSizeTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE) + ErrMsgSizeTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE) // ErrStaleCtrlEpoch Broker: StaleControllerEpochCode - ErrStaleCtrlEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH) + ErrStaleCtrlEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH) // ErrOffsetMetadataTooLarge Broker: Offset metadata string too large - ErrOffsetMetadataTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE) + ErrOffsetMetadataTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE) // ErrNetworkException Broker: Broker disconnected before response received - ErrNetworkException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION) + ErrNetworkException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION) // ErrCoordinatorLoadInProgress Broker: Coordinator load in progress - ErrCoordinatorLoadInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS) + ErrCoordinatorLoadInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS) // ErrCoordinatorNotAvailable Broker: Coordinator not available - ErrCoordinatorNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE) + ErrCoordinatorNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE) // ErrNotCoordinator Broker: Not coordinator - ErrNotCoordinator ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_COORDINATOR) + ErrNotCoordinator ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_COORDINATOR) // ErrTopicException Broker: Invalid topic - ErrTopicException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION) + ErrTopicException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION) // ErrRecordListTooLarge Broker: Message batch larger than configured server segment size - ErrRecordListTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE) + ErrRecordListTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE) // ErrNotEnoughReplicas Broker: Not enough in-sync replicas - ErrNotEnoughReplicas ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS) + ErrNotEnoughReplicas ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS) // ErrNotEnoughReplicasAfterAppend Broker: Message(s) written to insufficient number of in-sync replicas - ErrNotEnoughReplicasAfterAppend ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND) + ErrNotEnoughReplicasAfterAppend ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND) // ErrInvalidRequiredAcks Broker: Invalid required acks value - ErrInvalidRequiredAcks ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS) + ErrInvalidRequiredAcks ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS) // ErrIllegalGeneration Broker: Specified group generation id is not valid - ErrIllegalGeneration ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION) + ErrIllegalGeneration ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION) // ErrInconsistentGroupProtocol Broker: Inconsistent group protocol - ErrInconsistentGroupProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL) + ErrInconsistentGroupProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL) // ErrInvalidGroupID Broker: Invalid group.id - ErrInvalidGroupID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_GROUP_ID) + ErrInvalidGroupID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_GROUP_ID) // ErrUnknownMemberID Broker: Unknown member - ErrUnknownMemberID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID) + ErrUnknownMemberID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID) // ErrInvalidSessionTimeout Broker: Invalid session timeout - ErrInvalidSessionTimeout ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT) + ErrInvalidSessionTimeout ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT) // ErrRebalanceInProgress Broker: Group rebalance in progress - ErrRebalanceInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS) + ErrRebalanceInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS) // ErrInvalidCommitOffsetSize Broker: Commit offset data size is not valid - ErrInvalidCommitOffsetSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE) + ErrInvalidCommitOffsetSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE) // ErrTopicAuthorizationFailed Broker: Topic authorization failed - ErrTopicAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED) + ErrTopicAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED) // ErrGroupAuthorizationFailed Broker: Group authorization failed - ErrGroupAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED) + ErrGroupAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED) // ErrClusterAuthorizationFailed Broker: Cluster authorization failed - ErrClusterAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED) + ErrClusterAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED) // ErrInvalidTimestamp Broker: Invalid timestamp - ErrInvalidTimestamp ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP) + ErrInvalidTimestamp ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP) // ErrUnsupportedSaslMechanism Broker: Unsupported SASL mechanism - ErrUnsupportedSaslMechanism ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM) + ErrUnsupportedSaslMechanism ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM) // ErrIllegalSaslState Broker: Request not valid in current SASL state - ErrIllegalSaslState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE) + ErrIllegalSaslState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE) // ErrUnsupportedVersion Broker: API version not supported - ErrUnsupportedVersion ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION) + ErrUnsupportedVersion ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION) // ErrTopicAlreadyExists Broker: Topic already exists - ErrTopicAlreadyExists ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS) + ErrTopicAlreadyExists ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS) // ErrInvalidPartitions Broker: Invalid number of partitions - ErrInvalidPartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PARTITIONS) + ErrInvalidPartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PARTITIONS) // ErrInvalidReplicationFactor Broker: Invalid replication factor - ErrInvalidReplicationFactor ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR) + ErrInvalidReplicationFactor ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR) // ErrInvalidReplicaAssignment Broker: Invalid replica assignment - ErrInvalidReplicaAssignment ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT) + ErrInvalidReplicaAssignment ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT) // ErrInvalidConfig Broker: Configuration is invalid - ErrInvalidConfig ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_CONFIG) + ErrInvalidConfig ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_CONFIG) // ErrNotController Broker: Not controller for cluster - ErrNotController ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_CONTROLLER) + ErrNotController ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_CONTROLLER) // ErrInvalidRequest Broker: Invalid request - ErrInvalidRequest ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REQUEST) + ErrInvalidRequest ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REQUEST) // ErrUnsupportedForMessageFormat Broker: Message format on broker does not support request - ErrUnsupportedForMessageFormat ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT) + ErrUnsupportedForMessageFormat ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT) // ErrPolicyViolation Broker: Policy violation - ErrPolicyViolation ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_POLICY_VIOLATION) + ErrPolicyViolation ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_POLICY_VIOLATION) // ErrOutOfOrderSequenceNumber Broker: Broker received an out of order sequence number - ErrOutOfOrderSequenceNumber ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER) + ErrOutOfOrderSequenceNumber ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER) // ErrDuplicateSequenceNumber Broker: Broker received a duplicate sequence number - ErrDuplicateSequenceNumber ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER) + ErrDuplicateSequenceNumber ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER) // ErrInvalidProducerEpoch Broker: Producer attempted an operation with an old epoch - ErrInvalidProducerEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH) + ErrInvalidProducerEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH) // ErrInvalidTxnState Broker: Producer attempted a transactional operation in an invalid state - ErrInvalidTxnState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TXN_STATE) + ErrInvalidTxnState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TXN_STATE) // ErrInvalidProducerIDMapping Broker: Producer attempted to use a producer id which is not currently assigned to its transactional id - ErrInvalidProducerIDMapping ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING) + ErrInvalidProducerIDMapping ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING) // ErrInvalidTransactionTimeout Broker: Transaction timeout is larger than the maximum value allowed by the broker's max.transaction.timeout.ms - ErrInvalidTransactionTimeout ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT) + ErrInvalidTransactionTimeout ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT) // ErrConcurrentTransactions Broker: Producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing - ErrConcurrentTransactions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS) + ErrConcurrentTransactions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS) // ErrTransactionCoordinatorFenced Broker: Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer - ErrTransactionCoordinatorFenced ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED) + ErrTransactionCoordinatorFenced ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED) // ErrTransactionalIDAuthorizationFailed Broker: Transactional Id authorization failed - ErrTransactionalIDAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED) + ErrTransactionalIDAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED) // ErrSecurityDisabled Broker: Security features are disabled - ErrSecurityDisabled ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_SECURITY_DISABLED) + ErrSecurityDisabled ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_SECURITY_DISABLED) // ErrOperationNotAttempted Broker: Operation not attempted - ErrOperationNotAttempted ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED) + ErrOperationNotAttempted ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED) // ErrKafkaStorageError Broker: Disk error when trying to access log file on disk - ErrKafkaStorageError ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR) + ErrKafkaStorageError ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR) // ErrLogDirNotFound Broker: The user-specified log directory is not found in the broker config - ErrLogDirNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_LOG_DIR_NOT_FOUND) + ErrLogDirNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_LOG_DIR_NOT_FOUND) // ErrSaslAuthenticationFailed Broker: SASL Authentication failed - ErrSaslAuthenticationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED) + ErrSaslAuthenticationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED) // ErrUnknownProducerID Broker: Unknown Producer Id - ErrUnknownProducerID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID) + ErrUnknownProducerID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID) // ErrReassignmentInProgress Broker: Partition reassignment is in progress - ErrReassignmentInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REASSIGNMENT_IN_PROGRESS) + ErrReassignmentInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REASSIGNMENT_IN_PROGRESS) // ErrDelegationTokenAuthDisabled Broker: Delegation Token feature is not enabled - ErrDelegationTokenAuthDisabled ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTH_DISABLED) + ErrDelegationTokenAuthDisabled ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTH_DISABLED) // ErrDelegationTokenNotFound Broker: Delegation Token is not found on server - ErrDelegationTokenNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_NOT_FOUND) + ErrDelegationTokenNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_NOT_FOUND) // ErrDelegationTokenOwnerMismatch Broker: Specified Principal is not valid Owner/Renewer - ErrDelegationTokenOwnerMismatch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_OWNER_MISMATCH) + ErrDelegationTokenOwnerMismatch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_OWNER_MISMATCH) // ErrDelegationTokenRequestNotAllowed Broker: Delegation Token requests are not allowed on this connection - ErrDelegationTokenRequestNotAllowed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED) + ErrDelegationTokenRequestNotAllowed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED) // ErrDelegationTokenAuthorizationFailed Broker: Delegation Token authorization failed - ErrDelegationTokenAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED) + ErrDelegationTokenAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED) // ErrDelegationTokenExpired Broker: Delegation Token is expired - ErrDelegationTokenExpired ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_EXPIRED) + ErrDelegationTokenExpired ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DELEGATION_TOKEN_EXPIRED) // ErrInvalidPrincipalType Broker: Supplied principalType is not supported - ErrInvalidPrincipalType ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRINCIPAL_TYPE) + ErrInvalidPrincipalType ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRINCIPAL_TYPE) // ErrNonEmptyGroup Broker: The group is not empty - ErrNonEmptyGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NON_EMPTY_GROUP) + ErrNonEmptyGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NON_EMPTY_GROUP) // ErrGroupIDNotFound Broker: The group id does not exist - ErrGroupIDNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND) + ErrGroupIDNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND) // ErrFetchSessionIDNotFound Broker: The fetch session ID was not found - ErrFetchSessionIDNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_FETCH_SESSION_ID_NOT_FOUND) + ErrFetchSessionIDNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_FETCH_SESSION_ID_NOT_FOUND) // ErrInvalidFetchSessionEpoch Broker: The fetch session epoch is invalid - ErrInvalidFetchSessionEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH) + ErrInvalidFetchSessionEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_FETCH_SESSION_EPOCH) // ErrListenerNotFound Broker: No matching listener - ErrListenerNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_LISTENER_NOT_FOUND) + ErrListenerNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_LISTENER_NOT_FOUND) // ErrTopicDeletionDisabled Broker: Topic deletion is disabled - ErrTopicDeletionDisabled ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_DELETION_DISABLED) + ErrTopicDeletionDisabled ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_DELETION_DISABLED) // ErrFencedLeaderEpoch Broker: Leader epoch is older than broker epoch - ErrFencedLeaderEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH) + ErrFencedLeaderEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH) // ErrUnknownLeaderEpoch Broker: Leader epoch is newer than broker epoch - ErrUnknownLeaderEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH) + ErrUnknownLeaderEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH) // ErrUnsupportedCompressionType Broker: Unsupported compression type - ErrUnsupportedCompressionType ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_COMPRESSION_TYPE) + ErrUnsupportedCompressionType ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_COMPRESSION_TYPE) // ErrStaleBrokerEpoch Broker: Broker epoch has changed - ErrStaleBrokerEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_STALE_BROKER_EPOCH) + ErrStaleBrokerEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_STALE_BROKER_EPOCH) // ErrOffsetNotAvailable Broker: Leader high watermark is not caught up - ErrOffsetNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE) + ErrOffsetNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE) // ErrMemberIDRequired Broker: Group member needs a valid member ID - ErrMemberIDRequired ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED) + ErrMemberIDRequired ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED) // ErrPreferredLeaderNotAvailable Broker: Preferred leader was not available - ErrPreferredLeaderNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE) + ErrPreferredLeaderNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_PREFERRED_LEADER_NOT_AVAILABLE) // ErrGroupMaxSizeReached Broker: Consumer group has reached maximum size - ErrGroupMaxSizeReached ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED) + ErrGroupMaxSizeReached ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED) // ErrFencedInstanceID Broker: Static consumer fenced by other consumer with same group.instance.id - ErrFencedInstanceID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID) + ErrFencedInstanceID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID) // ErrEligibleLeadersNotAvailable Broker: Eligible partition leaders are not available - ErrEligibleLeadersNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE) + ErrEligibleLeadersNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE) // ErrElectionNotNeeded Broker: Leader election not needed for topic partition - ErrElectionNotNeeded ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ELECTION_NOT_NEEDED) + ErrElectionNotNeeded ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ELECTION_NOT_NEEDED) // ErrNoReassignmentInProgress Broker: No partition reassignment is in progress - ErrNoReassignmentInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NO_REASSIGNMENT_IN_PROGRESS) + ErrNoReassignmentInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NO_REASSIGNMENT_IN_PROGRESS) // ErrGroupSubscribedToTopic Broker: Deleting offsets of a topic while the consumer group is subscribed to it - ErrGroupSubscribedToTopic ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_SUBSCRIBED_TO_TOPIC) + ErrGroupSubscribedToTopic ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_SUBSCRIBED_TO_TOPIC) // ErrInvalidRecord Broker: Broker failed to validate record - ErrInvalidRecord ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_RECORD) + ErrInvalidRecord ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_RECORD) // ErrUnstableOffsetCommit Broker: There are unstable offsets that need to be cleared - ErrUnstableOffsetCommit ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT) + ErrUnstableOffsetCommit ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT) // ErrThrottlingQuotaExceeded Broker: Throttling quota has been exceeded - ErrThrottlingQuotaExceeded ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_THROTTLING_QUOTA_EXCEEDED) + ErrThrottlingQuotaExceeded ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_THROTTLING_QUOTA_EXCEEDED) // ErrProducerFenced Broker: There is a newer producer with the same transactionalId which fences the current one - ErrProducerFenced ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_PRODUCER_FENCED) + ErrProducerFenced ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_PRODUCER_FENCED) // ErrResourceNotFound Broker: Request illegally referred to resource that does not exist - ErrResourceNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_RESOURCE_NOT_FOUND) + ErrResourceNotFound ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_RESOURCE_NOT_FOUND) // ErrDuplicateResource Broker: Request illegally referred to the same resource twice - ErrDuplicateResource ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DUPLICATE_RESOURCE) + ErrDuplicateResource ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DUPLICATE_RESOURCE) // ErrUnacceptableCredential Broker: Requested credential would not meet criteria for acceptability - ErrUnacceptableCredential ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNACCEPTABLE_CREDENTIAL) + ErrUnacceptableCredential ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNACCEPTABLE_CREDENTIAL) // ErrInconsistentVoterSet Broker: Indicates that the either the sender or recipient of a voter-only request is not one of the expected voters - ErrInconsistentVoterSet ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INCONSISTENT_VOTER_SET) + ErrInconsistentVoterSet ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INCONSISTENT_VOTER_SET) // ErrInvalidUpdateVersion Broker: Invalid update version - ErrInvalidUpdateVersion ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_UPDATE_VERSION) + ErrInvalidUpdateVersion ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_UPDATE_VERSION) // ErrFeatureUpdateFailed Broker: Unable to update finalized features due to server error - ErrFeatureUpdateFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_FEATURE_UPDATE_FAILED) + ErrFeatureUpdateFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_FEATURE_UPDATE_FAILED) // ErrPrincipalDeserializationFailure Broker: Request principal deserialization failed during forwarding - ErrPrincipalDeserializationFailure ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE) + ErrPrincipalDeserializationFailure ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE) )
func (c ErrorCode) String() string+
func (c ErrorCode) String() string
String returns a human readable representation of an error code
type Event interface {
// String returns a human-readable representation of the event
- String() string
+ String() string
}
type Header struct { - Key string // Header name (utf-8 string) - Value []byte // Header value (nil, empty, or binary) + Key string // Header name (utf-8 string) + Value []byte // Header value (nil, empty, or binary) }
func (h Header) String() string+
func (h Header) String() string
String returns the Header Key and data in a human representable possibly truncated form suitable for displaying to the user.
type LogEvent struct { - Name string // Name of client instance - Tag string // Log tag that provides context to the log Message (e.g., "METADATA" or "GRPCOORD") - Message string // Log message - Level int // Log syslog level, lower is more critical. - Timestamp time.Time // Log timestamp + Name string // Name of client instance + Tag string // Log tag that provides context to the log Message (e.g., "METADATA" or "GRPCOORD") + Message string // Log message + Level int // Log syslog level, lower is more critical. + Timestamp time.Time // Log timestamp }
func (logEvent LogEvent) String() string+
func (logEvent LogEvent) String() string
type Message struct { TopicPartition TopicPartition - Value []byte - Key []byte - Timestamp time.Time + Value []byte + Key []byte + Timestamp time.Time TimestampType TimestampType Opaque interface{} Headers []Header @@ -3586,21 +4166,21 @@
func (m *Message) String() string+
func (m *Message) String() string
String returns a human readable representation of a Message. Key and payload are not represented.
type Metadata struct { Brokers []BrokerMetadata - Topics map[string]TopicMetadata + Topics map[string]TopicMetadata OriginatingBroker BrokerMetadata }+
+ MockCluster represents a Kafka mock cluster instance which can be used +for testing. +
+type MockCluster struct {
+ // contains filtered or unexported fields
+}
+
+ func NewMockCluster(brokerCount int) (*MockCluster, error)+
+ NewMockCluster provides a mock Kafka cluster with a configurable +number of brokers that support a reasonable subset of Kafka protocol +operations, error injection, etc. +
++ Mock clusters provide localhost listeners that can be used as the bootstrap +servers by multiple Kafka client instances. +
++ Currently supported functionality: +- Producer +- Idempotent Producer +- Transactional Producer +- Low-level consumer +- High-level balanced consumer groups with offset commits +- Topic Metadata and auto creation +
++ Warning THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL. +
+func (mc *MockCluster) BootstrapServers() string+
+ BootstrapServers returns the bootstrap.servers property for this MockCluster +
+func (mc *MockCluster) Close()+
+ Close and destroy the MockCluster +
type OAuthBearerTokenRefresh struct {
// Config is the value of the sasl.oauthbearer.config property
- Config string
+ Config string
}
func (o OAuthBearerTokenRefresh) String() string+
func (o OAuthBearerTokenRefresh) String() string
Offset type (int64) with support for canonical names
-type Offset int64+
type Offset int64
func NewOffset(offset interface{}) (Offset, error)+
func NewOffset(offset interface{}) (Offset, error)
NewOffset creates a new Offset using the provided logical string, or an absolute int64 offset value. @@ -3706,7 +4360,7 @@
func (o *Offset) Set(offset interface{}) error+
func (o *Offset) Set(offset interface{}) error
Set offset value, see NewOffset()
func (o Offset) String() string+
func (o Offset) String() string
type OffsetsCommitted struct { - Error error + Error error Offsets []TopicPartition }
func (o OffsetsCommitted) String() string+
func (o OffsetsCommitted) String() string
type PartitionEOF TopicPartition
func (p PartitionEOF) String() string+
func (p PartitionEOF) String() string
type PartitionMetadata struct { - ID int32 + ID int32 Error Error - Leader int32 - Replicas []int32 - Isrs []int32 + Leader int32 + Replicas []int32 + Isrs []int32 }
type PartitionsSpecification struct { // Topic to create more partitions for. - Topic string + Topic string // New partition count for topic, must be higher than current partition count. - IncreaseTo int + IncreaseTo int // (Optional) Explicit replica assignment. The outer array is // indexed by the new partition index (i.e., 0 for the first added // partition), while the inner per-partition array // contains the replica broker ids. The first broker in each // broker id list will be the preferred replica. - ReplicaAssignment [][]int32 + ReplicaAssignment [][]int32 }
func NewProducer(conf *ConfigMap) (*Producer, error)+
func NewProducer(conf *ConfigMap) (*Producer, error)
NewProducer creates a new high-level Producer instance.
@@ -3888,14 +4542,14 @@func (p *Producer) AbortTransaction(ctx context.Context) error+
func (p *Producer) AbortTransaction(ctx context.Context) error
AbortTransaction aborts the ongoing transaction.
@@ -3933,14 +4587,14 @@func (p *Producer) BeginTransaction() error+
func (p *Producer) BeginTransaction() error
BeginTransaction starts a new transaction.
@@ -3970,7 +4624,7 @@func (p *Producer) CommitTransaction(ctx context.Context) error+
func (p *Producer) CommitTransaction(ctx context.Context) error
CommitTransaction commits the current transaction.
@@ -4034,7 +4688,7 @@func (p *Producer) Flush(timeoutMs int) int+
func (p *Producer) Flush(timeoutMs int) int
Flush and wait for outstanding messages and requests to complete delivery. Includes messages on ProduceChannel. @@ -4063,27 +4717,27 @@
func (p *Producer) GetFatalError() error+
func (p *Producer) GetFatalError() error
GetFatalError returns an Error object if the client instance has raised a fatal error, else nil.
func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)+
func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if @@ -4093,14 +4747,14 @@
func (p *Producer) InitTransactions(ctx context.Context) error+
func (p *Producer) InitTransactions(ctx context.Context) error
InitTransactions Initializes transactions for the producer instance.
@@ -4148,14 +4802,14 @@func (p *Producer) Len() int+
func (p *Producer) Len() int
Len returns the number of messages and requests waiting to be transmitted to the broker as well as delivery reports queued for the application. @@ -4163,7 +4817,7 @@
func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)+
func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
OffsetsForTimes looks up offsets by timestamp for the given partitions.
@@ -4207,14 +4861,14 @@func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error+
func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error
Produce single message. This is an asynchronous call that enqueues the message on the internal @@ -4229,7 +4883,7 @@
func (p *Producer) Purge(flags int) error+
func (p *Producer) Purge(flags int) error
Purge messages currently handled by this producer instance.
@@ -4283,28 +4937,28 @@func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)+
func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
QueryWatermarkOffsets returns the broker's low and high offsets for the given topic and partition.
func (p *Producer) SendOffsetsToTransaction(ctx context.Context, offsets []TopicPartition, consumerMetadata *ConsumerGroupMetadata) error+
func (p *Producer) SendOffsetsToTransaction(ctx context.Context, offsets []TopicPartition, consumerMetadata *ConsumerGroupMetadata) error
SendOffsetsToTransaction sends a list of topic partition offsets to the consumer group coordinator for `consumerMetadata`, and marks the offsets @@ -4352,14 +5006,14 @@
func (p *Producer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error+
func (p *Producer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
SetOAuthBearerToken sets the the data to be transmitted to a broker during SASL/OAUTHBEARER authentication. It will return nil @@ -4377,14 +5031,14 @@
func (p *Producer) SetOAuthBearerTokenFailure(errstr string) error+
func (p *Producer) SetOAuthBearerTokenFailure(errstr string) error
SetOAuthBearerTokenFailure sets the error message describing why token retrieval/setting failed; it also schedules a new token refresh event for 10 @@ -4396,34 +5050,34 @@
func (p *Producer) String() string+
func (p *Producer) String() string
String returns a human readable name for a Producer instance
func (p *Producer) TestFatalError(code ErrorCode, str string) ErrorCode+
func (p *Producer) TestFatalError(code ErrorCode, str string) ErrorCode
TestFatalError triggers a fatal error in the underlying client. This is to be used strictly for testing purposes.
type RebalanceCb func(*Consumer, Event) error+
type RebalanceCb func(*Consumer, Event) error+
+ ResourcePatternType enumerates the different types of Kafka resource patterns. +
+type ResourcePatternType int+
func ResourcePatternTypeFromString(patternTypeString string) (ResourcePatternType, error)+
+ ResourcePatternTypeFromString translates a resource pattern type name to +a ResourcePatternType value. +
+func (t ResourcePatternType) String() string+
+ String returns the human-readable representation of a ResourcePatternType +
ResourceType represents an Apache Kafka resource type
-type ResourceType int+
type ResourceType int
func ResourceTypeFromString(typeString string) (ResourceType, error)+
func ResourceTypeFromString(typeString string) (ResourceType, error)
ResourceTypeFromString translates a resource type name/string to a ResourceType value.
func (t ResourceType) String() string+
func (t ResourceType) String() string
String returns the human-readable representation of a ResourceType
func (e RevokedPartitions) String() string+
func (e RevokedPartitions) String() string
func (e Stats) String() string+
func (e Stats) String() string
TimestampType is a the Message timestamp type or source
-type TimestampType int+
type TimestampType int
func (t TimestampType) String() string+
func (t TimestampType) String() string
type TopicMetadata struct { - Topic string + Topic string Partitions []PartitionMetadata Error Error }
type TopicPartition struct { - Topic *string - Partition int32 + Topic *string + Partition int32 Offset Offset - Metadata *string - Error error + Metadata *string + Error error }
func (p TopicPartition) String() string+
func (p TopicPartition) String() string
type TopicPartitions []TopicPartition
func (tps TopicPartitions) Len() int+
func (tps TopicPartitions) Len() int
func (tps TopicPartitions) Less(i, j int) bool+
func (tps TopicPartitions) Less(i, j int) bool
func (tps TopicPartitions) Swap(i, j int)+
func (tps TopicPartitions) Swap(i, j int)
type TopicResult struct { // Topic name - Topic string + Topic string // Error, if any, of result. Check with `Error.Code() != ErrNoError`. Error Error }
func (t TopicResult) String() string+
func (t TopicResult) String() string
String returns a human-readable representation of a TopicResult.
type TopicSpecification struct { // Topic name to create. - Topic string + Topic string // Number of partitions in topic. - NumPartitions int + NumPartitions int // Default replication factor for the topic's partitions, or zero // if an explicit ReplicaAssignment is set. - ReplicationFactor int + ReplicationFactor int // (Optional) Explicit replica assignment. The outer array is // indexed by the partition number, while the inner per-partition array // contains the replica broker ids. The first broker in each // broker id list will be the preferred replica. - ReplicaAssignment [][]int32 + ReplicaAssignment [][]int32 // Topic configuration. - Config map[string]string + Config map[string]string }diff --git a/kafka/build_darwin.go b/kafka/build_darwin.go index d8886f5e4..dd34cc11a 100644 --- a/kafka/build_darwin.go +++ b/kafka/build_darwin.go @@ -6,8 +6,8 @@ package kafka // #cgo CFLAGS: -DUSE_VENDORED_LIBRDKAFKA -DLIBRDKAFKA_STATICLIB -// #cgo LDFLAGS: ${SRCDIR}/librdkafka_vendor/librdkafka_darwin.a -lz -lm -lsasl2 -ldl -lpthread +// #cgo LDFLAGS: ${SRCDIR}/librdkafka_vendor/librdkafka_darwin.a -lm -lsasl2 -ldl -lpthread -framework CoreFoundation -framework SystemConfiguration import "C" // LibrdkafkaLinkInfo explains how librdkafka was linked to the Go client -const LibrdkafkaLinkInfo = "static darwin from librdkafka-static-bundle-v1.7.0.tgz" +const LibrdkafkaLinkInfo = "static darwin from librdkafka-static-bundle-v1.9.0.tgz" diff --git a/kafka/build_glibc_linux.go b/kafka/build_glibc_linux.go index 26cf9e95b..9a03db873 100644 --- a/kafka/build_glibc_linux.go +++ b/kafka/build_glibc_linux.go @@ -10,4 +10,4 @@ package kafka import "C" // LibrdkafkaLinkInfo explains how librdkafka was linked to the Go client -const LibrdkafkaLinkInfo = "static glibc_linux from librdkafka-static-bundle-v1.7.0.tgz" +const LibrdkafkaLinkInfo = "static glibc_linux from librdkafka-static-bundle-v1.9.0.tgz" diff --git a/kafka/build_musl_linux.go b/kafka/build_musl_linux.go index 8d8ddc4f0..4ef601edd 100644 --- a/kafka/build_musl_linux.go +++ b/kafka/build_musl_linux.go @@ -10,4 +10,4 @@ package kafka import "C" // LibrdkafkaLinkInfo explains how librdkafka was linked to the Go client -const LibrdkafkaLinkInfo = "static musl_linux from librdkafka-static-bundle-v1.7.0.tgz" +const LibrdkafkaLinkInfo = "static musl_linux from librdkafka-static-bundle-v1.9.0.tgz" diff --git a/kafka/build_windows.go b/kafka/build_windows.go index 656827afb..53f47d424 100644 --- a/kafka/build_windows.go +++ b/kafka/build_windows.go @@ -10,4 +10,4 @@ package kafka import "C" // LibrdkafkaLinkInfo explains how librdkafka was linked to the Go client -const LibrdkafkaLinkInfo = "static windows from librdkafka-static-bundle-v1.7.0.tgz" +const LibrdkafkaLinkInfo = "static windows from librdkafka-static-bundle-v1.9.0.tgz" diff --git a/kafka/config_test.go b/kafka/config_test.go index 60093e496..67ee724ac 100644 --- a/kafka/config_test.go +++ b/kafka/config_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "fmt" "testing" diff --git a/kafka/consumer.go b/kafka/consumer.go index e58ebb56b..156f34d30 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -292,6 +292,20 @@ func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []Topic return storedOffsets, nil } +// StoreMessage stores offset based on the provided message. +// This is a convenience method that uses StoreOffsets to do the actual work. +func (c *Consumer) StoreMessage(m *Message) (storedOffsets []TopicPartition, err error) { + if m.TopicPartition.Error != nil { + return nil, newErrorFromString(ErrInvalidArg, "Can't store errored message") + } + if m.TopicPartition.Offset < 0 { + return nil, newErrorFromString(ErrInvalidArg, "Can't store message with offset less than 0") + } + offsets := []TopicPartition{m.TopicPartition} + offsets[0].Offset++ + return c.StoreOffsets(offsets) +} + // Seek seeks the given topic partitions using the offset from the TopicPartition. // // If timeoutMs is not 0 the call will wait this long for the @@ -410,30 +424,16 @@ func (c *Consumer) Close() (err error) { close(c.events) } - // librdkafka's rd_kafka_consumer_close() will block - // and trigger the rebalance_cb() if one is set, if not, which is the - // case with the Go client since it registers EVENTs rather than callbacks, - // librdkafka will shortcut the rebalance_cb and do a forced unassign. - // But we can't have that since the application might need the final RevokePartitions - // before shutting down. So we trigger an Unsubscribe() first, wait for that to - // propagate (in the Poll loop below), and then close the consumer. - c.Unsubscribe() + C.rd_kafka_consumer_close_queue(c.handle.rk, c.handle.rkq) - // Poll for rebalance events - for { - c.Poll(10 * 1000) - if int(C.rd_kafka_queue_length(c.handle.rkq)) == 0 { - break - } + for C.rd_kafka_consumer_closed(c.handle.rk) != 1 { + c.Poll(100) } // Destroy our queue C.rd_kafka_queue_destroy(c.handle.rkq) c.handle.rkq = nil - // Close the consumer - C.rd_kafka_consumer_close(c.handle.rk) - c.handle.cleanup() C.rd_kafka_destroy(c.handle.rk) diff --git a/kafka/consumer_performance_test.go b/kafka/consumer_performance_test.go index 49edbaea5..e77cf22dc 100644 --- a/kafka/consumer_performance_test.go +++ b/kafka/consumer_performance_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "fmt" "math/rand" diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index d7cacc3ed..7f5ea38d0 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,14 +16,14 @@ * limitations under the License. */ -package kafka - import ( "fmt" "os" "reflect" "sort" "strings" + "sync" + "sync/atomic" "testing" "time" ) @@ -85,6 +87,16 @@ func TestConsumerAPIs(t *testing.T) { t.Errorf("StoreOffsets(empty) failed: %s", err) } + // test StoreMessage doesn't fail either + stored, err = c.StoreMessage(&Message{TopicPartition: TopicPartition{Topic: &topic, Partition: 0, Offset: 1}}) + if err != nil && err.(Error).Code() != ErrUnknownPartition { + t.Errorf("StoreMessage() failed: %s", err) + toppar := stored[0] + if toppar.Error != nil && toppar.Error.(Error).Code() == ErrUnknownPartition { + t.Errorf("StoreMessage() TopicPartition error: %s", toppar.Error) + } + } + topic1 := "gotest1" topic2 := "gotest2" err = c.Assign([]TopicPartition{{Topic: &topic1, Partition: 2}, @@ -419,3 +431,280 @@ func TestConsumerLog(t *testing.T) { } } } + +func wrapRebalanceCb(assignedEvents *int32, revokedEvents *int32, t *testing.T) func(c *Consumer, event Event) error { + return func(c *Consumer, event Event) error { + switch ev := event.(type) { + case AssignedPartitions: + atomic.AddInt32(assignedEvents, 1) + + t.Logf("%v, %s rebalance: %d new partition(s) assigned: %v\n", + c, c.GetRebalanceProtocol(), len(ev.Partitions), + ev.Partitions) + err := c.Assign(ev.Partitions) + if err != nil { + panic(err) + } + + case RevokedPartitions: + atomic.AddInt32(revokedEvents, 1) + + t.Logf("%v, %s rebalance: %d partition(s) revoked: %v\n", + c, c.GetRebalanceProtocol(), len(ev.Partitions), + ev.Partitions) + if c.AssignmentLost() { + // Our consumer has been kicked out of the group and the + // entire assignment is thus lost. + t.Logf("%v, Current assignment lost!\n", c) + } + + // The client automatically calls Unassign() unless + // the callback has already called that method. + } + return nil + } +} + +func testPoll(c *Consumer, doneChan chan bool, t *testing.T, wg *sync.WaitGroup) { + defer wg.Done() + + run := true + for run { + select { + case <-doneChan: + run = false + + default: + ev := c.Poll(100) + if ev == nil { + continue + } + switch e := ev.(type) { + case *Message: + t.Logf("Message on %s:\n%s\n", + e.TopicPartition, string(e.Value)) + if e.Headers != nil { + t.Logf("Headers: %v\n", e.Headers) + } + + case Error: + // Errors should generally be + // considered informational, the client + // will try to automatically recover. + t.Logf("Error: %v: %v for "+ + "consumer %v\n", e.Code(), e, c) + + default: + t.Logf("Ignored %v for consumer %v\n", + e, c) + } + } + } +} + +// TestConsumerCloseForStaticMember verifies the rebalance +// for static membership. +// According to KIP-345, the consumer group will not trigger rebalance unless +// 1) A new member joins +// 2) A leader rejoins (possibly due to topic assignment change) +// 3) An existing member offline time is over session timeout +// 4) Broker receives a leave group request containing alistof +// `group.instance.id`s (details later) +// +// This test uses 3 consumers while each consumer joins after the assignment +// finished for the previous consumers. +// The expected behavior for these consumers are: +// 1) First consumer joins, AssignedPartitions happens. Assign all the +// partitions to it. +// 2) Second consumer joins, RevokedPartitions happens from the first consumer, +// then AssignedPartitions happens to both consumers. +// 3) Third consumer joins, RevokedPartitions happens from the previous two +// consumers, then AssignedPartitions happens to all the three consumers. +// 4) Close the second consumer, revoke its assignments will happen, but it +// should not notice other consumers. +// 5) Rejoin the second consumer, rebalance should not happen to all the other +// consumers since it's not the leader, AssignedPartitions only happened +// to this consumer to assign the partitions. +// 6) Close the third consumer, revoke its assignments will happen, but it +// should not notice other consumers. +// 7) Close the rejoined consumer, revoke its assignments will happen, +// but it should not notice other consumers. +// 8) Close the first consumer, revoke its assignments will happen. +// +// The total number of AssignedPartitions for the first consumer is 3, +// and the total number of RevokedPartitions for the first consumer is 3. +// The total number of AssignedPartitions for the second consumer is 2, +// and the total number of RevokedPartitions for the second consumer is 2. +// The total number of AssignedPartitions for the third consumer is 1, +// and the total number of RevokedPartitions for the third consumer is 1. +// The total number of AssignedPartitions for the rejoined consumer +// (originally second consumer) is 1, +// and the total number of RevokedPartitions for the rejoined consumer +// (originally second consumer) is 1. +func TestConsumerCloseForStaticMember(t *testing.T) { + if !testconfRead() { + t.Skipf("Missing testconf.json") + } + broker := testconf.Brokers + topic := createTestTopic(t, "staticMembership", 3, 1) + + var assignedEvents1 int32 + var revokedEvents1 int32 + + var assignedEvents2 int32 + var revokedEvents2 int32 + + var assignedEvents3 int32 + var revokedEvents3 int32 + + var assignedEvents4 int32 + var revokedEvents4 int32 + + conf1 := ConfigMap{ + "bootstrap.servers": broker, + "group.id": "rebalance", + "session.timeout.ms": "6000", + "max.poll.interval.ms": "10000", + "group.instance.id": "staticmember1", + } + c1, err := NewConsumer(&conf1) + + conf2 := ConfigMap{ + "bootstrap.servers": broker, + "group.id": "rebalance", + "session.timeout.ms": "6000", + "max.poll.interval.ms": "10000", + "group.instance.id": "staticmember2", + } + c2, err := NewConsumer(&conf2) + if err != nil { + t.Fatalf("%s", err) + } + + conf3 := ConfigMap{ + "bootstrap.servers": broker, + "group.id": "rebalance", + "session.timeout.ms": "6000", + "max.poll.interval.ms": "10000", + "group.instance.id": "staticmember3", + } + + c3, err := NewConsumer(&conf3) + if err != nil { + t.Fatalf("%s", err) + } + wrapRebalancecb1 := wrapRebalanceCb(&assignedEvents1, &revokedEvents1, t) + err = c1.Subscribe(topic, wrapRebalancecb1) + if err != nil { + t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) + } + + wg := sync.WaitGroup{} + doneChan := make(chan bool, 3) + + wg.Add(1) + go testPoll(c1, doneChan, t, &wg) + testConsumerWaitAssignment(c1, t) + + closeChan := make(chan bool) + wrapRebalancecb2 := wrapRebalanceCb(&assignedEvents2, &revokedEvents2, t) + err = c2.Subscribe(topic, wrapRebalancecb2) + if err != nil { + t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) + } + wg.Add(1) + go testPoll(c2, closeChan, t, &wg) + testConsumerWaitAssignment(c2, t) + + wrapRebalancecb3 := wrapRebalanceCb(&assignedEvents3, &revokedEvents3, t) + err = c3.Subscribe(topic, wrapRebalancecb3) + if err != nil { + t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) + } + wg.Add(1) + go testPoll(c3, doneChan, t, &wg) + testConsumerWaitAssignment(c3, t) + + closeChan <- true + close(closeChan) + c2.Close() + + c2, err = NewConsumer(&conf2) + if err != nil { + t.Fatalf("%s", err) + } + + wrapRebalancecb4 := wrapRebalanceCb(&assignedEvents4, &revokedEvents4, t) + err = c2.Subscribe(topic, wrapRebalancecb4) + if err != nil { + t.Fatalf("Failed to subscribe to topic %s: %s\n", topic, err) + } + + wg.Add(1) + go testPoll(c2, doneChan, t, &wg) + testConsumerWaitAssignment(c2, t) + + doneChan <- true + close(doneChan) + + c3.Close() + c2.Close() + c1.Close() + + wg.Wait() + + // Wait 2 * session.timeout.ms to make sure no revokedEvents happens + time.Sleep(2 * 6000 * time.Millisecond) + + if atomic.LoadInt32(&assignedEvents1) != 3 { + t.Fatalf("3 assignedEvents are Expected to happen for the first consumer, but %d happened\n", + atomic.LoadInt32(&assignedEvents1)) + } + + if atomic.LoadInt32(&revokedEvents1) != 3 { + t.Fatalf("3 revokedEvents are Expected to happen for the first consumer, but %d happened\n", + atomic.LoadInt32(&revokedEvents1)) + } + + if atomic.LoadInt32(&assignedEvents2) != 2 { + t.Fatalf("2 assignedEvents are Expected to happen for the second consumer, but %d happened\n", + atomic.LoadInt32(&assignedEvents2)) + } + if atomic.LoadInt32(&revokedEvents2) != 2 { + t.Fatalf("2 revokedEvents is Expected to happen for the second consumer, but %d happened\n", + atomic.LoadInt32(&revokedEvents2)) + } + + if atomic.LoadInt32(&assignedEvents3) != 1 { + t.Fatalf("1 assignedEvents is Expected to happen for the third consumer, but %d happened\n", + atomic.LoadInt32(&assignedEvents3)) + } + if atomic.LoadInt32(&revokedEvents3) != 1 { + t.Fatalf("1 revokedEvents is Expected to happen for the third consumer, but %d happened\n", + atomic.LoadInt32(&revokedEvents3)) + } + + if atomic.LoadInt32(&assignedEvents4) != 1 { + t.Fatalf("1 assignedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n", + atomic.LoadInt32(&assignedEvents4)) + } + if atomic.LoadInt32(&revokedEvents4) != 1 { + t.Fatalf("1 revokedEvents is Expected to happen for the rejoined consumer(originally second consumer), but %d happened\n", + atomic.LoadInt32(&revokedEvents4)) + } +} + +func testConsumerWaitAssignment(c *Consumer, t *testing.T) { + run := true + for run { + assignment, err := c.Assignment() + if err != nil { + t.Fatalf("Assignment failed: %s\n", err) + } + + if len(assignment) != 0 { + t.Logf("%v Assigned partitions are: %v\n", c, assignment) + run = false + } + } +} diff --git a/kafka/context.go b/kafka/context.go index 85709be0f..b0dfd0e44 100644 --- a/kafka/context.go +++ b/kafka/context.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2019 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "context" "time" diff --git a/kafka/error.go b/kafka/error.go index 1827f43e1..193e7ea3e 100644 --- a/kafka/error.go +++ b/kafka/error.go @@ -67,10 +67,8 @@ func newCErrorFromString(code C.rd_kafka_resp_err_t, str string) (err Error) { return newErrorFromString(ErrorCode(code), str) } -// newErrorFromCError creates a new Error instance and destroys -// the passed cError. -func newErrorFromCErrorDestroy(cError *C.rd_kafka_error_t) Error { - defer C.rd_kafka_error_destroy(cError) +// newErrorFromCError creates a new Error instance +func newErrorFromCError(cError *C.rd_kafka_error_t) Error { return Error{ code: ErrorCode(C.rd_kafka_error_code(cError)), str: C.GoString(C.rd_kafka_error_string(cError)), @@ -80,6 +78,13 @@ func newErrorFromCErrorDestroy(cError *C.rd_kafka_error_t) Error { } } +// newErrorFromCErrorDestroy creates a new Error instance and destroys +// the passed cError. +func newErrorFromCErrorDestroy(cError *C.rd_kafka_error_t) Error { + defer C.rd_kafka_error_destroy(cError) + return newErrorFromCError(cError) +} + // Error returns a human readable representation of an Error // Same as Error.String() func (e Error) Error() string { diff --git a/kafka/error_test.go b/kafka/error_test.go index d405ba749..414fd59e8 100644 --- a/kafka/error_test.go +++ b/kafka/error_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2019 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "strings" "testing" diff --git a/kafka/event.go b/kafka/event.go index d7f0b6189..6357ad858 100644 --- a/kafka/event.go +++ b/kafka/event.go @@ -251,6 +251,8 @@ out: select { case *ch <- msg: case <-termChan: + retval = nil + term = true break out } diff --git a/kafka/event_test.go b/kafka/event_test.go index 0c172e0b8..355ae2145 100644 --- a/kafka/event_test.go +++ b/kafka/event_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "testing" ) diff --git a/kafka/generated_errors.go b/kafka/generated_errors.go index 96ed1eea0..568c831e8 100644 --- a/kafka/generated_errors.go +++ b/kafka/generated_errors.go @@ -1,6 +1,6 @@ package kafka -// Copyright 2016-2021 Confluent Inc. -// AUTOMATICALLY GENERATED ON 2021-05-10 11:33:08.588919179 +0200 CEST m=+0.000341587 USING librdkafka 1.7.0-dirty +// Copyright 2016-2022 Confluent Inc. +// AUTOMATICALLY GENERATED ON 2022-06-16 11:17:24.861602 -0700 PDT m=+0.000650282 USING librdkafka 1.9.0 /* #include "select_rdkafka.h" diff --git a/kafka/header_test.go b/kafka/header_test.go index f365cb912..6c0111941 100644 --- a/kafka/header_test.go +++ b/kafka/header_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "testing" ) diff --git a/kafka/integration_test.go b/kafka/integration_test.go index b0df51535..ebb26c32b 100644 --- a/kafka/integration_test.go +++ b/kafka/integration_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "context" "encoding/binary" @@ -24,6 +24,7 @@ import ( "path" "reflect" "runtime" + "sort" "testing" "time" ) @@ -1654,3 +1655,197 @@ func TestAdminClient_ControllerID(t *testing.T) { t.Logf("ControllerID: %d\n", controllerID) } + +func TestAdminACLs(t *testing.T) { + if !testconfRead() { + t.Skipf("Missing testconf.json") + } + + rand.Seed(time.Now().Unix()) + topic := testconf.Topic + group := testconf.GroupID + noError := NewError(ErrNoError, "", false) + unknownError := NewError(ErrUnknown, "Unknown broker error", false) + var expectedCreateACLs []CreateACLResult + var expectedDescribeACLs DescribeACLsResult + var expectedDeleteACLs []DeleteACLsResult + var ctx context.Context + var cancel context.CancelFunc + + a := createAdminClient(t) + defer a.Close() + + maxDuration, err := time.ParseDuration("30s") + if err != nil { + t.Fatalf("%s", err) + } + requestTimeout, err := time.ParseDuration("20s") + if err != nil { + t.Fatalf("%s", err) + } + + checkExpectedResult := func(expected interface{}, result interface{}) { + if !reflect.DeepEqual(result, expected) { + t.Fatalf("Expected result to deep equal to %v, but found %v", expected, result) + } + } + + // Create ACLs + t.Logf("Creating ACLs\n") + newACLs := ACLBindings{ + { + Type: ResourceTopic, + Name: topic, + ResourcePatternType: ResourcePatternTypeLiteral, + Principal: "User:test-user-1", + Host: "*", + Operation: ACLOperationRead, + PermissionType: ACLPermissionTypeAllow, + }, + { + Type: ResourceTopic, + Name: topic, + ResourcePatternType: ResourcePatternTypePrefixed, + Principal: "User:test-user-2", + Host: "*", + Operation: ACLOperationWrite, + PermissionType: ACLPermissionTypeDeny, + }, + { + Type: ResourceGroup, + Name: group, + ResourcePatternType: ResourcePatternTypePrefixed, + Principal: "User:test-user-2", + Host: "*", + Operation: ACLOperationAll, + PermissionType: ACLPermissionTypeAllow, + }, + } + + invalidACLs := ACLBindings{ + { + Type: ResourceTopic, + Name: topic, + ResourcePatternType: ResourcePatternTypeLiteral, + // Principal must be in the form "{principalType}:{principalName}" + // Broker returns ErrUnknown in this case + Principal: "wrong-principal", + Host: "*", + Operation: ACLOperationRead, + PermissionType: ACLPermissionTypeAllow, + }, + } + + aclBindingFilters := ACLBindingFilters{ + { + Type: ResourceAny, + ResourcePatternType: ResourcePatternTypeAny, + Operation: ACLOperationAny, + PermissionType: ACLPermissionTypeAny, + }, + { + Type: ResourceAny, + ResourcePatternType: ResourcePatternTypePrefixed, + Operation: ACLOperationAny, + PermissionType: ACLPermissionTypeAny, + }, + { + Type: ResourceTopic, + ResourcePatternType: ResourcePatternTypeAny, + Operation: ACLOperationAny, + PermissionType: ACLPermissionTypeAny, + }, + { + Type: ResourceGroup, + ResourcePatternType: ResourcePatternTypeAny, + Operation: ACLOperationAny, + PermissionType: ACLPermissionTypeAny, + }, + } + + // CreateACLs should be idempotent + for n := 0; n < 2; n++ { + ctx, cancel = context.WithTimeout(context.Background(), maxDuration) + defer cancel() + + resultCreateACLs, err := a.CreateACLs(ctx, newACLs, SetAdminRequestTimeout(requestTimeout)) + if err != nil { + t.Fatalf("CreateACLs() failed: %s", err) + } + expectedCreateACLs = []CreateACLResult{{Error: noError}, {Error: noError}, {Error: noError}} + checkExpectedResult(expectedCreateACLs, resultCreateACLs) + } + + // CreateACLs with server side validation errors + ctx, cancel = context.WithTimeout(context.Background(), maxDuration) + defer cancel() + + resultCreateACLs, err := a.CreateACLs(ctx, invalidACLs, SetAdminRequestTimeout(requestTimeout)) + if err != nil { + t.Fatalf("CreateACLs() failed: %s", err) + } + expectedCreateACLs = []CreateACLResult{{Error: unknownError}} + checkExpectedResult(expectedCreateACLs, resultCreateACLs) + + // DescribeACLs must return the three ACLs + ctx, cancel = context.WithTimeout(context.Background(), maxDuration) + defer cancel() + resultDescribeACLs, err := a.DescribeACLs(ctx, aclBindingFilters[0], SetAdminRequestTimeout(requestTimeout)) + expectedDescribeACLs = DescribeACLsResult{ + Error: noError, + ACLBindings: newACLs, + } + if err != nil { + t.Fatalf("%s", err) + } + sort.Sort(&resultDescribeACLs.ACLBindings) + checkExpectedResult(expectedDescribeACLs, *resultDescribeACLs) + + // Delete the ACLs with ResourcePatternTypePrefixed + ctx, cancel = context.WithTimeout(context.Background(), maxDuration) + defer cancel() + resultDeleteACLs, err := a.DeleteACLs(ctx, aclBindingFilters[1:2], SetAdminRequestTimeout(requestTimeout)) + expectedDeleteACLs = []DeleteACLsResult{ + { + Error: noError, + ACLBindings: newACLs[1:3], + }, + } + if err != nil { + t.Fatalf("%s", err) + } + sort.Sort(&resultDeleteACLs[0].ACLBindings) + checkExpectedResult(expectedDeleteACLs, resultDeleteACLs) + + // Delete the ACLs with ResourceTopic and ResourceGroup + ctx, cancel = context.WithTimeout(context.Background(), maxDuration) + defer cancel() + resultDeleteACLs, err = a.DeleteACLs(ctx, aclBindingFilters[2:4], SetAdminRequestTimeout(requestTimeout)) + expectedDeleteACLs = []DeleteACLsResult{ + { + Error: noError, + ACLBindings: newACLs[0:1], + }, + { + Error: noError, + ACLBindings: ACLBindings{}, + }, + } + if err != nil { + t.Fatalf("%s", err) + } + checkExpectedResult(expectedDeleteACLs, resultDeleteACLs) + + // All the ACLs should have been deleted + ctx, cancel = context.WithTimeout(context.Background(), maxDuration) + defer cancel() + resultDescribeACLs, err = a.DescribeACLs(ctx, aclBindingFilters[0], SetAdminRequestTimeout(requestTimeout)) + expectedDescribeACLs = DescribeACLsResult{ + Error: noError, + ACLBindings: ACLBindings{}, + } + if err != nil { + t.Fatalf("%s", err) + } + checkExpectedResult(expectedDescribeACLs, *resultDescribeACLs) +} diff --git a/kafka/kafka.go b/kafka/kafka.go index 20dc30a89..254edbdbd 100644 --- a/kafka/kafka.go +++ b/kafka/kafka.go @@ -20,7 +20,7 @@ // // High-level Consumer // -// * Decide if you want to read messages and events by calling `.Poll()` or +// * Decide if you want to read messages and events by calling `.Poll()` or // the deprecated option of using the `.Events()` channel. (If you want to use // `.Events()` channel then set `"go.events.channel.enable": true`). // diff --git a/kafka/kafka_test.go b/kafka/kafka_test.go index e268f7eb6..9659c812e 100644 --- a/kafka/kafka_test.go +++ b/kafka/kafka_test.go @@ -1,3 +1,5 @@ +package kafka + /** * Copyright 2016 Confluent Inc. * @@ -14,8 +16,6 @@ * limitations under the License. */ -package kafka - import ( "testing" ) diff --git a/kafka/librdkafka_vendor/LICENSES.txt b/kafka/librdkafka_vendor/LICENSES.txt index f2aa57d07..1ab8a1dd4 100644 --- a/kafka/librdkafka_vendor/LICENSES.txt +++ b/kafka/librdkafka_vendor/LICENSES.txt @@ -27,6 +27,32 @@ ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +LICENSE.cjson +-------------------------------------------------------------- +For cJSON.c and cJSON.h: + +Copyright (c) 2009-2017 Dave Gamble and cJSON contributors + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + + + LICENSE.crc32c -------------------------------------------------------------- # For src/crc32c.c copied (with modifications) from diff --git a/kafka/librdkafka_vendor/bundle-import.sh b/kafka/librdkafka_vendor/bundle-import.sh index 2f7aedab2..5394fac69 100755 --- a/kafka/librdkafka_vendor/bundle-import.sh +++ b/kafka/librdkafka_vendor/bundle-import.sh @@ -14,28 +14,30 @@ usage() { } +# Parse dynamic libraries from linker command line. +# Will print a list matching -lfoo and -framework X.. parse_dynlibs() { - # Parse dynamic libraries from pkg-config file, - # both the ones specified with Libs: but also through Requires: - local pc=$1 local libs= - local req= - local n= - for req in $(grep ^Requires: $pc | sed -e 's/^Requires://'); do - n=$(pkg-config --libs $req) - if [[ $n == -l* ]]; then - libs="${libs} $n" - fi - done - for n in $(grep ^Libs: $pc); do - if [[ $n == -l* ]]; then - libs="${libs} $n" + while [[ $# -gt 0 ]]; do + if [[ $1 == -l* ]]; then + libs="${libs} $1" + elif [[ $1 == -framework ]]; then + libs="${libs} $1 $2" + shift # remove one (extra) arg fi + shift # remove one arg done echo "$libs" } +# Parse dynamic library dependecies from pkg-config file and print +# them to stdout. +parse_pc_dynlibs() { + local pc=$1 + parse_dynlibs $(sed -n 's/^Libs: \(..*\)/\1/p' "$pc") +} + setup_build() { # Copies static library from the temp directory into final location, # extracts dynamic lib list from the pkg-config file, @@ -54,7 +56,7 @@ setup_build() { build_tag="// +build musl" fi - local dynlibs=$(parse_dynlibs $pc) + local dynlibs=$(parse_pc_dynlibs $pc) echo "Copying $apath to $dpath" cp "$apath" "$dpath" diff --git a/kafka/librdkafka_vendor/import.sh b/kafka/librdkafka_vendor/import.sh index 702116997..7f920f3a9 100755 --- a/kafka/librdkafka_vendor/import.sh +++ b/kafka/librdkafka_vendor/import.sh @@ -51,12 +51,15 @@ fi curr_branch=$(git symbolic-ref HEAD 2>/dev/null | cut -d"/" -f 3-) uncommitted=$(git status --untracked-files=no --porcelain) -if [[ $devel != 1 ]] && ( [[ $curr_branch != master ]] || [[ ! -z $uncommitted ]] ); then +if [[ ! -z $uncommitted ]]; then + echo "Error: This script must be run on a clean branch with no uncommitted changes" + echo "Uncommitted files:" + echo "$uncommitted" + exit 1 +fi + +if [[ $devel != 1 ]] && [[ $curr_branch != master ]] ; then echo "Error: This script must be run on an up-to-date, clean, master branch" - if [[ ! -z $uncommitted ]]; then - echo "Uncommitted files:" - echo "$uncommitted" - fi exit 1 fi diff --git a/kafka/librdkafka_vendor/librdkafka_darwin.a b/kafka/librdkafka_vendor/librdkafka_darwin.a index 4a036842e..1d88064d6 100644 Binary files a/kafka/librdkafka_vendor/librdkafka_darwin.a and b/kafka/librdkafka_vendor/librdkafka_darwin.a differ diff --git a/kafka/librdkafka_vendor/librdkafka_glibc_linux.a b/kafka/librdkafka_vendor/librdkafka_glibc_linux.a index 9a059c19e..a08a8facb 100644 Binary files a/kafka/librdkafka_vendor/librdkafka_glibc_linux.a and b/kafka/librdkafka_vendor/librdkafka_glibc_linux.a differ diff --git a/kafka/librdkafka_vendor/librdkafka_musl_linux.a b/kafka/librdkafka_vendor/librdkafka_musl_linux.a index ac017a510..196ed79b0 100644 Binary files a/kafka/librdkafka_vendor/librdkafka_musl_linux.a and b/kafka/librdkafka_vendor/librdkafka_musl_linux.a differ diff --git a/kafka/librdkafka_vendor/librdkafka_windows.a b/kafka/librdkafka_vendor/librdkafka_windows.a index 994cff199..bf7cc8fac 100644 Binary files a/kafka/librdkafka_vendor/librdkafka_windows.a and b/kafka/librdkafka_vendor/librdkafka_windows.a differ diff --git a/kafka/librdkafka_vendor/rdkafka.h b/kafka/librdkafka_vendor/rdkafka.h index 96701d62b..04ea7fc5d 100644 --- a/kafka/librdkafka_vendor/rdkafka.h +++ b/kafka/librdkafka_vendor/rdkafka.h @@ -60,13 +60,13 @@ extern "C" { #ifndef WIN32_MEAN_AND_LEAN #define WIN32_MEAN_AND_LEAN #endif -#include