Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Merge pull request #15 from srnagar/eh-iot-sample
Browse files Browse the repository at this point in the history
Update IoT Hub sample to use new Events Hubs API
  • Loading branch information
robinsh authored May 26, 2020
2 parents 6f7f0b6 + c0a460d commit 0ed62ca
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 100 deletions.
36 changes: 36 additions & 0 deletions iot-hub/Quickstarts/read-d2c-messages/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Notes

This sample demonstrates how to use the Microsoft Azure Event Hubs Client for Java to
read messages sent from a device by using the built-in event hubs that exists by default for
every Iot Hub instance.

## Get Event Hubs-compatible connection string

You can get the Event Hubs-compatible connection string to your IotHub instance via the Azure portal or
by using the Azure CLI.

If using the Azure portal, see [Built in endpoints for IotHub](https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-messages-read-builtin#read-from-the-built-in-endpoint) to get the Event Hubs-compatible
connection string and assign it to the constant `connectionString` in the sample. You can skip the Azure CLI
instructions in the sample after this.

If using the Azure CLI, you will need to run the following commands before running this sample to get
the details required to form the Event Hubs compatible connection string.

- `az iot hub show --query properties.eventHubEndpoints.events.endpoint --name {your IoT Hub name}`
- `az iot hub show --query properties.eventHubEndpoints.events.path --name {your IoT Hub name}`
- `az iot hub policy show --name service --query primaryKey --hub-name {your IoT Hub name}`

## Checkpointing

For an example that uses checkpointing, follow up this sample with the [sample that uses
Azure Storage Blob to create checkpoints](https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/samples/java/com/azure/messaging/eventhubs/checkpointstore/blob/EventProcessorBlobCheckpointStoreSample.java).

Note that this requires adding a new dependency in your `pom.xml` to use the [Azure Storage Blob checkpoint store](https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/README.md).

## WebSocket and proxy

If using WebSockets, configure the `EventHubClientBuilder` to use transport type `AmqpTransportType.AMQP_WEB_SOCKETS`.

If your application runs behind a proxy server, then, in addition to setting the transport type to
`AmqpTransportType.AMQP_WEB_SOCKETS`, you also need to configure the proxy options as shown in the `setupProxy` method in
[the sample](./src/main/java/com/microsoft/docs/iothub/samples/ReadDeviceToCloudMessages.java).
11 changes: 3 additions & 8 deletions iot-hub/Quickstarts/read-d2c-messages/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,9 @@
<name>Read device-to-cloud messages</name>
<dependencies>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-eph</artifactId>
<version>1.0.0</version>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.1.0</version>
</dependency>
</dependencies>
<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,114 +2,181 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

// This application uses the Microsoft Azure Event Hubs Client for Java
// For samples see: https://github.com/Azure/azure-event-hubs/tree/master/samples/Java
// For documenation see: https://docs.microsoft.com/azure/event-hubs/
// For samples see: https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/eventhubs/azure-messaging-eventhubs/src/samples
// For documentation see: https://docs.microsoft.com/azure/event-hubs/

package com.microsoft.docs.iothub.samples;

import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
import com.microsoft.azure.eventhubs.PartitionReceiver;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.nio.charset.Charset;
import java.net.URI;
import java.net.URISyntaxException;

import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.ProxyAuthenticationType;
import com.azure.core.amqp.ProxyOptions;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
import com.azure.messaging.eventhubs.models.EventPosition;
import java.net.InetSocketAddress;
import java.net.Proxy;

/**
* A sample demonstrating how to receive events from Event Hubs sent from an IoT Hub device.
*/
public class ReadDeviceToCloudMessages {

private static final String EH_COMPATIBLE_CONNECTION_STRING_FORMAT = "Endpoint=%s/;EntityPath=%s;"
+ "SharedAccessKeyName=%s;SharedAccessKey=%s";

// az iot hub show --query properties.eventHubEndpoints.events.endpoint --name {your IoT Hub name}
private static final String eventHubsCompatibleEndpoint = "{your Event Hubs compatible endpoint}";
private static final String EVENT_HUBS_COMPATIBLE_ENDPOINT = "{your Event Hubs compatible endpoint}";

// az iot hub show --query properties.eventHubEndpoints.events.path --name {your IoT Hub name}
private static final String eventHubsCompatiblePath = "{your Event Hubs compatible name}";
private static final String EVENT_HUBS_COMPATIBLE_PATH = "{your Event Hubs compatible name}";

// az iot hub policy show --name service --query primaryKey --hub-name {your IoT Hub name}
private static final String iotHubSasKey = "{your service primary key}";
private static final String iotHubSasKeyName = "service";

// Track all the PartitionReciever instances created.
private static ArrayList<PartitionReceiver> receivers = new ArrayList<PartitionReceiver>();

// Asynchronously create a PartitionReceiver for a partition and then start
// reading any messages sent from the simulated client.
private static void receiveMessages(EventHubClient ehClient, String partitionId)
throws EventHubException, ExecutionException, InterruptedException {

final ExecutorService executorService = Executors.newSingleThreadExecutor();

// Create the receiver using the default consumer group.
// For the purposes of this sample, read only messages sent since
// the time the receiver is created. Typically, you don't want to skip any messages.
ehClient.createReceiver(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, partitionId,
EventPosition.fromEnqueuedTime(Instant.now())).thenAcceptAsync(receiver -> {
System.out.println(String.format("Starting receive loop on partition: %s", partitionId));
System.out.println(String.format("Reading messages sent since: %s", Instant.now().toString()));

receivers.add(receiver);

while (true) {
try {
// Check for EventData - this methods times out if there is nothing to retrieve.
Iterable<EventData> receivedEvents = receiver.receiveSync(100);

// If there is data in the batch, process it.
if (receivedEvents != null) {
for (EventData receivedEvent : receivedEvents) {
System.out.println(String.format("Telemetry received:\n %s",
new String(receivedEvent.getBytes(), Charset.defaultCharset())));
System.out.println(String.format("Application properties (set by device):\n%s",receivedEvent.getProperties().toString()));
System.out.println(String.format("System properties (set by IoT Hub):\n%s\n",receivedEvent.getSystemProperties().toString()));
}
}
} catch (EventHubException e) {
System.out.println("Error reading EventData");
}
}
}, executorService);
}
private static final String IOT_HUB_SAS_KEY = "{your service primary key}";
private static final String IOT_HUB_SAS_KEY_NAME = "service";

public static void main(String[] args)
throws EventHubException, ExecutionException, InterruptedException, IOException, URISyntaxException {
/**
* The main method to start the sample application that receives events from Event Hubs sent from an IoT Hub device.
*
* @param args ignored args.
* @throws Exception if there's an error running the application.
*/
public static void main(String[] args) throws Exception {

final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
.setEndpoint(new URI(eventHubsCompatibleEndpoint))
.setEventHubName(eventHubsCompatiblePath)
.setSasKeyName(iotHubSasKeyName)
.setSasKey(iotHubSasKey);
// Build the Event Hubs compatible connection string.
String eventHubCompatibleConnectionString = String.format(EH_COMPATIBLE_CONNECTION_STRING_FORMAT,
EVENT_HUBS_COMPATIBLE_ENDPOINT, EVENT_HUBS_COMPATIBLE_PATH, IOT_HUB_SAS_KEY_NAME, IOT_HUB_SAS_KEY);

// Create an EventHubClient instance to connect to the
// IoT Hub Event Hubs-compatible endpoint.
final ExecutorService executorService = Executors.newSingleThreadExecutor();
final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);
// Setup the EventHubBuilder by configuring various options as needed.
EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder()
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.connectionString(eventHubCompatibleConnectionString);

// Use the EventHubRunTimeInformation to find out how many partitions
// there are on the hub.
final EventHubRuntimeInformation eventHubInfo = ehClient.getRuntimeInformation().get();
// uncomment to setup proxy
// setupProxy(eventHubClientBuilder);

// Create a PartitionReciever for each partition on the hub.
for (String partitionId : eventHubInfo.getPartitionIds()) {
receiveMessages(ehClient, partitionId);
}
// uncomment to use Web Sockets
// eventHubClientBuilder.transportType(AmqpTransportType.AMQP_WEB_SOCKETS);

// Shut down cleanly.
System.out.println("Press ENTER to exit.");
System.in.read();
System.out.println("Shutting down...");
for (PartitionReceiver receiver : receivers) {
receiver.closeSync();
// Create an async consumer client as configured in the builder.
try (EventHubConsumerAsyncClient eventHubConsumerAsyncClient = eventHubClientBuilder.buildAsyncConsumerClient()) {

receiveFromAllPartitions(eventHubConsumerAsyncClient);

// uncomment to run these samples
// receiveFromSinglePartition(eventHubConsumerAsyncClient);
// receiveFromSinglePartitionInBatches(eventHubConsumerAsyncClient);

// Shut down cleanly.
System.out.println("Press ENTER to exit.");
System.in.read();
System.out.println("Shutting down...");
}
ehClient.closeSync();
executorService.shutdown();
System.exit(0);
}

/**
* This method receives events from all partitions asynchronously starting from the newly available events in
* each partition.
*
* @param eventHubConsumerAsyncClient The {@link EventHubConsumerAsyncClient}.
*/
private static void receiveFromAllPartitions(EventHubConsumerAsyncClient eventHubConsumerAsyncClient) {

eventHubConsumerAsyncClient
.receive(false) // set this to false to read only the newly available events
.subscribe(partitionEvent -> {
System.out.println();
System.out.printf("%nTelemetry received from partition %s:%n%s",
partitionEvent.getPartitionContext().getPartitionId(), partitionEvent.getData().getBodyAsString());
System.out.printf("%nApplication properties (set by device):%n%s", partitionEvent.getData().getProperties());
System.out.printf("%nSystem properties (set by IoT Hub):%n%s",
partitionEvent.getData().getSystemProperties());
}, ex -> {
System.out.println("Error receiving events " + ex);
}, () -> {
System.out.println("Completed receiving events");
});
}

/**
* This method queries all available partitions in the Event Hub and picks a single partition to receive
* events asynchronously starting from the newly available event in that partition.
*
* @param eventHubConsumerAsyncClient The {@link EventHubConsumerAsyncClient}.
*/
private static void receiveFromSinglePartition(EventHubConsumerAsyncClient eventHubConsumerAsyncClient) {
eventHubConsumerAsyncClient
.getPartitionIds() // get all available partitions
.take(1) // pick a single partition
.flatMap(partitionId -> {
System.out.println("Receiving events from partition id " + partitionId);
return eventHubConsumerAsyncClient
.receiveFromPartition(partitionId, EventPosition.latest());
}).subscribe(partitionEvent -> {
System.out.println();
System.out.printf("%nTelemetry received from partition %s:%n%s",
partitionEvent.getPartitionContext().getPartitionId(), partitionEvent.getData().getBodyAsString());
System.out.printf("%nApplication properties (set by device):%n%s", partitionEvent.getData().getProperties());
System.out.printf("%nSystem properties (set by IoT Hub):%n%s",
partitionEvent.getData().getSystemProperties());
}, ex -> {
System.out.println("Error receiving events " + ex);
}, () -> {
System.out.println("Completed receiving events");
}
);
}

/**
* This method queries all available partitions in the Event Hub and picks a single partition to receive
* events asynchronously in batches of 100 events, starting from the newly available event in that partition.
*
* @param eventHubConsumerAsyncClient The {@link EventHubConsumerAsyncClient}.
*/
private static void receiveFromSinglePartitionInBatches(EventHubConsumerAsyncClient eventHubConsumerAsyncClient) {
int batchSize = 100;
eventHubConsumerAsyncClient
.getPartitionIds()
.take(1)
.flatMap(partitionId -> {
System.out.println("Receiving events from partition id " + partitionId);
return eventHubConsumerAsyncClient
.receiveFromPartition(partitionId, EventPosition.latest());
}).window(batchSize) // batch the events
.subscribe(partitionEvents -> {
partitionEvents.toIterable().forEach(partitionEvent -> {
System.out.println();
System.out.printf("%nTelemetry received from partition %s:%n%s",
partitionEvent.getPartitionContext().getPartitionId(), partitionEvent.getData().getBodyAsString());
System.out.printf("%nApplication properties (set by device):%n%s",
partitionEvent.getData().getProperties());
System.out.printf("%nSystem properties (set by IoT Hub):%n%s",
partitionEvent.getData().getSystemProperties());
});
}, ex -> {
System.out.println("Error receiving events " + ex);
}, () -> {
System.out.println("Completed receiving events");
}
);
}

/**
* This method sets up proxy options and updates the {@link EventHubClientBuilder}.
*
* @param eventHubClientBuilder The {@link EventHubClientBuilder}.
*/
private static void setupProxy(EventHubClientBuilder eventHubClientBuilder) {
int proxyPort = 8000; // replace with right proxy port
String proxyHost = "{hostname}";
Proxy proxyAddress = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
String userName = "{username}";
String password = "{password}";
ProxyOptions proxyOptions = new ProxyOptions(ProxyAuthenticationType.BASIC, proxyAddress,
userName, password);

eventHubClientBuilder.proxyOptions(proxyOptions);

// To use proxy, the transport type has to be Web Sockets.
eventHubClientBuilder.transportType(AmqpTransportType.AMQP_WEB_SOCKETS);
}
}

0 comments on commit 0ed62ca

Please sign in to comment.