How to Design a Streaming Decision Agent with Partial Reasoning, Online Replanning, and Reactive Mid-Execution Adaptation in Dynamic Environments

how-to-design-a-streaming-decision-agent-with-partial-reasoning,-online-replanning,-and-reactive-mid-execution-adaptation-in-dynamic-environments
How to Design a Streaming Decision Agent with Partial Reasoning, Online Replanning, and Reactive Mid-Execution Adaptation in Dynamic Environments

In this tutorial, we build a Streaming Decision Agent that thinks and acts in an online, changing environment while continuously streaming safe, partial reasoning updates. We implement a dynamic grid world with moving obstacles and a shifting goal, then use an online A* planner in a receding-horizon loop to commit to only a few near-term moves and re-evaluate frequently. As we execute, we make intermediate decisions that can override the plan when a step becomes invalid or locally risky, allowing us to adapt mid-run rather than unthinkingly following a stale trajectory.

import random, math, time from dataclasses import dataclass, field from typing import List, Tuple, Dict, Optional, Generator, Any from collections import deque, defaultdict   try:    from pydantic import BaseModel, Field except Exception:    raise RuntimeError("Please install pydantic: `!pip -q install pydantic` (then rerun).")   class StreamEvent(BaseModel):    t: float = Field(..., description="Wall-clock time (seconds since start)")    kind: str = Field(..., description="event type, e.g., plan/update/act/observe/done")    step: int = Field(..., description="agent step counter")    msg: str = Field(..., description="human-readable partial reasoning summary")    data: Dict[str, Any] = Field(default_factory=dict, description="structured payload")   Coord = Tuple[int, int]

We define the streaming event schema and core type structures that allow us to emit structured reasoning updates. We use Pydantic to formalize the structure of each streamed decision or observation safely and consistently. We establish the foundational interface that powers incremental reasoning throughout the agent lifecycle.

@dataclass class DynamicGridWorld:    w: int = 18    h: int = 10    obstacle_ratio: float = 0.18    seed: int = 7    move_obstacles_every: int = 6    spawn_obstacle_prob: float = 0.25    clear_obstacle_prob: float = 0.15    target_jitter_prob: float = 0.35    rng: random.Random = field(init=False)    obstacles: set = field(init=False, default_factory=set)    agent: Coord = field(init=False, default=(1, 1))    target: Coord = field(init=False, default=(15, 7))    step_count: int = field(init=False, default=0)      def __post_init__(self):        self.rng = random.Random(self.seed)        self.reset()      def reset(self):        self.step_count = 0        self.obstacles = set()        for y in range(self.h):            for x in range(self.w):                if (x, y) in [(1, 1), (self.w - 2, self.h - 2)]:                    continue                if self.rng.random() < self.obstacle_ratio:                    self.obstacles.add((x, y))        self.agent = (1, 1)        self.target = (self.w - 2, self.h - 2)        self._ensure_free(self.agent)        self._ensure_free(self.target)      def _ensure_free(self, c: Coord):        if c in self.obstacles:            self.obstacles.remove(c)      def in_bounds(self, c: Coord) -> bool:        x, y = c        return 0 <= x < self.w and 0 <= y < self.h      def passable(self, c: Coord) -> bool:        return c not in self.obstacles      def neighbors4(self, c: Coord) -> List[Coord]:        x, y = c        cand = [(x+1,y), (x-1,y), (x,y+1), (x,y-1)]        return [p for p in cand if self.in_bounds(p) and self.passable(p)]      def manhattan(self, a: Coord, b: Coord) -> int:        return abs(a[0]-b[0]) + abs(a[1]-b[1])      def maybe_world_changes(self) -> Dict[str, Any]:        changes = {"obstacles_added": [], "obstacles_cleared": [], "target_moved": False}        self.step_count += 1        if self.rng.random() < self.target_jitter_prob:            tx, ty = self.target            options = [(tx+1,ty),(tx-1,ty),(tx,ty+1),(tx,ty-1)]            options = [c for c in options if self.in_bounds(c) and c != self.agent]            self.rng.shuffle(options)            for c in options[:3]:                if c not in self.obstacles:                    self.target = c                    changes["target_moved"] = True                    break        if self.step_count % self.move_obstacles_every == 0:            for _ in range(4):                if self.rng.random() < self.clear_obstacle_prob and self.obstacles:                    c = self.rng.choice(tuple(self.obstacles))                    self.obstacles.remove(c)                    changes["obstacles_cleared"].append(c)            for _ in range(5):                if self.rng.random() < self.spawn_obstacle_prob:                    c = (self.rng.randrange(self.w), self.rng.randrange(self.h))                    if c != self.agent and c != self.target:                        self.obstacles.add(c)                        changes["obstacles_added"].append(c)            self._ensure_free(self.agent)            self._ensure_free(self.target)        return changes      def step(self, action: str) -> Dict[str, Any]:        ax, ay = self.agent        move = {"R": (ax+1, ay), "L": (ax-1, ay), "D": (ax, ay+1), "U": (ax, ay-1), "S": (ax, ay)}[action]        moved = False        if self.in_bounds(move) and self.passable(move):            self.agent = move            moved = True        changes = self.maybe_world_changes()        done = (self.agent == self.target)        return {"moved": moved, "agent": self.agent, "target": self.target, "done": done, "changes": changes}      def render(self, path: Optional[List[Coord]] = None) -> str:        path_set = set(path or [])        lines = []        for y in range(self.h):            row = []            for x in range(self.w):                c = (x, y)                if c == self.agent:                    row.append("A")                elif c == self.target:                    row.append("T")                elif c in path_set:                    row.append("·")                elif c in self.obstacles:                    row.append("█")                else:                    row.append(" ")            lines.append("".join(row))        border = "+" + "-" * self.w + "+"        body = "n".join(["|" + ln + "|" for ln in lines])        return f"{border}n{body}n{border}"

