Building a Stable Fable 5 Traces Workflow in Colab: Parsing Tool Calls, Auditing Data, and Training Baselines


In this tutorial, we work with the Fable 5 Traces dataset from Hugging Face and build a complete workflow around real coding-agent trace data. We start by setting up a lightweight environment that avoids fragile dependencies such as datasets, scikit-learn, and scipy. Then we manually download and parse the merged JSONL file to keep the notebook stable in Colab. From there, we inspect repository files, preview raw trace examples, normalize tool calls and text outputs, audit the dataset structure, detect potential secret-like patterns, and visualize key distributions, including output types, tools, source roots, and text lengths. We also create safe no-CoT chat/SFT exports, build a simple keyword-search helper, and train pure-Python Naive Bayes baselines to assess whether trace context can predict the assistant’s output type and tool usage.

Setting Up the Fable 5 Traces Colab Environment and Helpers

import os
import sys
import json
import re
import math
import random
import subprocess
from pathlib import Path
from collections import Counter, defaultdict


def install_packages():
    # Install only the lightweight packages; failures are tolerated (check=False).
    packages = [
        "huggingface_hub>=0.23.0",
        "rich>=13.0.0",
        "tqdm>=4.66.0",
    ]
    subprocess.run(
        [
            sys.executable,
            "-m",
            "pip",
            "install",
            "-q",
            "-U",
            "--upgrade-strategy",
            "only-if-needed",
            *packages,
        ],
        check=False,
    )


install_packages()

import pandas as pd
import matplotlib.pyplot as plt

try:
    import numpy as np
except Exception:
    np = None

from tqdm.auto import tqdm
from rich import print as rprint
from rich.panel import Panel
from rich.table import Table
from huggingface_hub import HfApi, hf_hub_download
from IPython.display import display

DATASET_ID = "Glint-Research/Fable-5-traces"
FLAT_JSONL_FILENAME = "fable5_cot_merged.jsonl"
OUT_DIR = Path("/content/fable5_traces_tutorial_outputs")
OUT_DIR.mkdir(parents=True, exist_ok=True)

SEED = 42
random.seed(SEED)
if np is not None:
    np.random.seed(SEED)

MAX_PREVIEW_CHARS = 900
N_AGENT_TRACE_PREVIEWS = 2
N_SAFE_DATASET_PREVIEWS = 3
SAVE_COT_RESEARCH_EXPORT = False
MAX_ROWS_TO_LOAD = None

rprint(
    Panel.fit(
        f"[bold]Fable 5 Traces Advanced Tutorial[/bold]\n"
        f"Dataset: {DATASET_ID}\n"
        f"Output directory: {OUT_DIR}\n"
        f"Manual JSONL loading: True\n"
        f"CoT research export enabled: {SAVE_COT_RESEARCH_EXPORT}",
        title="Setup",
    )
)

# Patterns for common secret-like strings (API keys, tokens, passwords).
SECRET_PATTERNS = [
    r"sk-[A-Za-z0-9_-]{20,}",
    r"hf_[A-Za-z0-9_-]{20,}",
    r"github_pat_[A-Za-z0-9_]{20,}",
    r"ghp_[A-Za-z0-9]{20,}",
    r"xox[baprs]-[A-Za-z0-9-]{20,}",
    r"AKIA[0-9A-Z]{16}",
    r"(?i:(api[_-]?key|secret|token|password)\s*[:=]\s*['\"]?[^'\"\s]{8,})",
]
SECRET_RE = re.compile("|".join(f"(?:{pattern})" for pattern in SECRET_PATTERNS))
TOKEN_RE = re.compile(r"[A-Za-z_][A-Za-z_0-9]{1,}|[./\-]{2,}|[{}()\[\]:=<>]+")


def safe_json_dumps(obj, max_chars=None):
    try:
        text = json.dumps(obj, ensure_ascii=False, indent=2, default=str)
    except Exception:
        text = str(obj)
    if max_chars is not None and len(text) > max_chars:
        return text[:max_chars] + "\n... [truncated]"
    return text


def is_missing_scalar(value):
    if value is None:
        return True
    if isinstance(value, (list, dict, tuple, set)):
        return False
    try:
        return bool(pd.isna(value))
    except Exception:
        return False


def clean_for_json(value):
    # Recursively convert NaN and NumPy values into JSON-friendly Python types.
    if is_missing_scalar(value):
        return None
    if isinstance(value, dict):
        return {str(k): clean_for_json(v) for k, v in value.items()}
    if isinstance(value, list):
        return [clean_for_json(v) for v in value]
    if isinstance(value, tuple):
        return [clean_for_json(v) for v in value]
    if np is not None:
        if isinstance(value, np.integer):
            return int(value)
        if isinstance(value, np.floating):
            if math.isnan(float(value)):
                return None
            return float(value)
        if isinstance(value, np.ndarray):
            return value.tolist()
    return value


def redact_possible_secrets(text):
    if text is None:
        return ""
    text = str(text)
    return SECRET_RE.sub("[REDACTED_POSSIBLE_SECRET]", text)


def contains_possible_secret(text):
    if text is None:
        return False
    return bool(SECRET_RE.search(str(text)))


def preview_text(text, max_chars=MAX_PREVIEW_CHARS):
    # Redact first, then collapse whitespace and truncate for display.
    text = redact_possible_secrets(text)
    text = re.sub(r"\s+", " ", text).strip()
    if len(text) > max_chars:
        return text[:max_chars] + " ... [truncated]"
    return text

We begin by setting up the Colab environment with only the lightweight packages this workflow needs: huggingface_hub, rich, and tqdm. We define the dataset ID, flat JSONL filename, output directory, random seed, preview limits, and export options so the tutorial behaves consistently across runs. We also create the first set of helper functions for safe JSON formatting, secret redaction, missing-value handling, and clean text previews.
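To make the redaction step concrete, here is a minimal standalone sketch. It copies only two of the tutorial's patterns into a hypothetical `DEMO_SECRET_RE`, and the token in the sample string is made up for illustration, so treat it as a demonstration of the idea rather than the full helper.

```python
import re

# Two of the tutorial's patterns, copied so this snippet runs on its own.
# DEMO_SECRET_RE and demo_redact are illustrative names, not part of the tutorial.
DEMO_SECRET_RE = re.compile(r"(?:hf_[A-Za-z0-9_-]{20,})|(?:AKIA[0-9A-Z]{16})")


def demo_redact(text):
    """Replace anything that looks like a token with a fixed placeholder."""
    return DEMO_SECRET_RE.sub("[REDACTED_POSSIBLE_SECRET]", str(text or ""))


# A made-up string shaped like a Hugging Face token; it is not a real credential.
fake_token = "hf_" + "a" * 24
sample = f"export HF_TOKEN={fake_token} && python train.py"

redacted = demo_redact(sample)
print(redacted)
```

Because the match is pattern-based, it can miss secrets in unfamiliar formats and can flag harmless strings, which is why the tutorial speaks of "possible" secrets.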

Building Parsing Utilities for Tool Calls and Text Outputs

def maybe_parse_json_string(value):
    # Parse strings that look like JSON objects or arrays; return anything else unchanged.
    if isinstance(value, str):
        stripped = value.strip()
        if (stripped.startswith("{") and stripped.endswith("}")) or (
            stripped.startswith("[") and stripped.endswith("]")
        ):
            try:
                return json.loads(stripped)
            except Exception:
                return value
    return value


def normalize_output_obj(value):
    return maybe_parse_json_string(value)


def extract_tool_name(output):
    output = normalize_output_obj(output)
    if isinstance(output, dict):
        direct_keys = [
            "name",
            "tool_name",
            "tool",
            "function",
            "command_name",
            "recipient_name",
            "toolName",
            "callee",
        ]
        for key in direct_keys:
            value = output.get(key)
            if isinstance(value, str) and value.strip():
                return value.strip()
        nested_keys = [
            "tool_call",
            "toolCall",
            "function_call",
            "call",
            "action",
        ]
        for nested_key in nested_keys:
            nested = output.get(nested_key)
            if isinstance(nested, dict):
                found = extract_tool_name(nested)
                if found:
                    return found
        # Fall back to the "type" field when it is not a generic label.
        output_type = output.get("type")
        if isinstance(output_type, str):
            output_type = output_type.strip()
            if output_type and output_type.lower() not in {"tool_use", "text", "message"}:
                return output_type
    return ""


def extract_tool_args(output):
    output = normalize_output_obj(output)
    if isinstance(output, dict):
        direct_arg_keys = [
            "input",
            "args",
            "arguments",
            "parameters",
            "kwargs",
            "json",
            "payload",
        ]
        for key in direct_arg_keys:
            if key in output:
                return output[key]
        nested_keys = [
            "tool_call",
            "toolCall",
            "function_call",
            "call",
            "action",
        ]
        for nested_key in nested_keys:
            nested = output.get(nested_key)
            if isinstance(nested, dict):
                args = extract_tool_args(nested)
                if args not in [None, "", {}]:
                    return args
        ignored = {
            "name",
            "tool_name",
            "tool",
            "function",
            "command_name",
            "recipient_name",
            "toolName",
            "callee",
            "type",
        }
        return {key: value for key, value in output.items() if key not in ignored}
    return {}


def extract_text_payload(output):
    output = normalize_output_obj(output)
    if isinstance(output, str):
        return output
    if isinstance(output, dict):
        text_keys = [
            "text",
            "content",
            "message",
            "output",
            "value",
            "result",
        ]
        for key in text_keys:
            value = output.get(key)
            if isinstance(value, str):
                return value
            if isinstance(value, list):
                return safe_json_dumps(value)
            if isinstance(value, dict):
                nested = extract_text_payload(value)
                if nested:
                    return nested
        return safe_json_dumps(output)
    return str(output)


def robust_len(value):
    if value is None:
        return 0
    return len(str(value))


def source_root(source_file):
    # Normalize Windows separators, then take the path part after a known marker.
    source_file = str(source_file or "").replace("\\", "/")
    if not source_file:
        return "unknown"
    parts = [part for part in source_file.split("/") if part]
    for marker in ["projects", "AIArchives", "archives", "claude"]:
        if marker in parts:
            idx = parts.index(marker)
            if idx + 1 < len(parts):
                return parts[idx + 1]
    if len(parts) >= 2:
        return parts[-2]
    if parts:
        return parts[0]
    return "unknown"


def write_jsonl(path, records):
    path = Path(path)
    with path.open("w", encoding="utf-8") as file:
        for record in records:
            file.write(json.dumps(clean_for_json(record), ensure_ascii=False, default=str) + "\n")


def save_plot(path):
    path = Path(path)
    plt.tight_layout()
    plt.savefig(path, dpi=160, bbox_inches="tight")
    plt.show()
    plt.close()
    return path


def print_basic_table(title, rows, columns=("Metric", "Value")):
    table = Table(title=title)
    for column in columns:
        table.add_column(str(column))
    for row in rows:
        table.add_row(*[str(item) for item in row])
    rprint(table)


def tokenize(text, max_chars=12000):
    text = str(text or "")[:max_chars].lower()
    return TOKEN_RE.findall(text)


def load_jsonl_manual(path, max_rows=None):
    # Read line by line so one malformed line does not abort the whole load.
    records = []
    bad_lines = []
    with open(path, "r", encoding="utf-8") as file:
        for line_number, line in tqdm(enumerate(file, start=1), desc="Reading JSONL"):
            line = line.strip()
            if not line:
                continue
            try:
                records.append(json.loads(line))
            except Exception as error:
                bad_lines.append(
                    {
                        "line_number": line_number,
                        "error": repr(error),
                        "preview": line[:500],
                    }
                )
            if max_rows is not None and len(records) >= max_rows:
                break
    return records, bad_lines

We build the core parsing utilities that turn raw output fields into usable tool names, tool arguments, and text payloads. We also define helpers for measuring text length, identifying source roots, writing JSONL files, saving plots, and printing clean tables. We finish this snippet by adding tokenization and manual JSONL loading to avoid fragile dataset-loading dependencies.
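The sketch below shows the core idea behind the tool-call parsing on a single record. The record shape (`name` plus `input` keys) is an assumption chosen for illustration, and `demo_parse_tool_call` is a hypothetical, much-reduced stand-in for the tutorial's `extract_tool_name` and `extract_tool_args`, which check many more key names and nested containers.

```python
import json


def demo_parse_tool_call(raw):
    """Return (tool_name, arguments) from a dict or a JSON-encoded string."""
    obj = raw
    if isinstance(raw, str):
        try:
            obj = json.loads(raw)
        except json.JSONDecodeError:
            # Plain text is not a tool call.
            return "", {}
    if not isinstance(obj, dict):
        return "", {}
    name = obj.get("name") or obj.get("tool_name") or ""
    args = obj.get("input", obj.get("arguments", {}))
    return name, args


# Hypothetical output field; real traces may use different keys.
raw_output = '{"type": "tool_use", "name": "bash", "input": {"command": "ls -la"}}'
tool_name, tool_args = demo_parse_tool_call(raw_output)
print(tool_name, tool_args)
```

Returning an empty name and empty arguments for unparseable input keeps downstream column types consistent, which mirrors how the tutorial's helpers fall back to `""` and `{}`.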

Inspecting the Hugging Face Repository and Loading JSONL Traces

rprint(Panel.fit("[bold]Inspecting Hugging Face dataset repository[/bold]"))

api = HfApi()
files = api.list_repo_files(repo_id=DATASET_ID, repo_type="dataset")

pi_trace_files = [
    file for file in files
    if file.startswith("pi-traces/") and file.endswith(".jsonl")
]

file_summary = {
    "total_repo_files": len(files),
    "jsonl_files": sum(file.endswith(".jsonl") for file in files),
    "pi_trace_files": len(pi_trace_files),
    "claude_files": sum(file.startswith("claude/") for file in files),
    "has_flat_jsonl": FLAT_JSONL_FILENAME in files,
}

print_basic_table(
    "Repository File Summary",
    [(key, value) for key, value in file_summary.items()],
)

rprint("[bold]Sample repository files:[/bold]")
for file in files[:20]:
    print(" -", file)

rprint(Panel.fit("[bold]Manual raw pi-trace preview[/bold]"))

pi_examples = []

if pi_trace_files:
    for trace_file in pi_trace_files[:N_AGENT_TRACE_PREVIEWS]:
        try:
            local_trace_path = hf_hub_download(
                repo_id=DATASET_ID,
                repo_type="dataset",
                filename=trace_file,
            )
            # Only the first record of each trace file is previewed.
            trace_records, trace_bad_lines = load_jsonl_manual(local_trace_path, max_rows=1)
            if trace_records:
                example = trace_records[0]
                pi_examples.append(example)
                preview_payload = {
                    "trace_file": trace_file,
                    "keys": list(example.keys()),
                    "preview": example,
                }
                rprint(
                    Panel(
                        safe_json_dumps(preview_payload, max_chars=3000),
                        title=f"Raw pi-trace preview: {trace_file}",
                    )
                )
            if trace_bad_lines:
                rprint(
                    f"[yellow]Bad JSONL lines in {trace_file}: {len(trace_bad_lines)}[/yellow]"
                )
        except Exception as error:
            rprint(f"[yellow]Could not preview {trace_file}[/yellow]")
            rprint(repr(error))
else:
    rprint("[yellow]No pi-traces JSONL files found.[/yellow]")

rprint(Panel.fit("[bold]Downloading flat merged JSONL from Hugging Face Hub[/bold]"))

flat_path = hf_hub_download(
    repo_id=DATASET_ID,
    repo_type="dataset",
    filename=FLAT_JSONL_FILENAME,
)
rprint(f"[green]Downloaded flat file:[/green] {flat_path}")

rprint(Panel.fit("[bold]Loading flat JSONL manually[/bold]"))

records, bad_lines = load_jsonl_manual(flat_path, max_rows=MAX_ROWS_TO_LOAD)

if bad_lines:
    bad_lines_path = OUT_DIR / "bad_jsonl_lines.json"
    with open(bad_lines_path, "w", encoding="utf-8") as file:
        json.dump(bad_lines, file, ensure_ascii=False, indent=2)
    rprint(f"[yellow]Bad JSONL lines found: {len(bad_lines)} -> {bad_lines_path}[/yellow]")

df = pd.DataFrame.from_records(records)

rprint(f"[green]Loaded rows:[/green] {len(df):,}")
rprint(f"[green]DataFrame shape:[/green] {df.shape}")
rprint("[bold]Columns:[/bold]")
print(list(df.columns))
display(df.head(3))

expected_cols = [
    "uid",
    "source_file",
    "session",
    "model",
    "context",
    "cot",
    "output_type",
    "output",
    "completion",
    "origin",
]

# Add any missing expected column so later steps never hit a KeyError.
for column in expected_cols:
    if column not in df.columns:
        df[column] = None

df["output_norm"] = df["output"].map(normalize_output_obj)
df["tool_name"] = df["output_norm"].map(extract_tool_name)
df["tool_args"] = df["output_norm"].map(extract_tool_args)
df["text_payload"] = df["output_norm"].map(extract_text_payload)
df["context_chars"] = df["context"].map(robust_len)
df["cot_chars"] = df["cot"].map(robust_len)
df["completion_chars"] = df["completion"].map(robust_len)
df["text_payload_chars"] = df["text_payload"].map(robust_len)
df["source_root"] = df["source_file"].map(source_root)
df["possible_secret_in_context"] = df["context"].map(contains_possible_secret)
df["possible_secret_in_completion"] = df["completion"].map(contains_possible_secret)
df["possible_secret_anywhere"] = (
    df["possible_secret_in_context"] | df["possible_secret_in_completion"]
)

We inspect the Hugging Face dataset repository and summarize the total file count, the number of JSONL and pi-trace files, and whether the flat merged file is present. We manually preview the first record of a few raw pi-trace files to understand the structure without relying on the datasets library. We then download the merged JSONL file, load it into a DataFrame, and derive normalized columns (tool name, tool arguments, text payload, character lengths, source root, and secret flags) for later analysis.
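The tolerant line-by-line loading described above can be tried without downloading anything. This is a minimal sketch that reads from an in-memory `io.StringIO` buffer instead of a Hub file; `demo_load_jsonl` and the three sample lines are invented for illustration.

```python
import io
import json


def demo_load_jsonl(handle):
    """Parse JSONL from a file-like object, collecting bad lines instead of raising."""
    records, bad_lines = [], []
    for line_number, line in enumerate(handle, start=1):
        line = line.strip()
        if not line:
            continue  # blank lines are skipped, but still counted in line_number
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as error:
            bad_lines.append({"line_number": line_number, "error": repr(error)})
    return records, bad_lines


# Made-up content: two valid rows, one blank line, one malformed line.
fake_file = io.StringIO(
    '{"uid": "a1", "output_type": "text"}\n'
    "\n"
    "{not valid json}\n"
    '{"uid": "b2", "output_type": "tool_use"}\n'
)

records, bad_lines = demo_load_jsonl(fake_file)
print(len(records), "records,", len(bad_lines), "bad line(s)")
```

Keeping the original line number with each failure makes it easy to locate the offending line in the source file later.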

Auditing Dataset Structure and Visualizing Trace Distributions

audit_rows = [
    ("rows", len(df)),
    ("columns", len(df.columns)),
    ("unique_uid", df["uid"].nunique(dropna=True)),
    ("duplicate_uid_rows", int(df["uid"].duplicated().sum())),
    ("unique_sessions", df["session"].nunique(dropna=True)),
    ("unique_models", df["model"].nunique(dropna=True)),
    ("missing_context", int(df["context"].isna().sum())),
    ("missing_cot", int(df["cot"].isna().sum())),
    ("missing_output", int(df["output"].isna().sum())),
    ("rows_with_possible_secret_pattern", int(df["possible_secret_anywhere"].sum())),
    ("median_context_chars", round(float(df["context_chars"].median()), 2)),
    ("median_cot_chars", round(float(df["cot_chars"].median()), 2)),
    ("median_completion_chars", round(float(df["completion_chars"].median()), 2)),
    ("max_completion_chars", int(df["completion_chars"].max())),
]

print_basic_table("Flat JSONL Audit", audit_rows)

rprint("\n[bold]Output type distribution:[/bold]")
display(df["output_type"].value_counts(dropna=False).to_frame("rows"))

rprint("\n[bold]Model distribution:[/bold]")
display(df["model"].value_counts(dropna=False).to_frame("rows").head(20))

rprint("\n[bold]Origin distribution:[/bold]")
display(df["origin"].value_counts(dropna=False).to_frame("rows"))

rprint("\n[bold]Top source roots:[/bold]")
display(df["source_root"].value_counts().head(20).to_frame("rows"))

rprint("\n[bold]Top tool names:[/bold]")
display(
    df.loc[df["output_type"].eq("tool_use"), "tool_name"]
    .replace("", pd.NA)
    .value_counts(dropna=False)
    .head(25)
    .to_frame("rows")
)

rprint(
    Panel.fit(
        "[bold]Safe previews[/bold]\n"
        "These previews redact common secret-like patterns and never execute trace commands."
    )
)

sample_df = df.sample(
    n=min(N_SAFE_DATASET_PREVIEWS, len(df)),
    random_state=SEED,
).reset_index(drop=True)

for index, row in sample_df.iterrows():
    payload = {
        "uid": row.get("uid"),
        "session": row.get("session"),
        "model": row.get("model"),
        "origin": row.get("origin"),
        "output_type": row.get("output_type"),
        "tool_name": row.get("tool_name"),
        "context_preview": preview_text(row.get("context")),
        "cot_preview": preview_text(row.get("cot")),
        "text_or_tool_payload_preview": preview_text(row.get("text_payload")),
    }
    rprint(
        Panel(
            safe_json_dumps(payload, max_chars=4000),
            title=f"Safe Row Preview {index}",
        )
    )

rprint(Panel.fit("[bold]Creating plots[/bold]"))

plot_paths = {}

output_counts = df["output_type"].fillna("missing").value_counts()
plt.figure(figsize=(8, 5))
output_counts.plot(kind="bar")
plt.title("Output Type Distribution")
plt.xlabel("Output Type")
plt.ylabel("Rows")
plt.xticks(rotation=25, ha="right")
plot_paths["output_type_distribution"] = str(
    save_plot(OUT_DIR / "output_type_distribution.png")
)

tool_counts = (
    df.loc[df["output_type"].eq("tool_use"), "tool_name"]
    .replace("", "unknown")
    .value_counts()
    .head(20)
)

if len(tool_counts) > 0:
    plt.figure(figsize=(9, 6))
    tool_counts.sort_values().plot(kind="barh")
    plt.title("Top Tool Names")
    plt.xlabel("Rows")
    plt.ylabel("Tool")
    plot_paths["top_tools"] = str(save_plot(OUT_DIR / "top_tools.png"))
else:
    rprint("[yellow]No tool-use rows found for tool plot.[/yellow]")

source_counts = df["source_root"].fillna("unknown").value_counts().head(20)
plt.figure(figsize=(9, 6))
source_counts.sort_values().plot(kind="barh")
plt.title("Top Source Roots")
plt.xlabel("Rows")
plt.ylabel("Source Root")
plot_paths["top_source_roots"] = str(save_plot(OUT_DIR / "top_source_roots.png"))

length_cols = [
    "context_chars",
    "cot_chars",
    "completion_chars",
    "text_payload_chars",
]

for column in length_cols:
    plt.figure(figsize=(8, 5))
    # Clip at the 99th percentile so a few very long rows do not flatten the histogram.
    clipped = df[column].clip(upper=df[column].quantile(0.99))
    plt.hist(clipped, bins=50)
    plt.title(f"{column} Distribution, Clipped at P99")
    plt.xlabel("Characters")
    plt.ylabel("Rows")
    plot_paths[f"{column}_histogram"] = str(
        save_plot(OUT_DIR / f"{column}_histogram.png")
    )

We audit the dataset by checking row counts, unique sessions, duplicate IDs, missing fields, text lengths, and possible secret-like patterns. We display important distributions across output types, models, origins, source roots, and tool names to understand the data’s shape. We also create safe previews and visual plots so we can inspect the traces without executing any commands.
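A few of the audit metrics are easy to verify by hand on a toy table. The sketch below recomputes duplicate UIDs, missing contexts, and the median context length with the standard library only; the three rows are invented, and the counting is meant to mirror what the pandas calls report rather than replace them.

```python
from collections import Counter
from statistics import median

# Invented rows: uid "a" appears twice and one context is missing.
rows = [
    {"uid": "a", "output_type": "tool_use", "context": "run tests"},
    {"uid": "b", "output_type": "text", "context": "explain the failure in detail"},
    {"uid": "a", "output_type": "tool_use", "context": None},
]

# Every occurrence after the first counts as a duplicate row,
# which is how pandas' duplicated() flags rows by default.
uid_counts = Counter(row["uid"] for row in rows)
duplicate_uid_rows = sum(count - 1 for count in uid_counts.values())

missing_context = sum(row["context"] is None for row in rows)

# A missing context counts as length 0, as in the tutorial's robust_len helper.
median_context_chars = median(len(row["context"] or "") for row in rows)

print(duplicate_uid_rows, missing_context, median_context_chars)
```

Checking the metrics on a table this small is a quick way to confirm what each audit row actually measures before reading the numbers for the full dataset.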

Projecting Traces and Exporting Safe No-CoT Chat Datasets

rprint(Panel.fit("[bold]Creating pure NumPy TF-IDF-style projection[/bold]"))

if np is not None:
    try:
        projection_sample = df.sample(n=min(1000, len(df)), random_state=SEED).copy()
        projection_texts = projection_sample["context"].fillna("").astype(str).tolist()
        doc_tokens = [tokenize(text, max_chars=8000) for text in projection_texts]

        # Document frequency: number of documents in which each token appears.
        doc_freq = Counter()
        for tokens in doc_tokens:
            doc_freq.update(set(tokens))

        vocab_items = [
            item for item in doc_freq.items()
            if item[1] >= 2 and len(item[0]) > 1
        ]
        vocab_items = sorted(vocab_items, key=lambda item: item[1], reverse=True)[:1000]
        vocab = {token: idx for idx, (token, _) in enumerate(vocab_items)}

        if len(vocab) >= 3 and len(doc_tokens) >= 10:
            X = np.zeros((len(doc_tokens), len(vocab)), dtype=np.float32)
            df_counts = np.zeros(len(vocab), dtype=np.float32)

            for row_idx, tokens in enumerate(doc_tokens):
                counts = Counter(token for token in tokens if token in vocab)
                for token, count in counts.items():
                    col_idx = vocab[token]
                    X[row_idx, col_idx] = float(count)
                for token in counts.keys():
                    df_counts[vocab[token]] += 1.0

            # Smoothed IDF weighting, L2 row normalization, then column centering.
            idf = np.log((1.0 + len(doc_tokens)) / (1.0 + df_counts)) + 1.0
            X = X * idf.reshape(1, -1)
            row_norms = np.linalg.norm(X, axis=1, keepdims=True)
            row_norms[row_norms == 0] = 1.0
            X = X / row_norms
            X = X - X.mean(axis=0, keepdims=True)

            # Project onto the first two singular directions.
            U, S, Vt = np.linalg.svd(X, full_matrices=False)
            coords = U[:, :2] * S[:2]

            projection_sample["svd_x"] = coords[:, 0]
            projection_sample["svd_y"] = coords[:, 1]
            projection_sample["plot_label"] = projection_sample["output_type"].fillna("missing").astype(str)

            plt.figure(figsize=(8, 6))
            for label, part in projection_sample.groupby("plot_label"):
                plt.scatter(
                    part["svd_x"],
                    part["svd_y"],
                    s=12,
                    alpha=0.65,
                    label=label,
                )
            plt.title("Context Projection with Pure NumPy TF-IDF + SVD")
            plt.xlabel("SVD component 1")
            plt.ylabel("SVD component 2")
            plt.legend()
            plot_paths["tfidf_svd_projection"] = str(
                save_plot(OUT_DIR / "tfidf_svd_projection.png")
            )

            projection_sample[
                [
                    "uid",
                    "output_type",
                    "tool_name",
                    "source_root",
                    "svd_x",
                    "svd_y",
                ]
            ].to_csv(
                OUT_DIR / "tfidf_svd_projection_points.csv",
                index=False,
            )

            pd.DataFrame(vocab_items, columns=["token", "document_frequency"]).to_csv(
                OUT_DIR / "projection_vocabulary.csv",
                index=False,
            )
        else:
            rprint("[yellow]Skipping projection because vocabulary or row count is too small.[/yellow]")
    except Exception as error:
        rprint("[yellow]Projection failed, but the rest of the tutorial will continue.[/yellow]")
        rprint(repr(error))
else:
    rprint("[yellow]NumPy is not available, so projection is skipped.[/yellow]")

rprint(Panel.fit("[bold]Creating safe no-CoT chat/SFT exports[/bold]"))

SYSTEM_PROMPT = (
    "You are a coding agent. Given the user's context and prior transcript, "
    "produce the next assistant action. If a tool call is needed, return a structured tool call JSON. "
    "Do not expose hidden reasoning."
)


def make_no_cot_target(row):
    # Tool-use rows become a structured tool-call JSON string; other rows keep their text.
    output_type = str(row.get("output_type") or "")
    if output_type == "tool_use":
        tool_name = row.get("tool_name") or "unknown_tool"
        tool_args = row.get("tool_args")
        return json.dumps(
            {
                "type": "tool_call",
                "tool_name": tool_name,
                "arguments": tool_args,
            },
            ensure_ascii=False,
            default=str,
        )
    payload = row.get("text_payload")
    if payload is None or str(payload).strip() == "":
        payload = row.get("completion", "")
    return str(payload)


def make_chat_record(row, include_cot=False):
    user_context = redact_possible_secrets(row.get("context", ""))
    target = redact_possible_secrets(make_no_cot_target(row))
    messages = [
        {
            "role": "system",
            "content": SYSTEM_PROMPT,
        },
        {
            "role": "user",
            "content": user_context,
        },
        {
            "role": "assistant",
            "content": target,
        },
    ]
    record = {
        "uid": row.get("uid"),
        "session": row.get("session"),
        "model": row.get("model"),
        "origin": row.get("origin"),
        "output_type": row.get("output_type"),
        "tool_name": row.get("tool_name"),
        "messages": messages,
    }
    if include_cot:
        record["reasoning_trace"] = redact_possible_secrets(row.get("cot", ""))
    return clean_for_json(record)


# Shuffle once, then cut 90/5/5 into train, validation, and test.
export_df = df.copy()
export_df = export_df.sample(frac=1.0, random_state=SEED).reset_index(drop=True)

num_rows = len(export_df)
train_end = int(0.90 * num_rows)
validation_end = int(0.95 * num_rows)

splits = {
    "train": export_df.iloc[:train_end],
    "validation": export_df.iloc[train_end:validation_end],
    "test": export_df.iloc[validation_end:],
}

for split_name, split_df in splits.items():
    records = [
        make_chat_record(row, include_cot=False)
        for _, row in split_df.iterrows()
    ]
    output_path = OUT_DIR / f"fable5_no_cot_chat_{split_name}.jsonl"
    write_jsonl(output_path, records)
    rprint(
        f"[green]Saved[/green] {split_name}: "
        f"{len(records)} records -> {output_path}"
    )

if SAVE_COT_RESEARCH_EXPORT:
    cot_records = [
        make_chat_record(row, include_cot=True)
        for _, row in export_df.iterrows()
    ]
    cot_path = OUT_DIR / "fable5_cot_research_export.jsonl"
    write_jsonl(cot_path, cot_records)
    rprint(f"[yellow]Saved CoT-preserving research export:[/yellow] {cot_path}")
else:
    rprint(
        "[cyan]Skipped CoT-preserving export because "
        "SAVE_COT_RESEARCH_EXPORT=False.[/cyan]"
    )

analysis_cols = [
    "uid",
    "session",
    "model",
    "origin",
    "source_file",
    "source_root",
    "output_type",
    "tool_name",
    "context_chars",
    "cot_chars",
    "completion_chars",
    "text_payload_chars",
    "possible_secret_anywhere",
]

analysis_df = df[analysis_cols].copy()
analysis_df.to_csv(
    OUT_DIR / "fable5_analysis_index.csv",
    index=False,
)
analysis_df.to_pickle(
    OUT_DIR / "fable5_analysis_index.pkl",
)

rprint(f"[green]Saved analysis CSV:[/green] {OUT_DIR / 'fable5_analysis_index.csv'}")
rprint(f"[green]Saved analysis pickle:[/green] {OUT_DIR / 'fable5_analysis_index.pkl'}")

We create a pure NumPy TF-IDF-style projection to visualize trace contexts without using scikit-learn or scipy. We then prepare safe no-CoT chat-style exports that turn each trace into a structured system, user, and assistant message format, and we write them as shuffled 90/5/5 train, validation, and test JSONL files. Finally, we save a compact analysis index as CSV and pickle artifacts so the dataset is easier to inspect and reuse.
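To show what one exported line looks like, here is a minimal sketch of a no-CoT chat record for a tool-use row. The context, tool name, and arguments are invented, the system prompt is shortened, and `demo_chat_record` is a hypothetical simplification that omits the metadata fields and secret redaction the tutorial's `make_chat_record` applies.

```python
import json


def demo_chat_record(context, tool_name, tool_args):
    """Build a system/user/assistant record whose target is a tool-call JSON string."""
    target = json.dumps(
        {"type": "tool_call", "tool_name": tool_name, "arguments": tool_args},
        ensure_ascii=False,
    )
    return {
        "messages": [
            {"role": "system", "content": "You are a coding agent."},
            {"role": "user", "content": context},
            {"role": "assistant", "content": target},
        ]
    }


# Invented example row.
record = demo_chat_record("Tests fail in utils.py", "bash", {"command": "pytest -q"})

# One JSONL line, as it would appear in an exported split file.
line = json.dumps(record, ensure_ascii=False)
print(line)
```

The assistant turn holds only the final action, so the reasoning trace never enters the training target unless the separate CoT export is explicitly enabled.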

Implementing Pure-Python Naive Bayes Classification Utilities

def stratified_train_test_indices(labels, test_size=0.2, seed=SEED):
    # Split each label's indices separately so class proportions are preserved.
    rng = random.Random(seed)
    label_to_indices = defaultdict(list)
    for idx, label in enumerate(labels):
        label_to_indices[label].append(idx)
    train_indices = []
    test_indices = []
    for label, indices in label_to_indices.items():
        indices = indices[:]
        rng.shuffle(indices)
        if len(indices) <= 1:
            # A single-example class goes to train only.
            train_indices.extend(indices)
            continue
        n_test = max(1, int(round(len(indices) * test_size)))
        if n_test >= len(indices):
            n_test = len(indices) - 1
        test_indices.extend(indices[:n_test])
        train_indices.extend(indices[n_test:])
    rng.shuffle(train_indices)
    rng.shuffle(test_indices)
    return train_indices, test_indices


class PureMultinomialNB:
    def __init__(self, max_features=20000, min_df=2, alpha=1.0):
        self.max_features = max_features
        self.min_df = min_df
        self.alpha = alpha
        self.vocab = {}
        self.labels = []
        self.class_log_prior = {}
        self.feature_log_prob = {}
        self.class_token_totals = {}

    def fit(self, texts, labels):
        texts = list(texts)
        labels = list(labels)

        # Build the vocabulary from the most frequent tokens by document frequency.
        doc_freq = Counter()
        for text in texts:
            doc_freq.update(set(tokenize(text)))
        vocab_items = [
            item for item in doc_freq.items()
            if item[1] >= self.min_df
        ]
        vocab_items = sorted(vocab_items, key=lambda item: item[1], reverse=True)
        vocab_items = vocab_items[:self.max_features]
        self.vocab = {token: idx for idx, (token, _) in enumerate(vocab_items)}

        self.labels = sorted(set(labels))
        class_doc_counts = Counter(labels)
        total_docs = len(labels)
        num_classes = len(self.labels)

        token_counts_by_class = {label: Counter() for label in self.labels}
        token_totals_by_class = {label: 0 for label in self.labels}
        for text, label in zip(texts, labels):
            counts = Counter(token for token in tokenize(text) if token in self.vocab)
            token_counts_by_class[label].update(counts)
            token_totals_by_class[label] += sum(counts.values())

        vocab_size = max(len(self.vocab), 1)
        for label in self.labels:
            # Additive (Laplace) smoothing on both the prior and the token likelihoods.
            self.class_log_prior[label] = math.log(
                (class_doc_counts[label] + self.alpha) /
                (total_docs + self.alpha * num_classes)
            )
            denom = token_totals_by_class[label] + self.alpha * vocab_size
            self.class_token_totals[label] = token_totals_by_class[label]
            self.feature_log_prob[label] = {}
            for token in self.vocab:
                count = token_counts_by_class[label][token]
                self.feature_log_prob[label][token] = math.log((count + self.alpha) / denom)
        return self

    def predict_one(self, text):
        counts = Counter(token for token in tokenize(text) if token in self.vocab)
        best_label = None
        best_score = -float("inf")
        for label in self.labels:
            score = self.class_log_prior[label]
            feature_probs = self.feature_log_prob[label]
            for token, count in counts.items():
                score += count * feature_probs.get(token, 0.0)
            if score > best_score:
                best_score = score
                best_label = label
        return best_label

    def predict(self, texts):
        return [self.predict_one(text) for text in texts]

    def top_tokens_for_class(self, label, n=20):
        # Rank tokens by log-probability margin over the average of the other classes.
        if label not in self.feature_log_prob:
            return []
        base_scores = self.feature_log_prob[label]
        other_labels = [item for item in self.labels if item != label]
        rows = []
        for token in self.vocab:
            this_score = base_scores[token]
            if other_labels:
                other_score = sum(
                    self.feature_log_prob[other][token]
                    for other in other_labels
                ) / len(other_labels)
                margin = this_score - other_score
            else:
                margin = this_score
            rows.append((token, margin))
        rows = sorted(rows, key=lambda item: item[1], reverse=True)
        return rows[:n]


def evaluate_predictions(y_true, y_pred):
    labels = sorted(set(y_true) | set(y_pred))
    rows = []
    total_correct = 0
    total = len(y_true)
    for label in labels:
        tp = sum((true == label and pred == label) for true, pred in zip(y_true, y_pred))
        fp = sum((true != label and pred == label) for true, pred in zip(y_true, y_pred))
        fn = sum((true == label and pred != label) for true, pred in zip(y_true, y_pred))
        support = sum(true == label for true in y_true)
        precision = tp / (tp + fp) if (tp + fp) else 0.0
        recall = tp / (tp + fn) if (tp + fn) else 0.0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
        rows.append(
            {
                "label": label,
                "precision": precision,
                "recall": recall,
                "f1": f1,
                "support": support,
            }
        )
        total_correct += tp
    accuracy = total_correct / total if total else 0.0
    macro_f1 = sum(row["f1"] for row in rows) / len(rows) if rows else 0.0
    weighted_f1 = (
        sum(row["f1"] * row["support"] for row in rows) / total
        if total
        else 0.0
    )
    report_df = pd.DataFrame(rows)
    metrics = {
        "accuracy": accuracy,
        "macro_f1": macro_f1,
        "weighted_f1": weighted_f1,
        "labels": labels,
        "rows": rows,
    }
    return metrics, report_df


def confusion_matrix_df(y_true, y_pred):
    labels = sorted(set(y_true) | set(y_pred))
    matrix = pd.DataFrame(
        0,
        index=labels,
        columns=labels,
        dtype=int,
    )
    for true, pred in zip(y_true, y_pred):
        matrix.loc[true, pred] += 1
    matrix.index.name = "actual"
    matrix.columns.name = "predicted"
    return matrix

We define pure-Python classification utilities for stratified train-test splitting, Naive Bayes training, prediction, and evaluation. We implement the classifier from scratch, so the tutorial stays stable even in Colab environments with broken scientific Python binaries. We also add reporting tools for precision, recall, F1 score, confusion matrices, and top class-specific tokens.
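
The smoothing arithmetic inside the classifier can be checked by hand on a tiny corpus. The sketch below is a standalone illustration, not the tutorial's class: the documents are invented, whitespace splitting stands in for the tutorial's tokenize function (an assumption), and smoothed_log_prob and score are hypothetical helper names. The prior and likelihood formulas follow the ones in the fit method above.

```python
import math
from collections import Counter

# Toy training documents, invented for illustration.
docs = [
    ("run the tests", "tool_use"),
    ("run the build", "tool_use"),
    ("explain the result", "text"),
]
alpha = 1.0  # Laplace smoothing strength, matching alpha=1.0 in the tutorial

# Assumption: whitespace tokenization instead of the tutorial's tokenizer.
vocab = sorted({tok for text, _ in docs for tok in text.split()})
counts = {"tool_use": Counter(), "text": Counter()}
for text, label in docs:
    counts[label].update(text.split())


def smoothed_log_prob(token, label):
    """log((count + alpha) / (class_token_total + alpha * vocab_size))."""
    total = sum(counts[label].values())
    return math.log((counts[label][token] + alpha) / (total + alpha * len(vocab)))


def score(text, label):
    """Log prior plus summed token log-likelihoods for one class."""
    n_docs = sum(1 for _, doc_label in docs if doc_label == label)
    log_prior = math.log((n_docs + alpha) / (len(docs) + alpha * len(counts)))
    return log_prior + sum(
        smoothed_log_prob(tok, label) for tok in text.split() if tok in vocab
    )


query = "run tests"
scores = {label: score(query, label) for label in counts}
best = max(scores, key=scores.get)
print(scores, best)
```

With six vocabulary tokens, "run" gets probability (2 + 1) / (6 + 6) = 0.25 under tool_use and (0 + 1) / (3 + 6) = 1/9 under text, so smoothing keeps unseen tokens from zeroing out a class while still favouring the class that actually used the word.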

Training Naive Bayes Baselines and Keyword Search Over Traces

rprint(Panel.fit("[bold]Baseline 1: Predict output_type from context using pure Python Naive Bayes[/bold]"))

model_artifacts = {}

classifier_df = df.dropna(subset=["output_type"]).copy()
classifier_df = classifier_df[
    classifier_df["output_type"].astype(str).str.len() > 0
].copy()

if classifier_df["output_type"].nunique() >= 2 and len(classifier_df) >= 30:
    X_text = (
        classifier_df["context"]
        .fillna("")
        .astype(str)
        .map(lambda text: text[:12000])
        .tolist()
    )
    y = classifier_df["output_type"].astype(str).tolist()
    train_indices, test_indices = stratified_train_test_indices(y, test_size=0.2, seed=SEED)
    X_train = [X_text[i] for i in train_indices]
    y_train = [y[i] for i in train_indices]
    X_test = [X_text[i] for i in test_indices]
    y_test = [y[i] for i in test_indices]
    output_type_classifier = PureMultinomialNB(
        max_features=20000,
        min_df=2,
        alpha=1.0,
    )
    output_type_classifier.fit(X_train, y_train)
    predictions = output_type_classifier.predict(X_test)
    output_type_metrics, output_report_df = evaluate_predictions(y_test, predictions)
    output_matrix_df = confusion_matrix_df(y_test, predictions)
    output_type_metrics["train_rows"] = len(X_train)
    output_type_metrics["test_rows"] = len(X_test)
    output_type_metrics["vocab_size"] = len(output_type_classifier.vocab)
    rprint("[bold]Output type classifier report:[/bold]")
    display(output_report_df)
    display(output_matrix_df)
    output_report_df.to_csv(OUT_DIR / "output_type_classifier_report.csv", index=False)
    output_matrix_df.to_csv(OUT_DIR / "output_type_confusion_matrix.csv")
    top_token_records = []
    for label in output_type_classifier.labels:
        for token, margin in output_type_classifier.top_tokens_for_class(label, n=25):
            top_token_records.append(
                {
                    "label": label,
                    "token": token,
                    "score_margin": margin,
                }
            )
    pd.DataFrame(top_token_records).to_csv(
        OUT_DIR / "output_type_top_tokens.csv",
        index=False,
    )
    with open(
        OUT_DIR / "output_type_classifier_metrics.json",
        "w",
        encoding="utf-8",
    ) as file:
        json.dump(output_type_metrics, file, ensure_ascii=False, indent=2)
    model_artifacts["output_type_classifier_metrics"] = str(
        OUT_DIR / "output_type_classifier_metrics.json"
    )
    model_artifacts["output_type_classifier_report"] = str(
        OUT_DIR / "output_type_classifier_report.csv"
    )
    model_artifacts["output_type_confusion_matrix"] = str(
        OUT_DIR / "output_type_confusion_matrix.csv"
    )
    model_artifacts["output_type_top_tokens"] = str(
        OUT_DIR / "output_type_top_tokens.csv"
    )
else:
    rprint(
        "[yellow]Skipping output_type classifier because there are too few "
        "classes or rows.[/yellow]"
    )
    output_type_metrics = {}

rprint(Panel.fit("[bold]Baseline 2: Predict tool_name from context using pure Python Naive Bayes[/bold]"))

tool_classifier_df = df[
    df["output_type"].eq("tool_use")
    & df["tool_name"].fillna("").astype(str).str.len().gt(0)
].copy()

if len(tool_classifier_df) >= 50 and tool_classifier_df["tool_name"].nunique() >= 2:
    top_tools = tool_classifier_df["tool_name"].value_counts().head(12).index.tolist()
    tool_classifier_df["tool_label"] = tool_classifier_df["tool_name"].where(
        tool_classifier_df["tool_name"].isin(top_tools),
        "__OTHER__",
    )
    y_tool = tool_classifier_df["tool_label"].astype(str).tolist()
    X_tool_text = (
        tool_classifier_df["context"]
        .fillna("")
        .astype(str)
        .map(lambda text: text[:12000])
        .tolist()
    )
    if len(set(y_tool)) >= 2:
        train_indices, test_indices = stratified_train_test_indices(y_tool, test_size=0.2, seed=SEED)
        X_train = [X_tool_text[i] for i in train_indices]
        y_train = [y_tool[i] for i in train_indices]
        X_test = [X_tool_text[i] for i in test_indices]
        y_test = [y_tool[i] for i in test_indices]
        tool_classifier = PureMultinomialNB(
            max_features=20000,
            min_df=2,
            alpha=1.0,
        )
        tool_classifier.fit(X_train, y_train)
        tool_predictions = tool_classifier.predict(X_test)
        tool_metrics, tool_report_df = evaluate_predictions(y_test, tool_predictions)
        tool_matrix_df = confusion_matrix_df(y_test, tool_predictions)
        tool_metrics["train_rows"] = len(X_train)
        tool_metrics["test_rows"] = len(X_test)
        tool_metrics["vocab_size"] = len(tool_classifier.vocab)
        rprint("[bold]Tool classifier report:[/bold]")
        display(tool_report_df)
        display(tool_matrix_df)
        tool_report_df.to_csv(OUT_DIR / "tool_name_classifier_report.csv", index=False)
        tool_matrix_df.to_csv(OUT_DIR / "tool_name_confusion_matrix.csv")
        top_tool_token_records = []
        for label in tool_classifier.labels:
            for token, margin in tool_classifier.top_tokens_for_class(label, n=25):
                top_tool_token_records.append(
                    {
                        "label": label,
                        "token": token,
                        "score_margin": margin,
                    }
                )
        pd.DataFrame(top_tool_token_records).to_csv(
            OUT_DIR / "tool_name_top_tokens.csv",
            index=False,
        )
        with open(
            OUT_DIR / "tool_name_classifier_metrics.json",
            "w",
            encoding="utf-8",
        ) as file:
            json.dump(tool_metrics, file, ensure_ascii=False, indent=2)
        model_artifacts["tool_name_classifier_metrics"] = str(
            OUT_DIR / "tool_name_classifier_metrics.json"
        )
        model_artifacts["tool_name_classifier_report"] = str(
            OUT_DIR / "tool_name_classifier_report.csv"
        )
        model_artifacts["tool_name_confusion_matrix"] = str(
            OUT_DIR / "tool_name_confusion_matrix.csv"
        )
        model_artifacts["tool_name_top_tokens"] = str(
            OUT_DIR / "tool_name_top_tokens.csv"
        )
    else:
        rprint("[yellow]Skipping tool classifier because labels collapsed to one class.[/yellow]")
        tool_metrics = {}
else:
    rprint(
        "[yellow]Skipping tool classifier because there are too few tool-use "
        "rows or tool classes.[/yellow]"
    )
    tool_metrics = {}

rprint(Panel.fit("[bold]Building simple keyword search helper[/bold]"))


def search_rows(keyword, limit=5, search_cols=("context", "cot", "completion", "text_payload")):
    keyword = str(keyword).lower()
    mask = pd.Series(False, index=df.index)
    for column in search_cols:
        mask = mask | (
            df[column]
            .fillna("")
            .astype(str)
            .str.lower()
            .str.contains(re.escape(keyword), regex=True)
        )
    hits = df[mask].head(limit)
    results = []
    for _, row in hits.iterrows():
        results.append(
            {
                "uid": row.get("uid"),
                "session": row.get("session"),
                "output_type": row.get("output_type"),
                "tool_name": row.get("tool_name"),
                "context_preview": preview_text(row.get("context"), 400),
                "payload_preview": preview_text(row.get("text_payload"), 400),
            }
        )
    return results


example_queries = [
    "Bash",
    "Write",
    "browser",
    "test",
    "README",
]

search_demo = {
    query: search_rows(query, limit=2)
    for query in example_queries
}

with open(
    OUT_DIR / "keyword_search_demo.json",
    "w",
    encoding="utf-8",
) as file:
    json.dump(search_demo, file, ensure_ascii=False, indent=2)

rprint("[bold]Example keyword search results:[/bold]")
rprint(safe_json_dumps(search_demo, max_chars=5000))

summary = {
    "dataset_id": DATASET_ID,
    "flat_jsonl_filename": FLAT_JSONL_FILENAME,
    "output_directory": str(OUT_DIR),
    "repo_file_summary": file_summary,
    "rows": int(len(df)),
    "columns": list(df.columns),
    "output_type_distribution": (
        df["output_type"]
        .fillna("missing")
        .value_counts()
        .to_dict()
    ),
    "top_tools": (
        df.loc[df["output_type"].eq("tool_use"), "tool_name"]
        .replace("", "unknown")
        .value_counts()
        .head(20)
        .to_dict()
    ),
    "top_source_roots": (
        df["source_root"]
        .fillna("unknown")
        .value_counts()
        .head(20)
        .to_dict()
    ),
    "length_summary": {
        column: {
            "mean": float(df[column].mean()),
            "median": float(df[column].median()),
            "p90": float(df[column].quantile(0.90)),
            "p95": float(df[column].quantile(0.95)),
            "max": int(df[column].max()),
        }
        for column in [
            "context_chars",
            "cot_chars",
            "completion_chars",
            "text_payload_chars",
        ]
    },
    "possible_secret_rows": int(df["possible_secret_anywhere"].sum()),
    "plots": plot_paths,
    "model_artifacts": model_artifacts,
    "safe_exports": {
        "train": str(OUT_DIR / "fable5_no_cot_chat_train.jsonl"),
        "validation": str(OUT_DIR / "fable5_no_cot_chat_validation.jsonl"),
        "test": str(OUT_DIR / "fable5_no_cot_chat_test.jsonl"),
    },
    "analysis_files": {
        "csv": str(OUT_DIR / "fable5_analysis_index.csv"),
        "pickle": str(OUT_DIR / "fable5_analysis_index.pkl"),
        "keyword_search_demo": str(OUT_DIR / "keyword_search_demo.json"),
    },
}

with open(
    OUT_DIR / "analysis_summary.json",
    "w",
    encoding="utf-8",
) as file:
    json.dump(clean_for_json(summary), file, ensure_ascii=False, indent=2, default=str)

FENCE = chr(96) * 3

report_md = (
    "# Fable 5 Traces Advanced Tutorial Report\n\n"
    "## Dataset\n\n"
    f"- Dataset: `{DATASET_ID}`\n"
    f"- Flat JSONL: `{FLAT_JSONL_FILENAME}`\n"
    f"- Rows loaded: `{len(df):,}`\n"
    f"- Unique source sessions: `{df['session'].nunique(dropna=True):,}`\n"
    f"- Unique models: `{df['model'].nunique(dropna=True):,}`\n\n"
    "## Important safety note\n\n"
    "This tutorial treats the dataset as agent telemetry. It previews and analyzes commands, "
    "tool calls, file edits, and transcript text, but it never executes commands found inside "
    "the traces.\n\n"
    f"Potential secret-like patterns detected: `{int(df['possible_secret_anywhere'].sum()):,}` rows.\n"
    "Exports redact common API-key/token-like patterns.\n\n"
    "## Output type distribution\n\n"
    f"{FENCE}json\n"
    f"{json.dumps(clean_for_json(summary['output_type_distribution']), indent=2, ensure_ascii=False)}\n"
    f"{FENCE}\n\n"
    "## Top tools\n\n"
    f"{FENCE}json\n"
    f"{json.dumps(clean_for_json(summary['top_tools']), indent=2, ensure_ascii=False)}\n"
    f"{FENCE}\n\n"
    "## Saved files\n\n"
    "- `analysis_summary.json`\n"
    "- `fable5_analysis_index.csv`\n"
    "- `fable5_analysis_index.pkl`\n"
    "- `fable5_no_cot_chat_train.jsonl`\n"
    "- `fable5_no_cot_chat_validation.jsonl`\n"
    "- `fable5_no_cot_chat_test.jsonl`\n"
    "- plot PNG files\n"
    "- baseline classifier metrics, when enough rows/classes are available\n\n"
    "## Recommended next steps\n\n"
    "1. Inspect `fable5_no_cot_chat_train.jsonl` before any fine-tuning.\n"
    "2. Keep the dataset license in mind before model training or redistribution.\n"
    "3. Avoid training directly on raw terminal outputs without additional privacy and safety filtering.\n"
    "4. Start with the no-CoT chat export unless your research explicitly requires reasoning-trace supervision.\n"
)

with open(
    OUT_DIR / "REPORT.md",
    "w",
    encoding="utf-8",
) as file:
    file.write(report_md)

rprint(
    Panel.fit(
        f"[bold green]Tutorial complete.[/bold green]\n\n"
        f"Artifacts saved in:\n{OUT_DIR}\n\n"
        f"Key files:\n"
        f"- {OUT_DIR / 'REPORT.md'}\n"
        f"- {OUT_DIR / 'analysis_summary.json'}\n"
        f"- {OUT_DIR / 'fable5_no_cot_chat_train.jsonl'}\n"
        f"- {OUT_DIR / 'fable5_analysis_index.csv'}",
        title="Done",
    )
)

display(
    pd.DataFrame(
        {
            "artifact": [
                "Report",
                "Summary JSON",
                "No-CoT train export",
                "No-CoT validation export",
                "No-CoT test export",
                "Analysis CSV",
                "Analysis pickle",
                "Keyword search demo",
            ],
            "path": [
                str(OUT_DIR / "REPORT.md"),
                str(OUT_DIR / "analysis_summary.json"),
                str(OUT_DIR / "fable5_no_cot_chat_train.jsonl"),
                str(OUT_DIR / "fable5_no_cot_chat_validation.jsonl"),
                str(OUT_DIR / "fable5_no_cot_chat_test.jsonl"),
                str(OUT_DIR / "fable5_analysis_index.csv"),
                str(OUT_DIR / "fable5_analysis_index.pkl"),
                str(OUT_DIR / "keyword_search_demo.json"),
            ],
        }
    )
)

We train a baseline model to predict whether the assistant’s output is text or a tool call based on the trace context. We also train a second baseline that predicts the likely tool name for tool-use rows and save the evaluation artifacts. We finish by adding keyword search, writing the final summary JSON and Markdown report, and displaying the saved tutorial outputs.
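
Before the tool-name baseline is trained, the code keeps the 12 most frequent tools and folds everything else into an `__OTHER__` label, which keeps rare classes from fragmenting the split. A minimal pure-Python sketch of that bucketing step follows. The function name bucket_rare_labels and the toy tool names are illustrative assumptions; the tutorial itself does this with pandas value_counts and where.

```python
from collections import Counter


def bucket_rare_labels(labels, keep=12, other="__OTHER__"):
    """Keep the `keep` most frequent labels and map all others to `other`.

    Hypothetical helper: a stdlib analogue of the pandas
    value_counts().head(12) / where(...) step in the tutorial.
    """
    top = {label for label, _ in Counter(labels).most_common(keep)}
    return [label if label in top else other for label in labels]


# Toy tool names, invented for illustration.
tool_names = ["Bash", "Bash", "Bash", "Read", "Read", "Write", "Glob"]
bucketed = bucket_rare_labels(tool_names, keep=2)

print(Counter(bucketed))
```

One thing to watch with this design: if the long tail is large, `__OTHER__` can become one of the biggest classes, so it is worth checking its support in the classifier report before reading too much into overall accuracy.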

Conclusion

In conclusion, we have a practical and reliable workflow for exploring Fable 5 Traces without depending on packages that may break in a Colab runtime. We moved from raw Hugging Face files to structured analysis tables, safe previews, plots, searchable examples, cleaned chat-style exports, and baseline modeling artifacts. We treated the traces as agent telemetry, so we redacted possible secrets, avoided executing any commands from the dataset, and kept the chain of thought out of the default training export.
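
As a reminder of what the redaction step amounts to, here is a minimal sketch using a subset of the secret-like patterns from the setup cell. The function name redact_secrets, the DEMO_SECRET_RE variable, and the `[REDACTED]` placeholder are assumptions made for this illustration; the tutorial's own helper may differ in name and behaviour.

```python
import re

# A subset of the secret-like patterns listed in the setup cell.
DEMO_PATTERNS = [
    r"sk-[A-Za-z0-9_-]{20,}",
    r"hf_[A-Za-z0-9_-]{20,}",
    r"AKIA[0-9A-Z]{16}",
]
DEMO_SECRET_RE = re.compile("|".join(f"(?:{p})" for p in DEMO_PATTERNS))


def redact_secrets(text, placeholder="[REDACTED]"):
    """Replace every secret-like match with a placeholder (hypothetical helper)."""
    return DEMO_SECRET_RE.sub(placeholder, str(text))


# Fake token built at runtime so no real-looking credential sits in the source.
sample = "export HF_TOKEN=hf_" + "a" * 24
print(redact_secrets(sample))
```

Pattern-based redaction of this kind only catches known token shapes, so it works as a first filter and does not replace a manual review of the exports before training or sharing.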



Sana Hassan, a consulting intern at Marktechpost and dual-degree student at IIT Madras, is passionate about applying technology and AI to address real-world challenges. With a keen interest in solving practical problems, he brings a fresh perspective to the intersection of AI and real-life solutions.
