From 983aafa76ffde479186c6c11a12306a3b982939b Mon Sep 17 00:00:00 2001 From: Scott K Logan Date: Thu, 18 Jun 2026 16:52:00 -0500 Subject: [PATCH] Add a whole bunch of tests to improve coverage I'm planning some large-scale rewrites in this package and I'd like a strong foundation of tests to have confidence that the rewrites aren't regression existing behavior. Assisted-by: Gemini 3.5 Flash --- test/spell_check.words | 4 + test/test_event_executor.py | 9 + test/test_event_handler_parallel_status.py | 35 ++++ test/test_executor_parallel.py | 191 ++++++++++++++++++++- 4 files changed, 237 insertions(+), 2 deletions(-) create mode 100644 test/test_event_executor.py create mode 100644 test/test_event_handler_parallel_status.py diff --git a/test/spell_check.words b/test/spell_check.words index dced634..e1048c6 100644 --- a/test/spell_check.words +++ b/test/spell_check.words @@ -1,5 +1,7 @@ apache +argparse asyncio +capsys colcon contextlib coroutine @@ -16,9 +18,11 @@ pathlib plugin pydocstyle pytest +readouterr returncode scspell setuptools sigint thomas traceback +unittest diff --git a/test/test_event_executor.py b/test/test_event_executor.py new file mode 100644 index 0000000..dfc84ff --- /dev/null +++ b/test/test_event_executor.py @@ -0,0 +1,9 @@ +# Copyright 2026 Open Source Robotics Foundation, Inc. +# Licensed under the Apache License, Version 2.0 + +from colcon_parallel_executor.event.executor import ParallelStatus + + +def test_parallel_status(): + status = ParallelStatus(['job1', 'job2']) + assert status.processing == ('job1', 'job2') diff --git a/test/test_event_handler_parallel_status.py b/test/test_event_handler_parallel_status.py new file mode 100644 index 0000000..42fb19f --- /dev/null +++ b/test/test_event_handler_parallel_status.py @@ -0,0 +1,35 @@ +# Copyright 2026 Open Source Robotics Foundation, Inc. +# Licensed under the Apache License, Version 2.0 + +from unittest.mock import patch + +from colcon_parallel_executor.event.executor import ParallelStatus +from colcon_parallel_executor.event_handler.parallel_status \ + import ParallelStatusEventHandler +import pytest + + +@pytest.mark.parametrize(('isatty'), ( + (True,), + (False,), +)) +def test_parallel_status_event_handler(capsys, isatty): + with patch('sys.stdout.isatty', return_value=isatty): + handler = ParallelStatusEventHandler() + assert handler.enabled == isatty + + event = (ParallelStatus(['job2', 'job1']), ) + handler(event) + + if not isatty: + return + + captured = capsys.readouterr() + assert 'Processing' in captured.out + assert 'job1' in captured.out + assert 'job2' in captured.out + + # Test with non-matching event + handler(('string event',)) + captured = capsys.readouterr() + assert captured.out == '' diff --git a/test/test_executor_parallel.py b/test/test_executor_parallel.py index 6680767..47093ae 100644 --- a/test/test_executor_parallel.py +++ b/test/test_executor_parallel.py @@ -1,6 +1,7 @@ # Copyright 2016-2018 Dirk Thomas # Licensed under the Apache License, Version 2.0 +import argparse import asyncio from collections import OrderedDict import os @@ -9,10 +10,13 @@ from threading import Thread import time from types import SimpleNamespace +from unittest.mock import Mock +from unittest.mock import patch from colcon_core.executor import Job from colcon_core.executor import OnError from colcon_core.subprocess import SIGINT_RESULT +from colcon_parallel_executor.executor.parallel import counting_number from colcon_parallel_executor.executor.parallel \ import ParallelExecutorExtension import pytest @@ -22,9 +26,9 @@ class Job1(Job): - def __init__(self): + def __init__(self, identifier='job1'): super().__init__( - identifier='job1', dependencies=set(), task=None, + identifier=identifier, dependencies=set(), task=None, task_context=None) async def __call__(self, *args, **kwargs): @@ -213,3 +217,186 @@ def delayed_sigint(): assert rc == signal.SIGINT ran_jobs.clear() + + +def test_counting_number(): + assert counting_number('0') == 0 + assert counting_number(1) == 1 + with pytest.raises(ValueError): + counting_number('-1') + + +def test_add_arguments(): + parser = argparse.ArgumentParser() + extension = ParallelExecutorExtension() + extension.add_arguments(parser=parser) + args = parser.parse_args(['--parallel-workers', '5']) + assert args.parallel_workers == 5 + + +def test_parallel_run_until_complete_exception(): + extension = ParallelExecutorExtension() + args = SimpleNamespace(parallel_workers=2) + jobs = OrderedDict() + jobs['one'] = Job1() + + def mock_run(future): + future.get_coro().close() + raise RuntimeError('mock error') + + with patch( + 'asyncio.base_events.BaseEventLoop.run_until_complete', + side_effect=mock_run + ): + rc = extension.execute(args, jobs) + assert rc == 1 + + +def test_parallel_timeout(): + extension = ParallelExecutorExtension() + extension.put_event_into_queue = Mock() + + args = SimpleNamespace(parallel_workers=2) + jobs = OrderedDict() + jobs['eight'] = Job8() # sleep 3 + + original_wait = asyncio.wait + + def mock_wait(*args, **kwargs): + kwargs['timeout'] = 0.01 + return original_wait(*args, **kwargs) + + with patch( + 'colcon_parallel_executor.executor.parallel.asyncio.wait', + new=mock_wait + ): + rc = extension.execute(args, jobs) + + assert rc == 0 + assert extension.put_event_into_queue.called + ran_jobs.clear() + + +class Job9(Job): + + def __init__(self): + super().__init__( + identifier='job9', dependencies=set(), task=None, + task_context=None) + + async def __call__(self, *args, **kwargs): + raise KeyboardInterrupt() + + +def test_parallel_job_keyboard_interrupt(): + extension = ParallelExecutorExtension() + args = SimpleNamespace(parallel_workers=2) + jobs = OrderedDict() + jobs['nine'] = Job9() + rc = extension.execute(args, jobs) + assert rc == signal.SIGINT + + +class Job10(Job): + + def __init__(self): + super().__init__( + identifier='job10', dependencies=set(), task=None, + task_context=None) + + async def __call__(self, *args, **kwargs): + return SIGINT_RESULT + + +def test_parallel_job_sigint_result(): + extension = ParallelExecutorExtension() + args = SimpleNamespace(parallel_workers=2) + jobs = OrderedDict() + jobs['ten'] = Job10() + rc = extension.execute(args, jobs) + assert rc == signal.SIGINT + + +def test_parallel_future_keyboard_interrupt(): + extension = ParallelExecutorExtension() + args = SimpleNamespace(parallel_workers=2) + jobs = OrderedDict() + jobs['eleven'] = Job1() + + original_wait = asyncio.wait + + async def mock_wait(*args, **kwargs): + done, pending = await original_wait(*args, **kwargs) + for f in done: + f.exception = Mock(return_value=KeyboardInterrupt()) + return done, pending + + with patch( + 'colcon_parallel_executor.executor.parallel.asyncio.wait', + new=mock_wait + ): + rc = extension.execute(args, jobs) + + assert rc == signal.SIGINT + + +def test_parallel_future_cancelled(): + extension = ParallelExecutorExtension() + args = SimpleNamespace(parallel_workers=2) + jobs = OrderedDict() + jobs['twelve'] = Job1() + + original_wait = asyncio.wait + + async def mock_wait(*args, **kwargs): + done, pending = await original_wait(*args, **kwargs) + for f in done: + f.cancelled = Mock(return_value=True) + return done, pending + + with patch( + 'colcon_parallel_executor.executor.parallel.asyncio.wait', + new=mock_wait + ): + rc = extension.execute(args, jobs) + + assert rc == signal.SIGINT + + +def test_priority(): + extension = ParallelExecutorExtension() + assert extension.PRIORITY > 100 + + +class NonCoroutineJob(Job): + + def __init__(self): + super().__init__( + identifier='non_coro', dependencies=set(), task=None, + task_context=None) + + def __call__(self, *args, **kwargs): + pass + + +def test_job_not_coroutine(): + extension = ParallelExecutorExtension() + args = SimpleNamespace(parallel_workers=2) + jobs = OrderedDict() + jobs['one'] = NonCoroutineJob() + + rc = extension.execute(args, jobs) + assert rc == 1 + + +def test_parallel_workers_zero(): + extension = ParallelExecutorExtension() + args = SimpleNamespace(parallel_workers=0) + jobs = OrderedDict() + jobs['one'] = Job1('job1') + jobs['two'] = Job1('job2') + + rc = extension.execute(args, jobs) + assert rc == 0 + assert set(ran_jobs) == {'job1', 'job2'} + ran_jobs.clear()