Skip to content

Commit

Permalink
Merge pull request apache#3542 from yardenbm/add_kafka_sample
Browse files Browse the repository at this point in the history
Add beam kafka consumer sample
  • Loading branch information
hansva authored Jan 29, 2024
2 parents 80d4a58 + 72aafad commit c772fec
Showing 1 changed file with 324 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,324 @@
<?xml version="1.0" encoding="UTF-8"?>
<pipeline>
<info>
<name>kafka-aggregation-pipeline</name>
<name_sync_with_filename>Y</name_sync_with_filename>
<description/>
<extended_description/>
<pipeline_version/>
<pipeline_type>Normal</pipeline_type>
<parameters>
</parameters>
<capture_transform_performance>N</capture_transform_performance>
<transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
<transform_performance_capturing_size_limit>100</transform_performance_capturing_size_limit>
<created_user>-</created_user>
<created_date>2024/01/11 09:02:53.092</created_date>
<modified_user>-</modified_user>
<modified_date>2024/01/11 09:02:53.092</modified_date>
</info>
<notepads>
<notepad>
<backgroundcolorblue>251</backgroundcolorblue>
<backgroundcolorgreen>232</backgroundcolorgreen>
<backgroundcolorred>201</backgroundcolorred>
<bordercolorblue>90</bordercolorblue>
<bordercolorgreen>58</bordercolorgreen>
<bordercolorred>14</bordercolorred>
<fontbold>N</fontbold>
<fontcolorblue>90</fontcolorblue>
<fontcolorgreen>58</fontcolorgreen>
<fontcolorred>14</fontcolorred>
<fontitalic>N</fontitalic>
<fontname>.AppleSystemUIFont</fontname>
<fontsize>13</fontsize>
<height>106</height>
<xloc>224</xloc>
<yloc>80</yloc>
<note>Before running this pipeline, make sure a kafka server is running with two topics: hop-in and hop-out.
Example run of producer:
# bin/kafka-console-producer.sh --topic hop-in --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"

produce messages into in-topic in json format (e.g. {"name":"toni"}), the pipeline counts the number of times each name is shown
</note>
<width>862</width>
</notepad>
</notepads>
<order>
<hop>
<from>Beam Kafka Consume</from>
<to>String to Rows</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>String to Rows</from>
<to>Beam Window</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Beam Window</from>
<to>Memory group by</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Memory group by</from>
<to>Rows to String</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Rows to String</from>
<to>Beam Kafka Produce</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Beam Kafka Consume</name>
<type>BeamKafkaConsume</type>
<description/>
<distribute>N</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<allow_commit_on_consumed>Y</allow_commit_on_consumed>
<bootstrap_servers>localhost:9092</bootstrap_servers>
<config_options>
<config_option>
<parameter>auto.offset.reset

