I’m trying to connect from C# .NET application to Azure Event Hubs Emulator, using real Azure Blob on Azure (before I also switch to Azurite). I have the following docker-compose:
eventhubs:
image: mcr.microsoft.com/azure-messaging/eventhubs-emulator
container_name: eventhubs
ports:
- "5672:5672"
- "9091:9091"
- "10000-10009:10000-10009"
environment:
- EVENT_HUB_NAMESPACE=my-namespace
- EVENT_HUB_NAME=MyEventHub
- ACCEPT_EULA=Y
- AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=mystoragename;AccountKey=${BlobPassword};EndpointSuffix=core.windows.net
networks:
- mynetwork
Where ${BlobPassword} is defined in environment variables.
In my C# .NET application I have:
var clientOptions = new EventProcessorClientOptions
{
ConnectionOptions = new EventHubConnectionOptions
{
TransportType = EventHubsTransportType.AmqpTcp
}
};
EventProcessorClient processor = new EventProcessorClient(
storageClient,
EventHubConsumerClient.DefaultConsumerGroupName,
eventHubConnectionString,
eventHubName,
clientOptions);
And my eventHubConnectionString is as follows:
"NamespaceConnectionString": "Endpoint=sb://localhost:5672/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=here_some_random_key;UseDevelopmentEmulator=true;",
But I receive:
[17:02:49 ERR] Error processing event: The AMQP transport failed to open because the inner transport tcp6 is closed.
[17:02:49 ERR] Partition:
[17:02:49 ERR] Operation: Retrieving list of partition identifiers from a Consumer Client.
Unhandled exception. System.AggregateException: One or more errors occurred. (The AMQP transport failed to open because the inner transport tcp8 is closed.)
System.IO.IOException: The AMQP transport failed to open because the inner transport tcp8 is closed.
at Microsoft.Azure.Amqp.ExceptionDispatcher.Throw(Exception exception)
at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
at Microsoft.Azure.Amqp.Transport.AmqpTransportInitiator.ConnectAsyncResult.End(IAsyncResult result)
at Microsoft.Azure.Amqp.Transport.AmqpTransportInitiator.<>c.b__17_1(IAsyncResult r)
at System.Threading.Tasks.TaskFactory1.FromAsyncCoreLogic(IAsyncResult iar, Func
2 endFunction, Action1 endAction, Task
1 promise, Boolean requiresSynchronization)
End of stack trace from previous location —
at Azure.Messaging.EventHubs.Amqp.AmqpConnectionScope.CreateAndOpenConnectionAsync(Version amqpVersion, Uri serviceEndpoint, Uri connectionEndpoint, EventHubsTransportType transportType, IWebProxy proxy, Int32 sendBufferSizeBytes, Int32 receiveBufferSizeBytes, RemoteCertificateValidationCallback certificateValidationCallback, String scopeIdentifier, TimeSpan timeout)
at Microsoft.Azure.Amqp.FaultTolerantAmqpObject1.OnCreateAsync(TimeSpan timeout, CancellationToken cancellationToken) at Microsoft.Azure.Amqp.Singleton
1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
at Microsoft.Azure.Amqp.Singleton1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken) at Azure.Messaging.EventHubs.Amqp.AmqpConnectionScope.OpenManagementLinkAsync(TimeSpan operationTimeout, TimeSpan linkTimeout, CancellationToken cancellationToken) at Microsoft.Azure.Amqp.FaultTolerantAmqpObject
1.OnCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
at Microsoft.Azure.Amqp.Singleton1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken) at Microsoft.Azure.Amqp.Singleton
1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.Amqp.AmqpClient.GetPropertiesAsync(EventHubsRetryPolicy retryPolicy, CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.Amqp.AmqpClient.GetPropertiesAsync(EventHubsRetryPolicy retryPolicy, CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.EventHubConnection.GetPropertiesAsync(EventHubsRetryPolicy retryPolicy, CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.Primitives.EventProcessor1.ValidateEventHubsConnectionAsync(CancellationToken cancellationToken) at Azure.Messaging.EventHubs.Primitives.EventProcessor
1.ValidateEventHubsConnectionAsync(CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.Primitives.EventProcessor1.ValidateProcessingPreconditions(CancellationToken cancellationToken) End of inner exception stack trace --- at Azure.Messaging.EventHubs.Primitives.EventProcessor
1.StartProcessingInternalAsync(Boolean async, CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.Primitives.EventProcessor`1.StartProcessingAsync(CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.EventProcessorClient.StartProcessingInternalAsync(Boolean async, CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.EventProcessorClient.StartProcessingAsync(CancellationToken cancellationToken)
In the “docker logs eventhub” I can see:
fail: a.a.aaW[0]
Emulator Start up probe Unsuccessful. MetadataStore Health status: Unhealthy BlobStore Health status: Unhealthy
fail: Microsoft.Extensions.Diagnostics.HealthChecks.DefaultHealthCheckService[103]
Health check Emulator Health Check with status Unhealthy completed after 40208.6284ms with message ‘Emulator Start up probe Unsuccessful. MetadataStore Health status: Unhealthy BlobStore Health status: Unhealthy’
Retry 1 encountered an exception: Emulator Health Check failed.. Waiting 00:00:00 before next retry.
I have tried WireShark and I can see this (indicated in red by WireShark):
Transmission Control Protocol, Src Port: 5672, Dst Port: 65002, Seq: 1, Ack: 1, Len: 0
Null/Loopback
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 5672, Dst Port: 65002, Seq: 1, Ack: 1, Len: 0
Which suggests that this exact packet has no data.
I was expecting to connect to Azure Event Hubs Emulator from C# .NET application.
What can I try to make sure that I can connect to Azure Event Hubs Emulator from my C# .NET application?
Thank you!
The main cause of the error is the Azure Event Hubs Emulator is unable to connect to Azure Blob Storage using the provided connection string, This leads to the emulator’s components (MetadataStore and BlobStore) being in an unhealthy state.
In the below compose file I have configured with the necessary environment variables that points to the Azure Blob Storage account.
I refered to this MSDOC to Use Azurite emulator for Azure Storage development in env.
Test locally by using the Azure Event Hubs emulator
Docker Compose:
version: '3.8'
services:
emulator:
image: "mcr.microsoft.com/azure-messaging/eventhubs-emulator:latest"
container_name: "eventhubs-emulator"
ports:
- "5672:5672" # AMQP port for the Event Hubs Emulator
- "9091:9091" # Management port for the Event Hubs Emulator
environment:
ACCEPT_EULA: "Y"
BLOB_SERVER: azurite
METADATA_SERVER: azurite
AZURE_STORAGE_CONNECTION_STRING: "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFe6kVZk3+0h1t9SQGphf65SlPfuFU=;BlobEndpoint=http://azurite:10000/devstoreaccount1;QueueEndpoint=http://azurite:10001/devstoreaccount1;TableEndpoint=http://azurite:10002/devstoreaccount1;"
depends_on:
- azurite
networks:
- eh-emulator
azurite:
image: "mcr.microsoft.com/azure-storage/azurite:latest"
container_name: "azurite"
ports:
- "10000:10000" # Blob service
- "10001:10001" # Queue service
- "10002:10002" # Table service
command: "azurite-blob --blobHost 0.0.0.0 --blobPort 10000"
networks:
- eh-emulator
networks:
eh-emulator:
driver: bridge
- Update the
.csproj
file to specify compatible versions.
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.12.0-beta.1" />
<PackageReference Include="Azure.Messaging.EventHubs.Processor" Version="5.12.0-beta.1" />
</ItemGroup>
</Project>
Copy the below emulator connection string.
Here is the code which work perfectly. Refered the code from Git.
using Azure.Messaging.EventHubs.Producer;
using Azure.Messaging.EventHubs;
using System.Text;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using System;
internal class Program
{
private static string checkpointBlobContainer = Guid.NewGuid().ToString();
// Ensure that this connection string is correct, and replace <YourSASKeyValue> with the actual SAS key.
private static string eventHubNamespaceConnectionString = "Endpoint=sb://.t/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=obib9i9xxxxxxxxx";
private static string eventHubName = "testevehub";
public static async Task Main(string[] args)
{
try
{
// Sends a batch of events to the event hub
await Send();
// Receives events from the event hub
await Receive();
// Receives the same events from the event hub using an event processor
await CreateCheckpointBlobForTests(checkpointBlobContainer);
await ReceiveWithEventProcessor();
}
catch (Exception ex)
{
Console.WriteLine($"An error occurred: {ex.Message}");
}
Console.ReadKey();
}
private static async Task Send()
{
int numOfEvents = 2;
// Initialize the Event Hub Producer Client
EventHubProducerClient producerClient = new EventHubProducerClient(eventHubNamespaceConnectionString, eventHubName);
using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
for (int i = 1; i <= numOfEvents; i++)
{
if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes($"Event {i}"))))
{
throw new Exception($"Event {i} is too large for the batch and cannot be sent.");
}
}
try
{
await producerClient.SendAsync(eventBatch);
Console.WriteLine($"A batch of {numOfEvents} events has been published.");
}
catch (Exception ex)
{
Console.WriteLine($"Failed to send events: {ex.Message}");
}
finally
{
await producerClient.DisposeAsync();
}
}
private static async Task Receive()
{
try
{
var consumer = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, eventHubNamespaceConnectionString, eventHubName);
await foreach (PartitionEvent partitionEvent in consumer.ReadEventsAsync(new ReadEventOptions { MaximumWaitTime = TimeSpan.FromSeconds(2) }))
{
if (partitionEvent.Data != null)
{
string messageBody = Encoding.UTF8.GetString(partitionEvent.Data.Body.ToArray());
Console.WriteLine($"Message received : '{messageBody}'");
}
else
{
break;
}
}
}
catch (Exception ex)
{
Console.WriteLine($"Failed to receive events: {ex.Message}");
}
}
private static async Task ReceiveWithEventProcessor()
{
try
{
// Blob storage connection string points to Azurite emulator for local testing
BlobContainerClient blobContainerClient = new BlobContainerClient("UseDevelopmentStorage=true", checkpointBlobContainer);
// Create an event processor client to process events in the event hub
var processor = new EventProcessorClient(blobContainerClient, EventHubConsumerClient.DefaultConsumerGroupName, eventHubNamespaceConnectionString, eventHubName);
// Register handlers for processing events and handling errors
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
// Start the processing
await processor.StartProcessingAsync();
// Wait for 10 seconds for the events to be processed
await Task.Delay(TimeSpan.FromSeconds(10));
// Stop the processing
await processor.StopProcessingAsync();
}
catch (Exception ex)
{
Console.WriteLine($"Error in event processor: {ex.Message}");
}
}
private static Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
try
{
Console.WriteLine("Received event using Event Processor: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
}
catch (Exception ex)
{
Console.WriteLine($"Error processing event: {ex.Message}");
}
return Task.CompletedTask;
}
private static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
{
Console.WriteLine($"tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
Console.WriteLine($"Error message: {eventArgs.Exception.Message}");
return Task.CompletedTask;
}
private static async Task CreateCheckpointBlobForTests(string blobContainerName)
{
try
{
BlobServiceClient client = new BlobServiceClient("UseDevelopmentStorage=true");
await client.CreateBlobContainerAsync(blobContainerName);
Console.WriteLine($"Created checkpoint blob container: {blobContainerName}");
}
catch (Exception ex)
{
Console.WriteLine($"Error creating checkpoint blob: {ex.Message}");
}
}
}
Now Iam able to connect with event hub with the application.
Containers:
Able to recieve the sent messages.