I am trying to push a message of approximately 500 KB into a Service Bus topic, but I am only able to push messages of up to 64 KB. According to the Microsoft documentation, messages of this size should be supported on the premium tier.
Am I missing something here?
Below is my code for pushing a message into the Service Bus topic:
<code> /// <summary>
/// Publishes the event's message to the Service Bus topic configured in
/// <c>serviceBusConfigOption.TopicEventTypes</c>, logging before and after the send.
/// </summary>
/// <param name="event">
/// Event to publish. Its <c>EventMessage</c> is the payload, <c>EventMessage.RequestID</c> is passed as the
/// message id, and <c>EventType</c> is passed as the filter property value.
/// </param>
/// <returns>A task that completes when the publish call has finished.</returns>
public async Task PublishMessageIntoEventTypeAsync(EventReceiver @event)
{
    var topicName = serviceBusConfigOption.TopicEventTypes;
    var eventType = @event.EventType;
    var requestId = @event.EventMessage.RequestID;

    // Use constant message templates with named placeholders instead of interpolated strings,
    // so the template stays static and the values are available to structured log sinks.
    logger.LogInformation(
        "Start publishing notification event {EventType} with requestID: {RequestID} into service bus topic: {TopicName}",
        eventType,
        requestId,
        topicName);

    var serviceBusClient = messageBusFactory.GetClient(topicName);

    // nameof(...) yields the literal "EventType", which is used as the filter property name.
    await serviceBusClient.PublishMessageAsync(@event.EventMessage, requestId.ToString(), nameof(@event.EventType), eventType);

    logger.LogInformation(
        "Successfully published notification event {EventType} with requestID: {RequestID} into service bus topic: {TopicName}",
        eventType,
        requestId,
        topicName);
}
/// <summary>
/// Factory that returns <see cref="IMessageBus"/> wrappers around cached
/// <see cref="ServiceBusSender"/> instances. Senders are cached per connection string and
/// sender name; clients are cached per connection string.
/// </summary>
public class AzureServiceBusFactory : IMessageBusFactory
{
    #region private fields
    private readonly object _lockObject = new object();
    private readonly ConcurrentDictionary<string, ServiceBusClient> _clients = new ConcurrentDictionary<string, ServiceBusClient>();
    private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new ConcurrentDictionary<string, ServiceBusSender>();
    private readonly ServiceBusConfigOption serviceBusConfigOption;
    #endregion

    #region ctors
    /// <summary>
    /// Creates the factory from the configured Service Bus options.
    /// </summary>
    /// <param name="serviceBusConfigOption">Options wrapper whose <c>Value</c> supplies the connection string.</param>
    public AzureServiceBusFactory(IOptions<ServiceBusConfigOption> serviceBusConfigOption)
    {
        this.serviceBusConfigOption = serviceBusConfigOption.Value;
    }
    #endregion

    #region public methods
    /// <summary>
    /// Returns a message bus backed by an open sender for the given entity, creating
    /// (or re-creating) the sender when none is cached or the cached one is closed.
    /// </summary>
    /// <param name="senderName">Name passed to <c>ServiceBusClient.CreateSender</c>.</param>
    /// <returns>A new <see cref="IMessageBus"/> wrapper around the cached sender.</returns>
    public IMessageBus GetClient(string senderName)
    {
        var connectionString = serviceBusConfigOption.ConnectionString;
        var key = $"{connectionString}-{senderName}";

        // Fast path without the lock: a single TryGetValue avoids reading the dictionary
        // several times and observing different senders between reads.
        if (this._senders.TryGetValue(key, out var cachedSender) && !cachedSender.IsClosed)
        {
            return AzureServiceBus.Create(cachedSender);
        }

        var client = this.GetServiceBusClient(connectionString);
        lock (this._lockObject)
        {
            if (this._senders.TryGetValue(key, out var existingSender))
            {
                // Another caller may have created an open sender while we waited for the lock;
                // reuse it instead of overwriting (and leaking) it.
                if (!existingSender.IsClosed)
                {
                    return AzureServiceBus.Create(existingSender);
                }

                // The cached sender is closed: release it and fall through to create a
                // replacement rather than handing the closed sender back to the caller.
                existingSender.DisposeAsync().GetAwaiter().GetResult();
            }

            var sender = client.CreateSender(senderName);
            this._senders[key] = sender;
            return AzureServiceBus.Create(sender);
        }
    }
    #endregion

