Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
d9b7b56
issue/planned-issue-target-repo: add target_repo field to PlannedIssue
Feb 18, 2026
9acce01
issue/daaccc55-01-core-schemas: Add RepoSpec, WorkspaceRepo, Workspac…
Feb 18, 2026
976289b
Merge issue/daaccc55-01-core-schemas: core-schemas
Feb 18, 2026
03823fe
Merge issue/daaccc55-02-planned-issue-target-repo: planned-issue-targ…
Feb 18, 2026
2e83a5d
issue/coding-loop-repo-name: propagate CoderResult.repo_name to Issue…
Feb 18, 2026
611d942
issue/clone-repos: Add _clone_repos async function and multi-repo bui…
Feb 18, 2026
86f1338
issue/daaccc55-03-workspace-context-utils: Add workspace_context_bloc…
Feb 18, 2026
d99808a
issue/dag-executor-multi-repo: add _init_all_repos and multi-repo mer…
Feb 18, 2026
cf82e24
Merge issue/daaccc55-03-workspace-context-utils: workspace_context_bl…
Feb 18, 2026
aee3605
Merge issue/daaccc55-04-clone-repos: async _clone_repos and multi-rep…
Feb 18, 2026
bca99da
Resolve conflict: merge execute() workspace_manifest docstring from c…
Feb 18, 2026
2d83f61
Merge issue/daaccc55-06-coding-loop-repo-name: propagate repo_name fr…
Feb 18, 2026
fd0baf9
issue/smoke-test-schema-contracts: add smoke tests for AC-01 through …
Feb 18, 2026
2d92fa7
issue/prompt-signatures: add workspace_manifest to all prompt functio…
Feb 18, 2026
36f7333
Merge issue/daaccc55-07-prompt-signatures: update prompt functions wi…
Feb 18, 2026
d31e05a
Merge issue/daaccc55-08-smoke-test-schema-contracts: smoke tests for …
Feb 18, 2026
146b100
chore: clean up repo after merge
Feb 18, 2026
6485f5c
issue/full-integration-test: add integration test suite for all 25 PR…
Feb 18, 2026
7580884
Merge issue/daaccc55-09-full-integration-test: full integration test …
Feb 18, 2026
9f7328b
chore: finalize repo for handoff
Feb 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
720 changes: 0 additions & 720 deletions examples/pyrust/.claude/plans/inherited-stirring-glacier.md

This file was deleted.

387 changes: 303 additions & 84 deletions swe_af/app.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions swe_af/execution/coding_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,7 @@ async def run_coding_loop(
branch_name=branch_name,
attempts=iteration,
iteration_history=iteration_history,
repo_name=coder_result.get("repo_name", ""),
)

if action == "block":
Expand Down
294 changes: 250 additions & 44 deletions swe_af/execution/dag_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
LevelResult,
ReplanAction,
ReplanDecision,
WorkspaceManifest,
)

# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -134,72 +135,178 @@ async def _merge_level_branches(
) -> dict | None:
"""Merge completed branches into the integration branch.

Single-repo path (workspace_manifest is None): merges all completed
branches into dag_state.git_integration_branch as before.

Multi-repo path (workspace_manifest is set): groups completed IssueResults
by repo_name and dispatches one run_merger call per repo concurrently via
asyncio.gather.

Returns the MergeResult dict, or None if nothing to merge.
"""
completed_branches = []
# --- Single-repo path: unchanged ---
if dag_state.workspace_manifest is None:
completed_branches = []
for r in level_result.completed:
if r.branch_name:
issue_desc = issue_by_name.get(r.issue_name, {}).get("description", "")
completed_branches.append({
"branch_name": r.branch_name,
"issue_name": r.issue_name,
"result_summary": r.result_summary,
"files_changed": r.files_changed,
"issue_description": issue_desc,
})

if not completed_branches:
return None

if note_fn:
branch_names = [b["branch_name"] for b in completed_branches]
note_fn(
f"Merging {len(completed_branches)} branches: {branch_names}",
tags=["execution", "merge", "start"],
)

merge_kwargs = dict(
repo_path=dag_state.repo_path,
integration_branch=dag_state.git_integration_branch,
branches_to_merge=completed_branches,
file_conflicts=file_conflicts,
prd_summary=dag_state.prd_summary,
architecture_summary=dag_state.architecture_summary,
artifacts_dir=dag_state.artifacts_dir,
level=level_result.level_index,
model=config.merger_model,
ai_provider=config.ai_provider,
)

merge_result = await call_fn(f"{node_id}.run_merger", **merge_kwargs)

# Retry once on failure (handles transient auth errors, network blips)
if not merge_result.get("success") and merge_result.get("failed_branches"):
if note_fn:
note_fn(
"Merge failed, retrying once...",
tags=["execution", "merge", "retry"],
)
merge_result = await call_fn(f"{node_id}.run_merger", **merge_kwargs)

dag_state.merge_results.append(merge_result)
for b in merge_result.get("merged_branches", []):
if b not in dag_state.merged_branches:
dag_state.merged_branches.append(b)

