Source code for flusstools.geotools.shortest_path

"""This module is inspired by Michael Diener - read more at
 https://github.com/mdiener21/python-geospatial-analysis-cookbook/tree/master/ch08

Example use: ``create_shortest_path(shp_file_name, start_node_id, end_node_id)``
"""

import json

import networkx as nx
import numpy as np
import shapefile
from shapely.geometry import LineString, MultiPoint


[docs] def build_graph_from_lines(line_shp_name): """Builds an undirected graph from the line features of a shapefile. Replaces the removed ``networkx.read_shp``: every line feature becomes an edge between its first and last vertex. Edges carry a ``distance`` weight (summed segment length) and a ``Json`` attribute holding the full vertex coordinates, as expected by :func:`get_path`. Args: line_shp_name (str): Input line shapefile name. Returns: networkx.Graph: Undirected graph of the line network. """ reader = shapefile.Reader(line_shp_name) graph = nx.Graph() for shape in reader.shapes(): points = [tuple(pt) for pt in shape.points] if len(points) < 2: continue distance = float( np.sum(np.sqrt(np.sum(np.diff(np.array(points), axis=0) ** 2, axis=1))) ) graph.add_edge( points[0], points[-1], distance=distance, Json=json.dumps({"coordinates": [list(pt) for pt in points]}), ) return graph
[docs] def create_shortest_path(line_shp_name, start_node_id, end_node_id): """Calculates the shortest path from a network of lines. Args: line_shp_name (str): Input shapefile name start_node_id (int): Start node ID end_node_id (int): End node ID Returns: None: Creates a graph of nodes (coordinate pairs) connecting a start node with an end node in the defined ``line_shp_name``. """ # load shapefile into an undirected graph graph = build_graph_from_lines(line_shp_name) # with not all graphs connected, take the largest connected subgraph largest_component = max(nx.connected_components(graph), key=len) nx_list_subgraph = graph.subgraph(largest_component) # get all the nodes in the network nx_nodes = np.array(list(nx_list_subgraph.nodes())) # output the nodes to a GeoJSON file network_nodes = MultiPoint(nx_nodes) write_geojson( line_shp_name.split(".shp")[0] + "_nodes.geojson", network_nodes.__geo_interface__ ) # Compute the shortest path. Dijkstra's algorithm. nx_short_path = nx.shortest_path( nx_list_subgraph, source=tuple(nx_nodes[start_node_id]), target=tuple(nx_nodes[end_node_id]), weight="distance", ) # create numpy array of coordinates representing result path nx_array_path = get_full_path(nx_short_path, nx_list_subgraph) # convert numpy array to Shapely Linestring shortest_path = LineString(nx_array_path) write_geojson( line_shp_name.split(".shp")[0] + "_Xpath.geojson", shortest_path.__geo_interface__ )
[docs] def get_path(n0, n1, nx_list_subgraph): """Get path between nodes ``n0`` and ``n1``. Args: n0 (int): Node 1 n1 (int): Node 2 nx_list_subgraph (list):(see create shortest path) Returns: ndarray: An array of point coordinates along the line linking these two nodes. """ return np.array(json.loads(nx_list_subgraph[n0][n1]["Json"])["coordinates"])
[docs] def get_full_path(path, nx_list_subgraph): """Creates a numpy array of the line result. Args: path (str): Result of ``nx.shortest_path`` nx_list_subgraph (list): See ``create_shortest path`` function Returns: ndarray: Coordinate pairs along a path. """ p_list = [] curp = None for i in range(len(path) - 1): p = get_path(path[i], path[i + 1], nx_list_subgraph) if curp is None: curp = p if np.sum((p[0] - curp) ** 2) > np.sum((p[-1] - curp) ** 2): p = p[::-1, :] p_list.append(p) curp = p[-1] return np.vstack(p_list)
[docs] def write_geojson(outfilename, indata): """Creates a new GeoJSON file Args> outfilename (str): Name for the output file indata (array): Array to write tyo the geojson file. Returns: Creates a new GeoJSON file. """ with open(outfilename, "w") as file_out: file_out.write(json.dumps(indata))