
Coalesce function unsubscribes only after receiving 3 samples #1374

Open

simonvoelcker wants to merge 3 commits into frequenz-floss:v1.x.x from simonvoelcker:coalesce_cautious_unsubscribe

Conversation

@simonvoelcker
Contributor

@simonvoelcker simonvoelcker commented Mar 13, 2026

Addresses #1332.

Background

The Coalesce function subscribes to and unsubscribes from components based on data availability. When it receives None values, it subscribes to the next component, if any, in its parameter list. Once the earlier components start sending data again, we may unsubscribe from the later ones. Currently we unsubscribe from all components after a given one as soon as that one sends a single non-None sample. This is undesirable because we may receive intermittent None values for various reasons, and would then unsubscribe and resubscribe excessively.

Change

The solution is to count non-None samples from earlier streams and unsubscribe only once a threshold (3) is reached.

Discussion

If there are only two components (main, backup), it is quite straightforward. With more components, although probably rarely seen in practice, one could come up with a more sophisticated solution that tracks each component's samples and marks a component as stable once 3 samples are reached. Happy to discuss, but I think it's overkill.

@github-actions github-actions bot added part:tests Affects the unit, integration and performance (benchmarks) tests part:data-pipeline Affects the data pipeline labels Mar 13, 2026
@simonvoelcker simonvoelcker force-pushed the coalesce_cautious_unsubscribe branch 2 times, most recently from dae4a2a to c69f4a4 Compare March 13, 2026 14:24
@github-actions github-actions bot added the part:docs Affects the documentation label Mar 13, 2026
@simonvoelcker simonvoelcker force-pushed the coalesce_cautious_unsubscribe branch 2 times, most recently from 89eb72f to 6a483f0 Compare March 13, 2026 14:32
@simonvoelcker simonvoelcker marked this pull request as ready for review March 13, 2026 14:52
@simonvoelcker simonvoelcker requested a review from a team as a code owner March 13, 2026 14:52
@simonvoelcker simonvoelcker requested review from shsms and removed request for a team March 13, 2026 14:52
@simonvoelcker simonvoelcker self-assigned this Mar 16, 2026
@simonvoelcker simonvoelcker enabled auto-merge March 17, 2026 10:00
Contributor

@llucax llucax left a comment

LGTM in general; all comments are minor enough to be optional for me, but I would still leave the final approval to @shsms as I'm not very familiar with the formulas code.

"""
self.num_samples += 1

if self.num_samples >= 3:
Contributor

I would make this number a constant, like MINIMUM_STABLE_SAMPLES (there is probably a better name).

Contributor Author

I did not extract it because this is the kind of very specific thing for which there is no obvious name, and adding one for the sake of it often makes things more confusing. If you have a good name and think it would help, please share.

Contributor

I think it is worth extracting even if the name is not great to give it more visibility, as magic numbers buried deep in the code might be easy to miss otherwise. I would ask AI for a name to see if they can do better than us...
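The extraction being suggested might look like this; MINIMUM_STABLE_SAMPLES is the name floated in the comment above, and the surrounding helper is purely illustrative:

```python
MINIMUM_STABLE_SAMPLES = 3
"""Non-None samples required from an earlier stream before dropping later ones."""


def should_unsubscribe(num_samples: int) -> bool:
    """Illustrative helper using the named constant instead of a magic 3."""
    return num_samples >= MINIMUM_STABLE_SAMPLES
```

A module-level constant with a docstring makes the threshold visible in API docs and greppable, which is the visibility argument made above.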

)


class TestCoalesceFunction:
Contributor

Since you split out some tests into their own file, I wonder what the relationship between formulas and functions is. Wouldn't it also make sense to separate the tests for the coalesce function into their own file, or to make these tests part of the existing TestFormulas? Sorry if the question is silly; I'm not familiar enough with this code (I didn't follow all the test logic) to assess it.

Contributor Author

@simonvoelcker simonvoelcker Mar 17, 2026

I split out the validation tests because (a) the file went over the 1000-line limit and (b) the validation tests were the most obvious candidate to move to their own file. What is left is indeed a bit of a mix of tests for formula evaluation (including functions used in formulas) and tests for the subscribe/unsubscribe behavior of the coalesce function specifically. It would probably make sense to have test_formula_evaluation.py for the former, but then, what is the other one? test_formulas.py is the lowest common abstraction, IMO.

Contributor

I think we can keep the name for general tests without a better grouping.

I also just noticed these tests are under a _formulas directory, so we could either keep test_formula.py or use something like test_general.py, and I would remove the prefix from the other files, like _formulas/test_evaluation.py and _formulas/test_validation.py, again to keep the resulting test names as short as possible (and not redundant).

values: list[float | None]
expected_subscriptions: list[bool]

async def run_test( # pylint: disable=too-many-locals
Contributor

I guess you tried to keep the current file style, so I'm OK if you prefer to keep it as it is, but I'd recommend using @pytest.mark.parametrize() for new tests.

Example conversion done by AI (with some extra test case wrapping)
class CoalesceSample(NamedTuple):
    """Helper class to represent expected behavior of coalesce function."""

    values: list[float | None]
    expected_subscriptions: list[bool]


class CoalesceFunctionTestCase(NamedTuple):
    """A full coalesce function test case."""

    name: str
    formula_str: str
    samples: list[CoalesceSample]


class TestCoalesceFunction:
    """Test coalesce function subscribe/unsubscribe behavior."""

    @pytest.mark.parametrize(
        "test_case",
        [
            CoalesceFunctionTestCase(
                name="subscribe_when_none_encountered",
                formula_str="COALESCE(#0, #1, #2, 0.0)",
                samples=[
                    CoalesceSample(
                        values=[10.0, None, None],
                        expected_subscriptions=[True, False, False],
                    ),
                    # No need to subscribe unless stream #1 gives None
                    CoalesceSample(
                        values=[10.0, 12.0, 15.0],
                        expected_subscriptions=[True, False, False],
                    ),
                    # If None is encountered, one subscription is added per sample
                    CoalesceSample(
                        values=[None, None, 15.0],
                        expected_subscriptions=[True, True, False],
                    ),
                    CoalesceSample(
                        values=[None, None, 15.0],
                        expected_subscriptions=[True, True, True],
                    ),
                ],
            ),
            CoalesceFunctionTestCase(
                name="unsubscribe_only_after_3_samples",
                formula_str="COALESCE(#0, #1, #2, 0.0)",
                samples=[
                    # First subscription is added before the first sample.
                    # Every sample can add one subscription.
                    CoalesceSample(
                        values=[None, None, 15.0],
                        expected_subscriptions=[True, True, False],
                    ),
                    CoalesceSample(
                        values=[None, None, 15.0],
                        expected_subscriptions=[True, True, True],
                    ),
                    # After 3 samples, the last subscription is dropped.
                    CoalesceSample(
                        values=[None, 12.0, 15.0],
                        expected_subscriptions=[True, True, True],
                    ),
                    CoalesceSample(
                        values=[10.0, None, 15.0],
                        expected_subscriptions=[True, True, True],
                    ),
                    CoalesceSample(
                        values=[None, 12.0, 15.0],
                        expected_subscriptions=[True, True, False],
                    ),
                ],
            ),
        ],
        ids=lambda tc: tc.name,
    )
    async def test_coalesce_subscriptions(
        self,
        test_case: CoalesceFunctionTestCase,
    ) -> None:
        """Test coalesce subscribe/unsubscribe behavior."""
        # Component IDs are 0, 1, 2 for convenience.
        channels: list[Broadcast[Sample[Quantity]]] = [
            Broadcast(name=str(num)) for num in range(3)
        ]
        senders = [channel.new_sender() for channel in channels]
        receivers: list[Receiver[Sample[Quantity]] | None] = [None, None, None]

        def new_receiver(component_id: ComponentId) -> Receiver[Sample[Quantity]]:
            """Create a new receiver, overwriting any existing one.

            When Coalesce unsubscribes, it closes its receiver.
            """
            comp_id = int(component_id)
            receiver = channels[comp_id].new_receiver()
            receivers[comp_id] = receiver
            return receiver

        telem_fetcher = MagicMock(spec=ResampledStreamFetcher)
        telem_fetcher.fetch_stream = AsyncMock(side_effect=new_receiver)
        formula = parse(
            name="f2",
            formula=test_case.formula_str,
            create_method=Quantity,
            telemetry_fetcher=telem_fetcher,
        )

        result_chan = formula.new_receiver()
        await asyncio.sleep(0.1)
        now = datetime.now()

        async def send_sample(values: list[float | None]) -> None:
            nonlocal now
            now += timedelta(seconds=1)
            _ = await asyncio.gather(
                *[
                    senders[comp_id].send(
                        Sample(
                            now,
                            None if value is None else Quantity(value),
                        )
                    )
                    for comp_id, value in enumerate(values)
                ]
            )
            _ = await result_chan.receive()

        for index, sample in enumerate(test_case.samples):
            await send_sample(sample.values)
            active_subscriptions = [
                receiver is not None and not getattr(receiver, "_closed", True)
                for receiver in receivers
            ]
            assert active_subscriptions == sample.expected_subscriptions, (
                f"{test_case.name}, step {index}: values={sample.values}"
            )

        await formula.stop()

Contributor Author

I'm a big fan of parametrize, but I would not use it in this case, personally. It is possible only because the setup happens to be identical for subscribe and unsubscribe, but if that ever changes we're better off with two test cases. I also find it difficult to debug non-trivial parametrized tests (which this one would be).

Contributor

But you are sharing the code now, so if it changes you'll need to rewrite run_test too. It is just a change in form.

But as I said, I'm good with keeping it as it is for the other reasons too.

What makes me curious now is what did you find hard to debug when using parametrization, as I never felt parametrization interfered with debugging.

Contributor Author

I briefly forgot that it is possible to run individual cases of a parametrized test. Without that ability, breakpoints would trigger for each case before the one of interest is even reached. So yes, it is not difficult per se, but it adds a layer of indirection.

@simonvoelcker simonvoelcker force-pushed the coalesce_cautious_unsubscribe branch from 6a483f0 to 3c5bb91 Compare March 17, 2026 12:48
@simonvoelcker simonvoelcker added the cmd:skip-release-notes It is not necessary to update release notes for this PR label Mar 17, 2026
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
The statements removed here would remove component data subscriptions for components that come after a constant in Coalesce's parameter list. However, such subscriptions are never created. We only add subscriptions, one at a time, until data is available, which is always the case when we reach a constant in the parameter list.

Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
@simonvoelcker simonvoelcker force-pushed the coalesce_cautious_unsubscribe branch from 3c5bb91 to 769be61 Compare March 17, 2026 13:03
@shsms
Contributor

shsms commented Mar 17, 2026

Sorry, I'll take a look in the morning.
