from pyfiglet import Figlet f = Figlet(font='small') import os import importlib.util import re import inspect from dotenv import load_dotenv from module_interface import ModuleInterface, ModuleManager import asyncio from aiohttp import ClientWebSocketResponse from mipac.models.notification import NotificationNote from mipa.ext.commands.bot import Bot def load_modules(): module_dir = "modules" modules = [] for filename in os.listdir(module_dir): if filename.endswith(".py"): module_name = os.path.splitext(filename)[0] module_path = os.path.join(module_dir, filename) spec = importlib.util.spec_from_file_location(module_name, module_path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) for attribute_name in dir(module): attribute = getattr(module, attribute_name) if inspect.isclass(attribute) and issubclass(attribute, ModuleInterface) and attribute != ModuleInterface: modules.append(attribute()) return modules class MiNya(Bot): def __init__(self): super().__init__() self.__modules = [] self.__manager = ModuleManager(self) async def _connect_channel(self): await self.router.connect_channel(['main']) async def on_reconnect(self, ws: ClientWebSocketResponse): await self._connect_channel() async def on_ready(self, ws: ClientWebSocketResponse): await self._connect_channel() print("[Main] Bot System Ready.") print("") print("[Main] Loading Modules...") print("") self.__modules = load_modules() print("") print(f"[Main] Loaded Modules : {len(self.__modules)}") print("") print("[Main] Registering Modules...") for module in self.__modules: self.__manager.register_module(module) print("") async def on_mention(self, notice: NotificationNote): print("[Main] Got Mentioned!") for _, module in self.__manager.modules.items(): if module.regex_pattern and re.match(module.regex_pattern, notice.note.content): print("[Main] Regex Matching Module found : ", module.name) if await module.execute_module(notice.note): break def main(): print(f.renderText('MiNya V1')) print('"MIsskey Networked Yet Ambitious"') print('') print("[Main] Minya V1 Bootstap...") print("[Main] Loading settings...") load_dotenv() instance = os.environ.get('BOT_INSTANCE') key = os.environ.get('BOT_KEY') if not (instance and key): print("[MAIN] ERR! please check .env or os environment!") return bot = MiNya() asyncio.run(bot.start(f'wss://{instance}/streaming', key)) if __name__ == "__main__": main()