178 lines
7.1 KiB
Python
178 lines
7.1 KiB
Python
from datetime import datetime, timedelta
|
||
from typing import Optional, Tuple
|
||
from playwright.async_api import async_playwright
|
||
import logging
|
||
import aiohttp
|
||
from bs4 import BeautifulSoup
|
||
import ssl
|
||
import certifi
|
||
import re
|
||
|
||
# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)

# Character class matching any single character that is NOT an ASCII digit,
# a Latin letter or a Cyrillic letter (including Ё/ё). It is used as a token
# delimiter when searching for a group name as a standalone word.
BOUNDARY = r"[^0-9A-Za-zА-Яа-яЁё]"
|
||
|
||
class ScheduleService:
    """Fetch a group's timetable from the college.by schedule pages.

    Two renderings are offered: plain text extracted from the page HTML
    (``get_schedule``) and a screenshot of the paragraph that mentions the
    group (``get_pschedule``).
    """

    def __init__(self):
        # NOTE: the month placeholder is spelled "mouth" (sic). The spelling is
        # kept because the template is a public attribute and is formatted with
        # the ``mouth=`` keyword in ``_make_url``.
        self.base_url = (
            "https://college.by/accounts/raspis/{mouth:02d}/{day:02d}-PODNAM.htm"
        )

    def _make_url(
        self, day: int = 0, now: Optional[datetime] = None
    ) -> Tuple[str, int, int]:
        """Build the schedule page URL.

        Args:
            day: ``0`` picks the date automatically: today before noon,
                tomorrow from 12:00 on, and a Sunday is always pushed to the
                following Monday. Any other value is used verbatim as the day
                of the *current* month.
            now: Reference moment; defaults to ``datetime.now()``. Exists so
                the date logic can be exercised deterministically.

        Returns:
            ``(url, day_of_month, month)``.
        """
        current = datetime.now() if now is None else now

        if day == 0:
            target = current
            if target.hour >= 12:
                target += timedelta(days=1)
            # weekday() == 6 is Sunday.
            if target.weekday() == 6:
                target += timedelta(days=1)
            day_of_month, month = target.day, target.month
        else:
            # NOTE(review): an explicit day is always combined with the current
            # month, so a day belonging to the next month cannot be requested.
            day_of_month, month = int(day), current.month

        url = self.base_url.format(day=day_of_month, mouth=month)
        return url, day_of_month, month

    @staticmethod
    def _collect_group_lines(texts, group: str) -> list:
        """Return the text rows that belong to ``group``.

        ``texts`` is an iterable of strings in page order. Rows are skipped
        until one contains ``group`` as a whole word (case-insensitive); that
        row and the following ones are collected until a separator row
        (containing ``-----`` or ``+----``) is met.
        """
        # Match the group only as a standalone word.
        group_pattern = re.compile(rf"\b{re.escape(group)}\b", re.IGNORECASE)

        collected = []
        found_group = False
        for text in texts:
            if not found_group:
                if group_pattern.search(text):
                    found_group = True
                    collected.append(text)
            elif "-----" in text or "+----" in text:
                break
            else:
                collected.append(text)
        return collected

    @staticmethod
    def _format_schedule(lines, group: str, day: int) -> str:
        """Render collected rows as a fenced text block, or a not-found message."""
        if not lines:
            return f"Расписание для группы {group} на {day} число не найдено"

        rows = []
        for line in lines:
            # Swap the broken-bar column separator for a box-drawing bar and
            # turn no-break spaces into ordinary ones.
            cleaned = line.replace("¦", "│").replace("\u00a0", " ").strip()
            if cleaned:
                rows.append(f"{cleaned}\n")

        return f"📅 Расписание для {day} числа:\n```\n" + "".join(rows) + "```"

    async def get_schedule(
        self, group: str, day_offset: int = 0
    ) -> Tuple[str, str, int, int]:
        """Fetch the schedule page and return the group's timetable as text.

        Python counterpart of the Rust ``parse_schedule``.

        Args:
            group: Group name to look for on the page.
            day_offset: Passed straight to ``_make_url`` as its ``day``
                argument (``0`` means "pick the date automatically").

        Returns:
            ``(message, url, day, month)``. ``message`` is either the
            formatted schedule or a "not found" text.
        """
        url, day, month = self._make_url(day_offset)

        # NOTE(security): certificate and hostname verification are switched
        # off right after the context is created, so the certifi bundle is
        # effectively unused and the connection is open to interception.
        # Kept as-is because it looks deliberate — confirm whether the site's
        # certificate really cannot be validated.
        ssl_context = ssl.create_default_context(cafile=certifi.where())
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

        connector = aiohttp.TCPConnector(ssl=ssl_context)
        headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/117.0.0.0 Safari/537.36"
            )
        }

        async with aiohttp.ClientSession(
            connector=connector, headers=headers
        ) as session:
            async with session.get(url) as resp:
                if resp.status != 200:
                    # Same policy as get_pschedule: anything but 200 means
                    # there is no schedule page to parse.
                    logger.warning(f"Ошибка загрузки страницы: {url}")
                    return self._format_schedule([], group, day), url, day, month
                raw_bytes = await resp.read()

        # The page is decoded as Windows-1251; undecodable bytes are dropped.
        decoded = raw_bytes.decode("cp1251", errors="ignore")
        document = BeautifulSoup(decoded, "html.parser")

        texts = (
            el.get_text(strip=True) for el in document.select("p.MsoPlainText b")
        )
        schedule_lines = self._collect_group_lines(texts, group)

        return self._format_schedule(schedule_lines, group, day), url, day, month

    def exact_group_regex(self, group: str) -> re.Pattern:
        """Compile a case-sensitive pattern matching ``group`` as a whole token.

        The group must be delimited on each side by a ``BOUNDARY`` character
        or by the start/end of the string.
        """
        pattern = rf"(^|{BOUNDARY}){re.escape(group)}({BOUNDARY}|$)"
        return re.compile(pattern)

    @staticmethod
    async def _find_group_handle(page, regex):
        """Locate the element to screenshot for the group matched by ``regex``.

        Selectors are tried from most to least specific. For the ``<b>``-based
        selectors the enclosing ``<p>`` is preferred over the ``<b>`` itself.
        Returns an element handle, or ``None`` when nothing matches.
        """
        # (selector, prefer the enclosing <p> over the matched element)
        attempts = (
            ("p.MsoPlainText b", True),
            ("p b", True),
            ("p", False),
        )
        for selector, prefer_parent in attempts:
            candidates = page.locator(selector)
            count = await candidates.count()
            for i in range(count):
                el = candidates.nth(i)
                text = (await el.inner_text()).strip()
                if not regex.search(text):
                    continue
                handle = None
                if prefer_parent:
                    handle = await el.locator(
                        "xpath=ancestor::p[1]"
                    ).element_handle()
                if not handle:
                    handle = await el.element_handle()
                if handle:
                    return handle
                # Only the first textual match per selector is considered.
                break
        return None

    async def get_pschedule(
        self, group: str, day_offset: int = 0
    ) -> Tuple[Optional[bytes], str, int, int]:
        """Render the schedule page and screenshot the group's paragraph.

        Args:
            group: Group name to look for (matched via ``exact_group_regex``).
            day_offset: Passed straight to ``_make_url`` as its ``day``
                argument.

        Returns:
            ``(image_bytes, url, day, month)``. ``image_bytes`` is ``None``
            when the page did not load with status 200, the group was not
            found, the element has no bounding box, or an error occurred
            while working with the page.
        """
        url, day, month = self._make_url(day_offset)

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(viewport={"width": 400, "height": 3000})
            page = await context.new_page()

            try:
                response = await page.goto(url, wait_until="networkidle", timeout=30000)
                if not response or response.status != 200:
                    logger.warning(f"Ошибка загрузки страницы: {url}")
                    return None, url, day, month

                regex = self.exact_group_regex(group)
                target_handle = await self._find_group_handle(page, regex)

                if target_handle:
                    await target_handle.scroll_into_view_if_needed()
                    box = await target_handle.bounding_box()
                    if box:
                        # Clamp the origin to the viewport and pad the clip
                        # beyond the element's own box.
                        clip_rect = {
                            "x": float(max(box["x"], 0)),
                            "y": float(max(box["y"], 0)),
                            "width": float(box["width"] + 150),
                            "height": float(box["height"] + 100),
                        }
                        img = await page.screenshot(clip=clip_rect)
                        return img, url, day, month

            except Exception as e:
                # Best-effort: log with traceback and fall through to the
                # "no image" result instead of propagating.
                logger.exception("Ошибка при получении расписания: %s", e)
            finally:
                await context.close()
                await browser.close()

        return None, url, day, month
|
||
|