-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmap_reduce.cpp
More file actions
123 lines (104 loc) · 5.85 KB
/
map_reduce.cpp
File metadata and controls
123 lines (104 loc) · 5.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// This is an open source non-commercial project. Dear PVS-Studio, please check it.
// PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
#include "map_reduce.h"
#include <exception>
#include <iostream>
#include <stdexcept>
#include <boost/asio.hpp>
#include <boost/asio/use_future.hpp>
#include "ssh/node.h"
#include "util.h"
#include "configurator/config.h"
#include "json_server.h"
namespace map_reduce {
static std::future<std::vector<std::pair<std::unique_ptr<KeyValueType>, std::unique_ptr<KeyValueType>>>>
get_result(const std::shared_ptr<job_config> &cfg) {
std::promise<std::vector<std::pair<std::unique_ptr<KeyValueType>, std::unique_ptr<KeyValueType >>>> promise;
auto future = promise.get_future();
std::thread([cfg](
std::promise<std::vector<std::pair<std::unique_ptr<KeyValueType>, std::unique_ptr<KeyValueType >>>> promise) {
boost::asio::io_context io_service;
std::vector<std::pair<std::unique_ptr<KeyValueType>, std::unique_ptr<KeyValueType>>> res;
auto json_handler = std::make_shared<std::function<void(const std::string &)>>(
[&](const std::string &json) mutable {
try {
auto[key, value] = get_key_value_from_json(json, cfg->key_out_factory,
cfg->value_res_factory);
res.emplace_back(std::move(key), std::move(value));
} catch (data_ended_error &e) {
promise.set_value(std::move(res));
}
});
json_server s(io_service, 8002, json_handler);
io_service.run();
}, std::move(promise)).detach();
return future;
}
/**
 * Uploads a config library to a remote node and marks it executable.
 *
 * Runs three steps on node `n`: creates `~/<base_directory>`, copies `dll_path` to
 * `<base_directory>/<config_name>` with scp_send_file, then runs `chmod +x` on the copy.
 *
 * @param n              node to send the file to.
 * @param base_directory directory (used relative to `~` in the shell commands) that receives the file.
 * @param dll_path       local path of the library to upload.
 * @param config_name    file name the library gets on the remote side.
 */
static void
send_config(ssh::node &n, const fs::path &base_directory, const fs::path &dll_path,
            const std::string &config_name) {
    const std::string remote_directory = ("~" / base_directory).string();
    // -p creates missing parent directories (base_directory may be nested, e.g. ".cache/mapreduce")
    // and succeeds when the directory already exists; plain mkdir fails in both cases and the
    // failure would be hidden by the stderr redirect.
    n.execute_command("mkdir -p " + remote_directory + " 2> /dev/null", false);
    n.scp_send_file(dll_path, base_directory / config_name);
    n.execute_command("chmod +x " + ("~" / base_directory / config_name).string(), false);
}
/**
 * Connects to the reduce node, uploads its config library and launches the reducer.
 *
 * The launch command changes into `~/<base_directory>`, appends that directory to
 * LD_LIBRARY_PATH and starts `reduce_node` under nohup in the background, with its
 * output redirected to reduce.out / reduce.err.
 *
 * @param reduce_address address of the reduce node, split by parse_ip_port.
 * @param master_address address of the master node, split by parse_ip_port.
 * @param map_num        value passed to the reducer as --input_num.
 * @param base_directory remote working directory (relative to `~`).
 * @param dll_path       local path of the config library to upload.
 */
void
run_reduce_node(const std::string &reduce_address, const std::string &master_address, const size_t map_num,
                const fs::path &base_directory, const fs::path &dll_path) {
    auto[reduce_ip, reduce_port] = parse_ip_port(reduce_address);
    auto[master_ip, master_port] = parse_ip_port(master_address);

    ssh::node reduce_node(reduce_ip);
    reduce_node.connect();
    send_config(reduce_node, base_directory, dll_path, "libreduce_config.so");

    const fs::path executable("reduce_node");
    const std::string remote_dir = ("~" / base_directory).string();

    // Assemble the shell command piece by piece.
    std::string command = "cd " + remote_dir;
    command += "&& export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:" + remote_dir;
    command += "&& nohup " + executable.string();
    command += " --input_num=" + std::to_string(map_num);
    command += " --master_node_address=";
    command += master_ip;
    command += ":" + std::to_string(master_port);
    command += " --port=" + std::to_string(reduce_port);
    command += " --config_file=libreduce_config.so > reduce.out 2> reduce.err < /dev/null &";

    reduce_node.execute_command(command, false);
}
/**
 * Connects to every map node in turn, uploads the map config library and runs the mapper.
 *
 * For the i-th address in `map_ips` the command changes into `~/<base_directory>`, appends
 * that directory to LD_LIBRARY_PATH and runs `map_node` with `map_input_file[i]` as its
 * --input_file and the reduce node as --reduce_node_address; output goes to map.out / map.err.
 *
 * @param map_ips        addresses of the map nodes.
 * @param reduce_address address of the reduce node, split by parse_ip_port.
 * @param map_input_file input file for each map node, matched to `map_ips` by index.
 * @param base_directory remote working directory (relative to `~`).
 * @param dll_path       local path of the config library to upload.
 * @throws std::logic_error if there are fewer input files than map nodes.
 */
void run_map_nodes(const std::vector<std::string> &map_ips, const std::string &reduce_address,
                   const std::vector<fs::path> &map_input_file, const fs::path &base_directory,
                   const fs::path &dll_path) {
    // map_input_file is indexed with map_ips' indices below; reject input that would read
    // past its end instead of invoking undefined behaviour.
    if (map_input_file.size() < map_ips.size())
        throw std::logic_error("Number of input files for map nodes should not be less than number of maps' ip");

    auto[reduce_ip, reduce_port] = parse_ip_port(reduce_address);
    const fs::path map_node_path("map_node");

    // Everything except the input file is identical for all nodes, so build it once.
    const std::string remote_directory = ("~" / base_directory).string();
    const std::string command_prefix =
            "cd " + remote_directory +
            "&& export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:" + remote_directory +
            "&& " + map_node_path.string() +
            " --input_file=";
    const std::string command_suffix =
            std::string(" --reduce_node_address=") + reduce_ip + ":" + std::to_string(reduce_port) +
            " --config_file=libmap_config.so > map.out 2> map.err < /dev/null";

    for (size_t i = 0; i < map_ips.size(); i++) {
        ssh::node map_node(map_ips[i]);
        map_node.connect();
        send_config(map_node, base_directory, dll_path, "libmap_config.so");
        map_node.execute_command(command_prefix + map_input_file[i].string() + command_suffix, false);
    }
}
/**
 * Launches a complete map-reduce job and returns a future for its result.
 *
 * Loads the job configuration from the library named by `dll_path.filename()`, starts the
 * result collector (get_result), then starts the reduce node followed by the map nodes.
 * The remote working directory is fixed to ".cache/mapreduce".
 *
 * @param map_ips         addresses of the map nodes.
 * @param reduce_address  address of the reduce node.
 * @param master_address  address of the master node, passed on to the reduce node.
 * @param map_input_files input file for each map node; must have the same size as `map_ips`.
 * @param dll_path        path of the config library sent to the nodes.
 * @return future that yields the job's (key, value) result pairs.
 * @throws std::logic_error if `map_ips` and `map_input_files` differ in size.
 */
std::future<std::vector<std::pair<std::unique_ptr<KeyValueType>, std::unique_ptr<KeyValueType>>>>
run_task(const std::vector<std::string> &map_ips, const std::string &reduce_address,
         const std::string &master_address,
         const std::vector<fs::path> &map_input_files, const fs::path &dll_path) {
    if (map_ips.size() != map_input_files.size())
        throw std::logic_error("Number of input files for map nodes should be the same as number of maps' ip");

    // Pairs ssh_init() with ssh_finalize() so finalization also happens when launching
    // a node throws.
    struct ssh_library_guard {
        ssh_library_guard() { ssh_init(); }

        ~ssh_library_guard() { ssh_finalize(); }

        ssh_library_guard(const ssh_library_guard &) = delete;

        ssh_library_guard &operator=(const ssh_library_guard &) = delete;
    };

    const fs::path base_directory = ".cache/mapreduce";
    // NOTE(review): library_handler is destroyed when this function returns while the
    // collector thread still holds cfg - confirm cfg keeps the library loaded.
    auto library_handler = get_config_dll_handler(dll_path.filename());
    auto cfg = get_config(library_handler);
    // Start the collector before any node is launched.
    auto future = get_result(cfg);
    {
        const ssh_library_guard guard;
        run_reduce_node(reduce_address, master_address, map_ips.size(), base_directory, dll_path);
        run_map_nodes(map_ips, reduce_address, map_input_files, base_directory, dll_path);
    }
    return future;
}
/**
 * Synchronous variant of run_task: starts the job and blocks until its result is ready.
 *
 * @param map_ips        addresses of the map nodes.
 * @param reduce_address address of the reduce node.
 * @param master_address address of the master node.
 * @param map_input_file input file for each map node.
 * @param dll_path       path of the config library sent to the nodes.
 * @return the job's (key, value) result pairs.
 */
std::vector<std::pair<std::unique_ptr<KeyValueType>, std::unique_ptr<KeyValueType>>>
run_task_blocking(const std::vector<std::string> &map_ips, const std::string &reduce_address,
                  const std::string &master_address,
                  const std::vector<fs::path> &map_input_file, const fs::path &dll_path) {
    // get() itself blocks until the shared state is ready, then hands the value over.
    return run_task(map_ips, reduce_address, master_address, map_input_file, dll_path).get();
}
}