HEX
Server: Apache
System: Linux eisbus 6.8.12-9-pve #1 SMP PREEMPT_DYNAMIC PMX 6.8.12-9 (2025-03-16T19:18Z) x86_64
User: www-data (33)
PHP: 8.2.29
Disabled: NONE
Upload Files
File: //usr/lib/confconsole/ifutil.py
# Copyright (c) 2008 Alon Swartz <alon@turnkeylinux.org> - all rights reserved
# Copyright (c) 2022 TurnKey GNU/Linux <admin@turnkeylinux.org>

import os
from time import sleep
import subprocess
from typing import Optional

from netinfo import InterfaceInfo
from netinfo import get_hostname


class IfError(Exception):
    pass


class EtcNetworkInterfaces:
    """class for controlling /etc/network/interfaces

    An error will be raised if the interfaces file does not include the
    header: # UNCONFIGURED INTERFACES (in other words, we will not override
    any customizations)
    """

    CONF_FILE = '/etc/network/interfaces'
    HEADER_UNCONFIGURED = "# UNCONFIGURED INTERFACES"

    def __init__(self) -> None:
        self.read_conf()

    def read_conf(self) -> None:
        self.conf: dict[str, str] = {}
        self.unconfigured = False

        ifname = None
        with open(self.CONF_FILE) as fob:
            for line in fob:
                line = line.rstrip()

                if line == self.HEADER_UNCONFIGURED:
                    self.unconfigured = True

                if not line or line.startswith("#"):
                    continue

                if line.startswith("auto") or line.startswith("allow-hotplug"):
                    ifname = line.split()[1]
                    self.conf[ifname] = line + "\n"
                elif ifname:
                    self.conf[ifname] += line + "\n"

    def _get_iface_opts(self, ifname: str) -> list[str]:
        iface_opts = ('pre-up', 'up', 'post-up',
                      'pre-down', 'down', 'post-down')
        if ifname not in self.conf:
            return []

        ifconf = self.conf[ifname]
        return [line.strip()
                for line in ifconf.splitlines()
                if line.strip().split()[0] in iface_opts]

    def _get_bridge_opts(self, ifname: str) -> list[str]:
        bridge_opts = ('bridge_ports', 'bridge_ageing', 'bridge_bridgeprio',
                       'bridge_fd', 'bridge_gcinit', 'bridge_hello',
                       'bridge_hw', 'bridge_maxage', 'bridge_maxwait',
                       'bridge_pathcost', 'bridge_portprio', 'bridge_stp',
                       'bridge_waitport')
        if ifname not in self.conf:
            return []

        ifconf = self.conf[ifname]
        return [line.strip()
                for line in ifconf.splitlines()
                if line.strip().split()[0] in bridge_opts]

    def write_conf(self, ifname: str, ifconf: str) -> None:
        self.read_conf()
        if not self.unconfigured:
            raise IfError(f"refusing to write to {self.CONF_FILE}\n"
                          f"header not found: {self.HEADER_UNCONFIGURED}")

        # carry over previously defined bridge options
        ifconf += "\n" + "\n".join(["    " + opt
                                   for opt in self._get_bridge_opts(ifname)])

        # carry over previously defined interface options
        ifconf += "\n" + "\n".join(["    " + opt
                                    for opt in self._get_iface_opts(ifname)])

        with open(self.CONF_FILE, "w") as fob:
            fob.write(self.HEADER_UNCONFIGURED+'\n')
            fob.write("# remove the above line if you edit this file")

            for iface in self.conf.keys():
                if iface:
                    fob.write('\n\n')
                    if iface == ifname:
                        fob.writelines(ifconf.rstrip())
                    else:
                        fob.writelines(self.conf[iface].rstrip())
            fob.write('\n')

    @staticmethod
    def _preproc_if(ifname_conf: str) -> list[str]:
        lines = ifname_conf.splitlines()
        new_lines = []
        hostname = get_hostname()
        for line in lines:
            _line = line.lstrip()
            if (_line.startswith('allow-hotplug')
                    or _line.startswith('auto')
                    or _line.startswith('iface')
                    or _line.startswith('wpa-conf')):
                new_lines.append(line)
            elif _line.startswith('hostname'):
                if hostname:
                    new_lines.append(f'    hostname {hostname}')
                else:
                    continue
            elif (_line.startswith('address')
                    or _line.startswith('netmask')
                    or _line.startswith('gateway')
                    or _line.startswith('dns-nameserver')):
                continue
            else:
                raise IfError(f'Unexpected config line: {line}')
        if len(new_lines) == 2 and hostname:
            new_lines.append(f'    hostname {hostname}')
        return new_lines

    def set_dhcp(self, ifname: str) -> None:
        ifconf = self._preproc_if(self.conf[ifname])
        ifconf[1] = f'iface {ifname} inet dhcp'

        ifconf_str = "\n".join(ifconf)
        self.write_conf(ifname, ifconf_str)

    def set_manual(self, ifname: str) -> None:
        ifconf = self._preproc_if(self.conf[ifname])
        ifconf[1] = f'iface {ifname} inet manual'
        ifconf_str = "\n".join(ifconf)
        self.write_conf(ifname, ifconf_str)

    def set_static(self, ifname: str, addr: str, netmask: str,
                   gateway: Optional[str] = None,
                   nameservers: Optional[list[str]] = None
                   ) -> None:
        ifconf = self._preproc_if(self.conf[ifname])
        ifconf[1] = f'iface {ifname} inet static'

        ifconf.extend([f"    address {addr}",
                       f"    netmask {netmask}"])
        if gateway:
            ifconf.append(f"    gateway {gateway}")
        if nameservers:
            ifconf.append(f"    dns-nameservers {' '.join(nameservers)}")

        ifconf_str = "\n".join(ifconf)
        self.write_conf(ifname, ifconf_str)


