social-dl/socialbot.py

358 lines
15 KiB
Python
Raw Normal View History

2023-02-03 15:55:02 +05:30
import asyncio
import base64
import glob
2023-02-03 15:55:02 +05:30
import json
import os
import shutil
import sys
import time
import traceback
from subprocess import call
import aiohttp
import yt_dlp
from dotenv import load_dotenv
from pyrogram import Client, filters, idle
from pyrogram.enums import ChatType
from pyrogram.errors import MediaEmpty, PeerIdInvalid, PhotoSaveFileInvalid, WebpageCurlFailed
2023-02-03 15:55:02 +05:30
from pyrogram.handlers import MessageHandler
from pyrogram.types import InputMediaPhoto, InputMediaVideo, Message
from wget import download
if os.path.isfile("config.env"):
load_dotenv("config.env")
bot = Client(name="bot", session_string=os.environ.get("STRING_SESSION"), api_id=os.environ.get("API_ID"), api_hash=os.environ.get("API_HASH"))
2023-02-03 15:55:02 +05:30
log_chat = os.environ.get("LOG")
if log_chat is None:
2023-02-03 15:55:02 +05:30
print("Enter log channel id in config")
exit()
chat_list = []
handler_ = []
users = json.loads(os.environ.get("USERS"))
trigger = os.environ.get("TRIGGER")
e_json = base64.b64decode("Lz9fX2E9MSZfX2Q9MQ==").decode("utf-8")
@bot.on_message(filters.command(commands="bot", prefixes=trigger) & filters.user(users))
async def multi_func(bot, message: Message):
rw_message = message.text.split()
try:
if "restart" in rw_message:
"""Restart bot"""
os.execl(sys.executable, sys.executable, __file__)
elif "ids" in rw_message:
"""Get chat / channel id"""
ids = ""
reply = message.reply_to_message
if reply:
reply_forward = reply.forward_from_chat
reply_user = reply.from_user
ids += f"Chat : `{reply.chat.id}`\n"
if reply_forward:
ids += f"Replied {'Channel' if reply_forward.type == ChatType.CHANNEL else 'Chat'} : `{reply_forward.id}`\n"
if reply_user:
ids += f"User : {reply.from_user.id}"
else:
ids = f"Chat :`{message.chat.id}`"
await message.reply(ids)
elif "update" in rw_message:
"""Update Auto-DL chats"""
for i in handler_:
bot.remove_handler(*i)
await add_h()
await message.reply("Chat list refreshed")
elif "join" in rw_message:
"""Join a Chat"""
if len(rw_message) > 2:
try:
await bot.join_chat(rw_message[-1])
except KeyError:
await bot.join_chat(rw_message[-1].split()[-1])
except Exception as e:
return await message.reply(str(e))
await message.reply("Joined")
elif "leave" in rw_message:
"""Leave a Chat"""
if len(rw_message) == 3:
chat = rw_message[-1]
else:
chat = message.chat.id
await bot.leave_chat(chat)
else:
await message.reply("Social-DL is running.")
except Exception:
await bot.send_message(chat_id=log_chat, text=str(traceback.format_exc()))
@bot.on_message(filters.command(commands="term", prefixes=trigger) & filters.user(users))
2023-02-03 15:55:02 +05:30
async def run_cmd(bot, message: Message):
"""Function to run shell commands"""
cmd = message.text.replace("+term", "")
status_ = await message.reply("executing...")
process = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
2023-02-03 15:55:02 +05:30
stdout, stderr = await process.communicate()
if process.returncode is not None:
output = f"${cmd}"
if stdout:
output += f"\n\n**Output:**\n\n`{stdout.decode('utf-8')}`"
if stderr:
output += f"\n\n**Error:**\n\n`{stderr.decode('utf-8')}`"
await status_.edit(output)
@bot.on_message(filters.command(commands="del", prefixes=trigger) & filters.user(users))
async def delete_message(bot, message: Message):
"""Delete Messages"""
reply = message.reply_to_message
await message.delete()
if reply:
await reply.delete()
@bot.on_message(filters.command(commands="dl", prefixes=trigger) & filters.user(users))
async def dl(bot, message: Message):
"""The main Logic Function to download media"""
2023-02-03 15:55:02 +05:30
rw_message = message.text.split()
reply = message.reply_to_message
reply_id = reply.id if reply else None
sender_ = message.author_signature or message.from_user.first_name or ""
response = await bot.send_message(message.chat.id, "`trying to download...`")
2023-02-03 15:55:02 +05:30
curse_ = ""
caption = f"Shared by : {sender_}"
2023-02-03 15:55:02 +05:30
check_dl = "failed"
if "-d" in rw_message:
doc = True
else:
doc = False
for i in rw_message:
if i.startswith("https://www.instagram.com/"):
check_dl = await iyt_dl(url=i)
curse_ = "#FuckInstagram"
if check_dl == "failed":
check_dl = await json_dl(iurl=i, caption=caption, doc=doc)
elif "twitter.com" in i or "https://youtube.com/shorts" in i or "tiktok.com" in i:
2023-02-03 15:55:02 +05:30
check_dl = await iyt_dl(url=i)
elif "www.reddit.com" in i:
check_dl = await reddit_dl(url_=i, doc=doc, sender_=sender_)
curse_ = "Link doesn't contain any media or is restricted\nTip: Make sure you are sending original post url and not an embedded post."
else:
pass
2023-02-03 15:55:02 +05:30
if isinstance(check_dl, dict):
if isinstance(check_dl["media"], list):
for vv in check_dl["media"]:
2023-02-03 15:55:02 +05:30
if isinstance(vv, list):
await bot.send_media_group(message.chat.id, media=vv, reply_to_message_id=reply_id)
2023-02-03 15:55:02 +05:30
await asyncio.sleep(3)
else:
await bot.send_document(message.chat.id, document=vv, caption=check_dl["caption"] + caption, reply_to_message_id=reply_id, force_document=True)
else:
2023-02-03 15:55:02 +05:30
if doc:
await bot.send_document(message.chat.id, document=check_dl["media"], caption=check_dl["caption"] + caption, reply_to_message_id=reply_id, force_document=True)
2023-02-03 15:55:02 +05:30
else:
try:
if check_dl["type"] == "img":
await bot.send_photo(message.chat.id, photo=check_dl["media"], caption=check_dl["caption"] + caption, reply_to_message_id=reply_id)
elif check_dl["type"] == "vid":
await bot.send_video(message.chat.id, video=check_dl["media"], caption=check_dl["caption"] + caption, thumb=check_dl["thumb"], reply_to_message_id=reply_id)
else:
await bot.send_animation(message.chat.id, animation=check_dl["media"], caption=check_dl["caption"] + caption, reply_to_message_id=reply_id, unsave=True)
except PhotoSaveFileInvalid:
await bot.send_document(message.chat.id, document=check_dl["media"], caption=check_dl["caption"] + caption, reply_to_message_id=reply_id)
except (MediaEmpty, WebpageCurlFailed, ValueError):
pass
2023-02-03 15:55:02 +05:30
if os.path.exists(str(check_dl["path"])):
shutil.rmtree(str(check_dl["path"]))
check_dl = "done"
if check_dl == "failed":
await response.edit(f"Media Download Failed.\n{curse_}")
if check_dl == "done":
await message.delete()
await response.delete()
async def iyt_dl(url: str):
2023-02-04 19:56:39 +05:30
"""Stop handling post url because this only downloads Videos and post might contain images"""
if not url.startswith("https://www.instagram.com/reel/"):
2023-02-03 15:55:02 +05:30
return "failed"
path_ = time.time()
video = f"{path_}/v.mp4"
thumb = f"{path_}/i.png"
_opts = {"outtmpl": video, "ignoreerrors": True, "ignore_no_formats_error": True, "format": "bv[ext=mp4]+ba[ext=m4a]/b[ext=mp4]", "quiet": True, "logger": FakeLogger()}
2023-02-03 15:55:02 +05:30
return_val = "failed"
try:
yt_dlp.YoutubeDL(_opts).download(url)
if os.path.isfile(video):
call(f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{thumb}"''', shell=True)
return_val = {"path": str(path_), "type": "vid", "media": video, "thumb": thumb if os.path.isfile(thumb) else None, "caption": ""}
2023-02-03 15:55:02 +05:30
except BaseException:
pass
return return_val
async def json_dl(iurl: str, doc: bool, caption: str):
link = iurl.split("/?")[0] + e_json
2023-02-04 19:56:39 +05:30
async with (aiohttp.ClientSession() as csession, csession.get(link, timeout=10) as session):
2023-02-03 15:55:02 +05:30
try:
2023-02-04 19:56:39 +05:30
session_resp = await session.text()
rjson = json.loads(session_resp)
except json.decoder.JSONDecodeError:
2023-02-03 15:55:02 +05:30
return "failed"
if "require_login" in rjson:
2023-02-03 16:01:49 +05:30
return "failed"
2023-02-03 15:55:02 +05:30
return_val = "failed"
if "graphql" in rjson:
try:
url = rjson["graphql"]["shortcode_media"]
d_dir = f"downloads/{time.time()}"
os.makedirs(d_dir)
if url["__typename"] == "GraphVideo":
url_ = url["video_url"]
wget_x = download(url_, d_dir)
call(f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{wget_x}" -vframes 1 "{d_dir}/i.png"''', shell=True)
return_val = {"path": d_dir, "type": "vid", "media": wget_x, "thumb": d_dir + "/i.png", "caption": ""}
2023-02-03 15:55:02 +05:30
if url["__typename"] == "GraphImage":
url_ = url["display_url"]
wget_x = download(url_, d_dir + "/i.jpg")
return_val = {"path": d_dir, "type": "img", "media": wget_x, "thumb": None, "caption": ""}
2023-02-03 15:55:02 +05:30
if url["__typename"] == "GraphSidecar":
url_list = []
2023-02-03 15:55:02 +05:30
for i in url["edge_sidecar_to_children"]["edges"]:
if i["node"]["__typename"] == "GraphImage":
url_list.append(i["node"]["display_url"])
2023-02-03 15:55:02 +05:30
if i["node"]["__typename"] == "GraphVideo":
url_list.append(i["node"]["video_url"])
downloads = await async_download(urls=url_list, path=d_dir, doc=doc, caption=caption + "\n..")
return_val = {"path": d_dir, "media": downloads}
2023-02-03 15:55:02 +05:30
except Exception:
await bot.send_message(chat_id=log_chat, text=str(traceback.format_exc()))
return return_val
async def reddit_dl(bot, message: Message):
link = url_.split("/?")[0] + ".json?limit=1"
headers = {"user-agent": "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_8_7 rv:5.0; en-US) AppleWebKit/533.31.5 (KHTML, like Gecko) Version/4.0 Safari/533.31.5"}
return_val = "failed"
try:
async with (aiohttp.ClientSession() as session, session.get(link, headers=headers) as ss):
response = await ss.json()
json_ = response[0]["data"]["children"][0]["data"]
caption = f'__{json_["subreddit_name_prefixed"]}:__\n**{json_["title"]}**\n\n'
d_dir = str(time.time())
os.mkdir(d_dir)
is_vid, is_gallery = json_.get("is_video"), json_.get("is_gallery")
if is_vid:
video = f"{d_dir}/v.mp4"
thumb = f"{d_dir}/i.png"
vid_url = json_["secure_media"]["reddit_video"]["hls_url"]
call(f'ffmpeg -hide_banner -loglevel error -i "{vid_url.strip()}" -c copy {video}', shell=True)
call(f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{thumb}"''', shell=True)
return_val = {"path": d_dir, "type": "vid", "media": video, "thumb": thumb, "caption": caption}
elif is_gallery:
grouped_media_urls = [f'https://i.redd.it/{i["media_id"]}.jpg' for i in json_["gallery_data"]["items"]]
downloads = await async_download(urls=grouped_media_urls, path=d_dir, doc=doc, caption=caption + f"Shared by : {sender_}")
return_val = {"path": d_dir, "media": downloads}
else:
2023-02-20 22:59:01 +05:30
media_ = json_get("url_overridden_by_dest","").strip()
if media_.endswith((".jpg", ".jpeg", ".png", ".webp")):
img = download(media_, d_dir)
return_val = {"path": d_dir, "type": "img", "media": img, "thumb": None, "caption": caption}
elif media_.endswith(".gif"):
gif = download(media_, d_dir)
return_val = {"path": d_dir, "type": "animation", "media": gif, "thumb": None, "caption": caption}
else:
gif_url = json_.get("preview", {}).get("reddit_video_preview", {}).get("fallback_url")
if gif_url:
gif = download(gif_url, d_dir)
return_val = {"path": d_dir, "type": "animation", "media": gif, "thumb": None, "caption": caption}
except Exception:
await bot.send_message(chat_id=log_chat, text=str(traceback.format_exc()))
return return_val
async def async_download(urls: list, path: str, doc: bool = False, caption: str = ""):
down_loads = await asyncio.gather(*[asyncio.to_thread(download, url, path) for url in urls])
if doc:
return down_loads
[os.rename(file, file + ".png") for file in glob.glob(f"{path}/*.webp")]
files = glob.glob(f"{path}/*")
grouped_images = [InputMediaPhoto(img, caption=caption) for img in files if img.endswith((".png", ".jpg", ".jpeg"))]
grouped_videos = [InputMediaVideo(vid, caption=caption) for vid in files if vid.endswith((".mp4", ".mkv", ".webm"))]
return_list = [grouped_images[imgs : imgs + 5] for imgs in range(0, len(grouped_images), 5)] + [
grouped_videos[vids : vids + 5] for vids in range(0, len(grouped_videos), 5)
]
return return_list
2023-02-03 15:55:02 +05:30
class FakeLogger(object):
def debug(self, msg):
pass
def warning(self, msg):
pass
def error(self, msg):
pass
async def add_h():
message_id = os.environ.get("MESSAGE")
if message_id is None:
2023-02-03 15:55:02 +05:30
print("Enter Message id in config.\n")
return 1
try:
msg = (await bot.get_messages(int(log_chat), int(message_id))).text
except PeerIdInvalid:
print("Log channel not found.\nCheck the variable for mistakes")
return 1
chat_list.clear()
if msg is None:
2023-02-03 15:55:02 +05:30
print("Message not found\nCheck variable for mistakes\n")
return 1
try:
chats_list = [int(i) for i in msg.split()]
except ValueError:
print("Chat id message contains letters\nonly numerical ids are allowed.\nOr the message id is wrong.\n")
return 1
chat_list.extend(chats_list)
social_handler = bot.add_handler(
MessageHandler(
dl,((
2023-02-03 15:55:02 +05:30
filters.regex(r"^https://www.instagram.com/*")
| filters.regex(r"^https://youtube.com/shorts/*")
| filters.regex(r"^https://twitter.com/*")
| filters.regex(r"^https://vm.tiktok.com/*")
| filters.regex(r"^https://www.reddit.com/*")
) & filters.chat(chat_list))),
2023-02-03 15:55:02 +05:30
group=1,
)
handler_.append(social_handler)
2023-02-03 15:55:02 +05:30
async def boot():
check_handlers = await add_h()
msg = "#Social-dl\nStarted\n"
if check_handlers == 1:
msg += "Running in command only mode."
print(msg)
await bot.send_message(chat_id=int(log_chat), text=msg)
await idle()
if __name__ == "__main__":
bot.start()
bot.run(boot())