diff --git a/docs/guides/testing-ldap-krb5.rst b/docs/guides/testing-ldap-krb5.rst index 53c7e2c7..1b26a103 100644 --- a/docs/guides/testing-ldap-krb5.rst +++ b/docs/guides/testing-ldap-krb5.rst @@ -26,6 +26,40 @@ That is enough for LDAP GSSAPI (``ldap_sasl_mech = gssapi``) without each test running ``ktadd``/upload itself. Tests still call ``client.sssd.common.krb5_auth(kdc)`` and configure the SSSD domain as usual. +Reusable client utilities +------------------------- + +LDAP/Kerberos system tests can share the following helpers (no per-test ``named`` +or ``getent`` boilerplate): + +* :class:`~sssd_test_framework.utils.tools.AHostSv4Entry` / + :meth:`~sssd_test_framework.utils.tools.GetentUtils.ahostsv4` — + first IPv4 from ``getent ahostsv4`` on the client (NSS, not ``dig``). + +* :meth:`~sssd_test_framework.utils.tools.GetentUtils.resolve_ipv4` — + ``client.tools.getent.resolve_ipv4(hostname, host=role.host)`` uses topology + ``host.ip`` when set, otherwise ``getent ahostsv4``. + +* :meth:`~sssd_test_framework.utils.network.NetworkUtils.dig` — + ``client.net.dig(name, server)`` for A/PTR/SRV checks (prefer over shell ``dig``). + +* :meth:`~sssd_test_framework.utils.network.NetworkUtils.prepare_ldap_krb5_srv_discovery` — + ensures ``_ldap._tcp`` and ``_kerberos._udp`` SRV for the discovery domain (lab DNS, + ``dns.test``, or local ``named`` on the client). + +* :meth:`~sssd_test_framework.utils.network.NetworkUtils.setup_sasl_canonicalize_bogus_ptr` — + local ``named`` + ``/etc/hosts`` setup for BZ 732935 (bogus PTR for the LDAP + server IP, forward A for the LDAP FQDN, ``resolv.conf`` → ``127.0.0.1``). + Files are backed up via ``client.fs`` and restored after the test. + +* :func:`~sssd_test_framework.misc.ip_to_ptr` — reverse zone name for an IPv4 + address (also used inside the bogus-PTR helper). + +Kerberos templates from :meth:`~sssd_test_framework.roles.kdc.KDC.config` include +``rdns = false`` in ``[libdefaults]`` so tests that call +``client.sssd.common.krb5_auth(kdc)`` do not need to edit ``/etc/krb5.conf`` for +that option. + .. seealso:: * :class:`sssd_test_framework.roles.kdc.KDC` diff --git a/sssd_test_framework/roles/kdc.py b/sssd_test_framework/roles/kdc.py index f611f257..949eed18 100644 --- a/sssd_test_framework/roles/kdc.py +++ b/sssd_test_framework/roles/kdc.py @@ -131,6 +131,7 @@ def config(self) -> str: ticket_lifetime = 24h renew_lifetime = 7d forwardable = yes + rdns = false [realms] {self.host.realm} = {{ diff --git a/sssd_test_framework/roles/ldap.py b/sssd_test_framework/roles/ldap.py index d2c8a62f..5e54da36 100644 --- a/sssd_test_framework/roles/ldap.py +++ b/sssd_test_framework/roles/ldap.py @@ -3,9 +3,11 @@ from __future__ import annotations import base64 +import tempfile +import time from datetime import datetime from enum import Enum -from typing import Any, Generic, TypeVar +from typing import TYPE_CHECKING, Any, Generic, TypeVar import ldap import ldap.ldapobject @@ -17,6 +19,9 @@ from .generic import GenericNetgroupMember, GenericPasswordPolicy, ProtocolName from .nfs import NFSExport +if TYPE_CHECKING: + from .kdc import KDC + __all__ = [ "LDAPRoleType", "LDAPPasswordPolicy", @@ -239,6 +244,197 @@ def setup(self) -> None: except ldap.TYPE_OR_VALUE_EXISTS: pass + def enable_gssapi(self, kdc: KDC) -> None: + """ + Configure Directory Server for GSSAPI/SASL authentication. + + This method sets up the LDAP server to accept GSSAPI (Kerberos) authentication + by creating a service principal, exporting the keytab, and configuring Directory Server. + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopology.LDAP_KRB5) + def test_ldap_gssapi(client: Client, ldap: LDAP, kdc: KDC): + # Enable GSSAPI on LDAP server + ldap.enable_gssapi(kdc) + + ldap.user('testuser').add() + kdc.principal('testuser').add() + + # Configure SSSD to use GSSAPI + client.sssd.domain["ldap_sasl_mech"] = "GSSAPI" + client.sssd.start() + + result = client.tools.id('testuser') + assert result is not None + + :param kdc: KDC role object to create service principal + :type kdc: KDC + """ + + # 1. Install required packages + self.host.conn.run( + "dnf install -y cyrus-sasl-gssapi krb5-workstation || " + "yum install -y cyrus-sasl-gssapi krb5-workstation", + ) + self.host.conn.run("rpm -q cyrus-sasl-gssapi") + + # 2. Create LDAP service principal + ldap_principal = f"ldap/{self.host.hostname}" + kdc.principal(ldap_principal).add(password=None) + + # 3. Export keytab to LDAP server (same transfer pattern as LDAPKRB5TopologyController) + keytab_path = "/etc/dirsrv/ds.keytab" + keytab_staging = "/tmp/sssd-test-framework-ds.keytab" + qualified_principal = kdc.qualify(ldap_principal) + kdc.host.conn.run(f"rm -f {keytab_staging}", raise_on_error=False) + kdc.host.conn.run(f"kadmin.local -q 'ktadd -k {keytab_staging} -norandkey \"{qualified_principal}\"'") + with tempfile.NamedTemporaryFile() as tmp: + kdc.host.fs.download(keytab_staging, tmp.name) + self.host.fs.upload(tmp.name, keytab_path) + kdc.host.conn.run(f"rm -f {keytab_staging}", raise_on_error=False) + self.host.conn.run(f"chown dirsrv:dirsrv {keytab_path}") + self.host.conn.run(f"chmod 600 {keytab_path}") + + # 4. Copy krb5.conf from KDC to LDAP server + krb5_conf = kdc.config() + self.host.conn.run(f"cat > /etc/krb5.conf << 'EOFKRB5'\n{krb5_conf}\nEOFKRB5") + + # Add default_keytab_name to krb5.conf as fallback + self.host.conn.run(f"sed -i '/\\[libdefaults\\]/a\\ default_keytab_name = {keytab_path}' /etc/krb5.conf") + + # 5. Configure Cyrus SASL to use the keytab + # This is critical - without this, the SASL GSSAPI plugin won't know where to find the keytab + self.host.conn.run( + f"mkdir -p /etc/sasl2 && " + f"cat > /etc/sasl2/slapd.conf << 'EOFSASL'\n" + f"mech_list: GSSAPI EXTERNAL PLAIN LOGIN\n" + f"keytab: {keytab_path}\n" + f"EOFSASL" + ) + + # Also create for other possible SASL application names + self.host.conn.run("cp /etc/sasl2/slapd.conf /etc/sasl2/ns-slapd.conf") + self.host.conn.run("cp /etc/sasl2/slapd.conf /etc/sasl2/ldap.conf") + + # 6. Set KRB5_KTNAME environment variable for Directory Server via sysconfig + # Note: systemd Environment= directives don't work reliably in containers + # Use EnvironmentFile instead + self.host.conn.run(f"echo 'KRB5_KTNAME={keytab_path}' > /etc/sysconfig/dirsrv-localhost") + + # 7. Configure SASL identity mapping in Directory Server + # Align default Kerberos maps with the data suffix (sssd-qe krb_credential_cache) and + # add a high-priority map for host/ldap service principals used by SSSD GSSAPI binds. + realm = kdc.realm + base_dn = self.naming_context + binddn = self.host.binddn + bindpw = self.host.bindpw + + sasl_ldif = "" + for cn in ( + "Kerberos uid mapping", + "rfc 2829 dn syntax", + "rfc 2829 u syntax", + "uid mapping", + ): + sasl_ldif += ( + f"dn: cn={cn},cn=mapping,cn=sasl,cn=config\n" + "changetype: modify\n" + "replace: nsSaslMapBaseDNTemplate\n" + f"nsSaslMapBaseDNTemplate: {base_dn}\n" + "\n" + ) + self.host.conn.run( + f"ldapmodify -x -D '{binddn}' -w '{bindpw}' -H ldap://localhost", + input=sasl_ldif, + ) + for cn in ( + "SSSD service principals", + "SSSD service principals no realm", + ): + self.host.conn.run( + f"ldapmodify -x -D '{binddn}' -w '{bindpw}' -H ldap://localhost", + input=(f"dn: cn={cn},cn=mapping,cn=sasl,cn=config\n" "changetype: delete\n"), + raise_on_error=False, + ) + + # cn=Directory Manager is a bind identity, not a searchable LDAP entry (BASE + # search returns No such object). Map GSSAPI clients to a real entry under the + # data suffix, per 389-ds server-to-server SASL examples (full target DN + + # (objectclass=*)). + people_ou = f"ou=People,{base_dn}" + gssapi_proxy_dn = f"uid=sssd-gssapi,{people_ou}" + bootstrap_ldif = ( + f"dn: {people_ou}\n" + "changetype: add\n" + "objectClass: organizationalUnit\n" + "ou: People\n" + "\n" + f"dn: {gssapi_proxy_dn}\n" + "changetype: add\n" + "objectClass: top\n" + "objectClass: person\n" + "objectClass: organizationalPerson\n" + "objectClass: inetOrgPerson\n" + "cn: SSSD GSSAPI proxy\n" + "sn: proxy\n" + "uid: sssd-gssapi\n" + ) + self.host.conn.run( + f"ldapmodify -x -D '{binddn}' -w '{bindpw}' -H ldap://localhost", + input=bootstrap_ldif, + raise_on_error=False, + ) + service_map_ldif = ( + "dn: cn=SSSD service principals,cn=mapping,cn=sasl,cn=config\n" + "changetype: add\n" + "objectClass: top\n" + "objectClass: nsSaslMapping\n" + "cn: SSSD service principals\n" + f"nsSaslMapRegexString: ^(host|ldap)/.*@{realm}$\n" + f"nsSaslMapBaseDNTemplate: {gssapi_proxy_dn}\n" + "nsSaslMapFilterTemplate: (objectclass=*)\n" + "nsSaslMapPriority: 10\n" + "\n" + "dn: cn=SSSD service principals no realm,cn=mapping,cn=sasl,cn=config\n" + "changetype: add\n" + "objectClass: top\n" + "objectClass: nsSaslMapping\n" + "cn: SSSD service principals no realm\n" + "nsSaslMapRegexString: ^(host|ldap)/.*$\n" + f"nsSaslMapBaseDNTemplate: {gssapi_proxy_dn}\n" + "nsSaslMapFilterTemplate: (objectclass=*)\n" + "nsSaslMapPriority: 11\n" + ) + self.host.conn.run( + f"ldapmodify -x -D '{binddn}' -w '{bindpw}' -H ldap://localhost", + input=service_map_ldif, + ) + verify = self.host.conn.run( + f"ldapsearch -x -D '{binddn}' -w '{bindpw}' -H ldap://localhost " + f"-b '{gssapi_proxy_dn}' -s base '(objectclass=*)' dn", + raise_on_error=False, + ) + if verify.rc != 0 or "dn:" not in (verify.stdout or ""): + raise RuntimeError( + f"SASL GSSAPI proxy entry {gssapi_proxy_dn} is not searchable: " f"{verify.stdout or verify.stderr}" + ) + + # 8. Reload systemd and restart Directory Server + self.host.conn.run("systemctl daemon-reload") + self.host.conn.run("systemctl restart dirsrv@localhost") + + # Wait for Directory Server to fully start with GSSAPI support + time.sleep(3) + + klist = self.host.conn.run(f"klist -kt {keytab_path}", raise_on_error=False) + if qualified_principal not in (klist.stdout or ""): + raise RuntimeError( + f"LDAP GSSAPI keytab {keytab_path} does not contain {qualified_principal}: " + f"{klist.stdout or klist.stderr}" + ) + def fqn(self, name: str) -> str: """ Return fully qualified name in form name@domain. diff --git a/sssd_test_framework/utils/network.py b/sssd_test_framework/utils/network.py index 8cc784d7..e39932dc 100644 --- a/sssd_test_framework/utils/network.py +++ b/sssd_test_framework/utils/network.py @@ -2,6 +2,7 @@ from __future__ import annotations +import re from typing import Any import jc @@ -9,12 +10,75 @@ from pytest_mh.conn import ProcessResult from pytest_mh.utils.fs import LinuxFileSystem -from ..misc import ip_is_valid +from ..misc import ip_is_valid, ip_to_ptr from ..misc.ssh import SSHKillableProcess __all__ = ["NetworkUtils", "IPUtils"] +def _inject_named_forwarders(fs: LinuxFileSystem, forwarder_ips: list[str]) -> None: + """ + Add DNS forwarders to local named so 127.0.0.1 can resolve external names. + + :param fs: Remote host file system (/etc/named.conf). + :type fs: LinuxFileSystem + :param forwarder_ips: Upstream resolver addresses from the original resolv.conf. + :type forwarder_ips: list[str] + """ + if not forwarder_ips: + return + + named_conf = fs.read("/etc/named.conf") + if "forwarders" in named_conf: + return + + forwarders_line = " forwarders { " + "; ".join(forwarder_ips) + "; };\n" + new_lines: list[str] = [] + inserted = False + for line in named_conf.splitlines(keepends=True): + new_lines.append(line) + if not inserted and line.strip() == "options {": + new_lines.append(forwarders_line) + inserted = True + + if inserted: + fs.write("/etc/named.conf", "".join(new_lines)) + + +def _inject_named_options_for_local_dns(fs: LinuxFileSystem) -> None: + """ + Set listen-on 127.0.0.1 and a single dnssec-validation no in named.conf. + + :param fs: Remote host file system (/etc/named.conf). + :type fs: LinuxFileSystem + """ + named_conf = fs.read("/etc/named.conf") + lines = named_conf.splitlines(keepends=True) + filtered: list[str] = [] + for line in lines: + if re.match(r"^\s*dnssec-validation\s+", line): + continue + filtered.append(line) + named_conf = "".join(filtered) + + extra_options = " dnssec-validation no;\n" + if "listen-on port 53 { 127.0.0.1" not in named_conf: + extra_options = ( + " listen-on port 53 { 127.0.0.1; };\n" " listen-on-v6 port 53 { ::1; };\n" + ) + extra_options + + new_lines: list[str] = [] + inserted = False + for line in named_conf.splitlines(keepends=True): + new_lines.append(line) + if not inserted and line.strip() == "options {": + new_lines.append(extra_options) + inserted = True + + if inserted: + fs.write("/etc/named.conf", "".join(new_lines)) + + class NetworkUtils(MultihostUtility[MultihostHost]): def __init__(self, host: MultihostHost, fs: LinuxFileSystem) -> None: @@ -103,8 +167,13 @@ def dig(self, address: str, server: str | None = None) -> list[dict] | None: :return: List of dig results. :rtype: list[dict] """ - server = f"@{server}" if server else "" - args = f"+norecurse {'-x ' if ip_is_valid(address) else ''}{address} {server}" + rev = "-x " if ip_is_valid(address) else "" + if server: + args = f"+norecurse @{server} {rev}{address}" + else: + args = f"+norecurse {rev}{address}" + if address.startswith("_") and " " not in address: + args += " SRV" try: output = self.host.conn.run(f"dig {args}").stdout @@ -115,8 +184,13 @@ def dig(self, address: str, server: str | None = None) -> list[dict] | None: if not isinstance(parsed_output, list) or not parsed_output: return None - result = parsed_output[0].get("answer", []) - if not isinstance(result, list) or not result: + result: list[dict] = [] + for section in ("answer", "additional"): + section_records = parsed_output[0].get(section, []) + if isinstance(section_records, list): + result.extend(section_records) + + if not result: return None required_keys = {"name", "type", "ttl", "data"} @@ -145,6 +219,490 @@ def dig(self, address: str, server: str | None = None) -> list[dict] | None: return records if records else None + def has_srv_record(self, query: str, server: str | None = None) -> bool: + """ + Return True if query has at least one SRV record. + + Uses dig() first, then dig +short when jc omits SRV from the parsed answer. + + :param query: SRV query name (e.g. _ldap._tcp.ldap.test). + :type query: str + :param server: Optional resolver (@127.0.0.1); default is the system resolver. + :type server: str | None, optional + :return: True when at least one SRV record is present. + :rtype: bool + """ + records = self.dig(query, server) + if records and any(r.get("type") == "SRV" for r in records): + return True + + if server: + short_cmd = f"dig +norecurse +short @{server} {query} SRV" + else: + short_cmd = f"dig +norecurse +short {query} SRV" + short = self.host.conn.run(short_cmd, raise_on_error=False) + return short.rc == 0 and bool(short.stdout.strip()) + + def _resolve_host_ipv4(self, hostname: str) -> str: + """ + Return the first IPv4 address for hostname via getent ahostsv4. + + :param hostname: FQDN to resolve on this host. + :type hostname: str + :return: IPv4 dotted-quad string. + :rtype: str + :raises RuntimeError: if the lookup fails or returns no address. + """ + lookup = self.host.conn.run(f"getent ahostsv4 {hostname}", raise_on_error=False) + if lookup.rc != 0 or not lookup.stdout: + raise RuntimeError(f"Could not resolve {hostname} for DNS SRV setup") + return lookup.stdout.split()[0] + + def _role_ipv4(self, hostname: str, *, role_host: object | None = None, label: str = "host") -> str: + """ + Resolve role IPv4 using multihost host.ip when set, else getent ahostsv4. + + :param hostname: FQDN to resolve on this host. + :type hostname: str + :param role_host: Optional multihost role host with ip attribute. + :type role_host: object | None, optional + :param label: Role name for error messages (e.g. LDAP, KDC). + :type label: str, optional + :return: IPv4 dotted-quad string. + :rtype: str + :raises RuntimeError: if the lookup fails or returns no address. + """ + if role_host is not None: + role_ip = getattr(role_host, "ip", None) + if role_ip: + return str(role_ip) + try: + return self._resolve_host_ipv4(hostname) + except RuntimeError as exc: + raise RuntimeError( + f"Setup: could not resolve {label} to IPv4 on client " f"({hostname}; need A record or role host IP)" + ) from exc + + def _pin_discovery_hostnames( + self, + *, + ldap_ip: str, + ldap_hostname: str, + kdc_ip: str, + kdc_hostname: str, + ) -> None: + """ + Pin LDAP and KDC FQDNs in /etc/hosts. + + krb5_child uses kdc = host:88 from /etc/krb5.conf, not SSSD krb5_server SRV. + + :param ldap_ip: LDAP server IPv4 address. + :type ldap_ip: str + :param ldap_hostname: LDAP server FQDN (e.g. master.ldap.test). + :type ldap_hostname: str + :param kdc_ip: KDC IPv4 address. + :type kdc_ip: str + :param kdc_hostname: KDC FQDN (e.g. kdc.test). + :type kdc_hostname: str + :raises RuntimeError: if both hosts share one IP or pinning does not take effect. + """ + if ldap_hostname != kdc_hostname and ldap_ip == kdc_ip: + raise RuntimeError( + f"{ldap_hostname} and {kdc_hostname} resolved to the same address " + f"({ldap_ip}); krb5_child cannot reach a KDC on the LDAP host" + ) + + self.fs.backup("/etc/hosts") + hosts_lines: list[str] = [] + for line in self.fs.read("/etc/hosts").splitlines(): + parts = line.split() + if len(parts) >= 1 and parts[0] in (ldap_ip, kdc_ip): + continue + if len(parts) >= 2 and (ldap_hostname in parts[1:] or kdc_hostname in parts[1:]): + continue + hosts_lines.append(line) + hosts_lines.append(f"{ldap_ip}\t{ldap_hostname}") + hosts_lines.append(f"{kdc_ip}\t{kdc_hostname}") + self.fs.write("/etc/hosts", "\n".join(hosts_lines).rstrip() + "\n") + self.host.conn.run("nscd -i hosts 2>/dev/null || true", raise_on_error=False) + + if self._resolve_host_ipv4(ldap_hostname) != ldap_ip: + raise RuntimeError(f"{ldap_hostname} does not resolve to {ldap_ip} after pinning") + if self._resolve_host_ipv4(kdc_hostname) != kdc_ip: + raise RuntimeError(f"{kdc_hostname} does not resolve to {kdc_ip} after pinning") + + def _probe_kdc_port(self, kdc_ip: str, kdc_hostname: str, port: int) -> None: + """ + Check that the KDC accepts TCP connections on port. + + :param kdc_ip: KDC IPv4 address (after hosts pinning). + :type kdc_ip: str + :param kdc_hostname: KDC FQDN (for error messages). + :type kdc_hostname: str + :param port: Kerberos port (usually 88). + :type port: int + :raises RuntimeError: if the TCP probe fails. + """ + probe = self.host.conn.run( + f"timeout 3 bash -c 'echo > /dev/tcp/{kdc_ip}/{port}'", + raise_on_error=False, + ) + if probe.rc != 0: + raise RuntimeError(f"KDC {kdc_hostname} ({kdc_ip}:{port}) is not reachable from {self.host.hostname}") + + def prepare_ldap_krb5_srv_discovery( + self, + *, + discovery_domain: str, + ldap_hostname: str, + kdc_hostname: str, + client_hostname: str, + ldap_port: int = 389, + kdc_port: int = 88, + ) -> None: + """ + Prepare DNS for LDAP/Kerberos SRV discovery tests on this host. + + Uses the current resolver, then dns.test, else local named on 127.0.0.1. + Pins LDAP/KDC in /etc/hosts and verifies KDC port 88 for password auth. + + :param discovery_domain: DNS domain for SRV records (e.g. ldap.test for + _ldap._tcp / _kerberos._udp). + :type discovery_domain: str + :param ldap_hostname: LDAP server FQDN published in SRV and A records. + :type ldap_hostname: str + :param kdc_hostname: KDC FQDN published in SRV and /etc/krb5.conf. + :type kdc_hostname: str + :param client_hostname: This host FQDN (SOA/NS in a local zone when needed). + :type client_hostname: str + :param ldap_port: LDAP port in the SRV record (default 389). + :type ldap_port: int, optional + :param kdc_port: Kerberos port in the SRV record and TCP probe (default 88). + :type kdc_port: int, optional + :raises RuntimeError: if SRV setup, name pinning, or KDC connectivity fails. + """ + ldap_srv = f"_ldap._tcp.{discovery_domain}" + krb_srv = f"_kerberos._udp.{discovery_domain}" + + ldap_ip = self._resolve_host_ipv4(ldap_hostname) + kdc_ip = self._resolve_host_ipv4(kdc_hostname) + used_local_named = False + + if not (self.has_srv_record(ldap_srv) and self.has_srv_record(krb_srv)): + dns_probe = self.host.conn.run("getent hosts dns.test", raise_on_error=False) + if dns_probe.rc == 0 and dns_probe.stdout: + dns_ip = dns_probe.stdout.split()[0] + if self.has_srv_record(ldap_srv, dns_ip) and self.has_srv_record(krb_srv, dns_ip): + self.fs.backup("/etc/resolv.conf") + self.fs.write( + "/etc/resolv.conf", + f"search {discovery_domain}\nnameserver {dns_ip}\n", + ) + + if not (self.has_srv_record(ldap_srv) and self.has_srv_record(krb_srv)): + used_local_named = True + if self.host.conn.run("rpm -q bind", raise_on_error=False).rc != 0: + self.host.conn.run( + "dnf install -y bind bind-utils || yum install -y bind bind-utils", + raise_on_error=False, + ) + if self.host.conn.run("rpm -q bind", raise_on_error=False).rc != 0: + raise RuntimeError("bind package is required (dnf install -y bind bind-utils)") + + ldap_short = ldap_hostname.removesuffix(f".{discovery_domain}").rstrip(".") + kdc_short = kdc_hostname.removesuffix(f".{discovery_domain}").rstrip(".") + forward_zone_path = f"/var/named/{discovery_domain}" + + self.fs.backup("/etc/resolv.conf") + self.fs.backup("/etc/named.conf") + self.fs.backup(forward_zone_path) + + zone_marker = f'zone "{discovery_domain}"' + named_conf = self.fs.read("/etc/named.conf") + if zone_marker not in named_conf: + named_conf += ( + f'zone "{discovery_domain}" {{\n' + " type master;\n" + " check-names ignore;\n" + f' file "{discovery_domain}";\n' + "};\n" + ) + self.fs.write("/etc/named.conf", named_conf) + + upstream_ips: list[str] = [] + for line in self.fs.read("/etc/resolv.conf").splitlines(): + stripped = line.strip() + if stripped.startswith("nameserver ") and "127.0.0.1" not in stripped: + upstream_ips.append(stripped.split()[1]) + _inject_named_forwarders(self.fs, upstream_ips) + _inject_named_options_for_local_dns(self.fs) + + soa_lines = ( + "$TTL 604800\n" + f"$ORIGIN {discovery_domain}.\n" + f"@ IN SOA {client_hostname}. root.{client_hostname}. (\n" + " 2010050702 ; serial\n" + " 604800 ; refresh\n" + " 86400 ; retry\n" + " 2419200 ; expire\n" + " 10800 ; negative caching time\n" + " )\n" + f"@ IN NS {client_hostname}.\n" + ) + self.fs.write( + forward_zone_path, + soa_lines + + f"{ldap_short} IN A {ldap_ip}\n" + + f"{kdc_short} IN A {kdc_ip}\n" + + f"{ldap_srv}. IN SRV 0 100 {ldap_port} {ldap_hostname}.\n" + + f"{krb_srv}. IN SRV 0 100 {kdc_port} {kdc_hostname}.\n", + ) + self.host.conn.run(f"restorecon -v {forward_zone_path}", raise_on_error=False) + + zone_check = self.host.conn.run( + f"named-checkzone {discovery_domain} {forward_zone_path}", + raise_on_error=False, + ) + if zone_check.rc != 0: + raise RuntimeError( + f"named-checkzone failed for {forward_zone_path}: " + f"{(zone_check.stdout or zone_check.stderr or '').strip()}" + ) + + conf_check = self.host.conn.run("named-checkconf", raise_on_error=False) + if conf_check.rc != 0: + raise RuntimeError( + f"named-checkconf failed: {(conf_check.stdout or conf_check.stderr or '').strip()}" + ) + + self.host.conn.run("systemctl enable named", raise_on_error=False) + named_restart = self.host.conn.run("systemctl restart named", raise_on_error=False) + if named_restart.rc != 0: + journal = self.host.conn.run( + "journalctl -u named -n 30 --no-pager", + raise_on_error=False, + ) + raise RuntimeError( + "systemctl restart named failed; " + f"journal: {(journal.stdout or journal.stderr or '')[-2000:]}" + ) + + self.fs.write("/etc/resolv.conf", f"search {discovery_domain}\nnameserver 127.0.0.1\n") + self.host.conn.run("restorecon -v /etc/resolv.conf", raise_on_error=False) + + self._pin_discovery_hostnames( + ldap_ip=ldap_ip, + ldap_hostname=ldap_hostname, + kdc_ip=kdc_ip, + kdc_hostname=kdc_hostname, + ) + self._probe_kdc_port(kdc_ip, kdc_hostname, kdc_port) + + if used_local_named: + local_dns = "127.0.0.1" + if not (self.has_srv_record(ldap_srv, local_dns) and self.has_srv_record(krb_srv, local_dns)): + dig_ldap = self.host.conn.run( + f"dig +norecurse @{local_dns} {ldap_srv} SRV", + raise_on_error=False, + ) + dig_krb = self.host.conn.run( + f"dig +norecurse @{local_dns} {krb_srv} SRV", + raise_on_error=False, + ) + raise RuntimeError( + f"SRV records for {discovery_domain} are not visible on @{local_dns} after " + f"local named setup ({ldap_srv}, {krb_srv}); " + f"dig ldap: {(dig_ldap.stdout or dig_ldap.stderr or '')[-500:]}; " + f"dig krb: {(dig_krb.stdout or dig_krb.stderr or '')[-500:]}" + ) + + def setup_sasl_canonicalize_bogus_ptr( + self, + *, + ldap_hostname: str, + kdc_hostname: str, + provider_domain: str, + client_hostname: str, + ldap_ip: str | None = None, + kdc_ip: str | None = None, + ldap_host: object | None = None, + kdc_host: object | None = None, + bogus_label: str = "invalid", + ) -> tuple[str, str]: + """ + Configure bogus LDAP PTR and local DNS (ldap_sasl_canonicalize). + + Resolves LDAP and KDC IPv4 on this host when ldap_ip/kdc_ip are omitted + (multihost host.ip, else getent ahostsv4). Runs named on 127.0.0.1, + wrong PTR for ldap_ip, forward A for LDAP, and pins KDC in /etc/hosts. + + :param ldap_hostname: Real LDAP FQDN (Kerberos service name / forward A record). + :type ldap_hostname: str + :param kdc_hostname: KDC FQDN pinned in /etc/hosts. + :type kdc_hostname: str + :param provider_domain: LDAP DNS domain (e.g. ldap.test). + :type provider_domain: str + :param client_hostname: This host FQDN (SOA/NS in zone files). + :type client_hostname: str + :param ldap_ip: LDAP server IPv4 address; resolved when omitted. + :type ldap_ip: str | None, optional + :param kdc_ip: KDC IPv4 address; resolved when omitted. + :type kdc_ip: str | None, optional + :param ldap_host: LDAP role host for multihost host.ip lookup. + :type ldap_host: object | None, optional + :param kdc_host: KDC role host for multihost host.ip lookup. + :type kdc_host: object | None, optional + :param bogus_label: Leftmost PTR label (default invalid -> invalid.ldap.test). + :type bogus_label: str, optional + :return: Resolved (ldap_ip, kdc_ip) used for the setup. + :rtype: tuple[str, str] + :raises RuntimeError: if resolution, bind install, or DNS lookups fail. + """ + if ldap_ip is None: + ldap_ip = self._role_ipv4(ldap_hostname, role_host=ldap_host, label="LDAP host") + if kdc_ip is None: + kdc_ip = self._role_ipv4(kdc_hostname, role_host=kdc_host, label="KDC host") + if self.host.conn.run("rpm -q bind", raise_on_error=False).rc != 0: + self.host.conn.run( + "dnf install -y bind bind-utils || yum install -y bind bind-utils", + raise_on_error=False, + ) + assert ( + self.host.conn.run("rpm -q bind", raise_on_error=False).rc == 0 + ), "bind package is required (dnf install -y bind bind-utils)" + + bogus_hostname = f"{bogus_label}.{provider_domain}" + ldap_short = ldap_hostname.removesuffix(f".{provider_domain}").rstrip(".") + + self.fs.backup("/etc/hosts") + hosts_lines: list[str] = [] + for line in self.fs.read("/etc/hosts").splitlines(): + parts = line.split() + if len(parts) >= 1 and parts[0] == ldap_ip: + continue + if len(parts) >= 2 and (ldap_hostname in parts[1:] or bogus_hostname in parts[1:]): + continue + if len(parts) >= 2 and kdc_hostname in parts[1:]: + continue + hosts_lines.append(line) + hosts_lines.append(f"{ldap_ip}\t{bogus_hostname}") + hosts_lines.append(f"{kdc_ip}\t{kdc_hostname}") + self.fs.write("/etc/hosts", "\n".join(hosts_lines).rstrip() + "\n") + self.host.conn.run("nscd -i hosts 2>/dev/null || true", raise_on_error=False) + + reverse_zone = ip_to_ptr(ldap_ip) + ptr_label = ldap_ip.rsplit(".", maxsplit=1)[-1] + reverse_zone_path = f"/var/named/{reverse_zone}" + forward_zone_path = f"/var/named/{provider_domain}" + + self.fs.backup("/etc/resolv.conf") + self.fs.backup("/etc/named.conf") + self.fs.backup(reverse_zone_path) + self.fs.backup(forward_zone_path) + + def zone_block(zone_name: str, zone_file: str) -> str: + """ + Return a named.conf stanza for a master zone file. + + :param zone_name: Zone name (e.g. ldap.test). + :type zone_name: str + :param zone_file: Zone file basename under /var/named/. + :type zone_file: str + :return: named.conf zone stanza text. + :rtype: str + """ + return ( + f'zone "{zone_name}" {{\n' + " type master;\n" + " check-names ignore;\n" + f' file "{zone_file}";\n' + "};\n" + ) + + named_conf = self.fs.read("/etc/named.conf") + for zone_name, zone_file in ( + (reverse_zone, reverse_zone), + (provider_domain, provider_domain), + ): + if zone_name not in named_conf: + named_conf += zone_block(zone_name, zone_file) + self.fs.write("/etc/named.conf", named_conf) + + upstream_ips: list[str] = [] + for line in self.fs.read("/etc/resolv.conf").splitlines(): + stripped = line.strip() + if stripped.startswith("nameserver ") and "127.0.0.1" not in stripped: + upstream_ips.append(stripped.split()[1]) + _inject_named_forwarders(self.fs, upstream_ips) + + soa_lines = ( + "$TTL 604800\n" + f"$ORIGIN {provider_domain}.\n" + f"@ IN SOA {client_hostname}. root.{client_hostname}. (\n" + " 2010050702 ; serial\n" + " 604800 ; refresh\n" + " 86400 ; retry\n" + " 2419200 ; expire\n" + " 10800 ; negative caching time\n" + " )\n" + f"@ IN NS {client_hostname}.\n" + ) + reverse_soa = soa_lines.replace(f"$ORIGIN {provider_domain}.\n", f"$ORIGIN {reverse_zone}.\n") + self.fs.write( + reverse_zone_path, + reverse_soa + f"{ptr_label} IN PTR {bogus_hostname}.\n", + ) + self.fs.write( + forward_zone_path, + soa_lines + f"{ldap_short} IN A {ldap_ip}\n", + ) + self.host.conn.run( + f"restorecon -v {reverse_zone_path} {forward_zone_path}", + raise_on_error=False, + ) + + self.host.conn.run("systemctl enable named", raise_on_error=False) + named_restart = self.host.conn.run("systemctl restart named", raise_on_error=False) + if named_restart.rc != 0: + journal = self.host.conn.run( + "journalctl -u named -n 30 --no-pager", + raise_on_error=False, + ) + assert False, ( + "systemctl restart named failed; " f"journal: {(journal.stdout or journal.stderr or '')[-2000:]}" + ) + + self.fs.write("/etc/resolv.conf", "nameserver 127.0.0.1\n") + self.host.conn.run("restorecon -v /etc/resolv.conf", raise_on_error=False) + self.host.conn.run("nscd -i hosts 2>/dev/null || true", raise_on_error=False) + + ptr_result = self.dig(ldap_ip, "127.0.0.1") + assert ptr_result, f"dig -x {ldap_ip} @127.0.0.1 returned no PTR" + assert any( + bogus_hostname in str(record.get("data", "")) for record in ptr_result + ), f"PTR for {ldap_ip} via 127.0.0.1 is not {bogus_hostname}" + + a_result = self.dig(ldap_hostname, "127.0.0.1") + assert a_result and any( + record.get("type") == "A" and str(record.get("data")) == ldap_ip for record in a_result + ), (f"no local A record for {ldap_hostname} via 127.0.0.1 " f"(zone {provider_domain}, record {ldap_short})") + + rev_nss = self.host.conn.run(f"getent hosts {ldap_ip}", raise_on_error=False) + rev_out = rev_nss.stdout or "" + assert ( + ldap_hostname not in rev_out + ), f"{ldap_ip} must not reverse-resolve to {ldap_hostname}; getent hosts: {rev_out.strip()!r}" + assert ( + bogus_hostname in rev_out + ), f"{ldap_ip} must reverse-resolve to {bogus_hostname}; getent hosts: {rev_out.strip()!r}" + + if self._resolve_host_ipv4(ldap_hostname) != ldap_ip: + raise RuntimeError(f"forward lookup for {ldap_hostname} must return {ldap_ip}") + if self._resolve_host_ipv4(kdc_hostname) != kdc_ip: + raise RuntimeError(f"forward lookup for {kdc_hostname} must return {kdc_ip}") + + return ldap_ip, kdc_ip + def teardown(self): """ Revert all changes. diff --git a/sssd_test_framework/utils/sssd.py b/sssd_test_framework/utils/sssd.py index dd223b78..ee19d689 100644 --- a/sssd_test_framework/utils/sssd.py +++ b/sssd_test_framework/utils/sssd.py @@ -3,6 +3,7 @@ from __future__ import annotations import configparser +import tempfile from io import StringIO from typing import TYPE_CHECKING, Literal @@ -21,6 +22,7 @@ from pytest_mh.utils.services import SystemdServices from ..roles.base import BaseRole + from ..roles.ipa import IPA from ..roles.kdc import KDC from .authselect import AuthselectUtils from .smartcard import SmartCardUtils @@ -108,10 +110,12 @@ def setup(self) -> None: return # Set default configuration - self.config.read_string(""" + self.config.read_string( + """ [sssd] services = nss, pam - """) + """ + ) def async_start( self, @@ -869,7 +873,7 @@ def local(self) -> None: ) self.sssd.default_domain = "local" - def krb_provider(self, backend: KDC | GenericProvider) -> None: + def krb_provider(self, backend: KDC | GenericProvider | IPA) -> None: """ Set auth_provider to krb5 and populate krb5 options. @@ -878,7 +882,7 @@ def krb_provider(self, backend: KDC | GenericProvider) -> None: the provided backend (KDC, IPA, or AD). :param backend: Backend role object (KDC, IPA, or AD). - :type backend: KDC | GenericProvider + :type backend: KDC | GenericProvider | IPA """ host = backend.host if not isinstance(host, BaseDomainHost): @@ -998,12 +1002,28 @@ def ldap_provider( tls_reqcert: str = "demand", ssl: bool = False, config: dict[str, str] | None = None, + *, + auth_provider: Literal["ldap", "krb5"] = "ldap", + ipa: IPA | None = None, + ldap_schema: str | None = None, + discovery_domain: str | None = None, + gssapi: bool = False, + client_hostname: str | None = None, + ldap_krb5_keytab: str = "/etc/krb5.keytab", + replace_domain: bool | None = None, ) -> None: """ Configure SSSD to use the ldap_provider to connect to IPA or AD. This is an alternate configuration and should rarely be used. LDAP provider test cases should cover these scenarios. + **Default (``auth_provider="ldap"``)** — unchanged behaviour: ``id_provider`` + and ``auth_provider`` are both ``ldap``, simple bind only. + + **Krb5 mode** — pass ``auth_provider="krb5"`` and ``ipa`` (plus optional + ``gssapi``, ``discovery_domain``, ``ldap_schema``) for ``id_provider=ldap`` + with ``auth_provider=krb5`` against IPA. + :param server: LDAP server. :type server: str :param naming_context: Naming context @@ -1023,6 +1043,37 @@ def ldap_provider( :param config: Additional configuration, optional :type config: dict[str, str] | None """ + if auth_provider == "ldap" and ( + ipa is not None + or ldap_schema is not None + or discovery_domain is not None + or gssapi + or client_hostname is not None + or replace_domain is not None + ): + raise ValueError("krb5 options require auth_provider='krb5'") + + if auth_provider == "krb5": + self._ldap_provider_krb5( + server, + naming_context, + bind_user_dn, + bind_password, + ipa=ipa, + ldap_schema=ldap_schema, + discovery_domain=discovery_domain, + gssapi=gssapi, + client_hostname=client_hostname, + ldap_krb5_keytab=ldap_krb5_keytab, + subids=subids, + cacert=cacert, + tls_reqcert=tls_reqcert, + ssl=ssl, + replace_domain=replace_domain, + config=config, + ) + return + self.sssd.domain.clear() self.sssd.domain.update( id_provider="ldap", @@ -1059,6 +1110,209 @@ def ldap_provider( self.sssd.config_apply() + _IPA_HOST_KEYTAB_STAGING = "/root/.sssd-test-framework-host.keytab" + + def _ipa_host_keytab_has_principal(self, keytab: str, principal: str) -> bool: + klist = self.sssd.host.conn.run(f"klist -kt {keytab}", raise_on_error=False) + return klist.rc == 0 and principal in (klist.stdout or "") + + def _ensure_ipa_host_enrolled(self, ipa: IPA, client_hostname: str) -> None: + """Register ``client_hostname`` in IPA when only the local hostname was set.""" + ipa.host.kinit() + if ipa.host.conn.run(f"ipa host-show {client_hostname}", raise_on_error=False).rc == 0: + return + + client = self.sssd.host + join = client.conn.exec(["realm", "join", ipa.domain], input=ipa.host.adminpw, raise_on_error=False) + if join.rc == 0: + return + + ipa.host.kinit() + ipa.host.conn.run(f"ipa host-add {client_hostname} --force", raise_on_error=False) + + def _fetch_ipa_host_keytab(self, ipa: IPA, principals: list[str], staging: str) -> ProcessResult: + ipa.host.kinit() + ipa.host.conn.run(f"rm -f {staging}", raise_on_error=False) + last: ProcessResult | None = None + for principal in principals: + last = ipa.host.conn.run( + f"ipa-getkeytab -s {ipa.server} -p {principal} -k {staging}", + raise_on_error=False, + ) + if last.rc == 0: + return last + assert last is not None + return last + + def ensure_ipa_host_keytab( + self, + ipa: IPA, + *, + client_hostname: str | None = None, + keytab: str = "/etc/krb5.keytab", + ) -> None: + """ + Ensure the client has ``host/`` in ``keytab`` for GSSAPI LDAP binds. + + Per-test client restore may drop ``/etc/krb5.keytab`` when the backup was + taken without it. If the host principal is missing in IPA (hostname was + changed without ``realm join``), enroll or ``ipa host-add`` first, then + fetch keys with ``ipa-getkeytab``. + + :param ipa: IPA role (LDAP + KDC on ``ipa.server``). + :type ipa: IPA + :param client_hostname: Client FQDN (default: ``hostname -f`` on the client). + :type client_hostname: str | None, optional + :param keytab: Host keytab path on the client. + :type keytab: str, optional + """ + if client_hostname is None: + fqdn = self.sssd.host.conn.run("hostname -f", raise_on_error=False).stdout.strip() + client_hostname = fqdn or self.sssd.host.hostname + + principal = f"host/{client_hostname}" + if self._ipa_host_keytab_has_principal(keytab, principal): + return + + self._ensure_ipa_host_enrolled(ipa, client_hostname) + if self._ipa_host_keytab_has_principal(keytab, principal): + return + + staging = self._IPA_HOST_KEYTAB_STAGING + principals = [principal, f"{principal}@{ipa.realm}"] + result = self._fetch_ipa_host_keytab(ipa, principals, staging) + if result.rc != 0: + msg = result.stderr or result.stdout or "unknown error" + raise RuntimeError(f"ipa-getkeytab failed for {principal}: {msg}") + + with tempfile.NamedTemporaryFile() as tmp: + ipa.host.fs.download(staging, tmp.name) + self.sssd.fs.upload(tmp.name, keytab) + + ipa.host.conn.run(f"rm -f {staging}", raise_on_error=False) + self.sssd.host.conn.run( + f"chmod 600 {keytab} && chown root:root {keytab}", + raise_on_error=False, + ) + + def _ldap_provider_krb5( + self, + server: str, + naming_context: str, + bind_user_dn: str, + bind_password: str, + *, + ipa: IPA | None, + ldap_schema: str | None, + discovery_domain: str | None, + gssapi: bool, + client_hostname: str | None, + ldap_krb5_keytab: str, + subids: bool, + cacert: str, + tls_reqcert: str, + ssl: bool, + replace_domain: bool | None, + config: dict[str, str] | None, + ) -> None: + if ipa is None: + raise ValueError("ipa is required when auth_provider is krb5") + + naming_context = naming_context.strip() + search_base = f"cn=accounts,{naming_context}" + + if ldap_schema is None: + ldap_schema = "IPA" + elif ldap_schema.lower() in ("ipa_v1", "ipa"): + ldap_schema = "IPA" + + if gssapi: + if not client_hostname: + fqdn = self.sssd.host.conn.run("hostname -f", raise_on_error=False).stdout.strip() + client_hostname = fqdn or self.sssd.host.hostname + self.ensure_ipa_host_keytab(ipa, client_hostname=client_hostname, keytab=ldap_krb5_keytab) + + self.krb_provider(ipa) + + domain_opts: dict[str, str] = { + "id_provider": "ldap", + "auth_provider": "krb5", + "access_provider": "permit", + "krb5_realm": ipa.realm, + "krb5_server": ipa.server, + "krb5_kpasswd": ipa.server, + "ldap_uri": f"ldap://{server}", + "ldap_search_base": search_base, + "ldap_schema": ldap_schema, + "ldap_tls_reqcert": tls_reqcert, + "ldap_tls_cacert": cacert, + "ldap_id_use_start_tls": "true", + } + + if gssapi: + domain_opts.update( + ldap_sasl_mech="GSSAPI", + ldap_sasl_authid=f"host/{client_hostname}", + ldap_krb5_keytab=ldap_krb5_keytab, + ldap_sasl_minssf="56", + ) + else: + domain_opts.update( + ldap_default_bind_dn=bind_user_dn, + ldap_default_authtok_type="password", + ldap_default_authtok=bind_password, + ) + + if ssl: + domain_opts.update( + ldap_uri=f"ldaps://{server}", + ldap_id_use_start_tls="False", + ) + + if subids: + domain_opts.update( + ldap_subid_ranges_search_base=f"cn=subids,cn=accounts,{naming_context}", + ldap_subuid_object_class="ipasubordinateidentry", + ldap_subuid_count="ipaSubUidCount", + ldap_subgid_count="ipaSubGidCount", + ldap_subuid_number="ipaSubUidNumber", + ldap_subgid_number="ipaSubGidNumber", + ldap_subid_range_owner="ipaOwner", + ) + + if replace_domain is None: + replace_domain = True + + domain_name = self.sssd.default_domain + if domain_name is None: + raise ValueError("No default SSSD domain is set") + + if replace_domain: + del self.sssd.config[f"domain/{domain_name}"] + self.sssd.config[f"domain/{domain_name}"] = domain_opts + else: + self.sssd.domain.clear() + self.sssd.domain.update(domain_opts) + + if discovery_domain is not None: + self.sssd.dom(domain_name)["dns_discovery_domain"] = discovery_domain + + enrolled = self.sssd.host.conn.run("test -f /etc/ipa/default.conf", raise_on_error=False).rc == 0 + if enrolled: + self.sssd.host.conn.run( + "ipa-client-install --configure-dns --configure-krb5 --unattended", + raise_on_error=False, + ) + else: + krb5_conf = ipa.host.fs.read("/etc/krb5.conf") + self.sssd.fs.write("/etc/krb5.conf", krb5_conf, user="root", group="root", mode="0644", dedent=False) + + if config is not None: + for key, value in config.items(): + self.sssd.domain[key] = value + + self.sssd.config_apply() + def proxy( self, proxy: Literal["files", "ldap"] = "files", diff --git a/sssd_test_framework/utils/tools.py b/sssd_test_framework/utils/tools.py index 77303143..c07f5cf7 100644 --- a/sssd_test_framework/utils/tools.py +++ b/sssd_test_framework/utils/tools.py @@ -10,6 +10,7 @@ from pytest_mh.utils.fs import LinuxFileSystem __all__ = [ + "AHostSv4Entry", "GetentUtils", "GroupEntry", "LinuxToolsUtils", @@ -601,6 +602,36 @@ def FromOutput(cls, stdout: str) -> HostsEntry: return cls.FromList(result) +class AHostSv4Entry(object): + """First IPv4 from getent ahostsv4 output (NSS host resolution on the test client).""" + + def __init__(self, ip: str | None) -> None: + self.ip: str | None = ip + """IPv4 dotted-quad (first column of ``getent ahostsv4`` output).""" + + def __str__(self) -> str: + return f"({self.ip})" + + def __repr__(self) -> str: + return str(self) + + @classmethod + def FromOutput(cls, stdout: str) -> AHostSv4Entry | None: + """ + Parse the first IPv4 address from getent ahostsv4 stdout. + + :param stdout: Raw command output. + :type stdout: str + :return: Entry with ip set, or None if no address line is found. + :rtype: AHostSv4Entry | None + """ + for line in stdout.splitlines(): + parts = line.split() + if parts: + return cls(ip=parts[0]) + return None + + class NetworksEntry(object): """ Result of ``getent networks`` @@ -1058,6 +1089,53 @@ def hosts(self, name: str, *, service: str | None = None) -> HostsEntry: """ return self.__exec(HostsEntry, "hosts", name, service) + def ahostsv4(self, name: str, *, service: str | None = None) -> AHostSv4Entry | None: + """ + Run getent ahostsv4 and return the first IPv4. + + Use client.net.dig for DNS A/SRV/PTR checks. + + :param name: Hostname or address to resolve. + :type name: str + :param service: Optional NSS source (getent -s). + :type service: str | None, optional + :return: Parsed entry, or None if lookup failed. + :rtype: AHostSv4Entry | None + """ + args: list[str] = [] + if service is not None: + args = ["-s", service] + + command = self.host.conn.exec(["getent", *args, "ahostsv4", str(name)], raise_on_error=False) + if command.rc != 0: + return None + + return AHostSv4Entry.FromOutput(command.stdout) + + def resolve_ipv4(self, name: str, *, host: object | None = None) -> str | None: + """ + Return IPv4 for name on this host. + + Uses host.ip from the multihost config when available, else ahostsv4(). + + :param name: Hostname to resolve. + :type name: str + :param host: Optional role host (provider.host, kdc.host, …) with ip. + :type host: object | None, optional + :return: IPv4 dotted-quad, or None if not found. + :rtype: str | None + """ + if host is not None: + role_ip = getattr(host, "ip", None) + if role_ip: + return str(role_ip) + + entry = self.ahostsv4(name) + if entry is not None and entry.ip is not None: + return entry.ip + + return None + def networks(self, name: str, *, service: str | None = None) -> NetworksEntry: """ Call ``getent networks $name`` diff --git a/tests/test_tools.py b/tests/test_tools.py new file mode 100644 index 00000000..8bbc2d8d --- /dev/null +++ b/tests/test_tools.py @@ -0,0 +1,25 @@ +"""Unit tests for :mod:`sssd_test_framework.utils.tools`.""" + +from __future__ import annotations + +import pytest + +from sssd_test_framework.utils.tools import AHostSv4Entry + + +@pytest.mark.parametrize( + "stdout, expected_ip", + [ + ("192.168.1.1 STREAM hostname.example\n", "192.168.1.1"), + ("10.0.0.5 STREAM foo\n10.0.0.6 STREAM foo\n", "10.0.0.5"), + ("", None), + ("\n\n", None), + ], +) +def test_ahostsv4_entry_from_output(stdout: str, expected_ip: str | None) -> None: + entry = AHostSv4Entry.FromOutput(stdout) + if expected_ip is None: + assert entry is None + else: + assert entry is not None + assert entry.ip == expected_ip