HEX
Server: Apache
System: Linux eisbus 6.8.12-9-pve #1 SMP PREEMPT_DYNAMIC PMX 6.8.12-9 (2025-03-16T19:18Z) x86_64
User: www-data (33)
PHP: 8.2.29
Disabled: NONE
Upload Files
File: //usr/lib/confconsole/plugin.py
#!/usr/bin/python
import re
import os
import sys
import imp
import importlib.util
import importlib
from traceback import format_exc
from collections import OrderedDict

from types import ModuleType
from typing import Callable, Any, Union, Optional, Iterable
import typing


class PluginError(Exception):
    ''' Raised for plugin loading problems, e.g. a missing plugin directory '''


class EventError(Exception):
    ''' Exception type for event related failures

    NOTE: nothing in this module raises it; presumably kept for plugins or
    other callers - verify before removing.
    '''


class ModuleInterface(ModuleType):
    ''' Typing stand-in describing what a plugin module may provide

    This is a hack: plugin modules are ordinary modules that are merely
    *cast* to this class, so that type checkers know about the hooks below.
    '''
    module_name: str

    def run(self) -> Optional[str]:
        ''' Plugin entry point; returns a string or None '''
        return None

    def doOnce(self) -> None:
        ''' Optional one-off initialisation hook of a plugin '''
        return None


class EventManager(object):
    ''' Object to handle event/handler interaction '''

    # event name -> handlers, kept in the order they were added
    _handlers: dict[str, list[Callable[[], None]]]
    # names of all events registered so far
    _events: set[str]

    def __init__(self) -> None:
        self._handlers = {}
        self._events = set()

    def add_event(self, event: str) -> Callable[[], None]:
        ''' Adds event and returns callback function to `fire` event '''
        self._events.add(event)
        self._handlers.setdefault(event, [])

        def fire() -> None:
            self.fire_event(event)

        fire.__doc__ = f' Function to fire the `{event}` event '
        return fire

    def add_handler(self, event: str, handler: Callable[[], None]) -> None:
        ''' Adds a handler to an event, registering the event if needed '''
        self._events.add(event)
        self._handlers.setdefault(event, []).append(handler)

    def fire_event(self, event: str) -> None:
        ''' Fire event, calling all handlers in order

        Events that were never registered are silently ignored. An exception
        raised by a handler is reported on stderr and the remaining handlers
        are still called.
        '''
        if event not in self._events:
            return  # if event hasn't been registered, don't attempt to fire it

        for handler in self._handlers.get(event, ()):
            try:
                handler()  # handler passed no arguments; can change if needed
            except Exception:
                # Only `Exception` is caught: SystemExit and KeyboardInterrupt
                # must be able to propagate out of a handler.
                sys.stderr.write('An Exception has occured within an event'
                                 ' handler whilst attempting to handle event'
                                 f' "{event}"\n{format_exc()}')


class Plugin(object):
    ''' Object that holds various information about a `plugin` '''

    # path of the parent menu node; None when the plugin has no parent
    parent: Optional[str]

    def __init__(self, path: str, module_globals: dict[str, Any]) -> None:
        ''' Load the python file at `path` and execute it as a plugin module

        Every entry of `module_globals`, plus `PLUGIN_PATH` (the path of the
        plugin file), is set as an attribute on the module *before* the
        module is executed, so the plugin's top-level code can use them.
        '''
        self.path = path
        # for weighted ordering
        self.real_name = os.path.basename(path)
        # for menu entry
        self.name = re.sub(r'^[\d]*', '', self.real_name).replace('_', ' ')

        # assigned from outside (PluginManager does so for plugins that
        # live inside a plugin directory)
        self.parent = None

        # the raw file stem is the name the module is imported under
        self.module_name = os.path.splitext(self.real_name)[0]

        spec = importlib.util.spec_from_file_location(self.module_name,
                                                      self.path)
        assert spec is not None
        assert spec.loader is not None
        self.module = typing.cast(ModuleInterface,
                                  importlib.util.module_from_spec(spec))

        for k, v in module_globals.items():
            setattr(self.module, k, v)
        setattr(self.module, 'PLUGIN_PATH', self.path)

        # XXX commented this line as it was causing me issues...
        # assert isinstance(spec.loader, importlib.abc.Loader)
        spec.loader.exec_module(self.module)

        # after module is found, it's safe to use pretty name
        self.module_name = os.path.splitext(self.name)[0]

    def updateGlobals(self, newglobals: dict[str, Any]) -> None:
        ''' Set every entry of `newglobals` as attribute on the module '''
        for k, v in newglobals.items():
            setattr(self.module, k, v)

    def run(self) -> Optional[str]:
        ''' Run the plugin module and return where to go next

        Returns whatever the module's `run` returned if that is a non-empty
        string, otherwise the parent of this plugin, otherwise 'advanced'.
        '''
        assert hasattr(self.module, 'run')
        ret: Optional[str] = self.module.run()
        assert ret is None or isinstance(ret, str)

        # default behaviour is to go to previous menu after exiting if not
        # otherwise specified. `parent` is always set by __init__ (possibly
        # to None), so test its value rather than its existence; otherwise
        # the 'advanced' fallback could never be reached.
        return ret or getattr(self, 'parent', None) or 'advanced'


class PluginDir(object):
    '''Object that mimics behaviour of a plugin but acts only as a menu node'''

    # path of the parent menu node; None when there is no parent
    parent: Optional[str]
    # menu entries shown by `run`; written as a forward reference because it
    # refers to this class itself
    plugins: 'list[Union[Plugin, PluginDir]]'

    def __init__(self, path: str, module_globals: dict[str, Any]):
        ''' Describe the directory at `path` as a menu node

        The text of an optional `description` file inside the directory is
        stored as `self.description` (empty string if there is no such file).
        '''
        self.path = path
        self.real_name = os.path.basename(path)
        self.name = re.sub(r'^[\d]*', '', self.real_name).replace('_', ' ')

        self.parent = None
        # start with no entries so `run` works even before anyone has
        # assigned the contents of this directory
        self.plugins = []

        self.module_name = self.name

        # kept by reference: `updateGlobals` mutates this very dict
        self.module_globals = module_globals

        description_path = os.path.join(path, 'description')
        if os.path.isfile(description_path):
            with open(description_path, 'r') as fob:
                self.description = fob.read()
        else:
            self.description = ''

    def updateGlobals(self, newglobals: dict[str, Any]) -> None:
        ''' Merge `newglobals` into the globals dict held by this node '''
        self.module_globals.update(newglobals)

    def run(self) -> Optional[str]:
        ''' Show a menu of the contained plugins and return where to go next

        The menu is displayed through `module_globals['console'].menu`.
        Returns the path of the chosen entry, or '_adv_<choice>' (lower
        cased) for a choice that is not one of the entries. If the menu was
        not confirmed with 'ok', returns the parent, or 'advanced' when
        there is none.
        '''
        items = []
        plugin_map: dict[str, Union[Plugin, PluginDir]] = {}
        for plugin in self.plugins:
            if isinstance(plugin, PluginDir):
                summary = plugin.description
            elif isinstance(plugin, Plugin) and hasattr(plugin.module, 'run'):
                summary = str(plugin.module.__doc__)
            else:
                continue  # plugins without `run` get no menu entry
            label = plugin.module_name.capitalize()
            items.append((label, summary))
            plugin_map[label] = plugin

        title = self.module_name.capitalize()
        retcode, choice = self.module_globals['console'].menu(
                title, title + '\n', items, no_cancel=False)

        if retcode != 'ok':
            return self.parent or 'advanced'

        if choice in plugin_map:
            return plugin_map[choice].path
        return '_adv_' + choice.lower()


