@ -5,3 +5,4 @@ python-dotenv==0.21.0

View File

@ -71,7 +71,123 @@ USERS = json.loads(os.environ.get("USERS"))
TRIGGER = os.environ.get("TRIGGER")
E_JSON = base64.b64decode("Lz9fX2E9MSZfX2Q9MQ==").decode("utf-8")
# Download Section
# Gallery-dl for all types of media
async def gallery_dl(url, caption,doc):
download_dir = f"downloads/{time.time()}"
await run_shell_cmd(f"gallery-dl -q -D {download_dir} '{url}'")
files = glob.glob(f"{download_dir}/*")
if not files:
return "failed"
ret_dict = {"path": download_dir, "caption": caption}
if doc:
# If more than 1 file return grouped media
elif len(files) > 1:
grouped_images, grouped_videos, animations = [], [], []
for file in files:
if file.endswith((".png", ".jpg", ".jpeg",".webp")):
if file.endswith(".webp"):
file = file+ ".png"
grouped_images.append(InputMediaPhoto(file, caption=caption))
elif file.endswith((".mp4", ".mkv", ".webm")):
has_audio = await check_audio(file)
if not has_audio:
grouped_videos.append(InputMediaVideo(file, caption=caption))
# Limit 5 posts per list to avoid TG's flood system
group_list = [
grouped_images[imgs : imgs + 5] for imgs in range(0, len(grouped_images), 5)
] + [grouped_videos[vids : vids + 5] for vids in range(0, len(grouped_videos), 5)
] + animations
# Final output of group_list = [ [1,2,3,4,5], [6,7,8,9,10] ]
ret_dict.update({"is_grouped": True,"media": group_list})
# Otherwise return single after renaming to proper extension coz TG converts webp to sticker
file = files[0]
ret_dict['media'] = file
if file.lower().endswith((".jpg", ".jpeg", ".png", ".webp")):
ret_dict["is_image"] = True
if file.lower().endswith(".webp"):
os.rename(file, file + ".jpg")
ret_dict.update({"media": down_load + ".jpg"})
elif file.lower().endswith((".mkv", ".mp4", ".webm")):
ret_dict.update({"is_video": True, "thumb": await take_ss(video=file, path=download_dir)})
elif file.lower().endswith(".gif"):
ret_dict.update({"is_animation": True})
return ret_dict
# YT-DLP for videos from multiple sites
async def yt_dl(url: str, caption: str, doc:bool=False):
path = "downloads/" + str(time.time())
video = f"{path}/v.mp4"
_opts = {
"outtmpl": video,
"ignoreerrors": True,
"ignore_no_formats_error": True,
"quiet": True,
"logger": FakeLogger(),
if "shorts" in url:
_opts.update({"format": "bv[ext=mp4][res=480]+ba[ext=m4a]/b[ext=mp4]"})
_opts.update({"format": "bv[ext=mp4]+ba[ext=m4a]/b[ext=mp4]"})
data = "failed"
if os.path.isfile(video):
data = {
"path": path,
"is_video": True,
"media": video,
"thumb": await take_ss(video=video, path=path),
"caption": caption,
except BaseException:
return data
# To disable YT-DLP logging
class FakeLogger(object):
def debug(self, msg):
def warning(self, msg):
def error(self, msg):
# Thumbnail
async def take_ss(video: str, path: str):
await run_shell_cmd(f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{path}/i.png"''')
if os.path.isfile(path + "/i.png"):
return path + "/i.png"
# Check if it's a video or gif
async def check_audio(file):
result = await run_shell_cmd(f"ffprobe -v error -show_entries format=nb_streams -of default=noprint_wrappers=1:nokey=1 {file}")
return int(result.get("stdout", 0)) - 1
async def run_shell_cmd(cmd):
proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
return {"stdout": stdout.decode("utf-8"), "stderr": stderr.decode("utf-8")}
# BOT Section
@ -109,10 +225,10 @@ class MESSAGE_PARSER:
def match_links(self):
url_map = {
"": yt_dl,
"": instagram_dl,
"": gallery_dl,
"": yt_dl,
"": yt_dl,
"": reddit_dl,
"": gallery_dl,
"": gallery_dl,
for link in self.text_list:
if (match := url_map.get(url_p(link).netloc)):
@ -310,15 +426,7 @@ async def add_h():
print("\nThe message id is wrong. \nOr \nChat id message contains letters\nonly numerical ids are allowed.\n")
return 1
social_handler = bot.add_handler(
@ -335,319 +443,7 @@ async def boot():
await idle()
# API Section
# Instagram
async def instagram_dl(url: str, caption: str, doc: bool = False):
args = locals()
# status = await instafix(message=message, link=i, caption=caption)
for i in [yt_dl, api_2]:
data = await i(**args)
if isinstance(data, dict):
return data
async def api_2(url: str, caption: str, doc: bool):
link = url.split("/?")[0] + E_JSON
response = await get_json(url=link)
if not response or "graphql" not in response:
return "failed"
return await parse_ghraphql(
response["graphql"]["shortcode_media"], caption=caption + "\n.."
async def parse_ghraphql(json_: dict, caption: str, doc: bool = False):
path = f"downloads/{time.time()}"
ret_dict = {"path": path, "thumb": None, "caption": caption}
type_check = json_.get("__typename",None)
if not type_check:
return "failed"
elif type_check == "GraphSidecar":
media = []
for i in json_["edge_sidecar_to_children"]["edges"]:
if i["node"]["__typename"] == "GraphImage":
if i["node"]["__typename"] == "GraphVideo":
ret_dict.update({"is_grouped": False if doc else True, "media": await async_download(urls=media, path=path, doc=doc, caption=caption)})
media = json_.get("video_url") or json_.get("display_url")
ret_dict.update(**await get_media(url=media, path=path))
except Exception:
await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc()))
return ret_dict
# Reddit
async def reddit_dl(url: str, caption: str, doc: bool = False):
link = url.split("/?")[0] + ".json?limit=1"
headers = {
"user-agent": "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_8_7 rv:5.0; en-US) AppleWebKit/533.31.5 (KHTML, like Gecko) Version/4.0 Safari/533.31.5"
response = await get_json(url=link, headers=headers, json_=True)
if not response:
return "failed"
json_ = response[0]["data"]["children"][0]["data"]
caption = f'__{json_["subreddit_name_prefixed"]}:__\n**{json_["title"]}**\n\n' + caption
path = str(time.time())
is_vid, is_gallery = json_.get("is_video"), json_.get("is_gallery")
data = {"path": path, "caption": caption}
if is_vid:
video = f"{path}/v.mp4"
vid_url = json_["secure_media"]["reddit_video"]["hls_url"]
await run_shell_cmd(f'ffmpeg -hide_banner -loglevel error -i "{vid_url.strip()}" -c copy {video}')
data.update({"is_video": True, "media": video, "thumb": await take_ss(video=video, path=path)})
elif is_gallery:
grouped_media_urls = [json_["media_metadata"][val]["s"]["u"].replace("preview", "i") for val in json_["media_metadata"]]
downloads = await async_download(urls=grouped_media_urls, path=path, doc=doc, caption=caption)
data.update({"is_grouped": True, "media": downloads})
url_ = json_.get("preview", {}).get("reddit_video_preview", {}).get("fallback_url", "") or json_.get("url_overridden_by_dest", "").strip()
if not url_:
return "failed"
data.update(await get_media(url=url_, path=path))
except Exception:
await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc()))
return data
# Get Json response from APIs
async def get_json(url: str, headers: dict = None, params: dict = None, retry: bool = False, json_: bool = False, timeout: int = 10):
if retry:
client = SESSION
async with client.get(url=url, headers=headers, params=params, timeout=timeout) as ses:
if json_:
ret_json = await ses.json()
ret_json = json.loads(await ses.text())
except (json.decoder.JSONDecodeError, aiohttp.ContentTypeError, asyncio.TimeoutError):
except Exception:
await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc()))
return ret_json
# Download media and return it with media type
async def get_media(url: str, path: str):
down_load = download(url, path)
ret_dict = {"media": down_load}
if down_load.lower().endswith((".jpg", ".jpeg", ".png", ".webp")):
ret_dict["is_image"] = True
if down_load.lower().endswith(".webp"):
os.rename(down_load, down_load + ".jpg")
ret_dict.update({"media": down_load + ".jpg"})
elif down_load.lower().endswith((".mkv", ".mp4", ".webm")):
ret_dict.update({"is_video": True, "thumb": await take_ss(video=down_load, path=path)})
elif down_load.lower().endswith(".gif"):
ret_dict.update({"is_animation": True})
return {}
return ret_dict
# Download multiple media asynchronously to save time;
# Return it in a list or a list with smaller lists each containing upto 5 media.
async def async_download(urls: list, path: str, doc: bool = False, caption: str = ""):
down_loads = await asyncio.gather(*[asyncio.to_thread(download, url, path) for url in urls])
if doc:
return down_loads
[os.rename(file, file + ".png") for file in glob.glob(f"{path}/*.webp")]
files = [i + ".png" if i.endswith(".webp") else i for i in down_loads]
grouped_images, grouped_videos, animations = [], [], []
for file in files:
if file.endswith((".png", ".jpg", ".jpeg")):
grouped_images.append(InputMediaPhoto(file, caption=caption))
if file.endswith((".mp4", ".mkv", ".webm")):
has_audio = await check_audio(file)
if not has_audio:
grouped_videos.append(InputMediaVideo(file, caption=caption))
return_list = [
grouped_images[imgs : imgs + 5] for imgs in range(0, len(grouped_images), 5)
] + [grouped_videos[vids : vids + 5] for vids in range(0, len(grouped_videos), 5)
] + animations
return return_list
# Start only bot when file is called directly.
if __name__ == "__main__":
