diff --git a/tests/test_dict_flattening.py b/tests/test_dict_flattening.py index cd4776d..9bad7bf 100644 --- a/tests/test_dict_flattening.py +++ b/tests/test_dict_flattening.py @@ -18,9 +18,6 @@ from fz.io import flatten_dict_recursive, flatten_dict_columns -# Skip all tests if pandas is not available (dict flattening requires pandas) - - class TestFlattenDictRecursive: """Test the flatten_dict_recursive helper function""" @@ -211,69 +208,81 @@ class TestFzoWithDictFlattening: def test_fzo_with_dict_output(self): """Test fzo automatically flattens dict outputs""" with tempfile.TemporaryDirectory() as tmpdir: - # Create result directory with dict output - result_dir = Path(tmpdir) / "results" / "x=5,y=10" - result_dir.mkdir(parents=True) - - # Write output file with JSON dict - with open(result_dir / "output.txt", "w") as f: - f.write("sum=15\n") - f.write('stats={"min": 5, "max": 10, "diff": 5}\n') - - # Define model - model = { - "varprefix": "$", - "delim": "{}", - "output": { - "sum": "grep 'sum=' output.txt | cut -d'=' -f2", - "stats": "grep 'stats=' output.txt | cut -d'=' -f2" + # Save original directory to avoid Windows file deletion issues + original_cwd = os.getcwd() + try: + # Create result directory with dict output + result_dir = Path(tmpdir) / "results" / "x=5,y=10" + result_dir.mkdir(parents=True) + + # Write output file with JSON dict + with open(result_dir / "output.txt", "w") as f: + f.write("sum=15\n") + f.write('stats={"min": 5, "max": 10, "diff": 5}\n') + + # Define model + model = { + "varprefix": "$", + "delim": "{}", + "output": { + "sum": "grep 'sum=' output.txt | cut -d'=' -f2", + "stats": "grep 'stats=' output.txt | cut -d'=' -f2" + } } - } - - # Run fzo - os.chdir(tmpdir) - results = fz.fzo("results/*", model) - - # Check flattening occurred - assert 'stats' not in results.columns - assert 'stats_min' in results.columns - assert 'stats_max' in results.columns - assert 'stats_diff' in results.columns - # Check values - assert results['sum'].iloc[0] == 15 - assert results['stats_min'].iloc[0] == 5 - assert results['stats_max'].iloc[0] == 10 - assert results['stats_diff'].iloc[0] == 5 + # Run fzo + os.chdir(tmpdir) + results = fz.fzo("results/*", model) + + # Check flattening occurred + assert 'stats' not in results.columns + assert 'stats_min' in results.columns + assert 'stats_max' in results.columns + assert 'stats_diff' in results.columns + + # Check values + assert results['sum'].iloc[0] == 15 + assert results['stats_min'].iloc[0] == 5 + assert results['stats_max'].iloc[0] == 10 + assert results['stats_diff'].iloc[0] == 5 + finally: + # Restore original directory to allow cleanup on Windows + os.chdir(original_cwd) def test_fzo_with_nested_dict_output(self): """Test fzo with nested dict outputs""" with tempfile.TemporaryDirectory() as tmpdir: - result_dir = Path(tmpdir) / "results" / "case1" - result_dir.mkdir(parents=True) - - # Write output with nested dict - nested_dict = { - 'basic': {'min': 1, 'max': 10}, - 'advanced': {'mean': 5.5, 'std': 2.5} - } - with open(result_dir / "output.txt", "w") as f: - f.write(f"data={json.dumps(nested_dict)}\n") + # Save original directory to avoid Windows file deletion issues + original_cwd = os.getcwd() + try: + result_dir = Path(tmpdir) / "results" / "case1" + result_dir.mkdir(parents=True) + + # Write output with nested dict + nested_dict = { + 'basic': {'min': 1, 'max': 10}, + 'advanced': {'mean': 5.5, 'std': 2.5} + } + with open(result_dir / "output.txt", "w") as f: + f.write(f"data={json.dumps(nested_dict)}\n") - model = { - "output": { - "data": "grep 'data=' output.txt | cut -d'=' -f2" + model = { + "output": { + "data": "grep 'data=' output.txt | cut -d'=' -f2" + } } - } - os.chdir(tmpdir) - results = fz.fzo("results/*", model) + os.chdir(tmpdir) + results = fz.fzo("results/*", model) - # Check nested flattening - assert 'data_basic_min' in results.columns - assert 'data_basic_max' in results.columns - assert 'data_advanced_mean' in results.columns - assert 'data_advanced_std' in results.columns + # Check nested flattening + assert 'data_basic_min' in results.columns + assert 'data_basic_max' in results.columns + assert 'data_advanced_mean' in results.columns + assert 'data_advanced_std' in results.columns + finally: + # Restore original directory to allow cleanup on Windows + os.chdir(original_cwd) class TestFzrWithDictFlattening: @@ -282,16 +291,19 @@ class TestFzrWithDictFlattening: def test_fzr_with_dict_output(self): """Test fzr automatically flattens dict outputs""" with tempfile.TemporaryDirectory() as tmpdir: - os.chdir(tmpdir) - - # Create input template - with open("input.txt", "w") as f: - f.write("x = ${x}\n") - - # Create calculator script that produces dict output - calc_script = Path(tmpdir) / "calc.py" - with open(calc_script, "w") as f: - f.write("""#!/usr/bin/env python3 + # Save original directory to avoid Windows file deletion issues + original_cwd = os.getcwd() + try: + os.chdir(tmpdir) + + # Create input template + with open("input.txt", "w") as f: + f.write("x = ${x}\n") + + # Create calculator script that produces dict output + calc_script = Path(tmpdir) / "calc.py" + with open(calc_script, "w") as f: + f.write("""#!/usr/bin/env python3 import json # Read input @@ -307,53 +319,59 @@ def test_fzr_with_dict_output(self): f.write(f"value={x}\\n") f.write(f"stats={json.dumps(stats)}\\n") """) - os.chmod(calc_script, 0o755) - - # Define model - model = { - "varprefix": "$", - "delim": "{}", - "output": { - "value": "grep 'value=' output.txt | cut -d'=' -f2", - "stats": "grep 'stats=' output.txt | cut -d'=' -f2" + os.chmod(calc_script, 0o755) + + # Define model + model = { + "varprefix": "$", + "delim": "{}", + "output": { + "value": "grep 'value=' output.txt | cut -d'=' -f2", + "stats": "grep 'stats=' output.txt | cut -d'=' -f2" + } } - } - # Run fzr - results = fz.fzr( - input_path="input.txt", - input_variables={"x": [5, 10, 15]}, - model=model, - calculators=f"sh://python3 {calc_script}" - ) - - # Check flattening occurred - assert 'stats' not in results.columns - assert 'stats_min' in results.columns - assert 'stats_max' in results.columns - assert 'stats_mean' in results.columns - - # Check values for first row - assert results['x'].iloc[0] == 5 - assert results['value'].iloc[0] == 5 - assert results['stats_min'].iloc[0] == 4 - assert results['stats_max'].iloc[0] == 6 - assert results['stats_mean'].iloc[0] == 5 - - # Check all rows - assert len(results) == 3 + # Run fzr + results = fz.fzr( + input_path="input.txt", + input_variables={"x": [5, 10, 15]}, + model=model, + calculators=f"sh://python3 {calc_script}" + ) + + # Check flattening occurred + assert 'stats' not in results.columns + assert 'stats_min' in results.columns + assert 'stats_max' in results.columns + assert 'stats_mean' in results.columns + + # Check values for first row + assert results['x'].iloc[0] == 5 + assert results['value'].iloc[0] == 5 + assert results['stats_min'].iloc[0] == 4 + assert results['stats_max'].iloc[0] == 6 + assert results['stats_mean'].iloc[0] == 5 + + # Check all rows + assert len(results) == 3 + finally: + # Restore original directory to allow cleanup on Windows + os.chdir(original_cwd) def test_fzr_with_deeply_nested_dict(self): """Test fzr with deeply nested dict outputs (3 levels)""" with tempfile.TemporaryDirectory() as tmpdir: - os.chdir(tmpdir) + # Save original directory to avoid Windows file deletion issues + original_cwd = os.getcwd() + try: + os.chdir(tmpdir) - with open("input.txt", "w") as f: - f.write("x = ${x}\n") + with open("input.txt", "w") as f: + f.write("x = ${x}\n") - calc_script = Path(tmpdir) / "calc.py" - with open(calc_script, "w") as f: - f.write("""#!/usr/bin/env python3 + calc_script = Path(tmpdir) / "calc.py" + with open(calc_script, "w") as f: + f.write("""#!/usr/bin/env python3 import json with open('input.txt', 'r') as f: @@ -375,44 +393,50 @@ def test_fzr_with_deeply_nested_dict(self): with open('output.txt', 'w') as f: f.write(f"result={json.dumps(result)}\\n") """) - os.chmod(calc_script, 0o755) - - model = { - "varprefix": "$", - "delim": "{}", - "output": { - "result": "grep 'result=' output.txt | cut -d'=' -f2" + os.chmod(calc_script, 0o755) + + model = { + "varprefix": "$", + "delim": "{}", + "output": { + "result": "grep 'result=' output.txt | cut -d'=' -f2" + } } - } - - results = fz.fzr( - input_path="input.txt", - input_variables={"x": [3, 5]}, - model=model, - calculators=f"sh://python3 {calc_script}" - ) - # Check deep nesting flattened correctly - assert 'result_level1_level2_level3_value' in results.columns - assert 'result_level1_level2_level3_squared' in results.columns - - # Check values - assert results['result_level1_level2_level3_value'].iloc[0] == 6 - assert results['result_level1_level2_level3_squared'].iloc[0] == 9 - assert results['result_level1_level2_level3_value'].iloc[1] == 10 - assert results['result_level1_level2_level3_squared'].iloc[1] == 25 + results = fz.fzr( + input_path="input.txt", + input_variables={"x": [3, 5]}, + model=model, + calculators=f"sh://python3 {calc_script}" + ) + + # Check deep nesting flattened correctly + assert 'result_level1_level2_level3_value' in results.columns + assert 'result_level1_level2_level3_squared' in results.columns + + # Check values + assert results['result_level1_level2_level3_value'].iloc[0] == 6 + assert results['result_level1_level2_level3_squared'].iloc[0] == 9 + assert results['result_level1_level2_level3_value'].iloc[1] == 10 + assert results['result_level1_level2_level3_squared'].iloc[1] == 25 + finally: + # Restore original directory to allow cleanup on Windows + os.chdir(original_cwd) def test_fzr_with_multiple_dict_outputs(self): """Test fzr with multiple dict-valued outputs""" with tempfile.TemporaryDirectory() as tmpdir: - os.chdir(tmpdir) + # Save original directory to avoid Windows file deletion issues + original_cwd = os.getcwd() + try: + os.chdir(tmpdir) - with open("input.txt", "w") as f: - f.write("x = ${x}\n") + with open("input.txt", "w") as f: + f.write("x = ${x}\n") - calc_script = Path(tmpdir) / "calc.py" - with open(calc_script, "w") as f: - f.write("""#!/usr/bin/env python3 + calc_script = Path(tmpdir) / "calc.py" + with open(calc_script, "w") as f: + f.write("""#!/usr/bin/env python3 import json with open('input.txt', 'r') as f: @@ -426,33 +450,36 @@ def test_fzr_with_multiple_dict_outputs(self): f.write(f"stats={json.dumps(stats)}\\n") f.write(f"meta={json.dumps(meta)}\\n") """) - os.chmod(calc_script, 0o755) - - model = { - "varprefix": "$", - "delim": "{}", - "output": { - "stats": "grep 'stats=' output.txt | cut -d'=' -f2", - "meta": "grep 'meta=' output.txt | cut -d'=' -f2" + os.chmod(calc_script, 0o755) + + model = { + "varprefix": "$", + "delim": "{}", + "output": { + "stats": "grep 'stats=' output.txt | cut -d'=' -f2", + "meta": "grep 'meta=' output.txt | cut -d'=' -f2" + } } - } - results = fz.fzr( - input_path="input.txt", - input_variables={"x": [5, 10]}, - model=model, - calculators=f"sh://python3 {calc_script}" - ) - - # Check both dicts flattened - assert 'stats_min' in results.columns - assert 'stats_max' in results.columns - assert 'meta_name' in results.columns - assert 'meta_id' in results.columns - - # Verify values - assert results['meta_name'].iloc[0] == 'case5' - assert results['meta_id'].iloc[0] == 500 + results = fz.fzr( + input_path="input.txt", + input_variables={"x": [5, 10]}, + model=model, + calculators=f"sh://python3 {calc_script}" + ) + + # Check both dicts flattened + assert 'stats_min' in results.columns + assert 'stats_max' in results.columns + assert 'meta_name' in results.columns + assert 'meta_id' in results.columns + + # Verify values + assert results['meta_name'].iloc[0] == 'case5' + assert results['meta_id'].iloc[0] == 500 + finally: + # Restore original directory to allow cleanup on Windows + os.chdir(original_cwd) class TestEdgeCases: