Through xarray

This example shows how to use dask through xarray on Jean Zay. You will need a Python environment with xarray installed to get it to work. The easiest way to create an environment is to load the anaconda module and create your own environment -

```bash
module load anaconda-py3/2021.05
conda create --prefix=./xarray python xarray netCDF4 pandas numpy scipy
```

The above conda command will create a Python environment in a directory called xarray, inside the directory where the command is issued. The module anaconda-py3/2021.05 will also be needed at runtime. An example submission script follows -

```bash
#!/bin/bash
#SBATCH --partition=prepost
#SBATCH --account=your_account
#SBATCH --job-name=xarray
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=2
#SBATCH --hint=nomultithread
#SBATCH --time=20:00:00
#SBATCH --output=xarray_%j.out
#SBATCH --error=xarray_%j.out

cd /path/to/your/scratch/folder

# Loading module
module purge
module load anaconda-py3/2021.05

# change the path to your own conda environment
conda activate /path/to/conda/environment
echo `which python`

python example.py
```

The submission script `example.slurm` is submitted (e.g., with `sbatch example.slurm`) as a **single core** job. That **single core** process then spawns the distributed computing processes. Typically, the `prepost` partition can be used if the required time is less than 20 hours; for longer runs, the job may be submitted to a `cpu_p1` node. The actual scaling of the cluster is done through the `example.py` script.

```python
import dask
from dask_jobqueue import SLURMCluster 

cluster = SLURMCluster(
    project='your_account', 
    cores=40, 
    processes=40,
    memory='160GB', 
    walltime='20:00:00', 
    interface='ib0', 
    death_timeout=300, 
    job_extra=['--job-name=worker', '--hint=nomultithread'],
    env_extra=['export HDF5_USE_FILE_LOCKING=FALSE']  # run before launching each worker; disables HDF5 file locking
)
cluster.scale(jobs=4) # jobs=4 means total 4 nodes, 4*40 = 160 processes

from dask.distributed import Client 
client = Client(cluster) 
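# Raise the connection timeout so that connections between the client,
# scheduler and workers are not dropped when nodes are slow to start.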
dask.config.set({'distributed.comm.timeouts.connect': '60s'})
# print(client) # Will warn if not all requested jobs have started yet, e.g., on Jean Zay

# Other libraries
import xarray as xr
import numpy as np

# Setting seed for reproducibility
np.random.seed(42)

# Helper functions
# numpy ufunc
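# Bootstrap estimate of the lower 2.5% confidence bound: draw 10000 resamples
# of x with replacement, sort each resample, and take the 2.5% quantile across
# the resamples for every sorted position.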
def calc_ci_lower(x):
    ci = np.quantile(np.sort(np.random.choice(x, size=(len(x), 10000)), axis=0), 0.025, axis=1)
    return(ci)

def calc_ci_lower_array(arr, axis=0):
    ci = np.apply_along_axis(calc_ci_lower, axis, arr)
    return(ci)

# Computation on data
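# Open the dataset lazily; chunks={'nnode': 1000} splits it into dask chunks of
# 1000 elements along the 'nnode' dimension, so the chunks can be processed in
# parallel across the workers.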
ds = xr.open_dataset(
    'input_data_file.nc', 
    chunks={'nnode':1000}
)

# Now we apply a ufunc
ci = xr.apply_ufunc(
    calc_ci_lower_array, 
    ds['maxelev'], 
    dask='parallelized', 
    output_dtypes=[float]
)
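# `ci` is still a lazy, dask-backed DataArray at this point; the distributed
# computation only runs when the result is written to disk below.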

ci.to_netcdf('ci_lower.nc')

client.close()
cluster.close()
```

The `example.py` file shows an example script for computing confidence intervals through bootstrapping over a very large dataset. The configuration of a single machine in the cluster is held in the variable `cluster`. In `example.py`, each machine is defined as a 40-core node running 40 worker processes (i.e., no multithreading). The number of nodes is scaled using `cluster.scale(jobs=4)`, where `jobs=4` means a total of 4 nodes, i.e., 4*40 = 160 processes. dask_jobqueue will spawn the compute nodes one by one as they are made available by SLURM and distribute the work across them.
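
Before scaling up, it can be useful to inspect the SLURM batch script that dask_jobqueue generates from this configuration. The following is a minimal sketch (the settings simply mirror those in `example.py`; adjust them to your own account and resources):

```python
from dask_jobqueue import SLURMCluster

# Same worker configuration as in example.py.
cluster = SLURMCluster(
    project='your_account',
    cores=40,
    processes=40,
    memory='160GB',
    walltime='20:00:00',
    interface='ib0',
    death_timeout=300,
    job_extra=['--job-name=worker', '--hint=nomultithread'],
)

# Print the batch script that will be submitted for each worker job, to
# verify the requested cores, memory and walltime before calling scale().
print(cluster.job_script())
```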

Note

  • If 4 nodes are requested but only one node is allocated by SLURM, the job will start with that single node, and more nodes may or may not be added based on their availability while the job is running. This implies that, during busy hours, not all requested nodes may be allotted (a way to make the client wait for a minimum number of workers is sketched after this note).
  • The single core task does nothing other than manage the job. During busy hours, it is not unlikely that a compute node is allocated hours after the single core job has started. Hence, it is necessary to keep some provision of extra time for the single core job.
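
If the computation should not start before a minimum number of workers has joined, the client can be made to block until they are available. A minimal sketch, assuming the `cluster` object defined in `example.py`; the threshold of 40 workers (one full node in the configuration above) is only illustrative:

```python
from dask.distributed import Client

client = Client(cluster)  # `cluster` is the SLURMCluster defined in example.py

# Block until at least 40 worker processes (one full node in this
# configuration) have connected to the scheduler before submitting work.
client.wait_for_workers(n_workers=40)
```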