Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ The nuget will take care of deserializing the message into proper object and giv

This whole process revolves around the ability to differentiate messages by their payload types.
To do that, it adds a UserProperty to every message. This property is the unique identifier for a designated contract
across the whole system (name of the property : `PayloadTypeId`).
across the whole system (name of the property : `PayloadTypeId`, if you need to use another property name, please see *Customizing the PayloadTypeId property name* in [Advanced Scenarios](./docs/AdvancedScenarios.md)).

> Be careful when you have a system with several applications. If several applications send 2 different contracts
> with the same Id to a single queue/topic, the receiving handlers will not be able to differentiate between them.
Expand Down
26 changes: 26 additions & 0 deletions docs/AdvancedScenarios.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,29 @@ public class MyMessageSender
}
}
```


## Customizing the PayloadTypeId property name

This package uses a UserProperty attached to every message in order to differentiate them by their payload type.
This property is the unique identifier for a designated contract across the whole system.
The name of the property that holds the contract name is *PayloadTypeId* by default, but there are cases where you want to change that property name.
To send messages with a different property name you need to configure it in the receiver:

```csharp
services.RegisterServiceBusReception().FromQueue("myQueue", builder =>
{
builder.RegisterReception<Payload, MetadataHandler>();
builder.WithCustomPayloadTypeIdProperty("DifferentPayloadTypeIdPropertyName");
});
```

To receive messages with a different property name you need to configure it in the sender:

```csharp
services.RegisterServiceBusDispatch().ToQueue("myQueue", builder =>
{
builder.RegisterDispatch<PublishedEvent>()
.CustomizePayloadTypeIdProperty("DifferentPayloadTypeIdPropertyName");
});
```
6 changes: 6 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## 5.3.0
- Added
- Introduced *CustomizePayloadTypeIdProperty* method so we can customize the *PayloadTypeId* property name when sending to a topic or queue
- Introduced *WithCustomPayloadTypeIdProperty* method so we can customize the *PayloadTypeId* property name when receiving messages

## 5.2.0
- Added
- Introduced SendDispatch methods on DispatchSender. Those methods allow to send single message bigger than 1MB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public MessageDispatchRegistration(
/// </summary>
public Type PayloadType { get; }

public string? PayloadTypeIdProperty { get; private set; }

/// <summary>
/// Sets the PayloadTypeId (by default it will take the <see cref="MemberInfo.Name"/> of the payload <see cref="Type"/> object)
/// </summary>
Expand All @@ -56,6 +58,17 @@ public MessageDispatchRegistration CustomizePayloadTypeId(string payloadId)
return this;
}

/// <summary>
/// Sets the PayloadTypeIdProperty from Metadata used to store the PayloadTypeId
/// </summary>
/// <param name="payloadTypeIdProperty"></param>
/// <returns></returns>
public MessageDispatchRegistration CustomizePayloadTypeIdProperty(string payloadTypeIdProperty)
{
PayloadTypeIdProperty = payloadTypeIdProperty;
return this;
}

/// <summary>
/// This method give you the possibility to customize outgoing messages right before they are dispatched.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,11 @@ public void EnableSessionHandling(Action<ServiceBusSessionProcessorOptions> conf
{
SessionProcessorOptions = config;
}

public string? PayloadTypeIdProperty { get; private set; }

public void WithCustomPayloadTypeIdProperty(string payloadTypeIdProperty)
{
PayloadTypeIdProperty = payloadTypeIdProperty;
}
}
8 changes: 4 additions & 4 deletions src/Ev.ServiceBus.Abstractions/MessageHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ namespace Ev.ServiceBus.Abstractions;

public static class MessageHelper
{
public static string? GetPayloadTypeId(this ServiceBusReceivedMessage message)
public static string? GetPayloadTypeId(this ServiceBusReceivedMessage message, string? payloadTypeIdProperty)
{
return TryGetValue(message, UserProperties.PayloadTypeIdProperty);
return TryGetValue(message, payloadTypeIdProperty ?? UserProperties.DefaultPayloadTypeIdProperty);
}

private static string? TryGetValue(ServiceBusReceivedMessage message, string propertyName)
Expand All @@ -15,7 +15,7 @@ public static class MessageHelper
return value as string;
}

public static ServiceBusMessage CreateMessage(string contentType, byte[] body, string payloadTypeId)
public static ServiceBusMessage CreateMessage(string contentType, byte[] body, string payloadTypeId, string? payloadTypeIdProperty)
{
var message = new ServiceBusMessage(body)
{
Expand All @@ -24,7 +24,7 @@ public static ServiceBusMessage CreateMessage(string contentType, byte[] body, s
ApplicationProperties =
{
{UserProperties.MessageTypeProperty, "IntegrationEvent"},
{UserProperties.PayloadTypeIdProperty, payloadTypeId}
{payloadTypeIdProperty ?? UserProperties.DefaultPayloadTypeIdProperty, payloadTypeId}
}
};
return message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,24 @@ namespace Ev.ServiceBus.Abstractions;

public class MessageContext
{
public MessageContext(ProcessSessionMessageEventArgs args, ClientType clientType, string resourceId)
public MessageContext(ProcessSessionMessageEventArgs args, ClientType clientType, string resourceId, string? payloadTypeIdProperty)
{
SessionArgs = args;
ClientType = clientType;
ResourceId = resourceId;
Message = args.Message;
CancellationToken = args.CancellationToken;
PayloadTypeId = Message.GetPayloadTypeId();
PayloadTypeId = Message.GetPayloadTypeId(payloadTypeIdProperty);
}

public MessageContext(ProcessMessageEventArgs args, ClientType clientType, string resourceId)
public MessageContext(ProcessMessageEventArgs args, ClientType clientType, string resourceId, string? payloadTypeIdProperty)
{
Args = args;
ClientType = clientType;
ResourceId = resourceId;
Message = args.Message;
CancellationToken = args.CancellationToken;
PayloadTypeId = Message.GetPayloadTypeId();
PayloadTypeId = Message.GetPayloadTypeId(payloadTypeIdProperty);
}

public ServiceBusReceivedMessage Message { get; }
Expand Down
2 changes: 1 addition & 1 deletion src/Ev.ServiceBus.Abstractions/UserProperties.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
/// </summary>
public static class UserProperties
{
public const string PayloadTypeIdProperty = "PayloadTypeId";
public const string DefaultPayloadTypeIdProperty = "PayloadTypeId";
public const string MessageTypeProperty = "MessageType";
}
4 changes: 2 additions & 2 deletions src/Ev.ServiceBus/Dispatch/DispatchSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,9 @@ private ServiceBusMessage CreateMessage(
{
var originalCorrelationId = _messageMetadataAccessor.Metadata?.CorrelationId ?? Guid.NewGuid().ToString();
var result = _messagePayloadSerializer.SerializeBody(dispatch.Payload);
var message = MessageHelper.CreateMessage(result.ContentType, result.Body, registration.PayloadTypeId);
var message = MessageHelper.CreateMessage(result.ContentType, result.Body, registration.PayloadTypeId, registration.PayloadTypeIdProperty);

dispatch.ApplicationProperties.Remove(UserProperties.PayloadTypeIdProperty);
dispatch.ApplicationProperties.Remove(registration.PayloadTypeIdProperty ?? UserProperties.DefaultPayloadTypeIdProperty);
foreach (var dispatchApplicationProperty in dispatch.ApplicationProperties)
{
message.ApplicationProperties[dispatchApplicationProperty.Key] = dispatchApplicationProperty.Value;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Azure.Messaging.ServiceBus;
using Ev.ServiceBus.Abstractions;
Expand All @@ -16,6 +18,7 @@ public ComposedReceiverOptions(ReceiverOptions[] allOptions)
SessionMode = false;
ConnectionSettings = allOptions.First().ConnectionSettings;
FirstOption = allOptions.First();
PayloadTypeIdProperty = allOptions.FirstOrDefault(o => !string.IsNullOrEmpty(o.PayloadTypeIdProperty))?.PayloadTypeIdProperty;

ProcessorOptions = new ServiceBusProcessorOptions();
foreach (var config in AllOptions.Select(o => o.ServiceBusProcessorOptions))
Expand Down Expand Up @@ -44,7 +47,8 @@ public ComposedReceiverOptions(ReceiverOptions[] allOptions)
public ServiceBusProcessorOptions ProcessorOptions { get; }
public ServiceBusSessionProcessorOptions? SessionProcessorOptions { get; }
public ConnectionSettings? ConnectionSettings { get; }

public string? PayloadTypeIdProperty { get; }

internal void UpdateResourceId(string resourceId)
{
ResourceId = resourceId;
Expand Down
2 changes: 1 addition & 1 deletion src/Ev.ServiceBus/Management/Wrappers/ReceiverWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected virtual async Task RegisterMessageHandler()
_ => ProcessorClient
};
ProcessorClient!.ProcessErrorAsync += OnExceptionOccured;
ProcessorClient.ProcessMessageAsync += args => OnMessageReceived(new MessageContext(args, _composedOptions.ClientType, _composedOptions.ResourceId));
ProcessorClient.ProcessMessageAsync += args => OnMessageReceived(new MessageContext(args, _composedOptions.ClientType, _composedOptions.ResourceId, _composedOptions.PayloadTypeIdProperty));
await ProcessorClient.StartProcessingAsync();

_onExceptionReceivedHandler = _ => Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected override async Task RegisterMessageHandler()
_ => SessionProcessorClient
};
SessionProcessorClient!.ProcessErrorAsync += OnExceptionOccured;
SessionProcessorClient.ProcessMessageAsync += args => OnMessageReceived(new MessageContext(args, _composedOptions.ClientType, _composedOptions.ResourceId));
SessionProcessorClient.ProcessMessageAsync += args => OnMessageReceived(new MessageContext(args, _composedOptions.ClientType, _composedOptions.ResourceId, _composedOptions.PayloadTypeIdProperty));
await SessionProcessorClient.StartProcessingAsync();

_onExceptionReceivedHandler = _ => Task.CompletedTask;
Expand Down
9 changes: 9 additions & 0 deletions src/Ev.ServiceBus/Reception/ReceptionRegistrationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,13 @@ public void EnableSessionHandling(Action<ServiceBusSessionProcessorOptions> conf
{
_options.EnableSessionHandling(config);
}

/// <summary>
/// Overrides the default property name for PayloadTypeId key in metadata.
/// </summary>
/// <param name="customPayloadTypeIdProperty"></param>
public void WithCustomPayloadTypeIdProperty(string customPayloadTypeIdProperty)
{
_options.WithCustomPayloadTypeIdProperty(customPayloadTypeIdProperty);
}
}
4 changes: 2 additions & 2 deletions tests/Ev.ServiceBus.TestHelpers/TestMessageHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ namespace Ev.ServiceBus.TestHelpers;

public static class TestMessageHelper
{
public static ServiceBusMessage CreateEventMessage(string payloadTypeId, object payload)
public static ServiceBusMessage CreateEventMessage(string payloadTypeId, object payload, string payloadTypeIdProperty = UserProperties.DefaultPayloadTypeIdProperty)
{
var parser = new TextJsonPayloadSerializer();
var result = parser.SerializeBody(payload);
var message = MessageHelper.CreateMessage(result.ContentType, result.Body, payloadTypeId);
var message = MessageHelper.CreateMessage(result.ContentType, result.Body, payloadTypeId, payloadTypeIdProperty);

return message;
}
Expand Down
77 changes: 64 additions & 13 deletions tests/Ev.ServiceBus.UnitTests/DispatchTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ namespace Ev.ServiceBus.UnitTests;

public class DispatchTest : IDisposable
{
private const string CustomPayloadTypeIdProperty = "customized-property";
private readonly Composer _composer;
private readonly List<ServiceBusMessage> _sentMessagesToTopic;
private readonly List<ServiceBusMessage> _sentMessagesToQueue;
private readonly List<ServiceBusMessage> _sentMessagesToTopicCustomProperty;
private readonly List<ServiceBusMessage> _sentMessagesToQueueCustomProperty;
private readonly List<ServiceBusMessage> _sentMessagesToQueueSession;

public DispatchTest()
Expand All @@ -37,26 +40,42 @@ public DispatchTest()

_sentMessagesToTopic = new List<ServiceBusMessage>();
_sentMessagesToQueue = new List<ServiceBusMessage>();
_sentMessagesToTopicCustomProperty = new List<ServiceBusMessage>();
_sentMessagesToQueueCustomProperty = new List<ServiceBusMessage>();
_sentMessagesToQueueSession = new List<ServiceBusMessage>();
_composer = new Composer();

_composer.WithAdditionalServices(services =>
{
services.RegisterServiceBusDispatch().ToTopic("testTopic", builder =>
{
builder.RegisterDispatch<PublishedEvent>().CustomizePayloadTypeId("MyEvent");
builder.RegisterDispatch<PublishedEvent>()
.CustomizePayloadTypeId("MyEvent");

// noise
builder.RegisterDispatch<PublishedEvent3>().CustomizePayloadTypeId("MyEvent3");
});
services.RegisterServiceBusDispatch().ToTopic("testTopicCustomProperty", builder =>
{
builder.RegisterDispatch<PublishedEvent>()
.CustomizePayloadTypeId("MyEventCustomProperty")
.CustomizePayloadTypeIdProperty(CustomPayloadTypeIdProperty);
});
services.RegisterServiceBusDispatch().ToQueue("testQueue", builder =>
{
builder.RegisterDispatch<PublishedThroughQueueEvent>().CustomizePayloadTypeId("MyEventThroughQueue");
builder.RegisterDispatch<PublishedThroughQueueEvent>()
.CustomizePayloadTypeId("MyEventThroughQueue");
});
services.RegisterServiceBusDispatch().ToQueue("testQueueCustomProperty", builder =>
{
builder.RegisterDispatch<PublishedThroughQueueEvent>()
.CustomizePayloadTypeId("MyEventThroughQueueWithCustomProperty")
.CustomizePayloadTypeIdProperty(CustomPayloadTypeIdProperty);
});

services.RegisterServiceBusDispatch().ToQueue("testQueueSession", builder =>
{
builder.RegisterDispatch<PublishedThroughSessionQueueEvent>().CustomizePayloadTypeId("MyEventThroughQueue");
builder.RegisterDispatch<PublishedThroughSessionQueueEvent>()
.CustomizePayloadTypeId("MyEventThroughQueue");
});

// noise
Expand Down Expand Up @@ -84,10 +103,20 @@ public DispatchTest()
{
_sentMessagesToTopic.Add(message);
});

topicClient.Mock.Setup(o => o.CreateMessageBatchAsync(It.IsAny<CancellationToken>()))
.ReturnsAsync(ServiceBusModelFactory.ServiceBusMessageBatch(0, _sentMessagesToTopic, createMessageBatchOptions));

var topicClientCustomProperty = _composer.ClientFactory.GetSenderMock("testTopicCustomProperty");
topicClientCustomProperty.Mock
.Setup(o => o.SendMessageAsync(It.IsAny<ServiceBusMessage>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask)
.Callback((ServiceBusMessage message, CancellationToken token) =>
{
_sentMessagesToTopicCustomProperty.Add(message);
});
topicClientCustomProperty.Mock.Setup(o => o.CreateMessageBatchAsync(It.IsAny<CancellationToken>()))
.ReturnsAsync(ServiceBusModelFactory.ServiceBusMessageBatch(0, _sentMessagesToTopicCustomProperty, createMessageBatchOptions));

var queueClient = _composer.ClientFactory.GetSenderMock("testQueue");
queueClient.Mock
.Setup(o => o.SendMessageAsync(It.IsAny<ServiceBusMessage>(), It.IsAny<CancellationToken>()))
Expand All @@ -99,6 +128,17 @@ public DispatchTest()
queueClient.Mock.Setup(o => o.CreateMessageBatchAsync(It.IsAny<CancellationToken>()))
.ReturnsAsync(ServiceBusModelFactory.ServiceBusMessageBatch(0, _sentMessagesToQueue, createMessageBatchOptions));

var queueClientCustomProperty = _composer.ClientFactory.GetSenderMock("testQueueCustomProperty");
queueClient.Mock
.Setup(o => o.SendMessageAsync(It.IsAny<ServiceBusMessage>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask)
.Callback((ServiceBusMessage message, CancellationToken token) =>
{
_sentMessagesToQueueCustomProperty.Add(message);
});
queueClientCustomProperty.Mock.Setup(o => o.CreateMessageBatchAsync(It.IsAny<CancellationToken>()))
.ReturnsAsync(ServiceBusModelFactory.ServiceBusMessageBatch(0, _sentMessagesToQueueCustomProperty, createMessageBatchOptions));

var queueClientSession = _composer.ClientFactory.GetSenderMock("testQueueSession");
queueClientSession.Mock
.Setup(o => o.SendMessageAsync(It.IsAny<ServiceBusMessage>(), It.IsAny<CancellationToken>()))
Expand Down Expand Up @@ -171,13 +211,15 @@ public void MessageMustContainTheRightMessageType(string clientToCheck)
}

[Theory]
[InlineData("topic", "MyEvent")]
[InlineData("queue", "MyEventThroughQueue")]
public void MessageMustContainTheRightPayloadTypeId(string clientToCheck, string payloadTypeId)
[InlineData("topic", "MyEvent", UserProperties.DefaultPayloadTypeIdProperty)]
[InlineData("queue", "MyEventThroughQueue", UserProperties.DefaultPayloadTypeIdProperty)]
[InlineData("topic2", "MyEventCustomProperty", CustomPayloadTypeIdProperty)]
[InlineData("queue2", "MyEventThroughQueueWithCustomProperty", CustomPayloadTypeIdProperty)]
public void MessageMustContainTheRightPayloadTypeIdInTheRightProperty(string clientToCheck, string payloadTypeId, string payloadTypeIdProperty)
{
var message = GetMessageFrom(clientToCheck);
Assert.True(message?.ApplicationProperties.ContainsKey("PayloadTypeId"));
Assert.Equal(payloadTypeId, message?.ApplicationProperties["PayloadTypeId"]);
Assert.True(message?.ApplicationProperties.ContainsKey(payloadTypeIdProperty));
Assert.Equal(payloadTypeId, message?.ApplicationProperties[payloadTypeIdProperty]);
}

[Theory]
Expand Down Expand Up @@ -531,11 +573,20 @@ private ServiceBusMessage GetMessageFrom(string clientToCheck)
{
return _sentMessagesToTopic.FirstOrDefault();
}
if (clientToCheck == "sessionQueue")
if (clientToCheck == "topic2")
{
return _sentMessagesToTopicCustomProperty.FirstOrDefault();
}
if (clientToCheck == "queue")
{
return _sentMessagesToQueue.FirstOrDefault();
}
if (clientToCheck == "queue2")
{
return _sentMessagesToQueueSession.FirstOrDefault();
return _sentMessagesToQueueCustomProperty.FirstOrDefault();
}
return _sentMessagesToQueue.FirstOrDefault();
// sessionQueue
return _sentMessagesToQueueSession.FirstOrDefault();
}

public void Dispose()
Expand Down
Loading