# SPDX-FileCopyrightText: <text>Copyright 2025-2026 Arm Limited
# and/or its affiliates <open-source-office@arm.com></text>
#
# SPDX-License-Identifier: MIT
"""
Auto FPGA networking manager.
This module provides an SSH based session manager used to boot and interact
with an FPGA target.
"""
from __future__ import annotations

import logging
import re
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, List, Optional, Sequence, TextIO, Tuple

import paramiko

from test_automation.targets.fpga.fpga_runtime_options import (
    LogPathOptions,
    LoginWaitOptions,
    RemoteRunSpec,
    ShellPromptWaitOptions,
)
from test_automation.utils.networking_base import BaseNetworkManager
# Module-level logger; handler and level configuration is left to the caller.
logger = logging.getLogger(__name__)

# Captures the remote run directory announced in the command output,
# e.g. "RUN_DIR: /tmp/run_01" -> "/tmp/run_01". Only paths under /tmp made of
# letters, digits, '.', '_' and '-' are accepted.
RUN_DIR_RE = re.compile(r"RUN_DIR\s*:\s*(/tmp/[A-Za-z0-9._-]+)")

# Extracts (vuart index, pts number) from a UART log file name.
_UART_LOG_NAME_RE = re.compile(r"UART_log_vuart_(\d+)__dev_pts_(\d+)_\.txt$")

# Matches a root shell prompt ("root@<host>:<path>#") at the end of any line.
_PROMPT_RE = re.compile(r"root@[^:]+:.*#\s*$", re.MULTILINE)

# Substring that identifies a console login prompt in a UART log.
_LOGIN_MARKER = "login:"
[docs]
def _read_all(stdout) -> str:
"""
Read and decode all data from a Paramiko stdout-like stream.
:param stdout: Stream object returned by Paramiko exec_command.
:returns: Decoded text (UTF-8 with replacement on decode errors).
"""
return stdout.read().decode("utf-8", errors="replace")
@dataclass(frozen=True)
class SSHConfig:
    """
    SSH connection settings for the FPGA host.

    :param host: Remote hostname or IP.
    :param username: SSH username.
    :param password: Optional password used for password or interactive auth.
    :param port: SSH port.
    :param connect_timeout_s: Connection/auth timeout in seconds.
    """

    # ``host``, ``username`` and ``port`` are documented above and read by the
    # networking manager (``cfg.host``, ``cfg.username``, ``cfg.port``), so
    # they are declared here. Field order follows the documented parameter
    # order.
    host: str
    username: str
    password: Optional[str] = None
    # NOTE(review): the default port is not visible in the reviewed source;
    # 22 (the standard SSH port) is assumed - confirm.
    port: int = 22
    connect_timeout_s: int = 20
@dataclass
[docs]
class _OutputTarget:
"""
Destination for execution output.
:param log_file: local boot log file handle
:param on_line: optional callback invoked per output record
"""
[docs]
on_line: Optional[Callable[[str], None]] = None
@dataclass
[docs]
class _OutputBuffer:
"""
Accumulates execution output text between recv() calls
until complete log records can be flushed.
"""
[docs]
class AutoFPGANetworking(BaseNetworkManager):
    """
    SSH based networking manager for FPGA targets.

    Provides remote session management, command execution, log streaming,
    and console interaction utilities required to control and monitor
    an FPGA platform during automated testing. All remote operations go
    through a single Paramiko SSH client created by :meth:`connect`.
    """
[docs]
def _resolve_log_paths(self, kwargs) -> LogPathOptions:
"""
Resolve LogPathOptions from kwargs (log_paths) or overrides legacy.
:param kwargs: Keyword args passed to the constructor.
:returns: Structured log path configuration.
"""
local = dict(kwargs)
log_paths: Optional[LogPathOptions] = local.pop("log_paths", None)
if log_paths is not None:
return log_paths
return LogPathOptions(
log_dir=kwargs.pop("log_dir", None),
log_prefix=kwargs.pop("log_prefix", None),
)
def __init__(self, cfg: SSHConfig, *args, **kwargs) -> None:
    """
    Initialize the networking manager.

    :param cfg: SSH configuration for the FPGA host.
    :param args: Optional positional arguments. If provided, the first
        positional argument is interpreted as ``platform_name``.
    :param kwargs: Optional keyword arguments supporting backward-
        compatible construction.

    Supported keyword arguments:

    :param platform_name: Name of the platform used for log directory
        naming.
    :param log_dir: Optional base directory for log output.
    :param log_prefix: Optional prefix for the generated log directory.
    :param log_paths: Optional :class:`LogPathOptions` object to override
        log directory configuration.
    """
    log_paths = self._resolve_log_paths(kwargs)

    # Every SSH method reads ``self.cfg`` and the log layout reads
    # ``self.platform_name``, so both are stored here.
    # NOTE(review): the reviewed source shows neither these assignments nor a
    # ``super().__init__()`` call - confirm whether the base class is
    # expected to provide them.
    self.cfg = cfg
    if args:
        self.platform_name = args[0]
    else:
        # NOTE(review): the fallback name "fpga" is an assumption - confirm.
        self.platform_name = kwargs.pop("platform_name", "fpga")

    # Optional overrides for where local logs are written.
    self._override_log_dir = (
        Path(log_paths.log_dir).expanduser() if log_paths.log_dir else None
    )
    self._override_log_prefix = (
        str(log_paths.log_prefix) if log_paths.log_prefix else None
    )

    # SSH session state.
    self._client: Optional[paramiko.SSHClient] = None
    self._connected = False

    # Remote RUN_DIR detection, shared with the output reader thread.
    self._last_run_dir: Optional[str] = None
    self._run_dir_lock = threading.Lock()
    self._run_dir_event = threading.Event()

    # Local log layout, filled in lazily by ``_ensure_local_run_dirs``.
    self._run_ts: Optional[str] = None
    self._run_local_dir: Optional[Path] = None
    self._remote_run_logs_local_dir: Optional[Path] = None
    self._boot_log_local: Optional[Path] = None

    # Background remote run bookkeeping.
    self._run_thread: Optional[threading.Thread] = None
    self._run_exit_status: Optional[int] = None
    self._run_chan: Optional[paramiko.Channel] = None
