Problem Description:
I’m working on a Go application that interfaces with Python via cgo to process store data. I’m using goroutines because I have more than 4M stores.
Issue: the execution gets blocked after some iterations.
Code Overview:
Here’s a simplified overview of the relevant code snippets:
Go Code (main package):
defer validation.FinalizePython()
}
func main() {
batches := len(records) / CHUNK_SIZE + 1
var wg sync.WaitGroup
for i := 0; i < batches; i++ {
start := i * CHUNK_SIZE
end := (i + 1) * CHUNK_SIZE
if end > len(records) {
end = len(records)
}
wg.Add(1)
go func(rows [][]string) {
defer wg.Done()
for _, row := range rows {
store := validator.ValidateStore(row)
fmt.Println("Processed store:", store)
resultsChan <- store
}
}(records[start:end])
}
wg.Wait()
close(resultsChan)
<-done
}
package validation
/*
#cgo CFLAGS: -I/usr/local/Cellar/python@3.8/3.8.19/Frameworks/Python.framework/Versions/3.8/include/python3.8
#cgo LDFLAGS: -L/usr/local/Cellar/python@3.8/3.8.19/Frameworks/Python.framework/Versions/3.8/lib -lpython3.8
#include <stdlib.h>
#include "cleaner_bridge.h"
*/
import "C"

import (
	"encoding/json"
	"fmt"
	"sync"
	"unsafe"

	"github.com/stores"
)
// Validator converts raw CSV rows into stores.Store values.
//
// Headers records, for each store field, the index of the column in a row
// that holds it.
type Validator struct {
	Headers struct {
		ID, Name, Address, Country int
	}
}
// ValidateStore builds a stores.Store from one CSV row, using the column
// indexes in validator.Headers, and passes the fields to the Python cleaner
// through the C bridge function clean_store.
//
// The cleaner's JSON reply is decoded into a separate value that is not used
// yet; the returned store is the one read directly from the row.
func (validator *Validator) ValidateStore(fields []string) stores.Store {
	InitializePython()
	fmt.Println("Validating store", fields)

	cols := validator.Headers
	var store stores.Store
	store.ID = GetRowField(fields, cols.ID)
	store.Name = GetRowField(fields, cols.Name)
	store.Address = GetRowField(fields, cols.Address)
	store.Country = GetRowField(fields, cols.Country)
	fmt.Println("to pass to function Cleaned store data", store)

	// C copies of the four fields, released when ValidateStore returns.
	cID, cName := C.CString(store.ID), C.CString(store.Name)
	defer C.free(unsafe.Pointer(cID))
	defer C.free(unsafe.Pointer(cName))
	cAddress, cCountry := C.CString(store.Address), C.CString(store.Country)
	defer C.free(unsafe.Pointer(cAddress))
	defer C.free(unsafe.Pointer(cCountry))

	fmt.Println("Calling C function clean_store")
	reply := C.clean_store(cID, cName, cAddress, cCountry, C.int(0))
	// free(NULL) is a no-op, so this is safe even when clean_store fails.
	defer C.free(unsafe.Pointer(reply))
	fmt.Println("C function clean_store returned")

	if reply == nil {
		return store
	}

	var cleanedStore stores.Store
	if err := json.Unmarshal([]byte(C.GoString(reply)), &cleanedStore); err != nil {
		fmt.Println("Error unmarshalling JSON", err)
	}
	// Use cleanedStore as needed.
	return store
}
// GetRowField returns fields[index], or the empty string when index does not
// address an element of fields (negative or past the end).
func GetRowField(fields []string, index int) string {
	if index < 0 || index >= len(fields) {
		return ""
	}
	return fields[index]
}
var initOnce sync.Once
var finalizeOnce sync.Once

// pythonMu guards pythonStarted, which records whether InitializePython has
// actually started the embedded interpreter.
var (
	pythonMu      sync.Mutex
	pythonStarted bool
)

// InitializePython starts the embedded Python interpreter on first use.
// Later calls, from any goroutine, are no-ops (sync.Once).
func InitializePython() {
	initOnce.Do(func() {
		C.initialize_python()
		pythonMu.Lock()
		pythonStarted = true
		pythonMu.Unlock()
		fmt.Println("Python initialized")
	})
}

// FinalizePython shuts the embedded interpreter down, at most once.
//
// Calls made before InitializePython has run are ignored instead of
// consuming the one-shot finalization. Otherwise an early call, such as a
// deferred call in a function that returns before any store is processed,
// prints "Python finalized" without doing anything and blocks the real
// shutdown later.
func FinalizePython() {
	pythonMu.Lock()
	started := pythonStarted
	pythonMu.Unlock()
	if !started {
		return
	}
	finalizeOnce.Do(func() {
		C.finalize_python()
		fmt.Println("Python finalized")
	})
}
cleaner_bridge.h
#ifndef CLEANER_BRIDGE_H
#define CLEANER_BRIDGE_H

/*
 * Start the embedded Python interpreter (at most once per process) and
 * import the "cleaner" module.
 */
void initialize_python(void);

/* Shut the embedded Python interpreter down if it was started. */
void finalize_python(void);

/*
 * Call cleaner.clean_store(store_code, store_name, store_address,
 * store_country, address_pars_flag) in the embedded interpreter.
 *
 * Returns the function's string result as a NUL-terminated C string, or NULL
 * on failure. The Go caller releases a non-NULL result with C.free, so the
 * implementation must return heap memory obtained from malloc().
 */
const char* clean_store(const char* store_code, const char* store_name, const char* store_address, const char* store_country, int address_pars_flag);

#endif // CLEANER_BRIDGE_H
cleaner_bridge.c
#include "cleaner_bridge.h"
#include <Python.h>
#include <pthread.h>
#include <stdio.h>
static pthread_once_t init_once = PTHREAD_ONCE_INIT;
static pthread_mutex_t gil_lock;
static int python_initialized = 0;
void initialize_python_once() {
Py_Initialize();
pthread_mutex_init(&gil_lock, NULL);
PyRun_SimpleString("import sys");
PyRun_SimpleString("sys.path.append('validation')");
PyRun_SimpleString("import cleaner");
python_initialized = 1;
printf("Python initializedn");
}
void initialize_python() {
pthread_once(&init_once, initialize_python_once);
}
void finalize_python() {
if (python_initialized) {
Py_Finalize();
python_initialized = 0;
printf("Python finalizedn");
}
}
const char* clean_store(const char* store_code, const char* store_name, const char* store_address, const char* store_country, int address_pars_flag) {
pthread_mutex_lock(&gil_lock);
PyGILState_STATE gstate = PyGILState_Ensure();
printf("GIL acquiredn");
PyObject* sysPath = PySys_GetObject("path");
PyObject* currentDir = PyUnicode_FromString("validation");
PyList_Append(sysPath, currentDir);
Py_DECREF(currentDir);
PyObject* pModule = PyImport_ImportModule("cleaner");
if (!pModule) {
PyErr_Print();
PyGILState_Release(gstate);
pthread_mutex_unlock(&gil_lock);
printf("Error importing modulen");
return NULL;
}
PyObject* pFunc = PyObject_GetAttrString(pModule, "clean_store");
if (!pFunc || !PyCallable_Check(pFunc)) {
if (PyErr_Occurred()) PyErr_Print();
Py_DECREF(pFunc);
Py_DECREF(pModule);
PyGILState_Release(gstate);
pthread_mutex_unlock(&gil_lock);
printf("Error accessing functionn");
return NULL;
}
PyObject* pArgs = PyTuple_New(5);
PyTuple_SetItem(pArgs, 0, PyUnicode_FromString(store_code));
PyTuple_SetItem(pArgs, 1, PyUnicode_FromString(store_name));
PyTuple_SetItem(pArgs, 2, PyUnicode_FromString(store_address));
PyTuple_SetItem(pArgs, 3, PyUnicode_FromString(store_country));
PyTuple_SetItem(pArgs, 4, PyLong_FromLong(address_pars_flag));
PyObject* pResult = PyObject_CallObject(pFunc, pArgs);
Py_DECREF(pArgs);
if (!pResult) {
PyErr_Print();
Py_DECREF(pFunc);
Py_DECREF(pModule);
PyGILState_Release(gstate);
pthread_mutex_unlock(&gil_lock);
printf("Error calling functionn");
return NULL;
}
char* result = PyUnicode_AsUTF8(PyUnicode_FromObject(pResult));
Py_DECREF(pResult);
Py_DECREF(pFunc);
Py_DECREF(pModule);
PyGILState_Release(gstate);
pthread_mutex_unlock(&gil_lock);
printf("GIL releasedn");
return result;
}
cleaner.py
def clean_store(store_code, store_name, store_address, store_country, address_pars_flag):
    """Return the store fields encoded as a JSON object string.

    Called from the C bridge with four str arguments and an int flag.

    Returns:
        A JSON object with keys store_code, store_name, store_address,
        store_country and address_pars_flag. If encoding fails, a JSON object
        with an "error" key plus the four store fields is returned instead.
    """
    # json is not imported anywhere in this module as shown. Without this, every
    # call raised NameError, including the json.dumps in the except handler, so
    # the C caller only ever received an exception.
    import json

    try:
        store = {
            "store_code": store_code,
            "store_name": store_name,
            "store_address": store_address,
            "store_country": store_country,
            "address_pars_flag": address_pars_flag,
        }
        return json.dumps(store)
    except Exception as e:
        print(f"Error cleaning store: {e}")
        return json.dumps({"error": str(e), "store_code": store_code, "store_name": store_name, "store_address": store_address, "store_country": store_country})
Here are some logs:
Python finalized
Connecting to Bigquery...
Connecting to GCS...
Connecting to S3...
Fetching CSV from S3...
Parsing the CSV...
Processing 104 stores
Processed 0 so far
Processed 0 so far
Processed 0 so far
Python initialized
Python initialized
Validating store [100 8378346 Bird In Bush Elsdon Village Green Elsdon GB 55.23311 -2.103 Village Green]
Validating store [30 7085141 Black Bull Godmanchester 32 Post Street, Godmanchester Huntingdon GB 52.32233 -0.1758 32 Post Street, Godmanchester]
Validating store [10 1455570 La Coquille Napoléon Route Napoléon Saint-Jean-Pied-de-Port FR 43.1528 -1.24208 Route Napoléon]
Validating store [20 374314 Sibton White Horse Inn Halesworth Road, Sibton Saxmundham GB 52.27933 1.45492 Halesworth Road, Sibton]
Validating store [60 9145596 The Castle Inn Bradford on Avon 10 Mount Pleasant Bradford on Avon GB 51.35035 -2.24903 10 Mount Pleasant]
to pass to function Cleaned store data {7085141 Black Bull Godmanchester 32 Post Street, Godmanchester Huntingdon GB 52.32233 -0.1758 true 100 [] }
Calling C function clean_store
to pass to function Cleaned store data {8378346 Bird In Bush Elsdon Village Green Elsdon GB 55.23311 -2.103 true 66.66666666666666 [No number exists in the store address] }
Calling C function clean_store
to pass to function Cleaned store data {1455570 La Coquille Napoléon Route Napoléon Saint-Jean-Pied-de-Port FR 43.1528 -1.24208 true 66.66666666666666 [No number exists in the store address] }
Calling C function clean_store
Validating store [40 1334335 The Oakwheel 17 19 Coastal Road, Burniston Scarborough GB 54.32133 -0.44181 17 19 Coastal Road, Burniston]
to pass to function Cleaned store data {374314 Sibton White Horse Inn Halesworth Road, Sibton Saxmundham GB 52.27933 1.45492 inn true 66.66666666666666 [No number exists in the store address] }
Validating store [0 11081738 Real fisherman's cabins in Ballstad, Lofoten - nr. 11, Johnbua 46 Kræmmervikveien Ballstad NO 68.06593 13.53547 Kræmmervikveien 46 Ballstad]
to pass to function Cleaned store data {9145596 The Castle Inn Bradford on Avon 10 Mount Pleasant Bradford on Avon GB 51.35035 -2.24903 inn true 100 [] }
Calling C function clean_store
Calling C function clean_store
Validating store [50 2536808 The Rodney 67 Winwick Road Warrington GB 53.39456 -2.59368 67 Winwick Road]
to pass to function Cleaned store data {1334335 The Oakwheel 17 19 Coastal Road, Burniston Scarborough GB 54.32133 -0.44181 true 100 [] }
Calling C function clean_store
Validating store [70 387155 Turfcutters Arms Main Road Boldre GB 50.80243 -1.47022 Main Road]
to pass to function Cleaned store data {2536808 The Rodney 67 Winwick Road Warrington GB 53.39456 -2.59368 true 100 [] }
Validating store [80 542412 Victoria Inn Sigingstone Cowbridge GB 51.43524 -3.47953 Moorshead Farm Cowbridge UK]
to pass to function Cleaned store data {387155 Turfcutters Arms Main Road Boldre GB 50.80243 -1.47022 true 66.66666666666666 [No number exists in the store address] }
Calling C function clean_store
Validating store [90 340537 The Bulls Head 1 Woodville Road Swadlincote GB 52.78309 -1.51679 1 Woodville Road]
Calling C function clean_store
to pass to function Cleaned store data {11081738 Real fisherman's cabins in Ballstad, Lofoten - nr. 11, Johnbua 46 Kræmmervikveien Ballstad NO 68.06593 13.53547 true 66.66666666666666 [A number exists in the store name] }
Calling C function clean_store
to pass to function Cleaned store data {542412 Victoria Inn Sigingstone Cowbridge GB 51.43524 -3.47953 inn true 66.66666666666666 [No number exists in the store address] }
Calling C function clean_store
to pass to function Cleaned store data {340537 The Bulls Head 1 Woodville Road Swadlincote GB 52.78309 -1.51679 true 100 [] }
Calling C function clean_store
Processed 0 so far
Processed 0 so far
Processed 0 so far
Processed 0 so far
Processed 0 so far store