It's my tg bot for schedule. version 0.1

This commit is contained in:
Niken
2025-10-04 16:59:38 +03:00
commit 58b47bec5e
16 changed files with 767 additions and 0 deletions
+2
View File
@@ -0,0 +1,2 @@
/.env
/code.txt
+30
View File
@@ -0,0 +1,30 @@
telegram_bot/
├── main.py # Точка входа
├── config.py # Конфигурация
├── bot/
│ ├── __init__.py
│ ├── core.py # Основной класс бота
│ └── filters.py # Кастомные фильтры
├── handlers/
│ ├── __init__.py
│ ├── admin.py # Админ-команды
│ ├── schedule.py # Команды расписания
│ ├── media.py # Медиа-команды
│ └── common.py # Общие команды
├── services/
│ ├── __init__.py
│ ├── schedule_service.py # Логика расписания
│ ├── watcher_service.py # Слежка за изменениями
│ ├── media_service.py # Работа с медиа
│ └── gpt_service.py # GPT-функции
├── models/
│ ├── __init__.py
│ └── state.py # Модели данных
├── utils/
│ ├── __init__.py
│ ├── antispam.py # Антиспам система
│ ├── file_utils.py # Работа с файлами
│ └── validators.py # Валидаторы
└── storage/
├── __init__.py
└── message_storage.py # Работа с сообщениями
View File
+25
View File
@@ -0,0 +1,25 @@
from aiogram import Bot, Dispatcher
from config import Config
from models.state import BotState
class TelegramBot:
def __init__(self):
self.bot = Bot(token=Config.API_TOKEN)
self.dp = Dispatcher()
self.state = BotState()
def setup_handlers(self):
"""Регистрация всех обработчиков"""
from handlers import admin, schedule#, media, common
# Регистрируем обработчики из разных модулей
admin.register_handlers(self.dp, self.state, self.bot)
schedule.register_handlers(self.dp, self.state, self.bot)
#media.register_handlers(self.dp, self.state, self.bot)
#common.register_handlers(self.dp, self.state, self.bot)
async def start(self):
"""Запуск бота"""
self.setup_handlers()
await self.dp.start_polling(self.bot)
View File
+37
View File
@@ -0,0 +1,37 @@
import os
from dotenv import load_dotenv
from typing import Dict
class Config:
# Загружаем .env
load_dotenv()
# API
API_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
if not API_TOKEN:
raise ValueError("❌ TELEGRAM_BOT_TOKEN не найден в переменных окружения!")
# Admins (user_id: уровень)
ADMINS: Dict[int, int] = {
850906163: 0
}
# Chats
CHAT_IDS = [-1003038389942]
GROUP_CHATS: Dict[str, int] = {
"30тс": -1003038389942,
"5Cа": 7571257031,
}
# Settings
ANTISPAM_DELAY = 600
WATCHER_BASE_DELAY = 600
# Пути
LOG_FILE = "bot.log"
if __name__ == "__main__":
a = Config()
print(a.API_TOKEN)
print(a.ADMINS)
+63
View File
@@ -0,0 +1,63 @@
from aiogram import types, Dispatcher, Bot
from aiogram.filters import Command
from models.state import BotState
from config import Config
from utils.antispam import admin_required
from services.watcher_service import WatcherService
def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
@dp.message(Command("log"))
@admin_required(3)
async def send_log(message: types.Message):
"""Отправка логов"""
try:
log_file = types.FSInputFile(Config.LOG_FILE)
await message.answer_document(log_file, caption="📑 Логи бота")
except FileNotFoundError:
await message.answer("Файл логов пока не создан.")
@dp.message(Command("status"))
@admin_required(3)
async def send_status(message: types.Message):
"""Статус бота"""
from utils.analytics import analyze_bot_logs
from utils.mac_metrics import get_macbook_battery_level
try:
stats = analyze_bot_logs(Config.LOG_FILE)
batt = await get_macbook_battery_level()
status_text = (
"🤖 СТАТУС БОТА\n"
"══════════════\n"
f"✅ Uptime: {stats.get('uptime_percentage', 0)}%\n"
f"⏱️ Слежка расписания: {'ВКЛ' if state.watcher_work else 'ВЫКЛ'}\n"
f"🔋 Уровень заряда: {batt}%"
)
await message.answer(status_text)
except Exception as e:
await message.answer(f"❌ Ошибка при проверке статуса: {str(e)}")
@dp.message(Command("power"))
@admin_required(2)
async def power_control(message: types.Message):
"""Управление слежкой"""
args = message.text.split()
if len(args) < 2:
status = "включена" if state.watcher_work else "выключена"
await message.answer(f"⏱️ Слежка расписания: {status}")
return
command = args[1].lower()
watcher_service = WatcherService(state, bot)
if command == "on" and not state.watcher_work:
await watcher_service.start()
await message.answer("✅ Слежка расписания включена")
elif command == "off" and state.watcher_work:
await watcher_service.stop()
await message.answer("❌ Слежка расписания выключена")
else:
await message.answer("❌ Неверная команда")
+35
View File
@@ -0,0 +1,35 @@
from aiogram import types, Dispatcher, Bot
from aiogram.filters import Command
from models.state import BotState
from services.schedule_service import ScheduleService
from utils.antispam import is_chat_spam
from storage.message_storage import save_message
def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
@dp.message(Command("rasp"))
async def send_schedule(message: types.Message):
"""Отправка расписания"""
if is_chat_spam(message.chat.id, state):
await message.answer("НЕ СПАМЬТЕ!!!")
return
args = message.text.split(maxsplit=2)
group = args[1].strip() if len(args) > 1 else "30тс"
day_offset = int(args[2]) if len(args) > 2 and args[2].isdigit() else 0
schedule_service = ScheduleService()
clip_png, url, day, mouth = await schedule_service.get_schedule(group, day_offset)
if clip_png:
save_message(message.chat.id, message.message_id)
msg = await message.answer_photo(
types.BufferedInputFile(clip_png, filename=f"{group}.png"),
caption=f"Расписание для {group} на {day}.{mouth:02d}"
)
save_message(message.chat.id, msg.message_id)
state.file_id_cache[group.lower()] = msg.photo[-1].file_id
else:
await message.answer(f"Не удалось найти расписание для {group}")
+32
View File
@@ -0,0 +1,32 @@
from asyncio import run
from logging import basicConfig, FileHandler, StreamHandler, INFO, getLogger
from bot.core import TelegramBot
from config import Config
# Настройка логирования
basicConfig(
level=INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[
FileHandler(Config.LOG_FILE, encoding="utf-8"),
StreamHandler()
],
force=True
)
logger = getLogger(__name__)
async def main():
"""Основная функция запуска"""
try:
bot = TelegramBot()
logger.info("Бот запускается...")
await bot.start()
except Exception as e:
logger.error(f"Ошибка при запуске бота: {e}")
finally:
logger.info("Бот остановлен")
if __name__ == "__main__":
run(main())
+26
View File
@@ -0,0 +1,26 @@
from dataclasses import dataclass
from typing import Dict, Optional
from asyncio import Task
@dataclass
class BotState:
"""Состояние бота"""
last_chat_time: Dict[int, str] = None
last_pinned: Dict[str, int] = None
watcher_work: bool = False
watcher_task: Optional[Task] = None
file_id_cache: Dict[str, str] = None
last_day: Dict[str, int] = None
last_clip_hash: Dict[str, str] = None
def __post_init__(self):
if self.last_chat_time is None:
self.last_chat_time = {}
if self.last_pinned is None:
self.last_pinned = {}
if self.file_id_cache is None:
self.file_id_cache = {}
if self.last_day is None:
self.last_day = {}
if self.last_clip_hash is None:
self.last_clip_hash = {}
+62
View File
@@ -0,0 +1,62 @@
import hashlib
from datetime import datetime, timedelta
from typing import Optional, Tuple
from playwright.async_api import async_playwright, ViewportSize, FloatRect
import logging
logger = logging.getLogger(__name__)
class ScheduleService:
def __init__(self):
self.base_url = "https://college.by/accounts/raspis/{mouth:02d}/{day:02d}-PODNAM.htm"
def _make_url(self, day: int = 0) -> Tuple[str, int, int]:
"""Генерация URL для расписания"""
d = datetime.now()
if day == 0:
if d.hour >= 12:
d += timedelta(days=1)
if d.weekday() == 6:
d += timedelta(days=1)
return self.base_url.format(day=d.day, mouth=d.month), d.day, d.month
else:
return self.base_url.format(day=int(day), mouth=d.month), int(day), int(d.month)
async def get_schedule(self, group: str, day_offset: int = 0) -> Tuple[Optional[bytes], str, int, int]:
"""Получение скриншота расписания"""
url, day, month = self._make_url(day_offset)
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context(viewport=ViewportSize(width=400, height=3000))
page = await context.new_page()
try:
response = await page.goto(url, wait_until="networkidle", timeout=30000)
if not response or response.status != 200:
logger.warning(f"Ошибка загрузки страницы: {url}")
return None, url, day, month
locator = page.locator("p", has_text=group).first
if await locator.count() > 0:
await locator.scroll_into_view_if_needed()
box = await locator.bounding_box()
if box:
clip_rect = FloatRect(
x=float(max(box["x"] - 0, 0)),
y=float(max(box["y"] - 0, 0)),
width=float(box["width"] + 150),
height=float(box["height"] + 100)
)
return await page.screenshot(clip=clip_rect), url, day, month
except Exception as e:
logger.error(f"Ошибка при получении расписания: {e}")
finally:
await context.close()
await browser.close()
return None, url, day, month
+97
View File
@@ -0,0 +1,97 @@
import asyncio
import hashlib
from datetime import datetime, timedelta
from random import randint
from aiogram import Bot
from aiogram.types import BufferedInputFile
from models.state import BotState
from config import Config
from services.schedule_service import ScheduleService
import logging
logger = logging.getLogger(__name__)
class WatcherService:
def __init__(self, state: BotState, bot: Bot):
self.state = state
self.bot = bot
self.schedule_service = ScheduleService()
async def start(self):
"""Запуск слежки"""
if self.state.watcher_work:
return
self.state.watcher_work = True
self.state.watcher_task = asyncio.create_task(self._watcher_loop())
logger.info("Watcher запущен")
async def stop(self):
"""Остановка слежки"""
if not self.state.watcher_work:
return
self.state.watcher_work = False
if self.state.watcher_task:
self.state.watcher_task.cancel()
try:
await self.state.watcher_task
except asyncio.CancelledError:
pass
logger.info("Watcher остановлен")
async def _watcher_loop(self):
"""Основной цикл слежки"""
while self.state.watcher_work:
try:
await self._check_all_groups()
delay = randint(600, 700)
await asyncio.sleep(delay)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Ошибка в watcher_loop: {e}")
await asyncio.sleep(60)
def _get_target_day(self) -> datetime:
"""Получение целевого дня"""
now = datetime.now()
target = now + timedelta(days=1)
if target.weekday() == 6:
target += timedelta(days=1)
return target
async def _check_all_groups(self):
"""Проверка всех групп на изменения"""
day = self._get_target_day()
for group, chat_id in Config.GROUP_CHATS.items():
await self._check_group_schedule(group, chat_id, day)
async def _check_group_schedule(self, group: str, chat_id: int, day: int):
"""Проверка расписания для конкретной группы"""
logger.info(f"Проверяем расписание для {group} на {day.strftime('%d.%m.%Y')}")
clip_png, url, data_day, data_mouth = await self.schedule_service.get_schedule(group, day.day)
if clip_png:
msg = await self.bot.send_photo(
chat_id,
BufferedInputFile(clip_png, filename=f"{group}.png"),
caption=f"Авто-расписание для {group} на {data_day:02d}.{data_mouth:02d}"
)
await self.bot.pin_chat_message(chat_id, msg.message_id, disable_notification=True)
else:
logger.warning(f"Не удалось получить расписание для {group}, {data_day}, {data_mouth}, {url}")
return
#clip_hash = hashlib.md5(clip_png).hexdigest()
# Логика проверки изменений и отправки сообщений
# ... (ваша существующая логика)
+24
View File
@@ -0,0 +1,24 @@
import os
MESSAGES_FILE = "messages.txt"
# --- функция для записи message_id ---
def save_message(chat_id: int, message_id: int):
with open(MESSAGES_FILE, "a", encoding="utf-8") as f:
f.write(f"{chat_id},{message_id}\n")
# --- функция для загрузки всех сообщений ---
def load_messages():
if not os.path.exists(MESSAGES_FILE):
return []
with open(MESSAGES_FILE, "r", encoding="utf-8") as f:
lines = f.readlines()
return [tuple(map(int, line.strip().split(","))) for line in lines if line.strip()]
# --- функция для очистки файла ---
def clear_messages():
open(MESSAGES_FILE, "w").close()
+264
View File
@@ -0,0 +1,264 @@
from collections import Counter
import re
from datetime import datetime
import tempfile
import json
def analyze_bot_logs(log_file_path="bot.log"):
"""
Анализирует логи бота и создает детальную статистику
"""
try:
with open(log_file_path, 'r', encoding='utf-8') as log:
lines = log.readlines()
except FileNotFoundError:
return {"error": "Лог файл не найден"}
except Exception as e:
return {"error": f"Ошибка чтения файла: {str(e)}"}
if not lines:
return {"error": "Лог файл пуст"}
# Основные счетчики
stats = {
'total_lines': len(lines),
'time_period': {},
'log_levels': Counter(),
'activities': Counter(),
'errors': Counter(),
'warnings': Counter(),
'user_commands': Counter(),
'groups': Counter(),
'restarts': 0,
'schedule_checks': 0,
'schedule_changes': 0,
'schedule_failures': 0,
'network_errors': 0,
'browser_errors': 0,
'telegram_errors': Counter(),
'performance': {
'avg_handling_time': 0,
'fastest_handling': float('inf'),
'slowest_handling': 0,
'handling_count': 0
}
}
# Временные метрики
start_time = None
end_time = None
handling_times = []
# Регулярные выражения для парсинга
timestamp_pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})'
log_level_pattern = r'\[(INFO|WARNING|ERROR)\]'
handling_time_pattern = r'Duration (\d+) ms'
command_pattern = r'Команда /rasp от ([\d-]+), группа=([^,]+), дата=(\d+)'
schedule_pattern = r'Проверяем расписание для ([^ ]+) на (\d{2}\.\d{2}\.\d{4})'
for line in lines:
# Извлекаем временную метку
timestamp_match = re.search(timestamp_pattern, line)
if timestamp_match:
timestamp = timestamp_match.group(1)
if not start_time:
start_time = timestamp
end_time = timestamp
# Уровень логирования
level_match = re.search(log_level_pattern, line)
if level_match:
level = level_match.group(1)
stats['log_levels'][level] += 1
# Время обработки сообщений
time_match = re.search(handling_time_pattern, line)
if time_match and 'is handled' in line:
handling_time = int(time_match.group(1))
handling_times.append(handling_time)
stats['performance']['handling_count'] += 1
stats['performance']['slowest_handling'] = max(
stats['performance']['slowest_handling'], handling_time
)
stats['performance']['fastest_handling'] = min(
stats['performance']['fastest_handling'], handling_time
)
# Команды пользователей
cmd_match = re.search(command_pattern, line)
if cmd_match:
user_id, group, date_offset = cmd_match.groups()
stats['user_commands'][group] += 1
stats['groups'][group] += 1
# Проверки расписания
if 'Проверяем расписание' in line:
stats['schedule_checks'] += 1
sched_match = re.search(schedule_pattern, line)
if sched_match:
group, date = sched_match.groups()
stats['groups'][group] += 1
# Изменения расписания
if 'Изменения найдены' in line:
stats['schedule_changes'] += 1
# Ошибки расписания
if 'Не удалось получить расписание' in line:
stats['schedule_failures'] += 1
# Перезапуски бота
if 'Бот запускается' in line:
stats['restarts'] += 1
# Сетевые ошибки
if 'Failed to fetch updates' in line:
stats['network_errors'] += 1
# Ошибки браузера
if 'TargetClosedError' in line or 'BrowserContext.close' in line:
stats['browser_errors'] += 1
# Ошибки Telegram API
if 'Telegram server says' in line:
error_msg = line.split('Telegram server says - ')[-1].split(':')[0]
stats['telegram_errors'][error_msg] += 1
# Сбор ошибок и предупреждений
if '[ERROR]' in line:
error_msg = line.split('[ERROR]')[-1].strip()
stats['errors'][error_msg[:100]] += 1
if '[WARNING]' in line:
warning_msg = line.split('[WARNING]')[-1].strip()
stats['warnings'][warning_msg[:100]] += 1
# Расчет средней скорости обработки
if handling_times:
stats['performance']['avg_handling_time'] = sum(handling_times) / len(handling_times)
# Период работы
if start_time and end_time:
stats['time_period'] = {
'start': start_time,
'end': end_time,
'duration_hours': calculate_duration_hours(start_time, end_time)
}
# Дополнительные метрики
stats['success_rate'] = calculate_success_rate(stats)
stats['uptime_percentage'] = calculate_uptime_percentage(stats)
stats['schedule_success_rate'] = calculate_schedule_success_rate(stats)
return stats
def calculate_duration_hours(start_str, end_str):
"""Вычисляет продолжительность в часах"""
try:
fmt = '%Y-%m-%d %H:%M:%S'
start = datetime.strptime(start_str, fmt)
end = datetime.strptime(end_str, fmt)
return round((end - start).total_seconds() / 3600, 2)
except:
return 0
def calculate_success_rate(stats):
"""Рассчитывает процент успешных операций"""
total_operations = stats['performance']['handling_count'] + sum(stats['errors'].values())
if total_operations == 0:
return 0
success_rate = (stats['performance']['handling_count'] / total_operations) * 100
return round(success_rate, 2)
def calculate_uptime_percentage(stats):
"""Рассчитывает процент времени работы"""
if stats['time_period'].get('duration_hours', 0) == 0:
return 0
# Предполагаем, что каждый перезапуск занимает ~10 секунд
restart_downtime = stats['restarts'] * 10 / 3600
total_hours = stats['time_period']['duration_hours']
uptime_hours = total_hours - restart_downtime
uptime_percentage = (uptime_hours / total_hours) * 100
return round(uptime_percentage, 2)
def calculate_schedule_success_rate(stats):
"""Рассчитывает процент успешных проверок расписания"""
total_checks = stats['schedule_checks']
if total_checks == 0:
return 0
successful_checks = total_checks - stats['schedule_failures']
success_rate = (successful_checks / total_checks) * 100
return round(success_rate, 2)
def create_statistics_text(stats):
"""Создает текстовый отчет статистики"""
if 'error' in stats:
return f"❌ Ошибка анализа логов: {stats['error']}"
text = "📊 СТАТИСТИКА РАБОТЫ БОТА\n"
text += "=" * 40 + "\n\n"
# Основная информация
text += "📈 ОБЩАЯ ИНФОРМАЦИЯ:\n"
text += f"• Период: {stats['time_period'].get('start', 'N/A')} - {stats['time_period'].get('end', 'N/A')}\n"
text += f"• Длительность: {stats['time_period'].get('duration_hours', 0)} ч\n"
text += f"• Строк в логе: {stats['total_lines']:,}\n\n"
# Производительность
text += "⚡ ПРОИЗВОДИТЕЛЬНОСТЬ:\n"
text += f"• Среднее время: {stats['performance']['avg_handling_time']:.0f} мс\n"
text += f"• Сообщений обработано: {stats['performance']['handling_count']}\n"
text += f"• Успешных операций: {stats['success_rate']}%\n\n"
# Статус работы
text += "🔄 СТАТУС РАБОТЫ:\n"
text += f"• Перезапусков: {stats['restarts']}\n"
text += f"• Uptime: {stats['uptime_percentage']}%\n\n"
# Расписание
text += "📅 РАСПИСАНИЕ:\n"
text += f"• Проверок: {stats['schedule_checks']}\n"
text += f"• Изменений: {stats['schedule_changes']}\n"
text += f"• Ошибок: {stats['schedule_failures']}\n"
text += f"• Успешных проверок: {stats['schedule_success_rate']}%\n\n"
# Ошибки
text += "🚨 ОШИБКИ:\n"
text += f"• Всего ошибок: {sum(stats['errors'].values())}\n"
text += f"• Предупреждений: {sum(stats['warnings'].values())}\n"
text += f"• Сетевых: {stats['network_errors']}\n"
text += f"• Браузера: {stats['browser_errors']}\n\n"
# Пользователи
text += "👥 АКТИВНОСТЬ:\n"
text += f"• Команд: {sum(stats['user_commands'].values())}\n"
text += f"• Групп: {len(stats['groups'])}\n"
# Топ групп
if stats['groups']:
text += "• Топ групп:\n"
for group, count in stats['groups'].most_common(3):
text += f" - {group}: {count}\n"
return text
def create_statistics_file(stats):
"""Создает временный файл с полной статистикой"""
if 'error' in stats:
return None
# Создаем временный файл
with tempfile.NamedTemporaryFile(mode='w', encoding='utf-8',
suffix='.json', delete=False) as f:
json.dump(stats, f, ensure_ascii=False, indent=2, default=str)
temp_filename = f.name
return temp_filename
+43
View File
@@ -0,0 +1,43 @@
from datetime import datetime
from functools import wraps
from aiogram import types
from models.state import BotState
from config import Config
def is_chat_spam(chat_id: int, state: BotState) -> bool:
"""Проверка на спам"""
if chat_id in Config.ADMINS:
return False
now = datetime.now()
if chat_id in state.last_chat_time:
delta = (now - state.last_chat_time[chat_id]).total_seconds()
if delta < Config.ANTISPAM_DELAY:
return True
state.last_chat_time[chat_id] = now
return False
from functools import wraps
from aiogram import types
def admin_required(need_level: int):
"""Декоратор для проверки прав администратора (0 = высший уровень)"""
def decorator(func):
@wraps(func)
async def wrapper(message: types.Message, *args, **kwargs):
user_id = message.from_user.id
user_level = Config.ADMINS.get(user_id, 9999) # 9999 = "нет прав"
# чем меньше число, тем выше права
if user_level > need_level:
await message.answer("⛔ У вас нет доступа к этой команде.")
return
return await func(message, *args, **kwargs)
return wrapper
return decorator
+27
View File
@@ -0,0 +1,27 @@
import asyncio
async def get_macbook_battery_level():
"""
Асинхронная функция для получения уровня заряда батареи на macOS.
Использует утилиту pmset.
"""
# Запускаем команду pmset -g batt
process = await asyncio.create_subprocess_exec(
"pmset", "-g", "batt",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
raise RuntimeError(f"Ошибка при выполнении pmset: {stderr.decode().strip()}")
output = stdout.decode().strip()
# Пример строки: " - InternalBattery-0 (id=1234567) 85%; charging; ..."
for part in output.splitlines():
if "%" in part:
percent_str = part.split("\t")[-1].split(";")[0].strip()
return int(percent_str.replace("%", ""))
raise ValueError("Не удалось определить уровень заряда")