Source code for test_automation.targets.fvp.fvp_controller

#
# SPDX-FileCopyrightText: <text>Copyright 2025 Arm Limited and/or its
# affiliates <open-source-office@arm.com></text>
#
# SPDX-License-Identifier: MIT

from __future__ import annotations

from test_automation.targets.fvp.autofvpnetworking import (
    TelnetSessionManager,
)
from test_automation.utils.device import Device

import atexit
import logging
import re
import signal
import subprocess
import threading
import time
import os

from pathlib import Path
from typing import Dict, Optional

[docs] logger = logging.getLogger(__name__)
# --------------------------- # Exceptions # ---------------------------
[docs] class FVPError(RuntimeError): """Base exception for FVP related errors."""
[docs] class FVPStartError(FVPError): """Raised when the FVP fails to start."""
[docs] class FVPTimeout(FVPError): """ Raised when the FVP does not become ready within the expected timeout. """
[docs] class FVPStopped(FVPError): """Raised when the FVP process is unexpectedly stopped."""
# ------------------- # Local FVP platform # -------------------
[docs] class LocalFVP(Device): """Local FVP controller to start/stop/reset an FVP process using parameters from YAML. """ def __init__(self, platform_config: dict, *args, **options) -> None: """ Construct a LocalFVP instance from a platform configuration. :param platform_config: Parsed platform configuration from YAML. :param args: Legacy positional arguments. If provided, the first positional argument is ``TelnetSessionManager`` instance. :param options: Optional keyword arguments. :returns: ``None`` """ self._init_required_fields(platform_config) self._init_optional_and_telnet(platform_config, args, options) self._init_runtime_state() # init helpers
[docs] def _init_required_fields(self, platform_config: dict) -> None: """ Populate fields derived directly from the platform configuration. :param platform_config: Parsed platform configuration mapping. :returns: ``None`` """ self.binary = Path( os.path.expanduser(os.path.expandvars(platform_config["binary"])) ).resolve() raw_params = [ s.strip() for s in platform_config.get("arguments", []) if s and s.strip() ] self.arguments = [p for p in raw_params if not p.startswith("--data ")] self.data_args = [p for p in raw_params if p.startswith("--data ")] self.env = dict(platform_config.get("env", {})) self.workdir = ( Path(platform_config["workdir"]).expanduser().resolve() if "workdir" in platform_config else None ) self.ready_regex = platform_config.get( "ready_regex", r"(?P<term>terminal_[a-z0-9_]+):.*?port\s+(?P<port>\d+)", ) self.ready_timeout_s = int(platform_config.get("ready_timeout_s", 180))
[docs] def _init_optional_and_telnet( self, platform_config: dict, args: tuple, options: dict ) -> None: """ Resolve optional log settings and initialize the Telnet manager. :param platform_config: Parsed platform configuration mapping. :param args: Legacy positional args. If present, the first element may be a ``TelnetSessionManager`` instance. :param options: Keyword options that may include: * **log_dir** (str, optional) -- Override log directory path. * **log_prefix** (str, optional) -- Override log filename prefix. * **telnet_log_dir** (str, optional) -- Directory for Telnet logs. * **telnet_manager** (TelnetSessionManager, optional) -- Telnet session manager instance. Defaults to a new instance. :returns: ``None`` """ log_dir = options.get("log_dir") log_prefix = options.get("log_prefix") chosen_dir = log_dir or str( platform_config.get("log_dir", "./logs/fvp_logs") ) self.log_dir = Path(chosen_dir).expanduser().resolve() self.log_prefix = log_prefix or str( platform_config.get("log_prefix", "fvp_boot") ) telnet_manager = args[0] if args else options.get("telnet_manager") self.telnet_manager = telnet_manager or TelnetSessionManager() telnet_log_dir = options.get("telnet_log_dir") if telnet_log_dir and hasattr(self.telnet_manager, "set_log_dir"): self.telnet_manager.set_log_dir( str(Path(telnet_log_dir).expanduser().resolve()) )
[docs] def _init_runtime_state(self) -> None: """ Initialize mutable runtime state and atexit hook. """ self._proc: Optional[subprocess.Popen] = None self._log_file: Optional[Path] = None self._reader_thread: Optional[threading.Thread] = None self._ready = threading.Event() self._ports: Dict[str, int] = {} self._stop_read = threading.Event() atexit.register(self._atexit)
# FVP Lifecycle
[docs] def init(self) -> None: """ Initialize any pre-provisioning steps, currently no-op. """ logger.debug("LocalFVP.init(), nothing to pre-provision")
[docs] def start(self) -> None: """ Start the FVP process with the configured parameters. :raises FVPStartError: If the FVP process fails to start. """ if self.is_running(): logger.info("FVPController.start(): already running") return cmd = self._build_cmd() self._log_file = self._make_log_path() logger.info("Starting FVP: %s", cmd) try: self._proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, universal_newlines=True, bufsize=1, shell=True, start_new_session=True, ) except Exception as e: raise FVPStartError(f"Failed to start FVP: {e}") from e # start reader self._stop_read.clear() self._reader_thread = threading.Thread( target=self._read_loop, daemon=True ) self._reader_thread.start()
[docs] def _cleanup_threads_and_state(self) -> None: """ Join read thread, clear flags/state. """ self._stop_read.set() if self._reader_thread: self._reader_thread.join(timeout=2) self._reader_thread = None self._ready.clear() self._ports.clear() self._stop_read.clear() logger.debug("LocalFVP: threads/state cleanup complete")
[docs] def _signal(self, sig: int) -> None: """ Send a signal to the FVP process or its process group. :param sig: Signal number (e.g. :data:`signal.SIGTERM`). :returns: ``None`` """ proc = self._proc if not proc: return try: try: pgid = os.getpgid(proc.pid) except Exception: pgid = None if pgid is not None: os.killpg(pgid, sig) return if sig == signal.SIGTERM: proc.send_signal(sig) return proc.kill() except Exception as e: logger.warning( "Failed to send signal %s to process: %s", sig, e, )
[docs] def _terminate_proc(self) -> None: """ Terminate the FVP process normally, then forcefully if needed. """ if not self._proc: return try: self._signal(signal.SIGTERM) rc = self._proc.wait(timeout=10) logger.debug("FVP exited with rc=%s after SIGTERM", rc) except subprocess.TimeoutExpired: logger.warning("SIGTERM timed out; sending SIGKILL") self._signal(signal.SIGKILL) try: rc = self._proc.wait(timeout=5) logger.debug("FVP killed; rc=%s", rc) except Exception as e: logger.error("Failed to kill FVP: %s", e) finally: self._proc = None
[docs] def stop(self) -> None: """ Stop the FVP process and clean up telnet sessions. Ensures the reader thread exits, the process is terminated, and all telnet sessions are closed. """ self._stop_read.set() self._terminate_proc() self._cleanup_threads_and_state() closer = getattr(self.telnet_manager, "stop_all", None) or getattr( self.telnet_manager, "close_sessions", None ) if closer: try: closer() except Exception as e: logger.debug( "Ignoring telnet manager close error: %s", e, )
[docs] def reset(self) -> None: """ Restart the FVP process so the device returns to a known-good state. If it's running, stop then start; otherwise just start. """ logger.info("LocalFVP.reset(): resetting FVP process") if self.is_running(): try: self.stop() finally: time.sleep(20) self.start() self.wait_ready()
[docs] def wait_ready(self, timeout_s: Optional[int] = None) -> None: """ Block until the FVP is ready or the timeout expires. :param timeout_s: Timeout in seconds. Defaults to configured `ready_timeout_s`. :raises FVPTimeout: If the FVP is not ready within the timeout. """ timeout_s = timeout_s or self.ready_timeout_s logger.info("Waiting for FVP ready (timeout=%ss)", timeout_s) ok = self._ready.wait(timeout=timeout_s) if not ok: raise FVPTimeout(f"FVP did not become ready within {timeout_s}s") try: logger.debug("Starting Telnet sessions (ports=%s)", self._ports) self.telnet_manager.start_telnet_sessions_after_fvp_ready(self) except Exception as e: logger.warning("Telnet manager failed: %s", e)
[docs] def is_running(self) -> bool: """ Return True if the FVP process is currently running. """ return self._proc is not None and self._proc.poll() is None
[docs] def get_ports(self) -> Dict[str, int]: """ Return mapping of terminal names to telnet ports. """ return dict(self._ports)
[docs] def log_path(self) -> Path: """ Get the path to the current boot log file. :raises FVPError: If start() has not been called yet. """ if not self._log_file: raise FVPError("No log path yet – start() not called") return self._log_file
# ---- helpers ----
[docs] def _build_cmd(self) -> str: """ Construct the final command string for launching the FVP. """ bin_path = str(Path(self.binary).expanduser().resolve()) parts = [bin_path, *self.arguments, *self.data_args] logger.debug("Built FVP cmd (%d parts): %r", len(parts), parts) return " ".join(parts)
[docs] def _make_log_path(self) -> Path: ts = time.strftime("%Y%m%d_%H%M%S") return self.log_dir / f"{self.log_prefix}_{ts}.log"
[docs] def _process_stdout_line( self, line: str, pattern: re.Pattern[str], first_port_seen: bool ) -> bool: """ Process one line of FVP stdout, update ports, and set readiness state. :param line: A single line from the FVP process stdout. :param pattern: Compiled regex to match terminal/port info. :param first_port_seen: Whether a port was already discovered before. :returns: True if this line contained the first discovered port, otherwise the prior ``first_port_seen`` value. """ m = pattern.search(line) if not m: return first_port_seen name = m.group("term").lower() port = int(m.group("port")) self._ports[name] = port logger.debug("Discovered port: %s -> %s", name.lower(), port) if not first_port_seen: self._ready.set() logger.debug("First port seen; ready event set") return True return first_port_seen
[docs] def _read_loop(self) -> None: """ Read process output, write to boot log, and capture telnet port mappings. Sets ready after the first port is discovered. """ proc = self._proc stdout = getattr(proc, "stdout", None) if stdout is None: return pattern = re.compile(self.ready_regex, re.IGNORECASE) first_port_seen = False log_path = self._log_file or self._make_log_path() logger.debug("Reader writing boot log to: %s", log_path) try: with open(log_path, "w", encoding="utf-8") as logf: for line in stdout: if self._stop_read.is_set() or not line: break logf.write(line) logf.flush() first_port_seen = self._process_stdout_line( line, pattern, first_port_seen ) finally: logger.info("FVP stdout closed")
[docs] def _atexit(self) -> None: """Ensure the FVP process and sessions are stopped on exit.""" try: self.stop() except Exception: pass