|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。 3 p+ b5 x% S5 b2 l
2 Y' Y1 P9 p# i4 I0 T" |/ C4 ^5 ^ 大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。
7 s8 c# L" m1 `- P7 ^" K! X5 a0 X6 A( _& m! ?3 u
本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。 8 Z! q! g* {) ]3 @7 d' b7 G8 S
6 Y. N6 b# J8 n 数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。 ! y' u' R6 N3 U# V: I; t
其中几个关键点解释一下:
! ^3 Z( u" ~. v- I (1)首先定义拟合正太分布的函数 & P# L) R6 y- } |. X' U7 [; z
def norm_fit(data):
' n0 j; |; e! c9 n& c loc, scale = st.norm.fit(data)
2 r7 j2 n0 D& E6 D return np.array([loc, scale])
0 B3 i* ] g p# t+ F' n8 T% `# X 这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。
' @6 p$ w2 ^ u6 e9 }& [ (2)xarray分块
, N9 M) [& m+ Z% y- J: s x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
7 u) K' a8 l6 q4 a) w3 Z! ] x = x.chunk({"lev":5, "lat":100,"lon":100}) " }* C5 J" X3 h& S
xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。
5 l( W9 Y$ L2 T+ \* L# O (3)xarray.apply_ufunc函数
& F$ D$ B4 r* v) ^# ^, [9 V- v3 T! T result = xr.apply_ufunc(norm_fit,
4 X) q" n# T9 Z; O. a P- R+ M! }; C5 Y x,6 w- i& n7 u6 Q9 o F
input_core_dims=[["time"]],
. W- ^/ U0 I; m% f2 e- o+ Q output_core_dims=[["paras"]],
+ T) M/ _/ O. }/ Q dask="parallelized",- h7 P% A9 [1 I1 s, a/ c
output_dtypes=[np.float]," Z, d0 v+ ?* m. \5 t. y6 A( y, [
dask_gufunc_kwargs={output_sizes:{"paras":2}}
# q4 K9 g6 D0 a9 n, ]$ C1 H )
* y/ K, T$ G) c" _: ]6 H apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)
9 H; K# w) Y8 }! M! y( X/ A" h& ?& Q P% ?4 ]- o
以下是全部代码:
4 h, M4 A5 A( H. a. v/ R from scipy import stats as st' B8 x2 A" L. N
, \6 W# H' n8 u' e8 t5 I9 X import xarray as xr
) j2 x7 e' F+ B9 V' E import dask1 w- Y( d8 B; q
import numpy as np: w" R. a. ~) [. S+ l9 j5 b& {
from dask.diagnostics import ProgressBar) K, Z) i9 @( I- s. _
3 a3 _% J% r1 B( m& v def norm_fit(data):
8 f+ d6 z `; l) g2 l! _+ w4 x- g loc, scale = st.norm.fit(data)# @! j) F9 i9 S
return np.array([loc, scale])* I6 \/ S# L9 j, y( v) {4 C
7 X2 g) F/ s6 ], L9 Q9 Z& |
rs = np.random.RandomState(0)
; _4 K# Z# ?- z9 M& x7 T) V8 e x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
& j: f( W- D- ? ~" v x = x.chunk({"lev":5, "lat":100,"lon":100})# v1 n! ` X2 R9 m9 m
' J# W% V" C( A; C* Z1 U b) i #使用apply_ufunc计算,并用dask的并行计算) ~5 s p" r% F0 q- G
result = xr.apply_ufunc(norm_fit,
N( b4 w6 E n% M x,
( N! E1 [; [! l5 X. a: j input_core_dims=[["time"]],7 G# y& C# s' g* K
output_core_dims=[["paras"]],5 a: C- t$ I6 [; z6 _% \: H m
dask="parallelized",
5 d% e) e* _/ j5 C8 E/ ~ l output_dtypes=[np.float],
2 q9 K) s' v9 `9 T6 j9 C dask_gufunc_kwargs={output_sizes:{"paras":2}}9 u9 X$ p+ P+ H* p/ E# Q
); N( [6 H1 m" O' t! \0 I
9 a4 r2 ^; h3 e6 k6 v( G0 |7 N
#compute进行真实的计算并显示进度% I, r0 f9 J) x' A* L. F+ _8 E
with ProgressBar():% d0 \% J4 }7 [* z- ]8 a s4 |& [
result = results.compute()6 v4 S l" h& p( V9 O6 J$ ]
W0 `0 c& W7 s #结果冲命名保存到nc文件% }9 m4 b4 T: f3 z5 G$ Y: ]
result = result.rename("norm_paras")( z2 Y3 M0 S3 S
result = result.to_netcdf("norm_fit_paras.nc",compute=False) L6 d F* M' f- N: }* v
with ProgressBar():4 b! Q" z9 h* F2 v: F' N
result.compute() - r1 i6 M8 ~! V9 ^0 j% W0 ~- F
转自:气海同途
9 b, B0 o5 d2 ^9 q7 g 关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |