chore: initial monorepo scaffold + WDS Phase 1+2 artifacts

- Nx 22.7 monorepo (pnpm 11.1, TypeScript 5.9, Node 24)
- apps/api: NestJS 11 (CJS conforme CODING-RULES.md PGD-DB-004)
- apps/web: React 19 + Vite 8 (ESM)
- libs/shared/api-interface: Zod contract base
- Docker Compose dev: Postgres 18, Valkey 8, MinIO, Mailpit
- WDS artifacts:
  - design-artifacts/A-Product-Brief/ (5 docs canônicos + 16 dialogs)
  - design-artifacts/B-Trigger-Map/ (hub + 4 personas + feature impact)
- Stack canon: STACK.md v2.2 + CODING-RULES.md v2.0 + brand.md
- AGENTS.md + README.md como entrada para devs/agentes

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-27 14:34:20 +00:00
commit 17c08e6392
3631 changed files with 855518 additions and 0 deletions

View File

@@ -0,0 +1,115 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.9"
# ///
"""Detect Docker and build the bmad-eval-runner image when needed.
Usage:
python3 docker_setup.py --check # exit 0 if image is ready, 1 otherwise
python3 docker_setup.py --build # build the image (no-op if present)
python3 docker_setup.py --rebuild # force rebuild
"""
from __future__ import annotations
import argparse
import json
import shutil
import subprocess
import sys
from pathlib import Path
# Default tag used by image_present() and build_image().
IMAGE_TAG = "bmad-eval-runner:latest"
# Directory containing this script (symlinks resolved).
SCRIPT_DIR = Path(__file__).resolve().parent
# Dockerfile in the sibling `assets/` directory; build_image() also uses its
# parent directory as the build context.
DOCKERFILE = SCRIPT_DIR.parent / "assets" / "Dockerfile"
def docker_available() -> tuple[bool, str]:
    """Report whether the Docker CLI exists and `docker info` succeeds.

    Returns:
        ``(True, "ok")`` when ``docker`` is on PATH and ``docker info`` exits
        with status 0 within 5 seconds; otherwise ``(False, reason)`` with a
        short human-readable explanation.
    """
    if not shutil.which("docker"):
        return False, "docker CLI not found on PATH"

    try:
        probe = subprocess.run(
            ["docker", "info"],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except subprocess.TimeoutExpired:
        return False, "`docker info` timed out"
    except Exception as exc:
        return False, f"docker check error: {exc}"

    if probe.returncode == 0:
        return True, "ok"

    # Surface only the last stderr line; it normally carries the actual error.
    stderr_text = probe.stderr.strip()
    detail = stderr_text.splitlines()[-1] if stderr_text else "unknown"
    return False, f"`docker info` failed: {detail}"
def image_present(tag: str = IMAGE_TAG) -> bool:
    """Return True when `docker image inspect <tag>` exits with status 0.

    Any failure to run the command (missing CLI, timeout after 10 seconds,
    other errors) is reported as False.
    """
    inspect_cmd = ["docker", "image", "inspect", tag]
    try:
        completed = subprocess.run(
            inspect_cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=10,
        )
    except Exception:
        return False
    return completed.returncode == 0
def build_image(tag: str = IMAGE_TAG, force: bool = False, verbose: bool = True) -> int:
    """Run `docker build` for the runner image.

    Args:
        tag: Image tag passed to ``-t``.
        force: When true, add ``--no-cache`` to the build command.
        verbose: When true, announce the build and forward the build's stdout
            to this process's stderr; otherwise the build's stdout is discarded.
            The build's stderr is always forwarded.

    Returns:
        2 when the Dockerfile is missing, otherwise the exit status of
        ``docker build``.
    """
    if not DOCKERFILE.is_file():
        print(f"Dockerfile missing at {DOCKERFILE}", file=sys.stderr)
        return 2

    cache_flags = ["--no-cache"] if force else []
    build_cmd = [
        "docker", "build", *cache_flags,
        "-t", tag,
        "-f", str(DOCKERFILE),
        str(DOCKERFILE.parent),
    ]

    if verbose:
        print(f"Building {tag} from {DOCKERFILE} ...", file=sys.stderr)
        build_stdout = sys.stderr
    else:
        build_stdout = subprocess.DEVNULL

    return subprocess.run(build_cmd, stdout=build_stdout, stderr=sys.stderr).returncode
def main() -> int:
    """CLI entry point: check, build, or rebuild the runner image.

    Returns:
        With ``--check``: 0 when Docker is available and the image is present,
        else 1. With ``--build``/``--rebuild``: 3 when Docker is unavailable,
        0 when ``--build`` finds the image already present, otherwise the
        result of ``build_image``.
    """
    parser = argparse.ArgumentParser(description="Manage the bmad-eval-runner Docker image")
    modes = parser.add_mutually_exclusive_group(required=True)
    modes.add_argument("--check", action="store_true", help="Report status as JSON; exit 0 if image is ready")
    modes.add_argument("--build", action="store_true", help="Build the image (no-op if already present)")
    modes.add_argument("--rebuild", action="store_true", help="Force rebuild")
    parser.add_argument("--quiet", action="store_true")
    args = parser.parse_args()

    available, reason = docker_available()
    # Only query the image when the daemon is reachable.
    present = available and image_present()

    if args.check:
        status = {
            "docker_available": available,
            "docker_reason": reason,
            "image_present": present,
            "image_tag": IMAGE_TAG,
        }
        print(json.dumps(status, indent=2))
        return 0 if (available and present) else 1

    if not available:
        print(f"Docker is not available: {reason}", file=sys.stderr)
        return 3

    verbose = not args.quiet
    if args.rebuild:
        return build_image(force=True, verbose=verbose)
    if not args.build:
        return 0
    if not present:
        return build_image(force=False, verbose=verbose)
    if verbose:
        print(f"{IMAGE_TAG} already present; skipping build (use --rebuild to force).", file=sys.stderr)
    return 0


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -0,0 +1,184 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.9"
# ///
"""Generate an aggregate HTML report for a run folder.
Reads run.json, execution-summary.json, each <eval-id>/grading.json (if present),
and triggers-result.json (if present), then renders a single-file HTML report.
Usage:
python3 generate_report.py --run-dir PATH [-o report.html]
"""
from __future__ import annotations
import argparse
import html as html_lib
import json
import sys
from pathlib import Path
def esc(s: object) -> str:
    """Return ``str(s)`` with HTML-special characters (quotes included) escaped."""
    text = str(s)
    return html_lib.escape(text, quote=True)
def load(path: Path) -> dict | list | None:
    """Best-effort JSON loader for report inputs.

    Args:
        path: File expected to contain UTF-8 encoded JSON.

    Returns:
        The decoded JSON value, or ``None`` when the file is missing, cannot be
        read, is not valid UTF-8, or is not valid JSON.
    """
    if not path.is_file():
        return None
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError;
        # OSError covers files that vanish or are unreadable. A damaged input
        # should degrade the report, not abort it.
        return None
def _render_eval_section(run_dir: Path, res: dict) -> tuple[str, int, int]:
    """Render one eval's <section>; return (html, expectations_passed, expectations_total)."""
    eval_id = str(res.get("eval_id", "?"))
    eval_dir = run_dir / eval_id

    grading = load(eval_dir / "grading.json")
    if not isinstance(grading, dict):
        grading = {}
    metrics = res.get("metrics") or load(eval_dir / "metrics.json") or {}
    if not isinstance(metrics, dict):
        metrics = {}

    rows: list[str] = []
    passed_count = 0
    for exp in grading.get("expectations", []):
        passed = bool(exp.get("passed"))
        passed_count += passed
        css = "pass" if passed else "fail"
        # The first cell carries the visible pass/fail marker that the
        # `tr.pass td:first-child` / `tr.fail td:first-child` rules colour.
        mark = "✓" if passed else "✗"
        rows.append(
            f'<tr class="{css}"><td>{mark}</td>'
            f'<td>{esc(exp.get("text", ""))}</td>'
            f'<td>{esc(exp.get("evidence", ""))}</td></tr>'
        )

    feedback = grading.get("eval_feedback") or {}
    feedback_html = ""
    if feedback:
        items = "".join(
            f"<li><strong>{esc(s.get('assertion', '(general)'))}</strong>: "
            f"{esc(s.get('reason', ''))}</li>"
            for s in feedback.get("suggestions") or []
        )
        suggestions_html = f"<ul>{items}</ul>" if items else ""
        feedback_html = (
            '<details class="feedback"><summary>Grader feedback on the evals</summary>'
            f'<p>{esc(feedback.get("overall", ""))}</p>'
            f"{suggestions_html}"
            "</details>"
        )

    artifacts_html = '<p class="muted">No artifacts captured.</p>'
    artifacts_dir = eval_dir / "artifacts"
    if artifacts_dir.is_dir():
        files = sorted(p for p in artifacts_dir.rglob("*") if p.is_file())
        if files:
            artifacts_html = "<ul>" + "".join(
                f"<li><code>{esc(p.relative_to(eval_dir))}</code> "
                f'<span class="muted">({p.stat().st_size}b)</span></li>'
                for p in files
            ) + "</ul>"

    tool_calls = metrics.get("tool_calls") or {}
    tool_summary = ", ".join(f"{k}={v}" for k, v in sorted(tool_calls.items())) or "none"

    if rows:
        table_html = (
            "<table><thead><tr><th></th><th>Expectation</th><th>Evidence</th></tr></thead><tbody>"
            + "".join(rows)
            + "</tbody></table>"
        )
    else:
        table_html = '<p class="muted">No grading.json yet.</p>'

    rc = res.get("return_code")
    section = f"""
<section class="eval">
<h3>Eval {esc(eval_id)} <span class="muted">rc={esc(rc)} · {esc(metrics.get('elapsed_s', '?'))}s</span></h3>
<p class="muted">Tool calls: {esc(tool_summary)} · output {esc(metrics.get('output_chars', 0))}b · transcript {esc(metrics.get('transcript_chars', 0))}b</p>
{table_html}
{feedback_html}
<details><summary>Artifacts</summary>{artifacts_html}</details>
</section>
"""
    return section, passed_count, len(rows)


def _render_triggers_section(triggers: dict) -> str:
    """Render the trigger-eval results table from a decoded triggers-result.json."""
    rows: list[str] = []
    for r in triggers.get("results", []):
        passed = bool(r.get("pass"))
        css = "pass" if passed else "fail"
        mark = "✓" if passed else "✗"
        # Missing fields render as neutral defaults instead of raising KeyError.
        rate = float(r.get("trigger_rate") or 0)
        rows.append(
            f'<tr class="{css}"><td>{mark}</td>'
            f'<td>{esc(r.get("query", ""))}</td>'
            f'<td>{esc(r.get("should_trigger", ""))}</td>'
            f'<td>{esc(r.get("triggers", 0))}/{esc(r.get("runs", 0))} ({rate:.2f})</td></tr>'
        )
    summary = triggers.get("summary") or {}
    return f"""
<section class="triggers">
<h2>Trigger Evals — {esc(summary.get('passed', 0))}/{esc(summary.get('total', 0))} pass</h2>
<table><thead><tr><th></th><th>Query</th><th>Should fire</th><th>Rate</th></tr></thead>
<tbody>{''.join(rows)}</tbody></table>
</section>
"""


def render(run_dir: Path) -> str:
    """Build the single-file HTML report for a run folder.

    Reads ``run.json``, ``execution-summary.json`` and ``triggers-result.json``
    from *run_dir*, plus ``grading.json`` / ``metrics.json`` / ``artifacts/``
    under each eval's sub-directory. Missing or malformed inputs are rendered
    as empty sections rather than raising.

    Args:
        run_dir: The run folder to summarise.

    Returns:
        The complete HTML document as a string.
    """
    run_meta = load(run_dir / "run.json")
    if not isinstance(run_meta, dict):
        run_meta = {}
    exec_summary = load(run_dir / "execution-summary.json")
    if not isinstance(exec_summary, dict):
        exec_summary = {}
    triggers = load(run_dir / "triggers-result.json")

    eval_blocks: list[str] = []
    grading_total = 0
    grading_passed = 0
    for res in exec_summary.get("results", []):
        if not isinstance(res, dict):
            continue
        block, passed, total = _render_eval_section(run_dir, res)
        eval_blocks.append(block)
        grading_passed += passed
        grading_total += total

    triggers_html = ""
    if triggers and isinstance(triggers, dict):
        triggers_html = _render_triggers_section(triggers)

    artifact_summary = ""
    if exec_summary:
        artifact_summary = (
            f"<p>Executed {esc(exec_summary.get('executed', 0))} / {esc(exec_summary.get('total', 0))} "
            f"evals · {esc(exec_summary.get('exec_failures', 0))} execution failures · "
            f"grader: {grading_passed}/{grading_total} expectations passed</p>"
        )

    return f"""<!doctype html>
<html><head><meta charset="utf-8"><title>Eval Run — {esc(run_meta.get('skill_name','?'))}</title>
<style>
body {{ font: 14px/1.5 system-ui, sans-serif; max-width: 1080px; margin: 2em auto; color: #222; padding: 0 1em; }}
h1, h2, h3 {{ font-weight: 600; }}
h1 {{ font-size: 1.6em; margin-bottom: 0.2em; }}
.meta {{ color: #666; margin-bottom: 1.5em; }}
.muted {{ color: #888; font-weight: normal; }}
section.eval {{ border: 1px solid #ddd; border-radius: 6px; padding: 1em 1.2em; margin: 1em 0; background: #fafafa; }}
table {{ width: 100%; border-collapse: collapse; margin: 0.5em 0; font-size: 13px; }}
th, td {{ text-align: left; padding: 6px 8px; border-bottom: 1px solid #eee; vertical-align: top; }}
tr.pass td:first-child {{ color: #2c8a3a; font-weight: 700; }}
tr.fail td:first-child {{ color: #b3261e; font-weight: 700; }}
tr.fail {{ background: #fdf3f2; }}
details.feedback {{ margin-top: 0.6em; padding: 0.4em 0.7em; background: #fff8e1; border-radius: 4px; }}
details summary {{ cursor: pointer; font-weight: 600; }}
code {{ background: #eee; padding: 1px 4px; border-radius: 3px; }}
</style></head>
<body>
<h1>{esc(run_meta.get('skill_name','?'))} — eval run</h1>
<div class="meta">
Run id: <code>{esc(run_meta.get('run_id','?'))}</code> ·
isolation: {esc(run_meta.get('isolation','?'))} ·
started: {esc(run_meta.get('started_at','?'))}
</div>
{artifact_summary}
{''.join(eval_blocks)}
{triggers_html}
</body></html>
"""
def main() -> int:
    """CLI entry point: render the report for ``--run-dir`` and write it to disk.

    The report goes to ``-o/--output`` when given, otherwise to
    ``<run-dir>/report.html``. The output's parent directory is created when
    missing. The written path is printed to stdout.

    Returns:
        0 on success, 2 when ``--run-dir`` is not an existing directory.
    """
    parser = argparse.ArgumentParser(description="Generate HTML report for an eval run folder")
    parser.add_argument("--run-dir", required=True, type=Path)
    parser.add_argument("-o", "--output", type=Path, default=None)
    args = parser.parse_args()

    run_dir = args.run_dir.resolve()
    if not run_dir.is_dir():
        print(f"run-dir not found: {run_dir}", file=sys.stderr)
        return 2

    out = args.output or (run_dir / "report.html")
    # Render first so a rendering failure does not leave empty directories behind.
    report = render(run_dir)
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(report, encoding="utf-8")
    print(str(out))
    return 0


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -0,0 +1,171 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.9"
# ///
"""Run claude interactively via PTY so the Skill tool is available.
In `claude -p` (print mode) the Skill tool is never offered — Claude handles
everything inline. Running `claude` in interactive mode activates the Skill
tool so dependency skills installed in .claude/skills/ can be properly invoked.
The PTY tricks claude into thinking it has a terminal (interactive mode) while
we capture its stream-json output programmatically.
Usage:
python3 pty_runner.py --prompt-file /path/to/prompt.txt \\
--output /path/to/transcript.jsonl \\
[--timeout 600]
python3 pty_runner.py --prompt "Run headless. ..." --output transcript.jsonl
"""
from __future__ import annotations
import argparse
import json
import os
import pty
import re
import select
import subprocess
import sys
import time
from pathlib import Path
# Matches terminal control output: ESC followed by a single character in the
# `@`-`Z` or `\`-`_` ranges, a CSI sequence (`ESC [` + parameter bytes +
# intermediate bytes + one final byte), or a bare carriage return.
# _strip_ansi() removes every match before a line is inspected as JSON.
ANSI_RE = re.compile(r"\x1b(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])|\r")
# How long to wait for claude to initialize before sending the prompt.
# Claude loads skill registry, checks credentials, etc. on startup.
INIT_WAIT_S = 5.0
# How long to wait after the stream-json 'result' event before killing claude.
# Trailing tool-result output sometimes follows the result event.
POST_RESULT_S = 4.0
def _strip_ansi(text: str) -> str:
    """Return *text* with every ``ANSI_RE`` match removed."""
    return re.sub(ANSI_RE, "", text)
def _parse_transcript_line(raw: bytes) -> tuple[str | None, dict | None]:
    """Clean one raw PTY line; return (json_looking_text_or_None, decoded_object_or_None)."""
    line = _strip_ansi(raw.decode("utf-8", errors="replace")).strip()
    if not line.startswith("{"):
        return None, None
    try:
        obj = json.loads(line)
    except json.JSONDecodeError:
        return line, None
    return line, obj if isinstance(obj, dict) else None


def run_interactive(prompt: str, output: Path, timeout: int = 600) -> None:
    """Spawn claude interactively via PTY, send one prompt, capture transcript.

    The child is started with its stdin/stdout/stderr attached to the PTY
    slave. Output is read from the master side; every line that (after ANSI
    stripping) starts with ``{`` is kept as a transcript line. The prompt is
    written once, during the first silent select window at least
    ``INIT_WAIT_S`` seconds after start. Reading stops on timeout, when the
    PTY closes or reports EOF, or ``POST_RESULT_S`` seconds after the first
    ``result`` event.

    Args:
        prompt: Text written to the PTY, followed by a newline.
        output: Transcript path; parent directories are created as needed.
        timeout: Hard limit in seconds for the read loop.

    Raises:
        OSError: If the ``claude`` process cannot be started.
    """
    master, slave = pty.openpty()
    try:
        proc = subprocess.Popen(
            [
                "claude",
                "--output-format", "stream-json",
                "--verbose",
                "--dangerously-skip-permissions",
            ],
            stdin=slave,
            stdout=slave,
            stderr=slave,
            close_fds=True,
        )
    except Exception:
        # Do not leak the master end when the child could not be started.
        os.close(master)
        raise
    finally:
        # The parent never needs the slave end, whether or not Popen succeeded.
        os.close(slave)

    json_lines: list[str] = []
    buf = b""
    prompt_sent = False
    done_at: float | None = None
    start = time.time()
    try:
        while True:
            elapsed = time.time() - start
            if elapsed > timeout:
                print(f"[pty_runner] timeout after {elapsed:.0f}s", file=sys.stderr)
                break
            if done_at is not None and (time.time() - done_at) > POST_RESULT_S:
                break

            # Short select so we stay responsive but don't spin.
            readable, _, _ = select.select([master], [], [], 0.3)
            if not readable:
                # Silence window — send prompt once claude has had time to init.
                if not prompt_sent and (time.time() - start) >= INIT_WAIT_S:
                    os.write(master, (prompt + "\n").encode())
                    prompt_sent = True
                    print(
                        f"[pty_runner] prompt sent at t={time.time()-start:.1f}s",
                        file=sys.stderr,
                    )
                continue

            try:
                chunk = os.read(master, 8192)
            except OSError:
                break  # PTY closed — claude exited
            if not chunk:
                # EOF reported as an empty read; without this check the loop
                # would spin on an always-readable descriptor until timeout.
                break
            buf += chunk

            # Process all complete lines in buffer.
            while b"\n" in buf:
                raw, buf = buf.split(b"\n", 1)
                line, obj = _parse_transcript_line(raw)
                if line is None:
                    continue
                json_lines.append(line)
                # 'result' marks end of a claude turn.
                if obj is not None and obj.get("type") == "result" and done_at is None:
                    done_at = time.time()
                    print(
                        f"[pty_runner] result event at t={time.time()-start:.1f}s "
                        f"({len(json_lines)} lines so far)",
                        file=sys.stderr,
                    )

        # Keep a final line that arrived without a trailing newline, but only
        # if it is complete JSON (a truncated fragment would corrupt the file).
        tail, tail_obj = _parse_transcript_line(buf)
        if tail is not None and tail_obj is not None:
            json_lines.append(tail)
    finally:
        # Politely ask claude to exit, then hard-kill if needed.
        try:
            os.write(master, b"exit\n")
            time.sleep(0.3)
        except OSError:
            pass
        try:
            proc.terminate()
            proc.wait(timeout=5)
        except Exception:
            try:
                proc.kill()
                proc.wait(timeout=5)  # reap the child so no zombie is left
            except Exception:
                pass
        try:
            os.close(master)
        except OSError:
            pass

    output.parent.mkdir(parents=True, exist_ok=True)
    content = "\n".join(json_lines) + ("\n" if json_lines else "")
    output.write_text(content, encoding="utf-8")
    print(
        f"[pty_runner] wrote {len(json_lines)} transcript lines → {output}",
        file=sys.stderr,
    )
def main() -> int:
    """CLI entry point: read the prompt from ``--prompt`` or ``--prompt-file`` and run it.

    A prompt read from a file is stripped of surrounding whitespace; a prompt
    given inline is used as-is. Always returns 0.
    """
    parser = argparse.ArgumentParser(
        description="Run claude interactively via PTY and capture stream-json transcript"
    )
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument("--prompt", help="Prompt text")
    source.add_argument("--prompt-file", type=Path, help="File containing the prompt")
    parser.add_argument("--output", type=Path, required=True, help="Output .jsonl transcript file")
    parser.add_argument("--timeout", type=int, default=600, help="Hard timeout in seconds")
    opts = parser.parse_args()

    if opts.prompt_file:
        prompt_text = opts.prompt_file.read_text(encoding="utf-8").strip()
    else:
        prompt_text = opts.prompt

    run_interactive(prompt_text, opts.output, opts.timeout)
    return 0


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -0,0 +1,492 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.9"
# ///
"""Run a skill's artifact evals in isolated workspaces.
For each eval, the runner:
1. Stages a fresh workspace (Docker container or local tmp dir under ~/bmad-evals).
2. Applies the setup overlay (base then per-eval) so _bmad/ config and dependency
skills land in the workspace BEFORE the skill is staged — the skill's own copy
always wins over overlay content.
3. Copies the skill into .claude/skills/ so it is discoverable by claude.
4. Stages any fixture files declared in the eval's `files` list.
5. Runs `claude -p '<prompt>' --output-format stream-json --verbose`, capturing
the transcript. The Skill tool is available in -p mode and fires for installed
skills, so dependency skills provided by the setup overlay are properly invokable.
6. Rsyncs any files claude wrote into `<run-dir>/<eval-id>/artifacts/`.
7. Writes `metrics.json` (tool-call counts, timing, output sizes).
Grading is performed separately by the parent skill's grader subagents.
Usage:
python3 run_evals.py \\
--skill-path PATH \\
--evals-file PATH/evals.json \\
--project-root PATH \\
--output-dir PATH \\
--isolation docker|local \\
[--workers N] [--timeout SECS] [--eval-ids A1,B3] [--quiet]
"""
from __future__ import annotations
import argparse
import json
import os
import shutil
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
SCRIPT_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(SCRIPT_DIR))
from utils import ( # noqa: E402
apply_setup_overlay,
discover_setup_dirs,
new_run_id,
parse_skill_md,
read_json,
read_macos_keychain_credentials,
stage_credentials,
utc_now_iso,
write_json,
)
# Image that run_eval_docker() passes to `docker run`.
DOCKER_IMAGE = "bmad-eval-runner:latest"
# Evaluated once at import time. Handed to stage_credentials() for local runs
# and written to a mounted creds directory for docker runs when truthy.
_KEYCHAIN_CREDS: str | None = read_macos_keychain_credentials()
# Patterns left out when the project is copied into a local workspace; used
# both as rsync `--exclude` values and as shutil.ignore_patterns() arguments.
# The docker container script carries its own hard-coded exclude list.
RSYNC_EXCLUDES = (
".git", ".bare", "node_modules", ".venv", "__pycache__",
".pytest_cache", ".next", "dist", "build", ".cache",
".DS_Store", "*.pyc",
)
def stage_workspace_local(
    workspace: Path,
    project_root: Path,
    skill_path: Path,
    fixtures: list[tuple[Path, str]],
    setup_dirs: list[Path] | None = None,
) -> Path:
    """Build a clean local workspace. Returns the project root inside workspace.

    Steps, in order: copy *project_root* to ``<workspace>/project`` (rsync when
    available, otherwise ``shutil.copytree``) skipping ``RSYNC_EXCLUDES``;
    apply the setup overlay; stage the skill under ``.claude/skills/<name>``;
    copy each fixture to its relative destination.

    Args:
        workspace: Directory that will hold ``project/`` and ``.home/``.
        project_root: Source project to copy.
        skill_path: Skill directory to expose inside the workspace.
        fixtures: ``(source_file, destination_relative_to_project)`` pairs.
        setup_dirs: Optional overlay directories applied before the skill.

    Raises:
        subprocess.CalledProcessError: If rsync exits non-zero.
    """
    workspace.mkdir(parents=True, exist_ok=True)
    project_dest = workspace / "project"
    home_dir = workspace / ".home"
    (home_dir / ".claude").mkdir(parents=True, exist_ok=True)

    if shutil.which("rsync"):
        excludes = [arg for pat in RSYNC_EXCLUDES for arg in ("--exclude", pat)]
        subprocess.run(
            ["rsync", "-a", *excludes, f"{project_root}/", f"{project_dest}/"],
            check=True,
        )
    else:
        shutil.copytree(project_root, project_dest, dirs_exist_ok=True,
                        ignore=shutil.ignore_patterns(*RSYNC_EXCLUDES))

    # Apply setup overlay before staging the skill — the skill's own copy wins.
    if setup_dirs:
        apply_setup_overlay(setup_dirs, project_dest)

    skill_link_dir = project_dest / ".claude" / "skills"
    skill_link_dir.mkdir(parents=True, exist_ok=True)
    skill_dest = skill_link_dir / skill_path.name

    # A same-named entry copied from the project or laid down by the overlay
    # must not shadow the skill under test, so remove it before staging.
    # Check is_symlink() first so a link is unlinked rather than followed.
    if skill_dest.is_symlink() or skill_dest.is_file():
        skill_dest.unlink()
    elif skill_dest.is_dir():
        shutil.rmtree(skill_dest)

    try:
        os.symlink(skill_path, skill_dest)
    except OSError:
        # Symlinks may be unavailable; fall back to a full copy.
        shutil.copytree(skill_path, skill_dest, dirs_exist_ok=True)

    for src, dest_rel in fixtures:
        dest = project_dest / dest_rel
        dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(src, dest)
    return project_dest
