The problem
Hey guys and gals,
So I've been wrestling with a script that works with a database of price bars. The table has these columns: open, high, low, close, volume, and unix_timestamp. I'm trying to add some preprocessed pandas_ta indicators to the script, but it keeps throwing the traceback below at me.
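For reference, a couple of rows look roughly like this (the values are made up, just to show the schema):

import pandas as pd

# Made-up rows matching the table's schema
sample = pd.DataFrame({
    "open": [1.1012, 1.1015],
    "high": [1.1020, 1.1019],
    "low": [1.1008, 1.1011],
    "close": [1.1015, 1.1013],
    "volume": [1250, 980],
    "unix_timestamp": [1700000000, 1700000060],
})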
I really need to sort this out because I’m moving at the end of the semester and I want to have everything ready before then. Any help or ideas would be super appreciated!
Thanks!
This is the traceback I get:
Traceback (most recent call last):
  File "C:\Users\PycharmProjects\pythonProject2\processing 2.py", line 369, in process_data
    df = calculate_cycles(df, open_arr, high_arr, low_arr, close_arr)
  File "C:\Users\PycharmProjects\pythonProject2\processing 2.py", line 55, in calculate_cycles
    cdl_inside_df = ta.cdl_inside(open_arr, high_arr, low_arr, close_arr)
  File "C:\Users\AppData\Local\Programs\Python\Python310\lib\site-packages\pandas_ta\candles\cdl_inside.py", line 16, in cdl_inside
    inside = (high.diff() < 0) & (low.diff() > 0)
AttributeError: 'NoneType' object has no attribute 'diff'
Error processing data: 'NoneType' object has no attribute 'diff'
Loading data from D:\New folder\data\New folder - Copy\New folder\output\New folder\data_1.parquet
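If I'm reading the traceback right, I can reproduce the error outside the script with just this snippet (the numbers are made up), so it doesn't seem to be about Dask or my files:

import numpy as np
import pandas_ta as ta

open_arr = np.array([1.0, 2.0, 1.5], dtype="float32")
high_arr = np.array([2.0, 3.0, 2.5], dtype="float32")
low_arr = np.array([0.5, 1.5, 1.0], dtype="float32")
close_arr = np.array([1.8, 2.8, 1.2], dtype="float32")

# Same error: AttributeError: 'NoneType' object has no attribute 'diff'
cdl_inside_df = ta.cdl_inside(open_arr, high_arr, low_arr, close_arr)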
This is the script:
import gc
import os
import time
import traceback
from concurrent.futures import ThreadPoolExecutor

import dask.dataframe as dd
import pandas_ta as ta

def load_data(file_path):
    try:
        print(f"Loading data from {file_path}")
        # Load data using Dask
        df = dd.read_parquet(file_path, engine='pyarrow', blocksize=500,
                             columns=['open', 'high', 'low', 'close', 'volume'])
        # Cast the price columns to float32
        for col in ['open', 'high', 'low', 'close']:
            df[col] = df[col].astype('float32')
        # Fill NaN values with the mean of the column
        for col in ['open', 'high', 'low', 'close']:
            df[col] = df[col].fillna(df[col].mean())
        # Convert to NumPy ndarrays (each .compute() pulls the full column into memory)
        open_arr = df['open'].compute().to_numpy()
        high_arr = df['high'].compute().to_numpy()
        low_arr = df['low'].compute().to_numpy()
        close_arr = df['close'].compute().to_numpy()
        volume_arr = df['volume'].compute().to_numpy()
        return df, open_arr, high_arr, low_arr, close_arr, volume_arr
    except Exception as e:
        print(f"Error loading data from {file_path}: {e}")
        traceback.print_exc()
        return None, None, None, None, None, None
# Add the pandas_ta indicators to the frame
def calculate_cycles(df, open_arr, high_arr, low_arr, close_arr):
    if open_arr is not None and high_arr is not None and low_arr is not None and close_arr is not None:
        ebsw_df = ta.ebsw(close_arr)
        cdl_doji_df = ta.cdl_doji(open_arr, high_arr, low_arr, close_arr)
        # The next line is line 55 of "processing 2.py", where the traceback points
        cdl_inside_df = ta.cdl_inside(open_arr, high_arr, low_arr, close_arr)
        cdl_pattern_df = ta.cdl_pattern(open_arr, high_arr, low_arr, close_arr)
        df['ebsw_EBSW'] = ebsw_df
        df['CDL_DOJI_CDL_DOJI'] = cdl_doji_df
        df['CDL_INSIDE_CDL_INSIDE'] = cdl_inside_df
        df['CDL_PATTERN_CDL_PATTERN'] = cdl_pattern_df
    return df
# Function to process each file
def process_data(file_path):
    try:
        df, open_arr, high_arr, low_arr, close_arr, volume_arr = load_data(file_path)
        df = calculate_cycles(df, open_arr, high_arr, low_arr, close_arr)
        # Fill NaN values with 0
        df = df.fillna(0)
        return df
    except Exception as e:
        print(f"Error processing data: {e}")
        traceback.print_exc()
        return None
if __name__ == '__main__':
    # Directory to walk
    file_path = r"D:\New folder\data\New folder - Copy\New folder\output\New folder"
    # Prepare a list of files to process
    parquet_files = [os.path.join(root, file)
                     for root, dirs, files in os.walk(file_path)
                     for file in files if file.endswith('.parquet')]
    # Process the files chunk_size at a time
    chunk_size = 2
    for i in range(0, len(parquet_files), chunk_size):
        chunk_files = parquet_files[i:i + chunk_size]
        # Process the files in each chunk in parallel
        with ThreadPoolExecutor(32) as executor:
            for file_path, df in zip(chunk_files, executor.map(process_data, chunk_files)):
                if df is not None:
                    df.to_parquet(file_path.replace('.parquet', '_processed.parquet'),
                                  compression="zstd", engine="pyarrow", index=True)
        gc.collect()  # Force garbage collection between chunks
        # Sleep for 5 seconds between chunks
        time.sleep(5)
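FWIW, my current guess (totally untested) is that pandas_ta wants pandas Series rather than the raw NumPy arrays I'm passing, since the None in the traceback looks like pandas_ta's input validation quietly rejecting the ndarrays. Is something like this the right direction?

import pandas as pd

# My guess at a fix: wrap the ndarrays in Series before handing them to pandas_ta
open_s = pd.Series(open_arr, name="open")
high_s = pd.Series(high_arr, name="high")
low_s = pd.Series(low_arr, name="low")
close_s = pd.Series(close_arr, name="close")

cdl_inside_df = ta.cdl_inside(open_s, high_s, low_s, close_s)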