# --------------
# FPGA lifecycle
# --------------
[docs]
def _initialize_ssh_client(self) -> paramiko.SSHClient:
    """
    Build a Paramiko SSH client ready for :meth:`connect`.

    :returns: Configured SSH client instance.
    :rtype: paramiko.SSHClient
    """
    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts host keys that are not yet known
    # without verification - confirm this is intended for the lab network.
    policy = paramiko.AutoAddPolicy()
    ssh.set_missing_host_key_policy(policy)
    return ssh
[docs]
def _authenticate_with_password(self, client: paramiko.SSHClient) -> None:
    """
    Connect and authenticate through ``SSHClient.connect``.

    The configured password is supplied, and key files and the SSH agent
    are also allowed. On success the client is stored and the manager is
    marked as connected.

    :param client: Initialized SSH client.
    :raises paramiko.AuthenticationException: If password auth fails.
    :raises Exception: For other SSH transport/connection errors.
    """
    cfg = self.cfg
    timeout_s = cfg.connect_timeout_s
    connect_args = {
        "hostname": cfg.host,
        "port": cfg.port,
        "username": cfg.username,
        "password": cfg.password,
        # One configured timeout covers TCP connect, banner and auth.
        "timeout": timeout_s,
        "banner_timeout": timeout_s,
        "auth_timeout": timeout_s,
        "look_for_keys": True,
        "allow_agent": True,
    }
    client.connect(**connect_args)

    self._client = client
    self._connected = True
[docs]
def _handle_interactive_prompts(self, _title, _instructions, prompt_list):
"""
Paramiko keyboard-interactive callback.
For prompts containing the word "password" (case-insensitive), the
configured password is returned; otherwise an empty response is used.
:param _title: Prompt title (unused).
:param _instructions: Prompt instructions (unused).
:param prompt_list: List of (prompt, echo) tuples.
:returns: List of response strings matching ``prompt_list`` length
"""
replies = []
for prompt, _echo in prompt_list:
if "password" in prompt.lower():
replies.append(self.cfg.password or "")
else:
replies.append("")
return replies
[docs]
def _authenticate_with_interactive(
    self, client: paramiko.SSHClient
) -> None:
    """
    Authenticate through the keyboard-interactive mechanism.

    Uses the transport already opened by ``client`` and answers the prompts
    via :meth:`_handle_interactive_prompts`. On success the client is stored
    and the manager is marked as connected.

    :param client: Initialized SSH client.
    :type client: paramiko.SSHClient
    :raises RuntimeError: If the SSH transport cannot be obtained.
    """
    transport = client.get_transport()
    if transport:
        transport.auth_interactive(
            self.cfg.username, self._handle_interactive_prompts
        )
        self._client = client
        self._connected = True
        return
    raise RuntimeError("SSH transport not available")
[docs]
def connect(self) -> None:
    """
    Establish an SSH connection to the FPGA host.

    Password authentication is attempted first; when the server rejects it
    with an authentication error, keyboard-interactive authentication is
    tried with the same client.

    :raises RuntimeError: If SSH connectivity or authentication fails,
        including a failure of the keyboard-interactive fallback.
    """
    client = self._initialize_ssh_client()
    logger.info(
        "SSH connection begins: %s@%s:%d",
        self.cfg.username,
        self.cfg.host,
        self.cfg.port,
    )
    try:
        try:
            self._authenticate_with_password(client)
        except paramiko.AuthenticationException:
            # An exception raised inside an ``except`` handler is not seen
            # by sibling handlers, so the fallback runs in a nested ``try``
            # to have its failures wrapped like any other connect error.
            self._authenticate_with_interactive(client)
    except Exception as exc:
        # Release the socket/transport of a client that never became usable.
        try:
            client.close()
        except Exception as close_exc:
            logger.debug(
                "Failed to close SSH client after connect error: %s",
                close_exc,
            )
        raise RuntimeError(
            f"SSH connectivity check failed for "
            f"{self.cfg.username}@{self.cfg.host}: {exc}"
        ) from exc
    logger.debug("SSH connection established successfully.")
[docs]
def disconnect(self) -> None:
    """
    Close SSH connection and clear internal state.

    The stored client reference and the connected flag are always reset,
    even when closing the Paramiko client raises; a close failure is logged
    and not propagated.
    """
    client = self._client
    # Reset state up front so a failing close() cannot leave a stale client
    # behind that later calls would mistake for a usable one.
    self._client = None
    self._connected = False
    if not client:
        return
    try:
        client.close()
    except Exception as exc:
        logger.exception("Failed to close SSH client: %s", exc)
[docs]
def _ensure_ssh_client(self) -> None:
"""
Ensure the SSH client is connected.
:raises RuntimeError: If the SSH client is not connected.
"""
if not self._client or not self._connected:
raise RuntimeError(
"SSH client not connected; call connect() first"
)
# ---------------------------------------------------------------------
# Upload/extract binaries
# ---------------------------------------------------------------------
[docs]
def _upload_binary(self, src: Path, remote_archive: str) -> None:
    """
    Upload an archive to the remote host using SFTP.

    :param src: Local archive path.
    :param remote_archive: Destination path on the remote host.
    :raises RuntimeError: If SSH is not connected.
    """
    self._ensure_ssh_client()
    sftp = self._client.open_sftp()
    try:
        sftp.put(str(src), remote_archive)
    finally:
        try:
            sftp.close()
        except Exception as exc:
            # Closing is best-effort and must not mask an upload error, but
            # the failure is recorded instead of being dropped silently.
            logger.debug("Failed to close SFTP session: %s", exc)
