Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions src/ReactiveUI.Binding/Observables/PropertyChangingObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ internal sealed class Subscription : IDisposable
/// </summary>
private readonly PropertyChangingObservable<T> _parent;

/// <summary>
/// Serializes the initial emit in the constructor with concurrent <see cref="OnPropertyChanging"/>
/// invocations on other threads, so a racing handler emit and the constructor's initial emit do
/// not interleave on the downstream observer.
/// </summary>
private readonly object _gate = new();

/// <summary>
/// The downstream observer. Set to <see langword="null"/> on disposal.
/// </summary>
Expand All @@ -86,10 +93,7 @@ public Subscription(PropertyChangingObservable<T> parent, IObserver<T> observer)
_observer = observer;

parent._source.PropertyChanging += OnPropertyChanging;

// Emit initial (StartWith) value
var initial = parent._getter(parent._source);
observer.OnNext(initial!);
EmitCurrent();
}

/// <inheritdoc/>
Expand Down Expand Up @@ -125,14 +129,28 @@ private void OnPropertyChanging(object? sender, PropertyChangingEventArgs e)
return;
}

var observer = Volatile.Read(ref _observer);
if (observer is null)
EmitCurrent();
}

/// <summary>
/// Reads the current property value under <see cref="_gate"/> and forwards it to the downstream
/// observer. Holding <see cref="_gate"/> across the read-emit pair ensures the constructor's
/// initial emit and any concurrent <see cref="OnPropertyChanging"/> invocation cannot
/// interleave on the downstream observer.
/// </summary>
private void EmitCurrent()
{
lock (_gate)
{
return;
var observer = Volatile.Read(ref _observer);
if (observer is null)
{
return;
}

var value = _parent._getter(_parent._source);
observer.OnNext(value!);
}

var value = _parent._getter(_parent._source);
observer.OnNext(value!);
}
}
}
55 changes: 36 additions & 19 deletions src/ReactiveUI.Binding/Observables/PropertyObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ internal sealed class Subscription : IDisposable
/// </summary>
private readonly EqualityComparer<T> _comparer;

/// <summary>
/// Serializes the initial emit in the constructor with concurrent <see cref="OnPropertyChanged"/>
/// invocations on other threads, so the handler always sees a consistent
/// <see cref="_hasValue"/> / <see cref="_lastValue"/> snapshot regardless of timing.
/// </summary>
private readonly object _gate = new();

/// <summary>
/// The downstream observer. Set to <see langword="null"/> on disposal.
/// </summary>
Expand Down Expand Up @@ -111,12 +118,7 @@ public Subscription(PropertyObservable<T> parent, IObserver<T> observer)
_comparer = EqualityComparer<T>.Default;

parent._source.PropertyChanged += OnPropertyChanged;

// Emit initial (StartWith) value
var initial = parent._getter(parent._source);
_lastValue = initial;
_hasValue = true;
observer.OnNext(initial!);
EmitCurrent();
}

/// <inheritdoc/>
Expand Down Expand Up @@ -152,22 +154,37 @@ private void OnPropertyChanged(object? sender, PropertyChangedEventArgs e)
return;
}

var observer = Volatile.Read(ref _observer);
if (observer is null)
{
return;
}

var value = _parent._getter(_parent._source);
EmitCurrent();
}

