diff --git a/.gitignore b/.gitignore
index 5c1678f..5697341 100644
--- a/.gitignore
+++ b/.gitignore
@@ -270,3 +270,5 @@ _Pvt_Extensions
 
 # Python
 *.pyc
+*.egg
+binding/python/multiverso_python.egg-info/
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8d63ca3..88227d2 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -21,6 +21,10 @@ if(USE_HDFS)
   LINK_DIRECTORIES(${JVM_LIB})
 endif(USE_HDFS)
 
+if(ENABLE_DCASGD)
+  add_definitions(-DENABLE_DCASGD)
+endif(ENABLE_DCASGD)
+
 include_directories(${PROJECT_SOURCE_DIR}/include)
 
 set(MULTIVERSO_DIR ${PROJECT_SOURCE_DIR})
@@ -43,3 +47,4 @@ configure_file(
 
 add_custom_target(uninstall
   COMMAND ${CMAKE_COMMAND} -P ${CMAKE_CURRENT_BINARY_DIR}/cmake_uninstall.cmake)
+
diff --git a/binding/python/examples/torch/mnist.py b/binding/python/examples/torch/mnist.py
new file mode 100644
index 0000000..d77a9ad
--- /dev/null
+++ b/binding/python/examples/torch/mnist.py
@@ -0,0 +1,128 @@
+from __future__ import print_function
+import argparse
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+import torch.optim as optim
+from torchvision import datasets, transforms
+from torch.autograd import Variable
+
+import numpy as np
+import multiverso as mv
+from multiverso.torch_ext import torchmodel
+
+mv.init(sync=True, updater='sgd')
+
+# Training settings
+parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
+parser.add_argument('--batch-size', type=int, default=64, metavar='N',
+                    help='input batch size for training (default: 64)')
+parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
+                    help='input batch size for testing (default: 1000)')
+parser.add_argument('--epochs', type=int, default=10, metavar='N',
+                    help='number of epochs to train (default: 10)')
+parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
+                    help='learning rate (default: 0.01)')
+parser.add_argument('--momentum', type=float, default=0, metavar='M',
+                    help='SGD momentum (default: 0)')
+parser.add_argument('--no-cuda', action='store_true', default=False,
+                    help='disables CUDA training')
+parser.add_argument('--seed', type=int, default=1, metavar='S',
+                    help='random seed (default: 1)')
+parser.add_argument('--log-interval', type=int, default=10, metavar='N',
+                    help='how many batches to wait before logging training status')
+args = parser.parse_args()
+args.cuda = not args.no_cuda and torch.cuda.is_available()
+
+torch.manual_seed(args.seed)
+if args.cuda:
+    torch.cuda.manual_seed(args.seed)
+
+
+kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {}
+train_loader = torch.utils.data.DataLoader(
+    datasets.MNIST('../data', train=True, download=True,
+                   transform=transforms.Compose([
+                       transforms.ToTensor(),
+                       transforms.Normalize((0.1307,), (0.3081,))
+                   ])),
+    batch_size=args.batch_size, shuffle=True, **kwargs)
+test_loader = torch.utils.data.DataLoader(
+    datasets.MNIST('../data', train=False, transform=transforms.Compose([
+        transforms.ToTensor(),
+        transforms.Normalize((0.1307,), (0.3081,))
+    ])),
+    batch_size=args.batch_size, shuffle=True, **kwargs)
+
+
+class Net(nn.Module):
+    def __init__(self):
+        super(Net, self).__init__()
+        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
+        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
+        self.conv2_drop = nn.Dropout2d()
+        self.fc1 = nn.Linear(320, 50)
+        self.fc2 = nn.Linear(50, 10)
+
+    def forward(self, x):
+        x = F.relu(F.max_pool2d(self.conv1(x), 2))
+        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
+        x = x.view(-1, 320)
+        x = F.relu(self.fc1(x))
+        x = F.dropout(x, training=self.training)
+        x = self.fc2(x)
+        return F.log_softmax(x)
+
+model = torchmodel.MVTorchModel(Net())
+
+if args.cuda:
+    model.cuda()
+
+optimizer = optim.SGD(model.parameters(), lr=args.lr * mv.workers_num(), momentum=args.momentum)
+
+def train(epoch):
+    model.train()
+    for batch_idx, (data, target) in enumerate(train_loader):
+        if batch_idx % mv.workers_num() == mv.worker_id():
+            if args.cuda:
+                data, target = data.cuda(), target.cuda()
+            data, target = Variable(data), Variable(target)
+            optimizer.zero_grad()
+            output = model(data)
+            loss = F.nll_loss(output, target)
+            loss.backward()
+            optimizer.step()
+
+            model.cpu()
+            model.mv_sync()
+            model.cuda()
+
+            if (batch_idx / mv.workers_num()) % args.log_interval == 0:
+                print('Worker: {}\tTrain Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
+                    mv.worker_id(), epoch, batch_idx * len(data), len(train_loader.dataset),
+                    100. * batch_idx / len(train_loader), loss.data[0]))
+
+def test(epoch):
+    model.eval()
+    test_loss = 0
+    correct = 0
+    for data, target in test_loader:
+        if args.cuda:
+            data, target = data.cuda(), target.cuda()
+        data, target = Variable(data, volatile=True), Variable(target)
+        output = model(data)
+        test_loss += F.nll_loss(output, target).data[0]
+        pred = output.data.max(1)[1]  # get the index of the max log-probability
+        correct += pred.eq(target.data).cpu().sum()
+
+    test_loss /= len(test_loader)  # loss function already averages over batch size
+    print('\nWorker: {}\tTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
+        mv.worker_id(), test_loss, correct, len(test_loader.dataset),
+        100. * correct / len(test_loader.dataset)))
+
+
+for epoch in range(1, args.epochs + 1):
+    train(epoch)
+    test(epoch)
+
+mv.shutdown()
\ No newline at end of file
diff --git a/binding/python/multiverso/api.py b/binding/python/multiverso/api.py
index 849e51b..0fd8081 100644
--- a/binding/python/multiverso/api.py
+++ b/binding/python/multiverso/api.py
@@ -9,7 +9,7 @@
 mv_lib = Loader.get_lib()
 
 
