# SPDX-FileCopyrightText: <text>Copyright 2025-2026 Arm Limited
# and/or its affiliates <open-source-office@arm.com></text>
#
# SPDX-License-Identifier: MIT
from __future__ import annotations
import posixpath as p
import time
import pytest
from typing import Dict, List
from test_automation.utils.io_utils import (
write_to_port,
check_if_file_exists,
read_int,
read_file_from_port,
)
from test_automation.utils.auto_platform_base import AutoTestPlatformBase
# CpuUtils was moved from test_bsp_demos/utils to tests/utils.
class CpuUtils:
    """Utility class for CPU-related operations.

    Groups three families of helpers:

    * pure topology arithmetic (``get_clusters_and_cores``,
      ``get_cpu_index``);
    * cpuidle sysfs checks, driven through a manager object (``mgr``) and a
      console ``port`` using the ``test_automation`` I/O helpers;
    * CPU hotplug checks, driven through a platform object that exposes
      ``mgr`` and ``default_console``.
    """

    @staticmethod
    def _last_output_line(output: str) -> str:
        """
        Return the last non-blank line of a command's output.

        Console output may carry a trailing newline or extra leading lines,
        so callers that expect a single value read the final non-blank line
        instead of comparing or parsing the raw text.

        :param output: Raw text returned by ``mgr.run_cmd``.
        :returns: The last non-blank line, stripped, or ``""`` if none.
        """
        for line in reversed(output.splitlines()):
            text = line.strip()
            if text:
                return text
        return ""

    @staticmethod
    def get_clusters_and_cores(
        cpu_count: int,
        cores_per_cluster: int,
    ) -> Dict[int, List[int]]:
        """
        Return a mapping of cluster IDs to lists of core IDs.

        :param cpu_count: Total number of CPUs.
        :param cores_per_cluster: Number of cores per cluster.
        :returns: Mapping ``cluster_id -> [core_id, ...]``; empty when
            ``cpu_count`` is not positive.
        :raises ZeroDivisionError: If ``cores_per_cluster`` is 0 and
            ``cpu_count`` is positive.
        """
        clusters: Dict[int, List[int]] = {}
        for cpu_id in range(cpu_count):
            # divmod yields (cluster index, core index inside that cluster).
            cluster_id, core_id = divmod(cpu_id, cores_per_cluster)
            clusters.setdefault(cluster_id, []).append(core_id)
        return clusters

    @staticmethod
    def get_cpu_index(
        cluster_id: int,
        core_id: int,
        cores_per_cluster: int,
    ) -> int:
        """
        Compute a global CPU index from cluster and core IDs.

        :param cluster_id: Cluster identifier.
        :param core_id: Core identifier within the cluster.
        :param cores_per_cluster: Number of cores per cluster.
        :returns: Global CPU index.
        """
        return (cluster_id * cores_per_cluster) + core_id

    def _list_state_dirs(
        self, mgr, port, cpu: int, cpu_sysfs: str
    ) -> Dict[str, str]:
        """
        List cpuidle state directories for a given CPU.

        :param mgr: Manager object for executing commands.
        :param port: Port identifier for command execution.
        :param cpu: CPU index to list states for.
        :param cpu_sysfs: Base sysfs path for CPU information.
        :returns: Mapping of state names to their sysfs paths.
        """
        base = p.join(cpu_sysfs, f"cpu{cpu}", "cpuidle")
        # The shell loop prints only entries that are real directories, so
        # an unmatched glob (no state* entries) produces no output.
        cmd = (
            f'for d in "{base}"/state*; do '
            'if [ -d "$d" ]; then basename "$d"; fi; '
            "done"
        )
        out = mgr.run_cmd(port, cmd)
        names = (line.strip() for line in out.splitlines())
        return {name: p.join(base, name) for name in names if name}

    def _load_states_per_cpu(
        self, platform_base_obj, names, cpu_sysfs
    ) -> Dict[int, Dict[str, str]]:
        """
        Load cpuidle states for each CPU.

        Skips the running test (``pytest.skip``) when the configuration
        reports no CPUs, or when any CPU exposes fewer state directories than
        there are entries in ``names``.

        :param platform_base_obj: Object containing configuration and manager.
        :param names: Required state names; only the number of entries is
            used, as the minimum state count per CPU.
        :param cpu_sysfs: Base sysfs path for CPU information.
        :returns: Mapping of CPU indices to their cpuidle states.
        """
        # A missing "pc_cpu_count" entry is treated as zero CPUs so that it
        # takes the skip path below instead of failing in int(None).
        cpu_count = int(platform_base_obj.config_data.get("pc_cpu_count") or 0)
        mgr = platform_base_obj.mgr
        port = platform_base_obj.default_console
        if cpu_count <= 0:
            pytest.skip("target did not report any CPUs")
        states_per_cpu: Dict[int, Dict[str, str]] = {}
        required_states = len(names)
        for cpu in range(cpu_count):
            states = self._list_state_dirs(mgr, port, cpu, cpu_sysfs)
            if len(states) < required_states:
                pytest.skip(
                    f"only {len(states)} of {required_states} required "
                    f"C-states found for cpu{cpu}"
                )
            states_per_cpu[cpu] = states
        return states_per_cpu

    def _check_default_status_for_state(
        self,
        mgr,
        port,
        cpu: int,
        state_id: str,
        states: Dict[str, str],
    ):
        """
        Check the default status for a given cpuidle state on a specific CPU.

        :param mgr: Manager object for executing commands.
        :param port: Port identifier for command execution.
        :param cpu: CPU index to check.
        :param state_id: Identifier of the cpuidle state to check.
        :param states: Mapping of state names to their sysfs paths for the CPU.
        :raises AssertionError: If ``default_status`` is missing or its
            content is not ``enabled``.
        :raises KeyError: If ``state_id`` is not a key of ``states``.
        """
        default_path = p.join(states[state_id], "default_status")
        assert check_if_file_exists(
            mgr, port, default_path
        ), f"default_status not exposed on cpu{cpu}:{state_id}"
        val = read_file_from_port(mgr, port, default_path).strip()
        assert (
            val == "enabled"
        ), f"{state_id} not enabled by default on cpu{cpu}"

    def _validate_required_files(self, mgr, port, *paths: str) -> None:
        """
        Validate that required cpuidle files exist.

        :param mgr: Manager object for executing commands.
        :param port: Port identifier for command execution.
        :param paths: File paths to validate.
        :raises AssertionError: On the first path that does not exist.
        """
        for path in paths:
            assert check_if_file_exists(mgr, port, path), f"missing {path}"

    def _verify_usage_increases(
        self,
        mgr,
        port,
        usage_path: str,
        cpu: int,
        state_id: str,
        time_short: int = 60,
    ):
        """
        Verify that the usage count increases over a short period.

        :param mgr: Manager object for executing commands.
        :param port: Port identifier for command execution.
        :param usage_path: Path to the usage file for the state.
        :param cpu: CPU index to check (used in the failure message).
        :param state_id: Identifier of the cpuidle state to check.
        :param time_short: Time to wait between checks in seconds.
        :raises AssertionError: If the second reading is not greater than
            the first.
        """
        usage0 = read_int(mgr, port, usage_path)
        time.sleep(time_short)
        usage1 = read_int(mgr, port, usage_path)
        assert (
            usage1 > usage0
        ), f"no usage increase before disable on cpu{cpu}:{state_id}"

    def _disable_state(
        self,
        mgr,
        port,
        disable_path: str,
        cpu: int,
        state_id: str,
        original_disable_values: Dict[int, Dict[str, str]],
    ):
        """
        Disable a specific cpuidle state for a given CPU.

        The current content of ``disable_path`` is recorded in
        ``original_disable_values[cpu][state_id]`` before ``1`` is written,
        so the caller can restore it later.

        :param mgr: Manager object for executing commands.
        :param port: Port identifier for command execution.
        :param disable_path: Path to the disable file for the state.
        :param cpu: CPU index to disable the state on.
        :param state_id: Identifier of the cpuidle state to disable.
        :param original_disable_values:
            Dictionary to store original disable values for restoration
            (mutated in place).
        :raises AssertionError: If the file does not read back as ``1``.
        """
        previous = read_file_from_port(mgr, port, disable_path).strip()
        original_disable_values.setdefault(cpu, {})[state_id] = previous
        write_to_port(mgr, port, disable_path, 1)
        # Read back to confirm the target accepted the write.
        assert read_file_from_port(mgr, port, disable_path).strip() == "1", (
            "disable path did not reflect requested value for "
            f"cpu{cpu}:{state_id}"
        )

    def _verify_usage_stays_same_when_disabled(
        self,
        mgr,
        port,
        usage_path: str,
        cpu: int,
        state_id: str,
        time_short: int = 60,
    ):
        """
        Verify that the usage count does not change when the state is disabled.

        :param mgr: Manager object for executing commands.
        :param port: Port identifier for command execution.
        :param usage_path: Path to the usage file for the state.
        :param cpu: CPU index to check (used in the failure message).
        :param state_id: Identifier of the cpuidle state to check.
        :param time_short: Time to wait between checks in seconds.
        :raises AssertionError: If the two readings differ.
        """
        usage_a = read_int(mgr, port, usage_path)
        time.sleep(time_short)
        usage_b = read_int(mgr, port, usage_path)
        # While respective C-state is disabled, usage should not increase
        assert (
            usage_a == usage_b
        ), f"usage changed while {state_id} disabled on cpu{cpu}"

    def _verify_latency_residency_values(
        self,
        mgr,
        port,
        base: str,
        props: Dict[str, int],
        cpu: int,
        state_id: str,
    ):
        """
        Verify latency and residency values for a specific C-state on a CPU.

        :param mgr: Manager object for executing commands.
        :param port: Port identifier for command execution.
        :param base: Base sysfs path for the C-state.
        :param props: Expected properties containing 'latency' and 'residency'.
        :param cpu: CPU index to check (used in the failure message).
        :param state_id: Identifier of the cpuidle state to check.
        :raises AssertionError: If either value read from the target differs
            from the expected one.
        """
        latency = read_int(mgr, port, p.join(base, "latency"))
        residency = read_int(mgr, port, p.join(base, "residency"))
        assert (
            latency == props["latency"]
        ), f"latency mismatch on cpu{cpu}:{state_id} (got {latency})"
        assert (
            residency == props["residency"]
        ), f"residency mismatch on cpu{cpu}:{state_id} (got {residency})"

    def _verify_usage_time_advancement(
        self,
        mgr,
        port,
        base: str,
        cpu: int,
        state_id: str,
        time_long: int = 120,
    ):
        """
        Verify that usage and time values advance over a longer period.

        :param mgr: Manager object for executing commands.
        :param port: Port identifier for command execution.
        :param base: Base sysfs path for the C-state.
        :param cpu: CPU index to check (used in the failure message).
        :param state_id: Identifier of the cpuidle state to check.
        :param time_long: Time to wait between checks in seconds.
        :raises AssertionError: If ``usage`` or ``time`` did not increase.
        """
        usage_path = p.join(base, "usage")
        time_path = p.join(base, "time")
        usage1 = read_int(mgr, port, usage_path)
        time1 = read_int(mgr, port, time_path)
        time.sleep(time_long)
        usage2 = read_int(mgr, port, usage_path)
        time2 = read_int(mgr, port, time_path)
        # Usage should have increased, and time should have advanced
        assert (
            usage2 > usage1
        ), f"state {state_id} not entered after {time_long}s on cpu{cpu}"
        assert time2 > time1, f"time did not advance for cpu{cpu}:{state_id}"

    def available_governors(self, mgr, port, cpuidle_dir) -> List[str]:
        """
        List available cpuidle governors.

        :param mgr: Manager object for executing commands.
        :param port: Port identifier for command execution.
        :param cpuidle_dir: Base directory for cpuidle sysfs entries.
        :returns: List of available cpuidle governors (whitespace-split).
        :raises AssertionError: If ``available_governors`` is not present.
        """
        path = p.join(cpuidle_dir, "available_governors")
        assert check_if_file_exists(
            mgr, port, path
        ), f"{path} not present on target"
        return read_file_from_port(mgr, port, path).strip().split()

    def current_governor(self, mgr, port, cpuidle_dir, ro: bool = True) -> str:
        """
        Get the current cpuidle governor.

        :param mgr: Manager object for executing commands.
        :param port: Port identifier for command execution.
        :param cpuidle_dir: Base directory for cpuidle sysfs entries.
        :param ro: Whether to read from the read-only current_governor_ro file.
        :returns: Name of the current cpuidle governor.
        :raises AssertionError: If the selected file is not present.
        """
        fname = "current_governor_ro" if ro else "current_governor"
        path = p.join(cpuidle_dir, fname)
        assert check_if_file_exists(
            mgr, port, path
        ), f"{path} not present on target"
        return read_file_from_port(mgr, port, path).strip()

    def set_cpu(
        self, platform_base_obj: AutoTestPlatformBase, cpu_num: int, flag: str
    ) -> bool:
        """
        Set a CPU core online/offline and verify the new state.

        :param platform_base_obj: Platform fixture with console access.
        :param cpu_num: CPU index to modify.
        :param flag: Desired CPU online state as string.
        :returns: True when the value read back matches flag, otherwise False.
        """
        mgr = platform_base_obj.mgr
        console = platform_base_obj.default_console
        online_path = f"/sys/devices/system/cpu/cpu{cpu_num}/online"
        # The command is built on one line so that no source indentation
        # ends up inside the shell command.
        mgr.run_cmd(console, f'echo "{flag}" > "{online_path}"')
        output = mgr.run_cmd(console, f"cat '{online_path}'")
        # Compare only the last non-blank line: raw console output may end
        # with a newline, which would make a plain equality check fail.
        return self._last_output_line(output) == str(flag)

    def enable_cpu(
        self, platform_base_obj: AutoTestPlatformBase, cpu_num: int
    ) -> bool:
        """
        Enable a specific CPU core.

        :param platform_base_obj: Platform fixture with console access.
        :param cpu_num: CPU index to bring online.
        :returns: True if the core reads back as online (``1``).
        """
        return self.set_cpu(platform_base_obj, cpu_num, "1")

    def disable_cpu(
        self, platform_base_obj: AutoTestPlatformBase, cpu_num: int
    ) -> bool:
        """
        Disable a specific CPU core.

        :param platform_base_obj: Platform fixture with console access.
        :param cpu_num: CPU index to take offline.
        :returns: True if the core reads back as offline (``0``).
        """
        return self.set_cpu(platform_base_obj, cpu_num, "0")

    def validate_cpu_count_from_devicetree(
        self, platform_base_obj: AutoTestPlatformBase, expected_num_cpus: int
    ) -> None:
        """
        Validate CPU count in device tree against expected value.

        :param platform_base_obj: Platform fixture with console access.
        :param expected_num_cpus: Expected number of CPU nodes in device tree.
        :raises AssertionError: If CPU count does not match expected value.
        :raises ValueError: If the command output does not end in an integer.
        """
        mgr = platform_base_obj.mgr
        # -maxdepth is a global option and is placed before the -name test;
        # GNU find warns when it follows a test.
        output = mgr.run_cmd(
            platform_base_obj.default_console,
            "find /sys/firmware/devicetree/base/cpus/"
            ' -maxdepth 1 -name "cpu@*" | wc -l',
        )
        found = int(self._last_output_line(output))
        assert found == expected_num_cpus, (
            f"Expected {expected_num_cpus} CPU nodes in the device tree, "
            f"but found {found}"
        )

    def assert_all_cores_online(
        self, platform_base_obj: AutoTestPlatformBase, expected_num_cpus: int
    ) -> None:
        """
        Check that all expected CPU cores are currently online.

        :param platform_base_obj: Platform fixture with console access.
        :param expected_num_cpus: Expected number of online processors.
        :raises AssertionError: If online CPU count differs from expected.
        :raises ValueError: If the command output does not end in an integer.
        """
        mgr = platform_base_obj.mgr
        output = mgr.run_cmd(
            platform_base_obj.default_console,
            'grep -c "processor" /proc/cpuinfo',
        )
        online = int(self._last_output_line(output))
        assert online == expected_num_cpus, (
            f"Expected {expected_num_cpus} CPUs online, "
            f"but found {online} CPUs online"
        )

    def stop_individual_core(
        self, platform_base_obj: AutoTestPlatformBase, expected_num_cpus: int
    ) -> None:
        """
        Verify per-core CPU hotplug by toggling each core off then on.

        :param platform_base_obj: Platform fixture with console access.
        :param expected_num_cpus: Number of cores to toggle, starting at 0.
        :raises AssertionError: If any core fails to go offline or to come
            back online.
        """
        for i in range(expected_num_cpus):
            assert self.disable_cpu(
                platform_base_obj, i
            ), f"Failed to disable core number: {i}"
            assert self.enable_cpu(
                platform_base_obj, i
            ), f"Failed to enable core number: {i}"

    def reenable_all_cpus(
        self, platform_base_obj: AutoTestPlatformBase, expected_num_cpus: int
    ) -> None:
        """
        Attempt to re-enable all CPU cores and assert recovery.

        Every core is attempted even if an earlier one fails; the failing
        core numbers are reported together in the assertion message.

        :param platform_base_obj: Platform fixture with console access.
        :param expected_num_cpus: number of CPU cores expected to be online.
        :raises AssertionError: If one or more CPU cores cannot be re-enabled.
        """
        offline_cpus = [
            core
            for core in range(expected_num_cpus)
            if not self.enable_cpu(platform_base_obj, core)
        ]
        assert not offline_cpus, (
            f"Failed to enable {len(offline_cpus)} CPU(s): {offline_cpus}"
        )

    def check_cannot_disable_all_cores(
        self, platform_base_obj: AutoTestPlatformBase, expected_num_cpus: int
    ) -> None:
        """
        Verify the system refuses disabling the final online CPU core.

        All cores except the highest-numbered one are taken offline first.
        This method does not bring them back online afterwards.

        :param platform_base_obj: Platform fixture with console access.
        :param expected_num_cpus: Total number of CPU cores in the system.
        :raises AssertionError: If an earlier core cannot be disabled, or if
            the last core disable unexpectedly succeeds.
        """
        last_cpu = expected_num_cpus - 1
        for cpu_id in range(last_cpu):
            assert self.disable_cpu(
                platform_base_obj, cpu_id
            ), f"Failed to disable core number: {cpu_id}"
        try:
            # Disabling the final online CPU is expected to fail
            result = self.disable_cpu(platform_base_obj, last_cpu)
        except RuntimeError:
            # A RuntimeError from the console layer counts as a refusal.
            return
        assert not result, f"Unexpectedly disabled last core number: {last_cpu}"
# Names exported by a wildcard import of this module.
__all__ = ["CpuUtils"]