Source code for tests.utils.cpu_utils

# SPDX-FileCopyrightText: <text>Copyright 2025-2026 Arm Limited
# and/or its affiliates <open-source-office@arm.com></text>
#
# SPDX-License-Identifier: MIT

from __future__ import annotations
import posixpath as p
import time
import pytest
from typing import Dict, List

from test_automation.utils.io_utils import (
    write_to_port,
    check_if_file_exists,
    read_int,
    read_file_from_port,
)
from test_automation.utils.auto_platform_base import AutoTestPlatformBase


# Moved the CpuUtils from test_bsp_demos/utils to tests/utils
[docs] class CpuUtils: """Utility class for CPU-related operations.""" @staticmethod
[docs] def get_clusters_and_cores( cpu_count: int, cores_per_cluster: int, ) -> Dict[int, List[int]]: """ Return a mapping of cluster IDs to lists of core IDs. :param cpu_count: Total number of CPUs. :param cores_per_cluster: Number of cores per cluster. :returns: Mapping ``cluster_id -> [core_id, ...]``. """ clusters: Dict[int, List[int]] = {} for cpu_id in range(cpu_count): cluster_id = cpu_id // cores_per_cluster core_id = cpu_id % cores_per_cluster clusters.setdefault(cluster_id, []).append(core_id) return clusters
@staticmethod
[docs] def get_cpu_index( cluster_id: int, core_id: int, cores_per_cluster: int, ) -> int: """ Compute a global CPU index from cluster and core IDs. :param cluster_id: Cluster identifier. :param core_id: Core identifier within the cluster. :param cores_per_cluster: Number of cores per cluster. :returns: Global CPU index. """ return (cluster_id * cores_per_cluster) + core_id
[docs] def _list_state_dirs( self, mgr, port, cpu: int, cpu_sysfs: str ) -> Dict[str, str]: """ List cpuidle state directories for a given CPU. :param cpu: CPU index to list states for. :param cpu_sysfs: Base sysfs path for CPU information. :returns: Mapping of state names to their sysfs paths. """ base = p.join(cpu_sysfs, f"cpu{cpu}", "cpuidle") # List C-state directories robustly cmd = ( f'for d in "{base}"/state*; do ' 'if [ -d "$d" ]; then basename "$d"; fi; ' "done" ) out = mgr.run_cmd(port, cmd) states: Dict[str, str] = {} for line in out.splitlines(): name = line.strip() if name: states[name] = p.join(base, name) return states
[docs] def _load_states_per_cpu( self, platform_base_obj, names, cpu_sysfs ) -> Dict[int, Dict[str, str]]: """ Load cpuidle states for each CPU. :param platform_base_obj: Object containing configuration and manager. :param names: List of state names to load. :param cpu_sysfs: Base sysfs path for CPU information. :returns: Mapping of CPU indices to their cpuidle states. """ cpu_count = int(platform_base_obj.config_data.get("pc_cpu_count")) mgr = platform_base_obj.mgr port = platform_base_obj.default_console if cpu_count <= 0: pytest.skip("target did not report any CPUs") states_per_cpu: Dict[int, Dict[str, str]] = {} required_states = len(names) for cpu in range(cpu_count): states = self._list_state_dirs(mgr, port, cpu, cpu_sysfs) if len(states.keys()) < required_states: pytest.skip(f"no C-states found for cpu{cpu}") states_per_cpu[cpu] = states return states_per_cpu
[docs] def _check_default_status_for_state( self, mgr, port, cpu: int, state_id: str, states: Dict[str, str], ): """ Check the default status for a given cpuidle state on a specific CPU. :param mgr: Manager object for executing commands. :param port: Port identifier for command execution. :param cpu: CPU index to check. :param state_id: Identifier of the cpuidle state to check. :param states: Mapping of state names to their sysfs paths for the CPU. """ default_path = p.join(states[state_id], "default_status") assert check_if_file_exists( mgr, port, default_path ), f"default_status not exposed on cpu{cpu}:{state_id}" val = read_file_from_port(mgr, port, default_path).strip() assert ( val == "enabled" ), f"{state_id} not enabled by default on cpu{cpu}"
[docs] def _validate_required_files(self, mgr, port, *paths: str) -> None: """ Validate that required cpuidle files exist. :param mgr: Manager object for executing commands. :param port: Port identifier for command execution. :param paths: List of file paths to validate. :returns: None """ for path in paths: assert check_if_file_exists(mgr, port, path), f"missing {path}"
[docs] def _verify_usage_increases( self, mgr, port, usage_path: str, cpu: int, state_id: str, time_short: int = 60, ): """ Verify that the usage count increases over a short period. :param mgr: Manager object for executing commands. :param port: Port identifier for command execution. :param usage_path: Path to the usage file for the state. :param cpu: CPU index to check. :param state_id: Identifier of the cpuidle state to check. :param time_short: Time to wait between checks in seconds. :returns: None """ usage0 = read_int(mgr, port, usage_path) time.sleep(time_short) usage1 = read_int(mgr, port, usage_path) assert ( usage1 > usage0 ), f"no usage increase before disable on cpu{cpu}:{state_id}"
[docs] def _disable_state( self, mgr, port, disable_path: str, cpu: int, state_id: str, original_disable_values: Dict[int, Dict[str, str]], ): """ Disable a specific cpuidle state for a given CPU. :param mgr: Manager object for executing commands. :param port: Port identifier for command execution. :param disable_path: Path to the disable file for the state. :param cpu: CPU index to disable the state on. :param state_id: Identifier of the cpuidle state to disable. :param original_disable_values: Dictionary to store original disable values for restoration. """ if cpu not in original_disable_values: original_disable_values[cpu] = {} original_disable_values[cpu][state_id] = read_file_from_port( mgr, port, disable_path ).strip() write_to_port(mgr, port, disable_path, 1) assert read_file_from_port(mgr, port, disable_path).strip() == "1", ( "disable path did not reflect requested value for " f"cpu{cpu}:{state_id}" )
[docs] def _verify_usage_stays_same_when_disabled( self, mgr, port, usage_path: str, cpu: int, state_id: str, time_short: int = 60, ): """ Verify that the usage count does not change when the state is disabled. :param mgr: Manager object for executing commands. :param port: Port identifier for command execution. :param usage_path: Path to the usage file for the state. :param cpu: CPU index to check. :param state_id: Identifier of the cpuidle state to check. :param time_short: Time to wait between checks in seconds. """ usage_a = read_int(mgr, port, usage_path) time.sleep(time_short) usage_b = read_int(mgr, port, usage_path) # While respective C-state is disabled, usage should not increase assert ( usage_a == usage_b ), f"usage changed while {state_id} disabled on cpu{cpu}"
[docs] def _verify_latency_residency_values( self, mgr, port, base: str, props: Dict[str, int], cpu: int, state_id: str, ): """ Verify latency and residency values for a specific C-state on a CPU. :param mgr: Manager object for executing commands. :param port: Port identifier for command execution. :param base: Base sysfs path for the C-state. :param props: Expected properties containing 'latency' and 'residency'. :param cpu: CPU index to check. :param state_id: Identifier of the cpuidle state to check. """ latency = read_int(mgr, port, p.join(base, "latency")) residency = read_int(mgr, port, p.join(base, "residency")) assert ( latency == props["latency"] ), f"latency mismatch on cpu{cpu}:{state_id} (got {latency})" assert ( residency == props["residency"] ), f"residency mismatch on cpu{cpu}:{state_id} (got {residency})"
[docs] def _verify_usage_time_advancement( self, mgr, port, base: str, cpu: int, state_id: str, time_long: int = 120, ): """ Verify that usage and time values advance over a longer period. :param mgr: Manager object for executing commands. :param port: Port identifier for command execution. :param base: Base sysfs path for the C-state. :param cpu: CPU index to check. :param state_id: Identifier of the cpuidle state to check. :param time_long: Time to wait between checks in seconds. """ usage1 = read_int(mgr, port, p.join(base, "usage")) time1 = read_int(mgr, port, p.join(base, "time")) time.sleep(time_long) usage2 = read_int(mgr, port, p.join(base, "usage")) time2 = read_int(mgr, port, p.join(base, "time")) # Usage should have increased, and time should have advanced assert ( usage2 > usage1 ), f"state {state_id} not entered after {time_long}s on cpu{cpu}" assert time2 > time1, f"time did not advance for cpu{cpu}:{state_id}"
[docs] def available_governors(self, mgr, port, cpuidle_dir) -> List[str]: """ List available cpuidle governors. :param mgr: Manager object for executing commands. :param port: Port identifier for command execution. :param cpuidle_dir: Base directory for cpuidle sysfs entries. :returns: List of available cpuidle governors. """ path = p.join(cpuidle_dir, "available_governors") assert check_if_file_exists( mgr, port, path ), f"{path} not present on target" return read_file_from_port(mgr, port, path).strip().split()
[docs] def current_governor(self, mgr, port, cpuidle_dir, ro: bool = True) -> str: """ Get the current cpuidle governor. :param mgr: Manager object for executing commands. :param port: Port identifier for command execution. :param cpuidle_dir: Base directory for cpuidle sysfs entries. :param ro: Whether to read from the read-only current_governor_ro file. :returns: Name of the current cpuidle governor. """ fname = "current_governor_ro" if ro else "current_governor" path = p.join(cpuidle_dir, fname) assert check_if_file_exists( mgr, port, path ), f"{path} not present on target" return read_file_from_port(mgr, port, path).strip()
[docs] def set_cpu( self, platform_base_obj: AutoTestPlatformBase, cpu_num: int, flag: str ) -> bool: """ Set a CPU core online/offline and verify the new state. :param platform_base_obj: Platform fixture with console access. :param cpu_num: CPU index to modify. :param flag: Desired CPU online state as string. :returns: True when the value matches flag otherwise False """ mgr = platform_base_obj.mgr mgr.run_cmd( platform_base_obj.default_console, f'echo "{flag}" > \ "/sys/devices/system/cpu/cpu{cpu_num}/online"', ) output = mgr.run_cmd( platform_base_obj.default_console, f"cat '/sys/devices/system/cpu/cpu{cpu_num}/online'", ) return output == flag
[docs] def enable_cpu( self, platform_base_obj: AutoTestPlatformBase, cpu_num: int ) -> bool: """Enable a specific CPU core.""" return self.set_cpu(platform_base_obj, cpu_num, "1")
[docs] def disable_cpu( self, platform_base_obj: AutoTestPlatformBase, cpu_num: int ) -> bool: """Disable a specific CPU core.""" return self.set_cpu(platform_base_obj, cpu_num, "0")
[docs] def validate_cpu_count_from_devicetree( self, platform_base_obj: AutoTestPlatformBase, expected_num_cpus: int ) -> None: """ Validate CPU count in device tree against expected value. :param platform_base_obj: Platform fixture with console access. :param expected_num_cpus: Expected number of CPU nodes in device tree. :raises AssertionError: If CPU count does not match expected value. """ mgr = platform_base_obj.mgr cpus = mgr.run_cmd( platform_base_obj.default_console, "find /sys/firmware/devicetree/base/cpus/" ' -name "cpu@*" -maxdepth 1 | wc -l', ) assert int(cpus.split("\n")[-1]) == expected_num_cpus, ( f"Expected {expected_num_cpus} CPUs," f"but only {cpus} CPUs are online" )
[docs] def assert_all_cores_online( self, platform_base_obj: AutoTestPlatformBase, expected_num_cpus: int ) -> None: """ Check that all expected CPU cores are currently online. :param platform_base_obj: Platform fixture with console access. :param expected_num_cpus: Expected number of online processors. :raises AssertionError: If online CPU count differs from expected """ mgr = platform_base_obj.mgr cpus = mgr.run_cmd( platform_base_obj.default_console, 'grep -c "processor" /proc/cpuinfo', ) assert int(cpus.split("\n")[-1]) == expected_num_cpus, ( f"Expected {expected_num_cpus} CPUs online," f"but only {cpus} CPUs are online" )
[docs] def stop_individual_core( self, platform_base_obj: AutoTestPlatformBase, expected_num_cpus: int ) -> None: """ Verify per-core CPU hotplug by toggling each core off then on. :param platform_base_obj: Platform fixture with console access. :param expected_num_cpus: Expected number of online processors. :raises AssertionError: If online CPU count differs from expected """ for i in range(expected_num_cpus): assert self.disable_cpu( platform_base_obj, i ), f"Failed to disable core number: {i}" assert self.enable_cpu( platform_base_obj, i ), f"Failed to enable core number: {i}"
[docs] def reenable_all_cpus( self, platform_base_obj: AutoTestPlatformBase, expected_num_cpus: int ) -> None: """ Attempt to re-enable all CPU cores and assert recovery :param platform_base_obj: Platform fixture with console access. :param expected_num_cpus: number of CPU cores expected to be online. :raises AssertionError: If one or more CPU cores cannot be re-enabled. """ offline_cpus = [] for core in range(expected_num_cpus): if not self.enable_cpu(platform_base_obj, core): offline_cpus.append(core) f"Failed to enable core number: {core}" assert ( len(offline_cpus) == 0 ), f"Failed to enable {len(offline_cpus)} CPU's'"
[docs] def check_cannot_disable_all_cores( self, platform_base_obj: AutoTestPlatformBase, expected_num_cpus: int ) -> None: """ Verify the system refuses disabling the final online CPU core. :param platform_base_obj: Platform fixture with console access. :param expected_num_cpus: Total number of CPU cores in the system. :raises AssertionError: last core disable unexpectedly succeeds. """ for cpu_id in range(expected_num_cpus - 1): assert self.disable_cpu( platform_base_obj, cpu_id ), f"Failed to disable core number: {cpu_id}" last_cpu = expected_num_cpus - 1 try: # Disabling the final online CPU is expected to fail result = self.disable_cpu(platform_base_obj, last_cpu) except RuntimeError: return assert not result, ( "Unexpectedly disabled last core number: " f"{last_cpu}" )
__all__ = ["CpuUtils"]