diff --git a/examples/pytorch_imagenet_resnet50.py b/examples/pytorch_imagenet_resnet50.py
index ce050d902a..db17d89b6d 100644
--- a/examples/pytorch_imagenet_resnet50.py
+++ b/examples/pytorch_imagenet_resnet50.py
@@ -3,6 +3,7 @@
 import torch
 import argparse
 import torch.backends.cudnn as cudnn
+from torch.autograd import Variable
 import torch.nn.functional as F
 import torch.optim as optim
 import torch.utils.data.distributed
@@ -14,6 +15,9 @@
 from tqdm import tqdm
 
 # Training settings
+
+export_dir = os.path.abspath(os.environ.get('PS_MODEL_PATH', os.path.join(os.getcwd(), 'models')))
+
 parser = argparse.ArgumentParser(description='PyTorch ImageNet Example',
                                  formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 parser.add_argument('--train-dir', default=os.path.expanduser('~/imagenet/train'),
@@ -30,8 +34,6 @@
                     help='number of batches processed locally before '
                          'executing allreduce across workers; it multiplies '
                          'total batch size.')
-parser.add_argument('--use-adasum', action='store_true', default=False,
-                    help='use adasum algorithm to do reduction')
 
 # Default settings from https://arxiv.org/abs/1706.02677.
 parser.add_argument('--batch-size', type=int, default=32,
@@ -127,21 +129,15 @@
 # Set up standard ResNet-50 model.
 model = models.resnet50()
 
-# By default, Adasum doesn't need scaling up learning rate.
-# For sum/average with gradient Accumulation: scale learning rate by batches_per_allreduce
-lr_scaler = args.batches_per_allreduce * hvd.size() if not args.use_adasum else 1
-
 if args.cuda:
     # Move model to GPU.
     model.cuda()
-    # If using GPU Adasum allreduce, scale learning rate by local_size.
-    if args.use_adasum and hvd.nccl_built():
-        lr_scaler = args.batches_per_allreduce * hvd.local_size()
 
 # Horovod: scale learning rate by the number of GPUs.
+# Gradient Accumulation: scale learning rate by batches_per_allreduce
 optimizer = optim.SGD(model.parameters(),
                       lr=(args.base_lr *
-                          lr_scaler),
+                          args.batches_per_allreduce * hvd.size()),
                       momentum=args.momentum, weight_decay=args.wd)
 
 # Horovod: (optional) compression algorithm.
@@ -151,8 +147,7 @@
 optimizer = hvd.DistributedOptimizer(
     optimizer, named_parameters=model.named_parameters(),
     compression=compression,
-    backward_passes_per_step=args.batches_per_allreduce,
-    op=hvd.Adasum if args.use_adasum else hvd.Average)
+    backward_passes_per_step=args.batches_per_allreduce)
 
 # Restore from a previous checkpoint, if initial_epoch is specified.
 # Horovod: restore on the first worker which will broadcast weights to other workers.
@@ -261,9 +256,33 @@ def save_checkpoint(epoch):
             'model': model.state_dict(),
             'optimizer': optimizer.state_dict(),
         }
-        torch.save(state, filepath)
-
-
+        # torch.save does not create missing directories, so make sure the
+        # target directory under export_dir exists first.
+        # NOTE(review): the resume code is outside this diff; confirm it also
+        # looks for checkpoints under export_dir.
+        checkpoint_path = os.path.join(export_dir, filepath)
+        os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)
+        torch.save(state, checkpoint_path)
+
+
+def export_model():
+    """Export the trained model to ONNX as export_dir/model.onnx (first worker only)."""
+    if hvd.rank() != 0:
+        return
+
+    os.makedirs(export_dir, exist_ok=True)
+    # Trace with one color 224 x 224 picture placed on the model's own device,
+    # so the export also works when the model was not moved to the GPU.
+    device = next(model.parameters()).device
+    dummy_input = Variable(torch.randn(1, 3, 224, 224, device=device))
+
+    print('Saving ONNX model to: ' + export_dir)
+    torch.onnx.export(model,               # model being run
+                      dummy_input,         # model input (or a tuple for multiple inputs)
+                      os.path.join(export_dir, 'model.onnx'),  # where to save the model
+                      export_params=True)  # store the trained parameter weights inside the model file
+
+
 # Horovod: average metrics from distributed training.
 class Metric(object):
     def __init__(self, name):