    #region protected methods
    /// <summary>
    /// Returns the cached <see cref="ServiceBusClient"/> for the connection string, creating a new
    /// one (AMQP over TCP transport) under the lock when none is cached or the cached one is closed.
    /// </summary>
    /// <param name="connectionString">Service Bus connection string; also used as the cache key.</param>
    /// <returns>The cached or newly created client.</returns>
    protected virtual ServiceBusClient GetServiceBusClient(string connectionString)
    {
        lock (this._lockObject)
        {
            if (this.ClientDoesnotExistOrIsClosed(connectionString))
            {
                this._clients[connectionString] = new ServiceBusClient(connectionString, new ServiceBusClientOptions
                {
                    TransportType = ServiceBusTransportType.AmqpTcp
                });
            }

            return this._clients[connectionString];
        }
    }
    #endregion

    #region private methods
    /// <summary>
    /// Tells whether a client must be created for the connection string.
    /// </summary>
    /// <param name="connectionString">Cache key to look up.</param>
    /// <returns><c>true</c> when no client is cached for the key or the cached client is closed.</returns>
    private bool ClientDoesnotExistOrIsClosed(string connectionString)
    {
        return !this._clients.TryGetValue(connectionString, out var client) || client.IsClosed;
    }
    #endregion
}
/// <summary>
/// <see cref="IMessageBus"/> implementation that sends JSON messages through a
/// <see cref="ServiceBusSender"/>.
/// </summary>
internal class AzureServiceBus : IMessageBus
{
    #region private fields
    private const string ContentType = "application/json";
    private readonly ServiceBusSender _serviceBusSender;
    #endregion

    #region ctors
    /// <summary>
    /// Wraps the given sender.
    /// </summary>
    /// <param name="serviceBusSender">Sender used for every publish call.</param>
    internal AzureServiceBus(ServiceBusSender serviceBusSender)
    {
        this._serviceBusSender = serviceBusSender;
    }
    #endregion

    #region public methods
    /// <summary>
    /// Sends the message with only the JSON content type set.
    /// </summary>
    /// <typeparam name="T">Type of the payload.</typeparam>
    /// <param name="message">Payload; its body is the UTF-8 bytes of <c>message.AsJson()</c>.</param>
    /// <returns>A task that completes when the sender has finished sending.</returns>
    public async Task PublishMessageAsync<T>(T message)
    {
        var outgoing = BuildJsonMessage(message);
        await this._serviceBusSender.SendMessageAsync(outgoing);
    }

    /// <summary>
    /// Sends the message with a message id, a correlation id and one application property.
    /// </summary>
    /// <typeparam name="T">Type of the payload.</typeparam>
    /// <param name="message">Payload; its body is the UTF-8 bytes of <c>message.AsJson()</c>.</param>
    /// <param name="messageId">Value assigned to <c>MessageId</c>.</param>
    /// <param name="filterPropertyName">Key of the application property added to the message.</param>
    /// <param name="filterPropertyValue">Value of that application property; also assigned to <c>CorrelationId</c>.</param>
    /// <returns>A task that completes when the sender has finished sending.</returns>
    public async Task PublishMessageAsync<T>(T message, string messageId, string filterPropertyName, string filterPropertyValue)
    {
        var outgoing = BuildJsonMessage(message);
        outgoing.MessageId = messageId;
        outgoing.CorrelationId = filterPropertyValue;
        outgoing.ApplicationProperties.Add(filterPropertyName, filterPropertyValue);
        await this._serviceBusSender.SendMessageAsync(outgoing);
    }
    #endregion

    #region internal methods
    /// <summary>
    /// Creates a new wrapper around the given sender.
    /// </summary>
    /// <param name="sender">Sender to wrap.</param>
    /// <returns>A new <see cref="AzureServiceBus"/> exposed as <see cref="IMessageBus"/>.</returns>
    internal static IMessageBus Create(ServiceBusSender sender)
    {
        return new AzureServiceBus(sender);
    }
    #endregion