def run_eval_local(
    eval_item: dict,
    run_dir: Path,
    skill_path: Path,
    project_root: Path,
    timeout: int,
    setup_dirs: list[Path] | None = None,
) -> dict:
    """Run one eval in a local workspace under ``<run_dir>/<eval-id>/``.

    Stages the workspace, records the prompt, runs ``claude -p`` with HOME and
    CLAUDE_CONFIG_DIR pointed at the workspace's ``.home`` directory, copies
    files that appeared in the workspace into ``artifacts/`` and writes
    ``metrics.json``.

    Returns:
        A summary dict with ``eval_id``, ``elapsed_s``, ``return_code``
        (-1 on timeout), run-relative ``transcript`` / ``artifacts_dir`` paths
        and the computed ``metrics``.
    """
    eval_id = str(eval_item.get("id", "unnamed"))
    eval_dir = run_dir / eval_id
    workspace_root = eval_dir / "workspace"
    artifacts_dir = eval_dir / "artifacts"
    transcript_path = eval_dir / "transcript.jsonl"
    for directory in (eval_dir, artifacts_dir):
        directory.mkdir(parents=True, exist_ok=True)

    staged_fixtures = resolve_fixtures(eval_item.get("files", []), project_root)
    workspace_project = stage_workspace_local(
        workspace_root, project_root, skill_path, staged_fixtures, setup_dirs
    )
    (eval_dir / "prompt.txt").write_text(eval_item["prompt"], encoding="utf-8")

    # Snapshot after staging so only files created by the run count as artifacts.
    files_before = snapshot_files(workspace_project)

    home_dir = workspace_root / ".home"
    claude_home = home_dir / ".claude"
    stage_credentials(claude_home, _KEYCHAIN_CREDS)

    child_env = {
        "HOME": str(home_dir),
        "CLAUDE_CONFIG_DIR": str(claude_home),
        "PATH": os.environ.get("PATH", ""),
        "ANTHROPIC_API_KEY": os.environ.get("ANTHROPIC_API_KEY", ""),
    }
    argv = [
        "claude",
        "-p", eval_item["prompt"],
        "--output-format", "stream-json",
        "--verbose",
        "--dangerously-skip-permissions",
    ]

    started = time.time()
    try:
        with transcript_path.open("wb") as sink:
            completed = subprocess.run(
                argv,
                stdout=sink,
                stderr=subprocess.PIPE,
                cwd=str(workspace_project),
                env=child_env,
                timeout=timeout,
            )
    except subprocess.TimeoutExpired as expired:
        elapsed = time.time() - started
        return_code = -1
        stderr_tail = f"TIMEOUT after {timeout}s"
        if expired.stderr:
            stderr_tail += "\n" + expired.stderr.decode("utf-8", errors="replace")[-2000:]
    else:
        elapsed = time.time() - started
        return_code = completed.returncode
        stderr_tail = (completed.stderr or b"").decode("utf-8", errors="replace")[-2000:]

    created = diff_workspace(workspace_project, files_before)
    sync_artifacts(workspace_project, created, artifacts_dir)

    metrics = compute_metrics(transcript_path, artifacts_dir, elapsed, return_code, stderr_tail)
    write_json(eval_dir / "metrics.json", metrics)

    return {
        "eval_id": eval_id,
        "elapsed_s": elapsed,
        "return_code": return_code,
        "transcript": str(transcript_path.relative_to(run_dir)),
        "artifacts_dir": str(artifacts_dir.relative_to(run_dir)),
        "metrics": metrics,
    }
