diff --git a/scripts/sensitivity_analysis_refactored.py b/scripts/sensitivity_analysis_refactored.py new file mode 100644 index 0000000..f4d3897 --- /dev/null +++ b/scripts/sensitivity_analysis_refactored.py @@ -0,0 +1,695 @@ +#!/usr/bin/env python3 +""" +Refactored Sensitivity Analysis Script for Water Access Mobility Model + +This script performs sensitivity analysis on the mobility model for water access research. +Key improvements from original: +- Removed confusing "water_ration_kms" metric +- Reports simple one-way distance to water source (matching global model) +- Fixed water carrying capacity at 15L for all vehicles +- Water requirement per person is now a sensitivity parameter +- Clearer metric definitions and documentation +""" + +import os +import sys +from pathlib import Path +import pandas as pd +import numpy as np +import plotly +import plotly.express as px +from plotly.subplots import make_subplots +import plotly.graph_objects as go +import seaborn as sns +from typing import Dict, List, Tuple, Optional +import logging + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + +# Setup path to import custom modules +notebook_path = Path(__file__).resolve().parent +project_root = notebook_path.parent +sys.path.append(str(project_root)) + +# Import custom modules +import src.mobility_module as mm +import src.plotting_tools_water_access as pt + +# Set seaborn theme +sns.set_theme() + + +class RefactoredSensitivityAnalyzer: + """ + Performs sensitivity analysis on mobility models for water access research. + + This refactored version uses simplified, clear metrics that match the global model. + """ + + def __init__(self, model_type: str = "Lankford"): + """ + Initialize the sensitivity analyzer. 
+ + Args: + model_type: Either "Lankford" for walking or "Martin" for cycling + """ + self.model_type = model_type + self.model_number = 3 if model_type == "Lankford" else 2 + self.filter_value = "Buckets" if model_type == "Lankford" else "Bicycle" + + # Fixed water parameters + self.fixed_water_capacity = 15.0 # L (standardized for all vehicles) + self.default_water_requirement = 15.0 # L per person per day + + # Graph colors for consistent plotting + self.graph_colours = [ + "#3D87CB", + "#F0B323", + "#DC582A", + "#674230", + "#3A913F", + "#75787B", + ] + + # Results storage + self.full_result_dict = {} + self.plot_dict = {} + self.df_large = pd.DataFrame() + + logger.info(f"Initialized RefactoredSensitivityAnalyzer for {model_type} model") + logger.info(f"Fixed water capacity: {self.fixed_water_capacity}L") + + def load_data(self) -> Tuple[pd.DataFrame, pd.DataFrame]: + """ + Load parameter data and sensitivity variables from CSV files. + + Returns: + Tuple of (filtered_param_df, sensitivity_df) + """ + try: + # File paths (relative to project root) + project_root = Path(__file__).resolve().parent.parent + file_path_params = ( + project_root / "data/lookup tables/mobility-model-parameters.csv" + ) + file_path_sens_vars = ( + project_root / "data/lookup tables/Sensitivity Analysis Variables.csv" + ) + + # Load data + all_hpv_param_df = pd.read_csv(file_path_params) + sens_df = pd.read_csv(file_path_sens_vars) + + # Filter parameters for the selected model + param_df = all_hpv_param_df.loc[ + all_hpv_param_df["Name"] == self.filter_value + ] + + # Add water requirement as a sensitivity parameter if not present + if "Water Requirement" not in sens_df["Short Name"].values: + water_req_row = pd.DataFrame( + { + "Short Name": ["Water Requirement"], + "Long Name": ["Daily water requirement per person"], + "Units": ["L/day"], + "Default Value": [15.0], + "Expected Min": [10.0], + "Expected Max": [20.0], + "Plotting Min": [5.0], + "Plotting Max": [25.0], + } + ) + 
sens_df = pd.concat([sens_df, water_req_row], ignore_index=True) + + logger.info( + f"Loaded {len(param_df)} parameter records and {len(sens_df)} sensitivity variables" + ) + return param_df, sens_df + + except Exception as e: + logger.error(f"Error loading data: {e}") + raise + + def initialize_model_components(self, param_df: pd.DataFrame) -> Tuple: + """ + Initialize model components with standardized water capacity. + """ + try: + # Initialize model components + mo = mm.model_options() + mv = mm.model_variables() + met = mm.MET_values( + mv, country_weight=60, met=4.5, use_country_specific_weights=False + ) + hpv = mm.HPV_variables(param_df, mv) + mr = mm.model_results(hpv, mo) + + # Set model selection + mo.model_selection = self.model_number + + # Override load capacity to fixed water capacity + # This ensures all vehicles carry the same amount of water + hpv.load_limit = np.array([[[self.fixed_water_capacity]]]) + hpv.practical_limit = np.array([[[self.fixed_water_capacity]]]) + + logger.info("Initialized model components with fixed water capacity") + return mo, mv, met, hpv, mr + + except Exception as e: + logger.error(f"Error initializing model components: {e}") + raise + + def calculate_metrics(self, velocities: List[Dict], mv) -> Dict[str, float]: + """ + Calculate clear, simple metrics from velocity results. 
+ + Args: + velocities: List of velocity results from model runs + mv: Model variables + + Returns: + Dictionary of calculated metrics + """ + # Extract valid velocities + valid_velocities = [v for v in velocities if not np.isnan(v["avg_velocity"])] + + if not valid_velocities: + return { + "one_way_distance_km": np.nan, + "round_trip_distance_km": np.nan, + "avg_velocity_m_s": np.nan, + "trips_per_day": np.nan, + "time_per_trip_hours": np.nan, + "daily_water_collected_L": np.nan, + } + + # Calculate average velocity across slopes + avg_velocity = np.mean([v["avg_velocity"] for v in valid_velocities]) + + # Calculate one-way distance to water source (matching global model) + one_way_distance_km = avg_velocity * mv.t_hours * 3600 / 2 / 1000 + + # Calculate round-trip distance + round_trip_distance_km = one_way_distance_km * 2 + + # Calculate trips needed per day + trips_per_day = self.default_water_requirement / self.fixed_water_capacity + + # Time per round trip + time_per_trip_hours = ( + round_trip_distance_km / (avg_velocity * 3.6) + if avg_velocity > 0 + else np.inf + ) + + # Total daily water that can be collected in available time + max_trips_in_time = ( + mv.t_hours / time_per_trip_hours if time_per_trip_hours > 0 else 0 + ) + daily_water_collected = min( + max_trips_in_time * self.fixed_water_capacity, + self.default_water_requirement, + ) + + return { + "one_way_distance_km": one_way_distance_km, + "round_trip_distance_km": round_trip_distance_km, + "avg_velocity_m_s": avg_velocity, + "trips_per_day": trips_per_day, + "time_per_trip_hours": time_per_trip_hours, + "daily_water_collected_L": daily_water_collected, + } + + def run_direct_model_calls(self, mv, mo, met, hpv) -> List[Dict]: + """ + Run direct mobility model calls like the global model does. 
+ """ + # Test slopes (matching global model approach) + test_slopes = [0, 1, 2, 3, 4, 5] # Extended range for better representation + + results = [] + + for slope in test_slopes: + try: + if mo.model_selection == 2: # Cycling (Martin model) + result = mm.mobility_models.single_bike_run( + mv, mo, hpv, slope, self.fixed_water_capacity + ) + if isinstance(result, tuple) and len(result) == 3: + loaded_velocity, unloaded_velocity, max_load = result + else: + logger.error( + f"Unexpected result format from single_bike_run: {type(result)}, {result}" + ) + continue + elif mo.model_selection == 3: # Walking (Lankford model) + result = mm.mobility_models.single_lankford_run( + mv, mo, met, hpv, slope, self.fixed_water_capacity + ) + if isinstance(result, tuple) and len(result) == 3: + loaded_velocity, unloaded_velocity, max_load = result + else: + logger.error( + f"Unexpected result format from single_lankford_run: {type(result)}, {result}" + ) + continue + else: + logger.warning(f"Unknown model selection: {mo.model_selection}") + continue + + # Ensure scalar values + loaded_velocity = ( + float(loaded_velocity) if not np.isnan(loaded_velocity) else np.nan + ) + unloaded_velocity = ( + float(unloaded_velocity) + if not np.isnan(unloaded_velocity) + else np.nan + ) + max_load = float(max_load) if not np.isnan(max_load) else np.nan + + # Calculate average velocity (like global model) + avg_velocity = (loaded_velocity + unloaded_velocity) / 2 + + results.append( + { + "slope": slope, + "loaded_velocity": loaded_velocity, + "unloaded_velocity": unloaded_velocity, + "avg_velocity": avg_velocity, + "max_load": max_load, + } + ) + + except Exception as e: + logger.error(f"Error in direct model call for slope {slope}: {e}") + results.append( + { + "slope": slope, + "loaded_velocity": np.nan, + "unloaded_velocity": np.nan, + "avg_velocity": np.nan, + "max_load": np.nan, + } + ) + + return results + + def apply_sensitivity_parameter( + self, var_string: str, var_test: float, hpv, 
mv, mo, met + ) -> Tuple: + """ + Apply sensitivity parameter to model components. + Updated to include water requirement as a parameter. + """ + if var_string == "Coefficient of Rolling Resistance": + hpv.Crr = np.array([[[var_test]]]) + elif var_string == "Water Requirement": + # This affects how many trips are needed, not the mobility calculation + self.default_water_requirement = var_test + elif var_string == "Practical Limit Cycling" and mo.model_selection == 2: + # Skip - we're using fixed water capacity + pass + elif var_string == "Practical Limit Walking" and mo.model_selection == 3: + # Skip - we're using fixed water capacity + pass + elif var_string == "Reference Area": + mv.A = var_test + elif var_string == "Drag Coefficient": + mv.C_d = var_test + elif var_string == "Efficiency": + mv.eta = var_test + elif var_string == "Air Density": + mv.ro = var_test + elif var_string == "MET budget": + met = mm.MET_values( + mv, country_weight=60, met=var_test, use_country_specific_weights=False + ) + elif var_string == "T_hours": + mv.t_hours = var_test + elif var_string == "HPV Weight": + hpv.m_HPV_only = np.array([[[var_test]]]) + elif var_string == "Human Weight": + mv.m1 = var_test + elif var_string == "Human Power Output": + mv.P_t = var_test + else: + logger.warning(f"Unknown sensitivity parameter: {var_string}") + + return hpv, mv, mo, met + + def run_single_sensitivity( + self, sens_row: pd.Series, param_df: pd.DataFrame + ) -> pd.DataFrame: + """ + Run sensitivity analysis for a single parameter with simplified metrics. 
+ """ + var_string = sens_row["Short Name"] + var_units = sens_row["Units"] + + logger.info(f"Running sensitivity analysis for: {var_string}") + + # Create phase space + phase_space = self.create_phase_space(sens_row) + + # Storage for results + results_list = [] + + for var_test in phase_space: + try: + # Initialize fresh model components + mo, mv, met, hpv, mr = self.initialize_model_components(param_df) + + # Apply sensitivity parameter + hpv, mv, mo, met = self.apply_sensitivity_parameter( + var_string, var_test, hpv, mv, mo, met + ) + + # Run model with multiple slopes + velocities = self.run_direct_model_calls(mv, mo, met, hpv) + + # Calculate simplified metrics + metrics = self.calculate_metrics(velocities, mv) + + # Add to results + result_row = { + "Variable": var_test, + "Name": var_string, + "One-way Distance (km)": metrics["one_way_distance_km"], + "Round-trip Distance (km)": metrics["round_trip_distance_km"], + "Average Velocity (m/s)": metrics["avg_velocity_m_s"], + "Trips per Day": metrics["trips_per_day"], + "Time per Trip (hours)": metrics["time_per_trip_hours"], + "Daily Water Collected (L)": metrics["daily_water_collected_L"], + } + results_list.append(result_row) + + except Exception as e: + logger.error( + f"Error in sensitivity calculation for {var_string} = {var_test}: {e}" + ) + # Fill with NaN for failed calculations + result_row = { + "Variable": var_test, + "Name": var_string, + "One-way Distance (km)": np.nan, + "Round-trip Distance (km)": np.nan, + "Average Velocity (m/s)": np.nan, + "Trips per Day": np.nan, + "Time per Trip (hours)": np.nan, + "Daily Water Collected (L)": np.nan, + } + results_list.append(result_row) + + # Create results DataFrame + df_results = pd.DataFrame(results_list) + + # Add adjustment columns for plotting + resolution = len(phase_space) - 3 # Subtract the 3 special values + default_distance = df_results.at[resolution, "One-way Distance (km)"] + df_results["Adjusted Distance"] = ( + df_results["One-way Distance 
(km)"] - default_distance + ) + + # Add data type labels + df_results["DataType"] = "Sensitivity" + df_results["MarkerSize"] = 10 + + # Mark special points + df_results.at[resolution, "DataType"] = "Default" + df_results.at[resolution + 1, "DataType"] = "Minimum Expected" + df_results.at[resolution + 2, "DataType"] = "Maximum Expected" + df_results.at[resolution, "MarkerSize"] = 50 + df_results.at[resolution + 1, "MarkerSize"] = 20 + df_results.at[resolution + 2, "MarkerSize"] = 20 + + return df_results + + def create_phase_space( + self, sens_row: pd.Series, resolution: int = 7 + ) -> np.ndarray: + """Create phase space for sensitivity analysis.""" + plot_min = sens_row["Plotting Min"] + plot_max = sens_row["Plotting Max"] + minval = sens_row["Expected Min"] + maxval = sens_row["Expected Max"] + def_val = sens_row["Default Value"] + + # Create linear space + phase_space = np.linspace( + start=plot_min, stop=plot_max, num=resolution, endpoint=True + ) + + # Add special values for clear visualization + phase_space = np.append(phase_space, [def_val, minval, maxval]) + + return phase_space + + def run_full_sensitivity_analysis(self) -> pd.DataFrame: + """ + Run complete sensitivity analysis for all parameters. 
+ """ + logger.info("Starting full sensitivity analysis with refactored metrics") + + # Load data + param_df, sens_df = self.load_data() + + # Run analysis for each sensitivity parameter + for i, (_, sens_row) in enumerate(sens_df.iterrows()): + var_string = sens_row["Short Name"] + + # Skip old load/practical limit parameters + if var_string in [ + "Load Limit", + "Practical Limit Cycling", + "Practical Limit Walking", + ]: + logger.info(f"Skipping {var_string} (using fixed water capacity)") + continue + + try: + df_results = self.run_single_sensitivity(sens_row, param_df) + + # Store results + self.full_result_dict[var_string] = df_results + self.df_large = pd.concat([self.df_large, df_results], sort=False) + + logger.info( + f"Completed sensitivity analysis for {var_string} ({i+1}/{len(sens_df)})" + ) + + except Exception as e: + logger.error(f"Failed sensitivity analysis for {var_string}: {e}") + continue + + logger.info("Completed full sensitivity analysis") + return self.df_large + + def create_summary_plot( + self, mo, show_default_annotations: bool = False + ) -> go.Figure: + """ + Create summary bar plot showing parameter importance with new metrics. 
+ """ + # Filter and prepare summary data + summary_df = self.df_large[ + (self.df_large.DataType == "Minimum Expected") + | (self.df_large.DataType == "Maximum Expected") + | (self.df_large.DataType == "Default") + ].copy() + + # Rename variables for better display + name_mapping = { + "T_hours": "Time available [hours]", + "Reference Area": "Reference area [m²]", + "Coefficient of Rolling Resistance": "Rolling resistance", + "Water Requirement": "Water requirement [L/day]", + "Human Weight": "Human mass [kg]", + "Human Power Output": "Power output [W]", + "MET budget": "Energy expenditure [MET]", + "HPV Weight": "Vehicle mass [kg]", + "Air Density": "Air density [kg/m³]", + "Drag Coefficient": "Drag coefficient", + "Efficiency": "Efficiency", + } + summary_df["Name"] = summary_df["Name"].replace(name_mapping) + + # Calculate parameter importance based on distance impact + pivot_df = summary_df.pivot( + index="Name", columns="DataType", values="Adjusted Distance" + ) + pivot_df["abs_diff"] = np.abs( + pivot_df["Maximum Expected"] - pivot_df["Minimum Expected"] + ) + pivot_df.reset_index(inplace=True) + + # Sort by importance + pivot_df = pivot_df.sort_values("abs_diff", ascending=False) + + # Merge back + summary_df = pd.merge(pivot_df[["Name", "abs_diff"]], summary_df, on="Name") + + # Filter based on model type + if mo.model_selection == 2: # Cycling + exclude_vars = ["Energy expenditure [MET]"] + else: # Walking + exclude_vars = [ + "Reference area [m²]", + "Power output [W]", + "Efficiency", + "Drag coefficient", + "Rolling resistance", + "Air density [kg/m³]", + ] + + summary_df = summary_df[~summary_df.Name.isin(exclude_vars)] + + # Create plot + filtered_summary_df = summary_df[summary_df["DataType"] != "Default"] + color_discrete_map = { + "Minimum Expected": "#42BFDD", + "Maximum Expected": "#084B83", + "Default": None, + } + + fig = px.bar( + filtered_summary_df, + y="Name", + x="Adjusted Distance", + color="DataType", + 
color_discrete_map=color_discrete_map, + labels={ + "Name": "", + "Adjusted Distance": "Impact on One-way Distance (km)", + "DataType": "Parameter Value", + }, + title=f"{self.model_type} Model Sensitivity Analysis - Impact on Water Access Distance", + ) + + # Add baseline reference line (x=0) + fig.add_vline( + x=0, + line_dash="dash", + line_color="gray", + line_width=2, + annotation_text="Baseline", + annotation_position="top", + ) + + fig.update_layout( + width=1000, + height=600, + font=dict(size=14), + margin=dict(l=250, r=50, t=70, b=50), + ) + + return fig + + def save_results(self, output_dir: Optional[str] = None) -> None: + """Save analysis results to files.""" + try: + if output_dir is None: + project_root = Path(__file__).resolve().parent.parent + output_path = project_root / "results" + else: + output_path = Path(output_dir) + output_path.mkdir(exist_ok=True) + + # Save summary data + summary_file = ( + output_path + / f"sensitivity_analysis_{self.model_type.lower()}_refactored.csv" + ) + self.df_large.to_csv(summary_file, index=False) + + logger.info(f"Results saved to {output_path}") + + except Exception as e: + logger.error(f"Error saving results: {e}") + + +def main(): + """Main execution function for refactored sensitivity analysis.""" + import argparse + + # Parse command line arguments + parser = argparse.ArgumentParser( + description="Refactored Water Access Sensitivity Analysis" + ) + parser.add_argument( + "--model", + "-m", + type=str, + choices=["Lankford", "Martin"], + default="Lankford", + help="Model type to analyze", + ) + parser.add_argument( + "--show-defaults", + action="store_true", + help="Show default parameter value annotations on plot", + ) + args = parser.parse_args() + + model_type = args.model + logger.info("Starting Refactored Sensitivity Analysis") + logger.info(f"Using model: {model_type}") + logger.info("Key changes: Fixed water capacity, simplified metrics") + + try: + # Initialize analyzer + analyzer = 
RefactoredSensitivityAnalyzer(model_type=model_type) + + # Run full analysis + df_results = analyzer.run_full_sensitivity_analysis() + + # Create summary plot + project_root = Path(__file__).resolve().parent.parent + param_df = pd.read_csv( + project_root / "data/lookup tables/mobility-model-parameters.csv" + ) + mo, _, _, _, _ = analyzer.initialize_model_components( + param_df.loc[lambda x: x["Name"] == analyzer.filter_value] + ) + summary_fig = analyzer.create_summary_plot( + mo, show_default_annotations=args.show_defaults + ) + + # Display summary plot + summary_fig.show() + + # Save results + analyzer.save_results() + + # Print summary statistics + print("\n=== SUMMARY STATISTICS ===") + print(f"Model: {model_type}") + print(f"Fixed water capacity: {analyzer.fixed_water_capacity}L") + + # Get default values + default_results = df_results[df_results["DataType"] == "Default"] + if not default_results.empty: + default_row = default_results.iloc[0] + print(f"\nDefault scenario results:") + print(f" One-way distance: {default_row['One-way Distance (km)']:.2f} km") + print( + f" Round-trip distance: {default_row['Round-trip Distance (km)']:.2f} km" + ) + print( + f" Average velocity: {default_row['Average Velocity (m/s)']:.2f} m/s" + ) + print(f" Time per trip: {default_row['Time per Trip (hours)']:.2f} hours") + + logger.info("Refactored sensitivity analysis completed successfully") + + except Exception as e: + logger.error(f"Sensitivity analysis failed: {e}") + raise + + +if __name__ == "__main__": + main() diff --git a/src/gis_global_module.py b/src/gis_global_module.py index a29d80d..e0e8b04 100644 --- a/src/gis_global_module.py +++ b/src/gis_global_module.py @@ -1,17 +1,19 @@ +import pathlib import pandas as pd import numpy as np +import matplotlib.pyplot as plt import plotly.express as px # import weightedstats as ws import sys from pathlib import Path +import pdb import os import warnings # ## Import Data from CSVs. 
-# CSVs created in previous script, which did the cycling mobility on a per -# country basis +# CSVs created in previous script, which did the cycling mobility on a per country basis # # ### GIS Data From QGIS Export # https://ghsl.jrc.ec.europa.eu/download.php?ds=smod <- urbanisation @@ -21,11 +23,11 @@ # https://www.globio.info/download-grip-dataset <- roads -########################################################################## +############################################################################################################################## # # PRE-PROCESSING # -########################################################################## +############################################################################################################################## # Assuming the script is in the src directory, set the script_dir accordingly script_dir = Path(__file__).resolve().parent @@ -60,23 +62,33 @@ def weighted_mean(var, wts): def weighted_median_series(val, weight): - """Calculates the weighted median ArithmeticError If the sum of the weights - is zero, or if the weights are not positive.""" + """Calculates the weighted median + ArithmeticError + If the sum of the weights is zero, or if the weights are not positive. 
+ """ try: + if len(val) == 0 or len(weight) == 0: + return np.nan + if np.sum(weight) == 0: + return np.nan + df = pd.DataFrame({"val": val, "weight": weight}) df_sorted = df.sort_values("val") cumsum = df_sorted["weight"].cumsum() cutoff = df_sorted["weight"].sum() / 2.0 result = df_sorted[cumsum >= cutoff]["val"].iloc[0] # return just the value - except BaseException: + except (ValueError, IndexError, KeyError) as e: + print(f"Warning: weighted_median_series failed with error: {e}") result = np.nan return result def weighted_median(df, val_column, weight_column): - """Calculates the weighted median ArithmeticError If the sum of the weights - is zero, or if the weights are not positive.""" + """Calculates the weighted median + ArithmeticError + If the sum of the weights is zero, or if the weights are not positive. + """ df_sorted = df.sort_values(val_column) cumsum = df_sorted[weight_column].cumsum() cutoff = df_sorted[weight_column].sum() / 2.0 @@ -85,7 +97,6 @@ def weighted_median(df, val_column, weight_column): def run_weighted_median_on_grouped_df(df, groupby_column, value_column, weight_column): """Calculate the weighted median of a dataframe grouped by a column. - Args: df (pandas.DataFrame): DataFrame to calculate weighted median on. groupby_column (str): Column to group by. @@ -114,7 +125,8 @@ def run_weighted_median_on_grouped_df(df, groupby_column, value_column, weight_c def load_data(urb_data_file, country_data_file): - """Load data from CSV files. + """ + Load data from CSV files. Parameters: - urb_data_file (str): The file path of the urban data CSV file. @@ -127,43 +139,47 @@ def load_data(urb_data_file, country_data_file): try: df_zones_input = pd.read_csv(urb_data_file) df_input = pd.read_csv(country_data_file) - except BaseException: - df_zones_input = pd.read_csv("." + urb_data_file) - df_input = pd.read_csv("." 
+ country_data_file) + except (FileNotFoundError, pd.errors.EmptyDataError) as e: + # Try relative path as fallback + try: + df_zones_input = pd.read_csv(Path(".") / urb_data_file) + df_input = pd.read_csv(Path(".") / country_data_file) + except Exception as fallback_e: + raise FileNotFoundError( + f"Could not load data files: {e}, fallback error: {fallback_e}" + ) return df_zones_input, df_input # Function for managing urban/rural data def manage_urban_rural(df_zones_input): - """Converts the 'dtw_1' column from meters to kilometers and creates a new - binary column 'urban_rural' based on the 'GHS_SMOD' column. Rural values - are below 15, while urban values are above 15. + """ + Converts the 'dtw_1' column from meters to kilometers and creates a new binary column + 'urban_rural' based on the 'GHS_SMOD' column. Rural values are below 15, while urban values + are above 15. Args: - df_zones_input (pandas.DataFrame): Input DataFrame containing the - 'dtw_1' and 'GHS_SMOD' columns. + df_zones_input (pandas.DataFrame): Input DataFrame containing the 'dtw_1' and 'GHS_SMOD' columns. Returns: - pandas.DataFrame: DataFrame with the 'dtw_1' column converted to - kilometers and a new 'urban_rural' column indicating whether a zone - is urban (1) or rural (0). + pandas.DataFrame: DataFrame with the 'dtw_1' column converted to kilometers and a new 'urban_rural' + column indicating whether a zone is urban (1) or rural (0). """ # Convert dtw_1 from meters to kilometers df_zones_input["dtw_1"] /= 1000 # Manage Urban / Rural Data # Use the GHS_SMOD_E2020_GLOBE_R2023A_54009_1000_V1_0 dataset. # from here: https://ghsl.jrc.ec.europa.eu/download.php?ds=smod - # create new binary column for urban / rural. Rural is below 15, Urban - # above 15 + # create new binary column for urban / rural. 
Rural is below 15, Urban above 15 df_zones_input["urban_rural"] = np.where(df_zones_input["GHS_SMOD"] > 15, 1, 0) return df_zones_input def adjust_euclidean(df_zones_input, urban_adjustment, rural_adjustment): - """Adjusts distance to water to account for people having to travel on - roads and paths instead of in straight lines (Euclidean distance), with - different adjustments for urban and rural areas. + """ + Adjusts distance to water to account for people having to travel on roads and paths instead of + in straight lines (Euclidean distance), with different adjustments for urban and rural areas. Parameters: - df_zones_input (pd.DataFrame): DataFrame of GIS zones data @@ -187,15 +203,14 @@ def adjust_euclidean(df_zones_input, urban_adjustment, rural_adjustment): # Function for managing slope def manage_slope(df_zones_input): - """Perform slope management on the input DataFrame. + """ + Perform slope management on the input DataFrame. Args: - df_zones_input (pandas.DataFrame): The input DataFrame containing the - slope data. + df_zones_input (pandas.DataFrame): The input DataFrame containing the slope data. Returns: - pandas.DataFrame: The modified DataFrame after performing slope - management. + pandas.DataFrame: The modified DataFrame after performing slope management. """ df_zones_input["slope_1"].hist(bins=100, log=False) df_zones_input["slope_1"].quantile(np.arange(0, 1, 0.05)) @@ -204,13 +219,12 @@ def manage_slope(df_zones_input): # Function for merging dataframes and adjusting populations def merge_and_adjust_population(df_zones_input, df_input): - """Merge and adjust population data based on zone and country information. + """ + Merge and adjust population data based on zone and country information. Args: - df_zones_input (DataFrame): Input DataFrame containing zone - information. - df_input (DataFrame): Input DataFrame containing country - information. + df_zones_input (DataFrame): Input DataFrame containing zone information. 
+ df_input (DataFrame): Input DataFrame containing country information. Returns: DataFrame: DataFrame with merged and adjusted population data. @@ -220,20 +234,17 @@ def merge_and_adjust_population(df_zones_input, df_input): """ assert not df_zones_input.empty, "df_zones_input is empty" assert not df_input.empty, "df_input is empty" - # this analysis loses some data as the overlap between the rasters is not - # perfect. To reduce this error, use the 30 arc second data. Too much - # heavy lifting for my computer to do this at the moment. - # merge df_input and df_zones on ISO_CC. This assigns all the country - # data to each zone. + # this analysis loses some data as the overlap between the rasters is not perfect. To reduce this error, use the 30 arc second data. Too much heavy lifting for my computer to do this at the moment. + # merge df_input and df_zones on ISO_CC. This assigns all the country data to each zone. # join inner will remove some of the data that is not in both datasets df_zones = df_zones_input.merge( df_input, left_on="ISOCODE", right_on="alpha3", how="inner" ) - # convert population density to percent of national population on a per - # country basis, grouped by ISO_CC + # convert population density to percent of national population on a per country basis, grouped by ISO_CC + # Prevent division by zero when country population sum is zero df_zones["pop_density_perc"] = df_zones.groupby("ISOCODE")["pop_density"].apply( - lambda x: x / x.sum() + lambda x: x / x.sum() if x.sum() > 0 else 0 ) # multiply population density by population on a per country basis df_zones["pop_zone"] = df_zones["pop_density_perc"] * df_zones["Population"] @@ -259,8 +270,7 @@ def merge_and_adjust_population(df_zones_input, df_input): # df_zones (pandas.DataFrame): The DataFrame containing the zones data. # Returns: -# pandas.DataFrame: The DataFrame with additional columns for road -# analysis. +# pandas.DataFrame: The DataFrame with additional columns for road analysis. 
# """ # # Define the columns to work with for GRIP data @@ -274,8 +284,7 @@ def merge_and_adjust_population(df_zones_input, df_input): # # Handle rows with all zeros # all_zeros = np.all(data == 0, axis=1) -# non_zero_indices[all_zeros] = -1 # Set a distinct value for rows with -# all zeros +# non_zero_indices[all_zeros] = -1 # Set a distinct value for rows with all zeros # # Map indices to road types # road_types = np.array( @@ -309,9 +318,9 @@ def merge_and_adjust_population(df_zones_input, df_input): def crr_add_uncertainty(road_type, adjustment): - """The road type is adjusted by the given integer amount, unless the - change would go out of bounds. In these cases, the adjustment stops at - the best or worst road type. + """ + The road type is adjusted by the given integer amount, unless the change would go out of bounds. + In these cases, the adjustment stops at the best or worst road type. Parameters: road_type (str): The current road type. @@ -347,17 +356,15 @@ def crr_add_uncertainty(road_type, adjustment): def road_analysis(df_zones, crr_adjustment=0): - """Maps CRR values to road types in df_zones. Adjusts the CRR up or down - to add uncertainty. + """ + Maps CRR values to road types in df_zones. Adjusts the CRR up or down to add uncertainty. Args: df_zones (pandas.DataFrame): The DataFrame containing zone data. - crr_adjustment (int, optional): The adjustment value for Crr - mapping. Defaults to 0. + crr_adjustment (int, optional): The adjustment value for Crr mapping. Defaults to 0. Returns: - pandas.DataFrame: The DataFrame with additional columns for dominant - road type and Crr values. + pandas.DataFrame: The DataFrame with additional columns for dominant road type and Crr values. 
""" # Define the columns to work with for GRIP data @@ -371,8 +378,7 @@ def road_analysis(df_zones, crr_adjustment=0): # Handle rows with all zeros all_zeros = np.all(data == 0, axis=1) - # Set a distinct value for rows with all zeros - non_zero_indices[all_zeros] = -1 + non_zero_indices[all_zeros] = -1 # Set a distinct value for rows with all zeros # Map indices to road types road_types = np.array( @@ -394,8 +400,7 @@ def road_analysis(df_zones, crr_adjustment=0): # Load the Crr mapping table df_crr = pd.read_csv(CRR_FILE) - # Add uncertainty (make road type 1 better or 1 worse, unless road type is - # best (highway) or worst (no roads)) + # Add uncertainty (make road type 1 better or 1 worse, unless road type is best (highway) or worst (no roads)) df_crr["Crr"] = df_crr["Crr"].shift(-crr_adjustment) df_crr["Crr"] = df_crr["Crr"].ffill().bfill() @@ -412,8 +417,10 @@ def road_analysis(df_zones, crr_adjustment=0): def calculate_nat_piped(df_zones): - """Calculates National piped using the formula: - [ URBANPiped ×'% urban' ÷100 + (100−'% urban' )÷100×RURALPiped ] + """ + Calculates National piped using the formula + [ URBANPiped ב% urban’ ÷100+(100−‘% urban’ )÷100×RURALPiped ] + """ df_zones["NATPiped"] = ( df_zones["URBANPiped"] * df_zones["% urban"] / 100 @@ -426,22 +433,22 @@ def calculate_nat_piped(df_zones): def preprocess_data( crr_adjustment, urban_adjustment, rural_adjustment, use_sample_data=False ): - """Preprocesses data by loading, managing, merging, and adjusting - population data. + """ + Preprocesses data by loading, managing, merging, and adjusting population data. Args: crr_adjustment (float): The adjustment factor for road analysis. Returns: pandas.DataFrame: The preprocessed data. + """ if use_sample_data: urban_data_file = URB_DATA_FILE_SAMPLE warnings.warn( - "Using sample data. This should only be done for testing, and " - "not for generating real model results!" + "Using sample data. 
This should only be done for testing, and not for generating real model results!" ) else: urban_data_file = URB_DATA_FILE @@ -460,41 +467,37 @@ def preprocess_data( return df_zones -########################################################################## +############################################################################################################################## # # ANALYSIS I: Models # -########################################################################## +############################################################################################################################## def load_hpv_parameters(file_path_params, hpv_name): - """Load HPV parameters from a CSV file and return the subset of - parameters for a specific HPV name. + """ + Load HPV parameters from a CSV file and return the subset of parameters for a specific HPV name. Parameters: - - file_path_params (str): The file path to the CSV file containing the - HPV parameters. - - hpv_name (str): The name of the HPV for which to retrieve the - parameters. + - file_path_params (str): The file path to the CSV file containing the HPV parameters. + - hpv_name (str): The name of the HPV for which to retrieve the parameters. Returns: - - pandas.DataFrame: A DataFrame containing the subset of parameters for - the specified HPV name. + - pandas.DataFrame: A DataFrame containing the subset of parameters for the specified HPV name. """ allHPV_param_df = pd.read_csv(file_path_params) return allHPV_param_df[allHPV_param_df["Name"] == hpv_name] def extract_slope_crr(df_zones): - """Extracts slope and Crr values from the given DataFrame. + """ + Extracts slope and Crr values from the given DataFrame. Args: - df_zones (pandas.DataFrame): The DataFrame containing slope and Crr - values. + df_zones (pandas.DataFrame): The DataFrame containing slope and Crr values. Returns: - tuple: A tuple containing two pandas.Series objects - slope_zones and - Crr_values. 
+ tuple: A tuple containing two pandas.Series objects - slope_zones and Crr_values. slope_zones: The series containing slope values. Crr_values: The series containing Crr values. """ @@ -509,7 +512,8 @@ def extract_slope_crr(df_zones): def run_bicycle_model( mv, mo, hpv, slope_zones, Crr_values, country_average_weights, load_attempt ): - """Runs a bicycle model for different slope zones and Crr values. + """ + Runs a bicycle model for different slope zones and Crr values. Args: mv: Model variables. @@ -536,8 +540,7 @@ def run_bicycle_model( hpv.Crr = crr mv.m1 = country_average_weight results[i] = mm.mobility_models.single_bike_run( - # need to modify single_bike_run function to take - # country_average_weight instead of m1 + # need to modify single_bike_run function to take country_average_weight instead of m1 # (done by updated mv to take country_average_weight as m1) mv, mo, @@ -556,22 +559,20 @@ def run_bicycle_model( def process_and_save_results( df_zones, results, export_file_location, velocity_type, save_csv=False ): - """Process the results and optionally save them to a DataFrame. - The results should only be saved for single runs, as saving all monte carlo - csvs will be too large. + """ + Process the results and optionally save them to a DataFrame. + The results should only be saved for single runs, as saving all monte carlo csvs will be too large. Args: df_zones (pandas.DataFrame): The DataFrame containing the zones data. - results (numpy.ndarray): The results array containing the velocity - vectors. - export_file_location (str): The file location where the output CSV - will be saved. + results (numpy.ndarray): The results array containing the velocity vectors. + export_file_location (str): The file location where the output CSV will be saved. velocity_type (str): The type of velocity (e.g., walk/bicycle). - save_csv (bool, optional): Whether to save the results as a CSV file. - Defaults to False. 
+ save_csv (bool, optional): Whether to save the results as a CSV file. Defaults to False. Returns: pandas.DataFrame: The updated DataFrame with the new columns. + """ # Unpack results loaded_velocity_vec, unloaded_velocity_vec, max_load_vec = results.T @@ -611,11 +612,10 @@ def process_and_save_results( return df_zones -# function to map hill polarity strings to integers for lhillpo and -# ulhillpo, using a dictionary for the mapping +# function to map hill polarity strings to integers for lhillpo and ulhillpo, using a dictionary for the mapping def map_hill_polarity(hill_polarity): - """Maps the hill polarity string to integers for the unloaded trip - (ulhillpo) and the loaded trip back (lhillpo). + """ + Maps the hill polarity string to integers for the unloaded trip (ulhillpo) and the loaded trip back (lhillpo). Args: hill_polarity (str): The hill polarity string. @@ -646,16 +646,14 @@ def calculate_and_merge_bicycle_distance( human_mass=62, hill_polarity="flat_uphill", ): - """Calculates and merges bicycle distance for each zone in the given - dataframe. + """ + Calculates and merges bicycle distance for each zone in the given dataframe. Args: df_zones (pandas.DataFrame): The dataframe containing zone information. - calculate_distance (bool): Flag indicating whether to calculate the - distance or not. + calculate_distance (bool): Flag indicating whether to calculate the distance or not. export_file_location (str): The file location to export the results. - practical_limit_bicycle (int, optional): The practical limit for - bicycle distance in kg. Defaults to 40. + practical_limit_bicycle (int, optional): The practical limit for bicycle distance in kg. Defaults to 40. Returns: pandas.DataFrame: The dataframe with bicycle distance merged. @@ -713,7 +711,8 @@ def calculate_and_merge_bicycle_distance( def run_walking_model( mv, mo, met, hpv, slope_zones, country_average_weights, load_attempt ): - """Run the walking model for multiple slope zones. 
+ """ + Run the walking model for multiple slope zones. Args: mv: Model variables. @@ -726,6 +725,7 @@ def run_walking_model( Returns: numpy.ndarray: An array of results for each slope zone. + """ # Adjust the project root and import mobility module as needed project_root = Path().resolve().parent @@ -761,20 +761,18 @@ def calculate_and_merge_walking_distance( human_mass=62, hill_polarity="flat_uphill", ): - """Calculate and merge walking distance for zones. + """ + Calculate and merge walking distance for zones. Args: df_zones (pandas.DataFrame): The input DataFrame containing zone data. - calculate_distance (bool): Flag indicating whether to calculate the - walking distance. + calculate_distance (bool): Flag indicating whether to calculate the walking distance. export_file_location (str): The file location to export the results. - practical_limit_buckets (int, optional): The practical limit buckets. - Defaults to 20. + practical_limit_buckets (int, optional): The practical limit buckets. Defaults to 20. met (float, optional): The MET value. Defaults to 3.3. Returns: - pandas.DataFrame: The updated DataFrame with walking distance - information. + pandas.DataFrame: The updated DataFrame with walking distance information. 
""" if calculate_distance: # Adjust the project root and import mobility module as needed @@ -800,13 +798,7 @@ def calculate_and_merge_walking_distance( # Run model for each zone with country-specific weights slope_zones, Crr_values, country_average_weights = extract_slope_crr(df_zones) results = run_walking_model( - mv, - mo, - met, - hpv, - slope_zones, - country_average_weights, - load_attempt=15, + mv, mo, met, hpv, slope_zones, country_average_weights, load_attempt=15 ) process_and_save_results(df_zones, results, export_file_location, "walk") else: @@ -822,35 +814,34 @@ def calculate_and_merge_walking_distance( return df_zones -########################################################################## +############################################################################################################################## # # ANALYSIS II: Population Water Access # -########################################################################## +############################################################################################################################## def calculate_max_distances(df_zones, time_gathering_water): - """Calculate the maximum distances achievable for gathering water in each - zone. + """ + Calculate the maximum distances achievable for gathering water in each zone. Args: df_zones (pandas.DataFrame): DataFrame containing zone information. time_gathering_water (float): Time taken to gather water in hours. Returns: - pandas.DataFrame: DataFrame with additional columns for max distances - and water ration. + pandas.DataFrame: DataFrame with additional columns for max distances and water ration. 
+ """ - # Max distance achievable (not round trip, just the distance from home to - # water source) + # Max distance achievable (not round trip, just the distance from home to water source) + # Convert velocity from m/s to km/h: multiply by 3600/1000 df_zones["max distance cycling"] = ( - df_zones["average_velocity_bicycle"] * time_gathering_water / 2 + df_zones["average_velocity_bicycle"] * time_gathering_water * 3600 / 2 / 1000 ) df_zones["max distance walking"] = ( - df_zones["average_velocity_walk"] * time_gathering_water / 2 + df_zones["average_velocity_walk"] * time_gathering_water * 3600 / 2 / 1000 ) - # Use water_ration_kms to calculate the water ration achievable per bike - # per zone + # Use water_ration_kms to calculate the water ration achievable per bike per zone df_zones["water_ration_kms"] = ( df_zones["max distance cycling"] * df_zones["max_load_bicycle"] ) @@ -858,21 +849,20 @@ def calculate_max_distances(df_zones, time_gathering_water): def calculate_population_water_access(df_zones): - """Calculates the population with and without access to water for each - zone. + """ + Calculates the population with and without access to water for each zone. Args: df_zones (pandas.DataFrame): DataFrame containing zone information. Returns: - pandas.DataFrame: DataFrame with additional columns representing the - population with and without access to water for each zone. + pandas.DataFrame: DataFrame with additional columns representing the population + with and without access to water for each zone. 
""" # Set df_zones["zone_pop_piped"] to 0 for all zones to begin with df_zones["zone_pop_piped"] = 0 - # If urban use urban piped and unpiped, if rural use rural piped and - # unpiped + # If urban use urban piped and unpiped, if rural use rural piped and unpiped # Use the urban_rural column to do this df_zones["zone_pop_piped"] = ( df_zones["pop_zone"] * df_zones["urban_rural"] * df_zones["URBANPiped"] / 100 @@ -906,21 +896,16 @@ def calculate_population_water_access(df_zones): ) df_zones["fraction_of_zone_with_walking_access"] = df_zones["zone_walking_okay"] * 1 - # Calculate the population that can access piped water by cycling and - # walking + # Calculate the population that can access piped water by cycling and walking df_zones["population_piped_with_cycling_access"] = ( df_zones["fraction_of_zone_with_cycling_access"] * df_zones["zone_pop_piped"] ) df_zones["population_piped_with_walking_access"] = ( df_zones["fraction_of_zone_with_walking_access"] * df_zones["zone_pop_piped"] ) - # Select the maximum between the two, if walkable, max will always be - # walking + # Select the maximum between the two, if walkable, max will always be walking df_zones["population_piped_with_access"] = df_zones[ - [ - "population_piped_with_cycling_access", - "population_piped_with_walking_access", - ] + ["population_piped_with_cycling_access", "population_piped_with_walking_access"] ].max(axis=1) # Calculate the population that can access water only by cycling @@ -940,30 +925,30 @@ def calculate_population_water_access(df_zones): def calculate_water_rations(df_zones): - """Calculate the water rations achievable utilizing all the bikes in each - zone. + """ + Calculate the water rations achievable utilizing all the bikes in each zone. Parameters: - df_zones (pandas.DataFrame): A DataFrame containing the zone data. Returns: - - df_zones (pandas.DataFrame): The input DataFrame with additional - columns for water rations. 
- - The function calculates the water rations achievable by dividing the - water ration distance (water_ration_kms) by the distance to water - source (dtw_1) for each zone. It then calculates the number of bikes - in each zone by dividing the population of the zone by the average - household size and multiplying it by the PBO (Percent Bike Ownership) - factor. Finally, it calculates the total water rations achievable in - each zone by multiplying the number of bikes in the zone by the water - rations per bike. + - df_zones (pandas.DataFrame): The input DataFrame with additional columns for water rations. + + The function calculates the water rations achievable by dividing the water ration distance (water_ration_kms) + by the distance to water source (dtw_1) for each zone. It then calculates the number of bikes in each zone + by dividing the population of the zone by the average household size and multiplying it by the PBO (Percent Bike Ownership) factor. + Finally, it calculates the total water rations achievable in each zone by multiplying the number of bikes in the zone + by the water rations per bike. 
Example usage: + >>> df = calculate_water_rations(df_zones) """ - df_zones["water_rations_per_bike"] = ( - df_zones["water_ration_kms"] / df_zones["dtw_1"] + # Prevent division by zero when distance to water is zero + df_zones["water_rations_per_bike"] = np.where( + df_zones["dtw_1"] > 0, + df_zones["water_ration_kms"] / df_zones["dtw_1"], + np.inf, # Infinite water access when distance is zero ) df_zones["bikes_in_zone"] = ( df_zones["pop_zone"] @@ -984,22 +969,22 @@ def process_zones_for_water_access(df_zones, time_gathering_water=16): return df_zones -########################################################################## +############################################################################################################################## # # ANALYSIS III: Aggregating Country Data # -########################################################################## +############################################################################################################################## def aggregate_country_level_data(df_zones): - """Aggregate zone data into country level summaries. + """ + Aggregate zone data into country level summaries. Parameters: df_zones (DataFrame): The input DataFrame containing zone-level data. Returns: - df_countries (DataFrame): The aggregated DataFrame with country-level - summaries. + df_countries (DataFrame): The aggregated DataFrame with country-level summaries. """ df_countries = ( df_zones.groupby("ISOCODE") @@ -1041,8 +1026,10 @@ def aggregate_country_level_data(df_zones): def weighted_percentile(df, val_column, weight_column, percentile): - """Calculates the weighted percentile ArithmeticError If the sum of the - weights is zero, or if the weights are not positive.""" + """Calculates the weighted percentile + ArithmeticError + If the sum of the weights is zero, or if the weights are not positive. 
+ """ df_sorted = df.sort_values(val_column) cumsum = df_sorted[weight_column].cumsum() cutoff = df_sorted[weight_column].sum() * percentile / 100.0 @@ -1050,7 +1037,9 @@ def weighted_percentile(df, val_column, weight_column, percentile): def calculate_weighted_results(df_zones): - """Calculate the weighted median for each country group.""" + """ + Calculate the weighted median for each country group. + """ # return error if the input DataFrame is empty if df_zones.empty: raise ValueError("Input DataFrame is empty") @@ -1089,7 +1078,8 @@ def calculate_weighted_results(df_zones): def aggregate_global(df_zones): - """Get global data to be added to the country-level data. + """ + Get global data to be added to the country-level data. Parameters: df_zones (DataFrame): The input DataFrame containing zone-level data. @@ -1139,34 +1129,23 @@ def aggregate_global(df_zones): # Create a new DataFrame with the global row df_global = pd.DataFrame( - [ - { - "Entity": "Global", - "ISOCODE": "GLOBAL", - **sum_data, - **weighted_data, - } - ] + [{"Entity": "Global", "ISOCODE": "GLOBAL", **sum_data, **weighted_data}] ) return df_global def clean_up_data(df_countries): - """Clean up data from spurious country values. + """ + Clean up data from spurious country values. Parameters: - - df_countries (pandas.DataFrame): The input dataframe containing - country data. + - df_countries (pandas.DataFrame): The input dataframe containing country data. Returns: - - df_countries (pandas.DataFrame): The cleaned dataframe with spurious - country values removed. - - countries_further_than_libya (pandas.DataFrame): The dataframe - containing countries with distances greater than the maximum distance - to water. - - list_of_countries_to_remove (list): The list of specific countries - manually removed from the dataframe. + - df_countries (pandas.DataFrame): The cleaned dataframe with spurious country values removed. 
+ - countries_further_than_libya (pandas.DataFrame): The dataframe containing countries with distances greater than the maximum distance to water. + - list_of_countries_to_remove (list): The list of specific countries manually removed from the dataframe. """ # df_countries = df_countries.dropna() # Remove any NaN rows @@ -1184,8 +1163,7 @@ def clean_up_data(df_countries): else: countries_further_than_libya = pd.DataFrame() print( - "Warning: No data for Libya (ISOCODE 'LBY'). Skipping outlier " - "removal based on Libya's weighted_med." + "Warning: No data for Libya (ISOCODE 'LBY'). Skipping outlier removal based on Libya's weighted_med." ) # Manually remove specific countries @@ -1203,43 +1181,32 @@ def clean_up_data(df_countries): ~df_countries["ISOCODE"].isin(list_of_countries_to_remove) ] - # Return the cleaned dataframe and lists of removed countries for logging - # or review - return ( - df_countries, - countries_further_than_libya, - list_of_countries_to_remove, - ) + # Return the cleaned dataframe and lists of removed countries for logging or review + return df_countries, countries_further_than_libya, list_of_countries_to_remove def process_country_data(df_zones): - """Orchestrate the processing of zone data into country-level summaries - and cleanup. + """ + Orchestrate the processing of zone data into country-level summaries and cleanup. Args: - df_zones (pandas.DataFrame): The input dataframe containing - zone-level data. + df_zones (pandas.DataFrame): The input dataframe containing zone-level data. Returns: - pandas.DataFrame: The processed dataframe containing country-level - summaries. + pandas.DataFrame: The processed dataframe containing country-level summaries. 
""" assert not df_zones.empty, "Input dataframe is empty" warnings.warn("Input dataframe contains NaN values") df_countries = aggregate_country_level_data(df_zones) assert not df_countries.empty, "Country-level dataframe is empty" - # TODO check this out (assert raises error, but probably not an issue as - # some NaNs are expected?) - # assert not df_countries.isnull().values.any(), "Country-level dataframe - # contains NaN values" + # TODO check this out (assert raises error, but probably not an issue as some NaNs are expected?) + # assert not df_countries.isnull().values.any(), "Country-level dataframe contains NaN values" df_median_group = calculate_weighted_results(df_zones) assert not df_median_group.empty, "Weighted median dataframe is empty" - # TODO check this out (assert raises error, but probably not an issue as - # some NaNs are expected?) - # assert not df_median_group.isnull().values.any(), "Weighted median - # dataframe contains NaN values" + # TODO check this out (assert raises error, but probably not an issue as some NaNs are expected?) + # assert not df_median_group.isnull().values.any(), "Weighted median dataframe contains NaN values" df_countries = df_countries.merge( df_median_group, on="ISOCODE" @@ -1266,8 +1233,9 @@ def process_country_data(df_zones): # Check if 'country_pop_with_water' exists in df_countries if "country_pop_with_water" not in df_countries.columns: raise KeyError( - "Column 'country_pop_with_water' does not exist in df_countries. " - "Available columns: {}".format(df_countries.columns) + "Column 'country_pop_with_water' does not exist in df_countries. 
Available columns: {}".format( + df_countries.columns + ) ) df_countries["percent_with_water"] = ( @@ -1311,8 +1279,7 @@ def process_country_data(df_zones): # Log or print summary of removed countries if not removed_further_than_libya.empty: print( - "Countries removed from analysis due to being further than Libya's" - " median:", # noqa + "Countries removed from analysis due to being further than Libya's median:", removed_further_than_libya["Entity"].tolist(), ) print("Countries removed manually:", removed_countries_list) @@ -1320,22 +1287,22 @@ def process_country_data(df_zones): return df_countries -########################################################################## +############################################################################################################################## # # ANALYSIS IV: Aggregating District Data # -########################################################################## +############################################################################################################################## def aggregate_district_level_data(df_zones): - """Aggregate zone data into district level summaries. + """ + Aggregate zone data into district level summaries. Parameters: df_zones (DataFrame): The input DataFrame containing zone-level data. Returns: - df_countries (DataFrame): The aggregated DataFrame with district-level - summaries. + df_countries (DataFrame): The aggregated DataFrame with district-level summaries. """ df_zones["district_pop_raw"] = df_zones.groupby("shapeID")["pop_zone"].transform( @@ -1385,26 +1352,21 @@ def aggregate_district_level_data(df_zones): def process_district_data(df_zones): - """Orchestrate the processing of zone data into district-level summaries - and cleanup. + """ + Orchestrate the processing of zone data into district-level summaries and cleanup. Args: - df_zones (pandas.DataFrame): The input dataframe containing - zone-level data. 
+ df_zones (pandas.DataFrame): The input dataframe containing zone-level data. Returns: - pandas.DataFrame: The processed dataframe containing district-level - summaries. + pandas.DataFrame: The processed dataframe containing district-level summaries. """ assert not df_zones.empty, "Input dataframe is empty" df_districts = aggregate_district_level_data(df_zones) - # use groupby to create weighted median, needs to be separate from the - # above groupby as it uses apply, which can't be used in the same - # groupby - # needs to use apply because the function required two columns as - # input + # use groupby to create weighted median, needs to be separate from the above groupby as it uses apply, which can't be used in the same groupby + # needs to use apply because the function required two columns as input df_median_group = df_zones.groupby(["shapeID"]).apply( lambda x: pd.Series({"weighted_med": weighted_median(x, "dtw_1", "pop_zone")}) ) @@ -1476,11 +1438,11 @@ def process_district_data(df_zones): return df_districts -########################################################################## +############################################################################################################################## # # VISUALISATIONS # -########################################################################## +############################################################################################################################## def plot_chloropleth(df_countries): @@ -1520,11 +1482,11 @@ def plot_chloropleth(df_countries): choro.show() -########################################################################## +############################################################################################################################## # # MAIN Function # -########################################################################## +##############################################################################################################################
def run_global_analysis( @@ -1542,28 +1504,20 @@ def run_global_analysis( human_mass=62, # gets overridden by country specific weight use_sample_data=False, # only change to test functionality ): - """Runs one run of the global analysis for water access. + """ + Runs one run of the global analysis for water access. Args: - crr_adjustment (float): The adjustment factor for calculating the - Coefficient of Rolling Resistance (CRR). - time_gathering_water (float): The time taken to gather water in - minutes. - practical_limit_bicycle (float): The practical limit for distance - traveled by bicycle in kilometers. - practical_limit_buckets (float): The practical limit for distance - traveled by carrying buckets in kilometers. + crr_adjustment (float): The adjustment factor for calculating the Coefficient of Rolling Resistance (CRR). + time_gathering_water (float): The time taken to gather water in minutes. + practical_limit_bicycle (float): The practical limit for distance traveled by bicycle in kilometers. + practical_limit_buckets (float): The practical limit for distance traveled by carrying buckets in kilometers. met (str): metabolic equivalent of task (energy for walking). watts (float): The power output in watts (energy for cycling). - calculate_distance (bool, optional): Whether to calculate distance - or not. Defaults to True, otherwise new results are not - generated. - human_mass (float): The mass of an average human carrying water in - kilograms. - plot (bool, optional): Whether to plot the chloropleth map or not. - Defaults to False. - use_sample_data (bool, optional): Whether to use sample data or not - (only for testing). Defaults to False. + calculate_distance (bool, optional): Whether to calculate distance or not. Defaults to True, otherwise new results are not generated. + human_mass (float): The mass of an average human carrying water in kilograms. + plot (bool, optional): Whether to plot the chloropleth map or not. Defaults to False. 
+ use_sample_data (bool, optional): Whether to use sample data or not (only for testing). Defaults to False. Returns: pandas.DataFrame: The processed data for each country. @@ -1619,6 +1573,7 @@ def run_global_analysis( if __name__ == "__main__": + df_countries, df_districts, zones_results = run_global_analysis( crr_adjustment=0, time_gathering_water=5.5,