In this tutorial, we build the entire Agentic UI stack from the ground up using plain Python, without relying on external frameworks to abstract away the core ideas. We implement the AG-UI event stream to make agent behavior observable in real time, and we bring in A2UI as a declarative layer that allows interfaces to be defined as structured JSON rather than executable code. As we progress, we enable an LLM to generate full user interfaces from natural language, synchronize agent and UI state through JSON Patch updates, and enforce human-in-the-loop safety for critical actions. Also, we gain a clear, end-to-end understanding of how agent reasoning transforms into interactive, protocol-compliant user interfaces.
import subprocess
import sys

# Install the third-party packages at runtime so the script also works in a
# fresh notebook; the third-party imports below must come after this loop.
for pkg in ["openai", "rich", "pydantic"]:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pkg])

import os
import getpass

# API key lookup order: environment variable, Colab Secrets, hidden prompt.
if os.environ.get("OPENAI_API_KEY"):
    API_KEY = os.environ["OPENAI_API_KEY"]
    print("✅ Using OPENAI_API_KEY from environment.")
else:
    try:
        from google.colab import userdata
        API_KEY = userdata.get("OPENAI_API_KEY")
        print("✅ Using OPENAI_API_KEY from Colab Secrets.")
    except Exception:
        # Not running in Colab (or the secret is unavailable): ask interactively.
        API_KEY = getpass.getpass("🔑 Enter your OpenAI API key (hidden): ")
        print("✅ API key received.")

BASE_URL = os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1")
MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")

import json
import re
import time
import uuid
import copy
import textwrap
from enum import Enum
from dataclasses import dataclass, field, asdict
from typing import Any, Optional, Generator

from pydantic import BaseModel, Field
from openai import OpenAI
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.tree import Tree
from rich.text import Text
from rich.markdown import Markdown
from rich import box

console = Console(width=105)
client = OpenAI(api_key=API_KEY, base_url=BASE_URL)


def llm(messages, **kw):
    """Send a chat-completion request using the module-level client and MODEL.

    Extra keyword arguments are forwarded to the API call. ``temperature``
    defaults to 0.2 but may be overridden through ``kw``.

    Returns the response object, or ``None`` (after printing the error) if
    the call raises.
    """
    params = {"temperature": 0.2, **kw}
    try:
        return client.chat.completions.create(model=MODEL, messages=messages, **params)
    except Exception as e:
        console.print(f"[red]LLM error: {e}[/]")
        return None


def hdr(n, title, sub=""):
    """Print a section banner: a rule with the section number and a titled panel."""
    console.print()
    console.rule(f"[bold cyan]SECTION {n}", style="cyan")
    body = f"[bold white]{title}[/]\n[dim]{sub}[/]" if sub else f"[bold white]{title}[/]"
    console.print(Panel(body, border_style="cyan", padding=(1, 2)))


hdr(1, "AG-UI Protocol — Event System",
    "The real AG-UI protocol uses ~16 event types streamed via SSE.\n"
    "We implement all core event types and a streaming emitter in pure Python.")


class AGUIEventType(str, Enum):
    """Event type names used by the event stream in this tutorial."""

    RUN_STARTED = "RUN_STARTED"
    RUN_FINISHED = "RUN_FINISHED"
    RUN_ERROR = "RUN_ERROR"
    TEXT_MESSAGE_START = "TEXT_MESSAGE_START"
    TEXT_MESSAGE_CONTENT = "TEXT_MESSAGE_CONTENT"
    TEXT_MESSAGE_END = "TEXT_MESSAGE_END"
    TOOL_CALL_START = "TOOL_CALL_START"
    TOOL_CALL_ARGS = "TOOL_CALL_ARGS"
    TOOL_CALL_RESULT = "TOOL_CALL_RESULT"
    TOOL_CALL_END = "TOOL_CALL_END"
    STATE_SNAPSHOT = "STATE_SNAPSHOT"
    STATE_DELTA = "STATE_DELTA"
    INTERRUPT = "INTERRUPT"
    CUSTOM = "CUSTOM"
    STEP_STARTED = "STEP_STARTED"
    STEP_FINISHED = "STEP_FINISHED"


@dataclass
class AGUIEvent:
    """A single event: a type, a free-form payload, a short id and a timestamp."""

    type: AGUIEventType
    data: dict = field(default_factory=dict)
    # First 8 characters of a random UUID.
    event_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    timestamp: float = field(default_factory=time.time)

    def to_sse(self) -> str:
        """Serialize the event as one Server-Sent Events frame.

        The frame is an ``event:`` line, a ``data:`` line holding the JSON
        payload, and a terminating blank line.
        """
        payload = {"type": self.type.value, "id": self.event_id, **self.data}
        return f"event: ag-ui\ndata: {json.dumps(payload)}\n\n"

    def to_json(self) -> dict:
        """Return the event as a plain dict, including the timestamp under ``ts``."""
        return {"type": self.type.value, "id": self.event_id, "ts": self.timestamp, **self.data}


class AGUIEventStream:
    """In-memory event log that forwards every emitted event to its listeners."""

    def __init__(self):
        self.events: list[AGUIEvent] = []
        self.listeners: list = []

    def emit(self, event: AGUIEvent):
        """Record ``event`` and call each registered listener with it, in order."""
        self.events.append(event)
        for listener in self.listeners:
            listener(event)

    def on(self, callback):
        """Register ``callback`` to be called with every subsequently emitted event."""
        self.listeners.append(callback)

    def replay(self) -> list[dict]:
        """Return every recorded event as a dict, in emission order."""
        return [e.to_json() for e in self.events]


