From bd89a7a2ff0ca3fba2f675acecb5092d3ca6cf47 Mon Sep 17 00:00:00 2001
From: sIncerass
Date: Tue, 22 Jun 2021 14:29:10 -0700
Subject: [PATCH] multiclass_logreg

---
 nums/models/lbfgs.py                | 234 +++++++++++++++++++++++++++
 nums/models/multinomial_lr.py       | 239 ++++++++++++++++++++++++++++
 tests/models/test_multinomial_lr.py |  65 ++++++++
 3 files changed, 538 insertions(+)
 create mode 100644 nums/models/lbfgs.py
 create mode 100644 nums/models/multinomial_lr.py
 create mode 100644 tests/models/test_multinomial_lr.py

diff --git a/nums/models/lbfgs.py b/nums/models/lbfgs.py
new file mode 100644
index 00000000..d630d617
--- /dev/null
+++ b/nums/models/lbfgs.py
@@ -0,0 +1,234 @@
+# Originally from https://github.com/elibol/nums-1/blob/plan_support/nums/experimental/lbfgs.py
+# Modified for numerical stability and multiple classes.
+
+import time
+from typing import List, Union
+
+import numpy as np
+import ray
+
+from nums.core import settings
+from nums.core.array.application import ArrayApplication
+from nums.core.application_manager import instance
+from nums import numpy as nps
+
+
+def forward(app, X, theta):
+    # Softmax forward pass. If theta has one more row than X has columns,
+    # the last row of theta is treated as the per-class intercept.
+    if X.shape[1] < theta.shape[0]:
+        assert X.shape[1] + 1 == theta.shape[0]
+        eta = theta[-1] + X @ theta[:-1]
+    else:
+        eta = X @ theta
+    # Subtract the row-wise max before exponentiating for numerical stability.
+    eta = eta - app.max(eta, axis=1).expand_dims(-1)
+    unnormalized_probs = app.exp(eta)
+    mu = unnormalized_probs / app.sum(unnormalized_probs, axis=1).expand_dims(-1)
+    return mu  # Probabilities for each class.
+
+
+def objective(app: ArrayApplication, y, mu):
+    # Negative log likelihood of the correct class; y is a matrix of one-hots.
+    # The 1e-10 term guards against log(0).
+    return -app.sum(y * app.log(mu + 1e-10))
+
+
+def grad(X, y, mu):
+    # Gradient of the softmax cross-entropy loss w.r.t. theta (y is one-hot).
+    return X.T @ (mu - y)
+
+
+def bt_linesearch(app,
+                  X, y, theta,
+                  grad, p,
+                  rho=1.e-1, init_alpha=1.0, c=1e-4, min_alpha=1e-10):
+    # Backtracking line search on the Armijo (sufficient decrease) condition.
+
+    def f(theta_prime):
+        return objective(app, y, forward(app, X, theta_prime))
+
+    alpha = init_alpha
+    f_val = f(theta)
+    f_next = f(theta + alpha * p)
+    while nps.isnan(f_next) or f_next > f_val + c * alpha * app.sum(grad * p):
+        alpha *= rho
+        if alpha < min_alpha:
+            return min_alpha
+        f_next = f(theta + alpha * p)
+    return alpha
+
+
+class LBFGSMemory(object):
+
+    def __init__(self, app, k, s, y):
+        self.k = k
+        self.s = s
+        self.y = y
+        ys_inner = app.sum(s * y)
+        self.rho = 1.0 / ys_inner
+        self.gamma = ys_inner / app.sum(y * y)
+
+
+class LBFGS(object):
+
+    def __init__(self, app: ArrayApplication,
+                 m=3, max_iter=100, thresh=1e-5, dtype=np.float32):
+        self.app: ArrayApplication = app
+        self.m = m
+        self.max_iter = max_iter
+        self.thresh = thresh
+        self.dtype = dtype
+        self.k = 0
+        self.identity = None
+        self.memory: Union[List[LBFGSMemory], List[None]] = [None]*m
+
+    def get_H(self):
+        # Initial inverse-Hessian approximation: the identity at k=0, and the
+        # gamma-scaled identity thereafter.
+        if self.k == 0:
+            return self.identity
+        else:
+            mem: LBFGSMemory = self.memory[-1]
+            assert mem.k == self.k-1
+            return mem.gamma * self.identity
+
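+    # get_p implements the standard L-BFGS two-loop recursion (Nocedal &
+    # Wright, Algorithm 7.4): it computes H_k @ g from the stored (s, y)
+    # pairs without ever materializing the approximate inverse Hessian. In
+    # the multiclass case, s and y are (features x classes) matrices, and the
+    # app.sum(a * b) inner products implicitly treat them as flat vectors.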
+    def get_p(self, H, g):
+        q = g
+        forward_vars = []
+        for i in range(-1, -self.m-1, -1):
+            mem_i: LBFGSMemory = self.memory[i]
+            if mem_i is None:
+                break
+            alpha = mem_i.rho * self.app.sum(mem_i.s * q)
+            q -= alpha * mem_i.y
+            forward_vars.insert(0, (alpha, mem_i))
+        r = H @ q
+        for alpha, mem_i in forward_vars:
+            beta = mem_i.rho * self.app.sum(mem_i.y * r)
+            r += mem_i.s * (alpha - beta)
+        return r
+
+    def execute(self, X, y, theta):
+
+        if self.k != 0:
+            raise Exception("Unexpected state.")
+
+        self.identity = self.app.eye((X.shape[1], X.shape[1]),
+                                     (X.block_shape[1], X.block_shape[1]),
+                                     dtype=self.dtype)
+
+        # TODO: Try sampling a new block every iteration.
+        # TODO: Try stochastic approach, given line search...
+        # X_btls = X[:X.block_shape[0]]
+        # y_btls = y[:y.block_shape[0]]
+        X_btls = X
+        y_btls = y
+
+        g = grad(X, y, forward(self.app, X, theta))
+        next_g = None
+        next_theta = None
+        while self.k < self.max_iter:
+            H = self.get_H()
+            p = - self.get_p(H, g)
+            init_alpha = 1
+
+            alpha = bt_linesearch(self.app, X_btls, y_btls,
+                                  theta, g, p,
+                                  rho=0.1,
+                                  init_alpha=init_alpha,
+                                  c=1e-4,
+                                  min_alpha=1e-10)
+
+            next_theta = theta + alpha * p
+            if self.k + 1 >= self.max_iter:
+                # Terminate immediately if this is the last iteration.
+                theta = next_theta
+                break
+            next_g = grad(X, y, forward(self.app, X, next_theta))
+            theta_diff = next_theta - theta
+            grad_diff = next_g - g
+            mem: LBFGSMemory = LBFGSMemory(self.app, k=self.k, s=theta_diff, y=grad_diff)
+            self.memory.append(mem)
+            self.memory.pop(0)
+            self.k += 1
+            theta = next_theta
+            g = next_g
+            if self.app.max(self.app.abs(g)) <= self.thresh:
+                break
+
+        # Reset vars.
+        self.k = 0
+        self.identity = None
+        self.memory: Union[List[LBFGSMemory], List[None]] = [None]*self.m
+
+        return theta
+
+    def converged(self, g):
+        return self.app.sqrt(self.app.sum(g * g)) < self.thresh
+
+
+def logistic(app, X, y, max_iter, m):
+    # y is expected to be a one-hot matrix of shape (num_samples, num_classes).
+    Xc = app.concatenate([X, app.ones(shape=(X.shape[0], 1),
+                                      block_shape=(X.block_shape[0], 1),
+                                      dtype=X.dtype)],
+                         axis=1,
+                         axis_block_size=X.block_shape[1])
+    theta = app.zeros((Xc.shape[1], y.shape[1]),
+                      (Xc.block_shape[1], y.shape[1]),
+                      dtype=Xc.dtype)
+    lbfgs_optimizer = LBFGS(app, m=m, max_iter=max_iter, dtype=Xc.dtype)
+    theta = lbfgs_optimizer.execute(Xc, y, theta)
+    return forward(app, Xc, theta)
+
+
+def sample_set(app: ArrayApplication):
+    # Two Gaussian clusters, with one-hot labels to match the softmax forward.
+    shape = (500, 10)
+    block_shape = (100, 10)
+    rs = app.random_state(1337)
+    X1 = rs.normal(loc=5.0, shape=shape, block_shape=block_shape)
+    X2 = rs.normal(loc=10.0, shape=shape, block_shape=block_shape)
+    X = app.concatenate([X1, X2], axis=0)
+    y_np = np.zeros((2 * shape[0], 2), dtype=np.float32)
+    y_np[:shape[0], 0] = 1.0
+    y_np[shape[0]:, 1] = 1.0
+    y = app.array(y_np, block_shape=(block_shape[0], 2))
+    return X, y
+
+
+def load_set(app: ArrayApplication, read_func, dataset):
+    X = read_func("%s_X" % dataset)
+    y = read_func("%s_y" % dataset)
+    return X, y
+
+
+def execute(dataset, cluster_shape, address, use_s3):
+
+    settings.cluster_shape = cluster_shape
+    ray.init(address=address)
+    app: ArrayApplication = instance()
+    time.sleep(0.1)
+
+    start_time = time.time()
+    read_func = app.read_s3 if use_s3 else app.read_fs
+    # X, y = load_set(app, read_func, dataset)
+    X, y = sample_set(app)
+    y_pred_proba = logistic(app, X, y, max_iter=10, m=3)
+    print("scheduling submitted.")
+    y_pred = np.argmax(y_pred_proba.get(), axis=1)
+    print("prediction submitted.")
+    y_true = np.argmax(y.get(), axis=1)
+    error = float(np.mean(y_true != y_pred))
+    total_time = time.time() - start_time
+    print("opt", "lbfgs")
+    print("total time", total_time)
+    print("error (1-accuracy)", error)
+    return total_time, float(error)
+
+
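+# `execute` above is a self-contained demo: it trains a two-class softmax
+# model on synthetic Gaussian clusters and reports training error. `logistic`
+# returns class probabilities, so hard labels come from an argmax over the
+# class axis.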
+if __name__ == "__main__": + execute(dataset=None, cluster_shape=(1, 1), + address=None, use_s3=False) diff --git a/nums/models/multinomial_lr.py b/nums/models/multinomial_lr.py new file mode 100644 index 00000000..e4c992b1 --- /dev/null +++ b/nums/models/multinomial_lr.py @@ -0,0 +1,239 @@ +import numpy as np +import random + +from nums.core.array.blockarray import BlockArray +from nums.core.array.application import ArrayApplication + +from nums.core.application_manager import instance as _instance +from nums.core.array import utils as array_utils +from nums.core.array.random import NumsRandomState +from collections import defaultdict +from nums import numpy as nps + +from .lbfgs import LBFGS, forward + + +class MultinomialLogisticRegression(object): + + def __init__(self, + penalty='none', + C=1.0, + tol=0.0001, + max_iter=100, + solver="newton-cg", + lr=0.01, + m=3, + random_state=None, + fit_intercept=True, + normalize=False): + + if fit_intercept is False: + raise NotImplementedError("fit_incercept=False currently not supported.") + if normalize is True: + raise NotImplementedError("normalize=True currently not supported.") + + self._app = _instance() + if random_state is None: + self.rs: NumsRandomState = self._app.random + elif array_utils.is_int(random_state): + self.rs: NumsRandomState = NumsRandomState(system=self._app.system, seed=random_state) + elif isinstance(random_state, NumsRandomState): + self.rs: NumsRandomState = random_state + else: + raise Exception("Unexpected type for random_state %s" % str(type(random_state))) + self._penalty = None if penalty == 'none' else penalty + if not (self._penalty is None or self._penalty == "l2"): + raise NotImplementedError("%s penalty not supported" % self._penalty) + self._lambda = 1.0 / C + self._lambda_vec = None + self._tol = tol + self._max_iter = max_iter + self._opt = solver + self._lr = lr + self._m = m + self._beta = None + self._beta0 = None + + def fit(self, X: BlockArray, y: BlockArray): + # Note, it's critically important from a performance point-of-view + # to maintain the original block shape of X below, along axis 1. + # Otherwise, the concatenation operation will not construct the new X + # by referencing X's existing blocks. + # TODO: Option to do concat. + # TODO: Provide support for batching. 
+        X = self._app.concatenate([X, self._app.ones(shape=(X.shape[0], 1),
+                                                     block_shape=(X.block_shape[0], 1),
+                                                     dtype=X.dtype)],
+                                  axis=1,
+                                  axis_block_size=X.block_shape[1])
+        assert len(X.shape) == 2 and len(y.shape) == 2, \
+            "X must be a 2D matrix and y must be a 2D one-hot matrix."
+        self._num_class = y.shape[1]
+
+        self.feature_dim = X.shape[1]
+        self.feature_block_dim = X.block_shape[1]
+
+        beta: BlockArray = self._app.zeros((X.shape[1], self._num_class),
+                                           (X.block_shape[1], self._num_class),
+                                           dtype=float)
+        tol: BlockArray = self._app.scalar(self._tol)
+        max_iter: int = self._max_iter
+        self.use_lbfgs_forward = False
+        if self._penalty == "l2":
+            self._lambda_vec = self._app.ones(beta.shape,
+                                              beta.block_shape,
+                                              beta.dtype) * self._lambda
+        if self._opt == "gd" or self._opt == "sgd" or self._opt == "block_sgd":
+            lr: BlockArray = self._app.scalar(self._lr)
+            if self._opt == "gd":
+                beta = gd(self, beta, X, y, tol, max_iter, lr)
+            elif self._opt == "sgd":
+                beta = sgd(self, beta, X, y, tol, max_iter, lr)
+            else:
+                beta = block_sgd(self, beta, X, y, tol, max_iter, lr)
+        elif self._opt == "newton" or self._opt == "newton-cg":
+            if self._penalty == 'l2':
+                # Must be set before calling newton, since hessian() uses it.
+                self._lambda_id = self._app.eye((self.feature_dim, self.feature_dim),
+                                                block_shape=(self.feature_block_dim,
+                                                             self.feature_block_dim)) \
+                                  * self._lambda
+            beta = newton(self._app, self, beta, X, y, tol, max_iter)
+        elif self._opt == "lbfgs":
+            self.use_lbfgs_forward = True
+            lbfgs_optimizer = LBFGS(self._app, m=self._m, max_iter=max_iter,
+                                    thresh=self._tol, dtype=X.dtype)
+            beta = lbfgs_optimizer.execute(X, y, beta)
+            self.beta = beta
+        else:
+            raise Exception("Unsupported optimizer specified %s." % self._opt)
+        self._beta0 = beta[-1]
+        self._beta = beta[:-1]
+
+    def forward(self, X, beta=None):
+        if self.use_lbfgs_forward:
+            # The lbfgs solver optimizes a softmax model; reuse its forward pass.
+            return forward(self._app, X, self.beta)
+        if beta is not None:
+            return self.link_inv(X @ beta)
+        return self.link_inv(self._beta0 + X @ self._beta)
+
+    def link_inv(self, eta: BlockArray):
+        # Element-wise sigmoid, with -eta clamped at 10 to avoid overflow in exp.
+        def truncate(x, maximum):
+            masked = (x - maximum) > 0
+            return x * (1 - masked) + maximum * masked
+        return self._app.one / (self._app.one + self._app.exp(truncate(-eta, 10)))
+
+    def gradient(self, X: BlockArray, y: BlockArray,
+                 mu: BlockArray = None, beta: BlockArray = None):
+        if mu is None:
+            mu = self.forward(X)
+        if self._penalty is None:
+            return X.T @ (mu - y)
+        else:
+            assert beta is not None
+            return X.T @ (mu - y) + self._lambda_vec * beta
+
+    def hessian(self, X: BlockArray, y: BlockArray, mu: BlockArray = None,
+                learning_ends_for_class=None):
+        if mu is None:
+            mu = self.forward(X)
+        class_count = mu.shape[1]
+        if learning_ends_for_class is None:
+            learning_ends_for_class = [False for _ in range(class_count)]
+
+        dim, block_dim = mu.shape[0], mu.block_shape[0]
+        s = mu * (self._app.one - mu)
+        # One Hessian per class; classes that have already converged get None.
+        if self._penalty is None:
+            return [(X.T @ (s[:, class_idx].reshape((dim, 1), block_shape=(block_dim, 1)) * X))
+                    if not learning_ends_for_class[class_idx] else None
+                    for class_idx in range(class_count)]
+        else:
+            return [(X.T @ (s[:, class_idx].reshape((dim, 1), block_shape=(block_dim, 1)) * X)
+                     + self._lambda_id)
+                    if not learning_ends_for_class[class_idx] else None
+                    for class_idx in range(class_count)]
+
+    def grad_norm_sq(self, X: BlockArray, y: BlockArray, beta=None):
+        g = self.gradient(X, y, self.forward(X, beta), beta=beta)
+        return self._app.sum(g * g)
+
+    def predict(self, X: BlockArray):
+        pred = self.forward(X).get()
+        return np.argmax(pred, axis=-1)
+
+
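+# The solvers below fit each class with an element-wise sigmoid via link_inv
+# (a one-vs-rest style objective); only the "lbfgs" branch of fit() uses the
+# softmax forward from lbfgs.py. tol and lr arrive as BlockArray scalars
+# built in fit().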
+def sgd(model: MultinomialLogisticRegression, beta,
+        X: BlockArray, y: BlockArray,
+        tol: BlockArray, max_iter: int, lr: BlockArray):
+    # Classic SGD.
+    app = _instance()
+    for _ in range(max_iter):
+        # Sample an entry uniformly at random.
+        idx = model.rs.numpy().integers(X.shape[0])
+        X_sample, y_sample = X[idx:idx+1], y[idx:idx+1]
+        mu = model.forward(X_sample, beta)
+        g = model.gradient(X_sample, y_sample, mu, beta=beta)
+        beta += - lr * g
+        if app.max(app.abs(g)) <= tol:
+            # sklearn uses max instead of the l2 norm.
+            break
+    return beta
+
+
+def block_sgd(model: MultinomialLogisticRegression, beta,
+              X: BlockArray, y: BlockArray,
+              tol: BlockArray, max_iter: int, lr: BlockArray):
+    # SGD with batches equal to the block shape along the first axis.
+    app = _instance()
+    for _ in range(max_iter):
+        for (start, stop) in X.grid.grid_slices[0]:
+            X_batch, y_batch = X[start:stop], y[start:stop]
+            bsize = X_batch.shape[0]
+            mu = model.forward(X_batch, beta)
+            g = model.gradient(X_batch, y_batch, mu, beta=beta)
+            beta += - lr * g / bsize
+            if app.max(app.abs(g)) <= tol:
+                return beta
+    return beta
+
+
+def gd(model: MultinomialLogisticRegression, beta,
+       X: BlockArray, y: BlockArray,
+       tol: BlockArray, max_iter: int, lr: BlockArray):
+    # Full-batch gradient descent.
+    app = _instance()
+    for _ in range(max_iter):
+        mu = model.forward(X, beta)
+        g = model.gradient(X, y, mu, beta=beta)
+        beta += - lr * g
+        if app.max(app.abs(g)) <= tol:
+            break
+    return beta
+
+
+def newton(app: ArrayApplication, model: MultinomialLogisticRegression, beta,
+           X: BlockArray, y: BlockArray,
+           tol: BlockArray, max_iter: int):
+    # Per-class Newton's method: each class's weight column takes independent
+    # Newton steps until that class converges.
+    num_classes = y.shape[1]
+    learning_ends_for_class = [False for _ in range(num_classes)]
+
+    # Track the number of Newton steps taken per class.
+    opt_count = [0 for _ in range(num_classes)]
+    for _ in range(max_iter):
+
+        mu: BlockArray = model.forward(X, beta)
+        g = model.gradient(X, y, mu, beta=beta)
+
+        hessians = model.hessian(X, y, mu, learning_ends_for_class)
+
+        class_count = g.shape[1]
+
+        for class_idx in range(class_count):
+            if learning_ends_for_class[class_idx]:
+                continue
+            opt_count[class_idx] += 1
+
+            # These are PSD, but inv is faster than psd inv. The small ridge
+            # term keeps the Hessian invertible.
+            h = hessians[class_idx]
+            stable_h = h + app.eye(h.shape, h.block_shape) * 1e-6
+            invert_stable_h = app.inv(stable_h)
+
+            step = - invert_stable_h @ g[:, class_idx]
+            beta[:, class_idx] += step
+
+            if app.max(app.abs(g[:, class_idx])) <= tol:
+                learning_ends_for_class[class_idx] = True
+
+        # Learning ends once every class has converged.
+        if all(learning_ends_for_class):
+            break
+    return beta
\ No newline at end of file
diff --git a/tests/models/test_multinomial_lr.py b/tests/models/test_multinomial_lr.py
new file mode 100644
index 00000000..704f841f
--- /dev/null
+++ b/tests/models/test_multinomial_lr.py
@@ -0,0 +1,65 @@
+# coding=utf-8
+# Copyright (C) 2020 NumS Development Team.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
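+
+# The test below expects the standard NumS pytest fixture `nps_app_inst` (an
+# ArrayApplication instance); the __main__ block at the bottom builds one
+# manually so the file can also be run directly.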
+
+import time
+
+import numpy as np
+
+from sklearn.datasets import load_iris
+
+from nums.core.array.application import ArrayApplication
+from nums.models.multinomial_lr import MultinomialLogisticRegression
+
+# pylint: disable = protected-access, import-outside-toplevel, import-error
+
+
+def test_multinomial_logistic(nps_app_inst: ArrayApplication):
+    data = load_iris()
+    real_X = data['data']
+    real_y_indices = data['target']
+    num_samples, num_features, num_classes = \
+        real_X.shape[0], real_X.shape[1], real_y_indices.max() + 1
+    real_y = np.zeros((num_samples, num_classes))
+    real_y[np.arange(num_samples), real_y_indices] = 1  # Make it one-hot.
+    X = nps_app_inst.array(real_X, block_shape=(100, 3))
+    # TODO: block shape? Iris has 3 classes, and we seem to crash when using
+    #  a block shape smaller than 3 here.
+    y = nps_app_inst.array(real_y, block_shape=(100, 3))
+    param_set = [
+        {"solver": "gd", "lr": 1e-6, "tol": 1e-8, "max_iter": 10},
+        {"solver": "sgd", "lr": 1e-6, "tol": 1e-8, "max_iter": 10},
+        {"solver": "block_sgd", "lr": 1e-6, "tol": 1e-8, "max_iter": 10},
+        {"solver": "newton", "tol": 1e-8, "max_iter": 10},
+        {"solver": "lbfgs", "tol": 1e-8, "max_iter": 10, "m": 3}
+    ]
+    for kwargs in param_set:
+        runtime = time.time()
+        lr_model: MultinomialLogisticRegression = MultinomialLogisticRegression(**kwargs)
+        lr_model.fit(X, y)
+        runtime = time.time() - runtime
+        # TODO: predict should return a NumS array (and .get() here), not a
+        #  NumPy array.
+        y_pred = lr_model.predict(X)
+        # TODO: implement predict_proba, and check that each sample's class
+        #  probabilities sum to one.
+        # TODO: consider also reporting grad_norm_sq; an objective() method is
+        #  not implemented for this model.
+        print("opt", kwargs["solver"])
+        print("runtime", runtime)
+        print("accuracy", np.sum(y.get().argmax(axis=1) == y_pred) / num_samples)
+
+
+if __name__ == "__main__":
+    # pylint: disable=import-error
+    from nums.core import application_manager
+    nps_app_inst = application_manager.instance()
+    test_multinomial_logistic(nps_app_inst)
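+
+# Running this file directly (illustrative):
+#
+#   python tests/models/test_multinomial_lr.py
+#
+# creates its own ArrayApplication via application_manager.instance() instead
+# of relying on the pytest fixture.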