|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。 4 F. U; N4 i; _
, Z' h# c$ r* F1 |& \8 L& A ~ 大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。
0 m) c, O' L. p4 s. z' d
& X0 I/ Q$ s6 y; d, m 本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。 # A- J# w9 |% r
$ ?, P0 P% x" b; ]. l 数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。
+ ]) I$ E; w& y 其中几个关键点解释一下: # @* P$ _; E; s
(1)首先定义拟合正太分布的函数 " j7 o* l2 e: a9 j1 ]$ ^- h
def norm_fit(data):7 o% P y8 I7 K. d9 t1 U% x( L
loc, scale = st.norm.fit(data)
9 W' O+ z& t+ A( `- E7 e1 |) G return np.array([loc, scale]) ) h: n! ?$ v. s8 U- M
这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。
: ^' Q9 C& [# n' S1 s! v7 x (2)xarray分块
4 E# `5 b! E2 B0 v. o2 y! s x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
5 Z7 N. T8 ^6 T6 e" a; g/ C) ~! B x = x.chunk({"lev":5, "lat":100,"lon":100})
5 `! b& X2 F0 E- n) j) T xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。 ) B+ D: l6 o! M/ J) G2 N
(3)xarray.apply_ufunc函数
4 g' q+ c5 X5 L1 y/ H2 C. e0 k: [ result = xr.apply_ufunc(norm_fit,% x+ c7 i5 Z. E& `5 \! ~; `
x,
. w) a3 {) |% ?! N/ W- P5 \ input_core_dims=[["time"]],7 A) p& y4 r7 |7 D
output_core_dims=[["paras"]],
! @' m& P; Q- S2 n! c$ q m dask="parallelized",) O# i; ?# L! s+ X$ |' a- Z
output_dtypes=[np.float],
/ a1 G* `" c9 o w& S; E9 z! f dask_gufunc_kwargs={output_sizes:{"paras":2}}
; M* ? h* T& ] )
9 Q9 ?" u+ F9 ]4 L l apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点) 5 u& y/ a+ }4 I
2 q) E$ n& d: d0 `' O/ M* j
以下是全部代码: $ J* R* M% Q- w5 q
from scipy import stats as st
2 l, i. n5 f: X% @& \
( k. N! Z W- Q' r% W8 R import xarray as xr3 B9 ^2 n' K. o
import dask
* {* {5 I, f' X3 e* t import numpy as np4 g7 y8 w/ W7 k' k* u5 e
from dask.diagnostics import ProgressBar
2 H6 s3 Y4 H4 |. J4 E; E/ y' z! e0 R# d% U3 m
def norm_fit(data):' l/ Y8 [$ h& z c. T
loc, scale = st.norm.fit(data)
+ G5 Z+ {4 A8 |; s+ ? return np.array([loc, scale])
: a$ Q" _1 C9 R# g' h* O# A; m! I
. \6 M( I' _- G$ r6 I3 H" A# t rs = np.random.RandomState(0)
; z9 m5 I7 Q# J* E5 `$ t+ L) B" G x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])1 N) K/ J1 ^+ y3 q
x = x.chunk({"lev":5, "lat":100,"lon":100})
* V0 E" R! C& v0 w# v$ ?
7 V; \) ` [1 P5 Z+ i #使用apply_ufunc计算,并用dask的并行计算. m* R/ L B1 Q" ]3 V# K
result = xr.apply_ufunc(norm_fit,- D) Q; D+ G A: f1 a4 o9 Q
x,/ C0 D; b: r, L( [. v
input_core_dims=[["time"]],2 k" ]/ h+ S4 f- p$ |* U7 p: P3 |
output_core_dims=[["paras"]],( V' X4 m2 x' m& D. D% j
dask="parallelized",
3 T0 ?: F8 d% g output_dtypes=[np.float],
2 w# c; ~1 e0 w( I; ? dask_gufunc_kwargs={output_sizes:{"paras":2}}
$ Q5 {+ [+ I2 P. R )0 I. [4 _$ m* O' e0 U, M
3 g! A# W8 N+ Y* u/ g: _ #compute进行真实的计算并显示进度
; u$ h& w! u) y3 m with ProgressBar():
3 G, l* y2 p3 `( [( F" I5 G' c& N result = results.compute()
. m/ b4 ?* ^$ D0 d% t" m% {5 X( c( G6 C) u% {
#结果冲命名保存到nc文件
9 O" p; Y J/ T4 K* a: v result = result.rename("norm_paras")
& i% a3 e3 U0 R N( q' } result = result.to_netcdf("norm_fit_paras.nc",compute=False)
) D) ^( }9 I, }# x with ProgressBar():
, ^6 B& Z$ C0 c8 N6 x6 n [" P0 H result.compute()
0 u3 h/ C/ l! m. [ 转自:气海同途 ! O% \. J; I2 e# r
关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |