python dask_使用Python并行框架Dask处理和分析大规模时空数据

并行是提高大数据处理和分析速度的关键,不管是CPU并行还是GPU并行,核心的观念就是将大数据变为小的分块,然后让计算机操作系统调配可用资源进行处理。使用C,C++等语言可以调用openmpi,CPP-taskflow以及Interl TBB等框架管理执行并行程序,但是,程序执行的效率取决于对底层数据分块策略以及需要仔细考虑网络传输成本,并行程序的效率完全取决于程序员的天赋和水平。在这里,暂且不讨论GPU这一块,因为在后续我会专门进行讨论。对于Java的忠实粉来讲,使用Spark框架无疑会极大提高速度,只可惜,如果手头上只有一台好点的机器,且老板又在追成果时,就会比较难过了。因为配置Spark集群也是需要超多耐心去配置参数的,而且虚拟机本身负载也是很大的。对于使用python进行时空数据分析和实验时,dask框架无疑带来了极大的方便,可以不考虑那么多底层并行细节。考虑到大家有很多都会使用python的Multithreading以及Multiprocessing模块,例如,使用以下python文件

from multiprocessingimport Pool

pool = Pool(N) #这里的N表示开启多少个进程

area_list= pool.map(func, parameters)#func表示每个进程都会执行的函数

pool.close() #上面的parameters表示向函数传递的参数

pool.join() #最后我们等待每一个线程执行完成并回收进程所用资源

但是,同样这种底层的操作,让我们无法完全利用可用的计算资源,例如,我手头上有一个 E5-2699 v4 CPU,22核心44线程,如果只开2个进程,那这台几乎等于在休眠了。

而python dask提供了动态资源调度能力以及“大”集合支持(使用分布式内存缓存,例如矩阵、列表以及数据帧dataframes)。和Spark弹性分布式数据集类似, python依靠这些对象,生成有向无环图描述复杂算法的并行逻辑。在这里,首先介绍Dask Array, Dask Bag, Dask Dataframe, Dask Dealyed以及Dask Futhure对象之后,用一个具体的例子说明,选择使用Dask Bag做影像矢量化操作的并行化方法。最后,总结下全文核心部分。

Dask Array对于执行幂等操作的大型矩阵数据特别适用,它首先会按照Chunk分块策略将矩阵分成小块,然后会将这些分块矩阵操作转换为互相关任务图模型,用数据之间的依赖关系,用多线程执行图模型中分块矩阵操作。说它对于幂等操作特别适用,是因为,各分块矩阵独立计算的结果,彼此不会根据另外分块计算的结果改变而改变。例如,对矩阵中所有元素进行加操作,分块矩阵的计算结果就与其它分块计算的结果没有关系。在Dask Array上执行并行操作,程序的运行效率与分块策略相关。官方给出的五点建议是:

1.分块的数据大小必须足够小,能够被装载到内存。

2.分块的数据大小必须足够大,对每一分块的计算操作要大于任务调度的时间开销(1ms)

3.分块的大小在10MB-1GB之间是合适和常见的,只要不超过内存大小限制并考虑计算时延

4.对多个矩阵进行操作时,矩阵分块的大小尽量一致

5 当存储和导入数据时,分块大小尽量和特定存储约束保持一致

Dask Dataframe对象则 在处理远大于当前主机内存的表格数据有用。与传统pandas Dataframe在加载完成所有数据后继续数据类型推断不同Dask Datadrame支持部分加载数据时,对表格数据类型进行推断。Dask Dataframe实现了分块并行Dataframe, 对Dask Dataframe的操作将被映射到按索引列划分的子Dataframe上,例如:可以使用DepDelay延迟执行操作

maxes = []

for fn in filenames:

df = pd.read_csv(fn)

maxes.append(df.DepDelay.max())

和pandas Datafeames不同的地方还在于延迟执行操作,常用的方法包括map_paritions,map_overlap以及reduction方法.map_paritions对每一分块施加并计算操作,map_overlap和Spark windows函数类似,对于处理不同分块相邻近的行数据进行操作比较有用。reduction则代表对分块进行聚合的操作函数。

Dask future和Dask bag以及Dask delayed密切相关,它代表在分布式环境下执行的某个操作将在未来某个时间返回,例如:

from dask.distributed import Client

client = Client() # start local workers as processes

def inc(x):

return x + 1

