35 changes: 20 additions & 15 deletions crossfit/__init__.py
@@ -9,7 +9,6 @@
from crossfit.metric import * # noqa
from crossfit.op import * # noqa


__all__ = [
"Aggregator",
"backend",
@@ -27,21 +26,27 @@


try:
from crossfit.backend.torch import SentenceTransformerModel, TorchExactSearch, HFModel
from crossfit.backend.torch import (
HFModel,
SentenceTransformerModel,
TorchExactSearch,
)
from crossfit.dataset.base import IRDataset, MultiDataset
from crossfit.dataset.load import load_dataset
from crossfit.report.beir.embed import embed
from crossfit.report.beir.report import beir_report
from crossfit.dataset.load import load_dataset
from crossfit.dataset.base import IRDataset, MultiDataset

__all__.extend([
"embed",
"beir_report",
"load_dataset",
"TorchExactSearch",
"SentenceTransformerModel",
"HFModel",
"MultiDataset",
"IRDataset",
])

__all__.extend(
[
"embed",
"beir_report",
"load_dataset",
"TorchExactSearch",
"SentenceTransformerModel",
"HFModel",
"MultiDataset",
"IRDataset",
]
)
except ImportError as e:
pass
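
Note on the pattern above: the torch-backed exports sit in a try/except ImportError block so that `import crossfit` succeeds even when the optional torch dependencies are absent, and `__all__` only advertises names that actually imported. A minimal, self-contained sketch of the same pattern (the bare `torch` import stands in for the heavier `crossfit.backend.torch` chain):

```python
# Optional-dependency export pattern, as in crossfit/__init__.py above:
# try the heavy import once, and grow __all__ only on success.
__all__ = ["core_feature"]


def core_feature():
    return "always available"


try:
    import torch  # optional backend; may be missing in minimal installs

    def torch_feature():
        return torch.__version__

    __all__.extend(["torch_feature"])
except ImportError:
    # The core package still imports cleanly without the optional backend.
    pass
```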
5 changes: 4 additions & 1 deletion crossfit/_version.py
@@ -357,7 +357,10 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, runner=run_command):
if verbose:
fmt = "tag '%s' doesn't start with prefix '%s'"
print(fmt % (full_tag, tag_prefix))
pieces["error"] = "tag '%s' doesn't start with prefix '%s'" % (full_tag, tag_prefix)
pieces["error"] = "tag '%s' doesn't start with prefix '%s'" % (
full_tag,
tag_prefix,
)
return pieces
pieces["closest-tag"] = full_tag[len(tag_prefix) :]

3 changes: 1 addition & 2 deletions crossfit/backend/__init__.py
@@ -1,9 +1,8 @@
from crossfit.backend.numpy.sparse import *
from crossfit.backend.dask.dataframe import *
from crossfit.backend.numpy.sparse import *
from crossfit.backend.pandas.array import *
from crossfit.backend.pandas.dataframe import *


try:
from crossfit.backend.cudf.array import *
from crossfit.backend.cudf.dataframe import *
6 changes: 4 additions & 2 deletions crossfit/backend/cudf/array.py
@@ -1,7 +1,7 @@
import logging

from crossfit.data.array import conversion
from crossfit.data.array.dispatch import np_backend_dispatch, ArrayBackend
from crossfit.data.array.dispatch import ArrayBackend, np_backend_dispatch


@np_backend_dispatch.register_lazy("cudf")
@@ -28,7 +28,9 @@ def cudf_to_dlpack(input_array: cudf.Series):

if input_array.dtype.name == "list":
if not input_array.list.len().min() == input_array.list.len().max():
raise NotImplementedError("Cannot convert list column with variable length")
raise NotImplementedError(
"Cannot convert list column with variable length"
)

dim = input_array.list.len().iloc[0]
return input_array.list.leaves.values.reshape(-1, dim).toDlpack()
7 changes: 4 additions & 3 deletions crossfit/backend/cudf/dataframe.py
@@ -1,9 +1,8 @@
from typing import Callable


from crossfit.backend.pandas.dataframe import PandasDataFrame
from crossfit.data.array.dispatch import crossarray
from crossfit.data.dataframe.dispatch import CrossFrame
from crossfit.backend.pandas.dataframe import PandasDataFrame


class CudfDataFrame(PandasDataFrame):
@@ -24,7 +23,9 @@ def apply(self, func: Callable, *args, **kwargs):
# Numba-compilation failed
pass
with crossarray:
return CrossFrame({k: func(v, *args, **kwargs) for k, v in self.data.items()}).cast()
return CrossFrame(
{k: func(v, *args, **kwargs) for k, v in self.data.items()}
).cast()


@CrossFrame.register_lazy("cupy")
6 changes: 4 additions & 2 deletions crossfit/backend/cudf/series.py
@@ -1,5 +1,5 @@
import cupy as cp
import cudf
import cupy as cp
from cudf.core.column import as_column


@@ -9,7 +9,9 @@ def create_list_series_from_2d_ar(ar, index):
"""
n_rows, n_cols = ar.shape
data = as_column(ar.flatten())
offset_col = as_column(cp.arange(start=0, stop=len(data) + 1, step=n_cols), dtype="int32")
offset_col = as_column(
cp.arange(start=0, stop=len(data) + 1, step=n_cols), dtype="int32"
)
mask_col = cp.full(shape=n_rows, fill_value=True)
mask = cudf._lib.transform.bools_to_mask(as_column(mask_col))
lc = cudf.core.column.ListColumn(
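
For context on the `create_list_series_from_2d_ar` reflow above: the offsets column marks where each fixed-length row starts in the flattened data, so `cp.arange(start=0, stop=len(data) + 1, step=n_cols)` yields exactly `n_rows + 1` boundaries. A minimal NumPy sketch of the same arithmetic (NumPy standing in for CuPy; no cuDF required):

```python
import numpy as np

ar = np.arange(12).reshape(3, 4)  # 3 rows of 4 columns
n_rows, n_cols = ar.shape
data = ar.flatten()  # flattened leaf values, length 12
offsets = np.arange(0, len(data) + 1, n_cols)  # [0, 4, 8, 12]

# Row i of the list column is data[offsets[i]:offsets[i + 1]]
rows = [data[offsets[i]:offsets[i + 1]].tolist() for i in range(n_rows)]
assert rows == [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]
```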
2 changes: 1 addition & 1 deletion crossfit/backend/cupy/array.py
@@ -1,7 +1,7 @@
import logging

from crossfit.data.array import conversion
from crossfit.data.array.dispatch import np_backend_dispatch, ArrayBackend
from crossfit.data.array.dispatch import ArrayBackend, np_backend_dispatch


@np_backend_dispatch.register_lazy("cupy")
4 changes: 3 additions & 1 deletion crossfit/backend/cupy/kernels.py
@@ -66,7 +66,9 @@ def _numba_sort(idx_ptr, col_idx, data):


@cuda.jit
def _numba_setop(self_idx_ptr, self_col_idx, self_data, other_idx_ptr, other_col_idx, intersect):
def _numba_setop(
self_idx_ptr, self_col_idx, self_data, other_idx_ptr, other_col_idx, intersect
):
i = cuda.grid(1)

if i < len(self_idx_ptr) - 1:
25 changes: 19 additions & 6 deletions crossfit/backend/cupy/sparse.py
@@ -9,7 +9,9 @@


class CPSparseMatrixBackend(SparseMatrixBackend):
def __init__(self, idx_ptr: cp.ndarray, col_idx: cp.ndarray, data: cp.ndarray, shape=None):
def __init__(
self, idx_ptr: cp.ndarray, col_idx: cp.ndarray, data: cp.ndarray, shape=None
):
if shape is None:
if len(col_idx):
M = col_idx.max() + 1
@@ -49,7 +51,9 @@ def from_matrix(cls, matrix, keep_zeros=False):
if isinstance(matrix, list):
matrix = cp.asarray(matrix, dtype=object).astype(cp.float32)
matrix = cp.atleast_2d(matrix)
if not cp.issubdtype(matrix.dtype, cp.number) or cp.issubdtype(matrix.dtype, cp.bool_):
if not cp.issubdtype(matrix.dtype, cp.number) or cp.issubdtype(
matrix.dtype, cp.bool_
):
raise ValueError("Input must be numeric")
elif matrix.ndim != 2:
raise ValueError("Input arrays need to be 1D or 2D.")
@@ -79,7 +83,9 @@ def from_lil(cls, rows, data=None, dtype=cp.float32, keep_zeros=False):
data = cp.ones_like(col_idx, dtype=dtype)
else:
data = cp.fromiter(
itertools.chain.from_iterable(data), dtype=dtype, count=idx_ptr[-1].item()
itertools.chain.from_iterable(data),
dtype=dtype,
count=idx_ptr[-1].item(),
)
if keep_zeros:
data += 1 - data[cp.isfinite(data)].min()
@@ -93,7 +99,9 @@ def from_lil(cls, rows, data=None, dtype=cp.float32, keep_zeros=False):
return instance

def tocsr(self, copy=False):
return sp.csr_matrix((self.data, self.col_idx, self.idx_ptr), copy=copy, shape=self.shape)
return sp.csr_matrix(
(self.data, self.col_idx, self.idx_ptr), copy=copy, shape=self.shape
)

def todense(self):
return cp.asarray(self.tocsr().todense())
@@ -153,11 +161,16 @@ def todense_masked(self, shape) -> MaskedArray:
return MaskedArray(data, mask)

def lookup(self, indices):
from crossfit.backend.cupy.kernels import _numba_lookup, determine_blocks_threads
from crossfit.backend.cupy.kernels import (
_numba_lookup,
determine_blocks_threads,
)

vals = cp.zeros_like(indices)
blocks, threads = determine_blocks_threads(indices.shape[0])
_numba_lookup[blocks, threads](self.idx_ptr, self.col_idx, self.data, indices, vals)
_numba_lookup[blocks, threads](
self.idx_ptr, self.col_idx, self.data, indices, vals
)
return cp.asarray(vals)

def rank_top_k(self, k=None) -> MaskedArray:
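
For context on the `from_lil` reflow above: the constructor builds the three CSR arrays from a list of per-row column indices. `idx_ptr` is the cumulative sum of row lengths, `col_idx` holds the flattened indices, and `data` defaults to ones. The same construction in NumPy (mirroring the CuPy calls above):

```python
import itertools

import numpy as np

rows = [[2, 5], [0], [1, 3, 4]]  # column indices per row (LIL layout)

# Row-pointer array: row i owns entries idx_ptr[i]:idx_ptr[i + 1]
idx_ptr = np.asarray([0] + [len(r) for r in rows], dtype=int).cumsum()  # [0, 2, 3, 6]
col_idx = np.fromiter(itertools.chain.from_iterable(rows), dtype=int, count=idx_ptr[-1])
data = np.ones_like(col_idx, dtype=np.float32)  # implicit value 1.0 per stored entry

assert col_idx[idx_ptr[2]:idx_ptr[3]].tolist() == [1, 3, 4]
```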
2 changes: 1 addition & 1 deletion crossfit/backend/dask/aggregate.py
@@ -1,8 +1,8 @@
from functools import partial

import dask.dataframe as dd
from dask.delayed import Delayed
from dask.highlevelgraph import HighLevelGraph
import dask.dataframe as dd

from crossfit.calculate.aggregate import Aggregator
from crossfit.data.dataframe.dispatch import CrossFrame
23 changes: 14 additions & 9 deletions crossfit/backend/dask/cluster.py
@@ -1,17 +1,16 @@
from typing import Callable, Optional, Any
from contextvars import ContextVar
import importlib
import gc
import importlib
import warnings
from contextvars import ContextVar
from typing import Any, Callable, Optional

import dask
from dask.distributed import Client, get_client
from dask.dataframe.optimize import optimize as dd_optimize
import distributed
from dask.dataframe.optimize import optimize as dd_optimize
from dask.distributed import Client, get_client

from crossfit.backend.gpu import HAS_GPU


_crossfit_dask_client = ContextVar("_crossfit_dask_client", default="auto")


@@ -117,7 +116,9 @@ def ensure_optimize_dataframe_graph(ddf=None, dsk=None, keys=None):

if ddf is None:
if dsk is None or keys is None:
raise ValueError("Must specify both `dsk` and `keys` if `ddf` is not supplied.")
raise ValueError(
"Must specify both `dsk` and `keys` if `ddf` is not supplied."
)
dsk = ddf.dask if dsk is None else dsk
keys = ddf.__dask_keys__() if keys is None else keys

@@ -285,7 +286,9 @@ def _activate(self):

self._active = True
if self._client in ("auto", None):
raise RuntimeError(f"Failed to deploy a new local {self.cluster_type} cluster.")
raise RuntimeError(
f"Failed to deploy a new local {self.cluster_type} cluster."
)

def _deactivate(self):
self._client = set_dask_client(self._initial_client)
@@ -380,7 +383,9 @@ def __exit__(self, *args):
self.deactivate()


def set_dask_client(client="auto", new_cluster=None, force_new=False, **cluster_options):
def set_dask_client(
client="auto", new_cluster=None, force_new=False, **cluster_options
):
"""Set the Dask-Distributed client.

Parameters
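
For context on the `set_dask_client` reflow above: the function records the active client in the module's `_crossfit_dask_client` ContextVar. A usage sketch, assuming only the signature shown in this diff; the local `Client` is plain dask.distributed usage, not a crossfit API:

```python
from distributed import Client

from crossfit.backend.dask.cluster import set_dask_client

client = Client(processes=False)  # an ordinary local dask.distributed client
set_dask_client(client)           # pin crossfit to this client
set_dask_client("auto")           # or restore the default auto-discovery behaviour
```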
6 changes: 4 additions & 2 deletions crossfit/backend/dask/dataframe.py
@@ -3,10 +3,10 @@
from typing import Callable, List

import dask.dataframe as dd

from crossfit.data.dataframe.core import FrameBackend
from crossfit.data.dataframe.dispatch import CrossFrame


# @CrossFrame.register_lazy("dask")
# def register_dask_backend():
# import dask.dataframe as dd
@@ -36,7 +36,9 @@ def concat(
ignore_index: bool = False,
axis: int = 0,
):
return CrossFrame(dd.DataFrame.concat(frames, ignore_index=ignore_index, axis=axis))
return CrossFrame(
dd.DataFrame.concat(frames, ignore_index=ignore_index, axis=axis)
)

def aggregate(self, agg, **kwargs):
from crossfit.backend.dask.aggregate import aggregate as dask_aggregate
34 changes: 24 additions & 10 deletions crossfit/backend/numpy/sparse.py
@@ -1,16 +1,18 @@
import itertools
from crossfit.data.array.masked import MaskedArray

import numba
import numpy as np
import scipy.sparse as sp
import numba

from crossfit.data.sparse.dispatch import CrossSparse
from crossfit.data.array.masked import MaskedArray
from crossfit.data.sparse.core import SparseMatrixBackend
from crossfit.data.sparse.dispatch import CrossSparse


class NPSparseMatrixBackend(SparseMatrixBackend):
def __init__(self, idx_ptr: np.ndarray, col_idx: np.ndarray, data: np.ndarray, shape=None):
def __init__(
self, idx_ptr: np.ndarray, col_idx: np.ndarray, data: np.ndarray, shape=None
):
if shape is None:
if len(col_idx):
M = col_idx.max() + 1
@@ -48,7 +50,9 @@ def from_matrix(cls, matrix, keep_zeros=False):
if isinstance(matrix, list):
matrix = np.asarray(matrix, dtype=object).astype(np.float32)
matrix = np.atleast_2d(matrix)
if not np.issubdtype(matrix.dtype, np.number) or np.issubdtype(matrix.dtype, np.bool_):
if not np.issubdtype(matrix.dtype, np.number) or np.issubdtype(
matrix.dtype, np.bool_
):
raise ValueError("Input must be numeric")
elif matrix.ndim != 2:
raise ValueError("Input arrays need to be 1D or 2D.")
@@ -71,7 +75,9 @@ def from_lil(cls, rows, data=None, dtype=np.float32, keep_zeros=False):
rows = [rows]
idx_ptr = np.asarray([0] + [len(x) for x in rows], dtype=int).cumsum()
try:
col_idx = np.fromiter(itertools.chain.from_iterable(rows), dtype=int, count=idx_ptr[-1])
col_idx = np.fromiter(
itertools.chain.from_iterable(rows), dtype=int, count=idx_ptr[-1]
)
if data is None:
data = np.ones_like(col_idx, dtype=dtype)
else:
@@ -90,7 +96,9 @@ def from_lil(cls, rows, data=None, dtype=np.float32, keep_zeros=False):
return instance

def tocsr(self, copy=False):
return sp.csr_matrix((self.data, self.col_idx, self.idx_ptr), copy=copy, shape=self.shape)
return sp.csr_matrix(
(self.data, self.col_idx, self.idx_ptr), copy=copy, shape=self.shape
)

def todense(self):
return np.asarray(self.tocsr().todense())
@@ -108,7 +116,9 @@ def eliminate_zeros(self):
def _setop(self, other, mode):
if self.shape[0] != other.shape[0]:
raise ValueError("Matrices need to have the same number of rows!")
_numba_setop(self.idx_ptr, self.col_idx, self.data, other.idx_ptr, other.col_idx, mode)
_numba_setop(
self.idx_ptr, self.col_idx, self.data, other.idx_ptr, other.col_idx, mode
)
self.eliminate_zeros()

def sort(self):
@@ -192,13 +202,17 @@ def _numba_sort(idx_ptr, col_idx, data):


@numba.njit(parallel=True)
def _numba_setop(self_idx_ptr, self_col_idx, self_data, other_idx_ptr, other_col_idx, intersect):
def _numba_setop(
self_idx_ptr, self_col_idx, self_data, other_idx_ptr, other_col_idx, intersect
):
for i in numba.prange(len(self_idx_ptr) - 1):
ss, se = self_idx_ptr[i], self_idx_ptr[i + 1]
os, oe = other_idx_ptr[i], other_idx_ptr[i + 1]

left_idx = np.searchsorted(other_col_idx[os:oe], self_col_idx[ss:se])
right_idx = np.searchsorted(other_col_idx[os:oe], self_col_idx[ss:se], side="right")
right_idx = np.searchsorted(
other_col_idx[os:oe], self_col_idx[ss:se], side="right"
)
if intersect:
found = left_idx == right_idx
else:
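
For context on the `_numba_setop` reflow above: the kernel compares the left and right `searchsorted` insertion points of `self`'s sorted column indices within `other`'s; equal insertion points mean the index is absent from `other`, and those entries get zeroed so `eliminate_zeros` can drop them. A small NumPy illustration of that membership test:

```python
import numpy as np

other_col_idx = np.array([1, 3, 4, 7])  # sorted column indices of a row in `other`
self_col_idx = np.array([0, 3, 5, 7])   # sorted column indices of that row in `self`

left_idx = np.searchsorted(other_col_idx, self_col_idx)                 # [0, 1, 3, 3]
right_idx = np.searchsorted(other_col_idx, self_col_idx, side="right")  # [0, 2, 3, 4]

absent = left_idx == right_idx  # True where self's index never occurs in other
assert absent.tolist() == [True, False, True, False]
```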