Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 51 additions & 23 deletions src/Exceptionless.Core/Utility/IConnectionMapping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,63 +12,91 @@ public interface IConnectionMapping

public class ConnectionMapping : IConnectionMapping
{
private readonly ConcurrentDictionary<string, HashSet<string>> _connections = new();
private readonly ConcurrentDictionary<string, ConnectionSet> _connections = new();

internal int TrackedKeyCount => _connections.Count;

public Task AddAsync(string key, string connectionId)
{
if (key is null)
return Task.CompletedTask;

_connections.AddOrUpdate(key, [.. new[] { connectionId }], (_, hs) =>
while (true)
{
hs.Add(connectionId);
return hs;
});
var connections = _connections.GetOrAdd(key, _ => new ConnectionSet());

lock (connections.SyncRoot)
{
if (connections.IsDetachedFromMap)
continue;

return Task.CompletedTask;
connections.ConnectionIds.Add(connectionId);
return Task.CompletedTask;
}
}
}

public Task<ICollection<string>> GetConnectionsAsync(string key)
{
if (key is null)
return Task.FromResult<ICollection<string>>(new List<string>());
return Task.FromResult<ICollection<string>>([]);

return Task.FromResult<ICollection<string>>(_connections.GetOrAdd(key, []));
if (!_connections.TryGetValue(key, out var connections))
return Task.FromResult<ICollection<string>>([]);

lock (connections.SyncRoot)
{
if (connections.IsDetachedFromMap)
return Task.FromResult<ICollection<string>>([]);

return Task.FromResult<ICollection<string>>([.. connections.ConnectionIds]);
}
}

public Task<int> GetConnectionCountAsync(string key)
{
if (key is null)
return Task.FromResult(0);

if (_connections.TryGetValue(key, out var connections))
return Task.FromResult(connections.Count);
if (!_connections.TryGetValue(key, out var connections))
return Task.FromResult(0);

return Task.FromResult(0);
lock (connections.SyncRoot)
{
return Task.FromResult(connections.IsDetachedFromMap ? 0 : connections.ConnectionIds.Count);
}
}

public Task RemoveAsync(string key, string connectionId)
{
if (key is null)
return Task.CompletedTask;

bool shouldRemove = false;
_connections.AddOrUpdate(key, [], (_, hs) =>
if (!_connections.TryGetValue(key, out var connections))
return Task.CompletedTask;

lock (connections.SyncRoot)
{
hs.Remove(connectionId);
if (hs.Count == 0)
shouldRemove = true;
if (connections.IsDetachedFromMap)
return Task.CompletedTask;

return hs;
});
if (!connections.ConnectionIds.Remove(connectionId))
return Task.CompletedTask;

if (!shouldRemove)
return Task.CompletedTask;
if (connections.ConnectionIds.Count is not 0)
return Task.CompletedTask;

if (_connections.TryRemove(key, out var connections) && connections.Count > 0)
_connections.TryAdd(key, connections);
connections.IsDetachedFromMap = true;
_connections.TryRemove(key, out _);
return Task.CompletedTask;
}
}

return Task.CompletedTask;
private sealed class ConnectionSet
{
public object SyncRoot { get; } = new();
public HashSet<string> ConnectionIds { get; } = [];
public bool IsDetachedFromMap { get; set; }
}
}

Expand Down
41 changes: 31 additions & 10 deletions src/Exceptionless.Web/Hubs/MessageBusBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,25 @@ private async Task OnUserMembershipChangedAsync(UserMembershipChanged userMember
_logger.LogTrace("Attempting to update user {User} active groups for {UserConnectionCount} connections", userMembershipChanged.UserId, userConnectionIds.Count);
foreach (string connectionId in userConnectionIds)
{
if (userMembershipChanged.ChangeType == ChangeType.Added)
if (userMembershipChanged.ChangeType is ChangeType.Added)
await _connectionMapping.GroupAddAsync(userMembershipChanged.OrganizationId, connectionId);
else if (userMembershipChanged.ChangeType == ChangeType.Removed)
else if (userMembershipChanged.ChangeType is ChangeType.Removed)
await _connectionMapping.GroupRemoveAsync(userMembershipChanged.OrganizationId, connectionId);
}

await GroupSendAsync(userMembershipChanged.OrganizationId, userMembershipChanged);
}

