# SPDX-FileCopyrightText: <text>Copyright 2025-2026 Arm Limited
# and/or its affiliates <open-source-office@arm.com></text>
#
# SPDX-License-Identifier: MIT
from __future__ import annotations
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Optional, Sequence, Tuple
from test_automation.configs.config import Config
from test_automation.utils.device import Device
from test_automation.targets.fpga.autofpganetworking import (
AutoFPGANetworking,
SSHConfig,
)
# Module-level logger, named after this module, used by FPGAController.
logger = logging.getLogger(__name__)
# Deliberately a plain class rather than a frozen dataclass: a frozen
# dataclass rejects the attribute assignments performed in ``__init__``.
class FPGAController(Device):
    """
    FPGA controller implementing the Device interface.

    This controller manages the lifecycle of a remote FPGA session over SSH.
    It is responsible for connecting to the FPGA host, optionally uploading
    required binaries, launching the configured remote command, monitoring
    system readiness, handling automated UART login, and exposing runtime
    state and logs to the test framework.
    """

    def __init__(self, cfg: FPGAPlatformConfig) -> None:
        """
        Initialize the FPGA controller.

        :param cfg: Platform configuration describing the FPGA host, remote
            boot command, timeouts, environment setup, and logging options.
        :type cfg: FPGAPlatformConfig
        """
        # Kept on the instance because start() reads its fields.
        self.cfg = cfg
        self.net = AutoFPGANetworking(
            SSHConfig(
                host=cfg.host,
                username=cfg.username,
                password=cfg.password,
                port=cfg.port,
                connect_timeout_s=cfg.connect_timeout_s,
            ),
            platform_name=cfg.platform_name,
            log_dir=cfg.log_dir,
            log_prefix=cfg.log_prefix,
        )
        # (uart_log_path, vuart, pts) once start() has logged in.
        self._login_uart: Optional[Tuple[str, int, int]] = None
        # Initialised here so is_running() is safe to call before start().
        self._running: bool = False

    def init(self) -> None:
        """
        Establish an SSH connection to the FPGA host.

        :returns: ``None``
        :raises RuntimeError: If SSH connectivity/authentication fails.
        """
        self.net.connect()

    def start(self) -> None:
        """
        Start the FPGA boot workflow and login as root.

        This method:

        - Optionally copies the payload archive to the remote host (only
          when both the local and remote payload paths are configured)
        - Launches the command in the background on the remote host
        - Waits for a login prompt on the UART
        - Sends the root login
        - Marks the controller as running

        :returns: ``None``
        :raises RuntimeError: If not connected or login UART cannot be
            established.
        :raises TimeoutError: If login prompt or shell prompt is not detected
            within configured timeout.
        """
        logger.info(
            "Starting FPGA execution for platform %s host %s",
            self.cfg.platform_name,
            self.cfg.host,
        )

        # Copy binaries on remote and determine extracted binary dir.
        # Stays None when no payload is configured.
        remote_workdir = None
        if self.cfg.local_payload_path and self.cfg.remote_payload_path:
            logger.info(
                "Copying FPGA binaries from %s -> %s:%s",
                self.cfg.local_payload_path,
                self.cfg.host,
                self.cfg.remote_payload_path,
            )
            remote_workdir = self.net.copy_payloads_to_remote(
                local_archive=self.cfg.local_payload_path,
                remote_base_dir=self.cfg.remote_payload_path,
            )
            logger.info(
                "FPGA execution will use working directory: %s", remote_workdir
            )

        # Start remote command execution in background
        self.net.start_remote_run(
            env_setup_cmds=self.cfg.hpc_env_setup,
            remote_cmd=self.cfg.remote_cmd,
            remote_workdir=remote_workdir,
            on_line=None,
        )

        # Wait for login prompt, send root
        uart_log_path, vuart, pts = self.net.wait_login_and_send_root(
            timeout_s=self.cfg.login_timeout_s
        )
        self._login_uart = (uart_log_path, vuart, pts)
        self._running = True
        logger.info(
            "Login established on vuart=%d pts=%d (uart_log=%s)",
            vuart,
            pts,
            uart_log_path,
        )

    def stop(self) -> None:
        """
        Stop the FPGA run and collect logs.

        This method attempts to wait briefly for remote command to exit, then
        downloads the current RUN_DIR logs into::

            <CWD>/logs/<platform>_<timestamp>/remote_run_logs/

        Finally, it disconnects the SSH session regardless of outcome.

        :returns: ``None``
        """
        try:
            # If remote run exits quickly, you can optionally wait a little
            # to let logs flush. If remote run is long-running, this returns
            # None and we still download current RUN_DIR content.
            exit_status = self.net.wait_remote_exit(timeout_s=5)
            logger.debug(
                "Remote command exit result during stop(): %s", exit_status
            )
            # Download RUN_DIR logs
            if self.net.last_run_dir:
                self.net.download_run_dir_logs()
        finally:
            # Always clear the running flag and drop the SSH session, even
            # if waiting or downloading raised.
            self._running = False
            self.net.disconnect()

    def reset(self) -> None:
        """
        Reset the FPGA by stopping and restarting the controller.

        This is equivalent to calling ``stop()``, ``init()``, and ``start()``
        in sequence.

        :returns: ``None``
        """
        self.stop()
        self.init()
        self.start()

    def wait_ready(self, timeout_s: Optional[int] = None) -> None:
        """
        Wait until the FPGA run directory is created.

        This method blocks until remote command creates the RUN_DIR on the
        remote host, indicating that the FPGA boot has progressed far enough to
        be considered ready.

        :param timeout_s: Maximum time to wait in seconds. If ``None`` (or
            ``0``), a default of 180 seconds is used.
        :returns: ``None``
        """
        # NOTE(review): cfg.ready_timeout_s is not consulted here; confirm
        # whether it should replace the hard-coded 180 s default.
        run_dir = self.net.wait_for_run_dir(timeout_s=timeout_s or 180)
        logger.debug(
            "FPGA run directory is ready: %s",
            run_dir,
        )

    def is_running(self) -> bool:
        """
        Check whether the FPGA controller is currently running.

        :returns: ``True`` if the controller is running, ``False`` otherwise.
        """
        return self._running

    def get_ports(self) -> Dict[str, int]:
        """
        Return exposed service ports for the FPGA.

        FPGA platforms do not currently expose network service ports.

        :returns: An empty mapping.
        """
        return {}

    def log_path(self) -> Path:
        """
        Return the primary local boot log path.

        :returns: Path to the boot log file, or a fallback logs directory
            if unavailable.
        """
        # Primary local boot log file
        if self.net.boot_log_path:
            return self.net.boot_log_path
        # Fallback (should not normally happen)
        return Path.cwd() / "logs"

    def get_run_dir(self) -> Optional[str]:
        """
        Return the remote run directory.

        :returns: Path to the RUN_DIR on the remote host, or ``None`` if
            unavailable.
        """
        return self.net.last_run_dir

    def get_login_uart(self) -> Tuple[str, int, int]:
        """
        Return UART details for the active login session.

        :returns: A tuple of ``(uart_log_path, vuart, pts)``.
        :raises RuntimeError: If the login UART has not been established.
        """
        if not self._login_uart:
            raise RuntimeError("Login UART not established yet")
        return self._login_uart

    def run_sample_command(self, command: str) -> None:
        """
        Execute a command on the logged-in UART session.

        :param command: Shell command to execute.
        :returns: ``None``
        :raises RuntimeError: If the login UART has not been established.
        """
        _uart_log_path, vuart, pts = self.get_login_uart()
        self.net.run_uart_command(vuart=vuart, pts=pts, cmd=command)


