import asyncio import tempfile import os import logging import glob import json import requests from typing import Optional from mutagen.easyid3 import EasyID3 from mutagen.id3 import ID3, APIC, error logger = logging.getLogger(__name__) async def get_video_info(url: str) -> dict: """Получает информацию о видео через yt-dlp""" try: process = await asyncio.create_subprocess_exec( "yt-dlp", "--dump-json", "--no-playlist", "--cookies", "/addons/download_mp3_to_youtube/cookies.txt", url, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode == 0: return json.loads(stdout.decode()) except Exception as e: logger.warning(f"Не удалось получить информацию о видео: {e}") return None async def download_thumbnail( thumbnail_url: str, ) -> tuple[Optional[bytes], Optional[str]]: """Скачивает обложку видео и возвращает данные и MIME тип""" try: response = requests.get(thumbnail_url, timeout=10) if response.status_code == 200: if "jpeg" in thumbnail_url or "jpg" in thumbnail_url: mime_type = "image/jpeg" elif "png" in thumbnail_url: mime_type = "image/png" else: mime_type = response.headers.get("Content-Type", "image/jpeg") return response.content, mime_type except Exception as e: logger.warning(f"Не удалось скачать обложку: {e}") return None, None def apply_metadata(mp3_path: str, metadata: dict): """Прописывает ID3-теги и обложку в MP3""" try: try: audio = EasyID3(mp3_path) except error: audio = EasyID3() audio.save(mp3_path) audio["title"] = metadata.get("title", "Unknown Title") audio["artist"] = metadata.get("performer", "Unknown Artist") audio.save(mp3_path) if metadata.get("thumbnail_data"): audio = ID3(mp3_path) audio.add( APIC( encoding=3, mime=metadata.get("thumbnail_mime", "image/jpeg"), type=3, # front cover desc="Cover", data=metadata["thumbnail_data"], ) ) audio.save(mp3_path) logger.info("Обложка добавлена в MP3") except Exception as e: logger.warning(f"Не удалось прописать метаданные: {e}") async def download_mp3_isolated(url: str) -> tuple[str, dict]: """Скачивает MP3, добавляет метаданные и возвращает путь к файлу и метаданные""" with tempfile.TemporaryDirectory() as temp_dir: output_template = os.path.join(temp_dir, "audio.%(ext)s") try: logger.info(f"Запускаю yt-dlp для: {url}") video_info = await get_video_info(url) title = "Unknown Title" uploader = "Unknown Artist" thumbnail_url = None duration = 0 if video_info: title = video_info.get("title", "Unknown Title") uploader = video_info.get("uploader", "Unknown Artist") thumbnail_url = video_info.get("thumbnail") if not thumbnail_url and video_info.get("thumbnails"): thumbnails = video_info.get("thumbnails", []) if thumbnails: thumbnail_url = thumbnails[-1].get("url") duration = video_info.get("duration", 0) logger.info(f"Получена информация о видео: {title}") process = await asyncio.create_subprocess_exec( "yt-dlp", "-x", "--audio-format", "mp3", "--cookies", "/addons/download_mp3_to_youtube/cookies.txt", "--audio-quality", "320K", "--no-playlist", "-o", output_template, "--ignore-errors", url, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300) mp3_files = glob.glob(os.path.join(temp_dir, "*.mp3")) if mp3_files: actual_file = mp3_files[0] if os.path.getsize(actual_file) > 1000: with tempfile.NamedTemporaryFile( suffix=".mp3", delete=False ) as final_file: final_filename = final_file.name with ( open(actual_file, "rb") as src, open(final_filename, "wb") as dst, ): dst.write(src.read()) thumbnail_data, mime_type = None, None if thumbnail_url: thumbnail_data, mime_type = await download_thumbnail( thumbnail_url ) if thumbnail_data: logger.info(f"Обложка скачана: {thumbnail_url}") metadata = { "title": title, "performer": uploader, "duration": duration, "thumbnail_data": thumbnail_data, "thumbnail_mime": mime_type, } # Прописываем теги в MP3 apply_metadata(final_filename, metadata) logger.info(f"Успешно скачан и обновлён тегами: {final_filename}") return final_filename, metadata error_msg = stderr.decode() or stdout.decode() or "Файл не создан" raise Exception(f"Ошибка загрузки: {error_msg}") except asyncio.TimeoutError: raise Exception("Таймаут загрузки (5 минут)") except Exception as e: raise e