Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.Utils;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;

public class MDCUtils {

Expand All @@ -34,6 +35,38 @@ public class MDCUtils {
private static final boolean enabled =
Utils.getBooleanFromSystemPropsOrDefault(Utils.USE_MDC_ENV_KEY, true);

private static final String INFORMER_EVENT_RESOURCE_NAME = "informer.event.resource.name";
private static final String INFORMER_EVENT_RESOURCE_NAMESPACE =
"informer.event.resource.namespace";
private static final String INFORMER_EVENT_RESOURCE_KIND = "informer.event.resource.kind";
private static final String INFORMER_EVENT_RESOURCE_VERSION =
"informer.event.resource.resourceVersion";
private static final String INFORMER_EVENT_ACTION = "informer.event.action";
private static final String INFORMER_NAME = "informer.name";

public static void addInformerEventInfo(
HasMetadata resource, ResourceAction action, String eventSourceName) {
if (enabled) {
MDC.put(INFORMER_EVENT_RESOURCE_NAME, resource.getMetadata().getName());
MDC.put(INFORMER_EVENT_RESOURCE_NAMESPACE, resource.getMetadata().getNamespace());
MDC.put(INFORMER_EVENT_RESOURCE_KIND, HasMetadata.getKind(resource.getClass()));
MDC.put(INFORMER_EVENT_RESOURCE_VERSION, resource.getMetadata().getNamespace());
MDC.put(INFORMER_EVENT_ACTION, action.name());
MDC.put(INFORMER_NAME, eventSourceName);
}
}

public static void removeInformerEventInfo() {
if (enabled) {
MDC.remove(INFORMER_EVENT_RESOURCE_NAME);
MDC.remove(INFORMER_EVENT_RESOURCE_NAMESPACE);
MDC.remove(INFORMER_EVENT_RESOURCE_KIND);
MDC.remove(INFORMER_EVENT_RESOURCE_VERSION);
MDC.remove(INFORMER_EVENT_ACTION);
MDC.remove(INFORMER_NAME);
}
}

public static void addResourceIDInfo(ResourceID resourceID) {
if (enabled) {
MDC.put(NAME, resourceID.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

public class Event {

private final ResourceID relatedCustomResource;
protected final ResourceID relatedCustomResource;

public Event(ResourceID targetCustomResource) {
this.relatedCustomResource = targetCustomResource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,18 @@ private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldReso

@Override
public synchronized void onAdd(T resource) {
handleOnAddOrUpdate(ResourceAction.ADDED, null, resource);
withMDC(
resource,
ResourceAction.ADDED,
() -> handleOnAddOrUpdate(ResourceAction.ADDED, null, resource));
}

@Override
public synchronized void onUpdate(T oldCustomResource, T newCustomResource) {
handleOnAddOrUpdate(ResourceAction.UPDATED, oldCustomResource, newCustomResource);
withMDC(
newCustomResource,
ResourceAction.UPDATED,
() -> handleOnAddOrUpdate(ResourceAction.UPDATED, oldCustomResource, newCustomResource));
}

private void handleOnAddOrUpdate(
Expand All @@ -160,10 +166,16 @@ private void handleOnAddOrUpdate(

@Override
public synchronized void onDelete(T resource, boolean deletedFinalStateUnknown) {
temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
// delete event is quite special here, that requires special care, since we clean up caches on
// delete event.
handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown);
withMDC(
resource,
ResourceAction.DELETED,
() -> {
temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
// delete event is quite special here, that requires special care, since we clean up
// caches on
// delete event.
handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

public class ResourceEvent extends Event {

private final ResourceAction action;
private final HasMetadata resource;
protected final ResourceAction action;
protected final HasMetadata resource;

public ResourceEvent(ResourceAction action, ResourceID resourceID, HasMetadata resource) {
super(resourceID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.Objects;
import java.util.Optional;

import io.fabric8.kubernetes.api.model.HasMetadata;
Expand All @@ -25,7 +26,7 @@
/** Used only for resource event filtering. */
public class ExtendedResourceEvent extends ResourceEvent {

private HasMetadata previousResource;
private final HasMetadata previousResource;

public ExtendedResourceEvent(
ResourceAction action,
Expand All @@ -39,4 +40,31 @@ public ExtendedResourceEvent(
public Optional<HasMetadata> getPreviousResource() {
return Optional.ofNullable(previousResource);
}

@Override
public String toString() {
return "ExtendedResourceEvent{"
+ "previousResourceVersion="
+ previousResource.getMetadata().getResourceVersion()
+ ", action="
+ action
+ ", resourceVersion="
+ resource.getMetadata().getResourceVersion()
+ ", relatedCustomResource="
+ relatedCustomResource
+ '}';
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
ExtendedResourceEvent that = (ExtendedResourceEvent) o;
return Objects.equals(previousResource, that.previousResource);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), previousResource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,42 +100,58 @@ private InformerEventSource(

@Override
public void onAdd(R newResource) {
if (log.isDebugEnabled()) {
log.debug(
"On add event received for resource id: {} type: {} version: {}",
ResourceID.fromResource(newResource),
resourceType().getSimpleName(),
newResource.getMetadata().getResourceVersion());
}
onAddOrUpdate(ResourceAction.ADDED, newResource, null);
withMDC(
newResource,
ResourceAction.ADDED,
() -> {
if (log.isDebugEnabled()) {
log.debug(
"On add event received for resource id: {} type: {} version: {}",
ResourceID.fromResource(newResource),
resourceType().getSimpleName(),
newResource.getMetadata().getResourceVersion());
}
onAddOrUpdate(ResourceAction.ADDED, newResource, null);
});
}

@Override
public void onUpdate(R oldObject, R newObject) {
if (log.isDebugEnabled()) {
log.debug(
"On update event received for resource id: {} type: {} version: {} old version: {} ",
ResourceID.fromResource(newObject),
resourceType().getSimpleName(),
newObject.getMetadata().getResourceVersion(),
oldObject.getMetadata().getResourceVersion());
}
onAddOrUpdate(ResourceAction.UPDATED, newObject, oldObject);
withMDC(
newObject,
ResourceAction.UPDATED,
() -> {
if (log.isDebugEnabled()) {
log.debug(
"On update event received for resource id: {} type: {} version: {} old version: {}"
+ " ",
ResourceID.fromResource(newObject),
resourceType().getSimpleName(),
newObject.getMetadata().getResourceVersion(),
oldObject.getMetadata().getResourceVersion());
}
onAddOrUpdate(ResourceAction.UPDATED, newObject, oldObject);
});
}

@Override
public synchronized void onDelete(R resource, boolean b) {
if (log.isDebugEnabled()) {
log.debug(
"On delete event received for resource id: {} type: {}",
ResourceID.fromResource(resource),
resourceType().getSimpleName());
}
primaryToSecondaryIndex.onDelete(resource);
temporaryResourceCache.onDeleteEvent(resource, b);
if (acceptedByDeleteFilters(resource, b)) {
propagateEvent(resource);
}
withMDC(
resource,
ResourceAction.DELETED,
() -> {
if (log.isDebugEnabled()) {
log.debug(
"On delete event received for resource id: {} type: {}",
ResourceID.fromResource(resource),
resourceType().getSimpleName());
}
primaryToSecondaryIndex.onDelete(resource);
temporaryResourceCache.onDeleteEvent(resource, b);
if (acceptedByDeleteFilters(resource, b)) {
propagateEvent(resource);
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator;
import io.javaoperatorsdk.operator.health.Status;
import io.javaoperatorsdk.operator.processing.MDCUtils;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.*;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
Expand Down Expand Up @@ -93,11 +94,12 @@ public void changeNamespaces(Set<String> namespaces) {
@SuppressWarnings("unchecked")
public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<R> updateMethod) {
ResourceID id = ResourceID.fromResource(resourceToUpdate);
log.debug("Update and cache: {}", id);
log.debug("Starting event filter and cache update for: {}", id);
R updatedResource = null;
try {
temporaryResourceCache.startEventFilteringModify(id);
updatedResource = updateMethod.apply(resourceToUpdate);
log.debug("Resource update successful: {}", id);
handleRecentResourceUpdate(id, updatedResource, resourceToUpdate);
return updatedResource;
} finally {
Expand All @@ -124,6 +126,14 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
: null;
R prevVersionOfResource =
updatedForLambda != null ? updatedForLambda : extendedResourcePrevVersion;
if (log.isDebugEnabled()) {
log.debug(
"Extended previous resource version: {} resource from update present: {}"
+ " extendedPrevResource present: {}",
prevVersionOfResource.getMetadata().getResourceVersion(),
updatedForLambda != null,
extendedResourcePrevVersion != null);
}
handleEvent(
r.getAction(),
latestResource,
Expand Down Expand Up @@ -257,4 +267,13 @@ public String toString() {
public void setControllerConfiguration(ControllerConfiguration<R> controllerConfiguration) {
this.controllerConfiguration = controllerConfiguration;
}

protected void withMDC(R resource, ResourceAction action, Runnable runnable) {
try {
MDCUtils.addInformerEventInfo(resource, action, name());
runnable.run();
} finally {
MDCUtils.removeInformerEventInfo();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,19 @@ private synchronized EventHandling onEvent(
resource.getMetadata().getResourceVersion());
}
if (!unknownState) {
log.debug("Setting latest resource version to: {}", latestResourceVersion);
latestResourceVersion = resource.getMetadata().getResourceVersion();
}
var cached = cache.get(resourceId);
EventHandling result = EventHandling.NEW;
if (cached != null) {
int comp = ReconcilerUtilsInternal.compareResourceVersions(resource, cached);
if (comp >= 0 || unknownState) {
log.debug(
"Removing resource from temp cache. id: {} comparison: {} unknown state: {}",
resourceId,
comp,
unknownState);
cache.remove(resourceId);
// we propagate event only for our update or newer other can be discarded since we know we
// will receive
Expand All @@ -151,6 +157,7 @@ private synchronized EventHandling onEvent(
}
var ed = activeUpdates.get(resourceId);
if (ed != null && result != EventHandling.OBSOLETE) {
log.debug("Setting last event for id: {} delete: {}", resourceId, delete);
ed.setLastEvent(
delete
? new ResourceDeleteEvent(ResourceAction.DELETED, resourceId, resource, unknownState)
Expand Down
2 changes: 1 addition & 1 deletion operator-framework/src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<Configuration name="TestConfig" status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d %threadId %-30c{1.} [%-5level] %msg%n%throwable"/>
<PatternLayout pattern="%d %threadId %-30c{1.} [%-5level] %msg [%X{resource.namespace}/%X{resource.name} %X{resource.kind}][%X{informer.name}:%X{informer.event.action}:%X{informer.event.resource.name}::%X{informer.event.resource.resourceVersion}] %n%throwable "/>
</Console>
</Appenders>
<Loggers>
Expand Down
Loading