Version 0.6: added a users DB
This commit is contained in:
@@ -1,14 +1,15 @@
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, Tuple
|
||||
from playwright.async_api import async_playwright, ViewportSize, FloatRect
|
||||
from playwright.async_api import async_playwright
|
||||
import logging
|
||||
import aiohttp
|
||||
from bs4 import BeautifulSoup
|
||||
import ssl
|
||||
import certifi
|
||||
import re
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
BOUNDARY = r"[^0-9A-Za-zА-Яа-яЁё]"
|
||||
|
||||
class ScheduleService:
|
||||
def __init__(self):
|
||||
@@ -32,8 +33,10 @@ class ScheduleService:
|
||||
int(d.month),
|
||||
)
|
||||
|
||||
import re
|
||||
|
||||
async def get_schedule(
|
||||
self, group: str, day_offset: int = 0
|
||||
self, group: str, day_offset: int = 0
|
||||
) -> Tuple[str, str, int, int]:
|
||||
"""Получение текста расписания (аналог Rust parse_schedule)"""
|
||||
url, day, month = self._make_url(day_offset)
|
||||
@@ -48,9 +51,8 @@ class ScheduleService:
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
|
||||
}
|
||||
|
||||
# тут можно использовать aiohttp + chardet/charset_normalizer
|
||||
async with aiohttp.ClientSession(
|
||||
connector=connector, headers=headers
|
||||
connector=connector, headers=headers
|
||||
) as session:
|
||||
async with session.get(url) as resp:
|
||||
raw_bytes = await resp.read()
|
||||
@@ -58,15 +60,18 @@ class ScheduleService:
|
||||
decoded = raw_bytes.decode("cp1251", errors="ignore")
|
||||
document = BeautifulSoup(decoded, "html.parser")
|
||||
|
||||
# ищем <p class="MsoPlainText"><b>...</b>
|
||||
elements = document.select("p.MsoPlainText b")
|
||||
|
||||
found_group = False
|
||||
schedule_lines = []
|
||||
|
||||
# регулярка: ищем точное совпадение группы как отдельного слова
|
||||
group_pattern = re.compile(rf"\b{re.escape(group)}\b", re.IGNORECASE)
|
||||
|
||||
for el in elements:
|
||||
text = el.get_text(strip=True)
|
||||
if not found_group:
|
||||
if group in text:
|
||||
if group_pattern.search(text):
|
||||
found_group = True
|
||||
schedule_lines.append(text)
|
||||
else:
|
||||
@@ -86,39 +91,81 @@ class ScheduleService:
|
||||
|
||||
return result, url, day, month
|
||||
|
||||
|
||||
|
||||
def exact_group_regex(self, group: str) -> re.Pattern:
    """Build a compiled regex that matches ``group`` as a standalone token.

    The group name is escaped and must be delimited on each side either by
    the start/end of the searched text or by one character matching the
    module-level ``BOUNDARY`` class. The match is case-sensitive, and the
    delimiter characters, when present, are part of the match (groups 1
    and 2).

    Args:
        group: Literal group name to look for.

    Returns:
        The compiled pattern.
    """
    return re.compile(rf"(^|{BOUNDARY}){re.escape(group)}({BOUNDARY}|$)")
|
||||
|
||||
async def get_pschedule(
|
||||
self, group: str, day_offset: int = 0
|
||||
self, group: str, day_offset: int = 0
|
||||
) -> Tuple[Optional[bytes], str, int, int]:
|
||||
"""Получение скриншота расписания"""
|
||||
url, day, month = self._make_url(day_offset)
|
||||
|
||||
async with async_playwright() as p:
|
||||
browser = await p.chromium.launch(headless=True)
|
||||
context = await browser.new_context(
|
||||
viewport=ViewportSize(width=400, height=3000)
|
||||
)
|
||||
context = await browser.new_context(viewport={"width": 400, "height": 3000})
|
||||
page = await context.new_page()
|
||||
|
||||
try:
|
||||
response = await page.goto(url, wait_until="networkidle", timeout=30000)
|
||||
|
||||
if not response or response.status != 200:
|
||||
logger.warning(f"Ошибка загрузки страницы: {url}")
|
||||
return None, url, day, month
|
||||
|
||||
locator = page.locator("p", has_text=group).first
|
||||
if await locator.count() > 0:
|
||||
await locator.scroll_into_view_if_needed()
|
||||
box = await locator.bounding_box()
|
||||
# 1) сначала пытаемся по более точному селектору (как в HTML-парсере)
|
||||
candidates = page.locator("p.MsoPlainText b")
|
||||
count = await candidates.count()
|
||||
|
||||
regex = self.exact_group_regex(group)
|
||||
target_handle = None
|
||||
|
||||
for i in range(count):
|
||||
el = candidates.nth(i)
|
||||
text = (await el.inner_text()).strip()
|
||||
if regex.search(text):
|
||||
# нашли b с нужной группой — возьмём родительский p для удобного скрина
|
||||
parent_p = await el.locator("xpath=ancestor::p[1]").element_handle()
|
||||
target_handle = parent_p or await el.element_handle()
|
||||
break
|
||||
|
||||
# 2) если не нашли в p.MsoPlainText b, попробуем просто p b или p
|
||||
if not target_handle:
|
||||
candidates = page.locator("p b")
|
||||
count = await candidates.count()
|
||||
for i in range(count):
|
||||
el = candidates.nth(i)
|
||||
text = (await el.inner_text()).strip()
|
||||
if regex.search(text):
|
||||
parent_p = await el.locator("xpath=ancestor::p[1]").element_handle()
|
||||
target_handle = parent_p or await el.element_handle()
|
||||
break
|
||||
|
||||
if not target_handle:
|
||||
# последний шанс: любые <p>
|
||||
candidates = page.locator("p")
|
||||
count = await candidates.count()
|
||||
for i in range(count):
|
||||
el = candidates.nth(i)
|
||||
text = (await el.inner_text()).strip()
|
||||
if regex.search(text):
|
||||
target_handle = await el.element_handle()
|
||||
break
|
||||
|
||||
if target_handle:
|
||||
# скроллим и получаем box
|
||||
await target_handle.scroll_into_view_if_needed()
|
||||
box = await target_handle.bounding_box()
|
||||
if box:
|
||||
clip_rect = FloatRect(
|
||||
x=float(max(box["x"] - 0, 0)),
|
||||
y=float(max(box["y"] - 0, 0)),
|
||||
width=float(box["width"] + 150),
|
||||
height=float(box["height"] + 100),
|
||||
)
|
||||
return await page.screenshot(clip=clip_rect), url, day, month
|
||||
clip_rect = {
|
||||
"x": float(max(box["x"], 0)),
|
||||
"y": float(max(box["y"], 0)),
|
||||
"width": float(box["width"] + 150),
|
||||
"height": float(box["height"] + 100),
|
||||
}
|
||||
img = await page.screenshot(clip=clip_rect)
|
||||
return img, url, day, month
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка при получении расписания: {e}")
|
||||
@@ -127,3 +174,4 @@ class ScheduleService:
|
||||
await browser.close()
|
||||
|
||||
return None, url, day, month
|
||||
|
||||
|
||||
Reference in New Issue
Block a user