I am working with a C# program that is subscribing to options contract data with the updates coming in on multiple threads. i would like to run analytics every minute or so, so I was planning on storing the trades as the come in, in a ConcurrentQueue trades. i have my current implementation below which is a function that will attempt to write the trades in batches whenever I call it. Is this sufficient? or is there a better way to do this? It currently works for low volume, but unsure how it will hold up on large volumes of updates. Thanks!
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using Npgsql;
public struct PriceSub
{
public DateTime _timestamp;
public string _name;
public string _term;
public DateTime? _expiration;
public string _type;
public string _product;
public decimal? _strike;
public ulong _instrumentID;
public decimal _price;
public decimal _userTV;
public decimal _midImpliedTV;
public string _theoPrice;
public decimal _userVol;
public decimal _midImpliedVol;
public decimal _quantity;
public decimal _totalQuanity;
public decimal _openInterest;
public long _thread;
}
public class TradeWriter
{
private ConcurrentQueue<PriceSub> _trades;
private string _connectionString;
public TradeWriter(ConcurrentQueue<PriceSub> trades, string connectionString)
{
_trades = trades;
_connectionString = connectionString;
}
public async Task WriteTradesToDbAsync(int batchSize)
{
List<PriceSub> batch = new List<PriceSub>();
// Dequeue trades in batches
while (batch.Count < batchSize && _trades.TryDequeue(out var trade))
{
batch.Add(trade);
}
// If there are trades to write, proceed with batch insertion
if (batch.Count > 0)
{
await InsertTradesBatchAsync(batch);
}
}
private async Task InsertTradesBatchAsync(List<PriceSub> tradesBatch)
{
// SQL query for batch insert
string sql = @"INSERT INTO es_options_daily
(_timestamp, _name, _term, _expiration, _type, _product, _strike, _instrumentID,
_price, _userTV, _midImpliedTV, _theoPrice, _userVol, _midImpliedVol,
_quantity, _totalQuantity, _openInterest, _thread)
VALUES
(@timestamp, @name, @term, @expiration, @type, @product, @strike, @instrumentID,
@price, @userTV, @midImpliedTV, @theoPrice, @userVol, @midImpliedVol,
@quantity, @totalQuantity, @openInterest, @thread)";
using (var connection = new NpgsqlConnection(_connectionString))
{
await connection.OpenAsync();
using (var transaction = await connection.BeginTransactionAsync())
{
using (var command = new NpgsqlCommand(sql, connection, transaction))
{
foreach (var trade in tradesBatch)
{
// Add parameters for each trade in the batch
command.Parameters.AddWithValue("@timestamp", trade._timestamp);
command.Parameters.AddWithValue("@name", trade._name);
command.Parameters.AddWithValue("@term", trade._term);
command.Parameters.AddWithValue("@expiration", trade._expiration.HasValue ? (object)trade._expiration.Value : DBNull.Value);
command.Parameters.AddWithValue("@type", trade._type);
command.Parameters.AddWithValue("@product", trade._product);
command.Parameters.AddWithValue("@strike", trade._strike.HasValue ? (object)trade._strike.Value : DBNull.Value);
command.Parameters.AddWithValue("@instrumentID", trade._instrumentID);
command.Parameters.AddWithValue("@price", trade._price);
command.Parameters.AddWithValue("@userTV", trade._userTV);
command.Parameters.AddWithValue("@midImpliedTV", trade._midImpliedTV);
command.Parameters.AddWithValue("@theoPrice", trade._theoPrice);
command.Parameters.AddWithValue("@userVol", trade._userVol);
command.Parameters.AddWithValue("@midImpliedVol", trade._midImpliedVol);
command.Parameters.AddWithValue("@quantity", trade._quantity);
command.Parameters.AddWithValue("@totalQuantity", trade._totalQuanity);
command.Parameters.AddWithValue("@openInterest", trade._openInterest);
command.Parameters.AddWithValue("@thread", trade._thread);
// Execute insert for the current trade in the batch
await command.ExecuteNonQueryAsync();
// Clear parameters for the next trade
command.Parameters.Clear();
}
}
// Commit the transaction after batch insertion
await transaction.CommitAsync();
}
}
}
}
2