| import io |
| import json |
| import os |
| import zipfile |
| from dataclasses import dataclass |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| from .canon import DRP_BUNDLE_SPEC, DRP_EVENT_SPEC, hash_event, now_utc_iso |
|
|
|
|
| @dataclass |
| class Bundle: |
| manifest: Dict[str, Any] |
| events: List[Dict[str, Any]] |
|
|
|
|
| def _read_json_from_zip(z: zipfile.ZipFile, name: str) -> Dict[str, Any]: |
| with z.open(name, "r") as f: |
| return json.loads(f.read().decode("utf-8")) |
|
|
|
|
| def _read_jsonl_from_zip(z: zipfile.ZipFile, name: str) -> List[Dict[str, Any]]: |
| out: List[Dict[str, Any]] = [] |
| with z.open(name, "r") as f: |
| for line in f.read().decode("utf-8").splitlines(): |
| line = line.strip() |
| if not line: |
| continue |
| out.append(json.loads(line)) |
| return out |
|
|
|
|
| def load_bundle(zip_path: str) -> Bundle: |
| with zipfile.ZipFile(zip_path, "r") as z: |
| manifest = _read_json_from_zip(z, "manifest.json") |
| events = _read_jsonl_from_zip(z, "events.jsonl") |
| return Bundle(manifest=manifest, events=events) |
|
|
|
|
| def verify_bundle(zip_path: str) -> Tuple[bool, Dict[str, Any]]: |
| """ |
| Verifies: |
| - bundle spec fields exist |
| - each event has correct hash |
| - hash chain prev pointers match |
| """ |
| b = load_bundle(zip_path) |
|
|
| issues: List[str] = [] |
| if b.manifest.get("spec") != DRP_BUNDLE_SPEC: |
| issues.append(f"manifest.spec mismatch (expected {DRP_BUNDLE_SPEC})") |
|
|
| events = b.events |
| if not events: |
| issues.append("no events found") |
| return (False, {"ok": False, "issues": issues}) |
|
|
| prev_hash: Optional[str] = None |
| for idx, ev in enumerate(events): |
| if ev.get("spec") != DRP_EVENT_SPEC: |
| issues.append(f"event[{idx}].spec mismatch (expected {DRP_EVENT_SPEC})") |
|
|
| computed = hash_event(ev) |
| if ev.get("hash") != computed: |
| issues.append(f"event[{idx}] hash mismatch") |
|
|
| if idx > 0: |
| if ev.get("prev") != prev_hash: |
| issues.append(f"event[{idx}] prev pointer mismatch") |
|
|
| prev_hash = ev.get("hash") |
|
|
| ok = len(issues) == 0 |
| summary = { |
| "ok": ok, |
| "issues": issues, |
| "event_count": len(events), |
| "run_id": b.manifest.get("run_id"), |
| "created_at": b.manifest.get("created_at"), |
| "framework": b.manifest.get("framework"), |
| "model_id": b.manifest.get("model_id"), |
| } |
| return (ok, summary) |
|
|
|
|
| def write_bundle_zip( |
| out_zip_path: str, |
| *, |
| run_id: str, |
| framework: str, |
| model_id: str, |
| env_fingerprint: Dict[str, Any], |
| events_payloads: List[Dict[str, Any]], |
| created_at: Optional[str] = None, |
| replay: Optional[Dict[str, Any]] = None, |
| run_url: Optional[str] = None, |
| ) -> str: |
| """ |
| Creates a DRP bundle zip: |
| - manifest.json |
| - events.jsonl (hash-chained) |
| """ |
| created_at = created_at or now_utc_iso() |
|
|
| manifest: Dict[str, Any] = { |
| "spec": DRP_BUNDLE_SPEC, |
| "run_id": run_id, |
| "created_at": created_at, |
| "framework": framework, |
| "model_id": model_id, |
| "env": env_fingerprint, |
| } |
| if replay: |
| manifest["replay"] = replay |
| if run_url: |
| manifest["run_url"] = run_url |
|
|
| events: List[Dict[str, Any]] = [] |
| prev_hash: Optional[str] = None |
| for i, payload in enumerate(events_payloads): |
| ev = { |
| "spec": DRP_EVENT_SPEC, |
| "i": i, |
| "ts": payload.get("ts") or now_utc_iso(), |
| "kind": payload.get("kind", "state_snapshot"), |
| "step": payload.get("step", f"step-{i}"), |
| "payload": payload.get("payload", {}), |
| "prev": prev_hash, |
| } |
| ev["hash"] = hash_event(ev) |
| prev_hash = ev["hash"] |
| events.append(ev) |
|
|
| os.makedirs(os.path.dirname(out_zip_path) or ".", exist_ok=True) |
| with zipfile.ZipFile(out_zip_path, "w", compression=zipfile.ZIP_DEFLATED) as z: |
| z.writestr("manifest.json", json.dumps(manifest, ensure_ascii=False, indent=2)) |
| buf = io.StringIO() |
| for ev in events: |
| buf.write(json.dumps(ev, ensure_ascii=False, separators=(",", ":"))) |
| buf.write("\n") |
| z.writestr("events.jsonl", buf.getvalue()) |
|
|
| return out_zip_path |