From 4a4809b1e1fe3195316a218177966c0c12f8ef9e Mon Sep 17 00:00:00 2001 From: Marco Galeri Date: Mon, 15 Jul 2024 18:10:06 +0200 Subject: [PATCH 01/19] docs: :construction: write the first definition of new function --- .../algorithms/centrality/closeness.py | 17 +++++++ .../algorithms/shortest_paths/dense.py | 49 +++++++++++++++++++ 2 files changed, 66 insertions(+) create mode 100644 nx_parallel/algorithms/centrality/closeness.py create mode 100644 nx_parallel/algorithms/shortest_paths/dense.py diff --git a/nx_parallel/algorithms/centrality/closeness.py b/nx_parallel/algorithms/centrality/closeness.py new file mode 100644 index 00000000..5435d776 --- /dev/null +++ b/nx_parallel/algorithms/centrality/closeness.py @@ -0,0 +1,17 @@ +__all__ = ["closeness_centrality"] + + +def closeness_centrality( + G, u=None, distance=None, wf_improved=True, get_chunks="chunks" +): + """The parallel implementation first divide the nodes into chunks and + + networkx.closeness_centrality : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.centrality.closeness_centrality.html + + Parameters + ---------- + get_chunks : str, function (default = "chunks") + A function that takes in a list of all the nodes as input and returns an + iterable `node_chunks`. The default chunking is done by slicing the + `nodes` into `n` chunks, where `n` is the number of CPU cores. + """ diff --git a/nx_parallel/algorithms/shortest_paths/dense.py b/nx_parallel/algorithms/shortest_paths/dense.py new file mode 100644 index 00000000..abddd65e --- /dev/null +++ b/nx_parallel/algorithms/shortest_paths/dense.py @@ -0,0 +1,49 @@ +"""Floyd-Warshall algorithm for shortest paths.""" + +__all__ = [ + "floyd_warshall_numpy", +] + + +def floyd_warshall_numpy(G, nodelist=None, weight="weight", get_chunks="chunks"): + """ + Parallel implementation of Floyd warshall using the tiled floyd warshall algorithm from + 'All-Pairs Shortest-Paths for Large Graphs on the GPU, Authors: Gary J. Katz and Joseph T. Kider Jr' + + networkx.floyd_warshall_numpy : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.dense.floyd_warshall_numpy.html + + Parameters + ---------- + get_chunks : str, function (default = "chunks") + A function that takes in a list of all the nodes as input and returns an + iterable `node_chunks`. The default chunking is done by slicing the + `nodes` into `n` chunks, where `n` is the number of CPU cores. + """ + + +def _partial_floyd_warshall_numpy(A, k, i, j): + """ + Compute partial FW in the determined sub-block for the execution of + parallel tiled FW. 
+ + Parameters + ---------- + A : 2D numpy.ndarray + matrix that reppresent the adjacency matrix of the graph + + k : tuple + range (start-end) of the primary block in the current iteration + + i : tuple + range (start-end) of the rows in the current block computed + + j : tuple + range (start-end) of the columns in the current block computed + + Returns + ------- + A : 2D numpy.ndarray + adjacency matrix updated after floyd warshall + """ + + # np.add.outer(A[:,k],A[k,:]) From 62c646d9088853f7c47483461676e13426a92909 Mon Sep 17 00:00:00 2001 From: Marco Galeri Date: Thu, 18 Jul 2024 10:07:30 +0200 Subject: [PATCH 02/19] feat: :sparkles: implement the core function of tiled floyd warshall --- .../algorithms/shortest_paths/dense.py | 47 +++++++++++++++++-- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/nx_parallel/algorithms/shortest_paths/dense.py b/nx_parallel/algorithms/shortest_paths/dense.py index abddd65e..18f5fbbd 100644 --- a/nx_parallel/algorithms/shortest_paths/dense.py +++ b/nx_parallel/algorithms/shortest_paths/dense.py @@ -1,4 +1,8 @@ """Floyd-Warshall algorithm for shortest paths.""" +from joblib import Parallel, delayed +import nx_parallel as nxp +import networkx as nx +import numpy as np __all__ = [ "floyd_warshall_numpy", @@ -20,8 +24,37 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", get_chunks="chunks") `nodes` into `n` chunks, where `n` is the number of CPU cores. """ + if nodelist is not None: + if not (len(nodelist) == len(G) == len(set(nodelist))): + raise nxp.NetworkXError( + "nodelist must contain every node in G with no repeats." + "If you wanted a subgraph of G use G.subgraph(nodelist)" + ) -def _partial_floyd_warshall_numpy(A, k, i, j): + # To handle cases when an edge has weight=0, we must make sure that + # nonedges are not given the value 0 as well. + A = nx.to_numpy_array( + G, nodelist, multigraph_weight=min, weight=weight, nonedge=np.inf + ) + n, m = A.shape + ( + k, + i, + j, + ) = 1 # TODO: chunking in submatrix and assign k i j iterable for sub block that are not primary + # total_cores = nxp.cpu_count() + # blocking_factor = ( + # n / total_cores + # ) # TODO: write a more specific chunking for spliting non dividable matrix + + Parallel(n_jobs=1, require="sharedmem")( + delayed(_partial_floyd_warshall_numpy)(A, k, i, j) for i in range(10) + ) + + return A + + +def _partial_floyd_warshall_numpy(A, k_iteration, i_iteration, j_iteration): """ Compute partial FW in the determined sub-block for the execution of parallel tiled FW. 
@@ -31,13 +64,13 @@ def _partial_floyd_warshall_numpy(A, k, i, j): A : 2D numpy.ndarray matrix that reppresent the adjacency matrix of the graph - k : tuple + k_iteration : tuple range (start-end) of the primary block in the current iteration - i : tuple + i_iteration : tuple range (start-end) of the rows in the current block computed - j : tuple + j_iteration : tuple range (start-end) of the columns in the current block computed Returns @@ -46,4 +79,8 @@ def _partial_floyd_warshall_numpy(A, k, i, j): adjacency matrix updated after floyd warshall """ - # np.add.outer(A[:,k],A[k,:]) + for k in range(k_iteration[0], k_iteration[1] + 1): + for i in range(i_iteration[0], i_iteration[1] + 1): + for j in range(j_iteration[0], j_iteration[1] + 1): + A[i][j] = np.minimum(A[i][j], (A[i][k] + A[k][j])) + return A From b93a4e45695b0ca04f0231827bed63a21fa1a19f Mon Sep 17 00:00:00 2001 From: Marco Galeri Date: Sun, 21 Jul 2024 20:22:03 +0200 Subject: [PATCH 03/19] feat: :construction: Add some private function and a parameter for main function --- .../algorithms/shortest_paths/dense.py | 96 +++++++++++++++---- 1 file changed, 78 insertions(+), 18 deletions(-) diff --git a/nx_parallel/algorithms/shortest_paths/dense.py b/nx_parallel/algorithms/shortest_paths/dense.py index 18f5fbbd..3df35dab 100644 --- a/nx_parallel/algorithms/shortest_paths/dense.py +++ b/nx_parallel/algorithms/shortest_paths/dense.py @@ -3,13 +3,14 @@ import nx_parallel as nxp import networkx as nx import numpy as np +import math __all__ = [ "floyd_warshall_numpy", ] -def floyd_warshall_numpy(G, nodelist=None, weight="weight", get_chunks="chunks"): +def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None): """ Parallel implementation of Floyd warshall using the tiled floyd warshall algorithm from 'All-Pairs Shortest-Paths for Large Graphs on the GPU, Authors: Gary J. Katz and Joseph T. Kider Jr' @@ -18,10 +19,10 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", get_chunks="chunks") Parameters ---------- - get_chunks : str, function (default = "chunks") - A function that takes in a list of all the nodes as input and returns an - iterable `node_chunks`. The default chunking is done by slicing the - `nodes` into `n` chunks, where `n` is the number of CPU cores. + blocking_factor : number + The number used for divinding the adjacency matrix in block. 
+ The default blocking factor is get by finding the optimal value + for the core available """ if nodelist is not None: @@ -37,19 +38,34 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", get_chunks="chunks") G, nodelist, multigraph_weight=min, weight=weight, nonedge=np.inf ) n, m = A.shape - ( - k, - i, - j, - ) = 1 # TODO: chunking in submatrix and assign k i j iterable for sub block that are not primary - # total_cores = nxp.cpu_count() - # blocking_factor = ( - # n / total_cores - # ) # TODO: write a more specific chunking for spliting non dividable matrix - - Parallel(n_jobs=1, require="sharedmem")( - delayed(_partial_floyd_warshall_numpy)(A, k, i, j) for i in range(10) - ) + matrix_len = m * n + # TODO: chunking in submatrix and assign k i j iterable for sub block that are not primary + total_cores = nxp.cpu_count() + if blocking_factor is None: + blocking_factor = _find_nearest_divisor(n, total_cores) + + no_of_primary = matrix_len / blocking_factor + # TODO: write a more specific chunking for spliting non dividable matrix + for it in range(no_of_primary): + # Phase 1: Compute Primary block + k_start = (it * matrix_len) // no_of_primary + k_end = k_start + (matrix_len // no_of_primary) - 1 + k = (k_start, k_end) + # Execute Normal floyd warshall for the primary block submatrix + A = _partial_floyd_warshall_numpy(A, k, k, k) + # Phase 2: Compute Cross block + i, j = 0 + Parallel(n_jobs=(no_of_primary - 1) * 2, require="sharedmem")( + delayed(_partial_floyd_warshall_numpy)(A, k, i, j) + for i_start in range( + k_start, + k_end, + ) + ) + # Phase 3: Compute remaining + Parallel(n_jobs=(no_of_primary - 1) ** 2, require="sharedmem")( + delayed(_partial_floyd_warshall_numpy)(A, k, i, j) + ) return A @@ -84,3 +100,47 @@ def _partial_floyd_warshall_numpy(A, k_iteration, i_iteration, j_iteration): for j in range(j_iteration[0], j_iteration[1] + 1): A[i][j] = np.minimum(A[i][j], (A[i][k] + A[k][j])) return A + + +def _calculate_divisor(i, x, y): + if x % i == 0: + divisor1 = i + result1 = x // i + difference1 = abs((result1 - 1) ** 2 - y) + + divisor2 = x // i + result2 = i + difference2 = abs((result2 - 1) ** 2 - y) + + if difference1 < difference2: + return divisor1, result1, difference1 + else: + return divisor2, result2, difference2 + return None + + +def _find_nearest_divisor(x, y): + """ + find the optimal value for the blocking factor parameter + + Parameters + ---------- + x : node number + + y : cpu core available + """ + # Find the square root of x + sqrt_x = int(math.sqrt(x)) + 1 + + # Execute the calculation in parallel + results = Parallel(n_jobs=-1)( + delayed(_calculate_divisor)(i, x, y) for i in range(1, sqrt_x) + ) + + # Filter out None results + results = [r for r in results if r is not None] + + # Find the best divisor + best_divisor, best_result, min_difference = min(results, key=lambda x: x[2]) + + return best_divisor From 45770c2b88ec2ff22982d68bcbd9eecd7f878def Mon Sep 17 00:00:00 2001 From: Marco Galeri Date: Wed, 24 Jul 2024 18:53:42 +0200 Subject: [PATCH 04/19] feat: :sparkles: Add code for the third phase of the alogrithm --- .../algorithms/shortest_paths/dense.py | 41 +++++++++++++------ 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/nx_parallel/algorithms/shortest_paths/dense.py b/nx_parallel/algorithms/shortest_paths/dense.py index 3df35dab..39ec6dd9 100644 --- a/nx_parallel/algorithms/shortest_paths/dense.py +++ b/nx_parallel/algorithms/shortest_paths/dense.py @@ -39,32 +39,44 @@ def floyd_warshall_numpy(G, nodelist=None, 
weight="weight", blocking_factor=None ) n, m = A.shape matrix_len = m * n - # TODO: chunking in submatrix and assign k i j iterable for sub block that are not primary + + # TODO: handle graph with a prime number of node, as the matrix is not divisible total_cores = nxp.cpu_count() if blocking_factor is None: blocking_factor = _find_nearest_divisor(n, total_cores) no_of_primary = matrix_len / blocking_factor - # TODO: write a more specific chunking for spliting non dividable matrix - for it in range(no_of_primary): - # Phase 1: Compute Primary block - k_start = (it * matrix_len) // no_of_primary + + for primary_block in range(no_of_primary): + k_start = (primary_block * matrix_len) // no_of_primary k_end = k_start + (matrix_len // no_of_primary) - 1 k = (k_start, k_end) + # Phase 1: Compute Primary block # Execute Normal floyd warshall for the primary block submatrix A = _partial_floyd_warshall_numpy(A, k, k, k) # Phase 2: Compute Cross block - i, j = 0 + params = [] + for block in range(no_of_primary): + # skip the primary block computed in phase 1 + if block != primary_block: + # append the actual indices of the matrix by multiply the block number with the blocking factor + block_coord = _block_range(blocking_factor, block) + params.append((block_coord, k)) + params.append((k, block_coord)) Parallel(n_jobs=(no_of_primary - 1) * 2, require="sharedmem")( - delayed(_partial_floyd_warshall_numpy)(A, k, i, j) - for i_start in range( - k_start, - k_end, - ) + delayed(_partial_floyd_warshall_numpy)(A, k, i, j) for (i, j) in params ) # Phase 3: Compute remaining + params.clear() + for block_i in range(no_of_primary): + for block_j in range(no_of_primary): + # skip all block previously computed, so skip every block with primary block value + if block_i != primary_block and block_j != primary_block: + i_range = _block_range(blocking_factor, block_i) + j_range = _block_range(blocking_factor, block_j) + params.append((i_range, j_range)) Parallel(n_jobs=(no_of_primary - 1) ** 2, require="sharedmem")( - delayed(_partial_floyd_warshall_numpy)(A, k, i, j) + delayed(_partial_floyd_warshall_numpy)(A, k, i, j) for (i, j) in params ) return A @@ -102,6 +114,10 @@ def _partial_floyd_warshall_numpy(A, k_iteration, i_iteration, j_iteration): return A +def _block_range(blocking_factor, block): + return (block * blocking_factor, (block + 1) * blocking_factor) + + def _calculate_divisor(i, x, y): if x % i == 0: divisor1 = i @@ -119,6 +135,7 @@ def _calculate_divisor(i, x, y): return None +# TODO add side case for prime number def _find_nearest_divisor(x, y): """ find the optimal value for the blocking factor parameter From f5e6065be6c09fb2cf4dfececeeec9bf4b22a74b Mon Sep 17 00:00:00 2001 From: Marco Galeri Date: Thu, 25 Jul 2024 22:26:55 +0200 Subject: [PATCH 05/19] feat: :sparkles: Add feature for graph with a prime number of nodes --- .../algorithms/shortest_paths/dense.py | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/nx_parallel/algorithms/shortest_paths/dense.py b/nx_parallel/algorithms/shortest_paths/dense.py index 39ec6dd9..5105e31a 100644 --- a/nx_parallel/algorithms/shortest_paths/dense.py +++ b/nx_parallel/algorithms/shortest_paths/dense.py @@ -43,7 +43,7 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None # TODO: handle graph with a prime number of node, as the matrix is not divisible total_cores = nxp.cpu_count() if blocking_factor is None: - blocking_factor = _find_nearest_divisor(n, total_cores) + blocking_factor, is_prime = 
_find_nearest_divisor(n, total_cores) no_of_primary = matrix_len / blocking_factor @@ -60,7 +60,10 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None # skip the primary block computed in phase 1 if block != primary_block: # append the actual indices of the matrix by multiply the block number with the blocking factor - block_coord = _block_range(blocking_factor, block) + if is_prime and block == no_of_primary - 1: + block_coord = (block * blocking_factor, n) + else: + block_coord = _block_range(blocking_factor, block) params.append((block_coord, k)) params.append((k, block_coord)) Parallel(n_jobs=(no_of_primary - 1) * 2, require="sharedmem")( @@ -72,6 +75,14 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None for block_j in range(no_of_primary): # skip all block previously computed, so skip every block with primary block value if block_i != primary_block and block_j != primary_block: + if is_prime: + if block_i == no_of_primary - 1: + i_range = (block * blocking_factor, n) + j_range = _block_range(blocking_factor, block_j) + if block_j == no_of_primary - 1: + j_range = (block * blocking_factor, n) + i_range = _block_range(blocking_factor, block_i) + else: i_range = _block_range(blocking_factor, block_i) j_range = _block_range(blocking_factor, block_j) params.append((i_range, j_range)) @@ -157,7 +168,11 @@ def _find_nearest_divisor(x, y): # Filter out None results results = [r for r in results if r is not None] + if len(results) <= 1: + # This check if a number is prime, although repeat process with a non prime number + best_divisor, _ = _find_nearest_divisor(x - 1, y) + return best_divisor, True # Find the best divisor - best_divisor, best_result, min_difference = min(results, key=lambda x: x[2]) + best_divisor, _, _ = min(results, key=lambda x: x[2]) - return best_divisor + return best_divisor, False From 645d67d313086ce7c24e7f5718e929a9c429c394 Mon Sep 17 00:00:00 2001 From: Marco Galeri Date: Thu, 25 Jul 2024 22:34:00 +0200 Subject: [PATCH 06/19] docs: :memo: Remove TODO marker and update docs --- nx_parallel/algorithms/centrality/closeness.py | 10 +++++----- nx_parallel/algorithms/shortest_paths/dense.py | 4 +--- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/nx_parallel/algorithms/centrality/closeness.py b/nx_parallel/algorithms/centrality/closeness.py index 5435d776..37cf2ccf 100644 --- a/nx_parallel/algorithms/centrality/closeness.py +++ b/nx_parallel/algorithms/centrality/closeness.py @@ -2,7 +2,7 @@ def closeness_centrality( - G, u=None, distance=None, wf_improved=True, get_chunks="chunks" + G, u=None, distance=None, wf_improved=True, blocking_factor=None ): """The parallel implementation first divide the nodes into chunks and @@ -10,8 +10,8 @@ def closeness_centrality( Parameters ---------- - get_chunks : str, function (default = "chunks") - A function that takes in a list of all the nodes as input and returns an - iterable `node_chunks`. The default chunking is done by slicing the - `nodes` into `n` chunks, where `n` is the number of CPU cores. + blocking_factor : number + The number used for divinding the adjacency matrix in sub-matrix. 
+ The default blocking factor is get by finding the optimal value + for the core available """ diff --git a/nx_parallel/algorithms/shortest_paths/dense.py b/nx_parallel/algorithms/shortest_paths/dense.py index 5105e31a..6c826507 100644 --- a/nx_parallel/algorithms/shortest_paths/dense.py +++ b/nx_parallel/algorithms/shortest_paths/dense.py @@ -20,7 +20,7 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None Parameters ---------- blocking_factor : number - The number used for divinding the adjacency matrix in block. + The number used for divinding the adjacency matrix in sub-matrix. The default blocking factor is get by finding the optimal value for the core available """ @@ -40,7 +40,6 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None n, m = A.shape matrix_len = m * n - # TODO: handle graph with a prime number of node, as the matrix is not divisible total_cores = nxp.cpu_count() if blocking_factor is None: blocking_factor, is_prime = _find_nearest_divisor(n, total_cores) @@ -146,7 +145,6 @@ def _calculate_divisor(i, x, y): return None -# TODO add side case for prime number def _find_nearest_divisor(x, y): """ find the optimal value for the blocking factor parameter From 8eb1b93f626a1d2d323f4d4231918e277324c301 Mon Sep 17 00:00:00 2001 From: Marco Galeri Date: Wed, 31 Jul 2024 14:23:57 +0200 Subject: [PATCH 07/19] feat: :sparkles: Implement the closeness centrality function --- .../algorithms/centrality/closeness.py | 49 ++++++++++++++++++- .../algorithms/shortest_paths/dense.py | 20 +++++--- 2 files changed, 60 insertions(+), 9 deletions(-) diff --git a/nx_parallel/algorithms/centrality/closeness.py b/nx_parallel/algorithms/centrality/closeness.py index 37cf2ccf..a73e4d46 100644 --- a/nx_parallel/algorithms/centrality/closeness.py +++ b/nx_parallel/algorithms/centrality/closeness.py @@ -1,10 +1,18 @@ +""" +Closeness centrality measures. 
+""" + +from joblib import Parallel, delayed +import nx_parallel as nxp + __all__ = ["closeness_centrality"] def closeness_centrality( G, u=None, distance=None, wf_improved=True, blocking_factor=None ): - """The parallel implementation first divide the nodes into chunks and + """The parallel implementation of closeness centrality, that use parallel tiled floy warshall to find the + geodesic distance of the node networkx.closeness_centrality : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.centrality.closeness_centrality.html @@ -15,3 +23,42 @@ def closeness_centrality( The default blocking factor is get by finding the optimal value for the core available """ + if G.is_directed(): + G = G.reverse() # create a reversed graph view + + A = nxp.floyd_warshall_numpy(G, blocking_factor=blocking_factor) + + len_G = len(G) + + closeness_dict = Parallel(n_jobs=-1)( + delayed(_closeness_measure(n, wf_improved, len_G))(A) for n in A + ) + + if u is not None: + return closeness_dict[u] + return closeness_dict + + +def _closeness_measure(n, wf_improved, len_G): + """calculate the closeness centrality measure of one node using the row of edges i + + Parameters + ---------- + n : 1D numpy.ndarray + the array of distance from every other node + + Returns + ------- + k : numebr + the closeness value for the selected node + """ + totsp = sum(n) + closeness_value = 0.0 + if totsp > 0.0 and len_G > 1: + closeness_value = (len(n) - 1.0) / totsp + # normalize to number of nodes-1 in connected part + if wf_improved: + s = (len(n) - 1.0) / (len_G - 1) + closeness_value *= s + + return closeness_value diff --git a/nx_parallel/algorithms/shortest_paths/dense.py b/nx_parallel/algorithms/shortest_paths/dense.py index 6c826507..0bb30003 100644 --- a/nx_parallel/algorithms/shortest_paths/dense.py +++ b/nx_parallel/algorithms/shortest_paths/dense.py @@ -12,8 +12,8 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None): """ - Parallel implementation of Floyd warshall using the tiled floyd warshall algorithm from - 'All-Pairs Shortest-Paths for Large Graphs on the GPU, Authors: Gary J. Katz and Joseph T. Kider Jr' + Parallel implementation of Floyd warshall using the tiled floyd warshall algorithm [1]_. + networkx.floyd_warshall_numpy : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.shortest_paths.dense.floyd_warshall_numpy.html @@ -23,6 +23,12 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None The number used for divinding the adjacency matrix in sub-matrix. The default blocking factor is get by finding the optimal value for the core available + + References + ---------- + .. [1] Gary J. Katz and Joseph T. Kider Jr: + All-Pairs Shortest-Paths for Large Graphs on the GPU, 2008. 
+ """ if nodelist is not None: @@ -65,7 +71,7 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None block_coord = _block_range(blocking_factor, block) params.append((block_coord, k)) params.append((k, block_coord)) - Parallel(n_jobs=(no_of_primary - 1) * 2, require="sharedmem")( + A = Parallel(n_jobs=(no_of_primary - 1) * 2, require="sharedmem")( delayed(_partial_floyd_warshall_numpy)(A, k, i, j) for (i, j) in params ) # Phase 3: Compute remaining @@ -85,7 +91,7 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None i_range = _block_range(blocking_factor, block_i) j_range = _block_range(blocking_factor, block_j) params.append((i_range, j_range)) - Parallel(n_jobs=(no_of_primary - 1) ** 2, require="sharedmem")( + A = Parallel(n_jobs=(no_of_primary - 1) ** 2, require="sharedmem")( delayed(_partial_floyd_warshall_numpy)(A, k, i, j) for (i, j) in params ) @@ -130,12 +136,10 @@ def _block_range(blocking_factor, block): def _calculate_divisor(i, x, y): if x % i == 0: - divisor1 = i - result1 = x // i + divisor1, result2 = i + result1, divisor2 = x // i difference1 = abs((result1 - 1) ** 2 - y) - divisor2 = x // i - result2 = i difference2 = abs((result2 - 1) ** 2 - y) if difference1 < difference2: From 90e10632f2fab9d5669795e05517d3a2e3a10427 Mon Sep 17 00:00:00 2001 From: Marco Galeri Date: Wed, 31 Jul 2024 16:26:18 +0200 Subject: [PATCH 08/19] feat: :wrench: add closeness alg into interface.py and __init__.py --- nx_parallel/algorithms/shortest_paths/__init__.py | 1 + nx_parallel/interface.py | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/nx_parallel/algorithms/shortest_paths/__init__.py b/nx_parallel/algorithms/shortest_paths/__init__.py index 342518c0..35cbaeac 100644 --- a/nx_parallel/algorithms/shortest_paths/__init__.py +++ b/nx_parallel/algorithms/shortest_paths/__init__.py @@ -1,3 +1,4 @@ from .generic import * from .weighted import * from .unweighted import * +from .dense import * diff --git a/nx_parallel/interface.py b/nx_parallel/interface.py index 0c409731..52af98bb 100644 --- a/nx_parallel/interface.py +++ b/nx_parallel/interface.py @@ -3,6 +3,7 @@ betweenness_centrality, edge_betweenness_centrality, ) +from nx_parallel.algorithms.centrality.closeness import closeness_centrality from nx_parallel.algorithms.shortest_paths.generic import all_pairs_all_shortest_paths from nx_parallel.algorithms.shortest_paths.weighted import ( all_pairs_dijkstra, @@ -16,6 +17,7 @@ all_pairs_shortest_path, all_pairs_shortest_path_length, ) +from nx_parallel.algorithms.shortest_paths.dense import floyd_warshall_numpy from nx_parallel.algorithms.efficiency_measures import local_efficiency from nx_parallel.algorithms.isolate import number_of_isolates from nx_parallel.algorithms.tournament import ( @@ -78,6 +80,7 @@ class BackendInterface: # Centrality betweenness_centrality = betweenness_centrality edge_betweenness_centrality = edge_betweenness_centrality + closeness_centrality = closeness_centrality # Efficiency local_efficiency = local_efficiency @@ -93,6 +96,9 @@ class BackendInterface: all_pairs_bellman_ford_path = all_pairs_bellman_ford_path johnson = johnson + # Shortest Paths : dense + floyd_warshall_numpy = floyd_warshall_numpy + # Clustering square_clustering = square_clustering From e88c3b71e4fd219cb7ef4f6a1259ba4970b7d7c2 Mon Sep 17 00:00:00 2001 From: Marco Galeri Date: Fri, 2 Aug 2024 11:17:23 +0200 Subject: [PATCH 09/19] fix: :construction: change parallel graph object into normal graph object --- 
nx_parallel/algorithms/centrality/closeness.py | 3 +++ nx_parallel/algorithms/shortest_paths/dense.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/nx_parallel/algorithms/centrality/closeness.py b/nx_parallel/algorithms/centrality/closeness.py index a73e4d46..69345706 100644 --- a/nx_parallel/algorithms/centrality/closeness.py +++ b/nx_parallel/algorithms/centrality/closeness.py @@ -23,6 +23,9 @@ def closeness_centrality( The default blocking factor is get by finding the optimal value for the core available """ + if hasattr(G, "graph_object"): + G = G.graph_object + if G.is_directed(): G = G.reverse() # create a reversed graph view diff --git a/nx_parallel/algorithms/shortest_paths/dense.py b/nx_parallel/algorithms/shortest_paths/dense.py index 0bb30003..6e1d31e7 100644 --- a/nx_parallel/algorithms/shortest_paths/dense.py +++ b/nx_parallel/algorithms/shortest_paths/dense.py @@ -38,6 +38,9 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None "If you wanted a subgraph of G use G.subgraph(nodelist)" ) + if hasattr(G, "graph_object"): + G = G.graph_object + # To handle cases when an edge has weight=0, we must make sure that # nonedges are not given the value 0 as well. A = nx.to_numpy_array( From d73fd49fee69b3de7c8f5be56af032131406f0dc Mon Sep 17 00:00:00 2001 From: Marco Galeri Date: Sat, 3 Aug 2024 18:10:09 +0200 Subject: [PATCH 10/19] fix: :construction: Fix indexing problem of the matrix, still not working --- .../algorithms/shortest_paths/dense.py | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/nx_parallel/algorithms/shortest_paths/dense.py b/nx_parallel/algorithms/shortest_paths/dense.py index 6e1d31e7..344f91b8 100644 --- a/nx_parallel/algorithms/shortest_paths/dense.py +++ b/nx_parallel/algorithms/shortest_paths/dense.py @@ -30,6 +30,8 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None All-Pairs Shortest-Paths for Large Graphs on the GPU, 2008. """ + if hasattr(G, "graph_object"): + G = G.graph_object if nodelist is not None: if not (len(nodelist) == len(G) == len(set(nodelist))): @@ -38,26 +40,21 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None "If you wanted a subgraph of G use G.subgraph(nodelist)" ) - if hasattr(G, "graph_object"): - G = G.graph_object - # To handle cases when an edge has weight=0, we must make sure that # nonedges are not given the value 0 as well. 
A = nx.to_numpy_array( G, nodelist, multigraph_weight=min, weight=weight, nonedge=np.inf ) - n, m = A.shape - matrix_len = m * n + n, _ = A.shape total_cores = nxp.cpu_count() if blocking_factor is None: blocking_factor, is_prime = _find_nearest_divisor(n, total_cores) - - no_of_primary = matrix_len / blocking_factor + no_of_primary = n // blocking_factor for primary_block in range(no_of_primary): - k_start = (primary_block * matrix_len) // no_of_primary - k_end = k_start + (matrix_len // no_of_primary) - 1 + k_start = (primary_block * n) // no_of_primary + k_end = k_start + (n // no_of_primary) - 1 k = (k_start, k_end) # Phase 1: Compute Primary block # Execute Normal floyd warshall for the primary block submatrix @@ -134,13 +131,13 @@ def _partial_floyd_warshall_numpy(A, k_iteration, i_iteration, j_iteration): def _block_range(blocking_factor, block): - return (block * blocking_factor, (block + 1) * blocking_factor) + return (block * blocking_factor, (block + 1) * blocking_factor - 1) def _calculate_divisor(i, x, y): if x % i == 0: - divisor1, result2 = i - result1, divisor2 = x // i + divisor1 = result2 = i + result1 = divisor2 = x // i difference1 = abs((result1 - 1) ** 2 - y) difference2 = abs((result2 - 1) ** 2 - y) From ee252ff93239f759657d0ba10eab345b41575f5e Mon Sep 17 00:00:00 2001 From: Marco Galeri Date: Sun, 4 Aug 2024 13:03:18 +0200 Subject: [PATCH 11/19] fix: :construction: fix matrix shape problem still give numpy.float64 object is not callable on closeness.py --- nx_parallel/algorithms/shortest_paths/dense.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/nx_parallel/algorithms/shortest_paths/dense.py b/nx_parallel/algorithms/shortest_paths/dense.py index 344f91b8..0557138a 100644 --- a/nx_parallel/algorithms/shortest_paths/dense.py +++ b/nx_parallel/algorithms/shortest_paths/dense.py @@ -58,7 +58,8 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None k = (k_start, k_end) # Phase 1: Compute Primary block # Execute Normal floyd warshall for the primary block submatrix - A = _partial_floyd_warshall_numpy(A, k, k, k) + _partial_floyd_warshall_numpy(A, k, k, k) + print(A.shape) # Phase 2: Compute Cross block params = [] for block in range(no_of_primary): @@ -71,7 +72,8 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None block_coord = _block_range(blocking_factor, block) params.append((block_coord, k)) params.append((k, block_coord)) - A = Parallel(n_jobs=(no_of_primary - 1) * 2, require="sharedmem")( + print(params) + Parallel(n_jobs=(no_of_primary - 1) * 2, require="sharedmem")( delayed(_partial_floyd_warshall_numpy)(A, k, i, j) for (i, j) in params ) # Phase 3: Compute remaining @@ -91,7 +93,7 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None i_range = _block_range(blocking_factor, block_i) j_range = _block_range(blocking_factor, block_j) params.append((i_range, j_range)) - A = Parallel(n_jobs=(no_of_primary - 1) ** 2, require="sharedmem")( + Parallel(n_jobs=(no_of_primary - 1) ** 2, require="sharedmem")( delayed(_partial_floyd_warshall_numpy)(A, k, i, j) for (i, j) in params ) @@ -122,12 +124,10 @@ def _partial_floyd_warshall_numpy(A, k_iteration, i_iteration, j_iteration): A : 2D numpy.ndarray adjacency matrix updated after floyd warshall """ - - for k in range(k_iteration[0], k_iteration[1] + 1): - for i in range(i_iteration[0], i_iteration[1] + 1): - for j in range(j_iteration[0], j_iteration[1] + 1): + for k in range(k_iteration[0], 
k_iteration[1]): + for i in range(i_iteration[0], i_iteration[1]): + for j in range(j_iteration[0], j_iteration[1]): A[i][j] = np.minimum(A[i][j], (A[i][k] + A[k][j])) - return A def _block_range(blocking_factor, block): From 1e68ae4b92edea36aa28884fb5c17f04bc1a1de2 Mon Sep 17 00:00:00 2001 From: Marco Galeri Date: Fri, 9 Aug 2024 00:09:42 +0200 Subject: [PATCH 12/19] fix: :bug: Fix Floyd Warshall Tiling, functioning --- .../algorithms/shortest_paths/dense.py | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/nx_parallel/algorithms/shortest_paths/dense.py b/nx_parallel/algorithms/shortest_paths/dense.py index 0557138a..746f25f3 100644 --- a/nx_parallel/algorithms/shortest_paths/dense.py +++ b/nx_parallel/algorithms/shortest_paths/dense.py @@ -35,7 +35,7 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None if nodelist is not None: if not (len(nodelist) == len(G) == len(set(nodelist))): - raise nxp.NetworkXError( + raise nx.NetworkXError( "nodelist must contain every node in G with no repeats." "If you wanted a subgraph of G use G.subgraph(nodelist)" ) @@ -46,6 +46,7 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None G, nodelist, multigraph_weight=min, weight=weight, nonedge=np.inf ) n, _ = A.shape + np.fill_diagonal(A, 0) # diagonal elements should be zero total_cores = nxp.cpu_count() if blocking_factor is None: @@ -55,24 +56,27 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None for primary_block in range(no_of_primary): k_start = (primary_block * n) // no_of_primary k_end = k_start + (n // no_of_primary) - 1 + if is_prime and primary_block == no_of_primary - 1: + k_end = k_end + (n % no_of_primary) k = (k_start, k_end) # Phase 1: Compute Primary block # Execute Normal floyd warshall for the primary block submatrix _partial_floyd_warshall_numpy(A, k, k, k) - print(A.shape) + # Phase 2: Compute Cross block + params = [] for block in range(no_of_primary): # skip the primary block computed in phase 1 if block != primary_block: # append the actual indices of the matrix by multiply the block number with the blocking factor if is_prime and block == no_of_primary - 1: - block_coord = (block * blocking_factor, n) + block_coord = (block * blocking_factor, n - 1) else: block_coord = _block_range(blocking_factor, block) params.append((block_coord, k)) params.append((k, block_coord)) - print(params) + Parallel(n_jobs=(no_of_primary - 1) * 2, require="sharedmem")( delayed(_partial_floyd_warshall_numpy)(A, k, i, j) for (i, j) in params ) @@ -81,18 +85,16 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None for block_i in range(no_of_primary): for block_j in range(no_of_primary): # skip all block previously computed, so skip every block with primary block value - if block_i != primary_block and block_j != primary_block: + if block_i != primary_block or block_j != primary_block: + i_range = _block_range(blocking_factor, block_i) + j_range = _block_range(blocking_factor, block_j) if is_prime: if block_i == no_of_primary - 1: - i_range = (block * blocking_factor, n) - j_range = _block_range(blocking_factor, block_j) + i_range = (block * blocking_factor, n - 1) if block_j == no_of_primary - 1: - j_range = (block * blocking_factor, n) - i_range = _block_range(blocking_factor, block_i) - else: - i_range = _block_range(blocking_factor, block_i) - j_range = _block_range(blocking_factor, block_j) + j_range = (block * blocking_factor, n - 1) 
params.append((i_range, j_range)) + Parallel(n_jobs=(no_of_primary - 1) ** 2, require="sharedmem")( delayed(_partial_floyd_warshall_numpy)(A, k, i, j) for (i, j) in params ) @@ -124,9 +126,9 @@ def _partial_floyd_warshall_numpy(A, k_iteration, i_iteration, j_iteration): A : 2D numpy.ndarray adjacency matrix updated after floyd warshall """ - for k in range(k_iteration[0], k_iteration[1]): - for i in range(i_iteration[0], i_iteration[1]): - for j in range(j_iteration[0], j_iteration[1]): + for k in range(k_iteration[0], k_iteration[1] + 1): + for i in range(i_iteration[0], i_iteration[1] + 1): + for j in range(j_iteration[0], j_iteration[1] + 1): A[i][j] = np.minimum(A[i][j], (A[i][k] + A[k][j])) From c4e89c0bbe58059d29293723244f243d15b78287 Mon Sep 17 00:00:00 2001 From: Marco Galeri Date: Wed, 14 Aug 2024 22:03:48 +0200 Subject: [PATCH 13/19] feat: :sparkles: Introduce normal floyd_warshall instead of numpy floyd_warshall assertion error on closeness centrality test --- .../algorithms/centrality/closeness.py | 28 ++-- .../algorithms/shortest_paths/dense.py | 151 ++++++++++++++---- nx_parallel/interface.py | 4 +- 3 files changed, 143 insertions(+), 40 deletions(-) diff --git a/nx_parallel/algorithms/centrality/closeness.py b/nx_parallel/algorithms/centrality/closeness.py index 69345706..dc1993c8 100644 --- a/nx_parallel/algorithms/centrality/closeness.py +++ b/nx_parallel/algorithms/centrality/closeness.py @@ -29,20 +29,24 @@ def closeness_centrality( if G.is_directed(): G = G.reverse() # create a reversed graph view - A = nxp.floyd_warshall_numpy(G, blocking_factor=blocking_factor) - + A = nxp.floyd_warshall(G, blocking_factor=blocking_factor) len_G = len(G) - - closeness_dict = Parallel(n_jobs=-1)( - delayed(_closeness_measure(n, wf_improved, len_G))(A) for n in A + if wf_improved: + print("Matrice con wf improved: \n", A) + else: + print("Matrice: \n", A) + key_value_pair = Parallel(n_jobs=-1)( + delayed(_closeness_measure)(k, n, wf_improved, len_G) for k, n in A.items() ) - + closeness_dict = {} + for key, value in key_value_pair: + closeness_dict[key] = value if u is not None: return closeness_dict[u] return closeness_dict -def _closeness_measure(n, wf_improved, len_G): +def _closeness_measure(k, v, wf_improved, len_G): """calculate the closeness centrality measure of one node using the row of edges i Parameters @@ -55,13 +59,15 @@ def _closeness_measure(n, wf_improved, len_G): k : numebr the closeness value for the selected node """ - totsp = sum(n) + n = v.values() + n_reachable = [x for x in n if x != float("inf")] + totsp = sum(n_reachable) closeness_value = 0.0 if totsp > 0.0 and len_G > 1: - closeness_value = (len(n) - 1.0) / totsp + closeness_value = (len(n_reachable) - 1.0) / totsp # normalize to number of nodes-1 in connected part if wf_improved: - s = (len(n) - 1.0) / (len_G - 1) + s = (len(n_reachable) - 1.0) / (len_G - 1) closeness_value *= s - return closeness_value + return k, closeness_value diff --git a/nx_parallel/algorithms/shortest_paths/dense.py b/nx_parallel/algorithms/shortest_paths/dense.py index 746f25f3..dba30b5e 100644 --- a/nx_parallel/algorithms/shortest_paths/dense.py +++ b/nx_parallel/algorithms/shortest_paths/dense.py @@ -1,16 +1,14 @@ """Floyd-Warshall algorithm for shortest paths.""" from joblib import Parallel, delayed import nx_parallel as nxp -import networkx as nx -import numpy as np import math __all__ = [ - "floyd_warshall_numpy", + "floyd_warshall", ] -def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None): +def 
floyd_warshall(G, weight="weight", blocking_factor=None): """ Parallel implementation of Floyd warshall using the tiled floyd warshall algorithm [1]_. @@ -24,6 +22,11 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None The default blocking factor is get by finding the optimal value for the core available + Returns + ------- + A : 2D array + All pairs shortest paths Graph adjacency matrix + References ---------- .. [1] Gary J. Katz and Joseph T. Kider Jr: @@ -32,27 +35,26 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None """ if hasattr(G, "graph_object"): G = G.graph_object - - if nodelist is not None: - if not (len(nodelist) == len(G) == len(set(nodelist))): - raise nx.NetworkXError( - "nodelist must contain every node in G with no repeats." - "If you wanted a subgraph of G use G.subgraph(nodelist)" - ) - - # To handle cases when an edge has weight=0, we must make sure that - # nonedges are not given the value 0 as well. - A = nx.to_numpy_array( - G, nodelist, multigraph_weight=min, weight=weight, nonedge=np.inf - ) - n, _ = A.shape - np.fill_diagonal(A, 0) # diagonal elements should be zero + undirected = not G.is_directed() + nodelist = list(G) + A = _adjacency_matrix(G, weight, nodelist, undirected) + n = G.number_of_nodes() total_cores = nxp.cpu_count() if blocking_factor is None: blocking_factor, is_prime = _find_nearest_divisor(n, total_cores) no_of_primary = n // blocking_factor + if no_of_primary <= 1: + return floyd_simple(G, weight) + print( + "blocking factor: ", + blocking_factor, + " number of block: ", + no_of_primary, + " number of core: ", + total_cores, + ) for primary_block in range(no_of_primary): k_start = (primary_block * n) // no_of_primary k_end = k_start + (n // no_of_primary) - 1 @@ -90,16 +92,18 @@ def floyd_warshall_numpy(G, nodelist=None, weight="weight", blocking_factor=None j_range = _block_range(blocking_factor, block_j) if is_prime: if block_i == no_of_primary - 1: - i_range = (block * blocking_factor, n - 1) + i_range = (block_i * blocking_factor, n - 1) if block_j == no_of_primary - 1: - j_range = (block * blocking_factor, n - 1) + j_range = (block_j * blocking_factor, n - 1) params.append((i_range, j_range)) - + # Parallel(n_jobs=(no_of_primary - 1) ** 2, require="sharedmem")( delayed(_partial_floyd_warshall_numpy)(A, k, i, j) for (i, j) in params ) + print("Matrice di adiacenza \n", A) + dist = _matrix_to_dict(A, nodelist) - return A + return dist def _partial_floyd_warshall_numpy(A, k_iteration, i_iteration, j_iteration): @@ -129,7 +133,7 @@ def _partial_floyd_warshall_numpy(A, k_iteration, i_iteration, j_iteration): for k in range(k_iteration[0], k_iteration[1] + 1): for i in range(i_iteration[0], i_iteration[1] + 1): for j in range(j_iteration[0], j_iteration[1] + 1): - A[i][j] = np.minimum(A[i][j], (A[i][k] + A[k][j])) + A[i][j] = min(A[i][j], (A[i][k] + A[k][j])) def _block_range(blocking_factor, block): @@ -140,9 +144,10 @@ def _calculate_divisor(i, x, y): if x % i == 0: divisor1 = result2 = i result1 = divisor2 = x // i - difference1 = abs((result1 - 1) ** 2 - y) - - difference2 = abs((result2 - 1) ** 2 - y) + # difference1 = abs((result1 - 1) ** 2 - y) + difference1 = abs(result1 - y) + # difference2 = abs((result2 - 1) ** 2 - y) + difference2 = abs(result2 - y) if difference1 < difference2: return divisor1, result1, difference1 @@ -161,6 +166,8 @@ def _find_nearest_divisor(x, y): y : cpu core available """ + if x < y: + return 1, False # Find the square root of x sqrt_x = 
int(math.sqrt(x)) + 1 @@ -180,3 +187,93 @@ def _find_nearest_divisor(x, y): best_divisor, _, _ = min(results, key=lambda x: x[2]) return best_divisor, False + + +def _adjacency_matrix(G, weight, nodelist, undirected): + """ + Generate an adjacency python matrix + + Parameters + ---------- + G : graph + The NetworkX graph used to construct the array. + + weight : string or None optional (default = 'weight') + The edge attribute that holds the numerical value used for + the edge weight. If an edge does not have that attribute, then the + value 1 is used instead. + + Returns + ------- + A : 2D array + Graph adjacency matrix + """ + + n = len(nodelist) + # Initialize the adjacency matrix with infinity values + A = [[float("inf") for _ in range(n)] for _ in range(n)] + + # Set diagonal elements to 0 (distance from node to itself) + for i in range(n): + A[i][i] = 0 + + def process_edge(src, dest, attribute, undirected): + src_idx = nodelist.index(src) + dest_idx = nodelist.index(dest) + A[src_idx][dest_idx] = attribute.get(weight, 1.0) + if undirected: + A[dest_idx][src_idx] = attribute.get(weight, 1.0) + + # Parallel processing of edges, modifying A directly + Parallel(n_jobs=-1, require="sharedmem")( + delayed(process_edge)(src, dest, attribute, undirected) + for src, dest, attribute in G.edges(data=True) + ) + return A + + +def _matrix_to_dict(A, nodelist): + """ + Convert a matrix (list of lists) to a dictionary of dictionaries. + + Parameters + ---------- + A : list of lists + The adjacency matrix to be converted. + + Returns + ------- + dist : dict + The resulting dictionary of distance. + """ + dist = {i: {} for i in nodelist} + + def process_row(row, i): + for column, j in enumerate(nodelist): + dist[i][j] = A[row][column] + + # Parallel processing of rows, modifying dist directly + Parallel(n_jobs=-1, require="sharedmem")( + delayed(process_row)(row, i) for row, i in enumerate(nodelist) + ) + + return dist + + +# TODO to remove for floyd warshall serial networkx +def floyd_simple(G, weight="weight"): + if hasattr(G, "graph_object"): + G = G.graph_object + undirected = not G.is_directed() + nodelist = list(G) + A = _adjacency_matrix(G, weight, nodelist, undirected) + n = G.number_of_nodes() + + for k in range(n): + for i in range(n): + for j in range(n): + A[i][j] = min(A[i][j], (A[i][k] + A[k][j])) + + dist = _matrix_to_dict(A, nodelist) + + return dist diff --git a/nx_parallel/interface.py b/nx_parallel/interface.py index 52af98bb..2c88c7b4 100644 --- a/nx_parallel/interface.py +++ b/nx_parallel/interface.py @@ -17,7 +17,7 @@ all_pairs_shortest_path, all_pairs_shortest_path_length, ) -from nx_parallel.algorithms.shortest_paths.dense import floyd_warshall_numpy +from nx_parallel.algorithms.shortest_paths.dense import floyd_warshall from nx_parallel.algorithms.efficiency_measures import local_efficiency from nx_parallel.algorithms.isolate import number_of_isolates from nx_parallel.algorithms.tournament import ( @@ -97,7 +97,7 @@ class BackendInterface: johnson = johnson # Shortest Paths : dense - floyd_warshall_numpy = floyd_warshall_numpy + floyd_warshall = floyd_warshall # Clustering square_clustering = square_clustering From 03071c2a5eb80e01c2780131f00019a7cc009d3f Mon Sep 17 00:00:00 2001 From: Marco Galeri Date: Sun, 18 Aug 2024 19:33:25 +0200 Subject: [PATCH 14/19] fix: :bug: fix bug in the submatrix division function les miserables test closeness --- .../algorithms/centrality/closeness.py | 8 ++-- .../algorithms/shortest_paths/dense.py | 41 +++++++------------ 2 files changed, 19 
insertions(+), 30 deletions(-) diff --git a/nx_parallel/algorithms/centrality/closeness.py b/nx_parallel/algorithms/centrality/closeness.py index dc1993c8..088f7df5 100644 --- a/nx_parallel/algorithms/centrality/closeness.py +++ b/nx_parallel/algorithms/centrality/closeness.py @@ -31,10 +31,7 @@ def closeness_centrality( A = nxp.floyd_warshall(G, blocking_factor=blocking_factor) len_G = len(G) - if wf_improved: - print("Matrice con wf improved: \n", A) - else: - print("Matrice: \n", A) + key_value_pair = Parallel(n_jobs=-1)( delayed(_closeness_measure)(k, n, wf_improved, len_G) for k, n in A.items() ) @@ -60,8 +57,11 @@ def _closeness_measure(k, v, wf_improved, len_G): the closeness value for the selected node """ n = v.values() + # print(n) n_reachable = [x for x in n if x != float("inf")] + # print(n_reachable,len(n_reachable)) totsp = sum(n_reachable) + # print(totsp) closeness_value = 0.0 if totsp > 0.0 and len_G > 1: closeness_value = (len(n_reachable) - 1.0) / totsp diff --git a/nx_parallel/algorithms/shortest_paths/dense.py b/nx_parallel/algorithms/shortest_paths/dense.py index dba30b5e..4dae6bf5 100644 --- a/nx_parallel/algorithms/shortest_paths/dense.py +++ b/nx_parallel/algorithms/shortest_paths/dense.py @@ -41,12 +41,16 @@ def floyd_warshall(G, weight="weight", blocking_factor=None): n = G.number_of_nodes() total_cores = nxp.cpu_count() + print( + "number of nodes: ", + n, + " number of core: ", + total_cores, + ) if blocking_factor is None: blocking_factor, is_prime = _find_nearest_divisor(n, total_cores) no_of_primary = n // blocking_factor - if no_of_primary <= 1: - return floyd_simple(G, weight) print( "blocking factor: ", blocking_factor, @@ -62,9 +66,10 @@ def floyd_warshall(G, weight="weight", blocking_factor=None): k_end = k_end + (n % no_of_primary) k = (k_start, k_end) # Phase 1: Compute Primary block + # print("\n\niteration:",primary_block,"\n\n") # Execute Normal floyd warshall for the primary block submatrix _partial_floyd_warshall_numpy(A, k, k, k) - + # print("After phase 1 - it",primary_block,":\n",A) # Phase 2: Compute Cross block params = [] @@ -82,6 +87,8 @@ def floyd_warshall(G, weight="weight", blocking_factor=None): Parallel(n_jobs=(no_of_primary - 1) * 2, require="sharedmem")( delayed(_partial_floyd_warshall_numpy)(A, k, i, j) for (i, j) in params ) + # print("phase 2, coordinate", params) + # print("After phase 2 - it",primary_block,":\n",A) # Phase 3: Compute remaining params.clear() for block_i in range(no_of_primary): @@ -96,11 +103,12 @@ def floyd_warshall(G, weight="weight", blocking_factor=None): if block_j == no_of_primary - 1: j_range = (block_j * blocking_factor, n - 1) params.append((i_range, j_range)) - # Parallel(n_jobs=(no_of_primary - 1) ** 2, require="sharedmem")( delayed(_partial_floyd_warshall_numpy)(A, k, i, j) for (i, j) in params ) - print("Matrice di adiacenza \n", A) + # print("phase 3, coordinate", params) + # print("After phase 3 - it",primary_block,":\n",A) + # print("Matrice di adiacenza \n", A) dist = _matrix_to_dict(A, nodelist) return dist @@ -173,13 +181,13 @@ def _find_nearest_divisor(x, y): # Execute the calculation in parallel results = Parallel(n_jobs=-1)( - delayed(_calculate_divisor)(i, x, y) for i in range(1, sqrt_x) + delayed(_calculate_divisor)(i, x, y) for i in range(2, sqrt_x) ) # Filter out None results results = [r for r in results if r is not None] - if len(results) <= 1: + if len(results) <= 0: # This check if a number is prime, although repeat process with a non prime number best_divisor, _ = 
_find_nearest_divisor(x - 1, y) return best_divisor, True @@ -258,22 +266,3 @@ def process_row(row, i): ) return dist - - -# TODO to remove for floyd warshall serial networkx -def floyd_simple(G, weight="weight"): - if hasattr(G, "graph_object"): - G = G.graph_object - undirected = not G.is_directed() - nodelist = list(G) - A = _adjacency_matrix(G, weight, nodelist, undirected) - n = G.number_of_nodes() - - for k in range(n): - for i in range(n): - for j in range(n): - A[i][j] = min(A[i][j], (A[i][k] + A[k][j])) - - dist = _matrix_to_dict(A, nodelist) - - return dist From 6e5932cf88f5fe45540e8088e5d41e35cc3fd577 Mon Sep 17 00:00:00 2001 From: Marco Galeri Date: Mon, 19 Aug 2024 18:04:26 +0200 Subject: [PATCH 15/19] fix: :construction: fix weight problem Fix weight problem for optional unweighted graphs but it's painfully slow --- nx_parallel/algorithms/centrality/closeness.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nx_parallel/algorithms/centrality/closeness.py b/nx_parallel/algorithms/centrality/closeness.py index 088f7df5..764d31cf 100644 --- a/nx_parallel/algorithms/centrality/closeness.py +++ b/nx_parallel/algorithms/centrality/closeness.py @@ -29,7 +29,7 @@ def closeness_centrality( if G.is_directed(): G = G.reverse() # create a reversed graph view - A = nxp.floyd_warshall(G, blocking_factor=blocking_factor) + A = nxp.floyd_warshall(G, weight=distance, blocking_factor=blocking_factor) len_G = len(G) key_value_pair = Parallel(n_jobs=-1)( From 020eaad45b38e826c977c444165f24111b9034c4 Mon Sep 17 00:00:00 2001 From: Marco Galeri Date: Mon, 23 Sep 2024 17:01:11 +0200 Subject: [PATCH 16/19] style :art: fix format style --- _nx_parallel/__init__.py | 15 +++++++++++ .../algorithms/shortest_paths/dense.py | 26 +++---------------- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/_nx_parallel/__init__.py b/_nx_parallel/__init__.py index 449682ce..05547c6e 100644 --- a/_nx_parallel/__init__.py +++ b/_nx_parallel/__init__.py @@ -90,6 +90,13 @@ def get_info(): 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks." }, }, + "closeness_centrality": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/closeness.py#L11", + "additional_docs": "The parallel implementation of closeness centrality, that use parallel tiled floy warshall to find the geodesic distance of the node", + "additional_parameters": { + "blocking_factor : number": "The number used for divinding the adjacency matrix in sub-matrix. The default blocking factor is get by finding the optimal value for the core available" + }, + }, "closeness_vitality": { "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/vitality.py#L10", "additional_docs": "The parallel computation is implemented only when the node is not specified. The closeness vitality for each node is computed concurrently.", @@ -104,6 +111,14 @@ def get_info(): 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks." 
}, }, + "floyd_warshall": { + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/dense.py#L11", + "additional_docs": "Parallel implementation of Floyd warshall using the tiled floyd warshall algorithm [1]_.", + "additional_parameters": { + "blocking_factor : number": "The number used for divinding the adjacency matrix in sub-matrix. The default blocking factor is get by finding the optimal value for the core available", + "A : 2D array": "All pairs shortest paths Graph adjacency matrix", + }, + }, "is_reachable": { "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/tournament.py#L13", "additional_docs": "The function parallelizes the calculation of two neighborhoods of vertices in `G` and checks closure conditions for each neighborhood subset in parallel.", diff --git a/nx_parallel/algorithms/shortest_paths/dense.py b/nx_parallel/algorithms/shortest_paths/dense.py index 4dae6bf5..dcc02f9a 100644 --- a/nx_parallel/algorithms/shortest_paths/dense.py +++ b/nx_parallel/algorithms/shortest_paths/dense.py @@ -40,25 +40,12 @@ def floyd_warshall(G, weight="weight", blocking_factor=None): A = _adjacency_matrix(G, weight, nodelist, undirected) n = G.number_of_nodes() - total_cores = nxp.cpu_count() - print( - "number of nodes: ", - n, - " number of core: ", - total_cores, - ) + total_cores = nxp.get_n_jobs() + if blocking_factor is None: blocking_factor, is_prime = _find_nearest_divisor(n, total_cores) no_of_primary = n // blocking_factor - print( - "blocking factor: ", - blocking_factor, - " number of block: ", - no_of_primary, - " number of core: ", - total_cores, - ) for primary_block in range(no_of_primary): k_start = (primary_block * n) // no_of_primary k_end = k_start + (n // no_of_primary) - 1 @@ -66,10 +53,9 @@ def floyd_warshall(G, weight="weight", blocking_factor=None): k_end = k_end + (n % no_of_primary) k = (k_start, k_end) # Phase 1: Compute Primary block - # print("\n\niteration:",primary_block,"\n\n") # Execute Normal floyd warshall for the primary block submatrix _partial_floyd_warshall_numpy(A, k, k, k) - # print("After phase 1 - it",primary_block,":\n",A) + # Phase 2: Compute Cross block params = [] @@ -87,8 +73,7 @@ def floyd_warshall(G, weight="weight", blocking_factor=None): Parallel(n_jobs=(no_of_primary - 1) * 2, require="sharedmem")( delayed(_partial_floyd_warshall_numpy)(A, k, i, j) for (i, j) in params ) - # print("phase 2, coordinate", params) - # print("After phase 2 - it",primary_block,":\n",A) + # Phase 3: Compute remaining params.clear() for block_i in range(no_of_primary): @@ -106,9 +91,6 @@ def floyd_warshall(G, weight="weight", blocking_factor=None): Parallel(n_jobs=(no_of_primary - 1) ** 2, require="sharedmem")( delayed(_partial_floyd_warshall_numpy)(A, k, i, j) for (i, j) in params ) - # print("phase 3, coordinate", params) - # print("After phase 3 - it",primary_block,":\n",A) - # print("Matrice di adiacenza \n", A) dist = _matrix_to_dict(A, nodelist) return dist From 6ecb2e8e01a1cf264c5e5a043c2bd282b8795d2b Mon Sep 17 00:00:00 2001 From: Aditi Juneja <91629733+Schefflera-Arboricola@users.noreply.github.com> Date: Fri, 27 Sep 2024 18:51:34 +0530 Subject: [PATCH 17/19] sorry missed closeness_centrality --- nx_parallel/interface.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nx_parallel/interface.py b/nx_parallel/interface.py index 5f3f1b5e..79cdbb4b 100644 --- a/nx_parallel/interface.py +++ b/nx_parallel/interface.py @@ -18,6 +18,7 @@ # Centrality "betweenness_centrality", 
"edge_betweenness_centrality", + "closeness_centrality", # Efficiency "local_efficiency", # Shortest Paths : generic From 0733949d4469c173b6a5b920cff7893b3c9c97f6 Mon Sep 17 00:00:00 2001 From: Aditi Juneja <91629733+Schefflera-Arboricola@users.noreply.github.com> Date: Fri, 27 Sep 2024 18:53:43 +0530 Subject: [PATCH 18/19] Updating __init__.py --- nx_parallel/algorithms/centrality/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nx_parallel/algorithms/centrality/__init__.py b/nx_parallel/algorithms/centrality/__init__.py index cf7adb68..ae7abf03 100644 --- a/nx_parallel/algorithms/centrality/__init__.py +++ b/nx_parallel/algorithms/centrality/__init__.py @@ -1 +1,2 @@ from .betweenness import * +from .closeness import * From 1048683b7a299d0dd49146bfae04b8966661a1f4 Mon Sep 17 00:00:00 2001 From: Marco Galeri Date: Fri, 27 Sep 2024 20:22:45 +0200 Subject: [PATCH 19/19] style: :green_heart: fix style --- _nx_parallel/__init__.py | 2 +- nx_parallel/algorithms/shortest_paths/dense.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/_nx_parallel/__init__.py b/_nx_parallel/__init__.py index d1576784..96bc66c5 100644 --- a/_nx_parallel/__init__.py +++ b/_nx_parallel/__init__.py @@ -112,7 +112,7 @@ def get_info(): }, }, "floyd_warshall": { - "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/dense.py#L11", + "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/dense.py#L12", "additional_docs": "Parallel implementation of Floyd warshall using the tiled floyd warshall algorithm [1]_.", "additional_parameters": { "blocking_factor : number": "The number used for divinding the adjacency matrix in sub-matrix. The default blocking factor is get by finding the optimal value for the core available", diff --git a/nx_parallel/algorithms/shortest_paths/dense.py b/nx_parallel/algorithms/shortest_paths/dense.py index dcc02f9a..a8bbbaf1 100644 --- a/nx_parallel/algorithms/shortest_paths/dense.py +++ b/nx_parallel/algorithms/shortest_paths/dense.py @@ -1,4 +1,5 @@ """Floyd-Warshall algorithm for shortest paths.""" + from joblib import Parallel, delayed import nx_parallel as nxp import math