My goal is to implement a class and inherited classes that will interface with an API called MetaTrader5 . Specifically, the first class contains the methods for specific data I would to use to calculate and sending orders. The inherited class MeanReversionTrader
, will obtain streams of price information, update values within a dataframe and then send order through the broker class if certain values are calculated for a specific position.
Specifically, I get no output from the generate_signals_and_thin
k method from the MeanReversionTrader
class at the beginning
if resampled_len < self.mean_periods:
print(
'Insufficient data size to calculate logic. Need',
self.mean_periods - resampled_len, 'more.'
)
return
I don’t see this output. Not really sure where I am going wrong
from abc import abstractmethod
import datetime
import pytz
import time
import MetaTrader5 as mt5
import pandas as pd
# Abstract Class for Broker
class Broker(object):
def __init__(self, host, port):
self.host = host
self.port = port
self.__price_event_handler = None
self.__order_event_handler = None
self.__position_event_handler = None
@property
def on_price_event(self):
"""
Listeners will receive:
symbol, bid, ask
"""
return self.__price_event_handler
@on_price_event.setter
def on_price_event(self, event_handler):
self.__price_event_handler = event_handler
@property
def on_order_event(self):
"""
Listeners will receive:
transaction_id
"""
return self.__order_event_handler
@on_order_event.setter
def on_order_event(self, event_handler):
self.__order_event_handler = event_handler
@property
def on_position_event(self):
"""
Listeners will receive:
symbol, is_long, units, unrealized_pnl, pnl
"""
return self.__position_event_handler
@on_position_event.setter
def on_position_event(self, event_handler):
self.__position_event_handler = event_handler
@abstractmethod
def get_prices(self):
"""
Query market prices from a broker
:param symbols: list of symbols recognized by your broker
"""
raise NotImplementedError('Method is required!')
@abstractmethod
def get_prices_and_process(self):
"""
Query market prices from a broker
:param symbols: list of symbols recognized by your broker
"""
raise NotImplementedError('Method is required!')
@abstractmethod
def stream_prices(self, symbols=[]):
""""
Continuously stream prices from a broker.
:param symbols: list of symbols recognized by your broker
"""
raise NotImplementedError('Method is required!')
@abstractmethod
def send_market_order(self, symbol, quantity, is_buy):
raise NotImplementedError('Method is required!')
Here is the ICMarketsBroker class
class ICMarketsBroker(Broker):
def __init__(self, login, password, server):
self.login = login
self.password = password
self.server = server
self.symbol = 'EURUSD'
self.initial_amount = 10e6
self.symbol_info = mt5.symbol_info(self.symbol)
self.buy_signal = None
self.sell_signal = None
self.close_signal = None
self.ticks_df = pd.DataFrame(columns= ['time', 'bid',
'ask', 'last',
'volume', 'time_msc',
'flags', 'volume_real']
)
self.positions_df = pd.DataFrame()
self.upnl = self.get_upnl()
self.current_balance = self.get_current_balance()
self.pnl = self.initial_amount - self.get_current_balance()
self.lot = None
self.bid = None
self.ask = None
def initialize_terminal(self):
# establish connection to the MetaTrader 5 terminal
if not mt5.initialize(login=self.login,
password=self.password,
server=self.server
):
print("initialize() failed, error code =",mt5.last_error())
quit()
else:
print('MT5 Terminal initialized')
def get_prices(self):
self.initialize_terminal()
tick = pd.DataFrame(mt5.symbol_info_tick(self.symbol)._asdict(),
index = range(1),
columns = ['time', 'bid',
'ask', 'last',
'volume', 'time_msc',
'flags', 'volume_real']
)
print(tick)
tick['time'] = pd.to_datetime(tick['time'], unit='s')
self.process_price(tick)
def process_price(self, tick):
# print('Print last price n', self.tick)
# Create and assign symbol, bid and ask variables to pass to listener
# function
self.bid = tick['bid']
self.ask = tick['ask']
symbol = self.symbol
print('Symbol ', self.symbol,' bid ', self.bid, ' ask ', self.ask)
self.on_price_event(symbol, self.bid, self.ask)
def stream_prices(self):
self.initialize_terminal()
# set duration
duration = 600
# set start time
start_time = time.time()
self.tick = pd.DataFrame(mt5.symbol_info_tick(self.symbol)._asdict(),
index = range(1),
columns = ['time', 'bid',
'ask', 'last',
'volume', 'time_msc',
'flags', 'volume_real']
)
self.tick['time'] = pd.to_datetime(self.tick['time'],
unit='s'
)
# print(self.tick)
# Make a condition for the while loop that the current time - start time
# < duration
print('Price stream begins')
#while (time.time() - start_time) < duration :
while True:
price = mt5.symbol_info_tick(self.symbol)._asdict()
price['time'] = pd.to_datetime( price['time'], unit='s')
price['bid'] = price['bid']
price['ask'] = price['ask']
print(
'time ', price['time'],
'bid', price['bid'],
'ask', price['ask']
)
# price['time'] = pd.to_datetime( price['time'], unit='s')
self.ticks_df.loc[len(self.ticks_df)] = price
time.sleep(1)
#self.ticks_df['time'] = pd.to_datetime(self.ticks_df['time'], unit='s')
self.ticks_df.set_index(self.ticks_df['time'], inplace=True)
# print('Price stream ends')
self.process_price()
def send_market_order(self, symbol, quantity, buy_signal, sell_signal):
# establish connection to the MetaTrader 5 terminal
self.initialize_terminal()
# Check if symbol is available
symbol_info = mt5.symbol_info(symbol)
if symbol_info is None:
print(symbol, "not found, can not call order_check()")
mt5.shutdown()
quit()
# if the symbol is unavailable in MarketWatch, add it
if not symbol_info.visible:
print(symbol, "is not visible, trying to switch on")
if not mt5.symbol_select(symbol,True):
print("symbol_select({}}) failed, exit",symbol)
mt5.shutdown()
quit()
# Assign varibles for the request structure for the trade
point = mt5.symbol_info(symbol).point
ask_price = mt5.symbol_info_tick(symbol).ask
bid_price = mt5.symbol_info_tick(symbol).bid
deviation = 20
# Create and assign a dictionary for the order_send
# function for buying and selling
request_buy = {
"action": mt5.TRADE_ACTION_DEAL,
"symbol": symbol,
"volume": quantity,
"type": mt5.ORDER_TYPE_BUY,
"price": ask_price,
"sl": ask_price - 100 * point,
"tp": ask_price + 100 * point,
"deviation": deviation,
"magic": 234000,
"comment": "python script open",
"type_time": mt5.ORDER_TIME_GTC,
"type_filling": mt5.ORDER_FILLING_IOC
}
request_sell = {
"action": mt5.TRADE_ACTION_DEAL,
"symbol": symbol,
"volume": quantity,
"type": mt5.ORDER_TYPE_SELL,
"price": bid_price,
"sl": bid_price + 100 * point,
"tp": bid_price - 100 * point,
"deviation": deviation,
"magic": 234000,
"comment": "python script open",
"type_time": mt5.ORDER_TIME_GTC,
"type_filling": mt5.ORDER_FILLING_IOC
}
if ( buy_signal == True and sell_signal==False) :
result = mt5.order_send(request_buy)
if ( sell_signal == True and buy_signal==False):
result = mt5.order_send(request_sell)
if result.retcode == mt5.TRADE_RETCODE_DONE:
print('order filled')
status = 'FILLED'
else:
print('order not filled, check result output')
status = 'NOT_FILLED'
is_buy = buy_signal
is_sell = sell_signal
self.on_order_event(symbol,
quantity,
is_buy, is_sell,
result, status
)
def get_positions(self):
self.create_positions_df()
symbol = self.symbol
unrealized_pnl = self.get_upnl()
pnl = self.get_current_balance() - self.initial_amount
long_units = (self.positions_df.groupby('type').size()).iloc[0]
short_units = (self.positions_df.groupby('type').size()).iloc[1]
print('Short/sell position count = ', short_units,
'Long/buy positions count = ', long_units)
if long_units > 0:
is_long = True
else:
is_long = False
if short_units > 0:
self.on_position_event(symbol, False, short_units, unrealized_pnl, pnl)
elif long_units > 0:
self.on_position_event(symbol, True, long_units, unrealized_pnl, pnl)
else:
self.on_position_event(symbol, None, 0, unrealized_pnl, pnl)
def create_positions_df(self):
self.initialize_terminal()
if not mt5.positions_get():
print('There are no positions open')
else:
print('Positions open')
self.positions_df = pd.DataFrame( list(mt5.positions_get() ),
columns=mt5.positions_get()[0]._asdict()
)
# The values within the time column of the self.positions_df are changed to
# datetime object by usign the pd.to_datetime() method by passing the following
# arguments of the column name and the the units='s'
self.positions_df['time'] = pd.to_datetime(self.positions_df['time'],
unit = 's'
)
self.positions_df.set_index('time', inplace=True)
def print_positions(self):
print('Initial balance = ', self.initial_amount, self.positions_df)
def df_ticks(self):
return self.ticks_df
def get_upnl(self):
self.initialize_terminal()
account_info=mt5.account_info()
if account_info!=None:
# display trading account data in the form of a dictionary
account_info_dict = mt5.account_info()._asdict()
df=pd.DataFrame(list(account_info_dict.items()),
columns=['property','value']
)
else:
print("failed to connect to trade account , error code = ",
mt5.last_error()
)
# shut down connection to the MetaTrader 5 terminal
df.set_index(df['property'], inplace=True)
profit = df['value'].loc['profit']
return(profit)
def get_current_balance(self):
self.initialize_terminal()
account_info=mt5.account_info()
if account_info!=None:
# display trading account data in the form of a dictionary
account_info_dict = mt5.account_info()._asdict()
df=pd.DataFrame(list(account_info_dict.items()),
columns=['property','value']
)
else:
print("failed to connect to trade account , error code = ",
mt5.last_error()
)
# shut down connection to the MetaTrader 5 terminal
df.set_index(df['property'], inplace=True)
balance = df['value'].loc['balance']
return (balance)
Here are the listeners for the above class and lines to create an instance
broker = ICMarketsBroker(login=login,
password=password,
server=server)
# Price Event Listener
# Listener function
def on_price_event(symbol, bid, ask):
print('PRICE EVENT LISTENER')
print( datetime.datetime.now(), 'Symbol n', symbol)
print('Bid', bid)
print('Ask', ask)
broker.on_price_event = on_price_event
Position event listener for broker class
def on_position_event(symbol, is_long, units, upnl, pnl):
print(
datetime.datetime.now(), '[POSITIONS]',
'symbol ', symbol,
'is_long ', is_long,
'units ', units,
'upnl', upnl,
'pnl ', pnl
)
broker.on_position_event = on_position_event
# Order event listener
def on_order_event(symbol,
quantity,
is_buy, is_sell,
result,
status
):
print(
datetime.datetime.now(), '[ORDER]',
'symbol:', symbol, "n",
'quantity:', quantity, "n",
'is_buy:', is_buy, "n",
'is_sell:', is_sell, "n",
'result: ', result, "n",
'status: ', status
)
broker.on_order_event = on_order_event
Here is the MeanReversionTrader Class that inherits from above
# Mean Reversion Trader Class
import datetime as dt
import pandas as pd
class MeanReversionTrader(object):
def __init__(
self, broker, symbol=None, units=1,
resample_interval='60s', mean_periods=5
):
"""
A trading platform that trades on one side
based on a mean-reverting algorithm.
:param broker: Broker object
:param symbol: A str object recognized by the broker for trading
:param units: Number of units to trade
:param resample_interval:
Frequency for resampling price time series
:param mean_periods: Number of resampled intervals
for calculating the average price
"""
self.broker = self.setup_broker(broker)
self.resample_interval = resample_interval
self.mean_periods = mean_periods
self.symbol = symbol
self.units = units
self.df_prices = pd.DataFrame()
self.pnl, self.upnl = broker.pnl, self.broker.get_upnl()
self.bid_price, self.ask_price = 0, 0
self.position = 0
self.is_order_pending = False
self.is_next_signal_cycle = True
def setup_broker(self, broker):
broker.on_price_event = self.on_price_event
broker.on_order_event = self.on_order_event
broker.on_position_event = self.on_position_event
return broker
def on_price_event(self, symbol, bid, ask):
print(dt.datetime.now(), '[PRICE]', symbol, 'bid:', bid, 'ask:', ask)
self.bid_price = bid
self.ask_price = ask
self.df_prices.loc[pd.Timestamp.now(), symbol] = (bid + ask) / 2.
self.get_positions()
self.generate_signals_and_think()
self.print_state()
def get_positions(self):
try:
self.broker.get_positions()
except Exception as ex:
print('get_positions error:', ex)
def on_order_event(self, symbol, quantity, is_buy, transaction_id, status):
print(
dt.datetime.now(), '[ORDER]',
'transaction_id:', transaction_id,
'status:', status,
'symbol:', symbol,
'quantity:', quantity,
'is_buy:', is_buy,
)
if status == 'FILLED':
self.is_order_pending = False
self.is_next_signal_cycle = False
self.get_positions() # Update positions before thinking
self.generate_signals_and_think()
def on_position_event(self, symbol, is_long, units, upnl, pnl):
if symbol == self.symbol:
self.position = abs(units) * (1 if is_long else -1)
self.upnl = upnl
self.pnl = pnl
self.print_state()
def print_state(self):
print(
dt.datetime.now(), self.symbol, self.position_state,
abs(self.position), 'upnl:', self.upnl, 'pnl:', self.pnl
)
@property
def position_state(self):
if self.position == 0:
return 'FLAT'
if self.position > 0:
return 'LONG'
if self.position < 0:
return 'SHORT'
def generate_signals_and_think(self):
df_resampled = self.df_prices
.resample(self.resample_interval)
.ffill()
.dropna()
resampled_len = len(df_resampled.index)
if resampled_len < self.mean_periods:
print(
'Insufficient data size to calculate logic. Need',
self.mean_periods - resampled_len, 'more.'
)
return
mean = df_resampled.tail(self.mean_periods).mean()[self.symbol]
# Signal flag calculation
is_signal_buy = mean > self.ask_price
is_signal_sell = mean < self.bid_price
print(
'is_signal_buy:', is_signal_buy,
'is_signal_sell:', is_signal_sell,
'average_price: %.5f' % mean,
'bid:', self.bid_price,
'ask:', self.ask_price
)
self.think(is_signal_buy, is_signal_sell)
def think(self, is_signal_buy, is_signal_sell):
if self.is_order_pending:
return
if self.position == 0:
self.think_when_position_flat(is_signal_buy, is_signal_sell)
elif self.position > 0:
self.think_when_position_long(is_signal_sell)
elif self.position < 0:
self.think_when_position_short(is_signal_buy)
def think_when_position_flat(self, is_signal_buy, is_signal_sell):
if is_signal_buy and self.is_next_signal_cycle:
print('Opening position, BUY',
self.symbol, self.units, 'units')
self.is_order_pending = True
self.send_market_order(self.symbol, self.units, is_buy= True)
return
if is_signal_sell and self.is_next_signal_cycle:
print('Opening position, SELL',
self.symbol, self.units, 'units')
self.is_order_pending = True
self.send_market_order(self.symbol, self.units, is_buy= False)
return
if not is_signal_buy and not is_signal_sell:
self.is_next_signal_cycle = True
def think_when_position_long(self, is_signal_sell):
if is_signal_sell:
print('Closing position, SELL',
self.symbol, self.units, 'units')
self.is_order_pending = True
self.send_market_order(self.symbol, self.units, is_buy= False)
def think_when_position_short(self, is_signal_buy):
if is_signal_buy:
print('Closing position, BUY',
self.symbol, self.units, 'units')
self.is_order_pending = True
self.send_market_order(self.symbol, self.units, is_buy= True)
def send_market_order(self, symbol, quantity, is_buy):
buy_signal = None
sell_signal = None
if is_buy == True:
buy_signal == True
sell_signal == False
self.broker.send_market_order(symbol, quantity, buy_signal, sell_signal)
# self, symbol, quantity, buy_signal, sell_signal
if is_buy == False:
buy_signal == False
sell_signal == True
self.broker.send_market_order(symbol, quantity, buy_signal, sell_signal)
def run(self):
self.get_positions()
self.broker.stream_prices()
Here is a sample of output
MT5 Terminal initialized
MT5 Terminal initialized
Positions open
MT5 Terminal initialized
MT5 Terminal initialized
Short/sell position count = 2 Long/buy positions count = 2
2024-07-08 12:48:33.664236 EURUSD SHORT 2 upnl: -0.68 pnl: -468.070000000298
MT5 Terminal initialized
Price stream begins
time 2024-07-08 07:48:32 bid 1.08283 ask 1.08283
time 2024-07-08 07:48:32 bid 1.08283 ask 1.08283