TensorFlow single node examples
To run the examples, you first need to install click in your environment:
module load python/3.7.5 &&\
pip install click
Then clone the jean-zay-doc repo into your $WORK directory:
cd $WORK &&\
git clone https://github.com/jean-zay-users/jean-zay-doc.git
Classical examples
For the single GPU job you can do:
cd jean-zay-doc/docs/examples/tf/tf_simple
sbatch mnist_submission_script.slurm
For the multi GPU job you can do:
cd jean-zay-doc/docs/examples/tf/tf_simple
sbatch mnist_submission_script_multi_gpus.slurm
The training code used in this example is:
# all taken from https://www.tensorflow.org/guide/keras/functional
import os

import click


@click.command()
@click.option(
    'cuda_visible_devices',
    '-gpus',
    default=None,
    help='The GPUs you want visible for this task, comma separated. Defaults to all GPUs visible',
)
@click.option(
    'save',
    '-s',
    '--save',
    is_flag=True,
    help='Whether you want to save the model or not',
)
def train_dense_model_click(cuda_visible_devices, save):
    return train_dense_model(cuda_visible_devices, save, batch_size=64)


def train_dense_model(cuda_visible_devices, save, batch_size):
    # keep the heavy imports inside the function, so that the module itself
    # loads quickly when using dask
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers

    if cuda_visible_devices is not None:
        os.environ['CUDA_VISIBLE_DEVICES'] = cuda_visible_devices

    # model building
    tf.keras.backend.clear_session()  # For easy reset of notebook state.
    inputs = keras.Input(shape=(784,), name='img')
    x = layers.Dense(64, activation='relu')(inputs)
    x = layers.Dense(64, activation='relu')(x)
    outputs = layers.Dense(10)(x)
    model = keras.Model(inputs=inputs, outputs=outputs, name='mnist_model')

    # training and inference
    # network is not reachable, so we use random data
    x_train = tf.random.normal((60000, 784), dtype='float32')
    x_test = tf.random.normal((10000, 784), dtype='float32')
    y_train = tf.random.uniform((60000,), minval=0, maxval=10, dtype='int32')
    y_test = tf.random.uniform((10000,), minval=0, maxval=10, dtype='int32')
    model.compile(loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  optimizer=keras.optimizers.RMSprop(),
                  metrics=['accuracy'])
    history = model.fit(x_train, y_train,
                        batch_size=batch_size,
                        epochs=5,
                        validation_split=0.2)
    test_scores = model.evaluate(x_test, y_test, verbose=2)
    print('Test loss:', test_scores[0])
    print('Test accuracy:', test_scores[1])

    # saving (os is imported at module level, so this also works when
    # no GPU list was passed)
    if save:
        model.save(os.environ['SCRATCH'])
    return True


if __name__ == '__main__':
    train_dense_model_click()
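To get a feel for how much work one run represents, the arithmetic behind the fit call can be sketched as follows. This is a rough estimate, not part of the example code: the helper name `training_steps` is made up for illustration, and it assumes that the validation share is removed before batching and that a final, smaller batch still counts as one step.

```python
import math


def training_steps(n_samples, batch_size, validation_split=0.2, epochs=5):
    """Return (train_samples, steps_per_epoch, total_steps).

    Hypothetical helper: assumes the validation share is held out first
    and that a last, smaller batch still counts as one step.
    """
    n_val = round(n_samples * validation_split)
    n_train = n_samples - n_val
    steps_per_epoch = math.ceil(n_train / batch_size)
    return n_train, steps_per_epoch, steps_per_epoch * epochs


if __name__ == "__main__":
    # 60000 random samples, as in the training code above
    for batch_size in (32, 64, 128):
        print(batch_size, training_steps(60000, batch_size))
```

With the default batch size of 64, 48000 samples remain for training, which gives 750 steps per epoch and 3750 steps over the 5 epochs.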
The script used to launch the single GPU job is:
#!/bin/bash
#SBATCH --job-name=tf_mnist # job name
#SBATCH --ntasks=1 # number of MPI tasks
#SBATCH --ntasks-per-node=1 # number of MPI tasks per node
#SBATCH --gres=gpu:1 # number of GPUs per node
#SBATCH --cpus-per-task=10 # number of cores per task
# /!\ Caution, in the following line, "multithread" refers to hyperthreading.
#SBATCH --hint=nomultithread # we get physical cores not logical
#SBATCH --distribution=block:block # we pin the tasks on contiguous cores
#SBATCH --time=2:00:00 # maximum execution time (HH:MM:SS)
#SBATCH --output=tf_mnist%j.out # output file name
#SBATCH --error=tf_mnist%j.out # error file name
#SBATCH --qos=qos_gpu-dev # we are submitting a test job
set -x
cd ${SLURM_SUBMIT_DIR}
module purge
module load tensorflow-gpu/py3/2.1.0
srun python ./mnist_example.py -s&
wait
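Once the job is submitted, it helps to know which log file to look at. The sketch below shows how the `%j` placeholder in `--output=tf_mnist%j.out` turns into a file name, starting from the confirmation line that sbatch prints. The function names and the job ID are made up for illustration.

```python
import re


def parse_job_id(sbatch_stdout):
    """Extract the job ID from sbatch's 'Submitted batch job <id>' line."""
    match = re.search(r"Submitted batch job (\d+)", sbatch_stdout)
    if match is None:
        raise ValueError(f"no job ID found in: {sbatch_stdout!r}")
    return match.group(1)


def log_file_name(pattern, job_id):
    """Expand the %j placeholder of an --output/--error pattern."""
    return pattern.replace("%j", job_id)


if __name__ == "__main__":
    # hypothetical job ID, only for illustration
    job_id = parse_job_id("Submitted batch job 123456\n")
    print(log_file_name("tf_mnist%j.out", job_id))
```

Since `--output` and `--error` use the same pattern in this script, standard output and standard error end up in the same file.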
To launch the same code using a multi GPU configuration, use the following script:
#!/bin/bash
#SBATCH --job-name=tf_mnist_multi_gpus # job name
#SBATCH --ntasks=1 # number of MPI tasks
#SBATCH --ntasks-per-node=1 # number of MPI tasks per node
#SBATCH --gres=gpu:1 # number of GPUs per node
#SBATCH --cpus-per-task=10 # number of cores per task
# /!\ Caution, in the following line, "multithread" refers to hyperthreading.
#SBATCH --hint=nomultithread # we get physical cores not logical
#SBATCH --distribution=block:block # we pin the tasks on contiguous cores
#SBATCH --time=2:00:00 # maximum execution time (HH:MM:SS)
#SBATCH --output=tf_mnist_multi_gpus%A_%a.out # output file name
#SBATCH --error=tf_mnist_multi_gpus%A_%a.out # error file name
#SBATCH --array=0-1 # one job array with 2 jobs
#SBATCH --qos=qos_gpu-dev # we are submitting a test job
set -x
cd ${SLURM_SUBMIT_DIR}
# no particular option here but you could imagine it being different parameters,
# for example for running two jobs with different learning rates (0.1 and 1):
# opt[0]="0.1"
# opt[1]="1"
opt[0]=""
opt[1]=""
module purge
module load tensorflow-gpu/py3/2.1.0
srun python ./mnist_example.py ${opt[$SLURM_ARRAY_TASK_ID]}
wait
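The `opt[$SLURM_ARRAY_TASK_ID]` lookup can also be done on the Python side, which avoids passing the parameter on the command line. The sketch below is only an illustration: the training script above has no learning rate option, so `LEARNING_RATES` and `option_for_task` are hypothetical, and falling back to index 0 outside of a job array is an assumption.

```python
import os

# Hypothetical learning rates, one per array task (--array=0-1)
LEARNING_RATES = [0.1, 1.0]


def option_for_task(options, environ=os.environ):
    """Return the entry of `options` selected by SLURM_ARRAY_TASK_ID.

    Falls back to index 0 when the variable is not set, i.e. when the
    script is run outside of a job array (an assumption of this sketch).
    """
    task_id = int(environ.get("SLURM_ARRAY_TASK_ID", "0"))
    if not 0 <= task_id < len(options):
        raise IndexError(f"array task {task_id} has no matching option")
    return options[task_id]


if __name__ == "__main__":
    print("learning rate for this task:", option_for_task(LEARNING_RATES))
```

Raising an error for an out-of-range index makes a mismatch between `--array` and the option list visible immediately, instead of silently running with an empty option as the bash array would.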
Dask example
To run the dask example, you additionally need to install dask-jobqueue in your environment. Note that this time you need to use the python module with TensorFlow loaded, because dask by default uses the same python for the workers as the one you used for the scheduler. See this GitHub issue for more information.
module load tensorflow-gpu/py3/2.1.0 &&\
pip install click dask-jobqueue
You can then do:
python jean-zay-doc/docs/examples/tf/tf_simple/dask_script.py 64
where 64 is the batch size you want to run the MNIST example with. To run multiple batch sizes, pass them space-separated; one job is launched per batch size.
Be sure to load the tensorflow module before launching the dask script, otherwise TensorFlow will not be available in the workers. This is because, by default, the python executable used to launch the dask workers is the same as the one used to launch the scheduler. You can configure this differently in the cluster if you want something more tailored.
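A quick way to check this before submitting anything is to ask the current interpreter whether it can find the packages the workers will need. The following is a minimal sketch using only the standard library; the helper name `module_available` is made up for illustration.

```python
import importlib.util
import sys


def module_available(name):
    """Return True if the top-level module `name` can be imported here."""
    return importlib.util.find_spec(name) is not None


if __name__ == "__main__":
    # The dask workers use this same executable by default.
    print("interpreter:", sys.executable)
    for name in ("tensorflow", "dask_jobqueue", "click"):
        status = "found" if module_available(name) else "MISSING"
        print(f"{name}: {status}")
```

If tensorflow is reported as missing, the module was most likely not loaded in the current shell.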
Here is the code for the file dask_script.py:
import click
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

from mnist_example import train_dense_model


@click.command()
@click.argument(
    'batch_sizes',
    nargs=-1,
    type=int,
)
@click.option(
    'save',
    '-s',
    '--save',
    is_flag=True,
    help='Whether you want to save the models or not',
)
def launch_dask_tasks(batch_sizes, save):
    job_name = 'dask_mnist_tf_example'
    # each dask worker is a Slurm job with 1 GPU and 10 CPU cores
    cluster = SLURMCluster(
        cores=1,
        job_cpu=10,
        memory='10GB',
        job_name=job_name,
        walltime='1:00:00',
        interface='ib0',
        job_extra=[
            '--gres=gpu:1',
            '--qos=qos_gpu-dev',
            '--distribution=block:block',
            '--hint=nomultithread',
            '--output=%x_%j.out',
        ],
    )
    # one Slurm job per batch size
    n_jobs = len(batch_sizes)
    cluster.scale(jobs=n_jobs)

    print(cluster.job_script())

    client = Client(cluster)
    futures = [client.submit(
        # function to execute
        train_dense_model,
        # *args
        None, save, batch_size,
        # this function has potential side effects
        pure=not save,
    ) for batch_size in batch_sizes]
    # block until all the trainings are done
    job_result = client.gather(futures)
    if all(job_result):
        print('All jobs finished without errors')
    else:
        print('One job errored out')
    print('Shutting down dask workers')


if __name__ == '__main__':
    launch_dask_tasks()
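The submit and gather pattern used in this script can be tried locally without a cluster. The sketch below deliberately swaps the dask Client for the standard library's `ThreadPoolExecutor`, and the training function for a hypothetical `fake_train` stand-in, so it only illustrates the control flow (one task per batch size, then collecting all results), not how dask schedules the work on Slurm.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_train(cuda_visible_devices, save, batch_size):
    """Hypothetical stand-in for train_dense_model: no TensorFlow involved."""
    return batch_size > 0


def run_all(batch_sizes, save=False):
    """Submit one task per batch size, then collect every result in order."""
    n_workers = max(1, len(batch_sizes))
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        futures = [
            pool.submit(fake_train, None, save, batch_size)
            for batch_size in batch_sizes
        ]
        # equivalent in spirit to client.gather(futures)
        return [future.result() for future in futures]


if __name__ == "__main__":
    results = run_all([32, 64, 128])
    if all(results):
        print('All jobs finished without errors')
    else:
        print('One job errored out')
```

As in the dask script, the final check relies on every task returning a truthy value on success.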