private async Task OnEntityChangedAsync(EntityChanged ec, CancellationToken cancellationToken = default)
internal async Task OnEntityChangedAsync(EntityChanged ec, CancellationToken cancellationToken = default)
{
if (ec is null)
return;

var entityChanged = ExtendedEntityChanged.Create(ec);
if (UserTypeName == entityChanged.Type)
if (String.Equals(UserTypeName, entityChanged.Type, StringComparison.Ordinal))
{
// It's pointless to send a user added message to the new user.
if (entityChanged.ChangeType == ChangeType.Added)
if (entityChanged.ChangeType is ChangeType.Added)
{
_logger.LogTrace("Ignoring {UserTypeName} message for added user: {UserId}", UserTypeName, entityChanged.Id);
return;
Expand All @@ -97,22 +97,43 @@ private async Task OnEntityChangedAsync(EntityChanged ec, CancellationToken canc
}

// Only allow specific token messages to be sent down to the client.
if (TokenTypeName == entityChanged.Type)
if (String.Equals(TokenTypeName, entityChanged.Type, StringComparison.Ordinal))
{
string? userId = entityChanged.Data.GetValueOrDefault<string>(ExtendedEntityChanged.KnownKeys.UserId);
bool isAuthToken = entityChanged.Data.GetValueOrDefault<bool>(ExtendedEntityChanged.KnownKeys.IsAuthenticationToken);

if (userId is not null)
{
var userConnectionIds = await _connectionMapping.GetUserIdConnectionsAsync(userId);
_logger.LogTrace("Sending {TokenTypeName} message for added user: {UserId} (to {UserConnectionCount} connections)", TokenTypeName, userId, userConnectionIds.Count);

// Auth token removed = logout. Close sockets immediately without sending;
// there is no point delivering a message to a connection we are about to tear down.
if (isAuthToken && entityChanged.ChangeType is ChangeType.Removed)
{
_logger.LogTrace("Auth token removed for user {UserId}; closing {ConnectionCount} WebSocket connection(s)", userId, userConnectionIds.Count);
string? organizationId = entityChanged.OrganizationId;
foreach (string connectionId in userConnectionIds)
{
if (organizationId is { Length: > 0 })
await _connectionMapping.GroupRemoveAsync(organizationId, connectionId);

await _connectionMapping.UserIdRemoveAsync(userId, connectionId);
await _connectionManager.RemoveWebSocketAsync(connectionId);
}

return;
}

_logger.LogTrace("Sending {TokenTypeName} message for user: {UserId} (to {UserConnectionCount} connections)", TokenTypeName, userId, userConnectionIds.Count);
foreach (string connectionId in userConnectionIds)
await TypedSendAsync(connectionId, entityChanged);

return;
}

if (entityChanged.Data.GetValueOrDefault<bool>(ExtendedEntityChanged.KnownKeys.IsAuthenticationToken))
if (isAuthToken)
{
_logger.LogTrace("Ignoring {TokenTypeName} Authentication Token message: {UserId}", TokenTypeName, entityChanged.Id);
_logger.LogTrace("Ignoring {TokenTypeName} Authentication Token message: {TokenId}", TokenTypeName, entityChanged.Id);
return;
}

Expand Down Expand Up @@ -163,7 +184,7 @@ private Task OnSystemNotificationAsync(SystemNotification notification, Cancella
private async Task GroupSendAsync(string group, object value)
{
var connectionIds = await _connectionMapping.GetGroupConnectionsAsync(group);
if (connectionIds.Count == 0)
if (connectionIds.Count is 0)
{
_logger.LogTrace("Ignoring group message to {Group}: No Connections", group);
return;
Expand Down
3 changes: 3 additions & 0 deletions src/Exceptionless.Web/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Exceptionless.Tests")]
10 changes: 5 additions & 5 deletions src/Exceptionless.Web/Utility/Handlers/OverageMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,15 @@ public async Task Invoke(HttpContext context)
bool tooBig = false;
if (String.Equals(context.Request.Method, "POST", StringComparison.OrdinalIgnoreCase) && context.Request.Headers is not null)
{
if (context.Request.Headers.ContentLength is <= 0)
long? contentLength = context.Request.Headers.ContentLength;
if (contentLength is <= 0)
{
AppDiagnostics.PostsBlocked.Add(1);
context.Response.StatusCode = StatusCodes.Status411LengthRequired;
return;
}

long size = context.Request.Headers.ContentLength.GetValueOrDefault();
long size = contentLength.GetValueOrDefault();
if (size > 0)
AppDiagnostics.PostsSize.Record(size);

Expand All @@ -71,7 +72,6 @@ public async Task Invoke(HttpContext context)
}
}


// block large submissions, client should break them up or remove some of the data.
if (tooBig)
{
Expand All @@ -90,9 +90,9 @@ public async Task Invoke(HttpContext context)
return;
}

// if user auth, check to see if the org is suspended
// if user auth, check to see if the organization is suspended
// api tokens are marked as suspended immediately
if (context.Request.GetAuthType() == AuthType.User)
if (context.Request.GetAuthType() is AuthType.User)
{
AppDiagnostics.PostsBlocked.Add(1);
var organization = await _organizationRepository.GetByIdAsync(organizationId, o => o.Cache());
Expand Down
Loading