Compare commits

..

2 Commits

8 changed files with 343 additions and 207 deletions
+2 -2
View File
@@ -20,10 +20,10 @@ def register_handlers(dp: Dispatcher, state: BotState, bot: Bot):
async def send_welcome(message: Message):
# Создаём инлайн-кнопку для открытия Web App
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="Открыть мини-приложение", web_app=WebAppInfo(url="https://overfit-percussively-nicolas.ngrok-free.dev"))]
[InlineKeyboardButton(text="Открыть", web_app=WebAppInfo(url="https://mukhyil.duckdns.org/"))]
])
await message.answer(
f"Расписание на {get_day()} число месяца:",
f"Мой сайт для видео",
reply_markup=keyboard
)
+1 -1
View File
@@ -17,7 +17,7 @@ class TelegramBot:
# Регистрируем обработчики из разных модулей
admin.register_handlers(self.dp, self.state, self.bot)
# schedule.register_handlers(self.dp, self.state)
schedule.register_handlers(self.dp, self.state)
# media.register_handlers(self.dp, self.state, self.bot)
# common.register_handlers(self.dp, self.state, self.bot)
+6 -1
View File
@@ -33,7 +33,12 @@ class Config:
# Settings
ANTISPAM_DELAY = 20
WATCHER_BASE_DELAY = 30
WATCHER_INTERVAL_SEC = 600
WATCHER_RANDOM_DELAY_MIN = 1
WATCHER_RANDOM_DELAY_MAX = 120
SCHEDULE_DRIVE_FOLDER_ID = os.getenv(
"SCHEDULE_DRIVE_FOLDER_ID", "1WhUFHGkS4qC_e84KRArF4ooXHJr8mL5T"
)
# Пути
LOG_FILE = "storage/log/bot.log"
+1 -1
View File
@@ -36,7 +36,7 @@ def register_handlers(dp: Dispatcher, state: BotState):
schedule_service = ScheduleService()
text, url, day, month = await schedule_service.get_schedule(group, day_offset)
msg = await message.answer(text, parse_mode="Markdownv2")
msg = await message.answer(text, parse_mode="HTML")
save_message(msg.chat.id, msg.message_id)
@dp.message(Command("prasp"))
+2
View File
@@ -53,6 +53,8 @@
ply==3.11
propcache==0.3.2
pycparser==2.23
pymupdf==1.27.2.3
pypdf==6.11.0
pydantic==2.11.10
pydantic_core==2.33.2
pyee==13.0.0
+109
View File
@@ -0,0 +1,109 @@
from __future__ import annotations
import logging
import re
import ssl
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional
import aiohttp
import certifi
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
DRIVE_FOLDER_EMBED = (
"https://drive.google.com/embeddedfolderview?id={folder_id}#list"
)
DRIVE_DOWNLOAD_URL = "https://drive.google.com/uc?export=download&id={file_id}"
USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
def _drive_connector() -> aiohttp.TCPConnector:
    """Build an aiohttp TCP connector that verifies TLS against certifi's CA bundle."""
    ca_bundle_path = certifi.where()
    tls_context = ssl.create_default_context(cafile=ca_bundle_path)
    connector = aiohttp.TCPConnector(ssl=tls_context)
    return connector
