BackGround Info: For a Home Automation, an KNX installation (running sound and stable), setup a Group Address recorder. Logging values of heating control valve actuators positions, room temperatures, temperature Setpoints, Outside Temp, ect. KNX supplies the Falcon SDK to get access to the bus. This SDK facilitates a bus “MessageReceived” Event. With this you can capture all traffic and filter for EventType. With method “bus.RequestGroupValueAsync(<groupaddress)” you can ping the bus actively. Up on arrival, I’m converting the inbound telegrams into a business object “Telegram”, buffer this a List and write towards table in a MariaDB (wit INSERT Stored Procedure). The logging is working. NOTE: Inbound traffic on a “busy bus” can be average 3 msg/sec up to 12 msg/sec at peak.
What I have tried: I have created a Winforms app. See code snippets below. This concept works, however:
QUESTION: What is a “best practice” for bus -> reading & buffer in Cue -> Sequential Writes to MariaDB (INSERT) in Async way?
///// Code snippets /////
private void frmKNXFalconTest_Load(object sender, EventArgs e)
{
gAddress_OnOff = "2/0/0";
gAddress_FB = "2/0/3";
bsTelegram.DataSource = _inboundLogs;
Connect_KNXBus();
}
private void Connect_KNXBus()
{
try
{
bus = new KnxBus(KNX_Connection.Set_IpTunnelingConnectorParameters());
// Register an event handler
bus.GroupMessageReceived += KNX_Bus_Reader;
//Connect bus
bus.Connect();
Console.WriteLine($"## KNX Bus Connection State: {bus.ConnectionState} #####");
labelControl_Connection.Text = $"KNX Bus Connection State: {bus.ConnectionState}";
connected = true;
timer1.Start();
RequestGroupValue_Light_Async();
}
catch (Exception ex)
{
labelControl_Connection.Text = "Error: No KNX Connection";
labelControl_Connection.ForeColor = Color.Red;
Console.WriteLine(ex.ToString());
}
}
private async void KNX_Bus_Reader(object sender, GroupEventArgs e)
{
try
{
if (connected == false)
return;
string value = string.Empty;
bool onRequest = false;
if (e.Value == null)
{
value = "null";
}
else
{
value = e.Value.ToString();
}
if (e.EventType == GroupEventType.ValueResponse)
{
onRequest = true;
}
await Task.Run(() => Write_to_InboundBuffer_and_Refresh_Screen(e.EventType.ToString(), e.DestinationAddress.ToString(), value, e.SourceAddress.ToString(), onRequest));
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
private void Write_to_InboundBuffer_and_Refresh_Screen(string eventType, string destinationAddress, string value, string sourceAddress, bool onRequest)
{
try
{
#region Set Telegram
telegramId++;
Telegram telegram = new Telegram();
telegram = Set_TeleGram(telegramId, eventType, destinationAddress, value, sourceAddress, onRequest, telegram.InboundTime = DateTime.Now);
//var dpt = DptFactory.Default.Get(mainType, subType);
//var formatted = dpt.Format(groupValue, null, null);
_inboundLogs.Add(telegram);
_inBoundBuffer.Add(telegram);
string msg = "InboundBuffer - count:";
Console.WriteLine($"{msg.PadRight(25)} | {_inBoundBuffer.Count.ToString().PadLeft(2)} | {DateTime.Now}");
#endregion
#region Refresh Screen
// Now update label controls and the Grid on the form
if (gridControl_Telegram.InvokeRequired)
{
this.Invoke((MethodInvoker)delegate ()
{
gridControl_Telegram.RefreshDataSource();
gridView_Telegram.MoveLast();
});
}
if (lblGaAddress.InvokeRequired)
{
this.Invoke((MethodInvoker)delegate ()
{
if (value != "null")
{
lblGaAddress.Text = destinationAddress;
}
else
{
lblGaAddress.Text = "null";
}
});
}
if (lblGaValue.InvokeRequired)
{
this.Invoke((MethodInvoker)delegate ()
{
if (value != "null")
{
lblGaValue.Text = destinationAddress;
}
else
{
lblGaValue.Text = "null";
}
});
}
if (value != "null")
{
if (lblLightStatus.InvokeRequired)
{
this.Invoke((MethodInvoker)delegate ()
{
if (gaAddress == gAddress_FB && gaValue == "0") { lblLightStatus.Text = "Light Off"; }
if (gaAddress == gAddress_FB && gaValue == "1") { lblLightStatus.Text = "Light On"; }
});
}
}
#endregion
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
private Telegram Set_TeleGram(int id, string eventType, string destinationAddress, string value, string sourceAddress, bool onRequest, DateTime inboundTime)
{
Telegram t = new Telegram();
t.Id = id;
t.EventType = eventType;
t.DestinationAddress = destinationAddress;
t.Value = value;
t.SourceAddress = sourceAddress;
t.OnRequest = onRequest;
t.InboundTime = inboundTime;
return t;
}
private void timer1_Tick(object sender, EventArgs e)
{
CueBuffer_WriteSQL_Async();
}
private async void CueBuffer_WriteSQL_Async()
{
try
{
if (_inBoundBuffer.Count > 0)
{
List<Telegram> cueBuffer = new List<Telegram>();
// Copy telegram objects from _inboundBuffer -> cueBuffer
foreach (Telegram t in _inBoundBuffer)
{
cueBuffer.Add(t);
}
// Declare List<Task>
List<Task> tasks = new List<Task>();
// Launch SQL INSERTs in parallel
foreach (Telegram t in cueBuffer)
{
tasks.Add(Task.Run(() => DataAccess.BusLog_01_Insert(t)));
}
await Task.WhenAll(tasks);
// Remove the objects that having succesfully written to SQL
foreach (Telegram t in cueBuffer)
{
if (t.Copied)
{
_inBoundBuffer.Remove(t);
}
}
string msg = "InboundBuffer - cleared:";
Console.WriteLine($"{msg.PadRight(25)} | {_inBoundBuffer.Count.ToString().PadLeft(2)} | {DateTime.Now}");
}
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
public static int BusLog_01_Insert(Telegram telegram)
{
int newRecordId = -1;
// Get the ConnectionString
string connectionString = Get_ConnectionString();
// Setup the Connection
using (MySqlConnection connection = new MySqlConnection(connectionString))
{
// Set CommandText refer to SQL Stored Procedure
string commandText = "BusLog_01_Insert";
// Setup the Command
using (MySqlCommand cmd = new MySqlCommand())
{
cmd.Connection = connection;
cmd.CommandText = commandText;
cmd.CommandType = CommandType.StoredProcedure;
//Add Parameters
cmd.Parameters.AddWithValue("p_eventType", telegram.EventType);
cmd.Parameters.AddWithValue("p_groupAddress", telegram.DestinationAddress);
cmd.Parameters.AddWithValue("p_ga_value", telegram.Value);
cmd.Parameters.AddWithValue("p_individualAddress", telegram.DestinationAddress);
cmd.Parameters.AddWithValue("p_onrequest", telegram.OnRequest);
cmd.Parameters.AddWithValue("p_inboundtime", telegram.InboundTime);
//Add OUTPUT parameter - This to retrieve the Id of the new record
MySqlParameter pm_newRecordId = new MySqlParameter();
pm_newRecordId.ParameterName = "newRecordId";
pm_newRecordId.MySqlType = MySqlType.BigInt;
pm_newRecordId.Direction = ParameterDirection.Output;
cmd.Parameters.Add(pm_newRecordId);
// Open Connection
connection.Open();
try
{
// Execute the command
cmd.ExecuteNonQuery();
// Get feedback
object objNewRecordId = pm_newRecordId.Value;
if (objNewRecordId != DBNull.Value)
{
newRecordId = Convert.ToInt32(objNewRecordId);
if (newRecordId > 0)
{
// SP succesfull -> flag telegram "Okay"
telegram.Copied = true;
}
}
}
catch (SqlException ex)
{
Console.WriteLine(ex.ToString());
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
}
return newRecordId;
}
Note: BusLog_01_Insert(Telegram telegram) Non-Async
Here a Log of showing the Inbound messages and the writes to MariaDB
enter image description here
2