We construct a dynamic grid world that evolves with shifting obstacles and a moving target. We simulate environmental non-stationarity to test online planning under uncertainty. We implement rendering and world-transition logic to observe how the agent reacts to real-time changes.

@dataclass class PlanResult:    path: List[Coord]    cost: float    expanded: int    reason: str   def astar(world: DynamicGridWorld, start: Coord, goal: Coord, max_expand: int = 5000) -> PlanResult:    frontier = []    import heapq    heapq.heappush(frontier, (world.manhattan(start, goal), 0, start))    came_from: Dict[Coord, Optional[Coord]] = {start: None}    gscore: Dict[Coord, float] = {start: 0}    expanded = 0    while frontier and expanded < max_expand:        f, g, cur = heapq.heappop(frontier)        expanded += 1        if cur == goal:            path = []            c = cur            while c is not None:                path.append(c)                c = came_from[c]            path.reverse()            return PlanResult(path=path, cost=gscore[cur], expanded=expanded, reason="found_path")        for nb in world.neighbors4(cur):            ng = gscore[cur] + 1            if nb not in gscore or ng < gscore[nb]:                gscore[nb] = ng                came_from[nb] = cur                h = world.manhattan(nb, goal)                heapq.heappush(frontier, (ng + h, ng, nb))    return PlanResult(path=[start], cost=float("inf"), expanded=expanded, reason="no_path_or_budget")   def path_to_actions(path: List[Coord]) -> List[str]:    actions = []    for (x1,y1),(x2,y2) in zip(path, path[1:]):        if x2 == x1+1 and y2 == y1: actions.append("R")        elif x2 == x1-1 and y2 == y1: actions.append("L")        elif x2 == x1 and y2 == y1+1: actions.append("D")        elif x2 == x1 and y2 == y1-1: actions.append("U")        else: actions.append("S")    return actions   def action_risk(world: DynamicGridWorld, next_pos: Coord) -> float:    x, y = next_pos    near = 0    for dx, dy in [(1,0),(-1,0),(0,1),(0,-1)]:        c = (x+dx, y+dy)        if world.in_bounds(c) and c in world.obstacles:            near += 1    edge = 1 if (x in [0, world.w-1] or y in [0, world.h-1]) else 0    return 0.25 * near + 0.15 * edge

We implement the A* online planner along with action extraction and local risk evaluation. We compute shortest paths incrementally while respecting a computational budget. We also introduce a lightweight risk model to enable us to override unsafe planned moves during execution.

