"""Schedule lookup service backed by PDF files stored in a Google Drive folder."""

from __future__ import annotations

import io
import logging
import re
from datetime import datetime, timedelta
from html import escape
from typing import List, Optional, Tuple

import fitz
from pypdf import PdfReader

from config import Config
from services.drive_schedule_source import DriveScheduleSource

logger = logging.getLogger(__name__)

# Any character that is not a digit, a Latin letter or a Cyrillic letter.
# Used on both sides of a group name so only whole-token matches count.
BOUNDARY = r"[^0-9A-Za-zА-Яа-яЁё]"

# Folder used when Config does not define SCHEDULE_DRIVE_FOLDER_ID (or it is empty).
DEFAULT_SCHEDULE_FOLDER_ID = "1WhUFHGkS4qC_e84KRArF4ooXHJr8mL5T"

DRIVE_FOLDER_URL_PREFIX = "https://drive.google.com/drive/folders/"


class ScheduleService:
    """Find a group's schedule inside the PDF published for a given date."""

    def __init__(self):
        """Create the Drive source and an empty in-memory PDF cache."""
        folder_id = (
            getattr(Config, "SCHEDULE_DRIVE_FOLDER_ID", None)
            or DEFAULT_SCHEDULE_FOLDER_ID
        )
        self.drive = DriveScheduleSource(folder_id)
        # Downloaded PDF bytes keyed by Drive file id; never evicted.
        self._pdf_cache: dict[str, bytes] = {}

    def _folder_url(self) -> str:
        """Return the browser URL of the Drive folder the service reads from."""
        return DRIVE_FOLDER_URL_PREFIX + self.drive.folder_id

    @staticmethod
    def _is_table_border(text: str) -> bool:
        """Return True when *text* contains a table border run ("-----" or "+----")."""
        return "-----" in text or "+----" in text

    def _resolve_target_date(self, day_offset: int = 0) -> datetime:
        """Return midnight of the date the schedule is requested for.

        With ``day_offset == 0`` the date is picked automatically: today,
        moved to tomorrow from 12:00 local time onwards, and moved one more
        day forward if the result is a Sunday.

        Any other value is NOT added to today's date: despite the parameter
        name it is used as the day of the month within the current month.

        Raises:
            ValueError: if a non-zero ``day_offset`` is not a valid day for
                the current month (propagated from ``datetime.replace``).
        """
        target = datetime.now()
        if day_offset == 0:
            if target.hour >= 12:
                target += timedelta(days=1)
            if target.weekday() == 6:  # 6 == Sunday
                target += timedelta(days=1)
        else:
            target = target.replace(day=int(day_offset))
        return target.replace(hour=0, minute=0, second=0, microsecond=0)

    async def _load_pdf_for_date(
        self, day_offset: int = 0
    ) -> Tuple[Optional[bytes], Optional[str], int, int]:
        """Fetch the PDF for the resolved date, downloading it at most once.

        Returns:
            ``(pdf_bytes, view_url, day, month)``. ``pdf_bytes`` and
            ``view_url`` are ``None`` when the Drive source finds no file
            for the date.
        """
        target = self._resolve_target_date(day_offset)
        day, month = target.day, target.month

        drive_file = await self.drive.find_for_date(target)
        if not drive_file:
            return None, None, day, month

        file_id = drive_file.file_id
        if file_id not in self._pdf_cache:
            self._pdf_cache[file_id] = await self.drive.download_pdf(file_id)

        url = f"https://drive.google.com/file/d/{file_id}/view"
        return self._pdf_cache[file_id], url, day, month

    @staticmethod
    def exact_group_regex(group: str) -> re.Pattern:
        """Compile a case-insensitive pattern matching *group* as a whole token.

        The escaped group name must be preceded by the start of the string or
        a ``BOUNDARY`` character and followed by a ``BOUNDARY`` character or
        the end of the string.
        """
        pattern = rf"(^|{BOUNDARY}){re.escape(group)}({BOUNDARY}|$)"
        return re.compile(pattern, re.IGNORECASE)

    @staticmethod
    def _extract_pdf_lines(pdf_bytes: bytes) -> List[str]:
        """Return every non-blank, stripped text line of the PDF in page order."""
        reader = PdfReader(io.BytesIO(pdf_bytes))
        lines: List[str] = []
        for page in reader.pages:
            text = page.extract_text() or ""
            for raw_line in text.splitlines():
                line = raw_line.strip()
                if line:
                    lines.append(line)
        return lines

    @staticmethod
    def _parse_group_lines(lines: List[str], group: str) -> List[str]:
        """Return the lines belonging to *group*.

        Collection starts at the first line matching the group pattern
        (that line is included) and stops before the next line that
        contains a table border. An empty list means no line matched.
        """
        regex = ScheduleService.exact_group_regex(group)
        schedule_lines: List[str] = []
        found_group = False
        for line in lines:
            if not found_group:
                if regex.search(line):
                    found_group = True
                    schedule_lines.append(line)
            else:
                if ScheduleService._is_table_border(line):
                    break
                schedule_lines.append(line)
        return schedule_lines

    @staticmethod
    def is_schedule_missing(text: str) -> bool:
        """Return True if *text* is one of the "not found / not published" replies."""
        lowered = text.lower()
        return "не найдено" in lowered or "не опубликовано" in lowered

    @staticmethod
    def _format_schedule_html(day: int, schedule_lines: List[str]) -> str:
        """Build the HTML-escaped reply text for the given schedule lines."""
        body_lines = []
        for line in schedule_lines:
            # NOTE(review): as reviewed, both arguments of the second replace
            # are a plain space, which makes it a no-op. The first argument
            # may originally have been a non-breaking or doubled space -
            # confirm against version control.
            formatted = line.replace("¦", "│").replace(" ", " ").strip()
            if formatted:
                body_lines.append(formatted)
        body = escape("\n".join(body_lines))
        # NOTE(review): in the reviewed text this literal was split by a
        # physical line break right before {body}; markup may have been lost
        # there - confirm against version control.
        return f"📅 Расписание для {day} числа:\n{body}"

    async def is_published_for(self, day_offset: int = 0) -> bool:
        """Return True if the Drive source has a file for the resolved date."""
        target = self._resolve_target_date(day_offset)
        return (await self.drive.find_for_date(target)) is not None

    async def get_schedule(
        self, group: str, day_offset: int = 0
    ) -> Tuple[str, str, int, int]:
        """Return the text schedule of *group* for the resolved date.

        Returns:
            ``(message, url, day, month)``. ``message`` is either the
            formatted schedule or a warning that the schedule is not
            published / the group was not found. ``url`` points at the PDF
            when one exists and at the Drive folder otherwise.
        """
        pdf_bytes, url, day, month = await self._load_pdf_for_date(day_offset)
        folder_url = self._folder_url()

        if not pdf_bytes:
            result = (
                f"⚠️ Расписание на {day:02d}.{month:02d} ещё не опубликовано "
                f"в Google Drive"
            )
            return result, folder_url, day, month

        schedule_lines = self._parse_group_lines(
            self._extract_pdf_lines(pdf_bytes), group
        )
        if not schedule_lines:
            result = f"⚠️ Расписание для группы {escape(group)} на {day} число не найдено"
        else:
            result = self._format_schedule_html(day, schedule_lines)
        return result, url or folder_url, day, month

    @staticmethod
    def _group_rects_on_page(page, regex: re.Pattern) -> List[fitz.Rect]:
        """Return bounding boxes of the group's text lines on one page.

        Lines are taken from the page's text blocks in the order returned by
        ``page.get_text("dict")``. Collection starts at the first line that
        matches *regex* and stops before the next table-border line.
        """
        line_items = []
        page_dict = page.get_text("dict")
        for block in page_dict.get("blocks", []):
            if block.get("type") != 0:  # only type-0 blocks are scanned for text lines
                continue
            for line in block.get("lines", []):
                text = "".join(span["text"] for span in line["spans"]).strip()
                if text:
                    line_items.append((text, fitz.Rect(line["bbox"])))

        rects: List[fitz.Rect] = []
        found_group = False
        for text, bbox in line_items:
            if not found_group:
                if regex.search(text):
                    found_group = True
                    rects.append(bbox)
            else:
                if ScheduleService._is_table_border(text):
                    break
                rects.append(bbox)
        return rects

    async def get_pschedule(
        self, group: str, day_offset: int = 0
    ) -> Tuple[Optional[bytes], str, int, int]:
        """Render the schedule of *group* as a PNG cropped from the PDF page.

        Returns:
            ``(png_bytes, url, day, month)``. ``png_bytes`` is ``None`` when
            no PDF is published, the group is not found on any page, or
            rendering fails (the failure is logged, not raised).
        """
        pdf_bytes, url, day, month = await self._load_pdf_for_date(day_offset)
        fallback_url = url or self._folder_url()
        if not pdf_bytes:
            return None, fallback_url, day, month

        try:
            regex = self.exact_group_regex(group)
            # The context manager closes the document on every exit path,
            # including the early return below.
            with fitz.open(stream=pdf_bytes, filetype="pdf") as doc:
                for page in doc:
                    rects = self._group_rects_on_page(page, regex)
                    if not rects:
                        continue

                    # Union of all line boxes, padded and clamped to the page.
                    clip = fitz.Rect(rects[0])
                    for rect in rects[1:]:
                        clip |= rect
                    clip.x0 = max(0, clip.x0 - 10)
                    clip.x1 = min(page.rect.width, clip.x1 + 150)
                    clip.y0 = max(0, clip.y0 - 5)
                    clip.y1 = min(page.rect.height, clip.y1 + 10)

                    # 2x zoom for a sharper image.
                    pixmap = page.get_pixmap(clip=clip, matrix=fitz.Matrix(2, 2))
                    return pixmap.tobytes("png"), fallback_url, day, month
        except Exception as e:
            # Best-effort boundary: a broken PDF must not crash the caller,
            # but the traceback is kept in the log.
            logger.exception("Ошибка при получении расписания из PDF: %s", e)

        return None, fallback_url, day, month