- Nx 22.7 monorepo (pnpm 11.1, TypeScript 5.9, Node 24) - apps/api: NestJS 11 (CJS conforme CODING-RULES.md PGD-DB-004) - apps/web: React 19 + Vite 8 (ESM) - libs/shared/api-interface: Zod contract base - Docker Compose dev: Postgres 18, Valkey 8, MinIO, Mailpit - WDS artifacts: - design-artifacts/A-Product-Brief/ (5 docs canônicos + 16 dialogs) - design-artifacts/B-Trigger-Map/ (hub + 4 personas + feature impact) - Stack canon: STACK.md v2.2 + CODING-RULES.md v2.0 + brand.md - AGENTS.md + README.md como entrada para devs/agentes Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
367 lines
13 KiB
Python
367 lines
13 KiB
Python
#!/usr/bin/env python3
|
||
# /// script
|
||
# requires-python = ">=3.9"
|
||
# ///
|
||
"""Run trigger evals: does the skill's description fire on each query?
|
||
|
||
Adapted from Anthropic skill-creator's run_eval.py
|
||
(https://github.com/anthropics/skills/tree/main/skills/skill-creator) with two
|
||
adaptations:
|
||
|
||
1. Isolation. Each query runs in either a fresh Docker container off
|
||
bmad-eval-runner:latest, or a fresh local tmp dir under ~/bmad-evals/<run-id>/
|
||
with HOME overridden to a clean directory. This prevents the host's global
|
||
CLAUDE.md and auto-memory from biasing whether the skill fires.
|
||
|
||
2. Output. Results are written to a run folder alongside the artifact eval
|
||
run-folder layout (so triggers and artifacts can share a single report).
|
||
|
||
Usage:
|
||
python3 run_triggers.py \\
|
||
--skill-path PATH \\
|
||
--triggers-file PATH/triggers.json \\
|
||
--output-dir PATH \\
|
||
--isolation docker|local \\
|
||
[--workers N] [--runs-per-query N] [--timeout SECS] [--threshold 0.5]
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import os
|
||
import shutil
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
import uuid
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from pathlib import Path
|
||
|
||
SCRIPT_DIR = Path(__file__).resolve().parent
|
||
sys.path.insert(0, str(SCRIPT_DIR))
|
||
|
||
from utils import ( # noqa: E402
|
||
new_run_id,
|
||
parse_skill_md,
|
||
read_json,
|
||
read_macos_keychain_credentials,
|
||
stage_credentials,
|
||
utc_now_iso,
|
||
write_json,
|
||
)
|
||
|
||
DOCKER_IMAGE = "bmad-eval-runner:latest"
|
||
_KEYCHAIN_CREDS: str | None = read_macos_keychain_credentials()
|
||
|
||
|
||
def write_synthetic_skill(skills_dir: Path, skill_name: str, description: str, unique_id: str) -> tuple[Path, str]:
|
||
"""Place a synthetic skill at <skills_dir>/<clean_name>/SKILL.md.
|
||
|
||
The Skill tool only fires for entries discovered as actual skills (frontmatter
|
||
`name` + `description` under a `.claude/skills/<name>/SKILL.md`). Slash-commands
|
||
under `.claude/commands/` do not auto-invoke the Skill tool, so the previous
|
||
implementation could never observe a positive trigger. This places the synthetic
|
||
skill where Claude Code looks for skills, with a unique name so the detector
|
||
can disambiguate it from any pre-existing skill of the same display name.
|
||
"""
|
||
clean_name = f"{skill_name}-skill-{unique_id}"
|
||
skill_root = skills_dir / clean_name
|
||
skill_root.mkdir(parents=True, exist_ok=True)
|
||
path = skill_root / "SKILL.md"
|
||
indented_desc = "\n ".join(description.split("\n"))
|
||
path.write_text(
|
||
f"---\n"
|
||
f"name: {clean_name}\n"
|
||
f"description: |\n"
|
||
f" {indented_desc}\n"
|
||
f"---\n\n"
|
||
f"# {skill_name}\n\n"
|
||
f"This skill handles: {description}\n",
|
||
encoding="utf-8",
|
||
)
|
||
return path, clean_name
|
||
|
||
|
||
def parse_stream_for_trigger(buffer: str, clean_name: str) -> tuple[bool | None, str]:
|
||
"""Return (triggered_or_none, leftover_buffer). None means undecided yet."""
|
||
triggered: bool | None = None
|
||
pending_tool: str | None = None
|
||
accumulated_json = ""
|
||
leftover = ""
|
||
|
||
while "\n" in buffer:
|
||
line, buffer = buffer.split("\n", 1)
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
try:
|
||
evt = json.loads(line)
|
||
except json.JSONDecodeError:
|
||
continue
|
||
|
||
if evt.get("type") == "stream_event":
|
||
se = evt.get("event", {})
|
||
t = se.get("type", "")
|
||
if t == "content_block_start":
|
||
cb = se.get("content_block", {})
|
||
if cb.get("type") == "tool_use":
|
||
name = cb.get("name", "")
|
||
if name in ("Skill", "Read"):
|
||
pending_tool = name
|
||
accumulated_json = ""
|
||
else:
|
||
return False, ""
|
||
elif t == "content_block_delta" and pending_tool:
|
||
delta = se.get("delta", {})
|
||
if delta.get("type") == "input_json_delta":
|
||
accumulated_json += delta.get("partial_json", "")
|
||
if clean_name in accumulated_json:
|
||
return True, ""
|
||
elif t in ("content_block_stop", "message_stop"):
|
||
if pending_tool:
|
||
return clean_name in accumulated_json, ""
|
||
if t == "message_stop":
|
||
return False, ""
|
||
elif evt.get("type") == "assistant":
|
||
for item in evt.get("message", {}).get("content", []):
|
||
if item.get("type") != "tool_use":
|
||
continue
|
||
tname = item.get("name", "")
|
||
tinput = item.get("input", {})
|
||
if tname == "Skill" and clean_name in tinput.get("skill", ""):
|
||
return True, ""
|
||
if tname == "Read" and clean_name in tinput.get("file_path", ""):
|
||
return True, ""
|
||
return False, ""
|
||
elif evt.get("type") == "result":
|
||
return triggered if triggered is not None else False, ""
|
||
leftover = buffer
|
||
return triggered, leftover
|
||
|
||
|
||
def run_query_local(query: str, skill_name: str, description: str,
|
||
workspace_root: Path, timeout: int) -> bool:
|
||
workspace_root.mkdir(parents=True, exist_ok=True)
|
||
home_dir = workspace_root / ".home"
|
||
(home_dir / ".claude").mkdir(parents=True, exist_ok=True)
|
||
stage_credentials(home_dir / ".claude", _KEYCHAIN_CREDS)
|
||
project_dir = workspace_root / "project"
|
||
skills_dir = project_dir / ".claude" / "skills"
|
||
project_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
unique = uuid.uuid4().hex[:8]
|
||
cmd_file, clean_name = write_synthetic_skill(skills_dir, skill_name, description, unique)
|
||
|
||
env = {
|
||
"HOME": str(home_dir),
|
||
"CLAUDE_CONFIG_DIR": str(home_dir / ".claude"),
|
||
"PATH": os.environ.get("PATH", ""),
|
||
"ANTHROPIC_API_KEY": os.environ.get("ANTHROPIC_API_KEY", ""),
|
||
}
|
||
|
||
cmd = [
|
||
"claude", "-p", query,
|
||
"--output-format", "stream-json",
|
||
"--verbose",
|
||
"--include-partial-messages",
|
||
"--dangerously-skip-permissions",
|
||
]
|
||
|
||
try:
|
||
proc = subprocess.Popen(
|
||
cmd,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.DEVNULL,
|
||
cwd=str(project_dir),
|
||
env=env,
|
||
)
|
||
buffer = ""
|
||
triggered: bool | None = None
|
||
start = time.time()
|
||
try:
|
||
while time.time() - start < timeout:
|
||
if proc.poll() is not None:
|
||
rest = proc.stdout.read()
|
||
if rest:
|
||
buffer += rest.decode("utf-8", errors="replace")
|
||
break
|
||
chunk = proc.stdout.read1(8192) if hasattr(proc.stdout, "read1") else proc.stdout.read(8192)
|
||
if not chunk:
|
||
time.sleep(0.05)
|
||
continue
|
||
buffer += chunk.decode("utf-8", errors="replace")
|
||
decided, buffer = parse_stream_for_trigger(buffer, clean_name)
|
||
if decided is not None:
|
||
triggered = decided
|
||
break
|
||
finally:
|
||
if proc.poll() is None:
|
||
proc.kill()
|
||
proc.wait()
|
||
if triggered is None:
|
||
decided, _ = parse_stream_for_trigger(buffer + "\n", clean_name)
|
||
triggered = bool(decided)
|
||
return bool(triggered)
|
||
finally:
|
||
try:
|
||
shutil.rmtree(cmd_file.parent, ignore_errors=True)
|
||
except OSError:
|
||
pass
|
||
|
||
|
||
def run_query_docker(query: str, skill_name: str, description: str,
|
||
workspace_root: Path, timeout: int) -> bool:
|
||
workspace_root.mkdir(parents=True, exist_ok=True)
|
||
unique = uuid.uuid4().hex[:8]
|
||
skills_in = workspace_root / "skills_in"
|
||
skills_in.mkdir(parents=True, exist_ok=True)
|
||
_, clean_name = write_synthetic_skill(skills_in, skill_name, description, unique)
|
||
|
||
creds_dir: Path | None = None
|
||
if _KEYCHAIN_CREDS:
|
||
creds_dir = workspace_root / "creds_in"
|
||
creds_dir.mkdir(parents=True, exist_ok=True)
|
||
(creds_dir / ".credentials.json").write_text(_KEYCHAIN_CREDS, encoding="utf-8")
|
||
|
||
container_script = f"""
|
||
set -e
|
||
mkdir -p /workspace/.claude/skills
|
||
cp -R /skills/. /workspace/.claude/skills/ 2>/dev/null || true
|
||
if [ -f /creds/.credentials.json ]; then
|
||
mkdir -p /home/evaluator/.claude
|
||
cp /creds/.credentials.json /home/evaluator/.claude/.credentials.json
|
||
fi
|
||
cd /workspace
|
||
claude -p "$EVAL_QUERY" \\
|
||
--output-format stream-json --verbose --include-partial-messages \\
|
||
--dangerously-skip-permissions \\
|
||
> /output/stream.jsonl 2>/dev/null || true
|
||
"""
|
||
|
||
output_dir = workspace_root / "output"
|
||
output_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
cmd = [
|
||
"docker", "run", "--rm",
|
||
"-v", f"{skills_in}:/skills:ro",
|
||
"-v", f"{output_dir}:/output",
|
||
"-e", "ANTHROPIC_API_KEY",
|
||
"-e", f"EVAL_QUERY={query}",
|
||
]
|
||
if creds_dir:
|
||
cmd += ["-v", f"{creds_dir}:/creds:ro"]
|
||
cmd += [DOCKER_IMAGE, "bash", "-c", container_script]
|
||
|
||
try:
|
||
subprocess.run(cmd, capture_output=True, timeout=timeout + 30)
|
||
except subprocess.TimeoutExpired:
|
||
pass
|
||
|
||
stream_file = output_dir / "stream.jsonl"
|
||
if not stream_file.is_file():
|
||
return False
|
||
decided, _ = parse_stream_for_trigger(stream_file.read_text(encoding="utf-8", errors="replace") + "\n", clean_name)
|
||
return bool(decided)
|
||
|
||
|
||
def main() -> int:
|
||
parser = argparse.ArgumentParser(description="Run trigger evals in isolation")
|
||
parser.add_argument("--skill-path", required=True, type=Path)
|
||
parser.add_argument("--triggers-file", required=True, type=Path)
|
||
parser.add_argument("--output-dir", required=True, type=Path)
|
||
parser.add_argument("--isolation", choices=("docker", "local"), required=True)
|
||
parser.add_argument("--workers", type=int, default=8)
|
||
parser.add_argument("--runs-per-query", type=int, default=3)
|
||
parser.add_argument("--timeout", type=int, default=45)
|
||
parser.add_argument("--threshold", type=float, default=0.5)
|
||
parser.add_argument("--quiet", action="store_true")
|
||
args = parser.parse_args()
|
||
|
||
skill_path = args.skill_path.resolve()
|
||
triggers_file = args.triggers_file.resolve()
|
||
if not triggers_file.is_file():
|
||
print(f"triggers file not found: {triggers_file}", file=sys.stderr)
|
||
return 2
|
||
|
||
skill_name, description, _ = parse_skill_md(skill_path)
|
||
queries = read_json(triggers_file)
|
||
|
||
run_id = new_run_id(f"{skill_name}-triggers")
|
||
run_dir = (args.output_dir / run_id).resolve()
|
||
(run_dir / "queries").mkdir(parents=True, exist_ok=True)
|
||
|
||
write_json(run_dir / "run.json", {
|
||
"run_id": run_id,
|
||
"skill_name": skill_name,
|
||
"description": description,
|
||
"isolation": args.isolation,
|
||
"started_at": utc_now_iso(),
|
||
"query_count": len(queries),
|
||
"runs_per_query": args.runs_per_query,
|
||
"threshold": args.threshold,
|
||
})
|
||
|
||
runner = run_query_docker if args.isolation == "docker" else run_query_local
|
||
|
||
def run_one(idx: int, q: dict, run_idx: int) -> tuple[int, bool]:
|
||
ws = run_dir / "queries" / f"q{idx:03d}-r{run_idx}"
|
||
triggered = runner(q["query"], skill_name, description, ws, args.timeout)
|
||
return idx, triggered
|
||
|
||
per_query: dict[int, list[bool]] = {}
|
||
if not args.quiet:
|
||
print(f"[run_triggers] {len(queries)} queries × {args.runs_per_query} runs, isolation={args.isolation}", file=sys.stderr)
|
||
|
||
with ThreadPoolExecutor(max_workers=args.workers) as pool:
|
||
futures = []
|
||
for idx, q in enumerate(queries):
|
||
for run_idx in range(args.runs_per_query):
|
||
futures.append(pool.submit(run_one, idx, q, run_idx))
|
||
for fut in as_completed(futures):
|
||
try:
|
||
idx, triggered = fut.result()
|
||
except Exception as e:
|
||
print(f"Warning: query failed: {e}", file=sys.stderr)
|
||
continue
|
||
per_query.setdefault(idx, []).append(triggered)
|
||
|
||
results = []
|
||
for idx, q in enumerate(queries):
|
||
triggers = per_query.get(idx, [])
|
||
rate = (sum(triggers) / len(triggers)) if triggers else 0.0
|
||
should = bool(q["should_trigger"])
|
||
if should:
|
||
passed = rate >= args.threshold
|
||
else:
|
||
passed = rate < args.threshold
|
||
results.append({
|
||
"query": q["query"],
|
||
"should_trigger": should,
|
||
"trigger_rate": rate,
|
||
"triggers": int(sum(triggers)),
|
||
"runs": len(triggers),
|
||
"pass": passed,
|
||
})
|
||
|
||
output = {
|
||
"run_id": run_id,
|
||
"completed_at": utc_now_iso(),
|
||
"skill_name": skill_name,
|
||
"description": description,
|
||
"isolation": args.isolation,
|
||
"results": results,
|
||
"summary": {
|
||
"total": len(results),
|
||
"passed": sum(1 for r in results if r["pass"]),
|
||
"failed": sum(1 for r in results if not r["pass"]),
|
||
},
|
||
}
|
||
write_json(run_dir / "triggers-result.json", output)
|
||
print(json.dumps(output, indent=2))
|
||
return 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main())
|