def run_eval_docker(
    eval_item: dict,
    run_dir: Path,
    skill_path: Path,
    project_root: Path,
    timeout: int,
    setup_dirs: list[Path] | None = None,
) -> dict:
    """Run one eval inside a fresh ``DOCKER_IMAGE`` container.

    The project, skill, fixtures, merged setup overlay and (when available)
    credentials are bind-mounted read-only; ``<run_dir>/<eval-id>/`` is mounted
    at ``/output`` and receives the transcript, stderr log and artifacts. The
    container is named so it can be killed if the host-side timeout
    (``timeout + 30`` seconds) expires. Staged fixtures and credentials are
    removed from the run folder afterwards.

    Returns:
        A summary dict with ``eval_id``, ``elapsed_s``, ``return_code``
        (-1 on timeout), run-relative ``transcript`` / ``artifacts_dir`` paths
        and the computed ``metrics``.
    """
    import uuid  # local import: not part of this file's import block

    eval_id = str(eval_item.get("id", "unnamed"))
    eval_dir = run_dir / eval_id
    artifacts_dir = eval_dir / "artifacts"
    transcript_path = eval_dir / "transcript.jsonl"
    eval_dir.mkdir(parents=True, exist_ok=True)
    artifacts_dir.mkdir(parents=True, exist_ok=True)

    fixtures_staging = eval_dir / "fixtures_in"
    fixtures_staging.mkdir(parents=True, exist_ok=True)
    fixtures = resolve_fixtures(eval_item.get("files", []), project_root)
    for src, dest_rel in fixtures:
        dest = fixtures_staging / dest_rel
        dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(src, dest)
    (eval_dir / "prompt.txt").write_text(eval_item["prompt"], encoding="utf-8")

    # Pre-merge setup overlay dirs on the host; mount as /setup:ro in the container.
    setup_merged: Path | None = None
    if setup_dirs:
        setup_merged = eval_dir / "setup_merged"
        # Guarantee the directory exists so the emptiness check below cannot
        # raise when the overlay helper had nothing to copy.
        setup_merged.mkdir(parents=True, exist_ok=True)
        apply_setup_overlay(setup_dirs, setup_merged)
        if not any(setup_merged.iterdir()):
            setup_merged = None

    creds_dir: Path | None = None
    if _KEYCHAIN_CREDS:
        creds_dir = eval_dir / "creds"
        creds_dir.mkdir(parents=True, exist_ok=True)
        (creds_dir / ".credentials.json").write_text(_KEYCHAIN_CREDS, encoding="utf-8")

    # The skill destination is removed before copying: `cp -R src dest` nests
    # src *inside* dest when dest already exists (project copy or overlay),
    # which would leave the stale copy in place.
    container_script = r"""
set -e
mkdir -p /workspace
rsync -a \
--exclude=.git --exclude=.bare --exclude=node_modules --exclude=.venv \
--exclude=__pycache__ --exclude=.pytest_cache --exclude=.next \
--exclude=dist --exclude=build --exclude=.cache --exclude=.DS_Store \
/project/ /workspace/
if [ -d /setup ]; then
rsync -a /setup/ /workspace/
fi
mkdir -p /workspace/.claude/skills
rm -rf "/workspace/.claude/skills/$SKILL_NAME"
cp -R "$SKILL_SRC" "/workspace/.claude/skills/$SKILL_NAME"
if [ -d /fixtures ]; then
cp -R /fixtures/. /workspace/
fi
if [ -f /creds/.credentials.json ]; then
mkdir -p /home/evaluator/.claude
cp /creds/.credentials.json /home/evaluator/.claude/.credentials.json
fi
cd /workspace
claude -p "$EVAL_PROMPT" \
--output-format stream-json --verbose \
--dangerously-skip-permissions \
> /output/transcript.jsonl 2> /output/stderr.log || true
mkdir -p /output/artifacts
rsync -a --exclude=.claude --exclude=node_modules --exclude=.git \
--filter='+ */' --filter='+ *' \
/workspace/ /output/artifacts/
"""
    skill_name = skill_path.name
    container_name = f"bmad-eval-{uuid.uuid4().hex}"
    cmd = [
        "docker", "run", "--rm",
        "--name", container_name,
        "-v", f"{project_root}:/project:ro",
        "-v", f"{skill_path}:/skill_src:ro",
        "-v", f"{eval_dir}:/output",
        "-e", "ANTHROPIC_API_KEY",
        "-e", f"EVAL_PROMPT={eval_item['prompt']}",
        "-e", "SKILL_SRC=/skill_src",
        "-e", f"SKILL_NAME={skill_name}",
    ]
    if creds_dir:
        cmd += ["-v", f"{creds_dir}:/creds:ro"]
    if fixtures:
        cmd += ["-v", f"{fixtures_staging}:/fixtures:ro"]
    if setup_merged:
        cmd += ["-v", f"{setup_merged}:/setup:ro"]
    cmd += [DOCKER_IMAGE, "bash", "-c", container_script]

    start = time.time()
    try:
        try:
            proc = subprocess.run(
                cmd,
                capture_output=True,
                timeout=timeout + 30,
            )
            elapsed = time.time() - start
            return_code = proc.returncode
            stderr_tail = proc.stderr.decode("utf-8", errors="replace")[-2000:]
            if proc.stdout:
                (eval_dir / "docker.stdout.log").write_bytes(proc.stdout)
        except subprocess.TimeoutExpired as e:
            elapsed = time.time() - start
            return_code = -1
            stderr_tail = f"TIMEOUT after {timeout}s"
            if e.stderr:
                stderr_tail += "\n" + e.stderr.decode("utf-8", errors="replace")[-2000:]
            # subprocess.run only killed the docker client; stop the container
            # itself so it does not keep running (and spending) in the background.
            try:
                subprocess.run(
                    ["docker", "kill", container_name],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                    timeout=30,
                )
            except (OSError, subprocess.SubprocessError):
                pass
    finally:
        shutil.rmtree(fixtures_staging, ignore_errors=True)
        if creds_dir is not None:
            # Never leave credentials behind in the run folder.
            shutil.rmtree(creds_dir, ignore_errors=True)

    metrics = compute_metrics(transcript_path, artifacts_dir, elapsed, return_code, stderr_tail)
    write_json(eval_dir / "metrics.json", metrics)
    return {
        "eval_id": eval_id,
        "elapsed_s": elapsed,
        "return_code": return_code,
        "transcript": str(transcript_path.relative_to(run_dir)),
        "artifacts_dir": str(artifacts_dir.relative_to(run_dir)),
        "metrics": metrics,
    }
