I add command /prasp
It's version 0.2.2
This commit is contained in:
@@ -24,7 +24,6 @@ class TelegramBot:
|
||||
#add addons
|
||||
self.addons.load("example_addon")
|
||||
self.addons.load("id")
|
||||
self.addons.load("dowloadmp4_to_youtube")
|
||||
|
||||
async def start(self):
|
||||
"""Запуск бота"""
|
||||
|
||||
+19
-1
@@ -19,7 +19,25 @@ def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
|
||||
day_offset = int(args[2]) if len(args) > 2 and args[2].isdigit() else 0
|
||||
|
||||
schedule_service = ScheduleService()
|
||||
clip_png, url, day, mouth = await schedule_service.get_schedule(group, day_offset)
|
||||
text, url, day, month = await schedule_service.get_schedule(group, day_offset)
|
||||
# Отправляем текст расписания
|
||||
msg = await message.answer(text, parse_mode="Markdown")
|
||||
|
||||
save_message(message.chat.id, msg.message_id)
|
||||
|
||||
@dp.message(Command("prasp"))
|
||||
async def send_schedule(message: types.Message):
|
||||
"""Отправка расписания"""
|
||||
if is_chat_spam(message.chat.id, state):
|
||||
await message.answer("НЕ СПАМЬТЕ!!!")
|
||||
return
|
||||
|
||||
args = message.text.split(maxsplit=2)
|
||||
group = args[1].strip() if len(args) > 1 else "30тс"
|
||||
day_offset = int(args[2]) if len(args) > 2 and args[2].isdigit() else 0
|
||||
|
||||
schedule_service = ScheduleService()
|
||||
clip_png, url, day, mouth = await schedule_service.get_pschedule(group, day_offset)
|
||||
|
||||
if clip_png:
|
||||
save_message(message.chat.id, message.message_id)
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
import hashlib
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, Tuple
|
||||
from playwright.async_api import async_playwright, ViewportSize, FloatRect
|
||||
import logging
|
||||
import aiohttp
|
||||
from bs4 import BeautifulSoup
|
||||
import ssl
|
||||
import certifi
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -23,7 +26,63 @@ class ScheduleService:
|
||||
else:
|
||||
return self.base_url.format(day=int(day), mouth=d.month), int(day), int(d.month)
|
||||
|
||||
async def get_schedule(self, group: str, day_offset: int = 0) -> Tuple[Optional[bytes], str, int, int]:
|
||||
async def get_schedule(
|
||||
self, group: str, day_offset: int = 0
|
||||
) -> Tuple[str, str, int, int]:
|
||||
"""Получение текста расписания (аналог Rust parse_schedule)"""
|
||||
url, day, month = self._make_url(day_offset)
|
||||
|
||||
ssl_context = ssl.create_default_context(cafile=certifi.where())
|
||||
ssl_context.check_hostname = False
|
||||
ssl_context.verify_mode = ssl.CERT_NONE
|
||||
|
||||
connector = aiohttp.TCPConnector(ssl=ssl_context)
|
||||
|
||||
headers = {
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36'
|
||||
}
|
||||
|
||||
# тут можно использовать aiohttp + chardet/charset_normalizer
|
||||
async with aiohttp.ClientSession(connector=connector, headers=headers) as session:
|
||||
async with session.get(url) as resp:
|
||||
raw_bytes = await resp.read()
|
||||
|
||||
decoded = raw_bytes.decode("cp1251", errors="ignore")
|
||||
document = BeautifulSoup(decoded, "html.parser")
|
||||
|
||||
# ищем <p class="MsoPlainText"><b>...</b>
|
||||
elements = document.select("p.MsoPlainText b")
|
||||
|
||||
found_group = False
|
||||
schedule_lines = []
|
||||
for el in elements:
|
||||
text = el.get_text(strip=True)
|
||||
if not found_group:
|
||||
if group in text:
|
||||
found_group = True
|
||||
schedule_lines.append(text)
|
||||
else:
|
||||
if "-----" in text or "+----" in text:
|
||||
break
|
||||
schedule_lines.append(text)
|
||||
|
||||
if not schedule_lines:
|
||||
result = f"Расписание для группы {group} на {day} число не найдено"
|
||||
else:
|
||||
result = f"📅 Расписание для {day} числа:\n```\n"
|
||||
for line in schedule_lines:
|
||||
formatted = (
|
||||
line.replace("¦", "│")
|
||||
.replace(" ", " ")
|
||||
.strip()
|
||||
)
|
||||
if formatted:
|
||||
result += f"{formatted}\n"
|
||||
result += "```"
|
||||
|
||||
return result, url, day, month
|
||||
|
||||
async def get_pschedule(self, group: str, day_offset: int = 0) -> Tuple[Optional[bytes], str, int, int]:
|
||||
"""Получение скриншота расписания"""
|
||||
url, day, month = self._make_url(day_offset)
|
||||
|
||||
|
||||
@@ -2,7 +2,6 @@ import asyncio
|
||||
from datetime import datetime, timedelta
|
||||
from random import randint
|
||||
from aiogram import Bot
|
||||
from aiogram.types import BufferedInputFile
|
||||
from models.state import BotState
|
||||
from config import Config
|
||||
from services.schedule_service import ScheduleService
|
||||
@@ -74,21 +73,21 @@ class WatcherService:
|
||||
|
||||
async def _check_group_schedule(self, group: str, chat_id: int, day: int):
|
||||
"""Проверка расписания для конкретной группы"""
|
||||
clip_png, url, data_day, data_mouth = await self.schedule_service.get_schedule(group, day)
|
||||
if clip_png:
|
||||
msg = await self.bot.send_photo(
|
||||
text, url, data_day, data_month = await self.schedule_service.get_schedule(group, day)
|
||||
|
||||
if text and "не найдено" not in text.lower():
|
||||
msg = await self.bot.send_message(
|
||||
chat_id,
|
||||
BufferedInputFile(clip_png, filename=f"{group}.png"),
|
||||
caption=f"Авто-расписание для {group} на {data_day:02d}.{data_mouth:02d}"
|
||||
f"Авто-расписание для {group} на {data_day:02d}.{data_month:02d}\n\n{text}",
|
||||
parse_mode="Markdown"
|
||||
)
|
||||
await self.bot.pin_chat_message(chat_id, msg.message_id, disable_notification=True)
|
||||
|
||||
else:
|
||||
logger.warning(f"Не удалось получить расписание для {group}, {data_day}, {data_mouth}, {url}")
|
||||
logger.warning(
|
||||
f"Не удалось получить расписание для {group}, {data_day}, {data_month}, {url}"
|
||||
)
|
||||
return
|
||||
|
||||
|
||||
|
||||
#clip_hash = hashlib.md5(clip_png).hexdigest()
|
||||
|
||||
# Логика проверки изменений и отправки сообщений
|
||||
|
||||
Reference in New Issue
Block a user