-
Notifications
You must be signed in to change notification settings - Fork 10
/
configuration_generator.go
49 lines (42 loc) · 1.44 KB
/
configuration_generator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package kafka_wrapper
import (
"github.com/IBM/sarama"
"time"
)
// GenerateKafkaConfiguration returns a new sarama configuration with the
// consumer, producer, metadata and network settings populated.
//
// version is parsed with sarama.ParseKafkaVersion; the function panics with
// the returned error when parsing fails. clientId is stored as the config's
// ClientID. When shouldWaitAckFromAll is true the producer's RequiredAcks is
// set to sarama.WaitForAll, otherwise to sarama.WaitForLocal.
func GenerateKafkaConfiguration(version, clientId string, shouldWaitAckFromAll bool) *sarama.Config {
	// Values reused by several settings below.
	const (
		maxRetries   = 3
		retryBackoff = 10 * time.Second
		longTimeout  = 3 * time.Minute
	)

	kafkaVersion, parseErr := sarama.ParseKafkaVersion(version)
	if parseErr != nil {
		panic(parseErr)
	}

	cfg := sarama.NewConfig()
	cfg.Version = kafkaVersion
	cfg.ClientID = clientId

	// Network timeouts.
	network := &cfg.Net
	network.DialTimeout = longTimeout
	network.ReadTimeout = longTimeout
	network.WriteTimeout = longTimeout

	// Metadata refresh.
	metadata := &cfg.Metadata
	metadata.Full = false
	metadata.Retry.Max = maxRetries
	metadata.Retry.Backoff = retryBackoff

	// Consumer.
	consumer := &cfg.Consumer
	consumer.Return.Errors = true
	consumer.Offsets.Initial = sarama.OffsetNewest
	consumer.MaxProcessingTime = 3 * time.Second
	consumer.Fetch.Default = 2048 * 1024

	// Consumer group.
	group := &cfg.Consumer.Group
	group.Session.Timeout = 30 * time.Second
	group.Heartbeat.Interval = 6 * time.Second
	group.Rebalance.Timeout = longTimeout
	group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
		sarama.BalanceStrategyRoundRobin,
	}

	// Producer.
	producer := &cfg.Producer
	producer.Return.Successes = true
	producer.Return.Errors = true
	producer.Retry.Max = maxRetries
	producer.Retry.Backoff = retryBackoff
	producer.Timeout = longTimeout
	producer.MaxMessageBytes = 2000000

	// Default to the local ack level and raise it only on request.
	acks := sarama.WaitForLocal
	if shouldWaitAckFromAll {
		acks = sarama.WaitForAll
	}
	producer.RequiredAcks = acks

	return cfg
}