@dataclass class AgentConfig:    horizon: int = 6    replan_on_target_move: bool = True    replan_on_obstacle_change: bool = True    max_steps: int = 120    think_latency: float = 0.02    act_latency: float = 0.01    risk_gate: float = 0.85    alt_search_depth: int = 2   @dataclass class StreamingDecisionAgent:    cfg: AgentConfig    world: DynamicGridWorld    start_time: float = field(init=False, default_factory=time.time)    step_id: int = field(init=False, default=0)    current_plan: List[Coord] = field(init=False, default_factory=list)    current_actions: List[str] = field(init=False, default_factory=list)    last_snapshot: Dict[str, Any] = field(init=False, default_factory=dict)    stats: Dict[str, Any] = field(init=False, default_factory=lambda: defaultdict(int))      def _now(self) -> float:        return time.time() - self.start_time      def _emit(self, kind: str, msg: str, data: Optional[Dict[str, Any]] = None) -> StreamEvent:        return StreamEvent(t=self._now(), kind=kind, step=self.step_id, msg=msg, data=data or {})      def _need_replan(self, obs: Dict[str, Any]) -> bool:        ch = obs["changes"]        if obs["done"]:            return False        if not self.current_plan or len(self.current_plan) <= 1:            return True        if self.cfg.replan_on_target_move and ch.get("target_moved"):            return True        if self.cfg.replan_on_obstacle_change and (ch.get("obstacles_added") or ch.get("obstacles_cleared")):            return True        if len(self.current_plan) > 1 and self.current_plan[1] in self.world.obstacles:            return True        return False      def _plan(self) -> PlanResult:        time.sleep(self.cfg.think_latency)        self.stats["replans"] += 1        return astar(self.world, self.world.agent, self.world.target)      def _choose_action(self, planned_action: str) -> Tuple[str, str]:        ax, ay = self.world.agent        action_to_delta = {"R": (1,0), "L": (-1,0), "D": (0,1), "U": (0,-1), "S": (0,0)}        dx, dy = action_to_delta[planned_action]        nxt = (ax+dx, ay+dy)        if not self.world.in_bounds(nxt) or not self.world.passable(nxt):            self.stats["overrides"] += 1            return "S", "planned_move_invalid -> wait."        r = action_risk(self.world, nxt)        if r > self.cfg.risk_gate:            candidates = ["U","D","L","R","S"]            best = (planned_action, float("inf"), "keep_plan")            for a in candidates:                dx, dy = action_to_delta[a]                p = (ax+dx, ay+dy)                if not self.world.in_bounds(p) or not self.world.passable(p):                    continue                score = action_risk(self.world, p) + 0.05 * self.world.manhattan(p, self.world.target)                if score < best[1]:                    best = (a, score, "risk_avoidance_override")            if best[0] != planned_action:                self.stats["overrides"] += 1                return best[0], best[2]        return planned_action, "follow_plan"      def run(self) -> Generator[StreamEvent, None, None]:        yield self._emit("observe", "Initialize: reading initial state.", {"agent": self.world.agent, "target": self.world.target})        yield self._emit("world", "Initial world snapshot.", {"grid": self.world.render()})        for self.step_id in range(1, self.cfg.max_steps + 1):            if self.step_id == 1 or self._need_replan(self.last_snapshot):                pr = self._plan()                self.current_plan = pr.path                self.current_actions = path_to_actions(pr.path)                if pr.reason != "found_path":                    yield self._emit("plan", "Planner could not find a path within budget; switching to reactive exploration.", {"reason": pr.reason, "expanded": pr.expanded})                    self.current_actions = []                else:                    horizon_path = pr.path[: max(2, min(len(pr.path), self.cfg.horizon + 1))]                    yield self._emit("plan", f"Plan updated (online A*). Commit to next {len(horizon_path)-1} moves, then re-evaluate.", {"reason": pr.reason, "path_len": len(pr.path), "expanded": pr.expanded, "commit_horizon": self.cfg.horizon, "horizon_path": horizon_path, "grid_with_path": self.world.render(path=horizon_path)})            if self.current_actions:                planned_action = self.current_actions[0]            else:                ax, ay = self.world.agent                tx, ty = self.world.target                options = []                if tx > ax: options.append("R")                if tx < ax: options.append("L")                if ty > ay: options.append("D")                if ty < ay: options.append("U")                options += ["S","U","D","L","R"]                planned_action = options[0]            action, why = self._choose_action(planned_action)            yield self._emit("decide", f"Intermediate decision: action={action} ({why}).", {"planned_action": planned_action, "chosen_action": action, "agent": self.world.agent, "target": self.world.target})            time.sleep(self.cfg.act_latency)            obs = self.world.step(action)            self.last_snapshot = obs            if self.current_actions:                if action == planned_action:                    self.current_actions = self.current_actions[1:]                    if len(self.current_plan) > 1:                        self.current_plan = self.current_plan[1:]            ch = obs["changes"]            surprise = []            if ch.get("target_moved"): surprise.append("target_moved")            if ch.get("obstacles_added"): surprise.append(f"obstacles_added={len(ch['obstacles_added'])}")            if ch.get("obstacles_cleared"): surprise.append(f"obstacles_cleared={len(ch['obstacles_cleared'])}")            surprise_msg = ("Surprises: " + ", ".join(surprise)) if surprise else "No major surprises."            self.stats["steps"] += 1            if obs["moved"]: self.stats["moves"] += 1            if ch.get("target_moved"): self.stats["target_moves"] += 1            if ch.get("obstacles_added") or ch.get("obstacles_cleared"): self.stats["world_shifts"] += 1            yield self._emit("observe", f"Observed outcome. {surprise_msg}", {"moved": obs["moved"], "agent": obs["agent"], "target": obs["target"], "done": obs["done"], "changes": ch, "grid": self.world.render(path=self.current_plan[: min(len(self.current_plan), 10)])})            if obs["done"]:                yield self._emit("done", "Goal reached. Stopping execution.", {"final_agent": obs["agent"], "final_target": obs["target"], "stats": dict(self.stats)})                return        yield self._emit("done", "Max steps reached without reaching the goal.", {"final_agent": self.world.agent, "final_target": self.world.target, "stats": dict(self.stats)})

