Skip to content

Orchestration Layer Issue Capturing#4167

Merged
abhishekmjain merged 20 commits intoapache:masterfrom
agam-99:ETL-18960
Mar 11, 2026
Merged

Orchestration Layer Issue Capturing#4167
abhishekmjain merged 20 commits intoapache:masterfrom
agam-99:ETL-18960

Conversation

@agam-99
Copy link
Contributor

@agam-99 agam-99 commented Feb 17, 2026

Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

  • My PR addresses the following Gobblin JIRA issues and references them in the PR title.
    • [ETL-18960] Capture orchestration-layer issues in Gobblin service

Description

  • Here are some details about my PR, including screenshots (if applicable):

This PR adds explicit issue capturing at the orchestration/service layer in Gobblin. Previously, issues were only captured at the executor/job level (via AutoTroubleshooterLogAppender), leaving orchestration errors (compilation failures,
SLA violations, job submission failures) invisible to users.

What changed:

  1. New ServiceLayerIssueEmitter utility — emits issues through the existing IssueEventBuilder → Kafka → JobIssueEventHandlerMultiContextIssueRepository pipeline. Issue codes use S prefix + 6-char hex hash, consistent with the
    executor-side T prefix convention.

  2. Instrumented 18 orchestration error points across all DagProc subclasses and FlowCompilationValidationHelper:

    • Flow-level issues (jobName=NA): compilation failures, flow SLA exceeded, DAG not found for kill/resume, concurrent execution blocked
    • Job-level issues (specific jobName): job submission failures (incl. ksudo/proxy validation), job start SLA exceeded, unexpected job states
  3. Added issues array to FlowExecution.pdl — surfaces flow-level issues in the REST API response for cases where jobStatuses is empty (e.g., compilation failures). Previously, issues stored against the NA pseudo-job context were
    never included in the API response.

  4. Updated FlowExecutionResource.convertFlowStatus() — extracts issues from the NA pseudo-job and populates the new flow-level issues array.

Backward compatibility:

  • Executor-side issue capturing (AutoTroubleshooterLogAppender) is completely untouched
  • Job-level issues continue to appear in jobStatuses[i].issues[]
  • New issues field on FlowExecution defaults to [] (Rest.li compatible: [MD-COMPAT]: true)
  • All downstream clients (Java/Python Rest.li, carbon CLI) are backward compatible

Thread safety:

  • ServiceLayerIssueEmitter has no shared mutable state — flow/job context is passed explicitly (not read from MDC), preventing cross-flow contamination

Tests

  • My PR adds the following unit tests:
    • DagProcServiceLayerIssueIntegrationTest.java — verifies MDC context isolation, issue attribution, and context ID format consistency

Commits

  • My commits all reference JIRA issues in their subject lines. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

agam-99 and others added 16 commits February 16, 2026 09:48
…sh codes

- Remove explicit code parameter from emit methods; auto-generate
  S-prefix + 6-char SHA256 hex hash codes consistent with previous
  ServiceLayerLog4j2Appender convention (executor uses T prefix)
- Simplify call signatures: emitFlowIssue(submitter, dagId, severity, summary)
  and emitJobIssue(submitter, dagId, jobName, severity, summary)
- Clean up inline fully-qualified imports across all DagProc subclasses
  and DagProcUtils (IssueSeverity, ExceptionUtils)
- Remove unused imports
- Add thread safety documentation to ServiceLayerIssueEmitter

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…urce

Use addAll instead of reassignment to be consistent with standard
collection patterns and avoid potential issues with list reference.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Move ServiceLayerIssueEmitter call after the JOB_FAILED timer event
in DagProcUtils.submitJobToExecutorInternal(). Previously, the issue
event was emitted before JOB_FAILED, so when KafkaJobStatusMonitor
processed the FAILED event, the error classifier would find the
S-prefixed issue in the repository, fail to classify it, and produce
a duplicate T0000 "ErrorCategory: UNKNOWN" issue.

By emitting after JOB_FAILED, the classifier runs on an empty issue
list (matching the old log interceptor behavior where issues were
emitted in the finally block after the exception propagated).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
These files were inadvertently modified during the build process
and should not be part of this PR.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…Emitter

- Revert gobblin-yarn changes (from separate PR apache#4165)
- Revert log4j dependency addition in build.gradle (no longer needed)
- Trim verbose javadoc in ServiceLayerIssueEmitter
- Remove debug log from emit()

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* Thread-safe: no shared mutable state; flow/job context passed explicitly to prevent cross-flow contamination.
*/
@Slf4j
public final class ServiceLayerIssueEmitter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let's rename this to OrchestratorIssueEmitter

@abhishekmjain
Copy link
Contributor

can you please add unit tests for all the dag procs?

String jobName = this.dagNodeId != null ? this.dagNodeId.getJobName() : JobStatusRetriever.NA_KEY;

try (
Closeable c1 = MDC.putCloseable(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where are these getting used?

jobMetadata, specExecutorUri);
}

private static void submitJobToExecutorInternal(DagManagementStateStore dagManagementStateStore,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a need for a new function here?

@codecov-commenter
Copy link

Codecov Report

❌ Patch coverage is 56.36364% with 48 lines in your changes missing coverage. Please review.
✅ Project coverage is 55.41%. Comparing base (d445b1e) to head (9c49d7e).
⚠️ Report is 6 commits behind head on master.

Files with missing lines Patch % Lines
...modules/utils/FlowCompilationValidationHelper.java 0.00% 9 Missing ⚠️
...s/orchestration/proc/ServiceLayerIssueEmitter.java 78.37% 7 Missing and 1 partial ⚠️
.../apache/gobblin/service/FlowExecutionResource.java 0.00% 7 Missing ⚠️
...vice/modules/orchestration/proc/LaunchDagProc.java 0.00% 5 Missing ⚠️
.../modules/orchestration/proc/ReevaluateDagProc.java 16.66% 4 Missing and 1 partial ⚠️
...estration/proc/EnforceJobStartDeadlineDagProc.java 33.33% 4 Missing ⚠️
...ervice/modules/orchestration/proc/KillDagProc.java 0.00% 4 Missing ⚠️
...in/service/modules/orchestration/proc/DagProc.java 91.66% 1 Missing and 1 partial ⚠️
...orchestration/proc/DeadlineEnforcementDagProc.java 0.00% 2 Missing ⚠️
...tration/proc/EnforceFlowFinishDeadlineDagProc.java 50.00% 1 Missing ⚠️
... and 1 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4167      +/-   ##
============================================
+ Coverage     48.36%   55.41%   +7.04%     
+ Complexity     8742     1624    -7118     
============================================
  Files          1616      311    -1305     
  Lines         64748    10911   -53837     
  Branches       7302     1100    -6202     
============================================
- Hits          31317     6046   -25271     
+ Misses        30645     4344   -26301     
+ Partials       2786      521    -2265     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

agam-99 and others added 3 commits March 10, 2026 19:03
- Revert DagProc.java to master (remove MDC/Closeable, config field)
- Revert DagProcUtils.java MDC and submitJobToExecutorInternal extraction,
  keep only the ServiceLayerIssueEmitter.emitJobIssue call in catch block
- Rewrite DagProcServiceLayerIssueIntegrationTest to test
  ServiceLayerIssueEmitter directly (issue codes, emit methods, null safety)
- Add emitter verification tests to existing test classes:
  LaunchDagProcTest, KillDagProcTest, ReevaluateDagProcTest,
  ResumeDagProcTest, EnforceDeadlineDagProcsTest

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- EnforceDeadlineDagProcsTest: remove duplicate Dag import
- ReevaluateDagProcTest: remove unused EventSubmitter import

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Rename across all 14 files (source + tests) for clarity.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
if (queriedJobStatus.getMessage() != null) {
flowMessage = queriedJobStatus.getMessage();
}
if (includeIssues) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getIssues() returns an Optional — if it's empty this will throw NoSuchElementException. Should be:

  queriedJobStatus.getIssues().ifPresent(issues ->
      flowIssues.addAll(issues.stream()
          .map(FlowExecutionResource::convertIssueToRestApiObject)
          .collect(Collectors.toList())));

CLAUDE.md Outdated
Scope: work only within this directory. Do not search parent or sibling directories unless explicitly asked.

## Purpose
Apache Gobblin open-source fork — LinkedIn's ETL framework (sources, converters, writers).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

open-source fork?

if (!isRetry && !FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
// this may happen if adding job status in the store failed/delayed after adding a ReevaluateDagAction in KafkaJobStatusMonitor
throw new RuntimeException(String.format("Job status for dagNode %s is %s. Re-evaluate dag action should have been "
String message = String.format("Job status for dagNode %s is %s. Re-evaluate dag action should have been "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this variable needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's used by both emitJobIssue and throw new RuntimeException(message), so extracted it

if (jobStatus.isShouldRetry()) {
log.info("Retrying job: {}, current attempts: {}, max attempts: {}",
DagUtils.getFullyQualifiedJobName(dagNode), jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
// todo - be careful when unsetting this, it is possible that this is set to FAILED because some other job in the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we emit a WARN issue from here about job being retried?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense, added

- FlowExecutionResource: add null check for getIssues() supplier
  before calling .get() to prevent NPE
- ReevaluateDagProc: emit WARN issue when job is being retried
  with attempt count info
- Remove accidentally committed CLAUDE.md

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Contributor

@pratapaditya04 pratapaditya04 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@abhishekmjain abhishekmjain merged commit 452283e into apache:master Mar 11, 2026
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants