HEX
Server: Apache
System: Linux eisbus 6.8.12-9-pve #1 SMP PREEMPT_DYNAMIC PMX 6.8.12-9 (2025-03-16T19:18Z) x86_64
User: www-data (33)
PHP: 8.2.29
Disabled: NONE
Upload Files
File: //usr/bin/confconsole
#!/usr/bin/python3
# Copyright (c) 2008 Alon Swartz <alon@turnkeylinux.org> - all rights reserved
"""TurnKey Configuration Console

Options:
    -h, --help           Display this help and exit
        --usage          Display usage screen without Advanced Menu
        --nointeractive  Do not display interactive dialog
        --plugin=<name>  Run plugin directly

"""

import os
import sys
import subprocess
from subprocess import CalledProcessError
import getopt
import shlex
from string import Template
from io import StringIO
import traceback

import dialog
from dialog import DialogError
import traceback
import netinfo

import ipaddr
import ifutil
import conf
import plugin

from typing import NoReturn, Optional, Iterable, Any, Union
import typing

USAGE: str = __doc__
PLUGIN_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                           'plugins.d')


class Error(Exception):
    pass


def fatal(e: str) -> NoReturn:
    print("error:", e, file=sys.stderr)
    sys.exit(1)


def usage(e: Optional[str] = None) -> NoReturn:
    if e:
        print("Error:", e, file=sys.stderr)

    print(f"Syntax: {sys.argv[0]}", file=sys.stderr)
    print(USAGE.strip(), file=sys.stderr)
    sys.exit(1)


def format_fields(fields: Iterable[tuple[str, str, int, int]]
                  ) -> list[tuple[str, int, int, str, int, int, int, int]]:
    '''Takes fields in format (label, field, label_length, field_length) and
    outputs fields in format (label, ly, lx, item, iy, ix, field_length,
    input_length)
    '''
    out = []
    for i, (label, field, l_length, f_length) in enumerate(fields):
        out.append((label, i+1, 1, field, i+1, l_length+1, l_length, f_length))
    return out


WrapperReturn = Union[str, tuple[str, str]]


