Skip to content

Commit 8f89eaf

Browse files
authored
Merge pull request #3 from Shuttle/v20
V20
2 parents 640d76a + 51e6fbb commit 8f89eaf

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+589
-655
lines changed

README.md

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,18 @@ Provides various classes and interfaces to facilitate thread-based processing.
99
## ProcessorThreadPool
1010

1111
``` c#
12-
public ProcessorThreadPool(string name, int threadCount, IProcessorFactory processorFactory, ProcessorThreadOptions processorThreadOptions);
12+
public ProcessorThreadPool(
13+
string name,
14+
int threadCount,
15+
IServiceScopeFactory serviceScopeFactory,
16+
IProcessorFactory processorFactory,
17+
ProcessorThreadOptions processorThreadOptions
18+
);
1319
```
1420

15-
Each thread pool has a `name` used only for identyfing the pool. The `threadCount` determines the number of `ProcessorThread` instances in the pool. Each `ProcessorThread` calls the `IProcessor.Execute(CancellationToken)` method, or `IProcessor.ExecuteAsync(CancellationToken)` method if started asynchronously, on the instance provided by the `IProcessorFactory.Create()` method in a loop while the `CancellationToken.IsCancellationRequested` returns `false`.
21+
Each thread pool has a `name` used only for identyfing the pool. The `threadCount` determines the number of `ProcessorThread` instances in the pool. Each `ProcessorThread` calls the `IProcessor.ExecuteAsync(CancellationToken)` method on the instance provided by the `IProcessorFactory.Create()` method in a loop while the `CancellationToken.IsCancellationRequested` returns `false`.
22+
23+
Every call to `IProcessor.ExecuteAsync(ProcessorThreadContext, CancellationToken)` is wrapped in a `ProcessorThreadContext` instance that provides the `State` along with the `IServiceScope` instance created by the `IServiceScopeFactory.CreateScope()` method.
1624

1725
## ProcessorThreadOptions
1826

Shuttle.Core.Threading.Tests/AmbientContextFixture.cs

Lines changed: 54 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -2,66 +2,65 @@
22
using System.Threading.Tasks;
33
using NUnit.Framework;
44

5-
namespace Shuttle.Core.Threading.Tests
5+
namespace Shuttle.Core.Threading.Tests;
6+
7+
[TestFixture]
8+
public class AmbientContextFixture
69
{
7-
[TestFixture]
8-
public class AmbientContextFixture
10+
[Test]
11+
public void Should_be_able_to_flow_data()
912
{
10-
[Test]
11-
public void Should_be_able_to_flow_data()
12-
{
13-
var d1 = new object();
14-
var t1 = default(object);
15-
var t10 = default(object);
16-
var t11 = default(object);
17-
var t12 = default(object);
18-
var t13 = default(object);
19-
var d2 = new object();
20-
var t2 = default(object);
21-
var t20 = default(object);
22-
var t21 = default(object);
23-
var t22 = default(object);
24-
var t23 = default(object);
13+
var d1 = new object();
14+
var t1 = default(object);
15+
var t10 = default(object);
16+
var t11 = default(object);
17+
var t12 = default(object);
18+
var t13 = default(object);
19+
var d2 = new object();
20+
var t2 = default(object);
21+
var t20 = default(object);
22+
var t21 = default(object);
23+
var t22 = default(object);
24+
var t23 = default(object);
2525

26-
Task.WaitAll(
27-
Task.Run(() =>
28-
{
29-
AmbientContext.SetData("d1", d1);
30-
new Thread(() => t10 = AmbientContext.GetData("d1")).Start();
31-
Task.WaitAll(
32-
Task.Run(() => t1 = AmbientContext.GetData("d1"))
33-
.ContinueWith(t => Task.Run(() => t11 = AmbientContext.GetData("d1"))),
34-
Task.Run(() => t12 = AmbientContext.GetData("d1")),
35-
Task.Run(() => t13 = AmbientContext.GetData("d1"))
36-
);
37-
}),
38-
Task.Run(() =>
39-
{
40-
AmbientContext.SetData("d2", d2);
41-
new Thread(() => t20 = AmbientContext.GetData("d2")).Start();
42-
Task.WaitAll(
43-
Task.Run(() => t2 = AmbientContext.GetData("d2"))
44-
.ContinueWith(t => Task.Run(() => t21 = AmbientContext.GetData("d2"))),
45-
Task.Run(() => t22 = AmbientContext.GetData("d2")),
46-
Task.Run(() => t23 = AmbientContext.GetData("d2"))
47-
);
48-
})
49-
);
26+
Task.WaitAll(
27+
Task.Run(() =>
28+
{
29+
AmbientContext.SetData("d1", d1);
30+
new Thread(() => t10 = AmbientContext.GetData("d1")).Start();
31+
Task.WaitAll(
32+
Task.Run(() => t1 = AmbientContext.GetData("d1"))
33+
.ContinueWith(t => Task.Run(() => t11 = AmbientContext.GetData("d1"))),
34+
Task.Run(() => t12 = AmbientContext.GetData("d1")),
35+
Task.Run(() => t13 = AmbientContext.GetData("d1"))
36+
);
37+
}),
38+
Task.Run(() =>
39+
{
40+
AmbientContext.SetData("d2", d2);
41+
new Thread(() => t20 = AmbientContext.GetData("d2")).Start();
42+
Task.WaitAll(
43+
Task.Run(() => t2 = AmbientContext.GetData("d2"))
44+
.ContinueWith(t => Task.Run(() => t21 = AmbientContext.GetData("d2"))),
45+
Task.Run(() => t22 = AmbientContext.GetData("d2")),
46+
Task.Run(() => t23 = AmbientContext.GetData("d2"))
47+
);
48+
})
49+
);
5050

51-
Assert.That(d1, Is.SameAs(t1));
52-
Assert.That(d1, Is.SameAs(t10));
53-
Assert.That(d1, Is.SameAs(t11));
54-
Assert.That(d1, Is.SameAs(t12));
55-
Assert.That(d1, Is.SameAs(t13));
51+
Assert.That(d1, Is.SameAs(t1));
52+
Assert.That(d1, Is.SameAs(t10));
53+
Assert.That(d1, Is.SameAs(t11));
54+
Assert.That(d1, Is.SameAs(t12));
55+
Assert.That(d1, Is.SameAs(t13));
5656

57-
Assert.That(d2, Is.SameAs(t2));
58-
Assert.That(d2, Is.SameAs(t20));
59-
Assert.That(d2, Is.SameAs(t21));
60-
Assert.That(d2, Is.SameAs(t22));
61-
Assert.That(d2, Is.SameAs(t23));
57+
Assert.That(d2, Is.SameAs(t2));
58+
Assert.That(d2, Is.SameAs(t20));
59+
Assert.That(d2, Is.SameAs(t21));
60+
Assert.That(d2, Is.SameAs(t22));
61+
Assert.That(d2, Is.SameAs(t23));
6262

63-
Assert.Null(AmbientContext.GetData("d1"));
64-
Assert.Null(AmbientContext.GetData("d2"));
65-
}
63+
Assert.That(AmbientContext.GetData("d1"), Is.Null);
64+
Assert.That(AmbientContext.GetData("d2"), Is.Null);
6665
}
6766
}

Shuttle.Core.Threading.Tests/MockProcessor.cs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,14 @@ public class MockProcessor : IProcessor
88
{
99
private readonly TimeSpan _executionDuration;
1010

11-
public int ExecutionCount { get; private set; }
12-
1311
public MockProcessor(TimeSpan executionDuration)
1412
{
1513
_executionDuration = executionDuration;
1614
}
1715

18-
public void Execute(CancellationToken cancellationToken)
19-
{
20-
ExecuteAsync(cancellationToken).GetAwaiter().GetResult();
21-
}
16+
public int ExecutionCount { get; private set; }
2217

23-
public async Task ExecuteAsync(CancellationToken cancellationToken)
18+
public async Task ExecuteAsync(IProcessorThreadContext context, CancellationToken cancellationToken)
2419
{
2520
await Task.Delay(_executionDuration, cancellationToken).ConfigureAwait(false);
2621
ExecutionCount++;

Shuttle.Core.Threading.Tests/ProcessorThreadFixture.cs

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,26 @@
11
using System;
22
using System.Threading;
33
using System.Threading.Tasks;
4+
using Microsoft.Extensions.DependencyInjection;
5+
using Moq;
46
using NUnit.Framework;
57

68
namespace Shuttle.Core.Threading.Tests;
79

810
public class ProcessorThreadFixture
911
{
10-
[Test]
11-
public void Should_be_able_to_execute_processor_thread()
12-
{
13-
Should_be_able_to_execute_processor_thread_async(true).GetAwaiter().GetResult();
14-
}
15-
1612
[Test]
1713
public async Task Should_be_able_to_execute_processor_thread_async()
18-
{
19-
await Should_be_able_to_execute_processor_thread_async(false);
20-
}
21-
22-
private async Task Should_be_able_to_execute_processor_thread_async(bool sync)
2314
{
2415
const int minimumExecutionCount = 5;
2516

17+
var serviceScopeFactory = new Mock<IServiceScopeFactory>();
18+
19+
serviceScopeFactory.Setup(m => m.CreateScope()).Returns(new Mock<IServiceScope>().Object);
20+
2621
var executionDuration = TimeSpan.FromMilliseconds(200);
2722
var mockProcessor = new MockProcessor(executionDuration);
28-
var processorThread = new ProcessorThread("thread", mockProcessor, new ProcessorThreadOptions());
23+
var processorThread = new ProcessorThread("thread", serviceScopeFactory.Object, mockProcessor, new());
2924
var cancellationTokenSource = new CancellationTokenSource();
3025
var cancellationToken = cancellationTokenSource.Token;
3126

@@ -51,7 +46,7 @@ private async Task Should_be_able_to_execute_processor_thread_async(bool sync)
5146

5247
processorThread.ProcessorThreadStopped += (sender, args) =>
5348
{
54-
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStopped] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId} / aborted = '{args.Aborted}'");
49+
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStopped] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}");
5550
};
5651

