import base64 import aiohttp import logging from aiogram import Dispatcher, Bot from aiogram.types import Message, InlineKeyboardButton, CallbackQuery from aiogram.utils.keyboard import InlineKeyboardBuilder from utils.antispam import saving, save_message, admin_required from aiogram.filters import Command from models.state import BotState import hashlib import json from typing import Dict, List, Tuple, Optional logger = logging.getLogger(__name__) # Глобальные переменные на уровне модуля current_model = "google/gemma-3-12b" model_hash_map: Dict[str, str] = {} # Словарь для хранения соответствия хэшей и полных названий моделей # Список моделей, которые точно поддерживают изображения (можно расширять) IMAGE_SUPPORTED_MODELS = [ "llava", # Модели LLaVA "vision", # Модели с поддержкой vision "clip", # CLIP модели "qwen2-vl", # Qwen с vision "cogvlm", # CogVLM "paligemma", # PaLI-Gemma "gemma-2-2b-it", # Gemma 2 может поддерживать "gemma-2-9b-it", "gemini", # Gemini "llava-hf", # LLaVA HuggingFace "moondream", # Moondream ] def get_model_hash(model_name: str) -> str: """Создает короткий хэш для названия модели""" return hashlib.md5(model_name.encode()).hexdigest()[:16] def truncate_model_name(model_name: str, max_length: int = 40) -> str: """Сокращает название модели для отображения""" if len(model_name) <= max_length: return model_name return model_name[:max_length - 3] + "..." def supports_images(model_name: str) -> bool: """Проверяет, поддерживает ли модель изображения""" model_lower = model_name.lower() # Если модель содержит ключевые слова, связанные с поддержкой изображений for keyword in IMAGE_SUPPORTED_MODELS: if keyword.lower() in model_lower: return True return False def register_handlers(dp: Dispatcher, state: BotState, bot: Bot): chat_history = {} MAX_HISTORY = 20 # храним последние 20 сообщений (user+assistant) MODEL = "google/gemma-3-12b" # MODEL = "google/gemma-3n-e4b" URL_BASE = "10.180.139.124" URL = f"http://{URL_BASE}:1234/v1/chat/completions" # URL = "http://192.168.31.95:1234/v1/chat/completions" MODELS_URL = f"http://{URL_BASE}:1234/v1/models" # URL для получения списка моделей # Используем глобальные переменные global current_model, model_hash_map # # Системный промт для всех чатов SYSTEM_PROMPT = { "role": "system", "content": [ { "type": "text", "text": ( "Ты — интеллектуальный помощник.\n" "Твои задачи:\n" "- Отвечать ясно, полно и структурировано.\n" "- Поддерживать дружелюбный, уважительный и профессиональный тон.\n" "- Адаптироваться под контекст: будь кратким в простых вопросах и подробным в сложных.\n" "- Использовать списки, таблицы и примеры для наглядности.\n" "- Никогда не придумывать факты: если информации нет — честно говори об этом.\n" "- Уважать безопасность: не давать советов по самоповреждению, незаконным действиям или опасным практикам.\n" "- Поддерживать баланс: быть полезным и информативным, но не перегружать лишними деталями.\n" "- В диалоге стремиться продвигать разговор вперёд: задавай уточняющие вопросы или предлагай новые направления.\n\n" "Твой стиль:\n" "- Ясный, дружелюбный, но не навязчивый.\n" "- Умный собеседник, который помогает думать глубже.\n" "- Используй эмодзи для вовлечённости, но умеренно.\n" "- Будь адаптивным: технические темы — строго и точно, креативные — свободно и образно.\n\n" "Главная цель: увеличивать знания и понимание собеседника, делая общение полезным и приятным." ), } ], } # SYSTEM_PROMPT = { # "role": "system", # "content": [ # { # "type": "text", # "text": ( # "Ты — хитрая и злая ассистентка по имени Nika. " # "Ты девушка, общаешься холодно и уверенно, " # "Ты ненавидешь коммунизм и если тебя о нем спросят ты четко по пунктам скажешь почему коммунизм это бред" # ), # } # ], # } @dp.message(Command("models")) @admin_required(0) @saving async def list_models(message: Message): """Получить список доступных моделей и выбрать одну""" global model_hash_map, current_model try: async with aiohttp.ClientSession() as session: async with session.get(MODELS_URL) as resp: if resp.status != 200: error_text = await resp.text() await message.reply( f"❌ Ошибка при получении списка моделей: {resp.status} {error_text}" ) return data = await resp.json() # Формат ответа может отличаться в зависимости от API models = [] # Формат OpenAI-compatible API if isinstance(data, dict) and "data" in data: models = [model["id"] for model in data["data"]] # Другой возможный формат elif isinstance(data, list): models = data # Если не распознали формат else: await message.reply(f"❌ Неизвестный формат ответа от API: {data}") return if not models: await message.reply("❌ Нет доступных моделей") return # Создаем клавиатуру для выбора модели builder = InlineKeyboardBuilder() for model in models: # Создаем короткий хэш для callback_data model_hash = get_model_hash(model) model_hash_map[model_hash] = model # Сохраняем в глобальный словарь # Сокращаем для отображения display_name = truncate_model_name(model) # Помечаем текущую модель и добавляем иконку для моделей с поддержкой изображений prefix = "✅ " if model == current_model else "• " icon = "🖼️ " if supports_images(model) else "" builder.row( InlineKeyboardButton( text=f"{prefix}{icon}{display_name}", callback_data=f"model:{model_hash}" ) ) # Кнопки для управления builder.row( InlineKeyboardButton( text="🔄 Обновить список", callback_data="refresh_models" ), InlineKeyboardButton( text="📋 Показать текущую", callback_data="show_current" ), InlineKeyboardButton( text="🖼️ Модели с картинками", callback_data="show_image_models" ) ) # Сохраняем карту хэшей в файл для отладки (опционально) try: with open("model_hash_map.json", "w") as f: json.dump(model_hash_map, f, indent=2, ensure_ascii=False) except: pass # Разделяем модели на поддерживающие изображения и обычные image_models = [m for m in models if supports_images(m)] text_only_models = [m for m in models if not supports_images(m)] models_list = "\n".join( [f"{i + 1}. {model} {'🖼️' if supports_images(model) else ''}" for i, model in enumerate(models[:10])]) if len(models) > 10: models_list += f"\n... и еще {len(models) - 10} моделей" await message.reply( f"📋 Доступные модели ({len(models)} шт.):\n" f"🖼️ Поддерживают изображения: {len(image_models)}\n" f"📝 Только текст: {len(text_only_models)}\n" f"Текущая модель: {current_model} {'🖼️' if supports_images(current_model) else '📝'}\n\n" "Первые 10 моделей:\n" + models_list + "\n\n" "Выберите модель из списка ниже (🖼️ - поддерживает изображения):", reply_markup=builder.as_markup(), parse_mode="HTML" ) except Exception as e: logger.error(f"Ошибка при получении списка моделей: {e}") await message.reply(f"❌ Ошибка при получении списка моделей: {e}") @dp.message(Command("setmodel")) @admin_required(0) @saving async def set_model(message: Message): """Установить модель через команду""" global current_model, model_hash_map if not message.text: await message.reply("❌ Укажите название модели: /setmodel <название_модели>") return args = message.text.split(maxsplit=1) if len(args) < 2: await message.reply("❌ Укажите название модели: /setmodel <название_модели>") return new_model = args[1] # Проверяем, есть ли модель в текущем кэше (опционально) if model_hash_map: model_hashes = list(model_hash_map.values()) if new_model not in model_hashes: await message.reply( f"⚠️ Модель {new_model} не найдена в последнем списке.\n" f"Используйте /models для просмотра доступных моделей.\n" f"Продолжаем смену модели...", parse_mode="HTML" ) old_model = current_model current_model = new_model # Сообщаем о поддержке изображений image_support = "🖼️ поддерживает изображения" if supports_images(new_model) else "📝 только текст" await message.reply( f"✅ Модель успешно изменена! ({image_support})\n" f"📊 Старая модель: {old_model}\n" f"📈 Новая модель: {current_model}", parse_mode="HTML" ) @dp.message(Command("currentmodel")) @saving async def show_current_model(message: Message): """Показать текущую модель""" global current_model image_support = "🖼️ Поддерживает изображения" if supports_images(current_model) else "📝 Только текст" await message.reply( f"🤖 Текущая модель: {current_model}\n" f"{image_support}", parse_mode="HTML" ) async def prepare_gpt_request(chat_id: int, message: Message) -> Tuple[Optional[List[dict]], Optional[str]]: """Подготавливает запрос к GPT, обрабатывая текст и изображения""" if chat_id not in chat_history: chat_history[chat_id] = [SYSTEM_PROMPT] content_blocks = [] user_prompt = None # Текст после команды или caption if message.text: parts = message.text.split(maxsplit=1) if len(parts) > 1: user_prompt = parts[1] if message.caption: user_prompt = message.caption # Добавляем текстовый промпт, если есть if user_prompt: content_blocks.append({"type": "text", "text": user_prompt}) elif not message.photo: # Если нет ни текста, ни фото return None, "❗ Укажи текст или прикрепи фото" # Обрабатываем фото, если модель поддерживает изображения if message.photo and supports_images(current_model): try: photo = message.photo[-1] file = await bot.get_file(photo.file_id) file_bytes = await bot.download_file(file.file_path) image_b64 = base64.b64encode(file_bytes.read()).decode("utf-8") content_blocks.append( { "type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}, } ) except Exception as e: logger.error(f"Ошибка при обработке изображения: {e}") # Если не удалось обработать изображение, продолжаем без него if not user_prompt: return None, "❌ Ошибка при обработке изображения" elif message.photo and not supports_images(current_model): # Если модель не поддерживает изображения, отправляем текстовое описание if not user_prompt: # Если нет текста, просим переформулировать return None, f"⚠️ Текущая модель {current_model} не поддерживает изображения.\nОтправьте текстовое описание или смените модель на поддерживающую изображения (🖼️)." # Если есть текст, просто игнорируем изображение и отправляем текст logger.info(f"Модель {current_model} не поддерживает изображения, отправляем только текст") # Если после всех проверок content_blocks пустой if not content_blocks: return None, "❗ Укажи текст или прикрепи фото" # Добавляем новое сообщение в историю chat_history[chat_id].append({"role": "user", "content": content_blocks}) # Ограничиваем историю (оставляем последние MAX_HISTORY сообщений) if len(chat_history[chat_id]) > MAX_HISTORY: chat_history[chat_id] = chat_history[chat_id][-MAX_HISTORY:] return chat_history[chat_id], None @dp.message(Command("gpt")) @saving async def ask_gpt(message: Message): global current_model chat_id = message.chat.id # Подготавливаем запрос prepared_history, error_message = await prepare_gpt_request(chat_id, message) if error_message: await message.reply(error_message, parse_mode="HTML") return payload = { "model": current_model, "messages": prepared_history, "temperature": 0.7, "max_tokens": 4096, "stream": False, "ttl": 300, } # Флаг, указывающий на попытку отправки изображения has_image = bool(message.photo) image_attempt_failed = False try: async with aiohttp.ClientSession() as session: async with session.post(URL, json=payload) as resp: if resp.status != 200: error_text = await resp.text() # Проверяем, если ошибка из-за изображения if has_image and "does not support images" in error_text: image_attempt_failed = True logger.warning( f"Модель {current_model} не поддерживает изображения, пробуем без изображения") # Удаляем последнее сообщение из истории (с изображением) chat_history[chat_id].pop() # Пробуем отправить только текст if message.caption: # Используем caption как промпт content_blocks = [{"type": "text", "text": message.caption}] elif message.text: parts = message.text.split(maxsplit=1) if len(parts) > 1: content_blocks = [{"type": "text", "text": parts[1]}] else: content_blocks = [{"type": "text", "text": "Опиши изображение"}] else: content_blocks = [{"type": "text", "text": "Опиши изображение"}] # Добавляем текстовый запрос chat_history[chat_id].append({"role": "user", "content": content_blocks}) # Обновляем payload payload["messages"] = chat_history[chat_id] # Повторяем запрос без изображения async with session.post(URL, json=payload) as retry_resp: if retry_resp.status != 200: error_text = await retry_resp.text() await message.reply( f"❌ Ошибка LM Studio: {retry_resp.status} {error_text}" ) return data = await retry_resp.json() reply_text = data["choices"][0]["message"]["content"] else: await message.reply( f"❌ Ошибка LM Studio: {resp.status} {error_text}" ) return else: data = await resp.json() reply_text = data["choices"][0]["message"]["content"] # Сохраняем ответ ассистента в историю chat_history[chat_id].append( { "role": "assistant", "content": [{"type": "text", "text": reply_text}], } ) # Ограничиваем снова (чтобы не разрасталось) if len(chat_history[chat_id]) > MAX_HISTORY: chat_history[chat_id] = chat_history[chat_id][-MAX_HISTORY:] # Добавляем заметку о попытке отправки изображения image_note = "" if image_attempt_failed: image_note = f"\n\n⚠️ Модель не поддерживает изображения, отправлен только текстовый запрос." # Делим сообщение на части по 4000 символов MAX_LEN = 4000 for i in range(0, len(reply_text), MAX_LEN): chunk = reply_text[i:i + MAX_LEN] if i == 0: msg = await message.reply(f"🤖 Ответ (модель: {current_model}){image_note}:\n{chunk}") else: msg = await message.reply(chunk) save_message(msg.chat.id, msg.message_id) except Exception as e: logger.error(f"Ошибка при запросе к LM Studio: {e}") await message.reply(f"❌ Ошибка при запросе к LM Studio: {e}") @dp.message(Command("agpt")) @admin_required(0) @saving async def ask_gpt_admin(message: Message): global current_model chat_id = message.chat.id # Подготавливаем запрос prepared_history, error_message = await prepare_gpt_request(chat_id, message) if error_message: await message.reply(error_message, parse_mode="HTML") return payload = { "model": current_model, "messages": prepared_history, "temperature": 0.7, "max_tokens": 4096, "stream": False, "ttl": 300, } target_chat_id = -1003038389942 try: async with aiohttp.ClientSession() as session: async with session.post(URL, json=payload) as resp: if resp.status != 200: error_text = await resp.text() await message.reply( f"❌ Ошибка LM Studio: {resp.status} {error_text}" ) return data = await resp.json() reply_text = data["choices"][0]["message"]["content"] # Сохраняем ответ ассистента в историю chat_history[chat_id].append( { "role": "assistant", "content": [{"type": "text", "text": reply_text}], } ) # Ограничиваем снова (чтобы не разрасталось) if len(chat_history[chat_id]) > MAX_HISTORY: chat_history[chat_id] = chat_history[chat_id][-MAX_HISTORY:] # Делим сообщение на части по 4000 символов MAX_LEN = 4000 for i in range(0, len(reply_text), MAX_LEN): chunk = reply_text[i:i + MAX_LEN] msg = await bot.send_message(chat_id=target_chat_id, text=f"🤖 Ответ (модель: {current_model}):\n{chunk}") save_message(msg.chat.id, msg.message_id) except Exception as e: logger.error(f"Ошибка при запросе к LM Studio: {e}") await message.reply(f"❌ Ошибка при запросе к LM Studio: {e}") @dp.message(Command("igpt")) @admin_required(0) @saving async def ask_gpt_interactive(message: Message): global current_model raw_text = message.text or message.caption if not raw_text and not (message.photo): await message.reply( "❌ Укажи ID чата и текст или прикрепи фото: /igpt <сообщение>" ) return args = raw_text.split(maxsplit=2) if raw_text else [] if len(args) < 2: await message.reply("❌ Укажи ID чата: /igpt <сообщение>") return try: target_chat_id = int(args[1]) # первый аргумент — ID чата except ValueError: await message.reply("❌ Неверный формат chat_id") return user_prompt = args[2] if len(args) > 2 else "" # Используем локальный chat_id для обработки запроса local_chat_id = message.chat.id if local_chat_id not in chat_history: chat_history[local_chat_id] = [SYSTEM_PROMPT] content_blocks = [] if user_prompt: content_blocks.append({"type": "text", "text": user_prompt}) # Фото → base64 → image_url (только если модель поддерживает) if message.photo and supports_images(current_model): try: photo = message.photo[-1] file = await bot.get_file(photo.file_id) file_bytes = await bot.download_file(file.file_path) image_b64 = base64.b64encode(file_bytes.read()).decode("utf-8") content_blocks.append( { "type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}, } ) except Exception as e: logger.error(f"Ошибка при обработке изображения: {e}") if not user_prompt: await message.reply("❌ Ошибка при обработке изображения") return elif message.photo and not supports_images(current_model): await message.reply(f"⚠️ Текущая модель {current_model} не поддерживает изображения.", parse_mode="HTML") if not user_prompt: return if not content_blocks: await message.reply("❗ Укажи текст или прикрепи фото") return # Добавляем новое сообщение в историю chat_history[local_chat_id].append({"role": "user", "content": content_blocks}) # Ограничиваем историю if len(chat_history[local_chat_id]) > MAX_HISTORY: chat_history[local_chat_id] = chat_history[local_chat_id][-MAX_HISTORY:] payload = { "model": current_model, "messages": chat_history[local_chat_id], "temperature": 0.7, "max_tokens": 4096, "stream": False, "ttl": 300, } try: async with aiohttp.ClientSession() as session: async with session.post(URL, json=payload) as resp: if resp.status != 200: error_text = await resp.text() await message.reply( f"❌ Ошибка LM Studio: {resp.status} {error_text}" ) return data = await resp.json() reply_text = data["choices"][0]["message"]["content"] # Сохраняем ответ ассистента в историю chat_history[local_chat_id].append( { "role": "assistant", "content": [{"type": "text", "text": reply_text}], } ) # Ограничиваем снова if len(chat_history[local_chat_id]) > MAX_HISTORY: chat_history[local_chat_id] = chat_history[local_chat_id][-MAX_HISTORY:] # Делим сообщение на части по 4000 символов MAX_LEN = 4000 for i in range(0, len(reply_text), MAX_LEN): chunk = reply_text[i:i + MAX_LEN] msg = await bot.send_message(chat_id=target_chat_id, text=f"🤖 Ответ (модель: {current_model}):\n{chunk}") save_message(msg.chat.id, msg.message_id) except Exception as e: logger.error(f"Ошибка при запросе к LM Studio: {e}") await message.reply(f"❌ Ошибка при запросе к LM Studio: {e}") @dp.message(Command("clear")) async def clear(message: Message): chat_history.pop(message.chat.id, None) await message.reply("✅ История диалога очищена") # Создаем простые фильтры для callback async def model_callback_filter(callback_query: CallbackQuery) -> bool: return callback_query.data.startswith("model:") async def refresh_models_filter(callback_query: CallbackQuery) -> bool: return callback_query.data == "refresh_models" async def show_current_filter(callback_query: CallbackQuery) -> bool: return callback_query.data == "show_current" async def show_image_models_filter(callback_query: CallbackQuery) -> bool: return callback_query.data == "show_image_models" @dp.callback_query(model_callback_filter) async def select_model_callback(callback_query: CallbackQuery): """Обработка выбора модели из инлайн-клавиатуры""" global current_model, model_hash_map model_hash = callback_query.data.split(":", 1)[1] # Пытаемся загрузить из файла, если в памяти нет if not model_hash_map: try: with open("model_hash_map.json", "r") as f: model_hash_map = json.load(f) except Exception as e: logger.error(f"Ошибка загрузки model_hash_map из файла: {e}") # Получаем полное название модели из карты if model_hash not in model_hash_map: await callback_query.answer("❌ Ошибка: модель не найдена в кэше. Используйте /models для обновления списка", show_alert=True) return model_id = model_hash_map[model_hash] old_model = current_model current_model = model_id # Сообщаем о поддержке изображений image_support = "🖼️ поддерживает изображения" if supports_images(model_id) else "📝 только текст" # Обновляем сообщение try: await callback_query.message.edit_text( f"✅ Модель успешно изменена! ({image_support})\n\n" f"📊 Старая модель:\n{old_model}\n\n" f"📈 Новая модель:\n{current_model}\n\n" f"Для просмотра всех моделей используйте /models", parse_mode="HTML" ) except Exception as e: logger.error(f"Ошибка при редактировании сообщения: {e}") # Отправляем подтверждение await callback_query.answer(f"✅ Модель изменена на:\n{truncate_model_name(model_id, 30)}") @dp.callback_query(refresh_models_filter) async def refresh_models_callback(callback_query: CallbackQuery): """Обновить список моделей""" await list_models(callback_query.message) await callback_query.answer("🔄 Список моделей обновлен") @dp.callback_query(show_current_filter) async def show_current_callback(callback_query: CallbackQuery): """Показать текущую модель""" global current_model image_support = "🖼️ Поддерживает изображения" if supports_images(current_model) else "📝 Только текст" await callback_query.answer(f"Текущая модель: {current_model}\n{image_support}", show_alert=True) @dp.callback_query(show_image_models_filter) async def show_image_models_callback(callback_query: CallbackQuery): """Показать модели с поддержкой изображений""" global model_hash_map if not model_hash_map: await callback_query.answer("❌ Список моделей пуст. Используйте /models", show_alert=True) return image_models = [model for model in model_hash_map.values() if supports_images(model)] if not image_models: await callback_query.answer("🖼️ Нет моделей с поддержкой изображений", show_alert=True) return models_text = "\n".join([f"• {model}" for model in image_models[:10]]) if len(image_models) > 10: models_text += f"\n... и еще {len(image_models) - 10} моделей" await callback_query.answer(f"🖼️ Модели с поддержкой изображений ({len(image_models)} шт.):\n{models_text}", show_alert=True)