diff --git a/iot-hub/Quickstarts/read-d2c-messages/README.md b/iot-hub/Quickstarts/read-d2c-messages/README.md new file mode 100644 index 0000000..336366e --- /dev/null +++ b/iot-hub/Quickstarts/read-d2c-messages/README.md @@ -0,0 +1,36 @@ +# Notes + +This sample demonstrates how to use the Microsoft Azure Event Hubs Client for Java to +read messages sent from a device by using the built-in event hubs that exists by default for +every Iot Hub instance. + +## Get Event Hubs-compatible connection string + +You can get the Event Hubs-compatible connection string to your IotHub instance via the Azure portal or +by using the Azure CLI. + +If using the Azure portal, see [Built in endpoints for IotHub](https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-messages-read-builtin#read-from-the-built-in-endpoint) to get the Event Hubs-compatible +connection string and assign it to the constant `connectionString` in the sample. You can skip the Azure CLI +instructions in the sample after this. + +If using the Azure CLI, you will need to run the following commands before running this sample to get +the details required to form the Event Hubs compatible connection string. + +- `az iot hub show --query properties.eventHubEndpoints.events.endpoint --name {your IoT Hub name}` +- `az iot hub show --query properties.eventHubEndpoints.events.path --name {your IoT Hub name}` +- `az iot hub policy show --name service --query primaryKey --hub-name {your IoT Hub name}` + +## Checkpointing + +For an example that uses checkpointing, follow up this sample with the [sample that uses +Azure Storage Blob to create checkpoints](https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/samples/java/com/azure/messaging/eventhubs/checkpointstore/blob/EventProcessorBlobCheckpointStoreSample.java). + +Note that this requires adding a new dependency in your `pom.xml` to use the [Azure Storage Blob checkpoint store](https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/README.md). + +## WebSocket and proxy + +If using WebSockets, configure the `EventHubClientBuilder` to use transport type `AmqpTransportType.AMQP_WEB_SOCKETS`. + +If your application runs behind a proxy server, then, in addition to setting the transport type to +`AmqpTransportType.AMQP_WEB_SOCKETS`, you also need to configure the proxy options as shown in the `setupProxy` method in +[the sample](./src/main/java/com/microsoft/docs/iothub/samples/ReadDeviceToCloudMessages.java). diff --git a/iot-hub/Quickstarts/read-d2c-messages/pom.xml b/iot-hub/Quickstarts/read-d2c-messages/pom.xml index 79eaa09..1a53a82 100644 --- a/iot-hub/Quickstarts/read-d2c-messages/pom.xml +++ b/iot-hub/Quickstarts/read-d2c-messages/pom.xml @@ -12,14 +12,9 @@ Read device-to-cloud messages - com.microsoft.azure - azure-eventhubs - 1.0.0 - - - com.microsoft.azure - azure-eventhubs-eph - 1.0.0 + com.azure + azure-messaging-eventhubs + 5.1.0 diff --git a/iot-hub/Quickstarts/read-d2c-messages/src/main/java/com/microsoft/docs/iothub/samples/ReadDeviceToCloudMessages.java b/iot-hub/Quickstarts/read-d2c-messages/src/main/java/com/microsoft/docs/iothub/samples/ReadDeviceToCloudMessages.java index 2c7254d..2682caa 100644 --- a/iot-hub/Quickstarts/read-d2c-messages/src/main/java/com/microsoft/docs/iothub/samples/ReadDeviceToCloudMessages.java +++ b/iot-hub/Quickstarts/read-d2c-messages/src/main/java/com/microsoft/docs/iothub/samples/ReadDeviceToCloudMessages.java @@ -2,114 +2,181 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. // This application uses the Microsoft Azure Event Hubs Client for Java -// For samples see: https://github.com/Azure/azure-event-hubs/tree/master/samples/Java -// For documenation see: https://docs.microsoft.com/azure/event-hubs/ +// For samples see: https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/eventhubs/azure-messaging-eventhubs/src/samples +// For documentation see: https://docs.microsoft.com/azure/event-hubs/ package com.microsoft.docs.iothub.samples; -import com.microsoft.azure.eventhubs.ConnectionStringBuilder; -import com.microsoft.azure.eventhubs.EventData; -import com.microsoft.azure.eventhubs.EventHubClient; -import com.microsoft.azure.eventhubs.EventHubException; -import com.microsoft.azure.eventhubs.EventPosition; -import com.microsoft.azure.eventhubs.EventHubRuntimeInformation; -import com.microsoft.azure.eventhubs.PartitionReceiver; - -import java.io.IOException; -import java.time.Instant; -import java.util.ArrayList; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.nio.charset.Charset; -import java.net.URI; -import java.net.URISyntaxException; - +import com.azure.core.amqp.AmqpTransportType; +import com.azure.core.amqp.ProxyAuthenticationType; +import com.azure.core.amqp.ProxyOptions; +import com.azure.messaging.eventhubs.EventHubClientBuilder; +import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient; +import com.azure.messaging.eventhubs.models.EventPosition; +import java.net.InetSocketAddress; +import java.net.Proxy; + +/** + * A sample demonstrating how to receive events from Event Hubs sent from an IoT Hub device. + */ public class ReadDeviceToCloudMessages { + private static final String EH_COMPATIBLE_CONNECTION_STRING_FORMAT = "Endpoint=%s/;EntityPath=%s;" + + "SharedAccessKeyName=%s;SharedAccessKey=%s"; + // az iot hub show --query properties.eventHubEndpoints.events.endpoint --name {your IoT Hub name} - private static final String eventHubsCompatibleEndpoint = "{your Event Hubs compatible endpoint}"; + private static final String EVENT_HUBS_COMPATIBLE_ENDPOINT = "{your Event Hubs compatible endpoint}"; // az iot hub show --query properties.eventHubEndpoints.events.path --name {your IoT Hub name} - private static final String eventHubsCompatiblePath = "{your Event Hubs compatible name}"; + private static final String EVENT_HUBS_COMPATIBLE_PATH = "{your Event Hubs compatible name}"; // az iot hub policy show --name service --query primaryKey --hub-name {your IoT Hub name} - private static final String iotHubSasKey = "{your service primary key}"; - private static final String iotHubSasKeyName = "service"; - - // Track all the PartitionReciever instances created. - private static ArrayList receivers = new ArrayList(); - - // Asynchronously create a PartitionReceiver for a partition and then start - // reading any messages sent from the simulated client. - private static void receiveMessages(EventHubClient ehClient, String partitionId) - throws EventHubException, ExecutionException, InterruptedException { - - final ExecutorService executorService = Executors.newSingleThreadExecutor(); - - // Create the receiver using the default consumer group. - // For the purposes of this sample, read only messages sent since - // the time the receiver is created. Typically, you don't want to skip any messages. - ehClient.createReceiver(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, partitionId, - EventPosition.fromEnqueuedTime(Instant.now())).thenAcceptAsync(receiver -> { - System.out.println(String.format("Starting receive loop on partition: %s", partitionId)); - System.out.println(String.format("Reading messages sent since: %s", Instant.now().toString())); - - receivers.add(receiver); - - while (true) { - try { - // Check for EventData - this methods times out if there is nothing to retrieve. - Iterable receivedEvents = receiver.receiveSync(100); - - // If there is data in the batch, process it. - if (receivedEvents != null) { - for (EventData receivedEvent : receivedEvents) { - System.out.println(String.format("Telemetry received:\n %s", - new String(receivedEvent.getBytes(), Charset.defaultCharset()))); - System.out.println(String.format("Application properties (set by device):\n%s",receivedEvent.getProperties().toString())); - System.out.println(String.format("System properties (set by IoT Hub):\n%s\n",receivedEvent.getSystemProperties().toString())); - } - } - } catch (EventHubException e) { - System.out.println("Error reading EventData"); - } - } - }, executorService); - } + private static final String IOT_HUB_SAS_KEY = "{your service primary key}"; + private static final String IOT_HUB_SAS_KEY_NAME = "service"; - public static void main(String[] args) - throws EventHubException, ExecutionException, InterruptedException, IOException, URISyntaxException { + /** + * The main method to start the sample application that receives events from Event Hubs sent from an IoT Hub device. + * + * @param args ignored args. + * @throws Exception if there's an error running the application. + */ + public static void main(String[] args) throws Exception { - final ConnectionStringBuilder connStr = new ConnectionStringBuilder() - .setEndpoint(new URI(eventHubsCompatibleEndpoint)) - .setEventHubName(eventHubsCompatiblePath) - .setSasKeyName(iotHubSasKeyName) - .setSasKey(iotHubSasKey); + // Build the Event Hubs compatible connection string. + String eventHubCompatibleConnectionString = String.format(EH_COMPATIBLE_CONNECTION_STRING_FORMAT, + EVENT_HUBS_COMPATIBLE_ENDPOINT, EVENT_HUBS_COMPATIBLE_PATH, IOT_HUB_SAS_KEY_NAME, IOT_HUB_SAS_KEY); - // Create an EventHubClient instance to connect to the - // IoT Hub Event Hubs-compatible endpoint. - final ExecutorService executorService = Executors.newSingleThreadExecutor(); - final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService); + // Setup the EventHubBuilder by configuring various options as needed. + EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder() + .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME) + .connectionString(eventHubCompatibleConnectionString); - // Use the EventHubRunTimeInformation to find out how many partitions - // there are on the hub. - final EventHubRuntimeInformation eventHubInfo = ehClient.getRuntimeInformation().get(); + // uncomment to setup proxy + // setupProxy(eventHubClientBuilder); - // Create a PartitionReciever for each partition on the hub. - for (String partitionId : eventHubInfo.getPartitionIds()) { - receiveMessages(ehClient, partitionId); - } + // uncomment to use Web Sockets + // eventHubClientBuilder.transportType(AmqpTransportType.AMQP_WEB_SOCKETS); - // Shut down cleanly. - System.out.println("Press ENTER to exit."); - System.in.read(); - System.out.println("Shutting down..."); - for (PartitionReceiver receiver : receivers) { - receiver.closeSync(); + // Create an async consumer client as configured in the builder. + try (EventHubConsumerAsyncClient eventHubConsumerAsyncClient = eventHubClientBuilder.buildAsyncConsumerClient()) { + + receiveFromAllPartitions(eventHubConsumerAsyncClient); + + // uncomment to run these samples + // receiveFromSinglePartition(eventHubConsumerAsyncClient); + // receiveFromSinglePartitionInBatches(eventHubConsumerAsyncClient); + + // Shut down cleanly. + System.out.println("Press ENTER to exit."); + System.in.read(); + System.out.println("Shutting down..."); } - ehClient.closeSync(); - executorService.shutdown(); - System.exit(0); + } + + /** + * This method receives events from all partitions asynchronously starting from the newly available events in + * each partition. + * + * @param eventHubConsumerAsyncClient The {@link EventHubConsumerAsyncClient}. + */ + private static void receiveFromAllPartitions(EventHubConsumerAsyncClient eventHubConsumerAsyncClient) { + + eventHubConsumerAsyncClient + .receive(false) // set this to false to read only the newly available events + .subscribe(partitionEvent -> { + System.out.println(); + System.out.printf("%nTelemetry received from partition %s:%n%s", + partitionEvent.getPartitionContext().getPartitionId(), partitionEvent.getData().getBodyAsString()); + System.out.printf("%nApplication properties (set by device):%n%s", partitionEvent.getData().getProperties()); + System.out.printf("%nSystem properties (set by IoT Hub):%n%s", + partitionEvent.getData().getSystemProperties()); + }, ex -> { + System.out.println("Error receiving events " + ex); + }, () -> { + System.out.println("Completed receiving events"); + }); + } + + /** + * This method queries all available partitions in the Event Hub and picks a single partition to receive + * events asynchronously starting from the newly available event in that partition. + * + * @param eventHubConsumerAsyncClient The {@link EventHubConsumerAsyncClient}. + */ + private static void receiveFromSinglePartition(EventHubConsumerAsyncClient eventHubConsumerAsyncClient) { + eventHubConsumerAsyncClient + .getPartitionIds() // get all available partitions + .take(1) // pick a single partition + .flatMap(partitionId -> { + System.out.println("Receiving events from partition id " + partitionId); + return eventHubConsumerAsyncClient + .receiveFromPartition(partitionId, EventPosition.latest()); + }).subscribe(partitionEvent -> { + System.out.println(); + System.out.printf("%nTelemetry received from partition %s:%n%s", + partitionEvent.getPartitionContext().getPartitionId(), partitionEvent.getData().getBodyAsString()); + System.out.printf("%nApplication properties (set by device):%n%s", partitionEvent.getData().getProperties()); + System.out.printf("%nSystem properties (set by IoT Hub):%n%s", + partitionEvent.getData().getSystemProperties()); + }, ex -> { + System.out.println("Error receiving events " + ex); + }, () -> { + System.out.println("Completed receiving events"); + } + ); + } + + /** + * This method queries all available partitions in the Event Hub and picks a single partition to receive + * events asynchronously in batches of 100 events, starting from the newly available event in that partition. + * + * @param eventHubConsumerAsyncClient The {@link EventHubConsumerAsyncClient}. + */ + private static void receiveFromSinglePartitionInBatches(EventHubConsumerAsyncClient eventHubConsumerAsyncClient) { + int batchSize = 100; + eventHubConsumerAsyncClient + .getPartitionIds() + .take(1) + .flatMap(partitionId -> { + System.out.println("Receiving events from partition id " + partitionId); + return eventHubConsumerAsyncClient + .receiveFromPartition(partitionId, EventPosition.latest()); + }).window(batchSize) // batch the events + .subscribe(partitionEvents -> { + partitionEvents.toIterable().forEach(partitionEvent -> { + System.out.println(); + System.out.printf("%nTelemetry received from partition %s:%n%s", + partitionEvent.getPartitionContext().getPartitionId(), partitionEvent.getData().getBodyAsString()); + System.out.printf("%nApplication properties (set by device):%n%s", + partitionEvent.getData().getProperties()); + System.out.printf("%nSystem properties (set by IoT Hub):%n%s", + partitionEvent.getData().getSystemProperties()); + }); + }, ex -> { + System.out.println("Error receiving events " + ex); + }, () -> { + System.out.println("Completed receiving events"); + } + ); + } + + /** + * This method sets up proxy options and updates the {@link EventHubClientBuilder}. + * + * @param eventHubClientBuilder The {@link EventHubClientBuilder}. + */ + private static void setupProxy(EventHubClientBuilder eventHubClientBuilder) { + int proxyPort = 8000; // replace with right proxy port + String proxyHost = "{hostname}"; + Proxy proxyAddress = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)); + String userName = "{username}"; + String password = "{password}"; + ProxyOptions proxyOptions = new ProxyOptions(ProxyAuthenticationType.BASIC, proxyAddress, + userName, password); + + eventHubClientBuilder.proxyOptions(proxyOptions); + + // To use proxy, the transport type has to be Web Sockets. + eventHubClientBuilder.transportType(AmqpTransportType.AMQP_WEB_SOCKETS); } }