[improve](streamingjob) add binlogphase bind be#64423
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
/review |
There was a problem hiding this comment.
Pull request overview
This PR improves streaming insert (CDC) binlog-phase stability and reader reuse by pinning binlog execution to a bound BE, preventing displaced tasks from interfering with a successor task’s reader/streamload, and adding an idle-reader reaper on the cdc_client side to release unused readers while keeping upstream slots.
Changes:
- Add BE binding/preference for binlog phase (FE selects preferred BE; offset provider routes RPCs to the bound BE).
- Add cdc_client-side reader ownership + liveness tracking (keepAlive) and an idle-reader cleanup scheduler.
- Prevent displaced tasks from closing shared resources and ensure commit/cleanup uses the owning taskId.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java | Add offset-based guard to reuse/rebuild the live stream reader. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java | Ownership checks to avoid displaced-task cleanup/commit; keepAlive during polling; adjust finishSplitRecords behavior for binlog reuse. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java | keepAlive heartbeat when fetching end offset. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java | Add ownership, keepAlive, FE-requested rebuild, and scheduled idle-reader release. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java | Add idle reader cleanup timing constants. |
| fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java | Add selectBackend overload to prefer a bound backendId. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java | Add bound-backend binding hook for providers. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java | Route fetch/compare/clean RPCs to bound BE; store boundBackendId transiently. |
| fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java | Route manual status alter handling to StreamingInsertJob.onManualStatusAltered. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java | Bind BE selection in binlog phase; mark rebuildReader on RPC errors/timeouts; set rebuildReader on requests; adjust release semantics. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java | Persist boundBackendId; track needRebuildReader; release reader on manual pause; avoid overwriting manual pause failure reason. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java | Add default releaseRemoteReader hook for tasks without CDC readers. |
| fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java | Add rebuildReader flag for FE→cdc_client coordination. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } catch (ExecutionException | InterruptedException ex) { | ||
| log.error("Send write request failed: ", ex); | ||
| markJobNeedRebuildReader(); | ||
| throw new JobException(ex); | ||
| } |
| /** Liveness evidence (FE heartbeat or active poll): keep this job's reader alive. */ | ||
| public void keepAlive(String jobId) { | ||
| JobContext context = jobContexts.get(jobId); | ||
| if (context != null) { | ||
| context.lastAliveTime = System.currentTimeMillis(); | ||
| } | ||
| } |
| jobContexts.remove(jobId); | ||
| if (context.reader != null && context.jobConfig != null) { | ||
| try { | ||
| context.reader.release(context.jobConfig); | ||
| } catch (Exception ex) { |
| boolean isSnapshotSplit = | ||
| readResult != null | ||
| && readResult.getSplit() != null | ||
| && sourceReader.isSnapshotSplit(readResult.getSplit()); | ||
| try { |
|
run buildall |
There was a problem hiding this comment.
I reviewed the actual 13-file PR diff from GitHub. The goal is to pin incremental/binlog streaming work to a BE for reader reuse and rebuild/release readers when needed. The implementation is focused, but I found two lifecycle/data-correctness issues that need to be fixed before approval.\n\nCritical checkpoint conclusions:\n- Goal/test proof: the goal is only partially met; there are no PR tests covering BE rebind, displaced tasks, manual pause, or reader reuse failure paths.\n- Concurrency/lifecycle: not safe yet. Reader ownership can move while job-scoped stream-load state remains, and BE rebinding can leave the old live reader running.\n- Data writes/transactionality: blocking issue. Rows buffered by a displaced task can be flushed by a later task without the old offset being committed.\n- Persistence/compatibility: boundBackendId is persisted, and I did not find a thrift/storage-format change. The remaining concern is operational lifecycle during rebind/failover rather than serialization compatibility.\n- Configuration/observability: no new config items; logs are present, but they do not compensate for the lifecycle gaps.\n- Parallel paths: snapshot vs binlog behavior was considered. The issues are specific to the new binlog reader reuse/rebind path and displaced async writes.\n\nUser focus: no additional user-provided focus points were present.
| } | ||
| } | ||
| if (!stillOwner) { | ||
| LOG.info( |
There was a problem hiding this comment.
This early return leaves the job-scoped DorisBatchStreamLoad in batchStreamLoadMap untouched. The path is reachable after a task has already called writeRecord() and then loses ownership because FE released/rebuilt/rebound the reader; in that case the old task skips forceFlush() and commitOffset(), but its rows can still remain in bufferMap/flushQueue. The next task reuses the same loader in getOrCreateBatchStreamLoad() and only clears LoadStatistic, so it can flush the previous task's uncommitted rows under the new task while FE never advanced the old offset. Please discard or isolate the stream-load buffers when skipping commit, without closing a successor task's active loader.
|
|
||
| // Binlog phase: prefer the bound BE in selectBackend; rebind + persist on change. | ||
| public Backend resolveBoundBackend() throws JobException { | ||
| Backend selected = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId); |
There was a problem hiding this comment.
When the preferred BE is no longer load-available, this rebinds the job to a new BE but does not release the live reader on the old boundBackendId. isLoadAvailable() can be false while the BE process is still alive, and the binlog path intentionally keeps the old cdc_client reader open across tasks. Dispatching the next binlog task to the new BE can therefore create a second reader for the same source/PG replication slot or duplicate MySQL binlog consumption until the idle reaper eventually runs. Please cleanly release the previous bound BE's reader before persisting the new binding, or prevent rebind until that reader is known stopped.
TPC-H: Total hot run time: 29024 ms |
TPC-DS: Total hot run time: 168887 ms |
|
/review |
|
run buildall |
1 similar comment
|
run buildall |
There was a problem hiding this comment.
I found two additional blocking issues beyond the existing inline review threads: the persisted bound BE is not replayed into the transient offset provider, and the CDC client still treats a point-in-time ownership check as a lease while it continues to touch the shared reader/loader state.
Critical checkpoint conclusions:
- Goal/test: the PR goal is clear, but I did not run tests in this review-only pass; lifecycle/concurrency tests around FE failover and task takeover would be needed.
- Focused change: the change is focused on CDC reader reuse and bound-BE routing.
- Concurrency/locks/lifecycle: not satisfied. Existing threads already cover several races/leaks, and the added positive-owner race below is another blocker.
- Persistence/replay/data consistency: not satisfied because journal replay can leave provider routing stale after
boundBackendIdchanges. - Config/compatibility/FE-BE variables: no additional compatibility issue found beyond the bound-BE routing sync problem.
- Tests/observability/performance: no additional test evidence was provided in this review context; no separate performance regression identified.
User focus: no additional user-provided review focus was present, so I performed the full PR review without a special focus area.
| @@ -591,7 +602,21 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception | |||
| writeRecordRequest.getTaskId()); | |||
|
|
|||
| } finally { | |||
There was a problem hiding this comment.
This is only a point-in-time ownership read. A stale task can observe stillOwner == true, then the FE timeout/auto-resume path can dispatch a successor that calls getReaderAndClaim() before this method reaches cleanupReaderResources(), forceFlush(), resetTaskId(), or commitOffset(). Those later operations use the job-scoped DorisBatchStreamLoad; the successor can already have changed currentTaskId/load props and cleared stats, so the old task can flush or commit mixed state after it no longer owns the reader. This is distinct from the existing !stillOwner early-return thread: here the stale task first observes true and then loses ownership before the guarded operations execute. The same lease gap exists in the catch-side closeJobStreamLoad() guard above. Please make the owner-to-cleanup/flush/commit decision atomic with ownership transition, or revalidate under an owned lease immediately before touching the reader/loader and committing.
| public void onStreamTaskSuccess(AbstractStreamingTask task) throws JobException { | ||
| try { | ||
| this.needRebuildReader = false; | ||
| resetFailureInfo(null); |
There was a problem hiding this comment.
This copies the persisted binding into the job, but the existing JdbcSourceOffsetProvider keeps its transient boundBackendId unchanged. gsonPostProcess() covers full-image load because createOffsetProvider() sets the bound BE, but journal replay uses this method on an already-created provider. If this follower becomes master before another binlog dispatch calls resolveBoundBackend(), handleRunningState().fetchMeta() or the scheduler's hasMoreDataToConsume() can route provider RPCs (fetchEndOffset/compareOffset, and later cleanup) with boundBackendId == 0 or a stale value, probing or closing a random BE instead of the persisted bound BE. Please sync offsetProvider.setBoundBackendId(boundBackendId) whenever replay/copy updates this field.
TPC-H: Total hot run time: 28958 ms |
TPC-H: Total hot run time: 28970 ms |
TPC-DS: Total hot run time: 168893 ms |
TPC-DS: Total hot run time: 169481 ms |
FE Regression Coverage ReportIncrement line coverage |
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)