Passer au contenu principal

Dask on curnagl

In order to use Dask in Curnagl you have to use the following packages:

  • dask
  • dask-jobqueue

Note: please make sur to use version 2022.11.0 or later. Previous versions have some bugs on worker-nodes that make them very slow when using several threads.

Dask makes easy to parallelize computations, you can run computational intensive methods on parallel by assign those computations to different CPU resources.

For example:

def cpu_intensive_method(x, y , z):
    # CPU computations
    return x + 1


futures = []
for x,y,z in zip(list_x, list_y, list_z):
	future = client.submit(inc, x, y, z)
    futures.append(future)

result = client.gather(futures)

This documentation proposes two types of uses:

  • LocalCluster: this mode is very simple and can be used to easily parallelize computations by submitting just one job in the cluster. This is a good starting point
  • SlurmCluster: this mode handle more parallelisim by distributing work on several machines. It can handle load and submit automatically new jobs for increasing paralellisim

Local cluster

Python script looks like:

import dask
from dask.distributed import Client, LocalCluster

def compute(x):
  ""CPU demanding code"
  

if __name__ == "__main__":
  
	cluster = LocalCluster()
	client = Client(address=cluster)
    
    for x in parameters:
      future = client.submit(inc, x)
      futures.append(future)
      
    result = client.gather(futures)

Call to LocalCluster and Client should be inside the block if __name__ == "__main__".  You can check the following link: https://docs.dask.org/en/stable/scheduling.html

The method LocalCluster() will deploy N workers, each worker using T threads such that NxT is equal to the number of cores reserved by SLURM. Dask will balance the number of workers and the number of threads per worker, the goal is to take advantage of GIL free workloads such as Numpy and Pandas.

SLURM script:

#SBATCH --job-name dask_job
#SBATCH --ntasks 16
#SBATCH -N 1
#SBATCH --partition cpu
#SBATCH --cpus-per-task 1
#SBATCH --time 01:00:00
#SBATCH --output=dask_job-%j.out
#SBATCH --error=dask_job%j.error


python script.py

Make sure to include the parameter -N 1 otherwise SLURM will allocate the tasks on different nodes and it will make Dask local cluster fail. You should adapt the parameter --ntasks, this will be equivalent to the number of dask workers you want to deploy and the level of parallelism you want. As we are using just one machine we can choose between 1 and 48. Just have in mind that the smallest the number the faster your job will start. You can choose to run with less processes but for a longer time.

Slurm cluster

The python script can be launched directly from the frontend but you need to keep you session open with tools such as tmux or screen otherwise your jobs will be cancelled. 

In your script you should put something like:

import dask
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

def compute(x):
  ""CPU demanding code"
  

if __name__ == "__main__":
  
	cluster = SLURMCluster(cores=8, memory="40GB")
    client = Client(cluster)
    
    cluster.adapt(maximum_jobs=5, interval="10000 ms")
    for x in parameters:
      future = client.submit(inc, x)
      futures.append(future)
      
    result = client.gather(futures)

In this case DASK will launch jobs with 8 cores and 40GB of memory.  The parameter memory is mandatory, you can specify either cores or both n_workers.  There are two methods to launch jobs: adapt and scale. adapt will launch jobs and kill jobs by taking into account the load of your computation and how many computation in parallel you can run. You can put a limit on the number of jobs that will be launched. The parameter interval is necessary and needs to be set to 10000 ms to avoid killing jobs to early.