def resolve_fixtures(files: list[str], project_root: Path) -> list[tuple[Path, str]]:
    """Resolve an eval's ``files`` entries to ``(source, destination)`` pairs.

    Each entry is looked up under *project_root* first, then relative to the
    current working directory. Entries that match no file produce a warning on
    stderr and are skipped.

    The destination is always a relative path that stays inside the workspace:
    a plain relative entry is kept as written; an absolute entry or one
    containing ``..`` becomes its path relative to *project_root* when the file
    lives there, otherwise just its file name.

    Args:
        files: Fixture paths as written in the evals file.
        project_root: Project directory the entries are resolved against.

    Returns:
        ``(resolved_source_path, workspace_relative_destination)`` tuples in
        input order.
    """
    root = project_root.resolve()
    out: list[tuple[Path, str]] = []
    for entry in files:
        candidate = (project_root / entry).resolve()
        if not candidate.is_file():
            alt = Path(entry).resolve()
            if not alt.is_file():
                print(f"Warning: fixture not found: {entry}", file=sys.stderr)
                continue
            candidate = alt

        entry_path = Path(entry)
        if not entry_path.is_absolute() and ".." not in entry_path.parts:
            dest_rel = entry
        else:
            # Joining an absolute or parent-escaping entry onto the workspace
            # would target the source file itself or a path outside it.
            try:
                dest_rel = str(candidate.relative_to(root))
            except ValueError:
                dest_rel = candidate.name
        out.append((candidate, dest_rel))
    return out