class EtcNetworkInterface:
    """enumerate interface information from /etc/network/interfaces"""

    def __init__(self, ifname: str):
        self.ifname = ifname

        interfaces = EtcNetworkInterfaces()

        self.conflines = []
        if ifname in interfaces.conf:
            self.conflines = interfaces.conf[ifname].splitlines()

    def _parse_attr(self, attr: str) -> list[str]:
        for line in self.conflines:

            vals = line.strip().split()
            if not vals:
                continue

            if vals[0] == attr:
                return vals

        return []

    @property
    def method(self) -> Optional[str]:
        try:
            return self._parse_attr('iface')[3]
        except IndexError:
            return None

    @property
    def dns_nameservers(self) -> list[str]:
        return self._parse_attr('dns-nameservers')[1:]

    def __getattr__(self, attrname: str) -> list[str]:
        # attributes with multiple values will be returned in an array
        # exception: dns-nameservers always returns in array (expected)

        attrname = attrname.replace('_', '-')
        values = self._parse_attr(attrname)
        if len(values) > 2:
            return values[1:]
        elif len(values) > 1:
            return [values[1]]

        return []


def get_nameservers(ifname: str) -> list[str]:

    # /etc/network/interfaces (static)
    interface = EtcNetworkInterface(ifname)
    if interface.dns_nameservers:
        return interface.dns_nameservers

    def parse_resolv(path: str) -> list[str]:
        nameservers = []
        with open(path, 'r') as fob:
            for line in fob:
                if line.startswith('nameserver'):
                    nameservers.append(line.strip().split()[1])
        return nameservers

    # resolvconf (dhcp)
    path = '/etc/resolvconf/run/interface'
    if os.path.exists(path):
        for f in os.listdir(path):
            if not f.startswith(ifname) or f.endswith('.inet'):
                continue

            nameservers = parse_resolv(os.path.join(path, f))
            if nameservers:
                return nameservers

    # /etc/resolv.conf (fallback)
    nameservers = parse_resolv('/etc/resolv.conf')
    if nameservers:
        return nameservers

    return []


def ifup(ifname: str) -> str:
    subprocess.run(["ifup", "--force", "--ignore-errors", ifname],
                   stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


def ifdown(ifname: str) -> subprocess.CompletedProcess[bytes]:
    subprocess.run(["ifdown", "--force", "--ignore-errors", ifname],
                   stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


def unconfigure_if(ifname: str) -> Optional[str]:
    try:
        ifdown(ifname)
        interfaces = EtcNetworkInterfaces()
        interfaces.set_manual(ifname)
        subprocess.check_output(['ifconfig', ifname, '0.0.0.0'])
        ifup(ifname)
        return None
    except subprocess.CalledProcessError as e:
        return str(e)


def set_static(ifname: str, addr: str, netmask: str,
               gateway: str, nameservers: list[str]
               ) -> Optional[str]:
    try:
        ifdown(ifname)
        interfaces = EtcNetworkInterfaces()
        interfaces.set_static(ifname, addr, netmask, gateway, nameservers)

        # FIXME when issue in ifupdown/virtio-net becomes apparent
        sleep(0.5)

        output = ifup(ifname)

        net = InterfaceInfo(ifname)
        if not net.address:
            raise IfError(f'Error obtaining IP address\n\n{output}')
        return None
    except Exception as e:
        return str(e)


def set_dhcp(ifname: str) -> Optional[str]:
    try:
        ifdown(ifname)
        interfaces = EtcNetworkInterfaces()
        interfaces.set_dhcp(ifname)
        output = ifup(ifname)

        net = InterfaceInfo(ifname)
        if not net.address:
            raise IfError(f'Error obtaining IP address\n\n{output}')
        return None
    except Exception as e:
        return str(e)


def get_ipconf(ifname: str, error: bool = False
               ) -> tuple[str, str, Optional[str], list[str]]:
    for i in range(6):
        net = InterfaceInfo(ifname)
        if net.address is not None and net.netmask is not None:
            gateway = net.get_gateway(error)
            return (net.address, net.netmask, gateway, get_nameservers(ifname))
        sleep(0.1)
    # no interface which is up; this is ok though
    return (None, None, net.get_gateway(error), get_nameservers(ifname))

def get_ifmethod(ifname: str) -> Optional[str]:
    interface = EtcNetworkInterface(ifname)
    return interface.method