Skip to content

Commit

Permalink
Use metadata structure provided by siesta client
Browse files Browse the repository at this point in the history
  • Loading branch information
serejja committed Mar 23, 2016
1 parent 6acf026 commit 268b84c
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 97 deletions.
5 changes: 0 additions & 5 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

type ProducerConfig struct {
Partitioner Partitioner
MetadataExpire time.Duration
CompressionType string
BatchSize int
Linger time.Duration
Expand All @@ -45,7 +44,6 @@ type ProducerConfig struct {
func NewProducerConfig() *ProducerConfig {
return &ProducerConfig{
Partitioner: NewHashPartitioner(),
MetadataExpire: time.Minute,
BatchSize: 16384,
ClientID: "siesta",
MaxRequests: 10,
Expand All @@ -67,9 +65,6 @@ func ProducerConfigFromFile(filename string) (*ProducerConfig, error) {
}

producerConfig := NewProducerConfig()
if err := setDurationConfig(&producerConfig.MetadataExpire, c["metadata.max.age"]); err != nil {
return nil, err
}
if err := setIntConfig(&producerConfig.BatchSize, c["batch.size"]); err != nil {
return nil, err
}
Expand Down
4 changes: 1 addition & 3 deletions kafka_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ type KafkaProducer struct {
valueSerializer Serializer
messagesChan chan *ProducerRecord
connector siesta.Connector
metadata *Metadata
selector *Selector
}

Expand All @@ -74,7 +73,6 @@ func NewKafkaProducer(config *ProducerConfig, keySerializer Serializer, valueSer
producer.keySerializer = keySerializer
producer.valueSerializer = valueSerializer
producer.connector = connector
producer.metadata = NewMetadata(connector, config.MetadataExpire)

selectorConfig := NewSelectorConfig(config)
producer.selector = NewSelector(selectorConfig)
Expand Down Expand Up @@ -113,7 +111,7 @@ func (kp *KafkaProducer) send(record *ProducerRecord) {
record.encodedKey = serializedKey
record.encodedValue = serializedValue

partitions, err := kp.metadata.Get(record.Topic)
partitions, err := kp.connector.Metadata().PartitionsFor(record.Topic)
if err != nil {
metadata.Error = err
metadataChan <- metadata
Expand Down
82 changes: 0 additions & 82 deletions metadata.go

This file was deleted.

15 changes: 12 additions & 3 deletions network_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ func (nc *NetworkClient) trySend(request *siesta.ProduceRequest, batch []*Produc

response := <-nc.selector.Send(leader, request)
if response.sendErr != nil {
nc.connector.RefreshMetadata([]string{nc.Topic})
err = nc.connector.Metadata().Refresh([]string{nc.Topic})
if err != nil {
Logger.Warn("Send error occurred, returning it but also failed to refresh metadata: %s", err)
}
return response.sendErr
}

Expand All @@ -82,7 +85,10 @@ func (nc *NetworkClient) trySend(request *siesta.ProduceRequest, batch []*Produc
}

if response.receiveErr != nil {
nc.connector.RefreshMetadata([]string{nc.Topic})
err = nc.connector.Metadata().Refresh([]string{nc.Topic})
if err != nil {
Logger.Warn("Receive error occurred, returning it but also failed to refresh metadata: %s", err)
}
return response.receiveErr
}

Expand All @@ -96,7 +102,10 @@ func (nc *NetworkClient) trySend(request *siesta.ProduceRequest, batch []*Produc
status, exists := produceResponse.Status[nc.Topic][nc.Partition]
if exists {
if status.Error == siesta.ErrNotLeaderForPartition {
nc.connector.RefreshMetadata([]string{nc.Topic})
err = nc.connector.Metadata().Refresh([]string{nc.Topic})
if err != nil {
Logger.Warn("Produce error occurred, returning it but also failed to refresh metadata: %s", err)
}
return status.Error
}

Expand Down
14 changes: 10 additions & 4 deletions selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,21 @@ func (s *Selector) send(correlationID int32, conn *net.TCPConn, request siesta.R
encoder := siesta.NewBinaryEncoder(bytes)
writer.Write(encoder)

conn.SetWriteDeadline(time.Now().Add(s.config.WriteTimeout))
_, err := conn.Write(bytes)
err := conn.SetWriteDeadline(time.Now().Add(s.config.WriteTimeout))
if err != nil {
return err
}
_, err = conn.Write(bytes)
return err
}

func (s *Selector) receive(conn *net.TCPConn) ([]byte, error) {
conn.SetReadDeadline(time.Now().Add(s.config.ReadTimeout))
err := conn.SetReadDeadline(time.Now().Add(s.config.ReadTimeout))
if err != nil {
return nil, err
}
header := make([]byte, 8)
_, err := io.ReadFull(conn, header)
_, err = io.ReadFull(conn, header)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 268b84c

Please sign in to comment.