[docs]
def _detect_fpga_workdir(self, remote_base_dir: str) -> str:
    """
    Detect the top-level directory created by extracting the archive.

    Lists the directories under ``remote_base_dir`` and uses the most
    recently modified entry as the work directory. If no directory is
    found, the base directory itself is returned. The content of the chosen
    directory is logged at debug level.

    :param remote_base_dir: Remote extraction base directory.
    :returns: Detected work directory path.
    :raises RuntimeError: If SSH is not connected.
    """
    self._ensure_ssh_client()

    # Paths are quoted before being embedded in the command so that spaces
    # or shell metacharacters in them cannot change what is executed.
    quoted_base = self._shell_escape(remote_base_dir)
    detect_cmd = f"cd {quoted_base} && ls -1dt */ 2>/dev/null | head -n 1"
    _, detect_out, _ = self._client.exec_command(
        "bash -lc " + self._shell_escape(detect_cmd)
    )
    extracted = (
        detect_out.read()
        .decode("utf-8", errors="replace")
        .strip()
        .rstrip("/")
    )

    if extracted:
        workdir = f"{remote_base_dir}/{extracted}"
    else:
        logger.warning(
            "Archive did not create a top-level directory; "
            "returning base dir %s as workdir",
            remote_base_dir,
        )
        workdir = remote_base_dir

    ls_cmd = f"ls -la {self._shell_escape(workdir)} 2>/dev/null || true"
    _, ls_out, _ = self._client.exec_command(
        "bash -lc " + self._shell_escape(ls_cmd)
    )
    listing = ls_out.read().decode("utf-8", errors="replace")
    logger.debug(
        "Listing the files in workdir (%s):\n%s", workdir, listing
    )
    return workdir
[docs]
def copy_payloads_to_remote(
    self, local_archive: str, remote_base_dir: str
) -> str:
    """
    Upload an archive to the remote host, extract it, and return workdir.

    :param local_archive: Local path to an archive containing FPGA binary.
    :param remote_base_dir: Remote directory where the archive will be
        uploaded and extracted.
    :returns: The top-level extracted directory if one is detected,
        otherwise ``remote_base_dir``.
    :raises RuntimeError: If SSH is not connected, if the remote directory
        cannot be created, or if extraction fails.
    :raises FileNotFoundError: If ``local_archive`` does not exist.
    """
    self._ensure_ssh_client()
    src = Path(local_archive)
    if not src.is_file():
        raise FileNotFoundError(
            f"Local payload not found: {local_archive}"
        )

    # exec_command returns as soon as the command is started; wait for
    # mkdir to finish so the upload cannot race with directory creation.
    mkdir_cmd = f"mkdir -p {self._shell_escape(remote_base_dir)}"
    _, mkdir_out, mkdir_err = self._client.exec_command(
        "bash -lc " + self._shell_escape(mkdir_cmd)
    )
    mkdir_rc = mkdir_out.channel.recv_exit_status()
    if mkdir_rc != 0:
        stderr = mkdir_err.read().decode("utf-8", errors="replace")
        raise RuntimeError(
            f"Failed to create remote directory {remote_base_dir} "
            f"(rc={mkdir_rc}): {stderr}"
        )

    remote_archive = f"{remote_base_dir}/{src.name}"
    self._upload_binary(src, remote_archive)
    logger.info("Uploaded fpga binaries to remote: %s", remote_archive)

    extract_cmd = self._build_extract_cmd(src.name, remote_base_dir)
    _, out, err = self._client.exec_command(
        "bash -lc " + self._shell_escape(extract_cmd)
    )
    rc = out.channel.recv_exit_status()
    if rc != 0:
        stderr = err.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"Extraction failed (rc={rc}): {stderr}")
    return self._detect_fpga_workdir(remote_base_dir)
# ---------------------------------------------------------------------
# Local log layout
# ---------------------------------------------------------------------
[docs]
def _ensure_local_run_dirs(self) -> None:
    """
    Prepare the local log layout for this run (idempotent).

    Creates the directories

    - ``<base>/<name>_<ts>/``
    - ``<base>/<name>_<ts>/remote_run_logs/``

    and records the boot log path
    ``<base>/<name>_<ts>/<platform>_boot_<ts>.log`` (the file itself is not
    created here). ``<base>`` is the override log directory when configured,
    otherwise ``<CWD>/logs``; ``<name>`` is the override prefix when both an
    override directory and prefix are set, otherwise the platform name.
    Nothing happens when the layout has already been prepared.
    """
    already_prepared = all(
        (
            self._run_local_dir,
            self._remote_run_logs_local_dir,
            self._boot_log_local,
        )
    )
    if already_prepared:
        return

    stamp = time.strftime("%Y%m%d_%H%M%S")
    override_dir = self._override_log_dir
    if override_dir:
        dir_prefix = self._override_log_prefix or self.platform_name
        run_root = override_dir / f"{dir_prefix}_{stamp}"
    else:
        run_root = Path.cwd() / "logs" / f"{self.platform_name}_{stamp}"
    downloads = run_root / "remote_run_logs"

    for directory in (run_root, downloads):
        directory.mkdir(parents=True, exist_ok=True)

    boot_log_file = run_root / f"{self.platform_name}_boot_{stamp}.log"

    self._run_ts = stamp
    self._run_local_dir = run_root
    self._remote_run_logs_local_dir = downloads
    self._boot_log_local = boot_log_file

    logger.info("Local logs directory: %s", run_root)
    logger.info("Local boot log file: %s", boot_log_file)
@property
def boot_log_path(self) -> Optional[Path]:
    """
    Return the local boot log file path.

    :returns: Path to the local boot log file, or ``None`` while the local
        log layout has not been initialized.
    """
    return self._boot_log_local
@property
def local_run_dir(self) -> Optional[Path]:
    """
    Return the local run directory created for this execution.

    :returns: Path to the local run directory, or ``None`` while the local
        log layout has not been initialized.
    """
    return self._run_local_dir
