diff --git a/.env.example b/.env.example index d9a9e7e..285b0ed 100644 --- a/.env.example +++ b/.env.example @@ -1 +1,6 @@ -OPENAI_API_KEY=your_openai_api_key \ No newline at end of file +OPENAI_API_KEY=your_openai_api_key +LANGSMITH_TRACING=true +LANGSMITH_ENDPOINT=https://api.smith.langchain.com +LANGSMITH_API_KEY=your_langsmith_api_key +LANGSMITH_PROJECT=suntrace +GOOGLE_API_KEY=your_google_api_key \ No newline at end of file diff --git a/.gcloudignore b/.gcloudignore new file mode 100644 index 0000000..333eb9e --- /dev/null +++ b/.gcloudignore @@ -0,0 +1,2 @@ +!data/ +!data/** \ No newline at end of file diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml new file mode 100644 index 0000000..74ea215 --- /dev/null +++ b/.github/workflows/deploy.yml @@ -0,0 +1,59 @@ +name: Build & Deploy to Cloud Run + +on: + push: + branches: + - main + pull_request: + branches: + - main + +permissions: + contents: read + id-token: write + +jobs: + deploy: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Authenticate to GCP with a service account key (JSON) + uses: google-github-actions/auth@v2 + with: + credentials_json: '${{ secrets.GOOGLE_CREDENTIALS }}' + + - name: Set up gcloud SDK + uses: google-github-actions/setup-gcloud@v2 + with: + project_id: ${{ secrets.GCP_PROJECT_ID }} + + - name: Configure Docker for Artifact Registry + run: | + gcloud auth configure-docker ${{ secrets.GCP_REGION }}-docker.pkg.dev --quiet + + # Fetch the GeoJSON data from GCS into ./data + - name: Download GeoJSON data from GCS + run: | + mkdir -p data + gsutil -m cp -r gs://${{ secrets.GCS_BUCKET }}/geojson/data/* data/ + + - name: Build Docker image + run: | + IMAGE=${{ secrets.GCP_REGION }}-docker.pkg.dev/${{ secrets.GCP_PROJECT_ID }}/${{ secrets.GCP_PROJECT_REPO }}/${{ secrets.APP_NAME }}:${{ github.sha }} + docker build -t $IMAGE .
+ + - name: Push Docker image + run: | + IMAGE=${{ secrets.GCP_REGION }}-docker.pkg.dev/${{ secrets.GCP_PROJECT_ID }}/${{ secrets.GCP_PROJECT_REPO }}/${{ secrets.APP_NAME }}:${{ github.sha }} + docker push $IMAGE + + - name: Deploy to Cloud Run + uses: google-github-actions/deploy-cloudrun@v2 + with: + service: ${{ secrets.APP_NAME }} + image: ${{ secrets.GCP_REGION }}-docker.pkg.dev/${{ secrets.GCP_PROJECT_ID }}/${{ secrets.GCP_PROJECT_REPO }}/${{ secrets.APP_NAME }}:${{ github.sha }} + region: ${{ secrets.GCP_REGION }} + project_id: ${{ secrets.GCP_PROJECT_ID }} diff --git a/.gitignore b/.gitignore index dbd8f56..00fe7db 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ # Ignore venv +env/ venv/ .venv/ # Ignore Python cache files diff --git a/Dockerfile b/Dockerfile index 3827512..3ebea04 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,31 +1,43 @@ # Use slim Python 3.12 image FROM python:3.12-slim -# Set environment variables +# Don’t write .pyc files and force stdout/stderr to be unbuffered ENV PYTHONDONTWRITEBYTECODE=1 \ - PYTHONUNBUFFERED=1 + PYTHONUNBUFFERED=1 \ + # Tell rasterio/geopandas where GDAL lives + GDAL_CONFIG=/usr/bin/gdal-config \ + CPLUS_INCLUDE_PATH=/usr/include/gdal \ + C_INCLUDE_PATH=/usr/include/gdal # Set working directory WORKDIR /app -# Install system dependencies -RUN apt-get update && apt-get install -y --no-install-recommends \ +# Install build tools + GDAL + GEOS + PROJ headers +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ build-essential \ - gcc \ - libffi-dev \ - libssl-dev \ + python3-dev \ + gdal-bin \ + libgdal-dev \ + libgeos-dev \ + libproj-dev \ && rm -rf /var/lib/apt/lists/* # Install Python dependencies COPY requirements.txt . -RUN pip install --no-cache-dir -r requirements.txt +RUN pip install --upgrade pip wheel && \ + pip install --no-cache-dir -r requirements.txt # Copy the app code COPY . . +COPY ./start.sh /app/start.sh + # Expose the port FastAPI will run on -EXPOSE 8000 +ENV PORT=8080 +# EXPOSE 8000 # Run the app using Uvicorn -CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] - +RUN chmod +x /app/start.sh +# CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] +ENTRYPOINT ["/app/start.sh"] diff --git a/README.md b/README.md index cc1957d..95606df 100644 --- a/README.md +++ b/README.md @@ -55,13 +55,30 @@ python main.py ### 4. Run in Docker ```sh +export OPENAI_API_KEY=your_openai_key docker build -t suntrace . -docker run -p 8000:8000 suntrace +docker run --rm -d \ + -p 8080:8080 \ + -e OPENAI_API_KEY="${OPENAI_API_KEY}" \ + --name suntrace \ + suntrace:latest +``` + +See logs + +```sh +docker logs -f suntrace +``` + +#### With docker compose + +```sh +docker-compose up -d --build ``` ### 5. Access Frontend -Open [http://localhost:8000](http://localhost:8000) in your browser. +Open [http://localhost:8080](http://localhost:8080) in your browser. ## Testing @@ -78,6 +95,20 @@ Create a `.env` file for secrets (e.g., OpenAI API key): OPENAI_API_KEY=your_openai_key ``` +## Deployment + +Make sure you have [**gcloud cli**](https://cloud.google.com/sdk/docs/install-sdk) installed and set up + +The app is deployed using [**Google Cloud Run**](https://cloud.google.com/run?hl=en) + +To deploy the application, run the commands below + +```sh +chmod +x bin/deploy +chmod +x start.sh +./bin/deploy +``` + ## Data Requirements Place required geospatial files in the `data/` directory. See [tests/TESTING.md](tests/TESTING.md) for details.
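Both the GitHub Actions workflow and `bin/deploy` (below) push images to an Artifact Registry repository and assume it already exists; the workflow reads its settings from repository secrets (`GOOGLE_CREDENTIALS`, `GCP_PROJECT_ID`, `GCP_REGION`, `GCP_PROJECT_REPO`, `APP_NAME`, `GCS_BUCKET`). A minimal one-time setup sketch, using the values hardcoded in `bin/deploy` (adjust to your own project):

```sh
# One-time setup before the first deploy (sketch; values copied from bin/deploy)
gcloud config set project sb-gcp-project-01
gcloud artifacts repositories create suntrace-repo \
  --repository-format=docker \
  --location=europe-west1
```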
diff --git a/bin/deploy b/bin/deploy new file mode 100755 index 0000000..48257c5 --- /dev/null +++ b/bin/deploy @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +set -euo pipefail + +export APP=suntrace +export PROJECT_ID=sb-gcp-project-01 +export REGION=europe-west1 +export REPO=suntrace-repo +export TAG=${REGION}-docker.pkg.dev/${PROJECT_ID}/${REPO}/${APP} + +# 1. Build & push through Cloud Build +gcloud builds submit --tag $TAG + + +# # 2. Deploy to Cloud Run +gcloud run deploy $APP \ + --image $TAG \ + --region $REGION \ + --platform managed \ + --allow-unauthenticated \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..a9d06fe --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,11 @@ +services: + suntrace: + container_name: suntrace + # image: suntrace:latest + build: + context: . + dockerfile: Dockerfile + ports: + - "8080:8080" + env_file: + - .env diff --git a/requirements.txt b/requirements.txt index e484ea9..b140e5e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ numpy==2.3.1 geopandas==1.1.1 dotenv==0.9.9 -openai==1.58.1 +openai==1.99.1 rasterio==1.4.3 earthengine_api==1.6.0 folium==0.20.0 diff --git a/start.sh b/start.sh new file mode 100755 index 0000000..66effa3 --- /dev/null +++ b/start.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +# Start the FastAPI application with Uvicorn +uvicorn main:app --host 0.0.0.0 --port ${PORT} --workers 1 \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index ee74bf9..b2b27bf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,8 +1,9 @@ import os import sys -import pytest from pathlib import Path +import pytest + # Add src directory to Python path project_root = Path(__file__).parent.parent src_path = project_root @@ -13,16 +14,19 @@ if str(project_root) not in sys.path: sys.path.insert(0, str(project_root)) + @pytest.fixture(scope="session") def project_root_path(): """Return the project root path.""" return project_root + @pytest.fixture(scope="session") def data_dir_path(project_root_path): """Return the data directory path.""" return project_root_path / "data" + @pytest.fixture(scope="session") def sample_data_paths(data_dir_path): """Return paths to sample data files.""" @@ -30,10 +34,13 @@ def sample_data_paths(data_dir_path): "buildings": data_dir_path / "lamwo_buildings_V3.gpkg", "minigrids": data_dir_path / "updated_candidate_minigrids_merged.gpkg", "tile_stats": data_dir_path / "Lamwo_Tile_Stats_EE.csv", - "plain_tiles": data_dir_path / "lamwo_sentinel_composites" / "lamwo_grid.geojson", + "plain_tiles": data_dir_path + / "lamwo_sentinel_composites" + / "lamwo_grid.geojson", "sample_region": data_dir_path / "sample_region_mudu" / "mudu_village.gpkg", } + @pytest.fixture(scope="session") def check_data_files(sample_data_paths): """Check if required data files exist and skip tests if not.""" @@ -41,8 +48,8 @@ def check_data_files(sample_data_paths): for name, path in sample_data_paths.items(): if not path.exists(): missing_files.append(f"{name}: {path}") - + if missing_files: pytest.skip(f"Missing data files: {', '.join(missing_files)}") - + return sample_data_paths diff --git a/tests/test_api_buffer.py b/tests/test_api_buffer.py index 62337e4..c03049f 100644 --- a/tests/test_api_buffer.py +++ b/tests/test_api_buffer.py @@ -3,18 +3,20 @@ Simple API test for buffer functionality to verify the Flask endpoints work. 
""" -import requests import json + +import requests from shapely.geometry import Point from shapely.wkt import dumps as wkt_dumps BASE_URL = "http://127.0.0.1:5000" + def test_api_buffer(): """Test buffer functionality through API endpoints""" print("🌐 Testing Buffer API Endpoints") print("=" * 35) - + # Test 1: Check if server is running print("\n1️⃣ Checking API server...") try: @@ -28,74 +30,85 @@ def test_api_buffer(): print(f" ❌ Could not connect to API server: {e}") print(" πŸ’‘ Make sure Flask server is running: python src/app.py") return False - + # Test 2: Test buffer creation print("\n2️⃣ Testing buffer creation...") test_point = Point(32.8, 3.16) # Point in Lamwo, Uganda test_wkt = wkt_dumps(test_point) print(f" πŸ“ Test point: {test_wkt}") - - buffer_data = { - "geometry_wkt": test_wkt, - "radius_m": 1000 - } - + + buffer_data = {"geometry_wkt": test_wkt, "radius_m": 1000} + try: - response = requests.post(f"{BASE_URL}/api/buffer-feature", json=buffer_data, timeout=10) - + response = requests.post( + f"{BASE_URL}/api/buffer-feature", json=buffer_data, timeout=10 + ) + if response.status_code == 200: result = response.json() print(" βœ… Buffer creation successful!") - print(f" πŸ“ Buffer radius: {result['buffer_geojson']['properties']['radius_m']}m") - + print( + f" πŸ“ Buffer radius: {result['buffer_geojson']['properties']['radius_m']}m" + ) + # Store the buffered geometry for subsequent tests - buffered_wkt = result.get('buffered_geometry_wkt') - + buffered_wkt = result.get("buffered_geometry_wkt") + # Test 3: Query features within buffer print("\n3️⃣ Testing buffer queries...") - + # Test buildings query_data = { "query_type": "buildings", "geometry_wkt": buffered_wkt, - "radius_m": 1000 + "radius_m": 1000, } - response = requests.post(f"{BASE_URL}/api/query-buffer", json=query_data, timeout=10) + response = requests.post( + f"{BASE_URL}/api/query-buffer", json=query_data, timeout=10 + ) if response.status_code == 200: result = response.json() - building_count = result['result']['count'] + building_count = result["result"]["count"] print(f" 🏠 Buildings in buffer: {building_count}") else: - print(f" ⚠️ Building query returned {response.status_code}: {response.text}") - + print( + f" ⚠️ Building query returned {response.status_code}: {response.text}" + ) + # Test minigrids query_data = { "query_type": "minigrids", "geometry_wkt": buffered_wkt, - "radius_m": 1000 + "radius_m": 1000, } - response = requests.post(f"{BASE_URL}/api/query-buffer", json=query_data, timeout=10) + response = requests.post( + f"{BASE_URL}/api/query-buffer", json=query_data, timeout=10 + ) if response.status_code == 200: result = response.json() - minigrid_count = result['result']['count'] + minigrid_count = result["result"]["count"] print(f" ⚑ Minigrids in buffer: {minigrid_count}") else: - print(f" ⚠️ Minigrid query returned {response.status_code}: {response.text}") - + print( + f" ⚠️ Minigrid query returned {response.status_code}: {response.text}" + ) + # Test NDVI query_data = { "query_type": "ndvi", "geometry_wkt": buffered_wkt, - "radius_m": 1000 + "radius_m": 1000, } try: - response = requests.post(f"{BASE_URL}/api/query-buffer", json=query_data, timeout=10) + response = requests.post( + f"{BASE_URL}/api/query-buffer", json=query_data, timeout=10 + ) if response.status_code == 200: try: result = response.json() - avg_ndvi = result['result']['avg_ndvi'] + avg_ndvi = result["result"]["avg_ndvi"] print(f" 🌱 Average NDVI: {avg_ndvi:.3f}") - + print("\nπŸŽ‰ All API tests passed!") return True except 
json.JSONDecodeError as e: @@ -103,44 +116,48 @@ def test_api_buffer(): print(f" Raw response: {response.text}") return False else: - print(f" ⚠️ NDVI query returned {response.status_code}: {response.text}") + print( + f" ⚠️ NDVI query returned {response.status_code}: {response.text}" + ) return False except requests.exceptions.RequestException as e: print(f" ❌ NDVI request failed: {e}") return False - + except requests.exceptions.RequestException as e: print(f" ❌ API request failed: {e}") return False - + return True + def main(): success = test_api_buffer() - + if success: print("\nπŸŽ‰ Buffer API Functionality is Working!") print("\n" + "=" * 50) print("βœ… Core buffer_geometry method works") - print("βœ… API endpoints for buffer operations work") + print("βœ… API endpoints for buffer operations work") print("βœ… Session management works") print("βœ… Spatial queries on buffered areas work") - + print("\nπŸš€ Ready for Frontend Integration!") print("\nNext steps:") print("1. Add feature selection UI to map") print("2. Add buffer visualization") print("3. Connect buffer controls to chat interface") print("4. Test complete user workflow") - + print("\nπŸ’‘ Demo usage:") print("- Open http://127.0.0.1:5000 in your browser") - print("- Draw a polygon and ask: 'Create a 2km buffer'") + print("- Draw a polygon and ask: 'Create a 2km buffer'") print("- Then ask: 'How many buildings are in this buffer?'") - + else: print("\n❌ API buffer functionality needs fixes") print("Make sure the Flask server is running: python src/app.py") + if __name__ == "__main__": main() diff --git a/tests/test_buffer_functionality.py b/tests/test_buffer_functionality.py index cf9471d..b920157 100644 --- a/tests/test_buffer_functionality.py +++ b/tests/test_buffer_functionality.py @@ -3,22 +3,24 @@ Test script to verify buffer_geometry functionality works through the LLM interface. """ -import sys import os +import sys + sys.path.append(os.path.join(os.path.dirname(__file__))) -from utils.llm_function_caller import ask_with_functions -from utils.factory import create_geospatial_analyzer from shapely.geometry import Point +from utils.factory import create_geospatial_analyzer +from utils.llm_function_caller import ask_with_functions + def test_buffer_functionality(): """Test the buffer functionality through the LLM interface.""" - + print("πŸ§ͺ Testing buffer_geometry functionality...") - + # Create analyzer analyzer = create_geospatial_analyzer() - + # Test 1: Test buffer_geometry method directly print("\n1. Testing buffer_geometry method directly...") test_point = Point(32.8, 3.16) # Point in Lamwo, Uganda @@ -28,16 +30,16 @@ def test_buffer_functionality(): except Exception as e: print(f"❌ Direct buffer test failed: {e}") return False - + # Test 2: Test through LLM interface print("\n2. Testing through LLM interface...") - + # Note: For this test to work, you need OpenAI API key set - if not os.environ.get('OPENAI_API_KEY'): + if not os.environ.get("OPENAI_API_KEY"): print("⚠️ Skipping LLM test - OPENAI_API_KEY not set") print("βœ… Core buffer functionality works!") return True - + try: query = f"Create a 500 meter buffer around this point: {test_point.wkt}" response = ask_with_functions(query, analyzer) @@ -49,10 +51,11 @@ def test_buffer_functionality(): print("βœ… Core buffer functionality works! 
(LLM API issue)") return True return False - + print("βœ… All buffer tests passed!") return True + if __name__ == "__main__": success = test_buffer_functionality() if success: diff --git a/tests/test_geospatial_analyzer.py b/tests/test_geospatial_analyzer.py index 9b785e3..cafda7a 100644 --- a/tests/test_geospatial_analyzer.py +++ b/tests/test_geospatial_analyzer.py @@ -1,17 +1,17 @@ -import sys import os +import sys + import geopandas as gpd -from shapely.geometry import Polygon, Point +from shapely.geometry import Point, Polygon # Add the src directory to the Python path -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from utils.factory import create_geospatial_analyzer # Add the project root (one level up) to the Python path for configs -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from configs.paths import SAMPLE_REGION_PATH - # Define a sample region for testing (you may need to adjust coordinates) # These coordinates are arbitrary and likely outside your actual data extent. # You should use coordinates that are relevant to your data for meaningful tests. @@ -21,9 +21,12 @@ sample_gdf = gpd.read_file(SAMPLE_REGION_PATH) if sample_gdf.empty: - raise ValueError(f"Sample region shapefile at {SAMPLE_REGION_PATH} is empty or failed to load.") + raise ValueError( + f"Sample region shapefile at {SAMPLE_REGION_PATH} is empty or failed to load." + ) sample_polygon = sample_gdf.geometry.iloc[0] + def run_tests(): print("Initializing GeospatialAnalyzer...") try: @@ -39,17 +42,23 @@ def run_tests(): # 1. _load_and_validate_gdf (checked by successful initialization) print("\n1. _load_and_validate_gdf:") if not analyzer._buildings_gdf.empty: - print(f" Buildings loaded: {len(analyzer._buildings_gdf)} features. CRS: {analyzer._buildings_gdf.crs}") + print( + f" Buildings loaded: {len(analyzer._buildings_gdf)} features. CRS: {analyzer._buildings_gdf.crs}" + ) else: print(" Buildings GDF is empty or failed to load.") if not analyzer._minigrids_gdf.empty: - print(f" Minigrids loaded: {len(analyzer._minigrids_gdf)} features. CRS: {analyzer._minigrids_gdf.crs}") + print( + f" Minigrids loaded: {len(analyzer._minigrids_gdf)} features. CRS: {analyzer._minigrids_gdf.crs}" + ) else: print(" Minigrids GDF is empty or failed to load.") if not analyzer._plain_tiles_gdf.empty: - print(f" Plain tiles loaded: {len(analyzer._plain_tiles_gdf)} features. CRS: {analyzer._plain_tiles_gdf.crs}") + print( + f" Plain tiles loaded: {len(analyzer._plain_tiles_gdf)} features. CRS: {analyzer._plain_tiles_gdf.crs}" + ) else: print(" Plain tiles GDF is empty or failed to load.") @@ -57,7 +66,7 @@ def run_tests(): print("\n2. _load_and_process_tile_stats:") if not analyzer._tile_stats_gdf.empty: print(f" Tile stats loaded: {len(analyzer._tile_stats_gdf)} records.") - if 'ndvi_mean' in analyzer._tile_stats_gdf.columns: + if "ndvi_mean" in analyzer._tile_stats_gdf.columns: print(f" 'ndvi_mean' column found in tile_stats_gdf.") else: print(f" Warning: 'ndvi_mean' column NOT found in tile_stats_gdf.") @@ -67,11 +76,18 @@ def run_tests(): # 3. _merge_tile_data (checked by successful initialization) print("\n3. _merge_tile_data:") if not analyzer._joined_tiles_gdf.empty: - print(f" Joined tiles created: {len(analyzer._joined_tiles_gdf)} features. 
CRS: {analyzer._joined_tiles_gdf.crs}") - if 'ndvi_mean' in analyzer._joined_tiles_gdf.columns and 'geometry' in analyzer._joined_tiles_gdf.columns: + print( + f" Joined tiles created: {len(analyzer._joined_tiles_gdf)} features. CRS: {analyzer._joined_tiles_gdf.crs}" + ) + if ( + "ndvi_mean" in analyzer._joined_tiles_gdf.columns + and "geometry" in analyzer._joined_tiles_gdf.columns + ): print(f" 'ndvi_mean' and 'geometry' columns found in joined_tiles_gdf.") else: - print(f" Warning: 'ndvi_mean' or 'geometry' column NOT found in joined_tiles_gdf.") + print( + f" Warning: 'ndvi_mean' or 'geometry' column NOT found in joined_tiles_gdf." + ) else: print(" Joined tiles GDF is empty or failed to merge.") @@ -79,14 +95,24 @@ def run_tests(): print("\n4. _ensure_gdf_crs_for_calculation:") if not analyzer._buildings_gdf.empty: try: - buildings_metric = analyzer._check_and_reproject_gdf(analyzer._buildings_gdf.copy(), analyzer.target_metric_crs) - print(f" Buildings GDF reprojected/ensured to {analyzer.target_metric_crs}: {buildings_metric.crs}") - buildings_geo = analyzer._check_and_reproject_gdf(analyzer._buildings_gdf.copy(), analyzer.target_geographic_crs) - print(f" Buildings GDF reprojected/ensured to {analyzer.target_geographic_crs}: {buildings_geo.crs}") + buildings_metric = analyzer._check_and_reproject_gdf( + analyzer._buildings_gdf.copy(), analyzer.target_metric_crs + ) + print( + f" Buildings GDF reprojected/ensured to {analyzer.target_metric_crs}: {buildings_metric.crs}" + ) + buildings_geo = analyzer._check_and_reproject_gdf( + analyzer._buildings_gdf.copy(), analyzer.target_geographic_crs + ) + print( + f" Buildings GDF reprojected/ensured to {analyzer.target_geographic_crs}: {buildings_geo.crs}" + ) except Exception as e: print(f" Error testing _ensure_gdf_crs_for_calculation: {e}") else: - print(" Skipping _ensure_gdf_crs_for_calculation test as buildings GDF is empty.") + print( + " Skipping _ensure_gdf_crs_for_calculation test as buildings GDF is empty." + ) # 5. _ensure_crs_for_calculation (Harder to test in isolation without a GeoSeries/CRS-aware geometry) # This helper is typically used with geometries derived from GDFs. @@ -96,21 +122,33 @@ def run_tests(): try: sample_geom = analyzer._buildings_gdf.geometry.iloc[0] # Test reprojecting a single geometry - reprojected_geom, reprojected_flag = analyzer._prepare_geometry_for_crs(sample_geom, analyzer.target_metric_crs) - print(f" Sample geometry reprojected to metric CRS (reprojected: {reprojected_flag}). Original CRS was implicitly {analyzer._buildings_gdf.crs}") + reprojected_geom, reprojected_flag = analyzer._prepare_geometry_for_crs( + sample_geom, analyzer.target_metric_crs + ) + print( + f" Sample geometry reprojected to metric CRS (reprojected: {reprojected_flag}). 
Original CRS was implicitly {analyzer._buildings_gdf.crs}" + ) # Test with a geometry that might already be in the target CRS (less likely for initial geographic load) # Create a dummy GeoSeries with the target metric CRS import geopandas as gpd - temp_metric_geom = gpd.GeoSeries([Point(1,1)], crs=analyzer.target_metric_crs).iloc[0] - ensured_geom, reprojected_flag_metric = analyzer._prepare_geometry_for_crs(temp_metric_geom, analyzer.target_metric_crs) - print(f" Sample geometry already in metric CRS (reprojected: {reprojected_flag_metric})") + + temp_metric_geom = gpd.GeoSeries( + [Point(1, 1)], crs=analyzer.target_metric_crs + ).iloc[0] + ensured_geom, reprojected_flag_metric = analyzer._prepare_geometry_for_crs( + temp_metric_geom, analyzer.target_metric_crs + ) + print( + f" Sample geometry already in metric CRS (reprojected: {reprojected_flag_metric})" + ) except Exception as e: print(f" Error testing _ensure_crs_for_calculation: {e}") else: - print(" Skipping _ensure_crs_for_calculation test as buildings GDF is empty or has no CRS.") - + print( + " Skipping _ensure_crs_for_calculation test as buildings GDF is empty or has no CRS." + ) # --- Test Generic vector-counting primitive --- print("\n--- Testing Generic vector-counting primitive ---") @@ -123,12 +161,16 @@ def run_tests(): # We can make it more robust by creating it with a CRS if GeoPandas is available here. try: # Create a GeoSeries for the polygon with a defined CRS - region_gs = gpd.GeoSeries([sample_polygon], crs=analyzer.target_geographic_crs) # Explicitly WGS84 + region_gs = gpd.GeoSeries( + [sample_polygon], crs=analyzer.target_geographic_crs + ) # Explicitly WGS84 test_region_polygon = region_gs.iloc[0] print(f"Test region polygon CRS for count_features_within: {region_gs.crs}") except ImportError: - print("Geopandas not available for creating CRS-aware test polygon. Using raw Shapely polygon.") - test_region_polygon = sample_polygon # Fallback + print( + "Geopandas not available for creating CRS-aware test polygon. Using raw Shapely polygon." + ) + test_region_polygon = sample_polygon # Fallback # 1. Count buildings print("\n1. count_features_within (buildings):") @@ -141,7 +183,9 @@ def run_tests(): # 2. Count minigrids print("\n2. count_features_within (minigrids):") try: - minigrid_count = analyzer.count_features_within_region(test_region_polygon, 'minigrids') + minigrid_count = analyzer.count_features_within_region( + test_region_polygon, "minigrids" + ) print(f" Number of minigrids in sample region: {minigrid_count}") except Exception as e: print(f" Error counting minigrids: {e}") @@ -149,28 +193,41 @@ def run_tests(): # 3. Count tiles print("\n3. count_features_within (tiles):") try: - tile_count = analyzer.count_features_within_region(test_region_polygon, 'tiles') + tile_count = analyzer.count_features_within_region(test_region_polygon, "tiles") print(f" Number of tiles in sample region: {tile_count}") except Exception as e: print(f" Error counting tiles: {e}") # 4. Count tiles with a filter (e.g., NDVI_mean > 0.1) print("\n4. count_features_within (tiles with filter):") - if not analyzer._joined_tiles_gdf.empty and 'ndvi_mean' in analyzer._joined_tiles_gdf.columns: + if ( + not analyzer._joined_tiles_gdf.empty + and "ndvi_mean" in analyzer._joined_tiles_gdf.columns + ): try: # Ensure the filter is valid for your data. This is an example. 
- filtered_tile_count = analyzer.count_features_within_region(test_region_polygon, 'tiles', filter_expr="ndvi_mean > 0.1") - print(f" Number of tiles with NDVI > 0.1 in sample region: {filtered_tile_count}") + filtered_tile_count = analyzer.count_features_within_region( + test_region_polygon, "tiles", filter_expr="ndvi_mean > 0.1" + ) + print( + f" Number of tiles with NDVI > 0.1 in sample region: {filtered_tile_count}" + ) except Exception as e: print(f" Error counting filtered tiles: {e}") else: - print(" Skipping filtered tile count as joined_tiles_gdf is empty or missing 'ndvi_mean'.") + print( + " Skipping filtered tile count as joined_tiles_gdf is empty or missing 'ndvi_mean'." + ) # 5. Test with an invalid layer name print("\n5. count_features_within (invalid layer):") try: - invalid_count = analyzer.count_features_within_region(test_region_polygon, 'non_existent_layer') - print(f" Count for non_existent_layer: {invalid_count} (expected 0 and an error message)") + invalid_count = analyzer.count_features_within_region( + test_region_polygon, "non_existent_layer" + ) + print( + f" Count for non_existent_layer: {invalid_count} (expected 0 and an error message)" + ) except Exception as e: print(f" Error with invalid layer (as expected): {e}") @@ -181,9 +238,10 @@ def run_tests(): print(f" Tile IDs within sample region: {tile_ids}") except Exception as e: print(f" Error getting tile IDs within region: {e}") - + print("\n--- Tests Complete ---") + if __name__ == "__main__": run_tests() diff --git a/tests/test_integration.py b/tests/test_integration.py index 668d6f7..ca87bf0 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -2,11 +2,11 @@ Integration tests for GeospatialAnalyzer. These tests require actual data files and test the full workflow. 
""" -import pytest + import geopandas as gpd -from shapely.geometry import Point, Polygon import numpy as np - +import pytest +from shapely.geometry import Point, Polygon from utils.factory import create_geospatial_analyzer @@ -23,16 +23,16 @@ def analyzer_with_data(self, check_data_files): except Exception as e: pytest.skip(f"Could not create analyzer with real data: {e}") - @pytest.fixture(scope="class") + @pytest.fixture(scope="class") def test_region(self, sample_data_paths): """Load a test region from actual data.""" if not sample_data_paths["sample_region"].exists(): pytest.skip("Sample region file not found") - + sample_gdf = gpd.read_file(sample_data_paths["sample_region"]) if sample_gdf.empty: pytest.skip("Sample region is empty") - + return sample_gdf.geometry.iloc[0] def test_full_workflow_building_analysis(self, analyzer_with_data, test_region): @@ -41,29 +41,35 @@ def test_full_workflow_building_analysis(self, analyzer_with_data, test_region): building_count = analyzer_with_data.count_buildings_within_region(test_region) assert isinstance(building_count, int) assert building_count >= 0 - + # Test with different buffer sizes if building_count > 0: - buffered_region = analyzer_with_data.buffer_geometry(test_region, 500) # 500m buffer - buffered_count = analyzer_with_data.count_buildings_within_region(buffered_region) + buffered_region = analyzer_with_data.buffer_geometry( + test_region, 500 + ) # 500m buffer + buffered_count = analyzer_with_data.count_buildings_within_region( + buffered_region + ) assert buffered_count >= building_count # Should be at least as many def test_full_workflow_ndvi_analysis(self, analyzer_with_data, test_region): """Test complete workflow for NDVI analysis.""" - if (analyzer_with_data._joined_tiles_gdf.empty or - 'ndvi_mean' not in analyzer_with_data._joined_tiles_gdf.columns): + if ( + analyzer_with_data._joined_tiles_gdf.empty + or "ndvi_mean" not in analyzer_with_data._joined_tiles_gdf.columns + ): pytest.skip("No NDVI data available") # Test average NDVI avg_ndvi = analyzer_with_data.avg_ndvi(test_region) if not np.isnan(avg_ndvi): assert -1 <= avg_ndvi <= 1 - + # Test NDVI statistics ndvi_stats = analyzer_with_data.ndvi_stats(test_region) assert isinstance(ndvi_stats, dict) - assert 'NDVI_mean' in ndvi_stats - + assert "NDVI_mean" in ndvi_stats + # Test high NDVI building count if not analyzer_with_data._buildings_gdf.empty: high_ndvi_buildings = analyzer_with_data.count_high_ndvi_buildings( @@ -80,13 +86,13 @@ def test_full_workflow_minigrid_analysis(self, analyzer_with_data, test_region): # Test listing minigrids grid_list = analyzer_with_data.list_mini_grids() assert isinstance(grid_list, list) - + if grid_list: # Test getting site geometry site_geom = analyzer_with_data.get_site_geometry(grid_list[0]) if site_geom is not None: - assert hasattr(site_geom, 'bounds') - + assert hasattr(site_geom, "bounds") + # Test nearest minigrids center_point = test_region.centroid nearest = analyzer_with_data.nearest_mini_grids(center_point, k=3) @@ -97,10 +103,12 @@ def test_cross_layer_consistency(self, analyzer_with_data, test_region): """Test consistency across different data layers.""" # Get tile IDs in region tile_ids = analyzer_with_data.get_tile_ids_within_region(test_region) - + # Count tiles using generic method - tile_count = analyzer_with_data.count_features_within_region(test_region, 'tiles') - + tile_count = analyzer_with_data.count_features_within_region( + test_region, "tiles" + ) + # These should be consistent if tile_ids and tile_count > 0: 
assert len(tile_ids) == tile_count @@ -112,31 +120,35 @@ def test_spatial_accuracy(self, analyzer_with_data, test_region): # Test that buildings in buffered region >= buildings in original region original_count = analyzer_with_data.count_buildings_within_region(test_region) - + if original_count > 0: - buffered_region = analyzer_with_data.buffer_geometry(test_region, 100) # 100m buffer - buffered_count = analyzer_with_data.count_buildings_within_region(buffered_region) - + buffered_region = analyzer_with_data.buffer_geometry( + test_region, 100 + ) # 100m buffer + buffered_count = analyzer_with_data.count_buildings_within_region( + buffered_region + ) + assert buffered_count >= original_count def test_performance_with_real_data(self, analyzer_with_data, test_region): """Test performance with real data.""" import time - + start_time = time.time() - + # Run multiple operations building_count = analyzer_with_data.count_buildings_within_region(test_region) if not analyzer_with_data._joined_tiles_gdf.empty: avg_ndvi = analyzer_with_data.avg_ndvi(test_region) ndvi_stats = analyzer_with_data.ndvi_stats(test_region) - + end_time = time.time() execution_time = end_time - start_time - + # Should complete within reasonable time (adjust as needed) assert execution_time < 10 # 10 seconds max for basic operations - + print(f"Performance test completed in {execution_time:.2f} seconds") @pytest.mark.slow @@ -151,16 +163,20 @@ def test_large_region_analysis(self, analyzer_with_data): # Create region covering 50% of the bounding box x_range = bounds[2] - bounds[0] y_range = bounds[3] - bounds[1] - - large_region = Polygon([ - (bounds[0] + x_range * 0.25, bounds[1] + y_range * 0.25), - (bounds[2] - x_range * 0.25, bounds[1] + y_range * 0.25), - (bounds[2] - x_range * 0.25, bounds[3] - y_range * 0.25), - (bounds[0] + x_range * 0.25, bounds[3] - y_range * 0.25) - ]) - + + large_region = Polygon( + [ + (bounds[0] + x_range * 0.25, bounds[1] + y_range * 0.25), + (bounds[2] - x_range * 0.25, bounds[1] + y_range * 0.25), + (bounds[2] - x_range * 0.25, bounds[3] - y_range * 0.25), + (bounds[0] + x_range * 0.25, bounds[3] - y_range * 0.25), + ] + ) + # This should not crash and should return reasonable results - building_count = analyzer_with_data.count_buildings_within_region(large_region) + building_count = analyzer_with_data.count_buildings_within_region( + large_region + ) assert isinstance(building_count, int) assert building_count >= 0 @@ -169,19 +185,22 @@ def test_data_integrity_checks(self, analyzer_with_data): # Check that all loaded GeoDataFrames have valid CRS if not analyzer_with_data._buildings_gdf.empty: assert analyzer_with_data._buildings_gdf.crs is not None - + if not analyzer_with_data._minigrids_gdf.empty: assert analyzer_with_data._minigrids_gdf.crs is not None - + if not analyzer_with_data._plain_tiles_gdf.empty: assert analyzer_with_data._plain_tiles_gdf.crs is not None - + # Check that joined tiles have both geometry and stats if not analyzer_with_data._joined_tiles_gdf.empty: - assert 'geometry' in analyzer_with_data._joined_tiles_gdf.columns + assert "geometry" in analyzer_with_data._joined_tiles_gdf.columns # Should have at least some statistical columns - stat_cols = [col for col in analyzer_with_data._joined_tiles_gdf.columns - if any(stat in col.lower() for stat in ['ndvi', 'mean', 'std', 'med'])] + stat_cols = [ + col + for col in analyzer_with_data._joined_tiles_gdf.columns + if any(stat in col.lower() for stat in ["ndvi", "mean", "std", "med"]) + ] if stat_cols: # Only check if 
statistical columns exist assert len(stat_cols) > 0 @@ -189,13 +208,13 @@ def test_coordinate_system_consistency(self, analyzer_with_data): """Test that coordinate system handling is consistent.""" # All geographic data should be transformable to common CRS target_crs = "EPSG:4326" # WGS84 - + if not analyzer_with_data._buildings_gdf.empty: buildings_4326 = analyzer_with_data._ensure_gdf_crs_for_calculation( analyzer_with_data._buildings_gdf.copy(), target_crs ) assert buildings_4326.crs.to_string() == target_crs - + if not analyzer_with_data._minigrids_gdf.empty: minigrids_4326 = analyzer_with_data._ensure_gdf_crs_for_calculation( analyzer_with_data._minigrids_gdf.copy(), target_crs diff --git a/tests/test_utilities.py b/tests/test_utilities.py index 3256112..0f541fe 100644 --- a/tests/test_utilities.py +++ b/tests/test_utilities.py @@ -1,13 +1,14 @@ """ Unit tests for GeospatialAnalyzer utility methods. """ -import pytest + +from unittest.mock import Mock, patch + import geopandas as gpd +import numpy as np import pandas as pd +import pytest from shapely.geometry import Point, Polygon -from unittest.mock import Mock, patch -import numpy as np - from utils.GeospatialAnalyzer import GeospatialAnalyzer @@ -17,21 +18,21 @@ class TestGeospatialAnalyzerUtilities: @pytest.fixture def mock_minimal_analyzer(self): """Create a minimal analyzer for utility testing.""" - with patch('utils.GeospatialAnalyzer.gpd.read_file') as mock_read_file: - with patch('pandas.read_csv') as mock_read_csv: + with patch("utils.GeospatialAnalyzer.gpd.read_file") as mock_read_file: + with patch("pandas.read_csv") as mock_read_csv: # Mock empty GeoDataFrames empty_gdf = gpd.GeoDataFrame(geometry=[]) mock_read_file.return_value = empty_gdf - + # Mock empty DataFrame for CSV empty_df = pd.DataFrame() mock_read_csv.return_value = empty_df - + analyzer = GeospatialAnalyzer( buildings_path="mock.gpkg", - minigrids_path="mock.gpkg", + minigrids_path="mock.gpkg", tile_stats_path="mock.csv", - plain_tiles_path="mock.geojson" + plain_tiles_path="mock.geojson", ) return analyzer @@ -39,38 +40,38 @@ def test_buffer_geometry_with_geographic_crs(self, mock_minimal_analyzer): """Test buffering with geographic coordinates.""" # Create a point in geographic coordinates point = Point(-0.5, 0.5) # Lon, Lat - + # Mock the geometry to have geographic CRS - with patch.object(point, 'crs', "EPSG:4326"): + with patch.object(point, "crs", "EPSG:4326"): buffered = mock_minimal_analyzer.buffer_geometry(point, 1000) # 1km buffer - + assert buffered is not None - assert hasattr(buffered, 'area') + assert hasattr(buffered, "area") assert buffered.area > 0 def test_buffer_geometry_without_crs(self, mock_minimal_analyzer): """Test buffering without CRS information.""" point = Point(0, 0) - + buffered = mock_minimal_analyzer.buffer_geometry(point, 1000) - + assert buffered is not None - assert hasattr(buffered, 'area') + assert hasattr(buffered, "area") assert buffered.area > 0 def test_ensure_gdf_crs_for_calculation(self, mock_minimal_analyzer): """Test CRS ensuring for GeoDataFrames.""" # Create test GeoDataFrame - test_gdf = gpd.GeoDataFrame({ - 'geometry': [Point(0, 0), Point(1, 1)] - }, crs="EPSG:4326") - + test_gdf = gpd.GeoDataFrame( + {"geometry": [Point(0, 0), Point(1, 1)]}, crs="EPSG:4326" + ) + target_crs = "EPSG:32636" # UTM Zone 36N - + result_gdf = mock_minimal_analyzer._ensure_gdf_crs_for_calculation( test_gdf, target_crs ) - + assert result_gdf.crs.to_string() == target_crs assert len(result_gdf) == len(test_gdf) @@ -79,11 +80,11 @@ 
def test_ensure_crs_for_calculation_same_crs(self, mock_minimal_analyzer): # Create geometry that's already in target CRS target_crs = "EPSG:32636" test_geom = gpd.GeoSeries([Point(0, 0)], crs=target_crs).iloc[0] - - result_geom, was_reprojected = mock_minimal_analyzer._ensure_crs_for_calculation( - test_geom, target_crs + + result_geom, was_reprojected = ( + mock_minimal_analyzer._ensure_crs_for_calculation(test_geom, target_crs) ) - + assert not was_reprojected assert result_geom is not None @@ -91,13 +92,13 @@ def test_ensure_crs_for_calculation_different_crs(self, mock_minimal_analyzer): """Test CRS ensuring when geometry needs reprojection.""" source_crs = "EPSG:4326" target_crs = "EPSG:32636" - + test_geom = gpd.GeoSeries([Point(0, 0)], crs=source_crs).iloc[0] - - result_geom, was_reprojected = mock_minimal_analyzer._ensure_crs_for_calculation( - test_geom, target_crs + + result_geom, was_reprojected = ( + mock_minimal_analyzer._ensure_crs_for_calculation(test_geom, target_crs) ) - + assert was_reprojected assert result_geom is not None @@ -111,62 +112,62 @@ def test_initialization_with_missing_files(self): GeospatialAnalyzer( buildings_path="nonexistent.gpkg", minigrids_path="nonexistent.gpkg", - tile_stats_path="nonexistent.csv", - plain_tiles_path="nonexistent.geojson" + tile_stats_path="nonexistent.csv", + plain_tiles_path="nonexistent.geojson", ) def test_count_features_empty_gdf(self): """Test feature counting with empty GeoDataFrame.""" - with patch('utils.GeospatialAnalyzer.gpd.read_file') as mock_read_file: - with patch('pandas.read_csv') as mock_read_csv: + with patch("utils.GeospatialAnalyzer.gpd.read_file") as mock_read_file: + with patch("pandas.read_csv") as mock_read_csv: # Mock empty GeoDataFrames empty_gdf = gpd.GeoDataFrame(geometry=[], crs="EPSG:4326") mock_read_file.return_value = empty_gdf - + # Mock empty DataFrame for CSV empty_df = pd.DataFrame() mock_read_csv.return_value = empty_df - + analyzer = GeospatialAnalyzer( buildings_path="mock.gpkg", minigrids_path="mock.gpkg", tile_stats_path="mock.csv", - plain_tiles_path="mock.geojson" + plain_tiles_path="mock.geojson", ) - + test_region = Polygon([(0, 0), (1, 0), (1, 1), (0, 1)]) - count = analyzer.count_features_within_region(test_region, 'buildings') - + count = analyzer.count_features_within_region(test_region, "buildings") + assert count == 0 def test_invalid_filter_expression(self): """Test handling of invalid filter expressions.""" - with patch('utils.GeospatialAnalyzer.gpd.read_file') as mock_read_file: - with patch('pandas.read_csv') as mock_read_csv: + with patch("utils.GeospatialAnalyzer.gpd.read_file") as mock_read_file: + with patch("pandas.read_csv") as mock_read_csv: # Create mock GeoDataFrame with data - mock_gdf = gpd.GeoDataFrame({ - 'geometry': [Point(0, 0), Point(1, 1)], - 'test_col': [1, 2] - }, crs="EPSG:4326") + mock_gdf = gpd.GeoDataFrame( + {"geometry": [Point(0, 0), Point(1, 1)], "test_col": [1, 2]}, + crs="EPSG:4326", + ) mock_read_file.return_value = mock_gdf - + mock_df = pd.DataFrame() mock_read_csv.return_value = mock_df - + analyzer = GeospatialAnalyzer( buildings_path="mock.gpkg", minigrids_path="mock.gpkg", tile_stats_path="mock.csv", - plain_tiles_path="mock.geojson" + plain_tiles_path="mock.geojson", ) - + test_region = Polygon([(0, 0), (2, 0), (2, 2), (0, 2)]) - + # Test with invalid column name in filter count = analyzer.count_features_within_region( - test_region, 'buildings', filter_expr="nonexistent_col > 0" + test_region, "buildings", filter_expr="nonexistent_col > 0" ) 
- + # Should return 0 due to error handling assert count == 0 @@ -176,12 +177,12 @@ class TestGeospatialAnalyzerDataValidation: def test_load_and_validate_gdf_valid_file(self): """Test loading valid GeoDataFrame.""" - with patch('utils.GeospatialAnalyzer.gpd.read_file') as mock_read: - valid_gdf = gpd.GeoDataFrame({ - 'geometry': [Point(0, 0), Point(1, 1)] - }, crs="EPSG:4326") + with patch("utils.GeospatialAnalyzer.gpd.read_file") as mock_read: + valid_gdf = gpd.GeoDataFrame( + {"geometry": [Point(0, 0), Point(1, 1)]}, crs="EPSG:4326" + ) mock_read.return_value = valid_gdf - + # This would be called during initialization # We're testing the internal logic here result = gpd.read_file("mock_path.gpkg") @@ -190,47 +191,54 @@ def test_load_and_validate_gdf_valid_file(self): def test_load_and_validate_gdf_empty_file(self): """Test loading empty GeoDataFrame.""" - with patch('utils.GeospatialAnalyzer.gpd.read_file') as mock_read: + with patch("utils.GeospatialAnalyzer.gpd.read_file") as mock_read: empty_gdf = gpd.GeoDataFrame(geometry=[], crs="EPSG:4326") mock_read.return_value = empty_gdf - + result = gpd.read_file("mock_path.gpkg") assert result.empty def test_tile_stats_processing(self): """Test tile statistics processing.""" - with patch('pandas.read_csv') as mock_csv: + with patch("pandas.read_csv") as mock_csv: # Mock CSV with tile statistics - mock_stats = pd.DataFrame({ - 'tile_id': [1, 2, 3], - 'ndvi_mean': [0.3, 0.5, 0.7], - 'ndvi_med': [0.25, 0.48, 0.68], - 'ndvi_std': [0.1, 0.15, 0.12] - }) + mock_stats = pd.DataFrame( + { + "tile_id": [1, 2, 3], + "ndvi_mean": [0.3, 0.5, 0.7], + "ndvi_med": [0.25, 0.48, 0.68], + "ndvi_std": [0.1, 0.15, 0.12], + } + ) mock_csv.return_value = mock_stats - + result = pd.read_csv("mock_stats.csv") - + assert not result.empty - assert 'ndvi_mean' in result.columns + assert "ndvi_mean" in result.columns assert len(result) == 3 - assert result['ndvi_mean'].min() >= 0 - assert result['ndvi_mean'].max() <= 1 - - -@pytest.mark.parametrize("crs_input,expected_output", [ - ("EPSG:4326", "EPSG:4326"), - ("EPSG:32636", "EPSG:32636"), - (4326, "EPSG:4326"), -]) + assert result["ndvi_mean"].min() >= 0 + assert result["ndvi_mean"].max() <= 1 + + +@pytest.mark.parametrize( + "crs_input,expected_output", + [ + ("EPSG:4326", "EPSG:4326"), + ("EPSG:32636", "EPSG:32636"), + (4326, "EPSG:4326"), + ], +) def test_crs_handling_parametrized(crs_input, expected_output): """Parametrized test for CRS handling.""" # This tests different CRS input formats import pyproj - + try: crs = pyproj.CRS(crs_input) - assert crs.to_string() == expected_output or crs.to_epsg() == int(expected_output.split(':')[1]) + assert crs.to_string() == expected_output or crs.to_epsg() == int( + expected_output.split(":")[1] + ) except Exception: pytest.skip(f"Invalid CRS input: {crs_input}")
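The integration tests above tag the large-region case with `@pytest.mark.slow`. A minimal sketch of running the suite while skipping it (assuming the `slow` marker is registered in a pytest config such as `pytest.ini`; without registration pytest only emits an unknown-marker warning):

```sh
# Run the test suite, skipping tests marked as slow
pytest tests/ -m "not slow"

# Run everything, including the slow large-region test
pytest tests/
```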