Cannot connect to Azure Event Hubs Emulator from C# .NET application

I’m trying to connect from a C# .NET application to the Azure Event Hubs Emulator, using a real Azure Blob Storage account for now (I plan to switch to Azurite later). I have the following docker-compose service:

  eventhubs:
    image: mcr.microsoft.com/azure-messaging/eventhubs-emulator 
    container_name: eventhubs
    ports:
      - "5672:5672"
      - "9091:9091" 
      - "10000-10009:10000-10009" 
    environment:
      - EVENT_HUB_NAMESPACE=my-namespace
      - EVENT_HUB_NAME=MyEventHub
      - ACCEPT_EULA=Y
      - AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=mystoragename;AccountKey=${BlobPassword};EndpointSuffix=core.windows.net
    networks:
      - mynetwork

Where ${BlobPassword} is defined in environment variables.

In my C# .NET application I have:

            var clientOptions = new EventProcessorClientOptions
            {
                ConnectionOptions = new EventHubConnectionOptions
                {
                    TransportType = EventHubsTransportType.AmqpTcp
                }
            };

            EventProcessorClient processor = new EventProcessorClient(
                storageClient,
                EventHubConsumerClient.DefaultConsumerGroupName,
                eventHubConnectionString,
                eventHubName,
                clientOptions);

And my eventHubConnectionString is as follows:

"NamespaceConnectionString": "Endpoint=sb://localhost:5672/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=here_some_random_key;UseDevelopmentEmulator=true;",

But I receive:

[17:02:49 ERR] Error processing event: The AMQP transport failed to open because the inner transport tcp6 is closed.
[17:02:49 ERR] Partition:
[17:02:49 ERR] Operation: Retrieving list of partition identifiers from a Consumer Client.
Unhandled exception. System.AggregateException: One or more errors occurred. (The AMQP transport failed to open because the inner transport tcp8 is closed.)
System.IO.IOException: The AMQP transport failed to open because the inner transport tcp8 is closed.
at Microsoft.Azure.Amqp.ExceptionDispatcher.Throw(Exception exception)
at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
at Microsoft.Azure.Amqp.Transport.AmqpTransportInitiator.ConnectAsyncResult.End(IAsyncResult result)
at Microsoft.Azure.Amqp.Transport.AmqpTransportInitiator.<>c.b__17_1(IAsyncResult r)
at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location ---
at Azure.Messaging.EventHubs.Amqp.AmqpConnectionScope.CreateAndOpenConnectionAsync(Version amqpVersion, Uri serviceEndpoint, Uri connectionEndpoint, EventHubsTransportType transportType, IWebProxy proxy, Int32 sendBufferSizeBytes, Int32 receiveBufferSizeBytes, RemoteCertificateValidationCallback certificateValidationCallback, String scopeIdentifier, TimeSpan timeout)
at Microsoft.Azure.Amqp.FaultTolerantAmqpObject`1.OnCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.Amqp.AmqpConnectionScope.OpenManagementLinkAsync(TimeSpan operationTimeout, TimeSpan linkTimeout, CancellationToken cancellationToken)
at Microsoft.Azure.Amqp.FaultTolerantAmqpObject`1.OnCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.Amqp.AmqpClient.GetPropertiesAsync(EventHubsRetryPolicy retryPolicy, CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.Amqp.AmqpClient.GetPropertiesAsync(EventHubsRetryPolicy retryPolicy, CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.EventHubConnection.GetPropertiesAsync(EventHubsRetryPolicy retryPolicy, CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.Primitives.EventProcessor`1.ValidateEventHubsConnectionAsync(CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.Primitives.EventProcessor`1.ValidateEventHubsConnectionAsync(CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.Primitives.EventProcessor`1.ValidateProcessingPreconditions(CancellationToken cancellationToken)
--- End of inner exception stack trace ---
at Azure.Messaging.EventHubs.Primitives.EventProcessor`1.StartProcessingInternalAsync(Boolean async, CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.Primitives.EventProcessor`1.StartProcessingAsync(CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.EventProcessorClient.StartProcessingInternalAsync(Boolean async, CancellationToken cancellationToken)
at Azure.Messaging.EventHubs.EventProcessorClient.StartProcessingAsync(CancellationToken cancellationToken)

In the output of “docker logs eventhubs” I can see:

fail: a.a.aaW[0]
Emulator Start up probe Unsuccessful. MetadataStore Health status: Unhealthy BlobStore Health status: Unhealthy
fail: Microsoft.Extensions.Diagnostics.HealthChecks.DefaultHealthCheckService[103]
Health check Emulator Health Check with status Unhealthy completed after 40208.6284ms with message ‘Emulator Start up probe Unsuccessful. MetadataStore Health status: Unhealthy BlobStore Health status: Unhealthy’
Retry 1 encountered an exception: Emulator Health Check failed.. Waiting 00:00:00 before next retry.

I have tried Wireshark and I can see this (flagged in red by Wireshark):

Transmission Control Protocol, Src Port: 5672, Dst Port: 65002, Seq: 1, Ack: 1, Len: 0
Null/Loopback
Internet Protocol Version 4, Src: 127.0.0.1, Dst: 127.0.0.1
Transmission Control Protocol, Src Port: 5672, Dst Port: 65002, Seq: 1, Ack: 1, Len: 0

Which suggests that this exact packet has no data.

I was expecting to connect to Azure Event Hubs Emulator from C# .NET application.

What can I try to make sure that I can connect to Azure Event Hubs Emulator from my C# .NET application?

Thank you!

The main cause of the error is that the Azure Event Hubs Emulator cannot connect to Azure Blob Storage with the provided connection string. This leaves the emulator’s MetadataStore and BlobStore components in an unhealthy state, as the health check in your docker logs shows.

In the compose file below I have configured the environment variables that point the emulator to the storage account (Azurite in this case).

I referred to this MSDOC to use the Azurite emulator for Azure Storage development in the environment.

Test locally by using the Azure Event Hubs emulator

Docker Compose:

version: '3.8'
services:
  emulator:
    image: "mcr.microsoft.com/azure-messaging/eventhubs-emulator:latest"
    container_name: "eventhubs-emulator"
    ports:
      - "5672:5672"  # AMQP port for the Event Hubs Emulator
      - "9091:9091"  # Management port for the Event Hubs Emulator
    environment:
      ACCEPT_EULA: "Y"
      BLOB_SERVER: azurite
      METADATA_SERVER: azurite
      AZURE_STORAGE_CONNECTION_STRING: "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFe6kVZk3+0h1t9SQGphf65SlPfuFU=;BlobEndpoint=http://azurite:10000/devstoreaccount1;QueueEndpoint=http://azurite:10001/devstoreaccount1;TableEndpoint=http://azurite:10002/devstoreaccount1;"
    depends_on:
      - azurite
    networks:
      - eh-emulator

  azurite:
    image: "mcr.microsoft.com/azure-storage/azurite:latest"
    container_name: "azurite"
    ports:
      - "10000:10000"  # Blob service
      - "10001:10001"  # Queue service
      - "10002:10002"  # Table service
    command: "azurite --blobHost 0.0.0.0 --queueHost 0.0.0.0 --tableHost 0.0.0.0"  # start Blob, Queue and Table, matching the published ports
    networks:
      - eh-emulator

networks:
  eh-emulator:
    driver: bridge
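
Note that `depends_on` only orders container start-up; it does not wait for Azurite or the emulator to become healthy, and the logs in the question show the emulator's start-up health check taking a while. A minimal sketch of a retry loop the application could run before starting the processor is shown here. `EmulatorReadiness` and `WaitUntilReadyAsync` are hypothetical names, not part of the Azure SDK, and the probe is whatever cheap call you choose to pass in.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the Azure SDK): runs a probe repeatedly
// until it completes without throwing, or until the attempts run out.
public static class EmulatorReadiness
{
    public static async Task<bool> WaitUntilReadyAsync(
        Func<CancellationToken, Task> probe,
        int maxAttempts,
        TimeSpan delayBetweenAttempts,
        CancellationToken cancellationToken = default)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                // The probe succeeds if it returns without throwing.
                await probe(cancellationToken);
                return true;
            }
            catch (Exception ex) when (!(ex is OperationCanceledException))
            {
                Console.WriteLine($"Probe attempt {attempt}/{maxAttempts} failed: {ex.Message}");
            }

            // Pause before the next attempt, but not after the last one.
            if (attempt < maxAttempts)
            {
                await Task.Delay(delayBetweenAttempts, cancellationToken);
            }
        }

        return false;
    }
}
```

One possible probe, assuming an `EventHubProducerClient` named `producerClient`, is `ct => producerClient.GetEventHubPropertiesAsync(ct)`, since it forces the client to open a connection. If the helper returns false, check `docker logs` for the emulator before debugging the application.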

  • Update the .csproj file to specify compatible versions.
<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net6.0</TargetFramework>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Azure.Messaging.EventHubs" Version="5.12.0-beta.1" />
    <PackageReference Include="Azure.Messaging.EventHubs.Processor" Version="5.12.0-beta.1" />
  </ItemGroup>
</Project>

Copy the below emulator connection string.
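
For reference, the emulator connection string has the same shape as the one in the question: a host, the `RootManageSharedAccessKey` key name, a placeholder key, and `UseDevelopmentEmulator=true`. The sketch below only assembles that string; `EmulatorConnectionString.Build` is a hypothetical helper, not an SDK API, and the host and key values passed to it are assumptions you should adjust to your setup.

```csharp
using System;

// Hypothetical helper (not part of the Azure SDK): assembles a connection
// string for the emulator in the same shape as the one shown in the question.
public static class EmulatorConnectionString
{
    public static string Build(string host, string sasKeyValue)
    {
        if (string.IsNullOrWhiteSpace(host))
        {
            throw new ArgumentException("Host is required.", nameof(host));
        }

        if (string.IsNullOrWhiteSpace(sasKeyValue))
        {
            throw new ArgumentException("A key value is required.", nameof(sasKeyValue));
        }

        // UseDevelopmentEmulator=true marks the endpoint as the local emulator
        // rather than an Azure namespace.
        return $"Endpoint=sb://{host};SharedAccessKeyName=RootManageSharedAccessKey;" +
               $"SharedAccessKey={sasKeyValue};UseDevelopmentEmulator=true;";
    }
}
```

For example, `EmulatorConnectionString.Build("localhost", "SAS_KEY_VALUE")` produces a string that can be passed to the client constructors in the program below when the application runs on the Docker host.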

Here is the code, which works for me. I referred to the code from Git.

using Azure.Messaging.EventHubs.Producer;
using Azure.Messaging.EventHubs;
using System.Text;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using System;
using System.Threading.Tasks;

internal class Program
{
    private static string checkpointBlobContainer = Guid.NewGuid().ToString();

    // Ensure that this connection string is correct, and replace <YourSASKeyValue> with the actual SAS key.
    private static string eventHubNamespaceConnectionString = "Endpoint=sb://.t/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=obib9i9xxxxxxxxx";
    private static string eventHubName = "testevehub";

    public static async Task Main(string[] args)
    {
        try
        {
            // Sends a batch of events to the event hub
            await Send();

            // Receives events from the event hub
            await Receive();

            // Receives the same events from the event hub using an event processor
            await CreateCheckpointBlobForTests(checkpointBlobContainer);
            await ReceiveWithEventProcessor();
        }
        catch (Exception ex)
        {
            Console.WriteLine($"An error occurred: {ex.Message}");
        }

        Console.ReadKey();
    }

    private static async Task Send()
    {
        int numOfEvents = 2;

        // Initialize the Event Hub Producer Client
        EventHubProducerClient producerClient = new EventHubProducerClient(eventHubNamespaceConnectionString, eventHubName);

        using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();

        for (int i = 1; i <= numOfEvents; i++)
        {
            if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes($"Event {i}"))))
            {
                throw new Exception($"Event {i} is too large for the batch and cannot be sent.");
            }
        }

        try
        {
            await producerClient.SendAsync(eventBatch);
            Console.WriteLine($"A batch of {numOfEvents} events has been published.");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Failed to send events: {ex.Message}");
        }
        finally
        {
            await producerClient.DisposeAsync();
        }
    }

    private static async Task Receive()
    {
        try
        {
            // Dispose the consumer when the method exits so the AMQP connection is closed
            await using var consumer = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, eventHubNamespaceConnectionString, eventHubName);

            await foreach (PartitionEvent partitionEvent in consumer.ReadEventsAsync(new ReadEventOptions { MaximumWaitTime = TimeSpan.FromSeconds(2) }))
            {
                if (partitionEvent.Data != null)
                {
                    string messageBody = Encoding.UTF8.GetString(partitionEvent.Data.Body.ToArray());
                    Console.WriteLine($"Message received : '{messageBody}'");
                }
                else
                {
                    break;
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Failed to receive events: {ex.Message}");
        }
    }

    private static async Task ReceiveWithEventProcessor()
    {
        try
        {
            // Blob storage connection string points to Azurite emulator for local testing
            BlobContainerClient blobContainerClient = new BlobContainerClient("UseDevelopmentStorage=true", checkpointBlobContainer);

            // Create an event processor client to process events in the event hub
            var processor = new EventProcessorClient(blobContainerClient, EventHubConsumerClient.DefaultConsumerGroupName, eventHubNamespaceConnectionString, eventHubName);

            // Register handlers for processing events and handling errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;

            // Start the processing
            await processor.StartProcessingAsync();

            // Wait for 10 seconds for the events to be processed
            await Task.Delay(TimeSpan.FromSeconds(10));

            // Stop the processing
            await processor.StopProcessingAsync();
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error in event processor: {ex.Message}");
        }
    }

    private static Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        try
        {
            Console.WriteLine("Received event using Event Processor: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error processing event: {ex.Message}");
        }
        return Task.CompletedTask;
    }

    private static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine($"Error message: {eventArgs.Exception.Message}");
        return Task.CompletedTask;
    }

    private static async Task CreateCheckpointBlobForTests(string blobContainerName)
    {
        try
        {
            BlobServiceClient client = new BlobServiceClient("UseDevelopmentStorage=true");
            await client.CreateBlobContainerAsync(blobContainerName);
            Console.WriteLine($"Created checkpoint blob container: {blobContainerName}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error creating checkpoint blob: {ex.Message}");
        }
    }
}

Now I am able to connect to the event hub from the application.

Containers:

I am able to receive the sent messages.
