diff --git a/app/__init__.py b/app/__init__.py
index bf8e43e..a58e7fc 100755
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -1,14 +1,15 @@
-from dotenv import load_dotenv
+import os
+from dotenv import load_dotenv
load_dotenv("config.env")
from .config import Config
from .core.client import BOT
-import os
if not os.environ.get("TERMUX_APK_RELEASE"):
import uvloop
+
uvloop.install()
bot = BOT()
diff --git a/app/__main__.py b/app/__main__.py
index 4cb31b6..97df1fd 100755
--- a/app/__main__.py
+++ b/app/__main__.py
@@ -1,6 +1,8 @@
if __name__ == "__main__":
import tracemalloc
+
tracemalloc.start()
from app import bot
+
bot.run(bot.boot())
diff --git a/app/api/gallerydl.py b/app/api/gallerydl.py
index 4bbb185..5a0e7a2 100644
--- a/app/api/gallerydl.py
+++ b/app/api/gallerydl.py
@@ -11,14 +11,15 @@ class Gallery_DL(ScraperConfig):
def __init__(self, url):
super().__init__()
self.url = url
- self.set_sauce(url)
async def download_or_extract(self):
self.path = "downloads/" + str(time.time())
os.makedirs(self.path)
try:
async with asyncio.timeout(30):
- await shell.run_shell_cmd(f"gallery-dl -q --range '0-4' -D {self.path} '{self.url}'")
+ await shell.run_shell_cmd(
+ f"gallery-dl -q --range '0-4' -D {self.path} '{self.url}'"
+ )
except TimeoutError:
pass
files = glob.glob(f"{self.path}/*")
diff --git a/app/api/reddit.py b/app/api/reddit.py
index efcee49..0eef990 100755
--- a/app/api/reddit.py
+++ b/app/api/reddit.py
@@ -9,7 +9,6 @@ from app.core.scraper_config import ScraperConfig
class Reddit(ScraperConfig):
def __init__(self, url):
super().__init__()
- self.set_sauce(url)
parsed_url = urlparse(url)
self.url = f"https://www.reddit.com{parsed_url.path}.json?limit=1"
@@ -17,7 +16,9 @@ class Reddit(ScraperConfig):
headers = {
"user-agent": "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_8_7 rv:5.0; en-US) AppleWebKit/533.31.5 (KHTML, like Gecko) Version/4.0 Safari/533.31.5"
}
- response = await aiohttp_tools.get_json(url=self.url, headers=headers, json_=True)
+ response = await aiohttp_tools.get_json(
+ url=self.url, headers=headers, json_=True
+ )
if not response:
return
@@ -26,7 +27,9 @@ class Reddit(ScraperConfig):
except BaseException:
return
- self.caption = f"""__{json_["subreddit_name_prefixed"]}:__\n**{json_["title"]}**"""
+ self.caption = (
+ f"""__{json_["subreddit_name_prefixed"]}:__\n**{json_["title"]}**"""
+ )
is_vid, is_gallery = json_.get("is_video"), json_.get("is_gallery")
@@ -35,16 +38,26 @@ class Reddit(ScraperConfig):
os.makedirs(self.path)
self.link = f"{self.path}/v.mp4"
vid_url = json_["secure_media"]["reddit_video"]["hls_url"]
- await shell.run_shell_cmd(f'ffmpeg -hide_banner -loglevel error -i "{vid_url.strip()}" -c copy {self.link}')
+ await shell.run_shell_cmd(
+ f'ffmpeg -hide_banner -loglevel error -i "{vid_url.strip()}" -c copy {self.link}'
+ )
self.thumb = await shell.take_ss(video=self.link, path=self.path)
self.video = self.success = True
elif is_gallery:
- self.link = [val["s"].get("u", val["s"].get("gif")).replace("preview", "i") for val in json_["media_metadata"].values()]
+ self.link = [
+ val["s"].get("u", val["s"].get("gif")).replace("preview", "i")
+ for val in json_["media_metadata"].values()
+ ]
self.group = self.success = True
else:
- self.link = json_.get("preview", {}).get("reddit_video_preview", {}).get("fallback_url", json_.get("url_overridden_by_dest", "")).strip()
+ self.link = (
+ json_.get("preview", {})
+ .get("reddit_video_preview", {})
+ .get("fallback_url", json_.get("url_overridden_by_dest", ""))
+ .strip()
+ )
if not self.link:
return
if self.link.endswith(".gif"):
diff --git a/app/api/threads.py b/app/api/threads.py
index a36431d..06eecb0 100644
--- a/app/api/threads.py
+++ b/app/api/threads.py
@@ -11,12 +11,15 @@ class Threads(ScraperConfig):
def __init__(self, url):
super().__init__()
self.url = url
- self.set_sauce(url)
async def download_or_extract(self):
shortcode = os.path.basename(urlparse(self.url).path.rstrip("/"))
- response = await (await aiohttp_tools.SESSION.get(f"https://www.threads.net/t/{shortcode}/embed/")).text()
+ response = await (
+ await aiohttp_tools.SESSION.get(
+ f"https://www.threads.net/t/{shortcode}/embed/"
+ )
+ ).text()
soup = BeautifulSoup(response, "html.parser")
diff --git a/app/api/tiktok.py b/app/api/tiktok.py
index 8a18067..5cd3bba 100755
--- a/app/api/tiktok.py
+++ b/app/api/tiktok.py
@@ -8,7 +8,6 @@ class Tiktok(ScraperConfig):
def __init__(self, url):
super().__init__()
self.url = url
- self.set_sauce(url)
async def download_or_extract(self):
media = await tiktok_scraper.hybrid_parsing(self.url)
diff --git a/app/api/tiktok_scraper.py b/app/api/tiktok_scraper.py
index f0f184f..66dd82a 100644
--- a/app/api/tiktok_scraper.py
+++ b/app/api/tiktok_scraper.py
@@ -1,682 +1,682 @@
-#!/usr/bin/env python
-# -*- encoding: utf-8 -*-
-# @Author: https://github.com/Evil0ctal/
-# @Time: 2021/11/06
-# @Update: 2023/03/08
-# @Version: 3.3.0
-# @Function:
-# 核心代码,估值1块(๑•̀ㅂ•́)و✧
-# 用于爬取Douyin/TikTok数据并以字典形式返回。
-# input link, output dictionary.
-
-
-import asyncio
-import configparser
-import os
-import platform
-import re
-import time
-import traceback
-import urllib.parse
-from typing import Union
-
-import aiohttp
-import execjs
-from tenacity import *
-
-quiet_mode = False
-
-
-class Scraper:
- """__________________________________________⬇️initialization(初始化)⬇️______________________________________"""
-
- # 初始化/initialization
- def __init__(self, quiet: bool = False):
- global quiet_mode
- quiet_mode = quiet
- self.headers = {
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36"
- }
- self.douyin_api_headers = {
- "accept-encoding": "gzip, deflate, br",
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
- "referer": "https://www.douyin.com/",
- # 'cookie': "s_v_web_id=verify_leytkxgn_kvO5kOmO_SdMs_4t1o_B5ml_BUqtWM1mP6BF;"
- }
- self.tiktok_api_headers = {
- "User-Agent": "com.ss.android.ugc.trill/494+Mozilla/5.0+(Linux;+Android+12;+2112123G+Build/SKQ1.211006.001;+wv)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Version/4.0+Chrome/107.0.5304.105+Mobile+Safari/537.36"
- }
- # 判断配置文件是否存在/Check if the configuration file exists
- if os.path.exists("config.ini"):
- self.config = configparser.ConfigParser()
- self.config.read("config.ini", encoding="utf-8")
- # 判断是否使用代理
- if self.config["Scraper"]["Proxy_switch"] == "True":
- # 判断是否区别协议选择代理
- if self.config["Scraper"]["Use_different_protocols"] == "False":
- self.proxies = {"all": self.config["Scraper"]["All"]}
- else:
- self.proxies = {"http": self.config["Scraper"]["Http_proxy"], "https": self.config["Scraper"]["Https_proxy"]}
- else:
- self.proxies = None
- # 配置文件不存在则不使用代理/If the configuration file does not exist, do not use
- # the proxy
- else:
- self.proxies = None
- # 针对Windows系统的异步事件规则/Asynchronous event rules for Windows systems
- if platform.system() == "Windows":
- asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
-
- """__________________________________________⬇️utils(实用程序)⬇️______________________________________"""
-
- # 检索字符串中的链接/Retrieve links from string
- @staticmethod
- def get_url(text: str) -> Union[str, None]:
- try:
- # 从输入文字中提取索引链接存入列表/Extract index links from input text and store in
- # list
- url = re.findall("http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+", text)
- # 判断是否有链接/Check if there is a link
- if len(url) > 0:
- return url[0]
- except Exception as e:
- if not quiet_mode:
- print("Error in get_url:", e)
- return None
-
- # 生成X-Bogus签名/Generate X-Bogus signature
- @staticmethod
- def generate_x_bogus_url(url: str, headers: dict) -> str:
- query = urllib.parse.urlparse(url).query
- xbogus = execjs.compile(open("./X-Bogus.js").read()).call("sign", query, headers["User-Agent"])
- new_url = url + "&X-Bogus=" + xbogus
- return new_url
-
- # 转换链接/convert url
- @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
- async def convert_share_urls(self, url: str) -> Union[str, None]:
- """
- 用于将分享链接(短链接)转换为原始链接/Convert share links (short links) to original links
- :return: 原始链接/Original link
- """
- # 检索字符串中的链接/Retrieve links from string
- url = self.get_url(url)
- # 判断是否有链接/Check if there is a link
- if url is None:
- if not quiet_mode:
- print("无法检索到链接/Unable to retrieve link")
- return None
- # 判断是否为抖音分享链接/judge if it is a douyin share link
- if "douyin" in url:
- """
- 抖音视频链接类型(不全):
- 1. https://v.douyin.com/MuKhKn3/
- 2. https://www.douyin.com/video/7157519152863890719
- 3. https://www.iesdouyin.com/share/video/7157519152863890719/?region=CN&mid=7157519152863890719&u_code=ffe6jgjg&titleType=title×tamp=1600000000&utm_source=copy_link&utm_campaign=client_share&utm_medium=android&app=aweme&iid=123456789&share_id=123456789
- 抖音用户链接类型(不全):
- 1. https://www.douyin.com/user/MS4wLjABAAAAbLMPpOhVk441et7z7ECGcmGrK42KtoWOuR0_7pLZCcyFheA9__asY-kGfNAtYqXR?relation=0&vid=7157519152863890719
- 2. https://v.douyin.com/MuKoFP4/
- 抖音直播链接类型(不全):
- 1. https://live.douyin.com/88815422890
- """
- if "v.douyin" in url:
- # 转换链接/convert url
- # 例子/Example: https://v.douyin.com/rLyAJgf/8.74
- url = re.compile(r"(https://v.douyin.com/)\w+", re.I).match(url).group()
- if not quiet_mode:
- print("正在通过抖音分享链接获取原始链接...")
- try:
- async with aiohttp.ClientSession() as session:
- async with session.get(url, headers=self.headers, proxy=self.proxies, allow_redirects=False, timeout=10) as response:
- if response.status == 302:
- url = (
- response.headers["Location"].split("?")[0]
- if "?" in response.headers["Location"]
- else response.headers["Location"]
- )
- if not quiet_mode:
- print("获取原始链接成功, 原始链接为: {}".format(url))
- return url
- except Exception as e:
- if not quiet_mode:
- print("获取原始链接失败!")
- print(e)
- # return None
- raise e
- else:
- if not quiet_mode:
- print("该链接为原始链接,无需转换,原始链接为: {}".format(url))
- return url
- # 判断是否为TikTok分享链接/judge if it is a TikTok share link
- elif "tiktok" in url:
- """
- TikTok视频链接类型(不全):
- 1. https://www.tiktok.com/@tiktok/video/6950000000000000000
- 2. https://www.tiktok.com/t/ZTRHcXS2C/
- TikTok用户链接类型(不全):
- 1. https://www.tiktok.com/@tiktok
- """
- if "@" in url:
- if not quiet_mode:
- print("该链接为原始链接,无需转换,原始链接为: {}".format(url))
- return url
- else:
- if not quiet_mode:
- print("正在通过TikTok分享链接获取原始链接...")
- try:
- async with aiohttp.ClientSession() as session:
- async with session.get(url, headers=self.headers, proxy=self.proxies, allow_redirects=False, timeout=10) as response:
- if response.status == 301:
- url = (
- response.headers["Location"].split("?")[0]
- if "?" in response.headers["Location"]
- else response.headers["Location"]
- )
- if not quiet_mode:
- print("获取原始链接成功, 原始链接为: {}".format(url))
- return url
- except Exception as e:
- if not quiet_mode:
- print("获取原始链接失败!")
- print(e)
- return None
-
- """__________________________________________⬇️Douyin methods(抖音方法)⬇️______________________________________"""
-
- """
- Credits: https://github.com/Johnserf-Seed
- [中文]
- 感谢John为本项目提供了非常多的帮助
- 大家可以去他的仓库点个star :)
- 顺便打个广告, 如果需要更稳定、快速、长期维护的抖音/TikTok API, 或者需要更多的数据(APP端),
- 请移步: https://api.tikhub.io
-
- [English]
- Thanks to John for providing a lot of help to this project
- You can go to his repository and give him a star :)
- By the way, if you need a more stable, fast and long-term maintenance Douyin/TikTok API, or need more data (APP side),
- Please go to: https://api.tikhub.io
- """
-
- # 生成抖音X-Bogus签名/Generate Douyin X-Bogus signature
- # 下面的代码不能保证稳定性,随时可能失效/ The code below cannot guarantee stability and may
- # fail at any time
- def generate_x_bogus_url(self, url: str) -> str:
- """
- 生成抖音X-Bogus签名
- :param url: 视频链接
- :return: 包含X-Bogus签名的URL
- """
- # 调用JavaScript函数
- query = urllib.parse.urlparse(url).query
- xbogus = execjs.compile(open("./X-Bogus.js").read()).call("sign", query, self.headers["User-Agent"])
- if not quiet_mode:
- print("生成的X-Bogus签名为: {}".format(xbogus))
- new_url = url + "&X-Bogus=" + xbogus
- return new_url
-
- # 获取抖音视频ID/Get Douyin video ID
- async def get_douyin_video_id(self, original_url: str) -> Union[str, None]:
- """
- 获取视频id
- :param original_url: 视频链接
- :return: 视频id
- """
- # 正则匹配出视频ID
- try:
- video_url = await self.convert_share_urls(original_url)
- # 链接类型:
- # 视频页 https://www.douyin.com/video/7086770907674348841
- if "/video/" in video_url:
- key = re.findall("/video/(\d+)?", video_url)[0]
- if not quiet_mode:
- print("获取到的抖音视频ID为: {}".format(key))
- return key
- # 发现页 https://www.douyin.com/discover?modal_id=7086770907674348841
- elif "discover?" in video_url:
- key = re.findall("modal_id=(\d+)", video_url)[0]
- if not quiet_mode:
- print("获取到的抖音视频ID为: {}".format(key))
- return key
- # 直播页
- elif "live.douyin" in video_url:
- # https://live.douyin.com/1000000000000000000
- video_url = video_url.split("?")[0] if "?" in video_url else video_url
- key = video_url.replace("https://live.douyin.com/", "")
- if not quiet_mode:
- print("获取到的抖音直播ID为: {}".format(key))
- return key
- # note
- elif "note" in video_url:
- # https://www.douyin.com/note/7086770907674348841
- key = re.findall("/note/(\d+)?", video_url)[0]
- if not quiet_mode:
- print("获取到的抖音笔记ID为: {}".format(key))
- return key
- except Exception as e:
- if not quiet_mode:
- print("获取抖音视频ID出错了:{}".format(e))
- return None
-
- # 获取单个抖音视频数据/Get single Douyin video data
- @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
- async def get_douyin_video_data(self, video_id: str) -> Union[dict, None]:
- """
- :param video_id: str - 抖音视频id
- :return:dict - 包含信息的字典
- """
- if not quiet_mode:
- print("正在获取抖音视频数据...")
- try:
- # 构造访问链接/Construct the access link
- api_url = f"https://www.douyin.com/aweme/v1/web/aweme/detail/?device_platform=webapp&aid=6383&channel=channel_pc_web&aweme_id={video_id}&pc_client_type=1&version_code=190500&version_name=19.5.0&cookie_enabled=true&screen_width=1344&screen_height=756&browser_language=zh-CN&browser_platform=Win32&browser_name=Firefox&browser_version=110.0&browser_online=true&engine_name=Gecko&engine_version=109.0&os_name=Windows&os_version=10&cpu_core_num=16&device_memory=&platform=PC&webid=7158288523463362079&msToken=abL8SeUTPa9-EToD8qfC7toScSADxpg6yLh2dbNcpWHzE0bT04txM_4UwquIcRvkRb9IU8sifwgM1Kwf1Lsld81o9Irt2_yNyUbbQPSUO8EfVlZJ_78FckDFnwVBVUVK"
- api_url = self.generate_x_bogus_url(api_url)
- # 访问API/Access API
- if not quiet_mode:
- print("正在获取视频数据API: {}".format(api_url))
- async with aiohttp.ClientSession() as session:
- async with session.get(api_url, headers=self.douyin_api_headers, proxy=self.proxies, timeout=10) as response:
- response = await response.json()
- # 获取视频数据/Get video data
- video_data = response["aweme_detail"]
- if not quiet_mode:
- print("获取视频数据成功!")
- # print("抖音API返回数据: {}".format(video_data))
- return video_data
- except Exception as e:
- if not quiet_mode:
- print("获取抖音视频数据失败!原因:{}".format(e))
- # return None
- raise e
-
- # 获取单个抖音直播视频数据/Get single Douyin Live video data
- # 暂时不可用,待修复。
- @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
- async def get_douyin_live_video_data(self, web_rid: str) -> Union[dict, None]:
- if not quiet_mode:
- print("正在获取抖音视频数据...")
- try:
- # 构造访问链接/Construct the access link
- api_url = f"https://live.douyin.com/webcast/web/enter/?aid=6383&web_rid={web_rid}"
- # 访问API/Access API
- if not quiet_mode:
- print("正在获取视频数据API: {}".format(api_url))
- async with aiohttp.ClientSession() as session:
- async with session.get(api_url, headers=self.douyin_api_headers, proxy=self.proxies, timeout=10) as response:
- response = await response.json()
- # 获取视频数据/Get video data
- video_data = response["data"]
- if not quiet_mode:
- print(video_data)
- print("获取视频数据成功!")
- # print("抖音API返回数据: {}".format(video_data))
- return video_data
- except Exception as e:
- if not quiet_mode:
- print("获取抖音视频数据失败!原因:{}".format(e))
- # return None
- raise e
-
- # 获取单个抖音视频数据/Get single Douyin video data
- @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
- async def get_douyin_user_profile_videos(self, profile_url: str, tikhub_token: str) -> Union[dict, None]:
- try:
- api_url = f"https://api.tikhub.io/douyin_profile_videos/?douyin_profile_url={profile_url}&cursor=0&count=20"
- _headers = {"Authorization": f"Bearer {tikhub_token}"}
- async with aiohttp.ClientSession() as session:
- async with session.get(api_url, headers=_headers, proxy=self.proxies, timeout=10) as response:
- response = await response.json()
- return response
- except Exception as e:
- if not quiet_mode:
- print("获取抖音视频数据失败!原因:{}".format(e))
- # return None
- raise e
-
- # 获取抖音主页点赞视频数据/Get Douyin profile like video data
- @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
- async def get_douyin_profile_liked_data(self, profile_url: str, tikhub_token: str) -> Union[dict, None]:
- try:
- api_url = f"https://api.tikhub.io/douyin_profile_liked_videos/?douyin_profile_url={profile_url}&cursor=0&count=20"
- _headers = {"Authorization": f"Bearer {tikhub_token}"}
- async with aiohttp.ClientSession() as session:
- async with session.get(api_url, headers=_headers, proxy=self.proxies, timeout=10) as response:
- response = await response.json()
- return response
- except Exception as e:
- if not quiet_mode:
- print("获取抖音视频数据失败!原因:{}".format(e))
- # return None
- raise e
-
- # 获取抖音视频评论数据/Get Douyin video comment data
- @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
- async def get_douyin_video_comments(self, video_url: str, tikhub_token: str) -> Union[dict, None]:
- try:
- api_url = f"https://api.tikhub.io/douyin_video_comments/?douyin_video_url={video_url}&cursor=0&count=20"
- _headers = {"Authorization": f"Bearer {tikhub_token}"}
- async with aiohttp.ClientSession() as session:
- async with session.get(api_url, headers=_headers, proxy=self.proxies, timeout=10) as response:
- response = await response.json()
- return response
- except Exception as e:
- if not quiet_mode:
- print("获取抖音视频数据失败!原因:{}".format(e))
- # return None
- raise e
-
- """__________________________________________⬇️TikTok methods(TikTok方法)⬇️______________________________________"""
-
- # 获取TikTok视频ID/Get TikTok video ID
- async def get_tiktok_video_id(self, original_url: str) -> Union[str, None]:
- """
- 获取视频id
- :param original_url: 视频链接
- :return: 视频id
- """
- try:
- # 转换链接/Convert link
- original_url = await self.convert_share_urls(original_url)
- # 获取视频ID/Get video ID
- if "/video/" in original_url:
- video_id = re.findall("/video/(\d+)", original_url)[0]
- elif "/v/" in original_url:
- video_id = re.findall("/v/(\d+)", original_url)[0]
- if not quiet_mode:
- print("获取到的TikTok视频ID是{}".format(video_id))
- # 返回视频ID/Return video ID
- return video_id
- except Exception as e:
- if not quiet_mode:
- print("获取TikTok视频ID出错了:{}".format(e))
- return None
-
- @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
- async def get_tiktok_video_data(self, video_id: str) -> Union[dict, None]:
- """
- 获取单个视频信息
- :param video_id: 视频id
- :return: 视频信息
- """
- if not quiet_mode:
- print("正在获取TikTok视频数据...")
- try:
- # 构造访问链接/Construct the access link
- api_url = f"https://api16-normal-c-useast1a.tiktokv.com/aweme/v1/feed/?aweme_id={video_id}"
- if not quiet_mode:
- print("正在获取视频数据API: {}".format(api_url))
- async with aiohttp.ClientSession() as session:
- async with session.get(api_url, headers=self.tiktok_api_headers, proxy=self.proxies, timeout=10) as response:
- response = await response.json()
- video_data = response["aweme_list"][0]
- if not quiet_mode:
- print("获取视频信息成功!")
- return video_data
- except Exception as e:
- if not quiet_mode:
- print("获取视频信息失败!原因:{}".format(e))
- # return None
- raise e
-
- @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
- async def get_tiktok_user_profile_videos(self, tiktok_video_url: str, tikhub_token: str) -> Union[dict, None]:
- try:
- api_url = f"https://api.tikhub.io/tiktok_profile_videos/?tiktok_video_url={tiktok_video_url}&cursor=0&count=20"
- _headers = {"Authorization": f"Bearer {tikhub_token}"}
- async with aiohttp.ClientSession() as session:
- async with session.get(api_url, headers=_headers, proxy=self.proxies, timeout=10) as response:
- response = await response.json()
- return response
- except Exception as e:
- if not quiet_mode:
- print("获取抖音视频数据失败!原因:{}".format(e))
- # return None
- raise e
-
- @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
- async def get_tiktok_user_profile_liked_videos(self, tiktok_video_url: str, tikhub_token: str) -> Union[dict, None]:
- try:
- api_url = f"https://api.tikhub.io/tiktok_profile_liked_videos/?tiktok_video_url={tiktok_video_url}&cursor=0&count=20"
- _headers = {"Authorization": f"Bearer {tikhub_token}"}
- async with aiohttp.ClientSession() as session:
- async with session.get(api_url, headers=_headers, proxy=self.proxies, timeout=10) as response:
- response = await response.json()
- return response
- except Exception as e:
- if not quiet_mode:
- print("获取抖音视频数据失败!原因:{}".format(e))
- # return None
- raise e
-
- """__________________________________________⬇️Hybrid methods(混合方法)⬇️______________________________________"""
-
- # 自定义获取数据/Custom data acquisition
- async def hybrid_parsing(self, video_url: str) -> dict:
- # URL平台判断/Judge URL platform
- url_platform = "douyin" if "douyin" in video_url else "tiktok"
- if not quiet_mode:
- print("当前链接平台为:{}".format(url_platform))
- # 获取视频ID/Get video ID
- print("正在获取视频ID...")
- video_id = await self.get_douyin_video_id(video_url) if url_platform == "douyin" else await self.get_tiktok_video_id(video_url)
- if video_id:
- if not quiet_mode:
- print("获取视频ID成功,视频ID为:{}".format(video_id))
- # 获取视频数据/Get video data
- print("正在获取视频数据...")
- data = await self.get_douyin_video_data(video_id) if url_platform == "douyin" else await self.get_tiktok_video_data(video_id)
- if data:
- if not quiet_mode:
- print("获取视频数据成功,正在判断数据类型...")
- url_type_code = data["aweme_type"]
- """以下为抖音/TikTok类型代码/Type code for Douyin/TikTok"""
- url_type_code_dict = {
- # 抖音/Douyin
- 2: "image",
- 4: "video",
- 68: "image",
- # TikTok
- 0: "video",
- 51: "video",
- 55: "video",
- 58: "video",
- 61: "video",
- 150: "image",
- }
- # 获取视频类型/Get video type
- # 如果类型代码不存在,则默认为视频类型/If the type code does not exist, it is
- # assumed to be a video type
- url_type = url_type_code_dict.get(url_type_code, "video")
- if not quiet_mode:
- print("数据类型代码: {}".format(url_type_code))
- # 判断链接类型/Judge link type
-
- print("数据类型: {}".format(url_type))
- print("准备开始判断并处理数据...")
-
- """
- 以下为(视频||图片)数据处理的四个方法,如果你需要自定义数据处理请在这里修改.
- The following are four methods of (video || image) data processing.
- If you need to customize data processing, please modify it here.
- """
-
- """
- 创建已知数据字典(索引相同),稍后使用.update()方法更新数据
- Create a known data dictionary (index the same),
- and then use the .update() method to update the data
- """
-
- result_data = {
- "status": "success",
- "message": "更多接口请查看(More API see): https://api.tikhub.io/docs",
- "type": url_type,
- "platform": url_platform,
- "aweme_id": video_id,
- "official_api_url": {
- "User-Agent": self.headers["User-Agent"],
- "api_url": f"https://www.iesdouyin.com/aweme/v1/web/aweme/detail/?aweme_id={video_id}&aid=1128&version_name=23.5.0&device_platform=android&os_version=2333&Github=Evil0ctal&words=FXXK_U_ByteDance",
- }
- if url_platform == "douyin"
- else {
- "User-Agent": self.tiktok_api_headers["User-Agent"],
- "api_url": f"https://api16-normal-c-useast1a.tiktokv.com/aweme/v1/feed/?aweme_id={video_id}",
- },
- "desc": data.get("desc"),
- "create_time": data.get("create_time"),
- "author": data.get("author"),
- "music": data.get("music"),
- "statistics": data.get("statistics"),
- "cover_data": {
- "cover": data.get("video").get("cover"),
- "origin_cover": data.get("video").get("origin_cover"),
- "dynamic_cover": data.get("video").get("dynamic_cover"),
- },
- "hashtags": data.get("text_extra"),
- }
- # 创建一个空变量,稍后使用.update()方法更新数据/Create an empty variable and use
- # the .update() method to update the data
- api_data = None
- # 判断链接类型并处理数据/Judge link type and process data
- try:
- # 抖音数据处理/Douyin data processing
- if url_platform == "douyin":
- # 抖音视频数据处理/Douyin video data processing
- if url_type == "video":
- if not quiet_mode:
- print("正在处理抖音视频数据...")
- # 将信息储存在字典中/Store information in a dictionary
- uri = data["video"]["play_addr"]["uri"]
- wm_video_url = data["video"]["play_addr"]["url_list"][0]
- wm_video_url_HQ = f"https://aweme.snssdk.com/aweme/v1/playwm/?video_id={uri}&radio=1080p&line=0"
- nwm_video_url = wm_video_url.replace("playwm", "play")
- nwm_video_url_HQ = f"https://aweme.snssdk.com/aweme/v1/play/?video_id={uri}&ratio=1080p&line=0"
- api_data = {
- "video_data": {
- "wm_video_url": wm_video_url,
- "wm_video_url_HQ": wm_video_url_HQ,
- "nwm_video_url": nwm_video_url,
- "nwm_video_url_HQ": nwm_video_url_HQ,
- }
- }
- # 抖音图片数据处理/Douyin image data processing
- elif url_type == "image":
- if not quiet_mode:
- print("正在处理抖音图片数据...")
- # 无水印图片列表/No watermark image list
- no_watermark_image_list = []
- # 有水印图片列表/With watermark image list
- watermark_image_list = []
- # 遍历图片列表/Traverse image list
- for i in data["images"]:
- no_watermark_image_list.append(i["url_list"][0])
- watermark_image_list.append(i["download_url_list"][0])
- api_data = {
- "image_data": {"no_watermark_image_list": no_watermark_image_list, "watermark_image_list": watermark_image_list}
- }
- # TikTok数据处理/TikTok data processing
- elif url_platform == "tiktok":
- # TikTok视频数据处理/TikTok video data processing
- if url_type == "video":
- if not quiet_mode:
- print("正在处理TikTok视频数据...")
- # 将信息储存在字典中/Store information in a dictionary
- wm_video = data["video"]["download_addr"]["url_list"][0]
- api_data = {
- "video_data": {
- "wm_video_url": wm_video,
- "wm_video_url_HQ": wm_video,
- "nwm_video_url": data["video"]["play_addr"]["url_list"][0],
- "nwm_video_url_HQ": data["video"]["bit_rate"][0]["play_addr"]["url_list"][0],
- }
- }
- # TikTok图片数据处理/TikTok image data processing
- elif url_type == "image":
- if not quiet_mode:
- print("正在处理TikTok图片数据...")
- # 无水印图片列表/No watermark image list
- no_watermark_image_list = []
- # 有水印图片列表/With watermark image list
- watermark_image_list = []
- for i in data["image_post_info"]["images"]:
- no_watermark_image_list.append(i["display_image"]["url_list"][0])
- watermark_image_list.append(i["owner_watermark_image"]["url_list"][0])
- api_data = {
- "image_data": {"no_watermark_image_list": no_watermark_image_list, "watermark_image_list": watermark_image_list}
- }
- # 更新数据/Update data
- result_data.update(api_data)
- # print("数据处理完成,最终数据: \n{}".format(result_data))
- # 返回数据/Return data
- return result_data
- except Exception as e:
- if not quiet_mode:
- traceback.print_exc()
- print("数据处理失败!")
- return {"status": "failed", "message": "数据处理失败!/Data processing failed!"}
- else:
- if not quiet_mode:
- print("[抖音|TikTok方法]返回数据为空,无法处理!")
- return {"status": "failed", "message": "返回数据为空,无法处理!/Return data is empty and cannot be processed!"}
- else:
- if not quiet_mode:
- print("获取视频ID失败!")
- return {"status": "failed", "message": "获取视频ID失败!/Failed to get video ID!"}
-
- # 处理数据方便快捷指令使用/Process data for easy-to-use shortcuts
- @staticmethod
- def hybrid_parsing_minimal(data: dict) -> dict:
- # 如果数据获取成功/If the data is successfully obtained
- if data["status"] == "success":
- result = {
- "status": "success",
- "message": data.get("message"),
- "platform": data.get("platform"),
- "type": data.get("type"),
- "desc": data.get("desc"),
- "wm_video_url": data["video_data"]["wm_video_url"] if data["type"] == "video" else None,
- "wm_video_url_HQ": data["video_data"]["wm_video_url_HQ"] if data["type"] == "video" else None,
- "nwm_video_url": data["video_data"]["nwm_video_url"] if data["type"] == "video" else None,
- "nwm_video_url_HQ": data["video_data"]["nwm_video_url_HQ"] if data["type"] == "video" else None,
- "no_watermark_image_list": data["image_data"]["no_watermark_image_list"] if data["type"] == "image" else None,
- "watermark_image_list": data["image_data"]["watermark_image_list"] if data["type"] == "image" else None,
- }
- return result
- else:
- return data
-
-
-"""__________________________________________⬇️Test methods(测试方法)⬇️______________________________________"""
-
-
-async def async_test(_douyin_url: str = None, _tiktok_url: str = None) -> None:
- # 异步测试/Async test
- start_time = time.time()
- print("正在进行异步测试...")
-
- print("正在测试异步获取抖音视频ID方法...")
- douyin_id = await api.get_douyin_video_id(_douyin_url)
- print("正在测试异步获取抖音视频数据方法...")
- await api.get_douyin_video_data(douyin_id)
-
- print("正在测试异步获取TikTok视频ID方法...")
- tiktok_id = await api.get_tiktok_video_id(_tiktok_url)
- print("正在测试异步获取TikTok视频数据方法...")
- await api.get_tiktok_video_data(tiktok_id)
-
- print("正在测试异步混合解析方法...")
- await api.hybrid_parsing(_douyin_url)
- await api.hybrid_parsing(_tiktok_url)
-
- # 总耗时/Total time
- total_time = round(time.time() - start_time, 2)
- print("异步测试完成,总耗时: {}s".format(total_time))
-
-
-if __name__ == "__main__":
- api = Scraper()
- # 运行测试
- # params = "device_platform=webapp&aid=6383&channel=channel_pc_web&aweme_id=7153585499477757192&pc_client_type=1&version_code=190500&version_name=19.5.0&cookie_enabled=true&screen_width=1344&screen_height=756&browser_language=zh-CN&browser_platform=Win32&browser_name=Firefox&browser_version=110.0&browser_online=true&engine_name=Gecko&engine_version=109.0&os_name=Windows&os_version=10&cpu_core_num=16&device_memory=&platform=PC&webid=7158288523463362079"
- # api.generate_x_bogus(params)
- douyin_url = "https://v.douyin.com/rLyrQxA/6.66"
- tiktok_url = "https://vt.tiktok.com/ZSRwWXtdr/"
- asyncio.run(async_test(_douyin_url=douyin_url, _tiktok_url=tiktok_url))
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+# @Author: https://github.com/Evil0ctal/
+# @Time: 2021/11/06
+# @Update: 2023/03/08
+# @Version: 3.3.0
+# @Function:
+# 核心代码,估值1块(๑•̀ㅂ•́)و✧
+# 用于爬取Douyin/TikTok数据并以字典形式返回。
+# input link, output dictionary.
+
+
+import asyncio
+import configparser
+import os
+import platform
+import re
+import time
+import traceback
+import urllib.parse
+from typing import Union
+
+import aiohttp
+import execjs
+from tenacity import *
+
+quiet_mode = False
+
+
+class Scraper:
+ """__________________________________________⬇️initialization(初始化)⬇️______________________________________"""
+
+ # 初始化/initialization
+ def __init__(self, quiet: bool = False):
+ global quiet_mode
+ quiet_mode = quiet
+ self.headers = {
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36"
+ }
+ self.douyin_api_headers = {
+ "accept-encoding": "gzip, deflate, br",
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
+ "referer": "https://www.douyin.com/",
+ # 'cookie': "s_v_web_id=verify_leytkxgn_kvO5kOmO_SdMs_4t1o_B5ml_BUqtWM1mP6BF;"
+ }
+ self.tiktok_api_headers = {
+ "User-Agent": "com.ss.android.ugc.trill/494+Mozilla/5.0+(Linux;+Android+12;+2112123G+Build/SKQ1.211006.001;+wv)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Version/4.0+Chrome/107.0.5304.105+Mobile+Safari/537.36"
+ }
+ # 判断配置文件是否存在/Check if the configuration file exists
+ if os.path.exists("config.ini"):
+ self.config = configparser.ConfigParser()
+ self.config.read("config.ini", encoding="utf-8")
+ # 判断是否使用代理
+ if self.config["Scraper"]["Proxy_switch"] == "True":
+ # 判断是否区别协议选择代理
+ if self.config["Scraper"]["Use_different_protocols"] == "False":
+ self.proxies = {"all": self.config["Scraper"]["All"]}
+ else:
+ self.proxies = {"http": self.config["Scraper"]["Http_proxy"], "https": self.config["Scraper"]["Https_proxy"]}
+ else:
+ self.proxies = None
+ # 配置文件不存在则不使用代理/If the configuration file does not exist, do not use
+ # the proxy
+ else:
+ self.proxies = None
+ # 针对Windows系统的异步事件规则/Asynchronous event rules for Windows systems
+ if platform.system() == "Windows":
+ asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
+
+ """__________________________________________⬇️utils(实用程序)⬇️______________________________________"""
+
+ # 检索字符串中的链接/Retrieve links from string
+ @staticmethod
+ def get_url(text: str) -> Union[str, None]:
+ try:
+ # 从输入文字中提取索引链接存入列表/Extract index links from input text and store in
+ # list
+ url = re.findall("http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+", text)
+ # 判断是否有链接/Check if there is a link
+ if len(url) > 0:
+ return url[0]
+ except Exception as e:
+ if not quiet_mode:
+ print("Error in get_url:", e)
+ return None
+
+ # 生成X-Bogus签名/Generate X-Bogus signature
+ @staticmethod
+ def generate_x_bogus_url(url: str, headers: dict) -> str:
+ query = urllib.parse.urlparse(url).query
+ xbogus = execjs.compile(open("./X-Bogus.js").read()).call("sign", query, headers["User-Agent"])
+ new_url = url + "&X-Bogus=" + xbogus
+ return new_url
+
+ # 转换链接/convert url
+ @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
+ async def convert_share_urls(self, url: str) -> Union[str, None]:
+ """
+ 用于将分享链接(短链接)转换为原始链接/Convert share links (short links) to original links
+ :return: 原始链接/Original link
+ """
+ # 检索字符串中的链接/Retrieve links from string
+ url = self.get_url(url)
+ # 判断是否有链接/Check if there is a link
+ if url is None:
+ if not quiet_mode:
+ print("无法检索到链接/Unable to retrieve link")
+ return None
+ # 判断是否为抖音分享链接/judge if it is a douyin share link
+ if "douyin" in url:
+ """
+ 抖音视频链接类型(不全):
+ 1. https://v.douyin.com/MuKhKn3/
+ 2. https://www.douyin.com/video/7157519152863890719
+ 3. https://www.iesdouyin.com/share/video/7157519152863890719/?region=CN&mid=7157519152863890719&u_code=ffe6jgjg&titleType=title×tamp=1600000000&utm_source=copy_link&utm_campaign=client_share&utm_medium=android&app=aweme&iid=123456789&share_id=123456789
+ 抖音用户链接类型(不全):
+ 1. https://www.douyin.com/user/MS4wLjABAAAAbLMPpOhVk441et7z7ECGcmGrK42KtoWOuR0_7pLZCcyFheA9__asY-kGfNAtYqXR?relation=0&vid=7157519152863890719
+ 2. https://v.douyin.com/MuKoFP4/
+ 抖音直播链接类型(不全):
+ 1. https://live.douyin.com/88815422890
+ """
+ if "v.douyin" in url:
+ # 转换链接/convert url
+ # 例子/Example: https://v.douyin.com/rLyAJgf/8.74
+ url = re.compile(r"(https://v.douyin.com/)\w+", re.I).match(url).group()
+ if not quiet_mode:
+ print("正在通过抖音分享链接获取原始链接...")
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url, headers=self.headers, proxy=self.proxies, allow_redirects=False, timeout=10) as response:
+ if response.status == 302:
+ url = (
+ response.headers["Location"].split("?")[0]
+ if "?" in response.headers["Location"]
+ else response.headers["Location"]
+ )
+ if not quiet_mode:
+ print("获取原始链接成功, 原始链接为: {}".format(url))
+ return url
+ except Exception as e:
+ if not quiet_mode:
+ print("获取原始链接失败!")
+ print(e)
+ # return None
+ raise e
+ else:
+ if not quiet_mode:
+ print("该链接为原始链接,无需转换,原始链接为: {}".format(url))
+ return url
+ # 判断是否为TikTok分享链接/judge if it is a TikTok share link
+ elif "tiktok" in url:
+ """
+ TikTok视频链接类型(不全):
+ 1. https://www.tiktok.com/@tiktok/video/6950000000000000000
+ 2. https://www.tiktok.com/t/ZTRHcXS2C/
+ TikTok用户链接类型(不全):
+ 1. https://www.tiktok.com/@tiktok
+ """
+ if "@" in url:
+ if not quiet_mode:
+ print("该链接为原始链接,无需转换,原始链接为: {}".format(url))
+ return url
+ else:
+ if not quiet_mode:
+ print("正在通过TikTok分享链接获取原始链接...")
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url, headers=self.headers, proxy=self.proxies, allow_redirects=False, timeout=10) as response:
+ if response.status == 301:
+ url = (
+ response.headers["Location"].split("?")[0]
+ if "?" in response.headers["Location"]
+ else response.headers["Location"]
+ )
+ if not quiet_mode:
+ print("获取原始链接成功, 原始链接为: {}".format(url))
+ return url
+ except Exception as e:
+ if not quiet_mode:
+ print("获取原始链接失败!")
+ print(e)
+ return None
+
+ """__________________________________________⬇️Douyin methods(抖音方法)⬇️______________________________________"""
+
+ """
+ Credits: https://github.com/Johnserf-Seed
+ [中文]
+ 感谢John为本项目提供了非常多的帮助
+ 大家可以去他的仓库点个star :)
+ 顺便打个广告, 如果需要更稳定、快速、长期维护的抖音/TikTok API, 或者需要更多的数据(APP端),
+ 请移步: https://api.tikhub.io
+
+ [English]
+ Thanks to John for providing a lot of help to this project
+ You can go to his repository and give him a star :)
+ By the way, if you need a more stable, fast and long-term maintenance Douyin/TikTok API, or need more data (APP side),
+ Please go to: https://api.tikhub.io
+ """
+
+ # 生成抖音X-Bogus签名/Generate Douyin X-Bogus signature
+ # 下面的代码不能保证稳定性,随时可能失效/ The code below cannot guarantee stability and may
+ # fail at any time
+ def generate_x_bogus_url(self, url: str) -> str:
+ """
+ 生成抖音X-Bogus签名
+ :param url: 视频链接
+ :return: 包含X-Bogus签名的URL
+ """
+ # 调用JavaScript函数
+ query = urllib.parse.urlparse(url).query
+ xbogus = execjs.compile(open("./X-Bogus.js").read()).call("sign", query, self.headers["User-Agent"])
+ if not quiet_mode:
+ print("生成的X-Bogus签名为: {}".format(xbogus))
+ new_url = url + "&X-Bogus=" + xbogus
+ return new_url
+
+ # 获取抖音视频ID/Get Douyin video ID
+ async def get_douyin_video_id(self, original_url: str) -> Union[str, None]:
+ """
+ 获取视频id
+ :param original_url: 视频链接
+ :return: 视频id
+ """
+ # 正则匹配出视频ID
+ try:
+ video_url = await self.convert_share_urls(original_url)
+ # 链接类型:
+ # 视频页 https://www.douyin.com/video/7086770907674348841
+ if "/video/" in video_url:
+ key = re.findall("/video/(\d+)?", video_url)[0]
+ if not quiet_mode:
+ print("获取到的抖音视频ID为: {}".format(key))
+ return key
+ # 发现页 https://www.douyin.com/discover?modal_id=7086770907674348841
+ elif "discover?" in video_url:
+ key = re.findall("modal_id=(\d+)", video_url)[0]
+ if not quiet_mode:
+ print("获取到的抖音视频ID为: {}".format(key))
+ return key
+ # 直播页
+ elif "live.douyin" in video_url:
+ # https://live.douyin.com/1000000000000000000
+ video_url = video_url.split("?")[0] if "?" in video_url else video_url
+ key = video_url.replace("https://live.douyin.com/", "")
+ if not quiet_mode:
+ print("获取到的抖音直播ID为: {}".format(key))
+ return key
+ # note
+ elif "note" in video_url:
+ # https://www.douyin.com/note/7086770907674348841
+ key = re.findall("/note/(\d+)?", video_url)[0]
+ if not quiet_mode:
+ print("获取到的抖音笔记ID为: {}".format(key))
+ return key
+ except Exception as e:
+ if not quiet_mode:
+ print("获取抖音视频ID出错了:{}".format(e))
+ return None
+
+ # 获取单个抖音视频数据/Get single Douyin video data
+ @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
+ async def get_douyin_video_data(self, video_id: str) -> Union[dict, None]:
+ """
+ :param video_id: str - 抖音视频id
+ :return:dict - 包含信息的字典
+ """
+ if not quiet_mode:
+ print("正在获取抖音视频数据...")
+ try:
+ # 构造访问链接/Construct the access link
+ api_url = f"https://www.douyin.com/aweme/v1/web/aweme/detail/?device_platform=webapp&aid=6383&channel=channel_pc_web&aweme_id={video_id}&pc_client_type=1&version_code=190500&version_name=19.5.0&cookie_enabled=true&screen_width=1344&screen_height=756&browser_language=zh-CN&browser_platform=Win32&browser_name=Firefox&browser_version=110.0&browser_online=true&engine_name=Gecko&engine_version=109.0&os_name=Windows&os_version=10&cpu_core_num=16&device_memory=&platform=PC&webid=7158288523463362079&msToken=abL8SeUTPa9-EToD8qfC7toScSADxpg6yLh2dbNcpWHzE0bT04txM_4UwquIcRvkRb9IU8sifwgM1Kwf1Lsld81o9Irt2_yNyUbbQPSUO8EfVlZJ_78FckDFnwVBVUVK"
+ api_url = self.generate_x_bogus_url(api_url)
+ # 访问API/Access API
+ if not quiet_mode:
+ print("正在获取视频数据API: {}".format(api_url))
+ async with aiohttp.ClientSession() as session:
+ async with session.get(api_url, headers=self.douyin_api_headers, proxy=self.proxies, timeout=10) as response:
+ response = await response.json()
+ # 获取视频数据/Get video data
+ video_data = response["aweme_detail"]
+ if not quiet_mode:
+ print("获取视频数据成功!")
+ # print("抖音API返回数据: {}".format(video_data))
+ return video_data
+ except Exception as e:
+ if not quiet_mode:
+ print("获取抖音视频数据失败!原因:{}".format(e))
+ # return None
+ raise e
+
+ # 获取单个抖音直播视频数据/Get single Douyin Live video data
+ # 暂时不可用,待修复。
+ @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
+ async def get_douyin_live_video_data(self, web_rid: str) -> Union[dict, None]:
+ if not quiet_mode:
+ print("正在获取抖音视频数据...")
+ try:
+ # 构造访问链接/Construct the access link
+ api_url = f"https://live.douyin.com/webcast/web/enter/?aid=6383&web_rid={web_rid}"
+ # 访问API/Access API
+ if not quiet_mode:
+ print("正在获取视频数据API: {}".format(api_url))
+ async with aiohttp.ClientSession() as session:
+ async with session.get(api_url, headers=self.douyin_api_headers, proxy=self.proxies, timeout=10) as response:
+ response = await response.json()
+ # 获取视频数据/Get video data
+ video_data = response["data"]
+ if not quiet_mode:
+ print(video_data)
+ print("获取视频数据成功!")
+ # print("抖音API返回数据: {}".format(video_data))
+ return video_data
+ except Exception as e:
+ if not quiet_mode:
+ print("获取抖音视频数据失败!原因:{}".format(e))
+ # return None
+ raise e
+
+ # 获取单个抖音视频数据/Get single Douyin video data
+ @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
+ async def get_douyin_user_profile_videos(self, profile_url: str, tikhub_token: str) -> Union[dict, None]:
+ try:
+ api_url = f"https://api.tikhub.io/douyin_profile_videos/?douyin_profile_url={profile_url}&cursor=0&count=20"
+ _headers = {"Authorization": f"Bearer {tikhub_token}"}
+ async with aiohttp.ClientSession() as session:
+ async with session.get(api_url, headers=_headers, proxy=self.proxies, timeout=10) as response:
+ response = await response.json()
+ return response
+ except Exception as e:
+ if not quiet_mode:
+ print("获取抖音视频数据失败!原因:{}".format(e))
+ # return None
+ raise e
+
+ # 获取抖音主页点赞视频数据/Get Douyin profile like video data
+ @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
+ async def get_douyin_profile_liked_data(self, profile_url: str, tikhub_token: str) -> Union[dict, None]:
+ try:
+ api_url = f"https://api.tikhub.io/douyin_profile_liked_videos/?douyin_profile_url={profile_url}&cursor=0&count=20"
+ _headers = {"Authorization": f"Bearer {tikhub_token}"}
+ async with aiohttp.ClientSession() as session:
+ async with session.get(api_url, headers=_headers, proxy=self.proxies, timeout=10) as response:
+ response = await response.json()
+ return response
+ except Exception as e:
+ if not quiet_mode:
+ print("获取抖音视频数据失败!原因:{}".format(e))
+ # return None
+ raise e
+
+ # 获取抖音视频评论数据/Get Douyin video comment data
+ @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
+ async def get_douyin_video_comments(self, video_url: str, tikhub_token: str) -> Union[dict, None]:
+ try:
+ api_url = f"https://api.tikhub.io/douyin_video_comments/?douyin_video_url={video_url}&cursor=0&count=20"
+ _headers = {"Authorization": f"Bearer {tikhub_token}"}
+ async with aiohttp.ClientSession() as session:
+ async with session.get(api_url, headers=_headers, proxy=self.proxies, timeout=10) as response:
+ response = await response.json()
+ return response
+ except Exception as e:
+ if not quiet_mode:
+ print("获取抖音视频数据失败!原因:{}".format(e))
+ # return None
+ raise e
+
+ """__________________________________________⬇️TikTok methods(TikTok方法)⬇️______________________________________"""
+
+ # 获取TikTok视频ID/Get TikTok video ID
+ async def get_tiktok_video_id(self, original_url: str) -> Union[str, None]:
+ """
+ 获取视频id
+ :param original_url: 视频链接
+ :return: 视频id
+ """
+ try:
+ # 转换链接/Convert link
+ original_url = await self.convert_share_urls(original_url)
+ # 获取视频ID/Get video ID
+ if "/video/" in original_url:
+ video_id = re.findall("/video/(\d+)", original_url)[0]
+ elif "/v/" in original_url:
+ video_id = re.findall("/v/(\d+)", original_url)[0]
+ if not quiet_mode:
+ print("获取到的TikTok视频ID是{}".format(video_id))
+ # 返回视频ID/Return video ID
+ return video_id
+ except Exception as e:
+ if not quiet_mode:
+ print("获取TikTok视频ID出错了:{}".format(e))
+ return None
+
+ @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
+ async def get_tiktok_video_data(self, video_id: str) -> Union[dict, None]:
+ """
+ 获取单个视频信息
+ :param video_id: 视频id
+ :return: 视频信息
+ """
+ if not quiet_mode:
+ print("正在获取TikTok视频数据...")
+ try:
+ # 构造访问链接/Construct the access link
+ api_url = f"https://api16-normal-c-useast1a.tiktokv.com/aweme/v1/feed/?aweme_id={video_id}"
+ if not quiet_mode:
+ print("正在获取视频数据API: {}".format(api_url))
+ async with aiohttp.ClientSession() as session:
+ async with session.get(api_url, headers=self.tiktok_api_headers, proxy=self.proxies, timeout=10) as response:
+ response = await response.json()
+ video_data = response["aweme_list"][0]
+ if not quiet_mode:
+ print("获取视频信息成功!")
+ return video_data
+ except Exception as e:
+ if not quiet_mode:
+ print("获取视频信息失败!原因:{}".format(e))
+ # return None
+ raise e
+
+ @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
+ async def get_tiktok_user_profile_videos(self, tiktok_video_url: str, tikhub_token: str) -> Union[dict, None]:
+ try:
+ api_url = f"https://api.tikhub.io/tiktok_profile_videos/?tiktok_video_url={tiktok_video_url}&cursor=0&count=20"
+ _headers = {"Authorization": f"Bearer {tikhub_token}"}
+ async with aiohttp.ClientSession() as session:
+ async with session.get(api_url, headers=_headers, proxy=self.proxies, timeout=10) as response:
+ response = await response.json()
+ return response
+ except Exception as e:
+ if not quiet_mode:
+ print("获取抖音视频数据失败!原因:{}".format(e))
+ # return None
+ raise e
+
+ @retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
+ async def get_tiktok_user_profile_liked_videos(self, tiktok_video_url: str, tikhub_token: str) -> Union[dict, None]:
+ try:
+ api_url = f"https://api.tikhub.io/tiktok_profile_liked_videos/?tiktok_video_url={tiktok_video_url}&cursor=0&count=20"
+ _headers = {"Authorization": f"Bearer {tikhub_token}"}
+ async with aiohttp.ClientSession() as session:
+ async with session.get(api_url, headers=_headers, proxy=self.proxies, timeout=10) as response:
+ response = await response.json()
+ return response
+ except Exception as e:
+ if not quiet_mode:
+ print("获取抖音视频数据失败!原因:{}".format(e))
+ # return None
+ raise e
+
+ """__________________________________________⬇️Hybrid methods(混合方法)⬇️______________________________________"""
+
+ # 自定义获取数据/Custom data acquisition
+ async def hybrid_parsing(self, video_url: str) -> dict:
+ # URL平台判断/Judge URL platform
+ url_platform = "douyin" if "douyin" in video_url else "tiktok"
+ if not quiet_mode:
+ print("当前链接平台为:{}".format(url_platform))
+ # 获取视频ID/Get video ID
+ print("正在获取视频ID...")
+ video_id = await self.get_douyin_video_id(video_url) if url_platform == "douyin" else await self.get_tiktok_video_id(video_url)
+ if video_id:
+ if not quiet_mode:
+ print("获取视频ID成功,视频ID为:{}".format(video_id))
+ # 获取视频数据/Get video data
+ print("正在获取视频数据...")
+ data = await self.get_douyin_video_data(video_id) if url_platform == "douyin" else await self.get_tiktok_video_data(video_id)
+ if data:
+ if not quiet_mode:
+ print("获取视频数据成功,正在判断数据类型...")
+ url_type_code = data["aweme_type"]
+ """以下为抖音/TikTok类型代码/Type code for Douyin/TikTok"""
+ url_type_code_dict = {
+ # 抖音/Douyin
+ 2: "image",
+ 4: "video",
+ 68: "image",
+ # TikTok
+ 0: "video",
+ 51: "video",
+ 55: "video",
+ 58: "video",
+ 61: "video",
+ 150: "image",
+ }
+ # 获取视频类型/Get video type
+ # 如果类型代码不存在,则默认为视频类型/If the type code does not exist, it is
+ # assumed to be a video type
+ url_type = url_type_code_dict.get(url_type_code, "video")
+ if not quiet_mode:
+ print("数据类型代码: {}".format(url_type_code))
+ # 判断链接类型/Judge link type
+
+ print("数据类型: {}".format(url_type))
+ print("准备开始判断并处理数据...")
+
+ """
+ 以下为(视频||图片)数据处理的四个方法,如果你需要自定义数据处理请在这里修改.
+ The following are four methods of (video || image) data processing.
+ If you need to customize data processing, please modify it here.
+ """
+
+ """
+ 创建已知数据字典(索引相同),稍后使用.update()方法更新数据
+ Create a known data dictionary (index the same),
+ and then use the .update() method to update the data
+ """
+
+ result_data = {
+ "status": "success",
+ "message": "更多接口请查看(More API see): https://api.tikhub.io/docs",
+ "type": url_type,
+ "platform": url_platform,
+ "aweme_id": video_id,
+ "official_api_url": {
+ "User-Agent": self.headers["User-Agent"],
+ "api_url": f"https://www.iesdouyin.com/aweme/v1/web/aweme/detail/?aweme_id={video_id}&aid=1128&version_name=23.5.0&device_platform=android&os_version=2333&Github=Evil0ctal&words=FXXK_U_ByteDance",
+ }
+ if url_platform == "douyin"
+ else {
+ "User-Agent": self.tiktok_api_headers["User-Agent"],
+ "api_url": f"https://api16-normal-c-useast1a.tiktokv.com/aweme/v1/feed/?aweme_id={video_id}",
+ },
+ "desc": data.get("desc"),
+ "create_time": data.get("create_time"),
+ "author": data.get("author"),
+ "music": data.get("music"),
+ "statistics": data.get("statistics"),
+ "cover_data": {
+ "cover": data.get("video").get("cover"),
+ "origin_cover": data.get("video").get("origin_cover"),
+ "dynamic_cover": data.get("video").get("dynamic_cover"),
+ },
+ "hashtags": data.get("text_extra"),
+ }
+ # 创建一个空变量,稍后使用.update()方法更新数据/Create an empty variable and use
+ # the .update() method to update the data
+ api_data = None
+ # 判断链接类型并处理数据/Judge link type and process data
+ try:
+ # 抖音数据处理/Douyin data processing
+ if url_platform == "douyin":
+ # 抖音视频数据处理/Douyin video data processing
+ if url_type == "video":
+ if not quiet_mode:
+ print("正在处理抖音视频数据...")
+ # 将信息储存在字典中/Store information in a dictionary
+ uri = data["video"]["play_addr"]["uri"]
+ wm_video_url = data["video"]["play_addr"]["url_list"][0]
+ wm_video_url_HQ = f"https://aweme.snssdk.com/aweme/v1/playwm/?video_id={uri}&radio=1080p&line=0"
+ nwm_video_url = wm_video_url.replace("playwm", "play")
+ nwm_video_url_HQ = f"https://aweme.snssdk.com/aweme/v1/play/?video_id={uri}&ratio=1080p&line=0"
+ api_data = {
+ "video_data": {
+ "wm_video_url": wm_video_url,
+ "wm_video_url_HQ": wm_video_url_HQ,
+ "nwm_video_url": nwm_video_url,
+ "nwm_video_url_HQ": nwm_video_url_HQ,
+ }
+ }
+ # 抖音图片数据处理/Douyin image data processing
+ elif url_type == "image":
+ if not quiet_mode:
+ print("正在处理抖音图片数据...")
+ # 无水印图片列表/No watermark image list
+ no_watermark_image_list = []
+ # 有水印图片列表/With watermark image list
+ watermark_image_list = []
+ # 遍历图片列表/Traverse image list
+ for i in data["images"]:
+ no_watermark_image_list.append(i["url_list"][0])
+ watermark_image_list.append(i["download_url_list"][0])
+ api_data = {
+ "image_data": {"no_watermark_image_list": no_watermark_image_list, "watermark_image_list": watermark_image_list}
+ }
+ # TikTok数据处理/TikTok data processing
+ elif url_platform == "tiktok":
+ # TikTok视频数据处理/TikTok video data processing
+ if url_type == "video":
+ if not quiet_mode:
+ print("正在处理TikTok视频数据...")
+ # 将信息储存在字典中/Store information in a dictionary
+ wm_video = data["video"]["download_addr"]["url_list"][0]
+ api_data = {
+ "video_data": {
+ "wm_video_url": wm_video,
+ "wm_video_url_HQ": wm_video,
+ "nwm_video_url": data["video"]["play_addr"]["url_list"][0],
+ "nwm_video_url_HQ": data["video"]["bit_rate"][0]["play_addr"]["url_list"][0],
+ }
+ }
+ # TikTok图片数据处理/TikTok image data processing
+ elif url_type == "image":
+ if not quiet_mode:
+ print("正在处理TikTok图片数据...")
+ # 无水印图片列表/No watermark image list
+ no_watermark_image_list = []
+ # 有水印图片列表/With watermark image list
+ watermark_image_list = []
+ for i in data["image_post_info"]["images"]:
+ no_watermark_image_list.append(i["display_image"]["url_list"][0])
+ watermark_image_list.append(i["owner_watermark_image"]["url_list"][0])
+ api_data = {
+ "image_data": {"no_watermark_image_list": no_watermark_image_list, "watermark_image_list": watermark_image_list}
+ }
+ # 更新数据/Update data
+ result_data.update(api_data)
+ # print("数据处理完成,最终数据: \n{}".format(result_data))
+ # 返回数据/Return data
+ return result_data
+ except Exception as e:
+ if not quiet_mode:
+ traceback.print_exc()
+ print("数据处理失败!")
+ return {"status": "failed", "message": "数据处理失败!/Data processing failed!"}
+ else:
+ if not quiet_mode:
+ print("[抖音|TikTok方法]返回数据为空,无法处理!")
+ return {"status": "failed", "message": "返回数据为空,无法处理!/Return data is empty and cannot be processed!"}
+ else:
+ if not quiet_mode:
+ print("获取视频ID失败!")
+ return {"status": "failed", "message": "获取视频ID失败!/Failed to get video ID!"}
+
+ # 处理数据方便快捷指令使用/Process data for easy-to-use shortcuts
+ @staticmethod
+ def hybrid_parsing_minimal(data: dict) -> dict:
+ # 如果数据获取成功/If the data is successfully obtained
+ if data["status"] == "success":
+ result = {
+ "status": "success",
+ "message": data.get("message"),
+ "platform": data.get("platform"),
+ "type": data.get("type"),
+ "desc": data.get("desc"),
+ "wm_video_url": data["video_data"]["wm_video_url"] if data["type"] == "video" else None,
+ "wm_video_url_HQ": data["video_data"]["wm_video_url_HQ"] if data["type"] == "video" else None,
+ "nwm_video_url": data["video_data"]["nwm_video_url"] if data["type"] == "video" else None,
+ "nwm_video_url_HQ": data["video_data"]["nwm_video_url_HQ"] if data["type"] == "video" else None,
+ "no_watermark_image_list": data["image_data"]["no_watermark_image_list"] if data["type"] == "image" else None,
+ "watermark_image_list": data["image_data"]["watermark_image_list"] if data["type"] == "image" else None,
+ }
+ return result
+ else:
+ return data
+
+
+"""__________________________________________⬇️Test methods(测试方法)⬇️______________________________________"""
+
+
+async def async_test(_douyin_url: str = None, _tiktok_url: str = None) -> None:
+ # 异步测试/Async test
+ start_time = time.time()
+ print("正在进行异步测试...")
+
+ print("正在测试异步获取抖音视频ID方法...")
+ douyin_id = await api.get_douyin_video_id(_douyin_url)
+ print("正在测试异步获取抖音视频数据方法...")
+ await api.get_douyin_video_data(douyin_id)
+
+ print("正在测试异步获取TikTok视频ID方法...")
+ tiktok_id = await api.get_tiktok_video_id(_tiktok_url)
+ print("正在测试异步获取TikTok视频数据方法...")
+ await api.get_tiktok_video_data(tiktok_id)
+
+ print("正在测试异步混合解析方法...")
+ await api.hybrid_parsing(_douyin_url)
+ await api.hybrid_parsing(_tiktok_url)
+
+ # 总耗时/Total time
+ total_time = round(time.time() - start_time, 2)
+ print("异步测试完成,总耗时: {}s".format(total_time))
+
+
+if __name__ == "__main__":
+ api = Scraper()
+ # 运行测试
+ # params = "device_platform=webapp&aid=6383&channel=channel_pc_web&aweme_id=7153585499477757192&pc_client_type=1&version_code=190500&version_name=19.5.0&cookie_enabled=true&screen_width=1344&screen_height=756&browser_language=zh-CN&browser_platform=Win32&browser_name=Firefox&browser_version=110.0&browser_online=true&engine_name=Gecko&engine_version=109.0&os_name=Windows&os_version=10&cpu_core_num=16&device_memory=&platform=PC&webid=7158288523463362079"
+ # api.generate_x_bogus(params)
+ douyin_url = "https://v.douyin.com/rLyrQxA/6.66"
+ tiktok_url = "https://vt.tiktok.com/ZSRwWXtdr/"
+ asyncio.run(async_test(_douyin_url=douyin_url, _tiktok_url=tiktok_url))
\ No newline at end of file
diff --git a/app/api/ytdl.py b/app/api/ytdl.py
index f64a486..dc178bb 100755
--- a/app/api/ytdl.py
+++ b/app/api/ytdl.py
@@ -24,7 +24,6 @@ class FakeLogger(object):
class YT_DL(ScraperConfig):
def __init__(self, url):
super().__init__()
- self.set_sauce(url)
self.url = url
self.path = "downloads/" + str(time.time())
self.video_path = self.path + "/v.mp4"
@@ -35,30 +34,42 @@ class YT_DL(ScraperConfig):
"quiet": True,
"logger": FakeLogger(),
"noplaylist": True,
- "format": "best[ext=mp4]",
}
+ self.set_format()
async def download_or_extract(self):
- if "youtu" in self.url:
- if "/playlist" in self.url or "/live" in self.url or os.path.basename(self.url).startswith("@"):
- return
- self._opts["format"] = "bv[ext=mp4][res=480]+ba[ext=m4a]/b[ext=mp4]"
- if "/shorts" in self.url:
- self._opts["format"] = "bv[ext=mp4][res=720]+ba[ext=m4a]/b[ext=mp4]"
-
- yt_obj = yt_dlp.YoutubeDL(self._opts)
-
- info = yt_obj.extract_info(self.url, download=False)
-
- if not info or info.get("duration", 0) >= 300:
+ if self.check_url():
return
+ with yt_dlp.YoutubeDL(self._opts) as yt_obj:
+ info = await asyncio.to_thread(
+ yt_obj.extract_info, self.url, download=False
+ )
- await asyncio.to_thread(yt_obj.download, self.url)
+ if not info or info.get("duration", 0) >= 300:
+ return
- if "youtu" in self.url:
- self.caption = f"""__{info.get("channel","")}__:\n**{info.get("title","")}**"""
+ await asyncio.to_thread(yt_obj.download, self.url)
+
+ if "youtu" in self.url:
+ self.caption = (
+ f"""__{info.get("channel","")}__:\n**{info.get("title","")}**"""
+ )
if os.path.isfile(self.video_path):
self.link = self.video_path
self.thumb = await take_ss(self.video_path, path=self.path)
self.video = self.success = True
+
+ def check_url(self):
+ if "youtu" in self.url and (
+ "/live" in self.url or os.path.basename(self.url).startswith("@")
+ ):
+ return 1
+
+ def set_format(self):
+ if "/shorts" in self.url:
+ self._opts["format"] = "bv[ext=mp4][res=720]+ba[ext=m4a]/b[ext=mp4]"
+ elif "youtu" in self.url:
+ self._opts["format"] = "bv[ext=mp4][res=480]+ba[ext=m4a]/b[ext=mp4]"
+ else:
+ self._opts["format"] = "b[ext=mp4]"
diff --git a/app/config.py b/app/config.py
index e58c1e2..380cf87 100644
--- a/app/config.py
+++ b/app/config.py
@@ -1,14 +1,15 @@
-import os
import json
+import os
+
class Config:
API_KEYS = json.loads(os.environ.get("API_KEYS", "[]"))
BLOCKED_USERS = []
- BLOCKED_USERS_MESSAGE_ID = int(os.environ.get("BLOCKED_USERS_MESSAGE_ID",0))
+ BLOCKED_USERS_MESSAGE_ID = int(os.environ.get("BLOCKED_USERS_MESSAGE_ID", 0))
CHATS = []
- AUTO_DL_MESSAGE_ID = int(os.environ.get("AUTO_DL_MESSAGE_ID",0))
+ AUTO_DL_MESSAGE_ID = int(os.environ.get("AUTO_DL_MESSAGE_ID", 0))
CMD_DICT = {}
@@ -17,4 +18,4 @@ class Config:
TRIGGER = os.environ.get("TRIGGER", ".")
USERS = []
- USERS_MESSAGE_ID = int(os.environ.get("USERS_MESSAGE_ID",0))
\ No newline at end of file
+ USERS_MESSAGE_ID = int(os.environ.get("USERS_MESSAGE_ID", 0))
diff --git a/app/core/MediaHandler.py b/app/core/MediaHandler.py
index 578b9c1..5a36869 100644
--- a/app/core/MediaHandler.py
+++ b/app/core/MediaHandler.py
@@ -4,6 +4,9 @@ import os
import traceback
from urllib.parse import urlparse
+from pyrogram.errors import MediaEmpty, PhotoSaveFileInvalid, WebpageCurlFailed
+from pyrogram.types import InputMediaPhoto, InputMediaVideo
+
from app.api.gallerydl import Gallery_DL
from app.api.instagram import Instagram
from app.api.reddit import Reddit
@@ -11,8 +14,6 @@ from app.api.threads import Threads
from app.api.tiktok import Tiktok
from app.api.ytdl import YT_DL
from app.core import aiohttp_tools, shell
-from pyrogram.errors import MediaEmpty, PhotoSaveFileInvalid, WebpageCurlFailed
-from pyrogram.types import InputMediaPhoto, InputMediaVideo
url_map = {
"tiktok.com": Tiktok,
@@ -29,32 +30,48 @@ url_map = {
class ExtractAndSendMedia:
def __init__(self, message):
- self.exceptions, self.media_objects = [], []
+ self.exceptions, self.media_objects, self.sender_dict = [], [], {}
self.__client = message._client
self.message = message
self.doc = "-d" in message.flags
self.spoiler = "-s" in message.flags
- self.sender = "" if "-ns" in message.flags else f"\nShared by : {self.extract_sender()}"
self.args_ = {
"chat_id": self.message.chat.id,
"reply_to_message_id": message.reply_id,
}
- def extract_sender(self):
+ def get_sender(self, reply=False):
+ if "-ns" in self.message.flags:
+ return ""
+ text = f"\nShared by : "
author = self.message.author_signature
sender = user.first_name if (user := self.message.from_user) else ""
- return author or sender
+ reply_sender = ""
+ if reply:
+ reply_msg = self.message.replied
+ reply_sender = (
+ reply_user.first_name if (reply_user := reply_msg.from_user) else ""
+ )
+ if any((author, sender, reply_sender)):
+ return text + (author or sender if not reply else reply_sender)
+ else:
+ return ""
async def get_media(self):
async with asyncio.TaskGroup() as task_group:
tasks = []
- for link in self.message.get_text_list:
+ text_list = self.message.text_list
+ reply_text_list = self.message.reply_text_list
+ for link in text_list + reply_text_list:
+ reply = link in reply_text_list
if match := url_map.get(urlparse(link).netloc):
tasks.append(task_group.create_task(match.start(link)))
+ self.sender_dict[link] = self.get_sender(reply=reply)
else:
for key, val in url_map.items():
if key in link:
tasks.append(task_group.create_task(val.start(link)))
+ self.sender_dict[link] = self.get_sender(reply=reply)
self.media_objects = [task.result() for task in tasks if task.result()]
async def send_media(self):
@@ -62,7 +79,9 @@ class ExtractAndSendMedia:
if "-nc" in self.message.flags:
caption = ""
else:
- caption = obj.caption + obj.caption_url + self.sender
+ caption = (
+ obj.caption + obj.caption_url + self.sender_dict[obj.query_url]
+ )
try:
if self.doc:
await self.send_document(obj.link, caption=caption, path=obj.path)
@@ -70,14 +89,14 @@ class ExtractAndSendMedia:
await self.send_group(obj, caption=caption)
elif obj.photo:
await self.send(
- media={"photo":obj.link},
+ media={"photo": obj.link},
method=self.__client.send_photo,
caption=caption,
has_spoiler=self.spoiler,
)
elif obj.video:
await self.send(
- media={"video":obj.link},
+ media={"video": obj.link},
method=self.__client.send_video,
thumb=await aiohttp_tools.thumb_dl(obj.thumb),
caption=caption,
@@ -85,14 +104,16 @@ class ExtractAndSendMedia:
)
elif obj.gif:
await self.send(
- media={"animation":obj.link},
+ media={"animation": obj.link},
method=self.__client.send_animation,
caption=caption,
has_spoiler=self.spoiler,
unsave=True,
)
except BaseException:
- self.exceptions.append("\n".join([obj.caption_url.strip(), traceback.format_exc()]))
+ self.exceptions.append(
+ "\n".join([obj.caption_url.strip(), traceback.format_exc()])
+ )
async def send(self, media, method, **kwargs):
try:
@@ -103,20 +124,28 @@ class ExtractAndSendMedia:
media[key] = await aiohttp_tools.in_memory_dl(value)
await method(**media, **self.args_, **kwargs)
except PhotoSaveFileInvalid:
- await self.__client.send_document(**self.args_, document=media, caption=caption, force_document=True)
+ await self.__client.send_document(
+ **self.args_, document=media, caption=caption, force_document=True
+ )
async def send_document(self, docs, caption, path=""):
if not path:
- docs = await asyncio.gather(*[aiohttp_tools.in_memory_dl(doc) for doc in docs])
+ docs = await asyncio.gather(
+ *[aiohttp_tools.in_memory_dl(doc) for doc in docs]
+ )
else:
[os.rename(file_, file_ + ".png") for file_ in glob.glob(f"{path}/*.webp")]
docs = glob.glob(f"{path}/*")
for doc in docs:
try:
- await self.__client.send_document(**self.args_, document=doc, caption=caption, force_document=True)
+ await self.__client.send_document(
+ **self.args_, document=doc, caption=caption, force_document=True
+ )
except (MediaEmpty, WebpageCurlFailed):
doc = await aiohttp_tools.in_memory_dl(doc)
- await self.__client.send_document(**self.args_, document=doc, caption=caption, force_document=True)
+ await self.__client.send_document(
+ **self.args_, document=doc, caption=caption, force_document=True
+ )
await asyncio.sleep(0.5)
async def send_group(self, media, caption):
@@ -128,7 +157,13 @@ class ExtractAndSendMedia:
if isinstance(data, list):
await self.__client.send_media_group(**self.args_, media=data)
else:
- await self.send(media={"animation": data}, method=self.__client.send_animation, caption=caption, has_spoiler=self.spoiler, unsave=True)
+ await self.send(
+ media={"animation": data},
+ method=self.__client.send_animation,
+ caption=caption,
+ has_spoiler=self.spoiler,
+ unsave=True,
+ )
await asyncio.sleep(1)
async def sort_media_path(self, path, caption):
@@ -136,24 +171,34 @@ class ExtractAndSendMedia:
images, videos, animations = [], [], []
for file in glob.glob(f"{path}/*"):
if file.lower().endswith((".png", ".jpg", ".jpeg")):
- images.append(InputMediaPhoto(file, caption=caption, has_spoiler=self.spoiler))
+ images.append(
+ InputMediaPhoto(file, caption=caption, has_spoiler=self.spoiler)
+ )
if file.lower().endswith((".mp4", ".mkv", ".webm")):
has_audio = await shell.check_audio(file)
if not has_audio:
animations.append(file)
else:
- videos.append(InputMediaVideo(file, caption=caption, has_spoiler=self.spoiler))
+ videos.append(
+ InputMediaVideo(file, caption=caption, has_spoiler=self.spoiler)
+ )
return await self.make_chunks(images, videos, animations)
async def sort_media_urls(self, urls, caption):
images, videos, animations = [], [], []
- downloads = await asyncio.gather(*[aiohttp_tools.in_memory_dl(url) for url in urls])
+ downloads = await asyncio.gather(
+ *[aiohttp_tools.in_memory_dl(url) for url in urls]
+ )
for file_obj in downloads:
name = file_obj.name.lower()
if name.endswith((".png", ".jpg", ".jpeg")):
- images.append(InputMediaPhoto(file_obj, caption=caption, has_spoiler=self.spoiler))
+ images.append(
+ InputMediaPhoto(file_obj, caption=caption, has_spoiler=self.spoiler)
+ )
if name.endswith((".mp4", ".mkv", ".webm")):
- videos.append(InputMediaVideo(file_obj, caption=caption, has_spoiler=self.spoiler))
+ videos.append(
+ InputMediaVideo(file_obj, caption=caption, has_spoiler=self.spoiler)
+ )
if name.endswith(".gif"):
animations.append(file_obj)
return await self.make_chunks(images, videos, animations)
diff --git a/app/core/aiohttp_tools.py b/app/core/aiohttp_tools.py
index cfb17bf..49c09ef 100755
--- a/app/core/aiohttp_tools.py
+++ b/app/core/aiohttp_tools.py
@@ -15,9 +15,17 @@ async def session_switch():
await SESSION.close()
-async def get_json(url: str, headers: dict = None, params: dict = None, json_: bool = False, timeout: int = 10):
+async def get_json(
+ url: str,
+ headers: dict = None,
+ params: dict = None,
+ json_: bool = False,
+ timeout: int = 10,
+):
try:
- async with SESSION.get(url=url, headers=headers, params=params, timeout=timeout) as ses:
+ async with SESSION.get(
+ url=url, headers=headers, params=params, timeout=timeout
+ ) as ses:
if json_:
ret_json = await ses.json()
else:
diff --git a/app/core/client.py b/app/core/client.py
index ae5a06a..209fe67 100644
--- a/app/core/client.py
+++ b/app/core/client.py
@@ -11,17 +11,15 @@ from pyrogram.enums import ParseMode
from app import Config
from app.core import aiohttp_tools
-
from app.core.message import Message
class BOT(Client):
-
def __init__(self):
if string := os.environ.get("STRING_SESSION"):
- mode_arg = { "session_string": string }
+ mode_arg = {"session_string": string}
else:
- mode_arg = { "bot_token": os.environ.get("BOT_TOKEN") }
+ mode_arg = {"bot_token": os.environ.get("BOT_TOKEN")}
super().__init__(
name="bot",
**mode_arg,
@@ -60,7 +58,11 @@ class BOT(Client):
restart_chat = os.environ.get("RESTART_CHAT")
if restart_msg and restart_chat:
await super().get_chat(int(restart_chat))
- await super().edit_message_text(chat_id=int(restart_chat), message_id=int(restart_msg), text="#Social-dl\n__Started__")
+ await super().edit_message_text(
+ chat_id=int(restart_chat),
+ message_id=int(restart_msg),
+ text="#Social-dl\n__Started__",
+ )
os.environ.pop("RESTART_MSG", "")
os.environ.pop("RESTART_CHAT", "")
@@ -70,10 +72,17 @@ class BOT(Client):
py_name = name.replace("/", ".")
importlib.import_module(py_name)
- async def log(self, text, chat=None, func=None, name="log.txt",disable_web_page_preview=True):
+ async def log(
+ self, text, chat=None, func=None, name="log.txt", disable_web_page_preview=True
+ ):
if chat or func:
text = f"Function: {func}\nChat: {chat}\nTraceback:\n{text}"
- return await self.send_message(chat_id=Config.LOG_CHAT, text=text, name=name, disable_web_page_preview=disable_web_page_preview)
+ return await self.send_message(
+ chat_id=Config.LOG_CHAT,
+ text=text,
+ name=name,
+ disable_web_page_preview=disable_web_page_preview,
+ )
async def restart(self):
await aiohttp_tools.session_switch()
@@ -86,16 +95,23 @@ class BOT(Client):
users = Config.USERS_MESSAGE_ID
if chats_id:
- Config.CHATS = json.loads((await super().get_messages(Config.LOG_CHAT, chats_id)).text)
+ Config.CHATS = json.loads(
+ (await super().get_messages(Config.LOG_CHAT, chats_id)).text
+ )
if blocked_id:
- Config.BLOCKED_USERS = json.loads((await super().get_messages(Config.LOG_CHAT, blocked_id)).text)
+ Config.BLOCKED_USERS = json.loads(
+ (await super().get_messages(Config.LOG_CHAT, blocked_id)).text
+ )
if users:
- Config.USERS = json.loads((await super().get_messages(Config.LOG_CHAT, users)).text)
-
+ Config.USERS = json.loads(
+ (await super().get_messages(Config.LOG_CHAT, users)).text
+ )
async def send_message(self, chat_id, text, name: str = "output.txt", **kwargs):
if len(str(text)) < 4096:
- return Message.parse_message((await super().send_message(chat_id=chat_id, text=text, **kwargs)))
+ return Message.parse_message(
+ (await super().send_message(chat_id=chat_id, text=text, **kwargs))
+ )
doc = BytesIO(bytes(text, encoding="utf-8"))
doc.name = name
kwargs.pop("disable_web_page_preview", "")
diff --git a/app/core/filters.py b/app/core/filters.py
index 83111d4..5a83f82 100644
--- a/app/core/filters.py
+++ b/app/core/filters.py
@@ -15,7 +15,7 @@ def Dynamic_Chat_Filter(_, __, message):
):
return False
user = message.from_user
- if user and ( user.id in Config.BLOCKED_USERS or user.is_bot ):
+ if user and (user.id in Config.BLOCKED_USERS or user.is_bot):
return False
url_check = check_for_urls(message.text.split())
return bool(url_check)
diff --git a/app/core/message.py b/app/core/message.py
index 1b98829..b13cab2 100755
--- a/app/core/message.py
+++ b/app/core/message.py
@@ -1,58 +1,102 @@
+import asyncio
+from functools import cached_property
+
from pyrogram.types import Message as MSG
+from app import Config
+
class Message(MSG):
def __init__(self, message):
- self.flags = []
- self.input, self.flt_input = "", ""
- self.replied, self.reply_id = None, None
super().__dict__.update(message.__dict__)
- self.set_reply_properties()
- self.flags_n_input()
- self.set_flt_input()
- @property
- def get_text_list(self):
- text_list = self.text.split()
- if self.replied and (reply_text := self.replied.text) and "dl" in text_list[0]:
- text_list.extend(reply_text.split())
- return text_list
+ @cached_property
+ def text_list(self):
+ return self.text.split()
- def set_reply_properties(self):
- if replied := self.reply_to_message:
- self.replied = replied
- self.reply_id = replied.id
+ @cached_property
+ def reply_text_list(self):
+ reply_text_list = []
+ if (
+ self.replied
+ and (reply_text := self.replied.text)
+ and "dl" in self.text_list[0]
+ ):
+ reply_text_list = self.replied.text_list
+ return reply_text_list
- def flags_n_input(self):
- self.flags = [i for i in self.text.split() if i.startswith("-") ]
- split_cmd_str = self.text.split(maxsplit=1)
- if len(split_cmd_str) > 1:
- self.input = split_cmd_str[-1]
+ @cached_property
+ def cmd(self):
+ raw_cmd = self.text_list[0]
+ cmd = raw_cmd.lstrip(Config.TRIGGER)
+ return cmd if cmd in Config.CMD_DICT else None
- def set_flt_input(self):
- line_split = self.input.splitlines()
- split_n_joined =[
- " ".join([word for word in line.split(" ") if word not in self.flags])
- for line in line_split
+ @cached_property
+ def flags(self):
+ return [i for i in self.text_list if i.startswith("-")]
+
+ @cached_property
+ def input(self):
+ if len(self.text_list) > 1:
+ return self.text.split(maxsplit=1)[-1]
+ return ""
+
+ @cached_property
+ def flt_input(self):
+ split_lines = self.input.splitlines()
+ split_n_joined = [
+ " ".join([word for word in line.split(" ") if word not in self.flags])
+ for line in split_lines
]
- self.flt_input = "\n".join(split_n_joined)
+ return "\n".join(split_n_joined)
- async def reply(self, text, **kwargs):
- return await self._client.send_message(chat_id=self.chat.id, text=text, reply_to_message_id=self.id, **kwargs)
+ @cached_property
+ def replied(self):
+ if self.reply_to_message:
+ return Message.parse_message(self.reply_to_message)
- async def edit(self, text, **kwargs):
+ @cached_property
+ def reply_id(self):
+ return self.replied.id if self.replied else None
+
+ async def reply(self, text, del_in: int = 0, block=True, **kwargs):
+ task = self._client.send_message(
+ chat_id=self.chat.id, text=text, reply_to_message_id=self.id, **kwargs
+ )
+ if del_in:
+ await self.async_deleter(task=task, del_in=del_in, block=block)
+ else:
+ return await task
+
+ async def edit(self, text, del_in: int = 0, block=True, **kwargs):
if len(str(text)) < 4096:
kwargs.pop("name", "")
- await self.edit_text(text, **kwargs)
+ task = self.edit_text(text, **kwargs)
+ if del_in:
+ reply = await self.async_deleter(task=task, del_in=del_in, block=block)
+ else:
+ reply = await task
else:
- await super().delete()
- return await self.reply(text, **kwargs)
+ _, reply = await asyncio.gather(
+ super().delete(), self.reply(text, **kwargs)
+ )
+ return reply
async def delete(self, reply=False):
await super().delete()
if reply and self.replied:
await self.replied.delete()
+ async def async_deleter(self, del_in, task, block):
+ if block:
+ x = await task
+ await asyncio.sleep(del_in)
+ await x.delete()
+ else:
+ asyncio.create_task(
+ self.async_deleter(del_in=del_in, task=task, block=True)
+ )
+
@classmethod
def parse_message(cls, message):
ret_obj = cls(message)
diff --git a/app/core/scraper_config.py b/app/core/scraper_config.py
index 5f83ef9..d2b5a87 100644
--- a/app/core/scraper_config.py
+++ b/app/core/scraper_config.py
@@ -14,12 +14,14 @@ class ScraperConfig:
self.group = False
self.gif = False
- def set_sauce(self, url):
- self.caption_url = f"\n\nSauce"
+ def set_sauce(self):
+ self.caption_url = f"\n\nSauce"
@classmethod
async def start(cls, url):
obj = cls(url=url)
+ obj.query_url = url
+ obj.set_sauce()
await obj.download_or_extract()
if obj.success:
return obj
diff --git a/app/core/shell.py b/app/core/shell.py
index eb3989d..2375131 100755
--- a/app/core/shell.py
+++ b/app/core/shell.py
@@ -4,18 +4,24 @@ import os
async def take_ss(video: str, path: str):
thumb = f"{path}/i.png"
- await run_shell_cmd(f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{thumb}"''')
+ await run_shell_cmd(
+ f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{thumb}"'''
+ )
if os.path.isfile(thumb):
return thumb
async def check_audio(file):
- result = await run_shell_cmd(f"ffprobe -v error -show_entries format=nb_streams -of default=noprint_wrappers=1:nokey=1 {file}")
+ result = await run_shell_cmd(
+ f"ffprobe -v error -show_entries format=nb_streams -of default=noprint_wrappers=1:nokey=1 {file}"
+ )
return int(result or 0) - 1
async def run_shell_cmd(cmd):
- proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)
+ proc = await asyncio.create_subprocess_shell(
+ cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
+ )
stdout, _ = await proc.communicate()
return stdout.decode("utf-8")
@@ -41,7 +47,11 @@ class AsyncShell:
@classmethod
async def run_cmd(cls, cmd):
# Create Subprocess and initialise self using cls
- sub_process = cls(process=await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT))
+ sub_process = cls(
+ process=await asyncio.create_subprocess_shell(
+ cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
+ )
+ )
# Start Checking output but don't block code by awaiting it.
asyncio.create_task(sub_process.get_output())
# Sleep for a short time to let previous task start
diff --git a/app/plugins/authorise.py b/app/plugins/authorise.py
index b5a882e..bb4323f 100644
--- a/app/plugins/authorise.py
+++ b/app/plugins/authorise.py
@@ -61,8 +61,8 @@ async def add_sudo(bot, message):
config_list=Config.USERS,
message_id=Config.USERS_MESSAGE_ID,
):
- return await message.reply(err)
- await message.reply("User Added to Authorised List.")
+ return await message.reply(err, del_in=5)
+ await message.reply("User Added to Authorised List.", del_in=5)
@bot.add_cmd(cmd="delsudo")
@@ -77,8 +77,8 @@ async def add_sudo(bot, message):
config_list=Config.USERS,
message_id=Config.USERS_MESSAGE_ID,
):
- return await message.reply(err)
- await message.reply("User Removed from Authorised List.")
+ return await message.reply(err, del_in=5)
+ await message.reply("User Removed from Authorised List.", del_in=5)
@bot.add_cmd(cmd="addchat")
@@ -93,8 +93,11 @@ async def add_chat(bot, message):
config_list=Config.CHATS,
message_id=Config.AUTO_DL_MESSAGE_ID,
):
- return await message.reply(err)
- await message.reply(f"{message.chat.title or message.chat.first_name} Added to Authorised List.")
+ return await message.reply(err, del_in=5)
+ await message.reply(
+ f"{message.chat.title or message.chat.first_name} Added to Authorised List.",
+ del_in=5,
+ )
@bot.add_cmd(cmd="delchat")
@@ -109,9 +112,11 @@ async def add_chat(bot, message):
config_list=Config.CHATS,
message_id=Config.AUTO_DL_MESSAGE_ID,
):
- return await message.reply(err)
- await message.reply(f"{message.chat.title or message.chat.first_name} Added Removed from Authorised List.")
-
+ return await message.reply(err, del_in=5)
+ await message.reply(
+ f"{message.chat.title or message.chat.first_name} Added Removed from Authorised List.",
+ del_in=5,
+ )
@bot.add_cmd(cmd="block")
@@ -126,8 +131,8 @@ async def add_sudo(bot, message):
config_list=Config.BLOCKED_USERS,
message_id=Config.BLOCKED_USERS_MESSAGE_ID,
):
- return await message.reply(err)
- await message.reply("User Added to Ban List.")
+ return await message.reply(err, del_in=5)
+ await message.reply("User Added to Ban List.", del_in=5)
@bot.add_cmd(cmd="unblock")
@@ -142,8 +147,5 @@ async def add_sudo(bot, message):
config_list=Config.BLOCKED_USERS,
message_id=Config.BLOCKED_USERS_MESSAGE_ID,
):
- return await message.reply(err)
- await message.reply("User Removed from Ban List.")
-
-
-
+ return await message.reply(err, del_in=5)
+ await message.reply("User Removed from Ban List.", del_in=5)
diff --git a/app/plugins/bot.py b/app/plugins/bot.py
index a49cf76..465099c 100755
--- a/app/plugins/bot.py
+++ b/app/plugins/bot.py
@@ -8,14 +8,23 @@ from app import Config, bot
@bot.add_cmd(cmd="bot")
async def info(bot, message):
head = "Social-DL is running."
- chat_count = f"\nAuto-Dl enabled in: {len(Config.CHATS)}
chats\n"
+ chat_count = (
+ f"\nAuto-Dl enabled in: {len(Config.CHATS)}
chats\n"
+ )
supported_sites, photo = await bot.get_messages("Social_DL", [2, 3])
- await photo.copy(message.chat.id, caption="\n".join([head, chat_count, supported_sites.text.html]))
+ await photo.copy(
+ message.chat.id,
+ caption="\n".join([head, chat_count, supported_sites.text.html]),
+ )
+
@bot.add_cmd(cmd="help")
async def help(bot, message):
- commands = "\n".join([ f"{Config.TRIGGER}{i}
" for i in Config.CMD_DICT.keys()])
- await message.reply(f"Available Commands:\n\n{commands}")
+ commands = "\n".join(
+ [f"{Config.TRIGGER}{i}
" for i in Config.CMD_DICT.keys()]
+ )
+ await message.reply(f"Available Commands:\n\n{commands}", del_in=30)
+
@bot.add_cmd(cmd="restart")
async def restart(bot, message):
@@ -29,4 +38,4 @@ async def restart(bot, message):
@bot.add_cmd(cmd="update")
async def chat_update(bot, message):
await bot.set_filter_list()
- await message.reply("Filters Refreshed")
\ No newline at end of file
+ await message.reply("Filters Refreshed", del_in=10)
diff --git a/app/plugins/dev_tools.py b/app/plugins/dev_tools.py
new file mode 100644
index 0000000..95a0726
--- /dev/null
+++ b/app/plugins/dev_tools.py
@@ -0,0 +1,109 @@
+import asyncio
+import importlib
+import sys
+import traceback
+from io import StringIO
+
+from pyrogram.enums import ParseMode
+
+from app import Config
+from app.core import shell
+
+from app.core import aiohttp_tools as aio # isort:skip
+
+
+# Run shell commands
+async def run_cmd(bot, message):
+ cmd = message.input.strip()
+ status_ = await message.reply("executing...")
+ proc_stdout = await shell.run_shell_cmd(cmd)
+ output = f"`${cmd}`\n\n`{proc_stdout}`"
+ return await status_.edit(output, name="sh.txt", disable_web_page_preview=True)
+
+
+# Shell but Live Output
+async def live_shell(bot, message):
+ cmd = message.input.strip()
+ sub_process = await shell.AsyncShell.run_cmd(cmd)
+ reply = await message.reply("`getting live output....`")
+ output = ""
+ sleep_for = 1
+ while sub_process.is_not_completed:
+ # Edit message only when there's new output.
+ if output != sub_process.full_std:
+ output = sub_process.full_std
+ if len(output) <= 4096:
+ await reply.edit(
+ f"`{output}`",
+ disable_web_page_preview=True,
+ parse_mode=ParseMode.MARKDOWN,
+ )
+ # Reset sleep duration
+ if sleep_for >= 5:
+ sleep_for = 1
+ # Sleep to Unblock running loop and let output reader read new
+ # output.
+ await asyncio.sleep(sleep_for)
+ sleep_for += 1
+ # If the subprocess is finished edit the message with cmd and full
+ # output
+ return await reply.edit(
+ f"`$ {cmd}\n\n``{sub_process.full_std}`",
+ name="shell.txt",
+ disable_web_page_preview=True,
+ )
+
+
+# Run Python code
+async def executor(bot, message):
+ code = message.flt_input.strip()
+ if not code:
+ return await message.reply("exec Jo mama?")
+ reply = await message.reply("executing")
+ sys.stdout = codeOut = StringIO()
+ sys.stderr = codeErr = StringIO()
+ # Indent code as per proper python syntax
+ formatted_code = "\n ".join(code.splitlines())
+ try:
+ # Create and initialise the function
+ exec(f"async def _exec(bot, message):\n {formatted_code}")
+ func_out = await locals()["_exec"](bot, message)
+ except BaseException:
+ func_out = str(traceback.format_exc())
+ sys.stdout = sys.__stdout__
+ sys.stderr = sys.__stderr__
+ output = f"`{codeOut.getvalue().strip() or codeErr.getvalue().strip() or func_out}`"
+ if "-s" not in message.flags:
+ output = f"> `{code}`\n\n>> {output}"
+ return await reply.edit(
+ output,
+ name="exec.txt",
+ disable_web_page_preview=True,
+ parse_mode=ParseMode.MARKDOWN,
+ )
+
+
+async def loader(bot, message):
+ if (
+ not message.replied
+ or not message.replied.document
+ or not message.replied.document.file_name.endswith(".py")
+ ):
+ return await message.reply("reply to a plugin.")
+ reply = await message.reply("Loading....")
+ file_name = message.replied.document.file_name.rstrip(".py")
+ reload = sys.modules.pop(f"app.temp.{file_name}", None)
+ status = "Reloaded" if reload else "Loaded"
+ await message.replied.download("app/temp/")
+ try:
+ importlib.import_module(f"app.temp.{file_name}")
+ except BaseException:
+ return await reply.edit(str(traceback.format_exc()))
+ await reply.edit(f"{status} {file_name}.py.")
+
+
+if Config.DEV_MODE:
+ Config.CMD_DICT["sh"] = run_cmd
+ Config.CMD_DICT["shell"] = live_shell
+ Config.CMD_DICT["exec"] = executor
+ Config.CMD_DICT["load"] = loader
diff --git a/app/plugins/loader.py b/app/plugins/loader.py
deleted file mode 100644
index 6649ba3..0000000
--- a/app/plugins/loader.py
+++ /dev/null
@@ -1,24 +0,0 @@
-import importlib
-import sys
-import traceback
-
-from app import bot
-
-@bot.add_cmd(cmd="load")
-async def loader(bot, message):
- if (
- not message.replied
- or not message.replied.document
- or not message.replied.document.file_name.endswith(".py")
- ):
- return await message.reply("reply to a plugin.")
- reply = await message.reply("Loading....")
- file_name = message.replied.document.file_name.rstrip(".py")
- reload = sys.modules.pop(f"app.temp.{file_name}", None)
- status = "Reloaded" if reload else "Loaded"
- plugin = await message.replied.download("app/temp/")
- try:
- importlib.import_module(f"app.temp.{file_name}")
- except BaseException:
- return await reply.edit(str(traceback.format_exc()))
- await reply.edit(f"{status} {file_name}.py.")
diff --git a/app/plugins/song.py b/app/plugins/song.py
index 346a10c..ebc3d08 100644
--- a/app/plugins/song.py
+++ b/app/plugins/song.py
@@ -5,11 +5,19 @@ from time import time
from urllib.parse import urlparse
import yt_dlp
+
from app import bot
from app.api.ytdl import FakeLogger
from app.core.aiohttp_tools import in_memory_dl
-domains = ["www.youtube.com", "youtube.com", "m.youtube.com", "youtu.be", "www.youtube-nocookie.com", "music.youtube.com"]
+domains = [
+ "www.youtube.com",
+ "youtube.com",
+ "m.youtube.com",
+ "youtu.be",
+ "www.youtube-nocookie.com",
+ "music.youtube.com",
+]
@bot.add_cmd(cmd="song")
@@ -36,7 +44,11 @@ async def song_dl(bot, message):
"logger": FakeLogger(),
"outtmpl": dl_path + "%(title)s.%(ext)s",
"format": "bestaudio",
- "postprocessors": [{"key": "FFmpegExtractAudio", "preferredcodec": aformat}, {"key": "FFmpegMetadata"}, {"key": "EmbedThumbnail"}],
+ "postprocessors": [
+ {"key": "FFmpegExtractAudio", "preferredcodec": aformat},
+ {"key": "FFmpegMetadata"},
+ {"key": "EmbedThumbnail"},
+ ],
}
ytdl = yt_dlp.YoutubeDL(yt_opts)
yt_info = await asyncio.to_thread(ytdl.extract_info, query_or_search)
@@ -51,6 +63,11 @@ async def song_dl(bot, message):
await response.edit("Uploading....")
for audio_file in down_path:
if audio_file.endswith((".opus", ".mp3")):
- await message.reply_audio(audio=audio_file, duration=int(duration), performer=str(artist), thumb=thumb)
+ await message.reply_audio(
+ audio=audio_file,
+ duration=int(duration),
+ performer=str(artist),
+ thumb=thumb,
+ )
await response.delete()
- shutil.rmtree(dl_path, ignore_errors=True)
\ No newline at end of file
+ shutil.rmtree(dl_path, ignore_errors=True)
diff --git a/app/plugins/tgUtils.py b/app/plugins/tgUtils.py
index 51dc8f1..82a71c9 100644
--- a/app/plugins/tgUtils.py
+++ b/app/plugins/tgUtils.py
@@ -23,7 +23,9 @@ async def purge_(bot, message: Message):
start_message = reply.id
end_message = message.id
messages = [end_message] + [i for i in range(int(start_message), int(end_message))]
- await bot.delete_messages(chat_id=message.chat.id, message_ids=messages, revoke=True)
+ await bot.delete_messages(
+ chat_id=message.chat.id, message_ids=messages, revoke=True
+ )
@bot.add_cmd(cmd="ids")
@@ -70,4 +72,9 @@ async def leave_chat(bot, message):
@bot.add_cmd(cmd="reply")
async def reply(bot, message):
text = message.input
- await bot.send_message(chat_id=message.chat.id, text=text, reply_to_message_id=message.reply_id, disable_web_page_preview=True)
+ await bot.send_message(
+ chat_id=message.chat.id,
+ text=text,
+ reply_to_message_id=message.reply_id,
+ disable_web_page_preview=True,
+ )
diff --git a/app/plugins/tools.py b/app/plugins/tools.py
index bd09943..37d541f 100644
--- a/app/plugins/tools.py
+++ b/app/plugins/tools.py
@@ -1,76 +1,19 @@
-import asyncio
-import sys
-import traceback
-from io import StringIO
-
-from pyrogram.enums import ParseMode
-
-from app import Config
-from app.core import shell
-from app.core import aiohttp_tools as aio # isort:skip
-
-# Run shell commands
-
-async def run_cmd(bot, message):
- cmd = message.input.strip()
- status_ = await message.reply("executing...")
- proc_stdout = await shell.run_shell_cmd(cmd)
- output = f"`${cmd}`\n\n`{proc_stdout}`"
- return await status_.edit(output, name="sh.txt", disable_web_page_preview=True)
+from app import bot
+from app.social_dl import current_tasks
-# Shell but Live Output
-
-
-async def live_shell(bot, message):
- cmd = message.input.strip()
- sub_process = await shell.AsyncShell.run_cmd(cmd)
- reply = await message.reply("`getting live output....`")
- output = ""
- sleep_for = 1
- while sub_process.is_not_completed:
- # Edit message only when there's new output.
- if output != sub_process.full_std:
- output = sub_process.full_std
- if len(output) <= 4096:
- await reply.edit(f"`{output}`", disable_web_page_preview=True, parse_mode=ParseMode.MARKDOWN)
- # Reset sleep duration
- if sleep_for >= 5:
- sleep_for = 1
- # Sleep to Unblock running loop and let output reader read new
- # output.
- await asyncio.sleep(sleep_for)
- sleep_for += 1
- # If the subprocess is finished edit the message with cmd and full
- # output
- return await reply.edit(f"`$ {cmd}\n\n``{sub_process.full_std}`", name="shell.txt", disable_web_page_preview=True)
-
-
-# Run Python code
-async def executor_(bot, message):
- code = message.flt_input.strip()
- if not code:
- return await message.reply("exec Jo mama?")
- reply = await message.reply("executing")
- sys.stdout = codeOut = StringIO()
- sys.stderr = codeErr = StringIO()
- # Indent code as per proper python syntax
- formatted_code = "\n ".join(code.splitlines())
- try:
- # Create and initialise the function
- exec(f"async def _exec(bot, message):\n {formatted_code}")
- func_out = await locals().get("_exec")(bot, message)
- except BaseException:
- func_out = str(traceback.format_exc())
- sys.stdout = sys.__stdout__
- sys.stderr = sys.__stderr__
- output = f"`{codeOut.getvalue().strip() or codeErr.getvalue().strip() or func_out}`"
- if "-s" not in message.flags:
- output = f"> `{code}`\n\n>> {output}"
- return await reply.edit(output, name="exec.txt", disable_web_page_preview=True,parse_mode=ParseMode.MARKDOWN)
-
-
-if Config.DEV_MODE:
- Config.CMD_DICT["sh"] = run_cmd
- Config.CMD_DICT["shell"] = live_shell
- Config.CMD_DICT["exec"] = executor_
+@bot.add_cmd(cmd="cancel")
+async def cancel_task(bot, message):
+ task_id = message.reply_id
+ if not task_id:
+ return await message.reply("Reply To a Command or Bot's Response Message.")
+ task = current_tasks.get(task_id)
+ if not task:
+ return await message.reply("Task not in Currently Running Tasks.")
+ reply = await message.reply("Cancelling....")
+ cancelled = task.cancel()
+ if cancelled:
+ response = "Task Cancelled Successfully."
+ else:
+ response = "Task not Running.\nIt either is Finished or has Errored."
+ await reply.edit(response)
diff --git a/app/social_dl.py b/app/social_dl.py
index 449b400..7f7060a 100644
--- a/app/social_dl.py
+++ b/app/social_dl.py
@@ -1,39 +1,73 @@
+import asyncio
import traceback
+
from app import Config, bot
from app.core import filters
from app.core.MediaHandler import ExtractAndSendMedia
from app.core.message import Message
+current_tasks = {}
+
@bot.add_cmd(cmd="dl")
async def dl(bot, message):
- reply = await bot.send_message(chat_id=message.chat.id, text="`trying to download...`")
- media = await ExtractAndSendMedia.process(message)
+ reply = await bot.send_message(
+ chat_id=message.chat.id, text="`trying to download...`"
+ )
+ task = asyncio.Task(ExtractAndSendMedia.process(message))
+ current_tasks[reply.id] = task
+ media = await task
if media.exceptions:
exceptions = "\n".join(media.exceptions)
- await bot.log(text=exceptions, func="DL", chat=message.chat.title or message.chat.first_name, name="traceback.txt")
+ await bot.log(
+ text=exceptions,
+ func="DL",
+ chat=message.chat.title or message.chat.first_name,
+ name="traceback.txt",
+ )
+ current_tasks.pop(reply.id)
return await reply.edit(f"Media Download Failed.")
if media.media_objects:
await message.delete()
await reply.delete()
+ current_tasks.pop(reply.id)
@bot.on_message(filters.user_filter)
@bot.on_edited_message(filters.user_filter)
async def cmd_dispatcher(bot, message):
- func = Config.CMD_DICT[message.text.split(maxsplit=1)[0].lstrip(Config.TRIGGER)]
parsed_message = Message.parse_message(message)
+ func = Config.CMD_DICT[parsed_message.cmd]
try:
- await func(bot, parsed_message)
+ task = asyncio.Task(func(bot, parsed_message))
+ current_tasks[message.id] = task
+ await task
+ except asyncio.exceptions.CancelledError:
+ pass
except BaseException:
- await bot.log(text=str(traceback.format_exc()), chat=message.chat.title or message.chat.first_name, func=func.__name__, name="traceback.txt")
+ await bot.log(
+ text=str(traceback.format_exc()),
+ chat=message.chat.title or message.chat.first_name,
+ func=func.__name__,
+ name="traceback.txt",
+ )
+ current_tasks.pop(message.id)
@bot.on_message(filters.chat_filter)
async def dl_dispatcher(bot, message):
- func = Config.CMD_DICT["dl"]
parsed_message = Message.parse_message(message)
try:
- await func(bot, parsed_message)
+ task = asyncio.Task(dl(bot, parsed_message))
+ current_tasks[message.id] = task
+ await task
+ except asyncio.exceptions.CancelledError:
+ pass
except BaseException:
- await bot.log(text=str(traceback.format_exc()), chat=message.chat.title or message.chat.first_name, func=func.__name__, name="traceback.txt")
+ await bot.log(
+ text=str(traceback.format_exc()),
+ chat=message.chat.title or message.chat.first_name,
+ func=dl.__name__,
+ name="traceback.txt",
+ )
+ current_tasks.pop(message.id)