def demo_agui_lifecycle():
    """Emit a scripted agent run and print each event, one SSE frame and a summary."""
    stream = AGUIEventStream()
    # Colour per event-name prefix; the first matching prefix wins.
    event_colors = {
        "RUN_": "bold green",
        "TEXT_": "cyan",
        "TOOL_": "magenta",
        "STATE_": "yellow",
        "INTERRUPT": "bold red",
        "STEP_": "dim",
    }

    def frontend_listener(event: AGUIEvent):
        color = "white"
        for prefix, c in event_colors.items():
            if event.type.value.startswith(prefix):
                color = c
                break
        # Truncate the payload so each event stays on one console line.
        detail = json.dumps(event.data)[:80] if event.data else ""
        console.print(f" [{color}]⚡ {event.type.value:.<28}[/] {detail}")

    stream.on(frontend_listener)
    run_id = str(uuid.uuid4())[:8]
    console.print("[bold]Simulating full AG-UI agent run...[/]\n")

    stream.emit(AGUIEvent(AGUIEventType.RUN_STARTED, {"run_id": run_id}))
    stream.emit(AGUIEvent(AGUIEventType.STEP_STARTED,
                          {"step": "analyzing_query", "label": "Understanding request"}))
    stream.emit(AGUIEvent(AGUIEventType.STEP_FINISHED, {"step": "analyzing_query"}))

    msg_id = str(uuid.uuid4())[:8]
    stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_START,
                          {"message_id": msg_id, "role": "assistant"}))
    for chunk in ["I'll ", "look up ", "the data ", "and build ", "a dashboard ", "for you."]:
        stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_CONTENT,
                              {"message_id": msg_id, "delta": chunk}))
    stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_END, {"message_id": msg_id}))

    tool_id = str(uuid.uuid4())[:8]
    stream.emit(AGUIEvent(AGUIEventType.TOOL_CALL_START,
                          {"tool_call_id": tool_id, "name": "query_database"}))
    stream.emit(AGUIEvent(AGUIEventType.TOOL_CALL_ARGS,
                          {"tool_call_id": tool_id,
                           "args_delta": '{"query": "SELECT revenue FROM sales"}'}))
    stream.emit(AGUIEvent(AGUIEventType.TOOL_CALL_RESULT,
                          {"tool_call_id": tool_id,
                           "result": [{"month": "Jan", "revenue": 42000},
                                      {"month": "Feb", "revenue": 58000}]}))
    stream.emit(AGUIEvent(AGUIEventType.TOOL_CALL_END, {"tool_call_id": tool_id}))

    stream.emit(AGUIEvent(AGUIEventType.STATE_SNAPSHOT, {
        "state": {"active_agent": "DataAnalyst", "stage": "rendering", "progress": 0.75}
    }))
    stream.emit(AGUIEvent(AGUIEventType.STATE_DELTA, {
        "delta": [{"op": "replace", "path": "/progress", "value": 1.0}]
    }))
    stream.emit(AGUIEvent(AGUIEventType.INTERRUPT, {
        "reason": "high_risk_action",
        "description": "Agent wants to send an email to all 5,000 customers.",
        "options": ["approve", "reject", "modify"],
    }))
    stream.emit(AGUIEvent(AGUIEventType.RUN_FINISHED, {"run_id": run_id, "status": "completed"}))

    # events[3] is the TEXT_MESSAGE_START event emitted above.
    console.print(Panel(
        stream.events[3].to_sse(),
        title="[bold]Example SSE wire format (how it looks on the network)",
        border_style="dim",
    ))

    table = Table(title="AG-UI Event Stream Summary", box=box.ROUNDED)
    table.add_column("Category", style="cyan", width=15)
    table.add_column("Events", justify="center", style="green")
    counts = {}
    for e in stream.events:
        # Category = event name without its last "_"-separated word.
        cat = e.type.value.rsplit("_", 1)[0] if "_" in e.type.value else e.type.value
        counts[cat] = counts.get(cat, 0) + 1
    for cat, n in counts.items():
        table.add_row(cat, str(n))
    table.add_row("[bold]TOTAL", f"[bold]{len(stream.events)}")
    console.print(table)


demo_agui_lifecycle()
We start by building the backbone of every agentic frontend: the AG-UI event stream. We implement 16 event types from the AG-UI specification — lifecycle events, token-by-token text streaming, streamed tool calls, state snapshots, deltas, and interrupt signals — and serialize them into the SSE wire format that production systems use over HTTP. We then wire up a frontend listener that reacts to each event as it arrives, simulating the experience a React or Flutter app would have consuming this stream.
hdr(2, "A2UI — Declarative Component Trees",
    "Google's A2UI spec: agents emit flat JSON component lists with ID refs.\n"
    "The client's widget registry maps types → native widgets.\n"
    "Safe like data, expressive like code. No executable code sent.")


class A2UIMessageType(str, Enum):
    """Message type names emitted by ``A2UISurface.to_messages``."""

    CREATE_SURFACE = "createSurface"
    UPDATE_COMPONENTS = "updateComponents"
    UPDATE_DATA_MODEL = "updateDataModel"
    DELETE_SURFACE = "deleteSurface"


