Add message parser and Upstream with private repo.

This commit is contained in:
anonymousx97 2023-03-26 21:18:16 +05:30
parent 6c5afff7d9
commit 889f031561
2 changed files with 552 additions and 254 deletions

View File

@ -2,12 +2,14 @@ API_ID=123456
API_HASH="abcd1238fn...."
STRING_SESSION="Ab0fbs......."
DEV_MODE = "no"
LOG=-10012345678
MESSAGE=12345
STRING_SESSION="Ab0fbs......."
TRIGGER="."
USERS=[12345678] # Multiple user IDs should be separated by ,

View File

@ -1,3 +1,31 @@
"""
* socialbot.py Main Logic file of Bot.
MIT License
Copyright (c) 2023 Ryuk
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import asyncio
import base64
import glob
@ -7,14 +35,16 @@ import shutil
import sys
import time
import traceback
from subprocess import call
from io import StringIO
from urllib.parse import urlparse as url_p
import aiohttp
import yt_dlp
from aiohttp_retry import ExponentialRetry, RetryClient
from dotenv import load_dotenv
from pyrogram import Client, filters, idle
from pyrogram.enums import ChatType
from pyrogram.errors import MediaEmpty, PeerIdInvalid, PhotoSaveFileInvalid, WebpageCurlFailed
from pyrogram.enums import ChatType, ParseMode
from pyrogram.errors import MediaEmpty, PhotoSaveFileInvalid, WebpageCurlFailed
from pyrogram.handlers import MessageHandler
from pyrogram.types import InputMediaPhoto, InputMediaVideo, Message
from wget import download
@ -22,31 +52,138 @@ from wget import download
if os.path.isfile("config.env"):
load_dotenv("config.env")
bot = Client(name="bot", session_string=os.environ.get("STRING_SESSION"), api_id=os.environ.get("API_ID"), api_hash=os.environ.get("API_HASH"))
log_chat = os.environ.get("LOG")
if log_chat is None:
bot = Client(
name="bot",
session_string=os.environ.get("STRING_SESSION"),
api_id=os.environ.get("API_ID"),
api_hash=os.environ.get("API_HASH"),
in_memory=True,
parse_mode=ParseMode.DEFAULT,
)
LOG_CHAT = os.environ.get("LOG")
if LOG_CHAT is None:
print("Enter log channel id in config")
exit()
chat_list = []
handler_ = []
users = json.loads(os.environ.get("USERS"))
trigger = os.environ.get("TRIGGER")
e_json = base64.b64decode("Lz9fX2E9MSZfX2Q9MQ==").decode("utf-8")
USERS = json.loads(os.environ.get("USERS"))
TRIGGER = os.environ.get("TRIGGER")
E_JSON = base64.b64decode("Lz9fX2E9MSZfX2Q9MQ==").decode("utf-8")
@bot.on_message(filters.command(commands="bot", prefixes=trigger) & filters.user(users))
# BOT Section
@bot.on_edited_message(filters.command(commands="dl", prefixes=TRIGGER) & filters.user(USERS))
@bot.on_message(filters.command(commands="dl", prefixes=TRIGGER) & filters.user(USERS))
async def dl(bot, message: Message):
parsed = MESSAGE_PARSER(message)
if not parsed.coro_:
return
status = "failed"
msg_ = await bot.send_message(chat_id=message.chat.id, text="`trying to download...`")
for coroutine_ in parsed.coro_:
media_dict = await coroutine_
if isinstance(media_dict, dict) and "media" in media_dict:
status = await send_media(message=message, data=media_dict, doc=parsed.doc, caption=parsed.caption)
if status == "failed":
return await msg_.edit(f"Media Download Failed.")
await message.delete()
await msg_.delete()
# Parse Message text and return coroutine for matched links
class MESSAGE_PARSER:
def __init__(self, message):
self.text_list = message.text.split()
self.flags = [i for i in self.text_list if i.startswith("-")]
self.sender = message.author_signature or (user.first_name if (user := message.from_user) else "")
self.caption = f"Shared by : {self.sender}"
self.doc = "-d" in self.flags
self.coro_ = []
self.match_links()
# Thanks Jeel Patel [TG @jeelpatel231] for url map concept.
def match_links(self):
url_map = {
"tiktok.com": yt_dl,
"www.instagram.com": instagram_dl,
"youtube.com/shorts": yt_dl,
"twitter.com": yt_dl,
"www.reddit.com": reddit_dl,
}
for link in self.text_list:
if (match := url_map.get(url_p(link).netloc)):
self.coro_.append(match(url=link,doc=self.doc, caption=self.caption))
else:
for key, val in url_map.items():
if key in link:
self.coro_.append(val(url=link,doc=self.doc, caption=self.caption))
# Send media back
async def send_media(message: Message, data: dict, caption: str, doc: bool = False):
reply = message.reply_to_message
reply_id = reply.id if reply else None
media = data.get("media")
thumb = data.get("thumb", None)
caption = data.get("caption", "")
is_image, is_video, is_animation, is_grouped = (data.get("is_image"), data.get("is_video"), data.get("is_animation"), data.get("is_grouped"))
status = "failed"
args_ = {"chat_id": message.chat.id, "reply_to_message_id": reply_id}
if isinstance(media, list):
for vv in media:
try:
if isinstance(vv, list):
status = await bot.send_media_group(**args_, media=vv)
await asyncio.sleep(2)
elif doc:
status = await bot.send_document(**args_, caption=caption, document=vv, force_document=True)
else:
status = await bot.send_animation(**args_, caption=caption, animation=vv, unsave=True)
except Exception:
await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc()))
else:
args_.update({"caption": caption})
try:
if is_image:
status = await bot.send_photo(**args_, photo=media)
elif is_video:
status = await bot.send_video(**args_, video=media, thumb=thumb)
elif is_animation:
status = await bot.send_animation(**args_, animation=media, unsave=True)
else:
status = await bot.send_document(**args_, document=media, force_document=True)
except PhotoSaveFileInvalid:
await bot.send_document(**args_, document=media, force_document=True)
except (MediaEmpty, WebpageCurlFailed, ValueError):
pass
if os.path.exists(str(data["path"])):
shutil.rmtree(str(data["path"]))
if status != "failed":
return "done"
return status
@bot.on_edited_message(filters.command(commands="bot", prefixes=TRIGGER) & filters.user(USERS))
@bot.on_message(filters.command(commands="bot", prefixes=TRIGGER) & filters.user(USERS))
async def multi_func(bot, message: Message):
rw_message = message.text.split()
try:
# Restart
if "restart" in rw_message:
"""Restart bot"""
await SESSION.close()
await RETRY_CLIENT.close()
os.execl(sys.executable, sys.executable, __file__)
# Get chat / channel id
elif "ids" in rw_message:
"""Get chat / channel id"""
if (reply := message.reply_to_message):
ids = ""
reply = message.reply_to_message
if reply:
reply_forward = reply.forward_from_chat
reply_user = reply.from_user
ids += f"Chat : `{reply.chat.id}`\n"
@ -58,26 +195,24 @@ async def multi_func(bot, message: Message):
ids = f"Chat :`{message.chat.id}`"
await message.reply(ids)
# Update Auto-DL chats
elif "update" in rw_message:
"""Update Auto-DL chats"""
for i in handler_:
bot.remove_handler(*i)
bot.remove_handler(*HANDLER_)
await add_h()
await message.reply("Chat list refreshed")
# Join a chat
elif "join" in rw_message:
"""Join a Chat"""
if len(rw_message) > 2:
try:
await bot.join_chat(rw_message[-1])
except KeyError:
await bot.join_chat(rw_message[-1].split()[-1])
await bot.join_chat(os.path.basename(rw_message[-1]).strip())
except Exception as e:
return await message.reply(str(e))
await message.reply("Joined")
# Leave a chat
elif "leave" in rw_message:
"""Leave a Chat"""
if len(rw_message) == 3:
chat = rw_message[-1]
else:
@ -85,218 +220,204 @@ async def multi_func(bot, message: Message):
await bot.leave_chat(chat)
else:
await message.reply("Social-DL is running.")
await message.reply("Social-DL is running")
except Exception:
await bot.send_message(chat_id=log_chat, text=str(traceback.format_exc()))
await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc()))
@bot.on_message(filters.command(commands="term", prefixes=trigger) & filters.user(users))
async def run_cmd(bot, message: Message):
"""Function to run shell commands"""
cmd = message.text.replace("+term", "")
status_ = await message.reply("executing...")
process = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, stderr = await process.communicate()
if process.returncode is not None:
output = f"${cmd}"
if stdout:
output += f"\n\n**Output:**\n\n`{stdout.decode('utf-8')}`"
if stderr:
output += f"\n\n**Error:**\n\n`{stderr.decode('utf-8')}`"
await status_.edit(output)
@bot.on_message(filters.command(commands="del", prefixes=trigger) & filters.user(users))
# Delete replied and command message
@bot.on_message(filters.command(commands="del", prefixes=TRIGGER) & filters.user(USERS))
async def delete_message(bot, message: Message):
"""Delete Messages"""
reply = message.reply_to_message
await message.delete()
if reply:
await reply.delete()
@bot.on_message(filters.command(commands="dl", prefixes=trigger) & filters.user(users))
async def dl(bot, message: Message):
"""The main Logic Function to download media"""
rw_message = message.text.split()
# Delete Multiple messages from replied to command.
@bot.on_message(filters.command(commands="purge", prefixes=TRIGGER) & filters.user(USERS))
async def purge_(bot, message: Message):
reply = message.reply_to_message
reply_id = reply.id if reply else None
sender_ = message.author_signature or message.from_user.first_name or ""
response = await bot.send_message(message.chat.id, "`trying to download...`")
curse_ = ""
caption = f"Shared by : {sender_}"
check_dl = "failed"
if "-d" in rw_message:
doc = True
else:
doc = False
for i in rw_message:
if i.startswith("https://www.instagram.com/"):
check_dl = await iyt_dl(url=i)
curse_ = "#FuckInstagram"
if check_dl == "failed":
check_dl = await json_dl(iurl=i, caption=caption, doc=doc)
elif "twitter.com" in i or "https://youtube.com/shorts" in i or "tiktok.com" in i:
check_dl = await iyt_dl(url=i)
elif "www.reddit.com" in i:
check_dl = await reddit_dl(url_=i, doc=doc, sender_=sender_)
curse_ = "Link doesn't contain any media or is restricted\nTip: Make sure you are sending original post url and not an embedded post."
else:
pass
if isinstance(check_dl, dict):
if isinstance(check_dl["media"], list):
for vv in check_dl["media"]:
if isinstance(vv, list):
await bot.send_media_group(message.chat.id, media=vv, reply_to_message_id=reply_id)
await asyncio.sleep(3)
else:
await bot.send_document(message.chat.id, document=vv, caption=check_dl["caption"] + caption, reply_to_message_id=reply_id, force_document=True)
else:
if doc:
await bot.send_document(message.chat.id, document=check_dl["media"], caption=check_dl["caption"] + caption, reply_to_message_id=reply_id, force_document=True)
else:
if not reply:
return await message.reply("reply to a message")
start_message = reply.id
end_message = message.id
messages = [end_message] + [i for i in range(int(start_message), int(end_message))]
await bot.delete_messages(chat_id=message.chat.id, message_ids=messages, revoke=True)
if os.environ.get("DEV_MODE") == "yes":
# Run shell commands
@bot.on_edited_message(filters.command(commands="sh", prefixes=TRIGGER) & filters.user(USERS))
@bot.on_message(filters.command(commands="sh", prefixes=TRIGGER) & filters.user(USERS))
async def run_cmd(bot, message: Message):
cmd = message.text.replace(f"{TRIGGER}sh ", "").strip()
status_ = await message.reply("executing...")
proc = await run_shell_cmd(cmd)
output = f"${cmd}"
if (stdout := proc.get("stdout")):
output += f"""\n\n**Output:**\n\n`{stdout}`"""
if (stderr := proc.get("stderr")):
output += f"""\n\n**Error:**\n\n`{stderr}`"""
await status_.edit(output,parse_mode=ParseMode.MARKDOWN)
# Run Python code
@bot.on_edited_message(
filters.command(commands="exec", prefixes=TRIGGER) & filters.user(USERS)
)
@bot.on_message(
filters.command(commands="exec", prefixes=TRIGGER) & filters.user(USERS)
)
async def executor_(bot, message):
code = message.text.replace(f"{TRIGGER}exec","").strip()
if not code:
return await message.reply("exec Jo mama?")
reply = await message.reply("executing")
sys.stdout = codeOut = StringIO()
sys.stderr = codeErr = StringIO()
# Indent code as per proper python syntax
formatted_code = "".join(["\n "+i for i in code.split("\n")])
try:
if check_dl["type"] == "img":
await bot.send_photo(message.chat.id, photo=check_dl["media"], caption=check_dl["caption"] + caption, reply_to_message_id=reply_id)
elif check_dl["type"] == "vid":
await bot.send_video(message.chat.id, video=check_dl["media"], caption=check_dl["caption"] + caption, thumb=check_dl["thumb"], reply_to_message_id=reply_id)
else:
await bot.send_animation(message.chat.id, animation=check_dl["media"], caption=check_dl["caption"] + caption, reply_to_message_id=reply_id, unsave=True)
except PhotoSaveFileInvalid:
await bot.send_document(message.chat.id, document=check_dl["media"], caption=check_dl["caption"] + caption, reply_to_message_id=reply_id)
except (MediaEmpty, WebpageCurlFailed, ValueError):
pass
if os.path.exists(str(check_dl["path"])):
shutil.rmtree(str(check_dl["path"]))
check_dl = "done"
if check_dl == "failed":
await response.edit(f"Media Download Failed.\n{curse_}")
if check_dl == "done":
await message.delete()
await response.delete()
# Create and initialise the function
exec(f"async def exec_(bot, message):{formatted_code}")
func_out = await locals().get("exec_")(bot, message)
except Exception:
func_out = str(traceback.format_exc())
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
output = codeOut.getvalue().strip() or codeErr.getvalue().strip() or func_out or ""
await reply.edit(f"> `{code}`\n\n>> `{output}`",parse_mode=ParseMode.MARKDOWN)
async def iyt_dl(url: str):
"""Stop handling post url because this only downloads Videos and post might contain images"""
if not url.startswith("https://www.instagram.com/reel/"):
# Add Auto-DL regex Handler
async def add_h():
message_id = os.environ.get("MESSAGE")
if message_id is None:
print("\nEnter Message id in config.\n")
return 1
try:
msg = (await bot.get_messages(int(LOG_CHAT), int(message_id))).text
except PeerIdInvalid:
print("\nLog channel not found.\nCheck the variable for mistakes")
return 1
if msg is None:
print("\nMessage not found\nCheck variable for mistakes\n")
return 1
try:
chats_list = [int(i) for i in msg.split()]
except ValueError:
print("\nThe message id is wrong. \nOr \nChat id message contains letters\nonly numerical ids are allowed.\n")
return 1
social_handler = bot.add_handler(
MessageHandler(
dl,
(
(filters.regex(r"^http*"))
& filters.chat(chats_list)
),
),
group=1,
)
globals().update({"HANDLER_":social_handler})
# Start the bot and wait idle without blocking the main loop
async def boot():
check_handlers = await add_h()
msg = "#Social-dl\nStarted\n"
if check_handlers == 1:
msg += "\n* Running in command only mode. *"
print(msg)
await bot.send_message(chat_id=int(LOG_CHAT), text="#Social-dl\n__Started__")
globals().update({"SESSION":aiohttp.ClientSession()})
globals().update({"RETRY_CLIENT":RetryClient(client_session=SESSION, retry_for_statuses={408, 504}, retry_options=ExponentialRetry(attempts=1))})
await idle()
# API Section
# Instagram
async def instagram_dl(url: str, caption: str, doc: bool = False):
args = locals()
# status = await instafix(message=message, link=i, caption=caption)
for i in [yt_dl, api_2]:
data = await i(**args)
if isinstance(data, dict):
break
return data
async def api_2(url: str, caption: str, doc: bool):
link = url.split("/?")[0] + E_JSON
response = await get_json(url=link)
if not response or "graphql" not in response:
return "failed"
path_ = time.time()
video = f"{path_}/v.mp4"
thumb = f"{path_}/i.png"
_opts = {"outtmpl": video, "ignoreerrors": True, "ignore_no_formats_error": True, "format": "bv[ext=mp4]+ba[ext=m4a]/b[ext=mp4]", "quiet": True, "logger": FakeLogger()}
return_val = "failed"
return await parse_ghraphql(
response["graphql"]["shortcode_media"], caption=caption + "\n.."
)
async def parse_ghraphql(json_: dict, caption: str, doc: bool = False):
try:
path = f"downloads/{time.time()}"
os.makedirs(path)
ret_dict = {"path": path, "thumb": None, "caption": caption}
type_check = json_.get("__typename",None)
if not type_check:
return "failed"
elif type_check == "GraphSidecar":
media = []
for i in json_["edge_sidecar_to_children"]["edges"]:
if i["node"]["__typename"] == "GraphImage":
media.append(i["node"]["display_url"])
if i["node"]["__typename"] == "GraphVideo":
media.append(i["node"]["video_url"])
ret_dict.update({"is_grouped": False if doc else True, "media": await async_download(urls=media, path=path, doc=doc, caption=caption)})
else:
media = json_.get("video_url") or json_.get("display_url")
ret_dict.update(**await get_media(url=media, path=path))
except Exception:
await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc()))
return ret_dict
# YT-DLP for videos from multiple sites
async def yt_dl(url: str, caption: str, doc:bool=False):
if "instagram.com/p/" in url:
return
path = str(time.time())
video = f"{path}/v.mp4"
_opts = {
"outtmpl": video,
"ignoreerrors": True,
"ignore_no_formats_error": True,
"quiet": True,
"logger": FakeLogger(),
}
if "shorts" in url:
_opts.update({"format": "bv[ext=mp4][res=480]+ba[ext=m4a]/b[ext=mp4]"})
else:
_opts.update({"format": "bv[ext=mp4]+ba[ext=m4a]/b[ext=mp4]"})
data = "failed"
try:
yt_dlp.YoutubeDL(_opts).download(url)
if os.path.isfile(video):
call(f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{thumb}"''', shell=True)
return_val = {"path": str(path_), "type": "vid", "media": video, "thumb": thumb if os.path.isfile(thumb) else None, "caption": ""}
data = {
"path": path,
"is_video": True,
"media": video,
"thumb": await take_ss(video=video, path=path),
"caption": caption,
}
except BaseException:
pass
return return_val
async def json_dl(iurl: str, doc: bool, caption: str):
link = iurl.split("/?")[0] + e_json
async with (aiohttp.ClientSession() as csession, csession.get(link, timeout=10) as session):
try:
session_resp = await session.text()
rjson = json.loads(session_resp)
except json.decoder.JSONDecodeError:
return "failed"
if "require_login" in rjson:
return "failed"
return_val = "failed"
if "graphql" in rjson:
try:
url = rjson["graphql"]["shortcode_media"]
d_dir = f"downloads/{time.time()}"
os.makedirs(d_dir)
if url["__typename"] == "GraphVideo":
url_ = url["video_url"]
wget_x = download(url_, d_dir)
call(f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{wget_x}" -vframes 1 "{d_dir}/i.png"''', shell=True)
return_val = {"path": d_dir, "type": "vid", "media": wget_x, "thumb": d_dir + "/i.png", "caption": ""}
if url["__typename"] == "GraphImage":
url_ = url["display_url"]
wget_x = download(url_, d_dir + "/i.jpg")
return_val = {"path": d_dir, "type": "img", "media": wget_x, "thumb": None, "caption": ""}
if url["__typename"] == "GraphSidecar":
url_list = []
for i in url["edge_sidecar_to_children"]["edges"]:
if i["node"]["__typename"] == "GraphImage":
url_list.append(i["node"]["display_url"])
if i["node"]["__typename"] == "GraphVideo":
url_list.append(i["node"]["video_url"])
downloads = await async_download(urls=url_list, path=d_dir, doc=doc, caption=caption + "\n..")
return_val = {"path": d_dir, "media": downloads}
except Exception:
await bot.send_message(chat_id=log_chat, text=str(traceback.format_exc()))
return return_val
async def reddit_dl(bot, message: Message):
link = url_.split("/?")[0] + ".json?limit=1"
headers = {"user-agent": "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_8_7 rv:5.0; en-US) AppleWebKit/533.31.5 (KHTML, like Gecko) Version/4.0 Safari/533.31.5"}
return_val = "failed"
try:
async with (aiohttp.ClientSession() as session, session.get(link, headers=headers) as ss):
response = await ss.json()
json_ = response[0]["data"]["children"][0]["data"]
caption = f'__{json_["subreddit_name_prefixed"]}:__\n**{json_["title"]}**\n\n'
d_dir = str(time.time())
os.mkdir(d_dir)
is_vid, is_gallery = json_.get("is_video"), json_.get("is_gallery")
if is_vid:
video = f"{d_dir}/v.mp4"
thumb = f"{d_dir}/i.png"
vid_url = json_["secure_media"]["reddit_video"]["hls_url"]
call(f'ffmpeg -hide_banner -loglevel error -i "{vid_url.strip()}" -c copy {video}', shell=True)
call(f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{thumb}"''', shell=True)
return_val = {"path": d_dir, "type": "vid", "media": video, "thumb": thumb, "caption": caption}
elif is_gallery:
grouped_media_urls = [f'https://i.redd.it/{i["media_id"]}.jpg' for i in json_["gallery_data"]["items"]]
downloads = await async_download(urls=grouped_media_urls, path=d_dir, doc=doc, caption=caption + f"Shared by : {sender_}")
return_val = {"path": d_dir, "media": downloads}
else:
media_ = json_get("url_overridden_by_dest","").strip()
if media_.endswith((".jpg", ".jpeg", ".png", ".webp")):
img = download(media_, d_dir)
return_val = {"path": d_dir, "type": "img", "media": img, "thumb": None, "caption": caption}
elif media_.endswith(".gif"):
gif = download(media_, d_dir)
return_val = {"path": d_dir, "type": "animation", "media": gif, "thumb": None, "caption": caption}
else:
gif_url = json_.get("preview", {}).get("reddit_video_preview", {}).get("fallback_url")
if gif_url:
gif = download(gif_url, d_dir)
return_val = {"path": d_dir, "type": "animation", "media": gif, "thumb": None, "caption": caption}
except Exception:
await bot.send_message(chat_id=log_chat, text=str(traceback.format_exc()))
return return_val
async def async_download(urls: list, path: str, doc: bool = False, caption: str = ""):
down_loads = await asyncio.gather(*[asyncio.to_thread(download, url, path) for url in urls])
if doc:
return down_loads
[os.rename(file, file + ".png") for file in glob.glob(f"{path}/*.webp")]
files = [ i + ".png" if i.endswith(".webp") else i for i in down_loads ]
grouped_images = [InputMediaPhoto(img, caption=caption) for img in files if img.endswith((".png", ".jpg", ".jpeg"))]
grouped_videos = [InputMediaVideo(vid, caption=caption) for vid in files if vid.endswith((".mp4", ".mkv", ".webm"))]
return_list = [grouped_images[imgs : imgs + 5] for imgs in range(0, len(grouped_images), 5)] + [
grouped_videos[vids : vids + 5] for vids in range(0, len(grouped_videos), 5)
]
return return_list
return data
# To disable YT-DLP logging
class FakeLogger(object):
def debug(self, msg):
pass
@ -308,50 +429,225 @@ class FakeLogger(object):
pass
async def add_h():
message_id = os.environ.get("MESSAGE")
if message_id is None:
print("Enter Message id in config.\n")
return 1
# Reddit
async def reddit_dl(url: str, caption: str, doc: bool = False):
link = url.split("/?")[0] + ".json?limit=1"
headers = {
"user-agent": "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_8_7 rv:5.0; en-US) AppleWebKit/533.31.5 (KHTML, like Gecko) Version/4.0 Safari/533.31.5"
}
try:
msg = (await bot.get_messages(int(log_chat), int(message_id))).text
except PeerIdInvalid:
print("Log channel not found.\nCheck the variable for mistakes")
return 1
chat_list.clear()
if msg is None:
print("Message not found\nCheck variable for mistakes\n")
return 1
response = await get_json(url=link, headers=headers, json_=True)
if not response:
return "failed"
json_ = response[0]["data"]["children"][0]["data"]
caption = f'__{json_["subreddit_name_prefixed"]}:__\n**{json_["title"]}**\n\n' + caption
path = str(time.time())
os.mkdir(path)
is_vid, is_gallery = json_.get("is_video"), json_.get("is_gallery")
data = {"path": path, "caption": caption}
if is_vid:
video = f"{path}/v.mp4"
vid_url = json_["secure_media"]["reddit_video"]["hls_url"]
await run_shell_cmd(f'ffmpeg -hide_banner -loglevel error -i "{vid_url.strip()}" -c copy {video}')
data.update({"is_video": True, "media": video, "thumb": await take_ss(video=video, path=path)})
elif is_gallery:
grouped_media_urls = [json_["media_metadata"][val]["s"]["u"].replace("preview", "i") for val in json_["media_metadata"]]
downloads = await async_download(urls=grouped_media_urls, path=path, doc=doc, caption=caption)
data.update({"is_grouped": True, "media": downloads})
else:
url_ = json_.get("preview", {}).get("reddit_video_preview", {}).get("fallback_url", "") or json_.get("url_overridden_by_dest", "").strip()
if not url_:
return "failed"
data.update(await get_media(url=url_, path=path))
except Exception:
await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc()))
return data
# Get Json response from APIs
async def get_json(url: str, headers: dict = None, params: dict = None, retry: bool = False, json_: bool = False, timeout: int = 10):
if retry:
client = RETRY_CLIENT
else:
client = SESSION
try:
chats_list = [int(i) for i in msg.split()]
except ValueError:
print("Chat id message contains letters\nonly numerical ids are allowed.\nOr the message id is wrong.\n")
return 1
chat_list.extend(chats_list)
social_handler = bot.add_handler(
MessageHandler(
dl,((
filters.regex(r"^https://www.instagram.com/*")
| filters.regex(r"^https://youtube.com/shorts/*")
| filters.regex(r"^https://twitter.com/*")
| filters.regex(r"^https://vm.tiktok.com/*")
| filters.regex(r"^https://www.reddit.com/*")
) & filters.chat(chat_list))),
group=1,
)
handler_.append(social_handler)
async with client.get(url=url, headers=headers, params=params, timeout=timeout) as ses:
if json_:
ret_json = await ses.json()
else:
ret_json = json.loads(await ses.text())
except (json.decoder.JSONDecodeError, aiohttp.ContentTypeError, asyncio.TimeoutError):
return
except Exception:
await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc()))
return
return ret_json
async def boot():
check_handlers = await add_h()
msg = "#Social-dl\nStarted\n"
if check_handlers == 1:
msg += "Running in command only mode."
print(msg)
await bot.send_message(chat_id=int(log_chat), text=msg)
await idle()
# Download media and return it with media type
async def get_media(url: str, path: str):
down_load = download(url, path)
ret_dict = {"media": down_load}
if down_load.lower().endswith((".jpg", ".jpeg", ".png", ".webp")):
ret_dict["is_image"] = True
if down_load.lower().endswith(".webp"):
os.rename(down_load, down_load + ".jpg")
ret_dict.update({"media": down_load + ".jpg"})
elif down_load.lower().endswith((".mkv", ".mp4", ".webm")):
ret_dict.update({"is_video": True, "thumb": await take_ss(video=down_load, path=path)})
elif down_load.lower().endswith(".gif"):
ret_dict.update({"is_animation": True})
else:
return {}
return ret_dict
# Download multiple media asynchronously to save time;
# Return it in a list or a list with smaller lists each containing upto 5 media.
async def async_download(urls: list, path: str, doc: bool = False, caption: str = ""):
down_loads = await asyncio.gather(*[asyncio.to_thread(download, url, path) for url in urls])
if doc:
return down_loads
[os.rename(file, file + ".png") for file in glob.glob(f"{path}/*.webp")]
files = [i + ".png" if i.endswith(".webp") else i for i in down_loads]
grouped_images, grouped_videos, animations = [], [], []
for file in files:
if file.endswith((".png", ".jpg", ".jpeg")):
grouped_images.append(InputMediaPhoto(file, caption=caption))
if file.endswith((".mp4", ".mkv", ".webm")):
has_audio = await check_audio(file)
if not has_audio:
animations.append(file)
else:
grouped_videos.append(InputMediaVideo(file, caption=caption))
return_list = [
grouped_images[imgs : imgs + 5] for imgs in range(0, len(grouped_images), 5)
] + [grouped_videos[vids : vids + 5] for vids in range(0, len(grouped_videos), 5)
] + animations
return return_list
# Thumbnail
async def take_ss(video: str, path: str):
await run_shell_cmd(f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{path}/i.png"''')
if os.path.isfile(path + "/i.png"):
return path + "/i.png"
async def check_audio(file):
result = await run_shell_cmd(f"ffprobe -v error -show_entries format=nb_streams -of default=noprint_wrappers=1:nokey=1 {file}")
return int(result.get("stdout", 0)) - 1
async def run_shell_cmd(cmd):
proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
return {"stdout": stdout.decode("utf-8"), "stderr": stderr.decode("utf-8")}
# Start only bot when file is called directly.
if __name__ == "__main__":
bot.start()
bot.run(boot())
# NOT FOR PUBLIC
#API_KEYS = {
# "abc": {
# "keys": [],
# "counter": 0,
# "exhausted": {},
# },
#}
#SWITCH = [0]
# Rotating Key function to avoid hitting limit on single Key
#async def get_key(func_tion):
# func = API_KEYS.get(func_tion, {})
# key = func.get("keys")
# count = func.get("counter")
# count += 1
# if count == len(key):
# count = 0
# ret_key = key[count]
# API_KEYS[func_tion]["counter"] = count
# return ret_key
# Tiktok
#async def tik_dl(url: str, doc: bool, caption:str):
# status = "failed"
# headers = {
# "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36"
# }
# url_ = f""
# response = await get_json(url_, headers=headers, json_=True)
# if not response or "status" in response and response["status"] == "failed":
# return "failed"
# if "video_data" in response:
# data = response["video_data"]["nwm_video_url_HQ"]
# status = {"path": "", "is_video": True, "media": data, "thumb": None, "caption": ""}
# if "image_data" in response:
# data = response["image_data"]["no_watermark_image_list"]
# path = f"downloads/{time.time()}"
# os.makedirs(path)
# downloads = await async_download(urls=data, path=path, doc=doc, caption=caption)
# status = {"path": path, "media": downloads, "caption": "", "is_grouped": True}
# return status
#async def multi_api(url: str, caption: str, doc: bool = False):
# apis = [
# {
# "url": "",
# "headers": {},
# "querystring": {},
# },
# ]
# switch_ = SWITCH[0] + 1
# if switch_ == len(apis):
# switch_ = 0
# SWITCH[0] = switch_
#
# api = apis[switch_]
# api["headers"]["API-Key"] = await get_key(f"multi_api{switch_}")
# response = await get_json(url=api.get("url"), headers=api.get("headers"), params=api.get("querystring"), json_=True)
# if not response or "message" in response:
# return "failed"
# data = response.get("data", {}).get("shortcode_media", {}) or response
# return await parse_ghraphql(json_=data, caption=caption + "\n" + "•" * switch_, doc=doc)
#async def api_1(url: str, caption: str, doc: bool = False):
# url = ""
# querystring = {"url": url}
# data = "failed"
# headers = {
# "API-Key": await get_key("api_1"),
# "API-Host": "",
# }
# response = await get_json(url=url, headers=headers, params=querystring)
# print(response)
# if not response or "message" in response or "messages" in response:
# return "failed"
# media = response["media"]
# path = f"downloads/{time.time()}"
# os.makedirs(path)
# if isinstance(media, list):
# downloads = await async_download(urls=media, path=path, doc=doc, caption=caption + "\n.")
# data = {"path": path, "media": downloads, "is_grouped": True}
# else:
# data = {
# "path": path,
# "caption": caption + "\n.",
# **await get_media(url=media.split("&filename")[0], path=path),
# }
# return data