def snapshot_files(root: Path) -> set[str]:
    """Return the root-relative paths (as strings) of every file found under *root*."""
    return {
        str(entry.relative_to(root))
        for entry in root.rglob("*")
        if entry.is_file()
    }
def diff_workspace(root: Path, before: set[str]) -> list[str]:
    """Return, sorted, the files now under *root* that are absent from *before*."""
    current = snapshot_files(root)
    return sorted(current.difference(before))
def sync_artifacts(workspace: Path, new_files: list[str], dest: Path) -> None:
    """Copy newly created workspace files into the artifacts directory.

    Files whose workspace-relative path passes through ``.claude``,
    ``node_modules``, ``.git`` or ``.venv`` are skipped, as are entries that
    are no longer regular files.

    Args:
        workspace: Root the relative paths in *new_files* refer to.
        new_files: Workspace-relative paths to copy.
        dest: Destination root; the relative layout is preserved.
    """
    skipped_dirs = {".claude", "node_modules", ".git", ".venv"}
    for rel in new_files:
        # Filter on the relative path only: the workspace itself may live
        # under a directory such as `.claude`, which must not hide everything.
        if skipped_dirs.intersection(Path(rel).parts):
            continue
        src = workspace / rel
        if not src.is_file():
            continue
        target = dest / rel
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(src, target)
def compute_metrics(transcript: Path, artifacts: Path, elapsed: float,
                    rc: int, stderr_tail: str) -> dict:
    """Summarise one eval run from its transcript and artifacts.

    Every transcript line that decodes to a JSON object with
    ``type == "assistant"`` counts as one step; each ``tool_use`` item in its
    message content is tallied by tool name. Lines that are blank, not JSON,
    or not JSON objects are ignored, as are malformed message/content shapes.

    Args:
        transcript: Path to the stream-json transcript (may be missing).
        artifacts: Directory of captured artifacts (may be missing).
        elapsed: Wall-clock seconds for the run.
        rc: Process return code.
        stderr_tail: Trailing stderr text to store verbatim.

    Returns:
        A dict with ``elapsed_s`` (rounded to 2 places), ``return_code``,
        ``tool_calls``, ``total_tool_calls``, ``total_steps``, ``output_chars``
        (total artifact size in bytes), ``transcript_chars`` (transcript size
        in bytes) and ``stderr_tail``.
    """
    tool_calls: dict[str, int] = {}
    total_steps = 0
    have_transcript = transcript.is_file()
    if have_transcript:
        for raw in transcript.read_text(encoding="utf-8", errors="replace").splitlines():
            raw = raw.strip()
            if not raw:
                continue
            try:
                evt = json.loads(raw)
            except json.JSONDecodeError:
                continue
            if not isinstance(evt, dict) or evt.get("type") != "assistant":
                continue
            total_steps += 1
            message = evt.get("message")
            content = message.get("content") if isinstance(message, dict) else None
            if not isinstance(content, list):
                continue  # e.g. null message or plain-string content
            for item in content:
                if isinstance(item, dict) and item.get("type") == "tool_use":
                    name = item.get("name", "?")
                    tool_calls[name] = tool_calls.get(name, 0) + 1

    output_chars = 0
    if artifacts.is_dir():
        for f in artifacts.rglob("*"):
            if f.is_file():
                try:
                    output_chars += f.stat().st_size
                except OSError:
                    pass  # file vanished between listing and stat

    return {
        "elapsed_s": round(elapsed, 2),
        "return_code": rc,
        "tool_calls": tool_calls,
        "total_tool_calls": sum(tool_calls.values()),
        "total_steps": total_steps,
        "output_chars": output_chars,
        "transcript_chars": transcript.stat().st_size if have_transcript else 0,
        "stderr_tail": stderr_tail,
    }
def main() -> int:
    """CLI entry point: run the selected evals in parallel and write the run folder.

    Creates ``<output-dir>/<run-id>/`` with ``run.json``, one sub-directory per
    eval (written by the chosen runner) and ``execution-summary.json``; the
    summary is also printed to stdout as JSON. A per-eval ``timeout`` in the
    evals file overrides ``--timeout``.

    Returns:
        2 when the evals file does not exist, otherwise 0.
    """
    parser = argparse.ArgumentParser(description="Run a skill's artifact evals in isolation")
    for path_flag in ("--skill-path", "--evals-file", "--project-root", "--output-dir"):
        parser.add_argument(path_flag, required=True, type=Path)
    parser.add_argument("--isolation", choices=("docker", "local"), required=True)
    parser.add_argument("--workers", type=int, default=8)
    parser.add_argument("--timeout", type=int, default=600)
    parser.add_argument("--eval-ids", default=None, help="Comma-separated subset of eval ids to run")
    parser.add_argument("--quiet", action="store_true")
    opts = parser.parse_args()
    verbose = not opts.quiet

    skill_path = opts.skill_path.resolve()
    project_root = opts.project_root.resolve()
    evals_file = opts.evals_file.resolve()
    if not evals_file.is_file():
        print(f"evals file not found: {evals_file}", file=sys.stderr)
        return 2

    skill_name, _, _ = parse_skill_md(skill_path)
    data = read_json(evals_file)
    # Accept either {"evals": [...]} or a bare list.
    if isinstance(data, dict) and "evals" in data:
        evals = data["evals"]
    else:
        evals = data

    if opts.eval_ids:
        wanted = {token.strip() for token in opts.eval_ids.split(",") if token.strip()}
        evals = [entry for entry in evals if str(entry.get("id")) in wanted]

    run_id = new_run_id(skill_name)
    run_dir = (opts.output_dir / run_id).resolve()
    run_dir.mkdir(parents=True, exist_ok=True)
    write_json(run_dir / "run.json", {
        "run_id": run_id,
        "skill_name": skill_name,
        "skill_path": str(skill_path),
        "project_root": str(project_root),
        "evals_file": str(evals_file),
        "isolation": opts.isolation,
        "started_at": utc_now_iso(),
        "eval_count": len(evals),
    })

    runner = run_eval_local
    if opts.isolation == "docker":
        runner = run_eval_docker

    if verbose:
        print(
            f"[run_evals] {len(evals)} evals, isolation={opts.isolation}, run_dir={run_dir}",
            file=sys.stderr,
        )

    results: list[dict] = []
    with ThreadPoolExecutor(max_workers=opts.workers) as pool:
        pending = {}
        for item in evals:
            future = pool.submit(
                runner,
                item,
                run_dir,
                skill_path,
                project_root,
                int(item.get("timeout", opts.timeout)),
                discover_setup_dirs(evals_file, str(item.get("id", ""))),
            )
            pending[future] = item

        for future in as_completed(pending):
            item = pending[future]
            try:
                res = future.result()
            except Exception as exc:
                # A crashed runner is recorded as a failed eval, not a crashed run.
                res = {"eval_id": str(item.get("id")), "error": str(exc), "return_code": -1}
            results.append(res)
            if verbose:
                rc = res.get("return_code")
                status = "ok" if rc == 0 else f"rc={rc}"
                print(
                    f" [{status}] eval {res.get('eval_id')} ({res.get('elapsed_s', 0):.1f}s)",
                    file=sys.stderr,
                )

    failures = [r for r in results if r.get("return_code") != 0]
    summary = {
        "run_id": run_id,
        "completed_at": utc_now_iso(),
        "total": len(evals),
        "executed": len(results),
        "exec_failures": len(failures),
        "run_dir": str(run_dir),
        "results": results,
    }
    write_json(run_dir / "execution-summary.json", summary)
    print(json.dumps(summary, indent=2))
    return 0


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -0,0 +1,366 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.9"
# ///
"""Run trigger evals: does the skill's description fire on each query?
Adapted from Anthropic skill-creator's run_eval.py
(https://github.com/anthropics/skills/tree/main/skills/skill-creator) with two
adaptations:
1. Isolation. Each query runs in either a fresh Docker container off
bmad-eval-runner:latest, or a fresh local tmp dir under ~/bmad-evals/<run-id>/
with HOME overridden to a clean directory. This prevents the host's global
CLAUDE.md and auto-memory from biasing whether the skill fires.
2. Output. Results are written to a run folder alongside the artifact eval
run-folder layout (so triggers and artifacts can share a single report).
Usage:
python3 run_triggers.py \\
--skill-path PATH \\
--triggers-file PATH/triggers.json \\
--output-dir PATH \\
--isolation docker|local \\
[--workers N] [--runs-per-query N] [--timeout SECS] [--threshold 0.5]
"""
from __future__ import annotations
import argparse
import json
import os
import shutil
import subprocess
import sys
import time
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
SCRIPT_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(SCRIPT_DIR))
from utils import ( # noqa: E402
new_run_id,
parse_skill_md,
read_json,
read_macos_keychain_credentials,
stage_credentials,
utc_now_iso,
write_json,
)
# Image tag for docker isolation — presumably consumed by run_query_docker(),
# whose body is not fully visible here; confirm against that function.
DOCKER_IMAGE = "bmad-eval-runner:latest"
# Evaluated once at import time; run_query_local() hands it to stage_credentials().
_KEYCHAIN_CREDS: str | None = read_macos_keychain_credentials()
def write_synthetic_skill(skills_dir: Path, skill_name: str, description: str, unique_id: str) -> tuple[Path, str]:
    """Place a synthetic skill at <skills_dir>/<clean_name>/SKILL.md.

    The Skill tool only fires for entries discovered as actual skills
    (frontmatter `name` + `description` under `.claude/skills/<name>/SKILL.md`),
    not for slash-commands under `.claude/commands/`. The synthetic skill is
    therefore written where skills are looked up, under a name made unique with
    *unique_id* so the detector can tell it apart from any pre-existing skill
    with the same display name.

    Returns:
        ``(path_to_SKILL.md, clean_name)``.
    """
    clean_name = f"{skill_name}-skill-{unique_id}"
    skill_root = skills_dir / clean_name
    skill_root.mkdir(parents=True, exist_ok=True)
    skill_file = skill_root / "SKILL.md"

    # Continuation lines of the YAML block scalar need the same indent as the first.
    block_scalar = description.replace("\n", "\n ")
    document = "".join([
        "---\n",
        f"name: {clean_name}\n",
        "description: |\n",
        f" {block_scalar}\n",
        "---\n\n",
        f"# {skill_name}\n\n",
        f"This skill handles: {description}\n",
    ])
    skill_file.write_text(document, encoding="utf-8")
    return skill_file, clean_name
