|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。 ! P& \1 [8 H% W$ s5 P
( h& E& C# A& Z9 ]& j
大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。 2 }0 w# R( F6 [# s- q9 L, F
: L( e6 v% \% u n9 w/ F0 C$ l8 s! o) i 本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。 3 W6 L7 g) |. g2 g
, Q u- ?$ y J9 [6 h- g% L
数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。
* _, o, f' U3 f% ] 其中几个关键点解释一下: & c7 t3 M0 T/ ]- D6 w6 I
(1)首先定义拟合正太分布的函数 ( `- D* z; Q+ J9 O9 F5 ?1 Q5 O; q
def norm_fit(data):
4 V4 P5 T Q: I3 g, B/ X7 @, h loc, scale = st.norm.fit(data)) R2 f" K' y; A3 X4 h
return np.array([loc, scale])
- r) O# i5 q( q N6 m 这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。 & v* R! J2 G1 Y+ j! P5 b
(2)xarray分块 6 [) y$ S) o9 X, N
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
/ n& ~: V2 t8 I, L x = x.chunk({"lev":5, "lat":100,"lon":100})
# F4 F2 V- a1 R4 H. G6 [ xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。 " ]% ]& K/ t# q/ H s* p' l
(3)xarray.apply_ufunc函数
. N o3 A0 E, m% U8 o result = xr.apply_ufunc(norm_fit,- y/ p. W' \- F) a- O$ s, W
x,
- _% S* q& [7 S+ a" L, z input_core_dims=[["time"]],
* r& J N9 Q, O# D' n; [. Y output_core_dims=[["paras"]],
]4 A0 F- E" n9 K; u& I F3 L* l dask="parallelized",
$ ~0 G9 }9 x4 W4 h, ?' W- g output_dtypes=[np.float],
: E4 p) p- F) ~6 q/ S6 E( ` dask_gufunc_kwargs={output_sizes:{"paras":2}}$ J9 O5 @" z( ]7 W/ U( c! U, x1 v
)
9 O: Q# X$ G3 I2 _. O7 H- Y h apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)
$ @, i& W0 V/ E4 Y$ x* E: { n% M4 G
; M' t, R2 S. j4 k$ u4 ?* W4 x; a 以下是全部代码: 4 p+ {# p$ w) P- D
from scipy import stats as st4 Z' @* P5 ?1 L3 r- U3 q
) L+ [! d' u; }
import xarray as xr
2 c! @+ f. Z4 L4 P% y import dask# D5 E j7 ]6 e
import numpy as np+ T3 E7 K* [; c$ Y0 p m
from dask.diagnostics import ProgressBar
6 t# W" Q) K: `3 G$ H9 Q" U8 k" x
+ j' I8 U$ ~9 h+ U! v' l def norm_fit(data):! C$ K, H: Q/ ]
loc, scale = st.norm.fit(data)
( q4 u4 a! ~0 T return np.array([loc, scale]) q$ Q0 y3 j/ l0 u! L1 C
U) h: F. \9 L( H. h9 D( ~7 ] rs = np.random.RandomState(0)9 N4 R1 S0 ^9 g+ C& ?! S
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
( `, B, n9 M: t9 F- V x = x.chunk({"lev":5, "lat":100,"lon":100})
/ B* D" T1 t. m8 x2 c$ g
* y9 l+ ^( c' L/ F1 o #使用apply_ufunc计算,并用dask的并行计算
' |) d" N4 H4 ^' G% f result = xr.apply_ufunc(norm_fit,9 S! j$ ?9 B% L$ @
x,( B; b N' {3 x9 `7 |1 x
input_core_dims=[["time"]],
* W% |! _. u3 D4 a output_core_dims=[["paras"]],
* a& H. x! R# R' O3 M5 g& z. T dask="parallelized",
$ P% ?8 T& ]6 M, ~7 @( ^ output_dtypes=[np.float],5 ?5 z4 R& c3 \7 g
dask_gufunc_kwargs={output_sizes:{"paras":2}}
( I( G8 d6 Z% \, e( `$ q! w )5 n+ l3 }8 ?% e3 e5 |
4 F2 u$ v+ q B" v) h7 c, b/ n% x
#compute进行真实的计算并显示进度8 C2 ], J. L, h) g7 j7 I
with ProgressBar():# [9 u/ g! @, K$ F
result = results.compute()) w' Y$ J7 V+ p* s$ w: J
3 p6 Q' r N( J8 X' U0 ^3 P2 U
#结果冲命名保存到nc文件
0 c* A+ {8 [; h" x) C: j result = result.rename("norm_paras"). }+ u H6 V# c" s( j
result = result.to_netcdf("norm_fit_paras.nc",compute=False)8 W; v6 Q1 c! q3 X, Z/ L" q. N8 C
with ProgressBar():4 N4 y5 ?" a- D! G, ]
result.compute()
$ m% ~" y+ x2 Z9 W/ w3 O" i 转自:气海同途 % c6 ?* N. M! [/ a, |+ a. P& ^
关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |