收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

4 F. U; N4 i; _
, Z' h# c$ r* F1 |& \8 L& A ~

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

0 m) c, O' L. p4 s. z' d & X0 I/ Q$ s6 y; d, m

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

# A- J# w9 |% r $ ?, P0 P% x" b; ]. l

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

+ ]) I$ E; w& y

其中几个关键点解释一下:

# @* P$ _; E; s

(1)首先定义拟合正太分布的函数

" j7 o* l2 e: a9 j1 ]$ ^- h
def norm_fit(data):7 o% P y8 I7 K. d9 t1 U% x( L loc, scale = st.norm.fit(data) 9 W' O+ z& t+ A( `- E7 e1 |) G return np.array([loc, scale])
) h: n! ?$ v. s8 U- M

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

: ^' Q9 C& [# n' S1 s! v7 x

(2)xarray分块

4 E# `5 b! E2 B0 v. o2 y! s
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) 5 Z7 N. T8 ^6 T6 e" a; g/ C) ~! B x = x.chunk({"lev":5, "lat":100,"lon":100})
5 `! b& X2 F0 E- n) j) T

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

) B+ D: l6 o! M/ J) G2 N

(3)xarray.apply_ufunc函数

4 g' q+ c5 X5 L1 y/ H2 C. e0 k: [
result = xr.apply_ufunc(norm_fit,% x+ c7 i5 Z. E& `5 \! ~; ` x, . w) a3 {) |% ?! N/ W- P5 \ input_core_dims=[["time"]],7 A) p& y4 r7 |7 D output_core_dims=[["paras"]], ! @' m& P; Q- S2 n! c$ q m dask="parallelized",) O# i; ?# L! s+ X$ |' a- Z output_dtypes=[np.float], / a1 G* `" c9 o w& S; E9 z! f dask_gufunc_kwargs={output_sizes:{"paras":2}} ; M* ? h* T& ] )
9 Q9 ?" u+ F9 ]4 L l

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

5 u& y/ a+ }4 I 2 q) E$ n& d: d0 `' O/ M* j

以下是全部代码:

$ J* R* M% Q- w5 q
from scipy import stats as st 2 l, i. n5 f: X% @& \ ( k. N! Z W- Q' r% W8 R import xarray as xr3 B9 ^2 n' K. o import dask * {* {5 I, f' X3 e* t import numpy as np4 g7 y8 w/ W7 k' k* u5 e from dask.diagnostics import ProgressBar 2 H6 s3 Y4 H4 |. J4 E; E/ y' z! e0 R# d% U3 m def norm_fit(data):' l/ Y8 [$ h& z c. T loc, scale = st.norm.fit(data) + G5 Z+ {4 A8 |; s+ ? return np.array([loc, scale]) : a$ Q" _1 C9 R# g' h* O# A; m! I . \6 M( I' _- G$ r6 I3 H" A# t rs = np.random.RandomState(0) ; z9 m5 I7 Q# J* E5 `$ t+ L) B" G x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])1 N) K/ J1 ^+ y3 q x = x.chunk({"lev":5, "lat":100,"lon":100}) * V0 E" R! C& v0 w# v$ ? 7 V; \) ` [1 P5 Z+ i #使用apply_ufunc计算,并用dask的并行计算. m* R/ L B1 Q" ]3 V# K result = xr.apply_ufunc(norm_fit,- D) Q; D+ G A: f1 a4 o9 Q x,/ C0 D; b: r, L( [. v input_core_dims=[["time"]],2 k" ]/ h+ S4 f- p$ |* U7 p: P3 | output_core_dims=[["paras"]],( V' X4 m2 x' m& D. D% j dask="parallelized", 3 T0 ?: F8 d% g output_dtypes=[np.float], 2 w# c; ~1 e0 w( I; ? dask_gufunc_kwargs={output_sizes:{"paras":2}} $ Q5 {+ [+ I2 P. R )0 I. [4 _$ m* O' e0 U, M 3 g! A# W8 N+ Y* u/ g: _ #compute进行真实的计算并显示进度 ; u$ h& w! u) y3 m with ProgressBar(): 3 G, l* y2 p3 `( [( F" I5 G' c& N result = results.compute() . m/ b4 ?* ^$ D0 d% t" m% {5 X( c( G6 C) u% { #结果冲命名保存到nc文件 9 O" p; Y J/ T4 K* a: v result = result.rename("norm_paras") & i% a3 e3 U0 R N( q' } result = result.to_netcdf("norm_fit_paras.nc",compute=False) ) D) ^( }9 I, }# x with ProgressBar(): , ^6 B& Z$ C0 c8 N6 x6 n [" P0 H result.compute()
0 u3 h/ C/ l! m. [

转自:气海同途

! O% \. J; I2 e# r

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2026-4-7
快速回复 返回顶部 返回列表