- pythonnet by Python.Runtime 3.0.3 (Installed the winform application in Visual Studio 2022)
- adfuller_test.py (This is my python script file)
I am calling python code from a winform application which works great normally.
The problem that I encounter now is that when I now try to use: “multiprocessing” with:
- concurrent.futures.ProcessPoolExecutor in the “adfuller_test.py”.
- To clarify. If I run the “adfuller_test.py” from CMD. The script works as expected.
The problem happens when I call the below line of code from within my C# application. What happens is that 2 new instances of my winform application is opening which is very strange and is the whole problem. The function called below is also not returning anything.
Notice that the second argument: “2” is “num_cpu_cores”. So many cores to use. That number is how many new winform instances that are opened for the C# application.
dynamic result = pythonModule.adfuller_engle_granger_multicore(new[] { "pair1", "pair2", "pair3" }, 2);
What is causing this and how can we prevent this and make the code work?
Below are all relevant code.
C# code in the winform application:
dynamic pythonModule = null;
public void initializePythonEngine()
{
if (pythonModule == null)
{
PythonEngine.Initialize(); // Initialize the Python engine
PythonEngine.BeginAllowThreads(); // Allow threads to interact with Python
using (Py.GIL())
{
dynamic sys = Py.Import("sys");
string projectDirectory = Directory.GetParent(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location))?.Parent?.FullName;
string adfullerTestPath = Path.Combine(projectDirectory, "adfuller_test");
sys.path.append(adfullerTestPath);
pythonModule = Py.Import("adfuller_test"); // Import adfuller_test module
}
}
}
void function1()
{
try
{
initializePythonEngine();
using (Py.GIL())
{
dynamic result = pythonModule.adfuller_engle_granger_multicore(new[] { "pair1", "pair2", "pair3" }, 2);
}
}
catch (PythonException ex) { MessageBox.Show("Python error: " + ex.Message); }
catch (Exception ex) { MessageBox.Show("C# error: " + ex.Message); }
finally { PythonEngine.Shutdown(); }
}
Python code in: “adfuller_test.py”
import concurrent.futures
import numpy as np
def process_pair(pair):
# Simulate some processing
result = f"Processed {pair}"
return result
def adfuller_engle_granger_multicore(pairs, num_cpu_cores):
# Using ProcessPoolExecutor to parallelize the processing of pairs
optimal_pair_windows = []
with concurrent.futures.ProcessPoolExecutor(max_workers=num_cpu_cores) as executor:
# Map pairs to the process_pair function
future_to_pair = {executor.submit(process_pair, pair): pair for pair in pairs}
for future in concurrent.futures.as_completed(future_to_pair):
result = future.result()
if result is not None:
optimal_pair_windows.append(result)
# Return a combined string of results
return "n".join(optimal_pair_windows)
# Test function
if __name__ == "__main__":
pairs = ["pair1", "pair2", "pair3"]
num_cpu_cores = 2
result = adfuller_engle_granger_multicore(pairs, num_cpu_cores)
print(result)
UPDATED APPROACH USING APPDOMAINS
I am trying to create multiple AppDomains in below code approach in order to run Tasks in parallell.
The function that is called is: function1();
However the code breaks with this error:
“System.ArgumentException: ‘Cannot pass a GCHandle across AppDomains.'”
The code breaks on this line. I am not sure what this means and we can find a solution for the below code to execute the python function: “adfuller_engle_granger_multicore” in parallell from C#?
dynamic result = _pythonModule.adfuller_engle_granger_multicore(
data.SymbolCombination,
data.CombinationList,
data.Series);
[Serializable]
public class SerializableData
{
public List<string> SymbolCombination { get; set; }
public List<string> CombinationList { get; set; }
public List<double[]> Series { get; set; }
}
public class PythonCaller : MarshalByRefObject
{
private dynamic _pythonModule;
public PythonCaller()
{
InitializePythonEngine();
}
private void InitializePythonEngine()
{
if (_pythonModule == null)
{
PythonEngine.Initialize(); // Initialize the Python engine
PythonEngine.BeginAllowThreads(); // Allow threads to interact with Python
using (Py.GIL())
{
dynamic sys = Py.Import("sys");
string projectDirectory = Directory.GetParent(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location))?.Parent?.FullName;
string adfullerTestPath = Path.Combine(projectDirectory, "adfuller_test");
sys.path.append(adfullerTestPath);
_pythonModule = Py.Import("adfuller_test"); // Import adfuller_test module
}
}
}
public string ExecutePythonCode(SerializableData data)
{
using (Py.GIL())
{
dynamic result = _pythonModule.adfuller_engle_granger_multicore(
data.SymbolCombination,
data.CombinationList,
data.Series
);
return result.ToString();
}
}
}
void function1()
{
//Below variables are filled with memories
var combinationDICT = new Dictionary<string, List<string>>(); var combinationLIST = new List<string>(); var symbolCombination = new List<string>(); var SERIES = new List<double[]>();
int num_cpu_cores = 10;
int batchSize = (int)Math.Ceiling((double)combinationDICT.Count / num_cpu_cores);
var tasks = new List<Task>();
var combinationDictList = combinationDICT.ToList(); // Convert dictionary to list for indexed access
for (int i = 0; i < num_cpu_cores; i++)
{
// Create a batch for each core
var batch = combinationDictList.Skip(i * batchSize).Take(batchSize).ToList();
if (batch.Count > 0)
{
tasks.Add(Task.Run(() =>
{
// Create an AppDomain for this batch of tasks
AppDomain newDomain = AppDomain.CreateDomain("PythonExecutionDomain_" + i);
try
{
// Create an instance of PythonCaller in the new AppDomain
PythonCaller pythonCaller = (PythonCaller)newDomain.CreateInstanceAndUnwrap(
typeof(PythonCaller).Assembly.FullName,
typeof(PythonCaller).FullName
);
foreach (var pair in batch)
{
try
{
// Create data for the PythonCaller
List<string> combinationlist = new List<string>(pair.Value);
var data = new SerializableData
{
CombinationList = new List<string>(combinationlist),
SymbolCombination = new List<string>(symbolCombination),
Series = new List<double[]>(SERIES)
};
// Call the method to execute Python code
string result = pythonCaller.ExecutePythonCode(data);
}
catch (Exception ex)
{
// Handle exceptions for individual pair processing
MessageBox.Show($"Error processing pair {pair.Key}: {ex.Message}");
}
}
}
catch (Exception ex)
{
// Handle exceptions for AppDomain creation or interaction
MessageBox.Show($"Error with AppDomain {i}: {ex.Message}");
}
finally
{
// Unload the AppDomain
AppDomain.Unload(newDomain);
}
}));
}
}
// Wait for all tasks to complete
Task.WaitAll(tasks.ToArray());
}
3