HEX
Server: nginx/1.24.0
System: Linux webserver 6.8.0-87-generic #88-Ubuntu SMP PREEMPT_DYNAMIC Sat Oct 11 09:28:41 UTC 2025 x86_64
User: wpuser (1002)
PHP: 8.3.6
Disabled: NONE
Upload Files
File: //usr/lib/python3/dist-packages/cloudinit/net/networkd.py
# Copyright (C) 2021-2025 VMware by Broadcom.
#
# Author: Shreenidhi Shedi <yesshedi@gmail.com>
#
# This file is part of cloud-init. See LICENSE file for license information.

import logging
from typing import Any, Dict, List, Optional

from cloudinit import subp, util
from cloudinit.net import renderer, should_add_gateway_onlink_flag
from cloudinit.net.network_state import NetworkState

LOG = logging.getLogger(__name__)


def normalize(data: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
    """
    Normalize a dictionary of lists.
    - Assumes top-level keys map to lists.
    - Each list and any nested dicts/lists will be recursively normalized.
    """
    normalized = {}
    for key, value in data.items():
        normalized[key] = _normalize_value(value)
    return normalized


def _normalize_value(data: Any) -> Any:
    """
    Recursively normalize a dictionary or list:
    - Dicts: keys sorted, values normalized
    - Lists: items normalized and sorted (if comparable)
    """
    if isinstance(data, dict):
        normalized = {}
        for key in sorted(data):
            normalized[key] = _normalize_value(data[key])
        return normalized
    elif isinstance(data, list):
        normalized_items = []
        for item in data:
            if isinstance(item, (dict, list)):
                normalized_item = _normalize_value(item)
            else:
                normalized_item = item
            normalized_items.append(normalized_item)
        try:
            return sorted(normalized_items)
        except TypeError:
            return normalized_items

    return data


class CfgParser:
    def __init__(self):
        self.conf_dict = {
            "Match": [],
            "Link": [],
            "Network": [],
            "DHCPv4": [],
            "DHCPv6": [],
            "Address": [],
            "Route": {},
            "NetDev": [],
            "VLAN": [],
            "Bond": [],
        }

    def update_section(self, sec, key, val):
        for k in self.conf_dict.keys():
            if k == sec:
                self.conf_dict[k].append(f"{key}={val}")
                self.conf_dict[k] = list(dict.fromkeys(self.conf_dict[k]))
                self.conf_dict[k].sort()

    def update_route_section(self, sec, rid, key, val):
        """
        For each route section we use rid as a key, this allows us to isolate
        this route from others on subsequent calls.
        """
        for k in self.conf_dict.keys():
            if k == sec:
                if rid not in self.conf_dict[k]:
                    self.conf_dict[k][rid] = []
                self.conf_dict[k][rid].append(f"{key}={val}")
                # remove duplicates from list
                self.conf_dict[k][rid] = list(
                    dict.fromkeys(self.conf_dict[k][rid])
                )
                self.conf_dict[k][rid].sort()

    def get_final_conf(self):
        contents = ""

        self.conf_dict = normalize(self.conf_dict)

        for k, v in sorted(self.conf_dict.items()):
            if not v:
                continue
            if k == "Address":
                for e in sorted(v):
                    contents += f"[{k}]\n{e}\n\n"
            elif k == "Route":
                for n in sorted(v):
                    contents += f"[{k}]\n"
                    for e in sorted(v[n]):
                        contents += f"{e}\n"
                    contents += "\n"
            else:
                contents += f"[{k}]\n"
                for e in sorted(v):
                    contents += f"{e}\n"
                contents += "\n"

        return contents


class Renderer(renderer.Renderer):
    """
    Renders network information in /etc/systemd/network

    This Renderer is currently experimental and doesn't support all the
    use cases supported by the other renderers yet.
    """

    def __init__(self, config=None):
        if not config:
            config = {}
        self.resolve_conf_fn = config.get(
            "resolve_conf_fn", "/etc/systemd/resolved.conf"
        )
        self.network_conf_dir = config.get(
            "network_conf_dir", "/etc/systemd/network/"
        )

    def generate_match_section(self, iface, cfg: CfgParser):
        sec = "Match"
        match_dict = {
            "name": "Name",
            "driver": "Driver",
        }

        if iface["type"] == "physical":
            match_dict["mac_address"] = "MACAddress"

        if not iface:
            return

        for k, v in match_dict.items():
            if k in iface and iface[k]:
                cfg.update_section(sec, v, iface[k])

        return iface["name"]

    def generate_link_section(self, iface, cfg: CfgParser):
        sec = "Link"

        if not iface:
            return

        if iface.get("mtu"):
            cfg.update_section(sec, "MTUBytes", iface["mtu"])

        if iface["type"] != "physical" and iface.get("mac_address"):
            cfg.update_section(sec, "MACAddress", iface["mac_address"])

        if "optional" in iface and iface["optional"]:
            cfg.update_section(sec, "RequiredForOnline", "no")

    def parse_routes(self, rid, conf, cfg: CfgParser):
        """
        Parse a route and use rid as a key in order to isolate the route from
        others in the route dict.
        """
        sec = "Route"
        route_cfg_map = {
            "gateway": "Gateway",
            "network": "Destination",
            "metric": "Metric",
        }

        # prefix is derived using netmask by network_state
        prefix = ""
        if "prefix" in conf:
            prefix = f"/{conf['prefix']}"

        for k, v in conf.items():
            if k not in route_cfg_map:
                continue
            if k == "network":
                v += prefix
            cfg.update_route_section(sec, rid, route_cfg_map[k], v)

    def parse_subnets(self, iface, cfg: CfgParser):
        dhcp = "no"
        sec = "Network"
        rid = 0
        for e in iface.get("subnets", []):
            t = e["type"]
            if t in {"dhcp4", "dhcp"}:
                if dhcp == "no":
                    dhcp = "ipv4"
                elif dhcp == "ipv6":
                    dhcp = "yes"
            elif t == "dhcp6":
                if dhcp == "no":
                    dhcp = "ipv6"
                elif dhcp == "ipv4":
                    dhcp = "yes"
            if "routes" in e and e["routes"]:
                for i in e["routes"]:
                    # Use "r" as a dict key prefix for this route to isolate
                    # it from other sources of routes
                    self.parse_routes(f"r{rid}", i, cfg)
                    rid = rid + 1
            if "address" in e:
                addr = e["address"]
                if "prefix" in e:
                    addr += f"/{e['prefix']}"
                subnet_cfg_map = {
                    "address": "Address",
                    "gateway": "Gateway",
                    "dns_nameservers": "DNS",
                    "dns_search": "Domains",
                }
                for k, v in e.items():
                    if k == "address":
                        cfg.update_section("Address", subnet_cfg_map[k], addr)
                    elif k == "gateway":
                        # Use "a" as a dict key prefix for this route to
                        # isolate it from other sources of routes
                        cfg.update_route_section(
                            "Route", f"a{rid}", subnet_cfg_map[k], v
                        )
                        if should_add_gateway_onlink_flag(v, addr):
                            LOG.debug(
                                "Gateway %s is not contained within subnet %s,"
                                " adding GatewayOnLink flag",
                                v,
                                addr,
                            )
                            cfg.update_route_section(
                                "Route", f"a{rid}", "GatewayOnLink", "yes"
                            )
                        rid = rid + 1
                    elif k in {"dns_nameservers", "dns_search"}:
                        cfg.update_section(sec, subnet_cfg_map[k], " ".join(v))

        cfg.update_section(sec, "DHCP", dhcp)

        if isinstance(iface.get("accept-ra"), bool):
            val = "no"
            if iface["accept-ra"]:
                val = "yes"
            cfg.update_section(sec, "IPv6AcceptRA", val)

        return dhcp

    # This is to accommodate extra keys present in VMware config
    def dhcp_domain(self, d, cfg: CfgParser):
        for item in ["dhcp4domain", "dhcp6domain"]:
            if item not in d:
                continue
            ret = str(d[item]).casefold()
            try:
                ret = util.translate_bool(ret)
                ret = "yes" if ret else "no"
            except ValueError:
                if ret != "route":
                    LOG.warning("Invalid dhcp4domain value - %s", ret)
                    ret = "no"
            if item == "dhcp4domain":
                section = "DHCPv4"
            else:
                section = "DHCPv6"
            cfg.update_section(section, "UseDomains", ret)

    def parse_dns(self, iface, cfg: CfgParser, ns: NetworkState):
        sec = "Network"

        dns = iface.get("dns")
        if not dns and ns.version == 1:
            dns = {
                "search": ns.dns_searchdomains,
                "nameservers": ns.dns_nameservers,
            }
        elif not dns and ns.version == 2:
            return

        if dns.get("search"):
            cfg.update_section(sec, "Domains", " ".join(dns["search"]))
        if dns.get("nameservers"):
            cfg.update_section(sec, "DNS", " ".join(dns["nameservers"]))

    def parse_dhcp_overrides(self, cfg: CfgParser, device, dhcp, version):
        dhcp_config_maps = {
            "UseDNS": "use-dns",
            "UseDomains": "use-domains",
            "UseHostname": "use-hostname",
            "UseNTP": "use-ntp",
        }

        if version == "4":
            dhcp_config_maps.update(
                {
                    "SendHostname": "send-hostname",
                    "Hostname": "hostname",
                    "RouteMetric": "route-metric",
                    "UseMTU": "use-mtu",
                    "UseRoutes": "use-routes",
                }
            )

        if f"dhcp{version}-overrides" in device and dhcp in [
            "yes",
            f"ipv{version}",
        ]:
            dhcp_overrides = device[f"dhcp{version}-overrides"]
            for k, v in dhcp_config_maps.items():
                if v in dhcp_overrides:
                    cfg.update_section(f"DHCPv{version}", k, dhcp_overrides[v])

    def create_network_file(self, link, conf, nwk_dir, ext=".network"):
        net_fn_owner = "systemd-network"

        LOG.debug("Setting Networking Config for %s", link)
        net_fn = f"{nwk_dir}10-cloud-init-{link}{ext}"

        if conf.endswith("\n\n"):
            conf = conf[:-1]
        util.write_file(net_fn, conf)
        util.chownbyname(net_fn, net_fn_owner, net_fn_owner)

    def render_network_state(
        self,
        network_state: NetworkState,
        templates: Optional[dict] = None,
        target=None,
    ) -> None:
        network_dir = self.network_conf_dir
        if target:
            network_dir = subp.target_path(target) + network_dir

        util.ensure_dir(network_dir)

        network = self._render_content(network_state)
        vlan_netdev = network.pop("vlan_netdev", {})
        bond_netdev = network.pop("bond_netdev", {})

        for k, v in network.items():
            self.create_network_file(k, v, network_dir)

        for k, v in vlan_netdev.items():
            self.create_network_file(k, v, network_dir, ext=".netdev")

        for k, v in bond_netdev.items():
            self.create_network_file(k, v, network_dir, ext=".netdev")

    def _render_content(self, ns: NetworkState):
        ret_dict = {}
        vlan_link = {}
        bond_link = {}

        if "vlans" in ns.config:
            vlan_dict = self.render_vlans(ns)
            vlan_netdev = vlan_dict["vlan_netdev"]
            vlan_link = vlan_dict["vlan_link"]
            ret_dict["vlan_netdev"] = vlan_netdev

        if "bonds" in ns.config:
            bond_dict = self.render_bonds(ns)
            bond_netdev = bond_dict["bond_netdev"]
            bond_link = bond_dict["bond_link"]
            ret_dict["bond_netdev"] = bond_netdev

        for iface in ns.iter_interfaces():
            cfg = CfgParser()

            iface_name = iface["name"]

            vlan_link_name = vlan_link.get(iface_name)
            if vlan_link_name:
                cfg.update_section("Network", "VLAN", vlan_link_name)

            # TODO: revisit this once network state renders macaddress
            # properly for vlan config
            if not iface["mac_address"] and vlan_link.get("macaddress"):
                mac = vlan_link["macaddress"].get(iface_name)
                if mac:
                    iface["mac_address"] = mac

            bond_link_name = bond_link.get(iface_name)
            if bond_link_name:
                cfg.update_section("Network", "Bond", bond_link_name)

            # TODO: revisit this once network state renders macaddress
            # properly for bond config
            if not iface["mac_address"] and bond_link.get("macaddress"):
                mac = bond_link["macaddress"].get(iface_name)
                if mac:
                    iface["mac_address"] = mac

            link = self.generate_match_section(iface, cfg)

            self.generate_link_section(iface, cfg)
            dhcp = self.parse_subnets(iface, cfg)
            self.parse_dns(iface, cfg, ns)

            rid = 0
            for route in ns.iter_routes():
                # Use "c" as a dict key prefix for this route to isolate it
                # from other sources of routes
                self.parse_routes(f"c{rid}", route, cfg)
                rid = rid + 1

            if ns.version == 2:
                name: Optional[str] = iface["name"]
                # network state doesn't give dhcp domain info
                # using ns.config as a workaround here

                # Check to see if this interface matches against an interface
                # from the network state that specified a set-name directive.
                # If there is a device with a set-name directive and it has
                # set-name value that matches the current name, then update the
                # current name to the device's name. That will be the value in
                # the ns.config['ethernets'] dict below.
                for dev_name, dev_cfg in ns.config["ethernets"].items():
                    if "set-name" in dev_cfg:
                        if dev_cfg.get("set-name") == name:
                            name = dev_name
                            break
                if name in ns.config["ethernets"]:
                    device = ns.config["ethernets"][name]

                    # dhcp{version}domain are extra keys only present in
                    # VMware config
                    self.dhcp_domain(device, cfg)
                    for version in ["4", "6"]:
                        if (
                            f"dhcp{version}domain" in device
                            and "use-domains"
                            in device.get(f"dhcp{version}-overrides", {})
                        ):
                            exception = (
                                f"{name} has both dhcp{version}domain"
                                f" and dhcp{version}-overrides.use-domains"
                                f" configured. Use one"
                            )
                            raise RuntimeError(exception)

                        self.parse_dhcp_overrides(cfg, device, dhcp, version)

            ret_dict.update({link: cfg.get_final_conf()})

        return ret_dict

    def render_vlans(self, ns: NetworkState) -> dict:
        vlan_link_info: Dict[str, Any] = {}
        vlan_ndev_configs = {}
        vlan_link_info["macaddress"] = {}

        vlans = ns.config.get("vlans", {})
        for vlan_name, vlan_cfg in vlans.items():
            vlan_id = vlan_cfg.get("id")
            parent = vlan_cfg.get("link")

            if vlan_id is None or parent is None:
                LOG.warning(
                    "Skipping VLAN %s - missing 'id' or 'link'", vlan_name
                )
                continue

            vlan_link_info[parent] = vlan_name

            # -------- .netdev for VLAN --------
            cfg = CfgParser()
            cfg.update_section("NetDev", "Name", vlan_name)
            cfg.update_section("NetDev", "Kind", "vlan")

            val = vlan_cfg.get("mtu")
            if val:
                cfg.update_section("NetDev", "MTUBytes", val)

            val = vlan_cfg.get("macaddress")
            if val:
                val = val.lower()
                cfg.update_section("NetDev", "MACAddress", val)
                vlan_link_info["macaddress"][vlan_name] = val

            cfg.update_section("VLAN", "Id", vlan_id)
            vlan_ndev_configs[vlan_name] = cfg.get_final_conf()

        ret_dict = {
            "vlan_netdev": vlan_ndev_configs,
            "vlan_link": vlan_link_info,
        }
        return ret_dict

    def render_bonds(self, ns: NetworkState) -> dict:
        bond_link_info: Dict[str, Any] = {}
        bond_ndev_configs = {}
        section = "Bond"

        bond_link_info["macaddress"] = {}

        bonds = ns.config.get("bonds", {})
        for bond_name, bond_cfg in bonds.items():
            interfaces = bond_cfg.get("interfaces")
            if not interfaces:
                LOG.warning(
                    "Skipping bond %s - missing 'interfaces'", bond_name
                )
                continue

            bond_link_info.update({iface: bond_name for iface in interfaces})

            # -------- .netdev for Bond --------
            cfg = CfgParser()
            cfg.update_section("NetDev", "Name", bond_name)
            cfg.update_section("NetDev", "Kind", "bond")

            val = bond_cfg.get("mtu")
            if val:
                cfg.update_section("NetDev", "MTUBytes", val)

            val = bond_cfg.get("macaddress")
            if val:
                val = val.lower()
                cfg.update_section("NetDev", "MACAddress", val)
                bond_link_info["macaddress"][bond_name] = val

            # Optional bond parameters
            params = bond_cfg.get("parameters", {})

            if "mode" in params:
                cfg.update_section(section, "Mode", params["mode"])

            if "mii-monitor-interval" in params:
                cfg.update_section(
                    section,
                    "MIIMonitorSec",
                    f"{params['mii-monitor-interval']}ms",
                )

            if "updelay" in params:
                cfg.update_section(
                    section, "UpDelaySec", f"{params['updelay']}ms"
                )

            if "downdelay" in params:
                cfg.update_section(
                    section, "DownDelaySec", f"{params['downdelay']}ms"
                )

            if "arp-interval" in params:
                cfg.update_section(
                    section, "ARPIntervalSec", f"{params['arp-interval']}ms"
                )

            if "arp-ip-target" in params:
                targets = params["arp-ip-target"]
                if isinstance(targets, str):
                    targets = [targets]
                ip_list = " ".join(targets)
                cfg.update_section(section, "ARPIPTargets", ip_list)

            if "arp-validate" in params:
                cfg.update_section(
                    section, "ARPValidate", params["arp-validate"]
                )

            if "arp-all-targets" in params:
                cfg.update_section(
                    section, "ARPAllTargets", params["arp-all-targets"]
                )

            if "primary-reselect" in params:
                cfg.update_section(
                    section,
                    "PrimaryReselectPolicy",
                    params["primary-reselect"],
                )

            if "lacp-rate" in params:
                cfg.update_section(
                    section, "LACPTransmitRate", params["lacp-rate"]
                )

            if "transmit-hash-policy" in params:
                cfg.update_section(
                    section,
                    "TransmitHashPolicy",
                    params["transmit-hash-policy"],
                )

            if "ad-select" in params:
                cfg.update_section(section, "AdSelect", params["ad-select"])

            if "min-links" in params:
                cfg.update_section(
                    section, "MinLinks", str(params["min-links"])
                )

            if "all-slaves-active" in params:
                cfg.update_section(
                    section,
                    "AllSlavesActive",
                    str(params["all-slaves-active"]).lower(),
                )

            bond_ndev_configs[bond_name] = cfg.get_final_conf()

        ret_dict = {
            "bond_netdev": bond_ndev_configs,
            "bond_link": bond_link_info,
        }
        return ret_dict


def available(target=None):
    expected = ["ip", "systemctl"]
    search = ["/usr/sbin", "/bin"]
    for p in expected:
        if not subp.which(p, search=search):
            return False
    return True