1.思路:
1.抽取路网(管网)各节点,点去重+点线关联;---根据数据及业务场景可选择线起始点/起中终点/全部节点;
2.构建路网(管网)各节点KDtree;
3.使用邻接矩阵存储点连通性;--不通:-1、自身点:0、相连:1(或权重/距离--可根据业务场景设置);
4.基于KDtree分别捕获到 [ 出发点 , 到达点] 最近的一个捕捉节点(k=1);
5.根据捕捉节点进行图的广度优先遍历--结合迪杰克斯特拉(Dijkstra)算法思路--因为节点重复,根据节点键值取存储线id后,需要根据前后节点筛选线;
6.前进方向确定(线遍历后排列顺序+线坐标点集合顺序)--根据返回数据格式确定;
7.路书制作( 八方位-["北", "东北", "东", "东南", "南", "西南", "西", "西北", "原点", ]+ 下一节点方向(左/右/直行/掉头))--方向-八方位-坐标大小/ 方向-左右-向量叉乘(注意:当前代码因数据区域大小原因,已忽略椭球体及地理坐标影响因素);
8.结果输出(在路书中增加出发点/到达点与最短路径的关系--详细导航信息)--当前输出json结构参考了高德最短路径分析api返回数据结构;
9.效率优化--(将数据提前加载进内存/api调用)--当前框架使用python/flask,一秒内返回结果(当前测试数据大概0.04-0.5)
2.实现
1.python
#!/usr/bin/python3
# -*- coding:utf-8 -*-
'''
####################################################################################
#版本信息
##最近修改时间:20220331
##最近修改人:慢慢
####################################################################################
#功能说明
##陇南路径分析
####################################################################################
#返回结果
##返回格式-dict
##运行成功-
##运行失败-{"code": 404, "type": "fail", "msg": "服务错误", "box": "", "data": "需要重启"}
####################################################################################
#使用说明
####################################################################################
#更新历史
##20220421 新增最短路径导航详细信息 路书功能开发-(路口左右转、东南西北八个方向、距离)
####################################################################################
'''
import json
from shapely.geometry import LineString
from shapely.geometry import mapping as geommapping
# from lib.geotools import Features, Intersect, Clip, ToPolygons, CreatGrid, ToPoints
try:
from lib.flaskservices.config_app import __keyself__
except ImportError:
__keyself__ = {"file_line": "/data/l_road.shp",
"building": "/data/r_building.shp", "grid": "/data/r_grid.shp",
"people": "/data/r_people.csv"}
from geopandas import GeoSeries, GeoDataFrame
__data__type = (list, tuple)
__mapdata_type__ = (GeoSeries, GeoDataFrame)
__kself__ = list(__keyself__.keys())[0]
def GetRoadBookCourse(pointa, pointb):
# 方向-八方位-坐标大小
txt = ["北", "东北", "东", "东南", "南", "西南", "西", "西北", "原点", ]
x_p, y_p = round(pointb[0], 4) - round(pointa[0], 4), round(pointb[1], 4) - round(pointa[1], 4)
if x_p == 0 and y_p > 0:
course = txt[0]
elif x_p > 0 and y_p > 0:
course = txt[1]
elif x_p > 0 and y_p == 0:
course = txt[2]
elif x_p > 0 and y_p < 0:
course = txt[3]
elif x_p == 0 and y_p < 0:
course = txt[4]
elif x_p < 0 and y_p < 0:
course = txt[5]
elif x_p < 0 and y_p == 0:
course = txt[6]
elif x_p < 0 and y_p > 0:
course = txt[7]
else:
course = txt[-1]
return course
def GetRoadBookLR(origin, pointa, pointb):
# 方向-左右-向量叉乘
x_o, y_o = round(pointa[0], 2) - round(origin[0], 2), round(pointa[1], 2) - round(origin[1], 2)
x_l, y_l = round(pointb[0], 2) - round(origin[0], 2), round(pointb[1], 2) - round(origin[1], 2)
k = (x_o * y_l) - (x_l * y_o)
if k < 0:
lr = "右转"
elif k > 0:
lr = "左转"
else:
# 左右-向量点乘
k = (x_o * x_l) + (y_o * y_l)
if k < 0:
lr = "掉头"
else:
lr = "直行"
return lr
def GetNodeInd(kdtree, point, k=1):
try:
__distances, __ind = kdtree.query(point, k=k)
return True, __ind
except:
return False, "根据输入点获取最近线失败(经度,纬度) " + "%s" % point
# 因为节点重复,根据节点键值取存储线id后,需要根据前后节点筛选线
def GetNodeIdLineId(poiss_ind, adjacency_matrix, node_dict, node_key):
# 广度优先遍历--结合迪杰克斯特拉(Dijkstra)算法思路
try:
loop_will = [poiss_ind[0]]
loop_ed = set()
loop_done = []
loop = 0
loop_stdout = [poiss_ind[1]]
stdout = []
while loop_will:
loop = loop_will.pop(0)
loop_once = []
loop_ed.add(loop)
for i, lind_id in enumerate(adjacency_matrix[loop, ...]):
if lind_id > 0:
loop_once.append(i)
if i == poiss_ind[1]:
loop_will.clear()
loop_stdout.append(loop)
break
if i not in loop_ed:
loop_will.append(i)
else:
loop_done.append([loop, loop_once])
loop_done.reverse()
for key in loop_done:
if loop in key[1]:
loop_stdout.append(key[0])
loop = key[0]
if loop == poiss_ind[0]:
loop_stdout.reverse()
break
#################################################################################
# 可运行,可用于功能扩展
# while loop:
# for key in keys:
# if loop in loop_done[key]:
# loop_stdout.append(key)
# loop = key
# if loop == poiss_ind[0]:
# # loop_stdout.append(key)
# loop_stdout.reverse()
# loop = 0
# break
#################################################################################
# 确定前进方向
for key in loop_stdout:
stdout.extend(node_dict[node_key[key]])
else:
stdout = list(set([v for v in stdout if stdout.count(v) > 1]))
# 根据图遍历结果数据特性--间隔取值,逆间隔取值--转换到最终结果
if len(stdout) > 2:
stdout = stdout[1::2] + [stdout[-1]] + stdout[-3::-2]
except Exception as __ex:
return False, "路径搜索失败:GetLineId()" + "%s" % __ex
else:
return True, (loop_stdout, stdout)
def SetOutFormatGD(poiss, nodeid_lineid, node, line, to_crs=4544):
def GetBook(poi, p_l, roadname, length):
if not roadname:
roadname = "无名道路"
data["points"].extend(p_l)
tmp = dict()
tmp["instruction"] = "tGetRoadBookLRt.沿" + roadname + GetRoadBookCourse(poi[0],
poi[1]) + "方向前进 %s" % round(
length, 2) + " 米,tGetRoadBookLRNextt;"
tmp["points"] = p_l
data["steps"].append(tmp)
data = {"steps": [], "points": []}
for i, id in enumerate(nodeid_lineid[1]):
points = list(geommapping(line["geometry"][id])["coordinates"])
if node[nodeid_lineid[0][i]] != points[0]:
points.reverse()
if i == 0:
poi = [list(poiss[0]), points[0]]
p_l = poi
roadname = "出发点到最近路口的"
length = GeoDataFrame(crs=to_crs, geometry=[LineString(poi)]).length[0] * 100000
else:
poi = [points[0], points[-1]]
p_l = points
roadname = str(line["Name"][id]).replace("None", "")
length = line["Shape_Leng"][id]
GetBook(poi, p_l, roadname, length)
else:
poi = [points[-1], list(poiss[1])]
p_l = poi
roadname = "最近路口到达点的"
length = GeoDataFrame(crs=to_crs, geometry=[LineString(poi)]).length[0] * 100000
GetBook(poi, p_l, roadname, length)
# 路书
for i in range(len(data["steps"])):
if i == 0:
# 正北方向
origin = [list(poiss[0])[0], list(poiss[0])[1] - 1]
pointa = data["steps"][i]["points"][0]
pointb = data["steps"][i]["points"][1]
lr = ",准备出发"
insed = "到达主路"
elif i == len(data["steps"]) - 1:
origin = data["steps"][i]["points"][-2]
pointa = data["steps"][i]["points"][-1]
pointb = list(poiss[1])
lr = ",到达目的地附近"
insed = "到达目的地"
else:
origin = data["steps"][i - 1]["points"][-2]
pointa = data["steps"][i]["points"][0]
pointb = data["steps"][i]["points"][1]
lr = ""
insed = "下个路口" + ins
ins = GetRoadBookLR(origin, pointa, pointb)
data["steps"][i]["instruction"] = data["steps"][i]["instruction"].replace("tGetRoadBookLRt",
ins + lr).replace(
"tGetRoadBookLRNextt", insed)
return data
class AnalysisPath:
def __init__(self, data_dict):
self._type = True
self._msg = {"code": 200, "type": "success", "msg": "最短路径分析成功", "box": "", "data": None}
self.data_dict = data_dict
def __str__(self):
return 'AnalysisPath类实现路径分析'
def __Check(self, data):
if data[0]:
return data[1]
else:
self._type = False
self._msg = {"code": 204, "type": "fail", "msg": "最短路径分析失败", "box": "", "data": "%s" % data[1]}
return None
def Run(self, crs_distance=4544):
# ########################################################################################################
# # # 可运行--计算两点直线距离,以后功能扩展可用
# pois = [self.__Check(v) for v in
# [ToPoints(data=[v], crs=self.data_dict[__kself__].crs).XYs() for v in
# self.data_dict['file']]]
# if self._type:
# poi_distance = round(pois[0].to_crs(crs_distance).distance(pois[1].to_crs(crs_distance)).loc[0], 2)
#
# poiss_ind = [self.__Check(GetNodeInd(self.data_dict["kdtree"], __v)) for __v in self.data_dict['file'] if
# self._type]
# ########################################################################################################
poiss_ind = [self.__Check(GetNodeInd(self.data_dict["kdtree"], __v)) for __v in self.data_dict['file'] if
self._type]
if self._type:
if poiss_ind[0] == poiss_ind[1]:
self._msg["data"] = "起点和终点相同"
else:
nodeid_lineid = self.__Check(
GetNodeIdLineId(poiss_ind, self.data_dict["adjacency_matrix"], self.data_dict["node_dict"],
self.data_dict["node_key"]))
if self._type:
point_gd = SetOutFormatGD(self.data_dict['file'], nodeid_lineid, self.data_dict['node'],
self.data_dict[__kself__].loc[nodeid_lineid[1]][
["Name", "Shape_Leng", "geometry"]].to_dict())
self._msg["data"] = json.dumps(point_gd)
return self._msg
def Main(data_dict):
return AnalysisPath(data_dict).Run()
if __name__ == '__main__':
import numpy as np
from cartopy import crs as ccrs
import matplotlib.pyplot as plt
ax = plt.axes(projection=ccrs.PlateCarree(), aspect='auto')
import time, datetime
__start_time = time.time()
print('开始处理:', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
import geopandas as gpd
from scipy.spatial import KDTree
path_base = r"C:myprojectspy_projectsget_polygon"
data_dict = dict()
nodes = set()
data_dict[__kself__] = gpd.read_file(path_base + __keyself__[__kself__])
for id in range(len(data_dict[__kself__])):
points = geommapping(data_dict[__kself__]["geometry"][id])["coordinates"]
####################################################################################
# 获取所有点--生成前期基础数据很慢,需要优化速度
# for v in points:
# nodes.append([v, data_dict[__kself__]["id"][id]])
####################################################################################
# 起点、中点、终点
nodes.add((points[0], data_dict[__kself__]["id"][id]))
# nodes.add((points[int(len(points)/2)], data_dict[__kself__]["id"][id]))
nodes.add((points[-1], data_dict[__kself__]["id"][id]))
data_dict[__kself__].plot(ax=ax)
node_dict = {}
tmp_xy = []
node = []
# 点关联到线,字典--key:点id,value:线id
for __i, __v in enumerate(nodes):
# 过滤重复点--根据线共用点情况,多份存储
if __v[0] in tmp_xy:
node_dict[tmp_xy.index(__v[0])].append(__v[1])
else:
node_dict[__i] = [__v[1]]
node.append(__v[0])
tmp_xy.append(__v[0])
else:
del tmp_xy
node_key = list(node_dict.keys())
data_dict["kdtree"] = KDTree(node)
data_dict["node"] = node
data_dict["node_dict"] = node_dict
data_dict["node_key"] = node_key
# 点连通性关系存储--邻接矩阵
adjacency_matrix = np.zeros(shape=(len(node), len(node)), dtype=int) - 1
for i in range(len(node_key)):
for j in range(len(node_key)):
if i == j:
adjacency_matrix[i][j] = 0
else:
for __vj in node_dict[node_key[j]]:
if __vj in node_dict[node_key[i]]:
adjacency_matrix[i][j] = __vj + 1
######################################################################################
# 当前算法未用到距离
# adjacency_matrix[i][j] = data_dict[__kself__].loc[__vj]["Shape_Leng"]
######################################################################################
else:
data_dict["adjacency_matrix"] = adjacency_matrix
data_dict['file'] = [(104.907, 33.408), (104.916, 33.398)]
# data_dict['file'] = [(104.028, 34.1034), (104.05, 34.1)]
# data_dict['file'] = [(105.115, 32.6145), (105.2, 32.6306)]
# data_dict['file'] = [(104.919, 33.394), (104.921, 33.391)]
tmp = Main(data_dict)
print(tmp)
print('本次耗时:', datetime.timedelta(seconds=time.time() - __start_time), 'n')
2.c
待补充...



