Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.LocalDAGAppMaster;
import org.apache.tez.dag.app.LocalNodeContext;
import org.apache.tez.dag.app.NodeContext;
import org.apache.tez.dag.app.dag.DAG;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -369,10 +371,11 @@ public void run() {
long appSubmitTime = System.currentTimeMillis();

dagAppMaster =
createDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
createDAGAppMaster(applicationAttemptId, cId,
SystemClock.getInstance(), appSubmitTime, isSession, userDir.toUri().getPath(),
new String[] {localDir.toUri().getPath()}, new String[] {logDir.toUri().getPath()},
amCredentials, UserGroupInformation.getCurrentUser().getShortUserName());
amCredentials, UserGroupInformation.getCurrentUser().getShortUserName(),
new LocalNodeContext(currentHost, nmPort, nmHttpPort));
DAGAppMaster.initAndStartAppMaster(dagAppMaster, conf);
clientHandler = new DAGClientHandler(dagAppMaster);
((AsyncDispatcher)dagAppMaster.getDispatcher()).setDrainEventsOnStop();
Expand All @@ -395,27 +398,32 @@ public void run() {

// this can be overridden by test code to create a mock app
@VisibleForTesting
protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId cId, String currentHost, int nmPort,
int nmHttpPort,
Clock clock, long appSubmitTime, boolean isSession,
String userDir,
String[] localDirs, String[] logDirs,
Credentials credentials, String jobUserName) throws
IOException {
protected DAGAppMaster createDAGAppMaster(
ApplicationAttemptId applicationAttemptId,
ContainerId cId,
Clock clock,
long appSubmitTime,
boolean isSession,
String userDir,
String[] localDirs,
String[] logDirs,
Credentials credentials,
String jobUserName,
NodeContext nodeContext)
throws IOException {

// Read in additional information about external services
AMPluginDescriptorProto amPluginDescriptorProto =
TezUtilsInternal.readUserSpecifiedTezConfiguration(userDir)
.getAmPluginDescriptor();

return isLocalWithoutNetwork
? new LocalDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
? new LocalDAGAppMaster(applicationAttemptId, cId,
SystemClock.getInstance(), appSubmitTime, isSession, userDir, localDirs, logDirs,
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto)
: new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto, nodeContext)
: new DAGAppMaster(applicationAttemptId, cId,
SystemClock.getInstance(), appSubmitTime, isSession, userDir, localDirs, logDirs,
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto);
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto, nodeContext);
}

@Override
Expand Down
88 changes: 47 additions & 41 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,16 @@
* The state machine is encapsulated in the implementation of Job interface.
* All state changes happens via Job interface. Each event
* results in a Finite State Transition in Job.
*
* <p>
* Tez DAG AppMaster is the composition of loosely coupled services. The services
* interact with each other via events. The components resembles the
* Actors model. The component acts on received event and send out the
* events to other components.
* This keeps it highly concurrent with no or minimal synchronization needs.
*
* <p>
* The events are dispatched by a central Dispatch mechanism. All components
* register to the Dispatcher.
*
* <p>
* The information is shared across different components using AppContext.
*/

