diff --git a/crossfit/__init__.py b/crossfit/__init__.py index 7ff9648..a3f78f0 100644 --- a/crossfit/__init__.py +++ b/crossfit/__init__.py @@ -9,7 +9,6 @@ from crossfit.metric import * # noqa from crossfit.op import * # noqa - __all__ = [ "Aggregator", "backend", @@ -27,21 +26,27 @@ try: - from crossfit.backend.torch import SentenceTransformerModel, TorchExactSearch, HFModel + from crossfit.backend.torch import ( + HFModel, + SentenceTransformerModel, + TorchExactSearch, + ) + from crossfit.dataset.base import IRDataset, MultiDataset + from crossfit.dataset.load import load_dataset from crossfit.report.beir.embed import embed from crossfit.report.beir.report import beir_report - from crossfit.dataset.load import load_dataset - from crossfit.dataset.base import IRDataset, MultiDataset - - __all__.extend([ - "embed", - "beir_report", - "load_dataset", - "TorchExactSearch", - "SentenceTransformerModel", - "HFModel", - "MultiDataset", - "IRDataset", - ]) + + __all__.extend( + [ + "embed", + "beir_report", + "load_dataset", + "TorchExactSearch", + "SentenceTransformerModel", + "HFModel", + "MultiDataset", + "IRDataset", + ] + ) except ImportError as e: pass diff --git a/crossfit/_version.py b/crossfit/_version.py index 5e96abd..54b1fb2 100644 --- a/crossfit/_version.py +++ b/crossfit/_version.py @@ -357,7 +357,10 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, runner=run_command): if verbose: fmt = "tag '%s' doesn't start with prefix '%s'" print(fmt % (full_tag, tag_prefix)) - pieces["error"] = "tag '%s' doesn't start with prefix '%s'" % (full_tag, tag_prefix) + pieces["error"] = "tag '%s' doesn't start with prefix '%s'" % ( + full_tag, + tag_prefix, + ) return pieces pieces["closest-tag"] = full_tag[len(tag_prefix) :] diff --git a/crossfit/backend/__init__.py b/crossfit/backend/__init__.py index 3370763..2fb3d46 100644 --- a/crossfit/backend/__init__.py +++ b/crossfit/backend/__init__.py @@ -1,9 +1,8 @@ -from crossfit.backend.numpy.sparse import * from crossfit.backend.dask.dataframe import * +from crossfit.backend.numpy.sparse import * from crossfit.backend.pandas.array import * from crossfit.backend.pandas.dataframe import * - try: from crossfit.backend.cudf.array import * from crossfit.backend.cudf.dataframe import * diff --git a/crossfit/backend/cudf/array.py b/crossfit/backend/cudf/array.py index 3b04ab2..fa0029c 100644 --- a/crossfit/backend/cudf/array.py +++ b/crossfit/backend/cudf/array.py @@ -1,7 +1,7 @@ import logging from crossfit.data.array import conversion -from crossfit.data.array.dispatch import np_backend_dispatch, ArrayBackend +from crossfit.data.array.dispatch import ArrayBackend, np_backend_dispatch @np_backend_dispatch.register_lazy("cudf") @@ -28,7 +28,9 @@ def cudf_to_dlpack(input_array: cudf.Series): if input_array.dtype.name == "list": if not input_array.list.len().min() == input_array.list.len().max(): - raise NotImplementedError("Cannot convert list column with variable length") + raise NotImplementedError( + "Cannot convert list column with variable length" + ) dim = input_array.list.len().iloc[0] return input_array.list.leaves.values.reshape(-1, dim).toDlpack() diff --git a/crossfit/backend/cudf/dataframe.py b/crossfit/backend/cudf/dataframe.py index 60801e5..243619b 100644 --- a/crossfit/backend/cudf/dataframe.py +++ b/crossfit/backend/cudf/dataframe.py @@ -1,9 +1,8 @@ from typing import Callable - +from crossfit.backend.pandas.dataframe import PandasDataFrame from crossfit.data.array.dispatch import crossarray from crossfit.data.dataframe.dispatch import 
CrossFrame -from crossfit.backend.pandas.dataframe import PandasDataFrame class CudfDataFrame(PandasDataFrame): @@ -24,7 +23,9 @@ def apply(self, func: Callable, *args, **kwargs): # Numba-compilation failed pass with crossarray: - return CrossFrame({k: func(v, *args, **kwargs) for k, v in self.data.items()}).cast() + return CrossFrame( + {k: func(v, *args, **kwargs) for k, v in self.data.items()} + ).cast() @CrossFrame.register_lazy("cupy") diff --git a/crossfit/backend/cudf/series.py b/crossfit/backend/cudf/series.py index e406085..6a67f95 100644 --- a/crossfit/backend/cudf/series.py +++ b/crossfit/backend/cudf/series.py @@ -1,5 +1,5 @@ -import cupy as cp import cudf +import cupy as cp from cudf.core.column import as_column @@ -9,7 +9,9 @@ def create_list_series_from_2d_ar(ar, index): """ n_rows, n_cols = ar.shape data = as_column(ar.flatten()) - offset_col = as_column(cp.arange(start=0, stop=len(data) + 1, step=n_cols), dtype="int32") + offset_col = as_column( + cp.arange(start=0, stop=len(data) + 1, step=n_cols), dtype="int32" + ) mask_col = cp.full(shape=n_rows, fill_value=True) mask = cudf._lib.transform.bools_to_mask(as_column(mask_col)) lc = cudf.core.column.ListColumn( diff --git a/crossfit/backend/cupy/array.py b/crossfit/backend/cupy/array.py index 2991561..d81ac80 100644 --- a/crossfit/backend/cupy/array.py +++ b/crossfit/backend/cupy/array.py @@ -1,7 +1,7 @@ import logging from crossfit.data.array import conversion -from crossfit.data.array.dispatch import np_backend_dispatch, ArrayBackend +from crossfit.data.array.dispatch import ArrayBackend, np_backend_dispatch @np_backend_dispatch.register_lazy("cupy") diff --git a/crossfit/backend/cupy/kernels.py b/crossfit/backend/cupy/kernels.py index 338599e..0e2d554 100644 --- a/crossfit/backend/cupy/kernels.py +++ b/crossfit/backend/cupy/kernels.py @@ -66,7 +66,9 @@ def _numba_sort(idx_ptr, col_idx, data): @cuda.jit -def _numba_setop(self_idx_ptr, self_col_idx, self_data, other_idx_ptr, other_col_idx, intersect): +def _numba_setop( + self_idx_ptr, self_col_idx, self_data, other_idx_ptr, other_col_idx, intersect +): i = cuda.grid(1) if i < len(self_idx_ptr) - 1: diff --git a/crossfit/backend/cupy/sparse.py b/crossfit/backend/cupy/sparse.py index 982b7da..49688ef 100644 --- a/crossfit/backend/cupy/sparse.py +++ b/crossfit/backend/cupy/sparse.py @@ -9,7 +9,9 @@ class CPSparseMatrixBackend(SparseMatrixBackend): - def __init__(self, idx_ptr: cp.ndarray, col_idx: cp.ndarray, data: cp.ndarray, shape=None): + def __init__( + self, idx_ptr: cp.ndarray, col_idx: cp.ndarray, data: cp.ndarray, shape=None + ): if shape is None: if len(col_idx): M = col_idx.max() + 1 @@ -49,7 +51,9 @@ def from_matrix(cls, matrix, keep_zeros=False): if isinstance(matrix, list): matrix = cp.asarray(matrix, dtype=object).astype(cp.float32) matrix = cp.atleast_2d(matrix) - if not cp.issubdtype(matrix.dtype, cp.number) or cp.issubdtype(matrix.dtype, cp.bool_): + if not cp.issubdtype(matrix.dtype, cp.number) or cp.issubdtype( + matrix.dtype, cp.bool_ + ): raise ValueError("Input must be numeric") elif matrix.ndim != 2: raise ValueError("Input arrays need to be 1D or 2D.") @@ -79,7 +83,9 @@ def from_lil(cls, rows, data=None, dtype=cp.float32, keep_zeros=False): data = cp.ones_like(col_idx, dtype=dtype) else: data = cp.fromiter( - itertools.chain.from_iterable(data), dtype=dtype, count=idx_ptr[-1].item() + itertools.chain.from_iterable(data), + dtype=dtype, + count=idx_ptr[-1].item(), ) if keep_zeros: data += 1 - data[cp.isfinite(data)].min() @@ -93,7 +99,9 @@ def 
from_lil(cls, rows, data=None, dtype=cp.float32, keep_zeros=False): return instance def tocsr(self, copy=False): - return sp.csr_matrix((self.data, self.col_idx, self.idx_ptr), copy=copy, shape=self.shape) + return sp.csr_matrix( + (self.data, self.col_idx, self.idx_ptr), copy=copy, shape=self.shape + ) def todense(self): return cp.asarray(self.tocsr().todense()) @@ -153,11 +161,16 @@ def todense_masked(self, shape) -> MaskedArray: return MaskedArray(data, mask) def lookup(self, indices): - from crossfit.backend.cupy.kernels import _numba_lookup, determine_blocks_threads + from crossfit.backend.cupy.kernels import ( + _numba_lookup, + determine_blocks_threads, + ) vals = cp.zeros_like(indices) blocks, threads = determine_blocks_threads(indices.shape[0]) - _numba_lookup[blocks, threads](self.idx_ptr, self.col_idx, self.data, indices, vals) + _numba_lookup[blocks, threads]( + self.idx_ptr, self.col_idx, self.data, indices, vals + ) return cp.asarray(vals) def rank_top_k(self, k=None) -> MaskedArray: diff --git a/crossfit/backend/dask/aggregate.py b/crossfit/backend/dask/aggregate.py index 0d0d782..aa211b1 100644 --- a/crossfit/backend/dask/aggregate.py +++ b/crossfit/backend/dask/aggregate.py @@ -1,8 +1,8 @@ from functools import partial +import dask.dataframe as dd from dask.delayed import Delayed from dask.highlevelgraph import HighLevelGraph -import dask.dataframe as dd from crossfit.calculate.aggregate import Aggregator from crossfit.data.dataframe.dispatch import CrossFrame diff --git a/crossfit/backend/dask/cluster.py b/crossfit/backend/dask/cluster.py index ae7d9f6..bf4e6e8 100644 --- a/crossfit/backend/dask/cluster.py +++ b/crossfit/backend/dask/cluster.py @@ -1,17 +1,16 @@ -from typing import Callable, Optional, Any -from contextvars import ContextVar -import importlib import gc +import importlib import warnings +from contextvars import ContextVar +from typing import Any, Callable, Optional import dask -from dask.distributed import Client, get_client -from dask.dataframe.optimize import optimize as dd_optimize import distributed +from dask.dataframe.optimize import optimize as dd_optimize +from dask.distributed import Client, get_client from crossfit.backend.gpu import HAS_GPU - _crossfit_dask_client = ContextVar("_crossfit_dask_client", default="auto") @@ -117,7 +116,9 @@ def ensure_optimize_dataframe_graph(ddf=None, dsk=None, keys=None): if ddf is None: if dsk is None or keys is None: - raise ValueError("Must specify both `dsk` and `keys` if `ddf` is not supplied.") + raise ValueError( + "Must specify both `dsk` and `keys` if `ddf` is not supplied." + ) dsk = ddf.dask if dsk is None else dsk keys = ddf.__dask_keys__() if keys is None else keys @@ -285,7 +286,9 @@ def _activate(self): self._active = True if self._client in ("auto", None): - raise RuntimeError(f"Failed to deploy a new local {self.cluster_type} cluster.") + raise RuntimeError( + f"Failed to deploy a new local {self.cluster_type} cluster." + ) def _deactivate(self): self._client = set_dask_client(self._initial_client) @@ -380,7 +383,9 @@ def __exit__(self, *args): self.deactivate() -def set_dask_client(client="auto", new_cluster=None, force_new=False, **cluster_options): +def set_dask_client( + client="auto", new_cluster=None, force_new=False, **cluster_options +): """Set the Dask-Distributed client. 
Parameters diff --git a/crossfit/backend/dask/dataframe.py b/crossfit/backend/dask/dataframe.py index 4024301..ca9a3b2 100644 --- a/crossfit/backend/dask/dataframe.py +++ b/crossfit/backend/dask/dataframe.py @@ -3,10 +3,10 @@ from typing import Callable, List import dask.dataframe as dd + from crossfit.data.dataframe.core import FrameBackend from crossfit.data.dataframe.dispatch import CrossFrame - # @CrossFrame.register_lazy("dask") # def register_dask_backend(): # import dask.dataframe as dd @@ -36,7 +36,9 @@ def concat( ignore_index: bool = False, axis: int = 0, ): - return CrossFrame(dd.DataFrame.concat(frames, ignore_index=ignore_index, axis=axis)) + return CrossFrame( + dd.DataFrame.concat(frames, ignore_index=ignore_index, axis=axis) + ) def aggregate(self, agg, **kwargs): from crossfit.backend.dask.aggregate import aggregate as dask_aggregate diff --git a/crossfit/backend/numpy/sparse.py b/crossfit/backend/numpy/sparse.py index e71f606..7f4ca1e 100644 --- a/crossfit/backend/numpy/sparse.py +++ b/crossfit/backend/numpy/sparse.py @@ -1,16 +1,18 @@ import itertools -from crossfit.data.array.masked import MaskedArray +import numba import numpy as np import scipy.sparse as sp -import numba -from crossfit.data.sparse.dispatch import CrossSparse +from crossfit.data.array.masked import MaskedArray from crossfit.data.sparse.core import SparseMatrixBackend +from crossfit.data.sparse.dispatch import CrossSparse class NPSparseMatrixBackend(SparseMatrixBackend): - def __init__(self, idx_ptr: np.ndarray, col_idx: np.ndarray, data: np.ndarray, shape=None): + def __init__( + self, idx_ptr: np.ndarray, col_idx: np.ndarray, data: np.ndarray, shape=None + ): if shape is None: if len(col_idx): M = col_idx.max() + 1 @@ -48,7 +50,9 @@ def from_matrix(cls, matrix, keep_zeros=False): if isinstance(matrix, list): matrix = np.asarray(matrix, dtype=object).astype(np.float32) matrix = np.atleast_2d(matrix) - if not np.issubdtype(matrix.dtype, np.number) or np.issubdtype(matrix.dtype, np.bool_): + if not np.issubdtype(matrix.dtype, np.number) or np.issubdtype( + matrix.dtype, np.bool_ + ): raise ValueError("Input must be numeric") elif matrix.ndim != 2: raise ValueError("Input arrays need to be 1D or 2D.") @@ -71,7 +75,9 @@ def from_lil(cls, rows, data=None, dtype=np.float32, keep_zeros=False): rows = [rows] idx_ptr = np.asarray([0] + [len(x) for x in rows], dtype=int).cumsum() try: - col_idx = np.fromiter(itertools.chain.from_iterable(rows), dtype=int, count=idx_ptr[-1]) + col_idx = np.fromiter( + itertools.chain.from_iterable(rows), dtype=int, count=idx_ptr[-1] + ) if data is None: data = np.ones_like(col_idx, dtype=dtype) else: @@ -90,7 +96,9 @@ def from_lil(cls, rows, data=None, dtype=np.float32, keep_zeros=False): return instance def tocsr(self, copy=False): - return sp.csr_matrix((self.data, self.col_idx, self.idx_ptr), copy=copy, shape=self.shape) + return sp.csr_matrix( + (self.data, self.col_idx, self.idx_ptr), copy=copy, shape=self.shape + ) def todense(self): return np.asarray(self.tocsr().todense()) @@ -108,7 +116,9 @@ def eliminate_zeros(self): def _setop(self, other, mode): if self.shape[0] != other.shape[0]: raise ValueError("Matrices need to have the same number of rows!") - _numba_setop(self.idx_ptr, self.col_idx, self.data, other.idx_ptr, other.col_idx, mode) + _numba_setop( + self.idx_ptr, self.col_idx, self.data, other.idx_ptr, other.col_idx, mode + ) self.eliminate_zeros() def sort(self): @@ -192,13 +202,17 @@ def _numba_sort(idx_ptr, col_idx, data): @numba.njit(parallel=True) -def 
_numba_setop(self_idx_ptr, self_col_idx, self_data, other_idx_ptr, other_col_idx, intersect): +def _numba_setop( + self_idx_ptr, self_col_idx, self_data, other_idx_ptr, other_col_idx, intersect +): for i in numba.prange(len(self_idx_ptr) - 1): ss, se = self_idx_ptr[i], self_idx_ptr[i + 1] os, oe = other_idx_ptr[i], other_idx_ptr[i + 1] left_idx = np.searchsorted(other_col_idx[os:oe], self_col_idx[ss:se]) - right_idx = np.searchsorted(other_col_idx[os:oe], self_col_idx[ss:se], side="right") + right_idx = np.searchsorted( + other_col_idx[os:oe], self_col_idx[ss:se], side="right" + ) if intersect: found = left_idx == right_idx else: diff --git a/crossfit/backend/pandas/dataframe.py b/crossfit/backend/pandas/dataframe.py index 6d48016..8937d72 100644 --- a/crossfit/backend/pandas/dataframe.py +++ b/crossfit/backend/pandas/dataframe.py @@ -103,8 +103,8 @@ def groupby_indices(self, by: list) -> dict: @CrossFrame.register_lazy("numpy") def register_numpy_backend(): try: - import pandas as pd import numpy as np + import pandas as pd @CrossFrame.register(np.ndarray) def _numpy_to_pandas(data, name="data"): diff --git a/crossfit/backend/torch/__init__.py b/crossfit/backend/torch/__init__.py index 93e1f9f..b465973 100644 --- a/crossfit/backend/torch/__init__.py +++ b/crossfit/backend/torch/__init__.py @@ -1,4 +1,20 @@ from crossfit.backend.torch.hf.model import HFModel, SentenceTransformerModel +from crossfit.backend.torch.loader import InMemoryLoader, SortedSeqLoader from crossfit.backend.torch.op.vector_search import TorchExactSearch -__all__ = ["HFModel", "SentenceTransformerModel", "TorchExactSearch"] +__all__ = [ + "HFModel", + "InMemoryLoader", + "SentenceTransformerModel", + "SortedSeqLoader", + "TorchExactSearch", +] + + +try: + from crossfit.backend.torch.currated.generate import CuratedGenerator + from crossfit.backend.torch.currated.tokenize import CurratedTokenizer + + __all__ += ["CuratedGenerator", "CurratedTokenizer"] +except ImportError: + pass diff --git a/crossfit/backend/torch/array.py b/crossfit/backend/torch/array.py index 5322770..508a05c 100644 --- a/crossfit/backend/torch/array.py +++ b/crossfit/backend/torch/array.py @@ -1,7 +1,7 @@ import logging from crossfit.data.array import conversion -from crossfit.data.array.dispatch import np_backend_dispatch, ArrayBackend +from crossfit.data.array.dispatch import ArrayBackend, np_backend_dispatch try: import torch diff --git a/crossfit/backend/torch/currated/__init__.py b/crossfit/backend/torch/currated/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/crossfit/backend/torch/currated/generate.py b/crossfit/backend/torch/currated/generate.py new file mode 100644 index 0000000..9160696 --- /dev/null +++ b/crossfit/backend/torch/currated/generate.py @@ -0,0 +1,582 @@ +import dataclasses +from dataclasses import dataclass +from typing import Generic, List, Optional, Tuple + +import torch +import torch.nn.functional as F +from curated_transformers.generation.auto_generator import AutoGenerator +from curated_transformers.generation.config import ( + GreedyGeneratorConfig, + SampleGeneratorConfig, +) +from curated_transformers.generation.generator import Generator +from curated_transformers.generation.stop_conditions import StopCondition +from curated_transformers.generation.string_generator import StringGenerator +from curated_transformers.layers.attention import AttentionMask +from curated_transformers.layers.cache import KeyValueCache +from curated_transformers.models.output import CacheT + +# from 
curated_transformers.layers.attention import enable_torch_sdp +from curated_transformers.quantization.bnb import BitsAndBytesConfig +from tqdm import tqdm + +from crossfit.backend.torch.loader import InMemoryLoader +from crossfit.op.base import Op + + +class CuratedGenerator(Op): + def __init__( + self, + model_name: str, + config=GreedyGeneratorConfig(), + output_col: str = "text", + batch_size: int = 32, + batch_steps: int = 20, + quantization_config=BitsAndBytesConfig.for_4bit(), + pre=None, + cols=False, + keep_cols=None, + ): + super().__init__(pre=pre, cols=cols, keep_cols=keep_cols) + self.model_name = model_name + self.config = config + self.quantization_config = quantization_config + self.output_col = output_col + self.batch_size = batch_size + self.batch_steps = batch_steps + + def setup(self): + self.generator = AutoGenerator.from_hf_hub( + name=self.model_name, + device=torch.device("cuda"), + quantization_config=self.quantization_config, + ) + self.config = get_config(self.generator, self.config) + + def call(self, data): + generator = PartitionGenerator( + self.generator.generator.tokenizer, + BatchGenerator(self.generator.generator.inner.model), + ) + outputs = generator( + data, + config=self.config, + batch_size=self.batch_size, + max_steps=self.batch_steps, + ) + + output = self.create_df() + output[self.output_col] = self.create_series(outputs) + + return output + + def meta(self): + return {self.output_col: "str"} + + +class BatchGenerator(Generator): + def generate_df( + self, + *, + attention_mask, + ids, + config, + index=None, + generated_ids=None, + cache=None, + max_steps=10, + ): + self.model.eval() + + logits_transform = config.logits_transform() + stop_condition = config.stop_condition() + if isinstance(config, GreedyGeneratorConfig): + generation_step = self._decode_greedy + elif isinstance(config, SampleGeneratorConfig): + generation_step = self._decode_sample + else: + raise ValueError( + f"Unknown generator configuration: {type(config).__name__}" + ) + + state = CuDFGeneratorState( + attention_mask=attention_mask, cache=cache, prompt_ids=ids, index=index + ) + if generated_ids is not None: + state.generated_ids = generated_ids + + state.tokenizer = self.tokenizer + + for i in range(max_steps): + if i == 0 and cache is None and generated_ids is not None: + ids = torch.concat([state.prompt_ids, state.generated_ids], 1) + else: + ids = state.last_step_ids + + with torch.no_grad(): + try: + output = self.model( + ids, + attention_mask=state.attention_mask, + cache=state.cache, + store_cache=True, + positions=state.positions, + ) + except Exception as e: + print( + ids.shape, + state.attention_mask.bool_mask.shape, + state.cache[0].key.shape, + ) + raise e + + state.step( + cache=output.cache, + predicted_ids=generation_step(logits_transform, output), + stop_condition=stop_condition, + ) + + if not state.is_finished: + yield False, state + else: + return True, state + + +class PartitionCache: + def __init__(self, *cache, device="cpu"): + self.cache = list(cache) + self.device = device + + def add(self, batch_cache, index): + cache = [] + for c in batch_cache: + cache.append(KeyValueCache(c.key.to(self.device), c.value.to(self.device))) + + self.cache.append(cache) + + return self + + def pop(self, num: int, max_seq_len: Optional[int] = None, device="cuda"): + if not self.cache: + return None + + outputs = [] + + i = 0 + while i < num: + current = self.cache.pop() + batch_size = current[0].key.size(0) + + if batch_size == 0: + continue + + if i + batch_size <= 
num:
+                outputs.append(
+                    [
+                        KeyValueCache(key=c.key.to(device), value=c.value.to(device))
+                        for c in current
+                    ]
+                )
+                i += batch_size
+            else:
+                left, right = [], []
+
+                for c in current:
+                    left.append(
+                        KeyValueCache(
+                            key=c.key[: num - i].to(device),
+                            value=c.value[: num - i].to(device),
+                        )
+                    )
+                    right.append(
+                        KeyValueCache(key=c.key[num - i :], value=c.value[num - i :])
+                    )
+                i = num
+                outputs.append(left)
+                if right[0].key.size(0) > 0:
+                    self.cache.append(right)
+
+        pad_kwargs = dict(dim=2, min_length=max_seq_len - 1)
+        if len(outputs) == 1:
+            output = outputs[0]
+        else:
+            output = []
+            for i in range(len(outputs[0])):
+                output.append(
+                    KeyValueCache(
+                        key=pad_and_stack([o[i].key for o in outputs], **pad_kwargs),
+                        value=pad_and_stack(
+                            [o[i].value for o in outputs], **pad_kwargs
+                        ),
+                    )
+                )
+
+        if max_seq_len:
+            output = [
+                KeyValueCache(
+                    key=pad_and_stack(
+                        [c.key[:, :, : max_seq_len - 1, :]], **pad_kwargs
+                    ),
+                    value=pad_and_stack(
+                        [c.value[:, :, : max_seq_len - 1, :]], **pad_kwargs
+                    ),
+                )
+                for c in output
+            ]
+
+        return output
+
+
+class PartitionGenerator(StringGenerator):
+    def __call__(self, df, config, batch_size=1024, max_steps=10):
+        return self.generate(
+            df, config=config, batch_size=batch_size, max_steps=max_steps
+        )
+
+    def generate(self, df, config, batch_size=1024, max_steps=10):
+        """
+        Generate text using the given prompts.
+
+        :param df:
+            DataFrame of tokenized prompts to generate from.
+        :param config:
+            Generator configuration.
+        :returns:
+            Strings generated for the prompts.
+        """
+        completed_ids, completed_seq_ids = [], []
+        self.inner.tokenizer = self.tokenizer
+        num_samples = df["input_ids"].size
+        i, n_tokens_generated, n_samples_done = 0, 0, 0
+        it, n_batches = 0, 0
+
+        if "index" not in df.columns:
+            df["index"] = df.index
+        partition = df[["input_ids", "attention_mask", "index"]]
+
+        current_cache, next_cache = PartitionCache(), PartitionCache()
+
+        with tqdm(total=num_samples, desc="Generating...", dynamic_ncols=True) as pbar:
+            while n_samples_done < num_samples:
+                results_list: List[Results] = []
+                for batch in InMemoryLoader(partition, batch_size=batch_size):
+                    ids = batch["input_ids"]
+                    _batch_size = ids.size(0)
+
+                    max_seq_len = (ids != 0).sum(axis=1).max()
+                    ids = ids[:, :max_seq_len]
+                    mask = batch["attention_mask"].to(torch.bool)[:, :max_seq_len]
+                    attention_mask = AttentionMask(mask)
+                    cache = current_cache.pop(_batch_size, max_seq_len=max_seq_len)
+
+                    for generation_step in self.inner.generate_df(
+                        ids=ids,
+                        attention_mask=attention_mask,
+                        config=config,
+                        generated_ids=batch.get("generated_ids"),
+                        index=batch["index"],
+                        cache=cache,
+                        max_steps=max_steps,
+                    ):
+                        n_tokens_generated += _batch_size
+                        _, state = generation_step
+                        pbar.set_postfix(
+                            {
+                                "token": n_tokens_generated,
+                                "batch": n_batches,
+                                "it": it,
+                            },
+                            refresh=True,
+                        )
+
+                        results: Results = state.results()
+                        if state.completed_ids is not None:
+                            _num_samples_done = state.completed_ids.size(0)
+                            pbar.update(_num_samples_done)
+                            n_samples_done += _num_samples_done
+                        completed_ids.append(results.completed_ids)
+                        completed_seq_ids.append(results.completed_seq_ids)
+                        if results.prompt_ids is not None:
+                            results_list.append(results)
+
+                        i += batch["input_ids"].size(0)
+                        n_batches += 1
+
+                        if state.cache is not None:
+                            next_cache.add(state.cache, results.seq_ids)
+
+                    pbar.set_postfix(
+                        {
+                            "token": n_tokens_generated,
+                            "batch": n_batches,
+                            "it": it,
+                        },
+                        refresh=True,
+                    )
+
+                if results_list:
+                    partition = {
+                        "input_ids": pad_and_stack(
+                            [r.prompt_ids for r in results_list]
+                        ),
+                        "generated_ids": pad_and_stack(
+                            [r.generated_ids for r in results_list]
+                        ),
+                        "attention_mask": pad_and_stack(
+                            [r.masks for r in results_list]
+                        ),
+                        "index": torch.concat([r.seq_ids for r in results_list]),
+                    }
+                    it += 1
+                    pbar.set_postfix(
+                        {
+                            "token": n_tokens_generated,
+                            "batch": n_batches,
+                            "it": it,
+                        },
+                        refresh=True,
+                    )
+
+                current_cache = next_cache
+                next_cache = PartitionCache()
+
+        tokens = pad_and_stack(completed_ids)
+        text = self.tokenizer.decode(tokens.tolist())
+
+        return text
+
+
+def get_config(generator, config):
+    eos_id = generator.default_config.eos_id if config.eos_id is None else config.eos_id
+    max_generated_pieces = (
+        generator.default_config.max_generated_pieces
+        if config.max_generated_pieces is None
+        else config.max_generated_pieces
+    )
+    config = dataclasses.replace(
+        config, eos_id=eos_id, max_generated_pieces=max_generated_pieces
+    )
+
+    return config
+
+
+@dataclass
+class Results:
+    completed_ids: torch.Tensor
+    completed_seq_ids: torch.Tensor
+    prompt_ids: Optional[torch.Tensor]
+    generated_ids: Optional[torch.Tensor]
+    seq_ids: Optional[torch.Tensor]
+    masks: Optional[torch.Tensor]
+
+
+class CuDFGeneratorState(Generic[CacheT]):
+    """
+    Stores the state of the generation process and tracks
+    the sequences being generated.
+    """
+
+    attention_mask: AttentionMask
+    cache: Optional[List[CacheT]]
+    positions: torch.Tensor
+    seq_ids: torch.Tensor
+    prompt_ids: torch.Tensor
+    generated_ids: torch.Tensor
+
+    def __init__(
+        self,
+        *,
+        attention_mask: AttentionMask,
+        cache: Optional[List[CacheT]],
+        prompt_ids: torch.Tensor,
+        index=None,
+    ) -> None:
+        """
+        Construct a generator state.
+
+        :param attention_mask:
+            Attention mask for the prompts.
+        :param cache:
+            Transformer model cache.
+        :param prompt_ids:
+            Batch of prompts.
+
+            *Shape:* ``(batch_size, seq_len)``
+        """
+        device = prompt_ids.device
+        self.attention_mask = attention_mask
+        self.positions = attention_mask.bool_mask.int().cumsum(-1) - 1
+        self.cache = cache
+
+        if cache is not None:
+            self.positions = self.positions.max(-1, keepdim=True).values + 1
+
+        self.index = index
+        self.seq_ids = torch.arange(0, self.attention_mask.shape[0], device=device)
+        self.prompt_ids = prompt_ids
+        self.generated_ids = torch.zeros(
+            (prompt_ids.size(0), 0), dtype=prompt_ids.dtype, device=device
+        )
+        self.completed_ids, self.completed_seq_ids = None, None
+
+    @property
+    def is_finished(self):
+        """
+        Whether all sequences have finished generating.
+
+        :returns:
+            ``True`` iff all sequences have finished generating.
+        """
+        return len(self.seq_ids) == 0
+
+    @property
+    def last_step_ids(self) -> torch.Tensor:
+        """
+        Identifiers generated in the last step.
+
+        :returns:
+            Generated identifiers. Prompt identifiers are returned
+            when the generator has not stepped yet.
+        """
+        if not self.generated_ids.size(1):
+            return self.prompt_ids
+
+        return self.generated_ids[:, -1:]
+
+    def step(
+        self,
+        *,
+        cache: List[CacheT],
+        predicted_ids: torch.Tensor,
+        stop_condition: StopCondition,
+    ) -> Tuple[torch.Tensor, torch.Tensor]:
+        """
+        Step the generation state.
+
+        :param cache:
+            Model cache from the last model call.
+        :param predicted_ids:
+            Tensor containing generated IDs.
+
+            *Shape:* ``(batch_size, 1)``
+        :param stop_condition:
+            Generation stop condition.
+        :returns:
+            Sequence identifiers and piece IDs.
+
+            *Shape:* ``(batch_size), (batch_size, 1)``
+        """
+        # We update the state before removing completed sequences, so that
+        # stopping conditions get a consistent view.
+        self.cache = cache
+        self.generated_ids = torch.concat([self.generated_ids, predicted_ids], 1)
+        self.attention_mask = self.attention_mask.extend_length(
+            count=1, fill_value=True
+        )
+        self.positions = self.positions.max(-1, keepdim=True).values + 1
+
+        # Determine which sequences are done generating and remove them.
+        completed_exclude = torch.zeros_like(predicted_ids, dtype=torch.bool)
+        completed_include = torch.zeros_like(predicted_ids, dtype=torch.bool)
+        stop_condition.update_completed(
+            state=self,
+            completed_exclude=completed_exclude,
+            completed_include=completed_include,
+        )
+
+        self._remove_completed((completed_exclude ^ completed_include).view(-1))
+
+        # return seq_ids, last_step_ids
+
+    def _remove_completed(self, completed: torch.Tensor):
+        """
+        Remove completed sequences.
+
+        :param completed:
+            Tensor indicating for the active sequences whether they are completed.
+
+        :meta private:
+        """
+        if torch.any(completed).item():
+            if self.completed_ids is None:
+                self.completed_ids = self.generated_ids[completed]
+                self.completed_seq_ids = self.seq_ids[completed]
+            else:
+                self.completed_ids = pad_and_stack(
+                    [self.completed_ids, self.generated_ids[completed]]
+                )
+                self.completed_seq_ids = torch.concat(
+                    [self.completed_seq_ids, self.seq_ids[completed]]
+                )
+
+            not_completed = completed.logical_not()
+            self.generated_ids = self.generated_ids[not_completed]
+            self.attention_mask = self.attention_mask.filter_batch_items(not_completed)
+            if self.cache is not None:
+                self.cache = [
+                    layer_cache.filter_batch_items(not_completed)
+                    for layer_cache in self.cache
+                ]
+            self.prompt_ids = self.prompt_ids[not_completed]
+            self.positions = self.positions[not_completed]
+            self.seq_ids = self.seq_ids[not_completed]
+
+    def get_ids(self, seq_ids):
+        return self.index[seq_ids] if self.index is not None else seq_ids
+
+    def results(self) -> Results:
+        prompt_ids, generated_ids, seq_ids, masks, completed_seq_ids = (
+            None,
+            None,
+            None,
+            None,
+            None,
+        )
+        if not self.is_finished:
+            prompt_ids = self.prompt_ids
+            if self.generated_ids.size(1):
+                generated_ids = self.generated_ids
+            masks = self.attention_mask.bool_mask.reshape(generated_ids.size(0), -1)
+            seq_ids = self.get_ids(self.seq_ids)
+
+        completed_ids = self.completed_ids
+        if completed_ids is not None:
+            completed_seq_ids = self.get_ids(self.completed_seq_ids)
+
+        return Results(
+            completed_ids=completed_ids,
+            completed_seq_ids=completed_seq_ids,
+            prompt_ids=prompt_ids,
+            generated_ids=generated_ids,
+            seq_ids=seq_ids,
+            masks=masks,
+        )
+
+
+def pad_and_stack(tensor_list, dim=1, min_length=None):
+    # Find the maximum size along the specified dimension
+    max_size = max(tensor.shape[dim] for tensor in tensor_list)
+    max_size = max_size if min_length is None else max(max_size, min_length)
+
+    # Initialize a list to store the padded tensors
+    padded_tensors = []
+
+    for tensor in tensor_list:
+        # Calculate the amount of padding needed for this tensor
+        padding_needed = max_size - tensor.shape[dim]
+
+        # Build the padding tuple for F.pad; F.pad expects (left, right) pairs
+        # ordered from the last dimension backwards, and only the entry at
+        # index `dim` is set to the required amount of padding
+        pad = tuple(
+            0 if i != dim else padding_needed for i in range(len(tensor.shape) * 2)
+        )
+
+        # Apply the padding
+        padded_tensor = F.pad(tensor, pad=pad, mode="constant", value=0)
+
+        # Add the padded tensor to the list
+        padded_tensors.append(padded_tensor)
+
+    # Stack along the specified dimension
+    return torch.cat(padded_tensors, dim=0)
diff --git a/crossfit/backend/torch/currated/tokenize.py b/crossfit/backend/torch/currated/tokenize.py
new file mode 100644
index 0000000..51498a3
--- /dev/null
+++ b/crossfit/backend/torch/currated/tokenize.py
@@ -0,0 +1,79 @@
+from typing import Optional
+
+import cudf
+import cupy as cp
+import torch
+from curated_transformers.tokenizers import AutoTokenizer, Tokenizer
+
+from crossfit.backend.cudf.series import create_list_series_from_2d_ar
+from crossfit.data.array.conversion import convert_array
+from crossfit.op.tokenize import _TokenizerOp, clip_tokens
+
+
+class CurratedTokenizer(_TokenizerOp):
+    def __init__(
+        self,
+        tokenizer: Tokenizer,
+        cols=None,
+        keep_cols=None,
+        pre=None,
+        pad_left=False,
+        max_length: Optional[int] = None,
+    ):
+        super().__init__(pre=pre, cols=cols, keep_cols=keep_cols)
+        self.tokenizer = tokenizer
+        self.max_length = max_length
+        self.pad_left = pad_left
+
+    @classmethod
+    def from_hf_hub(
+        cls,
+        *,
+        name: str,
+        cols=None,
+        revision: str = "main",
+    ) -> "CurratedTokenizer":
+        try:
+            tokenizer = AutoTokenizer.from_hf_hub(name=name, revision=revision)
+        except Exception as e:
+            if "llama" in name:
+                raise Exception(
+                    "Couldn't access model, make sure you are logged in and have access to the model. "
+                    + "Find instructions at: https://huggingface.co/meta-llama/Llama-2-7b-hf"
+                )
+
+            raise e
+
+        return cls(tokenizer, cols=cols)
+
+    def tokenize_strings(self, sentences):
+        pieces = self.tokenizer(sentences.to_arrow().to_pylist())
+        ids = pieces.padded_tensor(device="cuda", pad_left=self.pad_left)
+        mask = pieces.attention_mask(device="cuda", pad_left=self.pad_left)
+        attention_mask = mask.bool_mask.to(torch.int8)
+
+        return {
+            "input_ids": convert_array(ids, cp.ndarray),
+            "attention_mask": convert_array(attention_mask, cp.ndarray),
+        }
+
+    def call_column(self, data):
+        if isinstance(data, cudf.DataFrame):
+            raise ValueError(
+                "data must be a Series, got DataFrame. 
Add a pre step to convert to Series" + ) + + text = data + tokenized_data = self.tokenize_strings(text) + if self.max_length: + tokenized_data = clip_tokens(tokenized_data, max_length=self.max_length) + + input_ids = create_list_series_from_2d_ar( + tokenized_data["input_ids"].astype("int32"), data.index + ) + attention_mask = create_list_series_from_2d_ar( + tokenized_data["attention_mask"].astype("int32").reshape(len(data), -1), + data.index, + ) + + return input_ids, attention_mask diff --git a/crossfit/backend/torch/hf/model.py b/crossfit/backend/torch/hf/model.py index 5c09376..bfb0036 100644 --- a/crossfit/backend/torch/hf/model.py +++ b/crossfit/backend/torch/hf/model.py @@ -1,15 +1,16 @@ -from functools import lru_cache import gc import os -from crossfit.dataset.home import CF_HOME -import joblib +from functools import lru_cache +import joblib import numpy as np import torch +from sklearn.linear_model import LinearRegression from tqdm import tqdm from transformers import AutoConfig, AutoModel, AutoTokenizer -from sklearn.linear_model import LinearRegression + from crossfit.backend.torch.model import Model +from crossfit.dataset.home import CF_HOME class HFModel(Model): @@ -62,13 +63,19 @@ def fit_memory_estimate_curve(self, model=None): torch.cuda.reset_peak_memory_stats() batch = { - "input_ids": torch.randint(1, 501, (batch_size, seq_len)).to(device=device), - "attention_mask": torch.ones((batch_size, seq_len)).to(device=device), + "input_ids": torch.randint(1, 501, (batch_size, seq_len)).to( + device=device + ), + "attention_mask": torch.ones((batch_size, seq_len)).to( + device=device + ), } try: outputs = model(batch) - memory_used = torch.cuda.max_memory_allocated() / (1024**2) # Convert to MB + memory_used = torch.cuda.max_memory_allocated() / ( + 1024**2 + ) # Convert to MB X.append([batch_size, seq_len, seq_len**2]) y.append(memory_used) diff --git a/crossfit/backend/torch/loader.py b/crossfit/backend/torch/loader.py index 9edbc89..87b90d4 100644 --- a/crossfit/backend/torch/loader.py +++ b/crossfit/backend/torch/loader.py @@ -1,12 +1,12 @@ -from typing import Dict, overload from itertools import islice +from typing import Dict, overload import torch from crossfit.backend.torch.model import Model -from crossfit.data.dataframe.dispatch import CrossFrame -from crossfit.data.array.dispatch import crossarray from crossfit.data.array.conversion import convert_array +from crossfit.data.array.dispatch import crossarray +from crossfit.data.dataframe.dispatch import CrossFrame class InMemoryLoader: @@ -51,6 +51,7 @@ def __next__(self): batch = { key: val[self.current_idx : end] for key, val in self.tensor_dict.items() } + if self.max_seq_len is not None: batch = {key: val[:, : self.max_seq_len] for key, val in batch.items()} diff --git a/crossfit/backend/torch/op/embed.py b/crossfit/backend/torch/op/embed.py index 09cdce6..eef9673 100644 --- a/crossfit/backend/torch/op/embed.py +++ b/crossfit/backend/torch/op/embed.py @@ -1,13 +1,13 @@ import gc -import cupy as cp import cudf +import cupy as cp import torch -from crossfit.op.base import Op from crossfit.backend.cudf.series import create_list_series_from_2d_ar +from crossfit.backend.torch.loader import InMemoryLoader, SortedSeqLoader from crossfit.backend.torch.model import Model -from crossfit.backend.torch.loader import SortedSeqLoader, InMemoryLoader +from crossfit.op.base import Op class Embedder(Op): diff --git a/crossfit/backend/torch/op/vector_search.py b/crossfit/backend/torch/op/vector_search.py index a2422ed..ac86aaf 100644 
--- a/crossfit/backend/torch/op/vector_search.py +++ b/crossfit/backend/torch/op/vector_search.py @@ -21,7 +21,10 @@ def __init__( self.embedding_col = embedding_col self.normalize = False self.score_functions = {"cos_sim": utils.cos_sim, "dot": utils.dot_score} - self.score_function_desc = {"cos_sim": "Cosine Similarity", "dot": "Dot Product"} + self.score_function_desc = { + "cos_sim": "Cosine Similarity", + "dot": "Dot Product", + } def search_tensors(self, queries, corpus): queries = convert_array(queries, torch.Tensor) diff --git a/crossfit/calculate/aggregate.py b/crossfit/calculate/aggregate.py index ef9abca..8140605 100644 --- a/crossfit/calculate/aggregate.py +++ b/crossfit/calculate/aggregate.py @@ -3,8 +3,9 @@ from functools import wraps import numpy as np -from crossfit.data.dataframe.core import FrameBackend + from crossfit.data.array.conversion import convert_array +from crossfit.data.dataframe.core import FrameBackend def pre_processing(func): @@ -144,7 +145,9 @@ def present(self, state, to_frame=True): if isinstance(keys[0], str): # return pd.DataFrame(present_dict) - result = {k: convert_array(v, np.ndarray) for k, v in present_dict.items()} + result = { + k: convert_array(v, np.ndarray) for k, v in present_dict.items() + } return pd.DataFrame.from_dict(result, orient="index").T @@ -153,7 +156,11 @@ def present(self, state, to_frame=True): if columns and groupings != {None}: for k, v in present_dict.items(): - grouping = "&".join(k.grouping) if isinstance(k.grouping, tuple) else k.grouping + grouping = ( + "&".join(k.grouping) + if isinstance(k.grouping, tuple) + else k.grouping + ) if isinstance(k.group, tuple): if len(k.group) > 1: group = "&".join([str(i) for i in k.group]) @@ -170,7 +177,9 @@ def present(self, state, to_frame=True): ) else: new[(grouping, group, k.column)].update({k.name: v}) - index = pd.MultiIndex.from_tuples(new.keys(), names=("grouping", "group", "column")) + index = pd.MultiIndex.from_tuples( + new.keys(), names=("grouping", "group", "column") + ) output = pd.DataFrame.from_records(list(new.values()), index=index) if columns == {None}: diff --git a/crossfit/calculate/module.py b/crossfit/calculate/module.py index 1276e53..a28bf9e 100644 --- a/crossfit/calculate/module.py +++ b/crossfit/calculate/module.py @@ -1,7 +1,6 @@ -from typing import Dict, List -from dataclasses import field, MISSING, Field from copy import deepcopy - +from dataclasses import MISSING, Field, field +from typing import Dict, List from typing_utils import get_origin diff --git a/crossfit/data/__init__.py b/crossfit/data/__init__.py index 07175b2..0bb6275 100644 --- a/crossfit/data/__init__.py +++ b/crossfit/data/__init__.py @@ -1,15 +1,13 @@ +from crossfit.data.array import conversion +from crossfit.data.array.conversion import convert_array from crossfit.data.array.dispatch import ( - crossarray, - numpy, ArrayBackend, + crossarray, np_backend_dispatch, + numpy, ) -from crossfit.data.array import conversion -from crossfit.data.array.conversion import convert_array - -from crossfit.data.dataframe.dispatch import CrossFrame from crossfit.data.dataframe.core import FrameBackend - +from crossfit.data.dataframe.dispatch import CrossFrame __all__ = [ "crossarray", diff --git a/crossfit/data/array/conversion.py b/crossfit/data/array/conversion.py index 827c9ad..74d3f74 100644 --- a/crossfit/data/array/conversion.py +++ b/crossfit/data/array/conversion.py @@ -1,5 +1,5 @@ -from typing import Any, Type, TypeVar from itertools import product +from typing import Any, Type, TypeVar import 
numpy as np from dask.utils import Dispatch @@ -113,7 +113,9 @@ def supports(self): to_types = dispatch_utils.supports(to_) if from_types and to_types: - types = [t for t in set(product(from_types, to_types)) if len(set(t)) > 1] + types = [ + t for t in set(product(from_types, to_types)) if len(set(t)) > 1 + ] for from_t, to_t in types: if from_t in conversions: diff --git a/crossfit/data/array/decorator.py b/crossfit/data/array/decorator.py index 5609919..bcb467d 100644 --- a/crossfit/data/array/decorator.py +++ b/crossfit/data/array/decorator.py @@ -1,18 +1,18 @@ -from typing import Set, TypeVar, Union, Optional, List -from copy import deepcopy -import sys import ast -from pathlib import Path import functools import inspect +import sys import types +from copy import deepcopy from itertools import zip_longest +from pathlib import Path +from typing import List, Optional, Set, TypeVar, Union import astunparse import numpy as np -from crossfit.array import numpy as cnp, np_backend_dispatch - +from crossfit.array import np_backend_dispatch +from crossfit.array import numpy as cnp _CALL_HANDLER_ID = "__crossfit_call_handler__" _CLOSURE_WRAPPER_ID = "__crossfit_closure_wrapper__" diff --git a/crossfit/data/array/ops.py b/crossfit/data/array/ops.py index 9e9f273..76ee2e1 100644 --- a/crossfit/data/array/ops.py +++ b/crossfit/data/array/ops.py @@ -1,6 +1,6 @@ import numpy as np -from crossfit.data.array.dispatch import with_dispatch, np_backend_dispatch +from crossfit.data.array.dispatch import np_backend_dispatch, with_dispatch dtype = with_dispatch(np.dtype) errstate = np.errstate diff --git a/crossfit/data/dataframe/core.py b/crossfit/data/dataframe/core.py index f89cade..f4fbec8 100644 --- a/crossfit/data/dataframe/core.py +++ b/crossfit/data/dataframe/core.py @@ -97,7 +97,9 @@ def groupby_partition(self, by: list): """ if isinstance(by, (str, int, tuple)): by = [by] - return {key: self.take(indices) for key, indices in self.groupby_indices(by).items()} + return { + key: self.take(indices) for key, indices in self.groupby_indices(by).items() + } def cast(self, columns: type | dict | None = None, backend: type | bool = True): """Cast column types and/or frame backend @@ -130,7 +132,9 @@ def cast(self, columns: type | dict | None = None, backend: type | bool = True): new_columns = {} for col, typ in columns.items(): if col not in frame.columns: - raise ValueError(f"{col} not in available columns: {frame.columns}") + raise ValueError( + f"{col} not in available columns: {frame.columns}" + ) try: new_columns[col] = cf.convert_array(frame[col], typ) except TypeError as err: @@ -341,7 +345,9 @@ def __len__(self): if _len is None: _len = len(v) elif len(v) != _len: - raise ValueError(f"Column {k} was length {len(v)}, but expected length {_len}") + raise ValueError( + f"Column {k} was length {len(v)}, but expected length {_len}" + ) return _len @property @@ -367,7 +373,9 @@ def concat( columns = frames[0].columns for frame in frames: if type(frame) != cls: - raise TypeError(f"All frames should be type {cls}, got {type(frame)}") + raise TypeError( + f"All frames should be type {cls}, got {type(frame)}" + ) if columns != frame.columns: raise TypeError("Cannot concatenat misaligned columns") @@ -380,7 +388,9 @@ def concat( combined = {} for frame in frames: if type(frame) != cls: - raise TypeError(f"All frames should be type {cls}, got {type(frame)}") + raise TypeError( + f"All frames should be type {cls}, got {type(frame)}" + ) _columns = set(frame.columns) if _columns.intersection(columns): 
intersection = _columns.intersection(columns) @@ -407,7 +417,9 @@ def assign(self, **kwargs): data = self.data.copy() for k, v in kwargs.items(): if self.columns and len(v) != len(self): - raise ValueError(f"Column {k} was length {len(v)}, but expected length {len(self)}") + raise ValueError( + f"Column {k} was length {len(v)}, but expected length {len(self)}" + ) data.update(**kwargs) return self.__class__(data) @@ -431,10 +443,12 @@ def take(self, indices, axis=0): assert axis == 0 # TODO: Support axis=1 with crossarray: - return self.__class__({k: np.take(v, indices, axis=axis) for k, v in self.data.items()}) + return self.__class__( + {k: np.take(v, indices, axis=axis) for k, v in self.data.items()} + ) def groupby_indices(self, by: list) -> dict: - + if isinstance(by, (str, int, tuple)): by = [by] diff --git a/crossfit/data/dataframe/dispatch.py b/crossfit/data/dataframe/dispatch.py index c4d777a..3f71dc7 100644 --- a/crossfit/data/dataframe/dispatch.py +++ b/crossfit/data/dataframe/dispatch.py @@ -9,20 +9,19 @@ def __call__(self, data, *args, **kwargs): return data # TODO: Fix this - from crossfit.backend.pandas.dataframe import PandasDataFrame from crossfit.backend.dask.dataframe import DaskDataFrame - + from crossfit.backend.pandas.dataframe import PandasDataFrame + backends = [PandasDataFrame, DaskDataFrame] - try: from crossfit.backend.cudf.dataframe import CudfDataFrame + CudfDataFrame._lib() backends.append(CudfDataFrame) except ImportError: pass - for backend in backends: if isinstance(data, getattr(backend._lib(), "DataFrame")): return backend(data, *args, **kwargs) diff --git a/crossfit/data/sparse/dispatch.py b/crossfit/data/sparse/dispatch.py index 2f5ce41..d21a21a 100644 --- a/crossfit/data/sparse/dispatch.py +++ b/crossfit/data/sparse/dispatch.py @@ -36,7 +36,9 @@ def from_matrix(cls, matrix, keep_zeros=False) -> SparseMatrixProtocol: return cross_cls.from_matrix(matrix, keep_zeros=keep_zeros) @classmethod - def from_lil(cls, rows, data=None, dtype="float32", keep_zeros=False) -> SparseMatrixProtocol: + def from_lil( + cls, rows, data=None, dtype="float32", keep_zeros=False + ) -> SparseMatrixProtocol: cross_cls = CrossSparse(rows) return cross_cls.from_lil(rows, data=data, dtype=dtype, keep_zeros=keep_zeros) diff --git a/crossfit/data/sparse/ranking.py b/crossfit/data/sparse/ranking.py index 929ff10..076470a 100644 --- a/crossfit/data/sparse/ranking.py +++ b/crossfit/data/sparse/ranking.py @@ -1,9 +1,10 @@ import warnings -from crossfit.data.array.masked import MaskedArray + import numpy as np -from crossfit.data.sparse.dispatch import CrossSparse, SparseMatrixProtocol from crossfit.data.array.dispatch import crossarray +from crossfit.data.array.masked import MaskedArray +from crossfit.data.sparse.dispatch import CrossSparse, SparseMatrixProtocol class SparseLabels: @@ -33,7 +34,9 @@ def get_labels_for(self, ranking: "SparseRankings", k=None) -> MaskedArray: return MaskedArray(retrieved, indices.mask) def as_rankings(self): - return SparseRankings.from_scores(self._labels.tocsr(copy=True), warn_empty=False) + return SparseRankings.from_scores( + self._labels.tocsr(copy=True), warn_empty=False + ) @property def labels(self) -> SparseMatrixProtocol: @@ -154,7 +157,8 @@ def _verify_input(cls, arr, dtype=np.floating): if np.issubdtype(dtype, np.floating): if not np.all(np.isfinite(arr)): warnings.warn( - "Input contains NaN or Inf entries which will be ignored.", InvalidValuesWarning + "Input contains NaN or Inf entries which will be ignored.", + InvalidValuesWarning, ) 
arr[~np.isfinite(arr)] = np.NINF elif not np.issubdtype(dtype, np.integer): @@ -175,12 +179,19 @@ def from_ranked_indices(cls, indices, valid_items=None, invalid_items=None): @classmethod def from_scores( - cls, raw_scores, valid_items=None, invalid_items=None, warn_empty=True, k_max=None + cls, + raw_scores, + valid_items=None, + invalid_items=None, + warn_empty=True, + k_max=None, ): raw_scores = cls._verify_input(raw_scores, dtype=np.floating) if valid_items is not None: - invalid_idx = CrossSparse.from_nonzero_indices(invalid_items).csr.toarray() == 0 + invalid_idx = ( + CrossSparse.from_nonzero_indices(invalid_items).csr.toarray() == 0 + ) raw_scores -= np.inf * invalid_idx if invalid_items is not None: invalid_items = CrossSparse.from_nonzero_indices(invalid_items).csr @@ -226,7 +237,8 @@ def __init__(self, indices, valid_items=None, invalid_items=None, warn_empty=Tru indices.difference(invalid_items) if not indices.isfinite(): warnings.warn( - "Input contains NaN or Inf entries which will be ignored.", InvalidValuesWarning + "Input contains NaN or Inf entries which will be ignored.", + InvalidValuesWarning, ) indices.remove_infinite() n_empty_rows = indices.count_empty_rows() @@ -265,7 +277,9 @@ def from_ranked_indices(cls, indices, valid_items=None, invalid_items=None): return cls(indices, valid_items, invalid_items) @classmethod - def from_scores(cls, raw_scores, valid_items=None, invalid_items=None, warn_empty=True): + def from_scores( + cls, raw_scores, valid_items=None, invalid_items=None, warn_empty=True + ): """ Construct a rankings instance from raw scores where each item's score is specified. Items will be ranked in descending order (higher scores meaning better). @@ -312,7 +326,9 @@ def topk(x, k, return_scores=False): # stable argsort in descending order top_idx_local = top_k_partition.shape[1] - 1 - top_idx_local -= np.fliplr(np.argsort(np.fliplr(top_k_partition), axis=-1, kind="stable")) + top_idx_local -= np.fliplr( + np.argsort(np.fliplr(top_k_partition), axis=-1, kind="stable") + ) # sort the top partition top_idx = np.take_along_axis(index_array, top_idx_local, axis=-1) diff --git a/crossfit/dataset/base.py b/crossfit/dataset/base.py index cc3a126..abd6b62 100644 --- a/crossfit/dataset/base.py +++ b/crossfit/dataset/base.py @@ -65,7 +65,9 @@ def __init__( val: Optional[Union[Dataset, str]] = None, test: Optional[Union[Dataset, str]] = None, ): - self.train: Optional[Dataset] = Dataset(train) if isinstance(train, str) else train + self.train: Optional[Dataset] = ( + Dataset(train) if isinstance(train, str) else train + ) self.val: Optional[Dataset] = Dataset(val) if isinstance(val, str) else val self.test: Optional[Dataset] = Dataset(test) if isinstance(test, str) else test @@ -81,10 +83,14 @@ def __init__( query: Optional[Union[Dataset, str]] = None, item: Optional[Union[Dataset, str]] = None, ): - self.train: Optional[Dataset] = Dataset(train) if isinstance(train, str) else train + self.train: Optional[Dataset] = ( + Dataset(train) if isinstance(train, str) else train + ) self.val: Optional[Dataset] = Dataset(val) if isinstance(val, str) else val self.test: Optional[Dataset] = Dataset(test) if isinstance(test, str) else test - self.query: Optional[Dataset] = Dataset(query) if isinstance(query, str) else query + self.query: Optional[Dataset] = ( + Dataset(query) if isinstance(query, str) else query + ) self.item: Optional[Dataset] = Dataset(item) if isinstance(item, str) else item diff --git a/crossfit/dataset/beir/load.py b/crossfit/dataset/beir/load.py index 
d38c219..ed588f1 100644 --- a/crossfit/dataset/beir/load.py +++ b/crossfit/dataset/beir/load.py @@ -16,7 +16,9 @@ def load_dataset( ) -> IRDataset: raw_path = download_raw(name, out_dir=out_dir, overwrite=False) - return _process_data(name, raw_path, blocksize=blocksize, overwrite=overwrite, out_dir=out_dir) + return _process_data( + name, raw_path, blocksize=blocksize, overwrite=overwrite, out_dir=out_dir + ) def load_test_dataset( @@ -28,11 +30,18 @@ def load_test_dataset( raw_path = sample_raw(name, out_dir=out_dir, overwrite=False) return _process_data( - name, raw_path, blocksize=blocksize, overwrite=overwrite, out_dir=out_dir, is_test=True + name, + raw_path, + blocksize=blocksize, + overwrite=overwrite, + out_dir=out_dir, + is_test=True, ) -def _process_data(name, raw_path, blocksize=2**30, overwrite=False, out_dir=None, is_test=False): +def _process_data( + name, raw_path, blocksize=2**30, overwrite=False, out_dir=None, is_test=False +): import dask_cudf out_dir = out_dir or CF_HOME @@ -42,11 +51,17 @@ def _process_data(name, raw_path, blocksize=2**30, overwrite=False, out_dir=None # Check if the output directory already exists if os.path.exists(processed_dir): if overwrite: - print("Processed directory {} already exists. Overwriting.".format(processed_dir)) + print( + "Processed directory {} already exists. Overwriting.".format( + processed_dir + ) + ) shutil.rmtree(processed_dir) # Remove the existing directory else: print( - "Processed directory {} already exists. Skipping processing.".format(processed_dir) + "Processed directory {} already exists. Skipping processing.".format( + processed_dir + ) ) return IRDataset.from_dir(processed_dir) @@ -76,7 +91,9 @@ def _process_data(name, raw_path, blocksize=2**30, overwrite=False, out_dir=None corpus_ddf.to_parquet(corpus_dir) qrels_dir = os.path.join(processed_dir, "qrels") - qrels_files = [f for f in os.listdir(os.path.join(raw_path, "qrels")) if f.endswith(".tsv")] + qrels_files = [ + f for f in os.listdir(os.path.join(raw_path, "qrels")) if f.endswith(".tsv") + ] qrels_dtypes = {"query-id": "str", "corpus-id": "str", "score": "int32"} dataset_dirs = {"query": queries_dir, "item": corpus_dir} name_mapping = {"train": "train", "dev": "val", "test": "test"} diff --git a/crossfit/dataset/beir/raw.py b/crossfit/dataset/beir/raw.py index 7b0ce98..f43488b 100644 --- a/crossfit/dataset/beir/raw.py +++ b/crossfit/dataset/beir/raw.py @@ -4,6 +4,7 @@ from typing import Dict, List, Union from beir import util + from crossfit.dataset.home import CF_HOME @@ -203,7 +204,9 @@ class DatasetInfo: def download_raw(name, out_dir=None, overwrite=False) -> str: if name not in BEIR_DATASETS: raise ValueError( - "Dataset {} not found. Available datasets: {}".format(name, BEIR_DATASETS.keys()) + "Dataset {} not found. Available datasets: {}".format( + name, BEIR_DATASETS.keys() + ) ) out_dir = out_dir or CF_HOME @@ -213,10 +216,16 @@ def download_raw(name, out_dir=None, overwrite=False) -> str: # Check if the output directory already exists if os.path.exists(output_path): if overwrite: - print("Output directory {} already exists. Overwriting.".format(output_path)) + print( + "Output directory {} already exists. Overwriting.".format(output_path) + ) shutil.rmtree(output_path) # Remove the existing directory else: - print("Output directory {} already exists. Skipping download.".format(output_path)) + print( + "Output directory {} already exists. 
Skipping download.".format( + output_path + ) + ) return output_path os.makedirs(output_path, exist_ok=True) @@ -232,7 +241,9 @@ def download_raw(name, out_dir=None, overwrite=False) -> str: return output_path -def sample_raw(name, out_dir=None, overwrite=False, sample_size=100, blocksize=2**30) -> str: +def sample_raw( + name, out_dir=None, overwrite=False, sample_size=100, blocksize=2**30 +) -> str: import cudf import dask_cudf @@ -242,7 +253,9 @@ def sample_raw(name, out_dir=None, overwrite=False, sample_size=100, blocksize=2 sampled_dir = os.path.join(out_dir, "sampled") output_path = os.path.join(sampled_dir, name) - qrels_files = [f for f in os.listdir(os.path.join(full_path, "qrels")) if f.endswith(".tsv")] + qrels_files = [ + f for f in os.listdir(os.path.join(full_path, "qrels")) if f.endswith(".tsv") + ] sampled_query_ids, sampled_corpus_id = set(), set() qrel_dfs = {} diff --git a/crossfit/dataset/home.py b/crossfit/dataset/home.py index 8cc21fb..157a651 100644 --- a/crossfit/dataset/home.py +++ b/crossfit/dataset/home.py @@ -1,4 +1,3 @@ import os - CF_HOME = os.environ.get("CF_HOME", os.path.join(os.path.expanduser("~"), ".cf")) diff --git a/crossfit/dataset/load.py b/crossfit/dataset/load.py index a7dac12..623f7c7 100644 --- a/crossfit/dataset/load.py +++ b/crossfit/dataset/load.py @@ -1,7 +1,9 @@ from crossfit.dataset.beir import load as beir -def load_dataset(name, out_dir=None, blocksize=2**30, overwrite=False, tiny_sample=False): +def load_dataset( + name, out_dir=None, blocksize=2**30, overwrite=False, tiny_sample=False +): load_fn_name = "load_dataset" if not tiny_sample else "load_test_dataset" if name.startswith("beir/"): diff --git a/crossfit/metric/__init__.py b/crossfit/metric/__init__.py index 87e9cc4..3d6900a 100644 --- a/crossfit/metric/__init__.py +++ b/crossfit/metric/__init__.py @@ -1,11 +1,10 @@ +from crossfit.metric.categorical.str_len import MeanStrLength +from crossfit.metric.categorical.value_counts import ValueCounts +from crossfit.metric.continuous.max import Max from crossfit.metric.continuous.mean import Mean, create_mean_metric from crossfit.metric.continuous.min import Min -from crossfit.metric.continuous.max import Max from crossfit.metric.continuous.sum import Sum -from crossfit.metric.categorical.value_counts import ValueCounts -from crossfit.metric.categorical.str_len import MeanStrLength - __all__ = [ "create_mean_metric", "Mean", diff --git a/crossfit/metric/base.py b/crossfit/metric/base.py index 6d7dc4e..a907cea 100644 --- a/crossfit/metric/base.py +++ b/crossfit/metric/base.py @@ -3,9 +3,9 @@ import numpy as np +from crossfit.calculate.aggregate import Aggregator from crossfit.calculate.module import CrossModule, state from crossfit.data import crossarray -from crossfit.calculate.aggregate import Aggregator class CrossMetric(CrossModule, abc.ABC): diff --git a/crossfit/metric/categorical/value_counts.py b/crossfit/metric/categorical/value_counts.py index 40fc6f5..ddfdfe3 100644 --- a/crossfit/metric/categorical/value_counts.py +++ b/crossfit/metric/categorical/value_counts.py @@ -1,5 +1,5 @@ -import pandas as pd import numpy as np +import pandas as pd from crossfit.metric.base import CrossMetric, state @@ -52,7 +52,9 @@ def combine(self, other) -> "ValueCounts": rsuffix="_r", how="outer", ).fillna(0) - combined = (combined_frame["count_l"] + combined_frame["count_r"]).astype("int64") + combined = (combined_frame["count_l"] + combined_frame["count_r"]).astype( + "int64" + ) return ValueCounts(values=combined.index._data, 
counts=combined.values) diff --git a/crossfit/metric/continuous/mean.py b/crossfit/metric/continuous/mean.py index cb48e41..e19b5e6 100644 --- a/crossfit/metric/continuous/mean.py +++ b/crossfit/metric/continuous/mean.py @@ -1,6 +1,5 @@ import functools as ft - from crossfit.metric.base import CrossMetric, state diff --git a/crossfit/metric/continuous/range.py b/crossfit/metric/continuous/range.py index 23e9471..4eb2542 100644 --- a/crossfit/metric/continuous/range.py +++ b/crossfit/metric/continuous/range.py @@ -8,7 +8,9 @@ class Range(CrossAxisMetric): max = state(init=0, combine=np.maximum) def prepare(self, array) -> "Range": - return Range(axis=self.axis, min=array.min(axis=self.axis), max=array.max(axis=self.axis)) + return Range( + axis=self.axis, min=array.min(axis=self.axis), max=array.max(axis=self.axis) + ) def present(self): return {"min": self.min, "max": self.max} diff --git a/crossfit/metric/ranking/__init__.py b/crossfit/metric/ranking/__init__.py index 1749fce..18d169b 100644 --- a/crossfit/metric/ranking/__init__.py +++ b/crossfit/metric/ranking/__init__.py @@ -1,17 +1,16 @@ -from crossfit.metric.ranking.f1 import F1 -from crossfit.metric.ranking.hitrate import HitRate -from crossfit.metric.ranking.ndcg import DCG, NDCG -from crossfit.metric.ranking.precision import Precision, AP -from crossfit.metric.ranking.rank import FirstRelevantRank, MeanRanks, ReciprocalRank -from crossfit.metric.ranking.recall import Recall from crossfit.data.sparse.ranking import ( - SparseLabels, + Rankings, SparseBinaryLabels, + SparseLabels, SparseNumericLabels, SparseRankings, - Rankings, ) - +from crossfit.metric.ranking.f1 import F1 +from crossfit.metric.ranking.hitrate import HitRate +from crossfit.metric.ranking.ndcg import DCG, NDCG +from crossfit.metric.ranking.precision import AP, Precision +from crossfit.metric.ranking.rank import FirstRelevantRank, MeanRanks, ReciprocalRank +from crossfit.metric.ranking.recall import Recall __all__ = [ "AP", diff --git a/crossfit/metric/ranking/base.py b/crossfit/metric/ranking/base.py index 3896e98..a7ddba6 100644 --- a/crossfit/metric/ranking/base.py +++ b/crossfit/metric/ranking/base.py @@ -1,9 +1,14 @@ -from crossfit.data.array.dispatch import crossarray import numpy as np +from crossfit.data.array.dispatch import crossarray from crossfit.data.array.masked import MaskedArray +from crossfit.data.sparse.ranking import ( + Rankings, + SparseBinaryLabels, + SparseLabels, + SparseRankings, +) from crossfit.metric.continuous.mean import Mean -from crossfit.data.sparse.ranking import SparseBinaryLabels, SparseLabels, Rankings, SparseRankings class RankingMetric(Mean): @@ -27,11 +32,15 @@ def score(self, y_true: SparseLabels, y_pred: Rankings, nan_handling="zerofill") def _bootstrap_ci(cls, scores, n_bootstrap_samples, confidence): if not isinstance(n_bootstrap_samples, int) or n_bootstrap_samples <= 1: raise ValueError("n_bootstrap_samples must be int > 1") - elif not isinstance(confidence, float) or confidence <= 0.0 or confidence >= 1.0: + elif ( + not isinstance(confidence, float) or confidence <= 0.0 or confidence >= 1.0 + ): raise ValueError("Confidence must be float and 0 < confidence < 1") if len(scores): - resamples = np.random.choice(scores, (len(scores), n_bootstrap_samples), replace=True) + resamples = np.random.choice( + scores, (len(scores), n_bootstrap_samples), replace=True + ) bootstrap_means = resamples.mean(axis=0) # Compute "percentile bootstrap" @@ -77,7 +86,9 @@ def nan_handling(self, scores, handling="zerofill"): elif handling == 
"propagate": return scores else: - raise ValueError('nan_handling must be "propagate", "drop" or "zerofill"') + raise ValueError( + 'nan_handling must be "propagate", "drop" or "zerofill"' + ) def name(self): if self._k is None: diff --git a/crossfit/metric/ranking/f1.py b/crossfit/metric/ranking/f1.py index e707ccc..8b7dcb8 100644 --- a/crossfit/metric/ranking/f1.py +++ b/crossfit/metric/ranking/f1.py @@ -1,9 +1,9 @@ import numpy as np +from crossfit.data.array.masked import MaskedArray from crossfit.metric.ranking.base import SparseBinaryLabels from crossfit.metric.ranking.precision import Precision from crossfit.metric.ranking.recall import Recall -from crossfit.data.array.masked import MaskedArray class F1(Precision, Recall): diff --git a/crossfit/metric/ranking/ndcg.py b/crossfit/metric/ranking/ndcg.py index b0957d7..da829de 100644 --- a/crossfit/metric/ranking/ndcg.py +++ b/crossfit/metric/ranking/ndcg.py @@ -1,8 +1,8 @@ import numpy as np -from crossfit.metric.ranking.base import RankingMetric, SparseLabels, SparseRankings -from crossfit.data.array.masked import MaskedArray from crossfit.data.array.conversion import convert_array +from crossfit.data.array.masked import MaskedArray +from crossfit.metric.ranking.base import RankingMetric, SparseLabels, SparseRankings class DCG(RankingMetric): diff --git a/crossfit/metric/ranking/precision.py b/crossfit/metric/ranking/precision.py index eaa7775..08299ee 100644 --- a/crossfit/metric/ranking/precision.py +++ b/crossfit/metric/ranking/precision.py @@ -1,7 +1,7 @@ import numpy as np -from crossfit.metric.ranking.base import BinaryRankingMetric, SparseBinaryLabels from crossfit.data.array.masked import MaskedArray +from crossfit.metric.ranking.base import BinaryRankingMetric, SparseBinaryLabels class Precision(BinaryRankingMetric): @@ -12,7 +12,9 @@ def __init__(self, k, truncated=False): def _precision(self, y_true: SparseBinaryLabels, y_pred_labels: MaskedArray): n_pos = y_true.get_n_positives(y_pred_labels.shape[0]) n_relevant = np.sum( - (y_pred_labels.data[:, : self._k] == 1) & (~y_pred_labels.mask[:, : self._k]), axis=-1 + (y_pred_labels.data[:, : self._k] == 1) + & (~y_pred_labels.mask[:, : self._k]), + axis=-1, ) if self._truncated: diff --git a/crossfit/metric/ranking/rank.py b/crossfit/metric/ranking/rank.py index af71b08..e7c2be9 100644 --- a/crossfit/metric/ranking/rank.py +++ b/crossfit/metric/ranking/rank.py @@ -1,7 +1,7 @@ import numpy as np -from crossfit.metric.ranking.base import BinaryRankingMetric, SparseBinaryLabels from crossfit.data.array.masked import MaskedArray +from crossfit.metric.ranking.base import BinaryRankingMetric, SparseBinaryLabels class ReciprocalRank(BinaryRankingMetric): diff --git a/crossfit/metric/ranking/recall.py b/crossfit/metric/ranking/recall.py index 57c24a8..24f8e2b 100644 --- a/crossfit/metric/ranking/recall.py +++ b/crossfit/metric/ranking/recall.py @@ -1,7 +1,7 @@ import numpy as np -from crossfit.metric.ranking.base import BinaryRankingMetric, SparseBinaryLabels from crossfit.data.array.masked import MaskedArray +from crossfit.metric.ranking.base import BinaryRankingMetric, SparseBinaryLabels class Recall(BinaryRankingMetric): @@ -12,7 +12,9 @@ def __init__(self, k, truncated=False): def _recall(self, y_true: SparseBinaryLabels, y_pred_labels: MaskedArray): n_pos = y_true.get_n_positives(y_pred_labels.shape[0]) n_relevant = np.sum( - (y_pred_labels.data[:, : self._k] == 1) & (~y_pred_labels.mask[:, : self._k]), axis=-1 + (y_pred_labels.data[:, : self._k] == 1) + & (~y_pred_labels.mask[:, : 
self._k]), + axis=-1, ) scores = np.NaN * np.zeros_like(n_relevant, dtype=float) diff --git a/crossfit/op/__init__.py b/crossfit/op/__init__.py index 09dc4c1..bbe9385 100644 --- a/crossfit/op/__init__.py +++ b/crossfit/op/__init__.py @@ -1,30 +1,36 @@ -from crossfit.op.base import Op +from crossfit.op.base import Op, ColumnOp, Repartition from crossfit.op.combinators import Sequential - __all__ = [ "Op", + "ColumnOp", + "Repartition", "Sequential", ] try: - from crossfit.backend.torch.op.embed import Embedder - + from crossfit.backend.torch.op.embed import Embedder # noqa + __all__.append("Embedder") except ImportError: pass try: - from crossfit.op.tokenize import Tokenizer - + from crossfit.op.tokenize import Tokenizer # noqa + __all__.append("Tokenizer") except ImportError: pass try: - from crossfit.op.vector_search import CuMLANNSearch, CuMLExactSearch, RaftExactSearch + from crossfit.op.vector_search import ( # noqa + CuMLANNSearch, + CuMLExactSearch, + RaftExactSearch, + ) + __all__.extend(["CuMLANNSearch", "CuMLExactSearch", "RaftExactSearch"]) except ImportError: pass diff --git a/crossfit/op/base.py b/crossfit/op/base.py index 2caa9c7..9d25b41 100644 --- a/crossfit/op/base.py +++ b/crossfit/op/base.py @@ -1,17 +1,22 @@ import inspect +from typing import Dict, List, Generic, Union, TypeVar import uuid +from typing_utils import get_args import dask.dataframe as dd -from tqdm.auto import tqdm +import pandas as pd from dask.distributed import get_worker +from tqdm.auto import tqdm class Op: - def __init__(self, pre=None, cols=False, keep_cols=None): + def __init__(self, pre=None, cols=False, keep_cols=None, post=None): self.pre = pre + self.post = post self.cols = cols self.keep_cols = keep_cols or [] self.id = str(uuid.uuid4()) + self._input_type = None def setup(self): pass @@ -37,6 +42,17 @@ def call_dask(self, data: dd.DataFrame): return output + def create_df(self, *args, **kwargs): + return self._input_type(*args, **kwargs) + + def create_series(self, *args, **kwargs): + if isinstance(self._input_type, pd.DataFrame): + return pd.Series(*args, **kwargs) + + import cudf + + return cudf.Series(*args, **kwargs) + def create_progress_bar(self, total, partition_info=None, **kwargs): return tqdm( total=total, @@ -58,24 +74,37 @@ def add_keep_cols(self, data, output): return output + def _call_df(self, data, *args, partition_info=None, **kwargs): + params = inspect.signature(self.call).parameters + if "partition_info" in params: + output = self.call(data, *args, partition_info=partition_info, **kwargs) + else: + output = self.call(data, *args, **kwargs) + + return output + def __call__(self, data, *args, partition_info=None, **kwargs): if isinstance(data, dd.DataFrame): return self.call_dask(data, *args, **kwargs) self.setup_worker() + inputs = data + self._input_type = type(inputs) if self.pre is not None: params = inspect.signature(self.pre).parameters if "partition_info" in params: - data = self.pre(data, partition_info=partition_info) + inputs = self.pre(inputs, partition_info=partition_info) else: - data = self.pre(data) + inputs = self.pre(inputs) - params = inspect.signature(self.call).parameters - if "partition_info" in params: - output = self.call(data, *args, partition_info=partition_info, **kwargs) - else: - output = self.call(data, *args, **kwargs) + output = self._call_df(inputs, *args, partition_info=partition_info, **kwargs) + if self.post is not None: + params = inspect.signature(self.post).parameters + if "partition_info" in params: + output = self.post(output, 
partition_info=partition_info) + else: + output = self.post(output) if self.keep_cols: output = self.add_keep_cols(data, output) @@ -89,3 +118,81 @@ def _build_dask_meta(self, data): output.update(meta) return output + + +DtypeT = TypeVar("DtypeT") + + +class ColumnOp(Op, Generic[DtypeT]): + def __init__( + self, + cols: Union[str, List[str], Dict[str, str]], + output_dtype: DtypeT = None, + keep_cols=None, + ): + super().__init__(keep_cols=keep_cols) + self.cols = [cols] if isinstance(cols, str) else cols + + if output_dtype is None: + # Infer output dtype from generic + generics = get_args(self.__orig_bases__[0]) + if isinstance(generics, tuple): + output_dtype = generics[0].__name__ + if output_dtype == "float": + output_dtype = "float32" + if output_dtype == "int": + output_dtype = "int32" + else: + raise ValueError("Could not infer output_dtype, please specify it.") + + self.output_dtype = output_dtype + + def _call_df(self, data, *args, partition_info=None, **kwargs): + output = self.create_df() + + if self.cols is None: + if not str(type(data)).endswith("Series"): + raise ValueError("data must be a Series") + + return self.call(data, *args, partition_info=partition_info, **kwargs) + + for col in self.cols: + if col not in data.columns: + raise ValueError(f"Column {col} not found in data") + + col_out = self.call(data[col]) + output[self._construct_name(col)] = col_out + + return output + + def _construct_name(self, col_name): + if isinstance(self.cols, dict): + return self.cols[col_name] + + return col_name + + def meta(self): + if not self.cols: + return self.output_dtype + + if isinstance(self.cols, dict): + return {self.cols[col]: self.output_dtype for col in self.cols} + + return {col: self.output_dtype for col in self.cols} + + +class Repartition(Op): + def __init__(self, partition_size: int, min_paritions=2): + super().__init__() + self.partition_size = partition_size + self.min_paritions = min_paritions + + def call(self, data): + return data + + def call_dask(self, data): + partitions = max(int(len(data) / self.partition_size), 1) + if partitions < self.min_paritions: + partitions = self.min_paritions + + return data.repartition(partitions) diff --git a/crossfit/op/tokenize.py b/crossfit/op/tokenize.py index 8e3af13..13785b9 100644 --- a/crossfit/op/tokenize.py +++ b/crossfit/op/tokenize.py @@ -11,7 +11,56 @@ from crossfit.op.base import Op -class Tokenizer(Op): +class _TokenizerOp(Op): + def call_column(self, data): + raise NotImplementedError() + + def call(self, data): + output = cudf.DataFrame() + + if self.cols is None: + if not isinstance(data, cudf.Series): + raise ValueError("data must be a cudf Series") + + input_ids, attention_mask = self.call_column(data) + output["input_ids"] = input_ids + output["attention_mask"] = attention_mask + + return output + + for col in self.cols: + if col not in data.columns: + raise ValueError(f"Column {col} not found in data") + + input_ids, attention_mask = self.call_column(data[col]) + output[self._construct_name(col, "input_ids")] = input_ids + output[self._construct_name(col, "attention_mask")] = attention_mask + + return output + + def meta(self): + tokenized = { + "input_ids": "int32", + "attention_mask": "int32", + } + + if len(self.cols) > 1: + tokenized = { + self._construct_name(col, suffix): dtype + for col in self.cols + for suffix, dtype in tokenized.items() + } + + return tokenized + + def _construct_name(self, col_name, suffix): + if len(self.cols) == 1: + return suffix + + return f"{col_name}_{suffix}" + + +class 
Tokenizer(_TokenizerOp): def __init__( self, model: Model, @@ -49,7 +98,9 @@ def call_column(self, data): text = data.replace("", "unknown") tokenized_data = self.tokenize_strings(text).copy() - tokenized_data = clip_tokens(tokenized_data, max_length=self.max_length, return_type="cp") + tokenized_data = clip_tokens( + tokenized_data, max_length=self.max_length, return_type="cp" + ) input_ids = create_list_series_from_2d_ar( tokenized_data["input_ids"].astype("int32"), data.index @@ -60,50 +111,6 @@ def call_column(self, data): return input_ids, attention_mask - def call(self, data): - output = cudf.DataFrame() - - if self.cols is None: - if not isinstance(data, cudf.Series): - raise ValueError("data must be a cudf Series") - - input_ids, attention_mask = self.call_column(data) - output["input_ids"] = input_ids - output["attention_mask"] = attention_mask - - return output - - for col in self.cols: - if col not in data.columns: - raise ValueError(f"Column {col} not found in data") - - input_ids, attention_mask = self.call_column(data[col]) - output[self._construct_name(col, "input_ids")] = input_ids - output[self._construct_name(col, "attention_mask")] = attention_mask - - return output - - def meta(self): - tokenized = { - "input_ids": "int32", - "attention_mask": "int32", - } - - if len(self.cols) > 1: - tokenized = { - self._construct_name(col, suffix): dtype - for col in self.cols - for suffix, dtype in tokenized.items() - } - - return tokenized - - def _construct_name(self, col_name, suffix): - if len(self.cols) == 1: - return suffix - - return f"{col_name}_{suffix}" - class GPUTokenizer(SubwordTokenizer): def __init__(self, hash_file: str, do_lower_case: bool = True, config=None): diff --git a/crossfit/op/vector_search.py b/crossfit/op/vector_search.py index b61a973..bc0aeee 100644 --- a/crossfit/op/vector_search.py +++ b/crossfit/op/vector_search.py @@ -40,8 +40,12 @@ def search_tensors(self, queries, corpus): raise NotImplementedError() def call_part(self, queries, items): - query_emb = _get_embedding_cupy(queries, self.embedding_col, normalize=self.normalize) - item_emb = _get_embedding_cupy(items, self.embedding_col, normalize=self.normalize) + query_emb = _get_embedding_cupy( + queries, self.embedding_col, normalize=self.normalize + ) + item_emb = _get_embedding_cupy( + items, self.embedding_col, normalize=self.normalize + ) results, indices = self.search_tensors(query_emb, item_emb) @@ -60,15 +64,21 @@ def call_part(self, queries, items): return out def call(self, queries, items): - query_emb = _get_embedding_cupy(queries, self.embedding_col, normalize=self.normalize) - item_emb = _get_embedding_cupy(items, self.embedding_col, normalize=self.normalize) + query_emb = _get_embedding_cupy( + queries, self.embedding_col, normalize=self.normalize + ) + item_emb = _get_embedding_cupy( + items, self.embedding_col, normalize=self.normalize + ) results, indices = self.search_tensors(query_emb, item_emb) df = cudf.DataFrame(index=queries.index) df["query-id"] = queries["_id"] df["query-index"] = queries["index"] - df["corpus-index"] = create_list_series_from_2d_ar(items["index"].values[indices], df.index) + df["corpus-index"] = create_list_series_from_2d_ar( + items["index"].values[indices], df.index + ) df["score"] = create_list_series_from_2d_ar(results, df.index) return df @@ -92,7 +102,9 @@ def reduce(self, grouped): reduced["query-index"] = grouped["query-index"] reduced["query-id"] = grouped["query-id"] reduced["score"] = create_list_series_from_2d_ar(topk_scores, reduced.index) - 
reduced["corpus-index"] = create_list_series_from_2d_ar(topk_indices, reduced.index) + reduced["corpus-index"] = create_list_series_from_2d_ar( + topk_indices, reduced.index + ) reduced = reduced.set_index("query-index", drop=False) @@ -155,7 +167,9 @@ def __init__( self.normalize = normalize def search_tensors(self, queries, corpus): - results, indices = knn(dataset=corpus, queries=queries, k=self.k, metric=self.metric) + results, indices = knn( + dataset=corpus, queries=queries, k=self.k, metric=self.metric + ) return cp.asarray(results), cp.asarray(indices) @@ -203,7 +217,9 @@ def query(self, knn, queries): if hasattr(queries, "ddf"): query_ddf = queries.ddf() - query_ddf_per_dim = _per_dim_ddf(query_ddf, self.embedding_col, normalize=self.normalize) + query_ddf_per_dim = _per_dim_ddf( + query_ddf, self.embedding_col, normalize=self.normalize + ) distances, indices = knn.kneighbors(query_ddf_per_dim) @@ -270,12 +286,17 @@ def _get_embedding_cupy(data, embedding_col, normalize=True): def _per_dim_ddf( - data: dd.DataFrame, embedding_col: str, index_col: str = "index", normalize: bool = True + data: dd.DataFrame, + embedding_col: str, + index_col: str = "index", + normalize: bool = True, ) -> dd.DataFrame: dim = len(data.head()[embedding_col].iloc[0]) def to_map(part, dim): - values = part[embedding_col].list.leaves.values.reshape(-1, dim).astype("float32") + values = ( + part[embedding_col].list.leaves.values.reshape(-1, dim).astype("float32") + ) if normalize: values = values / cp.linalg.norm(values, axis=1, keepdims=True) diff --git a/crossfit/report/beir/embed.py b/crossfit/report/beir/embed.py index 3bb6477..fc4ebae 100644 --- a/crossfit/report/beir/embed.py +++ b/crossfit/report/beir/embed.py @@ -3,11 +3,11 @@ from typing import Optional from crossfit import op +from crossfit.backend.torch.model import Model from crossfit.dataset.base import Dataset, EmbeddingDatataset, IRDataset from crossfit.dataset.home import CF_HOME from crossfit.dataset.load import load_dataset from crossfit.op.vector_search import VectorSearchOp -from crossfit.backend.torch.model import Model def embed( @@ -26,7 +26,9 @@ def embed( out_dir = out_dir or CF_HOME processed_name = "processed-test" if tiny_sample else "processed" - emb_dir = os.path.join(out_dir, processed_name, "beir", dataset_name, "emb", model.path_or_name) + emb_dir = os.path.join( + out_dir, processed_name, "beir", dataset_name, "emb", model.path_or_name + ) if os.path.exists(emb_dir): if overwrite: diff --git a/crossfit/report/beir/report.py b/crossfit/report/beir/report.py index 94869fa..27b5e3e 100644 --- a/crossfit/report/beir/report.py +++ b/crossfit/report/beir/report.py @@ -3,20 +3,26 @@ import cudf import cupy as cp import dask_cudf -from cuml.preprocessing import LabelEncoder import numpy as np +from cuml.preprocessing import LabelEncoder from crossfit.backend.dask.aggregate import aggregate -from crossfit.data.sparse.dispatch import CrossSparse +from crossfit.backend.torch.model import Model +from crossfit.calculate.aggregate import Aggregator from crossfit.data.array.dispatch import crossarray +from crossfit.data.sparse.dispatch import CrossSparse from crossfit.dataset.base import EmbeddingDatataset -from crossfit.report.beir.embed import embed -from crossfit.calculate.aggregate import Aggregator from crossfit.metric.continuous.mean import Mean -from crossfit.metric.ranking import NDCG, Precision, Recall, SparseBinaryLabels, SparseRankings -from crossfit.report.base import Report +from crossfit.metric.ranking import ( + NDCG, + 
Precision, + Recall, + SparseBinaryLabels, + SparseRankings, +) from crossfit.op.vector_search import VectorSearchOp -from crossfit.backend.torch.model import Model +from crossfit.report.base import Report +from crossfit.report.beir.embed import embed class BeirMetricAggregator(Aggregator): @@ -29,14 +35,22 @@ def __init__( groupby=None, metrics=[NDCG, Precision, Recall], ): - super().__init__(None, pre=pre, post_group=post_group, post=post, groupby=groupby) + super().__init__( + None, pre=pre, post_group=post_group, post=post, groupby=groupby + ) self.ks = ks self.metrics = metrics def prepare(self, df): - encoder = self.create_label_encoder(df, ["corpus-index-pred", "corpus-index-obs"]) - obs_csr = self.create_csr_matrix(df["corpus-index-obs"], df["score-obs"], encoder) - pred_csr = self.create_csr_matrix(df["corpus-index-pred"], df["score-pred"], encoder) + encoder = self.create_label_encoder( + df, ["corpus-index-pred", "corpus-index-obs"] + ) + obs_csr = self.create_csr_matrix( + df["corpus-index-obs"], df["score-obs"], encoder + ) + pred_csr = self.create_csr_matrix( + df["corpus-index-pred"], df["score-pred"], encoder + ) # TODO: Fix dispatch labels = SparseBinaryLabels(CrossSparse.from_matrix(obs_csr)) @@ -108,7 +122,11 @@ def join_predictions(data, predictions): predictions = predictions.set_index("query-index") merged = observed.merge( - predictions, left_index=True, right_index=True, how="left", suffixes=("-obs", "-pred") + predictions, + left_index=True, + right_index=True, + how="left", + suffixes=("-obs", "-pred"), ).rename(columns={"split-obs": "split"}) output = merged.reset_index() @@ -140,7 +158,9 @@ def console(self): # Sort the @k values within each group for metric, columns in grouped_columns.items(): - grouped_columns[metric] = sorted(columns, key=lambda x: int(x.split("@")[-1])) + grouped_columns[metric] = sorted( + columns, key=lambda x: int(x.split("@")[-1]) + ) # Print table for each metric type for metric, columns in grouped_columns.items(): diff --git a/crossfit/report/data_overview/report.py b/crossfit/report/data_overview/report.py index 4b23409..4bb9f25 100644 --- a/crossfit/report/data_overview/report.py +++ b/crossfit/report/data_overview/report.py @@ -2,19 +2,15 @@ import numpy as np -from crossfit.report.base import Report +from crossfit.backend.dask.aggregate import aggregate from crossfit.calculate.aggregate import Aggregator -from crossfit.metric.common import CommonStats -from crossfit.metric.continuous.range import Range -from crossfit.metric.continuous.moments import Moments from crossfit.metric.categorical.str_len import MeanStrLength from crossfit.metric.categorical.value_counts import ValueCounts -from crossfit.report.data_overview.visualization.facets import ( - visualize, - FacetsOverview, -) - -from crossfit.backend.dask.aggregate import aggregate +from crossfit.metric.common import CommonStats +from crossfit.metric.continuous.moments import Moments +from crossfit.metric.continuous.range import Range +from crossfit.report.base import Report +from crossfit.report.data_overview.visualization.facets import FacetsOverview, visualize class ContinuousMetrics(Aggregator): diff --git a/crossfit/report/data_overview/visualization/facets.py b/crossfit/report/data_overview/visualization/facets.py index c527ae7..48ee883 100644 --- a/crossfit/report/data_overview/visualization/facets.py +++ b/crossfit/report/data_overview/visualization/facets.py @@ -3,7 +3,6 @@ from tensorflow_metadata.proto.v0 import statistics_pb2 - STATS_FILE_NAME = "stats.pb" diff 
--git a/crossfit/utils/np_utils.py b/crossfit/utils/np_utils.py index c2d418b..6dde10c 100644 --- a/crossfit/utils/np_utils.py +++ b/crossfit/utils/np_utils.py @@ -1,4 +1,5 @@ import inspect + import numpy as np diff --git a/examples/dask_compute_bench.py b/examples/dask_compute_bench.py index 65ec812..202d7c7 100644 --- a/examples/dask_compute_bench.py +++ b/examples/dask_compute_bench.py @@ -1,14 +1,13 @@ import time import dask +from dask_cuda import LocalCUDACluster +from distributed import Client, LocalCluster from crossfit.calculate.frame import MetricFrame from crossfit.dask.calculate import calculate_per_col as calculate_dask from crossfit.stats.continuous.stats import ContinuousStats -from dask_cuda import LocalCUDACluster -from distributed import Client, LocalCluster - # Benchmark assumes Criteo dataset. # Low-cardinality columns: # {C6:4, C9:64, C13:11, C16:155, C17:4, C19:15, C25:109, C26:37} diff --git a/examples/nlp/generate-questions.py b/examples/nlp/generate-questions.py new file mode 100644 index 0000000..d08426a --- /dev/null +++ b/examples/nlp/generate-questions.py @@ -0,0 +1,37 @@ +import crossfit as cf +from crossfit.backend.torch import CurratedTokenizer, CuratedGenerator + + +class QueryPrompt(cf.ColumnOp[str]): + doc_name: str = "Document" + query_name: str = "Relevant query" + + def call(self, data): + return f"{self.doc_name}: \n\n" + data + f"\n\n{self.query_name}: \n\n" + + +def main(model="meta-llama/Llama-2-7b-hf", dataset="beir/quora", overwrite=True): + dataset: cf.IRDataset = cf.load_dataset(dataset, overwrite=False) + + pipe = cf.Sequential( + QueryPrompt(cols="text"), + CurratedTokenizer.from_hf_hub(name=model, cols=["text"]), + # cf.Repartition(50_000), + CuratedGenerator(model, batch_size=32, batch_steps=10, output_col="answer"), + keep_cols=["index", "_id", "text"], + ) + + passages = dataset.item.ddf() + passages = passages.loc[:30] + generated = pipe(passages).compute() + + for _, row in generated.to_pandas().iterrows(): + print(row["text"]) + print() + print("Response: " + row["answer"]) + print() + print() + + +if __name__ == "__main__": + main() diff --git a/tests/backend/dask_backend/test_aggregate.py b/tests/backend/dask_backend/test_aggregate.py index cca107e..692ed7b 100644 --- a/tests/backend/dask_backend/test_aggregate.py +++ b/tests/backend/dask_backend/test_aggregate.py @@ -2,9 +2,8 @@ from crossfit.calculate.aggregate import Aggregator, metric_key from crossfit.data.dataframe.dispatch import CrossFrame -from crossfit.metric.continuous.range import Range from crossfit.metric import Mean - +from crossfit.metric.continuous.range import Range from tests.utils import is_leaf_node_instance_of, sample_df diff --git a/tests/backend/pytorch_backend/test_torch_convert.py b/tests/backend/pytorch_backend/test_torch_convert.py index d6b7cc5..aab9bd7 100644 --- a/tests/backend/pytorch_backend/test_torch_convert.py +++ b/tests/backend/pytorch_backend/test_torch_convert.py @@ -1,6 +1,5 @@ -import pytest - import numpy as np +import pytest import torch from crossfit.data import convert_array diff --git a/tests/backend/test_sklearn.py b/tests/backend/test_sklearn.py index c540b69..6992a4a 100644 --- a/tests/backend/test_sklearn.py +++ b/tests/backend/test_sklearn.py @@ -1,13 +1,11 @@ -import pytest - import numpy as np +import pytest from sklearn import metrics -from sklearn.utils.multiclass import type_of_target from sklearn.utils._array_api import get_namespace +from sklearn.utils.multiclass import type_of_target from crossfit.data import crossarray, 
np_backend_dispatch - arr1 = [1, 2, 3] arr2 = [4, 5, 6] diff --git a/tests/data/array/test_conversion.py b/tests/data/array/test_conversion.py index 0636652..5a9240a 100644 --- a/tests/data/array/test_conversion.py +++ b/tests/data/array/test_conversion.py @@ -1,6 +1,5 @@ -import pytest - import numpy as np +import pytest from crossfit.data import convert_array diff --git a/tests/data/array/test_decorator.py b/tests/data/array/test_decorator.py index 20591ac..06dfa39 100644 --- a/tests/data/array/test_decorator.py +++ b/tests/data/array/test_decorator.py @@ -1,7 +1,5 @@ -import pytest - import numpy as np - +import pytest from crossfit.data import crossarray from crossfit.utils import test_utils diff --git a/tests/data/dataframe/test_dispatch.py b/tests/data/dataframe/test_dispatch.py index 8940cd8..2ee7de2 100644 --- a/tests/data/dataframe/test_dispatch.py +++ b/tests/data/dataframe/test_dispatch.py @@ -1,6 +1,5 @@ -import pytest - import numpy as np +import pytest from crossfit.backend.pandas.dataframe import PandasDataFrame from crossfit.data.dataframe.core import ArrayBundle diff --git a/tests/report/data_overview/test_report.py b/tests/report/data_overview/test_report.py index 6c8ea25..6582f2b 100644 --- a/tests/report/data_overview/test_report.py +++ b/tests/report/data_overview/test_report.py @@ -1,17 +1,16 @@ -import pandas as pd -import numpy as np import dask.dataframe as dd +import numpy as np +import pandas as pd import crossfit as cf +from crossfit.backend.dask.aggregate import aggregate from crossfit.report.data_overview.report import ( - ContinuousMetrics, CategoricalMetrics, - data_overview_report, + ContinuousMetrics, DataOverviewReport, + data_overview_report, ) from crossfit.report.data_overview.visualization.facets import FacetsOverview -from crossfit.backend.dask.aggregate import aggregate - from tests.utils import sample_df diff --git a/tests/utils.py b/tests/utils.py index da25b81..309ee5c 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,7 +1,7 @@ -import pytest from collections.abc import Mapping import pandas as pd +import pytest try: import cudf
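
Usage sketch for the loader path touched in crossfit/dataset/beir/load.py and crossfit/dataset/load.py: tiny_sample=True routes load_dataset to load_test_dataset, which runs _process_data on the ~100-query sample produced by sample_raw instead of the full download. This is a minimal sketch, assuming a GPU environment with cudf/dask_cudf and network access for the BEIR download; "beir/quora" is the same dataset name used in examples/nlp/generate-questions.py.

    import crossfit as cf

    # tiny_sample=True dispatches to load_test_dataset(), so only the
    # sampled slice of the corpus is processed and cached.
    dataset = cf.load_dataset("beir/quora", tiny_sample=True, overwrite=False)

    # The processed corpus split is exposed as a dask-cuDF DataFrame,
    # mirroring how the generate-questions example consumes it.
    passages = dataset.item.ddf()
    print(passages.head())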
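
The new ColumnOp base class added to crossfit/op/base.py is what the QueryPrompt op in examples/nlp/generate-questions.py builds on. A minimal sketch follows, runnable on plain pandas; the class name, column name, and prompt wording are made up for illustration.

    import pandas as pd
    import crossfit as cf


    class TitlePrompt(cf.ColumnOp[str]):
        # The [str] generic is what ColumnOp.__init__ inspects (via
        # typing_utils.get_args) to infer output_dtype when it is not given.
        def call(self, data):
            return "Title: " + data + "\nSummary:"


    op = TitlePrompt(cols="text")
    print(op.output_dtype)  # "str", inferred from the generic parameter
    print(op.meta())        # {"text": "str"}, used to build the dask meta

    # call() is applied per configured column; here it is invoked directly
    # on a pandas Series to show the transformation.
    print(op.call(pd.Series(["a first document", "a second document"])))

    # In a pipeline it slots in the same way QueryPrompt does, e.g.
    # cf.Sequential(TitlePrompt(cols="text"), ..., keep_cols=["_id"]).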
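
Repartition is the other op newly exported from crossfit/op/__init__.py; since it only overrides call_dask, it can be sanity-checked on a CPU dask DataFrame. A minimal sketch, with an arbitrary 10_000-row frame and partition_size chosen for illustration:

    import dask.dataframe as dd
    import pandas as pd
    import crossfit as cf

    ddf = dd.from_pandas(pd.DataFrame({"text": ["doc"] * 10_000}), npartitions=1)

    # call_dask targets max(len(data) / partition_size, 1) partitions and never
    # drops below min_paritions (default 2), so 10_000 rows with a
    # partition_size of 2_500 should come back as 4 partitions.
    resized = cf.Repartition(partition_size=2_500)(ddf)
    print(resized.npartitions)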