要求:
将特定格式的txt文本,使用geopandas库转化为线矢量。
注意:安装geopandas时,由于geopandas是python第三方库,其有很多依赖库,如GDAL,Fiona,Shapely,pyproj等,不能直接使用pip安装,安装依赖库后才能使用 pip install geopandas 安装。依赖库的安装不细说,在 https://www.lfd.uci.edu/~gohlke/pythonlibs 中找到对应版本的.whl文件就行,下载后放到对应的python环境的scripts文件夹下,然后使用使用pip 安装即可。
注:在cmd中,输入pip install 后,直接将.whl文件用鼠标拖到pip install 的空格后面,即可自动输入路径。
本例使用的txt格式:
完整代码:
from osgeo import gdal
import re
import os
import csv
import time
import pandas as pd
import geopandas as gpd
from shapely.geometry import LineString
def read_wr_txt(txt_path,temp_2lines_path,temp_line_path):
file=open(txt_path,encoding='utf-8')
datas=file.readlines()
#将txt文件分离
lines_2=datas[0].split('[')[0]#前一部分是范围线数据
line=datas[0].split('[')[1]#后一部分是卫星实际轨迹线数据
records_2lines=re.findall("[(](.*?)[)]",lines_2)[0].strip('()')#写入第一部分,这部分是字符串,需要分割
point_2list=records_2lines.split(",")
with open(temp_2lines_path,"w") as f:
for point in point_2list:
temp=point.strip().split(" ")
x=temp[0]
y =temp[1]
f.write(x+','+y+'n')
f.close()
records_line = re.findall("[(](.*?)[)]", line)#写入第二部分,这部分是点数组,直接读
with open(temp_line_path,"w") as f:
for point in records_line:
temp=point.split(" ")
x = temp[0]
y = temp[1]
f.write(x + ',' + y + 'n')
f.close()
#分别对两个txt文件转为csv
def txt_to_csv(temp_line_path,temp_line_csv):
fh = open(temp_line_csv, "w+", newline='')
writer = csv.writer(fh)
writer.writerow(["x_start", "y_start", "x_end", "y_end"])
file = open(temp_line_path,encoding='utf-8')
data=file.readlines()
res = []
for i in range(len(data)):
start_word=data[i].strip().split(',')
if i == len(data)-1:
end_word = data[i].strip().split(',')
else:
end_word = data[i+1].strip().split(',')
d=[start_word[0],start_word[1],end_word[0],end_word[1]]
res.append(d)
writer.writerows(res)
file.close()
fh.close()
# CSV转换为shapefile线数据,数据格式为x_start, y_start, x_end, y_end
def csv_to_lines(temp_line_csv,result_line_shp):
df = pd.read_csv(temp_line_csv, header=0, encoding='gbk')
geometry = [LineString(xy_list) for xy_list in zip(zip(df.x_start, df.y_start), zip(df.x_end, df.y_end))]
gdf = gpd.GeoDataframe(df, crs="EPSG:4326", geometry=geometry)
gdf.to_file(result_line_shp, encoding='gbk')
def create_txt2shp(txt_path):
os.makedirs(os.getcwd()+r"txt", exist_ok=True)
os.makedirs(os.getcwd() + r"csv", exist_ok=True)
os.makedirs(os.getcwd() + r"shp", exist_ok=True)
temp_2lines_path = os.path.join(os.getcwd()+r"txt", "2lines_" + os.path.split(txt_path)[1])
temp_line_path = os.path.join(os.getcwd()+r"txt", "1line_" + os.path.split(txt_path)[1])
temp_2lines_csv = os.path.join(os.getcwd()+r"csv", "2lines_" + os.path.split(txt_path)[1][0:-4] + ".csv")
temp_line_csv = os.path.join(os.getcwd()+r"csv", "1line_" + os.path.split(txt_path)[1][0:-4] + ".csv")
result_2lines_shp = os.path.join(os.getcwd()+r"shp", "2lines_" + os.path.split(txt_path)[1][0:-4] + ".shp")
result_line_shp = os.path.join(os.getcwd()+r"shp", "1line_" + os.path.split(txt_path)[1][0:-4] + ".shp")
read_wr_txt(txt_path, temp_2lines_path, temp_line_path)
txt_to_csv(temp_2lines_path, temp_2lines_csv)
txt_to_csv(temp_line_path, temp_line_csv)
csv_to_lines(temp_2lines_csv, result_2lines_shp)
csv_to_lines(temp_line_csv, result_line_shp)
print("finished!")
#使程序更具健壮性,输入目录或者文件名,都可以进行处理
def dir_or_file(path):
if not os.path.exists(path):
print("输入路径不存在,请重新输入!")
return 0
if os.path.isdir(path):
File_Path = os.listdir(Root_Path)
if not File_Path:
print("输入路径有误!请重新输入!")
return 0
bool=False
for file in File_Path:
if file.endswith(".txt"):
bool=True
print("正在处理" + file)
create_txt2shp(os.path.join(Root_Path, file))
if not bool:
print("该路径下没有txt文件,请重新输入!")
return 0
if os.path.isfile(path):
print("正在处理" + path)
create_txt2shp(path)
if __name__ == '__main__':
start=time.time()
Root_Path=r"C:UsersAdministratorDesktoptxt2shptest"
dir_or_file(Root_Path)
end = time.time()
print("用时"+str(end-start)+"秒!")
结果为:
生成的shp文件在项目路径的shp文件夹下:
shp文件列表:
生成的shp线要素效果:



