diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 3cf48d3..631256f 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,13 +1,9 @@
repos:
- - repo: https://github.com/charliermarsh/ruff-pre-commit
- rev: v0.8.0
- hooks:
- - id: ruff
- - id: ruff-format
-
- - repo: https://github.com/pre-commit/mirrors-mypy
- rev: "v1.13.0"
- hooks:
- - id: mypy
- additional_dependencies:
- [matplotlib, pandas-stubs, pytest, types-requests]
+ - repo: local
+ hooks:
+ - id: pixi-check
+ name: Run pixi checks
+ entry: pixi run -e dev check
+ language: system
+ pass_filenames: false
+ always_run: true
diff --git a/CLAUDE.md b/CLAUDE.md
new file mode 100644
index 0000000..07804a9
--- /dev/null
+++ b/CLAUDE.md
@@ -0,0 +1,17 @@
+# Project Context
+
+When working with this codebase, prioritize readability over cleverness. Ask clarifying questions before making architectural changes.
+
+## Project overview
+
+Mappymatch is a python package used to match a series of GPS waypoints (Trace) to a road network.
+
+
+## Common Commands
+
+### running full check (test, types, lint, format)
+
+```
+pixi run -e dev check
+```
+
diff --git a/mappymatch/maps/nx/nx_map.py b/mappymatch/maps/nx/nx_map.py
index 6f330e4..9ea4867 100644
--- a/mappymatch/maps/nx/nx_map.py
+++ b/mappymatch/maps/nx/nx_map.py
@@ -209,6 +209,7 @@ def from_geofence(
network_type: NetworkType = NetworkType.DRIVE,
custom_filter: Optional[str] = None,
additional_metadata_keys: Optional[set | list] = None,
+ filter_to_largest_component: bool = True,
) -> NxMap:
"""
Read an OSM network graph into a NxMap
@@ -218,7 +219,9 @@ def from_geofence(
xy: whether to use xy coordinates or lat/lon
network_type: the network type to use for the graph
custom_filter: a custom filter to pass to osmnx like '["highway"~"motorway|primary"]'
- additional_metadata_keys: additional keys to preserve in road metadata like '["maxspeed", "highway"]
+ additional_metadata_keys: additional keys to preserve in road metadata like '["maxspeed", "highway"]'
+ filter_to_largest_component: if True, keep only the largest strongly connected component;
+ if False, keep all components (may result in routing failures between disconnected components)
Returns:
a NxMap
@@ -237,6 +240,7 @@ def from_geofence(
xy=xy,
custom_filter=custom_filter,
additional_metadata_keys=additional_metadata_keys,
+ filter_to_largest_component=filter_to_largest_component,
)
return NxMap(nx_graph)
@@ -374,12 +378,17 @@ def shortest_path(
else:
dest_id = dest_road.road_id.end
- nx_route = nx.shortest_path(
- self.g,
- origin_id,
- dest_id,
- weight=weight,
- )
+ try:
+ nx_route = nx.shortest_path(
+ self.g,
+ origin_id,
+ dest_id,
+ weight=weight,
+ )
+ except nx.NetworkXNoPath:
+ # No path exists between origin and destination
+ # This can happen when the graph has multiple disconnected components
+ return []
path = []
for i in range(1, len(nx_route)):
diff --git a/mappymatch/maps/nx/readers/osm_readers.py b/mappymatch/maps/nx/readers/osm_readers.py
index 72b7e31..fe1fec7 100644
--- a/mappymatch/maps/nx/readers/osm_readers.py
+++ b/mappymatch/maps/nx/readers/osm_readers.py
@@ -38,6 +38,7 @@ def nx_graph_from_osmnx(
xy: bool = True,
custom_filter: Optional[str] = None,
additional_metadata_keys: Optional[set] = None,
+ filter_to_largest_component: bool = True,
) -> nx.MultiDiGraph:
"""
Build a networkx graph from OSM data
@@ -48,6 +49,8 @@ def nx_graph_from_osmnx(
xy: whether to use xy coordinates or lat/lon
custom_filter: a custom filter to pass to osmnx
additional_metadata_keys: additional keys to preserve in metadata
+ filter_to_largest_component: if True, keep only the largest strongly connected component;
+ if False, keep all components (may result in routing failures between disconnected components)
Returns:
a networkx graph of the OSM network
@@ -68,6 +71,7 @@ def nx_graph_from_osmnx(
network_type,
xy=xy,
additional_metadata_keys=additional_metadata_keys,
+ filter_to_largest_component=filter_to_largest_component,
)
@@ -76,6 +80,7 @@ def parse_osmnx_graph(
network_type: NetworkType,
xy: bool = True,
additional_metadata_keys: Optional[set] = None,
+ filter_to_largest_component: bool = True,
) -> nx.MultiDiGraph:
"""
Parse the raw osmnx graph into a graph that we can use with our NxMap
@@ -85,6 +90,8 @@ def parse_osmnx_graph(
xy: whether to use xy coordinates or lat/lon
network_type: the network type to use for the graph
additional_metadata_keys: additional keys to preserve in metadata
+ filter_to_largest_component: if True, keep only the largest strongly connected component;
+ if False, keep all components (may result in routing failures between disconnected components)
Returns:
a cleaned networkx graph of the OSM network
@@ -107,15 +114,16 @@ def parse_osmnx_graph(
nx.set_edge_attributes(g, kilometers, "kilometers")
# this makes sure there are no graph 'dead-ends'
- sg_components = nx.strongly_connected_components(g)
+ if filter_to_largest_component:
+ sg_components = nx.strongly_connected_components(g)
- if not sg_components:
- raise MapException(
- "road network has no strongly connected components and is not routable; "
- "check polygon boundaries."
- )
+ if not sg_components:
+ raise MapException(
+ "road network has no strongly connected components and is not routable; "
+ "check polygon boundaries."
+ )
- g = nx.MultiDiGraph(g.subgraph(max(sg_components, key=len)))
+ g = nx.MultiDiGraph(g.subgraph(max(sg_components, key=len)))
for u, v, d in g.edges(data=True):
if "geometry" not in d:
diff --git a/mappymatch/matchers/lcss/constructs.py b/mappymatch/matchers/lcss/constructs.py
index e07bea1..d1e2609 100644
--- a/mappymatch/matchers/lcss/constructs.py
+++ b/mappymatch/matchers/lcss/constructs.py
@@ -115,7 +115,7 @@ def score_and_match(
if m < 1:
# todo: find a better way to handle this edge case
raise Exception("traces of 0 points can't be matched")
- elif n < 2:
+ elif n == 0:
# a path was not found for this segment; might not be matchable;
# we set a score of zero and return a set of no-matches
matches = [
diff --git a/mappymatch/matchers/lcss/lcss.py b/mappymatch/matchers/lcss/lcss.py
index bb2626f..7647134 100644
--- a/mappymatch/matchers/lcss/lcss.py
+++ b/mappymatch/matchers/lcss/lcss.py
@@ -1,15 +1,13 @@
import functools as ft
import logging
-from shapely.geometry import Point
-
-from mappymatch.constructs.coordinate import Coordinate
from mappymatch.maps.map_interface import MapInterface
from mappymatch.matchers.lcss.constructs import TrajectorySegment
from mappymatch.matchers.lcss.ops import (
add_matches_for_stationary_points,
drop_stationary_points,
find_stationary_points,
+ join_segment,
new_path,
same_trajectory_scheme,
split_trajectory_segment,
@@ -59,31 +57,6 @@ def __init__(
self.distance_threshold = distance_threshold
def match_trace(self, trace: Trace) -> MatchResult:
- def _join_segment(a: TrajectorySegment, b: TrajectorySegment):
- new_traces = a.trace + b.trace
- new_path = a.path + b.path
-
- # test to see if there is a gap between the paths and if so,
- # try to connect it
- if len(a.path) > 1 and len(b.path) > 1:
- end_road = a.path[-1]
- start_road = b.path[0]
- if end_road.road_id.end != start_road.road_id.start:
- o = Coordinate(
- coordinate_id=None,
- geom=Point(end_road.geom.coords[-1]),
- crs=new_traces.crs,
- )
- d = Coordinate(
- coordinate_id=None,
- geom=Point(start_road.geom.coords[0]),
- crs=new_traces.crs,
- )
- path = self.road_map.shortest_path(o, d)
- new_path = a.path + path + b.path
-
- return TrajectorySegment(new_traces, new_path)
-
stationary_index = find_stationary_points(trace)
sub_trace = drop_stationary_points(trace, stationary_index)
@@ -115,7 +88,7 @@ def _join_segment(a: TrajectorySegment, b: TrajectorySegment):
# split and check the score
new_split = split_trajectory_segment(road_map, scored_segment)
joined_segment = ft.reduce(
- _join_segment, new_split
+ lambda a, b: join_segment(road_map, a, b), new_split
).score_and_match(de, dt)
if joined_segment.score > scored_segment.score:
# we found a better fit
@@ -128,7 +101,9 @@ def _join_segment(a: TrajectorySegment, b: TrajectorySegment):
scheme = next_scheme
- joined_segment = ft.reduce(_join_segment, scheme).score_and_match(de, dt)
+ joined_segment = ft.reduce(
+ lambda a, b: join_segment(road_map, a, b), scheme
+ ).score_and_match(de, dt)
matches = joined_segment.matches
diff --git a/mappymatch/matchers/lcss/ops.py b/mappymatch/matchers/lcss/ops.py
index bd2af7b..be9d69b 100644
--- a/mappymatch/matchers/lcss/ops.py
+++ b/mappymatch/matchers/lcss/ops.py
@@ -2,6 +2,8 @@
from copy import deepcopy
from typing import Any, List, NamedTuple
+from shapely.geometry import Point
+
from mappymatch.constructs.coordinate import Coordinate
from mappymatch.constructs.match import Match
from mappymatch.constructs.road import Road
@@ -16,6 +18,49 @@
log = logging.getLogger(__name__)
+def join_segment(
+ road_map: MapInterface, a: TrajectorySegment, b: TrajectorySegment
+) -> TrajectorySegment:
+ """
+ Join two trajectory segments together, attempting to route between them if needed.
+
+ Args:
+ road_map: The road map to use for routing
+ a: The first trajectory segment
+ b: The second trajectory segment
+
+ Returns:
+ A new trajectory segment combining both segments
+ """
+ new_traces = a.trace + b.trace
+ new_path = a.path + b.path
+
+ # test to see if there is a gap between the paths and if so,
+ # try to connect it
+ if len(a.path) > 0 and len(b.path) > 0:
+ end_road = a.path[-1]
+ start_road = b.path[0]
+ if end_road.road_id.end != start_road.road_id.start:
+ o = Coordinate(
+ coordinate_id=None,
+ geom=Point(end_road.geom.coords[-1]),
+ crs=new_traces.crs,
+ )
+ d = Coordinate(
+ coordinate_id=None,
+ geom=Point(start_road.geom.coords[0]),
+ crs=new_traces.crs,
+ )
+ path = road_map.shortest_path(o, d)
+ # If no path exists (disconnected components), just concatenate the paths
+ if path:
+ new_path = a.path + path + b.path
+ else:
+ new_path = a.path + b.path
+
+ return TrajectorySegment(new_traces, new_path)
+
+
def new_path(
road_map: MapInterface,
trace: Trace,
diff --git a/mappymatch/utils/plot.py b/mappymatch/utils/plot.py
deleted file mode 100644
index 36cf03b..0000000
--- a/mappymatch/utils/plot.py
+++ /dev/null
@@ -1,244 +0,0 @@
-from typing import List, Optional, Union
-
-import folium
-import geopandas as gpd
-import matplotlib.pyplot as plt
-import pandas as pd
-from pyproj import CRS
-from shapely.geometry import Point
-
-from mappymatch.constructs.geofence import Geofence
-from mappymatch.constructs.match import Match
-from mappymatch.constructs.road import Road
-from mappymatch.constructs.trace import Trace
-from mappymatch.maps.nx.nx_map import NxMap
-from mappymatch.matchers.matcher_interface import MatchResult
-from mappymatch.utils.crs import LATLON_CRS, XY_CRS
-
-
-def plot_geofence(geofence: Geofence, m: Optional[folium.Map] = None):
- """
- Plot geofence.
-
- Args:
- geofence: The geofence to plot
- m: the folium map to plot on
-
- Returns:
- The updated folium map with the geofence.
- """
- if not geofence.crs == LATLON_CRS:
- raise NotImplementedError("can currently only plot a geofence with lat lon crs")
-
- if not m:
- c = geofence.geometry.centroid.coords[0]
- m = folium.Map(location=[c[1], c[0]], zoom_start=11)
-
- folium.GeoJson(geofence.geometry).add_to(m)
-
- return m
-
-
-def plot_trace(
- trace: Trace,
- m: Optional[folium.Map] = None,
- point_color: str = "black",
- line_color: Optional[str] = "green",
-):
- """
- Plot a trace.
-
- Args:
- trace: The trace.
- m: the folium map to plot on
- point_color: The color the points will be plotted in.
- line_color: The color for lines. If None, no lines will be plotted.
-
- Returns:
- An updated folium map with a plot of trace.
- """
-
- if not trace.crs == LATLON_CRS:
- trace = trace.to_crs(LATLON_CRS)
-
- if not m:
- mid_coord = trace.coords[int(len(trace) / 2)]
- m = folium.Map(location=[mid_coord.y, mid_coord.x], zoom_start=11)
-
- for i, c in enumerate(trace.coords):
- folium.Circle(
- location=(c.y, c.x),
- radius=5,
- color=point_color,
- tooltip=str(i),
- fill=True,
- fill_opacity=0.8,
- fill_color=point_color,
- ).add_to(m)
-
- if line_color is not None:
- folium.PolyLine([(p.y, p.x) for p in trace.coords], color=line_color).add_to(m)
-
- return m
-
-
-def plot_matches(matches: Union[MatchResult, List[Match]], crs=XY_CRS):
- """
- Plots a trace and the relevant matches on a folium map.
-
- Args:
- matches: A list of matches or a MatchResult.
- crs: what crs to plot in. Defaults to XY_CRS.
-
- Returns:
- A folium map with trace and matches plotted.
- """
- if isinstance(matches, MatchResult):
- matches = matches.matches
-
- def _match_to_road(m):
- """Private function."""
- d = {"road_id": m.road.road_id, "geom": m.road.geom}
- return d
-
- def _match_to_coord(m):
- """Private function."""
- d = {
- "road_id": m.road.road_id,
- "geom": Point(m.coordinate.x, m.coordinate.y),
- "distance": m.distance,
- }
-
- return d
-
- road_df = pd.DataFrame([_match_to_road(m) for m in matches if m.road])
- road_df = road_df.loc[road_df.road_id.shift() != road_df.road_id]
- road_gdf = gpd.GeoDataFrame(road_df, geometry=road_df.geom, crs=crs).drop(
- columns=["geom"]
- )
- road_gdf = road_gdf.to_crs(LATLON_CRS)
-
- coord_df = pd.DataFrame([_match_to_coord(m) for m in matches if m.road])
-
- coord_gdf = gpd.GeoDataFrame(coord_df, geometry=coord_df.geom, crs=crs).drop(
- columns=["geom"]
- )
- coord_gdf = coord_gdf.to_crs(LATLON_CRS)
-
- mid_i = int(len(coord_gdf) / 2)
- mid_coord = coord_gdf.iloc[mid_i].geometry
-
- fmap = folium.Map(location=[mid_coord.y, mid_coord.x], zoom_start=11)
-
- for coord in coord_gdf.itertuples():
- folium.Circle(
- location=(coord.geometry.y, coord.geometry.x),
- radius=5,
- tooltip=f"road_id: {coord.road_id}\ndistance: {coord.distance}",
- ).add_to(fmap)
-
- for road in road_gdf.itertuples():
- folium.PolyLine(
- [(lat, lon) for lon, lat in road.geometry.coords],
- color="red",
- tooltip=road.road_id,
- ).add_to(fmap)
-
- return fmap
-
-
-def plot_map(tmap: NxMap, m: Optional[folium.Map] = None):
- """
- Plot the roads on an NxMap.
-
- Args:
- tmap: The Nxmap to plot.
- m: the folium map to add to
-
- Returns:
- The folium map with the roads plotted.
- """
-
- # TODO make this generic to all map types, not just NxMap
- roads = list(tmap.g.edges(data=True))
- road_df = pd.DataFrame([r[2] for r in roads])
- gdf = gpd.GeoDataFrame(road_df, geometry=road_df[tmap._geom_key], crs=tmap.crs)
- if gdf.crs != LATLON_CRS:
- gdf = gdf.to_crs(LATLON_CRS)
-
- if not m:
- c = gdf.iloc[int(len(gdf) / 2)].geometry.centroid.coords[0]
- m = folium.Map(location=[c[1], c[0]], zoom_start=11)
-
- for t in gdf.itertuples():
- folium.PolyLine(
- [(lat, lon) for lon, lat in t.geometry.coords],
- color="red",
- ).add_to(m)
-
- return m
-
-
-def plot_match_distances(matches: MatchResult):
- """
- Plot the points deviance from known roads with matplotlib.
-
- Args:
- matches (MatchResult): The coordinates of guessed points in the area in the form of a MatchResult object.
- """
-
- y = [
- m.distance for m in matches.matches
- ] # y contains distances to the expected line for all of the matches which will be plotted on the y-axis.
- x = [
- i for i in range(0, len(y))
- ] # x contains placeholder values for every y value (distance measurement) along the x-axis.
-
- plt.figure(figsize=(15, 7))
- plt.autoscale(enable=True)
- plt.scatter(x, y)
- plt.title("Distance To Nearest Road")
- plt.ylabel("Meters")
- plt.xlabel("Point Along The Path")
- plt.show()
-
-
-def plot_path(
- path: List[Road],
- crs: CRS,
- m: Optional[folium.Map] = None,
- line_color="red",
- line_weight=10,
- line_opacity=0.8,
-):
- """
- Plot a list of roads.
-
- Args:
- path: The path to plot.
- crs: The crs of the path.
- m: The folium map to add to.
- line_color: The color of the line.
- line_weight: The weight of the line.
- line_opacity: The opacity of the line.
- """
- road_df = pd.DataFrame([{"geom": r.geom} for r in path])
- road_gdf = gpd.GeoDataFrame(road_df, geometry=road_df.geom, crs=crs)
- road_gdf = road_gdf.to_crs(LATLON_CRS)
-
- if m is None:
- mid_i = int(len(road_gdf) / 2)
- mid_coord = road_gdf.iloc[mid_i].geometry.coords[0]
-
- m = folium.Map(location=[mid_coord[1], mid_coord[0]], zoom_start=11)
-
- for i, road in enumerate(road_gdf.itertuples()):
- folium.PolyLine(
- [(lat, lon) for lon, lat in road.geometry.coords],
- color=line_color,
- tooltip=i,
- weight=line_weight,
- opacity=line_opacity,
- ).add_to(m)
-
- return m
diff --git a/mappymatch/utils/plot/__init__.py b/mappymatch/utils/plot/__init__.py
new file mode 100644
index 0000000..79fca6c
--- /dev/null
+++ b/mappymatch/utils/plot/__init__.py
@@ -0,0 +1,22 @@
+"""
+Plot module for mappymatch.
+
+This module provides plotting utilities for geofences, traces, matches, maps, paths, and trajectory segments.
+"""
+
+from mappymatch.utils.plot.geofence import plot_geofence
+from mappymatch.utils.plot.map import plot_map
+from mappymatch.utils.plot.matches import plot_match_distances, plot_matches
+from mappymatch.utils.plot.path import plot_path
+from mappymatch.utils.plot.trace import plot_trace
+from mappymatch.utils.plot.trajectory_segment import plot_trajectory_segment
+
+__all__ = [
+ "plot_geofence",
+ "plot_trace",
+ "plot_matches",
+ "plot_match_distances",
+ "plot_map",
+ "plot_path",
+ "plot_trajectory_segment",
+]
diff --git a/mappymatch/utils/plot/geofence.py b/mappymatch/utils/plot/geofence.py
new file mode 100644
index 0000000..e39d626
--- /dev/null
+++ b/mappymatch/utils/plot/geofence.py
@@ -0,0 +1,29 @@
+from typing import Optional
+
+import folium
+
+from mappymatch.constructs.geofence import Geofence
+from mappymatch.utils.crs import LATLON_CRS
+
+
+def plot_geofence(geofence: Geofence, m: Optional[folium.Map] = None):
+ """
+ Plot geofence.
+
+ Args:
+ geofence: The geofence to plot
+ m: the folium map to plot on
+
+ Returns:
+ The updated folium map with the geofence.
+ """
+ if not geofence.crs == LATLON_CRS:
+ raise NotImplementedError("can currently only plot a geofence with lat lon crs")
+
+ if not m:
+ c = geofence.geometry.centroid.coords[0]
+ m = folium.Map(location=[c[1], c[0]], zoom_start=11)
+
+ folium.GeoJson(geofence.geometry).add_to(m)
+
+ return m
diff --git a/mappymatch/utils/plot/map.py b/mappymatch/utils/plot/map.py
new file mode 100644
index 0000000..4fb47ae
--- /dev/null
+++ b/mappymatch/utils/plot/map.py
@@ -0,0 +1,76 @@
+from typing import Optional
+
+import folium
+import geopandas as gpd
+import pandas as pd
+
+from mappymatch.constructs.road import RoadId
+from mappymatch.maps.nx.nx_map import NxMap
+from mappymatch.utils.crs import LATLON_CRS
+
+
+def plot_map(tmap: NxMap, m: Optional[folium.Map] = None, highlight: bool = False):
+ """
+ Plot the roads on an NxMap.
+
+ Args:
+ tmap: The Nxmap to plot.
+ m: the folium map to add to
+ highlight: Whether to enable hover highlighting and popups (default: False)
+
+ Returns:
+ The folium map with the roads plotted.
+ """
+
+ # TODO make this generic to all map types, not just NxMap
+ roads = list(tmap.g.edges(data=True, keys=True))
+ road_data = []
+ for u, v, key, data in roads:
+ road_id = RoadId(start=u, end=v, key=key)
+ data_copy = data.copy()
+ data_copy["road_id"] = road_id
+ road_data.append(data_copy)
+
+ road_df = pd.DataFrame(road_data)
+ gdf = gpd.GeoDataFrame(road_df, geometry=road_df[tmap._geom_key], crs=tmap.crs)
+ if gdf.crs != LATLON_CRS:
+ gdf = gdf.to_crs(LATLON_CRS)
+
+ if not m:
+ c = gdf.iloc[int(len(gdf) / 2)].geometry.centroid.coords[0]
+ m = folium.Map(location=[c[1], c[0]], zoom_start=11)
+
+ # Convert road_id to string for GeoJSON compatibility
+ gdf["road_id_str"] = gdf["road_id"].astype(str)
+
+ # Create GeoJson layer with optional popup and highlighting
+ if highlight:
+ popup = folium.GeoJsonPopup(fields=["road_id_str"])
+ tooltip = folium.GeoJsonTooltip(fields=["road_id_str"])
+ folium.GeoJson(
+ gdf.to_json(),
+ style_function=lambda x: {
+ "color": "red",
+ "weight": 3,
+ "opacity": 0.7,
+ },
+ highlight_function=lambda x: {
+ "color": "yellow",
+ "weight": 6,
+ "opacity": 1.0,
+ },
+ popup=popup,
+ tooltip=tooltip,
+ popup_keep_highlighted=True,
+ ).add_to(m)
+ else:
+ folium.GeoJson(
+ gdf.to_json(),
+ style_function=lambda x: {
+ "color": "red",
+ "weight": 3,
+ "opacity": 0.7,
+ },
+ ).add_to(m)
+
+ return m
diff --git a/mappymatch/utils/plot/matches.py b/mappymatch/utils/plot/matches.py
new file mode 100644
index 0000000..658c4de
--- /dev/null
+++ b/mappymatch/utils/plot/matches.py
@@ -0,0 +1,100 @@
+from typing import List, Union
+
+import folium
+import geopandas as gpd
+import matplotlib.pyplot as plt
+import pandas as pd
+from shapely.geometry import Point
+
+from mappymatch.constructs.match import Match
+from mappymatch.matchers.matcher_interface import MatchResult
+from mappymatch.utils.crs import LATLON_CRS, XY_CRS
+
+
+def plot_matches(matches: Union[MatchResult, List[Match]], crs=XY_CRS):
+ """
+ Plots a trace and the relevant matches on a folium map.
+
+ Args:
+ matches: A list of matches or a MatchResult.
+ crs: what crs to plot in. Defaults to XY_CRS.
+
+ Returns:
+ A folium map with trace and matches plotted.
+ """
+ if isinstance(matches, MatchResult):
+ matches = matches.matches
+
+ def _match_to_road(m):
+ """Private function."""
+ d = {"road_id": m.road.road_id, "geom": m.road.geom}
+ return d
+
+ def _match_to_coord(m):
+ """Private function."""
+ d = {
+ "road_id": m.road.road_id,
+ "geom": Point(m.coordinate.x, m.coordinate.y),
+ "distance": m.distance,
+ }
+
+ return d
+
+ road_df = pd.DataFrame([_match_to_road(m) for m in matches if m.road])
+ road_df = road_df.loc[road_df.road_id.shift() != road_df.road_id]
+ road_gdf = gpd.GeoDataFrame(road_df, geometry=road_df.geom, crs=crs).drop(
+ columns=["geom"]
+ )
+ road_gdf = road_gdf.to_crs(LATLON_CRS)
+
+ coord_df = pd.DataFrame([_match_to_coord(m) for m in matches if m.road])
+
+ coord_gdf = gpd.GeoDataFrame(coord_df, geometry=coord_df.geom, crs=crs).drop(
+ columns=["geom"]
+ )
+ coord_gdf = coord_gdf.to_crs(LATLON_CRS)
+
+ mid_i = int(len(coord_gdf) / 2)
+ mid_coord = coord_gdf.iloc[mid_i].geometry
+
+ fmap = folium.Map(location=[mid_coord.y, mid_coord.x], zoom_start=11)
+
+ for coord in coord_gdf.itertuples():
+ folium.Circle(
+ location=(coord.geometry.y, coord.geometry.x),
+ radius=5,
+ tooltip=f"road_id: {coord.road_id}\ndistance: {coord.distance}",
+ ).add_to(fmap)
+
+ for road in road_gdf.itertuples():
+ folium.PolyLine(
+ [(lat, lon) for lon, lat in road.geometry.coords],
+ color="red",
+ tooltip=road.road_id,
+ ).add_to(fmap)
+
+ return fmap
+
+
+def plot_match_distances(matches: MatchResult):
+ """
+ Plot the points deviance from known roads with matplotlib.
+
+ Args:
+ matches (MatchResult): The coordinates of guessed points in the area in the form of a MatchResult object.
+ """
+
+ y = [
+ m.distance for m in matches.matches
+ ] # y contains distances to the expected line for all of the matches which will be plotted on the y-axis.
+ x = [
+ i for i in range(0, len(y))
+ ] # x contains placeholder values for every y value (distance measurement) along the x-axis.
+
+ plt.figure(figsize=(15, 7))
+ plt.autoscale(enable=True)
+ plt.scatter(x, y)
+ plt.title("Distance To Nearest Road")
+ plt.ylabel("Meters")
+ plt.xlabel("Point Along The Path")
+ plt.show()
diff --git a/mappymatch/utils/plot/path.py b/mappymatch/utils/plot/path.py
new file mode 100644
index 0000000..cf7e6ff
--- /dev/null
+++ b/mappymatch/utils/plot/path.py
@@ -0,0 +1,50 @@
+from typing import List, Optional
+
+import folium
+import geopandas as gpd
+import pandas as pd
+from pyproj import CRS
+
+from mappymatch.constructs.road import Road
+from mappymatch.utils.crs import LATLON_CRS
+
+
+def plot_path(
+ path: List[Road],
+ crs: CRS,
+ m: Optional[folium.Map] = None,
+ line_color="red",
+ line_weight=10,
+ line_opacity=0.8,
+):
+ """
+ Plot a list of roads.
+
+ Args:
+ path: The path to plot.
+ crs: The crs of the path.
+ m: The folium map to add to.
+ line_color: The color of the line.
+ line_weight: The weight of the line.
+ line_opacity: The opacity of the line.
+ """
+ road_df = pd.DataFrame([{"geom": r.geom} for r in path])
+ road_gdf = gpd.GeoDataFrame(road_df, geometry=road_df.geom, crs=crs)
+ road_gdf = road_gdf.to_crs(LATLON_CRS)
+
+ if m is None:
+ mid_i = int(len(road_gdf) / 2)
+ mid_coord = road_gdf.iloc[mid_i].geometry.coords[0]
+
+ m = folium.Map(location=[mid_coord[1], mid_coord[0]], zoom_start=11)
+
+ for i, road in enumerate(road_gdf.itertuples()):
+ folium.PolyLine(
+ [(lat, lon) for lon, lat in road.geometry.coords],
+ color=line_color,
+ tooltip=i,
+ weight=line_weight,
+ opacity=line_opacity,
+ ).add_to(m)
+
+ return m
diff --git a/mappymatch/utils/plot/trace.py b/mappymatch/utils/plot/trace.py
new file mode 100644
index 0000000..00a8571
--- /dev/null
+++ b/mappymatch/utils/plot/trace.py
@@ -0,0 +1,49 @@
+from typing import Optional
+
+import folium
+
+from mappymatch.constructs.trace import Trace
+from mappymatch.utils.crs import LATLON_CRS
+
+
+def plot_trace(
+ trace: Trace,
+ m: Optional[folium.Map] = None,
+ point_color: str = "black",
+ line_color: Optional[str] = "green",
+):
+ """
+ Plot a trace.
+
+ Args:
+ trace: The trace.
+ m: the folium map to plot on
+ point_color: The color the points will be plotted in.
+ line_color: The color for lines. If None, no lines will be plotted.
+
+ Returns:
+ An updated folium map with a plot of trace.
+ """
+
+ if not trace.crs == LATLON_CRS:
+ trace = trace.to_crs(LATLON_CRS)
+
+ if not m:
+ mid_coord = trace.coords[int(len(trace) / 2)]
+ m = folium.Map(location=[mid_coord.y, mid_coord.x], zoom_start=11)
+
+ for i, c in enumerate(trace.coords):
+ folium.Circle(
+ location=(c.y, c.x),
+ radius=5,
+ color=point_color,
+ tooltip=str(i),
+ fill=True,
+ fill_opacity=0.8,
+ fill_color=point_color,
+ ).add_to(m)
+
+ if line_color is not None:
+ folium.PolyLine([(p.y, p.x) for p in trace.coords], color=line_color).add_to(m)
+
+ return m
diff --git a/mappymatch/utils/plot/trajectory_segment.py b/mappymatch/utils/plot/trajectory_segment.py
new file mode 100644
index 0000000..3510423
--- /dev/null
+++ b/mappymatch/utils/plot/trajectory_segment.py
@@ -0,0 +1,136 @@
+from typing import Optional
+
+import folium
+import geopandas as gpd
+import pandas as pd
+from shapely.geometry import Point
+
+from mappymatch.matchers.lcss.constructs import TrajectorySegment
+from mappymatch.utils.crs import LATLON_CRS
+
+
+def plot_trajectory_segment(
+ segment: TrajectorySegment,
+ m: Optional[folium.Map] = None,
+ trace_point_color: str = "black",
+ path_line_color: str = "red",
+ path_line_weight: int = 10,
+ path_line_opacity: float = 0.8,
+ show_matches: bool = True,
+ match_point_color: str = "blue",
+ show_cutting_points: bool = True,
+ cutting_point_color: str = "orange",
+):
+ """
+ Plot a TrajectorySegment showing the trace, path, matches, and cutting points.
+
+ Args:
+ segment: The TrajectorySegment to plot.
+ m: The folium map to plot on. If None, a new map will be created.
+ trace_point_color: The color for trace points.
+ path_line_color: The color for the path line.
+ path_line_weight: The weight of the path line.
+ path_line_opacity: The opacity of the path line.
+ show_matches: Whether to show matched points.
+ match_point_color: The color for matched points.
+ show_cutting_points: Whether to show cutting points.
+ cutting_point_color: The color for cutting points.
+
+ Returns:
+ A folium map with the trajectory segment plotted.
+ """
+ trace = segment.trace
+ path = segment.path
+ matches = segment.matches
+ cutting_points = segment.cutting_points
+
+ original_crs = trace.crs
+
+ if trace.crs != LATLON_CRS:
+ trace = trace.to_crs(LATLON_CRS)
+
+ # Create map if not provided
+ if m is None:
+ mid_coord = trace.coords[int(len(trace) / 2)]
+ m = folium.Map(location=[mid_coord.y, mid_coord.x], zoom_start=13)
+
+ # Plot trace points
+ for i, c in enumerate(trace.coords):
+ folium.Circle(
+ location=(c.y, c.x),
+ radius=5,
+ color=trace_point_color,
+ tooltip=f"Trace Point {i}",
+ fill=True,
+ fill_opacity=0.8,
+ fill_color=trace_point_color,
+ ).add_to(m)
+
+ # Plot path (roads) if available
+ if path:
+ road_df = pd.DataFrame([{"road_id": r.road_id, "geom": r.geom} for r in path])
+ road_gdf = gpd.GeoDataFrame(
+ road_df, geometry=road_df.geom, crs=original_crs
+ ).drop(columns=["geom"])
+ road_gdf = road_gdf.to_crs(LATLON_CRS)
+
+ for road in road_gdf.itertuples():
+ folium.PolyLine(
+ [(lat, lon) for lon, lat in road.geometry.coords],
+ color=path_line_color,
+ tooltip=f"Road ID: {road.road_id}",
+ weight=path_line_weight,
+ opacity=path_line_opacity,
+ ).add_to(m)
+
+ # Plot matches if requested
+ if show_matches and matches:
+ for i, match in enumerate(matches):
+ if match.road:
+ coord = match.coordinate
+ if original_crs != LATLON_CRS:
+ # Convert coordinate to lat/lon
+ coord_gdf = gpd.GeoDataFrame(
+ [{"geom": Point(coord.x, coord.y)}],
+ geometry="geom",
+ crs=original_crs,
+ )
+ coord_gdf = coord_gdf.to_crs(LATLON_CRS)
+ coord_point = coord_gdf.iloc[0].geometry
+ y, x = coord_point.y, coord_point.x
+ else:
+ y, x = coord.y, coord.x
+
+ folium.CircleMarker(
+ location=(y, x),
+ radius=7,
+ color=match_point_color,
+ tooltip=f"Match {i}
Road ID: {match.road.road_id}
Distance: {match.distance:.2f}m",
+ fill=True,
+ fill_opacity=0.6,
+ fill_color=match_point_color,
+ ).add_to(m)
+
+ # Plot cutting points if requested
+ if show_cutting_points and cutting_points:
+ for cp in cutting_points:
+ coord = trace.coords[cp.trace_index]
+ folium.CircleMarker(
+ location=(coord.y, coord.x),
+ radius=10,
+ color=cutting_point_color,
+ tooltip=f"Cutting Point at index {cp.trace_index}",
+ fill=True,
+ fill_opacity=0.9,
+ fill_color=cutting_point_color,
+ ).add_to(m)
+
+ # Add segment score to map if available
+ if segment.score > 0:
+ folium.Marker(
+ location=(trace.coords[0].y, trace.coords[0].x),
+ popup=f"Segment Score: {segment.score:.4f}",
+ icon=folium.Icon(color="lightgray", icon="info-sign"),
+ ).add_to(m)
+
+ return m
diff --git a/pixi.lock b/pixi.lock
index 65c2f43..15c7d94 100644
--- a/pixi.lock
+++ b/pixi.lock
@@ -1501,8 +1501,8 @@ packages:
timestamp: 1727963202283
- pypi: ./
name: mappymatch
- version: 0.6.1
- sha256: c8873537cd7f6b95f5de72738ec234e87c154e9208f958737768bcbbb8456a96
+ version: 0.7.0
+ sha256: ce0afb10e4ff436fc2331337d9f46caa26533faf116febb658103eb203f6e7c8
requires_dist:
- folium>=0.20,<1
- geopandas>=1,<2
@@ -1537,7 +1537,7 @@ packages:
- pytest>=9,<10 ; extra == 'tests'
- ruff>=0.14,<1 ; extra == 'tests'
- types-requests ; extra == 'tests'
- requires_python: '>=3.10,<3.14'
+ requires_python: '>=3.10'
editable: true
- pypi: https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl
name: markdown-it-py
diff --git a/tests/test_disconnected_components.py b/tests/test_disconnected_components.py
new file mode 100644
index 0000000..2053455
--- /dev/null
+++ b/tests/test_disconnected_components.py
@@ -0,0 +1,260 @@
+from unittest import TestCase
+from unittest.mock import Mock
+
+import networkx as nx
+import osmnx as ox
+import geopandas as gpd
+from shapely.geometry import LineString, Point
+
+from mappymatch.constructs.coordinate import Coordinate
+from mappymatch.constructs.road import Road, RoadId
+from mappymatch.constructs.trace import Trace
+from mappymatch.maps.nx.nx_map import NxMap
+from mappymatch.maps.nx.readers.osm_readers import (
+ NetworkType,
+ parse_osmnx_graph,
+)
+from mappymatch.matchers.lcss.constructs import TrajectorySegment
+from mappymatch.matchers.lcss.ops import join_segment
+from mappymatch.utils.crs import XY_CRS
+from tests import get_test_dir
+
+
+class TestDisconnectedComponents(TestCase):
+ """Test handling of disconnected graph components"""
+
+ def setUp(self):
+ """Create a simple disconnected graph for testing"""
+ # Create a graph with two disconnected components
+ self.g = nx.MultiDiGraph()
+
+ # Component 1: Simple path from 0 -> 1 -> 2
+ self.g.add_edge(
+ 0,
+ 1,
+ 0,
+ geometry=LineString([(0, 0), (1, 0)]),
+ kilometers=1.0,
+ travel_time=60.0,
+ metadata={},
+ )
+ self.g.add_edge(
+ 1,
+ 2,
+ 0,
+ geometry=LineString([(1, 0), (2, 0)]),
+ kilometers=1.0,
+ travel_time=60.0,
+ metadata={},
+ )
+
+ # Component 2: Separate path from 10 -> 11 -> 12
+ self.g.add_edge(
+ 10,
+ 11,
+ 0,
+ geometry=LineString([(10, 10), (11, 10)]),
+ kilometers=1.0,
+ travel_time=60.0,
+ metadata={},
+ )
+ self.g.add_edge(
+ 11,
+ 12,
+ 0,
+ geometry=LineString([(11, 10), (12, 10)]),
+ kilometers=1.0,
+ travel_time=60.0,
+ metadata={},
+ )
+
+ # Add required graph attributes
+ self.g.graph["crs"] = XY_CRS
+ self.g.graph["distance_weight"] = "kilometers"
+ self.g.graph["time_weight"] = "travel_time"
+ self.g.graph["geometry_key"] = "geometry"
+
+ def test_parse_osmnx_graph_keeps_all_components(self):
+ """Test that parse_osmnx_graph can keep all components when filter_to_largest_component=False"""
+ # Load test graph
+ gfile = get_test_dir() / "test_assets" / "osmnx_drive_graph.graphml"
+ osmnx_graph = ox.load_graphml(gfile)
+
+ # Parse without filtering
+ cleaned_graph = parse_osmnx_graph(
+ osmnx_graph, NetworkType.DRIVE, filter_to_largest_component=False
+ )
+
+ # Graph should have edges and basic structure
+ self.assertGreater(len(cleaned_graph.edges), 0)
+ self.assertEqual(cleaned_graph.graph["network_type"], NetworkType.DRIVE.value)
+
+ def test_parse_osmnx_graph_filters_to_largest(self):
+ """Test that parse_osmnx_graph filters to largest component by default"""
+ # Load test graph
+ gfile = get_test_dir() / "test_assets" / "osmnx_drive_graph.graphml"
+ osmnx_graph = ox.load_graphml(gfile)
+
+ # Parse with filtering (default behavior)
+ cleaned_graph = parse_osmnx_graph(
+ osmnx_graph, NetworkType.DRIVE, filter_to_largest_component=True
+ )
+
+ # Graph should be strongly connected
+ self.assertTrue(nx.is_strongly_connected(cleaned_graph))
+
+ def test_shortest_path_returns_empty_for_disconnected_nodes(self):
+ """Test that shortest_path returns empty list when no path exists"""
+ # Create NxMap from disconnected graph
+ nx_map = NxMap(self.g)
+
+ # Create coordinates in different components
+ origin = Coordinate(None, Point(0.5, 0), XY_CRS)
+ destination = Coordinate(None, Point(10.5, 10), XY_CRS)
+
+ # Should return empty list instead of raising exception
+ path = nx_map.shortest_path(origin, destination)
+
+ self.assertEqual(path, [])
+
+ def test_shortest_path_works_within_component(self):
+ """Test that shortest_path works normally within a connected component"""
+ # Create NxMap from disconnected graph
+ nx_map = NxMap(self.g)
+
+ # Create coordinates in the same component
+ origin = Coordinate(None, Point(0.5, 0), XY_CRS)
+ destination = Coordinate(None, Point(1.5, 0), XY_CRS)
+
+ # Should find a path
+ path = nx_map.shortest_path(origin, destination)
+
+ self.assertGreater(len(path), 0)
+ self.assertIsInstance(path[0], Road)
+
+ def test_lcss_merge_handles_empty_path(self):
+ """Test that LCSS merge handles empty path between disconnected segments"""
+ # Create mock road map
+ mock_map = Mock(spec=NxMap)
+ mock_map.crs = XY_CRS
+
+ # Mock shortest_path to return empty list (disconnected components)
+ mock_map.shortest_path.return_value = []
+
+ # Create mock trajectory segments with paths
+ coords1 = [
+ Coordinate(None, Point(0, 0), XY_CRS),
+ Coordinate(None, Point(1, 0), XY_CRS),
+ ]
+ coords2 = [
+ Coordinate(None, Point(10, 10), XY_CRS),
+ Coordinate(None, Point(11, 10), XY_CRS),
+ ]
+
+ gdf1 = gpd.GeoDataFrame(
+ {"geometry": [c.geom for c in coords1]}, crs=XY_CRS, index=[0, 1]
+ )
+ gdf2 = gpd.GeoDataFrame(
+ {"geometry": [c.geom for c in coords2]}, crs=XY_CRS, index=[2, 3]
+ )
+
+ trace1 = Trace(gdf1)
+ trace2 = Trace(gdf2)
+
+ road1 = Road(
+ RoadId(0, 1, 0),
+ LineString([(0, 0), (1, 0)]),
+ metadata={},
+ )
+ road2 = Road(
+ RoadId(10, 11, 0),
+ LineString([(10, 10), (11, 10)]),
+ metadata={},
+ )
+
+ segment_a = TrajectorySegment(trace=trace1, path=[road1])
+ segment_b = TrajectorySegment(trace=trace2, path=[road2])
+
+ # Merge segments using imported join_segment function
+ result = join_segment(mock_map, segment_a, segment_b)
+
+ # Should concatenate paths without intermediate routing
+ self.assertEqual(len(result.path), 2)
+ self.assertEqual(result.path[0].road_id, road1.road_id)
+ self.assertEqual(result.path[1].road_id, road2.road_id)
+
+ def test_lcss_merge_handles_connected_path(self):
+ """Test that LCSS merge works normally when routing succeeds"""
+ # Create mock road map
+ mock_map = Mock(spec=NxMap)
+ mock_map.crs = XY_CRS
+
+ # Mock shortest_path to return a connecting road
+ connecting_road = Road(
+ RoadId(1, 2, 0),
+ LineString([(1, 0), (2, 0)]),
+ metadata={},
+ )
+ mock_map.shortest_path.return_value = [connecting_road]
+
+ # Create mock trajectory segments
+ coords1 = [
+ Coordinate(None, Point(0, 0), XY_CRS),
+ Coordinate(None, Point(1, 0), XY_CRS),
+ ]
+ coords2 = [
+ Coordinate(None, Point(2, 0), XY_CRS),
+ Coordinate(None, Point(3, 0), XY_CRS),
+ ]
+
+ gdf1 = gpd.GeoDataFrame(
+ {"geometry": [c.geom for c in coords1]}, crs=XY_CRS, index=[0, 1]
+ )
+ gdf2 = gpd.GeoDataFrame(
+ {"geometry": [c.geom for c in coords2]}, crs=XY_CRS, index=[2, 3]
+ )
+
+ trace1 = Trace(gdf1)
+ trace2 = Trace(gdf2)
+
+ road1 = Road(
+ RoadId(0, 1, 0),
+ LineString([(0, 0), (1, 0)]),
+ metadata={},
+ )
+ road3 = Road(
+ RoadId(2, 3, 0),
+ LineString([(2, 0), (3, 0)]),
+ metadata={},
+ )
+
+ segment_a = TrajectorySegment(trace=trace1, path=[road1])
+ segment_b = TrajectorySegment(trace=trace2, path=[road3])
+
+ # Merge segments using imported join_segment function
+ result = join_segment(mock_map, segment_a, segment_b)
+
+ # Should include connecting road
+ self.assertEqual(len(result.path), 3)
+ self.assertEqual(result.path[0].road_id, road1.road_id)
+ self.assertEqual(result.path[1].road_id, connecting_road.road_id)
+ self.assertEqual(result.path[2].road_id, road3.road_id)
+
+ def test_networkx_no_path_exception_handling(self):
+ """Test that NetworkXNoPath exception is caught and handled"""
+ # Create NxMap from disconnected graph
+ nx_map = NxMap(self.g)
+
+ # Verify graph is disconnected
+ self.assertFalse(nx.is_strongly_connected(self.g))
+
+ # Try to find path between disconnected components
+ origin = Coordinate(None, Point(0.1, 0), XY_CRS)
+ destination = Coordinate(None, Point(10.1, 10), XY_CRS)
+
+ # Should not raise exception, should return empty list
+ try:
+ path = nx_map.shortest_path(origin, destination)
+ self.assertEqual(path, [])
+ except nx.NetworkXNoPath:
+ self.fail("NetworkXNoPath exception should be caught and handled")