for looking into this further and for figuring out the cause of the problem. , does this mean that I should submit a dask issue?
dask.async.RuntimeError: NetCDF: HDF error on xarray to_netcdf
Dask appears to be failing on serialization following a ds.to_netcdef() via a NETCDF: HDF error.
Excerpted error below:
Traceback (most recent call last): File "reduce_dispersion_file.py", line 40, in <module> if __name__ == "__main__": File "reduce_dispersion_file.py", line 36, in reduce_dispersion_file with timeit_context('output to disk'): File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/xarray/core/dataset.py", line 791, in to_netcdf engine=engine, encoding=encoding) File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/xarray/backends/api.py", line 356, in to_netcdf dataset.dump_to_store(store, sync=sync, encoding=encoding) File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/xarray/core/dataset.py", line 739, in dump_to_store store.sync() File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/xarray/backends/netCDF4_.py", line 283, in sync super(NetCDF4DataStore, self).sync() File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/xarray/backends/common.py", line 186, in sync self.writer.sync() File "/users/pwolfram/envs/LIGHT_analysis/lib/python2.7/site-packages/xarray/backends/common.py", line 165, in sync da.store(self.sources, self.targets) File "/users/pwolfram/lib/python2.7/site-packages/dask/array/core.py", line 712, in store Array._get(dsk, keys, **kwargs) File "/users/pwolfram/lib/python2.7/site-packages/dask/base.py", line 43, in _get return get(dsk2, keys, **kwargs) File "/users/pwolfram/lib/python2.7/site-packages/dask/threaded.py", line 57, in get **kwargs) File "/users/pwolfram/lib/python2.7/site-packages/dask/async.py", line 481, in get_async raise(remote_exception(res, tb)) dask.async.RuntimeError: NetCDF: HDF error Traceback --------- File "/users/pwolfram/lib/python2.7/site-packages/dask/async.py", line 264, in execute_task result = _execute_task(task, data) File "/users/pwolfram/lib/python2.7/site-packages/dask/async.py", line 246, in _execute_task return func(*args2) File "/users/pwolfram/lib/python2.7/site-packages/dask/array/core.py", line 1954, in store out[index] = np.asanyarray(x) File "netCDF4/_netCDF4.pyx", line 3678, in netCDF4._netCDF4.Variable.__setitem__ (netCDF4/_netCDF4.c:37215) File "netCDF4/_netCDF4.pyx", line 3887, in netCDF4._netCDF4.Variable._put (netCDF4/_netCDF4.c:38907) </module>
Script used: https://gist.github.com/98acaa31a4533b490f78 Full output: https://gist.github.com/248efce774ad08cb1dd6
- 点赞 评论 复制链接分享
https://github.com/dask/dask/pull/1053点赞 评论 复制链接分享
, I'm assuming there needs to be an xarray PR corresponding to Matt's merged PR, is that correct? Do you think this will be a difficult xarray change?点赞 评论 复制链接分享
This should be pretty easy -- we'll just need to add
lock=threading.Lock()to this line: https://github.com/pydata/xarray/blob/v0.7.2/xarray/backends/common.py#L165
The only subtlety is that this needs to be done in a way that is dependent on the version of dask, because the keyword argument is new -- something like
if dask.__version__ > '0.8.1'.点赞 评论 复制链接分享
Thanks ! I ran into this problem again with this morning and as you note I had multiple arrays in the file that were being written. PR https://github.com/pydata/xarray/pull/800 implements your suggestion and should hopefully resolve the issue, although it is not clear to me how to build a reproducible test case-- perhaps write a file with a ton of random arrays to crash it out on the write? Any thoughts or suggestions you have on this would be very helpful.
Note that the PR is preliminary until I can verify that it resolves the issue via testing.点赞 评论 复制链接分享
Note, also waiting on
daskgoing to 0.8.2 version number for the full fix.点赞 评论 复制链接分享
I'm going to close this for now but will reopen it if the issue arises again following the dask release.点赞 评论 复制链接分享
cc点赞 评论 复制链接分享
I should note that serialization also does not appear to be robust under reshaping the data via
ds = ds.transpose('Nt','Nt-1','Nr','Nb','Nc')as well as rechunking. The input data stream was previously generated via a call to
ds.to_netcdfin another script using xarray.点赞 评论 复制链接分享
There are a large number of files (1320) where
nfiles = 120and
len(dslist)=11, so perhaps this is an issue with opening a large number of files as noted by .点赞 评论 复制链接分享
1024 might be a common open file handle limit. Some things to try to isolate the issue: 1. Try this with
dask.set_globals(get=dask.async.get_sync)to turn off threading 2. Try just opening all of the files and see if the NetCDF error presents itself under normal operation点赞 评论 复制链接分享
Quick question , for 2, are you proposing a script that just opens all the files, e.g., something like this
# get full xr dataset dslist =  nfiles = len(glob.glob('dispersion_calcs_rlzn0*layerrange_0000-0000.nc')) for i in np.arange(nfiles): ds = xr.open_mfdataset('dispersion_calcs_rlzn%04d_*nc'%(i)) dslist.append(ds) dstotal = xr.concat(dslist,'Nr') # do an operation spanning Nr space and Nb space print dstotal.dtdays.values
dtdaysspans the all the files? I'm running it now.点赞 评论 复制链接分享
Sure. I'm not proposing any particular approach. I'm just supporting your previous idea that maybe the problem is having too many open file handles. It would be good to check this before diving into threading or concurrency issues.点赞 评论 复制链接分享
Agreed. I'll let you know what I find out. Thanks .点赞 评论 复制链接分享
Test 2 passed, so it doesn't appear to be due to too many open file handles.点赞 评论 复制链接分享
, For option 1, should the command be
dask.set_options(get=dask.async.get_sync)? I'm on 0.8.0点赞 评论 复制链接分享
Yes, my apologies for the typo.点赞 评论 复制链接分享
I'm pretty sure we now have a thread lock around all writes to NetCDF files, but it's possible that isn't aggressive enough (maybe we can't safely read and write a different file at the same time?). If your script works with synchronous execution I'll take another look.点赞 评论 复制链接分享
I can't fully confirm that the above scripts works with synchronous execution because the job ran out of its 16hr run time. However, it does appear to be the case that forcing synchronous execution resolves potential issues because previous runs of the script crashed and this one did not. I'll have to try more cases with synchronous execution, especially over the next half week, to see if I encounter more issues but am suspicious this is the problem.
and I noted that the netCDF reader has problems when threading is on when we were using distributed, so this appears to be a likely candidate. We got the same
NetCDF: HDF errorerror as above, and were able to resolve the issue by forcing distributed to work synchronously (non-threaded). should feel free to correct me if I've miss-represented our findings yesterday.
I'm suspicious that the netCDF reader is not thread safe and may not have been compiled as such (http://hdf-forum.184993.n3.nabble.com/Activate-thread-safe-and-enable-cxx-in-HDF5-td2993951.html) but there appear other potential issues that could be part of the problem, e.g., https://github.com/Unidata/netcdf4-python/issues/279 because I am doing so many reads. It may also be possible, as you note , that the tread locks aren't aggressive enough.
It would probably be good to come up with some type of testing strategy to better isolate the problem... I'll have to give this more thought.点赞 评论 复制链接分享
To be clear, we ran into the
NetCDF: HDF errorerror when having multiple threads in the same process open-read-close many different files. I don't think there was any concurrent access of the same file. The problem went away when we switched to using processes rather than threads.点赞 评论 复制链接分享
I did a little digging into this and I'm pretty sure the issue here is that HDF5 cannot do multi-threading -- at all. Moreover, many HDF5 builds are not thread safe.
Right now, we use a single shared lock for all reads with xarray, but for writes we rely on dask.array.store, which only uses different locks for each array it writes. Because 's HDF5 file includes multiple variables, each of these gets written with their own thread lock -- which means we end up writing to the same file simultaneously from multiple threads.
So what we could really use here is a
dask.array.from_array) that lets us insist on a using a shared lock when we're writing HDF5 files. Also, we may need to share that same lock between reading and writing data -- I'm not 100% sure. But at the very least we definitely need a lock to stop HDF5 from trying to do multi-threaded writes, whether that's to the same or different files.点赞 评论 复制链接分享