import os, subprocess, json, pathlib, time, shutil
import re, ast  # used by the eval-metrics parsing in eval_latest()
import gradio as gr

# ---------- CONSTANTS (visible in App Files) ----------
RUN_ROOT = "/home/user/app/runs"            # where all runs live
LOG_ROOT = "/home/user/app/logs"            # global logs (so we don't pre-create run dirs)
LAST_PTR = pathlib.Path(RUN_ROOT) / "LAST"  # remembers most recent run path

os.makedirs(RUN_ROOT, exist_ok=True)
os.makedirs(LOG_ROOT, exist_ok=True)

# ---------- ENV / HUB ----------
DEFAULT_REPO_ID = os.environ.get("REPO_ID", "")  # e.g. "zino36/lerobot-pusht-colab"
PUSH_DEFAULT = os.environ.get("PUSH_TO_HUB", "true").lower() in {"1", "true", "yes"}
HF_TOKEN = os.environ.get("HF_TOKEN")
if HF_TOKEN:
    try:
        from huggingface_hub import login
        login(token=HF_TOKEN)
    except Exception as e:
        print("HF login failed:", e)

# ---------- LOG HELPERS ----------
def _run(cmd: str, logfile: str):
    """Run a shell command, stream its combined output to `logfile`, return (rc, tail)."""
    os.makedirs(os.path.dirname(logfile), exist_ok=True)
    with open(logfile, "a", buffering=1) as f:
        f.write("\n---- CMD ----\n" + cmd + "\n--------------\n")
        p = subprocess.Popen(
            cmd, shell=True,
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
            text=True, bufsize=1,
        )
        lines = []
        for line in p.stdout:
            f.write(line)
            lines.append(line)
        p.wait()
    return p.returncode, "".join(lines[-200:])

def tail_file(path: str, n=200):
    if not os.path.exists(path):
        return "(no log yet)"
    with open(path, "r", errors="ignore") as f:
        lines = f.readlines()
    return "".join(lines[-n:])

# ---------- RUN DIR HELPERS ----------
def new_run_dir():
    """Return a unique run dir path WITHOUT creating it (so LeRobot can create it)."""
    base = pathlib.Path(RUN_ROOT) / f"pusht_{int(time.time())}"
    d = base
    i = 1
    while d.exists():
        d = pathlib.Path(f"{base}_{i}")
        i += 1
    LAST_PTR.write_text(str(d))
    return str(d)

def current_run_dir(user_override: str | None):
    """Prefer user text if given, else use the LAST pointer if present."""
    if user_override and user_override.strip():
        return user_override.strip()
    if LAST_PTR.exists():
        return LAST_PTR.read_text().strip()
    return ""

def has_checkpoint(run_dir: str):
    """We consider a checkpoint present once checkpoints/last/ exists (first save is at step 500)."""
    return os.path.isdir(os.path.join(run_dir, "checkpoints", "last"))

def train_log_path_for_new(run_dir: str):
    """Write fresh-run logs to global LOG_ROOT so we don't pre-create run_dir."""
    name = pathlib.Path(run_dir).name
    return os.path.join(LOG_ROOT, f"{name}.train.log")

def train_log_path(run_dir: str):
    return os.path.join(run_dir, "logs", "train.log")

def eval_log_path(run_dir: str):
    return os.path.join(run_dir, "logs", "eval.log")

# ---------- ACTIONS ----------
def start_training(steps, batch_size, push_to_hub, repo_id):
    run_dir = new_run_dir()
    log = train_log_path_for_new(run_dir)
    push_flags = (
        f"--policy.push_to_hub=true --policy.repo_id='{repo_id.strip()}'"
        if push_to_hub and repo_id.strip()
        else "--policy.push_to_hub=false"
    )
    cmd = (
        "lerobot-train "
        f"--output_dir='{run_dir}' "
        "--policy.type=diffusion "
        "--dataset.repo_id=lerobot/pusht "
        "--env.type=pusht "
        f"--batch_size={batch_size} "
        f"--steps={steps} "
        "--eval_freq=500 "
        "--save_freq=500 "
        f"{push_flags}"
    )
    rc, tail = _run(cmd, log)
    msg = f"Started fresh run at: {run_dir}\nTrain exited rc={rc}\n\n=== train.log tail ===\n{tail}"
    return msg, run_dir, tail_file(log)

def resume_training(extra_steps, push_to_hub, repo_id, run_dir_text):
    run_dir = current_run_dir(run_dir_text)
    if not run_dir:
        return "No run found yet. Start a fresh training first.", "", "(no log)"
    log = train_log_path(run_dir)
    if not has_checkpoint(run_dir):
        return (
            f"No checkpoint in {run_dir}/checkpoints/last/ yet; run at least 500 steps once.",
            run_dir,
            tail_file(log),
        )
    push_flags = (
        f"--policy.push_to_hub=true --policy.repo_id='{repo_id.strip()}'"
        if push_to_hub and repo_id.strip()
        else "--policy.push_to_hub=false"
    )
    cmd = (
        "lerobot-train "
        f"--output_dir='{run_dir}' "
        "--resume=true "
        f"--steps={extra_steps} "
        "--eval_freq=500 "
        "--save_freq=500 "
        f"{push_flags}"
    )
    rc, tail = _run(cmd, log)
    msg = f"Resumed run at: {run_dir}\nResume exited rc={rc}\n\n=== train.log tail ===\n{tail}"
    return msg, run_dir, tail_file(log)

def eval_latest(run_dir_text):
    run_dir = current_run_dir(run_dir_text)
    if not run_dir:
        return "No run found yet. Start a fresh training first.", "", "(no log)", "(no metrics)"
    elog = eval_log_path(run_dir)
    if not has_checkpoint(run_dir):
        return (
            f"No checkpoint in {run_dir}/checkpoints/last/ to evaluate.",
            run_dir,
            tail_file(elog),
            "(no metrics)",
        )
    ckpt = os.path.join(run_dir, "checkpoints", "last", "pretrained_model")
    eval_out_dir = os.path.join(run_dir, "eval_latest")
    os.makedirs(eval_out_dir, exist_ok=True)
    cmd = (
        "lerobot-eval "
        f"--policy.path='{ckpt}' "
        "--env.type=pusht "
        "--eval.n_episodes=100 "
        "--eval.batch_size=50 "
        f"--output_dir='{eval_out_dir}'"
    )
    rc, tail = _run(cmd, elog)

    # --- optional patch: parse the printed dict and write metrics.json ---
    metrics_txt = "(metrics.json not found)"
    p = pathlib.Path(eval_out_dir) / "metrics.json"
    # Try to parse the last dict-like summary from the log tail
    m = re.findall(r"\{[^}]+pc_success[^}]+\}", tail, flags=re.S)
    if m:
        try:
            d = ast.literal_eval(m[-1])
            out = {
                "success_rate": d.get("pc_success"),
                "avg_max_overlap": d.get("avg_max_reward"),
                "avg_sum_reward": d.get("avg_sum_reward"),
                "eval_s": d.get("eval_s"),
            }
            p.write_text(json.dumps(out, indent=2))
            metrics_txt = f"Success rate: {out['success_rate']}\nAvg max overlap: {out['avg_max_overlap']}"
        except Exception:
            pass
    elif p.exists():
        # Fallback: if a previous metrics.json exists, show it
        try:
            d = json.loads(p.read_text())
            metrics_txt = f"Success rate: {d.get('success_rate')}\nAvg max overlap: {d.get('avg_max_overlap')}"
        except Exception:
            metrics_txt = "(could not parse metrics.json)"
    # --- end patch ---

    msg = f"Evaluated run at: {run_dir}\nEval exited rc={rc}\n\n=== eval.log tail ===\n{tail}"
    return msg, run_dir, tail_file(elog), metrics_txt
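
# NOTE: the metrics patch in eval_latest() looks for a summary dict that
# lerobot-eval prints near the end of a run. A sketch of the shape it expects
# (key names inferred from the parsing code above; the values are illustrative
# and the exact format may vary across LeRobot versions):
#
#   {'avg_sum_reward': 153.2, 'avg_max_reward': 0.91, 'pc_success': 57.0, 'eval_s': 412.3}
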
# ---------- Maintenance (list / delete runs) ----------
def list_runs():
    root = pathlib.Path(RUN_ROOT)
    if not root.exists():
        return "(no runs)"
    rows = []
    for d in sorted(root.glob("pusht_*")):
        try:
            size = subprocess.check_output(
                ["bash", "-lc", f"du -sh {d} | cut -f1"], text=True
            ).strip()
        except Exception:
            size = "?"
        ck = "✓" if has_checkpoint(str(d)) else "—"
        rows.append(f"{d.name}\t{size}\tcheckpoint:{ck}")
    if not rows:
        return "(no runs)"
    return "name\tsize\tcheckpoint\n" + "\n".join(rows)

def delete_run_by_name(name: str):
    name = os.path.basename((name or "").strip())
    if not name:
        return "Type a folder like 'pusht_1234567890'.", list_runs()
    target = os.path.join(RUN_ROOT, name)
    if not target.startswith(RUN_ROOT + "/"):
        return "Refusing to delete outside runs/.", list_runs()
    if not os.path.isdir(target):
        return f"Folder not found: {target}", list_runs()
    shutil.rmtree(target, ignore_errors=True)
    # clear LAST if it pointed here
    if LAST_PTR.exists() and LAST_PTR.read_text().strip() == target:
        LAST_PTR.unlink(missing_ok=True)
    return f"Deleted {target}", list_runs()

def delete_all_runs():
    if not os.path.isdir(RUN_ROOT):
        return "(runs/ missing)", list_runs()
    for n in os.listdir(RUN_ROOT):
        p = os.path.join(RUN_ROOT, n)
        if os.path.isdir(p) and n.startswith("pusht_"):
            shutil.rmtree(p, ignore_errors=True)
    LAST_PTR.unlink(missing_ok=True)
    return "Deleted all pusht_* runs.", list_runs()
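
# The maintenance helpers above are plain functions, so they also work from a
# console for quick checks (a sketch; the folder name is hypothetical):
#
#   print(list_runs())
#   msg, listing = delete_run_by_name("pusht_1699999999")
#   print(msg)
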
ck = "โœ“" if has_checkpoint(str(d)) else "โ€”" rows.append(f"{d.name}\t{size}\tcheckpoint:{ck}") return "name\tsize\tcheckpoint\n" + "\n".join(rows) if rows else "(no runs)" def delete_run_by_name(name: str): name = os.path.basename((name or "").strip()) if not name: return "Type a folder like 'pusht_1234567890'.", list_runs() target = os.path.join(RUN_ROOT, name) if not target.startswith(RUN_ROOT + "/"): return "Refusing to delete outside runs/.", list_runs() if not os.path.isdir(target): return f"Folder not found: {target}", list_runs() shutil.rmtree(target, ignore_errors=True) # clear LAST if it pointed here if LAST_PTR.exists() and LAST_PTR.read_text().strip() == target: LAST_PTR.unlink(missing_ok=True) return f"Deleted {target}", list_runs() def delete_all_runs(): if not os.path.isdir(RUN_ROOT): return "(runs/ missing)", list_runs() for n in os.listdir(RUN_ROOT): p = os.path.join(RUN_ROOT, n) if os.path.isdir(p) and n.startswith("pusht_"): shutil.rmtree(p, ignore_errors=True) LAST_PTR.unlink(missing_ok=True) return "Deleted all pusht_* runs.", list_runs() # ---------- UI ---------- with gr.Blocks(title="LeRobot PushT Trainer (Space)") as demo: gr.Markdown("# ๐Ÿค– LeRobot PushT Trainer\nTrain / Resume / Evaluate. Files persist under `/home/user/app/runs/` (see App Files).") with gr.Row(): repo_id = gr.Textbox(label="Hugging Face Model Repo (optional)", value=DEFAULT_REPO_ID, placeholder="username/repo-name") push_to_hub = gr.Checkbox(label="Push checkpoints to Hub", value=PUSH_DEFAULT) with gr.Row(): steps = gr.Slider(200, 20000, value=2000, step=100, label="Training steps (fresh run)") batch = gr.Slider(4, 64, value=16, step=2, label="Batch size") start_btn = gr.Button("๐Ÿš€ Start Fresh Training") start_out = gr.Textbox(label="Start Output") run_dir_view = gr.Textbox(label="Current run directory (auto-filled after start)") train_log = gr.Textbox(label="train.log (tail)", lines=20) gr.Markdown("### Resume / Evaluate a Specific Run") run_dir_text = gr.Textbox(label="Run directory (leave blank to use the latest)") with gr.Row(): extra_steps = gr.Slider(200, 20000, value=2000, step=100, label="Steps to add on resume") resume_btn = gr.Button("โ–ถ๏ธ Resume from Last Checkpoint") resume_out = gr.Textbox(label="Resume Output") resume_log = gr.Textbox(label="train.log (tail)", lines=20) gr.Markdown("### Evaluate Latest Checkpoint of Selected Run") eval_btn = gr.Button("๐Ÿ“ˆ Evaluate Latest") eval_out = gr.Textbox(label="Eval Output") eval_log = gr.Textbox(label="eval.log (tail)", lines=20) metrics_box = gr.Textbox(label="Parsed metrics (if metrics.json exists)") gr.Markdown("### Runs on disk") list_btn = gr.Button("๐Ÿ“‚ List runs folder") list_out = gr.Textbox(label="runs/ listing", lines=12) gr.Markdown("### Maintenance") del_name = gr.Textbox(label="Run folder name to delete (e.g., pusht_1699999999)") del_one_btn = gr.Button("๐Ÿ—‘๏ธ Delete this run") del_all_btn = gr.Button("๐Ÿงน Delete ALL pusht_* runs") # Wiring start_btn.click(start_training, inputs=[steps, batch, push_to_hub, repo_id], outputs=[start_out, run_dir_view, train_log]) resume_btn.click(resume_training, inputs=[extra_steps, push_to_hub, repo_id, run_dir_text], outputs=[resume_out, run_dir_view, resume_log]) eval_btn.click(eval_latest, inputs=[run_dir_text], outputs=[eval_out, run_dir_view, eval_log, metrics_box]) list_btn.click(list_runs, outputs=list_out) del_one_btn.click(delete_run_by_name, inputs=del_name, outputs=[list_out, list_out]) del_all_btn.click(delete_all_runs, outputs=[list_out, list_out]) if __name__ == 
"__main__": demo.launch()