Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/dhnx/gistools/connect_points.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,8 @@ def process_geometry(
# Convert all MultiLineStrings to LineStrings
check_geometry_type(lines_all, types=["LineString"])

go.drop_detours(lines_all)

# ## check for near points
go.check_double_points(points_all, id_column="id_full")

Expand Down
153 changes: 152 additions & 1 deletion src/dhnx/gistools/geometry_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
SPDX-License-Identifier: MIT
"""

import math

try:
import geopandas as gpd

except ImportError:
print("Need to install geopandas to process geometry data.")

import networkx as nx

try:
import shapely
from shapely import wkt
Expand Down Expand Up @@ -629,6 +632,154 @@ def drop_parallel_lines(gdf):
return gdf


def drop_detours(lines_all):
"""Keep only lines that are the shortest connections between two points.
"""
graph = nx.Graph()
graph.add_edges_from([
(a, b, {"weight": length})
for a, b, length
in zip(
lines_all["from_node"],
lines_all["to_node"],
lines_all["length"],
)
])

# identify rows contaning "detour" lines
lines_to_drop = []
for row, line in lines_all.iterrows():
if line["length"] > nx.shortest_path_length(
graph,
source=line["from_node"],
target=line["to_node"],
weight="weight",
):
lines_to_drop.append(row)

lines_all.drop(lines_to_drop, inplace=True)

return lines_all


def simplify_graph(
graph: nx.Graph,
) -> bool:
graph_was_updated = False
graph_needs_iteration = True
while graph_needs_iteration:
graph_needs_iteration = False
detours_dropped = _drop_detours(graph)
forks_removed = _remove_useless_forks(graph)

# if something changed, we need a new iteration
graph_needs_iteration = detours_dropped or forks_removed

# graph was updated if new iteration is needed or it was updated before
graph_was_updated = graph_needs_iteration or graph_was_updated

return graph_was_updated


def annotate_distance(
graph: nx.Graph,
) -> None:
"""Inefficient algorithm that doesthe job."""
for source in list(graph.nodes()):
source_type = graph.nodes[source]["type"]
for target in list(graph.nodes()):
target_type = graph.nodes[target]["type"]
if source_type != target_type:
source_distance = graph.nodes[source].get("distance", math.inf)
target_distance = graph.nodes[target].get("distance", math.inf)
path_length = nx.shortest_path_length(
graph,
source=source,
target=target,
weight="weight",
)
graph.nodes[source]["distance"] = min(
path_length, source_distance
)
graph.nodes[target]["distance"] = min(
path_length, target_distance
)


def longest_distance(
graph: nx.Graph,
) -> float:
_longest_distance = 0.0
for source in list(graph.nodes()):
if graph.nodes[source]['type'] != "fork":
for target in list(graph.nodes()):
if graph.nodes[target]['type'] != "fork":
_longest_distance = max(
_longest_distance,
nx.shortest_path_length(
graph,
source=source,
target=target,
weight="weight",
),
)
return _longest_distance


def _drop_detours(
graph: nx.Graph,
) -> bool:
graph_was_updated = False
for (source, target) in list(graph.edges()):
edge_weight = graph[source][target]["weight"]
if edge_weight > nx.shortest_path_length(
graph,
source=source,
target=target,
weight="weight",
):
graph.remove_edge(source, target)
graph_was_updated = True

return graph_was_updated


def _remove_useless_forks(
graph: nx.Graph,
) -> bool:
"""Removes forks that only connect two lines as well as dead ends.

You need to iterate to also remove forks
that connected dead ends to meaningful lines.
"""
graph_was_updated = False
for node in list(graph.nodes()):
if graph.nodes[node]['type'] == "fork":
if graph.degree(node) == 1:
graph.remove_node(node)
graph_was_updated = True
elif graph.degree(node) == 2:
edges = list(graph.edges(node))
edge_weight = (
graph[edges[0][0]][edges[0][1]]["weight"]
+ graph[edges[1][0]][edges[1][1]]["weight"]
)
via = (
graph[edges[0][0]][edges[0][1]].get("via", [edges[0]])
+ graph[edges[1][0]][edges[1][1]].get("via", [edges[1]])
)

graph.add_edge(
edges[0][1],
edges[1][1],
weight=edge_weight,
via=via,
)
graph.remove_node(node)
graph_was_updated = True
return graph_was_updated


def check_crs(gdf, crs=4647, force_2d=True):
"""Convert CRS to EPSG:4647 - ETRS89 / UTM zone 32N (zE-N).

Expand Down
73 changes: 73 additions & 0 deletions tests/test_gistools.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"""

import geopandas as gpd
import networkx as nx
from shapely.geometry import LineString
from shapely.geometry import MultiLineString
from shapely.geometry import Point
Expand All @@ -35,3 +36,75 @@
results = go.split_multilinestr_to_linestr(gdf_line)
assert gdf_line.geometry.length.sum() == results.length.sum()
assert len(results.index) == 7


def test_drop_detours():
edgelist = [
(0, 1, {"weight": 5}),
(1, 2, {"weight": 2}),
(2, 0, {"weight": 2}),
]
graph = nx.Graph(edgelist)

go._drop_detours(graph)

# longer connection with direct edge has been dropped
assert list(graph.edges()) == [(0, 2), (1, 2)]

nodelist = [

Check failure on line 54 in tests/test_gistools.py

View workflow job for this annotation

GitHub Actions / Flake8

tests/test_gistools.py#L54

Expected 2 blank lines after class or function definition, found 1 (E305)
(0, {"type": "fork"}),
(1, {"type": "fork"}),
(2, {"type": "fork"}),
(3, {"type": "fork"}),
(4, {"type": "fork"}),
("s1", {"type": "sink"}),
("s2", {"type": "sink"}),
]
edgelist = [
("s1", 0, {"weight": 1}),
(0, 1, {"weight": 5}),
(1, 2, {"weight": 2}),
(2, 4, {"weight": 7}),
(2, 3, {"weight": 2}),
("s2", 3, {"weight": 1}),
]


def test_annotate_distance():
graph = nx.Graph()
graph.add_nodes_from(nodelist)
graph.add_edges_from(edgelist)
go.annotate_distance(graph)

distances = {
0: 1,
1: 5,
2: 3,
3: 1,
4: 10,
"s1": 1,
"s2": 1,
}
node_list = list(graph.nodes())
for node in node_list:
assert graph.nodes[node]["distance"] == distances[node]


def test_longest_distance():
graph = nx.Graph()
graph.add_nodes_from(nodelist)
graph.add_edges_from(edgelist)

assert go.longest_distance(graph) == 5 + 2 + 2 + 1 + 1


def test_remove_useless_forks():
graph = nx.Graph()
graph.add_nodes_from(nodelist)
graph.add_edges_from(edgelist)
graph_was_updated = go.simplify_graph(graph)

assert graph_was_updated
assert list(graph.edges()) == [("s1", "s2")]
assert len(graph["s1"]["s2"]["via"]) == 5
assert graph["s1"]["s2"]["weight"] == 1 + 5 + 2 + 2 + 1
Loading