# Immutable record describing one schedule PDF found in the Drive folder listing.
@dataclass(frozen=True)
class DriveScheduleFile:
# Drive file identifier: the listing entry's element id without its "entry-" prefix.
file_id: str
# File title text taken from the folder listing (whitespace-stripped).
name: str
# Date parsed from the DD.MM.YY prefix of the file name; the time part is midnight.
schedule_date: datetime
class DriveScheduleSource:
    """Find and download student schedule PDFs from a Google Drive folder.

    The folder is read through Drive's "embedded folder view" HTML page, so no
    API credentials are involved. The parsed listing is cached on the instance;
    see ``find_for_date`` for how the cache is refreshed.
    """

    def __init__(self, folder_id: str):
        """Remember the Drive folder id; no network access happens here."""
        self.folder_id = folder_id
        # None means "never fetched"; an empty list is a valid cached result.
        self._files_cache: Optional[List[DriveScheduleFile]] = None

    @staticmethod
    def _parse_filename_date(name: str) -> Optional[datetime]:
        """Extract the schedule date from a ``DD.MM.YY по учащимся.pdf`` file name.

        Returns:
            The parsed date (year is ``2000 + YY``), or ``None`` when the name
            does not follow the pattern or encodes an impossible calendar
            date such as ``31.02.25``.
        """
        match = re.match(
            r"^(\d{2})\.(\d{2})\.(\d{2})\s+по\s+учащимся\.pdf$",
            name.strip(),
            re.IGNORECASE,
        )
        if not match:
            return None

        day, month, year = map(int, match.groups())
        try:
            return datetime(2000 + year, month, day)
        except ValueError:
            # The digits matched but do not form a real date. Skip this file
            # instead of letting one bad name abort the whole folder listing.
            return None

    async def list_student_schedules(self, force_refresh: bool = False) -> List[DriveScheduleFile]:
        """Return the folder's student schedule files, oldest first.

        Args:
            force_refresh: When true, ignore the cached listing and fetch the
                folder page again.

        Raises:
            aiohttp.ClientResponseError: If the folder page responds with an
                HTTP error status.
        """
        if self._files_cache is not None and not force_refresh:
            return self._files_cache

        url = DRIVE_FOLDER_EMBED.format(folder_id=self.folder_id)
        async with aiohttp.ClientSession(
            headers={"User-Agent": USER_AGENT},
            connector=_drive_connector(),
        ) as session:
            async with session.get(url) as resp:
                resp.raise_for_status()
                html = await resp.text()

        soup = BeautifulSoup(html, "html.parser")
        files: List[DriveScheduleFile] = []
        for entry in soup.select("div.flip-entry"):
            entry_id = entry.get("id", "")
            if not entry_id.startswith("entry-"):
                continue
            title_el = entry.select_one(".flip-entry-title")
            if not title_el:
                continue
            name = title_el.get_text(strip=True)
            schedule_date = self._parse_filename_date(name)
            if schedule_date is None:
                # Not a student schedule PDF (or an unparseable name).
                continue
            files.append(
                DriveScheduleFile(
                    file_id=entry_id.removeprefix("entry-"),
                    name=name,
                    schedule_date=schedule_date,
                )
            )

        files.sort(key=lambda item: item.schedule_date)
        self._files_cache = files
        return files

    @staticmethod
    def _match_date(files, target: datetime):
        """Return the last file in ``files`` dated on ``target``'s calendar day, if any."""
        wanted = target.date()
        return next(
            (item for item in reversed(files) if item.schedule_date.date() == wanted),
            None,
        )

    async def find_for_date(self, target: datetime) -> Optional[DriveScheduleFile]:
        """Return the schedule file for ``target``'s calendar day, or ``None``.

        A miss answered from the cached listing triggers one re-fetch of the
        folder, because the file may have been uploaded after the cache was
        filled. Without this a long-lived instance would never notice a newly
        published schedule. A miss on a freshly fetched listing is final.
        """
        served_from_cache = self._files_cache is not None
        found = self._match_date(await self.list_student_schedules(), target)
        if found is None and served_from_cache:
            refreshed = await self.list_student_schedules(force_refresh=True)
            found = self._match_date(refreshed, target)
        return found

    async def download_pdf(self, file_id: str) -> bytes:
        """Download a Drive file's content through the direct-download URL.

        Raises:
            aiohttp.ClientResponseError: If the download responds with an HTTP
                error status.
        """
        url = DRIVE_DOWNLOAD_URL.format(file_id=file_id)
        async with aiohttp.ClientSession(
            headers={"User-Agent": USER_AGENT},
            connector=_drive_connector(),
        ) as session:
            async with session.get(url) as resp:
                resp.raise_for_status()
                return await resp.read()
