From 976bb79b14a3af74ee7bc8e24e19c669af9d249e Mon Sep 17 00:00:00 2001 From: "Kevin A. Brown" Date: Mon, 17 Jun 2024 00:13:12 -0500 Subject: [PATCH 1/9] MPI Replay: remove print_surrogate_stats() to compile cleanly --- src/network-workloads/model-net-mpi-replay.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/network-workloads/model-net-mpi-replay.c b/src/network-workloads/model-net-mpi-replay.c index 1433b2a3..642b0bcc 100644 --- a/src/network-workloads/model-net-mpi-replay.c +++ b/src/network-workloads/model-net-mpi-replay.c @@ -3756,7 +3756,7 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv ) if(alloc_spec) codes_jobmap_destroy(jobmap_ctx); - print_surrogate_stats(); + //print_surrogate_stats(); #ifdef USE_RDAMARIS } // end if(g_st_ross_rank) From 7f42f4aa5f79a872004b8c11a5bbda998a9ac6c3 Mon Sep 17 00:00:00 2001 From: "Kevin A. Brown" Date: Thu, 4 Jul 2024 23:17:00 -0500 Subject: [PATCH 2/9] director-b: started adding director LP for mpi-replay --- src/network-workloads/model-net-mpi-replay.c | 426 ++++++++++++++++++- 1 file changed, 411 insertions(+), 15 deletions(-) diff --git a/src/network-workloads/model-net-mpi-replay.c b/src/network-workloads/model-net-mpi-replay.c index 642b0bcc..eb1dc685 100644 --- a/src/network-workloads/model-net-mpi-replay.c +++ b/src/network-workloads/model-net-mpi-replay.c @@ -19,6 +19,288 @@ #include "codes/codes-jobmap.h" #include "codes/congestion-controller-core.h" + + +/* ========================================================== + START OF Director Code (To be moved to separate files) + ========================================================== +*/ + +struct +{ + int surr_iter_start; + int surr_iter_end; +} director_config_global; + +//struct director_config_struct dir_config_global; + + +#define DIR_MAX_PREDICTION 10 +#define NUM_DIR_TO_NW_EVENT 20 + +enum SIMULATION_MODE +{ + SIM_MODE_PDES=1, + SIM_MODE_ITERATION_SURROGATE, +}; + +typedef struct director_state director_state; +typedef struct director_message director_message; +typedef struct director_annotation director_annotation; + +enum DIR_EVENTS +{ + DIR_AN_ITER_MARK=1, + DIR_OP_NW, + DIR_REGISTERED_EVENT__SWITCH_TO_SURR, + DIR_REGISTERED_EVENT__SWITCH_TO_PDES, + DIR_REGISTERED_EVENT__MOVE_TO_NEXT, +}; + +enum DIR_OPERATIONS //currently unused +{ + DIR_AN_WK_START=1, + DIR_AN_WK_ITERATION_END, + DIR_AN_WK_END, + DIR_OP_SEND, + DIR_OP_RECV, +}; + +// state of the director LP +struct director_state +{ + tw_lpid director_id; + int simulation_mode; + + tw_stime predictions[DIR_MAX_PREDICTION]; + + void *nw_event_ptr[NUM_DIR_TO_NW_EVENT]; + int nw_event_size[NUM_DIR_TO_NW_EVENT]; + + tw_lpid nw_lpid; +}; + +// director event message struct +struct director_message +{ + int msg_type; + int op_type; + int num_rngs; + int value; + model_net_event_return event_rc; + struct codes_workload_op * mpi_op; + + void *buffer; // this pointer MUST be at the end of the structure +}; + +// director annotation struct +struct director_annotation +{ + int an_type; + int an_value; +}; + +/* Trigger Director Event from within network model*/ +static void codes_issue_director_event(tw_lp* lp, tw_lpid director_lpid, int dir_event_type, int value) +{ + + tw_event *e; + struct director_message* msg; + + tw_stime ts; + + ts = .0001; // Todo: maybe this should be dynamically adjustable + + e = tw_event_new(director_lpid, ts, lp ); + msg = (director_message*)tw_event_data(e); + + msg->msg_type = dir_event_type; + msg->value = value; + tw_event_send(e); +} + +/* Trigger CODES Event From Director */ +static void director_issue_codes_event(director_state * s, tw_lpid nw_lpid, int dir_registered_event_type, tw_stime ts, tw_lp* lp) +{ + + tw_event *e; + void* msg; + + //printf("==DIR: ts: %lf\n", ts); + e = tw_event_new(nw_lpid, ts, lp); + msg = (void*)tw_event_data(e); + + memcpy(msg, s->nw_event_ptr[dir_registered_event_type], s->nw_event_size[dir_registered_event_type]); + + //msg->msg_type = dir_registered_event_type; + tw_event_send(e); +} + +void director_register_events(director_state * s, director_message * msg, tw_lp * lp) +{ + int dir_registered_event_type = msg->msg_type; + int pdes_msg_size = msg->value; + + //printf("==DIR[%d] DIR Registering dir_event_type:%d (time: %lf)\n", + // s->director_id, dir_registered_event_type, tw_now(lp)); + + s->nw_event_size[dir_registered_event_type] = pdes_msg_size; + memcpy(s->nw_event_ptr[dir_registered_event_type], &msg->buffer, pdes_msg_size); + + //int pdes_event_type = msg->op_type; + //nw_message *buffer = &msg->buffer; + //nw_message *saved_msg = s->nw_event_ptr[dir_registered_event_type]; + //printf("==DIR s->director_id: %d | dir_registered_event_type: %d | pdes_event_type: %d (%d)\n", + // s->director_id, dir_registered_event_type, pdes_event_type, + // buffer->msg_type); +} + + + + +// initializes the director LP +void dir_test_init(director_state* s, tw_lp* lp) +{ + // initialize the LP's and load the data + memset(s, 0, sizeof(*s)); + s->simulation_mode = SIM_MODE_PDES; + s->director_id = codes_mapping_get_lp_relative_id(lp->gid, 0, 0); + + for(int i = 0; i < DIR_MAX_PREDICTION; i++){ + s->predictions[i] = (tw_stime) 1000000; + } + + for(int i = 0; i < NUM_DIR_TO_NW_EVENT; i++){ + s->nw_event_ptr[i] = (void*) calloc(1, g_tw_msg_sz); + } + + // get lp_id of the nw that matches this director + int num_nw_per_mgrp; + s->nw_lpid; + num_nw_per_mgrp = codes_mapping_get_lp_count ("MODELNET_GRP", 1, "nw-lp", NULL, 0); + codes_mapping_get_lp_id("MODELNET_GRP", "nw-lp", NULL, 1, s->director_id / num_nw_per_mgrp, s->director_id % num_nw_per_mgrp, &(s->nw_lpid)); + + // Get switching criteria from configuration + // if we're switch based on iteration - read iter start and end + // if switch based on virtual time - schedule sending switch event to CODES + // (stage 2) if switch based on accuracy - schedule polling for accuracy + // (stage 2) pass training data from CODES to surrogate + // (stage 3) using workload with network surrogates + + // Update global configs + if(s->director_id == 1) + { + director_config_global.surr_iter_start = 1; + director_config_global.surr_iter_end = 2; + } + //printf("\n==DIR s->director_id: %d | lp->gid: %llu | s->nw_lpid: %llu", s->director_id, LLU(lp->gid), LLU(s->nw_lpid)); + return; +} + +void dir_test_event_handler(director_state* s, tw_bf * bf, director_message * m, tw_lp * lp) +{ + + switch(m->msg_type) + { + case DIR_OP_NW: + if(s->simulation_mode == SIM_MODE_PDES) + { + tw_error(TW_LOC, "DIR sent for non-annotation operation during PDES mode."); + } else if(s->simulation_mode == SIM_MODE_ITERATION_SURROGATE) + { + //printf("==DIR[%d] Skipping NW Op type:%d (time: %lf)\n", s->director_id, m->value, tw_now(lp)); + + tw_stime delay_ts = 0.001; + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, delay_ts, lp); + } + break; + + case DIR_AN_ITER_MARK: + //printf("==DIR[%d] Non-blocking call (time %lf)\n", s->director_id, tw_now(lp)); + //printf("==DIR[%d] s->predictions[%d]: %lf\n", s->director_id, m->value, s->predictions[m->value]); + //fprintf(iteration_log, "DIR %d (time %lf)\n", s->director_id, tw_now(lp)); + //printf("==DIR[%d] DIR_AN_ITER_MARK m->value: %d (time: %lf)\n", s->director_id, m->value, tw_now(lp)); + if(s->simulation_mode == SIM_MODE_PDES) + { + if(m->value == director_config_global.surr_iter_start) + { + //printf("==DIR[%d] Triggering switch to SURR (time: %lf)\n", s->director_id, tw_now(lp)); + + s->simulation_mode = SIM_MODE_ITERATION_SURROGATE; + tw_stime delay_ts = s->predictions[m->value]; + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__SWITCH_TO_SURR, delay_ts, lp); + return; + } + else + { + tw_stime delay_ts = 0.001; + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, delay_ts, lp); + return; + } + } + else if(s->simulation_mode == SIM_MODE_ITERATION_SURROGATE) + { + if(m->value == director_config_global.surr_iter_end) + { + //printf("==DIR[%d] Triggering switch to PDES (time: %lf)\n", s->director_id, tw_now(lp)); + + s->simulation_mode = SIM_MODE_PDES; + tw_stime delay_ts = 0.001; + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__SWITCH_TO_PDES, delay_ts, lp); + return; + } + else // we need to predict when the next iteration will start + { + tw_stime delay_ts = s->predictions[m->value]; + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, delay_ts, lp); + return; + } + } + else + { + tw_error(TW_LOC, "[DIR] Simulation mode unknown."); + } + + break; + + case DIR_REGISTERED_EVENT__SWITCH_TO_SURR: + case DIR_REGISTERED_EVENT__SWITCH_TO_PDES: + case DIR_REGISTERED_EVENT__MOVE_TO_NEXT: + director_register_events(s, m, lp); + break; + + default: + break; + } +} + +void dir_test_finalize(director_state* s, tw_lp* lp) +{ + //printf("\n==DIR: FINALIZED"); +} + +tw_lptype dir_lp = { + (init_f) dir_test_init, + (pre_run_f) NULL, + (event_f) dir_test_event_handler, + (revent_f) NULL, //dir_test_event_handler_rc, + (commit_f) NULL, //dir_test_event_handler_commit, + (final_f) dir_test_finalize, + (map_f) codes_mapping, + sizeof(director_state) +}; + + + + +/* ========================================================== + END OF Director Code (To be moved to separate files) + ========================================================== +*/ + + + + /* turning on track lp will generate a lot of output messages */ #define DBG_COMM 1 #define MN_LP_NM "modelnet_dragonfly_custom" @@ -190,6 +472,8 @@ enum MPI_NW_EVENTS CLI_OTHER_FINISH, //received when another workload has finished // Surrogate events SURR_SKIP_ITERATION, // skips one (several) iteration(s) of simulation + CODES_CMD_SWITCH_TO_SURR, // transition simulation from PDES -> Surrogate + CODES_CMD_SWITCH_TO_PDES, // transition simulation from Surrogate -> PDES }; /* type of synthetic traffic */ @@ -366,6 +650,11 @@ struct nw_state char output_buf[512]; char col_stats[64]; struct ross_model_sample ross_sample; + + /* For hybrid simulations with DIRECTOR and surrogate */ + int director_enabled; + tw_lpid director_lpid; + int simulation_mode; }; /* data for handling reverse computation. @@ -418,6 +707,54 @@ struct nw_message } rc; }; + +/* ========================================================== + START of model-net replay DIRECTOR interface + ========================================================== +*/ + + +void codes_register_director_events(nw_state* s, int dir_event_type, int nw_event_type, size_t event_value, tw_lp* lp) +{ + tw_event *e; + director_message *msg; + + nw_message registered_nw_msg; + registered_nw_msg.msg_type = nw_event_type; + registered_nw_msg.op_type = event_value; + registered_nw_msg.fwd.app_id = s->nw_id; + + tw_stime ts = 0.001; + + //printf("==DIR[%d] NW Registering dir_event_type:%d | nw_event_type:%d (time: %lf)\n", + // s->nw_id, dir_event_type, nw_event_type, tw_now(lp)); + + //printf("==DIR: ts: %lf\n", ts); + if (sizeof(nw_message) + sizeof(director_message) > g_tw_msg_sz){ + tw_error(TW_LOC, "Error: NW-director trying to transmit an event of size " + "%d but ROSS is configured for events of size %zd\n", + sizeof(nw_message) + sizeof(director_message), g_tw_msg_sz); + } + + e = tw_event_new(s->director_lpid, ts, lp); + msg = (director_message*)tw_event_data(e); + msg->msg_type = dir_event_type; + msg->op_type = nw_event_type; + msg->value = sizeof(nw_message); + + memcpy(&msg->buffer, ®istered_nw_msg, msg->value); + + tw_event_send(e); +} + +/* ========================================================== + END of model-net replay DIRECTOR interface + ========================================================== +*/ + + + + static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op, int matched_req); static void send_ack_back_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp); @@ -2406,6 +2743,23 @@ void nw_test_init(nw_state* s, tw_lp* lp) // This had been -1 but if qos is not configured (single job no workload conf file) // then this will error out + /* START of DIRECTOR setup + */ + s->director_enabled = 0; + s->simulation_mode = SIM_MODE_PDES; + int num_dir_per_mgrp = codes_mapping_get_lp_count ("MODELNET_GRP", 1, "dir-lp", NULL, 0); + if(num_dir_per_mgrp > 0){ + s->director_enabled = 1; + codes_mapping_get_lp_id("MODELNET_GRP", "dir-lp", NULL, 1, s->nw_id / num_dir_per_mgrp, s->nw_id % num_dir_per_mgrp, &s->director_lpid); + //printf("\n==DIRNW s->nw_id: %d | lp->gid: %llu | s>director_lpid: %llu", s->nw_id, LLU(lp->gid), LLU(s->director_lpid)); + } + // register callbacks with director + codes_register_director_events(s, DIR_REGISTERED_EVENT__SWITCH_TO_SURR, CODES_CMD_SWITCH_TO_SURR, sizeof(nw_message), lp); + codes_register_director_events(s, DIR_REGISTERED_EVENT__SWITCH_TO_PDES, CODES_CMD_SWITCH_TO_PDES, sizeof(nw_message), lp); + codes_register_director_events(s, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, MPI_OP_GET_NEXT, sizeof(nw_message), lp); + /* + * END of DIRECTOR setup */ + char type_name[512]; if(!num_net_traces) @@ -2758,6 +3112,33 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) case SURR_SKIP_ITERATION: skip_iteration(s, lp, bf, m); + break; + + /* START of Sim. transition events sent by DIRECTOR + */ + case CODES_CMD_SWITCH_TO_SURR: + if(s->simulation_mode == SIM_MODE_PDES){ + //printf("==DIR[%d] NW switch to SURR (time: %lf)\n", s->nw_id, tw_now(lp)); + + s->simulation_mode = SIM_MODE_ITERATION_SURROGATE; + get_next_mpi_operation(s, bf, m, lp); + }else{ + + } + break; + + case CODES_CMD_SWITCH_TO_PDES: + if(s->simulation_mode == SIM_MODE_ITERATION_SURROGATE){ + //printf("==DIR[%d] NW switch to PDES (time: %lf)\n", s->nw_id, tw_now(lp)); + + s->simulation_mode = SIM_MODE_PDES; + get_next_mpi_operation(s, bf, m, lp); + }else{ + + } + break; + /* + * END of Sim. transition event sent by DIRECTOR */ } } @@ -2899,6 +3280,34 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l // printf("Client rank %llu completed workload, local rank %d .\n", s->nw_id, s->local_rank); return; + } + if(mpi_op->op_type == CODES_WK_MARK) // If annotation type + {// TODO: extend this section to include checks for all annotations + m->rc.saved_marker_time = tw_now(lp); + + // If we have reached the surrogate switch time, skip next iteration(s) + if (have_we_hit_surrogate_switch(mpi_op)) { + tw_event *e = tw_event_new(lp->gid, 2076575.16 * 91, lp); + nw_message* msg = (nw_message*) tw_event_data(e); + msg->msg_type = SURR_SKIP_ITERATION; + tw_event_send(e); + } + + if(s->director_enabled == 1) + { + //printf("===DIR: Value=%d\n", mpi_op->u.send.tag); + codes_issue_director_event(lp, s->director_lpid, DIR_AN_ITER_MARK, mpi_op->u.send.tag); + //codes_issue_next_event(lp); + + return; + } + } else if(s->simulation_mode == SIM_MODE_ITERATION_SURROGATE) // Else non-annotation type + { + if(s->director_enabled == 1) + { + codes_issue_director_event(lp, s->director_lpid, DIR_OP_NW, mpi_op->op_type); + return; + } } switch(mpi_op->op_type) { @@ -2989,21 +3398,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l } break; - case CODES_WK_MARK: - { - m->rc.saved_marker_time = tw_now(lp); - - // If we have reached the surrogate switch time, skip next iteration(s) - if (have_we_hit_surrogate_switch(mpi_op)) { - tw_event *e = tw_event_new(lp->gid, 2076575.16 * 91, lp); - nw_message* msg = (nw_message*) tw_event_data(e); - msg->msg_type = SURR_SKIP_ITERATION; - tw_event_send(e); - } else { - codes_issue_next_event(lp); - } - } - break; + default: @@ -3583,6 +3978,7 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv ) if (g_st_ev_trace || g_st_model_stats || g_st_use_analysis_lps) nw_lp_register_model(); + lp_type_register("dir-lp", &dir_lp); // DIRECTOR addition - register type net_ids = model_net_configure(&num_nets); // assert(num_nets == 1); net_id = *net_ids; From 5ae2e7c0e437780e477a3bf19f9cedf37578624b Mon Sep 17 00:00:00 2001 From: "Kevin A. Brown" Date: Fri, 16 Aug 2024 18:39:43 +0000 Subject: [PATCH 3/9] director-b: complete initial director LP prototype for mpi-replay --- codes/surrogate/director-client.h | 105 ++++ src/CMakeLists.txt | 15 +- src/network-workloads/model-net-mpi-replay.c | 317 ++---------- src/surrogate/director-client.C | 505 +++++++++++++++++++ 4 files changed, 664 insertions(+), 278 deletions(-) create mode 100644 codes/surrogate/director-client.h create mode 100644 src/surrogate/director-client.C diff --git a/codes/surrogate/director-client.h b/codes/surrogate/director-client.h new file mode 100644 index 00000000..1e663e85 --- /dev/null +++ b/codes/surrogate/director-client.h @@ -0,0 +1,105 @@ +#ifndef __DIRECTOR_CLIENT_H_DEFINED__ +#define __DIRECTOR_CLIENT_H_DEFINED__ + +#include +#include "codes/codes_mapping.h" + + + +#define NUM_DIR_TO_NW_EVENT 20 + + +enum SIMULATION_MODE +{ + SIM_MODE_PDES=1, + SIM_MODE_ITERATION_SURROGATE, +}; + + +typedef struct director_message director_message; +typedef struct director_annotation director_annotation; + +enum DIR_EVENTS +{ + DIR_AN_ITER_MARK=1, + DIR_OP_NW, + DIR_REGISTERED_EVENT__SWITCH_TO_SURR, + DIR_REGISTERED_EVENT__SWITCH_TO_PDES, + DIR_REGISTERED_EVENT__MOVE_TO_NEXT, +}; + +enum DIR_OPERATIONS //currently unused +{ + DIR_AN_WK_START=1, + DIR_AN_WK_ITERATION_END, + DIR_AN_WK_END, + DIR_OP_SEND, + DIR_OP_RECV, +}; + + +// director event message struct +struct director_message +{ + int msg_type; + int op_type; + int num_rngs; + int value; + //model_net_event_return event_rc; + //struct codes_workload_op * mpi_op; + + void *buffer; // this pointer MUST be at the end of the structure +}; + +// director annotation struct +struct director_annotation +{ + int an_type; + int an_value; +}; + + +#ifdef __cplusplus +extern "C" +{ +#endif + + +/** + * @brief Prepares a request to send to client with the specified command and arguments, + * receives a reply + + * @param cmd zmqml request command: 'query', 'launch', execute', send', 'nothing', 'exit' + * @param args the arguments for launch and execute + * @param bindata binary data from send + * @param surrdata containing the 'status' field and optionally 'et' and 'id'. + * 'status' is not present, returns a vector with "failed". + * Fromat is ":;:;..." + * + */ + +//extern char* dir_client_request(const char* cmd, +// const char* args, +// const char* data); + + +extern void director_lp_register_model(const char *); + + +/* +extern void director_parse_args(char *args, int **args_array, int *length); +static void director_issue_codes_event(director_state * s, tw_lpid nw_lpid, int dir_registered_event_type, tw_stime ts, tw_lp* lp); +extern void director_register_events(director_state * s, director_message * msg, tw_lp * lp); +extern void dir_test_init(director_state* s, tw_lp* lp); +extern void director_prepare_iteration_dataset(director_state* s, tw_stime * training_data, int training_cycle, int training_records); +extern void director_get_surrogate_prediction(director_state* s, tw_bf * bf, director_message * m, tw_lp * lp, tw_stime* delay_ts); +extern void dir_test_event_handler(director_state* s, tw_bf * bf, director_message * m, tw_lp * lp); +extern void dir_test_finalize(director_state* s, tw_lp* lp); +*/ + + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d82c2584..cd38259e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -59,6 +59,7 @@ list(APPEND SRCS surrogate/switch.c surrogate/packet-latency-predictor/common.c surrogate/packet-latency-predictor/average.c + surrogate/director-client.C iokernellang/codesparser.h iokernellang/codesparser.c @@ -155,6 +156,12 @@ if(USE_ONLINE) endif() endif() +# ZMQML +add_library(zmqmlrequester SHARED IMPORTED ) +set_target_properties(zmqmlrequester PROPERTIES + IMPORTED_LOCATION "${ZMQML_BUILD_PATH}/libzmqmlrequester.so" + INTERFACE_INCLUDE_DIRECTORIES "${ZMQML_BUILD_PATH}") + #LINK ROSS # target_link_libraries(codes PUBLIC #{pkgcfg_lib_ROSS_ROSS}) # target_link_libraries(codes PUBLIC PkgConfig::ROSS) @@ -166,6 +173,7 @@ target_include_directories(codes PUBLIC ${PROJECT_SOURCE_DIR}/codes ${PROJECT_SOURCE_DIR}/src ${PROJECT_SOURCE_DIR}/src/modelconfig + $ ) target_link_libraries(codes PUBLIC ${LIBS_TO_LINK}) @@ -198,9 +206,14 @@ if(USE_DUMPI) list(APPEND CODES_TARGETS model-net-dumpi-traces-dump) endif() +# ZMQ +pkg_check_modules(PC_ZeroMQ QUIET zmq) +find_path(ZeroMQ_INCLUDE_DIR NAMES zmq.hpp PATHS ${PC_ZeroMQ_INCLUDE_DIRS}) +find_library(ZeroMQ_LIBRARY NAMES zmq PATHS ${PC_ZeroMQ_LIBRARY_DIRS}) + foreach(tar IN LISTS CODES_TARGETS) target_include_directories(${tar} PUBLIC ${CODES_INCLUDE_DIRS} ${ROSS_INCLUDE_DIRS}) - target_link_libraries(${tar} PUBLIC codes ${LIBS_TO_LINK}) + target_link_libraries(${tar} PUBLIC codes ${LIBS_TO_LINK} zmqmlrequester ${ZeroMQ_LIBRARY}) endforeach() diff --git a/src/network-workloads/model-net-mpi-replay.c b/src/network-workloads/model-net-mpi-replay.c index eb1dc685..b2acc964 100644 --- a/src/network-workloads/model-net-mpi-replay.c +++ b/src/network-workloads/model-net-mpi-replay.c @@ -20,284 +20,17 @@ #include "codes/congestion-controller-core.h" +#include "codes/surrogate/director-client.h" /* ========================================================== START OF Director Code (To be moved to separate files) ========================================================== */ -struct -{ - int surr_iter_start; - int surr_iter_end; -} director_config_global; //struct director_config_struct dir_config_global; -#define DIR_MAX_PREDICTION 10 -#define NUM_DIR_TO_NW_EVENT 20 - -enum SIMULATION_MODE -{ - SIM_MODE_PDES=1, - SIM_MODE_ITERATION_SURROGATE, -}; - -typedef struct director_state director_state; -typedef struct director_message director_message; -typedef struct director_annotation director_annotation; - -enum DIR_EVENTS -{ - DIR_AN_ITER_MARK=1, - DIR_OP_NW, - DIR_REGISTERED_EVENT__SWITCH_TO_SURR, - DIR_REGISTERED_EVENT__SWITCH_TO_PDES, - DIR_REGISTERED_EVENT__MOVE_TO_NEXT, -}; - -enum DIR_OPERATIONS //currently unused -{ - DIR_AN_WK_START=1, - DIR_AN_WK_ITERATION_END, - DIR_AN_WK_END, - DIR_OP_SEND, - DIR_OP_RECV, -}; - -// state of the director LP -struct director_state -{ - tw_lpid director_id; - int simulation_mode; - - tw_stime predictions[DIR_MAX_PREDICTION]; - - void *nw_event_ptr[NUM_DIR_TO_NW_EVENT]; - int nw_event_size[NUM_DIR_TO_NW_EVENT]; - - tw_lpid nw_lpid; -}; - -// director event message struct -struct director_message -{ - int msg_type; - int op_type; - int num_rngs; - int value; - model_net_event_return event_rc; - struct codes_workload_op * mpi_op; - - void *buffer; // this pointer MUST be at the end of the structure -}; - -// director annotation struct -struct director_annotation -{ - int an_type; - int an_value; -}; - -/* Trigger Director Event from within network model*/ -static void codes_issue_director_event(tw_lp* lp, tw_lpid director_lpid, int dir_event_type, int value) -{ - - tw_event *e; - struct director_message* msg; - - tw_stime ts; - - ts = .0001; // Todo: maybe this should be dynamically adjustable - - e = tw_event_new(director_lpid, ts, lp ); - msg = (director_message*)tw_event_data(e); - - msg->msg_type = dir_event_type; - msg->value = value; - tw_event_send(e); -} - -/* Trigger CODES Event From Director */ -static void director_issue_codes_event(director_state * s, tw_lpid nw_lpid, int dir_registered_event_type, tw_stime ts, tw_lp* lp) -{ - - tw_event *e; - void* msg; - - //printf("==DIR: ts: %lf\n", ts); - e = tw_event_new(nw_lpid, ts, lp); - msg = (void*)tw_event_data(e); - - memcpy(msg, s->nw_event_ptr[dir_registered_event_type], s->nw_event_size[dir_registered_event_type]); - - //msg->msg_type = dir_registered_event_type; - tw_event_send(e); -} - -void director_register_events(director_state * s, director_message * msg, tw_lp * lp) -{ - int dir_registered_event_type = msg->msg_type; - int pdes_msg_size = msg->value; - - //printf("==DIR[%d] DIR Registering dir_event_type:%d (time: %lf)\n", - // s->director_id, dir_registered_event_type, tw_now(lp)); - - s->nw_event_size[dir_registered_event_type] = pdes_msg_size; - memcpy(s->nw_event_ptr[dir_registered_event_type], &msg->buffer, pdes_msg_size); - - //int pdes_event_type = msg->op_type; - //nw_message *buffer = &msg->buffer; - //nw_message *saved_msg = s->nw_event_ptr[dir_registered_event_type]; - //printf("==DIR s->director_id: %d | dir_registered_event_type: %d | pdes_event_type: %d (%d)\n", - // s->director_id, dir_registered_event_type, pdes_event_type, - // buffer->msg_type); -} - - - - -// initializes the director LP -void dir_test_init(director_state* s, tw_lp* lp) -{ - // initialize the LP's and load the data - memset(s, 0, sizeof(*s)); - s->simulation_mode = SIM_MODE_PDES; - s->director_id = codes_mapping_get_lp_relative_id(lp->gid, 0, 0); - - for(int i = 0; i < DIR_MAX_PREDICTION; i++){ - s->predictions[i] = (tw_stime) 1000000; - } - - for(int i = 0; i < NUM_DIR_TO_NW_EVENT; i++){ - s->nw_event_ptr[i] = (void*) calloc(1, g_tw_msg_sz); - } - - // get lp_id of the nw that matches this director - int num_nw_per_mgrp; - s->nw_lpid; - num_nw_per_mgrp = codes_mapping_get_lp_count ("MODELNET_GRP", 1, "nw-lp", NULL, 0); - codes_mapping_get_lp_id("MODELNET_GRP", "nw-lp", NULL, 1, s->director_id / num_nw_per_mgrp, s->director_id % num_nw_per_mgrp, &(s->nw_lpid)); - - // Get switching criteria from configuration - // if we're switch based on iteration - read iter start and end - // if switch based on virtual time - schedule sending switch event to CODES - // (stage 2) if switch based on accuracy - schedule polling for accuracy - // (stage 2) pass training data from CODES to surrogate - // (stage 3) using workload with network surrogates - - // Update global configs - if(s->director_id == 1) - { - director_config_global.surr_iter_start = 1; - director_config_global.surr_iter_end = 2; - } - //printf("\n==DIR s->director_id: %d | lp->gid: %llu | s->nw_lpid: %llu", s->director_id, LLU(lp->gid), LLU(s->nw_lpid)); - return; -} - -void dir_test_event_handler(director_state* s, tw_bf * bf, director_message * m, tw_lp * lp) -{ - - switch(m->msg_type) - { - case DIR_OP_NW: - if(s->simulation_mode == SIM_MODE_PDES) - { - tw_error(TW_LOC, "DIR sent for non-annotation operation during PDES mode."); - } else if(s->simulation_mode == SIM_MODE_ITERATION_SURROGATE) - { - //printf("==DIR[%d] Skipping NW Op type:%d (time: %lf)\n", s->director_id, m->value, tw_now(lp)); - - tw_stime delay_ts = 0.001; - director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, delay_ts, lp); - } - break; - - case DIR_AN_ITER_MARK: - //printf("==DIR[%d] Non-blocking call (time %lf)\n", s->director_id, tw_now(lp)); - //printf("==DIR[%d] s->predictions[%d]: %lf\n", s->director_id, m->value, s->predictions[m->value]); - //fprintf(iteration_log, "DIR %d (time %lf)\n", s->director_id, tw_now(lp)); - //printf("==DIR[%d] DIR_AN_ITER_MARK m->value: %d (time: %lf)\n", s->director_id, m->value, tw_now(lp)); - if(s->simulation_mode == SIM_MODE_PDES) - { - if(m->value == director_config_global.surr_iter_start) - { - //printf("==DIR[%d] Triggering switch to SURR (time: %lf)\n", s->director_id, tw_now(lp)); - - s->simulation_mode = SIM_MODE_ITERATION_SURROGATE; - tw_stime delay_ts = s->predictions[m->value]; - director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__SWITCH_TO_SURR, delay_ts, lp); - return; - } - else - { - tw_stime delay_ts = 0.001; - director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, delay_ts, lp); - return; - } - } - else if(s->simulation_mode == SIM_MODE_ITERATION_SURROGATE) - { - if(m->value == director_config_global.surr_iter_end) - { - //printf("==DIR[%d] Triggering switch to PDES (time: %lf)\n", s->director_id, tw_now(lp)); - - s->simulation_mode = SIM_MODE_PDES; - tw_stime delay_ts = 0.001; - director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__SWITCH_TO_PDES, delay_ts, lp); - return; - } - else // we need to predict when the next iteration will start - { - tw_stime delay_ts = s->predictions[m->value]; - director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, delay_ts, lp); - return; - } - } - else - { - tw_error(TW_LOC, "[DIR] Simulation mode unknown."); - } - - break; - - case DIR_REGISTERED_EVENT__SWITCH_TO_SURR: - case DIR_REGISTERED_EVENT__SWITCH_TO_PDES: - case DIR_REGISTERED_EVENT__MOVE_TO_NEXT: - director_register_events(s, m, lp); - break; - - default: - break; - } -} - -void dir_test_finalize(director_state* s, tw_lp* lp) -{ - //printf("\n==DIR: FINALIZED"); -} - -tw_lptype dir_lp = { - (init_f) dir_test_init, - (pre_run_f) NULL, - (event_f) dir_test_event_handler, - (revent_f) NULL, //dir_test_event_handler_rc, - (commit_f) NULL, //dir_test_event_handler_commit, - (final_f) dir_test_finalize, - (map_f) codes_mapping, - sizeof(director_state) -}; - - - - -/* ========================================================== - END OF Director Code (To be moved to separate files) - ========================================================== -*/ - @@ -723,6 +456,7 @@ void codes_register_director_events(nw_state* s, int dir_event_type, int nw_even registered_nw_msg.msg_type = nw_event_type; registered_nw_msg.op_type = event_value; registered_nw_msg.fwd.app_id = s->nw_id; + //registered_nw_msg.fwd.data_type = 9999; tw_stime ts = 0.001; @@ -747,6 +481,24 @@ void codes_register_director_events(nw_state* s, int dir_event_type, int nw_even tw_event_send(e); } +/* Trigger Director Event from within network model*/ +static void codes_issue_director_event(tw_lp* lp, tw_lpid director_lpid, int dir_event_type, int value) +{ + + tw_event *e; + struct director_message* msg; + + tw_stime ts; + + ts = .0001; // Todo: maybe this should be dynamically adjustable + + e = tw_event_new(director_lpid, ts, lp ); + msg = (director_message*)tw_event_data(e); + + msg->msg_type = dir_event_type; + msg->value = value; + tw_event_send(e); +} /* ========================================================== END of model-net replay DIRECTOR interface ========================================================== @@ -2747,17 +2499,22 @@ void nw_test_init(nw_state* s, tw_lp* lp) */ s->director_enabled = 0; s->simulation_mode = SIM_MODE_PDES; - int num_dir_per_mgrp = codes_mapping_get_lp_count ("MODELNET_GRP", 1, "dir-lp", NULL, 0); + int num_dir_per_mgrp = codes_mapping_get_lp_count ("MODELNET_GRP", 1, "dir-nw-lp", NULL, 0); if(num_dir_per_mgrp > 0){ s->director_enabled = 1; - codes_mapping_get_lp_id("MODELNET_GRP", "dir-lp", NULL, 1, s->nw_id / num_dir_per_mgrp, s->nw_id % num_dir_per_mgrp, &s->director_lpid); + codes_mapping_get_lp_id("MODELNET_GRP", "dir-nw-lp", NULL, 1, s->nw_id / num_dir_per_mgrp, s->nw_id % num_dir_per_mgrp, &s->director_lpid); + + // register callbacks with director + codes_register_director_events(s, DIR_REGISTERED_EVENT__SWITCH_TO_SURR, CODES_CMD_SWITCH_TO_SURR, sizeof(nw_message), lp); + codes_register_director_events(s, DIR_REGISTERED_EVENT__SWITCH_TO_PDES, CODES_CMD_SWITCH_TO_PDES, sizeof(nw_message), lp); + codes_register_director_events(s, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, MPI_OP_GET_NEXT, sizeof(nw_message), lp); //printf("\n==DIRNW s->nw_id: %d | lp->gid: %llu | s>director_lpid: %llu", s->nw_id, LLU(lp->gid), LLU(s->director_lpid)); + } - // register callbacks with director - codes_register_director_events(s, DIR_REGISTERED_EVENT__SWITCH_TO_SURR, CODES_CMD_SWITCH_TO_SURR, sizeof(nw_message), lp); - codes_register_director_events(s, DIR_REGISTERED_EVENT__SWITCH_TO_PDES, CODES_CMD_SWITCH_TO_PDES, sizeof(nw_message), lp); - codes_register_director_events(s, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, MPI_OP_GET_NEXT, sizeof(nw_message), lp); - /* + else{ + //printf("\n==NW[%d] DIRECTOR: No director LPs found (dir-nw-lp)\n", s->nw_id); + } + /* * END of DIRECTOR setup */ char type_name[512]; @@ -3082,6 +2839,8 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) break; case MPI_OP_GET_NEXT: + //if(m->fwd.data_type == 9999) + // printf("===[%llu] N-MARK[%llu]: Value=%d\n", lp->gid, 1, 9999); get_next_mpi_operation(s, bf, m, lp); break; @@ -3297,8 +3056,11 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l { //printf("===DIR: Value=%d\n", mpi_op->u.send.tag); codes_issue_director_event(lp, s->director_lpid, DIR_AN_ITER_MARK, mpi_op->u.send.tag); - //codes_issue_next_event(lp); + //printf("===[%llu] N-MARK[%llu]: Value=%d\n", lp->gid, s->director_lpid, mpi_op->u.send.tag); + return; + }else{ + codes_issue_next_event(lp); return; } } else if(s->simulation_mode == SIM_MODE_ITERATION_SURROGATE) // Else non-annotation type @@ -3978,7 +3740,8 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv ) if (g_st_ev_trace || g_st_model_stats || g_st_use_analysis_lps) nw_lp_register_model(); - lp_type_register("dir-lp", &dir_lp); // DIRECTOR addition - register type + director_lp_register_model("dir-nw-lp"); + net_ids = model_net_configure(&num_nets); // assert(num_nets == 1); net_id = *net_ids; diff --git a/src/surrogate/director-client.C b/src/surrogate/director-client.C new file mode 100644 index 00000000..c3738064 --- /dev/null +++ b/src/surrogate/director-client.C @@ -0,0 +1,505 @@ +#include +#include +#include +#include +#include + +#include +#include +#include // std::min_element +#include //std::fixed +#include // std::precision + +#include "codes/surrogate/director-client.h" +#include "codes/configuration.h" +#include "zmqmlrequester.h" + + +#define NUM_ACTIVE_CLIENTS 72 //TODO: this should be calculated at runtime + +#define DIR_ZMQ_CMD_LENGTH 15 +#define DIR_ZMQ_ARG_LENGTH 100 + +#define DIR_MAX_PREDICTION 5 +#define DIR_MAX_TRAINING_RECORDS 10 +#define DIR_MAX_DATA_SIZE 15 + +struct +{ + int surr_iter_start; + int surr_iter_end; +} director_config_global; + +typedef struct director_state director_state; + +// Some flag to relocate/clean-up +int evaluate_perf = 1; +int training_enabled = 0; //TODO: Move this to the LP state +int surrogate_enabled = 0; +int inferencing_enabled = 1; + + +std::vector total_elapsed_times; +std::vector zmq_processing_times; + +// state of the director LP +struct director_state +{ + tw_lpid director_id; + int simulation_mode; + + int training_cycle_id; + int training_record_id; + tw_stime training_data[DIR_MAX_TRAINING_RECORDS]; + //std::vector training_data_vc; + + int next_prediction_index; + tw_stime predictions[DIR_MAX_PREDICTION]; + + void *nw_event_ptr[NUM_DIR_TO_NW_EVENT]; + int nw_event_size[NUM_DIR_TO_NW_EVENT]; + + tw_lpid nw_lpid; +}; + +std::vector director_get_str_list(const char *s, const char delimiter) { + std::vector result; + std::stringstream ss (s); + std::string item; + + while (getline (ss, item, delimiter)) { result.push_back (item); } + return result; +} +std::string director_get_list_str(std::vector s, const char delimiter) { + std::ostringstream mergedstr; + std::copy(s.begin(), s.end(), std::ostream_iterator(mergedstr, " ")); + std::cout << mergedstr.str() << std::endl; + return mergedstr.str(); +} + +std::vector director_client_request( + const char* cmd, + const char* args, + const std::string data) +{ + std::vector ret; + /* + std::cout << cmd << " ARGS " << args << std::endl; + //if(strcmp(cmd, "send-records") == 0){ + std::cout << data << std::endl; + //} + */ + if(strcmp(cmd, "exit") == 0){ + ret = zmqml_request(cmd); + return ret; + } + + auto start_time = std::chrono::steady_clock::now(); // TODO - find a way to enclose this in evaluate_perf? + + std::vector args_list; + args_list = director_get_str_list(args, ';'); + ret = zmqml_request(cmd, args_list, data); + + if (evaluate_perf == 1){ + auto end_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration(end_time - start_time).count(); + + total_elapsed_times.push_back(duration); + zmq_processing_times.push_back( std::stod(ret[1]) ); + + if(zmq_processing_times.size() == NUM_ACTIVE_CLIENTS){ + double sum = 0; + for (double ts : zmq_processing_times) sum += ts; + double mean = sum / zmq_processing_times.size(); + double sum_sq_diff = 0; + for (double ts : zmq_processing_times) sum_sq_diff += (ts - mean) * (ts - mean); + double std_dev = sqrt(sum_sq_diff / zmq_processing_times.size()); + auto min = std::min_element(zmq_processing_times.begin(), zmq_processing_times.end()); + auto max = std::max_element(zmq_processing_times.begin(), zmq_processing_times.end()); + + zmq_processing_times.clear(); + + std::cout << std::setprecision(9) << std::fixed; + /* + std::cout << "ZMQ_VALS: "; + for(auto d: zmq_processing_times) + std::cout << d << " ;"; + std::cout << std::endl; + */ + std::cout << "==DIR_STATS zmq-processing: " << cmd + << " latency: mean = " << mean + << ", min = " << *min + << ", max = " << *max + << ", std-deviation = " << std_dev + << std::endl; + + double tsum = 0; + for (double ts : total_elapsed_times) tsum += ts; + double tmean = tsum / total_elapsed_times.size(); + double tsum_sq_diff = 0; + for (double ts : total_elapsed_times) tsum_sq_diff += (ts - tmean) * (ts - tmean); + double tstd_dev = sqrt(tsum_sq_diff / total_elapsed_times.size()); + auto tmin = std::min_element(total_elapsed_times.begin(), total_elapsed_times.end()); + auto tmax = std::max_element(total_elapsed_times.begin(), total_elapsed_times.end()); + /* + std::cout << "TOTAL_VALS: "; + for(auto d: total_elapsed_times) + std::cout << d << " ;"; + std::cout << std::endl; + */ + total_elapsed_times.clear(); + + std::cout << "==DIR_STATS zmq-total: " << cmd + << " latency: mean = " << tmean + << ", min = " << *tmin + << ", max = " << *tmax + << ", std-deviation = " << tstd_dev + << std::endl; + } + } + /* + std::cout << cmd << "|" << args << " | "; + for(auto s: ret) + std::cout << s << " ;"; + std::cout << std::endl; + */ + + return ret; +} + + +/* Trigger CODES Event From Director */ +static void director_issue_codes_event(director_state * s, tw_lpid nw_lpid, int dir_registered_event_type, tw_stime ts, tw_lp* lp) +{ + + tw_event *e; + void* msg; + + //printf("==DIR: ts: %lf\n", ts); + e = tw_event_new(nw_lpid, ts, lp); + msg = (void*)tw_event_data(e); + + memcpy(msg, s->nw_event_ptr[dir_registered_event_type], s->nw_event_size[dir_registered_event_type]); + + //msg->msg_type = dir_registered_event_type; + tw_event_send(e); +} + +void director_register_events(director_state * s, director_message * msg, tw_lp * lp) +{ + int dir_registered_event_type = msg->msg_type; + int pdes_msg_size = msg->value; + + //printf("==DIR[%d] DIR Registering dir_event_type:%d (time: %lf)\n", + // s->director_id, dir_registered_event_type, tw_now(lp)); + + s->nw_event_size[dir_registered_event_type] = pdes_msg_size; + memcpy(s->nw_event_ptr[dir_registered_event_type], &msg->buffer, pdes_msg_size); + + //int pdes_event_type = msg->op_type; + //nw_message *buffer = &msg->buffer; + //nw_message *saved_msg = s->nw_event_ptr[dir_registered_event_type]; + //printf("==DIR s->director_id: %d | dir_registered_event_type: %d | pdes_event_type: %d (%d)\n", + // s->director_id, dir_registered_event_type, pdes_event_type, + // buffer->msg_type); +} + + + + +// initializes the director LP +void director_init(director_state* s, tw_lp* lp) +{ + // initialize the LP's and load the data + memset(s, 0, sizeof(*s)); + s->simulation_mode = SIM_MODE_PDES; + s->director_id = codes_mapping_get_lp_relative_id(lp->gid, 0, 0); + + s->training_cycle_id = 0; + s->training_record_id = 1; + s->training_data[0] = tw_now(lp); + //s->training_data_vc.push_back(tw_now(lp)); + s->next_prediction_index = -1; + for(int i = 0; i < DIR_MAX_PREDICTION; i++){ + s->predictions[i] = (tw_stime) 1000000; + } + + for(int i = 0; i < NUM_DIR_TO_NW_EVENT; i++){ + s->nw_event_ptr[i] = (void*) calloc(1, g_tw_msg_sz); + } + + // get lp_id of the nw that matches this director + int num_nw_per_mgrp; + s->nw_lpid; + num_nw_per_mgrp = codes_mapping_get_lp_count ("MODELNET_GRP", 1, "nw-lp", NULL, 0); + codes_mapping_get_lp_id("MODELNET_GRP", "nw-lp", NULL, 1, s->director_id / num_nw_per_mgrp, s->director_id % num_nw_per_mgrp, &(s->nw_lpid)); + + // Get switching criteria from configuration + // if we're switch based on iteration - read iter start and end + // if switch based on virtual time - schedule sending switch event to CODES + // (stage 2) if switch based on accuracy - schedule polling for accuracy + // (stage 2) pass training data from CODES to surrogate + // (stage 3) using workload with network surrogates + + // Update global configs + if(s->director_id == 1) + { + int rc = 1, rc1 = 1, rc2 = 1; + rc = configuration_get_value_int(&config, "DIRECTOR", "surrogate_enabled", NULL, &surrogate_enabled); + if(rc) + surrogate_enabled = 0; + if(surrogate_enabled){ + rc1 = configuration_get_value_int(&config, "DIRECTOR", "start_iter", NULL, &director_config_global.surr_iter_start); + rc2 = configuration_get_value_int(&config, "DIRECTOR", "end_iter", NULL, &director_config_global.surr_iter_end); + if(rc1 || rc2){ + director_config_global.surr_iter_start = 100000; + director_config_global.surr_iter_end = 100001; + surrogate_enabled = 0; + } + } + + rc = configuration_get_value_int(&config, "DIRECTOR", "inferencing_enabled", NULL, &inferencing_enabled); + if(rc) + inferencing_enabled = 0; + + rc = configuration_get_value_int(&config, "DIRECTOR", "training_enabled", NULL, &training_enabled); + if(rc) + training_enabled = 0; + } + //printf("\n==DIR s->director_id: %d | lp->gid: %llu | s->nw_lpid: %llu", s->director_id, LLU(lp->gid), LLU(s->nw_lpid)); + + /* + char commandstr[DIR_ZMQ_CMD_LENGTH]; + char args[DIR_ZMQ_ARG_LENGTH]; + std::vector ret_vals; + + sprintf(commandstr,"cmd-%d", s->director_id); + sprintf(args, "2;args-1;"); + + ret_vals = director_client_request(commandstr, args, ""); + //printf("UNIVERSE: %s - %s\n", commandstr, ret_vals[0]); + */ + + return; +} + +void director_prepare_iteration_dataset(director_state* s, tw_stime * training_data, int training_cycle, int training_records) +{ + //printf("==DIR[%d] Training Cycle: %d\n", s->director_id, training_cycle); + + std::string processed_training_data_str; + int i, length = 0; + double tmp_data = 0.0; + char tmp_data_str[DIR_MAX_DATA_SIZE]; + int written = 0; + + // Prepare dataset + for(i = 1; i < training_records; i++) + { + tmp_data = training_data[i] - training_data[i-1]; + written += sprintf(tmp_data_str, "%.2f;", tmp_data); + //strcat(processed_training_data_str, tmp_data_str); + + processed_training_data_str.append(std::to_string(tmp_data)); + processed_training_data_str.append(" "); + // printf(" %3d: %lf [%lf]\n", i, training_data[i], tmp_data); + } + //std::cout << "Processed Data: " << processed_training_data_str << std::endl; + + // Send dataset + std::vector ret_vals; + char commandstr[DIR_ZMQ_CMD_LENGTH]; + char args[DIR_ZMQ_ARG_LENGTH]; + + sprintf(commandstr, "send-records"); + sprintf(args, "%d;%d;%d", 3, s->director_id, training_records - 1); // num-of-args;num-record + ret_vals = director_client_request(commandstr, args, processed_training_data_str); +} + +void director_get_surrogate_prediction(director_state* s, tw_bf * bf, director_message * m, tw_lp * lp, tw_stime* delay_ts) +{ + // Check if we have sufficient predictions + if(s->next_prediction_index == -1){ // we need more + //printf("==DIR[%d] DIR Prediction -- generating set (time: %lf)\n", + // s->director_id, tw_now(lp)); + + if(inferencing_enabled){ + // Pull more predictions + std::vector ret_vals; + char commandstr[15]; + char args[100]; + + sprintf(commandstr, "do-inference"); + sprintf(args, "%d;%d;%d;", 3, s->director_id, DIR_MAX_PREDICTION); // num-of-args;num-record + std::string input_data = ("1000000 1000000 1000000 1000000"); + + ret_vals = director_client_request(commandstr, args, input_data); + /* + std::cout << "PREDICTIONS: " << commandstr + << " [0]" << ret_vals[0] + << " [1]" << ret_vals[1] + << " [2]" << ret_vals[2] + << std::endl; + */ + std::vector predictions = director_get_str_list(ret_vals[2].c_str(), ' '); + + //std::cout << "PREDICTIONS: " << predictions.size() << " | "; + int i = 0; + for(auto p: predictions){ + //std::cout << " " << std::stof(p); + s->predictions[i] = std::stof(p); + i += 1; + } + //std::cout << std::endl; + assert(i <= DIR_MAX_PREDICTION); + } + + s->next_prediction_index = 0; + } + *delay_ts = s->predictions[s->next_prediction_index]; + + s->next_prediction_index = s->next_prediction_index + 1; + + // Check if we've exhuasted the predictions + if(s->next_prediction_index == DIR_MAX_PREDICTION){ + s->next_prediction_index = -1; + } +} + + +void director_event_handler(director_state* s, tw_bf * bf, director_message * m, tw_lp * lp) +{ + + switch(m->msg_type) + { + case DIR_OP_NW: + if(s->simulation_mode == SIM_MODE_PDES) + { + tw_error(TW_LOC, "DIR sent for non-annotation operation during PDES mode."); + } else if(s->simulation_mode == SIM_MODE_ITERATION_SURROGATE) + { + //printf("==DIR[%d] Skipping NW Op type:%d (time: %lf)\n", s->director_id, m->value, tw_now(lp)); + + tw_stime delay_ts = 0.001; + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, delay_ts, lp); + } + break; + + case DIR_AN_ITER_MARK: + //fprintf(iteration_log, "DIR %d (time %lf)\n", s->director_id, tw_now(lp)); + //printf("==DIR[%d] DIR_AN_ITER_MARK m->value: %d (time: %lf)\n", s->director_id, m->value, tw_now(lp)); + if(s->simulation_mode == SIM_MODE_PDES) + { + // Manage training data + if(training_enabled && s->training_record_id < DIR_MAX_TRAINING_RECORDS) + {// There is space to store more training data + s->training_data[s->training_record_id] = tw_now(lp); + //s->training_data_vc.push_back(tw_now(lp)); + s->training_record_id = s->training_record_id + 1; + } + if(training_enabled && s->training_record_id == DIR_MAX_TRAINING_RECORDS) + {// We've filled all training data slots + //printf("==DIR[%d] Sending training dataset (time: %lf)\n", s->director_id, tw_now(lp)); + + // Prepare and send training data + director_prepare_iteration_dataset(s, s->training_data, s->training_cycle_id, DIR_MAX_TRAINING_RECORDS); + + // Increment cycle counter, reset record counter, and prime dataset + s->training_cycle_id = s->training_cycle_id + 1; + s->training_record_id = 1; + s->training_data[0] = tw_now(lp); + } + if(surrogate_enabled && m->value == director_config_global.surr_iter_start) + { + //printf("==DIR[%d] Triggering switch to SURR (time: %lf)\n", s->director_id, tw_now(lp)); + + s->simulation_mode = SIM_MODE_ITERATION_SURROGATE; + tw_stime delay_ts; + director_get_surrogate_prediction(s, bf, m, lp, &delay_ts); + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__SWITCH_TO_SURR, delay_ts, lp); + return; + } + else + { + tw_stime delay_ts = 0.001; + //printf("===[%llu] D-MARK[%llu]: Value=%d\n", s->nw_lpid, lp->gid, m->value ); + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, delay_ts, lp); + return; + } + } + else if(s->simulation_mode == SIM_MODE_ITERATION_SURROGATE) + { + if(m->value == director_config_global.surr_iter_end) + { + //printf("==DIR[%d] Triggering switch to PDES (time: %lf)\n", s->director_id, tw_now(lp)); + + s->simulation_mode = SIM_MODE_PDES; + tw_stime delay_ts = 0.001; + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__SWITCH_TO_PDES, delay_ts, lp); + + if(training_enabled){ + // Restart training data collection + //s->training_data_vc.clear(); + s->training_data[0] = tw_now(lp); + s->training_record_id = 1; + } + return; + } + else // we need to predict when the next iteration will start + { + tw_stime delay_ts; + director_get_surrogate_prediction(s, bf, m, lp, &delay_ts); + director_issue_codes_event(s, s->nw_lpid, DIR_REGISTERED_EVENT__MOVE_TO_NEXT, delay_ts, lp); + return; + } + } + else + { + tw_error(TW_LOC, "[DIR] Simulation mode unknown."); + } + + break; + + case DIR_REGISTERED_EVENT__SWITCH_TO_SURR: + case DIR_REGISTERED_EVENT__SWITCH_TO_PDES: + case DIR_REGISTERED_EVENT__MOVE_TO_NEXT: + director_register_events(s, m, lp); + break; + + default: + break; + } +} + +void director_finalize(director_state* s, tw_lp* lp) +{ + if (s->director_id == 0 && (training_enabled || inferencing_enabled)) + director_client_request("exit", "", ""); + + //printf("\n==DIR: FINALIZED"); +} + +tw_lptype dir_lp = { + (init_f) director_init, + (pre_run_f) NULL, + (event_f) director_event_handler, + (revent_f) NULL, //director_event_handler_rc, + (commit_f) NULL, //director_event_handler_commit, + (final_f) director_finalize, + (map_f) codes_mapping, + sizeof(director_state) +}; + +extern void director_lp_register_model(const char * dir_lp_name){ + int num_dir_per_mgrp = codes_mapping_get_lp_count ("MODELNET_GRP", 1, "dir-nw-lp", NULL, 0); + if(num_dir_per_mgrp > 0){ + lp_type_register(dir_lp_name, &dir_lp); // DIRECTOR addition - register type + //printf("\n==DIR: Registered\n"); + } +} + + +/* ========================================================== + END OF Director Code (To be moved to separate files) + ========================================================== +*/ From 138f46a7d8ebd70ab3157ed8ea348f1f4f5cf91b Mon Sep 17 00:00:00 2001 From: "Kevin A. Brown" Date: Fri, 16 Aug 2024 18:58:43 +0000 Subject: [PATCH 4/9] zmqml: update zmq server and requester to interface with director LP --- src/surrogate/zmqml/zmqmlrequester.cpp | 5 ++ src/surrogate/zmqml/zmqmlserver.py | 68 +++++++++++++++++++++++++- 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/src/surrogate/zmqml/zmqmlrequester.cpp b/src/surrogate/zmqml/zmqmlrequester.cpp index 6f43758c..a8ebbd53 100644 --- a/src/surrogate/zmqml/zmqmlrequester.cpp +++ b/src/surrogate/zmqml/zmqmlrequester.cpp @@ -20,6 +20,7 @@ using namespace rapidjson; static string endpoint = "tcp://localhost:5555"; static int debug = 0; + /** * See zmqmlrequester.h */ @@ -82,6 +83,10 @@ vector zmqml_request(const string& cmd, if (response.HasMember("id")) { ret.push_back(response["id"].GetString()); } + + if (response.HasMember("predictions")) { + ret.push_back(response["predictions"].GetString()); + } } else { ret.push_back("failed"); } diff --git a/src/surrogate/zmqml/zmqmlserver.py b/src/surrogate/zmqml/zmqmlserver.py index 066b0512..4e8105ea 100755 --- a/src/surrogate/zmqml/zmqmlserver.py +++ b/src/surrogate/zmqml/zmqmlserver.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # # zmqmlserver : ZeroMQ-based ML task dispatching server @@ -13,6 +13,7 @@ import time from itertools import count # generate unit id # from dataclasses import dataclass +import numpy as np # TODO: abstract a mechanism to call training from runmlpacketdelay import run_mlpacketdelay_training @@ -24,13 +25,15 @@ endpoint = "tcp://*:5555" debug = False - +debug2 = False # # # launch_id = count(start=1) # unique for launched thread launched_threads = {} # id:obj. keep track of active threads. remove the thread once it finished +training_records = {} # client_id:[] + class LaunchCMD: def __init__(self): # thread event @@ -153,6 +156,61 @@ def receivedata(args, bindata): elapsed_time = time.time() - st return (status, elapsed_time) +# +# receive training records +# +def receiverecords(args, bindata): + status = "failed" + st = time.time() + + num_args = int(args[0]) # 1st arg is num of args + client = int(args[1]) # 2nd arg is client id + num_records = int(args[2]) # 3rd arg is num records + records_str = str(bindata.decode('utf-8')) + records_str = records_str.strip() + records = list(records_str.split(" ")) + + if client not in training_records: + training_records[client] = [] + + training_records[client].extend([float(s) for s in records]) + + if (debug2) and (client == 51): + print(f"Training records[51] :{training_records[client]}") + + status = "done" + elapsed_time = time.time() - st + return (status, elapsed_time) + + +# +# do inference to get predictions +# +def launch_surrogate_inferencing(args, bindata): + status = "failed" + st = time.time() + + num_args = int(args[0]) # 1st arg is num of args + client = int(args[1]) # 2nd arg is client id + num_steps = int(args[2]) # 3rd arg is num steps to predict + records_str = str(bindata.decode('utf-8')) + records_str = records_str.strip() + records = list(records_str.split(" ")) + + input_records = [float(s) for s in records] + + inferences = [] + for i in range(num_steps): + inferences.append(2000000.0) + + inferences_str = ' '.join([str(f) for f in inferences]) + + status = "done" + elapsed_time = time.time() - st + return (status, elapsed_time, inferences_str) + + +# # # main listener loop @@ -192,6 +250,12 @@ def zmq_cmd_listener(): destfn = args[0] (status, et) = receivedata(args, bindata) retmsg = {"status":status, "et":str(et)} + elif cmd == "send-records": + (status, et) = receiverecords(args, bindata) + retmsg = {"status":status, "et":str(et)} + elif cmd == "do-inference": + (status, et, predictions) = launch_surrogate_inferencing(args, bindata) + retmsg = {"status":status, "et":str(et), "predictions": predictions} # send response back to the requester socket.send_json(retmsg) From 39fbc4fb405087ba199a6264f051e056ab980b04 Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Thu, 21 May 2026 10:33:23 -0400 Subject: [PATCH 5/9] Fix zmq and ROSS compilation issues The kronos-develop-director-b branch of CODES was using an outdated version of ROSS and also had compilation issues because of zeromq. This commit changes it to be compatible with the master branch of ROSS and fixes the zeromq compilation issues. --- CODES-compile-instructions.sh | 173 ++++++++++++++++++++++++++++++++++ codes/surrogate/switch.h | 2 +- src/CMakeLists.txt | 2 +- src/surrogate/init.c | 6 +- src/surrogate/switch.c | 61 ++++++------ src/surrogate/zmqml/Makefile | 2 +- src/util/rc-stack.c | 2 +- 7 files changed, 213 insertions(+), 35 deletions(-) create mode 100644 CODES-compile-instructions.sh diff --git a/CODES-compile-instructions.sh b/CODES-compile-instructions.sh new file mode 100644 index 00000000..c477e738 --- /dev/null +++ b/CODES-compile-instructions.sh @@ -0,0 +1,173 @@ +#!/usr/bin/env bash +set -euo pipefail +set -x + +# Switches +swm_enable=0 +union_enable=0 +torch_enable=1 + +# Uncomment below for MPICH +#export PATH=/usr/local/mpich-4.1.2/bin/:"$PATH" +# Note: remember to compile MPICH with nemesis not with UCX support + +################## Actual scripts starts from here ################## + +# SWM has to be enabled for UNION to work +if [ $union_enable = 1 ]; then + swm_enable=1 +fi + +# What to compile +CUR_DIR="$PWD" + +##### Downloading everything ##### + +if [ ! -d codes/.git ]; then + git clone https://github.com/codes-org/codes --depth=100 --branch=v1.5.0 +else + echo "Using existing codes checkout: $(realpath codes)" +fi + +if [ ! -d ross/.git ]; then + git clone https://github.com/ross-org/ross --depth=100 --branch=v8.1.0 +else + echo "Using existing ross checkout: $(realpath ross)" +fi + +if [ $swm_enable = 1 ]; then + git clone https://github.com/pmodels/argobots --depth=1 + git clone https://github.com/codes-org/swm-workloads --branch=v1.2 +fi + +if [ $union_enable = 1 ]; then + # Downloading conceptual + curl -L https://sourceforge.net/projects/conceptual/files/conceptual/1.5.1b/conceptual-1.5.1b.tar.gz -o conceptual-1.5.1b.tar.gz + tar xvf conceptual-1.5.1b.tar.gz + # Downloading union + git clone https://github.com/SPEAR-UIC/Union + pushd Union && git checkout 99b3df3 && popd +fi + +##### COMPILING ##### + +mkdir -p ross/build +pushd ross/build +cmake .. -DROSS_BUILD_MODELS=ON -DCMAKE_INSTALL_PREFIX="$(realpath ./bin)" \ + -DCMAKE_C_COMPILER=mpicc -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_FLAGS="-g -Wall" +#make VERBOSE=1 +make install -j4 +err=$? +[[ $err -ne 0 ]] && exit $err +popd + +if [ $swm_enable = 1 ]; then + pushd swm-workloads/swm + ./prepare.sh + mkdir -p build + pushd build + ../configure --disable-shared --prefix="$(realpath ./bin)" CC=mpicc CXX=mpicxx CFLAGS=-g CXXFLAGS=-g + #make V=1 && make install + make -j4 && make install + err=$? + [[ $err -ne 0 ]] && exit $err + popd && popd + + pushd argobots + ./autogen.sh + mkdir -p build + pushd build + #../configure --enable-debug=all --disable-fast --disable-shared --prefix="$(realpath ./bin)" CC=mpicc CXX=mpicxx CFLAGS=-g CXXFLAGS=-g + ../configure --disable-shared --prefix="$(realpath ./bin)" CC=mpicc CXX=mpicxx CFLAGS=-g CXXFLAGS=-g + #make V=1 && make install + make -j4 && make install + err=$? + [[ $err -ne 0 ]] && exit $err + popd && popd +fi + +if [ $union_enable = 1 ]; then + pushd conceptual-1.5.1b + PYTHON=python2 ./configure --prefix="$(realpath ./install)" LIBS=-lm + make -j4 && make install + err=$? + [[ $err -ne 0 ]] && exit $err + popd + + pushd Union + # Python 2 override. Union expects Python 2 ONLY + mkdir -p python-override + ln -s /usr/bin/python2 python-override/python + # compiling + ./prepare.sh + PYTHON=python2 ./configure --disable-shared --with-conceptual="$(realpath ../conceptual-1.5.1b/install)" --with-conceptual-src="$(realpath ../conceptual-1.5.1b)" --prefix="$(realpath ./install)" CC=mpicc CXX=mpicxx + PATH="$PWD/python-override:$PATH" make -j4 && make install + err=$? + [[ $err -ne 0 ]] && exit $err + popd +fi + + +# Build local ZMQML requester library required by director-client.C +pushd codes/src/surrogate/zmqml +make clean +make +test -f libzmqmlrequester.so +test -f zmqmlrequester.h +popd + +# Make imported zmqmlrequester target visible to doc/example and tests. +python3 - <<'INNERPY' +from pathlib import Path +cm = Path("codes/src/CMakeLists.txt") +text = cm.read_text() +old = "add_library(zmqmlrequester SHARED IMPORTED )" +new = "add_library(zmqmlrequester SHARED IMPORTED GLOBAL)" +if old in text: + cm.write_text(text.replace(old, new)) +elif new in text: + pass +else: + raise SystemExit("Could not find zmqmlrequester imported target line in codes/src/CMakeLists.txt") +INNERPY + +mkdir -p codes/build +pushd codes/build + +make_args_codes=( + -DCMAKE_PREFIX_PATH="$(realpath "$CUR_DIR/ross/build/bin")" + -DCMAKE_CXX_COMPILER=mpicxx -DCMAKE_C_COMPILER=mpicc + -DCMAKE_C_FLAGS="-g -Wall" + -DCMAKE_CXX_FLAGS="-g -Wall" + -DCMAKE_BUILD_TYPE=Debug -DBUILD_TESTING=ON + -DCMAKE_INSTALL_PREFIX="$(realpath bin)" + -DZMQML_BUILD_PATH="$(realpath "$CUR_DIR/codes/src/surrogate/zmqml")" + -DZeroMQ_INCLUDE_DIR=/usr/include + -DZeroMQ_LIBRARY=/usr/lib/x86_64-linux-gnu/libzmq.so +) +if [ $swm_enable = 1 ]; then + make_args_codes=( + "${make_args_codes[@]}" + -DSWM_PKG_CONFIG_PATH="$(realpath "$CUR_DIR/swm-workloads/swm/build/maint")" + -DARGOBOTS_PKG_CONFIG_PATH="$(realpath "$CUR_DIR/argobots/build/maint")" + ) +fi +if [ $union_enable = 1 ]; then + make_args_codes=( + "${make_args_codes[@]}" + -DUNION_PKG_CONFIG_PATH="$(realpath "$CUR_DIR/Union/install/lib/pkgconfig")" + ) +fi +if [ $torch_enable = 1 ]; then + make_args_codes=("${make_args_codes[@]}" -DUSE_TORCH=true) +else + make_args_codes=("${make_args_codes[@]}" -DUSE_TORCH=false) +fi + +cmake .. "${make_args_codes[@]}" +#make VERBOSE=1 +make -j4 +err=$? +[[ $err -ne 0 ]] && exit $err + +popd diff --git a/codes/surrogate/switch.h b/codes/surrogate/switch.h index 553f3a11..82a31cf4 100644 --- a/codes/surrogate/switch.h +++ b/codes/surrogate/switch.h @@ -61,7 +61,7 @@ extern struct switch_at_struct switch_at; // Switch -void director_switch(tw_pe * pe, tw_event_sig gvt_sig); +void director_switch(tw_pe * pe, bool past_end_time); #ifdef __cplusplus } diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index cd38259e..9439ce2f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -157,7 +157,7 @@ if(USE_ONLINE) endif() # ZMQML -add_library(zmqmlrequester SHARED IMPORTED ) +add_library(zmqmlrequester SHARED IMPORTED GLOBAL) set_target_properties(zmqmlrequester PROPERTIES IMPORTED_LOCATION "${ZMQML_BUILD_PATH}/libzmqmlrequester.so" INTERFACE_INCLUDE_DIRECTORIES "${ZMQML_BUILD_PATH}") diff --git a/src/surrogate/init.c b/src/surrogate/init.c index 79bb7e71..42049d72 100644 --- a/src/surrogate/init.c +++ b/src/surrogate/init.c @@ -64,14 +64,14 @@ void surrogate_configure( PRINTF_ONCE("\n"); // Injecting into ROSS the function to be called at GVT and the instant in time to trigger GVT - g_tw_gvt_arbitrary_fun = director_switch; + g_tw_gvt_hook = director_switch; #ifdef USE_RAND_TIEBREAKER tw_event_sig time_stamp = {0}; time_stamp.recv_ts = switch_at.time_stampts[0]; - tw_trigger_arbitrary_fun_at(time_stamp); + tw_trigger_gvt_hook_at_event_sig(time_stamp); #else - tw_trigger_arbitrary_fun_at(switch_at.time_stampts[0]); + tw_trigger_gvt_hook_at(switch_at.time_stampts[0]); #endif // freeing timestamps before it dissapears diff --git a/src/surrogate/switch.c b/src/surrogate/switch.c index 4b29ab18..a906e152 100644 --- a/src/surrogate/switch.c +++ b/src/surrogate/switch.c @@ -79,9 +79,9 @@ static void rollback_and_cancel_events_pe(tw_pe * pe, tw_event_sig gvt_sig) { tw_stime const gvt = gvt_sig.recv_ts; // Backtracking the simulation to GVT for (unsigned int i = 0; i < g_tw_nkp; i++) { - tw_kp_rollback_to_sig(g_tw_kp[i], gvt_sig); + tw_kp_rollback_to_sig(g_tw_kp[i], &gvt_sig); } - assert(tw_event_sig_compare(pe->GVT_sig, gvt_sig) == 0); + assert(tw_event_sig_compare_ptr(&pe->GVT_sig, &gvt_sig) == 0); assert(pe->GVT_sig.recv_ts == gvt); // redundant but needed because compiler cries that gvt is never used #else static void rollback_and_cancel_events_pe(tw_pe * pe, tw_stime gvt) { @@ -100,10 +100,7 @@ static void rollback_and_cancel_events_pe(tw_pe * pe, tw_stime gvt) { pe->stats.s_net_read += tw_clock_read() - start; } - pe->gvt_status = 1; - tw_sched_event_q(pe); - tw_sched_cancel_q(pe); - tw_gvt_step2(pe); + tw_scheduler_rollback_and_cancel_events_pe(pe); if (DEBUG_DIRECTOR > 1) { printf("PE %lu: Time stamp at the end of GVT time: %f - AVL-tree sized: %d\n", g_tw_mynode, gvt, pe->avl_tree_size); @@ -146,7 +143,7 @@ static void shift_events_to_future_pe(tw_pe * pe, tw_stime gvt) { // Filtering events to freeze assert(next_event->prev == NULL); #ifdef USE_RAND_TIEBREAKER - assert(tw_event_sig_compare(next_event->sig, gvt_sig) >= 0); + assert(tw_event_sig_compare_ptr(&next_event->sig, &gvt_sig) >= 0); #else assert(next_event->recv_ts >= gvt); #endif @@ -165,11 +162,11 @@ static void shift_events_to_future_pe(tw_pe * pe, tw_stime gvt) { next_event->recv_ts += switch_offset; next_event->sig.recv_ts = next_event->recv_ts; } - assert(next_event->recv_ts >= g_tw_trigger_arbitrary_fun.sig_at.recv_ts); + assert(next_event->recv_ts >= g_tw_gvt_hook_trigger.sig_at.recv_ts); #else next_event->recv_ts += switch_offset; } - assert(next_event->recv_ts >= g_tw_trigger_arbitrary_fun.at); + assert(next_event->recv_ts >= g_tw_gvt_hook_trigger.at); #endif // store event in deque_events to inject immediately back to the queue @@ -382,11 +379,12 @@ static void events_surrogate_to_high_def_switch(tw_pe * pe, tw_stime gvt) { } +void director_switch(tw_pe * pe, bool past_end_time) { #ifdef USE_RAND_TIEBREAKER -void director_switch(tw_pe * pe, tw_event_sig gvt_sig) { + tw_event_sig const gvt_sig = pe->GVT_sig; tw_stime const gvt = gvt_sig.recv_ts; #else -void director_switch(tw_pe * pe, tw_stime gvt) { + tw_stime const gvt = pe->GVT; #endif assert(is_surrogate_configured); @@ -400,15 +398,18 @@ void director_switch(tw_pe * pe, tw_stime gvt) { printf("GVT %d at %f in %s arbitrary-fun-status=", i++, gvt, surr_config.director.is_surrogate_on() ? "surrogate-mode" : "high-definition"); - switch (g_tw_trigger_arbitrary_fun.active) { - case ARBITRARY_FUN_enabled: - printf("enabled\n"); + switch (g_tw_gvt_hook_trigger.status) { + case GVT_HOOK_STATUS_timestamp: + printf("timestamp\n"); break; - case ARBITRARY_FUN_disabled: + case GVT_HOOK_STATUS_disabled: printf("disabled\n"); break; - case ARBITRARY_FUN_triggered: - printf("triggered\n"); + case GVT_HOOK_STATUS_every_n_gvt: + printf("every-n-gvt\n"); + break; + case GVT_HOOK_STATUS_model_call: + printf("model-call\n"); break; } } @@ -430,16 +431,20 @@ void director_switch(tw_pe * pe, tw_stime gvt) { return; } - // Detecting if we are going to switch - if (switch_at.current_i < switch_at.total - && g_tw_trigger_arbitrary_fun.active == ARBITRARY_FUN_triggered) { + // Detecting if we are going to switch. + // + // Newer ROSS calls g_tw_gvt_hook only after the timestamp trigger fires, + // and it sets g_tw_gvt_hook_trigger.status back to GVT_HOOK_STATUS_disabled + // before entering this hook. Therefore, do not check for the old + // ARBITRARY_FUN_triggered state here; it no longer exists. + if (switch_at.current_i < switch_at.total) { double const switch_time = switch_at.time_stampts[switch_at.current_i]; #ifdef USE_RAND_TIEBREAKER - assert(g_tw_trigger_arbitrary_fun.sig_at.recv_ts == switch_at.time_stampts[switch_at.current_i]); + assert(g_tw_gvt_hook_trigger.sig_at.recv_ts == switch_time); #else - assert(g_tw_trigger_arbitrary_fun.at == switch_at.time_stampts[switch_at.current_i]); + assert(g_tw_gvt_hook_trigger.at == switch_time); #endif - assert(gvt >= switch_time); // current gvt shouldn't be that far ahead from the point we wanted to trigger it + assert(gvt >= switch_time); // current gvt should not be before the requested switch time } else { return; } @@ -457,10 +462,10 @@ void director_switch(tw_pe * pe, tw_stime gvt) { // Rollback if in optimistic mode #ifdef USE_RAND_TIEBREAKER if (g_tw_synchronization_protocol == OPTIMISTIC) { - assert(tw_event_sig_compare(pe->GVT_sig, gvt_sig) == 0); + assert(tw_event_sig_compare_ptr(&pe->GVT_sig, &gvt_sig) == 0); rollback_and_cancel_events_pe(pe, gvt_sig); - //assert(tw_event_sig_compare(pe->GVT_sig, gvt_sig) <= 0); - assert(tw_event_sig_compare(pe->GVT_sig, gvt_sig) == 0); + //assert(tw_event_sig_compare_ptr(&pe->GVT_sig, &gvt_sig) <= 0); + assert(tw_event_sig_compare_ptr(&pe->GVT_sig, &gvt_sig) == 0); } #else if (g_tw_synchronization_protocol == OPTIMISTIC) { @@ -502,10 +507,10 @@ void director_switch(tw_pe * pe, tw_stime gvt) { tw_event_sig time_stamp = {0}; time_stamp.recv_ts = next_switch; //printf("Adding a trigger to activate next switch!\n"); - tw_trigger_arbitrary_fun_at(time_stamp); + tw_trigger_gvt_hook_at_event_sig(time_stamp); #else //printf("Adding a trigger to activate next switch!\n"); - tw_trigger_arbitrary_fun_at(next_switch); + tw_trigger_gvt_hook_at(next_switch); #endif } diff --git a/src/surrogate/zmqml/Makefile b/src/surrogate/zmqml/Makefile index 4c28ed54..b4abcfab 100644 --- a/src/surrogate/zmqml/Makefile +++ b/src/surrogate/zmqml/Makefile @@ -7,7 +7,7 @@ TARGETS=libzmqmlrequester.so demozmqmlrequester all: $(TARGETS) libzmqmlrequester.so: zmqmlrequester.o - $(CXX) -shared -o $@ $^ + $(CXX) -shared -o $@ $^ $(LDFLAGS) zmqmlrequester.o: zmqmlrequester.cpp zmqmlrequester.h $(CXX) $(CXXFLAGS) -fPIC -c $< -o $@ diff --git a/src/util/rc-stack.c b/src/util/rc-stack.c index ebb2131f..5f68123e 100644 --- a/src/util/rc-stack.c +++ b/src/util/rc-stack.c @@ -107,7 +107,7 @@ void rc_stack_gc(tw_lp const *lp, struct rc_stack *s) { while (ent != &s->head) { rc_entry *r = qlist_entry(ent, rc_entry, ql); #ifdef USE_RAND_TIEBREAKER - if (lp == NULL || tw_event_sig_compare(r->e_sig, lp->pe->GVT_sig) == -1) { + if (lp == NULL || tw_event_sig_compare_ptr(&r->e_sig, &lp->pe->GVT_sig) == -1) { #else if (lp == NULL || r->time < lp->pe->GVT){ #endif From 0651b5ec34855b6ed258f046363682ff3ec1befa Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Thu, 21 May 2026 13:58:01 -0400 Subject: [PATCH 6/9] Fix torch-jit compilation Compilation with torch-jit was not occuring even with torch_enable set to 1. This commit fixes torch-jit compilation with GPU support. --- CODES-compile-instructions.sh | 115 +++++++++++++++++++++++++++++++++- 1 file changed, 112 insertions(+), 3 deletions(-) diff --git a/CODES-compile-instructions.sh b/CODES-compile-instructions.sh index c477e738..46f27b8c 100644 --- a/CODES-compile-instructions.sh +++ b/CODES-compile-instructions.sh @@ -134,8 +134,99 @@ INNERPY mkdir -p codes/build pushd codes/build +torch_cmake_prefix="" +torch_dir="" + +if [ "$torch_enable" = 1 ]; then + torch_cmake_prefix="$(python3 - <<'INNERPY' +import torch +print(torch.utils.cmake_prefix_path) +INNERPY +)" + torch_dir="${torch_cmake_prefix}/Torch" + + if [ ! -f "${torch_dir}/TorchConfig.cmake" ]; then + echo "ERROR: TorchConfig.cmake not found at: ${torch_dir}/TorchConfig.cmake" >&2 + echo " torch.utils.cmake_prefix_path returned: ${torch_cmake_prefix}" >&2 + exit 1 + fi + + echo "Using Torch CMake prefix: ${torch_cmake_prefix}" + echo "Using Torch_DIR: ${torch_dir}" + + # Optional CUDA toolkit override for CUDA-enabled PyTorch. + # Set CUDA_HOME before running this script, e.g.: + # export CUDA_HOME=/usr/local/cuda-12.4 + # or: + # export CUDA_HOME=/usr/local/cuda + if python3 - <<'INNERPY' +import torch, sys +sys.exit(0 if torch.version.cuda is not None else 1) +INNERPY + then + if [ -z "${CUDA_HOME:-}" ]; then + if [ -d /usr/local/cuda ]; then + CUDA_HOME=/usr/local/cuda + else + echo "ERROR: CUDA-enabled PyTorch detected, but CUDA_HOME is not set and /usr/local/cuda does not exist." >&2 + echo " Set CUDA_HOME to your CUDA toolkit root, e.g. /usr/local/cuda-12.4." >&2 + exit 1 + fi + fi + + if [ ! -f "${CUDA_HOME}/include/cuda_runtime_api.h" ]; then + echo "ERROR: Missing CUDA header: ${CUDA_HOME}/include/cuda_runtime_api.h" >&2 + exit 1 + fi + + if [ ! -f "${CUDA_HOME}/lib64/libcudart.so" ] && [ ! -f "${CUDA_HOME}/lib/libcudart.so" ]; then + echo "ERROR: Missing CUDA runtime library under ${CUDA_HOME}/lib64 or ${CUDA_HOME}/lib" >&2 + exit 1 + fi + + if [ ! -x "${CUDA_HOME}/bin/nvcc" ]; then + echo "ERROR: Missing CUDA compiler: ${CUDA_HOME}/bin/nvcc" >&2 + exit 1 + fi + + if [ ! -d "${CUDA_HOME}/nvvm/libdevice" ]; then + echo "ERROR: Missing CUDA libdevice directory: ${CUDA_HOME}/nvvm/libdevice" >&2 + exit 1 + fi + + cuda_arch="" + if command -v nvidia-smi >/dev/null 2>&1; then + cuda_arch="$(nvidia-smi --query-gpu=compute_cap --format=csv,noheader 2>/dev/null | head -n1 | tr -d '.[:space:]' || true)" + fi + + if [ -z "${cuda_arch}" ]; then + echo "WARNING: Could not auto-detect GPU compute capability with nvidia-smi." >&2 + echo " Falling back to CMAKE_CUDA_ARCHITECTURES=80." >&2 + cuda_arch="80" + fi + + export CUDA_HOME + export CUDA_PATH="${CUDA_HOME}" + export CUDA_ROOT="${CUDA_HOME}" + export CUDA_TOOLKIT_ROOT_DIR="${CUDA_HOME}" + export CUDAToolkit_ROOT="${CUDA_HOME}" + export CUDACXX="${CUDA_HOME}/bin/nvcc" + export PATH="${CUDA_HOME}/bin:${PATH}" + export LD_LIBRARY_PATH="${CUDA_HOME}/lib64:${CUDA_HOME}/lib:${LD_LIBRARY_PATH:-}" + + echo "Using CUDA_HOME: ${CUDA_HOME}" + echo "Using CUDACXX: ${CUDACXX}" + echo "Using CMAKE_CUDA_ARCHITECTURES=${cuda_arch}" + fi +fi + +cmake_prefix_path="$(realpath "$CUR_DIR/ross/build/bin")" +if [ "$torch_enable" = 1 ]; then + cmake_prefix_path="${cmake_prefix_path};${torch_cmake_prefix}" +fi + make_args_codes=( - -DCMAKE_PREFIX_PATH="$(realpath "$CUR_DIR/ross/build/bin")" + -DCMAKE_PREFIX_PATH="${cmake_prefix_path}" -DCMAKE_CXX_COMPILER=mpicxx -DCMAKE_C_COMPILER=mpicc -DCMAKE_C_FLAGS="-g -Wall" -DCMAKE_CXX_FLAGS="-g -Wall" @@ -158,8 +249,26 @@ if [ $union_enable = 1 ]; then -DUNION_PKG_CONFIG_PATH="$(realpath "$CUR_DIR/Union/install/lib/pkgconfig")" ) fi -if [ $torch_enable = 1 ]; then - make_args_codes=("${make_args_codes[@]}" -DUSE_TORCH=true) +if [ "$torch_enable" = 1 ]; then + make_args_codes=( + "${make_args_codes[@]}" + -DUSE_TORCH=true + -DTorch_DIR="${torch_dir}" + ) + + if [ -n "${CUDA_HOME:-}" ]; then + make_args_codes=( + "${make_args_codes[@]}" + -DCUDA_TOOLKIT_ROOT_DIR="${CUDA_HOME}" + -DCUDAToolkit_ROOT="${CUDA_HOME}" + -DCUDA_PATH="${CUDA_HOME}" + -DCUDA_ROOT="${CUDA_HOME}" + -DCMAKE_CUDA_COMPILER="${CUDA_HOME}/bin/nvcc" + -DCMAKE_CUDA_ARCHITECTURES="${cuda_arch}" + -DCUDA_INCLUDE_DIRS="${CUDA_HOME}/include" + -DCUDA_CUDART_LIBRARY="${CUDA_HOME}/lib64/libcudart.so" + ) + fi else make_args_codes=("${make_args_codes[@]}" -DUSE_TORCH=false) fi From 01a2b16c3bf4c38990262808ecc9bee8c9be4477 Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Thu, 21 May 2026 14:50:42 -0400 Subject: [PATCH 7/9] Allow cpu-based PyTorch usage --- CODES-compile-instructions.sh | 72 ++++++++++++++++++++++++----------- 1 file changed, 50 insertions(+), 22 deletions(-) diff --git a/CODES-compile-instructions.sh b/CODES-compile-instructions.sh index 46f27b8c..1ba6f3c6 100644 --- a/CODES-compile-instructions.sh +++ b/CODES-compile-instructions.sh @@ -154,47 +154,56 @@ INNERPY echo "Using Torch CMake prefix: ${torch_cmake_prefix}" echo "Using Torch_DIR: ${torch_dir}" - # Optional CUDA toolkit override for CUDA-enabled PyTorch. - # Set CUDA_HOME before running this script, e.g.: + # CUDA is intentionally opt-in. + # Default to CPU-only Torch-JIT compilation unless CUDA_HOME is explicitly set. + # + # To enable CUDA, run for example: # export CUDA_HOME=/usr/local/cuda-12.4 - # or: - # export CUDA_HOME=/usr/local/cuda - if python3 - <<'INNERPY' -import torch, sys -sys.exit(0 if torch.version.cuda is not None else 1) + # ./CODES-compile-instructions.sh + torch_cuda_version="$(python3 - <<'INNERPY' +import torch +print(torch.version.cuda or "") INNERPY - then - if [ -z "${CUDA_HOME:-}" ]; then - if [ -d /usr/local/cuda ]; then - CUDA_HOME=/usr/local/cuda - else - echo "ERROR: CUDA-enabled PyTorch detected, but CUDA_HOME is not set and /usr/local/cuda does not exist." >&2 - echo " Set CUDA_HOME to your CUDA toolkit root, e.g. /usr/local/cuda-12.4." >&2 - exit 1 - fi - fi +)" + + cuda_arch="" + if [ -z "${CUDA_HOME:-}" ] && [ -n "${torch_cuda_version}" ]; then + echo "ERROR: CUDA_HOME is not set, so this script is defaulting to CPU-only Torch-JIT compilation." >&2 + echo " However, the active Python environment has a CUDA-enabled PyTorch build:" >&2 + echo " torch.version.cuda=${torch_cuda_version}" >&2 + echo "" >&2 + echo " CMake cannot use a CUDA-enabled PyTorch package as a CPU-only LibTorch package." >&2 + echo " Choose one of the following:" >&2 + echo " 1. For CPU-only compilation, install a CPU-only PyTorch build in this environment." >&2 + echo " 2. For CUDA compilation, export CUDA_HOME to your CUDA toolkit root." >&2 + echo "" >&2 + echo " Example CUDA build:" >&2 + echo " export CUDA_HOME=/usr/local/cuda-12.4" >&2 + echo " bash CODES-compile-instructions.sh" >&2 + exit 1 + fi + if [ -n "${CUDA_HOME:-}" ]; then if [ ! -f "${CUDA_HOME}/include/cuda_runtime_api.h" ]; then - echo "ERROR: Missing CUDA header: ${CUDA_HOME}/include/cuda_runtime_api.h" >&2 + echo "ERROR: CUDA_HOME is set, but missing CUDA header: ${CUDA_HOME}/include/cuda_runtime_api.h" >&2 exit 1 fi if [ ! -f "${CUDA_HOME}/lib64/libcudart.so" ] && [ ! -f "${CUDA_HOME}/lib/libcudart.so" ]; then - echo "ERROR: Missing CUDA runtime library under ${CUDA_HOME}/lib64 or ${CUDA_HOME}/lib" >&2 + echo "ERROR: CUDA_HOME is set, but missing CUDA runtime library under ${CUDA_HOME}/lib64 or ${CUDA_HOME}/lib" >&2 exit 1 fi if [ ! -x "${CUDA_HOME}/bin/nvcc" ]; then - echo "ERROR: Missing CUDA compiler: ${CUDA_HOME}/bin/nvcc" >&2 + echo "ERROR: CUDA_HOME is set, but missing CUDA compiler: ${CUDA_HOME}/bin/nvcc" >&2 exit 1 fi if [ ! -d "${CUDA_HOME}/nvvm/libdevice" ]; then - echo "ERROR: Missing CUDA libdevice directory: ${CUDA_HOME}/nvvm/libdevice" >&2 + echo "ERROR: CUDA_HOME is set, but missing CUDA libdevice directory: ${CUDA_HOME}/nvvm/libdevice" >&2 exit 1 fi - cuda_arch="" if command -v nvidia-smi >/dev/null 2>&1; then cuda_arch="$(nvidia-smi --query-gpu=compute_cap --format=csv,noheader 2>/dev/null | head -n1 | tr -d '.[:space:]' || true)" fi @@ -214,9 +223,22 @@ INNERPY export PATH="${CUDA_HOME}/bin:${PATH}" export LD_LIBRARY_PATH="${CUDA_HOME}/lib64:${CUDA_HOME}/lib:${LD_LIBRARY_PATH:-}" + echo "CUDA_HOME is set; enabling CUDA Torch-JIT compilation." echo "Using CUDA_HOME: ${CUDA_HOME}" echo "Using CUDACXX: ${CUDACXX}" echo "Using CMAKE_CUDA_ARCHITECTURES=${cuda_arch}" + else + echo "CUDA_HOME is not set; forcing CPU-only Torch-JIT compilation." + + # Prevent accidental CUDA discovery from /usr/local/cuda, nvcc on PATH, + # inherited CMake cache variables, or CUDA-enabled PyTorch metadata. + unset CUDA_HOME + unset CUDA_PATH + unset CUDA_ROOT + unset CUDA_TOOLKIT_ROOT_DIR + unset CUDAToolkit_ROOT + unset CUDACXX + unset CMAKE_CUDA_COMPILER fi fi @@ -268,6 +290,12 @@ if [ "$torch_enable" = 1 ]; then -DCUDA_INCLUDE_DIRS="${CUDA_HOME}/include" -DCUDA_CUDART_LIBRARY="${CUDA_HOME}/lib64/libcudart.so" ) + else + make_args_codes=( + "${make_args_codes[@]}" + -DCMAKE_DISABLE_FIND_PACKAGE_CUDA=ON + -DCMAKE_DISABLE_FIND_PACKAGE_CUDAToolkit=ON + ) fi else make_args_codes=("${make_args_codes[@]}" -DUSE_TORCH=false) From e42e75a89c80882558f7b2f40c8d6e8900833ace Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Wed, 27 May 2026 10:48:25 -0400 Subject: [PATCH 8/9] Move ML models to surrogate directory --- .../ml_models}/combine_packet_latency_traces_across_ranks.py | 0 .../surrogate/ml_models}/train_packet_latency_torchjit.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename {ml_models => src/surrogate/ml_models}/combine_packet_latency_traces_across_ranks.py (100%) rename {ml_models => src/surrogate/ml_models}/train_packet_latency_torchjit.py (100%) diff --git a/ml_models/combine_packet_latency_traces_across_ranks.py b/src/surrogate/ml_models/combine_packet_latency_traces_across_ranks.py similarity index 100% rename from ml_models/combine_packet_latency_traces_across_ranks.py rename to src/surrogate/ml_models/combine_packet_latency_traces_across_ranks.py diff --git a/ml_models/train_packet_latency_torchjit.py b/src/surrogate/ml_models/train_packet_latency_torchjit.py similarity index 100% rename from ml_models/train_packet_latency_torchjit.py rename to src/surrogate/ml_models/train_packet_latency_torchjit.py From dc0936faf03d3dfb47f455fb565d73e473cf4256 Mon Sep 17 00:00:00 2001 From: Sanjay Chari Date: Mon, 1 Jun 2026 11:10:32 -0400 Subject: [PATCH 9/9] Make zeromq dependency handling robust --- CODES-compile-instructions.sh | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/CODES-compile-instructions.sh b/CODES-compile-instructions.sh index 1ba6f3c6..9eb00143 100644 --- a/CODES-compile-instructions.sh +++ b/CODES-compile-instructions.sh @@ -108,6 +108,32 @@ if [ $union_enable = 1 ]; then fi + +# Make system pkg-config metadata visible even when Conda's pkg-config is active. +# This is needed for libzmq.pc on systems where ZeroMQ is installed through the OS +# but the active Conda environment's pkg-config only searches Conda pkgconfig dirs. +if ! pkg-config --exists libzmq 2>/dev/null; then + for pcdir in \ + /usr/lib/x86_64-linux-gnu/pkgconfig \ + /usr/lib64/pkgconfig \ + /usr/lib/pkgconfig \ + /usr/local/lib/pkgconfig \ + /usr/local/lib64/pkgconfig \ + /opt/homebrew/lib/pkgconfig \ + /usr/share/pkgconfig + do + if [ -d "$pcdir" ]; then + export PKG_CONFIG_PATH="$pcdir:${PKG_CONFIG_PATH:-}" + fi + done +fi + +if ! pkg-config --exists libzmq 2>/dev/null; then + echo "WARNING: pkg-config still cannot find libzmq.pc." >&2 + echo " If ZMQML fails to build, install the ZeroMQ development package" >&2 + echo " or set PKG_CONFIG_PATH to the directory containing libzmq.pc." >&2 +fi + # Build local ZMQML requester library required by director-client.C pushd codes/src/surrogate/zmqml make clean