From 4bc5ad471d9ffcb6676fc5ba7afd3ed9ca33bce2 Mon Sep 17 00:00:00 2001 From: childe Date: Tue, 22 Oct 2024 18:55:38 +0800 Subject: [PATCH 1/3] set AllowAutoTopicCreation to true by default --- metadata_request.go | 25 ++----------------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/metadata_request.go b/metadata_request.go index afcf6d3..7a2f3a3 100644 --- a/metadata_request.go +++ b/metadata_request.go @@ -4,28 +4,6 @@ import ( "encoding/binary" ) -/* -This API answers the following questions: -- What topics exist? -- How many partitions does each topic have? -- Which broker is currently the leader for each partition? -- What is the host and port for each of these brokers? - -This is the only request that can be addressed to any broker in the cluster. - -Since there may be many topics the client can give an optional list of topic names in order to only return metadata for a subset of topics. - -The metadata returned is at the partition level, but grouped together by topic for convenience and to avoid redundancy. For each partition the metadata contains the information for the leader as well as for all the replicas and the list of replicas that are currently in-sync. - -Topics Metadata Request - - TopicsMetadataRequest => [TopicsName] - TopicsName => string - -Field Description -TopicsName The topics to produce metadata for. If empty the request will yield metadata for all topics. -*/ - type MetadataRequest struct { *RequestHeader Topics []string @@ -86,7 +64,8 @@ func NewMetadataRequest(clientID string, topics []string) *MetadataRequest { APIKey: API_MetadataRequest, ClientID: clientID, }, - Topics: topics, + Topics: topics, + AllowAutoTopicCreation: true, } return r From 5a9c4236a4208b464c04321569bded85ad193bca Mon Sep 17 00:00:00 2001 From: childe Date: Tue, 22 Oct 2024 18:55:51 +0800 Subject: [PATCH 2/3] remove unused err varible --- metadata_response.go | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/metadata_response.go b/metadata_response.go index 1285206..c7ca1eb 100644 --- a/metadata_response.go +++ b/metadata_response.go @@ -2,18 +2,13 @@ package healer import ( "encoding/binary" - "errors" "fmt" "net" "sort" "strconv" ) -var ( - errNoTopicsInMetadata = errors.New("no topic returned in metadata response") -) - -// MetadataResponse holds all the parameters of metadata response, including the brokers and topics +// MetadataResponse holds all the fields of metadata response, including the brokers and topics type MetadataResponse struct { CorrelationID uint32 ThrottleTimeMs int32 @@ -23,7 +18,15 @@ type MetadataResponse struct { TopicMetadatas []TopicMetadata } -// Error returns the error abstracted from the error code, actually it always returns nil +// TopicMetadata holds all the fields of topic metadata, which is used in metadata response +type TopicMetadata struct { + TopicErrorCode int16 + TopicName string + IsInternal bool + PartitionMetadatas []*PartitionMetadataInfo +} + +// Error returns the error from the error code func (r MetadataResponse) Error() error { for _, topic := range r.TopicMetadatas { if topic.TopicErrorCode != 0 { @@ -40,7 +43,7 @@ func (r MetadataResponse) Error() error { return nil } -// BrokerInfo holds all the parameters of broker info, which is used in metadata response +// BrokerInfo holds all the fields of broker info, which is used in metadata response type BrokerInfo struct { NodeID int32 Host string @@ -71,14 +74,6 @@ func decodeToBrokerInfo(payload []byte, version uint16) (b BrokerInfo, offset in return } -// TopicMetadata holds all the parameters of topic metadata, which is used in metadata response -type TopicMetadata struct { - TopicErrorCode int16 - TopicName string - IsInternal bool - PartitionMetadatas []*PartitionMetadataInfo -} - func decodeToTopicMetadata(payload []byte, version uint16) (tm TopicMetadata, offset int) { tm.TopicErrorCode = int16(binary.BigEndian.Uint16(payload[offset:])) offset += 2 @@ -101,7 +96,7 @@ func decodeToTopicMetadata(payload []byte, version uint16) (tm TopicMetadata, of return } -// PartitionMetadataInfo holds all the parameters of partition metadata info, which is used in metadata response +// PartitionMetadataInfo holds all the fields of partition metadata info, which is used in metadata response type PartitionMetadataInfo struct { PartitionErrorCode int16 PartitionID int32 From b1ff19afcf8ef86b8166368d43900c721d1855d9 Mon Sep 17 00:00:00 2001 From: childe Date: Tue, 22 Oct 2024 19:03:58 +0800 Subject: [PATCH 3/3] retry if updateCurrentSimpleProducer failed --- config.go | 2 ++ producer.go | 11 +++++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/config.go b/config.go index f4bd2ef..b3fff4f 100644 --- a/config.go +++ b/config.go @@ -193,6 +193,7 @@ type ProducerConfig struct { MetadataMaxAgeMS int `json:"metadata.max.age.ms,string" mapstructure:"metadata.max.age.ms"` FetchTopicMetaDataRetrys int `json:"fetch.topic.metadata.retrys,string" mapstructure:"fetch.topic.metadata.retrys"` ConnectionsMaxIdleMS int `json:"connections.max.idle.ms,string" mapstructure:"connections.max.idle.ms"` + RetryBackOffMS int `json:"retry.backoff.ms,string" mapstructure:"retry.backoff.ms"` MetadataRefreshIntervalMS int `json:"metadata.refresh.interval.ms,string" mapstructure:"metadata.refresh.interval.ms"` @@ -226,6 +227,7 @@ func DefaultProducerConfig() ProducerConfig { MetadataMaxAgeMS: 300000, FetchTopicMetaDataRetrys: 3, ConnectionsMaxIdleMS: 540000, + RetryBackOffMS: 200, TLSEnabled: false, diff --git a/producer.go b/producer.go index cbed8af..57af2b3 100644 --- a/producer.go +++ b/producer.go @@ -54,10 +54,13 @@ func NewProducer(topic string, config interface{}) (*Producer, error) { return nil, err } - err = p.updateCurrentSimpleProducer() - if err != nil { - err = fmt.Errorf("update current simple consumer of %s error: %w", p.topic, err) - return nil, err + for { + if err = p.updateCurrentSimpleProducer(); err != nil { + logger.Error(err, "update current simple consumer, sleep and retry", "topic", p.topic, "retry_backoff_ms", p.config.RetryBackOffMS) + time.Sleep(time.Duration(p.config.RetryBackOffMS) * time.Millisecond) + } else { + break + } } ctx := context.Background()