if (_parent._distinctUntilChanged && _hasValue && _comparer.Equals(value!, _lastValue!))
/// <summary>
/// Reads the current property value under <see cref="_gate"/> and forwards it to the downstream
/// observer when the distinct-until-changed gate allows. Holding <see cref="_gate"/> across the
/// read-decision-emit sequence ensures the constructor's initial emit and any concurrent
/// <see cref="OnPropertyChanged"/> invocation cannot interleave on the downstream observer or
/// publish a duplicate when both see the same current value.
/// </summary>
private void EmitCurrent()
{
lock (_gate)
{
return;
var observer = Volatile.Read(ref _observer);
if (observer is null)
{
return;
}

var value = _parent._getter(_parent._source);

if (_parent._distinctUntilChanged && _hasValue && _comparer.Equals(value!, _lastValue!))
{
return;
}

_lastValue = value;
_hasValue = true;
observer.OnNext(value!);
}

_lastValue = value;
_hasValue = true;
observer.OnNext(value!);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.ComponentModel;
using System.Globalization;
using ReactiveUI.Binding.Observables;

namespace ReactiveUI.Binding.Tests.Observables;

/// <summary>
/// Stress tests probing the subscribe-vs-initial-emit window in <see cref="PropertyObservable{T}.Subscription"/>.
/// </summary>
public class PropertyObservableSubscribeRaceProbeTests
{
/// <summary>
/// Probes the constructor's subscribe-then-read window for a duplicate emission when a background
/// thread mutates the property between the handler attach and the constructor's state writes.
/// Without the fix, the racing handler runs while <c>_hasValue</c> is still <see langword="false"/>,
/// so its distinct check is skipped; the constructor then re-emits the same value with no dedup.
/// </summary>
/// <returns>A <see cref="Task"/> representing the asynchronous unit test.</returns>
[Test]
public async Task Subscribe_ConcurrentMutationDuringInitialEmit_DoesNotEmitConsecutiveDuplicate()
{
const int iterations = 5_000;
const int mutationsPerIteration = 32;

var duplicateCount = 0;
var maxObservedLength = 0;

for (var i = 0; i < iterations; i++)
{
var vm = new MutableViewModel { Name = "v0" };
using var mutatorReady = new ManualResetEventSlim(false);
using var mutatorDone = new ManualResetEventSlim(false);

var mutator = new Thread(() =>
{
mutatorReady.Set();
for (var j = 1; j <= mutationsPerIteration; j++)
{
vm.Name = "v" + j.ToString(CultureInfo.InvariantCulture);
}

mutatorDone.Set();
})
{ IsBackground = true };
mutator.Start();
mutatorReady.Wait();

var collector = new ThreadSafeList<string?>();
var observable = new PropertyObservable<string?>(vm, nameof(vm.Name), x => ((MutableViewModel)x).Name, distinctUntilChanged: true);
using (observable.Subscribe(new AnonymousObserver<string?>(collector.Add, _ => { }, () => { })))
{
mutatorDone.Wait();
mutator.Join();
}

var observed = collector.Snapshot();
if (observed.Count > maxObservedLength)
{
maxObservedLength = observed.Count;
}

for (var k = 1; k < observed.Count; k++)
{
if (string.Equals(observed[k], observed[k - 1], StringComparison.Ordinal))
{
duplicateCount++;
break;
}
}
}

// The handler-emit + constructor-emit duplicate must never reach the downstream when the user
// asked for distinctUntilChanged: true. Reaching the maxObservedLength > 0 assertion confirms
// the stress actually exercised the pipeline (not all iterations were no-ops).
await Assert.That(duplicateCount).IsEqualTo(0);
await Assert.That(maxObservedLength).IsGreaterThan(0);
}

/// <summary>
/// End-to-end invariant test: after the mutator has finished its writes and a settlement
/// PropertyChanged re-fire has been delivered through a synchronization barrier, the subscriber's
/// last observed value must equal the property's final value. The barrier serves only to defeat
/// the orthogonal INPC event-accessor visibility race (the mutator's PropertyChanged.Invoke can
/// otherwise read a stale snapshot of the delegate that omits the just-attached handler) so the
/// assertion measures the subscription's behaviour rather than that quirk.
/// </summary>
/// <returns>A <see cref="Task"/> representing the asynchronous unit test.</returns>
[Test]
public async Task Subscribe_AfterSettlement_LastObservedValueMatchesProperty()
{
const int iterations = 5_000;
const int mutationsPerIteration = 32;

var mismatchCount = 0;

for (var i = 0; i < iterations; i++)
{
var vm = new MutableViewModel { Name = "v0" };
using var mutatorReady = new ManualResetEventSlim(false);
using var settlementAllowed = new ManualResetEventSlim(false);
using var mutatorDone = new ManualResetEventSlim(false);

var mutator = new Thread(() =>
{
mutatorReady.Set();
for (var j = 1; j <= mutationsPerIteration; j++)
{
vm.Name = "v" + j.ToString(CultureInfo.InvariantCulture);
}

settlementAllowed.Wait();
vm.RaisePropertyChanged();
mutatorDone.Set();
})
{ IsBackground = true };
mutator.Start();
mutatorReady.Wait();

var collector = new ThreadSafeList<string?>();
var observable = new PropertyObservable<string?>(vm, nameof(vm.Name), x => ((MutableViewModel)x).Name, distinctUntilChanged: true);
using (observable.Subscribe(new AnonymousObserver<string?>(collector.Add, _ => { }, () => { })))
{
settlementAllowed.Set();
mutatorDone.Wait();
mutator.Join();
}

var observed = collector.Snapshot();
if (observed.Count == 0 || !string.Equals(observed[^1], vm.Name, StringComparison.Ordinal))
{
mismatchCount++;
}
}

await Assert.That(mismatchCount).IsEqualTo(0);
}

/// <summary>Single-property view model with a hand-rolled INPC setter.</summary>
private sealed class MutableViewModel : INotifyPropertyChanged
{
/// <summary>Backing field for <see cref="Name"/>.</summary>
private string? _name;

/// <inheritdoc/>
public event PropertyChangedEventHandler? PropertyChanged;

/// <summary>Gets or sets the observed property; raises <see cref="PropertyChanged"/> on every set.</summary>
public string? Name
{
get => _name;
set
{
_name = value;
PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Name)));
}
}

/// <summary>Raises <see cref="PropertyChanged"/> for <see cref="Name"/> without changing the field.
/// Used by the settlement test to defeat the INPC event-accessor visibility race.</summary>
public void RaisePropertyChanged() =>
PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Name)));
}

/// <summary>Thread-safe list wrapper used to collect cross-thread emissions.</summary>
/// <typeparam name="T">The element type stored in the list.</typeparam>
private sealed class ThreadSafeList<T>
{
/// <summary>Serializes mutations and snapshot reads.</summary>
private readonly object _gate = new();

/// <summary>The collected items.</summary>
private readonly List<T> _items = [];

/// <summary>Appends an item to the list under the internal gate.</summary>
/// <param name="value">The item to append.</param>
public void Add(T value)
{
lock (_gate)
{
_items.Add(value);
}
}

/// <summary>Returns a fresh copy of the current items.</summary>
/// <returns>A new list containing the items observed so far.</returns>
public List<T> Snapshot()
{
lock (_gate)
{
return [.. _items];
}
}
}

/// <summary>Anonymous observer that forwards each callback to the supplied delegates.</summary>
/// <typeparam name="T">The element type observed.</typeparam>
/// <param name="onNext">Action invoked for each <c>OnNext</c> value.</param>
/// <param name="onError">Action invoked when an error propagates.</param>
/// <param name="onCompleted">Action invoked on completion.</param>
private sealed class AnonymousObserver<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted) : IObserver<T>
{
/// <inheritdoc/>
public void OnNext(T value) => onNext(value);

/// <inheritdoc/>
public void OnError(Exception error) => onError(error);

/// <inheritdoc/>
public void OnCompleted() => onCompleted();
}
}
Loading