Coalesce function unsubscribes only after receiving 3 samples #1374
simonvoelcker wants to merge 3 commits into frequenz-floss:v1.x.x
    """
    self.num_samples += 1

    if self.num_samples >= 3:
I would make this number a constant, like MINIMUM_STABLE_SAMPLES (there is probably a better name).
I did not extract it because this is the kind of very specific thing for which there is no obvious name, and adding one for the sake of it often makes the code more confusing. If you have a good name and think it would be helpful, please share.
I think it is worth extracting even if the name is not great, to give it more visibility, as magic numbers buried deep in the code might be easy to miss otherwise. I would ask an AI for a name to see if it can do better than us...
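A minimal sketch of the extraction discussed here, using the suggested name MINIMUM_STABLE_SAMPLES. The StabilityCounter class and its record_sample method are hypothetical and only illustrate how a named, documented constant could replace the literal 3; they are not the code in this PR.

```python
MINIMUM_STABLE_SAMPLES = 3
"""Samples with data from an earlier stream required before unsubscribing."""


class StabilityCounter:
    """Hypothetical helper that counts samples up to the threshold."""

    def __init__(self) -> None:
        self.num_samples = 0

    def record_sample(self) -> bool:
        """Count one sample and report whether the threshold is reached."""
        self.num_samples += 1
        return self.num_samples >= MINIMUM_STABLE_SAMPLES
```

With the constant at module level, the threshold shows up in the generated documentation and can be found with a plain text search, which is the visibility argument made above.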
    )


    class TestCoalesceFunction:
Since you split out some tests into their own file, I wonder what the relationship between formulas and functions is. Wouldn't it also make sense to move the tests for the coalesce function to their own file, or to make these tests part of the existing TestFormulas? Sorry if the question is silly, I'm not familiar enough with this code to assess it (I didn't follow all the test logic).
I split out the validation tests because (a) the file went over the 1000-line limit and (b) the validation tests were the most obvious candidate to move to their own file. What is left is indeed a bit of a mix of tests for formula evaluation (which includes functions used in formulas) and tests for the subscribe/unsubscribe behavior of the coalesce function specifically. It would probably make sense to have test_formula_evaluation.py for the former, but then, what is the other one? test_formulas.py is the lowest common abstraction, IMO.
I think we can keep the name for general tests without a better grouping. For example
I also just noticed these tests are under a _formulas directory, so we can either keep test_formula.py or use something like test_general.py. I would also remove the prefix from the other files, e.g. _formulas/test_evaluation.py and _formulas/test_validation.py, again to keep the resulting test names as short as possible (and not redundant).
    values: list[float | None]
    expected_subscriptions: list[bool]

    async def run_test(  # pylint: disable=too-many-locals
I guess you tried to keep the current file style, so I'm OK if you prefer to keep it as it is, but I'd recommend using @pytest.mark.parametrize() for new tests.
Example conversion done by AI (with some extra test case wrapping)
    class CoalesceSample(NamedTuple):
        """Helper class to represent expected behavior of coalesce function."""

        values: list[float | None]
        expected_subscriptions: list[bool]


    class CoalesceFunctionTestCase(NamedTuple):
        """A full coalesce function test case."""

        name: str
        formula_str: str
        samples: list[CoalesceSample]


    class TestCoalesceFunction:
        """Test coalesce function subscribe/unsubscribe behavior."""

        @pytest.mark.parametrize(
            "test_case",
            [
                CoalesceFunctionTestCase(
                    name="subscribe_when_none_encountered",
                    formula_str="COALESCE(#0, #1, #2, 0.0)",
                    samples=[
                        CoalesceSample(
                            values=[10.0, None, None],
                            expected_subscriptions=[True, False, False],
                        ),
                        # No need to subscribe unless stream #1 gives None
                        CoalesceSample(
                            values=[10.0, 12.0, 15.0],
                            expected_subscriptions=[True, False, False],
                        ),
                        # If None is encountered, one subscription is added per sample
                        CoalesceSample(
                            values=[None, None, 15.0],
                            expected_subscriptions=[True, True, False],
                        ),
                        CoalesceSample(
                            values=[None, None, 15.0],
                            expected_subscriptions=[True, True, True],
                        ),
                    ],
                ),
                CoalesceFunctionTestCase(
                    name="unsubscribe_only_after_3_samples",
                    formula_str="COALESCE(#0, #1, #2, 0.0)",
                    samples=[
                        # First subscription is added before the first sample.
                        # Every sample can add one subscription.
                        CoalesceSample(
                            values=[None, None, 15.0],
                            expected_subscriptions=[True, True, False],
                        ),
                        CoalesceSample(
                            values=[None, None, 15.0],
                            expected_subscriptions=[True, True, True],
                        ),
                        # After 3 samples, the last subscription is dropped.
                        CoalesceSample(
                            values=[None, 12.0, 15.0],
                            expected_subscriptions=[True, True, True],
                        ),
                        CoalesceSample(
                            values=[10.0, None, 15.0],
                            expected_subscriptions=[True, True, True],
                        ),
                        CoalesceSample(
                            values=[None, 12.0, 15.0],
                            expected_subscriptions=[True, True, False],
                        ),
                    ],
                ),
            ],
            ids=lambda tc: tc.name,
        )
        async def test_coalesce_subscriptions(
            self,
            test_case: CoalesceFunctionTestCase,
        ) -> None:
            """Test coalesce subscribe/unsubscribe behavior."""
            # Component IDs are 0, 1, 2 for convenience.
            channels: list[Broadcast[Sample[Quantity]]] = [
                Broadcast(name=str(num)) for num in range(3)
            ]
            senders = [channel.new_sender() for channel in channels]
            receivers: list[Receiver[Sample[Quantity]] | None] = [None, None, None]

            def new_receiver(component_id: ComponentId) -> Receiver[Sample[Quantity]]:
                """Create a new receiver, overwriting any existing one.

                When Coalesce unsubscribes, it closes its receiver.
                """
                comp_id = int(component_id)
                receiver = channels[comp_id].new_receiver()
                receivers[comp_id] = receiver
                return receiver

            telem_fetcher = MagicMock(spec=ResampledStreamFetcher)
            telem_fetcher.fetch_stream = AsyncMock(side_effect=new_receiver)
            formula = parse(
                name="f2",
                formula=test_case.formula_str,
                create_method=Quantity,
                telemetry_fetcher=telem_fetcher,
            )
            result_chan = formula.new_receiver()
            await asyncio.sleep(0.1)
            now = datetime.now()

            async def send_sample(values: list[float | None]) -> None:
                nonlocal now
                now += timedelta(seconds=1)
                _ = await asyncio.gather(
                    *[
                        senders[comp_id].send(
                            Sample(
                                now,
                                None if value is None else Quantity(value),
                            )
                        )
                        for comp_id, value in enumerate(values)
                    ]
                )
                _ = await result_chan.receive()

            for index, sample in enumerate(test_case.samples):
                await send_sample(sample.values)
                active_subscriptions = [
                    receiver is not None and not getattr(receiver, "_closed", True)
                    for receiver in receivers
                ]
                assert active_subscriptions == sample.expected_subscriptions, (
                    f"{test_case.name}, step {index}: values={sample.values}"
                )
            await formula.stop()
I'm a big fan of parametrize, but I would not use it in this case, personally. It is possible only because the setup happens to be identical for subscribe and unsubscribe, but if that ever changes we're better off with two test cases. I also find it difficult to debug non-trivial parametrized tests (which this one would be).
But you are sharing the code now, so if it changes you'll need to rewrite run_test too. It is just a change in form.
But as I said, I'm good with keeping it as it is for the other reasons too.
What makes me curious now is what you found hard to debug when using parametrization, as I never felt parametrization interfered with debugging.
I briefly forgot that it is possible to run the individual test cases in a parametrized test - without that ability, breakpoints trigger for each case before the one of interest is even reached. So yes, it is not difficult per se, but it adds a layer of indirection.
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
The statements removed here would remove component data subscriptions for components that come after a constant in Coalesce's parameter list. However, such subscriptions are never created. We only add subscriptions, one at a time, until data is available, which is always the case when we reach a constant in the parameter list.

Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Sorry, I'll take a look in the morning.
Addresses #1332.
Background
The Coalesce function subscribes to and unsubscribes from components based on data availability. If it receives None values, it subscribes to the next component, if any, in its parameter list. Once the earlier components start sending data again, we may unsubscribe from "later" components. We currently unsubscribe from all components "after" a given one as soon as that one sends a single non-None sample. This is undesirable because we may receive intermittent None values for various reasons and would then unsubscribe and resubscribe excessively.
Change
The solution is to count non-None samples from earlier streams and unsubscribe only once a threshold (3) is reached.
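The behavior described above can be sketched as a small toy model. This is not the SDK implementation: the class CoalesceSubscriptionModel, its attributes, and the unsubscribe_after parameter are hypothetical, and resetting the counter when no subscribed stream has data is an assumption of this sketch. The model only tracks how many leading parameters are subscribed.

```python
from __future__ import annotations


class CoalesceSubscriptionModel:
    """Toy model of which COALESCE parameters are subscribed (hypothetical)."""

    def __init__(self, num_streams: int, unsubscribe_after: int = 3) -> None:
        self.num_streams = num_streams
        self.unsubscribe_after = unsubscribe_after
        self.num_subscribed = 1  # the first stream is subscribed up front
        self.num_samples = 0  # samples in which an earlier stream had data

    def on_sample(self, values: list[float | None]) -> list[bool]:
        """Process one value per stream and return the subscription flags."""
        visible = values[: self.num_subscribed]
        first_valid = next(
            (i for i, value in enumerate(visible) if value is not None), None
        )
        if first_valid is None:
            # No subscribed stream has data: add at most one subscription.
            self.num_samples = 0  # assumption: a full gap resets the count
            self.num_subscribed = min(self.num_subscribed + 1, self.num_streams)
        elif first_valid < self.num_subscribed - 1:
            # An earlier stream has data, so later subscriptions may be unneeded.
            self.num_samples += 1
            if self.num_samples >= self.unsubscribe_after:
                self.num_subscribed = first_valid + 1
                self.num_samples = 0
        return [i < self.num_subscribed for i in range(self.num_streams)]


model = CoalesceSubscriptionModel(num_streams=3)
for values in (
    [None, None, 15.0],
    [None, None, 15.0],
    [None, 12.0, 15.0],
    [10.0, None, 15.0],
    [None, 12.0, 15.0],
):
    print(values, model.on_sample(values))
```

Fed with the sample sequence from the unsubscribe_only_after_3_samples test case quoted in the review, the model keeps all three subscriptions until the third sample with data from an earlier stream, and only then drops the last one.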
Discussion
If there are only two components (main, backup), it is quite straightforward. With more components, although that is probably rarely seen in practice, one could come up with a more sophisticated solution that tracks each component's samples and marks a component as stable once it has delivered 3 samples. Happy to discuss, but I think it's overkill.
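For comparison, the more sophisticated per-component alternative could look roughly like the sketch below. It is not part of this PR: the class PerStreamStability and its update method are hypothetical, and it assumes that "3 samples" means 3 consecutive non-None samples from the same stream.

```python
from __future__ import annotations


class PerStreamStability:
    """Hypothetical tracker of consecutive non-None samples per stream."""

    def __init__(self, num_streams: int, stable_after: int = 3) -> None:
        self._streaks = [0] * num_streams
        self._stable_after = stable_after

    def update(self, values: list[float | None]) -> int | None:
        """Record one value per stream.

        Returns the index of the first stable stream, or None if no stream
        is stable yet. Everything after that index could be unsubscribed.
        """
        for index, value in enumerate(values):
            # A None value breaks the streak, any other value extends it.
            self._streaks[index] = 0 if value is None else self._streaks[index] + 1
        for index, streak in enumerate(self._streaks):
            if streak >= self._stable_after:
                return index
        return None
```

Under this assumption, a stream that alternates between values and None never becomes stable, so later subscriptions would be kept longer than with the single shared counter, at the cost of one extra counter per component.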