From 58b47bec5eb1fcd115dfc86f9a81b3947f839b46 Mon Sep 17 00:00:00 2001 From: Niken Date: Sat, 4 Oct 2025 16:59:38 +0300 Subject: [PATCH] It's my tg bot for schedule. version 0.1 --- .gitignore | 2 + Read.me | 30 ++++ bot/__init__.py | 0 bot/core.py | 25 ++++ bot/filters.py | 0 config.py | 37 +++++ handlers/admin.py | 63 +++++++++ handlers/schedule.py | 35 +++++ main.py | 32 +++++ models/state.py | 26 ++++ services/schedule_service.py | 62 ++++++++ services/watcher_service.py | 97 +++++++++++++ storage/message_storage.py | 24 ++++ utils/analytics.py | 264 +++++++++++++++++++++++++++++++++++ utils/antispam.py | 43 ++++++ utils/mac_metrics.py | 27 ++++ 16 files changed, 767 insertions(+) create mode 100644 .gitignore create mode 100644 Read.me create mode 100644 bot/__init__.py create mode 100644 bot/core.py create mode 100644 bot/filters.py create mode 100644 config.py create mode 100644 handlers/admin.py create mode 100644 handlers/schedule.py create mode 100644 main.py create mode 100644 models/state.py create mode 100644 services/schedule_service.py create mode 100644 services/watcher_service.py create mode 100644 storage/message_storage.py create mode 100644 utils/analytics.py create mode 100644 utils/antispam.py create mode 100644 utils/mac_metrics.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4af2ef5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/.env +/code.txt diff --git a/Read.me b/Read.me new file mode 100644 index 0000000..7fe5b05 --- /dev/null +++ b/Read.me @@ -0,0 +1,30 @@ +telegram_bot/ +├── main.py # Точка входа +├── config.py # Конфигурация +├── bot/ +│ ├── __init__.py +│ ├── core.py # Основной класс бота +│ └── filters.py # Кастомные фильтры +├── handlers/ +│ ├── __init__.py +│ ├── admin.py # Админ-команды +│ ├── schedule.py # Команды расписания +│ ├── media.py # Медиа-команды +│ └── common.py # Общие команды +├── services/ +│ ├── __init__.py +│ ├── schedule_service.py # Логика расписания +│ ├── watcher_service.py # Слежка за изменениями +│ ├── media_service.py # Работа с медиа +│ └── gpt_service.py # GPT-функции +├── models/ +│ ├── __init__.py +│ └── state.py # Модели данных +├── utils/ +│ ├── __init__.py +│ ├── antispam.py # Антиспам система +│ ├── file_utils.py # Работа с файлами +│ └── validators.py # Валидаторы +└── storage/ + ├── __init__.py + └── message_storage.py # Работа с сообщениями diff --git a/bot/__init__.py b/bot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bot/core.py b/bot/core.py new file mode 100644 index 0000000..93d7e4a --- /dev/null +++ b/bot/core.py @@ -0,0 +1,25 @@ +from aiogram import Bot, Dispatcher +from config import Config +from models.state import BotState + + +class TelegramBot: + def __init__(self): + self.bot = Bot(token=Config.API_TOKEN) + self.dp = Dispatcher() + self.state = BotState() + + def setup_handlers(self): + """Регистрация всех обработчиков""" + from handlers import admin, schedule#, media, common + + # Регистрируем обработчики из разных модулей + admin.register_handlers(self.dp, self.state, self.bot) + schedule.register_handlers(self.dp, self.state, self.bot) + #media.register_handlers(self.dp, self.state, self.bot) + #common.register_handlers(self.dp, self.state, self.bot) + + async def start(self): + """Запуск бота""" + self.setup_handlers() + await self.dp.start_polling(self.bot) \ No newline at end of file diff --git a/bot/filters.py b/bot/filters.py new file mode 100644 index 0000000..e69de29 diff --git a/config.py b/config.py new file mode 100644 index 0000000..af964c3 --- /dev/null +++ b/config.py @@ -0,0 +1,37 @@ +import os +from dotenv import load_dotenv +from typing import Dict + +class Config: + # Загружаем .env + load_dotenv() + + # API + API_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") + if not API_TOKEN: + raise ValueError("❌ TELEGRAM_BOT_TOKEN не найден в переменных окружения!") + + # Admins (user_id: уровень) + ADMINS: Dict[int, int] = { + 850906163: 0 + } + + # Chats + CHAT_IDS = [-1003038389942] + GROUP_CHATS: Dict[str, int] = { + "30тс": -1003038389942, + "5Cа": 7571257031, + } + + # Settings + ANTISPAM_DELAY = 600 + WATCHER_BASE_DELAY = 600 + + # Пути + LOG_FILE = "bot.log" + + +if __name__ == "__main__": + a = Config() + print(a.API_TOKEN) + print(a.ADMINS) diff --git a/handlers/admin.py b/handlers/admin.py new file mode 100644 index 0000000..5f9d133 --- /dev/null +++ b/handlers/admin.py @@ -0,0 +1,63 @@ +from aiogram import types, Dispatcher, Bot +from aiogram.filters import Command +from models.state import BotState +from config import Config +from utils.antispam import admin_required +from services.watcher_service import WatcherService + + +def register_handlers(dp: Dispatcher, state: BotState, bot: Bot): + @dp.message(Command("log")) + @admin_required(3) + async def send_log(message: types.Message): + """Отправка логов""" + try: + log_file = types.FSInputFile(Config.LOG_FILE) + await message.answer_document(log_file, caption="📑 Логи бота") + except FileNotFoundError: + await message.answer("Файл логов пока не создан.") + + @dp.message(Command("status")) + @admin_required(3) + async def send_status(message: types.Message): + """Статус бота""" + from utils.analytics import analyze_bot_logs + from utils.mac_metrics import get_macbook_battery_level + + try: + stats = analyze_bot_logs(Config.LOG_FILE) + batt = await get_macbook_battery_level() + + status_text = ( + "🤖 СТАТУС БОТА\n" + "══════════════\n" + f"✅ Uptime: {stats.get('uptime_percentage', 0)}%\n" + f"⏱️ Слежка расписания: {'ВКЛ' if state.watcher_work else 'ВЫКЛ'}\n" + f"🔋 Уровень заряда: {batt}%" + ) + + await message.answer(status_text) + except Exception as e: + await message.answer(f"❌ Ошибка при проверке статуса: {str(e)}") + + @dp.message(Command("power")) + @admin_required(2) + async def power_control(message: types.Message): + """Управление слежкой""" + args = message.text.split() + if len(args) < 2: + status = "включена" if state.watcher_work else "выключена" + await message.answer(f"⏱️ Слежка расписания: {status}") + return + + command = args[1].lower() + watcher_service = WatcherService(state, bot) + + if command == "on" and not state.watcher_work: + await watcher_service.start() + await message.answer("✅ Слежка расписания включена") + elif command == "off" and state.watcher_work: + await watcher_service.stop() + await message.answer("❌ Слежка расписания выключена") + else: + await message.answer("❌ Неверная команда") \ No newline at end of file diff --git a/handlers/schedule.py b/handlers/schedule.py new file mode 100644 index 0000000..0713a08 --- /dev/null +++ b/handlers/schedule.py @@ -0,0 +1,35 @@ +from aiogram import types, Dispatcher, Bot +from aiogram.filters import Command +from models.state import BotState +from services.schedule_service import ScheduleService +from utils.antispam import is_chat_spam +from storage.message_storage import save_message + + +def register_handlers(dp: Dispatcher, state: BotState, bot: Bot): + @dp.message(Command("rasp")) + async def send_schedule(message: types.Message): + """Отправка расписания""" + if is_chat_spam(message.chat.id, state): + await message.answer("НЕ СПАМЬТЕ!!!") + return + + args = message.text.split(maxsplit=2) + group = args[1].strip() if len(args) > 1 else "30тс" + day_offset = int(args[2]) if len(args) > 2 and args[2].isdigit() else 0 + + schedule_service = ScheduleService() + clip_png, url, day, mouth = await schedule_service.get_schedule(group, day_offset) + + if clip_png: + save_message(message.chat.id, message.message_id) + + msg = await message.answer_photo( + types.BufferedInputFile(clip_png, filename=f"{group}.png"), + caption=f"Расписание для {group} на {day}.{mouth:02d}" + ) + save_message(message.chat.id, msg.message_id) + + state.file_id_cache[group.lower()] = msg.photo[-1].file_id + else: + await message.answer(f"Не удалось найти расписание для {group}") \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..33d32c4 --- /dev/null +++ b/main.py @@ -0,0 +1,32 @@ +from asyncio import run +from logging import basicConfig, FileHandler, StreamHandler, INFO, getLogger +from bot.core import TelegramBot +from config import Config + +# Настройка логирования +basicConfig( + level=INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + handlers=[ + FileHandler(Config.LOG_FILE, encoding="utf-8"), + StreamHandler() + ], + force=True +) + +logger = getLogger(__name__) + +async def main(): + """Основная функция запуска""" + try: + bot = TelegramBot() + logger.info("Бот запускается...") + await bot.start() + except Exception as e: + logger.error(f"Ошибка при запуске бота: {e}") + finally: + logger.info("Бот остановлен") + +if __name__ == "__main__": + run(main()) \ No newline at end of file diff --git a/models/state.py b/models/state.py new file mode 100644 index 0000000..5ef8a30 --- /dev/null +++ b/models/state.py @@ -0,0 +1,26 @@ +from dataclasses import dataclass +from typing import Dict, Optional +from asyncio import Task + +@dataclass +class BotState: + """Состояние бота""" + last_chat_time: Dict[int, str] = None + last_pinned: Dict[str, int] = None + watcher_work: bool = False + watcher_task: Optional[Task] = None + file_id_cache: Dict[str, str] = None + last_day: Dict[str, int] = None + last_clip_hash: Dict[str, str] = None + + def __post_init__(self): + if self.last_chat_time is None: + self.last_chat_time = {} + if self.last_pinned is None: + self.last_pinned = {} + if self.file_id_cache is None: + self.file_id_cache = {} + if self.last_day is None: + self.last_day = {} + if self.last_clip_hash is None: + self.last_clip_hash = {} \ No newline at end of file diff --git a/services/schedule_service.py b/services/schedule_service.py new file mode 100644 index 0000000..6fd0d27 --- /dev/null +++ b/services/schedule_service.py @@ -0,0 +1,62 @@ +import hashlib +from datetime import datetime, timedelta +from typing import Optional, Tuple +from playwright.async_api import async_playwright, ViewportSize, FloatRect +import logging + +logger = logging.getLogger(__name__) + + +class ScheduleService: + def __init__(self): + self.base_url = "https://college.by/accounts/raspis/{mouth:02d}/{day:02d}-PODNAM.htm" + + def _make_url(self, day: int = 0) -> Tuple[str, int, int]: + """Генерация URL для расписания""" + d = datetime.now() + if day == 0: + if d.hour >= 12: + d += timedelta(days=1) + if d.weekday() == 6: + d += timedelta(days=1) + return self.base_url.format(day=d.day, mouth=d.month), d.day, d.month + else: + return self.base_url.format(day=int(day), mouth=d.month), int(day), int(d.month) + + async def get_schedule(self, group: str, day_offset: int = 0) -> Tuple[Optional[bytes], str, int, int]: + """Получение скриншота расписания""" + url, day, month = self._make_url(day_offset) + + async with async_playwright() as p: + browser = await p.chromium.launch(headless=True) + context = await browser.new_context(viewport=ViewportSize(width=400, height=3000)) + page = await context.new_page() + + try: + response = await page.goto(url, wait_until="networkidle", timeout=30000) + + if not response or response.status != 200: + logger.warning(f"Ошибка загрузки страницы: {url}") + return None, url, day, month + + locator = page.locator("p", has_text=group).first + if await locator.count() > 0: + await locator.scroll_into_view_if_needed() + box = await locator.bounding_box() + + if box: + clip_rect = FloatRect( + x=float(max(box["x"] - 0, 0)), + y=float(max(box["y"] - 0, 0)), + width=float(box["width"] + 150), + height=float(box["height"] + 100) + ) + return await page.screenshot(clip=clip_rect), url, day, month + + except Exception as e: + logger.error(f"Ошибка при получении расписания: {e}") + finally: + await context.close() + await browser.close() + + return None, url, day, month \ No newline at end of file diff --git a/services/watcher_service.py b/services/watcher_service.py new file mode 100644 index 0000000..b58df68 --- /dev/null +++ b/services/watcher_service.py @@ -0,0 +1,97 @@ +import asyncio +import hashlib +from datetime import datetime, timedelta +from random import randint +from aiogram import Bot +from aiogram.types import BufferedInputFile +from models.state import BotState +from config import Config +from services.schedule_service import ScheduleService +import logging + + +logger = logging.getLogger(__name__) + + +class WatcherService: + def __init__(self, state: BotState, bot: Bot): + self.state = state + self.bot = bot + self.schedule_service = ScheduleService() + + async def start(self): + """Запуск слежки""" + if self.state.watcher_work: + return + + self.state.watcher_work = True + self.state.watcher_task = asyncio.create_task(self._watcher_loop()) + logger.info("Watcher запущен") + + async def stop(self): + """Остановка слежки""" + if not self.state.watcher_work: + return + + self.state.watcher_work = False + if self.state.watcher_task: + self.state.watcher_task.cancel() + try: + await self.state.watcher_task + except asyncio.CancelledError: + pass + logger.info("Watcher остановлен") + + async def _watcher_loop(self): + """Основной цикл слежки""" + while self.state.watcher_work: + try: + await self._check_all_groups() + delay = randint(600, 700) + await asyncio.sleep(delay) + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Ошибка в watcher_loop: {e}") + await asyncio.sleep(60) + + def _get_target_day(self) -> datetime: + """Получение целевого дня""" + now = datetime.now() + target = now + timedelta(days=1) + if target.weekday() == 6: + target += timedelta(days=1) + return target + + + async def _check_all_groups(self): + """Проверка всех групп на изменения""" + day = self._get_target_day() + + for group, chat_id in Config.GROUP_CHATS.items(): + await self._check_group_schedule(group, chat_id, day) + + async def _check_group_schedule(self, group: str, chat_id: int, day: int): + """Проверка расписания для конкретной группы""" + + logger.info(f"Проверяем расписание для {group} на {day.strftime('%d.%m.%Y')}") + + clip_png, url, data_day, data_mouth = await self.schedule_service.get_schedule(group, day.day) + if clip_png: + msg = await self.bot.send_photo( + chat_id, + BufferedInputFile(clip_png, filename=f"{group}.png"), + caption=f"Авто-расписание для {group} на {data_day:02d}.{data_mouth:02d}" + ) + await self.bot.pin_chat_message(chat_id, msg.message_id, disable_notification=True) + + else: + logger.warning(f"Не удалось получить расписание для {group}, {data_day}, {data_mouth}, {url}") + return + + + + #clip_hash = hashlib.md5(clip_png).hexdigest() + + # Логика проверки изменений и отправки сообщений + # ... (ваша существующая логика) \ No newline at end of file diff --git a/storage/message_storage.py b/storage/message_storage.py new file mode 100644 index 0000000..8c557d4 --- /dev/null +++ b/storage/message_storage.py @@ -0,0 +1,24 @@ +import os + +MESSAGES_FILE = "messages.txt" + +# --- функция для записи message_id --- +def save_message(chat_id: int, message_id: int): + with open(MESSAGES_FILE, "a", encoding="utf-8") as f: + f.write(f"{chat_id},{message_id}\n") + +# --- функция для загрузки всех сообщений --- +def load_messages(): + if not os.path.exists(MESSAGES_FILE): + return [] + with open(MESSAGES_FILE, "r", encoding="utf-8") as f: + lines = f.readlines() + return [tuple(map(int, line.strip().split(","))) for line in lines if line.strip()] + +# --- функция для очистки файла --- +def clear_messages(): + open(MESSAGES_FILE, "w").close() + + + + diff --git a/utils/analytics.py b/utils/analytics.py new file mode 100644 index 0000000..ca243cb --- /dev/null +++ b/utils/analytics.py @@ -0,0 +1,264 @@ +from collections import Counter +import re +from datetime import datetime +import tempfile +import json + + + +def analyze_bot_logs(log_file_path="bot.log"): + """ + Анализирует логи бота и создает детальную статистику + """ + try: + with open(log_file_path, 'r', encoding='utf-8') as log: + lines = log.readlines() + except FileNotFoundError: + return {"error": "Лог файл не найден"} + except Exception as e: + return {"error": f"Ошибка чтения файла: {str(e)}"} + + if not lines: + return {"error": "Лог файл пуст"} + + # Основные счетчики + stats = { + 'total_lines': len(lines), + 'time_period': {}, + 'log_levels': Counter(), + 'activities': Counter(), + 'errors': Counter(), + 'warnings': Counter(), + 'user_commands': Counter(), + 'groups': Counter(), + 'restarts': 0, + 'schedule_checks': 0, + 'schedule_changes': 0, + 'schedule_failures': 0, + 'network_errors': 0, + 'browser_errors': 0, + 'telegram_errors': Counter(), + 'performance': { + 'avg_handling_time': 0, + 'fastest_handling': float('inf'), + 'slowest_handling': 0, + 'handling_count': 0 + } + } + + # Временные метрики + start_time = None + end_time = None + handling_times = [] + + # Регулярные выражения для парсинга + timestamp_pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})' + log_level_pattern = r'\[(INFO|WARNING|ERROR)\]' + handling_time_pattern = r'Duration (\d+) ms' + command_pattern = r'Команда /rasp от ([\d-]+), группа=([^,]+), дата=(\d+)' + schedule_pattern = r'Проверяем расписание для ([^ ]+) на (\d{2}\.\d{2}\.\d{4})' + + for line in lines: + # Извлекаем временную метку + timestamp_match = re.search(timestamp_pattern, line) + if timestamp_match: + timestamp = timestamp_match.group(1) + if not start_time: + start_time = timestamp + end_time = timestamp + + # Уровень логирования + level_match = re.search(log_level_pattern, line) + if level_match: + level = level_match.group(1) + stats['log_levels'][level] += 1 + + # Время обработки сообщений + time_match = re.search(handling_time_pattern, line) + if time_match and 'is handled' in line: + handling_time = int(time_match.group(1)) + handling_times.append(handling_time) + stats['performance']['handling_count'] += 1 + stats['performance']['slowest_handling'] = max( + stats['performance']['slowest_handling'], handling_time + ) + stats['performance']['fastest_handling'] = min( + stats['performance']['fastest_handling'], handling_time + ) + + # Команды пользователей + cmd_match = re.search(command_pattern, line) + if cmd_match: + user_id, group, date_offset = cmd_match.groups() + stats['user_commands'][group] += 1 + stats['groups'][group] += 1 + + # Проверки расписания + if 'Проверяем расписание' in line: + stats['schedule_checks'] += 1 + sched_match = re.search(schedule_pattern, line) + if sched_match: + group, date = sched_match.groups() + stats['groups'][group] += 1 + + # Изменения расписания + if 'Изменения найдены' in line: + stats['schedule_changes'] += 1 + + # Ошибки расписания + if 'Не удалось получить расписание' in line: + stats['schedule_failures'] += 1 + + # Перезапуски бота + if 'Бот запускается' in line: + stats['restarts'] += 1 + + # Сетевые ошибки + if 'Failed to fetch updates' in line: + stats['network_errors'] += 1 + + # Ошибки браузера + if 'TargetClosedError' in line or 'BrowserContext.close' in line: + stats['browser_errors'] += 1 + + # Ошибки Telegram API + if 'Telegram server says' in line: + error_msg = line.split('Telegram server says - ')[-1].split(':')[0] + stats['telegram_errors'][error_msg] += 1 + + # Сбор ошибок и предупреждений + if '[ERROR]' in line: + error_msg = line.split('[ERROR]')[-1].strip() + stats['errors'][error_msg[:100]] += 1 + + if '[WARNING]' in line: + warning_msg = line.split('[WARNING]')[-1].strip() + stats['warnings'][warning_msg[:100]] += 1 + + # Расчет средней скорости обработки + if handling_times: + stats['performance']['avg_handling_time'] = sum(handling_times) / len(handling_times) + + # Период работы + if start_time and end_time: + stats['time_period'] = { + 'start': start_time, + 'end': end_time, + 'duration_hours': calculate_duration_hours(start_time, end_time) + } + + # Дополнительные метрики + stats['success_rate'] = calculate_success_rate(stats) + stats['uptime_percentage'] = calculate_uptime_percentage(stats) + stats['schedule_success_rate'] = calculate_schedule_success_rate(stats) + + return stats + + +def calculate_duration_hours(start_str, end_str): + """Вычисляет продолжительность в часах""" + try: + fmt = '%Y-%m-%d %H:%M:%S' + start = datetime.strptime(start_str, fmt) + end = datetime.strptime(end_str, fmt) + return round((end - start).total_seconds() / 3600, 2) + except: + return 0 + + +def calculate_success_rate(stats): + """Рассчитывает процент успешных операций""" + total_operations = stats['performance']['handling_count'] + sum(stats['errors'].values()) + if total_operations == 0: + return 0 + success_rate = (stats['performance']['handling_count'] / total_operations) * 100 + return round(success_rate, 2) + + +def calculate_uptime_percentage(stats): + """Рассчитывает процент времени работы""" + if stats['time_period'].get('duration_hours', 0) == 0: + return 0 + # Предполагаем, что каждый перезапуск занимает ~10 секунд + restart_downtime = stats['restarts'] * 10 / 3600 + total_hours = stats['time_period']['duration_hours'] + uptime_hours = total_hours - restart_downtime + uptime_percentage = (uptime_hours / total_hours) * 100 + return round(uptime_percentage, 2) + + +def calculate_schedule_success_rate(stats): + """Рассчитывает процент успешных проверок расписания""" + total_checks = stats['schedule_checks'] + if total_checks == 0: + return 0 + successful_checks = total_checks - stats['schedule_failures'] + success_rate = (successful_checks / total_checks) * 100 + return round(success_rate, 2) + + +def create_statistics_text(stats): + """Создает текстовый отчет статистики""" + if 'error' in stats: + return f"❌ Ошибка анализа логов: {stats['error']}" + + text = "📊 СТАТИСТИКА РАБОТЫ БОТА\n" + text += "=" * 40 + "\n\n" + + # Основная информация + text += "📈 ОБЩАЯ ИНФОРМАЦИЯ:\n" + text += f"• Период: {stats['time_period'].get('start', 'N/A')} - {stats['time_period'].get('end', 'N/A')}\n" + text += f"• Длительность: {stats['time_period'].get('duration_hours', 0)} ч\n" + text += f"• Строк в логе: {stats['total_lines']:,}\n\n" + + # Производительность + text += "⚡ ПРОИЗВОДИТЕЛЬНОСТЬ:\n" + text += f"• Среднее время: {stats['performance']['avg_handling_time']:.0f} мс\n" + text += f"• Сообщений обработано: {stats['performance']['handling_count']}\n" + text += f"• Успешных операций: {stats['success_rate']}%\n\n" + + # Статус работы + text += "🔄 СТАТУС РАБОТЫ:\n" + text += f"• Перезапусков: {stats['restarts']}\n" + text += f"• Uptime: {stats['uptime_percentage']}%\n\n" + + # Расписание + text += "📅 РАСПИСАНИЕ:\n" + text += f"• Проверок: {stats['schedule_checks']}\n" + text += f"• Изменений: {stats['schedule_changes']}\n" + text += f"• Ошибок: {stats['schedule_failures']}\n" + text += f"• Успешных проверок: {stats['schedule_success_rate']}%\n\n" + + # Ошибки + text += "🚨 ОШИБКИ:\n" + text += f"• Всего ошибок: {sum(stats['errors'].values())}\n" + text += f"• Предупреждений: {sum(stats['warnings'].values())}\n" + text += f"• Сетевых: {stats['network_errors']}\n" + text += f"• Браузера: {stats['browser_errors']}\n\n" + + # Пользователи + text += "👥 АКТИВНОСТЬ:\n" + text += f"• Команд: {sum(stats['user_commands'].values())}\n" + text += f"• Групп: {len(stats['groups'])}\n" + + # Топ групп + if stats['groups']: + text += "• Топ групп:\n" + for group, count in stats['groups'].most_common(3): + text += f" - {group}: {count}\n" + + return text + + +def create_statistics_file(stats): + """Создает временный файл с полной статистикой""" + if 'error' in stats: + return None + + # Создаем временный файл + with tempfile.NamedTemporaryFile(mode='w', encoding='utf-8', + suffix='.json', delete=False) as f: + json.dump(stats, f, ensure_ascii=False, indent=2, default=str) + temp_filename = f.name + + return temp_filename \ No newline at end of file diff --git a/utils/antispam.py b/utils/antispam.py new file mode 100644 index 0000000..305f78e --- /dev/null +++ b/utils/antispam.py @@ -0,0 +1,43 @@ +from datetime import datetime +from functools import wraps +from aiogram import types +from models.state import BotState +from config import Config + + +def is_chat_spam(chat_id: int, state: BotState) -> bool: + """Проверка на спам""" + if chat_id in Config.ADMINS: + return False + + now = datetime.now() + if chat_id in state.last_chat_time: + delta = (now - state.last_chat_time[chat_id]).total_seconds() + if delta < Config.ANTISPAM_DELAY: + return True + + state.last_chat_time[chat_id] = now + return False + + +from functools import wraps +from aiogram import types + +def admin_required(need_level: int): + """Декоратор для проверки прав администратора (0 = высший уровень)""" + + def decorator(func): + @wraps(func) + async def wrapper(message: types.Message, *args, **kwargs): + user_id = message.from_user.id + user_level = Config.ADMINS.get(user_id, 9999) # 9999 = "нет прав" + + # чем меньше число, тем выше права + if user_level > need_level: + await message.answer("⛔ У вас нет доступа к этой команде.") + return + + return await func(message, *args, **kwargs) + + return wrapper + return decorator diff --git a/utils/mac_metrics.py b/utils/mac_metrics.py new file mode 100644 index 0000000..2b82130 --- /dev/null +++ b/utils/mac_metrics.py @@ -0,0 +1,27 @@ +import asyncio + +async def get_macbook_battery_level(): + """ + Асинхронная функция для получения уровня заряда батареи на macOS. + Использует утилиту pmset. + """ + # Запускаем команду pmset -g batt + process = await asyncio.create_subprocess_exec( + "pmset", "-g", "batt", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + stdout, stderr = await process.communicate() + + if process.returncode != 0: + raise RuntimeError(f"Ошибка при выполнении pmset: {stderr.decode().strip()}") + + output = stdout.decode().strip() + # Пример строки: " - InternalBattery-0 (id=1234567) 85%; charging; ..." + for part in output.splitlines(): + if "%" in part: + percent_str = part.split("\t")[-1].split(";")[0].strip() + return int(percent_str.replace("%", "")) + + raise ValueError("Не удалось определить уровень заряда") \ No newline at end of file