@@ -284,3 +303,5 @@ def avg(self):
     train(epoch)
     validate(epoch)
     save_checkpoint(epoch)
+export_model()
+
diff --git a/examples/pytorch_mnist.py b/examples/pytorch_mnist.py
index 783e603d3d..23efcece29 100644
--- a/examples/pytorch_mnist.py
+++ b/examples/pytorch_mnist.py
@@ -7,7 +7,14 @@
 import torch.utils.data.distributed
 import horovod.torch as hvd
 
+from torch.autograd import Variable
+
+import os
+
 # Training settings
+
+export_dir = os.path.abspath(os.environ.get('PS_MODEL_PATH', os.path.join(os.getcwd(), 'models')))
+
 parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
 parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                     help='input batch size for training (default: 64)')
@@ -27,9 +34,6 @@
                     help='how many batches to wait before logging training status')
 parser.add_argument('--fp16-allreduce', action='store_true', default=False,
                     help='use fp16 compression during allreduce')
-parser.add_argument('--use-adasum', action='store_true', default=False,
-                    help='use adasum algorithm to do reduction')
-
 args = parser.parse_args()
 args.cuda = not args.no_cuda and torch.cuda.is_available()
 
@@ -92,18 +96,12 @@ def forward(self, x):
 
 model = Net()
 
-# By default, Adasum doesn't need scaling up learning rate.
-lr_scaler = hvd.size() if not args.use_adasum else 1
-
 if args.cuda:
     # Move model to GPU.
     model.cuda()
-    # If using GPU Adasum allreduce, scale learning rate by local_size.
-    if args.use_adasum and hvd.nccl_built():
-        lr_scaler = hvd.local_size()
 
