From f49e0434dc74c21429877747bf5d51cb737c79b5 Mon Sep 17 00:00:00 2001 From: Amitayush Thakur Date: Sat, 10 Jan 2026 17:10:47 +0000 Subject: [PATCH 1/7] Add support for starting tactic sequences in SimpleLean4SyncExecutor --- .../lean/simple_lean4_sync_executor.py | 31 +++++++++++++++---- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/src/itp_interface/lean/simple_lean4_sync_executor.py b/src/itp_interface/lean/simple_lean4_sync_executor.py index 8590875..2e29403 100644 --- a/src/itp_interface/lean/simple_lean4_sync_executor.py +++ b/src/itp_interface/lean/simple_lean4_sync_executor.py @@ -36,6 +36,8 @@ class SimpleLean4SyncExecutor: have_regex = r"(^\s*have\s+([^:]*):([\s|\S]*?))(:=\s*by)([\s|\S]*)" have_match = re.compile(have_regex, re.MULTILINE) theorem_has_by_match = re.compile(theorem_has_by, re.MULTILINE) + ends_with_by_sorry = r":=[\s]*by\s+sorry[\s]*$" + ends_with_by_sorry_match = re.compile(ends_with_by_sorry, re.MULTILINE) unsolved_message = "unsolved goals" no_goals = "No goals to be solved" no_goals_alternative = "no goals to be solved" @@ -56,7 +58,8 @@ def __init__(self, enable_search: bool = False, keep_local_context: bool = False, enforce_qed: bool = False, - logger: Optional[logging.Logger] = None): + logger: Optional[logging.Logger] = None, + starting_tactic_sequence: Optional[List[str]] = None): assert proof_step_iter is None or isinstance(proof_step_iter, ClonableIterator), \ "proof_step_iter must be an iterator" assert main_file is not None or proof_step_iter is not None, \ @@ -121,6 +124,7 @@ def __init__(self, self._last_modified_tactic : str | None = None self._recursion_depth = 0 self.max_threshold_for_tactic_length = 575 # Max 575 characters for a tactic + self._starting_tactic_sequence = ['by'] if starting_tactic_sequence is None or len(starting_tactic_sequence) == 0 else starting_tactic_sequence if self._enable_search: pass pass @@ -670,7 +674,11 @@ def _skip_to_theorem(self, theorem: str): self._content_till_last_theorem_stmt = "\n".join(self._lines_executed) assert not theorem_text.endswith(':='), "Theorem text should not end with ':='" - theorem_text = theorem_text + " :=" + if SimpleLean4SyncExecutor.ends_with_by_sorry_match.search(theorem_text): + # Remove the ':= by sorry' part + theorem_text = SimpleLean4SyncExecutor.ends_with_by_sorry_match.sub('', theorem_text).strip() + " :=" + else: + theorem_text = theorem_text + " :=" content_until_after_theorem = "\n".join(self._lines_executed) + "\n" + theorem_text self._content_till_after_theorem_stmt = content_until_after_theorem.strip() assert self._content_till_after_theorem_stmt.endswith(':='), "Content till last theorem statement should not end with ':='" @@ -693,8 +701,9 @@ def _skip_to_theorem(self, theorem: str): self._last_theorem = (given_theorem_name, theorem_text, theorem_text) self._theorem_started = True self._lines_executed.extend(lines_in_theorem_text) - self._run_stmt_on_lean_server(tactic_start_line, "by", theorem_started=True) - self._lines_executed.append('by') + starting_tactics = '\n'.join(self._starting_tactic_sequence) + self._run_stmt_on_lean_server(tactic_start_line, starting_tactics, theorem_started=True) + self._lines_executed.append(starting_tactics) # Reset the iterator to the line of the theorem if lines[tactic_start_line - 1].strip().endswith("by"): self.main_file_iter.set_to_index(tactic_start_line) @@ -1026,11 +1035,21 @@ def get_theorem_name_resembling(file_path: str, theorem_name: str, use_cache: bo return json.dumps(dict_thm) raise ValueError(f"The theorem '{theorem_name}' was not found in the file '{file_path}'") -def execute_thm_line_by_line(file_path: str, project_root: str, theorem_name: str, logger: logging.Logger, with_print: bool=False): +def execute_thm_line_by_line(file_path: str, project_root: str, theorem_name: str, logger: logging.Logger, with_print: bool=False, starting_tactic_sequence: List[str]=[]) -> None: pprint = lambda msg: print(msg) if with_print else None - with SimpleLean4SyncExecutor(main_file=file_path, project_root=project_root, logger=logger) as executor: + with SimpleLean4SyncExecutor(main_file=file_path, project_root=project_root, logger=logger, starting_tactic_sequence=starting_tactic_sequence) as executor: executor.set_run_exactly() executor._skip_to_theorem(theorem_name) + if len(starting_tactic_sequence) > 0: + # It can happen that the starting tactic sequence already finishes the proof + proof_context_empty = executor.proof_context is None or executor.proof_context == ProofContext.empty() + no_error_messages = len(executor.lean_error_messages) == 0 + if proof_context_empty and no_error_messages: + pprint("Proof finished with starting tactic sequence") + return + if len(executor.lean_error_messages) > 0: + pprint(f"Error messages after starting tactic sequence:\n{executor.lean_error_messages}") + return assert executor.proof_context is not None, "Proof context should be present" proof_exec = False while not executor.execution_complete: From 9b68c3bd62f6b5a8d3f3e16e788512d89c734774 Mon Sep 17 00:00:00 2001 From: Amitayush Thakur Date: Sat, 10 Jan 2026 17:37:51 +0000 Subject: [PATCH 2/7] Add FULL_BACKTRACK action type and implement full backtracking logic in ProofEnv --- src/itp_interface/agent/simple_proof_agent.py | 3 +- src/itp_interface/rl/proof_action.py | 3 ++ src/itp_interface/rl/simple_proof_env.py | 33 +++++++++++++++++++ .../tools/dynamic_lean4_proof_exec.py | 22 ++++++++++--- .../tools/proof_exec_callback.py | 9 ++--- src/test/test_simple_proof_env.py | 5 ++- src/test/test_simple_proof_env_pool.py | 5 ++- src/test/test_simple_proof_env_ray.py | 5 ++- 8 files changed, 73 insertions(+), 12 deletions(-) diff --git a/src/itp_interface/agent/simple_proof_agent.py b/src/itp_interface/agent/simple_proof_agent.py index eb19218..9e571b1 100644 --- a/src/itp_interface/agent/simple_proof_agent.py +++ b/src/itp_interface/agent/simple_proof_agent.py @@ -80,7 +80,8 @@ def _run_episode_as_per_policy(self, self.logger.info("**"*20) env.render() self.logger.info("**"*20) - if action.action_type != ProofAction.ActionType.BACKTRACK: + if action.action_type != ProofAction.ActionType.BACKTRACK and \ + action.action_type != ProofAction.ActionType.FULL_BACKTRACK: # Don't update policy for backtracking actions, this will create a # a very nasty loop in the policy. self.logger.info("Updating policy") diff --git a/src/itp_interface/rl/proof_action.py b/src/itp_interface/rl/proof_action.py index 23971c6..4ec3605 100644 --- a/src/itp_interface/rl/proof_action.py +++ b/src/itp_interface/rl/proof_action.py @@ -30,12 +30,15 @@ class ActionType(Enum): GET_DFNS_THMS = 'GET_DFNS_THMS' RUN_TACTIC = 'RUN_TACTIC' BACKTRACK = 'BACKTRACK' + FULL_BACKTRACK = 'FULL_BACKTRACK' EXIT = 'EXIT' NONE = 'NONE' @staticmethod def get_order(action_type: 'ProofAction.ActionType'): if action_type == ProofAction.ActionType.EXIT: + return 8 + elif action_type == ProofAction.ActionType.FULL_BACKTRACK: return 7 elif action_type == ProofAction.ActionType.BACKTRACK: return 6 diff --git a/src/itp_interface/rl/simple_proof_env.py b/src/itp_interface/rl/simple_proof_env.py index 5b54ded..578c006 100644 --- a/src/itp_interface/rl/simple_proof_env.py +++ b/src/itp_interface/rl/simple_proof_env.py @@ -225,6 +225,8 @@ def step(self, action: Action) -> typing.Tuple[State, Action, State, float, bool self._get_dfns_thms(history_idx) elif action.action_type == ProofAction.ActionType.BACKTRACK: self._backtrack(history_idx) + elif action.action_type == ProofAction.ActionType.FULL_BACKTRACK: + self._full_backtrack(history_idx) else: raise NotImplementedError(f"Action type {action.action_type} not implemented") self.inferences_used += 1 @@ -558,6 +560,37 @@ def _backtrack(self, history_idx: int = None): current_proof_state = self.state done = self.done self._history[history_idx] = (state, action, current_proof_state, reward, done, env_info) + + def _full_backtrack(self, history_idx: int = None): + assert self._loaded, "Env not loaded, call reset() first" + history_idx = len(self._history) - 1 if history_idx is None else history_idx + state, action, current_proof_state, reward, done, env_info = self._history[history_idx] + assert action.action_type == ProofAction.ActionType.FULL_BACKTRACK, "Action must be of type FULL_BACKTRACK" + assert isinstance(self._dynamic_proof_executor, DynamicLean4ProofExecutor), "Full backtrack is only implemented for Lean 4" + try: + self._dynamic_proof_executor.cancel_all_tactics() + self.current_proof_depth = 0 + self._p_tree = ProofTree() + env_info.progress = ProgressState.STATE_CHANGED + env_info.error_message = "Full backtrack successful" + reward = 0.0 + except Exception: + self.logger.exception("Exception occured while full backtracking") + history = copy.deepcopy(self._history) + self.reset() # To ensure that everything is fine we start again + # Run all the current steps in the proof tree + self.logger + for _tactic_idx, (_, tactic) in enumerate(self._p_tree.tactics): + _action = self._p_tree.actions[_tactic_idx] + self._run_tactics(tactic.proof_steps, self.state, _action, ProofEnvInfo(progress=ProgressState.STARTING)) + # No need to capture in history as the history is already captured + self._history = history + env_info.progress = ProgressState.FAILED + env_info.error_message = "Full backtrack failed, resetting the environment and running all the tactics again" + self.logger.warning("Full backtrack failed, resetting the environment and running all the tactics again") + current_proof_state = self.state + done = self.done + self._history[history_idx] = (state, action, current_proof_state, reward, done, env_info) def _foward_to_lemma_proof(self): assert self._loaded, "Env not loaded, call reset() first" diff --git a/src/itp_interface/tools/dynamic_lean4_proof_exec.py b/src/itp_interface/tools/dynamic_lean4_proof_exec.py index 17ab8d4..bb7eda2 100644 --- a/src/itp_interface/tools/dynamic_lean4_proof_exec.py +++ b/src/itp_interface/tools/dynamic_lean4_proof_exec.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 -import sys import typing import os import copy @@ -48,7 +47,12 @@ def goal_description_compare(description1: str, descripton2: str) -> int: else: return -1 - def __init__(self, coq_context_helper: Lean3ContextHelper, project_folder: str = None, proof_file: str = None, instruction_iter: typing.Optional[str] = None, use_hammer: typing.Union[bool, HammerMode] = False, timeout_in_seconds: int = 60, use_human_readable_proof_context: bool = True, suppress_error_log: bool = True, context_type: ContextType = ContextType.NoContext, keep_local_context = False, enforce_qed: bool = False): + def __init__(self, + coq_context_helper: Lean3ContextHelper, project_folder: str = None, proof_file: str = None, + instruction_iter: typing.Optional[str] = None, use_hammer: typing.Union[bool, HammerMode] = False, + timeout_in_seconds: int = 60, use_human_readable_proof_context: bool = True, suppress_error_log: bool = True, + context_type: ContextType = ContextType.NoContext, keep_local_context = False, enforce_qed: bool = False, + starting_tactic_sequence: typing.Optional[typing.List[str]] = None): assert proof_file is None or os.path.exists(proof_file), f"Proof file {proof_file} does not exist" assert coq_context_helper is not None, "coq_context_helper must not be None" self.proof_file = proof_file @@ -58,7 +62,11 @@ def __init__(self, coq_context_helper: Lean3ContextHelper, project_folder: str = self.run_state = DynamicProofExecutor.RunState() self.logger = None self.lean_context_helper = coq_context_helper - super().__init__(project_root=project_folder, proof_step_iter=self.tactic_switch_iterator, use_hammer=use_hammer, timeout_in_sec=timeout_in_seconds, use_human_readable_proof_context=use_human_readable_proof_context, suppress_error_log=suppress_error_log, keep_local_context=keep_local_context, enforce_qed=enforce_qed) + super().__init__(project_root=project_folder, proof_step_iter=self.tactic_switch_iterator, + use_hammer=use_hammer, timeout_in_sec=timeout_in_seconds, + use_human_readable_proof_context=use_human_readable_proof_context, suppress_error_log=suppress_error_log, + keep_local_context=keep_local_context, enforce_qed=enforce_qed, + starting_tactic_sequence=starting_tactic_sequence) def __enter__(self): self.lean_context_helper.__enter__() @@ -174,6 +182,9 @@ def cancel_tactic_till_line(self, tactic_line_num: int, no_backtracking: bool = state_num = self.run_state.line_tactic_map[tactic_line_num] self.run_state.tactics_ran = self.run_state.tactics_ran[:state_num] line_tactic_map_keys = list(self.run_state.line_tactic_map.keys()) + lowest_line_num_gt_eq_tactic_line = min([ln for ln in line_tactic_map_keys if ln >= tactic_line_num], default=None) + assert lowest_line_num_gt_eq_tactic_line is not None, "There must be at least one line number >= tactic_line_num" + tactic_line_num = lowest_line_num_gt_eq_tactic_line for line_num in line_tactic_map_keys: if line_num >= tactic_line_num: del self.run_state.line_tactic_map[line_num] @@ -186,4 +197,7 @@ def cancel_tactic_till_line(self, tactic_line_num: int, no_backtracking: bool = for line_num in line_proof_context_map_keys: if line_num >= tactic_line_num: del self.run_state.line_proof_context_map[line_num] - return cancelled_some_tactics \ No newline at end of file + return cancelled_some_tactics + + def cancel_all_tactics(self): + self.cancel_tactic_till_line(0) \ No newline at end of file diff --git a/src/itp_interface/tools/proof_exec_callback.py b/src/itp_interface/tools/proof_exec_callback.py index 9c3b031..e40bbf5 100644 --- a/src/itp_interface/tools/proof_exec_callback.py +++ b/src/itp_interface/tools/proof_exec_callback.py @@ -40,7 +40,8 @@ def __init__(self, setup_cmds: typing.List[str] = [], port: typing.Optional[int] = None, enable_search: bool = True, - enforce_qed: bool = False): + enforce_qed: bool = False, + starting_tactic_sequence: typing.Optional[typing.List[str]] = None): self.project_folder = project_folder self.file_path = file_path self.language = language @@ -57,7 +58,7 @@ def __init__(self, self.port = port self.enable_search = enable_search self.enforce_qed = enforce_qed - pass + self.starting_tactic_sequence = starting_tactic_sequence def get_proof_executor(self) -> typing.Union[DynamicCoqProofExecutor, DynamicLeanProofExecutor, DynamicLean4ProofExecutor, DynamicIsabelleProofExecutor]: if self.language == ProofAction.Language.COQ: @@ -69,9 +70,9 @@ def get_proof_executor(self) -> typing.Union[DynamicCoqProofExecutor, DynamicLea lean_context_helper = Lean3ContextHelper(search_exec, self.search_depth, logger=self.logger) return DynamicLeanProofExecutor(lean_context_helper, self.project_folder, self.file_path, context_type=DynamicLeanProofExecutor.ContextType.NoContext, use_hammer=self.use_hammer, timeout_in_seconds=self.timeout_in_secs, suppress_error_log=self.suppress_error_log, use_human_readable_proof_context=self.use_human_readable_proof_context, keep_local_context=self.keep_local_context) elif self.language == ProofAction.Language.LEAN4: - search_exec = SimpleLean4SyncExecutor(self.project_folder, self.prefix, self.file_path, use_hammer=self.use_hammer, timeout_in_sec=self.timeout_in_secs, suppress_error_log=self.suppress_error_log, use_human_readable_proof_context=self.use_human_readable_proof_context, enable_search=self.always_use_retrieval, keep_local_context=self.keep_local_context, enforce_qed=self.enforce_qed) + search_exec = SimpleLean4SyncExecutor(self.project_folder, self.prefix, self.file_path, use_hammer=self.use_hammer, timeout_in_sec=self.timeout_in_secs, suppress_error_log=self.suppress_error_log, use_human_readable_proof_context=self.use_human_readable_proof_context, enable_search=self.always_use_retrieval, keep_local_context=self.keep_local_context, enforce_qed=self.enforce_qed, starting_tactic_sequence=self.starting_tactic_sequence) lean4_context_helper = Lean4ContextHelper(search_exec, self.search_depth, logger=self.logger) - return DynamicLean4ProofExecutor(lean4_context_helper, self.project_folder, self.file_path, context_type=DynamicLeanProofExecutor.ContextType.NoContext, use_hammer=self.use_hammer, timeout_in_seconds=self.timeout_in_secs, suppress_error_log=self.suppress_error_log, use_human_readable_proof_context=self.use_human_readable_proof_context, keep_local_context=self.keep_local_context, enforce_qed=self.enforce_qed) + return DynamicLean4ProofExecutor(lean4_context_helper, self.project_folder, self.file_path, context_type=DynamicLeanProofExecutor.ContextType.NoContext, use_hammer=self.use_hammer, timeout_in_seconds=self.timeout_in_secs, suppress_error_log=self.suppress_error_log, use_human_readable_proof_context=self.use_human_readable_proof_context, keep_local_context=self.keep_local_context, enforce_qed=self.enforce_qed, starting_tactic_sequence=self.starting_tactic_sequence) elif self.language == ProofAction.Language.ISABELLE: search_exec = IsabelleExecutor(self.project_folder, self.file_path, use_hammer=self.use_hammer, timeout_in_sec=self.timeout_in_secs, suppress_error_log=self.suppress_error_log, use_human_readable_proof_context=self.use_human_readable_proof_context, port=self.port) isabelle_context_helper = IsabelleContextHelper(search_exec, self.search_depth, logger=self.logger) diff --git a/src/test/test_simple_proof_env.py b/src/test/test_simple_proof_env.py index 4662ad6..0ecace7 100644 --- a/src/test/test_simple_proof_env.py +++ b/src/test/test_simple_proof_env.py @@ -17,7 +17,10 @@ def scan_action(language, supported_actions): inp = input("Enter tactic(s) (';' separated): ") inp = inp.split(';') return ProofAction(action_type, language, tactics=inp) - elif action_type == ProofAction.ActionType.GET_DFNS_THMS or action_type == ProofAction.ActionType.BACKTRACK or action_type == ProofAction.ActionType.EXIT: + elif action_type == ProofAction.ActionType.GET_DFNS_THMS or \ + action_type == ProofAction.ActionType.BACKTRACK or \ + action_type == ProofAction.ActionType.FULL_BACKTRACK or \ + action_type == ProofAction.ActionType.EXIT: return ProofAction(action_type, language) else: raise Exception(f"Invalid action type {action_type}") diff --git a/src/test/test_simple_proof_env_pool.py b/src/test/test_simple_proof_env_pool.py index 2eb6587..ea5ed39 100644 --- a/src/test/test_simple_proof_env_pool.py +++ b/src/test/test_simple_proof_env_pool.py @@ -23,7 +23,10 @@ def scan_action(language, supported_actions): inp = input("Enter tactic(s) (';' separated): ") inp = inp.split(';') return ProofAction(action_type, language, tactics=inp) - elif action_type == ProofAction.ActionType.GET_DFNS_THMS or action_type == ProofAction.ActionType.BACKTRACK or action_type == ProofAction.ActionType.EXIT: + elif action_type == ProofAction.ActionType.GET_DFNS_THMS or \ + action_type == ProofAction.ActionType.BACKTRACK or \ + action_type == ProofAction.ActionType.FULL_BACKTRACK or \ + action_type == ProofAction.ActionType.EXIT: return ProofAction(action_type, language) else: raise Exception(f"Invalid action type {action_type}") diff --git a/src/test/test_simple_proof_env_ray.py b/src/test/test_simple_proof_env_ray.py index edfd9f6..aed321a 100644 --- a/src/test/test_simple_proof_env_ray.py +++ b/src/test/test_simple_proof_env_ray.py @@ -22,7 +22,10 @@ def scan_action(language, supported_actions): inp = input("Enter tactic(s) (';' separated): ") inp = inp.split(';') return ProofAction(action_type, language, tactics=inp) - elif action_type == ProofAction.ActionType.GET_DFNS_THMS or action_type == ProofAction.ActionType.BACKTRACK or action_type == ProofAction.ActionType.EXIT: + elif action_type == ProofAction.ActionType.GET_DFNS_THMS or \ + action_type == ProofAction.ActionType.BACKTRACK or \ + action_type == ProofAction.ActionType.FULL_BACKTRACK or \ + action_type == ProofAction.ActionType.EXIT: return ProofAction(action_type, language) else: raise Exception(f"Invalid action type {action_type}") From 1dc1615a8e91027f980b0db36d3e8ce51922d540 Mon Sep 17 00:00:00 2001 From: Amitayush Thakur Date: Sat, 10 Jan 2026 17:57:35 +0000 Subject: [PATCH 3/7] Refactor tactic handling in DynamicProofExecutor and update test to use get_theorem_name_resembling --- src/itp_interface/tools/dynamic_lean4_proof_exec.py | 4 ++-- src/test/test_simple_proof_env.py | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/itp_interface/tools/dynamic_lean4_proof_exec.py b/src/itp_interface/tools/dynamic_lean4_proof_exec.py index bb7eda2..5f91aba 100644 --- a/src/itp_interface/tools/dynamic_lean4_proof_exec.py +++ b/src/itp_interface/tools/dynamic_lean4_proof_exec.py @@ -179,12 +179,12 @@ def cancel_tactic_till_line(self, tactic_line_num: int, no_backtracking: bool = assert tactic_line_num >= 0, "tactic_line_num must be >= 0" cancelled_some_tactics = False if tactic_line_num < self.line_num: - state_num = self.run_state.line_tactic_map[tactic_line_num] - self.run_state.tactics_ran = self.run_state.tactics_ran[:state_num] line_tactic_map_keys = list(self.run_state.line_tactic_map.keys()) lowest_line_num_gt_eq_tactic_line = min([ln for ln in line_tactic_map_keys if ln >= tactic_line_num], default=None) assert lowest_line_num_gt_eq_tactic_line is not None, "There must be at least one line number >= tactic_line_num" tactic_line_num = lowest_line_num_gt_eq_tactic_line + state_num = self.run_state.line_tactic_map[tactic_line_num] + self.run_state.tactics_ran = self.run_state.tactics_ran[:state_num] for line_num in line_tactic_map_keys: if line_num >= tactic_line_num: del self.run_state.line_tactic_map[line_num] diff --git a/src/test/test_simple_proof_env.py b/src/test/test_simple_proof_env.py index 0ecace7..7c4b791 100644 --- a/src/test/test_simple_proof_env.py +++ b/src/test/test_simple_proof_env.py @@ -2,6 +2,7 @@ import sys import logging +from itp_interface.lean.simple_lean4_sync_executor import get_theorem_name_resembling from itp_interface.rl.simple_proof_env import ProofEnv, ProofEnvReRankStrategy from itp_interface.rl.proof_action import ProofAction from itp_interface.tools.proof_exec_callback import ProofExecutorCallback @@ -56,14 +57,15 @@ def main(): always_retrieve_thms = True retrieval_strategy = ProofEnvReRankStrategy.BM25 elif inp == 'lean4': + file_path = "src/data/test/lean4_proj/Lean4Proj/Basic.lean" proof_exec_callback = ProofExecutorCallback( project_folder="src/data/test/lean4_proj", - file_path="src/data/test/lean4_proj/Lean4Proj/Basic.lean", + file_path=file_path, language=ProofAction.Language.LEAN4, always_use_retrieval=False, keep_local_context=True ) - theorem_name = "test3" + theorem_name = get_theorem_name_resembling(file_path, "test3") language = ProofAction.Language.LEAN4 always_retrieve_thms = False retrieval_strategy = ProofEnvReRankStrategy.NO_RE_RANK From 78ed4a3024cc8ceda64cb1b9a883e552f792ed49 Mon Sep 17 00:00:00 2001 From: Amitayush Thakur Date: Sat, 10 Jan 2026 18:45:03 +0000 Subject: [PATCH 4/7] Implement pretty_print_proof_so_far method in SimpleLean4SyncExecutor and ProofEnv; update tests to utilize new functionality --- src/itp_interface/lean/simple_lean4_sync_executor.py | 11 ++++++++--- src/itp_interface/rl/simple_proof_env.py | 5 +++++ src/itp_interface/tools/dynamic_lean4_proof_exec.py | 4 +++- src/test/test_simple_proof_env.py | 6 +++++- 4 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/itp_interface/lean/simple_lean4_sync_executor.py b/src/itp_interface/lean/simple_lean4_sync_executor.py index 2e29403..e6fc894 100644 --- a/src/itp_interface/lean/simple_lean4_sync_executor.py +++ b/src/itp_interface/lean/simple_lean4_sync_executor.py @@ -295,14 +295,20 @@ def _tactic_preprocessing(self, stmt: str, baseline_indent: int = 0) -> str: stmt = (" " * (indentation_needed - actual_indent)) + stmt.lstrip() return stmt - def _get_lean_code_with_tactics(self, idx: int, stmt: str): + def pretty_print_proof_so_far(self) -> str: assert self._last_theorem is not None, "Last theorem should not be None" - self._add_last_tactic(idx, stmt) tactics_so_far = self._get_tactics_so_far() + if len(tactics_so_far) == 0: + tactics_so_far = self.possible_proof_tactics assert len(tactics_so_far) > 0, "There should be at least one tactic so far" _ , _, theorem_stmt = self._last_theorem return theorem_stmt + "\n" + tactics_so_far + "\n" + def _get_lean_code_with_tactics(self, idx: int, stmt: str): + assert self._last_theorem is not None, "Last theorem should not be None" + self._add_last_tactic(idx, stmt) + return self.pretty_print_proof_so_far() + def _backtrack_tactic_line(self, idx: int): # identify the keys to remove self._lines_executed = self._lines_executed[:idx] @@ -784,7 +790,6 @@ def validate_proof(self, timeout_sec: int = 30, keep_temp_file: bool = True) -> # Build the complete Lean code with actual proof tactics # The proof tactics are accumulated in self.possible_proof_tactics - actual_proof = "" # Track the actual proof for sorry checking proof_tactics_source = self.possible_proof_tactics # If possible_proof_tactics is empty, try to use _last_tactics as fallback diff --git a/src/itp_interface/rl/simple_proof_env.py b/src/itp_interface/rl/simple_proof_env.py index 578c006..3a99e49 100644 --- a/src/itp_interface/rl/simple_proof_env.py +++ b/src/itp_interface/rl/simple_proof_env.py @@ -243,6 +243,11 @@ def validate_proof_completion(self, timeout_in_secs = 120, keep_validation_file ) else: return {} + + def pretty_print_proof_so_far(self) -> str: + assert self._loaded, "Env not loaded, call reset() first" + assert isinstance(self._dynamic_proof_executor, DynamicLean4ProofExecutor), "Dynamic proof executor must be of type DynamicLean4ProofExecutor" + return self._dynamic_proof_executor.pretty_print_proof_so_far() def checkpoint(self): return super().checkpoint() diff --git a/src/itp_interface/tools/dynamic_lean4_proof_exec.py b/src/itp_interface/tools/dynamic_lean4_proof_exec.py index 5f91aba..a1d2d18 100644 --- a/src/itp_interface/tools/dynamic_lean4_proof_exec.py +++ b/src/itp_interface/tools/dynamic_lean4_proof_exec.py @@ -181,7 +181,9 @@ def cancel_tactic_till_line(self, tactic_line_num: int, no_backtracking: bool = if tactic_line_num < self.line_num: line_tactic_map_keys = list(self.run_state.line_tactic_map.keys()) lowest_line_num_gt_eq_tactic_line = min([ln for ln in line_tactic_map_keys if ln >= tactic_line_num], default=None) - assert lowest_line_num_gt_eq_tactic_line is not None, "There must be at least one line number >= tactic_line_num" + if lowest_line_num_gt_eq_tactic_line is None: + # Nothing to backtrack + return False tactic_line_num = lowest_line_num_gt_eq_tactic_line state_num = self.run_state.line_tactic_map[tactic_line_num] self.run_state.tactics_ran = self.run_state.tactics_ran[:state_num] diff --git a/src/test/test_simple_proof_env.py b/src/test/test_simple_proof_env.py index 7c4b791..da290d0 100644 --- a/src/test/test_simple_proof_env.py +++ b/src/test/test_simple_proof_env.py @@ -63,7 +63,8 @@ def main(): file_path=file_path, language=ProofAction.Language.LEAN4, always_use_retrieval=False, - keep_local_context=True + keep_local_context=True, + starting_tactic_sequence=["by", "apply And.intro"] ) theorem_name = get_theorem_name_resembling(file_path, "test3") language = ProofAction.Language.LEAN4 @@ -96,6 +97,9 @@ def main(): env.render() if not done: action = scan_action(language, supported_actions) + if env.language == ProofAction.Language.LEAN4: + print("Final proof so far:") + print(env.pretty_print_proof_so_far()) finally: if language == ProofAction.Language.ISABELLE: IsabelleExecutor.stop_server() From 2ca570a0316e2c9c4db73094471f2080f686340e Mon Sep 17 00:00:00 2001 From: Amitayush Thakur Date: Sat, 10 Jan 2026 18:45:13 +0000 Subject: [PATCH 5/7] Bump version to 1.5.0 in pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d5f9ded..c08ce53 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ requires = [ build-backend = "hatchling.build" [project] name = "itp_interface" -version = "1.4.0" +version = "1.5.0" authors = [ { name="Amitayush Thakur", email="amitayush@utexas.edu" }, ] From 3cb22ce7aca2048c97627e434795521ab68eded4 Mon Sep 17 00:00:00 2001 From: Amitayush Thakur Date: Sat, 10 Jan 2026 18:58:48 +0000 Subject: [PATCH 6/7] Add file locking mechanism to SimpleLean4SyncExecutor for cache management --- .../lean/simple_lean4_sync_executor.py | 84 +++++++++++-------- 1 file changed, 47 insertions(+), 37 deletions(-) diff --git a/src/itp_interface/lean/simple_lean4_sync_executor.py b/src/itp_interface/lean/simple_lean4_sync_executor.py index e6fc894..deb99aa 100644 --- a/src/itp_interface/lean/simple_lean4_sync_executor.py +++ b/src/itp_interface/lean/simple_lean4_sync_executor.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +from contextlib import nullcontext import os import base64 import copy @@ -11,6 +12,7 @@ import typing import bisect import subprocess +from filelock import FileLock from itp_interface.lean.tactic_parser import ( TacticParser, ErrorInfo, @@ -610,48 +612,53 @@ def _skip_to_theorem(self, theorem: str): assert self.main_file_iter is not None, "main_file_iter should not be None" assert self.tactic_parser is not None, "Tactic parser is not initialized" extraction_path = self._get_file_path_in_cache() + lock_file_path = self._get_lock_file_path_in_cache(extraction_path) if extraction_path is not None else None + # Use FileLock otherwise just a pass-through context manager + lock = FileLock(lock_file_path, timeout=self.timeout_in_sec*10) if lock_file_path is not None else nullcontext() can_fetch_from_cache = extraction_path is not None file_dep_analysis = None - if can_fetch_from_cache: - file_path = self.associated_file() - assert file_path is not None, "File path should not be None" - if os.path.exists(extraction_path): - temp_extraction_path_file = open(extraction_path, "r", encoding="utf-8") - file_dep_analysis_str = temp_extraction_path_file.read() - file_dep_analysis = [FileDependencyAnalysis.load_from_string(file_dep_analysis_str)] + with lock: + if can_fetch_from_cache: + file_path = self.associated_file() + assert file_path is not None, "File path should not be None" + if os.path.exists(extraction_path): + temp_extraction_path_file = open(extraction_path, "r", encoding="utf-8") + file_dep_analysis_str = temp_extraction_path_file.read() + file_dep_analysis = [FileDependencyAnalysis.load_from_string(file_dep_analysis_str)] + else: + file_dep_analysis = self.extract_all_theorems_and_definitions( + json_output_path=extraction_path, file_path=file_path) + assert file_dep_analysis is not None, "File dependency analysis should not be None" + assert len(file_dep_analysis) > 0, "File dependency analysis should not be empty" + + temp_extraction_path_file = open(extraction_path, "w", encoding="utf-8") + with temp_extraction_path_file: + temp_extraction_path_file.write(file_dep_analysis[0].to_json()) + with open(file_path, "r", encoding="utf-8") as f: + lines = f.readlines() else: + while True: + try: + stmt = next(self.main_file_iter) + except StopIteration: + break + lines.append(stmt) + full_file = "\n".join(lines) + "\n" + # Dump the file in the temp file + with open(self.temp_file_full_path, "w", encoding="utf-8") as f: + f.write(full_file) + file_path = self.temp_file_full_path + temp_extraction_path_file = NamedTemporaryFile( + prefix="lean4_dep_analysis_", + suffix=".json", + delete_on_close=True) + extraction_path = temp_extraction_path_file.name file_dep_analysis = self.extract_all_theorems_and_definitions( json_output_path=extraction_path, file_path=file_path) - assert file_dep_analysis is not None, "File dependency analysis should not be None" - assert len(file_dep_analysis) > 0, "File dependency analysis should not be empty" - temp_extraction_path_file = open(extraction_path, "w", encoding="utf-8") - with temp_extraction_path_file: - temp_extraction_path_file.write(file_dep_analysis[0].to_json()) - with open(file_path, "r", encoding="utf-8") as f: - lines = f.readlines() - else: - while True: - try: - stmt = next(self.main_file_iter) - except StopIteration: - break - lines.append(stmt) - full_file = "\n".join(lines) + "\n" - # Dump the file in the temp file - with open(self.temp_file_full_path, "w", encoding="utf-8") as f: - f.write(full_file) - file_path = self.temp_file_full_path - temp_extraction_path_file = NamedTemporaryFile( - prefix="lean4_dep_analysis_", - suffix=".json", - delete_on_close=True) - extraction_path = temp_extraction_path_file.name - file_dep_analysis = self.extract_all_theorems_and_definitions( - json_output_path=extraction_path, file_path=file_path) - temp_extraction_path_file.close() - # Remove the temp file - if os.path.exists(self.temp_file_full_path): - os.remove(self.temp_file_full_path) + temp_extraction_path_file.close() + # Remove the temp file + if os.path.exists(self.temp_file_full_path): + os.remove(self.temp_file_full_path) assert file_dep_analysis is not None, "File dependency analysis should not be None" assert len(file_dep_analysis) > 0, "File dependency analysis should not be empty" preprocess_declarations(file_dep_analysis) @@ -755,6 +762,9 @@ def _get_file_path_in_cache(self) -> str | None: fp_encoded = f"b64_{fp_encoded}" file_name_with_timestamp = f"{fp_encoded}_{file_mtime_str}.json" return os.path.join(temp_cache_dir, file_name_with_timestamp) + + def _get_lock_file_path_in_cache(self, file_path: str) -> str: + return file_path + ".lock" def validate_proof(self, timeout_sec: int = 30, keep_temp_file: bool = True) -> typing.Dict[str, typing.Any]: """ From 0ec2b84ef84876556ab2ea0330153ee22b732c63 Mon Sep 17 00:00:00 2001 From: Amitayush Thakur Date: Sat, 10 Jan 2026 19:51:34 +0000 Subject: [PATCH 7/7] Enhance GitHub Actions workflow by adding cleanup steps for conda, pip, and apt caches to free up disk space --- .../github-build-actions-python314t.yaml | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/.github/workflows/github-build-actions-python314t.yaml b/.github/workflows/github-build-actions-python314t.yaml index 4f90686..b864c54 100644 --- a/.github/workflows/github-build-actions-python314t.yaml +++ b/.github/workflows/github-build-actions-python314t.yaml @@ -26,17 +26,21 @@ jobs: run: | apt-get update apt-get install -y wget + apt-get clean + rm -rf /var/lib/apt/lists/* wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O /tmp/miniconda.sh bash /tmp/miniconda.sh -b -p $HOME/miniconda - rm /tmp/miniconda.sh + rm -f /tmp/miniconda.sh export PATH="$HOME/miniconda/bin:$PATH" conda init bash + conda clean -all -y - name: Create Python 3.14 free-threading conda environment shell: bash run: | export PATH="$HOME/miniconda/bin:$PATH" conda create -n py314-ft python=3.14 python-freethreading -c conda-forge -y + conda clean -all -y - name: Check Python version and GIL status shell: bash @@ -53,6 +57,7 @@ jobs: source $HOME/miniconda/bin/activate py314-ft python -m pip install --upgrade pip pip install build==1.3.0 hatchling==1.27.0 + pip cache purge - name: Build package with hatchling shell: bash @@ -67,6 +72,9 @@ jobs: export PATH="$HOME/miniconda/bin:$PATH" source $HOME/miniconda/bin/activate py314-ft pip install dist/*.whl + # Clean up build artifacts and pip cache + rm -rf build/ dist/ *.egg-info + pip cache purge - name: Install Lean (elan) and prepare Lean REPL shell: bash @@ -84,6 +92,30 @@ jobs: source $HOME/.elan/env install-itp-interface + - name: Free up disk space before Lean build + shell: bash + run: | + echo "Disk space before cleanup:" + df -h + # Clean conda caches again + export PATH="$HOME/miniconda/bin:$PATH" + conda clean -all -y + # Clean pip cache + pip cache purge || true + # Clean apt cache + apt-get clean + rm -rf /var/lib/apt/lists/* + # Clean tmp directory + rm -rf /tmp/* 2>/dev/null || true + # Clean any build artifacts + rm -rf build/ dist/ *.egg-info 2>/dev/null || true + # Clean logs + rm -rf .log 2>/dev/null || true + # Clean GitHub runner logs if accessible + rm -rf /home/runner/actions-runner/cached/_diag/*.log 2>/dev/null || true + echo "Disk space after cleanup:" + df -h + - name: Build lean4_proj shell: bash run: |