class Console:
    def __init__(self, title: Optional[str] = None,
                 width: int = 60, height: int = 20):
        self.width = width
        self.height = height

        self.console = dialog.Dialog(dialog="dialog")
        self.console.add_persistent_args(["--no-collapse"])
        self.console.add_persistent_args(["--ok-label", "Select"])
        self.console.add_persistent_args(["--cancel-label", "Back"])
        self.console.add_persistent_args(["--colors"])
        if conf.Conf().copy_paste:
            self.console.add_persistent_args(["--no-mouse"])
        if title:
            self.console.add_persistent_args(["--backtitle", title])

    def _handle_exitcode(self, retcode: str) -> bool:
        if retcode == 'esc':
            text = "Do you really want to quit?"
            if self.console.yesno(text) == self.console.OK:
                sys.exit(0)
            return False
        return True

    def _wrapper(self, dialog: str, text: str, *args: Any, **kws: Any
                 ) -> WrapperReturn:
        try:
            method = getattr(self.console, dialog)
        except AttributeError:
            raise Error("dialog not supported: " + dialog)

        ret: WrapperReturn

        while 1:
            try:
                ret = method("\n" + text, *args, **kws)
            except DialogError as e:
                if "Can't make new window" in e.message:
                    self.console.msgbox(
                        "Terminal too small for UI, resize terminal and"
                        " press OK",
                        ok_label='OK'
                    )
                    continue
                else:
                    raise

            if type(ret) is str:
                retcode = ret
            else:
                retcode = ret[0]

            if self._handle_exitcode(retcode):
                break

        return ret

    def infobox(self, text: str) -> str:
        v = self._wrapper("infobox", text)
        assert isinstance(v, str)
        return v

    def yesno(self, text: str, autosize: bool = False) -> str:
        if autosize:
            text += '\n '
            height, width = 0, 0
        else:
            height, width = 10, 30
        v = self._wrapper("yesno", text, height, width)
        assert isinstance(v, str)
        return v

    def msgbox(self, title: str, text: str, button_label: str = "ok",
               autosize: bool = False) -> str:
        if autosize:
            text += '\n '
            height, width = 0, 0
        else:
            height, width = self.height, self.width

        v = self._wrapper("msgbox", text, height, width,
                          title=title, ok_label=button_label)
        assert isinstance(v, str)
        return v

    def inputbox(self, title: str, text: str, init: str = '',
                 ok_label: str = "OK", cancel_label: str = "Cancel"
                 ) -> tuple[str, str]:
        no_cancel = True if cancel_label == "" else False
        v = self._wrapper("inputbox", text, self.height, self.width,
                          title=title, init=init, ok_label=ok_label,
                          cancel_label=cancel_label, no_cancel=no_cancel)
        assert isinstance(v, tuple)
        return v

    def menu(self, title: str, text: str, choices: list[tuple[str, str]],
             no_cancel: bool = False) -> tuple[str, str]:
        v = self._wrapper("menu", text, self.height, self.width,
                          menu_height=len(choices)+1,
                          title=title, choices=choices, no_cancel=no_cancel)
        assert isinstance(v, tuple)
        return v

    def form(self, title: str, text: str,
             fields: list[tuple[str, int, int, str, int, int, int, int]],
             ok_label: str = "Apply",
             cancel_label: str = "Cancel",
             autosize: bool = False) -> tuple[str, list[str]]:
        if autosize:
            text += '\n '
            height, width = 0, 0
        else:
            height, width = self.height, self.width
        v = self._wrapper("form", text, fields,
                          height=height, width=width,
                          form_height=len(fields)+1,
                          title=title,
                          ok_label=ok_label,
                          cancel_label=cancel_label)
        assert isinstance(v, tuple)
        assert isinstance(v[1], list)
        return v


class Installer:
    def __init__(self, path: str):
        self.path: str = path
        self.available: bool = self._is_available()

    def _is_available(self) -> bool:
        if not os.path.exists(self.path):
            return False

        with open('/proc/cmdline') as fob:
            return 'boot=live' in fob.readline().split()

    def execute(self) -> None:
        if not self.available:
            raise Error("installer is not available to be executed")

        subprocess.run([self.path])


class TurnkeyConsole:
    OK = 'ok'
    CANCEL = 1

    def __init__(self, pluginManager: plugin.PluginManager,
                 eventManager: plugin.EventManager,
                 advanced_enabled: bool = True):
        title = "TurnKey GNU/Linux Configuration Console"
        self.width = 60
        self.height = 20

        self.console = Console(title, self.width, self.height)
        self.appname = f"TurnKey GNU/Linux {netinfo.get_hostname().upper()}"

        self.installer = Installer(path='/usr/bin/di-live')

        self.advanced_enabled = advanced_enabled

        # self.eventManager = plugin.EventManager()
        # self.pluginManager = plugin.PluginManager(
        #     PLUGIN_PATH,
        #     {'eventManager': self.eventManager, 'console': self.console})

        self.eventManager = eventManager
        self.pluginManager = pluginManager
        self.pluginManager.updateGlobals({'console': self.console})

    @staticmethod
    def _get_filtered_ifnames() -> list[str]:
        ifnames = []
        for ifname in netinfo.get_ifnames():
            if ifname.startswith(('lo', 'tap', 'br', 'natbr', 'tun',
                                  'vmnet', 'veth', 'wmaster')):
                continue
            ifnames.append(ifname)

        # handle bridged LXC where br0 is the default outward-facing interface
        defifname = conf.Conf().default_nic
        if defifname and defifname.startswith('br'):
            ifnames.append(defifname)
            bridgedif = subprocess.check_output(
                        ['brctl', 'show', defifname],
                        text=True).split('\n')[1].split('\t')[-1]
            ifnames.remove(bridgedif)

        ifnames.sort()
        return ifnames

    @classmethod
    def _get_default_nic(cls) -> Optional[str]:
        def _validip(ifname: str) -> bool:
            ip = ifutil.get_ipconf(ifname)[0]
            if ip and not ip.startswith('169'):
                return True
            return False

        defifname = conf.Conf().default_nic
        if defifname and _validip(defifname):
            return defifname

        for ifname in cls._get_filtered_ifnames():
            if _validip(ifname):
                return ifname

        return None

    @classmethod
    def _get_public_ipaddr(cls) -> Optional[str]:
        publicip_cmd = conf.Conf().publicip_cmd
        if publicip_cmd:
            command = subprocess.run(shlex.split(publicip_cmd),
                                     capture_output=True,
                                     text=True)
            if command.returncode == 0:
                return command.stdout.strip()

        return None

    def _get_advmenu(self
                     ) -> tuple[list[tuple[str, str]],
                                dict[str, Union[plugin.Plugin,
                                     plugin.PluginDir]]]:
        items = []
        if conf.Conf().networking:
            items.append(("Networking", "Configure appliance networking"))

        if self.installer.available:
            items.append(("Install", "Install to hard disk"))

        plugin_map = {}

        for path in self.pluginManager.path_map:
            plug = self.pluginManager.path_map[path]
            if os.path.dirname(path) == PLUGIN_PATH:
                if isinstance(plug, plugin.Plugin) and hasattr(plug.module,
                                                               'run'):
                    items.append((plug.module_name.capitalize(),
                                  str(plug.module.__doc__)))
                elif isinstance(plug, plugin.PluginDir):
                    items.append((plug.module_name.capitalize(),
                                  plug.description))
                plugin_map[plug.module_name.capitalize()] = plug

        items.append(("Reboot", "Reboot the appliance"))
        items.append(("Shutdown", "Shutdown the appliance"))
        items.append(("Quit", "Quit the configuration console"))

        return items, plugin_map

    def _get_netmenu(self) -> list[tuple[str, str]]:
        menu = []
        for ifname in self._get_filtered_ifnames():
            addr = ifutil.get_ipconf(ifname)[0]
            ifmethod = ifutil.get_ifmethod(ifname)

            if addr:
                desc = addr
                if ifmethod:
                    desc += f" ({ifmethod})"

                if ifname == self._get_default_nic():
                    desc += " [*]"
            else:
                desc = "not configured"

            menu.append((ifname, desc))

        return menu

    def _get_ifconfmenu(self, ifname: str) -> list[tuple[str, str]]:
        menu = []
        menu.append(("DHCP", "Configure networking automatically"))
        menu.append(("StaticIP", "Configure networking manually"))

        if not ifname == self._get_default_nic() and \
           len(self._get_filtered_ifnames()) > 1 and \
           ifutil.get_ipconf(ifname)[0] is not None:
            menu.append(("Default", "Show this adapter's IP address in Usage"))

        return menu

    def _get_ifconftext(self, ifname: str) -> str:
        addr, netmask, gateway, nameservers = ifutil.get_ipconf(ifname)
        if addr is None:
            return "Network adapter is not configured\n"

        text = f"IP Address:      {addr}\n"
        text += f"Netmask:         {netmask}\n"
        text += f"Default Gateway: {gateway}\n"
        text += f"Name Server(s):  {' '.join(nameservers)}\n\n"

        ifmethod = ifutil.get_ifmethod(ifname)
        if ifmethod:
            text += f"Networking configuration method: {ifmethod}\n"

        if len(self._get_filtered_ifnames()) > 1:
            text += "Is this adapter's IP address displayed in Usage: "
            if ifname == self._get_default_nic():
                text += "yes\n"
            else:
                text += "no\n"

        return text

    def usage(self) -> str:
        if self.advanced_enabled:
            default_button_label = "Advanced Menu"
            default_return_value = "advanced"
        else:
            default_button_label = "Quit"
            default_return_value = "quit"

        # if no interfaces at all - display error and go to advanced
        if len(self._get_filtered_ifnames()) == 0:
            error = "No network adapters detected"
            if not self.advanced_enabled:
                fatal(error)

            self.console.msgbox("Error", error)
            return "advanced"

        # if interfaces but no default - display error and go to networking
        ifname = self._get_default_nic()
        if not ifname:
            error = "Networking is not yet configured"
            if not self.advanced_enabled:
                fatal(error)

            self.console.msgbox("Error", error)
            return "networking"

        # tklbam integration
        tklbamstatus_cmd = subprocess.run(['which', 'tklbam-status'],
                                          capture_output=True,
                                          text=True).stdout.strip()
        if tklbamstatus_cmd:
            tklbam_status = subprocess.run([tklbamstatus_cmd, "--short"],
                                           capture_output=True,
                                           text=True).stdout
        else:
            tklbam_status = ("TKLBAM not found - please check that it's"
                             " installed.")

        # display usage
        ip_addr = self._get_public_ipaddr()
        if not ip_addr:
            ip_addr = ifutil.get_ipconf(ifname)[0]

        hostname = netinfo.get_hostname().upper()

        try:
            with open(conf.path('services.txt'), 'r') as fob:
                t = fob.read().rstrip()
        except conf.Error:
            t = ""
        text = Template(t).substitute(ipaddr=ip_addr)

        text += f"\n\n{tklbam_status}\n\n"
        text += "\n" * (self.height - len(text.splitlines()) - 7)
        text += "         TurnKey Backups and Cloud Deployment\n"
        text += "             https://hub.turnkeylinux.org"

        retcode = self.console.msgbox(f"{hostname} appliance services",
                                      text,
                                      button_label=default_button_label)

        if retcode is not self.OK:
            self.running = False

        return default_return_value

    def advanced(self) -> str:
        # dont display cancel button when no interfaces at all
        no_cancel = False
        if len(self._get_filtered_ifnames()) == 0:
            no_cancel = True

        items, plugin_map = self._get_advmenu()

        retcode, choice = self.console.menu("Advanced Menu",
                                            self.appname + " Advanced Menu\n",
                                            items,
                                            no_cancel=no_cancel)

        if retcode is not self.OK:
            return "usage"

        if choice in plugin_map:
            return plugin_map[choice].path

        return "_adv_" + choice.lower()

    def networking(self) -> str:
        ifnames = self._get_filtered_ifnames()

        # if no interfaces at all - display error and go to advanced
        if len(ifnames) == 0:
            self.console.msgbox("Error", "No network adapters detected")
            return "advanced"

        # if only 1 interface, dont display menu - just configure it
        if len(ifnames) == 1:
            self.ifname = ifnames[0]
            return "ifconf"

        # display networking
        text = "Choose network adapter to configure\n"
        if self._get_default_nic():
            text += "[*] This adapter's IP address is displayed in Usage"

        retcode, self.ifname = self.console.menu("Networking configuration",
                                                 text, self._get_netmenu())

        if retcode is not self.OK:
            return "advanced"

        return "ifconf"

    def ifconf(self) -> str:
        retcode, choice = self.console.menu(f"{self.ifname} configuration",
                                            self._get_ifconftext(self.ifname),
                                            self._get_ifconfmenu(self.ifname))

        if retcode is not self.OK:
            # if multiple interfaces go back to networking
            if len(self._get_filtered_ifnames()) > 1:
                return "networking"

            return "advanced"

        return "_ifconf_" + choice.lower()

    def _ifconf_staticip(self) -> str:
        def _validate(addr: str, netmask: str, gateway: str, nameservers:
                      list[str]) -> list[str]:
            """Validate Static IP form parameters. Returns an empty array on
               success, an array of strings describing errors otherwise"""

            errors = []
            if not addr:
                errors.append("No IP address provided")
            elif not ipaddr.is_legal_ip(addr):
                errors.append(f"Invalid IP address: {addr}")

            if not netmask:
                errors.append("No netmask provided")
            elif not ipaddr.is_legal_ip(netmask):
                errors.append(f"Invalid netmask: {netmask}")

            for nameserver in nameservers:
                if nameserver and not ipaddr.is_legal_ip(nameserver):
                    errors.append(f"Invalid nameserver: {nameserver}")

            if len(nameservers) != len(set(nameservers)):
                errors.append("Duplicate nameservers specified")

            if errors:
                return errors

            if gateway:
                if not ipaddr.is_legal_ip(gateway):
                    return [f"Invalid gateway: {gateway}"]
                else:
                    iprange = ipaddr.IPRange(addr, netmask)
                    if gateway not in iprange:
                        return [f"Gateway ({gateway}) not in IP range"
                                f" ({iprange})"]
            return []

        warnings = []
        addr = None
        netmask = None
        gateway = None
        nameservers = None
        try:
            addr, netmask, gateway, nameservers \
                    = ifutil.get_ipconf(self.ifname, True)
        except CalledProcessError:
            warnings.append('`route -n` returned non-0 exit code! (unable to'
                            ' get gateway)')
        except netinfo.NetInfoError:
            warnings.append('failed to find default gateway!')
            addr, netmask, gateway, nameservers = ifutil.get_ipconf(
                    self.ifname, False)

        if addr is None:
            warnings.append('failed to assertain current address!')
            addr = ''
        if netmask is None:
            warnings.append('failed to assertain current netmask!')
            netmask = ''
        if gateway is None:
            gateway = ''
        if nameservers is None:
            nameservers = []

        if warnings:
            warnings.append('\nWill leave relevant fields blank')

        if warnings:
            self.console.msgbox("Warning", '\n'.join(warnings))

        value = [addr, netmask, gateway]
        value.extend(nameservers)

        # include minimum 2 nameserver fields and 1 blank one
        if len(value) < 4:
            value.append('')

        if value[-1]:
            value.append('')

        field_width = 30
        field_limit = 15

        while 1:
            pre_fields: list[tuple[str, str, int, int]] = [
                ("IP Address", value[0], field_width, field_limit),
                ("Netmask", value[1], field_width, field_limit),
                ("Default Gateway", value[2], field_width, field_limit),
            ]

            for i in range(len(value[3:])):
                pre_fields.append(("Name Server", value[3+i],
                                   field_width, field_limit))

            fields: list[tuple[str, int, int, str, int, int, int, int]
                         ] = format_fields(pre_fields)
            text = f"Static IP configuration ({self.ifname})"
            retcode, input = self.console.form("Network settings",
                                               text, fields)

            if retcode is not self.OK:
                break

            # remove any whitespaces the user might of included
            for i in range(len(input)):
                input[i] = input[i].strip()

            # unconfigure the nic if all entries are empty
            if not input[0] and not input[1] and not input[2] and not input[3]:
                ifutil.unconfigure_if(self.ifname)
                break

            addr, netmask, gateway = input[:3]
            nameservers = input[3:]
            for i in range(nameservers.count('')):
                nameservers.remove('')

            err_parts = _validate(addr, netmask, gateway, nameservers)
            if err_parts:
                err: str = "\n".join(err_parts)
                self.console.msgbox("Error", err)
            else:
                in_ssh = 'SSH_CONNECTION' in os.environ
                if not in_ssh or (in_ssh and self.console.yesno(
                        "Warning: Changing ip while an ssh session is active"
                        " will drop said ssh session!",
                        autosize=True) == self.OK):
                    maybe_err: Optional[str] = ifutil.set_static(
                            self.ifname, addr, netmask, gateway, nameservers)
                    if maybe_err is None:
                        break
                    self.console.msgbox("Error", maybe_err)
                else:
                    break

        return "ifconf"

    def _ifconf_dhcp(self) -> str:
        in_ssh = 'SSH_CONNECTION' in os.environ
        if not in_ssh or (in_ssh and self.console.yesno(
                "Warning: Changing ip while an ssh session is active will"
                " drop said ssh session!", autosize=True) == self.OK):
            self.console.infobox(f"Requesting DHCP for {self.ifname}...")
            err = ifutil.set_dhcp(self.ifname)
            if err:
                self.console.msgbox("Error", err)

        return "ifconf"

    def _ifconf_default(self) -> str:
        conf.Conf().set_default_nic(self.ifname)
        return "ifconf"

    def _adv_install(self) -> str:
        text = "Please note that any changes you may have made to the\n"
        text += "live system will *not* be installed to the hard disk.\n\n"
        self.console.msgbox("Installer", text)

        self.installer.execute()
        return "advanced"

    def _shutdown(self, text: str, opt: str) -> str:
        if self.console.yesno(text) == self.OK:
            self.running = False
            cmd = f"shutdown {opt} now"
            fgvt = os.environ.get("FGVT")
            if fgvt:
                cmd = f"chvt {fgvt}; " + cmd
            os.system(cmd)

        return "advanced"

    def _adv_reboot(self) -> str:
        return self._shutdown("Reboot the appliance?", "-r")

    def _adv_shutdown(self) -> str:
        return self._shutdown("Shutdown the appliance?", "-h")

    def _adv_quit(self) -> str:
        if not self.advanced_enabled:
            self.running = False
            return "usage"

        if self.console.yesno("Do you really want to quit?",
                              autosize=True) == self.OK:
            self.running = False

        return "advanced"

    _adv_networking = networking
    quit = _adv_quit

    def loop(self, dialog: str = "usage") -> None:
        self.running = True
        prev_dialog = dialog
        standalone = dialog != 'usage'  # no "back" for plugins

        while dialog and self.running:
            try:
                if not dialog.startswith(PLUGIN_PATH):
                    try:
                        method = getattr(self, dialog)
                    except AttributeError:
                        raise Error("dialog not supported: " + dialog)
                else:
                    try:
                        method = self.pluginManager.path_map[dialog].run
                    except KeyError:
                        raise Error("could not find plugin dialog: " + dialog)

                new_dialog = method()
                if standalone:  # XXX This feels dirty
                    break
                prev_dialog = dialog
                dialog = new_dialog

            except Exception as e:
                sio = StringIO()
                traceback.print_exc(file=sio)

                self.console.msgbox("Caught exception", sio.getvalue())
                dialog = prev_dialog


