diff --git a/.flake8 b/.flake8 new file mode 100644 index 000000000..ca93e0764 --- /dev/null +++ b/.flake8 @@ -0,0 +1,6 @@ +# Leka - iOS Monorepo +# Copyright APF France handicap +# SPDX-License-Identifier: Apache-2.0 + +[flake8] +max-line-length = 120 diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 000000000..d241c7c42 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,6 @@ +# Leka - iOS Monorepo +# Copyright APF France handicap +# SPDX-License-Identifier: Apache-2.0 + +[LOGGING] +disable=logging-fstring-interpolation diff --git a/requirements.txt b/requirements.txt index 019dd2d7a..d3038bf7c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ mbed-cli>=1.10.0,<2.0 -pyserial>=3,<=3.4 +pyserial>=3,<=3.5 intelhex>=2.3.0,<3.0.0 prettytable>=2.0,<3.0 imgtool>=2.0.0,<3.0.0 +colorama diff --git a/tools/modules/flash_utils.py b/tools/modules/flash_utils.py new file mode 100644 index 000000000..d3c5620a6 --- /dev/null +++ b/tools/modules/flash_utils.py @@ -0,0 +1,128 @@ +# Leka - LekaOS +# Copyright 2024 APF France handicap +# SPDX-License-Identifier: Apache-2.0 + +""" +Utility functions for flashing the OS binary to the device. +""" + +import logging +from pathlib import Path +import subprocess +import sys +import shutil +import time + +from colorama import Fore, Style + + +def check_external_tools(): + """ + Ensure that required external tools are available. + + Raises: + SystemExit: If any required tool is missing. + """ + required_tools = ["openocd", "st-flash"] + missing_tools = [tool for tool in required_tools if not shutil.which(tool)] + if missing_tools: + logging.error(f"Missing required tools: {', '.join(missing_tools)}") + sys.exit(1) + + +def flash_os_and_reset(os_bin_path: Path) -> bool: + """ + Flash the OS binary and reset the device. + + Args: + os_bin_path (Path): Path to the OS binary. + + Returns: + bool: True if both flashing and resetting succeed; False otherwise. + """ + try: + flash_os(str(os_bin_path)) + return True + except SystemExit: + return False + + +def flash_os(os_bin_path: str): + """ + Flash the OS binary to the device using OpenOCD. + + Args: + os_bin_path (str): Path to the OS binary file. + + Raises: + SystemExit: If flashing fails. + """ + print(f"Flashing {os_bin_path}...") + cmd_flash = ( + f"openocd -f interface/stlink.cfg " + f"-c 'transport select hla_swd' " + f"-f target/stm32f7x.cfg " + f"-c 'program {os_bin_path} 0x08000000' " + f"-c exit" + ) + print(cmd_flash) + flash = subprocess.run(cmd_flash, shell=True, check=False) + if flash.returncode != 0: + print(f"Flashing {os_bin_path}... {Fore.RED}❌{Style.RESET_ALL}") + sys.exit(1) + print(f"Flashing {os_bin_path}... {Fore.GREEN}✅{Style.RESET_ALL}") + + time.sleep(1) + + print("Resetting robot...") + cmd_reset = ( + "openocd -f interface/stlink.cfg " + "-c 'transport select hla_swd' " + "-f target/stm32f7x.cfg " + "-c init -c 'reset run' " + "-c exit" + ) + print(cmd_reset) + reset = subprocess.run(cmd_reset, shell=True, check=False) + if reset.returncode != 0: + print(f"Resetting robot... {Fore.RED}❌{Style.RESET_ALL}") + sys.exit(1) + print(f"Resetting robot... {Fore.GREEN}✅{Style.RESET_ALL}") + + time.sleep(1) + + +def erase_flash(): + """ + Erase the flash memory of the device using st-flash. + + Raises: + SystemExit: If erasing fails. + """ + print("Erasing flash...") + cmd_erase = "st-flash --connect-under-reset --reset erase" + ret = subprocess.run(cmd_erase, shell=True) + if ret.returncode != 0: + print(f"Erasing flash... {Fore.RED}❌{Style.RESET_ALL}") + sys.exit(1) + print(f"Erasing flash... {Fore.GREEN}✅{Style.RESET_ALL}") + + +def print_end_success(message: str): + """ + Print a success message in cyan with a checkmark. + + Args: + message (str): The message to print. + """ + print(f"{Fore.CYAN}{message}... ✅{Style.RESET_ALL}") + + +def print_end_failure(message: str): + """ + Print a failure message in red with a cross mark. + + Args: + message (str): The message to print. + """ + print(f"{Fore.RED}{message}... ❌{Style.RESET_ALL}") diff --git a/tools/modules/logger.py b/tools/modules/logger.py new file mode 100644 index 000000000..a1f93ce62 --- /dev/null +++ b/tools/modules/logger.py @@ -0,0 +1,28 @@ +# Leka - LekaOS +# Copyright 2024 APF France handicap +# SPDX-License-Identifier: Apache-2.0 + + +""" +Configure logging settings based on verbosity level. +""" + + +import logging +import sys + + +def configure_logging(verbose: bool): + """ + Configure logging settings based on verbosity level. + + Args: + verbose (bool): If True, set logging level to DEBUG; otherwise, INFO. + """ + logging.basicConfig( + level=logging.DEBUG if verbose else logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], + ) + if verbose: + logging.debug("Verbose logging enabled.") diff --git a/tools/modules/serial_utils.py b/tools/modules/serial_utils.py new file mode 100644 index 000000000..c1ec5653f --- /dev/null +++ b/tools/modules/serial_utils.py @@ -0,0 +1,139 @@ +# Leka - LekaOS +# Copyright 2024 APF France handicap +# SPDX-License-Identifier: Apache-2.0 + + +""" +Utility functions for serial communication. +""" + + +import glob +import sys +from time import sleep +from typing import List, Optional +import serial +from colorama import Fore, Style + +SERIAL_TIMEOUT = 0.1 # seconds + + +def connect_serial(port_pattern: str) -> serial.Serial: + """ + Connect to the first serial port matching the given pattern. + + Args: + port_pattern (str): Glob pattern to match serial ports. + + Returns: + serial.Serial: An open serial connection. + + Raises: + SystemExit: If unable to connect to the serial port. + """ + ports = glob.glob(port_pattern) + serial_port = ports[0] if ports else port_pattern + + try: + com = serial.Serial(serial_port, 115200, timeout=SERIAL_TIMEOUT) + print(f"Connected to {com.name}... {Fore.GREEN}✅{Style.RESET_ALL}") + return com + except serial.serialutil.SerialException as error: + print( + f"Error connecting to {serial_port}: {error}... {Fore.RED}❌{Style.RESET_ALL}" + ) + sys.exit(1) + + +def reset_buffer(com: serial.Serial): + """ + Reset the serial input and output buffers and send a break signal. + + Args: + com (serial.Serial): The serial connection to reset. + """ + print("Resetting COM buffer...") + try: + com.reset_input_buffer() + com.reset_output_buffer() + com.send_break(duration=1) + sleep(1) + print(f"Resetting COM buffer... {Fore.GREEN}✅{Style.RESET_ALL}") + except serial.SerialException as e: + print(f"Error resetting COM buffer: {e}... {Fore.RED}❌{Style.RESET_ALL}") + sys.exit(1) + + +def read_output_serial(com: serial.Serial) -> str: + """ + Read a line from the serial connection. + + Args: + com (serial.Serial): The serial connection to read from. + + Returns: + str: Decoded line from serial output. + """ + try: + data = com.readline().decode("utf-8").strip() + return data + except serial.SerialException as e: + print(f"Serial read error: {e}") + return "" + + +def wait_for_response(com: serial.Serial, response_timeout: float) -> Optional[str]: + """ + Wait for a response from the device within the specified timeout. + + Args: + com (serial.Serial): The serial connection to read from. + response_timeout (float): Timeout in seconds. + + Returns: + Optional[str]: The received data if any; otherwise, None. + """ + retries = int(response_timeout / 0.1) + for _ in range(retries): + sleep(0.1) + data = read_output_serial(com) + if data: + return data + return None + + +def wait_for_system_to_sleep(com: serial.Serial, duration: int) -> List[str]: + """ + Wait for the system to run for a specified duration, collecting relevant serial data. + + Args: + com (serial.Serial): The serial connection to read from. + duration (int): Duration in seconds to wait. + + Returns: + List[str]: Collected lines containing 'watchdog'. + """ + print("Waiting for system to run...") + data = [] + for second in range(duration): + if com.in_waiting > 0: + print(f"{Fore.GREEN}•{Style.RESET_ALL}", end="", flush=True) + lines = com.readlines() + for line in lines: + decoded_line = line.decode("utf-8", errors="replace").rstrip() + data.append(decoded_line) + else: + print("•", end="", flush=True) + + if (second + 1) % 60 == 0 and (second + 1) != duration: + print() + + sleep(1) + + print() + filtered_data = [line for line in data if "watchdog" in line][-10:] + print("\n".join(filtered_data)) + print( + f"Waiting for system to run for {duration} seconds... {Fore.GREEN}✅{Style.RESET_ALL}" + ) + return filtered_data diff --git a/tools/run_functional_tests.py b/tools/run_functional_tests.py index 4d488a9d7..b8c2a4f52 100755 --- a/tools/run_functional_tests.py +++ b/tools/run_functional_tests.py @@ -1,362 +1,372 @@ #!/usr/bin/env python3 +"""Run functional tests on the Leka device.""" +# Leka - LekaOS +# Copyright 2024 APF France handicap +# SPDX-License-Identifier: Apache-2.0 + +import argparse import datetime -import time -from colorama import Fore, Style -import os -import glob +import logging import re import sys -import argparse - -from time import sleep +from pathlib import Path +from typing import List +import colorama +from colorama import Fore, Style import serial -import serial.tools.list_ports - - -# -# MARK: - argparse -# - -TESTS_FUNCTIONAL_ROOT_DIRECTORY = "_build/LEKA_V1_2_DEV/tests/functional/tests/" +from modules.logger import configure_logging +from modules.serial_utils import connect_serial, reset_buffer, wait_for_response +from modules.flash_utils import ( + flash_os_and_reset, + erase_flash, +) +from modules.flash_utils import check_external_tools + +# Initialize Colorama +colorama.init(autoreset=True) + +# Constants +TESTS_FUNCTIONAL_ROOT_DIRECTORY = Path("_build/LEKA_V1_2_DEV/tests/functional/tests/") TESTS_BIN_EXTENSION = ".bin" -def valid_file(parser, arg): - path = os.path.join(TESTS_FUNCTIONAL_ROOT_DIRECTORY, arg) - if not os.path.exists(path): - parser.error("⚠️ The file %s does not exist!" % arg) - else: - base, ext = os.path.splitext(arg) - if not ext.endswith(TESTS_BIN_EXTENSION): - parser.error( - "⚠️ The filename %s must have a \'.bin\' extension !" % arg) - return path - - -parser = argparse.ArgumentParser(description='Run functional tests') - -parser.add_argument('-p', '--port', metavar='PORT', default='/dev/tty.usbmodem*', - help='serial port path used for the robot') -parser.add_argument('--response-timeout', metavar='RESPONSE_TIMEOUT', default=30.0, - help='response timeout is seconds') -parser.add_argument('--no-flash-erase', action='store_false', - help='disable flash erase') - -group = parser.add_mutually_exclusive_group(required=True) - -group.add_argument('-b', '--bin-files', metavar='BIN_FILES', nargs='+', type=lambda s: valid_file(parser, s), default=list(), - help='list binary executables') - -group.add_argument('--all', action='store_true', - help='select all binary executable') - - -args = parser.parse_args() +def valid_file(arg: str) -> Path: + """ + Validate that the provided file exists and has the correct extension. -# -# MARK: - Serial -# + Args: + arg (str): The filename to validate. -PORTS = glob.glob(args.port) -SERIAL_PORT = PORTS[0] if (len(PORTS) != 0) else args.port - -RESPONSE_TIMEOUT = args.response_timeout # in seconds -RESPONSE_RETRY_DELAY = 0.1 # in seconds -SERIAL_TIMEOUT = 0.1 # in seconds - -MAX_GET_LINE_RETRIES = RESPONSE_TIMEOUT / RESPONSE_RETRY_DELAY - -try: - com = serial.Serial(SERIAL_PORT, 115200, timeout=SERIAL_TIMEOUT) -except serial.serialutil.SerialException as error: - print(f"{error}") - parser.print_help() - sys.exit(1) - -print(f"Connected to {com.name}") + Returns: + Path: The validated file path. + Raises: + argparse.ArgumentTypeError: If the file does not exist or has an incorrect extension. + """ + path = TESTS_FUNCTIONAL_ROOT_DIRECTORY / arg + if not path.exists(): + raise argparse.ArgumentTypeError(f"⚠️ The file {path} does not exist!") + if path.suffix != TESTS_BIN_EXTENSION: + raise argparse.ArgumentTypeError( + f"⚠️ The filename {path} must have a '{TESTS_BIN_EXTENSION}' extension!" + ) + return path -def read_output_serial(): - return com.readline().decode("utf-8") +def list_bin_files() -> List[Path]: + """ + List all .bin files in the TESTS_FUNCTIONAL_ROOT_DIRECTORY recursively. -def wait_for_response(): - data = '' - no_response_counter = 0 + Returns: + List[Path]: List of binary file paths. + """ + return list(TESTS_FUNCTIONAL_ROOT_DIRECTORY.rglob(f"*{TESTS_BIN_EXTENSION}")) - while (no_response_counter <= MAX_GET_LINE_RETRIES): - sleep(RESPONSE_RETRY_DELAY) - data = read_output_serial() - if (data): - return data - no_response_counter += 1 - return None +def warning_print(message: str): + """ + Print a warning message in yellow. + Args: + message (str): The warning message to print. + """ + print(Fore.YELLOW + f"⚠️ Warning: {message}" + Style.RESET_ALL) -# -# MARK: - Functions -# -TESTS_FUNCTIONAL_BIN_FILES = list() +class Test: + """ + Represents a single test that can be flashed to the device and executed. + """ + + def __init__( + self, path: Path, serial_connection: serial.Serial, response_timeout: float + ): + """ + Initialize a test instance with the given binary path, serial connection, + and response timeout. + + Args: + path (Path): Path to the binary file. + serial_connection (serial.Serial): Serial connection to the device. + response_timeout (float): Timeout for device responses. + """ + self.path = path + self.serial = serial_connection + self.response_timeout = response_timeout + self.result_filepath = self.define_result_path() + self.failures: List[str] = [] + def define_result_path(self) -> Path: + """ + Define the path for the result file based on the current timestamp. -def list_bin_files(): - set = list() - for root, dirs, files in os.walk(TESTS_FUNCTIONAL_ROOT_DIRECTORY): - for filename in files: - if filename.endswith(TESTS_BIN_EXTENSION): - set.append( - os.path.join(root, filename)) + Returns: + Path: Path to the result file. + """ + timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H.%M.%S") + return self.path.parent / f"{self.path.stem}_{timestamp}.txt" - return set + def generate_result_file(self): + """ + Create an empty result file. + Raises: + SystemExit: If the file cannot be created. + """ + try: + self.result_filepath.touch(exist_ok=False) + logging.debug(f"Created result file at {self.result_filepath}") + except OSError as e: + logging.exception(f"Could not create or open file: {self.result_filepath}") + logging.error(f"Error: {e}") + sys.exit(1) -TESTS_FUNCTIONAL_BIN_FILES = list_bin_files() if args.all else args.bin_files + def edit_result_file(self, data: str): + """ + Append data to the result file. -FLASH_ERASE_FLAG = args.no_flash_erase + Args: + data (str): Data to append. + Raises: + SystemExit: If writing to the file fails. + """ + try: + with self.result_filepath.open("a") as file: + file.write(data + "\n") + logging.debug(f"Appended data to {self.result_filepath}") + except OSError as e: + logging.exception(f"Could not write to file: {self.result_filepath}") + logging.error(f"Error: {e}") + sys.exit(1) -def warningprint(*args, **kwargs): - print(Fore.YELLOW + "\n⚠️ Warning : " + - " ".join(map(str, args))+Style.RESET_ALL, **kwargs) + def flash(self) -> bool: + """ + Flash the binary to the device using OpenOCD and reset the device. + Returns: + bool: True if flashing and resetting succeed; False otherwise. + """ + return flash_os_and_reset(self.path) -# -# MARK: - Class Test -# + def run(self) -> bool: + """ + Run the test by flashing the binary and collecting test results. -class Test: + Returns: + bool: True if flashing succeeds; False otherwise. + """ + logging.info(f"Running test {self.path.name} ...") + self.generate_result_file() + if not self.flash(): + warning_print("Error flashing!") + return False + + while True: + data = wait_for_response(self.serial, self.response_timeout) + if data is not None: + if data == "<>": + logging.info(f"Running test {self.path.name} ... ✅️") + break + elif data != ".": + self.edit_result_file(data) + else: + logging.warning("No response received.") + break + return True - def __init__(self, path): - self.path = path + def check_status(self) -> bool: + """ + Check the test results to determine if all tests passed. - def generate_result_file(self): - def define_path(source_path): - base, ext = os.path.splitext(source_path) - timestamp = time.time() - date = str(datetime.datetime.fromtimestamp( - timestamp)) - date = date.replace(':', '.') - date = date.replace(' ', '_') - new_extension = ".txt" - target_path = base + "_" + date + new_extension - return target_path - - def create_file(path): - try: - file = open(path, "w") - except OSError as e: - print("Could not create or open file: " + path) - print("Error: " + e) - sys.exit(1) - file.close() - - self.result_filepath = define_path(self.path) - create_file(self.result_filepath) - - def edit_result_file(self, data): - result_filepath = self.result_filepath + Returns: + bool: True if all tests passed; False otherwise. + """ try: - with open(result_filepath, "a") as file: - file.write(data) - except FileNotFoundError as e: - print("The file: " + result_filepath + "doesn\'t exist") - print("Error: " + e) + with self.result_filepath.open("r") as file: + content = file.read() + if "All tests passed!" in content: + logging.info(f"All tests passed for {self.path.name}.") + return True + else: + failure_pattern = re.compile( + r".*\.cpp:\d+:.+: Failure|\[ FAILED \]" + ) + self.failures = failure_pattern.findall(content) + if self.failures: + logging.warning( + f"Failures in {self.path.name}: {self.failures}" + ) + return False + return False + except FileNotFoundError: + logging.error(f"The file: {self.result_filepath} doesn't exist") sys.exit(1) def print_result_file(self): - result_filepath = self.result_filepath + """ + Print the contents of the result file. + """ try: - with open(result_filepath, "r") as file: - data = file.read() - if (data): - print(data) - else: - warningprint("No data !") - except FileNotFoundError as e: - print("The file: " + result_filepath + "doesn\'t exist") - print("Error: " + e) - sys.exit(1) + with self.result_filepath.open("r") as file: + content = file.read() + print(content) + except OSError as e: + logging.error(f"Could not read file: {self.result_filepath}") + logging.error(f"Error: {e}") + + +def parse_arguments() -> argparse.Namespace: + """ + Parse command-line arguments. + + Returns: + argparse.Namespace: Parsed arguments. + """ + parser = argparse.ArgumentParser(description="Run functional tests") + + parser.add_argument( + "-p", + "--port", + metavar="PORT", + default="/dev/tty.usbmodem*", + help="Serial port path used for the robot connection", + ) + parser.add_argument( + "--response-timeout", + type=float, + default=30.0, + metavar="SECONDS", + help="Response timeout in seconds (default: 30.0)", + ) + parser.add_argument( + "--no-flash-erase", + action="store_false", + dest="flash_erase", + help="Disable flash erase", + ) + parser.add_argument( + "-v", + "--verbose", + action="store_true", + help="Enable verbose (debug) logging", + ) + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( + "-b", + "--bin-files", + metavar="BIN_FILES", + nargs="+", + type=valid_file, + help="List of binary executables", + ) + group.add_argument( + "--all", action="store_true", help="Select all binary executables" + ) + parser.add_argument( + "--select-port", + action="store_true", + help="Interactively select the serial port if multiple are found", + ) + + return parser.parse_args() + + +def main() -> int: + """ + Main function to parse arguments, initialize tests, and run tests on the device. + + Returns: + int: Exit status code (0 for success, 1 for failure). + """ + args = parse_arguments() + configure_logging(args.verbose) + + # Check for required external tools + check_external_tools() + + # Determine which binary files to use + if args.all: + bin_files = list_bin_files() + else: + bin_files = args.bin_files - def flash(self): - print(f"Flashing {self.path}...") - CMD_FLASH = (f"openocd -f interface/stlink.cfg " - f"-c 'transport select hla_swd' " - f"-f target/stm32f7x.cfg " - f"-c 'program {self.path} 0x08000000' " - f"-c exit " - f">/dev/null 2>&1 ") - flash = os.system(CMD_FLASH) - - sleep(1) - - CMD_RESET = ("openocd -f interface/stlink.cfg " - "-c 'transport select hla_swd' " - "-f target/stm32f7x.cfg " - "-c init -c 'reset run' " - "-c exit " - f">/dev/null 2>&1 ") - reset = os.system(CMD_RESET) - return flash or reset - - def run(self): - self.generate_result_file() - ret = self.flash() + if not bin_files: + warning_print("No executable binaries found!") + sys.exit(1) - if ret: - warningprint("Error flashing !") - return ret + # Establish serial connection with optional selection + try: + com = connect_serial(args.port) + + # Flash erase if enabled + if args.flash_erase: + erase_flash() + + # Reset serial buffers + reset_buffer(com) + + logging.info("Running tests...") + run_tests = [] + for bin_file in bin_files: + test = Test(bin_file, com, args.response_timeout) + if test.run(): + run_tests.append(test) + + # Flash erase after tests if enabled + if args.flash_erase: + erase_flash() + + # Reset serial buffers again after flashing + reset_buffer(com) + + # Print summary + fails = [] + print("\nResults files:") + for test in run_tests: + print(test.result_filepath) + if not test.check_status(): + fails.append(test) + + print("\n TESTS") + for test in run_tests: + status = ( + f"{Fore.RED}❌{Style.RESET_ALL}" + if test in fails + else f"{Fore.GREEN}✅{Style.RESET_ALL}" + ) + print(f"{status} {test.path.name}") + + print("\n") + for test in fails: + print(Fore.YELLOW + f"{test.result_filepath}" + Style.RESET_ALL) + test.print_result_file() + + if fails: + print( + Fore.RED + + f" ❌ {len(fails)} out of {len(run_tests)} suites have failed..." + + Style.RESET_ALL + ) + return 1 else: - while True: - data = wait_for_response() - if data is not None: - if data.strip() == "<>": - return ret - elif data.strip() != ".": - self.edit_result_file(data) - else: - return ret - - def check_status(self): - - def all_tests_passed(file): - ploop = (".*All tests passed!") - pattern = re.compile(ploop) - ret = False - for line in file: - match = pattern.search(line) - if match is not None: - ret = True - break - return ret - - def failure_lines(file): - ploop = (".*\\.cpp:[0-9].+: Failure|\\[ FAILED \\]") - pattern = re.compile(ploop) - for line in file: - match = pattern.search(line) - if match is not None: - yield line.strip() - - ret = 0 - result_filepath = self.result_filepath - - try: - with open(result_filepath, "r") as file: - self.failures = list() - for line in failure_lines(file): - self.failures.append(line) - file.seek(0, 0) - if len(self.failures) or not all_tests_passed(file): - print("Failures : " + str(self.failures)) - print("All test passed : "+str(all_tests_passed(file))) - ret = 1 - - except FileNotFoundError as e: - print("The file: " + result_filepath + "doesn\'t exist") - print("Error: " + e) - sys.exit(1) - - return ret - - -def print_summary(): - - if not RUN_TESTS: - warningprint("No available set !") + print( + Fore.GREEN + + f" ✅ All {len(run_tests)} suites have passed!" + + Style.RESET_ALL + ) + return 0 + + except serial.SerialException as e: + logging.error(f"Could not open serial port {args.port}: {e}") sys.exit(1) - - FAILS = list() - print("\n") - print("Results files :") - for test in RUN_TESTS: - print(test.result_filepath) - fail = test.check_status() - if fail: - FAILS.append(test) - - print("\n") - print("{}{}".format(' ', 'TESTS')) - for test in RUN_TESTS: - path = test.path - status = " ❌ " if test in FAILS else " ✅ " - print("{}{}".format(status, path)) - - print("\n") - for test in FAILS: - print(Fore.YELLOW + test.result_filepath + Style.RESET_ALL) - test.print_result_file() - - if (FAILS): - print(Fore.RED + " ❌ %d in %d suites have failed..." % (len(FAILS), len(RUN_TESTS)) + - Style.RESET_ALL) - else: - print(Fore.GREEN + " ✅ All the %d suites have passed !" % len(RUN_TESTS) + - Style.RESET_ALL) - - ret = len(FAILS) - return ret - - -# -# MARK: - Main script -# - -RUN_TESTS = list() - - -def flash_erase(): - ret = os.system("st-flash --connect-under-reset --reset erase") - return ret - - -def reset_buffer(): - BREAK_DELAY = 1 - com.reset_input_buffer() - com.reset_output_buffer() - com.send_break(BREAK_DELAY) - sleep(BREAK_DELAY) - - -def main(): - ret = 0 - - print("Hello, World!") - - if not TESTS_FUNCTIONAL_BIN_FILES: - warningprint("No exec !") + except Exception as e: + logging.error(f"An unexpected error occurred: {e}") sys.exit(1) - if FLASH_ERASE_FLAG: - flash_erase() - - reset_buffer() - - print("Running tests...") - for filepath in TESTS_FUNCTIONAL_BIN_FILES: - test = Test(filepath) - error = test.run() - if not error: - RUN_TESTS.append(test) - - if FLASH_ERASE_FLAG: - flash_erase() - - reset_buffer() - - fails = print_summary() - if fails: - ret = 1 - - print("Erasing flash after tests...") - flash_erase() - - return ret - -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main()) diff --git a/tools/run_system_tests.py b/tools/run_system_tests.py index 75d94e8bc..e02af5a4a 100755 --- a/tools/run_system_tests.py +++ b/tools/run_system_tests.py @@ -1,222 +1,122 @@ #!/usr/bin/env python3 +"""Run sleep functionality tests on the Leka device.""" + # Leka - LekaOS # Copyright 2024 APF France handicap # SPDX-License-Identifier: Apache-2.0 -from colorama import Fore, Style -from time import sleep import argparse -import glob -import re -import serial -import serial.tools.list_ports -import subprocess import sys - - -# -# MARK: - argparse -# +import re +from modules.logger import configure_logging +from modules.serial_utils import connect_serial, reset_buffer, wait_for_system_to_sleep +from modules.flash_utils import ( + flash_os, + erase_flash, + print_end_failure, + print_end_success, +) +from colorama import Fore, Style OS_BIN_FILE_PATH = "_build/LEKA_V1_2_DEV/app/os/LekaOS.bin" -parser = argparse.ArgumentParser(description='Run functional tests') - -parser.add_argument('-p', '--port', metavar='PORT', - default='/dev/tty.usbmodem*', - help='serial port path used for the robot') - -parser.add_argument('--response-timeout', metavar='RESPONSE_TIMEOUT', - default=30.0, - help='response timeout is seconds') - -parser.add_argument('--no-flash-erase', action='store_false', - help='disable flash erase') - -parser.add_argument('-d', '--duration', metavar='DURATION', - default=18000, - help='duration in seconds to wait for the system to sleep') - -parser.add_argument('-s', '--deep-sleep-percentage', metavar='DEEP_SLEEP_PERCENTAGE', - default=95, - help='deep sleep percentage') - -args = parser.parse_args() - - -# -# MARK: - Serial -# - -print("Hello, System Tests! 🚀") - - -PORTS = glob.glob(args.port) -SERIAL_PORT = PORTS[0] if (len(PORTS) != 0) else args.port - -RESPONSE_TIMEOUT = args.response_timeout # in seconds -RESPONSE_RETRY_DELAY = 0.1 # in seconds -SERIAL_TIMEOUT = 0.1 # in seconds - -MAX_GET_LINE_RETRIES = RESPONSE_TIMEOUT / RESPONSE_RETRY_DELAY - - -def connect_serial(): - try: - global com - com = serial.Serial(SERIAL_PORT, 115200, timeout=SERIAL_TIMEOUT) - print_start(f"Connecting to {com.name}") - except serial.serialutil.SerialException as error: - print_end_failure(f"Connecting to {com.name}") - print(f"{error}") - parser.print_help() - sys.exit(1) - - print_end_success(f"Connecting to {com.name}") - - -def read_output_serial(): - return com.readline().decode("utf-8") - -def wait_for_response(): - data = '' - no_response_counter = 0 - - while (no_response_counter <= MAX_GET_LINE_RETRIES): - sleep(RESPONSE_RETRY_DELAY) - data = read_output_serial() - if (data): - return data - no_response_counter += 1 - - return None - - -# -# MARK: - Functions -# - -def print_start(message): - print(Fore.CYAN + f"\n{message}..." + Style.RESET_ALL) - - -def print_end_success(message): - print(Fore.CYAN + f"{message}... ✅" + Style.RESET_ALL) - - -def print_end_failure(message): - print(Fore.RED + f"{message}... ❌" + Style.RESET_ALL) - - -def flash_os(): - print_start(f"Flashing {OS_BIN_FILE_PATH}") - CMD_FLASH = (f"openocd -f interface/stlink.cfg " - f"-c 'transport select hla_swd' " - f"-f target/stm32f7x.cfg " - f"-c 'program {OS_BIN_FILE_PATH} 0x08000000' " - f"-c exit ") - print(CMD_FLASH) - flash = subprocess.run(CMD_FLASH, shell=True) - if not flash: - print_end_failure(f"Flashing {OS_BIN_FILE_PATH}") - sys.exit(1) - else: - print_end_success(f"Flashing {OS_BIN_FILE_PATH}") - - sleep(1) - - print_start("Reseting robot") - CMD_RESET = ("openocd -f interface/stlink.cfg " - "-c 'transport select hla_swd' " - "-f target/stm32f7x.cfg " - "-c init -c 'reset run' " - "-c exit ") - print(CMD_FLASH) - reset = subprocess.run(CMD_RESET, shell=True) - if not reset: - print_end_failure("Reseting robot") - sys.exit(1) - else: - print_end_success("Reseting robot") - - sleep(1) - - -# -# MARK: - Main script -# - - -def erase_flash(): - print_start("Erasing flash") - ret = subprocess.run("st-flash --connect-under-reset --reset erase", shell=True) - if not ret: - print_end_failure("Erasing flash") - sys.exit(1) - else: - print_end_success("Erasing flash") - - -def reset_buffer(): - print_start("Resetting com buffer") - BREAK_DELAY = 1 - com.reset_input_buffer() - com.reset_output_buffer() - com.send_break(BREAK_DELAY) - sleep(BREAK_DELAY) - print_end_success("Resetting com buffer") - - -def wait_for_system_to_sleep(duration=180): - print_start(f"Waiting for LekaOS to run for {duration} seconds") - data = [] - for second in range(duration): - if com.in_waiting > 0: - print(Fore.GREEN + "•" + Style.RESET_ALL, end='', flush=True) - lines = com.readlines() - for line in lines: - line = str(line, 'utf-8', errors='replace').rstrip() - data.append(line) - else: - print("•", end='', flush=True) - - if (second + 1) % 60 == 0 and (second + 1) != duration: - print() - - sleep(1) - - print() - data = list(filter(lambda string: 'watchdog' in string, data))[-10:] - print("\n".join(data)) - print_end_success(f"Waiting for LekaOS to run for {duration} seconds") - return data - - -def calculate_sleep_deep_statistics(lines): - def parse_line(line): - pattern = re.compile(r'slp:\s*(\d+)%.*?dsl:\s*(\d+)%') +def parse_arguments() -> argparse.Namespace: + """ + Parse command-line arguments. + + Returns: + argparse.Namespace: Parsed arguments. + """ + parser = argparse.ArgumentParser(description="Run sleep functionality tests") + + parser.add_argument( + "-p", + "--port", + metavar="PORT", + default="/dev/tty.usbmodem*", + help="Serial port path used for the robot connection", + ) + parser.add_argument( + "--response-timeout", + type=float, + default=30.0, + metavar="RESPONSE_TIMEOUT", + help="Response timeout in seconds (default: 30.0)", + ) + parser.add_argument( + "--no-flash-erase", + action="store_false", + dest="flash_erase", + help="Disable flash erase", + ) + parser.add_argument( + "-d", + "--duration", + type=int, + default=18000, + metavar="DURATION", + help="Duration in seconds to wait for the system to sleep (default: 18000)", + ) + parser.add_argument( + "-s", + "--deep-sleep-percentage", + type=int, + default=95, + metavar="DEEP_SLEEP_PERCENTAGE", + help="Deep sleep percentage (default: 95)", + ) + parser.add_argument( + "-v", + "--verbose", + action="store_true", + help="Enable verbose (debug) logging", + ) + + return parser.parse_args() + + +def calculate_sleep_deep_statistics(lines: list) -> tuple: + """ + Calculate average sleep and deep sleep percentages from collected data. + + Args: + lines (list): List of strings containing sleep data. + + Returns: + tuple: Average sleep percentage and average deep sleep percentage. + """ + + def parse_line(line: str) -> tuple: + """ + Parse a line to extract sleep and deep sleep percentages. + + Args: + line (str): A line of text containing sleep data. + + Returns: + tuple: Sleep percentage and deep sleep percentage if matched; otherwise, (None, None). + """ + pattern = re.compile(r"slp:\s*(\d+)%.*?dsl:\s*(\d+)%") match = pattern.search(line) if match: return map(int, match.groups()) return None, None - print_start("Analyzing sleep data") - + print("Analyzing sleep data...") if not lines: - print("No sleep data found") - print_end_failure("Analyzing sleep data") + print_end_failure("No sleep data found") sys.exit(1) sum_sleep, sum_deep_sleep, count = 0, 0, 0 for line in lines: - sleep, deep_sleep = parse_line(line) - if sleep is not None and deep_sleep is not None: - sum_sleep += sleep - sum_deep_sleep += deep_sleep + sleep_val, deep_sleep_val = parse_line(line) + if sleep_val is not None and deep_sleep_val is not None: + sum_sleep += sleep_val + sum_deep_sleep += deep_sleep_val count += 1 if count == 0: @@ -226,48 +126,61 @@ def parse_line(line): print_end_success("Analyzing sleep data") return sum_sleep / count, sum_deep_sleep / count -# -# MARK: - Main -# +def main() -> int: + """ + Main function to parse arguments, initialize tests, and run sleep tests on the device. -FLASH_ERASE_FLAG = args.no_flash_erase -SLEEP_DURATION = int(args.duration) -DEEP_SLEEP_PERCENTAGE = int(args.deep_sleep_percentage) + Returns: + int: Exit status code (0 for success, 1 for failure). + """ + args = parse_arguments() + configure_logging(args.verbose) + # Connect to serial port + com = connect_serial(args.port) -def main(): - ret = 0 - connect_serial() - - if FLASH_ERASE_FLAG: + # Flash erase if enabled + if args.flash_erase: erase_flash() - flash_os() - - reset_buffer() + # Flash the OS binary + flash_os(OS_BIN_FILE_PATH) - data = wait_for_system_to_sleep(SLEEP_DURATION) + # Reset serial buffers + reset_buffer(com) - sleep, deep_sleep = calculate_sleep_deep_statistics(data) + # Wait for system to sleep and collect data + data = wait_for_system_to_sleep(com, args.duration) - print() + # Calculate sleep statistics + sleep_avg, deep_sleep_avg = calculate_sleep_deep_statistics(data) - print(Fore.CYAN + f"Average sleep: {sleep}%" + Style.RESET_ALL) - print(Fore.CYAN + f"Average deep sleep: {deep_sleep}%" + Style.RESET_ALL) + # Display results + print(f"\n{Fore.CYAN}Average sleep: {sleep_avg}%{Style.RESET_ALL}") + print(f"{Fore.CYAN}Average deep sleep: {deep_sleep_avg}%{Style.RESET_ALL}\n") - if deep_sleep >= DEEP_SLEEP_PERCENTAGE: - print(Fore.GREEN + f"Deep sleep is higher than {DEEP_SLEEP_PERCENTAGE}%, this is good! ✅" + Style.RESET_ALL) + # Evaluate deep sleep percentage + if deep_sleep_avg >= args.deep_sleep_percentage: + print( + f"{Fore.GREEN}Deep sleep is higher than {args.deep_sleep_percentage}%, \ + this is good! ✅{Style.RESET_ALL}" + ) ret = 0 else: - print(Fore.RED + f"Deep sleep is lower than {DEEP_SLEEP_PERCENTAGE}%, this is bad! ❌" + Style.RESET_ALL) + print( + f"{Fore.RED}Deep sleep is lower than {args.deep_sleep_percentage}%, \ + this is bad! ❌{Style.RESET_ALL}" + ) ret = 1 - print("Erasing flash after tests...") - erase_flash() + # Flash erase after tests if enabled + if args.flash_erase: + print("Erasing flash after tests...") + erase_flash() return ret -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main())