Skip to content
41 changes: 26 additions & 15 deletions examples/pytorch_imagenet_resnet50.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import torch
import argparse
import torch.backends.cudnn as cudnn
from torch.autograd import Variable
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
Expand All @@ -14,6 +15,9 @@
from tqdm import tqdm

# Training settings

export_dir = os.path.abspath(os.environ.get('PS_MODEL_PATH', os.getcwd() + '/models'))

parser = argparse.ArgumentParser(description='PyTorch ImageNet Example',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--train-dir', default=os.path.expanduser('~/imagenet/train'),
Expand All @@ -30,8 +34,6 @@
help='number of batches processed locally before '
'executing allreduce across workers; it multiplies '
'total batch size.')
parser.add_argument('--use-adasum', action='store_true', default=False,
help='use adasum algorithm to do reduction')

# Default settings from https://arxiv.org/abs/1706.02677.
parser.add_argument('--batch-size', type=int, default=32,
Expand Down Expand Up @@ -127,21 +129,15 @@
# Set up standard ResNet-50 model.
model = models.resnet50()

# By default, Adasum doesn't need scaling up learning rate.
# For sum/average with gradient Accumulation: scale learning rate by batches_per_allreduce
lr_scaler = args.batches_per_allreduce * hvd.size() if not args.use_adasum else 1

if args.cuda:
# Move model to GPU.
model.cuda()
# If using GPU Adasum allreduce, scale learning rate by local_size.
if args.use_adasum and hvd.nccl_built():
lr_scaler = args.batches_per_allreduce * hvd.local_size()

# Horovod: scale learning rate by the number of GPUs.
# Gradient Accumulation: scale learning rate by batches_per_allreduce
optimizer = optim.SGD(model.parameters(),
lr=(args.base_lr *
lr_scaler),
args.batches_per_allreduce * hvd.size()),
momentum=args.momentum, weight_decay=args.wd)

# Horovod: (optional) compression algorithm.
Expand All @@ -151,8 +147,7 @@
optimizer = hvd.DistributedOptimizer(
optimizer, named_parameters=model.named_parameters(),
compression=compression,
backward_passes_per_step=args.batches_per_allreduce,
op=hvd.Adasum if args.use_adasum else hvd.Average)
backward_passes_per_step=args.batches_per_allreduce)

# Restore from a previous checkpoint, if initial_epoch is specified.
# Horovod: restore on the first worker which will broadcast weights to other workers.
Expand Down Expand Up @@ -261,9 +256,23 @@ def save_checkpoint(epoch):
'model': model.state_dict(),
'optimizer': optimizer.state_dict(),
}
torch.save(state, filepath)


torch.save(state, export_dir + '/' + filepath)

def export_model():
if hvd.rank() == 0:
# Save to ONNX model format
dummy_input = Variable(torch.randn(1, 3, 224, 224, device='cuda')) # one color 224 x 224 picture will be the input to the model

print('Saving ONNX model to: ' + export_dir)
torch.onnx.export(model, # model being run
dummy_input, # model input (or a tuple for multiple inputs)
export_dir + "/model.onnx", # where to save the model (can be a file or file-like object)
export_params=True) # store the trained parameter weights inside the model file
# opset_version=10, # the ONNX version to export the model to
#do_constant_folding=True, # whether to execute constant folding for optimization
#input_names = ['input'], # the model's input names
#output_names = ['output']) # the model's output names

# Horovod: average metrics from distributed training.
class Metric(object):
def __init__(self, name):
Expand All @@ -284,3 +293,5 @@ def avg(self):
train(epoch)
validate(epoch)
save_checkpoint(epoch)
export_model()

39 changes: 25 additions & 14 deletions examples/pytorch_mnist.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@
import torch.utils.data.distributed
import horovod.torch as hvd

from torch.autograd import Variable

import os

# Training settings

export_dir = os.path.abspath(os.environ.get('PS_MODEL_PATH', os.getcwd() + '/models'))

parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
help='input batch size for training (default: 64)')
Expand All @@ -27,9 +34,6 @@
help='how many batches to wait before logging training status')
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
help='use fp16 compression during allreduce')
parser.add_argument('--use-adasum', action='store_true', default=False,
help='use adasum algorithm to do reduction')

args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()

Expand Down Expand Up @@ -92,18 +96,12 @@ def forward(self, x):

model = Net()

# By default, Adasum doesn't need scaling up learning rate.
lr_scaler = hvd.size() if not args.use_adasum else 1

if args.cuda:
# Move model to GPU.
model.cuda()
# If using GPU Adasum allreduce, scale learning rate by local_size.
if args.use_adasum and hvd.nccl_built():
lr_scaler = hvd.local_size()

