-
Notifications
You must be signed in to change notification settings - Fork 170
KafkaSink
KafkaSink routes messages to Kafka 0.8 broker using Kafka's synchronous producer client. RoutingKey will be used as topic name. The sequentially increased message id will be used as the key for KeyedMessage. Because Suro's message payload is binary, KafkaSink does not support configurable Encoder in Kafka. Since KafkaSink is also asynchronous sink that uses an internal queue, it needs an MessageQueue4Sink.
By default, KafkaSink will use sequentially incremented long number as the key for Kafka's KeyedMessage. If keyTopicMap is specified, KafkaSink will use the hash code of field object as the key. For example, field1 is the key for topic1, keyForTopic should be specified as the following json string
{
...
"keyForTopic": {
"topic1": "field1"
}
}
KafkaSink will call the following java code to use field1's hash code as the key of KeyedMessage
Map<String, Object> msgMap = message.getEntity(new TypeReference<Map<String, Object>>() {});
Object keyField = msgMap.get(keyTopicMap.get(message.getRoutingKey()));
if (keyField != null) {
key = keyField.hashCode();
}
So, if you want to Kafka's partitioner based on field value, you should use Json formatted message.
For Kafka specific properties, go to Kafka 0.8 producer config
- client.id
- metadata.broker.list
- compression.codec
- send.buffer.bytes
- request.timeout.ms
- request.required.acks
- message.send.max.retries
- retry.backoff.ms
- kafka.etc
- keyTopicMap
KafkaSink is ThreadPoolQueuedSink and it has the following common properties:
- queue4Sink
- batchSize
- batchTimeout
- jobQueueSize
- corePoolSize
- maxPoolSize
- jobTimeout
Properties | Description | type | Default |
---|---|---|---|
kafka.etc | We can set up extra properties on this, for example kafka.metrics.reporters. | Properties | |
keyTopicMap | If this Map is not empty, KafkaSink will use field value as the key on KeyeydMessage | Map<String, String>, its key is topic and the value is field name. |