mirror of
https://github.com/anonymousx97/social-dl.git
synced 2025-02-20 11:13:19 +08:00
What's New: - Sort Plugins into their respective folders. - download, upload, rename commands. - Help Docstrings. - Proper Logging. - rename .exec cmd to .py
72 lines
2.2 KiB
Python
72 lines
2.2 KiB
Python
import asyncio
|
|
import os
|
|
from typing import AsyncIterable
|
|
|
|
|
|
async def run_shell_cmd(cmd: str) -> str:
|
|
proc: asyncio.create_subprocess_shell = await asyncio.create_subprocess_shell(
|
|
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
|
|
)
|
|
stdout, _ = await proc.communicate()
|
|
return stdout.decode("utf-8")
|
|
|
|
|
|
async def take_ss(video: str, path: str) -> None | str:
|
|
thumb = f"{path}/i.png"
|
|
await run_shell_cmd(
|
|
f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{thumb}"'''
|
|
)
|
|
if os.path.isfile(thumb):
|
|
return thumb
|
|
|
|
|
|
async def check_audio(file) -> int:
|
|
result = await run_shell_cmd(
|
|
f"ffprobe -v error -show_entries format=nb_streams -of default=noprint_wrappers=1:nokey=1 {file}"
|
|
)
|
|
return int(result or 0) - 1
|
|
|
|
|
|
async def get_duration(file) -> int:
|
|
duration = await run_shell_cmd(
|
|
f"""ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 {file}"""
|
|
)
|
|
return round(float(duration.strip() or 0))
|
|
|
|
|
|
class AsyncShell:
|
|
def __init__(self, process: asyncio.create_subprocess_shell):
|
|
self.process: asyncio.create_subprocess_shell = process
|
|
self.full_std: str = ""
|
|
self.is_done: bool = False
|
|
self._task: asyncio.Task | None = None
|
|
|
|
async def read_output(self) -> None:
|
|
while True:
|
|
line: str = (await self.process.stdout.readline()).decode("utf-8")
|
|
if not line:
|
|
break
|
|
self.full_std += line
|
|
self.is_done = True
|
|
await self.process.wait()
|
|
|
|
async def get_output(self) -> AsyncIterable:
|
|
while not self.is_done:
|
|
yield self.full_std
|
|
|
|
def cancel(self) -> None:
|
|
if not self.is_done:
|
|
self.process.kill()
|
|
self._task.cancel()
|
|
|
|
@classmethod
|
|
async def run_cmd(cls, cmd: str, name: str = "AsyncShell") -> "AsyncShell":
|
|
sub_process: AsyncShell = cls(
|
|
process=await asyncio.create_subprocess_shell(
|
|
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
|
|
)
|
|
)
|
|
sub_process._task = asyncio.create_task(sub_process.read_output(), name=name)
|
|
await asyncio.sleep(0.5)
|
|
return sub_process
|