diff --git a/pytensor/tensor/optimize.py b/pytensor/tensor/optimize.py index 8da5f40aeb..7653d01b54 100644 --- a/pytensor/tensor/optimize.py +++ b/pytensor/tensor/optimize.py @@ -1,27 +1,29 @@ import logging from collections.abc import Sequence from copy import copy -from typing import cast import numpy as np import pytensor.scalar as ps from pytensor.compile.function import function -from pytensor.gradient import grad, jacobian +from pytensor.gradient import grad, grad_not_implemented, jacobian from pytensor.graph.basic import Apply, Constant from pytensor.graph.fg import FunctionGraph from pytensor.graph.op import ComputeMapType, HasInnerGraph, Op, StorageMapType from pytensor.graph.replace import graph_replace from pytensor.graph.traversal import ancestors, truncated_graph_inputs +from pytensor.scalar import ScalarType, ScalarVariable from pytensor.tensor.basic import ( atleast_2d, concatenate, + scalar_from_tensor, tensor, tensor_from_scalar, zeros_like, ) from pytensor.tensor.math import dot from pytensor.tensor.slinalg import solve +from pytensor.tensor.type import DenseTensorType from pytensor.tensor.variable import TensorVariable, Variable @@ -126,7 +128,9 @@ def clear_cache(self): self.hess_calls = 0 -def _find_optimization_parameters(objective: TensorVariable, x: TensorVariable): +def _find_optimization_parameters( + objective: TensorVariable, x: TensorVariable +) -> list[Variable]: """ Find the parameters of the optimization problem that are not the variable `x`. @@ -140,30 +144,29 @@ def _find_optimization_parameters(objective: TensorVariable, x: TensorVariable): def _get_parameter_grads_from_vector( - grad_wrt_args_vector: Variable, - x_star: Variable, - args: Sequence[Variable], - output_grad: Variable, -): + grad_wrt_args_vector: TensorVariable, + x_star: TensorVariable, + args: Sequence[TensorVariable | ScalarVariable], + output_grad: TensorVariable, +) -> list[TensorVariable | ScalarVariable]: """ Given a single concatenated vector of objective function gradients with respect to raveled optimization parameters, returns the contribution of each parameter to the total loss function, with the unraveled shape of the parameter. """ - grad_wrt_args_vector = cast(TensorVariable, grad_wrt_args_vector) - x_star = cast(TensorVariable, x_star) - cursor = 0 grad_wrt_args = [] for arg in args: - arg = cast(TensorVariable, arg) arg_shape = arg.shape arg_size = arg_shape.prod() arg_grad = grad_wrt_args_vector[:, cursor : cursor + arg_size].reshape( (*x_star.shape, *arg_shape) ) - grad_wrt_args.append(dot(output_grad, arg_grad)) + grad_wrt_arg = dot(output_grad, arg_grad) + if isinstance(arg.type, ScalarType): + grad_wrt_arg = scalar_from_tensor(grad_wrt_arg) + grad_wrt_args.append(grad_wrt_arg) cursor += arg_size @@ -268,17 +271,16 @@ def build_fn(self): def scalar_implict_optimization_grads( - inner_fx: Variable, - inner_x: Variable, - inner_args: Sequence[Variable], - args: Sequence[Variable], - x_star: Variable, - output_grad: Variable, + inner_fx: TensorVariable, + inner_x: TensorVariable, + inner_args: Sequence[TensorVariable | ScalarVariable], + args: Sequence[TensorVariable | ScalarVariable], + x_star: TensorVariable, + output_grad: TensorVariable, fgraph: FunctionGraph, -) -> list[Variable]: - df_dx, *df_dthetas = cast( - list[Variable], - grad(inner_fx, [inner_x, *inner_args], disconnected_inputs="ignore"), +) -> list[TensorVariable | ScalarVariable]: + df_dx, *df_dthetas = grad( + inner_fx, [inner_x, *inner_args], disconnected_inputs="ignore" ) replace = dict(zip(fgraph.inputs, (x_star, *args), strict=True)) @@ -286,20 +288,20 @@ def scalar_implict_optimization_grads( grad_wrt_args = [ (-df_dtheta_star / df_dx_star) * output_grad - for df_dtheta_star in cast(list[TensorVariable], df_dthetas_stars) + for df_dtheta_star in df_dthetas_stars ] return grad_wrt_args def implict_optimization_grads( - df_dx: Variable, - df_dtheta_columns: Sequence[Variable], - args: Sequence[Variable], - x_star: Variable, - output_grad: Variable, + df_dx: TensorVariable, + df_dtheta_columns: Sequence[TensorVariable], + args: Sequence[TensorVariable | ScalarVariable], + x_star: TensorVariable, + output_grad: TensorVariable, fgraph: FunctionGraph, -): +) -> list[TensorVariable | ScalarVariable]: r""" Compute gradients of an optimization problem with respect to its parameters. @@ -341,21 +343,15 @@ def implict_optimization_grads( fgraph : FunctionGraph The function graph that contains the inputs and outputs of the optimization problem. """ - df_dx = cast(TensorVariable, df_dx) - df_dtheta = concatenate( - [ - atleast_2d(jac_col, left=False) - for jac_col in cast(list[TensorVariable], df_dtheta_columns) - ], + [atleast_2d(jac_col, left=False) for jac_col in df_dtheta_columns], axis=-1, ) replace = dict(zip(fgraph.inputs, (x_star, *args), strict=True)) - df_dx_star, df_dtheta_star = cast( - list[TensorVariable], - graph_replace([atleast_2d(df_dx), df_dtheta], replace=replace), + df_dx_star, df_dtheta_star = graph_replace( + [atleast_2d(df_dx), df_dtheta], replace=replace ) grad_wrt_args_vector = solve(-df_dx_star, df_dtheta_star) @@ -369,20 +365,24 @@ def implict_optimization_grads( class MinimizeScalarOp(ScipyScalarWrapperOp): def __init__( self, - x: Variable, + x: TensorVariable, *args: Variable, - objective: Variable, - method: str = "brent", + objective: TensorVariable, + method: str, optimizer_kwargs: dict | None = None, ): - if not cast(TensorVariable, x).ndim == 0: + if not (isinstance(x, TensorVariable) and x.ndim == 0): raise ValueError( "The variable `x` must be a scalar (0-dimensional) tensor for minimize_scalar." ) - if not cast(TensorVariable, objective).ndim == 0: + if not (isinstance(objective, TensorVariable) and objective.ndim == 0): raise ValueError( "The objective function must be a scalar (0-dimensional) tensor for minimize_scalar." ) + if x not in ancestors([objective]): + raise ValueError( + "The variable `x` must be an input to the computational graph of the objective function." + ) self.fgraph = FunctionGraph([x, *args], [objective]) self.method = method @@ -416,7 +416,19 @@ def perform(self, node, inputs, outputs): outputs[1][0] = np.bool_(res.success) def L_op(self, inputs, outputs, output_grads): + # TODO: Handle disconnected inputs x, *args = inputs + if non_supported_types := tuple( + inp.type + for inp in inputs + if not isinstance(inp.type, DenseTensorType | ScalarType) + ): + # TODO: Support SparseTensorTypes + # TODO: Remaining types are likely just disconnected anyway + msg = f"Minimize gradient not implemented due to inputs of type {non_supported_types}" + return [ + grad_not_implemented(self, i, inp, msg) for i, inp in enumerate(inputs) + ] x_star, _ = outputs output_grad, _ = output_grads @@ -468,7 +480,6 @@ def minimize_scalar( Symbolic boolean flag indicating whether the minimization routine reported convergence to a minimum value, based on the requested convergence criteria. """ - args = _find_optimization_parameters(objective, x) minimize_scalar_op = MinimizeScalarOp( @@ -479,9 +490,7 @@ def minimize_scalar( optimizer_kwargs=optimizer_kwargs, ) - solution, success = cast( - tuple[TensorVariable, TensorVariable], minimize_scalar_op(x, *args) - ) + solution, success = minimize_scalar_op(x, *args) return solution, success @@ -489,17 +498,21 @@ def minimize_scalar( class MinimizeOp(ScipyVectorWrapperOp): def __init__( self, - x: Variable, + x: TensorVariable, *args: Variable, - objective: Variable, - method: str = "BFGS", + objective: TensorVariable, + method: str, jac: bool = True, hess: bool = False, hessp: bool = False, use_vectorized_jac: bool = False, optimizer_kwargs: dict | None = None, ): - if not cast(TensorVariable, objective).ndim == 0: + if not (isinstance(x, TensorVariable) and x.ndim in (0, 1)): + raise ValueError( + "The variable `x` must be a scalar or vector (0-or-1-dimensional) tensor for minimize." + ) + if not (isinstance(objective, TensorVariable) and objective.ndim == 0): raise ValueError( "The objective function must be a scalar (0-dimensional) tensor for minimize." ) @@ -512,19 +525,14 @@ def __init__( self.use_vectorized_jac = use_vectorized_jac if jac: - grad_wrt_x = cast( - Variable, grad(self.fgraph.outputs[0], self.fgraph.inputs[0]) - ) + grad_wrt_x = grad(self.fgraph.outputs[0], self.fgraph.inputs[0]) self.fgraph.add_output(grad_wrt_x) if hess: - hess_wrt_x = cast( - Variable, - jacobian( - self.fgraph.outputs[-1], - self.fgraph.inputs[0], - vectorize=use_vectorized_jac, - ), + hess_wrt_x = jacobian( + self.fgraph.outputs[-1], + self.fgraph.inputs[0], + vectorize=use_vectorized_jac, ) self.fgraph.add_output(hess_wrt_x) @@ -570,7 +578,19 @@ def perform(self, node, inputs, outputs): outputs[1][0] = np.bool_(res.success) def L_op(self, inputs, outputs, output_grads): + # TODO: Handle disconnected inputs x, *args = inputs + if non_supported_types := tuple( + inp.type + for inp in inputs + if not isinstance(inp.type, DenseTensorType | ScalarType) + ): + # TODO: Support SparseTensorTypes + # TODO: Remaining types are likely just disconnected anyway + msg = f"MinimizeOp gradient not implemented due to inputs of type {non_supported_types}" + return [ + grad_not_implemented(self, i, inp, msg) for i, inp in enumerate(inputs) + ] x_star, _success = outputs output_grad, _ = output_grads @@ -654,9 +674,7 @@ def minimize( optimizer_kwargs=optimizer_kwargs, ) - solution, success = cast( - tuple[TensorVariable, TensorVariable], minimize_op(x, *args) - ) + solution, success = minimize_op(x, *args) return solution, success @@ -664,21 +682,23 @@ def minimize( class RootScalarOp(ScipyScalarWrapperOp): def __init__( self, - variables, - *args, - equation, - method, + variables: TensorVariable, + *args: Variable, + equation: TensorVariable, + method: str, jac: bool = False, hess: bool = False, optimizer_kwargs=None, ): - if not equation.ndim == 0: + if not (isinstance(variables, TensorVariable) and variables.ndim == 0): + raise ValueError( + "The variable `x` must be a scalar (0-dimensional) tensor for root_scalar." + ) + if not (isinstance(equation, TensorVariable) and equation.ndim == 0): raise ValueError( "The equation must be a scalar (0-dimensional) tensor for root_scalar." ) - if not isinstance(variables, Variable) or variables not in ancestors( - [equation] - ): + if variables not in ancestors([equation]): raise ValueError( "The variable `variables` must be an input to the computational graph of the equation." ) @@ -686,9 +706,7 @@ def __init__( self.fgraph = FunctionGraph([variables, *args], [equation]) if jac: - f_prime = cast( - Variable, grad(self.fgraph.outputs[0], self.fgraph.inputs[0]) - ) + f_prime = grad(self.fgraph.outputs[0], self.fgraph.inputs[0]) self.fgraph.add_output(f_prime) if hess: @@ -697,9 +715,7 @@ def __init__( "Cannot set `hess=True` without `jac=True`. No methods use second derivatives without also" " using first derivatives." ) - f_double_prime = cast( - Variable, grad(self.fgraph.outputs[-1], self.fgraph.inputs[0]) - ) + f_double_prime = grad(self.fgraph.outputs[-1], self.fgraph.inputs[0]) self.fgraph.add_output(f_double_prime) self.method = method @@ -741,7 +757,19 @@ def perform(self, node, inputs, outputs): outputs[1][0] = np.bool_(res.converged) def L_op(self, inputs, outputs, output_grads): + # TODO: Handle disconnected inputs x, *args = inputs + if non_supported_types := tuple( + inp.type + for inp in inputs + if not isinstance(inp.type, DenseTensorType | ScalarType) + ): + # TODO: Support SparseTensorTypes + # TODO: Remaining types are likely just disconnected anyway + msg = f"RootScalarOp gradient not implemented due to inputs of type {non_supported_types}" + return [ + grad_not_implemented(self, i, inp, msg) for i, inp in enumerate(inputs) + ] x_star, _ = outputs output_grad, _ = output_grads @@ -813,9 +841,7 @@ def root_scalar( optimizer_kwargs=optimizer_kwargs, ) - solution, success = cast( - tuple[TensorVariable, TensorVariable], root_scalar_op(variable, *args) - ) + solution, success = root_scalar_op(variable, *args) return solution, success @@ -825,15 +851,19 @@ class RootOp(ScipyVectorWrapperOp): def __init__( self, - variables: Variable, + variables: TensorVariable, *args: Variable, - equations: Variable, - method: str = "hybr", + equations: TensorVariable, + method: str, jac: bool = True, optimizer_kwargs: dict | None = None, use_vectorized_jac: bool = False, ): - if cast(TensorVariable, variables).ndim != cast(TensorVariable, equations).ndim: + if not isinstance(variables, TensorVariable): + raise ValueError("The variable `variables` must be a tensor for root.") + if not isinstance(equations, TensorVariable): + raise ValueError("The equations must be a tensor for root.") + if variables.ndim != equations.ndim: raise ValueError( "The variable `variables` must have the same number of dimensions as the equations." ) @@ -843,6 +873,7 @@ def __init__( ) self.fgraph = FunctionGraph([variables, *args], [equations]) + self.use_vectorized_jac = use_vectorized_jac if jac: jac_wrt_x = jacobian( @@ -916,13 +947,20 @@ def perform(self, node, inputs, outputs): outputs[0][0] = res.x.reshape(variables.shape).astype(variables.dtype) outputs[1][0] = np.bool_(res.success) - def L_op( - self, - inputs: Sequence[Variable], - outputs: Sequence[Variable], - output_grads: Sequence[Variable], - ) -> list[Variable]: + def L_op(self, inputs, outputs, output_grads): + # TODO: Handle disconnected inputs x, *args = inputs + if non_supported_types := tuple( + inp.type + for inp in inputs + if not isinstance(inp.type, DenseTensorType | ScalarType) + ): + # TODO: Support SparseTensorTypes + # TODO: Remaining types are likely just disconnected anyway + msg = f"RootOp gradient not implemented due to inputs of type {non_supported_types}" + return [ + grad_not_implemented(self, i, inp, msg) for i, inp in enumerate(inputs) + ] x_star, _ = outputs output_grad, _ = output_grads @@ -930,12 +968,15 @@ def L_op( inner_fx = self.fgraph.outputs[0] df_dx = ( - jacobian(inner_fx, inner_x, vectorize=True) + jacobian(inner_fx, inner_x, vectorize=self.use_vectorized_jac) if not self.jac else self.fgraph.outputs[1] ) df_dtheta_columns = jacobian( - inner_fx, inner_args, disconnected_inputs="ignore", vectorize=True + inner_fx, + inner_args, + disconnected_inputs="ignore", + vectorize=self.use_vectorized_jac, ) grad_wrt_args = implict_optimization_grads( @@ -1004,9 +1045,7 @@ def root( use_vectorized_jac=use_vectorized_jac, ) - solution, success = cast( - tuple[TensorVariable, TensorVariable], root_op(variables, *args) - ) + solution, success = root_op(variables, *args) return solution, success diff --git a/scripts/mypy-failing.txt b/scripts/mypy-failing.txt index 11da9ed34c..ff73de2605 100644 --- a/scripts/mypy-failing.txt +++ b/scripts/mypy-failing.txt @@ -15,6 +15,7 @@ pytensor/tensor/blas_headers.py pytensor/tensor/elemwise.py pytensor/tensor/extra_ops.py pytensor/tensor/math.py +pytensor/tensor/optimize.py pytensor/tensor/random/basic.py pytensor/tensor/random/op.py pytensor/tensor/random/utils.py diff --git a/tests/tensor/test_optimize.py b/tests/tensor/test_optimize.py index f5bb59d6b4..b211381e30 100644 --- a/tests/tensor/test_optimize.py +++ b/tests/tensor/test_optimize.py @@ -3,9 +3,10 @@ import pytensor import pytensor.tensor as pt -from pytensor import config, function -from pytensor.graph import Apply, Op -from pytensor.tensor import scalar +from pytensor import Variable, config, function +from pytensor.gradient import NullTypeGradError, disconnected_type +from pytensor.graph import Apply, Op, Type +from pytensor.tensor import alloc, scalar, scalar_from_tensor, tensor_from_scalar from pytensor.tensor.optimize import minimize, minimize_scalar, root, root_scalar from tests import unittest_tools as utt @@ -224,7 +225,7 @@ def root_fn(x, a, b): @pytest.mark.parametrize("optimize_op", (minimize, root)) -def test_minimize_0d(optimize_op): +def test_optimize_0d(optimize_op): # Scipy vector minimizers upcast 0d x to 1d. We need to work-around this class AssertScalar(Op): @@ -248,3 +249,106 @@ def L_op(self, inputs, outputs, out_grads): np.testing.assert_allclose( opt_x_res, 0, atol=1e-15 if floatX == "float64" else 1e-6 ) + + +@pytest.mark.parametrize("optimize_op", (minimize, minimize_scalar, root, root_scalar)) +def test_optimize_grad_scalar_arg(optimize_op): + # Regression test for https://github.com/pymc-devs/pytensor/pull/1744 + x = scalar("x") + theta = scalar("theta") + theta_scalar = scalar_from_tensor(theta) + obj = tensor_from_scalar((scalar_from_tensor(x) + theta_scalar) ** 2) + x0, _ = optimize_op(obj, x) + + # Confirm theta is a direct input to the node + assert x0.owner.inputs[1] is theta_scalar + + grad_wrt_theta = pt.grad(x0, theta) + np.testing.assert_allclose(grad_wrt_theta.eval({x: np.pi, theta: np.e}), -1) + + +@pytest.mark.parametrize("optimize_op", (minimize, minimize_scalar, root, root_scalar)) +def test_optimize_grad_disconnected_numerical_inp(optimize_op): + x = scalar("x", dtype="float64") + theta = scalar("theta", dtype="int64") + obj = alloc(x**2, theta).sum() # repeat theta times and sum + x0, _ = optimize_op(obj, x) + + # Confirm theta is a direct input to the node + assert x0.owner.inputs[1] is theta + + # This should technically raise, but does not right now + grad_wrt_theta = pt.grad(x0, theta, disconnected_inputs="raise") + np.testing.assert_allclose(grad_wrt_theta.eval({x: np.pi, theta: 5}), 0) + + # This should work even if the previous one raised + grad_wrt_theta = pt.grad(x0, theta, disconnected_inputs="ignore") + np.testing.assert_allclose(grad_wrt_theta.eval({x: np.pi, theta: 5}), 0) + + +@pytest.mark.parametrize("optimize_op", (minimize, minimize_scalar, root, root_scalar)) +def test_optimize_grad_disconnected_non_numerical_inp(optimize_op): + class StrType(Type): + def filter(self, x, **kwargs): + if isinstance(x, str): + return x + raise TypeError + + class SmileOrFrown(Op): + def make_node(self, x, str_emoji): + return Apply(self, [x, str_emoji], [x.type()]) + + def perform(self, node, inputs, output_storage): + [x, str_emoji] = inputs + match str_emoji: + case ":)": + out = np.array(x) + case ":(": + out = np.array(-x) + case _: + ValueError("str_emoji must be a smile or a frown") + output_storage[0][0] = out + + def connection_pattern(self, node): + # Gradient connected only to first input + return [[True], [False]] + + def L_op(self, inputs, outputs, output_gradients): + [_x, str_emoji] = inputs + [g] = output_gradients + return [ + self(g, str_emoji), + disconnected_type(), + ] + + # We could try to use real types like NoneTypeT or SliceType, but this is more robust to future API changes + str_type = StrType() + smile_or_frown = SmileOrFrown() + + x = scalar("x", dtype="float64") + num_theta = pt.scalar("num_theta", dtype="float64") + str_theta = Variable(str_type, None, None, name="str_theta") + obj = (smile_or_frown(x, str_theta) + num_theta) ** 2 + x_star, _ = optimize_op(obj, x) + + # Confirm thetas are direct inputs to the node + assert set(x_star.owner.inputs[1:]) == {num_theta, str_theta} + + # Confirm forward pass works, no point in worrying about gradient otherwise + np.testing.assert_allclose( + x_star.eval({x: np.pi, num_theta: np.e, str_theta: ":)"}), + -np.e, + ) + np.testing.assert_allclose( + x_star.eval({x: np.pi, num_theta: np.e, str_theta: ":("}), + np.e, + ) + + with pytest.raises(NullTypeGradError): + pt.grad(x_star, str_theta, disconnected_inputs="raise") + + # This could be supported, but it is not right now. + with pytest.raises(NullTypeGradError): + _grad_wrt_num_theta = pt.grad(x_star, num_theta, disconnected_inputs="raise") + # np.testing.assert_allclose(grad_wrt_num_theta.eval({x: np.pi, num_theta: np.e, str_theta: ":)"}), -1) + # np.testing.assert_allclose(grad_wrt_num_theta.eval({x: np.pi, num_theta: np.e, str_theta: ":("}), 1)