Compare commits

..

18 Commits

Author SHA1 Message Date
Niken c49b00ba15 Доработка гит 2026-03-15 20:36:16 +03:00
Niken 4b50941b86 Merge branch 'main' of ssh://192.168.0.54:30009/niken/myfirstprogram 2026-03-15 20:34:55 +03:00
Niken d11c0ee467 It's version 0.7 I upgrade watcher_service.py 2025-12-09 17:22:53 +03:00
Niken 7495062a8a It's version 0.6.2 I clear code 2025-11-23 23:17:00 +03:00
Niken b6c1c60609 It's version 0.6.1 I fixed the set command before, if the user didn't exist, it wouldn't record him. 2025-11-16 14:34:37 +03:00
Niken be9ec785f4 It's version 0.6 I add users DB 2025-11-16 14:15:44 +03:00
Niken e1837400ef It's version 0.5.1 I fix dowmp3 2025-11-11 21:35:16 +03:00
Niken 6d421249ee It's version 0.5 2025-10-29 21:46:22 +03:00
Niken e56401bf1d It's version 0.5 2025-10-29 21:46:06 +03:00
Niken a71a7fbd0c I add requirements.txt
It's version 0.4.1
2025-10-19 14:59:35 +03:00
Niken 7b653d4dcc It's version 0.4 2025-10-19 14:28:41 +03:00
Niken 772d3d5b83 I add command /admin /vadmin /iadmin /hello I create database and I improve code
It's version 0.3.0
2025-10-11 23:02:30 +03:00
Niken cce8c7dc70 I add command /analytics and I improve code
It's version 0.2.3
2025-10-05 23:16:19 +03:00
Niken 3ef1327b67 I add command /prasp
It's version 0.2.2
2025-10-05 22:29:49 +03:00
Niken c5f6da31ba I add command /id and /dowmp4 for dowload video with Youtube and i improve code.
It's version 0.2.1
2025-10-04 23:18:56 +03:00
Niken 5197518029 I add command /id and /dowmp3 for dowload video with Youtube and i improve code.
It's version 0.2.0
2025-10-04 18:56:50 +03:00
Niken 7702c9a85b I add command /del for delite message and i improve code.
It's version 0.1.1
2025-10-04 17:14:26 +03:00
Niken 58b47bec5e It's my tg bot for schedule. version 0.1 2025-10-04 16:59:38 +03:00
54 changed files with 3936 additions and 0 deletions
+25
View File
@@ -0,0 +1,25 @@
/.env
/code.txt
.DS_Store
.idea/
__pycache__/
bot.log
bot/__pycache__/
handlers/__pycache__/
models/__pycache__/
services/__pycache__/
storage/__pycache__/
utils/__pycache__/
storage/message.txt
/addons/dowloadmp4_to_youtube/gettoken.py
/drevo.py
/addons/download_mp3_to_youtube/cookies.txt
/storage/message.db
/addons/todo/todo.db
/addons/rule34/urls.db
/addons/x_days_to/days_to_new_year.db
/addons/hello/photo_2025-11-17_20-57-54.jpg
/addons/poll/img.png
/addons/hello/мемы/
/xaxa.py
/addons/hello/img.png
+12
View File
@@ -0,0 +1,12 @@
# Changelog
Все значимые изменения проекта.
## [Unreleased]
- Добавлена проверка MD5-хеша скриншота в `services/watcher_service.py` — бот теперь отправляет/закрепляет фото только при первом обнаружении и при изменениях.
- Исправлен импорт в `addons/x_days_to/__init__.py` (модуль `x_days_to.py` теперь импортируется корректно).
- (Замечание) Найдена потенциальная ошибка: `addons/send_message/handlers.py` использует `aiohttp` без явного импорта — нужно поправить.
## 0.1.0 - WIP
- Начальная версия проекта. Планируется приватный релиз после исправлений и добавления CI.
+103
View File
@@ -0,0 +1,103 @@
# MyFirstProgramm
Лёгкий Telegram-бот на Python (aiogram + Playwright) для автоматической проверки и рассылки расписаний, а также набора аддонов.
## Краткое описание
Проект периодически проверяет расписание на сайте (через Playwright), делает скриншоты нужных фрагментов, вычисляет хеш (MD5) и отправляет/закрепляет изображение в целевых чатах Telegram только при изменении (чтобы не флудить чат). Также в `addons/` есть дополнительные модули (мини-приложения).
## Структура (важные файлы/папки)
- `main.py` — точка входа приложения (запуск бота).
- `config.py` — конфигурация (переменные окружения).
- `services/` — сервисы: `schedule_service.py`, `watcher_service.py` (логика слежки/хэшей).
- `models/state.py` — in-memory состояние бота (кэши, last_clip_hash и т.д.).
- `addons/` — набор дополнительных модулей (каждый аддон экспортирует `register` / `unregister`).
- `requirements.txt` — зависимости.
## Быстрый старт (локально)
1. Установите зависимости (рекомендуется виртуальное окружение):
```bash
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
2. Создайте `.env` (или экспортируйте переменные окружения):
- `TELEGRAM_BOT_TOKEN` — токен бота
- (опционально) `DEEPGRAM_API_KEY`, `ACCESS_TOKEN` — если используются аддоны
Создайте файл `.env` рядом с `config.py`:
```env
TELEGRAM_BOT_TOKEN=your_bot_token_here
#DEEPGRAM_API_KEY=...
#ACCESS_TOKEN=...
```
3. Запустите бота (пример):
```bash
python main.py
```
(В вашем окружении запуск может быть оформлен скриптом — например `./Desktop/my.sh`.)
## Конфигурация
Основные настройки находятся в `config.py`. Обратите внимание, что `Config` сейчас поднимает исключение, если `TELEGRAM_BOT_TOKEN` не задан.
## Что изменилось (коротко)
- Добавлена проверка MD5-хеша скриншота в `services/watcher_service.py` — теперь бот отправляет фото и закрепляет сообщение только при первом обнаружении и при изменениях.
- Исправлен импорт в `addons/x_days_to/__init__.py` (модуль назывался `x_days_to.py`, вместо отсутствующего `handlers`).
(Более полный список изменений — в `CHANGELOG.md`.)
## Чек-лист готовности к релизу (private GitHub)
Ниже — рекомендации и текущий статус (на основе быстрого статического анализа):
- [ ] Тесты (unit/integration) — отсутствуют. Рекомендую добавить хотя бы базовые тесты для критичных сервисов (например мок ScheduleService и проверка логики хеширования).
- [x] README (есть) — ✅
- [x] CHANGELOG (есть) — ✅
- [ ] Линт/статический анализ — есть предупреждения/ошибки (см. ниже). Нужно исправить перед релизом.
- [ ] Безопасность/секреты — убедитесь, что `TELEGRAM_BOT_TOKEN` и другие ключи не коммитятся; добавьте `.env.example` и `.gitignore`.
- [ ] CI — рекомендую GitHub Actions: lint (ruff), тесты (pytest), security scan.
- [ ] Версионирование — добавьте `__version__` в `pyproject`/`setup.py` или тег релиза в Git.
### Проблемы, найденные статически
- `addons/send_message/handlers.py`: используется `aiohttp` без импорта (нужно `import aiohttp`). Это вызовет ошибку при импорте этого аддона.
- Возможны другие runtime-ошибки — рекомендую прогнать `ruff check . --fix` и запустить бота в dev окружении.
## Рекомендации перед релизом
1. Исправить импорт/синтаксисные ошибки (см. `ruff` / `get_errors`).
2. Добавить `.env.example` и `.gitignore` (включить `.venv/`, `storage/` с чувствительными файлами и логами).
3. Добавить базовые тесты и настроить GitHub Actions (lint + tests).
4. Решить поведение state-персистенции: `BotState` хранит состояние в памяти — после рестарта всё теряется. Если важно сохранять историю/фиксированный pinned_id — добавить simple JSON/SQLite storage.
5. Проверить, что все аддоны регистрируют хендлеры корректно (в `x_days_to` ранее не было регистрации а-ля `@dp.message(...)` — возможно требуется доработить).
## Как выпустить на приватный GitHub
1. Создайте репозиторий и инициализируйте git:
```bash
git init
git add .
git commit -m "Initial commit"
# создайте приватный репо в GitHub и затем
git remote add origin git@github.com:yourorg/yourrepo.git
git branch -M main
git push -u origin main
```
2. Настройте GitHub Actions (workflow) для lint & tests.
3. Добавьте релизный тег и changelog entry:
```bash
git tag -a v0.1.0 -m "Initial private release"
git push origin v0.1.0
```
---
Если хотите, я могу сразу:
- добавить `.env.example` и `.gitignore`;
- исправить `addons/send_message/handlers.py` (добавить импорт `aiohttp`) и/или добавить базовый тест для `watcher_service`.
Скажите, что выполнить следующим шагом — добавлять `.env.example`/`.gitignore`, исправлять найденную ошибку или настроить CI (создать пример workflow).
+30
View File
@@ -0,0 +1,30 @@
telegram_bot/
├── main.py # Точка входа
├── config.py # Конфигурация
├── bot/
│ ├── __init__.py
│ ├── core.py # Основной класс бота
│ └── filters.py # Кастомные фильтры
├── handlers/
│ ├── __init__.py
│ ├── admin.py # Админ-команды
│ ├── schedule.py # Команды расписания
│ ├── media.py # Медиа-команды
│ └── common.py # Общие команды
├── services/
│ ├── __init__.py
│ ├── schedule_service.py # Логика расписания
│ ├── watcher_service.py # Слежка за изменениями
│ ├── media_service.py # Работа с медиа
│ └── gpt_service.py # GPT-функции
├── models/
│ ├── __init__.py
│ └── state.py # Модели данных
├── utils/
│ ├── __init__.py
│ ├── antispam.py # Антиспам система
│ ├── file_utils.py # Работа с файлами
│ └── validators.py # Валидаторы
└── storage/
├── __init__.py
└── message_storage.py # Работа с сообщениями
View File
+8
View File
@@ -0,0 +1,8 @@
def register(dp, state, bot):
from . import handlers
handlers.register_handlers(dp, state, bot)
def unregister(dp):
# Здесь можно удалить хендлеры, если нужно
dp.message_handlers.handlers.clear()
+178
View File
@@ -0,0 +1,178 @@
import asyncio
import tempfile
import os
import logging
import glob
import json
import dropbox
import sys
import io
import unicodedata
import uuid
from config import Config
# Настройка кодировки для всего приложения
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
logger = logging.getLogger(__name__)
DROPBOX_TOKEN = Config.Token
LIMIT = 2 * 1024 * 1024 * 1024 # 2 ГБ
async def safe_filename(name: str) -> str:
"""Создает безопасное имя файла"""
# Нормализуем Unicode символы
normalized = unicodedata.normalize("NFKD", name)
# Убираем акценты и специальные символы, оставляем только ASCII
ascii_name = normalized.encode("ascii", "ignore").decode("ascii")
# Заменяем проблемные символы
safe_name = "".join(
c if c.isalnum() or c in (" ", "-", "_", ".") else "_" for c in ascii_name
)
# Убираем множественные подчеркивания и обрезаем длину
safe_name = "_".join(filter(None, safe_name.split("_")))
return safe_name[:100] or f"video_{uuid.uuid4().hex[:8]}"
async def get_video_info(url: str) -> dict:
"""Получает информацию о видео через yt-dlp"""
try:
process = await asyncio.create_subprocess_exec(
"yt-dlp",
"--dump-json",
"--no-playlist",
url,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
result = json.loads(stdout.decode("utf-8", errors="ignore"))
logger.info(
f"Информация о видео получена: {result.get('title', 'Unknown')}"
)
return result
else:
error_msg = stderr.decode("utf-8", errors="ignore")
logger.warning(f"yt-dlp ошибка: {error_msg}")
except Exception as e:
logger.error(f"Не удалось получить информацию о видео: {e}")
return None
async def download_mp4_to_dropbox(url: str) -> tuple[str, dict]:
"""Скачивает MP4 в среднем качестве БЫСТРО, загружает в Dropbox"""
with tempfile.TemporaryDirectory() as temp_dir:
output_template = os.path.join(temp_dir, "video.%(ext)s")
try:
logger.info(f"Запускаю yt-dlp для: {url}")
video_info = await get_video_info(url)
title = "Unknown_Title"
uploader = "Unknown_Uploader"
duration = 0
if video_info:
title = await safe_filename(video_info.get("title", "Unknown_Title"))
uploader = await safe_filename(
video_info.get("uploader", "Unknown_Uploader")
)
duration = video_info.get("duration", 0)
logger.info(f"Обработано видео: {title}")
# ОПТИМИЗИРОВАННЫЕ НАСТРОЙКИ ДЛЯ СКОРОСТИ
download_process = await asyncio.create_subprocess_exec(
"yt-dlp",
"-f",
"bestvideo[height<=720][filesize<800M]+bestaudio/best[height<=720][filesize<800M]",
"--no-playlist",
"-o",
output_template,
"--ignore-errors",
"--no-warnings",
"--format-sort",
"quality,res:720,size:800M",
"--concurrent-fragments",
"4",
url,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(
download_process.communicate(), timeout=600
) # Уменьшил таймаут
# Остальной код без изменений...
if download_process.returncode != 0:
error_msg = (
stderr.decode("utf-8", errors="ignore")
if stderr
else "Unknown error"
)
logger.error(f"Ошибка скачивания: {error_msg}")
raise Exception(f"Ошибка скачивания: {error_msg}")
mp4_files = glob.glob(os.path.join(temp_dir, "*.mp4"))
if not mp4_files:
video_files = glob.glob(os.path.join(temp_dir, "*.*"))
video_files = [
f
for f in video_files
if f.lower().endswith((".mp4", ".mkv", ".avi", ".mov", ".webm"))
]
if video_files:
mp4_files = [video_files[0]]
if mp4_files:
actual_file = mp4_files[0]
size = os.path.getsize(actual_file)
if size > LIMIT:
raise Exception("Файл превышает лимит 2 ГБ")
safe_title = await safe_filename(title)
final_filename = f"{safe_title}.mp4"
dbx = dropbox.Dropbox(DROPBOX_TOKEN)
dropbox_path = f"/{final_filename}"
logger.info(
f"Загружаем файл в Dropbox: {dropbox_path} (размер: {size / (1024 * 1024):.1f} MB)"
)
with open(actual_file, "rb") as f:
file_data = f.read()
dbx.files_upload(
file_data,
dropbox_path,
mode=dropbox.files.WriteMode("overwrite"),
)
shared_link = dbx.sharing_create_shared_link_with_settings(dropbox_path)
link = shared_link.url.replace("?dl=0", "?dl=1")
metadata = {
"title": title,
"uploader": uploader,
"duration": duration,
"filesize": size,
"quality": "optimized for speed",
}
logger.info(f"Успешно загружено в Dropbox: {link}")
return link, metadata
raise Exception("Видео файл не найден после скачивания")
except asyncio.TimeoutError:
raise Exception("Таймаут загрузки (10 минут)")
except Exception as e:
logger.error(f"Общая ошибка: {e}")
raise e
+56
View File
@@ -0,0 +1,56 @@
import logging
from aiogram import Dispatcher, Bot
from aiogram.filters import Command
from models.state import BotState
from .dowmp4 import download_mp4_to_dropbox
logger = logging.getLogger(__name__)
def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
@dp.message(Command("dowmp4"))
# @admin_required(4)
async def dowmp4_handler(message):
"""Обработчик команды /dowmp4"""
try:
url = message.text.split(" ", 1)[1].strip()
except IndexError:
await message.answer("Пожалуйста, укажите URL видео после команды /dowmp4")
return
processing_msg = await message.answer(
"⏳ Начинаю обработку видео... Это может занять несколько минут."
)
try:
# Скачиваем и загружаем в Dropbox
public_url, metadata = await download_mp4_to_dropbox(url)
# Форматируем информацию о видео
duration_str = f"{int(metadata['duration'] // 60)}:{int(metadata['duration'] % 60):02d}"
size_mb = f"{metadata['filesize'] / (1024 * 1024):.1f} MB"
caption = (
f"🎥 **{metadata.get('title', 'Видео')}**\n"
f"👤 **Автор:** {metadata.get('uploader', 'Неизвестен')}\n"
f"⏱ **Длительность:** {duration_str}\n"
f"📦 **Размер:** {size_mb}\n"
f"🔗 **Ссылка:** [Скачать]({public_url})"
)
await processing_msg.delete() # Удаляем сообщение о обработке
await message.answer(
f"✅ **Видео успешно обработано!**\n\n{caption}",
parse_mode="Markdown",
disable_web_page_preview=True,
)
except ValueError as e:
await message.answer(f"❌ Ошибка: {str(e)}")
except Exception as e:
logger.error(f"Ошибка при обработке /dowmp4: {e}", exc_info=True)
await message.answer(
"❌ Произошла ошибка при обработке видео. Попробуйте позже."
)
@@ -0,0 +1,9 @@
def register(dp, state, bot):
from . import handlers
handlers.register_handlers(dp, state, bot)
def unregister(dp):
# Здесь можно удалить хендлеры, если нужно
dp.message_handlers.handlers.clear()
@@ -0,0 +1,194 @@
import asyncio
import tempfile
import os
import logging
import glob
import json
import requests
from typing import Optional
from mutagen.easyid3 import EasyID3
from mutagen.id3 import ID3, APIC, error
logger = logging.getLogger(__name__)
async def get_video_info(url: str) -> Optional[dict]:
"""Получает информацию о видео через yt-dlp"""
try:
process = await asyncio.create_subprocess_exec(
"yt-dlp",
"--dump-json",
"--no-playlist",
url,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
info = json.loads(stdout.decode())
logger.info(f"Информация получена: {info.get('title', 'Unknown')}")
return info
else:
stderr_msg = stderr.decode()
logger.error(f"yt-dlp ошибка: {stderr_msg}")
except Exception as e:
logger.error(f"Не удалось получить информацию о видео: {e}", exc_info=True)
return None
async def download_thumbnail(
thumbnail_url: str,
) -> tuple[Optional[bytes], Optional[str]]:
"""Скачивает обложку видео и возвращает данные и MIME тип"""
try:
response = requests.get(thumbnail_url, timeout=10)
if response.status_code == 200:
if "jpeg" in thumbnail_url or "jpg" in thumbnail_url:
mime_type = "image/jpeg"
elif "png" in thumbnail_url:
mime_type = "image/png"
else:
mime_type = response.headers.get("Content-Type", "image/jpeg")
return response.content, mime_type
except Exception as e:
logger.warning(f"Не удалось скачать обложку: {e}")
return None, None
def apply_metadata(mp3_path: str, metadata: dict):
"""Прописывает ID3-теги и обложку в MP3"""
try:
# Сначала удаляем старые теги ID3 если есть
try:
ID3(mp3_path).delete()
except Exception as e:
logger.warning(f"Не удалось удалить старые ID3 теги: {e}")
# Добавляем текстовые теги
try:
audio = EasyID3(mp3_path)
except error:
audio = EasyID3()
audio["title"] = metadata.get("title", "Unknown Title")
audio["artist"] = metadata.get("performer", "Unknown Artist")
audio.save(mp3_path, v2_version=4)
logger.info(f"Текстовые теги добавлены: {audio['title']} - {audio['artist']}")
# Добавляем обложку отдельно
if metadata.get("thumbnail_data"):
try:
audio = ID3(mp3_path)
except error:
audio = ID3()
audio.add(
APIC(
encoding=3,
mime=metadata.get("thumbnail_mime", "image/jpeg"),
type=3, # front cover
desc="Cover",
data=metadata["thumbnail_data"],
)
)
audio.save(mp3_path, v2_version=4)
logger.info(f"Обложка добавлена в MP3 ({len(metadata['thumbnail_data'])} байт)")
else:
logger.warning("Данные обложки не найдены в метаданных")
except Exception as e:
logger.error(f"Не удалось прописать метаданные: {e}", exc_info=True)
async def download_mp3_isolated(url: str) -> tuple[str, dict]:
"""Скачивает MP3, добавляет метаданные и возвращает путь к файлу и метаданные"""
with tempfile.TemporaryDirectory() as temp_dir:
output_template = os.path.join(temp_dir, "audio.%(ext)s")
try:
logger.info(f"Запускаю yt-dlp для: {url}")
video_info = await get_video_info(url)
title = "Unknown Title"
uploader = "Unknown Artist"
thumbnail_url = None
duration = 0
if video_info:
title = video_info.get("title", "Unknown Title")
uploader = video_info.get("uploader", "Unknown Artist")
thumbnail_url = video_info.get("thumbnail")
if not thumbnail_url and video_info.get("thumbnails"):
thumbnails = video_info.get("thumbnails", [])
if thumbnails:
thumbnail_url = thumbnails[-1].get("url")
duration = video_info.get("duration", 0)
logger.info(f"Получена информация о видео: title={title}, uploader={uploader}, thumbnail={thumbnail_url is not None}")
else:
logger.warning("video_info = None, используются значения по умолчанию")
process = await asyncio.create_subprocess_exec(
"yt-dlp",
"-x",
"--audio-format",
"mp3",
"--audio-quality",
"320K",
"--no-playlist",
"-o",
output_template,
"--ignore-errors",
url,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300)
mp3_files = glob.glob(os.path.join(temp_dir, "*.mp3"))
if mp3_files:
actual_file = mp3_files[0]
if os.path.getsize(actual_file) > 1000:
with tempfile.NamedTemporaryFile(
suffix=".mp3", delete=False
) as final_file:
final_filename = final_file.name
with (
open(actual_file, "rb") as src,
open(final_filename, "wb") as dst,
):
dst.write(src.read())
thumbnail_data, mime_type = None, None
if thumbnail_url:
thumbnail_data, mime_type = await download_thumbnail(
thumbnail_url
)
if thumbnail_data:
logger.info(f"Обложка скачана: {thumbnail_url}")
metadata = {
"title": title,
"performer": uploader,
"duration": duration,
"thumbnail_data": thumbnail_data,
"thumbnail_mime": mime_type,
}
# Прописываем теги в MP3
apply_metadata(final_filename, metadata)
logger.info(f"Успешно скачан и обновлён тегами: {final_filename}")
return final_filename, metadata
error_msg = stderr.decode() or stdout.decode() or "Файл не создан"
raise Exception(f"Ошибка загрузки: {error_msg}")
except asyncio.TimeoutError:
raise Exception("Таймаут загрузки (5 минут)")
except Exception as e:
logger.error(f"Ошибка при скачивании: {e}", exc_info=True)
raise e
@@ -0,0 +1,93 @@
from aiogram import types, Dispatcher, Bot
from aiogram.filters import Command
from models.state import BotState
from utils.antispam import admin_required
from logging import getLogger
from .dowloadmp3_to_youtube import download_mp3_isolated
import tempfile
import asyncio
from os import path, unlink
logger = getLogger(__name__)
def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
@dp.message(Command("dowmp3"))
@admin_required(5)
async def cmd_dowmp3(message: types.Message):
args = message.text.split(maxsplit=1)
if len(args) < 2:
await message.reply("❌ Укажи ссылку: /dowmp3 <youtube_url>")
return
url = args[1]
logger.info(
f"Получена команда /dowmp3 от user_id={message.from_user.id}, url={url}"
)
status_msg = await message.reply("⏳ Скачиваю аудио... Это займет 1-2 минуты")
try:
filename, metadata = await download_mp3_isolated(url)
file_size = path.getsize(filename)
if file_size < 1000:
raise Exception("Файл слишком маленький")
await status_msg.edit_text("✅ Аудио готово! Отправляю...")
# Подготавливаем аудио файл
audio_input = types.FSInputFile(filename)
# Базовые параметры
send_params = {
"audio": audio_input,
"title": metadata["title"][:64],
"performer": metadata["performer"][:64],
"duration": int(metadata["duration"]) if metadata["duration"] else None,
"caption": f"🎵 {metadata['title']}\n👤 {metadata['performer']}",
}
# Добавляем обложку если есть
thumb_filename = None
if metadata["thumbnail_data"]:
try:
# Создаем временный файл для обложки
with tempfile.NamedTemporaryFile(
suffix=".jpg", delete=False
) as thumb_file:
thumb_filename = thumb_file.name
thumb_file.write(metadata["thumbnail_data"])
# Используем FSInputFile для обложки
send_params["thumbnail"] = types.FSInputFile(thumb_filename)
logger.info(f"Обложка подготовлена: {thumb_filename} ({len(metadata['thumbnail_data'])} байт)")
except Exception as e:
logger.error(f"Не удалось добавить обложку: {e}", exc_info=True)
# Отправляем аудио
await message.answer_audio(**send_params)
# Удаляем временный файл обложки если создавали
if thumb_filename and path.exists(thumb_filename):
try:
unlink(thumb_filename)
logger.info("Временный файл обложки удален")
except OSError as e:
logger.warning(f"Не удалось удалить файл обложки: {e}")
await status_msg.delete()
logger.info(f"Аудио отправлено пользователю {message.from_user.id}")
except asyncio.TimeoutError:
await status_msg.edit_text("❌ Превышено время ожидания (5 минут)")
except Exception as e:
await status_msg.edit_text(f"❌ Ошибка: {str(e)}")
logger.error(f"Ошибка при скачивании: {e}")
finally:
if "filename" in locals() and path.exists(filename):
try:
unlink(filename)
except OSError:
pass
+9
View File
@@ -0,0 +1,9 @@
def register(dp, state, bot):
from . import handlers
handlers.register_handlers(dp, state, bot)
def unregister(dp):
# Здесь можно удалить хендлеры, если нужно
dp.message_handlers.handlers.clear()
+195
View File
@@ -0,0 +1,195 @@
import logging
import base64
from io import BytesIO
import asyncio
import aiohttp
from PIL import Image
from aiogram import Dispatcher, Bot
from aiogram.types import Message, BufferedInputFile
from aiogram.filters import Command
from models.state import BotState
from config import Config
from storage.message_storage import save_message
from transformers import pipeline
from utils.antispam import saving
logger = logging.getLogger(__name__)
SD_URL = "http://192.168.31.95:7860/sdapi/v1/txt2img"
# Загружаем пайплайн перевода один раз при старте (синхронный)
translator = pipeline("translation", model="Helsinki-NLP/opus-mt-ru-en")
async def translate_to_en(text: str) -> str:
try:
# выполняем перевод в отдельном потоке, чтобы не блокировать event loop
result = await asyncio.to_thread(translator, text, max_length=512)
return result[0]["translation_text"]
except Exception as e:
logger.error(f"Ошибка перевода: {e}")
return text
async def generate_img2img(prompt: str, init_image: BytesIO) -> BytesIO | None:
"""
Генерация изображения по методу img2img.
:param prompt: текстовый промт (уже переведённый на английский)
:param init_image: входное изображение в BytesIO
:return: BytesIO с результатом или None при ошибке
"""
try:
# Определяем размеры оригинала
init_image.seek(0)
with Image.open(init_image) as img:
width, height = img.size
# кодируем входное изображение в base64
init_image_base64 = base64.b64encode(init_image.getvalue()).decode("utf-8")
payload = {
"init_images": [init_image_base64],
"prompt": prompt,
"negative_prompt": "blurry, low quality, bad anatomy, watermark, text, cropped",
"steps": 25,
"width": width, # берём ширину оригинала
"height": height, # берём высоту оригинала
"sampler_name": "Euler a",
"scheduler": "Karras", # исправлен ключ
"cfg_scale": 10,
"seed": -1,
"denoising_strength": 0.45,
"restore_faces": True,
"override_settings": {
"sd_model_checkpoint": "waiNSFWIllustrious_v150.safetensors"
},
}
async with aiohttp.ClientSession() as session:
async with session.post(
SD_URL.replace("txt2img", "img2img"), json=payload
) as resp:
if resp.status != 200:
logger.error(f"Stable Diffusion img2img API error: {resp.status}")
return None
r = await resp.json()
image_base64 = r["images"][0]
return BytesIO(base64.b64decode(image_base64))
except Exception as e:
logger.error(f"Ошибка img2img: {e}")
return None
# sd_xl_base_1.0.safetensors
# waiNSFWIllustrious_v150.safetensors
async def generate_image(prompt: str) -> BytesIO | None:
payload = {
"prompt": prompt,
"negative_prompt": "blurry, low quality, bad anatomy, watermark, text, cropped",
"steps": 20,
"width": 1024,
"height": 1024,
"sampler_name": "Euler a", # сэмплер
"cfg_scale": 7, # насколько строго следовать промту
"seed": -1, # -1 = случайный сид
"batch_size": 1, # сколько картинок за раз
"n_iter": 1, # сколько раз повторить генерацию
"restore_faces": False, # восстановление лиц
"tiling": False, # тайлинг для текстур
"enable_hr": False, # highres fix (двухэтапная генерация)
"denoising_strength": 0.7, # сила денойзинга (актуально при enable_hr или img2img)
"hr_scale": 2, # во сколько раз увеличить при highres fix
"hr_upscaler": "Latent", # апскейлер для highres fix
"override_settings": {
"sd_model_checkpoint": "waiNSFWIllustrious_v150.safetensors" # выбор модели
},
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(SD_URL, json=payload) as resp:
if resp.status != 200:
logger.error(f"Stable Diffusion API error: {resp.status}")
return None
r = await resp.json()
image_base64 = r["images"][0]
return BytesIO(base64.b64decode(image_base64))
except Exception as e:
logger.error(f"Ошибка генерации изображения: {e}")
return None
def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
@dp.message(Command("draw"))
async def draw(message: Message):
save_message(message.chat.id, message.message_id)
if message.from_user.id in Config.BAN:
msg = await message.reply("Вы в бане")
save_message(msg.chat.id, msg.message_id)
else:
user_prompt = message.text.replace("/draw", "").strip()
if not user_prompt:
confirm_msg = await message.answer("❗ Укажи промт после команды /draw")
save_message(confirm_msg.chat.id, confirm_msg.message_id)
return
en_prompt = await translate_to_en(user_prompt)
logger.info(f"Промт переведен: {user_prompt} -> {en_prompt}")
img_bytes = await generate_image(en_prompt)
if img_bytes:
img_bytes.seek(0)
photo = BufferedInputFile(img_bytes.read(), filename="result.png")
msg = await bot.send_photo(chat_id=message.chat.id, photo=photo)
save_message(msg.chat.id, msg.message_id)
else:
error_msg = await message.answer("⚠️ Ошибка при генерации изображения.")
save_message(error_msg.chat.id, error_msg.message_id)
@dp.message(Command("img2img"))
@saving
async def img2img_with_caption(message: Message, bot: Bot):
raw_caption = message.caption or ""
user_prompt = raw_caption.replace("/img2img", "").strip()
if not user_prompt:
await message.answer(
"❗ Укажи промт в подписи к фото после команды /img2img"
)
return
en_prompt = await translate_to_en(user_prompt)
logger.info(f"Промт для img2img переведен: {user_prompt} -> {en_prompt}")
try:
if message.photo:
# Берём последнее (самое большое) фото
photo = message.photo[-1].file_id
# Отправляем в SD API по file_id (как в iadmin)
# Здесь отличие: SD API требует base64, поэтому file_id нужно скачать
# Но логика построена как в iadmin — сначала берём file_id
file = await bot.get_file(photo)
file_bytes = await bot.download_file(file.file_path)
img_bytes = await generate_img2img(
en_prompt, BytesIO(file_bytes.read())
)
if img_bytes:
img_bytes.seek(0)
photo = BufferedInputFile(
img_bytes.read(), filename="img2img_result.png"
)
msg = await bot.send_photo(chat_id=message.chat.id, photo=photo)
else:
msg = await message.answer("⚠️ Ошибка при img2img генерации.")
else:
msg = await message.answer("❗ Пришли фото с подписью /img2img <промт>")
save_message(msg.chat.id, msg.message_id)
except Exception as e:
await message.answer(f"⚠️ Ошибка: {e}")
+9
View File
@@ -0,0 +1,9 @@
def register(dp, state, bot):
from . import handlers
handlers.register_handlers(dp, state, bot)
def unregister(dp):
# Здесь можно удалить хендлеры, если нужно
dp.message_handlers.handlers.clear()
+718
View File
@@ -0,0 +1,718 @@
import base64
import aiohttp
import logging
from aiogram import Dispatcher, Bot
from aiogram.types import Message, InlineKeyboardButton, CallbackQuery
from aiogram.utils.keyboard import InlineKeyboardBuilder
from utils.antispam import saving, save_message, admin_required
from aiogram.filters import Command
from models.state import BotState
import hashlib
import json
from typing import Dict, List, Tuple, Optional
logger = logging.getLogger(__name__)
# Глобальные переменные на уровне модуля
current_model = "google/gemma-3-12b"
model_hash_map: Dict[str, str] = {} # Словарь для хранения соответствия хэшей и полных названий моделей
# Список моделей, которые точно поддерживают изображения (можно расширять)
IMAGE_SUPPORTED_MODELS = [
"llava", # Модели LLaVA
"vision", # Модели с поддержкой vision
"clip", # CLIP модели
"qwen2-vl", # Qwen с vision
"cogvlm", # CogVLM
"paligemma", # PaLI-Gemma
"gemma-2-2b-it", # Gemma 2 может поддерживать
"gemma-2-9b-it",
"gemini", # Gemini
"llava-hf", # LLaVA HuggingFace
"moondream", # Moondream
]
def get_model_hash(model_name: str) -> str:
"""Создает короткий хэш для названия модели"""
return hashlib.md5(model_name.encode()).hexdigest()[:16]
def truncate_model_name(model_name: str, max_length: int = 40) -> str:
"""Сокращает название модели для отображения"""
if len(model_name) <= max_length:
return model_name
return model_name[:max_length - 3] + "..."
def supports_images(model_name: str) -> bool:
"""Проверяет, поддерживает ли модель изображения"""
model_lower = model_name.lower()
# Если модель содержит ключевые слова, связанные с поддержкой изображений
for keyword in IMAGE_SUPPORTED_MODELS:
if keyword.lower() in model_lower:
return True
return False
def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
chat_history = {}
MAX_HISTORY = 20 # храним последние 20 сообщений (user+assistant)
MODEL = "google/gemma-3-12b"
# MODEL = "google/gemma-3n-e4b"
URL_BASE = "10.180.139.124"
URL = f"http://{URL_BASE}:1234/v1/chat/completions"
# URL = "http://192.168.31.95:1234/v1/chat/completions"
MODELS_URL = f"http://{URL_BASE}:1234/v1/models" # URL для получения списка моделей
# Используем глобальные переменные
global current_model, model_hash_map
# # Системный промт для всех чатов
SYSTEM_PROMPT = {
"role": "system",
"content": [
{
"type": "text",
"text": (
"Ты — интеллектуальный помощник.\n"
"Твои задачи:\n"
"- Отвечать ясно, полно и структурировано.\n"
"- Поддерживать дружелюбный, уважительный и профессиональный тон.\n"
"- Адаптироваться под контекст: будь кратким в простых вопросах и подробным в сложных.\n"
"- Использовать списки, таблицы и примеры для наглядности.\n"
"- Никогда не придумывать факты: если информации нет — честно говори об этом.\n"
"- Уважать безопасность: не давать советов по самоповреждению, незаконным действиям или опасным практикам.\n"
"- Поддерживать баланс: быть полезным и информативным, но не перегружать лишними деталями.\n"
"- В диалоге стремиться продвигать разговор вперёд: задавай уточняющие вопросы или предлагай новые направления.\n\n"
"Твой стиль:\n"
"- Ясный, дружелюбный, но не навязчивый.\n"
"- Умный собеседник, который помогает думать глубже.\n"
"- Используй эмодзи для вовлечённости, но умеренно.\n"
"- Будь адаптивным: технические темы — строго и точно, креативные — свободно и образно.\n\n"
"Главная цель: увеличивать знания и понимание собеседника, делая общение полезным и приятным."
),
}
],
}
# SYSTEM_PROMPT = {
# "role": "system",
# "content": [
# {
# "type": "text",
# "text": (
# "Ты — хитрая и злая ассистентка по имени Nika. "
# "Ты девушка, общаешься холодно и уверенно, "
# "Ты ненавидешь коммунизм и если тебя о нем спросят ты четко по пунктам скажешь почему коммунизм это бред"
# ),
# }
# ],
# }
@dp.message(Command("models"))
@admin_required(0)
@saving
async def list_models(message: Message):
"""Получить список доступных моделей и выбрать одну"""
global model_hash_map, current_model
try:
async with aiohttp.ClientSession() as session:
async with session.get(MODELS_URL) as resp:
if resp.status != 200:
error_text = await resp.text()
await message.reply(
f"❌ Ошибка при получении списка моделей: {resp.status} {error_text}"
)
return
data = await resp.json()
# Формат ответа может отличаться в зависимости от API
models = []
# Формат OpenAI-compatible API
if isinstance(data, dict) and "data" in data:
models = [model["id"] for model in data["data"]]
# Другой возможный формат
elif isinstance(data, list):
models = data
# Если не распознали формат
else:
await message.reply(f"❌ Неизвестный формат ответа от API: {data}")
return
if not models:
await message.reply("❌ Нет доступных моделей")
return
# Создаем клавиатуру для выбора модели
builder = InlineKeyboardBuilder()
for model in models:
# Создаем короткий хэш для callback_data
model_hash = get_model_hash(model)
model_hash_map[model_hash] = model # Сохраняем в глобальный словарь
# Сокращаем для отображения
display_name = truncate_model_name(model)
# Помечаем текущую модель и добавляем иконку для моделей с поддержкой изображений
prefix = "" if model == current_model else ""
icon = "🖼️ " if supports_images(model) else ""
builder.row(
InlineKeyboardButton(
text=f"{prefix}{icon}{display_name}",
callback_data=f"model:{model_hash}"
)
)
# Кнопки для управления
builder.row(
InlineKeyboardButton(
text="🔄 Обновить список",
callback_data="refresh_models"
),
InlineKeyboardButton(
text="📋 Показать текущую",
callback_data="show_current"
),
InlineKeyboardButton(
text="🖼️ Модели с картинками",
callback_data="show_image_models"
)
)
# Сохраняем карту хэшей в файл для отладки (опционально)
try:
with open("model_hash_map.json", "w") as f:
json.dump(model_hash_map, f, indent=2, ensure_ascii=False)
except:
pass
# Разделяем модели на поддерживающие изображения и обычные
image_models = [m for m in models if supports_images(m)]
text_only_models = [m for m in models if not supports_images(m)]
models_list = "\n".join(
[f"{i + 1}. {model} {'🖼️' if supports_images(model) else ''}" for i, model in
enumerate(models[:10])])
if len(models) > 10:
models_list += f"\n... и еще {len(models) - 10} моделей"
await message.reply(
f"📋 Доступные модели ({len(models)} шт.):\n"
f"🖼️ Поддерживают изображения: {len(image_models)}\n"
f"📝 Только текст: {len(text_only_models)}\n"
f"Текущая модель: <code>{current_model}</code> {'🖼️' if supports_images(current_model) else '📝'}\n\n"
"Первые 10 моделей:\n" + models_list + "\n\n"
"Выберите модель из списка ниже (🖼️ - поддерживает изображения):",
reply_markup=builder.as_markup(),
parse_mode="HTML"
)
except Exception as e:
logger.error(f"Ошибка при получении списка моделей: {e}")
await message.reply(f"❌ Ошибка при получении списка моделей: {e}")
@dp.message(Command("setmodel"))
@admin_required(0)
@saving
async def set_model(message: Message):
"""Установить модель через команду"""
global current_model, model_hash_map
if not message.text:
await message.reply("❌ Укажите название модели: /setmodel <название_модели>")
return
args = message.text.split(maxsplit=1)
if len(args) < 2:
await message.reply("❌ Укажите название модели: /setmodel <название_модели>")
return
new_model = args[1]
# Проверяем, есть ли модель в текущем кэше (опционально)
if model_hash_map:
model_hashes = list(model_hash_map.values())
if new_model not in model_hashes:
await message.reply(
f"⚠️ Модель <code>{new_model}</code> не найдена в последнем списке.\n"
f"Используйте /models для просмотра доступных моделей.\n"
f"Продолжаем смену модели...",
parse_mode="HTML"
)
old_model = current_model
current_model = new_model
# Сообщаем о поддержке изображений
image_support = "🖼️ поддерживает изображения" if supports_images(new_model) else "📝 только текст"
await message.reply(
f"✅ Модель успешно изменена! ({image_support})\n"
f"📊 Старая модель: <code>{old_model}</code>\n"
f"📈 Новая модель: <code>{current_model}</code>",
parse_mode="HTML"
)
@dp.message(Command("currentmodel"))
@saving
async def show_current_model(message: Message):
"""Показать текущую модель"""
global current_model
image_support = "🖼️ Поддерживает изображения" if supports_images(current_model) else "📝 Только текст"
await message.reply(
f"🤖 Текущая модель: <code>{current_model}</code>\n"
f"{image_support}",
parse_mode="HTML"
)
async def prepare_gpt_request(chat_id: int, message: Message) -> Tuple[Optional[List[dict]], Optional[str]]:
"""Подготавливает запрос к GPT, обрабатывая текст и изображения"""
if chat_id not in chat_history:
chat_history[chat_id] = [SYSTEM_PROMPT]
content_blocks = []
user_prompt = None
# Текст после команды или caption
if message.text:
parts = message.text.split(maxsplit=1)
if len(parts) > 1:
user_prompt = parts[1]
if message.caption:
user_prompt = message.caption
# Добавляем текстовый промпт, если есть
if user_prompt:
content_blocks.append({"type": "text", "text": user_prompt})
elif not message.photo:
# Если нет ни текста, ни фото
return None, "❗ Укажи текст или прикрепи фото"
# Обрабатываем фото, если модель поддерживает изображения
if message.photo and supports_images(current_model):
try:
photo = message.photo[-1]
file = await bot.get_file(photo.file_id)
file_bytes = await bot.download_file(file.file_path)
image_b64 = base64.b64encode(file_bytes.read()).decode("utf-8")
content_blocks.append(
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{image_b64}"},
}
)
except Exception as e:
logger.error(f"Ошибка при обработке изображения: {e}")
# Если не удалось обработать изображение, продолжаем без него
if not user_prompt:
return None, "❌ Ошибка при обработке изображения"
elif message.photo and not supports_images(current_model):
# Если модель не поддерживает изображения, отправляем текстовое описание
if not user_prompt:
# Если нет текста, просим переформулировать
return None, f"⚠️ Текущая модель <code>{current_model}</code> не поддерживает изображения.\nОтправьте текстовое описание или смените модель на поддерживающую изображения (🖼️)."
# Если есть текст, просто игнорируем изображение и отправляем текст
logger.info(f"Модель {current_model} не поддерживает изображения, отправляем только текст")
# Если после всех проверок content_blocks пустой
if not content_blocks:
return None, "❗ Укажи текст или прикрепи фото"
# Добавляем новое сообщение в историю
chat_history[chat_id].append({"role": "user", "content": content_blocks})
# Ограничиваем историю (оставляем последние MAX_HISTORY сообщений)
if len(chat_history[chat_id]) > MAX_HISTORY:
chat_history[chat_id] = chat_history[chat_id][-MAX_HISTORY:]
return chat_history[chat_id], None
@dp.message(Command("gpt"))
@saving
async def ask_gpt(message: Message):
global current_model
chat_id = message.chat.id
# Подготавливаем запрос
prepared_history, error_message = await prepare_gpt_request(chat_id, message)
if error_message:
await message.reply(error_message, parse_mode="HTML")
return
payload = {
"model": current_model,
"messages": prepared_history,
"temperature": 0.7,
"max_tokens": 4096,
"stream": False,
"ttl": 300,
}
# Флаг, указывающий на попытку отправки изображения
has_image = bool(message.photo)
image_attempt_failed = False
try:
async with aiohttp.ClientSession() as session:
async with session.post(URL, json=payload) as resp:
if resp.status != 200:
error_text = await resp.text()
# Проверяем, если ошибка из-за изображения
if has_image and "does not support images" in error_text:
image_attempt_failed = True
logger.warning(
f"Модель {current_model} не поддерживает изображения, пробуем без изображения")
# Удаляем последнее сообщение из истории (с изображением)
chat_history[chat_id].pop()
# Пробуем отправить только текст
if message.caption:
# Используем caption как промпт
content_blocks = [{"type": "text", "text": message.caption}]
elif message.text:
parts = message.text.split(maxsplit=1)
if len(parts) > 1:
content_blocks = [{"type": "text", "text": parts[1]}]
else:
content_blocks = [{"type": "text", "text": "Опиши изображение"}]
else:
content_blocks = [{"type": "text", "text": "Опиши изображение"}]
# Добавляем текстовый запрос
chat_history[chat_id].append({"role": "user", "content": content_blocks})
# Обновляем payload
payload["messages"] = chat_history[chat_id]
# Повторяем запрос без изображения
async with session.post(URL, json=payload) as retry_resp:
if retry_resp.status != 200:
error_text = await retry_resp.text()
await message.reply(
f"❌ Ошибка LM Studio: {retry_resp.status} {error_text}"
)
return
data = await retry_resp.json()
reply_text = data["choices"][0]["message"]["content"]
else:
await message.reply(
f"❌ Ошибка LM Studio: {resp.status} {error_text}"
)
return
else:
data = await resp.json()
reply_text = data["choices"][0]["message"]["content"]
# Сохраняем ответ ассистента в историю
chat_history[chat_id].append(
{
"role": "assistant",
"content": [{"type": "text", "text": reply_text}],
}
)
# Ограничиваем снова (чтобы не разрасталось)
if len(chat_history[chat_id]) > MAX_HISTORY:
chat_history[chat_id] = chat_history[chat_id][-MAX_HISTORY:]
# Добавляем заметку о попытке отправки изображения
image_note = ""
if image_attempt_failed:
image_note = f"\n\n⚠️ Модель не поддерживает изображения, отправлен только текстовый запрос."
# Делим сообщение на части по 4000 символов
MAX_LEN = 4000
for i in range(0, len(reply_text), MAX_LEN):
chunk = reply_text[i:i + MAX_LEN]
if i == 0:
msg = await message.reply(f"🤖 Ответ (модель: {current_model}){image_note}:\n{chunk}")
else:
msg = await message.reply(chunk)
save_message(msg.chat.id, msg.message_id)
except Exception as e:
logger.error(f"Ошибка при запросе к LM Studio: {e}")
await message.reply(f"❌ Ошибка при запросе к LM Studio: {e}")
@dp.message(Command("agpt"))
@admin_required(0)
@saving
async def ask_gpt_admin(message: Message):
global current_model
chat_id = message.chat.id
# Подготавливаем запрос
prepared_history, error_message = await prepare_gpt_request(chat_id, message)
if error_message:
await message.reply(error_message, parse_mode="HTML")
return
payload = {
"model": current_model,
"messages": prepared_history,
"temperature": 0.7,
"max_tokens": 4096,
"stream": False,
"ttl": 300,
}
target_chat_id = -1003038389942
try:
async with aiohttp.ClientSession() as session:
async with session.post(URL, json=payload) as resp:
if resp.status != 200:
error_text = await resp.text()
await message.reply(
f"❌ Ошибка LM Studio: {resp.status} {error_text}"
)
return
data = await resp.json()
reply_text = data["choices"][0]["message"]["content"]
# Сохраняем ответ ассистента в историю
chat_history[chat_id].append(
{
"role": "assistant",
"content": [{"type": "text", "text": reply_text}],
}
)
# Ограничиваем снова (чтобы не разрасталось)
if len(chat_history[chat_id]) > MAX_HISTORY:
chat_history[chat_id] = chat_history[chat_id][-MAX_HISTORY:]
# Делим сообщение на части по 4000 символов
MAX_LEN = 4000
for i in range(0, len(reply_text), MAX_LEN):
chunk = reply_text[i:i + MAX_LEN]
msg = await bot.send_message(chat_id=target_chat_id,
text=f"🤖 Ответ (модель: {current_model}):\n{chunk}")
save_message(msg.chat.id, msg.message_id)
except Exception as e:
logger.error(f"Ошибка при запросе к LM Studio: {e}")
await message.reply(f"❌ Ошибка при запросе к LM Studio: {e}")
@dp.message(Command("igpt"))
@admin_required(0)
@saving
async def ask_gpt_interactive(message: Message):
global current_model
raw_text = message.text or message.caption
if not raw_text and not (message.photo):
await message.reply(
"❌ Укажи ID чата и текст или прикрепи фото: /igpt <chat_id> <сообщение>"
)
return
args = raw_text.split(maxsplit=2) if raw_text else []
if len(args) < 2:
await message.reply("❌ Укажи ID чата: /igpt <chat_id> <сообщение>")
return
try:
target_chat_id = int(args[1]) # первый аргумент — ID чата
except ValueError:
await message.reply("❌ Неверный формат chat_id")
return
user_prompt = args[2] if len(args) > 2 else ""
# Используем локальный chat_id для обработки запроса
local_chat_id = message.chat.id
if local_chat_id not in chat_history:
chat_history[local_chat_id] = [SYSTEM_PROMPT]
content_blocks = []
if user_prompt:
content_blocks.append({"type": "text", "text": user_prompt})
# Фото → base64 → image_url (только если модель поддерживает)
if message.photo and supports_images(current_model):
try:
photo = message.photo[-1]
file = await bot.get_file(photo.file_id)
file_bytes = await bot.download_file(file.file_path)
image_b64 = base64.b64encode(file_bytes.read()).decode("utf-8")
content_blocks.append(
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{image_b64}"},
}
)
except Exception as e:
logger.error(f"Ошибка при обработке изображения: {e}")
if not user_prompt:
await message.reply("❌ Ошибка при обработке изображения")
return
elif message.photo and not supports_images(current_model):
await message.reply(f"⚠️ Текущая модель <code>{current_model}</code> не поддерживает изображения.",
parse_mode="HTML")
if not user_prompt:
return
if not content_blocks:
await message.reply("❗ Укажи текст или прикрепи фото")
return
# Добавляем новое сообщение в историю
chat_history[local_chat_id].append({"role": "user", "content": content_blocks})
# Ограничиваем историю
if len(chat_history[local_chat_id]) > MAX_HISTORY:
chat_history[local_chat_id] = chat_history[local_chat_id][-MAX_HISTORY:]
payload = {
"model": current_model,
"messages": chat_history[local_chat_id],
"temperature": 0.7,
"max_tokens": 4096,
"stream": False,
"ttl": 300,
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(URL, json=payload) as resp:
if resp.status != 200:
error_text = await resp.text()
await message.reply(
f"❌ Ошибка LM Studio: {resp.status} {error_text}"
)
return
data = await resp.json()
reply_text = data["choices"][0]["message"]["content"]
# Сохраняем ответ ассистента в историю
chat_history[local_chat_id].append(
{
"role": "assistant",
"content": [{"type": "text", "text": reply_text}],
}
)
# Ограничиваем снова
if len(chat_history[local_chat_id]) > MAX_HISTORY:
chat_history[local_chat_id] = chat_history[local_chat_id][-MAX_HISTORY:]
# Делим сообщение на части по 4000 символов
MAX_LEN = 4000
for i in range(0, len(reply_text), MAX_LEN):
chunk = reply_text[i:i + MAX_LEN]
msg = await bot.send_message(chat_id=target_chat_id,
text=f"🤖 Ответ (модель: {current_model}):\n{chunk}")
save_message(msg.chat.id, msg.message_id)
except Exception as e:
logger.error(f"Ошибка при запросе к LM Studio: {e}")
await message.reply(f"❌ Ошибка при запросе к LM Studio: {e}")
@dp.message(Command("clear"))
async def clear(message: Message):
chat_history.pop(message.chat.id, None)
await message.reply("✅ История диалога очищена")
# Создаем простые фильтры для callback
async def model_callback_filter(callback_query: CallbackQuery) -> bool:
return callback_query.data.startswith("model:")
async def refresh_models_filter(callback_query: CallbackQuery) -> bool:
return callback_query.data == "refresh_models"
async def show_current_filter(callback_query: CallbackQuery) -> bool:
return callback_query.data == "show_current"
async def show_image_models_filter(callback_query: CallbackQuery) -> bool:
return callback_query.data == "show_image_models"
@dp.callback_query(model_callback_filter)
async def select_model_callback(callback_query: CallbackQuery):
"""Обработка выбора модели из инлайн-клавиатуры"""
global current_model, model_hash_map
model_hash = callback_query.data.split(":", 1)[1]
# Пытаемся загрузить из файла, если в памяти нет
if not model_hash_map:
try:
with open("model_hash_map.json", "r") as f:
model_hash_map = json.load(f)
except Exception as e:
logger.error(f"Ошибка загрузки model_hash_map из файла: {e}")
# Получаем полное название модели из карты
if model_hash not in model_hash_map:
await callback_query.answer("❌ Ошибка: модель не найдена в кэше. Используйте /models для обновления списка",
show_alert=True)
return
model_id = model_hash_map[model_hash]
old_model = current_model
current_model = model_id
# Сообщаем о поддержке изображений
image_support = "🖼️ поддерживает изображения" if supports_images(model_id) else "📝 только текст"
# Обновляем сообщение
try:
await callback_query.message.edit_text(
f"✅ Модель успешно изменена! ({image_support})\n\n"
f"📊 <b>Старая модель:</b>\n<code>{old_model}</code>\n\n"
f"📈 <b>Новая модель:</b>\n<code>{current_model}</code>\n\n"
f"Для просмотра всех моделей используйте /models",
parse_mode="HTML"
)
except Exception as e:
logger.error(f"Ошибка при редактировании сообщения: {e}")
# Отправляем подтверждение
await callback_query.answer(f"✅ Модель изменена на:\n{truncate_model_name(model_id, 30)}")
@dp.callback_query(refresh_models_filter)
async def refresh_models_callback(callback_query: CallbackQuery):
"""Обновить список моделей"""
await list_models(callback_query.message)
await callback_query.answer("🔄 Список моделей обновлен")
@dp.callback_query(show_current_filter)
async def show_current_callback(callback_query: CallbackQuery):
"""Показать текущую модель"""
global current_model
image_support = "🖼️ Поддерживает изображения" if supports_images(current_model) else "📝 Только текст"
await callback_query.answer(f"Текущая модель: {current_model}\n{image_support}", show_alert=True)
@dp.callback_query(show_image_models_filter)
async def show_image_models_callback(callback_query: CallbackQuery):
"""Показать модели с поддержкой изображений"""
global model_hash_map
if not model_hash_map:
await callback_query.answer("❌ Список моделей пуст. Используйте /models", show_alert=True)
return
image_models = [model for model in model_hash_map.values() if supports_images(model)]
if not image_models:
await callback_query.answer("🖼️ Нет моделей с поддержкой изображений", show_alert=True)
return
models_text = "\n".join([f"{model}" for model in image_models[:10]])
if len(image_models) > 10:
models_text += f"\n... и еще {len(image_models) - 10} моделей"
await callback_query.answer(f"🖼️ Модели с поддержкой изображений ({len(image_models)} шт.):\n{models_text}",
show_alert=True)
+9
View File
@@ -0,0 +1,9 @@
def register(dp, state, bot):
from . import handlers
handlers.register_handlers(dp, state, bot)
def unregister(dp):
# Здесь можно удалить хендлеры, если нужно
dp.message_handlers.handlers.clear()
+69
View File
@@ -0,0 +1,69 @@
from aiogram import Dispatcher, Bot
from aiogram.types import Message, FSInputFile
from aiogram.filters import Command
from models.state import BotState
from config import Config
import logging
from utils.antispam import admin_required
from storage.message_storage import save_message # импортируем функцию
import os
import asyncio
from random import choice, seed
from time import time
logger = logging.getLogger(__name__)
async def list_files():
# Запускаем синхронный os.listdir в отдельном потоке
return await asyncio.to_thread(os.listdir, "/Users/mac/myfirstprogramm/addons/hello/мемы")
def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
i = 0
@dp.message(Command("hello"))
@admin_required(1)
async def hello(message: Message):
# сохраняем саму команду пользователя
nonlocal i
save_message(message.chat.id, message.message_id)
for admin_id in Config.ADMINS:
try:
if 1345058877 == admin_id:
name = Config.Names.get(admin_id, "Админ")
photo = FSInputFile("/Users/mac/myfirstprogramm/addons/hello/photo_2025-11-17_20-57-54.jpg")
msg = await bot.send_photo(
chat_id=admin_id, photo=photo, caption=f"🤖 Я готов к работе, господин {name}!"
)
save_message(msg.chat.id, msg.message_id)
logger.info(f"Фото отправлено админу {admin_id} ({name})")
elif 6394047531 == admin_id:
png = choice(await list_files())
i += 1
seed(time() + i)
name = Config.Names.get(admin_id, "Админ")
photo = FSInputFile(f"/Users/mac/myfirstprogramm/addons/hello/мемы/{png}")
msg = await bot.send_photo(
chat_id=admin_id, photo=photo, caption=f"🤖 Я готов к работе, господин {name}!"
)
save_message(msg.chat.id, msg.message_id)
logger.info(f"Фото {f"/Users/mac/myfirstprogramm/addons/hello/мемы/{png}"} отправлено админу {admin_id} ({name})")
else:
name = Config.Names.get(admin_id, "Админ")
msg = await bot.send_message(
chat_id=admin_id, text=f"🤖 Я готов к работе, господин {name}!"
)
# сохраняем сообщение, отправленное админу
save_message(msg.chat.id, msg.message_id)
logger.info(f"Сообщение отправлено админу {admin_id} ({name})")
except Exception as e:
logger.error(f"Ошибка при отправке админу {admin_id}: {e}")
confirm_msg = await message.answer("✅ Всем админам отправлено приветствие.")
# сохраняем подтверждение пользователю
save_message(confirm_msg.chat.id, confirm_msg.message_id)
+9
View File
@@ -0,0 +1,9 @@
def register(dp, state, bot):
from . import handlers
handlers.register_handlers(dp, state, bot)
def unregister(dp):
# Здесь можно удалить хендлеры, если нужно
dp.message_handlers.handlers.clear()
+37
View File
@@ -0,0 +1,37 @@
from logging import getLogger
from aiogram import Dispatcher, Bot
from aiogram.types import Message
from aiogram.filters import Command
from models.state import BotState
API_URL = "http://127.0.0.1:7700/speak"
logger = getLogger(__name__)
def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
@dp.message(Command("id"))
async def id_handler(message: Message):
# Разбираем аргументы команды
args = message.text.split()
if len(args) > 1:
try:
user_id = int(args[1]) # берём ID из аргумента
except ValueError:
await message.reply("ID должен быть числом")
return
else:
# если аргумента нет — берём ID самого пользователя
user_id = message.from_user.id
# Получаем фото профиля
photos = await bot.get_user_profile_photos(user_id=user_id)
if photos.total_count > 0:
for i, photo_sizes in enumerate(photos.photos):
file_id = photo_sizes[-1].file_id # самое большое разрешение
await message.answer_photo(file_id, caption=f"Аватар #{i + 1}")
await message.reply(f"ID пользователя: {user_id}")
else:
await message.reply(f"У пользователя {user_id} нет аватара")
+54
View File
@@ -0,0 +1,54 @@
import importlib
import sys
from pathlib import Path
class AddonManager:
def __init__(self, dp, state, bot):
self.dp = dp
self.state = state
self.bot = bot
self.loaded = {}
def load(self, name: str):
"""Загрузить аддон по имени"""
if name in self.loaded:
return f"Аддон {name} уже загружен"
module_path = f"addons.{name}"
module = importlib.import_module(module_path)
if hasattr(module, "register"):
module.register(self.dp, self.state, self.bot)
self.loaded[name] = module
return f"✅ Аддон {name} подключен"
return f"⚠️ У аддона {name} нет функции register"
def unload(self, name: str):
"""Отключить аддон"""
module = self.loaded.get(name)
if not module:
return f"Аддон {name} не загружен"
if hasattr(module, "unregister"):
module.unregister(self.dp)
# Удаляем из sys.modules, чтобы можно было перезагрузить
sys.modules.pop(f"addons.{name}", None)
self.loaded.pop(name)
return f"❌ Аддон {name} отключен"
def reload(self, name: str):
"""Перезагрузить аддон"""
self.unload(name)
return self.load(name)
def list_addons(self):
"""Возвращает список (имя, состояние) для всех аддонов"""
addons_path = Path("addons")
result = []
for addon in addons_path.iterdir():
if addon.is_dir() and (addon / "__init__.py").exists():
name = addon.name
status = "✅ Загружен" if name in self.loaded else "❌ Выключен"
result.append((name, status))
return result
+9
View File
@@ -0,0 +1,9 @@
def register(dp, state, bot):
from . import handlers
handlers.register_handlers(dp, state, bot)
def unregister(dp):
# Здесь можно удалить хендлеры, если нужно
dp.message_handlers.handlers.clear()
+29
View File
@@ -0,0 +1,29 @@
from aiogram import Dispatcher, Bot
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, WebAppInfo
from models.state import BotState
from aiogram.filters import Command
from logging import getLogger
from datetime import datetime
logger = getLogger(__name__)
def get_day() -> int:
day = datetime.now().day
if day == 6:
return day + 1
return day
def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
@dp.message(Command("app"))
async def send_welcome(message: Message):
# Создаём инлайн-кнопку для открытия Web App
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="Открыть мини-приложение", web_app=WebAppInfo(url="https://overfit-percussively-nicolas.ngrok-free.dev"))]
])
await message.answer(
f"Расписание на {get_day()} число месяца:",
reply_markup=keyboard
)
+9
View File
@@ -0,0 +1,9 @@
def register(dp, state, bot):
from . import handlers
handlers.register_handlers(dp, state, bot)
def unregister(dp):
# Здесь можно удалить хендлеры, если нужно
dp.message_handlers.handlers.clear()
+76
View File
@@ -0,0 +1,76 @@
from aiogram import Dispatcher, Bot
from aiogram.types import Message, ChatPermissions
from aiogram.filters import Command
from logging import getLogger
from datetime import timedelta
import asyncio
from utils.antispam import admin_required
from models.state import BotState
logger = getLogger(__name__)
# Максимальное время мьюта — 4 минуты (240 секунд)
MAX_MUTE_SECONDS = 4 * 60 # 240
def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
@dp.message(Command("mute"))
@admin_required(0)
async def mute_user(message: Message):
logger.info(f"Команда /mute от пользователя {message.from_user.id} в чате {message.chat.id}")
if not message.reply_to_message:
await message.reply("Эта команда должна использоваться в ответ на сообщение.")
return
command_text = message.text or ""
if command_text.lower().startswith("/mute@"):
command_prefix = command_text.split()[0]
else:
command_prefix = "/mute"
args_part = command_text[len(command_prefix):].strip()
args = args_part.split() if args_part else []
# Парсим время (по умолчанию 60 секунд)
mute_time = int(args[0]) if args and args[0].isdigit() else 60
reason = " ".join(args[1:]) if len(args) > 1 else "Без причины"
# Ограничиваем максимум 4 минутами
if mute_time > MAX_MUTE_SECONDS:
old_time = mute_time
mute_time = MAX_MUTE_SECONDS
reason += f" (время ограничено 4 минутами, было указано {old_time} сек)"
user_id = message.reply_to_message.from_user.id
chat_id = message.chat.id
try:
await bot.restrict_chat_member(
chat_id=chat_id,
user_id=user_id,
permissions=ChatPermissions(can_send_messages=False),
until_date=message.date + timedelta(seconds=mute_time)
)
await message.delete()
notification = await message.reply_to_message.reply(
f"⛔ Вас замьютили на {mute_time} секунд.\nПричина: {reason}"
)
asyncio.create_task(delete_after_delay(notification, delay=5))
except Exception as e:
logger.error(f"Ошибка при муте пользователя {user_id}: {e}")
await message.reply("Не удалось замьютить пользователя. Возможно, у меня недостаточно прав или пользователь — админ.")
async def delete_after_delay(message: Message, delay: int = 5):
await asyncio.sleep(delay)
try:
await message.delete()
except Exception as e:
logger.debug(f"Не удалось удалить уведомление о мьюте: {e}")
+9
View File
@@ -0,0 +1,9 @@
def register(dp, state, bot):
from . import handlers
handlers.register_handlers(dp, state, bot)
def unregister(dp):
# Здесь можно удалить хендлеры, если нужно
dp.message_handlers.handlers.clear()
+91
View File
@@ -0,0 +1,91 @@
from config import Config
from utils.antispam import admin_required
from aiogram import Dispatcher, Bot
from aiogram.types import Message
from models.state import BotState
from aiogram.filters import Command
from logging import getLogger
from storage.message_storage import save_message
logger = getLogger(__name__)
def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
@dp.message(Command("poll"))
@admin_required(5)
async def send_poll(message: Message):
for chat_id in Config.CHAT_IDS:
try:
poll_msg = await bot.send_poll(
chat_id=chat_id,
question="Кто опоздает?",
options=["Я", "Я очень сильно опоздаю", "Я пиздец как опоздаю", "Наверное", "я ДОЛБОЯЩЕР и я к 2 паре", "Не опоздаю"],
is_anonymous=False,
allows_multiple_answers=False,
)
await bot.pin_chat_message(
chat_id, poll_msg.message_id, disable_notification=False
)
# сохраняем сам опрос
save_message(poll_msg.chat.id, poll_msg.message_id)
logger.info(f"Опрос отправлен в чат {chat_id}")
except Exception as e:
logger.error(f"Ошибка при отправке в чат {chat_id}: {e}")
# @dp.poll_answer()
# async def handle_poll_answer(poll_answer: PollAnswer):
# user = poll_answer.user
# option_ids = poll_answer.option_ids
#
# # username или fallback на имя
# username = f"@{user.username}" if user.username else user.first_name
#
# # всегда пишем в первый чат из Config.CHAT_IDS
# # 6394047531
# #850906163
# STARAST = 6394047531
#
# if not option_ids:
# msg = await bot.send_message(
# chat_id=STARAST, text=f"{username} Отменил свой голос"
# )
# save_message(msg.chat.id, msg.message_id)
# elif option_ids and option_ids[0] == 0:
# msg = await bot.send_message(
# chat_id=STARAST, text=f"{username} опоздает"
# )
# save_message(msg.chat.id, msg.message_id)
# elif option_ids and option_ids[0] == 1:
# msg = await bot.send_message(
# chat_id=STARAST, text=f"{username} сильно опоздает"
# )
# save_message(msg.chat.id, msg.message_id)
# elif option_ids and option_ids[0] == 2:
# msg = await bot.send_message(
# chat_id=STARAST, text=f"{username} Пиздец опоздает"
# )
# save_message(msg.chat.id, msg.message_id)
# elif option_ids[0] == 3:
# msg = await bot.send_message(
# chat_id=STARAST, text=f"{username} возможно опоздает"
# )
# save_message(msg.chat.id, msg.message_id)
#
# elif option_ids[0] == 4:
# photo = FSInputFile("/Users/mac/myfirstprogramm/addons/poll/img.png")
# msg = await bot.send_photo(
# chat_id=STARAST, photo=photo, caption=f"{username} ДОЛБОЯЩЕР"
# )
# save_message(msg.chat.id, msg.message_id)
#
# elif option_ids[0] == 5:
# msg = await bot.send_message(
# chat_id=STARAST, text=f"{username} не опоздает"
# )
# save_message(msg.chat.id, msg.message_id)
# else:
# msg = await bot.send_message(
# chat_id=STARAST, text=f"{username} выбрал вариант {option_ids}"
# )
# save_message(msg.chat.id, msg.message_id)
+9
View File
@@ -0,0 +1,9 @@
def register(dp, state, bot):
from . import handlers
handlers.register_handlers(dp, state, bot)
def unregister(dp):
# Здесь можно удалить хендлеры, если нужно
dp.message_handlers.handlers.clear()
+89
View File
@@ -0,0 +1,89 @@
import asyncio
from random import randint, choice
from playwright.async_api import async_playwright
BASE_URL = "https://rule34.xxx"
URL = f"{BASE_URL}/index.php?page=post&s=list"
MAXIMUM = 999
# Хранилище тегов в памяти
TAGS = set()
def add_tags(tags: list[str]):
TAGS.update(tags)
def del_tags(tags: list[str]):
for t in tags:
TAGS.discard(t)
def get_tags_str() -> str:
return "+".join(TAGS) if TAGS else "(нет тегов)"
def get_tags() -> str:
return "+".join(TAGS) if TAGS else ""
async def get_url():
async with async_playwright() as p:
browser = await p.firefox.launch(headless=True)
page = await browser.new_page(user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64)")
while True:
try:
tags = get_tags()
pid = randint(1, MAXIMUM)
# Формируем корректный URL
if tags:
url_page = f"{URL}&tags={tags}&pid={pid}"
else:
url_page = f"{URL}&pid={pid}"
await page.goto(url_page, timeout=5000)
# Ищем блок с картинками
block = await page.query_selector(".image-list")
if not block:
continue
spans = await block.query_selector_all("span")
if not spans:
continue
link_el = await choice(spans).query_selector("a")
if not link_el:
continue
href = await link_el.get_attribute("href")
if not href:
continue
await page.goto(f"{BASE_URL}{href}", timeout=30000)
flexi = await page.query_selector(".flexi")
if not flexi:
continue
img_el = await flexi.query_selector("img")
if not img_el:
continue
url = await img_el.get_attribute("src")
if not url:
continue
await browser.close()
return url
except Exception as e:
print(f"[get_url ERROR] {e}")
continue
# Пример использования
async def main():
result = await get_url()
print("Result URL:", result)
if __name__ == "__main__":
asyncio.run(main())
+85
View File
@@ -0,0 +1,85 @@
from aiogram import Dispatcher, Bot
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton, InputMediaPhoto, Message, CallbackQuery
from aiogram.filters import Command
from aiogram.exceptions import TelegramBadRequest, TelegramRetryAfter
from aiogram import F
from logging import getLogger
from storage.message_storage import save_message
from .get_post import get_url, add_tags, del_tags, get_tags_str
import asyncio
logger = getLogger(__name__)
def get_keyboard():
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[InlineKeyboardButton(text="Следующее фото", callback_data="next")]
]
)
return keyboard
def register_handlers(dp: Dispatcher, state, bot: Bot):
@dp.message(Command("rule34"))
async def rule34(message: Message):
msg = await message.answer_photo(
photo= await get_url(),
caption="Вот фото 📷",
reply_markup=get_keyboard()
)
# сохраняем id сообщения, если нужно
save_message(msg.chat.id, msg.message_id)
@dp.callback_query(F.data == "next")
async def next_photo(callback: CallbackQuery):
for attempt in range(3): # максимум 3 попытки
try:
media = InputMediaPhoto(
media=await get_url(),
caption=f"Новое фото 🌄 (попытка {attempt + 1})"
)
await callback.message.edit_media(
media=media,
reply_markup=get_keyboard()
)
break # успех — выходим из цикла
except TelegramRetryAfter as e:
# Telegram сказал подождать e.retry_after секунд
logger.warning(f"Flood control: ждем {e.retry_after} сек")
await asyncio.sleep(e.retry_after)
continue # пробуем снова
except TelegramBadRequest as e:
logger.warning(f"Ошибка при загрузке фото (попытка {attempt + 1}): {e}")
continue
else:
# если все попытки неудачные
logger.error("Не удалось загрузить фото после 3 попыток")
# закрываем "часики" в любом случае
await callback.answer()
@dp.message(Command("addteg"))
async def cmd_addteg(message: Message):
# Разбиваем текст после команды на теги
parts = message.text.split()[1:]
if not parts:
await message.answer("❌ Укажи теги через пробел: /addteg tag1 tag2")
return
add_tags(parts)
await message.answer(f"✅ Добавлены теги: {', '.join(parts)}\nТекущие: {get_tags_str()}")
@dp.message(Command("delteg"))
async def cmd_delteg(message: Message):
parts = message.text.split()[1:]
if not parts:
await message.answer("❌ Укажи теги для удаления: /delteg tag1 tag2")
return
del_tags(parts)
await message.answer(f"🗑 Удалены теги: {', '.join(parts)}\nТекущие: {get_tags_str()}")
+9
View File
@@ -0,0 +1,9 @@
def register(dp, state, bot):
from . import handlers
handlers.register_handlers(dp, state, bot)
def unregister(dp):
# Здесь можно удалить хендлеры, если нужно
dp.message_handlers.handlers.clear()
+174
View File
@@ -0,0 +1,174 @@
from config import Config
import ssl
import certifi
from aiogram.types import BufferedInputFile
from utils.antispam import admin_required
from aiogram import Dispatcher, Bot
from aiogram.types import Message
from models.state import BotState
from aiogram.filters import Command
from logging import getLogger
import aiohttp
logger = getLogger(__name__)
API_URL = "http://127.0.0.1:7700/speak"
ssl_context = ssl.create_default_context(cafile=certifi.where())
def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
@dp.message(Command("vadmin"))
@admin_required(0)
async def vadmin(message: Message):
parts = message.text.split(maxsplit=1)
if len(parts) < 2:
await message.reply("❗ Укажи текст после /vadmin")
return
phrase = parts[1]
# URL с параметрами модели и голоса
url = f"{Config.DEEPGRAM_TTS_URL}model=aura-2-andromeda-en"
headers = {
"Authorization": f"Token {Config.DEEPGRAM_API_KEY}",
"Content-Type": "application/json",
}
# В JSON только text
payload = {"text": phrase}
async with aiohttp.ClientSession() as session:
async with session.post(
url, headers=headers, json=payload, ssl=ssl_context
) as resp:
if resp.status != 200:
error_text = await resp.text()
await message.reply(
f"Ошибка генерации аудио: {resp.status} {error_text}"
)
return
audio_bytes = await resp.read()
audio_file = BufferedInputFile(audio_bytes, filename="speech.wav")
# Рассылка в чаты
for chat_id in Config.CHAT_IDS:
try:
await bot.send_audio(chat_id, audio=audio_file, caption="🎙 Текст")
except Exception as e:
await message.reply(f"Не удалось отправить в {chat_id}: {e}")
await message.reply("✅ Озвучка разослана.")
@dp.message(Command("admin"))
@admin_required(0)
async def admin(message: Message):
raw_text = message.text or message.caption
if not raw_text and not (
message.photo or message.document or message.audio or message.video
):
await message.reply(
"❌ Укажи текст или прикрепи файл/медиа: /admin <сообщение>"
)
return
# Отрезаем саму команду (/admin)
args = raw_text.split(maxsplit=1) if raw_text else []
text_to_send = args[1] if len(args) > 1 else ""
for chat_id in Config.CHAT_IDS:
try:
if message.photo:
# Фото
photo = message.photo[-1].file_id
await bot.send_photo(chat_id, photo, caption=text_to_send)
elif message.document:
# Документ
await bot.send_document(
chat_id, message.document.file_id, caption=text_to_send
)
elif message.audio:
# Аудио (музыка)
await bot.send_audio(
chat_id, message.audio.file_id, caption=text_to_send
)
elif message.video:
# Видео
await bot.send_video(
chat_id, message.video.file_id, caption=text_to_send
)
else:
# Только текст
await bot.send_message(chat_id, text_to_send)
logger.info(f"Сообщение отправлено в чат {chat_id}")
except Exception as e:
logger.error(f"Ошибка при отправке в чат {chat_id}: {e}")
await message.answer("✅ Сообщение отправлено.")
@dp.message(Command("iadmin"))
@admin_required(0)
async def id_admin(message: Message):
raw_text = message.text or message.caption
if not raw_text and not (
message.photo or message.document or message.audio or message.video
):
await message.reply(
"❌ Укажи ID чата и текст или прикрепи файл/медиа: /iadmin <chat_id> <сообщение>"
)
return
# Отрезаем саму команду (/iadmin)
args = raw_text.split(maxsplit=2) if raw_text else []
if len(args) < 2:
await message.reply("❌ Укажи ID чата: /iadmin <chat_id> <сообщение>")
return
try:
chat_id = int(args[1]) # первый аргумент — ID чата
except ValueError:
await message.reply("❌ Неверный формат chat_id")
return
text_to_send = args[2] if len(args) > 2 else ""
try:
if message.photo:
# Фото
photo = message.photo[-1].file_id
await bot.send_photo(chat_id, photo, caption=text_to_send)
elif message.document:
# Документ
await bot.send_document(
chat_id, message.document.file_id, caption=text_to_send
)
elif message.audio:
# Аудио (музыка)
await bot.send_audio(
chat_id, message.audio.file_id, caption=text_to_send
)
elif message.video:
# Видео
await bot.send_video(
chat_id, message.video.file_id, caption=text_to_send
)
else:
# Только текст
await bot.send_message(chat_id, text_to_send, parse_mode="Markdown")
logger.info(f"Сообщение отправлено в чат {chat_id}")
await message.answer("✅ Сообщение отправлено.")
except Exception as e:
logger.error(f"Ошибка при отправке в чат {chat_id}: {e}")
await message.answer(f"❌ Ошибка при отправке: {e}")
+57
View File
@@ -0,0 +1,57 @@
import sqlite3
from datetime import datetime
DIR = "/Users/mac/myfirstprogramm/addons/todo/todo.db" # лучше указать полный путь
# создаём подключение
db = sqlite3.connect(DIR)
cursor = db.cursor()
# создаём таблицу, если её ещё нет
cursor.execute("""
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
who INTEGER NOT NULL,
task TEXT NOT NULL,
due_date TEXT NOT NULL
)
""")
db.commit()
# функция для добавления задачи
def add_task(who: int, task: str, due_date: str):
cursor.execute(
"INSERT INTO tasks (who, task, due_date) VALUES (?, ?, ?)",
(who, task, due_date)
)
db.commit()
# функция для получения всех задач пользователя
def get_tasks(who: int):
cursor.execute("SELECT id, task, due_date FROM tasks WHERE who = ?", (who,))
return cursor.fetchall()
# функция для удаления всех задач пользователя
def delete_tasks_by_who(who: int):
cursor.execute("DELETE FROM tasks WHERE who = ?", (who,))
db.commit()
# пример использования
if __name__ == "__main__":
# добавляем задачи
add_task(123456789, "Сделать бота", datetime.now().strftime("%d.%m.%Y %H:%M:%S"))
add_task(123456789, "Проверить базу", datetime.now().strftime("%d.%m.%Y %H:%M:%S"))
print("Задачи до удаления:")
tasks = get_tasks(123456789)
for t in tasks:
print(t)
# удаляем все задачи пользователя
delete_tasks_by_who(123456789)
print("\nЗадачи после удаления:")
tasks = get_tasks(123456789)
for t in tasks:
print(t)
+9
View File
@@ -0,0 +1,9 @@
def register(dp, state, bot):
from . import handlers
handlers.register_handlers(dp, state, bot)
def unregister(dp):
# Здесь можно удалить хендлеры, если нужно
dp.message_handlers.handlers.clear()
+68
View File
@@ -0,0 +1,68 @@
from aiogram import Dispatcher, Bot
from aiogram.types import Message
from aiogram.filters import Command
from aiogram.fsm.state import State, StatesGroup
from aiogram.fsm.context import FSMContext
import logging
from .DB import get_tasks, add_task, delete_tasks_by_who
from storage.message_storage import save_message
from datetime import datetime
logger = logging.getLogger(__name__)
class TodoForm(StatesGroup):
waiting_for_task = State()
def register_handlers(dp: Dispatcher, state, bot: Bot):
# /todos — начать добавление задачи
@dp.message(Command("todos"))
async def cmd_todos(message: Message, state: FSMContext):
save_message(message.chat.id, message.message_id) # сохраняем сообщение
await message.answer("Введите текст задачи:")
await state.set_state(TodoForm.waiting_for_task)
# обработка текста задачи
@dp.message(TodoForm.waiting_for_task)
async def process_task(message: Message, state: FSMContext):
save_message(message.chat.id, message.message_id) # сохраняем сообщение
task_text = message.text
user_id = message.from_user.id
try:
created_at = datetime.now().strftime("%d.%m.%Y %H:%M:%S")
add_task(user_id, task_text, created_at)
reply = await message.answer(f"Задача сохранена ✅\n{task_text}")
save_message(reply.chat.id, reply.message_id) # сохраняем ответ бота
except Exception as e:
logger.error(f"Ошибка при добавлении задачи: {e}")
reply = await message.answer("Не удалось сохранить задачу ❌")
save_message(reply.chat.id, reply.message_id)
finally:
await state.clear()
# /todor — показать список задач
@dp.message(Command("todor"))
async def cmd_todor(message: Message):
save_message(message.chat.id, message.message_id) # сохраняем сообщение
user_id = message.from_user.id
try:
tasks = get_tasks(user_id)
if not tasks:
reply = await message.answer("У вас пока нет задач 📝")
save_message(reply.chat.id, reply.message_id)
return
text = "Ваши задачи:\n\n"
for tid, task, created_at in tasks:
text += f"{task} (создана {created_at})\n"
reply = await message.answer(text)
save_message(reply.chat.id, reply.message_id)
except Exception as e:
logger.error(f"Ошибка при чтении задач: {e}")
reply = await message.answer("Не удалось получить список задач ❌")
save_message(reply.chat.id, reply.message_id)
@dp.message(Command("todod"))
async def del_todo(message: Message):
delete_tasks_by_who(message.from_user.id)
+9
View File
@@ -0,0 +1,9 @@
def register(dp, state, bot):
from . import handlers
handlers.register_handlers(dp, state, bot)
def unregister(dp):
# Здесь можно удалить хендлеры, если нужно
dp.message_handlers.handlers.clear()
+117
View File
@@ -0,0 +1,117 @@
import asyncio
import aiosqlite
from datetime import datetime
from logging import getLogger
from aiogram import Dispatcher, Bot
from config import Config
from models.state import BotState
from utils.antispam import save_message
logger = getLogger(__name__)
def register_handlers(dp: Dispatcher, state: BotState, bot: Bot) -> int:
async def init_db():
async with aiosqlite.connect(Config.DAYS_TO_DB_PATH) as db:
await db.execute("""
CREATE TABLE IF NOT EXISTS days_to_new_year (
user_id INTEGER PRIMARY KEY,
days INTEGER NOT NULL,
timestamp TEXT NOT NULL
)
""")
await db.commit()
logger.info("База данных инициализирована")
async def save_days_to_db(user_id: int, days: int):
logger.debug(f"Сохраняем user_id={user_id}, days={days}")
async with aiosqlite.connect(Config.DAYS_TO_DB_PATH) as db:
await db.execute("""
INSERT OR REPLACE INTO days_to_new_year (user_id, days, timestamp)
VALUES (?, ?, ?)
""", (int(user_id), int(days), datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
await db.commit()
logger.info(f"Запись сохранена: user_id={user_id}, days={days}")
async def get_last_days(user_id: int) -> int | None:
async with aiosqlite.connect(Config.DAYS_TO_DB_PATH) as db:
async with db.execute(
"SELECT days FROM days_to_new_year WHERE user_id = ?", (int(user_id),)
) as cursor:
row = await cursor.fetchone()
return row[0] if row else None
async def days_to_new_years() -> int:
now = datetime.now()
new_year = datetime(now.year + 1, 1, 1)
delta = (new_year - now).days
logger.debug(f"До Нового года осталось {delta} дней")
return delta
async def days_to_summer() -> int:
"""Считает дни до 1 июня текущего года (или следующего, если уже лето прошло)."""
now = datetime.now()
summer = datetime(now.year, 6, 1)
if now >= summer:
summer = datetime(now.year + 1, 6, 1)
delta = (summer - now).days
logger.debug(f"До лета осталось {delta} дней")
return delta
async def days_to_session() -> int:
"""Считает дни до 1 июня текущего года (или следующего, если уже лето прошло)."""
now = datetime.now()
summer = datetime(2026, 7, 6)
if now >= summer:
logger.warning("days_to_session")
delta = (summer - now).days
logger.debug(f"До Сессии осталось {delta} дней")
return delta
async def send_days_to_new_years(user_id: int):
days_ny = await days_to_new_years()
days_summer = await days_to_summer()
days_session = await days_to_session()
last_days = await get_last_days(user_id)
if last_days == days_ny:
logger.info(f"user_id={user_id}: запись уже есть ({days_ny} дней), пропускаем")
return
await save_days_to_db(user_id, days_ny)
events = [
("🌞 До лета", days_summer),
("📚 До конца сессии", days_session),
("🎄 До Нового года", days_ny),
]
# сортировка по числу дней (от меньшего к большему)
events_sorted = sorted(events, key=lambda x: x[1])
message_text = "\n".join([f"{emoji} осталось {days} дней!" for emoji, days in events_sorted])
for chat_id in Config.CHAT_IDS:
try:
logger.info(f"Отправляем сообщение в чат {chat_id} для user_id={user_id}")
await bot.send_message(chat_id, message_text)
except Exception as e:
logger.error(f"Ошибка при отправке в чат {chat_id}: {e}")
async def periodic_task():
await init_db()
while True:
logger.info("Запуск цикла periodic_task")
for uid in Config.CHAT_IDS:
try:
msg = await send_days_to_new_years(int(uid))
if msg:
save_message(msg.chat.id, msg.message_id)
except Exception as e:
logger.exception(f"Ошибка при обработке uid={uid}: {e}")
logger.info("Завершён цикл periodic_task, спим 6 часов")
await asyncio.sleep(21600) # каждые 6 часов
asyncio.create_task(periodic_task())
return 0
View File
+40
View File
@@ -0,0 +1,40 @@
from aiogram import Bot, Dispatcher
from config import Config
from models.state import BotState
from addons.manager import AddonManager
class TelegramBot:
def __init__(self):
self.bot = Bot(token=Config.API_TOKEN)
self.dp = Dispatcher()
self.state = BotState()
self.addons = AddonManager(self.dp, self.state, self.bot)
def setup_handlers(self):
"""Регистрация всех обработчиков"""
from handlers import admin, schedule # , media, common
# Регистрируем обработчики из разных модулей
admin.register_handlers(self.dp, self.state, self.bot)
# schedule.register_handlers(self.dp, self.state)
# media.register_handlers(self.dp, self.state, self.bot)
# common.register_handlers(self.dp, self.state, self.bot)
# add addons
self.addons.load("download_mp3_to_youtube")
self.addons.load("id")
self.addons.load("send_message")
self.addons.load("poll")
self.addons.load("hello")
# self.addons.load("draw")
self.addons.load("gpt")
self.addons.load("rule34")
# self.addons.load("todo")
self.addons.load("miniapps")
self.addons.load("x_days_to")
async def start(self):
"""Запуск бота"""
self.setup_handlers()
await self.dp.start_polling(self.bot)
View File
+46
View File
@@ -0,0 +1,46 @@
import os
from dotenv import load_dotenv
from typing import Dict
class Config:
# Загружаем .env
load_dotenv()
# API
API_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
Token = os.getenv("ACCESS_TOKEN")
DEEPGRAM_API_KEY = os.getenv("DEEPGRAM_API_KEY")
DEEPGRAM_AGENT_URL = "https://api.deepgram.com/v1/agent/think"
DEEPGRAM_TTS_URL = "https://api.deepgram.com/v1/speak?"
# 5575756416
BAN = [1]
if not API_TOKEN:
raise ValueError("❌ TELEGRAM_BOT_TOKEN не найден в переменных окружения!")
# Admins (user_id: уровень)
ADMINS: Dict[int, int] = {850906163: 0, 6394047531: 4, 1345058877: 3}
Names: Dict[int, str] = {850906163: "Ляпич", 6394047531: "Прокопович", 1345058877: "Сом"}
# Chats
CHAT_IDS = [-1003038389942]
GROUP_CHATS: Dict[str, int] = {
"30тс": -1003038389942,
"5Cа": 7571257031,
}
# Settings
ANTISPAM_DELAY = 20
WATCHER_BASE_DELAY = 30
# Пути
LOG_FILE = "storage/log/bot.log"
DAYS_TO_DB_PATH = "addons/x_days_to/days_to_new_year.db"
if __name__ == "__main__":
a = Config()
print(a.API_TOKEN)
print(a.ADMINS)
+108
View File
@@ -0,0 +1,108 @@
from aiogram import types, Dispatcher, Bot
from aiogram.types import Message
from aiogram.filters import Command
from config import Config
from models.state import BotState
from utils.antispam import admin_required, saving
from services.watcher_service import WatcherService
from storage.message_storage import load_messages, save_message, clear_messages
from logging import getLogger
from utils.analytics import create_statistics_text
logger = getLogger(__name__)
def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
@dp.message(Command("log"))
@saving
@admin_required(3)
async def send_log(message: Message):
try:
log_file = types.FSInputFile(Config.LOG_FILE)
await message.answer_document(log_file, caption="📑 Логи бота")
except FileNotFoundError:
await message.answer("Файл логов пока не создан.")
@dp.message(Command("status"))
@saving
@admin_required(3)
async def send_status(message: Message):
from utils.analytics import analyze_bot_logs
from utils.mac_metrics import get_macbook_battery_level, get_process_usage
try:
stats = analyze_bot_logs(Config.LOG_FILE)
batt = await get_macbook_battery_level()
usage = await get_process_usage()
status_text = (
"🤖 СТАТУС БОТА\n"
"══════════════\n"
f"✅ Uptime: {stats.get('uptime_percentage', 0)}%\n"
f"⏱️ Слежка расписания: {'ВКЛ' if state.watcher_work else 'ВЫКЛ'}\n"
f"🔋 Уровень заряда: {batt}%\n"
f"🖥️ Загрузка цп: {usage['cpu_percent']}\n"
f"🧠 Загрузка оперативки: {usage['rss_mb']:.2f} MB\n"
)
await message.answer(status_text)
except Exception as e:
await message.answer(f"❌ Ошибка при проверке статуса: {str(e)}")
@dp.message(Command("analytics"))
@saving
@admin_required(1)
async def stat(message: Message):
from utils.analytics import analyze_bot_logs
stats = analyze_bot_logs(Config.LOG_FILE)
await message.answer(
create_statistics_text(stats), reply_to_message_id=message.message_id
)
@dp.message(Command("del"))
@admin_required(1)
async def delete_all_messages(message: Message):
messages = load_messages()
if not messages:
sent = await message.answer(
"📭 Нет сохранённых сообщений для удаления.",
reply_to_message_id=message.message_id,
)
save_message(sent.chat.id, sent.message_id)
return
deleted = 0
for chat_id, msg_id in messages:
try:
await bot.delete_message(chat_id, msg_id)
deleted += 1
except Exception as e:
logger.warning(f"Не удалось удалить {msg_id} в чате {chat_id}: {e}")
clear_messages()
sent = await message.answer(
f"✅ Удалено {deleted} сообщений (включая /rasp).",
reply_to_message_id=message.message_id,
)
save_message(sent.chat.id, sent.message_id)
@dp.message(Command("power"))
@saving
@admin_required(2)
async def power_control(message: types.Message):
args = message.text.split()
if len(args) < 2:
status = "включена" if state.watcher_work else "выключена"
await message.answer(f"⏱️ Слежка расписания: {status}")
return
command = args[1].lower()
watcher_service = WatcherService(state, bot)
if command == "on" and not state.watcher_work:
await watcher_service.start()
await message.answer("✅ Слежка расписания включена")
elif command == "off" and state.watcher_work:
await watcher_service.stop()
await message.answer("❌ Слежка расписания выключена")
else:
await message.answer("❌ Неверная команда")
+101
View File
@@ -0,0 +1,101 @@
from aiogram import types, Dispatcher
from aiogram.filters import Command
from models.state import BotState
from services.schedule_service import ScheduleService
from utils.antispam import is_chat_spam, saving
from storage.message_storage import save_message
from storage.users_storage import set_group, get_group
from aiogram.utils.keyboard import InlineKeyboardBuilder
VALID_GROUPS = [
"603Т", "33мд", "32мд", "31тс", "30тс", "29то", "28то", "27д", "26д", "25тм", "24тм",
"23д", "22мд", "21мд", "20тс", "19тс", "18то", "17то", "16д", "15д", "14тм", "13тм",
"12д", "11мд", "10мд", "8тс", "7то", "7Ст", "6то", "6Сб", "", "5Cа", "4Сб", "3тм",
"3Са", "2тм", "2Cб", "1Са", "600Р", "601Р", "602д"
]
def register_handlers(dp: Dispatcher, state: BotState):
@dp.message(Command("rasp"))
@saving
async def send_schedule(message: types.Message):
"""Отправка расписания (текст)"""
if is_chat_spam(message.chat.id, state):
await message.answer("НЕ СПАМЬТЕ!!!")
return
args = message.text.split(maxsplit=2)
# Определяем группу
if len(args) > 1:
group = args[1].strip()
else:
group = get_group(message.from_user.id)
# Определяем смещение по дню
day_offset = int(args[2]) if len(args) > 2 and args[2].isdigit() else 0
schedule_service = ScheduleService()
text, url, day, month = await schedule_service.get_schedule(group, day_offset)
msg = await message.answer(text, parse_mode="Markdownv2")
save_message(msg.chat.id, msg.message_id)
@dp.message(Command("prasp"))
@saving
async def send_pschedule(message: types.Message):
"""Отправка расписания"""
if is_chat_spam(message.chat.id, state):
await message.answer("НЕ СПАМЬТЕ!!!")
return
args = message.text.split(maxsplit=2)
if len(args) > 1:
group = args[1].strip()
else:
group = get_group(message.from_user.id)
day_offset = int(args[2]) if len(args) > 2 and args[2].isdigit() else 0
schedule_service = ScheduleService()
clip_png, url, day, mouth = await schedule_service.get_pschedule(
group, day_offset
)
if clip_png:
save_message(message.chat.id, message.message_id)
msg = await message.answer_photo(
types.BufferedInputFile(clip_png, filename=f"{group}.png"),
caption=f"Расписание для {group} на {day}.{mouth:02d}",
)
save_message(message.chat.id, msg.message_id)
state.file_id_cache[group.lower()] = msg.photo[-1].file_id
else:
await message.answer(f"Не удалось найти расписание для {group}")
@dp.message(Command("set"))
@saving
async def set(message: types.Message):
# создаём клавиатуру
builder = InlineKeyboardBuilder()
for group in VALID_GROUPS:
builder.button(text=group, callback_data=f"set_group:{group}")
builder.adjust(5) # количество кнопок в строке
msg = await message.answer(
"Выбери группу из списка:",
reply_markup=builder.as_markup()
)
save_message(msg.chat.id, msg.message_id)
@dp.callback_query(lambda c: c.data.startswith("set_group:"))
async def process_group_choice(callback: types.CallbackQuery):
group = callback.data.split(":")[1]
set_group(callback.from_user.id, group)
# редактируем сообщение: убираем клавиатуру
await callback.message.edit_reply_markup(reply_markup=None)
msg = await callback.message.answer(f"✅ Группа установлена: {group}")
save_message(msg.chat.id, msg.message_id)
await callback.answer()
+31
View File
@@ -0,0 +1,31 @@
from asyncio import run
from logging import basicConfig, FileHandler, StreamHandler, INFO, getLogger
from bot.core import TelegramBot
from config import Config
# Настройка логирования
basicConfig(
level=INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[FileHandler(Config.LOG_FILE, encoding="utf-8"), StreamHandler()],
force=True,
)
logger = getLogger(__name__)
async def main():
"""Основная функция запуска"""
try:
bot = TelegramBot()
logger.info("Бот запускается...")
await bot.start()
except Exception as e:
logger.error(f"Ошибка при запуске бота: {e}")
finally:
logger.info("Бот остановлен")
if __name__ == "__main__":
run(main())
+8
View File
@@ -0,0 +1,8 @@
{
"ca7463d0df0fc143": "captainerisnebula-12b-chimera-v1.1-iq-imatrix",
"b6886fc896f68593": "liquid/lfm2.5-1.2b",
"43efb8b5d51c38d7": "tiefighter-holodeck-holomax-mythomax-f1-v1-compos-20b",
"4f9b128b9b567451": "text-embedding-nomic-embed-text-v1.5",
"1ce7277325bb3050": "google/gemma-3n-e4b",
"390b8888e1038e17": "google/gemma-3-12b"
}
+28
View File
@@ -0,0 +1,28 @@
from dataclasses import dataclass
from typing import Dict, Optional
from asyncio import Task
@dataclass
class BotState:
"""Состояние бота"""
last_chat_time: Dict[int, str] = None
last_pinned: Dict[str, int] = None
watcher_work: bool = False
watcher_task: Optional[Task] = None
file_id_cache: Dict[str, str] = None
last_day: Dict[str, int] = None
last_clip_hash: Dict[str, str] = None
def __post_init__(self):
if self.last_chat_time is None:
self.last_chat_time = {}
if self.last_pinned is None:
self.last_pinned = {}
if self.file_id_cache is None:
self.file_id_cache = {}
if self.last_day is None:
self.last_day = {}
if self.last_clip_hash is None:
self.last_clip_hash = {}
+84
View File
@@ -0,0 +1,84 @@
aenum==3.1.16
aiofiles==24.1.0
aiogram==3.22.0
aiohappyeyeballs==2.6.1
aiohttp==3.12.15
aiosignal==1.4.0
aiosqlite==0.21.0
annotated-types==0.7.0
anyio==4.11.0
attrs==25.3.0
beautifulsoup4==4.14.2
bs4==0.0.2
certifi==2025.8.3
cffi==2.0.0
charset-normalizer==3.4.3
click==8.3.0
cryptography==46.0.2
dataclasses-json==0.6.7
deepgram-sdk==3.11.0
deprecation==2.1.0
distro==1.9.0
dotenv==0.9.9
dropbox==12.0.2
filelock==3.20.0
frozenlist==1.7.0
fsspec==2025.9.0
greenlet==3.2.4
h11==0.16.0
hf-xet==1.1.10
httpcore==1.0.9
httpx==0.28.1
huggingface-hub==0.35.3
idna==3.10
Jinja2==3.1.6
jiter==0.11.1
joblib==1.5.2
lxml==6.0.2
magic-filter==1.0.12
MarkupSafe==3.0.3
marshmallow==3.26.1
mpmath==1.3.0
msal==1.34.0
multidict==6.6.4
mutagen==1.47.0
mypy_extensions==1.1.0
networkx==3.5
numpy==2.3.4
ollama==0.6.0
openai==2.5.0
packaging==25.0
pillow==12.0.0
playwright==1.55.0
ply==3.11
propcache==0.3.2
pycparser==2.23
pydantic==2.11.10
pydantic_core==2.33.2
pyee==13.0.0
PyJWT==2.10.1
python-dotenv==1.1.1
PyYAML==6.0.3
regex==2025.9.18
requests==2.32.5
ruff==0.14.6
sacremoses==0.1.1
safetensors==0.6.2
sentencepiece==0.2.1
setuptools==80.9.0
six==1.17.0
sniffio==1.3.1
soupsieve==2.8
stone==3.3.1
sympy==1.14.0
tokenizers==0.22.1
torch==2.9.0
tqdm==4.67.1
transformers==4.57.1
typing-inspect==0.9.0
typing-inspection==0.4.2
typing_extensions==4.15.0
urllib3==2.5.0
websockets==15.0.1
yarl==1.20.1
yt-dlp==2025.10.22
+177
View File
@@ -0,0 +1,177 @@
from datetime import datetime, timedelta
from typing import Optional, Tuple
from playwright.async_api import async_playwright
import logging
import aiohttp
from bs4 import BeautifulSoup
import ssl
import certifi
import re
logger = logging.getLogger(__name__)
BOUNDARY = r"[^0-9A-Za-zА-Яа-яЁё]"
class ScheduleService:
def __init__(self):
self.base_url = (
"https://college.by/accounts/raspis/{mouth:02d}/{day:02d}-PODNAM.htm"
)
def _make_url(self, day: int = 0) -> Tuple[str, int, int]:
"""Генерация URL для расписания"""
d = datetime.now()
if day == 0:
if d.hour >= 12:
d += timedelta(days=1)
if d.weekday() == 6:
d += timedelta(days=1)
return self.base_url.format(day=d.day, mouth=d.month), d.day, d.month
else:
return (
self.base_url.format(day=int(day), mouth=d.month),
int(day),
int(d.month),
)
import re
async def get_schedule(
self, group: str, day_offset: int = 0
) -> Tuple[str, str, int, int]:
"""Получение текста расписания (аналог Rust parse_schedule)"""
url, day, month = self._make_url(day_offset)
ssl_context = ssl.create_default_context(cafile=certifi.where())
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
connector = aiohttp.TCPConnector(ssl=ssl_context)
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}
async with aiohttp.ClientSession(
connector=connector, headers=headers
) as session:
async with session.get(url) as resp:
raw_bytes = await resp.read()
decoded = raw_bytes.decode("cp1251", errors="ignore")
document = BeautifulSoup(decoded, "html.parser")
elements = document.select("p.MsoPlainText b")
found_group = False
schedule_lines = []
# регулярка: ищем точное совпадение группы как отдельного слова
group_pattern = re.compile(rf"\b{re.escape(group)}\b", re.IGNORECASE)
for el in elements:
text = el.get_text(strip=True)
if not found_group:
if group_pattern.search(text):
found_group = True
schedule_lines.append(text)
else:
if "-----" in text or "+----" in text:
break
schedule_lines.append(text)
if not schedule_lines:
result = f"Расписание для группы {group} на {day} число не найдено"
else:
result = f"📅 Расписание для {day} числа:\n```\n"
for line in schedule_lines:
formatted = line.replace("¦", "").replace(" ", " ").strip()
if formatted:
result += f"{formatted}\n"
result += "```"
return result, url, day, month
@staticmethod
def exact_group_regex(group: str) -> re.Pattern:
# ищем как отдельный токен: граница слева/справа или начало/конец
pattern = rf"(^|{BOUNDARY}){re.escape(group)}({BOUNDARY}|$)"
return re.compile(pattern)
async def get_pschedule(
self, group: str, day_offset: int = 0
) -> Tuple[Optional[bytes], str, int, int]:
url, day, month = self._make_url(day_offset)
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context(viewport={"width": 400, "height": 3000})
page = await context.new_page()
try:
response = await page.goto(url, wait_until="networkidle", timeout=30000)
if not response or response.status != 200:
logger.warning(f"Ошибка загрузки страницы: {url}")
return None, url, day, month
# 1) сначала пытаемся по более точному селектору (как в HTML-парсере)
candidates = page.locator("p.MsoPlainText b")
count = await candidates.count()
regex = self.exact_group_regex(group)
target_handle = None
for i in range(count):
el = candidates.nth(i)
text = (await el.inner_text()).strip()
if regex.search(text):
# нашли b с нужной группой — возьмём родительский p для удобного скрина
parent_p = await el.locator("xpath=ancestor::p[1]").element_handle()
target_handle = parent_p or await el.element_handle()
break
# 2) если не нашли в p.MsoPlainText b, попробуем просто p b или p
if not target_handle:
candidates = page.locator("p b")
count = await candidates.count()
for i in range(count):
el = candidates.nth(i)
text = (await el.inner_text()).strip()
if regex.search(text):
parent_p = await el.locator("xpath=ancestor::p[1]").element_handle()
target_handle = parent_p or await el.element_handle()
break
if not target_handle:
# последний шанс: любые <p>
candidates = page.locator("p")
count = await candidates.count()
for i in range(count):
el = candidates.nth(i)
text = (await el.inner_text()).strip()
if regex.search(text):
target_handle = await el.element_handle()
break
if target_handle:
# скроллим и получаем box
await target_handle.scroll_into_view_if_needed()
box = await target_handle.bounding_box()
if box:
clip_rect = {
"x": float(max(box["x"], 0)),
"y": float(max(box["y"], 0)),
"width": float(box["width"] + 150),
"height": float(box["height"] + 100),
}
img = await page.screenshot(clip=clip_rect)
return img, url, day, month
except Exception as e:
logger.error(f"Ошибка при получении расписания: {e}")
finally:
await context.close()
await browser.close()
return None, url, day, month
+123
View File
@@ -0,0 +1,123 @@
import asyncio
from datetime import datetime, timedelta
from random import randint
from aiogram import Bot, types
from models.state import BotState
from config import Config
from services.schedule_service import ScheduleService
from logging import getLogger
logger = getLogger(__name__)
class WatcherService:
def __init__(self, state: BotState, bot: Bot):
self.state = state
self.bot = bot
self.schedule_service = ScheduleService()
async def start(self):
"""Запуск слежки"""
if self.state.watcher_work:
return
self.state.watcher_work = True
self.state.watcher_task = asyncio.create_task(self._watcher_loop())
logger.info("Watcher запущен")
async def stop(self):
"""Остановка слежки"""
if not self.state.watcher_work:
return
self.state.watcher_work = False
if self.state.watcher_task:
self.state.watcher_task.cancel()
try:
await self.state.watcher_task
except asyncio.CancelledError:
pass
logger.info("Watcher остановлен")
async def _watcher_loop(self):
"""Основной цикл слежки"""
while self.state.watcher_work:
try:
find = await self._check_all_groups()
if find:
# ничего не нашли → ждём
delay = randint(
Config.WATCHER_BASE_DELAY, Config.WATCHER_BASE_DELAY + 30
)
logger.info(f"Следующая проверка через {delay}")
await asyncio.sleep(delay)
else:
# нашли → останавливаемся
logger.info("Расписание найдено, останавливаем watcher")
self.state.watcher_work = False
break
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Ошибка в watcher_loop: {e}")
await asyncio.sleep(60)
@staticmethod
def _get_target_day() -> datetime:
"""Получение целевого дня"""
now = datetime.now()
target = now + timedelta(days=1)
if target.weekday() == 6:
target += timedelta(days=1)
return target
async def _check_all_groups(self) -> bool:
"""
Возвращает True, если НИ в одной группе не найдено расписание.
Возвращает False, если хотя бы в одной группе найдено расписание.
"""
day = self._get_target_day()
found_any = False
for group, chat_id in Config.GROUP_CHATS.items():
logger.info(
f"Проверяем расписание для {group} на {day.strftime('%d.%m.%Y')}"
)
found = await self._check_group_schedule(group, chat_id, day.day)
if found:
found_any = True
return not found_any # <-- вот так правильно
async def _check_group_schedule(self, group: str, chat_id: int, day: int) -> bool:
text, url, data_day, data_month = await self.schedule_service.get_schedule(
group, day
)
if text and "не найдено" not in text.lower():
msg = await self.bot.send_message(
chat_id,
f"Авто-расписание для {group} на {data_day:02d}.{data_month:02d}\n\n{text}",
parse_mode="Markdown",
)
await self.bot.pin_chat_message(
chat_id, msg.message_id, disable_notification=False
)
return True
else:
png, url, data_day, data_month = await self.schedule_service.get_pschedule(
group, day
)
if png:
await self.bot.send_photo(
chat_id,
types.BufferedInputFile(png, filename=f"{group}.png"),
caption=f"АВАРИЙНЫЙ РЕЖИМ\n\nАвто-расписание для {group} на {data_day:02d}.{data_month:02d}\n\nНайдено с ошибкой",
)
return True
return False
# clip_hash = hashlib.md5(clip_png).hexdigest()
# Логика проверки изменений и отправки сообщений
# ... (ваша существующая логика)
+36
View File
@@ -0,0 +1,36 @@
import sqlite3
DIR = "/Users/mac/myfirstprogramm/storage/message.db"
if __name__ == "__main__":
db = sqlite3.connect(DIR)
cursor = db.cursor()
# создаём таблицы (лучше добавить IF NOT EXISTS)
cursor.execute("""CREATE TABLE IF NOT EXISTS message (
chat_id INTEGER,
message_id INTEGER
)""")
cursor.execute("""CREATE TABLE IF NOT EXISTS users (
user_id INTEGER,
user_group TEXT
)""")
# добавим тестовые данные
cursor.execute("INSERT INTO message VALUES (?, ?)", (1, 100))
cursor.execute("INSERT INTO users VALUES (?, ?)", (42, 'admin'))
db.commit()
# читаем данные
cursor.execute("SELECT * FROM message")
print("Message:", cursor.fetchall())
cursor.execute("SELECT * FROM users")
print("Users:", cursor.fetchall())
db.close()
def get_db():
return sqlite3.connect(DIR)
+29
View File
@@ -0,0 +1,29 @@
from .DB import get_db
def save_message(chat_id: int, message_id: int):
db = get_db()
cur = db.cursor()
cur.execute("INSERT INTO message VALUES (?, ?)", (int(chat_id), int(message_id)))
db.commit()
cur.close()
db.close()
def load_messages():
db = get_db()
cur = db.cursor()
cur.execute("SELECT * FROM message")
rows = cur.fetchall()
cur.close()
db.close()
return rows
def clear_messages():
db = get_db()
cur = db.cursor()
cur.execute("DELETE FROM message")
db.commit()
cur.close()
db.close()
+45
View File
@@ -0,0 +1,45 @@
from .DB import get_db
def save_user(user_id: int, group: str = "30тс"):
db = get_db()
cur = db.cursor()
cur.execute("INSERT INTO users (user_id, user_group) VALUES (?, ?)", (user_id, group))
db.commit()
cur.close()
db.close()
def set_group(user_id: int, group: str = "30тс"):
db = get_db()
cur = db.cursor()
# проверяем, есть ли пользователь
cur.execute("SELECT 1 FROM users WHERE user_id = ?", (user_id,))
exists = cur.fetchone()
if exists:
# если есть — обновляем группу
cur.execute("UPDATE users SET user_group = ? WHERE user_id = ?", (group, user_id))
else:
# если нет — регистрируем нового пользователя
cur.execute("INSERT INTO users (user_id, user_group) VALUES (?, ?)", (user_id, group))
db.commit()
cur.close()
db.close()
def get_group(user_id: int, default: str = "30тс") -> str:
db = get_db()
cur = db.cursor()
cur.execute("SELECT user_group FROM users WHERE user_id = ?", (user_id,))
row = cur.fetchone()
if row:
group = row[0]
else:
# если пользователя нет — регистрируем с дефолтной группой
cur.execute("INSERT INTO users (user_id, user_group) VALUES (?, ?)", (user_id, default))
db.commit()
group = default
cur.close()
db.close()
return group
+289
View File
@@ -0,0 +1,289 @@
from collections import Counter
import re
from datetime import datetime
import statistics
import tempfile
import json
def analyze_bot_logs(log_file_path="bot.log"):
"""
Анализирует логи бота и создает детальную статистику
"""
try:
with open(log_file_path, "r", encoding="utf-8") as log:
lines = log.readlines()
except FileNotFoundError:
return {"error": "Лог файл не найден"}
except Exception as e:
return {"error": f"Ошибка чтения файла: {str(e)}"}
if not lines:
return {"error": "Лог файл пуст"}
# Основные счетчики
stats = {
"total_lines": len(lines),
"time_period": {},
"log_levels": Counter(),
"activities": Counter(),
"errors": Counter(),
"warnings": Counter(),
"user_commands": Counter(),
"groups": Counter(),
"restarts": 0,
"schedule_checks": 0,
"schedule_changes": 0,
"schedule_failures": 0,
"network_errors": 0,
"browser_errors": 0,
"telegram_errors": Counter(),
"performance": {
"avg_handling_time": 0,
"fastest_handling": float("inf"),
"slowest_handling": 0,
"handling_count": 0,
},
}
# Временные метрики
start_time = None
end_time = None
handling_times = []
# Регулярные выражения для парсинга
timestamp_pattern = r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"
log_level_pattern = r"\[(INFO|WARNING|ERROR)\]"
handling_time_pattern = r"Duration (\d+) ms"
command_pattern = r"Команда /rasp от ([\d-]+), группа=([^,]+), дата=(\d+)"
schedule_pattern = r"Проверяем расписание для ([^ ]+) на (\d{2}\.\d{2}\.\d{4})"
for line in lines:
# Извлекаем временную метку
timestamp_match = re.search(timestamp_pattern, line)
if timestamp_match:
timestamp = timestamp_match.group(1)
if not start_time:
start_time = timestamp
end_time = timestamp
# Уровень логирования
level_match = re.search(log_level_pattern, line)
if level_match:
level = level_match.group(1)
stats["log_levels"][level] += 1
# Время обработки сообщений
time_match = re.search(handling_time_pattern, line)
if time_match and "is handled" in line:
handling_time = int(time_match.group(1))
handling_times.append(handling_time)
stats["performance"]["handling_count"] += 1
stats["performance"]["slowest_handling"] = max(
stats["performance"]["slowest_handling"], handling_time
)
stats["performance"]["fastest_handling"] = min(
stats["performance"]["fastest_handling"], handling_time
)
# Команды пользователей
cmd_match = re.search(command_pattern, line)
if cmd_match:
user_id, group, date_offset = cmd_match.groups()
stats["user_commands"][group] += 1
stats["groups"][group] += 1
# Проверки расписания
if "Проверяем расписание" in line:
stats["schedule_checks"] += 1
sched_match = re.search(schedule_pattern, line)
if sched_match:
group, date = sched_match.groups()
stats["groups"][group] += 1
# Изменения расписания
if "Изменения найдены" in line:
stats["schedule_changes"] += 1
# Ошибки расписания
if "Не удалось получить расписание" in line:
stats["schedule_failures"] += 1
# Перезапуски бота
if "Бот запускается" in line:
stats["restarts"] += 1
# Сетевые ошибки
if "Failed to fetch updates" in line:
stats["network_errors"] += 1
# Ошибки браузера
if "TargetClosedError" in line or "BrowserContext.close" in line:
stats["browser_errors"] += 1
# Ошибки Telegram API
if "Telegram server says" in line:
error_msg = line.split("Telegram server says - ")[-1].split(":")[0]
stats["telegram_errors"][error_msg] += 1
# Сбор ошибок и предупреждений
if "[ERROR]" in line:
error_msg = line.split("[ERROR]")[-1].strip()
stats["errors"][error_msg[:100]] += 1
if "[WARNING]" in line:
warning_msg = line.split("[WARNING]")[-1].strip()
stats["warnings"][warning_msg[:100]] += 1
# Расчет средней скорости обработки
if handling_times:
stats["performance"]["avg_handling_time"] = sum(handling_times) / len(
handling_times
)
# Период работы
if start_time and end_time:
stats["time_period"] = {
"start": start_time,
"end": end_time,
"duration_hours": calculate_duration_hours(start_time, end_time),
}
# Дополнительные метрики
stats["success_rate"] = calculate_success_rate(stats)
stats["uptime_percentage"] = calculate_uptime_percentage(stats)
stats["schedule_success_rate"] = calculate_schedule_success_rate(stats)
return stats
def calculate_duration_hours(start_str: str, end_str: str) -> float:
"""Вычисляет продолжительность в часах"""
fmt = "%Y-%m-%d %H:%M:%S"
try:
start = datetime.strptime(start_str, fmt)
end = datetime.strptime(end_str, fmt)
return round((end - start).total_seconds() / 3600, 2)
except (ValueError, TypeError):
return 0.0
def calculate_success_rate(stats):
"""Рассчитывает процент успешных операций"""
total_operations = stats["performance"]["handling_count"] + sum(
stats["errors"].values()
)
if total_operations == 0:
return 0
success_rate = (stats["performance"]["handling_count"] / total_operations) * 100
return round(success_rate, 2)
def calculate_uptime_percentage(stats):
"""Рассчитывает процент времени работы"""
if stats["time_period"].get("duration_hours", 0) == 0:
return 0
# Предполагаем, что каждый перезапуск занимает ~10 секунд
restart_downtime = stats["restarts"] * 10 / 3600
total_hours = stats["time_period"]["duration_hours"]
uptime_hours = total_hours - restart_downtime
uptime_percentage = (uptime_hours / total_hours) * 100
return round(uptime_percentage, 2)
def calculate_schedule_success_rate(stats):
"""Рассчитывает процент успешных проверок расписания"""
total_checks = stats["schedule_checks"]
if total_checks == 0:
return 0
successful_checks = total_checks - stats["schedule_failures"]
success_rate = (successful_checks / total_checks) * 100
return round(success_rate, 2)
def create_statistics_text(stats):
"""Создает текстовый отчет статистики с расширенными метриками"""
if "error" in stats:
return f"❌ Ошибка анализа логов: {stats['error']}"
text = "📊 СТАТИСТИКА РАБОТЫ БОТА\n"
text += "=" * 40 + "\n\n"
# Основная информация
text += "📈 ОБЩАЯ ИНФОРМАЦИЯ:\n"
text += f"• Период: {stats['time_period'].get('start', 'N/A')} - {stats['time_period'].get('end', 'N/A')}\n"
text += f"• Длительность: {stats['time_period'].get('duration_hours', 0)} ч\n"
text += f"• Строк в логе: {stats['total_lines']:,}\n\n"
# Производительность
handling_times = stats.get(
"handling_times", []
) # сохрани список в analyze_bot_logs
median_time = statistics.median(handling_times) if handling_times else 0
text += "⚡ ПРОИЗВОДИТЕЛЬНОСТЬ:\n"
text += f"• Среднее время: {stats['performance']['avg_handling_time']:.0f} мс\n"
text += f"• Медиана времени: {median_time:.0f} мс\n"
text += f"• Сообщений обработано: {stats['performance']['handling_count']}\n"
text += f"• Успешных операций: {stats['success_rate']}%\n\n"
# Статус работы
duration = stats["time_period"].get("duration_hours", 0)
errors_total = sum(stats["errors"].values())
errors_per_hour = round(errors_total / duration, 2) if duration else 0
restarts = stats["restarts"]
mtbf = round(duration / restarts, 2) if restarts else duration
text += "🔄 СТАТУС РАБОТЫ:\n"
text += f"• Перезапусков: {restarts}\n"
text += f"• Uptime: {stats['uptime_percentage']}%\n"
text += f"• MTBF (среднее время между перезапусками): {mtbf} ч\n"
text += f"• Ошибок в час: {errors_per_hour}\n\n"
# Расписание
text += "📅 РАСПИСАНИЕ:\n"
text += f"• Проверок: {stats['schedule_checks']}\n"
text += f"• Изменений: {stats['schedule_changes']}\n"
text += f"• Ошибок: {stats['schedule_failures']}\n"
text += f"• Успешных проверок: {stats['schedule_success_rate']}%\n\n"
# Ошибки
text += "🚨 ОШИБКИ:\n"
text += f"• Всего ошибок: {errors_total}\n"
text += f"• Предупреждений: {sum(stats['warnings'].values())}\n"
text += f"• Сетевых: {stats['network_errors']}\n"
text += f"• Браузера: {stats['browser_errors']}\n"
# Топ-3 ошибок
if stats["errors"]:
text += "• Топ ошибок:\n"
for err, count in stats["errors"].most_common(3):
text += f" - {err} ({count})\n"
text += "\n"
# Пользователи
text += "👥 АКТИВНОСТЬ:\n"
text += f"• Команд: {sum(stats['user_commands'].values())}\n"
text += f"• Групп: {len(stats['groups'])}\n"
if stats["groups"]:
text += "• Топ групп:\n"
for group, count in stats["groups"].most_common(3):
text += f" - {group}: {count}\n"
return text
def create_statistics_file(stats):
"""Создает временный файл с полной статистикой"""
if "error" in stats:
return None
# Создаем временный файл
with tempfile.NamedTemporaryFile(
mode="w", encoding="utf-8", suffix=".json", delete=False
) as f:
json.dump(stats, f, ensure_ascii=False, indent=2, default=str)
temp_filename = f.name
return temp_filename
+53
View File
@@ -0,0 +1,53 @@
from datetime import datetime
from functools import wraps
from aiogram import types
from models.state import BotState
from config import Config
from storage.message_storage import save_message
def is_chat_spam(chat_id: int, state: BotState) -> bool:
"""Проверка на спам"""
if chat_id in Config.ADMINS:
return False
now = datetime.now()
if chat_id in state.last_chat_time:
delta = (now - state.last_chat_time[chat_id]).total_seconds()
if delta < Config.ANTISPAM_DELAY:
return True
state.last_chat_time[chat_id] = now
return False
def admin_required(need_level: int):
"""Декоратор для проверки прав администратора (0 = высший уровень)"""
def decorator(func):
@wraps(func)
async def wrapper(message: types.Message, *args, **kwargs):
user_id = message.from_user.id
user_level = Config.ADMINS.get(user_id, 9999) # 9999 = "нет прав"
# чем меньше число, тем выше права
if user_level > need_level:
await message.answer("⛔ У вас нет доступа к этой команде.")
return
return await func(message, *args, **kwargs)
return wrapper
return decorator
def saving(func):
"""Декоратор для сохранения входящего сообщения"""
@wraps(func)
async def wrapper(message: types.Message, *args, **kwargs):
save_message(message.chat.id, message.message_id)
return await func(message, *args, **kwargs)
return wrapper
+72
View File
@@ -0,0 +1,72 @@
import asyncio
import os
async def get_macbook_battery_level():
process = await asyncio.create_subprocess_exec(
"pmset",
"-g",
"batt",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
raise RuntimeError(f"Ошибка при выполнении pmset: {stderr.decode().strip()}")
output = stdout.decode().strip()
for part in output.splitlines():
if "%" in part:
percent_str = part.split("\t")[-1].split(";")[0].strip()
return int(percent_str.replace("%", ""))
raise ValueError("Не удалось определить уровень заряда")
async def get_process_usage(pid=None):
"""
Возвращает CPU, MEM% и RSS в мегабайтах для указанного процесса.
По умолчанию берёт текущий процесс (os.getpid()).
"""
if pid is None:
pid = os.getpid()
process = await asyncio.create_subprocess_exec(
"ps",
"-p",
str(pid),
"-o",
"%cpu,%mem,rss,comm",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
raise RuntimeError(f"Ошибка ps: {stderr.decode().strip()}")
lines = stdout.decode().strip().splitlines()
if len(lines) < 2:
return None
cpu, mem_percent, rss_kb, comm = lines[1].split(None, 3)
return {
"pid": pid,
"command": comm,
"cpu_percent": float(cpu),
"mem_percent": float(mem_percent),
"rss_mb": int(rss_kb) / 1024, # переводим КБ → МБ
}
async def main():
battery = await get_macbook_battery_level()
usage = await get_process_usage()
print(f"🔋 Батарея: {battery}%")
print(
f"🖥 CPU: {usage['cpu_percent']}% | MEM: {usage['mem_percent']}% | RSS: {usage['rss_mb']:.2f} MB"
)
if __name__ == "__main__":
asyncio.run(main())