Source code for test_automation.targets.fvp.autofvpnetworking

# SPDX-FileCopyrightText: <text>Copyright 2025-2026 Arm Limited and/or its
# affiliates <open-source-office@arm.com></text>
#
# SPDX-License-Identifier: MIT
"""
Module to manage telnet sessions
"""

from typing import Dict, Optional, Tuple
import os
import re
import time
import datetime
import threading
import logging
import inspect
from pathlib import Path

import pexpect
from pexpect import exceptions as pexpect_exceptions

from test_automation.utils.logfiltering import (
    AnsiStrippingStream,
)

[docs] logger = logging.getLogger(__name__)
[docs] class TelnetSessionManager: """ Manage multiple telnet sessions for exposed telnet ports of FVP. 1. Read FVP boot log to get terminal-port mappings 2. Spawn pexpect based telnet session for each terminal 3. Continuously log for each telnet session's output 4. Check for login and shell prompts, send commands and redirect outputs. 5. Drain logs and clean up sessions on demand. """ def __init__( self, config=None, platform: Optional[str] = None, **options, ) -> None: """ Initializes the TelnetSessionManager. Optional arguments are accepted via **options for: - base_log_root: str = "logs" - timeout: int = 30 - encoding: str = "utf-8" - test_name: Optional[str] = None :param config: Loaded YAML config accessor (must expose get_platform). :param platform: Platform key to select from config. :param options: Optional keyword args (see above). :raises RuntimeError: If platform and config are not configured. """ # Defaults (options override these) base_log_root = options.get("base_log_root", "logs") timeout = int(options.get("timeout", 30)) encoding = options.get("encoding", "utf-8")
[docs] self.test_name: Optional[str] = options.get("test_name")
# Load required data from YAML Config if config is None or platform is None: raise RuntimeError( "TelnetSessionManager requires 'config' and 'platform' " "arguments" ) platform_cfg = self._init_load_platform_cfg(config, platform) self._init_load_timeouts(platform_cfg, platform) self._init_load_prompts(platform_cfg, platform) self._init_required_terminals(platform_cfg, platform) self._init_port_map(platform_cfg) self._init_paths_and_state(base_log_root, timeout, encoding) logger.debug( "Using %ds timeout for login prompt check, " "%ds for shell prompt check", self.login_prompt_timeout, self.shell_prompt_timeout, ) # init helpers
[docs] def _init_load_platform_cfg(self, config, platform: str) -> dict: """ Load the platform configuration from the provided configuration object. :param config: Configuration object exposing ``get_platform()``. :param platform: Platform key to retrieve from the configuration. :returns: Platform configuration dictionary returned by ``get_platform()``. :raises KeyError: If the platform key is not found. """ try: platform_cfg = config.get_platform(platform) except KeyError as e: raise KeyError(f"Platform '{platform}' not found in Config") from e logger.info( "Using %r platform from YAML Config", ( platform_cfg.strip() if isinstance(platform_cfg, str) else platform_cfg ), ) return platform_cfg
[docs] def _init_load_timeouts(self, platform_cfg: dict, platform: str) -> None: """ Initialize login and shell prompt timeouts from platform configuration. :param platform_cfg: Platform configuration dictionary containing timeout settings. :param platform: Platform name (used for error messages). :raises KeyError: If required timeout keys are missing. """ try: self.login_prompt_timeout = int( platform_cfg["timeouts"]["login_prompt"] ) self.shell_prompt_timeout = int( platform_cfg["timeouts"]["shell_prompt"] ) except KeyError as e: raise KeyError( "Missing required 'timeouts' keys in platform " f"'{platform}': {e}" ) from e
[docs] def _init_load_prompts(self, platform_cfg: dict, platform: str) -> None: """ Initialize default and per-terminal prompt strings from configuration. :param platform_cfg: Platform configuration dictionary containing prompt mappings. :param platform: Platform name (used for error messages). :raises KeyError: If default prompt keys are missing. """ try: self.default_login_prompt = platform_cfg["prompts"]["default"][ "login" ] self.default_shell_prompt = platform_cfg["prompts"]["default"][ "shell" ] except KeyError as e: raise KeyError( "Missing required 'prompts.default' keys in platform " f"'{platform}': {e}" ) from e self.login_prompt_map = dict( platform_cfg.get("prompts", {}).get("login_map", {}) ) self.shell_prompt_map = dict( platform_cfg.get("prompts", {}).get("shell_map", {}) )
[docs] def _init_required_terminals( self, platform_cfg: dict, platform: str ) -> None: """ Initialize the list of required terminal names. :param platform_cfg: Platform configuration dictionary. :param platform: Platform name (used for error messages). :raises KeyError: If ``required_terminals`` is missing or empty. """ req_terms = platform_cfg.get("required_terminals") if not isinstance(req_terms, list) or not req_terms: raise KeyError( f"Platform '{platform}' must define non-empty " "'required_terminals'" ) self.required_terminals = [ t.strip() for t in req_terms if t and isinstance(t, str) ]
[docs] def _init_port_map(self, platform_cfg: dict) -> None: """ Initialize terminal-to-port mapping for the current platform. :param platform_cfg: Platform configuration dictionary containing port mappings. """ self.port_map = dict(platform_cfg.get("port_map", {}))
[docs] def _init_paths_and_state( self, base_log_root: str, timeout: int, encoding: str, ) -> None: """ Initialize filesystem paths, log directories, and runtime state. :param base_log_root: Base directory path for all telnet log files. :param timeout: Default timeout in seconds for telnet interactions. :param encoding: Encoding used for log and I/O operations. """ self.base_log_dir = str(Path(base_log_root).expanduser().resolve()) self.timeout = timeout self.encoding = encoding self.sessions: Dict[int, pexpect.spawn] = {} self.logfiles: Dict[int, any] = {} self.threads: Dict[int, threading.Thread] = {} self.expected_prompts: Dict[int, str] = {} self.terminal_to_port: Dict[str, int] = {} self.running = True self.paused_ports: set = set() # Ports where logging is paused self.paused_lock = threading.Lock() # Lock for thread-safe access os.makedirs(self.base_log_dir, exist_ok=True) # Path for output log of executed command self.cmd_output_path = os.path.join( self.base_log_dir, "cmd_output.txt", )
# Core APIs
[docs] def _get_log_path( self, port: int, terminal_name: Optional[str] = None ) -> str: """ Get the log file path for a telnet session. The log filename is constructed as ``<terminal_name>_<port>.log`` when a terminal name is provided. If no terminal name is available, it falls back to ``telnet_<port>.log``. :param port: Telnet port number :param terminal_name: Optional terminal name :return: Full path to the log file """ if terminal_name: log_filename = f"{terminal_name}_{port}.log" else: log_filename = f"telnet_{port}.log" return os.path.join(self.base_log_dir, log_filename)
[docs] def create_and_start_session( self, port: int, terminal_name: Optional[str] = None ) -> None: """ Spawn a telnet session and start logging simultaneously. :param port: Telnet port of the telnet session. :param terminal_name: Optional name for prompt mapping. """ cmd = f"telnet localhost {port}" logger.info("Starting telnet session on port %d: %s", port, cmd) session = pexpect.spawn( cmd, timeout=self.timeout, encoding=self.encoding, codec_errors="replace", ) log_path = self._get_log_path(port, terminal_name) logfile = open( log_path, "w", encoding=self.encoding, errors="replace", buffering=1, ) session.logfile_read = AnsiStrippingStream(logfile) self.sessions[port] = session self.logfiles[port] = logfile if terminal_name: self.terminal_to_port[terminal_name] = port thread = threading.Thread( target=self._log_telnet_session, args=(port, session), daemon=True, ) thread.start() self.threads[port] = thread
[docs] def get_port(self, terminal_name: str) -> Optional[int]: """ Return the port number for a terminal name, or None if not registered. :param terminal_name: Terminal name to query. :returns: Port number if known, else None. """ port = self.terminal_to_port.get(terminal_name) if port is not None: logger.debug( "Found port %d for terminal '%s'", port, terminal_name, ) else: logger.debug("No port found for terminal '%s'", terminal_name) return port
[docs] def pause_logging(self, port: int) -> None: """ Pause the background logging thread for a port. This allows direct expect() calls on the session without the logging thread consuming the output. :param port: Port number to pause logging for. """ logger.debug("Pausing logging for port %d", port) with self.paused_lock: self.paused_ports.add(port) # Give the logging thread time to stop its current iteration time.sleep(0.2)
[docs] def resume_logging(self, port: int) -> None: """ Resume the background logging thread for a port. :param port: Port number to resume logging for. """ logger.debug("Resuming logging for port %d", port) with self.paused_lock: self.paused_ports.discard(port)
# logging loop
[docs] def _log_telnet_session(self, port: int, session: pexpect.spawn) -> None: """ Continuously log data from the session into its logfile, logs errors and exits cleanly when the session or logfile is closed. :param port: Port number of the telnet session. :param session: Active pexpect telnet session instance. """ logger.info("Logging console data started on port %d", port) while self.running and session.isalive(): # Skip logging when port is paused (to allow expect() calls) with self.paused_lock: is_paused = port in self.paused_ports if is_paused: time.sleep(0.1) continue if not self._log_iteration(port, session): break logger.info("Logging thread exiting for port %d", port)
[docs] def _log_iteration(self, port: int, session: pexpect.spawn) -> bool: """One non-blocking expect iteration from the original logic.""" try: # Expect TIMEOUT to force a non-blocking read & write session.expect([pexpect.TIMEOUT], timeout=0.1) return True except pexpect.TIMEOUT: # No new data right now; loop again return True except IOError as e: # Swallow closed file errors and exit logger.warning( "Port %d: I/O closed (%s); stopping log thread", port, e, ) return False except pexpect.EOF as e: # Connection closed by remote end logger.error( "Port %d: EOF from telnet session (%s); stopping log thread", port, e, ) return False
[docs] def _get_terminal_name( self, port: int, terminal_name: Optional[str] ) -> Optional[str]: """ Resolve the terminal name for a given port. Returns the explicit `terminal_name` if provided. Otherwise, uses `_term_for_port()` to resolve the name. """ if terminal_name is not None: return terminal_name terminal_map_name = self._term_for_port(port) if terminal_map_name is None: logger.debug( "No terminal name found for port %d, " "using default log filename (telnet_%d.log)", port, port, ) return terminal_map_name
# log scanning
[docs] def wait_for_prompt_in_log(self, *args, **kwargs) -> bool: """ Waits for a prompt/pattern in the telnet log file. - If ``regex`` is True: ``prompt`` is treated as a regex pattern. - If ``regex`` is False: ``prompt`` is treated as a literal string. - If ``regex`` is None: a heuristic decides if ``prompt`` looks like a regex (auto-detect). :param args: Positional arguments (port, prompt, timeout). :param kwargs: Keyword arguments used to pass options (port=..., prompt=..., timeout=...). :returns: True if the prompt/pattern is found in the log, otherwise False. """ port, prompt, timeout, regex, ignore_case = self._extract_wait_args( args, kwargs, ) if regex is None: regex = self._check_for_regex(prompt) terminal_name = self._get_terminal_name(port, None) log_path = self._get_log_path(port, terminal_name) logger.info( "Port %d: Waiting up to %ds for '%s'", port, timeout, prompt, ) prompt_pattern = self._compile_prompt_pattern( port, prompt, (regex, ignore_case), ) if prompt_pattern is None: return False return self._poll_until_found( (port, prompt, timeout, regex, log_path, prompt_pattern) )
[docs] def _extract_wait_args( self, args: tuple, kwargs: dict, ) -> tuple[int, str, int, bool | None, bool]: """ Extract and validate wait_for_prompt_in_log arguments. This helper pops known keyword arguments, applies positional args, validates there are no unknown kwargs, and ensures ``port`` and ``prompt`` are present. :param args: Positional args passed through. :param kwargs: Keyword args passed through (will be mutated). :returns: Tuple (port, prompt, timeout, regex, ignore_case). :raises TypeError: on invalid or missing arguments. """ port, prompt, timeout, regex, ignore_case = self._pop_wait_kwargs( kwargs ) port, prompt, timeout = self._apply_positional_args( args, (port, prompt, timeout), ) if kwargs: unknown = ", ".join(sorted(kwargs.keys())) raise TypeError( "wait_for_prompt_in_log() got unexpected keyword arguments: " f"{unknown}" ) if port is None or prompt is None: raise TypeError( "wait_for_prompt_in_log() requires port and " "prompt" ) return port, prompt, timeout, regex, ignore_case
[docs] def _pop_wait_kwargs( self, kwargs: dict, ) -> tuple[int | None, str | None, int, bool | None, bool]: """ Pop recognized keyword arguments for the wait helper. Recognized keys: port, prompt, timeout, regex, ignore_case. :param kwargs: Keyword arguments dict (mutated by popping). :returns: (port, prompt, timeout, regex, ignore_case). """ port = kwargs.pop("port", None) prompt = kwargs.pop("prompt", None) timeout = kwargs.pop("timeout", 120) regex = kwargs.pop("regex", None) ignore_case = kwargs.pop("ignore_case", True) return port, prompt, timeout, regex, ignore_case
[docs] def _apply_positional_args( self, args: tuple, current: tuple[int | None, str | None, int], ) -> tuple[int | None, str | None, int]: """ Apply positional arguments to (port, prompt, timeout). :param args: Positional args tuple. :param current: Tuple (port, prompt, timeout). :returns: Updated (port, prompt, timeout). :raises TypeError: If more than three positional args are provided. """ port, prompt, timeout = current if not args: return port, prompt, timeout if len(args) > 3: raise TypeError( "wait_for_prompt_in_log() takes at most 3 positional " "arguments (port, prompt, timeout)" ) if port is None and len(args) >= 1: port = args[0] if prompt is None and len(args) >= 2: prompt = args[1] if len(args) >= 3: timeout = args[2] return port, prompt, timeout
[docs] def _check_for_regex(self, text: str) -> bool: """ Determine if a string looks like a regex. Checks for common regex markers such as groups, classes, escapes, quantifiers and anchors. :param text: User given string. :returns: True if the string likely represents a regex pattern. """ regex_markers = ( "[", # character class "]", "(", # group ")", "{", # repetition "}", r"\s", # whitespace r"\d", # digit r"\w", # word r"\b", # word boundary r".*", # wildcard r"^", # start anchor r"$", # end anchor r"|", # alternation r"+", # quantifier r"?", # quantifier ) return any(marker in text for marker in regex_markers)
[docs] def _compile_prompt_pattern( self, port: int, prompt: str, options: tuple[bool, bool], ) -> re.Pattern | None: """ Compile the prompt into a regex.Pattern. :param port: Port number (used only for logging). :param prompt: Prompt text or regex pattern. :param options: Tuple (regex, ignore_case). :returns: Compiled pattern, or ``None`` on error. """ regex, ignore_case = options flags = re.MULTILINE if ignore_case: flags |= re.IGNORECASE pattern_text = prompt if regex else re.escape(prompt) try: return re.compile(pattern_text, flags) except re.error as exc: logger.error( "Port %d: Invalid regex pattern '%s' (%s)", port, prompt, exc, ) return None
[docs] def _poll_until_found( self, wait_params: tuple[int, str, int, bool, str, re.Pattern], ) -> bool: """ Poll the log file until the pattern is found or timeout occurs. :param wait_params: Tuple (port, prompt, timeout, regex, log_path, prompt_pattern). :returns: True if match found, otherwise False. """ port, prompt, timeout, regex, log_path, prompt_pattern = wait_params started_at = time.time() deadline = started_at + timeout poll_interval = 1.0 last_debug_time = 0.0 debug_interval = 30.0 match_mode = "regex" if regex else "literal" while True: now = time.time() if self._is_timeout_reached(now, deadline): logger.info( "Port %d: Timeout (%ds) waiting for %s '%s'", port, timeout, match_mode, prompt, ) return False matched_text = self._file_contains(log_path, prompt_pattern) if matched_text: logger.info( "Port %d: Found %s '%s'", port, match_mode, matched_text.strip(), ) return True last_debug_time = self._update_debug_status_if_needed( ( port, now, started_at, timeout, last_debug_time, debug_interval, ) ) time.sleep(poll_interval)
[docs] def _is_timeout_reached(self, now: float, deadline: float) -> bool: """ Return True if the current time has passed the deadline. :param now: Current time (seconds since epoch). :param deadline: Deadline timestamp. :returns: True if timeout reached, otherwise False. """ return now >= deadline
[docs] def _update_debug_status_if_needed( self, debug_params: tuple[int, float, float, int, float, float], ) -> float: """ Update debug status if enough time has passed since last debug. :param debug_params: Tuple (port, now, started_at, timeout, last_debug_time, debug_interval). :returns: Updated last debug time. """ port, now, started_at, timeout, last_debug_time, debug_interval = ( debug_params ) if now - last_debug_time >= debug_interval: self._debug_wait_status(port, started_at, timeout) return now return last_debug_time
[docs] def _file_contains(self, path: str, regex: re.Pattern): """ Return matched text if pattern is found, else False. :param path: Path to the log file to be scanned. :param regex: Compiled regular expression pattern to search for. :returns: The matched text if the pattern is found, else ``False``. """ try: with open( path, "r", encoding=self.encoding, errors="ignore", ) as f: content = f.read() match = regex.search(content) if match: return match.group(0) except FileNotFoundError: return False return False
[docs] def _debug_wait_status( self, port: int, started_at: float, timeout: int ) -> None: """ Log a periodic debug message while waiting for a prompt. :param port: Telnet port number associated with the session. :param started_at: Timestamp when waiting started. :param timeout: Total timeout value in seconds. :returns: None """ # Throttle the message to once every debug_interval seconds elapsed = int(time.time() - started_at) logger.debug( "Port %d: Still waiting for prompt after %ds", port, elapsed, )
# login flow
[docs] def login_if_needed( self, port: int, login_prompt: Optional[str] = None, shell_prompt: Optional[str] = None, ) -> bool: """ Ensure a shell is ready on the given port, logging in if required. :param port: Telnet port number. :param login_prompt: Optional login prompt regex/string override. :param shell_prompt: Optional shell prompt regex/string override. :returns: ``True`` if a shell prompt is observed; otherwise ``False``. """ session = self.sessions.get(port) if session is None: logger.info("Port %d: No session found; skipping login.", port) return False term_name = self._term_for_port(port) lp, sp = self._resolve_prompts(term_name, login_prompt, shell_prompt) logger.debug("Port %d: Checking for login prompt '%s'", port, lp) if self._handle_login_path(port, lp, sp): return True return self._handle_no_login_path(port, sp)
[docs] def _resolve_prompts( self, term_name: Optional[str], login_prompt: Optional[str], shell_prompt: Optional[str], ) -> Tuple[Optional[str], str]: """ Resolve effective login and shell prompts for a terminal. :param term_name: Logical terminal name, if known. :param login_prompt: Optional login prompt override. :param shell_prompt: Optional shell prompt override. :returns: Tuple where the first item may be ``None`` if no login is expected. """ lp = ( login_prompt if login_prompt is not None else self.login_prompt_map.get( term_name, self.default_login_prompt, ) ) sp = ( shell_prompt if shell_prompt is not None else self.shell_prompt_map.get( term_name, self.default_shell_prompt, ) ) return lp, sp
[docs] def _handle_login_path( self, port: int, login_prompt: Optional[str], shell_prompt: str, ) -> bool: """ Handle the login flow if a login prompt is expected and observed. :param port: Telnet port number. :param login_prompt: Login prompt regex/string, if any. :param shell_prompt: Expected shell prompt regex/string. :returns: ``True`` if shell prompt is reached; else ``False``. """ if not login_prompt: return False if self.wait_for_prompt_in_log( port, login_prompt, timeout=self.login_prompt_timeout, ): logger.info( "Port %d: Detected login prompt '%s', sending 'root'", port, login_prompt, ) self.sessions[port].sendline("root") return self._await_shell_after_login(port, shell_prompt) return False
[docs] def _await_shell_after_login(self, port: int, shell_prompt: str) -> bool: """ Wait for the shell prompt after credentials were submitted. :param port: Telnet port number. :param shell_prompt: Expected shell prompt regex/string. :returns: ``True`` if the shell prompt appears; else ``False``. """ logger.debug( "Port %d: Waiting for shell prompt '%s' after login", port, shell_prompt, ) if self.wait_for_prompt_in_log( port, shell_prompt, timeout=self.shell_prompt_timeout, ): logger.info( "Port %d: Login successful (shell prompt '%s')", port, shell_prompt, ) return True logger.warning( "Port %d: Shell prompt '%s' not found after sending credentials", port, shell_prompt, ) return False
[docs] def _handle_no_login_path(self, port: int, shell_prompt: str) -> bool: """ Ensure the shell is present when a login prompt is not expected/found. :param port: Telnet port number. :param shell_prompt: Expected shell prompt regex/string. :returns: ``True`` if the shell prompt is found; else ``False``. """ logger.debug( "Port %d: No/absent login prompt; checking for shell '%s'", port, shell_prompt, ) if self.wait_for_prompt_in_log( port, shell_prompt, timeout=self.shell_prompt_timeout, ): logger.info( "Port %d: Already logged in (shell prompt '%s')", port, shell_prompt, ) return True logger.warning( "Port %d: Shell prompt '%s' not found", port, shell_prompt, ) return False
[docs] def _term_for_port(self, port: int) -> Optional[str]: """ Resolve the logical terminal name for a given telnet port. :param port: Telnet port number. :returns: The terminal name mapped to ``port`` or ``None`` if unknown. """ # Determine terminal name for per-terminal maps for tname, p in self.terminal_to_port.items(): if p == port: return tname return None
# command exec
[docs] def execute_command_with_prompt_capture( self, port: int, command: str, timeout: Optional[int] = None ) -> Tuple[int, str]: """ Execute a shell command on the specified Telnet session, wait for its completion marker, and capture both exit code and output. :param port: Telnet port number associated with the active session. :param command: The shell command to be executed. :param timeout: Optional timeout in seconds for command execution. If not provided, the default session timeout is used. :returns: A tuple ``(exit_code, output)`` where: - ``exit_code`` is the integer exit status of the executed command. - ``output`` is the cleaned textual output of the command. :raises RuntimeError: If the Telnet session is not alive or terminal mapping cannot be resolved. """ timeout = timeout or self.timeout session, term_name = self._get_session_and_term(port) login_prompt, shell_prompt = self._get_prompts_for_term(term_name) # Pause background logging to prevent it from consuming expect() data self.pause_logging(port) try: # Pack prompts to keep arg count low prompts = (login_prompt, shell_prompt) # Ensure shell; return early if not ready if not self._ensure_shell_ready(port, prompts, timeout): return 1, (session.before or "") # Run command and wait for completion full_cmd, marker_literal, marker_regex = self._create_full_command( command ) self._send_and_try_echo(port, full_cmd, marker_literal) # Pack (shell_prompt, marker_regex) to keep arg count low expect = (shell_prompt, marker_regex) exit_code, errbuf = self._await_completion(port, expect, timeout) # Early return on timeout or error if exit_code is None: return 1, errbuf or (session.before or "") # Finish and persist output; pack meta to keep arg count low final_output = self._finalize_output_and_persist( session.before or "", (full_cmd, marker_regex, port, command) ) return exit_code, final_output finally: # Resume background logging self.resume_logging(port)
# helpers used by execute_command_with_prompt_capture
[docs] def _get_session_and_term(self, port: int) -> Tuple[pexpect.spawn, str]: """ Fetch the live pexpect session and its logical terminal name. :param port: Telnet port number whose session is required. :returns: Tuple of ``(session, terminal_name)``. :raises RuntimeError: If the session is not alive or the terminal name cannot be resolved. """ session = self.sessions.get(port) if not session or not session.isalive(): raise RuntimeError(f"Port {port}: session not alive") term_name = self._term_for_port(port) if not term_name: raise RuntimeError( "Port {port}: terminal name not found in terminal_to_port" ) return session, term_name
[docs] def _get_prompts_for_term( self, term_name: str ) -> Tuple[Optional[str], str]: """ Resolve the effective login and shell prompts for a terminal. :param term_name: Logical terminal name. :returns: Tuple ``(login_prompt, shell_prompt)``; the first may be ``None`` when no login is expected. :raises RuntimeError: If no shell prompt can be determined. """ login_prompt = self.login_prompt_map.get(term_name) shell_prompt = self.shell_prompt_map.get( term_name, self.default_shell_prompt, ) if not shell_prompt: raise RuntimeError( "No shell prompt defined for terminal " f"'{term_name}'" ) return login_prompt, shell_prompt
[docs] def _ensure_shell_ready( self, port: int, prompts: Tuple[Optional[str], str], timeout: int, ) -> bool: """ Ensure a shell is ready on the session, logging in if needed. :param port: Telnet port number. :param prompts: Tuple (login_prompt, shell_prompt). :param timeout: Seconds to wait for prompts. :returns: ``True`` if a shell prompt is confirmed; else ``False``. """ return self._ensure_shell_or_login( port, prompts, timeout, )
[docs] def _create_full_command( self, command: str ) -> Tuple[str, str, re.Pattern]: """ Build the final command and its completion marker. :param command: Shell command to execute. :returns: Tuple ``(full_cmd, marker_literal, marker_regex)`` where ``marker_literal`` is the plain prefix (e.g. ``"CMD_DONE_"``) and ``marker_regex`` captures the exit code. """ marker_literal = "CMD_DONE_" full_cmd = f"{command}; rc=$?; echo {marker_literal}$rc" marker_regex = re.compile(r"CMD_+DONE_(\d+)") return full_cmd, marker_literal, marker_regex
[docs] def _send_and_try_echo( self, port: int, full_cmd: str, marker_literal: str, ) -> None: """ Send the full command and wait for the marker echo. :param port: Telnet port number. :param full_cmd: Composite shell command to send. :param marker_literal: Plain marker prefix expected in the echo. :returns: ``None``. """ """ The pattern reconstructs the marker "CMD_DONE_$rc", allowing any stray/corrupted characters between each letter. This handles telnet session corruption where control characters escape sequences, or duplicated bytes might appear mid-transmission. """ pattern = re.compile( r"echo\s+" # echo + whitespace r".*?C" # C with possible junk before r".*?M" # M with possible junk before r".*?D" # D with possible junk before r".*?_" # underscore with possible junk before r".*?D" # D with possible junk before r".*?O" # O with possible junk before r".*?N" # N with possible junk before r".*?E" # E with possible junk before r"+" # one or more underscores with possible junk before r".*?\$" # $ with possible junk before r".*?r" # r with possible junk before r".*?c" # c with possible junk before r".*?\n" # newline with possible junk before ) time.sleep(2) logger.info("Port %s: Sending full command: %r", port, full_cmd) self.sessions[port].sendline(full_cmd) time.sleep(2) try: self.sessions[port].expect(pattern, timeout=10) except pexpect.TIMEOUT: logger.warning( "Port %s: Full command didn't echo properly; continuing...", port, )
# Await completion for regex marker and timeout
[docs] def _await_completion( self, port: int, expect: Tuple[str, re.Pattern], timeout: int, ) -> Tuple[Optional[int], Optional[str]]: """ Wait for the command execution to complete by detecting either the completion marker or the shell prompt in the session output. :param port: Telnet port number of the session. :param expect: Tuple (shell_prompt, marker_regex). :param timeout: Maximum number of seconds to wait for completion. :returns: A tuple ``(exit_code, error_buffer)``. """ shell_prompt, marker_regex = expect logger.debug( "Port %s: Waiting up to %s s for command done marker %r", port, timeout, marker_regex, ) kind, payload = self._expect_marker_or_prompt( port, expect, timeout, ) if kind == "timeout": return None, payload if kind == "marker": return self._parse_exit_from_match(payload, marker_regex) # kind == "prompt": payload is the buffer before the prompt return self._parse_exit_from_buffer( payload, marker_regex, port, )
[docs] def _expect_marker_or_prompt( self, port: int, expect: Tuple[str, re.Pattern], timeout: int, ) -> Tuple[str, object]: """ Waits for either the marker or the shell prompt. :returns : ("marker", session) when marker matched (use session.match) ("prompt", buffer) when shell prompt matched (session.before) ("timeout", buffer) on timeout (session.before) """ shell_prompt, marker_regex = expect session = self.sessions[port] try: idx = session.expect([marker_regex, shell_prompt], timeout=timeout) if idx == 0: return "marker", session return "prompt", (session.before or "") except pexpect.TIMEOUT: partial = session.before or "" logger.error( "Port %s: TIMEOUT waiting for command done marker or prompt. " "Partial buffer:\n%r", port, partial, ) return "timeout", partial
[docs] def _parse_exit_from_match( self, session: pexpect.spawn, marker_regex: re.Pattern ) -> Tuple[int, None]: """ Extract the command exit code from the session match object. :param session: The pexpect session whose last match contains the exit code. :param marker_regex: Compiled regex pattern that captures the exit code from the completion marker. :returns: A tuple ``(exit_code, None)``. Falls back to scanning the session buffer if the marker match cannot be parsed. """ try: return int(session.match.group(1)), None except (IndexError, ValueError, AttributeError): return ( self._fallback_exit_from_buffer( session.before or "", marker_regex, default=1, ), None, )
[docs] def _parse_exit_from_buffer( self, buffer: str, marker_regex: re.Pattern, port: int ) -> Tuple[Optional[int], Optional[str]]: """ Parse the exit code from a plain buffer when the shell prompt appears. :param buffer: The text preceding the shell prompt. :param marker_regex: Compiled regex pattern used to locate and extract the exit code. :param port: Telnet port number of the session (used for logging). :returns: A tuple ``(exit_code, error_buffer)``: - ``exit_code`` is the parsed integer exit status, or ``None`` if the marker was missing or invalid. - ``error_buffer`` is the raw buffer when no marker is found, otherwise ``None``. """ m = marker_regex.search(buffer) if m: try: return int(m.group(1)), None except (IndexError, ValueError): return 1, None logger.error( "Port %s: Shell prompt appeared but no command done marker found. " "Buffer:\n%r", port, buffer, ) return None, buffer
[docs] def _finalize_output_and_persist( self, raw_output: str, meta: Tuple[str, re.Pattern, int, str], ) -> str: """ Clean command output, remove any buffer, and copy the result to file. :param raw_output: Raw session output captured before the prompt or marker. :param meta: Tuple (full_cmd, marker_regex, port, command). :returns: The cleaned command output string. """ full_cmd, marker_regex, port, command = meta final_output = self._clean_exec_output( raw_output, full_cmd, marker_regex, ) self._persist_cmd_output(port, command, final_output) logger.debug("Command output:\n%s", final_output) return final_output
# ensure shell/login
[docs] def _ensure_shell_or_login( self, port: int, prompts: Tuple[Optional[str], str], timeout: int, ) -> bool: """ Ensure a usable shell, logging in first if necessary. :param port: Telnet port of the session. :param prompts: Tuple (login_prompt, shell_prompt). :param timeout: Seconds to wait for shell prompt if needed. :returns: ``True`` if shell is ready, else ``False``. """ login_prompt, shell_prompt = prompts if self._already_at_shell(port, shell_prompt): return True if login_prompt: return self._login_path_then_shell( port, (login_prompt, shell_prompt), timeout, ) return self._wait_for_shell_with_timeout( port, shell_prompt, timeout, )
[docs] def _expect_pattern( self, session: pexpect.spawn, pattern: str, timeout: int ) -> Tuple[bool, str]: """ Wait for a pattern, returning whether it matched and any partial buffer. :param session: Active pexpect session. :param pattern: Regex/string to expect. :param timeout: Seconds to wait for a match. :returns: Tuple ``(matched, buffer_on_timeout)`` where ``matched`` is ``True`` if the pattern matched; otherwise ``False`` and ``buffer_on_timeout`` contains ``session.before``. """ try: session.expect(pattern, timeout=timeout) return True, "" except pexpect.TIMEOUT: return False, (session.before or "")
[docs] def _already_at_shell( self, port: int, shell_prompt: str, ) -> bool: """ Check quickly whether we are already at the shell prompt. :param port: Telnet port of the session. :param shell_prompt: Expected shell prompt regex/string. :returns: ``True`` if the shell prompt is present, else ``False``. """ session = self.sessions[port] term_name = self._term_for_port(port) matched, _ = self._expect_pattern(session, shell_prompt, 2) if matched: logger.info( "Port %s (%s): Already at shell prompt; skipping login", port, term_name, ) return True return False
[docs] def _login_path_then_shell( self, port: int, prompts: Tuple[str, str], timeout: int, ) -> bool: """ Attempt the login path, then wait for the shell prompt. :param port: Telnet port of the session. :param prompts: Tuple (login_prompt, shell_prompt). :param timeout: Seconds to wait for the shell prompt if needed. :returns: ``True`` if shell is ready, else ``False``. """ login_prompt, shell_prompt = prompts session = self.sessions[port] term_name = self._term_for_port(port) logger.info( "Port %s (%s): Checking for login prompt %r", port, term_name, login_prompt, ) saw_login, _ = self._expect_pattern(session, login_prompt, 5) if saw_login: return self._after_saw_login_send_root_and_wait_shell( port, shell_prompt, ) return self._no_login_prompt_then_wait_shell( port, shell_prompt, timeout, )
[docs] def _after_saw_login_send_root_and_wait_shell( self, port: int, shell_prompt: str, ) -> bool: """ Send credentials after seeing the login prompt and wait for the shell. :param port: Telnet port of the session. :param shell_prompt: Expected shell prompt regex/string. :returns: ``True`` if shell prompt appears, else ``False``. """ term_name = self._term_for_port(port) session = self.sessions[port] logger.info( "Port %s (%s): Found login prompt; sending 'root'", port, term_name, ) session.sendline("root") matched_after, partial = self._expect_pattern( session, shell_prompt, self.shell_prompt_timeout, ) if matched_after: logger.info( "Port %s (%s): Shell prompt confirmed after login", port, term_name, ) return True logger.error( "Port %s (%s): TIMEOUT waiting for shell prompt after login. " "Partial:\n%r", port, term_name, partial, ) return False
[docs] def _no_login_prompt_then_wait_shell( self, port: int, shell_prompt: str, timeout: int, ) -> bool: """ If login prompt is not observed, wait directly for the shell prompt. :param port: Telnet port of the session. :param shell_prompt: Expected shell prompt regex/string. :param timeout: Seconds to wait for the shell prompt. :returns: ``True`` if shell prompt appears, else ``False``. """ term_name = self._term_for_port(port) logger.info( "Port %s (%s): No login prompt quickly; waiting for shell prompt " "instead", port, term_name, ) return self._wait_for_shell_with_timeout( port, shell_prompt, timeout, )
[docs] def _wait_for_shell_with_timeout( self, port: int, shell_prompt: str, timeout: int, ) -> bool: """ Wait for a shell prompt within a time span. :param port: Telnet port of the session. :param shell_prompt: Expected shell prompt regex/string. :param timeout: Seconds to wait for the shell prompt. :returns: ``True`` if shell prompt appears, else ``False``. """ session = self.sessions[port] term_name = self._term_for_port(port) matched, partial = self._expect_pattern(session, shell_prompt, timeout) if matched: logger.info("Port %s (%s): Shell prompt reached", port, term_name) return True logger.error( "Port %s (%s): TIMEOUT waiting for shell prompt. Partial:\n%r", port, term_name, partial, ) return False
# misc helpers
[docs] def _fallback_exit_from_buffer( self, buffer: str, marker_regex: re.Pattern, default: int = 1 ) -> int: """ Try to extract the exit code from a buffer using the marker regex. :param buffer: Text captured before the prompt/marker. :param marker_regex: Compiled regex that captures the exit code in group 1 (e.g. ``CMD_+DONE_(\\d+)``). :param default: Exit code to return when no code can be parsed. :returns: Parsed integer exit code, or ``default`` if unavailable. """ m2 = marker_regex.search(buffer) if not m2: return default try: return int(m2.group(1)) except (IndexError, ValueError): return default
[docs] def _clean_exec_output( self, raw_output: str, full_cmd: str, marker_regex ) -> str: """ Remove noise from raw command output and return a clean string. :param raw_output: Unfiltered text collected before the marker/prompt. :param full_cmd: Exact command string that was sent (for echo removal). :param marker_regex: Compiled regex for the completion marker line. :returns: Cleaned command output with noise removed. """ lines = raw_output.splitlines() skip_regexes = [ re.compile(r"^#"), # shell prompt lines re.compile(re.escape(full_cmd)), # echoed full command re.compile(r".*\$rc.*"), # any line with $rc re.compile(r".*rc=\$?.*"), # rc=$? fragments # CMD_DONE, CMD_DONNE, _DONE_$rc etc. re.compile(r".*CMD[_ ]?DON+E.*"), re.compile(r"^[A-Za-z]$"), # remove stray characters marker_regex, # final CMD_DONE_\d+ marker ] cleaned: list[str] = [] for ln in lines: s = ln.strip() if not s: continue if any(rx.match(s) or rx.search(s) for rx in skip_regexes): continue cleaned.append(s) return "\n".join(cleaned).rstrip()
[docs] def _detect_test_name(self) -> str: """ Decide a test name for logging, preferring an explicit name. :returns: Resolved test name or ``"<no test>"``. """ chosen = self._preferred_test_name() if chosen: return chosen return self._first_test_func_from_stack() or "<no test>"
[docs] def _preferred_test_name(self) -> Optional[str]: """ Return the preferred explicit test name when available. :returns: Test name if set, else None. """ if self.test_name and self.test_name != "session": return self.test_name return None
[docs] def _first_test_func_from_stack(self) -> Optional[str]: """ Scan the call stack and return the first ``test_*`` function name. :returns: Name of the first function starting with ``test_``, or ``None`` if no such frame exists. """ for frame_info in inspect.stack(): func = frame_info.function if func.startswith("test_"): return func return None
[docs] def _persist_cmd_output( self, port: int, command: str, output: str ) -> None: """ Append a command's output to the persistent command log file. :param port: Telnet port from which the command was executed. :param command: Exact shell command that was run. :param output: Final cleaned output to be written. :returns: None """ ts = datetime.datetime.now().isoformat() test = self._detect_test_name() with open( self.cmd_output_path, "a", encoding="utf-8", buffering=1, ) as f: f.write(f"[{ts}] Test: {test} Port: {port}\n") f.write(f"Command: {command}\n") f.write(output + "\n\n")
# prompt checks
[docs] def _check_prompt(self, term: str, port: int) -> None: """ Attempt to log in on required port, then wait for its shell prompt. :param term: Terminal name. :param port: Port number. """ login_prompt = self.login_prompt_map.get(term) if login_prompt: if not self.wait_for_prompt_in_log( port, login_prompt, timeout=self.login_prompt_timeout, ): logger.error( "%s:%s: Login prompt %r not found after %s s. Exiting.", term, port, login_prompt, self.login_prompt_timeout, ) return logger.info("Port %s: Found login prompt, sending 'root'", port) self.sessions[port].sendline("root") else: logger.debug( "%s:%s: No login prompt defined; skipping login step.", term, port, ) shell_prompt = self.shell_prompt_map.get(term) if not shell_prompt: logger.error( "%s:%s: No shell prompt in config; aborting", term, port, ) return if not self.wait_for_prompt_in_log( port, shell_prompt, timeout=self.shell_prompt_timeout, ): logger.error( "%s:%s: Shell prompt '%r' not found after %ss", term, port, shell_prompt, self.shell_prompt_timeout, ) return
# startup orchestration
[docs] def _normalize_terminal(self, raw: str) -> str: """ Normalize a terminal key into the canonical ``terminal_*`` name. :param raw: Raw terminal key from the FVP driver. :returns: Lowercased name prefixed with ``terminal_`` if missing. """ s = (raw or "").strip().lower() if not s.startswith("terminal_"): s = f"terminal_{s}" return s
[docs] def start_telnet_sessions_after_fvp_ready( self, fvp_driver, log_file_path: str = "" ) -> None: """ With LocalFVP: poll get_ports() briefly so multiple terminals can appear, then spawn Telnet sessions for required terminals; if none match, fall back to all. :param fvp_driver: Controller exposing terminal ports. :param log_file_path: Path to FVP boot log. """ actual_log = self._resolve_log_path(fvp_driver, log_file_path) self.fvp_boot_log_path = str(actual_log) logger.info("Using FVP boot log: %r", actual_log) if not hasattr(fvp_driver, "get_ports"): logger.error( "FVP driver has no get_ports(); cannot start Telnet sessions" ) return norm = self._discover_ports(fvp_driver) if not norm: logger.warning("No terminal→port mappings found; nothing to start") return started = self._spawn_required_then_fallback(norm) self._wait_for_started_prompts(started)
[docs] def _resolve_log_path(self, fvp_driver, log_file_path: str) -> str: """ Resolve the boot log path using the driver if possible. :param fvp_driver: FVP driver which may implement ``log_path()``. :param log_file_path: Fallback path to use if the driver raises or does not implement the method. :returns: Resolved log path or ``"<unknown>"`` if none is available. """ try: return fvp_driver.log_path() except Exception: return log_file_path or "<unknown>"
[docs] def _discover_ports(self, fvp_driver): """ Poll the driver for terminal→port mappings until they stabilize. The poll runs up to ``min(timeout, 15)`` seconds, returning the most recent normalized mapping. It stops early if all required terminals are present. :param fvp_driver: Driver exposing ``get_ports() -> Dict[str, int]``. :returns: Dict mapping normalized terminal names to ports. """ settle_s = min(max(5, int(self.timeout)), 15) deadline = time.time() + settle_s def _current_norm_ports(): raw = fvp_driver.get_ports() or {} return {self._normalize_terminal(k): v for k, v in raw.items()} norm = _current_norm_ports() last_count = -1 while time.time() < deadline and not all( t in norm for t in self.required_terminals ): if len(norm) != last_count: logger.debug("Port discovery progress: %s", norm) last_count = len(norm) time.sleep(1) norm = _current_norm_ports() if len(norm) != last_count: logger.debug("Final discovered ports: %s", norm) return norm
[docs] def _spawn_required_then_fallback( self, norm: Dict[str, int] ) -> Dict[str, int]: """ Spawn Telnet sessions for required terminals, or all as a fallback. :param norm: Mapping of normalized terminal names to ports. :returns: Mapping of terminal names to ports that actually started. """ started: Dict[str, int] = {} for term, port in norm.items(): if term in self.required_terminals: self.create_and_start_session(port, terminal_name=term) started[term] = port logger.debug( "Spawned Telnet session for required %r on port %s", term, port, ) if not started: logger.warning( "None of required_terminals matched discovered ports. " "required=%r, discovered=%r; starting all as fallback.", self.required_terminals, norm, ) for term, port in norm.items(): self.create_and_start_session(port, terminal_name=term) started[term] = port logger.debug( "Spawned Telnet session for %r on port %s (fallback)", term, port, ) return started
[docs] def _wait_for_started_prompts(self, started: Dict[str, int]) -> None: """ Wait for prompts on each started terminal. Spawns a thread per started terminal that checks the prompt, then joins all threads before returning. :param started: Mapping of terminal name to telnet port that has been started. :returns: None """ threads = [] for terminal, port in started.items(): t = threading.Thread( target=self._check_prompt, args=(terminal, port), daemon=True, ) t.start() threads.append(t) logger.debug( "Started prompt check thread for %r:%s", terminal, port, ) for t in threads: t.join() logger.info("Completed prompt checks for all Telnet sessions")
[docs] def close_sessions(self) -> None: """ Gracefully close all active telnet sessions and their log files. :returns: None """ self.running = False logger.info("Closing all telnet sessions and log files") # Terminate each pexpect session for port, session in self.sessions.items(): try: logger.debug("Shutting down telnet session on port %s", port) session.close(force=True) logger.info("Session on port %s closed successfully", port) except (OSError, pexpect_exceptions.ExceptionPexpect) as e: logger.error( "Failed to close session on port %s: %s", port, e, ) # Close each logfile for port, logfile in self.logfiles.items(): try: logger.debug("Closing logfile for port %s", port) logfile.close() logger.info("Logfile for port %s closed successfully", port) except OSError as e: logger.error( "Failed to close logfile for port %s: %s", port, e, ) logger.info( "All telnet sessions and respective log files have been closed" )
# utility
[docs] def set_log_dir(self, path: str) -> None: """ Set the base directory for telnet logs and initialize output file. Creates the directory if needed and ensures the command output file exists. Raises a ``RuntimeError`` on permission or OS errors. :param path: New base directory for logs. :returns: None :raises RuntimeError: If the directory or file cannot be created. """ from pathlib import Path self.base_log_dir = str(Path(path).expanduser().resolve()) try: os.makedirs(self.base_log_dir, exist_ok=True) except PermissionError as e: raise RuntimeError( "No permission to create log directory " f"{self.base_log_dir}: {e}" ) from e except OSError as e: raise RuntimeError( "Failed to create log directory " f"{self.base_log_dir}: {e}" ) from e self.cmd_output_path = os.path.join( self.base_log_dir, "cmd_output.txt" ) try: open(self.cmd_output_path, "w", encoding="utf-8").close() except PermissionError as e: raise RuntimeError( "No permission to create cmd_output.txt in " f"{self.base_log_dir}: {e}" ) from e except OSError as e: raise RuntimeError( "Failed to init cmd_output.txt in " f"{self.base_log_dir}: {e}" ) from e
[docs] def drain_console_to_tail(self, session: pexpect.spawn, timeout) -> None: """Consume any pending console output to start from a clean state. param session: The pexpect session to read from. param timeout: Maximum time to spend draining the console. """ deadline = time.monotonic() + timeout while time.monotonic() < deadline: try: session.read_nonblocking(size=4096, timeout=0.05) except pexpect.TIMEOUT: break except pexpect.EOF: break
[docs] def run_cmd( self, port: int, cmd: str, timeout: Optional[int] = None, ) -> str: """ Execute a command on the target console and return its output. :port: Telnet port number of the target console. :param cmd: Command string to execute. :param timeout: Maximum time to wait for command completion. :return: Command output with leading and trailing whitespace removed. :raises RuntimeError: If the command returns a non-zero status code. """ status, output = self.execute_command_with_prompt_capture( port=port, command=cmd, timeout=timeout, ) time.sleep(0.5) # Small delay to capture output if status != 0: raise RuntimeError(f"Command failed rc={status}: {cmd}\n{output}") return output.strip()