# Source code for test_automation.targets.fpga.fpga_controller

# SPDX-FileCopyrightText: <text>Copyright 2025-2026 Arm Limited
# and/or its affiliates <open-source-office@arm.com></text>
#
# SPDX-License-Identifier: MIT

from __future__ import annotations

import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Optional, Sequence, Tuple

from test_automation.configs.config import Config
from test_automation.utils.device import Device
from test_automation.targets.fpga.autofpganetworking import (
    AutoFPGANetworking,
    SSHConfig,
)

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class FPGAPlatformConfig:
    """
    Immutable bundle of settings describing one FPGA platform session.

    The first ten fields are required; the payload and logging fields are
    optional and default to ``None``.

    :param platform_name: Logical name of the platform; used when naming
        the log directory.
    :param host: Hostname or IP address of the remote FPGA host.
    :param username: User name for the SSH login.
    :param password: SSH password for password or interactive
        authentication, or ``None`` when not used.
    :param port: TCP port of the SSH service.
    :param connect_timeout_s: Seconds allowed for SSH connection and
        authentication.
    :param ready_timeout_s: Seconds allowed for readiness checks.
    :param login_timeout_s: Seconds allowed for detecting the UART login
        and reaching a shell prompt.
    :param hpc_env_setup: Shell commands run to prepare the environment
        before the remote boot command is launched.
    :param remote_cmd: Remote command that starts the FPGA boot/run.
    :param remote_payload_path: Remote directory into which a payload
        archive is uploaded and extracted, if any.
    :param local_payload_path: Local archive to upload to the remote host,
        if any.
    :param log_dir: Override for the base log directory, if any.
    :param log_prefix: Override for the log directory name prefix, if any.
    """

    # --- Identity and SSH connection -----------------------------------
    platform_name: str
    host: str
    username: str
    password: Optional[str]
    port: int

    # --- Timeouts, all expressed in seconds ----------------------------
    connect_timeout_s: int
    ready_timeout_s: int
    login_timeout_s: int

    # --- Remote launch -------------------------------------------------
    hpc_env_setup: Sequence[str]
    remote_cmd: str

    # --- Optional payload upload and logging overrides -----------------
    remote_payload_path: Optional[str] = None
    local_payload_path: Optional[str] = None
    log_dir: Optional[str] = None
    log_prefix: Optional[str] = None
class FPGAController(Device):
    """
    FPGA controller implementing the Device interface.

    This controller manages the lifecycle of a remote FPGA session over
    SSH. It is responsible for connecting to the FPGA host, optionally
    uploading required binaries, launching the configured remote command,
    monitoring system readiness, handling automated UART login, and
    exposing runtime state and logs to the test framework.
    """

    def __init__(self, cfg: FPGAPlatformConfig) -> None:
        """
        Initialize the FPGA controller.

        No connection is opened here; call :meth:`init` to connect.

        :param cfg: Platform configuration describing the FPGA host,
            remote boot command, timeouts, environment setup, and
            logging options.
        :type cfg: FPGAPlatformConfig
        """
        self.cfg = cfg
        self.net = AutoFPGANetworking(
            SSHConfig(
                host=cfg.host,
                username=cfg.username,
                password=cfg.password,
                port=cfg.port,
                connect_timeout_s=cfg.connect_timeout_s,
            ),
            platform_name=cfg.platform_name,
            log_dir=cfg.log_dir,
            log_prefix=cfg.log_prefix,
        )
        self._running = False
        # (uart_log_path, vuart, pts) of the logged-in UART; ``None`` until
        # start() succeeds and again after stop().
        self._login_uart: Optional[Tuple[str, int, int]] = None

    def init(self) -> None:
        """
        Establish an SSH connection to the FPGA host.

        :returns: ``None``
        :raises RuntimeError: If SSH connectivity/authentication fails.
        """
        self.net.connect()

    def start(self) -> None:
        """
        Start the FPGA boot workflow and login as root.

        This method:

        - Uploads the payload archive when both ``local_payload_path`` and
          ``remote_payload_path`` are configured
        - Launches the command in the background on the remote host
        - Waits for a login prompt on the UART
        - Sends the root login
        - Marks the controller as running

        :returns: ``None``
        :raises RuntimeError: If not connected or login UART cannot be
            established.
        :raises TimeoutError: If login prompt or shell prompt is not
            detected within configured timeout.
        """
        logger.info(
            "Starting FPGA execution for platform %s host %s",
            self.cfg.platform_name,
            self.cfg.host,
        )

        # Copy binaries on remote and determine extracted binary dir.
        # Stays ``None`` when no payload is configured.
        remote_workdir = None
        if self.cfg.local_payload_path and self.cfg.remote_payload_path:
            logger.info(
                "Copying FPGA binaries from %s -> %s:%s",
                self.cfg.local_payload_path,
                self.cfg.host,
                self.cfg.remote_payload_path,
            )
            remote_workdir = self.net.copy_payloads_to_remote(
                local_archive=self.cfg.local_payload_path,
                remote_base_dir=self.cfg.remote_payload_path,
            )
            logger.info(
                "FPGA execution will use working directory: %s",
                remote_workdir,
            )

        # Start remote command execution in background
        self.net.start_remote_run(
            env_setup_cmds=self.cfg.hpc_env_setup,
            remote_cmd=self.cfg.remote_cmd,
            remote_workdir=remote_workdir,
            on_line=None,
        )

        # Wait for login prompt, send root
        uart_log_path, vuart, pts = self.net.wait_login_and_send_root(
            timeout_s=self.cfg.login_timeout_s
        )
        self._login_uart = (uart_log_path, vuart, pts)
        self._running = True
        logger.info(
            "Login established on vuart=%d pts=%d (uart_log=%s)",
            vuart,
            pts,
            uart_log_path,
        )

    def stop(self) -> None:
        """
        Stop the FPGA run and collect logs.

        This method attempts to wait briefly for remote command to exit,
        then downloads the current RUN_DIR logs into::

            <CWD>/logs/<platform>_<timestamp>/remote_run_logs/

        Finally, it clears the running flag and the cached login UART and
        disconnects the SSH session regardless of outcome.

        :returns: ``None``
        """
        try:
            # If remote run exits quickly, wait a little to let logs flush.
            # If remote run is long-running, this returns None and we still
            # download current RUN_DIR content.
            exit_status = self.net.wait_remote_exit(timeout_s=5)
            logger.debug(
                "Remote command exit result during stop(): %s", exit_status
            )

            # Download RUN_DIR logs
            if self.net.last_run_dir:
                self.net.download_run_dir_logs()
        finally:
            self._running = False
            # The UART coordinates belong to the session being torn down;
            # drop them so get_login_uart()/run_sample_command() raise
            # instead of targeting a disconnected session.
            self._login_uart = None
            self.net.disconnect()

    def reset(self) -> None:
        """
        Reset the FPGA by stopping and restarting the controller.

        This is equivalent to calling ``stop()``, ``init()``, and
        ``start()`` in sequence.

        :returns: ``None``
        """
        self.stop()
        self.init()
        self.start()

    def wait_ready(self, timeout_s: Optional[int] = None) -> None:
        """
        Wait until the FPGA run directory is created.

        This method blocks until remote command creates the RUN_DIR on the
        remote host, indicating that the FPGA boot has progressed far
        enough to be considered ready.

        :param timeout_s: Maximum time to wait in seconds. If ``None``,
            the configured ``cfg.ready_timeout_s`` is used. An explicit
            value, including ``0``, is passed through unchanged.
        :returns: ``None``
        """
        # Fall back to the platform's configured readiness timeout rather
        # than a hard-coded constant; only ``None`` means "not specified".
        effective_timeout = (
            self.cfg.ready_timeout_s if timeout_s is None else timeout_s
        )
        run_dir = self.net.wait_for_run_dir(timeout_s=effective_timeout)
        logger.debug(
            "FPGA run directory is ready: %s",
            run_dir,
        )

    def is_running(self) -> bool:
        """
        Check whether the FPGA controller is currently running.

        :returns: ``True`` if the controller is running, ``False``
            otherwise.
        """
        return self._running

    def get_ports(self) -> Dict[str, int]:
        """
        Return exposed service ports for the FPGA.

        FPGA platforms do not currently expose network service ports.

        :returns: An empty mapping.
        """
        return {}

    def log_path(self) -> Path:
        """
        Return the primary local boot log path.

        :returns: Path to the boot log file, or a fallback logs directory
            (``<CWD>/logs``) if unavailable.
        """
        boot_log = self.net.boot_log_path
        if boot_log:
            # Coerce so the declared ``Path`` return type holds whether the
            # networking layer stores a ``str`` or a ``Path``.
            return Path(boot_log)

        # Fallback (should not normally happen)
        return Path.cwd() / "logs"

    def get_run_dir(self) -> Optional[str]:
        """
        Return the remote run directory.

        :returns: Path to the RUN_DIR on the remote host, or ``None`` if
            unavailable.
        """
        return self.net.last_run_dir

    def get_login_uart(self) -> Tuple[str, int, int]:
        """
        Return UART details for the active login session.

        :returns: A tuple of ``(uart_log_path, vuart, pts)``.
        :raises RuntimeError: If the login UART has not been established.
        """
        if self._login_uart is None:
            raise RuntimeError("Login UART not established yet")
        return self._login_uart

    def run_sample_command(self, command: str) -> None:
        """
        Execute a command on the logged-in UART session.

        :param command: Shell command to execute.
        :returns: ``None``
        :raises RuntimeError: If the login UART has not been established.
        """
        _uart_log_path, vuart, pts = self.get_login_uart()
        self.net.run_uart_command(vuart=vuart, pts=pts, cmd=command)
