I add command /id and /dowmp3 for dowload video with Youtube and i improve code.

It's version 0.2.0
This commit is contained in:
Niken
2025-10-04 18:56:50 +03:00
parent 7702c9a85b
commit 5197518029
10 changed files with 400 additions and 85 deletions
View File
+7
View File
@@ -0,0 +1,7 @@
def register(dp, state, bot):
from . import handlers
handlers.register_handlers(dp, state, bot)
def unregister(dp):
# Здесь можно удалить хендлеры, если нужно
dp.message_handlers.handlers.clear()
@@ -0,0 +1,158 @@
import asyncio
import tempfile
import os
import logging
import glob
import json
import requests
from mutagen.easyid3 import EasyID3
from mutagen.id3 import ID3, APIC, error
logger = logging.getLogger(__name__)
async def get_video_info(url: str) -> dict:
"""Получает информацию о видео через yt-dlp"""
try:
process = await asyncio.create_subprocess_exec(
'yt-dlp',
'--dump-json',
'--no-playlist',
url,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
return json.loads(stdout.decode())
except Exception as e:
logger.warning(f"Не удалось получить информацию о видео: {e}")
return None
async def download_thumbnail(thumbnail_url: str) -> tuple[bytes, str]:
"""Скачивает обложку видео и возвращает данные и MIME тип"""
try:
response = requests.get(thumbnail_url, timeout=10)
if response.status_code == 200:
if 'jpeg' in thumbnail_url or 'jpg' in thumbnail_url:
mime_type = 'image/jpeg'
elif 'png' in thumbnail_url:
mime_type = 'image/png'
else:
mime_type = response.headers.get('Content-Type', 'image/jpeg')
return response.content, mime_type
except Exception as e:
logger.warning(f"Не удалось скачать обложку: {e}")
return None, None
def apply_metadata(mp3_path: str, metadata: dict):
"""Прописывает ID3-теги и обложку в MP3"""
try:
try:
audio = EasyID3(mp3_path)
except error:
audio = EasyID3()
audio.save(mp3_path)
audio['title'] = metadata.get('title', 'Unknown Title')
audio['artist'] = metadata.get('performer', 'Unknown Artist')
audio.save(mp3_path)
if metadata.get('thumbnail_data'):
audio = ID3(mp3_path)
audio.add(
APIC(
encoding=3,
mime=metadata.get('thumbnail_mime', 'image/jpeg'),
type=3, # front cover
desc='Cover',
data=metadata['thumbnail_data']
)
)
audio.save(mp3_path)
logger.info("Обложка добавлена в MP3")
except Exception as e:
logger.warning(f"Не удалось прописать метаданные: {e}")
async def download_mp3_isolated(url: str) -> tuple[str, dict]:
"""Скачивает MP3, добавляет метаданные и возвращает путь к файлу и метаданные"""
with tempfile.TemporaryDirectory() as temp_dir:
output_template = os.path.join(temp_dir, "audio.%(ext)s")
try:
logger.info(f"Запускаю yt-dlp для: {url}")
video_info = await get_video_info(url)
title = "Unknown Title"
uploader = "Unknown Artist"
thumbnail_url = None
duration = 0
if video_info:
title = video_info.get('title', 'Unknown Title')
uploader = video_info.get('uploader', 'Unknown Artist')
thumbnail_url = video_info.get('thumbnail')
if not thumbnail_url and video_info.get('thumbnails'):
thumbnails = video_info.get('thumbnails', [])
if thumbnails:
thumbnail_url = thumbnails[-1].get('url')
duration = video_info.get('duration', 0)
logger.info(f"Получена информация о видео: {title}")
process = await asyncio.create_subprocess_exec(
'yt-dlp',
'-x', '--audio-format', 'mp3',
'--audio-quality', '320K',
'--no-playlist',
'-o', output_template,
'--ignore-errors',
url,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300)
mp3_files = glob.glob(os.path.join(temp_dir, "*.mp3"))
if mp3_files:
actual_file = mp3_files[0]
if os.path.getsize(actual_file) > 1000:
with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as final_file:
final_filename = final_file.name
with open(actual_file, 'rb') as src, open(final_filename, 'wb') as dst:
dst.write(src.read())
thumbnail_data, mime_type = None, None
if thumbnail_url:
thumbnail_data, mime_type = await download_thumbnail(thumbnail_url)
if thumbnail_data:
logger.info(f"Обложка скачана: {thumbnail_url}")
metadata = {
'title': title,
'performer': uploader,
'duration': duration,
'thumbnail_data': thumbnail_data,
'thumbnail_mime': mime_type
}
# Прописываем теги в MP3
apply_metadata(final_filename, metadata)
logger.info(f"Успешно скачан и обновлён тегами: {final_filename}")
return final_filename, metadata
error_msg = stderr.decode() or stdout.decode() or "Файл не создан"
raise Exception(f"Ошибка загрузки: {error_msg}")
except asyncio.TimeoutError:
raise Exception("Таймаут загрузки (5 минут)")
except Exception as e:
raise e
+81
View File
@@ -0,0 +1,81 @@
from aiogram import types, Dispatcher, Bot
from aiogram.filters import Command
from models.state import BotState
from utils.antispam import admin_required
from logging import getLogger
from .dowloadmp3_to_youtube import *
from os import path, unlink
logger = getLogger(__name__)
def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
@dp.message(Command("dowmp3"))
@admin_required(5)
async def cmd_dowmp3(message: types.Message):
args = message.text.split(maxsplit=1)
if len(args) < 2:
await message.reply("❌ Укажи ссылку: /dowmp3 <youtube_url>")
return
url = args[1]
logger.info(f"Получена команда /dowmp3 от user_id={message.from_user.id}, url={url}")
status_msg = await message.reply("⏳ Скачиваю аудио... Это займет 1-2 минуты")
try:
filename, metadata = await download_mp3_isolated(url)
file_size = path.getsize(filename)
if file_size < 1000:
raise Exception("Файл слишком маленький")
await status_msg.edit_text(f"✅ Аудио готово! Отправляю...")
# Подготавливаем аудио файл
audio_input = types.FSInputFile(filename)
# Базовые параметры
send_params = {
'audio': audio_input,
'title': metadata['title'][:64],
'performer': metadata['performer'][:64],
'duration': int(metadata['duration']) if metadata['duration'] else None,
'caption': f"🎵 {metadata['title']}\n👤 {metadata['performer']}"
}
# Добавляем обложку если есть
if metadata['thumbnail_data']:
try:
# Создаем временный файл для обложки
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as thumb_file:
thumb_filename = thumb_file.name
thumb_file.write(metadata['thumbnail_data'])
# Используем FSInputFile для обложки
send_params['thumbnail'] = types.FSInputFile(thumb_filename)
logger.info("Обложка добавлена к сообщению")
except Exception as e:
logger.warning(f"Не удалось добавить обложку: {e}")
# Отправляем аудио
await message.answer_audio(**send_params)
# Удаляем временный файл обложки если создавали
if 'thumb_filename' in locals() and path.exists(thumb_filename):
unlink(thumb_filename)
await status_msg.delete()
logger.info(f"Аудио отправлено пользователю {message.from_user.id}")
except asyncio.TimeoutError:
await status_msg.edit_text("❌ Превышено время ожидания (5 минут)")
except Exception as e:
await status_msg.edit_text(f"❌ Ошибка: {str(e)}")
logger.error(f"Ошибка при скачивании: {e}")
finally:
if 'filename' in locals() and path.exists(filename):
try:
unlink(filename)
except:
pass
+7
View File
@@ -0,0 +1,7 @@
def register(dp, state, bot):
from . import handlers
handlers.register_handlers(dp, state, bot)
def unregister(dp):
# Здесь можно удалить хендлеры, если нужно
dp.message_handlers.handlers.clear()
+13
View File
@@ -0,0 +1,13 @@
from logging import getLogger
from aiogram import Dispatcher, Bot
from aiogram.types import Message
from aiogram.filters import Command
from models.state import BotState
logger = getLogger(__name__)
def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
@dp.message(Command("id"))
async def id(message: Message):
id = message.from_user.id
await message.reply(str(id))
+53
View File
@@ -0,0 +1,53 @@
import importlib
import sys
from pathlib import Path
class AddonManager:
def __init__(self, dp, state, bot):
self.dp = dp
self.state = state
self.bot = bot
self.loaded = {}
def load(self, name: str):
"""Загрузить аддон по имени"""
if name in self.loaded:
return f"Аддон {name} уже загружен"
module_path = f"addons.{name}"
module = importlib.import_module(module_path)
if hasattr(module, "register"):
module.register(self.dp, self.state, self.bot)
self.loaded[name] = module
return f"✅ Аддон {name} подключен"
return f"⚠️ У аддона {name} нет функции register"
def unload(self, name: str):
"""Отключить аддон"""
module = self.loaded.get(name)
if not module:
return f"Аддон {name} не загружен"
if hasattr(module, "unregister"):
module.unregister(self.dp)
# Удаляем из sys.modules, чтобы можно было перезагрузить
sys.modules.pop(f"addons.{name}", None)
self.loaded.pop(name)
return f"❌ Аддон {name} отключен"
def reload(self, name: str):
"""Перезагрузить аддон"""
self.unload(name)
return self.load(name)
def list_addons(self):
"""Возвращает список (имя, состояние) для всех аддонов"""
addons_path = Path("addons")
result = []
for addon in addons_path.iterdir():
if addon.is_dir() and (addon / "__init__.py").exists():
name = addon.name
status = "✅ Загружен" if name in self.loaded else "❌ Выключен"
result.append((name, status))
return result