Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions app/mlp_thread_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class MLPThreadManager {
bool has_task = false;
bool shutdown = false;

timer t_mlp;
double time_mlp = 0.0;

PartitionConfig* config_ptr = nullptr;
std::vector<std::pair<LongNodeID, std::vector<LongNodeID>>>** batch_ptr = nullptr;

Expand All @@ -79,7 +82,9 @@ class MLPThreadManager {

// Ausführen der MLP-Funktion
if (config_ptr && batch_ptr) {
t_mlp.restart();
::perform_mlp_on_batch(*config_ptr, *batch_ptr);
time_mlp += t_mlp.elapsed();
}

// Signalisiere Fertigstellung
Expand Down Expand Up @@ -108,6 +113,10 @@ class MLPThreadManager {
}
}

double get_mlp_time() const {
return time_mlp;
}

// Führt perform_mlp_on_batch asynchron aus (wartet wenn nötig)
void execute(PartitionConfig& config,
std::vector<std::pair<LongNodeID, std::vector<LongNodeID>>>*& batch) {
Expand Down
4 changes: 2 additions & 2 deletions app/parse_parameters.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ int parse_parameters(int argn, char **argv,
struct arg_dbl *param_dbl2 = arg_dbl0(NULL, "param_dbl2", NULL, "");
struct arg_dbl *param_dbl3 = arg_dbl0(NULL, "param_dbl3", NULL, "");
struct arg_lit *param_enbld1 = arg_lit0(NULL, "param_enbld1", "(Default: disabled)");
struct arg_lit *param_enbld2 = arg_lit0(NULL, "param_enbld1", "(Default: disabled)");
struct arg_lit *param_enbld3 = arg_lit0(NULL, "param_enbld1", "(Default: disabled)");
struct arg_lit *param_enbld2 = arg_lit0(NULL, "param_enbld2", "(Default: disabled)");
struct arg_lit *param_enbld3 = arg_lit0(NULL, "param_enbld3", "(Default: disabled)");
struct arg_lit *write_npo = arg_lit0(NULL, "write_npo", "(Default: disabled)");
struct arg_lit *disable_part_adj_direct = arg_lit0(NULL, "disable_part_adj_direct", "(Default: enabled)");

Expand Down
79 changes: 43 additions & 36 deletions app/streampartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,23 @@ int main(int argn, char **argv) {
██ ██ ██ ██ ██ ██ ██ ██
██ ████ ██████ ██████ ███████
)" << std::endl; */
PartitionConfig partition_config;
std::string graph_filename;
timer t, processing_t, io_t, model_t;
EdgeWeight total_edge_cut = 0;

timer processing_t, io_t, t_mlp, first_phase_t, second_phase_t, part_single_node_t, buffer_add_node_t;
double global_mapping_time = 0;
// double buffer_mapping_time = 0;
double buffer_io_time = 0;
double io_time = 0;
double buffer_add_node_time = 0;
double model_construction_time = 0;
double first_phase_time = 0;
double second_phase_time = 0;
double updating_adj_time = 0;
double part_single_node_time = 0;
double mlp_time = 0;

PartitionConfig partition_config;
std::string graph_filename;
EdgeWeight total_edge_cut = 0;

quality_metrics qm;
balance_configuration bc;
std::vector<std::pair<LongNodeID, std::vector<LongNodeID>>> *batch_nodes;
Expand Down Expand Up @@ -109,14 +118,7 @@ int main(int argn, char **argv) {
partition_config.graph_filename = graph_filename;
partition_config.stream_input = true;

timer first_phase_t, second_phase_t, updating_adj_t, partitioning_t, calc_buffer_score_t;
timer t_mlp;
double first_phase_time = 0;
double second_phase_time = 0;
double updating_adj_time = 0;
double partitioning_time = 0;
// double calc_buffer_score_time = 0;
double time_mlp = 0;


if (partition_config.write_node_part_order) {
partition_config.node_part_order = new std::vector<std::string>();
Expand All @@ -138,8 +140,7 @@ int main(int argn, char **argv) {

double avg_block_size = static_cast<double>(partition_config.number_of_nodes) / partition_config.k;
partition_config.max_block_weight = static_cast<int>(std::ceil((1.0 + partition_config.imbalance / 100) * avg_block_size));

buffer_io_time += io_t.elapsed();
io_time += io_t.elapsed();

Buffer buffer(partition_config, partition_config.max_pq_size);

Expand All @@ -166,7 +167,7 @@ int main(int argn, char **argv) {

ss2 = std::make_unique<buffered_input>(lines.get());
ss2->simple_scan_line(cur_line);
buffer_io_time += io_t.elapsed();
io_time += io_t.elapsed();

unsigned degree = cur_line.size();

Expand Down Expand Up @@ -198,14 +199,12 @@ int main(int argn, char **argv) {
// Partition node directly
partition_config.count_misc1++;
mlp_thread_manager.wait_completion();
partitioning_t.restart();
part_single_node_t.restart();
partition_single_node(partition_config, global_node_id, cur_line, true);
partitioning_time += partitioning_t.elapsed();
updating_adj_t.restart();
part_single_node_time += part_single_node_t.elapsed();

// Update neighbors
buffer.update_neighbours_priority(cur_line);
updating_adj_time += updating_adj_t.elapsed();
}
continue;

Expand All @@ -217,23 +216,24 @@ int main(int argn, char **argv) {
mlp_thread_manager.wait_completion();
buffer.loadTopNodesToBatch(batch_nodes, partition_config.stream_buffer_len);

t_mlp.restart();
mlp_thread_manager.execute(partition_config, batch_nodes);
time_mlp += t_mlp.elapsed();
} else {
buffer.loadTopNodesToBatch(batch_nodes, partition_config.stream_buffer_len);

t_mlp.restart();
perform_mlp_on_batch(partition_config, batch_nodes);
time_mlp += t_mlp.elapsed();
mlp_time += t_mlp.elapsed();
}

} else {
buffer.partitionTopNode();
}
}

buffer_add_node_t.restart();
buffer.addNode(global_node_id, cur_line);
buffer_add_node_time += buffer_add_node_t.elapsed();

}
cur_line.clear();
first_phase_time += first_phase_t.elapsed();
Expand All @@ -247,14 +247,12 @@ int main(int argn, char **argv) {
if (partition_config.parallel_mlp) {
mlp_thread_manager.wait_completion();
buffer.loadTopNodesToBatch(batch_nodes, partition_config.stream_buffer_len);
t_mlp.restart();
mlp_thread_manager.execute(partition_config, batch_nodes);
time_mlp += t_mlp.elapsed();
} else {
buffer.loadTopNodesToBatch(batch_nodes, partition_config.stream_buffer_len);
t_mlp.restart();
perform_mlp_on_batch(partition_config, batch_nodes);
time_mlp += t_mlp.elapsed();
mlp_time += t_mlp.elapsed();
}


Expand All @@ -266,21 +264,32 @@ int main(int argn, char **argv) {
}
if (partition_config.parallel_mlp) {
mlp_thread_manager.wait_completion();
mlp_time = mlp_thread_manager.get_mlp_time();
}
second_phase_time += second_phase_t.elapsed();
updating_adj_time = buffer.get_update_adj_time();

}
double total_time = processing_t.elapsed();
long maxRSS = getMaxRSS();


if (partition_config.print_times) {
std::cout << "First phase time: " << first_phase_time << std::endl;
std::cout << "Second phase time: " << second_phase_time << std::endl;
std::cout << "MLP time: " << time_mlp << std::endl;
// std::cout << "Updating adj time: " << updating_adj_time << std::endl;
// std::cout << "Partitioning time: " << partitioning_time << std::endl;
// std::cout << "Calc buffer score time: " << calc_buffer_score_time << std::endl;
std::cout << "┌─────────────────────────┬───────────────┬───────────────┐" << std::endl;
std::cout << "│ Metric │ Time (s) │ Percentage │" << std::endl;
std::cout << "├─────────────────────────┼───────────────┼───────────────┤" << std::endl;
std::cout << "│ Total time │ " << std::setw(13) << std::fixed << std::setprecision(3) << total_time << " │ " << std::setw(13) << "100%" << " │" << std::endl;
std::cout << "│ First phase time │ " << std::setw(13) << std::fixed << std::setprecision(3) << first_phase_time << " │ " << std::setw(12) << std::fixed << std::setprecision(0) << (first_phase_time / total_time * 100) << "%" << " │" << std::endl;
std::cout << "│ Second phase time │ " << std::setw(13) << std::fixed << std::setprecision(3) << second_phase_time << " │ " << std::setw(12) << std::fixed << std::setprecision(0) << (second_phase_time / total_time * 100) << "%" << " │" << std::endl;
std::cout << "├─────────────────────────┼───────────────┼───────────────┤" << std::endl;
std::cout << "│ IO time │ " << std::setw(13) << std::fixed << std::setprecision(3) << io_time << " │ " << std::setw(12) << std::fixed << std::setprecision(0) << (io_time / total_time * 100) << "%" << " │" << std::endl;
std::cout << "│ Buffer add node time │ " << std::setw(13) << std::fixed << std::setprecision(3) << buffer_add_node_time << " │ " << std::setw(12) << std::fixed << std::setprecision(0) << (buffer_add_node_time / total_time * 100) << "%" << " │" << std::endl;
std::cout << "│ Updating adj time │ " << std::setw(13) << std::fixed << std::setprecision(3) << updating_adj_time << " │ " << std::setw(12) << std::fixed << std::setprecision(0) << (updating_adj_time / total_time * 100) << "%" << " │" << std::endl;
std::cout << "│ Part single node time │ " << std::setw(13) << std::fixed << std::setprecision(3) << part_single_node_time << " │ " << std::setw(12) << std::fixed << std::setprecision(0) << (part_single_node_time / total_time * 100) << "%" << " │" << std::endl;
std::cout << "│ MLP time │ " << std::setw(13) << std::fixed << std::setprecision(3) << mlp_time << " │ " << std::setw(12) << std::fixed << std::setprecision(0) << (mlp_time / total_time * 100) << "%" << " │" << std::endl;
double sum_detailed = io_time + buffer_add_node_time + updating_adj_time + mlp_time;
std::cout << "│ Sum of detailed times │ " << std::setw(13) << std::fixed << std::setprecision(3) << sum_detailed << " │ " << std::setw(12) << std::fixed << std::setprecision(0) << (sum_detailed / total_time * 100) << "%" << " │" << std::endl;
std::cout << "└─────────────────────────┴───────────────┴───────────────┘" << std::endl;
}
FlatBufferWriter fb_writer;

Expand All @@ -298,10 +307,8 @@ int main(int argn, char **argv) {
}

double total_time_rounded = std::round(total_time * 100.0) / 100.0;
std::cout << total_time_rounded << " " << total_edge_cut << " " << maxRSS << std::endl; // << " " << cnt_part_adj_directly << std::endl;
// std::cout << total_time_rounded << " " << total_edge_cut << " " << maxRSS << " " << time_mlp << std::endl; // << " " << cnt_part_adj_directly << std::endl;
// std::cout << "Count Dmax part: " << partition_config.count_misc1 << std::endl;
// std::cout << "count total single part: " << partition_config.count_misc2 << std::endl;
std::cout << total_time_rounded << " " << total_edge_cut << " " << maxRSS << std::endl;

// write the partition to the disc
std::stringstream filename;
if (!partition_config.filename_output.compare("")) {
Expand Down
13 changes: 13 additions & 0 deletions lib/data_structure/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <memory>
#include <optional>
#include <vector>
#include "timer.h"

#include "data_structure/graph_access.h"
#include "data_structure/priority_queues/bucket_pq.h"
Expand Down Expand Up @@ -109,6 +110,9 @@ class Buffer {
double progress;
float current_beta;

timer update_adj_t;
double update_adj_time = 0.0;

public:
Buffer(PartitionConfig &partition_config, LongNodeID max_pq_size)
: config(partition_config),
Expand Down Expand Up @@ -387,6 +391,8 @@ class Buffer {

// Update the priority value of the neighbours of the node that was just partitioned in the priority queue
void update_neighbours_priority(std::vector<LongNodeID> &adjacents, bool part_adj_directly = false) {
update_adj_t.restart();

if (part_adj_directly == false) {
part_adj_directly = config.part_adj_directly;
}
Expand All @@ -407,12 +413,14 @@ class Buffer {
// Check if all neighbours of the neighbour are partitioned, if so, partition the neighbour
if (part_adj_directly && adj_degree > 3 && adj_degree == adj_buffer_item.num_adj_partitioned ) { //&& config.buffer_score_type != BUFFER_SCORE_CBS2
// if (part_adj_directly && adj_degree > config.param_int1 && adj_degree == adj_buffer_item.num_adj_partitioned && config.buffer_score_type != BUFFER_SCORE_CBS2) {
update_adj_time += update_adj_t.elapsed();
pq.deleteNode(adj_id);
partition_single_node(config, adj_id, adj_adjacents);

// Update neighbors and clear buffer item
update_neighbours_priority(adj_adjacents);
completely_remove_node(adj_id);
update_adj_t.restart();
} else {
// Update buffer score of neighbours
float updated_buffer_score = calc_updated_buffer_score(adj_id, adj_buffer_item);
Expand All @@ -421,6 +429,11 @@ class Buffer {
}
}
}
update_adj_time += update_adj_t.elapsed();
}

double get_update_adj_time() {
return update_adj_time;
}

// Helper-Methoden
Expand Down