@property
def local_remote_run_logs_dir(self) -> Optional[Path]:
    """
    Return the local directory where remote RUN_DIR logs are downloaded.

    :returns: Path to the local remote logs directory, or ``None`` while the
        local log layout has not been initialized.
    """
    return self._remote_run_logs_local_dir
@property
def last_run_dir(self) -> Optional[str]:
    """
    Return the most recently detected remote ``RUN_DIR``.

    :returns: Remote RUN_DIR string, or ``None`` when none has been detected
        yet.
    """
    return self._last_run_dir
# ---------------------------------------------------------------------
# Remote run: compose command, open PTY session, stream output
# ---------------------------------------------------------------------
[docs]
def _create_full_cmd(
self,
env_setup_cmds: Sequence[str],
remote_cmd: str,
remote_workdir: Optional[str],
) -> str:
"""
Compose the full remote command string.
:param env_setup_cmds: Sequence of shell commands to prepare the
environment (e.g. ``source ...``).
:param remote_cmd: Main command to execute on the remote host.
:param remote_workdir: Optional remote working directory to ``cd`` into
before executing ``remote_cmd``.
:returns: Fully composed command string suitable for Paramiko exec.
"""
parts = list(env_setup_cmds)
if remote_workdir:
parts.append(f"cd {self._shell_escape(remote_workdir)}")
parts.append(remote_cmd)
remote = "set -euo pipefail; " + "; ".join(parts)
return f"bash -lc {self._shell_escape(remote)}"
[docs]
def _open_exec_channel(self, full_cmd: str) -> paramiko.Channel:
    """
    Start ``full_cmd`` on a new session channel with a PTY attached.

    The channel requests an ``xterm`` PTY of 200x60 and merges stderr into
    stdout so that a single stream carries all output.

    :param full_cmd: Command string returned by :meth:`_create_full_cmd`.
    :returns: Paramiko channel used to stream output and obtain exit value.
    :raises RuntimeError: If SSH is not connected or transport is missing.
    """
    self._ensure_ssh_client()
    transport = self._client.get_transport()
    if transport:
        channel = transport.open_session()
        channel.get_pty(term="xterm", width=200, height=60)
        channel.set_combine_stderr(True)
        channel.exec_command(full_cmd)
        return channel
    raise RuntimeError("Paramiko transport not available")
[docs]
def _write_and_process_line(
    self, text: str, log_file, on_line: Optional[Callable[[str], None]]
) -> None:
    """
    Persist one piece of output, notify the callback, and watch for RUN_DIR.

    The text is written and flushed to the boot log first. When it contains
    a ``RUN_DIR`` announcement, the path is stored under the run-dir lock
    and waiters are released through the run-dir event.

    :param text: Text chunk/record to write.
    :type text: str
    :param log_file: Open file handle to write to.
    :param on_line: Optional callback invoked with the written text.
    """
    log_file.write(text)
    log_file.flush()
    if on_line:
        on_line(text)

    found = RUN_DIR_RE.search(text)
    if not found:
        return

    with self._run_dir_lock:
        self._last_run_dir = found.group(1)
    logger.info("RUN_DIR detected: %s", self._last_run_dir)
    self._run_dir_event.set()
[docs]
def _flush_complete_records(
self, sink: _OutputTarget, buffer: _OutputBuffer
) -> None:
"""
Flush complete newline-terminated records from the output buffer.
:param sink: Output sink configuration.
:param buffer: Output buffer to flush.
"""
while "\n" in buffer.text:
record, buffer.text = buffer.text.split("\n", 1)
self._write_and_process_line(
record + "\n", sink.log_file, sink.on_line
)
[docs]
def _process_output_log(
self, chunk: bytes, sink: _OutputTarget, buffer: _OutputBuffer
) -> None:
"""
Decode a received byte chunk and append it to the output buffer.
:param chunk: Bytes read from Paramiko channel.
:param sink: Output sink configuration.
:param buffer: Output buffer to append to.
"""
decoded = chunk.decode("utf-8", errors="replace")
buffer.text += decoded
self._flush_complete_records(sink, buffer)
[docs]
def _finalize_run_output(
self,
exec_stream: paramiko.Channel,
sink: _OutputTarget,
buffer: _OutputBuffer,
) -> None:
"""
Drain remaining output after process termination and record exit
status.
:param exec_stream: Paramiko channel.
:param sink: Output sink configuration.
:param buffer: Output buffer.
"""
while exec_stream.recv_ready():
chunk_text = exec_stream.recv(4096).decode(
"utf-8", errors="replace"
)
if not chunk_text:
break
buffer.text += chunk_text
self._flush_complete_records(sink, buffer)
if buffer.text:
self._write_and_process_line(
buffer.text, sink.log_file, sink.on_line
)
self._run_exit_status = int(exec_stream.recv_exit_status())
logger.info("Remote command exited with rc=%d", self._run_exit_status)
[docs]
def _read_available_output_once(
self,
exec_handle: paramiko.Channel,
sink: _OutputTarget,
buffer: _OutputBuffer,
) -> None:
"""
Read and process available output once (non-blocking).
:param exec_handle: Paramiko channel.
:param sink: Output sink configuration.
:param buffer: Output buffer.
"""
if not exec_handle.recv_ready():
return
chunk = exec_handle.recv(4096)
if not chunk:
return
self._process_output_log(chunk, sink, buffer)
[docs]
def _monitor_execution_output(
    self,
    *,
    exec_handle: paramiko.Channel,
    on_line: Optional[Callable[[str], None]],
) -> None:
    """
    Stream the channel output into the local boot log until the command
    exits.

    The boot log is opened for writing (truncating an existing file). The
    channel is polled about every 0.1 s; once the exit status is available
    the remaining output is drained and the exit status recorded.

    :param exec_handle: Paramiko channel that is executing remote command.
    :param on_line: Optional callback invoked for each flushed log record.
    """
    boot_log = self._boot_log_local
    assert boot_log is not None
    with boot_log.open("w", encoding="utf-8") as log_file:
        sink = _OutputTarget(log_file=log_file, on_line=on_line)
        buffer = _OutputBuffer(text="")
        while True:
            self._read_available_output_once(exec_handle, sink, buffer)
            if not exec_handle.exit_status_ready():
                time.sleep(0.1)
                continue
            self._finalize_run_output(exec_handle, sink, buffer)
            break