We design the Streaming Decision Agent that integrates planning, monitoring, and reactive overrides. We implement receding-horizon control, committing only to near-term steps and replanning when surprises occur. We stream structured events at every stage, planning, deciding, acting, and observing, to demonstrate incremental reasoning in action.

def run_and_print(agent: StreamingDecisionAgent, throttle: float = 0.0):    last_kind = None    for ev in agent.run():        header = f"[t={ev.t:6.2f}s | step={ev.step:03d} | {ev.kind.upper():7}]"        print(header, ev.msg)        if ev.kind in {"plan", "observe", "world"}:            if "grid_with_path" in ev.data:                print(ev.data["grid_with_path"])            elif "grid" in ev.data:                print(ev.data["grid"])        if throttle > 0:            time.sleep(throttle)   world = DynamicGridWorld(w=18, h=10, obstacle_ratio=0.18, seed=10, move_obstacles_every=6) cfg = AgentConfig(horizon=6, replan_on_target_move=True, replan_on_obstacle_change=True, max_steps=120, think_latency=0.01, act_latency=0.01, risk_gate=0.85, alt_search_depth=2) agent = StreamingDecisionAgent(cfg=cfg, world=world) run_and_print(agent, throttle=0.0)

We build the streaming runner and execute the full agent loop inside the dynamic environment. We print structured reasoning updates in real time to simulate a live decision stream. We finally observe the agent adapting continuously, replanning when needed, and completing the task under changing conditions.

In conclusion, we have an agent that demonstrates incremental reasoning, online planning, and reactive behavior, and it is easy to run and inspect in Colab. We saw how streaming structured events makes the agent’s decision process observable while still keeping reasoning summaries concise and user-safe. Also, we showed how continuous monitoring and replanning turn a static planner into a responsive system that can handle surprises, moving targets, changing obstacles, and step-level risk without stopping execution.


Check out Full Codes hereAlso, feel free to follow us on Twitter and don’t forget to join our 120k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.

Leave a Reply

Your email address will not be published. Required fields are marked *