Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,11 @@ record FlowExecution {
* Status of jobs belonging to the flow
*/
jobStatuses: array[JobStatus]

/**
* Collection of flow-level issues from the orchestration layer.
* Populated when errors occur before any job starts (e.g., compilation failures)
* or when an error affects the entire flow (e.g., flow SLA exceeded).
*/
issues: array[Issue] = []
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -157,6 +158,7 @@ public static FlowExecution convertFlowStatus(FlowStatus monitoringFlowStatus,
long flowEndTime = 0L;
long maxJobEndTime = Long.MIN_VALUE;
String flowMessage = "";
List<org.apache.gobblin.service.Issue> flowIssues = new ArrayList<>();

while (jobStatusIter.hasNext()) {
org.apache.gobblin.service.monitoring.JobStatus queriedJobStatus = jobStatusIter.next();
Expand All @@ -167,6 +169,11 @@ public static FlowExecution convertFlowStatus(FlowStatus monitoringFlowStatus,
if (queriedJobStatus.getMessage() != null) {
flowMessage = queriedJobStatus.getMessage();
}
if (includeIssues && queriedJobStatus.getIssues() != null) {
flowIssues.addAll(queriedJobStatus.getIssues().get().stream()
.map(FlowExecutionResource::convertIssueToRestApiObject)
.collect(Collectors.toList()));
}
continue;
}

Expand Down Expand Up @@ -220,7 +227,8 @@ public static FlowExecution convertFlowStatus(FlowStatus monitoringFlowStatus,
.setExecutionEndTime(flowEndTime))
.setMessage(flowMessage)
.setExecutionStatus(monitoringFlowStatus.getFlowExecutionStatus())
.setJobStatuses(jobStatusArray);
.setJobStatuses(jobStatusArray)
.setIssues(new IssueArray(flowIssues));
}

private static org.apache.gobblin.service.Issue convertIssueToRestApiObject(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.exception.ExceptionUtils;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.event.EventSubmitter;
Expand All @@ -43,6 +45,7 @@
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
Expand Down Expand Up @@ -148,6 +151,9 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
jobFailedTimer.stop(jobMetadata);
}
}
OrchestratorIssueEmitter.emitJobIssue(DagProc.eventSubmitter, dagId, DagUtils.getJobName(dagNode),
IssueSeverity.ERROR, message + " due to " + e.getMessage(),
ExceptionUtils.getStackTrace(e));
try {
// when there is no exception, quota will be released in job status monitor or re-evaluate dag proc
dagManagementStateStore.releaseQuota(dagNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
Expand Down Expand Up @@ -65,6 +66,8 @@ protected boolean isDagStillPresent(Optional<Dag<JobExecutionPlan>> dag, DagMana
if (!dag.isPresent()) {
log.error("Dag not present when validating {}. It may already have cancelled/finished. Dag {}",
getDagId(), dagAction);
OrchestratorIssueEmitter.emitFlowIssue(eventSubmitter, getDagId(), IssueSeverity.WARN,
"DAG not present during deadline check. It may already have cancelled/finished: " + getDagId());
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagUtils;
Expand Down Expand Up @@ -62,11 +63,16 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,

dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
dag.setMessage("Flow killed due to exceeding SLA of " + flowFinishDeadline + " ms");
OrchestratorIssueEmitter.emitFlowIssue(eventSubmitter, getDagId(), IssueSeverity.ERROR,
"Flow killed due to exceeding SLA of " + flowFinishDeadline + " ms");
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
} else {
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
log.error("EnforceFlowFinishDeadline dagAction received before due time. flowStartTime {}, flowFinishDeadline {} ", flowStartTime, flowFinishDeadline);
OrchestratorIssueEmitter.emitFlowIssue(eventSubmitter, getDagId(), IssueSeverity.WARN,
"EnforceFlowFinishDeadline dagAction received before due time. flowStartTime "
+ flowStartTime + ", flowFinishDeadline " + flowFinishDeadline);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
Expand Down Expand Up @@ -59,6 +60,8 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,
// this should never happen; a job for which DEADLINE_ENFORCEMENT dag action is created must have a dag node in store
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
log.error("Dag node {} not found for EnforceJobStartDeadlineDagProc", getDagNodeId());
OrchestratorIssueEmitter.emitJobIssue(eventSubmitter, getDagId(), getDagNodeId().getJobName(),
IssueSeverity.ERROR, "Dag node not found for EnforceJobStartDeadlineDagProc: " + getDagNodeId());
return;
}

Expand All @@ -68,6 +71,8 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,
if (!jobStatus.isPresent()) {
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
log.error("Some job status should be present for dag node {} that this EnforceJobStartDeadlineDagProc belongs.", getDagNodeId());
OrchestratorIssueEmitter.emitJobIssue(eventSubmitter, getDagId(), getDagNodeId().getJobName(),
IssueSeverity.ERROR, "Job status missing for EnforceJobStartDeadlineDagProc: " + getDagNodeId());
return;
}

Expand All @@ -81,6 +86,9 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,
DagProcUtils.cancelDagNode(dagNode, dagManagementStateStore);
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
dag.setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
OrchestratorIssueEmitter.emitJobIssue(eventSubmitter, getDagId(), DagUtils.getJobName(dagNode),
IssueSeverity.ERROR, "Job exceeded the job start deadline of " + timeOutForJobStart
+ " ms after orchestration. Job: " + DagUtils.getJobName(dagNode));
}
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
Expand Down Expand Up @@ -60,6 +61,8 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
if (!dag.isPresent()) {
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
log.error("Did not find Dag with id {}, it might be already cancelled/finished and thus cleaned up from the store.", getDagId());
OrchestratorIssueEmitter.emitFlowIssue(eventSubmitter, getDagId(), IssueSeverity.WARN,
"DAG not found for kill request. It might be already cancelled/finished: " + getDagId());
return;
}

Expand All @@ -73,6 +76,8 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
} else {
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
log.error("Did not find Dag node with id {}, it might be already cancelled/finished and thus cleaned up from the store.", getDagNodeId());
OrchestratorIssueEmitter.emitJobIssue(eventSubmitter, getDagId(), getDagNodeId().getJobName(),
IssueSeverity.WARN, "DagNode not found for kill request. It might be already cancelled/finished: " + getDagNodeId());
}
} else {
DagProcUtils.cancelDag(dag.get(), dagManagementStateStore);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.net.URISyntaxException;
import java.util.Optional;

import org.apache.commons.lang3.exception.ExceptionUtils;

import com.typesafe.config.Config;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -29,6 +31,7 @@
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagUtils;
Expand Down Expand Up @@ -72,6 +75,9 @@ protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dag
}
return dag;
} catch (URISyntaxException | SpecNotFoundException | InterruptedException | IOException e) {
OrchestratorIssueEmitter.emitFlowIssue(eventSubmitter, getDagId(), IssueSeverity.ERROR,
"Flow launch initialization failed: " + e.getClass().getSimpleName() + " - " + e.getMessage(),
ExceptionUtils.getStackTrace(e));
throw new RuntimeException(e);
}
}
Expand All @@ -81,6 +87,8 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
if (!dag.isPresent()) {
log.warn("Dag with id " + getDagId() + " could not be compiled.");
OrchestratorIssueEmitter.emitFlowIssue(eventSubmitter, getDagId(), IssueSeverity.ERROR,
"DAG could not be compiled for " + getDagId());
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
} else {
DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(), getDagId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration.proc;

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Collections;

import org.apache.commons.codec.digest.DigestUtils;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.troubleshooter.Issue;
import org.apache.gobblin.runtime.troubleshooter.IssueEventBuilder;
import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;


/**
* Emits orchestration-layer issues through the existing issue pipeline:
* {@link IssueEventBuilder} -> Kafka -> {@link org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler}
* -> {@link org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository}.
*
* Issue codes use {@code S} prefix + 6-char hex hash (service-layer), matching the executor-side {@code T} prefix convention.
* Thread-safe: no shared mutable state; flow/job context passed explicitly to prevent cross-flow contamination.
*/
@Slf4j
public final class OrchestratorIssueEmitter {

private static final int HASH_LENGTH = 6;
private static final String HASH_PREFIX = "S";

private OrchestratorIssueEmitter() {
}

/** Emit a flow-level issue (jobName = "NA") for errors before job creation or affecting the entire flow. */
public static void emitFlowIssue(EventSubmitter eventSubmitter, Dag.DagId dagId,
IssueSeverity severity, String summary) {
emit(eventSubmitter, dagId.getFlowGroup(), dagId.getFlowName(),
String.valueOf(dagId.getFlowExecutionId()), JobStatusRetriever.NA_KEY, severity, summary, "");
}

public static void emitFlowIssue(EventSubmitter eventSubmitter, Dag.DagId dagId,
IssueSeverity severity, String summary, String details) {
emit(eventSubmitter, dagId.getFlowGroup(), dagId.getFlowName(),
String.valueOf(dagId.getFlowExecutionId()), JobStatusRetriever.NA_KEY, severity, summary, details);
}

/** Emit a flow-level issue using string identifiers (for callers without a DagId). */
public static void emitFlowIssue(EventSubmitter eventSubmitter, String flowGroup, String flowName,
String flowExecutionId, IssueSeverity severity, String summary) {
emit(eventSubmitter, flowGroup, flowName, flowExecutionId, JobStatusRetriever.NA_KEY, severity, summary, "");
}

/** Emit a job-level issue tied to a specific job. */
public static void emitJobIssue(EventSubmitter eventSubmitter, Dag.DagId dagId, String jobName,
IssueSeverity severity, String summary) {
emit(eventSubmitter, dagId.getFlowGroup(), dagId.getFlowName(),
String.valueOf(dagId.getFlowExecutionId()), jobName, severity, summary, "");
}

public static void emitJobIssue(EventSubmitter eventSubmitter, Dag.DagId dagId, String jobName,
IssueSeverity severity, String summary, String details) {
emit(eventSubmitter, dagId.getFlowGroup(), dagId.getFlowName(),
String.valueOf(dagId.getFlowExecutionId()), jobName, severity, summary, details);
}

static String generateIssueCode(String summary) {
return HASH_PREFIX + DigestUtils.sha256Hex(summary).substring(0, HASH_LENGTH).toUpperCase();
}

private static void emit(EventSubmitter eventSubmitter, String flowGroup, String flowName,
String flowExecutionId, String jobName, IssueSeverity severity, String summary, String details) {
try {
Issue issue = Issue.builder()
.time(ZonedDateTime.now(ZoneOffset.UTC))
.severity(severity)
.code(generateIssueCode(summary))
.summary(summary)
.details(details != null ? details : "")
.sourceClass(OrchestratorIssueEmitter.class.getName())
.properties(Collections.emptyMap())
.build();

IssueEventBuilder eventBuilder = new IssueEventBuilder(IssueEventBuilder.JOB_ISSUE);
eventBuilder.setIssue(issue);
eventBuilder.addMetadata(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
eventBuilder.addMetadata(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
eventBuilder.addMetadata(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, flowExecutionId);
eventBuilder.addMetadata(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
eventBuilder.addMetadata("issueSource", "service-layer");

eventSubmitter.submit(eventBuilder);
} catch (Exception e) {
log.error("Failed to emit service-layer issue: summary={}", summary, e);
}
}
}
Loading
Loading