I am developing a bot intermediary between the notcoin mini-application and a Telegram channel. My task is to notify my created channel about new pools that appear in the notcoin_bot Telegram bot.
1
What I tried to do
main.py:
from dotenv import load_dotenv
import asyncio
import logging
import bot
load_dotenv()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(bot.start())
bot.py:
import asyncio
import aiohttp
from os import environ
from aiogram import Bot, Dispatcher, types
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart
from aiogram.types import Message
from bs4 import BeautifulSoup
dp = Dispatcher()
additional_message = "Join now to start earning!"
channel_id = ""
sent_announcements = []
check_interval = 60 * 60
#Function triggered by the "/start" command
async def start():
global bot
#Create a Bot instance using the Telegram API token from environment variables
bot = Bot(
environ.get("TOKEN"),
default=DefaultBotProperties(parse_mode=ParseMode.HTML),
)
await bot.delete_webhook(drop_pending_updates=True)
await dp.start_polling(bot)
@dp.message(CommandStart())
#Function handling messages with the "/start" command
async def getUserByID(message: Message):
await message.answer(f"Hello, Ваш ID: {message.from_user.id}")
#Function to fetch and parse pool data
async def fetch_and_parse_pools():
async with aiohttp.ClientSession() as session:
async with session.get("https://t.me/notcoin_bot") as response:
soup = BeautifulSoup(await response.text(), "html.parser")
pools = soup.find_all("div", class_="pool")
for pool in pools:
pool_name = pool.find("h3").text
pool_reward = pool.find("span", class_="reward").text
if pool_name not in sent_announcements:
sent_announcements.append(pool_name)
announcement_text = f"**Новый пул:** {pool_name} | **Награда:** {pool_reward} в час.n{additional_message}"
await bot.send_message(channel_id, announcement_text)
#Function to schedule pool checking (improved version)
async def scheduled(delay):
await asyncio.sleep(delay)
loop = asyncio.get_event_loop()
loop.create_task(fetch_and_parse_pools())
loop = asyncio.get_event_loop()
loop.create_task(scheduled(check_interval))
In the process of coding, I tried to complete the task and tested the code, but apart from the greeting, none of the logic of the code worked.
New contributor
Anton2000 Pavlukhin is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.