Skip to content

Commit

Permalink
Merge pull request #30 from childe/allow-auto-topic-creation
Browse files Browse the repository at this point in the history
Allow auto topic creation
  • Loading branch information
childe authored Oct 22, 2024
2 parents 9054522 + b1ff19a commit d73be14
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 44 deletions.
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ type ProducerConfig struct {
MetadataMaxAgeMS int `json:"metadata.max.age.ms,string" mapstructure:"metadata.max.age.ms"`
FetchTopicMetaDataRetrys int `json:"fetch.topic.metadata.retrys,string" mapstructure:"fetch.topic.metadata.retrys"`
ConnectionsMaxIdleMS int `json:"connections.max.idle.ms,string" mapstructure:"connections.max.idle.ms"`
RetryBackOffMS int `json:"retry.backoff.ms,string" mapstructure:"retry.backoff.ms"`

MetadataRefreshIntervalMS int `json:"metadata.refresh.interval.ms,string" mapstructure:"metadata.refresh.interval.ms"`

Expand Down Expand Up @@ -226,6 +227,7 @@ func DefaultProducerConfig() ProducerConfig {
MetadataMaxAgeMS: 300000,
FetchTopicMetaDataRetrys: 3,
ConnectionsMaxIdleMS: 540000,
RetryBackOffMS: 200,

TLSEnabled: false,

Expand Down
25 changes: 2 additions & 23 deletions metadata_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,6 @@ import (
"encoding/binary"
)

/*
This API answers the following questions:
- What topics exist?
- How many partitions does each topic have?
- Which broker is currently the leader for each partition?
- What is the host and port for each of these brokers?
This is the only request that can be addressed to any broker in the cluster.
Since there may be many topics the client can give an optional list of topic names in order to only return metadata for a subset of topics.
The metadata returned is at the partition level, but grouped together by topic for convenience and to avoid redundancy. For each partition the metadata contains the information for the leader as well as for all the replicas and the list of replicas that are currently in-sync.
Topics Metadata Request
TopicsMetadataRequest => [TopicsName]
TopicsName => string
Field Description
TopicsName The topics to produce metadata for. If empty the request will yield metadata for all topics.
*/

type MetadataRequest struct {
*RequestHeader
Topics []string
Expand Down Expand Up @@ -86,7 +64,8 @@ func NewMetadataRequest(clientID string, topics []string) *MetadataRequest {
APIKey: API_MetadataRequest,
ClientID: clientID,
},
Topics: topics,
Topics: topics,
AllowAutoTopicCreation: true,
}

return r
Expand Down
29 changes: 12 additions & 17 deletions metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,13 @@ package healer

import (
"encoding/binary"
"errors"
"fmt"
"net"
"sort"
"strconv"
)

var (
errNoTopicsInMetadata = errors.New("no topic returned in metadata response")
)

// MetadataResponse holds all the parameters of metadata response, including the brokers and topics
// MetadataResponse holds all the fields of metadata response, including the brokers and topics
type MetadataResponse struct {
CorrelationID uint32
ThrottleTimeMs int32
Expand All @@ -23,7 +18,15 @@ type MetadataResponse struct {
TopicMetadatas []TopicMetadata
}

// Error returns the error abstracted from the error code, actually it always returns nil
// TopicMetadata holds all the fields of topic metadata, which is used in metadata response
type TopicMetadata struct {
TopicErrorCode int16
TopicName string
IsInternal bool
PartitionMetadatas []*PartitionMetadataInfo
}

// Error returns the error from the error code
func (r MetadataResponse) Error() error {
for _, topic := range r.TopicMetadatas {
if topic.TopicErrorCode != 0 {
Expand All @@ -40,7 +43,7 @@ func (r MetadataResponse) Error() error {
return nil
}

// BrokerInfo holds all the parameters of broker info, which is used in metadata response
// BrokerInfo holds all the fields of broker info, which is used in metadata response
type BrokerInfo struct {
NodeID int32
Host string
Expand Down Expand Up @@ -71,14 +74,6 @@ func decodeToBrokerInfo(payload []byte, version uint16) (b BrokerInfo, offset in
return
}

// TopicMetadata holds all the parameters of topic metadata, which is used in metadata response
type TopicMetadata struct {
TopicErrorCode int16
TopicName string
IsInternal bool
PartitionMetadatas []*PartitionMetadataInfo
}

func decodeToTopicMetadata(payload []byte, version uint16) (tm TopicMetadata, offset int) {
tm.TopicErrorCode = int16(binary.BigEndian.Uint16(payload[offset:]))
offset += 2
Expand All @@ -101,7 +96,7 @@ func decodeToTopicMetadata(payload []byte, version uint16) (tm TopicMetadata, of
return
}

// PartitionMetadataInfo holds all the parameters of partition metadata info, which is used in metadata response
// PartitionMetadataInfo holds all the fields of partition metadata info, which is used in metadata response
type PartitionMetadataInfo struct {
PartitionErrorCode int16
PartitionID int32
Expand Down
11 changes: 7 additions & 4 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,13 @@ func NewProducer(topic string, config interface{}) (*Producer, error) {
return nil, err
}

err = p.updateCurrentSimpleProducer()
if err != nil {
err = fmt.Errorf("update current simple consumer of %s error: %w", p.topic, err)
return nil, err
for {
if err = p.updateCurrentSimpleProducer(); err != nil {
logger.Error(err, "update current simple consumer, sleep and retry", "topic", p.topic, "retry_backoff_ms", p.config.RetryBackOffMS)
time.Sleep(time.Duration(p.config.RetryBackOffMS) * time.Millisecond)
} else {
break
}
}

ctx := context.Background()
Expand Down

0 comments on commit d73be14

Please sign in to comment.