Files
myfirstprogram/addons/download_mp3_to_youtube/dowloadmp3_to_youtube.py
T
2025-11-23 23:17:00 +03:00

195 lines
7.5 KiB
Python

import asyncio
import tempfile
import os
import logging
import glob
import json
import requests
from typing import Optional
from mutagen.easyid3 import EasyID3
from mutagen.id3 import ID3, APIC, error
logger = logging.getLogger(__name__)
async def get_video_info(url: str) -> Optional[dict]:
"""Получает информацию о видео через yt-dlp"""
try:
process = await asyncio.create_subprocess_exec(
"yt-dlp",
"--dump-json",
"--no-playlist",
url,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
info = json.loads(stdout.decode())
logger.info(f"Информация получена: {info.get('title', 'Unknown')}")
return info
else:
stderr_msg = stderr.decode()
logger.error(f"yt-dlp ошибка: {stderr_msg}")
except Exception as e:
logger.error(f"Не удалось получить информацию о видео: {e}", exc_info=True)
return None
async def download_thumbnail(
thumbnail_url: str,
) -> tuple[Optional[bytes], Optional[str]]:
"""Скачивает обложку видео и возвращает данные и MIME тип"""
try:
response = requests.get(thumbnail_url, timeout=10)
if response.status_code == 200:
if "jpeg" in thumbnail_url or "jpg" in thumbnail_url:
mime_type = "image/jpeg"
elif "png" in thumbnail_url:
mime_type = "image/png"
else:
mime_type = response.headers.get("Content-Type", "image/jpeg")
return response.content, mime_type
except Exception as e:
logger.warning(f"Не удалось скачать обложку: {e}")
return None, None
def apply_metadata(mp3_path: str, metadata: dict):
"""Прописывает ID3-теги и обложку в MP3"""
try:
# Сначала удаляем старые теги ID3 если есть
try:
ID3(mp3_path).delete()
except Exception as e:
logger.warning(f"Не удалось удалить старые ID3 теги: {e}")
# Добавляем текстовые теги
try:
audio = EasyID3(mp3_path)
except error:
audio = EasyID3()
audio["title"] = metadata.get("title", "Unknown Title")
audio["artist"] = metadata.get("performer", "Unknown Artist")
audio.save(mp3_path, v2_version=4)
logger.info(f"Текстовые теги добавлены: {audio['title']} - {audio['artist']}")
# Добавляем обложку отдельно
if metadata.get("thumbnail_data"):
try:
audio = ID3(mp3_path)
except error:
audio = ID3()
audio.add(
APIC(
encoding=3,
mime=metadata.get("thumbnail_mime", "image/jpeg"),
type=3, # front cover
desc="Cover",
data=metadata["thumbnail_data"],
)
)
audio.save(mp3_path, v2_version=4)
logger.info(f"Обложка добавлена в MP3 ({len(metadata['thumbnail_data'])} байт)")
else:
logger.warning("Данные обложки не найдены в метаданных")
except Exception as e:
logger.error(f"Не удалось прописать метаданные: {e}", exc_info=True)
async def download_mp3_isolated(url: str) -> tuple[str, dict]:
"""Скачивает MP3, добавляет метаданные и возвращает путь к файлу и метаданные"""
with tempfile.TemporaryDirectory() as temp_dir:
output_template = os.path.join(temp_dir, "audio.%(ext)s")
try:
logger.info(f"Запускаю yt-dlp для: {url}")
video_info = await get_video_info(url)
title = "Unknown Title"
uploader = "Unknown Artist"
thumbnail_url = None
duration = 0
if video_info:
title = video_info.get("title", "Unknown Title")
uploader = video_info.get("uploader", "Unknown Artist")
thumbnail_url = video_info.get("thumbnail")
if not thumbnail_url and video_info.get("thumbnails"):
thumbnails = video_info.get("thumbnails", [])
if thumbnails:
thumbnail_url = thumbnails[-1].get("url")
duration = video_info.get("duration", 0)
logger.info(f"Получена информация о видео: title={title}, uploader={uploader}, thumbnail={thumbnail_url is not None}")
else:
logger.warning("video_info = None, используются значения по умолчанию")
process = await asyncio.create_subprocess_exec(
"yt-dlp",
"-x",
"--audio-format",
"mp3",
"--audio-quality",
"320K",
"--no-playlist",
"-o",
output_template,
"--ignore-errors",
url,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300)
mp3_files = glob.glob(os.path.join(temp_dir, "*.mp3"))
if mp3_files:
actual_file = mp3_files[0]
if os.path.getsize(actual_file) > 1000:
with tempfile.NamedTemporaryFile(
suffix=".mp3", delete=False
) as final_file:
final_filename = final_file.name
with (
open(actual_file, "rb") as src,
open(final_filename, "wb") as dst,
):
dst.write(src.read())
thumbnail_data, mime_type = None, None
if thumbnail_url:
thumbnail_data, mime_type = await download_thumbnail(
thumbnail_url
)
if thumbnail_data:
logger.info(f"Обложка скачана: {thumbnail_url}")
metadata = {
"title": title,
"performer": uploader,
"duration": duration,
"thumbnail_data": thumbnail_data,
"thumbnail_mime": mime_type,
}
# Прописываем теги в MP3
apply_metadata(final_filename, metadata)
logger.info(f"Успешно скачан и обновлён тегами: {final_filename}")
return final_filename, metadata
error_msg = stderr.decode() or stdout.decode() or "Файл не создан"
raise Exception(f"Ошибка загрузки: {error_msg}")
except asyncio.TimeoutError:
raise Exception("Таймаут загрузки (5 минут)")
except Exception as e:
logger.error(f"Ошибка при скачивании: {e}", exc_info=True)
raise e