How could I cancel tasks that are already running when I click the button again?
The code I’ve sketched out, but it certainly doesn’t work
class TasksManager:
def __init__(self) -> None:
self.state = False
self.list = [self.task_1, self.task_2, self.task_3]
async def task_1(self):
for i in range(10):
await asyncio.sleep(1)
print("Task 1: ", i)
async def task_2(self):
for i in range(10):
await asyncio.sleep(1)
print("Task 2: ", i)
async def task_3(self):
for i in range(10):
await asyncio.sleep(1)
print("Task 3: ", i)
async def click(self, e: ft.TapEvent):
if not self.state:
self.state = True
list = [asyncio.create_task(task()) for task in self.list]
done, pending = await asyncio.wait(list, return_when=asyncio.FIRST_COMPLETED)
print(done, pending)
else:
# здесь нужно отменить корутины
for task in pending:
task.cancel()
def main(page: ft.Page):
manager = TasksManager()
button = ft.TextButton("Click", on_click=manager.click)
page.add(button)
page.update()
ft.app(main)
Tasks must be forcibly stopped
New contributor
root is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.