I made this Telegram bot with python3.11 to control the execution of a python script running on a windows machine.
import logging
# import telegram
from telegram import Update, ReplyKeyboardMarkup, KeyboardButton
from telegram.ext import filters, ApplicationBuilder, ContextTypes, CommandHandler, MessageHandler
import subprocess
import sys
import asyncio
# import psutil
## CONFIG
sys.stdout.reconfigure(line_buffering=True)
sys.stdout.reconfigure(encoding='utf-8')
logging.basicConfig(format='%(asctime)s - %(name)s - %(levellevel)s - %(message)s', level=logging.WARNING)
logger = logging.getLogger(__name__)
AUTHORIZED_USERS = [XXX, XXX]
TOKEN = 'XXX'
bot_status = 'unknown'
# Create a custom keyboard with "Start", "Stop" and "Status" buttons
keyboard = [
[KeyboardButton('Start')],
[KeyboardButton('Stop')],
[KeyboardButton('Status')]
]
reply_markup = ReplyKeyboardMarkup(keyboard, resize_keyboard=True)
script_path = r'script1.1.py'
current_process = None
stop_requested = False
def restricted(func):
async def wrapped(update, context, *args, **kwargs):
user_id = update.effective_user.id
logger.info(f"User ID: {user_id} is trying to access.")
if user_id not in AUTHORIZED_USERS:
logger.warning("Non-authorized access denied for %s.", user_id)
await update.message.reply_text("Sorry, you are not authorised to use this bot.")
return
return await func(update, context, *args, **kwargs)
return wrapped
# Reply to a message if I type /start
@restricted
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Send a start-up message."""
print("The control bot is online!")
await context.bot.send_message(chat_id=update.effective_chat.id, text="The control bot is online!", reply_markup=reply_markup)
# Reply when typing a message
@restricted
async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE):
# Check if the message is one of the custom buttons
print(f"Message received : {update.message.text}")
if update.message.text == 'Start':
await run_script(update, context, script_path)
elif update.message.text == 'Stop':
await stop_command(update, context)
elif update.message.text == 'Status':
await status(update, context)
else:
# Displays the keyboard after any message
await context.bot.send_message(chat_id=update.effective_chat.id, text=update.message.text, reply_markup=reply_markup)
@restricted
async def run_script(update: Update, context: ContextTypes.DEFAULT_TYPE, script_path):
"""Run the Python script and send real-time output to Telegram."""
global current_process, stop_requested
stop_requested = False
print(f"Launching the script : {script_path}")
await context.bot.send_message(chat_id=update.effective_chat.id, text=f"Launching the script : {script_path}")
command = [sys.executable, script_path]
current_process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
print(f"Process running with PID : {current_process.pid}")
async def send_output(stream, update, context, stream_name):
while True:
line = await stream.readline()
if not line:
break
try:
decoded_output = line.decode('utf-8', errors='ignore').strip()
print(f"{stream_name}: {decoded_output}")
await context.bot.send_message(chat_id=update.effective_chat.id, text=f"{stream_name}: {decoded_output}")
except UnicodeDecodeError as e:
print(f"Decoding error : {e}")
if stop_requested:
print(f"{stream_name}: Process stopped by the user.")
await context.bot.send_message(chat_id=update.effective_chat.id, text=f"{stream_name}: Process stopped by the user.")
await asyncio.gather(
send_output(current_process.stdout, update, context, "STDOUT"),
send_output(current_process.stderr, update, context, "STDERR")
)
return_code = await current_process.wait()
current_process = None
print(f"Script completed with return code : {return_code}")
if return_code != 0 and not stop_requested:
await context.bot.send_message(chat_id=update.effective_chat.id, text=f'Script terminated with errors, return code : {return_code}', reply_markup=reply_markup)
elif stop_requested:
await context.bot.send_message(chat_id=update.effective_chat.id, text=f'Script stopped by the user.', reply_markup=reply_markup)
else:
await context.bot.send_message(chat_id=update.effective_chat.id, text=f'Script completed successfully, return code : {return_code}', reply_markup=reply_markup)
@restricted
async def stop_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Stop the current process."""
global current_process, stop_requested
print("Stop button clicked.")
if current_process is not None:
try:
# Runs taskkill and captures standard and error output
result = subprocess.run(['taskkill', '/F', '/T', '/PID', str(current_process.pid)], capture_output=True, text=True, check=True)
stop_requested = True
print("Process killed.")
await update.message.reply_text('Process killed.')
current_process = None
# Displays taskkill output
print("Output of taskkill :")
print(result.stdout)
print(result.stderr)
except subprocess.CalledProcessError as e:
print(f"Error when stopping the process: {e}")
await update.message.reply_text(f"Error when stopping the process: {e}")
else:
print("No processes running.")
await update.message.reply_text('No running processes.')
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Return the current state of the main bot."""
global current_process
status_message = 'running' if current_process else 'stopped'
print(f"Bot status : {status_message}")
await update.message.reply_text(f'Bot status : {status_message}')
def error(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Manage Update errors."""
logger.warning('Update "%s" caused error "%s"', update, context.error)
def main():
"""Start the bot."""
print("Starting the bot...")
application = ApplicationBuilder().token(TOKEN).build()
# Handler for start
start_handler = CommandHandler('start', start_command)
application.add_handler(start_handler)
stop_handler = CommandHandler('stop', stop_command)
application.add_handler(stop_handler)
echo_handler = MessageHandler(filters.TEXT & (~filters.COMMAND), echo)
application.add_handler(echo_handler)
# Add an error handler
application.add_error_handler(error)
# Start the bot
application.run_polling()
if __name__ == '__main__':
main()
When i click on the Stop button, I want to kill the script1.1.py process immediately.
My problem is that this script is continuing to execute until the end, and receives my Stop signal only when it is already finished.
I’m able to kill the process with taskkill, from the terminal, or from another python script.
Maybe i made a wrong use of async/await.
Can some help me to understand how to kill it immediately, please?
Fokda is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.