diff --git a/app/core/client.py b/app/core/client.py index 209fe67..6728fc9 100644 --- a/app/core/client.py +++ b/app/core/client.py @@ -35,7 +35,11 @@ class BOT(Client): def the_decorator(func): @wraps(func) def wrapper(): - Config.CMD_DICT[cmd] = func + if isinstance(cmd, list): + for _cmd in cmd: + Config.CMD_DICT[_cmd] = func + else: + Config.CMD_DICT[cmd] = func wrapper() return func @@ -48,7 +52,7 @@ class BOT(Client): await self.set_filter_list() await aiohttp_tools.session_switch() await self.edit_restart_msg() - await self.log(text="#Social-dl\n__Started__") + await self.log(text="#SocialDL\nStarted") print("started") await idle() await aiohttp_tools.session_switch() @@ -73,15 +77,23 @@ class BOT(Client): importlib.import_module(py_name) async def log( - self, text, chat=None, func=None, name="log.txt", disable_web_page_preview=True + self, + text="", + traceback="", + chat=None, + func=None, + name="log.txt", + disable_web_page_preview=True, + parse_mode=ParseMode.HTML, ): - if chat or func: - text = f"Function: {func}\nChat: {chat}\nTraceback:\n{text}" + if traceback: + text = f"#Traceback\nFunction: {func}\nChat: {chat}\nTraceback:\n{traceback}" return await self.send_message( chat_id=Config.LOG_CHAT, text=text, name=name, disable_web_page_preview=disable_web_page_preview, + parse_mode=parse_mode, ) async def restart(self): diff --git a/app/core/message.py b/app/core/message.py index b13cab2..b83822a 100755 --- a/app/core/message.py +++ b/app/core/message.py @@ -1,6 +1,7 @@ import asyncio from functools import cached_property +from pyrogram.errors import MessageDeleteForbidden from pyrogram.types import Message as MSG from app import Config @@ -83,9 +84,12 @@ class Message(MSG): return reply async def delete(self, reply=False): - await super().delete() - if reply and self.replied: - await self.replied.delete() + try: + await super().delete() + if reply and self.replied: + await self.replied.delete() + except MessageDeleteForbidden: + pass async def async_deleter(self, del_in, task, block): if block: diff --git a/app/core/shell.py b/app/core/shell.py index 2375131..c543caa 100755 --- a/app/core/shell.py +++ b/app/core/shell.py @@ -27,34 +27,32 @@ async def run_shell_cmd(cmd): class AsyncShell: - full_std = "" - is_not_completed = True def __init__(self, process): self.process = process + self.full_std = "" + self.is_done = False - async def get_output(self): + async def read_output(self): while True: - # Check output and stop loop if it's emtpy line = (await self.process.stdout.readline()).decode("utf-8") if not line: break self.full_std += line - # Let the Subprocess complete and let it shut down + self.is_done = True await self.process.wait() - self.is_not_completed = False + + async def get_output(self): + while not self.is_done: + yield self.full_std @classmethod - async def run_cmd(cls, cmd): - # Create Subprocess and initialise self using cls + async def run_cmd(cls, cmd, name="shell"): sub_process = cls( process=await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT ) ) - # Start Checking output but don't block code by awaiting it. - asyncio.create_task(sub_process.get_output()) - # Sleep for a short time to let previous task start + asyncio.create_task(sub_process.read_output()) await asyncio.sleep(0.5) - # Return Self object return sub_process diff --git a/app/plugins/dev_tools.py b/app/plugins/dev_tools.py index 95a0726..85accd6 100644 --- a/app/plugins/dev_tools.py +++ b/app/plugins/dev_tools.py @@ -15,46 +15,52 @@ from app.core import aiohttp_tools as aio # isort:skip # Run shell commands async def run_cmd(bot, message): cmd = message.input.strip() - status_ = await message.reply("executing...") - proc_stdout = await shell.run_shell_cmd(cmd) + reply = await message.reply("executing...") + try: + proc_stdout = await asyncio.Task( + shell.run_shell_cmd(cmd), name=f"{message.chat.id}-{reply.id}" + ) + except asyncio.exceptions.CancelledError: + return await reply.edit("`Cancelled...`") output = f"`${cmd}`\n\n`{proc_stdout}`" - return await status_.edit(output, name="sh.txt", disable_web_page_preview=True) + return await reply.edit(output, name="sh.txt", disable_web_page_preview=True) # Shell but Live Output async def live_shell(bot, message): - cmd = message.input.strip() - sub_process = await shell.AsyncShell.run_cmd(cmd) - reply = await message.reply("`getting live output....`") - output = "" - sleep_for = 1 - while sub_process.is_not_completed: - # Edit message only when there's new output. - if output != sub_process.full_std: - output = sub_process.full_std - if len(output) <= 4096: - await reply.edit( - f"`{output}`", - disable_web_page_preview=True, - parse_mode=ParseMode.MARKDOWN, - ) - # Reset sleep duration - if sleep_for >= 5: - sleep_for = 1 - # Sleep to Unblock running loop and let output reader read new - # output. - await asyncio.sleep(sleep_for) - sleep_for += 1 - # If the subprocess is finished edit the message with cmd and full - # output - return await reply.edit( - f"`$ {cmd}\n\n``{sub_process.full_std}`", - name="shell.txt", - disable_web_page_preview=True, - ) + try: + cmd = message.input.strip() + reply = await message.reply("`getting live output....`") + sub_process = await shell.AsyncShell.run_cmd(cmd) + sleep_for = 1 + output = "" + async for stdout in sub_process.get_output(): + if output != stdout: + if len(stdout) <= 4096: + await reply.edit( + f"`{stdout}`", + disable_web_page_preview=True, + parse_mode=ParseMode.MARKDOWN, + ) + output = stdout + if sleep_for >= 5: + sleep_for = 1 + await asyncio.Task( + asyncio.sleep(sleep_for), name=f"{message.chat.id}-{reply.id}" + ) + sleep_for += 1 + return await reply.edit( + f"`$ {cmd}\n\n``{sub_process.full_std}`", + name="shell.txt", + disable_web_page_preview=True, + ) + except asyncio.exceptions.CancelledError: + return await reply.edit("`Cancelled...`") # Run Python code + + async def executor(bot, message): code = message.flt_input.strip() if not code: @@ -67,7 +73,11 @@ async def executor(bot, message): try: # Create and initialise the function exec(f"async def _exec(bot, message):\n {formatted_code}") - func_out = await locals()["_exec"](bot, message) + func_out = await asyncio.Task( + locals()["_exec"](bot, message), name=f"{message.chat.id}-{reply.id}" + ) + except asyncio.exceptions.CancelledError: + return await reply.edit("`Cancelled....`") except BaseException: func_out = str(traceback.format_exc()) sys.stdout = sys.__stdout__ diff --git a/app/plugins/tools.py b/app/plugins/tools.py index 37d541f..b337940 100644 --- a/app/plugins/tools.py +++ b/app/plugins/tools.py @@ -1,19 +1,21 @@ +import asyncio + from app import bot -from app.social_dl import current_tasks -@bot.add_cmd(cmd="cancel") +@bot.add_cmd(cmd=["cancel", "c"]) async def cancel_task(bot, message): task_id = message.reply_id if not task_id: - return await message.reply("Reply To a Command or Bot's Response Message.") - task = current_tasks.get(task_id) - if not task: - return await message.reply("Task not in Currently Running Tasks.") - reply = await message.reply("Cancelling....") - cancelled = task.cancel() - if cancelled: - response = "Task Cancelled Successfully." - else: - response = "Task not Running.\nIt either is Finished or has Errored." - await reply.edit(response) + return await message.reply( + "Reply To a Command or Bot's Response Message.", del_in=8 + ) + all_tasks = asyncio.all_tasks() + tasks = [x for x in all_tasks if x.get_name() == f"{message.chat.id}-{task_id}"] + if not tasks: + return await message.reply("Task not in Currently Running Tasks.", del_in=8) + response = "" + for task in tasks: + status = task.cancel() + response += f"Task: __{task.get_name()}__\nCancelled: __{status}__\n" + await message.reply(response, del_in=5) diff --git a/app/social_dl.py b/app/social_dl.py index 7f7060a..52e364f 100644 --- a/app/social_dl.py +++ b/app/social_dl.py @@ -6,31 +6,27 @@ from app.core import filters from app.core.MediaHandler import ExtractAndSendMedia from app.core.message import Message -current_tasks = {} - @bot.add_cmd(cmd="dl") async def dl(bot, message): reply = await bot.send_message( chat_id=message.chat.id, text="`trying to download...`" ) - task = asyncio.Task(ExtractAndSendMedia.process(message)) - current_tasks[reply.id] = task + coro = ExtractAndSendMedia.process(message) + task = asyncio.Task(coro, name=f"{message.chat.id}-{message.id}") media = await task if media.exceptions: exceptions = "\n".join(media.exceptions) await bot.log( - text=exceptions, + traceback=exceptions, func="DL", chat=message.chat.title or message.chat.first_name, name="traceback.txt", ) - current_tasks.pop(reply.id) return await reply.edit(f"Media Download Failed.") if media.media_objects: await message.delete() await reply.delete() - current_tasks.pop(reply.id) @bot.on_message(filters.user_filter) @@ -38,36 +34,34 @@ async def dl(bot, message): async def cmd_dispatcher(bot, message): parsed_message = Message.parse_message(message) func = Config.CMD_DICT[parsed_message.cmd] + coro = func(bot, parsed_message) try: - task = asyncio.Task(func(bot, parsed_message)) - current_tasks[message.id] = task + task = asyncio.Task(coro, name=f"{message.chat.id}-{message.id}") await task except asyncio.exceptions.CancelledError: - pass + await bot.log(text=f"#Cancelled:\n{message.text}") except BaseException: await bot.log( - text=str(traceback.format_exc()), + traceback=str(traceback.format_exc()), chat=message.chat.title or message.chat.first_name, func=func.__name__, name="traceback.txt", ) - current_tasks.pop(message.id) @bot.on_message(filters.chat_filter) async def dl_dispatcher(bot, message): parsed_message = Message.parse_message(message) + coro = dl(bot, parsed_message) try: - task = asyncio.Task(dl(bot, parsed_message)) - current_tasks[message.id] = task + task = asyncio.Task(coro, name=f"{message.chat.id}-{message.id}") await task except asyncio.exceptions.CancelledError: - pass + await bot.log(text=f"#Cancelled:\n{message.text}") except BaseException: await bot.log( - text=str(traceback.format_exc()), + traceback=str(traceback.format_exc()), chat=message.chat.title or message.chat.first_name, func=dl.__name__, name="traceback.txt", ) - current_tasks.pop(message.id)