a = client.submit(inc, 10) # calls inc(10) in background thread or process

a.result() # blocks until task completes and data arrives

在调用result之前,这里的a实际上就是一个Future对象。

在实际工作中,用到最多的还是Dask Dealyed以及Dask bag两个对象,这也是本次要介绍的重点。为方便理解,我们将Delayed对象看成是python装饰器,它的作用是将普通的python函数封装成Dask延迟装载的函数,例如:

import dask

@dask.delayed

def inc(x):

return x + 1

@dask.delayed

def double(x):

return x * 2

@dask.delayed

def add(x, y):

return x + y

data = [1, 2, 3, 4, 5]

output = []

for x in data:

a = inc(x)

b = double(x)

c = add(a, b)

output.append(c)

但这种直接装饰器假比较麻烦,假设我们要为所有感新区的湖泊水库计算面积,我们可以直接使用dask.delayed对象初始化要执行的python函数。

client = Client(threads_per_worker=10, n_workers=2)

print(client)

lazy_results = [] #记录延迟结果

for specify_which_reservior in lakes:

lazy_result =dask.delayed(calculat_regions)(specify_which_reservior)

lazy_results.append(lazy_result)

futures = dask.persist(*lazy_results) # trigger computation in the background

results = dask.compute(*futures)

Dask Dealyed在分布式环境中注册执行任务,对任务丢的开销通常在几百微秒左右,通常这对于要处理巨型集合的任务来讲是不使用的,例如,我们要处理的得到的是全球湖泊水库的面积,这种任务生成和调度的开销是比较大的。另外一个明智的选择是使用 Dask Bag,它更适合以批的方式执行复杂算法,是类似于Spark RDD分布式数据集的对象,当然,在Dask bag中是允许嵌套Dealyed对象的,本文在后面如何并行矢量化栅格数据集。

栅格数据例如遥感影像通常都比较巨大,生硬地调用gdal_polygonize.py会造成未知的处理延时,特别是当我们需要矢量化的影像数据特别多时,因此,比较合理的做法是先将这些影像转换为小的分块,然后在单独矢量化,最后通过ogr2ogr合并这些矢量:我们使用了清华大学宫鹏团队发布的全球水体数据集,期望通过这些数据集,以及全球水库湖泊本底矢量获取其中大型湖泊水库的逐日面积数据。

# -*- coding: GBK -*-

import os

import geopandas as gpd

import earthpy as et

import os

import numpy as np

import rasterio as rio

from rasterio.mask import mask

from rasterio.plot import show

import pandas as pd

# Package created for the earth analytics program

import earthpy as et

import earthpy.spatial as es

from rasterio.warp import calculate_default_transform, reproject, Resampling

from fiona.crs import from_epsg,from_string

from utils import *

from fnmatch import fnmatchcase as match

import pyproj

from functools import partial

from multiprocessing import Pool

from functools import partial

from osgeo import ogr

from osgeo import osr

from area import area

from shapely.geometry import box

from fiona.crs import from_epsg,from_string

from utils import *

from fnmatch import fnmatchcase as match

import pyproj

from functools import partial

import warnings

from gdalconst import *

import dask

from dask.distributed import Client, progress

import dask.bag as db

import glob

import shutil

from pathlib import Path

warnings.simplefilter('ignore')

#输入一个reservior,输出一个列表

def calculat_regions(specify_which_reservior):

utility = utils()

specify_year=2008

df = gpd.read_file(r"./shapefilegallery/Export_Output.shp")

df.crs = {'init': 'epsg:4326', 'no_defs': True}

#df = df.to_crs('PROJCS["unnamed",GEOGCS["Unknown datum based upon the custom spheroid",DATUM["Not_specified_based_on_custom_spheroid",SPHEROID["Custom spheroid",6371007.181,0]],PRIMEM["Greenwich",0],UNIT["degree",0.0174532925199433]],PROJECTION["Sinusoidal"],PARAMETER["longitude_of_center",0],PARAMETER["false_easting",0],PARAMETER["false_northing",0],UNIT["metre",1,AUTHORITY["EPSG","9001"]]]')

df = df.to_crs("EPSG:4326")

modis_bound_data = np.genfromtxt('sn_bound_10deg.txt',

skip_header=7,

skip_footer=3)

modis_tiles = []

for row in modis_bound_data:

bbox_= box(row[2],row[4], row[3], row[5])

