]
+Build the Apache Tez AM Docker image
+-help Display help
+-tez Build image with the specified Tez version
+-repo Docker repository
+EOF
+}
+
+while [ $# -gt 0 ]; do
+ case "$1" in
+ -h)
+ usage
+ exit 0
+ ;;
+ -tez)
+ shift
+ TEZ_VERSION=$1
+ shift
+ ;;
+ -repo)
+ shift
+ REPO=$1
+ shift
+ ;;
+ *)
+ shift
+ ;;
+ esac
+done
+
+SCRIPT_DIR=$(
+ cd "$(dirname "$0")"
+ pwd
+)
+
+DIST_DIR=${DIST_DIR:-"$SCRIPT_DIR/../../.."}
+PROJECT_ROOT=${PROJECT_ROOT:-"$SCRIPT_DIR/../../../.."}
+
+REPO=${REPO:-apache}
+WORK_DIR="$(mktemp -d)"
+
+# Default the Tez version from pom.xml if not provided
+TEZ_VERSION=${TEZ_VERSION:-$(mvn -f "$PROJECT_ROOT/pom.xml" -q help:evaluate -Dexpression=project.version -DforceStdout)}
+
+#####################################
+# Pick tez tarball from local build #
+#####################################
+TEZ_FILE_NAME="tez-$TEZ_VERSION.tar.gz"
+LOCAL_DIST_PATH="$DIST_DIR/target/$TEZ_FILE_NAME"
+
+if [ -f "$LOCAL_DIST_PATH" ]; then
+ echo "--> Found local Tez build artifact at: $LOCAL_DIST_PATH"
+ cp "$LOCAL_DIST_PATH" "$WORK_DIR/"
+else
+ echo "--> Error: Local Tez artifact not found at $LOCAL_DIST_PATH"
+ echo "--> Please build the project first (e.g., mvn clean install -DskipTests)."
+ exit 1
+fi
+
+# -------------------------------------------------------------------------
+# BUILD CONTEXT PREPARATION
+# -------------------------------------------------------------------------
+cp -R "$SCRIPT_DIR/conf" "$WORK_DIR/" 2>/dev/null || mkdir -p "$WORK_DIR/conf"
+cp "$SCRIPT_DIR/am-entrypoint.sh" "$WORK_DIR/"
+cp "$SCRIPT_DIR/Dockerfile.am" "$WORK_DIR/"
+
+echo "Building Docker image..."
+docker build \
+ "$WORK_DIR" \
+ -f "$WORK_DIR/Dockerfile.am" \
+ -t "$REPO/tez-am:$TEZ_VERSION" \
+ --build-arg "BUILD_ENV=unarchive" \
+ --build-arg "TEZ_VERSION=$TEZ_VERSION"
+
+rm -r "${WORK_DIR}"
+echo "Docker image $REPO/tez-am:$TEZ_VERSION built successfully."
diff --git a/tez-dist/src/docker/tez-am/conf/core-site.xml b/tez-dist/src/docker/tez-am/conf/core-site.xml
new file mode 100644
index 0000000000..3a41c6fd52
--- /dev/null
+++ b/tez-dist/src/docker/tez-am/conf/core-site.xml
@@ -0,0 +1,13 @@
+<?xml version="1.0"?>
+<configuration>
+  <property>
+    <name>fs.defaultFS</name>
+    <value>hdfs://namenode:9000</value>
+  </property>
+
+  <property>
+    <name>hadoop.tmp.dir</name>
+    <value>/data/tmp</value>
+  </property>
+</configuration>
+
diff --git a/tez-dist/src/docker/tez-am/conf/hdfs-site.xml b/tez-dist/src/docker/tez-am/conf/hdfs-site.xml
new file mode 100644
index 0000000000..554fbc0797
--- /dev/null
+++ b/tez-dist/src/docker/tez-am/conf/hdfs-site.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0"?>
+<configuration>
+  <property>
+    <name>dfs.replication</name>
+    <value>1</value>
+  </property>
+
+  <property>
+    <name>dfs.block.size</name>
+    <value>67108864</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.name.dir</name>
+    <value>file:///data/namenode</value>
+  </property>
+
+  <property>
+    <name>dfs.datanode.data.dir</name>
+    <value>file:///data/datanode</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.rpc-bind-host</name>
+    <value>0.0.0.0</value>
+  </property>
+
+  <property>
+    <name>dfs.datanode.address</name>
+    <value>0.0.0.0:9866</value>
+  </property>
+
+  <property>
+    <name>dfs.datanode.http.address</name>
+    <value>0.0.0.0:9864</value>
+  </property>
+
+  <property>
+    <name>dfs.client.use.datanode.hostname</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>dfs.datanode.use.datanode.hostname</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>dfs.permissions.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>dfs.datanode.hostname</name>
+    <value>datanode</value>
+  </property>
+</configuration>
+
diff --git a/tez-dist/src/docker/tez-am/conf/tez-site.xml b/tez-dist/src/docker/tez-am/conf/tez-site.xml
new file mode 100644
index 0000000000..ff2291657b
--- /dev/null
+++ b/tez-dist/src/docker/tez-am/conf/tez-site.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0"?>
+<!--
+  Tez AM configuration for the standalone/external AM Docker setup.
+  NOTE(review): XML tags were stripped during extraction; this block is
+  reconstructed from the surviving property names/values — verify upstream.
+-->
+<configuration>
+  <property>
+    <name>tez.am.client.am.port-range</name>
+    <value>10001-10003</value>
+  </property>
+
+  <property>
+    <name>tez.am.tez-ui.webservice.enable</name>
+    <value>false</value>
+  </property>
+
+  <!-- ZooKeeper quorum used for external AM discovery -->
+  <property>
+    <name>tez.am.zookeeper.quorum</name>
+    <value>zookeeper:2181</value>
+  </property>
+
+  <property>
+    <name>tez.am.log.level</name>
+    <value>INFO</value>
+  </property>
+
+  <property>
+    <name>tez.local.mode</name>
+    <value>true</value>
+  </property>
+
+  <!-- Keep the session AM alive indefinitely waiting for DAG submissions -->
+  <property>
+    <name>tez.session.am.dag.submit.timeout.secs</name>
+    <value>-1</value>
+  </property>
+
+  <!-- Resolve datanodes by hostname from inside the compose network -->
+  <property>
+    <name>dfs.client.use.datanode.hostname</name>
+    <value>true</value>
+  </property>
+</configuration>
+
diff --git a/tez-dist/src/docker/tez-am/docker-compose.yml b/tez-dist/src/docker/tez-am/docker-compose.yml
new file mode 100644
index 0000000000..d71c521523
--- /dev/null
+++ b/tez-dist/src/docker/tez-am/docker-compose.yml
@@ -0,0 +1,143 @@
+---
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: tez-cluster
+
+services:
+ zookeeper:
+ image: zookeeper:3.8.4
+ container_name: zookeeper
+ hostname: zookeeper
+ networks:
+ - hadoop-network
+ ports:
+ - "2181:2181"
+ environment:
+ ZOO_MY_ID: 1
+ volumes:
+ - zookeeper_data:/data
+ - zookeeper_datalog:/datalog
+ - zookeeper_logs:/logs
+
+ namenode:
+ image: apache/hadoop:3.4.2-lean
+ container_name: namenode
+ hostname: namenode
+ platform: linux/amd64
+ networks:
+ - hadoop-network
+ ports:
+ - "9870:9870" # NameNode Web UI
+ - "9000:9000" # IPC
+ volumes:
+ - hadoop_data:/data # Default persistence path
+ - hadoop_logs:/opt/hadoop/logs
+ - ./conf/core-site.xml:/opt/hadoop/etc/hadoop/core-site.xml
+ - ./conf/hdfs-site.xml:/opt/hadoop/etc/hadoop/hdfs-site.xml
+ healthcheck:
+ test:
+ - "CMD-SHELL"
+ - >
+ su -s /bin/bash hadoop -c
+ '/opt/hadoop/bin/hdfs dfsadmin -safemode get | grep -q "Safe mode is OFF"'
+ interval: 5s
+ timeout: 5s
+ retries: 5
+ user: root
+ command: >
+ /bin/bash -c "
+ chown -R hadoop:hadoop /data /opt/hadoop/logs;
+ su -s /bin/bash hadoop -c '
+ if [ ! -f /data/namenode/current/VERSION ]; then
+ echo \"Formatting NameNode...\";
+ /opt/hadoop/bin/hdfs namenode -format -force -nonInteractive;
+ fi;
+ /opt/hadoop/bin/hdfs namenode
+ '"
+
+ datanode:
+ image: apache/hadoop:3.4.2-lean
+ container_name: datanode
+ hostname: datanode
+ platform: linux/amd64
+ depends_on:
+ namenode:
+ condition: service_healthy
+ networks:
+ - hadoop-network
+ ports:
+ - "9864:9864" # DataNode Web UI
+ - "9866:9866" # DataNode
+ volumes:
+ - hadoop_data:/data # Default persistence path
+ - hadoop_logs:/opt/hadoop/logs
+ - ./conf/core-site.xml:/opt/hadoop/etc/hadoop/core-site.xml
+ - ./conf/hdfs-site.xml:/opt/hadoop/etc/hadoop/hdfs-site.xml
+ user: root
+ command: >
+ /bin/bash -c "
+ chown -R hadoop:hadoop /data /opt/hadoop/logs;
+ su -s /bin/bash hadoop -c '/opt/hadoop/bin/hdfs datanode'
+ "
+
+ tez-am:
+ image: apache/tez-am:${TEZ_VERSION:-1.0.0-SNAPSHOT}
+ container_name: tez-am
+ hostname: tez-am
+ networks:
+ - hadoop-network
+ ports:
+ - "10001:10001"
+ # - "5005:5005" # Uncomment for remote debugging
+ env_file:
+ - ./am.env
+ # Already define TEZ_CUSTOM_CONF_DIR in the env file,
+ # but adding here for clarity
+ # environment:
+ # - TEZ_CUSTOM_CONF_DIR=/opt/tez/custom-conf
+ # Uncomment the following lines if you want to mount a custom
+ # tez-site.xml for the Tez AM
+ # volumes:
+ # - ./custom-tez-site.xml:/opt/tez/custom-conf/tez-site.xml
+ # Uncomment the following lines to mount custom plugins or JARs
+ # required by Tez AM (e.g., UDFs, or dependencies previously managed
+ # via YARN localization)
+ # - ./tez-plugins:/opt/tez/plugins
+ depends_on:
+ zookeeper:
+ condition: service_started
+ namenode:
+ condition: service_healthy
+ datanode:
+ condition: service_started
+
+networks:
+ hadoop-network:
+ name: hadoop-network
+ driver: bridge
+
+volumes:
+ hadoop_data:
+ name: hadoop_data
+ hadoop_logs:
+ name: hadoop_logs
+ zookeeper_data:
+ name: zookeeper_data
+ zookeeper_datalog:
+ name: zookeeper_datalog
+ zookeeper_logs:
+ name: zookeeper_logs
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/ExternalAmWordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/ExternalAmWordCount.java
new file mode 100644
index 0000000000..151e8a68b2
--- /dev/null
+++ b/tez-examples/src/main/java/org/apache/tez/examples/ExternalAmWordCount.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.examples;
+
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGStatus.State;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sample program inspired by WordCount, adapted to run against an external Tez AM discovered via ZooKeeper.
+ */
+public class ExternalAmWordCount {
+ private static final Logger LOG = LoggerFactory.getLogger(ExternalAmWordCount.class);
+ private static final String ZK_ADDRESS = "zookeeper:2181";
+
+ public static void main(String[] args) throws Exception {
+ if (args.length != 2) {
+ System.err.println(
+ "Usage: java -cp com.github.raghav.ExternalAmWordCount ");
+ System.exit(2);
+ }
+
+ var inputPath = args[0];
+ var outputPath = args[1];
+
+ var conf = new Configuration();
+ var tezConf = new TezConfiguration(conf);
+
+ tezConf.set(TezConfiguration.TEZ_FRAMEWORK_MODE, "STANDALONE_ZOOKEEPER");
+ tezConf.set(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM, ZK_ADDRESS);
+ tezConf.set(TezConfiguration.TEZ_AM_CURATOR_SESSION_TIMEOUT, "30000ms");
+ tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, false);
+ tezConf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, true);
+
+ // Prevent Tez from using the current directory for staging (avoids deleting your custom jar)
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, "/tmp/tez-staging");
+
+ LOG.info("Starting Tez Client with ZK Address: {}", ZK_ADDRESS);
+
+ var tezClient = TezClient.create("ExternalAmWordCount", tezConf, true);
+
+ try {
+ tezClient.start();
+ LOG.info("Waiting for Tez AM to register");
+ tezClient.waitTillReady();
+ LOG.info("Tez AM discovered! Submitting DAG...");
+
+ var app = new ExternalAmWordCount();
+ var dag = app.createDAG(tezConf, inputPath, outputPath);
+ var dagClient = tezClient.submitDAG(dag);
+
+ var dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
+
+ if (dagStatus.getState() == State.SUCCEEDED) {
+ LOG.info("DAG Succeeded");
+ System.exit(0);
+ } else {
+ LOG.error("DAG Failed with state: {}", dagStatus.getState());
+ System.exit(1);
+ }
+
+ } finally {
+ tezClient.stop();
+ }
+ }
+
+ private DAG createDAG(TezConfiguration tezConf, String inputPath, String outputPath) {
+ var dataSource =
+ MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath)
+ .build();
+
+ var dataSink =
+ MROutput.createConfigBuilder(new Configuration(tezConf), TextOutputFormat.class, outputPath)
+ .build();
+
+ var tokenizerVertex =
+ Vertex.create("Tokenizer", ProcessorDescriptor.create(TokenProcessor.class.getName()))
+ .addDataSource("Input", dataSource);
+
+ var summerVertex =
+ Vertex.create(
+ "Summer", ProcessorDescriptor.create(SumProcessor.class.getName()), 1) // 1 Reducer
+ .addDataSink("Output", dataSink);
+
+ var edgeConf =
+ OrderedPartitionedKVEdgeConfig.newBuilder(
+ Text.class.getName(), IntWritable.class.getName(), HashPartitioner.class.getName())
+ .setFromConfiguration(tezConf)
+ .build();
+
+ return DAG.create("ZkWordCountDAG")
+ .addVertex(tokenizerVertex)
+ .addVertex(summerVertex)
+ .addEdge(Edge.create(tokenizerVertex, summerVertex, edgeConf.createDefaultEdgeProperty()));
+ }
+
+ public static class TokenProcessor extends SimpleMRProcessor {
+ private static final IntWritable one = new IntWritable(1);
+ private final Text word = new Text();
+
+ public TokenProcessor(ProcessorContext context) {
+ super(context);
+ }
+
+ @Override
+ public void run() throws Exception {
+ // Get inputs/outputs
+ var inputs = getInputs();
+ var outputs = getOutputs();
+
+ var reader = (org.apache.tez.mapreduce.lib.MRReader) inputs.get("Input").getReader();
+ var writer = (KeyValueWriter) outputs.get("Summer").getWriter();
+
+ while (reader.next()) {
+ var val = reader.getCurrentValue();
+ var line = val.toString();
+ var tokenizer = new StringTokenizer(line);
+
+ while (tokenizer.hasMoreTokens()) {
+ word.set(tokenizer.nextToken());
+ writer.write(word, one);
+ }
+ }
+ }
+ }
+
+ public static class SumProcessor extends SimpleMRProcessor {
+ public SumProcessor(ProcessorContext context) {
+ super(context);
+ }
+
+ @Override
+ public void run() throws Exception {
+ var inputs = getInputs();
+ var outputs = getOutputs();
+
+ var reader = (KeyValuesReader) inputs.get("Tokenizer").getReader();
+ var writer = (KeyValueWriter) outputs.get("Output").getWriter();
+
+ while (reader.next()) {
+ var key = reader.getCurrentKey();
+ var values = reader.getCurrentValues();
+
+ int sum = 0;
+ for (var val : values) {
+ sum += ((IntWritable) val).get();
+ }
+ writer.write(key, new IntWritable(sum));
+ }
+ }
+ }
+}