Dask on Curnagl
To use Dask on Curnagl, you need the following packages:
Note: please make sure to use version 2022.11.0 or later. Earlier versions have bugs on worker nodes that make them very slow when using several threads.
Dask makes it easy to parallelize computations: you can run computationally intensive methods in parallel by assigning those computations to different CPU resources.
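    def cpu_intensive_method(x, y, z):
        # CPU computations
        return x + 1

    # client is a dask.distributed.Client; list_x, list_y and list_z hold the inputs
    futures = []
    for x, y, z in zip(list_x, list_y, list_z):
        future = client.submit(cpu_intensive_method, x, y, z)
        futures.append(future)

    # Wait for all the tasks to finish and collect their results
    result = client.gather(futures)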
This documentation proposes two types of use:
- LocalCluster: this mode is very simple and can be used to easily parallelize computations by submitting just one job to the cluster. This is a good starting point.
- SLURMCluster: this mode handles more parallelism by distributing work over several machines. It can monitor the load and automatically submit new jobs to increase parallelism.
With LocalCluster, the Python script looks like this:
    import dask
    from dask.distributed import Client, LocalCluster


    def compute(x):
        """CPU demanding code"""


    if __name__ == "__main__":
        cluster = LocalCluster()
        client = Client(cluster)

        parameters = [1, 2, 3, 4]
        futures = []
        for x in parameters:
            # Submit one task per parameter
            future = client.submit(compute, x)
            futures.append(future)

        # Wait for all the tasks and collect their results
        result = client.gather(futures)
The calls to LocalCluster and Client must be placed inside the block if __name__ == "__main__". For more information, see: https://docs.dask.org/en/stable/scheduling.html
LocalCluster() deploys N workers, each using T threads, such that NxT equals the number of cores reserved by SLURM. Dask balances the number of workers and the number of threads per worker; the goal is to take advantage of GIL-free workloads such as NumPy and Pandas.
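To make the NxT relationship concrete, the sketch below shows one plausible way of splitting a core count into workers and threads: take the smallest divisor of the core count that is at least its square root. This is only an illustration; the function name split_cores is hypothetical and the rule Dask applies internally may differ between versions.

```python
import math


def split_cores(cores):
    """Return (n_workers, threads_per_worker) with n_workers * threads == cores.

    Assumption: illustrative heuristic, not necessarily Dask's exact rule.
    """
    if cores < 1:
        raise ValueError("cores must be a positive integer")
    if cores <= 4:
        # Small allocations: one single-threaded worker per core.
        return cores, 1
    # Smallest divisor of `cores` that is at least sqrt(cores).
    root = math.sqrt(cores)
    n_workers = min(
        d for d in range(1, cores + 1) if cores % d == 0 and d >= root
    )
    return n_workers, cores // n_workers


for cores in (4, 16, 48):
    workers, threads = split_cores(cores)
    print(f"{cores} cores -> {workers} workers x {threads} threads")
```

Whatever the rule, the product of the two numbers always equals the number of reserved cores, so no core is left idle or oversubscribed.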
    #!/bin/bash
    #SBATCH --job-name dask_job
    #SBATCH --ntasks 16
    #SBATCH -N 1
    #SBATCH --partition cpu
    #SBATCH --cpus-per-task 1
    #SBATCH --time 01:00:00
    #SBATCH --output=dask_job-%j.out
    #SBATCH --error=dask_job-%j.error

    python script.py
Make sure to include the parameter -N 1; otherwise SLURM will allocate tasks on different nodes and the Dask local cluster will fail. You should adapt the parameter --ntasks: as we are using just one machine, the value can be between 1 and 48. Keep in mind that the smaller the number, the faster your job will start. You can choose to run with fewer processes but for a longer time.
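Inside the job, the number of reserved cores can be read from the environment that SLURM sets up. The sketch below assumes the standard variables SLURM_NTASKS and SLURM_CPUS_PER_TASK are available (SLURM normally sets them only when the matching options are given, hence the defaults). The helper names reserved_cores and make_local_cluster are hypothetical.

```python
import os


def reserved_cores(environ=None):
    """Cores reserved by SLURM: ntasks * cpus-per-task, each defaulting to 1."""
    env = os.environ if environ is None else environ
    ntasks = int(env.get("SLURM_NTASKS", "1"))
    cpus_per_task = int(env.get("SLURM_CPUS_PER_TASK", "1"))
    return ntasks * cpus_per_task


def make_local_cluster():
    """Start a LocalCluster with one single-threaded worker per reserved core."""
    # Imported lazily so that reserved_cores() can be used without Dask installed.
    from dask.distributed import LocalCluster

    return LocalCluster(n_workers=reserved_cores(), threads_per_worker=1)


# Values matching the job script above (--ntasks 16, --cpus-per-task 1):
print(reserved_cores({"SLURM_NTASKS": "16", "SLURM_CPUS_PER_TASK": "1"}))
```

In a real script, make_local_cluster() would be called inside the if __name__ == "__main__" block. One single-threaded worker per core is only one possible choice; it suits pure Python code better than NumPy-heavy code.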
With SLURMCluster, the Python script can be launched directly from the front-end node, but you need to keep your session open with a tool such as screen; otherwise your jobs will be cancelled.
In your Python script you should put something like:
    import dask
    from dask.distributed import Client
    from dask_jobqueue import SLURMCluster


    def compute(x):
        """CPU demanding code"""


    if __name__ == "__main__":
        # Each SLURM job submitted by Dask gets 8 cores and 40GB of memory
        cluster = SLURMCluster(cores=8, memory="40GB")
        client = Client(cluster)
        # Start and stop jobs according to the load, with at most 5 jobs
        cluster.adapt(maximum_jobs=5, interval="10000 ms")

        parameters = [1, 2, 3, 4]
        futures = []
        for x in parameters:
            future = client.submit(compute, x)
            futures.append(future)

        result = client.gather(futures)
In this case Dask will launch jobs with 8 cores and 40GB of memory. The parameters cores and memory are mandatory. There are two methods to launch jobs: adapt and scale.
adapt launches and kills jobs according to the load of your computation and how many computations you can run in parallel. You can limit the number of jobs that will be launched with the parameter maximum_jobs. The parameter interval is necessary and needs to be set to 10000 ms to avoid killing jobs too early.
scale creates a static infrastructure composed of a fixed number of jobs, specified with the parameter jobs. Example:
    cluster.scale(jobs=10)
This will launch 10 jobs regardless of the load and the amount of computation you generate.
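The difference between the two methods can be sketched with a small helper that picks one or the other. The names start_workers and RecordingCluster are hypothetical; RecordingCluster is only a stand-in that records calls so the snippet can run without a SLURM scheduler. In a real script you would pass a SLURMCluster instead.

```python
def start_workers(cluster, mode="adapt", jobs=5):
    """Request workers from a dask_jobqueue cluster, adaptively or statically."""
    if mode == "adapt":
        # Jobs are started and stopped with the load, at most `jobs` at once.
        cluster.adapt(maximum_jobs=jobs, interval="10000 ms")
    elif mode == "scale":
        # A fixed number of jobs, whatever the load.
        cluster.scale(jobs=jobs)
    else:
        raise ValueError(f"unknown mode: {mode!r}")


class RecordingCluster:
    """Stand-in for SLURMCluster that only records the calls it receives."""

    def __init__(self):
        self.calls = []

    def adapt(self, **kwargs):
        self.calls.append(("adapt", kwargs))

    def scale(self, **kwargs):
        self.calls.append(("scale", kwargs))


demo = RecordingCluster()
start_workers(demo, mode="adapt", jobs=5)
start_workers(demo, mode="scale", jobs=10)
print(demo.calls)
```

As a rule of thumb, adapt avoids holding idle resources when the load varies, while scale gives a predictable pool of workers when the amount of work is known in advance.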
Some facts about Slurm jobs and Dask
Keep in mind that the computation depends on the availability of resources: if the jobs are not running, your computation will not start. So if you think that your computation is stuck, please verify first that jobs have been submitted and that they are running, using the command:
squeue -u $USER
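The same check can be scripted. The sketch below counts your jobs per state by parsing the output of squeue; it assumes the squeue options -h (no header) and -o %T (print only the job state), and the function names count_job_states and my_job_states are hypothetical.

```python
import getpass
import subprocess
from collections import Counter


def count_job_states(squeue_output):
    """Count job states from squeue output that has one state per line."""
    states = (line.strip() for line in squeue_output.splitlines())
    return Counter(state for state in states if state)


def my_job_states():
    """Run squeue for the current user and count the jobs per state."""
    # -h drops the header line, -o %T prints only the job state.
    result = subprocess.run(
        ["squeue", "-h", "-u", getpass.getuser(), "-o", "%T"],
        capture_output=True,
        text=True,
        check=True,
    )
    return count_job_states(result.stdout)


# Canned output, so the snippet also runs away from the cluster:
print(count_job_states("RUNNING\nPENDING\nPENDING\n"))
```

If my_job_states() reports only PENDING jobs, the computation is waiting for resources rather than stuck.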
By default the walltime is set to 30 min; you can use the parameter walltime if you think that each individual computation will last longer than the default time.
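As a minimal sketch, the walltime can be passed as a string in the HH:MM:SS form when the cluster is created. The helper names format_walltime and make_slurm_cluster, and the 90 minute value, are hypothetical choices for illustration.

```python
def format_walltime(minutes):
    """Format a duration in minutes as HH:MM:SS."""
    if minutes <= 0:
        raise ValueError("minutes must be positive")
    hours, mins = divmod(int(minutes), 60)
    return f"{hours:02d}:{mins:02d}:00"


def make_slurm_cluster(minutes=90):
    """Create a SLURMCluster whose jobs may run for `minutes` minutes."""
    # Imported lazily: dask_jobqueue is only needed when a cluster is created.
    from dask_jobqueue import SLURMCluster

    cluster = SLURMCluster(
        cores=8, memory="40GB", walltime=format_walltime(minutes)
    )
    # job_script() returns the batch script Dask will submit, useful for checking.
    print(cluster.job_script())
    return cluster


print(format_walltime(90))
```

Printing the generated job script before submitting any work is a quick way to confirm that the requested time limit is the one you expect.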
Slurm files will be generated in the directory from which you launch your Python command.
Jobs will be killed by Dask when there is no more computation to be done. If you see the message:
slurmstepd: error: *** JOB 25260254 ON dna051 CANCELLED AT 2023-03-01T11:00:19 ***
It is completely normal and it does not mean that there was an error in your computation.
Optimal number of workers
Both LocalCluster and SLURMCluster automatically balance the number of workers and the number of threads per worker. You can override this balance: LocalCluster takes the parameter n_workers, and SLURMCluster takes the parameter processes, which sets the number of worker processes per job (in SLURMCluster, n_workers is instead the number of workers requested at start-up). If most of the computation relies on NumPy or Pandas, it is preferable to have only one worker (n_workers=1 or processes=1). If most of the computation is pure Python code, you should use as many workers as possible. Example:
    SLURMCluster(cores=8, memory="40GB", processes=8)
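For LocalCluster, the same reasoning can be written as a small helper that returns keyword arguments depending on the kind of workload. The function name local_cluster_kwargs is hypothetical, and the all-or-nothing split is a simplification: mixed workloads may do better with an intermediate layout.

```python
def local_cluster_kwargs(cores, pure_python):
    """Keyword arguments for LocalCluster given the kind of workload."""
    if cores < 1:
        raise ValueError("cores must be a positive integer")
    if pure_python:
        # GIL-bound code: one single-threaded worker process per core.
        return {"n_workers": cores, "threads_per_worker": 1}
    # NumPy and Pandas release the GIL: one worker with one thread per core.
    return {"n_workers": 1, "threads_per_worker": cores}


print(local_cluster_kwargs(16, pure_python=True))
print(local_cluster_kwargs(16, pure_python=False))
```

The result can be passed directly when creating the cluster, for example LocalCluster(**local_cluster_kwargs(16, pure_python=True)).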
The following example code illustrates the use of Dask. It runs 40 multiplications of random matrices of size NxN; each computation returns the elapsed time and the sum of all the elements of the result matrix:
    import os
    import time

    import numpy as np
    from dask.distributed import Client, LocalCluster
    from dask_jobqueue import SLURMCluster

    SIZE = 9192


    def compute(tag):
        # Seed with the task index so that each task is reproducible
        np.random.seed(tag)
        A = np.random.random((SIZE, SIZE))
        B = np.random.random((SIZE, SIZE))
        start = time.time()
        C = np.dot(A, B)
        end = time.time()
        elapsed = end - start
        return elapsed, np.sum(C)


    if __name__ == "__main__":
        # Alternative for a single-node job:
        # cluster = LocalCluster(n_workers=int(os.environ['SLURM_NTASKS']))
        # cores and memory are mandatory; processes=8 gives 8 worker processes per job
        cluster = SLURMCluster(cores=8, memory="40GB", processes=8)
        client = Client(cluster)
        cluster.adapt(maximum_jobs=5, interval="10000 ms")

        N_ITER = 40
        futures = []
        for i in range(N_ITER):
            future = client.submit(compute, i)
            futures.append(future)

        results = client.gather(futures)
        print(results)
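Since each task returns a pair (elapsed time, sum), the gathered list can be summarised with a few lines of plain Python. The sketch below is one possible way to do it; the function name summarize is hypothetical and the sample values are made up purely to show the shape of the data.

```python
from statistics import mean


def summarize(results):
    """Summarise a list of (elapsed_seconds, matrix_sum) pairs."""
    elapsed = [seconds for seconds, _ in results]
    return {
        "tasks": len(results),
        "mean_s": mean(elapsed),
        "max_s": max(elapsed),
    }


# Made-up values with the same shape as the output of client.gather(futures)
sample = [(1.0, 10.0), (3.0, 20.0)]
print(summarize(sample))
```

Comparing the mean task time for different worker layouts is a simple way to check which layout suits your workload.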