Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,8 @@ Mono<MetaResponse> meta(MetaRequest request) {
Mono<ReadResponse> read(ReadRequest request) {
return get(request, ReadResponse.class, "read", request.getSourceId()).checkpoint();
}

Mono<ReadResponse> recentLogs(ReadRequest request) {
return get(request, ReadResponse.class, "read", request.getSourceId()).checkpoint();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public Mono<ReadResponse> read(ReadRequest request) {
return getReactorLogCacheEndpoints().read(request);
}

@Override
public Mono<ReadResponse> recentLogs(ReadRequest request) {
return getReactorLogCacheEndpoints().recentLogs(request);
}

/**
* The connection context
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public abstract class AbstractUaaTokenProvider implements TokenProvider {

private static final ZoneId UTC = ZoneId.of("UTC");

protected final ConcurrentMap<ConnectionContext, Mono<String>> accessTokens =
private final ConcurrentMap<ConnectionContext, Mono<String>> accessTokens =
new ConcurrentHashMap<>(1);

private final ConcurrentMap<ConnectionContext, RefreshToken> refreshTokenStreams =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.cloudfoundry.reactor.tokenprovider;

import org.cloudfoundry.reactor.ConnectionContext;
import org.cloudfoundry.reactor.TokenProvider;
import org.immutables.value.Value;
import reactor.netty.http.client.HttpClientForm;
Expand All @@ -36,10 +35,4 @@ void tokenRequestTransformer(HttpClientRequest request, HttpClientForm form) {
.attr("grant_type", "client_credentials")
.attr("response_type", "token");
}

@Override
public void invalidate(ConnectionContext connectionContext) {
this.accessTokens.remove(connectionContext);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ public interface DopplerClient {
*
* @param request the Recent Logs request
* @return the events from the recent logs
* @deprecated Use {@link org.cloudfoundry.logcache.v1.LogCacheClient#recentLogs(org.cloudfoundry.logcache.v1.ReadRequest)} instead.
* The Doppler recent logs endpoint has been removed since {@code Loggregator 107.0},
* shipped in {@code CFD 24.3}.
* @see org.cloudfoundry.logcache.v1.LogCacheClient#recentLogs(org.cloudfoundry.logcache.v1.ReadRequest)
*/
@Deprecated
Flux<Envelope> recentLogs(RecentLogsRequest request);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,12 @@ public interface LogCacheClient {
* @return the read response
*/
Mono<ReadResponse> read(ReadRequest request);

/**
* Makes the Log Cache RecentLogs /api/v1/read request
*
* @param request the Recent Logs request
* @return the events from the recent logs
*/
Mono<ReadResponse> recentLogs(ReadRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.cloudfoundry.client.v3.spaces.ListSpacesRequest;
import org.cloudfoundry.client.v3.spaces.SpaceResource;
import org.cloudfoundry.doppler.DopplerClient;
import org.cloudfoundry.logcache.v1.LogCacheClient;
import org.cloudfoundry.networking.NetworkingClient;
import org.cloudfoundry.operations.advanced.Advanced;
import org.cloudfoundry.operations.advanced.DefaultAdvanced;
Expand Down Expand Up @@ -79,7 +80,7 @@ public Advanced advanced() {
@Override
@Value.Derived
public Applications applications() {
return new DefaultApplications(getCloudFoundryClientPublisher(), getDopplerClientPublisher(), getSpaceId());
return new DefaultApplications(getCloudFoundryClientPublisher(), getDopplerClientPublisher(), getLogCacheClientPublisher(), getSpaceId());
}

@Override
Expand Down Expand Up @@ -197,6 +198,19 @@ Mono<DopplerClient> getDopplerClientPublisher() {
.orElse(Mono.error(new IllegalStateException("DopplerClient must be set")));
}

/**
* The {@link LogCacheClient} to use for operations functionality
*/
@Nullable
abstract LogCacheClient getLogCacheClient();

@Value.Derived
Mono<LogCacheClient> getLogCacheClientPublisher() {
return Optional.ofNullable(getLogCacheClient())
.map(Mono::just)
.orElse(Mono.error(new IllegalStateException("LogCacheClient must be set")));
}

/**
* The {@link NetworkingClient} to use for operations functionality
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.cloudfoundry.operations.applications;

import org.cloudfoundry.doppler.LogMessage;
import org.cloudfoundry.logcache.v1.Log;
import org.cloudfoundry.logcache.v1.ReadRequest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -126,6 +128,15 @@ public interface Applications {
@Deprecated
Flux<LogMessage> logs(LogsRequest request);

/**
* List the applications logs from logCacheClient.
* If no messages are available, an empty Flux is returned.
*
* @param request the application logs request
* @return the applications logs
*/
Flux<Log> logsRecent(ReadRequest request);

/**
* List the applications logs.
* Only works with {@code Loggregator < 107.0}, shipped in {@code CFD < 24.3}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@
import org.cloudfoundry.doppler.LogMessage;
import org.cloudfoundry.doppler.RecentLogsRequest;
import org.cloudfoundry.doppler.StreamRequest;
import org.cloudfoundry.logcache.v1.EnvelopeBatch;
import org.cloudfoundry.logcache.v1.Log;
import org.cloudfoundry.logcache.v1.LogCacheClient;
import org.cloudfoundry.logcache.v1.ReadRequest;
import org.cloudfoundry.operations.util.OperationsLogging;
import org.cloudfoundry.util.DateUtils;
import org.cloudfoundry.util.DelayTimeoutException;
Expand Down Expand Up @@ -200,6 +204,10 @@ public final class DefaultApplications implements Applications {
private static final Comparator<LogMessage> LOG_MESSAGE_COMPARATOR =
Comparator.comparing(LogMessage::getTimestamp);

private static final Comparator<org.cloudfoundry.logcache.v1.Envelope>
LOG_MESSAGE_COMPARATOR_LOG_CACHE =
Comparator.comparing(org.cloudfoundry.logcache.v1.Envelope::getTimestamp);

private static final Duration LOG_MESSAGE_TIMESPAN = Duration.ofMillis(500);

private static final int MAX_NUMBER_OF_RECENT_EVENTS = 50;
Expand All @@ -214,24 +222,29 @@ public final class DefaultApplications implements Applications {

private final Mono<DopplerClient> dopplerClient;

private final Mono<LogCacheClient> logCacheClient;

private final RandomWords randomWords;

private final Mono<String> spaceId;

public DefaultApplications(
Mono<CloudFoundryClient> cloudFoundryClient,
Mono<DopplerClient> dopplerClient,
Mono<LogCacheClient> logCacheClient,
Mono<String> spaceId) {
this(cloudFoundryClient, dopplerClient, new WordListRandomWords(), spaceId);
this(cloudFoundryClient, dopplerClient, logCacheClient, new WordListRandomWords(), spaceId);
}

DefaultApplications(
Mono<CloudFoundryClient> cloudFoundryClient,
Mono<DopplerClient> dopplerClient,
Mono<LogCacheClient> logCacheClient,
RandomWords randomWords,
Mono<String> spaceId) {
this.cloudFoundryClient = cloudFoundryClient;
this.dopplerClient = dopplerClient;
this.logCacheClient = logCacheClient;
this.randomWords = randomWords;
this.spaceId = spaceId;
}
Expand Down Expand Up @@ -529,6 +542,7 @@ public Flux<Task> listTasks(ListApplicationTasksRequest request) {
.checkpoint();
}

@Deprecated
@Override
public Flux<LogMessage> logs(LogsRequest request) {
return Mono.zip(this.cloudFoundryClient, this.spaceId)
Expand All @@ -544,6 +558,13 @@ public Flux<LogMessage> logs(LogsRequest request) {
.checkpoint();
}

@Override
public Flux<Log> logsRecent(ReadRequest request) {
return getRecentLogsLogCache(this.logCacheClient, request)
.transform(OperationsLogging.log("Get Application Logs"))
.checkpoint();
}

@Override
public Flux<ApplicationLog> logs(ApplicationLogsRequest request) {
return logs(LogsRequest.builder()
Expand Down Expand Up @@ -673,7 +694,6 @@ public Mono<Void> pushManifestV3(PushManifestV3Request request) {
} catch (IOException e) {
throw new RuntimeException("Could not serialize manifest", e);
}

return Mono.zip(this.cloudFoundryClient, this.spaceId)
.flatMap(
function(
Expand Down Expand Up @@ -1617,6 +1637,17 @@ private static Flux<LogMessage> getLogs(
}
}

private static Flux<Log> getRecentLogsLogCache(
Mono<LogCacheClient> logCacheClient, ReadRequest readRequest) {
return requestLogsRecentLogCache(logCacheClient, readRequest)
.map(EnvelopeBatch::getBatch)
.map(List::stream)
.flatMapIterable(envelopeStream -> envelopeStream.collect(Collectors.toList()))
.filter(e -> e.getLog() != null)
.sort(LOG_MESSAGE_COMPARATOR_LOG_CACHE)
.map(org.cloudfoundry.logcache.v1.Envelope::getLog);
}

@SuppressWarnings("unchecked")
private static Map<String, Object> getMetadataRequest(EventEntity entity) {
Map<String, Optional<Object>> metadata =
Expand Down Expand Up @@ -2501,6 +2532,7 @@ private static Flux<TaskResource> requestListTasks(
.build()));
}

@Deprecated
private static Flux<Envelope> requestLogsRecent(
Mono<DopplerClient> dopplerClient, String applicationId) {
return dopplerClient.flatMapMany(
Expand All @@ -2509,6 +2541,14 @@ private static Flux<Envelope> requestLogsRecent(
RecentLogsRequest.builder().applicationId(applicationId).build()));
}

private static Mono<EnvelopeBatch> requestLogsRecentLogCache(
Mono<LogCacheClient> logCacheClient, ReadRequest readRequest) {
return logCacheClient.flatMap(
client ->
client.recentLogs(readRequest)
.flatMap(response -> Mono.justOrEmpty(response.getEnvelopes())));
}

private static Flux<Envelope> requestLogsStream(
Mono<DopplerClient> dopplerClient, String applicationId) {
return dopplerClient.flatMapMany(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.cloudfoundry.client.v3.stacks.StacksV3;
import org.cloudfoundry.client.v3.tasks.Tasks;
import org.cloudfoundry.doppler.DopplerClient;
import org.cloudfoundry.logcache.v1.LogCacheClient;
import org.cloudfoundry.routing.RoutingClient;
import org.cloudfoundry.routing.v1.routergroups.RouterGroups;
import org.cloudfoundry.uaa.UaaClient;
Expand Down Expand Up @@ -104,6 +105,8 @@ public abstract class AbstractOperationsTest {

protected final DopplerClient dopplerClient = mock(DopplerClient.class, RETURNS_SMART_NULLS);

protected final LogCacheClient logCacheClient = mock(LogCacheClient.class, RETURNS_SMART_NULLS);

protected final Events events = mock(Events.class, RETURNS_SMART_NULLS);

protected final FeatureFlags featureFlags = mock(FeatureFlags.class, RETURNS_SMART_NULLS);
Expand Down
Loading
Loading