Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ public class WriteRecordRequest extends JobBaseRecordRequest {
private String token;
private String taskId;
private Map<String, String> streamLoadProps;
// previous task ended abnormally, rebuild reader instead of reusing
private boolean rebuildReader;
Comment thread
JNSimba marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public abstract class AbstractStreamingTask {
private static final int MAX_RETRY = 3;
private static final String LABEL_SPLITTER = "_";
private int retryCount = 0;
// in-place retry would reuse this taskId, breaking ownership-based zombie isolation
protected volatile boolean noRetry;
protected String labelName;
protected Offset runningOffset;
protected UserIdentity userIdentity;
Expand Down Expand Up @@ -83,6 +85,14 @@ public List<Long> getScanBackendIds() {

public abstract void closeOrReleaseResources();

/**
* Releases the remote cdc reader held by this task while keeping the slot.
* The base implementation is intentionally empty: it is a no-op for tasks without a cdc reader
* (e.g. TVF). Task types that own a reader override it.
*/
public void releaseRemoteReader() {
}

/**
* Returns the id of the backend this task is running on.
* The base implementation always returns -1.
* NOTE(review): callers visible in this change treat ids <= 0 as "no backend" - confirm before
* introducing any other sentinel.
*/
public long getRunningBackendId() {
return -1;
}

public void execute() throws JobException {
while (retryCount <= MAX_RETRY) {
try {
Expand All @@ -96,8 +106,9 @@ public void execute() throws JobException {
}
this.errMsg = e.getMessage();
retryCount++;
if (retryCount > MAX_RETRY) {
log.error("Task execution failed after {} retries.", MAX_RETRY, e);
if (noRetry || retryCount > MAX_RETRY) {
log.error("Task execution failed, job id {}, task id {}, noRetry {}, retry {}.",
jobId, taskId, noRetry, retryCount, e);
onFail(e.getMessage());
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
import org.apache.doris.tablefunction.S3TableValuedFunction;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
Expand Down Expand Up @@ -180,6 +181,15 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
@SerializedName("st")
private List<String> syncTables;

// Incremental(binlog) phase: bound BE for reader reuse; <=0 = unbound (replay-safe default).
@SerializedName("bbe")
private volatile long boundBackendId;

// previous task ended abnormally (or FE restarted), next dispatch must rebuild the cdc reader
@Getter
@Setter
private transient volatile boolean needRebuildReader = true;

// The sampling window starts at the beginning of the sampling window.
// If the error rate exceeds `max_filter_ratio` within the window, the sampling fails.
@Setter
Expand Down Expand Up @@ -393,6 +403,7 @@ private SourceOffsetProvider createOffsetProvider(Map<String, String> jdbcSource
provider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType, jdbcSourceProps);
}
provider.setCloudCluster(this.cloudCluster);
provider.setBoundBackendId(boundBackendId);
return provider;
}

Expand Down Expand Up @@ -646,6 +657,18 @@ offsetProvider, getConvertedSourceProperties(), targetDb, targetProperties, jobP
getCreateUser(), cloudCluster);
}

// Binlog phase: prefer the bound BE in selectBackend; rebind + persist on change.
public Backend resolveBoundBackend() throws JobException {
Backend selected = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId);
Comment thread
JNSimba marked this conversation as resolved.
if (selected.getId() != boundBackendId) {
log.info("bind job {} to backend {} (was {})", getJobId(), selected.getId(), boundBackendId);
boundBackendId = selected.getId();
logUpdateOperation();
}
offsetProvider.setBoundBackendId(boundBackendId);
return selected;
}

private Map<String, String> getConvertedSourceProperties() throws JobException {
if (convertedSourceProperties == null) {
this.convertedSourceProperties = StreamingJobUtils.convertCertFile(getDbId(), sourceProperties);
Expand Down Expand Up @@ -770,6 +793,22 @@ public void clearRunningStreamTask(JobStatus newJobStatus) {
}
}

// Command entry for a manual status change: reset the failure/retry budget, and on manual pause
// release the reader (keep slot). "Manual" is decided by the caller, never by reading failureReason.
public void onManualStatusAltered(JobStatus newStatus, FailureReason reason) {
Comment thread
JNSimba marked this conversation as resolved.
lock.writeLock().lock();
Comment thread
JNSimba marked this conversation as resolved.
try {
resetFailureInfo(reason);
if (JobStatus.PAUSED.equals(newStatus) && runningStreamTask != null) {
// Force resume to swap in a fresh reader, in case the release RPC races or fails.
this.needRebuildReader = true;
runningStreamTask.releaseRemoteReader();
}
} finally {
lock.writeLock().unlock();
}
}

public boolean hasMoreDataToConsume() {
return offsetProvider.hasMoreDataToConsume();
}
Expand All @@ -791,9 +830,14 @@ public void onTaskSuccess(StreamingJobSchedulerTask task) throws JobException {

public void onStreamTaskFail(AbstractStreamingTask task) throws JobException {
try {
this.needRebuildReader = true;
failedTaskCount.incrementAndGet();
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
this.failureReason = new FailureReason(task.getErrMsg());
// Don't overwrite a manual pause: a late failure callback would otherwise let auto resume wake it.
if (this.getFailureReason() == null
|| !InternalErrorCode.MANUAL_PAUSE_ERR.equals(this.getFailureReason().getCode())) {
this.failureReason = new FailureReason(task.getErrMsg());
}
if (MetricRepo.isInit) {
MetricRepo.COUNTER_STREAMING_JOB_TASK_FAILED_COUNT.increase(1L);
}
Expand All @@ -805,6 +849,7 @@ public void onStreamTaskFail(AbstractStreamingTask task) throws JobException {

public void onStreamTaskSuccess(AbstractStreamingTask task) throws JobException {
try {
this.needRebuildReader = false;
resetFailureInfo(null);
Comment thread
JNSimba marked this conversation as resolved.
succeedTaskCount.incrementAndGet();
lastTaskSuccessTime = System.currentTimeMillis();
Expand Down Expand Up @@ -980,6 +1025,7 @@ public void replayOnUpdated(StreamingInsertJob replayJob) {
setFailedTaskCount(replayJob.getFailedTaskCount());
setCanceledTaskCount(replayJob.getCanceledTaskCount());
setLastTaskSuccessTime(replayJob.getLastTaskSuccessTime());
this.boundBackendId = replayJob.boundBackendId;
}

/**
Expand Down Expand Up @@ -1402,6 +1448,9 @@ public void gsonPostProcess() throws IOException {
if (null == lock) {
this.lock = new ReentrantReadWriteLock(true);
}

// a stale reader may survive on the bound BE across FE restart/failover
this.needRebuildReader = true;
}

/**
Expand Down Expand Up @@ -1528,6 +1577,8 @@ public void replayOffsetProviderIfNeed() throws JobException {
if (offsetProvider != null) {
// when fe restart, offsetProvider.jobId/sourceProperties may be null
offsetProvider.ensureInitialized(getJobId(), getProviderProps());
// replayOnUpdated skips the transient provider; resync routing BE.
offsetProvider.setBoundBackendId(boundBackendId);
offsetProvider.replayIfNeed(this);
}
}
Expand Down Expand Up @@ -1570,7 +1621,8 @@ public void cleanup() throws JobException {
} catch (JobException ex) {
log.warn("refresh provider props before cleanMeta failed, job id: {}", getJobId(), ex);
}
((JdbcSourceOffsetProvider) this.offsetProvider).cleanMeta(getJobId());
long runtimeBackendId = runningStreamTask != null ? runningStreamTask.getRunningBackendId() : -1;
((JdbcSourceOffsetProvider) this.offsetProvider).cleanMeta(getJobId(), runtimeBackendId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void run() throws JobException {
}

private void sendWriteRequest() throws JobException {
Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
Backend backend = resolveBackend();
log.info("start to run streaming multi task {} in backend {}/{}, offset is {}",
taskId, backend.getId(), backend.getHost(), runningOffset.toString());
this.runningBackendId = backend.getId();
Expand Down Expand Up @@ -168,13 +168,33 @@ private void sendWriteRequest() throws JobException {
log.warn("cdc_client RPC timeout api=/api/writeRecords taskId={} jobId={} backend={}:{} timeout_sec={}",
taskId, getJobId(), backend.getHost(), backend.getBrpcPort(),
Config.streaming_cdc_heavy_rpc_timeout_sec);
// the request may have been dispatched and still running remotely
noRetry = true;
throw new JobException("cdc_client RPC timeout: /api/writeRecords taskId=" + taskId);
} catch (ExecutionException | InterruptedException ex) {
log.error("Send write request failed: ", ex);
noRetry = true;
throw new JobException(ex);
}
}

/**
 * Chooses the backend for the current round.
 * Snapshot phase keeps per-round selection; binlog phase binds to a fixed BE for reuse.
 *
 * @throws JobException propagated from backend selection or job lookup
 */
private Backend resolveBackend() throws JobException {
    final boolean inSnapshotPhase = ((JdbcOffset) runningOffset).snapshotSplit();
    return inSnapshotPhase
            ? StreamingJobUtils.selectBackend(cloudCluster)
            : getStreamingJob().resolveBoundBackend();
}

/**
 * Looks up the streaming job that owns this task.
 * Fails loud on a dropped or wrong-type job: a missing job would otherwise surface as a
 * downstream NPE, and a wrong-type job as an unchecked ClassCastException that bypasses the
 * JobException-based error handling of the callers.
 *
 * @return the owning job, never null
 * @throws JobException if the job no longer exists or is not a StreamingInsertJob
 */
private StreamingInsertJob getStreamingJob() throws JobException {
    Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
    if (job == null) {
        throw new JobException("Streaming job " + getJobId() + " not found");
    }
    // Check the type explicitly so a mismatch is reported as a JobException, not a ClassCastException.
    if (!(job instanceof StreamingInsertJob)) {
        throw new JobException("Job " + getJobId() + " is not a streaming insert job, actual type: "
                + job.getClass().getSimpleName());
    }
    return (StreamingInsertJob) job;
}

private String getToken() throws JobException {
String token = "";
try {
Expand Down Expand Up @@ -210,6 +230,7 @@ private WriteRecordRequest buildRequestParams() throws JobException {
request.setFrontendAddress(feAddr);
request.setMaxInterval(jobProperties.getMaxIntervalSecond());
request.setTaskTimeoutMs(getTaskTimeoutMs());
request.setRebuildReader(getStreamingJob().isNeedRebuildReader());
if (offsetProvider instanceof JdbcSourceOffsetProvider) {
String schemas = ((JdbcSourceOffsetProvider) offsetProvider).getTableSchemas();
if (schemas != null) {
Expand Down Expand Up @@ -308,28 +329,31 @@ public void successCallback(CommitOffsetRequest offsetRequest) throws JobExcepti

@Override
protected void onFail(String errMsg) throws JobException {
// Release this task's reader before delegating to the base failure handling: this stops a
// possibly still-running reader from competing for the shared slot, so the PG slot frees
// before auto-resume re-acquires it.
releaseRemoteReader();
super.onFail(errMsg);
}

@Override
public void cancel(boolean needWaitCancelComplete) {
// No release here: drop/stop free via /api/close and manual pause via /api/releaseReader;
// releasing in cancel would orphan the reused engine.
super.cancel(needWaitCancelComplete);
}

@Override
public void closeOrReleaseResources() {
// No-op: the reader is async and reused across tasks; releasing here (per-iteration finally)
// would kill it. The release on task failure is done in onFail().
}

/**
* Returns the value of {@link #runningBackendId}, i.e. the id of the backend recorded when the
* write request was last dispatched.
*/
@Override
public long getRunningBackendId() {
return runningBackendId;
}

// Best-effort release on runningBackendId (keep slot): on task failure to stop a stuck/zombie
// reader early, and on manual pause so resume can rebind. Failures swallowed; idle reaper backs up.
@Override
public void releaseRemoteReader() {
if (runningBackendId <= 0) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public void alterJobStatus(String jobName, JobStatus jobStatus, FailureReason re
checkSameStatus(a, jobStatus);
alterJobStatus(a.getJobId(), jobStatus);
if (a instanceof StreamingInsertJob) {
((StreamingInsertJob) a).resetFailureInfo(reason);
((StreamingInsertJob) a).onManualStatusAltered(jobStatus, reason);
}
} catch (JobException e) {
throw new JobException("Alter job status error, jobName is %s, errorMsg is %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ default void initOnCreate(List<String> syncTables) throws JobException {}
*/
default void setCloudCluster(String cloudCluster) {}

/** Bind the BE this job is pinned to in the binlog phase, for reader-reuse heartbeat routing. */
default void setBoundBackendId(long boundBackendId) {}

/**
* Fetch remote meta information, such as listing files in S3 or getting latest offsets in Kafka.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider {

transient volatile String cloudCluster;

// Route fetchEndOffset/compareOffset to the bound BE (synced from job, not persisted).
transient volatile long boundBackendId;

/** Split progress (cdc-fetch view), >= committedSplitProgress. Rebuilt on restart. */
transient SplitProgress cdcSplitProgress = new SplitProgress();

Expand Down Expand Up @@ -254,9 +257,14 @@ public void updateOffset(Offset offset) {
this.currentOffset = newOffset;
}

/**
* Records the backend the owning job is bound to. The value is transient (not persisted) and is
* passed to StreamingJobUtils.selectBackend as the preferred backend when this provider routes
* its requests.
*
* @param boundBackendId id of the bound backend
*/
@Override
public void setBoundBackendId(long boundBackendId) {
this.boundBackendId = boundBackendId;
}

@Override
public void fetchRemoteMeta(Map<String, String> properties) throws Exception {
Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
Backend backend = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId);
JobBaseConfig requestParams =
new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties, getFrontendAddress());
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
Expand Down Expand Up @@ -354,7 +362,7 @@ public boolean hasMoreDataToConsume() {

private boolean compareOffset(Map<String, String> offsetFirst, Map<String, String> offsetSecond)
throws JobException {
Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
Backend backend = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId);
CompareOffsetRequest requestParams =
new CompareOffsetRequest(getJobId(), sourceType.name(), sourceProperties,
getFrontendAddress(), offsetFirst, offsetSecond);
Expand Down Expand Up @@ -999,10 +1007,20 @@ private void initSourceReader() throws JobException {
}
}

public void cleanMeta(Long jobId) throws JobException {
public void cleanMeta(Long jobId, long runtimeBackendId) throws JobException {
// clean meta table
StreamingJobUtils.deleteJobMeta(jobId);
Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
// Dropping the slot only succeeds on the BE owning the live reader (it stops its own engine
// first, freeing the slot). Prefer the runtime BE (covers the unbound snapshot phase), then
// the bound BE; both may be alive but not load-available, so route by isAlive. Only when
// neither is alive fall back to a random BE to drop the now-inactive slot.
Backend backend = aliveBackend(runtimeBackendId);
if (backend == null) {
backend = aliveBackend(boundBackendId);
}
if (backend == null) {
backend = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId);
}
JobBaseConfig requestParams =
Comment thread
JNSimba marked this conversation as resolved.
new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties, getFrontendAddress());
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
Expand All @@ -1027,6 +1045,14 @@ public void cleanMeta(Long jobId) throws JobException {
}
}

/**
 * Looks up a backend by id and returns it only when it is currently alive.
 *
 * @param backendId backend id; values <= 0 are treated as "no backend"
 * @return the alive backend, or null when the id is unset, unknown, or the backend is not alive
 */
private static Backend aliveBackend(long backendId) {
    if (backendId > 0) {
        final Backend candidate = Env.getCurrentSystemInfo().getBackend(backendId);
        if (candidate != null && candidate.isAlive()) {
            return candidate;
        }
    }
    return null;
}

/** Builds the "host:port" address of the master frontend from its host and HTTP port. */
private String getFrontendAddress() {
    final Env env = Env.getCurrentEnv();
    return env.getMasterHost() + ":" + env.getMasterHttpPort();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@ public static JdbcClient getJdbcClient(DataSourceType sourceType, Map<String, St
}

/**
* Selects a backend for the given cloud cluster without a preferred backend.
* Delegates to {@link #selectBackend(String, long)} with a preferred id of -1.
*
* @param cloudCluster cloud cluster (compute group) name, forwarded unchanged
* @throws JobException propagated from the two-argument overload
*/
public static Backend selectBackend(String cloudCluster) throws JobException {
return selectBackend(cloudCluster, -1);
}

// Prefer preferredBackendId if it is in the cluster's available BEs (also enforces cloud group).
public static Backend selectBackend(String cloudCluster, long preferredBackendId) throws JobException {
if (Config.isCloudMode() && StringUtils.isNotEmpty(cloudCluster)) {
List<Backend> bes = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getBackendsByClusterName(cloudCluster)
Expand All @@ -284,10 +289,23 @@ public static Backend selectBackend(String cloudCluster) throws JobException {
throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG
+ ", compute_group: " + cloudCluster);
}
if (preferredBackendId > 0) {
for (Backend be : bes) {
if (be.getId() == preferredBackendId) {
return be;
}
}
}
int idx = getLastSelectedBackendIndexAndUpdate();
return bes.get(Math.floorMod(idx, bes.size()));
}

if (preferredBackendId > 0) {
Backend bound = Env.getCurrentSystemInfo().getBackend(preferredBackendId);
if (bound != null && bound.isLoadAvailable()) {
return bound;
}
}
BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
.setEnableRoundRobin(true).needLoadAvailable().build();
policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
Expand Down
Loading
Loading