[docs]
def start_remote_run(
    self,
    run_spec: Optional[RemoteRunSpec] = None,
    on_line: Optional[Callable[[str], None]] = None,
    **kwargs,
) -> None:
    """
    Launch the remote run command and stream its output in the background.

    The command is started on a PTY channel and a daemon thread copies its
    output to the local boot log.

    :param run_spec: Optional structured run specification.
        If not provided, legacy keyword arguments are used to construct it.
    :param on_line: Optional callback invoked for each flushed output.
    :param kwargs: Backward-compatible keyword arguments
        (``env_setup_cmds``, ``remote_cmd``, optional ``remote_workdir``).
    :raises RuntimeError: If :meth:`connect` was not called first.
    """
    spec = run_spec
    if spec is None:
        spec = RemoteRunSpec(
            env_setup_cmds=kwargs.pop("env_setup_cmds"),
            remote_cmd=kwargs.pop("remote_cmd"),
            remote_workdir=kwargs.pop("remote_workdir", None),
        )
    callback = on_line
    if callback is None:
        callback = kwargs.pop("on_line", None)

    if not (self._connected and self._client):
        raise RuntimeError("connect() must be called first")

    self._ensure_local_run_dirs()
    assert self._boot_log_local is not None

    command = self._create_full_cmd(
        spec.env_setup_cmds,
        spec.remote_cmd,
        spec.remote_workdir,
    )
    channel = self._open_exec_channel(command)
    self._run_chan = channel
    self._run_exit_status = None

    reader = threading.Thread(
        target=self._monitor_execution_output,
        kwargs={"exec_handle": channel, "on_line": callback},
        name="boot-log-reader",
        daemon=True,
    )
    self._run_thread = reader
    reader.start()
[docs]
def wait_for_run_dir(self, timeout_s: int = 180) -> str:
    """
    Block until the remote ``RUN_DIR`` is known and return it.

    Returns immediately when a RUN_DIR was already detected; otherwise waits
    for the output reader to signal detection.

    :param timeout_s: Maximum time to wait in seconds.
    :returns: Detected RUN_DIR path.
    :raises TimeoutError: If RUN_DIR is not detected within timeout.
    """
    known = self._last_run_dir
    if known:
        return known

    signalled = self._run_dir_event.wait(timeout=timeout_s)
    if not signalled:
        raise TimeoutError(f"RUN_DIR not detected within {timeout_s}s")
    assert self._last_run_dir
    return self._last_run_dir
[docs]
def wait_remote_exit(
    self, timeout_s: Optional[int] = None
) -> Optional[int]:
    """
    Join the background run thread and report the recorded exit status.

    :param timeout_s: Optional timeout for joining the run thread.
    :returns: Exit status if available, otherwise ``None`` (for example when
        no run was started or the join timed out before the run finished).
    """
    worker = self._run_thread
    if worker:
        worker.join(timeout=timeout_s)
    status = self._run_exit_status
    return status
# ---------------------------------------------------------------------
# UART helpers
# ---------------------------------------------------------------------
[docs]
def find_uart_login(
    self, run_dir: str, *, tail_lines: int = 120
) -> Optional[Tuple[int, int, str]]:
    """
    Search UART logs under the given RUN_DIR for a login prompt.

    This method lists all ``UART_log_vuart_*`` files located under
    ``run_dir`` and inspects the last ``tail_lines`` lines of each log
    for the login marker (``"login:"``). The first log that contains it is
    reported. Files whose name does not carry a VUART index and PTS number
    are skipped.

    :param run_dir: Remote RUN_DIR path containing UART log files.
    :type run_dir: str
    :param tail_lines: Number of lines from the end of each UART log
        to inspect. Defaults to 120.
    :type tail_lines: int
    :returns: Tuple ``(vuart, pts, uart_log_path)`` if a login prompt
        is detected; otherwise ``None``.
    :rtype: Optional[Tuple[int, int, str]]
    :raises RuntimeError: If the SSH client is not connected.
    """
    self._ensure_ssh_client()

    # Build the inner command first and quote it as a whole. Embedding an
    # already-quoted path inside a hand-written '...' literal would nest
    # single quotes and only work for paths without special characters.
    # The glob is left outside the quotes so the shell still expands it.
    ls_cmd = (
        f"ls -1 {self._shell_escape(run_dir)}/UART_log_vuart_* "
        "2>/dev/null || true"
    )
    _, out, _ = self._client.exec_command(
        "bash -lc " + self._shell_escape(ls_cmd)
    )
    log_paths = [
        line.strip()
        for line in _read_all(out).splitlines()
        if line.strip()
    ]

    for log_path in log_paths:
        name_match = _UART_LOG_NAME_RE.search(log_path.split("/")[-1])
        if not name_match:
            continue
        text = self.tail_uart_log(log_path, lines=tail_lines)
        if _LOGIN_MARKER in text:
            vuart = int(name_match.group(1))
            pts = int(name_match.group(2))
            return (vuart, pts, log_path)
    return None