# Record unmerged branches for visibility
for b in merge_result.get("failed_branches", []):
if b not in dag_state.unmerged_branches:
dag_state.unmerged_branches.append(b)

if note_fn:
note_fn(
f"Merge complete: merged={merge_result.get('merged_branches', [])}, "
f"failed={merge_result.get('failed_branches', [])}",
tags=["execution", "merge", "complete"],
)

return merge_result

# --- Multi-repo path: group by repo_name, one merger call per repo ---
manifest = WorkspaceManifest(**dag_state.workspace_manifest)

# Group IssueResults by repo_name (fall back to primary if empty)
by_repo: dict[str, list] = {}
for r in level_result.completed:
if r.branch_name:
issue_desc = issue_by_name.get(r.issue_name, {}).get("description", "")
completed_branches.append({
"branch_name": r.branch_name,
"issue_name": r.issue_name,
"result_summary": r.result_summary,
"files_changed": r.files_changed,
"issue_description": issue_desc,
})
repo = r.repo_name or manifest.primary_repo_name
by_repo.setdefault(repo, []).append(r)

if not completed_branches:
if not by_repo:
return None

if note_fn:
branch_names = [b["branch_name"] for b in completed_branches]
note_fn(
f"Merging {len(completed_branches)} branches: {branch_names}",
f"Multi-repo merge: dispatching to {list(by_repo.keys())}",
tags=["execution", "merge", "start"],
)

merge_kwargs = dict(
repo_path=dag_state.repo_path,
integration_branch=dag_state.git_integration_branch,
branches_to_merge=completed_branches,
file_conflicts=file_conflicts,
prd_summary=dag_state.prd_summary,
architecture_summary=dag_state.architecture_summary,
artifacts_dir=dag_state.artifacts_dir,
level=level_result.level_index,
model=config.merger_model,
ai_provider=config.ai_provider,
)
async def _call_merger_for_repo(
repo_name: str,
issue_results: list,
) -> dict:
"""Invoke run_merger for a single repo."""
ws_repo = next(
(r for r in manifest.repos if r.repo_name == repo_name), None
)
if ws_repo is None or ws_repo.git_init_result is None:
return {"success": False, "merged_branches": [], "failed_branches": []}

merge_result = await call_fn(f"{node_id}.run_merger", **merge_kwargs)
git_init = ws_repo.git_init_result
integration_branch = git_init.get("integration_branch", "")
if not integration_branch:
return {"success": False, "merged_branches": [], "failed_branches": []}

# Retry once on failure (handles transient auth errors, network blips)
if not merge_result.get("success") and merge_result.get("failed_branches"):
if note_fn:
note_fn(
"Merge failed, retrying once...",
tags=["execution", "merge", "retry"],
)
merge_result = await call_fn(f"{node_id}.run_merger", **merge_kwargs)
branches_to_merge = [
{
"branch_name": r.branch_name,
"issue_name": r.issue_name,
"result_summary": r.result_summary,
"files_changed": r.files_changed,
"issue_description": issue_by_name.get(r.issue_name, {}).get("description", ""),
}
for r in issue_results
]

dag_state.merge_results.append(merge_result)
for b in merge_result.get("merged_branches", []):
if b not in dag_state.merged_branches:
dag_state.merged_branches.append(b)
result = await call_fn(
f"{node_id}.run_merger",
repo_path=ws_repo.absolute_path,
integration_branch=integration_branch,
branches_to_merge=branches_to_merge,
file_conflicts=file_conflicts,
prd_summary=dag_state.prd_summary,
architecture_summary=dag_state.architecture_summary,
artifacts_dir=dag_state.artifacts_dir,
level=level_result.level_index,
model=config.merger_model,
ai_provider=config.ai_provider,
)
return result

# Record unmerged branches for visibility
for b in merge_result.get("failed_branches", []):
if b not in dag_state.unmerged_branches:
dag_state.unmerged_branches.append(b)
# Dispatch all repo merges concurrently
tasks = [
_call_merger_for_repo(repo_name, issues)
for repo_name, issues in by_repo.items()
]
repo_names = list(by_repo.keys())
results = await asyncio.gather(*tasks, return_exceptions=True)

last_good: dict | None = None
for i, result in enumerate(results):
if isinstance(result, Exception):
if note_fn:
note_fn(
f"Merge failed for repo '{repo_names[i]}': {result}",
tags=["execution", "merge", "error"],
)
continue
dag_state.merge_results.append({**result, "repo_name": repo_names[i]})
for b in result.get("merged_branches", []):
if b not in dag_state.merged_branches:
dag_state.merged_branches.append(b)
for b in result.get("failed_branches", []):
if b not in dag_state.unmerged_branches:
dag_state.unmerged_branches.append(b)
if result.get("success"):
last_good = result

