diff --git a/.gitignore b/.gitignore index b023d553..746bcbe3 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,8 @@ /maint/codes.pc /test-driver .deps +src/surrogate/zmqml/demozmqmlrequester +src/surrogate/zmqml/libzmqmlrequester.so # make generated artifacts .dirstamp diff --git a/CODES-compile-instructions.sh b/CODES-compile-instructions.sh index 14178c99..9eb00143 100644 --- a/CODES-compile-instructions.sh +++ b/CODES-compile-instructions.sh @@ -1,9 +1,11 @@ -#!/usr/bin bash -x +#!/usr/bin/env bash +set -euo pipefail +set -x # Switches -swm_enable=1 -union_enable=1 -torch_enable=0 +swm_enable=0 +union_enable=0 +torch_enable=1 # Uncomment below for MPICH #export PATH=/usr/local/mpich-4.1.2/bin/:"$PATH" @@ -21,8 +23,17 @@ CUR_DIR="$PWD" ##### Downloading everything ##### -git clone https://github.com/codes-org/codes --depth=100 --branch=v1.5.0 -git clone https://github.com/ross-org/ross --depth=100 --branch=v8.1.0 +if [ ! -d codes/.git ]; then + git clone https://github.com/codes-org/codes --depth=100 --branch=v1.5.0 +else + echo "Using existing codes checkout: $(realpath codes)" +fi + +if [ ! -d ross/.git ]; then + git clone https://github.com/ross-org/ross --depth=100 --branch=v8.1.0 +else + echo "Using existing ross checkout: $(realpath ross)" +fi if [ $swm_enable = 1 ]; then git clone https://github.com/pmodels/argobots --depth=1 @@ -40,7 +51,7 @@ fi ##### COMPILING ##### -mkdir ross/build +mkdir -p ross/build pushd ross/build cmake .. -DROSS_BUILD_MODELS=ON -DCMAKE_INSTALL_PREFIX="$(realpath ./bin)" \ -DCMAKE_C_COMPILER=mpicc -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_FLAGS="-g -Wall" @@ -53,7 +64,7 @@ popd if [ $swm_enable = 1 ]; then pushd swm-workloads/swm ./prepare.sh - mkdir build + mkdir -p build pushd build ../configure --disable-shared --prefix="$(realpath ./bin)" CC=mpicc CXX=mpicxx CFLAGS=-g CXXFLAGS=-g #make V=1 && make install @@ -64,7 +75,7 @@ if [ $swm_enable = 1 ]; then pushd argobots ./autogen.sh - mkdir build + mkdir -p build pushd build #../configure --enable-debug=all --disable-fast --disable-shared --prefix="$(realpath ./bin)" CC=mpicc CXX=mpicxx CFLAGS=-g CXXFLAGS=-g ../configure --disable-shared --prefix="$(realpath ./bin)" CC=mpicc CXX=mpicxx CFLAGS=-g CXXFLAGS=-g @@ -97,16 +108,181 @@ if [ $union_enable = 1 ]; then fi -mkdir codes/build + +# Make system pkg-config metadata visible even when Conda's pkg-config is active. +# This is needed for libzmq.pc on systems where ZeroMQ is installed through the OS +# but the active Conda environment's pkg-config only searches Conda pkgconfig dirs. +if ! pkg-config --exists libzmq 2>/dev/null; then + for pcdir in \ + /usr/lib/x86_64-linux-gnu/pkgconfig \ + /usr/lib64/pkgconfig \ + /usr/lib/pkgconfig \ + /usr/local/lib/pkgconfig \ + /usr/local/lib64/pkgconfig \ + /opt/homebrew/lib/pkgconfig \ + /usr/share/pkgconfig + do + if [ -d "$pcdir" ]; then + export PKG_CONFIG_PATH="$pcdir:${PKG_CONFIG_PATH:-}" + fi + done +fi + +if ! pkg-config --exists libzmq 2>/dev/null; then + echo "WARNING: pkg-config still cannot find libzmq.pc." >&2 + echo " If ZMQML fails to build, install the ZeroMQ development package" >&2 + echo " or set PKG_CONFIG_PATH to the directory containing libzmq.pc." >&2 +fi + +# Build local ZMQML requester library required by director-client.C +pushd codes/src/surrogate/zmqml +make clean +make +test -f libzmqmlrequester.so +test -f zmqmlrequester.h +popd + +# Make imported zmqmlrequester target visible to doc/example and tests. +python3 - <<'INNERPY' +from pathlib import Path +cm = Path("codes/src/CMakeLists.txt") +text = cm.read_text() +old = "add_library(zmqmlrequester SHARED IMPORTED )" +new = "add_library(zmqmlrequester SHARED IMPORTED GLOBAL)" +if old in text: + cm.write_text(text.replace(old, new)) +elif new in text: + pass +else: + raise SystemExit("Could not find zmqmlrequester imported target line in codes/src/CMakeLists.txt") +INNERPY + +mkdir -p codes/build pushd codes/build +torch_cmake_prefix="" +torch_dir="" + +if [ "$torch_enable" = 1 ]; then + torch_cmake_prefix="$(python3 - <<'INNERPY' +import torch +print(torch.utils.cmake_prefix_path) +INNERPY +)" + torch_dir="${torch_cmake_prefix}/Torch" + + if [ ! -f "${torch_dir}/TorchConfig.cmake" ]; then + echo "ERROR: TorchConfig.cmake not found at: ${torch_dir}/TorchConfig.cmake" >&2 + echo " torch.utils.cmake_prefix_path returned: ${torch_cmake_prefix}" >&2 + exit 1 + fi + + echo "Using Torch CMake prefix: ${torch_cmake_prefix}" + echo "Using Torch_DIR: ${torch_dir}" + + # CUDA is intentionally opt-in. + # Default to CPU-only Torch-JIT compilation unless CUDA_HOME is explicitly set. + # + # To enable CUDA, run for example: + # export CUDA_HOME=/usr/local/cuda-12.4 + # ./CODES-compile-instructions.sh + torch_cuda_version="$(python3 - <<'INNERPY' +import torch +print(torch.version.cuda or "") +INNERPY +)" + + cuda_arch="" + if [ -z "${CUDA_HOME:-}" ] && [ -n "${torch_cuda_version}" ]; then + echo "ERROR: CUDA_HOME is not set, so this script is defaulting to CPU-only Torch-JIT compilation." >&2 + echo " However, the active Python environment has a CUDA-enabled PyTorch build:" >&2 + echo " torch.version.cuda=${torch_cuda_version}" >&2 + echo "" >&2 + echo " CMake cannot use a CUDA-enabled PyTorch package as a CPU-only LibTorch package." >&2 + echo " Choose one of the following:" >&2 + echo " 1. For CPU-only compilation, install a CPU-only PyTorch build in this environment." >&2 + echo " 2. For CUDA compilation, export CUDA_HOME to your CUDA toolkit root." >&2 + echo "" >&2 + echo " Example CUDA build:" >&2 + echo " export CUDA_HOME=/usr/local/cuda-12.4" >&2 + echo " bash CODES-compile-instructions.sh" >&2 + exit 1 + fi + + if [ -n "${CUDA_HOME:-}" ]; then + if [ ! -f "${CUDA_HOME}/include/cuda_runtime_api.h" ]; then + echo "ERROR: CUDA_HOME is set, but missing CUDA header: ${CUDA_HOME}/include/cuda_runtime_api.h" >&2 + exit 1 + fi + + if [ ! -f "${CUDA_HOME}/lib64/libcudart.so" ] && [ ! -f "${CUDA_HOME}/lib/libcudart.so" ]; then + echo "ERROR: CUDA_HOME is set, but missing CUDA runtime library under ${CUDA_HOME}/lib64 or ${CUDA_HOME}/lib" >&2 + exit 1 + fi + + if [ ! -x "${CUDA_HOME}/bin/nvcc" ]; then + echo "ERROR: CUDA_HOME is set, but missing CUDA compiler: ${CUDA_HOME}/bin/nvcc" >&2 + exit 1 + fi + + if [ ! -d "${CUDA_HOME}/nvvm/libdevice" ]; then + echo "ERROR: CUDA_HOME is set, but missing CUDA libdevice directory: ${CUDA_HOME}/nvvm/libdevice" >&2 + exit 1 + fi + + if command -v nvidia-smi >/dev/null 2>&1; then + cuda_arch="$(nvidia-smi --query-gpu=compute_cap --format=csv,noheader 2>/dev/null | head -n1 | tr -d '.[:space:]' || true)" + fi + + if [ -z "${cuda_arch}" ]; then + echo "WARNING: Could not auto-detect GPU compute capability with nvidia-smi." >&2 + echo " Falling back to CMAKE_CUDA_ARCHITECTURES=80." >&2 + cuda_arch="80" + fi + + export CUDA_HOME + export CUDA_PATH="${CUDA_HOME}" + export CUDA_ROOT="${CUDA_HOME}" + export CUDA_TOOLKIT_ROOT_DIR="${CUDA_HOME}" + export CUDAToolkit_ROOT="${CUDA_HOME}" + export CUDACXX="${CUDA_HOME}/bin/nvcc" + export PATH="${CUDA_HOME}/bin:${PATH}" + export LD_LIBRARY_PATH="${CUDA_HOME}/lib64:${CUDA_HOME}/lib:${LD_LIBRARY_PATH:-}" + + echo "CUDA_HOME is set; enabling CUDA Torch-JIT compilation." + echo "Using CUDA_HOME: ${CUDA_HOME}" + echo "Using CUDACXX: ${CUDACXX}" + echo "Using CMAKE_CUDA_ARCHITECTURES=${cuda_arch}" + else + echo "CUDA_HOME is not set; forcing CPU-only Torch-JIT compilation." + + # Prevent accidental CUDA discovery from /usr/local/cuda, nvcc on PATH, + # inherited CMake cache variables, or CUDA-enabled PyTorch metadata. + unset CUDA_HOME + unset CUDA_PATH + unset CUDA_ROOT + unset CUDA_TOOLKIT_ROOT_DIR + unset CUDAToolkit_ROOT + unset CUDACXX + unset CMAKE_CUDA_COMPILER + fi +fi + +cmake_prefix_path="$(realpath "$CUR_DIR/ross/build/bin")" +if [ "$torch_enable" = 1 ]; then + cmake_prefix_path="${cmake_prefix_path};${torch_cmake_prefix}" +fi + make_args_codes=( - -DCMAKE_PREFIX_PATH="$(realpath "$CUR_DIR/ross/build/bin")" + -DCMAKE_PREFIX_PATH="${cmake_prefix_path}" -DCMAKE_CXX_COMPILER=mpicxx -DCMAKE_C_COMPILER=mpicc -DCMAKE_C_FLAGS="-g -Wall" -DCMAKE_CXX_FLAGS="-g -Wall" -DCMAKE_BUILD_TYPE=Debug -DBUILD_TESTING=ON -DCMAKE_INSTALL_PREFIX="$(realpath bin)" + -DZMQML_BUILD_PATH="$(realpath "$CUR_DIR/codes/src/surrogate/zmqml")" + -DZeroMQ_INCLUDE_DIR=/usr/include + -DZeroMQ_LIBRARY=/usr/lib/x86_64-linux-gnu/libzmq.so ) if [ $swm_enable = 1 ]; then make_args_codes=( @@ -121,8 +297,32 @@ if [ $union_enable = 1 ]; then -DUNION_PKG_CONFIG_PATH="$(realpath "$CUR_DIR/Union/install/lib/pkgconfig")" ) fi -if [ $torch_enable = 1 ]; then - make_args_codes=("${make_args_codes[@]}" -DUSE_TORCH=true) +if [ "$torch_enable" = 1 ]; then + make_args_codes=( + "${make_args_codes[@]}" + -DUSE_TORCH=true + -DTorch_DIR="${torch_dir}" + ) + + if [ -n "${CUDA_HOME:-}" ]; then + make_args_codes=( + "${make_args_codes[@]}" + -DCUDA_TOOLKIT_ROOT_DIR="${CUDA_HOME}" + -DCUDAToolkit_ROOT="${CUDA_HOME}" + -DCUDA_PATH="${CUDA_HOME}" + -DCUDA_ROOT="${CUDA_HOME}" + -DCMAKE_CUDA_COMPILER="${CUDA_HOME}/bin/nvcc" + -DCMAKE_CUDA_ARCHITECTURES="${cuda_arch}" + -DCUDA_INCLUDE_DIRS="${CUDA_HOME}/include" + -DCUDA_CUDART_LIBRARY="${CUDA_HOME}/lib64/libcudart.so" + ) + else + make_args_codes=( + "${make_args_codes[@]}" + -DCMAKE_DISABLE_FIND_PACKAGE_CUDA=ON + -DCMAKE_DISABLE_FIND_PACKAGE_CUDAToolkit=ON + ) + fi else make_args_codes=("${make_args_codes[@]}" -DUSE_TORCH=false) fi diff --git a/codes/surrogate/director-client.h b/codes/surrogate/director-client.h new file mode 100644 index 00000000..1e663e85 --- /dev/null +++ b/codes/surrogate/director-client.h @@ -0,0 +1,105 @@ +#ifndef __DIRECTOR_CLIENT_H_DEFINED__ +#define __DIRECTOR_CLIENT_H_DEFINED__ + +#include +#include "codes/codes_mapping.h" + + + +#define NUM_DIR_TO_NW_EVENT 20 + + +enum SIMULATION_MODE +{ + SIM_MODE_PDES=1, + SIM_MODE_ITERATION_SURROGATE, +}; + + +typedef struct director_message director_message; +typedef struct director_annotation director_annotation; + +enum DIR_EVENTS +{ + DIR_AN_ITER_MARK=1, + DIR_OP_NW, + DIR_REGISTERED_EVENT__SWITCH_TO_SURR, + DIR_REGISTERED_EVENT__SWITCH_TO_PDES, + DIR_REGISTERED_EVENT__MOVE_TO_NEXT, +}; + +enum DIR_OPERATIONS //currently unused +{ + DIR_AN_WK_START=1, + DIR_AN_WK_ITERATION_END, + DIR_AN_WK_END, + DIR_OP_SEND, + DIR_OP_RECV, +}; + + +// director event message struct +struct director_message +{ + int msg_type; + int op_type; + int num_rngs; + int value; + //model_net_event_return event_rc; + //struct codes_workload_op * mpi_op; + + void *buffer; // this pointer MUST be at the end of the structure +}; + +// director annotation struct +struct director_annotation +{ + int an_type; + int an_value; +}; + + +#ifdef __cplusplus +extern "C" +{ +#endif + + +/** + * @brief Prepares a request to send to client with the specified command and arguments, + * receives a reply + + * @param cmd zmqml request command: 'query', 'launch', execute', send', 'nothing', 'exit' + * @param args the arguments for launch and execute + * @param bindata binary data from send + * @param surrdata containing the 'status' field and optionally 'et' and 'id'. + * 'status' is not present, returns a vector with "failed". + * Fromat is ":;:;..." + * + */ + +//extern char* dir_client_request(const char* cmd, +// const char* args, +// const char* data); + + +extern void director_lp_register_model(const char *); + + +/* +extern void director_parse_args(char *args, int **args_array, int *length); +static void director_issue_codes_event(director_state * s, tw_lpid nw_lpid, int dir_registered_event_type, tw_stime ts, tw_lp* lp); +extern void director_register_events(director_state * s, director_message * msg, tw_lp * lp); +extern void dir_test_init(director_state* s, tw_lp* lp); +extern void director_prepare_iteration_dataset(director_state* s, tw_stime * training_data, int training_cycle, int training_records); +extern void director_get_surrogate_prediction(director_state* s, tw_bf * bf, director_message * m, tw_lp * lp, tw_stime* delay_ts); +extern void dir_test_event_handler(director_state* s, tw_bf * bf, director_message * m, tw_lp * lp); +extern void dir_test_finalize(director_state* s, tw_lp* lp); +*/ + + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 08950e7f..c4152b80 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -62,6 +62,7 @@ list(APPEND SRCS surrogate/app-iteration-predictor/average.c surrogate/packet-latency-predictor/common.c surrogate/packet-latency-predictor/average.c + surrogate/director-client.C iokernellang/codesparser.h iokernellang/codesparser.c @@ -158,6 +159,12 @@ if(USE_ONLINE) endif() endif() +# ZMQML +add_library(zmqmlrequester SHARED IMPORTED GLOBAL) +set_target_properties(zmqmlrequester PROPERTIES + IMPORTED_LOCATION "${ZMQML_BUILD_PATH}/libzmqmlrequester.so" + INTERFACE_INCLUDE_DIRECTORIES "${ZMQML_BUILD_PATH}") + #LINK ROSS # target_link_libraries(codes PUBLIC #{pkgcfg_lib_ROSS_ROSS}) # target_link_libraries(codes PUBLIC PkgConfig::ROSS) @@ -169,6 +176,7 @@ target_include_directories(codes PUBLIC ${PROJECT_SOURCE_DIR}/codes ${PROJECT_SOURCE_DIR}/src ${PROJECT_SOURCE_DIR}/src/modelconfig + $ ) target_link_libraries(codes PUBLIC ${LIBS_TO_LINK}) @@ -201,9 +209,14 @@ if(USE_DUMPI) list(APPEND CODES_TARGETS model-net-dumpi-traces-dump) endif() +# ZMQ +pkg_check_modules(PC_ZeroMQ QUIET zmq) +find_path(ZeroMQ_INCLUDE_DIR NAMES zmq.hpp PATHS ${PC_ZeroMQ_INCLUDE_DIRS}) +find_library(ZeroMQ_LIBRARY NAMES zmq PATHS ${PC_ZeroMQ_LIBRARY_DIRS}) + foreach(tar IN LISTS CODES_TARGETS) target_include_directories(${tar} PUBLIC ${CODES_INCLUDE_DIRS} ${ROSS_INCLUDE_DIRS}) - target_link_libraries(${tar} PUBLIC codes ${LIBS_TO_LINK}) + target_link_libraries(${tar} PUBLIC codes ${LIBS_TO_LINK} zmqmlrequester ${ZeroMQ_LIBRARY}) endforeach() diff --git a/src/network-workloads/model-net-mpi-replay.c b/src/network-workloads/model-net-mpi-replay.c index 018c4337..df28d9f5 100644 --- a/src/network-workloads/model-net-mpi-replay.c +++ b/src/network-workloads/model-net-mpi-replay.c @@ -23,6 +23,21 @@ #include "codes/surrogate/init.h" #include "surrogate/app-iteration-predictor/common.h" + +#include "codes/surrogate/director-client.h" + +/* ========================================================== + START OF Director Code (To be moved to separate files) + ========================================================== +*/ + + +//struct director_config_struct dir_config_global; + + + + + /* turning on track lp will generate a lot of output messages */ #define DBG_COMM 1 #define MN_LP_NM "modelnet_dragonfly_custom" @@ -208,6 +223,8 @@ enum MPI_NW_EVENTS CLI_OTHER_FINISH, //received when another workload has finished // Surrogate events SURR_SKIP_ITERATION, // skips one (several) iteration(s) of simulation + CODES_CMD_SWITCH_TO_SURR, // transition simulation from PDES -> Surrogate + CODES_CMD_SWITCH_TO_PDES, // transition simulation from Surrogate -> PDES }; /* type of synthetic traffic */ @@ -393,6 +410,11 @@ struct nw_state char output_buf[512]; char col_stats[64]; struct ross_model_sample ross_sample; + + /* For hybrid simulations with DIRECTOR and surrogate */ + int director_enabled; + tw_lpid director_lpid; + int simulation_mode; }; /* data for handling reverse computation. @@ -498,6 +520,73 @@ struct nw_message } rc; }; + +/* ========================================================== + START of model-net replay DIRECTOR interface + ========================================================== +*/ + + +void codes_register_director_events(nw_state* s, int dir_event_type, int nw_event_type, size_t event_value, tw_lp* lp) +{ + tw_event *e; + director_message *msg; + + nw_message registered_nw_msg; + registered_nw_msg.msg_type = nw_event_type; + registered_nw_msg.op_type = event_value; + registered_nw_msg.fwd.app_id = s->nw_id; + //registered_nw_msg.fwd.data_type = 9999; + + tw_stime ts = 0.001; + + //printf("==DIR[%d] NW Registering dir_event_type:%d | nw_event_type:%d (time: %lf)\n", + // s->nw_id, dir_event_type, nw_event_type, tw_now(lp)); + + //printf("==DIR: ts: %lf\n", ts); + if (sizeof(nw_message) + sizeof(director_message) > g_tw_msg_sz){ + tw_error(TW_LOC, "Error: NW-director trying to transmit an event of size " + "%d but ROSS is configured for events of size %zd\n", + sizeof(nw_message) + sizeof(director_message), g_tw_msg_sz); + } + + e = tw_event_new(s->director_lpid, ts, lp); + msg = (director_message*)tw_event_data(e); + msg->msg_type = dir_event_type; + msg->op_type = nw_event_type; + msg->value = sizeof(nw_message); + + memcpy(&msg->buffer, ®istered_nw_msg, msg->value); + + tw_event_send(e); +} + +/* Trigger Director Event from within network model*/ +static void codes_issue_director_event(tw_lp* lp, tw_lpid director_lpid, int dir_event_type, int value) +{ + + tw_event *e; + struct director_message* msg; + + tw_stime ts; + + ts = .0001; // Todo: maybe this should be dynamically adjustable + + e = tw_event_new(director_lpid, ts, lp ); + msg = (director_message*)tw_event_data(e); + + msg->msg_type = dir_event_type; + msg->value = value; + tw_event_send(e); +} +/* ========================================================== + END of model-net replay DIRECTOR interface + ========================================================== +*/ + + + + static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op, int matched_req); static void send_ack_back_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp); @@ -2489,6 +2578,28 @@ void nw_test_init(nw_state* s, tw_lp* lp) // then this will error out s->wrkld_id = -1; + /* START of DIRECTOR setup + */ + s->director_enabled = 0; + s->simulation_mode = SIM_MODE_PDES; + int num_dir_per_mgrp = codes_mapping_get_lp_count ("MODELNET_GRP", 1, "dir-nw-lp", NULL, 0); + if(num_dir_per_mgrp > 0){ + s->director_enabled = 1; + codes_mapping_get_lp_id("MODELNET_GRP", "dir-nw-lp", NULL, 1, s->nw_id / num_dir_per_mgrp, s->nw_id % num_dir_per_mgrp, &s->director_lpid); + + // register callbacks with director + codes_register_director_events(s, DIR_REGISTERED_EVENT__SWITCH_TO_SURR, CODES_CMD_SWITCH_TO_SURR, sizeof(nw_message), lp); + codes_register_director_events(s, DIR_REGISTERED_EVENT__SWITCH_TO_PDES, CODES_CMD_SWITCH_TO_PDES, sizeof(nw_message), lp); + codes_register_director_events(s, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, MPI_OP_GET_NEXT, sizeof(nw_message), lp); + //printf("\n==DIRNW s->nw_id: %d | lp->gid: %llu | s>director_lpid: %llu", s->nw_id, LLU(lp->gid), LLU(s->director_lpid)); + + } + else{ + //printf("\n==NW[%d] DIRECTOR: No director LPs found (dir-nw-lp)\n", s->nw_id); + } + /* + * END of DIRECTOR setup */ + char type_name[512]; if(!num_net_traces) @@ -2843,6 +2954,8 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) break; case MPI_OP_GET_NEXT: + //if(m->fwd.data_type == 9999) + // printf("===[%llu] N-MARK[%llu]: Value=%d\n", lp->gid, 1, 9999); get_next_mpi_operation(s, bf, m, lp); break; @@ -2875,6 +2988,22 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) case SURR_SKIP_ITERATION: skip_to_iteration(s, lp, bf, m); break; + + /* START of Sim. transition events sent by DIRECTOR */ + case CODES_CMD_SWITCH_TO_SURR: + if (s->simulation_mode == SIM_MODE_PDES) { + s->simulation_mode = SIM_MODE_ITERATION_SURROGATE; + get_next_mpi_operation(s, bf, m, lp); + } + break; + + case CODES_CMD_SWITCH_TO_PDES: + if (s->simulation_mode == SIM_MODE_ITERATION_SURROGATE) { + s->simulation_mode = SIM_MODE_PDES; + get_next_mpi_operation(s, bf, m, lp); + } + break; + /* END of Sim. transition event sent by DIRECTOR */ } } @@ -3019,6 +3148,29 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l // printf("Client rank %llu completed workload, local rank %d .\n", s->nw_id, s->local_rank); return; + } + if(mpi_op->op_type == CODES_WK_MARK) // If annotation type + {// TODO: extend this section to include checks for all annotations + m->rc.mpi_next.mark.saved_marker_time = tw_now(lp); + + if(s->director_enabled == 1) + { + //printf("===DIR: Value=%d\n", mpi_op->u.send.tag); + codes_issue_director_event(lp, s->director_lpid, DIR_AN_ITER_MARK, mpi_op->u.send.tag); + //printf("===[%llu] N-MARK[%llu]: Value=%d\n", lp->gid, s->director_lpid, mpi_op->u.send.tag); + + return; + }else{ + codes_issue_next_event(lp); + return; + } + } else if(s->simulation_mode == SIM_MODE_ITERATION_SURROGATE) // Else non-annotation type + { + if(s->director_enabled == 1) + { + codes_issue_director_event(lp, s->director_lpid, DIR_OP_NW, mpi_op->op_type); + return; + } } switch(mpi_op->op_type) { @@ -4255,6 +4407,8 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv ) if (g_st_ev_trace || g_st_model_stats || g_st_use_analysis_lps) nw_lp_register_model(); + director_lp_register_model("dir-nw-lp"); + net_ids = model_net_configure(&num_nets); // assert(num_nets == 1); net_id = *net_ids; diff --git a/src/surrogate/director-client.C b/src/surrogate/director-client.C new file mode 100644 index 00000000..c3738064 --- /dev/null +++ b/src/surrogate/director-client.C @@ -0,0 +1,505 @@ +#include +#include +#include +#include +#include + +#include +#include +#include // std::min_element +#include //std::fixed +#include // std::precision + +#include "codes/surrogate/director-client.h" +#include "codes/configuration.h" +#include "zmqmlrequester.h" + + +#define NUM_ACTIVE_CLIENTS 72 //TODO: this should be calculated at runtime + +#define DIR_ZMQ_CMD_LENGTH 15 +#define DIR_ZMQ_ARG_LENGTH 100 + +#define DIR_MAX_PREDICTION 5 +#define DIR_MAX_TRAINING_RECORDS 10 +#define DIR_MAX_DATA_SIZE 15 + +struct +{ + int surr_iter_start; + int surr_iter_end; +} director_config_global; + +typedef struct director_state director_state; + +// Some flag to relocate/clean-up +int evaluate_perf = 1; +int training_enabled = 0; //TODO: Move this to the LP state +int surrogate_enabled = 0; +int inferencing_enabled = 1; + + +std::vector total_elapsed_times; +std::vector zmq_processing_times; + +// state of the director LP +struct director_state +{ + tw_lpid director_id; + int simulation_mode; + + int training_cycle_id; + int training_record_id; + tw_stime training_data[DIR_MAX_TRAINING_RECORDS]; + //std::vector training_data_vc; + + int next_prediction_index; + tw_stime predictions[DIR_MAX_PREDICTION]; + + void *nw_event_ptr[NUM_DIR_TO_NW_EVENT]; + int nw_event_size[NUM_DIR_TO_NW_EVENT]; + + tw_lpid nw_lpid; +}; + +std::vector director_get_str_list(const char *s, const char delimiter) { + std::vector result; + std::stringstream ss (s); + std::string item; + + while (getline (ss, item, delimiter)) { result.push_back (item); } + return result; +} +std::string director_get_list_str(std::vector s, const char delimiter) { + std::ostringstream mergedstr; + std::copy(s.begin(), s.end(), std::ostream_iterator(mergedstr, " ")); + std::cout << mergedstr.str() << std::endl; + return mergedstr.str(); +} + +std::vector director_client_request( + const char* cmd, + const char* args, + const std::string data) +{ + std::vector ret; + /* + std::cout << cmd << " ARGS " << args << std::endl; + //if(strcmp(cmd, "send-records") == 0){ + std::cout << data << std::endl; + //} + */ + if(strcmp(cmd, "exit") == 0){ + ret = zmqml_request(cmd); + return ret; + } + + auto start_time = std::chrono::steady_clock::now(); // TODO - find a way to enclose this in evaluate_perf? + + std::vector args_list; + args_list = director_get_str_list(args, ';'); + ret = zmqml_request(cmd, args_list, data); + + if (evaluate_perf == 1){ + auto end_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration(end_time - start_time).count(); + + total_elapsed_times.push_back(duration); + zmq_processing_times.push_back( std::stod(ret[1]) ); + + if(zmq_processing_times.size() == NUM_ACTIVE_CLIENTS){ + double sum = 0; + for (double ts : zmq_processing_times) sum += ts; + double mean = sum / zmq_processing_times.size(); + double sum_sq_diff = 0; + for (double ts : zmq_processing_times) sum_sq_diff += (ts - mean) * (ts - mean); + double std_dev = sqrt(sum_sq_diff / zmq_processing_times.size()); + auto min = std::min_element(zmq_processing_times.begin(), zmq_processing_times.end()); + auto max = std::max_element(zmq_processing_times.begin(), zmq_processing_times.end()); + + zmq_processing_times.clear(); + + std::cout << std::setprecision(9) << std::fixed; + /* + std::cout << "ZMQ_VALS: "; + for(auto d: zmq_processing_times) + std::cout << d << " ;"; + std::cout << std::endl; + */ + std::cout << "==DIR_STATS zmq-processing: " << cmd + << " latency: mean = " << mean + << ", min = " << *min + << ", max = " << *max + << ", std-deviation = " << std_dev + << std::endl; + + double tsum = 0; + for (double ts : total_elapsed_times) tsum += ts; + double tmean = tsum / total_elapsed_times.size(); + double tsum_sq_diff = 0; + for (double ts : total_elapsed_times) tsum_sq_diff += (ts - tmean) * (ts - tmean); + double tstd_dev = sqrt(tsum_sq_diff / total_elapsed_times.size()); + auto tmin = std::min_element(total_elapsed_times.begin(), total_elapsed_times.end()); + auto tmax = std::max_element(total_elapsed_times.begin(), total_elapsed_times.end()); + /* + std::cout << "TOTAL_VALS: "; + for(auto d: total_elapsed_times) + std::cout << d << " ;"; + std::cout << std::endl; + */ + total_elapsed_times.clear(); + + std::cout << "==DIR_STATS zmq-total: " << cmd + << " latency: mean = " << tmean + << ", min = " << *tmin + << ", max = " << *tmax + << ", std-deviation = " << tstd_dev + << std::endl; + } + } + /* + std::cout << cmd << "|" << args << " | "; + for(auto s: ret) + std::cout << s << " ;"; + std::cout << std::endl; + */ + + return ret; +} + + +/* Trigger CODES Event From Director */ +static void director_issue_codes_event(director_state * s, tw_lpid nw_lpid, int dir_registered_event_type, tw_stime ts, tw_lp* lp) +{ + + tw_event *e; + void* msg; + + //printf("==DIR: ts: %lf\n", ts); + e = tw_event_new(nw_lpid, ts, lp); + msg = (void*)tw_event_data(e); + + memcpy(msg, s->nw_event_ptr[dir_registered_event_type], s->nw_event_size[dir_registered_event_type]); + + //msg->msg_type = dir_registered_event_type; + tw_event_send(e); +} + +void director_register_events(director_state * s, director_message * msg, tw_lp * lp) +{ + int dir_registered_event_type = msg->msg_type; + int pdes_msg_size = msg->value; + + //printf("==DIR[%d] DIR Registering dir_event_type:%d (time: %lf)\n", + // s->director_id, dir_registered_event_type, tw_now(lp)); + + s->nw_event_size[dir_registered_event_type] = pdes_msg_size; + memcpy(s->nw_event_ptr[dir_registered_event_type], &msg->buffer, pdes_msg_size); + + //int pdes_event_type = msg->op_type; + //nw_message *buffer = &msg->buffer; + //nw_message *saved_msg = s->nw_event_ptr[dir_registered_event_type]; + //printf("==DIR s->director_id: %d | dir_registered_event_type: %d | pdes_event_type: %d (%d)\n", + // s->director_id, dir_registered_event_type, pdes_event_type, + // buffer->msg_type); +} + + + + +// initializes the director LP +void director_init(director_state* s, tw_lp* lp) +{ + // initialize the LP's and load the data + memset(s, 0, sizeof(*s)); + s->simulation_mode = SIM_MODE_PDES; + s->director_id = codes_mapping_get_lp_relative_id(lp->gid, 0, 0); + + s->training_cycle_id = 0; + s->training_record_id = 1; + s->training_data[0] = tw_now(lp); + //s->training_data_vc.push_back(tw_now(lp)); + s->next_prediction_index = -1; + for(int i = 0; i < DIR_MAX_PREDICTION; i++){ + s->predictions[i] = (tw_stime) 1000000; + } + + for(int i = 0; i < NUM_DIR_TO_NW_EVENT; i++){ + s->nw_event_ptr[i] = (void*) calloc(1, g_tw_msg_sz); + } + + // get lp_id of the nw that matches this director + int num_nw_per_mgrp; + s->nw_lpid; + num_nw_per_mgrp = codes_mapping_get_lp_count ("MODELNET_GRP", 1, "nw-lp", NULL, 0); + codes_mapping_get_lp_id("MODELNET_GRP", "nw-lp", NULL, 1, s->director_id / num_nw_per_mgrp, s->director_id % num_nw_per_mgrp, &(s->nw_lpid)); + + // Get switching criteria from configuration + // if we're switch based on iteration - read iter start and end + // if switch based on virtual time - schedule sending switch event to CODES + // (stage 2) if switch based on accuracy - schedule polling for accuracy + // (stage 2) pass training data from CODES to surrogate + // (stage 3) using workload with network surrogates + + // Update global configs + if(s->director_id == 1) + { + int rc = 1, rc1 = 1, rc2 = 1; + rc = configuration_get_value_int(&config, "DIRECTOR", "surrogate_enabled", NULL, &surrogate_enabled); + if(rc) + surrogate_enabled = 0; + if(surrogate_enabled){ + rc1 = configuration_get_value_int(&config, "DIRECTOR", "start_iter", NULL, &director_config_global.surr_iter_start); + rc2 = configuration_get_value_int(&config, "DIRECTOR", "end_iter", NULL, &director_config_global.surr_iter_end); + if(rc1 || rc2){ + director_config_global.surr_iter_start = 100000; + director_config_global.surr_iter_end = 100001; + surrogate_enabled = 0; + } + } + + rc = configuration_get_value_int(&config, "DIRECTOR", "inferencing_enabled", NULL, &inferencing_enabled); + if(rc) + inferencing_enabled = 0; + + rc = configuration_get_value_int(&config, "DIRECTOR", "training_enabled", NULL, &training_enabled); + if(rc) + training_enabled = 0; + } + //printf("\n==DIR s->director_id: %d | lp->gid: %llu | s->nw_lpid: %llu", s->director_id, LLU(lp->gid), LLU(s->nw_lpid)); + + /* + char commandstr[DIR_ZMQ_CMD_LENGTH]; + char args[DIR_ZMQ_ARG_LENGTH]; + std::vector ret_vals; + + sprintf(commandstr,"cmd-%d", s->director_id); + sprintf(args, "2;args-1;"); + + ret_vals = director_client_request(commandstr, args, ""); + //printf("UNIVERSE: %s - %s\n", commandstr, ret_vals[0]); + */ + + return; +} + +void director_prepare_iteration_dataset(director_state* s, tw_stime * training_data, int training_cycle, int training_records) +{ + //printf("==DIR[%d] Training Cycle: %d\n", s->director_id, training_cycle); + + std::string processed_training_data_str; + int i, length = 0; + double tmp_data = 0.0; + char tmp_data_str[DIR_MAX_DATA_SIZE]; + int written = 0; + + // Prepare dataset + for(i = 1; i < training_records; i++) + { + tmp_data = training_data[i] - training_data[i-1]; + written += sprintf(tmp_data_str, "%.2f;", tmp_data); + //strcat(processed_training_data_str, tmp_data_str); + + processed_training_data_str.append(std::to_string(tmp_data)); + processed_training_data_str.append(" "); + // printf(" %3d: %lf [%lf]\n", i, training_data[i], tmp_data); + } + //std::cout << "Processed Data: " << processed_training_data_str << std::endl; + + // Send dataset + std::vector ret_vals; + char commandstr[DIR_ZMQ_CMD_LENGTH]; + char args[DIR_ZMQ_ARG_LENGTH]; + + sprintf(commandstr, "send-records"); + sprintf(args, "%d;%d;%d", 3, s->director_id, training_records - 1); // num-of-args;num-record + ret_vals = director_client_request(commandstr, args, processed_training_data_str); +} + +void director_get_surrogate_prediction(director_state* s, tw_bf * bf, director_message * m, tw_lp * lp, tw_stime* delay_ts) +{ + // Check if we have sufficient predictions + if(s->next_prediction_index == -1){ // we need more + //printf("==DIR[%d] DIR Prediction -- generating set (time: %lf)\n", + // s->director_id, tw_now(lp)); + + if(inferencing_enabled){ + // Pull more predictions + std::vector ret_vals; + char commandstr[15]; + char args[100]; + + sprintf(commandstr, "do-inference"); + sprintf(args, "%d;%d;%d;", 3, s->director_id, DIR_MAX_PREDICTION); // num-of-args;num-record + std::string input_data = ("1000000 1000000 1000000 1000000"); + + ret_vals = director_client_request(commandstr, args, input_data); + /* + std::cout << "PREDICTIONS: " << commandstr + << " [0]" << ret_vals[0] + << " [1]" << ret_vals[1] + << " [2]" << ret_vals[2] + << std::endl; + */ + std::vector predictions = director_get_str_list(ret_vals[2].c_str(), ' '); + + //std::cout << "PREDICTIONS: " << predictions.size() << " | "; + int i = 0; + for(auto p: predictions){ + //std::cout << " " << std::stof(p); + s->predictions[i] = std::stof(p); + i += 1; + } + //std::cout << std::endl; + assert(i <= DIR_MAX_PREDICTION); + } + + s->next_prediction_index = 0; + } + *delay_ts = s->predictions[s->next_prediction_index]; + + s->next_prediction_index = s->next_prediction_index + 1; + + // Check if we've exhuasted the predictions + if(s->next_prediction_index == DIR_MAX_PREDICTION){ + s->next_prediction_index = -1; + } +} + + +void director_event_handler(director_state* s, tw_bf * bf, director_message * m, tw_lp * lp) +{ + + switch(m->msg_type) + { + case DIR_OP_NW: + if(s->simulation_mode == SIM_MODE_PDES) + { + tw_error(TW_LOC, "DIR sent for non-annotation operation during PDES mode."); + } else if(s->simulation_mode == SIM_MODE_ITERATION_SURROGATE) + { + //printf("==DIR[%d] Skipping NW Op type:%d (time: %lf)\n", s->director_id, m->value, tw_now(lp)); + + tw_stime delay_ts = 0.001; + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, delay_ts, lp); + } + break; + + case DIR_AN_ITER_MARK: + //fprintf(iteration_log, "DIR %d (time %lf)\n", s->director_id, tw_now(lp)); + //printf("==DIR[%d] DIR_AN_ITER_MARK m->value: %d (time: %lf)\n", s->director_id, m->value, tw_now(lp)); + if(s->simulation_mode == SIM_MODE_PDES) + { + // Manage training data + if(training_enabled && s->training_record_id < DIR_MAX_TRAINING_RECORDS) + {// There is space to store more training data + s->training_data[s->training_record_id] = tw_now(lp); + //s->training_data_vc.push_back(tw_now(lp)); + s->training_record_id = s->training_record_id + 1; + } + if(training_enabled && s->training_record_id == DIR_MAX_TRAINING_RECORDS) + {// We've filled all training data slots + //printf("==DIR[%d] Sending training dataset (time: %lf)\n", s->director_id, tw_now(lp)); + + // Prepare and send training data + director_prepare_iteration_dataset(s, s->training_data, s->training_cycle_id, DIR_MAX_TRAINING_RECORDS); + + // Increment cycle counter, reset record counter, and prime dataset + s->training_cycle_id = s->training_cycle_id + 1; + s->training_record_id = 1; + s->training_data[0] = tw_now(lp); + } + if(surrogate_enabled && m->value == director_config_global.surr_iter_start) + { + //printf("==DIR[%d] Triggering switch to SURR (time: %lf)\n", s->director_id, tw_now(lp)); + + s->simulation_mode = SIM_MODE_ITERATION_SURROGATE; + tw_stime delay_ts; + director_get_surrogate_prediction(s, bf, m, lp, &delay_ts); + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__SWITCH_TO_SURR, delay_ts, lp); + return; + } + else + { + tw_stime delay_ts = 0.001; + //printf("===[%llu] D-MARK[%llu]: Value=%d\n", s->nw_lpid, lp->gid, m->value ); + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, delay_ts, lp); + return; + } + } + else if(s->simulation_mode == SIM_MODE_ITERATION_SURROGATE) + { + if(m->value == director_config_global.surr_iter_end) + { + //printf("==DIR[%d] Triggering switch to PDES (time: %lf)\n", s->director_id, tw_now(lp)); + + s->simulation_mode = SIM_MODE_PDES; + tw_stime delay_ts = 0.001; + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__SWITCH_TO_PDES, delay_ts, lp); + + if(training_enabled){ + // Restart training data collection + //s->training_data_vc.clear(); + s->training_data[0] = tw_now(lp); + s->training_record_id = 1; + } + return; + } + else // we need to predict when the next iteration will start + { + tw_stime delay_ts; + director_get_surrogate_prediction(s, bf, m, lp, &delay_ts); + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, delay_ts, lp); + return; + } + } + else + { + tw_error(TW_LOC, "[DIR] Simulation mode unknown."); + } + + break; + + case DIR_REGISTERED_EVENT__SWITCH_TO_SURR: + case DIR_REGISTERED_EVENT__SWITCH_TO_PDES: + case DIR_REGISTERED_EVENT__MOVE_TO_NEXT: + director_register_events(s, m, lp); + break; + + default: + break; + } +} + +void director_finalize(director_state* s, tw_lp* lp) +{ + if (s->director_id == 0 && (training_enabled || inferencing_enabled)) + director_client_request("exit", "", ""); + + //printf("\n==DIR: FINALIZED"); +} + +tw_lptype dir_lp = { + (init_f) director_init, + (pre_run_f) NULL, + (event_f) director_event_handler, + (revent_f) NULL, //director_event_handler_rc, + (commit_f) NULL, //director_event_handler_commit, + (final_f) director_finalize, + (map_f) codes_mapping, + sizeof(director_state) +}; + +extern void director_lp_register_model(const char * dir_lp_name){ + int num_dir_per_mgrp = codes_mapping_get_lp_count ("MODELNET_GRP", 1, "dir-nw-lp", NULL, 0); + if(num_dir_per_mgrp > 0){ + lp_type_register(dir_lp_name, &dir_lp); // DIRECTOR addition - register type + //printf("\n==DIR: Registered\n"); + } +} + + +/* ========================================================== + END OF Director Code (To be moved to separate files) + ========================================================== +*/ diff --git a/src/surrogate/init.c b/src/surrogate/init.c index dc165eae..317f173a 100644 --- a/src/surrogate/init.c +++ b/src/surrogate/init.c @@ -155,7 +155,6 @@ bool network_surrogate_configure( network_director_configure(sc, network_director_enabled ? &switch_network_at: NULL, freeze_network_on_switch); - //surr_config.director.switch_surrogate(); if (DEBUG_DIRECTOR && g_tw_mynode == 0) { fprintf(stderr, "Simulation starting on network %s mode\n", sc->model.is_surrogate_on() ? "surrogate" : "high-fidelity"); } diff --git a/src/surrogate/ml_models/combine_packet_latency_traces_across_ranks.py b/src/surrogate/ml_models/combine_packet_latency_traces_across_ranks.py new file mode 100644 index 00000000..c141abd6 --- /dev/null +++ b/src/surrogate/ml_models/combine_packet_latency_traces_across_ranks.py @@ -0,0 +1,58 @@ +from pathlib import Path +import pandas as pd + +trace_dir = Path("packet-latency-trace") +out = Path("packet_latency_train.csv") + +cols = [ + "src_terminal", + "dest_terminal", + "packet_id", + "is_surrogate_on", + "is_predicted", + "size", + "workload_injection", + "next_packet_delay", + "start", + "end", + "latency", + "is_there_another_pckt_in_queue", +] + +files = sorted(trace_dir.glob("packets-delay-gid=*.txt")) +if not files: + raise SystemExit(f"No packet trace files found under {trace_dir}") + +dfs = [] +for f in files: + df = pd.read_csv(f, comment="#", header=None, names=cols) + df["rank_file"] = f.name + dfs.append(df) + +df = pd.concat(dfs, ignore_index=True) + +# For training the current torch-jit predictor, use only real PDES packets. +df = df[ + (df["is_surrogate_on"] == 0) & + (df["is_predicted"] == 0) & + (df["end"] >= 0) & + (df["latency"] > 0) +].copy() + +# Current torch-jit model predicts [latency, next_packet_delay]. +# Drop rows where next_packet_delay is unavailable. +df = df[df["next_packet_delay"] >= 0].copy() + +df.to_csv(out, index=False) + +print(f"Wrote {out}") +print(f"rows={len(df)}") + +print(df[[ + "src_terminal", + "dest_terminal", + "size", + "is_there_another_pckt_in_queue", + "latency", + "next_packet_delay", +]].describe()) diff --git a/src/surrogate/ml_models/train_packet_latency_torchjit.py b/src/surrogate/ml_models/train_packet_latency_torchjit.py new file mode 100644 index 00000000..bd20713f --- /dev/null +++ b/src/surrogate/ml_models/train_packet_latency_torchjit.py @@ -0,0 +1,258 @@ +from pathlib import Path +import argparse +import math +import random + +import pandas as pd +import torch +from torch import nn +from torch.utils.data import DataLoader, TensorDataset, random_split + + + +SEED = 1234 +BATCH_SIZE = 4096 +EPOCHS = 50 +LR = 1e-3 +WEIGHT_DECAY = 1e-4 + +random.seed(SEED) +torch.manual_seed(SEED) + + +class PacketLatencyModel(nn.Module): + """ + TorchScript-compatible model for CODES torch-jit.C. + + C++ passes a LongTensor shaped [1, 4]: + + src_terminal + dest_terminal + packet_size + is_there_another_pckt_in_queue + + This model returns a FloatTensor shaped [1, 2]: + + latency + next_packet_delay + """ + + def __init__(self, n_src: int, n_dst: int, max_size: float, y_mean: torch.Tensor, y_std: torch.Tensor): + super().__init__() + + emb_dim = 16 + hidden = 96 + + self.src_emb = nn.Embedding(n_src, emb_dim) + self.dst_emb = nn.Embedding(n_dst, emb_dim) + + self.register_buffer("max_size", torch.tensor(float(max_size), dtype=torch.float32)) + self.register_buffer("y_mean", y_mean.float()) + self.register_buffer("y_std", y_std.float()) + + self.net = nn.Sequential( + nn.Linear(emb_dim * 2 + 2, hidden), + nn.ReLU(), + nn.Linear(hidden, hidden), + nn.ReLU(), + nn.Linear(hidden, 2), + ) + + self.softplus = nn.Softplus() + + def forward(self, x: torch.Tensor) -> torch.Tensor: + src = x[:, 0].long() + dst = x[:, 1].long() + + # Clamp defensively so an unseen id does not crash inference. + src = torch.clamp(src, 0, self.src_emb.num_embeddings - 1) + dst = torch.clamp(dst, 0, self.dst_emb.num_embeddings - 1) + + size = x[:, 2].float().unsqueeze(1) + has_next = x[:, 3].float().unsqueeze(1) + + size_norm = size / torch.clamp(self.max_size, min=1.0) + z = torch.cat( + [ + self.src_emb(src), + self.dst_emb(dst), + size_norm, + has_next, + ], + dim=1, + ) + + # Train in standardized target space, then unscale inside the scripted model. + pred_scaled = self.net(z) + pred = pred_scaled * self.y_std + self.y_mean + + # CODES expects positive latency/delay. Softplus prevents negative predictions. + return self.softplus(pred) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Train a TorchScript packet-latency model for CODES torch-jit surrogate mode." + ) + parser.add_argument( + "--csv", + required=True, + type=Path, + help="Path to merged packet-latency training CSV.", + ) + parser.add_argument( + "--out", + required=True, + type=Path, + help="Output path for the saved TorchScript .pt model.", + ) + return parser.parse_args() + + +def main() -> None: + args = parse_args() + csv_path = args.csv + out_path = args.out + + if not csv_path.exists(): + raise SystemExit(f"Missing {csv_path}. Run the trace merge step first.") + + out_path.parent.mkdir(parents=True, exist_ok=True) + + df = pd.read_csv(csv_path) + + required = [ + "src_terminal", + "dest_terminal", + "size", + "is_there_another_pckt_in_queue", + "latency", + "next_packet_delay", + ] + missing = [c for c in required if c not in df.columns] + if missing: + raise SystemExit(f"CSV is missing columns: {missing}") + + df = df.dropna(subset=required).copy() + df = df[ + (df["latency"] > 0) + & (df["next_packet_delay"] >= 0) + & (df["src_terminal"] >= 0) + & (df["dest_terminal"] >= 0) + & (df["size"] > 0) + ].copy() + + if len(df) < 100: + raise SystemExit(f"Too few usable rows after filtering: {len(df)}") + + feature_cols = [ + "src_terminal", + "dest_terminal", + "size", + "is_there_another_pckt_in_queue", + ] + target_cols = [ + "latency", + "next_packet_delay", + ] + + X = torch.tensor(df[feature_cols].to_numpy(), dtype=torch.long) + y = torch.tensor(df[target_cols].to_numpy(), dtype=torch.float32) + + n_src = int(df["src_terminal"].max()) + 1 + n_dst = int(df["dest_terminal"].max()) + 1 + max_size = float(df["size"].max()) + + y_mean = y.mean(dim=0) + y_std = y.std(dim=0).clamp_min(1e-6) + y_scaled = (y - y_mean) / y_std + + dataset = TensorDataset(X, y_scaled) + + n_val = max(1, int(0.1 * len(dataset))) + n_train = len(dataset) - n_val + + generator = torch.Generator().manual_seed(SEED) + train_ds, val_ds = random_split(dataset, [n_train, n_val], generator=generator) + + train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True) + val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False) + + model = PacketLatencyModel( + n_src=n_src, + n_dst=n_dst, + max_size=max_size, + y_mean=y_mean, + y_std=y_std, + ) + + optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY) + loss_fn = nn.SmoothL1Loss() + + best_val = math.inf + best_state = None + + for epoch in range(EPOCHS): + model.train() + train_loss = 0.0 + + for xb, yb_scaled in train_loader: + pred = model(xb) + y_true = yb_scaled * y_std + y_mean + + loss = loss_fn(pred, y_true) + + optimizer.zero_grad(set_to_none=True) + loss.backward() + optimizer.step() + + train_loss += loss.item() * xb.size(0) + + train_loss /= n_train + + model.eval() + val_loss = 0.0 + with torch.no_grad(): + for xb, yb_scaled in val_loader: + pred = model(xb) + y_true = yb_scaled * y_std + y_mean + loss = loss_fn(pred, y_true) + val_loss += loss.item() * xb.size(0) + + val_loss /= n_val + + if val_loss < best_val: + best_val = val_loss + best_state = {k: v.detach().clone() for k, v in model.state_dict().items()} + + print(f"epoch={epoch:03d} train_loss={train_loss:.6f} val_loss={val_loss:.6f}") + + if best_state is not None: + model.load_state_dict(best_state) + + model.eval() + + # Exact shape/dtype sanity check for src/surrogate/packet-latency-predictor/torch-jit.C + dummy = torch.zeros((1, 4), dtype=torch.long) + with torch.no_grad(): + out = model(dummy) + + if tuple(out.shape) != (1, 2): + raise RuntimeError(f"Bad output shape: expected (1, 2), got {tuple(out.shape)}") + + if out.dtype != torch.float32: + raise RuntimeError(f"Bad output dtype: expected float32, got {out.dtype}") + + scripted = torch.jit.script(model) + scripted.save(str(out_path)) + + print() + print(f"Saved TorchScript model to: {out_path}") + print(f"rows used: {len(df)}") + print(f"n_src={n_src}, n_dst={n_dst}, max_size={max_size}") + print(f"best_val_loss={best_val:.6f}") + print(f"dummy_output={out.tolist()}") + + +if __name__ == "__main__": + main() diff --git a/src/surrogate/network-surrogate.c b/src/surrogate/network-surrogate.c index c2278583..ea119196 100644 --- a/src/surrogate/network-surrogate.c +++ b/src/surrogate/network-surrogate.c @@ -21,7 +21,6 @@ static double frozen_events_switch_time = 0.0; // Time when we switched to surr static struct lp_types_switch const * get_type_switch(char const * const name) { for (size_t i = 0; i < net_surr_config.n_lp_types; i++) { - //printf("THIS %s and %s\n", surr_config.lp_types[i].lpname, name); if (strcmp(net_surr_config.lp_types[i].lpname, name) == 0) { return &net_surr_config.lp_types[i]; } diff --git a/src/surrogate/packet-latency-predictor/torch-jit.C b/src/surrogate/packet-latency-predictor/torch-jit.C index e2c1384c..d69a649d 100644 --- a/src/surrogate/packet-latency-predictor/torch-jit.C +++ b/src/surrogate/packet-latency-predictor/torch-jit.C @@ -112,9 +112,9 @@ static void predict_latency_rc_dummy(struct latency_surrogate * data, tw_lp * lp struct packet_latency_predictor torch_latency_predictor = { - .init = (init_pred_f) init_pred_dummy, - .feed = (feed_pred_f) feed_pred_dummy, - .predict = (predict_pred_f) surrogate_torch_predict, - .predict_rc = (predict_pred_rc_f) predict_latency_rc_dummy, + .init = (init_pred_lat_f) init_pred_dummy, + .feed = (feed_pred_lat_f) feed_pred_dummy, + .predict = (predict_pred_lat_f) surrogate_torch_predict, + .predict_rc = (predict_pred_lat_rc_f) predict_latency_rc_dummy, .predictor_data_sz = 0 }; diff --git a/src/surrogate/zmqml/Makefile b/src/surrogate/zmqml/Makefile index 4c28ed54..b4abcfab 100644 --- a/src/surrogate/zmqml/Makefile +++ b/src/surrogate/zmqml/Makefile @@ -7,7 +7,7 @@ TARGETS=libzmqmlrequester.so demozmqmlrequester all: $(TARGETS) libzmqmlrequester.so: zmqmlrequester.o - $(CXX) -shared -o $@ $^ + $(CXX) -shared -o $@ $^ $(LDFLAGS) zmqmlrequester.o: zmqmlrequester.cpp zmqmlrequester.h $(CXX) $(CXXFLAGS) -fPIC -c $< -o $@ diff --git a/src/surrogate/zmqml/zmqmlrequester.cpp b/src/surrogate/zmqml/zmqmlrequester.cpp index 6f43758c..a8ebbd53 100644 --- a/src/surrogate/zmqml/zmqmlrequester.cpp +++ b/src/surrogate/zmqml/zmqmlrequester.cpp @@ -20,6 +20,7 @@ using namespace rapidjson; static string endpoint = "tcp://localhost:5555"; static int debug = 0; + /** * See zmqmlrequester.h */ @@ -82,6 +83,10 @@ vector zmqml_request(const string& cmd, if (response.HasMember("id")) { ret.push_back(response["id"].GetString()); } + + if (response.HasMember("predictions")) { + ret.push_back(response["predictions"].GetString()); + } } else { ret.push_back("failed"); } diff --git a/src/surrogate/zmqml/zmqmlserver.py b/src/surrogate/zmqml/zmqmlserver.py index 066b0512..4e8105ea 100755 --- a/src/surrogate/zmqml/zmqmlserver.py +++ b/src/surrogate/zmqml/zmqmlserver.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # # zmqmlserver : ZeroMQ-based ML task dispatching server @@ -13,6 +13,7 @@ import time from itertools import count # generate unit id # from dataclasses import dataclass +import numpy as np # TODO: abstract a mechanism to call training from runmlpacketdelay import run_mlpacketdelay_training @@ -24,13 +25,15 @@ endpoint = "tcp://*:5555" debug = False - +debug2 = False # # # launch_id = count(start=1) # unique for launched thread launched_threads = {} # id:obj. keep track of active threads. remove the thread once it finished +training_records = {} # client_id:[] + class LaunchCMD: def __init__(self): # thread event @@ -153,6 +156,61 @@ def receivedata(args, bindata): elapsed_time = time.time() - st return (status, elapsed_time) +# +# receive training records +# +def receiverecords(args, bindata): + status = "failed" + st = time.time() + + num_args = int(args[0]) # 1st arg is num of args + client = int(args[1]) # 2nd arg is client id + num_records = int(args[2]) # 3rd arg is num records + records_str = str(bindata.decode('utf-8')) + records_str = records_str.strip() + records = list(records_str.split(" ")) + + if client not in training_records: + training_records[client] = [] + + training_records[client].extend([float(s) for s in records]) + + if (debug2) and (client == 51): + print(f"Training records[51] :{training_records[client]}") + + status = "done" + elapsed_time = time.time() - st + return (status, elapsed_time) + + +# +# do inference to get predictions +# +def launch_surrogate_inferencing(args, bindata): + status = "failed" + st = time.time() + + num_args = int(args[0]) # 1st arg is num of args + client = int(args[1]) # 2nd arg is client id + num_steps = int(args[2]) # 3rd arg is num steps to predict + records_str = str(bindata.decode('utf-8')) + records_str = records_str.strip() + records = list(records_str.split(" ")) + + input_records = [float(s) for s in records] + + inferences = [] + for i in range(num_steps): + inferences.append(2000000.0) + + inferences_str = ' '.join([str(f) for f in inferences]) + + status = "done" + elapsed_time = time.time() - st + return (status, elapsed_time, inferences_str) + + +# # # main listener loop @@ -192,6 +250,12 @@ def zmq_cmd_listener(): destfn = args[0] (status, et) = receivedata(args, bindata) retmsg = {"status":status, "et":str(et)} + elif cmd == "send-records": + (status, et) = receiverecords(args, bindata) + retmsg = {"status":status, "et":str(et)} + elif cmd == "do-inference": + (status, et, predictions) = launch_surrogate_inferencing(args, bindata) + retmsg = {"status":status, "et":str(et), "predictions": predictions} # send response back to the requester socket.send_json(retmsg)