I’m trying to reset the accumulation of a sum when a desired condition is met, all in a vectorized manner, without using loops. I’ll first show a simple example and then the real problem.
import pandas as pd
import numpy as np
np.random.seed(0)
v = pd.DataFrame()
v['a'] = np.random.randint(10, size=(1000))
The accumulation should reset when the current value of a is greater than or equal to the opening value + 5, or less than or equal to the opening value - 5.
In summary:
((current_value >= opening_value + 5) | (current_value <= opening_value - 5))
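To make the expected result concrete, here is a plain-loop sketch of the behaviour I'm after. The function name accumulate_with_reset and its threshold parameter are only illustrative, and I'm assuming the row right after a reset opens the next run. This loop is what I'd like to replace with something vectorized.

```python
import numpy as np
import pandas as pd

def accumulate_with_reset(values, threshold=5):
    """Loop reference: distance from the opening value, restarting after each reset."""
    values = np.asarray(values)
    acc = np.zeros(len(values), dtype=values.dtype)
    reset = np.zeros(len(values), dtype=bool)
    opening = None
    for i, current in enumerate(values):
        if opening is None:
            opening = current  # this row opens a new run
        acc[i] = current - opening
        if current >= opening + threshold or current <= opening - threshold:
            reset[i] = True
            opening = None  # assumption: the next row opens the next run
    return acc, reset

# Small hand-made series, only for illustration
a = pd.Series([5, 7, 3, 10, 2, 4, 8, 9, 1])
acc, reset = accumulate_with_reset(a)
print(acc.tolist())    # [0, 2, -2, 5, 0, 2, 6, 0, -8]
print(reset.tolist())  # [False, False, False, True, False, False, True, False, True]
```

Each True in reset marks a row where the run ends; the accumulation starts again from 0 on the following row.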
I believe this is possible in two parts.
Part 1: Accumulate the difference from the previous value
v['b'] = v['a'].diff().cumsum()
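As a side note on Part 1: without any reset, the accumulated difference is just the distance from the first value, so a reset amounts to changing which value counts as the opening value. A quick check, as a sketch on the same random data (the variable names here are only illustrative):

```python
import numpy as np
import pandas as pd

np.random.seed(0)
a = pd.Series(np.random.randint(10, size=1000))

b = a.diff().cumsum()
distance_from_first = (a - a.iloc[0]).astype(float)

# The first entry of diff() is NaN, so compare from the second row on.
same = np.allclose(b.iloc[1:], distance_from_first.iloc[1:])
print(same)
```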
Part 2: Reset the accumulation of the difference whenever the condition is met.
I don’t know how to do part 2.
The intention behind this is to create OHLC candles based on the desired tick variation. I believe this image can help in understanding:
image1
It might be something like this:
image2
That was the simplified problem. The real problem involves a dataset of 147,046,963 rows of EURUSD ticks.
I load the data in partitions using dask.dataframe, and I also have the preprocessed data in a .parquet file.
The code I’m using is this:
import dask.dataframe as dd
from decimal import Decimal
import pandas as pd
import numpy as np
import datetime
# eurusd = dd.read_csv('eurusd_ticks.csv', sep='\t')

def decimal_df(value):
    return Decimal(str(value))

def pre_processing(df, last_bid=None, last_ask=None):
    # Fill a missing first row with the last values of the previous partition
    if pd.isna(df.loc[0, '<BID>']):
        df.loc[0, '<BID>'] = last_bid
    if pd.isna(df.loc[0, '<ASK>']):
        df.loc[0, '<ASK>'] = last_ask
    df['<BID>'] = df['<BID>'].apply(decimal_df)
    df['<ASK>'] = df['<ASK>'].apply(decimal_df)
    df.ffill(inplace=True)

date_, time_ = [], []
open, high, low, close = [], [], [], []
ask_open, ask_close = [], []
previous_date = 0
tick_variation = Decimal('0.00005')
candle_is_started = False
last_bid, last_ask = None, None
max_value, min_value = -np.inf, np.inf
npartitions = eurusd.npartitions

for partition in range(npartitions):
    time_start = datetime.datetime.now()
    chunk = eurusd.get_partition(partition).compute()
    pre_processing(chunk, last_bid=last_bid, last_ask=last_ask)
    last_bid = chunk.loc[len(chunk)-1, '<BID>']
    last_ask = chunk.loc[len(chunk)-1, '<ASK>']

    # First date of the next partition, used to detect a date change at the chunk boundary
    if partition+1 <= npartitions-2:
        date_partition = eurusd.get_partition(partition+1).loc[0, '<DATE>'].compute().values[0]
    else:
        date_partition = chunk.loc[len(chunk)-1, '<DATE>']

    for i in range(len(chunk)):
        current_date = chunk.loc[i, '<DATE>']
        if i+1 <= len(chunk)-2:
            next_date = chunk.loc[i+1, '<DATE>']
        else:
            next_date = date_partition
        close_by_date = (current_date!=next_date)

        # Open a new candle
        if (current_date != previous_date) or (candle_is_started==False):
            start_index = i
            bid_open_ = chunk.loc[start_index, '<BID>']
            ask_open_ = chunk.loc[start_index, '<ASK>']
            date = chunk.loc[start_index, '<DATE>']
            time = chunk.loc[start_index, '<TIME>']
            candle_is_started = True

        bid = chunk.loc[i, '<BID>']
        ask = chunk.loc[i, '<ASK>']

        # Close the candle on the tick variation, a date change, or the very last tick
        if ((bid >= bid_open_+tick_variation) or (bid <= bid_open_-tick_variation) or close_by_date or
                (partition==npartitions-1 and i==len(chunk)-1)):
            date_.append(date)
            time_.append(time)
            open.append(float(bid_open_))
            high.append(float(np.max((chunk.loc[start_index:i, '<BID>'].max(), max_value))))
            low.append(float(np.min((chunk.loc[start_index:i, '<BID>'].min(), min_value))))
            close.append(float(bid))
            ask_open.append(float(ask_open_))
            ask_close.append(float(ask))
            candle_is_started = False
            previous_date = current_date
            max_value, min_value = -np.inf, np.inf

        # Candle still open at the end of the chunk: carry its extremes into the next partition
        if i == len(chunk)-1 and candle_is_started:
            max_value = chunk.loc[start_index:i, '<BID>'].max() # max_value previous partition
            min_value = chunk.loc[start_index:i, '<BID>'].min() # min_value previous partition
            start_index = 0

    time_final = datetime.datetime.now()
    print(partition, time_final-time_start)
However, this code is very slow and may contain errors. I would like to vectorize it and make it simpler and more efficient. Here is another image for better understanding:
image3
I was unable to reset the accumulation and then resume accumulating from the expected index.
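For what it's worth, once each tick is flagged as closing a candle or not, the OHLC aggregation itself seems easy to vectorize; only the flags depend on where the previous candle closed. A minimal sketch, assuming a hypothetical boolean column closes_candle and made-up bid values (neither comes from my real data):

```python
import pandas as pd

# Hypothetical ticks and close flags, only for illustration
ticks = pd.DataFrame({
    'bid': [1.10000, 1.10002, 1.10006, 1.10004, 1.09998, 1.10001],
    'closes_candle': [False, False, True, False, True, False],
})

# Count the candles closed so far, then shift by one row so that the
# closing tick stays in its own candle and the next tick opens a new one.
candle_id = ticks['closes_candle'].cumsum().shift(fill_value=0).rename('candle_id')

# One row per candle: first, max, min and last bid of each group
candles = ticks.groupby(candle_id)['bid'].agg(
    open='first', high='max', low='min', close='last'
)
print(candles)
```

With these flags the six ticks fall into three candles; the last one is still open and contains a single tick.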