[docs]
def send_to_vuart_link_sftp(
    self, run_dir: str, payload: str, **kwargs
) -> None:
    """
    Write payload to ``RUN_DIR/vuart_<vuart>__dev_pts_<pts>`` using SFTP.

    This helper writes directly to the VUART link to avoid shell quoting
    issues. Trailing CR/LF characters are stripped from ``payload`` and a
    single ``\\r\\n`` is appended before it is written.

    :param run_dir: Remote RUN_DIR path.
    :param payload: Text to write to the VUART link.
    :param kwargs: Backward-compatible keyword args; ``vuart`` and ``pts``
        are required.
    :raises TypeError: If ``vuart`` or ``pts`` are missing.
    :raises RuntimeError: If SSH is not connected.
    """
    vuart = kwargs.pop("vuart", None)
    pts = kwargs.pop("pts", None)
    if vuart is None or pts is None:
        raise TypeError("send_to_vuart_link_sftp requires vuart and pts")
    self._ensure_ssh_client()

    link = f"{run_dir}/vuart_{int(vuart)}__dev_pts_{int(pts)}"
    payload = payload.rstrip("\r\n") + "\r\n"

    sftp = self._client.open_sftp()
    try:
        with sftp.file(link, "ab") as link_file:
            link_file.write(payload.encode("utf-8"))
            link_file.flush()
    finally:
        try:
            sftp.close()
        except Exception as exc:
            # Closing is best-effort and must not mask a write error, but
            # the failure is recorded instead of being dropped silently.
            logger.debug("Failed to close SFTP session: %s", exc)
[docs]
def wait_for_shell_prompt(
    self,
    uart_log_path: str,
    options: Optional[ShellPromptWaitOptions] = None,
    **kwargs,
) -> None:
    """
    Wait until a root shell prompt is visible in the specified UART log.

    This method periodically tails the UART log file and checks for a
    shell prompt pattern. It stops when the prompt is detected or when
    the timeout expires.

    :param uart_log_path: Remote UART log file path.
    :param options: Optional structured wait configuration. If not
        provided, values may be supplied via backward-compatible
        keyword arguments.
    :param timeout_s: (kwarg) Maximum time in seconds to wait for the
        shell prompt. Defaults to 120.
    :param poll_s: (kwarg) Polling interval in seconds between log checks.
        Defaults to 1.0.
    :param tail_lines: (kwarg) Number of log lines to inspect on each
        poll. Defaults to 200.
    :returns: ``None``
    :raises RuntimeError: If the SSH client is not connected.
    :raises TimeoutError: If the shell prompt is not detected within
        the timeout.
    """
    if options is None:
        options = ShellPromptWaitOptions(
            timeout_s=int(kwargs.pop("timeout_s", 120)),
            poll_s=float(kwargs.pop("poll_s", 1.0)),
            tail_lines=int(kwargs.pop("tail_lines", 200)),
        )
    self._ensure_ssh_client()

    # A monotonic clock keeps the deadline immune to wall-clock adjustments
    # (NTP steps, manual changes) while waiting.
    deadline = time.monotonic() + options.timeout_s
    while time.monotonic() < deadline:
        text = self.tail_uart_log(uart_log_path, lines=options.tail_lines)
        if _PROMPT_RE.search(text):
            logger.info(
                "Shell prompt detected on UART log: %s", uart_log_path
            )
            return
        time.sleep(options.poll_s)
    raise TimeoutError(
        (
            "Shell prompt not detected within "
            f"{options.timeout_s}s for {uart_log_path}"
        )
    )
[docs]
def wait_login_and_send_root(
    self, options: Optional[LoginWaitOptions] = None, **kwargs
) -> Tuple[str, int, int]:
    """
    Wait for a UART login prompt, then wait for a shell prompt.

    This method scans available UART logs for a login marker, sends the
    ``root`` username to the detected VUART link, and then waits until
    a shell prompt becomes available.

    :param options: Optional structured login wait configuration. If not
        provided, values may be supplied via backward-compatible
        keyword arguments.
    :param timeout_s: (kwarg) Maximum time in seconds to wait for the
        login prompt. Defaults to 900.
    :param poll_s: (kwarg) Polling interval in seconds between login
        checks. Defaults to 2.0.
    :param tail_lines: (kwarg) Number of UART log lines to inspect
        during each poll. Defaults to 150.
    :param shell_prompt_timeout_s: (kwarg) Maximum time in seconds to
        wait for the shell prompt after sending ``root``. Defaults to 120.
    :returns: Tuple ``(uart_log_path, vuart, pts)`` identifying the
        UART log file and its associated VUART and PTS numbers.
    :rtype: Tuple[str, int, int]
    :raises RuntimeError: If the SSH client is not connected.
    :raises TimeoutError: If RUN_DIR, the login prompt or the shell prompt
        is not detected within its timeout.
    """
    if options is None:
        options = LoginWaitOptions(
            timeout_s=int(kwargs.pop("timeout_s", 900)),
            poll_s=float(kwargs.pop("poll_s", 2.0)),
            tail_lines=int(kwargs.pop("tail_lines", 150)),
            shell_prompt_timeout_s=int(
                kwargs.pop("shell_prompt_timeout_s", 120)
            ),
        )

    # RUN_DIR must be known before UART logs can be located; this wait is
    # capped at 180 s and is not counted against the login deadline below.
    run_dir = self.wait_for_run_dir(timeout_s=min(180, options.timeout_s))

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + options.timeout_s
    while time.monotonic() < deadline:
        hit = self.find_uart_login(run_dir, tail_lines=options.tail_lines)
        if not hit:
            time.sleep(options.poll_s)
            continue

        vuart, pts, uart_log_path = hit
        logger.info(
            (
                "Login prompt detected (vuart=%d pts=%d "
                "uart_log=%s). Sending root..."
            ),
            vuart,
            pts,
            uart_log_path,
        )
        # Short pause between detecting the prompt and answering it.
        time.sleep(0.5)
        self.send_to_vuart_link_sftp(run_dir, "root", vuart=vuart, pts=pts)
        self.wait_for_shell_prompt(
            uart_log_path,
            ShellPromptWaitOptions(
                timeout_s=options.shell_prompt_timeout_s
            ),
        )
        logger.info(
            "Root shell prompt ready (vuart=%d pts=%d)", vuart, pts
        )
        return (uart_log_path, vuart, pts)
    raise TimeoutError(f"login: not detected within {options.timeout_s}s")
