diff --git a/addons/miniapps/handlers.py b/addons/miniapps/handlers.py index aeba302..349c36a 100644 --- a/addons/miniapps/handlers.py +++ b/addons/miniapps/handlers.py @@ -20,10 +20,10 @@ def register_handlers(dp: Dispatcher, state: BotState, bot: Bot): async def send_welcome(message: Message): # Создаём инлайн-кнопку для открытия Web App keyboard = InlineKeyboardMarkup(inline_keyboard=[ - [InlineKeyboardButton(text="Открыть мини-приложение", web_app=WebAppInfo(url="https://overfit-percussively-nicolas.ngrok-free.dev"))] + [InlineKeyboardButton(text="Открыть", web_app=WebAppInfo(url="https://mukhyil.duckdns.org/"))] ]) await message.answer( - f"Расписание на {get_day()} число месяца:", + f"Мой сайт для видео", reply_markup=keyboard ) \ No newline at end of file diff --git a/bot/core.py b/bot/core.py index 496cc9e..5e5082c 100644 --- a/bot/core.py +++ b/bot/core.py @@ -17,7 +17,7 @@ class TelegramBot: # Регистрируем обработчики из разных модулей admin.register_handlers(self.dp, self.state, self.bot) - # schedule.register_handlers(self.dp, self.state) + schedule.register_handlers(self.dp, self.state) # media.register_handlers(self.dp, self.state, self.bot) # common.register_handlers(self.dp, self.state, self.bot) diff --git a/config.py b/config.py index 337bde6..e91a597 100644 --- a/config.py +++ b/config.py @@ -33,7 +33,12 @@ class Config: # Settings ANTISPAM_DELAY = 20 - WATCHER_BASE_DELAY = 30 + WATCHER_INTERVAL_SEC = 600 + WATCHER_RANDOM_DELAY_MIN = 1 + WATCHER_RANDOM_DELAY_MAX = 120 + SCHEDULE_DRIVE_FOLDER_ID = os.getenv( + "SCHEDULE_DRIVE_FOLDER_ID", "1WhUFHGkS4qC_e84KRArF4ooXHJr8mL5T" + ) # Пути LOG_FILE = "storage/log/bot.log" diff --git a/handlers/schedule.py b/handlers/schedule.py index 339949a..33783d8 100644 --- a/handlers/schedule.py +++ b/handlers/schedule.py @@ -36,7 +36,7 @@ def register_handlers(dp: Dispatcher, state: BotState): schedule_service = ScheduleService() text, url, day, month = await schedule_service.get_schedule(group, day_offset) - msg = await message.answer(text, parse_mode="Markdownv2") + msg = await message.answer(text, parse_mode="HTML") save_message(msg.chat.id, msg.message_id) @dp.message(Command("prasp")) diff --git a/requirements.txt b/requirements.txt index 5427acd..2ea3ab5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -53,6 +53,8 @@ ply==3.11 propcache==0.3.2 pycparser==2.23 + pymupdf==1.27.2.3 + pypdf==6.11.0 pydantic==2.11.10 pydantic_core==2.33.2 pyee==13.0.0 diff --git a/services/drive_schedule_source.py b/services/drive_schedule_source.py new file mode 100644 index 0000000..cc78e28 --- /dev/null +++ b/services/drive_schedule_source.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +import logging +import re +import ssl +from dataclasses import dataclass +from datetime import datetime +from typing import List, Optional + +import aiohttp +import certifi +from bs4 import BeautifulSoup + +logger = logging.getLogger(__name__) + +DRIVE_FOLDER_EMBED = ( + "https://drive.google.com/embeddedfolderview?id={folder_id}#list" +) +DRIVE_DOWNLOAD_URL = "https://drive.google.com/uc?export=download&id={file_id}" +USER_AGENT = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" +) + + +def _drive_connector() -> aiohttp.TCPConnector: + ssl_context = ssl.create_default_context(cafile=certifi.where()) + return aiohttp.TCPConnector(ssl=ssl_context) + + +@dataclass(frozen=True) +class DriveScheduleFile: + file_id: str + name: str + schedule_date: datetime + + +class DriveScheduleSource: + def __init__(self, folder_id: str): + self.folder_id = folder_id + self._files_cache: Optional[List[DriveScheduleFile]] = None + + @staticmethod + def _parse_filename_date(name: str) -> Optional[datetime]: + match = re.match( + r"^(\d{2})\.(\d{2})\.(\d{2})\s+по\s+учащимся\.pdf$", + name.strip(), + re.IGNORECASE, + ) + if not match: + return None + day, month, year = map(int, match.groups()) + return datetime(2000 + year, month, day) + + async def list_student_schedules(self, force_refresh: bool = False) -> List[DriveScheduleFile]: + if self._files_cache is not None and not force_refresh: + return self._files_cache + + url = DRIVE_FOLDER_EMBED.format(folder_id=self.folder_id) + + async with aiohttp.ClientSession( + headers={"User-Agent": USER_AGENT}, + connector=_drive_connector(), + ) as session: + async with session.get(url) as resp: + resp.raise_for_status() + html = await resp.text() + + soup = BeautifulSoup(html, "html.parser") + files: List[DriveScheduleFile] = [] + + for entry in soup.select("div.flip-entry"): + entry_id = entry.get("id", "") + if not entry_id.startswith("entry-"): + continue + file_id = entry_id.removeprefix("entry-") + title_el = entry.select_one(".flip-entry-title") + if not title_el: + continue + name = title_el.get_text(strip=True) + schedule_date = self._parse_filename_date(name) + if schedule_date is None: + continue + files.append(DriveScheduleFile(file_id=file_id, name=name, schedule_date=schedule_date)) + + files.sort(key=lambda item: item.schedule_date) + self._files_cache = files + return files + + async def find_for_date(self, target: datetime) -> Optional[DriveScheduleFile]: + files = await self.list_student_schedules() + for item in reversed(files): + if ( + item.schedule_date.day == target.day + and item.schedule_date.month == target.month + and item.schedule_date.year == target.year + ): + return item + return None + + async def download_pdf(self, file_id: str) -> bytes: + url = DRIVE_DOWNLOAD_URL.format(file_id=file_id) + async with aiohttp.ClientSession( + headers={"User-Agent": USER_AGENT}, + connector=_drive_connector(), + ) as session: + async with session.get(url) as resp: + resp.raise_for_status() + return await resp.read() diff --git a/services/schedule_service.py b/services/schedule_service.py index f386c64..531ebc1 100644 --- a/services/schedule_service.py +++ b/services/schedule_service.py @@ -1,177 +1,192 @@ -from datetime import datetime, timedelta -from typing import Optional, Tuple -from playwright.async_api import async_playwright +from __future__ import annotations + +import io import logging -import aiohttp -from bs4 import BeautifulSoup -import ssl -import certifi import re +from datetime import datetime, timedelta +from html import escape +from typing import List, Optional, Tuple + +import fitz +from pypdf import PdfReader + +from config import Config +from services.drive_schedule_source import DriveScheduleSource logger = logging.getLogger(__name__) BOUNDARY = r"[^0-9A-Za-zА-Яа-яЁё]" + class ScheduleService: def __init__(self): - self.base_url = ( - "https://college.by/accounts/raspis/{mouth:02d}/{day:02d}-PODNAM.htm" + folder_id = getattr(Config, "SCHEDULE_DRIVE_FOLDER_ID", None) or ( + "1WhUFHGkS4qC_e84KRArF4ooXHJr8mL5T" ) + self.drive = DriveScheduleSource(folder_id) + self._pdf_cache: dict[str, bytes] = {} - def _make_url(self, day: int = 0) -> Tuple[str, int, int]: - """Генерация URL для расписания""" - d = datetime.now() - if day == 0: - if d.hour >= 12: - d += timedelta(days=1) - if d.weekday() == 6: - d += timedelta(days=1) - return self.base_url.format(day=d.day, mouth=d.month), d.day, d.month + def _resolve_target_date(self, day_offset: int = 0) -> datetime: + target = datetime.now() + if day_offset == 0: + if target.hour >= 12: + target += timedelta(days=1) + if target.weekday() == 6: + target += timedelta(days=1) else: - return ( - self.base_url.format(day=int(day), mouth=d.month), - int(day), - int(d.month), + target = target.replace(day=int(day_offset)) + return target.replace(hour=0, minute=0, second=0, microsecond=0) + + async def _load_pdf_for_date( + self, day_offset: int = 0 + ) -> Tuple[Optional[bytes], Optional[str], int, int]: + target = self._resolve_target_date(day_offset) + day, month = target.day, target.month + + drive_file = await self.drive.find_for_date(target) + if not drive_file: + return None, None, day, month + + if drive_file.file_id not in self._pdf_cache: + self._pdf_cache[drive_file.file_id] = await self.drive.download_pdf( + drive_file.file_id ) - import re - - async def get_schedule( - self, group: str, day_offset: int = 0 - ) -> Tuple[str, str, int, int]: - """Получение текста расписания (аналог Rust parse_schedule)""" - url, day, month = self._make_url(day_offset) - - ssl_context = ssl.create_default_context(cafile=certifi.where()) - ssl_context.check_hostname = False - ssl_context.verify_mode = ssl.CERT_NONE - - connector = aiohttp.TCPConnector(ssl=ssl_context) - - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36" - } - - async with aiohttp.ClientSession( - connector=connector, headers=headers - ) as session: - async with session.get(url) as resp: - raw_bytes = await resp.read() - - decoded = raw_bytes.decode("cp1251", errors="ignore") - document = BeautifulSoup(decoded, "html.parser") - - elements = document.select("p.MsoPlainText b") - - found_group = False - schedule_lines = [] - - # регулярка: ищем точное совпадение группы как отдельного слова - group_pattern = re.compile(rf"\b{re.escape(group)}\b", re.IGNORECASE) - - for el in elements: - text = el.get_text(strip=True) - if not found_group: - if group_pattern.search(text): - found_group = True - schedule_lines.append(text) - else: - if "-----" in text or "+----" in text: - break - schedule_lines.append(text) - - if not schedule_lines: - result = f"Расписание для группы {group} на {day} число не найдено" - else: - result = f"📅 Расписание для {day} числа:\n```\n" - for line in schedule_lines: - formatted = line.replace("¦", "│").replace(" ", " ").strip() - if formatted: - result += f"{formatted}\n" - result += "```" - - return result, url, day, month - + url = f"https://drive.google.com/file/d/{drive_file.file_id}/view" + return self._pdf_cache[drive_file.file_id], url, day, month @staticmethod def exact_group_regex(group: str) -> re.Pattern: - # ищем как отдельный токен: граница слева/справа или начало/конец pattern = rf"(^|{BOUNDARY}){re.escape(group)}({BOUNDARY}|$)" - return re.compile(pattern) + return re.compile(pattern, re.IGNORECASE) + + @staticmethod + def _extract_pdf_lines(pdf_bytes: bytes) -> List[str]: + reader = PdfReader(io.BytesIO(pdf_bytes)) + lines: List[str] = [] + for page in reader.pages: + text = page.extract_text() or "" + for raw_line in text.splitlines(): + line = raw_line.strip() + if line: + lines.append(line) + return lines + + @staticmethod + def _parse_group_lines(lines: List[str], group: str) -> List[str]: + regex = ScheduleService.exact_group_regex(group) + schedule_lines: List[str] = [] + found_group = False + + for line in lines: + if not found_group: + if regex.search(line): + found_group = True + schedule_lines.append(line) + else: + if "-----" in line or "+----" in line: + break + schedule_lines.append(line) + + return schedule_lines + + @staticmethod + def is_schedule_missing(text: str) -> bool: + lowered = text.lower() + return "не найдено" in lowered or "не опубликовано" in lowered + + @staticmethod + def _format_schedule_html(day: int, schedule_lines: List[str]) -> str: + body_lines = [] + for line in schedule_lines: + formatted = line.replace("¦", "│").replace(" ", " ").strip() + if formatted: + body_lines.append(formatted) + body = escape("\n".join(body_lines)) + return f"📅 Расписание для {day} числа:\n
{body}"
+
+ async def is_published_for(self, day_offset: int = 0) -> bool:
+ target = self._resolve_target_date(day_offset)
+ return await self.drive.find_for_date(target) is not None
+
+ async def get_schedule(
+ self, group: str, day_offset: int = 0
+ ) -> Tuple[str, str, int, int]:
+ pdf_bytes, url, day, month = await self._load_pdf_for_date(day_offset)
+
+ folder_url = "https://drive.google.com/drive/folders/" + self.drive.folder_id
+
+ if not pdf_bytes:
+ result = (
+ f"⚠️ Расписание на {day:02d}.{month:02d} ещё не опубликовано "
+ f"в Google Drive"
+ )
+ return result, folder_url, day, month
+
+ schedule_lines = self._parse_group_lines(
+ self._extract_pdf_lines(pdf_bytes), group
+ )
+
+ if not schedule_lines:
+ result = f"⚠️ Расписание для группы {escape(group)} на {day} число не найдено"
+ else:
+ result = self._format_schedule_html(day, schedule_lines)
+
+ return result, url or folder_url, day, month
async def get_pschedule(
- self, group: str, day_offset: int = 0
+ self, group: str, day_offset: int = 0
) -> Tuple[Optional[bytes], str, int, int]:
- url, day, month = self._make_url(day_offset)
+ pdf_bytes, url, day, month = await self._load_pdf_for_date(day_offset)
+ fallback_url = (
+ url
+ or "https://drive.google.com/drive/folders/" + self.drive.folder_id
+ )
- async with async_playwright() as p:
- browser = await p.chromium.launch(headless=True)
- context = await browser.new_context(viewport={"width": 400, "height": 3000})
- page = await context.new_page()
+ if not pdf_bytes:
+ return None, fallback_url, day, month
- try:
- response = await page.goto(url, wait_until="networkidle", timeout=30000)
- if not response or response.status != 200:
- logger.warning(f"Ошибка загрузки страницы: {url}")
- return None, url, day, month
+ try:
+ doc = fitz.open(stream=pdf_bytes, filetype="pdf")
+ regex = self.exact_group_regex(group)
- # 1) сначала пытаемся по более точному селектору (как в HTML-парсере)
- candidates = page.locator("p.MsoPlainText b")
- count = await candidates.count()
+ for page in doc:
+ line_items = []
+ page_dict = page.get_text("dict")
+ for block in page_dict.get("blocks", []):
+ if block.get("type") != 0:
+ continue
+ for line in block.get("lines", []):
+ text = "".join(span["text"] for span in line["spans"]).strip()
+ if text:
+ line_items.append((text, fitz.Rect(line["bbox"])))
- regex = self.exact_group_regex(group)
- target_handle = None
-
- for i in range(count):
- el = candidates.nth(i)
- text = (await el.inner_text()).strip()
- if regex.search(text):
- # нашли b с нужной группой — возьмём родительский p для удобного скрина
- parent_p = await el.locator("xpath=ancestor::p[1]").element_handle()
- target_handle = parent_p or await el.element_handle()
- break
-
- # 2) если не нашли в p.MsoPlainText b, попробуем просто p b или p
- if not target_handle:
- candidates = page.locator("p b")
- count = await candidates.count()
- for i in range(count):
- el = candidates.nth(i)
- text = (await el.inner_text()).strip()
+ found_group = False
+ rects: List[fitz.Rect] = []
+ for text, bbox in line_items:
+ if not found_group:
if regex.search(text):
- parent_p = await el.locator("xpath=ancestor::p[1]").element_handle()
- target_handle = parent_p or await el.element_handle()
+ found_group = True
+ rects.append(bbox)
+ else:
+ if "-----" in text or "+----" in text:
break
+ rects.append(bbox)
- if not target_handle:
- # последний шанс: любые - candidates = page.locator("p") - count = await candidates.count() - for i in range(count): - el = candidates.nth(i) - text = (await el.inner_text()).strip() - if regex.search(text): - target_handle = await el.element_handle() - break + if not rects: + continue - if target_handle: - # скроллим и получаем box - await target_handle.scroll_into_view_if_needed() - box = await target_handle.bounding_box() - if box: - clip_rect = { - "x": float(max(box["x"], 0)), - "y": float(max(box["y"], 0)), - "width": float(box["width"] + 150), - "height": float(box["height"] + 100), - } - img = await page.screenshot(clip=clip_rect) - return img, url, day, month + clip = rects[0] + for rect in rects[1:]: + clip |= rect + clip.x0 = max(0, clip.x0 - 10) + clip.x1 = min(page.rect.width, clip.x1 + 150) + clip.y0 = max(0, clip.y0 - 5) + clip.y1 = min(page.rect.height, clip.y1 + 10) - except Exception as e: - logger.error(f"Ошибка при получении расписания: {e}") - finally: - await context.close() - await browser.close() + pixmap = page.get_pixmap(clip=clip, matrix=fitz.Matrix(2, 2)) + return pixmap.tobytes("png"), fallback_url, day, month - return None, url, day, month + except Exception as e: + logger.error(f"Ошибка при получении расписания из PDF: {e}") + return None, fallback_url, day, month diff --git a/services/watcher_service.py b/services/watcher_service.py index 00044f6..c2df54c 100644 --- a/services/watcher_service.py +++ b/services/watcher_service.py @@ -1,12 +1,12 @@ import asyncio -from datetime import datetime, timedelta from random import randint -from aiogram import Bot, types -from models.state import BotState -from config import Config -from services.schedule_service import ScheduleService -from logging import getLogger +from aiogram import Bot, types + +from config import Config +from logging import getLogger +from models.state import BotState +from services.schedule_service import ScheduleService logger = getLogger(__name__) @@ -40,21 +40,24 @@ class WatcherService: pass logger.info("Watcher остановлен") + @staticmethod + def _next_delay() -> int: + return Config.WATCHER_INTERVAL_SEC + randint( + Config.WATCHER_RANDOM_DELAY_MIN, + Config.WATCHER_RANDOM_DELAY_MAX, + ) + async def _watcher_loop(self): - """Основной цикл слежки""" + """Основной цикл слежки за появлением PDF на Google Drive.""" while self.state.watcher_work: try: - find = await self._check_all_groups() - if find: - # ничего не нашли → ждём - delay = randint( - Config.WATCHER_BASE_DELAY, Config.WATCHER_BASE_DELAY + 30 - ) - logger.info(f"Следующая проверка через {delay}") + nothing_found = await self._check_all_groups() + if nothing_found: + delay = self._next_delay() + logger.info(f"PDF/расписание не найдено, следующая проверка через {delay} с") await asyncio.sleep(delay) else: - # нашли → останавливаемся - logger.info("Расписание найдено, останавливаем watcher") + logger.info("Расписание найдено и отправлено, останавливаем watcher") self.state.watcher_work = False break except asyncio.CancelledError: @@ -63,61 +66,63 @@ class WatcherService: logger.error(f"Ошибка в watcher_loop: {e}") await asyncio.sleep(60) - @staticmethod - def _get_target_day() -> datetime: - """Получение целевого дня""" - now = datetime.now() - target = now + timedelta(days=1) - if target.weekday() == 6: - target += timedelta(days=1) - return target - async def _check_all_groups(self) -> bool: """ - Возвращает True, если НИ в одной группе не найдено расписание. - Возвращает False, если хотя бы в одной группе найдено расписание. + Возвращает True, если расписание ещё недоступно ни для одной группы. + Возвращает False, если хотя бы одной группе отправили расписание. """ - day = self._get_target_day() - found_any = False + target = self.schedule_service._resolve_target_date(0) + logger.info( + f"Проверяем Google Drive на расписание за {target.strftime('%d.%m.%Y')}" + ) + if not await self.schedule_service.is_published_for(0): + return True + + found_any = False for group, chat_id in Config.GROUP_CHATS.items(): logger.info( - f"Проверяем расписание для {group} на {day.strftime('%d.%m.%Y')}" + f"Проверяем расписание для {group} на {target.strftime('%d.%m.%Y')}" ) - found = await self._check_group_schedule(group, chat_id, day.day) - if found: + if await self._check_group_schedule(group, chat_id): found_any = True - return not found_any # <-- вот так правильно + return not found_any - async def _check_group_schedule(self, group: str, chat_id: int, day: int) -> bool: + async def _check_group_schedule(self, group: str, chat_id: int) -> bool: text, url, data_day, data_month = await self.schedule_service.get_schedule( - group, day + group, 0 ) - if text and "не найдено" not in text.lower(): + if not self.schedule_service.is_schedule_missing(text): msg = await self.bot.send_message( chat_id, - f"Авто-расписание для {group} на {data_day:02d}.{data_month:02d}\n\n{text}", - parse_mode="Markdown", + ( + f"🔔 Авто-расписание для {group} " + f"на {data_day:02d}.{data_month:02d}\n\n{text}" + ), + parse_mode="HTML", ) - await self.bot.pin_chat_message( - chat_id, msg.message_id, disable_notification=False + try: + await self.bot.pin_chat_message( + chat_id, msg.message_id, disable_notification=False + ) + except Exception as e: + logger.warning(f"Не удалось закрепить сообщение в {chat_id}: {e}") + return True + + png, url, data_day, data_month = await self.schedule_service.get_pschedule( + group, 0 + ) + if png: + await self.bot.send_photo( + chat_id, + types.BufferedInputFile(png, filename=f"{group}.png"), + caption=( + f"🔔 АВАРИЙНЫЙ РЕЖИМ\n\n" + f"Авто-расписание для {group} " + f"на {data_day:02d}.{data_month:02d}" + ), ) return True - else: - png, url, data_day, data_month = await self.schedule_service.get_pschedule( - group, day - ) - if png: - await self.bot.send_photo( - chat_id, - types.BufferedInputFile(png, filename=f"{group}.png"), - caption=f"АВАРИЙНЫЙ РЕЖИМ\n\nАвто-расписание для {group} на {data_day:02d}.{data_month:02d}\n\nНайдено с ошибкой", - ) - return True + return False - - # clip_hash = hashlib.md5(clip_png).hexdigest() - - # Логика проверки изменений и отправки сообщений - # ... (ваша существующая логика)