From ad4b5c9588f16655c07d16b460e2b15119f136f0 Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Mon, 20 Apr 2015 17:08:40 +0800 Subject: [PATCH 01/31] MIS of LR with Vector. forward func added. --- .../classification/LogisticRegression.scala | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala new file mode 100644 index 00000000..40f54fa1 --- /dev/null +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.cloudml.zen.ml.classification + +import breeze.numerics.exp +import org.apache.spark.Logging +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.rdd.RDD +import com.github.cloudml.zen.ml.linalg.BLAS.dot + +class LogisticRegressionMIS ( + private var stepSize: Double, + private var numIterations: Int, + private var regParm: Double, + private var miniBatchFraction: Double) +extends Logging with Serializable{ + /** + * Construct a LogisticRegression object with default parameters: {stepSize: 1.0, + * numIterations: 100, regParm: 0.01, miniBatchFraction: 1.0}. + */ + def this() = this(1.0, 100, 0.01, 1.0) + + /** + * Calculate the mistake probability: q(i) = 1/(1+exp(yi*(w*xi))). + * @param initialWeights weights of last iteration. + * @param dataSet + */ + protected[ml] def forward(initialWeights: Vector, dataSet: RDD[LabeledPoint]): RDD[(Double, + Double)] = { + dataSet.map{point => + val ywx = point.label * dot(initialWeights, point.features) + (point.label, 1.0 / (1.0 + exp(ywx))) + } + } + + /** + * Calculate the change in weights. wj = log(mu_j_+/mu_j_-) + * @param misProb q(i) = 1/(1+exp(yi*(w*xi))). + * @param dataSet + */ + protected[ml] def backward(misProb: RDD[(Double, Double)], dataSet: RDD[LabeledPoint]) = { + + } +} From 3ba09be2f83ce18539b416c7f6fc4075d181592d Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Tue, 21 Apr 2015 10:13:08 +0800 Subject: [PATCH 02/31] MIS of LR with Vectors. backward added. --- .../classification/LogisticRegression.scala | 48 ++++++++++++++++--- 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index 40f54fa1..e8c2f276 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -18,10 +18,12 @@ package com.github.cloudml.zen.ml.classification import breeze.numerics.exp import org.apache.spark.Logging -import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.linalg.{Vectors, Vector} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD import com.github.cloudml.zen.ml.linalg.BLAS.dot +import com.github.cloudml.zen.ml.linalg.BLAS.scal +import com.github.cloudml.zen.ml.linalg.BLAS.axpy class LogisticRegressionMIS ( private var stepSize: Double, @@ -35,16 +37,26 @@ extends Logging with Serializable{ */ def this() = this(1.0, 100, 0.01, 1.0) + private var epsilon = 1e-4 + + /** + * Set smooth parameter. + * @param eps parameter for smooth, default 1e-4. + * @return + */ + def setEpsilon(eps: Double): this.type = { + epsilon = eps + this + } /** * Calculate the mistake probability: q(i) = 1/(1+exp(yi*(w*xi))). * @param initialWeights weights of last iteration. * @param dataSet */ - protected[ml] def forward(initialWeights: Vector, dataSet: RDD[LabeledPoint]): RDD[(Double, - Double)] = { + protected[ml] def forward(initialWeights: Vector, dataSet: RDD[LabeledPoint]): RDD[Double] = { dataSet.map{point => val ywx = point.label * dot(initialWeights, point.features) - (point.label, 1.0 / (1.0 + exp(ywx))) + 1.0 / (1.0 + exp(ywx)) } } @@ -53,7 +65,31 @@ extends Logging with Serializable{ * @param misProb q(i) = 1/(1+exp(yi*(w*xi))). * @param dataSet */ - protected[ml] def backward(misProb: RDD[(Double, Double)], dataSet: RDD[LabeledPoint]) = { - + protected[ml] def backward(misProb: RDD[Double], dataSet: RDD[LabeledPoint], numFeatures: Int): + Array[Double] = { + def func(v1: Vector, v2: Vector) = { + axpy(1.0, v1, v2) + v2 + } + val muArr: Array[(Double, Vector)] = dataSet.zip(misProb).map { + case (point, prob) => + val scaledFeatures = Vectors.zeros(numFeatures) + axpy(prob, point.features, scaledFeatures) + (point.label, scaledFeatures) + }.aggregateByKey(Vectors.zeros(numFeatures))(func, func).collect() + assert(muArr.length == 2) + val grads: Array[Double] = new Array[Double](numFeatures) + val muPlus: Array[Double] = {if (muArr(0)._1 > 0) muArr(0)._2 else muArr(1)._2}.toArray + val muMinus: Array[Double] = {if (muArr(0)._1 < 0) muArr(0)._2 else muArr(1)._2}.toArray + var i = 0 + while (i < numFeatures) { + grads(i) = if (epsilon == 0.0) { + math.log(muPlus(i) / muMinus(i)) + } else { + math.log(muPlus(i) / (epsilon + muMinus(i))) + } + i += 1 + } + grads } } From d1e579516272da3a6e3b8199ebca177237fe5340 Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Tue, 21 Apr 2015 12:53:25 +0800 Subject: [PATCH 03/31] MIS LR with Vectors. Loss for Binary LR added. --- .../classification/LogisticRegression.scala | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index e8c2f276..40f145a2 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -17,12 +17,12 @@ package com.github.cloudml.zen.ml.classification import breeze.numerics.exp +import com.github.cloudml.zen.ml.util.Utils import org.apache.spark.Logging import org.apache.spark.mllib.linalg.{Vectors, Vector} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD import com.github.cloudml.zen.ml.linalg.BLAS.dot -import com.github.cloudml.zen.ml.linalg.BLAS.scal import com.github.cloudml.zen.ml.linalg.BLAS.axpy class LogisticRegressionMIS ( @@ -55,8 +55,8 @@ extends Logging with Serializable{ */ protected[ml] def forward(initialWeights: Vector, dataSet: RDD[LabeledPoint]): RDD[Double] = { dataSet.map{point => - val ywx = point.label * dot(initialWeights, point.features) - 1.0 / (1.0 + exp(ywx)) + val z = point.label * dot(initialWeights, point.features) + 1.0 / (1.0 + exp(z)) } } @@ -92,4 +92,23 @@ extends Logging with Serializable{ } grads } + + /** + * @param weights + * @param dataSet + * @return Loss of given weights and dataSet in one iteration. + */ + protected[ml] def loss(weights: Vector, dataSet: RDD[LabeledPoint]) : Double = { + // For Binary Logistic Regression + var lossSum = 0 + dataSet.foreach {point => + val margin = -1.0 * dot(point.features, weights) + if (point.label > 0) { + lossSum += Utils.log1pExp(margin) + } else { + lossSum += Utils.log1pExp(margin) - margin + } + } + lossSum + } } From f6530285c290ee6fa05d1424d84b91f2c743ead1 Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Tue, 21 Apr 2015 13:50:42 +0800 Subject: [PATCH 04/31] MIS LR with Vectors. updateWeights added. --- .../classification/LogisticRegression.scala | 67 ++++++++++++++++--- 1 file changed, 58 insertions(+), 9 deletions(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index 40f145a2..f9635f0a 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -24,21 +24,52 @@ import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD import com.github.cloudml.zen.ml.linalg.BLAS.dot import com.github.cloudml.zen.ml.linalg.BLAS.axpy +import com.github.cloudml.zen.ml.linalg.BLAS.scal -class LogisticRegressionMIS ( - private var stepSize: Double, - private var numIterations: Int, - private var regParm: Double, - private var miniBatchFraction: Double) -extends Logging with Serializable{ +class LogisticRegressionMIS extends Logging with Serializable{ /** * Construct a LogisticRegression object with default parameters: {stepSize: 1.0, * numIterations: 100, regParm: 0.01, miniBatchFraction: 1.0}. */ def this() = this(1.0, 100, 0.01, 1.0) - private var epsilon = 1e-4 + private var epsilon: Double = 1e-4 + private var stepSize: Double = 1.0 + private var numIterations: Int = 100 + private var regParam: Double = 0.0 + private var miniBatchFraction: Double = 1.0 + /** + * Set the initial step size of SGD for the first step. Default 1.0. + * In subsequent steps, the step size will decrease with stepSize/sqrt(t) + */ + def setStepSize(stepSize: Double): this.type = { + this.stepSize = stepSize + this + } + /** + * Set fraction of data to be used for each SGD iteration. + * Default 1.0 (corresponding to deterministic/classical gradient descent) + */ + def setMiniBatchFraction(fraction: Double): this.type = { + this.miniBatchFraction = fraction + this + } + + /** + * Set the number of iterations for SGD. Default 100. + */ + def setNumIterations(iters: Int): this.type = { + this.numIterations = iters + this + } + /** + * Set the regularization parameter. Default 0.0. + */ + def setRegParam(regParam: Double): this.type = { + this.regParam = regParam + this + } /** * Set smooth parameter. * @param eps parameter for smooth, default 1e-4. @@ -66,7 +97,7 @@ extends Logging with Serializable{ * @param dataSet */ protected[ml] def backward(misProb: RDD[Double], dataSet: RDD[LabeledPoint], numFeatures: Int): - Array[Double] = { + Vector = { def func(v1: Vector, v2: Vector) = { axpy(1.0, v1, v2) v2 @@ -90,9 +121,27 @@ extends Logging with Serializable{ } i += 1 } - grads + Vectors.dense(grads) + } + + /** + * delta = stepSize * grad + * @param iter + * @param grads + */ + protected[ml] def updateGradients(iter: Int, grads: Vector): Unit = { + val thisIterStepSize = stepSize / math.sqrt(iter) + scal(thisIterStepSize, grads) } + /** + * Update weights + * @param initialWeights + * @param delta + */ + protected[ml] def updateWeights(initialWeights: Vector, delta: Vector): Unit = { + axpy(1.0, delta, initialWeights) + } /** * @param weights * @param dataSet From 77585fd2d2a19b5c6780dad58c7d0028964f84c0 Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Tue, 21 Apr 2015 14:37:22 +0800 Subject: [PATCH 05/31] MIS LR with Vectors. updateWeights added. --- .../zen/ml/classification/LogisticRegression.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index f9635f0a..27967d29 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -129,18 +129,20 @@ class LogisticRegressionMIS extends Logging with Serializable{ * @param iter * @param grads */ - protected[ml] def updateGradients(iter: Int, grads: Vector): Unit = { + protected[ml] def updateGradients(iter: Int, grads: Vector): Vector = { val thisIterStepSize = stepSize / math.sqrt(iter) scal(thisIterStepSize, grads) + grads } /** * Update weights - * @param initialWeights + * @param weights * @param delta */ - protected[ml] def updateWeights(initialWeights: Vector, delta: Vector): Unit = { - axpy(1.0, delta, initialWeights) + protected[ml] def updateWeights(weights: Vector, delta: Vector): Vector = { + axpy(1.0, delta, weights) + weights } /** * @param weights From 36c851f2cfe25e17b2999ee6eb79de599bf9753f Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Thu, 23 Apr 2015 14:14:16 +0800 Subject: [PATCH 06/31] MIS LR with Vectors. v0.1. --- .../classification/LogisticRegression.scala | 196 ++++++++++++++++-- 1 file changed, 184 insertions(+), 12 deletions(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index 27967d29..3213a129 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -18,26 +18,79 @@ package com.github.cloudml.zen.ml.classification import breeze.numerics.exp import com.github.cloudml.zen.ml.util.Utils -import org.apache.spark.Logging -import org.apache.spark.mllib.linalg.{Vectors, Vector} +import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.Loader +import org.apache.spark.mllib.classification.LogisticRegressionModel +import org.apache.spark.mllib.classification.impl.GLMClassificationModel.SaveLoadV1_0.Data +import org.apache.spark.mllib.feature.StandardScaler +import org.apache.spark.mllib.util.Loader +import org.apache.spark.mllib.util.Loader +import org.apache.spark.sql.SQLContext +import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.mllib.linalg.{DenseVector, Vectors, Vector} import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.util.MLUtils._ import org.apache.spark.rdd.RDD import com.github.cloudml.zen.ml.linalg.BLAS.dot import com.github.cloudml.zen.ml.linalg.BLAS.axpy import com.github.cloudml.zen.ml.linalg.BLAS.scal +import org.apache.spark.storage.StorageLevel +import org.json4s.jackson.JsonMethods._ -class LogisticRegressionMIS extends Logging with Serializable{ +class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Serializable{ /** * Construct a LogisticRegression object with default parameters: {stepSize: 1.0, * numIterations: 100, regParm: 0.01, miniBatchFraction: 1.0}. */ def this() = this(1.0, 100, 0.01, 1.0) - private var epsilon: Double = 1e-4 private var stepSize: Double = 1.0 private var numIterations: Int = 100 private var regParam: Double = 0.0 private var miniBatchFraction: Double = 1.0 + /** + * In `GeneralizedLinearModel`, only single linear predictor is allowed for both weights + * and intercept. However, for multinomial logistic regression, with K possible outcomes, + * we are training K-1 independent binary logistic regression models which requires K-1 sets + * of linear predictor. + * + * As a result, the workaround here is if more than two sets of linear predictors are needed, + * we construct bigger `weights` vector which can hold both weights and intercepts. + * If the intercepts are added, the dimension of `weights` will be + * (numOfLinearPredictor) * (numFeatures + 1) . If the intercepts are not added, + * the dimension of `weights` will be (numOfLinearPredictor) * numFeatures. + * + * Thus, the intercepts will be encapsulated into weights, and we leave the value of intercept + * in GeneralizedLinearModel as zero. + */ + protected var numOfLinearPredictor: Int = 1 + /** Whether to add intercept (default: false). */ + protected var addIntercept: Boolean = false + /** + * The dimension of training features. + */ + protected var numFeatures: Int = -1 + /** + * Whether to perform feature scaling before model training to reduce the condition numbers + * which can significantly help the optimizer converging faster. The scaling correction will be + * translated back to resulting model weights, so it's transparent to users. + * Note: This technique is used in both libsvm and glmnet packages. Default false. + */ + private var useFeatureScaling = false + /** + * Set if the algorithm should use feature scaling to improve the convergence during optimization. + */ + private[mllib] def setFeatureScaling(useFeatureScaling: Boolean): this.type = { + this.useFeatureScaling = useFeatureScaling + this + } + /** + * Set if the algorithm should add an intercept. Default false. + * We set the default to false because adding the intercept will cause memory allocation. + */ + def setIntercept(addIntercept: Boolean): this.type = { + this.addIntercept = addIntercept + this + } /** * Set the initial step size of SGD for the first step. Default 1.0. * In subsequent steps, the step size will decrease with stepSize/sqrt(t) @@ -79,12 +132,116 @@ class LogisticRegressionMIS extends Logging with Serializable{ epsilon = eps this } + + /** + * Run the algorithm with the configured parameters on an input + * RDD of LabeledPoint entries. + */ + def run(iterations: Int): Unit = { + if (numFeatures < 0) { + numFeatures = dataSet.map(_.features.size).first() + } + /** + * When `numOfLinearPredictor > 1`, the intercepts are encapsulated into weights, + * so the `weights` will include the intercepts. When `numOfLinearPredictor == 1`, + * the intercept will be stored as separated value in `GeneralizedLinearModel`. + * This will result in different behaviors since when `numOfLinearPredictor == 1`, + * users have no way to set the initial intercept, while in the other case, users + * can set the intercepts as part of weights. + * + * TODO: See if we can deprecate `intercept` in `GeneralizedLinearModel`, and always + * have the intercept as part of weights to have consistent design. + */ + val initialWeights = { + if (numOfLinearPredictor == 1) { + Vectors.dense(new Array[Double](numFeatures)) + } else if (addIntercept) { + Vectors.dense(new Array[Double]((numFeatures + 1) * numOfLinearPredictor)) + } else { + Vectors.dense(new Array[Double](numFeatures * numOfLinearPredictor)) + } + } + run(iterations, initialWeights) + } + def run(iterations: Int, initialWeights: Vector): Unit ={ + if (numFeatures < 0) { + numFeatures = dataSet.map(_.features.size).first() + } + + if (dataSet.getStorageLevel == StorageLevel.NONE) { + logWarning("The input data is not directly cached, which may hurt performance if its" + + " parent RDDs are also uncached.") + } + + /* + * Scaling columns to unit variance as a heuristic to reduce the condition number: + * + * During the optimization process, the convergence (rate) depends on the condition number of + * the training dataset. Scaling the variables often reduces this condition number + * heuristically, thus improving the convergence rate. Without reducing the condition number, + * some training datasets mixing the columns with different scales may not be able to converge. + * + * GLMNET and LIBSVM packages perform the scaling to reduce the condition number, and return + * the weights in the original scale. + * See page 9 in http://cran.r-project.org/web/packages/glmnet/glmnet.pdf + * + * Here, if useFeatureScaling is enabled, we will standardize the training features by dividing + * the variance of each column (without subtracting the mean), and train the model in the + * scaled space. Then we transform the coefficients from the scaled space to the original scale + * as GLMNET and LIBSVM do. + * + * Currently, it's only enabled in LogisticRegressionWithLBFGS + */ + val scaler = if (useFeatureScaling) { + new StandardScaler(withStd = true, withMean = false).fit(dataSet.map(_.features)) + } else { + null + } + // Prepend an extra variable consisting of all 1.0's for the intercept. + // TODO: Apply feature scaling to the weight vector instead of input data. + val data = + if (addIntercept) { + if (useFeatureScaling) { + dataSet.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache() + } else { + dataSet.map(lp => (lp.label, appendBias(lp.features))).cache() + } + } else { + if (useFeatureScaling) { + dataSet.map(lp => (lp.label, scaler.transform(lp.features))).cache() + } else { + dataSet.map(lp => (lp.label, lp.features)) + } + } + + /** + * TODO: For better convergence, in logistic regression, the intercepts should be computed + * from the prior probability distribution of the outcomes; for linear regression, + * the intercept should be set as the average of response. + */ + val initialWeightsWithIntercept = if (addIntercept && numOfLinearPredictor == 1) { + appendBias(initialWeights) + } else { + /** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */ + initialWeights + } + for (iter <- 1 to iterations) { + logInfo(s"Start train (Iteration $iter/$iterations)") + val startedAt = System.nanoTime() + + val delta = updateGradients(iter, backward(forward(initialWeightsWithIntercept), numFeatures)) + updateWeights(initialWeightsWithIntercept, delta) + val loss = loss(initialWeightsWithIntercept) + val elapsedSeconds = (System.nanoTime() - startedAt) / 1e9 + logInfo(s"train (Iteration $iter/$iterations) loss: $loss") + logInfo(s"End train (Iteration $iter/$iterations) takes: $elapsedSeconds") + } + } /** * Calculate the mistake probability: q(i) = 1/(1+exp(yi*(w*xi))). * @param initialWeights weights of last iteration. - * @param dataSet */ - protected[ml] def forward(initialWeights: Vector, dataSet: RDD[LabeledPoint]): RDD[Double] = { + protected[ml] def forward(initialWeights: Vector): RDD[Double] = { dataSet.map{point => val z = point.label * dot(initialWeights, point.features) 1.0 / (1.0 + exp(z)) @@ -94,9 +251,8 @@ class LogisticRegressionMIS extends Logging with Serializable{ /** * Calculate the change in weights. wj = log(mu_j_+/mu_j_-) * @param misProb q(i) = 1/(1+exp(yi*(w*xi))). - * @param dataSet */ - protected[ml] def backward(misProb: RDD[Double], dataSet: RDD[LabeledPoint], numFeatures: Int): + protected[ml] def backward(misProb: RDD[Double], numFeatures: Int): Vector = { def func(v1: Vector, v2: Vector) = { axpy(1.0, v1, v2) @@ -140,16 +296,14 @@ class LogisticRegressionMIS extends Logging with Serializable{ * @param weights * @param delta */ - protected[ml] def updateWeights(weights: Vector, delta: Vector): Vector = { + protected[ml] def updateWeights(weights: Vector, delta: Vector): Unit = { axpy(1.0, delta, weights) - weights } /** * @param weights - * @param dataSet * @return Loss of given weights and dataSet in one iteration. */ - protected[ml] def loss(weights: Vector, dataSet: RDD[LabeledPoint]) : Double = { + protected[ml] def loss(weights: Vector) : Double = { // For Binary Logistic Regression var lossSum = 0 dataSet.foreach {point => @@ -163,3 +317,21 @@ class LogisticRegressionMIS extends Logging with Serializable{ lossSum } } + +object LogisticRegression { + def trainMIS( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + regParam: Double, + epsilon: Double = 1e-3, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK): Unit = { + val lr = new LogisticRegressionMIS(input) + lr.setEpsilon(epsilon) + .setIntercept(false) + .setStepSize(stepSize) + .setNumIterations(numIterations) + .setRegParam(regParam) + .run(numIterations) + } +} \ No newline at end of file From ee9db9d10ec9e72e2c32393adc034ca419ea605c Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Thu, 23 Apr 2015 14:24:42 +0800 Subject: [PATCH 07/31] MIS LR with Vectors. v0.1. --- .../zen/ml/classification/LogisticRegression.scala | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index 3213a129..bd640f0f 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -18,15 +18,9 @@ package com.github.cloudml.zen.ml.classification import breeze.numerics.exp import com.github.cloudml.zen.ml.util.Utils -import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.Loader -import org.apache.spark.mllib.classification.LogisticRegressionModel -import org.apache.spark.mllib.classification.impl.GLMClassificationModel.SaveLoadV1_0.Data import org.apache.spark.mllib.feature.StandardScaler -import org.apache.spark.mllib.util.Loader -import org.apache.spark.mllib.util.Loader -import org.apache.spark.sql.SQLContext -import org.apache.spark.{SparkContext, Logging} -import org.apache.spark.mllib.linalg.{DenseVector, Vectors, Vector} +import org.apache.spark.{Logging} +import org.apache.spark.mllib.linalg.{Vectors, Vector} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils._ import org.apache.spark.rdd.RDD @@ -34,7 +28,6 @@ import com.github.cloudml.zen.ml.linalg.BLAS.dot import com.github.cloudml.zen.ml.linalg.BLAS.axpy import com.github.cloudml.zen.ml.linalg.BLAS.scal import org.apache.spark.storage.StorageLevel -import org.json4s.jackson.JsonMethods._ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Serializable{ /** From 8dc67ee026481e760a5f33b9541970266d3954ed Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Mon, 27 Apr 2015 16:29:04 +0800 Subject: [PATCH 08/31] LRSuite MIS witn vectors. --- .../LogisticRegressionSuite.scala | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala diff --git a/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala b/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala new file mode 100644 index 00000000..03239261 --- /dev/null +++ b/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.cloudml.zen.ml.classification + +import com.github.cloudml.zen.ml.util.SharedSparkContext +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.util.MLUtils +import org.scalatest.{Matchers, FunSuite} +import com.github.cloudml.zen.ml.util.SparkUtils._ + +class LogisticRegressionSuite extends FunSuite with SharedSparkContext with Matchers { + test("LogisticRegression MIS with Vectors") { + val zenHome = sys.props.getOrElse("zen.test.home", fail("zen.test.home is not set!")) + val dataSetFile = s"$zenHome/data/binary_classification_data.txt" + val dataSet = MLUtils.loadLibSVMFile(sc, dataSetFile).map{case LabeledPoint(label, features)=> + val newLabel = if (label > 0.0) 1.0 else -1.0 + LabeledPoint(newLabel, features) + } + val max = dataSet.map(_.features.activeValuesIterator.map(_.abs).sum + 1L).max + val maxIter = 10 + val stepSize = 1 / (2 * max) + val lr = new LogisticRegressionMIS(dataSet) + lr.setStepSize(stepSize) + var i = 0 + val startedAt = System.currentTimeMillis() + while (i < maxIter) { + lr.run(1) + i += 1 + } + println((System.currentTimeMillis() - startedAt) / 1e3) + } +} From b24a2b77ee8208874e1fac92223be5bf9030ca1d Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Mon, 27 Apr 2015 17:38:45 +0800 Subject: [PATCH 09/31] details. --- .../classification/LogisticRegression.scala | 56 +++++++++---------- 1 file changed, 25 insertions(+), 31 deletions(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index bd640f0f..8559f02e 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -18,7 +18,6 @@ package com.github.cloudml.zen.ml.classification import breeze.numerics.exp import com.github.cloudml.zen.ml.util.Utils -import org.apache.spark.mllib.feature.StandardScaler import org.apache.spark.{Logging} import org.apache.spark.mllib.linalg.{Vectors, Vector} import org.apache.spark.mllib.regression.LabeledPoint @@ -30,11 +29,6 @@ import com.github.cloudml.zen.ml.linalg.BLAS.scal import org.apache.spark.storage.StorageLevel class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Serializable{ - /** - * Construct a LogisticRegression object with default parameters: {stepSize: 1.0, - * numIterations: 100, regParm: 0.01, miniBatchFraction: 1.0}. - */ - def this() = this(1.0, 100, 0.01, 1.0) private var epsilon: Double = 1e-4 private var stepSize: Double = 1.0 private var numIterations: Int = 100 @@ -72,7 +66,7 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser /** * Set if the algorithm should use feature scaling to improve the convergence during optimization. */ - private[mllib] def setFeatureScaling(useFeatureScaling: Boolean): this.type = { + private def setFeatureScaling(useFeatureScaling: Boolean): this.type = { this.useFeatureScaling = useFeatureScaling this } @@ -185,27 +179,27 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser * * Currently, it's only enabled in LogisticRegressionWithLBFGS */ - val scaler = if (useFeatureScaling) { - new StandardScaler(withStd = true, withMean = false).fit(dataSet.map(_.features)) - } else { - null - } - // Prepend an extra variable consisting of all 1.0's for the intercept. - // TODO: Apply feature scaling to the weight vector instead of input data. - val data = - if (addIntercept) { - if (useFeatureScaling) { - dataSet.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache() - } else { - dataSet.map(lp => (lp.label, appendBias(lp.features))).cache() - } - } else { - if (useFeatureScaling) { - dataSet.map(lp => (lp.label, scaler.transform(lp.features))).cache() - } else { - dataSet.map(lp => (lp.label, lp.features)) - } - } +// val scaler = if (useFeatureScaling) { +// new StandardScaler(withStd = true, withMean = false).fit(dataSet.map(_.features)) +// } else { +// null +// } +// // Prepend an extra variable consisting of all 1.0's for the intercept. +// // TODO: Apply feature scaling to the weight vector instead of input data. +// val data = +// if (addIntercept) { +// if (useFeatureScaling) { +// dataSet.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache() +// } else { +// dataSet.map(lp => (lp.label, appendBias(lp.features))).cache() +// } +// } else { +// if (useFeatureScaling) { +// dataSet.map(lp => (lp.label, scaler.transform(lp.features))).cache() +// } else { +// dataSet.map(lp => (lp.label, lp.features)) +// } +// } /** * TODO: For better convergence, in logistic regression, the intercepts should be computed @@ -224,9 +218,9 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser val delta = updateGradients(iter, backward(forward(initialWeightsWithIntercept), numFeatures)) updateWeights(initialWeightsWithIntercept, delta) - val loss = loss(initialWeightsWithIntercept) + val lossSum = loss(initialWeightsWithIntercept) val elapsedSeconds = (System.nanoTime() - startedAt) / 1e9 - logInfo(s"train (Iteration $iter/$iterations) loss: $loss") + logInfo(s"train (Iteration $iter/$iterations) loss: $lossSum") logInfo(s"End train (Iteration $iter/$iterations) takes: $elapsedSeconds") } } @@ -298,7 +292,7 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser */ protected[ml] def loss(weights: Vector) : Double = { // For Binary Logistic Regression - var lossSum = 0 + var lossSum = 0.0 dataSet.foreach {point => val margin = -1.0 * dot(point.features, weights) if (point.label > 0) { From a2a820ec1dc0162811cc6138fe0d9cfb18f98c5d Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Wed, 29 Apr 2015 14:11:50 +0800 Subject: [PATCH 10/31] optimize. --- .../classification/LogisticRegression.scala | 21 ++++++++++++++++--- .../LogisticRegressionSuite.scala | 10 +++++---- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index 8559f02e..6101f1c6 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -18,6 +18,7 @@ package com.github.cloudml.zen.ml.classification import breeze.numerics.exp import com.github.cloudml.zen.ml.util.Utils +import org.apache.spark.mllib.classification.LogisticRegressionModel import org.apache.spark.{Logging} import org.apache.spark.mllib.linalg.{Vectors, Vector} import org.apache.spark.mllib.regression.LabeledPoint @@ -124,7 +125,7 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser * Run the algorithm with the configured parameters on an input * RDD of LabeledPoint entries. */ - def run(iterations: Int): Unit = { + def run(iterations: Int): (LogisticRegressionModel, Array[Double]) = { if (numFeatures < 0) { numFeatures = dataSet.map(_.features.size).first() } @@ -150,7 +151,7 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser } run(iterations, initialWeights) } - def run(iterations: Int, initialWeights: Vector): Unit ={ + def run(iterations: Int, initialWeights: Vector): (LogisticRegressionModel, Array[Double]) ={ if (numFeatures < 0) { numFeatures = dataSet.map(_.features.size).first() } @@ -212,6 +213,7 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser /** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */ initialWeights } + val lossArr = new Array[Double](iterations) for (iter <- 1 to iterations) { logInfo(s"Start train (Iteration $iter/$iterations)") val startedAt = System.nanoTime() @@ -219,10 +221,22 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser val delta = updateGradients(iter, backward(forward(initialWeightsWithIntercept), numFeatures)) updateWeights(initialWeightsWithIntercept, delta) val lossSum = loss(initialWeightsWithIntercept) + lossArr(iter-1) = lossSum val elapsedSeconds = (System.nanoTime() - startedAt) / 1e9 logInfo(s"train (Iteration $iter/$iterations) loss: $lossSum") logInfo(s"End train (Iteration $iter/$iterations) takes: $elapsedSeconds") } + val intercept = if (addIntercept && numOfLinearPredictor == 1) { + initialWeightsWithIntercept(initialWeightsWithIntercept.size - 1) + } else { + 0.0 + } + var weights = if (addIntercept && numOfLinearPredictor == 1) { + Vectors.dense(initialWeightsWithIntercept.toArray.slice(0, initialWeightsWithIntercept.size - 1)) + } else { + initialWeightsWithIntercept + } + (new LogisticRegressionModel(initialWeightsWithIntercept, intercept), lossArr) } /** * Calculate the mistake probability: q(i) = 1/(1+exp(yi*(w*xi))). @@ -283,8 +297,9 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser * @param weights * @param delta */ - protected[ml] def updateWeights(weights: Vector, delta: Vector): Unit = { + protected[ml] def updateWeights(weights: Vector, delta: Vector): Vector = { axpy(1.0, delta, weights) + weights } /** * @param weights diff --git a/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala b/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala index 03239261..ad49e920 100644 --- a/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala +++ b/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala @@ -38,10 +38,12 @@ class LogisticRegressionSuite extends FunSuite with SharedSparkContext with Matc lr.setStepSize(stepSize) var i = 0 val startedAt = System.currentTimeMillis() - while (i < maxIter) { - lr.run(1) - i += 1 - } + val (model, lossArr) = lr.run(maxIter) println((System.currentTimeMillis() - startedAt) / 1e3) + + lossArr.foreach(println) + val ppsDiff = lossArr.init.zip(lossArr.tail).map { case (lhs, rhs) => lhs - rhs } + assert(ppsDiff.count(_ > 0).toDouble / ppsDiff.size > 0.05) + assert(lossArr.head - lossArr.last > 0) } } From 4300cdd1810d35a1189159e08f0eb5aac6a677b7 Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Wed, 29 Apr 2015 15:15:59 +0800 Subject: [PATCH 11/31] For debug. --- data/MISLRtestData.txt | 8 +++++++ .../ml/LogisticRegressionMISVector.scala | 22 +++++++++++++++++++ .../classification/LogisticRegression.scala | 21 ++++++------------ 3 files changed, 37 insertions(+), 14 deletions(-) create mode 100644 data/MISLRtestData.txt create mode 100644 examples/src/main/scala/com/github/cloudml/zen/examples/ml/LogisticRegressionMISVector.scala diff --git a/data/MISLRtestData.txt b/data/MISLRtestData.txt new file mode 100644 index 00000000..5bb5072f --- /dev/null +++ b/data/MISLRtestData.txt @@ -0,0 +1,8 @@ +-1 1:1 3:1 +-1 1:1 ++1 3:1 ++1 2:1 3:1 +-1 1:1 2:1 3:1 +-1 2:1 ++1 1:1 2:1 +-1 1:1 \ No newline at end of file diff --git a/examples/src/main/scala/com/github/cloudml/zen/examples/ml/LogisticRegressionMISVector.scala b/examples/src/main/scala/com/github/cloudml/zen/examples/ml/LogisticRegressionMISVector.scala new file mode 100644 index 00000000..20a15e90 --- /dev/null +++ b/examples/src/main/scala/com/github/cloudml/zen/examples/ml/LogisticRegressionMISVector.scala @@ -0,0 +1,22 @@ +package com.github.cloudml.zen.examples.ml + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +class LogisticRegressionMISVector { + +} diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index 6101f1c6..1072bf55 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -218,7 +218,9 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser logInfo(s"Start train (Iteration $iter/$iterations)") val startedAt = System.nanoTime() - val delta = updateGradients(iter, backward(forward(initialWeightsWithIntercept), numFeatures)) + val q = forward(initialWeightsWithIntercept) + val qArr = q.collect() + val delta = backward(iter, q, numFeatures) updateWeights(initialWeightsWithIntercept, delta) val lossSum = loss(initialWeightsWithIntercept) lossArr(iter-1) = lossSum @@ -253,8 +255,7 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser * Calculate the change in weights. wj = log(mu_j_+/mu_j_-) * @param misProb q(i) = 1/(1+exp(yi*(w*xi))). */ - protected[ml] def backward(misProb: RDD[Double], numFeatures: Int): - Vector = { + protected[ml] def backward(iter: Int, misProb: RDD[Double], numFeatures: Int): Vector = { def func(v1: Vector, v2: Vector) = { axpy(1.0, v1, v2) v2 @@ -278,18 +279,10 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser } i += 1 } - Vectors.dense(grads) - } - - /** - * delta = stepSize * grad - * @param iter - * @param grads - */ - protected[ml] def updateGradients(iter: Int, grads: Vector): Vector = { + val gradVec = Vectors.dense(grads) val thisIterStepSize = stepSize / math.sqrt(iter) - scal(thisIterStepSize, grads) - grads + scal(thisIterStepSize, gradVec) + gradVec } /** From 8837bc5c692970ac1bc23e4427d985d9726691e9 Mon Sep 17 00:00:00 2001 From: Peishen-Jia Date: Wed, 29 Apr 2015 21:33:35 +0800 Subject: [PATCH 12/31] epsilon added. --- .../ml/LogisticRegressionMISVector.scala | 25 ++++++++++++++++++- .../classification/LogisticRegression.scala | 23 ++++++++--------- 2 files changed, 34 insertions(+), 14 deletions(-) diff --git a/examples/src/main/scala/com/github/cloudml/zen/examples/ml/LogisticRegressionMISVector.scala b/examples/src/main/scala/com/github/cloudml/zen/examples/ml/LogisticRegressionMISVector.scala index 20a15e90..c3fc812a 100644 --- a/examples/src/main/scala/com/github/cloudml/zen/examples/ml/LogisticRegressionMISVector.scala +++ b/examples/src/main/scala/com/github/cloudml/zen/examples/ml/LogisticRegressionMISVector.scala @@ -1,5 +1,10 @@ package com.github.cloudml.zen.examples.ml +import com.github.cloudml.zen.ml.classification.LogisticRegressionMIS +import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.util.MLUtils + /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -17,6 +22,24 @@ package com.github.cloudml.zen.examples.ml * limitations under the License. */ -class LogisticRegressionMISVector { +object LogisticRegressionMISVector { + def main (args: Array[String]) { + val conf = new SparkConf().setAppName("MIS with Vectors").setMaster("local") + val sc = new SparkContext(conf) + val dataSetFile = s"/Users/basin/gitStore/zen/data/binary_classification_data.txt" + val dataSet = MLUtils.loadLibSVMFile(sc, dataSetFile).map{case LabeledPoint(label, features)=> + val newLabel = if (label > 0.0) 1.0 else -1.0 + LabeledPoint(newLabel, features) + } + val maxIter = 10 + val stepSize = 1 + val lr = new LogisticRegressionMIS(dataSet) + lr.setStepSize(stepSize) + var i = 0 + val startedAt = System.currentTimeMillis() + val (model, lossArr) = lr.run(maxIter) + println((System.currentTimeMillis() - startedAt) / 1e3) + lossArr.foreach(println) + } } diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index 1072bf55..c47420a3 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -71,6 +71,7 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser this.useFeatureScaling = useFeatureScaling this } + private val numSamples = dataSet.count() /** * Set if the algorithm should add an intercept. Default false. * We set the default to false because adding the intercept will cause memory allocation. @@ -217,7 +218,6 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser for (iter <- 1 to iterations) { logInfo(s"Start train (Iteration $iter/$iterations)") val startedAt = System.nanoTime() - val q = forward(initialWeightsWithIntercept) val qArr = q.collect() val delta = backward(iter, q, numFeatures) @@ -233,12 +233,12 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser } else { 0.0 } - var weights = if (addIntercept && numOfLinearPredictor == 1) { + val weights = if (addIntercept && numOfLinearPredictor == 1) { Vectors.dense(initialWeightsWithIntercept.toArray.slice(0, initialWeightsWithIntercept.size - 1)) } else { initialWeightsWithIntercept } - (new LogisticRegressionModel(initialWeightsWithIntercept, intercept), lossArr) + (new LogisticRegressionModel(weights, intercept), lossArr) } /** * Calculate the mistake probability: q(i) = 1/(1+exp(yi*(w*xi))). @@ -252,7 +252,7 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser } /** - * Calculate the change in weights. wj = log(mu_j_+/mu_j_-) + * Calculate the change in weights. delta_W_j = stepSize * log(mu_j_+/mu_j_-) * @param misProb q(i) = 1/(1+exp(yi*(w*xi))). */ protected[ml] def backward(iter: Int, misProb: RDD[Double], numFeatures: Int): Vector = { @@ -275,7 +275,7 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser grads(i) = if (epsilon == 0.0) { math.log(muPlus(i) / muMinus(i)) } else { - math.log(muPlus(i) / (epsilon + muMinus(i))) + math.log(epsilon + muPlus(i) / (epsilon + muMinus(i))) } i += 1 } @@ -290,9 +290,8 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser * @param weights * @param delta */ - protected[ml] def updateWeights(weights: Vector, delta: Vector): Vector = { + protected[ml] def updateWeights(weights: Vector, delta: Vector): Unit = { axpy(1.0, delta, weights) - weights } /** * @param weights @@ -300,16 +299,14 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser */ protected[ml] def loss(weights: Vector) : Double = { // For Binary Logistic Regression - var lossSum = 0.0 - dataSet.foreach {point => + dataSet.map {point => val margin = -1.0 * dot(point.features, weights) if (point.label > 0) { - lossSum += Utils.log1pExp(margin) + Utils.log1pExp(margin) } else { - lossSum += Utils.log1pExp(margin) - margin + Utils.log1pExp(margin) - margin } - } - lossSum + }.reduce(_+_) / numSamples } } From 2d55c17c5b76caca5b9afe5de03229758b10bb0d Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Thu, 30 Apr 2015 10:01:21 +0800 Subject: [PATCH 13/31] details. --- .../ml/classification/LogisticRegressionSuite.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala b/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala index ad49e920..ba8bc91d 100644 --- a/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala +++ b/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala @@ -38,12 +38,15 @@ class LogisticRegressionSuite extends FunSuite with SharedSparkContext with Matc lr.setStepSize(stepSize) var i = 0 val startedAt = System.currentTimeMillis() - val (model, lossArr) = lr.run(maxIter) + while (i < maxIter) { + val (model, lossArr) = lr.run(1) + println(lossArr(0)) + } println((System.currentTimeMillis() - startedAt) / 1e3) - lossArr.foreach(println) - val ppsDiff = lossArr.init.zip(lossArr.tail).map { case (lhs, rhs) => lhs - rhs } - assert(ppsDiff.count(_ > 0).toDouble / ppsDiff.size > 0.05) - assert(lossArr.head - lossArr.last > 0) +// lossArr.foreach(println) +// val ppsDiff = lossArr.init.zip(lossArr.tail).map { case (lhs, rhs) => lhs - rhs } +// assert(ppsDiff.count(_ > 0).toDouble / ppsDiff.size > 0.05) +// assert(lossArr.head - lossArr.last > 0) } } From 8acf5e6972eecf982d3c97e935ed6b2085c4175a Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Thu, 30 Apr 2015 10:09:05 +0800 Subject: [PATCH 14/31] details. --- .../cloudml/zen/ml/classification/LogisticRegressionSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala b/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala index ba8bc91d..5ad79aab 100644 --- a/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala +++ b/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala @@ -41,6 +41,7 @@ class LogisticRegressionSuite extends FunSuite with SharedSparkContext with Matc while (i < maxIter) { val (model, lossArr) = lr.run(1) println(lossArr(0)) + i += 1 } println((System.currentTimeMillis() - startedAt) / 1e3) From f151d10cbee37a05cc82b4a7224b79b52c1b3218 Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Thu, 30 Apr 2015 10:19:10 +0800 Subject: [PATCH 15/31] details --- .../zen/ml/classification/LogisticRegressionSuite.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala b/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala index 5ad79aab..b9ce2cae 100644 --- a/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala +++ b/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala @@ -38,11 +38,8 @@ class LogisticRegressionSuite extends FunSuite with SharedSparkContext with Matc lr.setStepSize(stepSize) var i = 0 val startedAt = System.currentTimeMillis() - while (i < maxIter) { - val (model, lossArr) = lr.run(1) - println(lossArr(0)) - i += 1 - } + val (model, lossArr) = lr.run(maxIter) + lossArr.foreach(println) println((System.currentTimeMillis() - startedAt) / 1e3) // lossArr.foreach(println) From 81efd65839cb7d2db9540025cb620e4af71e1d8e Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Thu, 30 Apr 2015 10:23:43 +0800 Subject: [PATCH 16/31] details --- .../zen/ml/classification/LogisticRegressionSuite.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala b/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala index b9ce2cae..ad49e920 100644 --- a/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala +++ b/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala @@ -39,12 +39,11 @@ class LogisticRegressionSuite extends FunSuite with SharedSparkContext with Matc var i = 0 val startedAt = System.currentTimeMillis() val (model, lossArr) = lr.run(maxIter) - lossArr.foreach(println) println((System.currentTimeMillis() - startedAt) / 1e3) -// lossArr.foreach(println) -// val ppsDiff = lossArr.init.zip(lossArr.tail).map { case (lhs, rhs) => lhs - rhs } -// assert(ppsDiff.count(_ > 0).toDouble / ppsDiff.size > 0.05) -// assert(lossArr.head - lossArr.last > 0) + lossArr.foreach(println) + val ppsDiff = lossArr.init.zip(lossArr.tail).map { case (lhs, rhs) => lhs - rhs } + assert(ppsDiff.count(_ > 0).toDouble / ppsDiff.size > 0.05) + assert(lossArr.head - lossArr.last > 0) } } From 045e00913974c855beb14d4589fda8ea27243353 Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Thu, 30 Apr 2015 10:33:41 +0800 Subject: [PATCH 17/31] MIS LR with Vectors. --- .../cloudml/zen/ml/classification/LogisticRegression.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index c47420a3..ace98323 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -218,9 +218,7 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser for (iter <- 1 to iterations) { logInfo(s"Start train (Iteration $iter/$iterations)") val startedAt = System.nanoTime() - val q = forward(initialWeightsWithIntercept) - val qArr = q.collect() - val delta = backward(iter, q, numFeatures) + val delta = backward(iter, forward(initialWeightsWithIntercept), numFeatures) updateWeights(initialWeightsWithIntercept, delta) val lossSum = loss(initialWeightsWithIntercept) lossArr(iter-1) = lossSum From 90a8e91f45e984bc9288ab6e83998e88c53b2300 Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Thu, 30 Apr 2015 11:07:41 +0800 Subject: [PATCH 18/31] delete examples --- data/MISLRtestData.txt | 8 ---- .../ml/LogisticRegressionMISVector.scala | 45 ------------------- 2 files changed, 53 deletions(-) delete mode 100644 data/MISLRtestData.txt delete mode 100644 examples/src/main/scala/com/github/cloudml/zen/examples/ml/LogisticRegressionMISVector.scala diff --git a/data/MISLRtestData.txt b/data/MISLRtestData.txt deleted file mode 100644 index 5bb5072f..00000000 --- a/data/MISLRtestData.txt +++ /dev/null @@ -1,8 +0,0 @@ --1 1:1 3:1 --1 1:1 -+1 3:1 -+1 2:1 3:1 --1 1:1 2:1 3:1 --1 2:1 -+1 1:1 2:1 --1 1:1 \ No newline at end of file diff --git a/examples/src/main/scala/com/github/cloudml/zen/examples/ml/LogisticRegressionMISVector.scala b/examples/src/main/scala/com/github/cloudml/zen/examples/ml/LogisticRegressionMISVector.scala deleted file mode 100644 index c3fc812a..00000000 --- a/examples/src/main/scala/com/github/cloudml/zen/examples/ml/LogisticRegressionMISVector.scala +++ /dev/null @@ -1,45 +0,0 @@ -package com.github.cloudml.zen.examples.ml - -import com.github.cloudml.zen.ml.classification.LogisticRegressionMIS -import org.apache.spark.{SparkContext, SparkConf} -import org.apache.spark.mllib.regression.LabeledPoint -import org.apache.spark.mllib.util.MLUtils - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -object LogisticRegressionMISVector { - def main (args: Array[String]) { - val conf = new SparkConf().setAppName("MIS with Vectors").setMaster("local") - val sc = new SparkContext(conf) - val dataSetFile = s"/Users/basin/gitStore/zen/data/binary_classification_data.txt" - val dataSet = MLUtils.loadLibSVMFile(sc, dataSetFile).map{case LabeledPoint(label, features)=> - val newLabel = if (label > 0.0) 1.0 else -1.0 - LabeledPoint(newLabel, features) - } - val maxIter = 10 - val stepSize = 1 - val lr = new LogisticRegressionMIS(dataSet) - lr.setStepSize(stepSize) - var i = 0 - val startedAt = System.currentTimeMillis() - val (model, lossArr) = lr.run(maxIter) - println((System.currentTimeMillis() - startedAt) / 1e3) - - lossArr.foreach(println) - } -} From 19dad0d49b895fdfaa621df8fa66a961289d05c6 Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Mon, 4 May 2015 11:02:43 +0800 Subject: [PATCH 19/31] l1 reg added. --- .../classification/LogisticRegression.scala | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index ace98323..cbe92ea9 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -16,7 +16,8 @@ */ package com.github.cloudml.zen.ml.classification -import breeze.numerics.exp +import breeze.linalg.max +import breeze.numerics.{abs, signum, sqrt, exp} import com.github.cloudml.zen.ml.util.Utils import org.apache.spark.mllib.classification.LogisticRegressionModel import org.apache.spark.{Logging} @@ -208,7 +209,7 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser * from the prior probability distribution of the outcomes; for linear regression, * the intercept should be set as the average of response. */ - val initialWeightsWithIntercept = if (addIntercept && numOfLinearPredictor == 1) { + var initialWeightsWithIntercept = if (addIntercept && numOfLinearPredictor == 1) { appendBias(initialWeights) } else { /** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */ @@ -219,7 +220,7 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser logInfo(s"Start train (Iteration $iter/$iterations)") val startedAt = System.nanoTime() val delta = backward(iter, forward(initialWeightsWithIntercept), numFeatures) - updateWeights(initialWeightsWithIntercept, delta) + initialWeightsWithIntercept = updateWeights(initialWeightsWithIntercept, delta, iter) val lossSum = loss(initialWeightsWithIntercept) lossArr(iter-1) = lossSum val elapsedSeconds = (System.nanoTime() - startedAt) / 1e9 @@ -288,8 +289,19 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser * @param weights * @param delta */ - protected[ml] def updateWeights(weights: Vector, delta: Vector): Unit = { + protected[ml] def updateWeights(weights: Vector, delta: Vector, iter: Int): Vector = { axpy(1.0, delta, weights) + val thisIterL1StepSize = stepSize / sqrt(iter) + weights.toArray.map{ + weight => + var newWeight = weight + if (regParam > 0.0 && weight != 0.0) { + val shrinkageVal = regParam * thisIterL1StepSize + newWeight = signum(weight) * max(0.0, abs(weight) - shrinkageVal) + } + assert(!newWeight.isNaN) + newWeight + }.toVector } /** * @param weights @@ -315,7 +327,7 @@ object LogisticRegression { stepSize: Double, regParam: Double, epsilon: Double = 1e-3, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK): Unit = { + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK): (LogisticRegressionModel, Array[Double]) = { val lr = new LogisticRegressionMIS(input) lr.setEpsilon(epsilon) .setIntercept(false) From c1b7f54c1c42e18869fa648e3eff821aad410221 Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Mon, 4 May 2015 11:10:13 +0800 Subject: [PATCH 20/31] l1 reg added. --- .../cloudml/zen/ml/classification/LogisticRegression.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index cbe92ea9..30a3e61c 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -292,8 +292,7 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser protected[ml] def updateWeights(weights: Vector, delta: Vector, iter: Int): Vector = { axpy(1.0, delta, weights) val thisIterL1StepSize = stepSize / sqrt(iter) - weights.toArray.map{ - weight => + val newWeights = weights.toArray.map{ weight => var newWeight = weight if (regParam > 0.0 && weight != 0.0) { val shrinkageVal = regParam * thisIterL1StepSize @@ -301,7 +300,8 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser } assert(!newWeight.isNaN) newWeight - }.toVector + } + Vectors.dense(newWeights) } /** * @param weights From 23898473d6b6bb9189941250a6f39bf6f0c8a900 Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Wed, 6 May 2015 14:13:59 +0800 Subject: [PATCH 21/31] replace collect with reduce --- .../classification/LogisticRegression.scala | 36 +++++++++---------- .../LogisticRegressionSuite.scala | 1 - 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index 30a3e61c..a9138fc1 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -259,29 +259,29 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser axpy(1.0, v1, v2) v2 } - val muArr: Array[(Double, Vector)] = dataSet.zip(misProb).map { + dataSet.zip(misProb).map { case (point, prob) => val scaledFeatures = Vectors.zeros(numFeatures) axpy(prob, point.features, scaledFeatures) (point.label, scaledFeatures) - }.aggregateByKey(Vectors.zeros(numFeatures))(func, func).collect() - assert(muArr.length == 2) - val grads: Array[Double] = new Array[Double](numFeatures) - val muPlus: Array[Double] = {if (muArr(0)._1 > 0) muArr(0)._2 else muArr(1)._2}.toArray - val muMinus: Array[Double] = {if (muArr(0)._1 < 0) muArr(0)._2 else muArr(1)._2}.toArray - var i = 0 - while (i < numFeatures) { - grads(i) = if (epsilon == 0.0) { - math.log(muPlus(i) / muMinus(i)) - } else { - math.log(epsilon + muPlus(i) / (epsilon + muMinus(i))) + }.aggregateByKey(Vectors.zeros(numFeatures))(func, func).reduce{ (x1, x2) => + val grads: Array[Double] = new Array[Double](numFeatures) + val muPlus: Array[Double] = {if (x1._1 > 0) x1._2 else x2._2}.toArray + val muMinus: Array[Double] = {if (x1._1 < 0) x1._2 else x2._2}.toArray + var i = 0 + while (i < numFeatures) { + grads(i) = if (epsilon == 0.0) { + math.log(muPlus(i) / muMinus(i)) + } else { + math.log(epsilon + muPlus(i) / (epsilon + muMinus(i))) + } + i += 1 } - i += 1 - } - val gradVec = Vectors.dense(grads) - val thisIterStepSize = stepSize / math.sqrt(iter) - scal(thisIterStepSize, gradVec) - gradVec + val thisIterStepSize = stepSize / math.sqrt(iter) + val gradVec = Vectors.dense(grads) + scal(thisIterStepSize, gradVec) + (0.0, gradVec) + }._2 } /** diff --git a/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala b/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala index ad49e920..39429bef 100644 --- a/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala +++ b/ml/src/test/scala/com/github/cloudml/zen/ml/classification/LogisticRegressionSuite.scala @@ -36,7 +36,6 @@ class LogisticRegressionSuite extends FunSuite with SharedSparkContext with Matc val stepSize = 1 / (2 * max) val lr = new LogisticRegressionMIS(dataSet) lr.setStepSize(stepSize) - var i = 0 val startedAt = System.currentTimeMillis() val (model, lossArr) = lr.run(maxIter) println((System.currentTimeMillis() - startedAt) / 1e3) From 5ce3c6a931f237473afbed6089811b527790421b Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Wed, 6 May 2015 17:48:30 +0800 Subject: [PATCH 22/31] add loss log. --- .../github/cloudml/zen/ml/regression/LogisticRegression.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/regression/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/regression/LogisticRegression.scala index f8cb4b60..fb259f2f 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/regression/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/regression/LogisticRegression.scala @@ -119,7 +119,7 @@ abstract class LogisticRegression( vertices.count() dataSet = GraphImpl(vertices, edges) val elapsedSeconds = (System.nanoTime() - startedAt) / 1e9 - // logInfo(s"train (Iteration $iter/$iterations) loss: ${loss(margin)}") + logInfo(s"train (Iteration $iter/$iterations) loss: ${loss(margin)}") logInfo(s"End train (Iteration $iter/$iterations) takes: $elapsedSeconds") unpersistVertices() innerIter += 1 From ac7f0fd38433578a942f4c824ac0fde4e0debb53 Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Wed, 6 May 2015 17:52:22 +0800 Subject: [PATCH 23/31] add loss log. --- .../github/cloudml/zen/ml/regression/LogisticRegression.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/regression/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/regression/LogisticRegression.scala index fb259f2f..8293f37e 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/regression/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/regression/LogisticRegression.scala @@ -119,7 +119,7 @@ abstract class LogisticRegression( vertices.count() dataSet = GraphImpl(vertices, edges) val elapsedSeconds = (System.nanoTime() - startedAt) / 1e9 - logInfo(s"train (Iteration $iter/$iterations) loss: ${loss(margin)}") + logInfo(s"train (Iteration $iter/$iterations) loss: ${loss(margin)}") logInfo(s"End train (Iteration $iter/$iterations) takes: $elapsedSeconds") unpersistVertices() innerIter += 1 From e024f0175946112905f855fb2ac04ce83f19bb2d Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Wed, 6 May 2015 17:56:51 +0800 Subject: [PATCH 24/31] style fix. --- .../cloudml/zen/ml/classification/LogisticRegression.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index a9138fc1..3cb691c8 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -316,7 +316,7 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser } else { Utils.log1pExp(margin) - margin } - }.reduce(_+_) / numSamples + }.reduce(_ + _) / numSamples } } From 6dd46d4112b521d02c84d1c262515df74008732e Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Wed, 6 May 2015 17:59:03 +0800 Subject: [PATCH 25/31] style fix. --- .../cloudml/zen/ml/classification/LogisticRegression.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index 3cb691c8..bb704162 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -336,4 +336,4 @@ object LogisticRegression { .setRegParam(regParam) .run(numIterations) } -} \ No newline at end of file +} From 9b6a9dfc8bd905c26c37bfa8b725584d0ff60061 Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Thu, 7 May 2015 11:54:42 +0800 Subject: [PATCH 26/31] number of features added. --- .../classification/LogisticRegression.scala | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index bb704162..59a5ffda 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -73,6 +73,16 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser this } private val numSamples = dataSet.count() + + /** + * Set Number of features + * @param numFeatures + * @return + */ + def setNumFeatures(numFeatures: Int): this.type = { + this.numFeatures = numFeatures + this + } /** * Set if the algorithm should add an intercept. Default false. * We set the default to false because adding the intercept will cause memory allocation. @@ -261,15 +271,16 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser } dataSet.zip(misProb).map { case (point, prob) => - val scaledFeatures = Vectors.zeros(numFeatures) - axpy(prob, point.features, scaledFeatures) + val scaledFeatures = point.features + scal(prob, scaledFeatures) (point.label, scaledFeatures) }.aggregateByKey(Vectors.zeros(numFeatures))(func, func).reduce{ (x1, x2) => - val grads: Array[Double] = new Array[Double](numFeatures) val muPlus: Array[Double] = {if (x1._1 > 0) x1._2 else x2._2}.toArray val muMinus: Array[Double] = {if (x1._1 < 0) x1._2 else x2._2}.toArray + assert(muPlus.length == muMinus.length) + val grads: Array[Double] = new Array[Double](muPlus.length) var i = 0 - while (i < numFeatures) { + while (i < muPlus.length) { grads(i) = if (epsilon == 0.0) { math.log(muPlus(i) / muMinus(i)) } else { From bf7aa539c72657843d194aa30b23ae515dc2603e Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Thu, 7 May 2015 15:32:18 +0800 Subject: [PATCH 27/31] intercepts removed. --- .../classification/LogisticRegression.scala | 73 +------------------ 1 file changed, 3 insertions(+), 70 deletions(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index 59a5ffda..8d3a8f08 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -52,8 +52,6 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser * in GeneralizedLinearModel as zero. */ protected var numOfLinearPredictor: Int = 1 - /** Whether to add intercept (default: false). */ - protected var addIntercept: Boolean = false /** * The dimension of training features. */ @@ -83,14 +81,6 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser this.numFeatures = numFeatures this } - /** - * Set if the algorithm should add an intercept. Default false. - * We set the default to false because adding the intercept will cause memory allocation. - */ - def setIntercept(addIntercept: Boolean): this.type = { - this.addIntercept = addIntercept - this - } /** * Set the initial step size of SGD for the first step. Default 1.0. * In subsequent steps, the step size will decrease with stepSize/sqrt(t) @@ -155,8 +145,6 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser val initialWeights = { if (numOfLinearPredictor == 1) { Vectors.dense(new Array[Double](numFeatures)) - } else if (addIntercept) { - Vectors.dense(new Array[Double]((numFeatures + 1) * numOfLinearPredictor)) } else { Vectors.dense(new Array[Double](numFeatures * numOfLinearPredictor)) } @@ -173,58 +161,12 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser + " parent RDDs are also uncached.") } - /* - * Scaling columns to unit variance as a heuristic to reduce the condition number: - * - * During the optimization process, the convergence (rate) depends on the condition number of - * the training dataset. Scaling the variables often reduces this condition number - * heuristically, thus improving the convergence rate. Without reducing the condition number, - * some training datasets mixing the columns with different scales may not be able to converge. - * - * GLMNET and LIBSVM packages perform the scaling to reduce the condition number, and return - * the weights in the original scale. - * See page 9 in http://cran.r-project.org/web/packages/glmnet/glmnet.pdf - * - * Here, if useFeatureScaling is enabled, we will standardize the training features by dividing - * the variance of each column (without subtracting the mean), and train the model in the - * scaled space. Then we transform the coefficients from the scaled space to the original scale - * as GLMNET and LIBSVM do. - * - * Currently, it's only enabled in LogisticRegressionWithLBFGS - */ -// val scaler = if (useFeatureScaling) { -// new StandardScaler(withStd = true, withMean = false).fit(dataSet.map(_.features)) -// } else { -// null -// } -// // Prepend an extra variable consisting of all 1.0's for the intercept. -// // TODO: Apply feature scaling to the weight vector instead of input data. -// val data = -// if (addIntercept) { -// if (useFeatureScaling) { -// dataSet.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache() -// } else { -// dataSet.map(lp => (lp.label, appendBias(lp.features))).cache() -// } -// } else { -// if (useFeatureScaling) { -// dataSet.map(lp => (lp.label, scaler.transform(lp.features))).cache() -// } else { -// dataSet.map(lp => (lp.label, lp.features)) -// } -// } - /** * TODO: For better convergence, in logistic regression, the intercepts should be computed * from the prior probability distribution of the outcomes; for linear regression, * the intercept should be set as the average of response. */ - var initialWeightsWithIntercept = if (addIntercept && numOfLinearPredictor == 1) { - appendBias(initialWeights) - } else { - /** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */ - initialWeights - } + var initialWeightsWithIntercept = initialWeights val lossArr = new Array[Double](iterations) for (iter <- 1 to iterations) { logInfo(s"Start train (Iteration $iter/$iterations)") @@ -237,16 +179,8 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser logInfo(s"train (Iteration $iter/$iterations) loss: $lossSum") logInfo(s"End train (Iteration $iter/$iterations) takes: $elapsedSeconds") } - val intercept = if (addIntercept && numOfLinearPredictor == 1) { - initialWeightsWithIntercept(initialWeightsWithIntercept.size - 1) - } else { - 0.0 - } - val weights = if (addIntercept && numOfLinearPredictor == 1) { - Vectors.dense(initialWeightsWithIntercept.toArray.slice(0, initialWeightsWithIntercept.size - 1)) - } else { - initialWeightsWithIntercept - } + val intercept = 0.0 + val weights = initialWeightsWithIntercept (new LogisticRegressionModel(weights, intercept), lossArr) } /** @@ -341,7 +275,6 @@ object LogisticRegression { storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK): (LogisticRegressionModel, Array[Double]) = { val lr = new LogisticRegressionMIS(input) lr.setEpsilon(epsilon) - .setIntercept(false) .setStepSize(stepSize) .setNumIterations(numIterations) .setRegParam(regParam) From ea67324748ba2d9a2715e980c7dc595eece26fe7 Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Fri, 8 May 2015 09:01:43 +0800 Subject: [PATCH 28/31] reduceByKey. --- .../cloudml/zen/ml/classification/LogisticRegression.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index 8d3a8f08..bb4a10df 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -208,7 +208,7 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser val scaledFeatures = point.features scal(prob, scaledFeatures) (point.label, scaledFeatures) - }.aggregateByKey(Vectors.zeros(numFeatures))(func, func).reduce{ (x1, x2) => + }.reduceByKey(func).reduce{ (x1, x2) => val muPlus: Array[Double] = {if (x1._1 > 0) x1._2 else x2._2}.toArray val muMinus: Array[Double] = {if (x1._1 < 0) x1._2 else x2._2}.toArray assert(muPlus.length == muMinus.length) From edc11dc506b384466d31f39a8a61659cc24dc3ba Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Wed, 13 May 2015 13:34:18 +0800 Subject: [PATCH 29/31] Tree Aggregate. --- .../classification/LogisticRegression.scala | 72 +++++++++---------- 1 file changed, 32 insertions(+), 40 deletions(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index bb4a10df..295fc1a6 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -23,12 +23,12 @@ import org.apache.spark.mllib.classification.LogisticRegressionModel import org.apache.spark.{Logging} import org.apache.spark.mllib.linalg.{Vectors, Vector} import org.apache.spark.mllib.regression.LabeledPoint -import org.apache.spark.mllib.util.MLUtils._ import org.apache.spark.rdd.RDD import com.github.cloudml.zen.ml.linalg.BLAS.dot import com.github.cloudml.zen.ml.linalg.BLAS.axpy import com.github.cloudml.zen.ml.linalg.BLAS.scal import org.apache.spark.storage.StorageLevel +import breeze.linalg.{DenseVector => BDV} class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Serializable{ private var epsilon: Double = 1e-4 @@ -171,7 +171,7 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser for (iter <- 1 to iterations) { logInfo(s"Start train (Iteration $iter/$iterations)") val startedAt = System.nanoTime() - val delta = backward(iter, forward(initialWeightsWithIntercept), numFeatures) + val delta = backward(iter, initialWeightsWithIntercept, numFeatures) initialWeightsWithIntercept = updateWeights(initialWeightsWithIntercept, delta, iter) val lossSum = loss(initialWeightsWithIntercept) lossArr(iter-1) = lossSum @@ -183,50 +183,42 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser val weights = initialWeightsWithIntercept (new LogisticRegressionModel(weights, intercept), lossArr) } - /** - * Calculate the mistake probability: q(i) = 1/(1+exp(yi*(w*xi))). - * @param initialWeights weights of last iteration. - */ - protected[ml] def forward(initialWeights: Vector): RDD[Double] = { - dataSet.map{point => - val z = point.label * dot(initialWeights, point.features) - 1.0 / (1.0 + exp(z)) - } - } /** * Calculate the change in weights. delta_W_j = stepSize * log(mu_j_+/mu_j_-) - * @param misProb q(i) = 1/(1+exp(yi*(w*xi))). */ - protected[ml] def backward(iter: Int, misProb: RDD[Double], numFeatures: Int): Vector = { - def func(v1: Vector, v2: Vector) = { - axpy(1.0, v1, v2) - v2 + protected[ml] def backward(iter: Int, initialWeights: Vector, numFeatures: Int): Vector = { + + def func(c1: (Vector, Vector), c2: (Vector, Vector)): (Vector, Vector) = { + axpy(1.0, c1._1, c2._1) + axpy(1.0, c1._2, c2._2) + (c2._1, c2._2) } - dataSet.zip(misProb).map { - case (point, prob) => - val scaledFeatures = point.features - scal(prob, scaledFeatures) - (point.label, scaledFeatures) - }.reduceByKey(func).reduce{ (x1, x2) => - val muPlus: Array[Double] = {if (x1._1 > 0) x1._2 else x2._2}.toArray - val muMinus: Array[Double] = {if (x1._1 < 0) x1._2 else x2._2}.toArray - assert(muPlus.length == muMinus.length) - val grads: Array[Double] = new Array[Double](muPlus.length) - var i = 0 - while (i < muPlus.length) { - grads(i) = if (epsilon == 0.0) { - math.log(muPlus(i) / muMinus(i)) - } else { - math.log(epsilon + muPlus(i) / (epsilon + muMinus(i))) - } - i += 1 + val (muPlus, muMinus) = dataSet.map { point => + val z = point.label * dot(initialWeights, point.features) + val prob = 1.0 / (1.0 + exp(z)) + scal(prob, point.features) + if (point.label > 0.0){ + (point.features, Vectors.zeros(numFeatures)) + } else { + (Vectors.zeros(numFeatures), point.features) } - val thisIterStepSize = stepSize / math.sqrt(iter) - val gradVec = Vectors.dense(grads) - scal(thisIterStepSize, gradVec) - (0.0, gradVec) - }._2 + }.treeAggregate((Vectors.zeros(numFeatures), Vectors.zeros(numFeatures)))(seqOp = func, combOp = func) + assert(muMinus.size == muPlus.size) + val grads: Array[Double] = new Array[Double](muPlus.size) + var i = 0 + while (i < muPlus.size) { + grads(i) = if (epsilon == 0.0) { + math.log(muPlus(i) / muMinus(i)) + } else { + math.log(epsilon + muPlus(i) / (epsilon + muMinus(i))) + } + i += 1 + } + val thisIterStepSize = stepSize / math.sqrt(iter) + val gradVec = Vectors.dense(grads) + scal(thisIterStepSize, gradVec) + gradVec } /** From 825d5b2b5ed54488dab053b91b682da19688841e Mon Sep 17 00:00:00 2001 From: Peishen Jia Date: Wed, 13 May 2015 14:28:37 +0800 Subject: [PATCH 30/31] Tree Aggregate. --- .../zen/ml/classification/LogisticRegression.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index 295fc1a6..2139d653 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -28,7 +28,6 @@ import com.github.cloudml.zen.ml.linalg.BLAS.dot import com.github.cloudml.zen.ml.linalg.BLAS.axpy import com.github.cloudml.zen.ml.linalg.BLAS.scal import org.apache.spark.storage.StorageLevel -import breeze.linalg.{DenseVector => BDV} class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Serializable{ private var epsilon: Double = 1e-4 @@ -197,11 +196,12 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser val (muPlus, muMinus) = dataSet.map { point => val z = point.label * dot(initialWeights, point.features) val prob = 1.0 / (1.0 + exp(z)) - scal(prob, point.features) + val scaledFeatures = Vectors.dense(point.features.toArray) + scal(prob, scaledFeatures) if (point.label > 0.0){ - (point.features, Vectors.zeros(numFeatures)) + (scaledFeatures, Vectors.zeros(numFeatures)) } else { - (Vectors.zeros(numFeatures), point.features) + (Vectors.zeros(numFeatures), scaledFeatures) } }.treeAggregate((Vectors.zeros(numFeatures), Vectors.zeros(numFeatures)))(seqOp = func, combOp = func) assert(muMinus.size == muPlus.size) From 28fd74f451388fae0a8a2926701cee6ad05ae443 Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 16 Jul 2015 08:54:03 +0800 Subject: [PATCH 31/31] LR --- .../cloudml/zen/ml/classification/LogisticRegression.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala index 2139d653..ec6a0b02 100644 --- a/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala +++ b/ml/src/main/scala/com/github/cloudml/zen/ml/classification/LogisticRegression.scala @@ -208,11 +208,7 @@ class LogisticRegressionMIS(dataSet: RDD[LabeledPoint]) extends Logging with Ser val grads: Array[Double] = new Array[Double](muPlus.size) var i = 0 while (i < muPlus.size) { - grads(i) = if (epsilon == 0.0) { - math.log(muPlus(i) / muMinus(i)) - } else { - math.log(epsilon + muPlus(i) / (epsilon + muMinus(i))) - } + grads(i) = math.log(epsilon + muPlus(i) / (epsilon + muMinus(i))) i += 1 } val thisIterStepSize = stepSize / math.sqrt(iter)