5752
processorThread.ProcessorThreadStopping += (sender, args) =>
@@ -64,14 +59,7 @@ private async Task Should_be_able_to_execute_processor_thread_async(bool sync)
6459
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadOperationCanceled] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}");
6560
};
6661

67-
if (sync)
68-
{
69-
processorThread.Start();
70-
}
71-
else
72-
{
73-
await processorThread.StartAsync();
74-
}
62+
await processorThread.StartAsync();
7563

7664
var timeout = DateTime.Now.AddSeconds(500);
7765
var timedOut = false;
@@ -85,7 +73,7 @@ private async Task Should_be_able_to_execute_processor_thread_async(bool sync)
8573

8674
cancellationTokenSource.Cancel();
8775

88-
processorThread.Stop();
76+
await processorThread.StopAsync();
8977

9078
Assert.That(timedOut, Is.False, $"[TIMEOUT] : Did not complete {minimumExecutionCount} executions before {timeout:O}");
9179
}

Shuttle.Core.Threading.Tests/ProcessorThreadPoolFixture.cs

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,34 @@
1-
using NUnit.Framework;
2-
using System.Threading.Tasks;
3-
using System.Threading;
4-
using System;
1+
using System;
52
using System.Linq;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Microsoft.Extensions.DependencyInjection;
66
using Moq;
7+
using NUnit.Framework;
78

89
namespace Shuttle.Core.Threading.Tests;
910

1011
public class ProcessorThreadPoolFixture
1112
{
12-
[Test]
13-
public void Should_be_able_to_execute_processor_thread_pool()
14-
{
15-
Should_be_able_to_execute_processor_thread_pool_async(true).GetAwaiter().GetResult();
16-
}
17-
1813
[Test]
1914
public async Task Should_be_able_to_execute_processor_thread_pool_async()
20-
{
21-
await Should_be_able_to_execute_processor_thread_pool_async(false);
22-
}
23-
24-
private async Task Should_be_able_to_execute_processor_thread_pool_async(bool sync)
2515
{
2616
const int minimumExecutionCount = 5;
2717

18+
var serviceScopeFactory = new Mock<IServiceScopeFactory>();
19+
20+
serviceScopeFactory.Setup(m => m.CreateScope()).Returns(new Mock<IServiceScope>().Object);
21+
2822
var executionDuration = TimeSpan.FromMilliseconds(500);
2923
var cancellationTokenSource = new CancellationTokenSource();
3024
var cancellationToken = cancellationTokenSource.Token;
3125
var processorFactory = new Mock<IProcessorFactory>();
3226

3327
processorFactory.Setup(m => m.Create()).Returns(() => new MockProcessor(executionDuration));
3428

35-
var processorThreadPool = new ProcessorThreadPool("thread-pool", 5, processorFactory.Object, new ProcessorThreadOptions());
29+
var processorThreadPool = new ProcessorThreadPool("thread-pool", 5, serviceScopeFactory.Object, processorFactory.Object, new());
3630

37-
processorThreadPool.ProcessorThreadCreated += (sender, args) =>
31+
processorThreadPool.ProcessorThreadCreated += (_, args) =>
3832
{
3933
args.ProcessorThread.ProcessorException += (sender, args) =>
4034
{
@@ -58,7 +52,7 @@ private async Task Should_be_able_to_execute_processor_thread_pool_async(bool sy
5852

5953
args.ProcessorThread.ProcessorThreadStopped += (sender, args) =>
6054
{
61-
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStopped] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId} / aborted = '{args.Aborted}'");
55+
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStopped] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}");
6256
};
6357

6458
args.ProcessorThread.ProcessorThreadStopping += (sender, args) =>
@@ -72,14 +66,7 @@ private async Task Should_be_able_to_execute_processor_thread_pool_async(bool sy
7266
};
7367
};
7468

75-
if (sync)
76-
{
77-
processorThreadPool.Start();
78-
}
79-
else
80-
{
81-
await processorThreadPool.StartAsync();
82-
}
69+
await processorThreadPool.StartAsync();
8370

8471
var timeout = DateTime.Now.AddSeconds(5);
8572
var timedOut = false;
@@ -93,7 +80,7 @@ private async Task Should_be_able_to_execute_processor_thread_pool_async(bool sy
9380

9481
cancellationTokenSource.Cancel();
9582

96-
processorThreadPool.Stop();
83+
await processorThreadPool.StopAsync();
9784

9885
Assert.That(timedOut, Is.False, $"[TIMEOUT] : Did not complete {minimumExecutionCount} executions before {timeout:O}");
9986
}

0 commit comments

Comments
 (0)