Files
myfirstprogram/utils/analytics.py
T
2025-11-23 23:17:00 +03:00

290 lines
11 KiB
Python

from collections import Counter
import re
from datetime import datetime
import statistics
import tempfile
import json
def analyze_bot_logs(log_file_path="bot.log"):
"""
Анализирует логи бота и создает детальную статистику
"""
try:
with open(log_file_path, "r", encoding="utf-8") as log:
lines = log.readlines()
except FileNotFoundError:
return {"error": "Лог файл не найден"}
except Exception as e:
return {"error": f"Ошибка чтения файла: {str(e)}"}
if not lines:
return {"error": "Лог файл пуст"}
# Основные счетчики
stats = {
"total_lines": len(lines),
"time_period": {},
"log_levels": Counter(),
"activities": Counter(),
"errors": Counter(),
"warnings": Counter(),
"user_commands": Counter(),
"groups": Counter(),
"restarts": 0,
"schedule_checks": 0,
"schedule_changes": 0,
"schedule_failures": 0,
"network_errors": 0,
"browser_errors": 0,
"telegram_errors": Counter(),
"performance": {
"avg_handling_time": 0,
"fastest_handling": float("inf"),
"slowest_handling": 0,
"handling_count": 0,
},
}
# Временные метрики
start_time = None
end_time = None
handling_times = []
# Регулярные выражения для парсинга
timestamp_pattern = r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"
log_level_pattern = r"\[(INFO|WARNING|ERROR)\]"
handling_time_pattern = r"Duration (\d+) ms"
command_pattern = r"Команда /rasp от ([\d-]+), группа=([^,]+), дата=(\d+)"
schedule_pattern = r"Проверяем расписание для ([^ ]+) на (\d{2}\.\d{2}\.\d{4})"
for line in lines:
# Извлекаем временную метку
timestamp_match = re.search(timestamp_pattern, line)
if timestamp_match:
timestamp = timestamp_match.group(1)
if not start_time:
start_time = timestamp
end_time = timestamp
# Уровень логирования
level_match = re.search(log_level_pattern, line)
if level_match:
level = level_match.group(1)
stats["log_levels"][level] += 1
# Время обработки сообщений
time_match = re.search(handling_time_pattern, line)
if time_match and "is handled" in line:
handling_time = int(time_match.group(1))
handling_times.append(handling_time)
stats["performance"]["handling_count"] += 1
stats["performance"]["slowest_handling"] = max(
stats["performance"]["slowest_handling"], handling_time
)
stats["performance"]["fastest_handling"] = min(
stats["performance"]["fastest_handling"], handling_time
)
# Команды пользователей
cmd_match = re.search(command_pattern, line)
if cmd_match:
user_id, group, date_offset = cmd_match.groups()
stats["user_commands"][group] += 1
stats["groups"][group] += 1
# Проверки расписания
if "Проверяем расписание" in line:
stats["schedule_checks"] += 1
sched_match = re.search(schedule_pattern, line)
if sched_match:
group, date = sched_match.groups()
stats["groups"][group] += 1
# Изменения расписания
if "Изменения найдены" in line:
stats["schedule_changes"] += 1
# Ошибки расписания
if "Не удалось получить расписание" in line:
stats["schedule_failures"] += 1
# Перезапуски бота
if "Бот запускается" in line:
stats["restarts"] += 1
# Сетевые ошибки
if "Failed to fetch updates" in line:
stats["network_errors"] += 1
# Ошибки браузера
if "TargetClosedError" in line or "BrowserContext.close" in line:
stats["browser_errors"] += 1
# Ошибки Telegram API
if "Telegram server says" in line:
error_msg = line.split("Telegram server says - ")[-1].split(":")[0]
stats["telegram_errors"][error_msg] += 1
# Сбор ошибок и предупреждений
if "[ERROR]" in line:
error_msg = line.split("[ERROR]")[-1].strip()
stats["errors"][error_msg[:100]] += 1
if "[WARNING]" in line:
warning_msg = line.split("[WARNING]")[-1].strip()
stats["warnings"][warning_msg[:100]] += 1
# Расчет средней скорости обработки
if handling_times:
stats["performance"]["avg_handling_time"] = sum(handling_times) / len(
handling_times
)
# Период работы
if start_time and end_time:
stats["time_period"] = {
"start": start_time,
"end": end_time,
"duration_hours": calculate_duration_hours(start_time, end_time),
}
# Дополнительные метрики
stats["success_rate"] = calculate_success_rate(stats)
stats["uptime_percentage"] = calculate_uptime_percentage(stats)
stats["schedule_success_rate"] = calculate_schedule_success_rate(stats)
return stats
def calculate_duration_hours(start_str: str, end_str: str) -> float:
"""Вычисляет продолжительность в часах"""
fmt = "%Y-%m-%d %H:%M:%S"
try:
start = datetime.strptime(start_str, fmt)
end = datetime.strptime(end_str, fmt)
return round((end - start).total_seconds() / 3600, 2)
except (ValueError, TypeError):
return 0.0
def calculate_success_rate(stats):
"""Рассчитывает процент успешных операций"""
total_operations = stats["performance"]["handling_count"] + sum(
stats["errors"].values()
)
if total_operations == 0:
return 0
success_rate = (stats["performance"]["handling_count"] / total_operations) * 100
return round(success_rate, 2)
def calculate_uptime_percentage(stats):
"""Рассчитывает процент времени работы"""
if stats["time_period"].get("duration_hours", 0) == 0:
return 0
# Предполагаем, что каждый перезапуск занимает ~10 секунд
restart_downtime = stats["restarts"] * 10 / 3600
total_hours = stats["time_period"]["duration_hours"]
uptime_hours = total_hours - restart_downtime
uptime_percentage = (uptime_hours / total_hours) * 100
return round(uptime_percentage, 2)
def calculate_schedule_success_rate(stats):
"""Рассчитывает процент успешных проверок расписания"""
total_checks = stats["schedule_checks"]
if total_checks == 0:
return 0
successful_checks = total_checks - stats["schedule_failures"]
success_rate = (successful_checks / total_checks) * 100
return round(success_rate, 2)
def create_statistics_text(stats):
"""Создает текстовый отчет статистики с расширенными метриками"""
if "error" in stats:
return f"❌ Ошибка анализа логов: {stats['error']}"
text = "📊 СТАТИСТИКА РАБОТЫ БОТА\n"
text += "=" * 40 + "\n\n"
# Основная информация
text += "📈 ОБЩАЯ ИНФОРМАЦИЯ:\n"
text += f"• Период: {stats['time_period'].get('start', 'N/A')} - {stats['time_period'].get('end', 'N/A')}\n"
text += f"• Длительность: {stats['time_period'].get('duration_hours', 0)} ч\n"
text += f"• Строк в логе: {stats['total_lines']:,}\n\n"
# Производительность
handling_times = stats.get(
"handling_times", []
) # сохрани список в analyze_bot_logs
median_time = statistics.median(handling_times) if handling_times else 0
text += "⚡ ПРОИЗВОДИТЕЛЬНОСТЬ:\n"
text += f"• Среднее время: {stats['performance']['avg_handling_time']:.0f} мс\n"
text += f"• Медиана времени: {median_time:.0f} мс\n"
text += f"• Сообщений обработано: {stats['performance']['handling_count']}\n"
text += f"• Успешных операций: {stats['success_rate']}%\n\n"
# Статус работы
duration = stats["time_period"].get("duration_hours", 0)
errors_total = sum(stats["errors"].values())
errors_per_hour = round(errors_total / duration, 2) if duration else 0
restarts = stats["restarts"]
mtbf = round(duration / restarts, 2) if restarts else duration
text += "🔄 СТАТУС РАБОТЫ:\n"
text += f"• Перезапусков: {restarts}\n"
text += f"• Uptime: {stats['uptime_percentage']}%\n"
text += f"• MTBF (среднее время между перезапусками): {mtbf} ч\n"
text += f"• Ошибок в час: {errors_per_hour}\n\n"
# Расписание
text += "📅 РАСПИСАНИЕ:\n"
text += f"• Проверок: {stats['schedule_checks']}\n"
text += f"• Изменений: {stats['schedule_changes']}\n"
text += f"• Ошибок: {stats['schedule_failures']}\n"
text += f"• Успешных проверок: {stats['schedule_success_rate']}%\n\n"
# Ошибки
text += "🚨 ОШИБКИ:\n"
text += f"• Всего ошибок: {errors_total}\n"
text += f"• Предупреждений: {sum(stats['warnings'].values())}\n"
text += f"• Сетевых: {stats['network_errors']}\n"
text += f"• Браузера: {stats['browser_errors']}\n"
# Топ-3 ошибок
if stats["errors"]:
text += "• Топ ошибок:\n"
for err, count in stats["errors"].most_common(3):
text += f" - {err} ({count})\n"
text += "\n"
# Пользователи
text += "👥 АКТИВНОСТЬ:\n"
text += f"• Команд: {sum(stats['user_commands'].values())}\n"
text += f"• Групп: {len(stats['groups'])}\n"
if stats["groups"]:
text += "• Топ групп:\n"
for group, count in stats["groups"].most_common(3):
text += f" - {group}: {count}\n"
return text
def create_statistics_file(stats):
"""Создает временный файл с полной статистикой"""
if "error" in stats:
return None
# Создаем временный файл
with tempfile.NamedTemporaryFile(
mode="w", encoding="utf-8", suffix=".json", delete=False
) as f:
json.dump(stats, f, ensure_ascii=False, indent=2, default=str)
temp_filename = f.name
return temp_filename