[docs]
def run_uart_command(self, cmd: str, **kwargs) -> None:
    """
    Type a command on a UART console through its VUART link.

    Trailing CR/LF characters are stripped from ``cmd`` and ``\\r\\n`` is
    appended before the text is written to
    ``RUN_DIR/vuart_<vuart>__dev_pts_<pts>``. The call pauses for 0.3 s
    after the write.

    :param cmd: Command string to send to the UART.
    :param vuart: (kwarg, required) VUART index.
    :param pts: (kwarg, required) PTS number associated with the VUART.
    :returns: ``None``
    :raises KeyError: If ``vuart`` or ``pts`` is not supplied.
    :raises TimeoutError: If RUN_DIR is not detected within 180 s.
    :raises RuntimeError: If the SSH client is not connected.
    """
    vuart = kwargs.pop("vuart")
    pts = kwargs.pop("pts")
    run_dir = self.wait_for_run_dir(timeout_s=180)

    line = cmd.rstrip("\r\n") + "\r\n"
    logger.info(
        "UART cmd (vuart=%d pts=%d): %s", int(vuart), int(pts), cmd
    )
    self.send_to_vuart_link_sftp(run_dir, line, vuart=vuart, pts=pts)
    time.sleep(0.3)
[docs]
def list_uart_logs(self) -> List[Tuple[int, int, str]]:
    """
    List UART log files under the current ``RUN_DIR``.

    Waits up to 180 s for RUN_DIR to be known, then lists the
    ``UART_log_vuart_*`` files in it. Entries whose name does not carry a
    VUART index and PTS number are ignored.

    :returns: List of tuples ``(vuart, pts, uart_log_path)`` sorted by
        VUART index.
    :raises RuntimeError: If SSH is not connected.
    :raises TimeoutError: If RUN_DIR is not detected in time.
    """
    self._ensure_ssh_client()
    run_dir = self.wait_for_run_dir(timeout_s=180)

    # Build the inner command first and quote it as a whole. Embedding an
    # already-quoted path inside a hand-written '...' literal would nest
    # single quotes and only work for paths without special characters.
    ls_cmd = (
        f"ls -1 {self._shell_escape(run_dir)}/UART_log_vuart_* "
        "2>/dev/null || true"
    )
    _, out, _ = self._client.exec_command(
        "bash -lc " + self._shell_escape(ls_cmd)
    )

    logs: List[Tuple[int, int, str]] = []
    for line in out.read().decode("utf-8", errors="replace").splitlines():
        # Same file-name pattern as the login scan uses.
        name_match = _UART_LOG_NAME_RE.search(line)
        if name_match:
            logs.append(
                (
                    int(name_match.group(1)),
                    int(name_match.group(2)),
                    line.strip(),
                )
            )
    logs.sort(key=lambda entry: entry[0])
    return logs
[docs]
def read_uart_log(self, uart_log_path: str) -> str:
    """
    Fetch the complete content of a remote UART log via ``cat``.

    Errors from ``cat`` (for example a missing file) are suppressed on the
    remote side, in which case an empty string is returned.

    :param uart_log_path: Remote UART log file path.
    :returns: UART log contents (UTF-8 decoded with replacement).
    :raises RuntimeError: If SSH is not connected.
    """
    self._ensure_ssh_client()
    quoted_path = self._shell_escape(uart_log_path)
    cat_cmd = f"cat {quoted_path} 2>/dev/null || true"
    _, stdout, _ = self._client.exec_command(
        "bash -lc " + self._shell_escape(cat_cmd)
    )
    raw = stdout.read()
    return raw.decode("utf-8", errors="replace")
[docs]
def tail_uart_log(self, uart_log_path: str, *, lines: int = 200) -> str:
    """
    Fetch the last ``lines`` lines of a remote UART log via ``tail``.

    Errors from ``tail`` (for example a missing file) are suppressed on the
    remote side, in which case an empty string is returned.

    :param uart_log_path: Remote UART log file path.
    :param lines: Number of lines from the end of the file to return.
    :returns: UART log tail (UTF-8 decoded with replacement).
    :raises RuntimeError: If SSH is not connected.
    """
    self._ensure_ssh_client()
    quoted_path = self._shell_escape(uart_log_path)
    inner = f"tail -n {lines} {quoted_path} 2>/dev/null || true"
    _, stdout, _ = self._client.exec_command(
        "bash -lc " + self._shell_escape(inner)
    )
    raw = stdout.read()
    return raw.decode("utf-8", errors="replace")
[docs]
def find_uarts_with_strings(
    self, strings: List[str], *, tail_lines: int = 300
) -> List[Tuple[int, int, str]]:
    """
    Select the UART logs whose tail contains at least one given substring.

    :param strings: List of substrings to search for.
    :param tail_lines: Number of tail lines to inspect for each UART log.
    :returns: List of tuples ``(vuart, pts, uart_log_path)`` that matched,
        in the order reported by :meth:`list_uart_logs`.
    :raises RuntimeError: If SSH is not connected.
    """

    def _tail_has_needle(log_path: str) -> bool:
        tail = self.tail_uart_log(log_path, lines=tail_lines)
        return any(needle in tail for needle in strings)

    return [
        (vuart, pts, log_path)
        for vuart, pts, log_path in self.list_uart_logs()
        if _tail_has_needle(log_path)
    ]
