Source code for test_automation.targets.fvp.autofvpnetworking

#
# SPDX-FileCopyrightText: <text>Copyright 2025 Arm Limited and/or its
# affiliates <open-source-office@arm.com></text>
#
# SPDX-License-Identifier: MIT
"""
Module to manage telnet sessions
"""
from typing import Dict, Optional, Tuple
import os
import re
import time
import datetime
import threading
import logging
import inspect
from pathlib import Path

import pexpect
from pexpect import exceptions as pexpect_exceptions

from test_automation.utils.logfiltering import (
    AnsiStrippingStream,
)

[docs] logger = logging.getLogger(__name__)
[docs] class TelnetSessionManager: """ Manage multiple telnet sessions for exposed telnet ports of FVP. 1. Read FVP boot log to get terminal-port mappings 2. Spawn pexpect based telnet session for each terminal 3. Continuously log for each telnet session's output 4. Check for login and shell prompts, send commands and redirect outputs. """ def __init__( self, config=None, platform: Optional[str] = None, **options, ) -> None: """ Initializes the TelnetSessionManager. Optional arguments are accepted via **options for: - base_log_root: str = "logs" - timeout: int = 30 - encoding: str = "utf-8" - test_name: Optional[str] = None :param config: Loaded YAML config accessor (must expose get_platform). :param platform: Platform key to select from config. :param options: Optional keyword args (see above). :raises RuntimeError: If platform and config are not configured. """ # Defaults (options override these) base_log_root = options.get("base_log_root", "logs") timeout = int(options.get("timeout", 30)) encoding = options.get("encoding", "utf-8")
[docs] self.test_name: Optional[str] = options.get("test_name")
# ---- Load required data from YAML Config (no legacy .properties) ---- if config is None or platform is None: raise RuntimeError( "TelnetSessionManager requires 'config' and 'platform' " "arguments" ) try: platform_cfg = config.get_platform(platform) except KeyError as e: raise KeyError(f"Platform '{platform}' not found in Config") from e # Required timeouts try: self.login_prompt_timeout = int( platform_cfg["timeouts"]["login_prompt"] ) self.shell_prompt_timeout = int( platform_cfg["timeouts"]["shell_prompt"] ) except KeyError as e: raise KeyError( "Missing required 'timeouts' keys in platform " f"'{platform}': {e}" ) from e # Required default prompts try: self.default_login_prompt = platform_cfg["prompts"]["default"][ "login" ] self.default_shell_prompt = platform_cfg["prompts"]["default"][ "shell" ] except KeyError as e: raise KeyError( "Missing required 'prompts.default' keys in platform " f"'{platform}': {e}" ) from e # Optional per-terminal maps
[docs] self.login_prompt_map = dict( platform_cfg.get("prompts", {}).get("login_map", {}) )
[docs] self.shell_prompt_map = dict( platform_cfg.get("prompts", {}).get("shell_map", {}) )
# Required terminals req_terms = platform_cfg.get("required_terminals") if not isinstance(req_terms, list) or not req_terms: raise KeyError( f"Platform '{platform}' must define non-empty " "'required_terminals'" )
[docs] self.required_terminals = [ t.strip() for t in req_terms if t and isinstance(t, str) ]
# Optional port_map
[docs] self.port_map = dict(platform_cfg.get("port_map", {}))
logger.info( "Using %r platform from YAML Config", ( platform_cfg.strip() if isinstance(platform_cfg, str) else platform_cfg ), )
[docs] self.base_log_dir = str(Path(base_log_root).expanduser().resolve())
[docs] self.timeout = timeout
[docs] self.encoding = encoding
[docs] self.sessions: Dict[int, pexpect.spawn] = {}
[docs] self.logfiles: Dict[int, any] = {}
[docs] self.threads: Dict[int, threading.Thread] = {}
[docs] self.expected_prompts: Dict[int, str] = {}
[docs] self.terminal_to_port: Dict[str, int] = {}
[docs] self.running = True
os.makedirs(self.base_log_dir, exist_ok=True) # Path for output log of executed command
[docs] self.cmd_output_path = os.path.join( self.base_log_dir, "cmd_output.txt" )
logger.debug( "Using %ds timeout for login prompt check, %ds for shell prompt " "check", self.login_prompt_timeout, self.shell_prompt_timeout, )
[docs] def create_and_start_session( self, port: int, terminal_name: Optional[str] = None ) -> None: """ Spawn a telnet session and start logging simultaneously. :param port: Telnet port of the telnet session. :param terminal_name: Optional name for prompt mapping. """ cmd = f"telnet localhost {port}" logger.info("Starting telnet session on port %d: %s", port, cmd) session = pexpect.spawn( cmd, timeout=self.timeout, encoding=self.encoding ) log_path = os.path.join(self.base_log_dir, f"telnet_{port}.log") logfile = open(log_path, "w", encoding=self.encoding, buffering=1) session.logfile_read = AnsiStrippingStream(logfile) self.sessions[port] = session self.logfiles[port] = logfile if terminal_name: self.terminal_to_port[terminal_name] = port thread = threading.Thread( target=self._log_telnet_session, args=(port, session), daemon=True, ) thread.start() self.threads[port] = thread
[docs] def get_port(self, terminal_name: str) -> Optional[int]: """ Return the port number for a terminal name, or None if not registered. :param terminal_name: Terminal name to query. :returns: Port number if known, else None. """ port = self.terminal_to_port.get(terminal_name) if port is not None: logger.debug( "Found port %d for terminal '%s'", port, terminal_name ) else: logger.debug("No port found for terminal '%s'", terminal_name) return port
[docs] def _log_telnet_session(self, port: int, session: pexpect.spawn) -> None: """ Continuously log data from the session into its logfile, logs errors and exits cleanly when the session or logfile is closed. :param port: Port number of the telnet session. :param session: Active pexpect telnet session instance. """ logger.info("Logging console data started on port %d", port) while self.running and session.isalive(): try: # Expect TIMEOUT to force a non-blocking read & write session.expect([pexpect.TIMEOUT], timeout=0.1) except pexpect.TIMEOUT: # No new data right now; loop again continue except IOError as e: # Swallow closed file errors and exit logger.warning( "Port %d: I/O closed (%s); stopping log thread", port, e, ) break except pexpect.EOF as e: # Connection closed by remote end logger.error( "Port %d: EOF from telnet session (%s); " "stopping log thread", port, e, ) break logger.info("Logging thread exiting for port %d", port)
[docs] def wait_for_prompt_in_log( self, port: int, prompt: str, timeout: int = 120 ) -> bool: """ Wait until the specified prompt appears in telnet_<port>.log. Checks periodically until found or timeout expires. :param port: Port number of the session. :param prompt: Regex or literal string prompt to wait for. :param timeout: Seconds to wait before timing out. :returns: True if prompt is found, False otherwise. """ log_path = os.path.join(self.base_log_dir, f"telnet_{port}.log") logger.info( "Port %d: Waiting up to %ds for prompt '%s'", port, timeout, prompt, ) deadline = time.time() + timeout poll_interval = 1.0 # track when last logged a debug retry last_debug_time = 0.0 debug_interval = 30.0 prompt_pattern = re.compile(prompt) while True: now = time.time() if now >= deadline: logger.info( "Port %d: Timeout (%ds) waiting for prompt '%s'", port, timeout, prompt, ) return False try: with open(log_path, "r", encoding=self.encoding) as f: content = f.read() if prompt_pattern.search(content): logger.info( "Found prompt '%s' for port %d", prompt, port ) return True except FileNotFoundError: if now - last_debug_time >= debug_interval: logger.debug("Port %d: Log file not yet created", port) last_debug_time = now # throttle the message to once every debug_interval seconds if now - last_debug_time >= debug_interval: elapsed = int(now - (deadline - timeout)) logger.debug( "Port %d: Still waiting for prompt after %ds", port, elapsed, ) last_debug_time = now time.sleep(poll_interval)
[docs] def login_if_needed( self, port: int, login_prompt: Optional[str] = None, shell_prompt: Optional[str] = None, ) -> bool: """ If console is at a login prompt, send 'root' and wait for shell_prompt. If no login prompt appears, wait for shell_prompt directly. :param port: Port to use. :param login_prompt: Expected login prompt string. :param shell_prompt: Expected shell prompt string. :returns: True if shell prompt is seen, False otherwise. """ session = self.sessions.get(port) if session is None: logger.info("Port %d: No session found; skipping login.", port) return False # Determine terminal name for per-terminal maps term_name = None for tname, p in self.terminal_to_port.items(): if p == port: term_name = tname break # Select prompts from per-terminal map or defaults from YAML if login_prompt is None: login_prompt = self.login_prompt_map.get( term_name, self.default_login_prompt ) if shell_prompt is None: shell_prompt = self.shell_prompt_map.get( term_name, self.default_shell_prompt ) logger.debug( "Port %d: Checking for login prompt '%s'", port, login_prompt ) if self.wait_for_prompt_in_log( port, login_prompt, timeout=self.login_prompt_timeout ): logger.info( "Port %d: Detected login prompt '%s', sending 'root'", port, login_prompt, ) session.sendline("root") logger.debug( "Port %d: Waiting for shell prompt '%s' after login", port, shell_prompt, ) if self.wait_for_prompt_in_log( port, shell_prompt, timeout=self.shell_prompt_timeout ): logger.info( "Port %d: Login successful (shell prompt '%s')", port, shell_prompt, ) return True logger.warning( "Port %d: Shell prompt '%s' not found after " "sending credentials", port, shell_prompt, ) return False logger.debug( "Port %d: No login prompt '%s' found; checking for shell '%s'", port, login_prompt, shell_prompt, ) if self.wait_for_prompt_in_log( port, shell_prompt, timeout=self.shell_prompt_timeout ): logger.info( "Port %d: Already logged in (shell prompt '%s')", port, shell_prompt, ) return True logger.warning( "Port %d: Shell prompt '%s' not found", port, shell_prompt ) return False
[docs] def execute_command_with_prompt_capture( self, port: int, command: str, timeout: Optional[int] = None ) -> Tuple[int, str]: """ Execute a command with a completion marker to capture exit code and output. :param port: Port number for the telnet session. :param command: Shell command to run. :param timeout: Timeout for waiting on shell prompt and marker. :returns: (exit_code, output) tuple. """ timeout = timeout or self.timeout session = self.sessions.get(port) if not session or not session.isalive(): raise RuntimeError(f"Port {port}: session not alive") # Identify terminal name for this port term_name = next( (t for t, p in self.terminal_to_port.items() if p == port), None, ) if not term_name: raise RuntimeError( "Port {port}: terminal name not found in terminal_to_port" ) login_prompt = self.login_prompt_map.get(term_name) shell_prompt = self.shell_prompt_map.get( term_name, self.default_shell_prompt ) if not shell_prompt: raise RuntimeError( "No shell prompt defined for terminal " f"'{term_name}' (port {port})" ) # -------------- # login section # -------------- # Quick check: if already at shell try: session.expect(shell_prompt, timeout=2) logger.info( "Port %s (%s): Already at shell prompt; skipping login", port, term_name, ) except pexpect.TIMEOUT: # Not at shell; if we have a login prompt pattern, try to log in if login_prompt: logger.info( "Port %s (%s): Checking for login prompt %r", port, term_name, login_prompt, ) try: session.expect(login_prompt, timeout=5) logger.info( "Port %s (%s): Found login prompt; sending 'root'", port, term_name, ) session.sendline("root") try: session.expect( shell_prompt, timeout=self.shell_prompt_timeout, ) logger.info( "Port %s (%s): Shell prompt confirmed after " "login", port, term_name, ) except pexpect.TIMEOUT: partial = session.before or "" logger.error( "Port %s (%s): TIMEOUT waiting for shell prompt " "after login. Partial:\n%r", port, term_name, partial, ) return 1, partial except pexpect.TIMEOUT: logger.info( "Port %s (%s): No login prompt quickly; waiting for " "shell prompt instead", port, term_name, ) try: session.expect(shell_prompt, timeout=timeout) logger.info( "Port %s (%s): Shell prompt appeared " "(no login step)", port, term_name, ) except pexpect.TIMEOUT: partial = session.before or "" logger.error( "Port %s (%s): TIMEOUT waiting for shell prompt. " "Partial:\n%r", port, term_name, partial, ) return 1, partial else: # No login prompt expected—just wait for the shell prompt logger.info( "Port %s (%s): No login prompt configured; waiting " "for shell prompt", port, term_name, ) try: session.expect(shell_prompt, timeout=timeout) logger.info( "Port %s (%s): Shell prompt reached", port, term_name, ) except pexpect.TIMEOUT: partial = session.before or "" logger.error( "Port %s (%s): TIMEOUT waiting for shell prompt. " "Partial:\n%r", port, term_name, partial, ) return 1, partial # ------------------ # shell run command # ------------------ time.sleep(2) command_complete_marker = "CMD_DONE_" full_cmd = f"{command}; rc=$?; echo {command_complete_marker}$rc" logger.info("Port %s: Sending full command: %r", port, full_cmd) session.sendline(full_cmd) time.sleep(2) try: session.expect_exact(command_complete_marker, timeout=10) except pexpect.TIMEOUT: logger.warning( "Port %s: Full command didn't echo properly; continuing...", port, ) marker_regex = re.compile(r"CMD_+DONE_(\d+)") logger.debug( "Port %s: Waiting up to %s s for command done marker %r", port, timeout, marker_regex, ) try: idx = session.expect([marker_regex, shell_prompt], timeout=timeout) if idx == 0: exit_code = int(session.match.group(1)) logger.debug( "Port %s: Got command done marker, exit_code=%s", port, exit_code, ) else: buffer = session.before or "" m = marker_regex.search(buffer) if m: exit_code = int(m.group(1)) logger.debug( "Port %s: Found command done marker in buffer, " "exit_code=%s", port, exit_code, ) else: logger.error( "Port %s: Shell prompt appeared but no command done " "marker found. Buffer:\n%r", port, buffer, ) return 1, buffer except pexpect.TIMEOUT: partial = session.before or "" logger.error( "Port %s: TIMEOUT waiting for command done marker or prompt. " "Partial buffer:\n%r", port, partial, ) return 1, partial # Collect command output (lines before marker, minus echoes/empties) raw_output = session.before or "" lines = raw_output.splitlines() # Regex patterns to skip skip_regexes = [ re.compile(r"^#"), # shell prompt lines re.compile(re.escape(full_cmd)), # echoed full command re.compile(r".*\$rc.*"), # any line with $rc re.compile(r".*rc=\$?.*"), # rc=$? fragments # CMD_DONE, CMD_DONNE, _DONE_$rc etc. re.compile(r".*CMD[_ ]?DON+E.*"), marker_regex, # final CMD_DONE_\d+ marker ] cleaned: list[str] = [] for ln in lines: s = ln.strip() if not s: continue if any(rx.match(s) or rx.search(s) for rx in skip_regexes): continue cleaned.append(s) final_output = "\n".join(cleaned).rstrip() # ------------------------- # Detect test name # ------------------------- test = None if self.test_name and self.test_name != "session": test = self.test_name else: for frame_info in inspect.stack(): func = frame_info.function if func.startswith("test_"): test = func break if not test: test = "<no test>" # Persist output to file ts = datetime.datetime.now().isoformat() with open( self.cmd_output_path, "a", encoding="utf-8", buffering=1 ) as f: f.write(f"[{ts}] Test: {test} Port: {port}\n") f.write(f"Command: {command}\n") f.write(final_output + "\n\n") logger.debug( "Port %s: Executed command with prompt capture returning " "exit_code=%s", port, exit_code, ) logger.debug("Command output:\n%s", final_output) return exit_code, final_output
[docs] def _check_prompt(self, term: str, port: int) -> None: """ Attempt to log in on required port, then wait for its shell prompt. :param term: Terminal name. :param port: Port number. """ # Check and attempt login, if login prompt defined login_prompt = self.login_prompt_map.get(term) if login_prompt: if not self.wait_for_prompt_in_log( port, login_prompt, timeout=self.login_prompt_timeout ): logger.error( "%s:%s: Login prompt %r not found after %s s. Exiting.", term, port, login_prompt, self.login_prompt_timeout, ) return logger.info("Port %s: Found login prompt, sending 'root'", port) self.sessions[port].sendline("root") else: logger.debug( "%s:%s: No login prompt defined; skipping login step.", term, port, ) # Wait for shell prompt shell_prompt = self.shell_prompt_map.get(term) if not shell_prompt: logger.error( "%s:%s: No shell prompt in config; aborting", term, port ) return if not self.wait_for_prompt_in_log( port, shell_prompt, timeout=self.shell_prompt_timeout ): logger.error( "%s:%s: Shell prompt '%r' not found after %ss", term, port, shell_prompt, self.shell_prompt_timeout, ) return
[docs] def _normalize_term(self, raw: str) -> str: s = (raw or "").strip().lower() if not s.startswith("terminal_"): s = f"terminal_{s}" return s
[docs] def start_telnet_sessions_after_fvp_ready( self, fvp_driver, log_file_path: str = "" ) -> None: """ With LocalFVP: poll get_ports() briefly so multiple terminals can appear, then spawn Telnet sessions for required terminals; if none match, fall back to all. :param fvp_driver: Controller exposing terminal ports. :param log_file_path: Path to FVP boot log. """ # Friendly log path (optional) try: actual_log = fvp_driver.log_path() except Exception: actual_log = log_file_path or "<unknown>" logger.info("Using FVP boot log: %r", actual_log) if not hasattr(fvp_driver, "get_ports"): logger.error( "FVP driver has no get_ports(); cannot start Telnet sessions" ) return # Poll for ports to settle: up to min(self.timeout, 15) seconds settle_s = min(max(5, int(self.timeout)), 15) deadline = time.time() + settle_s def _current_norm_ports(): raw = fvp_driver.get_ports() or {} return {self._normalize_term(k): v for k, v in raw.items()} norm = _current_norm_ports() last_count = -1 while time.time() < deadline and not all( t in norm for t in self.required_terminals ): if len(norm) != last_count: logger.debug("Port discovery progress: %s", norm) last_count = len(norm) time.sleep(1) norm = _current_norm_ports() if len(norm) != last_count: logger.debug("Final discovered ports: %s", norm) if not norm: logger.warning("No terminal→port mappings found; nothing to start") return # Spawn sessions for required terminals first started = {} for term, port in norm.items(): if term in self.required_terminals: self.create_and_start_session(port, terminal_name=term) started[term] = port logger.debug( "Spawned Telnet session for required %r on port %s", term, port, ) # Fallback: if none matched (naming differences), start all if not started: logger.warning( "None of required_terminals matched discovered ports. " "required=%r, discovered=%r; starting all as fallback.", self.required_terminals, norm, ) for term, port in norm.items(): self.create_and_start_session(port, terminal_name=term) started[term] = port logger.debug( "Spawned Telnet session for %r on port %s (fallback)", term, port, ) # Wait for prompts on each started terminal threads = [] for terminal, port in started.items(): t = threading.Thread( target=self._check_prompt, args=(terminal, port), daemon=True, ) t.start() threads.append(t) logger.debug( "Started prompt check thread for %r:%s", terminal, port ) for t in threads: t.join() logger.info("Completed prompt checks for all Telnet sessions")
[docs] def close_sessions(self) -> None: """ Gracefully terminate all active Telnet sessions and close log files. """ self.running = False logger.info("Closing all telnet sessions and log files") # Terminate each pexpect session for port, session in self.sessions.items(): try: logger.debug("Shutting down telnet session on port %s", port) session.close(force=True) logger.info("Session on port %s closed successfully", port) except (OSError, pexpect_exceptions.ExceptionPexpect) as e: logger.error("Failed to close session on port %s: %s", port, e) # Close each logfile for port, logfile in self.logfiles.items(): try: logger.debug("Closing logfile for port %s", port) logfile.close() logger.info("Logfile for port %s closed successfully", port) except OSError as e: logger.error( "Failed to close logfile for port %s: %s", port, e ) logger.info( "All telnet sessions and respective log files have been closed" )
[docs] def set_log_dir(self, path: str) -> None: """Force all telnet logs under this directory.""" from pathlib import Path self.base_log_dir = str(Path(path).expanduser().resolve()) try: os.makedirs(self.base_log_dir, exist_ok=True) except PermissionError as e: raise RuntimeError( "No permission to create log directory " f"{self.base_log_dir}: {e}" ) from e except OSError as e: raise RuntimeError( "Failed to create log directory " f"{self.base_log_dir}: {e}" ) from e # Keep command output file in sync with the new base dir self.cmd_output_path = os.path.join( self.base_log_dir, "cmd_output.txt" ) try: open(self.cmd_output_path, "w", encoding="utf-8").close() except PermissionError as e: raise RuntimeError( "No permission to create cmd_output.txt in " f"{self.base_log_dir}: {e}" ) from e except OSError as e: raise RuntimeError( "Failed to init cmd_output.txt in " f"{self.base_log_dir}: {e}" ) from e