def main() -> None:
    interactive = True
    advanced_enabled = True
    plugin_name = None

    if os.geteuid() != 0:
        fatal("confconsole needs root privileges to run")

    try:
        l_opts = ["help", "usage", "nointeractive", "plugin="]
        opts, args = getopt.gnu_getopt(sys.argv[1:], "hn", l_opts)
    except getopt.GetoptError as e:
        usage(str(e))

    for opt, val in opts:
        if opt in ("-h", "--help"):
            usage()
        elif opt == "--usage":
            advanced_enabled = False
        elif opt == "--nointeractive":
            interactive = False
        elif opt == "--plugin":
            plugin_name = val
        else:
            usage()

    em = plugin.EventManager()
    pm = plugin.PluginManager(PLUGIN_PATH,
                              {'eventManager': em, 'interactive': interactive})

    if plugin_name:

        ps = list(filter(lambda x: isinstance(x, plugin.Plugin),
                         pm.getByName(plugin_name)))

        if len(ps) > 1:
            fatal(f'plugin name ambiguous, matches all of {ps}')
        elif len(ps) == 1:
            p = ps[0]

            if interactive:
                tc = TurnkeyConsole(pm, em, advanced_enabled)
                tc.loop(dialog=p.path)  # calls .run()
            else:
                assert isinstance(p, plugin.Plugin)
                p.module.run()
        else:
            fatal('no such plugin')
    else:
        tc = TurnkeyConsole(pm, em, advanced_enabled)
        tc.loop()


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        subprocess.run(['stty', 'sane'])
        traceback.print_exc()