Log Kafka client errors to file and stop when client terminates (#5)
I should really use structured logs (as the rest of HK does) so we can get useful info out of them. That will come after this change.
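
For reference, a minimal sketch of what that structured-logging follow-up could look like with the logrus package imported below; the field names and values here are hypothetical and not part of this change:

	package main

	import (
		log "github.com/Sirupsen/logrus"
	)

	func main() {
		// Attach topic/partition/offset as structured fields instead of
		// formatting them into the message string.
		log.WithFields(log.Fields{
			"topic":     "my_topic",
			"partition": int32(0),
			"offset":    int64(42),
		}).Info("consumed record")
	}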
tinkerware authored Jun 18, 2018
1 parent ed229d0 commit 71ddfb8
Showing 1 changed file with 80 additions and 24 deletions.
104 changes: 80 additions & 24 deletions kafkatail/kafkatail.go
@@ -2,63 +2,119 @@ package kafkatail

import (
	"context"
	"fmt"
	"os"

	"github.com/Shopify/sarama"
	log "github.com/Sirupsen/logrus"
)

type Options struct {
	Server    string `long:"server" description:"kafka server" default:"localhost"`
	Port      string `long:"port" description:"kafka port" default:"9092"`
	Topic     string `long:"topic" description:"kafka topic" default:"my_topic"`
	Partition int32  `long:"partition" description:"partition to read from"`
	ClientLog string `long:"client_log" description:"file to redirect Kafka client logs"`
}

// GetChans returns a list of channels, but it only ever has one entry - the
// partition on which we're listening.
// TODO listen on multiple channels to multiple partitions
func GetChans(ctx context.Context, options Options) ([]chan string, error) {
	partitionRecords := make(chan string, 1)

	partitions := make([]chan string, 1, 1)
	partitions[0] = partitionRecords

	if logFilePath := options.ClientLog; logFilePath != "" {
		logger := log.New()

		if f, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666); err != nil {
			log.Warnf("Failed to write Kafka client logs to [%v], using stderr: %v",
				logFilePath, err)
		} else {
			logger.Out = f
		}
		sarama.Logger = logger
	}

	config := sarama.NewConfig()
	config.ClientID = makeClientId(options.Topic, options.Partition)

	brokers := []string{options.Server + ":" + options.Port}
	consumer, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		return nil, err
	}

	partitionConsumer, err := consumer.ConsumePartition(
		options.Topic, options.Partition, sarama.OffsetNewest)
	if err != nil {
		log.Errorf("Error starting PartitionConsumer for topic [%v] partition [%d]: %v",
			options.Topic, options.Partition, err)
		return nil, err
	}

	go func() {
		log.Infof("Started consumer for topic [%v] partition [%d]",
			options.Topic, options.Partition)

		var lastSuccessOffset int64
		var recordsConsumed int64

		defer func() {
			log.Infof("Stopping consumer for topic [%v] partition [%d]; "+
				"consumed records: [%v]; last successfully read offset [%d]",
				options.Topic, options.Partition, recordsConsumed, lastSuccessOffset)

			close(partitionRecords)

			err := partitionConsumer.Close()
			if err != nil {
				log.Errorf("Error shutting down partition consumer for topic [%v] partition [%d]",
					options.Topic, options.Partition)
			}

			err = consumer.Close()
			if err != nil {
				log.Errorf("Error shutting down consumer for brokers [%v]", brokers)
			}
		}()

		for {
			select {
			case msg, ok := <-partitionConsumer.Messages():
				if msg != nil {
					partitionRecords <- string(msg.Value)
					lastSuccessOffset = max(lastSuccessOffset, msg.Offset)
					recordsConsumed++
				}
				if !ok {
					// Kafka client bailed on us; start clean up, drain any
					// remaining messages, then bail out.
					partitionConsumer.AsyncClose()
					for msg := range partitionConsumer.Messages() {
						partitionRecords <- string(msg.Value)
						lastSuccessOffset = max(lastSuccessOffset, msg.Offset)
						recordsConsumed++
					}
					return
				}
			case <-ctx.Done():
				// The context was canceled; the deferred function above
				// closes the consumers and the output channel.
				return
			}
		}
	}()

	return partitions, nil
}

func max(a int64, b int64) int64 {
	if a < b {
		return b
	}
	return a
}

func makeClientId(topic string, partition int32) string {
	return fmt.Sprintf("honeykafka_%v_%v", topic, partition)
}
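
For context, a rough sketch of how a caller might consume the single partition channel that GetChans returns; tailToStdout is a hypothetical helper written as if it lived in this package, and the Options values are illustrative only:

	// Hypothetical caller: tail the topic to stdout until the Kafka client
	// terminates (the consumer goroutine closes the channel) or the context
	// is canceled.
	func tailToStdout() error {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		chans, err := GetChans(ctx, Options{
			Server:    "localhost",
			Port:      "9092",
			Topic:     "my_topic",
			Partition: 0,
			ClientLog: "kafka-client.log",
		})
		if err != nil {
			return err
		}

		for line := range chans[0] {
			fmt.Println(line)
		}
		return nil
	}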