-def init(sync=False):
+def init(sync=False, updater=None):
     '''Initialize multiverso.
 
     This should be called only once before training at the beginning of the
@@ -29,6 +29,8 @@ def init(sync=False):
     args = [b""]  # the first argument will be ignored. So we put a placeholder here
     if sync:
         args.append(b"-sync=true")
+    if updater:
+        args.append(b"-updater_type=" + updater)
     n = len(args)
     args_type = ctypes.c_char_p * n
     mv_lib.MV_Init(ctypes.pointer(ctypes.c_int(n)), args_type(*[ctypes.c_char_p(arg) for arg in args]))
diff --git a/binding/python/multiverso/tables.py b/binding/python/multiverso/tables.py
index 4ec312b..757a40f 100644
--- a/binding/python/multiverso/tables.py
+++ b/binding/python/multiverso/tables.py
@@ -65,7 +65,7 @@ def get(self):
         mv_lib.MV_GetArrayTable(self._handler, data.ctypes.data_as(C_FLOAT_P), self._size)
         return data
 
-    def add(self, data, sync=False):
+    def add(self, data, sync=False, lr=0.1, mom=0.0, rho=0.0, lam=0.0):
         '''add the data to the multiverso ArrayTable
 
         Data type of `data` is numpy.ndarray with one-dimensional
@@ -76,9 +76,11 @@
         data = convert_data(data)
         assert(data.size == self._size)
         if sync:
-            mv_lib.MV_AddArrayTable(self._handler, data.ctypes.data_as(C_FLOAT_P), self._size)
+            mv_lib.MV_AddArrayTableOption(self._handler, data.ctypes.data_as(C_FLOAT_P), self._size,
+                                          ctypes.c_float(lr), ctypes.c_float(mom), ctypes.c_float(rho), ctypes.c_float(lam))
         else:
-            mv_lib.MV_AddAsyncArrayTable(self._handler, data.ctypes.data_as(C_FLOAT_P), self._size)
+            mv_lib.MV_AddAsyncArrayTableOption(self._handler, data.ctypes.data_as(C_FLOAT_P), self._size,
+                                               ctypes.c_float(lr), ctypes.c_float(mom), ctypes.c_float(rho), ctypes.c_float(lam))
 
 
 class MatrixTableHandler(TableHandler):
diff --git a/binding/python/multiverso/torch_ext/__init__.py b/binding/python/multiverso/torch_ext/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/binding/python/multiverso/torch_ext/torchmodel.py b/binding/python/multiverso/torch_ext/torchmodel.py
new file mode 100644
index 0000000..23ef007
--- /dev/null
+++ b/binding/python/multiverso/torch_ext/torchmodel.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python
+# coding:utf8
+
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+import torch.optim as optim
+from torchvision import datasets, transforms
+from torch.autograd import Variable
+
+import numpy as np
+import multiverso as mv
+
+
+class MVTorchModel(object):
+    def __init__(self, tmobj):
+        assert(isinstance(tmobj, nn.Module))
+        self._tmobj = tmobj
+        self._mv_params = []
+        for param in self._tmobj.parameters():
+            self._mv_params.append(mv.ArrayTableHandler(param.data.numpy().size, param.data.numpy().reshape((-1,))))
+        mv.barrier()
+        self._last_mv_params = []
+        for mv_param in self._mv_params:
+            self._last_mv_params.append(mv_param.get())
+        for param, last_mv_param in zip(self._tmobj.parameters(), self._last_mv_params):
+            param.data = torch.from_numpy(last_mv_param.reshape(param.data.numpy().shape))
+
+    def mv_sync(self, lr=0.1, mom=0.0, rho=0.0, lam=0.0):
+        for mv_param, last_mv_param, param in zip(self._mv_params, self._last_mv_params, self._tmobj.parameters()):
+            mv_param.add(last_mv_param - param.data.numpy().reshape((-1,)), lr=lr, mom=mom, rho=rho, lam=lam)
+
+        for i, (mv_param, last_mv_param, param) in enumerate(zip(self._mv_params, self._last_mv_params, self._tmobj.parameters())):
+            self._last_mv_params[i] = mv_param.get()
+            param.data = torch.from_numpy(self._last_mv_params[i].reshape(param.data.numpy().shape))
+
+    def __call__(self, *args, **kwargs):
+        return self._tmobj(*args, **kwargs)
+
+    def __getstate__(self):
+        odict = self.__dict__.copy()
+        del odict['_mv_params']
+        return odict
+
+    def __getattribute__(self, attr):
+        if attr in ['_tmobj', '_mv_params', '_last_mv_params']:
+            return object.__getattribute__(self, attr)
+        elif attr in ['mv_sync', '__call__', '__getstate__']:
+            return getattr(MVTorchModel, attr).__get__(self)
+        else:
+            return getattr(self._tmobj, attr)
diff --git a/binding/python/setup.py b/binding/python/setup.py
index f7b5604..a7108c3 100644
--- a/binding/python/setup.py
+++ b/binding/python/setup.py
@@ -16,7 +16,7 @@ def readme():
     url='https://github.com/Microsoft/multiverso',
     author='Microsoft',
     license='MIT',
-    packages=['multiverso', 'multiverso.theano_ext', 'multiverso.theano_ext.lasagne_ext'],
+    packages=['multiverso', 'multiverso.torch_ext', 'multiverso.theano_ext', 'multiverso.theano_ext.lasagne_ext'],
     # TODO: The lasagne on pypi is too old. multiverso needs some functions in
     # lasagne-0.2 which is not released yet. Please replace the dev version
     # with the stable release later.
diff --git a/include/multiverso/c_api.h b/include/multiverso/c_api.h
index 3a566b6..d9abb9b 100644
--- a/include/multiverso/c_api.h
+++ b/include/multiverso/c_api.h
@@ -32,8 +32,11 @@
 DllExport void MV_GetArrayTable(TableHandler handler, float* data, int size);
 
 DllExport void MV_AddArrayTable(TableHandler handler, float* data, int size);
 
+DllExport void MV_AddArrayTableOption(TableHandler handler, float* data, int size, float lr, float mom, float rho, float lambda);
+
 DllExport void MV_AddAsyncArrayTable(TableHandler handler, float* data, int size);
 
+DllExport void MV_AddAsyncArrayTableOption(TableHandler handler, float* data, int size, float lr, float mom, float rho, float lambda);
 // Matrix Table
 DllExport void MV_NewMatrixTable(int num_row, int num_col, TableHandler* out);