    #region private methods
    /// <summary>
    /// Builds a message whose body is the UTF-8 encoding of <c>message.AsJson()</c> and whose
    /// content type is <see cref="ContentType"/>.
    /// </summary>
    private static ServiceBusMessage BuildJsonMessage<T>(T message)
    {
        byte[] payload = Encoding.UTF8.GetBytes(message.AsJson());
        var serviceBusMessage = new ServiceBusMessage(payload);
        serviceBusMessage.ContentType = ContentType;
        return serviceBusMessage;
    }
    #endregion
}
</code>
<code> /// <summary>
/// Publishes the event's message to the Service Bus topic configured in
/// <c>serviceBusConfigOption.TopicEventTypes</c>, logging before and after the send.
/// </summary>
/// <param name="event">
/// Event to publish. Its <c>EventMessage</c> is the payload, <c>EventMessage.RequestID</c> is passed as the
/// message id, and <c>EventType</c> is passed as the filter property value.
/// </param>
/// <returns>A task that completes when the publish call has finished.</returns>
public async Task PublishMessageIntoEventTypeAsync(EventReceiver @event)
{
    var topicName = serviceBusConfigOption.TopicEventTypes;
    var eventType = @event.EventType;
    var requestId = @event.EventMessage.RequestID;

    // Use constant message templates with named placeholders instead of interpolated strings,
    // so the template stays static and the values are available to structured log sinks.
    logger.LogInformation(
        "Start publishing notification event {EventType} with requestID: {RequestID} into service bus topic: {TopicName}",
        eventType,
        requestId,
        topicName);

    var serviceBusClient = messageBusFactory.GetClient(topicName);

    // nameof(...) yields the literal "EventType", which is used as the filter property name.
    await serviceBusClient.PublishMessageAsync(@event.EventMessage, requestId.ToString(), nameof(@event.EventType), eventType);

    logger.LogInformation(
        "Successfully published notification event {EventType} with requestID: {RequestID} into service bus topic: {TopicName}",
        eventType,
        requestId,
        topicName);
}
/// <summary>
/// Factory that returns <see cref="IMessageBus"/> wrappers around cached
/// <see cref="ServiceBusSender"/> instances. Senders are cached per connection string and
/// sender name; clients are cached per connection string.
/// </summary>
public class AzureServiceBusFactory : IMessageBusFactory
{
    #region private fields
    private readonly object _lockObject = new object();
    private readonly ConcurrentDictionary<string, ServiceBusClient> _clients = new ConcurrentDictionary<string, ServiceBusClient>();
    private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new ConcurrentDictionary<string, ServiceBusSender>();
    private readonly ServiceBusConfigOption serviceBusConfigOption;
    #endregion

    #region ctors
    /// <summary>
    /// Creates the factory from the configured Service Bus options.
    /// </summary>
    /// <param name="serviceBusConfigOption">Options wrapper whose <c>Value</c> supplies the connection string.</param>
    public AzureServiceBusFactory(IOptions<ServiceBusConfigOption> serviceBusConfigOption)
    {
        this.serviceBusConfigOption = serviceBusConfigOption.Value;
    }
    #endregion

    #region public methods
    /// <summary>
    /// Returns a message bus backed by an open sender for the given entity, creating
    /// (or re-creating) the sender when none is cached or the cached one is closed.
    /// </summary>
    /// <param name="senderName">Name passed to <c>ServiceBusClient.CreateSender</c>.</param>
    /// <returns>A new <see cref="IMessageBus"/> wrapper around the cached sender.</returns>
    public IMessageBus GetClient(string senderName)
    {
        var connectionString = serviceBusConfigOption.ConnectionString;
        var key = $"{connectionString}-{senderName}";

        // Fast path without the lock: a single TryGetValue avoids reading the dictionary
        // several times and observing different senders between reads.
        if (this._senders.TryGetValue(key, out var cachedSender) && !cachedSender.IsClosed)
        {
            return AzureServiceBus.Create(cachedSender);
        }

        var client = this.GetServiceBusClient(connectionString);
        lock (this._lockObject)
        {
            if (this._senders.TryGetValue(key, out var existingSender))
            {
                // Another caller may have created an open sender while we waited for the lock;
                // reuse it instead of overwriting (and leaking) it.
                if (!existingSender.IsClosed)
                {
                    return AzureServiceBus.Create(existingSender);
                }

                // The cached sender is closed: release it and fall through to create a
                // replacement rather than handing the closed sender back to the caller.
                existingSender.DisposeAsync().GetAwaiter().GetResult();
            }

            var sender = client.CreateSender(senderName);
            this._senders[key] = sender;
            return AzureServiceBus.Create(sender);
        }
    }
    #endregion

