diff --git a/README.md b/README.md index 72e3b3ed4..cecf83f89 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,40 @@ for version in client.list_model_versions("my-model"): print(f"Version: {version.name}") ``` +### Manage features with Feast + +**Install Feast support:** +```bash +pip install 'kubeflow[feast]' +``` + +Feast is a feature store that enables offline retrieval of historical datasets and online serving of features/data for ML applications. + +The FeastClient provides simplified initialization. Access full Feast functionality through the `feature_store` property: + +```python +from kubeflow.feast import FeastClient + +# Initialize the Feast client with your feature repository path +client = FeastClient(repo_path="/path/to/feast/repo") + +# Access full Feast functionality through feature_store property +online_features = client.feature_store.get_online_features( + features=["feature_view:feature1", "feature_view:feature2"], + entity_rows=[{"entity_id": 1}, {"entity_id": 2}], +) + +# Materialize features to the online store +from datetime import datetime, timedelta +end_date = datetime.now() +start_date = end_date - timedelta(days=7) +client.feature_store.materialize(start_date=start_date, end_date=end_date) + +# List all feature views +for fv in client.feature_store.list_feature_views(): + print(f"Feature view: {fv.name}") +``` + ## Local Development Kubeflow Trainer client supports local development without needing a Kubernetes cluster. @@ -186,6 +220,7 @@ job_id = client.train(trainer=CustomTrainer(func=train_fn)) | **Kubeflow Trainer** | ✅ **Available** | v2.0.0+ | Train and fine-tune AI models with various frameworks | | **Kubeflow Katib** | ✅ **Available** | v0.19.0+ | Hyperparameter optimization | | **Kubeflow Model Registry** | ✅ **Available** | v0.3.0+ | Manage model artifacts, versions and ML artifacts metadata | +| **Feast** | ✅ **Available** | v0.59.0+ | Feature store for offline and online feature serving | | **Kubeflow Pipelines** | 🚧 Planned | TBD | Build, run, and track AI workflows | | **Kubeflow Spark Operator** | 🚧 Planned | TBD | Manage Spark applications for data processing and feature engineering | diff --git a/kubeflow/feast/__init__.py b/kubeflow/feast/__init__.py new file mode 100644 index 000000000..cef05f7f3 --- /dev/null +++ b/kubeflow/feast/__init__.py @@ -0,0 +1,17 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kubeflow.feast.api.feast_client import FeastClient + +__all__ = ["FeastClient"] diff --git a/kubeflow/feast/api/__init__.py b/kubeflow/feast/api/__init__.py new file mode 100644 index 000000000..48e3dcfaa --- /dev/null +++ b/kubeflow/feast/api/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/kubeflow/feast/api/feast_client.py b/kubeflow/feast/api/feast_client.py new file mode 100644 index 000000000..6f63696c3 --- /dev/null +++ b/kubeflow/feast/api/feast_client.py @@ -0,0 +1,101 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from feast import FeatureStore + + +class FeastClient: + """Client for Feast feature store operations. + + Feast is a feature store that enables offline retrieval of historical datasets + and online serving of features/data for ML applications. + + This is a minimal wrapper that provides simplified initialization. For full Feast + functionality, use the `feature_store` property to access the underlying FeatureStore. + + Requires the feast package to be installed. Install it with: + + pip install 'kubeflow[feast]' + + Example: + ```python + from kubeflow.feast import FeastClient + + # Initialize client + client = FeastClient(repo_path="/path/to/feast/repo") + + # Access full Feast functionality + client.feature_store.get_online_features(...) + client.feature_store.materialize(...) + ``` + """ + + def __init__(self, repo_path: str | None = None, config: dict[str, Any] | None = None): + """Initialize the FeastClient. + + Args: + repo_path: Path to the Feast repository. If not provided, uses the current directory. + config: Optional configuration dictionary for Feast FeatureStore. + If provided, takes precedence over repo_path. + + Raises: + ImportError: If feast is not installed. + """ + try: + from feast import FeatureStore + except ImportError as e: + raise ImportError( + "feast is not installed. Install it with:\n\n" # fmt: skip + " pip install 'kubeflow[feast]'\n" + ) from e + + if config is not None: + self._feature_store: FeatureStore = FeatureStore(config=config) + else: + self._feature_store: FeatureStore = FeatureStore(repo_path=repo_path) + + @property + def feature_store(self) -> FeatureStore: + """Access the underlying Feast FeatureStore instance. + + Use this property to access the full Feast API for operations like: + - get_online_features() / get_historical_features() + - materialize() / materialize_incremental() + - apply() - Deploy feature definitions + - list_feature_views() / list_entities() / list_data_sources() + + Returns: + The Feast FeatureStore instance. + + Example: + ```python + client = FeastClient(repo_path="/path/to/feast/repo") + + # Get online features + features = client.feature_store.get_online_features( + features=["feature_view:feature1"], + entity_rows=[{"entity_id": 1}], + ) + + # List feature views + for fv in client.feature_store.list_feature_views(): + print(fv.name) + ``` + """ + return self._feature_store diff --git a/kubeflow/feast/api/feast_client_test.py b/kubeflow/feast/api/feast_client_test.py new file mode 100644 index 000000000..82beb4236 --- /dev/null +++ b/kubeflow/feast/api/feast_client_test.py @@ -0,0 +1,237 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for FeastClient.""" + +from __future__ import annotations + +from datetime import datetime +from pathlib import Path +import tempfile +from unittest.mock import MagicMock + +import pytest + +from kubeflow.trainer.test.common import FAILED, SUCCESS, TestCase + + +@pytest.fixture(autouse=True) +def skip_if_no_feast(): + """Skip tests if feast not installed.""" + pytest.importorskip("feast") + + +@pytest.fixture +def mock_feast_store(): + """Create a mock FeatureStore.""" + store = MagicMock() + return store + + +@pytest.fixture +def client(mock_feast_store, monkeypatch): + """Create FeastClient with mock FeatureStore.""" + from kubeflow.feast.api.feast_client import FeastClient + + # Patch FeatureStore so __init__ uses the mock + monkeypatch.setattr("feast.FeatureStore", lambda **kwargs: mock_feast_store) + + return FeastClient() + + +@pytest.mark.parametrize( + "test_case", + [ + TestCase( + name="raises helpful ImportError when feast not installed", + expected_status=FAILED, + config={}, + expected_error=ImportError, + ), + ], +) +def test_init_import_error(test_case, monkeypatch): + """Test that __init__ raises helpful ImportError when feast missing.""" + + from kubeflow.feast.api.feast_client import FeastClient + + # Simulate missing feast by making import fail + def mock_import(name, *args, **kwargs): + if name == "feast": + raise ImportError("No module named 'feast'") + return __import__(name, *args, **kwargs) + + monkeypatch.setattr("builtins.__import__", mock_import) + + try: + FeastClient(**test_case.config) + assert test_case.expected_status == SUCCESS + except ImportError as e: + assert test_case.expected_status == FAILED + assert "pip install 'kubeflow[feast]'" in str(e) + + +@pytest.mark.parametrize( + "test_case", + [ + TestCase( + name="default initialization with no args", + expected_status=SUCCESS, + config={}, + expected_output={"repo_path": None, "config": None}, + ), + TestCase( + name="initialization with repo_path", + expected_status=SUCCESS, + config={"repo_path": "/tmp/feast-repo"}, + expected_output={"repo_path": "/tmp/feast-repo", "config": None}, + ), + TestCase( + name="initialization with config dict", + expected_status=SUCCESS, + config={"config": {"project": "test"}}, + expected_output={"repo_path": None, "config": {"project": "test"}}, + ), + ], +) +def test_init(test_case, monkeypatch): + """Test FeastClient initialization with different configurations.""" + + from kubeflow.feast.api.feast_client import FeastClient + + mock_feast_store_class = MagicMock() + mock_feast_store_instance = MagicMock() + mock_feast_store_class.return_value = mock_feast_store_instance + + monkeypatch.setattr("feast.FeatureStore", mock_feast_store_class) + + try: + client = FeastClient(**test_case.config) + + assert test_case.expected_status == SUCCESS + mock_feast_store_class.assert_called_once() + assert client._feature_store == mock_feast_store_instance + except Exception as e: + assert test_case.expected_status == FAILED + if hasattr(test_case, "expected_error"): + assert isinstance(e, test_case.expected_error) + + +def test_feature_store_property(client, mock_feast_store): + """Test feature_store property provides access to underlying FeatureStore.""" + assert client.feature_store == mock_feast_store + + +def test_feast_integration_with_local_setup(): + """Test Feast with a simple local setup. + + This test creates a minimal Feast feature store and validates basic operations + through the feature_store property. + """ + pytest.importorskip("feast") + pandas = pytest.importorskip("pandas") + + from feast.types import Float32, Int64 + + from feast import Entity, FeatureView, Field, FileSource + from kubeflow.feast.api.feast_client import FeastClient + + with tempfile.TemporaryDirectory() as temp_dir: + repo_path = Path(temp_dir) / "feature_repo" + repo_path.mkdir() + + # Create a simple feature_store.yaml + feature_store_yaml = repo_path / "feature_store.yaml" + feature_store_yaml.write_text( + """ +project: test_project +provider: local +registry: data/registry.db +online_store: + type: sqlite + path: data/online_store.db +""" + ) + + # Create a data directory + data_dir = repo_path / "data" + data_dir.mkdir() + + # Create sample data + sample_data = pandas.DataFrame( + { + "driver_id": [1001, 1002, 1003], + "event_timestamp": [ + datetime(2024, 1, 1, 12, 0, 0), + datetime(2024, 1, 1, 12, 0, 0), + datetime(2024, 1, 1, 12, 0, 0), + ], + "trips_today": [10, 15, 20], + "rating": [4.5, 4.8, 4.2], + } + ) + sample_data_path = data_dir / "driver_stats.parquet" + sample_data.to_parquet(sample_data_path) + + # Create feature definitions + driver = Entity(name="driver", join_keys=["driver_id"]) + + driver_stats_source = FileSource( + name="driver_stats_source", + path=str(sample_data_path), + timestamp_field="event_timestamp", + ) + + driver_stats_fv = FeatureView( + name="driver_stats", + entities=[driver], + schema=[ + Field(name="trips_today", dtype=Int64), + Field(name="rating", dtype=Float32), + ], + source=driver_stats_source, + ) + + # Initialize client + client = FeastClient(repo_path=str(repo_path)) + + # Use feature_store property to access full Feast functionality + client.feature_store.apply([driver, driver_stats_source, driver_stats_fv]) + + # Verify feature views + feature_views = client.feature_store.list_feature_views() + assert len(feature_views) == 1 + assert feature_views[0].name == "driver_stats" + + # Verify entities + entities = client.feature_store.list_entities() + assert len(entities) == 1 + assert entities[0].name == "driver" + + # Test online features retrieval after materialization + start_date = datetime(2024, 1, 1, 0, 0, 0) + end_date = datetime(2024, 1, 2, 0, 0, 0) + client.feature_store.materialize(start_date=start_date, end_date=end_date) + + # Get online features using feature_store + online_features = client.feature_store.get_online_features( + features=["driver_stats:trips_today", "driver_stats:rating"], + entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}], + ).to_dict() + + assert "trips_today" in online_features + assert "rating" in online_features + assert len(online_features["trips_today"]) == 2 + + print(f"✅ Feast integration test passed with {len(feature_views)} feature view(s)") diff --git a/pyproject.toml b/pyproject.toml index 1242b530a..98c47d93d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,9 @@ podman = [ hub = [ "model-registry>=0.3.0", ] +feast = [ + "feast>=0.59.0", +] [dependency-groups] dev = [