diff --git a/include/multiverso/updater/dcasgd b/include/multiverso/updater/dcasgd
index 4bdd2c3..ab8dab6 160000
--- a/include/multiverso/updater/dcasgd
+++ b/include/multiverso/updater/dcasgd
@@ -1 +1 @@
-Subproject commit 4bdd2c38702be6f443f9540f9d76d5190cf13e06
+Subproject commit ab8dab629c725cdf322534098cc838c6a0aa86e1
diff --git a/include/multiverso/updater/sgd_updater.h b/include/multiverso/updater/sgd_updater.h
index 922165d..fa0edc8 100644
--- a/include/multiverso/updater/sgd_updater.h
+++ b/include/multiverso/updater/sgd_updater.h
@@ -9,7 +9,7 @@ template <typename T>
 class SGDUpdater : public Updater<T> {
 public:
   explicit SGDUpdater(size_t){
-    Log::Debug("[SGDUpdater] Init. \n");
+    Log::Debug("[SGDUpdater] Init. \n");
\n"); } void Update(size_t num_element, T* data, T* delta, AddOption*, size_t offset) override { @@ -28,4 +28,4 @@ class SGDUpdater : public Updater { } -#endif // MULTIVERSO_UPDATER_ASGD_UPDATER_H_ \ No newline at end of file +#endif // MULTIVERSO_UPDATER_ASGD_UPDATER_H_ diff --git a/src/c_api.cpp b/src/c_api.cpp index a952ff1..56122b6 100644 --- a/src/c_api.cpp +++ b/src/c_api.cpp @@ -4,6 +4,7 @@ #include "multiverso/table/array_table.h" #include "multiverso/table/matrix_table.h" #include "multiverso/util/log.h" +#include "multiverso/updater/updater.h" extern "C" { @@ -46,11 +47,32 @@ void MV_AddArrayTable(TableHandler handler, float* data, int size) { worker->Add(data, size); } +void MV_AddArrayTableOption(TableHandler handler, float* data, int size, float lr, float mom, float rho, float lambda) { + auto worker = reinterpret_cast*>(handler); + multiverso::AddOption option; + option.set_worker_id(multiverso::MV_WorkerId()); + option.set_learning_rate(lr); + option.set_momentum(mom); + option.set_rho(rho); + option.set_lambda(lambda); + worker->Add(data, size, &option); +} + void MV_AddAsyncArrayTable(TableHandler handler, float* data, int size) { auto worker = reinterpret_cast*>(handler); worker->AddAsync(data, size); } +void MV_AddAsyncArrayTableOption(TableHandler handler, float* data, int size, float lr, float mom, float rho, float lambda) { + auto worker = reinterpret_cast*>(handler); + multiverso::AddOption option; + option.set_worker_id(multiverso::MV_WorkerId()); + option.set_learning_rate(lr); + option.set_momentum(mom); + option.set_rho(rho); + option.set_lambda(lambda); + worker->AddAsync(data, size, &option); +} // MatrixTable void MV_NewMatrixTable(int num_row, int num_col, TableHandler* out) { diff --git a/src/updater/updater.cpp b/src/updater/updater.cpp index bfd7d66..3f847ba 100644 --- a/src/updater/updater.cpp +++ b/src/updater/updater.cpp @@ -16,9 +16,6 @@ namespace multiverso { MV_DEFINE_string(updater_type, "default", "multiverso server updater type"); MV_DEFINE_int(omp_threads, 4 , "#theads used by openMP for updater"); -#ifdef ENABLE_DCASGD -MV_DEFINE_bool(is_pipelined, false, "Only used for CNTK - DCASGD"); -#endif template void Updater::Update(size_t num_element, T* data, T* delta, @@ -51,9 +48,10 @@ Updater* Updater::GetUpdater(size_t size) { if (type == "adagrad") return new AdaGradUpdater(size); if (type == "momentum_sgd") return new MomentumUpdater(size); #ifdef ENABLE_DCASGD - if (type == "dcasgd") return new DCASGDUpdater(size, MV_CONFIG_is_pipelined); + if (type == "dcasgd") return new DCASGDUpdater(size); #endif // Default: simple updater + Log::Info("[Updater] Init. \n"); return new Updater(); }