if note_fn:
note_fn(
f"Merge complete: merged={merge_result.get('merged_branches', [])}, "
f"failed={merge_result.get('failed_branches', [])}",
f"Multi-repo merge complete: repos={repo_names}, "
f"merged={dag_state.merged_branches}",
tags=["execution", "merge", "complete"],
)

return merge_result
return last_good


async def _run_integration_tests(
Expand Down Expand Up @@ -335,6 +442,88 @@ async def _cleanup_worktrees(
)


async def _init_all_repos(
dag_state: DAGState,
call_fn: Callable,
node_id: str,
git_model: str,
ai_provider: str,
permission_mode: str = "",
build_id: str = "",
note_fn: Callable | None = None,
) -> None:
"""Run git_init concurrently for all repos in workspace_manifest.

When ``dag_state.workspace_manifest`` is None (single-repo path), returns
immediately without invoking call_fn.

After successful completion, ``dag_state.workspace_manifest`` is updated
with ``git_init_result`` populated on each WorkspaceRepo entry.

Args:
dag_state: Mutated in-place. dag_state.workspace_manifest must be a
dict (WorkspaceManifest.model_dump()) set before calling.
call_fn: AgentField call function for invoking run_git_init.
node_id: e.g. 'swe-planner'.
git_model: Resolved model string. Source: config.git_model.
ai_provider: 'claude' or 'opencode'. Source: config.ai_provider.
permission_mode: Forwarded to run_git_init.
build_id: Forwarded to run_git_init for branch namespace isolation.
note_fn: Optional callback for observability.
"""
if dag_state.workspace_manifest is None:
return # single-repo path: git_init already ran in build()

manifest = WorkspaceManifest(**dag_state.workspace_manifest)

if note_fn:
repo_names = [r.repo_name for r in manifest.repos]
note_fn(
f"Initialising git for {len(manifest.repos)} repos: {repo_names}",
tags=["execution", "init_all_repos", "start"],
)

async def _init_one(ws_repo) -> tuple[str, dict]:
result = await call_fn(
f"{node_id}.run_git_init",
repo_path=ws_repo.absolute_path,
goal="", # goal not needed for dependency repos
artifacts_dir=dag_state.artifacts_dir,
model=git_model,
permission_mode=permission_mode,
ai_provider=ai_provider,
build_id=build_id,
)
return ws_repo.repo_name, result

tasks = [_init_one(r) for r in manifest.repos]
results = await asyncio.gather(*tasks, return_exceptions=True)

# Write results back (WorkspaceRepo is mutable: model_config = ConfigDict(frozen=False))
repo_map = {r.repo_name: r for r in manifest.repos}
for item in results:
if isinstance(item, Exception):
# Non-fatal: single-repo git_init failure is already non-fatal
if note_fn:
note_fn(
f"git_init failed for a repo (non-fatal): {item}",
tags=["execution", "init_all_repos", "error"],
)
continue
name, git_init_dict = item
if name in repo_map:
repo_map[name].git_init_result = git_init_dict

# Replace dag_state manifest dict with updated version
dag_state.workspace_manifest = manifest.model_dump()

if note_fn:
note_fn(
"git init complete for all repos",
tags=["execution", "init_all_repos", "complete"],
)


# ---------------------------------------------------------------------------
# Checkpoint helpers
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -811,6 +1000,9 @@ async def _execute_level(
)
level_result.failed.append(issue_result)
elif isinstance(result, IssueResult):
# Backfill repo_name from issue's target_repo if CoderResult didn't set it
if not result.repo_name:
result.repo_name = active_issues[i].get("target_repo", "")
if result.outcome in (IssueOutcome.COMPLETED, IssueOutcome.COMPLETED_WITH_DEBT):
level_result.completed.append(result)
elif result.outcome == IssueOutcome.SKIPPED:
Expand Down Expand Up @@ -982,6 +1174,7 @@ async def run_dag(
git_config: dict | None = None,
resume: bool = False,
build_id: str = "",
workspace_manifest: dict | None = None,
) -> DAGState:
"""Execute a planned DAG with self-healing replanning.

Expand Down Expand Up @@ -1027,6 +1220,7 @@ async def call_fn(target: str, **kwargs):
return unwrap_call_result(result, target)

dag_state = _init_dag_state(plan_result, repo_path, git_config=git_config, build_id=build_id)
dag_state.workspace_manifest = workspace_manifest
dag_state.max_replans = config.max_replans

# Resume from checkpoint if requested
Expand Down Expand Up @@ -1055,6 +1249,18 @@ async def call_fn(target: str, **kwargs):
# Save initial checkpoint
_save_checkpoint(dag_state, note_fn)

# Per-repo git init for multi-repo builds
if workspace_manifest and call_fn:
await _init_all_repos(
dag_state=dag_state,
call_fn=call_fn,
node_id=node_id,
git_model=config.git_model,
ai_provider=config.ai_provider,
build_id=build_id,
note_fn=note_fn,
)

# Shared memory store for cross-issue learning within this run.
# All issues share the same store via the memory_fn closure.
_shared_memory: dict = {}
Expand Down
Loading
Loading