# Horovod: scale learning rate by lr_scaler.
optimizer = optim.SGD(model.parameters(), lr=args.lr * lr_scaler,
# Horovod: scale learning rate by the number of GPUs.
optimizer = optim.SGD(model.parameters(), lr=args.lr * hvd.size(),
momentum=args.momentum)

# Horovod: broadcast parameters & optimizer state.
Expand All @@ -116,8 +114,7 @@ def forward(self, x):
# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(optimizer,
named_parameters=model.named_parameters(),
compression=compression,
op=hvd.Adasum if args.use_adasum else hvd.Average)
compression=compression)


def train(epoch):
Expand Down Expand Up @@ -173,7 +170,21 @@ def test():
if hvd.rank() == 0:
print('\nTest set: Average loss: {:.4f}, Accuracy: {:.2f}%\n'.format(
test_loss, 100. * test_accuracy))

print('Saving PyTorch model to: ' + export_dir)
torch.save(model.state_dict(), export_dir + '/model.pth')
# Save to ONNX model format
# dummy_input = torch.randn(10, 3, 224, 224, device='cuda')
dummy_input = Variable(torch.randn(1, 1, 28, 28, device='cuda')) # one black and white 28 x 28 picture will be the input to the model

print('Saving ONNX model to: ' + export_dir)
torch.onnx.export(model, # model being run
dummy_input, # model input (or a tuple for multiple inputs)
export_dir + "/model.onnx", # where to save the model (can be a file or file-like object)
export_params=True) # store the trained parameter weights inside the model file
# opset_version=10, # the ONNX version to export the model to
#do_constant_folding=True, # whether to execute constant folding for optimization
#input_names = ['input'], # the model's input names
#output_names = ['output']) # the model's output names

for epoch in range(1, args.epochs + 1):
train(epoch)
Expand Down
202 changes: 202 additions & 0 deletions examples/tensorflow2_keras_imagenet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
#
# ResNet-50 model training using Keras and Horovod.
#
# This model is an example of a computation-intensive model that achieves good accuracy on an image
# classification task. It brings together distributed training concepts such as learning rate
# schedule adjustments with a warmup, randomized data reading, and checkpointing on the first worker
# only.
#
# Note: This model uses Keras native ImageDataGenerator and not the sophisticated preprocessing
# pipeline that is typically used to train state-of-the-art ResNet-50 model. This results in ~0.5%
# increase in the top-1 validation error compared to the single-crop top-1 validation error from
# https://github.com/KaimingHe/deep-residual-networks.
#
from __future__ import print_function

import argparse
# import keras
# from keras import backend as K
# from keras.preprocessing import image
import tensorflow as tf
from tensorflow import keras
from keras import backend as K
from keras.preprocessing import image

import horovod.keras as hvd
import os

parser = argparse.ArgumentParser(description='Keras ImageNet Example',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--train-dir', default=os.path.expanduser('~/imagenet/train'),
help='path to training data')
parser.add_argument('--val-dir', default=os.path.expanduser('~/imagenet/validation'),
help='path to validation data')
parser.add_argument('--log-dir', default='./logs',
help='tensorboard log directory')
parser.add_argument('--checkpoint-format', default='checkpoint-{epoch}.h5',
help='checkpoint file format')
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
help='use fp16 compression during allreduce')

# Default settings from https://arxiv.org/abs/1706.02677.
parser.add_argument('--batch-size', type=int, default=32,
help='input batch size for training')
parser.add_argument('--val-batch-size', type=int, default=32,
help='input batch size for validation')
parser.add_argument('--epochs', type=int, default=90,
help='number of epochs to train')
parser.add_argument('--base-lr', type=float, default=0.0125,
help='learning rate for a single GPU')
parser.add_argument('--warmup-epochs', type=float, default=5,
help='number of warmup epochs')
parser.add_argument('--momentum', type=float, default=0.9,
help='SGD momentum')
parser.add_argument('--wd', type=float, default=0.00005,
help='weight decay')

args = parser.parse_args()

export_dir = os.path.abspath(os.environ.get('PS_MODEL_PATH', os.getcwd() + '/models'))

print(tf.test.is_built_with_cuda())
# data_format = ('channels_first'
# if tf.test.is_built_with_cuda() else 'channels_last')
tf.keras.backend.set_image_data_format('channels_last')

# Horovod: initialize Horovod.
hvd.init()

# Horovod: pin GPU to be used to process local rank (one GPU per process)
# config = tf.compat.v1.ConfigProto()
# config.gpu_options.allow_growth = True
# config.gpu_options.visible_device_list = str(hvd.local_rank())
# K.set_session(tf.compat.v1.Session(config=config))

# Horovod: pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# If set > 0, will resume training from a given checkpoint.
resume_from_epoch = 0
for try_epoch in range(args.epochs, 0, -1):
if os.path.exists(args.checkpoint_format.format(epoch=try_epoch)):
resume_from_epoch = try_epoch
break