@dataclass
class A2UIComponent:
    """One UI component; ``children`` holds the ids of child components."""

    id: str
    type: str
    properties: dict = field(default_factory=dict)
    children: list[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Flatten to a dict; ``children`` is included only when non-empty."""
        d = {"id": self.id, "type": self.type, **self.properties}
        if self.children:
            d["children"] = self.children
        return d


@dataclass
class A2UIDataModel:
    """Nested dict/list data addressed by slash-separated paths."""

    data: dict = field(default_factory=dict)

    def get_binding(self, path: str) -> Any:
        """Resolve a path such as ``/restaurant/name`` against ``data``.

        Dict segments are looked up by key and list segments by numeric
        index. Returns ``None`` when the path cannot be followed, including
        a list index that is out of range.
        """
        parts = [p for p in path.split("/") if p]
        val = self.data
        for p in parts:
            if isinstance(val, dict):
                val = val.get(p)
            elif isinstance(val, list) and p.isdigit():
                idx = int(p)
                if idx >= len(val):
                    return None
                val = val[idx]
            else:
                return None
        return val


@dataclass
class A2UISurface:
    """A surface: an id, a flat list of components and a data model."""

    surface_id: str
    components: list[A2UIComponent] = field(default_factory=list)
    data_model: A2UIDataModel = field(default_factory=A2UIDataModel)

    def to_messages(self) -> list[dict]:
        """Build the message list: create surface, update components, then data model.

        The data-model message is omitted when the data model is empty.
        """
        msgs = [
            {
                "type": A2UIMessageType.CREATE_SURFACE.value,
                "surfaceId": self.surface_id,
            },
            {
                "type": A2UIMessageType.UPDATE_COMPONENTS.value,
                "surfaceId": self.surface_id,
                "components": [c.to_dict() for c in self.components],
            },
        ]
        if self.data_model.data:
            msgs.append({
                "type": A2UIMessageType.UPDATE_DATA_MODEL.value,
                "surfaceId": self.surface_id,
                "dataModel": self.data_model.data,
            })
        return msgs


class WidgetRegistry:
    """Maps component type strings to render functions ``fn(component, surface, indent)``."""

    def __init__(self):
        self._renderers = {}

    def register(self, component_type: str, render_fn):
        """Associate ``component_type`` with ``render_fn`` (replacing any previous one)."""
        self._renderers[component_type] = render_fn

    def render(self, component: A2UIComponent, surface: A2UISurface, indent: int = 0):
        """Render one component, or print a placeholder if its type is unregistered."""
        fn = self._renderers.get(component.type)
        if fn:
            fn(component, surface, indent)
        else:
            pad = " " * indent
            console.print(f"{pad}[dim]⟨{component.type} id={component.id}⟩ (no renderer)[/]")

    def render_tree(self, surface: A2UISurface):
        """Render every root component and its descendants depth-first.

        A root is a component whose id is not listed in any ``children``.
        Child ids that do not exist are skipped, and a component is not
        rendered again beneath itself, so cyclic child references cannot
        recurse forever.
        """
        comp_map = {c.id: c for c in surface.components}
        all_children = set()
        for c in surface.components:
            all_children.update(c.children)
        roots = [c for c in surface.components if c.id not in all_children]

        def _render(comp_id: str, indent: int, ancestors: frozenset):
            comp = comp_map.get(comp_id)
            if comp is None or comp_id in ancestors:
                return
            self.render(comp, surface, indent)
            below = ancestors | {comp_id}
            for child_id in comp.children:
                _render(child_id, indent + 1, below)

        for root in roots:
            _render(root.id, 0, frozenset())


registry = WidgetRegistry()


def _resolve(comp, surface, key, default=None):
    """Return the display value for property ``key`` of ``comp``.

    Resolution order:
    1. the component's ``dataBinding`` path, if it resolves to a non-None value;
    2. the property value itself treated as a path, when it is a string that
       starts with "/" and contains a further "/";
    3. the literal property value (or ``default``).
    """
    val = comp.properties.get(key, default)
    binding = comp.properties.get("dataBinding")
    if binding and isinstance(binding, str) and binding.startswith("/"):
        resolved = surface.data_model.get_binding(binding)
        if resolved is not None:
            return resolved
    if isinstance(val, str) and val.startswith("/") and "/" in val[1:]:
        resolved = surface.data_model.get_binding(val)
        if resolved is not None:
            return resolved
    return val


def _to_float(val, default=0.0):
    """Coerce ``val`` to a float fraction.

    Numbers are returned as floats unchanged. Strings are parsed after
    stripping whitespace and trailing "%"; a string that contained "%" or
    parses to more than 1 is divided by 100. Anything else yields ``default``.
    """
    if isinstance(val, (int, float)):
        return float(val)
    if isinstance(val, str):
        cleaned = val.strip().rstrip("%")
        try:
            f = float(cleaned)
        except ValueError:
            return default
        if "%" in val or f > 1:
            return f / 100.0
        return f
    return default


def render_card(comp, surface, indent):
    """Print a boxed card header; childless cards also get a subtitle row and bottom border."""
    pad = " " * indent
    title = str(_resolve(comp, surface, "title", "Card"))
    console.print(f"{pad}┌─{'─' * 50}─┐")
    console.print(f"{pad}│ [bold]{title:^50}[/] │")
    console.print(f"{pad}├─{'─' * 50}─┤")
    # A card with children is left open; render_tree prints the children below it.
    if not comp.children:
        subtitle = str(_resolve(comp, surface, "subtitle", ""))
        if subtitle:
            console.print(f"{pad}│ {subtitle:<50} │")
        console.print(f"{pad}└─{'─' * 50}─┘")


def render_text(comp, surface, indent):
    """Print the component's text using the colour mapped from its ``style`` property."""
    pad = " " * indent
    text = _resolve(comp, surface, "text", "")
    style = comp.properties.get("style", "body")
    styles = {"headline": "bold white", "body": "white", "caption": "dim", "label": "bold cyan"}
    console.print(f"{pad}[{styles.get(style, 'white')}]{text}[/]")


def render_button(comp, surface, indent):
    """Print the button label on a background chosen by its ``variant`` property."""
    pad = " " * indent
    label = str(_resolve(comp, surface, "label", "Button"))
    variant = comp.properties.get("variant", "primary")
    colors = {"primary": "bold white on blue", "secondary": "white on grey30",
              "danger": "bold white on red"}
    console.print(f"{pad} [{colors.get(variant, 'white')}] {label} [/]")


def render_text_field(comp, surface, indent):
    """Print a labelled, boxed input showing the placeholder text."""
    pad = " " * indent
    label = comp.properties.get("label", "Input")
    placeholder = comp.properties.get("placeholder", "")
    console.print(f"{pad} {label}: [dim]┌{'─' * 26}┐[/]")
    console.print(f"{pad} [dim]│ {placeholder:<25}│[/]")
    console.print(f"{pad} [dim]└{'─' * 26}┘[/]")


def render_row(comp, surface, indent):
    """Layout container: prints nothing itself."""
    pass


def render_column(comp, surface, indent):
    """Layout container: prints nothing itself."""
    pass


def render_image(comp, surface, indent):
    """Print a placeholder line showing the image's alt text in brackets."""
    pad = " " * indent
    alt = comp.properties.get("alt", "image")
    # The opening bracket is escaped so Rich prints "[alt]" literally instead
    # of interpreting it as a markup tag and dropping it from the output.
    console.print(f"{pad} [dim]🖼 \\[{alt}][/]")


def render_divider(comp, surface, indent):
    """Print a 50-character horizontal rule."""
    pad = " " * indent
    console.print(f"{pad} {'─' * 50}")


def render_chip(comp, surface, indent):
    """Print the chip label on a grey background."""
    pad = " " * indent
    label = str(_resolve(comp, surface, "label", ""))
    console.print(f"{pad} [on grey23] {label} [/]")


def render_progress(comp, surface, indent):
    """Print a 40-cell progress bar for ``value``, clamped to the range 0..1."""
    pad = " " * indent
    raw_value = _resolve(comp, surface, "value", 0)
    value = max(0.0, min(1.0, _to_float(raw_value, 0.0)))
    label = str(_resolve(comp, surface, "label", ""))
    bar_len = int(value * 40)
    bar = f"[green]{'█' * bar_len}[/][dim]{'░' * (40 - bar_len)}[/]"
    console.print(f"{pad} {label}: {bar} {value*100:.0f}%")


for name, fn in [
    ("card", render_card),
    ("text", render_text),
    ("button", render_button),
    ("text-field", render_text_field),
    ("row", render_row),
    ("column", render_column),
    ("image", render_image),
    ("divider", render_divider),
    ("chip", render_chip),
    ("progress-bar", render_progress),
]:
    registry.register(name, fn)
# Demo: a hand-built surface for a restaurant booking form, shown three ways:
# the serialized messages, the rendered widgets, and the flat component table.
console.print("\n[bold]Demo: A2UI booking form — agent generates a restaurant reservation UI[/]\n")

booking_surface = A2UISurface(
    surface_id="booking-form-1",
    components=[
        A2UIComponent("root", "card", {"title": "🍽️ Reserve a Table"},
                      children=["c1", "c2", "c3", "c4", "c5", "c6"]),
        # c1/c2 have empty text; their content comes from the data model bindings.
        A2UIComponent("c1", "text", {"text": "", "dataBinding": "/restaurant/name",
                                     "style": "headline"}),
        A2UIComponent("c2", "text", {"text": "", "dataBinding": "/restaurant/cuisine",
                                     "style": "caption"}),
        A2UIComponent("c3", "divider", {}),
        A2UIComponent("c4", "text-field", {"label": "Date", "placeholder": "YYYY-MM-DD"}),
        A2UIComponent("c5", "text-field", {"label": "Guests", "placeholder": "1-12"}),
        A2UIComponent("c6", "button", {"label": "Reserve Now", "variant": "primary",
                                       "action": "submit_booking"}),
    ],
    data_model=A2UIDataModel({"restaurant": {"name": "Chez Laurent",
                                             "cuisine": "French Contemporary • $$$$"}}),
)

# Each message is truncated to 200 characters to keep the panel short.
console.print(Panel(
    "\n".join(json.dumps(m, indent=2)[:200] for m in booking_surface.to_messages()),
    title="[bold]A2UI JSONL stream (what goes over the wire)",
    border_style="yellow",
))

console.print("[bold]Rendered by client widget registry:[/]\n")
registry.render_tree(booking_surface)
console.print()

t = Table(title="A2UI Flat Component List (Adjacency Model)", box=box.ROUNDED)
t.add_column("ID", style="cyan", width=8)
t.add_column("Type", style="green", width=14)
t.add_column("Children", style="yellow", width=20)
t.add_column("Bindings", style="magenta", width=25)
for c in booking_surface.components:
    binding = c.properties.get("dataBinding", "")
    t.add_row(c.id, c.type, ", ".join(c.children) if c.children else "—", binding or "—")
console.print(t)
We implement Google's A2UI specification: a flat adjacency-list model where components reference children by ID rather than nesting, making the format trivially streamable and easy for LLMs to generate incrementally. We build a client-side Widget Registry that maps abstract type strings like "card", "text-field", and "progress-bar" to concrete terminal renderers, mirroring how a production app maps them to React components or Flutter widgets. We demonstrate the full cycle with a restaurant booking form, complete with data model bindings that decouple dynamic values from UI structure, exactly as the A2UI spec prescribes.
hdr(3, "Generative UI — LLM Produces Live Interfaces",
    "The agent generates A2UI component trees dynamically based on the query.\n"
    "This is the core of 'Generative UI' — context-adaptive interfaces\n"
    "that go far beyond text-only chat responses.")

# System prompt describing the component catalog and the JSON shape that
# generate_ui() parses ("surfaceId", "components", "dataModel").
A2UI_GENERATION_PROMPT = """
You are an A2UI Generative UI agent. Given a user query, you generate a rich
interactive interface — NOT text. You output an A2UI component tree as JSON.

RULES:
1. Output a flat list of components using the adjacency model (children = list of IDs).
2. Available component types: card, text, button, text-field, row, column,
   divider, chip, image, progress-bar, select, date-picker, data-table
3. Include a separate "dataModel" object for dynamic values. Use "/path/to/value" bindings.
4. The ROOT component should be a "card" with all others as descendants.
5. Think about what UI BEST serves the user — forms for input, tables for data,
   progress bars for status, chips for tags, buttons for actions.

OUTPUT FORMAT (strict JSON, nothing else):
{
  "surfaceId": "unique-id",
  "components": [
    {"id": "root", "type": "card", "title": "...", "children": ["c1", "c2"]},
    {"id": "c1", "type": "text", "text": "...", "style": "headline"},
    ...
  ],
  "dataModel": { ... }
}
"""


def generate_ui(user_query: str) -> Optional[A2UISurface]:
    """Ask the LLM for a component tree for ``user_query`` and build a surface from it.

    Markdown code fences around the reply are stripped before parsing.

    Returns ``None`` when the LLM call fails, or when the reply is not valid
    JSON or not a JSON object (a message is printed in the latter cases).
    Component entries that are not objects are skipped; a missing or
    malformed ``children`` value is treated as no children.
    """
    console.print(f" [dim]Generating UI for:[/] [bold]{user_query}[/]")
    response = llm([
        {"role": "system", "content": A2UI_GENERATION_PROMPT},
        {"role": "user", "content": user_query},
    ], max_tokens=1200)
    if not response:
        return None

    # The message content can be None; treat that as an empty reply.
    raw = response.choices[0].message.content or ""
    try:
        cleaned = re.sub(r'```json\s*|\s*```', '', raw).strip()
        spec = json.loads(cleaned)
    except json.JSONDecodeError:
        console.print(f"[red]Failed to parse generated UI: {raw[:200]}[/]")
        return None
    if not isinstance(spec, dict):
        console.print(f"[red]Failed to parse generated UI: {raw[:200]}[/]")
        return None

    components = []
    for c in spec.get("components") or []:
        if not isinstance(c, dict):
            continue
        child_ids = c.get("children") or []
        if not isinstance(child_ids, list):
            child_ids = []
        components.append(A2UIComponent(
            id=c.get("id", str(uuid.uuid4())[:6]),
            type=c.get("type", "text"),
            # Everything except the structural keys becomes a property.
            properties={k: v for k, v in c.items() if k not in ("id", "type", "children")},
            children=[ch for ch in child_ids if isinstance(ch, str)],
        ))

    data = spec.get("dataModel")
    surface = A2UISurface(
        surface_id=spec.get("surfaceId", f"gen-{uuid.uuid4().hex[:6]}"),
        components=components,
        data_model=A2UIDataModel(data if isinstance(data, dict) else {}),
    )
    return surface


def demo_generative_ui(query: str):
    """Generate a surface for ``query``, render it, and print a short usage summary."""
    surface = generate_ui(query)
    if surface:
        console.print(f"\n[bold green]Generated {len(surface.components)} components:[/]")
        registry.render_tree(surface)
        console.print()
        types = {}
        for c in surface.components:
            types[c.type] = types.get(c.type, 0) + 1
        console.print(" [dim]Component types used:[/] "
                      + ", ".join(f"[cyan]{t}[/]×{n}" for t, n in types.items()))
        if surface.data_model.data:
            console.print(f" [dim]Data model keys:[/] {list(surface.data_model.data.keys())}")
        console.print()


console.print("\n[bold]Demo 1: Agent generates an onboarding form[/]")
demo_generative_ui(
    "Create a user onboarding flow: collect name, email, role (dropdown), "
    "preferred notification method (chips), and a 'Get Started' button."
)

console.print("\n[bold]Demo 2: Agent generates a data dashboard[/]")
demo_generative_ui(
    "Show a project status dashboard with: project name 'Atlas v2', "
    "4 team members, sprint progress at 68%, 3 blockers flagged as critical, "
    "and action buttons for 'View Backlog' and 'Schedule Standup'."
)

console.print("\n[bold]Demo 3: Agent generates a confirmation dialog[/]")
demo_generative_ui(
    "Show a payment confirmation: $2,450 charge to Visa ending 4242, "
    "order #ORD-8891, with Approve and Decline buttons."
)
We hand the keys to the LLM and let it generate complete A2UI component trees at runtime from plain English descriptions — this is Generative UI in its purest form. We prompt the model with the A2UI schema and component catalog, and it produces fully structured surfaces with cards, forms, chips, progress bars, and data bindings, choosing the best UI pattern for each query. We run three demos — an onboarding flow, a project dashboard, and a payment confirmation — showing how the same agent adapts its interface to wildly different contexts without a single hardcoded layout.
hdr(4, "State Synchronization — Shared State Between Agent & UI",
    "AG-UI syncs state bidirectionally using STATE_SNAPSHOT and STATE_DELTA.\n"
    "The agent IS the state machine; the UI IS the renderer.\n"
    "JSON Patch diffs keep updates minimal and efficient.")


class SharedState:
    """A versioned state dict that is mutated through JSON-Patch-style operations."""

    def __init__(self, initial: Optional[dict] = None):
        self.state: dict = initial or {}
        self.history: list[dict] = []
        self.version: int = 0

    def snapshot(self) -> AGUIEvent:
        """Return a STATE_SNAPSHOT event carrying a deep copy of the state and its version."""
        return AGUIEvent(AGUIEventType.STATE_SNAPSHOT,
                         {"state": copy.deepcopy(self.state), "version": self.version})

    def apply_delta(self, operations: list[dict]) -> AGUIEvent:
        """Apply ``operations`` in order and return the matching STATE_DELTA event.

        Each operation is a dict with ``op`` ("replace", "add" or "remove"),
        a slash-separated ``path`` and, for replace/add, a ``value``.
        Missing intermediate dict keys are created. List elements are
        addressed by numeric index; "add" also accepts "-" to append.
        Operations with an empty path or an unknown ``op`` are ignored.

        The version is incremented once per call, and the batch is appended
        to ``history``.
        """
        for op in operations:
            path_parts = [p for p in op["path"].split("/") if p]
            if not path_parts:
                continue

            # Walk to the container that holds the final path segment.
            target = self.state
            for part in path_parts[:-1]:
                if isinstance(target, dict):
                    target = target.setdefault(part, {})
                elif isinstance(target, list) and part.isdigit():
                    target = target[int(part)]

            key = path_parts[-1]
            is_index = isinstance(target, list) and key.isdigit()
            kind = op["op"]
            if kind == "replace":
                if is_index:
                    target[int(key)] = op["value"]
                else:
                    target[key] = op["value"]
            elif kind == "add":
                if is_index:
                    target.insert(int(key), op["value"])
                elif isinstance(target, list) and key == "-":
                    target.append(op["value"])
                else:
                    target[key] = op["value"]
            elif kind == "remove":
                if isinstance(target, dict):
                    target.pop(key, None)
                elif is_index and int(key) < len(target):
                    target.pop(int(key))

        self.version += 1
        self.history.append({"version": self.version, "ops": operations})
        return AGUIEvent(AGUIEventType.STATE_DELTA,
                         {"delta": operations, "version": self.version})


console.print("\n[bold]Demo: Document review pipeline — 3 agents, shared state[/]\n")

stream = AGUIEventStream()
state = SharedState({
    "document": {"title": "Q4 Strategy Report", "status": "draft", "word_count": 2840},
    "pipeline": {"stage": "research", "progress": 0.0},
    "agents": {"active": "Researcher", "queue": ["Editor", "Reviewer"]},
    "feedback": [],
})


def log_event(event: AGUIEvent):
    """Print one line per delta operation, or the top-level keys of a snapshot."""
    if event.type in (AGUIEventType.STATE_SNAPSHOT, AGUIEventType.STATE_DELTA):
        if event.type == AGUIEventType.STATE_DELTA:
            ops = event.data.get("delta", [])
            for op in ops:
                console.print(f" [yellow]STATE_DELTA[/] v{event.data.get('version')}: "
                              f"[cyan]{op['op']}[/] {op['path']} → {op.get('value', '∅')}")
        else:
            console.print(f" [yellow]STATE_SNAPSHOT[/] v{event.data.get('version')}: "
                          f"{list(event.data['state'].keys())}")


stream.on(log_event)
stream.emit(state.snapshot())

console.print("\n[bold green]▸ Researcher agent working...[/]")
stream.emit(state.apply_delta([
    {"op": "replace", "path": "/pipeline/stage", "value": "research_complete"},
    {"op": "replace", "path": "/pipeline/progress", "value": 0.33},
    {"op": "add", "path": "/feedback/0",
     "value": {"agent": "Researcher", "note": "Added 4 new data sources"}},
]))

console.print("\n[bold green]▸ Editor agent working...[/]")
stream.emit(state.apply_delta([
    {"op": "replace", "path": "/agents/active", "value": "Editor"},
    {"op": "replace", "path": "/pipeline/stage", "value": "editing"},
    {"op": "replace", "path": "/pipeline/progress", "value": 0.66},
    {"op": "replace", "path": "/document/word_count", "value": 3150},
]))

console.print("\n[bold green]▸ Reviewer agent working...[/]")
stream.emit(state.apply_delta([
    {"op": "replace", "path": "/agents/active", "value": "Reviewer"},
    {"op": "replace", "path": "/pipeline/stage", "value": "review_complete"},
    {"op": "replace", "path": "/pipeline/progress", "value": 1.0},
    {"op": "replace", "path": "/document/status", "value": "approved"},
]))

console.print(Panel(
    json.dumps(state.state, indent=2),
    title="[bold]Final shared state after pipeline",
    border_style="green",
))

t = Table(title="State History (versions)", box=box.ROUNDED)
t.add_column("Version", style="cyan", justify="center")
t.add_column("Operations", style="yellow")
for h in state.history:
    ops_summary = "; ".join(f"{o['op']} {o['path']}" for o in h["ops"])
    # Truncated to 70 characters to keep the table narrow.
    t.add_row(str(h["version"]), ops_summary[:70])
console.print(t)
hdr(5, "Human-in-the-Loop — AG-UI INTERRUPT Events",
    "When an agent hits a high-stakes action, it emits an INTERRUPT event.\n"
    "The frontend renders an approval UI. Execution pauses until the human\n"
    "approves, rejects, or modifies. State is preserved throughout.")


@dataclass
class InterruptRequest:
    """A pending approval request raised for a risky action."""

    interrupt_id: str
    action_description: str
    risk_level: str
    affected_resources: list[str]
    proposed_changes: dict
    options: list[str] = field(default_factory=lambda: ["approve", "reject", "modify"])


@dataclass
class InterruptResponse:
    """The human's decision for one interrupt, with optional modifications."""

    interrupt_id: str
    decision: str
    modifications: Optional[dict] = None


class InterruptableAgent:
    """Classifies actions by keyword and raises INTERRUPT events for risky ones."""

    # Keyword -> risk level. Checked in this order; the first keyword found wins.
    RISK_RULES = {
        "delete": "critical",
        "payment": "critical",
        "email_all": "high",
        "publish": "high",
        "update": "medium",
        "read": "low",
    }

    def __init__(self):
        self.stream = AGUIEventStream()
        self.pending_interrupts: dict[str, InterruptRequest] = {}

    def assess_and_maybe_interrupt(self, action: str, details: dict) -> Optional[InterruptRequest]:
        """Classify ``action``; for critical/high risk, record and emit an interrupt.

        Keywords are matched case-insensitively as substrings, with
        underscores and runs of whitespace treated as a single space, so the
        ``email_all`` rule matches "Email all 12,000 users".

        Returns the created ``InterruptRequest``, or ``None`` when the risk
        is medium or low (nothing is emitted in that case).
        """
        normalized = " ".join(action.lower().replace("_", " ").split())
        risk = "low"
        for keyword, level in self.RISK_RULES.items():
            if keyword.replace("_", " ") in normalized:
                risk = level
                break

        if risk not in ("critical", "high"):
            return None

        interrupt = InterruptRequest(
            interrupt_id=str(uuid.uuid4())[:8],
            action_description=action,
            risk_level=risk,
            affected_resources=details.get("resources", []),
            proposed_changes=details.get("changes", {}),
        )
        self.pending_interrupts[interrupt.interrupt_id] = interrupt
        self.stream.emit(AGUIEvent(AGUIEventType.INTERRUPT, {
            "interrupt_id": interrupt.interrupt_id,
            "reason": risk,
            "description": interrupt.action_description,
            "affected_resources": interrupt.affected_resources,
            "proposed_changes": interrupt.proposed_changes,
            "options": interrupt.options,
        }))
        return interrupt

    def resolve_interrupt(self, response: InterruptResponse) -> str:
        """Remove the pending interrupt named by ``response`` and describe the outcome.

        Returns a message for approve / reject / modify, a "not found"
        message when no such interrupt is pending, or an "unknown decision"
        message for any other decision value.
        """
        interrupt = self.pending_interrupts.pop(response.interrupt_id, None)
        if not interrupt:
            return "No pending interrupt found."
        if response.decision == "approve":
            return f"✅ APPROVED: '{interrupt.action_description}' executing now."
        elif response.decision == "reject":
            return f"🚫 REJECTED: '{interrupt.action_description}' cancelled."
        elif response.decision == "modify":
            return (f"✏️ MODIFIED: '{interrupt.action_description}' "
                    f"updated with: {response.modifications}")
        return f"Unknown decision: {response.decision}"


console.print("\n[bold]Demo: Agent encounters actions of varying risk levels[/]\n")

agent = InterruptableAgent()
actions = [
    ("Read user profile", {"resources": ["user:123"]}),
    ("Update user preferences", {"resources": ["user:123"], "changes": {"theme": "dark"}}),
    ("Delete user account", {"resources": ["user:123", "data:all"],
                             "changes": {"action": "permanent_delete"}}),
    ("Email all 12,000 users", {"resources": ["email:newsletter"],
                                "changes": {"subject": "Big Announcement"}}),
    ("Publish blog post", {"resources": ["post:draft-42"], "changes": {"status": "public"}}),
]


def event_logger(event):
    """Print the details of every INTERRUPT event; other events are ignored."""
    if event.type == AGUIEventType.INTERRUPT:
        d = event.data
        risk_style = {"critical": "bold red", "high": "bold yellow"}.get(d["reason"], "white")
        console.print(f"\n [bold]🚨 INTERRUPT EVENT[/]")
        console.print(f" Risk: [{risk_style}]{d['reason'].upper()}[/]")
        console.print(f" Action: {d['description']}")
        console.print(f" Affected: {d['affected_resources']}")
        console.print(f" Options: {d['options']}")


agent.stream.on(event_logger)

results = []
for action_desc, details in actions:
    interrupt = agent.assess_and_maybe_interrupt(action_desc, details)
    if interrupt:
        # Scripted stand-in for the human: reject critical, approve high.
        decision = "reject" if interrupt.risk_level == "critical" else "approve"
        result = agent.resolve_interrupt(InterruptResponse(interrupt.interrupt_id, decision))
    else:
        result = f"⚡ AUTO-EXECUTED: '{action_desc}' (low risk, no approval needed)"
    results.append((action_desc, result))

console.print()
t = Table(title="Execution Results", box=box.ROUNDED, show_lines=True)
t.add_column("Action", style="white", width=28)
t.add_column("Outcome", style="dim", width=55)
for action_desc, result in results:
    t.add_row(action_desc, result)
console.print(t)
We build a SharedState engine that emits AG-UI STATE_SNAPSHOT and STATE_DELTA events using JSON Patch operations, keeping the agent backend and the frontend UI perfectly synchronized through every mutation. We demonstrate this with a three-agent document review pipeline in which a Researcher, Editor, and Reviewer each modify the shared state in sequence, and the frontend sees every change the instant it occurs. We then implement the AG-UI INTERRUPT pattern, in which the agent assesses risk levels for proposed actions, emits interrupt events for any dangerous actions, and pauses execution until a human approves, rejects, or modifies the plan.
hdr(6, "Full Pipeline β LLM-Driven Adaptive UI", "The complete Agentic UI architecture in one pipeline:n" " User query β Intent analysis β UI pattern selection βn" " A2UI generation β AG-UI event streaming β State sync β Render") UI_ROUTER_PROMPT = """ You are a UI routing agent. Given a user query, decide what type of UI to generate. RESPOND IN JSON ONLY: { "intent": "form | dashboard | confirmation | list | detail | wizard | error", "reasoning": "why this UI pattern fits", "ui_complexity": "simple | moderate | complex", "needs_approval": true/false, "data_requirements": ["what data the UI needs"] } """ class AgenticUIPipeline: def __init__(self): self.stream = AGUIEventStream() self.state = SharedState({"pipeline": {"stage": "idle"}, "renders": 0}) self.interrupt_agent = InterruptableAgent() def route(self, query: str) -> dict: resp = llm([ {"role": "system", "content": UI_ROUTER_PROMPT}, {"role": "user", "content": query}, ], max_tokens=300) if not resp: return {"intent": "dashboard", "reasoning": "fallback", "ui_complexity": "simple", "needs_approval": False, "data_requirements": []} raw = resp.choices[0].message.content try: return json.loads(re.sub(r'```jsons*|s*```', '', raw).strip()) except json.JSONDecodeError: return {"intent": "dashboard", "reasoning": "parse_fallback", "ui_complexity": "simple", "needs_approval": False, "data_requirements": []} def run(self, user_query: str): run_id = str(uuid.uuid4())[:8] console.print(Panel(f"[bold]{user_query}[/]", title="π§ User Query", border_style="white")) self.stream.emit(AGUIEvent(AGUIEventType.RUN_STARTED, {"run_id": run_id})) self.stream.emit(AGUIEvent(AGUIEventType.STEP_STARTED, {"step": "routing"})) routing = self.route(user_query) console.print(f"n [bold cyan]π‘ Router Decision:[/]") console.print(f" Intent: [green]{routing.get('intent')}[/] | " f"Complexity: [yellow]{routing.get('ui_complexity')}[/] | " f"Approval: {'π' if routing.get('needs_approval') else 'β
'}") console.print(f" Reasoning: [dim]{routing.get('reasoning', '')}[/]") self.stream.emit(AGUIEvent(AGUIEventType.STEP_FINISHED, {"step": "routing", "result": routing})) self.state.apply_delta([ {"op": "replace", "path": "https://www.marktechpost.com/pipeline/stage", "value": "generating"}, ]) self.stream.emit(AGUIEvent(AGUIEventType.STEP_STARTED, {"step": "generating_ui"})) msg_id = str(uuid.uuid4())[:8] self.stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_START, {"message_id": msg_id})) self.stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_CONTENT, { "message_id": msg_id, "delta": f"Building a {routing.get('intent')} interface for you..." })) self.stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_END, {"message_id": msg_id})) surface = generate_ui(user_query) self.stream.emit(AGUIEvent(AGUIEventType.STEP_FINISHED, {"step": "generating_ui"})) if routing.get("needs_approval") and surface: self.stream.emit(AGUIEvent(AGUIEventType.INTERRUPT, { "reason": "ui_confirmation", "description": f"Generated {len(surface.components)} component UI. 
Render it?", "options": ["render", "regenerate", "cancel"], })) console.print("n [bold yellow]βΈ INTERRUPT:[/] UI generated, awaiting human approval...") console.print(" [green]β Auto-approving for demo...[/]") if surface: self.state.apply_delta([ {"op": "replace", "path": "https://www.marktechpost.com/pipeline/stage", "value": "rendering"}, {"op": "replace", "path": "https://www.marktechpost.com/renders", "value": self.state.state.get("renders", 0) + 1}, ]) console.print(f"n[bold green]π₯ Rendered Interface ({len(surface.components)} components):[/]n") registry.render_tree(surface) self.stream.emit(AGUIEvent(AGUIEventType.CUSTOM, { "subtype": "a2ui_surface", "surface": surface.to_messages(), })) self.state.apply_delta([{"op": "replace", "path": "https://www.marktechpost.com/pipeline/stage", "value": "complete"}]) self.stream.emit(AGUIEvent(AGUIEventType.RUN_FINISHED, {"run_id": run_id, "status": "success"})) console.print() event_counts = {} for e in self.stream.events: event_counts[e.type.value] = event_counts.get(e.type.value, 0) + 1 t = Table(title="Pipeline Event Summary", box=box.ROUNDED) t.add_column("Event Type", style="cyan") t.add_column("Count", justify="center", style="green") for etype, count in sorted(event_counts.items()): t.add_row(etype, str(count)) console.print(t) pipeline = AgenticUIPipeline() console.print("n[bold]Demo 1: Agent builds a settings form[/]") pipeline.run( "Create a notification settings panel where I can toggle email/SMS/push, " "set quiet hours, and pick a notification sound." ) pipeline.stream = AGUIEventStream() pipeline.state = SharedState({"pipeline": {"stage": "idle"}, "renders": 0}) console.print("n[bold]Demo 2: Agent builds an order tracking dashboard[/]") pipeline.run( "Show order #ORD-7742 status: shipped via FedEx, tracking 789456123, " "estimated delivery March 24, 2 of 3 items delivered. Show a progress bar " "and action buttons for 'Contact Support' and 'Request Refund'." 
) hdr(7, "Incremental UI Updates β Live Surface Modification", "A2UI surfaces are incrementally updateable. The agent can add, remove,n" "or modify components and data bindings on a live surface withoutn" "regenerating the whole tree. Essential for real-time collaboration.") class LiveSurface: def __init__(self, surface: A2UISurface): self.surface = surface self.update_log: list[str] = [] def add_component(self, component: A2UIComponent, parent_id: Optional[str] = None): self.surface.components.append(component) if parent_id: for c in self.surface.components: if c.id == parent_id: c.children.append(component.id) break self.update_log.append(f"ADD {component.type}#{component.id} β parent:{parent_id or 'root'}") def update_component(self, component_id: str, new_props: dict): for c in self.surface.components: if c.id == component_id: c.properties.update(new_props) self.update_log.append(f"UPD #{component_id} props: {list(new_props.keys())}") return self.update_log.append(f"ERR #{component_id} not found") def remove_component(self, component_id: str): self.surface.components = [c for c in self.surface.components if c.id != component_id] for c in self.surface.components: if component_id in c.children: c.children.remove(component_id) self.update_log.append(f"DEL #{component_id}") def update_data(self, path: str, value: Any): self.surface.data_model.data = _set_nested(self.surface.data_model.data, path, value) self.update_log.append(f"DATA {path} = {value}") def _set_nested(d: dict, path: str, value: Any) -> dict: parts = [p for p in path.split("https://www.marktechpost.com/") if p] d = copy.deepcopy(d) current = d for p in parts[:-1]: current = current.setdefault(p, {}) if parts: current[parts[-1]] = value return d console.print("n[bold]Demo: Live collaborative editing β agent modifies UI in real-time[/]n") initial = A2UISurface( surface_id="task-board", components=[ A2UIComponent("board", "card", {"title": "π Sprint Board"}, children=["t1", "t2", "t3"]), 
A2UIComponent("t1", "chip", {"label": "AUTH-101: Login flow", "variant": "in_progress"}), A2UIComponent("t2", "chip", {"label": "AUTH-102: OAuth setup", "variant": "todo"}), A2UIComponent("t3", "chip", {"label": "AUTH-103: 2FA", "variant": "todo"}), ], data_model=A2UIDataModel({"sprint": {"name": "Sprint 14", "velocity": 21}}), ) live = LiveSurface(initial) console.print("[bold]Initial board:[/]") registry.render_tree(live.surface) console.print("n[bold yellow]Agent updating board in real-time...[/]n") live.update_component("t1", {"variant": "done", "label": "β
AUTH-101: Login flow"}) live.update_component("t2", {"variant": "in_progress", "label": "π AUTH-102: OAuth setup"}) live.add_component( A2UIComponent("t4", "chip", {"label": "AUTH-104: Password reset", "variant": "todo"}), parent_id="board" ) live.update_data("https://www.marktechpost.com/sprint/velocity", 25) live.remove_component("t3") console.print("[bold]Updated board:[/]") registry.render_tree(live.surface) console.print() t = Table(title="Incremental Update Log", box=box.ROUNDED) t.add_column("#", style="cyan", width=4, justify="center") t.add_column("Operation", style="yellow") for i, entry in enumerate(live.update_log, 1): t.add_row(str(i), entry) console.print(t)
We wire every piece together into a single AgenticUIPipeline class that takes a user query, classifies its intent with an LLM router, selects the right UI pattern, generates an A2UI surface, streams the entire process over AG-UI events, manages shared state, and renders the result — the complete architecture in one run. We then build a LiveSurface class that supports incremental A2UI updates: adding, modifying, and removing components on an already-rendered surface without regenerating the whole tree, which is essential for real-time collaborative experiences. We demo this with a sprint board that an agent updates live — marking tasks complete, adding new ones, and adjusting data model values — with every change tracked in a detailed operation log.
hdr(8, "Reference — The Agentic UI Protocol Stack",
    "How AG-UI, A2UI, MCP, and A2A fit together in the modern agent architecture.")

# Geometry of the stack diagram. Rows are padded in code so the right-hand
# border stays aligned however long each row's text is.
_BOX_WIDTH = 56  # visible characters between a box's vertical borders
_JOINT = 24      # horizontal-rule length to the left of the connector glyph
_LINK = " " * (2 + 1 + _JOINT) + "│"  # vertical connector drawn between boxes
_MARKUP_TAG = re.compile(r"\[/?[a-z ]*\]")  # rich style tags, e.g. [bold cyan] and [/]


def _box_row(content: str) -> str:
    """Return *content* as one boxed diagram row, padded to ``_BOX_WIDTH``."""
    visible = _MARKUP_TAG.sub("", content)  # style tags occupy no cells on screen
    padding = " " * max(0, _BOX_WIDTH - 2 - len(visible))
    return f"  │  {content}{padding}│"


def _box_edge(left: str, right: str, joint: str = "─") -> str:
    """Return a horizontal border between corners *left*/*right* with *joint* at the connector column."""
    return f"  {left}{'─' * _JOINT}{joint}{'─' * (_BOX_WIDTH - _JOINT - 1)}{right}"


# NOTE(review): arrow directions (↔ / →) in the diagram and the role list are
# reconstructed from each protocol's description; the original glyphs were lost
# to mis-encoding. Confirm against the upstream notebook.
_stack_diagram = "\n".join([
    "",
    "  [bold white]THE AGENTIC UI STACK (2026)[/]",
    "",
    _box_edge("┌", "┐"),
    _box_row("[bold cyan]USER INTERFACE[/]  (React, Flutter, SwiftUI, Terminal)"),
    _box_row("Renders native widgets from component specs"),
    _box_edge("└", "┘", "┬"),
    f"{_LINK}  A2UI component trees (JSON)",
    f"{_LINK}  AG-UI events (SSE / WebSocket)",
    _box_edge("┌", "┐", "┴"),
    _box_row("[bold yellow]AG-UI PROTOCOL[/]  (Agent ↔ User Interaction)"),
    _box_row("• Event streaming (TEXT, TOOL_CALL, STATE, INTERRUPT)"),
    _box_row("• Bidirectional state sync (SNAPSHOT + DELTA)"),
    _box_row("• Human-in-the-loop (INTERRUPT → approval flow)"),
    _box_edge("└", "┘", "┬"),
    _LINK,
    _box_edge("┌", "┐", "┴"),
    _box_row("[bold magenta]AGENT RUNTIME[/]  (LangGraph, CrewAI, custom, etc.)"),
    _box_row("• Generates A2UI surfaces (Generative UI)"),
    _box_row("• Manages shared state"),
    _box_row("• Orchestrates sub-agents via A2A protocol"),
    _box_row("• Accesses tools via MCP protocol"),
    _box_edge("└", "┘", "┬"),
    _LINK,
    _box_edge("┌", "┐", "┴"),
    _box_row("[bold green]LLM BACKBONE[/]  (GPT, Claude, Gemini, etc.)"),
    _box_row("• Generates component trees as structured output"),
    _box_row("• Reasons about UI patterns per context"),
    _box_row("• Streams tokens for real-time rendering"),
    _box_edge("└", "┘"),
    "",
    "[dim]Protocol roles:",
    "  AG-UI = Agent ↔ User   (streaming events, state, HITL)",
    "  A2UI  = Agent → UI     (declarative component specs)",
    "  A2A   = Agent ↔ Agent  (delegation, sub-agents)",
    "  MCP   = Agent → Tools  (function calling, context)[/]",
    "",
])
console.print(Panel(_stack_diagram, title="[bold]Architecture Reference", border_style="cyan"))

# Quick-reference table: one row per concept covered in the tutorial.
ref = Table(title="Agentic UI Concepts — Quick Reference", box=box.DOUBLE_EDGE, show_lines=True)
ref.add_column("Concept", style="bold cyan", width=22)
ref.add_column("What It Does", style="white", width=35)
ref.add_column("Key Mechanism", style="yellow", width=28)
ref.add_row("AG-UI Events", "Stream agent actions to frontend in real-time", "SSE/WebSocket + ~16 event types")
ref.add_row("A2UI Components", "Declarative UI trees — safe, portable, native", "Flat JSON + widget registry")
ref.add_row("State Sync", "Keep agent & UI state in lockstep", "STATE_SNAPSHOT + STATE_DELTA")
ref.add_row("Generative UI", "LLM generates UI at runtime, not just text", "A2UI JSON as structured output")
ref.add_row("INTERRUPT (HITL)", "Pause execution for human approval", "INTERRUPT event → approval flow")
ref.add_row("Incremental Update", "Modify live surfaces without full regeneration", "A2UI updateComponents message")
ref.add_row("Data Binding", "UI reads from a shared data model", "JSON Pointer paths (/path/to/val)")
ref.add_row("Widget Registry", "Client maps abstract types to native widgets", "Catalog of trusted components")
console.print(ref)

# NOTE(review): the rocket in the panel title is a best-guess replacement for a
# glyph that was lost to mis-encoding.
console.print(Panel(
    "[bold green]Tutorial complete![/]\n\n"
    "[dim]What you built:[/]\n"
    "  • A full AG-UI event system with all 16 event types\n"
    "  • An A2UI renderer with flat adjacency-list components + data binding\n"
    "  • LLM-powered Generative UI that creates interfaces from natural language\n"
    "  • Bidirectional state sync with JSON Patch deltas\n"
    "  • Human-in-the-loop interrupt and approval flows\n"
    "  • Incremental live surface updates\n\n"
    "[dim]To go further:[/]\n"
    "  • Serve AG-UI events over real SSE with FastAPI\n"
    "  • Connect to CopilotKit React components for a real frontend\n"
    "  • Use Pydantic AI's AGUIAdapter for production agent hosting\n"
    "  • Add A2A protocol for multi-agent delegation\n"
    "  • Deploy on AWS Bedrock AgentCore with native AG-UI support",
    title="[bold]🚀 What's Next?",
    border_style="green",
    padding=(1, 2),
))
We close with a visual protocol stack diagram showing exactly how AG-UI, A2UI, A2A, and MCP fit together in the modern agentic architecture, from the LLM backbone at the bottom to the native UI at the top. We provide a quick-reference table mapping every concept we built — event streaming, component trees, state sync, generative UI, interrupts, incremental updates, data binding, and widget registries — to its core mechanism. We point the way forward to production: serving AG-UI events over real SSE with FastAPI, connecting to CopilotKit React components, using Pydantic AI's AGUIAdapter, and deploying on AWS Bedrock AgentCore.
In conclusion, we have a fully functional Agentic UI pipeline that takes a simple natural-language query and transforms it into a structured, interactive interface powered by an intelligent agent. We do not just assemble components; we understand how each layer operates and connects, from real-time AG-UI event streaming and declarative A2UI interface definitions to state synchronization through JSON Patch and enforced human-in-the-loop safety mechanisms. This clarity allows us to reason about system behavior, debug effectively, and extend functionality without relying on black-box abstractions. Also, we leave with the ability to design our own agent-driven UI systems, adapt them to different use cases, and confidently build production-ready experiences where agents and interfaces evolve together in a controlled, transparent, and scalable manner.
Check out the Full Codes with Notebook here. Also, feel free to follow us on Twitter, and don't forget to join our 130k+ ML SubReddit and subscribe to our Newsletter. Wait! Are you on Telegram? Now you can join us on Telegram as well.
Need to partner with us to promote your GitHub repo, Hugging Face page, product release, webinar, etc.? Connect with us.
Sana Hassan
Sana Hassan, a consulting intern at Marktechpost and dual-degree student at IIT Madras, is passionate about applying technology and AI to address real-world challenges. With a keen interest in solving practical problems, he brings a fresh perspective to the intersection of AI and real-life solutions.


