Skip to content

TensorFlow single node examples

To run the examples you will need to first install click in your environment.

module load python/3.7.5 &&\
pip install click

Then you need to clone the jean-zay repo in your $WORK dir:

cd $WORK &&\
git clone https://github.com/jean-zay-users/jean-zay-doc.git

Classical examples

For the single GPU job you can do:

cd jean-zay-doc/docs/examples/tf/tf_simple
sbatch mnist_submission_script.slurm

For the multi GPU job you can do:

jean-zay-doc/docs/examples/tf/tf_simple
sbatch mnist_submission_script_multi_gpus.slurm

The training code used in this example is:

# all taken from https://www.tensorflow.org/guide/keras/functional
import click


@click.command()
@click.option(
    'cuda_visible_devices',
    '-gpus',
    default=None,
    help='The GPUs you want visible for this task, comma separated. Defaults to all GPUs visible',
)
@click.option(
    'save',
    '-s',
    '--save',
    is_flag=True,
    help='Whether you want to save the model or not',
)
def train_dense_model_click(cuda_visible_devices, save):
    return train_dense_model(cuda_visible_devices, save, batch_size=64)


def train_dense_model(cuda_visible_devices, save, batch_size):
    # limit imports oustide the call to the function, in order to launch quickly
    # when using dask
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers

    if cuda_visible_devices is not None:
        import os
        os.environ['CUDA_VISIBLE_DEVICES'] = cuda_visible_devices
    # model building
    tf.keras.backend.clear_session()  # For easy reset of notebook state.

    inputs = keras.Input(shape=(784,), name='img')
    x = layers.Dense(64, activation='relu')(inputs)
    x = layers.Dense(64, activation='relu')(x)
    outputs = layers.Dense(10)(x)

    model = keras.Model(inputs=inputs, outputs=outputs, name='mnist_model')

    # training and inference
    # network is not reachable, so we use random data
    x_train = tf.random.normal((60000, 784), dtype='float32')
    x_test = tf.random.normal((10000, 784), dtype='float32')
    y_train = tf.random.uniform((60000,), minval=0, maxval=10, dtype='int32')
    y_test = tf.random.uniform((10000,), minval=0, maxval=10, dtype='int32')


    model.compile(loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  optimizer=keras.optimizers.RMSprop(),
                  metrics=['accuracy'])
    history = model.fit(x_train, y_train,
                        batch_size=batch_size,
                        epochs=5,
                        validation_split=0.2)
    test_scores = model.evaluate(x_test, y_test, verbose=2)
    print('Test loss:', test_scores[0])
    print('Test accuracy:', test_scores[1])

    # saving
    if save:
        model.save(os.environ['SCRATCH'])
    return True

if __name__ == '__main__':
    train_dense_model_click()

and the script used to launch a single GPU job is:

#!/bin/bash
#SBATCH --job-name=tf_mnist     # job name
#SBATCH --ntasks=1                   # number of MP tasks
#SBATCH --ntasks-per-node=1          # number of MPI tasks per node
#SBATCH --gres=gpu:1                 # number of GPUs per node
#SBATCH --cpus-per-task=10           # number of cores per tasks
# /!\ Caution, in the following line, "multithread" refers to hyperthreading.
#SBATCH --hint=nomultithread         # we get physical cores not logical
#SBATCH --distribution=block:block   # we pin the tasks on contiguous cores
#SBATCH --time=2:00:00              # maximum execution time (HH:MM:SS)
#SBATCH --output=tf_mnist%j.out # output file name
#SBATCH --error=tf_mnist%j.out  # error file name
#SBATCH --qos=qos_gpu-dev         # we are submitting a test job

set -x
cd ${SLURM_SUBMIT_DIR}

module purge
module load tensorflow-gpu/py3/2.1.0

srun python ./mnist_example.py -s&

wait

to launch the same code using a multiGPU configuration, use the following script:

#!/bin/bash
#SBATCH --job-name=tf_mnist_multi_gpus     # job name
#SBATCH --ntasks=1                   # number of MP tasks
#SBATCH --ntasks-per-node=1          # number of MPI tasks per node
#SBATCH --gres=gpu:1                 # number of GPUs per node
#SBATCH --cpus-per-task=10           # number of cores per tasks
# /!\ Caution, in the following line, "multithread" refers to hyperthreading.
#SBATCH --hint=nomultithread         # we get physical cores not logical
#SBATCH --distribution=block:block   # we pin the tasks on contiguous cores
#SBATCH --time=2:00:00              # maximum execution time (HH:MM:SS)
#SBATCH --output=tf_mnist_multi_gpus%A_%a.out # output file name
#SBATCH --error=tf_mnist_multi_gpus%A_%a.out  # error file name
#SBATCH --array=0-1            # one job array with 2 jobs
#SBATCH --qos=qos_gpu-dev         # we are submitting a test job

set -x
cd ${SLURM_SUBMIT_DIR}

# no particular option here but you could imagine it being different parameters,
# for example for running two jobs with differente learning rates (0.1 and 1):
# opt[0]="0.1"
# opt[1]="1"
opt[0]=""
opt[1]=""

module purge
module load tensorflow-gpu/py3/2.1.0

srun python ./mnist_example.py ${opt[$SLURM_ARRAY_TASK_ID]}

wait

Dask example

To run the dask example you will need to install dask-jobqueue in your environment additionally. Notice that this time you need to use the python module with tensorflow loaded, because dask will by default use the same python for the worker as the one you used for the scheduler. See this GitHub issue for more information.

module load tensorflow-gpu/py3/2.1.0 &&\
pip install click dask-jobqueue

You can then do:

python jean-zay-doc/docs/examples/tf/tf_simple/dask_script.py 64

where 64 is the batch size you want to run the mnist example with. If you want multiple batch sizes just have them space-separated.

Be sure to load the tensorflow module before launching the dask script because otherwise Tensorflow will not be loaded. This is because the python executable used to launch the dask worker is the same as the one used to launch the scheduler by default. You can set it otherwise in the cluster if you want something more tailored.

Here is the code for the file dask_script.py:

import click
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

from mnist_example import train_dense_model


@click.command()
@click.argument(
    'batch_sizes',
    nargs=-1,
    type=int,
)
@click.option(
    'save',
    '-s',
    '--save',
    is_flag=True,
    help='Whether you want to save the models or not',
)
def launch_dask_tasks(batch_sizes, save):
    job_name = 'dask_mnist_tf_example'

    cluster = SLURMCluster(
        cores=1,
        job_cpu=10,
        memory='10GB',
        job_name=job_name,
        walltime='1:00:00',
        interface='ib0',
        job_extra=[
            f'--gres=gpu:1',
            '--qos=qos_gpu-dev',
            '--distribution=block:block',
            '--hint=nomultithread',
            '--output=%x_%j.out',
        ],
    )
    n_jobs = len(batch_sizes)
    cluster.scale(jobs=n_jobs)
    print(cluster.job_script())

    client = Client(cluster)
    futures = [client.submit(
        # function to execute
        train_dense_model,
        # *args
        None, save, batch_size,
        # this function has potential side effects
        pure=not save,
    ) for batch_size in batch_sizes]
    job_result = client.gather(futures)
    if all(job_result):
        print('All jobs finished without errors')
    else:
        print('One job errored out')
    print('Shutting down dask workers')


if __name__ == '__main__':
    launch_dask_tasks()