#
# SPDX-FileCopyrightText: <text>Copyright 2025 Arm Limited and/or its
# affiliates <open-source-office@arm.com></text>
#
# SPDX-License-Identifier: MIT
from __future__ import annotations

import atexit
import logging
import os
import re
import shlex
import signal
import subprocess
import threading
import time
from pathlib import Path
from typing import Dict, Optional

from test_automation.targets.fvp.autofvpnetworking import (
    TelnetSessionManager,
)
from test_automation.utils.device import Device
# Module-level logger; every class in this file logs through it.
logger = logging.getLogger(__name__)
# ---------------------------
# Exceptions
# ---------------------------
class FVPError(RuntimeError):
    """Base exception for FVP related errors.

    Catch this type to handle every FVP-specific failure defined below.
    """


class FVPStartError(FVPError):
    """Raised when the FVP fails to start."""


class FVPTimeout(FVPError):
    """
    Raised when the FVP does not become ready within
    the expected timeout.
    """


class FVPStopped(FVPError):
    """Raised when the FVP process is unexpectedly stopped."""
# -------------------
# Local FVP platform
# -------------------
class LocalFVP(Device):
    """Local FVP controller to start/stop/reset an FVP process using parameters
    from YAML.

    The FVP is launched through the shell in its own session. A daemon
    thread copies its combined stdout/stderr to a boot log and extracts the
    terminal-name -> telnet-port mapping; the instance is considered ready as
    soon as the first port is seen.
    """

    def __init__(self, platform_config: dict, *args, **options) -> None:
        """
        Construct a LocalFVP instance from a platform configuration.

        :param platform_config: Parsed platform configuration from YAML.
        :param args: Legacy positional arguments. If provided, the first
            positional argument is ``TelnetSessionManager`` instance.
        :param options: Optional keyword arguments (see
            :meth:`_init_optional_and_telnet` for the recognised keys).
        :returns: ``None``
        """
        self._init_required_fields(platform_config)
        self._init_optional_and_telnet(platform_config, args, options)
        self._init_runtime_state()

    # init helpers

    def _init_required_fields(self, platform_config: dict) -> None:
        """
        Populate fields derived directly from the platform configuration.

        :param platform_config: Parsed platform configuration mapping.
            ``binary`` is mandatory; ``arguments``, ``env``, ``workdir``,
            ``ready_regex`` and ``ready_timeout_s`` are optional.
        :returns: ``None``
        :raises KeyError: If ``binary`` is missing.
        """
        self.binary = Path(
            os.path.expanduser(os.path.expandvars(platform_config["binary"]))
        ).resolve()

        # ``or []`` tolerates a key that is present but empty/None.
        raw_params = [
            s.strip()
            for s in platform_config.get("arguments") or []
            if s and s.strip()
        ]
        # ``--data`` arguments are kept apart so they are always emitted
        # after every other argument when the command line is built.
        self.arguments = [p for p in raw_params if not p.startswith("--data ")]
        self.data_args = [p for p in raw_params if p.startswith("--data ")]

        self.env = dict(platform_config.get("env") or {})

        workdir = platform_config.get("workdir")
        self.workdir = (
            Path(workdir).expanduser().resolve() if workdir else None
        )

        self.ready_regex = platform_config.get(
            "ready_regex",
            r"(?P<term>terminal_[a-z0-9_]+):.*?port\s+(?P<port>\d+)",
        )
        self.ready_timeout_s = int(platform_config.get("ready_timeout_s", 180))

    def _init_optional_and_telnet(
        self, platform_config: dict, args: tuple, options: dict
    ) -> None:
        """
        Resolve optional log settings and initialize the Telnet manager.

        Keyword options take precedence over the platform configuration,
        which in turn takes precedence over the built-in defaults.

        :param platform_config: Parsed platform configuration mapping.
        :param args: Legacy positional args. If present, the first element
            may be a ``TelnetSessionManager`` instance.
        :param options: Keyword options that may include:

            * **log_dir** (str, optional) -- Override log directory path.
            * **log_prefix** (str, optional) -- Override log filename prefix.
            * **telnet_log_dir** (str, optional) -- Directory for Telnet logs.
            * **telnet_manager** (TelnetSessionManager, optional) -- Telnet
              session manager instance. Defaults to a new instance.
        :returns: ``None``
        """
        log_dir = options.get("log_dir")
        log_prefix = options.get("log_prefix")

        chosen_dir = log_dir or str(
            platform_config.get("log_dir", "./logs/fvp_logs")
        )
        self.log_dir = Path(chosen_dir).expanduser().resolve()
        self.log_prefix = log_prefix or str(
            platform_config.get("log_prefix", "fvp_boot")
        )

        # A positional manager (legacy call style) wins over the keyword.
        telnet_manager = args[0] if args else options.get("telnet_manager")
        self.telnet_manager = telnet_manager or TelnetSessionManager()

        telnet_log_dir = options.get("telnet_log_dir")
        # Only forwarded when the manager actually supports it.
        if telnet_log_dir and hasattr(self.telnet_manager, "set_log_dir"):
            self.telnet_manager.set_log_dir(
                str(Path(telnet_log_dir).expanduser().resolve())
            )

    def _init_runtime_state(self) -> None:
        """
        Initialize mutable runtime state and atexit hook.
        """
        self._proc: Optional[subprocess.Popen] = None
        self._log_file: Optional[Path] = None
        self._reader_thread: Optional[threading.Thread] = None
        # Set by the reader thread once the first telnet port is seen.
        self._ready = threading.Event()
        self._ports: Dict[str, int] = {}
        # Asks the reader thread to stop consuming output.
        self._stop_read = threading.Event()
        atexit.register(self._atexit)

    # FVP Lifecycle

    def init(self) -> None:
        """
        Initialize any pre-provisioning steps, currently no-op.
        """
        logger.debug("LocalFVP.init(), nothing to pre-provision")

    def start(self) -> None:
        """
        Start the FVP process with the configured parameters.

        Does nothing if the process is already running. Readiness state and
        discovered ports from any previous run are discarded first, so a
        later :meth:`wait_ready` only reports the new process. The
        configured ``env`` (merged over the current environment) and
        ``workdir`` are applied when present.

        :raises FVPStartError: If the FVP process fails to start.
        """
        if self.is_running():
            logger.info("LocalFVP.start(): already running")
            return

        # A previous process may have exited on its own; let its reader
        # finish so it cannot repopulate the state cleared below.
        stale_reader = self._reader_thread
        if stale_reader is not None and stale_reader.is_alive():
            stale_reader.join(timeout=2)
        self._stop_read.clear()
        self._ready.clear()
        self._ports.clear()

        cmd = self._build_cmd()
        self._log_file = self._make_log_path()
        logger.info("Starting FVP: %s", cmd)

        # Only pass env/cwd when configured so the default launch is
        # unchanged (inherit environment and working directory).
        popen_kwargs: dict = {}
        if self.env:
            popen_kwargs["env"] = {
                **os.environ,
                **{str(k): str(v) for k, v in self.env.items()},
            }
        if self.workdir is not None:
            popen_kwargs["cwd"] = str(self.workdir)

        try:
            self._proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                stdin=subprocess.PIPE,
                universal_newlines=True,
                bufsize=1,
                shell=True,
                # Own session => own process group, which _signal() targets.
                start_new_session=True,
                **popen_kwargs,
            )
        except Exception as e:
            raise FVPStartError(f"Failed to start FVP: {e}") from e

        # start reader
        self._reader_thread = threading.Thread(
            target=self._read_loop, daemon=True
        )
        self._reader_thread.start()

    def _cleanup_threads_and_state(self) -> None:
        """
        Join read thread, clear flags/state.
        """
        self._stop_read.set()
        if self._reader_thread:
            # Bounded wait: never hang on a reader that does not exit.
            self._reader_thread.join(timeout=2)
        self._reader_thread = None
        self._ready.clear()
        self._ports.clear()
        self._stop_read.clear()
        logger.debug("LocalFVP: threads/state cleanup complete")

    def _signal(self, sig: int) -> None:
        """
        Send a signal to the FVP process or its process group.

        The process group is preferred. If it cannot be resolved, the
        signal goes to the process itself: ``SIGTERM`` is delivered as-is,
        any other signal results in a hard kill. Failures are logged, not
        raised.

        :param sig: Signal number (e.g. :data:`signal.SIGTERM`).
        :returns: ``None``
        """
        proc = self._proc
        if not proc:
            return
        try:
            try:
                pgid = os.getpgid(proc.pid)
            except Exception:
                pgid = None
            if pgid is not None:
                os.killpg(pgid, sig)
                return
            if sig == signal.SIGTERM:
                proc.send_signal(sig)
                return
            proc.kill()
        except Exception as e:
            logger.warning(
                "Failed to send signal %s to process: %s",
                sig,
                e,
            )

    def _terminate_proc(self) -> None:
        """
        Terminate the FVP process normally, then forcefully if needed.

        Sends ``SIGTERM`` and waits up to 10 s; on timeout sends ``SIGKILL``
        and waits up to 5 s more. The process handle is always dropped.
        """
        if not self._proc:
            return
        try:
            self._signal(signal.SIGTERM)
            rc = self._proc.wait(timeout=10)
            logger.debug("FVP exited with rc=%s after SIGTERM", rc)
        except subprocess.TimeoutExpired:
            logger.warning("SIGTERM timed out; sending SIGKILL")
            self._signal(signal.SIGKILL)
            try:
                rc = self._proc.wait(timeout=5)
                logger.debug("FVP killed; rc=%s", rc)
            except Exception as e:
                logger.error("Failed to kill FVP: %s", e)
        finally:
            self._proc = None

    def stop(self) -> None:
        """
        Stop the FVP process and clean up telnet sessions.

        Ensures the reader thread exits, the process is terminated,
        and all telnet sessions are closed. Errors raised while closing
        telnet sessions are logged and ignored.
        """
        self._stop_read.set()
        self._terminate_proc()
        self._cleanup_threads_and_state()

        # Support either closing method name on the telnet manager.
        closer = getattr(self.telnet_manager, "stop_all", None) or getattr(
            self.telnet_manager, "close_sessions", None
        )
        if closer:
            try:
                closer()
            except Exception as e:
                logger.debug(
                    "Ignoring telnet manager close error: %s",
                    e,
                )

    def reset(self, settle_s: float = 20) -> None:
        """
        Restart the FVP process so the device returns to a known-good state.

        If it's running, stop, wait ``settle_s`` seconds, then start. If a
        previous process has already exited on its own, its leftover state
        and telnet sessions are cleaned up (no delay) before starting.
        Otherwise just start. Blocks until the new process is ready.

        :param settle_s: Seconds to wait between stopping a running FVP and
            starting the new one. Defaults to 20.
        :raises FVPStartError: If the FVP process fails to start.
        :raises FVPTimeout: If the FVP is not ready within the timeout.
        """
        logger.info("LocalFVP.reset(): resetting FVP process")
        if self.is_running():
            try:
                self.stop()
            finally:
                time.sleep(settle_s)
        elif self._proc is not None:
            # The process died by itself: nothing to wait for, but stale
            # readiness state and telnet sessions must not leak into the
            # next run.
            self.stop()
        self.start()
        self.wait_ready()

    def wait_ready(self, timeout_s: Optional[int] = None) -> None:
        """
        Block until the FVP is ready or the timeout expires.

        Once ready, the telnet manager is asked to open its sessions; a
        failure there is logged as a warning and not raised.

        :param timeout_s: Timeout in seconds. Defaults to configured
            `ready_timeout_s`. ``0`` checks readiness without blocking.
        :raises FVPTimeout: If the FVP is not ready within the timeout.
        """
        # ``is None`` (not ``or``) so an explicit 0 is honoured.
        if timeout_s is None:
            timeout_s = self.ready_timeout_s
        logger.info("Waiting for FVP ready (timeout=%ss)", timeout_s)
        ok = self._ready.wait(timeout=timeout_s)
        if not ok:
            raise FVPTimeout(f"FVP did not become ready within {timeout_s}s")
        try:
            logger.debug("Starting Telnet sessions (ports=%s)", self._ports)
            self.telnet_manager.start_telnet_sessions_after_fvp_ready(self)
        except Exception as e:
            logger.warning("Telnet manager failed: %s", e)

    def is_running(self) -> bool:
        """
        Return True if the FVP process is currently running.
        """
        return self._proc is not None and self._proc.poll() is None

    def get_ports(self) -> Dict[str, int]:
        """
        Return mapping of terminal names to telnet ports.

        A copy is returned, so callers cannot mutate the internal mapping.
        """
        return dict(self._ports)

    def log_path(self) -> Path:
        """
        Get the path to the current boot log file.

        :raises FVPError: If start() has not been called yet.
        """
        if not self._log_file:
            raise FVPError("No log path yet – start() not called")
        return self._log_file

    # ---- helpers ----

    def _build_cmd(self) -> str:
        """
        Construct the final command string for launching the FVP.

        The binary path is shell-quoted so paths containing spaces or shell
        metacharacters survive ``shell=True``. Arguments are passed through
        verbatim, ``--data`` arguments last.

        :returns: Space-joined shell command line.
        """
        bin_path = shlex.quote(str(Path(self.binary).expanduser().resolve()))
        parts = [bin_path, *self.arguments, *self.data_args]
        logger.debug("Built FVP cmd (%d parts): %r", len(parts), parts)
        return " ".join(parts)

    def _make_log_path(self) -> Path:
        """
        Build a timestamped boot-log path inside ``log_dir``.

        :returns: ``<log_dir>/<log_prefix>_<YYYYmmdd_HHMMSS>.log``. The
            file and directory are not created here.
        """
        ts = time.strftime("%Y%m%d_%H%M%S")
        return self.log_dir / f"{self.log_prefix}_{ts}.log"

    def _process_stdout_line(
        self, line: str, pattern: re.Pattern[str], first_port_seen: bool
    ) -> bool:
        """
        Process one line of FVP stdout, update ports, and set readiness state.

        :param line: A single line from the FVP process stdout.
        :param pattern: Compiled regex to match terminal/port info; it must
            define the named groups ``term`` and ``port``.
        :param first_port_seen: Whether a port was already discovered before.
        :returns: True if this line contained the first discovered port,
            otherwise the prior ``first_port_seen`` value.
        """
        m = pattern.search(line)
        if not m:
            return first_port_seen

        name = m.group("term").lower()
        port = int(m.group("port"))
        self._ports[name] = port
        logger.debug("Discovered port: %s -> %s", name, port)

        if not first_port_seen:
            self._ready.set()
            logger.debug("First port seen; ready event set")
            return True
        return first_port_seen

    def _read_loop(self) -> None:
        """
        Read process output, write to boot log, and capture telnet port
        mappings. Sets ready after the first port is discovered.

        Runs in the reader thread. The log directory is created on demand
        so a missing directory cannot kill the thread before readiness is
        ever detected.
        """
        proc = self._proc
        stdout = getattr(proc, "stdout", None)
        if stdout is None:
            return

        pattern = re.compile(self.ready_regex, re.IGNORECASE)
        first_port_seen = False
        log_path = self._log_file or self._make_log_path()
        logger.debug("Reader writing boot log to: %s", log_path)
        try:
            log_path.parent.mkdir(parents=True, exist_ok=True)
            with open(log_path, "w", encoding="utf-8") as logf:
                for line in stdout:
                    if self._stop_read.is_set() or not line:
                        break
                    logf.write(line)
                    # Flush per line so the log can be followed live.
                    logf.flush()
                    first_port_seen = self._process_stdout_line(
                        line, pattern, first_port_seen
                    )
        finally:
            logger.info("FVP stdout closed")

    def _atexit(self) -> None:
        """Ensure the FVP process and sessions are stopped on exit."""
        try:
            self.stop()
        except Exception:
            # Best effort: nothing useful can be done with an error while
            # the interpreter is shutting down.
            pass