Rework and Feature update.

This commit is contained in:
anonymousx97 2023-02-03 15:55:02 +05:30
parent 96d94ffb21
commit e97fe64327
5 changed files with 507 additions and 167 deletions

View File

@ -15,11 +15,12 @@
* Create a private channel on TG.
* Send a list of Chat/Channel ids starting with -100 in your log channel like below.
Edit this message and add chats you want to add in future.
<p align="right"><img src="https://telegra.ph/file/394daa80fd53c895cbe6e.jpg"</p>
* Bot will automatically download links in those chats/channels.
* Now copy that message's link and you will get something like
https://t.me/c/123456789/1
* So your value would be -100123456789/1
* So your values would be LOG=-100123456789, MESSAGE=1
</details>
* User : Your user id to control bot.
@ -60,8 +61,12 @@
* Send Instagram link in any authorised chat/channel, bot will try to download and send the media.
* Owner only commands:
* `.dl link` to download and send media in any chat.
* `.update` to refresh chat list without restarting bot.
* `.restart` to restart bot.
* `.rdl` to download media from reddit.
* `.bot update` to refresh chat list without restarting bot.
* `.bot restart` to restart bot.
* `.bot ids` to get chat / channel / user IDs.
* `.bot join or leave` to join / leave chat using ID.
* `.del` to delete message.
* These commands can be used anywhere and are not limited to authorised chats.
# Known limitations:

View File

@ -1,159 +0,0 @@
import os
import time
from dotenv import load_dotenv
from pyrogram import Client, filters, idle
from pyrogram.handlers import MessageHandler
from pyrogram.types import Message
from pyrogram.errors import PeerIdInvalid
import logging
logging.disable(level="NOTSET")
if os.path.isfile("config.env"):
load_dotenv("config.env")
bot = Client(
name="bot",
session_string=os.environ.get("STRING_SESSION"),
api_id=int(os.environ.get("API_ID")),
api_hash=os.environ.get("API_HASH"),
)
user_= int(os.environ.get("USER"))
chat_var = os.environ.get("MESSAGE_LINK").split("/")
chat_list = []
trigger=os.environ.get("TRIGGER")
handler_ = None
async def add_h():
try:
msg = (await bot.get_messages(int(chat_var[0]), int(chat_var[1]))).text
except PeerIdInvalid:
print("WARNING:\n Make sure you have entered the\n message link variable correctly\n Check readme.md for details\n\nCurrently running on command mode.\n")
return
chat_list.clear()
chat_list.extend([int(i) for i in msg.split()])
global handler_
handler_ = bot.add_handler(
MessageHandler(
dl, filters.regex("^https://www.instagram.com/*") & filters.chat(chat_list)
),
group=1,
)
async def update_h(bot, message: Message):
bot.remove_handler(*handler_)
await add_h()
await message.reply("Chat list refreshed")
def boot():
bot.start()
bot.run(add_h())
bot.add_handler(
MessageHandler(dl, filters.command(commands="dl", prefixes=trigger) & filters.user([user_])), group=0
)
bot.add_handler(
MessageHandler(
update_h, filters.command(commands="update", prefixes=trigger) & filters.user([user_])
),
group=2,
)
try:
bot.send_message(chat_id=chat_var[0], text="#Instadl\n**Started**")
except PeerIdInvalid:
pass
print("STARTED")
idle()
def dl(bot, message: Message):
msg_ = message.reply("`trying to download...`")
import shutil
m = message.text.split()
caption = "Shared by : "
if message.sender_chat:
caption += message.author_signature
else:
caption += (
bot.get_users(message.from_user.username or message.from_user.id)
).first_name
del_link = True
for i in m:
if i.startswith("http"):
try:
link = i.split("/?")[0] + "/?__a=1&__d=dis"
import requests
from pyrogram.errors import FloodWait, MediaEmpty, WebpageCurlFailed
#import fake_headers
#h = fake_headers.Headers(browser="chrome", os="android", headers=True).generate()
session = requests.Session()
rurl_ = session.get(link)
url = rurl_.json()["graphql"]["shortcode_media"]
if url["__typename"] == "GraphVideo":
url_=url["video_url"]
message.reply_video(url_, caption=caption)
if url["__typename"] == "GraphImage":
url_=url["display_url"]
message.reply_photo(url_, caption=caption)
if url["__typename"] == "GraphSidecar":
msg_.edit("Multiple Media found.\nTrying to send all......")
for i in url["edge_sidecar_to_children"]["edges"]:
time.sleep(1)
if i["node"]["__typename"] == "GraphImage":
url_=i["node"]["display_url"]
message.reply_photo(
url_, caption=caption
)
if i["node"]["__typename"] == "GraphVideo":
url_=i["node"]["video_url"]
message.reply_video(url_, caption=caption)
except FloodWait as f:
time.sleep(f.value + 2)
except (MediaEmpty, WebpageCurlFailed):
from wget import download
x = download(url_)
message.reply_video(x, caption=caption)
if os.path.isfile(x):
shutil.rmtree(x)
except (ValueError, TypeError, KeyError):
try:
time_ = time.time()
import yt_dlp
v = f"{time_}/v.mp4"
_opts = {
"outtmpl": v,
"logger": logger,
"ignoreerrors": True,
"quite":True,
"format": "bv[ext=mp4]+ba[ext=m4a]/b[ext=mp4]",
}
x = yt_dlp.YoutubeDL(_opts).download(i)
message.reply_video(v, caption=caption)
shutil.rmtree(str(time_))
except Exception as e:
msg_.edit(f"link not supported or private.")
if not str(e).startswith("Failed to decode") and len(chat_list)!=0:
bot.send_message(chat_id=chat_var[0], text=str(e))
del_link = False
if del_link:
msg_.delete()
if del_link or message.from_user.id == 1503856346:
message.delete()
@bot.on_message(filters.command(commands="restart", prefixes=trigger) & filters.user([user_]))
async def rest(bot, message):
import sys
os.execl(sys.executable, sys.executable, "instabot.py")
sys.exit()
if __name__ == "__main__":
boot()