def parse_stream_for_trigger(buffer: str, clean_name: str) -> tuple[bool | None, str]:
    """Return (triggered_or_none, leftover_buffer). None means undecided yet.

    Consumes every complete (newline-terminated) line of *buffer* as a
    stream-json event and decides as early as possible:

    * a partial-message ``tool_use`` block for a tool other than Skill/Read
      decides False; for Skill/Read the streamed input JSON is accumulated and
      decides True as soon as it contains *clean_name*, or at block/message
      stop based on whether it does;
    * in a complete ``assistant`` message the first ``tool_use`` item decides:
      True when it is a Skill call naming *clean_name* or a Read of a path
      containing it, False otherwise;
    * ``message_stop`` without a pending tool, or a ``result`` event, decides
      False.

    Lines that are blank, not JSON, or not JSON objects are ignored. Partial
    tool state is local to one call and is not carried across calls.

    Args:
        buffer: Accumulated stream text, possibly ending in an incomplete line.
        clean_name: Unique synthetic skill name to look for.

    Returns:
        ``(decision, "")`` once decided, otherwise ``(None, unconsumed_tail)``.
    """
    pending_tool: str | None = None
    accumulated_json = ""
    while "\n" in buffer:
        line, buffer = buffer.split("\n", 1)
        line = line.strip()
        if not line:
            continue
        try:
            evt = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(evt, dict):
            continue  # valid JSON but not an event object

        kind = evt.get("type")
        if kind == "stream_event":
            se = evt.get("event") or {}
            t = se.get("type", "")
            if t == "content_block_start":
                cb = se.get("content_block") or {}
                if cb.get("type") == "tool_use":
                    name = cb.get("name", "")
                    if name not in ("Skill", "Read"):
                        return False, ""
                    pending_tool = name
                    accumulated_json = ""
            elif t == "content_block_delta" and pending_tool:
                delta = se.get("delta") or {}
                if delta.get("type") == "input_json_delta":
                    accumulated_json += delta.get("partial_json", "")
                    if clean_name in accumulated_json:
                        return True, ""
            elif t in ("content_block_stop", "message_stop"):
                if pending_tool:
                    return clean_name in accumulated_json, ""
                if t == "message_stop":
                    return False, ""
        elif kind == "assistant":
            content = (evt.get("message") or {}).get("content") or []
            for item in content:
                if not isinstance(item, dict) or item.get("type") != "tool_use":
                    continue
                tname = item.get("name", "")
                tinput = item.get("input") or {}
                if tname == "Skill" and clean_name in str(tinput.get("skill", "")):
                    return True, ""
                if tname == "Read" and clean_name in str(tinput.get("file_path", "")):
                    return True, ""
                return False, ""
        elif kind == "result":
            return False, ""
    return None, buffer
def run_query_local(query: str, skill_name: str, description: str,
                    workspace_root: Path, timeout: int) -> bool:
    """Run one query through a local ``claude -p`` and report whether the skill triggered.

    A synthetic skill carrying ``description`` is written under
    ``<workspace_root>/project/.claude/skills`` and the CLI is started in that
    project directory with ``HOME``/``CLAUDE_CONFIG_DIR`` pointing inside
    ``workspace_root``. Its stream-json output is parsed as it arrives, and the
    child is killed as soon as a decision is reached or ``timeout`` elapses.

    Args:
        query: Prompt passed to ``claude -p``.
        skill_name: Base name of the skill under test.
        description: Description text written into the synthetic skill.
        workspace_root: Directory that holds the isolated home and project.
        timeout: Maximum number of seconds to let the CLI run.

    Returns:
        True if the stream showed the synthetic skill being used, else False.
    """
    # Function-scope imports: only this function needs them.
    import codecs
    import threading

    workspace_root.mkdir(parents=True, exist_ok=True)
    home_dir = workspace_root / ".home"
    claude_dir = home_dir / ".claude"
    claude_dir.mkdir(parents=True, exist_ok=True)
    # NOTE(review): the staged credential copy is left in the workspace after
    # the run - confirm whether it should be removed once the child has exited.
    stage_credentials(claude_dir, _KEYCHAIN_CREDS)
    project_dir = workspace_root / "project"
    skills_dir = project_dir / ".claude" / "skills"
    project_dir.mkdir(parents=True, exist_ok=True)
    unique = uuid.uuid4().hex[:8]
    cmd_file, clean_name = write_synthetic_skill(skills_dir, skill_name, description, unique)
    env = {
        "HOME": str(home_dir),
        "CLAUDE_CONFIG_DIR": str(claude_dir),
        "PATH": os.environ.get("PATH", ""),
        "ANTHROPIC_API_KEY": os.environ.get("ANTHROPIC_API_KEY", ""),
    }
    cmd = [
        "claude", "-p", query,
        "--output-format", "stream-json",
        "--verbose",
        "--include-partial-messages",
        "--dangerously-skip-permissions",
    ]
    try:
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            cwd=str(project_dir),
            env=env,
        )
        # The pipe reads below block while the child is silent, so the loop's
        # own deadline check cannot bound the run by itself. The watchdog kills
        # the child at the deadline, which ends the blocked read with EOF.
        watchdog = threading.Timer(timeout, proc.kill)
        watchdog.daemon = True
        watchdog.start()
        # Incremental decoding keeps a multibyte character that straddles two
        # reads intact instead of replacing both halves with U+FFFD.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        buffer = ""
        triggered: bool | None = None
        start = time.time()
        try:
            while time.time() - start < timeout:
                if proc.poll() is not None:
                    # Child exited: drain whatever is still in the pipe.
                    buffer += decoder.decode(proc.stdout.read() or b"", final=True)
                    break
                chunk = proc.stdout.read1(8192) if hasattr(proc.stdout, "read1") else proc.stdout.read(8192)
                if not chunk:
                    time.sleep(0.05)
                    continue
                buffer += decoder.decode(chunk)
                decided, buffer = parse_stream_for_trigger(buffer, clean_name)
                if decided is not None:
                    triggered = decided
                    break
        finally:
            watchdog.cancel()
            if proc.poll() is None:
                proc.kill()
            proc.wait()
            proc.stdout.close()
        if triggered is None:
            # Give the parser one last look, terminating any trailing partial line.
            decided, _ = parse_stream_for_trigger(buffer + "\n", clean_name)
            triggered = bool(decided)
        return bool(triggered)
    finally:
        # Best-effort removal of the synthetic skill; ignore_errors keeps a
        # cleanup failure from masking the result or an in-flight exception.
        shutil.rmtree(cmd_file.parent, ignore_errors=True)
def run_query_docker(query: str, skill_name: str, description: str,
                     workspace_root: Path, timeout: int) -> bool:
    """Run one query through ``claude -p`` inside a container and report whether the skill triggered.

    The synthetic skill is written to ``<workspace_root>/skills_in`` and mounted
    read-only at ``/skills``. The container writes the stream-json output to
    ``/output/stream.jsonl`` (``<workspace_root>/output`` on the host), which
    is parsed after the container finishes.

    Args:
        query: Prompt passed to the container via the ``EVAL_QUERY`` variable.
        skill_name: Base name of the skill under test.
        description: Description text written into the synthetic skill.
        workspace_root: Host directory holding the mounts for this run.
        timeout: Query timeout in seconds; the container gets 30 extra seconds
            before it is forcibly removed.

    Returns:
        True if the recorded stream shows the synthetic skill being used;
        False otherwise, including when no stream file was produced.
    """
    workspace_root.mkdir(parents=True, exist_ok=True)
    unique = uuid.uuid4().hex[:8]
    skills_in = workspace_root / "skills_in"
    skills_in.mkdir(parents=True, exist_ok=True)
    _, clean_name = write_synthetic_skill(skills_in, skill_name, description, unique)
    creds_dir: Path | None = None
    if _KEYCHAIN_CREDS:
        creds_dir = workspace_root / "creds_in"
        creds_dir.mkdir(parents=True, exist_ok=True)
        (creds_dir / ".credentials.json").write_text(_KEYCHAIN_CREDS, encoding="utf-8")
    container_script = """
set -e
mkdir -p /workspace/.claude/skills
cp -R /skills/. /workspace/.claude/skills/ 2>/dev/null || true
if [ -f /creds/.credentials.json ]; then
mkdir -p /home/evaluator/.claude
cp /creds/.credentials.json /home/evaluator/.claude/.credentials.json
fi
cd /workspace
claude -p "$EVAL_QUERY" \\
--output-format stream-json --verbose --include-partial-messages \\
--dangerously-skip-permissions \\
> /output/stream.jsonl 2>/dev/null || true
"""
    output_dir = workspace_root / "output"
    output_dir.mkdir(parents=True, exist_ok=True)
    # A known name lets us remove the container if the client has to be killed.
    container_name = f"bmad-trigger-eval-{uuid.uuid4().hex}"
    cmd = [
        "docker", "run", "--rm",
        "--name", container_name,
        "-v", f"{skills_in}:/skills:ro",
        "-v", f"{output_dir}:/output",
        "-e", "ANTHROPIC_API_KEY",
        "-e", f"EVAL_QUERY={query}",
    ]
    if creds_dir:
        cmd += ["-v", f"{creds_dir}:/creds:ro"]
    cmd += [DOCKER_IMAGE, "bash", "-c", container_script]
    try:
        try:
            subprocess.run(cmd, capture_output=True, timeout=timeout + 30)
        except subprocess.TimeoutExpired:
            # The timeout only kills the `docker run` client; the container
            # itself would keep running, so remove it explicitly (best effort).
            try:
                subprocess.run(
                    ["docker", "rm", "-f", container_name],
                    capture_output=True,
                    timeout=30,
                )
            except (subprocess.SubprocessError, OSError):
                pass
    finally:
        # Do not leave a copy of the credentials behind in the run output.
        if creds_dir is not None:
            shutil.rmtree(creds_dir, ignore_errors=True)
    stream_file = output_dir / "stream.jsonl"
    if not stream_file.is_file():
        return False
    decided, _ = parse_stream_for_trigger(stream_file.read_text(encoding="utf-8", errors="replace") + "\n", clean_name)
    return bool(decided)
