Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions src/api/Elastic.Documentation.Api.Core/Changes/ChangesUsecase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Buffers;
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;

namespace Elastic.Documentation.Api.Core.Changes;

/// <summary>Use case for the documentation changes feed.</summary>
public partial class ChangesUsecase(IChangesGateway changesGateway, ILogger<ChangesUsecase> logger)
{
public async Task<ChangesApiResponse> GetChangesAsync(ChangesApiRequest request, Cancel ctx = default)
{
var cursor = DecodeCursor(request.Cursor);
var pageSize = Math.Clamp(request.PageSize, 1, ChangesDefaults.MaxPageSize);

var result = await changesGateway.GetChangesAsync(
new ChangesRequest
{
Since = request.Since,
PageSize = pageSize,
Cursor = cursor
},
ctx
);

var nextCursor = result.NextCursor is not null
? EncodeCursor(result.NextCursor)
: null;

var hasMore = nextCursor is not null;

LogChanges(logger, request.Since, result.Pages.Count, hasMore);

return new ChangesApiResponse
{
Pages = result.Pages,
HasMore = hasMore,
NextCursor = nextCursor
};
}

private static ChangesPageCursor? DecodeCursor(string? cursor)
{
if (string.IsNullOrWhiteSpace(cursor))
return null;

try
{
var remainder = cursor.Length % 4;
var paddingLength = (4 - remainder) % 4;
var base64 = cursor
.Replace('-', '+')
.Replace('_', '/')
+ new string('=', paddingLength);

var json = Encoding.UTF8.GetString(Convert.FromBase64String(base64));
using var doc = JsonDocument.Parse(json);
var root = doc.RootElement;
var arrayLength = root.GetArrayLength();
if (root.ValueKind != JsonValueKind.Array || arrayLength < 2)
return null;

var epochEl = root[0];
var urlEl = root[1];
if (epochEl.ValueKind != JsonValueKind.Number || urlEl.ValueKind != JsonValueKind.String)
return null;

return new ChangesPageCursor(epochEl.GetInt64(), urlEl.GetString()!);
}
catch (Exception ex) when (ex is FormatException or JsonException or InvalidOperationException)
{
return null;
}
}

private static string EncodeCursor(ChangesPageCursor cursor)
{
var buffer = new ArrayBufferWriter<byte>();
using var writer = new Utf8JsonWriter(buffer);
writer.WriteStartArray();
writer.WriteNumberValue(cursor.LastUpdatedEpochMs);
writer.WriteStringValue(cursor.Url);
writer.WriteEndArray();
writer.Flush();

return Convert.ToBase64String(buffer.WrittenSpan)
.TrimEnd('=')
.Replace('+', '-')
.Replace('/', '_');
}

[LoggerMessage(Level = LogLevel.Information,
Message = "Changes feed returned {Count} pages since {Since} (hasMore: {HasMore})")]
private static partial void LogChanges(ILogger logger, DateTimeOffset since, int count, bool hasMore);
}

/// <summary>API request for the changes feed endpoint.</summary>
public record ChangesApiRequest
{
public required DateTimeOffset Since { get; init; }
public int PageSize { get; init; } = ChangesDefaults.PageSize;
public string? Cursor { get; init; }
}

/// <summary>API response for the changes feed endpoint.</summary>
public record ChangesApiResponse
{
public required IReadOnlyList<ChangedPageDto> Pages { get; init; }
public required bool HasMore { get; init; }
public string? NextCursor { get; init; }
}