[docs]
def start_uart_keepalive(self, **kwargs) -> None:
    """
    Start a background thread that periodically sends an empty line to a
    UART.

    This can help keep UART sessions active and stimulate console output
    while waiting for prompts. A keepalive that is already running is
    stopped first, so at most one worker is active and the keepalive can be
    started again after :meth:`stop_uart_keepalive`.

    :param vuart: (kwarg, required) VUART index.
    :param pts: (kwarg, required) PTS number associated with the VUART.
    :param interval_s: (kwarg) Seconds between keepalive writes.
        Defaults to 25.0.
    :raises KeyError: If ``vuart`` or ``pts`` is not supplied.
    """
    vuart = int(kwargs.pop("vuart"))
    pts = int(kwargs.pop("pts"))
    interval_s = float(kwargs.pop("interval_s", 25.0))

    # Stop a previous worker before starting a new one so that repeated
    # calls do not accumulate writer threads.
    previous_stop = getattr(self, "_keepalive_stop", None)
    previous_thread = getattr(self, "_keepalive_thread", None)
    if previous_stop is not None:
        previous_stop.set()
    if previous_thread is not None and previous_thread.is_alive():
        previous_thread.join(timeout=5)

    # A fresh event per start: reusing one that was already set would make
    # the new worker exit before sending anything.
    stop_event = threading.Event()
    self._keepalive_stop = stop_event

    def _worker() -> None:
        logger.info(
            "UART keepalive started (vuart=%d pts=%d every %.1fs)",
            vuart,
            pts,
            interval_s,
        )
        # The worker watches its own event, not ``self._keepalive_stop``,
        # so a later restart cannot revive or confuse an old worker.
        while not stop_event.is_set():
            try:
                self.run_uart_command("", vuart=vuart, pts=pts)
            except Exception as exc:
                logger.warning("UART keepalive write failed: %s", exc)
            stop_event.wait(interval_s)
        logger.info("UART keepalive stopped")

    worker = threading.Thread(
        target=_worker, name="uart-keepalive", daemon=True
    )
    self._keepalive_thread = worker
    worker.start()
[docs]
def stop_uart_keepalive(self) -> None:
    """
    Signal the UART keepalive worker to stop and wait briefly for it.

    Safe to call when no keepalive was ever started. The worker thread is
    joined for at most 5 seconds.
    """
    if hasattr(self, "_keepalive_stop"):
        self._keepalive_stop.set()

    worker = getattr(self, "_keepalive_thread", None)
    if not worker:
        return
    worker.join(timeout=5)
[docs]
def map_uarts_by_predicate(
    self,
    predicate: Callable[[str], bool],
    *,
    tail_lines: int = 300,
) -> List[Tuple[int, int, str]]:
    """
    Collect the UARTs whose log tail satisfies ``predicate``.

    A predicate that raises for a given log is treated as "no match" for
    that log; the exception is logged at debug level and the scan continues.

    :param predicate: Callable ``predicate(text) -> bool``.
    :param tail_lines: Number of lines from the UART log tail to inspect.
    :returns: List of tuples ``(vuart, pts, uart_log_path)``.
    """
    selected: List[Tuple[int, int, str]] = []
    for entry in self.list_uart_logs():
        vuart, pts, log_path = entry
        tail = self.tail_uart_log(log_path, lines=tail_lines)
        try:
            accepted = predicate(tail)
        except Exception as exc:
            logger.debug(
                "Predicate raised exception on vuart=%d log=%s: %s",
                vuart,
                log_path,
                exc,
            )
            continue
        if accepted:
            selected.append((vuart, pts, log_path))
    return selected
# ---------------------------------------------------------------------
# Download RUN_DIR logs
# ---------------------------------------------------------------------
[docs]
def download_run_dir_logs(self) -> Path:
    """
    Copy everything under the current remote ``RUN_DIR`` into the local
    ``remote_run_logs`` directory of this run.

    With the default layout the destination is
    ``<CWD>/logs/<platform>_<ts>/remote_run_logs/``.

    :returns: Local destination path for downloaded remote logs.
    :raises RuntimeError: If SSH is not connected, RUN_DIR is unknown, or
        the download fails.
    """
    self._ensure_ssh_client()
    remote_root = self._last_run_dir
    if not remote_root:
        raise RuntimeError("RUN_DIR not known; cannot download logs")

    self._ensure_local_run_dirs()
    assert self._remote_run_logs_local_dir is not None
    destination = self._remote_run_logs_local_dir
    logger.info(
        "Downloading RUN_DIR logs from %s -> %s", remote_root, destination
    )

    sftp = self._client.open_sftp()
    try:
        self._sftp_get_dir_recursive(sftp, remote_root, destination)
    except (IOError, OSError, paramiko.SFTPError, RuntimeError) as exc:
        logger.exception(
            "Failed to download RUN_DIR logs from %s to %s: %s",
            remote_root,
            destination,
            exc,
        )
        # Surface one clear error type to callers while keeping the
        # original exception as the cause.
        raise RuntimeError(
            f"Failed to download RUN_DIR logs from {remote_root}: {exc}"
        ) from exc
    finally:
        try:
            sftp.close()
        except Exception as exc:
            # A failing close is only logged so it cannot mask an earlier
            # download error.
            logger.debug("Failed to close SFTP session: %s", exc)

    logger.info("RUN_DIR logs downloaded to: %s", destination)
    return destination
[docs]
def _sftp_get_dir_recursive(
self, sftp: paramiko.SFTPClient, remote_dir: str, local_dir: Path
) -> None:
"""
Recursively download a remote directory using SFTP.
:param sftp: Active Paramiko SFTP client.
:param remote_dir: Remote directory path to download.
:param local_dir: Local directory to write downloaded files into.
"""
import stat
local_dir.mkdir(parents=True, exist_ok=True)
for entry in sftp.listdir_attr(remote_dir):
rpath = f"{remote_dir}/{entry.filename}"
lpath = local_dir / entry.filename
if stat.S_ISDIR(entry.st_mode):
self._sftp_get_dir_recursive(sftp, rpath, lpath)
else:
try:
sftp.get(rpath, str(lpath))
except Exception as exc:
logger.debug("Skipping %s (%s)", rpath, exc)
@staticmethod
[docs]
def _shell_escape(s: str) -> str:
"""
Escape a string for safe inclusion in a single-quoted shell literal.
:param s: Input string.
:returns: Single-quoted, shell-safe literal.
"""
return "'" + s.replace("'", "'\"'\"'") + "'"