From 8034f0044b3a64810aaf5cd2febfdffbca99b8bc Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 22 Jan 2026 14:42:25 +0000 Subject: [PATCH 1/8] Initial plan From 169fd9bf2ce2d2b5bc92815801cb627bf55744a5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 22 Jan 2026 14:47:26 +0000 Subject: [PATCH 2/8] feat(feast): add FeastClient integration with tests Co-authored-by: franciscojavierarceo <4163062+franciscojavierarceo@users.noreply.github.com> --- =0.59.0 | 0 kubeflow/feast/__init__.py | 17 ++ kubeflow/feast/api/__init__.py | 13 + kubeflow/feast/api/feast_client.py | 180 ++++++++++++ kubeflow/feast/api/feast_client_test.py | 346 ++++++++++++++++++++++++ pyproject.toml | 3 + 6 files changed, 559 insertions(+) create mode 100644 =0.59.0 create mode 100644 kubeflow/feast/__init__.py create mode 100644 kubeflow/feast/api/__init__.py create mode 100644 kubeflow/feast/api/feast_client.py create mode 100644 kubeflow/feast/api/feast_client_test.py diff --git a/=0.59.0 b/=0.59.0 new file mode 100644 index 000000000..e69de29bb diff --git a/kubeflow/feast/__init__.py b/kubeflow/feast/__init__.py new file mode 100644 index 000000000..cef05f7f3 --- /dev/null +++ b/kubeflow/feast/__init__.py @@ -0,0 +1,17 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kubeflow.feast.api.feast_client import FeastClient + +__all__ = ["FeastClient"] diff --git a/kubeflow/feast/api/__init__.py b/kubeflow/feast/api/__init__.py new file mode 100644 index 000000000..48e3dcfaa --- /dev/null +++ b/kubeflow/feast/api/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/kubeflow/feast/api/feast_client.py b/kubeflow/feast/api/feast_client.py new file mode 100644 index 000000000..6ead06913 --- /dev/null +++ b/kubeflow/feast/api/feast_client.py @@ -0,0 +1,180 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + import pandas as pd + + from feast import FeatureStore + + +class FeastClient: + """Client for Feast feature store operations. + + Feast is a feature store that enables offline retrieval of historical datasets + and online serving of features/data for ML applications. + + Requires the feast package to be installed. Install it with: + + pip install 'kubeflow[feast]' + + """ + + def __init__(self, repo_path: str | None = None, config: dict[str, Any] | None = None): + """Initialize the FeastClient. + + Args: + repo_path: Path to the Feast repository. If not provided, uses the current directory. + config: Optional configuration dictionary for Feast FeatureStore. + If provided, takes precedence over repo_path. + + Raises: + ImportError: If feast is not installed. + """ + try: + from feast import FeatureStore + except ImportError as e: + raise ImportError( + "feast is not installed. Install it with:\n\n" # fmt: skip + " pip install 'kubeflow[feast]'\n" + ) from e + + if config is not None: + self._store: FeatureStore = FeatureStore(config=config) + else: + self._store: FeatureStore = FeatureStore(repo_path=repo_path) + + def get_historical_features( + self, + entity_df: pd.DataFrame, + features: list[str], + full_feature_names: bool = False, + ) -> pd.DataFrame: + """Retrieve historical features for training datasets. + + Args: + entity_df: DataFrame with entity keys and timestamps. + features: List of feature references in the format "feature_view:feature_name". + full_feature_names: Whether to use full feature names in the output DataFrame. + + Returns: + DataFrame with historical feature values joined to the entity_df. + """ + return self._store.get_historical_features( + entity_df=entity_df, + features=features, + full_feature_names=full_feature_names, + ).to_df() + + def get_online_features( + self, + features: list[str], + entity_rows: list[dict[str, Any]], + full_feature_names: bool = False, + ) -> dict[str, list[Any]]: + """Retrieve online features for real-time inference. + + Args: + features: List of feature references in the format "feature_view:feature_name". + entity_rows: List of entity dictionaries with entity keys. + full_feature_names: Whether to use full feature names in the output. + + Returns: + Dictionary mapping feature names to lists of feature values. + """ + result = self._store.get_online_features( + features=features, + entity_rows=entity_rows, + full_feature_names=full_feature_names, + ) + return result.to_dict() + + def apply(self) -> None: + """Apply changes to the feature store. + + This method deploys all feature definitions to the feature store, + including feature views, entities, and data sources. + """ + self._store.apply([]) + + def materialize( + self, + start_date: Any, + end_date: Any, + feature_views: list[str] | None = None, + ) -> None: + """Materialize features into the online store. + + Args: + start_date: Start date for materialization (datetime or string). + end_date: End date for materialization (datetime or string). + feature_views: Optional list of feature view names to materialize. + If None, all feature views are materialized. + """ + self._store.materialize( + start_date=start_date, + end_date=end_date, + feature_views=feature_views, + ) + + def materialize_incremental(self, end_date: Any, feature_views: list[str] | None = None) -> None: + """Materialize features incrementally into the online store. + + This method materializes features from the last materialized state up to end_date. + + Args: + end_date: End date for materialization (datetime or string). + feature_views: Optional list of feature view names to materialize. + If None, all feature views are materialized. + """ + self._store.materialize_incremental( + end_date=end_date, + feature_views=feature_views, + ) + + def list_feature_views(self) -> list[Any]: + """List all feature views in the feature store. + + Returns: + List of feature view objects. + """ + return self._store.list_feature_views() + + def list_entities(self) -> list[Any]: + """List all entities in the feature store. + + Returns: + List of entity objects. + """ + return self._store.list_entities() + + def list_data_sources(self) -> list[Any]: + """List all data sources in the feature store. + + Returns: + List of data source objects. + """ + return self._store.list_data_sources() + + @property + def store(self) -> FeatureStore: + """Access the underlying Feast FeatureStore instance. + + Returns: + The Feast FeatureStore instance. + """ + return self._store diff --git a/kubeflow/feast/api/feast_client_test.py b/kubeflow/feast/api/feast_client_test.py new file mode 100644 index 000000000..1ac8c6b29 --- /dev/null +++ b/kubeflow/feast/api/feast_client_test.py @@ -0,0 +1,346 @@ +# Copyright 2025 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for FeastClient.""" + +from __future__ import annotations + +import tempfile +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from pathlib import Path +from typing import Any +from unittest.mock import MagicMock + +import pytest + +# Test constants +SUCCESS = "success" +FAILED = "Failed" + + +@dataclass +class TestCase: + """Test case configuration.""" + + name: str + expected_status: str = SUCCESS + config: dict[str, Any] = field(default_factory=dict) + expected_output: Any | None = None + expected_error: type[Exception] | None = None + __test__ = False + + +@pytest.fixture(autouse=True) +def skip_if_no_feast(): + """Skip tests if feast not installed.""" + pytest.importorskip("feast") + + +@pytest.fixture +def mock_feast_store(): + """Create a mock FeatureStore with all methods we wrap.""" + store = MagicMock() + # Set up return values for list methods to be iterable + store.list_feature_views.return_value = [] + store.list_entities.return_value = [] + store.list_data_sources.return_value = [] + return store + + +@pytest.fixture +def client(mock_feast_store, monkeypatch): + """Create FeastClient with mock FeatureStore.""" + from kubeflow.feast.api.feast_client import FeastClient + + # Patch FeatureStore so __init__ uses the mock + monkeypatch.setattr("feast.FeatureStore", lambda **kwargs: mock_feast_store) + + return FeastClient() + + +@pytest.mark.parametrize( + "test_case", + [ + TestCase( + name="raises helpful ImportError when feast not installed", + expected_status=FAILED, + config={}, + expected_error=ImportError, + ), + ], +) +def test_init_import_error(test_case, monkeypatch): + """Test that __init__ raises helpful ImportError when feast missing.""" + + from kubeflow.feast.api.feast_client import FeastClient + + # Simulate missing feast by making import fail + def mock_import(name, *args, **kwargs): + if name == "feast": + raise ImportError("No module named 'feast'") + return __import__(name, *args, **kwargs) + + monkeypatch.setattr("builtins.__import__", mock_import) + + try: + FeastClient(**test_case.config) + assert test_case.expected_status == SUCCESS + except ImportError as e: + assert test_case.expected_status == FAILED + assert "pip install 'kubeflow[feast]'" in str(e) + + +@pytest.mark.parametrize( + "test_case", + [ + TestCase( + name="default initialization with no args", + expected_status=SUCCESS, + config={}, + expected_output={"repo_path": None, "config": None}, + ), + TestCase( + name="initialization with repo_path", + expected_status=SUCCESS, + config={"repo_path": "/tmp/feast-repo"}, + expected_output={"repo_path": "/tmp/feast-repo", "config": None}, + ), + TestCase( + name="initialization with config dict", + expected_status=SUCCESS, + config={"config": {"project": "test"}}, + expected_output={"repo_path": None, "config": {"project": "test"}}, + ), + ], +) +def test_init(test_case, monkeypatch): + """Test FeastClient initialization with different configurations.""" + + from kubeflow.feast.api.feast_client import FeastClient + + mock_feast_store_class = MagicMock() + mock_feast_store_instance = MagicMock() + mock_feast_store_class.return_value = mock_feast_store_instance + + monkeypatch.setattr("feast.FeatureStore", mock_feast_store_class) + + try: + client = FeastClient(**test_case.config) + + assert test_case.expected_status == SUCCESS + mock_feast_store_class.assert_called_once() + assert client._store == mock_feast_store_instance + except Exception as e: + assert test_case.expected_status == FAILED + if hasattr(test_case, "expected_error"): + assert isinstance(e, test_case.expected_error) + + +def test_get_online_features(client, mock_feast_store): + """Test get_online_features method.""" + # Setup mock return value + mock_result = MagicMock() + mock_result.to_dict.return_value = { + "feature1": [1, 2, 3], + "feature2": ["a", "b", "c"], + } + mock_feast_store.get_online_features.return_value = mock_result + + features = ["feature_view:feature1", "feature_view:feature2"] + entity_rows = [{"entity_id": 1}, {"entity_id": 2}, {"entity_id": 3}] + + result = client.get_online_features(features=features, entity_rows=entity_rows) + + mock_feast_store.get_online_features.assert_called_once_with( + features=features, + entity_rows=entity_rows, + full_feature_names=False, + ) + assert result == {"feature1": [1, 2, 3], "feature2": ["a", "b", "c"]} + + +def test_list_feature_views(client, mock_feast_store): + """Test list_feature_views method.""" + mock_feature_views = [MagicMock(name="fv1"), MagicMock(name="fv2")] + mock_feast_store.list_feature_views.return_value = mock_feature_views + + result = client.list_feature_views() + + mock_feast_store.list_feature_views.assert_called_once() + assert result == mock_feature_views + + +def test_list_entities(client, mock_feast_store): + """Test list_entities method.""" + mock_entities = [MagicMock(name="entity1"), MagicMock(name="entity2")] + mock_feast_store.list_entities.return_value = mock_entities + + result = client.list_entities() + + mock_feast_store.list_entities.assert_called_once() + assert result == mock_entities + + +def test_list_data_sources(client, mock_feast_store): + """Test list_data_sources method.""" + mock_data_sources = [MagicMock(name="ds1"), MagicMock(name="ds2")] + mock_feast_store.list_data_sources.return_value = mock_data_sources + + result = client.list_data_sources() + + mock_feast_store.list_data_sources.assert_called_once() + assert result == mock_data_sources + + +def test_apply(client, mock_feast_store): + """Test apply method.""" + client.apply() + + mock_feast_store.apply.assert_called_once_with([]) + + +def test_materialize(client, mock_feast_store): + """Test materialize method.""" + start_date = datetime.now() - timedelta(days=7) + end_date = datetime.now() + + client.materialize(start_date=start_date, end_date=end_date) + + mock_feast_store.materialize.assert_called_once_with( + start_date=start_date, + end_date=end_date, + feature_views=None, + ) + + +def test_materialize_incremental(client, mock_feast_store): + """Test materialize_incremental method.""" + end_date = datetime.now() + + client.materialize_incremental(end_date=end_date) + + mock_feast_store.materialize_incremental.assert_called_once_with( + end_date=end_date, + feature_views=None, + ) + + +def test_store_property(client, mock_feast_store): + """Test store property.""" + assert client.store == mock_feast_store + + +def test_feast_integration_with_local_setup(): + """Test Feast with a simple local setup. + + This test creates a minimal Feast feature store and validates basic operations. + """ + pytest.importorskip("feast") + pandas = pytest.importorskip("pandas") + + from feast import Entity, FeatureView, Field, FileSource + from feast.types import Float32, Int64 + + from kubeflow.feast.api.feast_client import FeastClient + + with tempfile.TemporaryDirectory() as temp_dir: + repo_path = Path(temp_dir) / "feature_repo" + repo_path.mkdir() + + # Create a simple feature_store.yaml + feature_store_yaml = repo_path / "feature_store.yaml" + feature_store_yaml.write_text( + """ +project: test_project +provider: local +registry: data/registry.db +online_store: + type: sqlite + path: data/online_store.db +""" + ) + + # Create a data directory + data_dir = repo_path / "data" + data_dir.mkdir() + + # Create sample data + sample_data = pandas.DataFrame( + { + "driver_id": [1001, 1002, 1003], + "event_timestamp": [ + datetime(2024, 1, 1, 12, 0, 0), + datetime(2024, 1, 1, 12, 0, 0), + datetime(2024, 1, 1, 12, 0, 0), + ], + "trips_today": [10, 15, 20], + "rating": [4.5, 4.8, 4.2], + } + ) + sample_data_path = data_dir / "driver_stats.parquet" + sample_data.to_parquet(sample_data_path) + + # Create feature definitions + driver = Entity(name="driver", join_keys=["driver_id"]) + + driver_stats_source = FileSource( + name="driver_stats_source", + path=str(sample_data_path), + timestamp_field="event_timestamp", + ) + + driver_stats_fv = FeatureView( + name="driver_stats", + entities=[driver], + schema=[ + Field(name="trips_today", dtype=Int64), + Field(name="rating", dtype=Float32), + ], + source=driver_stats_source, + ) + + # Initialize client + client = FeastClient(repo_path=str(repo_path)) + + # Write feature definitions to store + store = client.store + store.apply([driver, driver_stats_source, driver_stats_fv]) + + # Verify feature views + feature_views = client.list_feature_views() + assert len(feature_views) == 1 + assert feature_views[0].name == "driver_stats" + + # Verify entities + entities = client.list_entities() + assert len(entities) == 1 + assert entities[0].name == "driver" + + # Test online features retrieval after materialization + start_date = datetime(2024, 1, 1, 0, 0, 0) + end_date = datetime(2024, 1, 2, 0, 0, 0) + client.materialize(start_date=start_date, end_date=end_date) + + # Get online features + online_features = client.get_online_features( + features=["driver_stats:trips_today", "driver_stats:rating"], + entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}], + ) + + assert "trips_today" in online_features + assert "rating" in online_features + assert len(online_features["trips_today"]) == 2 + + print(f"✅ Feast integration test passed with {len(feature_views)} feature view(s)") diff --git a/pyproject.toml b/pyproject.toml index 1242b530a..98c47d93d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,9 @@ podman = [ hub = [ "model-registry>=0.3.0", ] +feast = [ + "feast>=0.59.0", +] [dependency-groups] dev = [ From 0ef470f5bbeb6c35cd7ab189d08a3dab9babc110 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 22 Jan 2026 14:47:48 +0000 Subject: [PATCH 3/8] chore: remove spurious file --- =0.59.0 | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 =0.59.0 diff --git a/=0.59.0 b/=0.59.0 deleted file mode 100644 index e69de29bb..000000000 From 1fabeaf0223224c43dd5d5d89b9ceffcd8209f7a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 22 Jan 2026 14:48:44 +0000 Subject: [PATCH 4/8] chore: fix linting issues in feast module Co-authored-by: franciscojavierarceo <4163062+franciscojavierarceo@users.noreply.github.com> --- kubeflow/feast/api/feast_client.py | 4 +++- kubeflow/feast/api/feast_client_test.py | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/kubeflow/feast/api/feast_client.py b/kubeflow/feast/api/feast_client.py index 6ead06913..022a0701f 100644 --- a/kubeflow/feast/api/feast_client.py +++ b/kubeflow/feast/api/feast_client.py @@ -131,7 +131,9 @@ def materialize( feature_views=feature_views, ) - def materialize_incremental(self, end_date: Any, feature_views: list[str] | None = None) -> None: + def materialize_incremental( + self, end_date: Any, feature_views: list[str] | None = None + ) -> None: """Materialize features incrementally into the online store. This method materializes features from the last materialized state up to end_date. diff --git a/kubeflow/feast/api/feast_client_test.py b/kubeflow/feast/api/feast_client_test.py index 1ac8c6b29..603a469ce 100644 --- a/kubeflow/feast/api/feast_client_test.py +++ b/kubeflow/feast/api/feast_client_test.py @@ -16,10 +16,10 @@ from __future__ import annotations -import tempfile from dataclasses import dataclass, field from datetime import datetime, timedelta from pathlib import Path +import tempfile from typing import Any from unittest.mock import MagicMock @@ -250,9 +250,9 @@ def test_feast_integration_with_local_setup(): pytest.importorskip("feast") pandas = pytest.importorskip("pandas") - from feast import Entity, FeatureView, Field, FileSource from feast.types import Float32, Int64 + from feast import Entity, FeatureView, Field, FileSource from kubeflow.feast.api.feast_client import FeastClient with tempfile.TemporaryDirectory() as temp_dir: From ae9384528ea1f004228ad740b660d1e268165b44 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 22 Jan 2026 14:50:42 +0000 Subject: [PATCH 5/8] refactor: use common test utilities from kubeflow.trainer.test Co-authored-by: franciscojavierarceo <4163062+franciscojavierarceo@users.noreply.github.com> --- kubeflow/feast/api/feast_client_test.py | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/kubeflow/feast/api/feast_client_test.py b/kubeflow/feast/api/feast_client_test.py index 603a469ce..ed783999b 100644 --- a/kubeflow/feast/api/feast_client_test.py +++ b/kubeflow/feast/api/feast_client_test.py @@ -16,30 +16,14 @@ from __future__ import annotations -from dataclasses import dataclass, field from datetime import datetime, timedelta from pathlib import Path import tempfile -from typing import Any from unittest.mock import MagicMock import pytest -# Test constants -SUCCESS = "success" -FAILED = "Failed" - - -@dataclass -class TestCase: - """Test case configuration.""" - - name: str - expected_status: str = SUCCESS - config: dict[str, Any] = field(default_factory=dict) - expected_output: Any | None = None - expected_error: type[Exception] | None = None - __test__ = False +from kubeflow.trainer.test.common import FAILED, SUCCESS, TestCase @pytest.fixture(autouse=True) From 84d0fd9bb674348bbe6781657604029d8f16de10 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 22 Jan 2026 14:51:44 +0000 Subject: [PATCH 6/8] docs: add Feast documentation to README Co-authored-by: franciscojavierarceo <4163062+franciscojavierarceo@users.noreply.github.com> --- README.md | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/README.md b/README.md index 72e3b3ed4..d58a24c50 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,38 @@ for version in client.list_model_versions("my-model"): print(f"Version: {version.name}") ``` +### Manage features with Feast + +**Install Feast support:** +```bash +pip install 'kubeflow[feast]' +``` + +Feast is a feature store that enables offline retrieval of historical datasets and online serving of features/data for ML applications. + +```python +from kubeflow.feast import FeastClient + +# Initialize the Feast client with your feature repository path +client = FeastClient(repo_path="/path/to/feast/repo") + +# Get online features for real-time inference +online_features = client.get_online_features( + features=["feature_view:feature1", "feature_view:feature2"], + entity_rows=[{"entity_id": 1}, {"entity_id": 2}], +) + +# Materialize features to the online store +from datetime import datetime, timedelta +end_date = datetime.now() +start_date = end_date - timedelta(days=7) +client.materialize(start_date=start_date, end_date=end_date) + +# List all feature views +for fv in client.list_feature_views(): + print(f"Feature view: {fv.name}") +``` + ## Local Development Kubeflow Trainer client supports local development without needing a Kubernetes cluster. @@ -186,6 +218,7 @@ job_id = client.train(trainer=CustomTrainer(func=train_fn)) | **Kubeflow Trainer** | ✅ **Available** | v2.0.0+ | Train and fine-tune AI models with various frameworks | | **Kubeflow Katib** | ✅ **Available** | v0.19.0+ | Hyperparameter optimization | | **Kubeflow Model Registry** | ✅ **Available** | v0.3.0+ | Manage model artifacts, versions and ML artifacts metadata | +| **Feast** | ✅ **Available** | v0.59.0+ | Feature store for offline and online feature serving | | **Kubeflow Pipelines** | 🚧 Planned | TBD | Build, run, and track AI workflows | | **Kubeflow Spark Operator** | 🚧 Planned | TBD | Manage Spark applications for data processing and feature engineering | From 57e4400973e981f5a66b020cbcb629afed368b27 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 22 Jan 2026 14:53:31 +0000 Subject: [PATCH 7/8] fix: make apply method accept objects parameter Co-authored-by: franciscojavierarceo <4163062+franciscojavierarceo@users.noreply.github.com> --- kubeflow/feast/api/feast_client.py | 12 +++++++++--- kubeflow/feast/api/feast_client_test.py | 7 ++++++- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/kubeflow/feast/api/feast_client.py b/kubeflow/feast/api/feast_client.py index 022a0701f..9d3d10322 100644 --- a/kubeflow/feast/api/feast_client.py +++ b/kubeflow/feast/api/feast_client.py @@ -103,13 +103,19 @@ def get_online_features( ) return result.to_dict() - def apply(self) -> None: + def apply(self, objects: list[Any] | None = None) -> None: """Apply changes to the feature store. - This method deploys all feature definitions to the feature store, + This method deploys feature definitions to the feature store, including feature views, entities, and data sources. + + Args: + objects: List of Feast objects (Feature Views, Entities, Data Sources) to apply. + If None or empty list, applies all objects defined in the repository. """ - self._store.apply([]) + if objects is None: + objects = [] + self._store.apply(objects) def materialize( self, diff --git a/kubeflow/feast/api/feast_client_test.py b/kubeflow/feast/api/feast_client_test.py index ed783999b..f819c82fc 100644 --- a/kubeflow/feast/api/feast_client_test.py +++ b/kubeflow/feast/api/feast_client_test.py @@ -190,9 +190,14 @@ def test_list_data_sources(client, mock_feast_store): def test_apply(client, mock_feast_store): """Test apply method.""" + # Test with no objects client.apply() + mock_feast_store.apply.assert_called_with([]) - mock_feast_store.apply.assert_called_once_with([]) + # Test with objects + mock_objects = [MagicMock(), MagicMock()] + client.apply(objects=mock_objects) + mock_feast_store.apply.assert_called_with(mock_objects) def test_materialize(client, mock_feast_store): From e78cee211ccbbfb8339d8c09d7f155fea987a55a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 28 Jan 2026 15:52:25 +0000 Subject: [PATCH 8/8] refactor: simplify FeastClient to minimal wrapper with feature_store property Co-authored-by: franciscojavierarceo <4163062+franciscojavierarceo@users.noreply.github.com> --- README.md | 10 +- kubeflow/feast/api/feast_client.py | 163 ++++++------------------ kubeflow/feast/api/feast_client_test.py | 130 +++---------------- 3 files changed, 60 insertions(+), 243 deletions(-) diff --git a/README.md b/README.md index d58a24c50..cecf83f89 100644 --- a/README.md +++ b/README.md @@ -165,14 +165,16 @@ pip install 'kubeflow[feast]' Feast is a feature store that enables offline retrieval of historical datasets and online serving of features/data for ML applications. +The FeastClient provides simplified initialization. Access full Feast functionality through the `feature_store` property: + ```python from kubeflow.feast import FeastClient # Initialize the Feast client with your feature repository path client = FeastClient(repo_path="/path/to/feast/repo") -# Get online features for real-time inference -online_features = client.get_online_features( +# Access full Feast functionality through feature_store property +online_features = client.feature_store.get_online_features( features=["feature_view:feature1", "feature_view:feature2"], entity_rows=[{"entity_id": 1}, {"entity_id": 2}], ) @@ -181,10 +183,10 @@ online_features = client.get_online_features( from datetime import datetime, timedelta end_date = datetime.now() start_date = end_date - timedelta(days=7) -client.materialize(start_date=start_date, end_date=end_date) +client.feature_store.materialize(start_date=start_date, end_date=end_date) # List all feature views -for fv in client.list_feature_views(): +for fv in client.feature_store.list_feature_views(): print(f"Feature view: {fv.name}") ``` diff --git a/kubeflow/feast/api/feast_client.py b/kubeflow/feast/api/feast_client.py index 9d3d10322..6f63696c3 100644 --- a/kubeflow/feast/api/feast_client.py +++ b/kubeflow/feast/api/feast_client.py @@ -17,8 +17,6 @@ from typing import TYPE_CHECKING, Any if TYPE_CHECKING: - import pandas as pd - from feast import FeatureStore @@ -28,10 +26,24 @@ class FeastClient: Feast is a feature store that enables offline retrieval of historical datasets and online serving of features/data for ML applications. + This is a minimal wrapper that provides simplified initialization. For full Feast + functionality, use the `feature_store` property to access the underlying FeatureStore. + Requires the feast package to be installed. Install it with: pip install 'kubeflow[feast]' + Example: + ```python + from kubeflow.feast import FeastClient + + # Initialize client + client = FeastClient(repo_path="/path/to/feast/repo") + + # Access full Feast functionality + client.feature_store.get_online_features(...) + client.feature_store.materialize(...) + ``` """ def __init__(self, repo_path: str | None = None, config: dict[str, Any] | None = None): @@ -54,135 +66,36 @@ def __init__(self, repo_path: str | None = None, config: dict[str, Any] | None = ) from e if config is not None: - self._store: FeatureStore = FeatureStore(config=config) + self._feature_store: FeatureStore = FeatureStore(config=config) else: - self._store: FeatureStore = FeatureStore(repo_path=repo_path) - - def get_historical_features( - self, - entity_df: pd.DataFrame, - features: list[str], - full_feature_names: bool = False, - ) -> pd.DataFrame: - """Retrieve historical features for training datasets. - - Args: - entity_df: DataFrame with entity keys and timestamps. - features: List of feature references in the format "feature_view:feature_name". - full_feature_names: Whether to use full feature names in the output DataFrame. - - Returns: - DataFrame with historical feature values joined to the entity_df. - """ - return self._store.get_historical_features( - entity_df=entity_df, - features=features, - full_feature_names=full_feature_names, - ).to_df() - - def get_online_features( - self, - features: list[str], - entity_rows: list[dict[str, Any]], - full_feature_names: bool = False, - ) -> dict[str, list[Any]]: - """Retrieve online features for real-time inference. - - Args: - features: List of feature references in the format "feature_view:feature_name". - entity_rows: List of entity dictionaries with entity keys. - full_feature_names: Whether to use full feature names in the output. - - Returns: - Dictionary mapping feature names to lists of feature values. - """ - result = self._store.get_online_features( - features=features, - entity_rows=entity_rows, - full_feature_names=full_feature_names, - ) - return result.to_dict() - - def apply(self, objects: list[Any] | None = None) -> None: - """Apply changes to the feature store. - - This method deploys feature definitions to the feature store, - including feature views, entities, and data sources. - - Args: - objects: List of Feast objects (Feature Views, Entities, Data Sources) to apply. - If None or empty list, applies all objects defined in the repository. - """ - if objects is None: - objects = [] - self._store.apply(objects) - - def materialize( - self, - start_date: Any, - end_date: Any, - feature_views: list[str] | None = None, - ) -> None: - """Materialize features into the online store. - - Args: - start_date: Start date for materialization (datetime or string). - end_date: End date for materialization (datetime or string). - feature_views: Optional list of feature view names to materialize. - If None, all feature views are materialized. - """ - self._store.materialize( - start_date=start_date, - end_date=end_date, - feature_views=feature_views, - ) - - def materialize_incremental( - self, end_date: Any, feature_views: list[str] | None = None - ) -> None: - """Materialize features incrementally into the online store. + self._feature_store: FeatureStore = FeatureStore(repo_path=repo_path) - This method materializes features from the last materialized state up to end_date. - - Args: - end_date: End date for materialization (datetime or string). - feature_views: Optional list of feature view names to materialize. - If None, all feature views are materialized. - """ - self._store.materialize_incremental( - end_date=end_date, - feature_views=feature_views, - ) - - def list_feature_views(self) -> list[Any]: - """List all feature views in the feature store. - - Returns: - List of feature view objects. - """ - return self._store.list_feature_views() + @property + def feature_store(self) -> FeatureStore: + """Access the underlying Feast FeatureStore instance. - def list_entities(self) -> list[Any]: - """List all entities in the feature store. + Use this property to access the full Feast API for operations like: + - get_online_features() / get_historical_features() + - materialize() / materialize_incremental() + - apply() - Deploy feature definitions + - list_feature_views() / list_entities() / list_data_sources() Returns: - List of entity objects. - """ - return self._store.list_entities() - - def list_data_sources(self) -> list[Any]: - """List all data sources in the feature store. + The Feast FeatureStore instance. - Returns: - List of data source objects. - """ - return self._store.list_data_sources() + Example: + ```python + client = FeastClient(repo_path="/path/to/feast/repo") - @property - def store(self) -> FeatureStore: - """Access the underlying Feast FeatureStore instance. + # Get online features + features = client.feature_store.get_online_features( + features=["feature_view:feature1"], + entity_rows=[{"entity_id": 1}], + ) - Returns: - The Feast FeatureStore instance. + # List feature views + for fv in client.feature_store.list_feature_views(): + print(fv.name) + ``` """ - return self._store + return self._feature_store diff --git a/kubeflow/feast/api/feast_client_test.py b/kubeflow/feast/api/feast_client_test.py index f819c82fc..82beb4236 100644 --- a/kubeflow/feast/api/feast_client_test.py +++ b/kubeflow/feast/api/feast_client_test.py @@ -16,7 +16,7 @@ from __future__ import annotations -from datetime import datetime, timedelta +from datetime import datetime from pathlib import Path import tempfile from unittest.mock import MagicMock @@ -34,12 +34,8 @@ def skip_if_no_feast(): @pytest.fixture def mock_feast_store(): - """Create a mock FeatureStore with all methods we wrap.""" + """Create a mock FeatureStore.""" store = MagicMock() - # Set up return values for list methods to be iterable - store.list_feature_views.return_value = [] - store.list_entities.return_value = [] - store.list_data_sources.return_value = [] return store @@ -125,116 +121,23 @@ def test_init(test_case, monkeypatch): assert test_case.expected_status == SUCCESS mock_feast_store_class.assert_called_once() - assert client._store == mock_feast_store_instance + assert client._feature_store == mock_feast_store_instance except Exception as e: assert test_case.expected_status == FAILED if hasattr(test_case, "expected_error"): assert isinstance(e, test_case.expected_error) -def test_get_online_features(client, mock_feast_store): - """Test get_online_features method.""" - # Setup mock return value - mock_result = MagicMock() - mock_result.to_dict.return_value = { - "feature1": [1, 2, 3], - "feature2": ["a", "b", "c"], - } - mock_feast_store.get_online_features.return_value = mock_result - - features = ["feature_view:feature1", "feature_view:feature2"] - entity_rows = [{"entity_id": 1}, {"entity_id": 2}, {"entity_id": 3}] - - result = client.get_online_features(features=features, entity_rows=entity_rows) - - mock_feast_store.get_online_features.assert_called_once_with( - features=features, - entity_rows=entity_rows, - full_feature_names=False, - ) - assert result == {"feature1": [1, 2, 3], "feature2": ["a", "b", "c"]} - - -def test_list_feature_views(client, mock_feast_store): - """Test list_feature_views method.""" - mock_feature_views = [MagicMock(name="fv1"), MagicMock(name="fv2")] - mock_feast_store.list_feature_views.return_value = mock_feature_views - - result = client.list_feature_views() - - mock_feast_store.list_feature_views.assert_called_once() - assert result == mock_feature_views - - -def test_list_entities(client, mock_feast_store): - """Test list_entities method.""" - mock_entities = [MagicMock(name="entity1"), MagicMock(name="entity2")] - mock_feast_store.list_entities.return_value = mock_entities - - result = client.list_entities() - - mock_feast_store.list_entities.assert_called_once() - assert result == mock_entities - - -def test_list_data_sources(client, mock_feast_store): - """Test list_data_sources method.""" - mock_data_sources = [MagicMock(name="ds1"), MagicMock(name="ds2")] - mock_feast_store.list_data_sources.return_value = mock_data_sources - - result = client.list_data_sources() - - mock_feast_store.list_data_sources.assert_called_once() - assert result == mock_data_sources - - -def test_apply(client, mock_feast_store): - """Test apply method.""" - # Test with no objects - client.apply() - mock_feast_store.apply.assert_called_with([]) - - # Test with objects - mock_objects = [MagicMock(), MagicMock()] - client.apply(objects=mock_objects) - mock_feast_store.apply.assert_called_with(mock_objects) - - -def test_materialize(client, mock_feast_store): - """Test materialize method.""" - start_date = datetime.now() - timedelta(days=7) - end_date = datetime.now() - - client.materialize(start_date=start_date, end_date=end_date) - - mock_feast_store.materialize.assert_called_once_with( - start_date=start_date, - end_date=end_date, - feature_views=None, - ) - - -def test_materialize_incremental(client, mock_feast_store): - """Test materialize_incremental method.""" - end_date = datetime.now() - - client.materialize_incremental(end_date=end_date) - - mock_feast_store.materialize_incremental.assert_called_once_with( - end_date=end_date, - feature_views=None, - ) - - -def test_store_property(client, mock_feast_store): - """Test store property.""" - assert client.store == mock_feast_store +def test_feature_store_property(client, mock_feast_store): + """Test feature_store property provides access to underlying FeatureStore.""" + assert client.feature_store == mock_feast_store def test_feast_integration_with_local_setup(): """Test Feast with a simple local setup. - This test creates a minimal Feast feature store and validates basic operations. + This test creates a minimal Feast feature store and validates basic operations + through the feature_store property. """ pytest.importorskip("feast") pandas = pytest.importorskip("pandas") @@ -303,30 +206,29 @@ def test_feast_integration_with_local_setup(): # Initialize client client = FeastClient(repo_path=str(repo_path)) - # Write feature definitions to store - store = client.store - store.apply([driver, driver_stats_source, driver_stats_fv]) + # Use feature_store property to access full Feast functionality + client.feature_store.apply([driver, driver_stats_source, driver_stats_fv]) # Verify feature views - feature_views = client.list_feature_views() + feature_views = client.feature_store.list_feature_views() assert len(feature_views) == 1 assert feature_views[0].name == "driver_stats" # Verify entities - entities = client.list_entities() + entities = client.feature_store.list_entities() assert len(entities) == 1 assert entities[0].name == "driver" # Test online features retrieval after materialization start_date = datetime(2024, 1, 1, 0, 0, 0) end_date = datetime(2024, 1, 2, 0, 0, 0) - client.materialize(start_date=start_date, end_date=end_date) + client.feature_store.materialize(start_date=start_date, end_date=end_date) - # Get online features - online_features = client.get_online_features( + # Get online features using feature_store + online_features = client.feature_store.get_online_features( features=["driver_stats:trips_today", "driver_stats:rating"], entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}], - ) + ).to_dict() assert "trips_today" in online_features assert "rating" in online_features