2 changes: 1 addition & 1 deletion dev/requirements.txt

```diff
@@ -6,7 +6,7 @@ numpy>=1.22
 pyarrow>=18.0.0
 six==1.16.0
 pandas>=2.2.0
-scipy
+scipy>=1.8.0
 plotly<6.0.0
 mlflow>=2.3.1
 scikit-learn
```
2 changes: 2 additions & 0 deletions dev/spark-test-image/lint/Dockerfile

```diff
@@ -99,6 +99,8 @@ RUN python3.11 -m pip install \
     'pyarrow>=22.0.0' \
     'pytest-mypy-plugins==1.9.3' \
     'pytest==7.1.3' \
+    'scipy>=1.8.0' \
+    'scipy-stubs' \
     && python3.11 -m pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu \
     && python3.11 -m pip install torcheval \
     && python3.11 -m pip cache purge
```
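For context: `scipy-stubs` is what makes the `scipy.sparse` names visible to mypy in the lint image; without it, scipy resolves as an untyped module and everything degrades to `Any`. A minimal sketch of the kind of code that becomes checkable — that `issparse` acts as a type guard is an assumption about recent `scipy-stubs` releases, and at runtime the snippet works either way:

```python
# Hypothetical check, not part of this PR: exercises the scipy.sparse API that
# the new stubs annotate. Requires scipy (and scipy-stubs for type checking).
import numpy as np
from scipy.sparse import csc_matrix, issparse

m = csc_matrix(np.eye(3))  # with scipy-stubs, mypy sees csc_matrix, not Any
if issparse(m):            # assumed to narrow the type under scipy-stubs
    print(m.shape, m.nnz)  # (3, 3) 3
```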
8 changes: 5 additions & 3 deletions python/pyspark/ml/_typing.pyi

```diff
@@ -16,18 +16,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Any, Dict, List, TypeVar, Tuple, Union
+from typing import Any, Dict, List, TYPE_CHECKING, TypeVar, Tuple, Union
 from typing_extensions import Literal
 
 from numpy import ndarray
 from py4j.java_gateway import JavaObject
 
 import pyspark.ml.base
 import pyspark.ml.param
 import pyspark.ml.util
 from pyspark.ml.linalg import Vector
 import pyspark.ml.wrapper
 
+if TYPE_CHECKING:
+    from scipy.sparse import spmatrix, sparray
+
 ParamMap = Dict[pyspark.ml.param.Param, Any]
 PipelineStage = Union[pyspark.ml.base.Estimator, pyspark.ml.base.Transformer]
 
@@ -81,4 +83,4 @@ RankingEvaluatorMetricType = Union[
     Literal["recallAtK"],
 ]
 
-VectorLike = Union[ndarray, Vector, List[float], Tuple[float, ...]]
+VectorLike = Union[ndarray, Vector, List[float], Tuple[float, ...], "spmatrix", "sparray", range]
```
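The widened alias is what lets callers hand SciPy sparse column vectors — both the legacy `spmatrix` classes and the newer `sparray` ones — plus `range` objects to APIs typed as `VectorLike`. A hedged usage sketch against the converter (illustrative only; `_convert_to_vector` is a private helper, and scipy must be installed):

```python
# Usage sketch, not part of this PR: every member of the widened VectorLike
# union converts to an ml Vector.
import numpy as np
from scipy.sparse import csc_array, csc_matrix

from pyspark.ml.linalg import _convert_to_vector

col_matrix = csc_matrix(np.array([[1.0], [0.0], [3.0]]))  # legacy spmatrix, shape (3, 1)
col_array = csc_array(np.array([[1.0], [0.0], [3.0]]))    # newer sparray, shape (3, 1)

print(_convert_to_vector(col_matrix))  # sparse result: (3,[0,2],[1.0,3.0])
print(_convert_to_vector(col_array))   # same conversion path via issparse()
print(_convert_to_vector(range(3)))    # ranges densify: [0.0,1.0,2.0]
```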
51 changes: 28 additions & 23 deletions python/pyspark/ml/linalg/__init__.py

```diff
@@ -70,7 +70,6 @@
 if TYPE_CHECKING:
     from pyspark.mllib._typing import NormType
     from pyspark.ml._typing import VectorLike
-    from scipy.sparse import spmatrix
 
 
 # Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
@@ -85,23 +84,25 @@
     _have_scipy = False
 
 
-def _convert_to_vector(d: Union["VectorLike", "spmatrix", range]) -> "Vector":
+def _convert_to_vector(d: "VectorLike") -> "Vector":
     if isinstance(d, Vector):
         return d
-    elif type(d) in (array.array, np.array, np.ndarray, list, tuple, range):
+    elif isinstance(d, (array.array, np.ndarray, list, tuple, range)):
         return DenseVector(d)
     elif _have_scipy and scipy.sparse.issparse(d):
-        assert cast("spmatrix", d).shape[1] == 1, "Expected column vector"
+        assert hasattr(d, "shape")
+        assert d.shape[1] == 1, "Expected column vector"
         # Make sure the converted csc_matrix has sorted indices.
-        csc = cast("spmatrix", d).tocsc()
+        assert hasattr(d, "tocsc")
+        csc = d.tocsc()
         if not csc.has_sorted_indices:
             csc.sort_indices()
-        return SparseVector(cast("spmatrix", d).shape[0], csc.indices, csc.data)
+        return SparseVector(d.shape[0], csc.indices, csc.data)
     else:
         raise TypeError("Cannot convert type %s into Vector" % type(d))
 
 
-def _vector_size(v: Union["VectorLike", "spmatrix", range]) -> int:
+def _vector_size(v: "VectorLike") -> int:
     """
     Returns the size of the vector.
 
@@ -124,16 +125,17 @@ def _vector_size(v: Union["VectorLike", "spmatrix", range]) -> int:
     """
     if isinstance(v, Vector):
         return len(v)
-    elif type(v) in (array.array, list, tuple, range):
+    elif isinstance(v, (array.array, list, tuple, range)):
         return len(v)
-    elif type(v) == np.ndarray:
+    elif isinstance(v, np.ndarray):
         if v.ndim == 1 or (v.ndim == 2 and v.shape[1] == 1):
             return len(v)
         else:
             raise ValueError("Cannot treat an ndarray of shape %s as a vector" % str(v.shape))
     elif _have_scipy and scipy.sparse.issparse(v):
-        assert cast("spmatrix", v).shape[1] == 1, "Expected column vector"
-        return cast("spmatrix", v).shape[0]
+        assert hasattr(v, "shape")
+        assert v.shape[1] == 1, "Expected column vector"
+        return v.shape[0]
     else:
         raise TypeError("Cannot treat type %s as a vector" % type(v))
 
@@ -337,13 +339,13 @@ def __init__(self, ar: Union[bytes, np.ndarray, Iterable[float]]):
     def __reduce__(self) -> Tuple[Type["DenseVector"], Tuple[bytes]]:
         return DenseVector, (self.array.tobytes(),)
 
-    def numNonzeros(self) -> int:
+    def numNonzeros(self) -> Union[int, np.intp]:
         """
         Number of nonzero elements. This scans all active values and count non zeros
         """
         return np.count_nonzero(self.array)
 
-    def norm(self, p: "NormType") -> np.float64:
+    def norm(self, p: "NormType") -> np.floating[Any]:
         """
         Calculates the norm of a DenseVector.
 
@@ -386,15 +388,17 @@ def dot(self, other: Iterable[float]) -> np.float64:
         ...
         AssertionError: dimension mismatch
         """
-        if type(other) == np.ndarray:
+        if isinstance(other, np.ndarray):
             if other.ndim > 1:
                 assert len(self) == other.shape[0], "dimension mismatch"
             return np.dot(self.array, other)
         elif _have_scipy and scipy.sparse.issparse(other):
-            assert len(self) == cast("spmatrix", other).shape[0], "dimension mismatch"
-            return cast("spmatrix", other).transpose().dot(self.toArray())
+            assert hasattr(other, "shape")
+            assert len(self) == other.shape[0], "dimension mismatch"
+            assert hasattr(other, "transpose")
+            return other.transpose().dot(self.toArray())
         else:
-            assert len(self) == _vector_size(other), "dimension mismatch"
+            assert len(self) == _vector_size(other), "dimension mismatch"  # type: ignore[arg-type]
             if isinstance(other, SparseVector):
                 return other.dot(self)
             elif isinstance(other, Vector):
@@ -429,10 +433,11 @@ def squared_distance(self, other: Iterable[float]) -> np.float64:
         ...
         AssertionError: dimension mismatch
         """
-        assert len(self) == _vector_size(other), "dimension mismatch"
+        assert len(self) == _vector_size(other), "dimension mismatch"  # type: ignore[arg-type]
         if isinstance(other, SparseVector):
             return other.squared_distance(self)
         elif _have_scipy and scipy.sparse.issparse(other):
+            assert isinstance(other, scipy.sparse.spmatrix), "other must be a scipy.sparse.spmatrix"
             return _convert_to_vector(other).squared_distance(self)  # type: ignore[attr-defined]
 
         if isinstance(other, Vector):
@@ -636,13 +641,13 @@ def __init__(
             )
             assert np.min(self.indices) >= 0, "Contains negative index %d" % (np.min(self.indices))
 
-    def numNonzeros(self) -> int:
+    def numNonzeros(self) -> Union[int, np.intp]:
         """
         Number of nonzero elements. This scans all active values and count non zeros.
         """
         return np.count_nonzero(self.values)
 
-    def norm(self, p: "NormType") -> np.float64:
+    def norm(self, p: "NormType") -> np.floating[Any]:
         """
         Calculates the norm of a SparseVector.
 
@@ -699,7 +704,7 @@ def dot(self, other: Iterable[float]) -> np.float64:
             assert len(self) == other.shape[0], "dimension mismatch"
             return np.dot(self.values, other[self.indices])
 
-        assert len(self) == _vector_size(other), "dimension mismatch"
+        assert len(self) == _vector_size(other), "dimension mismatch"  # type: ignore[arg-type]
 
         if isinstance(other, DenseVector):
             return np.dot(other.array[self.indices], self.values)
@@ -717,7 +722,7 @@ def dot(self, other: Iterable[float]) -> np.float64:
         else:
             return self.dot(_convert_to_vector(other))  # type: ignore[arg-type]
 
-    def squared_distance(self, other: Iterable[float]) -> np.float64:
+    def squared_distance(self, other: "VectorLike") -> np.float64:
         """
         Squared distance from a SparseVector or 1-dimensional NumPy array.
 
@@ -785,7 +790,7 @@ def squared_distance(self, other: Iterable[float]) -> np.float64:
                 j += 1
             return result
         else:
-            return self.squared_distance(_convert_to_vector(other))  # type: ignore[arg-type]
+            return self.squared_distance(_convert_to_vector(other))
 
     def toArray(self) -> np.ndarray:
         """
```
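Beyond the cast cleanups, the `type(x) in (...)` and `type(x) == np.ndarray` checks become `isinstance`, which also accepts subclasses that the exact-type form silently rejected (and `np.array` in the old tuple was dead weight, since it is a function, never a type). A standalone illustration of that behavioural edge, independent of Spark:

```python
# Standalone illustration (not Spark code): why isinstance() replaces the
# exact-type checks. ndarray subclasses now take the dense-conversion path.
import numpy as np

class TaggedArray(np.ndarray):
    """A trivial ndarray subclass, e.g. as produced by np.ndarray.view()."""

arr = np.zeros(3).view(TaggedArray)
print(type(arr) == np.ndarray)      # False: exact-type check misses subclasses
print(isinstance(arr, np.ndarray))  # True: the new checks accept them
```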
8 changes: 6 additions & 2 deletions python/pyspark/mllib/_typing.pyi

```diff
@@ -16,18 +16,22 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import List, Tuple, TypeVar, Union
+from typing import List, Tuple, TYPE_CHECKING, TypeVar, Union
 
 from typing_extensions import Literal
 from numpy import ndarray  # noqa: F401
 from py4j.java_gateway import JavaObject
 
 from pyspark.mllib.linalg import Vector
 
-VectorLike = Union[ndarray, Vector, List[float], Tuple[float, ...]]
+if TYPE_CHECKING:
+    from scipy.sparse import spmatrix, sparray
 
 C = TypeVar("C", bound=type)
 JavaObjectOrPickleDump = Union[JavaObject, bytearray, bytes]
 
 CorrMethodType = Union[Literal["spearman"], Literal["pearson"]]
 KolmogorovSmirnovTestDistNameType = Literal["norm"]
 NormType = Union[None, float, Literal["fro"], Literal["nuc"]]
+
+VectorLike = Union[ndarray, Vector, List[float], Tuple[float, ...], "spmatrix", "sparray", range]
```
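Same pattern as the ml stub: the scipy imports sit behind `TYPE_CHECKING` and are referenced as strings, so nothing that imports this module needs scipy at runtime. A self-contained sketch of the idiom (all names here are mine, not the stub's):

```python
# Sketch of the TYPE_CHECKING idiom used above; names are hypothetical.
from typing import TYPE_CHECKING, List, Union

if TYPE_CHECKING:
    from scipy.sparse import spmatrix  # resolved by the type checker only

# The guarded name is referenced as a string (a forward reference), so this
# module evaluates fine even when scipy is not installed.
MyVectorLike = Union[List[float], "spmatrix"]

def first_dim(v: "MyVectorLike") -> int:
    if isinstance(v, list):
        return len(v)
    return v.shape[0]  # only the sparse branch reaches this

print(first_dim([1.0, 2.0, 3.0]))  # 3, with scipy never imported at runtime
```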
50 changes: 27 additions & 23 deletions python/pyspark/mllib/linalg/__init__.py

```diff
@@ -61,7 +61,6 @@
 
 if TYPE_CHECKING:
     from pyspark.mllib._typing import VectorLike, NormType
-    from scipy.sparse import spmatrix
     from numpy.typing import ArrayLike
 
 
@@ -94,23 +93,25 @@
     _have_scipy = False
 
 
-def _convert_to_vector(d: Union["VectorLike", "spmatrix", range]) -> "Vector":
+def _convert_to_vector(d: "VectorLike") -> "Vector":
     if isinstance(d, Vector):
         return d
-    elif type(d) in (array.array, np.array, np.ndarray, list, tuple, range):
+    elif isinstance(d, (array.array, np.ndarray, list, tuple, range)):
         return DenseVector(d)
     elif _have_scipy and scipy.sparse.issparse(d):
-        assert cast("spmatrix", d).shape[1] == 1, "Expected column vector"
+        assert hasattr(d, "shape")
+        assert d.shape[1] == 1, "Expected column vector"
         # Make sure the converted csc_matrix has sorted indices.
-        csc = cast("spmatrix", d).tocsc()
+        assert hasattr(d, "tocsc")
+        csc = d.tocsc()
         if not csc.has_sorted_indices:
             csc.sort_indices()
-        return SparseVector(cast("spmatrix", d).shape[0], csc.indices, csc.data)
+        return SparseVector(d.shape[0], csc.indices, csc.data)
     else:
         raise TypeError("Cannot convert type %s into Vector" % type(d))
 
 
-def _vector_size(v: Union["VectorLike", "spmatrix", range]) -> int:
+def _vector_size(v: "VectorLike") -> int:
     """
     Returns the size of the vector.
 
@@ -133,16 +134,17 @@ def _vector_size(v: Union["VectorLike", "spmatrix", range]) -> int:
     """
     if isinstance(v, Vector):
         return len(v)
-    elif type(v) in (array.array, list, tuple, range):
+    elif isinstance(v, (array.array, list, tuple, range)):
         return len(v)
-    elif type(v) == np.ndarray:
+    elif isinstance(v, np.ndarray):
         if v.ndim == 1 or (v.ndim == 2 and v.shape[1] == 1):
             return len(v)
         else:
             raise ValueError("Cannot treat an ndarray of shape %s as a vector" % str(v.shape))
     elif _have_scipy and scipy.sparse.issparse(v):
-        assert cast("spmatrix", v).shape[1] == 1, "Expected column vector"
-        return cast("spmatrix", v).shape[0]
+        assert hasattr(v, "shape")
+        assert v.shape[1] == 1, "Expected column vector"
+        return v.shape[0]
     else:
         raise TypeError("Cannot treat type %s as a vector" % type(v))
 
@@ -390,13 +392,13 @@ def parse(s: str) -> "DenseVector":
     def __reduce__(self) -> Tuple[Type["DenseVector"], Tuple[bytes]]:
         return DenseVector, (self.array.tobytes(),)
 
-    def numNonzeros(self) -> int:
+    def numNonzeros(self) -> Union[int, np.intp]:
         """
         Number of nonzero elements. This scans all active values and count non zeros
         """
         return np.count_nonzero(self.array)
 
-    def norm(self, p: "NormType") -> np.float64:
+    def norm(self, p: "NormType") -> np.floating[Any]:
         """
         Calculates the norm of a DenseVector.
 
@@ -410,7 +412,7 @@ def norm(self, p: "NormType") -> np.float64:
         """
         return np.linalg.norm(self.array, p)
 
-    def dot(self, other: Iterable[float]) -> np.float64:
+    def dot(self, other: "VectorLike") -> np.float64:
         """
         Compute the dot product of two Vectors. We support
         (Numpy array, list, SparseVector, or SciPy sparse)
@@ -444,8 +446,10 @@ def dot(self, other: Iterable[float]) -> np.float64:
             assert len(self) == other.shape[0], "dimension mismatch"
             return np.dot(self.array, other)
         elif _have_scipy and scipy.sparse.issparse(other):
-            assert len(self) == cast("spmatrix", other).shape[0], "dimension mismatch"
-            return cast("spmatrix", other).transpose().dot(self.toArray())
+            assert hasattr(other, "shape")
+            assert len(self) == other.shape[0], "dimension mismatch"
+            assert hasattr(other, "transpose")
+            return other.transpose().dot(self.toArray())
         else:
             assert len(self) == _vector_size(other), "dimension mismatch"
             if isinstance(other, SparseVector):
@@ -455,7 +459,7 @@ def dot(self, other: Iterable[float]) -> np.float64:
         else:
             return np.dot(self.toArray(), cast("ArrayLike", other))
 
-    def squared_distance(self, other: Iterable[float]) -> np.float64:
+    def squared_distance(self, other: "VectorLike") -> np.float64:
         """
         Squared distance of two Vectors.
 
@@ -685,13 +689,13 @@ def __init__(
                     % (self.indices[i], self.indices[i + 1])
                 )
 
-    def numNonzeros(self) -> int:
+    def numNonzeros(self) -> Union[int, np.intp]:
        """
         Number of nonzero elements. This scans all active values and count non zeros.
         """
         return np.count_nonzero(self.values)
 
-    def norm(self, p: "NormType") -> np.float64:
+    def norm(self, p: "NormType") -> np.floating[Any]:
         """
         Calculates the norm of a SparseVector.
 
@@ -766,7 +770,7 @@ def parse(s: str) -> "SparseVector":
             raise ValueError("Unable to parse values from %s." % s)
         return SparseVector(cast(int, size), indices, values)
 
-    def dot(self, other: Iterable[float]) -> np.float64:
+    def dot(self, other: "VectorLike") -> np.float64:
         """
         Dot product with a SparseVector or 1- or 2-dimensional Numpy array.
 
@@ -822,9 +826,9 @@ def dot(self, other: Iterable[float]) -> np.float64:
                 return np.dot(self_values, other.values[other_cmind])
 
         else:
-            return self.dot(_convert_to_vector(other))  # type: ignore[arg-type]
+            return self.dot(_convert_to_vector(other))
 
-    def squared_distance(self, other: Iterable[float]) -> np.float64:
+    def squared_distance(self, other: "VectorLike") -> np.float64:
         """
         Squared distance from a SparseVector or 1-dimensional NumPy array.
 
@@ -892,7 +896,7 @@ def squared_distance(self, other: Iterable[float]) -> np.float64:
                 j += 1
             return result
         else:
-            return self.squared_distance(_convert_to_vector(other))  # type: ignore[arg-type]
+            return self.squared_distance(_convert_to_vector(other))
 
     def toArray(self) -> np.ndarray:
         """
```
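A hedged usage sketch of the branch the new `hasattr` asserts guard: `DenseVector.dot` with a SciPy sparse column vector goes through `other.transpose().dot(self.toArray())` and yields a one-element array (requires scipy at runtime):

```python
# Usage sketch, not part of this PR: exercises DenseVector.dot's sparse branch.
import numpy as np
from scipy.sparse import csc_matrix

from pyspark.mllib.linalg import DenseVector

dv = DenseVector([1.0, 2.0, 3.0])
sp_col = csc_matrix(np.array([[1.0], [0.0], [2.0]]))  # column vector, shape (3, 1)

# The (1, 3) transpose dotted with a length-3 array gives a one-element
# ndarray: [7.], since 1*1 + 2*0 + 3*2 = 7.
print(dv.dot(sp_col))
```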
2 changes: 1 addition & 1 deletion python/pyspark/mllib/linalg/distributed.py

```diff
@@ -35,7 +35,7 @@
 VT = TypeVar("VT", bound="Matrix")
 
 if TYPE_CHECKING:
-    from pyspark.ml._typing import VectorLike
+    from pyspark.mllib._typing import VectorLike
 
 __all__ = [
     "BlockMatrix",
```