    #region protected methods
    /// <summary>
    /// Returns the cached <see cref="ServiceBusClient"/> for the connection string, creating a new
    /// one (AMQP over TCP transport) under the lock when none is cached or the cached one is closed.
    /// </summary>
    /// <param name="connectionString">Service Bus connection string; also used as the cache key.</param>
    /// <returns>The cached or newly created client.</returns>
    protected virtual ServiceBusClient GetServiceBusClient(string connectionString)
    {
        lock (this._lockObject)
        {
            if (this.ClientDoesnotExistOrIsClosed(connectionString))
            {
                this._clients[connectionString] = new ServiceBusClient(connectionString, new ServiceBusClientOptions
                {
                    TransportType = ServiceBusTransportType.AmqpTcp
                });
            }

            return this._clients[connectionString];
        }
    }
    #endregion

    #region private methods
    /// <summary>
    /// Tells whether a client must be created for the connection string.
    /// </summary>
    /// <param name="connectionString">Cache key to look up.</param>
    /// <returns><c>true</c> when no client is cached for the key or the cached client is closed.</returns>
    private bool ClientDoesnotExistOrIsClosed(string connectionString)
    {
        return !this._clients.TryGetValue(connectionString, out var client) || client.IsClosed;
    }
    #endregion
}
/// <summary>
/// <see cref="IMessageBus"/> implementation that sends JSON messages through a
/// <see cref="ServiceBusSender"/>.
/// </summary>
internal class AzureServiceBus : IMessageBus
{
    #region private fields
    private const string ContentType = "application/json";
    private readonly ServiceBusSender _serviceBusSender;
    #endregion

    #region ctors
    /// <summary>
    /// Wraps the given sender.
    /// </summary>
    /// <param name="serviceBusSender">Sender used for every publish call.</param>
    internal AzureServiceBus(ServiceBusSender serviceBusSender)
    {
        this._serviceBusSender = serviceBusSender;
    }
    #endregion

    #region public methods
    /// <summary>
    /// Sends the message with only the JSON content type set.
    /// </summary>
    /// <typeparam name="T">Type of the payload.</typeparam>
    /// <param name="message">Payload; its body is the UTF-8 bytes of <c>message.AsJson()</c>.</param>
    /// <returns>A task that completes when the sender has finished sending.</returns>
    public async Task PublishMessageAsync<T>(T message)
    {
        var outgoing = BuildJsonMessage(message);
        await this._serviceBusSender.SendMessageAsync(outgoing);
    }

    /// <summary>
    /// Sends the message with a message id, a correlation id and one application property.
    /// </summary>
    /// <typeparam name="T">Type of the payload.</typeparam>
    /// <param name="message">Payload; its body is the UTF-8 bytes of <c>message.AsJson()</c>.</param>
    /// <param name="messageId">Value assigned to <c>MessageId</c>.</param>
    /// <param name="filterPropertyName">Key of the application property added to the message.</param>
    /// <param name="filterPropertyValue">Value of that application property; also assigned to <c>CorrelationId</c>.</param>
    /// <returns>A task that completes when the sender has finished sending.</returns>
    public async Task PublishMessageAsync<T>(T message, string messageId, string filterPropertyName, string filterPropertyValue)
    {
        var outgoing = BuildJsonMessage(message);
        outgoing.MessageId = messageId;
        outgoing.CorrelationId = filterPropertyValue;
        outgoing.ApplicationProperties.Add(filterPropertyName, filterPropertyValue);
        await this._serviceBusSender.SendMessageAsync(outgoing);
    }
    #endregion

    #region internal methods
    /// <summary>
    /// Creates a new wrapper around the given sender.
    /// </summary>
    /// <param name="sender">Sender to wrap.</param>
    /// <returns>A new <see cref="AzureServiceBus"/> exposed as <see cref="IMessageBus"/>.</returns>
    internal static IMessageBus Create(ServiceBusSender sender)
    {
        return new AzureServiceBus(sender);
    }
    #endregion

