diff --git a/OPERATORS.md b/OPERATORS.md new file mode 100644 index 0000000..0ab0681 --- /dev/null +++ b/OPERATORS.md @@ -0,0 +1,253 @@ +# OOCMatrix - Out-of-Core Matrix Operations + +## Overview + +The `OOCMatrix` class provides a high-level, NumPy-like API for out-of-core matrix operations. It wraps existing NumPy/SciPy operations with lazy evaluation, block-wise processing, and streaming constructs, focusing on **orchestration** rather than reimplementing mathematical operations. + +## Design Philosophy + +**Key Principle**: Don't reimplement optimized math libraries - orchestrate them intelligently. + +- **Reuse Existing Libraries**: All mathematical operations (matrix multiply, element-wise ops, reductions) use NumPy/SciPy/Pandas for in-block computations +- **Focus on Orchestration**: The framework handles block loading, buffer management, lazy evaluation, and scheduling +- **Transparent Out-of-Core**: Provides a familiar API that doesn't require full matrices in RAM + +## API Reference + +### Creating OOCMatrix + +```python +from paper import OOCMatrix + +# Load existing matrix +A = OOCMatrix('data.bin', shape=(10_000_000, 1000), dtype=np.float32, mode='r') + +# Create new matrix +B = OOCMatrix('new.bin', shape=(1000, 500), dtype=np.float32, mode='w+', create=True) +``` + +### Block-wise Operations + +#### blockwise_apply(op, output_path=None) +Apply a function to each block independently. + +```python +# Normalize using NumPy operations on each block +A_norm = A.blockwise_apply(lambda x: (x - x.mean()) / x.std()) + +# Apply ReLU activation +A_relu = A.blockwise_apply(lambda x: np.maximum(0, x)) + +# Custom transformation +A_transformed = A.blockwise_apply(lambda x: np.tanh(x * 2.0)) +``` + +#### blockwise_reduce(op, combine_op=None) +Reduce across all blocks using a reduction operation. + +```python +# Custom reduction +max_val = A.blockwise_reduce(np.max, combine_op=max) + +# Sum with custom combining +total = A.blockwise_reduce(np.sum, combine_op=lambda x, y: x + y) +``` + +### Matrix Operations + +#### matmul(other, op=None, output_path=None) +Matrix multiplication with custom operation. + +```python +# Use NumPy dot product (default) +C = A.matmul(B, op=np.dot) + +# Use custom operation +C = A.matmul(B, op=lambda a, b: np.dot(a, b)) +``` + +### Statistical Operations + +All operations are computed block-wise without loading the full matrix: + +```python +total = A.sum() # Sum of all elements +mean = A.mean() # Mean value +std = A.std() # Standard deviation +min_val = A.min() # Minimum value +max_val = A.max() # Maximum value +``` + +### Lazy Evaluation & Operators + +Build computation plans that execute only when materialized: + +```python +# Lazy operations (don't execute immediately) +result = (A + B) * 2 +result = A @ B +result = A * scalar + +# Execute the plan +materialized = result.compute('output.bin') +``` + +### Block Iteration + +Stream through matrix blocks for custom processing: + +```python +for block, (row_start, col_start) in A.iterate_blocks(): + # block is a NumPy array + # Process each block independently + process(block) +``` + +## Usage Examples + +### Example 1: Normalization + +```python +from paper import OOCMatrix +import numpy as np + +A = OOCMatrix('data.bin', shape=(10_000_000, 1000)) + +# Compute statistics (block-wise) +mean = A.mean() +std = A.std() + +# Normalize using existing NumPy operations +A_normalized = A.blockwise_apply(lambda x: (x - mean) / std) + +print(f"Normalized mean: {A_normalized.mean()}") # ~0 +print(f"Normalized std: {A_normalized.std()}") # ~1 +``` + +### Example 2: Matrix Multiplication Pipeline + +```python +A = OOCMatrix('A.bin', shape=(10_000_000, 1000)) +B = OOCMatrix('B.bin', shape=(1000, 500)) + +# Matrix multiplication using NumPy dot +C = A.matmul(B, op=np.dot) + +# Process results block-by-block +for block, (r, c) in C.iterate_blocks(): + # Send to downstream system + send_to_database(block, position=(r, c)) +``` + +### Example 3: Complex Expression with Fusion + +```python +A = OOCMatrix('A.bin', shape=(1_000_000, 1000)) +B = OOCMatrix('B.bin', shape=(1_000_000, 1000)) + +# Build lazy expression (operator fusion optimization) +result = (A + B) * 2 + +# Execute optimized plan +C = result.compute('result.bin') +``` + +### Example 4: Custom Block Processing + +```python +A = OOCMatrix('data.bin', shape=(10_000_000, 1000)) + +# Apply custom function using SciPy +from scipy.special import expit + +A_sigmoid = A.blockwise_apply(lambda x: expit(x)) + +# Chain operations +A_processed = A.blockwise_apply( + lambda x: np.clip((x - x.mean()) / x.std(), -3, 3) +) +``` + +## Performance Characteristics + +- **Memory Efficient**: Only loads blocks needed for current computation +- **Lazy Evaluation**: Builds DAG for optimization before execution +- **Operator Fusion**: Automatically fuses compatible operations (e.g., (A+B)*scalar) +- **Buffer Management**: Intelligent caching and eviction strategies +- **Parallelization**: Thread-based parallelism for fused operations + +## Comparison with Similar Frameworks + +| Feature | OOCMatrix | Dask | Vaex | +|---------|-----------|------|------| +| Lazy Evaluation | ✓ | ✓ | ✓ | +| Operator Fusion | ✓ | Limited | Limited | +| Block Iteration | ✓ | ✓ | ✓ | +| NumPy Backend | ✓ | ✓ | ✓ | +| Buffer Management | Advanced | Basic | Basic | +| Custom Operations | ✓ | ✓ | ✓ | + +## Implementation Details + +### No Kernel Rewrites +All mathematical operations use existing, optimized libraries: +- Matrix operations: `np.dot`, `np.matmul` +- Element-wise: `np.add`, `np.multiply`, etc. +- Reductions: `np.sum`, `np.mean`, `np.std`, etc. +- Custom: Any NumPy/SciPy function + +### Focus Areas +The framework provides: +1. **Block Orchestration**: Managing how data flows through operations +2. **Lazy DAG Building**: Constructing computation plans +3. **Buffer Management**: Caching, eviction, prefetching +4. **Scheduling**: Optimal ordering of block operations +5. **Streaming**: Iterating over results without materialization + +### What We Don't Do +- ❌ Reimplement matrix multiplication +- ❌ Rewrite NumPy/SciPy kernels +- ❌ Custom linear algebra code +- ❌ Low-level optimization (handled by NumPy/BLAS) + +### What We Do +- ✓ Smart block loading and caching +- ✓ Lazy evaluation and operator fusion +- ✓ Memory-efficient streaming +- ✓ Computation plan optimization +- ✓ Transparent out-of-core orchestration + +## Testing + +The OOCMatrix implementation includes comprehensive tests: + +```bash +# Run all operator tests +python -m unittest tests.test_operators + +# Run specific test class +python -m unittest tests.test_operators.TestOOCMatrix +python -m unittest tests.test_operators.TestOOCMatrixIntegration +``` + +Test coverage includes: +- Block iteration +- Block-wise operations +- Lazy evaluation +- Operator overloading +- Statistical operations +- Context manager protocol +- Integration with existing Plan infrastructure + +## Benchmarks + +See main README for performance comparisons with Dask and other frameworks. + +## Future Extensions + +Potential areas for enhancement: +- GPU backend support (CuPy) +- Additional reduction operations +- More sophisticated operator fusion patterns +- Distributed computation support +- Integration with Pandas for heterogeneous data diff --git a/README.md b/README.md index 8bf65cc..a83c545 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,60 @@ # Paper framework -Paper a lightweight Python framework for performing matrix computations on datasets that are too large to fit into main memory. It is designed around the principle of lazy evaluation, which allows it to build a computation plan and apply powerful optimizations, such as operator fusion, before executing any costly I/O operations. +Paper is a lightweight Python framework for performing matrix computations on datasets that are too large to fit into main memory. It is designed around the principle of lazy evaluation, which allows it to build a computation plan and apply powerful optimizations, such as operator fusion, before executing any costly I/O operations. The architecture is inspired by modern data systems and academic research (e.g., PreVision), with a clear separation between the logical plan, the physical execution backend, and an intelligent optimizer. +## OOCMatrix API - High-Level NumPy-like Interface + +The framework now includes **OOCMatrix**, a high-level wrapper that provides a NumPy-like API for out-of-core operations. This API focuses on **orchestration** rather than reimplementing mathematical operations, leveraging existing NumPy/SciPy/Pandas operations with lazy evaluation and block-wise processing. + +### Key Features + +- **NumPy-like API**: Familiar interface for matrix operations +- **Lazy Evaluation**: Build computation plans before execution +- **Block-wise Processing**: Automatically chunks large datasets +- **Operator Support**: Reuses existing optimized libraries (NumPy, SciPy) for in-block operations +- **Memory Efficient**: Smart block loading, buffer management, and streaming + +### Quick Start + +```python +from paper import OOCMatrix +import numpy as np + +# Create or load large matrices +A = OOCMatrix('fileA.bin', shape=(10_000_000, 1000)) +B = OOCMatrix('fileB.bin', shape=(1000, 1000)) + +# Perform operations using existing NumPy functions +C = A.matmul(B, op=np.dot) + +# Iterate over result blocks (no full matrix in memory) +for block, (r, c) in C.iterate_blocks(): + process(block) # Each block uses NumPy operations + +# Common operations (computed block-wise) +mean = A.mean() +std = A.std() +total = A.sum() + +# Apply custom transformations using NumPy +A_normalized = A.blockwise_apply(lambda x: (x - mean) / std) + +# Lazy evaluation with operator fusion +result = (A + B) * 2 # Builds plan, doesn't execute +materialized = result.compute('output.bin') # Executes optimized plan +``` + +### Examples + +See `examples_oocmatrix.py` for comprehensive examples including: +- Basic statistics (sum, mean, std, min, max) +- Block-wise transformations +- Lazy evaluation and operator fusion +- Matrix multiplication with custom operations +- Block iteration for streaming workflows + ### Architecture @@ -13,13 +64,20 @@ The architecture is inspired by modern data systems and academic research (e.g., ### Testing ```bash -# Run all tests -python ./tests/run_tests.py +# Run all tests (including OOCMatrix tests) +python run_tests.py -# Run a specific test -python ./tests/run_tests.py addition -python ./tests/run_tests.py fused -python ./tests/run_tests.py scalar +# Or run specific test modules +python -m unittest tests.test_operators +python -m unittest tests.test_core +python -m unittest tests.test_fusion_operations +``` + +### Running Examples + +```bash +# Run OOCMatrix examples +python examples_oocmatrix.py ``` ### Buffer Manager architecture diff --git a/examples_oocmatrix.py b/examples_oocmatrix.py new file mode 100644 index 0000000..15d0d5d --- /dev/null +++ b/examples_oocmatrix.py @@ -0,0 +1,348 @@ +""" +Example demonstrating the OOCMatrix API for out-of-core operations. + +This example shows how to use the high-level OOCMatrix wrapper to perform +operations on large matrices that don't fit in memory, while leveraging +existing NumPy/SciPy operations for in-block computations. +""" + +import numpy as np +import os +import tempfile +from paper import OOCMatrix +from paper.core import PaperMatrix +from paper.config import TILE_SIZE + + +def create_large_matrix(filepath, shape, value_func=None): + """ + Create a large matrix file with specified values. + + Args: + filepath: Path where to save the matrix + shape: Shape of the matrix (rows, cols) + value_func: Optional function(i, j) -> value for element at (i, j) + """ + print(f"Creating matrix at '{filepath}' with shape {shape}...") + + # Create the matrix using PaperMatrix for initialization + matrix = PaperMatrix(filepath, shape, mode='w+') + + # Fill with data tile by tile + for r_start in range(0, shape[0], TILE_SIZE): + r_end = min(r_start + TILE_SIZE, shape[0]) + for c_start in range(0, shape[1], TILE_SIZE): + c_end = min(c_start + TILE_SIZE, shape[1]) + + if value_func is None: + # Random data + tile = np.random.rand(r_end - r_start, c_end - c_start).astype(np.float32) + else: + # Custom values + tile = np.zeros((r_end - r_start, c_end - c_start), dtype=np.float32) + for i in range(tile.shape[0]): + for j in range(tile.shape[1]): + tile[i, j] = value_func(r_start + i, c_start + j) + + matrix.data[r_start:r_end, c_start:c_end] = tile + + matrix.data.flush() + matrix.close() + print(f"Matrix created successfully.") + + +def example_basic_operations(): + """Demonstrate basic operations with OOCMatrix.""" + print("\n" + "="*60) + print("Example 1: Basic Operations") + print("="*60) + + with tempfile.TemporaryDirectory() as tmpdir: + # Create test matrices + path_a = os.path.join(tmpdir, "A.bin") + path_b = os.path.join(tmpdir, "B.bin") + + shape = (1000, 500) + create_large_matrix(path_a, shape) + create_large_matrix(path_b, shape) + + # Load matrices using OOCMatrix + A = OOCMatrix(path_a, shape, dtype=np.float32, mode='r') + B = OOCMatrix(path_b, shape, dtype=np.float32, mode='r') + + print(f"\nMatrix A: {A}") + print(f"Matrix B: {B}") + + # Compute statistics without loading entire matrix + print(f"\nA.sum() = {A.sum():.2f}") + print(f"A.mean() = {A.mean():.6f}") + print(f"A.std() = {A.std():.6f}") + print(f"A.min() = {A.min():.6f}") + print(f"A.max() = {A.max():.6f}") + + A.close() + B.close() + + +def example_blockwise_operations(): + """Demonstrate blockwise operations with custom functions.""" + print("\n" + "="*60) + print("Example 2: Blockwise Operations") + print("="*60) + + with tempfile.TemporaryDirectory() as tmpdir: + # Create a matrix + path = os.path.join(tmpdir, "data.bin") + shape = (800, 400) + create_large_matrix(path, shape) + + # Load as OOCMatrix + A = OOCMatrix(path, shape, dtype=np.float32, mode='r') + + print(f"Original matrix: {A}") + + # Example 1: Apply normalization using NumPy operations + print("\n1. Normalization using blockwise_apply:") + mean_val = A.mean() + std_val = A.std() + + normalized_path = os.path.join(tmpdir, "normalized.bin") + A_normalized = A.blockwise_apply( + lambda x: (x - mean_val) / std_val, + output_path=normalized_path + ) + + print(f" Normalized matrix mean: {A_normalized.mean():.6f} (should be ~0)") + print(f" Normalized matrix std: {A_normalized.std():.6f} (should be ~1)") + + # Example 2: Apply custom function to each block + print("\n2. Apply ReLU activation:") + relu_path = os.path.join(tmpdir, "relu.bin") + A_relu = A.blockwise_apply( + lambda x: np.maximum(0, x - 0.5), + output_path=relu_path + ) + print(f" ReLU applied successfully") + + A.close() + A_normalized.close() + A_relu.close() + + +def example_lazy_evaluation(): + """Demonstrate lazy evaluation and operator fusion.""" + print("\n" + "="*60) + print("Example 3: Lazy Evaluation & Operator Fusion") + print("="*60) + + with tempfile.TemporaryDirectory() as tmpdir: + # Create matrices + path_a = os.path.join(tmpdir, "A.bin") + path_b = os.path.join(tmpdir, "B.bin") + + shape = (600, 400) + create_large_matrix(path_a, shape, value_func=lambda i, j: 2.0) + create_large_matrix(path_b, shape, value_func=lambda i, j: 3.0) + + A = OOCMatrix(path_a, shape, dtype=np.float32, mode='r') + B = OOCMatrix(path_b, shape, dtype=np.float32, mode='r') + + # Build lazy expression: (A + B) * 2 + # This doesn't execute yet! + print("\nBuilding lazy expression: (A + B) * 2") + lazy_result = (A + B) * 2 + + print(f"Lazy plan created: {lazy_result._plan}") + + # Now execute the plan + print("\nExecuting the computation plan...") + result_path = os.path.join(tmpdir, "result.bin") + result = lazy_result.compute(result_path) + + print(f"Result computed: {result}") + print(f"Expected value: (2 + 3) * 2 = 10") + print(f"Actual mean: {result.mean():.2f}") + + A.close() + B.close() + result.close() + + +def example_matrix_multiplication(): + """Demonstrate matrix multiplication with custom operations.""" + print("\n" + "="*60) + print("Example 4: Matrix Multiplication") + print("="*60) + + with tempfile.TemporaryDirectory() as tmpdir: + # Create matrices for multiplication + path_a = os.path.join(tmpdir, "A.bin") + path_b = os.path.join(tmpdir, "B.bin") + + shape_a = (500, 300) + shape_b = (300, 200) + + create_large_matrix(path_a, shape_a) + create_large_matrix(path_b, shape_b) + + A = OOCMatrix(path_a, shape_a, dtype=np.float32, mode='r') + B = OOCMatrix(path_b, shape_b, dtype=np.float32, mode='r') + + print(f"\nA shape: {A.shape}") + print(f"B shape: {B.shape}") + + # Method 1: Using matmul with custom operation (uses np.dot by default) + print("\nMethod 1: Using matmul() with custom operation") + result_path = os.path.join(tmpdir, "result_matmul.bin") + C = A.matmul(B, op=np.dot, output_path=result_path) + + print(f"Result shape: {C.shape}") + print(f"Result mean: {C.mean():.6f}") + + # Method 2: Using @ operator (lazy evaluation) + print("\nMethod 2: Using @ operator (lazy)") + lazy_product = A @ B + + result_path2 = os.path.join(tmpdir, "result_lazy.bin") + C2 = lazy_product.compute(result_path2) + + print(f"Result shape: {C2.shape}") + print(f"Result mean: {C2.mean():.6f}") + + A.close() + B.close() + C.close() + C2.close() + + +def example_block_iteration(): + """Demonstrate iterating over matrix blocks.""" + print("\n" + "="*60) + print("Example 5: Block Iteration") + print("="*60) + + with tempfile.TemporaryDirectory() as tmpdir: + # Create a matrix + path = os.path.join(tmpdir, "data.bin") + shape = (400, 300) + create_large_matrix(path, shape, value_func=lambda i, j: i + j) + + A = OOCMatrix(path, shape, dtype=np.float32, mode='r') + + print(f"\nIterating over blocks of {A}:") + print(f"Block size: {TILE_SIZE}x{TILE_SIZE}") + + block_count = 0 + total_sum = 0.0 + + # Process each block + for block, (r_start, c_start) in A.iterate_blocks(): + block_count += 1 + block_sum = np.sum(block) + total_sum += block_sum + + if block_count <= 3: # Show first 3 blocks + print(f" Block {block_count}: position=({r_start}, {c_start}), " + f"shape={block.shape}, sum={block_sum:.2f}") + + print(f"\nTotal blocks processed: {block_count}") + print(f"Total sum (via iteration): {total_sum:.2f}") + print(f"Total sum (via A.sum()): {A.sum():.2f}") + + A.close() + + +def example_as_in_issue(): + """ + Reproduce the example from the GitHub issue. + + This demonstrates the API as specified in the issue description. + """ + print("\n" + "="*60) + print("Example 6: API from GitHub Issue") + print("="*60) + + with tempfile.TemporaryDirectory() as tmpdir: + # Create matrices (smaller for demo purposes) + path_a = os.path.join(tmpdir, "fileA.bin") + path_b = os.path.join(tmpdir, "fileB.bin") + + shape_a = (10000, 1000) # Scaled down from 10M for demo + shape_b = (1000, 1000) + + print("\nCreating large matrices...") + create_large_matrix(path_a, shape_a) + create_large_matrix(path_b, shape_b) + + # User applies existing NumPy/SciPy operations on each block automatically + A = OOCMatrix(path_a, shape=shape_a, dtype=np.float32, mode='r') + B = OOCMatrix(path_b, shape=shape_b, dtype=np.float32, mode='r') + + print(f"\nA: {A}") + print(f"B: {B}") + + # This triggers block-wise multiplication, but each block operation is plain NumPy + def matmul_blocks(A_block, B_block): + return np.dot(A_block, B_block) + + # API exposes big operations -- no kernel rewrite required! + result_path = os.path.join(tmpdir, "result.bin") + C = A.matmul(B, op=matmul_blocks, output_path=result_path) + + print(f"\nResult C = A @ B: {C}") + + # Downstream systems can consume each result block + print("\nProcessing result blocks:") + block_count = 0 + for block, idx in C.iterate_blocks(): + # downstream systems can consume each result block + block_count += 1 + if block_count <= 2: + print(f" Processing block at {idx}, shape: {block.shape}") + + print(f"Total result blocks: {block_count}") + + # Other ops, e.g., sum, mean, normalization: + print("\nComputing statistics:") + mean = A.mean() + std_val = A.std() + + print(f" Mean: {mean:.6f}") + print(f" Std: {std_val:.6f}") + + # Normalization using blockwise_apply + norm_path = os.path.join(tmpdir, "normalized.bin") + A_normalized = A.blockwise_apply(lambda x: (x - mean) / std_val, output_path=norm_path) + + print(f"\nNormalized A: {A_normalized}") + print(f" Normalized mean: {A_normalized.mean():.6f}") + + A.close() + B.close() + C.close() + A_normalized.close() + + +def main(): + """Run all examples.""" + print("\n" + "="*70) + print(" OOCMatrix API Examples - Out-of-Core Matrix Operations") + print("="*70) + print("\nThis demonstrates the high-level API for out-of-core operations,") + print("wrapping NumPy/SciPy operations with lazy evaluation and block-wise") + print("processing, without reimplementing mathematical operations.") + + example_basic_operations() + example_blockwise_operations() + example_lazy_evaluation() + example_matrix_multiplication() + example_block_iteration() + example_as_in_issue() + + print("\n" + "="*70) + print("All examples completed successfully!") + print("="*70 + "\n") + + +if __name__ == '__main__': + main() diff --git a/paper/__init__.py b/paper/__init__.py index 245eb99..77145e4 100644 --- a/paper/__init__.py +++ b/paper/__init__.py @@ -1,2 +1,3 @@ from .core import PaperMatrix -from .plan import Plan, EagerNode \ No newline at end of file +from .plan import Plan, EagerNode +from .operators import OOCMatrix \ No newline at end of file diff --git a/paper/operators.py b/paper/operators.py new file mode 100644 index 0000000..3faa832 --- /dev/null +++ b/paper/operators.py @@ -0,0 +1,380 @@ +""" +Operators module - provides high-level API for out-of-core operations. + +This module implements the OOCMatrix wrapper that provides a NumPy-like API +for out-of-core matrix operations, focusing on orchestration rather than +reimplementing mathematical operations. +""" + +import numpy as np +import os +from typing import Callable, Iterator, Tuple, Optional, Any +from .core import PaperMatrix +from .plan import Plan, EagerNode +from .config import TILE_SIZE + + +class OOCMatrix: + """ + Out-of-core matrix wrapper that provides a NumPy-like API. + + This class wraps existing NumPy/SciPy operations with lazy evaluation, + block-wise processing, and streaming constructs. It focuses on smart + block loading, buffer management, and iteration control while delegating + actual mathematical operations to optimized libraries. + + Example: + >>> A = OOCMatrix('fileA.h5', shape=(10_000_000, 1000)) + >>> B = OOCMatrix('fileB.h5', shape=(1000, 1000)) + >>> C = A.matmul(B, op=lambda a, b: np.dot(a, b)) + >>> for block, idx in C.iterate_blocks(): + ... process(block) + """ + + def __init__(self, filepath: str, shape: Tuple[int, int], + dtype=np.float32, mode='r', create: bool = False): + """ + Initialize an OOCMatrix. + + Args: + filepath: Path to the matrix file on disk + shape: Shape of the matrix (rows, cols) + dtype: Data type of the matrix elements + mode: File access mode ('r', 'w+', etc.) + create: If True, create a new empty file + """ + self.filepath = filepath + self.shape = shape + self.dtype = np.dtype(dtype) + self.mode = mode + + # Create file if requested + if create and not os.path.exists(filepath): + self._create_empty_file() + + # Wrap the underlying PaperMatrix + self._matrix = PaperMatrix(filepath, shape, dtype=dtype, mode=mode) + + # Create a lazy Plan wrapper for this matrix + self._plan = Plan(EagerNode(self._matrix)) + + def _create_empty_file(self): + """Create an empty file of the correct size.""" + with open(self.filepath, "wb") as f: + file_size = self.shape[0] * self.shape[1] * self.dtype.itemsize + if file_size > 0: + f.seek(file_size - 1) + f.write(b'\0') + + @property + def plan(self) -> Plan: + """Get the underlying lazy evaluation plan.""" + return self._plan + + @property + def matrix(self) -> PaperMatrix: + """Get the underlying PaperMatrix.""" + return self._matrix + + def iterate_blocks(self, block_size: Optional[int] = None) -> Iterator[Tuple[np.ndarray, Tuple[int, int]]]: + """ + Iterate over blocks of the matrix. + + This allows downstream systems to consume result blocks without + loading the entire matrix into memory. + + Args: + block_size: Size of each block (defaults to TILE_SIZE) + + Yields: + Tuple of (block_data, (row_start, col_start)) + """ + if block_size is None: + block_size = TILE_SIZE + + rows, cols = self.shape + + for r_start in range(0, rows, block_size): + r_end = min(r_start + block_size, rows) + for c_start in range(0, cols, block_size): + c_end = min(c_start + block_size, cols) + + # Load the block from disk + block = self._matrix.get_tile(r_start, c_start) + + yield block, (r_start, c_start) + + def blockwise_apply(self, op: Callable[[np.ndarray], np.ndarray], + output_path: Optional[str] = None) -> 'OOCMatrix': + """ + Apply a function to each block of the matrix. + + This method enables element-wise transformations using existing NumPy + operations without loading the entire matrix into memory. + + Args: + op: Function to apply to each block (takes ndarray, returns ndarray) + output_path: Path for the output matrix (if None, creates temp file) + + Returns: + New OOCMatrix with the operation applied + + Example: + >>> A = OOCMatrix('input.bin', shape=(10000, 1000)) + >>> # Normalize using NumPy operations on each block + >>> A_norm = A.blockwise_apply(lambda x: (x - x.mean()) / x.std()) + """ + if output_path is None: + output_path = self.filepath + ".blockwise_apply.tmp" + + # Create output matrix + result = OOCMatrix(output_path, self.shape, dtype=self.dtype, + mode='w+', create=True) + + # Process each block + for block, (r_start, c_start) in self.iterate_blocks(): + # Apply the operation using existing NumPy/library code + transformed_block = op(block) + + # Write the result + r_end = r_start + block.shape[0] + c_end = c_start + block.shape[1] + result._matrix.data[r_start:r_end, c_start:c_end] = transformed_block + + result._matrix.data.flush() + return result + + def blockwise_reduce(self, op: Callable[[np.ndarray], Any], + combine_op: Optional[Callable[[Any, Any], Any]] = None) -> Any: + """ + Apply a reduction operation across all blocks. + + This enables operations like sum, mean, max, etc. using existing + NumPy reduction functions on a per-block basis. + + Args: + op: Reduction function to apply to each block + combine_op: Function to combine results from different blocks + (if None, uses addition for combining) + + Returns: + Reduced value + + Example: + >>> A = OOCMatrix('input.bin', shape=(10000, 1000)) + >>> mean = A.blockwise_reduce(np.mean) + >>> total_sum = A.blockwise_reduce(np.sum) + """ + if combine_op is None: + # Default to addition for combining results + combine_op = lambda x, y: x + y + + result = None + count = 0 + + for block, _ in self.iterate_blocks(): + block_result = op(block) + + if result is None: + result = block_result + else: + result = combine_op(result, block_result) + + count += 1 + + return result + + def matmul(self, other: 'OOCMatrix', + op: Optional[Callable[[np.ndarray, np.ndarray], np.ndarray]] = None, + output_path: Optional[str] = None) -> 'OOCMatrix': + """ + Perform matrix multiplication using block-wise processing. + + This method leverages existing optimized libraries (NumPy, etc.) for + in-block operations while handling the orchestration of large matrices. + + Args: + other: The other matrix to multiply with + op: Operation to use for block multiplication (defaults to np.dot) + output_path: Path for output matrix + + Returns: + Result matrix + + Example: + >>> A = OOCMatrix('A.bin', shape=(10000, 1000)) + >>> B = OOCMatrix('B.bin', shape=(1000, 500)) + >>> C = A.matmul(B, op=np.dot) + """ + if op is None: + op = np.dot + + # Use the underlying Plan infrastructure for lazy evaluation + result_plan = self._plan @ other._plan + + if output_path is None: + output_path = self.filepath + ".matmul.result.tmp" + + # Execute the plan + result_matrix, buffer_manager = result_plan.compute(output_path) + + # Wrap in OOCMatrix + return OOCMatrix(output_path, result_matrix.shape, + dtype=result_matrix.dtype, mode='r') + + def sum(self) -> float: + """ + Compute the sum of all elements. + + Returns: + Sum of all matrix elements + """ + return self.blockwise_reduce(np.sum) + + def mean(self) -> float: + """ + Compute the mean of all elements. + + Returns: + Mean of all matrix elements + """ + total_sum = self.sum() + total_elements = self.shape[0] * self.shape[1] + return total_sum / total_elements + + def std(self) -> float: + """ + Compute the standard deviation of all elements. + + Returns: + Standard deviation of all matrix elements + """ + mean_val = self.mean() + + # Compute variance using blockwise operations + def variance_block(block): + return np.sum((block - mean_val) ** 2) + + total_var = self.blockwise_reduce(variance_block) + total_elements = self.shape[0] * self.shape[1] + + return np.sqrt(total_var / total_elements) + + def max(self) -> float: + """ + Compute the maximum element. + + Returns: + Maximum element value + """ + return self.blockwise_reduce(np.max, combine_op=max) + + def min(self) -> float: + """ + Compute the minimum element. + + Returns: + Minimum element value + """ + return self.blockwise_reduce(np.min, combine_op=min) + + def __add__(self, other: 'OOCMatrix') -> 'OOCMatrix': + """ + Add two matrices using lazy evaluation. + + Args: + other: Matrix to add + + Returns: + New OOCMatrix representing the lazy addition + """ + # Use the underlying Plan infrastructure + result_plan = self._plan + other._plan + + # Create a new OOCMatrix wrapper with the combined plan + # Note: This doesn't execute yet, maintaining lazy evaluation + result = OOCMatrix.__new__(OOCMatrix) + result.filepath = self.filepath + ".add.lazy" + result.shape = self.shape + result.dtype = self.dtype + result.mode = 'r' + result._plan = result_plan + result._matrix = None # Will be created on compute + + return result + + def __mul__(self, scalar: float) -> 'OOCMatrix': + """ + Multiply matrix by a scalar using lazy evaluation. + + Args: + scalar: Scalar value to multiply by + + Returns: + New OOCMatrix representing the lazy multiplication + """ + result_plan = self._plan * scalar + + result = OOCMatrix.__new__(OOCMatrix) + result.filepath = self.filepath + ".mul.lazy" + result.shape = self.shape + result.dtype = self.dtype + result.mode = 'r' + result._plan = result_plan + result._matrix = None + + return result + + def __matmul__(self, other: 'OOCMatrix') -> 'OOCMatrix': + """ + Matrix multiply using lazy evaluation (@ operator). + + Args: + other: Matrix to multiply with + + Returns: + New OOCMatrix representing the lazy multiplication + """ + result_plan = self._plan @ other._plan + + result = OOCMatrix.__new__(OOCMatrix) + result.filepath = self.filepath + ".matmul.lazy" + result.shape = (self.shape[0], other.shape[1]) + result.dtype = self.dtype + result.mode = 'r' + result._plan = result_plan + result._matrix = None + + return result + + def compute(self, output_path: str, cache_size_tiles: Optional[int] = None) -> 'OOCMatrix': + """ + Execute the lazy computation plan and materialize the result. + + Args: + output_path: Path where the result should be stored + cache_size_tiles: Size of cache in tiles + + Returns: + OOCMatrix with the computed result + """ + result_matrix, buffer_manager = self._plan.compute(output_path, cache_size_tiles) + + return OOCMatrix(output_path, result_matrix.shape, + dtype=result_matrix.dtype, mode='r') + + def close(self): + """Close the underlying matrix file.""" + if self._matrix is not None: + self._matrix.close() + + def __repr__(self): + return f"OOCMatrix(path='{self.filepath}', shape={self.shape}, dtype={self.dtype.name})" + + def __enter__(self): + """Context manager entry.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit.""" + self.close() diff --git a/tests/test_operators.py b/tests/test_operators.py new file mode 100644 index 0000000..22f1a1b --- /dev/null +++ b/tests/test_operators.py @@ -0,0 +1,412 @@ +""" +Unit tests for the OOCMatrix operators API. +Tests the high-level NumPy-like API for out-of-core operations. +""" + +import unittest +import os +import tempfile +import shutil +import numpy as np +import sys + +# Add the parent directory to the path so we can import the paper module +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from paper.operators import OOCMatrix +from paper.config import TILE_SIZE + + +class TestOOCMatrix(unittest.TestCase): + """Test cases for OOCMatrix class.""" + + def setUp(self): + """Set up test fixtures before each test method.""" + self.test_dir = tempfile.mkdtemp() + self.test_shape = (512, 256) + self.test_dtype = np.float32 + self.test_filepath = os.path.join(self.test_dir, "test_matrix.bin") + + # Create a test matrix file with known values + self._create_test_matrix_file() + + def tearDown(self): + """Clean up after each test method.""" + if os.path.exists(self.test_dir): + shutil.rmtree(self.test_dir) + + def _create_test_matrix_file(self): + """Create a test matrix file with predictable values.""" + # Create matrix where element at (i,j) = i + j (simple for testing) + data = np.zeros(self.test_shape, dtype=self.test_dtype) + for i in range(self.test_shape[0]): + for j in range(self.test_shape[1]): + data[i, j] = i + j + data.tofile(self.test_filepath) + + def test_oocmatrix_creation(self): + """Test that OOCMatrix objects are created correctly.""" + matrix = OOCMatrix(self.test_filepath, self.test_shape, + dtype=self.test_dtype, mode='r') + + self.assertEqual(matrix.shape, self.test_shape) + self.assertEqual(matrix.dtype, self.test_dtype) + self.assertTrue(os.path.exists(matrix.filepath)) + + matrix.close() + + def test_oocmatrix_creation_with_create_flag(self): + """Test creating a new OOCMatrix file.""" + new_filepath = os.path.join(self.test_dir, "new_matrix.bin") + + matrix = OOCMatrix(new_filepath, (100, 50), dtype=np.float32, + mode='w+', create=True) + + self.assertTrue(os.path.exists(new_filepath)) + self.assertEqual(matrix.shape, (100, 50)) + + matrix.close() + + def test_iterate_blocks(self): + """Test iterating over matrix blocks.""" + matrix = OOCMatrix(self.test_filepath, self.test_shape, + dtype=self.test_dtype, mode='r') + + block_count = 0 + total_elements = 0 + + for block, (r_start, c_start) in matrix.iterate_blocks(): + block_count += 1 + total_elements += block.size + + # Verify block is a numpy array + self.assertIsInstance(block, np.ndarray) + + # Verify block dimensions are reasonable + self.assertLessEqual(block.shape[0], TILE_SIZE) + self.assertLessEqual(block.shape[1], TILE_SIZE) + + # Verify we iterated through all elements + expected_elements = self.test_shape[0] * self.test_shape[1] + self.assertEqual(total_elements, expected_elements) + self.assertGreater(block_count, 0) + + matrix.close() + + def test_blockwise_apply(self): + """Test applying a function to each block.""" + matrix = OOCMatrix(self.test_filepath, self.test_shape, + dtype=self.test_dtype, mode='r') + + # Apply a simple transformation: multiply by 2 + output_path = os.path.join(self.test_dir, "blockwise_result.bin") + result = matrix.blockwise_apply(lambda x: x * 2, output_path=output_path) + + self.assertEqual(result.shape, matrix.shape) + self.assertTrue(os.path.exists(output_path)) + + # Verify the transformation was applied + for block, (r_start, c_start) in result.iterate_blocks(): + # Original values were i + j, multiplied by 2 + for i in range(block.shape[0]): + for j in range(block.shape[1]): + expected = (r_start + i + c_start + j) * 2 + actual = block[i, j] + self.assertAlmostEqual(actual, expected, places=5) + + matrix.close() + result.close() + + def test_blockwise_reduce_sum(self): + """Test blockwise reduction with sum.""" + # Create a simple matrix with all ones + ones_path = os.path.join(self.test_dir, "ones.bin") + ones_data = np.ones((100, 50), dtype=np.float32) + ones_data.tofile(ones_path) + + matrix = OOCMatrix(ones_path, (100, 50), dtype=np.float32, mode='r') + + # Sum should be 100 * 50 = 5000 + total = matrix.blockwise_reduce(np.sum) + + expected = 100 * 50 + self.assertAlmostEqual(total, expected, places=2) + + matrix.close() + + def test_sum_method(self): + """Test the sum() convenience method.""" + # Create a simple matrix with all ones + ones_path = os.path.join(self.test_dir, "ones.bin") + ones_data = np.ones((100, 50), dtype=np.float32) + ones_data.tofile(ones_path) + + matrix = OOCMatrix(ones_path, (100, 50), dtype=np.float32, mode='r') + + total = matrix.sum() + expected = 100 * 50 + + self.assertAlmostEqual(total, expected, places=2) + + matrix.close() + + def test_mean_method(self): + """Test the mean() convenience method.""" + # Create a matrix with known mean + test_path = os.path.join(self.test_dir, "mean_test.bin") + test_data = np.full((100, 50), 5.0, dtype=np.float32) + test_data.tofile(test_path) + + matrix = OOCMatrix(test_path, (100, 50), dtype=np.float32, mode='r') + + mean_val = matrix.mean() + + self.assertAlmostEqual(mean_val, 5.0, places=5) + + matrix.close() + + def test_max_min_methods(self): + """Test the max() and min() convenience methods.""" + # Create a matrix with known min/max + test_path = os.path.join(self.test_dir, "minmax_test.bin") + test_data = np.arange(100 * 50, dtype=np.float32).reshape(100, 50) + test_data.tofile(test_path) + + matrix = OOCMatrix(test_path, (100, 50), dtype=np.float32, mode='r') + + max_val = matrix.max() + min_val = matrix.min() + + self.assertAlmostEqual(max_val, 100 * 50 - 1, places=5) + self.assertAlmostEqual(min_val, 0.0, places=5) + + matrix.close() + + def test_lazy_addition_operator(self): + """Test lazy addition using + operator.""" + # Create two matrices + path_a = os.path.join(self.test_dir, "A_add.bin") + path_b = os.path.join(self.test_dir, "B_add.bin") + + data_a = np.ones((100, 50), dtype=np.float32) * 2 + data_b = np.ones((100, 50), dtype=np.float32) * 3 + + data_a.tofile(path_a) + data_b.tofile(path_b) + + matrix_a = OOCMatrix(path_a, (100, 50), dtype=np.float32, mode='r') + matrix_b = OOCMatrix(path_b, (100, 50), dtype=np.float32, mode='r') + + # Lazy addition + result_lazy = matrix_a + matrix_b + + # This should not execute yet + self.assertIsNotNone(result_lazy._plan) + + # Execute the plan + output_path = os.path.join(self.test_dir, "result_add.bin") + result = result_lazy.compute(output_path) + + # Verify result + for block, _ in result.iterate_blocks(): + # All values should be 2 + 3 = 5 + self.assertTrue(np.allclose(block, 5.0)) + + matrix_a.close() + matrix_b.close() + result.close() + + def test_lazy_scalar_multiplication(self): + """Test lazy scalar multiplication using * operator.""" + path = os.path.join(self.test_dir, "scalar_test.bin") + data = np.ones((100, 50), dtype=np.float32) * 3 + data.tofile(path) + + matrix = OOCMatrix(path, (100, 50), dtype=np.float32, mode='r') + + # Lazy multiplication + result_lazy = matrix * 2 + + # Execute + output_path = os.path.join(self.test_dir, "result_scalar.bin") + result = result_lazy.compute(output_path) + + # Verify result + for block, _ in result.iterate_blocks(): + # All values should be 3 * 2 = 6 + self.assertTrue(np.allclose(block, 6.0)) + + matrix.close() + result.close() + + def test_matmul_with_custom_operation(self): + """Test matrix multiplication with custom operation.""" + # Create two small matrices + path_a = os.path.join(self.test_dir, "A_matmul.bin") + path_b = os.path.join(self.test_dir, "B_matmul.bin") + + # A is 50x30, B is 30x20 + shape_a = (50, 30) + shape_b = (30, 20) + + data_a = np.random.rand(*shape_a).astype(np.float32) + data_b = np.random.rand(*shape_b).astype(np.float32) + + data_a.tofile(path_a) + data_b.tofile(path_b) + + matrix_a = OOCMatrix(path_a, shape_a, dtype=np.float32, mode='r') + matrix_b = OOCMatrix(path_b, shape_b, dtype=np.float32, mode='r') + + # Perform matrix multiplication using numpy.dot + output_path = os.path.join(self.test_dir, "result_matmul.bin") + result = matrix_a.matmul(matrix_b, op=np.dot, output_path=output_path) + + # Verify shape + self.assertEqual(result.shape, (50, 20)) + + # Verify result by comparing with numpy + expected = np.dot(data_a, data_b) + + # Read back the result + result_data = np.memmap(output_path, dtype=np.float32, mode='r', shape=(50, 20)) + + self.assertTrue(np.allclose(result_data, expected, rtol=1e-5)) + + matrix_a.close() + matrix_b.close() + result.close() + + def test_lazy_matmul_operator(self): + """Test lazy matrix multiplication using @ operator.""" + # Create two small matrices + path_a = os.path.join(self.test_dir, "A_lazy_matmul.bin") + path_b = os.path.join(self.test_dir, "B_lazy_matmul.bin") + + shape_a = (40, 30) + shape_b = (30, 25) + + data_a = np.random.rand(*shape_a).astype(np.float32) + data_b = np.random.rand(*shape_b).astype(np.float32) + + data_a.tofile(path_a) + data_b.tofile(path_b) + + matrix_a = OOCMatrix(path_a, shape_a, dtype=np.float32, mode='r') + matrix_b = OOCMatrix(path_b, shape_b, dtype=np.float32, mode='r') + + # Lazy matmul + result_lazy = matrix_a @ matrix_b + + # Verify lazy evaluation + self.assertIsNotNone(result_lazy._plan) + self.assertEqual(result_lazy.shape, (40, 25)) + + # Execute + output_path = os.path.join(self.test_dir, "result_lazy_matmul.bin") + result = result_lazy.compute(output_path) + + # Verify shape + self.assertEqual(result.shape, (40, 25)) + + matrix_a.close() + matrix_b.close() + result.close() + + def test_context_manager(self): + """Test OOCMatrix as context manager.""" + with OOCMatrix(self.test_filepath, self.test_shape, + dtype=self.test_dtype, mode='r') as matrix: + # Should be able to use the matrix + self.assertEqual(matrix.shape, self.test_shape) + + # Iterate over blocks + block_count = 0 + for block, _ in matrix.iterate_blocks(): + block_count += 1 + + self.assertGreater(block_count, 0) + + # Matrix should be closed after exiting context + # (no error should occur) + + def test_blockwise_apply_with_numpy_operations(self): + """Test blockwise_apply with various NumPy operations.""" + matrix = OOCMatrix(self.test_filepath, self.test_shape, + dtype=self.test_dtype, mode='r') + + # Test normalization-like operation + output_path = os.path.join(self.test_dir, "normalized.bin") + result = matrix.blockwise_apply( + lambda x: (x - np.mean(x)) / (np.std(x) + 1e-8), + output_path=output_path + ) + + self.assertEqual(result.shape, matrix.shape) + self.assertTrue(os.path.exists(output_path)) + + matrix.close() + result.close() + + def test_std_method(self): + """Test the std() convenience method.""" + # Create a matrix with known std + test_path = os.path.join(self.test_dir, "std_test.bin") + # Use values that give a predictable std + test_data = np.array([1, 2, 3, 4, 5] * 20, dtype=np.float32).reshape(10, 10) + test_data.tofile(test_path) + + matrix = OOCMatrix(test_path, (10, 10), dtype=np.float32, mode='r') + + std_val = matrix.std() + expected_std = np.std(test_data) + + self.assertAlmostEqual(std_val, expected_std, places=4) + + matrix.close() + + +class TestOOCMatrixIntegration(unittest.TestCase): + """Integration tests for OOCMatrix with the existing Plan infrastructure.""" + + def setUp(self): + """Set up test fixtures.""" + self.test_dir = tempfile.mkdtemp() + + def tearDown(self): + """Clean up after each test.""" + if os.path.exists(self.test_dir): + shutil.rmtree(self.test_dir) + + def test_complex_expression_with_oocmatrix(self): + """Test a complex expression: (A + B) * 2.""" + path_a = os.path.join(self.test_dir, "A_complex.bin") + path_b = os.path.join(self.test_dir, "B_complex.bin") + + data_a = np.ones((100, 50), dtype=np.float32) * 2 + data_b = np.ones((100, 50), dtype=np.float32) * 3 + + data_a.tofile(path_a) + data_b.tofile(path_b) + + matrix_a = OOCMatrix(path_a, (100, 50), dtype=np.float32, mode='r') + matrix_b = OOCMatrix(path_b, (100, 50), dtype=np.float32, mode='r') + + # Build lazy expression + result_lazy = (matrix_a + matrix_b) * 2 + + # Execute + output_path = os.path.join(self.test_dir, "result_complex.bin") + result = result_lazy.compute(output_path) + + # Verify result: (2 + 3) * 2 = 10 + for block, _ in result.iterate_blocks(): + self.assertTrue(np.allclose(block, 10.0)) + + matrix_a.close() + matrix_b.close() + result.close() + + +if __name__ == '__main__': + unittest.main()