-# Horovod: scale learning rate by lr_scaler.
-optimizer = optim.SGD(model.parameters(), lr=args.lr * lr_scaler,
+# Horovod: scale learning rate by the number of GPUs.
+optimizer = optim.SGD(model.parameters(), lr=args.lr * hvd.size(),
                       momentum=args.momentum)
 
 # Horovod: broadcast parameters & optimizer state.
@@ -116,8 +114,7 @@ def forward(self, x):
 # Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters(), - compression=compression, - op=hvd.Adasum if args.use_adasum else hvd.Average) + compression=compression) def train(epoch): @@ -173,7 +170,21 @@ def test(): if hvd.rank() == 0: print('\nTest set: Average loss: {:.4f}, Accuracy: {:.2f}%\n'.format( test_loss, 100. * test_accuracy)) - + print('Saving PyTorch model to: ' + export_dir) + torch.save(model.state_dict(), export_dir + '/model.pth') + # Save to ONNX model format + # dummy_input = torch.randn(10, 3, 224, 224, device='cuda') + dummy_input = Variable(torch.randn(1, 1, 28, 28, device='cuda')) # one black and white 28 x 28 picture will be the input to the model + + print('Saving ONNX model to: ' + export_dir) + torch.onnx.export(model, # model being run + dummy_input, # model input (or a tuple for multiple inputs) + export_dir + "/model.onnx", # where to save the model (can be a file or file-like object) + export_params=True) # store the trained parameter weights inside the model file + # opset_version=10, # the ONNX version to export the model to + #do_constant_folding=True, # whether to execute constant folding for optimization + #input_names = ['input'], # the model's input names + #output_names = ['output']) # the model's output names for epoch in range(1, args.epochs + 1): train(epoch) diff --git a/examples/tensorflow2_keras_imagenet.py b/examples/tensorflow2_keras_imagenet.py new file mode 100644 index 0000000000..7e4971f1c1 --- /dev/null +++ b/examples/tensorflow2_keras_imagenet.py @@ -0,0 +1,202 @@ +# +# ResNet-50 model training using Keras and Horovod. +# +# This model is an example of a computation-intensive model that achieves good accuracy on an image +# classification task. It brings together distributed training concepts such as learning rate +# schedule adjustments with a warmup, randomized data reading, and checkpointing on the first worker +# only. 
+#
+# Note: This model uses Keras native ImageDataGenerator and not the sophisticated preprocessing
+# pipeline that is typically used to train state-of-the-art ResNet-50 model. This results in ~0.5%
+# increase in the top-1 validation error compared to the single-crop top-1 validation error from
+# https://github.com/KaimingHe/deep-residual-networks.
+#
+from __future__ import print_function
+
+import argparse
+import os
+
+# Use tf.keras consistently; the standalone `keras` package is a separate implementation.
+import tensorflow as tf
+from tensorflow import keras
+from tensorflow.keras import backend as K
+from tensorflow.keras.preprocessing import image
+
+# horovod.tensorflow.keras is the Horovod binding for tf.keras models and optimizers.
+import horovod.tensorflow.keras as hvd
+
+parser = argparse.ArgumentParser(description='Keras ImageNet Example',
+                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+parser.add_argument('--train-dir', default=os.path.expanduser('~/imagenet/train'),
+                    help='path to training data')
+parser.add_argument('--val-dir', default=os.path.expanduser('~/imagenet/validation'),
+                    help='path to validation data')
+parser.add_argument('--log-dir', default='./logs',
+                    help='tensorboard log directory')
+parser.add_argument('--checkpoint-format', default='checkpoint-{epoch}.h5',
+                    help='checkpoint file format')
+parser.add_argument('--fp16-allreduce', action='store_true', default=False,
+                    help='use fp16 compression during allreduce')
+
+# Default settings from https://arxiv.org/abs/1706.02677.
+parser.add_argument('--batch-size', type=int, default=32,
+                    help='input batch size for training')
+parser.add_argument('--val-batch-size', type=int, default=32,
+                    help='input batch size for validation')
+parser.add_argument('--epochs', type=int, default=90,
+                    help='number of epochs to train')
+parser.add_argument('--base-lr', type=float, default=0.0125,
+                    help='learning rate for a single GPU')
+parser.add_argument('--warmup-epochs', type=float, default=5,
+                    help='number of warmup epochs')
+parser.add_argument('--momentum', type=float, default=0.9,
+                    help='SGD momentum')
+parser.add_argument('--wd', type=float, default=0.00005,
+                    help='weight decay')
+
+args = parser.parse_args()
+
+export_dir = os.path.abspath(os.environ.get('PS_MODEL_PATH', os.path.join(os.getcwd(), 'models')))
+# Checkpoints are written under export_dir, so the resume logic below looks for them there too.
+checkpoint_format = os.path.join(export_dir, args.checkpoint_format)
+
+print(tf.test.is_built_with_cuda())
+tf.keras.backend.set_image_data_format('channels_last')
+
+# Horovod: initialize Horovod.
+hvd.init()
+
+# TF1-style (tf.compat.v1 session) GPU pinning, kept for reference; the tf.config calls below replace it.
+# config = tf.compat.v1.ConfigProto()
+# config.gpu_options.allow_growth = True
+# config.gpu_options.visible_device_list = str(hvd.local_rank())
+# K.set_session(tf.compat.v1.Session(config=config))
+
+# Horovod: pin GPU to be used to process local rank (one GPU per process)
+gpus = tf.config.experimental.list_physical_devices('GPU')
+for gpu in gpus:
+    tf.config.experimental.set_memory_growth(gpu, True)
+if gpus:
+    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
+
+# If set > 0, will resume training from a given checkpoint.
+resume_from_epoch = 0
+for try_epoch in range(args.epochs, 0, -1):
+    if os.path.exists(checkpoint_format.format(epoch=try_epoch)):
+        resume_from_epoch = try_epoch
+        break
+
+# Horovod: broadcast resume_from_epoch from rank 0 (which will have
+# checkpoints) to other ranks.
+resume_from_epoch = hvd.broadcast(resume_from_epoch, 0, name='resume_from_epoch')
+
+# Horovod: print logs on the first worker.
+verbose = 1 if hvd.rank() == 0 else 0
+
+# Training data iterator.
+train_gen = image.ImageDataGenerator(
+    width_shift_range=0.33, height_shift_range=0.33, zoom_range=0.5, horizontal_flip=True,
+    preprocessing_function=keras.applications.resnet50.preprocess_input)
+
+train_iter = train_gen.flow_from_directory(args.train_dir,
+                                           batch_size=args.batch_size,
+                                           target_size=(224, 224))
+
+# Validation data iterator.
+test_gen = image.ImageDataGenerator(
+    zoom_range=(0.875, 0.875), preprocessing_function=keras.applications.resnet50.preprocess_input)
+test_iter = test_gen.flow_from_directory(args.val_dir,
+                                         batch_size=args.val_batch_size,
+                                         target_size=(224, 224))
+
+
+# Set up standard ResNet-50 model.
+num_classes = int(os.environ.get('IMAGENET_CLASSES', 1000))
+model = keras.applications.resnet50.ResNet50(weights=None, classes=num_classes)
+
+# Horovod: (optional) compression algorithm.
+compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none
+
+# Restore from a previous checkpoint, if initial_epoch is specified.
+# Horovod: restore on the first worker which will broadcast both model and optimizer weights
+# to other workers.
+if resume_from_epoch > 0 and hvd.rank() == 0:
+    model = hvd.load_model(checkpoint_format.format(epoch=resume_from_epoch),
+                           compression=compression)
+else:
+    # ResNet-50 model that is included with Keras is optimized for inference.
+    # Add L2 weight decay & adjust BN settings.
+    model_config = model.get_config()
+    for layer, layer_config in zip(model.layers, model_config['layers']):
+        if hasattr(layer, 'kernel_regularizer'):
+            regularizer = keras.regularizers.l2(args.wd)
+            layer_config['config']['kernel_regularizer'] = \
+                {'class_name': regularizer.__class__.__name__,
+                 'config': regularizer.get_config()}
+        if isinstance(layer, keras.layers.BatchNormalization):
+            layer_config['config']['momentum'] = 0.9
+            layer_config['config']['epsilon'] = 1e-5
+
+    model = keras.models.Model.from_config(model_config)
+
+    # Horovod: adjust learning rate based on number of GPUs.
+    opt = keras.optimizers.SGD(learning_rate=args.base_lr * hvd.size(),
+                               momentum=args.momentum)
+
+    # Horovod: add Horovod Distributed Optimizer.
+    opt = hvd.DistributedOptimizer(opt, compression=compression)
+
+    model.compile(loss=keras.losses.categorical_crossentropy,
+                  optimizer=opt,
+                  metrics=['accuracy', 'top_k_categorical_accuracy'],
+                  experimental_run_tf_function=False)
+
+callbacks = [
+    # Horovod: broadcast initial variable states from rank 0 to all other processes.
+    # This is necessary to ensure consistent initialization of all workers when
+    # training is started with random weights or restored from a checkpoint.
+    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
+
+    # Horovod: average metrics among workers at the end of every epoch.
+    #
+    # Note: This callback must be in the list before the ReduceLROnPlateau,
+    # TensorBoard, or other metrics-based callbacks.
+    hvd.callbacks.MetricAverageCallback(),
+
+    # Horovod: using `lr = 1.0 * hvd.size()` from the very beginning leads to worse final
+    # accuracy. Scale the learning rate `lr = 1.0` ---> `lr = 1.0 * hvd.size()` during
+    # the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
+    hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=args.warmup_epochs, verbose=verbose),
+
+    # Horovod: after the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
+    hvd.callbacks.LearningRateScheduleCallback(start_epoch=args.warmup_epochs, end_epoch=30, multiplier=1.),
+    hvd.callbacks.LearningRateScheduleCallback(start_epoch=30, end_epoch=60, multiplier=1e-1),
+    hvd.callbacks.LearningRateScheduleCallback(start_epoch=60, end_epoch=80, multiplier=1e-2),
+    hvd.callbacks.LearningRateScheduleCallback(start_epoch=80, multiplier=1e-3),
+]
+
+# Horovod: save checkpoints only on the first worker to prevent other workers from corrupting them.
+if hvd.rank() == 0:
+    os.makedirs(export_dir, exist_ok=True)
+    callbacks.append(keras.callbacks.ModelCheckpoint(checkpoint_format))
+    callbacks.append(keras.callbacks.TensorBoard(export_dir))
+
+# Train the model. The training will randomly sample 1 / N batches of training data and
+# 3 / N batches of validation data on every worker, where N is the number of workers.
+# Over-sampling of validation data helps to increase probability that every validation
+# example will be evaluated.
+model.fit_generator(train_iter,
+                    steps_per_epoch=len(train_iter) // hvd.size(),
+                    callbacks=callbacks,
+                    epochs=args.epochs,
+                    verbose=verbose,
+                    workers=4,
+                    initial_epoch=resume_from_epoch,
+                    validation_data=test_iter,
+                    validation_steps=3 * len(test_iter) // hvd.size())
+
+# Evaluate the model on the full data set.
+score = hvd.allreduce(model.evaluate_generator(test_iter, len(test_iter), workers=4))
+if verbose:
+    print('Test loss:', score[0])
+    print('Test accuracy:', score[1])