bbox_.crs= {'init': 'epsg:4326', 'no_defs': True}

bbox_.crs = from_epsg(4326)

modis_tiles.append(bbox_)

hvmps=[]

for tile in modis_tiles:

intersection = df.loc[df['GLWD_ID']==specify_which_reservior,'geometry'].iloc[0].intersects(tile)

if(intersection !=False):

temppair=retrive_hv(modis_bound_data,tile.bounds)

hvmps.append(temppair)

allfiles=[]

for pair in hvmps:

convertedH = int(pair[0])

convertedV =int(pair[1])

temp_str="h"+ str(convertedH).rjust(2,'0')+"v"+ str(convertedV).rjust(2,'0')

image_path = os.path.join("D:\\globaldatasets"+"\\"+str(specify_year)+"_waterbody",str(specify_year),temp_str)

tmp = utility.printPath(1, image_path)

for i in tmp:

temp_str = os.path.join(image_path, str(i))

allfiles.append(temp_str)

#specify_pattern = str(specify_day).rjust(3, '0') + "_water"

#matched_files = [fpth for fpth in allfiles if match(fpth, "*" + specify_pattern + ".tiff")]

return (specify_which_reservior,allfiles)

#我们只需要水体部分的数据

def filter_water(allfiles):

#specify_pattern = "_water"

#specify_day = 1

#specify_pattern = str(specify_day).rjust(3, '0') + "_water"

specify_pattern = "_water"

allfile = allfiles[1]

specify_which_reservior = allfiles[0]

matched_files = [fpth for fpth in allfile if match(fpth, "*" + specify_pattern + ".tiff")]

return (specify_which_reservior, matched_files)

#将单个水体文件切分成64*64的小块,其实在这个地方就可以求它的额面积

def spilt_image_using_gdal(image_file,specify_which_reservior):

print("we going to process the resevior....." + str(specify_which_reservior))

subarrays_files = []

dset = gdal.Open(image_file)

if dset is None:

return subarrays_files

width = dset.RasterXSize

height = dset.RasterYSize

tilesize = 240

#判断存放shp文件夹是否存在,如果不存在则创建

final_output_shp_path = os.path.join(os.getcwd(), "outputshp"+str(specify_which_reservior))

if os.path.isdir(final_output_shp_path)==False:

os.mkdir(final_output_shp_path)

#判断存放tiff的文件夹是否存在,如果不存在则创建

final_output_tiff_path = os.path.join(os.getcwd(),"outputtiff"+str(specify_which_reservior))

if os.path.isdir(final_output_tiff_path)==False:

os.mkdir(final_output_tiff_path)

outputname_prefix = os.path.splitext(os.path.basename(image_file))[0]

filnal_output = os.path.join(final_output_shp_path, outputname_prefix + ".shp")

if(os.path.exists(filnal_output)==True): #有时候程序自然挂了,有点sb的样子

return filnal_output

for i in range(0, width, tilesize):

for j in range(0, height, tilesize):

w = min(i + tilesize, width) - i

h = min(j + tilesize, height) - j

the_file = os.path.join(final_output_tiff_path,outputname_prefix + "_" + str(i) + "_" + str(j) + ".tiff")

#the_file = outputname_prefix + "_" + str(i) + "_" + str(j) + ".tiff"

#如果这个tiff存在,我们就将其转换为shap

if os.path.exists(the_file)==True:

output_name = os.path.join(final_output_shp_path,

outputname_prefix + "_" + str(i) + "_" + str(j) + ".shp")

if os.path.exists(output_name)==False:

command = " gdal_polygonize.py " + the_file + " "

command = command + output_name + " -f \"ESRI Shapefile\""

os.system(command)

subarrays_files.append(output_name)

else: # 如果存在则删除之后,在运行上面的

os.remove(os.path.join(final_output_shp_path, outputname_prefix + "_" + str(i) + "_" + str(j) + ".shp"))

os.remove(os.path.join(final_output_shp_path, outputname_prefix + "_" + str(i) + "_" + str(j) + ".shx"))

os.remove(os.path.join(final_output_shp_path, outputname_prefix + "_" + str(i) + "_" + str(j) + ".dbf"))

os.remove(os.path.join(final_output_shp_path, outputname_prefix + "_" + str(i) + "_" + str(j) + ".prj"))

output_name = os.path.join(final_output_shp_path,

outputname_prefix + "_" + str(i) + "_" + str(j) + ".shp")

