How can I invoke the think method to analyse streams of price data?

My goal is to implement a class and inherited classes that will interface with an API called MetaTrader5 . Specifically, the first class contains the methods for specific data I would to use to calculate and sending orders. The inherited class MeanReversionTrader, will obtain streams of price information, update values within a dataframe and then send order through the broker class if certain values are calculated for a specific position.

Specifically, I get no output from the generate_signals_and_think method from the MeanReversionTrader class at the beginning

if resampled_len < self.mean_periods:
            print(
                'Insufficient data size to calculate logic. Need',
                self.mean_periods - resampled_len, 'more.'
            )
            return

I don’t see this output. Not really sure where I am going wrong

from abc import abstractmethod
import datetime
import pytz
import time
import MetaTrader5 as mt5
import pandas as pd

# Abstract Class for Broker

class Broker(object):
    def __init__(self, host, port):
        self.host = host
        self.port = port

        self.__price_event_handler = None
        self.__order_event_handler = None
        self.__position_event_handler = None

    @property
    def on_price_event(self):
        """
        Listeners will receive:
        symbol, bid, ask
        """
        return self.__price_event_handler

    @on_price_event.setter
    def on_price_event(self, event_handler):
        self.__price_event_handler = event_handler

    @property
    def on_order_event(self):
        """
        Listeners will receive:
        transaction_id
        """
        return self.__order_event_handler

    @on_order_event.setter
    def on_order_event(self, event_handler):
        self.__order_event_handler = event_handler

    @property
    def on_position_event(self):
        """
        Listeners will receive:
        symbol, is_long, units, unrealized_pnl, pnl
        """
        return self.__position_event_handler

    @on_position_event.setter
    def on_position_event(self, event_handler):
        self.__position_event_handler = event_handler

    @abstractmethod
    def get_prices(self):
        """
        Query market prices from a broker
        :param symbols: list of symbols recognized by your broker
        """
        raise NotImplementedError('Method is required!')
    
    @abstractmethod
    def get_prices_and_process(self):
        """
        Query market prices from a broker
        :param symbols: list of symbols recognized by your broker
        """
        raise NotImplementedError('Method is required!')

    @abstractmethod
    def stream_prices(self, symbols=[]):
        """"
        Continuously stream prices from a broker.
        :param symbols: list of symbols recognized by your broker
        """
        raise NotImplementedError('Method is required!')

    @abstractmethod
    def send_market_order(self, symbol, quantity, is_buy):
        raise NotImplementedError('Method is required!')

Here is the ICMarketsBroker class

