diff --git a/.gitignore b/.gitignore index b023d553..66e2d514 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,8 @@ /maint/codes.pc /test-driver .deps +src/surrogate/zmqml/demozmqmlrequester +src/surrogate/zmqml/libzmqmlrequester.so # make generated artifacts .dirstamp @@ -44,3 +46,4 @@ install-mastiff/include/codes/model-net-method.h /build* .cache compile_commands.json +__pycache__/ diff --git a/CODES-compile-instructions.sh b/CODES-compile-instructions.sh index 14178c99..b203d57d 100644 --- a/CODES-compile-instructions.sh +++ b/CODES-compile-instructions.sh @@ -1,9 +1,11 @@ -#!/usr/bin bash -x +#!/usr/bin/env bash +set -euo pipefail +set -x # Switches -swm_enable=1 -union_enable=1 -torch_enable=0 +swm_enable=0 +union_enable=0 +torch_enable=1 # Uncomment below for MPICH #export PATH=/usr/local/mpich-4.1.2/bin/:"$PATH" @@ -19,14 +21,33 @@ fi # What to compile CUR_DIR="$PWD" + + ##### Downloading everything ##### -git clone https://github.com/codes-org/codes --depth=100 --branch=v1.5.0 -git clone https://github.com/ross-org/ross --depth=100 --branch=v8.1.0 +if [ ! -d codes/.git ]; then + git clone https://github.com/codes-org/codes --depth=100 --branch=v1.5.0 +else + echo "Using existing codes checkout: $(realpath codes)" +fi + +if [ ! -d ross/.git ]; then + git clone https://github.com/ross-org/ross --depth=100 --branch=v8.1.0 +else + echo "Using existing ross checkout: $(realpath ross)" +fi if [ $swm_enable = 1 ]; then + if [ ! -d argobots/.git ]; then git clone https://github.com/pmodels/argobots --depth=1 +else + echo "Using existing argobots checkout: $(realpath argobots)" +fi + if [ ! -d swm-workloads/.git ]; then git clone https://github.com/codes-org/swm-workloads --branch=v1.2 +else + echo "Using existing swm-workloads checkout: $(realpath swm-workloads)" +fi fi if [ $union_enable = 1 ]; then @@ -34,15 +55,19 @@ if [ $union_enable = 1 ]; then curl -L https://sourceforge.net/projects/conceptual/files/conceptual/1.5.1b/conceptual-1.5.1b.tar.gz -o conceptual-1.5.1b.tar.gz tar xvf conceptual-1.5.1b.tar.gz # Downloading union - git clone https://github.com/SPEAR-UIC/Union + if [ ! -d Union/.git ]; then + git clone https://github.com/SPEAR-UIC/Union + else + echo "Using existing Union checkout: $(realpath Union)" + fi pushd Union && git checkout 99b3df3 && popd fi ##### COMPILING ##### -mkdir ross/build +mkdir -p ross/build pushd ross/build -cmake .. -DROSS_BUILD_MODELS=ON -DCMAKE_INSTALL_PREFIX="$(realpath ./bin)" \ +cmake .. -DCMAKE_C_COMPILER=mpicc -DCMAKE_CXX_COMPILER=mpicxx -DROSS_BUILD_MODELS=ON -DCMAKE_INSTALL_PREFIX="$(realpath ./bin)" \ -DCMAKE_C_COMPILER=mpicc -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_FLAGS="-g -Wall" #make VERBOSE=1 make install -j4 @@ -53,7 +78,7 @@ popd if [ $swm_enable = 1 ]; then pushd swm-workloads/swm ./prepare.sh - mkdir build + mkdir -p build pushd build ../configure --disable-shared --prefix="$(realpath ./bin)" CC=mpicc CXX=mpicxx CFLAGS=-g CXXFLAGS=-g #make V=1 && make install @@ -64,7 +89,7 @@ if [ $swm_enable = 1 ]; then pushd argobots ./autogen.sh - mkdir build + mkdir -p build pushd build #../configure --enable-debug=all --disable-fast --disable-shared --prefix="$(realpath ./bin)" CC=mpicc CXX=mpicxx CFLAGS=-g CXXFLAGS=-g ../configure --disable-shared --prefix="$(realpath ./bin)" CC=mpicc CXX=mpicxx CFLAGS=-g CXXFLAGS=-g @@ -97,16 +122,186 @@ if [ $union_enable = 1 ]; then fi -mkdir codes/build + +# Make system pkg-config metadata visible even when Conda's pkg-config is active. +# This is needed for libzmq.pc on systems where ZeroMQ is installed through the OS +# but the active Conda environment's pkg-config only searches Conda pkgconfig dirs. +if ! pkg-config --exists libzmq 2>/dev/null; then + for pcdir in \ + /usr/lib/x86_64-linux-gnu/pkgconfig \ + /usr/lib64/pkgconfig \ + /usr/lib/pkgconfig \ + /usr/local/lib/pkgconfig \ + /usr/local/lib64/pkgconfig \ + /opt/homebrew/lib/pkgconfig \ + /usr/share/pkgconfig + do + if [ -d "$pcdir" ]; then + export PKG_CONFIG_PATH="$pcdir:${PKG_CONFIG_PATH:-}" + fi + done +fi + +if ! pkg-config --exists libzmq 2>/dev/null; then + echo "WARNING: pkg-config still cannot find libzmq.pc." >&2 + echo " If ZMQML fails to build, install the ZeroMQ development package" >&2 + echo " or set PKG_CONFIG_PATH to the directory containing libzmq.pc." >&2 +fi + +# Build local ZMQML requester library required by director-client.C +pushd codes/src/surrogate/zmqml +make clean +make +test -f libzmqmlrequester.so +test -f zmqmlrequester.h +popd + +# Make imported zmqmlrequester target visible to doc/example and tests. +python3 - <<'INNERPY' +from pathlib import Path +cm = Path("codes/src/CMakeLists.txt") +text = cm.read_text() +old = "add_library(zmqmlrequester SHARED IMPORTED )" +new = "add_library(zmqmlrequester SHARED IMPORTED GLOBAL)" +if old in text: + cm.write_text(text.replace(old, new)) +elif new in text: + pass +else: + raise SystemExit("Could not find zmqmlrequester imported target line in codes/src/CMakeLists.txt") +INNERPY + +mkdir -p codes/build pushd codes/build +torch_cmake_prefix="" +torch_dir="" + +if [ "$torch_enable" = 1 ]; then + torch_cmake_prefix="$(python3 - <<'INNERPY' +import torch +print(torch.utils.cmake_prefix_path) +INNERPY +)" + torch_dir="${torch_cmake_prefix}/Torch" + + if [ ! -f "${torch_dir}/TorchConfig.cmake" ]; then + echo "ERROR: TorchConfig.cmake not found at: ${torch_dir}/TorchConfig.cmake" >&2 + echo " torch.utils.cmake_prefix_path returned: ${torch_cmake_prefix}" >&2 + exit 1 + fi + + echo "Using Torch CMake prefix: ${torch_cmake_prefix}" + echo "Using Torch_DIR: ${torch_dir}" + + # CUDA is intentionally opt-in. + # Default to CPU-only Torch-JIT compilation unless CUDA_HOME is explicitly set. + # + # To enable CUDA, run for example: + # export CUDA_HOME=/usr/local/cuda-12.4 + # ./CODES-compile-instructions.sh + torch_cuda_version="$(python3 - <<'INNERPY' +import torch +print(torch.version.cuda or "") +INNERPY +)" + + cuda_arch="" + if [ -z "${CUDA_HOME:-}" ] && [ -n "${torch_cuda_version}" ]; then + echo "ERROR: CUDA_HOME is not set, so this script is defaulting to CPU-only Torch-JIT compilation." >&2 + echo " However, the active Python environment has a CUDA-enabled PyTorch build:" >&2 + echo " torch.version.cuda=${torch_cuda_version}" >&2 + echo "" >&2 + echo " CMake cannot use a CUDA-enabled PyTorch package as a CPU-only LibTorch package." >&2 + echo " Choose one of the following:" >&2 + echo " 1. For CPU-only compilation, install a CPU-only PyTorch build in this environment." >&2 + echo " 2. For CUDA compilation, export CUDA_HOME to your CUDA toolkit root." >&2 + echo "" >&2 + echo " Example CUDA build:" >&2 + echo " export CUDA_HOME=/usr/local/cuda-12.4" >&2 + echo " bash CODES-compile-instructions.sh" >&2 + exit 1 + fi + + if [ -n "${CUDA_HOME:-}" ]; then + if [ ! -f "${CUDA_HOME}/include/cuda_runtime_api.h" ]; then + echo "ERROR: CUDA_HOME is set, but missing CUDA header: ${CUDA_HOME}/include/cuda_runtime_api.h" >&2 + exit 1 + fi + + if [ ! -f "${CUDA_HOME}/lib64/libcudart.so" ] && [ ! -f "${CUDA_HOME}/lib/libcudart.so" ]; then + echo "ERROR: CUDA_HOME is set, but missing CUDA runtime library under ${CUDA_HOME}/lib64 or ${CUDA_HOME}/lib" >&2 + exit 1 + fi + + if [ ! -x "${CUDA_HOME}/bin/nvcc" ]; then + echo "ERROR: CUDA_HOME is set, but missing CUDA compiler: ${CUDA_HOME}/bin/nvcc" >&2 + exit 1 + fi + + if [ ! -d "${CUDA_HOME}/nvvm/libdevice" ]; then + echo "ERROR: CUDA_HOME is set, but missing CUDA libdevice directory: ${CUDA_HOME}/nvvm/libdevice" >&2 + exit 1 + fi + + if command -v nvidia-smi >/dev/null 2>&1; then + cuda_arch="$(nvidia-smi --query-gpu=compute_cap --format=csv,noheader 2>/dev/null | head -n1 | tr -d '.[:space:]' || true)" + fi + + if [ -z "${cuda_arch}" ]; then + echo "WARNING: Could not auto-detect GPU compute capability with nvidia-smi." >&2 + echo " Falling back to CMAKE_CUDA_ARCHITECTURES=80." >&2 + cuda_arch="80" + fi + + export CUDA_HOME + export CUDA_PATH="${CUDA_HOME}" + export CUDA_ROOT="${CUDA_HOME}" + export CUDA_TOOLKIT_ROOT_DIR="${CUDA_HOME}" + export CUDAToolkit_ROOT="${CUDA_HOME}" + export CUDACXX="${CUDA_HOME}/bin/nvcc" + export PATH="${CUDA_HOME}/bin:${PATH}" + export LD_LIBRARY_PATH="${CUDA_HOME}/lib64:${CUDA_HOME}/lib:${LD_LIBRARY_PATH:-}" + + echo "CUDA_HOME is set; enabling CUDA Torch-JIT compilation." + echo "Using CUDA_HOME: ${CUDA_HOME}" + echo "Using CUDACXX: ${CUDACXX}" + echo "Using CMAKE_CUDA_ARCHITECTURES=${cuda_arch}" + else + echo "CUDA_HOME is not set; forcing CPU-only Torch-JIT compilation." + + # Prevent accidental CUDA discovery from /usr/local/cuda, nvcc on PATH, + # inherited CMake cache variables, or CUDA-enabled PyTorch metadata. + unset CUDA_HOME + unset CUDA_PATH + unset CUDA_ROOT + unset CUDA_TOOLKIT_ROOT_DIR + unset CUDAToolkit_ROOT + unset CUDACXX + unset CMAKE_CUDA_COMPILER + fi +fi + +cmake_prefix_path="$(realpath "$CUR_DIR/ross/build/bin")" +if [ "$torch_enable" = 1 ]; then + cmake_prefix_path="${cmake_prefix_path};${torch_cmake_prefix}" +fi + make_args_codes=( - -DCMAKE_PREFIX_PATH="$(realpath "$CUR_DIR/ross/build/bin")" + -DCMAKE_PREFIX_PATH="${cmake_prefix_path}" -DCMAKE_CXX_COMPILER=mpicxx -DCMAKE_C_COMPILER=mpicc -DCMAKE_C_FLAGS="-g -Wall" -DCMAKE_CXX_FLAGS="-g -Wall" + -DTHREADS_PREFER_PTHREAD_FLAG=ON + -DCMAKE_THREAD_LIBS_INIT="-pthread" + -DCMAKE_HAVE_THREADS_LIBRARY=1 + -DCMAKE_USE_PTHREADS_INIT=1 + -DCMAKE_USE_WIN32_THREADS_INIT=0 -DCMAKE_BUILD_TYPE=Debug -DBUILD_TESTING=ON -DCMAKE_INSTALL_PREFIX="$(realpath bin)" + -DZMQML_BUILD_PATH="$(realpath "$CUR_DIR/codes/src/surrogate/zmqml")" + -DZeroMQ_INCLUDE_DIR=/usr/include + -DZeroMQ_LIBRARY=/usr/lib/x86_64-linux-gnu/libzmq.so ) if [ $swm_enable = 1 ]; then make_args_codes=( @@ -121,8 +316,32 @@ if [ $union_enable = 1 ]; then -DUNION_PKG_CONFIG_PATH="$(realpath "$CUR_DIR/Union/install/lib/pkgconfig")" ) fi -if [ $torch_enable = 1 ]; then - make_args_codes=("${make_args_codes[@]}" -DUSE_TORCH=true) +if [ "$torch_enable" = 1 ]; then + make_args_codes=( + "${make_args_codes[@]}" + -DUSE_TORCH=true + -DTorch_DIR="${torch_dir}" + ) + + if [ -n "${CUDA_HOME:-}" ]; then + make_args_codes=( + "${make_args_codes[@]}" + -DCUDA_TOOLKIT_ROOT_DIR="${CUDA_HOME}" + -DCUDAToolkit_ROOT="${CUDA_HOME}" + -DCUDA_PATH="${CUDA_HOME}" + -DCUDA_ROOT="${CUDA_HOME}" + -DCMAKE_CUDA_COMPILER="${CUDA_HOME}/bin/nvcc" + -DCMAKE_CUDA_ARCHITECTURES="${cuda_arch}" + -DCUDA_INCLUDE_DIRS="${CUDA_HOME}/include" + -DCUDA_CUDART_LIBRARY="${CUDA_HOME}/lib64/libcudart.so" + ) + else + make_args_codes=( + "${make_args_codes[@]}" + -DCMAKE_DISABLE_FIND_PACKAGE_CUDA=ON + -DCMAKE_DISABLE_FIND_PACKAGE_CUDAToolkit=ON + ) + fi else make_args_codes=("${make_args_codes[@]}" -DUSE_TORCH=false) fi diff --git a/codes/surrogate/director-client.h b/codes/surrogate/director-client.h new file mode 100644 index 00000000..1e663e85 --- /dev/null +++ b/codes/surrogate/director-client.h @@ -0,0 +1,105 @@ +#ifndef __DIRECTOR_CLIENT_H_DEFINED__ +#define __DIRECTOR_CLIENT_H_DEFINED__ + +#include +#include "codes/codes_mapping.h" + + + +#define NUM_DIR_TO_NW_EVENT 20 + + +enum SIMULATION_MODE +{ + SIM_MODE_PDES=1, + SIM_MODE_ITERATION_SURROGATE, +}; + + +typedef struct director_message director_message; +typedef struct director_annotation director_annotation; + +enum DIR_EVENTS +{ + DIR_AN_ITER_MARK=1, + DIR_OP_NW, + DIR_REGISTERED_EVENT__SWITCH_TO_SURR, + DIR_REGISTERED_EVENT__SWITCH_TO_PDES, + DIR_REGISTERED_EVENT__MOVE_TO_NEXT, +}; + +enum DIR_OPERATIONS //currently unused +{ + DIR_AN_WK_START=1, + DIR_AN_WK_ITERATION_END, + DIR_AN_WK_END, + DIR_OP_SEND, + DIR_OP_RECV, +}; + + +// director event message struct +struct director_message +{ + int msg_type; + int op_type; + int num_rngs; + int value; + //model_net_event_return event_rc; + //struct codes_workload_op * mpi_op; + + void *buffer; // this pointer MUST be at the end of the structure +}; + +// director annotation struct +struct director_annotation +{ + int an_type; + int an_value; +}; + + +#ifdef __cplusplus +extern "C" +{ +#endif + + +/** + * @brief Prepares a request to send to client with the specified command and arguments, + * receives a reply + + * @param cmd zmqml request command: 'query', 'launch', execute', send', 'nothing', 'exit' + * @param args the arguments for launch and execute + * @param bindata binary data from send + * @param surrdata containing the 'status' field and optionally 'et' and 'id'. + * 'status' is not present, returns a vector with "failed". + * Fromat is ":;:;..." + * + */ + +//extern char* dir_client_request(const char* cmd, +// const char* args, +// const char* data); + + +extern void director_lp_register_model(const char *); + + +/* +extern void director_parse_args(char *args, int **args_array, int *length); +static void director_issue_codes_event(director_state * s, tw_lpid nw_lpid, int dir_registered_event_type, tw_stime ts, tw_lp* lp); +extern void director_register_events(director_state * s, director_message * msg, tw_lp * lp); +extern void dir_test_init(director_state* s, tw_lp* lp); +extern void director_prepare_iteration_dataset(director_state* s, tw_stime * training_data, int training_cycle, int training_records); +extern void director_get_surrogate_prediction(director_state* s, tw_bf * bf, director_message * m, tw_lp * lp, tw_stime* delay_ts); +extern void dir_test_event_handler(director_state* s, tw_bf * bf, director_message * m, tw_lp * lp); +extern void dir_test_finalize(director_state* s, tw_lp* lp); +*/ + + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/codes/surrogate/packet-latency-predictor/average.h b/codes/surrogate/packet-latency-predictor/average.h index f793bfa3..7c72eefd 100644 --- a/codes/surrogate/packet-latency-predictor/average.h +++ b/codes/surrogate/packet-latency-predictor/average.h @@ -18,6 +18,8 @@ extern "C" { extern double ignore_until; +void average_latency_predictor_set_debug_prints(int enabled); + struct packet_latency_predictor average_latency_predictor(int num_terminals); #ifdef __cplusplus diff --git a/codes/surrogate/packet-latency-predictor/common.h b/codes/surrogate/packet-latency-predictor/common.h index 3faa7bff..a84e20c1 100644 --- a/codes/surrogate/packet-latency-predictor/common.h +++ b/codes/surrogate/packet-latency-predictor/common.h @@ -28,6 +28,18 @@ struct packet_start { double processing_packet_delay; // delay for this packet to be processed from previous packet in the queue uint32_t packet_size; bool is_there_another_pckt_in_queue; // is there another packet in queue + uint64_t caller_lp_gid; // true ROSS LP gid of the source terminal LP + + /* + * Optional ML-facing context for LP-aware Torch-JIT mode. + * Existing predictors may ignore these fields. + */ + uint32_t src_router_id; + uint32_t src_group_id; + uint32_t dst_router_id; + uint32_t dst_group_id; + uint32_t terminal_queue_length; + uint32_t terminal_vc_occupancy; }; struct packet_end { diff --git a/codes/surrogate/packet-latency-predictor/torch-jit.h b/codes/surrogate/packet-latency-predictor/torch-jit.h index 80e532a5..16fe94a1 100644 --- a/codes/surrogate/packet-latency-predictor/torch-jit.h +++ b/codes/surrogate/packet-latency-predictor/torch-jit.h @@ -2,14 +2,43 @@ #define CODES_SURROGATE_TORCHJIT_H #include +#include #include "codes/surrogate/init.h" #ifdef __cplusplus extern "C" { #endif +void surrogate_torch_set_lp_aware_mode(bool enabled); +void surrogate_torch_set_debug_prints(bool enabled); void surrogate_torch_init(char const * dir); +struct router_timing_prediction_start { + float router_id; + float group_id; + float output_port; + float output_chan; + float to_terminal; + float is_global; + float packet_size; + float chunk_size; + float output_vc_occupancy; + float output_queued_count; + float next_output_available_delta; + float nominal_router_delay; +}; + +void surrogate_torch_init_lp_type_models( + char const *terminal_model_path, + char const *router_timing_model_path, + char const *default_model_path); + +bool surrogate_torch_router_timing_model_enabled(void); + +double surrogate_torch_predict_router_queueing_delay( + struct router_timing_prediction_start const *start, + double fallback_queueing_delay); + extern struct packet_latency_predictor torch_latency_predictor; #ifdef __cplusplus diff --git a/doc/example/CMakeLists.txt b/doc/example/CMakeLists.txt index f665d234..1fceb9dd 100644 --- a/doc/example/CMakeLists.txt +++ b/doc/example/CMakeLists.txt @@ -11,6 +11,9 @@ endforeach() # Saving default config files to run experiments with configure_file(tutorial-ping-pong.conf.in tutorial-ping-pong.template.conf.in @ONLY) configure_file(tutorial-ping-pong-surrogate.conf.in tutorial-ping-pong-surrogate.template.conf.in @ONLY) +configure_file(kb.dfdally-72-zeromq-director.conf.in kb.dfdally-72-zeromq-director.template.conf.in @ONLY) +configure_file(kb.dfdally-72-milc-small.workload.conf.in kb.dfdally-72-milc-small.workload.template.conf.in @ONLY) +configure_file(kb.dfdally-72-zeromq-director.conf.in kb.dfdally-72-zeromq-director.template.conf.in @ONLY) set(single_quote "'") set(double_quote "\"") @@ -22,5 +25,15 @@ set(PACKET_LATENCY_TRACE_PATH "packet-latency-trace/") set(IGNORE_UNTIL "200e4") set(PREDICTOR_TYPE "average") string(REPLACE ${single_quote} ${double_quote} SWITCH_TIMESTAMPS "'1000e4', '8900e4'") +set(DIRECTOR_START_ITER "3") +set(DIRECTOR_END_ITER "8") +set(INFERENCING_ENABLED "0") +set(SURROGATE_ENABLED "1") +set(TRAINING_ENABLED "1") configure_file(tutorial-ping-pong.conf.in tutorial-ping-pong.conf) configure_file(tutorial-ping-pong-surrogate.conf.in tutorial-ping-pong-surrogate.conf) +configure_file(kb.dfdally-72-zeromq-director.conf.in kb.dfdally-72-zeromq-director.conf) +configure_file(kb.dfdally-72-milc-small.workload.conf.in kb.dfdally-72-milc-small.workload.conf) +configure_file(kb.dfdally-72-milc-small.json kb.dfdally-72-milc-small.json COPYONLY) +configure_file(kb.dfdally-72-milc-small.alloc.conf kb.dfdally-72-milc-small.alloc.conf COPYONLY) +configure_file(kb.dfdally-72-zeromq-director.conf.in kb.dfdally-72-zeromq-director.conf) diff --git a/doc/example/kb.dfdally-72-milc-small.alloc.conf b/doc/example/kb.dfdally-72-milc-small.alloc.conf new file mode 100644 index 00000000..c4177fe9 --- /dev/null +++ b/doc/example/kb.dfdally-72-milc-small.alloc.conf @@ -0,0 +1 @@ +0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 diff --git a/doc/example/kb.dfdally-72-milc-small.json b/doc/example/kb.dfdally-72-milc-small.json new file mode 100644 index 00000000..4c9d5e4e --- /dev/null +++ b/doc/example/kb.dfdally-72-milc-small.json @@ -0,0 +1,16 @@ +{ + "jobs": { + "size": 72, + "cfg": { + "app": "milc", + "iteration_cnt": 10, + "compute_delay": 100, + "dimension_cnt": 4, + "dimension_sizes": [2, 2, 3, 6], + "msg_size": 4096, + "max_dimension_distance": 1, + "randomize_communication_order": false, + "cpu_freq": 4000000000.0 + } + } +} diff --git a/doc/example/kb.dfdally-72-milc-small.workload.conf.in b/doc/example/kb.dfdally-72-milc-small.workload.conf.in new file mode 100644 index 00000000..e371c47b --- /dev/null +++ b/doc/example/kb.dfdally-72-milc-small.workload.conf.in @@ -0,0 +1 @@ +72 @CMAKE_CURRENT_BINARY_DIR@/kb.dfdally-72-milc-small.json 1 0 diff --git a/doc/example/kb.dfdally-72-zeromq-director.conf.in b/doc/example/kb.dfdally-72-zeromq-director.conf.in new file mode 100644 index 00000000..7441bc5a --- /dev/null +++ b/doc/example/kb.dfdally-72-zeromq-director.conf.in @@ -0,0 +1,107 @@ +# ZeroMQ Director example for model-net-mpi-replay on a 72-terminal +# dragonfly-dally network. +# +# This example is self-contained inside the CODES repository. It uses: +# +# doc/example/kb.dfdally-72-zeromq-director.conf +# doc/example/kb.dfdally-72-milc-small.workload.conf +# doc/example/kb.dfdally-72-milc-small.json +# doc/example/kb.dfdally-72-milc-small.alloc.conf + +LPGROUPS +{ + MODELNET_GRP + { + repetitions="36"; + nw-lp="2"; + dir-nw-lp="2"; + + modelnet_dragonfly_dally="2"; + modelnet_dragonfly_dally_router="1"; + } +} + +DIRECTOR +{ + start_iter="${DIRECTOR_START_ITER}"; + end_iter="${DIRECTOR_END_ITER}"; + + # Common modes: + # + # Pure PDES: + # inferencing_enabled="0"; + # surrogate_enabled="0"; + # training_enabled="0"; + # + # Training/data collection: + # inferencing_enabled="0"; + # surrogate_enabled="1"; + # training_enabled="1"; + # + # Inference only: + # inferencing_enabled="1"; + # surrogate_enabled="1"; + # training_enabled="0"; + # + # Train + inference: + # inferencing_enabled="1"; + # surrogate_enabled="1"; + # training_enabled="1"; + + inferencing_enabled="${INFERENCING_ENABLED}"; + surrogate_enabled="${SURROGATE_ENABLED}"; + training_enabled="${TRAINING_ENABLED}"; +} + +PARAMS +{ + message_size="824"; + + packet_size="${PACKET_SIZE}"; + chunk_size="${CHUNK_SIZE}"; + + num_routers="4"; + num_groups="9"; + num_row_chans="1"; + num_col_chans="1"; + num_cns_per_router="2"; + num_global_channels="2"; + + cn_bandwidth="5.25"; + local_bandwidth="5.25"; + global_bandwidth="5.25"; + + cn_vc_size="32768"; + local_vc_size="16384"; + global_vc_size="16384"; + + cn_delay="10"; + local_delay="10"; + global_delay="100"; + router_delay="300"; + nic_seq_delay="0"; + + cn_credit_delay="10"; + local_credit_delay="10"; + global_credit_delay="100"; + + num_qos_levels="1"; + bw_reset_window="250000.0"; + max_qos_monitor="5500000000"; + qos_bucket_max="10"; + qos_min_bws="5,30,20,5"; + qos_max_bws="10,80,60,20"; + + routing="prog-adaptive"; + adaptive_threshold="2560"; + route_scoring_metric="alpha"; + route_scoring_factors="2"; + route_scoring_factors_local_intm="2"; + route_scoring_factors_local_dest="2"; + + modelnet_order=("dragonfly_dally","dragonfly_dally_router"); + modelnet_scheduler="round-robin"; + + intra-group-connections="@CMAKE_SOURCE_DIR@/src/network-workloads/conf/dragonfly-dally/dfdally-72-intra"; + inter-group-connections="@CMAKE_SOURCE_DIR@/src/network-workloads/conf/dragonfly-dally/dfdally-72-inter"; +} diff --git a/doc/example/tutorial-ping-pong-surrogate.conf.in b/doc/example/tutorial-ping-pong-surrogate.conf.in index fd53f4d1..8040d921 100644 --- a/doc/example/tutorial-ping-pong-surrogate.conf.in +++ b/doc/example/tutorial-ping-pong-surrogate.conf.in @@ -55,6 +55,9 @@ PARAMS routing="prog-adaptive"; # folder path to store packet latency from terminal to terminal, if no value is given it won't save anything save_packet_latency_path="${PACKET_LATENCY_TRACE_PATH}"; +# folder path to store router-local timing rows for router queueing-delay training + save_router_timing_trace_path="${ROUTER_TIMING_TRACE_PATH}"; + save_router_timing_trace_stride="${ROUTER_TIMING_TRACE_STRIDE}"; # router buffer occupancy snapshots router_buffer_snapshots=( ${BUFFER_SNAPSHOTS} ); } @@ -76,8 +79,18 @@ NETWORK_SURROGATE { ignore_until="${IGNORE_UNTIL}"; # parameters for torch-jit latency predictor - torch_jit_mode="single-static-model-for-all-terminals"; +# accepted modes: +# single-static-model-for-all-terminals: uses torch_jit_model_path, input [1,4], output [1,2] +# lp-aware-single-static-model: uses torch_jit_model_path, input [1,12], output [1,2] +# lp-aware-lp-type-models: uses the terminal/router/default paths below + torch_jit_mode="${TORCH_JIT_MODE}"; torch_jit_model_path="${TORCH_JIT_MODEL_PATH}"; + torch_jit_terminal_model_path="${TORCH_JIT_TERMINAL_MODEL_PATH}"; + torch_jit_router_timing_model_path="${TORCH_JIT_ROUTER_TIMING_MODEL_PATH}"; + torch_jit_default_model_path="${TORCH_JIT_DEFAULT_MODEL_PATH}"; + +# temporary surrogate debug prints. Options: 0 or 1 + debug_prints="${SURROGATE_DEBUG_PRINTS}"; # selecting network treatment on switching to surrogate. Options: frezee, nothing network_treatment_on_switch="${NETWORK_TREATMENT}"; diff --git a/doc/example/tutorial-ping-pong.conf.in b/doc/example/tutorial-ping-pong.conf.in index e8e2ce4e..042ee417 100644 --- a/doc/example/tutorial-ping-pong.conf.in +++ b/doc/example/tutorial-ping-pong.conf.in @@ -54,4 +54,6 @@ PARAMS router_buffer_snapshots=( ${BUFFER_SNAPSHOTS} ); # folder path to store packet latency from terminal to terminal, if no value is given it won't save anything save_packet_latency_path="${PACKET_LATENCY_TRACE_PATH}"; + save_router_timing_trace_path="${ROUTER_TIMING_TRACE_PATH}"; + save_router_timing_trace_stride="${ROUTER_TIMING_TRACE_STRIDE}"; } diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 08950e7f..c4152b80 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -62,6 +62,7 @@ list(APPEND SRCS surrogate/app-iteration-predictor/average.c surrogate/packet-latency-predictor/common.c surrogate/packet-latency-predictor/average.c + surrogate/director-client.C iokernellang/codesparser.h iokernellang/codesparser.c @@ -158,6 +159,12 @@ if(USE_ONLINE) endif() endif() +# ZMQML +add_library(zmqmlrequester SHARED IMPORTED GLOBAL) +set_target_properties(zmqmlrequester PROPERTIES + IMPORTED_LOCATION "${ZMQML_BUILD_PATH}/libzmqmlrequester.so" + INTERFACE_INCLUDE_DIRECTORIES "${ZMQML_BUILD_PATH}") + #LINK ROSS # target_link_libraries(codes PUBLIC #{pkgcfg_lib_ROSS_ROSS}) # target_link_libraries(codes PUBLIC PkgConfig::ROSS) @@ -169,6 +176,7 @@ target_include_directories(codes PUBLIC ${PROJECT_SOURCE_DIR}/codes ${PROJECT_SOURCE_DIR}/src ${PROJECT_SOURCE_DIR}/src/modelconfig + $ ) target_link_libraries(codes PUBLIC ${LIBS_TO_LINK}) @@ -201,9 +209,14 @@ if(USE_DUMPI) list(APPEND CODES_TARGETS model-net-dumpi-traces-dump) endif() +# ZMQ +pkg_check_modules(PC_ZeroMQ QUIET zmq) +find_path(ZeroMQ_INCLUDE_DIR NAMES zmq.hpp PATHS ${PC_ZeroMQ_INCLUDE_DIRS}) +find_library(ZeroMQ_LIBRARY NAMES zmq PATHS ${PC_ZeroMQ_LIBRARY_DIRS}) + foreach(tar IN LISTS CODES_TARGETS) target_include_directories(${tar} PUBLIC ${CODES_INCLUDE_DIRS} ${ROSS_INCLUDE_DIRS}) - target_link_libraries(${tar} PUBLIC codes ${LIBS_TO_LINK}) + target_link_libraries(${tar} PUBLIC codes ${LIBS_TO_LINK} zmqmlrequester ${ZeroMQ_LIBRARY}) endforeach() diff --git a/src/network-workloads/model-net-mpi-replay.c b/src/network-workloads/model-net-mpi-replay.c index 018c4337..df28d9f5 100644 --- a/src/network-workloads/model-net-mpi-replay.c +++ b/src/network-workloads/model-net-mpi-replay.c @@ -23,6 +23,21 @@ #include "codes/surrogate/init.h" #include "surrogate/app-iteration-predictor/common.h" + +#include "codes/surrogate/director-client.h" + +/* ========================================================== + START OF Director Code (To be moved to separate files) + ========================================================== +*/ + + +//struct director_config_struct dir_config_global; + + + + + /* turning on track lp will generate a lot of output messages */ #define DBG_COMM 1 #define MN_LP_NM "modelnet_dragonfly_custom" @@ -208,6 +223,8 @@ enum MPI_NW_EVENTS CLI_OTHER_FINISH, //received when another workload has finished // Surrogate events SURR_SKIP_ITERATION, // skips one (several) iteration(s) of simulation + CODES_CMD_SWITCH_TO_SURR, // transition simulation from PDES -> Surrogate + CODES_CMD_SWITCH_TO_PDES, // transition simulation from Surrogate -> PDES }; /* type of synthetic traffic */ @@ -393,6 +410,11 @@ struct nw_state char output_buf[512]; char col_stats[64]; struct ross_model_sample ross_sample; + + /* For hybrid simulations with DIRECTOR and surrogate */ + int director_enabled; + tw_lpid director_lpid; + int simulation_mode; }; /* data for handling reverse computation. @@ -498,6 +520,73 @@ struct nw_message } rc; }; + +/* ========================================================== + START of model-net replay DIRECTOR interface + ========================================================== +*/ + + +void codes_register_director_events(nw_state* s, int dir_event_type, int nw_event_type, size_t event_value, tw_lp* lp) +{ + tw_event *e; + director_message *msg; + + nw_message registered_nw_msg; + registered_nw_msg.msg_type = nw_event_type; + registered_nw_msg.op_type = event_value; + registered_nw_msg.fwd.app_id = s->nw_id; + //registered_nw_msg.fwd.data_type = 9999; + + tw_stime ts = 0.001; + + //printf("==DIR[%d] NW Registering dir_event_type:%d | nw_event_type:%d (time: %lf)\n", + // s->nw_id, dir_event_type, nw_event_type, tw_now(lp)); + + //printf("==DIR: ts: %lf\n", ts); + if (sizeof(nw_message) + sizeof(director_message) > g_tw_msg_sz){ + tw_error(TW_LOC, "Error: NW-director trying to transmit an event of size " + "%d but ROSS is configured for events of size %zd\n", + sizeof(nw_message) + sizeof(director_message), g_tw_msg_sz); + } + + e = tw_event_new(s->director_lpid, ts, lp); + msg = (director_message*)tw_event_data(e); + msg->msg_type = dir_event_type; + msg->op_type = nw_event_type; + msg->value = sizeof(nw_message); + + memcpy(&msg->buffer, ®istered_nw_msg, msg->value); + + tw_event_send(e); +} + +/* Trigger Director Event from within network model*/ +static void codes_issue_director_event(tw_lp* lp, tw_lpid director_lpid, int dir_event_type, int value) +{ + + tw_event *e; + struct director_message* msg; + + tw_stime ts; + + ts = .0001; // Todo: maybe this should be dynamically adjustable + + e = tw_event_new(director_lpid, ts, lp ); + msg = (director_message*)tw_event_data(e); + + msg->msg_type = dir_event_type; + msg->value = value; + tw_event_send(e); +} +/* ========================================================== + END of model-net replay DIRECTOR interface + ========================================================== +*/ + + + + static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op, int matched_req); static void send_ack_back_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp); @@ -2489,6 +2578,28 @@ void nw_test_init(nw_state* s, tw_lp* lp) // then this will error out s->wrkld_id = -1; + /* START of DIRECTOR setup + */ + s->director_enabled = 0; + s->simulation_mode = SIM_MODE_PDES; + int num_dir_per_mgrp = codes_mapping_get_lp_count ("MODELNET_GRP", 1, "dir-nw-lp", NULL, 0); + if(num_dir_per_mgrp > 0){ + s->director_enabled = 1; + codes_mapping_get_lp_id("MODELNET_GRP", "dir-nw-lp", NULL, 1, s->nw_id / num_dir_per_mgrp, s->nw_id % num_dir_per_mgrp, &s->director_lpid); + + // register callbacks with director + codes_register_director_events(s, DIR_REGISTERED_EVENT__SWITCH_TO_SURR, CODES_CMD_SWITCH_TO_SURR, sizeof(nw_message), lp); + codes_register_director_events(s, DIR_REGISTERED_EVENT__SWITCH_TO_PDES, CODES_CMD_SWITCH_TO_PDES, sizeof(nw_message), lp); + codes_register_director_events(s, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, MPI_OP_GET_NEXT, sizeof(nw_message), lp); + //printf("\n==DIRNW s->nw_id: %d | lp->gid: %llu | s>director_lpid: %llu", s->nw_id, LLU(lp->gid), LLU(s->director_lpid)); + + } + else{ + //printf("\n==NW[%d] DIRECTOR: No director LPs found (dir-nw-lp)\n", s->nw_id); + } + /* + * END of DIRECTOR setup */ + char type_name[512]; if(!num_net_traces) @@ -2843,6 +2954,8 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) break; case MPI_OP_GET_NEXT: + //if(m->fwd.data_type == 9999) + // printf("===[%llu] N-MARK[%llu]: Value=%d\n", lp->gid, 1, 9999); get_next_mpi_operation(s, bf, m, lp); break; @@ -2875,6 +2988,22 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) case SURR_SKIP_ITERATION: skip_to_iteration(s, lp, bf, m); break; + + /* START of Sim. transition events sent by DIRECTOR */ + case CODES_CMD_SWITCH_TO_SURR: + if (s->simulation_mode == SIM_MODE_PDES) { + s->simulation_mode = SIM_MODE_ITERATION_SURROGATE; + get_next_mpi_operation(s, bf, m, lp); + } + break; + + case CODES_CMD_SWITCH_TO_PDES: + if (s->simulation_mode == SIM_MODE_ITERATION_SURROGATE) { + s->simulation_mode = SIM_MODE_PDES; + get_next_mpi_operation(s, bf, m, lp); + } + break; + /* END of Sim. transition event sent by DIRECTOR */ } } @@ -3019,6 +3148,29 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l // printf("Client rank %llu completed workload, local rank %d .\n", s->nw_id, s->local_rank); return; + } + if(mpi_op->op_type == CODES_WK_MARK) // If annotation type + {// TODO: extend this section to include checks for all annotations + m->rc.mpi_next.mark.saved_marker_time = tw_now(lp); + + if(s->director_enabled == 1) + { + //printf("===DIR: Value=%d\n", mpi_op->u.send.tag); + codes_issue_director_event(lp, s->director_lpid, DIR_AN_ITER_MARK, mpi_op->u.send.tag); + //printf("===[%llu] N-MARK[%llu]: Value=%d\n", lp->gid, s->director_lpid, mpi_op->u.send.tag); + + return; + }else{ + codes_issue_next_event(lp); + return; + } + } else if(s->simulation_mode == SIM_MODE_ITERATION_SURROGATE) // Else non-annotation type + { + if(s->director_enabled == 1) + { + codes_issue_director_event(lp, s->director_lpid, DIR_OP_NW, mpi_op->op_type); + return; + } } switch(mpi_op->op_type) { @@ -4255,6 +4407,8 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv ) if (g_st_ev_trace || g_st_model_stats || g_st_use_analysis_lps) nw_lp_register_model(); + director_lp_register_model("dir-nw-lp"); + net_ids = model_net_configure(&num_nets); // assert(num_nets == 1); net_id = *net_ids; diff --git a/src/networks/model-net/dragonfly-dally.C b/src/networks/model-net/dragonfly-dally.C index 91befa1b..cefff9c0 100644 --- a/src/networks/model-net/dragonfly-dally.C +++ b/src/networks/model-net/dragonfly-dally.C @@ -22,6 +22,9 @@ #include "codes/model-net-method.h" #include "codes/model-net-lp.h" #include "codes/surrogate/init.h" +#ifdef USE_TORCH +#include "codes/surrogate/packet-latency-predictor/torch-jit.h" +#endif #include "codes/net/dragonfly-dally.h" #include "quicklist.h" #include "sys/file.h" @@ -102,6 +105,14 @@ static long global_stalled_chunk_counter = 0; #define OUTPUT_SNAPSHOT 1 static int num_snapshots = 0; + +static uint64_t dfdally_post_switch_terminal_events = 0; +static uint64_t dfdally_post_switch_router_events = 0; +static double dfdally_last_post_switch_terminal_now = -1.0; +static double dfdally_last_post_switch_router_now = -1.0; +static bool dfdally_surrogate_debug_prints = false; +static bool dfdally_surrogate_has_switched_once = false; + tw_stime * snapshot_times; char snapshot_filename[128]; @@ -191,6 +202,12 @@ static char router_sample_file[MAX_NAME_LENGTH]; static FILE * packet_latency_f = NULL; static void setup_packet_latency_path(char const * const dir_to_save); +// File to store router-local timing rows for training router queueing-delay models. +static FILE * router_timing_trace_f = NULL; +static int router_timing_trace_stride = 1; +static unsigned long long router_timing_trace_counter = 0; +static void setup_router_timing_trace_path(char const * const dir_to_save); + // ==== START OF Parameters to tune surrogate mode ==== // @@ -2477,6 +2494,30 @@ static void dragonfly_read_config(const char * anno, dragonfly_param *params) setup_packet_latency_path(packet_latency_path); } + // Router timing trace path to store pure-PDES router-local queueing-delay training rows. + char router_timing_trace_path[MAX_NAME_LENGTH]; + router_timing_trace_path[0] = '\0'; + configuration_get_value(&config, "PARAMS", "save_router_timing_trace_path", anno, + router_timing_trace_path, MAX_NAME_LENGTH); + if(strlen(router_timing_trace_path) > 0) { + setup_router_timing_trace_path(router_timing_trace_path); + } + + char router_timing_trace_stride_str[MAX_NAME_LENGTH]; + router_timing_trace_stride_str[0] = '\0'; + configuration_get_value(&config, "PARAMS", "save_router_timing_trace_stride", anno, + router_timing_trace_stride_str, MAX_NAME_LENGTH); + if(strlen(router_timing_trace_stride_str) > 0) { + int const parsed_stride = atoi(router_timing_trace_stride_str); + if(parsed_stride > 0) { + router_timing_trace_stride = parsed_stride; + } else { + tw_warning(TW_LOC, + "Ignoring invalid save_router_timing_trace_stride=%s; using stride=1", + router_timing_trace_stride_str); + } + } + // START Surrogate configuration char enable_str[MAX_NAME_LENGTH]; enable_str[0] = '\0'; @@ -2485,6 +2526,17 @@ static void dragonfly_read_config(const char * anno, dragonfly_param *params) if (rc_enable > 0) { enable_network_surrogate = (strcmp(enable_str, "1") == 0 || strcmp(enable_str, "true") == 0); } + + char debug_prints_str[MAX_NAME_LENGTH]; + debug_prints_str[0] = '\0'; + configuration_get_value(&config, "NETWORK_SURROGATE", "debug_prints", anno, + debug_prints_str, MAX_NAME_LENGTH); + dfdally_surrogate_debug_prints = (strcmp(debug_prints_str, "1") == 0 || + strcmp(debug_prints_str, "true") == 0 || + strcmp(debug_prints_str, "TRUE") == 0 || + strcmp(debug_prints_str, "yes") == 0 || + strcmp(debug_prints_str, "YES") == 0); + // if surrogate mode has been set up if (enable_network_surrogate) { struct network_surrogate_config surr_conf = { @@ -2571,7 +2623,90 @@ static void setup_packet_latency_path(char const * const dir_to_save) { tw_error(TW_LOC, "File %s could not be opened", filename_path); } - fprintf(packet_latency_f, "#src_terminal,dest_terminal,packet_id,is_surrogate_on,is_predicted,size,workload_injection,next_packet_delay,start,end,latency,is_there_another_pckt_in_queue\n"); + fprintf(packet_latency_f, + "#src_terminal,dest_terminal,packet_size,is_there_another_pckt_in_queue," + "caller_lp_gid,src_router_id,src_group_id,dst_router_id,dst_group_id," + "terminal_queue_length,terminal_vc_occupancy,processing_packet_delay," + "travel_end_time_delta,next_packet_delay," + "packet_id,is_surrogate_on,is_predicted,workload_injection,start,end,latency\n"); +} + + +static void setup_router_timing_trace_path(char const * const dir_to_save) { + assert(router_timing_trace_f == NULL); + int const NO_ERROR = 0; + struct stat st; + memset(&st, 0, sizeof(struct stat)); + if(g_tw_mynode == 0 && stat(dir_to_save, &st) == -1) { + int res = mkdir(dir_to_save, 0700); + if (res != NO_ERROR) { + tw_error(TW_LOC, "Error (%d) occurred when attempting to mkdir folder `%s`", errno, dir_to_save); + } + } + MPI_Barrier(MPI_COMM_CODES); + + char const fmt[] = "%s/router-timing-gid=%lu.txt"; + int sz = snprintf(NULL, 0, fmt, dir_to_save, g_tw_mynode); + char filename_path[sz + 1]; + snprintf(filename_path, sizeof(filename_path), fmt, dir_to_save, g_tw_mynode); + router_timing_trace_f = fopen(filename_path, "w+"); + if(!router_timing_trace_f) { + tw_error(TW_LOC, "File %s could not be opened", filename_path); + } + + fprintf(router_timing_trace_f, + "#router_id,group_id,output_port,output_chan,to_terminal,is_global," + "packet_size,chunk_size,output_vc_occupancy,output_queued_count," + "next_output_available_delta,nominal_router_delay,router_queueing_delay," + "actual_router_delay,is_surrogate_on,is_predicted,now,packet_id,src_terminal,dest_terminal,next_stop\n"); +} + +static inline void router_timing_trace_save_to_file( + router_state const *s, + terminal_dally_message const *pkt, + int output_port, + int output_chan, + int to_terminal, + int is_global, + double next_output_available_delta, + double nominal_router_delay, + double actual_router_delay, + bool surrogate_on, + bool is_predicted, + tw_lp *lp) +{ + if (!router_timing_trace_f) { return; } + + unsigned long long const trace_idx = router_timing_trace_counter++; + if(router_timing_trace_stride > 1 && + (trace_idx % (unsigned long long)router_timing_trace_stride) != 0ULL) { + return; + } + + double const router_queueing_delay = maxd(0.0, actual_router_delay - nominal_router_delay); + fprintf(router_timing_trace_f, + "%u,%d,%d,%d,%d,%d,%u,%d,%d,%d,%f,%f,%f,%f,%d,%d,%f,%lu,%u,%u,%llu\n", + s->router_id, + s->group_id, + output_port, + output_chan, + to_terminal, + is_global, + pkt->packet_size, + s->params->chunk_size, + s->vc_occupancy[output_port][output_chan], + s->queued_count[output_port], + next_output_available_delta, + nominal_router_delay, + router_queueing_delay, + actual_router_delay, + surrogate_on, + is_predicted, + tw_now(lp), + (unsigned long)pkt->packet_ID, + pkt->dfdally_src_terminal_id, + pkt->dfdally_dest_terminal_id, + (unsigned long long)pkt->next_stop); } /* report dragonfly statistics like average and maximum packet latency, average number of hops traversed */ @@ -2608,6 +2743,11 @@ void dragonfly_dally_report_stats() if (packet_latency_f) { fclose(packet_latency_f); + packet_latency_f = NULL; + } + if (router_timing_trace_f) { + fclose(router_timing_trace_f); + router_timing_trace_f = NULL; } /* print statistics */ if(!g_tw_mynode) @@ -3019,6 +3159,94 @@ static int get_next_router_vcg(router_state * s, tw_bf * bf, terminal_dally_mess return -1; } +static uint32_t dfdally_clamp_u64_to_u32(uint64_t value) +{ + uint64_t const max_u32 = (uint64_t)((uint32_t)-1); + return value > max_u32 ? (uint32_t)-1 : (uint32_t)value; +} + +static unsigned int dfdally_safe_rail_id( + terminal_state const *s, + terminal_dally_message const *msg) +{ + if (!s || !s->params || s->params->num_rails <= 0) { + return 0; + } + + if (msg && msg->rail_id >= 0 && msg->rail_id < s->params->num_rails) { + return (unsigned int)msg->rail_id; + } + + return 0; +} + +static uint32_t dfdally_terminal_queue_length_bytes(terminal_state const *s) +{ + if (!s || !s->params || !s->terminal_length) { + return (uint32_t)-1; + } + + uint64_t total = 0; + for (int rail = 0; rail < s->params->num_rails; rail++) { + if (!s->terminal_length[rail]) { + continue; + } + for (int qos = 0; qos < s->params->num_qos_levels; qos++) { + if (s->terminal_length[rail][qos] > 0) { + total += (uint64_t)s->terminal_length[rail][qos]; + } + } + } + + return dfdally_clamp_u64_to_u32(total); +} + +static uint32_t dfdally_terminal_vc_occupancy_bytes(terminal_state const *s) +{ + if (!s || !s->params || !s->vc_occupancy) { + return (uint32_t)-1; + } + + uint64_t total = 0; + for (int rail = 0; rail < s->params->num_rails; rail++) { + if (!s->vc_occupancy[rail]) { + continue; + } + for (int qos = 0; qos < s->params->num_qos_levels; qos++) { + if (s->vc_occupancy[rail][qos] > 0) { + total += (uint64_t)s->vc_occupancy[rail][qos]; + } + } + } + + return dfdally_clamp_u64_to_u32(total); +} + +static uint32_t dfdally_terminal_src_router_id( + terminal_state const *s, + terminal_dally_message const *msg) +{ + if (!s || !s->params) { + return (uint32_t)-1; + } + + unsigned int const rail_id = dfdally_safe_rail_id(s, msg); + + /* + * In high-fidelity mode, s->router_id is valid. In frozen surrogate mode, + * dragonfly_dally_terminal_highdef_to_surrogate() zeroes most network + * state, so s->router_id may be NULL. Average mode still goes through + * packet_generate_predicted(), so do not dereference s->router_id unless + * it exists. + */ + if (s->router_id) { + return s->router_id[rail_id]; + } + + return dfdally_get_assigned_router_id_from_terminal( + s->params, s->terminal_id, rail_id); +} + static inline void packet_latency_save_to_file( unsigned int terminal_id, struct packet_start * start, @@ -3028,19 +3256,37 @@ static inline void packet_latency_save_to_file( ) { if (!packet_latency_f) { return; } // Don't save if there isn't a file to save to if (end->travel_end_time > g_tw_ts_end) { return; } // This packet could never arrive to its destination! - fprintf(packet_latency_f, "%u,%u,%lu,%d,%d,%u,%f,%f,%f,%f,%f,%d\n", - terminal_id, start->dfdally_dest_terminal_id, start->packet_ID, - surrogate_on, is_predicted, start->packet_size, + double const latency = end->travel_end_time - start->travel_start_time; + fprintf(packet_latency_f, + "%u,%u,%u,%d,%llu,%u,%u,%u,%u,%u,%u,%f,%f,%f,%lu,%d,%d,%f,%f,%f,%f\n", + terminal_id, + start->dfdally_dest_terminal_id, + start->packet_size, + start->is_there_another_pckt_in_queue, + (unsigned long long)start->caller_lp_gid, + start->src_router_id, + start->src_group_id, + start->dst_router_id, + start->dst_group_id, + start->terminal_queue_length, + start->terminal_vc_occupancy, + start->processing_packet_delay, + latency, + end->next_packet_delay, + start->packet_ID, + surrogate_on, + is_predicted, start->workload_injection_time, - end->next_packet_delay, start->travel_start_time, - end->travel_end_time, end->travel_end_time - start->travel_start_time, - start->is_there_another_pckt_in_queue); + start->travel_start_time, + end->travel_end_time, + latency); } // ==== START OF Surrogate functions definition ==== static void switch_surrogate(void) { is_dally_surrogate_on = ! is_dally_surrogate_on; + dfdally_surrogate_has_switched_once = true; } static bool is_surrogate_on_fun(void) { @@ -3408,6 +3654,14 @@ static void terminal_commit_packet_generate(terminal_state * s, tw_bf * bf, term // TODO (elkin): In the future, this ugly initialization could be done all in a single "line" instead of setting all values one by one. The reason to do it this way is because some old compilers do not understand other ways of initializing struct packet_sent sent; + unsigned int const feature_rail_id = dfdally_safe_rail_id(s, msg); + uint32_t const src_router_id = dfdally_terminal_src_router_id(s, msg); + uint32_t const src_group_id = src_router_id / s->params->num_routers; + uint32_t const dst_router_id = + dfdally_get_assigned_router_id_from_terminal( + s->params, msg->dfdally_dest_terminal_id, feature_rail_id); + uint32_t const dst_group_id = dst_router_id / s->params->num_routers; + sent.start.packet_ID = msg->packet_ID; sent.start.dest_terminal_lpid = msg->dest_terminal_lpid; sent.start.dfdally_dest_terminal_id = msg->dfdally_dest_terminal_id; @@ -3416,6 +3670,13 @@ static void terminal_commit_packet_generate(terminal_state * s, tw_bf * bf, term sent.start.processing_packet_delay = processing_packet_delay; sent.start.packet_size = msg->packet_size; sent.start.is_there_another_pckt_in_queue = msg->is_there_another_pckt_in_queue; + sent.start.caller_lp_gid = lp->gid; + sent.start.src_router_id = src_router_id; + sent.start.src_group_id = src_group_id; + sent.start.dst_router_id = dst_router_id; + sent.start.dst_group_id = dst_group_id; + sent.start.terminal_queue_length = dfdally_terminal_queue_length_bytes(s); + sent.start.terminal_vc_occupancy = dfdally_terminal_vc_occupancy_bytes(s); sent.next_packet_delay = -1; sent.message_data = msg_data; sent.remote_event_data = remote_data; @@ -3454,6 +3715,14 @@ static void terminal_dally_commit(terminal_state * s, case T_GENERATE: if(bf->c10) { // if the packet was sent as a prediction, store the prediction in memory assert(dally_surrogate_configured); + unsigned int const feature_rail_id = dfdally_safe_rail_id(s, msg); + uint32_t const src_router_id = dfdally_terminal_src_router_id(s, msg); + uint32_t const src_group_id = src_router_id / s->params->num_routers; + uint32_t const dst_router_id = + dfdally_get_assigned_router_id_from_terminal( + s->params, msg->dfdally_dest_terminal_id, feature_rail_id); + uint32_t const dst_group_id = dst_router_id / s->params->num_routers; + auto start = (struct packet_start) { .packet_ID = msg->packet_ID, .dest_terminal_lpid = msg->dest_terminal_lpid, @@ -3462,7 +3731,14 @@ static void terminal_dally_commit(terminal_state * s, .workload_injection_time = msg->msg_start_time, .processing_packet_delay = -1, .packet_size = msg->packet_size, - .is_there_another_pckt_in_queue = msg->is_there_another_pckt_in_queue + .is_there_another_pckt_in_queue = msg->is_there_another_pckt_in_queue, + + .src_router_id = src_router_id, + .src_group_id = src_group_id, + .dst_router_id = dst_router_id, + .dst_group_id = dst_group_id, + .terminal_queue_length = dfdally_terminal_queue_length_bytes(s), + .terminal_vc_occupancy = dfdally_terminal_vc_occupancy_bytes(s) }; // Saving @@ -4118,6 +4394,15 @@ static void packet_generate_predicted(terminal_state * s, tw_bf * bf, terminal_d // Using predictor to find latency double const processing_packet_delay = tw_now(lp) - s->last_in_queue_time; + + unsigned int const feature_rail_id = dfdally_safe_rail_id(s, msg); + uint32_t const src_router_id = dfdally_terminal_src_router_id(s, msg); + uint32_t const src_group_id = src_router_id / s->params->num_routers; + uint32_t const dst_router_id = + dfdally_get_assigned_router_id_from_terminal( + s->params, msg->dfdally_dest_terminal_id, feature_rail_id); + uint32_t const dst_group_id = dst_router_id / s->params->num_routers; + auto start = (struct packet_start) { .packet_ID = msg->packet_ID, .dest_terminal_lpid = msg->dest_terminal_lpid, @@ -4126,9 +4411,27 @@ static void packet_generate_predicted(terminal_state * s, tw_bf * bf, terminal_d .workload_injection_time = msg->msg_start_time, .processing_packet_delay = processing_packet_delay, .packet_size = msg->packet_size, - .is_there_another_pckt_in_queue = msg->is_there_another_pckt_in_queue + .is_there_another_pckt_in_queue = msg->is_there_another_pckt_in_queue, + + .src_router_id = src_router_id, + .src_group_id = src_group_id, + .dst_router_id = dst_router_id, + .dst_group_id = dst_group_id, + /* + * In predicted/surrogate generation, the high-fidelity terminal queue + * structures are bypassed, so dfdally_terminal_queue_length_bytes(s) + * can be zero even though the model was trained on the queue occupancy + * observed during high-fidelity packet generation. Avoid feeding an + * out-of-distribution zero into the LP-aware Torch-JIT model. + */ + .terminal_queue_length = dfdally_terminal_queue_length_bytes(s), + .terminal_vc_occupancy = dfdally_terminal_vc_occupancy_bytes(s) }; + if (start.terminal_queue_length == 0 && start.packet_size > 0) { + start.terminal_queue_length = start.packet_size; + } + struct packet_end const end = terminal_predictor->predict(s->predictor_data, lp, s->terminal_id, &start); double const latency = end.travel_end_time - start.travel_start_time; @@ -6649,14 +6952,76 @@ static void router_packet_send( router_state * s, tw_bf * bf, terminal_dally_mes injection_delay += s->params->router_delay; + double const next_output_available_delta = + maxd(0.0, s->next_output_available_time[output_port] - tw_now(lp)); + double const nominal_router_delay = injection_delay + propagation_delay; + msg->saved_available_time = s->next_output_available_time[output_port]; - s->next_output_available_time[output_port] = + s->next_output_available_time[output_port] = maxd(s->next_output_available_time[output_port], tw_now(lp)); s->next_output_available_time[output_port] += injection_delay; injection_ts = s->next_output_available_time[output_port] - tw_now(lp); propagation_ts = injection_ts + propagation_delay; + /* + * High-fidelity router-local delay target for ML training. + * + * propagation_ts is only the delay from this router send event to the next + * hop. It does not include the time the chunk already spent waiting in this + * router's queue before router_packet_send() fired. For the router timing + * model, the useful target is the total local residence+service+propagation + * time from this_router_arrival through arrival at the next hop. + * + * nominal_router_delay is the deterministic no-queueing service+propagation + * delay. Therefore: + * + * queueing_delay = max(0, highdef_actual_router_delay - nominal_router_delay) + * + * captures router-local waiting/contention time. + */ + double const highdef_actual_router_delay = + maxd(0.0, s->next_output_available_time[output_port] - + cur_entry->msg.this_router_arrival) + + propagation_delay; + bool router_timing_prediction_used = false; +#ifdef USE_TORCH + if (is_dally_surrogate_on && surrogate_torch_router_timing_model_enabled()) { + struct router_timing_prediction_start timing_start = { + .router_id = (float)s->router_id, + .group_id = (float)s->group_id, + .output_port = (float)output_port, + .output_chan = (float)output_chan, + .to_terminal = (float)to_terminal, + .is_global = (float)global, + .packet_size = (float)cur_entry->msg.packet_size, + .chunk_size = (float)s->params->chunk_size, + .output_vc_occupancy = (float)s->vc_occupancy[output_port][output_chan], + .output_queued_count = (float)s->queued_count[output_port], + .next_output_available_delta = (float)next_output_available_delta, + .nominal_router_delay = (float)nominal_router_delay, + }; + double const predicted_queueing_delay = + surrogate_torch_predict_router_queueing_delay(&timing_start, 0.0); + propagation_ts = nominal_router_delay + predicted_queueing_delay; + router_timing_prediction_used = true; + } +#endif + + router_timing_trace_save_to_file( + s, + &cur_entry->msg, + output_port, + output_chan, + to_terminal, + global, + next_output_available_delta, + nominal_router_delay, + highdef_actual_router_delay, + is_dally_surrogate_on, + router_timing_prediction_used, + lp); + cur_entry->msg.this_router_ptp_latency = s->next_output_available_time[output_port] - cur_entry->msg.this_router_arrival; msg->this_router_ptp_latency = cur_entry->msg.this_router_ptp_latency; @@ -6910,6 +7275,27 @@ terminal_dally_event( terminal_state * s, terminal_dally_message * msg, tw_lp * lp ) { + if (dfdally_surrogate_debug_prints && dfdally_surrogate_has_switched_once && !is_dally_surrogate_on) { + dfdally_post_switch_terminal_events++; + if (dfdally_post_switch_terminal_events <= 20 || + dfdally_post_switch_terminal_events % 100000 == 0) { + fprintf(stderr, + "[post-switch terminal debug] count=%llu now=%f lp=%llu type=%d " + "last_now=%f delta_now=%f\n", + (unsigned long long)dfdally_post_switch_terminal_events, + tw_now(lp), + (unsigned long long)lp->gid, + msg->type, + dfdally_last_post_switch_terminal_now, + dfdally_last_post_switch_terminal_now < 0.0 + ? -1.0 + : tw_now(lp) - dfdally_last_post_switch_terminal_now); + fflush(stderr); + } + dfdally_last_post_switch_terminal_now = tw_now(lp); + } + + msg->num_cll = 0; msg->num_rngs = 0; @@ -6980,6 +7366,27 @@ terminal_dally_event( terminal_state * s, static void router_dally_event(router_state * s, tw_bf * bf, terminal_dally_message * msg, tw_lp * lp) { + if (dfdally_surrogate_debug_prints && dfdally_surrogate_has_switched_once && !is_dally_surrogate_on) { + dfdally_post_switch_router_events++; + if (dfdally_post_switch_router_events <= 20 || + dfdally_post_switch_router_events % 100000 == 0) { + fprintf(stderr, + "[post-switch router debug] count=%llu now=%f lp=%llu type=%d " + "last_now=%f delta_now=%f\n", + (unsigned long long)dfdally_post_switch_router_events, + tw_now(lp), + (unsigned long long)lp->gid, + msg->type, + dfdally_last_post_switch_router_now, + dfdally_last_post_switch_router_now < 0.0 + ? -1.0 + : tw_now(lp) - dfdally_last_post_switch_router_now); + fflush(stderr); + } + dfdally_last_post_switch_router_now = tw_now(lp); + } + + msg->num_cll = 0; msg->num_rngs = 0; diff --git a/src/surrogate/director-client.C b/src/surrogate/director-client.C new file mode 100644 index 00000000..c3738064 --- /dev/null +++ b/src/surrogate/director-client.C @@ -0,0 +1,505 @@ +#include +#include +#include +#include +#include + +#include +#include +#include // std::min_element +#include //std::fixed +#include // std::precision + +#include "codes/surrogate/director-client.h" +#include "codes/configuration.h" +#include "zmqmlrequester.h" + + +#define NUM_ACTIVE_CLIENTS 72 //TODO: this should be calculated at runtime + +#define DIR_ZMQ_CMD_LENGTH 15 +#define DIR_ZMQ_ARG_LENGTH 100 + +#define DIR_MAX_PREDICTION 5 +#define DIR_MAX_TRAINING_RECORDS 10 +#define DIR_MAX_DATA_SIZE 15 + +struct +{ + int surr_iter_start; + int surr_iter_end; +} director_config_global; + +typedef struct director_state director_state; + +// Some flag to relocate/clean-up +int evaluate_perf = 1; +int training_enabled = 0; //TODO: Move this to the LP state +int surrogate_enabled = 0; +int inferencing_enabled = 1; + + +std::vector total_elapsed_times; +std::vector zmq_processing_times; + +// state of the director LP +struct director_state +{ + tw_lpid director_id; + int simulation_mode; + + int training_cycle_id; + int training_record_id; + tw_stime training_data[DIR_MAX_TRAINING_RECORDS]; + //std::vector training_data_vc; + + int next_prediction_index; + tw_stime predictions[DIR_MAX_PREDICTION]; + + void *nw_event_ptr[NUM_DIR_TO_NW_EVENT]; + int nw_event_size[NUM_DIR_TO_NW_EVENT]; + + tw_lpid nw_lpid; +}; + +std::vector director_get_str_list(const char *s, const char delimiter) { + std::vector result; + std::stringstream ss (s); + std::string item; + + while (getline (ss, item, delimiter)) { result.push_back (item); } + return result; +} +std::string director_get_list_str(std::vector s, const char delimiter) { + std::ostringstream mergedstr; + std::copy(s.begin(), s.end(), std::ostream_iterator(mergedstr, " ")); + std::cout << mergedstr.str() << std::endl; + return mergedstr.str(); +} + +std::vector director_client_request( + const char* cmd, + const char* args, + const std::string data) +{ + std::vector ret; + /* + std::cout << cmd << " ARGS " << args << std::endl; + //if(strcmp(cmd, "send-records") == 0){ + std::cout << data << std::endl; + //} + */ + if(strcmp(cmd, "exit") == 0){ + ret = zmqml_request(cmd); + return ret; + } + + auto start_time = std::chrono::steady_clock::now(); // TODO - find a way to enclose this in evaluate_perf? + + std::vector args_list; + args_list = director_get_str_list(args, ';'); + ret = zmqml_request(cmd, args_list, data); + + if (evaluate_perf == 1){ + auto end_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration(end_time - start_time).count(); + + total_elapsed_times.push_back(duration); + zmq_processing_times.push_back( std::stod(ret[1]) ); + + if(zmq_processing_times.size() == NUM_ACTIVE_CLIENTS){ + double sum = 0; + for (double ts : zmq_processing_times) sum += ts; + double mean = sum / zmq_processing_times.size(); + double sum_sq_diff = 0; + for (double ts : zmq_processing_times) sum_sq_diff += (ts - mean) * (ts - mean); + double std_dev = sqrt(sum_sq_diff / zmq_processing_times.size()); + auto min = std::min_element(zmq_processing_times.begin(), zmq_processing_times.end()); + auto max = std::max_element(zmq_processing_times.begin(), zmq_processing_times.end()); + + zmq_processing_times.clear(); + + std::cout << std::setprecision(9) << std::fixed; + /* + std::cout << "ZMQ_VALS: "; + for(auto d: zmq_processing_times) + std::cout << d << " ;"; + std::cout << std::endl; + */ + std::cout << "==DIR_STATS zmq-processing: " << cmd + << " latency: mean = " << mean + << ", min = " << *min + << ", max = " << *max + << ", std-deviation = " << std_dev + << std::endl; + + double tsum = 0; + for (double ts : total_elapsed_times) tsum += ts; + double tmean = tsum / total_elapsed_times.size(); + double tsum_sq_diff = 0; + for (double ts : total_elapsed_times) tsum_sq_diff += (ts - tmean) * (ts - tmean); + double tstd_dev = sqrt(tsum_sq_diff / total_elapsed_times.size()); + auto tmin = std::min_element(total_elapsed_times.begin(), total_elapsed_times.end()); + auto tmax = std::max_element(total_elapsed_times.begin(), total_elapsed_times.end()); + /* + std::cout << "TOTAL_VALS: "; + for(auto d: total_elapsed_times) + std::cout << d << " ;"; + std::cout << std::endl; + */ + total_elapsed_times.clear(); + + std::cout << "==DIR_STATS zmq-total: " << cmd + << " latency: mean = " << tmean + << ", min = " << *tmin + << ", max = " << *tmax + << ", std-deviation = " << tstd_dev + << std::endl; + } + } + /* + std::cout << cmd << "|" << args << " | "; + for(auto s: ret) + std::cout << s << " ;"; + std::cout << std::endl; + */ + + return ret; +} + + +/* Trigger CODES Event From Director */ +static void director_issue_codes_event(director_state * s, tw_lpid nw_lpid, int dir_registered_event_type, tw_stime ts, tw_lp* lp) +{ + + tw_event *e; + void* msg; + + //printf("==DIR: ts: %lf\n", ts); + e = tw_event_new(nw_lpid, ts, lp); + msg = (void*)tw_event_data(e); + + memcpy(msg, s->nw_event_ptr[dir_registered_event_type], s->nw_event_size[dir_registered_event_type]); + + //msg->msg_type = dir_registered_event_type; + tw_event_send(e); +} + +void director_register_events(director_state * s, director_message * msg, tw_lp * lp) +{ + int dir_registered_event_type = msg->msg_type; + int pdes_msg_size = msg->value; + + //printf("==DIR[%d] DIR Registering dir_event_type:%d (time: %lf)\n", + // s->director_id, dir_registered_event_type, tw_now(lp)); + + s->nw_event_size[dir_registered_event_type] = pdes_msg_size; + memcpy(s->nw_event_ptr[dir_registered_event_type], &msg->buffer, pdes_msg_size); + + //int pdes_event_type = msg->op_type; + //nw_message *buffer = &msg->buffer; + //nw_message *saved_msg = s->nw_event_ptr[dir_registered_event_type]; + //printf("==DIR s->director_id: %d | dir_registered_event_type: %d | pdes_event_type: %d (%d)\n", + // s->director_id, dir_registered_event_type, pdes_event_type, + // buffer->msg_type); +} + + + + +// initializes the director LP +void director_init(director_state* s, tw_lp* lp) +{ + // initialize the LP's and load the data + memset(s, 0, sizeof(*s)); + s->simulation_mode = SIM_MODE_PDES; + s->director_id = codes_mapping_get_lp_relative_id(lp->gid, 0, 0); + + s->training_cycle_id = 0; + s->training_record_id = 1; + s->training_data[0] = tw_now(lp); + //s->training_data_vc.push_back(tw_now(lp)); + s->next_prediction_index = -1; + for(int i = 0; i < DIR_MAX_PREDICTION; i++){ + s->predictions[i] = (tw_stime) 1000000; + } + + for(int i = 0; i < NUM_DIR_TO_NW_EVENT; i++){ + s->nw_event_ptr[i] = (void*) calloc(1, g_tw_msg_sz); + } + + // get lp_id of the nw that matches this director + int num_nw_per_mgrp; + s->nw_lpid; + num_nw_per_mgrp = codes_mapping_get_lp_count ("MODELNET_GRP", 1, "nw-lp", NULL, 0); + codes_mapping_get_lp_id("MODELNET_GRP", "nw-lp", NULL, 1, s->director_id / num_nw_per_mgrp, s->director_id % num_nw_per_mgrp, &(s->nw_lpid)); + + // Get switching criteria from configuration + // if we're switch based on iteration - read iter start and end + // if switch based on virtual time - schedule sending switch event to CODES + // (stage 2) if switch based on accuracy - schedule polling for accuracy + // (stage 2) pass training data from CODES to surrogate + // (stage 3) using workload with network surrogates + + // Update global configs + if(s->director_id == 1) + { + int rc = 1, rc1 = 1, rc2 = 1; + rc = configuration_get_value_int(&config, "DIRECTOR", "surrogate_enabled", NULL, &surrogate_enabled); + if(rc) + surrogate_enabled = 0; + if(surrogate_enabled){ + rc1 = configuration_get_value_int(&config, "DIRECTOR", "start_iter", NULL, &director_config_global.surr_iter_start); + rc2 = configuration_get_value_int(&config, "DIRECTOR", "end_iter", NULL, &director_config_global.surr_iter_end); + if(rc1 || rc2){ + director_config_global.surr_iter_start = 100000; + director_config_global.surr_iter_end = 100001; + surrogate_enabled = 0; + } + } + + rc = configuration_get_value_int(&config, "DIRECTOR", "inferencing_enabled", NULL, &inferencing_enabled); + if(rc) + inferencing_enabled = 0; + + rc = configuration_get_value_int(&config, "DIRECTOR", "training_enabled", NULL, &training_enabled); + if(rc) + training_enabled = 0; + } + //printf("\n==DIR s->director_id: %d | lp->gid: %llu | s->nw_lpid: %llu", s->director_id, LLU(lp->gid), LLU(s->nw_lpid)); + + /* + char commandstr[DIR_ZMQ_CMD_LENGTH]; + char args[DIR_ZMQ_ARG_LENGTH]; + std::vector ret_vals; + + sprintf(commandstr,"cmd-%d", s->director_id); + sprintf(args, "2;args-1;"); + + ret_vals = director_client_request(commandstr, args, ""); + //printf("UNIVERSE: %s - %s\n", commandstr, ret_vals[0]); + */ + + return; +} + +void director_prepare_iteration_dataset(director_state* s, tw_stime * training_data, int training_cycle, int training_records) +{ + //printf("==DIR[%d] Training Cycle: %d\n", s->director_id, training_cycle); + + std::string processed_training_data_str; + int i, length = 0; + double tmp_data = 0.0; + char tmp_data_str[DIR_MAX_DATA_SIZE]; + int written = 0; + + // Prepare dataset + for(i = 1; i < training_records; i++) + { + tmp_data = training_data[i] - training_data[i-1]; + written += sprintf(tmp_data_str, "%.2f;", tmp_data); + //strcat(processed_training_data_str, tmp_data_str); + + processed_training_data_str.append(std::to_string(tmp_data)); + processed_training_data_str.append(" "); + // printf(" %3d: %lf [%lf]\n", i, training_data[i], tmp_data); + } + //std::cout << "Processed Data: " << processed_training_data_str << std::endl; + + // Send dataset + std::vector ret_vals; + char commandstr[DIR_ZMQ_CMD_LENGTH]; + char args[DIR_ZMQ_ARG_LENGTH]; + + sprintf(commandstr, "send-records"); + sprintf(args, "%d;%d;%d", 3, s->director_id, training_records - 1); // num-of-args;num-record + ret_vals = director_client_request(commandstr, args, processed_training_data_str); +} + +void director_get_surrogate_prediction(director_state* s, tw_bf * bf, director_message * m, tw_lp * lp, tw_stime* delay_ts) +{ + // Check if we have sufficient predictions + if(s->next_prediction_index == -1){ // we need more + //printf("==DIR[%d] DIR Prediction -- generating set (time: %lf)\n", + // s->director_id, tw_now(lp)); + + if(inferencing_enabled){ + // Pull more predictions + std::vector ret_vals; + char commandstr[15]; + char args[100]; + + sprintf(commandstr, "do-inference"); + sprintf(args, "%d;%d;%d;", 3, s->director_id, DIR_MAX_PREDICTION); // num-of-args;num-record + std::string input_data = ("1000000 1000000 1000000 1000000"); + + ret_vals = director_client_request(commandstr, args, input_data); + /* + std::cout << "PREDICTIONS: " << commandstr + << " [0]" << ret_vals[0] + << " [1]" << ret_vals[1] + << " [2]" << ret_vals[2] + << std::endl; + */ + std::vector predictions = director_get_str_list(ret_vals[2].c_str(), ' '); + + //std::cout << "PREDICTIONS: " << predictions.size() << " | "; + int i = 0; + for(auto p: predictions){ + //std::cout << " " << std::stof(p); + s->predictions[i] = std::stof(p); + i += 1; + } + //std::cout << std::endl; + assert(i <= DIR_MAX_PREDICTION); + } + + s->next_prediction_index = 0; + } + *delay_ts = s->predictions[s->next_prediction_index]; + + s->next_prediction_index = s->next_prediction_index + 1; + + // Check if we've exhuasted the predictions + if(s->next_prediction_index == DIR_MAX_PREDICTION){ + s->next_prediction_index = -1; + } +} + + +void director_event_handler(director_state* s, tw_bf * bf, director_message * m, tw_lp * lp) +{ + + switch(m->msg_type) + { + case DIR_OP_NW: + if(s->simulation_mode == SIM_MODE_PDES) + { + tw_error(TW_LOC, "DIR sent for non-annotation operation during PDES mode."); + } else if(s->simulation_mode == SIM_MODE_ITERATION_SURROGATE) + { + //printf("==DIR[%d] Skipping NW Op type:%d (time: %lf)\n", s->director_id, m->value, tw_now(lp)); + + tw_stime delay_ts = 0.001; + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, delay_ts, lp); + } + break; + + case DIR_AN_ITER_MARK: + //fprintf(iteration_log, "DIR %d (time %lf)\n", s->director_id, tw_now(lp)); + //printf("==DIR[%d] DIR_AN_ITER_MARK m->value: %d (time: %lf)\n", s->director_id, m->value, tw_now(lp)); + if(s->simulation_mode == SIM_MODE_PDES) + { + // Manage training data + if(training_enabled && s->training_record_id < DIR_MAX_TRAINING_RECORDS) + {// There is space to store more training data + s->training_data[s->training_record_id] = tw_now(lp); + //s->training_data_vc.push_back(tw_now(lp)); + s->training_record_id = s->training_record_id + 1; + } + if(training_enabled && s->training_record_id == DIR_MAX_TRAINING_RECORDS) + {// We've filled all training data slots + //printf("==DIR[%d] Sending training dataset (time: %lf)\n", s->director_id, tw_now(lp)); + + // Prepare and send training data + director_prepare_iteration_dataset(s, s->training_data, s->training_cycle_id, DIR_MAX_TRAINING_RECORDS); + + // Increment cycle counter, reset record counter, and prime dataset + s->training_cycle_id = s->training_cycle_id + 1; + s->training_record_id = 1; + s->training_data[0] = tw_now(lp); + } + if(surrogate_enabled && m->value == director_config_global.surr_iter_start) + { + //printf("==DIR[%d] Triggering switch to SURR (time: %lf)\n", s->director_id, tw_now(lp)); + + s->simulation_mode = SIM_MODE_ITERATION_SURROGATE; + tw_stime delay_ts; + director_get_surrogate_prediction(s, bf, m, lp, &delay_ts); + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__SWITCH_TO_SURR, delay_ts, lp); + return; + } + else + { + tw_stime delay_ts = 0.001; + //printf("===[%llu] D-MARK[%llu]: Value=%d\n", s->nw_lpid, lp->gid, m->value ); + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, delay_ts, lp); + return; + } + } + else if(s->simulation_mode == SIM_MODE_ITERATION_SURROGATE) + { + if(m->value == director_config_global.surr_iter_end) + { + //printf("==DIR[%d] Triggering switch to PDES (time: %lf)\n", s->director_id, tw_now(lp)); + + s->simulation_mode = SIM_MODE_PDES; + tw_stime delay_ts = 0.001; + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__SWITCH_TO_PDES, delay_ts, lp); + + if(training_enabled){ + // Restart training data collection + //s->training_data_vc.clear(); + s->training_data[0] = tw_now(lp); + s->training_record_id = 1; + } + return; + } + else // we need to predict when the next iteration will start + { + tw_stime delay_ts; + director_get_surrogate_prediction(s, bf, m, lp, &delay_ts); + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, delay_ts, lp); + return; + } + } + else + { + tw_error(TW_LOC, "[DIR] Simulation mode unknown."); + } + + break; + + case DIR_REGISTERED_EVENT__SWITCH_TO_SURR: + case DIR_REGISTERED_EVENT__SWITCH_TO_PDES: + case DIR_REGISTERED_EVENT__MOVE_TO_NEXT: + director_register_events(s, m, lp); + break; + + default: + break; + } +} + +void director_finalize(director_state* s, tw_lp* lp) +{ + if (s->director_id == 0 && (training_enabled || inferencing_enabled)) + director_client_request("exit", "", ""); + + //printf("\n==DIR: FINALIZED"); +} + +tw_lptype dir_lp = { + (init_f) director_init, + (pre_run_f) NULL, + (event_f) director_event_handler, + (revent_f) NULL, //director_event_handler_rc, + (commit_f) NULL, //director_event_handler_commit, + (final_f) director_finalize, + (map_f) codes_mapping, + sizeof(director_state) +}; + +extern void director_lp_register_model(const char * dir_lp_name){ + int num_dir_per_mgrp = codes_mapping_get_lp_count ("MODELNET_GRP", 1, "dir-nw-lp", NULL, 0); + if(num_dir_per_mgrp > 0){ + lp_type_register(dir_lp_name, &dir_lp); // DIRECTOR addition - register type + //printf("\n==DIR: Registered\n"); + } +} + + +/* ========================================================== + END OF Director Code (To be moved to separate files) + ========================================================== +*/ diff --git a/src/surrogate/init.c b/src/surrogate/init.c index dc165eae..1e680a15 100644 --- a/src/surrogate/init.c +++ b/src/surrogate/init.c @@ -85,11 +85,22 @@ bool network_surrogate_configure( } // Determining which predictor to set up and return + char debug_prints_str[MAX_NAME_LENGTH]; + debug_prints_str[0] = '\0'; + configuration_get_value(&config, "NETWORK_SURROGATE", "debug_prints", anno, + debug_prints_str, MAX_NAME_LENGTH); + bool debug_prints = (strcmp(debug_prints_str, "1") == 0 || + strcmp(debug_prints_str, "true") == 0 || + strcmp(debug_prints_str, "TRUE") == 0 || + strcmp(debug_prints_str, "yes") == 0 || + strcmp(debug_prints_str, "YES") == 0); + char latency_pred_name[MAX_NAME_LENGTH]; latency_pred_name[0] = '\0'; configuration_get_value(&config, "NETWORK_SURROGATE", "packet_latency_predictor", anno, latency_pred_name, MAX_NAME_LENGTH); if (*latency_pred_name) { if (strcmp(latency_pred_name, "average") == 0) { + average_latency_predictor_set_debug_prints(debug_prints); current_net_predictor = average_latency_predictor(sc->total_terminals); *pl_pred = ¤t_net_predictor; @@ -98,14 +109,50 @@ bool network_surrogate_configure( char torch_jit_mode[MAX_NAME_LENGTH]; torch_jit_mode[0] = '\0'; configuration_get_value(&config, "NETWORK_SURROGATE", "torch_jit_mode", anno, torch_jit_mode, MAX_NAME_LENGTH); - if (strcmp(torch_jit_mode, "single-static-model-for-all-terminals") != 0) { - tw_error(TW_LOC, "Unknown torch-jit mode `%s`", torch_jit_mode); - } - char torch_jit_model_path[MAX_NAME_LENGTH]; - torch_jit_model_path[0] = '\0'; - configuration_get_value(&config, "NETWORK_SURROGATE", "torch_jit_model_path", anno, torch_jit_model_path, MAX_NAME_LENGTH); - surrogate_torch_init(torch_jit_model_path); + surrogate_torch_set_debug_prints(debug_prints); + + if (strcmp(torch_jit_mode, "single-static-model-for-all-terminals") == 0) { + surrogate_torch_set_lp_aware_mode(false); + + char torch_jit_model_path[MAX_NAME_LENGTH]; + torch_jit_model_path[0] = '\0'; + configuration_get_value(&config, "NETWORK_SURROGATE", "torch_jit_model_path", anno, + torch_jit_model_path, MAX_NAME_LENGTH); + surrogate_torch_init(torch_jit_model_path); + } else if (strcmp(torch_jit_mode, "lp-aware-single-static-model") == 0) { + surrogate_torch_set_lp_aware_mode(true); + + char torch_jit_model_path[MAX_NAME_LENGTH]; + torch_jit_model_path[0] = '\0'; + configuration_get_value(&config, "NETWORK_SURROGATE", "torch_jit_model_path", anno, + torch_jit_model_path, MAX_NAME_LENGTH); + surrogate_torch_init(torch_jit_model_path); + } else if (strcmp(torch_jit_mode, "lp-aware-lp-type-models") == 0) { + char terminal_model_path[MAX_NAME_LENGTH]; + char router_timing_model_path[MAX_NAME_LENGTH]; + char default_model_path[MAX_NAME_LENGTH]; + terminal_model_path[0] = '\0'; + router_timing_model_path[0] = '\0'; + default_model_path[0] = '\0'; + + configuration_get_value(&config, "NETWORK_SURROGATE", "torch_jit_terminal_model_path", anno, + terminal_model_path, MAX_NAME_LENGTH); + configuration_get_value(&config, "NETWORK_SURROGATE", "torch_jit_router_timing_model_path", anno, + router_timing_model_path, MAX_NAME_LENGTH); + configuration_get_value(&config, "NETWORK_SURROGATE", "torch_jit_default_model_path", anno, + default_model_path, MAX_NAME_LENGTH); + + surrogate_torch_init_lp_type_models( + terminal_model_path, + router_timing_model_path, + default_model_path); + } else { + tw_error(TW_LOC, + "Unknown torch-jit mode `%s` (expected single-static-model-for-all-terminals, " + "lp-aware-single-static-model, or lp-aware-lp-type-models)", + torch_jit_mode); + } *pl_pred = &torch_latency_predictor; #endif @@ -119,6 +166,7 @@ bool network_surrogate_configure( ")", latency_pred_name); } } else { + average_latency_predictor_set_debug_prints(debug_prints); current_net_predictor = average_latency_predictor(sc->total_terminals); *pl_pred = ¤t_net_predictor; master_printf("Enabling average packet latency predictor (default behaviour)\n"); @@ -155,7 +203,6 @@ bool network_surrogate_configure( network_director_configure(sc, network_director_enabled ? &switch_network_at: NULL, freeze_network_on_switch); - //surr_config.director.switch_surrogate(); if (DEBUG_DIRECTOR && g_tw_mynode == 0) { fprintf(stderr, "Simulation starting on network %s mode\n", sc->model.is_surrogate_on() ? "surrogate" : "high-fidelity"); } diff --git a/src/surrogate/ml_models/component_level/train_torch_jit_packet_latency.py b/src/surrogate/ml_models/component_level/train_torch_jit_packet_latency.py new file mode 100755 index 00000000..301ca71f --- /dev/null +++ b/src/surrogate/ml_models/component_level/train_torch_jit_packet_latency.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +"""Train/export a TorchScript terminal/default packet-latency model. + +Input CSV rows come from PARAMS.save_packet_latency_path. The model contract is: + input FloatTensor[*,12] + output FloatTensor[*,2] = [travel_delta, next_packet_delay] +""" +import argparse +from pathlib import Path +import glob + +import numpy as np +import pandas as pd +import torch + +def resolve_device(name: str) -> torch.device: + if name == "auto": + name = "cuda" if torch.cuda.is_available() else "cpu" + if name == "cuda" and not torch.cuda.is_available(): + raise RuntimeError("CUDA was requested, but torch.cuda.is_available() is false.") + device = torch.device(name) + print(f"Using device: {device}") + if device.type == "cuda": + print(f"CUDA device: {torch.cuda.get_device_name(0)}") + return device + +from torch import nn +from torch.utils.data import DataLoader, TensorDataset + +FEATURE_COLUMNS = [ + "src_terminal", "dest_terminal", "packet_size", "is_there_another_pckt_in_queue", + "caller_lp_gid", "src_router_id", "src_group_id", "dst_router_id", "dst_group_id", + "terminal_queue_length", "terminal_vc_occupancy", "processing_packet_delay", +] +TARGET_COLUMNS = ["travel_end_time_delta", "next_packet_delay"] +ALL_COLUMNS = FEATURE_COLUMNS + TARGET_COLUMNS + [ + "packet_id", "is_surrogate_on", "is_predicted", "workload_injection", "start", "end", "latency" +] + +class MLP(nn.Module): + def __init__(self, in_dim: int, out_dim: int, hidden: int): + super().__init__() + self.net = nn.Sequential( + nn.Linear(in_dim, hidden), nn.ReLU(), + nn.Linear(hidden, hidden), nn.ReLU(), + nn.Linear(hidden, out_dim), + ) + def forward(self, x): + return self.net(x.float()) + +def read_trace(path: str) -> pd.DataFrame: + p = Path(path) + files = sorted(glob.glob(str(p / "packets-delay-gid=*.txt"))) if p.is_dir() else [str(p)] + if not files: + raise SystemExit(f"No packet latency trace files found under {path}") + frames = [pd.read_csv(f, comment="#", names=ALL_COLUMNS) for f in files] + df = pd.concat(frames, ignore_index=True) + df = df[(df["is_surrogate_on"] == 0) & (df["is_predicted"] == 0)] + df = df.replace([np.inf, -np.inf], np.nan).dropna(subset=FEATURE_COLUMNS + TARGET_COLUMNS) + df = df[(df["travel_end_time_delta"] > 0.0) & (df["next_packet_delay"] >= 0.0)] + if df.empty: + raise SystemExit("No usable high-fidelity packet latency rows after filtering") + return df + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--trace", required=True, help="Trace file or directory from save_packet_latency_path") + ap.add_argument("--out", required=True, help="Output TorchScript .pt path") + ap.add_argument("--epochs", type=int, default=30) + ap.add_argument("--batch-size", type=int, default=1024) + ap.add_argument("--hidden", type=int, default=128) + ap.add_argument("--lr", type=float, default=1e-3) + ap.add_argument("--device", default="auto", choices=["auto", "cpu", "cuda"], + help="Training device. auto uses CUDA if available, otherwise CPU.") + args = ap.parse_args() + device = resolve_device(args.device) + + df = read_trace(args.trace) + x = torch.tensor(df[FEATURE_COLUMNS].to_numpy(dtype=np.float32), device=device) + y = torch.tensor(df[TARGET_COLUMNS].to_numpy(dtype=np.float32), device=device) + + x_mu, x_sigma = x.mean(0), x.std(0).clamp_min(1e-6) + y_mu, y_sigma = y.mean(0), y.std(0).clamp_min(1e-6) + x_n = (x - x_mu) / x_sigma + y_n = (y - y_mu) / y_sigma + + model = MLP(len(FEATURE_COLUMNS), len(TARGET_COLUMNS), args.hidden).to(device) + opt = torch.optim.AdamW(model.parameters(), lr=args.lr) + loss_fn = nn.SmoothL1Loss() + loader = DataLoader(TensorDataset(x_n, y_n), batch_size=args.batch_size, shuffle=True) + + model.train() + for epoch in range(args.epochs): + losses = [] + for xb, yb in loader: + opt.zero_grad(set_to_none=True) + loss = loss_fn(model(xb), yb) + loss.backward() + opt.step() + losses.append(float(loss.detach())) + print(f"epoch={epoch+1} loss={np.mean(losses):.6g}") + + class NormalizedWrapper(nn.Module): + def __init__(self, base, x_mu, x_sigma, y_mu, y_sigma): + super().__init__() + self.base = base.eval() + self.register_buffer("x_mu", x_mu) + self.register_buffer("x_sigma", x_sigma) + self.register_buffer("y_mu", y_mu) + self.register_buffer("y_sigma", y_sigma) + def forward(self, x): + y = self.base((x.float() - self.x_mu) / self.x_sigma) + return y * self.y_sigma + self.y_mu + + # Export a CPU TorchScript module. Training may happen on CUDA, but the + # C++ surrogate path should be able to load and run this model on CPU. + model = model.to("cpu").eval() + x_mu = x_mu.detach().to("cpu") + x_sigma = x_sigma.detach().to("cpu") + y_mu = y_mu.detach().to("cpu") + y_sigma = y_sigma.detach().to("cpu") + + wrapped = NormalizedWrapper(model, x_mu, x_sigma, y_mu, y_sigma).to("cpu").eval() + example = torch.zeros(1, len(FEATURE_COLUMNS), dtype=torch.float32) + + with torch.no_grad(): + traced = torch.jit.trace(wrapped, example) + Path(args.out).parent.mkdir(parents=True, exist_ok=True) + traced.save(args.out) + print(f"saved {args.out}") + +if __name__ == "__main__": + main() diff --git a/src/surrogate/ml_models/component_level/train_torch_jit_router_timing.py b/src/surrogate/ml_models/component_level/train_torch_jit_router_timing.py new file mode 100755 index 00000000..974d2188 --- /dev/null +++ b/src/surrogate/ml_models/component_level/train_torch_jit_router_timing.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python3 +"""Train/export a TorchScript router queueing-delay model. + +Input CSV rows come from PARAMS.save_router_timing_trace_path. The model contract is: + input FloatTensor[*,12] + output FloatTensor[*,1] = [router_queueing_delay] +""" +import argparse +from pathlib import Path +import glob + +import numpy as np +import pandas as pd +import torch + +def resolve_device(name: str) -> torch.device: + if name == "auto": + name = "cuda" if torch.cuda.is_available() else "cpu" + if name == "cuda" and not torch.cuda.is_available(): + raise RuntimeError("CUDA was requested, but torch.cuda.is_available() is false.") + device = torch.device(name) + print(f"Using device: {device}") + if device.type == "cuda": + print(f"CUDA device: {torch.cuda.get_device_name(0)}") + return device + +from torch import nn +from torch.utils.data import DataLoader, TensorDataset + +FEATURE_COLUMNS = [ + "router_id", "group_id", "output_port", "output_chan", "to_terminal", "is_global", + "packet_size", "chunk_size", "output_vc_occupancy", "output_queued_count", + "next_output_available_delta", "nominal_router_delay", +] +TARGET_COLUMNS = ["router_queueing_delay"] +ALL_COLUMNS = FEATURE_COLUMNS + TARGET_COLUMNS + [ + "actual_router_delay", "is_surrogate_on", "is_predicted", "now", "packet_id", + "src_terminal", "dest_terminal", "next_stop" +] + +class MLP(nn.Module): + def __init__(self, in_dim: int, out_dim: int, hidden: int): + super().__init__() + self.net = nn.Sequential( + nn.Linear(in_dim, hidden), nn.ReLU(), + nn.Linear(hidden, hidden), nn.ReLU(), + nn.Linear(hidden, out_dim), + ) + def forward(self, x): + return self.net(x.float()) + +def _filter_router_rows(df: pd.DataFrame) -> pd.DataFrame: + df = df[(df["is_surrogate_on"] == 0) & (df["is_predicted"] == 0)] + df = df.replace([np.inf, -np.inf], np.nan).dropna(subset=FEATURE_COLUMNS + TARGET_COLUMNS) + df = df[df["router_queueing_delay"] >= 0.0] + return df + + +def read_trace(path: str, max_rows: int, seed: int, chunksize: int) -> pd.DataFrame: + """Read router timing traces without materializing the full trace. + + If max_rows > 0, maintain a bounded random sample while streaming chunks. + This still trains on real high-fidelity rows, but avoids loading a 40+ GB + CSV trace into memory at once. + """ + p = Path(path) + files = sorted(glob.glob(str(p / "router-timing-gid=*.txt"))) if p.is_dir() else [str(p)] + if not files: + raise SystemExit(f"No router timing trace files found under {path}") + + rng = np.random.default_rng(seed) + kept = [] + total_seen = 0 + total_usable = 0 + + for f in files: + print(f"reading {f}", flush=True) + reader = pd.read_csv( + f, + comment="#", + names=ALL_COLUMNS, + chunksize=chunksize, + ) + + for chunk in reader: + total_seen += len(chunk) + chunk = _filter_router_rows(chunk) + if chunk.empty: + continue + + total_usable += len(chunk) + + if max_rows <= 0: + kept.append(chunk) + continue + + kept.append(chunk) + df = pd.concat(kept, ignore_index=True) + + if len(df) > max_rows: + # Randomly keep max_rows rows from all usable rows seen so far. + # Use a fresh integer seed from rng so repeated chunks do not + # produce identical samples. + sample_seed = int(rng.integers(0, 2**31 - 1)) + df = df.sample(n=max_rows, random_state=sample_seed).reset_index(drop=True) + + kept = [df] + + if not kept: + raise SystemExit("No usable high-fidelity router timing rows after filtering") + + df = pd.concat(kept, ignore_index=True) + + if max_rows > 0 and len(df) > max_rows: + df = df.sample(n=max_rows, random_state=seed).reset_index(drop=True) + + print(f"router trace rows seen: {total_seen}", flush=True) + print(f"usable high-fidelity rows seen: {total_usable}", flush=True) + print(f"training rows kept: {len(df)}", flush=True) + + if df.empty: + raise SystemExit("No usable high-fidelity router timing rows after filtering") + + return df + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--trace", required=True, help="Trace file or directory from save_router_timing_trace_path") + ap.add_argument("--out", required=True, help="Output TorchScript .pt path") + ap.add_argument("--epochs", type=int, default=30) + ap.add_argument("--batch-size", type=int, default=1024) + ap.add_argument("--hidden", type=int, default=128) + ap.add_argument("--lr", type=float, default=1e-3) + ap.add_argument("--device", default="auto", choices=["auto", "cpu", "cuda"], + help="Training device. auto uses CUDA if available, otherwise CPU.") + ap.add_argument("--max-rows", type=int, default=2000000, + help="Maximum usable router rows to keep. Use <=0 to load all rows.") + ap.add_argument("--read-chunksize", type=int, default=1000000, + help="CSV rows per pandas chunk while streaming router traces.") + ap.add_argument("--seed", type=int, default=0) + args = ap.parse_args() + device = resolve_device(args.device) + + df = read_trace( + args.trace, + max_rows=args.max_rows, + seed=args.seed, + chunksize=args.read_chunksize, + ) + x = torch.tensor(df[FEATURE_COLUMNS].to_numpy(dtype=np.float32), device=device) + y = torch.tensor(df[TARGET_COLUMNS].to_numpy(dtype=np.float32), device=device) + + x_mu, x_sigma = x.mean(0), x.std(0).clamp_min(1e-6) + y_mu, y_sigma = y.mean(0), y.std(0).clamp_min(1e-6) + x_n = (x - x_mu) / x_sigma + y_n = (y - y_mu) / y_sigma + + model = MLP(len(FEATURE_COLUMNS), len(TARGET_COLUMNS), args.hidden).to(device) + opt = torch.optim.AdamW(model.parameters(), lr=args.lr) + loss_fn = nn.SmoothL1Loss() + loader = DataLoader(TensorDataset(x_n, y_n), batch_size=args.batch_size, shuffle=True) + + model.train() + for epoch in range(args.epochs): + losses = [] + for xb, yb in loader: + opt.zero_grad(set_to_none=True) + loss = loss_fn(model(xb), yb) + loss.backward() + opt.step() + losses.append(float(loss.detach())) + print(f"epoch={epoch+1} loss={np.mean(losses):.6g}") + + class NormalizedWrapper(nn.Module): + def __init__(self, base, x_mu, x_sigma, y_mu, y_sigma): + super().__init__() + self.base = base.eval() + self.register_buffer("x_mu", x_mu) + self.register_buffer("x_sigma", x_sigma) + self.register_buffer("y_mu", y_mu) + self.register_buffer("y_sigma", y_sigma) + def forward(self, x): + y = self.base((x.float() - self.x_mu) / self.x_sigma) + return y * self.y_sigma + self.y_mu + + # Export a CPU TorchScript module. Training may happen on CUDA, but the + # C++ surrogate path should be able to load and run this model on CPU. + model = model.to("cpu").eval() + x_mu = x_mu.detach().to("cpu") + x_sigma = x_sigma.detach().to("cpu") + y_mu = y_mu.detach().to("cpu") + y_sigma = y_sigma.detach().to("cpu") + + wrapped = NormalizedWrapper(model, x_mu, x_sigma, y_mu, y_sigma).to("cpu").eval() + example = torch.zeros(1, len(FEATURE_COLUMNS), dtype=torch.float32) + + with torch.no_grad(): + traced = torch.jit.trace(wrapped, example) + Path(args.out).parent.mkdir(parents=True, exist_ok=True) + traced.save(args.out) + print(f"saved {args.out}") + +if __name__ == "__main__": + main() diff --git a/src/surrogate/ml_models/director/combine_packet_latency_traces_across_ranks.py b/src/surrogate/ml_models/director/combine_packet_latency_traces_across_ranks.py new file mode 100644 index 00000000..c141abd6 --- /dev/null +++ b/src/surrogate/ml_models/director/combine_packet_latency_traces_across_ranks.py @@ -0,0 +1,58 @@ +from pathlib import Path +import pandas as pd + +trace_dir = Path("packet-latency-trace") +out = Path("packet_latency_train.csv") + +cols = [ + "src_terminal", + "dest_terminal", + "packet_id", + "is_surrogate_on", + "is_predicted", + "size", + "workload_injection", + "next_packet_delay", + "start", + "end", + "latency", + "is_there_another_pckt_in_queue", +] + +files = sorted(trace_dir.glob("packets-delay-gid=*.txt")) +if not files: + raise SystemExit(f"No packet trace files found under {trace_dir}") + +dfs = [] +for f in files: + df = pd.read_csv(f, comment="#", header=None, names=cols) + df["rank_file"] = f.name + dfs.append(df) + +df = pd.concat(dfs, ignore_index=True) + +# For training the current torch-jit predictor, use only real PDES packets. +df = df[ + (df["is_surrogate_on"] == 0) & + (df["is_predicted"] == 0) & + (df["end"] >= 0) & + (df["latency"] > 0) +].copy() + +# Current torch-jit model predicts [latency, next_packet_delay]. +# Drop rows where next_packet_delay is unavailable. +df = df[df["next_packet_delay"] >= 0].copy() + +df.to_csv(out, index=False) + +print(f"Wrote {out}") +print(f"rows={len(df)}") + +print(df[[ + "src_terminal", + "dest_terminal", + "size", + "is_there_another_pckt_in_queue", + "latency", + "next_packet_delay", +]].describe()) diff --git a/src/surrogate/ml_models/director/lp_aware_packet_latency_model.py b/src/surrogate/ml_models/director/lp_aware_packet_latency_model.py new file mode 100644 index 00000000..b67c1463 --- /dev/null +++ b/src/surrogate/ml_models/director/lp_aware_packet_latency_model.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +""" +LP-aware TorchScript packet-latency model. + +This is intentionally separate from the existing Torch-JIT model/exporter. +Use it only with: + + torch_jit_mode = lp-aware-single-static-model + +C++ feature order: + 0 src_terminal + 1 dfdally_dest_terminal_id + 2 packet_size + 3 is_there_another_pckt_in_queue + 4 caller_lp_gid + 5 src_router_id + 6 src_group_id + 7 dst_router_id + 8 dst_group_id + 9 terminal_queue_length + 10 terminal_vc_occupancy + 11 processing_packet_delay +""" + +from __future__ import annotations + +import argparse +from pathlib import Path + +import torch +import torch.nn as nn + + +LP_AWARE_PACKET_FEATURES = [ + "src_terminal", + "dfdally_dest_terminal_id", + "packet_size", + "is_there_another_pckt_in_queue", + "caller_lp_gid", + "src_router_id", + "src_group_id", + "dst_router_id", + "dst_group_id", + "terminal_queue_length", + "terminal_vc_occupancy", + "processing_packet_delay", +] + +TORCH_PACKET_FEATURE_COUNT = len(LP_AWARE_PACKET_FEATURES) + + +class LPAwarePacketLatencyModel(nn.Module): + def __init__(self, hidden_dim: int = 64): + super().__init__() + self.net = nn.Sequential( + nn.Linear(TORCH_PACKET_FEATURE_COUNT, hidden_dim), + nn.ReLU(), + nn.Linear(hidden_dim, hidden_dim), + nn.ReLU(), + nn.Linear(hidden_dim, 2), + nn.Softplus(), + ) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + # C++ passes float32 [batch, 12]. + x = x.to(torch.float32) + return self.net(x) + + +def export_torchscript(output_path: Path, hidden_dim: int) -> None: + model = LPAwarePacketLatencyModel(hidden_dim=hidden_dim) + model.eval() + + example = torch.zeros((1, TORCH_PACKET_FEATURE_COUNT), dtype=torch.float32) + + with torch.no_grad(): + traced = torch.jit.trace(model, example) + + output_path.parent.mkdir(parents=True, exist_ok=True) + traced.save(str(output_path)) + print(f"saved LP-aware TorchScript model to {output_path}") + print(f"input shape: [batch, {TORCH_PACKET_FEATURE_COUNT}], dtype=float32") + print("output shape: [batch, 2]") + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument( + "--output", + type=Path, + default=Path("lp_aware_packet_latency_model.pt"), + help="Path to write the TorchScript model.", + ) + parser.add_argument( + "--hidden-dim", + type=int, + default=64, + help="Hidden dimension for the prototype MLP.", + ) + args = parser.parse_args() + + export_torchscript(args.output, args.hidden_dim) + + +if __name__ == "__main__": + main() diff --git a/src/surrogate/ml_models/director/train_packet_latency_torchjit.py b/src/surrogate/ml_models/director/train_packet_latency_torchjit.py new file mode 100644 index 00000000..bd20713f --- /dev/null +++ b/src/surrogate/ml_models/director/train_packet_latency_torchjit.py @@ -0,0 +1,258 @@ +from pathlib import Path +import argparse +import math +import random + +import pandas as pd +import torch +from torch import nn +from torch.utils.data import DataLoader, TensorDataset, random_split + + + +SEED = 1234 +BATCH_SIZE = 4096 +EPOCHS = 50 +LR = 1e-3 +WEIGHT_DECAY = 1e-4 + +random.seed(SEED) +torch.manual_seed(SEED) + + +class PacketLatencyModel(nn.Module): + """ + TorchScript-compatible model for CODES torch-jit.C. + + C++ passes a LongTensor shaped [1, 4]: + + src_terminal + dest_terminal + packet_size + is_there_another_pckt_in_queue + + This model returns a FloatTensor shaped [1, 2]: + + latency + next_packet_delay + """ + + def __init__(self, n_src: int, n_dst: int, max_size: float, y_mean: torch.Tensor, y_std: torch.Tensor): + super().__init__() + + emb_dim = 16 + hidden = 96 + + self.src_emb = nn.Embedding(n_src, emb_dim) + self.dst_emb = nn.Embedding(n_dst, emb_dim) + + self.register_buffer("max_size", torch.tensor(float(max_size), dtype=torch.float32)) + self.register_buffer("y_mean", y_mean.float()) + self.register_buffer("y_std", y_std.float()) + + self.net = nn.Sequential( + nn.Linear(emb_dim * 2 + 2, hidden), + nn.ReLU(), + nn.Linear(hidden, hidden), + nn.ReLU(), + nn.Linear(hidden, 2), + ) + + self.softplus = nn.Softplus() + + def forward(self, x: torch.Tensor) -> torch.Tensor: + src = x[:, 0].long() + dst = x[:, 1].long() + + # Clamp defensively so an unseen id does not crash inference. + src = torch.clamp(src, 0, self.src_emb.num_embeddings - 1) + dst = torch.clamp(dst, 0, self.dst_emb.num_embeddings - 1) + + size = x[:, 2].float().unsqueeze(1) + has_next = x[:, 3].float().unsqueeze(1) + + size_norm = size / torch.clamp(self.max_size, min=1.0) + z = torch.cat( + [ + self.src_emb(src), + self.dst_emb(dst), + size_norm, + has_next, + ], + dim=1, + ) + + # Train in standardized target space, then unscale inside the scripted model. + pred_scaled = self.net(z) + pred = pred_scaled * self.y_std + self.y_mean + + # CODES expects positive latency/delay. Softplus prevents negative predictions. + return self.softplus(pred) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Train a TorchScript packet-latency model for CODES torch-jit surrogate mode." + ) + parser.add_argument( + "--csv", + required=True, + type=Path, + help="Path to merged packet-latency training CSV.", + ) + parser.add_argument( + "--out", + required=True, + type=Path, + help="Output path for the saved TorchScript .pt model.", + ) + return parser.parse_args() + + +def main() -> None: + args = parse_args() + csv_path = args.csv + out_path = args.out + + if not csv_path.exists(): + raise SystemExit(f"Missing {csv_path}. Run the trace merge step first.") + + out_path.parent.mkdir(parents=True, exist_ok=True) + + df = pd.read_csv(csv_path) + + required = [ + "src_terminal", + "dest_terminal", + "size", + "is_there_another_pckt_in_queue", + "latency", + "next_packet_delay", + ] + missing = [c for c in required if c not in df.columns] + if missing: + raise SystemExit(f"CSV is missing columns: {missing}") + + df = df.dropna(subset=required).copy() + df = df[ + (df["latency"] > 0) + & (df["next_packet_delay"] >= 0) + & (df["src_terminal"] >= 0) + & (df["dest_terminal"] >= 0) + & (df["size"] > 0) + ].copy() + + if len(df) < 100: + raise SystemExit(f"Too few usable rows after filtering: {len(df)}") + + feature_cols = [ + "src_terminal", + "dest_terminal", + "size", + "is_there_another_pckt_in_queue", + ] + target_cols = [ + "latency", + "next_packet_delay", + ] + + X = torch.tensor(df[feature_cols].to_numpy(), dtype=torch.long) + y = torch.tensor(df[target_cols].to_numpy(), dtype=torch.float32) + + n_src = int(df["src_terminal"].max()) + 1 + n_dst = int(df["dest_terminal"].max()) + 1 + max_size = float(df["size"].max()) + + y_mean = y.mean(dim=0) + y_std = y.std(dim=0).clamp_min(1e-6) + y_scaled = (y - y_mean) / y_std + + dataset = TensorDataset(X, y_scaled) + + n_val = max(1, int(0.1 * len(dataset))) + n_train = len(dataset) - n_val + + generator = torch.Generator().manual_seed(SEED) + train_ds, val_ds = random_split(dataset, [n_train, n_val], generator=generator) + + train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True) + val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False) + + model = PacketLatencyModel( + n_src=n_src, + n_dst=n_dst, + max_size=max_size, + y_mean=y_mean, + y_std=y_std, + ) + + optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY) + loss_fn = nn.SmoothL1Loss() + + best_val = math.inf + best_state = None + + for epoch in range(EPOCHS): + model.train() + train_loss = 0.0 + + for xb, yb_scaled in train_loader: + pred = model(xb) + y_true = yb_scaled * y_std + y_mean + + loss = loss_fn(pred, y_true) + + optimizer.zero_grad(set_to_none=True) + loss.backward() + optimizer.step() + + train_loss += loss.item() * xb.size(0) + + train_loss /= n_train + + model.eval() + val_loss = 0.0 + with torch.no_grad(): + for xb, yb_scaled in val_loader: + pred = model(xb) + y_true = yb_scaled * y_std + y_mean + loss = loss_fn(pred, y_true) + val_loss += loss.item() * xb.size(0) + + val_loss /= n_val + + if val_loss < best_val: + best_val = val_loss + best_state = {k: v.detach().clone() for k, v in model.state_dict().items()} + + print(f"epoch={epoch:03d} train_loss={train_loss:.6f} val_loss={val_loss:.6f}") + + if best_state is not None: + model.load_state_dict(best_state) + + model.eval() + + # Exact shape/dtype sanity check for src/surrogate/packet-latency-predictor/torch-jit.C + dummy = torch.zeros((1, 4), dtype=torch.long) + with torch.no_grad(): + out = model(dummy) + + if tuple(out.shape) != (1, 2): + raise RuntimeError(f"Bad output shape: expected (1, 2), got {tuple(out.shape)}") + + if out.dtype != torch.float32: + raise RuntimeError(f"Bad output dtype: expected float32, got {out.dtype}") + + scripted = torch.jit.script(model) + scripted.save(str(out_path)) + + print() + print(f"Saved TorchScript model to: {out_path}") + print(f"rows used: {len(df)}") + print(f"n_src={n_src}, n_dst={n_dst}, max_size={max_size}") + print(f"best_val_loss={best_val:.6f}") + print(f"dummy_output={out.tolist()}") + + +if __name__ == "__main__": + main() diff --git a/src/surrogate/network-surrogate.c b/src/surrogate/network-surrogate.c index c2278583..ea119196 100644 --- a/src/surrogate/network-surrogate.c +++ b/src/surrogate/network-surrogate.c @@ -21,7 +21,6 @@ static double frozen_events_switch_time = 0.0; // Time when we switched to surr static struct lp_types_switch const * get_type_switch(char const * const name) { for (size_t i = 0; i < net_surr_config.n_lp_types; i++) { - //printf("THIS %s and %s\n", surr_config.lp_types[i].lpname, name); if (strcmp(net_surr_config.lp_types[i].lpname, name) == 0) { return &net_surr_config.lp_types[i]; } diff --git a/src/surrogate/packet-latency-predictor/average.c b/src/surrogate/packet-latency-predictor/average.c index 4b14aedb..2beeb5e7 100644 --- a/src/surrogate/packet-latency-predictor/average.c +++ b/src/surrogate/packet-latency-predictor/average.c @@ -3,6 +3,11 @@ double ignore_until = 0; static int num_terminals = 0; +static int average_predictor_debug_prints = 0; + +void average_latency_predictor_set_debug_prints(int enabled) { + average_predictor_debug_prints = enabled ? 1 : 0; +} // === Average packet latency functionality @@ -87,6 +92,23 @@ static struct packet_end predict_latency(struct latency_surrogate * data, tw_lp // TODO (Elkin): 10 is an arbitrary small value, but it should be nic_ts as implemented in `packet_getenerate` in dragonfly-dally double const next_packet_delay = data->aggregated_next_packet_delay.total_msgs == 0 ? 10 : data->aggregated_next_packet_delay.sum_latency / data->aggregated_next_packet_delay.total_msgs; + + static unsigned long long average_predict_calls = 0; + average_predict_calls++; + if (average_predictor_debug_prints && + (average_predict_calls <= 20 || average_predict_calls % 10000 == 0)) { + fprintf(stderr, + "[average predict debug] calls=%llu latency=%f next_packet_delay=%f " + "next_delay_points=%u total_latency_points=%u dest_latency_points=%u\n", + average_predict_calls, + latency, + next_packet_delay, + data->aggregated_next_packet_delay.total_msgs, + data->aggregated_latency_for_all.total_msgs, + data->aggregated_latency[dest_terminal].total_msgs); + fflush(stderr); + } + return (struct packet_end) { .travel_end_time = packet_dest->travel_start_time + latency, .next_packet_delay = next_packet_delay, diff --git a/src/surrogate/packet-latency-predictor/torch-jit.C b/src/surrogate/packet-latency-predictor/torch-jit.C index e2c1384c..acfbb187 100644 --- a/src/surrogate/packet-latency-predictor/torch-jit.C +++ b/src/surrogate/packet-latency-predictor/torch-jit.C @@ -6,115 +6,405 @@ #include #include #include +#include +#include #include #include +/* Backward-compatible global model used by the two original Torch-JIT modes. */ static torch::jit::Module packet_latency_model; +/* New optional models used only by torch_jit_mode="lp-aware-lp-type-models". */ +static torch::jit::Module terminal_packet_latency_model; +static torch::jit::Module default_packet_latency_model; +static torch::jit::Module router_timing_model; -inline void assert_correct_dims(at::Tensor * t) { - int const dims = t->ndimension(); +static bool torch_jit_lp_aware_mode = false; +static bool torch_jit_lp_type_model_mode = false; +static bool terminal_packet_latency_model_loaded = false; +static bool default_packet_latency_model_loaded = false; +static bool router_timing_model_loaded = false; +static bool torch_jit_debug_prints = false; - for (int i = 0; i < dims-1; i++) { - assert(at::size(*t, i) == 1); - } - assert(at::size(*t, dims - 1) == 2); +static uint64_t torch_jit_predict_calls = 0; +static uint64_t torch_jit_router_predict_calls = 0; +static double torch_jit_forward_total_sec = 0.0; +static double torch_jit_router_forward_total_sec = 0.0; +static double torch_jit_min_travel_delta = 1.0e300; +static double torch_jit_max_travel_delta = -1.0e300; +static double torch_jit_min_next_delay = 1.0e300; +static double torch_jit_max_next_delay = -1.0e300; +static double torch_jit_min_router_queueing_delay = 1.0e300; +static double torch_jit_max_router_queueing_delay = -1.0e300; + +static constexpr int TORCH_JIT_LEGACY_FEATURE_COUNT = 4; +static constexpr int TORCH_JIT_LP_AWARE_FEATURE_COUNT = 12; +static constexpr int TORCH_JIT_ROUTER_TIMING_FEATURE_COUNT = 12; + +void surrogate_torch_set_lp_aware_mode(bool enabled) { + torch_jit_lp_aware_mode = enabled; +} + +void surrogate_torch_set_debug_prints(bool enabled) { + torch_jit_debug_prints = enabled; +} + +static bool has_path(char const *path) { + return path != nullptr && path[0] != '\0'; } +static void load_module_or_die(torch::jit::Module *module, char const *path, char const *label) { + if (!has_path(path)) { + tw_error(TW_LOC, "Missing Torch-JIT %s path", label); + } -void surrogate_torch_init(char const * dir) { - std::cout << "Loading Torch-JIT model\n"; try { - // Deserialize the ScriptModule from a file - packet_latency_model = torch::jit::load(dir); + *module = torch::jit::load(path); + } catch (const c10::Error& e) { + tw_error(TW_LOC, "Error loading Torch-JIT %s model from `%s`", label, path); } - catch (const c10::Error& e) { - tw_error(TW_LOC, "Error loading Torch-JIT model"); + + if (module->is_training()) { + std::cerr << "The Torch-JIT " << label + << " model was saved before running .eval(); inference will use training-mode behavior." + << std::endl; } +} - // Configuring to run on a single thread - at::set_num_threads(1); +static void validate_output_dims(at::Tensor const &output, int expected_last_dim, char const *label) { + int const dims = output.ndimension(); + if (dims < 1) { + tw_error(TW_LOC, "Torch-JIT %s model returned a scalar; expected [1,%d]", label, expected_last_dim); + } + for (int i = 0; i < dims - 1; i++) { + if (at::size(output, i) != 1) { + tw_error(TW_LOC, "Torch-JIT %s model returned unexpected dim %d size %lld; expected 1", + label, i, (long long)at::size(output, i)); + } + } + if (at::size(output, dims - 1) != expected_last_dim) { + tw_error(TW_LOC, "Torch-JIT %s model returned last dim %lld; expected %d", + label, (long long)at::size(output, dims - 1), expected_last_dim); + } +} - // === Checking consistency of model with dummy input - if (packet_latency_model.is_training()) { - std::cerr << "The Torch-JIT model was saved before running .eval(). " - "The output from the model will be as if it was in training mode, " - "meaning, it might be faulty." - << std::endl; +static void validate_packet_latency_model(torch::jit::Module *model, int feature_count, at::ScalarType dtype, char const *label) { + std::vector inputs; + torch::NoGradGuard no_grad; + + if (dtype == at::kFloat) { + std::vector data_input(feature_count, 0.0f); + inputs.emplace_back(torch::from_blob(data_input.data(), {1, feature_count}, at::kFloat).clone()); + } else if (dtype == at::kLong) { + std::vector data_input(feature_count, 0); + inputs.emplace_back(torch::from_blob(data_input.data(), {1, feature_count}, at::kLong).clone()); + } else { + tw_error(TW_LOC, "Unsupported Torch-JIT validation dtype for %s", label); } - long int data_input[] = {0, 0, 0, 0}; - size_t const n_input = sizeof(data_input) / sizeof(long int); + at::Tensor output = model->forward(inputs).toTensor(); + validate_output_dims(output, 2, label); +} +static void validate_router_timing_model(torch::jit::Module *model, char const *label) { std::vector inputs; torch::NoGradGuard no_grad; - inputs.emplace_back(torch::from_blob(data_input, {1, (int) n_input}, at::kLong)); + std::vector data_input(TORCH_JIT_ROUTER_TIMING_FEATURE_COUNT, 0.0f); + inputs.emplace_back(torch::from_blob(data_input.data(), + {1, TORCH_JIT_ROUTER_TIMING_FEATURE_COUNT}, + at::kFloat).clone()); + at::Tensor output = model->forward(inputs).toTensor(); + validate_output_dims(output, 1, label); +} - // Predicting value - at::Tensor output = packet_latency_model.forward(inputs).toTensor(); - assert_correct_dims(&output); - // === End of check - std::cout << "Torch-JIT model loaded successfully\n"; +static std::vector build_lp_aware_packet_features( + tw_lp *lp, + unsigned int src_terminal, + struct packet_start const *packet_dest) +{ + assert(packet_dest != nullptr); + + return { + (float)src_terminal, + (float)packet_dest->dfdally_dest_terminal_id, + (float)packet_dest->packet_size, + packet_dest->is_there_another_pckt_in_queue ? 1.0f : 0.0f, + lp ? (float)lp->gid : -1.0f, + (float)packet_dest->src_router_id, + (float)packet_dest->src_group_id, + (float)packet_dest->dst_router_id, + (float)packet_dest->dst_group_id, + (float)packet_dest->terminal_queue_length, + (float)packet_dest->terminal_vc_occupancy, + (float)packet_dest->processing_packet_delay + }; } +void surrogate_torch_init(char const *dir) { + std::cout << "Loading Torch-JIT packet-latency model\n"; -static struct packet_end surrogate_torch_predict(void *, tw_lp * lp, unsigned int src_terminal, struct packet_start const * packet_dest) { - //auto t_start = std::chrono::high_resolution_clock::now(); + torch_jit_lp_type_model_mode = false; + terminal_packet_latency_model_loaded = false; + default_packet_latency_model_loaded = false; + router_timing_model_loaded = false; - // Create a vector of inputs. - long int data_input[] = { - src_terminal, - packet_dest->dfdally_dest_terminal_id, - packet_dest->packet_size, - packet_dest->is_there_another_pckt_in_queue - }; - size_t n_input = sizeof(data_input) / sizeof(long int); + load_module_or_die(&packet_latency_model, dir, "packet-latency"); + + at::set_num_threads(1); + + if (torch_jit_lp_aware_mode) { + validate_packet_latency_model(&packet_latency_model, + TORCH_JIT_LP_AWARE_FEATURE_COUNT, + at::kFloat, + "lp-aware packet-latency"); + } else { + validate_packet_latency_model(&packet_latency_model, + TORCH_JIT_LEGACY_FEATURE_COUNT, + at::kLong, + "legacy packet-latency"); + } + + std::cout << "Torch-JIT packet-latency model loaded successfully\n"; +} + +void surrogate_torch_init_lp_type_models( + char const *terminal_model_path, + char const *router_timing_model_path, + char const *default_model_path) +{ + std::cout << "Loading Torch-JIT LP-type-aware models\n"; + + torch_jit_lp_aware_mode = true; + torch_jit_lp_type_model_mode = true; + terminal_packet_latency_model_loaded = false; + default_packet_latency_model_loaded = false; + router_timing_model_loaded = false; + + at::set_num_threads(1); + + if (has_path(terminal_model_path)) { + load_module_or_die(&terminal_packet_latency_model, terminal_model_path, "terminal packet-latency"); + validate_packet_latency_model(&terminal_packet_latency_model, + TORCH_JIT_LP_AWARE_FEATURE_COUNT, + at::kFloat, + "terminal packet-latency"); + terminal_packet_latency_model_loaded = true; + } + + if (has_path(default_model_path)) { + load_module_or_die(&default_packet_latency_model, default_model_path, "default packet-latency"); + validate_packet_latency_model(&default_packet_latency_model, + TORCH_JIT_LP_AWARE_FEATURE_COUNT, + at::kFloat, + "default packet-latency"); + default_packet_latency_model_loaded = true; + } + + if (!terminal_packet_latency_model_loaded && !default_packet_latency_model_loaded) { + tw_error(TW_LOC, "torch_jit_mode=lp-aware-lp-type-models requires torch_jit_terminal_model_path or torch_jit_default_model_path"); + } + if (has_path(router_timing_model_path)) { + load_module_or_die(&router_timing_model, router_timing_model_path, "router timing"); + validate_router_timing_model(&router_timing_model, "router timing"); + router_timing_model_loaded = true; + } else { + tw_warning(TW_LOC, "No torch_jit_router_timing_model_path configured; router timing inference disabled."); + } + + std::cout << "Torch-JIT LP-type-aware models loaded successfully\n"; +} + +bool surrogate_torch_router_timing_model_enabled(void) { + return torch_jit_lp_type_model_mode && router_timing_model_loaded; +} + +static struct packet_end surrogate_torch_predict(void *, tw_lp *lp, unsigned int src_terminal, struct packet_start const *packet_dest) { std::vector inputs; - inputs.emplace_back(torch::from_blob(data_input, {1, (int) n_input}, at::kLong)); + torch::jit::Module *model = &packet_latency_model; + + if (torch_jit_lp_type_model_mode) { + model = terminal_packet_latency_model_loaded ? &terminal_packet_latency_model : &default_packet_latency_model; + } - at::Tensor output = packet_latency_model.forward(inputs).toTensor(); - //assert_correct_dims(&output); + if (torch_jit_lp_aware_mode || torch_jit_lp_type_model_mode) { + std::vector data_input = build_lp_aware_packet_features(lp, src_terminal, packet_dest); + if (torch_jit_debug_prints && torch_jit_predict_calls < 20) { + fprintf(stderr, + "[torch-jit feature debug] " + "src_terminal=%g dest_terminal=%g packet_size=%g another=%g " + "caller_lp_gid=%g src_router_id=%g src_group_id=%g " + "dst_router_id=%g dst_group_id=%g terminal_queue_length=%g " + "terminal_vc_occupancy=%g processing_packet_delay=%g\n", + (double)data_input[0], (double)data_input[1], (double)data_input[2], + (double)data_input[3], (double)data_input[4], (double)data_input[5], + (double)data_input[6], (double)data_input[7], (double)data_input[8], + (double)data_input[9], (double)data_input[10], (double)data_input[11]); + fflush(stderr); + } + + assert((int)data_input.size() == TORCH_JIT_LP_AWARE_FEATURE_COUNT); + inputs.emplace_back(torch::from_blob(data_input.data(), + {1, TORCH_JIT_LP_AWARE_FEATURE_COUNT}, + at::kFloat).clone()); + } else { + long int data_input[] = { + (long int)src_terminal, + (long int)packet_dest->dfdally_dest_terminal_id, + (long int)packet_dest->packet_size, + packet_dest->is_there_another_pckt_in_queue ? 1L : 0L + }; + inputs.emplace_back(torch::from_blob(data_input, + {1, TORCH_JIT_LEGACY_FEATURE_COUNT}, + at::kLong).clone()); + } + + torch::NoGradGuard no_grad; + auto const torch_jit_t0 = std::chrono::high_resolution_clock::now(); + at::Tensor output = model->forward(inputs).toTensor(); + auto const torch_jit_t1 = std::chrono::high_resolution_clock::now(); + validate_output_dims(output, 2, "packet-latency inference"); + + torch_jit_forward_total_sec += std::chrono::duration(torch_jit_t1 - torch_jit_t0).count(); + torch_jit_predict_calls++; + + output = output.to(at::kFloat).contiguous(); auto *out_data = output.data_ptr(); + double const raw_travel_delta = (double)out_data[0]; + double const raw_next_delay = (double)out_data[1]; + + double const min_travel_delta = 10.0; + double const min_next_packet_delay = 10.0; + + double const predicted_travel_delta = + std::isfinite(raw_travel_delta) && raw_travel_delta > min_travel_delta + ? raw_travel_delta + : min_travel_delta; + + double const predicted_next_packet_delay = + std::isfinite(raw_next_delay) && raw_next_delay > min_next_packet_delay + ? raw_next_delay + : min_next_packet_delay; + + if (raw_travel_delta < torch_jit_min_travel_delta) torch_jit_min_travel_delta = raw_travel_delta; + if (raw_travel_delta > torch_jit_max_travel_delta) torch_jit_max_travel_delta = raw_travel_delta; + if (raw_next_delay < torch_jit_min_next_delay) torch_jit_min_next_delay = raw_next_delay; + if (raw_next_delay > torch_jit_max_next_delay) torch_jit_max_next_delay = raw_next_delay; + + if (torch_jit_debug_prints && + (torch_jit_predict_calls <= 20 || torch_jit_predict_calls % 10000 == 0)) { + fprintf(stderr, + "[torch-jit predict debug] calls=%llu avg_forward_us=%g " + "raw_travel_delta=%g raw_next_delay=%g " + "effective_travel_delta=%g effective_next_delay=%g " + "minmax_travel=[%g,%g] minmax_next=[%g,%g]\n", + (unsigned long long)torch_jit_predict_calls, + 1.0e6 * torch_jit_forward_total_sec / (double)torch_jit_predict_calls, + raw_travel_delta, raw_next_delay, + predicted_travel_delta, predicted_next_packet_delay, + torch_jit_min_travel_delta, torch_jit_max_travel_delta, + torch_jit_min_next_delay, torch_jit_max_next_delay); + fflush(stderr); + } + return (struct packet_end) { - .travel_end_time = packet_dest->travel_start_time + (out_data[0] > 0 ? out_data[0] : 10), - .next_packet_delay = out_data[1] > 0 ? out_data[1] : 200, + .travel_end_time = packet_dest->travel_start_time + predicted_travel_delta, + .next_packet_delay = predicted_next_packet_delay, }; - - //auto t_end = std::chrono::high_resolution_clock::now(); - //double total = std::chrono::duration(t_end-t_start).count(); } +double surrogate_torch_predict_router_queueing_delay( + struct router_timing_prediction_start const *start, + double fallback_queueing_delay) +{ + if (!surrogate_torch_router_timing_model_enabled() || start == nullptr) { + return fallback_queueing_delay; + } -// Dummies to use when no actual data is fed -static void init_pred_dummy(void * data, tw_lp * lp, unsigned int src_terminal) { - (void) data; - (void) lp; - (void) src_terminal; -} + std::vector data_input = { + start->router_id, + start->group_id, + start->output_port, + start->output_chan, + start->to_terminal, + start->is_global, + start->packet_size, + start->chunk_size, + start->output_vc_occupancy, + start->output_queued_count, + start->next_output_available_delta, + start->nominal_router_delay + }; + assert((int)data_input.size() == TORCH_JIT_ROUTER_TIMING_FEATURE_COUNT); + + std::vector inputs; + inputs.emplace_back(torch::from_blob(data_input.data(), + {1, TORCH_JIT_ROUTER_TIMING_FEATURE_COUNT}, + at::kFloat).clone()); + torch::NoGradGuard no_grad; + auto const torch_jit_t0 = std::chrono::high_resolution_clock::now(); + at::Tensor output = router_timing_model.forward(inputs).toTensor(); + auto const torch_jit_t1 = std::chrono::high_resolution_clock::now(); + validate_output_dims(output, 1, "router timing inference"); + + torch_jit_router_forward_total_sec += std::chrono::duration(torch_jit_t1 - torch_jit_t0).count(); + torch_jit_router_predict_calls++; + + output = output.to(at::kFloat).contiguous(); + double const raw_queueing_delay = (double)output.data_ptr()[0]; + double const predicted_queueing_delay = + std::isfinite(raw_queueing_delay) && raw_queueing_delay >= 0.0 + ? raw_queueing_delay + : fallback_queueing_delay; -static void feed_pred_dummy(struct latency_surrogate * data, tw_lp * lp, unsigned int src_terminal, struct packet_start const * start, struct packet_end const * end) { - (void) data; - (void) lp; - (void) src_terminal; - (void) start; - (void) end; + if (raw_queueing_delay < torch_jit_min_router_queueing_delay) torch_jit_min_router_queueing_delay = raw_queueing_delay; + if (raw_queueing_delay > torch_jit_max_router_queueing_delay) torch_jit_max_router_queueing_delay = raw_queueing_delay; + + if (torch_jit_debug_prints && + (torch_jit_router_predict_calls <= 20 || torch_jit_router_predict_calls % 10000 == 0)) { + fprintf(stderr, + "[torch-jit router timing debug] calls=%llu avg_forward_us=%g " + "raw_queueing_delay=%g effective_queueing_delay=%g minmax_queueing=[%g,%g]\n", + (unsigned long long)torch_jit_router_predict_calls, + 1.0e6 * torch_jit_router_forward_total_sec / (double)torch_jit_router_predict_calls, + raw_queueing_delay, + predicted_queueing_delay, + torch_jit_min_router_queueing_delay, + torch_jit_max_router_queueing_delay); + fflush(stderr); + } + + return predicted_queueing_delay; } +// Dummies to use when no actual data is fed +static void init_pred_dummy(void *data, tw_lp *lp, unsigned int src_terminal) { + (void)data; + (void)lp; + (void)src_terminal; +} -static void predict_latency_rc_dummy(struct latency_surrogate * data, tw_lp * lp) { - (void) data; - (void) lp; +static void feed_pred_dummy(struct latency_surrogate *data, tw_lp *lp, unsigned int src_terminal, + struct packet_start const *start, struct packet_end const *end) { + (void)data; + (void)lp; + (void)src_terminal; + (void)start; + (void)end; } +static void predict_latency_rc_dummy(struct latency_surrogate *data, tw_lp *lp) { + (void)data; + (void)lp; +} struct packet_latency_predictor torch_latency_predictor = { - .init = (init_pred_f) init_pred_dummy, - .feed = (feed_pred_f) feed_pred_dummy, - .predict = (predict_pred_f) surrogate_torch_predict, - .predict_rc = (predict_pred_rc_f) predict_latency_rc_dummy, + .init = (init_pred_lat_f) init_pred_dummy, + .feed = (feed_pred_lat_f) feed_pred_dummy, + .predict = (predict_pred_lat_f) surrogate_torch_predict, + .predict_rc = (predict_pred_lat_rc_f) predict_latency_rc_dummy, .predictor_data_sz = 0 }; diff --git a/src/surrogate/zmqml/Makefile b/src/surrogate/zmqml/Makefile index 4c28ed54..b4abcfab 100644 --- a/src/surrogate/zmqml/Makefile +++ b/src/surrogate/zmqml/Makefile @@ -7,7 +7,7 @@ TARGETS=libzmqmlrequester.so demozmqmlrequester all: $(TARGETS) libzmqmlrequester.so: zmqmlrequester.o - $(CXX) -shared -o $@ $^ + $(CXX) -shared -o $@ $^ $(LDFLAGS) zmqmlrequester.o: zmqmlrequester.cpp zmqmlrequester.h $(CXX) $(CXXFLAGS) -fPIC -c $< -o $@ diff --git a/src/surrogate/zmqml/zmqmlrequester.cpp b/src/surrogate/zmqml/zmqmlrequester.cpp index 6f43758c..a8ebbd53 100644 --- a/src/surrogate/zmqml/zmqmlrequester.cpp +++ b/src/surrogate/zmqml/zmqmlrequester.cpp @@ -20,6 +20,7 @@ using namespace rapidjson; static string endpoint = "tcp://localhost:5555"; static int debug = 0; + /** * See zmqmlrequester.h */ @@ -82,6 +83,10 @@ vector zmqml_request(const string& cmd, if (response.HasMember("id")) { ret.push_back(response["id"].GetString()); } + + if (response.HasMember("predictions")) { + ret.push_back(response["predictions"].GetString()); + } } else { ret.push_back("failed"); } diff --git a/src/surrogate/zmqml/zmqmlserver.py b/src/surrogate/zmqml/zmqmlserver.py index 066b0512..4e8105ea 100755 --- a/src/surrogate/zmqml/zmqmlserver.py +++ b/src/surrogate/zmqml/zmqmlserver.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # # zmqmlserver : ZeroMQ-based ML task dispatching server @@ -13,6 +13,7 @@ import time from itertools import count # generate unit id # from dataclasses import dataclass +import numpy as np # TODO: abstract a mechanism to call training from runmlpacketdelay import run_mlpacketdelay_training @@ -24,13 +25,15 @@ endpoint = "tcp://*:5555" debug = False - +debug2 = False # # # launch_id = count(start=1) # unique for launched thread launched_threads = {} # id:obj. keep track of active threads. remove the thread once it finished +training_records = {} # client_id:[] + class LaunchCMD: def __init__(self): # thread event @@ -153,6 +156,61 @@ def receivedata(args, bindata): elapsed_time = time.time() - st return (status, elapsed_time) +# +# receive training records +# +def receiverecords(args, bindata): + status = "failed" + st = time.time() + + num_args = int(args[0]) # 1st arg is num of args + client = int(args[1]) # 2nd arg is client id + num_records = int(args[2]) # 3rd arg is num records + records_str = str(bindata.decode('utf-8')) + records_str = records_str.strip() + records = list(records_str.split(" ")) + + if client not in training_records: + training_records[client] = [] + + training_records[client].extend([float(s) for s in records]) + + if (debug2) and (client == 51): + print(f"Training records[51] :{training_records[client]}") + + status = "done" + elapsed_time = time.time() - st + return (status, elapsed_time) + + +# +# do inference to get predictions +# +def launch_surrogate_inferencing(args, bindata): + status = "failed" + st = time.time() + + num_args = int(args[0]) # 1st arg is num of args + client = int(args[1]) # 2nd arg is client id + num_steps = int(args[2]) # 3rd arg is num steps to predict + records_str = str(bindata.decode('utf-8')) + records_str = records_str.strip() + records = list(records_str.split(" ")) + + input_records = [float(s) for s in records] + + inferences = [] + for i in range(num_steps): + inferences.append(2000000.0) + + inferences_str = ' '.join([str(f) for f in inferences]) + + status = "done" + elapsed_time = time.time() - st + return (status, elapsed_time, inferences_str) + + +# # # main listener loop @@ -192,6 +250,12 @@ def zmq_cmd_listener(): destfn = args[0] (status, et) = receivedata(args, bindata) retmsg = {"status":status, "et":str(et)} + elif cmd == "send-records": + (status, et) = receiverecords(args, bindata) + retmsg = {"status":status, "et":str(et)} + elif cmd == "do-inference": + (status, et, predictions) = launch_surrogate_inferencing(args, bindata) + retmsg = {"status":status, "et":str(et), "predictions": predictions} # send response back to the requester socket.send_json(retmsg)