def load_fpga_controller_from_yaml(
    yaml_path: Path,
    platform_name: str,
    host: Optional[str] = None,
) -> FPGAController:
    """
    Create an FPGAController instance from a YAML configuration file.

    Optional platform attributes fall back to defaults: port ``22``,
    ``connect_timeout_s`` 20, ``ready_timeout_s`` 1200, ``login_timeout_s``
    900, no environment setup commands, and ``None`` for the password,
    payload paths and logging overrides.

    :param yaml_path: Path to the YAML configuration file.
    :param platform_name: Name of the platform entry to load.
    :param host: Optional override host name/IP. If not provided, the YAML
        platform object's ``host`` attribute is used.
    :returns: Constructed FPGAController instance.
    :raises ValueError: If host cannot be resolved, or if the platform has
        no usable ``boot_cmd``.
    """
    cfg_loader = Config(str(yaml_path))
    plat = cfg_loader.get_platform_object(platform_name)

    resolved_host = host or getattr(plat, "host", None)
    if not resolved_host:
        raise ValueError(
            f"FPGA platform '{platform_name}' requires '--host <hostname>'."
        )

    # ``boot_cmd`` may be a single command or a sequence of commands; only
    # the first entry of a sequence is used as the remote command.
    boot_cmd = getattr(plat, "boot_cmd", None)
    if isinstance(boot_cmd, (list, tuple)):
        remote_cmd = boot_cmd[0] if boot_cmd else None
    elif boot_cmd is None:
        remote_cmd = None
    else:
        remote_cmd = str(boot_cmd)

    # A missing/empty command would otherwise surface as an IndexError, or
    # be sent to the remote host as the literal text "None".
    if remote_cmd is None or not str(remote_cmd).strip():
        raise ValueError(
            f"FPGA platform '{platform_name}' has no 'boot_cmd' configured."
        )

    # Treat a null entry as "no setup" and a bare string as one command;
    # list() on a string would split it into single characters.
    env_setup = getattr(plat, "hpc_env_setup", None) or []
    if isinstance(env_setup, str):
        env_setup = [env_setup]

    cfg = FPGAPlatformConfig(
        platform_name=platform_name,
        host=resolved_host,
        username=plat.username,
        password=getattr(plat, "password", None),
        port=int(getattr(plat, "port", 22)),
        connect_timeout_s=int(getattr(plat, "connect_timeout_s", 20)),
        ready_timeout_s=int(getattr(plat, "ready_timeout_s", 1200)),
        login_timeout_s=int(getattr(plat, "login_timeout_s", 900)),
        hpc_env_setup=list(env_setup),
        remote_cmd=remote_cmd,
        remote_payload_path=getattr(plat, "remote_payload_path", None),
        local_payload_path=getattr(plat, "local_payload_path", None),
        log_dir=getattr(plat, "log_dir", None),
        log_prefix=getattr(plat, "log_prefix", None),
    )
    return FPGAController(cfg)