/// <summary>A single changed page in the API response.</summary>
public record ChangedPageDto
{
public required string Url { get; init; }
public required string Title { get; init; }
public required DateTimeOffset LastUpdated { get; init; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

namespace Elastic.Documentation.Api.Core.Changes;

/// <summary>Gateway interface for querying documentation page changes.</summary>
public interface IChangesGateway
{
Task<ChangesResult> GetChangesAsync(ChangesRequest request, Cancel ctx = default);
}

/// <summary>Internal request for the changes gateway.</summary>
public record ChangesRequest
{
public required DateTimeOffset Since { get; init; }
public int PageSize { get; init; } = ChangesDefaults.PageSize;
public ChangesPageCursor? Cursor { get; init; }
}

/// <summary>Internal result from the changes gateway.</summary>
public record ChangesResult
{
public required IReadOnlyList<ChangedPageDto> Pages { get; init; }
public ChangesPageCursor? NextCursor { get; init; }
}

/// <summary>Cursor for search_after pagination over changed pages.</summary>
public record ChangesPageCursor(long LastUpdatedEpochMs, string Url);

/// <summary>Shared defaults for the changes feed.</summary>
public static class ChangesDefaults
{
public const int PageSize = 100;
public const int MaxPageSize = 1000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System.Text.Json.Serialization;
using Elastic.Documentation.Api.Core.AskAi;
using Elastic.Documentation.Api.Core.Changes;
using Elastic.Documentation.Api.Core.Search;

namespace Elastic.Documentation.Api.Core;
Expand Down Expand Up @@ -31,5 +32,8 @@ public record OutputMessage(string Role, MessagePart[] Parts, string FinishReaso
[JsonSerializable(typeof(FullSearchApiRequest))]
[JsonSerializable(typeof(FullSearchApiResponse))]
[JsonSerializable(typeof(FullSearchAggregations))]

[JsonSerializable(typeof(ChangesApiResponse))]
[JsonSerializable(typeof(ChangedPageDto))]
[JsonSourceGenerationOptions(PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase)]
public partial class ApiJsonContext : JsonSerializerContext;
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information

using Elastic.Documentation.Api.Core.AskAi;
using Elastic.Documentation.Api.Core.Changes;
using Elastic.Documentation.Api.Core.Search;
using Elastic.Documentation.Api.Core.Telemetry;
using Elastic.Documentation.Configuration.Products;
Expand All @@ -23,6 +24,7 @@ public static void MapElasticDocsApiEndpoints(this IEndpointRouteBuilder group,
MapAskAiEndpoint(group);
MapNavigationSearch(group);
MapFullSearch(group);
MapChanges(group);
if (mapOtlpEndpoints)
MapOtlpProxyEndpoint(group);
}
Expand Down Expand Up @@ -109,6 +111,26 @@ Cancel ctx
});
}

private static void MapChanges(IEndpointRouteBuilder group) =>
group.MapGet("/changes",
async (
[FromQuery(Name = "since")] DateTimeOffset since,
[FromQuery(Name = "cursor")] string? cursor,
[FromQuery(Name = "size")] int? pageSize,
ChangesUsecase changesUsecase,
Cancel ctx
) =>
{
var request = new ChangesApiRequest
{
Since = since,
PageSize = pageSize ?? ChangesDefaults.PageSize,
Cursor = cursor
};
var response = await changesUsecase.GetChangesAsync(request, ctx);
return Results.Ok(response);
});

private static void MapOtlpProxyEndpoint(IEndpointRouteBuilder group)
{
// Use /o/* to avoid adblocker detection (common blocklists target /otlp, /telemetry, etc.)
Expand Down
152 changes: 152 additions & 0 deletions src/services/Elastic.Documentation.Search/ChangesGateway.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using Elastic.Clients.Elasticsearch;
using Elastic.Documentation.Api.Core.Changes;
using Elastic.Documentation.Search.Common;
using Microsoft.Extensions.Logging;

namespace Elastic.Documentation.Search;

/// <summary>
/// Elasticsearch gateway for the documentation changes feed.
/// Queries last_updated > since with search_after cursor pagination.
/// Uses a shared Point In Time (PIT) for consistent pagination across requests.
/// </summary>
public partial class ChangesGateway(
ElasticsearchClientAccessor clientAccessor,
SharedPointInTimeManager pitManager,
ILogger<ChangesGateway> logger
) : IChangesGateway
{
public async Task<ChangesResult> GetChangesAsync(ChangesRequest request, Cancel ctx = default)
{
var fetchSize = request.PageSize + 1;

try
{
var pitId = await pitManager.GetPitIdAsync(ctx);

var response = await Search(request, pitId, fetchSize, ctx);

if (!response.IsValidResponse && IsExpiredPit(response))
{
LogPitExpired(logger);
pitId = await pitManager.GetPitIdAsync(ctx, expiredPitId: pitId);
response = await Search(request, pitId, fetchSize, ctx);
}

if (!response.IsValidResponse)
{
var reason = response.ElasticsearchServerError?.Error.Reason ?? "Unknown";
throw new InvalidOperationException(
$"Elasticsearch changes query failed (HTTP {response.ApiCallDetails?.HttpStatusCode}): {reason}"
);
}

pitManager.RefreshKeepAlive();

return BuildResult(response, request.PageSize);
}
catch (Exception ex)
{
logger.LogError(ex, "Error querying Elasticsearch for changes since {Since}", request.Since);
throw;
}
}

private async Task<SearchResponse<DocumentationDocument>> Search(
ChangesRequest request, string pitId, int fetchSize, Cancel ctx
) =>
await clientAccessor.Client.SearchAsync<DocumentationDocument>(s =>
{
_ = s
.Size(fetchSize)
.TrackTotalHits(t => t.Enabled(false))
.Pit(p => p.Id(pitId).KeepAlive(SharedPointInTimeManager.PitKeepAlive))
.Query(q => q.Range(r => r
.Date(dr => dr
.Field(f => f.LastUpdated)
.Gt(request.Since.ToString("O"))
)
))
.Sort(
so => so.Field(f => f.LastUpdated, sf => sf.Order(SortOrder.Asc)),
so => so.Field(f => f.Url, sf => sf.Order(SortOrder.Asc))
)
.Source(sf => sf
.Filter(f => f
.Includes(
e => e.Url,
e => e.Title,
e => e.SearchTitle,
e => e.Type,
e => e.LastUpdated
)
)
);

if (request.Cursor is { } cursor)
{
_ = s.SearchAfter(
FieldValue.Long(cursor.LastUpdatedEpochMs),
FieldValue.String(cursor.Url)
);
}
}, ctx);

private static bool IsExpiredPit(SearchResponse<DocumentationDocument> response) =>
response.ElasticsearchServerError?.Error.Type is "search_phase_execution_exception"
|| response.ElasticsearchServerError?.Error.Reason?.Contains("point in time", StringComparison.OrdinalIgnoreCase) == true
|| response.ElasticsearchServerError?.Error.Reason?.Contains("No search context found", StringComparison.OrdinalIgnoreCase) == true;

private static ChangesResult BuildResult(SearchResponse<DocumentationDocument> response, int pageSize)
{
var hits = response.Hits.ToList();
var hasMore = hits.Count > pageSize;

var pages = hits
.Take(pageSize)
.Where(h => h.Source is not null)
.Select(h =>
{
var doc = h.Source!;
return new ChangedPageDto
{
Url = doc.Url,
Title = doc.Title,
LastUpdated = doc.LastUpdated
};
})
.ToList();

var nextCursor = (ChangesPageCursor?)null;
if (hasMore)
{
var lastHit = hits[pageSize - 1];
if (lastHit.Sort is { Count: >= 2 })
{
var sortEpoch = lastHit.Sort.ElementAt(0);
var sortUrl = lastHit.Sort.ElementAt(1);

// ES returns date sort values as double (JSON has no int/float distinction)
var epochMs = sortEpoch.TryGetLong(out var l) ? l!.Value
: sortEpoch.TryGetDouble(out var d) ? (long)d!.Value
: default(long?);

if (epochMs is not null && sortUrl.TryGetString(out var url))
nextCursor = new ChangesPageCursor(epochMs.Value, url!);
}
}

return new ChangesResult
{
Pages = pages,
NextCursor = nextCursor
};
}

[LoggerMessage(Level = LogLevel.Warning, Message = "PIT expired or not found, opening a new one and retrying with existing search_after position")]
private static partial void LogPitExpired(ILogger logger);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using Elastic.Documentation.Api.Core.Changes;
using Elastic.Documentation.Api.Core.Search;
using Elastic.Documentation.Search.Common;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -36,6 +37,11 @@ public static IServiceCollection AddSearchServices(this IServiceCollection servi
_ = services.AddScoped<FullSearchUsecase>();
logger?.LogInformation("Full search use case registered with hybrid RRF support");

// Changes feed (cursor-paginated changes since a given date)
_ = services.AddSingleton<SharedPointInTimeManager>();
_ = services.AddScoped<IChangesGateway, ChangesGateway>();
_ = services.AddScoped<ChangesUsecase>();

return services;
}

Expand Down
Loading
Loading