command = " gdal_polygonize.py " + the_file + " "

command = command + output_name + " -f \"ESRI Shapefile\""

os.system(command)

subarrays_files.append(output_name)

else: #不存在,则按原定程序运行

gdaltranString = "gdal_translate-of GTiff -srcwin " + str(i) + ", " + str(j) + ", " + str(w) + ", " \

+ str(h) + " " + image_file + " " + the_file

os.system(gdaltranString)

# 再转换之后,开始对其进行矢量化,并删除对应的tif

output_name = os.path.join(final_output_shp_path,

outputname_prefix + "_" + str(i) + "_" + str(j) + ".shp")

command = " gdal_polygonize.py " + the_file + " "

command = command + output_name + " -f \"ESRI Shapefile\""

os.system(command)

try:

os.remove(the_file)

except FileNotFoundError as e:

print("cannot remove unfound file ")

subarrays_files.append(output_name)

#filnal_output = os.path.join(final_output_shp_path, outputname_prefix+".shp")

filnal_output = os.path.join(final_output_shp_path, outputname_prefix + ".shp")

for the_subarray in subarrays_files: #the_subarray 表示的是经过转换为shp的子文件

command = "ogr2ogr" + " -f \"ESRI Shapefile\"" + " -update -append " + filnal_output + " "

command = command + the_subarray

os.system(command)

#最后我们删除所有不需要的文件

for the_subarray in subarrays_files:

try:

zz=os.path.splitext(os.path.basename(the_subarray))[0]

os.remove(os.path.join(final_output_shp_path, zz+".shp"))

os.remove(os.path.join(final_output_shp_path, zz + ".shx"))

os.remove(os.path.join(final_output_shp_path, zz + ".dbf"))

os.remove(os.path.join(final_output_shp_path, zz + ".prj"))

except FileNotFoundError as e:

print("cannot remove unfound file ")

print("meregy done ........................hwhhwwhhwhwhwhwhhhhhhhhhhhhhh...." + filnal_output)

return filnal_output

#将所有大的水体切分成小的水体

def convert_to_small_tif(waterfiles):

lazy_results = [] # 记录延迟结果,看下这里能否用delayed

for the_water_file in waterfiles[1]:

#这里的waterfiles其实就是那个waterreserviro编号

lazy_result = dask.delayed(spilt_image_using_gdal)(the_water_file, waterfiles[0])

lazy_results.append(lazy_result)

return lazy_results

if __name__ == "__main__":

#需要求最大湖泊和水库的GWID

utility = utils()

lake_reserviro = utility.getmaxreserviors_lakes()

print(lake_reserviro[1])

lakes = lake_reserviro[1][2:]

print(lakes)

bag_allfiles= db.from_sequence(lakes) #根据所有lake获得对应的所有文件

bbag_= bag_allfiles.map(calculat_regions) #在所有文件中筛选指定区域的文件

bbag_water= bbag_.map(lambda x: filter_water(x)) #在所有筛选过的文件中,只找出是水体的文件

bbag_small_water= bbag_water.map(lambda x: convert_to_small_tif(x))

#好像这个地方也是可以继续在dealyed结果上,并行的

result = bbag_small_water.compute()

futures = dask.persist(*result) # trigger computation in the background

results = dask.compute(*futures)

print(results)

最为重要的是标注黑色的部分代码,bag_allfiles, bbag_ bbag_water 这三个Dask bag对象分别是获取特定湖泊水库的影像文件对应的所有文件,获取覆盖指定水湖泊区域的文件,以及找出那写特定水体影像文件,为了便于理解,可以看下解压全球水体数据集之后的文件目录情况:

在最里层的文件夹里面,包含的文件信息如下:

通过使用Dask bag以及嵌套Dask Dealyed对象,可以对覆盖特定湖泊水库的影像文件进行矢量化。

最后,作为总结,使用python Dask的动态调度和Bag对象,可以并行化对海量空间数据进行处理和分析。尽量使用 Dask bag对象创建延迟加载的复杂算法,可以看到使用这些对象做并处理还是十分方便和具有资源利用优势的,例如下图1表示用python多进程并行矢量化影像的资源利用情况:Python Dask并行矢量化海量影像的资源负载情况

  • 0
    点赞
  • 2
    收藏
    觉得还不错? 一键收藏
  • 0
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值