Whats New:
• New conversation module
• New Ban-unban, Kick, Mute-Unmute module

Fixes:
• Instagram params bug fix.
• Follow snake_case for gallery_dl

Type Hints and many more misc changes.
This commit is contained in:
anonymousx97 2023-10-23 12:48:11 +05:30
parent 7029de8d78
commit 89e387f2c6
27 changed files with 763 additions and 393 deletions

2
.gitignore vendored
View File

@ -1,3 +1,3 @@
__pycache__
config.env
.idea

View File

@ -77,7 +77,7 @@
Now send another message but this time include your id in the list: [12345678]
* Copy this message's link and add the message id in USERS_MESSAGE_ID var
</details>
</details>
* Trigger : Trigger to access bot.
* Dev Mode: Set to 1 if you want access to exec, sh, shell commands.
> These commands can be dangerous if used carelessly, Turn on at your own risk.

11
app/__init__.py Executable file → Normal file
View File

@ -4,11 +4,14 @@ from dotenv import load_dotenv
load_dotenv("config.env")
from .config import Config
from .core.client import BOT
# isort: skip
from .config import Config # noqa
from app.core.message import Message # noqa
from .core.client import BOT # noqa
if "com.termux" not in os.environ.get("PATH", ""):
import uvloop # isort:skip
if not os.environ.get("TERMUX_APK_RELEASE"):
import uvloop
uvloop.install()
bot = BOT()

0
app/__main__.py Executable file → Normal file
View File

View File

@ -7,13 +7,13 @@ from app.core import shell
from app.core.scraper_config import MediaType, ScraperConfig
class Gallery_DL(ScraperConfig):
def __init__(self, url):
class GalleryDL(ScraperConfig):
def __init__(self, url: str):
super().__init__()
self.url = url
self.url: str = url
async def download_or_extract(self):
self.path = "downloads/" + str(time.time())
self.path: str = "downloads/" + str(time.time())
os.makedirs(self.path)
try:
async with asyncio.timeout(30):
@ -22,8 +22,8 @@ class Gallery_DL(ScraperConfig):
)
except TimeoutError:
pass
files = glob.glob(f"{self.path}/*")
files: list[str] = glob.glob(f"{self.path}/*")
if not files:
return self.cleanup()
self.media = self.success = True
self.type = MediaType.GROUP
self.type: MediaType = MediaType.GROUP

View File

@ -1,34 +1,45 @@
import os
from urllib.parse import urlparse
from app import Config, bot
from app import Config, Message, bot
from app.core.aiohttp_tools import get_json, get_type
from app.core.scraper_config import MediaType, ScraperConfig
API_KEYS = {"KEYS": Config.API_KEYS, "counter": 0}
async def get_key():
keys, count = API_KEYS.values()
count += 1
if count == len(keys):
count = 0
ret_key = keys[count]
API_KEYS["counter"] = count
return ret_key
class Instagram(ScraperConfig):
def __init__(self, url):
super().__init__()
self.shortcode = os.path.basename(urlparse(url).path.rstrip("/"))
self.api_url = f"https://www.instagram.com/graphql/query?query_hash=2b0673e0dc4580674a88d426fe00ea90&variables=%7B%22shortcode%22%3A%22{self.shortcode}%22%7D"
self.api_url = (f"https://www.instagram.com/graphql/query?query_hash=2b0673e0dc4580674a88d426fe00ea90"
f"&variables=%7B%22shortcode%22%3A%22{self.shortcode}%22%7D")
self.url = url
self.dump = True
async def check_dump(self):
async def check_dump(self) -> None | bool:
if not Config.DUMP_ID:
return
async for message in bot.search_messages(Config.DUMP_ID, "#" + self.shortcode):
self.media = message
self.type = MediaType.MESSAGE
self.in_dump = True
self.media: Message = message
self.type: MediaType = MediaType.MESSAGE
self.in_dump: bool = True
return True
async def download_or_extract(self):
for func in [self.check_dump, self.api_3, self.no_api_dl, self.api_dl]:
if await func():
self.success = True
self.success: bool = True
break
async def api_3(self):
@ -37,28 +48,28 @@ class Instagram(ScraperConfig):
if not response:
return
self.caption = "."
data = (
response.get("videos", [])
+ response.get("images", [])
+ response.get("stories", [])
data: list = (
response.get("videos", [])
+ response.get("images", [])
+ response.get("stories", [])
)
if not data:
return
if len(data) > 1:
self.type = MediaType.GROUP
self.media = data
self.media: list = data
return True
else:
self.media = data[0]
self.type = get_type(self.media)
self.media: str = data[0]
self.type: MediaType = get_type(self.media)
return True
async def no_api_dl(self):
response = await aiohttp_tools.get_json(url=self.api_url)
response = await get_json(url=self.api_url)
if (
not response
or "data" not in response
or not response["data"]["shortcode_media"]
not response
or "data" not in response
or not response["data"]["shortcode_media"]
):
return
return await self.parse_ghraphql(response["data"]["shortcode_media"])
@ -67,45 +78,35 @@ class Instagram(ScraperConfig):
if not Config.API_KEYS:
return
param = {
"api_key": await self.get_key(),
"api_key": await get_key(),
"url": self.api_url,
"proxy": "residential",
"js": False,
"js": "false",
}
response = await aiohttp_tools.get_json(
response: dict | None = await get_json(
url="https://api.webscraping.ai/html", timeout=30, params=param
)
if (
not response
or "data" not in response
or not response["data"]["shortcode_media"]
not response
or "data" not in response
or not response["data"]["shortcode_media"]
):
return
self.caption = ".."
return await self.parse_ghraphql(response["data"]["shortcode_media"])
async def parse_ghraphql(self, json_: dict):
type_check = json_.get("__typename", None)
async def parse_ghraphql(self, json_: dict) -> str | list | None:
type_check: str | None = json_.get("__typename", None)
if not type_check:
return
elif type_check == "GraphSidecar":
self.media = [
self.media: list[str] = [
i["node"].get("video_url") or i["node"].get("display_url")
for i in json_["edge_sidecar_to_children"]["edges"]
]
self.type = MediaType.GROUP
self.type: MediaType = MediaType.GROUP
else:
self.media = json_.get("video_url", json_.get("display_url"))
self.thumb = json_.get("display_url")
self.type = get_type(self.media)
self.media: str = json_.get("video_url", json_.get("display_url"))
self.thumb: str = json_.get("display_url")
self.type: MediaType = get_type(self.media)
return self.media
# Rotating Key function to avoid hitting limit on single Key
async def get_key(self):
keys, count = API_KEYS.values()
count += 1
if count == len(keys):
count = 0
ret_key = keys[count]
API_KEYS["counter"] = count
return ret_key

View File

@ -12,57 +12,59 @@ class Reddit(ScraperConfig):
def __init__(self, url):
super().__init__()
parsed_url = urlparse(url)
self.url = f"https://www.reddit.com{parsed_url.path}.json?limit=1"
self.url: str = f"https://www.reddit.com{parsed_url.path}.json?limit=1"
async def download_or_extract(self):
headers = {
async def download_or_extract(self) -> None:
headers: dict = {
"user-agent": "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_8_7 rv:5.0; en-US) AppleWebKit/533.31.5 (KHTML, like Gecko) Version/4.0 Safari/533.31.5"
}
response = await get_json(url=self.url, headers=headers, json_=True)
response: dict | None = await get_json(
url=self.url, headers=headers, json_=True
)
if not response:
return
try:
json_ = response[0]["data"]["children"][0]["data"]
json_: dict = response[0]["data"]["children"][0]["data"]
except BaseException:
return
self.caption = (
self.caption: str = (
f"""__{json_["subreddit_name_prefixed"]}:__\n**{json_["title"]}**"""
)
self.thumb = json_.get("thumbnail")
self.thumb: str = json_.get("thumbnail")
if json_.get("is_gallery"):
self.media = [
self.media: list[str] = [
val["s"].get("u", val["s"].get("gif")).replace("preview", "i")
for val in json_["media_metadata"].values()
]
self.success = True
self.type = MediaType.GROUP
self.type: MediaType = MediaType.GROUP
return
hls = re.findall(r"'hls_url'\s*:\s*'([^']*)'", str(json_))
hls: list[str] = re.findall(r"'hls_url'\s*:\s*'([^']*)'", str(json_))
if hls:
self.path = "downloads/" + str(time.time())
self.path: str = "downloads/" + str(time.time())
os.makedirs(self.path)
self.media = f"{self.path}/v.mp4"
vid_url = hls[0]
self.media: str = f"{self.path}/v.mp4"
vid_url: str = hls[0]
await shell.run_shell_cmd(
f'ffmpeg -hide_banner -loglevel error -i "{vid_url.strip()}" -c copy {self.media}'
)
self.thumb = await shell.take_ss(video=self.media, path=self.path)
self.success = True
self.type = (
self.type: MediaType.VIDEO | MediaType.GIF = (
MediaType.VIDEO
if await shell.check_audio(self.media)
else MediaType.GIF
)
return
generic = json_.get("url_overridden_by_dest", "").strip()
self.type = get_type(generic)
generic: str = json_.get("url_overridden_by_dest", "").strip()
self.type: MediaType = get_type(generic)
if self.type:
self.media = generic
self.media: str = generic
self.success = True

View File

@ -10,12 +10,12 @@ from app.core.scraper_config import MediaType, ScraperConfig
class Threads(ScraperConfig):
def __init__(self, url):
super().__init__()
self.url = url
self.url: str = url
async def download_or_extract(self):
shortcode = os.path.basename(urlparse(self.url).path.rstrip("/"))
shortcode: str = os.path.basename(urlparse(self.url).path.rstrip("/"))
response = await (
response: str = await (
await aiohttp_tools.SESSION.get(
f"https://www.threads.net/t/{shortcode}/embed/"
)

View File

@ -7,18 +7,18 @@ tiktok_scraper = Tiktok_Scraper(quiet=True)
class Tiktok(ScraperConfig):
def __init__(self, url):
super().__init__()
self.url = url
self.url: str = url
async def download_or_extract(self):
media = await tiktok_scraper.hybrid_parsing(self.url)
media: dict | None = await tiktok_scraper.hybrid_parsing(self.url)
if not media or "status" not in media or media["status"] == "failed":
return
if "video_data" in media:
self.media = media["video_data"]["nwm_video_url_HQ"]
self.thumb = media["cover_data"]["dynamic_cover"]["url_list"][0]
self.media: str = media["video_data"]["nwm_video_url_HQ"]
self.thumb: str = media["cover_data"]["dynamic_cover"]["url_list"][0]
self.success = True
self.type = MediaType.VIDEO
if "image_data" in media:
self.media = media["image_data"]["no_watermark_image_list"]
self.media: list[str] = media["image_data"]["no_watermark_image_list"]
self.success = True
self.type = MediaType.GROUP

View File

@ -21,14 +21,14 @@ class FakeLogger(object):
pass
class YT_DL(ScraperConfig):
class YouTubeDL(ScraperConfig):
def __init__(self, url):
super().__init__()
self.url = url
self.path = "downloads/" + str(time.time())
self.video_path = self.path + "/v.mp4"
self.url: str = url
self.path: str = "downloads/" + str(time.time())
self.video_path: str = self.path + "/v.mp4"
self.type = MediaType.VIDEO
_opts = {
_opts: dict = {
"outtmpl": self.video_path,
"ignoreerrors": True,
"ignore_no_formats_error": True,
@ -37,17 +37,17 @@ class YT_DL(ScraperConfig):
"noplaylist": True,
"format": self.get_format(),
}
self.yt_obj = yt_dlp.YoutubeDL(_opts)
self.yt_obj: yt_dlp.YoutubeDL = yt_dlp.YoutubeDL(_opts)
async def download_or_extract(self):
info = await self.get_info()
info: dict = await self.get_info()
if not info:
return
await asyncio.to_thread(self.yt_obj.download, self.url)
if "youtu" in self.url:
self.caption = (
self.caption: str = (
f"""__{info.get("channel","")}__:\n**{info.get("title","")}**"""
)
@ -56,7 +56,7 @@ class YT_DL(ScraperConfig):
self.thumb = await take_ss(self.video_path, path=self.path)
self.success = True
async def get_info(self):
async def get_info(self) -> None | dict:
if os.path.basename(self.url).startswith("@") or "/hashtag/" in self.url:
return
info = await asyncio.to_thread(
@ -70,7 +70,7 @@ class YT_DL(ScraperConfig):
return
return info
def get_format(self):
def get_format(self) -> str:
if "/shorts" in self.url:
return "bv[ext=mp4][res=720]+ba[ext=m4a]/b[ext=mp4]"
elif "youtu" in self.url:

View File

@ -1,29 +1,37 @@
import json
import os
from typing import Callable
from pyrogram.filters import Filter
from pyrogram.types import Message
class Config:
API_KEYS = json.loads(os.environ.get("API_KEYS", "[]"))
API_KEYS: list[int] = json.loads(os.environ.get("API_KEYS", "[]"))
BLOCKED_USERS = []
BLOCKED_USERS_MESSAGE_ID = int(os.environ.get("BLOCKED_USERS_MESSAGE_ID", 0))
BLOCKED_USERS: list[int] = []
BLOCKED_USERS_MESSAGE_ID: int = int(os.environ.get("BLOCKED_USERS_MESSAGE_ID", 0))
CHATS = []
AUTO_DL_MESSAGE_ID = int(os.environ.get("AUTO_DL_MESSAGE_ID", 0))
CHATS: list[int] = []
AUTO_DL_MESSAGE_ID: int = int(os.environ.get("AUTO_DL_MESSAGE_ID", 0))
CMD_DICT = {}
CMD_DICT: dict[str, Callable] = {}
CONVO_DICT: dict[int, dict[str | int, Message | Filter | None]] = {}
DEV_MODE = int(os.environ.get("DEV_MODE", 0))
DEV_MODE: int = int(os.environ.get("DEV_MODE", 0))
DISABLED_CHATS = []
DISABLED_CHATS_MESSAGE_ID = int(os.environ.get("DISABLED_CHATS_MESSAGE_ID", 0))
DISABLED_CHATS: list[int] = []
DISABLED_CHATS_MESSAGE_ID: int = int(os.environ.get("DISABLED_CHATS_MESSAGE_ID", 0))
DUMP_ID = int(os.environ.get("DUMP_ID",0))
DUMP_ID: int = int(os.environ.get("DUMP_ID", 0))
LOG_CHAT = int(os.environ.get("LOG_CHAT"))
TRIGGER = os.environ.get("TRIGGER", ".")
LOG_CHAT: int = int(os.environ.get("LOG_CHAT"))
UPSTREAM_REPO = os.environ.get("UPSTREAM_REPO","https://github.com/anonymousx97/social-dl").rstrip("/")
TRIGGER: str = os.environ.get("TRIGGER", ".")
USERS = []
USERS_MESSAGE_ID = int(os.environ.get("USERS_MESSAGE_ID", 0))
UPSTREAM_REPO = os.environ.get(
"UPSTREAM_REPO", "https://github.com/anonymousx97/social-dl"
).rstrip("/")
USERS: list[int] = []
USERS_MESSAGE_ID: int = int(os.environ.get("USERS_MESSAGE_ID", 0))

14
app/core/aiohttp_tools.py Executable file → Normal file
View File

@ -7,10 +7,10 @@ import aiohttp
from app.core.scraper_config import MediaType
SESSION = None
SESSION: aiohttp.ClientSession | None = None
async def session_switch():
async def session_switch() -> None:
if not SESSION:
globals().update({"SESSION": aiohttp.ClientSession()})
else:
@ -23,7 +23,7 @@ async def get_json(
params: dict = None,
json_: bool = False,
timeout: int = 10,
):
) -> dict | None:
try:
async with SESSION.get(
url=url, headers=headers, params=params, timeout=timeout
@ -37,7 +37,7 @@ async def get_json(
return
async def in_memory_dl(url: str):
async def in_memory_dl(url: str) -> BytesIO:
async with SESSION.get(url) as remote_file:
bytes_data = await remote_file.read()
file = BytesIO(bytes_data)
@ -45,7 +45,7 @@ async def in_memory_dl(url: str):
return file
def get_filename(url):
def get_filename(url: str) -> str:
name = basename(urlparse(url).path.rstrip("/")).lower()
if name.endswith((".webp", ".heic")):
name = name + ".jpg"
@ -54,7 +54,7 @@ def get_filename(url):
return name
def get_type(url):
def get_type(url: str) -> MediaType | None:
name, ext = splitext(get_filename(url))
if ext in {".png", ".jpg", ".jpeg"}:
return MediaType.PHOTO
@ -64,7 +64,7 @@ def get_type(url):
return MediaType.GIF
async def thumb_dl(thumb):
async def thumb_dl(thumb) -> BytesIO | str | None:
if not thumb or not thumb.startswith("http"):
return thumb
return await in_memory_dl(thumb)

View File

@ -1,4 +1,3 @@
import base64
import glob
import importlib
import json
@ -7,14 +6,26 @@ import sys
from functools import wraps
from io import BytesIO
from pyrogram import Client, idle
from pyrogram import Client, filters, idle
from pyrogram.enums import ParseMode
from pyrogram.types import Message as Msg
from app import Config
from app.core import aiohttp_tools
from app.core.conversation import Conversation
from app.core.message import Message
async def import_modules():
for py_module in glob.glob(pathname="app/**/*.py", recursive=True):
name = os.path.splitext(py_module)[0]
py_name = name.replace("/", ".")
try:
importlib.import_module(py_name)
except Exception as e:
print(e)
class BOT(Client):
def __init__(self):
if string := os.environ.get("STRING_SESSION"):
@ -32,7 +43,8 @@ class BOT(Client):
max_concurrent_transmissions=2,
)
def add_cmd(self, cmd, trigger=Config.TRIGGER): # Custom triggers To do
@staticmethod
def add_cmd(cmd: str):
def the_decorator(func):
@wraps(func)
def wrapper():
@ -47,9 +59,22 @@ class BOT(Client):
return the_decorator
async def boot(self):
@staticmethod
async def get_response(
chat_id: int, filters: filters.Filter = None, timeout: int = 8
) -> Message | None:
try:
async with Conversation(
chat_id=chat_id, filters=filters, timeout=timeout
) as convo:
response: Message | None = await convo.get_response()
return response
except Conversation.TimeOutError:
return
async def boot(self) -> None:
await super().start()
await self.import_modules()
await import_modules()
await self.set_filter_list()
await aiohttp_tools.session_switch()
await self.edit_restart_msg()
@ -58,7 +83,7 @@ class BOT(Client):
await idle()
await aiohttp_tools.session_switch()
async def edit_restart_msg(self):
async def edit_restart_msg(self) -> None:
restart_msg = int(os.environ.get("RESTART_MSG", 0))
restart_chat = int(os.environ.get("RESTART_CHAT", 0))
if restart_msg and restart_chat:
@ -71,24 +96,26 @@ class BOT(Client):
os.environ.pop("RESTART_MSG", "")
os.environ.pop("RESTART_CHAT", "")
async def import_modules(self):
for py_module in glob.glob("app/**/*.py", recursive=True):
name = os.path.splitext(py_module)[0]
py_name = name.replace("/", ".")
importlib.import_module(py_name)
async def log(
self,
text="",
traceback="",
chat=None,
func=None,
message: Message | Msg | None = None,
name="log.txt",
disable_web_page_preview=True,
parse_mode=ParseMode.HTML,
):
) -> Message | Msg:
if message:
return (await message.copy(chat_id=Config.LOG_CHAT)) # fmt: skip
if traceback:
text = f"#Traceback\n<b>Function:</b> {func}\n<b>Chat:</b> {chat}\n<b>Traceback:</b>\n<code>{traceback}</code>"
text = f"""
#Traceback
<b>Function:</b> {func}
<b>Chat:</b> {chat}
<b>Traceback:</b>
<code>{traceback}</code>"""
return await self.send_message(
chat_id=Config.LOG_CHAT,
text=text,
@ -97,13 +124,27 @@ class BOT(Client):
parse_mode=parse_mode,
)
async def restart(self):
async def restart(self, hard=False) -> None:
await aiohttp_tools.session_switch()
await super().stop(block=False)
if hard:
os.execl("/bin/bash", "/bin/bash", "run")
os.execl(sys.executable, sys.executable, "-m", "app")
SECRET_API = base64.b64decode("YS56dG9yci5tZS9hcGkvaW5zdGE=").decode("utf-8")
async def send_message(
self, chat_id: int | str, text, name: str = "output.txt", **kwargs
) -> Message | Msg:
text = str(text)
if len(text) < 4096:
return Message.parse_message(
(await super().send_message(chat_id=chat_id, text=text, **kwargs))
)
doc = BytesIO(bytes(text, encoding="utf-8"))
doc.name = name
kwargs.pop("disable_web_page_preview", "")
return await super().send_document(chat_id=chat_id, document=doc, **kwargs)
async def set_filter_list(self):
chats_id = Config.AUTO_DL_MESSAGE_ID
blocked_id = Config.BLOCKED_USERS_MESSAGE_ID
@ -126,13 +167,3 @@ class BOT(Client):
Config.DISABLED_CHATS = json.loads(
(await super().get_messages(Config.LOG_CHAT, disabled)).text
)
async def send_message(self, chat_id, text, name: str = "output.txt", **kwargs):
if len(str(text)) < 4096:
return Message.parse_message(
(await super().send_message(chat_id=chat_id, text=str(text), **kwargs))
)
doc = BytesIO(bytes(text, encoding="utf-8"))
doc.name = name
kwargs.pop("disable_web_page_preview", "")
return await super().send_document(chat_id=chat_id, document=doc, **kwargs)

47
app/core/conversation.py Normal file
View File

@ -0,0 +1,47 @@
import asyncio
import json
from pyrogram.filters import Filter
from pyrogram.types import Message
from app import Config
class Conversation:
class DuplicateConvo(Exception):
def __init__(self, chat: str | int | None = None):
text = "Conversation already started"
if chat:
text += f" with {chat}"
super().__init__(text)
class TimeOutError(Exception):
def __init__(self):
super().__init__("Conversation Timeout")
def __init__(self, chat_id: int, filters: Filter | None = None, timeout: int = 10):
self.chat_id = chat_id
self.filters = filters
self.timeout = timeout
def __str__(self):
return json.dumps(self.__dict__, indent=4, ensure_ascii=False)
async def get_response(self, timeout: int | None = None) -> Message | None:
try:
async with asyncio.timeout(timeout or self.timeout):
while not Config.CONVO_DICT[self.chat_id]["response"]:
await asyncio.sleep(0)
return Config.CONVO_DICT[self.chat_id]["response"]
except asyncio.TimeoutError:
raise self.TimeOutError
async def __aenter__(self) -> "Conversation":
if self.chat_id in Config.CONVO_DICT:
raise self.DuplicateConvo(self.chat_id)
convo_dict = {"filters": self.filters, "response": None}
Config.CONVO_DICT[self.chat_id] = convo_dict
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
Config.CONVO_DICT.pop(self.chat_id, "")

View File

@ -1,14 +1,15 @@
from urllib.parse import urlparse
from pyrogram import filters as _filters
from pyrogram.types import Message
from app import Config
from app.core.media_handler import url_map
def check_for_urls(text_list):
def check_for_urls(text_list: list):
for link in text_list:
if match := url_map.get(urlparse(link).netloc):
if url_map.get(urlparse(link).netloc):
return True
else:
for key in url_map.keys():
@ -16,7 +17,7 @@ def check_for_urls(text_list):
return True
def dynamic_chat_filter(_, __, message, cmd=False):
def dynamic_chat_filter(_, __, message: Message, cmd=False) -> bool:
if (
not message.text
or (not message.text.startswith("https") and not cmd)
@ -34,17 +35,16 @@ def dynamic_chat_filter(_, __, message, cmd=False):
return bool(url_check)
def dynamic_allowed_list(_, __, message):
def dynamic_allowed_list(_, __, message: Message) -> bool:
if not dynamic_chat_filter(_, __, message, cmd=True):
return False
start_str = message.text.split(maxsplit=1)[0]
cmd = start_str.replace("/", "", 1)
cmd_check = cmd in {"download", "dl", "down"}
cmd = message.text.split(maxsplit=1)[0]
cmd_check = cmd in {"/download", "/dl", "/down"}
reaction_check = not message.reactions
return bool(cmd_check and reaction_check)
def dynamic_cmd_filter(_, __, message):
def dynamic_cmd_filter(_, __, message: Message) -> bool:
if (
not message.text
or not message.text.startswith(Config.TRIGGER)
@ -63,3 +63,7 @@ def dynamic_cmd_filter(_, __, message):
chat_filter = _filters.create(dynamic_chat_filter)
user_filter = _filters.create(dynamic_cmd_filter)
allowed_cmd_filter = _filters.create(dynamic_allowed_list)
convo_filter = _filters.create(
lambda _, __, message: (message.chat.id in Config.CONVO_DICT)
and (not message.reactions)
)

View File

@ -1,82 +1,98 @@
import asyncio
import glob
import json
import os
import traceback
from functools import lru_cache
from io import BytesIO
from urllib.parse import urlparse
from pyrogram.errors import MediaEmpty, PhotoSaveFileInvalid, WebpageCurlFailed
from pyrogram.types import InputMediaPhoto, InputMediaVideo
from app import Config
from app.api.gallerydl import Gallery_DL
from app import Config, Message, bot
from app.api.gallery_dl import GalleryDL
from app.api.instagram import Instagram
from app.api.reddit import Reddit
from app.api.threads import Threads
from app.api.tiktok import Tiktok
from app.api.ytdl import YT_DL
from app.api.ytdl import YouTubeDL
from app.core import aiohttp_tools, shell
from app.core.scraper_config import MediaType
url_map = {
url_map: dict = {
"tiktok.com": Tiktok,
"www.instagram.com": Instagram,
"www.reddit.com": Reddit,
"reddit.com": Reddit,
"www.threads.net": Threads,
"twitter.com": Gallery_DL,
"youtube.com": YT_DL,
"youtu.be": YT_DL,
"www.facebook.com": YT_DL,
"twitter.com": GalleryDL,
"x.com": GalleryDL,
"www.x.com": GalleryDL,
"youtube.com": YouTubeDL,
"youtu.be": YouTubeDL,
"www.facebook.com": YouTubeDL,
}
@lru_cache()
def get_url_dict_items():
return url_map.items()
class MediaHandler:
def __init__(self, message):
self.exceptions, self.media_objects, self.sender_dict = [], [], {}
self.__client = message._client
self.message = message
self.doc = "-d" in message.flags
self.spoiler = "-s" in message.flags
self.args_ = {
def __init__(self, message: Message) -> None:
self.exceptions = []
self.media_objects: list[asyncio.Task.result] = []
self.sender_dict = {}
self.__client: bot = message._client
self.message: Message = message
self.doc: bool = "-d" in message.flags
self.spoiler: bool = "-s" in message.flags
self.args_: dict = {
"chat_id": self.message.chat.id,
"reply_to_message_id": message.reply_id,
}
def get_sender(self, reply=False):
def __str__(self):
return json.dumps(self.__dict__, indent=4, ensure_ascii=False)
def get_sender(self, reply: bool = False) -> str:
if "-ns" in self.message.flags:
return ""
text = f"\nShared by : "
author = self.message.author_signature
sender = user.first_name if (user := self.message.from_user) else ""
reply_sender = ""
text: str = f"\nShared by : "
author: str | None = self.message.author_signature
sender: str = user.first_name if (user := self.message.from_user) else ""
reply_sender: str = ""
if reply:
reply_msg = self.message.replied
reply_sender = (
reply_sender: str = (
reply_user.first_name if (reply_user := reply_msg.from_user) else ""
)
if any((author, sender, reply_sender)):
return text + (author or sender if not reply else reply_sender)
else:
return ""
return ""
async def get_media(self):
async def get_media(self) -> None:
async with asyncio.TaskGroup() as task_group:
tasks = []
text_list = self.message.text_list
reply_text_list = self.message.reply_text_list
tasks: list[asyncio.Task] = []
text_list: list[str] = self.message.text_list
reply_text_list: list[str] = self.message.reply_text_list
for link in text_list + reply_text_list:
reply = link in reply_text_list
reply: bool = link in reply_text_list
if match := url_map.get(urlparse(link).netloc):
tasks.append(task_group.create_task(match.start(link)))
self.sender_dict[link] = self.get_sender(reply=reply)
else:
for key, val in url_map.items():
for key, val in get_url_dict_items():
if key in link:
tasks.append(task_group.create_task(val.start(link)))
self.sender_dict[link] = self.get_sender(reply=reply)
self.media_objects = [task.result() for task in tasks if task.result()]
self.media_objects: list[asyncio.Task.result] = [
task.result() for task in tasks if task.result()
]
async def send_media(self):
async def send_media(self) -> None:
for obj in self.media_objects:
if "-nc" in self.message.flags:
caption = ""
@ -122,25 +138,33 @@ class MediaHandler:
"\n".join([obj.caption_url.strip(), traceback.format_exc()])
)
async def send(self, media, method, **kwargs):
async def send(self, media: dict | str, method, caption: str, **kwargs) -> Message:
try:
try:
post = await method(
**media, **self.args_, **kwargs, has_spoiler=self.spoiler
post: Message = await method(
**media,
**self.args_,
caption=caption,
**kwargs,
has_spoiler=self.spoiler,
)
except (MediaEmpty, WebpageCurlFailed):
key, value = list(media.items())[0]
media[key] = await aiohttp_tools.in_memory_dl(value)
post = await method(
**media, **self.args_, **kwargs, has_spoiler=self.spoiler
post: Message = await method(
**media,
**self.args_,
caption=caption,
**kwargs,
has_spoiler=self.spoiler,
)
except PhotoSaveFileInvalid:
post = await self.__client.send_document(
post: Message = await self.__client.send_document(
**self.args_, document=media, caption=caption, force_document=True
)
return post
async def send_document(self, docs, caption, path=""):
async def send_document(self, docs: list, caption: str, path="") -> None:
if not path:
docs = await asyncio.gather(
*[aiohttp_tools.in_memory_dl(doc) for doc in docs]
@ -154,8 +178,8 @@ class MediaHandler:
)
await asyncio.sleep(0.5)
async def send_group(self, media_obj, caption):
sorted = await sort_media(
async def send_group(self, media_obj, caption: str) -> None:
sorted: list[str, list[InputMediaVideo | InputMediaPhoto]] = await sort_media(
caption=caption,
spoiler=self.spoiler,
urls=media_obj.media,
@ -182,18 +206,22 @@ class MediaHandler:
return obj
async def sort_media(caption="", spoiler=False, urls=None, path=None):
async def sort_media(
caption="", spoiler=False, urls=None, path=None
) -> list[str, list[InputMediaVideo | InputMediaPhoto]]:
images, videos, animations = [], [], []
if path and os.path.exists(path):
[os.rename(file_, file_ + ".png") for file_ in glob.glob(f"{path}/*.webp")]
media = glob.glob(f"{path}/*")
media: list[str] = glob.glob(f"{path}/*")
else:
media = await asyncio.gather(*[aiohttp_tools.in_memory_dl(url) for url in urls])
media: tuple[BytesIO] = await asyncio.gather(
*[aiohttp_tools.in_memory_dl(url) for url in urls]
)
for file in media:
if path:
name = file.lower()
name: str = file.lower()
else:
name = file.name.lower()
name: str = file.name.lower()
if name.endswith((".png", ".jpg", ".jpeg")):
images.append(InputMediaPhoto(file, caption=caption, has_spoiler=spoiler))
elif name.endswith((".mp4", ".mkv", ".webm")):
@ -208,7 +236,9 @@ async def sort_media(caption="", spoiler=False, urls=None, path=None):
return await make_chunks(images, videos, animations)
async def make_chunks(images=[], videos=[], animations=[]):
async def make_chunks(
images=[], videos=[], animations=[]
) -> list[str, list[InputMediaVideo | InputMediaPhoto]]:
chunk_imgs = [images[imgs : imgs + 5] for imgs in range(0, len(images), 5)]
chunk_vids = [videos[vids : vids + 5] for vids in range(0, len(videos), 5)]
return [*chunk_imgs, *chunk_vids, *animations]

97
app/core/message.py Executable file → Normal file
View File

@ -2,61 +2,61 @@ import asyncio
from functools import cached_property
from pyrogram.errors import MessageDeleteForbidden
from pyrogram.types import Message as MSG
from pyrogram.filters import Filter
from pyrogram.types import Message as Msg
from pyrogram.types import User
from app import Config
from app.core.conversation import Conversation
class Message(MSG):
def __init__(self, message):
super().__dict__.update(message.__dict__)
class Message(Msg):
def __init__(self, message: Msg) -> None:
args = vars(message)
args["client"] = args.pop("_client", None)
super().__init__(**args)
@cached_property
def cmd(self):
def cmd(self) -> str | None:
raw_cmd = self.text_list[0]
cmd = raw_cmd.lstrip(Config.TRIGGER)
cmd = raw_cmd[1:]
return cmd if cmd in Config.CMD_DICT else None
@cached_property
def flags(self):
def flags(self) -> list:
return [i for i in self.text_list if i.startswith("-")]
@cached_property
def flt_input(self):
split_lines = self.input.splitlines()
split_n_joined = [
" ".join([word for word in line.split(" ") if word not in self.flags])
for line in split_lines
]
return "\n".join(split_n_joined)
def flt_input(self) -> str:
split_lines = self.input.split("\n", maxsplit=1)
split_lines[0] = " ".join(
[word for word in split_lines[0].split(" ") if word not in self.flags]
)
return "\n".join(split_lines)
@cached_property
def input(self):
def input(self) -> str:
if len(self.text_list) > 1:
return self.text.split(maxsplit=1)[-1]
return ""
@cached_property
def replied(self):
def replied(self) -> "Message":
if self.reply_to_message:
return Message.parse_message(self.reply_to_message)
@cached_property
def reply_id(self):
def reply_id(self) -> int | None:
return self.replied.id if self.replied else None
@cached_property
def replied_task_id(self):
def replied_task_id(self) -> str | None:
return self.replied.task_id if self.replied else None
@cached_property
def reply_text_list(self):
def reply_text_list(self) -> list:
reply_text_list = []
if (
self.replied
and (reply_text := self.replied.text)
and "dl" in self.text_list[0]
):
if self.replied and self.replied.text and "dl" in self.text_list[0]:
reply_text_list = self.replied.text_list
return reply_text_list
@ -65,10 +65,10 @@ class Message(MSG):
return f"{self.chat.id}-{self.id}"
@cached_property
def text_list(self):
def text_list(self) -> list:
return self.text.split()
async def async_deleter(self, del_in, task, block):
async def async_deleter(self, del_in, task, block) -> None:
if block:
x = await task
await asyncio.sleep(del_in)
@ -78,7 +78,7 @@ class Message(MSG):
self.async_deleter(del_in=del_in, task=task, block=True)
)
async def delete(self, reply=False):
async def delete(self, reply: bool = False) -> None:
try:
await super().delete()
if reply and self.replied:
@ -86,7 +86,7 @@ class Message(MSG):
except MessageDeleteForbidden:
pass
async def edit(self, text, del_in: int = 0, block=True, **kwargs):
async def edit(self, text, del_in: int = 0, block=True, **kwargs) -> "Message":
if len(str(text)) < 4096:
kwargs.pop("name", "")
task = self.edit_text(text, **kwargs)
@ -100,7 +100,41 @@ class Message(MSG):
)
return reply
async def reply(self, text, del_in: int = 0, block=True, **kwargs):
async def extract_user_n_reason(self) -> list[User | str | Exception, str | None]:
if self.replied:
return [self.replied.from_user, self.flt_input]
inp_list = self.flt_input.split(maxsplit=1)
if not inp_list:
return [
"Unable to Extract User info.\nReply to a user or input @ | id.",
"",
]
user = inp_list[0]
reason = None
if len(inp_list) >= 2:
reason = inp_list[1]
if user.isdigit():
user = int(user)
elif user.startswith("@"):
user = user.strip("@")
try:
return [await self._client.get_users(user_ids=user), reason]
except Exception as e:
return [e, reason]
async def get_response(self, filters: Filter = None, timeout: int = 8):
try:
async with Conversation(
chat_id=self.chat.id, filters=filters, timeout=timeout
) as convo:
response: Message | None = await convo.get_response()
return response
except Conversation.TimeOutError:
return
async def reply(
self, text, del_in: int = 0, block: bool = True, **kwargs
) -> "Message":
task = self._client.send_message(
chat_id=self.chat.id, text=text, reply_to_message_id=self.id, **kwargs
)
@ -110,6 +144,5 @@ class Message(MSG):
return await task
@classmethod
def parse_message(cls, message):
ret_obj = cls(message)
return ret_obj
def parse_message(cls, message: Msg) -> "Message":
return cls(message)

View File

@ -1,5 +1,7 @@
import json
import shutil
from enum import Enum, auto
from io import BytesIO
class MediaType(Enum):
@ -11,22 +13,25 @@ class MediaType(Enum):
class ScraperConfig:
def __init__(self):
self.dump = False
self.in_dump = False
self.path = ""
self.media = ""
self.caption = ""
self.caption_url = ""
self.thumb = None
self.type = None
self.success = False
def __init__(self) -> None:
self.dump: bool = False
self.in_dump: bool = False
self.path: str = ""
self.media: str | BytesIO = ""
self.caption: str = ""
self.caption_url: str = ""
self.thumb: str | None | BytesIO = None
self.type: None | MediaType = None
self.success: bool = False
def set_sauce(self, url):
def __str__(self):
return json.dumps(self.__dict__, indent=4, ensure_ascii=False, default=str)
def set_sauce(self, url: str) -> None:
self.caption_url = f"\n\n<a href='{url}'>Sauce</a>"
@classmethod
async def start(cls, url):
async def start(cls, url: str) -> "ScraperConfig":
obj = cls(url=url)
obj.query_url = url
obj.set_sauce(url)
@ -34,6 +39,6 @@ class ScraperConfig:
if obj.success:
return obj
def cleanup(self):
def cleanup(self) -> None:
if self.path:
shutil.rmtree(self.path, ignore_errors=True)

34
app/core/shell.py Executable file → Normal file
View File

@ -1,8 +1,9 @@
import asyncio
import os
from typing import AsyncIterable
async def take_ss(video: str, path: str):
async def take_ss(video: str, path: str) -> None | str:
thumb = f"{path}/i.png"
await run_shell_cmd(
f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{thumb}"'''
@ -11,15 +12,15 @@ async def take_ss(video: str, path: str):
return thumb
async def check_audio(file):
async def check_audio(file) -> int:
result = await run_shell_cmd(
f"ffprobe -v error -show_entries format=nb_streams -of default=noprint_wrappers=1:nokey=1 {file}"
)
return int(result or 0) - 1
async def run_shell_cmd(cmd):
proc = await asyncio.create_subprocess_shell(
async def run_shell_cmd(cmd: str) -> str:
proc: asyncio.create_subprocess_shell = await asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
)
stdout, _ = await proc.communicate()
@ -27,38 +28,37 @@ async def run_shell_cmd(cmd):
class AsyncShell:
def __init__(self, process):
self.process = process
self.full_std = ""
self.is_done = False
def __init__(self, process: asyncio.create_subprocess_shell):
self.process: asyncio.create_subprocess_shell = process
self.full_std: str = ""
self.is_done: bool = False
self._task: asyncio.Task | None = None
async def read_output(self):
async def read_output(self) -> None:
while True:
line = (await self.process.stdout.readline()).decode("utf-8")
line: str = (await self.process.stdout.readline()).decode("utf-8")
if not line:
break
self.full_std += line
self.is_done = True
await self.process.wait()
async def get_output(self):
async def get_output(self) -> AsyncIterable:
while not self.is_done:
yield self.full_std
def cancel(self):
def cancel(self) -> None:
if not self.is_done:
self.process.kill()
self._task.cancel()
@classmethod
async def run_cmd(cls, cmd, name="shell"):
sub_process = cls(
async def run_cmd(cls, cmd: str, name: str = "AsyncShell") -> "AsyncShell":
sub_process: AsyncShell = cls(
process=await asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
)
)
sub_process._task = asyncio.create_task(
sub_process.read_output(), name="AsyncShell"
)
sub_process._task = asyncio.create_task(sub_process.read_output(), name=name)
await asyncio.sleep(0.5)
return sub_process

144
app/plugins/admin_tools.py Normal file
View File

@ -0,0 +1,144 @@
import asyncio
from typing import Awaitable
from pyrogram.types import ChatPermissions, ChatPrivileges, User
from app import bot
from app.core.message import Message
def get_privileges(
anon: bool = False, full: bool = False, without_rights: bool = False
) -> ChatPrivileges:
if without_rights:
return ChatPrivileges(
can_manage_chat=True,
can_manage_video_chats=False,
can_pin_messages=False,
can_delete_messages=False,
can_change_info=False,
can_restrict_members=False,
can_invite_users=False,
can_promote_members=False,
is_anonymous=False,
)
return ChatPrivileges(
can_manage_chat=True,
can_manage_video_chats=True,
can_pin_messages=True,
can_delete_messages=True,
can_change_info=True,
can_restrict_members=True,
can_invite_users=True,
can_promote_members=full,
is_anonymous=anon,
)
@bot.add_cmd(cmd=["promote", "demote"])
async def promote_or_demote(bot: bot, message: Message) -> None:
response: Message = await message.reply(
f"Trying to {message.cmd.capitalize()}....."
)
user, title = await message.extract_user_n_reason()
if not isinstance(user, User):
await response.edit(user, del_in=10)
return
full: bool = "-full" in message.flags
anon: bool = "-anon" in message.flags
without_rights = "-wr" in message.flags
promote = message.cmd == "promote"
if promote:
privileges: ChatPrivileges = get_privileges(
full=full, anon=anon, without_rights=without_rights
)
else:
privileges = ChatPrivileges(can_manage_chat=False)
response_text = f"{message.cmd.capitalize()}d: {user.mention}"
try:
await bot.promote_chat_member(
chat_id=message.chat.id, user_id=user.id, privileges=privileges
)
if promote:
# Let server promote admin before setting title
# Bot is too fast moment 😂😂😂
await asyncio.sleep(1)
await bot.set_administrator_title(
chat_id=message.chat.id, user_id=user.id, title=title or "Admin"
)
if title:
response_text += f"\nTitle: {title}"
if without_rights:
response_text += "\nWithout Rights: True"
await response.edit(text=response_text)
except Exception as e:
await response.edit(text=e, del_in=10, block=True)
@bot.add_cmd(cmd=["ban", "unban"])
async def ban_or_unban(bot: bot, message: Message) -> None:
user, reason = await message.extract_user_n_reason()
if not isinstance(user, User):
await message.reply(user, del_in=10)
return
if message.cmd == "ban":
action: Awaitable = bot.ban_chat_member(
chat_id=message.chat.id, user_id=user.id
)
else:
action: Awaitable = bot.unban_chat_member(
chat_id=message.chat.id, user_id=user.id
)
try:
await action
await message.reply(
text=f"{message.cmd.capitalize()}ned: {user.mention}\nReason: {reason}."
)
except Exception as e:
await message.reply(text=e, del_in=10)
@bot.add_cmd(cmd="kick")
async def kick_user(bot, message: Message):
user, reason = await message.extract_user_n_reason()
if not isinstance(user, User):
await message.reply(user, del_in=10)
return
try:
await bot.ban_chat_member(chat_id=message.chat.id, user_id=user.id)
await asyncio.sleep(1)
await bot.unban_chat_member(chat_id=message.chat.id, user_id=user.id)
await message.reply(
text=f"{message.cmd.capitalize()}ed: {user.mention}\nReason: {reason}."
)
except Exception as e:
await message.reply(text=e, del_in=10)
@bot.add_cmd(cmd=["mute", "unmute"])
async def mute_or_unmute(bot: bot, message: Message):
user, reason = await message.extract_user_n_reason()
if not isinstance(user, User):
await message.reply(user, del_in=10)
return
perms = message.chat.permissions
if message.cmd == "mute":
perms = ChatPermissions(
can_send_messages=False,
can_pin_messages=False,
can_invite_users=False,
can_change_info=False,
can_send_media_messages=False,
can_send_polls=False,
can_send_other_messages=False,
can_add_web_page_previews=False,
)
try:
await bot.restrict_chat_member(
chat_id=message.chat.id, user_id=user.id, permissions=perms
)
await message.reply(
text=f"{message.cmd.capitalize()}d: {user.mention}\nReason: {reason}."
)
except Exception as e:
await message.reply(text=e, del_in=10)

View File

@ -1,9 +1,13 @@
from typing import Callable
from pyrogram.errors import MessageNotModified
from app import Config, bot
from app import Config, Message, bot
async def add_or_remove(mode, task, item, config_list, message_id):
async def add_or_remove(
mode: str, task: Callable, item: int, config_list: list, message_id: int
) -> None | str:
err = None
if item in config_list and mode == "add":
return "ID Already in List"
@ -21,7 +25,7 @@ async def add_or_remove(mode, task, item, config_list, message_id):
return err
def extract_user(message):
def extract_user(message: Message) -> tuple:
user, err = message.input.strip(), None
if not Config.USERS_MESSAGE_ID:
return user, "You haven't added `USERS_MESSAGE_ID` Var, Add it."
@ -36,12 +40,12 @@ def extract_user(message):
return user, err
def extract_chat(message):
def extract_chat(message: Message) -> tuple:
chat, err = message.input.strip() or message.chat.id, None
if not Config.AUTO_DL_MESSAGE_ID:
return user, "You haven't added `AUTO_DL_MESSAGE_ID` Var, Add it."
return chat, "You haven't added `AUTO_DL_MESSAGE_ID` Var, Add it."
if not chat:
return user, "Unable to Extract Chat IDs. Try again."
return chat, "Unable to Extract Chat IDs. Try again."
try:
chat = int(chat)
except ValueError:
@ -50,19 +54,19 @@ def extract_chat(message):
@bot.add_cmd(cmd=["addsudo", "delsudo"])
async def add_or_remove_sudo(bot, message):
async def add_or_remove_sudo(bot: bot, message: Message) -> Message | None:
user, err = extract_user(message)
if err:
return await message.reply(err)
if message.cmd == "addsudo":
mode = "add"
task = Config.USERS.append
action = "Added to"
mode: str = "add"
task: Callable = Config.USERS.append
action: str = "Added to"
else:
mode = "remove"
task = Config.USERS.remove
action = "Removed from"
mode: str = "remove"
task: Callable = Config.USERS.remove
action: str = "Removed from"
if err := await add_or_remove(
mode=mode,
@ -76,19 +80,19 @@ async def add_or_remove_sudo(bot, message):
@bot.add_cmd(cmd=["addchat", "delchat"])
async def add_or_remove_chat(bot, message):
async def add_or_remove_chat(bot: bot, message: Message) -> Message | None:
chat, err = extract_chat(message)
if err:
return await message.reply(err)
if message.cmd == "addchat":
mode = "add"
task = Config.CHATS.append
action = "Added to"
mode: str = "add"
task: Callable = Config.CHATS.append
action: str = "Added to"
else:
mode = "remove"
task = Config.CHATS.remove
action = "Removed from"
mode: str = "remove"
task: Callable = Config.CHATS.remove
action: str = "Removed from"
if err := await add_or_remove(
mode=mode,
@ -105,19 +109,19 @@ async def add_or_remove_chat(bot, message):
@bot.add_cmd(cmd=["block", "unblock"])
async def block_or_unblock(bot, message):
async def block_or_unblock(bot: bot, message: Message) -> Message | None:
user, err = extract_user(message)
if err:
return await message.reply(err)
if message.cmd == "block":
mode = "add"
task = Config.BLOCKED_USERS.append
action = "Added to"
mode: str = "add"
task: Callable = Config.BLOCKED_USERS.append
action: str = "Added to"
else:
mode = "remove"
task = Config.BLOCKED_USERS.remove
action = "Removed from"
mode: str = "remove"
task: Callable = Config.BLOCKED_USERS.remove
action: str = "Removed from"
if err := await add_or_remove(
mode=mode,
@ -131,18 +135,18 @@ async def block_or_unblock(bot, message):
@bot.add_cmd(cmd=["enable", "disable"])
async def auto_dl_trigger(bot, message):
async def auto_dl_trigger(bot: bot, message: Message) -> Message | None:
if not Config.DISABLED_CHATS_MESSAGE_ID:
return await message.reply("You haven't added `DISABLED_CHATS_ID` Var, Add it.")
if message.cmd == "disable":
mode = "add"
task = Config.DISABLED_CHATS.append
action = "Added to"
mode: str = "add"
task: Callable = Config.DISABLED_CHATS.append
action: str = "Added to"
else:
mode = "remove"
task = Config.DISABLED_CHATS.remove
action = "Removed from"
mode: str = "remove"
task: Callable = Config.DISABLED_CHATS.remove
action: str = "Removed from"
if err := await add_or_remove(
mode=mode,

View File

@ -6,31 +6,31 @@ from io import StringIO
from pyrogram.enums import ParseMode
from app import Config
from app.core import shell
from app.core import aiohttp_tools as aio # isort:skip
from app import Config, Message, bot
from app.core import shell, aiohttp_tools as aio # isort:skip
# Run shell commands
async def run_cmd(bot, message):
cmd = message.input.strip()
reply = await message.reply("executing...")
async def run_cmd(bot: bot, message: Message) -> Message | None:
cmd: str = message.input.strip()
reply: Message = await message.reply("executing...")
try:
proc_stdout = await asyncio.Task(shell.run_shell_cmd(cmd), name=reply.task_id)
proc_stdout: str = await asyncio.Task(
shell.run_shell_cmd(cmd), name=reply.task_id
)
except asyncio.exceptions.CancelledError:
return await reply.edit("`Cancelled...`")
output = f"`${cmd}`\n\n`{proc_stdout}`"
output: str = f"~$`{cmd}`\n\n`{proc_stdout}`"
return await reply.edit(output, name="sh.txt", disable_web_page_preview=True)
# Shell but Live Output
async def live_shell(bot, message):
cmd = message.input.strip()
reply = await message.reply("`getting live output....`")
sub_process = await shell.AsyncShell.run_cmd(cmd)
sleep_for = 1
output = ""
# Shell with Live Output
async def live_shell(bot: bot, message: Message) -> Message | None:
cmd: str = message.input.strip()
reply: Message = await message.reply("`getting live output....`")
sub_process: shell.AsyncShell = await shell.AsyncShell.run_cmd(cmd)
sleep_for: int = 1
output: str = ""
try:
async for stdout in sub_process.get_output():
if output != stdout:
@ -46,7 +46,7 @@ async def live_shell(bot, message):
await asyncio.Task(asyncio.sleep(sleep_for), name=reply.task_id)
sleep_for += 1
return await reply.edit(
f"`$ {cmd}\n\n``{sub_process.full_std}`",
f"~$`{cmd}\n\n``{sub_process.full_std}`",
name="shell.txt",
disable_web_page_preview=True,
)
@ -58,11 +58,11 @@ async def live_shell(bot, message):
# Run Python code
async def executor(bot, message):
code = message.flt_input.strip()
async def executor(bot: bot, message: Message) -> Message | None:
code: str = message.flt_input.strip()
if not code:
return await message.reply("exec Jo mama?")
reply = await message.reply("executing")
reply: Message = await message.reply("executing")
sys.stdout = codeOut = StringIO()
sys.stderr = codeErr = StringIO()
# Indent code as per proper python syntax
@ -81,10 +81,15 @@ async def executor(bot, message):
sys.stderr = sys.__stderr__
output = codeErr.getvalue().strip() or codeOut.getvalue().strip()
if func_out is not None:
output = "\n\n".join([output, str(func_out)]).strip()
if "-s" not in message.flags:
output = f"{output}\n\n{func_out}".strip()
elif not output and "-s" in message.flags:
await reply.delete()
return
if "-s" in message.flags:
output = f">> `{output}`"
else:
output = f"> `{code}`\n\n>> `{output}`"
return await reply.edit(
await reply.edit(
output,
name="exec.txt",
disable_web_page_preview=True,
@ -92,17 +97,17 @@ async def executor(bot, message):
)
async def loader(bot, message):
async def loader(bot: bot, message: Message) -> Message | None:
if (
not message.replied
or not message.replied.document
or not message.replied.document.file_name.endswith(".py")
):
return await message.reply("reply to a plugin.")
reply = await message.reply("Loading....")
file_name = message.replied.document.file_name.rstrip(".py")
reply: Message = await message.reply("Loading....")
file_name: str = message.replied.document.file_name.rstrip(".py")
reload = sys.modules.pop(f"app.temp.{file_name}", None)
status = "Reloaded" if reload else "Loaded"
status: str = "Reloaded" if reload else "Loaded"
await message.replied.download("app/temp/")
try:
importlib.import_module(f"app.temp.{file_name}")
@ -111,6 +116,26 @@ async def loader(bot, message):
await reply.edit(f"{status} {file_name}.py.")
@bot.add_cmd(cmd="c")
async def cancel_task(bot: bot, message: Message) -> Message | None:
task_id: str | None = message.replied_task_id
if not task_id:
return await message.reply(
text="Reply To a Command or Bot's Response Message.", del_in=8
)
all_tasks: set[asyncio.all_tasks] = asyncio.all_tasks()
tasks: list[asyncio.Task] | None = [x for x in all_tasks if x.get_name() == task_id]
if not tasks:
return await message.reply(
text="Task not in Currently Running Tasks.", del_in=8
)
response: str = ""
for task in tasks:
status: bool = task.cancel()
response += f"Task: __{task.get_name()}__\nCancelled: __{status}__\n"
await message.reply(response, del_in=5)
if Config.DEV_MODE:
Config.CMD_DICT["sh"] = run_cmd
Config.CMD_DICT["shell"] = live_shell

View File

@ -6,7 +6,7 @@ from urllib.parse import urlparse
import yt_dlp
from app import bot
from app import Message, bot
from app.api.ytdl import FakeLogger
from app.core.aiohttp_tools import in_memory_dl
@ -21,43 +21,41 @@ domains = [
@bot.add_cmd(cmd="song")
async def song_dl(bot, message):
async def song_dl(bot: bot, message: Message) -> None | Message:
reply_query = None
audio_file = None
artist = None
if message.replied:
for link in message.replied.text.split():
for link in message.replied.text_list:
if urlparse(link).netloc in domains:
reply_query = link
break
query = reply_query or message.flt_input
if not query:
return await message.reply("Give a song name or link to download.")
response = await message.reply("Searching....")
dl_path = f"downloads/{time()}/"
query_or_search = query if query.startswith("http") else f"ytsearch:{query}"
response: Message = await message.reply("Searching....")
dl_path: str = f"downloads/{time()}/"
query_or_search: str = query if query.startswith("http") else f"ytsearch:{query}"
if "-m" in message.flags:
aformat = "mp3"
a_format = "mp3"
else:
aformat = "opus"
a_format = "opus"
yt_opts = {
"logger": FakeLogger(),
"outtmpl": dl_path + "%(title)s.%(ext)s",
"format": "bestaudio",
"postprocessors": [
{"key": "FFmpegExtractAudio", "preferredcodec": aformat},
{"key": "FFmpegExtractAudio", "preferredcodec": a_format},
{"key": "FFmpegMetadata"},
{"key": "EmbedThumbnail"},
],
}
ytdl = yt_dlp.YoutubeDL(yt_opts)
yt_info = await asyncio.to_thread(ytdl.extract_info, query_or_search)
ytdl: yt_dlp.YoutubeDL = yt_dlp.YoutubeDL(yt_opts)
yt_info: dict = await asyncio.to_thread(ytdl.extract_info, query_or_search)
if not query_or_search.startswith("http"):
yt_info = yt_info["entries"][0]
duration = yt_info["duration"]
artist = yt_info["channel"]
yt_info: str = yt_info["entries"][0]
duration: int = yt_info["duration"]
artist: str = yt_info["channel"]
thumb = await in_memory_dl(yt_info["thumbnail"])
down_path = glob.glob(dl_path + "*")
down_path: list = glob.glob(dl_path + "*")
if not down_path:
return await response.edit("Not found")
await response.edit("Uploading....")

View File

@ -2,36 +2,38 @@ import os
from pyrogram.enums import ChatType
from pyrogram.errors import BadRequest
from pyrogram.types import Message
from app import bot
from app import Config, Message, bot
# Delete replied and command message
@bot.add_cmd(cmd="del")
async def delete_message(bot, message: Message):
async def delete_message(bot: bot, message: Message) -> None:
await message.delete(reply=True)
# Delete Multiple messages from replied to command.
@bot.add_cmd(cmd="purge")
async def purge_(bot, message: Message):
reply = message.replied
if not reply:
async def purge_(bot: bot, message: Message) -> None | Message:
start_message: int = message.reply_id
if not start_message:
return await message.reply("reply to a message")
start_message = reply.id
end_message = message.id
messages = [end_message] + [i for i in range(int(start_message), int(end_message))]
end_message: int = message.id
messages: list[int] = [
end_message,
*[i for i in range(int(start_message), int(end_message))],
]
await bot.delete_messages(
chat_id=message.chat.id, message_ids=messages, revoke=True
)
@bot.add_cmd(cmd="ids")
async def get_ids(bot, message):
if reply := message.replied:
ids = ""
async def get_ids(bot: bot, message: Message) -> None:
reply: Message = message.replied
if reply:
ids: str = ""
reply_forward = reply.forward_from_chat
reply_user = reply.from_user
ids += f"Chat : `{reply.chat.id}`\n"
@ -40,13 +42,13 @@ async def get_ids(bot, message):
if reply_user:
ids += f"User : {reply.from_user.id}"
else:
ids = f"Chat :`{message.chat.id}`"
ids: str = f"Chat :`{message.chat.id}`"
await message.reply(ids)
@bot.add_cmd(cmd="join")
async def join_chat(bot, message):
chat = message.input
async def join_chat(bot: bot, message: Message) -> Message:
chat: str = message.input
try:
await bot.join_chat(chat)
except (KeyError, BadRequest):
@ -58,11 +60,16 @@ async def join_chat(bot, message):
@bot.add_cmd(cmd="leave")
async def leave_chat(bot, message):
async def leave_chat(bot: bot, message: Message) -> None:
if message.input:
chat = message.input
else:
chat = message.chat.id
await message.reply(
f"Leaving current chat in 5\nReply with `{Config.TRIGGER}c` to cancel",
del_in=5,
block=True,
)
try:
await bot.leave_chat(chat)
except Exception as e:
@ -70,8 +77,8 @@ async def leave_chat(bot, message):
@bot.add_cmd(cmd="reply")
async def reply(bot, message):
text = message.input
async def reply(bot: bot, message: Message) -> None:
text: str = message.input
await bot.send_message(
chat_id=message.chat.id,
text=text,

View File

@ -1,21 +1,23 @@
import asyncio
from app import bot
from app import Message, bot
@bot.add_cmd(cmd=["cancel", "c"])
async def cancel_task(bot, message):
task_id = message.replied_task_id
@bot.add_cmd(cmd="c")
async def cancel_task(bot: bot, message: Message) -> Message | None:
task_id: str | None = message.replied_task_id
if not task_id:
return await message.reply(
"Reply To a Command or Bot's Response Message.", del_in=8
text="Reply To a Command or Bot's Response Message.", del_in=8
)
all_tasks = asyncio.all_tasks()
tasks = [x for x in all_tasks if x.get_name() == task_id]
all_tasks: set[asyncio.all_tasks] = asyncio.all_tasks()
tasks: list[asyncio.Task] | None = [x for x in all_tasks if x.get_name() == task_id]
if not tasks:
return await message.reply("Task not in Currently Running Tasks.", del_in=8)
response = ""
return await message.reply(
text="Task not in Currently Running Tasks.", del_in=8
)
response: str = ""
for task in tasks:
status = task.cancel()
status: bool = task.cancel()
response += f"Task: __{task.get_name()}__\nCancelled: __{status}__\n"
await message.reply(response, del_in=5)

48
app/plugins/bot.py → app/plugins/utils.py Executable file → Normal file
View File

@ -1,10 +1,16 @@
import asyncio
import os
from async_lru import alru_cache
from git import Repo
from pyrogram.enums import ChatType
from app import Config, bot
from app import Config, Message, bot
@alru_cache()
async def get_banner() -> Message:
return await bot.get_messages("Social_DL", [2, 3])
@bot.add_cmd(cmd="bot")
@ -13,7 +19,7 @@ async def info(bot, message):
chat_count = (
f"\n<b>Auto-Dl enabled in: <code>{len(Config.CHATS)}</code> chats</b>\n"
)
supported_sites, photo = await bot.get_messages("Social_DL", [2, 3])
supported_sites, photo = await get_banner()
await photo.copy(
message.chat.id,
caption="\n".join([head, chat_count, supported_sites.text.html]),
@ -21,58 +27,62 @@ async def info(bot, message):
@bot.add_cmd(cmd="help")
async def help(bot, message):
commands = "\n".join(
async def cmd_list(bot: bot, message: Message) -> None:
commands: str = "\n".join(
[f"<code>{Config.TRIGGER}{i}</code>" for i in Config.CMD_DICT.keys()]
)
await message.reply(f"<b>Available Commands:</b>\n\n{commands}", del_in=30)
@bot.add_cmd(cmd="restart")
async def restart(bot, message, u_resp=None):
reply = u_resp or await message.reply("restarting....")
async def restart(bot: bot, message: Message, u_resp: Message | None = None) -> None:
reply: Message = u_resp or await message.reply("restarting....")
if reply.chat.type in (ChatType.GROUP, ChatType.SUPERGROUP):
os.environ["RESTART_MSG"] = str(reply.id)
os.environ["RESTART_CHAT"] = str(reply.chat.id)
await bot.restart()
await bot.restart(hard="-h" in message.flags)
@bot.add_cmd(cmd="refresh")
async def chat_update(bot, message):
async def chat_update(bot: bot, message: Message) -> None:
await bot.set_filter_list()
await message.reply("Filters Refreshed", del_in=8)
@bot.add_cmd(cmd="repo")
async def sauce(bot, message):
async def sauce(bot: bot, message: Message) -> None:
await bot.send_message(
chat_id=message.chat.id,
text=f"<a href='{Config.UPSTREAM_REPO}'>Social-DL</a>",
text="<a href='https://github.com/anonymousx97/social-dl'>Social-DL</a>",
reply_to_message_id=message.reply_id or message.id,
disable_web_page_preview=True,
)
@bot.add_cmd(cmd="update")
async def updater(bot, message):
reply = await message.reply("Checking for Updates....")
repo = Repo()
async def updater(bot: bot, message: Message) -> None | Message:
reply: Message = await message.reply("Checking for Updates....")
repo: Repo = Repo()
repo.git.fetch()
commits = ""
limit = 0
commits: str = ""
limit: int = 0
for commit in repo.iter_commits("HEAD..origin/main"):
commits += f"<b>#{commit.count()}</b> <a href='{Config.UPSTREAM_REPO}/commit/{commit}'>{commit.summary}</a> By <i>{commit.author}</i>\n\n"
commits += f"""
<b>#{commit.count()}</b> <a href='{Config.UPSTREAM_REPO}/commit/{commit}'>{commit.summary}</a> By <i>{commit.author}</i>
"""
limit += 1
if limit > 50:
break
if not commits:
return await reply.edit("Already Up To Date.", del_in=5)
return await reply.edit(text="Already Up To Date.", del_in=5)
if "-pull" not in message.flags:
return await reply.edit(f"<b>Update Available:</b>\n\n{commits}")
return await reply.edit(
text=f"<b>Update Available:</b>\n\n{commits}", disable_web_page_preview=True
)
repo.git.reset("--hard")
repo.git.pull(Config.UPSTREAM_REPO, "--rebase=true")
await asyncio.gather(
bot.log(text=f"#Updater\nPulled:\n\n{commits}"),
bot.log(text=f"#Updater\nPulled:\n\n{commits}", disable_web_page_preview=True),
reply.edit("<b>Update Found</b>\n<i>Pulling....</i>"),
)
await restart(bot, message, reply)

View File

@ -1,22 +1,23 @@
import asyncio
import traceback
from typing import Callable, Coroutine
from app import Config, bot
from app.core import filters
from app.core.media_handler import MediaHandler
from app.core.message import Message
from app.core.message import Message, Msg
@bot.add_cmd(cmd="dl")
async def dl(bot, message):
reply = await bot.send_message(
async def dl(bot: bot, message: Message):
reply: Message = await bot.send_message(
chat_id=message.chat.id, text="`trying to download...`"
)
coro = MediaHandler.process(message)
task = asyncio.Task(coro, name=reply.task_id)
media = await task
coro: Coroutine = MediaHandler.process(message)
task: asyncio.Task = asyncio.Task(coro, name=reply.task_id)
media: MediaHandler = await task
if media.exceptions:
exceptions = "\n".join(media.exceptions)
exceptions: str = "\n".join(media.exceptions)
await bot.log(
traceback=exceptions,
func="DL",
@ -31,12 +32,12 @@ async def dl(bot, message):
@bot.on_message(filters.user_filter)
@bot.on_edited_message(filters.user_filter)
async def cmd_dispatcher(bot, message):
message = Message.parse_message(message)
func = Config.CMD_DICT[message.cmd]
coro = func(bot, message)
async def cmd_dispatcher(bot: bot, message: Msg):
message: Message = Message.parse_message(message)
func: Callable = Config.CMD_DICT[message.cmd]
coro: Coroutine = func(bot, message)
try:
task = asyncio.Task(coro, name=message.task_id)
task: asyncio.Task = asyncio.Task(coro, name=message.task_id)
await task
except asyncio.exceptions.CancelledError:
await bot.log(text=f"<b>#Cancelled</b>:\n<code>{message.text}</code>")
@ -49,13 +50,28 @@ async def cmd_dispatcher(bot, message):
)
@bot.on_message(filters.convo_filter, group=0)
@bot.on_edited_message(filters.convo_filter, group=0)
async def convo_handler(bot: bot, message: Msg):
conv_dict: dict = Config.CONVO_DICT[message.chat.id]
conv_filters = conv_dict.get("filters")
if conv_filters:
check = await conv_filters(bot, message)
if not check:
message.continue_propagation()
conv_dict["response"] = message
message.continue_propagation()
conv_dict["response"] = message
message.continue_propagation()
@bot.on_message(filters.allowed_cmd_filter)
@bot.on_message(filters.chat_filter)
async def dl_dispatcher(bot, message):
message = Message.parse_message(message)
coro = dl(bot, message)
async def dl_dispatcher(bot: bot, message: Msg):
message: Message = Message.parse_message(message)
coro: Coroutine = dl(bot, message)
try:
task = asyncio.Task(coro, name=message.task_id)
task: asyncio.Task = asyncio.Task(coro, name=message.task_id)
await task
except asyncio.exceptions.CancelledError:
await bot.log(text=f"<b>#Cancelled</b>:\n<code>{message.text}</code>")