class ICMarketsBroker(Broker):

    def __init__(self, login, password, server):

        self.login = login
        self.password = password
        self.server = server
        self.symbol = 'EURUSD'
        self.initial_amount = 10e6
        self.symbol_info = mt5.symbol_info(self.symbol)

        self.buy_signal = None
        self.sell_signal = None
        self.close_signal = None
        self.ticks_df = pd.DataFrame(columns= ['time', 'bid', 
                                               'ask', 'last', 
                                               'volume', 'time_msc', 
                                               'flags', 'volume_real']
                                    )

        self.positions_df = pd.DataFrame()
        self.upnl = self.get_upnl()
        self.current_balance = self.get_current_balance()
        self.pnl = self.initial_amount - self.get_current_balance()
        self.lot = None
        self.bid = None
        self.ask = None
        
    def initialize_terminal(self):
        # establish connection to the MetaTrader 5 terminal
        if not mt5.initialize(login=self.login, 
                              password=self.password, 
                              server=self.server
                             ):
            print("initialize() failed, error code =",mt5.last_error())
            quit()
        else:
            print('MT5 Terminal initialized')
    
    def get_prices(self):
        
        self.initialize_terminal()
        tick = pd.DataFrame(mt5.symbol_info_tick(self.symbol)._asdict(), 
                                index = range(1), 
                                columns = ['time', 'bid', 
                                           'ask', 'last',
                                           'volume', 'time_msc',
                                           'flags', 'volume_real']
                                           )
        print(tick)
        tick['time'] = pd.to_datetime(tick['time'], unit='s')

        self.process_price(tick)

    def process_price(self, tick):
        # print('Print last price n', self.tick)

        # Create and assign symbol, bid and ask variables to pass to listener
        # function
        self.bid = tick['bid']
        self.ask = tick['ask']
        symbol = self.symbol
        print('Symbol ', self.symbol,' bid ', self.bid, ' ask ', self.ask)
        self.on_price_event(symbol, self.bid, self.ask)

    def stream_prices(self):
        self.initialize_terminal()
        # set duration
        duration = 600

        # set start time
        start_time = time.time()

        self.tick = pd.DataFrame(mt5.symbol_info_tick(self.symbol)._asdict(), 
                                index = range(1), 
                                columns = ['time', 'bid', 
                                           'ask', 'last',
                                           'volume', 'time_msc',
                                           'flags', 'volume_real']
                                )

        
        self.tick['time'] = pd.to_datetime(self.tick['time'], 
                                           unit='s'
                                          )
        # print(self.tick)
        # Make a condition for the while loop that the current time - start time
        # < duration
        print('Price stream begins')
        #while (time.time() - start_time) < duration :
        while True:
            price = mt5.symbol_info_tick(self.symbol)._asdict()
            price['time'] = pd.to_datetime( price['time'], unit='s')
            price['bid'] = price['bid']
            price['ask'] = price['ask']
            print(
                'time ', price['time'],
                'bid', price['bid'], 
                'ask', price['ask'] 
            )
        
            # price['time'] = pd.to_datetime( price['time'], unit='s')
            self.ticks_df.loc[len(self.ticks_df)] = price
            time.sleep(1)
        
        #self.ticks_df['time'] = pd.to_datetime(self.ticks_df['time'], unit='s')
        self.ticks_df.set_index(self.ticks_df['time'], inplace=True)
        # print('Price stream ends')

        self.process_price()

    def send_market_order(self, symbol, quantity, buy_signal, sell_signal):
 
        # establish connection to the MetaTrader 5 terminal
        self.initialize_terminal()

        # Check if symbol is available
        symbol_info = mt5.symbol_info(symbol)
        if symbol_info is None:
            print(symbol, "not found, can not call order_check()")
            mt5.shutdown()
            quit()

        # if the symbol is unavailable in MarketWatch, add it
        if not symbol_info.visible:
            print(symbol, "is not visible, trying to switch on")
            if not mt5.symbol_select(symbol,True):
                print("symbol_select({}}) failed, exit",symbol)
                mt5.shutdown()
                quit()

        # Assign varibles for the request structure for the trade
        point = mt5.symbol_info(symbol).point
        ask_price = mt5.symbol_info_tick(symbol).ask
        bid_price = mt5.symbol_info_tick(symbol).bid
        deviation = 20

        # Create and assign a dictionary for the order_send
        # function for buying and selling
        request_buy = {
            "action": mt5.TRADE_ACTION_DEAL,
            "symbol": symbol,
            "volume": quantity,
            "type": mt5.ORDER_TYPE_BUY,
            "price": ask_price,
            "sl": ask_price - 100 * point,
            "tp": ask_price + 100 * point,
            "deviation": deviation,
            "magic": 234000,
            "comment": "python script open",
            "type_time": mt5.ORDER_TIME_GTC,
            "type_filling": mt5.ORDER_FILLING_IOC
            }

        request_sell = {
            "action": mt5.TRADE_ACTION_DEAL,
            "symbol": symbol,
            "volume": quantity,
            "type": mt5.ORDER_TYPE_SELL,
            "price": bid_price, 
            "sl": bid_price + 100 * point,
            "tp": bid_price - 100 * point,
            "deviation": deviation,
            "magic": 234000,
            "comment": "python script open",
            "type_time": mt5.ORDER_TIME_GTC,
            "type_filling": mt5.ORDER_FILLING_IOC
            }
        
        if ( buy_signal == True and sell_signal==False) :
            result = mt5.order_send(request_buy)
   

        if ( sell_signal == True and buy_signal==False):
            result = mt5.order_send(request_sell)

        
        if result.retcode == mt5.TRADE_RETCODE_DONE:
            print('order filled')
            status = 'FILLED'
        else:
            print('order not filled, check result output')
            status = 'NOT_FILLED'
        
        is_buy = buy_signal
        is_sell = sell_signal

        self.on_order_event(symbol, 
                            quantity, 
                            is_buy, is_sell, 
                            result, status
                           )
    
    def get_positions(self):
        
        self.create_positions_df()
        
        symbol = self.symbol
        unrealized_pnl = self.get_upnl()
        pnl = self.get_current_balance() - self.initial_amount

        
        long_units = (self.positions_df.groupby('type').size()).iloc[0]
        short_units = (self.positions_df.groupby('type').size()).iloc[1]
        print('Short/sell position count = ', short_units, 
                 'Long/buy positions count = ', long_units)
            
        if long_units > 0:
            is_long = True
        else:
            is_long = False

        if short_units > 0:
            self.on_position_event(symbol, False, short_units, unrealized_pnl, pnl)
        elif long_units > 0:
            self.on_position_event(symbol, True, long_units, unrealized_pnl, pnl)
        else:
            self.on_position_event(symbol, None, 0, unrealized_pnl, pnl)

    def create_positions_df(self):

        self.initialize_terminal()

        if not mt5.positions_get():
            print('There are no positions open')
        else:
            print('Positions open')
            self.positions_df = pd.DataFrame( list(mt5.positions_get() ),
                                             columns=mt5.positions_get()[0]._asdict()
                                             )
            # The values within the time column of the self.positions_df are changed to
            # datetime object by usign the pd.to_datetime() method by passing the following
            # arguments of the column name and the the units='s'
            self.positions_df['time'] = pd.to_datetime(self.positions_df['time'], 
                                                       unit = 's'
                                                      )
            self.positions_df.set_index('time', inplace=True)

    def print_positions(self):
        print('Initial balance = ', self.initial_amount, self.positions_df)

    def df_ticks(self):
        return self.ticks_df
    
    def get_upnl(self):
        self.initialize_terminal()

        account_info=mt5.account_info()
        
        if account_info!=None:
       
        # display trading account data in the form of a dictionary
        
            account_info_dict = mt5.account_info()._asdict()

            df=pd.DataFrame(list(account_info_dict.items()),
                            columns=['property','value']
                            )
    
        else:
            print("failed to connect to trade account , error code = ", 
                  mt5.last_error()
                  )

        # shut down connection to the MetaTrader 5 terminal
        df.set_index(df['property'], inplace=True)
        profit = df['value'].loc['profit']

        return(profit)
    
    def get_current_balance(self):
        self.initialize_terminal()

        account_info=mt5.account_info()
        
        if account_info!=None:
       
        # display trading account data in the form of a dictionary
        
            account_info_dict = mt5.account_info()._asdict()

            df=pd.DataFrame(list(account_info_dict.items()),
                            columns=['property','value']
                            )
    
        else:
            print("failed to connect to trade account , error code = ", 
                  mt5.last_error()
                  )

        # shut down connection to the MetaTrader 5 terminal
        df.set_index(df['property'], inplace=True)
        balance = df['value'].loc['balance']
        
        return (balance)

