diff --git a/Microsoft.Azure.EventHubs.sln b/Microsoft.Azure.EventHubs.sln
index fb81122..3537037 100644
--- a/Microsoft.Azure.EventHubs.sln
+++ b/Microsoft.Azure.EventHubs.sln
@@ -20,12 +20,6 @@ Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Microsoft.Azure.EventHubs.P
EndProject
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Microsoft.Azure.EventHubs.Processor.UnitTests", "test\Microsoft.Azure.EventHubs.Processor.UnitTests\Microsoft.Azure.EventHubs.Processor.UnitTests.xproj", "{F7F892F4-4490-4BC9-BB18-F42F2C85E345}"
EndProject
-Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{188550D8-80F0-410D-AECA-BC3C1F96CF95}"
-EndProject
-Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "SampleSender", "samples\SampleSender\SampleSender.xproj", "{940BCA4A-B154-4667-8A03-57E6B689866D}"
-EndProject
-Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "SampleEphReceiver", "samples\SampleEphReceiver\SampleEphReceiver.xproj", "{679DFCC5-76BD-4725-A51E-AFBB01565401}"
-EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
diff --git a/global.json b/global.json
index 5e6422e..4ad282f 100644
--- a/global.json
+++ b/global.json
@@ -1,3 +1,6 @@
-{
- "projects": [ "src", "test" ]
+{
+ "projects": ["src", "test"],
+ "sdk": {
+ "version": "1.0.0-preview2-003121"
+ }
}
diff --git a/readme.md b/readme.md
index 0c065f3..a092bac 100644
--- a/readme.md
+++ b/readme.md
@@ -1,4 +1,4 @@
-
+
@@ -39,9 +39,9 @@ consumers that filter and/or transform event streams and then forward them on to
### Getting Started
-To get started sending events to an Event Hub refer to [Get started sending messages to Event Hubs in .NET Core](./samples/SampleSender/readme.md).
+To get started sending events to an Event Hub refer to [Get started sending messages to Event Hubs in .NET Core](https://github.com/Azure/azure-event-hubs/tree/master/samples/SampleSender).
-To get started receiving events with the **EventProcessorHost** refer to [Get started receiving messages with the EventProcessorHost in .NET Core](./samples/SampleEphReceiver/readme.md ).
+To get started receiving events with the **EventProcessorHost** refer to [Get started receiving messages with the EventProcessorHost in .NET Core](https://github.com/Azure/azure-event-hubs/tree/master/samples/SampleEphReceiver).
### Running the unit tests
diff --git a/samples/SampleEphReceiver/Program.cs b/samples/SampleEphReceiver/Program.cs
deleted file mode 100644
index d249d6a..0000000
--- a/samples/SampleEphReceiver/Program.cs
+++ /dev/null
@@ -1,47 +0,0 @@
-// Copyright (c) Microsoft. All rights reserved.
-// Licensed under the MIT license. See LICENSE file in the project root for full license information.
-
-namespace SampleEphReceiver
-{
- using System;
- using System.Threading.Tasks;
- using Microsoft.Azure.EventHubs;
- using Microsoft.Azure.EventHubs.Processor;
-
- public class Program
- {
- private const string EhConnectionString = "{Event Hubs connection string}";
- private const string EhEntityPath = "{Event Hub path/name}";
- private const string StorageContainerName = "{Storage account container name}";
- private const string StorageAccountName = "{Storage account name}";
- private const string StorageAccountKey = "{Storage account key}";
-
- private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);
-
- public static void Main(string[] args)
- {
- MainAsync(args).GetAwaiter().GetResult();
- }
-
- private static async Task MainAsync(string[] args)
- {
- Console.WriteLine("Registering EventProcessor...");
-
- var eventProcessorHost = new EventProcessorHost(
- EhEntityPath,
- PartitionReceiver.DefaultConsumerGroupName,
- EhConnectionString,
- StorageConnectionString,
- StorageContainerName);
-
- // Registers the Event Processor Host and starts receiving messages
- await eventProcessorHost.RegisterEventProcessorAsync();
-
- Console.WriteLine("Receiving. Press enter key to stop worker.");
- Console.ReadLine();
-
- // Disposes of the Event Processor Host
- await eventProcessorHost.UnregisterEventProcessorAsync();
- }
- }
-}
diff --git a/samples/SampleEphReceiver/Properties/AssemblyInfo.cs b/samples/SampleEphReceiver/Properties/AssemblyInfo.cs
deleted file mode 100644
index 9e4abd7..0000000
--- a/samples/SampleEphReceiver/Properties/AssemblyInfo.cs
+++ /dev/null
@@ -1,21 +0,0 @@
-// Copyright (c) Microsoft. All rights reserved.
-// Licensed under the MIT license. See LICENSE file in the project root for full license information.
-
-using System.Reflection;
-using System.Runtime.InteropServices;
-
-// General Information about an assembly is controlled through the following
-// set of attributes. Change these attribute values to modify the information
-// associated with an assembly.
-[assembly: AssemblyConfiguration("")]
-[assembly: AssemblyCompany("")]
-[assembly: AssemblyProduct("SampleReceiver")]
-[assembly: AssemblyTrademark("")]
-
-// Setting ComVisible to false makes the types in this assembly not visible
-// to COM components. If you need to access a type in this assembly from
-// COM, set the ComVisible attribute to true on that type.
-[assembly: ComVisible(false)]
-
-// The following GUID is for the ID of the typelib if this project is exposed to COM
-[assembly: Guid("679dfcc5-76bd-4725-a51e-afbb01565401")]
diff --git a/samples/SampleEphReceiver/SampleEphReceiver.xproj b/samples/SampleEphReceiver/SampleEphReceiver.xproj
deleted file mode 100644
index f7ef2e6..0000000
--- a/samples/SampleEphReceiver/SampleEphReceiver.xproj
+++ /dev/null
@@ -1,21 +0,0 @@
-
-
-
- 14.0
- $(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)
-
-
-
-
- 679dfcc5-76bd-4725-a51e-afbb01565401
- SampleReceiver
- .\obj\
- .\bin\
- v4.5.2
-
-
-
- 2.0
-
-
-
diff --git a/samples/SampleEphReceiver/SimpleEventProcessor.cs b/samples/SampleEphReceiver/SimpleEventProcessor.cs
deleted file mode 100644
index dd73a14..0000000
--- a/samples/SampleEphReceiver/SimpleEventProcessor.cs
+++ /dev/null
@@ -1,44 +0,0 @@
-// Copyright (c) Microsoft. All rights reserved.
-// Licensed under the MIT license. See LICENSE file in the project root for full license information.
-
-namespace SampleEphReceiver
-{
- using System;
- using System.Collections.Generic;
- using System.Text;
- using System.Threading.Tasks;
- using Microsoft.Azure.EventHubs;
- using Microsoft.Azure.EventHubs.Processor;
-
- public class SimpleEventProcessor : IEventProcessor
- {
- public Task CloseAsync(PartitionContext context, CloseReason reason)
- {
- Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
- return Task.CompletedTask;
- }
-
- public Task OpenAsync(PartitionContext context)
- {
- Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
- return Task.CompletedTask;
- }
-
- public Task ProcessErrorAsync(PartitionContext context, Exception error)
- {
- Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
- return Task.CompletedTask;
- }
-
- public Task ProcessEventsAsync(PartitionContext context, IEnumerable messages)
- {
- foreach (var eventData in messages)
- {
- var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
- Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
- }
-
- return context.CheckpointAsync();
- }
- }
-}
diff --git a/samples/SampleEphReceiver/project.json b/samples/SampleEphReceiver/project.json
deleted file mode 100644
index 9d169d2..0000000
--- a/samples/SampleEphReceiver/project.json
+++ /dev/null
@@ -1,30 +0,0 @@
-{
- "version": "1.0.0",
- "buildOptions": {
- "emitEntryPoint": true
- },
-
- "dependencies": {
- "Microsoft.Azure.EventHubs.Processor": {
- "target": "project"
- },
- "Microsoft.Azure.EventHubs": {
- "target": "project"
- }
- },
-
- "frameworks": {
- "netcoreapp1.0": {
- "dependencies": {
- "Microsoft.NETCore.App": {
- "type": "platform",
- "version": "1.0.0"
- }
- },
- "imports": [
- "dnxcore50",
- "portable-net45+win8"
- ]
- }
- }
-}
\ No newline at end of file
diff --git a/samples/SampleEphReceiver/readme.md b/samples/SampleEphReceiver/readme.md
deleted file mode 100644
index a5b0133..0000000
--- a/samples/SampleEphReceiver/readme.md
+++ /dev/null
@@ -1,190 +0,0 @@
-# Get started receiving messages with the EventProcessorHost in .NET Core
-
-## What will be accomplished
-
-This tutorial will walk-through how to create the existing solution **SampleEphReceiver** (inside this folder). You can run the solution as-is replacing the EhConnectionString/EhEntityPath/StorageAccount settings with your Event Hub and storage account values, or follow this tutorial to create your own.
-
-In this tutorial, we will write a .NET Core console application to receive messages from an Event Hub using the **EventProcessorHost**.
-
-## Prerequisites
-
-1. [Visual Studio 2015](http://www.visualstudio.com).
-
-2. [.NET Core Visual Studio 2015 Tooling](http://www.microsoft.com/net/core).
-
-3. An Azure subscription.
-
-4. An Event Hubs namespace.
-
-## Receive messages from the Event Hub
-
-### Create a console application
-
-1. Launch Visual Studio and create a new .NET Core console application.
-
-### Add the Event Hubs NuGet package
-
-1. Right-click the newly created project and select **Manage NuGet Packages**.
-
-2. Click the **Browse** tab, then search for “Microsoft Azure Event Processor Host” and select the **Microsoft Azure Event Processor Host** item. Click **Install** to complete the installation, then close this dialog box.
-
-### Implement the IEventProcessor interface
-
-1. Create a new class called `SimpleEventProcessor'.
-
-2. Add the following `using` statements to the top of the SimpleEventProcessor.cs file.
-
- ```cs
- using Microsoft.Azure.EventHubs;
- using Microsoft.Azure.EventHubs.Processor;
- ```
-
-3. Implement the `IEventProcessor` interface. The class should look like this:
-
- ```cs
- namespace SampleEphReceiver
- {
- using System;
- using System.Collections.Generic;
- using System.Text;
- using System.Threading.Tasks;
- using Microsoft.Azure.EventHubs;
- using Microsoft.Azure.EventHubs.Processor;
-
- public class SimpleEventProcessor : IEventProcessor
- {
- public Task CloseAsync(PartitionContext context, CloseReason reason)
- {
- Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
- return Task.CompletedTask;
- }
-
- public Task OpenAsync(PartitionContext context)
- {
- Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
- return Task.CompletedTask;
- }
-
- public Task ProcessErrorAsync(PartitionContext context, Exception error)
- {
- Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
- return Task.CompletedTask;
- }
-
- public Task ProcessEventsAsync(PartitionContext context, IEnumerable messages)
- {
- foreach (var eventData in messages)
- {
- var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
- Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
- }
-
- return context.CheckpointAsync();
- }
- }
- }
- ```
-
-### Write a main console method that uses `SimpleEventProcessor` to receive messages from an Event Hub
-
-1. Add the following `using` statements to the top of the Program.cs file.
-
- ```cs
- using Microsoft.Azure.EventHubs;
- using Microsoft.Azure.EventHubs.Processor;
- ```
-
-2. Add constants to the `Program` class for the Event Hubs connection string, Event Hub path, storage container name, storage account name, and storage account key. Replace placeholders with their corresponding values.
-
- ```cs
- private const string EhConnectionString = "{Event Hubs connection string}";
- private const string EhEntityPath = "{Event Hub path/name}";
- private const string StorageContainerName = "{Storage account container name}";
- private const string StorageAccountName = "{Storage account name}";
- private const string StorageAccountKey = "{Storage account key}";
-
- private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);
- ```
-
-3. Add a new method named `MainAsync` to the `Program` class like the following:
- ```cs
- private static async Task MainAsync(string[] args)
- {
- Console.WriteLine("Registering EventProcessor...");
-
- var eventProcessorHost = new EventProcessorHost(
- EhEntityPath,
- PartitionReceiver.DefaultConsumerGroupName,
- EhConnectionString,
- StorageConnectionString,
- StorageContainerName);
-
- // Registers the Event Processor Host and starts receiving messages
- await eventProcessorHost.RegisterEventProcessorAsync();
-
- Console.WriteLine("Receiving. Press enter key to stop worker.");
- Console.ReadLine();
-
- // Disposes of the Event Processor Host
- await eventProcessorHost.UnregisterEventProcessorAsync();
- }
- ```
-
-3. Add the following line of code to the `Main` method:
-
- ```cs
- MainAsync(args).GetAwaiter().GetResult();
- ```
-
- Here is what your Program.cs file should look like:
-
- ```cs
- namespace SampleEphReceiver
- {
- using System;
- using System.Threading.Tasks;
- using Microsoft.Azure.EventHubs;
- using Microsoft.Azure.EventHubs.Processor;
-
- public class Program
- {
- private const string EhConnectionString = "{Event Hubs connection string}";
- private const string EhEntityPath = "{Event Hub path/name}";
- private const string StorageContainerName = "{Storage account container name}";
- private const string StorageAccountName = "{Storage account name}";
- private const string StorageAccountKey = "{Storage account key}";
-
- private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);
-
- public static void Main(string[] args)
- {
- MainAsync(args).GetAwaiter().GetResult();
- }
-
- private static async Task MainAsync(string[] args)
- {
- Console.WriteLine("Registering EventProcessor...");
-
- var eventProcessorHost = new EventProcessorHost(
- EhEntityPath,
- PartitionReceiver.DefaultConsumerGroupName,
- EhConnectionString,
- StorageConnectionString,
- StorageContainerName);
-
- // Registers the Event Processor Host and starts receiving messages
- await eventProcessorHost.RegisterEventProcessorAsync();
-
- Console.WriteLine("Receiving. Press enter key to stop worker.");
- Console.ReadLine();
-
- // Disposes of the Event Processor Host
- await eventProcessorHost.UnregisterEventProcessorAsync();
- }
- }
- }
- ```
-
-4. Run the program, and ensure that there are no errors.
-
-Congratulations! You have now received messages from an Event Hub.
\ No newline at end of file
diff --git a/samples/SampleSender/Program.cs b/samples/SampleSender/Program.cs
deleted file mode 100644
index 9a66622..0000000
--- a/samples/SampleSender/Program.cs
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright (c) Microsoft. All rights reserved.
-// Licensed under the MIT license. See LICENSE file in the project root for full license information.
-
-namespace SampleSender
-{
- using System;
- using System.Text;
- using System.Threading.Tasks;
- using Microsoft.Azure.EventHubs;
-
- public class Program
- {
- private static EventHubClient eventHubClient;
- private const string EhConnectionString = "{Event Hubs connection string}";
- private const string EhEntityPath = "{Event Hub path/name}";
-
- public static void Main(string[] args)
- {
- MainAsync(args).GetAwaiter().GetResult();
- }
-
- private static async Task MainAsync(string[] args)
- {
- // Creates an EventHubsConnectionStringBuilder object from a the connection string, and sets the EntityPath.
- // Typically the connection string should have the Entity Path in it, but for the sake of this simple scenario
- // we are using the connection string from the namespace.
- var connectionStringBuilder = new EventHubsConnectionStringBuilder(EhConnectionString)
- {
- EntityPath = EhEntityPath
- };
-
- eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
-
- await SendMessagesToEventHub(100);
-
- await eventHubClient.CloseAsync();
-
- Console.WriteLine("Press any key to exit.");
- Console.ReadLine();
- }
-
- // Creates an Event Hub client and sends 100 messages to the event hub.
- private static async Task SendMessagesToEventHub(int numMessagesToSend)
- {
- for (var i = 0; i < numMessagesToSend; i++)
- {
- try
- {
- var message = $"Message {i}";
- Console.WriteLine($"Sending message: {message}");
- await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
- }
- catch (Exception exception)
- {
- Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
- }
-
- await Task.Delay(10);
- }
-
- Console.WriteLine($"{numMessagesToSend} messages sent.");
- }
- }
-}
diff --git a/samples/SampleSender/Properties/AssemblyInfo.cs b/samples/SampleSender/Properties/AssemblyInfo.cs
deleted file mode 100644
index 2aebac9..0000000
--- a/samples/SampleSender/Properties/AssemblyInfo.cs
+++ /dev/null
@@ -1,21 +0,0 @@
-// Copyright (c) Microsoft. All rights reserved.
-// Licensed under the MIT license. See LICENSE file in the project root for full license information.
-
-using System.Reflection;
-using System.Runtime.InteropServices;
-
-// General Information about an assembly is controlled through the following
-// set of attributes. Change these attribute values to modify the information
-// associated with an assembly.
-[assembly: AssemblyConfiguration("")]
-[assembly: AssemblyCompany("")]
-[assembly: AssemblyProduct("SampleSender")]
-[assembly: AssemblyTrademark("")]
-
-// Setting ComVisible to false makes the types in this assembly not visible
-// to COM components. If you need to access a type in this assembly from
-// COM, set the ComVisible attribute to true on that type.
-[assembly: ComVisible(false)]
-
-// The following GUID is for the ID of the typelib if this project is exposed to COM
-[assembly: Guid("940bca4a-b154-4667-8a03-57e6b689866d")]
diff --git a/samples/SampleSender/SampleSender.xproj b/samples/SampleSender/SampleSender.xproj
deleted file mode 100644
index cf9229d..0000000
--- a/samples/SampleSender/SampleSender.xproj
+++ /dev/null
@@ -1,19 +0,0 @@
-
-
-
- 14.0
- $(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)
-
-
-
- 940bca4a-b154-4667-8a03-57e6b689866d
- SampleSender
- .\obj\
- .\bin\
- v4.5.2
-
-
- 2.0
-
-
-
\ No newline at end of file
diff --git a/samples/SampleSender/project.json b/samples/SampleSender/project.json
deleted file mode 100644
index 6d73615..0000000
--- a/samples/SampleSender/project.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
- "version": "1.0.0",
- "buildOptions": {
- "emitEntryPoint": true
- },
-
- "dependencies": {
- "Microsoft.Azure.EventHubs": {
- "target": "project"
- },
- "Microsoft.NETCore.App": {
- "type": "platform",
- "version": "1.0.1"
- }
- },
-
- "frameworks": {
- "netcoreapp1.0": {
- "imports": "dnxcore50"
- }
- }
-}
diff --git a/samples/SampleSender/readme.md b/samples/SampleSender/readme.md
deleted file mode 100644
index 58a85a6..0000000
--- a/samples/SampleSender/readme.md
+++ /dev/null
@@ -1,173 +0,0 @@
-# Get started sending messages to Event Hubs in .NET Core
-
-## What will be accomplished
-
-This tutorial will walk-through how to create the existing solution **SampleSender** (inside this folder). You can run the solution as-is replacing the EhConnectionString/EhEntityPath with your Event Hub values, or follow this tutorial to create your own.
-
-In this tutorial, we will write a .NET Core console application to send messages to an Event Hub.
-
-## Prerequisites
-
-1. [Visual Studio 2015](http://www.visualstudio.com).
-
-2. [.NET Core Visual Studio 2015 Tooling](http://www.microsoft.com/net/core).
-
-3. An Azure subscription.
-
-4. An Event Hubs namespace.
-
-## Send messages to an Event Hub
-
-To send messages to an Event Hub, we will write a C# console application using Visual Studio.
-
-### Create a console application
-
-1. Launch Visual Studio and create a new .NET Core console application.
-
-### Add the Event Hubs NuGet package
-
-1. Right-click the newly created project and select **Manage NuGet Packages**.
-
-2. Click the **Browse** tab, then search for “Microsoft Azure Event Hubs” and select the **Microsoft Azure Event Hubs** item. Click **Install** to complete the installation, then close this dialog box.
-
-### Write some code to send messages to the Event Hub
-
-1. Add the following `using` statement to the top of the Program.cs file.
-
- ```cs
- using Microsoft.Azure.EventHubs;
- ```
-
-2. Add constants to the `Program` class for the Event Hubs connection string and entity path (individual Event Hub name). Replace the placeholders in brackets with the proper values that were obtained when creating the Event Hub.
-
- ```cs
- private static EventHubClient eventHubClient;
- private const string EhConnectionString = "{Event Hubs connection string}";
- private const string EhEntityPath = "{Event Hub path/name}";
- ```
-
-3. Add a new method named `MainAsync` to the `Program` class like the following:
-
- ```cs
- private static async Task MainAsync(string[] args)
- {
- // Creates an EventHubsConnectionStringBuilder object from a the connection string, and sets the EntityPath.
- // Typically the connection string should have the Entity Path in it, but for the sake of this simple scenario
- // we are using the connection string from the namespace.
- var connectionStringBuilder = new EventHubsConnectionStringBuilder(EhConnectionString)
- {
- EntityPath = EhEntityPath
- };
-
- eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
-
- await SendMessagesToEventHub(100);
-
- await eventHubClient.CloseAsync();
-
- Console.WriteLine("Press any key to exit.");
- Console.ReadLine();
- }
- ```
-
-4. Add a new method named `SendMessagesToEventHub` to the `Program` class like the following:
-
- ```cs
- // Creates an Event Hub client and sends 100 messages to the event hub.
- private static async Task SendMessagesToEventHub(int numMessagesToSend)
- {
- for (var i = 0; i < numMessagesToSend; i++)
- {
- try
- {
- var message = $"Message {i}";
- Console.WriteLine($"Sending message: {message}");
- await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
- }
- catch (Exception exception)
- {
- Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
- }
-
- await Task.Delay(10);
- }
-
- Console.WriteLine($"{numMessagesToSend} messages sent.");
- }
- ```
-
-5. Add the following code to the `Main` method in the `Program` class.
-
- ```cs
- MainAsync(args).GetAwaiter().GetResult();
- ```
-
- Here is what your Program.cs should look like.
-
- ```cs
- namespace SampleSender
- {
- using System;
- using System.Text;
- using System.Threading.Tasks;
- using Microsoft.Azure.EventHubs;
-
- public class Program
- {
- private static EventHubClient eventHubClient;
- private const string EhConnectionString = "{Event Hubs connection string}";
- private const string EhEntityPath = "{Event Hub path/name}";
-
- public static void Main(string[] args)
- {
- MainAsync(args).GetAwaiter().GetResult();
- }
-
- private static async Task MainAsync(string[] args)
- {
- // Creates an EventHubsConnectionStringBuilder object from a the connection string, and sets the EntityPath.
- // Typically the connection string should have the Entity Path in it, but for the sake of this simple scenario
- // we are using the connection string from the namespace.
- var connectionStringBuilder = new EventHubsConnectionStringBuilder(EhConnectionString)
- {
- EntityPath = EhEntityPath
- };
-
- eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
-
- await SendMessagesToEventHub(100);
-
- await eventHubClient.CloseAsync();
-
- Console.WriteLine("Press any key to exit.");
- Console.ReadLine();
- }
-
- // Creates an Event Hub client and sends 100 messages to the event hub.
- private static async Task SendMessagesToEventHub(int numMessagesToSend)
- {
- for (var i = 0; i < numMessagesToSend; i++)
- {
- try
- {
- var message = $"Message {i}";
- Console.WriteLine($"Sending message: {message}");
- await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
- }
- catch (Exception exception)
- {
- Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
- }
-
- await Task.Delay(10);
- }
-
- Console.WriteLine($"{numMessagesToSend} messages sent.");
- }
- }
- }
- ```
-
-6. Run the program, and ensure that there are no errors thrown.
-
-Congratulations! You have now sent messages to an Event Hub.
diff --git a/src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs b/src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs
index 769d04f..f1c54cc 100644
--- a/src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs
+++ b/src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs
@@ -10,7 +10,6 @@ namespace Microsoft.Azure.EventHubs.Processor
class AzureBlobLease : Lease
{
// ctor needed for deserialization
-
internal AzureBlobLease()
{
}
@@ -36,14 +35,9 @@ internal AzureBlobLease(AzureBlobLease source, CloudBlockBlob blob) : base(sourc
}
// do not serialize
-
[JsonIgnore]
public CloudBlockBlob Blob { get; }
- public string Offset { get; set; }
-
- public long SequenceNumber { get; set; }
-
public override async Task IsExpired()
{
await this.Blob.FetchAttributesAsync().ConfigureAwait(false); // Get the latest metadata
diff --git a/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs b/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs
index 6352ac9..243cf39 100644
--- a/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs
+++ b/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs
@@ -28,7 +28,13 @@ sealed class AzureStorageCheckpointLeaseManager : ICheckpointManager, ILeaseMana
static readonly TimeSpan storageMaximumExecutionTime = TimeSpan.FromMinutes(2);
static readonly TimeSpan leaseDuration = TimeSpan.FromSeconds(30);
static readonly TimeSpan leaseRenewInterval = TimeSpan.FromSeconds(10);
- readonly BlobRequestOptions renewRequestOptions = new BlobRequestOptions();
+
+ // Lease renew calls shouldn't wait more than leaseRenewInterval
+ readonly BlobRequestOptions renewRequestOptions = new BlobRequestOptions()
+ {
+ ServerTimeout = leaseRenewInterval,
+ MaximumExecutionTime = TimeSpan.FromMinutes(1)
+ };
internal AzureStorageCheckpointLeaseManager(string storageConnectionString, string leaseContainerName, string storageBlobPrefix)
{
@@ -97,6 +103,12 @@ public async Task GetCheckpointAsync(string partitionId)
return checkpoint;
}
+ [Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)]
+ public Task UpdateCheckpointAsync(Checkpoint checkpoint)
+ {
+ throw new NotImplementedException();
+ }
+
public async Task CreateCheckpointIfNotExistsAsync(string partitionId)
{
// Normally the lease will already be created, checkpoint store is initialized after lease store.
@@ -106,13 +118,12 @@ public async Task CreateCheckpointIfNotExistsAsync(string partitionI
return checkpoint;
}
- public async Task UpdateCheckpointAsync(Checkpoint checkpoint)
+ public async Task UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint)
{
- // Need to fetch the most current lease data so that we can update it correctly.
- AzureBlobLease lease = (AzureBlobLease)await GetLeaseAsync(checkpoint.PartitionId).ConfigureAwait(false);
- lease.Offset = checkpoint.Offset;
- lease.SequenceNumber = checkpoint.SequenceNumber;
- await UpdateLeaseAsync(lease).ConfigureAwait(false);
+ AzureBlobLease newLease = new AzureBlobLease((AzureBlobLease)lease);
+ newLease.Offset = checkpoint.Offset;
+ newLease.SequenceNumber = checkpoint.SequenceNumber;
+ await this.UpdateLeaseAsync(newLease).ConfigureAwait(false);
}
public Task DeleteCheckpointAsync(string partitionId)
@@ -294,6 +305,18 @@ async Task AcquireLeaseCoreAsync(AzureBlobLease lease)
await leaseBlob.FetchAttributesAsync().ConfigureAwait(false);
if (leaseBlob.Properties.LeaseState == LeaseState.Leased)
{
+ if (string.IsNullOrEmpty(lease.Token))
+ {
+ // We reach here in a race condition: when this instance of EventProcessorHost scanned the
+ // lease blobs, this partition was unowned (token is empty) but between then and now, another
+ // instance of EPH has established a lease (getLeaseState() is LEASED). We normally enforce
+ // that we only steal the lease if it is still owned by the instance which owned it when we
+ // scanned, but we can't do that when we don't know who owns it. The safest thing to do is just
+ // fail the acquisition. If that means that one EPH instance gets more partitions than it should,
+ // rebalancing will take care of that quickly enough.
+ return false;
+ }
+
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.Id, lease.PartitionId, "Need to ChangeLease");
newToken = await leaseBlob.ChangeLeaseAsync(newLeaseId, AccessCondition.GenerateLeaseCondition(lease.Token)).ConfigureAwait(false);
}
@@ -331,8 +354,8 @@ public Task RenewLeaseAsync(Lease lease)
async Task RenewLeaseCoreAsync(AzureBlobLease lease)
{
CloudBlockBlob leaseBlob = lease.Blob;
- bool retval = true;
string partitionId = lease.PartitionId;
+
try
{
await leaseBlob.RenewLeaseAsync(AccessCondition.GenerateLeaseCondition(lease.Token), this.renewRequestOptions, null).ConfigureAwait(false);
@@ -341,15 +364,13 @@ async Task RenewLeaseCoreAsync(AzureBlobLease lease)
{
if (WasLeaseLost(partitionId, se))
{
- retval = false;
- }
- else
- {
- throw;
+ throw new LeaseLostException(partitionId, se);
}
+
+ throw;
}
- return retval;
+ return true;
}
public Task ReleaseLeaseAsync(Lease lease)
@@ -362,8 +383,8 @@ async Task ReleaseLeaseCoreAsync(AzureBlobLease lease)
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.Id, lease.PartitionId, "Releasing lease");
CloudBlockBlob leaseBlob = lease.Blob;
- bool retval = true;
string partitionId = lease.PartitionId;
+
try
{
string leaseId = lease.Token;
@@ -379,15 +400,13 @@ async Task ReleaseLeaseCoreAsync(AzureBlobLease lease)
{
if (WasLeaseLost(partitionId, se))
{
- retval = false;
- }
- else
- {
- throw;
+ throw new LeaseLostException(partitionId, se);
}
+
+ throw;
}
- return retval;
+ return true;
}
public Task UpdateLeaseAsync(Lease lease)
@@ -407,16 +426,13 @@ async Task UpdateLeaseCoreAsync(AzureBlobLease lease)
string token = lease.Token;
if (string.IsNullOrEmpty(token))
- {
- return false;
- }
-
- // First, renew the lease to make sure the update will go through.
- if (!await this.RenewLeaseAsync(lease).ConfigureAwait(false))
{
return false;
}
+ // First, renew the lease to make sure the update will go through.
+ await this.RenewLeaseAsync(lease).ConfigureAwait(false);
+
CloudBlockBlob leaseBlob = lease.Blob;
try
{
@@ -428,7 +444,7 @@ async Task UpdateLeaseCoreAsync(AzureBlobLease lease)
{
if (WasLeaseLost(partitionId, se))
{
- throw new LeaseLostException(lease, se);
+ throw new LeaseLostException(partitionId, se);
}
throw;
@@ -469,6 +485,7 @@ bool WasLeaseLost(string partitionId, StorageException se)
}
}
}
+
return retval;
}
diff --git a/src/Microsoft.Azure.EventHubs.Processor/EventHubPartitionPump.cs b/src/Microsoft.Azure.EventHubs.Processor/EventHubPartitionPump.cs
index 96524f9..67451e5 100644
--- a/src/Microsoft.Azure.EventHubs.Processor/EventHubPartitionPump.cs
+++ b/src/Microsoft.Azure.EventHubs.Processor/EventHubPartitionPump.cs
@@ -33,21 +33,9 @@ protected override async Task OnOpenAsync()
catch (Exception e)
{
lastException = e;
- if (e is ReceiverDisconnectedException)
- {
- // TODO Assuming this is due to a receiver with a higher epoch.
- // Is there a way to be sure without checking the exception text?
- ProcessorEventSource.Log.PartitionPumpWarning(
- this.Host.Id, this.PartitionContext.PartitionId, "Receiver disconnected on create, bad epoch?", e.ToString());
- // If it's a bad epoch, then retrying isn't going to help.
- break;
- }
- else
- {
- ProcessorEventSource.Log.PartitionPumpWarning(
- this.Host.Id, this.PartitionContext.PartitionId, "Failure creating client or receiver, retrying", e.ToString());
- retryCount++;
- }
+ ProcessorEventSource.Log.PartitionPumpWarning(
+ this.Host.Id, this.PartitionContext.PartitionId, "Failure creating client or receiver, retrying", e.ToString());
+ retryCount++;
}
}
while (!openedOK && (retryCount < 5));
@@ -156,6 +144,8 @@ public Task ProcessEventsAsync(IEnumerable events)
public async Task ProcessErrorAsync(Exception error)
{
+ bool faultPump;
+
if (error == null)
{
error = new Exception("No error info supplied by EventHub client");
@@ -163,19 +153,38 @@ public async Task ProcessErrorAsync(Exception error)
if (error is ReceiverDisconnectedException)
{
+ // Trace as warning since ReceiverDisconnectedException is part of lease stealing logic.
ProcessorEventSource.Log.PartitionPumpWarning(
this.eventHubPartitionPump.Host.Id, this.eventHubPartitionPump.PartitionContext.PartitionId,
"EventHub client disconnected, probably another host took the partition");
+
+
+ // Shutdown the message pump when receiver is disconnected.
+ faultPump = true;
}
else
{
ProcessorEventSource.Log.PartitionPumpError(
this.eventHubPartitionPump.Host.Id, this.eventHubPartitionPump.PartitionContext.PartitionId, "EventHub client error:", error.ToString());
- await this.eventHubPartitionPump.ProcessErrorAsync(error).ConfigureAwait(false);
+
+ // No need to fault the pump, we expect receiver to recover on its own.
+ faultPump = false;
}
- this.eventHubPartitionPump.PumpStatus = PartitionPumpStatus.Errored;
+ try
+ {
+ // We would like to deliver all errors in the pump to error handler.
+ await this.eventHubPartitionPump.ProcessErrorAsync(error).ConfigureAwait(false);
+ }
+ finally
+ {
+ // Fault pump only when needed.
+ if (faultPump)
+ {
+ this.eventHubPartitionPump.PumpStatus = PartitionPumpStatus.Errored;
+ }
+ }
}
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs b/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs
index 6ec6da2..e48e209 100644
--- a/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs
+++ b/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs
@@ -38,6 +38,9 @@ public interface ICheckpointManager
/// Checkpoint info for the given partition, or null if none has been previously stored.
Task GetCheckpointAsync(string partitionId);
+ [System.Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)]
+ Task UpdateCheckpointAsync(Checkpoint checkpoint);
+
///
/// Create the checkpoint for the given partition if it doesn't exist. Do nothing if it does exist.
/// The offset/sequenceNumber for a freshly-created checkpoint should be set to StartOfStream/0.
@@ -49,8 +52,9 @@ public interface ICheckpointManager
///
/// Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint.
///
+ /// Partition information against which to perform a checkpoint.
/// offset/sequeceNumber to update the store with.
- Task UpdateCheckpointAsync(Checkpoint checkpoint);
+ Task UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint);
///
/// Delete the stored checkpoint for the given partition. If there is no stored checkpoint for the
diff --git a/src/Microsoft.Azure.EventHubs.Processor/Lease.cs b/src/Microsoft.Azure.EventHubs.Processor/Lease.cs
index 23905f4..877dd63 100644
--- a/src/Microsoft.Azure.EventHubs.Processor/Lease.cs
+++ b/src/Microsoft.Azure.EventHubs.Processor/Lease.cs
@@ -26,6 +26,10 @@ protected Lease(Lease source)
this.Token = source.Token;
}
+ public string Offset { get; set; }
+
+ public long SequenceNumber { get; set; }
+
public string PartitionId { get; set; }
public string Owner { get; set; }
@@ -36,7 +40,8 @@ protected Lease(Lease source)
public virtual Task IsExpired()
{
- // this function is meaningless in the base class
+ // By default lease never expires.
+ // Deriving class will implement the lease expiry logic.
return Task.FromResult(false);
}
diff --git a/src/Microsoft.Azure.EventHubs.Processor/LeaseLostException.cs b/src/Microsoft.Azure.EventHubs.Processor/LeaseLostException.cs
index 1e86902..b260795 100644
--- a/src/Microsoft.Azure.EventHubs.Processor/LeaseLostException.cs
+++ b/src/Microsoft.Azure.EventHubs.Processor/LeaseLostException.cs
@@ -7,23 +7,22 @@ namespace Microsoft.Azure.EventHubs.Processor
public class LeaseLostException : Exception
{
- readonly Lease lease;
+ readonly string partitionId;
- internal LeaseLostException(Lease lease, Exception innerException)
+ internal LeaseLostException(string partitionId, Exception innerException)
: base(string.Empty, innerException)
{
- if (lease == null)
+ if (partitionId == null)
{
- throw new ArgumentNullException(nameof(lease));
+ throw new ArgumentNullException(nameof(partitionId));
}
- this.lease = lease;
+ this.partitionId = partitionId;
}
- // We don't want to expose Lease to the public.
public string PartitionId
{
- get { return this.lease.PartitionId; }
+ get { return this.partitionId; }
}
}
}
\ No newline at end of file
diff --git a/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs b/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs
index 27222d2..5515403 100644
--- a/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs
+++ b/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs
@@ -44,15 +44,6 @@ public string Owner
object ThisLock { get; }
- ///
- /// Updates the offset/sequenceNumber in the PartitionContext with the values in the received EventData object.
- ///
- /// Since offset is a string it cannot be compared easily, but sequenceNumber is checked. The new sequenceNumber must be
- /// at least the same as the current value or the entire assignment is aborted. It is assumed that if the new sequenceNumber
- /// is equal or greater, the new offset will be as well.
- ///
- /// A received EventData with valid offset and sequenceNumber
- /// If the sequenceNumber in the provided event is less than the current value
internal void SetOffsetAndSequenceNumber(EventData eventData)
{
if (eventData == null)
@@ -60,35 +51,12 @@ internal void SetOffsetAndSequenceNumber(EventData eventData)
throw new ArgumentNullException(nameof(eventData));
}
- this.SetOffsetAndSequenceNumber(eventData.SystemProperties.Offset, eventData.SystemProperties.SequenceNumber);
- }
-
- ///
- /// Updates the offset/sequenceNumber in the PartitionContext.
- ///
- /// These two values are closely tied and must be updated in an atomic fashion, hence the combined setter.
- /// Since offset is a string it cannot be compared easily, but sequenceNumber is checked. The new sequenceNumber must be
- /// at least the same as the current value or the entire assignment is aborted. It is assumed that if the new sequenceNumber
- /// is equal or greater, the new offset will be as well.
- ///
- /// New offset value
- /// New sequenceNumber value
- /// If the sequenceNumber in the provided event is less than the current value
- void SetOffsetAndSequenceNumber(string offset, long sequenceNumber)
- {
- lock(this.ThisLock)
+ lock (this.ThisLock)
{
- if (sequenceNumber >= this.SequenceNumber)
- {
- this.Offset = offset;
- this.SequenceNumber = sequenceNumber;
- }
- else
- {
- throw new ArgumentOutOfRangeException("offset/sequenceNumber", $"New offset {offset}/{sequenceNumber} is less than previous {this.Offset}/{this.SequenceNumber}");
- }
+ this.Offset = eventData.SystemProperties.Offset;
+ this.SequenceNumber = eventData.SystemProperties.SequenceNumber;
}
- }
+ }
internal async Task
/// A received EventData with valid offset and sequenceNumber
+ /// If suplied eventData is null
/// If the sequenceNumber is less than the last checkpointed value
public Task CheckpointAsync(EventData eventData)
{
- this.SetOffsetAndSequenceNumber(eventData.SystemProperties.Offset, eventData.SystemProperties.SequenceNumber);
+ if (eventData == null)
+ {
+ throw new ArgumentNullException("eventData");
+ }
+
+ // We have never seen this sequence number yet
+ if (eventData.SystemProperties.SequenceNumber > this.SequenceNumber)
+ {
+ throw new ArgumentOutOfRangeException("eventData.SystemProperties.SequenceNumber");
+ }
+
return this.PersistCheckpointAsync(new Checkpoint(this.PartitionId, eventData.SystemProperties.Offset, eventData.SystemProperties.SequenceNumber));
}
@@ -165,7 +143,7 @@ public override string ToString()
return $"PartitionContext({this.EventHubPath}/{this.ConsumerGroupName}/{this.PartitionId}/{this.SequenceNumber})";
}
- async Task PersistCheckpointAsync(Checkpoint checkpoint) // throws ArgumentOutOfRangeException, InterruptedException, ExecutionException
+ async Task PersistCheckpointAsync(Checkpoint checkpoint)
{
ProcessorEventSource.Log.PartitionPumpCheckpointStart(this.host.Id, checkpoint.PartitionId, checkpoint.Offset, checkpoint.SequenceNumber);
try
@@ -175,12 +153,14 @@ async Task PersistCheckpointAsync(Checkpoint checkpoint) // throws ArgumentOutOf
{
if (inStoreCheckpoint == null)
{
- inStoreCheckpoint = await this.host.CheckpointManager.CreateCheckpointIfNotExistsAsync(checkpoint.PartitionId).ConfigureAwait(false);
+ await this.host.CheckpointManager.CreateCheckpointIfNotExistsAsync(checkpoint.PartitionId).ConfigureAwait(false);
}
- inStoreCheckpoint.Offset = checkpoint.Offset;
- inStoreCheckpoint.SequenceNumber = checkpoint.SequenceNumber;
- await this.host.CheckpointManager.UpdateCheckpointAsync(inStoreCheckpoint).ConfigureAwait(false);
+ await this.host.CheckpointManager.UpdateCheckpointAsync(this.Lease, checkpoint).ConfigureAwait(false);
+
+ // Update internal lease if update call above is successful.
+ this.Lease.Offset = checkpoint.Offset;
+ this.Lease.SequenceNumber = checkpoint.SequenceNumber;
}
else
{
diff --git a/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs b/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs
index 8edada2..9bb3e17 100644
--- a/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs
+++ b/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs
@@ -192,19 +192,19 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception
// Renew any leases that currently belong to us.
IEnumerable> gettingAllLeases = leaseManager.GetAllLeases();
List leasesOwnedByOthers = new List();
- int ourLeasesCount = 0;
- foreach (Task getLeastTask in gettingAllLeases)
+ int ourLeaseCount = 0;
+ foreach (Task getLeaseTask in gettingAllLeases)
{
try
{
- Lease possibleLease = await getLeastTask.ConfigureAwait(false);
+ Lease possibleLease = await getLeaseTask.ConfigureAwait(false);
allLeases[possibleLease.PartitionId] = possibleLease;
if (await possibleLease.IsExpired().ConfigureAwait(false))
{
ProcessorEventSource.Log.PartitionPumpInfo(this.host.Id, possibleLease.PartitionId, "Trying to acquire lease.");
if (await leaseManager.AcquireLeaseAsync(possibleLease).ConfigureAwait(false))
{
- ourLeasesCount++;
+ ourLeaseCount++;
}
else
{
@@ -215,11 +215,15 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception
else if (possibleLease.Owner == this.host.HostName)
{
ProcessorEventSource.Log.PartitionPumpInfo(this.host.Id, possibleLease.PartitionId, "Trying to renew lease.");
- if (await leaseManager.RenewLeaseAsync(possibleLease).ConfigureAwait(false))
+
+ // Try to renew the lease. If successful then this lease belongs to us,
+ // if throws LeaseLostException then we don't own it anymore.
+ try
{
- ourLeasesCount++;
+ await leaseManager.RenewLeaseAsync(possibleLease).ConfigureAwait(false);
+ ourLeaseCount++;
}
- else
+ catch (LeaseLostException)
{
// Probably failed because another host stole it between get and renew
leasesOwnedByOthers.Add(possibleLease);
@@ -240,31 +244,30 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception
// Grab more leases if available and needed for load balancing
if (leasesOwnedByOthers.Count > 0)
{
- IEnumerable stealTheseLeases = WhichLeasesToSteal(leasesOwnedByOthers, ourLeasesCount);
- if (stealTheseLeases != null)
+ Lease stealThisLease = WhichLeaseToSteal(leasesOwnedByOthers, ourLeaseCount);
+ if (stealThisLease != null)
{
- foreach (Lease stealee in stealTheseLeases)
+ try
{
- try
+ ProcessorEventSource.Log.PartitionPumpStealLeaseStart(this.host.Id, stealThisLease.PartitionId);
+ if (await leaseManager.AcquireLeaseAsync(stealThisLease).ConfigureAwait(false))
{
- ProcessorEventSource.Log.PartitionPumpStealLeaseStart(this.host.Id, stealee.PartitionId);
- if (await leaseManager.AcquireLeaseAsync(stealee).ConfigureAwait(false))
- {
- // Succeeded in stealing lease
- ProcessorEventSource.Log.PartitionPumpStealLeaseStop(this.host.Id, stealee.PartitionId);
- ourLeasesCount++;
- }
- else
- {
- ProcessorEventSource.Log.EventProcessorHostWarning(this.host.Id, "Failed to steal lease for partition " + stealee.PartitionId, null);
- }
+ // Succeeded in stealing lease
+ ProcessorEventSource.Log.PartitionPumpStealLeaseStop(this.host.Id, stealThisLease.PartitionId);
}
- catch (Exception e)
+ else
{
- ProcessorEventSource.Log.EventProcessorHostError(this.host.Id, "Exception during stealing lease for partition " + stealee.PartitionId, e.ToString());
- this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, stealee.PartitionId, e, EventProcessorHostActionStrings.StealingLease);
+ ProcessorEventSource.Log.EventProcessorHostWarning(this.host.Id,
+ "Failed to steal lease for partition " + stealThisLease.PartitionId, null);
}
}
+ catch (Exception e)
+ {
+ ProcessorEventSource.Log.EventProcessorHostError(this.host.Id,
+ "Exception during stealing lease for partition " + stealThisLease.PartitionId, e.ToString());
+ this.host.EventProcessorOptions.NotifyOfException(this.host.HostName,
+ stealThisLease.PartitionId, e, EventProcessorHostActionStrings.StealingLease);
+ }
}
}
@@ -366,12 +369,11 @@ Task RemoveAllPumpsAsync(CloseReason reason)
return Task.WhenAll(tasks);
}
- IEnumerable WhichLeasesToSteal(List stealableLeases, int haveLeaseCount)
+ Lease WhichLeaseToSteal(List stealableLeases, int haveLeaseCount)
{
IDictionary countsByOwner = CountLeasesByOwner(stealableLeases);
- string biggestOwner = FindBiggestOwner(countsByOwner);
- int biggestCount = countsByOwner[biggestOwner];
- List stealTheseLeases = null;
+ var biggestOwner = countsByOwner.OrderByDescending(o => o.Value).First();
+ Lease stealThisLease = null;
// If the number of leases is a multiple of the number of hosts, then the desired configuration is
// that all hosts own the name number of leases, and the difference between the "biggest" owner and
@@ -384,7 +386,7 @@ IEnumerable WhichLeasesToSteal(List stealableLeases, int haveLease
//
// In either case, if the difference between this host and the biggest owner is 2 or more, then the
// system is not in the most evenly-distributed configuration, so steal one lease from the biggest.
- // If there is a tie for biggest, findBiggestOwner() picks whichever appears first in the list because
+ // If there is a tie for biggest, we pick whichever appears first in the list because
// it doesn't really matter which "biggest" is trimmed down.
//
// Stealing one at a time prevents flapping because it reduces the difference between the biggest and
@@ -392,61 +394,31 @@ IEnumerable WhichLeasesToSteal(List stealableLeases, int haveLease
// end up below 0. This host may become tied for biggest, but it cannot become larger than the host that
// it is stealing from.
- if ((biggestCount - haveLeaseCount) >= 2)
+ if ((biggestOwner.Value - haveLeaseCount) >= 2)
{
- stealTheseLeases = new List();
- foreach (Lease l in stealableLeases)
- {
- if (l.Owner == biggestOwner)
- {
- stealTheseLeases.Add(l);
- ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Proposed to steal lease for partition {l.PartitionId} from {biggestOwner}");
- break;
- }
- }
+ stealThisLease = stealableLeases.Where(l => l.Owner == biggestOwner.Key).First();
+ ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Proposed to steal lease for partition {stealThisLease.PartitionId} from {biggestOwner.Key}");
}
- return stealTheseLeases;
+ return stealThisLease;
}
- string FindBiggestOwner(IDictionary countsByOwner)
+ Dictionary CountLeasesByOwner(IEnumerable leases)
{
- int biggestCount = 0;
- string biggestOwner = null;
- foreach (string owner in countsByOwner.Keys)
- {
- if (countsByOwner[owner] > biggestCount)
- {
- biggestCount = countsByOwner[owner];
- biggestOwner = owner;
- }
- }
- return biggestOwner;
- }
+ var counts = leases.GroupBy(lease => lease.Owner).Select(group => new {
+ Owner = group.Key,
+ Count = group.Count()
+ });
- IDictionary CountLeasesByOwner(IEnumerable leases)
- {
- IDictionary counts = new Dictionary();
- foreach (Lease l in leases)
+ // Log ownership mapping.
+ foreach (var owner in counts)
{
- if (counts.ContainsKey(l.Owner))
- {
- int oldCount = counts[l.Owner];
- counts[l.Owner] = oldCount + 1;
- }
- else
- {
- counts[l.Owner] = 1;
- }
+ ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Host {owner.Owner} owns {owner.Count} leases");
}
- foreach (string owner in counts.Keys)
- {
- ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Host {owner} owns {counts[owner]} leases");
- }
+ ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Total hosts in list: {counts.Count()}");
- ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Total hosts in list: {counts.Count}");
- return counts;
+ return counts.ToDictionary(e => e.Owner, e => e.Count);
}
}
}
\ No newline at end of file
diff --git a/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs b/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs
index d8d7140..94b6daa 100644
--- a/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs
+++ b/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs
@@ -112,7 +112,12 @@ public async Task CloseAsync(CloseReason reason)
if (reason != CloseReason.LeaseLost)
{
// Since this pump is dead, release the lease.
- await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false);
+ // Ignore LeaseLostException
+ try
+ {
+ await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false);
+ }
+ catch (LeaseLostException) { }
}
this.PumpStatus = PartitionPumpStatus.Closed;
@@ -123,10 +128,17 @@ public async Task CloseAsync(CloseReason reason)
protected async Task ProcessEventsAsync(IEnumerable events)
{
- // Assumes that .NET Core client will call with null on receive timeout.
- if (events == null && !this.Host.EventProcessorOptions.InvokeProcessorAfterReceiveTimeout)
+ if (events == null)
{
- return;
+ if (this.Host.EventProcessorOptions.InvokeProcessorAfterReceiveTimeout)
+ {
+ // Assumes that .NET Core client will call with empty EventData on receive timeout.
+ events = Enumerable.Empty();
+ }
+ else
+ {
+ return;
+ }
}
// Synchronize to serialize calls to the processor.
@@ -135,15 +147,18 @@ protected async Task ProcessEventsAsync(IEnumerable events)
// protected by synchronizing too.
using (await this.ProcessingAsyncLock.LockAsync().ConfigureAwait(false))
{
- int eventCount = events?.Count() ?? 0;
- ProcessorEventSource.Log.PartitionPumpInvokeProcessorEventsStart(this.Host.Id, this.PartitionContext.PartitionId, eventCount);
+ ProcessorEventSource.Log.PartitionPumpInvokeProcessorEventsStart(this.Host.Id,
+ this.PartitionContext.PartitionId, events?.Count() ?? 0);
try
{
- if (eventCount > 0)
+ EventData last = events?.LastOrDefault();
+ if (last != null)
{
- var lastMessage = events.Last();
- this.PartitionContext.SequenceNumber = lastMessage.SystemProperties.SequenceNumber;
- this.PartitionContext.Offset = lastMessage.SystemProperties.Offset;
+ ProcessorEventSource.Log.PartitionPumpInfo(
+ this.Host.Id,
+ this.PartitionContext.PartitionId,
+ "Updating offset in partition context with end of batch " + last.SystemProperties.Offset + "/" + last.SystemProperties.SequenceNumber);
+ this.PartitionContext.SetOffsetAndSequenceNumber(last);
}
await this.Processor.ProcessEventsAsync(this.PartitionContext, events).ConfigureAwait(false);
@@ -157,16 +172,6 @@ protected async Task ProcessEventsAsync(IEnumerable events)
{
ProcessorEventSource.Log.PartitionPumpInvokeProcessorEventsStop(this.Host.Id, this.PartitionContext.PartitionId);
}
-
- EventData last = events?.LastOrDefault();
- if (last != null)
- {
- ProcessorEventSource.Log.PartitionPumpInfo(
- this.Host.Id,
- this.PartitionContext.PartitionId,
- "Updating offset in partition context with end of batch " + last.SystemProperties.Offset + "/" + last.SystemProperties.SequenceNumber);
- this.PartitionContext.SetOffsetAndSequenceNumber(last);
- }
}
}
diff --git a/src/Microsoft.Azure.EventHubs.Processor/project.json b/src/Microsoft.Azure.EventHubs.Processor/project.json
index 88e254b..46cedf4 100644
--- a/src/Microsoft.Azure.EventHubs.Processor/project.json
+++ b/src/Microsoft.Azure.EventHubs.Processor/project.json
@@ -1,5 +1,5 @@
{
- "version": "1.0.0",
+ "version": "1.0.1",
"title": "Microsoft.Azure.EventHubs.Processor",
"description": "Microsoft.Azure.EventHubs.Processor Class Library",
"authors": [ "Microsoft" ],
@@ -23,7 +23,7 @@
"target": "project"
},
"Newtonsoft.Json": "9.0.1",
- "WindowsAzure.Storage": "8.0.1"
+ "WindowsAzure.Storage": "8.1.1"
},
"frameworks": {
diff --git a/src/Microsoft.Azure.EventHubs/Amqp/AmqpEventDataSender.cs b/src/Microsoft.Azure.EventHubs/Amqp/AmqpEventDataSender.cs
index 9b2b184..1e1662c 100644
--- a/src/Microsoft.Azure.EventHubs/Amqp/AmqpEventDataSender.cs
+++ b/src/Microsoft.Azure.EventHubs/Amqp/AmqpEventDataSender.cs
@@ -109,15 +109,14 @@ ArraySegment GetNextDeliveryTag()
async Task CreateLinkAsync(TimeSpan timeout)
{
var amqpEventHubClient = ((AmqpEventHubClient)this.EventHubClient);
- var csb = amqpEventHubClient.ConnectionStringBuilder;
- var timeoutHelper = new TimeoutHelper(csb.OperationTimeout);
+ var timeoutHelper = new TimeoutHelper(timeout);
AmqpConnection connection = await amqpEventHubClient.ConnectionManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
// Authenticate over CBS
var cbsLink = connection.Extensions.Find();
ICbsTokenProvider cbsTokenProvider = amqpEventHubClient.CbsTokenProvider;
- Uri address = new Uri(csb.Endpoint, this.Path);
+ Uri address = new Uri(amqpEventHubClient.ConnectionStringBuilder.Endpoint, this.Path);
string audience = address.AbsoluteUri;
string resource = address.AbsoluteUri;
var expiresAt = await cbsLink.SendTokenAsync(cbsTokenProvider, address, audience, resource, new[] { ClaimConstants.Send }, timeoutHelper.RemainingTime()).ConfigureAwait(false);
diff --git a/src/Microsoft.Azure.EventHubs/Amqp/AmqpEventHubClient.cs b/src/Microsoft.Azure.EventHubs/Amqp/AmqpEventHubClient.cs
index 7d9e68a..1ba0428 100644
--- a/src/Microsoft.Azure.EventHubs/Amqp/AmqpEventHubClient.cs
+++ b/src/Microsoft.Azure.EventHubs/Amqp/AmqpEventHubClient.cs
@@ -59,61 +59,6 @@ protected override Task OnCloseAsync()
return this.ConnectionManager.CloseAsync();
}
- internal async Task OpenRequestResponseLinkAsync(
- string type, string address, MessagingEntityType? entityType, string[] requiredClaims, TimeSpan timeout)
- {
- var timeoutHelper = new TimeoutHelper(timeout, true);
- AmqpSession session = null;
- try
- {
- // Don't need to get token for namespace scope operations, included in request
- bool isNamespaceScope = address.Equals(AmqpClientConstants.ManagementAddress, StringComparison.OrdinalIgnoreCase);
-
- var connection = await this.ConnectionManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
-
- var sessionSettings = new AmqpSessionSettings { Properties = new Fields() };
- session = connection.CreateSession(sessionSettings);
-
- await session.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
-
- var linkSettings = new AmqpLinkSettings();
- linkSettings.AddProperty(AmqpClientConstants.TimeoutName, (uint)timeoutHelper.RemainingTime().TotalMilliseconds);
- if (entityType != null)
- {
- linkSettings.AddProperty(AmqpClientConstants.EntityTypeName, (int)entityType.Value);
- }
-
- // Create the link
- var link = new RequestResponseAmqpLink(type, session, address, linkSettings.Properties);
-
- var authorizationValidToUtc = DateTime.MaxValue;
-
- if (!isNamespaceScope)
- {
- // TODO: Get Entity level token here
- }
-
- await link.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
-
- // Redirected scenario requires entityPath as the audience, otherwise we
- // should always use the full EndpointUri as audience.
- return new ActiveClientRequestResponseLink(
- link,
- this.ConnectionStringBuilder.Endpoint.AbsoluteUri, // audience
- this.ConnectionStringBuilder.Endpoint.AbsoluteUri, // endpointUri
- requiredClaims,
- false,
- authorizationValidToUtc);
- }
- catch (Exception)
- {
- // Aborting the session will cleanup the link as well.
- session?.Abort();
-
- throw;
- }
- }
-
protected override async Task OnGetRuntimeInformationAsync()
{
var serviceClient = this.GetManagementServiceClient();
diff --git a/src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs b/src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs
index 90cd015..cc7ac47 100644
--- a/src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs
+++ b/src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs
@@ -110,6 +110,13 @@ protected override async Task> OnReceiveAsync(int maxMessageCou
}
else
{
+ // Handle System.TimeoutException explicitly.
+ // We don't really want to to throw TimeoutException on this call.
+ if (ex is TimeoutException)
+ {
+ break;
+ }
+
throw;
}
}
@@ -123,10 +130,10 @@ protected override void OnSetReceiveHandler(IPartitionReceiveHandler newReceiveH
{
lock (this.receivePumpLock)
{
- if (this.receiveHandler != null)
+ if (newReceiveHandler != null && this.receiveHandler != null)
{
// Notify existing handler first (but don't wait).
- this.receiveHandler.ProcessErrorAsync(new OperationCanceledException("New handler has registered for this receiver.")); // .Fork();
+ this.receiveHandler.ProcessErrorAsync(new OperationCanceledException("New handler has registered for this receiver."));
}
this.receiveHandler = newReceiveHandler;
@@ -156,15 +163,14 @@ protected override void OnSetReceiveHandler(IPartitionReceiveHandler newReceiveH
async Task CreateLinkAsync(TimeSpan timeout)
{
var amqpEventHubClient = ((AmqpEventHubClient)this.EventHubClient);
- var csb = this.EventHubClient.ConnectionStringBuilder;
- var timeoutHelper = new TimeoutHelper(csb.OperationTimeout);
+ var timeoutHelper = new TimeoutHelper(timeout);
AmqpConnection connection = await amqpEventHubClient.ConnectionManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
// Authenticate over CBS
var cbsLink = connection.Extensions.Find();
ICbsTokenProvider cbsTokenProvider = amqpEventHubClient.CbsTokenProvider;
- Uri address = new Uri(csb.Endpoint, this.Path);
+ Uri address = new Uri(amqpEventHubClient.ConnectionStringBuilder.Endpoint, this.Path);
string audience = address.AbsoluteUri;
string resource = address.AbsoluteUri;
var expiresAt = await cbsLink.SendTokenAsync(cbsTokenProvider, address, audience, resource, new[] { ClaimConstants.Listen }, timeoutHelper.RemainingTime()).ConfigureAwait(false);
@@ -268,42 +274,56 @@ static long TimeStampEncodingGetMilliseconds(DateTime value)
async Task ReceivePumpAsync(CancellationToken cancellationToken)
{
- // Loop until pump is shutdown or an error is hit.
- while (!cancellationToken.IsCancellationRequested)
+ try
{
- IEnumerable receivedEvents;
-
- try
+ // Loop until pump is shutdown or an error is hit.
+ while (!cancellationToken.IsCancellationRequested)
{
- int batchSize;
- lock (this.receivePumpLock)
+ IEnumerable receivedEvents;
+
+ try
{
- if (this.receiveHandler == null)
+ int batchSize;
+ lock (this.receivePumpLock)
{
- // Pump has been shutdown, nothing more to do.
- return;
+ if (this.receiveHandler == null)
+ {
+ // Pump has been shutdown, nothing more to do.
+ return;
+ }
+ batchSize = receiveHandler.MaxBatchSize;
}
- batchSize = receiveHandler.MaxBatchSize;
+
+ receivedEvents = await this.ReceiveAsync(batchSize);
}
+ catch (Exception e)
+ {
+ EventHubsEventSource.Log.ReceiveHandlerExitingWithError(this.ClientId, this.PartitionId, e.Message);
+ await this.ReceiveHandlerProcessErrorAsync(e).ConfigureAwait(false);
- receivedEvents = await this.ReceiveAsync(batchSize);
- }
- catch (Exception e)
- {
- await this.ReceiveHandlerProcessErrorAsync(e).ConfigureAwait(false);
- break;
- }
+ // Avoid tight loop if Receieve call keeps faling.
+ await Task.Delay(100);
- try
- {
- await this.ReceiveHandlerProcessEventsAsync(receivedEvents).ConfigureAwait(false);
- }
- catch (Exception userCodeError)
- {
- await this.ReceiveHandlerProcessErrorAsync(userCodeError).ConfigureAwait(false);
- break;
+ continue;
+ }
+
+ try
+ {
+ await this.ReceiveHandlerProcessEventsAsync(receivedEvents).ConfigureAwait(false);
+ }
+ catch (Exception userCodeError)
+ {
+ EventHubsEventSource.Log.ReceiveHandlerExitingWithError(this.ClientId, this.PartitionId, userCodeError.Message);
+ await this.ReceiveHandlerProcessErrorAsync(userCodeError).ConfigureAwait(false);
+ }
}
}
+ catch (Exception ex)
+ {
+ // This should never throw
+ EventHubsEventSource.Log.ReceiveHandlerExitingWithError(this.ClientId, this.PartitionId, ex.Message);
+ Environment.FailFast(ex.ToString());
+ }
this.ReceiveHandlerClose();
}
diff --git a/src/Microsoft.Azure.EventHubs/Amqp/AmqpServiceClient.cs b/src/Microsoft.Azure.EventHubs/Amqp/AmqpServiceClient.cs
index 337fa95..9ab52ab 100644
--- a/src/Microsoft.Azure.EventHubs/Amqp/AmqpServiceClient.cs
+++ b/src/Microsoft.Azure.EventHubs/Amqp/AmqpServiceClient.cs
@@ -20,7 +20,6 @@ class AmqpServiceClient : ClientEntity
readonly AmqpEventHubClient eventHubClient;
readonly FaultTolerantAmqpObject link;
- readonly ActiveClientLinkManager clientLinkManager;
SecurityToken token;
AsyncLock tokenLock = new AsyncLock();
@@ -31,7 +30,6 @@ public AmqpServiceClient(AmqpEventHubClient eventHubClient, string address)
this.eventHubClient = eventHubClient;
this.Address = address;
this.link = new FaultTolerantAmqpObject(t => this.OpenLinkAsync(t), rrlink => rrlink.CloseAsync(TimeSpan.FromSeconds(10)));
- this.clientLinkManager = new ActiveClientLinkManager(this.eventHubClient);
}
AmqpMessage CreateGetRuntimeInformationRequest()
@@ -172,10 +170,64 @@ internal void OnAbort()
async Task OpenLinkAsync(TimeSpan timeout)
{
- ActiveClientRequestResponseLink activeClientLink = await this.eventHubClient.OpenRequestResponseLinkAsync(
+ ActiveClientRequestResponseLink activeClientLink = await OpenRequestResponseLinkAsync(
"svc", this.Address, null, AmqpServiceClient.RequiredClaims, timeout);
- this.clientLinkManager.SetActiveLink(activeClientLink);
return activeClientLink.Link;
}
+
+ async Task OpenRequestResponseLinkAsync(
+ string type, string address, MessagingEntityType? entityType, string[] requiredClaims, TimeSpan timeout)
+ {
+ var timeoutHelper = new TimeoutHelper(timeout, true);
+ AmqpSession session = null;
+ try
+ {
+ // Don't need to get token for namespace scope operations, included in request
+ bool isNamespaceScope = address.Equals(AmqpClientConstants.ManagementAddress, StringComparison.OrdinalIgnoreCase);
+
+ var connection = await this.eventHubClient.ConnectionManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
+
+ var sessionSettings = new AmqpSessionSettings { Properties = new Fields() };
+ session = connection.CreateSession(sessionSettings);
+
+ await session.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
+
+ var linkSettings = new AmqpLinkSettings();
+ linkSettings.AddProperty(AmqpClientConstants.TimeoutName, (uint)timeoutHelper.RemainingTime().TotalMilliseconds);
+ if (entityType != null)
+ {
+ linkSettings.AddProperty(AmqpClientConstants.EntityTypeName, (int)entityType.Value);
+ }
+
+ // Create the link
+ var link = new RequestResponseAmqpLink(type, session, address, linkSettings.Properties);
+
+ var authorizationValidToUtc = DateTime.MaxValue;
+
+ if (!isNamespaceScope)
+ {
+ // TODO: Get Entity level token here
+ }
+
+ await link.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
+
+ // Redirected scenario requires entityPath as the audience, otherwise we
+ // should always use the full EndpointUri as audience.
+ return new ActiveClientRequestResponseLink(
+ link,
+ this.eventHubClient.ConnectionStringBuilder.Endpoint.AbsoluteUri, // audience
+ this.eventHubClient.ConnectionStringBuilder.Endpoint.AbsoluteUri, // endpointUri
+ requiredClaims,
+ false,
+ authorizationValidToUtc);
+ }
+ catch (Exception)
+ {
+ // Aborting the session will cleanup the link as well.
+ session?.Abort();
+
+ throw;
+ }
+ }
}
}
diff --git a/src/Microsoft.Azure.EventHubs/EventHubsEventSource.cs b/src/Microsoft.Azure.EventHubs/EventHubsEventSource.cs
index 0f351cd..e2e17c4 100644
--- a/src/Microsoft.Azure.EventHubs/EventHubsEventSource.cs
+++ b/src/Microsoft.Azure.EventHubs/EventHubsEventSource.cs
@@ -199,7 +199,16 @@ public void GetEventHubPartitionRuntimeInformationException(string clientId, str
WriteEvent(20, clientId, partitionId, error);
}
}
-
+
+ [Event(21, Level = EventLevel.Error, Message = "{0}: Receive handler exiting with exception on partition {1}: {2}.")]
+ public void ReceiveHandlerExitingWithError(string clientId, string partitionId, string error)
+ {
+ if (IsEnabled())
+ {
+ WriteEvent(21, clientId, partitionId, error);
+ }
+ }
+
// TODO: Add Keywords if desired.
//public class Keywords // This is a bitvector
//{
diff --git a/src/Microsoft.Azure.EventHubs/project.json b/src/Microsoft.Azure.EventHubs/project.json
index 8e6bfee..f6c6106 100644
--- a/src/Microsoft.Azure.EventHubs/project.json
+++ b/src/Microsoft.Azure.EventHubs/project.json
@@ -1,5 +1,5 @@
{
- "version": "1.0.0",
+ "version": "1.0.1",
"title": "Microsoft.Azure.EventHubs",
"description": "Microsoft.Azure.EventHubs Class Library",
"authors": [ "Microsoft" ],
diff --git a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs
index f14cfc0..9cb254c 100644
--- a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs
+++ b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs
@@ -9,6 +9,7 @@ namespace Microsoft.Azure.EventHubs.Processor.UnitTests
using System.Diagnostics;
using System.Linq;
using System.Text;
+ using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
@@ -152,22 +153,31 @@ Task SingleProcessorHost()
[Fact]
async Task MultipleProcessorHosts()
{
- Log("Testing with 2 EventProcessorHost instances");
+ int hostCount = 3;
+ Log($"Testing with {hostCount} EventProcessorHost instances");
+
+ // Prepare partition trackers.
var partitionReceiveEvents = new ConcurrentDictionary();
foreach (var partitionId in PartitionIds)
{
partitionReceiveEvents[partitionId] = new AsyncAutoResetEvent(false);
}
- int hostCount = 2;
+ // Prepare host trackers.
+ var hostReceiveEvents = new ConcurrentDictionary();
+
var hosts = new List();
try
{
- for (int i = 0; i < hostCount; i++)
+ for (int hostId = 0; hostId < hostCount; hostId++)
{
+ var thisHostName = $"host-{hostId}";
+ hostReceiveEvents[thisHostName] = new AsyncAutoResetEvent(false);
+
Log("Creating EventProcessorHost");
var eventProcessorHost = new EventProcessorHost(
+ thisHostName,
string.Empty, // Passing empty as entity path here rsince path is already in EH connection string.
PartitionReceiver.DefaultConsumerGroupName,
this.EventHubConnectionString,
@@ -194,11 +204,11 @@ async Task MultipleProcessorHosts()
processor.OnProcessEvents += (_, eventsArgs) =>
{
int eventCount = eventsArgs.Item2.events != null ? eventsArgs.Item2.events.Count() : 0;
- Log($"{hostName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)");
if (eventCount > 0)
{
- var receivedEvent = partitionReceiveEvents[partitionId];
- receivedEvent.Set();
+ Log($"{hostName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)");
+ partitionReceiveEvents[partitionId].Set();
+ hostReceiveEvents[hostName].Set();
}
};
};
@@ -206,8 +216,10 @@ async Task MultipleProcessorHosts()
await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, processorOptions);
}
+ // Allow some time for each host to own at least 1 partition.
+ // Partition stealing logic balances partition ownership one at a time.
Log("Waiting for partition ownership to settle...");
- await Task.Delay(TimeSpan.FromSeconds(30));
+ await Task.Delay(TimeSpan.FromSeconds(60));
Log("Sending an event to each partition");
var sendTasks = new List();
@@ -218,11 +230,17 @@ async Task MultipleProcessorHosts()
await Task.WhenAll(sendTasks);
Log("Verifying an event was received by each partition");
- foreach (var partitionId in PartitionIds)
+ foreach (var e in partitionReceiveEvents)
+ {
+ bool ret = await e.Value.WaitAsync(TimeSpan.FromSeconds(30));
+ Assert.True(ret, $"Partition {e.Key} didn't receive any message!");
+ }
+
+ Log("Verifying at least an event was received by each host");
+ foreach (var e in hostReceiveEvents)
{
- var receivedEvent = partitionReceiveEvents[partitionId];
- bool partitionReceivedMessage = await receivedEvent.WaitAsync(TimeSpan.FromSeconds(30));
- Assert.True(partitionReceivedMessage, $"Partition {partitionId} didn't receive any message!");
+ bool ret = await e.Value.WaitAsync(TimeSpan.FromSeconds(30));
+ Assert.True(ret, $"Host {e.Key} didn't receive any message!");
}
}
finally
@@ -254,7 +272,7 @@ async Task WithBlobPrefix()
this.StorageConnectionString,
leaseContainerName,
"firsthost");
- var setOfMessages1 = await RunGenericScenario(eventProcessorHostFirst);
+ var runResult1 = await RunGenericScenario(eventProcessorHostFirst);
// Consume all messages with second host.
// Create host with 'secondhost' prefix.
@@ -268,12 +286,12 @@ async Task WithBlobPrefix()
this.StorageConnectionString,
leaseContainerName,
"secondhost");
- var setOfMessages2 = await RunGenericScenario(eventProcessorHostSecond, totalNumberOfEventsToSend: 0);
+ var runResult2 = await RunGenericScenario(eventProcessorHostSecond, totalNumberOfEventsToSend: 0);
// Confirm that we are looking at 2 identical sets of messages in the end.
- foreach (var kvp in setOfMessages1)
+ foreach (var kvp in runResult1.ReceivedEvents)
{
- Assert.True(kvp.Value.Count() == setOfMessages2[kvp.Key].Count,
+ Assert.True(kvp.Value.Count() == runResult2.ReceivedEvents[kvp.Key].Count,
$"The sets of messages returned from first host and the second host are different for partition {kvp.Key}.");
}
}
@@ -530,10 +548,10 @@ async Task InitialOffsetProviderWithDateTime()
MaxBatchSize = 100
};
- var receivedEvents = await this.RunGenericScenario(eventProcessorHost, processorOptions);
+ var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions);
// We should have received only 1 event from each partition.
- Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event");
+ Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event");
}
[Fact]
@@ -562,10 +580,10 @@ async Task InitialOffsetProviderWithOffset()
MaxBatchSize = 100
};
- var receivedEvents = await this.RunGenericScenario(eventProcessorHost, processorOptions);
+ var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions);
// We should have received only 1 event from each partition.
- Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event");
+ Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event");
}
[Fact]
@@ -586,10 +604,10 @@ async Task InitialOffsetProviderWithEndOfStream()
MaxBatchSize = 100
};
- var receivedEvents = await this.RunGenericScenario(eventProcessorHost, processorOptions);
+ var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions);
// We should have received only 1 event from each partition.
- Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event");
+ Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event");
}
[Fact]
@@ -623,10 +641,11 @@ async Task InitialOffsetProviderOverrideBehavior()
InitialOffsetProvider = partitionId => PartitionReceiver.StartOfStream,
MaxBatchSize = 100
};
- var receivedEvents = await this.RunGenericScenario(eventProcessorHost, processorOptions, checkPointLastEvent: false);
+
+ var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions, checkpointLastEvent: false);
// We should have received only 1 event from each partition.
- Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event");
+ Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event");
}
[Fact]
@@ -652,10 +671,10 @@ async Task CheckpointEventDataShouldHold()
this.EventHubConnectionString,
this.StorageConnectionString,
leaseContainerName);
- var receivedEvents = await RunGenericScenario(eventProcessorHostSecond);
+ var runResult = await RunGenericScenario(eventProcessorHostSecond);
// We should have received only 1 event from each partition.
- Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event");
+ Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event");
}
[Fact]
@@ -671,7 +690,7 @@ async Task CheckpointBatchShouldHold()
this.EventHubConnectionString,
this.StorageConnectionString,
leaseContainerName);
- await RunGenericScenario(eventProcessorHostFirst, checkPointLastEvent: false, checkPointBatch: true);
+ await RunGenericScenario(eventProcessorHostFirst, checkpointLastEvent: false, checkpointBatch: true);
// For the second time we initiate a host and this time it should pick from where the previous host left.
// In other words, it shouldn't start receiving from start of the stream.
@@ -681,10 +700,100 @@ async Task CheckpointBatchShouldHold()
this.EventHubConnectionString,
this.StorageConnectionString,
leaseContainerName);
- var receivedEvents = await RunGenericScenario(eventProcessorHostSecond);
+ var runResult = await RunGenericScenario(eventProcessorHostSecond);
// We should have received only 1 event from each partition.
- Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event");
+ Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event");
+ }
+
+ [Fact]
+ async Task HostShouldRecoverAfterReceiverDisconnection()
+ {
+ // We will target one partition and do validation on it.
+ var targetPartition = this.PartitionIds.First();
+
+ int targetPartitionOpens = 0;
+ int targetPartitionCloses = 0;
+ int targetPartitionErrors = 0;
+ PartitionReceiver externalReceiver = null;
+
+ var eventProcessorHost = new EventProcessorHost(
+ "ephhost",
+ string.Empty,
+ PartitionReceiver.DefaultConsumerGroupName,
+ this.EventHubConnectionString,
+ this.StorageConnectionString,
+ Guid.NewGuid().ToString());
+
+ try
+ {
+ var processorFactory = new TestEventProcessorFactory();
+
+ processorFactory.OnCreateProcessor += (f, createArgs) =>
+ {
+ var processor = createArgs.Item2;
+ string partitionId = createArgs.Item1.PartitionId;
+ string hostName = createArgs.Item1.Owner;
+ processor.OnOpen += (_, partitionContext) =>
+ {
+ Log($"{hostName} > Partition {partitionId} TestEventProcessor opened");
+ if (partitionId == targetPartition)
+ {
+ Interlocked.Increment(ref targetPartitionOpens);
+ }
+ };
+ processor.OnClose += (_, closeArgs) =>
+ {
+ Log($"{hostName} > Partition {partitionId} TestEventProcessor closing: {closeArgs.Item2}");
+ if (partitionId == targetPartition && closeArgs.Item2 == CloseReason.Shutdown)
+ {
+ Interlocked.Increment(ref targetPartitionCloses);
+ }
+ };
+ processor.OnProcessError += (_, errorArgs) =>
+ {
+ Log($"{hostName} > Partition {partitionId} TestEventProcessor process error {errorArgs.Item2.Message}");
+ if (partitionId == targetPartition && errorArgs.Item2 is ReceiverDisconnectedException)
+ {
+ Interlocked.Increment(ref targetPartitionErrors);
+ }
+ };
+ };
+
+ await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory);
+
+ // Wait 15 seconds then create a new epoch receiver.
+ // This will trigger ReceiverDisconnectedExcetion in the host.
+ await Task.Delay(15000);
+
+ Log("Creating a new receiver with epoch 2. This will trigger ReceiverDisconnectedException in the host.");
+ var ehClient = EventHubClient.CreateFromConnectionString(this.EventHubConnectionString);
+ externalReceiver = ehClient.CreateEpochReceiver(PartitionReceiver.DefaultConsumerGroupName,
+ targetPartition, PartitionReceiver.StartOfStream, 2);
+ await externalReceiver.ReceiveAsync(100, TimeSpan.FromSeconds(5));
+
+ // Give another 1 minute for host to recover then do the validatins.
+ await Task.Delay(60000);
+
+ Log("Verifying that host was able to receive ReceiverDisconnectedException");
+ Assert.True(targetPartitionErrors == 1, $"Host received {targetPartitionErrors} ReceiverDisconnectedExceptions!");
+
+ Log("Verifying that host was able to reopen the partition");
+ Assert.True(targetPartitionOpens == 2, $"Host opened target partition {targetPartitionOpens} times!");
+
+ Log("Verifying that host notified by close");
+ Assert.True(targetPartitionCloses == 1, $"Host closed target partition {targetPartitionCloses} times!");
+ }
+ finally
+ {
+ Log("Calling UnregisterEventProcessorAsync");
+ await eventProcessorHost.UnregisterEventProcessorAsync();
+
+ if (externalReceiver != null)
+ {
+ await externalReceiver.CloseAsync();
+ }
+ }
}
///
@@ -704,8 +813,8 @@ async Task NoCheckpointThenNewHostReadsFromStart()
this.EventHubConnectionString,
this.StorageConnectionString,
leaseContainerName);
- var receivedEvents1 = await RunGenericScenario(eventProcessorHostFirst, checkPointLastEvent: false);
- var totalEventsFromFirstHost = receivedEvents1.Sum(part => part.Value.Count);
+ var runResult1 = await RunGenericScenario(eventProcessorHostFirst, checkpointLastEvent: false);
+ var totalEventsFromFirstHost = runResult1.ReceivedEvents.Sum(part => part.Value.Count);
// Second time we initiate a host, it should pick from where previous host left.
// In other words, it shouldn't start receiving from start of the stream.
@@ -715,12 +824,33 @@ async Task NoCheckpointThenNewHostReadsFromStart()
this.EventHubConnectionString,
this.StorageConnectionString,
leaseContainerName);
- var receivedEvents2 = await RunGenericScenario(eventProcessorHostSecond);
- var totalEventsFromSecondHost = receivedEvents2.Sum(part => part.Value.Count);
+ var runResult2 = await RunGenericScenario(eventProcessorHostSecond);
+ var totalEventsFromSecondHost = runResult2.ReceivedEvents.Sum(part => part.Value.Count);
// Second host should have received +partition-count messages.
Assert.True(totalEventsFromFirstHost + PartitionIds.Count() == totalEventsFromSecondHost,
- $"Second host received {receivedEvents2} events where as first host receive {receivedEvents1} events.");
+ $"Second host received {totalEventsFromSecondHost} events where as first host receive {totalEventsFromFirstHost} events.");
+ }
+
+ ///
+ /// Checkpointing every message received should be Ok. No failures expected.
+ ///
+ ///
+ [Fact]
+ async Task CheckpointEveryMessageReceived()
+ {
+ var eventProcessorHost = new EventProcessorHost(
+ null,
+ PartitionReceiver.DefaultConsumerGroupName,
+ this.EventHubConnectionString,
+ this.StorageConnectionString,
+ this.LeaseContainerName);
+
+ var runResult = await RunGenericScenario(eventProcessorHost, totalNumberOfEventsToSend: 10,
+ checkpointLastEvent: false, checkpoingEveryEvent: true);
+
+ // Validate there were not failures.
+ Assert.True(runResult.NumberOfFailures == 0, $"RunResult returned with {runResult.NumberOfFailures} failures!");
}
async Task>> DiscoverEndOfStream()
@@ -737,11 +867,11 @@ async Task>> DiscoverEndOfStream()
return partitions.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
}
- async Task>> RunGenericScenario(EventProcessorHost eventProcessorHost,
- EventProcessorOptions epo = null, int totalNumberOfEventsToSend = 1, bool checkPointLastEvent = true,
- bool checkPointBatch = false)
+ async Task RunGenericScenario(EventProcessorHost eventProcessorHost,
+ EventProcessorOptions epo = null, int totalNumberOfEventsToSend = 1, bool checkpointLastEvent = true,
+ bool checkpointBatch = false, bool checkpoingEveryEvent = false)
{
- var receivedEvents = new ConcurrentDictionary>();
+ var runResult = new GenericScenarioResult();
var lastReceivedAt = DateTime.Now;
if (epo == null)
@@ -767,27 +897,32 @@ async Task>> RunGenericScenario(EventProcesso
string hostName = createArgs.Item1.Owner;
processor.OnOpen += (_, partitionContext) => Log($"{hostName} > Partition {partitionId} TestEventProcessor opened");
processor.OnClose += (_, closeArgs) => Log($"{hostName} > Partition {partitionId} TestEventProcessor closing: {closeArgs.Item2}");
- processor.OnProcessError += (_, errorArgs) => Log($"{hostName} > Partition {partitionId} TestEventProcessor process error {errorArgs.Item2.Message}");
+ processor.OnProcessError += (_, errorArgs) =>
+ {
+ Log($"{hostName} > Partition {partitionId} TestEventProcessor process error {errorArgs.Item2.Message}");
+ Interlocked.Increment(ref runResult.NumberOfFailures);
+ };
processor.OnProcessEvents += (_, eventsArgs) =>
{
int eventCount = eventsArgs.Item2.events != null ? eventsArgs.Item2.events.Count() : 0;
Log($"{hostName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)");
if (eventCount > 0)
{
- List events;
- receivedEvents.TryGetValue(partitionId, out events);
- if (events == null)
+ lastReceivedAt = DateTime.Now;
+ runResult.AddEvents(partitionId, eventsArgs.Item2.events);
+
+ foreach (var e in eventsArgs.Item2.events)
{
- events = new List();
+ // Checkpoint every event received?
+ if (checkpoingEveryEvent)
+ {
+ eventsArgs.Item1.CheckpointAsync(e).Wait();
+ }
}
-
- events.AddRange(eventsArgs.Item2.events);
- receivedEvents[partitionId] = events;
- lastReceivedAt = DateTime.Now;
}
- eventsArgs.Item2.checkPointLastEvent = checkPointLastEvent;
- eventsArgs.Item2.checkPointBatch = checkPointBatch;
+ eventsArgs.Item2.checkPointLastEvent = checkpointLastEvent;
+ eventsArgs.Item2.checkPointBatch = checkpointBatch;
};
};
@@ -814,10 +949,12 @@ async Task>> RunGenericScenario(EventProcesso
await Task.Delay(1000);
}
- Log("Verifying at least an event was received by each partition");
+ Log($"Verifying at least {totalNumberOfEventsToSend} event(s) was received by each partition");
foreach (var partitionId in PartitionIds)
{
- Assert.True(receivedEvents.ContainsKey(partitionId), $"Partition {partitionId} didn't receive any message!");
+ Assert.True(runResult.ReceivedEvents.ContainsKey(partitionId)
+ && runResult.ReceivedEvents[partitionId].Count >= totalNumberOfEventsToSend,
+ $"Partition {partitionId} didn't receive expected number of messages. Expected {totalNumberOfEventsToSend}, received {runResult.ReceivedEvents[partitionId].Count}.");
}
Log("Success");
@@ -828,10 +965,10 @@ async Task>> RunGenericScenario(EventProcesso
await eventProcessorHost.UnregisterEventProcessorAsync();
}
- return receivedEvents.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
+ return runResult;
}
- async Task SendToPartitionAsync(string partitionId, string messageBody, string connectionString)
+ protected async Task SendToPartitionAsync(string partitionId, string messageBody, string connectionString)
{
var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString);
try
@@ -853,5 +990,31 @@ protected void Log(string message)
Console.WriteLine(log);
}
}
+
+ class GenericScenarioResult
+ {
+ public ConcurrentDictionary> ReceivedEvents = new ConcurrentDictionary>();
+ public int NumberOfFailures = 0;
+
+ object listLock = new object();
+
+ public void AddEvents(string partitionId, IEnumerable addEvents)
+ {
+ List events;
+ this.ReceivedEvents.TryGetValue(partitionId, out events);
+ if (events == null)
+ {
+ events = new List();
+ }
+
+ // Account the case where 2 hosts racing by working on the same partition.
+ lock (listLock)
+ {
+ events.AddRange(addEvents);
+ }
+
+ this.ReceivedEvents[partitionId] = events;
+ }
+ }
}
diff --git a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/NegativeCases.cs b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/NegativeCases.cs
index 54deaa7..ea74511 100644
--- a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/NegativeCases.cs
+++ b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/NegativeCases.cs
@@ -4,6 +4,10 @@
namespace Microsoft.Azure.EventHubs.Processor.UnitTests
{
using System;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Text;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
@@ -76,5 +80,146 @@ async Task NonexsistentEntity()
Assert.NotNull(ex.InnerException);
Assert.IsType(ex.InnerException);
}
+
+ ///
+ /// While processing events one event causes a failure. Host should be able to recover any error.
+ ///
+ ///
+ [Fact]
+ async Task HostShouldRecoverWhenProcessEventsAsyncThrows()
+ {
+ var lastReceivedAt = DateTime.Now;
+ var lastReceivedAtLock = new object();
+ var poisonMessageReceived = false;
+ var poisonMessageProperty = "poison";
+ var processorFactory = new TestEventProcessorFactory();
+ var receivedEventCounts = new ConcurrentDictionary();
+
+ var eventProcessorHost = new EventProcessorHost(
+ null,
+ PartitionReceiver.DefaultConsumerGroupName,
+ this.EventHubConnectionString,
+ this.StorageConnectionString,
+ this.LeaseContainerName);
+
+ processorFactory.OnCreateProcessor += (f, createArgs) =>
+ {
+ var processor = createArgs.Item2;
+ string partitionId = createArgs.Item1.PartitionId;
+ string hostName = createArgs.Item1.Owner;
+ string consumerGroupName = createArgs.Item1.ConsumerGroupName;
+ processor.OnOpen += (_, partitionContext) => Log($"{hostName} > {consumerGroupName} > Partition {partitionId} TestEventProcessor opened");
+ processor.OnClose += (_, closeArgs) => Log($"{hostName} > {consumerGroupName} > Partition {partitionId} TestEventProcessor closing: {closeArgs.Item2}");
+ processor.OnProcessError += (_, errorArgs) =>
+ {
+ Log($"{hostName} > {consumerGroupName} > Partition {partitionId} TestEventProcessor process error {errorArgs.Item2.Message}");
+
+ // Throw once more here depending on where we are at exception sequence.
+ if (errorArgs.Item2.Message.Contains("ExceptionSequence1"))
+ {
+ throw new Exception("ExceptionSequence2");
+ }
+ };
+ processor.OnProcessEvents += (_, eventsArgs) =>
+ {
+ int eventCount = eventsArgs.Item2.events != null ? eventsArgs.Item2.events.Count() : 0;
+ Log($"{hostName} > {consumerGroupName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)");
+ if (eventCount > 0)
+ {
+ lock (lastReceivedAtLock)
+ {
+ lastReceivedAt = DateTime.Now;
+ }
+
+ foreach (var e in eventsArgs.Item2.events)
+ {
+ // If this is poisoned event then throw.
+ if (!poisonMessageReceived && e.Properties.ContainsKey(poisonMessageProperty))
+ {
+ poisonMessageReceived = true;
+ Log($"Received poisoned message from partition {partitionId}");
+ throw new Exception("ExceptionSequence1");
+ }
+
+ // Track received events so we can validate at the end.
+ if (!receivedEventCounts.ContainsKey(partitionId))
+ {
+ receivedEventCounts[partitionId] = 0;
+ }
+
+ receivedEventCounts[partitionId]++;
+ }
+ }
+ };
+ };
+
+ try
+ {
+ Log("Registering processorFactory...");
+ var epo = new EventProcessorOptions()
+ {
+ MaxBatchSize = 100
+ };
+ await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, epo);
+
+ Log("Waiting for partition ownership to settle...");
+ await Task.Delay(TimeSpan.FromSeconds(5));
+
+ // Send first set of messages.
+ Log("Sending an event to each partition as the first set of messages.");
+ var sendTasks = new List();
+ foreach (var partitionId in PartitionIds)
+ {
+ sendTasks.Add(this.SendToPartitionAsync(partitionId, $"{partitionId} event.", this.ConnectionStringBuilder.ToString()));
+ }
+ await Task.WhenAll(sendTasks);
+
+ // Now send 1 poisoned message. This will fail one of the partition pumps.
+ Log($"Sending a poison event to partition {PartitionIds.First()}");
+ var client = EventHubClient.CreateFromConnectionString(this.EventHubConnectionString);
+ var pSender = client.CreatePartitionSender(PartitionIds.First());
+ var ed = new EventData(Encoding.UTF8.GetBytes("This is poison message"));
+ ed.Properties[poisonMessageProperty] = true;
+ await pSender.SendAsync(ed);
+
+ // Wait sometime. The host should fail and then recever during this time.
+ await Task.Delay(30000);
+
+ // Send second set of messages.
+ Log("Sending an event to each partition as the second set of messages.");
+ sendTasks.Clear();
+ foreach (var partitionId in PartitionIds)
+ {
+ sendTasks.Add(this.SendToPartitionAsync(partitionId, $"{partitionId} event.", this.ConnectionStringBuilder.ToString()));
+ }
+ await Task.WhenAll(sendTasks);
+
+ Log("Waiting until hosts are idle, i.e. no more messages to receive.");
+ while (lastReceivedAt > DateTime.Now.AddSeconds(-60))
+ {
+ await Task.Delay(1000);
+ }
+
+ Log("Verifying poison message was received");
+ Assert.True(poisonMessageReceived, "Didn't receive poison message!");
+
+ Log("Verifying received events by each partition");
+ foreach (var partitionId in PartitionIds)
+ {
+ if (!receivedEventCounts.ContainsKey(partitionId))
+ {
+ throw new Exception($"Partition {partitionId} didn't receive any messages!");
+ }
+
+ var receivedEventCount = receivedEventCounts[partitionId];
+ Assert.True(receivedEventCount >= 2, $"Partition {partitionId} received {receivedEventCount} where as at least 2 expected!");
+ }
+ }
+ finally
+ {
+ Log("Calling UnregisterEventProcessorAsync.");
+ await eventProcessorHost.UnregisterEventProcessorAsync();
+ }
+ }
}
}
diff --git a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/TestEventProcessor.cs b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/TestEventProcessor.cs
index 72ea6df..d019951 100644
--- a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/TestEventProcessor.cs
+++ b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/TestEventProcessor.cs
@@ -48,11 +48,12 @@ Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable Log($"TestPartitionReceiveHandler.ProcessError {e.GetType().Name}: {e.Message}");
+
+ // Not expecting any errors.
+ handler.ErrorReceived += (s, e) =>
+ {
+ throw new Exception($"TestPartitionReceiveHandler.ProcessError {e.GetType().Name}: {e.Message}");
+ };
+
handler.EventsReceived += (s, eventDatas) =>
{
int count = eventDatas != null ? eventDatas.Count() : 0;
@@ -497,6 +570,10 @@ async Task PartitionReceiverSetReceiveHandler()
}
finally
{
+ // Unregister handler.
+ partitionReceiver.SetReceiveHandler(null);
+
+ // Close clients.
await partitionSender.CloseAsync();
await partitionReceiver.CloseAsync();
}
@@ -681,6 +758,50 @@ async Task ReceiveTimeout()
}
}
+ ///
+ /// Small receive timeout should not throw System.TimeoutException.
+ /// TimeoutException should be returned as NULL to the awaiting client.
+ ///
+ ///
+ [Fact]
+ async Task SmallReceiveTimeout()
+ {
+ var maxClients = 5;
+
+ // Issue receives with 1 second so that some of the Receive calls will timeout while creating AMQP link.
+ // Even those Receive calls should return NULL instead of bubbling the exception up.
+ var receiveTimeoutInSeconds = 1;
+
+ var tasks = Enumerable.Range(0, maxClients)
+ .Select(async i =>
+ {
+ PartitionReceiver receiver = null;
+
+ try
+ {
+ Log($"Testing with {receiveTimeoutInSeconds} seconds on client {i}.");
+
+ // Start receiving from a future time so that Receive call won't be able to fetch any events.
+ var ehClient = EventHubClient.CreateFromConnectionString(this.EventHubsConnectionString);
+ receiver = ehClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, "0", DateTime.UtcNow.AddMinutes(1));
+ var ed = await receiver.ReceiveAsync(1, TimeSpan.FromSeconds(receiveTimeoutInSeconds));
+ if (ed == null)
+ {
+ Log($"Received NULL from client {i}");
+ }
+ }
+ finally
+ {
+ if (receiver != null)
+ {
+ await receiver.CloseAsync();
+ }
+ }
+ });
+
+ await Task.WhenAll(tasks);
+ }
+
[Fact]
async Task PartitionKeyValidation()
{
diff --git a/test/Microsoft.Azure.EventHubs.UnitTests/project.json b/test/Microsoft.Azure.EventHubs.UnitTests/project.json
index e9d5785..6d9b8e8 100644
--- a/test/Microsoft.Azure.EventHubs.UnitTests/project.json
+++ b/test/Microsoft.Azure.EventHubs.UnitTests/project.json
@@ -1,5 +1,5 @@
{
- "version": "1.0.0",
+ "version": "1.0.1",
"testRunner": "xunit",
"buildOptions": {