From 6cc16e1d6faf9bd972d6387981decd7db5ad39eb Mon Sep 17 00:00:00 2001 From: wudi Date: Thu, 11 Jun 2026 18:43:06 +0800 Subject: [PATCH 1/6] add binlogphase bind be --- .../job/cdc/request/WriteRecordRequest.java | 2 + .../streaming/AbstractStreamingTask.java | 4 + .../insert/streaming/StreamingInsertJob.java | 51 ++++++++- .../streaming/StreamingMultiTblTask.java | 46 ++++++-- .../apache/doris/job/manager/JobManager.java | 2 +- .../job/offset/SourceOffsetProvider.java | 3 + .../offset/jdbc/JdbcSourceOffsetProvider.java | 15 ++- .../doris/job/util/StreamingJobUtils.java | 18 +++ .../doris/cdcclient/common/Constants.java | 7 ++ .../apache/doris/cdcclient/common/Env.java | 105 +++++++++++++++++- .../controller/ClientController.java | 1 + .../service/PipelineCoordinator.java | 46 ++++++-- .../reader/JdbcIncrementalSourceReader.java | 37 +++++- 13 files changed, 308 insertions(+), 29 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java index 511e4fcea749a7..05784404c310aa 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java @@ -31,4 +31,6 @@ public class WriteRecordRequest extends JobBaseRecordRequest { private String token; private String taskId; private Map streamLoadProps; + // previous task ended abnormally, rebuild reader instead of reusing + private boolean rebuildReader; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java index 62adf21daf6014..6f395cf7a4c0ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java @@ -83,6 +83,10 @@ public List getScanBackendIds() { public abstract void closeOrReleaseResources(); + // Release the remote cdc reader (keep slot). No-op for tasks without a cdc reader (e.g. TVF). + public void releaseRemoteReader() { + } + public void execute() throws JobException { while (retryCount <= MAX_RETRY) { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 09090ba3ad184b..b88cca368556cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -75,6 +75,7 @@ import org.apache.doris.qe.ShowResultSetMetaData; import org.apache.doris.rpc.RpcException; import org.apache.doris.service.FrontendOptions; +import org.apache.doris.system.Backend; import org.apache.doris.tablefunction.S3TableValuedFunction; import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TRow; @@ -180,6 +181,15 @@ public class StreamingInsertJob extends AbstractJob syncTables; + // Incremental(binlog) phase: bound BE for reader reuse; <=0 = unbound (replay-safe default). + @SerializedName("bbe") + private volatile long boundBackendId; + + // previous task ended abnormally (or FE restarted), next dispatch must rebuild the cdc reader + @Getter + @Setter + private transient volatile boolean needRebuildReader = true; + // The sampling window starts at the beginning of the sampling window. // If the error rate exceeds `max_filter_ratio` within the window, the sampling fails. @Setter @@ -393,6 +403,7 @@ private SourceOffsetProvider createOffsetProvider(Map jdbcSource provider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType, jdbcSourceProps); } provider.setCloudCluster(this.cloudCluster); + provider.setBoundBackendId(boundBackendId); return provider; } @@ -646,6 +657,18 @@ offsetProvider, getConvertedSourceProperties(), targetDb, targetProperties, jobP getCreateUser(), cloudCluster); } + // Binlog phase: prefer the bound BE in selectBackend; rebind + persist on change. + public Backend resolveBoundBackend() throws JobException { + Backend selected = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId); + if (selected.getId() != boundBackendId) { + log.info("bind job {} to backend {} (was {})", getJobId(), selected.getId(), boundBackendId); + boundBackendId = selected.getId(); + logUpdateOperation(); + } + offsetProvider.setBoundBackendId(boundBackendId); + return selected; + } + private Map getConvertedSourceProperties() throws JobException { if (convertedSourceProperties == null) { this.convertedSourceProperties = StreamingJobUtils.convertCertFile(getDbId(), sourceProperties); @@ -766,7 +789,21 @@ public void clearRunningStreamTask(JobStatus newJobStatus) { log.info("clear running streaming insert task for job {}, task {}, status {} ", getJobId(), runningStreamTask.getTaskId(), runningStreamTask.getStatus()); runningStreamTask.cancel(JobStatus.STOPPED.equals(newJobStatus) ? false : true); - runningStreamTask.closeOrReleaseResources(); + // Reader release for manual pause is driven by the command entry; failure pause keeps it for reuse. + } + } + + // Command entry for a manual status change: reset the failure/retry budget, and on manual pause + // release the reader (keep slot). "Manual" is decided by the caller, never by reading failureReason. + public void onManualStatusAltered(JobStatus newStatus, FailureReason reason) { + lock.writeLock().lock(); + try { + resetFailureInfo(reason); + if (JobStatus.PAUSED.equals(newStatus) && runningStreamTask != null) { + runningStreamTask.releaseRemoteReader(); + } + } finally { + lock.writeLock().unlock(); } } @@ -791,9 +828,14 @@ public void onTaskSuccess(StreamingJobSchedulerTask task) throws JobException { public void onStreamTaskFail(AbstractStreamingTask task) throws JobException { try { + this.needRebuildReader = true; failedTaskCount.incrementAndGet(); Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task); - this.failureReason = new FailureReason(task.getErrMsg()); + // Don't overwrite a manual pause: a late failure callback would otherwise let auto resume wake it. + if (this.getFailureReason() == null + || !InternalErrorCode.MANUAL_PAUSE_ERR.equals(this.getFailureReason().getCode())) { + this.failureReason = new FailureReason(task.getErrMsg()); + } if (MetricRepo.isInit) { MetricRepo.COUNTER_STREAMING_JOB_TASK_FAILED_COUNT.increase(1L); } @@ -805,6 +847,7 @@ public void onStreamTaskFail(AbstractStreamingTask task) throws JobException { public void onStreamTaskSuccess(AbstractStreamingTask task) throws JobException { try { + this.needRebuildReader = false; resetFailureInfo(null); succeedTaskCount.incrementAndGet(); lastTaskSuccessTime = System.currentTimeMillis(); @@ -980,6 +1023,7 @@ public void replayOnUpdated(StreamingInsertJob replayJob) { setFailedTaskCount(replayJob.getFailedTaskCount()); setCanceledTaskCount(replayJob.getCanceledTaskCount()); setLastTaskSuccessTime(replayJob.getLastTaskSuccessTime()); + this.boundBackendId = replayJob.boundBackendId; } /** @@ -1402,6 +1446,9 @@ public void gsonPostProcess() throws IOException { if (null == lock) { this.lock = new ReentrantReadWriteLock(true); } + + // a stale reader may survive on the bound BE across FE restart/failover + this.needRebuildReader = true; } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java index 9077ad01a827cc..65b93e81f38472 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java @@ -127,7 +127,7 @@ public void run() throws JobException { } private void sendWriteRequest() throws JobException { - Backend backend = StreamingJobUtils.selectBackend(cloudCluster); + Backend backend = resolveBackend(); log.info("start to run streaming multi task {} in backend {}/{}, offset is {}", taskId, backend.getId(), backend.getHost(), runningOffset.toString()); this.runningBackendId = backend.getId(); @@ -168,13 +168,40 @@ private void sendWriteRequest() throws JobException { log.warn("cdc_client RPC timeout api=/api/writeRecords taskId={} jobId={} backend={}:{} timeout_sec={}", taskId, getJobId(), backend.getHost(), backend.getBrpcPort(), Config.streaming_cdc_heavy_rpc_timeout_sec); + // the request may have been dispatched, the retry must not reuse the reader + markJobNeedRebuildReader(); throw new JobException("cdc_client RPC timeout: /api/writeRecords taskId=" + taskId); } catch (ExecutionException | InterruptedException ex) { log.error("Send write request failed: ", ex); + markJobNeedRebuildReader(); throw new JobException(ex); } } + private Backend resolveBackend() throws JobException { + // Snapshot phase keeps per-round selection; binlog phase binds to a fixed BE for reuse. + if (((JdbcOffset) runningOffset).snapshotSplit()) { + return StreamingJobUtils.selectBackend(cloudCluster); + } + Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId()); + if (!(job instanceof StreamingInsertJob)) { + return StreamingJobUtils.selectBackend(cloudCluster); + } + return ((StreamingInsertJob) job).resolveBoundBackend(); + } + + private StreamingInsertJob getStreamingJob() { + Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId()); + return job instanceof StreamingInsertJob ? (StreamingInsertJob) job : null; + } + + private void markJobNeedRebuildReader() { + StreamingInsertJob job = getStreamingJob(); + if (job != null) { + job.setNeedRebuildReader(true); + } + } + private String getToken() throws JobException { String token = ""; try { @@ -210,6 +237,10 @@ private WriteRecordRequest buildRequestParams() throws JobException { request.setFrontendAddress(feAddr); request.setMaxInterval(jobProperties.getMaxIntervalSecond()); request.setTaskTimeoutMs(getTaskTimeoutMs()); + StreamingInsertJob job = getStreamingJob(); + if (job != null) { + request.setRebuildReader(job.isNeedRebuildReader()); + } if (offsetProvider instanceof JdbcSourceOffsetProvider) { String schemas = ((JdbcSourceOffsetProvider) offsetProvider).getTableSchemas(); if (schemas != null) { @@ -308,7 +339,7 @@ public void successCallback(CommitOffsetRequest offsetRequest) throws JobExcepti @Override protected void onFail(String errMsg) throws JobException { - // Release this task's reader before reschedule so it stops competing for the shared slot. + // stop the possibly still-running remote reader before reschedule releaseRemoteReader(); super.onFail(errMsg); } @@ -321,15 +352,12 @@ public void cancel(boolean needWaitCancelComplete) { @Override public void closeOrReleaseResources() { - // No-op: reader is shared across tasks; release on reschedule is done in onFail(). + // No-op: the reader is async and reused; releasing here (per-iteration finally) would kill it. } - /** - * Best-effort, onFail reschedule only: stop this job's reader on {@link #runningBackendId} so a - * reschedule to another backend never leaves two readers competing for the same source (e.g. one - * PG replication slot, which is kept, not dropped). Failures are swallowed and must not block - * rescheduling. - */ + // Manual pause only (best-effort): release the reader on runningBackendId, keep slot (not drop), + // so resume can rebind elsewhere without two readers on the same source. Failures swallowed; idle reaper backs up. + @Override public void releaseRemoteReader() { if (runningBackendId <= 0) { return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 23f51890da5679..8c55856da57d7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -282,7 +282,7 @@ public void alterJobStatus(String jobName, JobStatus jobStatus, FailureReason re checkSameStatus(a, jobStatus); alterJobStatus(a.getJobId(), jobStatus); if (a instanceof StreamingInsertJob) { - ((StreamingInsertJob) a).resetFailureInfo(reason); + ((StreamingInsertJob) a).onManualStatusAltered(jobStatus, reason); } } catch (JobException e) { throw new JobException("Alter job status error, jobName is %s, errorMsg is %s", diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java index a6f9e582bff91d..8336966a5bd2bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java @@ -94,6 +94,9 @@ default void initOnCreate(List syncTables) throws JobException {} */ default void setCloudCluster(String cloudCluster) {} + /** Bind the BE this job is pinned to in the binlog phase, for reader-reuse heartbeat routing. */ + default void setBoundBackendId(long boundBackendId) {} + /** * Fetch remote meta information, such as listing files in S3 or getting latest offsets in Kafka. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java index 9b7d364895bd06..fd110597f77ca3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java @@ -107,6 +107,9 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider { transient volatile String cloudCluster; + // Route fetchEndOffset/compareOffset to the bound BE (synced from job, not persisted). + transient volatile long boundBackendId; + /** Split progress (cdc-fetch view), >= committedSplitProgress. Rebuilt on restart. */ transient SplitProgress cdcSplitProgress = new SplitProgress(); @@ -254,9 +257,14 @@ public void updateOffset(Offset offset) { this.currentOffset = newOffset; } + @Override + public void setBoundBackendId(long boundBackendId) { + this.boundBackendId = boundBackendId; + } + @Override public void fetchRemoteMeta(Map properties) throws Exception { - Backend backend = StreamingJobUtils.selectBackend(cloudCluster); + Backend backend = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId); JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties, getFrontendAddress()); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() @@ -354,7 +362,7 @@ public boolean hasMoreDataToConsume() { private boolean compareOffset(Map offsetFirst, Map offsetSecond) throws JobException { - Backend backend = StreamingJobUtils.selectBackend(cloudCluster); + Backend backend = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId); CompareOffsetRequest requestParams = new CompareOffsetRequest(getJobId(), sourceType.name(), sourceProperties, getFrontendAddress(), offsetFirst, offsetSecond); @@ -1002,7 +1010,8 @@ private void initSourceReader() throws JobException { public void cleanMeta(Long jobId) throws JobException { // clean meta table StreamingJobUtils.deleteJobMeta(jobId); - Backend backend = StreamingJobUtils.selectBackend(cloudCluster); + // Route to the bound BE so close tears down its live reader; dead/unbound falls back to random. + Backend backend = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId); JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties, getFrontendAddress()); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java index cff663e0142b68..ff67d2429e1753 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java @@ -274,6 +274,11 @@ public static JdbcClient getJdbcClient(DataSourceType sourceType, Map bes = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) .getBackendsByClusterName(cloudCluster) @@ -284,10 +289,23 @@ public static Backend selectBackend(String cloudCluster) throws JobException { throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", compute_group: " + cloudCluster); } + if (preferredBackendId > 0) { + for (Backend be : bes) { + if (be.getId() == preferredBackendId) { + return be; + } + } + } int idx = getLastSelectedBackendIndexAndUpdate(); return bes.get(Math.floorMod(idx, bes.size())); } + if (preferredBackendId > 0) { + Backend bound = Env.getCurrentSystemInfo().getBackend(preferredBackendId); + if (bound != null && bound.isLoadAvailable()) { + return bound; + } + } BeSelectionPolicy policy = new BeSelectionPolicy.Builder() .setEnableRoundRobin(true).needLoadAvailable().build(); policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate(); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java index 953903a80323a6..5d8b76c860c9c5 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java @@ -25,4 +25,11 @@ public class Constants { public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 3000L; public static final String DORIS_TARGET_DB = "doris_target_db"; + + // Idle from-to reader cleanup: release (keep slot) when idle past MULTIPLIER * max_interval. + public static final long IDLE_READER_SCAN_INTERVAL_MS = 15_000L; + public static final int IDLE_READER_TIMEOUT_MULTIPLIER = 6; + // Floor the idle timeout: PG reader rebuild is costly, absorb heartbeat jitter at small + // interval. + public static final long IDLE_READER_MIN_TIMEOUT_MS = 60_000L; } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java index 28da598053b004..217f9d31532778 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java @@ -21,11 +21,15 @@ import org.apache.doris.cdcclient.source.factory.SourceReaderFactory; import org.apache.doris.cdcclient.source.reader.SourceReader; import org.apache.doris.job.cdc.request.JobBaseConfig; +import org.apache.doris.job.cdc.request.WriteRecordRequest; import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -40,6 +44,7 @@ public class Env { private static volatile Env INSTANCE; private final Map jobContexts; private final Map jobLocks; + private final ScheduledExecutorService idleReaderScheduler; @Setter private int backendHttpPort; @Setter @Getter private String clusterToken; @Setter @Getter private volatile String feMasterAddress; @@ -47,6 +52,18 @@ public class Env { private Env() { this.jobContexts = new ConcurrentHashMap<>(); this.jobLocks = new ConcurrentHashMap<>(); + this.idleReaderScheduler = + Executors.newSingleThreadScheduledExecutor( + r -> { + Thread t = new Thread(r, "cdc-idle-reader-cleaner"); + t.setDaemon(true); + return t; + }); + this.idleReaderScheduler.scheduleWithFixedDelay( + this::releaseIdleReaders, + Constants.IDLE_READER_SCAN_INTERVAL_MS, + Constants.IDLE_READER_SCAN_INTERVAL_MS, + TimeUnit.MILLISECONDS); } public String getBackendHostPort() { @@ -90,9 +107,24 @@ public SourceReader getReaderAndClaim(JobBaseConfig jobConfig, String taskId) { DataSource ds = resolveDataSource(jobConfig.getDataSource()); String jobId = jobConfig.getJobId(); Lock lock = jobLocks.computeIfAbsent(jobId, k -> new ReentrantLock()); + SourceReader staleReader = null; + JobBaseConfig staleConfig = null; + SourceReader reader; lock.lock(); try { JobContext context = jobContexts.get(jobId); + if (context != null + && jobConfig instanceof WriteRecordRequest + && ((WriteRecordRequest) jobConfig).isRebuildReader()) { + // FE declared the previous task abnormal: swap in a fresh reader instance so the + // old task's thread can never reach the new fetcher. + LOG.info( + "Rebuild reader for job {} on FE request, discard current instance", jobId); + jobContexts.remove(jobId); + staleReader = context.reader; + staleConfig = context.jobConfig != null ? context.jobConfig : jobConfig; + context = null; + } if (context == null) { LOG.info("Creating new reader for job {}, dataSource {}", jobId, ds); context = new JobContext(jobId, ds, jobConfig.getConfig()); @@ -100,10 +132,30 @@ public SourceReader getReaderAndClaim(JobBaseConfig jobConfig, String taskId) { jobContexts.put(jobId, context); } context.ownerTaskId = taskId; - return context.getReader(ds); + context.jobConfig = jobConfig; + if (jobConfig instanceof WriteRecordRequest) { + context.maxIntervalMs = ((WriteRecordRequest) jobConfig).getMaxInterval() * 1000; + } + context.lastAliveTime = System.currentTimeMillis(); + reader = context.getReader(ds); } finally { lock.unlock(); } + if (staleReader != null) { + // free the engine/slot connection before the caller submits the new fetcher + try { + staleReader.release(staleConfig); + } catch (Exception ex) { + LOG.warn("Failed to release stale reader for job {}", jobId, ex); + } + } + return reader; + } + + /** Whether {@code taskId} is still the current claimer of this job's reader. */ + public boolean isOwner(String jobId, String taskId) { + JobContext context = jobContexts.get(jobId); + return context != null && Objects.equals(context.ownerTaskId, taskId); } /** @@ -192,12 +244,63 @@ public void close(String jobId) { } } + /** Liveness evidence (FE heartbeat or active poll): keep this job's reader alive. */ + public void keepAlive(String jobId) { + JobContext context = jobContexts.get(jobId); + if (context != null) { + context.lastAliveTime = System.currentTimeMillis(); + } + } + + // Release (keep slot) readers FE no longer drives; maxIntervalMs<=0 = untracked (e.g. TVF), + // skip. + private void releaseIdleReaders() { + long now = System.currentTimeMillis(); + for (String jobId : jobContexts.keySet()) { + Lock lock = jobLocks.get(jobId); + if (lock == null || !lock.tryLock()) { + continue; + } + try { + JobContext context = jobContexts.get(jobId); + if (context == null || context.lastAliveTime <= 0 || context.maxIntervalMs <= 0) { + continue; + } + long timeout = + Math.max( + (long) Constants.IDLE_READER_TIMEOUT_MULTIPLIER + * context.maxIntervalMs, + Constants.IDLE_READER_MIN_TIMEOUT_MS); + if (now - context.lastAliveTime <= timeout) { + continue; + } + LOG.info( + "Releasing idle reader for job {}, idle {} ms, keep slot", + jobId, + now - context.lastAliveTime); + jobContexts.remove(jobId); + if (context.reader != null && context.jobConfig != null) { + try { + context.reader.release(context.jobConfig); + } catch (Exception ex) { + LOG.warn("Failed to release idle reader for job {}", jobId, ex); + } + } + } finally { + lock.unlock(); + } + } + } + private static final class JobContext { private final String jobId; private volatile SourceReader reader; private volatile String ownerTaskId; private volatile Map config; private volatile DataSource dataSource; + private volatile JobBaseConfig jobConfig; + private volatile long maxIntervalMs; + private volatile long lastAliveTime; private JobContext(String jobId, DataSource dataSource, Map config) { this.jobId = jobId; diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java index 22509a55e9846f..4ae6970373d76b 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java @@ -107,6 +107,7 @@ public Object fetchEndOffset(@RequestBody JobBaseConfig jobConfig) { LOG.info("Fetching end offset for job {}", jobConfig.getJobId()); try { SourceReader reader = Env.getCurrentEnv().getReader(jobConfig); + Env.getCurrentEnv().keepAlive(jobConfig.getJobId()); return RestResponse.success(reader.getEndOffset(jobConfig)); } catch (Exception ex) { LOG.error("Failed to fetch end offset, jobId={}", jobConfig.getJobId(), ex); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java index ebbd9c4acf1c88..1ff347c59b0aa6 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java @@ -403,7 +403,13 @@ public CompletableFuture writeRecordsAsync(WriteRecordRequest writeRecordR writeRecordRequest.getJobId(), writeRecordRequest.getTaskId()); } catch (Exception ex) { - closeJobStreamLoad(writeRecordRequest.getJobId()); + // a displaced task must not close the streamload the successor is using + if (Env.getCurrentEnv() + .isOwner( + writeRecordRequest.getJobId(), + writeRecordRequest.getTaskId())) { + closeJobStreamLoad(writeRecordRequest.getJobId()); + } String rootCauseMessage = ExceptionUtils.getRootCauseMessage(ex); taskErrorMaps.put(writeRecordRequest.getTaskId(), rootCauseMessage); LOG.error( @@ -454,6 +460,7 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception SplitReadResult readResult = null; boolean hasExecuteDDL = false; boolean isSnapshotSplit = false; + boolean stillOwner = false; try { // 1. submit split async readResult = sourceReader.prepareAndSubmitSplit(writeRecordRequest); @@ -477,6 +484,8 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception // 2. poll record while (!shouldStop) { + // Active poll keeps the reader alive so the reaper won't reclaim it mid-task. + Env.getCurrentEnv().keepAlive(writeRecordRequest.getJobId()); Iterator recordIterator = sourceReader.pollRecords(); if (!recordIterator.hasNext()) { @@ -530,6 +539,8 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception } while (recordIterator.hasNext()) { + // streamload backpressure can stall this loop past the reaper timeout + Env.getCurrentEnv().keepAlive(writeRecordRequest.getJobId()); SourceRecord element = recordIterator.next(); // Check if this is a heartbeat message @@ -591,7 +602,21 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception writeRecordRequest.getTaskId()); } finally { - cleanupReaderResources(sourceReader, writeRecordRequest.getJobId(), readResult); + stillOwner = + Env.getCurrentEnv() + .isOwner(writeRecordRequest.getJobId(), writeRecordRequest.getTaskId()); + // A displaced task must not touch the reader (finishSplitRecords would kill the + // successor's fetcher) nor commit anything. + if (stillOwner) { + cleanupReaderResources(sourceReader, writeRecordRequest.getJobId(), readResult); + } + } + if (!stillOwner) { + LOG.info( + "Skip commit for job {} task {}: reader released or taken over", + writeRecordRequest.getJobId(), + writeRecordRequest.getTaskId()); + return; } // 3. Extract offset from split state @@ -600,7 +625,6 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception batchStreamLoad.forceFlush(); // 5. request fe api update offset - String currentTaskId = batchStreamLoad.getCurrentTaskId(); // The offset must be reset before commitOffset to prevent the next taskId from being create // by the fe. batchStreamLoad.resetTaskId(); @@ -614,8 +638,9 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception if (hasExecuteDDL || (!isSnapshotSplit && feHadNoSchema)) { tableSchemas = sourceReader.serializeTableSchemas(); } + // own taskId, never the shared currentTaskId: FE rejects it if another task took over batchStreamLoad.commitOffset( - currentTaskId, + writeRecordRequest.getTaskId(), metaResponse, scannedRows, batchStreamLoad.getLoadStatistic(), @@ -742,6 +767,10 @@ public String getTaskFailReason(String taskId) { */ private void cleanupReaderResources( SourceReader sourceReader, String jobId, SplitReadResult readResult) { + boolean isSnapshotSplit = + readResult != null + && readResult.getSplit() != null + && sourceReader.isSnapshotSplit(readResult.getSplit()); try { // The LSN in the commit is the current offset, which is the offset from the last // successful write. @@ -750,11 +779,10 @@ private void cleanupReaderResources( sourceReader.commitSourceOffset(jobId, readResult.getSplit()); } } finally { - // This must be called after `commitSourceOffset`; otherwise, - // PG's confirmed lsn will not proceed. - // This operation must be performed before `batchStreamLoad.commitOffset`; - // otherwise, fe might create the next task for this job. - sourceReader.finishSplitRecords(); + // Binlog keeps the reader alive across tasks for reuse; only snapshot finishes here. + if (isSnapshotSplit) { + sourceReader.finishSplitRecords(); + } } } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index f0987eb97eec05..14f0f58f4e021c 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ -407,11 +407,40 @@ private synchronized SplitReadResult prepareStreamSplit( baseReq.getJobId()); } Tuple2 splitFlag = createStreamSplit(offsetMeta, baseReq); - this.streamSplit = splitFlag.f0.asStreamSplit(); + StreamSplit newStreamSplit = splitFlag.f0.asStreamSplit(); + + // offset guard: reuse only when request start == reader's consumed position. Compare by + // compareTo (LSN), NOT equals -- PG drops lsn_proc/commit so same position differs by map. + if (this.streamReader != null && this.streamSplitState != null) { + Offset requestStart = newStreamSplit.getStartingOffset(); + Offset readerPos = this.streamSplitState.getStartingOffset(); + if (requestStart != null + && readerPos != null + && requestStart.compareTo(readerPos) == 0) { + LOG.info( + "Reuse live stream reader for job {} at offset {}", + baseReq.getJobId(), + requestStart); + // Refresh split so commitSourceOffset advances PG confirmed_lsn to the FE-committed + // offset (== reader pos); poll/offset keep using streamSplitState. + this.streamSplit = newStreamSplit; + SplitReadResult reuseResult = new SplitReadResult(); + reuseResult.setSplits(Collections.singletonList(this.streamSplit)); + Map reuseStates = new HashMap<>(); + reuseStates.put(this.streamSplit.splitId(), this.streamSplitState); + reuseResult.setSplitStates(reuseStates); + return reuseResult; + } + LOG.info( + "Rebuild stream reader for job {}: requestStart={}, readerPos={}", + baseReq.getJobId(), + requestStart, + readerPos); + } - // Close previous stream reader to release resources (e.g. PG replication slot) - // before creating a new one. This prevents connection leaks when a cancelled - // task's reader is still active while a new task arrives. + this.streamSplit = newStreamSplit; + // Close previous stream reader (rebuild path) before creating a new one. This prevents + // connection leaks when a cancelled task's reader is still active while a new task arrives. if (this.streamReader != null) { LOG.info( "Closing previous stream reader before creating new one for job {}", From 7ce1a562e4958c1c8c674ed77c1ff0955cb24555 Mon Sep 17 00:00:00 2001 From: wudi Date: Thu, 11 Jun 2026 23:09:33 +0800 Subject: [PATCH 2/6] fail task directly on write rpc timeout to avoid same-taskId in-place retry --- .../insert/streaming/AbstractStreamingTask.java | 7 +++++-- .../insert/streaming/StreamingMultiTblTask.java | 13 +++---------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java index 6f395cf7a4c0ed..5ad3ce5e2db5d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java @@ -43,6 +43,8 @@ public abstract class AbstractStreamingTask { private static final int MAX_RETRY = 3; private static final String LABEL_SPLITTER = "_"; private int retryCount = 0; + // in-place retry would reuse this taskId, breaking ownership-based zombie isolation + protected volatile boolean noRetry; protected String labelName; protected Offset runningOffset; protected UserIdentity userIdentity; @@ -100,8 +102,9 @@ public void execute() throws JobException { } this.errMsg = e.getMessage(); retryCount++; - if (retryCount > MAX_RETRY) { - log.error("Task execution failed after {} retries.", MAX_RETRY, e); + if (noRetry || retryCount > MAX_RETRY) { + log.error("Task execution failed, job id {}, task id {}, noRetry {}, retry {}.", + jobId, taskId, noRetry, retryCount, e); onFail(e.getMessage()); return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java index 65b93e81f38472..07ffd303e529c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java @@ -168,12 +168,12 @@ private void sendWriteRequest() throws JobException { log.warn("cdc_client RPC timeout api=/api/writeRecords taskId={} jobId={} backend={}:{} timeout_sec={}", taskId, getJobId(), backend.getHost(), backend.getBrpcPort(), Config.streaming_cdc_heavy_rpc_timeout_sec); - // the request may have been dispatched, the retry must not reuse the reader - markJobNeedRebuildReader(); + // the request may have been dispatched and still running remotely + noRetry = true; throw new JobException("cdc_client RPC timeout: /api/writeRecords taskId=" + taskId); } catch (ExecutionException | InterruptedException ex) { log.error("Send write request failed: ", ex); - markJobNeedRebuildReader(); + noRetry = true; throw new JobException(ex); } } @@ -195,13 +195,6 @@ private StreamingInsertJob getStreamingJob() { return job instanceof StreamingInsertJob ? (StreamingInsertJob) job : null; } - private void markJobNeedRebuildReader() { - StreamingInsertJob job = getStreamingJob(); - if (job != null) { - job.setNeedRebuildReader(true); - } - } - private String getToken() throws JobException { String token = ""; try { From 9437a3161fe6d25f1c19e4dc5ef5f94ae27de6db Mon Sep 17 00:00:00 2001 From: wudi Date: Fri, 12 Jun 2026 13:46:21 +0800 Subject: [PATCH 3/6] [improve](streaming-job) reuse binlog reader for mysql and tidy bound-BE binding --- .../insert/streaming/StreamingInsertJob.java | 2 ++ .../streaming/StreamingMultiTblTask.java | 19 +++++------ .../reader/mysql/MySqlSourceReader.java | 32 ++++++++++++++++++- 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index b88cca368556cc..fed5f734d32a26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -1575,6 +1575,8 @@ public void replayOffsetProviderIfNeed() throws JobException { if (offsetProvider != null) { // when fe restart, offsetProvider.jobId/sourceProperties may be null offsetProvider.ensureInitialized(getJobId(), getProviderProps()); + // replayOnUpdated skips the transient provider; resync routing BE. + offsetProvider.setBoundBackendId(boundBackendId); offsetProvider.replayIfNeed(this); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java index 07ffd303e529c4..9b5aa1cbe0010c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java @@ -183,16 +183,16 @@ private Backend resolveBackend() throws JobException { if (((JdbcOffset) runningOffset).snapshotSplit()) { return StreamingJobUtils.selectBackend(cloudCluster); } - Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId()); - if (!(job instanceof StreamingInsertJob)) { - return StreamingJobUtils.selectBackend(cloudCluster); - } - return ((StreamingInsertJob) job).resolveBoundBackend(); + return getStreamingJob().resolveBoundBackend(); } - private StreamingInsertJob getStreamingJob() { + // Fail loud on a dropped/wrong-type job rather than return null and risk a downstream NPE. + private StreamingInsertJob getStreamingJob() throws JobException { Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId()); - return job instanceof StreamingInsertJob ? (StreamingInsertJob) job : null; + if (job == null) { + throw new JobException("Streaming job " + getJobId() + " not found"); + } + return (StreamingInsertJob) job; } private String getToken() throws JobException { @@ -230,10 +230,7 @@ private WriteRecordRequest buildRequestParams() throws JobException { request.setFrontendAddress(feAddr); request.setMaxInterval(jobProperties.getMaxIntervalSecond()); request.setTaskTimeoutMs(getTaskTimeoutMs()); - StreamingInsertJob job = getStreamingJob(); - if (job != null) { - request.setRebuildReader(job.isNeedRebuildReader()); - } + request.setRebuildReader(getStreamingJob().isNeedRebuildReader()); if (offsetProvider instanceof JdbcSourceOffsetProvider) { String schemas = ((JdbcSourceOffsetProvider) offsetProvider).getTableSchemas(); if (schemas != null) { diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index a4c19aedb9edd2..7dcc836efffdb4 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -434,7 +434,37 @@ private synchronized SplitReadResult prepareBinlogSplit( // Load tableSchemas from FE if available (avoids re-discover on restart) tryLoadTableSchemasFromRequest(baseReq); Tuple2 splitFlag = createBinlogSplit(offsetMeta, baseReq); - this.binlogSplit = (MySqlBinlogSplit) splitFlag.f0; + MySqlBinlogSplit newBinlogSplit = (MySqlBinlogSplit) splitFlag.f0; + + // offset guard: reuse the live binlog reader only when the request start offset equals the + // reader's real consumed position, so steady-state rounds skip reconnect + binlog + // re-position. + if (this.binlogReader != null && this.binlogSplitState != null) { + BinlogOffset requestStart = newBinlogSplit.getStartingOffset(); + BinlogOffset readerPos = this.binlogSplitState.getStartingOffset(); + if (requestStart != null + && readerPos != null + && requestStart.compareTo(readerPos) == 0) { + LOG.info( + "Reuse live binlog reader for job {} at offset {}", + baseReq.getJobId(), + requestStart); + this.binlogSplit = newBinlogSplit; + SplitReadResult reuseResult = new SplitReadResult(); + reuseResult.setSplits(Collections.singletonList(this.binlogSplit)); + Map reuseStates = new HashMap<>(); + reuseStates.put(this.binlogSplit.splitId(), this.binlogSplitState); + reuseResult.setSplitStates(reuseStates); + return reuseResult; + } + LOG.info( + "Rebuild binlog reader for job {}: requestStart={}, readerPos={}", + baseReq.getJobId(), + requestStart, + readerPos); + } + + this.binlogSplit = newBinlogSplit; // Close previous binlog reader to release resources before creating a new one. // This prevents connection leaks when a cancelled task's reader is still active From b3ac27b17acb4514d9dfd14e41deb9117f5d1581 Mon Sep 17 00:00:00 2001 From: wudi Date: Fri, 12 Jun 2026 14:32:03 +0800 Subject: [PATCH 4/6] [improve](streaming-job) fix tvf task cleanup on pause and harden idle reader reaping --- .../insert/streaming/StreamingInsertJob.java | 2 +- .../streaming/StreamingMultiTblTask.java | 9 +++++---- .../doris/cdcclient/common/Constants.java | 4 ++-- .../apache/doris/cdcclient/common/Env.java | 19 ++++++++++++------- 4 files changed, 20 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index fed5f734d32a26..bd129b39a0fd33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -789,7 +789,7 @@ public void clearRunningStreamTask(JobStatus newJobStatus) { log.info("clear running streaming insert task for job {}, task {}, status {} ", getJobId(), runningStreamTask.getTaskId(), runningStreamTask.getStatus()); runningStreamTask.cancel(JobStatus.STOPPED.equals(newJobStatus) ? false : true); - // Reader release for manual pause is driven by the command entry; failure pause keeps it for reuse. + runningStreamTask.closeOrReleaseResources(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java index 9b5aa1cbe0010c..dcc84d11392767 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java @@ -329,14 +329,15 @@ public void successCallback(CommitOffsetRequest offsetRequest) throws JobExcepti @Override protected void onFail(String errMsg) throws JobException { - // stop the possibly still-running remote reader before reschedule + // Stop a possibly still-running reader now, so the PG slot frees before auto-resume re-acquires it. releaseRemoteReader(); super.onFail(errMsg); } @Override public void cancel(boolean needWaitCancelComplete) { - // No release here: DROP/STOP/PAUSE clean up via /api/close; releasing would orphan the engine. + // No release here: drop/stop free via /api/close and manual pause via /api/releaseReader; + // releasing in cancel would orphan the reused engine. super.cancel(needWaitCancelComplete); } @@ -345,8 +346,8 @@ public void closeOrReleaseResources() { // No-op: the reader is async and reused; releasing here (per-iteration finally) would kill it. } - // Manual pause only (best-effort): release the reader on runningBackendId, keep slot (not drop), - // so resume can rebind elsewhere without two readers on the same source. Failures swallowed; idle reaper backs up. + // Best-effort release on runningBackendId (keep slot): on task failure to stop a stuck/zombie + // reader early, and on manual pause so resume can rebind. Failures swallowed; idle reaper backs up. @Override public void releaseRemoteReader() { if (runningBackendId <= 0) { diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java index 5d8b76c860c9c5..6d3f735730b3fc 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java @@ -28,8 +28,8 @@ public class Constants { // Idle from-to reader cleanup: release (keep slot) when idle past MULTIPLIER * max_interval. public static final long IDLE_READER_SCAN_INTERVAL_MS = 15_000L; - public static final int IDLE_READER_TIMEOUT_MULTIPLIER = 6; + public static final int IDLE_READER_TIMEOUT_MULTIPLIER = 10; // Floor the idle timeout: PG reader rebuild is costly, absorb heartbeat jitter at small // interval. - public static final long IDLE_READER_MIN_TIMEOUT_MS = 60_000L; + public static final long IDLE_READER_MIN_TIMEOUT_MS = 90_000L; } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java index 217f9d31532778..5727a77730bff6 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java @@ -261,6 +261,8 @@ private void releaseIdleReaders() { if (lock == null || !lock.tryLock()) { continue; } + SourceReader toRelease = null; + JobBaseConfig releaseConfig = null; try { JobContext context = jobContexts.get(jobId); if (context == null || context.lastAliveTime <= 0 || context.maxIntervalMs <= 0) { @@ -279,16 +281,19 @@ private void releaseIdleReaders() { jobId, now - context.lastAliveTime); jobContexts.remove(jobId); - if (context.reader != null && context.jobConfig != null) { - try { - context.reader.release(context.jobConfig); - } catch (Exception ex) { - LOG.warn("Failed to release idle reader for job {}", jobId, ex); - } - } + toRelease = context.reader; + releaseConfig = context.jobConfig; } finally { lock.unlock(); } + // Release outside the lock so blocking IO never stalls getReaderAndClaim/detach. + if (toRelease != null && releaseConfig != null) { + try { + toRelease.release(releaseConfig); + } catch (Exception ex) { + LOG.warn("Failed to release idle reader for job {}", jobId, ex); + } + } } } From 18ca280180a97c7c8b4d52424c6a193a92f6e52e Mon Sep 17 00:00:00 2001 From: wudi Date: Fri, 12 Jun 2026 15:12:54 +0800 Subject: [PATCH 5/6] [improve](streaming-job) force reader rebuild on manual resume to avoid sharing a still-polling reader --- .../job/extensions/insert/streaming/StreamingInsertJob.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index bd129b39a0fd33..7d82c1a45c80d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -800,6 +800,8 @@ public void onManualStatusAltered(JobStatus newStatus, FailureReason reason) { try { resetFailureInfo(reason); if (JobStatus.PAUSED.equals(newStatus) && runningStreamTask != null) { + // Force resume to swap in a fresh reader, in case the release RPC races or fails. + this.needRebuildReader = true; runningStreamTask.releaseRemoteReader(); } } finally { From 328ae0eba693c30c6f55b22385ef5aa10d005d19 Mon Sep 17 00:00:00 2001 From: wudi Date: Fri, 12 Jun 2026 16:54:12 +0800 Subject: [PATCH 6/6] [improve](streaming-job) route drop-job close to the backend owning the live reader --- .../streaming/AbstractStreamingTask.java | 4 ++++ .../insert/streaming/StreamingInsertJob.java | 3 ++- .../streaming/StreamingMultiTblTask.java | 5 ++++ .../offset/jdbc/JdbcSourceOffsetProvider.java | 23 ++++++++++++++++--- 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java index 5ad3ce5e2db5d2..224fe7c5dbb610 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java @@ -89,6 +89,10 @@ public List getScanBackendIds() { public void releaseRemoteReader() { } + public long getRunningBackendId() { + return -1; + } + public void execute() throws JobException { while (retryCount <= MAX_RETRY) { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 7d82c1a45c80d8..df07640370b81a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -1621,7 +1621,8 @@ public void cleanup() throws JobException { } catch (JobException ex) { log.warn("refresh provider props before cleanMeta failed, job id: {}", getJobId(), ex); } - ((JdbcSourceOffsetProvider) this.offsetProvider).cleanMeta(getJobId()); + long runtimeBackendId = runningStreamTask != null ? runningStreamTask.getRunningBackendId() : -1; + ((JdbcSourceOffsetProvider) this.offsetProvider).cleanMeta(getJobId(), runtimeBackendId); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java index dcc84d11392767..3d3fb5bda114d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java @@ -346,6 +346,11 @@ public void closeOrReleaseResources() { // No-op: the reader is async and reused; releasing here (per-iteration finally) would kill it. } + @Override + public long getRunningBackendId() { + return runningBackendId; + } + // Best-effort release on runningBackendId (keep slot): on task failure to stop a stuck/zombie // reader early, and on manual pause so resume can rebind. Failures swallowed; idle reaper backs up. @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java index fd110597f77ca3..e2662844fd4003 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java @@ -1007,11 +1007,20 @@ private void initSourceReader() throws JobException { } } - public void cleanMeta(Long jobId) throws JobException { + public void cleanMeta(Long jobId, long runtimeBackendId) throws JobException { // clean meta table StreamingJobUtils.deleteJobMeta(jobId); - // Route to the bound BE so close tears down its live reader; dead/unbound falls back to random. - Backend backend = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId); + // Dropping the slot only succeeds on the BE owning the live reader (it stops its own engine + // first, freeing the slot). Prefer the runtime BE (covers the unbound snapshot phase), then + // the bound BE; both may be alive but not load-available, so route by isAlive. Only when + // neither is alive fall back to a random BE to drop the now-inactive slot. + Backend backend = aliveBackend(runtimeBackendId); + if (backend == null) { + backend = aliveBackend(boundBackendId); + } + if (backend == null) { + backend = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId); + } JobBaseConfig requestParams = new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties, getFrontendAddress()); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() @@ -1036,6 +1045,14 @@ public void cleanMeta(Long jobId) throws JobException { } } + private static Backend aliveBackend(long backendId) { + if (backendId <= 0) { + return null; + } + Backend be = Env.getCurrentSystemInfo().getBackend(backendId); + return be != null && be.isAlive() ? be : null; + } + private String getFrontendAddress() { return Env.getCurrentEnv().getMasterHost() + ":" + Env.getCurrentEnv().getMasterHttpPort(); }