|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。
% q2 ]9 u2 }. V0 T 1 W$ r3 x$ ]. w0 X3 A
大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。 . K( I) j, |% T4 l( p
- A% c5 v# i: g v/ n" a+ p 本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。 ; k" h+ Q: m( \1 E' p
, P$ I$ n* b' w 数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。 3 g- i& e6 s- s3 [4 B
其中几个关键点解释一下: + j" x3 y9 \' T
(1)首先定义拟合正太分布的函数
# Z. E! P+ c( K def norm_fit(data):! D& P, P. Y$ N' T6 j& Q! U5 Q9 u" O
loc, scale = st.norm.fit(data)# [# i: f9 ~/ I' I$ B
return np.array([loc, scale])
p1 u# w- a- B! T1 ]" j' X# f f 这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。
7 ^1 B% j0 u, d! S, a0 A4 C (2)xarray分块 1 q# T+ l# s; L9 t2 T( @
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
1 o+ S% Q5 w) Y% x x = x.chunk({"lev":5, "lat":100,"lon":100}) ( F% n! R0 n' W" t$ O' N$ S5 {8 _
xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。
- u5 T9 D' F. j- G (3)xarray.apply_ufunc函数 0 D) M$ v/ ~+ g) e- H' ~
result = xr.apply_ufunc(norm_fit,
# p2 @# [. F: J0 E/ C x,. V1 |) e# {# |% N: c- t/ A, n
input_core_dims=[["time"]],
2 D0 Q0 K; `% b+ z: a output_core_dims=[["paras"]],
8 }7 i) P, k, O- T) }% d9 E6 [ dask="parallelized",
6 G! { ]" {/ K output_dtypes=[np.float],' w4 ^. V! U7 B+ K8 |: \
dask_gufunc_kwargs={output_sizes:{"paras":2}}
7 L8 m8 ^7 K4 [' E )
9 B! m D- j/ G( K8 z apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点) 4 _! m, ^, }) {1 g* S7 X7 ?$ U" I
8 A1 i" H! c+ t+ v0 e" V 以下是全部代码:
6 M+ V+ R) P: c7 @ from scipy import stats as st/ c9 D% \& f8 B: o4 K
) L# u& F1 v8 f# H9 n import xarray as xr
: x1 n r. p+ q+ S, @- f& G import dask
6 d7 h4 F( w2 D7 `' J% w9 O9 |; b( |2 W import numpy as np+ D: A% N7 E2 \! I& B
from dask.diagnostics import ProgressBar2 h3 W. v4 X e) \/ ^& i6 h4 \
% @ N6 r7 e6 w; m
def norm_fit(data):
2 L0 H, `; ?# S" p/ C7 \6 G; `5 p loc, scale = st.norm.fit(data)4 L# |( }& F% o; E3 ~$ ]
return np.array([loc, scale])
2 p$ L* y: F+ s4 j0 x* A& H$ o& W6 [6 i0 [* E! E, |" d
rs = np.random.RandomState(0)
: l6 h( R4 |0 m x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
* Z0 ?- Y4 k% P4 ^; R. ?" Q7 _ x = x.chunk({"lev":5, "lat":100,"lon":100})
, M0 P' ^! V F# v2 b6 l& }& u- W5 a* I" l
#使用apply_ufunc计算,并用dask的并行计算
, P6 `2 Y6 U# x) b( L) [+ f# O; u! b result = xr.apply_ufunc(norm_fit,
: ~$ i! w1 \% |0 s/ Z x,; f! c. N; C, o4 P
input_core_dims=[["time"]],
7 N: t( x2 V Z$ L/ Z4 F% l output_core_dims=[["paras"]],
3 f3 ]4 W" a R% w3 O2 [ dask="parallelized",5 K. e% m, x+ B" w; c) M
output_dtypes=[np.float],3 @6 g2 Y/ F- C- h: O
dask_gufunc_kwargs={output_sizes:{"paras":2}}
# E5 e3 ?# |4 Y3 l3 n5 B )
1 l$ ~( Y% q- E8 k+ }$ T1 z! E: b! y' m, `% k: ?6 a& A# X
#compute进行真实的计算并显示进度
4 D( d9 `+ m) Q with ProgressBar():' w% C2 ^$ g5 Q2 r
result = results.compute()4 b- D/ l+ w% j- |6 x! r% J
3 `% ~2 h9 x7 V9 u2 d
#结果冲命名保存到nc文件
8 }" t, U: e7 F! j3 N# {& {7 m result = result.rename("norm_paras")3 Y7 e8 ]2 i) |6 W' w2 a$ g
result = result.to_netcdf("norm_fit_paras.nc",compute=False)
; ~2 f' ^2 M4 U0 g8 i' p with ProgressBar():
1 I3 I; W- H3 o2 k result.compute()
* p2 T6 U0 J x- E 转自:气海同途
4 i- V1 f4 b0 S+ b& M 关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |