Through xarray
This example shows how to use dask through xarray on Jean Zay. You will need a Python environment with xarray, dask and dask-jobqueue installed to get it to work. The easiest way to create such an environment is to load the anaconda module and create your own environment:
```bash
module load anaconda-py3/2021.05
conda create --prefix=./xarray -c conda-forge python xarray netCDF4 pandas numpy scipy dask distributed dask-jobqueue
```
The above conda command creates a Python environment in a directory called `xarray` inside the directory where the command is issued. The module `anaconda-py3/2021.05` is needed at runtime too. An example submission script, `example.slurm`, follows:
```bash
#!/bin/bash
#SBATCH --partition=prepost
#SBATCH --account=your_account
#SBATCH --job-name=xarray
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=2
#SBATCH --hint=nomultithread
#SBATCH --time=20:00:00
#SBATCH --output=xarray_%j.out
#SBATCH --error=xarray_%j.out

cd /path/to/your/scratch/folder

# Load the same module that was used to create the environment
module purge
module load anaconda-py3/2021.05

# Change the path to your own conda environment
conda activate /path/to/conda/environment

# Print the Python interpreter in use, as a sanity check
which python

# Start the Dask driver script; it requests the worker nodes itself
python example.py
```
The submission script `example.slurm` is submitted as a **single-core** job. That **single-core** process then spawns the distributed computing processes, each group of workers as its own SLURM job. Typically, one can use the `prepost` partition if the required time is less than 20 hours; otherwise, the job may be submitted to the `cpu_p1` partition. The actual sizing and scaling of the cluster is done in the `example.py` script.
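```python
import dask
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

import numpy as np
import xarray as xr

# Set the connection timeout before the client is created so that it applies
# to the client's connection (worker jobs read their own Dask configuration).
dask.config.set({'distributed.comm.timeouts.connect': '60s'})

# Definition of ONE SLURM job, i.e. one 40-core Jean Zay node hosting workers.
# Parameter names follow dask-jobqueue >= 0.8; older releases used
# project=, job_extra= and env_extra= instead.
cluster = SLURMCluster(
    account='your_account',
    cores=40,              # cores per node
    processes=40,          # 40 worker processes, so one thread each
    memory='160GB',        # total per node, split evenly across the 40 workers
    walltime='20:00:00',
    interface='ib0',       # communicate over the InfiniBand interface
    death_timeout=300,     # seconds a worker waits for the scheduler before exiting
    job_extra_directives=['--job-name=worker', '--hint=nomultithread'],
    # Shell commands run in each worker job before the workers start
    job_script_prologue=['export HDF5_USE_FILE_LOCKING=FALSE'],
)
cluster.scale(jobs=4)  # jobs=4 means 4 nodes in total, i.e. 4*40 = 160 processes

client = Client(cluster)
# print(client)  # may warn on Jean Zay while some requested jobs are still queued

# This seeds only the client process: the bootstrap draws below run inside the
# worker processes, so this call alone does not make the results reproducible.
np.random.seed(42)


# Bootstrap lower bound for a 1-D sample x: each of the 10000 columns is a
# resample of x (with replacement); each column is sorted, then the 2.5th
# percentile is taken across resamples for every rank, so the output has len(x).
def calc_ci_lower(x):
    resamples = np.random.choice(x, size=(len(x), 10000))
    ci = np.quantile(np.sort(resamples, axis=0), 0.025, axis=1)
    return ci


# Apply calc_ci_lower along `axis` of an N-D block (called once per dask chunk).
def calc_ci_lower_array(arr, axis=0):
    return np.apply_along_axis(calc_ci_lower, axis, arr)


# Open lazily in chunks of 1000 along 'nnode'. calc_ci_lower_array works along
# axis 0, so that dimension must not be split across several chunks.
ds = xr.open_dataset('input_data_file.nc', chunks={'nnode': 1000})

# Run the numpy function chunk by chunk on the workers
ci = xr.apply_ufunc(
    calc_ci_lower_array,
    ds['maxelev'],
    dask='parallelized',
    output_dtypes=[float],
)

# Writing the result triggers the actual computation
ci.to_netcdf('ci_lower.nc')

client.close()
cluster.close()
```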
The `example.py` file shows an example script for computing a confidence interval through bootstrapping over a very large dataset. The definition of a single machine in the cluster is given by the variable `cluster`. In `example.py`, a single machine is defined as a 40-core (40-process) node, i.e., without multithreading. The number of nodes is scaled using `cluster.scale(jobs=4)`, where `jobs=4` means 4 nodes in total, i.e., 4*40 = 160 processes. `dask_jobqueue` submits one SLURM job per node, so the nodes join the cluster one by one as SLURM makes them available, and the work is distributed among them.
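The arithmetic behind `cluster.scale(jobs=4)` can be checked with a few lines of plain Python. The helper `cluster_size` below is hypothetical (it is not part of dask-jobqueue); it assumes, as dask-jobqueue does, that `memory` is the total for one job and is split evenly across that job's worker processes, and that each worker gets `cores // processes` threads.

```python
# Hypothetical helper (not part of dask-jobqueue): reproduces the sizing
# arithmetic of SLURMCluster(cores=..., processes=..., memory=...) together
# with cluster.scale(jobs=...).
def cluster_size(jobs, cores, processes, memory_gb):
    """Return (total_workers, threads_per_worker, memory_gb_per_worker)."""
    if cores % processes != 0:
        # This helper's own guard: keep the thread count per worker exact
        raise ValueError("cores should be a multiple of processes")
    total_workers = jobs * processes            # one worker process per slot, per node
    threads_per_worker = cores // processes     # 40 cores / 40 processes = 1 thread
    memory_per_worker = memory_gb / processes   # memory is split evenly across workers
    return total_workers, threads_per_worker, memory_per_worker


workers, threads, mem = cluster_size(jobs=4, cores=40, processes=40, memory_gb=160)
print(f"{workers} workers, {threads} thread(s) each, {mem:g} GB each")
# prints: 160 workers, 1 thread(s) each, 4 GB each
```

With the values from `example.py`, each single-threaded worker has roughly 4 GB, which is the budget one chunk of the bootstrap computation has to fit in.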
Notes
- If 4 nodes are requested but SLURM allocates only one, the computation starts with that single node, and more nodes may or may not be added while it runs, depending on availability. During busy hours, therefore, not all requested nodes may be allotted.
- The single-core job does nothing other than manage the computation. During busy hours, compute nodes may be allocated hours after the single-core job has started, so request some extra time for the single-core job.
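The last note can be turned into a rough rule of thumb for the `#SBATCH` lines of `example.slurm`. The sketch below is a hypothetical helper, not part of SLURM or dask-jobqueue: it assumes you can estimate the compute time and how long the worker jobs might wait in the queue, and it applies the 20-hour `prepost` cutoff mentioned earlier, falling back to `cpu_p1` otherwise.

```python
import math

PREPOST_MAX_HOURS = 20  # assumption: the prepost limit quoted on this page


def driver_request(compute_hours, queue_wait_hours):
    """Hypothetical helper: return (partition, walltime) for the driver job.

    The driver must stay alive while the workers wait in the queue AND while
    they compute, so both durations are added before choosing the partition.
    """
    total_minutes = math.ceil((compute_hours + queue_wait_hours) * 60)
    hours, minutes = divmod(total_minutes, 60)
    if total_minutes <= PREPOST_MAX_HOURS * 60:
        partition = "prepost"
    else:
        partition = "cpu_p1"
    return partition, f"{hours:02d}:{minutes:02d}:00"


print(driver_request(compute_hours=18, queue_wait_hours=6))
# prints: ('cpu_p1', '24:00:00')
```

The worker `walltime` passed to `SLURMCluster` is separate and only needs to cover the computation itself. Check the time limits of the chosen partition in the Jean Zay documentation before requesting long jobs.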