From ed5a1416c5b0ce397c95fe34f4d5889df6bdbf0f Mon Sep 17 00:00:00 2001 From: wangyukai Date: Fri, 5 Sep 2025 12:39:41 +0000 Subject: [PATCH] fix empty episodes issue --- internnav/evaluator/vln_multi_evaluator.py | 22 +-- internnav/evaluator/vln_pe_evaluator.py | 30 ++-- tests/function_test/e2e_test.py | 12 +- tests/function_test/test_challenge.py | 39 +++--- tests/function_test/test_evaluator.py | 151 +++++++++++++++++++++ tests/function_test/test_server.py | 22 +-- 6 files changed, 213 insertions(+), 63 deletions(-) create mode 100644 tests/function_test/test_evaluator.py diff --git a/internnav/evaluator/vln_multi_evaluator.py b/internnav/evaluator/vln_multi_evaluator.py index be469314..823524ec 100644 --- a/internnav/evaluator/vln_multi_evaluator.py +++ b/internnav/evaluator/vln_multi_evaluator.py @@ -1,7 +1,10 @@ +import sys from enum import Enum from pathlib import Path from time import time + import numpy as np + from internnav.configs.evaluator import EvalCfg from internnav.evaluator.base import Evaluator from internnav.evaluator.utils.common import set_seed_model @@ -28,22 +31,23 @@ def transform_action_batch(actions, flash=False): if 'ideal_flag' in action.keys(): ideal_flag = action['ideal_flag'] if flash: - assert ideal_flag == True + assert ideal_flag is True else: ideal_flag = False if not ideal_flag: transformed_actions.append({'h1': {'vln_dp_move_by_speed': action['action'][0]}}) continue a = action['action'] - if a == 0 or a == [0] or a==[[0]]: + if a == 0 or a == [0] or a == [[0]]: transformed_actions.append({'h1': {'stop': []}}) elif a == -1 or a == [-1] or a == [[-1]]: transformed_actions.append({'h1': {'stand_still': []}}) else: move = f"move_by_{'discrete' if not flash else 'flash'}" - transformed_actions.append({'h1': {move: a}}) # discrete e.g. [3] + transformed_actions.append({'h1': {move: a}}) # discrete e.g. [3] return transformed_actions + @Evaluator.register('vln_multi') class VlnMultiEvaluator(Evaluator): def __init__(self, config: EvalCfg): @@ -61,6 +65,9 @@ def __init__(self, config: EvalCfg): ) # generate episode episodes = generate_episode(self.dataloader, config) + if len(episodes) == 0: + log.info("No more episodes to evaluate") + sys.exit(0) config.task.task_settings.update({'episodes': episodes}) self.env_num = config.task.task_settings['env_num'] self.proc_num = ( @@ -88,7 +95,6 @@ def __init__(self, config: EvalCfg): self.data_collector = DataCollector(self.dataloader.lmdb_path) self.robot_flash = config.task.robot_flash - @property def ignore_obs_attr(self): return [ @@ -223,15 +229,11 @@ def terminate_ops(self, obs_ls, reset_infos, terminated_ls): log.info(f'env{reset_env_ids}: states switch to WARM UP.') # modify original reset_info reset_infos = np.array(reset_infos) - reset_infos[reset_env_ids] = ( - new_reset_infos if len(new_reset_infos) > 0 else None - ) + reset_infos[reset_env_ids] = new_reset_infos if len(new_reset_infos) > 0 else None self.runner_status[ np.vectorize(lambda x: x)(reset_infos) == None # noqa: E711 ] = runner_status_code.TERMINATED - log.info( - f'env{np.vectorize(lambda x: x)(reset_infos) == None}: states switch to TERMINATED.' 
- ) + log.info(f'env{np.vectorize(lambda x: x)(reset_infos) == None}: states switch to TERMINATED.') reset_infos = reset_infos.tolist() if np.logical_and.reduce(self.runner_status == runner_status_code.TERMINATED): diff --git a/internnav/evaluator/vln_pe_evaluator.py b/internnav/evaluator/vln_pe_evaluator.py index 61e4426f..468f49d9 100644 --- a/internnav/evaluator/vln_pe_evaluator.py +++ b/internnav/evaluator/vln_pe_evaluator.py @@ -1,10 +1,13 @@ +import sys from enum import Enum from pathlib import Path from time import time + import numpy as np + from internnav.configs.evaluator import EvalCfg from internnav.evaluator.base import Evaluator -from internnav.evaluator.utils.common import set_seed_model, obs_to_image +from internnav.evaluator.utils.common import set_seed_model from internnav.evaluator.utils.config import get_lmdb_path from internnav.evaluator.utils.data_collector import DataCollector from internnav.evaluator.utils.dataset import ResultLogger, split_data @@ -56,6 +59,9 @@ def __init__(self, config: EvalCfg): # generate episode episodes = generate_episode(self.dataloader, config) + if len(episodes) == 0: + log.info("No more episodes to evaluate. Episodes are saved in data/sample_episodes/") + sys.exit(0) config.task.task_settings.update({'episodes': episodes}) self.env_num = config.task.task_settings['env_num'] self.proc_num = ( @@ -211,7 +217,7 @@ def terminate_ops(self, obs_ls, reset_infos, terminated_ls): # need this status to reset reset_env_ids = np.where(self.runner_status == runner_status_code.NOT_RESET)[0].tolist() - + if len(reset_env_ids) > 0: log.debug(f'env{reset_env_ids}: start new episode!') obs, new_reset_infos = self.env.reset(reset_env_ids) @@ -225,9 +231,7 @@ def terminate_ops(self, obs_ls, reset_infos, terminated_ls): self.runner_status[ np.vectorize(lambda x: x)(reset_infos) == None # noqa: E711 ] = runner_status_code.TERMINATED - log.debug( - f'env{np.vectorize(lambda x: x)(reset_infos) == None}: states switch to TERMINATED.' 
- ) + log.debug(f'env{np.vectorize(lambda x: x)(reset_infos) == None}: states switch to TERMINATED.') reset_infos = reset_infos.tolist() if np.logical_and.reduce(self.runner_status == runner_status_code.TERMINATED): @@ -241,8 +245,7 @@ def terminate_ops(self, obs_ls, reset_infos, terminated_ls): ) if self.vis_output: self.visualize_util.trace_start( - trajectory_id=self.now_path_key(reset_info), - reference_path=reset_info.data['reference_path'] + trajectory_id=self.now_path_key(reset_info), reference_path=reset_info.data['reference_path'] ) return False, reset_infos @@ -258,8 +261,7 @@ def eval(self): ) if self.vis_output: self.visualize_util.trace_start( - trajectory_id=self.now_path_key(info), - reference_path=info.data['reference_path'] + trajectory_id=self.now_path_key(info), reference_path=info.data['reference_path'] ) log.info('start new episode!') @@ -281,18 +283,16 @@ def eval(self): env_term, reset_info = self.terminate_ops(obs, reset_info, terminated) if env_term: break - + # save step obs if self.vis_output: for ob, info, act in zip(obs, reset_info, action): - if info is None or not 'rgb' in ob or ob['fail_reason']: + if info is None or 'rgb' not in ob or ob['fail_reason']: continue self.visualize_util.save_observation( - trajectory_id=self.now_path_key(info), - obs=ob, - action=act[self.robot_name] + trajectory_id=self.now_path_key(info), obs=ob, action=act[self.robot_name] ) - + self.env.close() progress_log_multi_util.report() diff --git a/tests/function_test/e2e_test.py b/tests/function_test/e2e_test.py index 67b48b8d..61074b3d 100644 --- a/tests/function_test/e2e_test.py +++ b/tests/function_test/e2e_test.py @@ -54,13 +54,13 @@ def test_server(): common_body(start_command) -@pytest.mark.gpu -def test_challenge(): - start_command = 'python ./tests/function_test/test_challenge.py' +@pytest.mark.ray +def test_evaluator(): + start_command = 'python ./tests/function_test/test_evaluator.py' common_body(start_command) -@pytest.mark.ray -def test_challenge_ray(): - start_command = 'python ./tests/function_test/test_challenge_ray.py' +@pytest.mark.gpu +def test_challenge(): + start_command = 'python ./tests/function_test/test_challenge.py' common_body(start_command) diff --git a/tests/function_test/test_challenge.py b/tests/function_test/test_challenge.py index ab492a53..3d6b4bad 100644 --- a/tests/function_test/test_challenge.py +++ b/tests/function_test/test_challenge.py @@ -5,11 +5,11 @@ ''' import importlib.util -import subprocess import sys import time import numpy as np +from test_server import start_server, stop_server from internnav.configs.evaluator.default_config import get_config from internnav.evaluator import Evaluator @@ -66,39 +66,32 @@ def load_eval_cfg(config_path, attr_name='eval_cfg'): evaluator.env.close() -def start_server(): - server_cmd = [ - sys.executable, - "internnav/agent/utils/server.py", - "--config", - "scripts/eval/configs/challenge_cfg.py", - ] +def start_evaluator(): + from multiprocessing import get_context - proc = subprocess.Popen( - server_cmd, - stdout=None, - stderr=None, - ) - return proc + ctx = get_context("spawn") # Use 'spawn' to avoid issues on some platforms + p = ctx.Process(target=main) + p.start() + p.join() + assert p.exitcode == 0 + print("Evaluator process completed successfully.") if __name__ == '__main__': try: proc = start_server() time.sleep(3) - main() + start_evaluator() + except Exception as e: print(f'exception is {e}') import traceback traceback.print_exc() sys.exit(1) + + except SystemExit as e: + print(f"Caught 
SystemExit from env.close(): code={e.code}", flush=True)
+
     finally:
-        if proc and proc.poll() is None:
-            print("Shutting down server...")
-            proc.terminate()
-            try:
-                proc.wait(timeout=10)
-            except subprocess.TimeoutExpired:
-                print("Force killing server...")
-                proc.kill()
+        stop_server(proc)
diff --git a/tests/function_test/test_evaluator.py b/tests/function_test/test_evaluator.py
new file mode 100644
index 00000000..8aab08c3
--- /dev/null
+++ b/tests/function_test/test_evaluator.py
@@ -0,0 +1,151 @@
+"""
+Test for VLN evaluators.
+
+1. Create a temporary one-episode test split.
+2. Run to completion with Ray (env_num=2, proc_num=8).
+3. Rerun to cover the "no more episodes" case.
+4. Delete the sample_episodes output after testing.
+"""
+
+'''
+Test the evaluator eval logic with Ray (proc_num = 8).
+The main process:
+    Init => warm up => one action
+'''
+
+import shutil
+import sys
+import time
+from enum import Enum
+from pathlib import Path
+
+from test_server import start_server, stop_server
+
+from internnav.configs.agent import AgentCfg
+from internnav.configs.evaluator import (
+    EnvCfg,
+    EvalCfg,
+    EvalDatasetCfg,
+    SceneCfg,
+    TaskCfg,
+)
+from internnav.configs.evaluator.default_config import get_config
+from internnav.evaluator import Evaluator
+
+eval_cfg = EvalCfg(
+    agent=AgentCfg(
+        server_port=8087,
+        model_name='rdp',
+        ckpt_path='checkpoints/r2r/fine_tuned/rdp',
+        model_settings={},
+    ),
+    env=EnvCfg(
+        env_type='vln_pe',
+        env_settings={
+            'use_fabric': False,
+            'headless': True,  # display option: set to False to open the Isaac Sim interactive window
+        },
+    ),
+    task=TaskCfg(
+        task_name='test_evaluation',
+        task_settings={
+            'env_num': 2,
+            'use_distributed': True,  # Ray distributed framework
+            'proc_num': 8,
+        },
+        scene=SceneCfg(
+            scene_type='mp3d',
+            scene_data_dir='data/scene_data/mp3d_pe',
+        ),
+        robot_name='h1',
+        robot_usd_path='data/Embodiments/vln-pe/h1/h1_vln_pointcloud.usd',
+        camera_resolution=[256, 256],  # (W,H)
+        camera_prim_path='torso_link/h1_pano_camera_0',
+    ),
+    dataset=EvalDatasetCfg(
+        dataset_type="mp3d",
+        dataset_settings={
+            'base_data_dir': 'data/vln_pe/raw_data/r2r',
+            'split_data_types': ['function_test'],
+            'filter_stairs': False,
+        },
+    ),
+    eval_settings={'save_to_json': False, 'vis_output': False},  # vis_output=True saves result videos under logs/
+)
+
+
+class runner_status_code(Enum):
+    NORMAL = 0
+    WARM_UP = 1
+    NOT_RESET = 3
+    TERMINATED = 2
+    STOP = 4
+
+
+def _safe_remove(p: Path):
+    try:
+        if p.is_symlink() or p.is_file():
+            p.unlink(missing_ok=True)
+        elif p.is_dir():
+            shutil.rmtree(p, ignore_errors=True)
+    except Exception:
+        # Swallow errors so cleanup never fails the test
+        pass
+
+
+def cleanup_task_dirs(eval_cfg):
+    """
+    Remove data/sample_episodes/<task_name> and logs/<task_name>
+    before the test, and again after the test (teardown).
+    """
+    task_name = eval_cfg.task.task_name
+
+    data_task_dir = Path("data") / "sample_episodes" / task_name
+    logs_task_dir = Path("logs") / task_name
+    print(f'"data": {data_task_dir}, "logs": {logs_task_dir}')
+
+    # pre-clean
+    _safe_remove(data_task_dir)
+    _safe_remove(logs_task_dir)
+    print("Cleaned up test directories.")
+
+
+def main():
+    cfg = get_config(eval_cfg)
+    evaluator = Evaluator.init(cfg)
+    evaluator.eval()
+
+
+def start_evaluator():
+    from multiprocessing import get_context
+
+    ctx = get_context("spawn")  # Use 'spawn' to avoid issues on some platforms
+    p = ctx.Process(target=main)
+    p.start()
+    p.join()
+    assert p.exitcode == 0
+    print("Evaluator process completed successfully.")
+
+
+if __name__ == '__main__':
+    try:
+        proc = start_server()
+        time.sleep(3)
+        start_evaluator()
+        start_evaluator()
+
+    except Exception as e:
+        print(f'exception is {e}')
+        import traceback
+
+        traceback.print_exc()
+        sys.exit(1)
+
+    except SystemExit as e:
+        print(f"Caught SystemExit from env.close(): code={e.code}", flush=True)
+
+    finally:
+        # shut down server
+        stop_server(proc)
+        # clean up task dirs
+        cleanup_task_dirs(eval_cfg)
diff --git a/tests/function_test/test_server.py b/tests/function_test/test_server.py
index 5bdfcee7..f279cfd4 100644
--- a/tests/function_test/test_server.py
+++ b/tests/function_test/test_server.py
@@ -10,8 +10,6 @@ def start_server():
     server_cmd = [
         sys.executable,
         "internnav/agent/utils/server.py",
-        "--config",
-        "scripts/eval/configs/challenge_cfg.py",
     ]
 
     proc = subprocess.Popen(
@@ -23,6 +21,18 @@
     return proc
 
 
+def stop_server(proc):
+    if proc and proc.poll() is None:
+        print("Shutting down server...")
+        proc.terminate()
+        try:
+            proc.wait(timeout=10)
+        except subprocess.TimeoutExpired:
+            proc.kill()
+            proc.wait()
+            raise RuntimeError("❌ Server failed to shut down within 10 seconds.")
+
+
 if __name__ == '__main__':
     try:
         proc = start_server()
@@ -33,13 +43,7 @@ def start_server():
             raise RuntimeError(f"❌ Server exited too early with code {proc.returncode}")
 
         print("✅ Server is still alive after 5 seconds.")
-        if proc and proc.poll() is None:
-            print("Shutting down server...")
-            proc.terminate()
-            try:
-                proc.wait(timeout=10)
-            except subprocess.TimeoutExpired:
-                raise RuntimeError("❌ Server failed to shut down within 10 seconds.")
+        stop_server(proc)
 
     except Exception as e:
         print(f'exception is {e}')