Can you help me understand how to avoid this error? This is my code:
# Open a connection and cursor for each of the two configured databases.
conn_test = get_db_connection('liq_test')
cursor_test = conn_test.cursor()
conn = get_db_connection('liq')  # plain ASCII quotes: typographic quotes are a SyntaxError
cursor = conn.cursor()


def _build_output_frame(prices, symbols, filename, ref_date):
    """Build one block of output rows sharing the constant descriptor columns.

    Args:
        prices: list of stressed prices, one per output row (STR_PRC).
        symbols: list of instrument identifiers, one per output row (SYMBOL).
        filename: value written to the FILENAME column for every row.
        ref_date: value written to both CALC_DT and REF_DT for every row.

    Returns:
        A DataFrame with a 0..n-1 index and one row per entry of ``prices``.
    """
    frame = pd.DataFrame(index=range(len(prices)))
    frame['ENV'] = 'ENX'
    frame['ASSET_TYPE'] = 'M'  # M for commodities
    frame['SCENARIO'] = 'double'
    frame['CALC_DT'] = ref_date
    frame['REF_DT'] = ref_date
    frame['STR_PRC'] = prices
    frame['PROD_PRC'] = 0
    frame['FILENAME'] = filename
    frame['SCENUND'] = 'decrease'
    frame['SYMBOL'] = symbols
    return frame


# Futures: round the stressed prices in place to 8 decimals, then build the rows.
df_future_decrease.loc[:, 'stress_price'] = df_future_decrease['stress_price'].round(8)
output_db_f = _build_output_frame(
    df_future_decrease['stress_price'].tolist(),
    df_future_decrease['instr_id'].tolist(),
    'future',
    Data,
)

# Options: same treatment, reading the option-specific price column.
df_option_decrease.loc[:, 'stress_option_price'] = df_option_decrease['stress_option_price'].round(8)
output_db_o = _build_output_frame(
    df_option_decrease['stress_option_price'].tolist(),
    df_option_decrease['instr_id'].tolist(),
    'option',
    Data,
)

# Stack futures on top of options and renumber the rows from 0.
output_db = pd.concat([output_db_f, output_db_o]).reset_index(drop=True)
def fetch_table_colnames(cursor, schema, table):
    """Return the column names that INFORMATION_SCHEMA.COLUMNS lists for a table.

    Args:
        cursor: open DB-API cursor accepting ``?`` (qmark) parameter markers.
        schema: schema name matched against ``table_schema``.
        table: table name matched against ``table_name``.

    Returns:
        A list with the first field of every matching row, in the order the
        database returned them; empty when nothing matches.
    """
    # Bind the values instead of formatting them into the SQL text, so that
    # quotes inside a name cannot alter the statement.
    cursor.execute(
        "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS "
        "WHERE table_schema = ? AND table_name = ?",
        [schema, table],
    )
    return [row[0] for row in cursor.fetchall()]
table = 'RMSSTRPRI'
table_columns_test = fetch_table_colnames(cursor_test, 'LIQTOOLDEV', table)

# Give the output frame an empty-string column for every table column it lacks.
for col in table_columns_test:
    if col in output_db.columns:
        continue
    output_db[col] = ''

# Keep only the table's columns, in the order the catalog query returned them.
df_filtered_test = output_db[table_columns_test]
# Function to handle database insertion
def _to_db_value(col, val):
    """Convert one DataFrame cell into the value bound to the INSERT statement."""
    if pd.isna(val):
        return None
    if col in ('CALC_DT', 'REF_DT'):
        return int(val)
    if col in ('STR_PRC', 'PROD_PRC'):
        return float(val)
    return str(val).strip()


def insert_data(cursor, conn, table, df_filtered):
    """Replace the rows of ``table`` for the REF_DT values present in ``df_filtered``.

    For every distinct REF_DT in the frame, existing rows with that REF_DT are
    deleted (and the delete committed); then every row of the frame is inserted
    one by one and a final commit is issued.

    Database errors are reported with ``print`` and do not stop processing:
    a failing row is skipped and the remaining rows are still attempted.

    Args:
        cursor: open DB-API cursor accepting ``?`` (qmark) parameter markers.
        conn: connection the cursor belongs to; used for commits.
        table: target table name. It is formatted into the SQL text (identifiers
            cannot be bound as parameters), so it must come from trusted code.
        df_filtered: DataFrame whose columns are the table's column names and
            which contains a ``REF_DT`` column.
    """
    # The arguments are used as passed; they are no longer overwritten with
    # module-level globals and a hard-coded table name.
    for ref_date in df_filtered['REF_DT'].unique():
        try:
            check_query = f"SELECT COUNT(*) FROM {table} WHERE REF_DT = ?"
            cursor.execute(check_query, [int(ref_date)])
            count = cursor.fetchone()[0]
            if count > 0:
                delete_query = f"DELETE FROM {table} WHERE REF_DT = ?"
                cursor.execute(delete_query, [int(ref_date)])
                conn.commit()
        except Exception as e:  # best effort: report and continue with the next date
            print(f"ERROR - failed checking/deleting data: {e}")

    # The column list, placeholders and statement are identical for every row,
    # so build them once outside the loop.
    columns = ', '.join(df_filtered.columns)
    placeholders = ', '.join('?' for _ in df_filtered.columns)
    insert_query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"

    for _, row_data in df_filtered.iterrows():
        values = [_to_db_value(col, val) for col, val in row_data.items()]
        try:
            cursor.execute(insert_query, values)
        except Exception as e:  # best effort: report and continue with the next row
            print(f"ERROR - failed inserting new data: {e}")

    conn.commit()
# Load the prepared frame into the LIQTOOLDEV.RMSSTRPRI table through the test connection.
insert_data(
    cursor=cursor_test,
    conn=conn_test,
    table='LIQTOOLDEV.RMSSTRPRI',
    df_filtered=df_filtered_test,
)
I have permission to write to this table (I checked), but I still get this error.
I also checked that the types of the values in the columns are correct (and they are). Can you help me?
john is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.