It's version 0.4

This commit is contained in:
Niken
2025-10-19 14:28:41 +03:00
parent 772d3d5b83
commit 7b653d4dcc
32 changed files with 775 additions and 326 deletions
+2
View File
@@ -1,7 +1,9 @@
def register(dp, state, bot):
from . import handlers
handlers.register_handlers(dp, state, bot)
def unregister(dp):
# Здесь можно удалить хендлеры, если нужно
dp.message_handlers.handlers.clear()
+56 -40
View File
@@ -5,6 +5,7 @@ import logging
import glob
import json
import requests
from typing import Optional
from mutagen.easyid3 import EasyID3
from mutagen.id3 import ID3, APIC, error
@@ -16,13 +17,14 @@ async def get_video_info(url: str) -> dict:
"""Получает информацию о видео через yt-dlp"""
try:
process = await asyncio.create_subprocess_exec(
'yt-dlp',
'--dump-json',
'--no-playlist',
'--cookies', '~/myfirstprogramm/addons/example_addon/cookies.txt',
"yt-dlp",
"--dump-json",
"--no-playlist",
"--cookies",
"~/myfirstprogramm/addons/example_addon/cookies.txt",
url,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
@@ -33,17 +35,19 @@ async def get_video_info(url: str) -> dict:
return None
async def download_thumbnail(thumbnail_url: str) -> tuple[bytes, str]:
async def download_thumbnail(
thumbnail_url: str,
) -> tuple[Optional[bytes], Optional[str]]:
"""Скачивает обложку видео и возвращает данные и MIME тип"""
try:
response = requests.get(thumbnail_url, timeout=10)
if response.status_code == 200:
if 'jpeg' in thumbnail_url or 'jpg' in thumbnail_url:
mime_type = 'image/jpeg'
elif 'png' in thumbnail_url:
mime_type = 'image/png'
if "jpeg" in thumbnail_url or "jpg" in thumbnail_url:
mime_type = "image/jpeg"
elif "png" in thumbnail_url:
mime_type = "image/png"
else:
mime_type = response.headers.get('Content-Type', 'image/jpeg')
mime_type = response.headers.get("Content-Type", "image/jpeg")
return response.content, mime_type
except Exception as e:
logger.warning(f"Не удалось скачать обложку: {e}")
@@ -59,19 +63,19 @@ def apply_metadata(mp3_path: str, metadata: dict):
audio = EasyID3()
audio.save(mp3_path)
audio['title'] = metadata.get('title', 'Unknown Title')
audio['artist'] = metadata.get('performer', 'Unknown Artist')
audio["title"] = metadata.get("title", "Unknown Title")
audio["artist"] = metadata.get("performer", "Unknown Artist")
audio.save(mp3_path)
if metadata.get('thumbnail_data'):
if metadata.get("thumbnail_data"):
audio = ID3(mp3_path)
audio.add(
APIC(
encoding=3,
mime=metadata.get('thumbnail_mime', 'image/jpeg'),
mime=metadata.get("thumbnail_mime", "image/jpeg"),
type=3, # front cover
desc='Cover',
data=metadata['thumbnail_data']
desc="Cover",
data=metadata["thumbnail_data"],
)
)
audio.save(mp3_path)
@@ -96,27 +100,32 @@ async def download_mp3_isolated(url: str) -> tuple[str, dict]:
duration = 0
if video_info:
title = video_info.get('title', 'Unknown Title')
uploader = video_info.get('uploader', 'Unknown Artist')
thumbnail_url = video_info.get('thumbnail')
if not thumbnail_url and video_info.get('thumbnails'):
thumbnails = video_info.get('thumbnails', [])
title = video_info.get("title", "Unknown Title")
uploader = video_info.get("uploader", "Unknown Artist")
thumbnail_url = video_info.get("thumbnail")
if not thumbnail_url and video_info.get("thumbnails"):
thumbnails = video_info.get("thumbnails", [])
if thumbnails:
thumbnail_url = thumbnails[-1].get('url')
duration = video_info.get('duration', 0)
thumbnail_url = thumbnails[-1].get("url")
duration = video_info.get("duration", 0)
logger.info(f"Получена информация о видео: {title}")
process = await asyncio.create_subprocess_exec(
'yt-dlp',
'-x', '--audio-format', 'mp3',
'--cookies', '~/myfirstprogramm/addons/example_addon/cookies.txt',
'--audio-quality', '320K',
'--no-playlist',
'-o', output_template,
'--ignore-errors',
"yt-dlp",
"-x",
"--audio-format",
"mp3",
"--cookies",
"~/myfirstprogramm/addons/example_addon/cookies.txt",
"--audio-quality",
"320K",
"--no-playlist",
"-o",
output_template,
"--ignore-errors",
url,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300)
@@ -125,24 +134,31 @@ async def download_mp3_isolated(url: str) -> tuple[str, dict]:
if mp3_files:
actual_file = mp3_files[0]
if os.path.getsize(actual_file) > 1000:
with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as final_file:
with tempfile.NamedTemporaryFile(
suffix=".mp3", delete=False
) as final_file:
final_filename = final_file.name
with open(actual_file, 'rb') as src, open(final_filename, 'wb') as dst:
with (
open(actual_file, "rb") as src,
open(final_filename, "wb") as dst,
):
dst.write(src.read())
thumbnail_data, mime_type = None, None
if thumbnail_url:
thumbnail_data, mime_type = await download_thumbnail(thumbnail_url)
thumbnail_data, mime_type = await download_thumbnail(
thumbnail_url
)
if thumbnail_data:
logger.info(f"Обложка скачана: {thumbnail_url}")
metadata = {
'title': title,
'performer': uploader,
'duration': duration,
'thumbnail_data': thumbnail_data,
'thumbnail_mime': mime_type
"title": title,
"performer": uploader,
"duration": duration,
"thumbnail_data": thumbnail_data,
"thumbnail_mime": mime_type,
}
# Прописываем теги в MP3
+23 -16
View File
@@ -3,11 +3,14 @@ from aiogram.filters import Command
from models.state import BotState
from utils.antispam import admin_required
from logging import getLogger
from .dowloadmp3_to_youtube import *
from .dowloadmp3_to_youtube import download_mp3_isolated
import tempfile
import asyncio
from os import path, unlink
logger = getLogger(__name__)
def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
@dp.message(Command("dowmp3"))
@admin_required(5)
@@ -18,7 +21,9 @@ def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
return
url = args[1]
logger.info(f"Получена команда /dowmp3 от user_id={message.from_user.id}, url={url}")
logger.info(
f"Получена команда /dowmp3 от user_id={message.from_user.id}, url={url}"
)
status_msg = await message.reply("⏳ Скачиваю аудио... Это займет 1-2 минуты")
@@ -29,30 +34,32 @@ def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
if file_size < 1000:
raise Exception("Файл слишком маленький")
await status_msg.edit_text(f"✅ Аудио готово! Отправляю...")
await status_msg.edit_text("✅ Аудио готово! Отправляю...")
# Подготавливаем аудио файл
audio_input = types.FSInputFile(filename)
# Базовые параметры
send_params = {
'audio': audio_input,
'title': metadata['title'][:64],
'performer': metadata['performer'][:64],
'duration': int(metadata['duration']) if metadata['duration'] else None,
'caption': f"🎵 {metadata['title']}\n👤 {metadata['performer']}"
"audio": audio_input,
"title": metadata["title"][:64],
"performer": metadata["performer"][:64],
"duration": int(metadata["duration"]) if metadata["duration"] else None,
"caption": f"🎵 {metadata['title']}\n👤 {metadata['performer']}",
}
# Добавляем обложку если есть
if metadata['thumbnail_data']:
if metadata["thumbnail_data"]:
try:
# Создаем временный файл для обложки
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as thumb_file:
with tempfile.NamedTemporaryFile(
suffix=".jpg", delete=False
) as thumb_file:
thumb_filename = thumb_file.name
thumb_file.write(metadata['thumbnail_data'])
thumb_file.write(metadata["thumbnail_data"])
# Используем FSInputFile для обложки
send_params['thumbnail'] = types.FSInputFile(thumb_filename)
send_params["thumbnail"] = types.FSInputFile(thumb_filename)
logger.info("Обложка добавлена к сообщению")
except Exception as e:
@@ -62,7 +69,7 @@ def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
await message.answer_audio(**send_params)
# Удаляем временный файл обложки если создавали
if 'thumb_filename' in locals() and path.exists(thumb_filename):
if "thumb_filename" in locals() and path.exists(thumb_filename):
unlink(thumb_filename)
await status_msg.delete()
@@ -74,8 +81,8 @@ def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
await status_msg.edit_text(f"❌ Ошибка: {str(e)}")
logger.error(f"Ошибка при скачивании: {e}")
finally:
if 'filename' in locals() and path.exists(filename):
if "filename" in locals() and path.exists(filename):
try:
unlink(filename)
except:
pass
except OSError:
pass