TuringsSolutions's picture
Create app.py
fc66328 verified
import os, io, re, json, math, struct, tempfile, traceback
from pathlib import Path
from typing import List, Tuple, Dict
import numpy as np
import gradio as gr
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import imageio.v2 as imageio # GIF creation
# -----------------------------
# Optional DOCX support
# -----------------------------
_DOCX_OK = False
try:
from docx import Document
_DOCX_OK = True
except Exception:
_DOCX_OK = False
# -----------------------------
# Embeddings: sentence-transformers (preferred), fallback to hashing
# -----------------------------
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.decomposition import PCA
_ST_MODEL = None
def _load_st_model():
global _ST_MODEL
if _ST_MODEL is not None:
return _ST_MODEL
try:
from sentence_transformers import SentenceTransformer
_ST_MODEL = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
return _ST_MODEL
except Exception:
return None
def embed_texts(texts: List[str], prefer_sentence_transformer: bool = True) -> Tuple[np.ndarray, str]:
texts = [t if isinstance(t, str) else str(t) for t in texts]
if prefer_sentence_transformer:
model = _load_st_model()
if model is not None:
try:
vecs = model.encode(
texts, batch_size=32, show_progress_bar=False,
convert_to_numpy=True, normalize_embeddings=True
)
return vecs.astype(np.float32), "sentence-transformers/all-MiniLM-L6-v2"
except Exception:
pass
hv = HashingVectorizer(n_features=768, alternate_sign=False, norm=None)
X = hv.transform(texts)
vecs = X.toarray().astype(np.float32)
norms = np.linalg.norm(vecs, axis=1, keepdims=True) + 1e-9
vecs = vecs / norms
return vecs, "HashingVectorizer(768d) fallback"
# -----------------------------
# Text ingestion / splitting
# -----------------------------
def _basic_sentence_split(text: str) -> List[str]:
rough = re.split(r'[\n\r]+|(?<=[\.\!\?])\s+', text.strip())
out = []
for s in rough:
s = s.strip()
if s:
out.append(s)
return out
def read_txt_bytes(b: bytes) -> str:
try:
return b.decode("utf-8")
except Exception:
return b.decode("latin-1", errors="ignore")
def read_docx_bytes(b: bytes) -> List[str]:
if not _DOCX_OK:
raise RuntimeError("python-docx not installed in this Space.")
bio = io.BytesIO(b)
doc = Document(bio)
paras = [p.text.strip() for p in doc.paragraphs]
return [p for p in paras if p and not p.isspace()]
def to_units(raw_text: str, mode: str) -> List[str]:
raw_text = raw_text.strip()
if not raw_text:
return []
if mode == "sentences":
return _basic_sentence_split(raw_text)
paras = [p.strip() for p in re.split(r"\n\s*\n+", raw_text) if p.strip()]
return paras
# -----------------------------
# Demo corpus (for effortless investor demos)
# -----------------------------
DEMO_CORPUS = """
In the beginning, people stored knowledge in libraries, then in databases, and now in neural networks.
Compression isn’t just saving space — it’s choosing what matters.
A constellation is a pattern you can navigate.
Entropy is a measure of surprise, and learning is surprise turning into structure.
A system that learns from compressed data never needs the original.
It doesn’t memorize pixels; it memorizes geometry.
It doesn’t hoard text; it extracts signals.
The question isn’t “Can it compress?” but “Can it learn after compressing?”
Investors love seeing systems move.
They love curves that fall.
They love maps that cluster.
They love a demo that feels alive.
This demo builds a codec from your dataset,
then trains a model exclusively on the codec’s byte stream.
No raw text is used during training.
Only the compressed stream exists.
We call the clusters constellations.
We call the structure harvestable.
We call the drop in entropy visible proof.
"""
# -----------------------------
# CHR core
# -----------------------------
def softmax(x, axis=-1):
x = x - np.max(x, axis=axis, keepdims=True)
ex = np.exp(x)
return ex / (np.sum(ex, axis=axis, keepdims=True) + 1e-9)
def global_range_entropy(p: np.ndarray) -> float:
m = p.mean(axis=0)
m_safe = np.clip(m, 1e-12, None)
return float(-(m_safe * np.log(m_safe)).sum())
def soft_slab_entropy(z: np.ndarray, U: np.ndarray, bins: int = 8, tau: float = 5.0) -> float:
t = z @ U.T
K = U.shape[0]
Hs = []
for j in range(K):
tj = t[:, j]
tmin, tmax = float(tj.min()), float(tj.max())
if not np.isfinite(tmin) or not np.isfinite(tmax) or tmax - tmin < 1e-6:
Hs.append(0.0)
continue
centers = np.linspace(tmin, tmax, bins)
dist2 = (tj[:, None] - centers[None, :]) ** 2
weights = softmax(-tau * dist2, axis=1)
hist = weights.mean(axis=0)
hist = np.clip(hist, 1e-12, None)
H = float(-(hist * np.log(hist)).sum())
Hs.append(H)
return float(np.mean(Hs)) if Hs else 0.0
def kmeans_plus_plus_init(z: np.ndarray, K: int, rng: np.random.RandomState) -> np.ndarray:
N, d = z.shape
inds = [rng.randint(0, N)]
centers = [z[inds[0]]]
cos0 = np.clip(z @ centers[0], -1.0, 1.0)
d2 = np.clip(1.0 - cos0, 1e-12, None)
for _ in range(1, K):
s = d2.sum()
if not np.isfinite(s) or s <= 0:
probs = np.full(N, 1.0 / N)
else:
probs = np.clip(d2 / s, 0.0, None)
probs = probs / (probs.sum() + 1e-12)
next_idx = rng.choice(N, p=probs)
inds.append(next_idx)
centers.append(z[next_idx])
cos_new = np.clip(z @ z[next_idx], -1.0, 1.0)
d2 = np.minimum(d2, np.clip(1.0 - cos_new, 1e-12, None))
U = np.stack(centers, axis=0)
U = U / (np.linalg.norm(U, axis=1, keepdims=True) + 1e-9)
return U
def chr_optimize(z: np.ndarray, K: int = 8, iters: int = 30, beta: float = 12.0,
bins: int = 8, tau: float = 5.0, seed: int = 42):
rng = np.random.RandomState(seed)
N, d = z.shape
U = kmeans_plus_plus_init(z, K, rng) if N >= K else np.pad(z, ((0, max(0, K - N)), (0, 0)), mode="wrap")[:K]
U = U / (np.linalg.norm(U, axis=1, keepdims=True) + 1e-9)
logits0 = beta * (z @ U.T)
p0 = softmax(logits0, axis=1)
Hg_traj = [global_range_entropy(p0)]
Hs_traj = [soft_slab_entropy(z, U, bins=bins, tau=tau)]
for _ in range(iters):
logits = beta * (z @ U.T)
p = softmax(logits, axis=1)
numer = p.T @ z
denom = p.sum(axis=0)[:, None] + 1e-9
U = numer / denom
U = U / (np.linalg.norm(U, axis=1, keepdims=True) + 1e-9)
Hg_traj.append(global_range_entropy(p))
Hs_traj.append(soft_slab_entropy(z, U, bins=bins, tau=tau))
logits = beta * (z @ U.T)
p = softmax(logits, axis=1)
return U, p, np.array(Hg_traj), np.array(Hs_traj)
def compute_mhep(Hg_traj: np.ndarray, Hs_traj: np.ndarray, K: int, bins: int, w_g: float = 0.7, w_s: float = 0.3) -> float:
if len(Hg_traj) < 2 or len(Hs_traj) < 2:
return 0.0
maxHg = math.log(max(K, 2))
maxHs = math.log(max(bins, 2))
drop_g = max(0.0, float(Hg_traj[0] - Hg_traj[-1])) / (maxHg + 1e-9)
drop_s = max(0.0, float(Hs_traj[0] - Hs_traj[-1])) / (maxHs + 1e-9)
return float(np.clip(100.0 * (w_g * drop_g + w_s * drop_s), 0.0, 100.0))
# -----------------------------
# CHR → discrete "compressed" byte stream
# -----------------------------
def make_radial_bins(radials: np.ndarray, B: int = 64) -> np.ndarray:
edges = np.quantile(radials, np.linspace(0, 1, B + 1))
for i in range(1, len(edges)):
if edges[i] <= edges[i - 1]:
edges[i] = edges[i - 1] + 1e-6
return edges.astype(np.float32)
def quantize_radial(r: float, edges: np.ndarray) -> int:
b = np.searchsorted(edges, r, side="right") - 1
return int(np.clip(b, 0, len(edges) - 2))
def pack_codes_to_bytes(labels: np.ndarray, bins: np.ndarray) -> bytes:
out = bytearray()
for c, b in zip(labels.tolist(), bins.tolist()):
out.append(int(c) & 0xFF)
out.append(int(b) & 0xFF)
return bytes(out)
def save_codes_and_codec(code_bytes: bytes, codec: Dict, out_dir: str) -> Tuple[str, str]:
os.makedirs(out_dir, exist_ok=True)
bin_path = os.path.join(out_dir, "codes.bin")
meta_path = os.path.join(out_dir, "codec.json")
with open(bin_path, "wb") as f:
f.write(b"CHRC")
f.write(struct.pack("<I", 1))
f.write(code_bytes)
with open(meta_path, "w", encoding="utf-8") as f:
json.dump(codec, f, indent=2)
return bin_path, meta_path
# -----------------------------
# Visuals
# -----------------------------
def plot_entropy(Hg, Hs, out_path):
plt.figure(figsize=(6,4))
plt.plot(Hg, label="Global range entropy")
plt.plot(Hs, label="Slab entropy")
plt.xlabel("Iteration"); plt.ylabel("Entropy")
plt.title("Entropy drops during CHR compression")
plt.legend()
plt.tight_layout()
plt.savefig(out_path, dpi=150)
plt.close()
def plot_constellation_map(z, U, labels, out_path):
if z.shape[1] > 2:
pca = PCA(n_components=2, random_state=0)
Z2 = pca.fit_transform(z)
U2 = pca.transform(U)
else:
Z2, U2 = z, U
plt.figure(figsize=(6,5))
plt.scatter(Z2[:,0], Z2[:,1], s=14, alpha=0.8, c=labels)
plt.scatter(U2[:,0], U2[:,1], marker="*", s=200)
plt.title("Constellation map (compressed geometry)")
plt.xlabel("PC1"); plt.ylabel("PC2")
plt.tight_layout()
plt.savefig(out_path, dpi=150)
plt.close()
def plot_training_curves(losses, ppls, out_path):
plt.figure(figsize=(6,4))
plt.plot(losses, label="Loss")
plt.plot(ppls, label="Perplexity")
plt.xlabel("Checkpoint")
plt.title("Learning on compressed stream")
plt.legend()
plt.tight_layout()
plt.savefig(out_path, dpi=150)
plt.close()
def plot_rollout_tracks(seq_bytes: List[int], out_path, title="Compressed rollout"):
cs = seq_bytes[0::2]
bs = seq_bytes[1::2]
plt.figure(figsize=(8,3.6))
plt.plot(cs, label="Constellation id")
plt.plot(bs, label="Radial bin")
plt.ylim(-2, 260)
plt.xlabel("Step"); plt.title(title)
plt.legend()
plt.tight_layout()
plt.savefig(out_path, dpi=150)
plt.close()
def plot_before_after_tracks(before_bytes: List[int], after_bytes: List[int], out_path):
b_c = before_bytes[0::2]; b_b = before_bytes[1::2]
a_c = after_bytes[0::2]; a_b = after_bytes[1::2]
plt.figure(figsize=(10,4))
plt.subplot(1,2,1)
plt.plot(b_c, label="Constellation")
plt.plot(b_b, label="Radial bin")
plt.title("BEFORE (untrained)")
plt.ylim(-2, 260)
plt.legend()
plt.subplot(1,2,2)
plt.plot(a_c, label="Constellation")
plt.plot(a_b, label="Radial bin")
plt.title("AFTER (trained)")
plt.ylim(-2, 260)
plt.legend()
plt.suptitle("Rollout comparison on compressed symbols")
plt.tight_layout()
plt.savefig(out_path, dpi=150)
plt.close()
def rollout_to_xy(seq_bytes: List[int], U: np.ndarray, radial_edges: np.ndarray) -> np.ndarray:
"""
Convert (constellation id, radial bin) stream into approximate vectors r*U[c],
then project to 2D using PCA fitted on U only (codec-only visualization).
"""
cs = np.array(seq_bytes[0::2], dtype=np.int32)
bs = np.array(seq_bytes[1::2], dtype=np.int32)
K, d = U.shape
B = len(radial_edges) - 1
cs = np.clip(cs, 0, K-1)
bs = np.clip(bs, 0, B-1)
# use bin midpoints as radius
mids = 0.5 * (radial_edges[bs] + radial_edges[bs + 1]) # [T]
V = U[cs] * mids[:, None] # [T, d]
pca = PCA(n_components=2, random_state=0)
U2 = pca.fit_transform(U)
V2 = pca.transform(V)
return V2, U2
def make_rollout_gif(seq_bytes: List[int], U: np.ndarray, radial_edges: np.ndarray,
out_path: str, title: str = "Compressed rollout (animated)",
stride: int = 2, fps: int = 12):
V2, U2 = rollout_to_xy(seq_bytes, U, radial_edges)
frames = []
# bounds for stable view
xmin = min(V2[:,0].min(), U2[:,0].min()) - 0.2
xmax = max(V2[:,0].max(), U2[:,0].max()) + 0.2
ymin = min(V2[:,1].min(), U2[:,1].min()) - 0.2
ymax = max(V2[:,1].max(), U2[:,1].max()) + 0.2
for t in range(1, len(V2), stride):
fig = plt.figure(figsize=(6,5))
plt.scatter(U2[:,0], U2[:,1], marker="*", s=180) # anchors
plt.plot(V2[:t,0], V2[:t,1], linewidth=2) # path so far
plt.scatter(V2[t-1,0], V2[t-1,1], s=80) # current point
plt.title(title)
plt.xlim(xmin, xmax); plt.ylim(ymin, ymax)
plt.xlabel("PC1 (codec space)"); plt.ylabel("PC2 (codec space)")
plt.tight_layout()
buf = io.BytesIO()
plt.savefig(buf, format="png", dpi=150)
plt.close(fig)
buf.seek(0)
frames.append(imageio.imread(buf))
imageio.mimsave(out_path, frames, fps=fps)
# -----------------------------
# Byte-level transformer (PyTorch)
# -----------------------------
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
class ByteStreamDataset(Dataset):
def __init__(self, bin_path: str, block_size: int = 256):
with open(bin_path, "rb") as f:
blob = f.read()
assert blob[:4] == b"CHRC"
ver = int.from_bytes(blob[4:8], "little")
assert ver == 1
data = blob[8:]
self.data = torch.tensor(list(data), dtype=torch.long)
self.block_size = int(block_size)
def __len__(self):
return max(0, len(self.data) - self.block_size - 1)
def __getitem__(self, idx):
x = self.data[idx:idx+self.block_size]
y = self.data[idx+1:idx+self.block_size+1]
return x, y
class TinyByteTransformer(nn.Module):
def __init__(self, vocab_size=256, d_model=192, n_layers=4, n_heads=6, block_size=256):
super().__init__()
self.tok = nn.Embedding(vocab_size, d_model)
self.pos = nn.Embedding(block_size, d_model)
enc_layer = nn.TransformerEncoderLayer(
d_model=d_model, nhead=n_heads, dim_feedforward=4*d_model,
dropout=0.1, batch_first=True
)
self.tr = nn.TransformerEncoder(enc_layer, num_layers=n_layers)
self.lm = nn.Linear(d_model, vocab_size)
self.block_size = block_size
def forward(self, x):
B, T = x.shape
pos = torch.arange(T, device=x.device).unsqueeze(0).expand(B, T)
h = self.tok(x) + self.pos(pos)
mask = torch.triu(torch.ones(T, T, device=x.device), diagonal=1).bool()
h = self.tr(h, mask=mask)
return self.lm(h)
@torch.no_grad()
def sample_bytes(model, start: List[int], steps: int, device: str = "cpu", temperature: float = 1.0) -> List[int]:
model.eval()
seq = start[:]
for _ in range(steps):
x = torch.tensor(seq[-model.block_size:], dtype=torch.long, device=device).unsqueeze(0)
logits = model(x)[0, -1] / max(1e-6, float(temperature))
probs = torch.softmax(logits, dim=-1)
nxt = int(torch.multinomial(probs, num_samples=1).item())
seq.append(nxt)
return seq
def train_on_compressed(bin_path: str,
steps: int = 800,
batch_size: int = 64,
block_size: int = 256,
lr: float = 3e-4,
device: str = "cpu",
log_every: int = 50):
ds = ByteStreamDataset(bin_path, block_size=block_size)
if len(ds) < 10:
raise RuntimeError("Not enough compressed data to train. Use more text or smaller block size.")
dl = DataLoader(ds, batch_size=batch_size, shuffle=True, drop_last=True)
it = iter(dl)
model = TinyByteTransformer(block_size=block_size).to(device)
opt = torch.optim.AdamW(model.parameters(), lr=lr)
loss_fn = nn.CrossEntropyLoss()
losses, ppls = [], []
model.train()
for step in range(1, steps+1):
try:
x, y = next(it)
except StopIteration:
it = iter(dl)
x, y = next(it)
x, y = x.to(device), y.to(device)
logits = model(x)
loss = loss_fn(logits.view(-1, 256), y.view(-1))
opt.zero_grad(set_to_none=True)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
opt.step()
if step % log_every == 0:
l = float(loss.detach().cpu().item())
ppl = float(torch.exp(loss.detach()).cpu().item())
losses.append(l)
ppls.append(ppl)
return model, losses, ppls
# -----------------------------
# Pipeline state
# -----------------------------
STATE = {
"units": None,
"Z": None,
"U": None,
"labels": None,
"bins": None,
"bin_path": None,
"meta_path": None,
"codec": None,
"model": None,
}
def _bytes_from_upload(file_obj) -> Tuple[bytes, str]:
if file_obj is None:
return b"", ""
if isinstance(file_obj, str) and os.path.exists(file_obj):
return Path(file_obj).read_bytes(), os.path.basename(file_obj)
if hasattr(file_obj, "name") and os.path.exists(file_obj.name):
return Path(file_obj.name).read_bytes(), os.path.basename(file_obj.name)
return b"", "upload"
# -----------------------------
# Gradio callbacks
# -----------------------------
def load_demo(units_mode: str):
units = to_units(DEMO_CORPUS, units_mode)
units = [u.strip() for u in units if u.strip()]
STATE["units"] = units
return f"Loaded **{len(units)}** demo units (built-in corpus)."
def ingest_file(file_obj, units_mode: str):
try:
b, name = _bytes_from_upload(file_obj)
if not b:
return "Upload a .txt or .docx file to begin."
if name.lower().endswith(".docx"):
paras = read_docx_bytes(b)
raw = "\n\n".join(paras)
else:
raw = read_txt_bytes(b)
units = to_units(raw, units_mode)
units = [u.strip() for u in units if u.strip()]
if len(units) > 3000:
units = units[:3000]
STATE["units"] = units
return f"Loaded **{len(units)}** units from **{name}**."
except Exception as e:
return f"Error ingesting file: {e}"
def compress_now(K, iters, beta, slab_bins, tau, seed, radial_bins):
try:
units = STATE.get("units")
if not units:
return "No units loaded. Upload a file or load the demo corpus.", None, None, None, None
Z, backend = embed_texts(units, prefer_sentence_transformer=True)
U, p, Hg, Hs = chr_optimize(Z, K=int(K), iters=int(iters), beta=float(beta),
bins=int(slab_bins), tau=float(tau), seed=int(seed))
labels = p.argmax(axis=1).astype(np.int32)
proj = Z @ U.T
radials = proj[np.arange(len(units)), labels].astype(np.float32)
edges = make_radial_bins(radials, B=int(radial_bins))
bins_q = np.array([quantize_radial(float(radials[i]), edges) for i in range(len(units))], dtype=np.int32)
code_bytes = pack_codes_to_bytes(labels, bins_q)
out_dir = tempfile.mkdtemp()
codec = {
"backend": backend,
"K": int(K),
"radial_bins": int(radial_bins),
"iters": int(iters),
"beta": float(beta),
"slab_bins": int(slab_bins),
"tau": float(tau),
"seed": int(seed),
"U": U.tolist(),
"radial_edges": edges.tolist(),
"units_count": int(len(units)),
"bytes_per_unit": 2.0,
"total_bytes": int(len(code_bytes) + 8),
}
bin_path, meta_path = save_codes_and_codec(code_bytes, codec, out_dir)
STATE.update({
"Z": Z, "U": U, "labels": labels, "bins": bins_q,
"bin_path": bin_path, "meta_path": meta_path, "codec": codec
})
ent_plot = os.path.join(out_dir, "entropy.png")
map_plot = os.path.join(out_dir, "map.png")
plot_entropy(Hg, Hs, ent_plot)
plot_constellation_map(Z, U, labels, map_plot)
mhep = compute_mhep(Hg, Hs, K=int(K), bins=int(slab_bins))
summary_md = (
f"## Compression Complete\n"
f"- **Embedding backend:** `{backend}`\n"
f"- **Units:** **{len(units)}**\n"
f"- **Constellations (K):** **{int(K)}**\n"
f"- **Radial bins:** **{int(radial_bins)}**\n"
f"- **Compressed stream size:** **{codec['total_bytes']} bytes**\n"
f"- **Bytes per unit:** **2.0** (constellation + radial bin)\n"
f"- **MHEP score:** **{mhep:.1f}%**\n"
f"\n### Investor-proof constraint\n"
f"Training input is **only** `codes.bin` (a byte stream)."
)
return summary_md, ent_plot, map_plot, bin_path, meta_path
except Exception as e:
return f"Compression error: {e}\n\n{traceback.format_exc()}", None, None, None, None
def train_now(train_steps, batch_size, block_size, lr, log_every, temperature, rollout_steps, gif_stride, gif_fps):
try:
bin_path = STATE.get("bin_path")
codec = STATE.get("codec")
U = STATE.get("U")
if not bin_path or not os.path.exists(bin_path) or codec is None or U is None:
return "No compressed stream found. Run compression first.", None, None, None, None
device = "cuda" if torch.cuda.is_available() else "cpu"
# Load stream bytes for starting context
with open(bin_path, "rb") as f:
blob = f.read()
stream = list(blob[8:])
start = stream[:min(len(stream), int(block_size))]
# ---- BEFORE: untrained (random) model rollout ----
untrained = TinyByteTransformer(block_size=int(block_size)).to(device)
before_seq = sample_bytes(
untrained, start=start, steps=int(rollout_steps),
device=device, temperature=float(temperature)
)
out_dir = os.path.dirname(bin_path)
before_plot = os.path.join(out_dir, "rollout_before.png")
plot_rollout_tracks(before_seq[-2*int(rollout_steps):], before_plot, title="BEFORE training (random)")
# ---- Train on compressed stream ----
model, losses, ppls = train_on_compressed(
bin_path=bin_path,
steps=int(train_steps),
batch_size=int(batch_size),
block_size=int(block_size),
lr=float(lr),
device=device,
log_every=int(log_every),
)
STATE["model"] = model
train_plot = os.path.join(out_dir, "training.png")
plot_training_curves(losses, ppls, train_plot)
# ---- AFTER: trained rollout ----
after_seq = sample_bytes(
model, start=start, steps=int(rollout_steps),
device=device, temperature=float(temperature)
)
after_plot = os.path.join(out_dir, "rollout_after.png")
plot_rollout_tracks(after_seq[-2*int(rollout_steps):], after_plot, title="AFTER training (trained model)")
# ---- Side-by-side comparison plot ----
compare_plot = os.path.join(out_dir, "rollout_compare.png")
plot_before_after_tracks(
before_seq[-2*int(rollout_steps):],
after_seq[-2*int(rollout_steps):],
compare_plot
)
# ---- Animated GIF (AFTER) in codec-only space ----
radial_edges = np.array(codec["radial_edges"], dtype=np.float32)
gif_path = os.path.join(out_dir, "rollout.gif")
make_rollout_gif(
after_seq[-2*int(rollout_steps):],
U=np.array(U, dtype=np.float32),
radial_edges=radial_edges,
out_path=gif_path,
title="AFTER training — animated traversal in codec space",
stride=int(gif_stride),
fps=int(gif_fps),
)
final_md = (
f"## Training Complete (compressed-only)\n"
f"- **Device:** `{device}`\n"
f"- **Steps:** **{int(train_steps)}** (logged every {int(log_every)})\n"
f"- **Final logged loss:** **{losses[-1]:.4f}**\n"
f"- **Final logged perplexity:** **{ppls[-1]:.2f}**\n"
f"\n### What investors should notice\n"
f"1) The **perplexity falls** (learning on compressed bytes).\n"
f"2) The **rollout changes** from noisy/random → structured.\n"
f"3) The GIF shows the model **navigating constellation space**."
)
metrics = {"loss": losses, "ppl": ppls}
return final_md, train_plot, compare_plot, gif_path, json.dumps(metrics, indent=2)
except Exception as e:
return f"Training error: {e}\n\n{traceback.format_exc()}", None, None, None, None
# -----------------------------
# Gradio UI
# -----------------------------
INTRO = """
# CHR Compressed-Only Learning (Investor Demo)
This Space compresses text into a **binary stream** (`codes.bin`) and trains a tiny transformer **only** on that byte stream.
**Investor wow features:**
- Entropy curves + constellation map during compression
- Training curves (loss + perplexity)
- **BEFORE vs AFTER** rollout comparison
- **Animated GIF** showing the model “moving” through codec space while generating compressed symbols
"""
with gr.Blocks(title="CHR Compressed-Only Learning (Investor Demo)") as demo:
gr.Markdown(INTRO)
with gr.Tab("1) Ingest"):
with gr.Row():
file_in = gr.File(label="Upload .txt or .docx", file_types=[".txt", ".docx"])
units_mode = gr.Radio(["paragraphs", "sentences"], value="sentences", label="Unit granularity")
with gr.Row():
ingest_btn = gr.Button("Load file", variant="primary")
demo_btn = gr.Button("Load built-in demo corpus", variant="secondary")
ingest_status = gr.Markdown("")
ingest_btn.click(ingest_file, inputs=[file_in, units_mode], outputs=[ingest_status])
demo_btn.click(load_demo, inputs=[units_mode], outputs=[ingest_status])
with gr.Tab("2) Compress (CHR → codes.bin)"):
with gr.Row():
K = gr.Slider(2, 48, value=16, step=1, label="K (constellations)")
iters = gr.Slider(5, 120, value=40, step=1, label="CHR iterations")
beta = gr.Slider(2, 30, value=16, step=1, label="beta (assignment sharpness)")
with gr.Row():
slab_bins = gr.Slider(3, 16, value=8, step=1, label="slab bins (entropy measure)")
tau = gr.Slider(1, 20, value=5, step=1, label="tau (slab softness)")
radial_bins = gr.Slider(8, 256, value=64, step=8, label="radial bins (compression alphabet)")
seed = gr.Slider(0, 9999, value=42, step=1, label="seed")
compress_btn = gr.Button("Compress → generate codes.bin", variant="primary")
compress_report = gr.Markdown("")
with gr.Row():
ent_img = gr.Image(label="Entropy during compression", type="filepath")
map_img = gr.Image(label="Constellation map (PCA)", type="filepath")
with gr.Row():
bin_file = gr.File(label="codes.bin (compressed stream)")
codec_file = gr.File(label="codec.json (metadata)")
compress_btn.click(
compress_now,
inputs=[K, iters, beta, slab_bins, tau, seed, radial_bins],
outputs=[compress_report, ent_img, map_img, bin_file, codec_file]
)
with gr.Tab("3) Train + Wow"):
with gr.Row():
train_steps = gr.Slider(100, 6000, value=900, step=50, label="training steps")
batch_size = gr.Slider(8, 256, value=64, step=8, label="batch size")
block_size = gr.Slider(64, 512, value=256, step=32, label="sequence length (bytes)")
with gr.Row():
lr = gr.Number(value=3e-4, label="learning rate")
log_every = gr.Slider(10, 200, value=50, step=10, label="log every (steps)")
temperature = gr.Slider(0.5, 2.0, value=1.0, step=0.05, label="rollout temperature")
rollout_steps = gr.Slider(60, 800, value=240, step=20, label="rollout steps (bytes)")
with gr.Row():
gif_stride = gr.Slider(1, 10, value=2, step=1, label="GIF stride (lower = smoother, heavier)")
gif_fps = gr.Slider(6, 24, value=12, step=1, label="GIF FPS")
train_btn = gr.Button("Train (compressed-only) + Generate visuals", variant="primary")
train_report = gr.Markdown("")
with gr.Row():
train_img = gr.Image(label="Loss + perplexity (compressed stream)", type="filepath")
compare_img = gr.Image(label="BEFORE vs AFTER rollout comparison", type="filepath")
with gr.Row():
gif_out = gr.Image(label="Animated rollout GIF (AFTER)", type="filepath")
metrics_json = gr.Code(label="Metrics (JSON)", language="json")
train_btn.click(
train_now,
inputs=[train_steps, batch_size, block_size, lr, log_every, temperature, rollout_steps, gif_stride, gif_fps],
outputs=[train_report, train_img, compare_img, gif_out, metrics_json]
)
if __name__ == "__main__":
demo.launch()