+161 -146
View File
@@ -1,177 +1,192 @@
from datetime import datetime, timedelta
from typing import Optional, Tuple
from playwright.async_api import async_playwright
from __future__ import annotations
import io
import logging
import aiohttp
from bs4 import BeautifulSoup
import ssl
import certifi
import re
from datetime import datetime, timedelta
from html import escape
from typing import List, Optional, Tuple
import fitz
from pypdf import PdfReader
from config import Config
from services.drive_schedule_source import DriveScheduleSource
logger = logging.getLogger(__name__)
BOUNDARY = r"[^0-9A-Za-zА-Яа-яЁё]"
class ScheduleService:
def __init__(self):
self.base_url = (
"https://college.by/accounts/raspis/{mouth:02d}/{day:02d}-PODNAM.htm"
folder_id = getattr(Config, "SCHEDULE_DRIVE_FOLDER_ID", None) or (
"1WhUFHGkS4qC_e84KRArF4ooXHJr8mL5T"
)
self.drive = DriveScheduleSource(folder_id)
self._pdf_cache: dict[str, bytes] = {}
def _make_url(self, day: int = 0) -> Tuple[str, int, int]:
"""Генерация URL для расписания"""
d = datetime.now()
if day == 0:
if d.hour >= 12:
d += timedelta(days=1)
if d.weekday() == 6:
d += timedelta(days=1)
return self.base_url.format(day=d.day, mouth=d.month), d.day, d.month
def _resolve_target_date(self, day_offset: int = 0) -> datetime:
target = datetime.now()
if day_offset == 0:
if target.hour >= 12:
target += timedelta(days=1)
if target.weekday() == 6:
target += timedelta(days=1)
else:
return (
self.base_url.format(day=int(day), mouth=d.month),
int(day),
int(d.month),
target = target.replace(day=int(day_offset))
return target.replace(hour=0, minute=0, second=0, microsecond=0)
async def _load_pdf_for_date(
self, day_offset: int = 0
) -> Tuple[Optional[bytes], Optional[str], int, int]:
target = self._resolve_target_date(day_offset)
day, month = target.day, target.month
drive_file = await self.drive.find_for_date(target)
if not drive_file:
return None, None, day, month
if drive_file.file_id not in self._pdf_cache:
self._pdf_cache[drive_file.file_id] = await self.drive.download_pdf(
drive_file.file_id
)
import re
async def get_schedule(
self, group: str, day_offset: int = 0
) -> Tuple[str, str, int, int]:
"""Получение текста расписания (аналог Rust parse_schedule)"""
url, day, month = self._make_url(day_offset)
ssl_context = ssl.create_default_context(cafile=certifi.where())
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
connector = aiohttp.TCPConnector(ssl=ssl_context)
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
}
async with aiohttp.ClientSession(
connector=connector, headers=headers
) as session:
async with session.get(url) as resp:
raw_bytes = await resp.read()
decoded = raw_bytes.decode("cp1251", errors="ignore")
document = BeautifulSoup(decoded, "html.parser")
elements = document.select("p.MsoPlainText b")
found_group = False
schedule_lines = []
# регулярка: ищем точное совпадение группы как отдельного слова
group_pattern = re.compile(rf"\b{re.escape(group)}\b", re.IGNORECASE)
for el in elements:
text = el.get_text(strip=True)
if not found_group:
if group_pattern.search(text):
found_group = True
schedule_lines.append(text)
else:
if "-----" in text or "+----" in text:
break
schedule_lines.append(text)
if not schedule_lines:
result = f"Расписание для группы {group} на {day} число не найдено"
else:
result = f"📅 Расписание для {day} числа:\n```\n"
for line in schedule_lines:
formatted = line.replace("¦", "").replace(" ", " ").strip()
if formatted:
result += f"{formatted}\n"
result += "```"
return result, url, day, month
url = f"https://drive.google.com/file/d/{drive_file.file_id}/view"
return self._pdf_cache[drive_file.file_id], url, day, month
@staticmethod
def exact_group_regex(group: str) -> re.Pattern:
    """Compile a case-insensitive pattern that matches ``group`` as a whole token.

    Args:
        group: Group name to look for; it is regex-escaped before use.

    Returns:
        A compiled pattern that matches only where the group name is
        delimited on each side by a BOUNDARY character or by the start/end
        of the searched string.
    """
    # Match as a separate token: a boundary on the left/right, or the
    # beginning/end of the string.
    pattern = rf"(^|{BOUNDARY}){re.escape(group)}({BOUNDARY}|$)"
    return re.compile(pattern, re.IGNORECASE)
@staticmethod
def _extract_pdf_lines(pdf_bytes: bytes) -> List[str]:
    """Return every non-blank text line of the PDF, stripped, in page order."""
    pdf = PdfReader(io.BytesIO(pdf_bytes))
    collected: List[str] = []
    for pdf_page in pdf.pages:
        # extract_text() may yield nothing for a page; treat that as empty text.
        page_text = pdf_page.extract_text() or ""
        stripped = (chunk.strip() for chunk in page_text.splitlines())
        collected.extend(chunk for chunk in stripped if chunk)
    return collected
@staticmethod
def _parse_group_lines(lines: List[str], group: str) -> List[str]:
    """Cut one group's section out of the schedule text.

    The section starts at the first line that mentions ``group`` as a whole
    token and runs until a separator line (one containing ``-----`` or
    ``+----``) or the end of input. The separator itself is not included.
    An empty list means the group was not found.
    """
    group_pattern = ScheduleService.exact_group_regex(group)
    section: List[str] = []
    inside_section = False
    for row in lines:
        if inside_section:
            if "-----" in row or "+----" in row:
                break
            section.append(row)
        elif group_pattern.search(row):
            inside_section = True
            section.append(row)
    return section
@staticmethod
def is_schedule_missing(text: str) -> bool:
lowered = text.lower()
return "не найдено" in lowered or "не опубликовано" in lowered
@staticmethod
def _format_schedule_html(day: int, schedule_lines: List[str]) -> str:
body_lines = []
for line in schedule_lines:
formatted = line.replace("¦", "").replace(" ", " ").strip()
if formatted:
body_lines.append(formatted)
body = escape("\n".join(body_lines))
return f"📅 Расписание для {day} числа:\n<pre>{body}</pre>"
async def is_published_for(self, day_offset: int = 0) -> bool:
    """Report whether the Drive folder has a schedule file for the resolved date."""
    wanted_date = self._resolve_target_date(day_offset)
    drive_file = await self.drive.find_for_date(wanted_date)
    return drive_file is not None
async def get_schedule(
    self, group: str, day_offset: int = 0
) -> Tuple[str, str, int, int]:
    """Build the HTML schedule message for ``group``.

    Returns:
        ``(text, url, day, month)``. ``text`` is the formatted schedule, a
        "not published" notice when no PDF exists for the date, or a
        "not found" notice when the PDF has no section for the group.
        ``url`` points at the PDF when one was loaded, otherwise at the
        Drive folder.
    """
    pdf_bytes, url, day, month = await self._load_pdf_for_date(day_offset)
    folder_url = "https://drive.google.com/drive/folders/" + self.drive.folder_id

    # Guard: nothing published for this date yet.
    if not pdf_bytes:
        not_published = (
            f"⚠️ Расписание на {day:02d}.{month:02d} ещё не опубликовано "
            f"в <a href=\"{folder_url}\">Google Drive</a>"
        )
        return not_published, folder_url, day, month

    all_lines = self._extract_pdf_lines(pdf_bytes)
    group_lines = self._parse_group_lines(all_lines, group)
    if group_lines:
        message = self._format_schedule_html(day, group_lines)
    else:
        message = f"⚠️ Расписание для группы {escape(group)} на {day} число не найдено"
    return message, url or folder_url, day, month
async def get_pschedule(
    self, group: str, day_offset: int = 0
) -> Tuple[Optional[bytes], str, int, int]:
    """Render the group's section of the schedule PDF as a PNG image.

    Each page's text lines are scanned for the group name. The bounding
    boxes from the matching line up to the next separator line (one
    containing ``-----`` or ``+----``) are merged, padded, and rendered at
    2x zoom. The first page that contains the group wins.

    Returns:
        ``(png, url, day, month)``. ``png`` is ``None`` when no PDF exists
        for the date, when the group is on no page, or when rendering
        fails. ``url`` points at the PDF when one was loaded, otherwise at
        the Drive folder.
    """
    pdf_bytes, url, day, month = await self._load_pdf_for_date(day_offset)
    fallback_url = (
        url
        or "https://drive.google.com/drive/folders/" + self.drive.folder_id
    )

    if not pdf_bytes:
        return None, fallback_url, day, month

    try:
        regex = self.exact_group_regex(group)
        # Open the document in a context manager so it is closed on every
        # exit path (early return, exception, or no match).
        with fitz.open(stream=pdf_bytes, filetype="pdf") as doc:
            for page in doc:
                # Collect (text, bbox) for each text line on the page.
                line_items = []
                page_dict = page.get_text("dict")
                for block in page_dict.get("blocks", []):
                    # Only text blocks (type 0) carry "lines".
                    if block.get("type") != 0:
                        continue
                    for line in block.get("lines", []):
                        text = "".join(
                            span["text"] for span in line["spans"]
                        ).strip()
                        if text:
                            line_items.append((text, fitz.Rect(line["bbox"])))

                # Same section rule as the text parser: start at the group
                # line, stop at the next separator.
                found_group = False
                rects: List[fitz.Rect] = []
                for text, bbox in line_items:
                    if not found_group:
                        if regex.search(text):
                            found_group = True
                            rects.append(bbox)
                    else:
                        if "-----" in text or "+----" in text:
                            break
                        rects.append(bbox)

                if not rects:
                    continue

                # Union of all section lines, copied so the first collected
                # rectangle is not mutated in place.
                clip = fitz.Rect(rects[0])
                for rect in rects[1:]:
                    clip |= rect
                # Pad the crop and keep it inside the page.
                clip.x0 = max(0, clip.x0 - 10)
                clip.x1 = min(page.rect.width, clip.x1 + 150)
                clip.y0 = max(0, clip.y0 - 5)
                clip.y1 = min(page.rect.height, clip.y1 + 10)

                pixmap = page.get_pixmap(clip=clip, matrix=fitz.Matrix(2, 2))
                return pixmap.tobytes("png"), fallback_url, day, month

        # The group does not appear on any page.
        return None, fallback_url, day, month
    except Exception as e:
        # Best-effort fallback path: report the failure and return "no image".
        logger.error(f"Ошибка при получении расписания из PDF: {e}")
        return None, fallback_url, day, month
+61 -56
View File
@@ -1,12 +1,12 @@
import asyncio
from datetime import datetime, timedelta
from random import randint
from aiogram import Bot, types
from models.state import BotState
from config import Config
from services.schedule_service import ScheduleService
from logging import getLogger
from aiogram import Bot, types
from config import Config
from logging import getLogger
from models.state import BotState
from services.schedule_service import ScheduleService
logger = getLogger(__name__)
@@ -40,21 +40,24 @@ class WatcherService:
pass
logger.info("Watcher остановлен")
@staticmethod
def _next_delay() -> int:
    """Return the pause before the next check: the base interval plus random jitter."""
    jitter = randint(
        Config.WATCHER_RANDOM_DELAY_MIN,
        Config.WATCHER_RANDOM_DELAY_MAX,
    )
    return Config.WATCHER_INTERVAL_SEC + jitter
async def _watcher_loop(self):
"""Основной цикл слежки"""
"""Основной цикл слежки за появлением PDF на Google Drive."""
while self.state.watcher_work:
try:
find = await self._check_all_groups()
if find:
# ничего не нашли → ждём
delay = randint(
Config.WATCHER_BASE_DELAY, Config.WATCHER_BASE_DELAY + 30
)
logger.info(f"Следующая проверка через {delay}")
nothing_found = await self._check_all_groups()
if nothing_found:
delay = self._next_delay()
logger.info(f"PDF/расписание не найдено, следующая проверка через {delay} с")
await asyncio.sleep(delay)
else:
# нашли → останавливаемся
logger.info("Расписание найдено, останавливаем watcher")
logger.info("Расписание найдено и отправлено, останавливаем watcher")
self.state.watcher_work = False
break
except asyncio.CancelledError:
@@ -63,61 +66,63 @@ class WatcherService:
logger.error(f"Ошибка в watcher_loop: {e}")
await asyncio.sleep(60)
@staticmethod
def _get_target_day() -> datetime:
"""Получение целевого дня"""
now = datetime.now()
target = now + timedelta(days=1)
if target.weekday() == 6:
target += timedelta(days=1)
return target
async def _check_all_groups(self) -> bool:
"""
Возвращает True, если НИ в одной группе не найдено расписание.
Возвращает False, если хотя бы в одной группе найдено расписание.
Возвращает True, если расписание ещё недоступно ни для одной группы.
Возвращает False, если хотя бы одной группе отправили расписание.
"""
day = self._get_target_day()
found_any = False
target = self.schedule_service._resolve_target_date(0)
logger.info(
f"Проверяем Google Drive на расписание за {target.strftime('%d.%m.%Y')}"
)
if not await self.schedule_service.is_published_for(0):
return True
found_any = False
for group, chat_id in Config.GROUP_CHATS.items():
logger.info(
f"Проверяем расписание для {group} на {day.strftime('%d.%m.%Y')}"
f"Проверяем расписание для {group} на {target.strftime('%d.%m.%Y')}"
)
found = await self._check_group_schedule(group, chat_id, day.day)
if found:
if await self._check_group_schedule(group, chat_id):
found_any = True
return not found_any # <-- вот так правильно
return not found_any
async def _check_group_schedule(self, group: str, chat_id: int, day: int) -> bool:
async def _check_group_schedule(self, group: str, chat_id: int) -> bool:
text, url, data_day, data_month = await self.schedule_service.get_schedule(
group, day
group, 0
)
if text and "не найдено" not in text.lower():
if not self.schedule_service.is_schedule_missing(text):
msg = await self.bot.send_message(
chat_id,
f"Авто-расписание для {group} на {data_day:02d}.{data_month:02d}\n\n{text}",
parse_mode="Markdown",
(
f"🔔 Авто-расписание для {group} "
f"на {data_day:02d}.{data_month:02d}\n\n{text}"
),
parse_mode="HTML",
)
await self.bot.pin_chat_message(
chat_id, msg.message_id, disable_notification=False
try:
await self.bot.pin_chat_message(
chat_id, msg.message_id, disable_notification=False
)
except Exception as e:
logger.warning(f"Не удалось закрепить сообщение в {chat_id}: {e}")
return True
png, url, data_day, data_month = await self.schedule_service.get_pschedule(
group, 0
)
if png:
await self.bot.send_photo(
chat_id,
types.BufferedInputFile(png, filename=f"{group}.png"),
caption=(
f"🔔 АВАРИЙНЫЙ РЕЖИМ\n\n"
f"Авто-расписание для {group} "
f"на {data_day:02d}.{data_month:02d}"
),
)
return True
else:
png, url, data_day, data_month = await self.schedule_service.get_pschedule(
group, day
)
if png:
await self.bot.send_photo(
chat_id,
types.BufferedInputFile(png, filename=f"{group}.png"),
caption=f"АВАРИЙНЫЙ РЕЖИМ\n\nАвто-расписание для {group} на {data_day:02d}.{data_month:02d}\n\nНайдено с ошибкой",
)
return True
return False
# clip_hash = hashlib.md5(clip_png).hexdigest()
# Логика проверки изменений и отправки сообщений
# ... (ваша существующая логика)