收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

3 p+ b5 x% S5 b2 l
2 Y' Y1 P9 p# i4 I0 T" |/ C4 ^5 ^

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

7 s8 c# L" m1 `- P7 ^" K! X5 a0 X6 A( _& m! ?3 u

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

8 Z! q! g* {) ]3 @7 d' b7 G8 S 6 Y. N6 b# J8 n

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

! y' u' R6 N3 U# V: I; t

其中几个关键点解释一下:

! ^3 Z( u" ~. v- I

(1)首先定义拟合正太分布的函数

& P# L) R6 y- } |. X' U7 [; z
def norm_fit(data): ' n0 j; |; e! c9 n& c loc, scale = st.norm.fit(data) 2 r7 j2 n0 D& E6 D return np.array([loc, scale])
0 B3 i* ] g p# t+ F' n8 T% `# X

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

' @6 p$ w2 ^ u6 e9 }& [

(2)xarray分块

, N9 M) [& m+ Z% y- J: s
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) 7 u) K' a8 l6 q4 a) w3 Z! ] x = x.chunk({"lev":5, "lat":100,"lon":100})
" }* C5 J" X3 h& S

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

5 l( W9 Y$ L2 T+ \* L# O

(3)xarray.apply_ufunc函数

& F$ D$ B4 r* v) ^# ^, [9 V- v3 T! T
result = xr.apply_ufunc(norm_fit, 4 X) q" n# T9 Z; O. a P- R+ M! }; C5 Y x,6 w- i& n7 u6 Q9 o F input_core_dims=[["time"]], . W- ^/ U0 I; m% f2 e- o+ Q output_core_dims=[["paras"]], + T) M/ _/ O. }/ Q dask="parallelized",- h7 P% A9 [1 I1 s, a/ c output_dtypes=[np.float]," Z, d0 v+ ?* m. \5 t. y6 A( y, [ dask_gufunc_kwargs={output_sizes:{"paras":2}} # q4 K9 g6 D0 a9 n, ]$ C1 H )
* y/ K, T$ G) c" _: ]6 H

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

9 H; K# w) Y8 }! M! y( X/ A" h& ?& Q P% ?4 ]- o

以下是全部代码:

4 h, M4 A5 A( H. a. v/ R
from scipy import stats as st' B8 x2 A" L. N , \6 W# H' n8 u' e8 t5 I9 X import xarray as xr ) j2 x7 e' F+ B9 V' E import dask1 w- Y( d8 B; q import numpy as np: w" R. a. ~) [. S+ l9 j5 b& { from dask.diagnostics import ProgressBar) K, Z) i9 @( I- s. _ 3 a3 _% J% r1 B( m& v def norm_fit(data): 8 f+ d6 z `; l) g2 l! _+ w4 x- g loc, scale = st.norm.fit(data)# @! j) F9 i9 S return np.array([loc, scale])* I6 \/ S# L9 j, y( v) {4 C 7 X2 g) F/ s6 ], L9 Q9 Z& | rs = np.random.RandomState(0) ; _4 K# Z# ?- z9 M& x7 T) V8 e x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) & j: f( W- D- ? ~" v x = x.chunk({"lev":5, "lat":100,"lon":100})# v1 n! ` X2 R9 m9 m ' J# W% V" C( A; C* Z1 U b) i #使用apply_ufunc计算,并用dask的并行计算) ~5 s p" r% F0 q- G result = xr.apply_ufunc(norm_fit, N( b4 w6 E n% M x, ( N! E1 [; [! l5 X. a: j input_core_dims=[["time"]],7 G# y& C# s' g* K output_core_dims=[["paras"]],5 a: C- t$ I6 [; z6 _% \: H m dask="parallelized", 5 d% e) e* _/ j5 C8 E/ ~ l output_dtypes=[np.float], 2 q9 K) s' v9 `9 T6 j9 C dask_gufunc_kwargs={output_sizes:{"paras":2}}9 u9 X$ p+ P+ H* p/ E# Q ); N( [6 H1 m" O' t! \0 I 9 a4 r2 ^; h3 e6 k6 v( G0 |7 N #compute进行真实的计算并显示进度% I, r0 f9 J) x' A* L. F+ _8 E with ProgressBar():% d0 \% J4 }7 [* z- ]8 a s4 |& [ result = results.compute()6 v4 S l" h& p( V9 O6 J$ ] W0 `0 c& W7 s #结果冲命名保存到nc文件% }9 m4 b4 T: f3 z5 G$ Y: ] result = result.rename("norm_paras")( z2 Y3 M0 S3 S result = result.to_netcdf("norm_fit_paras.nc",compute=False) L6 d F* M' f- N: }* v with ProgressBar():4 b! Q" z9 h* F2 v: F' N result.compute()
- r1 i6 M8 ~! V9 ^0 j% W0 ~- F

转自:气海同途

9 b, B0 o5 d2 ^9 q7 g

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2026-4-7
快速回复 返回顶部 返回列表