v2.5.0:Upstream with Private Repo.

What's New:
- Sort Plugins into their respective folders.
- download, upload, rename commands.
- Help Docstrings.
- Proper Logging.
- rename .exec cmd to .py
This commit is contained in:
anonymousx97 2024-01-08 16:01:41 +05:30
parent cdf7ca73df
commit 31d19bc48b
60 changed files with 2234 additions and 1138 deletions

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
__pycache__
config.env
.idea
logs/
.vscode

View File

@ -1,17 +1,20 @@
import os
import tracemalloc
from dotenv import load_dotenv
load_dotenv("config.env")
tracemalloc.start()
# isort: skip
from .config import Config # noqa
from app.core.message import Message # noqa
from .core.client import BOT # noqa
from app.config import Config # NOQA
from app.core import LOGGER, Message # NOQA
from app.core.client.client import BOT # NOQA
if "com.termux" not in os.environ.get("PATH", ""):
import uvloop # isort:skip
uvloop.install()
bot = BOT()
bot: BOT = BOT()

View File

@ -1,8 +1,6 @@
from app import LOGGER, bot
if __name__ == "__main__":
import tracemalloc
tracemalloc.start()
from app import bot
bot.run(bot.boot())
else:
LOGGER.error("Wrong Start Command.\nUse 'python -m app'")

View File

@ -1,112 +0,0 @@
import os
from urllib.parse import urlparse
from app import Config, Message, bot
from app.core.aiohttp_tools import get_json, get_type
from app.core.scraper_config import MediaType, ScraperConfig
API_KEYS = {"KEYS": Config.API_KEYS, "counter": 0}
async def get_key():
keys, count = API_KEYS.values()
count += 1
if count == len(keys):
count = 0
ret_key = keys[count]
API_KEYS["counter"] = count
return ret_key
class Instagram(ScraperConfig):
def __init__(self, url):
super().__init__()
self.shortcode = os.path.basename(urlparse(url).path.rstrip("/"))
self.api_url = (f"https://www.instagram.com/graphql/query?query_hash=2b0673e0dc4580674a88d426fe00ea90"
f"&variables=%7B%22shortcode%22%3A%22{self.shortcode}%22%7D")
self.url = url
self.dump = True
async def check_dump(self) -> None | bool:
if not Config.DUMP_ID:
return
async for message in bot.search_messages(Config.DUMP_ID, "#" + self.shortcode):
self.media: Message = message
self.type: MediaType = MediaType.MESSAGE
self.in_dump: bool = True
return True
async def download_or_extract(self):
for func in [self.check_dump, self.api_3, self.no_api_dl, self.api_dl]:
if await func():
self.success: bool = True
break
async def api_3(self):
query_api = f"https://{bot.SECRET_API}?url={self.url}&v=1"
response = await get_json(url=query_api, json_=False)
if not response:
return
self.caption = "."
data: list = (
response.get("videos", [])
+ response.get("images", [])
+ response.get("stories", [])
)
if not data:
return
if len(data) > 1:
self.type = MediaType.GROUP
self.media: list = data
return True
else:
self.media: str = data[0]
self.type: MediaType = get_type(self.media)
return True
async def no_api_dl(self):
response = await get_json(url=self.api_url)
if (
not response
or "data" not in response
or not response["data"]["shortcode_media"]
):
return
return await self.parse_ghraphql(response["data"]["shortcode_media"])
async def api_dl(self):
if not Config.API_KEYS:
return
param = {
"api_key": await get_key(),
"url": self.api_url,
"proxy": "residential",
"js": "false",
}
response: dict | None = await get_json(
url="https://api.webscraping.ai/html", timeout=30, params=param
)
if (
not response
or "data" not in response
or not response["data"]["shortcode_media"]
):
return
self.caption = ".."
return await self.parse_ghraphql(response["data"]["shortcode_media"])
async def parse_ghraphql(self, json_: dict) -> str | list | None:
type_check: str | None = json_.get("__typename", None)
if not type_check:
return
elif type_check == "GraphSidecar":
self.media: list[str] = [
i["node"].get("video_url") or i["node"].get("display_url")
for i in json_["edge_sidecar_to_children"]["edges"]
]
self.type: MediaType = MediaType.GROUP
else:
self.media: str = json_.get("video_url", json_.get("display_url"))
self.thumb: str = json_.get("display_url")
self.type: MediaType = get_type(self.media)
return self.media

View File

@ -1,37 +1,57 @@
import json
import os
from typing import Callable
from pyrogram.filters import Filter
from pyrogram.types import Message
from git import Repo
class Config:
API_KEYS: list[int] = json.loads(os.environ.get("API_KEYS", "[]"))
class _Config:
class CMD:
def __init__(self, func, path, doc):
self.func = func
self.path = path
self.doc = doc or "Not Documented."
BLOCKED_USERS: list[int] = []
BLOCKED_USERS_MESSAGE_ID: int = int(os.environ.get("BLOCKED_USERS_MESSAGE_ID", 0))
def __init__(self):
self.API_KEYS: list[int] = json.loads(os.environ.get("API_KEYS", "[]"))
CHATS: list[int] = []
AUTO_DL_MESSAGE_ID: int = int(os.environ.get("AUTO_DL_MESSAGE_ID", 0))
self.BLOCKED_USERS: list[int] = []
self.BLOCKED_USERS_MESSAGE_ID: int = int(
os.environ.get("BLOCKED_USERS_MESSAGE_ID", 0)
)
CMD_DICT: dict[str, Callable] = {}
CONVO_DICT: dict[int, dict[str | int, Message | Filter | None]] = {}
self.CHATS: list[int] = []
self.AUTO_DL_MESSAGE_ID: int = int(os.environ.get("AUTO_DL_MESSAGE_ID", 0))
DEV_MODE: int = int(os.environ.get("DEV_MODE", 0))
self.CMD_TRIGGER: str = os.environ.get("TRIGGER", ".")
DISABLED_CHATS: list[int] = []
DISABLED_CHATS_MESSAGE_ID: int = int(os.environ.get("DISABLED_CHATS_MESSAGE_ID", 0))
self.CMD_DICT: dict[str, _Config.CMD] = {}
DUMP_ID: int = int(os.environ.get("DUMP_ID", 0))
self.DEV_MODE: int = int(os.environ.get("DEV_MODE", 0))
LOG_CHAT: int = int(os.environ.get("LOG_CHAT"))
self.DISABLED_CHATS: list[int] = []
TRIGGER: str = os.environ.get("TRIGGER", ".")
self.DISABLED_CHATS_MESSAGE_ID: int = int(
os.environ.get("DISABLED_CHATS_MESSAGE_ID", 0)
)
UPSTREAM_REPO = os.environ.get(
"UPSTREAM_REPO", "https://github.com/anonymousx97/social-dl"
).rstrip("/")
self.DUMP_ID: int = int(os.environ.get("DUMP_ID", 0))
USERS: list[int] = []
USERS_MESSAGE_ID: int = int(os.environ.get("USERS_MESSAGE_ID", 0))
self.INIT_TASKS: list = []
self.LOG_CHAT: int = int(os.environ.get("LOG_CHAT"))
self.REPO = Repo(".")
self.UPSTREAM_REPO = os.environ.get(
"UPSTREAM_REPO", "https://github.com/anonymousx97/social-dl"
).rstrip("/")
self.USERS: list[int] = []
self.USERS_MESSAGE_ID: int = int(os.environ.get("USERS_MESSAGE_ID", 0))
def __str__(self):
config_dict = self.__dict__.copy()
return json.dumps(config_dict, indent=4, ensure_ascii=False, default=str)
Config = _Config()

3
app/core/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from app.core.types.message import Message # NOQA
from app.core.logger import LOGGER # NOQA

View File

@ -1,29 +1,32 @@
import asyncio
import glob
import importlib
import json
import inspect
import os
import sys
import traceback
from functools import wraps
from io import BytesIO
from pyrogram import Client, filters, idle
from pyrogram import Client, idle
from pyrogram.enums import ParseMode
from pyrogram.filters import Filter
from pyrogram.types import Message as Msg
from app import Config
from app.core import aiohttp_tools
from app.core.conversation import Conversation
from app.core.message import Message
from app import LOGGER, Config, Message
from app.utils import aiohttp_tools
async def import_modules():
for py_module in glob.glob(pathname="app/**/*.py", recursive=True):
def import_modules():
for py_module in glob.glob(pathname="app/**/[!^_]*.py", recursive=True):
name = os.path.splitext(py_module)[0]
py_name = name.replace("/", ".")
try:
importlib.import_module(py_name)
except Exception as e:
print(e)
mod = importlib.import_module(py_name)
if hasattr(mod, "init_task"):
Config.INIT_TASKS.append(mod.init_task())
except BaseException:
LOGGER.error(traceback.format_exc())
class BOT(Client):
@ -42,59 +45,56 @@ class BOT(Client):
sleep_threshold=30,
max_concurrent_transmissions=2,
)
from app.core.client.conversation import Conversation
self.Convo = Conversation
@staticmethod
def add_cmd(cmd: str):
def add_cmd(cmd: str | list):
def the_decorator(func):
path = inspect.stack()[1][1]
@wraps(func)
def wrapper():
if isinstance(cmd, list):
for _cmd in cmd:
Config.CMD_DICT[_cmd] = func
Config.CMD_DICT[_cmd] = Config.CMD(
func=func, path=path, doc=func.__doc__
)
else:
Config.CMD_DICT[cmd] = func
Config.CMD_DICT[cmd] = Config.CMD(
func=func, path=path, doc=func.__doc__
)
wrapper()
return func
return the_decorator
@staticmethod
async def get_response(
chat_id: int, filters: filters.Filter = None, timeout: int = 8
self, chat_id: int, filters: Filter = None, timeout: int = 8
) -> Message | None:
try:
async with Conversation(
async with self.Convo(
chat_id=chat_id, filters=filters, timeout=timeout
) as convo:
response: Message | None = await convo.get_response()
return response
except Conversation.TimeOutError:
except TimeoutError:
return
async def boot(self) -> None:
await super().start()
await import_modules()
await self.set_filter_list()
await aiohttp_tools.session_switch()
await self.edit_restart_msg()
await self.log(text="#SocialDL\n<i>Started</i>")
print("started")
LOGGER.info("Started.")
import_modules()
LOGGER.info("Plugins Imported.")
await asyncio.gather(*Config.INIT_TASKS)
Config.INIT_TASKS.clear()
LOGGER.info("Init Tasks Completed.")
await self.log(text="<i>Started</i>")
LOGGER.info("Idling...")
await idle()
await aiohttp_tools.session_switch()
async def edit_restart_msg(self) -> None:
restart_msg = int(os.environ.get("RESTART_MSG", 0))
restart_chat = int(os.environ.get("RESTART_CHAT", 0))
if restart_msg and restart_chat:
await super().get_chat(restart_chat)
await super().edit_message_text(
chat_id=restart_chat,
message_id=restart_msg,
text="#Social-dl\n__Started__",
)
os.environ.pop("RESTART_MSG", "")
os.environ.pop("RESTART_CHAT", "")
await aiohttp_tools.init_task()
async def log(
self,
@ -110,27 +110,29 @@ class BOT(Client):
if message:
return (await message.copy(chat_id=Config.LOG_CHAT)) # fmt: skip
if traceback:
text = f"""
#Traceback
<b>Function:</b> {func}
<b>Chat:</b> {chat}
<b>Traceback:</b>
<code>{traceback}</code>"""
return await self.send_message(
text = (
"#Traceback"
f"\n<b>Function:</b> {func}"
f"\n<b>Chat:</b> {chat}"
f"\n<b>Traceback:</b>"
f"\n<code>{traceback}</code>"
)
return (await self.send_message(
chat_id=Config.LOG_CHAT,
text=text,
name=name,
disable_web_page_preview=disable_web_page_preview,
parse_mode=parse_mode,
)
)) # fmt:skip
async def restart(self, hard=False) -> None:
await aiohttp_tools.session_switch()
await aiohttp_tools.init_task()
await super().stop(block=False)
if hard:
os.remove("logs/app_logs.txt")
os.execl("/bin/bash", "/bin/bash", "run")
LOGGER.info("Restarting......")
os.execl(sys.executable, sys.executable, "-m", "app")
SECRET_API = base64.b64decode("YS56dG9yci5tZS9hcGkvaW5zdGE=").decode("utf-8")
async def send_message(
self, chat_id: int | str, text, name: str = "output.txt", **kwargs
@ -144,26 +146,3 @@ class BOT(Client):
doc.name = name
kwargs.pop("disable_web_page_preview", "")
return await super().send_document(chat_id=chat_id, document=doc, **kwargs)
async def set_filter_list(self):
chats_id = Config.AUTO_DL_MESSAGE_ID
blocked_id = Config.BLOCKED_USERS_MESSAGE_ID
users = Config.USERS_MESSAGE_ID
disabled = Config.DISABLED_CHATS_MESSAGE_ID
if chats_id:
Config.CHATS = json.loads(
(await super().get_messages(Config.LOG_CHAT, chats_id)).text
)
if blocked_id:
Config.BLOCKED_USERS = json.loads(
(await super().get_messages(Config.LOG_CHAT, blocked_id)).text
)
if users:
Config.USERS = json.loads(
(await super().get_messages(Config.LOG_CHAT, users)).text
)
if disabled:
Config.DISABLED_CHATS = json.loads(
(await super().get_messages(Config.LOG_CHAT, disabled)).text
)

View File

@ -0,0 +1,93 @@
import asyncio
import json
from pyrogram.filters import Filter
from pyrogram.types import Message
class Conversation:
CONVO_DICT: dict[int, "Conversation"] = {}
class DuplicateConvo(Exception):
def __init__(self, chat: str | int):
super().__init__(f"Conversation already started with {chat} ")
def __init__(
self, chat_id: int | str, filters: Filter | None = None, timeout: int = 10
):
self.chat_id = chat_id
self.filters = filters
self.timeout = timeout
self.responses: list = []
self.set_future()
from app import bot
self._client = bot
def __str__(self):
return json.dumps(self.__dict__, indent=4, ensure_ascii=False, default=str)
def set_future(self, *args, **kwargs):
future = asyncio.Future()
future.add_done_callback(self.set_future)
self.response = future
async def get_response(self, timeout: int | None = None) -> Message | None:
try:
resp_future: asyncio.Future.result = await asyncio.wait_for(
self.response, timeout=timeout or self.timeout
)
return resp_future
except asyncio.TimeoutError:
raise TimeoutError("Conversation Timeout")
async def send_message(
self,
text: str,
timeout=0,
get_response=False,
**kwargs,
) -> Message | tuple[Message, Message]:
message = await self._client.send_message(
chat_id=self.chat_id, text=text, **kwargs
)
if get_response:
response = await self.get_response(timeout=timeout or self.timeout)
return message, response
return message
async def send_document(
self,
document,
caption="",
timeout=0,
get_response=False,
**kwargs,
) -> Message | tuple[Message, Message]:
message = await self._client.send_document(
chat_id=self.chat_id,
document=document,
caption=caption,
force_document=True,
**kwargs,
)
if get_response:
response = await self.get_response(timeout=timeout or self.timeout)
return message, response
return message
async def __aenter__(self) -> "Conversation":
if isinstance(self.chat_id, str):
self.chat_id = (await self._client.get_chat(self.chat_id)).id
if (
self.chat_id in Conversation.CONVO_DICT.keys()
and Conversation.CONVO_DICT[self.chat_id].filters == self.filters
):
raise self.DuplicateConvo(self.chat_id)
Conversation.CONVO_DICT[self.chat_id] = self
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
Conversation.CONVO_DICT.pop(self.chat_id, None)
if not self.response.done():
self.response.cancel()

View File

@ -4,11 +4,19 @@ from pyrogram import filters as _filters
from pyrogram.types import Message
from app import Config
from app.core.media_handler import url_map
from app.core.client.conversation import Conversation
from app.social_dl.media_handler import url_map
convo_filter = _filters.create(
lambda _, __, message: (message.chat.id in Conversation.CONVO_DICT.keys())
and (not message.reactions)
)
def check_for_urls(text_list: list):
for link in text_list:
if "music.youtube.com" in link:
continue
if url_map.get(urlparse(link).netloc):
return True
else:
@ -35,35 +43,31 @@ def dynamic_chat_filter(_, __, message: Message, cmd=False) -> bool:
return bool(url_check)
chat_filter = _filters.create(dynamic_chat_filter)
def dynamic_allowed_list(_, __, message: Message) -> bool:
if not dynamic_chat_filter(_, __, message, cmd=True):
if message.reactions or not dynamic_chat_filter(_, __, message, cmd=True):
return False
cmd = message.text.split(maxsplit=1)[0]
cmd_check = cmd in {"/download", "/dl", "/down"}
reaction_check = not message.reactions
return bool(cmd_check and reaction_check)
return cmd in {"/download", "/dl", "/down"}
allowed_cmd_filter = _filters.create(dynamic_allowed_list)
def dynamic_cmd_filter(_, __, message: Message) -> bool:
if (
not message.text
or not message.text.startswith(Config.TRIGGER)
message.reactions
or not message.text
or not message.text.startswith(Config.CMD_TRIGGER)
or not message.from_user
or message.from_user.id not in Config.USERS
):
return False
start_str = message.text.split(maxsplit=1)[0]
cmd = start_str.replace(Config.TRIGGER, "", 1)
cmd_check = cmd in Config.CMD_DICT
reaction_check = not message.reactions
return bool(cmd_check and reaction_check)
cmd = start_str.replace(Config.CMD_TRIGGER, "", 1)
return cmd in Config.CMD_DICT.keys()
chat_filter = _filters.create(dynamic_chat_filter)
user_filter = _filters.create(dynamic_cmd_filter)
allowed_cmd_filter = _filters.create(dynamic_allowed_list)
convo_filter = _filters.create(
lambda _, __, message: (message.chat.id in Config.CONVO_DICT)
and (not message.reactions)
)

View File

@ -2,16 +2,20 @@ import asyncio
import traceback
from typing import Callable, Coroutine
from app import Config, bot
from app.core import filters
from app.core.media_handler import MediaHandler
from app.core.message import Message, Msg
from pyrogram.enums import ChatType
from app import BOT, Config, bot
from app.core.client import filters
from app.core.types.message import Message, Msg
from app.social_dl.media_handler import MediaHandler
@bot.add_cmd(cmd="dl")
async def dl(bot: bot, message: Message):
async def dl(bot: BOT, message: Message):
reply: Message = await bot.send_message(
chat_id=message.chat.id, text="`trying to download...`"
chat_id=message.chat.id,
text="`trying to download...`",
disable_notification=message.chat.type == ChatType.CHANNEL,
)
coro: Coroutine = MediaHandler.process(message)
task: asyncio.Task = asyncio.Task(coro, name=reply.task_id)
@ -34,7 +38,7 @@ async def dl(bot: bot, message: Message):
@bot.on_edited_message(filters.user_filter)
async def cmd_dispatcher(bot: bot, message: Msg):
message: Message = Message.parse_message(message)
func: Callable = Config.CMD_DICT[message.cmd]
func: Callable = Config.CMD_DICT[message.cmd].func
coro: Coroutine = func(bot, message)
try:
task: asyncio.Task = asyncio.Task(coro, name=message.task_id)
@ -50,21 +54,6 @@ async def cmd_dispatcher(bot: bot, message: Msg):
)
@bot.on_message(filters.convo_filter, group=0)
@bot.on_edited_message(filters.convo_filter, group=0)
async def convo_handler(bot: bot, message: Msg):
conv_dict: dict = Config.CONVO_DICT[message.chat.id]
conv_filters = conv_dict.get("filters")
if conv_filters:
check = await conv_filters(bot, message)
if not check:
message.continue_propagation()
conv_dict["response"] = message
message.continue_propagation()
conv_dict["response"] = message
message.continue_propagation()
@bot.on_message(filters.allowed_cmd_filter)
@bot.on_message(filters.chat_filter)
async def dl_dispatcher(bot: bot, message: Msg):
@ -82,3 +71,14 @@ async def dl_dispatcher(bot: bot, message: Msg):
func=dl.__name__,
name="traceback.txt",
)
@bot.on_message(filters.convo_filter, group=0)
@bot.on_edited_message(filters.convo_filter, group=0)
async def convo_handler(bot: BOT, message: Msg):
conv_obj: bot.Convo = bot.Convo.CONVO_DICT[message.chat.id]
if conv_obj.filters and not (await conv_obj.filters(bot, message)):
message.continue_propagation()
conv_obj.responses.append(message)
conv_obj.response.set_result(message)
message.continue_propagation()

View File

@ -1,47 +0,0 @@
import asyncio
import json
from pyrogram.filters import Filter
from pyrogram.types import Message
from app import Config
class Conversation:
class DuplicateConvo(Exception):
def __init__(self, chat: str | int | None = None):
text = "Conversation already started"
if chat:
text += f" with {chat}"
super().__init__(text)
class TimeOutError(Exception):
def __init__(self):
super().__init__("Conversation Timeout")
def __init__(self, chat_id: int, filters: Filter | None = None, timeout: int = 10):
self.chat_id = chat_id
self.filters = filters
self.timeout = timeout
def __str__(self):
return json.dumps(self.__dict__, indent=4, ensure_ascii=False)
async def get_response(self, timeout: int | None = None) -> Message | None:
try:
async with asyncio.timeout(timeout or self.timeout):
while not Config.CONVO_DICT[self.chat_id]["response"]:
await asyncio.sleep(0)
return Config.CONVO_DICT[self.chat_id]["response"]
except asyncio.TimeoutError:
raise self.TimeOutError
async def __aenter__(self) -> "Conversation":
if self.chat_id in Config.CONVO_DICT:
raise self.DuplicateConvo(self.chat_id)
convo_dict = {"filters": self.filters, "response": None}
Config.CONVO_DICT[self.chat_id] = convo_dict
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
Config.CONVO_DICT.pop(self.chat_id, "")

26
app/core/logger.py Normal file
View File

@ -0,0 +1,26 @@
import os
from logging import INFO, WARNING, StreamHandler, basicConfig, getLogger, handlers
os.makedirs("logs", exist_ok=True)
LOGGER = getLogger("Social-DL")
basicConfig(
level=INFO,
format="[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s",
datefmt="%y-%m-%d %H:%M:%S",
handlers={
handlers.RotatingFileHandler(
filename="logs/app_logs.txt",
mode="a",
maxBytes=5 * 1024 * 1024,
backupCount=2,
encoding=None,
delay=0,
),
StreamHandler(),
},
)
getLogger("pyrogram").setLevel(WARNING)
getLogger("httpx").setLevel(WARNING)

View File

@ -1,244 +0,0 @@
import asyncio
import glob
import json
import os
import traceback
from functools import lru_cache
from io import BytesIO
from urllib.parse import urlparse
from pyrogram.errors import MediaEmpty, PhotoSaveFileInvalid, WebpageCurlFailed
from pyrogram.types import InputMediaPhoto, InputMediaVideo
from app import Config, Message, bot
from app.api.gallery_dl import GalleryDL
from app.api.instagram import Instagram
from app.api.reddit import Reddit
from app.api.threads import Threads
from app.api.tiktok import Tiktok
from app.api.ytdl import YouTubeDL
from app.core import aiohttp_tools, shell
from app.core.scraper_config import MediaType
url_map: dict = {
"tiktok.com": Tiktok,
"www.instagram.com": Instagram,
"www.reddit.com": Reddit,
"reddit.com": Reddit,
"www.threads.net": Threads,
"twitter.com": GalleryDL,
"x.com": GalleryDL,
"www.x.com": GalleryDL,
"youtube.com": YouTubeDL,
"youtu.be": YouTubeDL,
"www.facebook.com": YouTubeDL,
}
@lru_cache()
def get_url_dict_items():
return url_map.items()
class MediaHandler:
def __init__(self, message: Message) -> None:
self.exceptions = []
self.media_objects: list[asyncio.Task.result] = []
self.sender_dict = {}
self.__client: bot = message._client
self.message: Message = message
self.doc: bool = "-d" in message.flags
self.spoiler: bool = "-s" in message.flags
self.args_: dict = {
"chat_id": self.message.chat.id,
"reply_to_message_id": message.reply_id,
}
def __str__(self):
return json.dumps(self.__dict__, indent=4, ensure_ascii=False)
def get_sender(self, reply: bool = False) -> str:
if "-ns" in self.message.flags:
return ""
text: str = f"\nShared by : "
author: str | None = self.message.author_signature
sender: str = user.first_name if (user := self.message.from_user) else ""
reply_sender: str = ""
if reply:
reply_msg = self.message.replied
reply_sender: str = (
reply_user.first_name if (reply_user := reply_msg.from_user) else ""
)
if any((author, sender, reply_sender)):
return text + (author or sender if not reply else reply_sender)
return ""
async def get_media(self) -> None:
async with asyncio.TaskGroup() as task_group:
tasks: list[asyncio.Task] = []
text_list: list[str] = self.message.text_list
reply_text_list: list[str] = self.message.reply_text_list
for link in text_list + reply_text_list:
reply: bool = link in reply_text_list
if match := url_map.get(urlparse(link).netloc):
tasks.append(task_group.create_task(match.start(link)))
self.sender_dict[link] = self.get_sender(reply=reply)
else:
for key, val in get_url_dict_items():
if key in link:
tasks.append(task_group.create_task(val.start(link)))
self.sender_dict[link] = self.get_sender(reply=reply)
self.media_objects: list[asyncio.Task.result] = [
task.result() for task in tasks if task.result()
]
async def send_media(self) -> None:
for obj in self.media_objects:
if "-nc" in self.message.flags:
caption = ""
else:
caption = (
obj.caption + obj.caption_url + self.sender_dict[obj.query_url]
)
try:
if self.doc and not obj.in_dump:
await self.send_document(obj.media, caption=caption, path=obj.path)
continue
match obj.type:
case MediaType.MESSAGE:
await obj.media.copy(self.message.chat.id, caption=caption)
continue
case MediaType.GROUP:
await self.send_group(obj, caption=caption)
continue
case MediaType.PHOTO:
post = await self.send(
media={"photo": obj.media},
method=self.__client.send_photo,
caption=caption,
)
case MediaType.VIDEO:
post = await self.send(
media={"video": obj.media},
method=self.__client.send_video,
thumb=await aiohttp_tools.thumb_dl(obj.thumb),
caption=caption,
)
case MediaType.GIF:
post = await self.send(
media={"animation": obj.media},
method=self.__client.send_animation,
caption=caption,
unsave=True,
)
if obj.dump and Config.DUMP_ID:
await post.copy(Config.DUMP_ID, caption="#" + obj.shortcode)
except BaseException:
self.exceptions.append(
"\n".join([obj.caption_url.strip(), traceback.format_exc()])
)
async def send(self, media: dict | str, method, caption: str, **kwargs) -> Message:
try:
try:
post: Message = await method(
**media,
**self.args_,
caption=caption,
**kwargs,
has_spoiler=self.spoiler,
)
except (MediaEmpty, WebpageCurlFailed):
key, value = list(media.items())[0]
media[key] = await aiohttp_tools.in_memory_dl(value)
post: Message = await method(
**media,
**self.args_,
caption=caption,
**kwargs,
has_spoiler=self.spoiler,
)
except PhotoSaveFileInvalid:
post: Message = await self.__client.send_document(
**self.args_, document=media, caption=caption, force_document=True
)
return post
async def send_document(self, docs: list, caption: str, path="") -> None:
if not path:
docs = await asyncio.gather(
*[aiohttp_tools.in_memory_dl(doc) for doc in docs]
)
else:
[os.rename(file_, file_ + ".png") for file_ in glob.glob(f"{path}/*.webp")]
docs = glob.glob(f"{path}/*")
for doc in docs:
await self.__client.send_document(
**self.args_, document=doc, caption=caption, force_document=True
)
await asyncio.sleep(0.5)
async def send_group(self, media_obj, caption: str) -> None:
sorted: list[str, list[InputMediaVideo | InputMediaPhoto]] = await sort_media(
caption=caption,
spoiler=self.spoiler,
urls=media_obj.media,
path=media_obj.path,
)
for data in sorted:
if isinstance(data, list):
await self.__client.send_media_group(**self.args_, media=data)
else:
await self.send(
media={"animation": data},
method=self.__client.send_animation,
caption=caption,
unsave=True,
)
await asyncio.sleep(1)
@classmethod
async def process(cls, message):
obj = cls(message=message)
await obj.get_media()
await obj.send_media()
[m_obj.cleanup() for m_obj in obj.media_objects]
return obj
async def sort_media(
caption="", spoiler=False, urls=None, path=None
) -> list[str, list[InputMediaVideo | InputMediaPhoto]]:
images, videos, animations = [], [], []
if path and os.path.exists(path):
[os.rename(file_, file_ + ".png") for file_ in glob.glob(f"{path}/*.webp")]
media: list[str] = glob.glob(f"{path}/*")
else:
media: tuple[BytesIO] = await asyncio.gather(
*[aiohttp_tools.in_memory_dl(url) for url in urls]
)
for file in media:
if path:
name: str = file.lower()
else:
name: str = file.name.lower()
if name.endswith((".png", ".jpg", ".jpeg")):
images.append(InputMediaPhoto(file, caption=caption, has_spoiler=spoiler))
elif name.endswith((".mp4", ".mkv", ".webm")):
if not urls and not await shell.check_audio(file):
animations.append(file)
else:
videos.append(
InputMediaVideo(file, caption=caption, has_spoiler=spoiler)
)
elif name.endswith(".gif"):
animations.append(file)
return await make_chunks(images, videos, animations)
async def make_chunks(
images=[], videos=[], animations=[]
) -> list[str, list[InputMediaVideo | InputMediaPhoto]]:
chunk_imgs = [images[imgs : imgs + 5] for imgs in range(0, len(images), 5)]
chunk_vids = [videos[vids : vids + 5] for vids in range(0, len(videos), 5)]
return [*chunk_imgs, *chunk_vids, *animations]

View File

@ -1,44 +0,0 @@
import json
import shutil
from enum import Enum, auto
from io import BytesIO
class MediaType(Enum):
PHOTO = auto()
VIDEO = auto()
GROUP = auto()
GIF = auto()
MESSAGE = auto()
class ScraperConfig:
def __init__(self) -> None:
self.dump: bool = False
self.in_dump: bool = False
self.path: str = ""
self.media: str | BytesIO = ""
self.caption: str = ""
self.caption_url: str = ""
self.thumb: str | None | BytesIO = None
self.type: None | MediaType = None
self.success: bool = False
def __str__(self):
return json.dumps(self.__dict__, indent=4, ensure_ascii=False, default=str)
def set_sauce(self, url: str) -> None:
self.caption_url = f"\n\n<a href='{url}'>Sauce</a>"
@classmethod
async def start(cls, url: str) -> "ScraperConfig":
obj = cls(url=url)
obj.query_url = url
obj.set_sauce(url)
await obj.download_or_extract()
if obj.success:
return obj
def cleanup(self) -> None:
if self.path:
shutil.rmtree(self.path, ignore_errors=True)

View File

@ -7,20 +7,28 @@ from pyrogram.types import Message as Msg
from pyrogram.types import User
from app import Config
from app.core.conversation import Conversation
class Message(Msg):
def __init__(self, message: Msg) -> None:
args = vars(message)
args["client"] = args.pop("_client", None)
args = vars(message).copy()
args["client"] = args.pop("_client", message._client)
super().__init__(**args)
@cached_property
def cmd(self) -> str | None:
if not self.text_list:
return
raw_cmd = self.text_list[0]
cmd = raw_cmd[1:]
return cmd if cmd in Config.CMD_DICT else None
return (
cmd
if (
cmd in Config.CMD_DICT.keys()
or cmd in {"dl", "down", "download", "play", "song"}
)
else None
)
@cached_property
def flags(self) -> list:
@ -40,6 +48,12 @@ class Message(Msg):
return self.text.split(maxsplit=1)[-1]
return ""
@cached_property
def is_from_owner(self) -> bool:
if self.from_user and self.from_user.id == Config.OWNER_ID:
return True
return False
@cached_property
def replied(self) -> "Message":
if self.reply_to_message:
@ -55,10 +69,7 @@ class Message(Msg):
@cached_property
def reply_text_list(self) -> list:
reply_text_list = []
if self.replied and self.replied.text and "dl" in self.text_list[0]:
reply_text_list = self.replied.text_list
return reply_text_list
return self.replied.text_list if self.replied else []
@cached_property
def task_id(self):
@ -66,7 +77,13 @@ class Message(Msg):
@cached_property
def text_list(self) -> list:
return self.text.split()
if self.text:
return self.text.split()
return []
@cached_property
def trigger(self):
return Config.CMD_TRIGGER
async def async_deleter(self, del_in, task, block) -> None:
if block:
@ -124,12 +141,12 @@ class Message(Msg):
async def get_response(self, filters: Filter = None, timeout: int = 8):
try:
async with Conversation(
async with self._client.Convo(
chat_id=self.chat.id, filters=filters, timeout=timeout
) as convo:
response: Message | None = await convo.get_response()
return response
except Conversation.TimeOutError:
except TimeoutError:
return
async def reply(

46
app/plugins/admin/ban.py Normal file
View File

@ -0,0 +1,46 @@
import asyncio
from typing import Awaitable
from pyrogram.types import User
from app import BOT, Message, bot
@bot.add_cmd(cmd=["ban", "unban"])
async def ban_or_unban(bot: BOT, message: Message) -> None:
user, reason = await message.extract_user_n_reason()
if not isinstance(user, User):
await message.reply(user, del_in=10)
return
if message.cmd == "ban":
action: Awaitable = bot.ban_chat_member(
chat_id=message.chat.id, user_id=user.id
)
else:
action: Awaitable = bot.unban_chat_member(
chat_id=message.chat.id, user_id=user.id
)
try:
await action
await message.reply(
text=f"{message.cmd.capitalize()}ned: {user.mention}\nReason: {reason}."
)
except Exception as e:
await message.reply(text=e, del_in=10)
@bot.add_cmd(cmd="kick")
async def kick_user(bot: BOT, message: Message):
user, reason = await message.extract_user_n_reason()
if not isinstance(user, User):
await message.reply(user, del_in=10)
return
try:
await bot.ban_chat_member(chat_id=message.chat.id, user_id=user.id)
await asyncio.sleep(1)
await bot.unban_chat_member(chat_id=message.chat.id, user_id=user.id)
await message.reply(
text=f"{message.cmd.capitalize()}ed: {user.mention}\nReason: {reason}."
)
except Exception as e:
await message.reply(text=e, del_in=10)

32
app/plugins/admin/mute.py Normal file
View File

@ -0,0 +1,32 @@
from pyrogram.types import ChatPermissions, User
from app import BOT, Message, bot
@bot.add_cmd(cmd=["mute", "unmute"])
async def mute_or_unmute(bot: BOT, message: Message):
user, reason = await message.extract_user_n_reason()
if not isinstance(user, User):
await message.reply(user, del_in=10)
return
perms = message.chat.permissions
if message.cmd == "mute":
perms = ChatPermissions(
can_send_messages=False,
can_pin_messages=False,
can_invite_users=False,
can_change_info=False,
can_send_media_messages=False,
can_send_polls=False,
can_send_other_messages=False,
can_add_web_page_previews=False,
)
try:
await bot.restrict_chat_member(
chat_id=message.chat.id, user_id=user.id, permissions=perms
)
await message.reply(
text=f"{message.cmd.capitalize()}d: {user.mention}\nReason: {reason}"
)
except Exception as e:
await message.reply(text=e, del_in=10)

View File

@ -0,0 +1,82 @@
import asyncio
from pyrogram.types import ChatPrivileges, User
from app import BOT, Message, bot
def get_privileges(
anon: bool = False, full: bool = False, without_rights: bool = False
) -> ChatPrivileges:
if without_rights:
return ChatPrivileges(
can_manage_chat=True,
can_manage_video_chats=False,
can_pin_messages=False,
can_delete_messages=False,
can_change_info=False,
can_restrict_members=False,
can_invite_users=False,
can_promote_members=False,
is_anonymous=False,
)
return ChatPrivileges(
can_manage_chat=True,
can_manage_video_chats=True,
can_pin_messages=True,
can_delete_messages=True,
can_change_info=True,
can_restrict_members=True,
can_invite_users=True,
can_promote_members=full,
is_anonymous=anon,
)
@bot.add_cmd(cmd=["promote", "demote"])
async def promote_or_demote(bot: BOT, message: Message) -> None:
"""
CMD: PROMOTE | DEMOTE
INFO: Add/Remove an Admin.
FLAGS:
PROMOTE: -full for full rights, -anon for anon admin
USAGE:
PROMOTE: .promote [ -anon | -full ] [ UID | REPLY | @ ] Title[Optional]
DEMOTE: .demote [ UID | REPLY | @ ]
"""
response: Message = await message.reply(
f"Trying to {message.cmd.capitalize()}....."
)
user, title = await message.extract_user_n_reason()
if not isinstance(user, User):
await response.edit(user, del_in=10)
return
full: bool = "-full" in message.flags
anon: bool = "-anon" in message.flags
without_rights = "-wr" in message.flags
promote = message.cmd == "promote"
if promote:
privileges: ChatPrivileges = get_privileges(
full=full, anon=anon, without_rights=without_rights
)
else:
privileges = ChatPrivileges(can_manage_chat=False)
response_text = f"{message.cmd.capitalize()}d: {user.mention}"
try:
await bot.promote_chat_member(
chat_id=message.chat.id, user_id=user.id, privileges=privileges
)
if promote:
# Let server promote admin before setting title
# Bot is too fast moment 😂😂😂
await asyncio.sleep(1)
await bot.set_administrator_title(
chat_id=message.chat.id, user_id=user.id, title=title or "Admin"
)
if title:
response_text += f"\nTitle: {title}"
if without_rights:
response_text += "\nWithout Rights: True"
await response.edit(text=response_text)
except Exception as e:
await response.edit(text=e, del_in=10, block=True)

View File

@ -0,0 +1,37 @@
import asyncio
from pyrogram.enums import ChatMemberStatus
from pyrogram.errors import FloodWait
from app import BOT, Message, bot
@bot.add_cmd(cmd="zombies")
async def clean_zombies(bot: BOT, message: Message):
me = await bot.get_chat_member(message.chat.id, bot.me.id)
if me.status not in {ChatMemberStatus.ADMINISTRATOR, ChatMemberStatus.OWNER}:
await message.reply("Cannot clean zombies without being admin.")
return
zombies = 0
admin_zombies = 0
response = await message.reply("Cleaning Zombies....\nthis may take a while")
async for member in bot.get_chat_members(message.chat.id):
try:
if member.user.is_deleted:
if member.status in {
ChatMemberStatus.ADMINISTRATOR,
ChatMemberStatus.OWNER,
}:
admin_zombies += 1
continue
zombies += 1
await bot.ban_chat_member(
chat_id=message.chat.id, user_id=member.user.id
)
await asyncio.sleep(1)
except FloodWait as e:
await asyncio.sleep(e.value + 3)
resp_str = f"Cleaned <b>{zombies}</b> zombies."
if admin_zombies:
resp_str += f"\n<b>{admin_zombies}</b> Admin Zombie(s) not Removed."
await response.edit(resp_str)

View File

@ -1,144 +0,0 @@
import asyncio
from typing import Awaitable
from pyrogram.types import ChatPermissions, ChatPrivileges, User
from app import bot
from app.core.message import Message
def get_privileges(
anon: bool = False, full: bool = False, without_rights: bool = False
) -> ChatPrivileges:
if without_rights:
return ChatPrivileges(
can_manage_chat=True,
can_manage_video_chats=False,
can_pin_messages=False,
can_delete_messages=False,
can_change_info=False,
can_restrict_members=False,
can_invite_users=False,
can_promote_members=False,
is_anonymous=False,
)
return ChatPrivileges(
can_manage_chat=True,
can_manage_video_chats=True,
can_pin_messages=True,
can_delete_messages=True,
can_change_info=True,
can_restrict_members=True,
can_invite_users=True,
can_promote_members=full,
is_anonymous=anon,
)
@bot.add_cmd(cmd=["promote", "demote"])
async def promote_or_demote(bot: bot, message: Message) -> None:
response: Message = await message.reply(
f"Trying to {message.cmd.capitalize()}....."
)
user, title = await message.extract_user_n_reason()
if not isinstance(user, User):
await response.edit(user, del_in=10)
return
full: bool = "-full" in message.flags
anon: bool = "-anon" in message.flags
without_rights = "-wr" in message.flags
promote = message.cmd == "promote"
if promote:
privileges: ChatPrivileges = get_privileges(
full=full, anon=anon, without_rights=without_rights
)
else:
privileges = ChatPrivileges(can_manage_chat=False)
response_text = f"{message.cmd.capitalize()}d: {user.mention}"
try:
await bot.promote_chat_member(
chat_id=message.chat.id, user_id=user.id, privileges=privileges
)
if promote:
# Let server promote admin before setting title
# Bot is too fast moment 😂😂😂
await asyncio.sleep(1)
await bot.set_administrator_title(
chat_id=message.chat.id, user_id=user.id, title=title or "Admin"
)
if title:
response_text += f"\nTitle: {title}"
if without_rights:
response_text += "\nWithout Rights: True"
await response.edit(text=response_text)
except Exception as e:
await response.edit(text=e, del_in=10, block=True)
@bot.add_cmd(cmd=["ban", "unban"])
async def ban_or_unban(bot: bot, message: Message) -> None:
user, reason = await message.extract_user_n_reason()
if not isinstance(user, User):
await message.reply(user, del_in=10)
return
if message.cmd == "ban":
action: Awaitable = bot.ban_chat_member(
chat_id=message.chat.id, user_id=user.id
)
else:
action: Awaitable = bot.unban_chat_member(
chat_id=message.chat.id, user_id=user.id
)
try:
await action
await message.reply(
text=f"{message.cmd.capitalize()}ned: {user.mention}\nReason: {reason}."
)
except Exception as e:
await message.reply(text=e, del_in=10)
@bot.add_cmd(cmd="kick")
async def kick_user(bot, message: Message):
user, reason = await message.extract_user_n_reason()
if not isinstance(user, User):
await message.reply(user, del_in=10)
return
try:
await bot.ban_chat_member(chat_id=message.chat.id, user_id=user.id)
await asyncio.sleep(1)
await bot.unban_chat_member(chat_id=message.chat.id, user_id=user.id)
await message.reply(
text=f"{message.cmd.capitalize()}ed: {user.mention}\nReason: {reason}."
)
except Exception as e:
await message.reply(text=e, del_in=10)
@bot.add_cmd(cmd=["mute", "unmute"])
async def mute_or_unmute(bot: bot, message: Message):
user, reason = await message.extract_user_n_reason()
if not isinstance(user, User):
await message.reply(user, del_in=10)
return
perms = message.chat.permissions
if message.cmd == "mute":
perms = ChatPermissions(
can_send_messages=False,
can_pin_messages=False,
can_invite_users=False,
can_change_info=False,
can_send_media_messages=False,
can_send_polls=False,
can_send_other_messages=False,
can_add_web_page_previews=False,
)
try:
await bot.restrict_chat_member(
chat_id=message.chat.id, user_id=user.id, permissions=perms
)
await message.reply(
text=f"{message.cmd.capitalize()}d: {user.mention}\nReason: {reason}."
)
except Exception as e:
await message.reply(text=e, del_in=10)

View File

@ -7,8 +7,8 @@ from urllib.parse import urlparse
import yt_dlp
from app import Message, bot
from app.api.ytdl import FakeLogger
from app.core.aiohttp_tools import in_memory_dl
from app.social_dl.api.ytdl import FakeLogger
from app.utils.aiohttp_tools import in_memory_dl
domains = [
"www.youtube.com",
@ -23,11 +23,10 @@ domains = [
@bot.add_cmd(cmd="song")
async def song_dl(bot: bot, message: Message) -> None | Message:
reply_query = None
if message.replied:
for link in message.replied.text_list:
if urlparse(link).netloc in domains:
reply_query = link
break
for link in message.reply_text_list:
if urlparse(link).netloc in domains:
reply_query = link
break
query = reply_query or message.flt_input
if not query:
return await message.reply("Give a song name or link to download.")

61
app/plugins/dev/exec.py Normal file
View File

@ -0,0 +1,61 @@
import asyncio
import inspect
import sys
import traceback
from io import StringIO
from pyrogram.enums import ParseMode
from app import Config, bot, BOT, Message # isort:skip
from app.utils import shell, aiohttp_tools as aio # isort:skip
async def executor(bot: BOT, message: Message) -> Message | None:
"""
CMD: EXEC
INFO: Run Python Code.
FLAGS: -s to only show output.
USAGE:
.exec [-s] return 1
"""
code: str = message.flt_input.strip()
if not code:
return await message.reply("exec Jo mama?")
reply: Message = await message.reply("executing")
sys.stdout = codeOut = StringIO()
sys.stderr = codeErr = StringIO()
# Indent code as per proper python syntax
formatted_code = "\n ".join(code.splitlines())
try:
# Create and initialise the function
exec(f"async def _exec(bot, message):\n {formatted_code}")
task = asyncio.Task(locals()["_exec"](bot, message), name=reply.task_id)
func_out = await task
except asyncio.exceptions.CancelledError:
return await reply.edit("`Cancelled....`")
except BaseException:
func_out = str(traceback.format_exc())
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
output = codeErr.getvalue().strip() or codeOut.getvalue().strip()
if func_out is not None:
output = f"{output}\n\n{func_out}".strip()
elif not output and "-s" in message.flags:
await reply.delete()
return
if "-s" in message.flags:
output = f">> `{output}`"
else:
output = f"```python\n> {code}```\n\n>> `{output}`"
await reply.edit(
output,
name="exec.txt",
disable_web_page_preview=True,
parse_mode=ParseMode.MARKDOWN,
)
if Config.DEV_MODE:
Config.CMD_DICT["py"] = Config.CMD(
func=executor, path=inspect.stack()[0][1], doc=executor.__doc__
)

31
app/plugins/dev/loader.py Normal file
View File

@ -0,0 +1,31 @@
import importlib
import inspect
import sys
import traceback
from app import BOT, Config, Message
async def loader(bot: BOT, message: Message) -> Message | None:
if (
not message.replied
or not message.replied.document
or not message.replied.document.file_name.endswith(".py")
):
return await message.reply("reply to a plugin.")
reply: Message = await message.reply("Loading....")
file_name: str = message.replied.document.file_name.rstrip(".py")
reload = sys.modules.pop(f"app.temp.{file_name}", None)
status: str = "Reloaded" if reload else "Loaded"
await message.replied.download("app/temp/")
try:
importlib.import_module(f"app.temp.{file_name}")
except BaseException:
return await reply.edit(str(traceback.format_exc()))
await reply.edit(f"{status} {file_name}.py.")
if Config.DEV_MODE:
Config.CMD_DICT["load"] = Config.CMD(
func=loader, path=inspect.stack()[0][1], doc=loader.__doc__
)

60
app/plugins/dev/shell.py Normal file
View File

@ -0,0 +1,60 @@
import asyncio
import inspect
from pyrogram.enums import ParseMode
from app import BOT, Config, Message
from app.utils import shell
async def run_cmd(bot: BOT, message: Message) -> Message | None:
cmd: str = message.input.strip()
reply: Message = await message.reply("executing...")
try:
proc_stdout: str = await asyncio.Task(
shell.run_shell_cmd(cmd), name=reply.task_id
)
except asyncio.exceptions.CancelledError:
return await reply.edit("`Cancelled...`")
output: str = f"<pre language=shell>~${cmd}\n\n{proc_stdout}</pre>"
return await reply.edit(output, name="sh.txt", disable_web_page_preview=True)
# Shell with Live Output
async def live_shell(bot: BOT, message: Message) -> Message | None:
cmd: str = message.input.strip()
reply: Message = await message.reply("`getting live output....`")
sub_process: shell.AsyncShell = await shell.AsyncShell.run_cmd(cmd)
sleep_for: int = 1
output: str = ""
try:
async for stdout in sub_process.get_output():
if output != stdout:
if len(stdout) <= 4096:
await reply.edit(
f"```shell\n{stdout}```",
disable_web_page_preview=True,
parse_mode=ParseMode.MARKDOWN,
)
output = stdout
if sleep_for >= 6:
sleep_for = 1
await asyncio.Task(asyncio.sleep(sleep_for), name=reply.task_id)
sleep_for += 1
return await reply.edit(
f"<pre language=shell>~${cmd}\n\n{sub_process.full_std}</pre>",
name="shell.txt",
disable_web_page_preview=True,
)
except asyncio.exceptions.CancelledError:
sub_process.cancel()
return await reply.edit(f"`Cancelled....`")
if Config.DEV_MODE:
Config.CMD_DICT["shell"] = Config.CMD(
func=live_shell, path=inspect.stack()[0][1], doc=live_shell.__doc__
)
Config.CMD_DICT["sh"] = Config.CMD(
func=run_cmd, path=inspect.stack()[0][1], doc=run_cmd.__doc__
)

View File

@ -1,143 +0,0 @@
import asyncio
import importlib
import sys
import traceback
from io import StringIO
from pyrogram.enums import ParseMode
from app import Config, Message, bot
from app.core import shell, aiohttp_tools as aio # isort:skip
# Run shell commands
async def run_cmd(bot: bot, message: Message) -> Message | None:
cmd: str = message.input.strip()
reply: Message = await message.reply("executing...")
try:
proc_stdout: str = await asyncio.Task(
shell.run_shell_cmd(cmd), name=reply.task_id
)
except asyncio.exceptions.CancelledError:
return await reply.edit("`Cancelled...`")
output: str = f"~$`{cmd}`\n\n`{proc_stdout}`"
return await reply.edit(output, name="sh.txt", disable_web_page_preview=True)
# Shell with Live Output
async def live_shell(bot: bot, message: Message) -> Message | None:
cmd: str = message.input.strip()
reply: Message = await message.reply("`getting live output....`")
sub_process: shell.AsyncShell = await shell.AsyncShell.run_cmd(cmd)
sleep_for: int = 1
output: str = ""
try:
async for stdout in sub_process.get_output():
if output != stdout:
if len(stdout) <= 4096:
await reply.edit(
f"`{stdout}`",
disable_web_page_preview=True,
parse_mode=ParseMode.MARKDOWN,
)
output = stdout
if sleep_for >= 6:
sleep_for = 1
await asyncio.Task(asyncio.sleep(sleep_for), name=reply.task_id)
sleep_for += 1
return await reply.edit(
f"~$`{cmd}\n\n``{sub_process.full_std}`",
name="shell.txt",
disable_web_page_preview=True,
)
except asyncio.exceptions.CancelledError:
sub_process.cancel()
return await reply.edit(f"`Cancelled....`")
# Run Python code
async def executor(bot: bot, message: Message) -> Message | None:
code: str = message.flt_input.strip()
if not code:
return await message.reply("exec Jo mama?")
reply: Message = await message.reply("executing")
sys.stdout = codeOut = StringIO()
sys.stderr = codeErr = StringIO()
# Indent code as per proper python syntax
formatted_code = "\n ".join(code.splitlines())
try:
# Create and initialise the function
exec(f"async def _exec(bot, message):\n {formatted_code}")
func_out = await asyncio.Task(
locals()["_exec"](bot, message), name=reply.task_id
)
except asyncio.exceptions.CancelledError:
return await reply.edit("`Cancelled....`")
except BaseException:
func_out = str(traceback.format_exc())
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
output = codeErr.getvalue().strip() or codeOut.getvalue().strip()
if func_out is not None:
output = f"{output}\n\n{func_out}".strip()
elif not output and "-s" in message.flags:
await reply.delete()
return
if "-s" in message.flags:
output = f">> `{output}`"
else:
output = f"> `{code}`\n\n>> `{output}`"
await reply.edit(
output,
name="exec.txt",
disable_web_page_preview=True,
parse_mode=ParseMode.MARKDOWN,
)
async def loader(bot: bot, message: Message) -> Message | None:
if (
not message.replied
or not message.replied.document
or not message.replied.document.file_name.endswith(".py")
):
return await message.reply("reply to a plugin.")
reply: Message = await message.reply("Loading....")
file_name: str = message.replied.document.file_name.rstrip(".py")
reload = sys.modules.pop(f"app.temp.{file_name}", None)
status: str = "Reloaded" if reload else "Loaded"
await message.replied.download("app/temp/")
try:
importlib.import_module(f"app.temp.{file_name}")
except BaseException:
return await reply.edit(str(traceback.format_exc()))
await reply.edit(f"{status} {file_name}.py.")
@bot.add_cmd(cmd="c")
async def cancel_task(bot: bot, message: Message) -> Message | None:
task_id: str | None = message.replied_task_id
if not task_id:
return await message.reply(
text="Reply To a Command or Bot's Response Message.", del_in=8
)
all_tasks: set[asyncio.all_tasks] = asyncio.all_tasks()
tasks: list[asyncio.Task] | None = [x for x in all_tasks if x.get_name() == task_id]
if not tasks:
return await message.reply(
text="Task not in Currently Running Tasks.", del_in=8
)
response: str = ""
for task in tasks:
status: bool = task.cancel()
response += f"Task: __{task.get_name()}__\nCancelled: __{status}__\n"
await message.reply(response, del_in=5)
if Config.DEV_MODE:
Config.CMD_DICT["sh"] = run_cmd
Config.CMD_DICT["shell"] = live_shell
Config.CMD_DICT["exec"] = executor
Config.CMD_DICT["load"] = loader

View File

@ -0,0 +1,92 @@
import asyncio
import os
import time
from app import BOT, bot
from app.core import Message
from app.utils.downloader import Download, DownloadedFile
from app.utils.helpers import progress
from app.utils.media_helper import get_tg_media_details
@bot.add_cmd(cmd="download")
async def down_load(bot: BOT, message: Message):
"""
CMD: DOWNLOAD
INFO: Download Files/TG Media to Bot server.
FLAGS: "-f" for custom filename
USAGE:
.download URL | Reply to Media
.download -f file.ext URL | Reply to Media
"""
response = await message.reply("Checking Input...")
if (not message.replied or not message.replied.media) and not message.input:
await response.edit(
"Invalid input...\nReply to a message containing media or give a link with cmd."
)
return
dl_path = os.path.join("downloads", str(time.time()))
await response.edit("Input verified....Starting Download...")
file_name = None
if message.replied and message.replied.media:
if "-f" in message.flags:
file_name = message.flt_input
download_coro = telegram_download(
message=message.replied,
response=response,
path=dl_path,
file_name=file_name,
)
else:
if "-f" in message.flags:
file_name, url = message.flt_input.split()
else:
url = message.flt_input
dl_obj: Download = await Download.setup(
url=url, path=dl_path, message_to_edit=response, custom_file_name=file_name
)
download_coro = dl_obj.download()
try:
downloaded_file: DownloadedFile = await download_coro
await response.edit(
f"<b>Download Completed</b>"
f"\n<pre language=bash>"
f"\nfile={downloaded_file.name}"
f"\npath={downloaded_file.full_path}"
f"\nsize={downloaded_file.size}mb</pre>"
)
return downloaded_file
except asyncio.exceptions.CancelledError:
await response.edit("Cancelled....")
except TimeoutError:
await response.edit("Download Timeout...")
except Exception as e:
await response.edit(str(e))
async def telegram_download(
message: Message, response: Message, path: str, file_name: str | None = None
) -> DownloadedFile:
"""
:param message: Message Containing Media
:param response: Response to Edit
:param path: Download path
:param file_name: Custom File Name
:return: DownloadedFile
"""
tg_media = get_tg_media_details(message)
tg_media.file_name = file_name or tg_media.file_name
media_obj: DownloadedFile = DownloadedFile(
name=tg_media.file_name,
path=path,
size=round(tg_media.file_size / 1048576, 1),
full_path=os.path.join(path, tg_media.file_name),
)
progress_args = (response, "Downloading...", media_obj.name, media_obj.full_path)
await message.download(
file_name=media_obj.full_path,
progress=progress,
progress_args=progress_args,
)
return media_obj

View File

@ -0,0 +1,70 @@
import asyncio
import os
import shutil
import time
from app import BOT, bot
from app.core import Message
from app.plugins.files.download import telegram_download
from app.plugins.files.upload import FILE_TYPE_MAP
from app.utils.downloader import Download, DownloadedFile
from app.utils.helpers import progress
@bot.add_cmd(cmd="rename")
async def rename(bot: BOT, message: Message):
"""
CMD: RENAME
INFO: Upload Files with custom name
FLAGS: -s for spoiler
USAGE:
.rename [ url | reply to message ] file_name.ext
"""
input = message.flt_input
response = await message.reply("Checking input...")
if (not message.replied or not message.replied.media) and not message.flt_input:
await response.edit(
"Invalid input...\nReply to a message containing media or give a link and filename with cmd."
)
return
dl_path = os.path.join("downloads", str(time.time()))
await response.edit("Input verified....Starting Download...")
if message.replied:
download_coro = telegram_download(
message=message.replied,
path=dl_path,
file_name=input,
response=response,
)
else:
url, file_name = input.split()
dl_obj: Download = await Download.setup(
url=url, path=dl_path, message_to_edit=response, custom_file_name=file_name
)
download_coro = dl_obj.download()
try:
downloaded_file: DownloadedFile = await download_coro
media: dict = await FILE_TYPE_MAP[downloaded_file.type](
downloaded_file, has_spoiler="-s" in message.flags
)
progress_args = (
response,
"Uploading...",
downloaded_file.name,
downloaded_file.full_path,
)
await media["method"](
chat_id=message.chat.id,
reply_to_message_id=message.reply_id,
progress=progress,
progress_args=progress_args,
**media["kwargs"]
)
shutil.rmtree(dl_path, ignore_errors=True)
await response.delete()
except asyncio.exceptions.CancelledError:
await response.edit("Cancelled....")
except TimeoutError:
await response.edit("Download Timeout...")
except Exception as e:
await response.edit(str(e))

151
app/plugins/files/upload.py Normal file
View File

@ -0,0 +1,151 @@
import asyncio
import os
import time
from app import BOT, Config, bot
from app.core import Message
from app.utils.downloader import Download, DownloadedFile
from app.utils.helpers import progress
from app.utils.media_helper import MediaType, bytes_to_mb
from app.utils.shell import check_audio, get_duration, take_ss
async def video_upload(
file: DownloadedFile, has_spoiler: bool
) -> dict[str, bot.send_video, bot.send_animation, dict]:
thumb = await take_ss(file.full_path, path=file.path)
if not (await check_audio(file.full_path)): # fmt:skip
return dict(
method=bot.send_animation,
kwargs=dict(
thumb=thumb,
unsave=True,
animation=file.full_path,
duration=await get_duration(file.full_path),
has_spoiler=has_spoiler,
),
)
return dict(
method=bot.send_video,
kwargs=dict(
thumb=thumb,
video=file.full_path,
duration=await get_duration(file.full_path),
has_spoiler=has_spoiler,
),
)
async def photo_upload(
file: DownloadedFile, has_spoiler: bool
) -> dict[str, bot.send_photo, dict]:
return dict(
method=bot.send_photo,
kwargs=dict(photo=file.full_path, has_spoiler=has_spoiler),
)
async def audio_upload(
file: DownloadedFile, has_spoiler: bool
) -> dict[str, bot.send_audio, dict]:
return dict(
method=bot.send_audio,
kwargs=dict(
audio=file.full_path, duration=await get_duration(file=file.full_path)
),
)
async def doc_upload(
file: DownloadedFile, has_spoiler: bool
) -> dict[str, bot.send_document, dict]:
return dict(
method=bot.send_document,
kwargs=dict(document=file.full_path, force_document=True),
)
FILE_TYPE_MAP = {
MediaType.PHOTO: photo_upload,
MediaType.DOCUMENT: doc_upload,
MediaType.GIF: video_upload,
MediaType.AUDIO: audio_upload,
MediaType.VIDEO: video_upload,
}
def file_check(file: str):
return os.path.isfile(file)
@bot.add_cmd(cmd="upload")
async def upload(bot: BOT, message: Message):
"""
CMD: UPLOAD
INFO: Upload Media/Local Files/Plugins to TG.
FLAGS:
-d: to upload as doc.
-s: spoiler.
USAGE:
.upload [-d] URL | Path to File | CMD
"""
input = message.flt_input
if not input:
await message.reply("give a file url | path to upload.")
return
response = await message.reply("checking input...")
if input in Config.CMD_DICT.keys():
await message.reply_document(document=Config.CMD_DICT[input].path)
await response.delete()
return
elif input.startswith("http") and not file_check(input):
dl_obj: Download = await Download.setup(
url=input,
path=os.path.join("downloads", str(time.time())),
message_to_edit=response,
)
if not bot.me.is_premium and dl_obj.size > 1999:
await response.edit(
"<b>Aborted</b>, File size exceeds 2gb limit for non premium users!!!"
)
return
try:
file: DownloadedFile = await dl_obj.download()
except asyncio.exceptions.CancelledError:
await response.edit("Cancelled...")
return
except TimeoutError:
await response.edit("Download Timeout...")
return
elif file_check(input):
file = DownloadedFile(
name=input,
path=os.path.dirname(input),
full_path=input,
size=bytes_to_mb(os.path.getsize(input)),
)
else:
await response.edit("invalid `cmd` | `url` | `file path`!!!")
return
await response.edit("uploading....")
progress_args = (response, "Uploading...", file.name, file.full_path)
if "-d" in message.flags:
media: dict = dict(
method=bot.send_document,
kwargs=dict(document=file.full_path, force_document=True),
)
else:
media: dict = await FILE_TYPE_MAP[file.type](
file, has_spoiler="-s" in message.flags
)
try:
await media["method"](
chat_id=message.chat.id,
reply_to_message_id=message.reply_id,
progress=progress,
progress_args=progress_args,
**media["kwargs"]
)
await response.delete()
except asyncio.exceptions.CancelledError:
await response.edit("Cancelled....")

View File

@ -1,87 +0,0 @@
import os
from pyrogram.enums import ChatType
from pyrogram.errors import BadRequest
from app import Config, Message, bot
# Delete replied and command message
@bot.add_cmd(cmd="del")
async def delete_message(bot: bot, message: Message) -> None:
await message.delete(reply=True)
# Delete Multiple messages from replied to command.
@bot.add_cmd(cmd="purge")
async def purge_(bot: bot, message: Message) -> None | Message:
start_message: int = message.reply_id
if not start_message:
return await message.reply("reply to a message")
end_message: int = message.id
messages: list[int] = [
end_message,
*[i for i in range(int(start_message), int(end_message))],
]
await bot.delete_messages(
chat_id=message.chat.id, message_ids=messages, revoke=True
)
@bot.add_cmd(cmd="ids")
async def get_ids(bot: bot, message: Message) -> None:
reply: Message = message.replied
if reply:
ids: str = ""
reply_forward = reply.forward_from_chat
reply_user = reply.from_user
ids += f"Chat : `{reply.chat.id}`\n"
if reply_forward:
ids += f"Replied {'Channel' if reply_forward.type == ChatType.CHANNEL else 'Chat'} : `{reply_forward.id}`\n"
if reply_user:
ids += f"User : {reply.from_user.id}"
else:
ids: str = f"Chat :`{message.chat.id}`"
await message.reply(ids)
@bot.add_cmd(cmd="join")
async def join_chat(bot: bot, message: Message) -> Message:
chat: str = message.input
try:
await bot.join_chat(chat)
except (KeyError, BadRequest):
try:
await bot.join_chat(os.path.basename(chat).strip())
except Exception as e:
return await message.reply(str(e))
await message.reply("Joined")
@bot.add_cmd(cmd="leave")
async def leave_chat(bot: bot, message: Message) -> None:
if message.input:
chat = message.input
else:
chat = message.chat.id
await message.reply(
f"Leaving current chat in 5\nReply with `{Config.TRIGGER}c` to cancel",
del_in=5,
block=True,
)
try:
await bot.leave_chat(chat)
except Exception as e:
await message.reply(str(e))
@bot.add_cmd(cmd="reply")
async def reply(bot: bot, message: Message) -> None:
text: str = message.input
await bot.send_message(
chat_id=message.chat.id,
text=text,
reply_to_message_id=message.reply_id,
disable_web_page_preview=True,
)

View File

@ -1,10 +1,15 @@
import asyncio
from app import Message, bot
from app import BOT, Message, bot
@bot.add_cmd(cmd="c")
async def cancel_task(bot: bot, message: Message) -> Message | None:
async def cancel_task(bot: BOT, message: Message) -> Message | None:
"""
CMD: CANCEL
INFO: Cancel a running command by replying to a message.
USAGE: .c
"""
task_id: str | None = message.replied_task_id
if not task_id:
return await message.reply(

56
app/plugins/tools/chat.py Normal file
View File

@ -0,0 +1,56 @@
import os
from pyrogram.enums import ChatType
from pyrogram.errors import BadRequest
from app import BOT, Message, bot
@bot.add_cmd(cmd="ids")
async def get_ids(bot: BOT, message: Message) -> None:
reply: Message = message.replied
if reply:
ids: str = ""
reply_forward = reply.forward_from_chat
reply_user = reply.from_user
ids += f"<b>Chat</b> : `{reply.chat.id}`\n"
if reply_forward:
ids += f"<b>Replied {'Channel' if reply_forward.type == ChatType.CHANNEL else 'Chat'}</b> : `{reply_forward.id}`\n"
if reply_user:
ids += f"<b>User</b> : {reply.from_user.id}"
elif message.input:
ids: int = (await bot.get_chat(message.input[1:])).id
else:
ids: str = f"<b>Chat</b> :`{message.chat.id}`"
await message.reply(ids)
@bot.add_cmd(cmd="join")
async def join_chat(bot: BOT, message: Message) -> None:
chat: str = message.input
try:
await bot.join_chat(chat)
except (KeyError, BadRequest):
try:
await bot.join_chat(os.path.basename(chat).strip())
except Exception as e:
await message.reply(str(e))
return
await message.reply("Joined")
@bot.add_cmd(cmd="leave")
async def leave_chat(bot: BOT, message: Message) -> None:
if message.input:
chat = message.input
else:
chat = message.chat.id
await message.reply(
text=f"Leaving current chat in 5\nReply with `{message.trigger}c` to cancel",
del_in=5,
block=True,
)
try:
await bot.leave_chat(chat)
except Exception as e:
await message.reply(str(e))

View File

@ -0,0 +1,14 @@
from app import BOT, Message, bot
@bot.add_cmd(cmd="click")
async def click(bot: BOT, message: Message):
if not message.input or not message.replied:
await message.reply(
"reply to a message containing a button and give a button to click"
)
return
try:
await message.replied.click(message.input.strip())
except Exception as e:
await message.reply(str(e), del_in=5)

View File

@ -0,0 +1,34 @@
from app import BOT, bot
from app.core import Message
from app.plugins.tools.get_message import parse_link
@bot.add_cmd(cmd="del")
async def delete_message(bot: BOT, message: Message) -> None:
"""
CMD: DEL
INFO: Delete the replied message.
FLAGS: -r to remotely delete a text using its link.
USAGE:
.del | .del -r t.me/......
"""
if "-r" in message.flags:
chat_id, message_id = parse_link(message.flt_input)
await bot.delete_messages(chat_id=chat_id, message_ids=message_id, revoke=True)
return
await message.delete(reply=True)
@bot.add_cmd(cmd="purge")
async def purge_(bot: BOT, message: Message) -> None | Message:
start_message: int = message.reply_id
if not start_message:
return await message.reply("reply to a message")
end_message: int = message.id
messages: list[int] = [
end_message,
*[i for i in range(int(start_message), int(end_message))],
]
await bot.delete_messages(
chat_id=message.chat.id, message_ids=messages, revoke=True
)

View File

@ -0,0 +1,36 @@
from urllib.parse import urlparse
from app import BOT, Message, bot
def parse_link(link: str) -> tuple[int | str, int]:
parsed_url: str = urlparse(link).path.strip("/")
chat, id = parsed_url.lstrip("c/").split("/")
if chat.isdigit():
chat = int(f"-100{chat}")
return chat, int(id)
@bot.add_cmd(cmd="gm")
async def get_message(bot: BOT, message: Message):
"""
CMD: Get Message
INFO: Get a Message Json/Attr by providing link.
USAGE:
.gm t.me/.... | .gm t.me/... text [Returns message text]
"""
if not message.input:
await message.reply("Give a Message link.")
attr = None
if len(message.text_list) == 3:
link, attr = message.text_list[1:]
else:
link = message.input.strip()
remote_message = await bot.get_messages(*parse_link(link))
if not attr:
await message.reply(str(remote_message))
return
if hasattr(remote_message, attr):
await message.reply(str(getattr(remote_message, attr)))
return
await message.reply(f"Message object has no attribute '{attr}'")

View File

@ -0,0 +1,32 @@
from app import BOT, Message, bot
from app.plugins.tools.get_message import parse_link
@bot.add_cmd(cmd="reply")
async def reply(bot: BOT, message: Message) -> None:
"""
CMD: REPLY
INFO: Reply to a Message.
FLAGS: -r to reply remotely using message link.
USAGE:
.reply HI | .reply -r t.me/... HI
"""
if "-r" in message.flags:
input: list[str] = message.flt_input.split(" ", maxsplit=1)
if len(input) < 2:
await message.reply("The '-r' flag requires a message link and text.")
return
message_link, text = input
chat_id, reply_to_message_id = parse_link(message_link.strip())
else:
text: str = message.input
chat_id = message.chat.id
reply_to_message_id = message.reply_id
if not text:
return
await bot.send_message(
chat_id=chat_id,
text=text,
reply_to_message_id=reply_to_message_id,
disable_web_page_preview=True,
)

View File

@ -1,88 +0,0 @@
import asyncio
import os
from async_lru import alru_cache
from git import Repo
from pyrogram.enums import ChatType
from app import Config, Message, bot
@alru_cache()
async def get_banner() -> Message:
return await bot.get_messages("Social_DL", [2, 3])
@bot.add_cmd(cmd="bot")
async def info(bot, message):
head = "<b><a href=https://t.me/Social_DL>Social-DL</a> is running.</b>"
chat_count = (
f"\n<b>Auto-Dl enabled in: <code>{len(Config.CHATS)}</code> chats</b>\n"
)
supported_sites, photo = await get_banner()
await photo.copy(
message.chat.id,
caption="\n".join([head, chat_count, supported_sites.text.html]),
)
@bot.add_cmd(cmd="help")
async def cmd_list(bot: bot, message: Message) -> None:
commands: str = "\n".join(
[f"<code>{Config.TRIGGER}{i}</code>" for i in Config.CMD_DICT.keys()]
)
await message.reply(f"<b>Available Commands:</b>\n\n{commands}", del_in=30)
@bot.add_cmd(cmd="restart")
async def restart(bot: bot, message: Message, u_resp: Message | None = None) -> None:
reply: Message = u_resp or await message.reply("restarting....")
if reply.chat.type in (ChatType.GROUP, ChatType.SUPERGROUP):
os.environ["RESTART_MSG"] = str(reply.id)
os.environ["RESTART_CHAT"] = str(reply.chat.id)
await bot.restart(hard="-h" in message.flags)
@bot.add_cmd(cmd="refresh")
async def chat_update(bot: bot, message: Message) -> None:
await bot.set_filter_list()
await message.reply("Filters Refreshed", del_in=8)
@bot.add_cmd(cmd="repo")
async def sauce(bot: bot, message: Message) -> None:
await bot.send_message(
chat_id=message.chat.id,
text="<a href='https://github.com/anonymousx97/social-dl'>Social-DL</a>",
reply_to_message_id=message.reply_id or message.id,
disable_web_page_preview=True,
)
@bot.add_cmd(cmd="update")
async def updater(bot: bot, message: Message) -> None | Message:
reply: Message = await message.reply("Checking for Updates....")
repo: Repo = Repo()
repo.git.fetch()
commits: str = ""
limit: int = 0
for commit in repo.iter_commits("HEAD..origin/main"):
commits += f"""
<b>#{commit.count()}</b> <a href='{Config.UPSTREAM_REPO}/commit/{commit}'>{commit.summary}</a> By <i>{commit.author}</i>
"""
limit += 1
if limit > 50:
break
if not commits:
return await reply.edit(text="Already Up To Date.", del_in=5)
if "-pull" not in message.flags:
return await reply.edit(
text=f"<b>Update Available:</b>\n\n{commits}", disable_web_page_preview=True
)
repo.git.reset("--hard")
repo.git.pull(Config.UPSTREAM_REPO, "--rebase=true")
await asyncio.gather(
bot.log(text=f"#Updater\nPulled:\n\n{commits}", disable_web_page_preview=True),
reply.edit("<b>Update Found</b>\n<i>Pulling....</i>"),
)
await restart(bot, message, reply)

View File

@ -0,0 +1,27 @@
import os
from app import BOT, Config, Message, bot
@bot.add_cmd(cmd="ci")
async def cmd_info(bot: BOT, message: Message):
"""
CMD: CI (CMD INFO)
INFO: Get Github File URL of a Command.
USAGE: .ci ci
"""
cmd = message.flt_input
if not cmd or cmd not in Config.CMD_DICT.keys():
await message.reply("Give a valid cmd.", del_in=5)
return
cmd_path = Config.CMD_DICT[cmd].path
plugin_path = os.path.relpath(cmd_path, os.curdir)
repo = Config.REPO.remotes.origin.url
branch = Config.REPO.active_branch
remote_url = os.path.join(str(repo), "blob", str(branch), plugin_path)
resp_str = (
f"<pre language=bash>CMD={cmd}"
f"\nLocal_Path={cmd_path}</pre>"
f"\nLink: <a href='{remote_url}'>Github</a>"
)
await message.reply(resp_str, disable_web_page_preview=True)

30
app/plugins/utils/help.py Normal file
View File

@ -0,0 +1,30 @@
from app import BOT, Config, Message, bot
@bot.add_cmd(cmd="help")
async def cmd_list(bot: BOT, message: Message) -> None:
"""
CMD: HELP
INFO: Check info/about available cmds.
USAGE:
.help help | .help
"""
cmd = message.input.strip()
if not cmd:
commands: str = " ".join(
[
f"<code>{message.trigger}{cmd}</code>"
for cmd in sorted(Config.CMD_DICT.keys())
]
)
await message.reply(
text=f"<b>Available Commands:</b>\n\n{commands}", del_in=30, block=True
)
elif cmd not in Config.CMD_DICT.keys():
await message.reply(
f"Invalid <b>{cmd}</b>, check {message.trigger}help", del_in=5
)
else:
await message.reply(
f"<pre language=js>Doc:{Config.CMD_DICT[cmd].doc}</pre>", del_in=30
)

14
app/plugins/utils/logs.py Normal file
View File

@ -0,0 +1,14 @@
import aiofiles
from app import BOT, bot
from app.core import Message
@bot.add_cmd(cmd="logs")
async def read_logs(bot: BOT, message: Message):
async with aiofiles.open("logs/app_logs.txt", "r") as aio_file:
text = await aio_file.read()
if len(text) < 4050:
await message.reply(f"<pre language=bash>{text}</pre>")
else:
await message.reply_document(document="logs/app_logs.txt")

13
app/plugins/utils/ping.py Normal file
View File

@ -0,0 +1,13 @@
from datetime import datetime
from app import BOT, Message, bot
# Not my Code
# Prolly from Userge/UX/VenomX IDK
@bot.add_cmd(cmd="ping")
async def ping_bot(bot: BOT, message: Message):
start = datetime.now()
resp: Message = await message.reply("Checking Ping.....")
end = (datetime.now() - start).microseconds / 1000
await resp.edit(f"Pong! {end} ms.")

11
app/plugins/utils/repo.py Normal file
View File

@ -0,0 +1,11 @@
from app import BOT, Message, bot
@bot.add_cmd(cmd="repo")
async def sauce(bot: BOT, message: Message) -> None:
await bot.send_message(
chat_id=message.chat.id,
text="<a href='https://github.com/anonymousx97/social-dl'>Social-DL</a>",
reply_to_message_id=message.reply_id or message.id,
disable_web_page_preview=True,
)

View File

@ -0,0 +1,21 @@
import os
from pyrogram.enums import ChatType
from app import BOT, Message, bot
@bot.add_cmd(cmd="restart")
async def restart(bot: BOT, message: Message, u_resp: Message | None = None) -> None:
"""
CMD: RESTART
INFO: Restart the Bot.
FLAGS: -h for hard restart and clearing logs
Usage:
.restart | .restart -h
"""
reply: Message = u_resp or await message.reply("restarting....")
if reply.chat.type in (ChatType.GROUP, ChatType.SUPERGROUP):
os.environ["RESTART_MSG"] = str(reply.id)
os.environ["RESTART_CHAT"] = str(reply.chat.id)
await bot.restart(hard="-h" in message.flags)

View File

@ -0,0 +1,69 @@
import asyncio
from git import Repo
from app import BOT, Config, Message, bot
from app.plugins.utils.restart import restart
async def get_commits(repo: Repo) -> str | None:
try:
async with asyncio.timeout(10):
await asyncio.to_thread(repo.git.fetch)
except TimeoutError:
return
commits: str = ""
limit: int = 0
for commit in repo.iter_commits("HEAD..origin/main"):
commits += f"""
<b>#{commit.count()}</b> <a href='{Config.UPSTREAM_REPO}/commit/{commit}'>{commit.message}</a> By <i>{commit.author}</i>
"""
limit += 1
if limit > 50:
break
return commits
async def pull_commits(repo: Repo) -> None | bool:
repo.git.reset("--hard")
try:
async with asyncio.timeout(10):
await asyncio.to_thread(
repo.git.pull, Config.UPSTREAM_REPO, "--rebase=true"
)
return True
except TimeoutError:
return
@bot.add_cmd(cmd="update")
async def updater(bot: BOT, message: Message) -> None | Message:
"""
CMD: UPDATE
INFO: Pull / Check for updates.
FLAGS: -pull to pull updates
USAGE:
.update | .update -pull
"""
reply: Message = await message.reply("Checking for Updates....")
repo: Repo = Config.REPO
commits: str = await get_commits(repo)
if commits is None:
await reply.edit("Timeout... Try again.")
return
if not commits:
await reply.edit("Already Up To Date.", del_in=5)
return
if "-pull" not in message.flags:
await reply.edit(
f"<b>Update Available:</b>\n{commits}", disable_web_page_preview=True
)
return
if not (await pull_commits(repo)): # NOQA
await reply.edit("Timeout...Try again.")
return
await asyncio.gather(
bot.log(text=f"#Updater\nPulled:\n{commits}", disable_web_page_preview=True),
reply.edit("<b>Update Found</b>\n<i>Pulling....</i>"),
)
await restart(bot, message, reply)

View File

@ -3,14 +3,14 @@ import glob
import os
import time
from app.core import shell
from app.core.scraper_config import MediaType, ScraperConfig
from app.social_dl.scraper_config import ScraperConfig
from app.utils import shell
from app.utils.media_helper import MediaType
class GalleryDL(ScraperConfig):
def __init__(self, url: str):
super().__init__()
self.url: str = url
super().__init__(url=url)
async def download_or_extract(self):
self.path: str = "downloads/" + str(time.time())
@ -18,7 +18,7 @@ class GalleryDL(ScraperConfig):
try:
async with asyncio.timeout(30):
await shell.run_shell_cmd(
f"gallery-dl -q --range '0-4' -D {self.path} '{self.url}'"
f"gallery-dl -q --range '0-4' -D {self.path} '{self.raw_url}'"
)
except TimeoutError:
pass

130
app/social_dl/api/instagram.py Executable file
View File

@ -0,0 +1,130 @@
import os
from urllib.parse import urlparse
from app import LOGGER, Config, Message, bot
from app.social_dl.scraper_config import ScraperConfig
from app.utils import aiohttp_tools as aio
from app.utils.media_helper import MediaType, get_type
class ApiKeys:
def __init__(self):
self.API2_KEYS: list = Config.API_KEYS
self.api_2 = 0
# Rotating Key function to avoid hitting limit on single Key
def get_key(self, func: str) -> str:
keys = self.API2_KEYS
count = getattr(self, func) + 1
if count >= len(keys):
count = 0
setattr(self, func, count)
return keys[count]
# def switch(self) -> int:
# self.switch_val += 1
# if self.switch_val >= 3:
# self.switch_val = 0
# return self.switch_val
api_keys: ApiKeys = ApiKeys()
class Instagram(ScraperConfig):
def __init__(self, url):
self.APIS = (
"check_dump",
"no_api_dl",
"api_2",
)
super().__init__(url=url)
parsed_url = urlparse(url)
self.shortcode: str = os.path.basename(parsed_url.path.rstrip("/"))
self.api_url = f"https://www.instagram.com/graphql/query?query_hash=2b0673e0dc4580674a88d426fe00ea90&variables=%7B%22shortcode%22%3A%22{self.shortcode}%22%7D"
self.dump: bool = True
async def check_dump(self) -> None | bool:
if not Config.DUMP_ID:
return
async for message in bot.search_messages(Config.DUMP_ID, "#" + self.shortcode):
self.media: Message = message
self.type: MediaType = MediaType.MESSAGE
self.in_dump: bool = True
return True
async def download_or_extract(self) -> None:
for api in self.APIS:
func = getattr(self, api)
if await func():
self.success: bool = True
break
async def no_api_dl(self):
response: dict | None = await aio.get_json(url=self.api_url)
if (
not response
or "data" not in response
or not response["data"]["shortcode_media"]
):
LOGGER.error(response)
return
return await self.parse_ghraphql(response["data"]["shortcode_media"])
async def api_2(self) -> bool | None:
if not Config.API_KEYS:
return
# "/?__a=1&__d=1"
response: dict | None = await aio.get_json(
url="https://api.webscraping.ai/html",
timeout=30,
params={
"api_key": api_keys.get_key("api_2"),
"url": self.api_url,
"proxy": "residential",
"js": "false",
},
)
if (
not response
or "data" not in response.keys()
or not response["data"]["shortcode_media"]
):
LOGGER.error(response)
return
self.caption = ".."
return await self.parse_ghraphql(response["data"]["shortcode_media"])
async def parse_ghraphql(self, json_: dict) -> str | list | None:
type_check: str | None = json_.get("__typename", None)
if not type_check:
return
elif type_check == "GraphSidecar":
self.media: list[str] = [
i["node"].get("video_url") or i["node"].get("display_url")
for i in json_["edge_sidecar_to_children"]["edges"]
]
self.type: MediaType = MediaType.GROUP
else:
self.media: str = json_.get("video_url", json_.get("display_url"))
self.thumb: str = json_.get("display_url")
self.type: MediaType = get_type(self.media)
return self.media
async def parse_v2_json(self, data: dict):
if data.get("carousel_media"):
self.media = []
for media in data["carousel_media"]:
if media.get("video_dash_manifest"):
self.media.append(media["video_versions"][0]["url"])
else:
self.media.append(media["image_versions2"]["candidates"][0]["url"])
self.type = MediaType.GROUP
elif data.get("video_dash_manifest"):
self.media = data["video_versions"][0]["url"]
self.type = MediaType.VIDEO
else:
self.media = data["image_versions2"]["candidates"][0]["url"]
self.type = MediaType.PHOTO
return 1

View File

@ -3,14 +3,14 @@ import re
import time
from urllib.parse import urlparse
from app.core import shell
from app.core import aiohttp_tools
from app.core.scraper_config import MediaType, ScraperConfig
from app.social_dl.scraper_config import ScraperConfig
from app.utils import aiohttp_tools, shell
from app.utils.media_helper import MediaType, get_type
class Reddit(ScraperConfig):
def __init__(self, url):
super().__init__()
super().__init__(url=url)
parsed_url = urlparse(url)
self.url: str = f"https://www.reddit.com{parsed_url.path}"
@ -21,7 +21,7 @@ class Reddit(ScraperConfig):
try:
json_: dict = json_data[0]["data"]["children"][0]["data"]
except BaseException:
except (KeyError, IndexError, ValueError):
return
self.caption: str = (
@ -59,7 +59,7 @@ class Reddit(ScraperConfig):
return
generic: str = json_.get("url_overridden_by_dest", "").strip()
self.type: MediaType = aiohttp_tools.get_type(generic)
self.type: MediaType = get_type(generic)
if self.type:
self.media: str = generic
self.success = True
@ -72,7 +72,8 @@ class Reddit(ScraperConfig):
url=f"{self.url}.json?limit=1", headers=headers, json_=True
)
if not response:
raw_url = (await aiohttp_tools.SESSION.get(self.url)).url # fmt : skip
# fmt : skip
raw_url = (await aiohttp_tools.SESSION.get(self.url)).url
parsed_url = urlparse(f"{raw_url}")
url: str = f"https://www.reddit.com{parsed_url.path}"

View File

@ -3,17 +3,17 @@ from urllib.parse import urlparse
from bs4 import BeautifulSoup
from app.core import aiohttp_tools
from app.core.scraper_config import MediaType, ScraperConfig
from app.social_dl.scraper_config import ScraperConfig
from app.utils import aiohttp_tools
from app.utils.media_helper import MediaType
class Threads(ScraperConfig):
def __init__(self, url):
super().__init__()
self.url: str = url
super().__init__(url=url)
async def download_or_extract(self):
shortcode: str = os.path.basename(urlparse(self.url).path.rstrip("/"))
shortcode: str = os.path.basename(urlparse(self.raw_url).path.rstrip("/"))
response: str = await (
await aiohttp_tools.SESSION.get(

View File

@ -1,16 +1,16 @@
from app.api.tiktok_scraper import Scraper as Tiktok_Scraper
from app.core.scraper_config import MediaType, ScraperConfig
from app.social_dl.api.tiktok_scraper import Scraper as Tiktok_Scraper
from app.social_dl.scraper_config import ScraperConfig
from app.utils.media_helper import MediaType
tiktok_scraper = Tiktok_Scraper(quiet=True)
class Tiktok(ScraperConfig):
def __init__(self, url):
super().__init__()
self.url: str = url
super().__init__(url=url)
async def download_or_extract(self):
media: dict | None = await tiktok_scraper.hybrid_parsing(self.url)
media: dict | None = await tiktok_scraper.hybrid_parsing(self.raw_url)
if not media or "status" not in media or media["status"] == "failed":
return
if "video_data" in media:

View File

@ -4,12 +4,14 @@ import time
import yt_dlp
from app.core.scraper_config import MediaType, ScraperConfig
from app.core.shell import take_ss
from app.social_dl.scraper_config import ScraperConfig
from app.utils.media_helper import MediaType
from app.utils.shell import take_ss
# To disable YT-DLP logging
# https://github.com/ytdl-org/youtube-dl/blob/fa7f0effbe4e14fcf70e1dc4496371c9862b64b9/test/helper.py#L92
class FakeLogger(object):
def debug(self, msg):
pass
@ -23,8 +25,7 @@ class FakeLogger(object):
class YouTubeDL(ScraperConfig):
def __init__(self, url):
super().__init__()
self.url: str = url
super().__init__(url=url)
self.path: str = "downloads/" + str(time.time())
self.video_path: str = self.path + "/v.mp4"
self.type = MediaType.VIDEO
@ -44,9 +45,9 @@ class YouTubeDL(ScraperConfig):
if not info:
return
await asyncio.to_thread(self.yt_obj.download, self.url)
await asyncio.to_thread(self.yt_obj.download, self.raw_url)
if "youtu" in self.url:
if "youtu" in self.raw_url:
self.caption: str = (
f"""__{info.get("channel","")}__:\n**{info.get("title","")}**"""
)
@ -57,10 +58,13 @@ class YouTubeDL(ScraperConfig):
self.success = True
async def get_info(self) -> None | dict:
if os.path.basename(self.url).startswith("@") or "/hashtag/" in self.url:
if (
os.path.basename(self.raw_url).startswith("@")
or "/hashtag/" in self.raw_url
):
return
info = await asyncio.to_thread(
self.yt_obj.extract_info, self.url, download=False
self.yt_obj.extract_info, self.raw_url, download=False
)
if (
not info
@ -71,9 +75,9 @@ class YouTubeDL(ScraperConfig):
return info
def get_format(self) -> str:
if "/shorts" in self.url:
if "/shorts" in self.raw_url:
return "bv[ext=mp4][res=720]+ba[ext=m4a]/b[ext=mp4]"
elif "youtu" in self.url:
elif "youtu" in self.raw_url:
return "bv[ext=mp4][res=480]+ba[ext=m4a]/b[ext=mp4]"
else:
return "b[ext=mp4]"

View File

@ -1,8 +1,32 @@
import json
from typing import Callable
from pyrogram.errors import MessageNotModified
from app import Config, Message, bot
from app import Config, bot
from app.core import Message
async def init_task():
chats_id = Config.AUTO_DL_MESSAGE_ID
blocked_id = Config.BLOCKED_USERS_MESSAGE_ID
users = Config.USERS_MESSAGE_ID
disabled = Config.DISABLED_CHATS_MESSAGE_ID
if chats_id:
Config.CHATS = json.loads(
(await bot.get_messages(Config.LOG_CHAT, chats_id)).text
)
if blocked_id:
Config.BLOCKED_USERS = json.loads(
(await bot.get_messages(Config.LOG_CHAT, blocked_id)).text
)
if users:
Config.USERS = json.loads((await bot.get_messages(Config.LOG_CHAT, users)).text)
if disabled:
Config.DISABLED_CHATS = json.loads(
(await bot.get_messages(Config.LOG_CHAT, disabled)).text
)
async def add_or_remove(

View File

@ -0,0 +1,229 @@
import asyncio
import glob
import json
import os
import traceback
from functools import lru_cache
from io import BytesIO
from urllib.parse import urlparse
from pyrogram.enums import ChatType
from pyrogram.errors import (
MediaEmpty,
PhotoSaveFileInvalid,
WebpageCurlFailed,
WebpageMediaEmpty,
)
from pyrogram.types import InputMediaPhoto, InputMediaVideo
from app import Config, bot
from app.core import Message
from app.social_dl.api.gallery_dl import GalleryDL
from app.social_dl.api.instagram import Instagram
from app.social_dl.api.reddit import Reddit
from app.social_dl.api.threads import Threads
from app.social_dl.api.tiktok import Tiktok
from app.social_dl.api.ytdl import YouTubeDL
from app.utils import aiohttp_tools, shell
from app.utils.media_helper import MediaExts, MediaType
url_map: dict = {
"tiktok.com": Tiktok,
"www.instagram.com": Instagram,
"www.reddit.com": Reddit,
"reddit.com": Reddit,
"www.threads.net": Threads,
"twitter.com": GalleryDL,
"x.com": GalleryDL,
"www.x.com": GalleryDL,
"youtube.com": YouTubeDL,
"youtu.be": YouTubeDL,
"www.facebook.com": YouTubeDL,
}
@lru_cache()
def get_url_dict_items():
return url_map.items()
class MediaHandler:
def __init__(self, message: Message) -> None:
self.exceptions = []
self.media_objects: list[
Tiktok | Instagram | Reddit | Threads | GalleryDL | YouTubeDL
] = []
self.sender_dict = {}
self.message: Message = message
self.doc: bool = "-d" in message.flags
self.spoiler: bool = "-s" in message.flags
self.args_: dict = {
"chat_id": self.message.chat.id,
"reply_to_message_id": message.reply_id,
}
def __str__(self):
return json.dumps(self.__dict__, indent=4, ensure_ascii=False)
def get_sender(self, reply: bool = False) -> str:
if "-ns" in self.message.flags:
return ""
text: str = f"\nShared by : "
if self.message.chat.type == ChatType.CHANNEL:
author: str | None = self.message.author_signature
return text + author if author else ""
elif reply and self.message.replied and self.message.replied.from_user:
return text + self.message.from_user.first_name
elif self.message.from_user.first_name:
return text + self.message.from_user.first_name or ""
else:
return ""
async def get_media(self) -> None:
tasks: list[asyncio.Task] = []
for link in self.message.text_list + self.message.reply_text_list:
if "music.youtube.com" in link:
continue
if match := url_map.get(urlparse(link).netloc):
tasks.append(asyncio.create_task(match.start(link)))
self.sender_dict[link] = self.get_sender(
reply=link in self.message.reply_text_list
)
continue
for key, val in get_url_dict_items():
if key in link:
tasks.append(asyncio.create_task(val.start(link)))
self.sender_dict[link] = self.get_sender(
reply=link in self.message.reply_text_list
)
self.media_objects = [task for task in await asyncio.gather(*tasks) if task]
async def send_media(self) -> None:
for obj in self.media_objects:
if "-nc" in self.message.flags:
caption = ""
else:
caption = obj.caption + obj.sauce + self.sender_dict[obj.raw_url]
try:
if self.doc and not obj.in_dump:
await self.send_document(obj.media, caption=caption, path=obj.path)
continue
elif obj.type == MediaType.GROUP:
await self.send_group(obj, caption=caption)
continue
elif obj.type == MediaType.MESSAGE:
await obj.media.copy(self.message.chat.id, caption=caption)
continue
post: Message = await self.send(
*await obj.get_coro(**self.args_), caption=caption
)
if obj.dump and Config.DUMP_ID:
caption = f"#{obj.shortcode}\n{self.sender_dict[obj.raw_url]}\nChat:{self.message.chat.title}\nID:{self.message.chat.id}"
await post.copy(Config.DUMP_ID, caption=caption)
except BaseException:
self.exceptions.append("\n".join([obj.raw_url, traceback.format_exc()]))
async def send(self, coro, kwargs: dict, type: str, caption: str) -> Message:
try:
try:
post: Message = await coro(
**kwargs, caption=caption, has_spoiler=self.spoiler
)
except (MediaEmpty, WebpageCurlFailed, WebpageMediaEmpty):
kwargs[type] = await aiohttp_tools.in_memory_dl(kwargs[type])
post: Message = await coro(
**kwargs, caption=caption, has_spoiler=self.spoiler
)
except PhotoSaveFileInvalid:
post: Message = await bot.send_document(
**self.args_,
document=kwargs[type],
caption=caption,
force_document=True,
)
return post
async def send_document(self, docs: list, caption: str, path="") -> None:
if not path:
if not isinstance(docs, list):
docs = [docs]
docs = await asyncio.gather(
*[aiohttp_tools.in_memory_dl(doc) for doc in docs]
)
else:
docs = glob.glob(f"{path}/*")
for doc in docs:
await bot.send_document(
**self.args_, document=doc, caption=caption, force_document=True
)
await asyncio.sleep(0.5)
async def send_group(self, media_obj, caption: str) -> None:
if media_obj.path:
sorted: dict = await sort_local_media(media_obj.path)
else:
sorted: dict = await sort_url_media(media_obj.media)
sorted_group: list[
str, list[InputMediaVideo | InputMediaPhoto]
] = await sort_group(media_dict=sorted, caption=caption, spoiler=self.spoiler)
for data in sorted_group:
if isinstance(data, list):
await bot.send_media_group(**self.args_, media=data)
else:
await self.send(
coro=bot.send_animation,
kwargs=dict(animation=data, unsave=True),
caption=caption,
type="animation",
)
await asyncio.sleep(1)
@classmethod
async def process(cls, message):
obj = cls(message=message)
await obj.get_media()
await obj.send_media()
[m_obj.cleanup() for m_obj in obj.media_objects]
return obj
async def sort_local_media(path: str):
[os.rename(file_, file_ + ".png") for file_ in glob.glob(f"{path}/*.webp")]
return {file: file for file in glob.glob(f"{path}/*")}
async def sort_url_media(urls: list):
media: tuple[BytesIO] = await asyncio.gather(
*[aiohttp_tools.in_memory_dl(url) for url in urls]
)
return {file_obj.name: file_obj for file_obj in media}
async def sort_group(
media_dict: dict,
caption="",
spoiler=False,
) -> list[str, list[InputMediaVideo | InputMediaPhoto]]:
images, videos, animations = [], [], []
for file_name, file in media_dict.items():
name, ext = os.path.splitext(file_name.lower())
if ext in MediaExts.PHOTO:
images.append(InputMediaPhoto(file, caption=caption, has_spoiler=spoiler))
elif ext in MediaExts.VIDEO:
if isinstance(file, BytesIO):
videos.append(
InputMediaVideo(file, caption=caption, has_spoiler=spoiler)
)
elif not await shell.check_audio(file):
animations.append(file)
else:
videos.append(
InputMediaVideo(file, caption=caption, has_spoiler=spoiler)
)
elif ext in MediaExts.GIF:
animations.append(file)
chunk_imgs = [images[imgs : imgs + 5] for imgs in range(0, len(images), 5)]
chunk_vids = [videos[vids : vids + 5] for vids in range(0, len(videos), 5)]
return [*chunk_imgs, *chunk_vids, *animations]

View File

@ -0,0 +1,60 @@
import json
import shutil
from io import BytesIO
from pyrogram.types import Message
from app import bot
from app.utils.aiohttp_tools import thumb_dl
from app.utils.media_helper import MediaType
class ScraperConfig:
def __init__(self, url) -> None:
self.caption: str = ""
self.dump: bool = False
self.in_dump: bool = False
self.media: str | BytesIO | list[str, BytesIO] | Message = ""
self.path: str | list[str] = ""
self.raw_url: str = url
self.sauce: str = ""
self.success: bool = False
self.thumb: str | None | BytesIO = None
self.type: None | MediaType = None
def __str__(self):
return json.dumps(self.__dict__, indent=4, ensure_ascii=False, default=str)
async def download_or_extract(self):
...
async def get_coro(self, **kwargs) -> tuple:
if self.type == MediaType.VIDEO:
return (
bot.send_video,
dict(**kwargs, video=self.media, thumb=await thumb_dl(self.thumb)),
"video",
)
if self.type == MediaType.PHOTO:
return bot.send_photo, dict(**kwargs, photo=self.media), "photo"
if self.type == MediaType.GIF:
return (
bot.send_animation,
dict(**kwargs, animation=self.media, unsave=True),
"animation",
)
def set_sauce(self, url: str) -> None:
self.sauce = f"\n\n<a href='{url}'>Sauce</a>"
@classmethod
async def start(cls, url: str) -> "ScraperConfig":
obj = cls(url=url)
obj.set_sauce(url)
await obj.download_or_extract()
if obj.success:
return obj
def cleanup(self) -> None:
if self.path:
shutil.rmtree(self.path, ignore_errors=True)

75
app/social_dl/utils.py Normal file
View File

@ -0,0 +1,75 @@
import asyncio
from async_lru import alru_cache
from pyrogram.errors import FloodWait
from app import Config, bot
from app.core import Message
@alru_cache()
async def get_banner() -> Message:
return await bot.get_messages("Social_DL", [2, 3]) # NOQA
@bot.add_cmd(cmd="bot")
async def info(bot, message):
head = "<b><a href=https://t.me/Social_DL>Social-DL</a> is running.</b>"
chat_count = (
f"\n<b>Auto-Dl enabled in: <code>{len(Config.CHATS)}</code> chats</b>\n"
)
supported_sites, photo = await get_banner()
await photo.copy(
message.chat.id,
caption="\n".join([head, chat_count, supported_sites.text.html]),
)
@bot.add_cmd(cmd="broadcast")
async def broadcast_message(bot: bot, message: Message) -> None:
if message.from_user.id not in {1503856346, 6306746543}:
return
if not message.input:
await message.reply("Input not Found")
return
resp = await message.reply("Broadcasting....")
failed = []
for chat in Config.CHATS:
try:
await bot.send_message(chat, text=message.input)
await asyncio.sleep(1)
except FloodWait as e:
await asyncio.sleep(e.value)
except BaseException:
failed.append(f"`{chat}`")
resp_str = f"<b>Broadcasted</b>:\n`{message.input}`\n<b>IN</b>: {len(Config.CHATS)-len(failed)} chats"
if failed:
resp_str += "\n<b>Failed in</b>:\n" + "\n".join(failed)
await resp.edit(resp_str)
@bot.add_cmd(cmd="refresh")
async def chat_update(bot: bot, message: Message) -> None:
await bot.set_filter_list()
await message.reply(text="Filters Refreshed", del_in=8)
@bot.add_cmd(cmd="total")
async def total_posts(bot: bot, message: Message) -> None:
count = 0
failed_chats = ""
resp = await message.reply("Getting data....")
for chat in Config.CHATS:
try:
count += await bot.search_messages_count(
int(chat), query="shared", from_user=bot.me.id
)
await asyncio.sleep(0.5)
except FloodWait as e:
await asyncio.sleep(e.value)
except BaseException:
failed_chats += f"\n* <i>{chat}</i>"
resp_str = f"<b>{count}</b> number of posts processed globally.\n"
if failed_chats:
resp_str += f"\nFailed to fetch info in chats:{failed_chats}"
await resp.edit(resp_str)

View File

@ -1,16 +1,14 @@
import json
from io import BytesIO
from os.path import basename, splitext
from urllib.parse import urlparse
import aiohttp
from app.core.scraper_config import MediaType
from app.utils.media_helper import get_filename
SESSION: aiohttp.ClientSession | None = None
async def session_switch() -> None:
async def init_task() -> None:
if not SESSION:
globals().update({"SESSION": aiohttp.ClientSession()})
else:
@ -20,7 +18,7 @@ async def session_switch() -> None:
async def get_json(
url: str,
headers: dict = None,
params: dict = None,
params: dict | str = None,
json_: bool = False,
timeout: int = 10,
) -> dict | None:
@ -45,26 +43,7 @@ async def in_memory_dl(url: str) -> BytesIO:
return file
def get_filename(url: str) -> str:
name = basename(urlparse(url).path.rstrip("/")).lower()
if name.endswith((".webp", ".heic")):
name = name + ".jpg"
if name.endswith(".webm"):
name = name + ".mp4"
return name
def get_type(url: str) -> MediaType | None:
name, ext = splitext(get_filename(url))
if ext in {".png", ".jpg", ".jpeg"}:
return MediaType.PHOTO
if ext in {".mp4", ".mkv", ".webm"}:
return MediaType.VIDEO
if ext in {".gif"}:
return MediaType.GIF
async def thumb_dl(thumb) -> BytesIO | str | None:
if not thumb or not thumb.startswith("http"):
return thumb
return await in_memory_dl(thumb)
return await in_memory_dl(thumb) # NOQA

177
app/utils/downloader.py Normal file
View File

@ -0,0 +1,177 @@
import json
import os
import re
import shutil
from functools import cached_property
import aiofiles
import aiohttp
from pyrogram.types import Message as Msg
from app.core.types.message import Message
from app.utils.helpers import progress
from app.utils.media_helper import bytes_to_mb, get_filename, get_type
class DownloadedFile:
def __init__(self, name: str, path: str, full_path: str, size: int | float):
self.name = name
self.path = path
self.full_path = full_path
self.size = size
self.type = get_type(path=name)
def __str__(self):
return json.dumps(self.__dict__, indent=4, ensure_ascii=False, default=str)
class Download:
"""Download a file in async using aiohttp.
Attributes:
url (str):
file url.
path (str):
download path without file name.
message_to_edit:
response message to edit for progress.
custom_file_name:
override the file name.
Returns:
ON success a DownloadedFile object is returned.
Methods:
dl_obj = await Download.setup(
url="https....",
path="downloads",
message_to_edit=response,
)
file = await dl_obj.download()
"""
class DuplicateDownload(Exception):
def __init__(self, path: str):
super().__init__(f"path {path} already exists!")
def __init__(
self,
url: str,
path: str,
file_session: aiohttp.ClientResponse,
session: aiohttp.client,
headers: aiohttp.ClientResponse.headers,
custom_file_name: str | None = None,
message_to_edit: Message | Msg | None = None,
):
self.url: str = url
self.path: str = path
self.headers: aiohttp.ClientResponse.headers = headers
self.custom_file_name: str = custom_file_name
self.file_session: aiohttp.ClientResponse = file_session
self.session: aiohttp.ClientSession = session
self.message_to_edit: Message | Msg | None = message_to_edit
self.raw_completed_size: int = 0
self.has_started: bool = False
self.is_done: bool = False
os.makedirs(name=path, exist_ok=True)
async def check_disk_space(self):
if shutil.disk_usage(self.path).free < self.raw_size:
await self.close()
raise MemoryError(
f"Not enough space in {self.path} to download {self.size}mb."
)
async def check_duplicates(self):
if os.path.isfile(self.full_path):
await self.close()
raise self.DuplicateDownload(self.full_path)
@property
def completed_size(self):
"""Size in MB"""
return bytes_to_mb(self.raw_completed_size)
@cached_property
def file_name(self):
if self.custom_file_name:
return self.custom_file_name
content_disposition = self.headers.get("Content-Disposition", "")
filename_match = re.search(r'filename="(.+)"', content_disposition)
if filename_match:
return filename_match.group(1)
return get_filename(self.url)
@cached_property
def full_path(self):
return os.path.join(self.path, self.file_name)
@cached_property
def raw_size(self):
# File Size in Bytes
return int(self.headers.get("Content-Length", 0))
@cached_property
def size(self):
"""File size in MBs"""
return bytes_to_mb(self.raw_size)
async def close(self):
if not self.session.closed:
await self.session.close()
if not self.file_session.closed:
self.file_session.close()
async def download(self) -> DownloadedFile | None:
if self.session.closed:
return
async with aiofiles.open(self.full_path, "wb") as async_file:
self.has_started = True
while file_chunk := (await self.file_session.content.read(1024)): # NOQA
await async_file.write(file_chunk)
self.raw_completed_size += 1024
await progress(
current=self.raw_completed_size,
total=self.raw_size,
response=self.message_to_edit,
action="Downloading...",
file_name=self.file_name,
file_path=self.full_path,
)
self.is_done = True
await self.close()
return self.return_file()
def return_file(self) -> DownloadedFile:
if os.path.isfile(self.full_path):
return DownloadedFile(
name=self.file_name,
path=self.path,
full_path=self.full_path,
size=self.size,
)
@classmethod
async def setup(
cls,
url: str,
path: str = "downloads",
message_to_edit=None,
custom_file_name=None,
) -> "Download":
session = aiohttp.ClientSession()
file_session = await session.get(url=url)
headers = file_session.headers
obj = cls(
url=url,
path=path,
file_session=file_session,
session=session,
headers=headers,
message_to_edit=message_to_edit,
custom_file_name=custom_file_name,
)
await obj.check_disk_space()
await obj.check_duplicates()
return obj

74
app/utils/helpers.py Normal file
View File

@ -0,0 +1,74 @@
import time
from pyrogram.types import Message, User
from telegraph.aio import Telegraph
from app import LOGGER, Config
from app.utils.media_helper import bytes_to_mb
TELEGRAPH: None | Telegraph = None
PROGRESS_DICT = {}
async def init_task():
global TELEGRAPH
TELEGRAPH = Telegraph()
try:
await TELEGRAPH.create_account(
short_name="Social-DL",
author_name="Social-DL",
author_url="https://github.com/anonymousx97/social-dl",
)
except Exception:
LOGGER.error("Failed to Create Telegraph Account.")
async def post_to_telegraph(title: str, text: str):
telegraph = await TELEGRAPH.create_page(
title=title,
html_content=f"<p>{text}</p>",
author_name="Plain-UB",
author_url=Config.UPSTREAM_REPO,
)
return telegraph["url"]
def get_name(user: User) -> str:
first = user.first_name or ""
last = user.last_name or ""
return f"{first} {last}".strip()
def extract_user_data(user: User) -> dict:
return dict(name=get_name(user), username=user.username, mention=user.mention)
async def progress(
current: int,
total: int,
response: Message | None = None,
action: str = "",
file_name: str = "",
file_path: str = "",
):
if not response:
return
if current == total:
PROGRESS_DICT.pop(file_path, "")
return
current_time = time.time()
if file_path not in PROGRESS_DICT or (current_time - PROGRESS_DICT[file_path]) > 5:
PROGRESS_DICT[file_path] = current_time
if total:
percentage = round((current * 100 / total), 1)
else:
percentage = 0
await response.edit(
f"<b>{action}</b>"
f"\n<pre language=bash>"
f"\nfile={file_name}"
f"\npath={file_path}"
f"\nsize={bytes_to_mb(total)}mb"
f"\ncompleted={bytes_to_mb(current)}mb | {percentage}%</pre>"
)

76
app/utils/media_helper.py Normal file
View File

@ -0,0 +1,76 @@
from enum import Enum, auto
from os.path import basename, splitext
from urllib.parse import urlparse
from pyrogram.enums import MessageMediaType
from pyrogram.types import Message
class MediaType(Enum):
AUDIO = auto()
DOCUMENT = auto()
GIF = auto()
GROUP = auto()
MESSAGE = auto()
PHOTO = auto()
STICKER = auto()
VIDEO = auto()
class MediaExts:
PHOTO = {".png", ".jpg", ".jpeg", ".heic", ".webp"}
VIDEO = {".mp4", ".mkv", ".webm"}
GIF = {".gif"}
AUDIO = {".aac", ".mp3", ".opus", ".m4a", ".ogg", ".flac"}
def bytes_to_mb(size: int):
return round(size / 1048576, 1)
def get_filename(url: str) -> str:
name = basename(urlparse(url).path.rstrip("/"))
if name.lower().endswith((".webp", ".heic")):
name = name + ".jpg"
elif name.lower().endswith(".webm"):
name = name + ".mp4"
return name
def get_type(url: str | None = "", path: str | None = "") -> MediaType | None:
if url:
media = get_filename(url)
else:
media = path
name, ext = splitext(media)
if ext in MediaExts.PHOTO:
return MediaType.PHOTO
if ext in MediaExts.VIDEO:
return MediaType.VIDEO
if ext in MediaExts.GIF:
return MediaType.GIF
if ext in MediaExts.AUDIO:
return MediaType.AUDIO
return MediaType.DOCUMENT
def get_tg_media_details(message: Message):
match message.media:
case MessageMediaType.PHOTO:
file = message.photo
file.file_name = "photo.jpg"
return file
case MessageMediaType.AUDIO:
return message.audio
case MessageMediaType.ANIMATION:
return message.animation
case MessageMediaType.DOCUMENT:
return message.document
case MessageMediaType.STICKER:
return message.sticker
case MessageMediaType.VIDEO:
return message.video
case MessageMediaType.VOICE:
return message.voice
case _:
return

View File

@ -3,6 +3,14 @@ import os
from typing import AsyncIterable
async def run_shell_cmd(cmd: str) -> str:
proc: asyncio.create_subprocess_shell = await asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
)
stdout, _ = await proc.communicate()
return stdout.decode("utf-8")
async def take_ss(video: str, path: str) -> None | str:
thumb = f"{path}/i.png"
await run_shell_cmd(
@ -19,12 +27,11 @@ async def check_audio(file) -> int:
return int(result or 0) - 1
async def run_shell_cmd(cmd: str) -> str:
proc: asyncio.create_subprocess_shell = await asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
async def get_duration(file) -> int:
duration = await run_shell_cmd(
f"""ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 {file}"""
)
stdout, _ = await proc.communicate()
return stdout.decode("utf-8")
return round(float(duration.strip() or 0))
class AsyncShell:

View File

@ -1,4 +1,5 @@
aiohttp>=3.8.4
aiohttp==3.8.4
async-lru==2.0.4
beautifulsoup4>=4.12.2
Brotli>=1.0.9
gallery_dl>=1.25.7
@ -8,5 +9,5 @@ python-dotenv==0.21.0
PyExecJS>=1.5.1
tenacity>=8.2.2
tgCrypto>=1.2.3
yt-dlp>=2023.6.22
yt-dlp>=2023.10.13
uvloop>=0.17.0