Here are the listeners for the above class and lines to create an instance

broker = ICMarketsBroker(login=login, 
                         password=password, 
                         server=server)

# Price Event Listener

# Listener function
def on_price_event(symbol, bid, ask):
    print('PRICE EVENT LISTENER')
    print( datetime.datetime.now(), 'Symbol n', symbol)
    print('Bid', bid)
    print('Ask', ask)

broker.on_price_event = on_price_event

Position event listener for broker class

def on_position_event(symbol, is_long, units, upnl, pnl):
    print(
        datetime.datetime.now(), '[POSITIONS]', 
        'symbol ', symbol,
        'is_long ', is_long, 
        'units ', units, 
        'upnl', upnl,
        'pnl ', pnl
    )

broker.on_position_event = on_position_event

# Order event listener
def on_order_event(symbol,
                   quantity, 
                   is_buy, is_sell, 
                   result,
                   status
                   ):
    print(
        datetime.datetime.now(), '[ORDER]',
        'symbol:', symbol, "n", 
        'quantity:', quantity,  "n", 
        'is_buy:', is_buy,  "n", 
        'is_sell:', is_sell,  "n", 
        'result: ', result,  "n", 
        'status: ', status
    )

broker.on_order_event = on_order_event

Here is the MeanReversionTrader Class that inherits from above

# Mean Reversion Trader Class
import datetime as dt
import pandas as pd


