
Message

ppatierno edited this page May 2, 2016 · 6 revisions

The AMQP protocol has a well-defined data type system and metadata for describing message content, in addition to an opaque body that can either be encoded with the same AMQP data type system or handled as raw bytes. Apache Kafka has no comparable features: it transfers messages as raw bytes.

To translate messages between AMQP clients and Apache Kafka, the bridge defines a MessageConverter interface with the following two methods:

  • toKafkaRecord : converts an AMQP message to a Kafka record;
  • toAmqpMessage : converts a Kafka record to an AMQP message;

The message converter is pluggable through the message.convert property inside the bridge configuration file (bridge.properties).
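
As a rough illustration of the pluggable mechanism, the sketch below reads message.convert from properties text and instantiates the named class by reflection. Only the interface name, its two method names and the property name come from this page. The method signatures, the generic type parameters, the Utf8Converter toy class and the ConverterLoader helper are assumptions made for the example, not the bridge's actual code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Hypothetical stand-in for the bridge's converter interface. The type
// parameters replace the real AMQP message and Kafka record classes.
interface MessageConverter<A, K> {
    K toKafkaRecord(A amqpMessage);   // AMQP message -> Kafka record
    A toAmqpMessage(K kafkaRecord);   // Kafka record -> AMQP message
}

// Toy converter for the sketch: the "AMQP message" is a String and the
// "Kafka record" is its UTF-8 bytes.
class Utf8Converter implements MessageConverter<String, byte[]> {
    public byte[] toKafkaRecord(String amqpMessage) {
        return amqpMessage.getBytes(StandardCharsets.UTF_8);
    }

    public String toAmqpMessage(byte[] kafkaRecord) {
        return new String(kafkaRecord, StandardCharsets.UTF_8);
    }
}

class ConverterLoader {
    // Reads message.convert from the properties text and instantiates the
    // named class, falling back to defaultClassName when the key is absent.
    @SuppressWarnings("unchecked")
    static <A, K> MessageConverter<A, K> load(String propertiesText, String defaultClassName)
            throws IOException, ReflectiveOperationException {
        Properties config = new Properties();
        config.load(new StringReader(propertiesText));
        String className = config.getProperty("message.convert", defaultClassName);
        Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
        return (MessageConverter<A, K>) instance;
    }

    public static void main(String[] args) throws Exception {
        // The class name is taken from the toy class above, not from the bridge.
        String name = Utf8Converter.class.getName();
        MessageConverter<String, byte[]> converter = load("message.convert=" + name + "\n", name);
        System.out.println(converter.toAmqpMessage(converter.toKafkaRecord("hello")));
    }
}
```

Loading the class by name keeps the bridge core independent of any particular converter, which is what makes a custom implementation a configuration change rather than a code change.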

The bridge provides a DefaultMessageConverter, which is used by default, and a JsonMessageConverter that can be plugged in instead.

DefaultMessageConverter

This is the simplest converter. It works as follows.

From AMQP message to Kafka record.

  • All properties, application properties and message annotations are lost: they are not encoded in any way in the Kafka record;
  • If partition and key are specified as message annotations, they are used as the partition and key for the destination topic in the Kafka record;
  • The AMQP body is always handled as bytes and put inside the Kafka record value. The converter supports both AMQP value and raw data/binary encoding;

From Kafka record to AMQP message.

  • No properties or application properties are generated or filled;
  • The only message annotations filled are partition, offset and key, taken from the Kafka record;
  • The AMQP body is encoded as raw data/binary from the corresponding Kafka record value;
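
The two directions described above can be sketched with plain Java stand-ins. AmqpMessageSketch and KafkaRecordSketch are hypothetical types that replace the real AMQP message and Kafka record classes, and the annotation keys "partition", "offset" and "key" are assumed to match the names used on this page. How an AMQP value is turned into bytes is not stated here, so the sketch assumes the UTF-8 bytes of its string form.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for an AMQP message: annotations plus a body.
class AmqpMessageSketch {
    final Map<String, Object> messageAnnotations = new LinkedHashMap<>();
    Object body; // byte[] for a raw data section, any other object for an AMQP value
}

// Hypothetical stand-in for a Kafka record.
class KafkaRecordSketch {
    Integer partition; // null when not specified
    Long offset;       // only known for records read from Kafka
    Object key;        // null when not specified
    byte[] value;
}

class DefaultConverterSketch {
    static KafkaRecordSketch toKafkaRecord(AmqpMessageSketch message) {
        KafkaRecordSketch record = new KafkaRecordSketch();
        // Only the partition and key annotations are used; all other metadata is dropped.
        record.partition = (Integer) message.messageAnnotations.get("partition");
        record.key = message.messageAnnotations.get("key");
        if (message.body == null) {
            record.value = null;
        } else if (message.body instanceof byte[]) {
            record.value = (byte[]) message.body;
        } else {
            // Assumption: an AMQP value is stored as the UTF-8 bytes of its string form.
            record.value = String.valueOf(message.body).getBytes(StandardCharsets.UTF_8);
        }
        return record;
    }

    static AmqpMessageSketch toAmqpMessage(KafkaRecordSketch record) {
        AmqpMessageSketch message = new AmqpMessageSketch();
        // Only these three annotations are filled, all from the Kafka record.
        message.messageAnnotations.put("partition", record.partition);
        message.messageAnnotations.put("offset", record.offset);
        message.messageAnnotations.put("key", record.key);
        message.body = record.value; // always a raw data section
        return message;
    }

    public static void main(String[] args) {
        AmqpMessageSketch message = new AmqpMessageSketch();
        message.messageAnnotations.put("partition", 0);
        message.messageAnnotations.put("key", "my-key");
        message.body = "some text";
        KafkaRecordSketch record = toKafkaRecord(message);
        System.out.println(record.partition + " " + record.key + " " + record.value.length);
    }
}
```

Note that the round trip is lossy by design: an AMQP value body comes back as raw bytes, and any annotation other than the three listed is gone.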

JsonMessageConverter

This converter carries all the main AMQP message information (metadata and body) in a JSON document. It works as follows.

From AMQP message to Kafka record.

The converter generates a JSON document with the following structure:

  • All main properties (messageId, to, subject, ...) are converted into a JSON map named "properties" with property name/property value pairs;
  • All application properties are converted into a JSON map named "applicationProperties" with property name/property value pairs;
  • All message annotations are converted into a JSON map named "messageAnnotations" with annotation name/annotation value pairs;
  • The body is encoded into a JSON map named "body", with a "type" field that specifies whether it is an AMQP value or raw data, and a "section" field containing the body content. A raw data section is Base64 encoded;
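
A minimal sketch of this encoding, using only the Java standard library, is shown below. The four top-level map names and the "type" and "section" fields come from this page. The "type" values "data" and "amqpValue" are hypothetical placeholders, since the actual strings are not given here, and the hand-written JSON writer only handles strings, numbers and booleans. A real converter would more likely rely on a JSON library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class JsonEncodeSketch {
    // Quotes a string for JSON, escaping quotes, backslashes and control characters.
    static String quote(String s) {
        StringBuilder out = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                out.append('\\').append(c);
            } else if (c < 0x20) {
                out.append(String.format("\\u%04x", (int) c));
            } else {
                out.append(c);
            }
        }
        return out.append('"').toString();
    }

    // Numbers and booleans are written bare; everything else as a JSON string.
    static String value(Object v) {
        if (v == null) return "null";
        if (v instanceof Number || v instanceof Boolean) return v.toString();
        return quote(v.toString());
    }

    // Writes a flat map as a JSON object of name/value pairs.
    static String map(Map<String, Object> entries) {
        StringJoiner joiner = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> e : entries.entrySet()) {
            joiner.add(quote(e.getKey()) + ":" + value(e.getValue()));
        }
        return joiner.toString();
    }

    static String toJson(Map<String, Object> properties,
                         Map<String, Object> applicationProperties,
                         Map<String, Object> messageAnnotations,
                         Object body) {
        String type;
        String section;
        if (body instanceof byte[]) {
            type = "data";       // hypothetical type name for a raw data section
            section = quote(Base64.getEncoder().encodeToString((byte[]) body));
        } else {
            type = "amqpValue";  // hypothetical type name for an AMQP value
            section = value(body);
        }
        return "{\"properties\":" + map(properties)
                + ",\"applicationProperties\":" + map(applicationProperties)
                + ",\"messageAnnotations\":" + map(messageAnnotations)
                + ",\"body\":{\"type\":" + quote(type) + ",\"section\":" + section + "}}";
    }

    public static void main(String[] args) {
        Map<String, Object> properties = new LinkedHashMap<>();
        properties.put("subject", "greeting");
        Map<String, Object> empty = new LinkedHashMap<>();
        byte[] raw = "hi".getBytes(StandardCharsets.UTF_8);
        System.out.println(toJson(properties, empty, empty, raw));
    }
}
```

Base64 is needed for the raw data case because JSON strings cannot carry arbitrary bytes.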

From Kafka record to AMQP message.

Starting from the JSON document received in the Kafka record, the converter produces an AMQP message in the following way:

  • All main properties (messageId, to, subject, ...) are filled from the corresponding JSON map named "properties";
  • All application properties are filled from the corresponding JSON map (if present) named "applicationProperties";
  • All message annotations are filled from the corresponding JSON map (if present) named "messageAnnotations". The annotations related to partition, offset and key are always filled;
  • The body is encoded as an AMQP value or as raw data bytes, as specified by the "type" field of "body", and its content is taken from the "section" field;

The JSON document has the following overall structure:

{
   "properties": {
      "to": ...,
      "messageId": ...,
      "subject": ...,
      "replyTo": ...,
      "correlationId": ...
   },
   "applicationProperties": {
      "prop1": ...,
      ...
      "propN": ...
   },
   "messageAnnotations": {
      "partition": ...,
      "offset": ...,
      "key": ...,
      "ann1": ...,
      ...
      "annN": ...
   },
   "body": {
      "type": ...,
      "section": ...
   }
}
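
The Kafka record to AMQP message direction of the JsonMessageConverter can be sketched as follows, assuming the JSON document has already been parsed into nested maps by a JSON library of your choice. The JsonDecodeSketch class and its method names are made up for the example, and "data" is a hypothetical value for the "type" field. The sketch also assumes that the partition, offset and key annotations are taken from the Kafka record itself and override any values found in the JSON map.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

class JsonDecodeSketch {
    // Rebuilds the AMQP body from the parsed "body" map: Base64-decoded bytes
    // for a raw data section, the section content as-is for an AMQP value.
    static Object decodeBody(Map<String, Object> body) {
        Object section = body.get("section");
        if ("data".equals(body.get("type"))) { // hypothetical type name
            return Base64.getDecoder().decode((String) section);
        }
        return section;
    }

    // Merges the optional "messageAnnotations" map with the three annotations
    // that are always filled. Assumption: they come from the Kafka record.
    static Map<String, Object> annotations(Map<String, Object> fromJson,
                                           int partition, long offset, Object key) {
        Map<String, Object> result = new LinkedHashMap<>();
        if (fromJson != null) {
            result.putAll(fromJson);
        }
        result.put("partition", partition);
        result.put("offset", offset);
        result.put("key", key);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> body = new LinkedHashMap<>();
        body.put("type", "data");
        body.put("section", Base64.getEncoder().encodeToString("hi".getBytes(StandardCharsets.UTF_8)));
        byte[] decoded = (byte[]) decodeBody(body);
        System.out.println(new String(decoded, StandardCharsets.UTF_8));
        System.out.println(annotations(null, 0, 7L, "my-key").keySet());
    }
}
```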