class PluginManager(object):
    ''' Object that holds various information about multiple `plugins` '''

    # path -> plugin object, sorted by path. The class level default is
    # never mutated; every instance binds its own map in `__init__`.
    path_map: OrderedDict[str, Union[Plugin, PluginDir]] = OrderedDict()

    def __init__(self, path: str, module_globals: dict[str, Any]) -> None:
        ''' Load every plugin found below the directory `path`

        `module_globals` is extended in place with `impByName`, `impByDir`
        and `impByPath` helpers and handed to every plugin. Executable `.py`
        files become `Plugin` objects, directories (except `__pycache__`)
        become `PluginDir` objects.

        Raises PluginError if `path` is not a directory.
        '''
        path = os.path.realpath(path)  # Just in case
        found: dict[str, Union[Plugin, PluginDir]] = {}

        # lambdas, so the helpers stay usable from plugin code at any time
        module_globals.update({
            'impByName': lambda *a, **k: self.impByName(*a, **k),
            'impByDir': lambda *a, **k: self.impByDir(*a, **k),
            'impByPath': lambda *a, **k: self.impByPath(*a, **k),
        })

        self.module_globals = module_globals

        if not os.path.isdir(path):
            raise PluginError('Plugin directory "{}" does not exist!'
                              ''.format(path))

        for root, dirs, files in os.walk(path):
            for file_name in files:
                if not file_name.endswith('.py'):
                    continue

                file_path = os.path.join(root, file_name)
                # only regular files with at least one executable bit set
                # are loaded as plugins
                if (os.path.isfile(file_path)
                        and os.stat(file_path).st_mode & 0o111):
                    found[file_path] = Plugin(file_path, module_globals)

            for dir_name in dirs:
                if dir_name == '__pycache__':
                    continue
                dir_path = os.path.join(root, dir_name)

                if os.path.isdir(dir_path):
                    found[dir_path] = PluginDir(dir_path, module_globals)

        # Store the sorted map *before* running the init hooks: the impBy*
        # helpers handed to the plugins look things up in `self.path_map`,
        # so a `doOnce` using them would otherwise see no plugins at all.
        # It also makes the hook order independent of the directory listing
        # order.
        self.path_map = OrderedDict(sorted(found.items(),
                                           key=lambda x: x[0]))

        for plugin in self.path_map.values():
            if isinstance(plugin, Plugin) and hasattr(plugin.module, 'doOnce'):
                # Run plugin init
                plugin.module.doOnce()

        # tell every directory node about its direct children and vice versa
        for key, node in self.path_map.items():
            if isinstance(node, PluginDir):
                sub_plugins = self.getByDir(key)
                for child in sub_plugins:
                    child.parent = key
                node.plugins = list(sub_plugins)

    def updateGlobals(self, newglobals: dict[str, Any]) -> None:
        ''' Pass `newglobals` on to every plugin and plugin directory '''
        for plugin in self.path_map.values():
            plugin.updateGlobals(newglobals)

    def getByName(self, name: str) -> Iterable[Union[Plugin, PluginDir]]:
        ''' Return an iterator over plugin objects matching given name '''
        return filter(lambda x: x.module_name == name, self.path_map.values())

    def getByDir(self, path: str) -> Iterable[Union[Plugin, PluginDir]]:
        ''' Return a list of plugin objects directly in given directory '''
        return [plugin for path_key, plugin in self.path_map.items()
                if os.path.dirname(path_key) == path]

    def getByPath(self, path: str) -> Optional[Union[Plugin, PluginDir]]:
        ''' Return plugin object with exact given path or None'''
        return self.path_map.get(path, None)

    # -- Used by plugins
    def impByName(self, name: str) -> Iterable[ModuleInterface]:
        '''Return an iterator over python modules (from plugins excluding
        PluginDirs) matching given name'''

        modules = [x.module for x in
                   self.getByName(name) if isinstance(x, Plugin)]

        return filter(None, modules)

    def impByDir(self, path: str) -> Iterable[ModuleInterface]:
        '''Return an iterator over python modules (from plugins excluding
        PluginDirs) in given directory'''

        modules = [x.module for x in
                   self.getByDir(path) if isinstance(x, Plugin)]

        return filter(None, modules)

    def impByPath(self, path: str) -> Optional[ModuleInterface]:
        ''' Return a python module from plugin at given path or None '''
        out = self.getByPath(path)
        if out and isinstance(out, Plugin):
            return out.module
        return None

# Example usage:
#
# em = EventManager()
# pm = PluginManager('plugins.d', {'eventManager': em})
#
# for plugin in pm.path_map.values():
#     if isinstance(plugin, Plugin):
#         print('\nRunning:', plugin.path, '\n', '-' * 10)
#         plugin.run()