1 change: 1 addition & 0 deletions pyproject.toml
@@ -12,6 +12,7 @@ dependencies = [
"pydantic>=1.10.8",
"rich>=13.3.5",
"scikit-learn>=1.2.2",
"scipy>=1.15.3",
"seaborn>=0.12.2",
]

183 changes: 125 additions & 58 deletions src/random_tree_models/models/gradientboostedtrees.py
@@ -4,6 +4,7 @@
import numpy as np
import sklearn.base as base
from rich.progress import track
from scipy.optimize import minimize_scalar
from sklearn.utils.multiclass import (
check_classification_targets,
type_of_target,
@@ -17,7 +18,7 @@
DecisionTreeRegressor,
)
from random_tree_models.params import MetricNames, is_greater_zero
from random_tree_models.utils import bool_to_float
from random_tree_models.utils import vectorize_bool_to_float


class GradientBoostedTreesTemplate(base.BaseEstimator):
@@ -53,32 +54,85 @@ def predict(self, X: np.ndarray) -> np.ndarray:
raise NotImplementedError()


def get_pseudo_residual_mse(y: np.ndarray, current_estimates: np.ndarray) -> np.ndarray:
"""
mse loss = sum_i (y_i - estimate_i)^2
pseudo residual_i = d mse loss(y,estimate) / d estimate_i = - (y_i - estimate_i)
since we want to apply it as the negative gradient for steepest descent we flip the sign
"""
return y - current_estimates


def get_pseudo_residual_log_odds(
y: np.ndarray, current_estimates: np.ndarray
) -> np.ndarray:
"""
# dloss/dyhat, g in the xgboost paper
"""
return 2 * y / (1 + np.exp(2 * y * current_estimates))
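
# Editor's note, not part of the diff: a minimal numerical sketch checking the formula
# above against a finite-difference negative gradient of the binomial deviance
# log(1 + exp(-2 * y * yhat)) with y in {-1, 1}, which is the loss it assumes.
#
# import numpy as np
#
# def deviance(y, f):
#     return np.log(1.0 + np.exp(-2.0 * y * f))
#
# y = np.array([1.0, -1.0, 1.0])
# f = np.array([0.3, -0.2, 1.5])
# eps = 1e-6
#
# # negative gradient via central finite differences
# num_grad = -(deviance(y, f + eps) - deviance(y, f - eps)) / (2 * eps)
# analytic = 2 * y / (1 + np.exp(2 * y * f))
# assert np.allclose(num_grad, analytic, atol=1e-6)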


def get_start_estimate_mse(y: np.ndarray) -> float:
return float(np.mean(y))


def get_start_estimate_log_odds(y: np.ndarray) -> float:
"""
1/2 log(1+ym)/(1-ym) because ym is in [-1, 1]
equivalent to log(ym)/(1-ym) if ym were in [0, 1]
"""
ym = np.mean(y)
if ym == 1:
return math.inf
elif ym == -1:
return -math.inf
start_estimate = 0.5 * math.log((1 + ym) / (1 - ym))
return start_estimate
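
# Editor's note, not part of the diff: a small sketch connecting the formula to the
# class frequency. With y in {-1, 1}, p = (1 + ym) / 2 is the positive-class frequency,
# and the sigmoid used at predict time recovers exactly p at the start estimate.
#
# import numpy as np
#
# y = np.array([1.0, 1.0, 1.0, -1.0])   # labels already mapped to {-1, 1}
# p = np.mean(y == 1)                    # positive-class frequency, 0.75
# ym = np.mean(y)                        # 0.5
#
# start = 0.5 * np.log((1 + ym) / (1 - ym))
# assert np.isclose(start, 0.5 * np.log(p / (1 - p)))       # half the prior log-odds
# assert np.isclose(1 / (1 + np.exp(-2 * start)), p)        # sigmoid recovers p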


def find_step_size(
y: np.ndarray, current_estimates: np.ndarray, h: np.ndarray
) -> float:
"""
Finds the optimal step size gamma for the update rule:
new_estimate = current_estimates + gamma * h

This is done by minimizing the MSE loss function with respect to gamma.
loss(gamma) = sum((y - (current_estimates + gamma * h))^2)
"""

def loss(gamma: float) -> float:
return float(np.sum((y - (current_estimates + gamma * h)) ** 2))

res = minimize_scalar(loss)
if res.success:
return float(res.x)
else:
        # fall back to a unit step size if the line search does not converge
return 1.0
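
# Editor's note, not part of the diff: because the loss is quadratic in gamma, the
# minimizer also has a closed form, gamma* = sum(h * (y - current_estimates)) / sum(h^2)
# (assuming sum(h^2) > 0). A quick sketch checking the numerical line search against it:
#
# import numpy as np
# from scipy.optimize import minimize_scalar
#
# y = np.array([1.0, 2.0, 3.0, 4.0])
# current_estimates = np.array([0.5, 1.5, 2.0, 3.5])
# h = np.array([0.4, 0.6, 0.9, 0.3])
#
# gamma_closed = np.sum(h * (y - current_estimates)) / np.sum(h**2)
# res = minimize_scalar(lambda g: np.sum((y - (current_estimates + g * h)) ** 2))
# assert np.isclose(res.x, gamma_closed)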


def get_probabilities_from_mapped_bools(h: np.ndarray) -> np.ndarray:
proba = 1 / (1 + np.exp(-2.0 * h))
proba = np.array([1 - proba, proba]).T
return proba
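
# Editor's note, not part of the diff: a short usage sketch. The returned array has one
# row per sample, column 0 holding P(class mapped to -1) and column 1 P(class mapped to +1),
# and each row sums to 1. The import alias below is an assumption.
#
# import numpy as np
# from random_tree_models.models import gradientboostedtrees as gbt  # assumed import path
#
# h = np.array([-2.0, 0.0, 2.0])        # boosted margins
# proba = gbt.get_probabilities_from_mapped_bools(h)
#
# assert proba.shape == (3, 2)
# assert np.allclose(proba.sum(axis=1), 1.0)
# assert proba[2, 1] > 0.9              # large positive margin -> confident +1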


class GradientBoostedTreesRegressor(
base.RegressorMixin,
GradientBoostedTreesTemplate,
):
"""OG Gradient boosted trees regressor

Friedman 2001, Greedy Function Approximation: A Gradient Boosting Machine
https://www.jstor.org/stable/2699986

Algorithm 2 (LS_Boost)

y = our continuous target
M = number of boosts

start_estimate = mean(y)
for m = 1 to M do:
dy = y - prev_estimate
new_rho, new_estimator = arg min(rho, estimator) mse(dy, rho*estimator(x))
new_estimate = prev_estimate + new_rho * new_estimator(x)
prev_estimate = new_estimate
https://www.jstor.org/stable/2699986 -> Algorithm 2 (LS_Boost)
https://en.wikipedia.org/wiki/Gradient_boosting#Algorithm
https://maelfabien.github.io/machinelearning/GradientBoost/#implement-a-high-level-gradient-boosting-in-python
"""

def __init__(
self,
measure_name: MetricNames = MetricNames.variance,
factor: float = 1.0,
n_trees: int = 3,
max_depth: int = 2,
min_improvement: float = 0.0,
@@ -91,47 +145,58 @@ def __init__(
min_improvement=min_improvement,
ensure_all_finite=ensure_all_finite,
)
self.factor = factor

def fit(self, X: np.ndarray, y: np.ndarray) -> "GradientBoostedTreesRegressor":
X, y = validate_data(self, X, y, ensure_all_finite=False)
X, y = validate_data(self, X, y, ensure_all_finite=self.ensure_all_finite)

self.trees_: list[DecisionTreeRegressor] = []

self.start_estimate_ = np.mean(y)

# initial differences to predict
_y = y - self.start_estimate_
self.start_estimate_ = get_start_estimate_mse(y)
current_estimates = self.start_estimate_ * np.ones_like(y)
self.step_sizes_: list[float] = []

for _ in track(range(self.n_trees), total=self.n_trees, description="tree"):
r = get_pseudo_residual_mse(y, current_estimates)

# train decision tree to predict differences
new_tree = DecisionTreeRegressor(
measure_name=self.measure_name,
max_depth=self.max_depth,
min_improvement=self.min_improvement,
)
new_tree.fit(X, _y)
new_tree.fit(X, r)
self.trees_.append(new_tree)

h = new_tree.predict(X) # estimate of pseudo residual

# find one optimal step size to rule them all
gamma = find_step_size(y, current_estimates, h)
self.step_sizes_.append(gamma)

# update differences to predict
dy = self.factor * new_tree.predict(X)
_y = _y - dy
current_estimates = current_estimates + gamma * h

return self

def predict(self, X: np.ndarray) -> np.ndarray:
check_is_fitted(self, ("trees_", "n_features_in_", "start_estimate_"))
check_is_fitted(
self, ("trees_", "n_features_in_", "start_estimate_", "step_sizes_")
)

X = validate_data(self, X, reset=False, ensure_all_finite=False)
X = validate_data(
self, X, reset=False, ensure_all_finite=self.ensure_all_finite
)

# baseline estimate
y = np.ones(X.shape[0]) * self.start_estimate_

# improve on baseline
for tree in track(
self.trees_, description="tree", total=len(self.trees_)
for tree, step_size in track(
zip(self.trees_, self.step_sizes_),
description="tree",
total=len(self.trees_),
): # loop boosts
dy = self.factor * tree.predict(X)
dy = step_size * tree.predict(X)
y += dy

return y
@@ -209,10 +274,6 @@ def __init__(
ensure_all_finite=ensure_all_finite,
)

def _bool_to_float(self, y: np.ndarray) -> np.ndarray:
f = np.vectorize(bool_to_float)
return f(y)

def _more_tags(self) -> T.Dict[str, bool]:
"""Describes to scikit-learn parametrize_with_checks the scope of this class

@@ -227,7 +288,7 @@ def __sklearn_tags__(self):
return tags

def fit(self, X: np.ndarray, y: np.ndarray) -> "GradientBoostedTreesClassifier":
X, y = validate_data(self, X, y, ensure_all_finite=False)
X, y = validate_data(self, X, y, ensure_all_finite=self.ensure_all_finite)

check_classification_targets(y)

@@ -243,52 +304,58 @@ def fit(self, X: np.ndarray, y: np.ndarray) -> "GradientBoostedTreesClassifier":

self.classes_, y = np.unique(y, return_inverse=True)
self.trees_: list[DecisionTreeRegressor] = []
self.gammas_ = []

y = self._bool_to_float(y)
ym = np.mean(y)
self.start_estimate_ = 0.5 * math.log((1 + ym) / (1 - ym))

yhat = np.ones_like(y) * self.start_estimate_
y = vectorize_bool_to_float(y) # True -> 1, False -> -1
self.start_estimate_ = get_start_estimate_log_odds(y)
current_estimates = self.start_estimate_ * np.ones_like(y)
self.step_sizes_: list[float] = []

for _ in track(range(self.n_trees), description="tree", total=self.n_trees):
g = (
2 * y / (1 + np.exp(2 * y * yhat))
) # dloss/dyhat, g in the xgboost paper
r = get_pseudo_residual_log_odds(y, current_estimates)

new_tree = DecisionTreeRegressor(
measure_name=self.measure_name,
max_depth=self.max_depth,
min_improvement=self.min_improvement,
)
new_tree.fit(X, y, g=g)
new_tree.fit(X, y, g=r)
self.trees_.append(new_tree)

# update _y
g_pred = new_tree.predict(X)
yhat += g_pred
h = new_tree.predict(X) # estimate of pseudo residual
# find one optimal step size to rule them all
gamma = find_step_size(y, current_estimates, h)
self.step_sizes_.append(gamma)

# update differences to predict
current_estimates = current_estimates + gamma * h

return self

def predict_proba(self, X: np.ndarray) -> np.ndarray:
check_is_fitted(self, ("trees_", "classes_", "gammas_", "n_features_in_"))
X = validate_data(self, X, reset=False, ensure_all_finite=False)
check_is_fitted(
self,
("trees_", "classes_", "n_features_in_", "start_estimate_", "step_sizes_"),
)
X = validate_data(
self, X, reset=False, ensure_all_finite=self.ensure_all_finite
)

g = np.ones(X.shape[0]) * self.start_estimate_
h = np.ones(X.shape[0]) * self.start_estimate_

for _, tree in track(
enumerate(self.trees_), description="tree", total=len(self.trees_)
for tree, step_size in track(
zip(self.trees_, self.step_sizes_),
description="tree",
total=len(self.trees_),
): # loop boosts
g += tree.predict(X)
h += step_size * tree.predict(X)

proba = 1 / (1 + np.exp(-2.0 * g))
proba = np.array([1 - proba, proba]).T
return proba
p = get_probabilities_from_mapped_bools(h)
return p

def predict(self, X: np.ndarray) -> np.ndarray:
proba = self.predict_proba(X)
p = self.predict_proba(X)

ix = np.argmax(proba, axis=1)
ix = np.argmax(p, axis=1)
y = self.classes_[ix]

return y
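
# Editor's note, not part of the diff: a brief usage sketch of the classifier. It follows
# the standard scikit-learn fit / predict / predict_proba interface; the default
# hyperparameters and import path are assumptions based on the regressor above.
#
# import numpy as np
# from random_tree_models.models.gradientboostedtrees import GradientBoostedTreesClassifier
#
# rng = np.random.default_rng(0)
# X = rng.normal(size=(200, 2))
# y = (X[:, 0] + X[:, 1] > 0).astype(int)   # two classes with a diagonal boundary
#
# clf = GradientBoostedTreesClassifier()    # default hyperparameters assumed
# clf.fit(X, y)
#
# proba = clf.predict_proba(X)              # shape (200, 2), columns follow clf.classes_
# pred = clf.predict(X)
# print((pred == y).mean())                 # training accuracy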
84 changes: 84 additions & 0 deletions tests/models/test_gradientboostedtrees.py
@@ -1,3 +1,5 @@
import math

import numpy as np
import pytest
from sklearn.utils.estimator_checks import parametrize_with_checks
@@ -86,3 +88,85 @@ def test_gbt_estimators_with_sklearn_checks(estimator, check):
Reference: https://scikit-learn.org/stable/modules/generated/sklearn.utils.estimator_checks.parametrize_with_checks.html#sklearn.utils.estimator_checks.parametrize_with_checks
"""
check(estimator)


def test_get_pseudo_residual_mse():
y = np.array([1.0, 2.0, 3.0])
current_estimates = np.array([0.5, 1.0, 2.0])
expected_residuals = np.array([0.5, 1.0, 1.0])
actual_residuals = gbt.get_pseudo_residual_mse(y, current_estimates)
assert np.allclose(actual_residuals, expected_residuals)

# Test with negative values
y = np.array([-1.0, -2.0, -3.0])
current_estimates = np.array([-0.5, -1.0, -2.0])
expected_residuals = np.array([-0.5, -1.0, -1.0])
actual_residuals = gbt.get_pseudo_residual_mse(y, current_estimates)
assert np.allclose(actual_residuals, expected_residuals)

# Test with zero values
y = np.array([0.0, 0.0, 0.0])
current_estimates = np.array([0.0, 0.0, 0.0])
expected_residuals = np.array([0.0, 0.0, 0.0])
actual_residuals = gbt.get_pseudo_residual_mse(y, current_estimates)
assert np.allclose(actual_residuals, expected_residuals)


def test_get_pseudo_residual_log_odds():
# Test case 1: Basic test with positive and negative values
y = np.array([1, -1, 1, -1])
current_estimates = np.array([0.1, 0.2, -0.1, -0.2])
expected_residuals = 2 * y / (1 + np.exp(2 * y * current_estimates))
actual_residuals = gbt.get_pseudo_residual_log_odds(y, current_estimates)
assert np.allclose(actual_residuals, expected_residuals)

# Test case 2: y close to zero
y = np.array([0.001, -0.001])
current_estimates = np.array([0.5, 0.5])
expected_residuals = 2 * y / (1 + np.exp(2 * y * current_estimates))
actual_residuals = gbt.get_pseudo_residual_log_odds(y, current_estimates)
assert np.allclose(actual_residuals, expected_residuals)

# Test case 3: current_estimates close to zero
y = np.array([1, -1])
current_estimates = np.array([0.001, -0.001])
expected_residuals = 2 * y / (1 + np.exp(2 * y * current_estimates))
actual_residuals = gbt.get_pseudo_residual_log_odds(y, current_estimates)
assert np.allclose(actual_residuals, expected_residuals)

# Test case 4: Larger current_estimates
y = np.array([1, -1])
current_estimates = np.array([2, -2])
expected_residuals = 2 * y / (1 + np.exp(2 * y * current_estimates))
actual_residuals = gbt.get_pseudo_residual_log_odds(y, current_estimates)
assert np.allclose(actual_residuals, expected_residuals)


def test_get_start_estimate_log_odds():
# Test case 1: Balanced classes (mean close to 0)
y = np.array([1, -1, 1, -1])
actual_start_estimate = gbt.get_start_estimate_log_odds(y)
assert np.isclose(actual_start_estimate, 0.0)

# Test case 2: All positive class
y = np.array([1, 1, 1, 1])
actual_start_estimate = gbt.get_start_estimate_log_odds(y)
    assert actual_start_estimate == math.inf

# Test case 3: All negative class
y = np.array([-1, -1, -1, -1])
actual_start_estimate = gbt.get_start_estimate_log_odds(y)
    assert actual_start_estimate == -math.inf

# Test case 4: Unbalanced classes
y = np.array([1, 1, 1, -1])
ym = np.mean(y)
actual_start_estimate = gbt.get_start_estimate_log_odds(y)
v = 0.5493061443340549
assert np.isclose(actual_start_estimate, v)

# Test case 5: Another set of unbalanced classes
y = np.array([1, -1, -1, -1])
ym = np.mean(y)
actual_start_estimate = gbt.get_start_estimate_log_odds(y)
assert np.isclose(actual_start_estimate, -v)