View File

@ -2,5 +2,5 @@ Pyrogram==2.0.57
python-dotenv==0.21.0
TgCrypto==1.2.3
yt-dlp
requests
aiohttp
wget

View File

@ -4,8 +4,10 @@ API_HASH="abcd1238fn...."
STRING_SESSION="Ab0fbs......."
MESSAGE_LINK=-10012345677/123
LOG=-10012345678
TRIGGER=.
MESSAGE=12345
USER=
TRIGGER="."
USERS=[12345678] # Multiple user IDs should be separated by ,

492
socialbot.py Normal file
View File

@ -0,0 +1,492 @@
import asyncio
import base64
import json
import os
import shutil
import sys
import time
import traceback
from subprocess import call
import aiohttp
import yt_dlp
from dotenv import load_dotenv
from pyrogram import Client, filters, idle
from pyrogram.enums import ChatType
from pyrogram.errors import MediaEmpty, PhotoSaveFileInvalid, WebpageCurlFailed, PeerIdInvalid
from pyrogram.handlers import MessageHandler
from pyrogram.types import InputMediaPhoto, InputMediaVideo, Message
from wget import download
if os.path.isfile("config.env"):
load_dotenv("config.env")
bot = Client(
name="bot",
session_string=os.environ.get("STRING_SESSION"),
api_id=os.environ.get("API_ID"),
api_hash=os.environ.get("API_HASH"),
)
log_chat = os.environ.get("LOG")
if log_chat == None:
print("Enter log channel id in config")
exit()
chat_list = []
handler_ = []
users = json.loads(os.environ.get("USERS"))
trigger = os.environ.get("TRIGGER")
e_json = base64.b64decode("Lz9fX2E9MSZfX2Q9MQ==").decode("utf-8")
@bot.on_message(filters.command(commands="bot", prefixes=trigger) & filters.user(users))
async def multi_func(bot, message: Message):
rw_message = message.text.split()
try:
if "restart" in rw_message:
"""Restart bot"""
os.execl(sys.executable, sys.executable, __file__)
elif "ids" in rw_message:
"""Get chat / channel id"""
ids = ""
reply = message.reply_to_message
if reply:
reply_forward = reply.forward_from_chat
reply_user = reply.from_user
ids += f"Chat : `{reply.chat.id}`\n"
if reply_forward:
ids += f"Replied {'Channel' if reply_forward.type == ChatType.CHANNEL else 'Chat'} : `{reply_forward.id}`\n"
if reply_user:
ids += f"User : {reply.from_user.id}"
else:
ids = f"Chat :`{message.chat.id}`"
await message.reply(ids)
elif "update" in rw_message:
"""Update Auto-DL chats"""
for i in handler_:
bot.remove_handler(*i)
await add_h()
await message.reply("Chat list refreshed")
elif "join" in rw_message:
"""Join a Chat"""
if len(rw_message) > 2:
try:
await bot.join_chat(rw_message[-1])
except KeyError:
await bot.join_chat(rw_message[-1].split()[-1])
except Exception as e:
return await message.reply(str(e))
await message.reply("Joined")
elif "leave" in rw_message:
"""Leave a Chat"""
if len(rw_message) == 3:
chat = rw_message[-1]
else:
chat = message.chat.id
await bot.leave_chat(chat)
else:
await message.reply("Social-DL is running.")
except Exception:
await bot.send_message(chat_id=log_chat, text=str(traceback.format_exc()))
@bot.on_message(
filters.command(commands="term", prefixes=trigger) & filters.user(users)
)
async def run_cmd(bot, message: Message):
"""Function to run shell commands"""
cmd = message.text.replace("+term", "")
status_ = await message.reply("executing...")
process = await asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode is not None:
output = f"${cmd}"
if stdout:
output += f"\n\n**Output:**\n\n`{stdout.decode('utf-8')}`"
if stderr:
output += f"\n\n**Error:**\n\n`{stderr.decode('utf-8')}`"
await status_.edit(output)
@bot.on_message(filters.command(commands="del", prefixes=trigger) & filters.user(users))
async def delete_message(bot, message: Message):
"""Delete Messages"""
reply = message.reply_to_message
await message.delete()
if reply:
await reply.delete()
@bot.on_message(filters.command(commands="dl", prefixes=trigger) & filters.user(users))
async def dl(bot, message: Message):
""" The main Logic Function to download media """
response = await bot.send_message(message.chat.id, "`trying to download...`")
rw_message = message.text.split()
curse_ = ""
caption = "Shared by : "
if message.sender_chat:
caption += message.author_signature
else:
caption += message.from_user.first_name
check_dl = "failed"
if "-d" in rw_message:
doc = True
else:
doc = False
for i in rw_message:
if i.startswith("https://www.instagram.com/"):
check_dl = await iyt_dl(url=i)
curse_ = "#FuckInstagram"
if check_dl == "failed":
check_dl = await json_dl(iurl=i, caption=caption, doc=doc)
if "twitter.com" in i or "https://youtube.com/shorts" in i or "tiktok.com" in i:
check_dl = await iyt_dl(url=i)
if isinstance(check_dl, dict):
"""Send Media if response from check dl contains data dict"""
if isinstance(check_dl["media"], list):
for data_ in check_dl["media"]:
if isinstance(vv, list):
"""Send Grouped Media if data contains a list made of smaller lists of 5 medias"""
await bot.send_media_group(
message.chat.id,
media=data_,
reply_to_message_id=message.reply_to_message.id
if message.reply_to_message
else None,
)
await asyncio.sleep(3)
else:
"""Send Document if data is list of media files"""
await bot.send_document(
message.chat.id,
document=data_,
caption=caption,
reply_to_message_id=message.reply_to_message.id
if message.reply_to_message
else None,
)
""" If media isn't a list then it's a single file to be sent """
if isinstance(check_dl["media"], str):
if doc:
await bot.send_document(
message.chat.id,
document=check_dl["media"],
caption=caption,
reply_to_message_id=message.reply_to_message.id
if message.reply_to_message
else None,
)
else:
if check_dl["type"] == "img":
await bot.send_photo(
message.chat.id,
photo=check_dl["media"],
caption=caption,
reply_to_message_id=message.reply_to_message.id
if message.reply_to_message
else None,
)
if check_dl["type"] == "vid":
try:
await bot.send_video(
message.chat.id,
video=check_dl["media"],
caption=caption,
thumb=check_dl["thumb"]
if os.path.isfile(check_dl["thumb"])
else None,
reply_to_message_id=message.reply_to_message.id
if message.reply_to_message
else None,
)
except (MediaEmpty, WebpageCurlFailed):
pass
if os.path.exists(str(check_dl["path"])):
shutil.rmtree(str(check_dl["path"]))
check_dl = "done"
if check_dl == "failed":
await response.edit(f"Media Download Failed.\n{curse_}")
if check_dl == "done":
await message.delete()
await response.delete()
async def iyt_dl(url: str):
"""Stop post url handling because this only downloads Videos and post might contain images"""
if url.startswith("https://www.instagram.com/p/"):
return "failed"
path_ = time.time()
video = f"{path_}/v.mp4"
thumb = f"{path_}/i.png"
_opts = {
"outtmpl": video,
"ignoreerrors": True,
"ignore_no_formats_error": True,
"format": "bv[ext=mp4]+ba[ext=m4a]/b[ext=mp4]",
"quiet": True,
"logger": FakeLogger(),
}
return_val = "failed"
try:
yt_dlp.YoutubeDL(_opts).download(url)
if os.path.isfile(video):
call(
f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{thumb}"''',
shell=True,
)
return_val = {
"path": str(path_),
"type": "vid",
"media": video,
"thumb": thumb,
}
except BaseException:
pass
return return_val
async def json_dl(iurl: str, doc: bool, caption: str):
link = iurl.split("/?")[0] + e_json
async with aiohttp.ClientSession() as session:
try:
async with session.get(link, timeout=10) as session:
session_resp = await session.text()
rjson = json.loads(session_resp)
except Exception as e:
print(e)
return "failed"
if "require_login" in rjson:
print('failed')
return_val = "failed"
if "graphql" in rjson:
try:
url = rjson["graphql"]["shortcode_media"]
d_dir = f"downloads/{time.time()}"
os.makedirs(d_dir)
if url["__typename"] == "GraphVideo":
url_ = url["video_url"]
wget_x = download(url_, d_dir)
call(
f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{wget_x}" -vframes 1 "{d_dir}/i.png"''',
shell=True,
)
return_val = { "path": d_dir, "type": "vid", "media": wget_x, "thumb": d_dir + "/i.png" }
if url["__typename"] == "GraphImage":
url_ = url["display_url"]
wget_x = download(url_, d_dir + "/i.jpg")
return_val = { "path": d_dir, "type": "img", "media": wget_x, "thumb": "" }
if url["__typename"] == "GraphSidecar":
doc_list = []
vlist = []
vlist2 = []
plist = []
plist2 = []
for i in url["edge_sidecar_to_children"]["edges"]:
if i["node"]["__typename"] == "GraphImage":
url_ = i["node"]["display_url"]
wget_x = download(url_, d_dir)
if wget_x.endswith(".webp"):
os.rename(wget_x, wget_x + ".jpg")
wget_x = wget_x + ".jpg"
if doc:
doc_list.append(wget_x)
else:
if len(plist) >= 5:
plist2.append(InputMediaPhoto(media=wget_x, caption=caption))
else:
plist.append(InputMediaPhoto(media=wget_x, caption=caption))
if i["node"]["__typename"] == "GraphVideo":
url_ = i["node"]["video_url"]
wget_x = download(url_, d_dir)
if doc:
doc_list.append(wget_x)
else:
if len(vlist) >= 5:
vlist2.append(InputMediaVideo(media=wget_x, caption=caption))
else:
vlist.append(InputMediaVideo(media=wget_x, caption=caption))
if doc:
return_val = {"path": d_dir, "media": doc_list}
else:
return_val = {
"path": d_dir,
"media": [
zz for zz in [plist, plist2, vlist, vlist2] if len(zz) > 0
],
}
except Exception:
await bot.send_message(chat_id=log_chat, text=str(traceback.format_exc()))
return return_val
@bot.on_message(filters.command(commands="rdl", prefixes=trigger) & filters.user(users))
async def reddit_dl(bot, message: Message):
ext = None
del_link = True
rw_message = message.text.split()
response = await bot.send_message(
chat_id=message.chat.id, text="Trying to download..."
)
if message.sender_chat:
sender_ = message.author_signature
else:
sender_ = message.from_user.first_name
for link_ in rw_message:
if link_.startswith("https://www.reddit.com"):
link = link_.split("/?")[0] + ".json?limit=1"
headers = {
"user-agent": "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_8_7 rv:5.0; en-US) AppleWebKit/533.31.5 (KHTML, like Gecko) Version/4.0 Safari/533.31.5",
}
try:
async with aiohttp.ClientSession() as session:
async with session.get(link, headers=headers) as ss:
session_response = await ss.json()
json_ = session_response[0]["data"]["children"][0]["data"]
check_ = json_["secure_media"]
title_ = json_["title"]
subr = json_["subreddit_name_prefixed"]
caption = f"__{subr}:__\n**{title_}**\n\nShared by : {sender_}"
d_dir = str(time.time())
os.mkdir(d_dir)
if isinstance(check_, dict):
v = f"{d_dir}/v.mp4"
t = f"{d_dir}/i.png"
if "oembed" in check_:
vid_url = json_["preview"]["reddit_video_preview"]["fallback_url"]
await bot.send_animation(
chat_id=message.chat.id,
animation=vid_url,
unsave=True,
caption=caption,
)
else:
vid_url = check_["reddit_video"]["hls_url"]
call(
f'ffmpeg -hide_banner -loglevel error -i "{vid_url.strip()}" -c copy {v}',
shell=True,
)
call(
f'''ffmpeg -ss 0.1 -i "{v}" -vframes 1 "{t}"''', shell=True
)
await message.reply_video(v, caption=caption, thumb=t)
else:
media_ = json_["url_overridden_by_dest"]
try:
if media_.strip().endswith(".gif"):
ext = ".gif"
await bot.send_animation(
chat_id=message.chat.id,
animation=media_,
unsave=True,
caption=caption,
)
if media_.strip().endswith((".jpg", ".jpeg", ".png", ".webp")):
ext = ".png"
await message.reply_photo(media_, caption=caption)
except (MediaEmpty, WebpageCurlFailed):
download(media_, f"{d_dir}/i{ext}")
if ext == ".gif":
await bot.send_animation(
chat_id=message.chat.id,
animation=f"{d_dir}/i.gif",
unsave=True,
caption=caption,
)
else:
try:
await message.reply_photo(f"{d_dir}/i.png", caption=caption)
except PhotoSaveFileInvalid:
await message.reply_document(document=f"{d_dir}/i.png", caption=caption)
if os.path.exists(str(d_dir)):
shutil.rmtree(str(d_dir))
except Exception:
del_link = False
await bot.send_message(chat_id=log_chat, text=str(traceback.format_exc()))
await response.edit("Link doesn't contain any media or is restricted\nTip: Make sure you are sending original post url and not an embedded post.")
continue
if del_link:
await message.delete()
await response.delete()
class FakeLogger(object):
def debug(self, msg):
pass
def warning(self, msg):
pass
def error(self, msg):
pass
async def add_h():
message_id = os.environ.get("MESSAGE")
if message_id == None:
print("Enter Message id in config.\n")
return 1
try:
msg = (await bot.get_messages(int(log_chat), int(message_id))).text
except PeerIdInvalid:
print("Log channel not found.\nCheck the variable for mistakes")
return 1
chat_list.clear()
if msg == None:
print("Message not found\nCheck variable for mistakes\n")
return 1
try:
chats_list = [int(i) for i in msg.split()]
except ValueError:
print("Chat id message contains letters\nonly numerical ids are allowed.\nOr the message id is wrong.\n")
return 1
chat_list.extend(chats_list)
social_handler = bot.add_handler(
MessageHandler(
dl,
(
(
filters.regex(r"^https://www.instagram.com/*")
| filters.regex(r"^https://youtube.com/shorts/*")
| filters.regex(r"^https://twitter.com/*")
| filters.regex(r"^https://vm.tiktok.com/*")
)
& filters.chat(chat_list)
),
),
group=1,
)
reddit_handler = bot.add_handler(
MessageHandler(
reddit_dl,
(filters.regex(r"^https://www.reddit.com/*") & filters.chat(chat_list)),
),
group=2,
)
handler_.extend([social_handler, reddit_handler])
async def boot():
check_handlers = await add_h()
msg = "#Social-dl\nStarted\n"
if check_handlers == 1:
msg += "Running in command only mode."
print(msg)
await bot.send_message(chat_id=int(log_chat), text=msg)
await idle()
if __name__ == "__main__":
bot.start()
bot.run(boot())