def main() -> int:
    """Run every trigger query several times and report per-query pass/fail.

    Parses the command line, reads the skill's name/description and the list of
    trigger queries, runs each query ``--runs-per-query`` times through the
    selected isolation backend using a thread pool, and writes ``run.json`` and
    ``triggers-result.json`` under ``<output-dir>/<run_id>/``. The result
    document is also printed to stdout.

    A query passes when at least one run completed and its trigger rate is on
    the expected side of ``--threshold`` (``>=`` for queries that should
    trigger, ``<`` for those that should not). Runs that raised are counted in
    the per-query ``errors`` field and do not contribute to the rate.

    Returns:
        0 when the evaluation ran (regardless of pass/fail counts), 2 when the
        triggers file is missing or is not a JSON array.
    """
    parser = argparse.ArgumentParser(description="Run trigger evals in isolation")
    parser.add_argument("--skill-path", required=True, type=Path)
    parser.add_argument("--triggers-file", required=True, type=Path)
    parser.add_argument("--output-dir", required=True, type=Path)
    parser.add_argument("--isolation", choices=("docker", "local"), required=True)
    parser.add_argument("--workers", type=int, default=8)
    parser.add_argument("--runs-per-query", type=int, default=3)
    parser.add_argument("--timeout", type=int, default=45)
    parser.add_argument("--threshold", type=float, default=0.5)
    parser.add_argument("--quiet", action="store_true")
    args = parser.parse_args()
    skill_path = args.skill_path.resolve()
    triggers_file = args.triggers_file.resolve()
    if not triggers_file.is_file():
        print(f"triggers file not found: {triggers_file}", file=sys.stderr)
        return 2
    skill_name, description, _ = parse_skill_md(skill_path)
    queries = read_json(triggers_file)
    if not isinstance(queries, list):
        # Everything below indexes queries positionally and expects dict items.
        print(f"triggers file must contain a JSON array of queries: {triggers_file}", file=sys.stderr)
        return 2
    run_id = new_run_id(f"{skill_name}-triggers")
    run_dir = (args.output_dir / run_id).resolve()
    (run_dir / "queries").mkdir(parents=True, exist_ok=True)
    write_json(run_dir / "run.json", {
        "run_id": run_id,
        "skill_name": skill_name,
        "description": description,
        "isolation": args.isolation,
        "started_at": utc_now_iso(),
        "query_count": len(queries),
        "runs_per_query": args.runs_per_query,
        "threshold": args.threshold,
    })
    runner = run_query_docker if args.isolation == "docker" else run_query_local

    def run_one(idx: int, q: dict, run_idx: int) -> tuple[int, bool]:
        # Each (query, repetition) pair gets its own workspace directory.
        ws = run_dir / "queries" / f"q{idx:03d}-r{run_idx}"
        triggered = runner(q["query"], skill_name, description, ws, args.timeout)
        return idx, triggered

    per_query: dict[int, list[bool]] = {}
    errors: dict[int, int] = {}
    if not args.quiet:
        print(f"[run_triggers] {len(queries)} queries × {args.runs_per_query} runs, isolation={args.isolation}", file=sys.stderr)
    with ThreadPoolExecutor(max_workers=args.workers) as pool:
        # Map each future back to its query so a failed run can be attributed.
        futures = {
            pool.submit(run_one, idx, q, run_idx): idx
            for idx, q in enumerate(queries)
            for run_idx in range(args.runs_per_query)
        }
        for fut in as_completed(futures):
            idx = futures[fut]
            try:
                _, triggered = fut.result()
            except Exception as e:
                # Boundary handler: one broken run must not abort the whole eval.
                print(f"Warning: query {idx} failed: {e}", file=sys.stderr)
                errors[idx] = errors.get(idx, 0) + 1
                continue
            per_query.setdefault(idx, []).append(triggered)
    results = []
    for idx, q in enumerate(queries):
        triggers = per_query.get(idx, [])
        rate = (sum(triggers) / len(triggers)) if triggers else 0.0
        should = bool(q["should_trigger"])
        on_expected_side = rate >= args.threshold if should else rate < args.threshold
        # Without a single completed run there is no evidence either way, so a
        # "should not trigger" query must not pass just because its rate is 0.0.
        passed = bool(triggers) and on_expected_side
        results.append({
            "query": q["query"],
            "should_trigger": should,
            "trigger_rate": rate,
            "triggers": int(sum(triggers)),
            "runs": len(triggers),
            "errors": errors.get(idx, 0),
            "pass": passed,
        })
    output = {
        "run_id": run_id,
        "completed_at": utc_now_iso(),
        "skill_name": skill_name,
        "description": description,
        "isolation": args.isolation,
        "results": results,
        "summary": {
            "total": len(results),
            "passed": sum(1 for r in results if r["pass"]),
            "failed": sum(1 for r in results if not r["pass"]),
        },
    }
    write_json(run_dir / "triggers-result.json", output)
    print(json.dumps(output, indent=2))
    return 0
# Script entry point: the integer returned by main() becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,260 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.9"
# ///
"""Shared helpers for the eval runner."""
from __future__ import annotations
import json
import re
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
def _strip_matching_quotes(value: str) -> str:
    """Return *value* without one pair of matching surrounding quotes, if present."""
    if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
        return value[1:-1]
    return value


def parse_skill_md(skill_path: Path) -> tuple[str, str, str]:
    """Return (name, description, body) from the skill's SKILL.md frontmatter.

    This is a small line-based reader, not a YAML parser. It understands
    ``name:`` and ``description:`` keys at column 0; plain or quoted one-line
    values; and block-scalar descriptions (``|`` / ``>``, optionally followed
    by chomping/indentation indicators such as ``>-`` or ``|+``). The indented
    lines of a block scalar are joined with single spaces and blank lines
    inside it are ignored.

    Args:
        skill_path: Directory that contains ``SKILL.md``.

    Returns:
        ``(name, description, body)``, where ``body`` is the text after the
        closing ``---`` line (empty when nothing follows it).

    Raises:
        ValueError: If the file has no frontmatter block or no ``name``.
        OSError: If ``SKILL.md`` cannot be read.
    """
    text = (skill_path / "SKILL.md").read_text(encoding="utf-8")
    # The body group is optional so a file that ends right at the closing
    # "---" (no trailing newline) is still accepted.
    fm_match = re.match(r"^---\s*\n(.*?)\n---\s*(?:\n(.*))?$", text, re.DOTALL)
    if not fm_match:
        raise ValueError(f"SKILL.md at {skill_path} is missing frontmatter")
    frontmatter = fm_match.group(1)
    body = fm_match.group(2) or ""
    name = None
    description_lines: list[str] = []
    in_description = False
    for line in frontmatter.splitlines():
        if line.startswith("name:"):
            name = _strip_matching_quotes(line.split(":", 1)[1].strip())
            in_description = False
        elif line.startswith("description:"):
            value = line.split(":", 1)[1].strip()
            # Block-scalar header: "|" or ">" plus optional indentation digit
            # and chomping sign in either order.
            if re.fullmatch(r"[|>](?:[1-9][+-]?|[+-][1-9]?)?", value):
                in_description = True
            else:
                description_lines = [_strip_matching_quotes(value)]
                in_description = False
        elif in_description and not line.strip():
            # Blank lines are legal inside a block scalar; they do not end it.
            continue
        elif in_description and line.startswith((" ", "\t")):
            description_lines.append(line.strip())
        elif in_description:
            # First non-indented line after the block: the description is over.
            in_description = False
    if not name:
        raise ValueError(f"SKILL.md at {skill_path} is missing a name")
    return name, " ".join(description_lines).strip(), body
def discover_project_root(skill_path: Path) -> Path:
    """Find the project root for a skill directory.

    Checks the skill directory itself and then each ancestor, nearest first,
    and returns the first one that contains a ``_bmad`` directory or a ``.git``
    entry. If none qualifies, the skill's grandparent directory is returned.
    """
    def looks_like_root(directory: Path) -> bool:
        return (directory / "_bmad").is_dir() or (directory / ".git").exists()

    nearest_first = (skill_path, *skill_path.parents)
    return next(
        (directory for directory in nearest_first if looks_like_root(directory)),
        skill_path.parent.parent,
    )