class MeanReversionTrader(object):
    def __init__(
        self, broker, symbol=None, units=1,
        resample_interval='60s', mean_periods=5
    ):
        """
        A trading platform that trades on one side
            based on a mean-reverting algorithm.

        :param broker: Broker object
        :param symbol: A str object recognized by the broker for trading
        :param units: Number of units to trade
        :param resample_interval:
            Frequency for resampling price time series
        :param mean_periods: Number of resampled intervals
            for calculating the average price
        """
        self.broker = self.setup_broker(broker)

        self.resample_interval = resample_interval
        self.mean_periods = mean_periods
        self.symbol = symbol
        self.units = units

        self.df_prices = pd.DataFrame()
        self.pnl, self.upnl = broker.pnl, self.broker.get_upnl()

        self.bid_price, self.ask_price = 0, 0
        self.position = 0
        self.is_order_pending = False
        self.is_next_signal_cycle = True

    def setup_broker(self, broker):
        broker.on_price_event = self.on_price_event
        broker.on_order_event = self.on_order_event
        broker.on_position_event = self.on_position_event
        return broker

    def on_price_event(self, symbol, bid, ask):
        print(dt.datetime.now(), '[PRICE]', symbol, 'bid:', bid, 'ask:', ask)

        self.bid_price = bid
        self.ask_price = ask
        self.df_prices.loc[pd.Timestamp.now(), symbol] = (bid + ask) / 2.

        self.get_positions()
        self.generate_signals_and_think()

        self.print_state()

    def get_positions(self):
        try:
            self.broker.get_positions()
        except Exception as ex:
            print('get_positions error:', ex)

    def on_order_event(self, symbol, quantity, is_buy, transaction_id, status):
        print(
            dt.datetime.now(), '[ORDER]',
            'transaction_id:', transaction_id,
            'status:', status,
            'symbol:', symbol,
            'quantity:', quantity,
            'is_buy:', is_buy,
        )
        if status == 'FILLED':
            self.is_order_pending = False
            self.is_next_signal_cycle = False

            self.get_positions()  # Update positions before thinking
            self.generate_signals_and_think()

    def on_position_event(self, symbol, is_long, units, upnl, pnl):
        if symbol == self.symbol:
            self.position = abs(units) * (1 if is_long else -1)
            self.upnl = upnl
            self.pnl = pnl
            self.print_state()

    def print_state(self):
        print(
            dt.datetime.now(), self.symbol, self.position_state,
            abs(self.position), 'upnl:', self.upnl, 'pnl:', self.pnl
        )

    @property
    def position_state(self):
        if self.position == 0:
            return 'FLAT'
        if self.position > 0:
            return 'LONG'
        if self.position < 0:
            return 'SHORT'

    def generate_signals_and_think(self):
        df_resampled = self.df_prices
            .resample(self.resample_interval)
            .ffill()
            .dropna()
        resampled_len = len(df_resampled.index)

        if resampled_len < self.mean_periods:
            print(
                'Insufficient data size to calculate logic. Need',
                self.mean_periods - resampled_len, 'more.'
            )
            return

        mean = df_resampled.tail(self.mean_periods).mean()[self.symbol]

        # Signal flag calculation
        is_signal_buy = mean > self.ask_price
        is_signal_sell = mean < self.bid_price

        print(
            'is_signal_buy:', is_signal_buy,
            'is_signal_sell:', is_signal_sell,
            'average_price: %.5f' % mean,
            'bid:', self.bid_price,
            'ask:', self.ask_price
        )

        self.think(is_signal_buy, is_signal_sell)

    def think(self, is_signal_buy, is_signal_sell):
        if self.is_order_pending:
            return

        if self.position == 0:
            self.think_when_position_flat(is_signal_buy, is_signal_sell)
        elif self.position > 0:
            self.think_when_position_long(is_signal_sell)
        elif self.position < 0: 
            self.think_when_position_short(is_signal_buy)        

    def think_when_position_flat(self, is_signal_buy, is_signal_sell):
        if is_signal_buy and self.is_next_signal_cycle:
            print('Opening position, BUY', 
                  self.symbol, self.units, 'units')
            self.is_order_pending = True
            self.send_market_order(self.symbol, self.units, is_buy= True)
            return

        if is_signal_sell and self.is_next_signal_cycle:
            print('Opening position, SELL', 
                  self.symbol, self.units, 'units')
            self.is_order_pending = True
            self.send_market_order(self.symbol, self.units, is_buy= False)
            return

        if not is_signal_buy and not is_signal_sell:
            self.is_next_signal_cycle = True

    def think_when_position_long(self, is_signal_sell):
        if is_signal_sell:
            print('Closing position, SELL', 
                  self.symbol, self.units, 'units')
            self.is_order_pending = True
            self.send_market_order(self.symbol, self.units, is_buy= False)

    def think_when_position_short(self, is_signal_buy):
        if is_signal_buy:
            print('Closing position, BUY', 
                  self.symbol, self.units, 'units')
            self.is_order_pending = True
            self.send_market_order(self.symbol, self.units, is_buy= True)

    def send_market_order(self, symbol, quantity, is_buy):
        buy_signal = None
        sell_signal = None
        
        if is_buy == True:
            buy_signal == True
            sell_signal == False
            self.broker.send_market_order(symbol, quantity, buy_signal, sell_signal)
            # self, symbol, quantity, buy_signal, sell_signal
        if is_buy == False:
            buy_signal == False
            sell_signal == True
            self.broker.send_market_order(symbol, quantity, buy_signal, sell_signal)

    def run(self):
        self.get_positions() 
        self.broker.stream_prices()

Here is a sample of output

MT5 Terminal initialized
MT5 Terminal initialized
Positions open
MT5 Terminal initialized
MT5 Terminal initialized
Short/sell position count =  2 Long/buy positions count =  2
2024-07-08 12:48:33.664236 EURUSD SHORT 2 upnl: -0.68 pnl: -468.070000000298
MT5 Terminal initialized
Price stream begins
time  2024-07-08 07:48:32 bid 1.08283 ask 1.08283
time  2024-07-08 07:48:32 bid 1.08283 ask 1.08283

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật