diff --git a/sample-config.env b/sample-config.env index 6a9486e..80b9b39 100644 --- a/sample-config.env +++ b/sample-config.env @@ -2,12 +2,14 @@ API_ID=123456 API_HASH="abcd1238fn...." -STRING_SESSION="Ab0fbs......." +DEV_MODE = "no" LOG=-10012345678 MESSAGE=12345 +STRING_SESSION="Ab0fbs......." + TRIGGER="." USERS=[12345678] # Multiple user IDs should be separated by , \ No newline at end of file diff --git a/socialbot.py b/socialbot.py index db08b20..9cdbc3e 100644 --- a/socialbot.py +++ b/socialbot.py @@ -1,3 +1,31 @@ +""" + +* socialbot.py Main Logic file of Bot. + +MIT License + +Copyright (c) 2023 Ryuk + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + +""" + + import asyncio import base64 import glob @@ -7,14 +35,16 @@ import shutil import sys import time import traceback -from subprocess import call +from io import StringIO +from urllib.parse import urlparse as url_p import aiohttp import yt_dlp +from aiohttp_retry import ExponentialRetry, RetryClient from dotenv import load_dotenv from pyrogram import Client, filters, idle -from pyrogram.enums import ChatType -from pyrogram.errors import MediaEmpty, PeerIdInvalid, PhotoSaveFileInvalid, WebpageCurlFailed +from pyrogram.enums import ChatType, ParseMode +from pyrogram.errors import MediaEmpty, PhotoSaveFileInvalid, WebpageCurlFailed from pyrogram.handlers import MessageHandler from pyrogram.types import InputMediaPhoto, InputMediaVideo, Message from wget import download @@ -22,31 +52,138 @@ from wget import download if os.path.isfile("config.env"): load_dotenv("config.env") -bot = Client(name="bot", session_string=os.environ.get("STRING_SESSION"), api_id=os.environ.get("API_ID"), api_hash=os.environ.get("API_HASH")) -log_chat = os.environ.get("LOG") -if log_chat is None: +bot = Client( + name="bot", + session_string=os.environ.get("STRING_SESSION"), + api_id=os.environ.get("API_ID"), + api_hash=os.environ.get("API_HASH"), + in_memory=True, + parse_mode=ParseMode.DEFAULT, +) + +LOG_CHAT = os.environ.get("LOG") + +if LOG_CHAT is None: print("Enter log channel id in config") exit() -chat_list = [] -handler_ = [] -users = json.loads(os.environ.get("USERS")) -trigger = os.environ.get("TRIGGER") -e_json = base64.b64decode("Lz9fX2E9MSZfX2Q9MQ==").decode("utf-8") + +USERS = json.loads(os.environ.get("USERS")) + +TRIGGER = os.environ.get("TRIGGER") + +E_JSON = base64.b64decode("Lz9fX2E9MSZfX2Q9MQ==").decode("utf-8") -@bot.on_message(filters.command(commands="bot", prefixes=trigger) & filters.user(users)) +# BOT Section + +@bot.on_edited_message(filters.command(commands="dl", prefixes=TRIGGER) & filters.user(USERS)) +@bot.on_message(filters.command(commands="dl", prefixes=TRIGGER) & filters.user(USERS)) +async def dl(bot, message: Message): + parsed = MESSAGE_PARSER(message) + if not parsed.coro_: + return + status = "failed" + msg_ = await bot.send_message(chat_id=message.chat.id, text="`trying to download...`") + for coroutine_ in parsed.coro_: + media_dict = await coroutine_ + if isinstance(media_dict, dict) and "media" in media_dict: + status = await send_media(message=message, data=media_dict, doc=parsed.doc, caption=parsed.caption) + if status == "failed": + return await msg_.edit(f"Media Download Failed.") + await message.delete() + await msg_.delete() + + +# Parse Message text and return coroutine for matched links +class MESSAGE_PARSER: + def __init__(self, message): + self.text_list = message.text.split() + self.flags = [i for i in self.text_list if i.startswith("-")] + self.sender = message.author_signature or (user.first_name if (user := message.from_user) else "") + self.caption = f"Shared by : {self.sender}" + self.doc = "-d" in self.flags + self.coro_ = [] + self.match_links() + + # Thanks Jeel Patel [TG @jeelpatel231] for url map concept. + def match_links(self): + url_map = { + "tiktok.com": yt_dl, + "www.instagram.com": instagram_dl, + "youtube.com/shorts": yt_dl, + "twitter.com": yt_dl, + "www.reddit.com": reddit_dl, + } + for link in self.text_list: + if (match := url_map.get(url_p(link).netloc)): + self.coro_.append(match(url=link,doc=self.doc, caption=self.caption)) + else: + for key, val in url_map.items(): + if key in link: + self.coro_.append(val(url=link,doc=self.doc, caption=self.caption)) + + + +# Send media back +async def send_media(message: Message, data: dict, caption: str, doc: bool = False): + reply = message.reply_to_message + reply_id = reply.id if reply else None + media = data.get("media") + thumb = data.get("thumb", None) + caption = data.get("caption", "") + is_image, is_video, is_animation, is_grouped = (data.get("is_image"), data.get("is_video"), data.get("is_animation"), data.get("is_grouped")) + status = "failed" + args_ = {"chat_id": message.chat.id, "reply_to_message_id": reply_id} + if isinstance(media, list): + for vv in media: + try: + if isinstance(vv, list): + status = await bot.send_media_group(**args_, media=vv) + await asyncio.sleep(2) + elif doc: + status = await bot.send_document(**args_, caption=caption, document=vv, force_document=True) + else: + status = await bot.send_animation(**args_, caption=caption, animation=vv, unsave=True) + except Exception: + await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc())) + else: + args_.update({"caption": caption}) + try: + if is_image: + status = await bot.send_photo(**args_, photo=media) + elif is_video: + status = await bot.send_video(**args_, video=media, thumb=thumb) + elif is_animation: + status = await bot.send_animation(**args_, animation=media, unsave=True) + else: + status = await bot.send_document(**args_, document=media, force_document=True) + except PhotoSaveFileInvalid: + await bot.send_document(**args_, document=media, force_document=True) + except (MediaEmpty, WebpageCurlFailed, ValueError): + pass + if os.path.exists(str(data["path"])): + shutil.rmtree(str(data["path"])) + if status != "failed": + return "done" + return status + + + +@bot.on_edited_message(filters.command(commands="bot", prefixes=TRIGGER) & filters.user(USERS)) +@bot.on_message(filters.command(commands="bot", prefixes=TRIGGER) & filters.user(USERS)) async def multi_func(bot, message: Message): rw_message = message.text.split() try: + # Restart if "restart" in rw_message: - """Restart bot""" + await SESSION.close() + await RETRY_CLIENT.close() os.execl(sys.executable, sys.executable, __file__) + # Get chat / channel id elif "ids" in rw_message: - """Get chat / channel id""" - ids = "" - reply = message.reply_to_message - if reply: + if (reply := message.reply_to_message): + ids = "" reply_forward = reply.forward_from_chat reply_user = reply.from_user ids += f"Chat : `{reply.chat.id}`\n" @@ -58,26 +195,24 @@ async def multi_func(bot, message: Message): ids = f"Chat :`{message.chat.id}`" await message.reply(ids) + # Update Auto-DL chats elif "update" in rw_message: - """Update Auto-DL chats""" - for i in handler_: - bot.remove_handler(*i) + bot.remove_handler(*HANDLER_) await add_h() await message.reply("Chat list refreshed") + # Join a chat elif "join" in rw_message: - """Join a Chat""" if len(rw_message) > 2: try: await bot.join_chat(rw_message[-1]) except KeyError: - await bot.join_chat(rw_message[-1].split()[-1]) + await bot.join_chat(os.path.basename(rw_message[-1]).strip()) except Exception as e: return await message.reply(str(e)) await message.reply("Joined") - + # Leave a chat elif "leave" in rw_message: - """Leave a Chat""" if len(rw_message) == 3: chat = rw_message[-1] else: @@ -85,218 +220,204 @@ async def multi_func(bot, message: Message): await bot.leave_chat(chat) else: - await message.reply("Social-DL is running.") + await message.reply("Social-DL is running") except Exception: - await bot.send_message(chat_id=log_chat, text=str(traceback.format_exc())) + await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc())) -@bot.on_message(filters.command(commands="term", prefixes=trigger) & filters.user(users)) -async def run_cmd(bot, message: Message): - """Function to run shell commands""" - cmd = message.text.replace("+term", "") - status_ = await message.reply("executing...") - process = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) - stdout, stderr = await process.communicate() - - if process.returncode is not None: - output = f"${cmd}" - if stdout: - output += f"\n\n**Output:**\n\n`{stdout.decode('utf-8')}`" - if stderr: - output += f"\n\n**Error:**\n\n`{stderr.decode('utf-8')}`" - await status_.edit(output) - - -@bot.on_message(filters.command(commands="del", prefixes=trigger) & filters.user(users)) +# Delete replied and command message +@bot.on_message(filters.command(commands="del", prefixes=TRIGGER) & filters.user(USERS)) async def delete_message(bot, message: Message): - """Delete Messages""" reply = message.reply_to_message await message.delete() if reply: await reply.delete() -@bot.on_message(filters.command(commands="dl", prefixes=trigger) & filters.user(users)) -async def dl(bot, message: Message): - """The main Logic Function to download media""" - rw_message = message.text.split() +# Delete Multiple messages from replied to command. +@bot.on_message(filters.command(commands="purge", prefixes=TRIGGER) & filters.user(USERS)) +async def purge_(bot, message: Message): reply = message.reply_to_message - reply_id = reply.id if reply else None - sender_ = message.author_signature or message.from_user.first_name or "" - response = await bot.send_message(message.chat.id, "`trying to download...`") - curse_ = "" - caption = f"Shared by : {sender_}" - check_dl = "failed" - if "-d" in rw_message: - doc = True - else: - doc = False - for i in rw_message: - if i.startswith("https://www.instagram.com/"): - check_dl = await iyt_dl(url=i) - curse_ = "#FuckInstagram" - if check_dl == "failed": - check_dl = await json_dl(iurl=i, caption=caption, doc=doc) - elif "twitter.com" in i or "https://youtube.com/shorts" in i or "tiktok.com" in i: - check_dl = await iyt_dl(url=i) - elif "www.reddit.com" in i: - check_dl = await reddit_dl(url_=i, doc=doc, sender_=sender_) - curse_ = "Link doesn't contain any media or is restricted\nTip: Make sure you are sending original post url and not an embedded post." - else: - pass - if isinstance(check_dl, dict): - if isinstance(check_dl["media"], list): - for vv in check_dl["media"]: - if isinstance(vv, list): - await bot.send_media_group(message.chat.id, media=vv, reply_to_message_id=reply_id) - await asyncio.sleep(3) - else: - await bot.send_document(message.chat.id, document=vv, caption=check_dl["caption"] + caption, reply_to_message_id=reply_id, force_document=True) - else: - if doc: - await bot.send_document(message.chat.id, document=check_dl["media"], caption=check_dl["caption"] + caption, reply_to_message_id=reply_id, force_document=True) - else: - try: - if check_dl["type"] == "img": - await bot.send_photo(message.chat.id, photo=check_dl["media"], caption=check_dl["caption"] + caption, reply_to_message_id=reply_id) - elif check_dl["type"] == "vid": - await bot.send_video(message.chat.id, video=check_dl["media"], caption=check_dl["caption"] + caption, thumb=check_dl["thumb"], reply_to_message_id=reply_id) - else: - await bot.send_animation(message.chat.id, animation=check_dl["media"], caption=check_dl["caption"] + caption, reply_to_message_id=reply_id, unsave=True) - except PhotoSaveFileInvalid: - await bot.send_document(message.chat.id, document=check_dl["media"], caption=check_dl["caption"] + caption, reply_to_message_id=reply_id) - except (MediaEmpty, WebpageCurlFailed, ValueError): - pass - if os.path.exists(str(check_dl["path"])): - shutil.rmtree(str(check_dl["path"])) - check_dl = "done" - if check_dl == "failed": - await response.edit(f"Media Download Failed.\n{curse_}") - if check_dl == "done": - await message.delete() - await response.delete() + if not reply: + return await message.reply("reply to a message") + start_message = reply.id + end_message = message.id + messages = [end_message] + [i for i in range(int(start_message), int(end_message))] + await bot.delete_messages(chat_id=message.chat.id, message_ids=messages, revoke=True) -async def iyt_dl(url: str): - """Stop handling post url because this only downloads Videos and post might contain images""" - if not url.startswith("https://www.instagram.com/reel/"): +if os.environ.get("DEV_MODE") == "yes": + # Run shell commands + @bot.on_edited_message(filters.command(commands="sh", prefixes=TRIGGER) & filters.user(USERS)) + @bot.on_message(filters.command(commands="sh", prefixes=TRIGGER) & filters.user(USERS)) + async def run_cmd(bot, message: Message): + cmd = message.text.replace(f"{TRIGGER}sh ", "").strip() + status_ = await message.reply("executing...") + proc = await run_shell_cmd(cmd) + output = f"${cmd}" + if (stdout := proc.get("stdout")): + output += f"""\n\n**Output:**\n\n`{stdout}`""" + if (stderr := proc.get("stderr")): + output += f"""\n\n**Error:**\n\n`{stderr}`""" + await status_.edit(output,parse_mode=ParseMode.MARKDOWN) + + + # Run Python code + @bot.on_edited_message( + filters.command(commands="exec", prefixes=TRIGGER) & filters.user(USERS) + ) + @bot.on_message( + filters.command(commands="exec", prefixes=TRIGGER) & filters.user(USERS) + ) + async def executor_(bot, message): + code = message.text.replace(f"{TRIGGER}exec","").strip() + if not code: + return await message.reply("exec Jo mama?") + reply = await message.reply("executing") + sys.stdout = codeOut = StringIO() + sys.stderr = codeErr = StringIO() + # Indent code as per proper python syntax + formatted_code = "".join(["\n "+i for i in code.split("\n")]) + try: + # Create and initialise the function + exec(f"async def exec_(bot, message):{formatted_code}") + func_out = await locals().get("exec_")(bot, message) + except Exception: + func_out = str(traceback.format_exc()) + sys.stdout = sys.__stdout__ + sys.stderr = sys.__stderr__ + output = codeOut.getvalue().strip() or codeErr.getvalue().strip() or func_out or "" + await reply.edit(f"> `{code}`\n\n>> `{output}`",parse_mode=ParseMode.MARKDOWN) + + +# Add Auto-DL regex Handler +async def add_h(): + message_id = os.environ.get("MESSAGE") + if message_id is None: + print("\nEnter Message id in config.\n") + return 1 + try: + msg = (await bot.get_messages(int(LOG_CHAT), int(message_id))).text + except PeerIdInvalid: + print("\nLog channel not found.\nCheck the variable for mistakes") + return 1 + if msg is None: + print("\nMessage not found\nCheck variable for mistakes\n") + return 1 + try: + chats_list = [int(i) for i in msg.split()] + except ValueError: + print("\nThe message id is wrong. \nOr \nChat id message contains letters\nonly numerical ids are allowed.\n") + return 1 + social_handler = bot.add_handler( + MessageHandler( + dl, + ( + (filters.regex(r"^http*")) + & filters.chat(chats_list) + ), + ), + group=1, + ) + globals().update({"HANDLER_":social_handler}) + + +# Start the bot and wait idle without blocking the main loop +async def boot(): + check_handlers = await add_h() + msg = "#Social-dl\nStarted\n" + if check_handlers == 1: + msg += "\n* Running in command only mode. *" + print(msg) + await bot.send_message(chat_id=int(LOG_CHAT), text="#Social-dl\n__Started__") + globals().update({"SESSION":aiohttp.ClientSession()}) + globals().update({"RETRY_CLIENT":RetryClient(client_session=SESSION, retry_for_statuses={408, 504}, retry_options=ExponentialRetry(attempts=1))}) + await idle() + + + +# API Section + + +# Instagram +async def instagram_dl(url: str, caption: str, doc: bool = False): + args = locals() + # status = await instafix(message=message, link=i, caption=caption) + for i in [yt_dl, api_2]: + data = await i(**args) + if isinstance(data, dict): + break + return data + + +async def api_2(url: str, caption: str, doc: bool): + link = url.split("/?")[0] + E_JSON + response = await get_json(url=link) + if not response or "graphql" not in response: return "failed" - path_ = time.time() - video = f"{path_}/v.mp4" - thumb = f"{path_}/i.png" - _opts = {"outtmpl": video, "ignoreerrors": True, "ignore_no_formats_error": True, "format": "bv[ext=mp4]+ba[ext=m4a]/b[ext=mp4]", "quiet": True, "logger": FakeLogger()} - return_val = "failed" + return await parse_ghraphql( + response["graphql"]["shortcode_media"], caption=caption + "\n.." + ) + + +async def parse_ghraphql(json_: dict, caption: str, doc: bool = False): + try: + path = f"downloads/{time.time()}" + os.makedirs(path) + ret_dict = {"path": path, "thumb": None, "caption": caption} + type_check = json_.get("__typename",None) + if not type_check: + return "failed" + elif type_check == "GraphSidecar": + media = [] + for i in json_["edge_sidecar_to_children"]["edges"]: + if i["node"]["__typename"] == "GraphImage": + media.append(i["node"]["display_url"]) + if i["node"]["__typename"] == "GraphVideo": + media.append(i["node"]["video_url"]) + ret_dict.update({"is_grouped": False if doc else True, "media": await async_download(urls=media, path=path, doc=doc, caption=caption)}) + else: + media = json_.get("video_url") or json_.get("display_url") + ret_dict.update(**await get_media(url=media, path=path)) + except Exception: + await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc())) + return ret_dict + + +# YT-DLP for videos from multiple sites +async def yt_dl(url: str, caption: str, doc:bool=False): + if "instagram.com/p/" in url: + return + path = str(time.time()) + video = f"{path}/v.mp4" + _opts = { + "outtmpl": video, + "ignoreerrors": True, + "ignore_no_formats_error": True, + "quiet": True, + "logger": FakeLogger(), + } + if "shorts" in url: + _opts.update({"format": "bv[ext=mp4][res=480]+ba[ext=m4a]/b[ext=mp4]"}) + else: + _opts.update({"format": "bv[ext=mp4]+ba[ext=m4a]/b[ext=mp4]"}) + data = "failed" try: yt_dlp.YoutubeDL(_opts).download(url) if os.path.isfile(video): - call(f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{thumb}"''', shell=True) - return_val = {"path": str(path_), "type": "vid", "media": video, "thumb": thumb if os.path.isfile(thumb) else None, "caption": ""} + data = { + "path": path, + "is_video": True, + "media": video, + "thumb": await take_ss(video=video, path=path), + "caption": caption, + } except BaseException: pass - return return_val - - -async def json_dl(iurl: str, doc: bool, caption: str): - link = iurl.split("/?")[0] + e_json - async with (aiohttp.ClientSession() as csession, csession.get(link, timeout=10) as session): - try: - session_resp = await session.text() - rjson = json.loads(session_resp) - except json.decoder.JSONDecodeError: - return "failed" - if "require_login" in rjson: - return "failed" - - return_val = "failed" - if "graphql" in rjson: - try: - url = rjson["graphql"]["shortcode_media"] - d_dir = f"downloads/{time.time()}" - os.makedirs(d_dir) - if url["__typename"] == "GraphVideo": - url_ = url["video_url"] - wget_x = download(url_, d_dir) - call(f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{wget_x}" -vframes 1 "{d_dir}/i.png"''', shell=True) - return_val = {"path": d_dir, "type": "vid", "media": wget_x, "thumb": d_dir + "/i.png", "caption": ""} - - if url["__typename"] == "GraphImage": - url_ = url["display_url"] - wget_x = download(url_, d_dir + "/i.jpg") - return_val = {"path": d_dir, "type": "img", "media": wget_x, "thumb": None, "caption": ""} - - if url["__typename"] == "GraphSidecar": - url_list = [] - for i in url["edge_sidecar_to_children"]["edges"]: - if i["node"]["__typename"] == "GraphImage": - url_list.append(i["node"]["display_url"]) - if i["node"]["__typename"] == "GraphVideo": - url_list.append(i["node"]["video_url"]) - downloads = await async_download(urls=url_list, path=d_dir, doc=doc, caption=caption + "\n..") - return_val = {"path": d_dir, "media": downloads} - except Exception: - await bot.send_message(chat_id=log_chat, text=str(traceback.format_exc())) - return return_val - - -async def reddit_dl(bot, message: Message): - link = url_.split("/?")[0] + ".json?limit=1" - headers = {"user-agent": "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_8_7 rv:5.0; en-US) AppleWebKit/533.31.5 (KHTML, like Gecko) Version/4.0 Safari/533.31.5"} - return_val = "failed" - try: - async with (aiohttp.ClientSession() as session, session.get(link, headers=headers) as ss): - response = await ss.json() - json_ = response[0]["data"]["children"][0]["data"] - caption = f'__{json_["subreddit_name_prefixed"]}:__\n**{json_["title"]}**\n\n' - d_dir = str(time.time()) - os.mkdir(d_dir) - is_vid, is_gallery = json_.get("is_video"), json_.get("is_gallery") - - if is_vid: - video = f"{d_dir}/v.mp4" - thumb = f"{d_dir}/i.png" - vid_url = json_["secure_media"]["reddit_video"]["hls_url"] - call(f'ffmpeg -hide_banner -loglevel error -i "{vid_url.strip()}" -c copy {video}', shell=True) - call(f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{thumb}"''', shell=True) - return_val = {"path": d_dir, "type": "vid", "media": video, "thumb": thumb, "caption": caption} - - elif is_gallery: - grouped_media_urls = [f'https://i.redd.it/{i["media_id"]}.jpg' for i in json_["gallery_data"]["items"]] - downloads = await async_download(urls=grouped_media_urls, path=d_dir, doc=doc, caption=caption + f"Shared by : {sender_}") - return_val = {"path": d_dir, "media": downloads} - - else: - media_ = json_get("url_overridden_by_dest","").strip() - if media_.endswith((".jpg", ".jpeg", ".png", ".webp")): - img = download(media_, d_dir) - return_val = {"path": d_dir, "type": "img", "media": img, "thumb": None, "caption": caption} - elif media_.endswith(".gif"): - gif = download(media_, d_dir) - return_val = {"path": d_dir, "type": "animation", "media": gif, "thumb": None, "caption": caption} - else: - gif_url = json_.get("preview", {}).get("reddit_video_preview", {}).get("fallback_url") - if gif_url: - gif = download(gif_url, d_dir) - return_val = {"path": d_dir, "type": "animation", "media": gif, "thumb": None, "caption": caption} - - except Exception: - await bot.send_message(chat_id=log_chat, text=str(traceback.format_exc())) - return return_val - - -async def async_download(urls: list, path: str, doc: bool = False, caption: str = ""): - down_loads = await asyncio.gather(*[asyncio.to_thread(download, url, path) for url in urls]) - if doc: - return down_loads - [os.rename(file, file + ".png") for file in glob.glob(f"{path}/*.webp")] - files = [ i + ".png" if i.endswith(".webp") else i for i in down_loads ] - grouped_images = [InputMediaPhoto(img, caption=caption) for img in files if img.endswith((".png", ".jpg", ".jpeg"))] - grouped_videos = [InputMediaVideo(vid, caption=caption) for vid in files if vid.endswith((".mp4", ".mkv", ".webm"))] - return_list = [grouped_images[imgs : imgs + 5] for imgs in range(0, len(grouped_images), 5)] + [ - grouped_videos[vids : vids + 5] for vids in range(0, len(grouped_videos), 5) - ] - return return_list + return data +# To disable YT-DLP logging class FakeLogger(object): def debug(self, msg): pass @@ -308,50 +429,225 @@ class FakeLogger(object): pass -async def add_h(): - message_id = os.environ.get("MESSAGE") - if message_id is None: - print("Enter Message id in config.\n") - return 1 +# Reddit +async def reddit_dl(url: str, caption: str, doc: bool = False): + link = url.split("/?")[0] + ".json?limit=1" + headers = { + "user-agent": "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_8_7 rv:5.0; en-US) AppleWebKit/533.31.5 (KHTML, like Gecko) Version/4.0 Safari/533.31.5" + } try: - msg = (await bot.get_messages(int(log_chat), int(message_id))).text - except PeerIdInvalid: - print("Log channel not found.\nCheck the variable for mistakes") - return 1 - chat_list.clear() - if msg is None: - print("Message not found\nCheck variable for mistakes\n") - return 1 + response = await get_json(url=link, headers=headers, json_=True) + if not response: + return "failed" + json_ = response[0]["data"]["children"][0]["data"] + caption = f'__{json_["subreddit_name_prefixed"]}:__\n**{json_["title"]}**\n\n' + caption + path = str(time.time()) + os.mkdir(path) + is_vid, is_gallery = json_.get("is_video"), json_.get("is_gallery") + data = {"path": path, "caption": caption} + if is_vid: + video = f"{path}/v.mp4" + vid_url = json_["secure_media"]["reddit_video"]["hls_url"] + await run_shell_cmd(f'ffmpeg -hide_banner -loglevel error -i "{vid_url.strip()}" -c copy {video}') + data.update({"is_video": True, "media": video, "thumb": await take_ss(video=video, path=path)}) + + elif is_gallery: + grouped_media_urls = [json_["media_metadata"][val]["s"]["u"].replace("preview", "i") for val in json_["media_metadata"]] + downloads = await async_download(urls=grouped_media_urls, path=path, doc=doc, caption=caption) + data.update({"is_grouped": True, "media": downloads}) + + else: + url_ = json_.get("preview", {}).get("reddit_video_preview", {}).get("fallback_url", "") or json_.get("url_overridden_by_dest", "").strip() + if not url_: + return "failed" + data.update(await get_media(url=url_, path=path)) + + except Exception: + await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc())) + return data + + +# Get Json response from APIs +async def get_json(url: str, headers: dict = None, params: dict = None, retry: bool = False, json_: bool = False, timeout: int = 10): + if retry: + client = RETRY_CLIENT + else: + client = SESSION try: - chats_list = [int(i) for i in msg.split()] - except ValueError: - print("Chat id message contains letters\nonly numerical ids are allowed.\nOr the message id is wrong.\n") - return 1 - chat_list.extend(chats_list) - social_handler = bot.add_handler( - MessageHandler( - dl,(( - filters.regex(r"^https://www.instagram.com/*") - | filters.regex(r"^https://youtube.com/shorts/*") - | filters.regex(r"^https://twitter.com/*") - | filters.regex(r"^https://vm.tiktok.com/*") - | filters.regex(r"^https://www.reddit.com/*") - ) & filters.chat(chat_list))), - group=1, - ) - handler_.append(social_handler) + async with client.get(url=url, headers=headers, params=params, timeout=timeout) as ses: + if json_: + ret_json = await ses.json() + else: + ret_json = json.loads(await ses.text()) + except (json.decoder.JSONDecodeError, aiohttp.ContentTypeError, asyncio.TimeoutError): + return + except Exception: + await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc())) + return + return ret_json -async def boot(): - check_handlers = await add_h() - msg = "#Social-dl\nStarted\n" - if check_handlers == 1: - msg += "Running in command only mode." - print(msg) - await bot.send_message(chat_id=int(log_chat), text=msg) - await idle() +# Download media and return it with media type +async def get_media(url: str, path: str): + down_load = download(url, path) + ret_dict = {"media": down_load} + if down_load.lower().endswith((".jpg", ".jpeg", ".png", ".webp")): + ret_dict["is_image"] = True + if down_load.lower().endswith(".webp"): + os.rename(down_load, down_load + ".jpg") + ret_dict.update({"media": down_load + ".jpg"}) + elif down_load.lower().endswith((".mkv", ".mp4", ".webm")): + ret_dict.update({"is_video": True, "thumb": await take_ss(video=down_load, path=path)}) + elif down_load.lower().endswith(".gif"): + ret_dict.update({"is_animation": True}) + else: + return {} + return ret_dict +# Download multiple media asynchronously to save time; +# Return it in a list or a list with smaller lists each containing upto 5 media. +async def async_download(urls: list, path: str, doc: bool = False, caption: str = ""): + down_loads = await asyncio.gather(*[asyncio.to_thread(download, url, path) for url in urls]) + if doc: + return down_loads + [os.rename(file, file + ".png") for file in glob.glob(f"{path}/*.webp")] + files = [i + ".png" if i.endswith(".webp") else i for i in down_loads] + grouped_images, grouped_videos, animations = [], [], [] + for file in files: + if file.endswith((".png", ".jpg", ".jpeg")): + grouped_images.append(InputMediaPhoto(file, caption=caption)) + if file.endswith((".mp4", ".mkv", ".webm")): + has_audio = await check_audio(file) + if not has_audio: + animations.append(file) + else: + grouped_videos.append(InputMediaVideo(file, caption=caption)) + return_list = [ + grouped_images[imgs : imgs + 5] for imgs in range(0, len(grouped_images), 5) + ] + [grouped_videos[vids : vids + 5] for vids in range(0, len(grouped_videos), 5) + ] + animations + return return_list + + +# Thumbnail +async def take_ss(video: str, path: str): + await run_shell_cmd(f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{path}/i.png"''') + if os.path.isfile(path + "/i.png"): + return path + "/i.png" + + +async def check_audio(file): + result = await run_shell_cmd(f"ffprobe -v error -show_entries format=nb_streams -of default=noprint_wrappers=1:nokey=1 {file}") + return int(result.get("stdout", 0)) - 1 + + +async def run_shell_cmd(cmd): + proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) + stdout, stderr = await proc.communicate() + return {"stdout": stdout.decode("utf-8"), "stderr": stderr.decode("utf-8")} + + +# Start only bot when file is called directly. if __name__ == "__main__": bot.start() bot.run(boot()) + + + +# NOT FOR PUBLIC + +#API_KEYS = { +# "abc": { +# "keys": [], +# "counter": 0, +# "exhausted": {}, +# }, +#} +#SWITCH = [0] + + +# Rotating Key function to avoid hitting limit on single Key +#async def get_key(func_tion): +# func = API_KEYS.get(func_tion, {}) +# key = func.get("keys") +# count = func.get("counter") +# count += 1 +# if count == len(key): +# count = 0 +# ret_key = key[count] +# API_KEYS[func_tion]["counter"] = count +# return ret_key + + +# Tiktok +#async def tik_dl(url: str, doc: bool, caption:str): +# status = "failed" +# headers = { +# "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36" +# } +# url_ = f"" +# response = await get_json(url_, headers=headers, json_=True) +# if not response or "status" in response and response["status"] == "failed": +# return "failed" +# if "video_data" in response: +# data = response["video_data"]["nwm_video_url_HQ"] +# status = {"path": "", "is_video": True, "media": data, "thumb": None, "caption": ""} +# if "image_data" in response: +# data = response["image_data"]["no_watermark_image_list"] +# path = f"downloads/{time.time()}" +# os.makedirs(path) +# downloads = await async_download(urls=data, path=path, doc=doc, caption=caption) +# status = {"path": path, "media": downloads, "caption": "", "is_grouped": True} +# return status + + +#async def multi_api(url: str, caption: str, doc: bool = False): +# apis = [ +# { +# "url": "", +# "headers": {}, +# "querystring": {}, +# }, +# ] + +# switch_ = SWITCH[0] + 1 +# if switch_ == len(apis): +# switch_ = 0 +# SWITCH[0] = switch_ +# +# api = apis[switch_] +# api["headers"]["API-Key"] = await get_key(f"multi_api{switch_}") +# response = await get_json(url=api.get("url"), headers=api.get("headers"), params=api.get("querystring"), json_=True) +# if not response or "message" in response: +# return "failed" +# data = response.get("data", {}).get("shortcode_media", {}) or response +# return await parse_ghraphql(json_=data, caption=caption + "\n" + "•" * switch_, doc=doc) + + +#async def api_1(url: str, caption: str, doc: bool = False): +# url = "" +# querystring = {"url": url} +# data = "failed" +# headers = { +# "API-Key": await get_key("api_1"), +# "API-Host": "", +# } +# response = await get_json(url=url, headers=headers, params=querystring) +# print(response) +# if not response or "message" in response or "messages" in response: +# return "failed" +# media = response["media"] +# path = f"downloads/{time.time()}" +# os.makedirs(path) +# if isinstance(media, list): +# downloads = await async_download(urls=media, path=path, doc=doc, caption=caption + "\n.") +# data = {"path": path, "media": downloads, "is_grouped": True} +# else: +# data = { +# "path": path, +# "caption": caption + "\n.", +# **await get_media(url=media.split("&filename")[0], path=path), +# } +# return data +