# Horovod: broadcast resume_from_epoch from rank 0 (which will have
# checkpoints) to other ranks.
resume_from_epoch = hvd.broadcast(resume_from_epoch, 0, name='resume_from_epoch')

# Horovod: print logs on the first worker.
verbose = 1 if hvd.rank() == 0 else 0

# Training data iterator.
train_gen = image.ImageDataGenerator(
width_shift_range=0.33, height_shift_range=0.33, zoom_range=0.5, horizontal_flip=True,
preprocessing_function=keras.applications.resnet50.preprocess_input)

train_iter = train_gen.flow_from_directory(args.train_dir,
batch_size=args.batch_size,
target_size=(224, 224))

# Validation data iterator.
test_gen = image.ImageDataGenerator(
zoom_range=(0.875, 0.875), preprocessing_function=keras.applications.resnet50.preprocess_input)
test_iter = test_gen.flow_from_directory(args.val_dir,
batch_size=args.val_batch_size,
target_size=(224, 224))


# Set up standard ResNet-50 model.
num_classes = int(os.environ.get('IMAGENET_CLASSES', 1000))
model = keras.applications.resnet50.ResNet50(weights=None, classes=num_classes)

# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

# Restore from a previous checkpoint, if initial_epoch is specified.
# Horovod: restore on the first worker which will broadcast both model and optimizer weights
# to other workers.
if resume_from_epoch > 0 and hvd.rank() == 0:
model = hvd.load_model(args.checkpoint_format.format(epoch=resume_from_epoch),
compression=compression)
else:
# ResNet-50 model that is included with Keras is optimized for inference.
# Add L2 weight decay & adjust BN settings.
model_config = model.get_config()
for layer, layer_config in zip(model.layers, model_config['layers']):
if hasattr(layer, 'kernel_regularizer'):
regularizer = keras.regularizers.l2(args.wd)
layer_config['config']['kernel_regularizer'] = \
{'class_name': regularizer.__class__.__name__,
'config': regularizer.get_config()}
if type(layer) == keras.layers.BatchNormalization:
layer_config['config']['momentum'] = 0.9
layer_config['config']['epsilon'] = 1e-5

model = keras.models.Model.from_config(model_config)

# Horovod: adjust learning rate based on number of GPUs.
opt = keras.optimizers.SGD(lr=args.base_lr * hvd.size(),
momentum=args.momentum)

# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt, compression=compression)

model.compile(loss=keras.losses.categorical_crossentropy,
optimizer=opt,
metrics=['accuracy', 'top_k_categorical_accuracy'],
experimental_run_tf_function=False)

callbacks = [
# Horovod: broadcast initial variable states from rank 0 to all other processes.
# This is necessary to ensure consistent initialization of all workers when
# training is started with random weights or restored from a checkpoint.
hvd.callbacks.BroadcastGlobalVariablesCallback(0),

# Horovod: average metrics among workers at the end of every epoch.
#
# Note: This callback must be in the list before the ReduceLROnPlateau,
# TensorBoard, or other metrics-based callbacks.
hvd.callbacks.MetricAverageCallback(),

# Horovod: using `lr = 1.0 * hvd.size()` from the very beginning leads to worse final
# accuracy. Scale the learning rate `lr = 1.0` ---> `lr = 1.0 * hvd.size()` during
# the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=args.warmup_epochs, verbose=verbose),

# Horovod: after the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
hvd.callbacks.LearningRateScheduleCallback(start_epoch=args.warmup_epochs, end_epoch=30, multiplier=1.),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=30, end_epoch=60, multiplier=1e-1),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=60, end_epoch=80, multiplier=1e-2),
hvd.callbacks.LearningRateScheduleCallback(start_epoch=80, multiplier=1e-3),
]

# Horovod: save checkpoints only on the first worker to prevent other workers from corrupting them.
if hvd.rank() == 0:
callbacks.append(keras.callbacks.ModelCheckpoint(export_dir + args.checkpoint_format))

callbacks.append(keras.callbacks.TensorBoard(export_dir))

# Train the model. The training will randomly sample 1 / N batches of training data and
# 3 / N batches of validation data on every worker, where N is the number of workers.
# Over-sampling of validation data helps to increase probability that every validation
# example will be evaluated.
model.fit_generator(train_iter,
steps_per_epoch=len(train_iter) // hvd.size(),
callbacks=callbacks,
epochs=args.epochs,
verbose=verbose,
workers=4,
initial_epoch=resume_from_epoch,
validation_data=test_iter,
validation_steps=3 * len(test_iter) // hvd.size())

# Evaluate the model on the full data set.
score = hvd.allreduce(model.evaluate_generator(test_iter, len(test_iter), workers=4))
if verbose:
print('Test loss:', score[0])
print('Test accuracy:', score[1])