def load_fpga_controller_from_yaml(
    yaml_path: Path,
    platform_name: str,
    host: Optional[str] = None,
) -> FPGAController:
    """
    Create an FPGAController instance from a YAML configuration file.

    Optional platform attributes fall back to built-in defaults (port 22,
    connect timeout 20 s, ready timeout 1200 s, login timeout 900 s, no
    environment setup commands, no payload paths, no log overrides).

    :param yaml_path: Path to the YAML configuration file.
    :param platform_name: Name of the platform entry to load.
    :param host: Optional override host name/IP. If not provided, the YAML
        platform object's ``host`` attribute is used.
    :returns: Constructed FPGAController instance.
    :raises ValueError: If host cannot be resolved, or if the platform has
        no usable ``boot_cmd`` (missing, ``None`` or an empty sequence).
    """
    cfg_loader = Config(str(yaml_path))
    plat = cfg_loader.get_platform_object(platform_name)

    # An explicit host argument wins over the value stored in the YAML.
    resolved_host = host or getattr(plat, "host", None)
    if not resolved_host:
        raise ValueError(
            f"FPGA platform '{platform_name}' requires '--host <hostname>'."
        )

    boot_cmd = getattr(plat, "boot_cmd", None)
    if boot_cmd is None or (
        isinstance(boot_cmd, (list, tuple)) and not boot_cmd
    ):
        # Fail with a descriptive error instead of an IndexError on an
        # empty list or a literal "None" command string.
        raise ValueError(
            f"FPGA platform '{platform_name}' does not define a 'boot_cmd'."
        )
    if isinstance(boot_cmd, (list, tuple)):
        # Only the first entry of a command sequence is used.
        remote_cmd = boot_cmd[0]
    else:
        remote_cmd = str(boot_cmd)

    cfg = FPGAPlatformConfig(
        platform_name=platform_name,
        host=resolved_host,
        username=plat.username,
        password=getattr(plat, "password", None),
        port=int(getattr(plat, "port", 22)),
        connect_timeout_s=int(getattr(plat, "connect_timeout_s", 20)),
        ready_timeout_s=int(getattr(plat, "ready_timeout_s", 1200)),
        login_timeout_s=int(getattr(plat, "login_timeout_s", 900)),
        hpc_env_setup=list(getattr(plat, "hpc_env_setup", [])),
        remote_cmd=remote_cmd,
        remote_payload_path=getattr(plat, "remote_payload_path", None),
        local_payload_path=getattr(plat, "local_payload_path", None),
        log_dir=getattr(plat, "log_dir", None),
        log_prefix=getattr(plat, "log_prefix", None),
    )
    return FPGAController(cfg)