收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

% q2 ]9 u2 }. V0 T
1 W$ r3 x$ ]. w0 X3 A

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

. K( I) j, |% T4 l( p - A% c5 v# i: g v/ n" a+ p

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

; k" h+ Q: m( \1 E' p , P$ I$ n* b' w

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

3 g- i& e6 s- s3 [4 B

其中几个关键点解释一下:

+ j" x3 y9 \' T

(1)首先定义拟合正太分布的函数

# Z. E! P+ c( K
def norm_fit(data):! D& P, P. Y$ N' T6 j& Q! U5 Q9 u" O loc, scale = st.norm.fit(data)# [# i: f9 ~/ I' I$ B return np.array([loc, scale])
p1 u# w- a- B! T1 ]" j' X# f f

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

7 ^1 B% j0 u, d! S, a0 A4 C

(2)xarray分块

1 q# T+ l# s; L9 t2 T( @
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) 1 o+ S% Q5 w) Y% x x = x.chunk({"lev":5, "lat":100,"lon":100})
( F% n! R0 n' W" t$ O' N$ S5 {8 _

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

- u5 T9 D' F. j- G

(3)xarray.apply_ufunc函数

0 D) M$ v/ ~+ g) e- H' ~
result = xr.apply_ufunc(norm_fit, # p2 @# [. F: J0 E/ C x,. V1 |) e# {# |% N: c- t/ A, n input_core_dims=[["time"]], 2 D0 Q0 K; `% b+ z: a output_core_dims=[["paras"]], 8 }7 i) P, k, O- T) }% d9 E6 [ dask="parallelized", 6 G! { ]" {/ K output_dtypes=[np.float],' w4 ^. V! U7 B+ K8 |: \ dask_gufunc_kwargs={output_sizes:{"paras":2}} 7 L8 m8 ^7 K4 [' E )
9 B! m D- j/ G( K8 z

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

4 _! m, ^, }) {1 g* S7 X7 ?$ U" I 8 A1 i" H! c+ t+ v0 e" V

以下是全部代码:

6 M+ V+ R) P: c7 @
from scipy import stats as st/ c9 D% \& f8 B: o4 K ) L# u& F1 v8 f# H9 n import xarray as xr : x1 n r. p+ q+ S, @- f& G import dask 6 d7 h4 F( w2 D7 `' J% w9 O9 |; b( |2 W import numpy as np+ D: A% N7 E2 \! I& B from dask.diagnostics import ProgressBar2 h3 W. v4 X e) \/ ^& i6 h4 \ % @ N6 r7 e6 w; m def norm_fit(data): 2 L0 H, `; ?# S" p/ C7 \6 G; `5 p loc, scale = st.norm.fit(data)4 L# |( }& F% o; E3 ~$ ] return np.array([loc, scale]) 2 p$ L* y: F+ s4 j0 x* A& H$ o& W6 [6 i0 [* E! E, |" d rs = np.random.RandomState(0) : l6 h( R4 |0 m x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) * Z0 ?- Y4 k% P4 ^; R. ?" Q7 _ x = x.chunk({"lev":5, "lat":100,"lon":100}) , M0 P' ^! V F# v2 b6 l& }& u- W5 a* I" l #使用apply_ufunc计算,并用dask的并行计算 , P6 `2 Y6 U# x) b( L) [+ f# O; u! b result = xr.apply_ufunc(norm_fit, : ~$ i! w1 \% |0 s/ Z x,; f! c. N; C, o4 P input_core_dims=[["time"]], 7 N: t( x2 V Z$ L/ Z4 F% l output_core_dims=[["paras"]], 3 f3 ]4 W" a R% w3 O2 [ dask="parallelized",5 K. e% m, x+ B" w; c) M output_dtypes=[np.float],3 @6 g2 Y/ F- C- h: O dask_gufunc_kwargs={output_sizes:{"paras":2}} # E5 e3 ?# |4 Y3 l3 n5 B ) 1 l$ ~( Y% q- E8 k+ }$ T1 z! E: b! y' m, `% k: ?6 a& A# X #compute进行真实的计算并显示进度 4 D( d9 `+ m) Q with ProgressBar():' w% C2 ^$ g5 Q2 r result = results.compute()4 b- D/ l+ w% j- |6 x! r% J 3 `% ~2 h9 x7 V9 u2 d #结果冲命名保存到nc文件 8 }" t, U: e7 F! j3 N# {& {7 m result = result.rename("norm_paras")3 Y7 e8 ]2 i) |6 W' w2 a$ g result = result.to_netcdf("norm_fit_paras.nc",compute=False) ; ~2 f' ^2 M4 U0 g8 i' p with ProgressBar(): 1 I3 I; W- H3 o2 k result.compute()
* p2 T6 U0 J x- E

转自:气海同途

4 i- V1 f4 b0 S+ b& M

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2026-4-7
快速回复 返回顶部 返回列表