-
Notifications
You must be signed in to change notification settings - Fork 440
TEZ-4689: Introduce Node abstraction for DAGAppMaster instead of sepagrate NodeManager-related fields #461
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -209,16 +209,16 @@ | |
| * The state machine is encapsulated in the implementation of Job interface. | ||
| * All state changes happens via Job interface. Each event | ||
| * results in a Finite State Transition in Job. | ||
| * | ||
| * <p> | ||
| * Tez DAG AppMaster is the composition of loosely coupled services. The services | ||
| * interact with each other via events. The components resembles the | ||
| * Actors model. The component acts on received event and send out the | ||
| * events to other components. | ||
| * This keeps it highly concurrent with no or minimal synchronization needs. | ||
| * | ||
| * <p> | ||
| * The events are dispatched by a central Dispatch mechanism. All components | ||
| * register to the Dispatcher. | ||
| * | ||
| * <p> | ||
| * The information is shared across different components using AppContext. | ||
| */ | ||
|
|
||
|
|
@@ -245,9 +245,6 @@ public class DAGAppMaster extends AbstractService { | |
| private String appName; | ||
| private final ApplicationAttemptId appAttemptID; | ||
| private final ContainerId containerID; | ||
| private final String nmHost; | ||
| private final int nmPort; | ||
| private final int nmHttpPort; | ||
| private final String workingDirectory; | ||
| private final String[] localDirs; | ||
| private final String[] logDirs; | ||
|
|
@@ -309,6 +306,7 @@ public class DAGAppMaster extends AbstractService { | |
|
|
||
| private ListeningExecutorService execService; | ||
| private final PluginManager pluginManager; | ||
| private final NodeContext nodeContext; | ||
|
|
||
|
|
||
| /** | ||
|
|
@@ -344,20 +342,18 @@ public class DAGAppMaster extends AbstractService { | |
| private TezDAGHook[] hooks = {}; | ||
|
|
||
| public DAGAppMaster(ApplicationAttemptId applicationAttemptId, | ||
| ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, | ||
| ContainerId containerId, | ||
| Clock clock, long appSubmitTime, boolean isSession, String workingDirectory, | ||
| String [] localDirs, String[] logDirs, String clientVersion, | ||
| Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) { | ||
| Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto, NodeContext nodeContext) { | ||
| super(DAGAppMaster.class.getName()); | ||
| this.mdcContext = LoggingUtils.setupLog4j(); | ||
| this.clock = clock; | ||
| this.startTime = clock.getTime(); | ||
| this.appSubmitTime = appSubmitTime; | ||
| this.appAttemptID = applicationAttemptId; | ||
| this.containerID = containerId; | ||
| this.nmHost = nmHost; | ||
| this.nmPort = nmPort; | ||
| this.nmHttpPort = nmHttpPort; | ||
| this.nodeContext = nodeContext; | ||
| this.state = DAGAppMasterState.NEW; | ||
| this.isSession = isSession; | ||
| this.workingDirectory = workingDirectory; | ||
|
|
@@ -371,9 +367,6 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId, | |
| .createRemoteUser(jobUserName); | ||
| this.appMasterUgi.addCredentials(amCredentials); | ||
|
|
||
| this.containerLogs = getRunningLogURL(this.nmHost + ":" + this.nmHttpPort, | ||
| this.containerID.toString(), this.appMasterUgi.getShortUserName()); | ||
|
|
||
| LOG.info("Created DAGAppMaster for application " + applicationAttemptId | ||
| + ", versionInfo=" + dagVersionInfo); | ||
| TezCommonUtils.logCredentials(LOG, this.appMasterUgi.getCredentials(), "am"); | ||
|
|
@@ -443,6 +436,14 @@ protected void serviceInit(final Configuration conf) throws Exception { | |
| this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, | ||
| TezConfiguration.TEZ_LOCAL_MODE_DEFAULT); | ||
|
|
||
| if (!isLocal) { | ||
| this.containerLogs = | ||
| getRunningLogURL( | ||
| nodeContext.getNodeHostString() + ":" + nodeContext.getNodeHttpPort(), | ||
| this.containerID.toString(), | ||
| this.appMasterUgi.getShortUserName()); | ||
| } | ||
|
|
||
| UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(amConf); | ||
|
|
||
| PluginManager.PluginDescriptors pluginDescriptors = pluginManager.parseAllPlugins(isLocal, defaultPayload); | ||
|
|
@@ -1207,15 +1208,7 @@ public ContainerId getAppContainerId() { | |
| } | ||
|
|
||
| public String getAppNMHost() { | ||
| return nmHost; | ||
| } | ||
|
|
||
| public int getAppNMPort() { | ||
| return nmPort; | ||
| } | ||
|
|
||
| public int getAppNMHttpPort() { | ||
| return nmHttpPort; | ||
| return nodeContext.getNodeHostString(); | ||
| } | ||
|
|
||
| public int getRpcPort() { | ||
|
|
@@ -2415,13 +2408,15 @@ public static void main(String[] args) { | |
| // Install the tez class loader, which can be used add new resources | ||
| TezClassLoader.setupTezClassLoader(); | ||
| Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); | ||
| final String pid = System.getenv().get("JVM_PID"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. while I like using ProcessHandle instead of also, not 100% sure if JVM_PID belongs to the actual Java process id or the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll investigate on this and get back to you on this.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So i tested by putting both the Screenshot confirming the same. Will revert this change. Thanks for pointing this!! |
||
|
|
||
| String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.name()); | ||
| String nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.name()); | ||
| String nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.name()); | ||
| final String pid = System.getenv().get("JVM_PID"); | ||
| String appSubmitTimeStr = System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV); | ||
| String clientVersion = System.getenv(TezConstants.TEZ_CLIENT_VERSION_ENV); | ||
| String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name()); | ||
| String pwd = System.getenv(ApplicationConstants.Environment.PWD.name()); | ||
| String localDirs = System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name()); | ||
| String logDirs = System.getenv(ApplicationConstants.Environment.LOG_DIRS.name()); | ||
|
|
||
| if (clientVersion == null) { | ||
| clientVersion = VersionInfo.UNKNOWN; | ||
| } | ||
|
|
@@ -2431,18 +2426,23 @@ public static void main(String[] args) { | |
|
|
||
| Configuration conf = new Configuration(); | ||
|
|
||
| AMExtensions amExtensions = getFrameworkService(conf).getAMExtensions(); | ||
| ServerFrameworkService frameworkService = getFrameworkService(conf); | ||
| AMExtensions amExtensions = frameworkService.getAMExtensions(); | ||
| DAGProtos.ConfigurationProto confProto = amExtensions.loadConfigurationProto(); | ||
| TezUtilsInternal.addUserSpecifiedTezConfiguration(conf, confProto.getConfKeyValuesList()); | ||
|
|
||
| NodeContext nodeContext = | ||
| (frameworkService instanceof YarnServerFrameworkService) | ||
| ? new YarnNodeManagerContext() | ||
| : null; | ||
|
|
||
| ContainerId containerId = amExtensions.allocateContainerId(conf); | ||
|
|
||
| ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId(); | ||
| org.apache.hadoop.ipc.CallerContext.setCurrent(new org.apache.hadoop.ipc.CallerContext | ||
| .Builder("tez_appmaster_" + containerId.getApplicationAttemptId() | ||
| ).build()); | ||
| long appSubmitTime = Long.parseLong(appSubmitTimeStr); | ||
| String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name()); | ||
|
|
||
| // Command line options | ||
| Option option = Option.builder() | ||
|
|
@@ -2462,9 +2462,9 @@ public static void main(String[] args) { | |
| + ", jvmPid=" + pid | ||
| + ", userFromEnv=" + jobUserName | ||
| + ", cliSessionOption=" + sessionModeCliOption | ||
| + ", pwd=" + System.getenv(ApplicationConstants.Environment.PWD.name()) | ||
| + ", localDirs=" + System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name()) | ||
| + ", logDirs=" + System.getenv(ApplicationConstants.Environment.LOG_DIRS.name())); | ||
| + ", pwd=" + pwd | ||
| + ", localDirs=" + localDirs | ||
| + ", logDirs=" + logDirs); | ||
|
|
||
| AMPluginDescriptorProto amPluginDescriptorProto = null; | ||
| if (confProto.hasAmPluginDescriptor()) { | ||
|
|
@@ -2477,20 +2477,26 @@ public static void main(String[] args) { | |
| TezUtilsInternal.setSecurityUtilConfigration(LOG, conf); | ||
|
|
||
| DAGAppMaster appMaster = | ||
| new DAGAppMaster(applicationAttemptId, containerId, nodeHostString, Integer.parseInt(nodePortString), | ||
| Integer.parseInt(nodeHttpPortString), new SystemClock(), appSubmitTime, sessionModeCliOption, | ||
| System.getenv(ApplicationConstants.Environment.PWD.name()), | ||
| TezCommonUtils.getTrimmedStrings(System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name())), | ||
| TezCommonUtils.getTrimmedStrings(System.getenv(ApplicationConstants.Environment.LOG_DIRS.name())), | ||
| clientVersion, credentials, jobUserName, amPluginDescriptorProto); | ||
| new DAGAppMaster( | ||
| applicationAttemptId, | ||
| containerId, | ||
| new SystemClock(), | ||
| appSubmitTime, | ||
| sessionModeCliOption, | ||
| pwd, | ||
| TezCommonUtils.getTrimmedStrings(localDirs), | ||
| TezCommonUtils.getTrimmedStrings(logDirs), | ||
| clientVersion, | ||
| credentials, | ||
| jobUserName, | ||
| amPluginDescriptorProto, | ||
| nodeContext); | ||
| ShutdownHookManager.get().addShutdownHook(new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY); | ||
|
|
||
| // log the system properties | ||
| if (LOG.isInfoEnabled()) { | ||
| String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(conf); | ||
| if (systemPropsToLog != null) { | ||
| LOG.info(systemPropsToLog); | ||
| } | ||
| LOG.info(systemPropsToLog); | ||
| } | ||
|
|
||
| initAndStartAppMaster(appMaster, conf); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license | ||
| * agreements. See the NOTICE file distributed with this work for additional information regarding | ||
| * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance with the License. You may obtain a | ||
| * copy of the License at | ||
| * | ||
| * <p>http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * <p>Unless required by applicable law or agreed to in writing, software distributed under the | ||
| * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
| * express or implied. See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.tez.dag.app; | ||
|
|
||
| /** Local implementation of NodeContext. */ | ||
| public final class LocalNodeContext implements NodeContext { | ||
|
|
||
| private final String nodeHostString; | ||
| private final int nodePort; | ||
| private final int nodeHttpPort; | ||
|
|
||
| public LocalNodeContext(String nodeHostString, int nodePortString, int nmHttpPort) { | ||
| this.nodeHostString = nodeHostString; | ||
| this.nodePort = nodePortString; | ||
| this.nodeHttpPort = nmHttpPort; | ||
| } | ||
|
|
||
| @Override | ||
| public String getNodeHostString() { | ||
| return nodeHostString; | ||
| } | ||
|
|
||
| @Override | ||
| public int getNodePort() { | ||
| return nodePort; | ||
| } | ||
|
|
||
| @Override | ||
| public int getNodeHttpPort() { | ||
| return nodeHttpPort; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license | ||
| * agreements. See the NOTICE file distributed with this work for additional information regarding | ||
| * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance with the License. You may obtain a | ||
| * copy of the License at | ||
| * | ||
| * <p>http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * <p>Unless required by applicable law or agreed to in writing, software distributed under the | ||
| * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
| * express or implied. See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.tez.dag.app; | ||
|
|
||
| /** Provides context information about the node on which the DAGAppMaster is running. */ | ||
| public sealed interface NodeContext permits YarnNodeManagerContext, LocalNodeContext { | ||
|
|
||
| /** | ||
| * @return The node host string | ||
| */ | ||
| String getNodeHostString(); | ||
|
|
||
| /** | ||
| * @return The node port string | ||
| */ | ||
| int getNodePort(); | ||
|
|
||
| /** | ||
| * @return The node HTTP port string | ||
| */ | ||
| int getNodeHttpPort(); | ||
| } |


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.containerLogsis making use ofnmHttpPortandnmHost, moved it from constructor toserviceInit()and made nmHttpPort as local variable