Expand All @@ -245,9 +245,6 @@ public class DAGAppMaster extends AbstractService {
private String appName;
private final ApplicationAttemptId appAttemptID;
private final ContainerId containerID;
private final String nmHost;
private final int nmPort;
private final int nmHttpPort;
private final String workingDirectory;
private final String[] localDirs;
private final String[] logDirs;
Expand Down Expand Up @@ -309,6 +306,7 @@ public class DAGAppMaster extends AbstractService {

private ListeningExecutorService execService;
private final PluginManager pluginManager;
private final NodeContext nodeContext;


/**
Expand Down Expand Up @@ -344,20 +342,18 @@ public class DAGAppMaster extends AbstractService {
private TezDAGHook[] hooks = {};

public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
ContainerId containerId,
Clock clock, long appSubmitTime, boolean isSession, String workingDirectory,
String [] localDirs, String[] logDirs, String clientVersion,
Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) {
Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto, NodeContext nodeContext) {
super(DAGAppMaster.class.getName());
this.mdcContext = LoggingUtils.setupLog4j();
this.clock = clock;
this.startTime = clock.getTime();
this.appSubmitTime = appSubmitTime;
this.appAttemptID = applicationAttemptId;
this.containerID = containerId;
this.nmHost = nmHost;
this.nmPort = nmPort;
this.nmHttpPort = nmHttpPort;
this.nodeContext = nodeContext;
this.state = DAGAppMasterState.NEW;
this.isSession = isSession;
this.workingDirectory = workingDirectory;
Expand All @@ -371,9 +367,6 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
.createRemoteUser(jobUserName);
this.appMasterUgi.addCredentials(amCredentials);

this.containerLogs = getRunningLogURL(this.nmHost + ":" + this.nmHttpPort,
this.containerID.toString(), this.appMasterUgi.getShortUserName());

LOG.info("Created DAGAppMaster for application " + applicationAttemptId
+ ", versionInfo=" + dagVersionInfo);
TezCommonUtils.logCredentials(LOG, this.appMasterUgi.getCredentials(), "am");
Expand Down Expand Up @@ -443,6 +436,14 @@ protected void serviceInit(final Configuration conf) throws Exception {
this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);

if (!isLocal) {
this.containerLogs =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.containerLogs is making use of nmHttpPort and nmHost, moved it from constructor to serviceInit() and made nmHttpPort as local variable

getRunningLogURL(
nodeContext.getNodeHostString() + ":" + nodeContext.getNodeHttpPort(),
this.containerID.toString(),
this.appMasterUgi.getShortUserName());
}

UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(amConf);

PluginManager.PluginDescriptors pluginDescriptors = pluginManager.parseAllPlugins(isLocal, defaultPayload);
Expand Down Expand Up @@ -1207,15 +1208,7 @@ public ContainerId getAppContainerId() {
}

public String getAppNMHost() {
return nmHost;
}

public int getAppNMPort() {
return nmPort;
}

public int getAppNMHttpPort() {
return nmHttpPort;
return nodeContext.getNodeHostString();
}

public int getRpcPort() {
Expand Down Expand Up @@ -2415,13 +2408,15 @@ public static void main(String[] args) {
// Install the tez class loader, which can be used add new resources
TezClassLoader.setupTezClassLoader();
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
final String pid = System.getenv().get("JVM_PID");
Copy link
Contributor

@abstractdog abstractdog Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while I like using ProcessHandle instead of JVM_PID, I'm not sure if this is 100% correct
I'm concerned by the fact that JVM_PID is used in yarn too, see in hadoop code, e.g. https://github.com/apache/hadoop/blob/f49c49aa083b9324ae432902f94a7aaf98f543ea/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java#L126

also, not 100% sure if JVM_PID belongs to the actual Java process id or the launch_container.sh shell script provided by yarn: even if the shell script must be the parent of the jvm process, they are not the same

Copy link
Contributor Author

@Aggarwal-Raghav Aggarwal-Raghav Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll investigate on this and get back to you on this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So i tested by putting both

LOG.info("######");
      LOG.info("System.getenv().get(\"JVM_PID\") value is: {}", old_pid);
      LOG.info("ProcessHandle.current().pid() value is : {}", pid);
      LOG.info("######");

the JVM_PID belongs to launch_container.sh and
ProcessHandler one is DagAppMaster

Screenshot confirming the same.
Screenshot 2026-03-04 at 11 52 14 PM
Screenshot 2026-03-04 at 11 49 46 PM

Will revert this change. Thanks for pointing this!!


String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.name());
String nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.name());
String nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.name());
final String pid = System.getenv().get("JVM_PID");
String appSubmitTimeStr = System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
String clientVersion = System.getenv(TezConstants.TEZ_CLIENT_VERSION_ENV);
String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
String pwd = System.getenv(ApplicationConstants.Environment.PWD.name());
String localDirs = System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name());
String logDirs = System.getenv(ApplicationConstants.Environment.LOG_DIRS.name());

if (clientVersion == null) {
clientVersion = VersionInfo.UNKNOWN;
}
Expand All @@ -2431,18 +2426,23 @@ public static void main(String[] args) {

Configuration conf = new Configuration();

AMExtensions amExtensions = getFrameworkService(conf).getAMExtensions();
ServerFrameworkService frameworkService = getFrameworkService(conf);
AMExtensions amExtensions = frameworkService.getAMExtensions();
DAGProtos.ConfigurationProto confProto = amExtensions.loadConfigurationProto();
TezUtilsInternal.addUserSpecifiedTezConfiguration(conf, confProto.getConfKeyValuesList());

NodeContext nodeContext =
(frameworkService instanceof YarnServerFrameworkService)
? new YarnNodeManagerContext()
: null;

ContainerId containerId = amExtensions.allocateContainerId(conf);

ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
org.apache.hadoop.ipc.CallerContext.setCurrent(new org.apache.hadoop.ipc.CallerContext
.Builder("tez_appmaster_" + containerId.getApplicationAttemptId()
).build());
long appSubmitTime = Long.parseLong(appSubmitTimeStr);
String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());

// Command line options
Option option = Option.builder()
Expand All @@ -2462,9 +2462,9 @@ public static void main(String[] args) {
+ ", jvmPid=" + pid
+ ", userFromEnv=" + jobUserName
+ ", cliSessionOption=" + sessionModeCliOption
+ ", pwd=" + System.getenv(ApplicationConstants.Environment.PWD.name())
+ ", localDirs=" + System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name())
+ ", logDirs=" + System.getenv(ApplicationConstants.Environment.LOG_DIRS.name()));
+ ", pwd=" + pwd
+ ", localDirs=" + localDirs
+ ", logDirs=" + logDirs);

AMPluginDescriptorProto amPluginDescriptorProto = null;
if (confProto.hasAmPluginDescriptor()) {
Expand All @@ -2477,20 +2477,26 @@ public static void main(String[] args) {
TezUtilsInternal.setSecurityUtilConfigration(LOG, conf);

DAGAppMaster appMaster =
new DAGAppMaster(applicationAttemptId, containerId, nodeHostString, Integer.parseInt(nodePortString),
Integer.parseInt(nodeHttpPortString), new SystemClock(), appSubmitTime, sessionModeCliOption,
System.getenv(ApplicationConstants.Environment.PWD.name()),
TezCommonUtils.getTrimmedStrings(System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name())),
TezCommonUtils.getTrimmedStrings(System.getenv(ApplicationConstants.Environment.LOG_DIRS.name())),
clientVersion, credentials, jobUserName, amPluginDescriptorProto);
new DAGAppMaster(
applicationAttemptId,
containerId,
new SystemClock(),
appSubmitTime,
sessionModeCliOption,
pwd,
TezCommonUtils.getTrimmedStrings(localDirs),
TezCommonUtils.getTrimmedStrings(logDirs),
clientVersion,
credentials,
jobUserName,
amPluginDescriptorProto,
nodeContext);
ShutdownHookManager.get().addShutdownHook(new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);

// log the system properties
if (LOG.isInfoEnabled()) {
String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(conf);
if (systemPropsToLog != null) {
LOG.info(systemPropsToLog);
}
LOG.info(systemPropsToLog);
}

initAndStartAppMaster(appMaster, conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
public class LocalDAGAppMaster extends DAGAppMaster {

public LocalDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId,
String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime, boolean isSession,
Clock clock, long appSubmitTime, boolean isSession,
String workingDirectory, String[] localDirs, String[] logDirs, String clientVersion,
Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) {
super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto, NodeContext nodeContext) {
super(applicationAttemptId, containerId, clock, appSubmitTime,
isSession, workingDirectory, localDirs, logDirs, clientVersion, credentials, jobUserName,
pluginDescriptorProto);
pluginDescriptorProto, nodeContext);
}

@Override
Expand Down
44 changes: 44 additions & 0 deletions tez-dag/src/main/java/org/apache/tez/dag/app/LocalNodeContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app;

/** Local implementation of NodeContext. */
public final class LocalNodeContext implements NodeContext {

private final String nodeHostString;
private final int nodePort;
private final int nodeHttpPort;

public LocalNodeContext(String nodeHostString, int nodePortString, int nmHttpPort) {
this.nodeHostString = nodeHostString;
this.nodePort = nodePortString;
this.nodeHttpPort = nmHttpPort;
}

@Override
public String getNodeHostString() {
return nodeHostString;
}

@Override
public int getNodePort() {
return nodePort;
}

@Override
public int getNodeHttpPort() {
return nodeHttpPort;
}
}
34 changes: 34 additions & 0 deletions tez-dag/src/main/java/org/apache/tez/dag/app/NodeContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app;

/** Provides context information about the node on which the DAGAppMaster is running. */
public sealed interface NodeContext permits YarnNodeManagerContext, LocalNodeContext {

/**
* @return The node host string
*/
String getNodeHostString();

/**
* @return The node port string
*/
int getNodePort();

/**
* @return The node HTTP port string
*/
int getNodeHttpPort();
}
Loading