diff --git a/404.html b/404.html index e6f6856cd..4a470bf90 100644 --- a/404.html +++ b/404.html @@ -8,13 +8,13 @@ Page Not Found | Firehose - +

Page Not Found

We could not find what you were looking for.

Please contact the owner of the site that linked you to the original URL and let them know their link is broken.

- + \ No newline at end of file diff --git a/advance/dlq/index.html b/advance/dlq/index.html index cf5b3b912..effc3bd10 100644 --- a/advance/dlq/index.html +++ b/advance/dlq/index.html @@ -8,13 +8,13 @@ DLQ | Firehose - +

DLQ

DLQ storage can be configured for certain errors thrown by the sink.

DLQ_SINK_ENABLE

  • Example value: true
  • Type: optional
  • Default value: false

DLQ_WRITER_TYPE

The DLQ writer to use. Possible values are KAFKA, BLOB_STORAGE, and LOG.

  • Example value: BLOB_STORAGE
  • Type: optional
  • Default value: LOG

DLQ_RETRY_MAX_ATTEMPTS

Maximum number of retry attempts for the DLQ.

  • Example value: 3
  • Type: optional
  • Default value: 2147483647

DLQ_RETRY_FAIL_AFTER_MAX_ATTEMPT_ENABLE

  • Example value: true
  • Type: optional
  • Default value: false

DLQ_BLOB_STORAGE_TYPE

If the writer type is set to BLOB_STORAGE, we can choose any blob storage. Currently, GCS and S3 are supported.

  • Example value: GCS
  • Type: optional
  • Default value: GCS

DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID

  • Example value: my-project-id
  • Type: Required if BLOB storage type is GCS

DLQ_GCS_BUCKET_NAME

  • Example value: dlq-bucket
  • Type: Required if BLOB storage type is GCS

DLQ_GCS_CREDENTIAL_PATH

  • Example value: /path/for/json/credential
  • Type: Required if BLOB storage type is GCS

DLQ_GCS_RETRY_MAX_ATTEMPTS

  • Example value: 3
  • Type: optional
  • Default value: 10

DLQ_GCS_RETRY_TOTAL_TIMEOUT_MS

  • Example value: 120000
  • Type: optional
  • Default value: 120000

DLQ_GCS_RETRY_INITIAL_DELAY_MS

  • Example value: 1000
  • Type: optional
  • Default value: 1000

DLQ_GCS_RETRY_MAX_DELAY_MS

  • Example value: 30000
  • Type: optional
  • Default value: 30000

DLQ_GCS_RETRY_DELAY_MULTIPLIER

  • Example value: 2
  • Type: optional
  • Default value: 2

DLQ_GCS_RETRY_INITIAL_RPC_TIMEOUT_MS

  • Example value: 5000
  • Type: optional
  • Default value: 5000

DLQ_GCS_RETRY_RPC_TIMEOUT_MULTIPLIER

  • Example value: 1
  • Type: optional
  • Default value: 1

DLQ_GCS_RETRY_RPC_MAX_TIMEOUT_MS

  • Example value: 5000
  • Type: optional
  • Default value: 5000

DLQ_KAFKA_ACKS

  • Example value: all
  • Type: optional
  • Default value: all

DLQ_KAFKA_RETRIES

  • Example value: 3
  • Type: optional
  • Default value: 2147483647

DLQ_KAFKA_BATCH_SIZE

  • Example value: 100
  • Type: optional
  • Default value: 16384

DLQ_KAFKA_LINGER_MS

  • Example value: 5
  • Type: optional
  • Default value: 0

DLQ_KAFKA_BUFFER_MEMORY

  • Example value: 33554432
  • Type: optional
  • Default value: 33554432

DLQ_KAFKA_KEY_SERIALIZER

  • Example value: your.own.class
  • Type: optional
  • Default value: org.apache.kafka.common.serialization.ByteArraySerializer

DLQ_KAFKA_VALUE_SERIALIZER

  • Example value: your.own.class
  • Type: optional
  • Default value: org.apache.kafka.common.serialization.ByteArraySerializer

DLQ_KAFKA_BROKERS

  • Example value: 127.0.0.1:1234
  • Type: required if writer type is kafka

DLQ_KAFKA_TOPIC

  • Example value: your-own-topic
  • Type: optional
  • Default value: firehose-retry-topic

DLQ_S3_REGION

Amazon S3 creates buckets in a Region that you specify.

  • Example value: ap-south-1
  • Type: required

DLQ_S3_BUCKET_NAME

The name of the Amazon S3 bucket. Here is further documentation on S3 bucket names.

  • Example value: sink_bucket
  • Type: required

DLQ_S3_ACCESS_KEY

Access key used to access the bucket. This key can also be set through the AWS_ACCESS_KEY_ID environment variable or by creating a credentials file at ${HOME}/.aws/credentials. Here is further documentation on how to set it through a credentials file or environment variables.

  • Example value: AKIAIOSFODNN7EXAMPLE
  • Type: required

DLQ_S3_SECRET_KEY

Secret key used to access the bucket. This key can also be set through the AWS_SECRET_ACCESS_KEY environment variable or by creating a credentials file at ${HOME}/.aws/credentials. Here is further documentation on how to set it through a credentials file or environment variables.

  • Example value: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
  • Type: required

DLQ_S3_RETRY_MAX_ATTEMPTS

Number of times to retry the S3 upload request when the request fails.

  • Example value: 10
  • Type: optional
  • Default value: 10

DLQ_S3_BASE_DELAY_MS

Initial delay for first retry in milliseconds.

  • Example value: 1000
  • Type: optional
  • Default value: 1000

DLQ_S3_MAX_BACKOFF_MS

Max backoff time for retry in milliseconds.

  • Example value: 30000
  • Type: optional
  • Default value: 30000

DLQ_S3_API_ATTEMPT_TIMEOUT_MS

The amount of time, in milliseconds, to wait for the HTTP request to complete before giving up and timing out.

  • Example value: 10000
  • Type: optional
  • Default value: 10000

DLQ_S3_API_TIMEOUT_MS

The amount of time to allow the client to complete the execution of an API call. This timeout covers the entire client execution except for marshalling. Unit is in milliseconds.

  • Example value: 40000
  • Type: optional
  • Default value: 40000
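
The conditional requirements listed above (for example, DLQ_KAFKA_BROKERS is required only when the writer type is KAFKA) can be checked before deployment. The following is a minimal sketch, not part of Firehose: the function name `missing_dlq_settings` is hypothetical, and treating only the region and bucket name as mandatory for S3 is an assumption, since the access and secret keys can also come from the AWS environment or credentials file.

```python
def missing_dlq_settings(env):
    """Return the DLQ variables that are required but unset in `env`.

    Hypothetical helper for illustration; defaults follow the tables above.
    """
    writer = env.get("DLQ_WRITER_TYPE", "LOG")
    required = []
    if writer == "KAFKA":
        required = ["DLQ_KAFKA_BROKERS"]
    elif writer == "BLOB_STORAGE":
        storage = env.get("DLQ_BLOB_STORAGE_TYPE", "GCS")
        if storage == "GCS":
            required = [
                "DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID",
                "DLQ_GCS_BUCKET_NAME",
                "DLQ_GCS_CREDENTIAL_PATH",
            ]
        elif storage == "S3":
            # Assumption: keys may be supplied via the AWS environment instead.
            required = ["DLQ_S3_REGION", "DLQ_S3_BUCKET_NAME"]
    # An empty string counts as unset.
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    print(missing_dlq_settings({"DLQ_WRITER_TYPE": "BLOB_STORAGE"}))
```

Passing `os.environ` as `env` would check the current process environment in the same way.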
- + \ No newline at end of file diff --git a/advance/errors/index.html b/advance/errors/index.html index 1f18eb9f1..0856fd7b7 100644 --- a/advance/errors/index.html +++ b/advance/errors/index.html @@ -8,14 +8,14 @@ Errors | Firehose - +

Errors

These errors are returned by sinks. You can configure which errors are processed by which decorator. The error types are:

  • DESERIALIZATION_ERROR
  • INVALID_MESSAGE_ERROR
  • UNKNOWN_FIELDS_ERROR
  • SINK_4XX_ERROR
  • SINK_5XX_ERROR
  • SINK_UNKNOWN_ERROR
  • DEFAULT_ERROR
    • If no error is specified

ERROR_TYPES_FOR_FAILING

  • Example value: DEFAULT_ERROR,SINK_UNKNOWN_ERROR
  • Type: optional
  • Default value: DESERIALIZATION_ERROR,INVALID_MESSAGE_ERROR,UNKNOWN_FIELDS_ERROR

ERROR_TYPES_FOR_DLQ

  • Example value: DEFAULT_ERROR,SINK_UNKNOWN_ERROR
  • Type: optional
  • Default value: ``

ERROR_TYPES_FOR_RETRY

  • Example value: DEFAULT_ERROR
  • Type: optional
  • Default value: DEFAULT_ERROR
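
To see which of the three settings would pick up a given error type, the comma-separated values can be parsed into sets. This is a minimal sketch using the defaults listed above; `parse_error_types` and `handlers_for` are hypothetical names, and the sketch deliberately says nothing about the order in which Firehose applies its decorators.

```python
# Defaults copied from the tables above.
DEFAULTS = {
    "ERROR_TYPES_FOR_FAILING": "DESERIALIZATION_ERROR,INVALID_MESSAGE_ERROR,UNKNOWN_FIELDS_ERROR",
    "ERROR_TYPES_FOR_DLQ": "",
    "ERROR_TYPES_FOR_RETRY": "DEFAULT_ERROR",
}


def parse_error_types(value):
    """Split a comma-separated setting into a set of error type names."""
    return {item.strip() for item in value.split(",") if item.strip()}


def handlers_for(error_type, env=None):
    """Return the names of the settings whose list contains `error_type`."""
    settings = {**DEFAULTS, **(env or {})}
    return [
        name for name in DEFAULTS
        if error_type in parse_error_types(settings[name])
    ]


if __name__ == "__main__":
    print(handlers_for("DEFAULT_ERROR"))
    print(handlers_for("SINK_UNKNOWN_ERROR",
                       {"ERROR_TYPES_FOR_DLQ": "DEFAULT_ERROR,SINK_UNKNOWN_ERROR"}))
```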
- + \ No newline at end of file diff --git a/advance/filters/index.html b/advance/filters/index.html index 6b26b0f3a..ec36a8639 100644 --- a/advance/filters/index.html +++ b/advance/filters/index.html @@ -8,13 +8,13 @@ Filters | Firehose - +

Filters

Following variables need to be set to enable JSON/JEXL filters.

FILTER_ENGINE

Defines whether to use JSON Schema-based filters or JEXL-based filters or NO_OP (i.e. no filtering)

  • Example value: JSON
  • Type: optional
  • Default value: NO_OP

FILTER_JSON_ESB_MESSAGE_TYPE

Defines the format type of the input ESB messages, i.e. JSON/Protobuf. This field is required only for JSON filters.

  • Example value: JSON
  • Type: optional

FILTER_SCHEMA_PROTO_CLASS

The fully qualified name of the proto schema so that the key/message in Kafka could be parsed.

  • Example value: com.gojek.esb.driverlocation.DriverLocationLogKey
  • Type: optional

FILTER_DATA_SOURCE

key/message/none, depending on where to apply the filter.

  • Example value: key
  • Type: optional
  • Default value: none

FILTER_JEXL_EXPRESSION

JEXL filter expression

  • Example value: driverLocationLogKey.getVehicleType()=="BIKE"
  • Type: optional

FILTER_JSON_SCHEMA

JSON Schema string containing the filter rules to be applied.

  • Example value: {"properties":{"order_number":{"const":"1253"}}}
  • Type: optional
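
To illustrate what a rule such as the FILTER_JSON_SCHEMA example value expresses, the sketch below evaluates only the `const` keyword under `properties`. It is a simplified stand-in for a JSON Schema validator, not Firehose's filter implementation; `satisfies_const_rules` is a hypothetical name. As in JSON Schema, `properties` constrains a field only when that field is present.

```python
import json


def satisfies_const_rules(schema_json, message):
    """Check a message (a dict) against the `const` rules in `properties`.

    Only the `const` keyword is handled; other keywords are ignored.
    """
    schema = json.loads(schema_json)
    for field, rule in schema.get("properties", {}).items():
        # A field that is absent from the message is not constrained.
        if field in message and "const" in rule:
            if message[field] != rule["const"]:
                return False
    return True


if __name__ == "__main__":
    schema = '{"properties":{"order_number":{"const":"1253"}}}'
    print(satisfies_const_rules(schema, {"order_number": "1253"}))
    print(satisfies_const_rules(schema, {"order_number": "9999"}))
```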
- + \ No newline at end of file diff --git a/advance/generic/index.html b/advance/generic/index.html index 454b706eb..50973e6a5 100644 --- a/advance/generic/index.html +++ b/advance/generic/index.html @@ -8,13 +8,13 @@ Generic | Firehose - +

Generic

All sinks in Firehose require the following variables to be set.

INPUT_SCHEMA_DATA_TYPE

Defines the input message schema.

  • Example value: json
  • Type: optional
  • Default value: protobuf

KAFKA_RECORD_PARSER_MODE

Decides whether to parse key or message (as per your input proto) from incoming data.

  • Example value: message
  • Type: required
  • Default value: message

SINK_TYPE

Defines the Firehose sink type.

  • Example value: log
  • Type: required

INPUT_SCHEMA_PROTO_CLASS

Defines the fully qualified name of the input proto class.

  • Example value: com.tests.TestMessage
  • Type: required

INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING

Defines the mapping of the Proto fields to header/query fields in JSON format.

  • Example value: {"1":"order_number","2":"event_timestamp","3":"driver_id"}
  • Type: optional
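
The mapping value is a JSON object keyed by Proto field index. As a minimal sketch of how such a value reads, the snippet below parses the example above into a dictionary from integer field index to column name. It assumes a flat mapping with string values, and `load_column_mapping` is a hypothetical helper, not a Firehose API.

```python
import json


def load_column_mapping(raw):
    """Parse a flat proto-index-to-column mapping into {int: str}."""
    mapping = json.loads(raw)
    # JSON object keys are always strings, so convert them to field indexes.
    return {int(index): name for index, name in mapping.items()}


if __name__ == "__main__":
    raw = '{"1":"order_number","2":"event_timestamp","3":"driver_id"}'
    print(load_column_mapping(raw))
```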

METRIC_STATSD_HOST

URL of the StatsD host (Telegraf service)

  • Example value: localhost
  • Type: optional
  • Default value: localhost

METRIC_STATSD_PORT

Port of the StatsD host (Telegraf service)

  • Example value: 8125
  • Type: optional
  • Default value: 8125

METRIC_STATSD_TAGS

Global tags for StatsD metrics. Tags must be comma-separated.

  • Example value: team=data-engineering,app=firehose
  • Type: optional

APPLICATION_THREAD_CLEANUP_DELAY

Defines the time duration in milliseconds after which to clean up the thread.

  • Example value: 400
  • Type: optional
  • Default value: 2000

APPLICATION_THREAD_COUNT

Number of parallel threads to run for Firehose.

  • Example value: 2
  • Type: optional
  • Default value: 1

TRACE_JAEGAR_ENABLE

Defines whether to enable Jaeger tracing.

  • Example value: true
  • Type: optional
  • Default value: false

LOG_LEVEL

Defines the log level, i.e. debug/info/error.

  • Example value: debug
  • Type: optional
  • Default value: info

INPUT_SCHEMA_PROTO_ALLOW_UNKNOWN_FIELDS_ENABLE

Proto can have unknown fields as input

  • Example value: true
  • Type: optional
  • Default value: true

Kafka Consumer

SOURCE_KAFKA_BROKERS

Defines the bootstrap server of Kafka brokers to consume from.

  • Example value: localhost:9092
  • Type: required

SOURCE_KAFKA_TOPIC

Defines the list of Kafka topics to consume from.

  • Example value: test-topic
  • Type: required

SOURCE_KAFKA_CONSUMER_CONFIG_MAX_POLL_RECORDS

Defines the batch size of Kafka messages

  • Example value: 705
  • Type: optional
  • Default value: 500

SOURCE_KAFKA_ASYNC_COMMIT_ENABLE

Defines whether to enable async commit for Kafka consumer

  • Example value: false
  • Type: optional
  • Default value: true

SOURCE_KAFKA_CONSUMER_CONFIG_SESSION_TIMEOUT_MS

Defines the duration of session timeout in milliseconds

  • Example value: 700
  • Type: optional
  • Default value: 10000

SOURCE_KAFKA_COMMIT_ONLY_CURRENT_PARTITIONS_ENABLE

Defines whether to commit only current partitions

  • Example value: false
  • Type: optional
  • Default value: true

SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE

Defines whether to enable auto commit for Kafka consumer

  • Example value: 705
  • Type: optional
  • Default value: 500

SOURCE_KAFKA_CONSUMER_GROUP_ID

Defines the Kafka consumer group ID for your Firehose deployment.

  • Example value: sample-group-id
  • Type: required

SOURCE_KAFKA_POLL_TIMEOUT_MS

Defines the duration of poll timeout for Kafka messages in milliseconds

  • Example value: 80000
  • Type: required
  • Default value: 9223372036854775807

SOURCE_KAFKA_CONSUMER_CONFIG_METADATA_MAX_AGE_MS

Defines the maximum age of config metadata in milliseconds

  • Example value: 700
  • Type: optional
  • Default value: 500

SOURCE_KAFKA_CONSUMER_MODE

Mode can be ASYNC or SYNC.

  • Example value: SYNC
  • Type: optional
  • Default value: SYNC

SOURCE_KAFKA_CONSUMER_CONFIG_PARTITION_ASSIGNMENT_STRATEGY

Defines the class of the partition assignor to use for the rebalancing strategy.

  • Example value: org.apache.kafka.clients.consumer.StickyAssignor
  • Type: optional
  • Default value: org.apache.kafka.clients.consumer.CooperativeStickyAssignor

Stencil Client

Stencil, the Protobuf schema registry used by Firehose, needs the following variables to be set for the Stencil client.

SCHEMA_REGISTRY_STENCIL_ENABLE

Defines whether to enable Stencil Schema registry

  • Example value: true
  • Type: optional
  • Default value: false

SCHEMA_REGISTRY_STENCIL_URLS

Defines the URL of the Proto Descriptor set file in the Stencil Server

  • Example value: http://localhost:8000/v1/namespaces/quickstart/descriptors/example/versions/latest
  • Type: optional

SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS

Defines the timeout in milliseconds to fetch the Proto Descriptor set file from the Stencil Server.

  • Example value: 4000
  • Type: optional
  • Default value: 10000

SCHEMA_REGISTRY_STENCIL_FETCH_RETRIES

Defines the number of times to retry to fetch the Proto Descriptor set file from the Stencil Server.

  • Example value: 4
  • Type: optional
  • Default value: 3

SCHEMA_REGISTRY_STENCIL_FETCH_BACKOFF_MIN_MS

Defines the minimum time in milliseconds after which to back off from fetching the Proto Descriptor set file from the Stencil Server.

  • Example value: 70000
  • Type: optional
  • Default value: 60000

SCHEMA_REGISTRY_STENCIL_FETCH_AUTH_BEARER_TOKEN

Defines the token for authentication to connect to Stencil Server

  • Example value: tcDpw34J8d1
  • Type: optional

SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH

Defines whether to enable auto-refresh of Stencil cache.

  • Example value: true
  • Type: optional
  • Default value: false

SCHEMA_REGISTRY_STENCIL_CACHE_TTL_MS

Defines the minimum time in milliseconds after which to refresh the Stencil cache.

  • Example value: 900000
  • Type: optional
  • Default value: 900000
- + \ No newline at end of file diff --git a/advance/retries/index.html b/advance/retries/index.html index 4e549e939..89b72ca7f 100644 --- a/advance/retries/index.html +++ b/advance/retries/index.html @@ -8,13 +8,13 @@ Retries | Firehose - +

Retries

RETRY_EXPONENTIAL_BACKOFF_INITIAL_MS

Initial expiry time in milliseconds for exponential backoff policy.

  • Example value: 10
  • Type: optional
  • Default value: 10

RETRY_EXPONENTIAL_BACKOFF_RATE

Backoff rate for exponential backoff policy.

  • Example value: 2
  • Type: optional
  • Default value: 2

RETRY_EXPONENTIAL_BACKOFF_MAX_MS

Maximum expiry time in milliseconds for exponential backoff policy.

  • Example value: 60000
  • Type: optional
  • Default value: 60000
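
The three backoff settings above combine in the usual exponential way. The following minimal sketch assumes the delay before retry n is the initial value multiplied by the rate n times and capped at the maximum; the exact formula Firehose uses is not stated here, so treat this as an illustration. `backoff_delays_ms` is a hypothetical helper.

```python
def backoff_delays_ms(attempts, initial_ms=10, rate=2, max_ms=60000):
    """Return the wait (in ms) before each of the first `attempts` retries."""
    delays = []
    for attempt in range(attempts):
        # Grow the delay geometrically, then cap it at the configured maximum.
        delays.append(min(initial_ms * rate ** attempt, max_ms))
    return delays


if __name__ == "__main__":
    print(backoff_delays_ms(5))  # [10, 20, 40, 80, 160]
```

With the default values, the cap of 60000 ms is first reached on the fourteenth retry under this assumption.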

RETRY_FAIL_AFTER_MAX_ATTEMPTS_ENABLE

Fail Firehose if the retries exceed the maximum number of attempts.

  • Example value: true
  • Type: optional
  • Default value: false

RETRY_MAX_ATTEMPTS

Max attempts for retries

  • Example value: 3
  • Type: optional
  • Default value: 2147483647
- + \ No newline at end of file diff --git a/advance/sink-pool/index.html b/advance/sink-pool/index.html index 1d9481f31..536a1190c 100644 --- a/advance/sink-pool/index.html +++ b/advance/sink-pool/index.html @@ -8,13 +8,13 @@ Sink Pool | Firehose - +

Sink Pool

SINK_POOL_NUM_THREADS

Number of sinks in the pool to process messages in parallel.

  • Example value: 10
  • Type: optional
  • Default value: 1

SINK_POOL_QUEUE_POLL_TIMEOUT_MS

Poll timeout when the worker queue is full.

  • Example value: 1
  • Type: optional
  • Default value: 1000
- + \ No newline at end of file diff --git a/assets/js/210c775d.08cc36ac.js b/assets/js/210c775d.08cc36ac.js deleted file mode 100644 index 067149fd2..000000000 --- a/assets/js/210c775d.08cc36ac.js +++ /dev/null @@ -1 +0,0 @@ -"use strict";(self.webpackChunkfirehose=self.webpackChunkfirehose||[]).push([[915],{3905:function(e,t,n){n.d(t,{Zo:function(){return c},kt:function(){return m}});var r=n(7294);function i(e,t,n){return t in e?Object.defineProperty(e,t,{value:n,enumerable:!0,configurable:!0,writable:!0}):e[t]=n,e}function a(e,t){var n=Object.keys(e);if(Object.getOwnPropertySymbols){var r=Object.getOwnPropertySymbols(e);t&&(r=r.filter((function(t){return Object.getOwnPropertyDescriptor(e,t).enumerable}))),n.push.apply(n,r)}return n}function l(e){for(var t=1;t=0||(i[n]=e[n]);return i}(e,t);if(Object.getOwnPropertySymbols){var a=Object.getOwnPropertySymbols(e);for(r=0;r=0||Object.prototype.propertyIsEnumerable.call(e,n)&&(i[n]=e[n])}return i}var p=r.createContext({}),s=function(e){var t=r.useContext(p),n=t;return e&&(n="function"==typeof e?e(t):l(l({},t),e)),n},c=function(e){var t=s(e.components);return r.createElement(p.Provider,{value:t},e.children)},u={inlineCode:"code",wrapper:function(e){var t=e.children;return r.createElement(r.Fragment,{},t)}},d=r.forwardRef((function(e,t){var n=e.components,i=e.mdxType,a=e.originalType,p=e.parentName,c=o(e,["components","mdxType","originalType","parentName"]),d=s(n),m=i,k=d["".concat(p,".").concat(m)]||d[m]||u[m]||a;return n?r.createElement(k,l(l({ref:t},c),{},{components:n})):r.createElement(k,l({ref:t},c))}));function m(e,t){var n=arguments,i=t&&t.mdxType;if("string"==typeof e||i){var a=n.length,l=new Array(a);l[0]=d;var o={};for(var p in t)hasOwnProperty.call(t,p)&&(o[p]=t[p]);o.originalType=e,o.mdxType="string"==typeof e?e:i,l[1]=o;for(var 
s=2;sSINK_GRPC_SERVICE_HOST",id:"sink_grpc_service_host",level:3},{value:"SINK_GRPC_SERVICE_PORT",id:"sink_grpc_service_port",level:3},{value:"SINK_GRPC_METHOD_URL",id:"sink_grpc_method_url",level:3},{value:"SINK_GRPC_RESPONSE_SCHEMA_PROTO_CLASS",id:"sink_grpc_response_schema_proto_class",level:3},{value:"SINK_GRPC_ARG_KEEPALIVE_TIME_MS",id:"sink_grpc_arg_keepalive_time_ms",level:3},{value:"SINK_GRPC_ARG_KEEPALIVE_TIMEOUT_MS",id:"sink_grpc_arg_keepalive_timeout_ms",level:3},{value:"SINK_GRPC_ARG_DEADLINE_MS",id:"sink_grpc_arg_deadline_ms",level:3}],d={toc:u};function m(e){var t=e.components,n=(0,i.Z)(e,l);return(0,a.kt)("wrapper",(0,r.Z)({},d,n,{components:t,mdxType:"MDXLayout"}),(0,a.kt)("h1",{id:"grpc"},"GRPC"),(0,a.kt)("p",null,(0,a.kt)("a",{parentName:"p",href:"https://grpc.io/"},"gRPC")," is a modern open source high performance Remote Procedure Call framework that can run in any environment."),(0,a.kt)("p",null,"A GRPC sink Firehose ","(",(0,a.kt)("inlineCode",{parentName:"p"},"SINK_TYPE"),"=",(0,a.kt)("inlineCode",{parentName:"p"},"grpc"),")"," requires the following variables to be set along with Generic ones"),(0,a.kt)("h3",{id:"sink_grpc_service_host"},(0,a.kt)("inlineCode",{parentName:"h3"},"SINK_GRPC_SERVICE_HOST")),(0,a.kt)("p",null,"Defines the host of the GRPC service."),(0,a.kt)("ul",null,(0,a.kt)("li",{parentName:"ul"},"Example value: ",(0,a.kt)("inlineCode",{parentName:"li"},"http://grpc-service.sample.io")),(0,a.kt)("li",{parentName:"ul"},"Type: ",(0,a.kt)("inlineCode",{parentName:"li"},"required"))),(0,a.kt)("h3",{id:"sink_grpc_service_port"},(0,a.kt)("inlineCode",{parentName:"h3"},"SINK_GRPC_SERVICE_PORT")),(0,a.kt)("p",null,"Defines the port of the GRPC service."),(0,a.kt)("ul",null,(0,a.kt)("li",{parentName:"ul"},"Example value: ",(0,a.kt)("inlineCode",{parentName:"li"},"8500")),(0,a.kt)("li",{parentName:"ul"},"Type: 
",(0,a.kt)("inlineCode",{parentName:"li"},"required"))),(0,a.kt)("h3",{id:"sink_grpc_method_url"},(0,a.kt)("inlineCode",{parentName:"h3"},"SINK_GRPC_METHOD_URL")),(0,a.kt)("p",null,"Defines the URL of the GRPC method that needs to be called."),(0,a.kt)("ul",null,(0,a.kt)("li",{parentName:"ul"},"Example value: ",(0,a.kt)("inlineCode",{parentName:"li"},"com.tests.SampleServer/SomeMethod")),(0,a.kt)("li",{parentName:"ul"},"Type: ",(0,a.kt)("inlineCode",{parentName:"li"},"required"))),(0,a.kt)("h3",{id:"sink_grpc_response_schema_proto_class"},(0,a.kt)("inlineCode",{parentName:"h3"},"SINK_GRPC_RESPONSE_SCHEMA_PROTO_CLASS")),(0,a.kt)("p",null,"Defines the Proto which would be the response of the GRPC Method."),(0,a.kt)("ul",null,(0,a.kt)("li",{parentName:"ul"},"Example value: ",(0,a.kt)("inlineCode",{parentName:"li"},"com.tests.SampleGrpcResponse")),(0,a.kt)("li",{parentName:"ul"},"Type: ",(0,a.kt)("inlineCode",{parentName:"li"},"required"))),(0,a.kt)("h3",{id:"sink_grpc_arg_keepalive_time_ms"},(0,a.kt)("inlineCode",{parentName:"h3"},"SINK_GRPC_ARG_KEEPALIVE_TIME_MS")),(0,a.kt)("p",null,"The keepalive ping is a way to check if a channel is currently working by sending HTTP2 pings over the transport. It is sent periodically, and if the ping is not acknowledged by the peer within a certain timeout period, the transport is disconnected. Other keepalive configurations are described ",(0,a.kt)("a",{parentName:"p",href:"https://github.com/grpc/grpc/blob/master/doc/keepalive.md"},"here"),"."),(0,a.kt)("p",null,"Defines the period (in milliseconds) after which a keepalive ping is sent on the transport. 
If smaller than 10000, 10000 will be used instead."),(0,a.kt)("ul",null,(0,a.kt)("li",{parentName:"ul"},"Example value: ",(0,a.kt)("inlineCode",{parentName:"li"},"60000")),(0,a.kt)("li",{parentName:"ul"},"Type: ",(0,a.kt)("inlineCode",{parentName:"li"},"optional")),(0,a.kt)("li",{parentName:"ul"},"Default value: ",(0,a.kt)("inlineCode",{parentName:"li"},"infinite"))),(0,a.kt)("h3",{id:"sink_grpc_arg_keepalive_timeout_ms"},(0,a.kt)("inlineCode",{parentName:"h3"},"SINK_GRPC_ARG_KEEPALIVE_TIMEOUT_MS")),(0,a.kt)("p",null,"Defines the amount of time (in milliseconds) the sender of the keepalive ping waits for an acknowledgement. If it does not receive an acknowledgment within this time, it will close the connection."),(0,a.kt)("ul",null,(0,a.kt)("li",{parentName:"ul"},"Example value: ",(0,a.kt)("inlineCode",{parentName:"li"},"5000")),(0,a.kt)("li",{parentName:"ul"},"Type: ",(0,a.kt)("inlineCode",{parentName:"li"},"optional")),(0,a.kt)("li",{parentName:"ul"},"Default value: ",(0,a.kt)("inlineCode",{parentName:"li"},"20000"))),(0,a.kt)("h3",{id:"sink_grpc_arg_deadline_ms"},(0,a.kt)("inlineCode",{parentName:"h3"},"SINK_GRPC_ARG_DEADLINE_MS")),(0,a.kt)("p",null,"Defines the amount of time (in milliseconds) gRPC clients are willing to wait for an RPC to complete before the RPC is terminated with the error ",(0,a.kt)("a",{parentName:"p",href:"https://grpc.io/docs/guides/deadlines/#:~:text=By%20default%2C%20gRPC%20does%20not,realistic%20deadline%20in%20your%20clients."},"DEADLINE_EXCEEDED")),(0,a.kt)("ul",null,(0,a.kt)("li",{parentName:"ul"},"Example value: ",(0,a.kt)("inlineCode",{parentName:"li"},"1000")),(0,a.kt)("li",{parentName:"ul"},"Type: ",(0,a.kt)("inlineCode",{parentName:"li"},"optional"))))}m.isMDXComponent=!0}}]); \ No newline at end of file diff --git a/assets/js/210c775d.d0195589.js b/assets/js/210c775d.d0195589.js new file mode 100644 index 000000000..9cdb77914 --- /dev/null +++ b/assets/js/210c775d.d0195589.js @@ -0,0 +1 @@ +"use 
strict";(self.webpackChunkfirehose=self.webpackChunkfirehose||[]).push([[915],{3905:function(e,t,n){n.d(t,{Zo:function(){return c},kt:function(){return m}});var i=n(7294);function r(e,t,n){return t in e?Object.defineProperty(e,t,{value:n,enumerable:!0,configurable:!0,writable:!0}):e[t]=n,e}function a(e,t){var n=Object.keys(e);if(Object.getOwnPropertySymbols){var i=Object.getOwnPropertySymbols(e);t&&(i=i.filter((function(t){return Object.getOwnPropertyDescriptor(e,t).enumerable}))),n.push.apply(n,i)}return n}function l(e){for(var t=1;t=0||(r[n]=e[n]);return r}(e,t);if(Object.getOwnPropertySymbols){var a=Object.getOwnPropertySymbols(e);for(i=0;i=0||Object.prototype.propertyIsEnumerable.call(e,n)&&(r[n]=e[n])}return r}var p=i.createContext({}),s=function(e){var t=i.useContext(p),n=t;return e&&(n="function"==typeof e?e(t):l(l({},t),e)),n},c=function(e){var t=s(e.components);return i.createElement(p.Provider,{value:t},e.children)},u={inlineCode:"code",wrapper:function(e){var t=e.children;return i.createElement(i.Fragment,{},t)}},d=i.forwardRef((function(e,t){var n=e.components,r=e.mdxType,a=e.originalType,p=e.parentName,c=o(e,["components","mdxType","originalType","parentName"]),d=s(n),m=r,k=d["".concat(p,".").concat(m)]||d[m]||u[m]||a;return n?i.createElement(k,l(l({ref:t},c),{},{components:n})):i.createElement(k,l({ref:t},c))}));function m(e,t){var n=arguments,r=t&&t.mdxType;if("string"==typeof e||r){var a=n.length,l=new Array(a);l[0]=d;var o={};for(var p in t)hasOwnProperty.call(t,p)&&(o[p]=t[p]);o.originalType=e,o.mdxType="string"==typeof e?e:r,l[1]=o;for(var 
s=2;sSINK_GRPC_SERVICE_HOST",id:"sink_grpc_service_host",level:3},{value:"SINK_GRPC_SERVICE_PORT",id:"sink_grpc_service_port",level:3},{value:"SINK_GRPC_METHOD_URL",id:"sink_grpc_method_url",level:3},{value:"SINK_GRPC_METADATA",id:"sink_grpc_metadata",level:3},{value:"SINK_GRPC_RESPONSE_SCHEMA_PROTO_CLASS",id:"sink_grpc_response_schema_proto_class",level:3},{value:"SINK_GRPC_ARG_KEEPALIVE_TIME_MS",id:"sink_grpc_arg_keepalive_time_ms",level:3},{value:"SINK_GRPC_ARG_KEEPALIVE_TIMEOUT_MS",id:"sink_grpc_arg_keepalive_timeout_ms",level:3},{value:"SINK_GRPC_ARG_DEADLINE_MS",id:"sink_grpc_arg_deadline_ms",level:3}],d={toc:u};function m(e){var t=e.components,n=(0,r.Z)(e,l);return(0,a.kt)("wrapper",(0,i.Z)({},d,n,{components:t,mdxType:"MDXLayout"}),(0,a.kt)("h1",{id:"grpc"},"GRPC"),(0,a.kt)("p",null,(0,a.kt)("a",{parentName:"p",href:"https://grpc.io/"},"gRPC")," is a modern open source high performance Remote Procedure Call framework that can run in any environment."),(0,a.kt)("p",null,"A GRPC sink Firehose ","(",(0,a.kt)("inlineCode",{parentName:"p"},"SINK_TYPE"),"=",(0,a.kt)("inlineCode",{parentName:"p"},"grpc"),")"," requires the following variables to be set along with Generic ones"),(0,a.kt)("h3",{id:"sink_grpc_service_host"},(0,a.kt)("inlineCode",{parentName:"h3"},"SINK_GRPC_SERVICE_HOST")),(0,a.kt)("p",null,"Defines the host of the GRPC service."),(0,a.kt)("ul",null,(0,a.kt)("li",{parentName:"ul"},"Example value: ",(0,a.kt)("inlineCode",{parentName:"li"},"http://grpc-service.sample.io")),(0,a.kt)("li",{parentName:"ul"},"Type: ",(0,a.kt)("inlineCode",{parentName:"li"},"required"))),(0,a.kt)("h3",{id:"sink_grpc_service_port"},(0,a.kt)("inlineCode",{parentName:"h3"},"SINK_GRPC_SERVICE_PORT")),(0,a.kt)("p",null,"Defines the port of the GRPC service."),(0,a.kt)("ul",null,(0,a.kt)("li",{parentName:"ul"},"Example value: ",(0,a.kt)("inlineCode",{parentName:"li"},"8500")),(0,a.kt)("li",{parentName:"ul"},"Type: 
",(0,a.kt)("inlineCode",{parentName:"li"},"required"))),(0,a.kt)("h3",{id:"sink_grpc_method_url"},(0,a.kt)("inlineCode",{parentName:"h3"},"SINK_GRPC_METHOD_URL")),(0,a.kt)("p",null,"Defines the URL of the GRPC method that needs to be called."),(0,a.kt)("ul",null,(0,a.kt)("li",{parentName:"ul"},"Example value: ",(0,a.kt)("inlineCode",{parentName:"li"},"com.tests.SampleServer/SomeMethod")),(0,a.kt)("li",{parentName:"ul"},"Type: ",(0,a.kt)("inlineCode",{parentName:"li"},"required"))),(0,a.kt)("h3",{id:"sink_grpc_metadata"},(0,a.kt)("inlineCode",{parentName:"h3"},"SINK_GRPC_METADATA")),(0,a.kt)("p",null,"Defines the GRPC additional static Metadata that allows clients to provide information to server that is associated with an RPC."),(0,a.kt)("p",null,"Note - final metadata will be generated with merging static metadata and the kafka record header. "),(0,a.kt)("ul",null,(0,a.kt)("li",{parentName:"ul"},"Example value: ",(0,a.kt)("inlineCode",{parentName:"li"},"authorization:token,dlq:true")),(0,a.kt)("li",{parentName:"ul"},"Type: ",(0,a.kt)("inlineCode",{parentName:"li"},"optional"))),(0,a.kt)("h3",{id:"sink_grpc_response_schema_proto_class"},(0,a.kt)("inlineCode",{parentName:"h3"},"SINK_GRPC_RESPONSE_SCHEMA_PROTO_CLASS")),(0,a.kt)("p",null,"Defines the Proto which would be the response of the GRPC Method."),(0,a.kt)("ul",null,(0,a.kt)("li",{parentName:"ul"},"Example value: ",(0,a.kt)("inlineCode",{parentName:"li"},"com.tests.SampleGrpcResponse")),(0,a.kt)("li",{parentName:"ul"},"Type: ",(0,a.kt)("inlineCode",{parentName:"li"},"required"))),(0,a.kt)("h3",{id:"sink_grpc_arg_keepalive_time_ms"},(0,a.kt)("inlineCode",{parentName:"h3"},"SINK_GRPC_ARG_KEEPALIVE_TIME_MS")),(0,a.kt)("p",null,"The keepalive ping is a way to check if a channel is currently working by sending HTTP2 pings over the transport. It is sent periodically, and if the ping is not acknowledged by the peer within a certain timeout period, the transport is disconnected. 
Other keepalive configurations are described ",(0,a.kt)("a",{parentName:"p",href:"https://github.com/grpc/grpc/blob/master/doc/keepalive.md"},"here"),"."),(0,a.kt)("p",null,"Defines the period (in milliseconds) after which a keepalive ping is sent on the transport. If smaller than 10000, 10000 will be used instead."),(0,a.kt)("ul",null,(0,a.kt)("li",{parentName:"ul"},"Example value: ",(0,a.kt)("inlineCode",{parentName:"li"},"60000")),(0,a.kt)("li",{parentName:"ul"},"Type: ",(0,a.kt)("inlineCode",{parentName:"li"},"optional")),(0,a.kt)("li",{parentName:"ul"},"Default value: ",(0,a.kt)("inlineCode",{parentName:"li"},"infinite"))),(0,a.kt)("h3",{id:"sink_grpc_arg_keepalive_timeout_ms"},(0,a.kt)("inlineCode",{parentName:"h3"},"SINK_GRPC_ARG_KEEPALIVE_TIMEOUT_MS")),(0,a.kt)("p",null,"Defines the amount of time (in milliseconds) the sender of the keepalive ping waits for an acknowledgement. If it does not receive an acknowledgment within this time, it will close the connection."),(0,a.kt)("ul",null,(0,a.kt)("li",{parentName:"ul"},"Example value: ",(0,a.kt)("inlineCode",{parentName:"li"},"5000")),(0,a.kt)("li",{parentName:"ul"},"Type: ",(0,a.kt)("inlineCode",{parentName:"li"},"optional")),(0,a.kt)("li",{parentName:"ul"},"Default value: ",(0,a.kt)("inlineCode",{parentName:"li"},"20000"))),(0,a.kt)("h3",{id:"sink_grpc_arg_deadline_ms"},(0,a.kt)("inlineCode",{parentName:"h3"},"SINK_GRPC_ARG_DEADLINE_MS")),(0,a.kt)("p",null,"Defines the amount of time (in milliseconds) gRPC clients are willing to wait for an RPC to complete before the RPC is terminated with the error ",(0,a.kt)("a",{parentName:"p",href:"https://grpc.io/docs/guides/deadlines/#:~:text=By%20default%2C%20gRPC%20does%20not,realistic%20deadline%20in%20your%20clients."},"DEADLINE_EXCEEDED")),(0,a.kt)("ul",null,(0,a.kt)("li",{parentName:"ul"},"Example value: ",(0,a.kt)("inlineCode",{parentName:"li"},"1000")),(0,a.kt)("li",{parentName:"ul"},"Type: 
",(0,a.kt)("inlineCode",{parentName:"li"},"optional"))))}m.isMDXComponent=!0}}]); \ No newline at end of file diff --git a/assets/js/runtime~main.1ce45bbc.js b/assets/js/runtime~main.f175c5b5.js similarity index 98% rename from assets/js/runtime~main.1ce45bbc.js rename to assets/js/runtime~main.f175c5b5.js index 7bc018312..65518d9f8 100644 --- a/assets/js/runtime~main.1ce45bbc.js +++ b/assets/js/runtime~main.f175c5b5.js @@ -1 +1 @@ -!function(){"use strict";var e,f,t,n,r,a={},c={};function o(e){var f=c[e];if(void 0!==f)return f.exports;var t=c[e]={id:e,loaded:!1,exports:{}};return a[e].call(t.exports,t,t.exports,o),t.loaded=!0,t.exports}o.m=a,o.c=c,e=[],o.O=function(f,t,n,r){if(!t){var a=1/0;for(u=0;u=r)&&Object.keys(o.O).every((function(e){return o.O[e](t[d])}))?t.splice(d--,1):(c=!1,r0&&e[u-1][2]>r;u--)e[u]=e[u-1];e[u]=[t,n,r]},o.n=function(e){var f=e&&e.__esModule?function(){return e.default}:function(){return e};return o.d(f,{a:f}),f},t=Object.getPrototypeOf?function(e){return Object.getPrototypeOf(e)}:function(e){return e.__proto__},o.t=function(e,n){if(1&n&&(e=this(e)),8&n)return e;if("object"==typeof e&&e){if(4&n&&e.__esModule)return e;if(16&n&&"function"==typeof e.then)return e}var r=Object.create(null);o.r(r);var a={};f=f||[null,t({}),t([]),t(t)];for(var c=2&n&&e;"object"==typeof c&&!~f.indexOf(c);c=t(c))Object.getOwnPropertyNames(c).forEach((function(f){a[f]=function(){return e[f]}}));return a.default=function(){return e},o.d(r,a),r},o.d=function(e,f){for(var t in f)o.o(f,t)&&!o.o(e,t)&&Object.defineProperty(e,t,{enumerable:!0,get:f[t]})},o.f={},o.e=function(e){return Promise.all(Object.keys(o.f).reduce((function(f,t){return 
o.f[t](e,f),f}),[]))},o.u=function(e){return"assets/js/"+({26:"fb712b3b",53:"935f2afb",75:"0dffb83e",106:"8a495ff9",119:"caaf7770",128:"a09c2993",139:"65608a03",149:"207a9a9a",189:"c8afe516",217:"d9e16301",253:"b6eeaa08",371:"7b339fd1",402:"d1aadf6e",403:"f3275a05",412:"ce7f4e9c",453:"3d917fd3",468:"0993e877",514:"1be78505",520:"86efa356",569:"8f1026b0",576:"ddb54152",586:"2f24fa3e",592:"03831d4f",596:"0a19fd4d",601:"b9784030",605:"28c0e2f6",621:"9bc8abf4",635:"846656e5",641:"7ff2cdfe",643:"fc49bffc",723:"a93b5b84",775:"684972ed",831:"efa77650",839:"44af7337",854:"dab89a64",886:"8a1416ba",902:"87734bb2",909:"a0ca9226",915:"210c775d",918:"17896441",955:"1e214f97",979:"fc1011c6",999:"85b8c529"}[e]||e)+"."+{26:"1b707e5c",53:"6dbf8f7f",75:"3f470d2b",106:"1e135459",119:"fb21365d",128:"e356be8e",139:"d7264251",149:"ae7a56eb",189:"1cf8a08d",217:"d08336b1",253:"e845266d",371:"4eca1d2c",402:"52ef0e04",403:"0a8ee394",412:"11f3233c",453:"b4ae89fb",468:"354d1058",514:"82e067b7",520:"f7791586",569:"892676c8",576:"e977da0c",586:"31c75330",592:"9783f922",596:"cba4d464",601:"ad42da67",605:"157bdb3d",621:"74803fd1",635:"df253800",641:"0fe1465c",643:"491688c1",723:"93902480",775:"aea56cd6",831:"b8b6e14d",839:"acc5afd6",854:"1de41628",886:"71838689",902:"8364d6f0",909:"b4cb0f57",915:"08cc36ac",918:"0e6dd5ea",955:"73b136c0",972:"7d6af76f",979:"fc2b04e9",999:"9a414304"}[e]+".js"},o.miniCssF=function(e){},o.g=function(){if("object"==typeof globalThis)return globalThis;try{return this||new Function("return this")()}catch(e){if("object"==typeof window)return window}}(),o.o=function(e,f){return Object.prototype.hasOwnProperty.call(e,f)},n={},r="firehose:",o.l=function(e,f,t,a){if(n[e])n[e].push(f);else{var c,d;if(void 0!==t)for(var i=document.getElementsByTagName("script"),u=0;u=r)&&Object.keys(o.O).every((function(e){return o.O[e](t[d])}))?t.splice(d--,1):(c=!1,r0&&e[u-1][2]>r;u--)e[u]=e[u-1];e[u]=[t,n,r]},o.n=function(e){var f=e&&e.__esModule?function(){return 
e.default}:function(){return e};return o.d(f,{a:f}),f},t=Object.getPrototypeOf?function(e){return Object.getPrototypeOf(e)}:function(e){return e.__proto__},o.t=function(e,n){if(1&n&&(e=this(e)),8&n)return e;if("object"==typeof e&&e){if(4&n&&e.__esModule)return e;if(16&n&&"function"==typeof e.then)return e}var r=Object.create(null);o.r(r);var a={};f=f||[null,t({}),t([]),t(t)];for(var c=2&n&&e;"object"==typeof c&&!~f.indexOf(c);c=t(c))Object.getOwnPropertyNames(c).forEach((function(f){a[f]=function(){return e[f]}}));return a.default=function(){return e},o.d(r,a),r},o.d=function(e,f){for(var t in f)o.o(f,t)&&!o.o(e,t)&&Object.defineProperty(e,t,{enumerable:!0,get:f[t]})},o.f={},o.e=function(e){return Promise.all(Object.keys(o.f).reduce((function(f,t){return o.f[t](e,f),f}),[]))},o.u=function(e){return"assets/js/"+({26:"fb712b3b",53:"935f2afb",75:"0dffb83e",106:"8a495ff9",119:"caaf7770",128:"a09c2993",139:"65608a03",149:"207a9a9a",189:"c8afe516",217:"d9e16301",253:"b6eeaa08",371:"7b339fd1",402:"d1aadf6e",403:"f3275a05",412:"ce7f4e9c",453:"3d917fd3",468:"0993e877",514:"1be78505",520:"86efa356",569:"8f1026b0",576:"ddb54152",586:"2f24fa3e",592:"03831d4f",596:"0a19fd4d",601:"b9784030",605:"28c0e2f6",621:"9bc8abf4",635:"846656e5",641:"7ff2cdfe",643:"fc49bffc",723:"a93b5b84",775:"684972ed",831:"efa77650",839:"44af7337",854:"dab89a64",886:"8a1416ba",902:"87734bb2",909:"a0ca9226",915:"210c775d",918:"17896441",955:"1e214f97",979:"fc1011c6",999:"85b8c529"}[e]||e)+"."+{26:"1b707e5c",53:"6dbf8f7f",75:"3f470d2b",106:"1e135459",119:"fb21365d",128:"e356be8e",139:"d7264251",149:"ae7a56eb",189:"1cf8a08d",217:"d08336b1",253:"e845266d",371:"4eca1d2c",402:"52ef0e04",403:"0a8ee394",412:"11f3233c",453:"b4ae89fb",468:"354d1058",514:"82e067b7",520:"f7791586",569:"892676c8",576:"e977da0c",586:"31c75330",592:"9783f922",596:"cba4d464",601:"ad42da67",605:"157bdb3d",621:"74803fd1",635:"df253800",641:"0fe1465c",643:"491688c1",723:"93902480",775:"aea56cd6",831:"b8b6e14d",839:"acc5afd6",854:"1de41628"
,886:"71838689",902:"8364d6f0",909:"b4cb0f57",915:"d0195589",918:"0e6dd5ea",955:"73b136c0",972:"7d6af76f",979:"fc2b04e9",999:"9a414304"}[e]+".js"},o.miniCssF=function(e){},o.g=function(){if("object"==typeof globalThis)return globalThis;try{return this||new Function("return this")()}catch(e){if("object"==typeof window)return window}}(),o.o=function(e,f){return Object.prototype.hasOwnProperty.call(e,f)},n={},r="firehose:",o.l=function(e,f,t,a){if(n[e])n[e].push(f);else{var c,d;if(void 0!==t)for(var i=document.getElementsByTagName("script"),u=0;u Architecture | Firehose - +

Architecture

Firehose Architecture

Firehose can run in parallel on multiple threads. Each thread does the following:

  • Get messages from Kafka
  • Filter the messages (optional)
  • Messages are processed by a chain of decorators, each decorator calling the previous one and processing the returned result.
    • Push these messages to sink.
    • Process the messages not pushed to sink by chained decorators.
      1. Fail if the error is configured.
      2. Retry if the error is configured.
      3. Push to DLQ if the error is configured.
      4. Ignore messages.
  • Capture telemetry and success/failure events and send them to Telegraf
  • Repeat the process
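
The decorator chain in the steps above can be sketched as follows. This is a minimal Python illustration, not Firehose's Java implementation: the function names (`with_fail_handler`, `with_retry`, `with_dlq`, `final`), the `base_sink` behaviour and the error labels `"fatal"` and `"transient"` are all hypothetical. Each decorator calls the previous one and processes the failed messages it returns.

```python
from typing import Callable, List, Set, Tuple

# A failed entry is (message, error_type); both are plain strings in this sketch.
Failed = List[Tuple[str, str]]
Pusher = Callable[[List[str]], Failed]


def base_sink(messages: List[str]) -> Failed:
    """Hypothetical sink: messages starting with 'bad' fail with error 'transient'."""
    return [(m, "transient") for m in messages if m.startswith("bad")]


def with_fail_handler(inner: Pusher, fail_on: Set[str]) -> Pusher:
    def push(messages: List[str]) -> Failed:
        failed = inner(messages)
        fatal = [m for m, err in failed if err in fail_on]
        if fatal:
            raise RuntimeError(f"configured fatal error for: {fatal}")
        return failed
    return push


def with_retry(inner: Pusher, retry_on: Set[str], max_attempts: int) -> Pusher:
    def push(messages: List[str]) -> Failed:
        failed = inner(messages)
        for _ in range(max_attempts):
            retryable = [m for m, err in failed if err in retry_on]
            if not retryable:
                break
            others = [(m, err) for m, err in failed if err not in retry_on]
            failed = others + inner(retryable)
        return failed
    return push


def with_dlq(inner: Pusher, dlq_on: Set[str], dlq: List[str]) -> Pusher:
    def push(messages: List[str]) -> Failed:
        failed = inner(messages)
        dlq.extend(m for m, err in failed if err in dlq_on)
        return [(m, err) for m, err in failed if err not in dlq_on]
    return push


def final(inner: Pusher) -> Pusher:
    def push(messages: List[str]) -> Failed:
        inner(messages)  # anything still failing at this point is ignored
        return []
    return push


dlq_store: List[str] = []
chain = final(
    with_dlq(
        with_retry(with_fail_handler(base_sink, {"fatal"}), {"transient"}, 2),
        {"transient"},
        dlq_store,
    )
)
remaining = chain(["ok-1", "bad-2", "ok-3"])
```

In this run `bad-2` keeps failing through both retries, so it ends up in `dlq_store` and nothing is left for the final decorator to ignore.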

System Design

Components

Consumer

  • Firehose supports sync and async consumers.
    • Sync and async consumers differ in their Kafka offset commit strategy.
    • SyncConsumer pulls messages from Kafka, sends them to the sink and commits offsets, all in a single thread. A sink can choose to manage its own offsets by implementing the appropriate methods; in that case the consumer gets the offsets from the sink while committing.
    • AsyncConsumer pulls messages from Kafka, submits a task to the SinkPool to send them, and tries to commit the Kafka offsets of the messages whose tasks have finished.

Filter

  • Here it looks for any filters that are configured while creating Firehose.
  • There can be a filter on either Key or on Message depending on which fields you want to apply filters on.
  • One can choose not to apply any filters and send all the records to the sink.
  • It will apply the provided filter condition on the incoming batch of messages and pass the list of filtered messages to the Sink class for the configured sink type.

Sink

  • Firehose has a dedicated Sink class for each sink type. This Sink receives a list of filtered messages that goes through a well-defined lifecycle.
  • All the existing sink types follow the same contract/lifecycle defined in AbstractSink.java. It consists of two stages:
    • Prepare: Transforms the list of filtered messages into the sink-specific insert/update client requests.
    • Execute: Requests created in the Prepare stage are executed at this step and a list of failed messages is returned (if any) for retry.
  • The underlying implementation of AbstractSink can use the implementations present in depot.
  • If the batch has any failures, Firehose retries pushing the failed messages to the sink.
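
The two-stage lifecycle above can be sketched like this. It is a hedged Python illustration only: `AbstractSinkSketch`, `InMemorySink` and their method names are hypothetical and do not mirror the actual signatures in AbstractSink.java.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple


class AbstractSinkSketch(ABC):
    """Hypothetical stand-in for the prepare/execute contract."""

    def push_messages(self, messages: List[dict]) -> List[dict]:
        self.prepare(messages)   # stage 1: build sink-specific requests
        return self.execute()    # stage 2: run them, return failed messages

    @abstractmethod
    def prepare(self, messages: List[dict]) -> None: ...

    @abstractmethod
    def execute(self) -> List[dict]: ...


class InMemorySink(AbstractSinkSketch):
    """Toy sink that upserts messages into a dict keyed by their 'id' field."""

    def __init__(self) -> None:
        self.store: Dict[int, dict] = {}
        self._requests: List[Tuple[Optional[int], dict]] = []

    def prepare(self, messages: List[dict]) -> None:
        # Turn each message into an (key, payload) upsert request.
        self._requests = [(m.get("id"), m) for m in messages]

    def execute(self) -> List[dict]:
        failed = []
        for key, message in self._requests:
            if key is None:
                failed.append(message)  # cannot upsert without a key
            else:
                self.store[key] = message
        return failed


sink = InMemorySink()
failed = sink.push_messages([{"id": 1, "v": "a"}, {"v": "no-id"}])
```

The list returned by `push_messages` is what a retry step would receive; here it contains only the message without an `id`.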

SinkPool

  • Firehose can have a SinkPool to submit tasks to, based on the configuration. SinkPool is used to process messages asynchronously.
  • A SinkPool is defined by the number of threads and the poll timeout of the worker queue.

Instrumentation

  • Instrumentation is a wrapper around the StatsD client and logging. Its job is to capture important metrics such as latencies, successful/failed message counts, sink response time, etc. for each record that goes through the Firehose ecosystem.

Commit

  1. SyncConsumer:
    1. In the Consumer thread, Firehose commits the offset after the messages are sent successfully.
  2. AsyncConsumer:
    1. In the Consumer thread, Firehose checks whether any of the sink-pool tasks have finished. It marks the offsets of those messages as committable, then commits all the committable offsets.

Message Final State

The final state of a message can be any one of the following after it is consumed from Kafka:

  • Sink
  • DLQ
  • Ignored
  • Filtered

These states can be monitored by plotting the metrics related to messages.

Schema Handling

  • When INPUT_SCHEMA_DATA_TYPE is set to protobuf

    • Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data. Data streams on Kafka topics are bound to a protobuf schema.

    • Firehose deserializes the data consumed from the topics using the Protobuf descriptors generated out of the artifacts. The artifactory is an HTTP interface that Firehose uses to deserialize.

    • Schema handling, i.e. finding the mapped schema for the topic, downloading the descriptors, and being dynamically notified of and updated with the latest schema, is abstracted through the Stencil library.

      Stencil is a proprietary library that provides an abstraction layer for schema handling.

      Schema Caching, dynamic schema updates, etc. are features of the stencil client library.

  • When INPUT_SCHEMA_DATA_TYPE is set to json

    • Currently this config is supported only in the BigQuery sink.
    • For JSON, the BigQuery sink infers the schema dynamically from incoming data. In the future we plan to provide JSON schema support via Stencil.

Firehose Integration

This section details all the systems that integrate with a Firehose deployment. These are external systems that Firehose connects to.

Kafka

  • The Kafka topic(s) where Firehose reads from. The SOURCE_KAFKA_TOPIC config can be set in Firehose.

ProtoDescriptors

  • Generated protobuf descriptors, hosted behind an artifactory/HTTP endpoint. The endpoint URL and the proto that the Firehose deployment should use to deserialize data are configured in Firehose.

Telegraf

  • Telegraf runs as a process beside Firehose to export metrics to InfluxDB. Firehose internally uses statsd, a Java library, to export metrics to Telegraf. It is configured with the StatsD host parameter that Firehose points to.

Sink

  • The storage where Firehose eventually pushes the data. Can be an HTTP/GRPC Endpoint or one of the Databases mentioned in the Architecture section. Sink Type and each sink-specific configuration are relevant to this integration point.

InfluxDB

  • InfluxDB - time-series database where all Firehose metrics are stored. Integration through the Telegraf component.

For a complete set of configurations please refer to the sink-specific configuration.

- + \ No newline at end of file diff --git a/concepts/consumer/index.html b/concepts/consumer/index.html index 54d84885c..ea52b77e2 100644 --- a/concepts/consumer/index.html +++ b/concepts/consumer/index.html @@ -8,7 +8,7 @@ Firehose Consumer | Firehose - + @@ -17,7 +17,7 @@ SOURCE_KAFKA_CONSUMER_MODE can be set as SYNC or ASYNC. SyncConsumer run in one thread on the other hand AsyncConsumer has a SinkPool. SinkPool can be configured by setting SINK_POOL_NUM_THREADS.

FirehoseSyncConsumer

  • Pull messages from kafka in batches.
  • Apply filter based on filter configuration.
  • Add offsets of the messages that did not pass the filter into OffsetManager and set them committable.
  • Call sink.pushMessages() with the filtered messages.
  • Add offsets for the remaining messages and set them committable.
  • Call consumer.commit()
  • Repeat.

FirehoseAsyncConsumer

  • Pull messages from kafka in batches.
  • Apply filter based on filter configuration.
  • Add offsets of the messages that did not pass the filter into OffsetManager and set them committable.
  • Schedule a task on SinkPool for the filtered messages.
  • Add offsets of these messages with the returned Future as the key.
  • Check SinkPool for finished tasks.
  • Set offsets to be committable for any finished future.
  • Call consumer.commit()
  • Repeat.
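
The async loop above can be sketched as follows, using Python's `concurrent.futures` as a stand-in for the SinkPool. This is an illustration under stated assumptions: `sink_push`, the `pending` map and the sample batches are hypothetical, filtering is omitted, and the commit step is reduced to collecting committable offsets.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Dict, List


def sink_push(offsets: List[int]) -> List[int]:
    """Hypothetical sink call; returns the offsets that failed (none here)."""
    return []


committable: List[int] = []
pending: Dict[Future, List[int]] = {}  # returned Future -> offsets of its batch

batches = [[0, 1, 2], [3, 4], [5]]

with ThreadPoolExecutor(max_workers=2) as sink_pool:
    for offsets in batches:
        # Schedule a task on the pool and remember its offsets under the Future.
        future = sink_pool.submit(sink_push, offsets)
        pending[future] = offsets

        # Check the pool for finished tasks and mark their offsets committable.
        for done in [f for f in pending if f.done()]:
            committable.extend(pending.pop(done))

    # Drain whatever is still running before shutting down.
    wait(list(pending))
    for done in list(pending):
        committable.extend(pending.pop(done))

committed = sorted(committable)
```

The consumer thread never blocks on a single task inside the loop; it only picks up futures that are already done, which is why offsets may become committable out of order.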
- + \ No newline at end of file diff --git a/concepts/decorators/index.html b/concepts/decorators/index.html index 39121774d..bfeb349ec 100644 --- a/concepts/decorators/index.html +++ b/concepts/decorators/index.html @@ -8,7 +8,7 @@ Decorators | Firehose - + @@ -24,7 +24,7 @@ The BlobStorageDlqWriter converts each message into json String, and appends multiple messages via new line. These messages are pushed to a blob storage. The object name for messages istopic_name/consumed_timestamp/a-random-uuid.

SinkFinal

This decorator is the black hole for messages. Messages that reach it are ignored and no further processing is done on them.

- + \ No newline at end of file diff --git a/concepts/filters/index.html b/concepts/filters/index.html index 72ed20db9..d5d5e456f 100644 --- a/concepts/filters/index.html +++ b/concepts/filters/index.html @@ -8,13 +8,13 @@ Filters | Firehose - +

Filters

Filtering Messages

When you consume from a topic that collects data from multiple publishers and you care about only a particular subset of that data, you don’t need to consume all of it.

Instead, use this Filter feature provided in Firehose which allows you to apply any filters on the fields present in the key or message of the event data set and helps you narrow it down to your use case-specific data.

However, understand that you are ignoring some data/messages and if that ignored value is important for your sink, you might lose/skip data that is essential. So it is best to verify that your filter condition is not skipping essential data, for example, payment transactions.

Firehose uses the Apache Commons JEXL library for parsing the filter expressions. Refer to the Using Filters section for details on how to configure filters.

JEXL-based Filtering

JEXL (Java EXpressions Language) is an Apache Commons library intended to facilitate the implementation of dynamic and scripting features in applications and frameworks written in Java. Its goal is to expose scripting features usable by technical operatives or consultants working with enterprise platforms.

The API and the expression language exploit Java-beans naming patterns through introspection to expose property getters and setters. It also considers public class fields as properties and allows invoking any accessible method. JEXL-based filters can be applied only to Protobuf messages, not to JSON messages.

To evaluate expressions using JEXL, you need three things:

  • An engine to create expressions
  • A context containing any variables
  • An expression

Read more about Apache Commons JEXL project here.

How JEXL-based Filters Work

The filtering occurs in the following steps:

  • Firehose Consumer creates a Filter object and initializes it with the values of FILTER_DATA_SOURCE (i.e. key/message), FILTER_JEXL_EXPRESSION and FILTER_SCHEMA_PROTO_CLASS, as configured in the environment variables.
  • JexlFilter iterates over the input List of events. For each event, the Protobuf class specified by the environment variable FILTER_SCHEMA_PROTO_CLASS converts the key/message of the event from a raw byte array to a POJO (Plain Old Java Object), which contains getters for accessing the various fields of the event data.
  • A JEXLContext is created to link the key/message proto reference in the JEXL expression with the POJO object generated earlier. The JEXL engine converts the FILTER_JEXL_EXPRESSION string into a JEXLExpression object and replaces all occurrences of references in the string with the generated POJO object. A JEXLException is thrown if the filter expression is invalid.
  • The JEXLExpression is then evaluated for each of these parsed events. The messages for which the JEXLExpression evaluates to true are added to the output List of messages and returned by the Filter.
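
The steps above follow a parse-once, evaluate-per-event pattern, sketched below. This is not JEXL: it uses Python's own `compile` and `eval` purely as a stand-in for the JEXL engine, so the expression uses Python syntax (`and`, not `&&`). The function name `jexl_like_filter`, the `message` reference and the sample events are hypothetical.

```python
from types import SimpleNamespace
from typing import List


def jexl_like_filter(expression: str, events: List[dict], reference: str) -> List[dict]:
    # Analogous to building the expression object: parse the string once
    # and fail early if it is invalid.
    try:
        compiled = compile(expression, "<filter>", "eval")
    except SyntaxError as exc:
        raise ValueError(f"invalid filter expression: {expression!r}") from exc

    kept = []
    for event in events:
        # Analogous to the POJO: an object exposing the event fields as attributes.
        pojo = SimpleNamespace(**event)
        # Analogous to the context: binds the reference name to that object.
        context = {reference: pojo}
        # Keep only events for which the expression evaluates to true.
        if eval(compiled, {"__builtins__": {}}, context) is True:
            kept.append(event)
    return kept


events = [{"city": "jakarta", "fare": 12}, {"city": "bandung", "fare": 30}]
kept = jexl_like_filter(
    "message.city == 'jakarta' and message.fare > 10", events, "message"
)
```

Parsing the expression before the loop means an invalid filter is rejected once, up front, rather than failing on every event.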

JSON-based Filtering

JSON-based filtering uses a JSON Schema string to apply filter rules and validate if the JSON message passes all filter checks. JSON Schema is a vocabulary that allows you to annotate and validate JSON documents. JSON Schema is a JSON media type for defining the structure of JSON data. It provides a contract for what JSON data is required for a given application and how to interact with it.

To filter messages using JSON-based filters, you need two things:

  • A JSON Schema string, containing the filter rules
  • A JSON Schema Validator, to validate the message against the JSON Schema

You can read more about JSON Schema here. For more details on JSON Schema syntax and specifications, refer to this article. The JSON Schema Validator library used in Firehose is networknt/json-schema-validator. JSON-based filters can be applied to both JSON and Protobuf messages.

How JSON-based Filters Work

The filtering occurs in the following steps:

  • JSON filter configurations are validated and logged to firehoseInstrumentation by JsonFilterUtil. If any configuration is invalid, an IllegalArgumentException is thrown and Firehose is terminated.
  • If FILTER_ESB_MESSAGE_FORMAT=PROTOBUF, then the serialized key/message protobuf byte array is deserialized to a POJO by the Proto schema class. It is then converted to a JSON string so that it can be parsed by the JSON Schema Validator.
  • If FILTER_ESB_MESSAGE_FORMAT=JSON, then the serialized JSON byte array is deserialized to a JSON message string.
  • The JSON Schema validator validates the JSON message against the filter rules specified in the JSON Schema string provided in the environment variable FILTER_JSON_SCHEMA.
  • If there are any validation errors, then that key/message is filtered out and the validation errors are logged to the firehoseInstrumentation in debug mode.
  • If all validation checks pass, then the key/message is added to the ArrayList of filtered messages and returned by the JsonFilter.
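
The validate-then-keep flow above can be sketched as follows for the JSON message format. The validator here is a hand-rolled toy that understands only the `const`, `required` and `properties` keywords; it is not the networknt library, and the schema string and messages are made-up examples.

```python
import json
from typing import Any, List


def validate(instance: Any, schema: dict) -> List[str]:
    """Return validation errors for a tiny subset of JSON Schema keywords."""
    errors = []
    if "const" in schema and instance != schema["const"]:
        errors.append(f"expected {schema['const']!r}, got {instance!r}")
    if isinstance(instance, dict):
        for name in schema.get("required", []):
            if name not in instance:
                errors.append(f"missing required property {name!r}")
        for name, sub_schema in schema.get("properties", {}).items():
            if name in instance:
                errors.extend(validate(instance[name], sub_schema))
    return errors


def json_filter(raw_messages: List[bytes], schema_string: str) -> List[dict]:
    schema = json.loads(schema_string)
    kept = []
    for raw in raw_messages:
        # Deserialize the JSON byte array into a message.
        message = json.loads(raw.decode("utf-8"))
        # A message with any validation error is filtered out.
        if not validate(message, schema):
            kept.append(message)
    return kept


schema_string = (
    '{"properties": {"order_number": {"const": "1253"}},'
    ' "required": ["order_number"]}'
)
raw = [b'{"order_number": "1253"}', b'{"order_number": "9"}', b'{"status": "x"}']
kept = json_filter(raw, schema_string)
```

Only the first message satisfies both rules; the second fails the `const` check and the third lacks the required property.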

Why Use Filters

Filters enable you to consume only the subset of incoming messages that fulfils a particular set of criteria, while discarding the other messages. This is helpful in cases such as processing the status of drivers riding a bike, or obtaining data of drivers within a particular city.

Additionally, Filters can also help to significantly decrease consumer lag when the rate of incoming messages is too high, thus providing significant performance improvements for your sink.

If your underlying sink is not able to handle the increased (or default) volume of data being pushed to it, adding a filter condition in Firehose to ignore unnecessary messages in the topic helps bring down the volume of data processed by the sink.

- + \ No newline at end of file diff --git a/concepts/monitoring/index.html b/concepts/monitoring/index.html index ed0d25090..6d330e542 100644 --- a/concepts/monitoring/index.html +++ b/concepts/monitoring/index.html @@ -8,13 +8,13 @@ Monitoring | Firehose - +

Monitoring

Firehose provides a detailed health dashboard (Grafana) for effortless monitoring. Always know what’s going on with your deployment with built-in monitoring of throughput, response times, errors and more.

The Firehose Grafana dashboard provides detailed visualization of all Firehose metrics. For further details on each metric, please refer to the Metrics section.

Flow of Metrics from Firehose to Grafana

StatsD Client

StatsD is a simple protocol for sending application metrics via UDP. StatsD is simple and has a tiny footprint. It can’t crash your application and has become the standard for large-scale metric collection. Firehose uses the StatsD client library to send metrics to the Telegraf StatsD host.
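
As a rough illustration of the protocol, the sketch below builds one StatsD counter line and sends it as a single UDP datagram. The metric name `firehose_messages_total` and the tag are made-up examples, the `|#key:value` tag suffix assumes a DogStatsD-style extension on the receiving side, and Firehose itself uses a Java StatsD client rather than code like this.

```python
import socket
from typing import Dict


def format_counter(name: str, value: int, tags: Dict[str, str]) -> bytes:
    """Build a StatsD counter line: 'name:value|c', plus optional '|#k:v,...' tags."""
    line = f"{name}:{value}|c"
    tag_part = ",".join(f"{key}:{val}" for key, val in sorted(tags.items()))
    if tag_part:
        line += f"|#{tag_part}"
    return line.encode("ascii")


def send_metric(payload: bytes, host: str = "127.0.0.1", port: int = 8125) -> None:
    """Fire-and-forget: one UDP datagram, no response is awaited."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(payload, (host, port))


payload = format_counter("firehose_messages_total", 1, {"success": "true"})
```

Calling `send_metric(payload)` would emit the datagram to a StatsD listener on port 8125. Because UDP needs no connection or acknowledgement, the sender does not wait on the metrics backend.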

Telegraf

Telegraf is an agent written in Go that accepts StatsD protocol metrics over UDP. It works as a metrics aggregator and periodically flushes the metrics to the InfluxDB database. Telegraf is deployed in a container alongside the Firehose container in the same Kubernetes pod.

InfluxDB

InfluxDB is a time-series database where all Firehose metrics are stored. It accepts metrics flushed from the Telegraf agent. InfluxDB stores the metrics along with the corresponding metric tags, e.g. SUCCESS_TAG, FAILURE_TAG. InfluxDB is deployed on a separate Kubernetes cluster.

Grafana

Grafana is a multi-platform open-source analytics and interactive visualization web app. It pulls metrics data from the InfluxDB database and provides detailed visualization of all Firehose metrics in real-time. Metrics visualization can be obtained for all or specific pods.

Setting Up Grafana with Firehose

Set Up Grafana service

Create a Grafana Cloud account to set up the Grafana metrics dashboard on the cloud, or download Grafana to set up the dashboard locally. Grafana can also be deployed on Docker by pulling the Grafana image from DockerHub.

$ docker pull grafana/grafana

The Grafana server runs on localhost:3000 by default. Make sure to add your InfluxDB server as the data source in the Grafana Data Sources section.

If you are using InfluxDB version 1.8 or earlier, the Grafana dashboard can be loaded on your Grafana Cloud account or local Grafana server by importing the JSON file firehose-grafana-dashboard-updated.json from the docs/assets/ directory of Firehose.

Make sure to select InfluxQL as the query language while configuring the InfluxDB as the Grafana data source, since the InfluxDB queries for the Firehose metrics in the firehose-grafana-dashboard-updated.json currently support only InfluxDB 1.x. Then provide the InfluxDB login credentials and other parameters like organization name and bucket name.

Set Up InfluxDB Server

Follow this guide to set up and link your InfluxDB database with Grafana. InfluxDB can be installed locally or set up on the cloud via InfluxDB Cloud. For a local setup, the InfluxDB installer can be downloaded from here. InfluxDB can also be deployed on Docker by pulling the InfluxDB image from DockerHub. The following command pulls the latest InfluxDB 2.x version.

$ docker pull influxdb

For compatibility with the sample Firehose Grafana dashboard (which currently supports only the InfluxQL query language), you must download and install InfluxDB 1.x. To deploy InfluxDB 1.x on Docker, run the following to pull the image from Docker Hub.

$ docker pull influxdb:1.8

Make sure to add the InfluxDB server URLs, port and login credentials to the telegraf.conf file. By default, the InfluxDB server is hosted on localhost:8086. Create a new bucket for the Firehose metrics and configure the bucket name in the telegraf.conf file, as well as in the InfluxDB source properties in the Grafana Data Sources section.

The Telegraf output plugin configuration for InfluxDB v2+ can be found here. The Telegraf output plugin configuration for InfluxDB v1.x can be found here.

Set Up Telegraf Server

Lastly, set up Telegraf to send metrics to InfluxDB, following the corresponding instructions according to your Firehose deployment -

Firehose in Docker or deployed locally

  1. Follow this guide to install and set up Telegraf as the StatsD host. For a sample Telegraf configuration file suitable for Firehose, refer to the telegraf.conf file in docs/assets/.
  2. Configure the Firehose environment variables METRIC_STATSD_HOST and METRIC_STATSD_PORT to the IP address and port on which Telegraf is listening. By default, METRIC_STATSD_HOST is set to localhost and METRIC_STATSD_PORT is set to the default listener port of Telegraf, i.e. 8125.
  3. In telegraf.conf, under INPUT PLUGINS, set datadog_extensions = true and datadog_distributions = true in the StatsD plugin configuration.
  4. Configure the URL and port of the InfluxDB server in the Telegraf configuration file, i.e. ~/.telegraf/telegraf.conf.

Firehose deployed on Kubernetes

  1. Follow this guide for deploying Firehose on a Kubernetes cluster using a Helm chart.
  2. Configure the following parameters in the default values.yaml file and run -
$ helm install my-release -f values.yaml goto/firehose
Key | Type | Default | Description
telegraf.config.output.influxdb.database | string | "test-db" | db name for telegraf influxdb output
telegraf.config.output.influxdb.enabled | bool | false | flag for enabling telegraf influxdb output
telegraf.config.output.influxdb.retention_policy | string | "autogen" | retention policy for telegraf influxdb output
telegraf.config.output.influxdb.urls | list | ["http://localhost:8086"] | influxdb urls for telegraf output
telegraf.enabled | bool | false | flag for enabling telegraf

Grafana Dashboard

Grafana is a multi-platform open-source analytics and interactive visualization web application. It provides charts, graphs, and alerts for the web when connected to supported data sources. A licensed Grafana Enterprise version with additional capabilities is also available as a self-hosted installation or an account on the Grafana Labs cloud service.

Grafana dashboard can be loaded on your Grafana cloud account by importing the JSON file firehose-grafana-dashboard-updated.json in the docs/assets/ directory.

Firehose Grafana dashboard

Load a Firehose dashboard by configuring the following parameters -

  • Data Source - name of InfluxDB database for metrics
  • Prometheus Data Source - Prometheus cluster name if any
  • Organization - name of the InfluxDB organization
  • Firehose Name - the Firehose Kafka Consumer Group ID
  • Pod - specify a particular Kubernetes pod ID or select All to track all pods.

Features of Grafana dashboard

  • Metrics of every pod at any point in time can be obtained by hovering over that time coordinate.
  • Each pod has its own color code in the visualization graph to distinguish it from other pods.
  • Minimum, maximum, and average values of the metrics are displayed for every pod.

Dashboard Sections

Type Details

Overview

Pods Health

Kafka Consumer Details

Error

Memory

Garbage Collection

Retry

HTTP Sink

Filter

- + \ No newline at end of file diff --git a/concepts/offsets/index.html b/concepts/offsets/index.html index b4f931f46..cdc743038 100644 --- a/concepts/offsets/index.html +++ b/concepts/offsets/index.html @@ -8,7 +8,7 @@ Offset manager | Firehose - + @@ -19,7 +19,7 @@ Each Topic-Partition has a sorted list by offsets. The OffsetNode is added into this sorted list. OffsetNode is also added into the map keyed by provided key.

Setting a batch to be Committable.

setCommittable(Object batch) sets the flag isCommittable to true on each OffsetNode in the batch. It also removes the batch from the map toBeCommittableBatchOffsets.

Getting Committable offsets

getCommittableOffset()

  • For each topic-partition:
    • Look for the contiguous offsets in the sorted list which are set to be committed.
    • Return the largest offset from the contiguous series.
    • Delete smaller OffsetNodes from the sorted list.
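
The per-partition step above can be sketched as follows. This is a Python illustration, not the Java OffsetManager: the class and function names are hypothetical, and "contiguous" is assumed to mean an unbroken run of committable nodes starting at the head of the sorted list.

```python
from typing import List, Optional


class OffsetNode:
    def __init__(self, offset: int) -> None:
        self.offset = offset
        self.is_committable = False


def get_committable_offset(nodes: List[OffsetNode]) -> Optional[int]:
    """nodes: list for one topic-partition, sorted by offset. Mutated in place."""
    last = -1
    for index, node in enumerate(nodes):
        if not node.is_committable:
            break  # the contiguous committable run ends here
        last = index
    if last < 0:
        return None  # nothing committable at the head of the list
    largest = nodes[last].offset
    del nodes[:last]  # drop the smaller nodes, keep the largest as the new head
    return largest


nodes = [OffsetNode(o) for o in (5, 6, 7, 8)]
for node in nodes:
    if node.offset in (5, 6, 8):
        node.is_committable = True

first = get_committable_offset(nodes)      # 7 is not done yet, so 8 must wait
nodes[1].is_committable = True             # offset 7 finishes later
second = get_committable_offset(nodes)
remaining = [node.offset for node in nodes]
```

Offset 8 is held back until 7 is committable, so an offset is never committed past a message that is still being processed.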
- + \ No newline at end of file diff --git a/concepts/overview/index.html b/concepts/overview/index.html index cfe6de701..d1581d3e8 100644 --- a/concepts/overview/index.html +++ b/concepts/overview/index.html @@ -8,7 +8,7 @@ Overview | Firehose - + @@ -21,7 +21,7 @@ message of the event data set and helps you narrow it down to your use case-specific data.

Templating

Firehose provides various templating features.

Decorators

Decorators are used for chained processing of messages.

  • SinkWithFailHandler
  • SinkWithRetry
  • SinkWithDlq
  • SinkFinal

FirehoseConsumer

A Firehose consumer reads messages from Kafka, pushes those messages to the sink and commits offsets back to Kafka based on certain strategies.

Offsets

Offset manager is a data structure used to manage offsets asynchronously. An offset should be committed only when a message is fully processed. Offset manager maintains the state of all the offsets, across all topic-partitions, that can be committed. It can also be used by sinks to manage their own offsets.

- + \ No newline at end of file diff --git a/concepts/templating/index.html b/concepts/templating/index.html index 3fd8b000a..578b75111 100644 --- a/concepts/templating/index.html +++ b/concepts/templating/index.html @@ -8,13 +8,13 @@ Templating | Firehose - +

Templating

Firehose HTTP sink supports payload templating using the SINK_HTTP_JSON_BODY_TEMPLATE configuration. Templates are written with JsonPath, a DSL for basic JSON parsing. A playground is available at https://jsonpath.com/, where users can experiment with a given JSON document to extract the required elements and validate the JsonPath. The template works only when the output data format SINK_HTTP_DATA_FORMAT is JSON.

Creating Templates:

Creating a template is simple. Find the paths you need to extract using JsonPath. Create a valid JSON template with the static field names plus the paths that need to be extracted (path names start with $.). Firehose replaces each path with the actual data found at that path in the message. Paths can also be used as keys, but the element referenced by a key must be of string data type.

One sample configuration (on XYZ proto): {"test":"$.routes[0]", "$.order_number" : "xxx"}. If you want to dump the entire JSON as it is in the backend, use "$._all_" as a path.
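
The substitution described above can be sketched with the sample template. This is a simplified Python illustration: `resolve` handles only dotted names, numeric indexes and the `$._all_` path rather than full JsonPath, and the message contents are made-up values.

```python
import json
import re
from typing import Any

# Matches ".name" or "[index]" segments of a simple path such as "$.routes[0]".
_SEGMENT = re.compile(r"\.([A-Za-z_][A-Za-z0-9_]*)|\[(\d+)\]")


def resolve(path: str, message: dict) -> Any:
    if path == "$._all_":
        return message  # dump the entire message as it is
    current: Any = message
    for name, index in _SEGMENT.findall(path[1:]):
        current = current[name] if name else current[int(index)]
    return current


def render(template: Any, message: dict) -> Any:
    # Strings that start with "$." are paths; everything else is static.
    if isinstance(template, str) and template.startswith("$."):
        return resolve(template, message)
    if isinstance(template, dict):
        return {str(render(k, message)): render(v, message) for k, v in template.items()}
    if isinstance(template, list):
        return [render(item, message) for item in template]
    return template


template = json.loads('{"test": "$.routes[0]", "$.order_number": "xxx"}')
message = {"order_number": "ORD-1", "routes": ["r-a", "r-b"]}
body = render(template, message)
```

The path used as a key is replaced by the value of `order_number`, which is why that element needs to be a string.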

Limitations:

  • Works when the input data type is protobuf, not JSON.
  • Supported only on messages, not on keys.
  • Validation is done only at the level of the JSON template. After the data has been replaced, the resulting string may or may not be valid JSON, so users must do proper testing/validation on the service side.
  • When selecting fields from complex data types such as repeated, message, or map fields of the proto, the user must filter first, because selecting a field that does not exist fails.
- + \ No newline at end of file diff --git a/contribute/contribution/index.html b/contribute/contribution/index.html index a2697b32e..ec2d09689 100644 --- a/contribute/contribution/index.html +++ b/contribute/contribution/index.html @@ -8,13 +8,13 @@ Contribution Process | Firehose - +

Contribution Process

The following is a set of guidelines for contributing to Firehose. These are mostly guidelines, not rules. Use your best judgment, and feel free to propose changes to this document in a pull request. Here are some important resources:

  • The Concepts section explains the Firehose architecture,
  • Our roadmap is the 10k foot view of where we're going, and
  • GitHub issues track the ongoing and reported issues.

Development of Firehose happens in the open on GitHub, and we are grateful to the community for contributing bug fixes and improvements. Read below to learn how you can take part in improving Firehose.

What to contribute

Your contribution might make it easier for other people to use this product. Better usability often means a bigger user base, which results in more contributors, which in turn can lead to higher-quality software in the long run.

You don’t have to be a developer to make a contribution. We also need technical writers to improve our documentation and designers to make our interface more intuitive and attractive. In fact, we are actively looking for contributors who have these skill sets.

The following parts are open for contribution:

  • Adding new functionality
  • Improving existing functionality
  • Adding a new sink
  • Improving an existing sink
  • Providing suggestions to make the user experience better
  • Providing suggestions to improve the documentation

To help you get your feet wet and get you familiar with our contribution process, we have a list of good first issues that contain bugs that have a relatively limited scope. This is a great place to get started.

How can I contribute?

We use RFCs and GitHub issues to communicate ideas.

  • You can report a bug, suggest a feature enhancement, or just ask questions. Reach out on GitHub Discussions for this purpose.
  • You are also welcome to add a new common sink in depot, improve monitoring and logging and improve code quality.
  • You can help with documenting new features or improve existing documentation.
  • You can also review and accept other contributions if you are a maintainer.

Please submit a PR to the main branch of the Firehose repository once you are ready to submit your contribution. Code submission to Firehose (including a submission from project maintainers) requires review and approval from maintainers or code owners. PRs that are submitted by the general public need to pass the build. Once the build is passed community members will help to review the pull request.

Becoming a maintainer

We are always interested in adding new maintainers. What we look for is a series of contributions, good taste, and an ongoing interest in the project.

  • Maintainers will have write access to the Firehose repositories.
  • There is no strict protocol for becoming a maintainer or PMC member. Candidates for new maintainers are typically people that are active contributors and community members.
  • Candidates for new maintainers can also be suggested by current maintainers or PMC members.
  • If you would like to become a maintainer, you should start contributing to Firehose in any of the ways mentioned. You might also want to talk to other maintainers and ask for their advice and guidance.

Guidelines

Please follow these practices for your change to get merged fast and smoothly:

  • Contributions can only be accepted if they contain appropriate testing (Unit and Integration Tests).
  • If you are introducing a completely new feature or making any major changes to an existing one, we recommend starting with an RFC and getting consensus on the basic design first.
  • Make sure your local build is running with all the tests and checkstyle passing.
  • If your change is related to user-facing protocols/configurations, you need to make the corresponding change in the documentation as well.
  • Docs live in the code repo under docs, so documentation changes can be made in the same PR as the code changes.
- + \ No newline at end of file diff --git a/contribute/development/index.html b/contribute/development/index.html index 81ab1e130..1f92e42ef 100644 --- a/contribute/development/index.html +++ b/contribute/development/index.html @@ -8,14 +8,14 @@ Development Guide | Firehose - +

Development Guide

The following guide will help you quickly run Firehose on your local machine. The main components of Firehose are:

  • Consumer: Handles data consumption from Kafka.
  • Sink: Package which handles sinking data.
  • Metrics: Handles the metrics via the StatsD client.

Requirements

Development environment

Java SE Development Kit 8 is required to build, test and run Firehose service. Oracle JDK 8 can be downloaded from here. Extract the tarball to your preferred installation directory and configure your PATH environment variable to point to the bin sub-directory in the JDK 8 installation directory. For example -

export PATH=~/Downloads/jdk1.8.0_291/bin:$PATH

Environment Variables

Firehose environment variables can be configured in either of the following ways -

  • append a new line at the end of env/local.properties file. Variables declared in local.properties file are automatically added to the environment during runtime.
  • run export SAMPLE_VARIABLE=287 on a UNIX shell, to directly assign the required environment variable.

Kafka Server

An Apache Kafka server must be set up for Firehose's Kafka consumer to pull messages from. Firehose currently supports Kafka Server versions greater than 2.4. The Kafka Server URL and port, as well as other Kafka-specific parameters, must be configured in the corresponding environment variables as defined in the Generic configuration section.

Read the official guide on how to install and configure Apache Kafka Server.

Destination Sink Server

The sink to which Firehose will stream Kafka's data must have its corresponding server set up and configured. The URL and port of the database server or HTTP/GRPC endpoint, along with other sink-specific parameters, must be configured in the environment variables corresponding to that particular sink.

Configuration parameter variables of each sink can be found in the Configurations section.

Schema Registry

When INPUT_SCHEMA_DATA_TYPE is set to protobuf, Firehose uses Stencil Server as its Schema Registry for hosting Protobuf descriptors. The environment variable SCHEMA_REGISTRY_STENCIL_ENABLE must be set to true. The Stencil server URL must be specified in the variable SCHEMA_REGISTRY_STENCIL_URLS. The Proto Descriptor Set file of the Kafka messages must be uploaded to the Stencil server.

Refer to this guide on how to set up and configure the Stencil server, and how to generate and upload the Proto descriptor set file to the server.

Monitoring

Firehose sends critical metrics via a StatsD client. Refer to the Monitoring section for details on how to set up Firehose with Grafana. Alternatively, you can set up any other visualization platform for monitoring Firehose. The typical requirements are -

  • StatsD host (e.g. Telegraf) for aggregation of metrics from Firehose StatsD client
  • A time-series database (e.g. InfluxDB) to store the metrics
  • GUI visualization dashboard (e.g. Grafana) for detailed visualisation of metrics

Running locally

  • The following guide provides a simple way to run Firehose with a log sink locally.
  • It uses the TestMessage (src/test/proto/TestMessage.proto) proto schema, which has already been provided for testing purposes.
# Clone the repo
$ git clone https://github.com/goto/firehose.git

# Build the jar
$ ./gradlew clean build

# Configure env variables
$ cat env/local.properties

Configure env/local.properties

Set the generic variables in the local.properties file.

KAFKA_RECORD_PARSER_MODE = message
SINK_TYPE = log
INPUT_SCHEMA_DATA_TYPE=protobuf
INPUT_SCHEMA_PROTO_CLASS = com.gotocompany.firehose.consumer.TestMessage

Set the variables which specify the kafka server, topic name, and group-id of the kafka consumer - the standard values are used here.

SOURCE_KAFKA_BROKERS = localhost:9092
SOURCE_KAFKA_TOPIC = test-topic
SOURCE_KAFKA_CONSUMER_GROUP_ID = sample-group-id
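
The snippets above mix `KEY = value` and `KEY=value`. Assuming the file is read with standard java.util.Properties semantics (an assumption; the build may load it differently), both spellings produce the same key and value. A minimal sketch with a hypothetical class name:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class PropertiesSpacing {
    // Parses properties text using the standard java.util.Properties rules.
    static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    public static void main(String[] args) throws IOException {
        // One entry written with spaces around '=', one without.
        Properties props = parse("SINK_TYPE = log\nINPUT_SCHEMA_DATA_TYPE=protobuf\n");
        System.out.println(props.getProperty("SINK_TYPE"));              // log
        System.out.println(props.getProperty("INPUT_SCHEMA_DATA_TYPE")); // protobuf
    }
}
```

Whitespace around the separator is dropped, so neither value carries a leading space.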

Stencil Workaround

Firehose uses Stencil as the schema-registry which enables dynamic proto schemas. For the sake of this quick-setup guide, we can work our way around Stencil setup by setting up a simple local HTTP server which can provide the static descriptor for TestMessage schema.

  • Install a server service - like this one.

  • Generate the descriptor for TestMessage by running the command on terminal -

./gradlew generateTestProto
  • The above should generate a file (src/test/resources/__files/descriptors.bin), move this to a new folder at a separate location, and start the HTTP-server there so that this file can be fetched at the runtime.
  • If you are using this, use this command after moving the file to start server at the default port number 8080.
http-server
  • Because we are not using the schema-registry in the default mode, the following lines should also be added in env/local.properties to specify the new location to fetch descriptor from.
SCHEMA_REGISTRY_STENCIL_ENABLE = true
SCHEMA_REGISTRY_STENCIL_URLS = http://localhost:8080/descriptors.bin
SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH = false
SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY = LONG_POLLING
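
If you would rather not install a separate server tool, the JDK's built-in com.sun.net.httpserver package can serve the descriptor file. The following is only a sketch: the class name and the default file location are hypothetical, and it serves a single path with no caching or error handling.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class DescriptorServer {
    // Starts a server that answers requests for /descriptors.bin with the file's bytes.
    static HttpServer start(Path descriptorFile, int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/descriptors.bin", exchange -> {
            byte[] body = Files.readAllBytes(descriptorFile);
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default: descriptors.bin in the current directory.
        Path file = Paths.get(args.length > 0 ? args[0] : "descriptors.bin");
        start(file, 8080);
        System.out.println("Serving " + file + " at http://localhost:8080/descriptors.bin");
    }
}
```

Run it from the folder that holds descriptors.bin, or pass the file path as the first argument.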

Run Firehose Log Sink

  • Make sure that your Kafka server and the local HTTP server containing the descriptor are up and running.
  • Run the firehose consumer through the gradlew task:
./gradlew runConsumer

Note: Sample configuration for other sinks along with some advanced configurations can be found here

Running tests

# Running unit tests
$ ./gradlew test

# Run code quality checks
$ ./gradlew checkstyleMain checkstyleTest

# Cleaning the build
$ ./gradlew clean

Style Guide

Java

We conform to the Google Java Style Guide. Maven can helpfully take care of that for you before you commit:

Making a pull request

Incorporating upstream changes from master

Our preference is the use of git rebase instead of git merge.

Signing commits

# Include -s flag to signoff
$ git commit -s -m "feat: my first commit"

Good practices to keep in mind

  • Follow the conventional commit format for all commit messages.
  • Fill in the description based on the default template configured when you first open the PR
  • Include kind label when opening the PR
  • Add WIP: to PR name if more work needs to be done prior to review
  • Avoid force-pushing as it makes reviewing difficult
- + \ No newline at end of file diff --git a/guides/create_firehose/index.html b/guides/create_firehose/index.html index 628205ba2..79048f8fb 100644 --- a/guides/create_firehose/index.html +++ b/guides/create_firehose/index.html @@ -8,13 +8,13 @@ Creating Firehose | Firehose - +

Creating Firehose

This page contains how-to guides for creating Firehose with different sinks along with their features.

Create a Log Sink

Firehose provides a log sink to make it easy to consume messages in standard output. A log sink Firehose requires the following variables to be set. The log sink can work in key as well as message parsing mode, configured through KAFKA_RECORD_PARSER_MODE.

An example log sink configuration:

SOURCE_KAFKA_BROKERS=localhost:9092
SOURCE_KAFKA_TOPIC=test-topic
SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id
KAFKA_RECORD_PARSER_MODE=message
SINK_TYPE=log
INPUT_SCHEMA_DATA_TYPE=protobuf
INPUT_SCHEMA_PROTO_CLASS=com.tests.TestMessage

Sample output of a Firehose log sink:

2021-03-29T08:43:05,998Z [pool-2-thread-1] INFO  com.gotocompany.firehose.Consumer- Execution successful for 1 records
2021-03-29T08:43:06,246Z [pool-2-thread-1] INFO com.gotocompany.firehose.Consumer - Pulled 1 messages
2021-03-29T08:43:06,246Z [pool-2-thread-1] INFO com.gotocompany.firehose.sink.log.LogSink -
================= DATA =======================
sample_field: 81179979
sample_field_2: 9897987987
event_timestamp {
seconds: 1617007385
nanos: 964581040
}

Define generic configurations

  • These are the configurations that remain common across all the Sink Types.
  • You don’t necessarily need to modify them. It is recommended to use them with the default values. More details here.

Create an HTTP Sink

Firehose HTTP sink allows users to read data from Kafka and write to an HTTP endpoint. It requires the following variables to be set. You need to create your own HTTP endpoint so that Firehose can send data to it.

Supported methods

Firehose supports PUT and POST verbs in its HTTP sink. The method can be configured using SINK_HTTP_REQUEST_METHOD.

Authentication

Firehose HTTP sink supports OAuth authentication. OAuth can be enabled for the HTTP sink by setting SINK_HTTP_OAUTH2_ENABLE to true.

SINK_HTTP_OAUTH2_ACCESS_TOKEN_URL: https://sample-oauth.my-api.com/oauth2/token  # OAuth2 Token Endpoint.
SINK_HTTP_OAUTH2_CLIENT_NAME: client-name # OAuth2 identifier issued to the client.
SINK_HTTP_OAUTH2_CLIENT_SECRET: client-secret # OAuth2 secret issued for the client.
SINK_HTTP_OAUTH2_SCOPE: User:read, sys:info # Space-delimited scope overrides.

Retries

Firehose can retry sinking messages when the HTTP service fails. The HTTP status code ranges to retry on can be configured with SINK_HTTP_RETRY_STATUS_CODE_RANGES. The HTTP request timeout can be configured with SINK_HTTP_REQUEST_TIMEOUT_MS.
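
To illustrate how a range-based retry decision could work, here is a small sketch. It assumes the ranges are written as comma-separated, inclusive `low-high` pairs such as `400-600`; that format, the class name, and the method name are assumptions for illustration and are not taken from the Firehose code.

```java
public class RetryStatusRanges {
    // Returns true when the status code falls inside any configured range.
    // Assumed format: "low-high[,low-high...]" with inclusive bounds.
    static boolean shouldRetry(String ranges, int status) {
        for (String range : ranges.split(",")) {
            String[] bounds = range.trim().split("-");
            int low = Integer.parseInt(bounds[0].trim());
            int high = Integer.parseInt(bounds[1].trim());
            if (status >= low && status <= high) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry("400-600", 503)); // true
        System.out.println(shouldRetry("400-600", 200)); // false
    }
}
```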

Templating

Firehose HTTP sink supports payload templating using the SINK_HTTP_JSON_BODY_TEMPLATE configuration. It uses JsonPath, a DSL for basic JSON parsing, for creating templates. A playground is available at https://jsonpath.com/, where users can experiment with a given JSON to extract the required elements and validate the JsonPath. The template works only when the output data format SINK_HTTP_DATA_FORMAT is JSON.

Creating Templates:

Creating a template is simple. Find the paths you need to extract using JsonPath, then create a valid JSON template with the static field names plus the paths that need to be extracted (path names start with $.). Firehose replaces each path with the actual data found at that path in the message. Paths can also be used as keys in the template, but the element used as a key must be of string data type.

A sample configuration (on an XYZ proto): {"test":"$.routes[0]", "$.order_number" : "xxx"}. If you want to dump the entire JSON as it is to the backend, use "$._all_" as a path.
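
The substitution step can be pictured with a simplified sketch. This is not Firehose's implementation: it does not evaluate JsonPath at all, and instead takes a map of already-resolved paths (hypothetical values) and replaces every quoted `$.` path in the template with the corresponding value as a JSON string.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TemplateSketch {
    // Matches a quoted JSON string whose whole content is a path starting with "$."
    private static final Pattern PATH = Pattern.compile("\"(\\$\\.[^\"]+)\"");

    // Replaces each quoted path with its resolved value, re-quoted as a string.
    static String render(String template, Map<String, String> resolved) {
        Matcher m = PATH.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = resolved.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("No value for path " + m.group(1));
            }
            m.appendReplacement(out, Matcher.quoteReplacement("\"" + value + "\""));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> resolved = new LinkedHashMap<>();
        resolved.put("$.routes[0]", "route-1");   // hypothetical message data
        resolved.put("$.order_number", "ORD-42"); // hypothetical message data
        String template = "{\"test\":\"$.routes[0]\", \"$.order_number\" : \"xxx\"}";
        System.out.println(render(template, resolved));
        // {"test":"route-1", "ORD-42" : "xxx"}
    }
}
```

Note how the path used in key position is replaced as well, which is why that element must be a string.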

Limitations:

  • Works when the input data type is Protobuf, not JSON.
  • Supported only on the Kafka message, not the key.
  • Validation happens only at the level of the JSON template. After the data has been substituted, the resulting string may or may not be valid JSON, so users must do proper testing/validation on the service side.
  • When selecting fields from complex data types such as repeated fields, nested messages, or maps, the user must filter first, because selecting a field that does not exist will fail.

Create a JDBC sink

  • Supports only PostgresDB as of now.
  • Data read from Kafka is written to the PostgresDB database and it requires the following variables to be set.

Note: The schema (table, columns, and any constraints) used in the Firehose configuration must already exist in the database.

Create an InfluxDB sink

  • Data read from Kafka is written to the InfluxDB time-series database and it requires the following variables to be set.

Note: The DATABASE and RETENTION POLICY used in the Firehose configuration must already exist in InfluxDB. Creating them is outside the scope of Firehose, and they won’t be generated automatically.

Create a Redis sink

  • It requires the following variables to be set.
  • Redis sink can be created in 2 different modes based on the value of SINK_REDIS_DATA_TYPE: HashSet or List.
    • HashSet: For each message, an entry of the format key : field : value is generated and pushed to Redis. The field and value are generated on the basis of the config INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING.
    • List: For each message, an entry of the format key : value is generated and pushed to Redis. The value is fetched for the proto index provided in the config SINK_REDIS_LIST_DATA_PROTO_INDEX.
  • The key is picked up from a field in the message itself.
  • Redis sink also supports different deployment types: Standalone and Cluster.
  • Limitation: Firehose Redis sink only supports HashSet and List entries as of now.

Create an Elasticsearch sink

  • It requires the following variables to be set.
  • In the Elasticsearch sink, each message is converted into a document in the specified index with the Document type and ID as specified by the user.
  • Elasticsearch sink supports reading messages in both JSON and Protobuf formats.
  • Using Routing Key one can route documents to a particular shard in Elasticsearch.

Create a GRPC sink

  • Data read from Kafka is written to a GRPC endpoint and it requires the following variables to be set.
  • You need to create your own GRPC endpoint so that the Firehose can send data to it. The response proto should have a field “success” with value as true or false.

Create a MongoDB sink

  • It requires the following variables to be set.
  • In the MongoDB sink, each message is converted into a BSON Document and then inserted/updated/upserted into the specified Mongo Collection.
  • MongoDB sink supports reading messages in both JSON and Protobuf formats.

Create a Blob sink

  • It requires the following variables to be set.
  • Only Google Cloud Storage is supported for now.
  • Only writing protobuf messages to the Apache Parquet file format is supported for now.
  • The protobuf message needs to have a google.protobuf.Timestamp field as the partitioning timestamp; the event_timestamp field is usually used.
  • A Google Cloud credential with some Google Cloud Storage permissions is required to run this sink.

Create a Bigquery sink

  • It requires the following variables to be set.
  • For INPUT_SCHEMA_DATA_TYPE = protobuf, this sink will generate the BigQuery schema from the protobuf message schema and update the BigQuery table with the latest generated schema.
    • The protobuf message may need a google.protobuf.Timestamp field when table partitioning is enabled.
  • For INPUT_SCHEMA_DATA_TYPE = json, this sink will generate the BigQuery schema by inferring it from the incoming JSON. Support for JSON schemas served by Stencil is planned for the future.
    • A timestamp column is needed in case of a partitioned table. It can be generated at the time of ingestion by setting the config SINK_BIGQUERY_ADD_EVENT_TIMESTAMP_ENABLE; refer to the Depot BigQuery sink config section.
  • A Google Cloud credential with some BigQuery permissions is required to run this sink.

Create a Bigtable sink

  • It requires the following environment variables, which are required by the Depot library, to be set along with the generic Firehose variables.

If you'd like to connect to a sink which is not yet supported, you can create a new sink by following the contribution guidelines.

- + \ No newline at end of file diff --git a/guides/deployment/index.html b/guides/deployment/index.html index 192009862..82551770c 100644 --- a/guides/deployment/index.html +++ b/guides/deployment/index.html @@ -8,13 +8,13 @@ Deployment | Firehose - +

Deployment

Firehose can be deployed locally, inside a Docker container, or in a Kubernetes cluster. The following external services must be installed and launched before deploying Firehose on any platform -

  • Apache Kafka Server 2.3+
  • Stencil Server as schema registry
  • Telegraf as the StatsD host
  • InfluxDB for storing metrics
  • Grafana for metrics visualization
  • Destination sink server

Refer to the Development Guide section on how to set up and configure the above services. For instructions on how to set up visualization of Firehose metrics, refer to the Monitoring section.

Deploy on Docker

Use Docker Hub to download the Firehose Docker image. You need to have Docker installed on your system. Follow this guide on how to install and set up Docker on your system.

# Download docker image from docker hub
$ docker pull gotocompany/firehose

# Run the following docker command for a simple log sink.
$ docker run -e SOURCE_KAFKA_BROKERS=127.0.0.1:6667 -e SOURCE_KAFKA_CONSUMER_GROUP_ID=kafka-consumer-group-id -e SOURCE_KAFKA_TOPIC=sample-topic -e SINK_TYPE=log -e SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET=latest -e INPUT_SCHEMA_PROTO_CLASS=com.github.firehose.sampleLogProto.SampleLogMessage -e SCHEMA_REGISTRY_STENCIL_ENABLE=true -e SCHEMA_REGISTRY_STENCIL_URLS=http://localhost:9000/artifactory/proto-descriptors/latest gotocompany/firehose:latest

Note: Make sure your protos (.jar file) are located in work-dir, this is required for Filter functionality to work.

Deploy on Kubernetes

Kubernetes is an open-source container-orchestration system for automating computer application deployment, scaling, and management. Follow this guide on how to set up and configure a Kubernetes cluster.

Then create a Firehose deployment using the Helm chart available here. The Helm chart Deployment also includes Telegraf container which works as a metrics aggregator and pushes StatsD metrics to InfluxDB. Make sure to configure all Kafka, sink and filter parameters in values.yaml file before deploying the Helm chart.

Description and default values for each parameter can be found here.

Deploy locally

Firehose needs Java SE Development Kit 8 to be installed and configured in JAVA_HOME environment variable.

# Clone the repo
$ git clone https://github.com/goto/firehose.git

# Build the jar
$ ./gradlew clean build

# Configure env variables
$ cat env/local.properties

# Run the Firehose
$ ./gradlew runConsumer

Note: Sample configuration for other sinks along with some advanced configurations can be found here

- + \ No newline at end of file diff --git a/guides/jexl-based-filters/index.html b/guides/jexl-based-filters/index.html index 8a836ed05..9ffd22ce0 100644 --- a/guides/jexl-based-filters/index.html +++ b/guides/jexl-based-filters/index.html @@ -8,7 +8,7 @@ JEXL-based Filters | Firehose - + @@ -70,7 +70,7 @@ This is the negation of the 'starts with' operator. a !^ "abc" is equivalent to !(a =^ "abc")

Ends With=$ The =$ operator is a short-hand for the 'endsWith' method. For example, "abcdef" =$ "def" returns true. Note that through duck-typing, user classes exposing an 'endsWith' method will allow their instances to behave as left-hand side operands of this operator.

Not Ends With!$ This is the negation of the 'ends with' operator. a !$ "abc" is equivalent to !(a =$ "abc")
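
For plain String operands, the four operators behave like String.startsWith and String.endsWith and their negations. A small Java sketch of that correspondence (class and method names are illustrative only, and the duck-typing on user classes described above is not covered):

```java
public class StringOperatorSketch {
    static boolean startsWith(String a, String b)    { return a.startsWith(b); }  // a =^ b
    static boolean notStartsWith(String a, String b) { return !a.startsWith(b); } // a !^ b
    static boolean endsWith(String a, String b)      { return a.endsWith(b); }    // a =$ b
    static boolean notEndsWith(String a, String b)   { return !a.endsWith(b); }   // a !$ b

    public static void main(String[] args) {
        System.out.println(startsWith("abcdef", "abc"));  // true
        System.out.println(endsWith("abcdef", "def"));    // true
        System.out.println(notEndsWith("abcdef", "def")); // false
    }
}
```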

Examples

Sample proto message:

===================KEY==========================
driver_id: "abcde12345"
vehicle_type: BIKE
event_timestamp {
seconds: 186178
nanos: 323080
}
driver_status: UNAVAILABLE

================= MESSAGE=======================
driver_id: "abcde12345"
vehicle_type: BIKE
event_timestamp {
seconds: 186178
nanos: 323080
}
driver_status: UNAVAILABLE
app_version: "1.0.0"
driver_location {
latitude: 0.6487193703651428
longitude: 0.791822075843811
altitude_in_meters: 0.9949166178703308
accuracy_in_meters: 0.39277541637420654
speed_in_meters_per_second: 0.28804516792297363
}
gcm_key: "abc123"

Key-based filter expressions examples:

  • sampleLogKey.getDriverId()=="abcde12345"
  • sampleLogKey.getVehicleType()=="BIKE"
  • sampleLogKey.getEventTimestamp().getSeconds()==186178
  • sampleLogKey.getDriverId()=="abcde12345"&&sampleLogKey.getVehicleType()=="BIKE" (multiple conditions example 1)
  • sampleLogKey.getVehicleType()=="BIKE"||sampleLogKey.getEventTimestamp().getSeconds()==186178 (multiple conditions example 2)

Message-based filter expressions examples:

  • sampleLogMessage.getGcmKey()=="abc123"
  • sampleLogMessage.getDriverId()=="abcde12345"&&sampleLogMessage.getDriverLocation().getLatitude()>0.6487193703651428
  • sampleLogMessage.getDriverLocation().getAltitudeInMeters()>0.9949166178703308

Note: Use log sink for testing the applied filtering

- + \ No newline at end of file diff --git a/guides/json-based-filters/index.html b/guides/json-based-filters/index.html index 023c72744..bb889b123 100644 --- a/guides/json-based-filters/index.html +++ b/guides/json-based-filters/index.html @@ -8,7 +8,7 @@ JSON-based Filters | Firehose - + @@ -18,7 +18,7 @@ x > exclusiveMinimum
x ≤ maximum
x < exclusiveMaximum

Example:

{
"properties":{
"age":{
"minimum":0,
"maximum":100
}
}
}


/* valid */
{"age":0}
{"age":100}
{"age":99}


/* invalid */
{"age":-1}
{"age":101}
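
The difference between the inclusive and exclusive keywords comes down to the comparison operator. A minimal sketch of the checks a validator performs (hypothetical class and method names, not tied to any particular JSON Schema library):

```java
public class RangeKeywords {
    // minimum / maximum: both bounds are allowed values.
    static boolean withinInclusive(double x, double minimum, double maximum) {
        return x >= minimum && x <= maximum;
    }

    // exclusiveMinimum / exclusiveMaximum: the bounds themselves are rejected.
    static boolean withinExclusive(double x, double exclusiveMinimum, double exclusiveMaximum) {
        return x > exclusiveMinimum && x < exclusiveMaximum;
    }

    public static void main(String[] args) {
        System.out.println(withinInclusive(0, 0, 100));   // true
        System.out.println(withinInclusive(100, 0, 100)); // true
        System.out.println(withinInclusive(101, 0, 100)); // false
        System.out.println(withinExclusive(100, 0, 100)); // false
    }
}
```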

Regex Match

The pattern keyword is used to restrict a string to a particular regular expression. The regular expression syntax is the one defined in JavaScript (ECMA 262 specifically). See Regular Expressions for more information.

Example:

{
"properties":{
"pincode":{
"pattern":"^(\\([0-9]{3}\\))?[0-9]{3}-[0-9]{4}$"
}
}
}

// valid
{ "pincode": "555-1212" }
{ "pincode": "(888)555-1212" }

// invalid
{ "pincode": "(888)555-1212 ext. 532" }
{ "pincode": "(800)FLOWERS" }
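
The same pattern can be tried out with java.util.regex before putting it into a filter. This is only a sketch for experimenting: Java's regex dialect is close to, but not identical with, ECMA 262, and the class name is hypothetical.

```java
import java.util.regex.Pattern;

public class PincodePattern {
    // Same expression as in the schema above, with backslashes escaped for Java.
    private static final Pattern PINCODE =
            Pattern.compile("^(\\([0-9]{3}\\))?[0-9]{3}-[0-9]{4}$");

    static boolean isValid(String pincode) {
        // find() is used because the pattern carries its own ^ and $ anchors.
        return PINCODE.matcher(pincode).find();
    }

    public static void main(String[] args) {
        System.out.println(isValid("555-1212"));               // true
        System.out.println(isValid("(888)555-1212"));          // true
        System.out.println(isValid("(888)555-1212 ext. 532")); // false
        System.out.println(isValid("(800)FLOWERS"));           // false
    }
}
```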

Conditional operators

The if, then and else keywords allow the application of a sub-schema based on the outcome of another schema, much like the if/then/else constructs you’ve probably seen in traditional programming languages. If if is valid, then must also be valid (and else is ignored.) If if is invalid, else must also be valid (and then is ignored).

{
"if": {
"properties": { "country": { "const": "United States of America" } }
},
"then": {
"properties": { "postal_code": { "pattern": "[0-9]{5}(-[0-9]{4})?" } }
},
"else": {
"properties": { "postal_code": { "pattern": "[A-Z][0-9][A-Z] [0-9][A-Z][0-9]" } }
}
}

/* valid */
{
"street_address": "1600 Pennsylvania Avenue NW",
"country": "United States of America",
"postal_code": "20500"
}

{
"street_address": "24 Sussex Drive",
"country": "Canada",
"postal_code": "K1M 1M4"
}

/* invalid */
{
"street_address": "24 Sussex Drive",
"country": "Canada",
"postal_code": "10000"
}
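
Expressed as ordinary code, the schema above picks one of two postal code patterns depending on the country. The sketch below mirrors that logic for this one schema only (class and method names are hypothetical); like JSON Schema's pattern keyword, the match is unanchored, and absent properties are treated as passing.

```java
import java.util.regex.Pattern;

public class ConditionalSchema {
    private static final Pattern THEN_PATTERN = Pattern.compile("[0-9]{5}(-[0-9]{4})?");
    private static final Pattern ELSE_PATTERN = Pattern.compile("[A-Z][0-9][A-Z] [0-9][A-Z][0-9]");

    static boolean isValid(String country, String postalCode) {
        // "if": a missing country satisfies the properties check vacuously.
        boolean ifMatches = country == null || "United States of America".equals(country);
        if (postalCode == null) {
            return true; // neither branch requires postal_code to be present
        }
        Pattern branch = ifMatches ? THEN_PATTERN : ELSE_PATTERN;
        return branch.matcher(postalCode).find();
    }

    public static void main(String[] args) {
        System.out.println(isValid("United States of America", "20500")); // true
        System.out.println(isValid("Canada", "K1M 1M4"));                 // true
        System.out.println(isValid("Canada", "10000"));                   // false
    }
}
```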

Logical operators

The keywords used to combine schemas are:

  • allOf: Must be valid against all of the sub-schemas
  • oneOf: Must be valid against exactly one of the sub-schemas
  • anyOf: Must be valid against any of the sub-schemas

allOf

To validate against allOf, the given data must be valid against all of the given sub-schemas.

{
"properties":{
"age":{
"allOf":[
{ "multipleOf":5 },
{ "multipleOf":3 }
]
}
}
}

/* valid */
{"age":15}
{"age":30}


/* invalid */
{"age":5}
{"age":9}

anyOf

To validate against anyOf, the given data must be valid against any (one or more) of the given sub-schemas.

{
"properties":{
"age":{
"anyOf":[
{ "multipleOf":5 },
{ "multipleOf":3 }
]
}
}
}

/* valid */
{"age":10}
{"age":15}


/* invalid */
{"age":2}
{"age":7}

oneOf

To validate against oneOf, the given data must be valid against exactly one of the given sub-schemas.

{
"properties":{
"age":{
"oneOf":[
{ "multipleOf":5 },
{ "multipleOf":3 }
]
}
}
}

/* valid */
{"age":10}
{"age":9}


/* invalid */
{"age":2}
{"age":15}
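
The three combinators differ only in how many sub-schemas must match. A compact sketch using the same two multipleOf sub-schemas as the examples above (the class and method names are hypothetical):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.IntPredicate;

public class Combinators {
    // The two sub-schemas used in the examples: multipleOf 5 and multipleOf 3.
    static final List<IntPredicate> SUB_SCHEMAS =
            Arrays.<IntPredicate>asList(n -> n % 5 == 0, n -> n % 3 == 0);

    // Counts how many sub-schemas the value satisfies.
    static long matches(int age) {
        return SUB_SCHEMAS.stream().filter(p -> p.test(age)).count();
    }

    static boolean allOf(int age) { return matches(age) == SUB_SCHEMAS.size(); }
    static boolean anyOf(int age) { return matches(age) >= 1; }
    static boolean oneOf(int age) { return matches(age) == 1; }

    public static void main(String[] args) {
        System.out.println(allOf(15) + " " + anyOf(15) + " " + oneOf(15)); // true true false
        System.out.println(allOf(10) + " " + anyOf(10) + " " + oneOf(10)); // false true true
        System.out.println(allOf(2) + " " + anyOf(2) + " " + oneOf(2));    // false false false
    }
}
```

The value 15 is the interesting case: it passes allOf and anyOf but fails oneOf, because it satisfies both sub-schemas.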

not

The not keyword declares that an instance validates if it doesn’t validate against the given sub-schema.

{
"properties":{
"fruit":{
"not":{
"const":"apple"
}
}
}
}

/* valid */
{"fruit":"mango"}
{"fruit":"errr"}

/* invalid */
{"fruit":"apple"}

Nested fields

You can apply all the above validation features to any level of nested fields in the JSON/ Protobuf message. Consider the below example -

{
"properties":{
"driver_location":{
"properties":{
"latitude":{
"minimum":-90.453,
"maximum":90.2167
},
"longitude":{
"minimum":-180.776,
"maximum":180.321
}
}
}
}
}
- + \ No newline at end of file diff --git a/guides/manage/index.html b/guides/manage/index.html index 06e81d241..36a7eae9a 100644 --- a/guides/manage/index.html +++ b/guides/manage/index.html @@ -8,13 +8,13 @@ Troubleshooting | Firehose - +

Troubleshooting

Consumer Lag

To decrease topic lag, it often helps to increase the environment variable SOURCE_KAFKA_CONSUMER_CONFIG_MAX_POLL_RECORDS from the default of 500 to something higher.

Additionally, you can increase the workers in the Firehose which will effectively multiply the number of records being processed by Firehose. However, please be mindful of the caveat mentioned below.

The caveat to the aforementioned remedies:

Be mindful that your sink also needs to be able to process the higher volume of data being pushed to it. If it cannot, these changes will only compound the problem of increasing lag.

Alternatively, if your underlying sink is not able to handle increased (or default) volume of data being pushed to it, adding some sort of a filter condition in the Firehose to ignore unnecessary messages in the topic would help you bring down the volume of data being processed by the sink.

- + \ No newline at end of file diff --git a/index.html b/index.html index 8d505d2df..86d1e20e6 100644 --- a/index.html +++ b/index.html @@ -8,13 +8,13 @@ Introduction | Firehose - +

Introduction

Firehose is a cloud-native service for delivering real-time streaming data to destinations such as service endpoints (HTTP or GRPC) and managed databases (MongoDB, Prometheus, Postgres, InfluxDB, Redis, and ElasticSearch). With Firehose, you don't need to write applications or manage resources. It automatically scales to match the throughput of your data and requires no ongoing administration. If your data is present in Kafka, Firehose delivers it to the destination (sink) that you specify.

Key Features

Discover why users choose Firehose as their main Kafka Consumer

  • Sinks Firehose supports sinking stream data to log console, HTTP, GRPC, PostgresDB(JDBC), InfluxDB, Elastic Search, Redis, Prometheus and MongoDB.
  • Scale Firehose scales in an instant, both vertically and horizontally, for high-performance streaming sink and zero data drops.
  • Extensibility Add your own sink to Firehose with a clearly defined interface or choose from already provided ones.
  • Runtime Firehose can run inside containers or VMs in a fully managed runtime environment like Kubernetes.
  • Metrics Always know what’s going on with your deployment with built-in monitoring of throughput, response times, errors, and more.

Supported Incoming data types from kafka

  • Protobuf
  • JSON
    • Support is limited to the Bigquery, Elasticsearch and MongoDB sinks. Support for other sinks will be added in the future.

Supported Sinks:

The following sinks are supported in Firehose:

  • Log - Standard Output
  • HTTP - HTTP services
  • JDBC - Postgres DB
  • InfluxDB - A time-series database
  • Redis - An in-memory Key value store
  • ElasticSearch - A search database
  • GRPC - GRPC based services
  • Prometheus - A time-series database
  • MongoDB - A NoSQL database
  • Bigquery - A data warehouse provided by Google Cloud
  • Bigtable - A fully managed, scalable NoSQL database service for large analytical and operational workloads.
  • Blob Storage - A data storage architecture for large stores of unstructured data like google cloud storage, amazon s3, apache hadoop distributed filesystem

How can I get started?

Explore the following resources to get started with Firehose:

  • Guides provide guidance on creating Firehose with different sinks.
  • Concepts describe all important Firehose concepts.
  • FAQs lists frequently asked questions about Firehose and related components.
  • Reference contains details about configurations, metrics, FAQs, and other aspects of Firehose.
  • Contributing contains resources for anyone who wants to contribute to Firehose.
- + \ No newline at end of file diff --git a/reference/core-faqs/index.html b/reference/core-faqs/index.html index 1b26cb853..6f46e82b7 100644 --- a/reference/core-faqs/index.html +++ b/reference/core-faqs/index.html @@ -8,13 +8,13 @@ FAQs | Firehose - +

FAQs

What problems does Firehose solve?

Every micro-service needs its own sink to be developed for such common operations as streaming data from Kafka to data lakes or other endpoints, along with real-time filtering, parsing, and monitoring of the sink.

With Firehose, you don't need to write sink code for every such microservice, or manage resources to sink data from Kafka server to your database/service endpoint. Having provided all the configuration parameters of the sink, Firehose will create, manage and monitor one for you. It also automatically scales to match the throughput of your data and requires no ongoing administration.

Which Java versions does Firehose work with?

Firehose has been built and tested to work with Java SE Development Kit 1.8.

How does the execution work?

Firehose can run in parallel on multiple threads. Each thread does the following:

  • Get messages from Kafka
  • Filter the messages (optional)
  • Push these messages to sink
  • All the existing sink types follow the same contract/lifecycle defined in AbstractSink.java. It consists of two stages:
    • Prepare: Transforms the list of filtered messages into sink-specific insert/update client requests.
    • Execute: Requests created in the Prepare stage are executed at this step and a list of failed messages is returned (if any) for retry.
  • In case push fails and DLQ is:
    • enabled: Firehose keeps on retrying for the configured number of attempts before the messages are pushed to the DLQ Kafka topic
    • disabled: Firehose keeps on retrying until it receives a success code
  • Captures telemetry and success/failure events and sends them to Telegraf
  • Repeat the process
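
The per-thread steps above can be sketched as a single batch-processing function. This is a simplified illustration with the DLQ enabled, not Firehose's actual code: the Sink interface, the method names, and the choice to count the first push as one of the attempts are all assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class WorkerLoopSketch {
    /** Hypothetical sink contract: push a batch and return the messages that failed. */
    interface Sink {
        List<String> push(List<String> messages);
    }

    // Filters an already-pulled batch, pushes it, retries failures up to maxAttempts,
    // and returns whatever still failed (the messages that would go to the DLQ).
    static List<String> processBatch(List<String> pulled, Predicate<String> filter,
                                     Sink sink, int maxAttempts) {
        List<String> pending = new ArrayList<>();
        for (String message : pulled) {
            if (filter.test(message)) {
                pending.add(message);
            }
        }
        for (int attempt = 0; attempt < maxAttempts && !pending.isEmpty(); attempt++) {
            pending = sink.push(pending);
        }
        return pending;
    }

    public static void main(String[] args) {
        // A stand-in sink that always rejects messages starting with "bad".
        Sink sink = batch -> {
            List<String> failed = new ArrayList<>();
            for (String message : batch) {
                if (message.startsWith("bad")) {
                    failed.add(message);
                }
            }
            return failed;
        };
        List<String> dlq = processBatch(
                Arrays.asList("ok-1", "skip-2", "bad-3"),
                message -> !message.startsWith("skip"), sink, 3);
        System.out.println(dlq); // [bad-3]
    }
}
```

With the DLQ disabled, the loop would instead keep retrying until the pending list is empty.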

Can I do any transformations (for example, filtering) before sending the data to the sink?

Yes, Firehose provides JEXL-based filters that operate on the fields in the key or message of the Kafka record. Read the Filters section for further details.

How to optimize parallelism based on input rate of Kafka messages?

You can increase the number of workers in Firehose, which effectively multiplies the number of records being processed. However, be mindful that your sink also needs to be able to process the higher volume of data being pushed to it. If it cannot, this will only compound the problem of increasing lag.

Adding some sort of a filter condition in the Firehose to ignore unnecessary messages in the topic would help you bring down the volume of data being processed by the sink.

What is the retry mechanism in Firehose? What kind of retry strategies are supported?

In case push fails and DLQ (Dead Letter Queue) is:

  • enabled: Firehose keeps on retrying for the configured number of attempts before the messages are pushed to the DLQ Kafka topic
  • disabled: Firehose keeps on retrying until it receives a success code

Which Kafka Client configs are available?

Firehose provides various Kafka client configurations. Refer to the Generic Configurations section for details on configuration related to the Kafka Consumer.

What data formats are supported?

Elasticsearch, Bigquery and MongoDB sinks support both JSON and Protobuf as the input schema. For other sinks, we currently support only Protobuf. Support for JSON and Avro is planned and incorporated in our roadmap. Please refer to our Roadmap section for more details.

Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data. Data streams on Kafka topics are bound to a Protobuf schema.

Follow the instructions in this article on how to create, compile and serialize a Protobuf object to send it to a binary OutputStream. Refer to this guide for detailed Protobuf syntax and rules to create a .proto file.

Is there any code snippet which shows how I can produce a sample message in a supported data format?

Following is an example to demonstrate how to create a Protobuf message and then produce it to a Kafka cluster. Firstly, create a .proto file containing all the required field names and their corresponding integer tags. Save it in a new file named person.proto

syntax = "proto2";

package tutorial;

option java_multiple_files = true;
option java_package = "com.example.tutorial.protos";
option java_outer_classname = "PersonProtos";

message Person {
optional string name = 1;
optional int32 id = 2;
optional string email = 3;

enum PhoneType {
MOBILE = 0;
HOME = 1;
WORK = 2;
}

message PhoneNumber {
optional string number = 1;
optional PhoneType type = 2 [default = HOME];
}

repeated PhoneNumber phones = 4;
}

Next, compile your .proto file using the Protobuf compiler, protoc. This will generate the Person, PersonOrBuilder and PersonProtos Java source files. Specify the source directory (where your application's source code lives; the current directory is used if you don't provide a value), the destination directory (where you want the generated code to go; often the same as $SRC_DIR), and the path to your .proto file.

protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/person.proto

Lastly, add the following lines in your Java code to generate a POJO (Plain Old Java Object) of the Person proto class and serialize it to a byte array, using the toByteArray() method of the com.google.protobuf.GeneratedMessageV3 class. The byte array is then sent to the Kafka cluster by the producer.

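import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.example.tutorial.protos.Person;

// Broker address and topic name below are placeholders; replace them with your own.
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
String topicName = "sample-topic";
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties);

// Build the message with the generated builder.
Person john = Person.newBuilder()
    .setId(87182872)
    .setName("John Doe")
    .setEmail("jdoe@example.com")
    .addPhones(
        Person.PhoneNumber.newBuilder()
            .setNumber("555-4321")
            .setType(Person.PhoneType.HOME))
    .build();

// Serialize the message to a byte array and send it as the record value.
producer.send(new ProducerRecord<byte[], byte[]>(topicName, john.toByteArray()));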

Refer to https://developers.google.com/protocol-buffers for more information on how to create Protobuf messages.

Can we select particular fields from the incoming message?

Firehose sends all the fields of the incoming messages to the specified sink. However, you can configure your sink destination or database to consume only the required fields.

How can I handle consumer lag?

  • To decrease the topic lag, it often helps to increase the environment variable SOURCE_KAFKA_CONSUMER_CONFIG_MAX_POLL_RECORDS from its default of 500 to a higher value, which tells the Kafka Consumer to consume more messages in a single poll.
  • Additionally, you can increase the number of workers in Firehose, which effectively multiplies the number of records being processed by Firehose.
  • Alternatively, if your underlying sink cannot handle the increased (or default) volume of data being pushed to it, adding a filter condition in Firehose to ignore unnecessary messages in the topic will bring down the volume of data processed by the sink.

What is Stencil in the context of Firehose?

Stencil API is a dynamic schema registry for hosting and managing versions of Protobuf descriptors. Schema handling, i.e. finding the schema mapped to a topic, downloading the descriptors, and being dynamically notified of and updated with the latest schema, is abstracted through the Stencil library.

The Stencil Client is a proprietary library that provides an abstraction layer for schema handling. Schema caching and dynamic schema updates are features of the Stencil client library.

Refer to this article for further information on the features, configuration and deployment instructions of the Stencil API. The source code of the Stencil Server and Client API can be found in its GitHub repository.

How do I configure the Protobuf schema needed to consume messages?

Generated Protobuf descriptors are hosted behind a Stencil server artifactory/HTTP endpoint. This endpoint URL and the Proto Descriptor class that the Firehose deployment should use to deserialize raw data are configured in Firehose through the environment variables SCHEMA_REGISTRY_STENCIL_URLS and INPUT_SCHEMA_PROTO_CLASS respectively.

The Proto Descriptor Set of the Kafka messages must be uploaded to the Stencil server. Refer to this guide on how to set up and configure the Stencil server.

Can we select particular fields from the input message?

No, all fields from the input key/message will be sent by Firehose to the sink. However, you can configure your service endpoint or database to consume only the fields that are required.

Why Protobuf? Can it support other formats like JSON and Avro?

Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data. Data streams on Kafka topics are bound to a Protobuf schema. Protobuf is much more lightweight than other schema formats like JSON, since it encodes the keys in the message as integers.

Elasticsearch, BigQuery and MongoDB sinks support both JSON and Protobuf as the input schema.

For other sinks, only Protobuf is currently supported. Support for JSON and Avro is planned and included in our roadmap. Please refer to our Roadmap section for more details.

Will I have any data loss if my Firehose fails?

After a batch of messages is sent successfully, Firehose commits the offset before the consumer polls another batch from Kafka. Thus, failed messages are not committed.

When Firehose is restarted, the Kafka Consumer automatically starts pulling messages from the last committed offset of the consumer group, so no data loss occurs when an instance of Firehose fails.
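The commit-after-send behaviour described above can be illustrated with a small simulation. This is only a sketch under stated assumptions: the class CommitAfterSend, its parameters, and the in-memory list standing in for a Kafka partition are hypothetical and are not part of Firehose.

```java
import java.util.List;

// Hypothetical simulation: the offset advances only after a batch has been sent.
final class CommitAfterSend {
    // Processes the log from startOffset in batches and returns the committed offset.
    // failAtBatch is the zero-based index of the batch at which the instance "crashes"
    // before sending (-1 means no crash).
    static int run(List<String> log, int startOffset, int batchSize,
                   int failAtBatch, List<String> sink) {
        int committed = startOffset;
        int batchIndex = 0;
        while (committed < log.size()) {
            int end = Math.min(committed + batchSize, log.size());
            if (batchIndex == failAtBatch) {
                // Crash: this batch was polled but neither sent nor committed.
                return committed;
            }
            sink.addAll(log.subList(committed, end)); // send succeeded
            committed = end;                          // commit after the send
            batchIndex++;
        }
        return committed;
    }
}
```

Calling run again with the returned offset models a restart: the uncommitted batch is read again, so nothing is skipped.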

How does Firehose handle failed messages?

If a push fails and DLQ (Dead Letter Queue) is:

  • enabled: Firehose keeps retrying for the configured number of attempts before the messages are pushed to the DLQ Kafka topic
  • disabled: Firehose keeps retrying until it receives a success code
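The two retry behaviours can be sketched as a single loop. This is a simplified illustration, not Firehose's implementation: RetryPolicySketch and its parameters are hypothetical names, and the sketch assumes every failed push counts as one attempt.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of retrying with and without a DLQ.
final class RetryPolicySketch {
    enum Outcome { DELIVERED, SENT_TO_DLQ }

    // push returns true when the sink reports success.
    // maxAttempts is only consulted when dlqEnabled is true.
    static Outcome deliver(BooleanSupplier push, boolean dlqEnabled, int maxAttempts) {
        int failedAttempts = 0;
        while (true) {
            if (push.getAsBoolean()) {
                return Outcome.DELIVERED;
            }
            failedAttempts++;
            if (dlqEnabled && failedAttempts >= maxAttempts) {
                // Attempts exhausted: hand the messages over to the DLQ.
                return Outcome.SENT_TO_DLQ;
            }
            // DLQ disabled: keep retrying until the push succeeds.
        }
    }
}
```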

How do commits for the Kafka consumer work?

After the messages are pulled successfully, Firehose commits the offset to the Kafka cluster.

If SOURCE_KAFKA_ASYNC_COMMIT_ENABLE is set to true, the KafkaConsumer commits the offset asynchronously and logs to the metric SOURCE_KAFKA_MESSAGES_COMMIT_TOTAL, incrementing the counter SUCCESS_TAG or FAILURE_TAG depending on whether the commit succeeded or failed.

If SOURCE_KAFKA_ASYNC_COMMIT_ENABLE is set to false, the KafkaConsumer commits the offset synchronously and execution is blocked until the commit either succeeds or throws an exception.

Which metrics are available to monitor the Kafka consumer?

Firehose exposes critical metrics to monitor the health of your delivery streams and take any necessary actions. Refer to the Metrics section for further details on each metric.

What happens if my Firehose gets restarted?

When Firehose is restarted, the Kafka Consumer automatically starts pulling messages from the last committed offset of the consumer group specified by the variable SOURCE_KAFKA_CONSUMER_GROUP_ID.

How to configure the filter for a proto field based on some data?

The environment variables FILTER_DATA_SOURCE, FILTER_JEXL_EXPRESSION and FILTER_SCHEMA_PROTO_CLASS need to be set for filters to work. The required filters need to be written in JEXL expression format. Refer to the Using Filters section for more details.

Can I perform basic arithmetic operations in filters?

Yes, you can combine multiple fields of the key/message protobuf in a single JEXL expression and perform any arithmetic or logical operations between them. For example: sampleKey.getTime().getSeconds() * 1000 + sampleKey.getTime().getMillis() > 22809

Does log sink work for any complex data type e.g. array?

Log Sink uses the Logback and SLF4J libraries for logging to standard output. Thus, it can log any complex data type by printing the String returned by the toString() method of the object. Log sink also works for arrays and prints all the array elements in comma-separated format, e.g. [4, 3, 8]
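As a rough illustration of the comma-separated format, the following sketch shows how standard Java renders list-like values as text. LogSinkFormatting is a hypothetical helper, not part of Firehose; it only demonstrates that Java lists already print in this form through toString(), while raw Java arrays need Arrays.toString.

```java
import java.util.Arrays;

// Hypothetical helper showing how values end up as "[4, 3, 8]"-style strings.
final class LogSinkFormatting {
    static String render(Object value) {
        if (value instanceof int[]) {
            // Raw primitive arrays do not override toString(), so format them explicitly.
            return Arrays.toString((int[]) value);
        }
        if (value instanceof Object[]) {
            return Arrays.deepToString((Object[]) value);
        }
        // Lists and most other objects provide a readable toString().
        return String.valueOf(value);
    }
}
```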

What are the use-cases of log sink?

Firehose provides a log sink to make it easy to consume messages in standard output. Log sink can be used for debugging purposes and experimenting with various filters. It can also be used to test the latency and overall performance of the Firehose.

- + \ No newline at end of file diff --git a/reference/faq/index.html b/reference/faq/index.html index dcea3fd33..ce457d8fa 100644 --- a/reference/faq/index.html +++ b/reference/faq/index.html @@ -8,7 +8,7 @@ Frequently Asked Questions | Firehose - + @@ -226,7 +226,7 @@ earliest available offset is now 21, Firehose will start reading from offset 21 for the partition. This is also the default behaviour in case this config is not specified at all.
  • If the config SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET is set to none or anything else, then Firehose will terminate with an abnormal status code.
  • - + \ No newline at end of file diff --git a/reference/glossary/index.html b/reference/glossary/index.html index 02907196d..836a8f463 100644 --- a/reference/glossary/index.html +++ b/reference/glossary/index.html @@ -8,13 +8,13 @@ Glossary | Firehose - + - + \ No newline at end of file diff --git a/reference/metrics/index.html b/reference/metrics/index.html index 67920de92..543017453 100644 --- a/reference/metrics/index.html +++ b/reference/metrics/index.html @@ -8,13 +8,13 @@ Metrics | Firehose - +

    Metrics

    Service-level Indicators (SLIs) are the measurements used to calculate performance against the goal. An SLI is a direct measurement of a service’s behaviour and helps us and the users evaluate whether our system has been running within its SLO. The metrics captured as part of the SLIs for Firehose are described below.

    Table of Contents

    Type Details

    Collection of all the generic configurations in a Firehose.

    Sink

    • The type of sink of the Firehose. It could be 'log', 'HTTP', 'DB', 'redis', 'influx' or 'Elasticsearch'

    Team

    • The team that owns the given Firehose.

    Proto Schema

    • The proto class used for creating the Firehose

    Stream

    • The stream where the input topic is read from

    Overview

    Some of the most important Firehose metrics, which give you an overview of its current state.

    ConsumerLag: MaxLag

    • The maximum lag in terms of number of records for any partition in this window. An increasing value over time is your best indication that the consumer group is not keeping up with the producers.
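The maximum lag can be thought of as the largest per-partition gap between the latest offset in the log and the offset committed by the consumer group. The sketch below illustrates that calculation; ConsumerLagSketch and its map-based inputs are hypothetical and do not reflect how Firehose or Kafka collect this metric.

```java
import java.util.Map;

// Hypothetical sketch: maximum lag across partitions.
final class ConsumerLagSketch {
    // Keys are partition numbers. Lag per partition = end offset - committed offset.
    static long maxLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        long max = 0;
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            // A partition with no committed offset is treated as starting from 0.
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            max = Math.max(max, entry.getValue() - committed);
        }
        return max;
    }
}
```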

    Total Message Received

    • Sum of all messages received from Kafka per pod.

    Message Sent Successfully

    • Messages sent successfully to the sink per batch per pod.

    Message Sent Failed

    • Messages that failed to be pushed into the sink per batch per pod. In the case of the HTTP sink, if the status code is not among the configured retry codes, the records are dropped.

    Message Dropped

    • In case of HTTP sink, when status code is not in retry codes configured, the records are dropped. This metric captures the dropped messages count.
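The retry-or-drop decision described for the HTTP sink can be sketched as follows. This is a minimal illustration rather than Firehose's code: HttpSinkDecision is a hypothetical name, the retry codes are modelled as an explicit set, and any 2xx status is assumed to count as success.

```java
import java.util.Set;

// Hypothetical sketch of classifying an HTTP sink response.
final class HttpSinkDecision {
    enum Action { SUCCESS, RETRY, DROP }

    static Action classify(int statusCode, Set<Integer> retryCodes) {
        if (statusCode >= 200 && statusCode < 300) {
            return Action.SUCCESS;
        }
        // Failed responses are retried only when their code is configured for retry;
        // everything else is dropped and would be counted as a dropped message.
        return retryCodes.contains(statusCode) ? Action.RETRY : Action.DROP;
    }
}
```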

    Batch size Distribution

    • 99th percentile (p99) of the batch size distribution for pulled and pushed messages per pod.

    Time spent in Firehose

    • Latency introduced by Firehose (time before sending to sink - time when reading from Kafka). Note: It could be high if the response time of the sink is higher as subsequent batches could be delayed.

    Time spent in pipeline

    • Time difference between Kafka ingestion and sending to sink (Time before sending to sink - Time of Kafka ingestion)
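The two latency metrics differ only in their starting timestamp. The sketch below works through both subtractions; LatencySketch and its parameter names are hypothetical and only mirror the formulas given above.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of the two latency formulas.
final class LatencySketch {
    // Time spent in Firehose: time before sending to sink - time when reading from Kafka.
    static Duration timeInFirehose(Instant readFromKafka, Instant beforeSinkSend) {
        return Duration.between(readFromKafka, beforeSinkSend);
    }

    // Time spent in pipeline: time before sending to sink - time of Kafka ingestion.
    static Duration timeInPipeline(Instant kafkaIngestion, Instant beforeSinkSend) {
        return Duration.between(kafkaIngestion, beforeSinkSend);
    }
}
```

For a message ingested at time t, read by Firehose at t + 300 ms and handed to the sink at t + 450 ms, the first metric is 150 ms and the second is 450 ms.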

    Sink Response Time

    • Different percentiles of the response time of the sink.

    Pods Health

    Since Firehose runs on Kubernetes, this section gives health details of each pod.

    JVM Lifetime

    • JVM Uptime of each pod.

    Cpu Load

    • Returns the "recent cpu usage" for the Java Virtual Machine process. This value is a double in the [0.0,1.0] interval. A value of 0.0 means that none of the CPUs were running threads from the JVM process during the recent period of time observed, while a value of 1.0 means that all CPUs were actively running threads from the JVM 100% of the time during the recent period being observed. Threads from the JVM include the application threads as well as the JVM internal threads. All values between 0.0 and 1.0 are possible depending on the activities going on in the JVM process and the whole system. If the Java Virtual Machine recent CPU usage is not available, the method returns a negative value.

    Cpu Time

    • Returns the CPU time used by the process on which the Java virtual machine is running. The returned value is of nanoseconds precision but not necessarily nanoseconds accuracy.
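The wording of these two descriptions matches the JDK's com.sun.management.OperatingSystemMXBean methods getProcessCpuLoad() and getProcessCpuTime(). Assuming that is where the values come from (the source does not say so explicitly), they can be sampled from inside a JVM as sketched below. CpuMetricsSketch is a hypothetical class name.

```java
import java.lang.management.ManagementFactory;

// Hypothetical sketch: sampling process CPU load and CPU time from the JVM.
final class CpuMetricsSketch {
    // Returns {processCpuLoad, processCpuTimeNanos}; negative values mean "not available".
    static double[] sample() {
        java.lang.management.OperatingSystemMXBean os =
                ManagementFactory.getOperatingSystemMXBean();
        // The extended bean is only present on JVMs that ship com.sun.management.
        if (os instanceof com.sun.management.OperatingSystemMXBean) {
            com.sun.management.OperatingSystemMXBean extended =
                    (com.sun.management.OperatingSystemMXBean) os;
            return new double[] {
                extended.getProcessCpuLoad(),
                (double) extended.getProcessCpuTime()
            };
        }
        return new double[] { -1.0, -1.0 };
    }
}
```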

    Kafka Consumer Details

    Listing some of the Kafka consumer metrics here.

    Assigned partitions

    • Consumer Group Metrics: The number of partitions currently assigned to this consumer (per pod).

    Consumer Number of Request/second

    • Global Request Metrics: The average number of requests sent per second per pod.

    Records Consumed Rate/second

    • Topic-level Fetch Metrics: The average number of records consumed per second for a specific topic per pod.

    Bytes Consumed Rate/second

    • Topic-level Fetch Metrics: The average number of bytes consumed per second per pod.

    Fetch Rate/second

    • Fetch Metrics: The number of fetch requests per second per pod.

    Max Fetch Latency

    • Fetch Metrics: The max time taken for a fetch request per pod.

    Average Fetch Latency

    • Fetch Metrics: The average time taken for a fetch request per pod.

    Average Fetch Size

    • Fetch Metrics: The average number of bytes fetched per request per pod.

    Max Fetch Size

    • Fetch Metrics: The max number of bytes fetched per request per pod.

    Commit Rate/second

    • Consumer Group Metrics: The number of commit calls per second per pod.

    Consumer Active Connections Count

    • Global Connection Metrics: The current number of active connections per pod.

    New Connections Creation Rate/second

    • Global Connection Metrics: New connections established per second in the window per pod.

    Connections Close Rate/second

    • Global Connection Metrics: Connections closed per second in the window per pod.

    Consumer Outgoing Byte Rate/Sec

    • Global Request Metrics: The average number of outgoing bytes sent per second to all servers per pod.

    Avg time between poll

    • Average time spent between poll per pod.

    Max time between poll

    • Max time spent between poll per pod.

    Sync rate

    • Consumer Group Metrics: The number of group syncs per second per pod. Group synchronization is the second and last phase of the rebalance protocol. Similar to join-rate, a large value indicates group instability.

    Consumer Network IO rate /second

    • The average number of network operations (reads or writes) on all connections per second per pod

    Rebalance Rate /hour

    • The rate at which the consumer rebalances.

    Average Commit latency

    • Consumer Group Metrics: The average time taken for a commit request per pod

    Max Commit latency

    • Consumer Group Metrics: The max time taken for a commit request per pod.

    Avg Rebalance latency

    • Average Rebalance Latency for the consumer per pod.

    Max Rebalance latency

    • Max Rebalance Latency for the consumer per pod.

    Error

    This section gives insight into the critical and noncritical exceptions that occurred in Firehose.

    Fatal Error

    • Count of all the exceptions raised by the pods which can restart the Firehose.

    Nonfatal Error

    • Count of all the exceptions raised by the Firehose which will not restart it; Firehose will keep retrying.

    Memory

    Details on memory used by the Firehose for different tasks.

    Heap Memory Usage

    • Details of heap memory usage:

      Max: The amount of memory that can be used for memory management
      Used: The amount of memory currently in use

    Non-Heap Memory Usage

    • Details of non-heap memory usage:

      Max: The amount of memory that can be used for memory management
      Used: The amount of memory currently in use
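Max and Used figures of this kind are available inside any JVM through the standard java.lang.management API. The sketch below shows one way to read them; whether Firehose collects its metrics exactly this way is an assumption, and MemoryMetricsSketch is a hypothetical name.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

// Hypothetical sketch: reading heap and non-heap usage from the running JVM.
final class MemoryMetricsSketch {
    // Returns {used, max} in bytes; max is -1 when the JVM reports it as undefined.
    static long[] heap() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        MemoryUsage usage = memory.getHeapMemoryUsage();
        return new long[] { usage.getUsed(), usage.getMax() };
    }

    static long[] nonHeap() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        MemoryUsage usage = memory.getNonHeapMemoryUsage();
        return new long[] { usage.getUsed(), usage.getMax() };
    }
}
```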

    GC: Memory Pool Collection Usage

    • For a garbage-collected memory pool, the amount of used memory includes the memory occupied by all objects in the pool including both reachable and unreachable objects. This is for all the names in the type: MemoryPool.

    GC: Memory Pool Peak Usage

    • Peak usage of the GC memory pool.

    GC: Memory Pool Usage

    • Total usage of the GC memory pool.

    Garbage Collection

    All JVM Garbage Collection Details.

    GC Collection Count

    • The total number of collections that have occurred per pod. Rather than showing the absolute value we are showing the difference to see the rate of change more easily.

    GC Collection Time

    • The approximate accumulated collection elapsed time in milliseconds per pod. Rather than showing the absolute value we are showing the difference to see the rate of change more easily.
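Showing the difference rather than the absolute value means subtracting the previous sample of a cumulative counter from the current one. The sketch below reads the cumulative collection count through the standard GarbageCollectorMXBean API and computes such a difference; GcDeltaSketch is a hypothetical name and the sampling approach is an assumption about how a dashboard might do it.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical sketch: turning a cumulative GC counter into a per-interval difference.
final class GcDeltaSketch {
    // Sums the collection counts of all collectors in this JVM.
    static long totalCollections() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long count = gc.getCollectionCount(); // -1 when undefined for this collector
            if (count > 0) {
                total += count;
            }
        }
        return total;
    }

    // Difference between two samples of a cumulative counter.
    static long delta(long previousSample, long currentSample) {
        return currentSample - previousSample;
    }
}
```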

    Thread Count

    • Thread counts per pod:

      daemonThreadCount: Returns the current number of live daemon threads.
      peakThreadCount: Returns the peak live thread count since the Java virtual machine started or the peak was reset.
      threadCount: Returns the current number of live threads, including both daemon and non-daemon threads.

    Class Count

    • Class counts per pod:

      loadedClass: Displays the number of classes that are currently loaded in the Java virtual machine.
      unloadedClass: Displays the total number of classes unloaded since the Java virtual machine started execution.
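Thread and class counts like these are exposed by the JDK's ThreadMXBean and ClassLoadingMXBean. The sketch below reads them from the running JVM; it is an illustration of the standard API, under the assumption that the metric names above correspond to these getters. JvmCountsSketch is a hypothetical name.

```java
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

// Hypothetical sketch: reading thread and class counts from the JVM.
final class JvmCountsSketch {
    // Returns {daemonThreadCount, peakThreadCount, threadCount}.
    static int[] threadCounts() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        return new int[] {
            threads.getDaemonThreadCount(),
            threads.getPeakThreadCount(),
            threads.getThreadCount()
        };
    }

    // Returns {currently loaded classes, classes unloaded since JVM start}.
    static long[] classCounts() {
        ClassLoadingMXBean classes = ManagementFactory.getClassLoadingMXBean();
        return new long[] {
            classes.getLoadedClassCount(),
            classes.getUnloadedClassCount()
        };
    }
}
```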

    Code Cache Memory after GC

    • The code cache memory usage in the memory pools at the end of a GC per pod.

    Compressed Class Space after GC

    • The compressed class space memory usage in the memory pools at the end of a GC per pod.

    Metaspace after GC

    • The metaspace memory usage in the memory pools at the end of a GC per pod.

    Par Eden Space after GC

    • The eden space memory usage in the memory pools at the end of a GC per pod.

    Par Survivor Space after GC

    • The survivor space memory usage in the memory pools at the end of a GC per pod.

    Tenured Space after GC

    • The tenured space memory usage in the memory pools at the end of a GC per pod.

    File Descriptor

    • Number of file descriptors per pod:

      Open: Current open file descriptors
      Max: Based on config max allowed

    Retry

    If you have configured retries, this section gives some insight into them.

    Average Retry Requests

    • Request retries per min per pod.

    Back Off time

    • Time spent per pod backing off.

    HTTP Sink

    HTTP Sink response code details.

    2XX Response Count

    • Total number of 2xx responses received by Firehose from the HTTP service.

    4XX Response Count

    • Total number of 4xx responses received by Firehose from the HTTP service.

    5XX Response Count

    • Total number of 5xx responses received by Firehose from the HTTP service.

    No Response Count

    • Total number of requests for which Firehose received no response from the HTTP service.

    Filter

    Since Firehose supports filtration based on some data, these metrics give some information related to that.

    Filter Type

    • Type of filter in the Firehose. It will be one of "none", "key" or "message".

    Total Messages filtered

    • Sum of all the messages filtered because of the filter condition per pod.

    Blob Sink

    Local File Open Total

    A gauge of the total number of local files that are currently open.

    Local File Closed Total

    Total number of local files that have been closed and are ready to be uploaded, excluding local files that were closed prematurely due to a consumer restart.

    Local File Closing Time

    Time taken to close a local file.

    Local File Records Total

    Total number of records written to all files that have been closed and are ready to be uploaded.

    Local File Size Bytes

    Size of the file in bytes.

    File Uploaded Total

    Total number of files that were uploaded successfully.

    File Upload Time

    Time taken to upload a file.

    File Upload Size Bytes

    Total size of the uploaded file in bytes.

    File Upload Records Total

    Total number of records inside files that were uploaded successfully to blob storage.

    - + \ No newline at end of file diff --git a/roadmap/index.html b/roadmap/index.html index 9cdcbb831..3100ec391 100644 --- a/roadmap/index.html +++ b/roadmap/index.html @@ -8,13 +8,13 @@ Roadmap | Firehose - +

    Roadmap

    In the following section, you can learn about what features we're working on, what stage they're in, and when we expect to bring them to you. Have any questions or comments about items on the roadmap? Join the discussions on the Firehose GitHub forum.

    We’re planning to iterate on the format of the roadmap itself, and we see the potential to engage more in discussions about the future of Firehose features. If you have feedback about the roadmap section itself, such as how the issues are presented, let us know through discussions.

    Firehose 0.4

    Following are some of the upcoming enhancements on Firehose.

    - + \ No newline at end of file diff --git a/sinks/bigquery-sink/index.html b/sinks/bigquery-sink/index.html index 1b061f523..8ee64bc89 100644 --- a/sinks/bigquery-sink/index.html +++ b/sinks/bigquery-sink/index.html @@ -8,7 +8,7 @@ BigQuery | Firehose - + @@ -24,7 +24,7 @@ Depot-bigquery.md#protobuf-bigquery-table-type-mapping section

    Partitioning

    BigQuery Sink supports creating tables with a partition configuration. For more information, refer to the Depot-bigquery.md#partitioning section.

    Clustering

    BigQuery Sink supports creating and modifying clustered or unclustered tables with a clustering configuration. For more information, refer to the Depot-bigquery.md#clustering section.

    Kafka Metadata

    For data quality checks, Kafka metadata sometimes needs to be added to the record. For more information, refer to the Depot-bigquery.md#metadata section.

    Default columns for json data type

    With a dynamic schema for JSON, the table needs to be created with some default columns; for example, the partition key needs to be set during creation of the table. Sample config: SINK_BIGQUERY_DEFAULT_COLUMNS=event_timestamp=timestamp. For more information, refer to the Depot-bigquery.md#default-columns-for-json-data-type section.

    Error handling

    The response can contain multiple errors, which are sent to Firehose from Depot. Please refer to the Depot-bigquery.md#errors-handling section.

    Google Cloud Bigquery IAM Permission

    Several IAM permissions are required for the BigQuery sink to run properly. For more details, refer to the Depot-bigquery.md#google-cloud-bigquery-iam-permission section.

    - + \ No newline at end of file diff --git a/sinks/bigtable-sink/index.html b/sinks/bigtable-sink/index.html index 71ae0f25c..75eac9fa6 100644 --- a/sinks/bigtable-sink/index.html +++ b/sinks/bigtable-sink/index.html @@ -8,13 +8,13 @@ Bigtable Sink | Firehose - +

    Bigtable Sink

    Bigtable Sink is implemented in Firehose using the Bigtable sink connector implementation in Depot. You can check out the Depot GitHub repository here.

    Configuration

    To use the Bigtable sink in Firehose, first set SINK_TYPE=bigtable. Some generic configs, common across different sink types, also need to be set; they are described in generic.md. Bigtable sink specific configs are described in the Depot repository. You can check out the Bigtable Sink configs here.

    - + \ No newline at end of file diff --git a/sinks/blob-sink/index.html b/sinks/blob-sink/index.html index 57b57a542..b1c3b6883 100644 --- a/sinks/blob-sink/index.html +++ b/sinks/blob-sink/index.html @@ -8,7 +8,7 @@ Blob | Firehose - + @@ -17,7 +17,7 @@ When metadata column name is configured, all metadata column/field will be added as child field under the configured column name.

    • Example value: kafka_metadata
    • Type: optional

    SINK_BLOB_LOCAL_FILE_WRITER_PARQUET_BLOCK_SIZE

    Defines the block size of the Parquet writer. This config only applies to the Parquet writer. It only needs to be set manually when the user needs to control the block size for optimal file reads.

    • Example value: 134217728
    • Type: optional

    SINK_BLOB_LOCAL_FILE_WRITER_PARQUET_PAGE_SIZE

    Defines the page size of the Parquet writer. This config only applies to the Parquet writer. It only needs to be set manually when the user needs to control the page size for optimal file reads.

    • Example value: 1048576
    • Type: optional

    SINK_BLOB_LOCAL_FILE_ROTATION_DURATION_MS

    Defines the maximum duration, in milliseconds, for which records are added to a single Parquet file. Once the elapsed time exceeds the configured duration, the current file is closed, a new file is created, and incoming records are written to the new file.

    • Example value: 1800000
    • Type: optional
    • Default value: 3600000

    SINK_BLOB_LOCAL_FILE_ROTATION_MAX_SIZE_BYTES

    Defines the maximum size, in bytes, of records written to a single Parquet file. Once the limit is reached, new records are written to a new file.

    • Example value: 3600000
    • Type: required
    • Default value: 268435456
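Taken together, the duration and size limits form a simple rotation rule: a file is rotated as soon as either limit is exceeded. The sketch below expresses that rule; RotationPolicySketch is a hypothetical name, and the exact boundary handling (strictly greater than) is an assumption.

```java
// Hypothetical sketch of the local file rotation rule.
final class RotationPolicySketch {
    // rotationDurationMs mirrors SINK_BLOB_LOCAL_FILE_ROTATION_DURATION_MS,
    // maxSizeBytes mirrors SINK_BLOB_LOCAL_FILE_ROTATION_MAX_SIZE_BYTES.
    static boolean shouldRotate(long fileAgeMs, long fileSizeBytes,
                                long rotationDurationMs, long maxSizeBytes) {
        boolean tooOld = fileAgeMs > rotationDurationMs;
        boolean tooLarge = fileSizeBytes > maxSizeBytes;
        return tooOld || tooLarge;
    }
}
```

With the default values of 3600000 ms and 268435456 bytes, a file that is one minute old and 10 MB in size stays open, while a file older than one hour is closed regardless of its size.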

    SINK_BLOB_FILE_PARTITION_PROTO_TIMESTAMP_FIELD_NAME

    Defines the field used as file partitioning.

    • Example value: event_timestamp
    • Type: required

    SINK_BLOB_FILE_PARTITION_TIME_GRANULARITY_TYPE

    Defines the time partitioning type of the files. Currently, the supported partitioning types are hour and day. This also affects the partitioning path of the files.

    • Example value: hour
    • Type: required
    • Default value: hour

    SINK_BLOB_FILE_PARTITION_PROTO_TIMESTAMP_TIMEZONE

    Defines the time zone used for time partitioning. The date time partitioning uses the local date and time value calculated using the configured time zone.

    • Example value: UTC
    • Type: optional
    • Default value: UTC

    SINK_BLOB_FILE_PARTITION_TIME_HOUR_PREFIX

    Defines the time partitioning path format for the hour segment, for example ${date_segment}/hr=10/${filename}.

    • Example value: hr=
    • Type: optional
    • Default value: hr=

    SINK_BLOB_FILE_PARTITION_TIME_DATE_PREFIX

    Defines the time partitioning path format for the date segment, for example dt=2021-01-01/${hour_segment}/${filename}.

    • Example value: dt=
    • Type: optional
    • Default value: dt=
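The time zone, date prefix and hour prefix settings combine into a partition path. The sketch below builds such a path for hour granularity; PartitionPathSketch is a hypothetical name, and the two-digit hour padding and the omission of any leading path components are assumptions based on the examples above.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch: building a dt=.../hr=.../filename partition path.
final class PartitionPathSketch {
    static String path(Instant eventTimestamp, String timeZone,
                       String datePrefix, String hourPrefix, String filename) {
        // Convert the event timestamp to local date and time in the configured zone.
        ZonedDateTime local = eventTimestamp.atZone(ZoneId.of(timeZone));
        String date = DateTimeFormatter.ISO_LOCAL_DATE.format(local);
        String hour = String.format("%02d", local.getHour());
        return datePrefix + date + "/" + hourPrefix + hour + "/" + filename;
    }
}
```

The same event timestamp can land in a different partition when the time zone changes: 2021-01-01T20:15:00Z falls under dt=2021-01-01/hr=20 in UTC but under dt=2021-01-02/hr=03 in Asia/Jakarta.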

    SINK_BLOB_GCS_GOOGLE_CLOUD_PROJECT_ID

    The ID of the Google Cloud project where the Google Cloud Storage bucket is located. Further documentation on Google Cloud project ID.

    • Example value: project-007
    • Type: required

    SINK_BLOB_GCS_BUCKET_NAME

    The name of the Google Cloud Storage bucket. Here is further documentation on Google Cloud Storage bucket names.

    • Example value: pricing
    • Type: required

    SINK_BLOB_GCS_CREDENTIAL_PATH

    Full path of the Google Cloud credentials file. Here is further documentation on Google Cloud authentication and credentials.

    • Example value: /.secret/google-cloud-credentials.json
    • Type: required

    SINK_BLOB_GCS_RETRY_MAX_ATTEMPTS

    Number of retries of the Google Cloud Storage upload request when the request fails.

    • Example value: 10
    • Type: optional
    • Default value: 10

    SINK_BLOB_GCS_RETRY_TOTAL_TIMEOUT_MS

    Total duration of retries of the Google Cloud Storage upload, in milliseconds. The Google Cloud Storage client keeps retrying the upload operation until the elapsed time since the first retry exceeds the configured duration. SINK_BLOB_GCS_RETRY_MAX_ATTEMPTS and SINK_BLOB_GCS_RETRY_TOTAL_TIMEOUT_MS both apply at the same time: an exception is raised as soon as either limit is exceeded. Users should also be aware of the default values.

    • Example value: 60000
    • Type: optional
    • Default value: 120000

    SINK_BLOB_GCS_RETRY_INITIAL_DELAY_MS

    Initial delay for the first retry, in milliseconds. It is recommended to keep this config at its default value.

    • Example value: 500
    • Type: optional
    • Default value: 1000

    SINK_BLOB_GCS_RETRY_MAX_DELAY_MS

    Maximum delay for each retry, in milliseconds, as the delay is multiplied with each additional retry attempt. It is recommended to keep this config at its default value.

    • Example value: 15000
    • Type: optional
    • Default value: 30000

    SINK_BLOB_GCS_RETRY_DELAY_MULTIPLIER

    Multiplier of the retry delay. The delay for the subsequent operation is recalculated on each retry, so this config causes the retry delay to increase. When this config is set to 1, the delay stays constant. It is recommended to keep this config at its default value.

    • Example value: 1.5
    • Type: optional
    • Default value: 2
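The interaction between the initial delay, the delay multiplier and the maximum delay can be sketched as a capped exponential backoff. This is a simplified model under stated assumptions: RetryDelaySketch is a hypothetical name, and any randomisation the Google Cloud Storage client may apply to the delays is ignored.

```java
// Hypothetical sketch of capped exponential retry delays.
final class RetryDelaySketch {
    // Delay before retry number retryIndex (starting at 0), capped at maxDelayMs.
    static long delayMs(int retryIndex, long initialDelayMs, double multiplier, long maxDelayMs) {
        double delay = initialDelayMs * Math.pow(multiplier, retryIndex);
        return (long) Math.min(delay, (double) maxDelayMs);
    }
}
```

With the defaults listed above (initial delay 1000, multiplier 2, maximum delay 30000), the delays grow as 1000, 2000, 4000, 8000, 16000 and are then capped at 30000. With a multiplier of 1, every delay is 1000.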

    SINK_BLOB_GCS_RETRY_INITIAL_RPC_TIMEOUT_MS

    Initial timeout, in milliseconds, of an RPC call for the Google Cloud Storage client. It is recommended to keep this config at its default value.

    • Example value: 3000
    • Type: optional
    • Default value: 5000

    SINK_BLOB_GCS_RETRY_RPC_MAX_TIMEOUT_MS

    Maximum timeout, in milliseconds, of an RPC call for the Google Cloud Storage client. It is recommended to keep this config at its default value.

    • Example value: 10000
    • Type: optional
    • Default value: 5000

    SINK_BLOB_GCS_RETRY_RPC_TIMEOUT_MULTIPLIER

    Multiplier of the Google Cloud Storage client RPC call timeout. When this config is set to 1, the timeout is not multiplied and stays constant. It is recommended to keep this config at its default value.

    • Example value: 1
    • Type: optional
    • Default value: 1

    SINK_BLOB_S3_REGION

    Amazon S3 creates buckets in a Region that you specify.

    • Example value: ap-south-1
    • Type: required

    SINK_BLOB_S3_BUCKET_NAME

    The name of the Amazon S3 bucket. Here is further documentation on S3 bucket names.

    • Example value: sink_bucket
    • Type: required

    SINK_BLOB_S3_ACCESS_KEY

    Access key used to access the bucket. This key can also be set through the environment using the AWS_ACCESS_KEY_ID key, or by creating a credentials file in the ${HOME}/.aws/credentials folder. Here is further documentation on how to set it through the credentials file or environment variables.

    • Example value: AKIAIOSFODNN7EXAMPLE
    • Type: required

    SINK_BLOB_S3_SECRET_KEY

    Secret key used to access the bucket. This key can also be set through the environment using the AWS_SECRET_ACCESS_KEY key, or by creating a credentials file in the ${HOME}/.aws/credentials folder. Here is further documentation on how to set it through the credentials file or environment variables.

    • Example value: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
    • Type: required

    SINK_BLOB_S3_RETRY_MAX_ATTEMPTS

    Number of retries of the S3 upload request when the request fails.

    • Example value: 10
    • Type: optional
    • Default value: 10

    SINK_BLOB_S3_BASE_DELAY_MS

    Initial delay for the first retry, in milliseconds.

    • Example value: 1000
    • Type: optional
    • Default value: 1000

    SINK_BLOB_S3_MAX_BACKOFF_MS

    Maximum backoff time for a retry, in milliseconds.

    • Example value: 30000
    • Type: optional
    • Default value: 30000

    SINK_BLOB_S3_API_ATTEMPT_TIMEOUT_MS

    The amount of time, in milliseconds, to wait for the HTTP request to complete before giving up and timing out.

    • Example value: 10000
    • Type: optional
    • Default value: 10000

    SINK_BLOB_S3_API_TIMEOUT_MS

    The amount of time, in milliseconds, to allow the client to complete the execution of an API call. This timeout covers the entire client execution except for marshalling.

    • Example value: 40000
    • Type: optional
    • Default value: 40000
    - + \ No newline at end of file diff --git a/sinks/elasticsearch-sink/index.html b/sinks/elasticsearch-sink/index.html index 0e71331c3..e1f140a2d 100644 --- a/sinks/elasticsearch-sink/index.html +++ b/sinks/elasticsearch-sink/index.html @@ -8,13 +8,13 @@ Elasticsearch | Firehose - +

    Elasticsearch

    Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents.

    An ES sink Firehose (SINK_TYPE=elasticsearch) requires the following variables to be set along with generic ones.

    SINK_ES_CONNECTION_URLS

    Elasticsearch connection URL(s) to connect to. Multiple URLs can be given in a comma-separated format.

    • Example value: localhost1:9200
    • Type: required

    SINK_ES_INDEX_NAME

    The name of the index to which you want to write the documents. If it does not exist, it will be created.

    • Example value: sample_index
    • Type: required

    SINK_ES_TYPE_NAME

    Defines the type name of the Document in Elasticsearch.

    • Example value: Customer
    • Type: required

    SINK_ES_ID_FIELD

    The identifier field of the document in Elasticsearch. This should be the key of the field present in the message (JSON or Protobuf) and it has to be a unique, non-null field. So the value of this field in the message will be used as the ID of the document in Elasticsearch while writing the document.

    • Example value: customer_id
    • Type: required

    SINK_ES_MODE_UPDATE_ONLY_ENABLE

    Elasticsearch sink can be created in 2 modes: Upsert mode or UpdateOnly mode. If this config is set to:

    • TRUE: Firehose will run in UpdateOnly mode, which will only UPDATE the already existing documents in the Elasticsearch index.
    • FALSE: Firehose will run in Upsert mode, UPDATING the existing documents and also INSERTING any new ones.

    • Example value: TRUE
    • Type: required
    • Default value: FALSE
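
    The difference between the two modes can be sketched with a plain dictionary standing in for the index. This is only an illustration of the semantics described above; apply_document is a hypothetical helper, and how the sink reports a skipped document is not modelled.

```python
def apply_document(index: dict, doc_id: str, doc: dict, update_only: bool) -> bool:
    """Apply one document to an in-memory stand-in for an index.

    Returns True if the index changed, False if the document was skipped.
    """
    if doc_id in index:
        index[doc_id].update(doc)  # both modes update existing documents
        return True
    if update_only:
        return False  # UpdateOnly mode never inserts a new document
    index[doc_id] = dict(doc)  # Upsert mode inserts missing documents
    return True


if __name__ == "__main__":
    index = {"c1": {"name": "old"}}
    apply_document(index, "c1", {"name": "new"}, update_only=True)
    apply_document(index, "c2", {"name": "other"}, update_only=True)
    print(index)  # "c2" is absent because UpdateOnly mode skipped it
```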

    SINK_ES_INPUT_MESSAGE_TYPE

    Indicates if the Kafka topic contains JSON or Protocol Buffer messages.

    • Example value: PROTOBUF
    • Type: required
    • Default value: JSON

    SINK_ES_PRESERVE_PROTO_FIELD_NAMES_ENABLE

    Whether or not the protobuf field names should be preserved in the Elasticsearch document. If false, the fields will be converted to camel case.

    • Example value: FALSE
    • Type: required
    • Default value: TRUE

    SINK_ES_REQUEST_TIMEOUT_MS

    Defines the request timeout of the Elasticsearch endpoint. The value specified is in milliseconds.

    • Example value: 60000
    • Type: required
    • Default value: 60000

    SINK_ES_SHARDS_ACTIVE_WAIT_COUNT

    Defines the number of shard copies that must be active before proceeding with the operation. This can be set to any non-negative value less than or equal to the total number of shard copies (number of replicas + 1).

    • Example value: 1
    • Type: required
    • Default value: 1

    SINK_ES_RETRY_STATUS_CODE_BLACKLIST

    Comma-separated list of status codes for which Firehose should not retry when UpdateOnly mode is enabled (SINK_ES_MODE_UPDATE_ONLY_ENABLE=TRUE).

    • Example value: 404,400
    • Type: optional
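
    A minimal sketch of how such a blacklist might be consulted before a retry. Both helper names are hypothetical, and the sink's real retry logic may take other conditions into account.

```python
def parse_status_codes(raw: str) -> set[int]:
    """Turn a value such as '404,400' into a set of integer status codes."""
    return {int(code) for code in raw.split(",") if code.strip()}


def should_retry(status_code: int, blacklist: set[int]) -> bool:
    """A failed request is retried unless its status code is blacklisted."""
    return status_code not in blacklist


if __name__ == "__main__":
    blacklist = parse_status_codes("404,400")
    for code in (400, 404, 503):
        print(code, should_retry(code, blacklist))
```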

    SINK_ES_ROUTING_KEY_NAME

    Defines the proto field whose value will be used for routing documents to a particular shard in Elasticsearch. If empty, Elasticsearch uses the ID field of the doc by default.

    • Example value: service_type
    • Type: optional
    - + \ No newline at end of file diff --git a/sinks/grpc-sink/index.html b/sinks/grpc-sink/index.html index 3bd1a9920..70f0a0750 100644 --- a/sinks/grpc-sink/index.html +++ b/sinks/grpc-sink/index.html @@ -8,13 +8,13 @@ GRPC | Firehose - +
    -

    GRPC

    gRPC is a modern open source high performance Remote Procedure Call framework that can run in any environment.

    A GRPC sink Firehose (SINK_TYPE=grpc) requires the following variables to be set along with Generic ones

    SINK_GRPC_SERVICE_HOST

    Defines the host of the GRPC service.

    • Example value: http://grpc-service.sample.io
    • Type: required

    SINK_GRPC_SERVICE_PORT

    Defines the port of the GRPC service.

    • Example value: 8500
    • Type: required

    SINK_GRPC_METHOD_URL

    Defines the URL of the GRPC method that needs to be called.

    • Example value: com.tests.SampleServer/SomeMethod
    • Type: required

    SINK_GRPC_RESPONSE_SCHEMA_PROTO_CLASS

    Defines the Proto which would be the response of the GRPC Method.

    • Example value: com.tests.SampleGrpcResponse
    • Type: required

    SINK_GRPC_ARG_KEEPALIVE_TIME_MS

    The keepalive ping is a way to check if a channel is currently working by sending HTTP2 pings over the transport. It is sent periodically, and if the ping is not acknowledged by the peer within a certain timeout period, the transport is disconnected. Other keepalive configurations are described here.

    Defines the period (in milliseconds) after which a keepalive ping is sent on the transport. If smaller than 10000, 10000 will be used instead.

    • Example value: 60000
    • Type: optional
    • Default value: infinite

    SINK_GRPC_ARG_KEEPALIVE_TIMEOUT_MS

    Defines the amount of time (in milliseconds) the sender of the keepalive ping waits for an acknowledgement. If it does not receive an acknowledgment within this time, it will close the connection.

    • Example value: 5000
    • Type: optional
    • Default value: 20000

    SINK_GRPC_ARG_DEADLINE_MS

    Defines the amount of time (in milliseconds) gRPC clients are willing to wait for an RPC to complete before the RPC is terminated with the error DEADLINE_EXCEEDED

    • Example value: 1000
    • Type: optional
    - +

    GRPC

    gRPC is a modern open source high performance Remote Procedure Call framework that can run in any environment.

    A GRPC sink Firehose (SINK_TYPE=grpc) requires the following variables to be set along with Generic ones

    SINK_GRPC_SERVICE_HOST

    Defines the host of the GRPC service.

    • Example value: http://grpc-service.sample.io
    • Type: required

    SINK_GRPC_SERVICE_PORT

    Defines the port of the GRPC service.

    • Example value: 8500
    • Type: required

    SINK_GRPC_METHOD_URL

    Defines the URL of the GRPC method that needs to be called.

    • Example value: com.tests.SampleServer/SomeMethod
    • Type: required

    SINK_GRPC_METADATA

    Defines the GRPC additional static Metadata that allows clients to provide information to server that is associated with an RPC.

    Note: the final metadata is generated by merging the static metadata with the Kafka record headers.

    • Example value: authorization:token,dlq:true
    • Type: optional
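
    The merge described in the note can be pictured as follows. The helper names are hypothetical, and the precedence used here (a record header overrides a static entry with the same key) is an assumption, since the text does not say which side wins on a conflict.

```python
def parse_static_metadata(raw: str) -> dict[str, str]:
    """Parse 'key:value,key:value' into a dictionary."""
    metadata = {}
    for pair in raw.split(","):
        if not pair.strip():
            continue
        key, sep, value = pair.partition(":")
        if not sep:
            raise ValueError(f"expected key:value, got {pair!r}")
        metadata[key.strip()] = value.strip()
    return metadata


def merge_metadata(static: dict[str, str], record_headers: dict[str, str]) -> dict[str, str]:
    """Combine static metadata with the headers of one Kafka record."""
    merged = dict(static)
    merged.update(record_headers)  # ASSUMPTION: record headers win on conflict
    return merged


if __name__ == "__main__":
    static = parse_static_metadata("authorization:token,dlq:true")
    print(merge_metadata(static, {"trace-id": "abc"}))
```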

    SINK_GRPC_RESPONSE_SCHEMA_PROTO_CLASS

    Defines the Proto which would be the response of the GRPC Method.

    • Example value: com.tests.SampleGrpcResponse
    • Type: required

    SINK_GRPC_ARG_KEEPALIVE_TIME_MS

    The keepalive ping is a way to check if a channel is currently working by sending HTTP2 pings over the transport. It is sent periodically, and if the ping is not acknowledged by the peer within a certain timeout period, the transport is disconnected. Other keepalive configurations are described here.

    Defines the period (in milliseconds) after which a keepalive ping is sent on the transport. If smaller than 10000, 10000 will be used instead.

    • Example value: 60000
    • Type: optional
    • Default value: infinite
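
    The lower bound on the keepalive period amounts to a simple clamp, sketched below. The constant and function names are illustrative only.

```python
MIN_KEEPALIVE_TIME_MS = 10_000  # floor stated in the description above


def effective_keepalive_time_ms(configured_ms: int) -> int:
    """Return the keepalive period that is actually used."""
    return max(configured_ms, MIN_KEEPALIVE_TIME_MS)


if __name__ == "__main__":
    print(effective_keepalive_time_ms(5_000))   # below the floor
    print(effective_keepalive_time_ms(60_000))  # above the floor
```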

    SINK_GRPC_ARG_KEEPALIVE_TIMEOUT_MS

    Defines the amount of time (in milliseconds) the sender of the keepalive ping waits for an acknowledgement. If it does not receive an acknowledgment within this time, it will close the connection.

    • Example value: 5000
    • Type: optional
    • Default value: 20000

    SINK_GRPC_ARG_DEADLINE_MS

    Defines the amount of time (in milliseconds) gRPC clients are willing to wait for an RPC to complete before the RPC is terminated with the error DEADLINE_EXCEEDED

    • Example value: 1000
    • Type: optional
    + \ No newline at end of file diff --git a/sinks/http-sink/index.html b/sinks/http-sink/index.html index b465b2c49..c3b36e49c 100644 --- a/sinks/http-sink/index.html +++ b/sinks/http-sink/index.html @@ -8,14 +8,14 @@ HTTP | Firehose - +

    HTTP

    REST stands for Representational State Transfer and is an architectural pattern for creating web services.

    An Http sink Firehose (SINK_TYPE=http) requires the following variables to be set along with Generic ones.

    SINK_HTTP_SERVICE_URL

    The HTTP endpoint of the service to which this consumer should PUT/POST/PATCH/DELETE data. The URL can be constant or dynamic (a template that extracts the given field values from each message and uses them as the endpoint). If the service URL is constant, messages are sent in batches; if it is dynamic, each message is sent as a separate request, since the messages may have different endpoints.

    • Example value: http://http-service.test.io
    • Example value: http://http-service.test.io/test-field/%%s,6 This will take the value with index 6 from proto and create the endpoint as per the template
    • Type: required

    SINK_HTTP_REQUEST_METHOD

    Defines the HTTP verb supported by the endpoint. Supports the PUT, POST, PATCH and DELETE verbs as of now.

    • Example value: post
    • Type: required
    • Default value: put

    SINK_HTTP_REQUEST_TIMEOUT_MS

    Defines the connection timeout for the request in milliseconds.

    • Example value: 10000
    • Type: required
    • Default value: 10000

    SINK_HTTP_MAX_CONNECTIONS

    Defines the maximum number of HTTP connections.

    • Example value: 10
    • Type: required
    • Default value: 10

    SINK_HTTP_RETRY_STATUS_CODE_RANGES

    Defines the range of HTTP status codes for which a retry will be attempted. When using HTTP DELETE, remove 404 from the retry code range; otherwise Firehose might keep retrying the deletion of resources that are already deleted.

    • Example value: 400-600
    • Type: optional
    • Default value: 400-600
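
    A small sketch of how a status code could be checked against such a range. The helper names are hypothetical; treating both bounds as inclusive and allowing several comma-separated ranges are assumptions. The second value in the usage shows one way to leave 404 out for DELETE.

```python
def parse_ranges(raw: str) -> list[range]:
    """Parse '400-403,405-600' into a list of ranges."""
    ranges = []
    for part in raw.split(","):
        part = part.strip()
        if not part:
            continue
        low, sep, high = part.partition("-")
        if not sep:
            high = low  # a single code such as '503'
        # ASSUMPTION: both bounds are inclusive
        ranges.append(range(int(low), int(high) + 1))
    return ranges


def is_retryable(status_code: int, ranges: list[range]) -> bool:
    """True if the status code falls inside any configured range."""
    return any(status_code in r for r in ranges)


if __name__ == "__main__":
    default = parse_ranges("400-600")
    for_delete = parse_ranges("400-403,405-600")
    print(is_retryable(404, default), is_retryable(404, for_delete))
```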

    SINK_HTTP_DATA_FORMAT

    If set to proto, the log message will be sent as Protobuf byte strings. Otherwise, the log message will be deserialized into readable JSON strings.

    • Example value: JSON
    • Type: required
    • Default value: proto

    SINK_HTTP_HEADERS

    Defines the HTTP headers required to push the data to the above URL.

    • Example value: Authorization:auth_token, Accept:text/plain
    • Type: optional

    SINK_HTTP_JSON_BODY_TEMPLATE

    Defines a template for creating a custom request body from the fields of a protobuf message. The template itself must be valid JSON.

    • Example value: {"test":"$.routes[0]", "$.order_number" : "xxx"}
    • Type: optional

    SINK_HTTP_PARAMETER_SOURCE

    Defines the source from which the fields should be parsed. This field should be present in order to use this feature.

    • Example value: Key
    • Example value: Message
    • Type: optional
    • Default value: None

    SINK_HTTP_PARAMETER_PLACEMENT

    Defines whether the parsed fields are passed as query parameters or as headers.

    • Example value: Header
    • Example value: Query
    • Type: optional

    SINK_HTTP_PARAMETER_SCHEMA_PROTO_CLASS

    Defines the fully qualified name of the proto class which is to be used for parametrised http sink.

    • Example value: com.tests.TestMessage
    • Type: optional

    INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING

    Defines the mapping of the proto fields to header/query fields in JSON format.

    • Example value: {"1":"order_number","2":"event_timestamp","3":"driver_id"}
    • Type: optional

    SINK_HTTP_OAUTH2_ENABLE

    Enable/Disable OAuth2 support for HTTP sink.

    • Example value: true
    • Type: optional
    • Default value: false

    SINK_HTTP_OAUTH2_ACCESS_TOKEN_URL

    Defines the OAuth2 Token Endpoint.

    • Example value: https://sample-oauth.my-api.com/oauth2/token
    • Type: optional

    SINK_HTTP_OAUTH2_CLIENT_NAME

    Defines the OAuth2 identifier issued to the client.

    • Example value: client-name
    • Type: optional

    SINK_HTTP_OAUTH2_CLIENT_SECRET

    Defines the OAuth2 secret issued for the client.

    • Example value: client-secret
    • Type: optional

    SINK_HTTP_OAUTH2_SCOPE

    Space-delimited scope overrides. If scope override is not provided, no scopes will be granted to the token.

    • Example value: User:read, sys:info
    • Type: optional

    SINK_HTTP_DELETE_BODY_ENABLE

    If set to true, this config allows a body for the HTTP DELETE method; otherwise no payload will be sent with the DELETE request.

    • Example value: false
    • Type: optional
    • Default value: true

    SINK_HTTP_SIMPLE_DATE_FORMAT_ENABLE

    If set to true, this config enables the simple date format (e.g. May 3, 2023 11:59:37 AM) for timestamps in a JSON payload, at both the root and nested levels. If set to false, the timestamps are sent in the ISO format (e.g. 2023-05-03T11:59:36.965Z). Note: This config is only applicable when the SINK_HTTP_JSON_BODY_TEMPLATE config is empty or not supplied.

    • Example value: false
    • Type: optional
    • Default value: true
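
    To make the two output shapes concrete, the sketch below renders one timestamp both ways. It only mirrors the example strings given above; the function names are hypothetical, and the time zone the sink uses for the simple format is not stated, so the timestamp is rendered as given.

```python
from datetime import datetime, timezone

MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def simple_date_format(ts: datetime) -> str:
    """Render like 'May 3, 2023 11:59:37 AM'."""
    hour12 = ts.hour % 12 or 12
    meridiem = "AM" if ts.hour < 12 else "PM"
    return (f"{MONTHS[ts.month - 1]} {ts.day}, {ts.year} "
            f"{hour12}:{ts.minute:02d}:{ts.second:02d} {meridiem}")


def iso_format(ts: datetime) -> str:
    """Render like '2023-05-03T11:59:36.965Z' (UTC, millisecond precision)."""
    utc = ts.astimezone(timezone.utc)
    return utc.strftime("%Y-%m-%dT%H:%M:%S.") + f"{utc.microsecond // 1000:03d}Z"


if __name__ == "__main__":
    ts = datetime(2023, 5, 3, 11, 59, 36, 965000, tzinfo=timezone.utc)
    print(simple_date_format(ts))
    print(iso_format(ts))
```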
    - + \ No newline at end of file diff --git a/sinks/influxdb-sink/index.html b/sinks/influxdb-sink/index.html index 5fb43c970..4c9fc8526 100644 --- a/sinks/influxdb-sink/index.html +++ b/sinks/influxdb-sink/index.html @@ -8,13 +8,13 @@ InfluxDB | Firehose - +

    InfluxDB

    InfluxDB is an open-source time series database developed by the company InfluxData.

    An Influx sink Firehose (SINK_TYPE=influxdb) requires the following variables to be set along with Generic ones

    SINK_INFLUX_URL

    Defines the InfluxDB URL, usually the hostname followed by the port.

    • Example value: http://localhost:8086
    • Type: required

    SINK_INFLUX_USERNAME

    Defines the username to connect to DB.

    • Example value: root
    • Type: required

    SINK_INFLUX_PASSWORD

    Defines the password to connect to DB.

    • Example value: root
    • Type: required

    SINK_INFLUX_FIELD_NAME_PROTO_INDEX_MAPPING

    Defines the mapping of fields and the corresponding proto index which can be used to extract the field value from the proto message. This is a JSON field. Note that Influx keeps a single value for each unique set of tags and timestamps. If a new value comes with the same tag and timestamp from the source, it will override the existing one.

    • Example value: "2":"order_number","1":"service_type", "4":"status"
      • Proto field value with index 2 will be stored in a field named 'order_number' in Influx and so on
    • Type: required

    SINK_INFLUX_TAG_NAME_PROTO_INDEX_MAPPING

    Defines the mapping of tags and the corresponding proto index from which the value for the tags can be obtained. If a tag refers to a field that is already present in the field name mapping, the field is not overridden; it is duplicated as a tag. If ENUMS are present then they must be added here. This is a JSON field.

    • Example value: {"6":"customer_id"}
    • Type: optional

    SINK_INFLUX_PROTO_EVENT_TIMESTAMP_INDEX

    Defines the proto index of a field that can be used as the timestamp.

    • Example value: 5
    • Type: required

    SINK_INFLUX_DB_NAME

    Defines the InfluxDB database name where data will be dumped.

    • Example value: status
    • Type: required

    SINK_INFLUX_RETENTION_POLICY

    Defines the retention policy for influx database.

    • Example value: quarterly
    • Type: required
    • Default value: autogen

    SINK_INFLUX_MEASUREMENT_NAME

    Defines the name of the measurement that the sink writes to. A measurement is analogous to a table, and it is created automatically if it does not exist at the time Firehose pushes the data to InfluxDB.

    • Example value: customer-booking
    • Type: required
    - + \ No newline at end of file diff --git a/sinks/jdbc-sink/index.html b/sinks/jdbc-sink/index.html index 227ec7f62..7657933e3 100644 --- a/sinks/jdbc-sink/index.html +++ b/sinks/jdbc-sink/index.html @@ -8,13 +8,13 @@ JDBC | Firehose - +

    JDBC

    A JDBC sink Firehose (SINK_TYPE=jdbc) requires the following variables to be set along with Generic ones

    SINK_JDBC_URL

    Defines the Postgres database URL, usually the hostname followed by the port.

    • Example value: jdbc:postgresql://localhost:5432/postgres
    • Type: required

    SINK_JDBC_TABLE_NAME

    Defines the name of the table in which the data should be dumped.

    • Example value: public.customers
    • Type: required

    SINK_JDBC_USERNAME

    Defines the username to connect to DB.

    • Example value: root
    • Type: required

    SINK_JDBC_PASSWORD

    Defines the password to connect to DB.

    • Example value: root
    • Type: required

    INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING

    Defines the mapping of fields in DB and the corresponding proto index from where the value will be extracted. This is a JSON field.

    • Example value: {"6":"customer_id","1":"service_type","5":"event_timestamp"} Proto field value with index 1 will be stored in a column named service_type in DB and so on
    • Type: required
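
    The mapping can be read as "proto field index to column name". A minimal sketch of applying it follows, where fields_by_index is a hypothetical stand-in for a decoded protobuf message (index to value) and the function name is illustrative.

```python
import json


def map_fields_to_columns(mapping_json: str, fields_by_index: dict) -> dict:
    """Build a {column_name: value} row from a proto-index mapping."""
    mapping = json.loads(mapping_json)  # e.g. {"6": "customer_id", ...}
    return {column: fields_by_index[int(index)]
            for index, column in mapping.items()}


if __name__ == "__main__":
    mapping = '{"6":"customer_id","1":"service_type","5":"event_timestamp"}'
    message = {1: "GO_RIDE", 5: "2023-05-03T11:59:36Z", 6: "c-42"}
    print(map_fields_to_columns(mapping, message))
```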

    SINK_JDBC_UNIQUE_KEYS

    Defines a comma-separated list of column names that have a unique constraint on the table.

    • Example value: customer_id
    • Type: optional

    SINK_JDBC_CONNECTION_POOL_TIMEOUT_MS

    Defines a database connection timeout in milliseconds.

    • Example value: 1000
    • Type: required
    • Default value: 1000

    SINK_JDBC_CONNECTION_POOL_IDLE_TIMEOUT_MS

    Defines a database connection pool idle connection timeout in milliseconds.

    • Example value: 60000
    • Type: required
    • Default value: 60000

    SINK_JDBC_CONNECTION_POOL_MIN_IDLE

    Defines the minimum number of idle connections in the pool to maintain.

    • Example value: 0
    • Type: required
    • Default value: 0

    SINK_JDBC_CONNECTION_POOL_MAX_SIZE

    Defines the maximum size for the database connection pool.

    • Example value: 10
    • Type: required
    • Default value: 10
    - + \ No newline at end of file diff --git a/sinks/mongo-sink/index.html b/sinks/mongo-sink/index.html index 307c9813a..b9bc67cbd 100644 --- a/sinks/mongo-sink/index.html +++ b/sinks/mongo-sink/index.html @@ -8,13 +8,13 @@ MongoDB | Firehose - +

    MongoDB

    A MongoDB sink Firehose (SINK_TYPE=mongodb) requires the following variables to be set along with Generic ones

    SINK_MONGO_CONNECTION_URLS

    MongoDB connection URL(s) to connect to. Multiple URLs can be given as a comma-separated list.

    • Example value: localhost:27017
    • Type: required

    SINK_MONGO_DB_NAME

    The name of the Mongo database to which you want to write the documents. If it does not exist, it will be created.

    • Example value: sample_DB
    • Type: required

    SINK_MONGO_AUTH_ENABLE

    This field should be set to true if login authentication is enabled for the MongoDB Server.

    • Example value: true
    • Type: optional
    • Default value: false

    SINK_MONGO_AUTH_USERNAME

    The login username for session authentication to the MongoDB Server. This is a required field if SINK_MONGO_AUTH_ENABLE is set to true

    • Example value: bruce_wills
    • Type: optional

    SINK_MONGO_AUTH_PASSWORD

    The login password for session authentication to the MongoDB Server. This is a required field if SINK_MONGO_AUTH_ENABLE is set to true

    • Example value: password@123
    • Type: optional

    SINK_MONGO_AUTH_DB

    The name of the Mongo authentication database in which the user credentials are stored. This is a required field if SINK_MONGO_AUTH_ENABLE is set to true

    • Example value: sample_auth_DB
    • Type: optional

    SINK_MONGO_COLLECTION_NAME

    Defines the name of the Mongo Collection

    • Example value: customerCollection
    • Type: required

    SINK_MONGO_PRIMARY_KEY

    The identifier field of the document in MongoDB. This should be the key of a field present in the message (JSON or Protobuf) and it has to be a unique, non-null field. So the value of this field in the message will be copied to the _id field of the document in MongoDB while writing the document.

    Note - If this parameter is not specified in Upsert mode (i.e. when the variable SINK_MONGO_MODE_UPDATE_ONLY_ENABLE=false), then the Mongo server will assign the default UUID to the _id field, and only insert operations can be performed.

    Note - This variable is a required field in the case of Update-Only mode (i.e. when the variable SINK_MONGO_MODE_UPDATE_ONLY_ENABLE=true). Also, all externally-fed documents must have this key copied to the _id field, for update operations to execute normally.

    • Example value: customer_id
    • Type: optional

    SINK_MONGO_MODE_UPDATE_ONLY_ENABLE

    MongoDB sink can be created in 2 modes: Upsert mode or UpdateOnly mode. If this config is set to:

    • TRUE: Firehose will run in UpdateOnly mode, which will only UPDATE the already existing documents in the MongoDB collection.
    • FALSE: Firehose will run in Upsert mode, UPDATING the existing documents and also INSERTING any new ones.

    • Example value: TRUE
    • Type: required
    • Default value: FALSE

    SINK_MONGO_INPUT_MESSAGE_TYPE

    Indicates if the Kafka topic contains JSON or Protocol Buffer messages.

    • Example value: PROTOBUF
    • Type: required
    • Default value: JSON

    SINK_MONGO_CONNECT_TIMEOUT_MS

    Defines the connect timeout of the MongoDB endpoint. The value specified is in milliseconds.

    • Example value: 60000
    • Type: required
    • Default value: 60000

    SINK_MONGO_RETRY_STATUS_CODE_BLACKLIST

    Comma-separated list of status codes for which Firehose should not retry when UpdateOnly mode is enabled (SINK_MONGO_MODE_UPDATE_ONLY_ENABLE=TRUE).

    • Example value: 16608,11000
    • Type: optional

    SINK_MONGO_PRESERVE_PROTO_FIELD_NAMES_ENABLE

    Whether or not the protobuf field names should be preserved in the MongoDB document. If false, the fields will be converted to camel case.

    • Example value: false
    • Type: optional
    • Default value: true

    SINK_MONGO_SERVER_SELECT_TIMEOUT_MS

    Sets the server selection timeout in milliseconds, which defines how long the driver will wait for server selection to succeed before throwing an exception. A value of 0 means that it will timeout immediately if no server is available. A negative value means to wait indefinitely.

    • Example value: 4000
    • Type: optional
    • Default value: 30000
    - + \ No newline at end of file diff --git a/sinks/prometheus-sink/index.html b/sinks/prometheus-sink/index.html index 28ce963e8..866ad4d7a 100644 --- a/sinks/prometheus-sink/index.html +++ b/sinks/prometheus-sink/index.html @@ -8,13 +8,13 @@ Prometheus | Firehose - +

    Prometheus

    A Prometheus sink Firehose (SINK_TYPE=prometheus) requires the following variables to be set along with Generic ones.

    SINK_PROM_SERVICE_URL

    Defines the HTTP/Cortex endpoint of the service to which this consumer should POST data.

    • Example value: http://localhost:9009/api/prom/push
    • Type: required

    SINK_PROM_REQUEST_TIMEOUT_MS

    Defines the connection timeout for the request in milliseconds.

    • Example value: 10000
    • Type: required
    • Default value: 10000

    SINK_PROM_MAX_CONNECTIONS

    Defines the maximum number of HTTP connections with Prometheus.

    • Example value: 10
    • Type: optional
    • Default value: no more than 2 concurrent connections per given route and no more than 20 connections in total

    SINK_PROM_RETRY_STATUS_CODE_RANGES

    Defines the range of HTTP status codes for which retry will be attempted.

    • Example value: 400-600
    • Type: optional
    • Default value: 400-600

    SINK_PROM_REQUEST_LOG_STATUS_CODE_RANGES

    Defines the range of HTTP status codes for which the request will be logged.

    • Example value: 400-499
    • Type: optional
    • Default value: 400-499

    SINK_PROM_HEADERS

    Defines the HTTP headers required to push the data to the above URL.

    • Example value: Authorization:auth_token, Accept:text/plain
    • Type: optional

    SINK_PROM_METRIC_NAME_PROTO_INDEX_MAPPING

    The mapping of fields and the corresponding proto index which will be set as the metric name on Cortex. This is a JSON field.

    • Example value: {"2":"tip_amount","1":"feedback_ratings"}
      • Proto field value with index 2 will be stored as metric named tip_amount in Cortex and so on
    • Type: required

    SINK_PROM_LABEL_NAME_PROTO_INDEX_MAPPING

    The mapping of proto fields to metric labels. This is a JSON field. Each metric defined in SINK_PROM_METRIC_NAME_PROTO_INDEX_MAPPING will have all the labels defined here.

    • Example value: {"6":"customer_id"}
    • Type: optional

    SINK_PROM_WITH_EVENT_TIMESTAMP

    If set to true, the metric timestamp will use the event timestamp; otherwise it will use the time at which Firehose pushes to the endpoint.

    • Example value: false
    • Type: optional
    • Default value: false

    SINK_PROM_PROTO_EVENT_TIMESTAMP_INDEX

    Defines the proto index of a field that can be used as the timestamp.

    • Example value: 2
    • Type: required (if SINK_PROM_WITH_EVENT_TIMESTAMP=true)
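
    The interaction between SINK_PROM_WITH_EVENT_TIMESTAMP and SINK_PROM_PROTO_EVENT_TIMESTAMP_INDEX can be sketched as a single choice. The function, its parameters and the millisecond unit are assumptions made for illustration.

```python
import time
from typing import Optional


def metric_timestamp_ms(with_event_timestamp: bool,
                        event_timestamp_ms: Optional[int],
                        now_ms: Optional[int] = None) -> int:
    """Pick the timestamp attached to a metric sample."""
    if with_event_timestamp:
        if event_timestamp_ms is None:
            # mirrors "required (if SINK_PROM_WITH_EVENT_TIMESTAMP=true)"
            raise ValueError("an event timestamp field is required")
        return event_timestamp_ms
    # otherwise use the time of the push (injectable for testing)
    return now_ms if now_ms is not None else int(time.time() * 1000)


if __name__ == "__main__":
    print(metric_timestamp_ms(True, 1683115176000))
    print(metric_timestamp_ms(False, 1683115176000))
```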
    - + \ No newline at end of file diff --git a/sinks/redis-sink/index.html b/sinks/redis-sink/index.html index 03ec85ee5..fc7847280 100644 --- a/sinks/redis-sink/index.html +++ b/sinks/redis-sink/index.html @@ -8,13 +8,13 @@ Redis Sink | Firehose - +

    Redis Sink

    Redis Sink is implemented in Firehose using the Redis sink connector implementation in Depot. You can check out Depot Github repository here.

    Data Types

    Redis sink can be created in 3 different modes based on the value of SINK_REDIS_DATA_TYPE: HashSet, KeyValue or List

    • HashSet: For each message, an entry of the format key : field : value is generated and pushed to Redis. Field and value are generated on the basis of the config SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING
    • List: For each message, an entry of the format key : value is generated and pushed to Redis. Value is fetched for the proto field name provided in the config SINK_REDIS_LIST_DATA_FIELD_NAME
    • KeyValue: For each message, an entry of the format key : value is generated and pushed to Redis. Value is fetched for the proto field name provided in the config SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME

    The key is picked up from a field in the message itself.
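
    The three entry shapes can be pictured with the sketch below, which only builds the entries and does not talk to Redis. The function is hypothetical, the message is modelled as a plain dictionary keyed by proto field name, and the shape of field_mapping (proto field name to Redis field name) is an assumption about SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING.

```python
from typing import Optional


def build_entry(data_type: str, key: str, message: dict, *,
                data_field: str = "",
                field_mapping: Optional[dict] = None) -> tuple:
    """Return (kind, key, payload) for one message."""
    kind = data_type.lower()
    if kind == "hashset":
        # key : field : value, one pair per mapped proto field
        fields = {redis_field: message[proto_field]
                  for proto_field, redis_field in (field_mapping or {}).items()}
        return (kind, key, fields)
    if kind in ("list", "keyvalue"):
        # key : value, the value being read from the configured proto field
        return (kind, key, message[data_field])
    raise ValueError(f"unsupported data type: {data_type!r}")


if __name__ == "__main__":
    msg = {"order_number": "o-1", "status": "DONE"}
    print(build_entry("KeyValue", "o-1", msg, data_field="status"))
    print(build_entry("HashSet", "o-1", msg, field_mapping={"status": "order_status"}))
```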

    Limitation: Depot Redis sink only supports Key-Value, HashSet and List entries as of now.

    Configuration

    To use the Redis sink in Firehose, first set SINK_TYPE=redis. Some generic configs, which are common across the different sink types, also need to be set; these are described in generic.md. Redis sink specific configs are described in the Depot repository. You can check out the Redis Sink configs here

    Deployment Types

    Redis sink, as of now, supports two different deployment types: Standalone and Cluster. This can be configured with the Depot environment variable SINK_REDIS_DEPLOYMENT_TYPE.

    - + \ No newline at end of file diff --git a/support/index.html b/support/index.html index 00b02fbbb..a7717c2f4 100644 --- a/support/index.html +++ b/support/index.html @@ -8,7 +8,7 @@ Need help? | Firehose - + @@ -16,7 +16,7 @@

    Need help?

    Need a bit of help? We're here for you. Check out our current issues and GitHub discussions.

    Issues

    Have a general issue or bug that you've found? We'd love to hear about it in our GitHub issues. This can be feature requests too! Go to issues

    Discussions

    For help and questions about best practices, join our GitHub discussions. Browse and ask questions. Go to discussions

    - + \ No newline at end of file