It's version 0.7.2. Я, используя нейронку, создал новый парсер для гугл-таблиц, тем самым восстановив работу программы.
This commit is contained in:
+161
-146
@@ -1,177 +1,192 @@
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, Tuple
|
||||
from playwright.async_api import async_playwright
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import logging
|
||||
import aiohttp
|
||||
from bs4 import BeautifulSoup
|
||||
import ssl
|
||||
import certifi
|
||||
import re
|
||||
from datetime import datetime, timedelta
|
||||
from html import escape
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
import fitz
|
||||
from pypdf import PdfReader
|
||||
|
||||
from config import Config
|
||||
from services.drive_schedule_source import DriveScheduleSource
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
BOUNDARY = r"[^0-9A-Za-zА-Яа-яЁё]"
|
||||
|
||||
|
||||
class ScheduleService:
|
||||
def __init__(self):
|
||||
self.base_url = (
|
||||
"https://college.by/accounts/raspis/{mouth:02d}/{day:02d}-PODNAM.htm"
|
||||
folder_id = getattr(Config, "SCHEDULE_DRIVE_FOLDER_ID", None) or (
|
||||
"1WhUFHGkS4qC_e84KRArF4ooXHJr8mL5T"
|
||||
)
|
||||
self.drive = DriveScheduleSource(folder_id)
|
||||
self._pdf_cache: dict[str, bytes] = {}
|
||||
|
||||
def _make_url(self, day: int = 0) -> Tuple[str, int, int]:
|
||||
"""Генерация URL для расписания"""
|
||||
d = datetime.now()
|
||||
if day == 0:
|
||||
if d.hour >= 12:
|
||||
d += timedelta(days=1)
|
||||
if d.weekday() == 6:
|
||||
d += timedelta(days=1)
|
||||
return self.base_url.format(day=d.day, mouth=d.month), d.day, d.month
|
||||
def _resolve_target_date(self, day_offset: int = 0) -> datetime:
|
||||
target = datetime.now()
|
||||
if day_offset == 0:
|
||||
if target.hour >= 12:
|
||||
target += timedelta(days=1)
|
||||
if target.weekday() == 6:
|
||||
target += timedelta(days=1)
|
||||
else:
|
||||
return (
|
||||
self.base_url.format(day=int(day), mouth=d.month),
|
||||
int(day),
|
||||
int(d.month),
|
||||
target = target.replace(day=int(day_offset))
|
||||
return target.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
|
||||
async def _load_pdf_for_date(
|
||||
self, day_offset: int = 0
|
||||
) -> Tuple[Optional[bytes], Optional[str], int, int]:
|
||||
target = self._resolve_target_date(day_offset)
|
||||
day, month = target.day, target.month
|
||||
|
||||
drive_file = await self.drive.find_for_date(target)
|
||||
if not drive_file:
|
||||
return None, None, day, month
|
||||
|
||||
if drive_file.file_id not in self._pdf_cache:
|
||||
self._pdf_cache[drive_file.file_id] = await self.drive.download_pdf(
|
||||
drive_file.file_id
|
||||
)
|
||||
|
||||
import re
|
||||
|
||||
async def get_schedule(
|
||||
self, group: str, day_offset: int = 0
|
||||
) -> Tuple[str, str, int, int]:
|
||||
"""Получение текста расписания (аналог Rust parse_schedule)"""
|
||||
url, day, month = self._make_url(day_offset)
|
||||
|
||||
ssl_context = ssl.create_default_context(cafile=certifi.where())
|
||||
ssl_context.check_hostname = False
|
||||
ssl_context.verify_mode = ssl.CERT_NONE
|
||||
|
||||
connector = aiohttp.TCPConnector(ssl=ssl_context)
|
||||
|
||||
headers = {
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
|
||||
}
|
||||
|
||||
async with aiohttp.ClientSession(
|
||||
connector=connector, headers=headers
|
||||
) as session:
|
||||
async with session.get(url) as resp:
|
||||
raw_bytes = await resp.read()
|
||||
|
||||
decoded = raw_bytes.decode("cp1251", errors="ignore")
|
||||
document = BeautifulSoup(decoded, "html.parser")
|
||||
|
||||
elements = document.select("p.MsoPlainText b")
|
||||
|
||||
found_group = False
|
||||
schedule_lines = []
|
||||
|
||||
# регулярка: ищем точное совпадение группы как отдельного слова
|
||||
group_pattern = re.compile(rf"\b{re.escape(group)}\b", re.IGNORECASE)
|
||||
|
||||
for el in elements:
|
||||
text = el.get_text(strip=True)
|
||||
if not found_group:
|
||||
if group_pattern.search(text):
|
||||
found_group = True
|
||||
schedule_lines.append(text)
|
||||
else:
|
||||
if "-----" in text or "+----" in text:
|
||||
break
|
||||
schedule_lines.append(text)
|
||||
|
||||
if not schedule_lines:
|
||||
result = f"Расписание для группы {group} на {day} число не найдено"
|
||||
else:
|
||||
result = f"📅 Расписание для {day} числа:\n```\n"
|
||||
for line in schedule_lines:
|
||||
formatted = line.replace("¦", "│").replace(" ", " ").strip()
|
||||
if formatted:
|
||||
result += f"{formatted}\n"
|
||||
result += "```"
|
||||
|
||||
return result, url, day, month
|
||||
|
||||
url = f"https://drive.google.com/file/d/{drive_file.file_id}/view"
|
||||
return self._pdf_cache[drive_file.file_id], url, day, month
|
||||
|
||||
@staticmethod
def exact_group_regex(group: str) -> re.Pattern:
    """Build a regex that matches ``group`` only as a standalone token.

    The group name is escaped and must be delimited on each side either by
    a character matching ``BOUNDARY`` or by the start/end of the string.

    Args:
        group: Group name to look for; regex metacharacters are escaped.

    Returns:
        A compiled, case-insensitive pattern.
    """
    # Match as a separate token: a boundary on the left/right, or the
    # beginning/end of the string.
    pattern = rf"(^|{BOUNDARY}){re.escape(group)}({BOUNDARY}|$)"
    # A single return: previously a case-sensitive return preceded this one
    # and made the IGNORECASE variant unreachable.
    return re.compile(pattern, re.IGNORECASE)
|
||||
|
||||
@staticmethod
def _extract_pdf_lines(pdf_bytes: bytes) -> List[str]:
    """Return every non-blank text line of a PDF, stripped, in page order.

    Args:
        pdf_bytes: Raw contents of the PDF document.

    Returns:
        Stripped lines from all pages; lines that are empty after stripping
        are dropped. A page whose text extraction yields a falsy value
        contributes nothing.
    """
    reader = PdfReader(io.BytesIO(pdf_bytes))
    collected: List[str] = []
    for page in reader.pages:
        page_text = page.extract_text() or ""
        collected.extend(
            stripped
            for stripped in map(str.strip, page_text.splitlines())
            if stripped
        )
    return collected
|
||||
|
||||
@staticmethod
def _parse_group_lines(lines: List[str], group: str) -> List[str]:
    """Cut out the run of lines that belongs to ``group``.

    The run starts at the first line matching the group's token regex and
    continues until (but not including) the first later line that contains
    ``"-----"`` or ``"+----"``, or until the input ends.

    Args:
        lines: Text lines to scan, in order.
        group: Group name to search for.

    Returns:
        The matching header line followed by its continuation lines, or an
        empty list when no line mentions the group.
    """
    pattern = ScheduleService.exact_group_regex(group)
    remaining = iter(lines)

    # Consume the iterator up to and including the first matching line.
    header = next((entry for entry in remaining if pattern.search(entry)), None)
    if header is None:
        return []

    section: List[str] = [header]
    for entry in remaining:
        if "-----" in entry or "+----" in entry:
            break
        section.append(entry)
    return section
|
||||
|
||||
@staticmethod
|
||||
def is_schedule_missing(text: str) -> bool:
|
||||
lowered = text.lower()
|
||||
return "не найдено" in lowered or "не опубликовано" in lowered
|
||||
|
||||
@staticmethod
|
||||
def _format_schedule_html(day: int, schedule_lines: List[str]) -> str:
|
||||
body_lines = []
|
||||
for line in schedule_lines:
|
||||
formatted = line.replace("¦", "│").replace(" ", " ").strip()
|
||||
if formatted:
|
||||
body_lines.append(formatted)
|
||||
body = escape("\n".join(body_lines))
|
||||
return f"📅 Расписание для {day} числа:\n<pre>{body}</pre>"
|
||||
|
||||
async def is_published_for(self, day_offset: int = 0) -> bool:
    """Report whether the drive source has a file for the resolved date.

    Args:
        day_offset: Value forwarded to ``_resolve_target_date`` to pick the
            date to check.

    Returns:
        True when ``self.drive.find_for_date`` returns something other than
        ``None`` for that date.
    """
    wanted = self._resolve_target_date(day_offset)
    found = await self.drive.find_for_date(wanted)
    return found is not None
|
||||
|
||||
async def get_schedule(
    self, group: str, day_offset: int = 0
) -> Tuple[str, str, int, int]:
    """Build the HTML schedule message for ``group`` from the day's PDF.

    Args:
        group: Group name to look up in the PDF text.
        day_offset: Value forwarded to ``_load_pdf_for_date`` to select
            the date.

    Returns:
        A ``(message, url, day, month)`` tuple. ``message`` is one of: a
        "not published yet" notice linking to the Drive folder when no PDF
        was loaded, a "not found" notice when the group is absent from the
        PDF, or the formatted schedule. ``url`` is the file URL when one is
        available, otherwise the Drive folder URL.
    """
    payload, file_url, day, month = await self._load_pdf_for_date(day_offset)
    drive_folder = "https://drive.google.com/drive/folders/" + self.drive.folder_id

    # No PDF for the date: point the user at the folder instead.
    if not payload:
        message = (
            f"⚠️ Расписание на {day:02d}.{month:02d} ещё не опубликовано "
            f"в <a href=\"{drive_folder}\">Google Drive</a>"
        )
        return message, drive_folder, day, month

    matched = self._parse_group_lines(self._extract_pdf_lines(payload), group)
    if matched:
        message = self._format_schedule_html(day, matched)
    else:
        # The group name goes into HTML, so it is escaped.
        message = f"⚠️ Расписание для группы {escape(group)} на {day} число не найдено"

    return message, file_url or drive_folder, day, month
|
||||
|
||||
async def get_pschedule(
|
||||
self, group: str, day_offset: int = 0
|
||||
self, group: str, day_offset: int = 0
|
||||
) -> Tuple[Optional[bytes], str, int, int]:
|
||||
url, day, month = self._make_url(day_offset)
|
||||
pdf_bytes, url, day, month = await self._load_pdf_for_date(day_offset)
|
||||
fallback_url = (
|
||||
url
|
||||
or "https://drive.google.com/drive/folders/" + self.drive.folder_id
|
||||
)
|
||||
|
||||
async with async_playwright() as p:
|
||||
browser = await p.chromium.launch(headless=True)
|
||||
context = await browser.new_context(viewport={"width": 400, "height": 3000})
|
||||
page = await context.new_page()
|
||||
if not pdf_bytes:
|
||||
return None, fallback_url, day, month
|
||||
|
||||
try:
|
||||
response = await page.goto(url, wait_until="networkidle", timeout=30000)
|
||||
if not response or response.status != 200:
|
||||
logger.warning(f"Ошибка загрузки страницы: {url}")
|
||||
return None, url, day, month
|
||||
try:
|
||||
doc = fitz.open(stream=pdf_bytes, filetype="pdf")
|
||||
regex = self.exact_group_regex(group)
|
||||
|
||||
# 1) сначала пытаемся по более точному селектору (как в HTML-парсере)
|
||||
candidates = page.locator("p.MsoPlainText b")
|
||||
count = await candidates.count()
|
||||
for page in doc:
|
||||
line_items = []
|
||||
page_dict = page.get_text("dict")
|
||||
for block in page_dict.get("blocks", []):
|
||||
if block.get("type") != 0:
|
||||
continue
|
||||
for line in block.get("lines", []):
|
||||
text = "".join(span["text"] for span in line["spans"]).strip()
|
||||
if text:
|
||||
line_items.append((text, fitz.Rect(line["bbox"])))
|
||||
|
||||
regex = self.exact_group_regex(group)
|
||||
target_handle = None
|
||||
|
||||
for i in range(count):
|
||||
el = candidates.nth(i)
|
||||
text = (await el.inner_text()).strip()
|
||||
if regex.search(text):
|
||||
# нашли b с нужной группой — возьмём родительский p для удобного скрина
|
||||
parent_p = await el.locator("xpath=ancestor::p[1]").element_handle()
|
||||
target_handle = parent_p or await el.element_handle()
|
||||
break
|
||||
|
||||
# 2) если не нашли в p.MsoPlainText b, попробуем просто p b или p
|
||||
if not target_handle:
|
||||
candidates = page.locator("p b")
|
||||
count = await candidates.count()
|
||||
for i in range(count):
|
||||
el = candidates.nth(i)
|
||||
text = (await el.inner_text()).strip()
|
||||
found_group = False
|
||||
rects: List[fitz.Rect] = []
|
||||
for text, bbox in line_items:
|
||||
if not found_group:
|
||||
if regex.search(text):
|
||||
parent_p = await el.locator("xpath=ancestor::p[1]").element_handle()
|
||||
target_handle = parent_p or await el.element_handle()
|
||||
found_group = True
|
||||
rects.append(bbox)
|
||||
else:
|
||||
if "-----" in text or "+----" in text:
|
||||
break
|
||||
rects.append(bbox)
|
||||
|
||||
if not target_handle:
|
||||
# последний шанс: любые <p>
|
||||
candidates = page.locator("p")
|
||||
count = await candidates.count()
|
||||
for i in range(count):
|
||||
el = candidates.nth(i)
|
||||
text = (await el.inner_text()).strip()
|
||||
if regex.search(text):
|
||||
target_handle = await el.element_handle()
|
||||
break
|
||||
if not rects:
|
||||
continue
|
||||
|
||||
if target_handle:
|
||||
# скроллим и получаем box
|
||||
await target_handle.scroll_into_view_if_needed()
|
||||
box = await target_handle.bounding_box()
|
||||
if box:
|
||||
clip_rect = {
|
||||
"x": float(max(box["x"], 0)),
|
||||
"y": float(max(box["y"], 0)),
|
||||
"width": float(box["width"] + 150),
|
||||
"height": float(box["height"] + 100),
|
||||
}
|
||||
img = await page.screenshot(clip=clip_rect)
|
||||
return img, url, day, month
|
||||
clip = rects[0]
|
||||
for rect in rects[1:]:
|
||||
clip |= rect
|
||||
clip.x0 = max(0, clip.x0 - 10)
|
||||
clip.x1 = min(page.rect.width, clip.x1 + 150)
|
||||
clip.y0 = max(0, clip.y0 - 5)
|
||||
clip.y1 = min(page.rect.height, clip.y1 + 10)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка при получении расписания: {e}")
|
||||
finally:
|
||||
await context.close()
|
||||
await browser.close()
|
||||
pixmap = page.get_pixmap(clip=clip, matrix=fitz.Matrix(2, 2))
|
||||
return pixmap.tobytes("png"), fallback_url, day, month
|
||||
|
||||
return None, url, day, month
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка при получении расписания из PDF: {e}")
|
||||
|
||||
return None, fallback_url, day, month
|
||||
|
||||
Reference in New Issue
Block a user