177 lines
6.4 KiB
Python
177 lines
6.4 KiB
Python
import asyncio
|
|
import tempfile
|
|
import os
|
|
import logging
|
|
import glob
|
|
import json
|
|
import requests
|
|
from typing import Optional
|
|
|
|
from mutagen.easyid3 import EasyID3
|
|
from mutagen.id3 import ID3, APIC, error
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def get_video_info(url: str) -> dict:
|
|
"""Получает информацию о видео через yt-dlp"""
|
|
try:
|
|
process = await asyncio.create_subprocess_exec(
|
|
"yt-dlp",
|
|
"--dump-json",
|
|
"--no-playlist",
|
|
"--cookies",
|
|
"/addons/download_mp3_to_youtube/cookies.txt",
|
|
url,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
|
|
stdout, stderr = await process.communicate()
|
|
if process.returncode == 0:
|
|
return json.loads(stdout.decode())
|
|
except Exception as e:
|
|
logger.warning(f"Не удалось получить информацию о видео: {e}")
|
|
return None
|
|
|
|
|
|
async def download_thumbnail(
|
|
thumbnail_url: str,
|
|
) -> tuple[Optional[bytes], Optional[str]]:
|
|
"""Скачивает обложку видео и возвращает данные и MIME тип"""
|
|
try:
|
|
response = requests.get(thumbnail_url, timeout=10)
|
|
if response.status_code == 200:
|
|
if "jpeg" in thumbnail_url or "jpg" in thumbnail_url:
|
|
mime_type = "image/jpeg"
|
|
elif "png" in thumbnail_url:
|
|
mime_type = "image/png"
|
|
else:
|
|
mime_type = response.headers.get("Content-Type", "image/jpeg")
|
|
return response.content, mime_type
|
|
except Exception as e:
|
|
logger.warning(f"Не удалось скачать обложку: {e}")
|
|
return None, None
|
|
|
|
|
|
def apply_metadata(mp3_path: str, metadata: dict):
|
|
"""Прописывает ID3-теги и обложку в MP3"""
|
|
try:
|
|
try:
|
|
audio = EasyID3(mp3_path)
|
|
except error:
|
|
audio = EasyID3()
|
|
audio.save(mp3_path)
|
|
|
|
audio["title"] = metadata.get("title", "Unknown Title")
|
|
audio["artist"] = metadata.get("performer", "Unknown Artist")
|
|
audio.save(mp3_path)
|
|
|
|
if metadata.get("thumbnail_data"):
|
|
audio = ID3(mp3_path)
|
|
audio.add(
|
|
APIC(
|
|
encoding=3,
|
|
mime=metadata.get("thumbnail_mime", "image/jpeg"),
|
|
type=3, # front cover
|
|
desc="Cover",
|
|
data=metadata["thumbnail_data"],
|
|
)
|
|
)
|
|
audio.save(mp3_path)
|
|
logger.info("Обложка добавлена в MP3")
|
|
except Exception as e:
|
|
logger.warning(f"Не удалось прописать метаданные: {e}")
|
|
|
|
|
|
async def download_mp3_isolated(url: str) -> tuple[str, dict]:
|
|
"""Скачивает MP3, добавляет метаданные и возвращает путь к файлу и метаданные"""
|
|
|
|
with tempfile.TemporaryDirectory() as temp_dir:
|
|
output_template = os.path.join(temp_dir, "audio.%(ext)s")
|
|
|
|
try:
|
|
logger.info(f"Запускаю yt-dlp для: {url}")
|
|
|
|
video_info = await get_video_info(url)
|
|
title = "Unknown Title"
|
|
uploader = "Unknown Artist"
|
|
thumbnail_url = None
|
|
duration = 0
|
|
|
|
if video_info:
|
|
title = video_info.get("title", "Unknown Title")
|
|
uploader = video_info.get("uploader", "Unknown Artist")
|
|
thumbnail_url = video_info.get("thumbnail")
|
|
if not thumbnail_url and video_info.get("thumbnails"):
|
|
thumbnails = video_info.get("thumbnails", [])
|
|
if thumbnails:
|
|
thumbnail_url = thumbnails[-1].get("url")
|
|
duration = video_info.get("duration", 0)
|
|
logger.info(f"Получена информация о видео: {title}")
|
|
|
|
process = await asyncio.create_subprocess_exec(
|
|
"yt-dlp",
|
|
"-x",
|
|
"--audio-format",
|
|
"mp3",
|
|
"--cookies",
|
|
"/addons/download_mp3_to_youtube/cookies.txt",
|
|
"--audio-quality",
|
|
"320K",
|
|
"--no-playlist",
|
|
"-o",
|
|
output_template,
|
|
"--ignore-errors",
|
|
url,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
|
|
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300)
|
|
|
|
mp3_files = glob.glob(os.path.join(temp_dir, "*.mp3"))
|
|
if mp3_files:
|
|
actual_file = mp3_files[0]
|
|
if os.path.getsize(actual_file) > 1000:
|
|
with tempfile.NamedTemporaryFile(
|
|
suffix=".mp3", delete=False
|
|
) as final_file:
|
|
final_filename = final_file.name
|
|
|
|
with (
|
|
open(actual_file, "rb") as src,
|
|
open(final_filename, "wb") as dst,
|
|
):
|
|
dst.write(src.read())
|
|
|
|
thumbnail_data, mime_type = None, None
|
|
if thumbnail_url:
|
|
thumbnail_data, mime_type = await download_thumbnail(
|
|
thumbnail_url
|
|
)
|
|
if thumbnail_data:
|
|
logger.info(f"Обложка скачана: {thumbnail_url}")
|
|
|
|
metadata = {
|
|
"title": title,
|
|
"performer": uploader,
|
|
"duration": duration,
|
|
"thumbnail_data": thumbnail_data,
|
|
"thumbnail_mime": mime_type,
|
|
}
|
|
|
|
# Прописываем теги в MP3
|
|
apply_metadata(final_filename, metadata)
|
|
|
|
logger.info(f"Успешно скачан и обновлён тегами: {final_filename}")
|
|
return final_filename, metadata
|
|
|
|
error_msg = stderr.decode() or stdout.decode() or "Файл не создан"
|
|
raise Exception(f"Ошибка загрузки: {error_msg}")
|
|
|
|
except asyncio.TimeoutError:
|
|
raise Exception("Таймаут загрузки (5 минут)")
|
|
except Exception as e:
|
|
raise e
|