    #region private methods
    /// <summary>
    /// Builds a message whose body is the UTF-8 encoding of <c>message.AsJson()</c> and whose
    /// content type is <see cref="ContentType"/>.
    /// </summary>
    private static ServiceBusMessage BuildJsonMessage<T>(T message)
    {
        byte[] payload = Encoding.UTF8.GetBytes(message.AsJson());
        var serviceBusMessage = new ServiceBusMessage(payload);
        serviceBusMessage.ContentType = ContentType;
        return serviceBusMessage;
    }
    #endregion
}
</code>
/// <summary>
/// Publishes the event's message to the Service Bus topic configured in
/// <c>serviceBusConfigOption.TopicEventTypes</c>, logging before and after the send.
/// </summary>
/// <param name="event">
/// Event to publish. Its <c>EventMessage</c> is the payload, <c>EventMessage.RequestID</c> is passed as the
/// message id, and <c>EventType</c> is passed as the filter property value.
/// </param>
/// <returns>A task that completes when the publish call has finished.</returns>
public async Task PublishMessageIntoEventTypeAsync(EventReceiver @event)
{
    var topicName = serviceBusConfigOption.TopicEventTypes;
    var eventType = @event.EventType;
    var requestId = @event.EventMessage.RequestID;

    // Use constant message templates with named placeholders instead of interpolated strings,
    // so the template stays static and the values are available to structured log sinks.
    logger.LogInformation(
        "Start publishing notification event {EventType} with requestID: {RequestID} into service bus topic: {TopicName}",
        eventType,
        requestId,
        topicName);

    var serviceBusClient = messageBusFactory.GetClient(topicName);

    // nameof(...) yields the literal "EventType", which is used as the filter property name.
    await serviceBusClient.PublishMessageAsync(@event.EventMessage, requestId.ToString(), nameof(@event.EventType), eventType);

    logger.LogInformation(
        "Successfully published notification event {EventType} with requestID: {RequestID} into service bus topic: {TopicName}",
        eventType,
        requestId,
        topicName);
}
/// <summary>
/// Factory that returns <see cref="IMessageBus"/> wrappers around cached
/// <see cref="ServiceBusSender"/> instances. Senders are cached per connection string and
/// sender name; clients are cached per connection string.
/// </summary>
public class AzureServiceBusFactory : IMessageBusFactory
{
    #region private fields
    private readonly object _lockObject = new object();
    private readonly ConcurrentDictionary<string, ServiceBusClient> _clients = new ConcurrentDictionary<string, ServiceBusClient>();
    private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new ConcurrentDictionary<string, ServiceBusSender>();
    private readonly ServiceBusConfigOption serviceBusConfigOption;
    #endregion

    #region ctors
    /// <summary>
    /// Creates the factory from the configured Service Bus options.
    /// </summary>
    /// <param name="serviceBusConfigOption">Options wrapper whose <c>Value</c> supplies the connection string.</param>
    public AzureServiceBusFactory(IOptions<ServiceBusConfigOption> serviceBusConfigOption)
    {
        this.serviceBusConfigOption = serviceBusConfigOption.Value;
    }
    #endregion

    #region public methods
    /// <summary>
    /// Returns a message bus backed by an open sender for the given entity, creating
    /// (or re-creating) the sender when none is cached or the cached one is closed.
    /// </summary>
    /// <param name="senderName">Name passed to <c>ServiceBusClient.CreateSender</c>.</param>
    /// <returns>A new <see cref="IMessageBus"/> wrapper around the cached sender.</returns>
    public IMessageBus GetClient(string senderName)
    {
        var connectionString = serviceBusConfigOption.ConnectionString;
        var key = $"{connectionString}-{senderName}";

        // Fast path without the lock: a single TryGetValue avoids reading the dictionary
        // several times and observing different senders between reads.
        if (this._senders.TryGetValue(key, out var cachedSender) && !cachedSender.IsClosed)
        {
            return AzureServiceBus.Create(cachedSender);
        }

        var client = this.GetServiceBusClient(connectionString);
        lock (this._lockObject)
        {
            if (this._senders.TryGetValue(key, out var existingSender))
            {
                // Another caller may have created an open sender while we waited for the lock;
                // reuse it instead of overwriting (and leaking) it.
                if (!existingSender.IsClosed)
                {
                    return AzureServiceBus.Create(existingSender);
                }

                // The cached sender is closed: release it and fall through to create a
                // replacement rather than handing the closed sender back to the caller.
                existingSender.DisposeAsync().GetAwaiter().GetResult();
            }

            var sender = client.CreateSender(senderName);
            this._senders[key] = sender;
            return AzureServiceBus.Create(sender);
        }
    }
    #endregion

