diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java index 511e4fcea749a7..05784404c310aa 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java @@ -31,4 +31,6 @@ public class WriteRecordRequest extends JobBaseRecordRequest { private String token; private String taskId; private Map streamLoadProps; + // previous task ended abnormally, rebuild reader instead of reusing + private boolean rebuildReader; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java index 62adf21daf6014..224fe7c5dbb610 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java @@ -43,6 +43,8 @@ public abstract class AbstractStreamingTask { private static final int MAX_RETRY = 3; private static final String LABEL_SPLITTER = "_"; private int retryCount = 0; + // in-place retry would reuse this taskId, breaking ownership-based zombie isolation + protected volatile boolean noRetry; protected String labelName; protected Offset runningOffset; protected UserIdentity userIdentity; @@ -83,6 +85,14 @@ public List getScanBackendIds() { public abstract void closeOrReleaseResources(); + // Release the remote cdc reader (keep slot). No-op for tasks without a cdc reader (e.g. TVF). + public void releaseRemoteReader() { + } + + public long getRunningBackendId() { + return -1; + } + public void execute() throws JobException { while (retryCount <= MAX_RETRY) { try { @@ -96,8 +106,9 @@ public void execute() throws JobException { } this.errMsg = e.getMessage(); retryCount++; - if (retryCount > MAX_RETRY) { - log.error("Task execution failed after {} retries.", MAX_RETRY, e); + if (noRetry || retryCount > MAX_RETRY) { + log.error("Task execution failed, job id {}, task id {}, noRetry {}, retry {}.", + jobId, taskId, noRetry, retryCount, e); onFail(e.getMessage()); return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 09090ba3ad184b..df07640370b81a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -75,6 +75,7 @@ import org.apache.doris.qe.ShowResultSetMetaData; import org.apache.doris.rpc.RpcException; import org.apache.doris.service.FrontendOptions; +import org.apache.doris.system.Backend; import org.apache.doris.tablefunction.S3TableValuedFunction; import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TRow; @@ -180,6 +181,15 @@ public class StreamingInsertJob extends AbstractJob syncTables; + // Incremental(binlog) phase: bound BE for reader reuse; <=0 = unbound (replay-safe default). + @SerializedName("bbe") + private volatile long boundBackendId; + + // previous task ended abnormally (or FE restarted), next dispatch must rebuild the cdc reader + @Getter + @Setter + private transient volatile boolean needRebuildReader = true; + // The sampling window starts at the beginning of the sampling window. // If the error rate exceeds `max_filter_ratio` within the window, the sampling fails. @Setter @@ -393,6 +403,7 @@ private SourceOffsetProvider createOffsetProvider(Map jdbcSource provider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType, jdbcSourceProps); } provider.setCloudCluster(this.cloudCluster); + provider.setBoundBackendId(boundBackendId); return provider; } @@ -646,6 +657,18 @@ offsetProvider, getConvertedSourceProperties(), targetDb, targetProperties, jobP getCreateUser(), cloudCluster); } + // Binlog phase: prefer the bound BE in selectBackend; rebind + persist on change. + public Backend resolveBoundBackend() throws JobException { + Backend selected = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId); + if (selected.getId() != boundBackendId) { + log.info("bind job {} to backend {} (was {})", getJobId(), selected.getId(), boundBackendId); + boundBackendId = selected.getId(); + logUpdateOperation(); + } + offsetProvider.setBoundBackendId(boundBackendId); + return selected; + } + private Map getConvertedSourceProperties() throws JobException { if (convertedSourceProperties == null) { this.convertedSourceProperties = StreamingJobUtils.convertCertFile(getDbId(), sourceProperties); @@ -770,6 +793,22 @@ public void clearRunningStreamTask(JobStatus newJobStatus) { } } + // Command entry for a manual status change: reset the failure/retry budget, and on manual pause + // release the reader (keep slot). "Manual" is decided by the caller, never by reading failureReason. + public void onManualStatusAltered(JobStatus newStatus, FailureReason reason) { + lock.writeLock().lock(); + try { + resetFailureInfo(reason); + if (JobStatus.PAUSED.equals(newStatus) && runningStreamTask != null) { + // Force resume to swap in a fresh reader, in case the release RPC races or fails. + this.needRebuildReader = true; + runningStreamTask.releaseRemoteReader(); + } + } finally { + lock.writeLock().unlock(); + } + } + public boolean hasMoreDataToConsume() { return offsetProvider.hasMoreDataToConsume(); } @@ -791,9 +830,14 @@ public void onTaskSuccess(StreamingJobSchedulerTask task) throws JobException { public void onStreamTaskFail(AbstractStreamingTask task) throws JobException { try { + this.needRebuildReader = true; failedTaskCount.incrementAndGet(); Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task); - this.failureReason = new FailureReason(task.getErrMsg()); + // Don't overwrite a manual pause: a late failure callback would otherwise let auto resume wake it. + if (this.getFailureReason() == null + || !InternalErrorCode.MANUAL_PAUSE_ERR.equals(this.getFailureReason().getCode())) { + this.failureReason = new FailureReason(task.getErrMsg()); + } if (MetricRepo.isInit) { MetricRepo.COUNTER_STREAMING_JOB_TASK_FAILED_COUNT.increase(1L); } @@ -805,6 +849,7 @@ public void onStreamTaskFail(AbstractStreamingTask task) throws JobException { public void onStreamTaskSuccess(AbstractStreamingTask task) throws JobException { try { + this.needRebuildReader = false; resetFailureInfo(null); succeedTaskCount.incrementAndGet(); lastTaskSuccessTime = System.currentTimeMillis(); @@ -980,6 +1025,7 @@ public void replayOnUpdated(StreamingInsertJob replayJob) { setFailedTaskCount(replayJob.getFailedTaskCount()); setCanceledTaskCount(replayJob.getCanceledTaskCount()); setLastTaskSuccessTime(replayJob.getLastTaskSuccessTime()); + this.boundBackendId = replayJob.boundBackendId; } /** @@ -1402,6 +1448,9 @@ public void gsonPostProcess() throws IOException { if (null == lock) { this.lock = new ReentrantReadWriteLock(true); } + + // a stale reader may survive on the bound BE across FE restart/failover + this.needRebuildReader = true; } /** @@ -1528,6 +1577,8 @@ public void replayOffsetProviderIfNeed() throws JobException { if (offsetProvider != null) { // when fe restart, offsetProvider.jobId/sourceProperties may be null offsetProvider.ensureInitialized(getJobId(), getProviderProps()); + // replayOnUpdated skips the transient provider; resync routing BE. + offsetProvider.setBoundBackendId(boundBackendId); offsetProvider.replayIfNeed(this); } } @@ -1570,7 +1621,8 @@ public void cleanup() throws JobException { } catch (JobException ex) { log.warn("refresh provider props before cleanMeta failed, job id: {}", getJobId(), ex); } - ((JdbcSourceOffsetProvider) this.offsetProvider).cleanMeta(getJobId()); + long runtimeBackendId = runningStreamTask != null ? runningStreamTask.getRunningBackendId() : -1; + ((JdbcSourceOffsetProvider) this.offsetProvider).cleanMeta(getJobId(), runtimeBackendId); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java index 9077ad01a827cc..3d3fb5bda114d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java @@ -127,7 +127,7 @@ public void run() throws JobException { } private void sendWriteRequest() throws JobException { - Backend backend = StreamingJobUtils.selectBackend(cloudCluster); + Backend backend = resolveBackend(); log.info("start to run streaming multi task {} in backend {}/{}, offset is {}", taskId, backend.getId(), backend.getHost(), runningOffset.toString()); this.runningBackendId = backend.getId(); @@ -168,13 +168,33 @@ private void sendWriteRequest() throws JobException { log.warn("cdc_client RPC timeout api=/api/writeRecords taskId={} jobId={} backend={}:{} timeout_sec={}", taskId, getJobId(), backend.getHost(), backend.getBrpcPort(), Config.streaming_cdc_heavy_rpc_timeout_sec); + // the request may have been dispatched and still running remotely + noRetry = true; throw new JobException("cdc_client RPC timeout: /api/writeRecords taskId=" + taskId); } catch (ExecutionException | InterruptedException ex) { log.error("Send write request failed: ", ex); + noRetry = true; throw new JobException(ex); } } + private Backend resolveBackend() throws JobException { + // Snapshot phase keeps per-round selection; binlog phase binds to a fixed BE for reuse. + if (((JdbcOffset) runningOffset).snapshotSplit()) { + return StreamingJobUtils.selectBackend(cloudCluster); + } + return getStreamingJob().resolveBoundBackend(); + } + + // Fail loud on a dropped/wrong-type job rather than return null and risk a downstream NPE. + private StreamingInsertJob getStreamingJob() throws JobException { + Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId()); + if (job == null) { + throw new JobException("Streaming job " + getJobId() + " not found"); + } + return (StreamingInsertJob) job; + } + private String getToken() throws JobException { String token = ""; try { @@ -210,6 +230,7 @@ private WriteRecordRequest buildRequestParams() throws JobException { request.setFrontendAddress(feAddr); request.setMaxInterval(jobProperties.getMaxIntervalSecond()); request.setTaskTimeoutMs(getTaskTimeoutMs()); + request.setRebuildReader(getStreamingJob().isNeedRebuildReader()); if (offsetProvider instanceof JdbcSourceOffsetProvider) { String schemas = ((JdbcSourceOffsetProvider) offsetProvider).getTableSchemas(); if (schemas != null) { @@ -308,28 +329,31 @@ public void successCallback(CommitOffsetRequest offsetRequest) throws JobExcepti @Override protected void onFail(String errMsg) throws JobException { - // Release this task's reader before reschedule so it stops competing for the shared slot. + // Stop a possibly still-running reader now, so the PG slot frees before auto-resume re-acquires it. releaseRemoteReader(); super.onFail(errMsg); } @Override public void cancel(boolean needWaitCancelComplete) { - // No release here: DROP/STOP/PAUSE clean up via /api/close; releasing would orphan the engine. + // No release here: drop/stop free via /api/close and manual pause via /api/releaseReader; + // releasing in cancel would orphan the reused engine. super.cancel(needWaitCancelComplete); } @Override public void closeOrReleaseResources() { - // No-op: reader is shared across tasks; release on reschedule is done in onFail(). + // No-op: the reader is async and reused; releasing here (per-iteration finally) would kill it. } - /** - * Best-effort, onFail reschedule only: stop this job's reader on {@link #runningBackendId} so a - * reschedule to another backend never leaves two readers competing for the same source (e.g. one - * PG replication slot, which is kept, not dropped). Failures are swallowed and must not block - * rescheduling. - */ + @Override + public long getRunningBackendId() { + return runningBackendId; + } + + // Best-effort release on runningBackendId (keep slot): on task failure to stop a stuck/zombie + // reader early, and on manual pause so resume can rebind. Failures swallowed; idle reaper backs up. + @Override public void releaseRemoteReader() { if (runningBackendId <= 0) { return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 23f51890da5679..8c55856da57d7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -282,7 +282,7 @@ public void alterJobStatus(String jobName, JobStatus jobStatus, FailureReason re checkSameStatus(a, jobStatus); alterJobStatus(a.getJobId(), jobStatus); if (a instanceof StreamingInsertJob) { - ((StreamingInsertJob) a).resetFailureInfo(reason); + ((StreamingInsertJob) a).onManualStatusAltered(jobStatus, reason); } } catch (JobException e) { throw new JobException("Alter job status error, jobName is %s, errorMsg is %s", diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java index a6f9e582bff91d..8336966a5bd2bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java @@ -94,6 +94,9 @@ default void initOnCreate(List syncTables) throws JobException {} */ default void setCloudCluster(String cloudCluster) {} + /** Bind the BE this job is pinned to in the binlog phase, for reader-reuse heartbeat routing. */ + default void setBoundBackendId(long boundBackendId) {} + /** * Fetch remote meta information, such as listing files in S3 or getting latest offsets in Kafka. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java index 9b7d364895bd06..e2662844fd4003 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java @@ -107,6 +107,9 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider { transient volatile String cloudCluster; + // Route fetchEndOffset/compareOffset to the bound BE (synced from job, not persisted). + transient volatile long boundBackendId; + /** Split progress (cdc-fetch view), >= committedSplitProgress. Rebuilt on restart. */ transient SplitProgress cdcSplitProgress = new SplitProgress(); @@ -254,9 +257,14 @@ public void updateOffset(Offset offset) { this.currentOffset = newOffset; } + @Override + public void setBoundBackendId(long boundBackendId) { + this.boundBackendId = boundBackendId; + } + @Override public void fetchRemoteMeta(Map properties) throws Exception { - Backend backend = StreamingJobUtils.selectBackend(cloudCluster); + Backend backend = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId); JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties, getFrontendAddress()); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() @@ -354,7 +362,7 @@ public boolean hasMoreDataToConsume() { private boolean compareOffset(Map offsetFirst, Map offsetSecond) throws JobException { - Backend backend = StreamingJobUtils.selectBackend(cloudCluster); + Backend backend = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId); CompareOffsetRequest requestParams = new CompareOffsetRequest(getJobId(), sourceType.name(), sourceProperties, getFrontendAddress(), offsetFirst, offsetSecond); @@ -999,10 +1007,20 @@ private void initSourceReader() throws JobException { } } - public void cleanMeta(Long jobId) throws JobException { + public void cleanMeta(Long jobId, long runtimeBackendId) throws JobException { // clean meta table StreamingJobUtils.deleteJobMeta(jobId); - Backend backend = StreamingJobUtils.selectBackend(cloudCluster); + // Dropping the slot only succeeds on the BE owning the live reader (it stops its own engine + // first, freeing the slot). Prefer the runtime BE (covers the unbound snapshot phase), then + // the bound BE; both may be alive but not load-available, so route by isAlive. Only when + // neither is alive fall back to a random BE to drop the now-inactive slot. + Backend backend = aliveBackend(runtimeBackendId); + if (backend == null) { + backend = aliveBackend(boundBackendId); + } + if (backend == null) { + backend = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId); + } JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties, getFrontendAddress()); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() @@ -1027,6 +1045,14 @@ public void cleanMeta(Long jobId) throws JobException { } } + private static Backend aliveBackend(long backendId) { + if (backendId <= 0) { + return null; + } + Backend be = Env.getCurrentSystemInfo().getBackend(backendId); + return be != null && be.isAlive() ? be : null; + } + private String getFrontendAddress() { return Env.getCurrentEnv().getMasterHost() + ":" + Env.getCurrentEnv().getMasterHttpPort(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java index cff663e0142b68..ff67d2429e1753 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java @@ -274,6 +274,11 @@ public static JdbcClient getJdbcClient(DataSourceType sourceType, Map bes = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) .getBackendsByClusterName(cloudCluster) @@ -284,10 +289,23 @@ public static Backend selectBackend(String cloudCluster) throws JobException { throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", compute_group: " + cloudCluster); } + if (preferredBackendId > 0) { + for (Backend be : bes) { + if (be.getId() == preferredBackendId) { + return be; + } + } + } int idx = getLastSelectedBackendIndexAndUpdate(); return bes.get(Math.floorMod(idx, bes.size())); } + if (preferredBackendId > 0) { + Backend bound = Env.getCurrentSystemInfo().getBackend(preferredBackendId); + if (bound != null && bound.isLoadAvailable()) { + return bound; + } + } BeSelectionPolicy policy = new BeSelectionPolicy.Builder() .setEnableRoundRobin(true).needLoadAvailable().build(); policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate(); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java index 953903a80323a6..6d3f735730b3fc 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java @@ -25,4 +25,11 @@ public class Constants { public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 3000L; public static final String DORIS_TARGET_DB = "doris_target_db"; + + // Idle from-to reader cleanup: release (keep slot) when idle past MULTIPLIER * max_interval. + public static final long IDLE_READER_SCAN_INTERVAL_MS = 15_000L; + public static final int IDLE_READER_TIMEOUT_MULTIPLIER = 10; + // Floor the idle timeout: PG reader rebuild is costly, absorb heartbeat jitter at small + // interval. + public static final long IDLE_READER_MIN_TIMEOUT_MS = 90_000L; } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java index 28da598053b004..5727a77730bff6 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java @@ -21,11 +21,15 @@ import org.apache.doris.cdcclient.source.factory.SourceReaderFactory; import org.apache.doris.cdcclient.source.reader.SourceReader; import org.apache.doris.job.cdc.request.JobBaseConfig; +import org.apache.doris.job.cdc.request.WriteRecordRequest; import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -40,6 +44,7 @@ public class Env { private static volatile Env INSTANCE; private final Map jobContexts; private final Map jobLocks; + private final ScheduledExecutorService idleReaderScheduler; @Setter private int backendHttpPort; @Setter @Getter private String clusterToken; @Setter @Getter private volatile String feMasterAddress; @@ -47,6 +52,18 @@ public class Env { private Env() { this.jobContexts = new ConcurrentHashMap<>(); this.jobLocks = new ConcurrentHashMap<>(); + this.idleReaderScheduler = + Executors.newSingleThreadScheduledExecutor( + r -> { + Thread t = new Thread(r, "cdc-idle-reader-cleaner"); + t.setDaemon(true); + return t; + }); + this.idleReaderScheduler.scheduleWithFixedDelay( + this::releaseIdleReaders, + Constants.IDLE_READER_SCAN_INTERVAL_MS, + Constants.IDLE_READER_SCAN_INTERVAL_MS, + TimeUnit.MILLISECONDS); } public String getBackendHostPort() { @@ -90,9 +107,24 @@ public SourceReader getReaderAndClaim(JobBaseConfig jobConfig, String taskId) { DataSource ds = resolveDataSource(jobConfig.getDataSource()); String jobId = jobConfig.getJobId(); Lock lock = jobLocks.computeIfAbsent(jobId, k -> new ReentrantLock()); + SourceReader staleReader = null; + JobBaseConfig staleConfig = null; + SourceReader reader; lock.lock(); try { JobContext context = jobContexts.get(jobId); + if (context != null + && jobConfig instanceof WriteRecordRequest + && ((WriteRecordRequest) jobConfig).isRebuildReader()) { + // FE declared the previous task abnormal: swap in a fresh reader instance so the + // old task's thread can never reach the new fetcher. + LOG.info( + "Rebuild reader for job {} on FE request, discard current instance", jobId); + jobContexts.remove(jobId); + staleReader = context.reader; + staleConfig = context.jobConfig != null ? context.jobConfig : jobConfig; + context = null; + } if (context == null) { LOG.info("Creating new reader for job {}, dataSource {}", jobId, ds); context = new JobContext(jobId, ds, jobConfig.getConfig()); @@ -100,10 +132,30 @@ public SourceReader getReaderAndClaim(JobBaseConfig jobConfig, String taskId) { jobContexts.put(jobId, context); } context.ownerTaskId = taskId; - return context.getReader(ds); + context.jobConfig = jobConfig; + if (jobConfig instanceof WriteRecordRequest) { + context.maxIntervalMs = ((WriteRecordRequest) jobConfig).getMaxInterval() * 1000; + } + context.lastAliveTime = System.currentTimeMillis(); + reader = context.getReader(ds); } finally { lock.unlock(); } + if (staleReader != null) { + // free the engine/slot connection before the caller submits the new fetcher + try { + staleReader.release(staleConfig); + } catch (Exception ex) { + LOG.warn("Failed to release stale reader for job {}", jobId, ex); + } + } + return reader; + } + + /** Whether {@code taskId} is still the current claimer of this job's reader. */ + public boolean isOwner(String jobId, String taskId) { + JobContext context = jobContexts.get(jobId); + return context != null && Objects.equals(context.ownerTaskId, taskId); } /** @@ -192,12 +244,68 @@ public void close(String jobId) { } } + /** Liveness evidence (FE heartbeat or active poll): keep this job's reader alive. */ + public void keepAlive(String jobId) { + JobContext context = jobContexts.get(jobId); + if (context != null) { + context.lastAliveTime = System.currentTimeMillis(); + } + } + + // Release (keep slot) readers FE no longer drives; maxIntervalMs<=0 = untracked (e.g. TVF), + // skip. + private void releaseIdleReaders() { + long now = System.currentTimeMillis(); + for (String jobId : jobContexts.keySet()) { + Lock lock = jobLocks.get(jobId); + if (lock == null || !lock.tryLock()) { + continue; + } + SourceReader toRelease = null; + JobBaseConfig releaseConfig = null; + try { + JobContext context = jobContexts.get(jobId); + if (context == null || context.lastAliveTime <= 0 || context.maxIntervalMs <= 0) { + continue; + } + long timeout = + Math.max( + (long) Constants.IDLE_READER_TIMEOUT_MULTIPLIER + * context.maxIntervalMs, + Constants.IDLE_READER_MIN_TIMEOUT_MS); + if (now - context.lastAliveTime <= timeout) { + continue; + } + LOG.info( + "Releasing idle reader for job {}, idle {} ms, keep slot", + jobId, + now - context.lastAliveTime); + jobContexts.remove(jobId); + toRelease = context.reader; + releaseConfig = context.jobConfig; + } finally { + lock.unlock(); + } + // Release outside the lock so blocking IO never stalls getReaderAndClaim/detach. + if (toRelease != null && releaseConfig != null) { + try { + toRelease.release(releaseConfig); + } catch (Exception ex) { + LOG.warn("Failed to release idle reader for job {}", jobId, ex); + } + } + } + } + private static final class JobContext { private final String jobId; private volatile SourceReader reader; private volatile String ownerTaskId; private volatile Map config; private volatile DataSource dataSource; + private volatile JobBaseConfig jobConfig; + private volatile long maxIntervalMs; + private volatile long lastAliveTime; private JobContext(String jobId, DataSource dataSource, Map config) { this.jobId = jobId; diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java index 22509a55e9846f..4ae6970373d76b 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java @@ -107,6 +107,7 @@ public Object fetchEndOffset(@RequestBody JobBaseConfig jobConfig) { LOG.info("Fetching end offset for job {}", jobConfig.getJobId()); try { SourceReader reader = Env.getCurrentEnv().getReader(jobConfig); + Env.getCurrentEnv().keepAlive(jobConfig.getJobId()); return RestResponse.success(reader.getEndOffset(jobConfig)); } catch (Exception ex) { LOG.error("Failed to fetch end offset, jobId={}", jobConfig.getJobId(), ex); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java index ebbd9c4acf1c88..1ff347c59b0aa6 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java @@ -403,7 +403,13 @@ public CompletableFuture writeRecordsAsync(WriteRecordRequest writeRecordR writeRecordRequest.getJobId(), writeRecordRequest.getTaskId()); } catch (Exception ex) { - closeJobStreamLoad(writeRecordRequest.getJobId()); + // a displaced task must not close the streamload the successor is using + if (Env.getCurrentEnv() + .isOwner( + writeRecordRequest.getJobId(), + writeRecordRequest.getTaskId())) { + closeJobStreamLoad(writeRecordRequest.getJobId()); + } String rootCauseMessage = ExceptionUtils.getRootCauseMessage(ex); taskErrorMaps.put(writeRecordRequest.getTaskId(), rootCauseMessage); LOG.error( @@ -454,6 +460,7 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception SplitReadResult readResult = null; boolean hasExecuteDDL = false; boolean isSnapshotSplit = false; + boolean stillOwner = false; try { // 1. submit split async readResult = sourceReader.prepareAndSubmitSplit(writeRecordRequest); @@ -477,6 +484,8 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception // 2. poll record while (!shouldStop) { + // Active poll keeps the reader alive so the reaper won't reclaim it mid-task. + Env.getCurrentEnv().keepAlive(writeRecordRequest.getJobId()); Iterator recordIterator = sourceReader.pollRecords(); if (!recordIterator.hasNext()) { @@ -530,6 +539,8 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception } while (recordIterator.hasNext()) { + // streamload backpressure can stall this loop past the reaper timeout + Env.getCurrentEnv().keepAlive(writeRecordRequest.getJobId()); SourceRecord element = recordIterator.next(); // Check if this is a heartbeat message @@ -591,7 +602,21 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception writeRecordRequest.getTaskId()); } finally { - cleanupReaderResources(sourceReader, writeRecordRequest.getJobId(), readResult); + stillOwner = + Env.getCurrentEnv() + .isOwner(writeRecordRequest.getJobId(), writeRecordRequest.getTaskId()); + // A displaced task must not touch the reader (finishSplitRecords would kill the + // successor's fetcher) nor commit anything. + if (stillOwner) { + cleanupReaderResources(sourceReader, writeRecordRequest.getJobId(), readResult); + } + } + if (!stillOwner) { + LOG.info( + "Skip commit for job {} task {}: reader released or taken over", + writeRecordRequest.getJobId(), + writeRecordRequest.getTaskId()); + return; } // 3. Extract offset from split state @@ -600,7 +625,6 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception batchStreamLoad.forceFlush(); // 5. request fe api update offset - String currentTaskId = batchStreamLoad.getCurrentTaskId(); // The offset must be reset before commitOffset to prevent the next taskId from being create // by the fe. batchStreamLoad.resetTaskId(); @@ -614,8 +638,9 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception if (hasExecuteDDL || (!isSnapshotSplit && feHadNoSchema)) { tableSchemas = sourceReader.serializeTableSchemas(); } + // own taskId, never the shared currentTaskId: FE rejects it if another task took over batchStreamLoad.commitOffset( - currentTaskId, + writeRecordRequest.getTaskId(), metaResponse, scannedRows, batchStreamLoad.getLoadStatistic(), @@ -742,6 +767,10 @@ public String getTaskFailReason(String taskId) { */ private void cleanupReaderResources( SourceReader sourceReader, String jobId, SplitReadResult readResult) { + boolean isSnapshotSplit = + readResult != null + && readResult.getSplit() != null + && sourceReader.isSnapshotSplit(readResult.getSplit()); try { // The LSN in the commit is the current offset, which is the offset from the last // successful write. @@ -750,11 +779,10 @@ private void cleanupReaderResources( sourceReader.commitSourceOffset(jobId, readResult.getSplit()); } } finally { - // This must be called after `commitSourceOffset`; otherwise, - // PG's confirmed lsn will not proceed. - // This operation must be performed before `batchStreamLoad.commitOffset`; - // otherwise, fe might create the next task for this job. - sourceReader.finishSplitRecords(); + // Binlog keeps the reader alive across tasks for reuse; only snapshot finishes here. + if (isSnapshotSplit) { + sourceReader.finishSplitRecords(); + } } } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index f0987eb97eec05..14f0f58f4e021c 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ -407,11 +407,40 @@ private synchronized SplitReadResult prepareStreamSplit( baseReq.getJobId()); } Tuple2 splitFlag = createStreamSplit(offsetMeta, baseReq); - this.streamSplit = splitFlag.f0.asStreamSplit(); + StreamSplit newStreamSplit = splitFlag.f0.asStreamSplit(); + + // offset guard: reuse only when request start == reader's consumed position. Compare by + // compareTo (LSN), NOT equals -- PG drops lsn_proc/commit so same position differs by map. + if (this.streamReader != null && this.streamSplitState != null) { + Offset requestStart = newStreamSplit.getStartingOffset(); + Offset readerPos = this.streamSplitState.getStartingOffset(); + if (requestStart != null + && readerPos != null + && requestStart.compareTo(readerPos) == 0) { + LOG.info( + "Reuse live stream reader for job {} at offset {}", + baseReq.getJobId(), + requestStart); + // Refresh split so commitSourceOffset advances PG confirmed_lsn to the FE-committed + // offset (== reader pos); poll/offset keep using streamSplitState. + this.streamSplit = newStreamSplit; + SplitReadResult reuseResult = new SplitReadResult(); + reuseResult.setSplits(Collections.singletonList(this.streamSplit)); + Map reuseStates = new HashMap<>(); + reuseStates.put(this.streamSplit.splitId(), this.streamSplitState); + reuseResult.setSplitStates(reuseStates); + return reuseResult; + } + LOG.info( + "Rebuild stream reader for job {}: requestStart={}, readerPos={}", + baseReq.getJobId(), + requestStart, + readerPos); + } - // Close previous stream reader to release resources (e.g. PG replication slot) - // before creating a new one. This prevents connection leaks when a cancelled - // task's reader is still active while a new task arrives. + this.streamSplit = newStreamSplit; + // Close previous stream reader (rebuild path) before creating a new one. This prevents + // connection leaks when a cancelled task's reader is still active while a new task arrives. if (this.streamReader != null) { LOG.info( "Closing previous stream reader before creating new one for job {}", diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index a4c19aedb9edd2..7dcc836efffdb4 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -434,7 +434,37 @@ private synchronized SplitReadResult prepareBinlogSplit( // Load tableSchemas from FE if available (avoids re-discover on restart) tryLoadTableSchemasFromRequest(baseReq); Tuple2 splitFlag = createBinlogSplit(offsetMeta, baseReq); - this.binlogSplit = (MySqlBinlogSplit) splitFlag.f0; + MySqlBinlogSplit newBinlogSplit = (MySqlBinlogSplit) splitFlag.f0; + + // offset guard: reuse the live binlog reader only when the request start offset equals the + // reader's real consumed position, so steady-state rounds skip reconnect + binlog + // re-position. + if (this.binlogReader != null && this.binlogSplitState != null) { + BinlogOffset requestStart = newBinlogSplit.getStartingOffset(); + BinlogOffset readerPos = this.binlogSplitState.getStartingOffset(); + if (requestStart != null + && readerPos != null + && requestStart.compareTo(readerPos) == 0) { + LOG.info( + "Reuse live binlog reader for job {} at offset {}", + baseReq.getJobId(), + requestStart); + this.binlogSplit = newBinlogSplit; + SplitReadResult reuseResult = new SplitReadResult(); + reuseResult.setSplits(Collections.singletonList(this.binlogSplit)); + Map reuseStates = new HashMap<>(); + reuseStates.put(this.binlogSplit.splitId(), this.binlogSplitState); + reuseResult.setSplitStates(reuseStates); + return reuseResult; + } + LOG.info( + "Rebuild binlog reader for job {}: requestStart={}, readerPos={}", + baseReq.getJobId(), + requestStart, + readerPos); + } + + this.binlogSplit = newBinlogSplit; // Close previous binlog reader to release resources before creating a new one. // This prevents connection leaks when a cancelled task's reader is still active