diff --git a/.gitignore b/.gitignore index ee8ba97..4542e52 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ __pycache__ config.env - +.idea diff --git a/README.md b/README.md index 2aacef6..a7debae 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,7 @@ Now send another message but this time include your id in the list: [12345678] * Copy this message's link and add the message id in USERS_MESSAGE_ID var - + * Trigger : Trigger to access bot. * Dev Mode: Set to 1 if you want access to exec, sh, shell commands. > These commands can be dangerous if used carelessly, Turn on at your own risk. diff --git a/app/__init__.py b/app/__init__.py old mode 100755 new mode 100644 index d148264..59a9623 --- a/app/__init__.py +++ b/app/__init__.py @@ -4,11 +4,14 @@ from dotenv import load_dotenv load_dotenv("config.env") -from .config import Config -from .core.client import BOT +# isort: skip +from .config import Config # noqa +from app.core.message import Message # noqa +from .core.client import BOT # noqa + +if "com.termux" not in os.environ.get("PATH", ""): + import uvloop # isort:skip -if not os.environ.get("TERMUX_APK_RELEASE"): - import uvloop uvloop.install() bot = BOT() diff --git a/app/__main__.py b/app/__main__.py old mode 100755 new mode 100644 diff --git a/app/api/gallerydl.py b/app/api/gallery_dl.py similarity index 69% rename from app/api/gallerydl.py rename to app/api/gallery_dl.py index 2834be8..9b7ce37 100644 --- a/app/api/gallerydl.py +++ b/app/api/gallery_dl.py @@ -7,13 +7,13 @@ from app.core import shell from app.core.scraper_config import MediaType, ScraperConfig -class Gallery_DL(ScraperConfig): - def __init__(self, url): +class GalleryDL(ScraperConfig): + def __init__(self, url: str): super().__init__() - self.url = url + self.url: str = url async def download_or_extract(self): - self.path = "downloads/" + str(time.time()) + self.path: str = "downloads/" + str(time.time()) os.makedirs(self.path) try: async with asyncio.timeout(30): @@ -22,8 +22,8 @@ class Gallery_DL(ScraperConfig): ) except TimeoutError: pass - files = glob.glob(f"{self.path}/*") + files: list[str] = glob.glob(f"{self.path}/*") if not files: return self.cleanup() self.media = self.success = True - self.type = MediaType.GROUP + self.type: MediaType = MediaType.GROUP diff --git a/app/api/instagram.py b/app/api/instagram.py index 89c042c..9664ddc 100755 --- a/app/api/instagram.py +++ b/app/api/instagram.py @@ -1,34 +1,45 @@ import os from urllib.parse import urlparse -from app import Config, bot +from app import Config, Message, bot from app.core.aiohttp_tools import get_json, get_type from app.core.scraper_config import MediaType, ScraperConfig API_KEYS = {"KEYS": Config.API_KEYS, "counter": 0} +async def get_key(): + keys, count = API_KEYS.values() + count += 1 + if count == len(keys): + count = 0 + ret_key = keys[count] + API_KEYS["counter"] = count + return ret_key + + class Instagram(ScraperConfig): def __init__(self, url): super().__init__() self.shortcode = os.path.basename(urlparse(url).path.rstrip("/")) - self.api_url = f"https://www.instagram.com/graphql/query?query_hash=2b0673e0dc4580674a88d426fe00ea90&variables=%7B%22shortcode%22%3A%22{self.shortcode}%22%7D" + self.api_url = (f"https://www.instagram.com/graphql/query?query_hash=2b0673e0dc4580674a88d426fe00ea90" + f"&variables=%7B%22shortcode%22%3A%22{self.shortcode}%22%7D") self.url = url self.dump = True - async def check_dump(self): + async def check_dump(self) -> None | bool: if not Config.DUMP_ID: return async for message in bot.search_messages(Config.DUMP_ID, "#" + self.shortcode): - self.media = message - self.type = MediaType.MESSAGE - self.in_dump = True + self.media: Message = message + self.type: MediaType = MediaType.MESSAGE + self.in_dump: bool = True return True async def download_or_extract(self): for func in [self.check_dump, self.api_3, self.no_api_dl, self.api_dl]: if await func(): - self.success = True + self.success: bool = True break async def api_3(self): @@ -37,28 +48,28 @@ class Instagram(ScraperConfig): if not response: return self.caption = "." - data = ( - response.get("videos", []) - + response.get("images", []) - + response.get("stories", []) + data: list = ( + response.get("videos", []) + + response.get("images", []) + + response.get("stories", []) ) if not data: return if len(data) > 1: self.type = MediaType.GROUP - self.media = data + self.media: list = data return True else: - self.media = data[0] - self.type = get_type(self.media) + self.media: str = data[0] + self.type: MediaType = get_type(self.media) return True async def no_api_dl(self): - response = await aiohttp_tools.get_json(url=self.api_url) + response = await get_json(url=self.api_url) if ( - not response - or "data" not in response - or not response["data"]["shortcode_media"] + not response + or "data" not in response + or not response["data"]["shortcode_media"] ): return return await self.parse_ghraphql(response["data"]["shortcode_media"]) @@ -67,45 +78,35 @@ class Instagram(ScraperConfig): if not Config.API_KEYS: return param = { - "api_key": await self.get_key(), + "api_key": await get_key(), "url": self.api_url, "proxy": "residential", - "js": False, + "js": "false", } - response = await aiohttp_tools.get_json( + response: dict | None = await get_json( url="https://api.webscraping.ai/html", timeout=30, params=param ) if ( - not response - or "data" not in response - or not response["data"]["shortcode_media"] + not response + or "data" not in response + or not response["data"]["shortcode_media"] ): return self.caption = ".." return await self.parse_ghraphql(response["data"]["shortcode_media"]) - async def parse_ghraphql(self, json_: dict): - type_check = json_.get("__typename", None) + async def parse_ghraphql(self, json_: dict) -> str | list | None: + type_check: str | None = json_.get("__typename", None) if not type_check: return elif type_check == "GraphSidecar": - self.media = [ + self.media: list[str] = [ i["node"].get("video_url") or i["node"].get("display_url") for i in json_["edge_sidecar_to_children"]["edges"] ] - self.type = MediaType.GROUP + self.type: MediaType = MediaType.GROUP else: - self.media = json_.get("video_url", json_.get("display_url")) - self.thumb = json_.get("display_url") - self.type = get_type(self.media) + self.media: str = json_.get("video_url", json_.get("display_url")) + self.thumb: str = json_.get("display_url") + self.type: MediaType = get_type(self.media) return self.media - - # Rotating Key function to avoid hitting limit on single Key - async def get_key(self): - keys, count = API_KEYS.values() - count += 1 - if count == len(keys): - count = 0 - ret_key = keys[count] - API_KEYS["counter"] = count - return ret_key diff --git a/app/api/reddit.py b/app/api/reddit.py index 715919e..e9983f2 100755 --- a/app/api/reddit.py +++ b/app/api/reddit.py @@ -12,57 +12,59 @@ class Reddit(ScraperConfig): def __init__(self, url): super().__init__() parsed_url = urlparse(url) - self.url = f"https://www.reddit.com{parsed_url.path}.json?limit=1" + self.url: str = f"https://www.reddit.com{parsed_url.path}.json?limit=1" - async def download_or_extract(self): - headers = { + async def download_or_extract(self) -> None: + headers: dict = { "user-agent": "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_8_7 rv:5.0; en-US) AppleWebKit/533.31.5 (KHTML, like Gecko) Version/4.0 Safari/533.31.5" } - response = await get_json(url=self.url, headers=headers, json_=True) + response: dict | None = await get_json( + url=self.url, headers=headers, json_=True + ) if not response: return try: - json_ = response[0]["data"]["children"][0]["data"] + json_: dict = response[0]["data"]["children"][0]["data"] except BaseException: return - self.caption = ( + self.caption: str = ( f"""__{json_["subreddit_name_prefixed"]}:__\n**{json_["title"]}**""" ) - self.thumb = json_.get("thumbnail") + self.thumb: str = json_.get("thumbnail") if json_.get("is_gallery"): - self.media = [ + self.media: list[str] = [ val["s"].get("u", val["s"].get("gif")).replace("preview", "i") for val in json_["media_metadata"].values() ] self.success = True - self.type = MediaType.GROUP + self.type: MediaType = MediaType.GROUP return - hls = re.findall(r"'hls_url'\s*:\s*'([^']*)'", str(json_)) + hls: list[str] = re.findall(r"'hls_url'\s*:\s*'([^']*)'", str(json_)) if hls: - self.path = "downloads/" + str(time.time()) + self.path: str = "downloads/" + str(time.time()) os.makedirs(self.path) - self.media = f"{self.path}/v.mp4" - vid_url = hls[0] + self.media: str = f"{self.path}/v.mp4" + vid_url: str = hls[0] await shell.run_shell_cmd( f'ffmpeg -hide_banner -loglevel error -i "{vid_url.strip()}" -c copy {self.media}' ) self.thumb = await shell.take_ss(video=self.media, path=self.path) self.success = True - self.type = ( + self.type: MediaType.VIDEO | MediaType.GIF = ( MediaType.VIDEO if await shell.check_audio(self.media) else MediaType.GIF ) return - generic = json_.get("url_overridden_by_dest", "").strip() - self.type = get_type(generic) + generic: str = json_.get("url_overridden_by_dest", "").strip() + self.type: MediaType = get_type(generic) if self.type: - self.media = generic + self.media: str = generic self.success = True diff --git a/app/api/threads.py b/app/api/threads.py index c1d40ef..8f29edc 100644 --- a/app/api/threads.py +++ b/app/api/threads.py @@ -10,12 +10,12 @@ from app.core.scraper_config import MediaType, ScraperConfig class Threads(ScraperConfig): def __init__(self, url): super().__init__() - self.url = url + self.url: str = url async def download_or_extract(self): - shortcode = os.path.basename(urlparse(self.url).path.rstrip("/")) + shortcode: str = os.path.basename(urlparse(self.url).path.rstrip("/")) - response = await ( + response: str = await ( await aiohttp_tools.SESSION.get( f"https://www.threads.net/t/{shortcode}/embed/" ) diff --git a/app/api/tiktok.py b/app/api/tiktok.py index f422c5e..b4a609b 100755 --- a/app/api/tiktok.py +++ b/app/api/tiktok.py @@ -7,18 +7,18 @@ tiktok_scraper = Tiktok_Scraper(quiet=True) class Tiktok(ScraperConfig): def __init__(self, url): super().__init__() - self.url = url + self.url: str = url async def download_or_extract(self): - media = await tiktok_scraper.hybrid_parsing(self.url) + media: dict | None = await tiktok_scraper.hybrid_parsing(self.url) if not media or "status" not in media or media["status"] == "failed": return if "video_data" in media: - self.media = media["video_data"]["nwm_video_url_HQ"] - self.thumb = media["cover_data"]["dynamic_cover"]["url_list"][0] + self.media: str = media["video_data"]["nwm_video_url_HQ"] + self.thumb: str = media["cover_data"]["dynamic_cover"]["url_list"][0] self.success = True self.type = MediaType.VIDEO if "image_data" in media: - self.media = media["image_data"]["no_watermark_image_list"] + self.media: list[str] = media["image_data"]["no_watermark_image_list"] self.success = True self.type = MediaType.GROUP diff --git a/app/api/ytdl.py b/app/api/ytdl.py index 48383f5..e932404 100755 --- a/app/api/ytdl.py +++ b/app/api/ytdl.py @@ -21,14 +21,14 @@ class FakeLogger(object): pass -class YT_DL(ScraperConfig): +class YouTubeDL(ScraperConfig): def __init__(self, url): super().__init__() - self.url = url - self.path = "downloads/" + str(time.time()) - self.video_path = self.path + "/v.mp4" + self.url: str = url + self.path: str = "downloads/" + str(time.time()) + self.video_path: str = self.path + "/v.mp4" self.type = MediaType.VIDEO - _opts = { + _opts: dict = { "outtmpl": self.video_path, "ignoreerrors": True, "ignore_no_formats_error": True, @@ -37,17 +37,17 @@ class YT_DL(ScraperConfig): "noplaylist": True, "format": self.get_format(), } - self.yt_obj = yt_dlp.YoutubeDL(_opts) + self.yt_obj: yt_dlp.YoutubeDL = yt_dlp.YoutubeDL(_opts) async def download_or_extract(self): - info = await self.get_info() + info: dict = await self.get_info() if not info: return await asyncio.to_thread(self.yt_obj.download, self.url) if "youtu" in self.url: - self.caption = ( + self.caption: str = ( f"""__{info.get("channel","")}__:\n**{info.get("title","")}**""" ) @@ -56,7 +56,7 @@ class YT_DL(ScraperConfig): self.thumb = await take_ss(self.video_path, path=self.path) self.success = True - async def get_info(self): + async def get_info(self) -> None | dict: if os.path.basename(self.url).startswith("@") or "/hashtag/" in self.url: return info = await asyncio.to_thread( @@ -70,7 +70,7 @@ class YT_DL(ScraperConfig): return return info - def get_format(self): + def get_format(self) -> str: if "/shorts" in self.url: return "bv[ext=mp4][res=720]+ba[ext=m4a]/b[ext=mp4]" elif "youtu" in self.url: diff --git a/app/config.py b/app/config.py index 450b133..91f69a6 100644 --- a/app/config.py +++ b/app/config.py @@ -1,29 +1,37 @@ import json import os +from typing import Callable + +from pyrogram.filters import Filter +from pyrogram.types import Message class Config: - API_KEYS = json.loads(os.environ.get("API_KEYS", "[]")) + API_KEYS: list[int] = json.loads(os.environ.get("API_KEYS", "[]")) - BLOCKED_USERS = [] - BLOCKED_USERS_MESSAGE_ID = int(os.environ.get("BLOCKED_USERS_MESSAGE_ID", 0)) + BLOCKED_USERS: list[int] = [] + BLOCKED_USERS_MESSAGE_ID: int = int(os.environ.get("BLOCKED_USERS_MESSAGE_ID", 0)) - CHATS = [] - AUTO_DL_MESSAGE_ID = int(os.environ.get("AUTO_DL_MESSAGE_ID", 0)) + CHATS: list[int] = [] + AUTO_DL_MESSAGE_ID: int = int(os.environ.get("AUTO_DL_MESSAGE_ID", 0)) - CMD_DICT = {} + CMD_DICT: dict[str, Callable] = {} + CONVO_DICT: dict[int, dict[str | int, Message | Filter | None]] = {} - DEV_MODE = int(os.environ.get("DEV_MODE", 0)) + DEV_MODE: int = int(os.environ.get("DEV_MODE", 0)) - DISABLED_CHATS = [] - DISABLED_CHATS_MESSAGE_ID = int(os.environ.get("DISABLED_CHATS_MESSAGE_ID", 0)) + DISABLED_CHATS: list[int] = [] + DISABLED_CHATS_MESSAGE_ID: int = int(os.environ.get("DISABLED_CHATS_MESSAGE_ID", 0)) - DUMP_ID = int(os.environ.get("DUMP_ID",0)) + DUMP_ID: int = int(os.environ.get("DUMP_ID", 0)) - LOG_CHAT = int(os.environ.get("LOG_CHAT")) - TRIGGER = os.environ.get("TRIGGER", ".") + LOG_CHAT: int = int(os.environ.get("LOG_CHAT")) - UPSTREAM_REPO = os.environ.get("UPSTREAM_REPO","https://github.com/anonymousx97/social-dl").rstrip("/") + TRIGGER: str = os.environ.get("TRIGGER", ".") - USERS = [] - USERS_MESSAGE_ID = int(os.environ.get("USERS_MESSAGE_ID", 0)) + UPSTREAM_REPO = os.environ.get( + "UPSTREAM_REPO", "https://github.com/anonymousx97/social-dl" + ).rstrip("/") + + USERS: list[int] = [] + USERS_MESSAGE_ID: int = int(os.environ.get("USERS_MESSAGE_ID", 0)) diff --git a/app/core/aiohttp_tools.py b/app/core/aiohttp_tools.py old mode 100755 new mode 100644 index 8eb7dca..5e7879b --- a/app/core/aiohttp_tools.py +++ b/app/core/aiohttp_tools.py @@ -7,10 +7,10 @@ import aiohttp from app.core.scraper_config import MediaType -SESSION = None +SESSION: aiohttp.ClientSession | None = None -async def session_switch(): +async def session_switch() -> None: if not SESSION: globals().update({"SESSION": aiohttp.ClientSession()}) else: @@ -23,7 +23,7 @@ async def get_json( params: dict = None, json_: bool = False, timeout: int = 10, -): +) -> dict | None: try: async with SESSION.get( url=url, headers=headers, params=params, timeout=timeout @@ -37,7 +37,7 @@ async def get_json( return -async def in_memory_dl(url: str): +async def in_memory_dl(url: str) -> BytesIO: async with SESSION.get(url) as remote_file: bytes_data = await remote_file.read() file = BytesIO(bytes_data) @@ -45,7 +45,7 @@ async def in_memory_dl(url: str): return file -def get_filename(url): +def get_filename(url: str) -> str: name = basename(urlparse(url).path.rstrip("/")).lower() if name.endswith((".webp", ".heic")): name = name + ".jpg" @@ -54,7 +54,7 @@ def get_filename(url): return name -def get_type(url): +def get_type(url: str) -> MediaType | None: name, ext = splitext(get_filename(url)) if ext in {".png", ".jpg", ".jpeg"}: return MediaType.PHOTO @@ -64,7 +64,7 @@ def get_type(url): return MediaType.GIF -async def thumb_dl(thumb): +async def thumb_dl(thumb) -> BytesIO | str | None: if not thumb or not thumb.startswith("http"): return thumb return await in_memory_dl(thumb) diff --git a/app/core/client.py b/app/core/client.py index 17660bc..58419c4 100644 --- a/app/core/client.py +++ b/app/core/client.py @@ -1,4 +1,3 @@ -import base64 import glob import importlib import json @@ -7,14 +6,26 @@ import sys from functools import wraps from io import BytesIO -from pyrogram import Client, idle +from pyrogram import Client, filters, idle from pyrogram.enums import ParseMode +from pyrogram.types import Message as Msg from app import Config from app.core import aiohttp_tools +from app.core.conversation import Conversation from app.core.message import Message +async def import_modules(): + for py_module in glob.glob(pathname="app/**/*.py", recursive=True): + name = os.path.splitext(py_module)[0] + py_name = name.replace("/", ".") + try: + importlib.import_module(py_name) + except Exception as e: + print(e) + + class BOT(Client): def __init__(self): if string := os.environ.get("STRING_SESSION"): @@ -32,7 +43,8 @@ class BOT(Client): max_concurrent_transmissions=2, ) - def add_cmd(self, cmd, trigger=Config.TRIGGER): # Custom triggers To do + @staticmethod + def add_cmd(cmd: str): def the_decorator(func): @wraps(func) def wrapper(): @@ -47,9 +59,22 @@ class BOT(Client): return the_decorator - async def boot(self): + @staticmethod + async def get_response( + chat_id: int, filters: filters.Filter = None, timeout: int = 8 + ) -> Message | None: + try: + async with Conversation( + chat_id=chat_id, filters=filters, timeout=timeout + ) as convo: + response: Message | None = await convo.get_response() + return response + except Conversation.TimeOutError: + return + + async def boot(self) -> None: await super().start() - await self.import_modules() + await import_modules() await self.set_filter_list() await aiohttp_tools.session_switch() await self.edit_restart_msg() @@ -58,7 +83,7 @@ class BOT(Client): await idle() await aiohttp_tools.session_switch() - async def edit_restart_msg(self): + async def edit_restart_msg(self) -> None: restart_msg = int(os.environ.get("RESTART_MSG", 0)) restart_chat = int(os.environ.get("RESTART_CHAT", 0)) if restart_msg and restart_chat: @@ -71,24 +96,26 @@ class BOT(Client): os.environ.pop("RESTART_MSG", "") os.environ.pop("RESTART_CHAT", "") - async def import_modules(self): - for py_module in glob.glob("app/**/*.py", recursive=True): - name = os.path.splitext(py_module)[0] - py_name = name.replace("/", ".") - importlib.import_module(py_name) - async def log( self, text="", traceback="", chat=None, func=None, + message: Message | Msg | None = None, name="log.txt", disable_web_page_preview=True, parse_mode=ParseMode.HTML, - ): + ) -> Message | Msg: + if message: + return (await message.copy(chat_id=Config.LOG_CHAT)) # fmt: skip if traceback: - text = f"#Traceback\nFunction: {func}\nChat: {chat}\nTraceback:\n{traceback}" + text = f""" +#Traceback +Function: {func} +Chat: {chat} +Traceback: +{traceback}""" return await self.send_message( chat_id=Config.LOG_CHAT, text=text, @@ -97,13 +124,27 @@ class BOT(Client): parse_mode=parse_mode, ) - async def restart(self): + async def restart(self, hard=False) -> None: await aiohttp_tools.session_switch() await super().stop(block=False) + if hard: + os.execl("/bin/bash", "/bin/bash", "run") os.execl(sys.executable, sys.executable, "-m", "app") - SECRET_API = base64.b64decode("YS56dG9yci5tZS9hcGkvaW5zdGE=").decode("utf-8") + async def send_message( + self, chat_id: int | str, text, name: str = "output.txt", **kwargs + ) -> Message | Msg: + text = str(text) + if len(text) < 4096: + return Message.parse_message( + (await super().send_message(chat_id=chat_id, text=text, **kwargs)) + ) + doc = BytesIO(bytes(text, encoding="utf-8")) + doc.name = name + kwargs.pop("disable_web_page_preview", "") + return await super().send_document(chat_id=chat_id, document=doc, **kwargs) + async def set_filter_list(self): chats_id = Config.AUTO_DL_MESSAGE_ID blocked_id = Config.BLOCKED_USERS_MESSAGE_ID @@ -126,13 +167,3 @@ class BOT(Client): Config.DISABLED_CHATS = json.loads( (await super().get_messages(Config.LOG_CHAT, disabled)).text ) - - async def send_message(self, chat_id, text, name: str = "output.txt", **kwargs): - if len(str(text)) < 4096: - return Message.parse_message( - (await super().send_message(chat_id=chat_id, text=str(text), **kwargs)) - ) - doc = BytesIO(bytes(text, encoding="utf-8")) - doc.name = name - kwargs.pop("disable_web_page_preview", "") - return await super().send_document(chat_id=chat_id, document=doc, **kwargs) diff --git a/app/core/conversation.py b/app/core/conversation.py new file mode 100644 index 0000000..4c381f9 --- /dev/null +++ b/app/core/conversation.py @@ -0,0 +1,47 @@ +import asyncio +import json + +from pyrogram.filters import Filter +from pyrogram.types import Message + +from app import Config + + +class Conversation: + class DuplicateConvo(Exception): + def __init__(self, chat: str | int | None = None): + text = "Conversation already started" + if chat: + text += f" with {chat}" + super().__init__(text) + + class TimeOutError(Exception): + def __init__(self): + super().__init__("Conversation Timeout") + + def __init__(self, chat_id: int, filters: Filter | None = None, timeout: int = 10): + self.chat_id = chat_id + self.filters = filters + self.timeout = timeout + + def __str__(self): + return json.dumps(self.__dict__, indent=4, ensure_ascii=False) + + async def get_response(self, timeout: int | None = None) -> Message | None: + try: + async with asyncio.timeout(timeout or self.timeout): + while not Config.CONVO_DICT[self.chat_id]["response"]: + await asyncio.sleep(0) + return Config.CONVO_DICT[self.chat_id]["response"] + except asyncio.TimeoutError: + raise self.TimeOutError + + async def __aenter__(self) -> "Conversation": + if self.chat_id in Config.CONVO_DICT: + raise self.DuplicateConvo(self.chat_id) + convo_dict = {"filters": self.filters, "response": None} + Config.CONVO_DICT[self.chat_id] = convo_dict + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + Config.CONVO_DICT.pop(self.chat_id, "") diff --git a/app/core/filters.py b/app/core/filters.py index 3eb36c1..4f8f9d2 100644 --- a/app/core/filters.py +++ b/app/core/filters.py @@ -1,14 +1,15 @@ from urllib.parse import urlparse from pyrogram import filters as _filters +from pyrogram.types import Message from app import Config from app.core.media_handler import url_map -def check_for_urls(text_list): +def check_for_urls(text_list: list): for link in text_list: - if match := url_map.get(urlparse(link).netloc): + if url_map.get(urlparse(link).netloc): return True else: for key in url_map.keys(): @@ -16,7 +17,7 @@ def check_for_urls(text_list): return True -def dynamic_chat_filter(_, __, message, cmd=False): +def dynamic_chat_filter(_, __, message: Message, cmd=False) -> bool: if ( not message.text or (not message.text.startswith("https") and not cmd) @@ -34,17 +35,16 @@ def dynamic_chat_filter(_, __, message, cmd=False): return bool(url_check) -def dynamic_allowed_list(_, __, message): +def dynamic_allowed_list(_, __, message: Message) -> bool: if not dynamic_chat_filter(_, __, message, cmd=True): return False - start_str = message.text.split(maxsplit=1)[0] - cmd = start_str.replace("/", "", 1) - cmd_check = cmd in {"download", "dl", "down"} + cmd = message.text.split(maxsplit=1)[0] + cmd_check = cmd in {"/download", "/dl", "/down"} reaction_check = not message.reactions return bool(cmd_check and reaction_check) -def dynamic_cmd_filter(_, __, message): +def dynamic_cmd_filter(_, __, message: Message) -> bool: if ( not message.text or not message.text.startswith(Config.TRIGGER) @@ -63,3 +63,7 @@ def dynamic_cmd_filter(_, __, message): chat_filter = _filters.create(dynamic_chat_filter) user_filter = _filters.create(dynamic_cmd_filter) allowed_cmd_filter = _filters.create(dynamic_allowed_list) +convo_filter = _filters.create( + lambda _, __, message: (message.chat.id in Config.CONVO_DICT) + and (not message.reactions) +) diff --git a/app/core/media_handler.py b/app/core/media_handler.py index ce354d1..f2413e3 100644 --- a/app/core/media_handler.py +++ b/app/core/media_handler.py @@ -1,82 +1,98 @@ import asyncio import glob +import json import os import traceback +from functools import lru_cache +from io import BytesIO from urllib.parse import urlparse from pyrogram.errors import MediaEmpty, PhotoSaveFileInvalid, WebpageCurlFailed from pyrogram.types import InputMediaPhoto, InputMediaVideo -from app import Config -from app.api.gallerydl import Gallery_DL +from app import Config, Message, bot +from app.api.gallery_dl import GalleryDL from app.api.instagram import Instagram from app.api.reddit import Reddit from app.api.threads import Threads from app.api.tiktok import Tiktok -from app.api.ytdl import YT_DL +from app.api.ytdl import YouTubeDL from app.core import aiohttp_tools, shell from app.core.scraper_config import MediaType -url_map = { +url_map: dict = { "tiktok.com": Tiktok, "www.instagram.com": Instagram, "www.reddit.com": Reddit, "reddit.com": Reddit, "www.threads.net": Threads, - "twitter.com": Gallery_DL, - "youtube.com": YT_DL, - "youtu.be": YT_DL, - "www.facebook.com": YT_DL, + "twitter.com": GalleryDL, + "x.com": GalleryDL, + "www.x.com": GalleryDL, + "youtube.com": YouTubeDL, + "youtu.be": YouTubeDL, + "www.facebook.com": YouTubeDL, } +@lru_cache() +def get_url_dict_items(): + return url_map.items() + + class MediaHandler: - def __init__(self, message): - self.exceptions, self.media_objects, self.sender_dict = [], [], {} - self.__client = message._client - self.message = message - self.doc = "-d" in message.flags - self.spoiler = "-s" in message.flags - self.args_ = { + def __init__(self, message: Message) -> None: + self.exceptions = [] + self.media_objects: list[asyncio.Task.result] = [] + self.sender_dict = {} + self.__client: bot = message._client + self.message: Message = message + self.doc: bool = "-d" in message.flags + self.spoiler: bool = "-s" in message.flags + self.args_: dict = { "chat_id": self.message.chat.id, "reply_to_message_id": message.reply_id, } - def get_sender(self, reply=False): + def __str__(self): + return json.dumps(self.__dict__, indent=4, ensure_ascii=False) + + def get_sender(self, reply: bool = False) -> str: if "-ns" in self.message.flags: return "" - text = f"\nShared by : " - author = self.message.author_signature - sender = user.first_name if (user := self.message.from_user) else "" - reply_sender = "" + text: str = f"\nShared by : " + author: str | None = self.message.author_signature + sender: str = user.first_name if (user := self.message.from_user) else "" + reply_sender: str = "" if reply: reply_msg = self.message.replied - reply_sender = ( + reply_sender: str = ( reply_user.first_name if (reply_user := reply_msg.from_user) else "" ) if any((author, sender, reply_sender)): return text + (author or sender if not reply else reply_sender) - else: - return "" + return "" - async def get_media(self): + async def get_media(self) -> None: async with asyncio.TaskGroup() as task_group: - tasks = [] - text_list = self.message.text_list - reply_text_list = self.message.reply_text_list + tasks: list[asyncio.Task] = [] + text_list: list[str] = self.message.text_list + reply_text_list: list[str] = self.message.reply_text_list for link in text_list + reply_text_list: - reply = link in reply_text_list + reply: bool = link in reply_text_list if match := url_map.get(urlparse(link).netloc): tasks.append(task_group.create_task(match.start(link))) self.sender_dict[link] = self.get_sender(reply=reply) else: - for key, val in url_map.items(): + for key, val in get_url_dict_items(): if key in link: tasks.append(task_group.create_task(val.start(link))) self.sender_dict[link] = self.get_sender(reply=reply) - self.media_objects = [task.result() for task in tasks if task.result()] + self.media_objects: list[asyncio.Task.result] = [ + task.result() for task in tasks if task.result() + ] - async def send_media(self): + async def send_media(self) -> None: for obj in self.media_objects: if "-nc" in self.message.flags: caption = "" @@ -122,25 +138,33 @@ class MediaHandler: "\n".join([obj.caption_url.strip(), traceback.format_exc()]) ) - async def send(self, media, method, **kwargs): + async def send(self, media: dict | str, method, caption: str, **kwargs) -> Message: try: try: - post = await method( - **media, **self.args_, **kwargs, has_spoiler=self.spoiler + post: Message = await method( + **media, + **self.args_, + caption=caption, + **kwargs, + has_spoiler=self.spoiler, ) except (MediaEmpty, WebpageCurlFailed): key, value = list(media.items())[0] media[key] = await aiohttp_tools.in_memory_dl(value) - post = await method( - **media, **self.args_, **kwargs, has_spoiler=self.spoiler + post: Message = await method( + **media, + **self.args_, + caption=caption, + **kwargs, + has_spoiler=self.spoiler, ) except PhotoSaveFileInvalid: - post = await self.__client.send_document( + post: Message = await self.__client.send_document( **self.args_, document=media, caption=caption, force_document=True ) return post - async def send_document(self, docs, caption, path=""): + async def send_document(self, docs: list, caption: str, path="") -> None: if not path: docs = await asyncio.gather( *[aiohttp_tools.in_memory_dl(doc) for doc in docs] @@ -154,8 +178,8 @@ class MediaHandler: ) await asyncio.sleep(0.5) - async def send_group(self, media_obj, caption): - sorted = await sort_media( + async def send_group(self, media_obj, caption: str) -> None: + sorted: list[str, list[InputMediaVideo | InputMediaPhoto]] = await sort_media( caption=caption, spoiler=self.spoiler, urls=media_obj.media, @@ -182,18 +206,22 @@ class MediaHandler: return obj -async def sort_media(caption="", spoiler=False, urls=None, path=None): +async def sort_media( + caption="", spoiler=False, urls=None, path=None +) -> list[str, list[InputMediaVideo | InputMediaPhoto]]: images, videos, animations = [], [], [] if path and os.path.exists(path): [os.rename(file_, file_ + ".png") for file_ in glob.glob(f"{path}/*.webp")] - media = glob.glob(f"{path}/*") + media: list[str] = glob.glob(f"{path}/*") else: - media = await asyncio.gather(*[aiohttp_tools.in_memory_dl(url) for url in urls]) + media: tuple[BytesIO] = await asyncio.gather( + *[aiohttp_tools.in_memory_dl(url) for url in urls] + ) for file in media: if path: - name = file.lower() + name: str = file.lower() else: - name = file.name.lower() + name: str = file.name.lower() if name.endswith((".png", ".jpg", ".jpeg")): images.append(InputMediaPhoto(file, caption=caption, has_spoiler=spoiler)) elif name.endswith((".mp4", ".mkv", ".webm")): @@ -208,7 +236,9 @@ async def sort_media(caption="", spoiler=False, urls=None, path=None): return await make_chunks(images, videos, animations) -async def make_chunks(images=[], videos=[], animations=[]): +async def make_chunks( + images=[], videos=[], animations=[] +) -> list[str, list[InputMediaVideo | InputMediaPhoto]]: chunk_imgs = [images[imgs : imgs + 5] for imgs in range(0, len(images), 5)] chunk_vids = [videos[vids : vids + 5] for vids in range(0, len(videos), 5)] return [*chunk_imgs, *chunk_vids, *animations] diff --git a/app/core/message.py b/app/core/message.py old mode 100755 new mode 100644 index 86eec77..7418bfc --- a/app/core/message.py +++ b/app/core/message.py @@ -2,61 +2,61 @@ import asyncio from functools import cached_property from pyrogram.errors import MessageDeleteForbidden -from pyrogram.types import Message as MSG +from pyrogram.filters import Filter +from pyrogram.types import Message as Msg +from pyrogram.types import User from app import Config +from app.core.conversation import Conversation -class Message(MSG): - def __init__(self, message): - super().__dict__.update(message.__dict__) +class Message(Msg): + def __init__(self, message: Msg) -> None: + args = vars(message) + args["client"] = args.pop("_client", None) + super().__init__(**args) @cached_property - def cmd(self): + def cmd(self) -> str | None: raw_cmd = self.text_list[0] - cmd = raw_cmd.lstrip(Config.TRIGGER) + cmd = raw_cmd[1:] return cmd if cmd in Config.CMD_DICT else None @cached_property - def flags(self): + def flags(self) -> list: return [i for i in self.text_list if i.startswith("-")] @cached_property - def flt_input(self): - split_lines = self.input.splitlines() - split_n_joined = [ - " ".join([word for word in line.split(" ") if word not in self.flags]) - for line in split_lines - ] - return "\n".join(split_n_joined) + def flt_input(self) -> str: + split_lines = self.input.split("\n", maxsplit=1) + split_lines[0] = " ".join( + [word for word in split_lines[0].split(" ") if word not in self.flags] + ) + return "\n".join(split_lines) @cached_property - def input(self): + def input(self) -> str: if len(self.text_list) > 1: return self.text.split(maxsplit=1)[-1] return "" @cached_property - def replied(self): + def replied(self) -> "Message": if self.reply_to_message: return Message.parse_message(self.reply_to_message) @cached_property - def reply_id(self): + def reply_id(self) -> int | None: return self.replied.id if self.replied else None @cached_property - def replied_task_id(self): + def replied_task_id(self) -> str | None: return self.replied.task_id if self.replied else None @cached_property - def reply_text_list(self): + def reply_text_list(self) -> list: reply_text_list = [] - if ( - self.replied - and (reply_text := self.replied.text) - and "dl" in self.text_list[0] - ): + if self.replied and self.replied.text and "dl" in self.text_list[0]: reply_text_list = self.replied.text_list return reply_text_list @@ -65,10 +65,10 @@ class Message(MSG): return f"{self.chat.id}-{self.id}" @cached_property - def text_list(self): + def text_list(self) -> list: return self.text.split() - async def async_deleter(self, del_in, task, block): + async def async_deleter(self, del_in, task, block) -> None: if block: x = await task await asyncio.sleep(del_in) @@ -78,7 +78,7 @@ class Message(MSG): self.async_deleter(del_in=del_in, task=task, block=True) ) - async def delete(self, reply=False): + async def delete(self, reply: bool = False) -> None: try: await super().delete() if reply and self.replied: @@ -86,7 +86,7 @@ class Message(MSG): except MessageDeleteForbidden: pass - async def edit(self, text, del_in: int = 0, block=True, **kwargs): + async def edit(self, text, del_in: int = 0, block=True, **kwargs) -> "Message": if len(str(text)) < 4096: kwargs.pop("name", "") task = self.edit_text(text, **kwargs) @@ -100,7 +100,41 @@ class Message(MSG): ) return reply - async def reply(self, text, del_in: int = 0, block=True, **kwargs): + async def extract_user_n_reason(self) -> list[User | str | Exception, str | None]: + if self.replied: + return [self.replied.from_user, self.flt_input] + inp_list = self.flt_input.split(maxsplit=1) + if not inp_list: + return [ + "Unable to Extract User info.\nReply to a user or input @ | id.", + "", + ] + user = inp_list[0] + reason = None + if len(inp_list) >= 2: + reason = inp_list[1] + if user.isdigit(): + user = int(user) + elif user.startswith("@"): + user = user.strip("@") + try: + return [await self._client.get_users(user_ids=user), reason] + except Exception as e: + return [e, reason] + + async def get_response(self, filters: Filter = None, timeout: int = 8): + try: + async with Conversation( + chat_id=self.chat.id, filters=filters, timeout=timeout + ) as convo: + response: Message | None = await convo.get_response() + return response + except Conversation.TimeOutError: + return + + async def reply( + self, text, del_in: int = 0, block: bool = True, **kwargs + ) -> "Message": task = self._client.send_message( chat_id=self.chat.id, text=text, reply_to_message_id=self.id, **kwargs ) @@ -110,6 +144,5 @@ class Message(MSG): return await task @classmethod - def parse_message(cls, message): - ret_obj = cls(message) - return ret_obj + def parse_message(cls, message: Msg) -> "Message": + return cls(message) diff --git a/app/core/scraper_config.py b/app/core/scraper_config.py index 9173dc9..16e7dc3 100644 --- a/app/core/scraper_config.py +++ b/app/core/scraper_config.py @@ -1,5 +1,7 @@ +import json import shutil from enum import Enum, auto +from io import BytesIO class MediaType(Enum): @@ -11,22 +13,25 @@ class MediaType(Enum): class ScraperConfig: - def __init__(self): - self.dump = False - self.in_dump = False - self.path = "" - self.media = "" - self.caption = "" - self.caption_url = "" - self.thumb = None - self.type = None - self.success = False + def __init__(self) -> None: + self.dump: bool = False + self.in_dump: bool = False + self.path: str = "" + self.media: str | BytesIO = "" + self.caption: str = "" + self.caption_url: str = "" + self.thumb: str | None | BytesIO = None + self.type: None | MediaType = None + self.success: bool = False - def set_sauce(self, url): + def __str__(self): + return json.dumps(self.__dict__, indent=4, ensure_ascii=False, default=str) + + def set_sauce(self, url: str) -> None: self.caption_url = f"\n\nSauce" @classmethod - async def start(cls, url): + async def start(cls, url: str) -> "ScraperConfig": obj = cls(url=url) obj.query_url = url obj.set_sauce(url) @@ -34,6 +39,6 @@ class ScraperConfig: if obj.success: return obj - def cleanup(self): + def cleanup(self) -> None: if self.path: shutil.rmtree(self.path, ignore_errors=True) diff --git a/app/core/shell.py b/app/core/shell.py old mode 100755 new mode 100644 index fb0cb12..f32b3f6 --- a/app/core/shell.py +++ b/app/core/shell.py @@ -1,8 +1,9 @@ import asyncio import os +from typing import AsyncIterable -async def take_ss(video: str, path: str): +async def take_ss(video: str, path: str) -> None | str: thumb = f"{path}/i.png" await run_shell_cmd( f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{thumb}"''' @@ -11,15 +12,15 @@ async def take_ss(video: str, path: str): return thumb -async def check_audio(file): +async def check_audio(file) -> int: result = await run_shell_cmd( f"ffprobe -v error -show_entries format=nb_streams -of default=noprint_wrappers=1:nokey=1 {file}" ) return int(result or 0) - 1 -async def run_shell_cmd(cmd): - proc = await asyncio.create_subprocess_shell( +async def run_shell_cmd(cmd: str) -> str: + proc: asyncio.create_subprocess_shell = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT ) stdout, _ = await proc.communicate() @@ -27,38 +28,37 @@ async def run_shell_cmd(cmd): class AsyncShell: - def __init__(self, process): - self.process = process - self.full_std = "" - self.is_done = False + def __init__(self, process: asyncio.create_subprocess_shell): + self.process: asyncio.create_subprocess_shell = process + self.full_std: str = "" + self.is_done: bool = False + self._task: asyncio.Task | None = None - async def read_output(self): + async def read_output(self) -> None: while True: - line = (await self.process.stdout.readline()).decode("utf-8") + line: str = (await self.process.stdout.readline()).decode("utf-8") if not line: break self.full_std += line self.is_done = True await self.process.wait() - async def get_output(self): + async def get_output(self) -> AsyncIterable: while not self.is_done: yield self.full_std - def cancel(self): + def cancel(self) -> None: if not self.is_done: self.process.kill() self._task.cancel() @classmethod - async def run_cmd(cls, cmd, name="shell"): - sub_process = cls( + async def run_cmd(cls, cmd: str, name: str = "AsyncShell") -> "AsyncShell": + sub_process: AsyncShell = cls( process=await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT ) ) - sub_process._task = asyncio.create_task( - sub_process.read_output(), name="AsyncShell" - ) + sub_process._task = asyncio.create_task(sub_process.read_output(), name=name) await asyncio.sleep(0.5) return sub_process diff --git a/app/plugins/admin_tools.py b/app/plugins/admin_tools.py new file mode 100644 index 0000000..6414cc8 --- /dev/null +++ b/app/plugins/admin_tools.py @@ -0,0 +1,144 @@ +import asyncio +from typing import Awaitable + +from pyrogram.types import ChatPermissions, ChatPrivileges, User + +from app import bot +from app.core.message import Message + + +def get_privileges( + anon: bool = False, full: bool = False, without_rights: bool = False +) -> ChatPrivileges: + if without_rights: + return ChatPrivileges( + can_manage_chat=True, + can_manage_video_chats=False, + can_pin_messages=False, + can_delete_messages=False, + can_change_info=False, + can_restrict_members=False, + can_invite_users=False, + can_promote_members=False, + is_anonymous=False, + ) + return ChatPrivileges( + can_manage_chat=True, + can_manage_video_chats=True, + can_pin_messages=True, + can_delete_messages=True, + can_change_info=True, + can_restrict_members=True, + can_invite_users=True, + can_promote_members=full, + is_anonymous=anon, + ) + + +@bot.add_cmd(cmd=["promote", "demote"]) +async def promote_or_demote(bot: bot, message: Message) -> None: + response: Message = await message.reply( + f"Trying to {message.cmd.capitalize()}....." + ) + user, title = await message.extract_user_n_reason() + if not isinstance(user, User): + await response.edit(user, del_in=10) + return + full: bool = "-full" in message.flags + anon: bool = "-anon" in message.flags + without_rights = "-wr" in message.flags + promote = message.cmd == "promote" + if promote: + privileges: ChatPrivileges = get_privileges( + full=full, anon=anon, without_rights=without_rights + ) + else: + privileges = ChatPrivileges(can_manage_chat=False) + response_text = f"{message.cmd.capitalize()}d: {user.mention}" + try: + await bot.promote_chat_member( + chat_id=message.chat.id, user_id=user.id, privileges=privileges + ) + if promote: + # Let server promote admin before setting title + # Bot is too fast moment 😂😂😂 + await asyncio.sleep(1) + await bot.set_administrator_title( + chat_id=message.chat.id, user_id=user.id, title=title or "Admin" + ) + if title: + response_text += f"\nTitle: {title}" + if without_rights: + response_text += "\nWithout Rights: True" + await response.edit(text=response_text) + except Exception as e: + await response.edit(text=e, del_in=10, block=True) + + +@bot.add_cmd(cmd=["ban", "unban"]) +async def ban_or_unban(bot: bot, message: Message) -> None: + user, reason = await message.extract_user_n_reason() + if not isinstance(user, User): + await message.reply(user, del_in=10) + return + if message.cmd == "ban": + action: Awaitable = bot.ban_chat_member( + chat_id=message.chat.id, user_id=user.id + ) + else: + action: Awaitable = bot.unban_chat_member( + chat_id=message.chat.id, user_id=user.id + ) + try: + await action + await message.reply( + text=f"{message.cmd.capitalize()}ned: {user.mention}\nReason: {reason}." + ) + except Exception as e: + await message.reply(text=e, del_in=10) + + +@bot.add_cmd(cmd="kick") +async def kick_user(bot, message: Message): + user, reason = await message.extract_user_n_reason() + if not isinstance(user, User): + await message.reply(user, del_in=10) + return + try: + await bot.ban_chat_member(chat_id=message.chat.id, user_id=user.id) + await asyncio.sleep(1) + await bot.unban_chat_member(chat_id=message.chat.id, user_id=user.id) + await message.reply( + text=f"{message.cmd.capitalize()}ed: {user.mention}\nReason: {reason}." + ) + except Exception as e: + await message.reply(text=e, del_in=10) + + +@bot.add_cmd(cmd=["mute", "unmute"]) +async def mute_or_unmute(bot: bot, message: Message): + user, reason = await message.extract_user_n_reason() + if not isinstance(user, User): + await message.reply(user, del_in=10) + return + perms = message.chat.permissions + if message.cmd == "mute": + perms = ChatPermissions( + can_send_messages=False, + can_pin_messages=False, + can_invite_users=False, + can_change_info=False, + can_send_media_messages=False, + can_send_polls=False, + can_send_other_messages=False, + can_add_web_page_previews=False, + ) + try: + await bot.restrict_chat_member( + chat_id=message.chat.id, user_id=user.id, permissions=perms + ) + await message.reply( + text=f"{message.cmd.capitalize()}d: {user.mention}\nReason: {reason}." + ) + except Exception as e: + await message.reply(text=e, del_in=10) diff --git a/app/plugins/authorise.py b/app/plugins/authorise.py index 6f7670c..d030a13 100644 --- a/app/plugins/authorise.py +++ b/app/plugins/authorise.py @@ -1,9 +1,13 @@ +from typing import Callable + from pyrogram.errors import MessageNotModified -from app import Config, bot +from app import Config, Message, bot -async def add_or_remove(mode, task, item, config_list, message_id): +async def add_or_remove( + mode: str, task: Callable, item: int, config_list: list, message_id: int +) -> None | str: err = None if item in config_list and mode == "add": return "ID Already in List" @@ -21,7 +25,7 @@ async def add_or_remove(mode, task, item, config_list, message_id): return err -def extract_user(message): +def extract_user(message: Message) -> tuple: user, err = message.input.strip(), None if not Config.USERS_MESSAGE_ID: return user, "You haven't added `USERS_MESSAGE_ID` Var, Add it." @@ -36,12 +40,12 @@ def extract_user(message): return user, err -def extract_chat(message): +def extract_chat(message: Message) -> tuple: chat, err = message.input.strip() or message.chat.id, None if not Config.AUTO_DL_MESSAGE_ID: - return user, "You haven't added `AUTO_DL_MESSAGE_ID` Var, Add it." + return chat, "You haven't added `AUTO_DL_MESSAGE_ID` Var, Add it." if not chat: - return user, "Unable to Extract Chat IDs. Try again." + return chat, "Unable to Extract Chat IDs. Try again." try: chat = int(chat) except ValueError: @@ -50,19 +54,19 @@ def extract_chat(message): @bot.add_cmd(cmd=["addsudo", "delsudo"]) -async def add_or_remove_sudo(bot, message): +async def add_or_remove_sudo(bot: bot, message: Message) -> Message | None: user, err = extract_user(message) if err: return await message.reply(err) if message.cmd == "addsudo": - mode = "add" - task = Config.USERS.append - action = "Added to" + mode: str = "add" + task: Callable = Config.USERS.append + action: str = "Added to" else: - mode = "remove" - task = Config.USERS.remove - action = "Removed from" + mode: str = "remove" + task: Callable = Config.USERS.remove + action: str = "Removed from" if err := await add_or_remove( mode=mode, @@ -76,19 +80,19 @@ async def add_or_remove_sudo(bot, message): @bot.add_cmd(cmd=["addchat", "delchat"]) -async def add_or_remove_chat(bot, message): +async def add_or_remove_chat(bot: bot, message: Message) -> Message | None: chat, err = extract_chat(message) if err: return await message.reply(err) if message.cmd == "addchat": - mode = "add" - task = Config.CHATS.append - action = "Added to" + mode: str = "add" + task: Callable = Config.CHATS.append + action: str = "Added to" else: - mode = "remove" - task = Config.CHATS.remove - action = "Removed from" + mode: str = "remove" + task: Callable = Config.CHATS.remove + action: str = "Removed from" if err := await add_or_remove( mode=mode, @@ -105,19 +109,19 @@ async def add_or_remove_chat(bot, message): @bot.add_cmd(cmd=["block", "unblock"]) -async def block_or_unblock(bot, message): +async def block_or_unblock(bot: bot, message: Message) -> Message | None: user, err = extract_user(message) if err: return await message.reply(err) if message.cmd == "block": - mode = "add" - task = Config.BLOCKED_USERS.append - action = "Added to" + mode: str = "add" + task: Callable = Config.BLOCKED_USERS.append + action: str = "Added to" else: - mode = "remove" - task = Config.BLOCKED_USERS.remove - action = "Removed from" + mode: str = "remove" + task: Callable = Config.BLOCKED_USERS.remove + action: str = "Removed from" if err := await add_or_remove( mode=mode, @@ -131,18 +135,18 @@ async def block_or_unblock(bot, message): @bot.add_cmd(cmd=["enable", "disable"]) -async def auto_dl_trigger(bot, message): +async def auto_dl_trigger(bot: bot, message: Message) -> Message | None: if not Config.DISABLED_CHATS_MESSAGE_ID: return await message.reply("You haven't added `DISABLED_CHATS_ID` Var, Add it.") if message.cmd == "disable": - mode = "add" - task = Config.DISABLED_CHATS.append - action = "Added to" + mode: str = "add" + task: Callable = Config.DISABLED_CHATS.append + action: str = "Added to" else: - mode = "remove" - task = Config.DISABLED_CHATS.remove - action = "Removed from" + mode: str = "remove" + task: Callable = Config.DISABLED_CHATS.remove + action: str = "Removed from" if err := await add_or_remove( mode=mode, diff --git a/app/plugins/dev_tools.py b/app/plugins/dev_tools.py index edd6b1d..270e69a 100644 --- a/app/plugins/dev_tools.py +++ b/app/plugins/dev_tools.py @@ -6,31 +6,31 @@ from io import StringIO from pyrogram.enums import ParseMode -from app import Config -from app.core import shell - -from app.core import aiohttp_tools as aio # isort:skip +from app import Config, Message, bot +from app.core import shell, aiohttp_tools as aio # isort:skip # Run shell commands -async def run_cmd(bot, message): - cmd = message.input.strip() - reply = await message.reply("executing...") +async def run_cmd(bot: bot, message: Message) -> Message | None: + cmd: str = message.input.strip() + reply: Message = await message.reply("executing...") try: - proc_stdout = await asyncio.Task(shell.run_shell_cmd(cmd), name=reply.task_id) + proc_stdout: str = await asyncio.Task( + shell.run_shell_cmd(cmd), name=reply.task_id + ) except asyncio.exceptions.CancelledError: return await reply.edit("`Cancelled...`") - output = f"`${cmd}`\n\n`{proc_stdout}`" + output: str = f"~$`{cmd}`\n\n`{proc_stdout}`" return await reply.edit(output, name="sh.txt", disable_web_page_preview=True) -# Shell but Live Output -async def live_shell(bot, message): - cmd = message.input.strip() - reply = await message.reply("`getting live output....`") - sub_process = await shell.AsyncShell.run_cmd(cmd) - sleep_for = 1 - output = "" +# Shell with Live Output +async def live_shell(bot: bot, message: Message) -> Message | None: + cmd: str = message.input.strip() + reply: Message = await message.reply("`getting live output....`") + sub_process: shell.AsyncShell = await shell.AsyncShell.run_cmd(cmd) + sleep_for: int = 1 + output: str = "" try: async for stdout in sub_process.get_output(): if output != stdout: @@ -46,7 +46,7 @@ async def live_shell(bot, message): await asyncio.Task(asyncio.sleep(sleep_for), name=reply.task_id) sleep_for += 1 return await reply.edit( - f"`$ {cmd}\n\n``{sub_process.full_std}`", + f"~$`{cmd}\n\n``{sub_process.full_std}`", name="shell.txt", disable_web_page_preview=True, ) @@ -58,11 +58,11 @@ async def live_shell(bot, message): # Run Python code -async def executor(bot, message): - code = message.flt_input.strip() +async def executor(bot: bot, message: Message) -> Message | None: + code: str = message.flt_input.strip() if not code: return await message.reply("exec Jo mama?") - reply = await message.reply("executing") + reply: Message = await message.reply("executing") sys.stdout = codeOut = StringIO() sys.stderr = codeErr = StringIO() # Indent code as per proper python syntax @@ -81,10 +81,15 @@ async def executor(bot, message): sys.stderr = sys.__stderr__ output = codeErr.getvalue().strip() or codeOut.getvalue().strip() if func_out is not None: - output = "\n\n".join([output, str(func_out)]).strip() - if "-s" not in message.flags: + output = f"{output}\n\n{func_out}".strip() + elif not output and "-s" in message.flags: + await reply.delete() + return + if "-s" in message.flags: + output = f">> `{output}`" + else: output = f"> `{code}`\n\n>> `{output}`" - return await reply.edit( + await reply.edit( output, name="exec.txt", disable_web_page_preview=True, @@ -92,17 +97,17 @@ async def executor(bot, message): ) -async def loader(bot, message): +async def loader(bot: bot, message: Message) -> Message | None: if ( not message.replied or not message.replied.document or not message.replied.document.file_name.endswith(".py") ): return await message.reply("reply to a plugin.") - reply = await message.reply("Loading....") - file_name = message.replied.document.file_name.rstrip(".py") + reply: Message = await message.reply("Loading....") + file_name: str = message.replied.document.file_name.rstrip(".py") reload = sys.modules.pop(f"app.temp.{file_name}", None) - status = "Reloaded" if reload else "Loaded" + status: str = "Reloaded" if reload else "Loaded" await message.replied.download("app/temp/") try: importlib.import_module(f"app.temp.{file_name}") @@ -111,6 +116,26 @@ async def loader(bot, message): await reply.edit(f"{status} {file_name}.py.") +@bot.add_cmd(cmd="c") +async def cancel_task(bot: bot, message: Message) -> Message | None: + task_id: str | None = message.replied_task_id + if not task_id: + return await message.reply( + text="Reply To a Command or Bot's Response Message.", del_in=8 + ) + all_tasks: set[asyncio.all_tasks] = asyncio.all_tasks() + tasks: list[asyncio.Task] | None = [x for x in all_tasks if x.get_name() == task_id] + if not tasks: + return await message.reply( + text="Task not in Currently Running Tasks.", del_in=8 + ) + response: str = "" + for task in tasks: + status: bool = task.cancel() + response += f"Task: __{task.get_name()}__\nCancelled: __{status}__\n" + await message.reply(response, del_in=5) + + if Config.DEV_MODE: Config.CMD_DICT["sh"] = run_cmd Config.CMD_DICT["shell"] = live_shell diff --git a/app/plugins/song.py b/app/plugins/song.py index ebc3d08..cdb2203 100644 --- a/app/plugins/song.py +++ b/app/plugins/song.py @@ -6,7 +6,7 @@ from urllib.parse import urlparse import yt_dlp -from app import bot +from app import Message, bot from app.api.ytdl import FakeLogger from app.core.aiohttp_tools import in_memory_dl @@ -21,43 +21,41 @@ domains = [ @bot.add_cmd(cmd="song") -async def song_dl(bot, message): +async def song_dl(bot: bot, message: Message) -> None | Message: reply_query = None - audio_file = None - artist = None if message.replied: - for link in message.replied.text.split(): + for link in message.replied.text_list: if urlparse(link).netloc in domains: reply_query = link break query = reply_query or message.flt_input if not query: return await message.reply("Give a song name or link to download.") - response = await message.reply("Searching....") - dl_path = f"downloads/{time()}/" - query_or_search = query if query.startswith("http") else f"ytsearch:{query}" + response: Message = await message.reply("Searching....") + dl_path: str = f"downloads/{time()}/" + query_or_search: str = query if query.startswith("http") else f"ytsearch:{query}" if "-m" in message.flags: - aformat = "mp3" + a_format = "mp3" else: - aformat = "opus" + a_format = "opus" yt_opts = { "logger": FakeLogger(), "outtmpl": dl_path + "%(title)s.%(ext)s", "format": "bestaudio", "postprocessors": [ - {"key": "FFmpegExtractAudio", "preferredcodec": aformat}, + {"key": "FFmpegExtractAudio", "preferredcodec": a_format}, {"key": "FFmpegMetadata"}, {"key": "EmbedThumbnail"}, ], } - ytdl = yt_dlp.YoutubeDL(yt_opts) - yt_info = await asyncio.to_thread(ytdl.extract_info, query_or_search) + ytdl: yt_dlp.YoutubeDL = yt_dlp.YoutubeDL(yt_opts) + yt_info: dict = await asyncio.to_thread(ytdl.extract_info, query_or_search) if not query_or_search.startswith("http"): - yt_info = yt_info["entries"][0] - duration = yt_info["duration"] - artist = yt_info["channel"] + yt_info: str = yt_info["entries"][0] + duration: int = yt_info["duration"] + artist: str = yt_info["channel"] thumb = await in_memory_dl(yt_info["thumbnail"]) - down_path = glob.glob(dl_path + "*") + down_path: list = glob.glob(dl_path + "*") if not down_path: return await response.edit("Not found") await response.edit("Uploading....") diff --git a/app/plugins/tg_utils.py b/app/plugins/tg_utils.py index 82a71c9..6319d63 100644 --- a/app/plugins/tg_utils.py +++ b/app/plugins/tg_utils.py @@ -2,36 +2,38 @@ import os from pyrogram.enums import ChatType from pyrogram.errors import BadRequest -from pyrogram.types import Message -from app import bot +from app import Config, Message, bot # Delete replied and command message @bot.add_cmd(cmd="del") -async def delete_message(bot, message: Message): +async def delete_message(bot: bot, message: Message) -> None: await message.delete(reply=True) # Delete Multiple messages from replied to command. @bot.add_cmd(cmd="purge") -async def purge_(bot, message: Message): - reply = message.replied - if not reply: +async def purge_(bot: bot, message: Message) -> None | Message: + start_message: int = message.reply_id + if not start_message: return await message.reply("reply to a message") - start_message = reply.id - end_message = message.id - messages = [end_message] + [i for i in range(int(start_message), int(end_message))] + end_message: int = message.id + messages: list[int] = [ + end_message, + *[i for i in range(int(start_message), int(end_message))], + ] await bot.delete_messages( chat_id=message.chat.id, message_ids=messages, revoke=True ) @bot.add_cmd(cmd="ids") -async def get_ids(bot, message): - if reply := message.replied: - ids = "" +async def get_ids(bot: bot, message: Message) -> None: + reply: Message = message.replied + if reply: + ids: str = "" reply_forward = reply.forward_from_chat reply_user = reply.from_user ids += f"Chat : `{reply.chat.id}`\n" @@ -40,13 +42,13 @@ async def get_ids(bot, message): if reply_user: ids += f"User : {reply.from_user.id}" else: - ids = f"Chat :`{message.chat.id}`" + ids: str = f"Chat :`{message.chat.id}`" await message.reply(ids) @bot.add_cmd(cmd="join") -async def join_chat(bot, message): - chat = message.input +async def join_chat(bot: bot, message: Message) -> Message: + chat: str = message.input try: await bot.join_chat(chat) except (KeyError, BadRequest): @@ -58,11 +60,16 @@ async def join_chat(bot, message): @bot.add_cmd(cmd="leave") -async def leave_chat(bot, message): +async def leave_chat(bot: bot, message: Message) -> None: if message.input: chat = message.input else: chat = message.chat.id + await message.reply( + f"Leaving current chat in 5\nReply with `{Config.TRIGGER}c` to cancel", + del_in=5, + block=True, + ) try: await bot.leave_chat(chat) except Exception as e: @@ -70,8 +77,8 @@ async def leave_chat(bot, message): @bot.add_cmd(cmd="reply") -async def reply(bot, message): - text = message.input +async def reply(bot: bot, message: Message) -> None: + text: str = message.input await bot.send_message( chat_id=message.chat.id, text=text, diff --git a/app/plugins/tools.py b/app/plugins/tools.py index 83d17d4..5817ea9 100644 --- a/app/plugins/tools.py +++ b/app/plugins/tools.py @@ -1,21 +1,23 @@ import asyncio -from app import bot +from app import Message, bot -@bot.add_cmd(cmd=["cancel", "c"]) -async def cancel_task(bot, message): - task_id = message.replied_task_id +@bot.add_cmd(cmd="c") +async def cancel_task(bot: bot, message: Message) -> Message | None: + task_id: str | None = message.replied_task_id if not task_id: return await message.reply( - "Reply To a Command or Bot's Response Message.", del_in=8 + text="Reply To a Command or Bot's Response Message.", del_in=8 ) - all_tasks = asyncio.all_tasks() - tasks = [x for x in all_tasks if x.get_name() == task_id] + all_tasks: set[asyncio.all_tasks] = asyncio.all_tasks() + tasks: list[asyncio.Task] | None = [x for x in all_tasks if x.get_name() == task_id] if not tasks: - return await message.reply("Task not in Currently Running Tasks.", del_in=8) - response = "" + return await message.reply( + text="Task not in Currently Running Tasks.", del_in=8 + ) + response: str = "" for task in tasks: - status = task.cancel() + status: bool = task.cancel() response += f"Task: __{task.get_name()}__\nCancelled: __{status}__\n" await message.reply(response, del_in=5) diff --git a/app/plugins/bot.py b/app/plugins/utils.py old mode 100755 new mode 100644 similarity index 54% rename from app/plugins/bot.py rename to app/plugins/utils.py index ccd2b4b..4841872 --- a/app/plugins/bot.py +++ b/app/plugins/utils.py @@ -1,10 +1,16 @@ import asyncio import os +from async_lru import alru_cache from git import Repo from pyrogram.enums import ChatType -from app import Config, bot +from app import Config, Message, bot + + +@alru_cache() +async def get_banner() -> Message: + return await bot.get_messages("Social_DL", [2, 3]) @bot.add_cmd(cmd="bot") @@ -13,7 +19,7 @@ async def info(bot, message): chat_count = ( f"\nAuto-Dl enabled in: {len(Config.CHATS)} chats\n" ) - supported_sites, photo = await bot.get_messages("Social_DL", [2, 3]) + supported_sites, photo = await get_banner() await photo.copy( message.chat.id, caption="\n".join([head, chat_count, supported_sites.text.html]), @@ -21,58 +27,62 @@ async def info(bot, message): @bot.add_cmd(cmd="help") -async def help(bot, message): - commands = "\n".join( +async def cmd_list(bot: bot, message: Message) -> None: + commands: str = "\n".join( [f"{Config.TRIGGER}{i}" for i in Config.CMD_DICT.keys()] ) await message.reply(f"Available Commands:\n\n{commands}", del_in=30) @bot.add_cmd(cmd="restart") -async def restart(bot, message, u_resp=None): - reply = u_resp or await message.reply("restarting....") +async def restart(bot: bot, message: Message, u_resp: Message | None = None) -> None: + reply: Message = u_resp or await message.reply("restarting....") if reply.chat.type in (ChatType.GROUP, ChatType.SUPERGROUP): os.environ["RESTART_MSG"] = str(reply.id) os.environ["RESTART_CHAT"] = str(reply.chat.id) - await bot.restart() + await bot.restart(hard="-h" in message.flags) @bot.add_cmd(cmd="refresh") -async def chat_update(bot, message): +async def chat_update(bot: bot, message: Message) -> None: await bot.set_filter_list() await message.reply("Filters Refreshed", del_in=8) @bot.add_cmd(cmd="repo") -async def sauce(bot, message): +async def sauce(bot: bot, message: Message) -> None: await bot.send_message( chat_id=message.chat.id, - text=f"Social-DL", + text="Social-DL", reply_to_message_id=message.reply_id or message.id, disable_web_page_preview=True, ) @bot.add_cmd(cmd="update") -async def updater(bot, message): - reply = await message.reply("Checking for Updates....") - repo = Repo() +async def updater(bot: bot, message: Message) -> None | Message: + reply: Message = await message.reply("Checking for Updates....") + repo: Repo = Repo() repo.git.fetch() - commits = "" - limit = 0 + commits: str = "" + limit: int = 0 for commit in repo.iter_commits("HEAD..origin/main"): - commits += f"#{commit.count()} {commit.summary} By {commit.author}\n\n" + commits += f""" +#{commit.count()} {commit.summary} By {commit.author} +""" limit += 1 if limit > 50: break if not commits: - return await reply.edit("Already Up To Date.", del_in=5) + return await reply.edit(text="Already Up To Date.", del_in=5) if "-pull" not in message.flags: - return await reply.edit(f"Update Available:\n\n{commits}") + return await reply.edit( + text=f"Update Available:\n\n{commits}", disable_web_page_preview=True + ) repo.git.reset("--hard") repo.git.pull(Config.UPSTREAM_REPO, "--rebase=true") await asyncio.gather( - bot.log(text=f"#Updater\nPulled:\n\n{commits}"), + bot.log(text=f"#Updater\nPulled:\n\n{commits}", disable_web_page_preview=True), reply.edit("Update Found\nPulling...."), ) await restart(bot, message, reply) diff --git a/app/social_dl.py b/app/social_dl.py index 4264bd9..99ca4c4 100644 --- a/app/social_dl.py +++ b/app/social_dl.py @@ -1,22 +1,23 @@ import asyncio import traceback +from typing import Callable, Coroutine from app import Config, bot from app.core import filters from app.core.media_handler import MediaHandler -from app.core.message import Message +from app.core.message import Message, Msg @bot.add_cmd(cmd="dl") -async def dl(bot, message): - reply = await bot.send_message( +async def dl(bot: bot, message: Message): + reply: Message = await bot.send_message( chat_id=message.chat.id, text="`trying to download...`" ) - coro = MediaHandler.process(message) - task = asyncio.Task(coro, name=reply.task_id) - media = await task + coro: Coroutine = MediaHandler.process(message) + task: asyncio.Task = asyncio.Task(coro, name=reply.task_id) + media: MediaHandler = await task if media.exceptions: - exceptions = "\n".join(media.exceptions) + exceptions: str = "\n".join(media.exceptions) await bot.log( traceback=exceptions, func="DL", @@ -31,12 +32,12 @@ async def dl(bot, message): @bot.on_message(filters.user_filter) @bot.on_edited_message(filters.user_filter) -async def cmd_dispatcher(bot, message): - message = Message.parse_message(message) - func = Config.CMD_DICT[message.cmd] - coro = func(bot, message) +async def cmd_dispatcher(bot: bot, message: Msg): + message: Message = Message.parse_message(message) + func: Callable = Config.CMD_DICT[message.cmd] + coro: Coroutine = func(bot, message) try: - task = asyncio.Task(coro, name=message.task_id) + task: asyncio.Task = asyncio.Task(coro, name=message.task_id) await task except asyncio.exceptions.CancelledError: await bot.log(text=f"#Cancelled:\n{message.text}") @@ -49,13 +50,28 @@ async def cmd_dispatcher(bot, message): ) +@bot.on_message(filters.convo_filter, group=0) +@bot.on_edited_message(filters.convo_filter, group=0) +async def convo_handler(bot: bot, message: Msg): + conv_dict: dict = Config.CONVO_DICT[message.chat.id] + conv_filters = conv_dict.get("filters") + if conv_filters: + check = await conv_filters(bot, message) + if not check: + message.continue_propagation() + conv_dict["response"] = message + message.continue_propagation() + conv_dict["response"] = message + message.continue_propagation() + + @bot.on_message(filters.allowed_cmd_filter) @bot.on_message(filters.chat_filter) -async def dl_dispatcher(bot, message): - message = Message.parse_message(message) - coro = dl(bot, message) +async def dl_dispatcher(bot: bot, message: Msg): + message: Message = Message.parse_message(message) + coro: Coroutine = dl(bot, message) try: - task = asyncio.Task(coro, name=message.task_id) + task: asyncio.Task = asyncio.Task(coro, name=message.task_id) await task except asyncio.exceptions.CancelledError: await bot.log(text=f"#Cancelled:\n{message.text}")