    #region protected methods
    /// <summary>
    /// Returns the cached <see cref="ServiceBusClient"/> for the connection string, creating a new
    /// one (AMQP over TCP transport) under the lock when none is cached or the cached one is closed.
    /// </summary>
    /// <param name="connectionString">Service Bus connection string; also used as the cache key.</param>
    /// <returns>The cached or newly created client.</returns>
    protected virtual ServiceBusClient GetServiceBusClient(string connectionString)
    {
        lock (this._lockObject)
        {
            if (this.ClientDoesnotExistOrIsClosed(connectionString))
            {
                this._clients[connectionString] = new ServiceBusClient(connectionString, new ServiceBusClientOptions
                {
                    TransportType = ServiceBusTransportType.AmqpTcp
                });
            }

            return this._clients[connectionString];
        }
    }
    #endregion

    #region private methods
    /// <summary>
    /// Tells whether a client must be created for the connection string.
    /// </summary>
    /// <param name="connectionString">Cache key to look up.</param>
    /// <returns><c>true</c> when no client is cached for the key or the cached client is closed.</returns>
    private bool ClientDoesnotExistOrIsClosed(string connectionString)
    {
        return !this._clients.TryGetValue(connectionString, out var client) || client.IsClosed;
    }
    #endregion
}
/// <summary>
/// <see cref="IMessageBus"/> implementation that sends JSON messages through a
/// <see cref="ServiceBusSender"/>.
/// </summary>
internal class AzureServiceBus : IMessageBus
{
    #region private fields
    private const string ContentType = "application/json";
    private readonly ServiceBusSender _serviceBusSender;
    #endregion

    #region ctors
    /// <summary>
    /// Wraps the given sender.
    /// </summary>
    /// <param name="serviceBusSender">Sender used for every publish call.</param>
    internal AzureServiceBus(ServiceBusSender serviceBusSender)
    {
        this._serviceBusSender = serviceBusSender;
    }
    #endregion

    #region public methods
    /// <summary>
    /// Sends the message with only the JSON content type set.
    /// </summary>
    /// <typeparam name="T">Type of the payload.</typeparam>
    /// <param name="message">Payload; its body is the UTF-8 bytes of <c>message.AsJson()</c>.</param>
    /// <returns>A task that completes when the sender has finished sending.</returns>
    public async Task PublishMessageAsync<T>(T message)
    {
        var outgoing = BuildJsonMessage(message);
        await this._serviceBusSender.SendMessageAsync(outgoing);
    }

    /// <summary>
    /// Sends the message with a message id, a correlation id and one application property.
    /// </summary>
    /// <typeparam name="T">Type of the payload.</typeparam>
    /// <param name="message">Payload; its body is the UTF-8 bytes of <c>message.AsJson()</c>.</param>
    /// <param name="messageId">Value assigned to <c>MessageId</c>.</param>
    /// <param name="filterPropertyName">Key of the application property added to the message.</param>
    /// <param name="filterPropertyValue">Value of that application property; also assigned to <c>CorrelationId</c>.</param>
    /// <returns>A task that completes when the sender has finished sending.</returns>
    public async Task PublishMessageAsync<T>(T message, string messageId, string filterPropertyName, string filterPropertyValue)
    {
        var outgoing = BuildJsonMessage(message);
        outgoing.MessageId = messageId;
        outgoing.CorrelationId = filterPropertyValue;
        outgoing.ApplicationProperties.Add(filterPropertyName, filterPropertyValue);
        await this._serviceBusSender.SendMessageAsync(outgoing);
    }
    #endregion

    #region internal methods
    /// <summary>
    /// Creates a new wrapper around the given sender.
    /// </summary>
    /// <param name="sender">Sender to wrap.</param>
    /// <returns>A new <see cref="AzureServiceBus"/> exposed as <see cref="IMessageBus"/>.</returns>
    internal static IMessageBus Create(ServiceBusSender sender)
    {
        return new AzureServiceBus(sender);
    }
    #endregion

    #region private methods
    /// <summary>
    /// Builds a message whose body is the UTF-8 encoding of <c>message.AsJson()</c> and whose
    /// content type is <see cref="ContentType"/>.
    /// </summary>
    private static ServiceBusMessage BuildJsonMessage<T>(T message)
    {
        byte[] payload = Encoding.UTF8.GetBytes(message.AsJson());
        var serviceBusMessage = new ServiceBusMessage(payload);
        serviceBusMessage.ContentType = ContentType;
        return serviceBusMessage;
    }
    #endregion
}
3