Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,16 @@
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Set;

public class TaskRuntime<T extends StagedTaskDescriptor<?, ?, ?>> extends StatedPersistentBase {

private static final Logger LOG = LoggerFactory.getLogger(TaskRuntime.class);

private final SimpleFuture future = new SimpleFuture();
private final TaskStatusMachine statusMachine = new TaskStatusMachine();
private OptimizingTaskId taskId;
Expand Down Expand Up @@ -135,7 +139,23 @@ void schedule(OptimizerThread thread) {
/**
 * Acknowledges this task on behalf of the given optimizer thread.
 *
 * <p>All work happens inside the callback handed to {@code invokeConsistency}. When the task's
 * {@code token} is {@code null} the ack is treated as stale: a warning is logged and the callback
 * returns without touching the status machine or persisting anything. Otherwise the token and
 * thread id of {@code thread} are compared with the ones recorded on this task; a mismatch raises
 * a {@code TaskRuntimeException} from within the callback, and a match moves the status machine
 * to {@code Status.ACKED} and persists the task runtime.
 *
 * @param thread the optimizer thread that sent the ack
 */
void ack(OptimizerThread thread) {
invokeConsistency(
() -> {
// NOTE(review): validThread(thread) runs before the token == null guard below. Its body is
// not visible here; if it performs the same token/thread-id comparison as the explicit check
// further down, a stale ack would fail here and never reach the guard. Confirm whether this
// call is still intended.
validThread(thread);
// If the task was reset (token cleared) due to optimizer expiry, the ack is stale.
// Log a warning and skip this ack instead of throwing an exception, to avoid blocking
// the optimizing process.
if (token == null) {
LOG.warn(
"Ignoring stale ack for task {} because it has been reset (optimizer expired). "
+ "The task should have been retried by OptimizerKeeper.",
taskId);
return;
}

// For non-stale acks, require that both the token and the thread id of the caller match
// the values recorded on this task.
if (!thread.getToken().equals(getToken()) || thread.getThreadId() != threadId) {
throw new TaskRuntimeException(
"The optimizer thread does not match, the thread in the task is OptimizerThread(token=%s, threadId=%s), and the thread in the request is OptimizerThread(token=%s, threadId=%s).",
getToken(), threadId, thread.getToken(), thread.getThreadId());
}
// The caller is the expected thread: record the ACKED transition, then persist it.
statusMachine.accept(Status.ACKED);
persistTaskRuntime();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.amoro.optimizing.TableOptimizing;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.optimizing.OptimizingQueue;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
Expand Down Expand Up @@ -70,6 +71,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -830,4 +832,39 @@ public void run() {
}
}
}

/**
 * Verifies that an ack arriving after its task has been reset is ignored rather than raising an
 * exception. The reset is triggered directly on the queue (looked up reflectively through the
 * private {@code getQueueByToken} method), which reproduces the race where the task has lost its
 * token while the optimizer itself is still registered. Afterwards the same task must be
 * pollable, ackable and completable again.
 */
@Test
public void testStaleAckAfterTaskReset() throws Exception {
  // Step 1: an optimizer thread polls the task, which puts it into SCHEDULED.
  OptimizingTask polledTask = optimizingService().pollTask(token, THREAD_ID);
  Assertions.assertNotNull(polledTask);
  assertTaskStatus(TaskRuntime.Status.SCHEDULED);

  // Step 2: retry the task straight on its queue, as OptimizerKeeper would, while the
  // optimizer stays alive and keeps its token.
  Method queueLookup =
      DefaultOptimizingService.class.getDeclaredMethod("getQueueByToken", String.class);
  queueLookup.setAccessible(true);
  Object lookedUp = queueLookup.invoke(optimizingService(), token);
  OptimizingQueue owningQueue = (OptimizingQueue) lookedUp;
  owningQueue
      .collectTasks(candidate -> candidate.getTaskId().equals(polledTask.getTaskId()))
      .forEach(owningQueue::retryTask);
  assertTaskStatus(TaskRuntime.Status.PLANNED);

  // Step 3: the late ack still carries a valid optimizer token, so it reaches
  // TaskRuntime.ack() where the task token is null; it must be dropped without side effects.
  optimizingService().ackTask(token, THREAD_ID, polledTask.getTaskId());
  assertTaskStatus(TaskRuntime.Status.PLANNED);

  // Step 4: the very same task can be polled again and driven to completion.
  OptimizingTask repolledTask = optimizingService().pollTask(token, THREAD_ID);
  Assertions.assertEquals(polledTask.getTaskId(), repolledTask.getTaskId());

  optimizingService().ackTask(token, THREAD_ID, repolledTask.getTaskId());
  optimizingService().completeTask(token, buildOptimizingTaskResult(repolledTask.getTaskId()));
  assertTaskCompleted(optimizingService().listTasks(defaultResourceGroup().getName()).get(0));
}
}
Loading