Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding example file for BigTable #83

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<properties>
<repoName>SolaceProducts</repoName>

<beam.version>2.35.0</beam.version>
<beam.version>2.55.0</beam.version>
<jcsmp.version>10.13.0</jcsmp.version>
<slf4j.version>1.7.25</slf4j.version>
<pmd.version>6.37.0</pmd.version>
Expand Down
29 changes: 25 additions & 4 deletions solace-apache-beam-samples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
<description>Samples for the Apache Beam I/O Component for Solace PubSub+</description>

<properties>
<beam.version>2.35.0</beam.version>
<solace-beam.version>1.3.0-SNAPSHOT</solace-beam.version>
<beam.version>2.55.0</beam.version>
<solace-beam.version>1.2.0</solace-beam.version>

<maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
<maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
Expand Down Expand Up @@ -109,6 +109,18 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-hbase-beam</artifactId>
<version>2.12.0</version>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>2.37.0</version>
</dependency>

<dependency>
<groupId>com.solace.test.integration</groupId>
<artifactId>pubsubplus-testcontainer</artifactId>
Expand All @@ -133,6 +145,17 @@
<version>3.12.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-vendor-guava-26_0-jre</artifactId>
<version>0.1</version>
<scope>compile</scope>
</dependency>
</dependencies>

<profiles>
Expand All @@ -146,7 +169,6 @@
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
Expand All @@ -158,7 +180,6 @@
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package com.solace.connector.beam.examples;

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.solace.connector.beam.SolaceIO;
import com.solace.connector.beam.examples.common.SolaceTextRecord;
import com.solacesystems.jcsmp.JCSMPProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

/**
 * An example that binds to a Solace queue, consumes messages, and writes each message
 * payload as a new row to a Google Cloud Bigtable table.
 *
 * <p>The target Bigtable table must already exist and must contain a {@code stats}
 * column family, since rows are written into that family.
 *
 * <p>By default, the example runs with the {@code DirectRunner}. To run the pipeline on
 * Google Dataflow, specify:
 *
 * <pre>{@code
 * --runner=DataflowRunner
 * }</pre>
 */
public class SolaceBeamBigTable {

	private static final Logger LOG = LoggerFactory.getLogger(SolaceBeamBigTable.class);

	/** Column family that every row is written into; must exist on the target table. */
	private static final String COLUMN_FAMILY = "stats";

	/** Pipeline options: Solace connection parameters plus Bigtable target coordinates. */
	public interface Options extends DataflowPipelineOptions {

		@Description("IP and port of the client appliance. (e.g. -cip=192.168.160.101)")
		String getCip();

		void setCip(String value);

		@Description("VPN name")
		String getVpn();

		void setVpn(String value);

		@Description("Client username")
		String getCu();

		void setCu(String value);

		@Description("Client password (default '')")
		@Default.String("")
		String getCp();

		void setCp(String value);

		@Description("List of queues for subscribing")
		String getSql();

		void setSql(String value);

		@Description("Enable reading sender timestamp to determine freshness of data")
		@Default.Boolean(false)
		boolean getSts();

		void setSts(boolean value);

		@Description("Enable reading sender sequence number to determine duplication of data")
		@Default.Boolean(false)
		boolean getSmi();

		void setSmi(boolean value);

		@Description("The timeout in milliseconds while try to receive a messages from Solace broker")
		@Default.Integer(100)
		int getTimeout();

		void setTimeout(int timeoutInMillis);

		@Description("The Bigtable project ID, this can be different than your Dataflow project")
		@Default.String("bigtable-project")
		String getBigtableProjectId();

		void setBigtableProjectId(String bigtableProjectId);

		@Description("The Bigtable instance ID")
		@Default.String("bigtable-instance")
		String getBigtableInstanceId();

		void setBigtableInstanceId(String bigtableInstanceId);

		@Description("The Bigtable table ID in the instance.")
		@Default.String("bigtable-table")
		String getBigtableTableId();

		void setBigtableTableId(String bigtableTableId);
	}

	/**
	 * Builds and runs the pipeline: reads messages from the configured Solace queue(s),
	 * maps each payload to a Bigtable {@link Put} keyed by a random UUID, and writes the
	 * rows to the configured Bigtable table.
	 *
	 * @param options pipeline options carrying Solace and Bigtable settings
	 * @throws Exception if the pipeline fails and cancellation also fails
	 */
	private static void writeToBigTable(Options options) throws Exception {

		List<String> queues = Arrays.asList(options.getSql().split(","));

		/** Create pipeline **/
		Pipeline pipeline = Pipeline.create(options);

		/** Set Solace connection properties **/
		JCSMPProperties jcsmpProperties = new JCSMPProperties();
		jcsmpProperties.setProperty(JCSMPProperties.HOST, options.getCip());
		jcsmpProperties.setProperty(JCSMPProperties.VPN_NAME, options.getVpn());
		jcsmpProperties.setProperty(JCSMPProperties.USERNAME, options.getCu());
		jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, options.getCp());

		/** Create object for BigTable table configuration to be used later to run the pipeline **/
		CloudBigtableTableConfiguration bigtableTableConfig =
				new CloudBigtableTableConfiguration.Builder()
						.withProjectId(options.getBigtableProjectId())
						.withInstanceId(options.getBigtableInstanceId())
						.withTableId(options.getBigtableTableId())
						.build();

		/* The pipeline consists of three components:
		 * 1. Reading message from Solace queue
		 * 2. Doing any necessary transformation and creating a BigTable row
		 * 3. Writing the row to BigTable
		 */
		pipeline.apply(SolaceIO.read(jcsmpProperties, queues, SolaceTextRecord.getCoder(), SolaceTextRecord.getMapper())
				.withUseSenderTimestamp(options.getSts())
				.withAdvanceTimeoutInMillis(options.getTimeout()))
				.apply("Map to BigTable row",
						ParDo.of(
								new DoFn<SolaceTextRecord, Mutation>() {
									@ProcessElement
									public void processElement(ProcessContext c) {

										// Random row key: every message becomes a distinct row.
										String uniqueID = UUID.randomUUID().toString();

										Put row = new Put(Bytes.toBytes(uniqueID));

										/** Create row that will be written to BigTable **/
										// Qualifier is null (empty qualifier); the payload is stored
										// as the cell value under the COLUMN_FAMILY family.
										row.addColumn(
												Bytes.toBytes(COLUMN_FAMILY),
												null,
												c.element().getPayload().getBytes(StandardCharsets.UTF_8));
										c.output(row);
									}
								}))
				.apply("Write to BigTable",
						CloudBigtableIO.writeToTable(bigtableTableConfig));

		PipelineResult result = pipeline.run();

		try {
			result.waitUntilFinish();
		} catch (Exception exc) {
			// Log before cancelling so the failure cause is not silently discarded;
			// cancel() may itself throw, which propagates to the caller.
			LOG.error("Pipeline failed while waiting for completion; cancelling", exc);
			result.cancel();
		}

	}

	/**
	 * Entry point: parses and validates pipeline options, then runs the pipeline.
	 *
	 * @param args command-line arguments in Beam's {@code --name=value} form
	 */
	public static void main(String[] args) {
		Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(SolaceBeamBigTable.Options.class);

		try {
			writeToBigTable(options);
		} catch (Exception e) {
			// Use the class logger rather than printStackTrace() so the failure is
			// captured by the configured logging backend.
			LOG.error("Failed to run pipeline", e);
		}
	}
}
Loading