def discover_evals(
    skill_path: Path,
    project_root: Path,
    explicit: Path | None,
) -> dict[str, Path]:
    """Locate ``evals.json`` and ``triggers.json`` for a skill.

    When ``explicit`` is given, only that path is considered: a file is
    accepted if it is named ``evals.json`` or ``triggers.json``, a directory is
    searched for both names, and nothing else is tried.

    Otherwise these directories are tried in order, stopping at the first that
    yields anything: ``<skill>/evals``, ``<skill grandparent>/evals/<skill name>``
    and ``<project_root>/evals/<skill name>``. As a last resort,
    ``<project_root>/evals`` is searched recursively for a directory named
    after the skill.

    Returns:
        A dict with the keys ``"evals"`` and/or ``"triggers"`` for the files
        that were found; empty when nothing was found.
    """
    filenames = {"evals": "evals.json", "triggers": "triggers.json"}
    found: dict[str, Path] = {}

    def check_dir(d: Path) -> None:
        # Record each wanted file present in `d`; the first hit per key wins.
        if d.is_dir():
            for key, fname in filenames.items():
                if key not in found and (d / fname).is_file():
                    found[key] = d / fname

    if explicit is not None:
        target = explicit.resolve()
        if target.is_file():
            key_by_filename = {fname: key for key, fname in filenames.items()}
            matched_key = key_by_filename.get(target.name)
            if matched_key is not None:
                found[matched_key] = target
        elif target.is_dir():
            check_dir(target)
        return found

    skill_name = skill_path.name
    conventional_dirs = (
        skill_path / "evals",
        skill_path.parent.parent / "evals" / skill_name,
        project_root / "evals" / skill_name,
    )
    for directory in conventional_dirs:
        check_dir(directory)
        if found:
            return found

    evals_root = project_root / "evals"
    if evals_root.is_dir():
        for sub in evals_root.rglob(skill_name):
            if not sub.is_dir():
                continue
            check_dir(sub)
            if found:
                break
    return found
def utc_now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    now = datetime.now(timezone.utc)
    return f"{now:%Y-%m-%dT%H:%M:%SZ}"
def new_run_id(skill_name: str) -> str:
    """Build a run identifier: a ``YYYYMMDD-HHMMSS`` timestamp, a dash, then the skill name.

    The timestamp comes from ``datetime.now()`` without a timezone argument.
    """
    timestamp = f"{datetime.now():%Y%m%d-%H%M%S}"
    return "{}-{}".format(timestamp, skill_name)
def have_docker() -> bool:
    """Report whether a usable Docker installation is reachable.

    True only when a ``docker`` executable is on PATH and ``docker info`` exits
    with status 0 within 5 seconds. Any failure while running the command
    (including the timeout) yields False.
    """
    docker_cli = shutil.which("docker")
    if docker_cli is None:
        return False
    try:
        exit_code = subprocess.run(
            ["docker", "info"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=5,
        ).returncode
    except Exception:
        return False
    return exit_code == 0
def docker_image_present(image: str = "bmad-eval-runner:latest") -> bool:
    """Report whether ``image`` exists in the local Docker image store.

    Returns False when Docker itself is unavailable, when
    ``docker image inspect`` exits non-zero, or when running it fails for any
    reason (including exceeding the 10-second timeout).
    """
    if not have_docker():
        return False
    inspect_cmd = ["docker", "image", "inspect", image]
    try:
        completed = subprocess.run(
            inspect_cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=10,
        )
    except Exception:
        return False
    return completed.returncode == 0
def read_macos_keychain_credentials() -> str | None:
    """Fetch the Claude Code OAuth credentials JSON from the macOS Keychain.

    Queries the generic-password item stored under the service name
    "Claude Code-credentials" with the ``security`` tool and returns its
    contents as a stripped string. Returns None on any other platform, when the
    lookup exits non-zero, when the stored value is empty, or when running the
    tool fails.

    Intended to be called from the parent process so the credential can be
    staged into each isolated workspace's ``.claude/.credentials.json`` before
    ``claude -p`` is launched; a subprocess whose HOME points at an empty
    directory otherwise has no auth.
    """
    if sys.platform != "darwin":
        return None
    lookup = ["security", "find-generic-password", "-s", "Claude Code-credentials", "-w"]
    try:
        completed = subprocess.run(lookup, capture_output=True, timeout=5)
        if completed.returncode != 0:
            return None
        secret = completed.stdout.decode("utf-8", errors="replace").strip()
    except Exception:
        return None
    return secret or None
def stage_credentials(claude_dir: Path, credentials_json: str | None) -> None:
    """Write ``credentials_json`` to ``<claude_dir>/.credentials.json``.

    Creates ``claude_dir`` (and any missing parents) first. Does nothing when
    ``credentials_json`` is None or empty.
    """
    # NOTE(review): the file is created with default permissions - confirm
    # whether it should be restricted to the owner, given it holds credentials.
    if credentials_json:
        claude_dir.mkdir(parents=True, exist_ok=True)
        target = claude_dir / ".credentials.json"
        target.write_text(credentials_json, encoding="utf-8")
def write_json(path: Path, data: object) -> None:
    """Serialize ``data`` as 2-space-indented JSON plus a trailing newline into ``path``.

    Missing parent directories are created first.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Serialize before opening so a serialization error cannot truncate the file.
    payload = json.dumps(data, indent=2)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(payload + "\n")
def read_json(path: Path) -> object:
    """Load and return the JSON document stored in the UTF-8 file at ``path``."""
    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)
def parse_skill_dependencies(skill_path: Path) -> list[str]:
    """Return skill names declared under 'dependencies:' in SKILL.md frontmatter.

    Line-based reader (not a YAML parser). Supported forms::

        dependencies:          dependencies:        dependencies: [a, "b"]
          - a                  - a
          - b                  - b

    List items may be indented or sit at the key's own indentation, blank lines
    inside the list are ignored, and one pair of surrounding quotes is removed
    from each item. The list ends at the first non-indented line that is not a
    list item.

    Args:
        skill_path: Directory that contains ``SKILL.md``.

    Returns:
        The declared names in file order; an empty list when the file cannot be
        read, has no frontmatter, or declares no dependencies.
    """
    try:
        text = (skill_path / "SKILL.md").read_text(encoding="utf-8")
    except OSError:  # FileNotFoundError is a subclass of OSError
        return []
    fm = re.match(r"^---\s*\n(.*?)\n---", text, re.DOTALL)
    if not fm:
        return []
    deps: list[str] = []
    in_deps = False
    for line in fm.group(1).splitlines():
        header = re.match(r"^dependencies\s*:(.*)$", line)
        if header:
            inline = header.group(1).strip()
            if inline.startswith("[") and inline.endswith("]"):
                # Flow-style list on the key's own line.
                return [
                    item.strip().strip("'\"")
                    for item in inline[1:-1].split(",")
                    if item.strip()
                ]
            in_deps = True
        elif in_deps:
            if not line.strip():
                # Blank lines may appear inside a YAML sequence.
                continue
            m = re.match(r"^\s*-\s+(\S+)", line)
            if m:
                deps.append(m.group(1).strip("'\""))
            elif not line.startswith((" ", "\t")):
                # Next top-level key: the dependency list is over.
                break
    return deps
def discover_setup_dirs(evals_file: Path, eval_id: str | None = None) -> list[Path]:
    """List the setup overlay directories that exist next to ``evals_file``.

    Two locations are considered, in this order:

    * the shared overlay ``<evals_dir>/setup/``
    * the per-eval overlay ``<evals_dir>/<eval_id>/setup/`` (only when
      ``eval_id`` is given and non-empty)

    The shared overlay comes first so that, when applied in order, per-eval
    files win on conflict. Locations that are not directories are omitted.
    """
    evals_dir = evals_file.parent
    candidates = [evals_dir / "setup"]
    if eval_id:
        candidates.append(evals_dir / eval_id / "setup")
    return [directory for directory in candidates if directory.is_dir()]
def apply_setup_overlay(setup_dirs: list[Path], dest: Path) -> None:
    """Copy each setup dir onto dest in order (base first, per-eval last).

    ``dest`` is created if needed. Entries of ``setup_dirs`` that are not
    directories are skipped. ``rsync -a`` is used when an ``rsync`` executable
    is on PATH; otherwise the tree is merged with ``shutil.copytree``. The
    operation is best-effort: a failing copy is reported on stderr and the
    remaining overlays are still applied.

    Args:
        setup_dirs: Overlay directories, lowest priority first.
        dest: Directory that receives the merged contents.
    """
    dest.mkdir(parents=True, exist_ok=True)
    rsync = shutil.which("rsync")
    for src in setup_dirs:
        if not src.is_dir():
            continue
        if rsync is not None:
            # Trailing slashes: copy the *contents* of src into dest.
            result = subprocess.run(
                [rsync, "-a", f"{src}/", f"{dest}/"],
                check=False,
            )
            if result.returncode != 0:
                print(
                    f"Warning: rsync of setup overlay {src} exited with {result.returncode}",
                    file=sys.stderr,
                )
        else:
            # No rsync available: merge the tree with the stdlib instead.
            try:
                shutil.copytree(src, dest, symlinks=True, dirs_exist_ok=True)
            except (shutil.Error, OSError) as exc:
                print(f"Warning: copying setup overlay {src} failed: {exc}", file=sys.stderr)
# Names exported by `from <this module> import *`; lists the helpers defined above.
__all__ = [
    "parse_skill_md",
    "discover_project_root",
    "discover_evals",
    "utc_now_iso",
    "new_run_id",
    "have_docker",
    "docker_image_present",
    "read_macos_keychain_credentials",
    "stage_credentials",
    "write_json",
    "read_json",
    "parse_skill_dependencies",
    "discover_setup_dirs",
    "apply_setup_overlay",
]