</parameter>
<type>String</type>
<value>earliest</value>
</config_option>
</config_options>
<group_id>console-consumer-31887</group_id>
<key_field>key</key_field>
<message_field>message</message_field>
<message_type>String</message_type>
<restrict_to_committed>Y</restrict_to_committed>
<topics>hop-in</topics>
<use_create_time>N</use_create_time>
<use_log_append_time>N</use_log_append_time>
<use_processing_time>Y</use_processing_time>
<attributes/>
<GUI>
<xloc>224</xloc>
<yloc>288</yloc>
</GUI>
</transform>
<transform>
<name>Beam Kafka Produce</name>
<type>BeamKafkaProduce</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<bootstrap_servers>localhost:9092</bootstrap_servers>
<config_options>
</config_options>
<key_field>name</key_field>
<message_field>outputValue</message_field>
<topic>hop-out</topic>
<attributes/>
<GUI>
<xloc>1024</xloc>
<yloc>288</yloc>
</GUI>
</transform>
<transform>
<name>Beam Window</name>
<type>BeamWindow</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<allowed_lateness>0</allowed_lateness>
<discarding_fired_panes>N</discarding_fired_panes>
<duration>15</duration>
<end_window_field>endWindow</end_window_field>
<max_window_field>maxWindow</max_window_field>
<start_window_field>startWindow</start_window_field>
<trigger_type>None</trigger_type>
<window_type>Fixed</window_type>
<attributes/>
<GUI>
<xloc>544</xloc>
<yloc>288</yloc>
</GUI>
</transform>
<transform>
<name>String to Rows</name>
<type>JsonInput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<include>N</include>
<include_field/>
<rownum>Y</rownum>
<addresultfile>N</addresultfile>
<readurl>N</readurl>
<removeSourceField>N</removeSourceField>
<IsIgnoreEmptyFile>N</IsIgnoreEmptyFile>
<doNotFailIfNoFile>N</doNotFailIfNoFile>
<ignoreMissingPath>Y</ignoreMissingPath>
<defaultPathLeafToNull>Y</defaultPathLeafToNull>
<rownum_field>row</rownum_field>
<file>
<name/>
<filemask/>
<exclude_filemask/>
<file_required>N</file_required>
<include_subfolders>N</include_subfolders>
</file>
<fields>
<field>
<name>country</name>
<path>$.country</path>
<type>String</type>
<format/>
<currency/>
<decimal/>
<group/>
<length>-1</length>
<precision>-1</precision>
<trim_type>none</trim_type>
<repeat>N</repeat>
</field>
<field>
<name>id</name>
<path>$.id</path>
<type>String</type>
<format/>
<currency/>
<decimal/>
<group/>
<length>-1</length>
<precision>-1</precision>
<trim_type>none</trim_type>
<repeat>N</repeat>
</field>
<field>
<name>name</name>
<path>$.name</path>
<type>String</type>
<format/>
<currency/>
<decimal/>
<group/>
<length>-1</length>
<precision>-1</precision>
<trim_type>none</trim_type>
<repeat>N</repeat>
</field>
</fields>
<limit>0</limit>
<IsInFields>Y</IsInFields>
<IsAFile>N</IsAFile>
<valueField>message</valueField>
<shortFileFieldName/>
<pathFieldName/>
<hiddenFieldName/>
<lastModificationTimeFieldName/>
<uriNameFieldName/>
<rootUriNameFieldName/>
<extensionFieldName/>
<sizeFieldName/>
<attributes/>
<GUI>
<xloc>368</xloc>
<yloc>288</yloc>
</GUI>
</transform>
<transform>
<name>Rows to String</name>
<type>JsonOutput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<addToResult>N</addToResult>
<createParentFolder>N</createParentFolder>
<dateInFilename>N</dateInFilename>
<doNotOpenNewFileInit>Y</doNotOpenNewFileInit>
<encoding>UTF-8</encoding>
<extension>json</extension>
<fields>
<field>
<element>name</element>
<name>name</name>
</field>
<field>
<element>nameCount</element>
<name>nameCount</name>
</field>
</fields>
<fileAppended>N</fileAppended>
<fileAsCommand>N</fileAsCommand>
<fileName/>
<jsonBloc>data</jsonBloc>
<nrRowsInBloc>1</nrRowsInBloc>
<operation_type>outputvalue</operation_type>
<outputValue>outputValue</outputValue>
<partNrInFilename>N</partNrInFilename>
<specifyingFormat>N</specifyingFormat>
<timeInFilename>N</timeInFilename>
<transformNrInFilename>N</transformNrInFilename>
<attributes/>
<GUI>
<xloc>896</xloc>
<yloc>288</yloc>
</GUI>
</transform>
<transform>
<name>Memory group by</name>
<type>MemoryGroupBy</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<fields>
<field>
<aggregate>nameCount</aggregate>
<subject>name</subject>
<type>COUNT_ALL</type>
</field>
</fields>
<give_back_row>N</give_back_row>
<group>
<field>
<name>name</name>
</field>
</group>
<attributes/>
<GUI>
<xloc>672</xloc>
<yloc>288</yloc>
</GUI>
</transform>
<transform_error_handling>
</transform_error_handling>
<attributes/>
</pipeline>

0 comments on commit c772fec

Please sign in to comment.