Social-DL V2

New Banner.
Added Credits and Disclaimer.

Refactored the bot into modules.
Switched to latest Pyrogram.
Switched to a custom Client object.
Switched to a custom Message object.
Switched to custom decorators.
Switched to custom filters.
Switched to classes for media extractors.
Dropped Wget.
Switched to native in-memory downloading (in_memory_dl).

Added the Gallery-DL library.
Added Twitter image support.
Added Threads support.
Added support for YouTube videos up to 5 minutes.
Added TikTok image support.

Added .help command.
Added .shell command with live shell output.
Added add/del sudo commands.
Added add/del chat commands.
Added block/unblock commands.
Added banner in the about message.
Added .reply as an echo command.
Added restart confirmation.
Catch exceptions and log them to the log channel.
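
For reference, the new decorator-based command registration works roughly like this (a minimal sketch based on app/core/client.py, app/core/filters.py and app/plugins/*.py from this commit; the `ping` command and its reply text are illustrative only):

```python
from app import bot  # BOT() instance created in app/__init__.py

# bot.add_cmd stores the handler in Config.CMD_DICT under its command name;
# the dynamic command filter in app/core/filters.py then lets ".ping"
# (TRIGGER + command name) through for authorised users only.
@bot.add_cmd(cmd="ping")
async def ping(bot, message):
    # "message" is the custom Message object (app/core/message.py)
    # with helpers like .input, .flags and .reply().
    await message.reply("pong")
```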
This commit is contained in:
anonymousx97 2023-07-09 22:11:45 +05:30
parent cb07bbd4e0
commit 490d972ee3
28 changed files with 1944 additions and 626 deletions

1
.gitignore vendored

@ -1,2 +1,3 @@
__pycache__
config.env

128
README.md

@ -1,37 +1,21 @@
# Light weight Social Media downloader bot.
* Supported Platforms:
* Videos: Instagram, Tiktok, Twitter, YouTube Shorts
* Images: Instagram, Reddit
* Gif : Reddit
# Social Media downloader bot.
> Formerly "Lightweight Bot" XD
## Usage and Commands:
* Send supported links in any authorised chat/channel.
Bot will try to download and send the media.
![Header Image](https://github.com/anonymousx97/social-dl/blob/main/assets/social_downloader.png?raw=true)
* Owner only commands:
* `.dl link` to download and send media in any chat.
* `.bot update` to refresh chat list without restarting bot.
* `.bot restart` to restart bot.
* `.bot ids` to get chat / channel / user IDs.
* `.bot join or leave` to join / leave chat using ID.
* `.del` reply to a message to delete.
* `.purge` to delete all messages between command and replied message.
<b>A Telegram user-bot to download media from various websites and send it in chat.</b>
These commands can be used anywhere.
### Supported Platforms:
- Facebook
- Instagram
- Reddit
- Threads.net
- Tiktok
- Twitter
- YouTube
* Developer Mode Commands:
* `.sh` to run shell commands.
Example: `.sh ls`
* `.exec` to run python code.
Example: `.exec print(1)`
These commands are dangerous and are disabled by default.
Add `DEV_MODE="yes"` in config to enable these.
## For android local deploy:
### For Android local deploy:
* Download Latest [Termux](https://github.com/termux/termux-app/releases).
```bash
# Change Default repository of termux.
@ -41,10 +25,10 @@
yes|apt update && yes|apt upgrade
```
## Installation:
### Installation:
```bash
# Install required packages.
apt install -y python git python-pip ffmpeg
apt install -y python3 git curl python-pip ffmpeg
# Clone Repo.
git clone -q https://github.com/anonymousx97/social-dl
@ -60,52 +44,78 @@
nano config.env
```
## Config:
### Config:
* Get API_ID and API_HASH from https://my.telegram.org/auth .
* Generate String Session by running this in Termux:
```bash
bash -c "$(curl -fsSL https://raw.githubusercontent.com/ux-termux/string/main/Termux.sh)"
```
* It will ask you to choose pyrogram version. Select 2.
> It will ask you to choose pyrogram version. Select 2.
* <details>
<summary> Message_link : </summary>
* LOG_CHAT: Create a log channel and add its ID, including the -100 prefix.
* API_KEYS: Optional Instagram scraping keys from the <a href=https://webscraping.ai/>webscraping.ai API</a>. Recommended if you want to reduce Instagram download failures.
* <details>
<summary>Tap here for the Message IDs:</summary>
Send 2 messages in your log channel; the message text should be an empty list: []
* Create a private channel on TG.
* Send a list of Chat/Channel ids starting with -100 in your log channel like below.
Edit this message later to add the chats you want to enable.
<p align="right"><img src="https://telegra.ph/file/394daa80fd53c895cbe6e.jpg"></p>
* Bot will automatically download links in those chats/channels.
* Now copy that message's link and you will get something like
https://t.me/c/123456789/1
* So your values would be LOG=-100123456789 MESSAGE=1
* Copy the links of those messages.
* The last digits of these links are the message ids.
* These two messages act as your AUTO_DL_MESSAGE_ID and BLOCKED_USERS_MESSAGE_ID DB.
* Add their IDs to the config accordingly.
Now send another message, but this time include your own ID in the list: [12345678]
* Copy this message's link and put its message ID in the USERS_MESSAGE_ID var.
</details>
* User: Your user ID, used to control the bot.
* Trigger: The trigger used to access the bot.
* Dev Mode: Set to 1 if you want access to the exec, sh and shell commands.
> These commands can be dangerous if used carelessly. Turn them on at your own risk.
> If set to 1, both you and sudo users can use these commands.
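For reference, a filled-in `config.env` might look roughly like this (a sketch only: the variable names come from `app/config.py` and `app/core/client.py` in this commit, and every value below is a placeholder):
```bash
# Telegram API credentials and session (read by app/core/client.py)
API_ID="1234567"
API_HASH="0123456789abcdef0123456789abcdef"
STRING_SESSION="paste-your-pyrogram-v2-string-session-here"

# Log channel and the message IDs that act as the bot's DB (read by app/config.py)
LOG_CHAT="-100123456789"
AUTO_DL_MESSAGE_ID="1"
BLOCKED_USERS_MESSAGE_ID="2"
USERS_MESSAGE_ID="3"

# Optional: webscraping.ai keys (JSON list), command trigger and dev mode
API_KEYS='["key-1", "key-2"]'
TRIGGER="."
DEV_MODE="0"
```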
## Start bot
### Start bot
```bash
python socialbot.py
cd social-dl && python3 -m app
```
* If everything is correct, you will see <b><i>Started</i></b> in the terminal and in your log channel.
* Use `.help` to get the list of available commands.
## Setup a quick run command for bot.
```bash
echo "alias runbot='cd social-dl && python socialbot.py'" >> ~/.bashrc && bash
```
Now you can run bot with `runbot`
## Known limitations:
### Known Instagram limitations:
* If deployed on a VPS or any other server, Instagram might block access to some content.
After hitting Instagram's rate limit, image downloads might stop working, because servers and VPSes usually have a static IP that Instagram then blocks.
* Deploying it locally would solve those issues because most of us are likely to have a dynamic IP.
Bot is made lightweight with local deploys in mind. But battery life will take some hit anyway.
Bot <s>is</s> was made lightweight with local deploys in mind. But battery life will definitely take a hit.
* Logging in with your Instagram account, which would solve the rate-limit issues, is not added and won't be added, because 2 of my accounts were suspended until manual verification for using scraping bots like this one.
## Contact
* For any questions related to deploy or issues contact me on
* For any issues or questions related to deployment, contact me on
[Telegram](https://t.me/anonymousx97)
# Special Thanks:
- [Dan](https://github.com/delivrance) for [Pyrogram](https://github.com/pyrogram/pyrogram)
- All Libraries used in the project.
- [Kakashi](https://github.com/AshwinStr) for the Banner and helping with coding concepts.
- [Userge-X](https://github.com/code-rgb/USERGE-X) and [UX-Jutsu](https://github.com/ashwinstr/ux-jutsu) for basic userbot concepts.
- [NotShroud](https://t.me/NotShroudX97) for getting me into userbot stuff.
- [Alicia Dark](https://github.com/Thegreatfoxxgoddess) for Social-DL Idea and inspiring / pushing me to explore modding TG bots.
- [IsthisUser](https://github.com/dishapatel010) for helping with Instagram and threads support.
- [Fnix](https://github.com/fnixdev), [Lucky Jain](https://github.com/lostb053), [Jeel Patel](https://t.me/jeelpatel231) for teaching/helping with code stuff and suggesting improvements.
- [Avinash Reddy](https://t.me/AvinashReddy3108) for suggesting the use of [Gallery-DL](https://github.com/mikf/gallery-dl)
# Disclaimer:
Social-DL provides a way for users to download and upload media. While I facilitate these actions, it is important to note that the content accessed, downloaded, or uploaded through the bot is entirely the responsibility of the users. I do not distribute or endorse any specific media content.
Users are solely responsible for the types of media they download or upload using the Bot. They should ensure that they have the legal right to access or share the media files in question and comply with all applicable copyright laws and regulations. I do not monitor or control the nature, legality, or appropriateness of the content exchanged through the Bot.
It is essential for users to exercise caution and use this service in accordance with the terms of service and relevant laws. Any activities performed by users utilizing the Bot are done at their own risk. I recommend that users respect intellectual property rights, adhere to copyright laws, and obtain proper permissions when necessary.

9
app/__init__.py Executable file

@ -0,0 +1,9 @@
from dotenv import load_dotenv
load_dotenv("config.env")
from .config import Config
from .core.client import BOT
bot = BOT()

8
app/__main__.py Executable file

@ -0,0 +1,8 @@
if __name__ == "__main__":
import tracemalloc
from app import bot
import app.social_dl
tracemalloc.start()
bot.run(bot.boot())

27
app/api/gallerydl.py Normal file

@ -0,0 +1,27 @@
import asyncio
import glob
import os
import time
from app.core import shell
from app.core.scraper_config import ScraperConfig
class Gallery_DL(ScraperConfig):
def __init__(self, url):
super().__init__()
self.url = url
self.set_sauce(url)
async def download_or_extract(self):
self.path = "downloads/" + str(time.time())
os.makedirs(self.path)
try:
async with asyncio.timeout(30):
await shell.run_shell_cmd(f"gallery-dl -q --range '0-4' -D {self.path} '{self.url}'")
except TimeoutError:
pass
files = glob.glob(f"{self.path}/*")
if not files:
return self.cleanup()
self.link = self.success = True

65
app/api/instagram.py Executable file

@ -0,0 +1,65 @@
import os
from urllib.parse import urlparse
from app import Config
from app.core import aiohttp_tools
from app.core.scraper_config import ScraperConfig
API_KEYS = { "KEYS": Config.API_KEYS , "counter": 0 }
class Instagram(ScraperConfig):
def __init__(self, url):
super().__init__()
shortcode = os.path.basename(urlparse(url).path.rstrip("/"))
self.url = (
f"https://www.instagram.com/graphql/query?query_hash=2b0673e0dc4580674a88d426fe00ea90&variables=%7B%22shortcode%22%3A%22{shortcode}%22%7D"
)
self.set_sauce(url)
async def download_or_extract(self):
for func in [self.no_api_dl, self.api_dl]:
if await func():
self.success = True
break
async def no_api_dl(self):
response = await aiohttp_tools.get_json(url=self.url)
if not response or "data" not in response or not response["data"]["shortcode_media"]:
return
return await self.parse_ghraphql(response["data"]["shortcode_media"])
async def api_dl(self):
param = {"api_key": await self.get_key(), "url": self.url, "proxy": "residential", "js": False}
response = await aiohttp_tools.get_json(url="https://api.webscraping.ai/html", timeout=30, params=param)
if not response or "data" not in response or not response["data"]["shortcode_media"]:
return
self.caption = ".."
return await self.parse_ghraphql(response["data"]["shortcode_media"])
async def parse_ghraphql(self, json_: dict):
type_check = json_.get("__typename", None)
if not type_check:
return
elif type_check == "GraphSidecar":
self.link = [i["node"].get("video_url") or i["node"].get("display_url") for i in json_["edge_sidecar_to_children"]["edges"]]
self.group = True
else:
if link := json_.get("video_url"):
self.link = link
self.thumb = json_.get("display_url")
self.video = True
else:
self.link = json_.get("display_url")
self.photo = True
return self.link
# Rotating Key function to avoid hitting limit on single Key
async def get_key(self):
keys, count = API_KEYS["KEYS"], API_KEYS["counter"]
count += 1
if count == len(keys):
count = 0
ret_key = keys[count]
API_KEYS["counter"] = count
return ret_key

52
app/api/reddit.py Executable file

@ -0,0 +1,52 @@
import os
import time
from urllib.parse import urlparse
from app.core import aiohttp_tools, shell
from app.core.scraper_config import ScraperConfig
class Reddit(ScraperConfig):
def __init__(self, url):
super().__init__()
self.set_sauce(url)
parsed_url = urlparse(url)
self.url = f"https://www.reddit.com{parsed_url.path}.json?limit=1"
async def download_or_extract(self):
headers = {
"user-agent": "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_8_7 rv:5.0; en-US) AppleWebKit/533.31.5 (KHTML, like Gecko) Version/4.0 Safari/533.31.5"
}
response = await aiohttp_tools.get_json(url=self.url, headers=headers, json_=True)
if not response:
return
try:
json_ = response[0]["data"]["children"][0]["data"]
except BaseException:
return
self.caption = f"""__{json_["subreddit_name_prefixed"]}:__\n**{json_["title"]}**"""
is_vid, is_gallery = json_.get("is_video"), json_.get("is_gallery")
if is_vid:
self.path = "downloads/" + str(time.time())
os.makedirs(self.path)
self.link = f"{self.path}/v.mp4"
vid_url = json_["secure_media"]["reddit_video"]["hls_url"]
await shell.run_shell_cmd(f'ffmpeg -hide_banner -loglevel error -i "{vid_url.strip()}" -c copy {self.link}')
self.thumb = await shell.take_ss(video=self.link, path=self.path)
self.video = self.success = True
elif is_gallery:
self.link = [val["s"].get("u", val["s"].get("gif")).replace("preview", "i") for val in json_["media_metadata"].values()]
self.group = self.success = True
else:
self.link = json_.get("preview", {}).get("reddit_video_preview", {}).get("fallback_url", json_.get("url_overridden_by_dest", "")).strip()
if self.link.endswith(".gif"):
self.gif = self.success = True
else:
self.photo = self.success = True

30
app/api/threads.py Normal file

@ -0,0 +1,30 @@
import os
from urllib.parse import urlparse
from bs4 import BeautifulSoup
from app.core import aiohttp_tools
from app.core.scraper_config import ScraperConfig
class Threads(ScraperConfig):
def __init__(self, url):
super().__init__()
self.url = url
self.set_sauce(url)
async def download_or_extract(self):
shortcode = os.path.basename(urlparse(self.url).path.rstrip("/"))
response = await (await aiohttp_tools.SESSION.get(f"https://www.threads.net/t/{shortcode}/embed/")).text()
soup = BeautifulSoup(response, "html.parser")
if div := soup.find("div", {"class": "SingleInnerMediaContainer"}):
if video := div.find("video"):
self.link = video.find("source").get("src")
self.video = self.success = True
elif image := div.find("img", {"class": "img"}):
self.link = image.get("src")
self.photo = self.success = True

23
app/api/tiktok.py Executable file

@ -0,0 +1,23 @@
from app.api.tiktok_scraper import Scraper as Tiktok_Scraper
from app.core.scraper_config import ScraperConfig
tiktok_scraper = Tiktok_Scraper(quiet=True)
class Tiktok(ScraperConfig):
def __init__(self, url):
super().__init__()
self.url = url
self.set_sauce(url)
async def download_or_extract(self):
media = await tiktok_scraper.hybrid_parsing(self.url)
if not media or "status" not in media or media["status"] == "failed":
return
if "video_data" in media:
self.link = media["video_data"]["nwm_video_url_HQ"]
self.thumb = media["cover_data"]["dynamic_cover"]["url_list"][0]
self.video = self.success = True
if "image_data" in media:
self.link = media["image_data"]["no_watermark_image_list"]
self.group = self.success = True

682
app/api/tiktok_scraper.py Normal file

@ -0,0 +1,682 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# @Author: https://github.com/Evil0ctal/
# @Time: 2021/11/06
# @Update: 2023/03/08
# @Version: 3.3.0
# @Function:
# Core code valued at 1 RMB (๑•̀ㅂ•́)و✧
# Scrapes Douyin/TikTok data and returns it as a dictionary.
# input link, output dictionary.
import asyncio
import configparser
import os
import platform
import re
import time
import traceback
import urllib.parse
from typing import Union
import aiohttp
import execjs
from tenacity import *
quiet_mode = False
class Scraper:
"""__________________________________________⬇initialization(初始化)⬇______________________________________"""
# 初始化/initialization
def __init__(self, quiet: bool = False):
global quiet_mode
quiet_mode = quiet
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36"
}
self.douyin_api_headers = {
"accept-encoding": "gzip, deflate, br",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
"referer": "https://www.douyin.com/",
# 'cookie': "s_v_web_id=verify_leytkxgn_kvO5kOmO_SdMs_4t1o_B5ml_BUqtWM1mP6BF;"
}
self.tiktok_api_headers = {
"User-Agent": "com.ss.android.ugc.trill/494+Mozilla/5.0+(Linux;+Android+12;+2112123G+Build/SKQ1.211006.001;+wv)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Version/4.0+Chrome/107.0.5304.105+Mobile+Safari/537.36"
}
# 判断配置文件是否存在/Check if the configuration file exists
if os.path.exists("config.ini"):
self.config = configparser.ConfigParser()
self.config.read("config.ini", encoding="utf-8")
# Check whether to use a proxy
if self.config["Scraper"]["Proxy_switch"] == "True":
# Check whether to select the proxy per protocol
if self.config["Scraper"]["Use_different_protocols"] == "False":
self.proxies = {"all": self.config["Scraper"]["All"]}
else:
self.proxies = {"http": self.config["Scraper"]["Http_proxy"], "https": self.config["Scraper"]["Https_proxy"]}
else:
self.proxies = None
# 配置文件不存在则不使用代理/If the configuration file does not exist, do not use
# the proxy
else:
self.proxies = None
# 针对Windows系统的异步事件规则/Asynchronous event rules for Windows systems
if platform.system() == "Windows":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
"""__________________________________________⬇utils(实用程序)⬇______________________________________"""
# 检索字符串中的链接/Retrieve links from string
@staticmethod
def get_url(text: str) -> Union[str, None]:
try:
# 从输入文字中提取索引链接存入列表/Extract index links from input text and store in
# list
url = re.findall("http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+", text)
# 判断是否有链接/Check if there is a link
if len(url) > 0:
return url[0]
except Exception as e:
if not quiet_mode:
print("Error in get_url:", e)
return None
# 生成X-Bogus签名/Generate X-Bogus signature
@staticmethod
def generate_x_bogus_url(url: str, headers: dict) -> str:
query = urllib.parse.urlparse(url).query
xbogus = execjs.compile(open("./X-Bogus.js").read()).call("sign", query, headers["User-Agent"])
new_url = url + "&X-Bogus=" + xbogus
return new_url
# 转换链接/convert url
@retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
async def convert_share_urls(self, url: str) -> Union[str, None]:
"""
用于将分享链接(短链接)转换为原始链接/Convert share links (short links) to original links
:return: 原始链接/Original link
"""
# 检索字符串中的链接/Retrieve links from string
url = self.get_url(url)
# 判断是否有链接/Check if there is a link
if url is None:
if not quiet_mode:
print("无法检索到链接/Unable to retrieve link")
return None
# 判断是否为抖音分享链接/judge if it is a douyin share link
if "douyin" in url:
"""
Douyin video link types (incomplete):
1. https://v.douyin.com/MuKhKn3/
2. https://www.douyin.com/video/7157519152863890719
3. https://www.iesdouyin.com/share/video/7157519152863890719/?region=CN&mid=7157519152863890719&u_code=ffe6jgjg&titleType=title&timestamp=1600000000&utm_source=copy_link&utm_campaign=client_share&utm_medium=android&app=aweme&iid=123456789&share_id=123456789
Douyin user link types (incomplete):
1. https://www.douyin.com/user/MS4wLjABAAAAbLMPpOhVk441et7z7ECGcmGrK42KtoWOuR0_7pLZCcyFheA9__asY-kGfNAtYqXR?relation=0&vid=7157519152863890719
2. https://v.douyin.com/MuKoFP4/
Douyin live link types (incomplete):
1. https://live.douyin.com/88815422890
"""
if "v.douyin" in url:
# 转换链接/convert url
# 例子/Example: https://v.douyin.com/rLyAJgf/8.74
url = re.compile(r"(https://v.douyin.com/)\w+", re.I).match(url).group()
if not quiet_mode:
print("正在通过抖音分享链接获取原始链接...")
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=self.headers, proxy=self.proxies, allow_redirects=False, timeout=10) as response:
if response.status == 302:
url = (
response.headers["Location"].split("?")[0]
if "?" in response.headers["Location"]
else response.headers["Location"]
)
if not quiet_mode:
print("获取原始链接成功, 原始链接为: {}".format(url))
return url
except Exception as e:
if not quiet_mode:
print("获取原始链接失败!")
print(e)
# return None
raise e
else:
if not quiet_mode:
print("该链接为原始链接,无需转换,原始链接为: {}".format(url))
return url
# 判断是否为TikTok分享链接/judge if it is a TikTok share link
elif "tiktok" in url:
"""
TikTok video link types (incomplete):
1. https://www.tiktok.com/@tiktok/video/6950000000000000000
2. https://www.tiktok.com/t/ZTRHcXS2C/
TikTok user link types (incomplete):
1. https://www.tiktok.com/@tiktok
"""
if "@" in url:
if not quiet_mode:
print("该链接为原始链接,无需转换,原始链接为: {}".format(url))
return url
else:
if not quiet_mode:
print("正在通过TikTok分享链接获取原始链接...")
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=self.headers, proxy=self.proxies, allow_redirects=False, timeout=10) as response:
if response.status == 301:
url = (
response.headers["Location"].split("?")[0]
if "?" in response.headers["Location"]
else response.headers["Location"]
)
if not quiet_mode:
print("获取原始链接成功, 原始链接为: {}".format(url))
return url
except Exception as e:
if not quiet_mode:
print("获取原始链接失败!")
print(e)
return None
"""__________________________________________⬇Douyin methods(抖音方法)⬇______________________________________"""
"""
Credits: https://github.com/Johnserf-Seed
[中文]
感谢John为本项目提供了非常多的帮助
大家可以去他的仓库点个star :)
顺便打个广告, 如果需要更稳定快速长期维护的抖音/TikTok API, 或者需要更多的数据APP端,
请移步: https://api.tikhub.io
[English]
Thanks to John for providing a lot of help to this project
You can go to his repository and give him a star :)
By the way, if you need a more stable, fast and long-term maintenance Douyin/TikTok API, or need more data (APP side),
Please go to: https://api.tikhub.io
"""
# 生成抖音X-Bogus签名/Generate Douyin X-Bogus signature
# 下面的代码不能保证稳定性,随时可能失效/ The code below cannot guarantee stability and may
# fail at any time
def generate_x_bogus_url(self, url: str) -> str:
"""
Generate the Douyin X-Bogus signature
:param url: video link
:return: URL containing the X-Bogus signature
"""
# Call the JavaScript function
query = urllib.parse.urlparse(url).query
xbogus = execjs.compile(open("./X-Bogus.js").read()).call("sign", query, self.headers["User-Agent"])
if not quiet_mode:
print("生成的X-Bogus签名为: {}".format(xbogus))
new_url = url + "&X-Bogus=" + xbogus
return new_url
# 获取抖音视频ID/Get Douyin video ID
async def get_douyin_video_id(self, original_url: str) -> Union[str, None]:
"""
Get the video ID
:param original_url: video link
:return: video ID
"""
# Regex-match the video ID from the URL
try:
video_url = await self.convert_share_urls(original_url)
# Link types:
# Video page https://www.douyin.com/video/7086770907674348841
if "/video/" in video_url:
key = re.findall("/video/(\d+)?", video_url)[0]
if not quiet_mode:
print("获取到的抖音视频ID为: {}".format(key))
return key
# Discover page https://www.douyin.com/discover?modal_id=7086770907674348841
elif "discover?" in video_url:
key = re.findall("modal_id=(\d+)", video_url)[0]
if not quiet_mode:
print("获取到的抖音视频ID为: {}".format(key))
return key
# Live page
elif "live.douyin" in video_url:
# https://live.douyin.com/1000000000000000000
video_url = video_url.split("?")[0] if "?" in video_url else video_url
key = video_url.replace("https://live.douyin.com/", "")
if not quiet_mode:
print("获取到的抖音直播ID为: {}".format(key))
return key
# note
elif "note" in video_url:
# https://www.douyin.com/note/7086770907674348841
key = re.findall("/note/(\d+)?", video_url)[0]
if not quiet_mode:
print("获取到的抖音笔记ID为: {}".format(key))
return key
except Exception as e:
if not quiet_mode:
print("获取抖音视频ID出错了:{}".format(e))
return None
# 获取单个抖音视频数据/Get single Douyin video data
@retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
async def get_douyin_video_data(self, video_id: str) -> Union[dict, None]:
"""
:param video_id: str - Douyin video ID
:return: dict - dictionary containing the video info
"""
if not quiet_mode:
print("正在获取抖音视频数据...")
try:
# 构造访问链接/Construct the access link
api_url = f"https://www.douyin.com/aweme/v1/web/aweme/detail/?device_platform=webapp&aid=6383&channel=channel_pc_web&aweme_id={video_id}&pc_client_type=1&version_code=190500&version_name=19.5.0&cookie_enabled=true&screen_width=1344&screen_height=756&browser_language=zh-CN&browser_platform=Win32&browser_name=Firefox&browser_version=110.0&browser_online=true&engine_name=Gecko&engine_version=109.0&os_name=Windows&os_version=10&cpu_core_num=16&device_memory=&platform=PC&webid=7158288523463362079&msToken=abL8SeUTPa9-EToD8qfC7toScSADxpg6yLh2dbNcpWHzE0bT04txM_4UwquIcRvkRb9IU8sifwgM1Kwf1Lsld81o9Irt2_yNyUbbQPSUO8EfVlZJ_78FckDFnwVBVUVK"
api_url = self.generate_x_bogus_url(api_url)
# 访问API/Access API
if not quiet_mode:
print("正在获取视频数据API: {}".format(api_url))
async with aiohttp.ClientSession() as session:
async with session.get(api_url, headers=self.douyin_api_headers, proxy=self.proxies, timeout=10) as response:
response = await response.json()
# 获取视频数据/Get video data
video_data = response["aweme_detail"]
if not quiet_mode:
print("获取视频数据成功!")
# print("抖音API返回数据: {}".format(video_data))
return video_data
except Exception as e:
if not quiet_mode:
print("获取抖音视频数据失败!原因:{}".format(e))
# return None
raise e
# 获取单个抖音直播视频数据/Get single Douyin Live video data
# Temporarily unavailable, pending a fix.
@retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
async def get_douyin_live_video_data(self, web_rid: str) -> Union[dict, None]:
if not quiet_mode:
print("正在获取抖音视频数据...")
try:
# 构造访问链接/Construct the access link
api_url = f"https://live.douyin.com/webcast/web/enter/?aid=6383&web_rid={web_rid}"
# 访问API/Access API
if not quiet_mode:
print("正在获取视频数据API: {}".format(api_url))
async with aiohttp.ClientSession() as session:
async with session.get(api_url, headers=self.douyin_api_headers, proxy=self.proxies, timeout=10) as response:
response = await response.json()
# 获取视频数据/Get video data
video_data = response["data"]
if not quiet_mode:
print(video_data)
print("获取视频数据成功!")
# print("抖音API返回数据: {}".format(video_data))
return video_data
except Exception as e:
if not quiet_mode:
print("获取抖音视频数据失败!原因:{}".format(e))
# return None
raise e
# 获取单个抖音视频数据/Get single Douyin video data
@retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
async def get_douyin_user_profile_videos(self, profile_url: str, tikhub_token: str) -> Union[dict, None]:
try:
api_url = f"https://api.tikhub.io/douyin_profile_videos/?douyin_profile_url={profile_url}&cursor=0&count=20"
_headers = {"Authorization": f"Bearer {tikhub_token}"}
async with aiohttp.ClientSession() as session:
async with session.get(api_url, headers=_headers, proxy=self.proxies, timeout=10) as response:
response = await response.json()
return response
except Exception as e:
if not quiet_mode:
print("获取抖音视频数据失败!原因:{}".format(e))
# return None
raise e
# 获取抖音主页点赞视频数据/Get Douyin profile like video data
@retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
async def get_douyin_profile_liked_data(self, profile_url: str, tikhub_token: str) -> Union[dict, None]:
try:
api_url = f"https://api.tikhub.io/douyin_profile_liked_videos/?douyin_profile_url={profile_url}&cursor=0&count=20"
_headers = {"Authorization": f"Bearer {tikhub_token}"}
async with aiohttp.ClientSession() as session:
async with session.get(api_url, headers=_headers, proxy=self.proxies, timeout=10) as response:
response = await response.json()
return response
except Exception as e:
if not quiet_mode:
print("获取抖音视频数据失败!原因:{}".format(e))
# return None
raise e
# 获取抖音视频评论数据/Get Douyin video comment data
@retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
async def get_douyin_video_comments(self, video_url: str, tikhub_token: str) -> Union[dict, None]:
try:
api_url = f"https://api.tikhub.io/douyin_video_comments/?douyin_video_url={video_url}&cursor=0&count=20"
_headers = {"Authorization": f"Bearer {tikhub_token}"}
async with aiohttp.ClientSession() as session:
async with session.get(api_url, headers=_headers, proxy=self.proxies, timeout=10) as response:
response = await response.json()
return response
except Exception as e:
if not quiet_mode:
print("获取抖音视频数据失败!原因:{}".format(e))
# return None
raise e
"""__________________________________________⬇TikTok methods(TikTok方法)⬇______________________________________"""
# 获取TikTok视频ID/Get TikTok video ID
async def get_tiktok_video_id(self, original_url: str) -> Union[str, None]:
"""
Get the video ID
:param original_url: video link
:return: video ID
"""
try:
# 转换链接/Convert link
original_url = await self.convert_share_urls(original_url)
# 获取视频ID/Get video ID
if "/video/" in original_url:
video_id = re.findall("/video/(\d+)", original_url)[0]
elif "/v/" in original_url:
video_id = re.findall("/v/(\d+)", original_url)[0]
if not quiet_mode:
print("获取到的TikTok视频ID是{}".format(video_id))
# 返回视频ID/Return video ID
return video_id
except Exception as e:
if not quiet_mode:
print("获取TikTok视频ID出错了:{}".format(e))
return None
@retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
async def get_tiktok_video_data(self, video_id: str) -> Union[dict, None]:
"""
Get info for a single video
:param video_id: video ID
:return: video info
"""
if not quiet_mode:
print("正在获取TikTok视频数据...")
try:
# 构造访问链接/Construct the access link
api_url = f"https://api16-normal-c-useast1a.tiktokv.com/aweme/v1/feed/?aweme_id={video_id}"
if not quiet_mode:
print("正在获取视频数据API: {}".format(api_url))
async with aiohttp.ClientSession() as session:
async with session.get(api_url, headers=self.tiktok_api_headers, proxy=self.proxies, timeout=10) as response:
response = await response.json()
video_data = response["aweme_list"][0]
if not quiet_mode:
print("获取视频信息成功!")
return video_data
except Exception as e:
if not quiet_mode:
print("获取视频信息失败!原因:{}".format(e))
# return None
raise e
@retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
async def get_tiktok_user_profile_videos(self, tiktok_video_url: str, tikhub_token: str) -> Union[dict, None]:
try:
api_url = f"https://api.tikhub.io/tiktok_profile_videos/?tiktok_video_url={tiktok_video_url}&cursor=0&count=20"
_headers = {"Authorization": f"Bearer {tikhub_token}"}
async with aiohttp.ClientSession() as session:
async with session.get(api_url, headers=_headers, proxy=self.proxies, timeout=10) as response:
response = await response.json()
return response
except Exception as e:
if not quiet_mode:
print("获取抖音视频数据失败!原因:{}".format(e))
# return None
raise e
@retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
async def get_tiktok_user_profile_liked_videos(self, tiktok_video_url: str, tikhub_token: str) -> Union[dict, None]:
try:
api_url = f"https://api.tikhub.io/tiktok_profile_liked_videos/?tiktok_video_url={tiktok_video_url}&cursor=0&count=20"
_headers = {"Authorization": f"Bearer {tikhub_token}"}
async with aiohttp.ClientSession() as session:
async with session.get(api_url, headers=_headers, proxy=self.proxies, timeout=10) as response:
response = await response.json()
return response
except Exception as e:
if not quiet_mode:
print("获取抖音视频数据失败!原因:{}".format(e))
# return None
raise e
"""__________________________________________⬇Hybrid methods(混合方法)⬇______________________________________"""
# 自定义获取数据/Custom data acquisition
async def hybrid_parsing(self, video_url: str) -> dict:
# URL平台判断/Judge URL platform
url_platform = "douyin" if "douyin" in video_url else "tiktok"
if not quiet_mode:
print("当前链接平台为:{}".format(url_platform))
# 获取视频ID/Get video ID
print("正在获取视频ID...")
video_id = await self.get_douyin_video_id(video_url) if url_platform == "douyin" else await self.get_tiktok_video_id(video_url)
if video_id:
if not quiet_mode:
print("获取视频ID成功,视频ID为:{}".format(video_id))
# 获取视频数据/Get video data
print("正在获取视频数据...")
data = await self.get_douyin_video_data(video_id) if url_platform == "douyin" else await self.get_tiktok_video_data(video_id)
if data:
if not quiet_mode:
print("获取视频数据成功,正在判断数据类型...")
url_type_code = data["aweme_type"]
"""以下为抖音/TikTok类型代码/Type code for Douyin/TikTok"""
url_type_code_dict = {
# 抖音/Douyin
2: "image",
4: "video",
68: "image",
# TikTok
0: "video",
51: "video",
55: "video",
58: "video",
61: "video",
150: "image",
}
# 获取视频类型/Get video type
# 如果类型代码不存在,则默认为视频类型/If the type code does not exist, it is
# assumed to be a video type
url_type = url_type_code_dict.get(url_type_code, "video")
if not quiet_mode:
print("数据类型代码: {}".format(url_type_code))
# 判断链接类型/Judge link type
print("数据类型: {}".format(url_type))
print("准备开始判断并处理数据...")
"""
以下为(视频||图片)数据处理的四个方法,如果你需要自定义数据处理请在这里修改.
The following are four methods of (video || image) data processing.
If you need to customize data processing, please modify it here.
"""
"""
创建已知数据字典(索引相同)稍后使用.update()方法更新数据
Create a known data dictionary (index the same),
and then use the .update() method to update the data
"""
result_data = {
"status": "success",
"message": "更多接口请查看(More API see): https://api.tikhub.io/docs",
"type": url_type,
"platform": url_platform,
"aweme_id": video_id,
"official_api_url": {
"User-Agent": self.headers["User-Agent"],
"api_url": f"https://www.iesdouyin.com/aweme/v1/web/aweme/detail/?aweme_id={video_id}&aid=1128&version_name=23.5.0&device_platform=android&os_version=2333&Github=Evil0ctal&words=FXXK_U_ByteDance",
}
if url_platform == "douyin"
else {
"User-Agent": self.tiktok_api_headers["User-Agent"],
"api_url": f"https://api16-normal-c-useast1a.tiktokv.com/aweme/v1/feed/?aweme_id={video_id}",
},
"desc": data.get("desc"),
"create_time": data.get("create_time"),
"author": data.get("author"),
"music": data.get("music"),
"statistics": data.get("statistics"),
"cover_data": {
"cover": data.get("video").get("cover"),
"origin_cover": data.get("video").get("origin_cover"),
"dynamic_cover": data.get("video").get("dynamic_cover"),
},
"hashtags": data.get("text_extra"),
}
# 创建一个空变量,稍后使用.update()方法更新数据/Create an empty variable and use
# the .update() method to update the data
api_data = None
# 判断链接类型并处理数据/Judge link type and process data
try:
# 抖音数据处理/Douyin data processing
if url_platform == "douyin":
# 抖音视频数据处理/Douyin video data processing
if url_type == "video":
if not quiet_mode:
print("正在处理抖音视频数据...")
# 将信息储存在字典中/Store information in a dictionary
uri = data["video"]["play_addr"]["uri"]
wm_video_url = data["video"]["play_addr"]["url_list"][0]
wm_video_url_HQ = f"https://aweme.snssdk.com/aweme/v1/playwm/?video_id={uri}&radio=1080p&line=0"
nwm_video_url = wm_video_url.replace("playwm", "play")
nwm_video_url_HQ = f"https://aweme.snssdk.com/aweme/v1/play/?video_id={uri}&ratio=1080p&line=0"
api_data = {
"video_data": {
"wm_video_url": wm_video_url,
"wm_video_url_HQ": wm_video_url_HQ,
"nwm_video_url": nwm_video_url,
"nwm_video_url_HQ": nwm_video_url_HQ,
}
}
# 抖音图片数据处理/Douyin image data processing
elif url_type == "image":
if not quiet_mode:
print("正在处理抖音图片数据...")
# 无水印图片列表/No watermark image list
no_watermark_image_list = []
# 有水印图片列表/With watermark image list
watermark_image_list = []
# 遍历图片列表/Traverse image list
for i in data["images"]:
no_watermark_image_list.append(i["url_list"][0])
watermark_image_list.append(i["download_url_list"][0])
api_data = {
"image_data": {"no_watermark_image_list": no_watermark_image_list, "watermark_image_list": watermark_image_list}
}
# TikTok数据处理/TikTok data processing
elif url_platform == "tiktok":
# TikTok视频数据处理/TikTok video data processing
if url_type == "video":
if not quiet_mode:
print("正在处理TikTok视频数据...")
# 将信息储存在字典中/Store information in a dictionary
wm_video = data["video"]["download_addr"]["url_list"][0]
api_data = {
"video_data": {
"wm_video_url": wm_video,
"wm_video_url_HQ": wm_video,
"nwm_video_url": data["video"]["play_addr"]["url_list"][0],
"nwm_video_url_HQ": data["video"]["bit_rate"][0]["play_addr"]["url_list"][0],
}
}
# TikTok图片数据处理/TikTok image data processing
elif url_type == "image":
if not quiet_mode:
print("正在处理TikTok图片数据...")
# 无水印图片列表/No watermark image list
no_watermark_image_list = []
# 有水印图片列表/With watermark image list
watermark_image_list = []
for i in data["image_post_info"]["images"]:
no_watermark_image_list.append(i["display_image"]["url_list"][0])
watermark_image_list.append(i["owner_watermark_image"]["url_list"][0])
api_data = {
"image_data": {"no_watermark_image_list": no_watermark_image_list, "watermark_image_list": watermark_image_list}
}
# 更新数据/Update data
result_data.update(api_data)
# print("数据处理完成,最终数据: \n{}".format(result_data))
# 返回数据/Return data
return result_data
except Exception as e:
if not quiet_mode:
traceback.print_exc()
print("数据处理失败!")
return {"status": "failed", "message": "数据处理失败!/Data processing failed!"}
else:
if not quiet_mode:
print("[抖音|TikTok方法]返回数据为空,无法处理!")
return {"status": "failed", "message": "返回数据为空,无法处理!/Return data is empty and cannot be processed!"}
else:
if not quiet_mode:
print("获取视频ID失败")
return {"status": "failed", "message": "获取视频ID失败/Failed to get video ID!"}
# 处理数据方便快捷指令使用/Process data for easy-to-use shortcuts
@staticmethod
def hybrid_parsing_minimal(data: dict) -> dict:
# 如果数据获取成功/If the data is successfully obtained
if data["status"] == "success":
result = {
"status": "success",
"message": data.get("message"),
"platform": data.get("platform"),
"type": data.get("type"),
"desc": data.get("desc"),
"wm_video_url": data["video_data"]["wm_video_url"] if data["type"] == "video" else None,
"wm_video_url_HQ": data["video_data"]["wm_video_url_HQ"] if data["type"] == "video" else None,
"nwm_video_url": data["video_data"]["nwm_video_url"] if data["type"] == "video" else None,
"nwm_video_url_HQ": data["video_data"]["nwm_video_url_HQ"] if data["type"] == "video" else None,
"no_watermark_image_list": data["image_data"]["no_watermark_image_list"] if data["type"] == "image" else None,
"watermark_image_list": data["image_data"]["watermark_image_list"] if data["type"] == "image" else None,
}
return result
else:
return data
"""__________________________________________⬇Test methods(测试方法)⬇______________________________________"""
async def async_test(_douyin_url: str = None, _tiktok_url: str = None) -> None:
# 异步测试/Async test
start_time = time.time()
print("正在进行异步测试...")
print("正在测试异步获取抖音视频ID方法...")
douyin_id = await api.get_douyin_video_id(_douyin_url)
print("正在测试异步获取抖音视频数据方法...")
await api.get_douyin_video_data(douyin_id)
print("正在测试异步获取TikTok视频ID方法...")
tiktok_id = await api.get_tiktok_video_id(_tiktok_url)
print("正在测试异步获取TikTok视频数据方法...")
await api.get_tiktok_video_data(tiktok_id)
print("正在测试异步混合解析方法...")
await api.hybrid_parsing(_douyin_url)
await api.hybrid_parsing(_tiktok_url)
# 总耗时/Total time
total_time = round(time.time() - start_time, 2)
print("异步测试完成,总耗时: {}s".format(total_time))
if __name__ == "__main__":
api = Scraper()
# Run the test
# params = "device_platform=webapp&aid=6383&channel=channel_pc_web&aweme_id=7153585499477757192&pc_client_type=1&version_code=190500&version_name=19.5.0&cookie_enabled=true&screen_width=1344&screen_height=756&browser_language=zh-CN&browser_platform=Win32&browser_name=Firefox&browser_version=110.0&browser_online=true&engine_name=Gecko&engine_version=109.0&os_name=Windows&os_version=10&cpu_core_num=16&device_memory=&platform=PC&webid=7158288523463362079"
# api.generate_x_bogus(params)
douyin_url = "https://v.douyin.com/rLyrQxA/6.66"
tiktok_url = "https://vt.tiktok.com/ZSRwWXtdr/"
asyncio.run(async_test(_douyin_url=douyin_url, _tiktok_url=tiktok_url))

61
app/api/ytdl.py Executable file

@ -0,0 +1,61 @@
import asyncio
import os
import time
import yt_dlp
from app.core.scraper_config import ScraperConfig
from app.core.shell import take_ss
# To disable YT-DLP logging
# https://github.com/ytdl-org/youtube-dl/blob/fa7f0effbe4e14fcf70e1dc4496371c9862b64b9/test/helper.py#L92
class FakeLogger(object):
def debug(self, msg):
pass
def warning(self, msg):
pass
def error(self, msg):
pass
class YT_DL(ScraperConfig):
def __init__(self, url):
super().__init__()
self.set_sauce(url)
self.url = url
self.path = "downloads/" + str(time.time())
self.video_path = self.path + "/v.mp4"
self._opts = {
"outtmpl": self.video_path,
"ignoreerrors": True,
"ignore_no_formats_error": True,
"quiet": True,
"logger": FakeLogger(),
"noplaylist": True,
"format": "best[ext=mp4]",
}
async def download_or_extract(self):
if "youtu" in self.url:
self._opts["format"] = "bv[ext=mp4][res=480]+ba[ext=m4a]/b[ext=mp4]"
if "shorts" in self.url:
self._opts["format"] = "bv[ext=mp4][res=720]+ba[ext=m4a]/b[ext=mp4]"
yt_obj = yt_dlp.YoutubeDL(self._opts)
info = yt_obj.extract_info(self.url, download=False)
if info.get("duration", 0) >= 300:
return
await asyncio.to_thread(yt_obj.download, self.url)
if "youtu" in self.url:
self.caption = f"""__{info.get("channel","")}__:\n**{info.get("title","")}**"""
if os.path.isfile(self.video_path):
self.link = self.video_path
self.thumb = await take_ss(self.video_path, path=self.path)
self.video = self.success = True

20
app/config.py Normal file

@ -0,0 +1,20 @@
import os
import json
class Config:
API_KEYS = json.loads(os.environ.get("API_KEYS", "[]"))
BLOCKED_USERS = []
BLOCKED_USERS_MESSAGE_ID = int(os.environ.get("BLOCKED_USERS_MESSAGE_ID",0))
CHATS = []
AUTO_DL_MESSAGE_ID = int(os.environ.get("AUTO_DL_MESSAGE_ID",0))
CMD_DICT = {}
DEV_MODE = int(os.environ.get("DEV_MODE", 0))
LOG_CHAT = int(os.environ.get("LOG_CHAT"))
TRIGGER = os.environ.get("TRIGGER", ".")
USERS = []
USERS_MESSAGE_ID = int(os.environ.get("USERS_MESSAGE_ID",0))

170
app/core/MediaHandler.py Normal file

@ -0,0 +1,170 @@
import asyncio
import glob
import os
import traceback
from urllib.parse import urlparse
from pyrogram.errors import MediaEmpty, PhotoSaveFileInvalid, WebpageCurlFailed
from pyrogram.types import InputMediaPhoto, InputMediaVideo
from app.api.gallerydl import Gallery_DL
from app.api.instagram import Instagram
from app.api.reddit import Reddit
from app.api.threads import Threads
from app.api.tiktok import Tiktok
from app.api.ytdl import YT_DL
from app.core import aiohttp_tools, shell
# Thanks Jeel Patel for the concept TG[@jeelpatel231]
url_map = {
"tiktok.com": Tiktok,
"www.instagram.com": Instagram,
"www.reddit.com": Reddit,
"reddit.com": Reddit,
"www.threads.net": Threads,
"twitter.com": Gallery_DL,
"youtube.com": YT_DL,
"youtu.be": YT_DL,
"www.facebook.com": YT_DL,
}
class ExtractAndSendMedia:
def __init__(self, message):
self.exceptions, self.media_objects = [], []
self.message = message
self.doc = "-d" in message.flags
self.spoiler = "-s" in message.flags
self.sender = "" if "-ns" in message.flags else f"\nShared by : {self.extract_sender()}"
self.args_ = {"chat_id": self.message.chat.id, "reply_to_message_id": message.reply_id}
def extract_sender(self):
author = self.message.author_signature
sender = user.first_name if (user := self.message.from_user) else ""
return author or sender
async def get_media(self):
async with asyncio.TaskGroup() as task_group:
tasks = []
for link in self.message.get_text_list:
if match := url_map.get(urlparse(link).netloc):
tasks.append(task_group.create_task(match.start(link)))
else:
for key, val in url_map.items():
if key in link:
tasks.append(task_group.create_task(val.start(link)))
self.media_objects = [task.result() for task in tasks if task.result()]
async def send_media(self):
for obj in self.media_objects:
if "-nc" in self.message.flags:
caption = ""
else:
caption = obj.caption + obj.caption_url + self.sender
try:
if self.doc:
await self.send_document(obj.link, caption=caption, path=obj.path)
elif obj.group:
await self.send_group(obj, caption=caption)
elif obj.photo:
await self.send_photo(obj.link, caption=caption)
elif obj.video:
await self.send_video(obj.link, thumb=obj.thumb, caption=caption)
elif obj.gif:
await self.send_animation(obj.link, caption=caption)
except BaseException:
self.exceptions.append(traceback.format_exc())
async def send_photo(self, photo, caption):
try:
try:
await self.message._client.send_photo(**self.args_, photo=photo, caption=caption, has_spoiler=self.spoiler)
except (MediaEmpty, WebpageCurlFailed):
photo = await aiohttp_tools.in_memory_dl(photo)
await self.message._client.send_photo(**self.args_, photo=photo, caption=caption, has_spoiler=self.spoiler)
except PhotoSaveFileInvalid:
await self.message._client.send_document(**self.args_, document=photo, caption=caption, force_document=True)
async def send_video(self, video, caption, thumb):
try:
await self.message._client.send_video(
**self.args_, video=video, caption=caption, thumb=await aiohttp_tools.thumb_dl(thumb), has_spoiler=self.spoiler
)
except (MediaEmpty, WebpageCurlFailed):
video = await aiohttp_tools.in_memory_dl(video)
await self.message._client.send_video(
**self.args_, video=video, caption=caption, thumb=await aiohttp_tools.thumb_dl(thumb), has_spoiler=self.spoiler
)
async def send_animation(self, gif, caption):
try:
await self.message._client.send_animation(**self.args_, animation=gif, caption=caption, unsave=True, has_spoiler=self.spoiler)
except (MediaEmpty, WebpageCurlFailed):
gif = await aiohttp_tools.in_memory_dl(gif)
await self.message._client.send_animation(**self.args_, animation=gif, caption=caption, unsave=True, has_spoiler=self.spoiler)
async def send_document(self, docs, caption, path=""):
if not path:
docs = await asyncio.gather(*[aiohttp_tools.in_memory_dl(doc) for doc in docs])
else:
[os.rename(file_, file_ + ".png") for file_ in glob.glob(f"{path}/*.webp")]
docs = glob.glob(f"{path}/*")
for doc in docs:
try:
await self.message._client.send_document(**self.args_, document=doc, caption=caption, force_document=True)
except (MediaEmpty, WebpageCurlFailed):
doc = await aiohttp_tools.in_memory_dl(doc)
await self.message._client.send_document(**self.args_, document=doc, caption=caption, force_document=True)
await asyncio.sleep(0.5)
async def send_group(self, media, caption):
if media.path:
sorted_media = await self.sort_media_path(media.path, caption)
else:
sorted_media = await self.sort_media_urls(media.link, caption)
for data in sorted_media:
if isinstance(data, list):
await self.message._client.send_media_group(**self.args_, media=data)
else:
await self.send_animation(data, caption=caption)
await asyncio.sleep(1)
async def sort_media_path(self, path, caption):
[os.rename(file_, file_ + ".png") for file_ in glob.glob(f"{path}/*.webp")]
images, videos, animations = [], [], []
for file in glob.glob(f"{path}/*"):
if file.endswith((".png", ".jpg", ".jpeg")):
images.append(InputMediaPhoto(file, caption=caption, has_spoiler=self.spoiler))
if file.endswith((".mp4", ".mkv", ".webm")):
has_audio = await shell.check_audio(file)
if not has_audio:
animations.append(file)
else:
videos.append(InputMediaVideo(file, caption=caption, has_spoiler=self.spoiler))
return await self.make_chunks(images, videos, animations)
async def sort_media_urls(self, urls, caption):
images, videos, animations = [], [], []
downloads = await asyncio.gather(*[aiohttp_tools.in_memory_dl(url) for url in urls])
for file_obj in downloads:
if file_obj.name.endswith((".png", ".jpg", ".jpeg")):
images.append(InputMediaPhoto(file_obj, caption=caption, has_spoiler=self.spoiler))
if file_obj.name.endswith((".mp4", ".mkv", ".webm")):
videos.append(InputMediaVideo(file_obj, caption=caption, has_spoiler=self.spoiler))
if file_obj.name.endswith(".gif"):
animations.append(file_obj)
return await self.make_chunks(images, videos, animations)
async def make_chunks(self, images=[], videos=[], animations=[]):
chunk_imgs = [images[imgs : imgs + 5] for imgs in range(0, len(images), 5)]
chunk_vids = [videos[vids : vids + 5] for vids in range(0, len(videos), 5)]
return [*chunk_imgs, *chunk_vids, *animations]
@classmethod
async def process(cls, message):
obj = cls(message=message)
await obj.get_media()
await obj.send_media()
[m_obj.cleanup() for m_obj in obj.media_objects]
return obj

46
app/core/aiohttp_tools.py Executable file

@ -0,0 +1,46 @@
import json
import os
from io import BytesIO
from urllib.parse import urlparse
import aiohttp
SESSION = None
async def session_switch():
if not SESSION:
globals().update({"SESSION": aiohttp.ClientSession()})
else:
await SESSION.close()
async def get_json(url: str, headers: dict = None, params: dict = None, retry: bool = False, json_: bool = False, timeout: int = 10):
try:
async with SESSION.get(url=url, headers=headers, params=params, timeout=timeout) as ses:
if json_:
ret_json = await ses.json()
else:
ret_json = json.loads(await ses.text())
return ret_json
except BaseException:
return
async def in_memory_dl(url: str):
async with SESSION.get(url) as remote_file:
bytes_data = await remote_file.read()
file = BytesIO(bytes_data)
name = os.path.basename(urlparse(url).path.rstrip("/"))
if name.endswith(".webp"):
name = name + ".jpg"
if name.endswith(".webm"):
name = name + ".mp4"
file.name = name
return file
async def thumb_dl(thumb):
if not thumb or not thumb.startswith("http"):
return thumb
return await in_memory_dl(thumb)

89
app/core/client.py Executable file

@ -0,0 +1,89 @@
import glob
import importlib
import json
import os
import sys
from functools import wraps
from io import BytesIO
from pyrogram import Client, idle
from pyrogram.enums import ParseMode
from app import Config
from app.core import aiohttp_tools
from app.core.message import Message
class BOT(Client):
_LOADED = False
def __init__(self):
super().__init__(
name="bot",
session_string=os.environ.get("STRING_SESSION"),
api_id=int(os.environ.get("API_ID")),
api_hash=os.environ.get("API_HASH"),
in_memory=True,
parse_mode=ParseMode.DEFAULT,
)
def add_cmd(self, cmd, trigger=Config.TRIGGER): # Custom triggers To do
def the_decorator(func):
@wraps(func)
def wrapper():
Config.CMD_DICT[cmd] = func
wrapper()
return func
return the_decorator
async def boot(self):
await super().start()
await self.import_modules()
await self.set_filter_list()
await aiohttp_tools.session_switch()
await self.edit_restart_msg()
await self.log(text="#Social-dl\n__Started__")
print("started")
await idle()
await aiohttp_tools.session_switch()
async def edit_restart_msg(self):
if (restart_msg := os.environ.get("RESTART_MSG")) and (restart_chat := os.environ.get("RESTART_CHAT")):
await super().get_chat(int(restart_chat))
await super().edit_message_text(chat_id=int(restart_chat), message_id=int(restart_msg), text="#Social-dl\n__Started__")
os.environ.pop("RESTART_MSG", "")
os.environ.pop("RESTART_CHAT", "")
async def import_modules(self):
for module_ in glob.glob("app/*/*.py"):
importlib.import_module(os.path.splitext(module_.replace("/", "."))[0])
self._LOADED = True
async def log(self, text, chat=None, func=None, name="log.txt"):
if chat or func:
text = f"<b>Function:</b> {func}\n<b>Chat:</b> {chat}\n<b>Traceback:</b>\n{text}"
return await self.send_message(chat_id=Config.LOG_CHAT, text=text, name=name)
async def restart(self):
await aiohttp_tools.session_switch()
await super().stop(block=False)
os.execl(sys.executable, sys.executable, "-m", "app")
async def set_filter_list(self):
if chats_id := Config.AUTO_DL_MESSAGE_ID:
Config.CHATS = json.loads((await super().get_messages(Config.LOG_CHAT, chats_id)).text)
if blocked_id := Config.BLOCKED_USERS_MESSAGE_ID:
Config.BLOCKED_USERS = json.loads((await super().get_messages(Config.LOG_CHAT, blocked_id)).text)
if users := Config.USERS_MESSAGE_ID:
Config.USERS = json.loads((await super().get_messages(Config.LOG_CHAT, users)).text)
async def send_message(self, chat_id, text, name: str = "output.txt", **kwargs):
if len(str(text)) < 4096:
return Message.parse_message((await super().send_message(chat_id=chat_id, text=text, **kwargs)))
doc = BytesIO(bytes(text, encoding="utf-8"))
doc.name = name
kwargs.pop("disable_web_page_preview", "")
return await super().send_document(chat_id=chat_id, document=doc, **kwargs)

41
app/core/filters.py Normal file

@ -0,0 +1,41 @@
from urllib.parse import urlparse
from pyrogram import filters as _filters
from app import Config
from app.core.MediaHandler import url_map
def DYNAMIC_CHAT_FILTER(_, __, message):
if not message.text or not message.text.startswith("https"):
return False
chat_check = message.chat.id in Config.CHATS
user_check = True
if user := message.from_user:
user_check = user.id not in Config.BLOCKED_USERS and not user.is_bot
return bool(chat_check and user_check and check_for_urls(message.text.split()))
def check_for_urls(text_list):
for link in text_list:
if match := url_map.get(urlparse(link).netloc):
return True
else:
for key in url_map.keys():
if key in link:
return True
def DYNAMIC_CMD_FILTER(_, __, message):
if not message.text or not message.text.startswith(Config.TRIGGER):
return False
cmd_check = message.text.split(maxsplit=1)[0].replace(Config.TRIGGER, "", 1) in Config.CMD_DICT
user_check = False
if user := message.from_user:
user_check = user.id in Config.USERS
reaction_check = bool(not message.reactions)
return bool(cmd_check and user_check and reaction_check)
chat_filter = _filters.create(DYNAMIC_CHAT_FILTER)
user_filter = _filters.create(DYNAMIC_CMD_FILTER)

59
app/core/message.py Executable file

@ -0,0 +1,59 @@
from pyrogram.types import Message as MSG
class Message(MSG):
def __init__(self, message):
self.flags = []
self.input, self.flt_input = "", ""
self.replied, self.reply_id = None, None
super().__dict__.update(message.__dict__)
self.set_reply_properties()
self.flags_n_input()
self.set_flt_input()
@property
def get_text_list(self):
text_list = self.text.split()
if self.replied and (reply_text := self.replied.text) and "dl" in text_list[0]:
text_list.extend(reply_text.split())
return text_list
def set_reply_properties(self):
if replied := self.reply_to_message:
self.replied = replied
self.reply_id = replied.id
def flags_n_input(self):
self.flags = [i for i in self.text.split() if i.startswith("-")]
split_cmd_str = self.text.split(maxsplit=1)
if len(split_cmd_str) > 1:
self.input = split_cmd_str[-1]
def set_flt_input(self):
line_split = self.input.splitlines()
split_n_joined =[
" ".join([word for word in line.split(" ") if word not in self.flags])
for line in line_split
]
self.flt_input = "\n".join(split_n_joined)
async def reply(self, text, **kwargs):
return await self._client.send_message(chat_id=self.chat.id, text=text, reply_to_message_id=self.id, **kwargs)
async def edit(self, text, **kwargs):
if len(str(text)) < 4096:
kwargs.pop("name", "")
await self.edit_text(text, **kwargs)
else:
await super().delete()
return await self.reply(text, **kwargs)
async def delete(self, reply=False):
await super().delete()
if reply and self.replied:
await self.replied.delete()
@classmethod
def parse_message(cls, message):
ret_obj = cls(message)
return ret_obj

29
app/core/scraper_config.py

@ -0,0 +1,29 @@
import shutil
class ScraperConfig:
def __init__(self):
self.path = ""
self.link = ""
self.caption = ""
self.caption_url = ""
self.thumb = None
self.success = False
self.photo = False
self.video = False
self.group = False
self.gif = False
def set_sauce(self, url):
self.caption_url = f"\n\n<a href={url}>Sauce</a>"
@classmethod
async def start(cls, url):
obj = cls(url=url)
await obj.download_or_extract()
if obj.success:
return obj
def cleanup(self):
if self.path:
shutil.rmtree(self.path, ignore_errors=True)

49
app/core/shell.py Executable file

@ -0,0 +1,49 @@
import asyncio
import os
async def take_ss(video: str, path: str):
await run_shell_cmd(f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{path}/i.png"''')
if os.path.isfile(path + "/i.png"):
return path + "/i.png"
async def check_audio(file):
result = await run_shell_cmd(f"ffprobe -v error -show_entries format=nb_streams -of default=noprint_wrappers=1:nokey=1 {file}")
return int(result or 0) - 1
async def run_shell_cmd(cmd):
proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)
stdout, _ = await proc.communicate()
return stdout.decode("utf-8")
class AsyncShell:
full_std = ""
is_not_completed = True
def __init__(self, process):
self.process = process
async def get_output(self):
while True:
# Check output and stop loop if it's empty
line = (await self.process.stdout.readline()).decode("utf-8")
if not line:
break
self.full_std += line
# Let the Subprocess complete and let it shut down
await self.process.wait()
self.is_not_completed = False
@classmethod
async def run_cmd(cls, cmd):
# Create Subprocess and initialise self using cls
sub_process = cls(process=await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT))
# Start Checking output but don't block code by awaiting it.
asyncio.create_task(sub_process.get_output())
# Sleep for a short time to let previous task start
await asyncio.sleep(0.5)
# Return Self object
return sub_process

149
app/plugins/authorise.py Normal file

@ -0,0 +1,149 @@
from pyrogram.errors import MessageNotModified
from app import Config, bot
async def add_or_remove(mode, task, item, config_list, message_id):
err = None
if item in config_list and mode == "add":
return "ID Already in List"
elif item not in config_list and mode == "remove":
return "ID Not in List"
try:
task(item)
await bot.edit_message_text(
chat_id=Config.LOG_CHAT, message_id=message_id, text=str(config_list)
)
except MessageNotModified:
err = "Duplicate Entries, List Not modified."
except Exception as e:
err = str(e)
return err
def extract_user(message):
user, err = message.input.strip(), None
if not Config.USERS_MESSAGE_ID:
return user, "You haven't added `USERS_MESSAGE_ID` Var, Add it."
if message.replied:
user = message.replied.from_user.id
if not user:
return user, "Unable to Extract User IDs. Try again."
try:
user = int(user)
except ValueError:
return user, "Give a Valid User ID."
return user, err
def extract_chat(message):
chat, err = message.input.strip() or message.chat.id, None
if not Config.AUTO_DL_MESSAGE_ID:
return user, "You haven't added `AUTO_DL_MESSAGE_ID` Var, Add it."
if not chat:
return user, "Unable to Extract Chat IDs. Try again."
try:
chat = int(chat)
except ValueError:
return chat, "Give a Valid chat ID."
return chat, err
@bot.add_cmd(cmd="addsudo")
async def add_sudo(bot, message):
user, err = extract_user(message)
if err:
return await message.reply(err)
if err := await add_or_remove(
mode="add",
task=Config.USERS.append,
item=user,
config_list=Config.USERS,
message_id=Config.USERS_MESSAGE_ID,
):
return await message.reply(err)
await message.reply("User Added to Authorised List.")
@bot.add_cmd(cmd="delsudo")
async def del_sudo(bot, message):
user, err = extract_user(message)
if err:
return await message.reply(err)
if err := await add_or_remove(
mode="remove",
task=Config.USERS.remove,
item=user,
config_list=Config.USERS,
message_id=Config.USERS_MESSAGE_ID,
):
return await message.reply(err)
await message.reply("User Removed from Authorised List.")
@bot.add_cmd(cmd="addchat")
async def add_chat(bot, message):
chat, err = extract_chat(message)
if err:
return await message.reply(err)
if err := await add_or_remove(
mode="add",
task=Config.CHATS.append,
item=chat,
config_list=Config.CHATS,
message_id=Config.AUTO_DL_MESSAGE_ID,
):
return await message.reply(err)
await message.reply(f"<b>{message.chat.title}</b> Added to Authorised List.")
@bot.add_cmd(cmd="delchat")
async def del_chat(bot, message):
chat, err = extract_chat(message)
if err:
return await message.reply(err)
if err := await add_or_remove(
mode="remove",
task=Config.CHATS.remove,
item=chat,
config_list=Config.CHATS,
message_id=Config.AUTO_DL_MESSAGE_ID,
):
return await message.reply(err)
await message.reply(f"<b>{message.chat.title}</b> Added Removed from Authorised List.")
@bot.add_cmd(cmd="block")
async def block_user(bot, message):
user, err = extract_user(message)
if err:
return await message.reply(err)
if err := await add_or_remove(
mode="add",
task=Config.BLOCKED_USERS.append,
item=user,
config_list=Config.BLOCKED_USERS,
message_id=Config.BLOCKED_USERS_MESSAGE_ID,
):
return await message.reply(err)
await message.reply("User Added to Ban List.")
@bot.add_cmd(cmd="unblock")
async def unblock_user(bot, message):
user, err = extract_user(message)
if err:
return await message.reply(err)
if err := await add_or_remove(
mode="remove",
task=Config.BLOCKED_USERS.remove,
item=user,
config_list=Config.BLOCKED_USERS,
message_id=Config.BLOCKED_USERS_MESSAGE_ID,
):
return await message.reply(err)
await message.reply("User Removed from Ban List.")

32
app/plugins/bot.py Executable file
View File

@ -0,0 +1,32 @@
import os
from pyrogram.enums import ChatType
from app import Config, bot
@bot.add_cmd(cmd="bot")
async def info(bot, message):
head = "<b><a href=https://t.me/Social_DL>Social-DL</a> is running.</b>"
chat_count = f"\n<b>Auto-Dl enabled in: <code>{len(Config.CHATS)}</code> chats</b>\n"
supported_sites, photo = await bot.get_messages("Social_DL", [2, 3])
await photo.copy(message.chat.id, caption="\n".join([head, chat_count, supported_sites.text.html]))
@bot.add_cmd(cmd="help")
async def help(bot, message):
commands = "\n".join([ f"<code>{Config.TRIGGER}{i}</code>" for i in Config.CMD_DICT.keys()])
await message.reply(f"<b>Available Commands:</b>\n\n{commands}")
@bot.add_cmd(cmd="restart")
async def restart(bot, message):
reply = await message.reply("restarting....")
if message.chat.type in [ChatType.GROUP, ChatType.SUPERGROUP]:
os.environ["RESTART_MSG"] = str(reply.id)
os.environ["RESTART_CHAT"] = str(reply.chat.id)
await bot.restart()
@bot.add_cmd(cmd="update")
async def chat_update(bot, message):
await bot.set_filter_list()
await message.reply("Filters Refreshed")

75
app/plugins/tgUtils.py Normal file
View File

@ -0,0 +1,75 @@
import os
from pyrogram.enums import ChatType
from pyrogram.errors import BadRequest
from pyrogram.types import Message
from app import bot
# Delete replied and command message
@bot.add_cmd(cmd="del")
async def delete_message(bot, message: Message):
await message.delete(reply=True)
# Delete every message from the replied message up to the command.
@bot.add_cmd(cmd="purge")
async def purge_(bot, message: Message):
reply = message.replied
if not reply:
return await message.reply("reply to a message")
start_message = reply.id
end_message = message.id
messages = [end_message] + [i for i in range(int(start_message), int(end_message))]
await bot.delete_messages(chat_id=message.chat.id, message_ids=messages, revoke=True)
@bot.add_cmd(cmd="ids")
async def get_ids(bot, message):
if reply := message.replied:
ids = ""
reply_forward = reply.forward_from_chat
reply_user = reply.from_user
ids += f"Chat : `{reply.chat.id}`\n"
if reply_forward:
ids += f"Replied {'Channel' if reply_forward.type == ChatType.CHANNEL else 'Chat'} : `{reply_forward.id}`\n"
if reply_user:
ids += f"User : {reply.from_user.id}"
else:
ids = f"Chat :`{message.chat.id}`"
await message.reply(ids)
@bot.add_cmd(cmd="join")
async def join_chat(bot, message):
chat = message.input
if chat.isdigit():
chat = int(f"-100{chat}")
try:
await bot.join_chat(chat)
except (KeyError, BadRequest):
try:
await bot.join_chat(os.path.basename(message.input).strip())
except Exception as e:
return await message.reply(str(e))
await message.reply("Joined")
@bot.add_cmd(cmd="leave")
async def leave_chat(bot, message):
if message.input:
chat = message.input
else:
chat = message.chat.id
try:
await bot.leave_chat(chat)
except Exception as e:
await message.reply(str(e))
@bot.add_cmd(cmd="reply")
async def reply(bot, message):
text = message.input
await bot.send_message(chat_id=message.chat.id, text=text, reply_to_message_id=message.reply_id, disable_web_page_preview=True)
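
For reference, the message-ID arithmetic in `purge_`: replying to message `100` with a `.purge` sent as message `105` deletes the command itself plus everything from the replied message up to (but not including) the command:

```python
start_message, end_message = 100, 105
messages = [end_message] + list(range(start_message, end_message))
# -> [105, 100, 101, 102, 103, 104]
```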

76
app/plugins/tools.py Normal file
View File

@ -0,0 +1,76 @@
import asyncio
import sys
import traceback
from io import StringIO
from pyrogram.enums import ParseMode
from app import Config
from app.core import shell
from app.core.aiohttp_tools import SESSION, in_memory_dl
# Run shell commands
async def run_cmd(bot, message):
cmd = message.input.strip()
status_ = await message.reply("executing...")
proc_stdout = await shell.run_shell_cmd(cmd)
output = f"`${cmd}`\n\n`{proc_stdout}`"
return await status_.edit(output, name="sh.txt", disable_web_page_preview=True)
# Shell but Live Output
async def live_shell(bot, message):
cmd = message.input.strip()
sub_process = await shell.AsyncShell.run_cmd(cmd)
reply = await message.reply("`getting live output....`")
output = ""
sleep_for = 1
while sub_process.is_not_completed:
# Edit message only when there's new output.
if output != sub_process.full_std:
output = sub_process.full_std
if len(output) <= 4096:
await reply.edit(f"`{output}`", disable_web_page_preview=True, parse_mode=ParseMode.MARKDOWN)
# Reset sleep duration
if sleep_for >= 5:
sleep_for = 1
# Sleep to Unblock running loop and let output reader read new
# output.
await asyncio.sleep(sleep_for)
sleep_for += 1
# If the subprocess is finished edit the message with cmd and full
# output
return await reply.edit(f"`$ {cmd}\n\n``{sub_process.full_std}`", name="shell.txt", disable_web_page_preview=True)
# Run Python code
async def executor_(bot, message):
code = message.flt_input.strip()
if not code:
return await message.reply("exec Jo mama?")
reply = await message.reply("executing")
sys.stdout = codeOut = StringIO()
sys.stderr = codeErr = StringIO()
# Indent code as per proper python syntax
formatted_code = "\n ".join(code.splitlines())
try:
# Create and initialise the function
exec(f"async def _exec(bot, message):\n {formatted_code}")
func_out = await locals().get("_exec")(bot, message)
except BaseException:
func_out = str(traceback.format_exc())
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
output = f"`{codeOut.getvalue().strip() or codeErr.getvalue().strip() or func_out}`"
if "-s" not in message.flags:
output = f"> `{code}`\n\n>> {output}"
return await reply.edit(output, name="exec.txt", disable_web_page_preview=True,parse_mode=ParseMode.MARKDOWN)
if Config.DEV_MODE:
Config.CMD_DICT["sh"] = run_cmd
Config.CMD_DICT["shell"] = live_shell
Config.CMD_DICT["exec"] = executor_

39
app/social_dl.py Normal file
View File

@ -0,0 +1,39 @@
import traceback
from app import Config, bot
from app.core import filters
from app.core.MediaHandler import ExtractAndSendMedia
from app.core.message import Message
@bot.add_cmd(cmd="dl")
async def dl(bot, message):
reply = await bot.send_message(chat_id=message.chat.id, text="`trying to download...`")
media = await ExtractAndSendMedia.process(message)
if media.exceptions:
exceptions = "\n".join(media.exceptions)
await bot.log(text=exceptions, func="DL", chat=message.chat.id, name="traceback.txt")
return await reply.edit(f"Media Download Failed.")
if media.media_objects:
await message.delete()
await reply.delete()
@bot.on_message(filters.user_filter)
@bot.on_edited_message(filters.user_filter)
async def cmd_dispatcher(bot, message):
func = Config.CMD_DICT[message.text.split(maxsplit=1)[0].lstrip(Config.TRIGGER)]
parsed_message = Message.parse_message(message)
try:
await func(bot, parsed_message)
except BaseException:
await bot.log(text=str(traceback.format_exc()), chat=message.chat.id, func=func.__name__, name="traceback.txt")
@bot.on_message(filters.chat_filter)
async def dl_dispatcher(bot, message):
func = Config.CMD_DICT["dl"]
parsed_message = Message.parse_message(message)
try:
await func(bot, parsed_message)
except BaseException:
await bot.log(text=str(traceback.format_exc()), chat=message.chat.id, func=func.__name__, name="traceback.txt")
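
The dispatchers resolve handlers through `Config.CMD_DICT`, keyed by the first word of the message with the trigger stripped. A worked example of the lookup (not code from the repo):

```python
TRIGGER = "."
text = ".shell ls -la"
key = text.split(maxsplit=1)[0].lstrip(TRIGGER)   # -> "shell"
# func = Config.CMD_DICT[key]
```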

Binary file not shown (new image asset, 263 KiB).

15
req.txt
View File

@ -1,7 +1,10 @@
aiohttp
aiohttp_retry
pyrogram==2.0.57
aiohttp>=3.8.4
beautifulsoup4>=4.12.2
Brotli>=1.0.9
gallery_dl>=1.25.7
pyrogram>=2.0.106
python-dotenv==0.21.0
tgCrypto==1.2.3
wget
yt-dlp
PyExecJS>=1.5.1
tenacity>=8.2.2
tgCrypto>=1.2.3
yt-dlp>=2023.6.22

View File

@ -1,15 +1,42 @@
API_ID=123456
API_ID=12345678
# Your API ID
API_HASH="abcd1238fn...."
API_HASH="abf12395nskfns"
# Your API HASH
DEV_MODE = "no"
API_KEYS=[]
# Your https://api.webscraping.ai/ Keys
# Multiple values are comma-separated, e.g. [122456, 78990]
LOG_CHANNEL=-10012345678
MESSAGE=12345
BLOCKED_USERS_MESSAGE_ID = 0
# Message ID of the blocked users list
# (the last number of the message link, e.g. for t.me/1263849/69 the value is 69)
DEV_MODE=0
# Change to 1 if you want exec, sh, shell commands
LOG_CHAT=-100.....
# Log channel ID (Required)
AUTO_DL_MESSAGE_ID=0
# Message ID of the auto-DL chat list
# (the last number of the message link)
STRING_SESSION="Ab0fbs......."
TRIGGER="."
USERS=[12345678] # Multiple user IDs should be separated by ,
STRING_SESSION="Aq0dj......"
# Your session
USERS_MESSAGE_ID = 0
# Message ID of the users list with full bot access
# (the last number of the message link)

socialbot.py
View File

@ -1,554 +0,0 @@
"""
* socialbot.py Main Logic file of Bot.
MIT License
Copyright (c) 2023 Ryuk
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import asyncio
import base64
import glob
import json
import os
import shutil
import sys
import time
import traceback
from io import StringIO
from urllib.parse import urlparse as url_p
import aiohttp
import yt_dlp
from aiohttp_retry import ExponentialRetry, RetryClient
from dotenv import load_dotenv
from pyrogram import Client, filters, idle
from pyrogram.enums import ChatType, ParseMode
from pyrogram.errors import MediaEmpty, PeerIdInvalid, PhotoSaveFileInvalid, WebpageCurlFailed
from pyrogram.handlers import MessageHandler
from pyrogram.types import InputMediaPhoto, InputMediaVideo, Message
from wget import download
if os.path.isfile("config.env"):
load_dotenv("config.env")
bot = Client(
name="bot",
session_string=os.environ.get("STRING_SESSION"),
api_id=os.environ.get("API_ID"),
api_hash=os.environ.get("API_HASH"),
in_memory=True,
parse_mode=ParseMode.DEFAULT,
)
LOG_CHAT = os.environ.get("LOG_CHANNEL")
if LOG_CHAT is None:
print("Enter log channel id in config")
exit()
USERS = json.loads(os.environ.get("USERS"))
TRIGGER = os.environ.get("TRIGGER")
E_JSON = base64.b64decode("Lz9fX2E9MSZfX2Q9MQ==").decode("utf-8")
# BOT Section
@bot.on_edited_message(filters.command(commands="dl", prefixes=TRIGGER) & filters.user(USERS))
@bot.on_message(filters.command(commands="dl", prefixes=TRIGGER) & filters.user(USERS))
async def dl(bot, message: Message):
parsed = MESSAGE_PARSER(message)
if not parsed.coro_:
return
status = "failed"
msg_ = await bot.send_message(chat_id=message.chat.id, text="`trying to download...`")
for coroutine_ in parsed.coro_:
media_dict = await coroutine_
if isinstance(media_dict, dict) and "media" in media_dict:
status = await send_media(message=message, data=media_dict, doc=parsed.doc, caption=parsed.caption)
if status == "failed":
return await msg_.edit(f"Media Download Failed.")
await message.delete()
await msg_.delete()
# Parse Message text and return coroutine for matched links
class MESSAGE_PARSER:
def __init__(self, message):
self.text_list = message.text.split()
self.flags = [i for i in self.text_list if i.startswith("-")]
self.sender = message.author_signature or (user.first_name if (user := message.from_user) else "")
self.caption = f"Shared by : {self.sender}"
self.doc = "-d" in self.flags
self.coro_ = []
self.match_links()
# Thanks Jeel Patel [TG @jeelpatel231] for url map concept.
def match_links(self):
url_map = {
"tiktok.com": yt_dl,
"www.instagram.com": instagram_dl,
"youtube.com/shorts": yt_dl,
"twitter.com": yt_dl,
"www.reddit.com": reddit_dl,
}
for link in self.text_list:
if (match := url_map.get(url_p(link).netloc)):
self.coro_.append(match(url=link,doc=self.doc, caption=self.caption))
else:
for key, val in url_map.items():
if key in link:
self.coro_.append(val(url=link,doc=self.doc, caption=self.caption))
# Send media back
async def send_media(message: Message, data: dict, caption: str, doc: bool = False):
reply = message.reply_to_message
reply_id = reply.id if reply else None
media = data.get("media")
thumb = data.get("thumb", None)
caption = data.get("caption", "")
is_image, is_video, is_animation, is_grouped = (data.get("is_image"), data.get("is_video"), data.get("is_animation"), data.get("is_grouped"))
status = "failed"
args_ = {"chat_id": message.chat.id, "reply_to_message_id": reply_id}
if isinstance(media, list):
for vv in media:
try:
if isinstance(vv, list):
status = await bot.send_media_group(**args_, media=vv)
await asyncio.sleep(2)
elif doc:
status = await bot.send_document(**args_, caption=caption, document=vv, force_document=True)
else:
status = await bot.send_animation(**args_, caption=caption, animation=vv, unsave=True)
except Exception:
await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc()))
else:
args_.update({"caption": caption})
try:
if is_image:
status = await bot.send_photo(**args_, photo=media)
elif is_video:
status = await bot.send_video(**args_, video=media, thumb=thumb)
elif is_animation:
status = await bot.send_animation(**args_, animation=media, unsave=True)
else:
status = await bot.send_document(**args_, document=media, force_document=True)
except PhotoSaveFileInvalid:
await bot.send_document(**args_, document=media, force_document=True)
except (MediaEmpty, WebpageCurlFailed, ValueError):
pass
if os.path.exists(str(data["path"])):
shutil.rmtree(str(data["path"]))
if status != "failed":
return "done"
return status
@bot.on_edited_message(filters.command(commands="bot", prefixes=TRIGGER) & filters.user(USERS))
@bot.on_message(filters.command(commands="bot", prefixes=TRIGGER) & filters.user(USERS))
async def multi_func(bot, message: Message):
rw_message = message.text.split()
try:
# Restart
if "restart" in rw_message:
await SESSION.close()
await RETRY_CLIENT.close()
os.execl(sys.executable, sys.executable, __file__)
# Get chat / channel id
elif "ids" in rw_message:
if (reply := message.reply_to_message):
ids = ""
reply_forward = reply.forward_from_chat
reply_user = reply.from_user
ids += f"Chat : `{reply.chat.id}`\n"
if reply_forward:
ids += f"Replied {'Channel' if reply_forward.type == ChatType.CHANNEL else 'Chat'} : `{reply_forward.id}`\n"
if reply_user:
ids += f"User : {reply.from_user.id}"
else:
ids = f"Chat :`{message.chat.id}`"
await message.reply(ids)
# Update Auto-DL chats
elif "update" in rw_message:
bot.remove_handler(*HANDLER_)
await add_h()
await message.reply("Chat list refreshed")
# Join a chat
elif "join" in rw_message:
if len(rw_message) > 2:
try:
await bot.join_chat(rw_message[-1])
except KeyError:
await bot.join_chat(os.path.basename(rw_message[-1]).strip())
except Exception as e:
return await message.reply(str(e))
await message.reply("Joined")
# Leave a chat
elif "leave" in rw_message:
if len(rw_message) == 3:
chat = rw_message[-1]
else:
chat = message.chat.id
await bot.leave_chat(chat)
else:
await message.reply("Social-DL is running")
except Exception:
await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc()))
# Delete replied and command message
@bot.on_message(filters.command(commands="del", prefixes=TRIGGER) & filters.user(USERS))
async def delete_message(bot, message: Message):
reply = message.reply_to_message
await message.delete()
if reply:
await reply.delete()
# Delete Multiple messages from replied to command.
@bot.on_message(filters.command(commands="purge", prefixes=TRIGGER) & filters.user(USERS))
async def purge_(bot, message: Message):
reply = message.reply_to_message
if not reply:
return await message.reply("reply to a message")
start_message = reply.id
end_message = message.id
messages = [end_message] + [i for i in range(int(start_message), int(end_message))]
await bot.delete_messages(chat_id=message.chat.id, message_ids=messages, revoke=True)
if os.environ.get("DEV_MODE") == "yes":
# Run shell commands
@bot.on_edited_message(filters.command(commands="sh", prefixes=TRIGGER) & filters.user(USERS))
@bot.on_message(filters.command(commands="sh", prefixes=TRIGGER) & filters.user(USERS))
async def run_cmd(bot, message: Message):
cmd = message.text.replace(f"{TRIGGER}sh ", "").strip()
status_ = await message.reply("executing...")
proc = await run_shell_cmd(cmd)
output = f"${cmd}"
if (stdout := proc.get("stdout")):
output += f"""\n\n**Output:**\n\n`{stdout}`"""
if (stderr := proc.get("stderr")):
output += f"""\n\n**Error:**\n\n`{stderr}`"""
await status_.edit(output,parse_mode=ParseMode.MARKDOWN)
# Run Python code
@bot.on_edited_message(
filters.command(commands="exec", prefixes=TRIGGER) & filters.user(USERS)
)
@bot.on_message(
filters.command(commands="exec", prefixes=TRIGGER) & filters.user(USERS)
)
async def executor_(bot, message):
code = message.text.replace(f"{TRIGGER}exec","").strip()
if not code:
return await message.reply("exec Jo mama?")
reply = await message.reply("executing")
sys.stdout = codeOut = StringIO()
sys.stderr = codeErr = StringIO()
# Indent code as per proper python syntax
formatted_code = "".join(["\n "+i for i in code.split("\n")])
try:
# Create and initialise the function
exec(f"async def exec_(bot, message):{formatted_code}")
func_out = await locals().get("exec_")(bot, message)
except Exception:
func_out = str(traceback.format_exc())
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
output = codeOut.getvalue().strip() or codeErr.getvalue().strip() or func_out or ""
await reply.edit(f"> `{code}`\n\n>> `{output}`",parse_mode=ParseMode.MARKDOWN)
# Add Auto-DL regex Handler
async def add_h():
message_id = os.environ.get("MESSAGE")
if message_id is None:
print("\nEnter Message id in config.\n")
return 1
try:
msg = (await bot.get_messages(int(LOG_CHAT), int(message_id))).text
except PeerIdInvalid:
print("\nLog channel not found.\nCheck the variable for mistakes")
return 1
if msg is None:
print("\nMessage not found\nCheck variable for mistakes\n")
return 1
try:
chats_list = [int(i) for i in msg.split()]
except ValueError:
print("\nThe message id is wrong. \nOr \nChat id message contains letters\nonly numerical ids are allowed.\n")
return 1
social_handler = bot.add_handler(
MessageHandler(
dl,
(
(filters.regex(r"^http*"))
& filters.chat(chats_list)
),
),
group=1,
)
globals().update({"HANDLER_":social_handler})
# Start the bot and wait idle without blocking the main loop
async def boot():
check_handlers = await add_h()
msg = "#Social-dl\nStarted\n"
if check_handlers == 1:
msg += "\n* Running in command only mode. *"
print(msg)
await bot.send_message(chat_id=int(LOG_CHAT), text="#Social-dl\n__Started__")
globals().update({"SESSION":aiohttp.ClientSession()})
globals().update({"RETRY_CLIENT":RetryClient(client_session=SESSION, retry_for_statuses={408, 504}, retry_options=ExponentialRetry(attempts=1))})
await idle()
# API Section
# Instagram
async def instagram_dl(url: str, caption: str, doc: bool = False):
args = locals()
# status = await instafix(message=message, link=i, caption=caption)
for i in [yt_dl, api_2]:
data = await i(**args)
if isinstance(data, dict):
break
return data
async def api_2(url: str, caption: str, doc: bool):
link = url.split("/?")[0] + E_JSON
response = await get_json(url=link)
if not response or "graphql" not in response:
return "failed"
return await parse_ghraphql(
response["graphql"]["shortcode_media"], caption=caption + "\n.."
)
async def parse_ghraphql(json_: dict, caption: str, doc: bool = False):
try:
path = f"downloads/{time.time()}"
os.makedirs(path)
ret_dict = {"path": path, "thumb": None, "caption": caption}
type_check = json_.get("__typename",None)
if not type_check:
return "failed"
elif type_check == "GraphSidecar":
media = []
for i in json_["edge_sidecar_to_children"]["edges"]:
if i["node"]["__typename"] == "GraphImage":
media.append(i["node"]["display_url"])
if i["node"]["__typename"] == "GraphVideo":
media.append(i["node"]["video_url"])
ret_dict.update({"is_grouped": False if doc else True, "media": await async_download(urls=media, path=path, doc=doc, caption=caption)})
else:
media = json_.get("video_url") or json_.get("display_url")
ret_dict.update(**await get_media(url=media, path=path))
except Exception:
await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc()))
return ret_dict
# YT-DLP for videos from multiple sites
async def yt_dl(url: str, caption: str, doc:bool=False):
if "instagram.com/p/" in url:
return
path = str(time.time())
video = f"{path}/v.mp4"
_opts = {
"outtmpl": video,
"ignoreerrors": True,
"ignore_no_formats_error": True,
"quiet": True,
"logger": FakeLogger(),
}
if "shorts" in url:
_opts.update({"format": "bv[ext=mp4][res=480]+ba[ext=m4a]/b[ext=mp4]"})
else:
_opts.update({"format": "bv[ext=mp4]+ba[ext=m4a]/b[ext=mp4]"})
data = "failed"
try:
yt_dlp.YoutubeDL(_opts).download(url)
if os.path.isfile(video):
data = {
"path": path,
"is_video": True,
"media": video,
"thumb": await take_ss(video=video, path=path),
"caption": caption,
}
except BaseException:
pass
return data
# To disable YT-DLP logging
class FakeLogger(object):
def debug(self, msg):
pass
def warning(self, msg):
pass
def error(self, msg):
pass
# Reddit
async def reddit_dl(url: str, caption: str, doc: bool = False):
link = url.split("/?")[0] + ".json?limit=1"
headers = {
"user-agent": "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_8_7 rv:5.0; en-US) AppleWebKit/533.31.5 (KHTML, like Gecko) Version/4.0 Safari/533.31.5"
}
try:
response = await get_json(url=link, headers=headers, json_=True)
if not response:
return "failed"
json_ = response[0]["data"]["children"][0]["data"]
caption = f'__{json_["subreddit_name_prefixed"]}:__\n**{json_["title"]}**\n\n' + caption
path = str(time.time())
os.mkdir(path)
is_vid, is_gallery = json_.get("is_video"), json_.get("is_gallery")
data = {"path": path, "caption": caption}
if is_vid:
video = f"{path}/v.mp4"
vid_url = json_["secure_media"]["reddit_video"]["hls_url"]
await run_shell_cmd(f'ffmpeg -hide_banner -loglevel error -i "{vid_url.strip()}" -c copy {video}')
data.update({"is_video": True, "media": video, "thumb": await take_ss(video=video, path=path)})
elif is_gallery:
grouped_media_urls = [json_["media_metadata"][val]["s"]["u"].replace("preview", "i") for val in json_["media_metadata"]]
downloads = await async_download(urls=grouped_media_urls, path=path, doc=doc, caption=caption)
data.update({"is_grouped": True, "media": downloads})
else:
url_ = json_.get("preview", {}).get("reddit_video_preview", {}).get("fallback_url", "") or json_.get("url_overridden_by_dest", "").strip()
if not url_:
return "failed"
data.update(await get_media(url=url_, path=path))
except Exception:
await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc()))
return data
# Get Json response from APIs
async def get_json(url: str, headers: dict = None, params: dict = None, retry: bool = False, json_: bool = False, timeout: int = 10):
if retry:
client = RETRY_CLIENT
else:
client = SESSION
try:
async with client.get(url=url, headers=headers, params=params, timeout=timeout) as ses:
if json_:
ret_json = await ses.json()
else:
ret_json = json.loads(await ses.text())
except (json.decoder.JSONDecodeError, aiohttp.ContentTypeError, asyncio.TimeoutError):
return
except Exception:
await bot.send_message(chat_id=LOG_CHAT, text=str(traceback.format_exc()))
return
return ret_json
# Download media and return it with media type
async def get_media(url: str, path: str):
down_load = download(url, path)
ret_dict = {"media": down_load}
if down_load.lower().endswith((".jpg", ".jpeg", ".png", ".webp")):
ret_dict["is_image"] = True
if down_load.lower().endswith(".webp"):
os.rename(down_load, down_load + ".jpg")
ret_dict.update({"media": down_load + ".jpg"})
elif down_load.lower().endswith((".mkv", ".mp4", ".webm")):
ret_dict.update({"is_video": True, "thumb": await take_ss(video=down_load, path=path)})
elif down_load.lower().endswith(".gif"):
ret_dict.update({"is_animation": True})
else:
return {}
return ret_dict
# Download multiple media asynchronously to save time;
# Return it in a list or a list with smaller lists each containing upto 5 media.
async def async_download(urls: list, path: str, doc: bool = False, caption: str = ""):
down_loads = await asyncio.gather(*[asyncio.to_thread(download, url, path) for url in urls])
if doc:
return down_loads
[os.rename(file, file + ".png") for file in glob.glob(f"{path}/*.webp")]
files = [i + ".png" if i.endswith(".webp") else i for i in down_loads]
grouped_images, grouped_videos, animations = [], [], []
for file in files:
if file.endswith((".png", ".jpg", ".jpeg")):
grouped_images.append(InputMediaPhoto(file, caption=caption))
if file.endswith((".mp4", ".mkv", ".webm")):
has_audio = await check_audio(file)
if not has_audio:
animations.append(file)
else:
grouped_videos.append(InputMediaVideo(file, caption=caption))
return_list = [
grouped_images[imgs : imgs + 5] for imgs in range(0, len(grouped_images), 5)
] + [grouped_videos[vids : vids + 5] for vids in range(0, len(grouped_videos), 5)
] + animations
return return_list
# Thumbnail
async def take_ss(video: str, path: str):
await run_shell_cmd(f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{path}/i.png"''')
if os.path.isfile(path + "/i.png"):
return path + "/i.png"
async def check_audio(file):
result = await run_shell_cmd(f"ffprobe -v error -show_entries format=nb_streams -of default=noprint_wrappers=1:nokey=1 {file}")
return int(result.get("stdout", 0)) - 1
async def run_shell_cmd(cmd):
proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
return {"stdout": stdout.decode("utf-8"), "stderr": stderr.decode("utf-8")}
# Start only bot when file is called directly.
if __name__ == "__main__":
bot.start()
bot.run(boot())