diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java index d0c3f9f7c1..a622311a00 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java @@ -33,12 +33,16 @@ import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; import java.util.Set; public class TaskRuntime> extends StatedPersistentBase { + private static final Logger LOG = LoggerFactory.getLogger(TaskRuntime.class); + private final SimpleFuture future = new SimpleFuture(); private final TaskStatusMachine statusMachine = new TaskStatusMachine(); private OptimizingTaskId taskId; @@ -135,7 +139,23 @@ void schedule(OptimizerThread thread) { void ack(OptimizerThread thread) { invokeConsistency( () -> { - validThread(thread); + // If the task was reset (token cleared) due to optimizer expiry, the ack is stale. + // Log a warning and skip this ack instead of throwing an exception, to avoid blocking + // the optimizing process. + if (token == null) { + LOG.warn( + "Ignoring stale ack for task {} because it has been reset (optimizer expired). 
" + + "The task should have been retried by OptimizerKeeper.", + taskId); + return; + } + + // For non-stale acks, validate thread match + if (!thread.getToken().equals(getToken()) || thread.getThreadId() != threadId) { + throw new TaskRuntimeException( + "The optimizer thread does not match, the thread in the task is OptimizerThread(token=%s, threadId=%s), and the thread in the request is OptimizerThread(token=%s, threadId=%s).", + getToken(), threadId, thread.getToken(), thread.getThreadId()); + } statusMachine.accept(Status.ACKED); persistTaskRuntime(); }); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java index 417150b390..33a4ad49e2 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java @@ -42,6 +42,7 @@ import org.apache.amoro.optimizing.TableOptimizing; import org.apache.amoro.process.ProcessStatus; import org.apache.amoro.resource.ResourceGroup; +import org.apache.amoro.server.optimizing.OptimizingQueue; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.optimizing.TaskRuntime; import org.apache.amoro.server.persistence.SqlSessionFactoryProvider; @@ -70,6 +71,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -830,4 +832,39 @@ public void run() { } } } + + /** + * Test that stale ack after task reset does not throw exception. Uses reflection to directly + * reset the task via the queue, simulating the race condition where the task is reset + * (token=null) but the optimizer is still registered. 
+ */ + @Test + public void testStaleAckAfterTaskReset() throws Exception { + OptimizingTask task = optimizingService().pollTask(token, THREAD_ID); + Assertions.assertNotNull(task); + assertTaskStatus(TaskRuntime.Status.SCHEDULED); + + // Reset the task via the queue to simulate a task retried by OptimizerKeeper while the + // optimizer is still alive + Method getQueueByToken = + DefaultOptimizingService.class.getDeclaredMethod("getQueueByToken", String.class); + getQueueByToken.setAccessible(true); + OptimizingQueue queue = (OptimizingQueue) getQueueByToken.invoke(optimizingService(), token); + queue.collectTasks(t -> t.getTaskId().equals(task.getTaskId())).forEach(queue::retryTask); + + assertTaskStatus(TaskRuntime.Status.PLANNED); + + // Ack with the still-valid old token: reaches TaskRuntime.ack() with token==null and is + // gracefully ignored + optimizingService().ackTask(token, THREAD_ID, task.getTaskId()); + + assertTaskStatus(TaskRuntime.Status.PLANNED); + + OptimizingTask task2 = optimizingService().pollTask(token, THREAD_ID); + Assertions.assertEquals(task.getTaskId(), task2.getTaskId()); + + optimizingService().ackTask(token, THREAD_ID, task2.getTaskId()); + optimizingService().completeTask(token, buildOptimizingTaskResult(task2.getTaskId())); + assertTaskCompleted(optimizingService().listTasks(defaultResourceGroup().getName()).get(0)); + } }