How a Haystack-Powered Multi-Agent System Detects Incidents, Investigates Metrics and Logs, and Produces Production-Grade Incident Reviews End-to-End


In this tutorial, we build an implementation that demonstrates how Haystack enables advanced, agentic AI systems that go far beyond toy examples while remaining fully runnable. We focus on a cohesive, end-to-end setup that highlights orchestration, stateful decision-making, tool execution, and structured control flow, showing how complex agent behavior can be expressed cleanly. We deliberately keep everything in a single executable script to emphasize reproducibility and to make it easy to experiment, extend, and stress-test the system in realistic scenarios. Check out the FULL CODES here.

import os, json, math, random, textwrap
from datetime import datetime, timedelta

try:
    import pandas as pd
except Exception:
    os.system("pip -q install pandas")
    import pandas as pd

try:
    import numpy as np
except Exception:
    os.system("pip -q install numpy")
    import numpy as np

try:
    import duckdb
except Exception:
    os.system("pip -q install duckdb")
    import duckdb

os.system("pip -q install haystack-ai openai")

from haystack.components.agents import Agent
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.tools import tool
from haystack.components.agents.state import State
from haystack.components.agents.state.state_utils import merge_lists
from haystack.tools import ComponentTool

from getpass import getpass

if not os.getenv("OPENAI_API_KEY"):
    key = getpass("Enter OPENAI_API_KEY (input hidden): ").strip()
    if key:
        os.environ["OPENAI_API_KEY"] = key

if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY missing. Set it in the environment or paste when prompted.")

We install and import all required libraries, ensuring that Haystack, OpenAI, and data tooling are available, and securely load the OpenAI API key at runtime. We configure the environment to gracefully handle missing dependencies and prompt for credentials without hardcoding sensitive information. We prepare the foundation for an agent-driven workflow by initializing core Haystack components, tools, and state utilities in a Colab-ready setup. Check out the FULL CODES here.
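As an optional sanity check before building any agents, we can issue a single tiny chat call to confirm the key and generator work end to end. This snippet is illustrative and not part of the original tutorial flow; it simply reuses the same model the tutorial configures later.

# Optional smoke test (illustrative): confirm the API key and chat generator
# work before wiring up agents. Uses the same model configured later on.
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage

smoke_llm = OpenAIChatGenerator(model="gpt-4o-mini")
smoke_reply = smoke_llm.run(messages=[ChatMessage.from_user("Reply with the single word: ready")])
print(smoke_reply["replies"][0].text)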

random.seed(7)
np.random.seed(7)

now = datetime.utcnow()
start = now - timedelta(hours=24)

services = ["api-gateway", "payments", "auth", "db-proxy", "worker", "web"]
regions = ["eu-central-1", "eu-west-1", "us-east-1"]
levels = ["INFO", "WARN", "ERROR"]
error_kinds = [
    "UpstreamTimeout",
    "DBConnPoolExhausted",
    "JWTSignatureInvalid",
    "RateLimitExceeded",
    "DeadlockDetected",
    "CacheMissStorm",
    "OOMKilled",
    "TLSHandshakeFailure",
]

def synth_metrics(n=1440):
    ts = [start + timedelta(minutes=i) for i in range(n)]
    base_rps = 220 + 40*np.sin(np.linspace(0, 8*math.pi, n)) + np.random.normal(0, 10, n)
    base_p95 = 180 + 30*np.sin(np.linspace(0, 6*math.pi, n) + 0.5) + np.random.normal(0, 8, n)
    base_err = np.clip(np.random.normal(0.006, 0.002, n), 0.0, 0.05)
    incident_t0 = int(n*0.62)
    incident_t1 = incident_t0 + int(n*0.10)
    base_p95[incident_t0:incident_t1] += np.linspace(120, 520, incident_t1-incident_t0)
    base_err[incident_t0:incident_t1] += np.linspace(0.01, 0.07, incident_t1-incident_t0)
    base_rps[incident_t0:incident_t1] -= np.linspace(5, 80, incident_t1-incident_t0)
    df = pd.DataFrame({
        "ts": ts,
        "rps": np.clip(base_rps, 5, None),
        "p95_ms": np.clip(base_p95, 10, None),
        "error_rate": np.clip(base_err, 0.0, 0.2),
    })
    return df, (ts[incident_t0], ts[incident_t1])

metrics_df, (incident_begin, incident_end) = synth_metrics()

We seed randomness and generate a realistic 24-hour stream of synthetic service metrics with periodic behavior and noise. We deliberately introduce an incident window during which latency and error rates spike while request throughput degrades. We return both the metrics DataFrame and precise incident boundaries to support downstream detection and agent reasoning. Check out the FULL CODES here.
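As a quick, illustrative sanity check (not part of the agent workflow), we can compare metrics inside and outside the injected window to confirm the spike is actually visible in the synthetic data.

# Illustrative check: compare baseline behavior against the injected incident window.
inside = metrics_df[(metrics_df["ts"] >= incident_begin) & (metrics_df["ts"] <= incident_end)]
outside = metrics_df[(metrics_df["ts"] < incident_begin) | (metrics_df["ts"] > incident_end)]
print("p95_ms   baseline:", round(outside["p95_ms"].mean(), 1), "| incident:", round(inside["p95_ms"].mean(), 1))
print("err_rate baseline:", round(outside["error_rate"].mean(), 4), "| incident:", round(inside["error_rate"].mean(), 4))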

def synth_logs(n=9000):
    rows = []
    for _ in range(n):
        t = start + timedelta(seconds=random.randint(0, 24*3600-1))
        svc = random.choice(services)
        reg = random.choice(regions)
        lvl = random.choices(levels, weights=[0.86, 0.10, 0.04])[0]
        kind = None
        msg = "ok"
        latency = max(5, int(np.random.normal(120, 55)))
        if incident_begin <= t <= incident_end and svc in ["api-gateway", "payments", "db-proxy"]:
            if random.random() < 0.24:
                lvl = random.choices(["WARN","ERROR"], weights=[0.55,0.45])[0]
                kind = random.choices(
                    ["UpstreamTimeout","DBConnPoolExhausted","DeadlockDetected","CacheMissStorm"],
                    weights=[0.40,0.28,0.10,0.22]
                )[0]
                latency += random.randint(300, 1200)
                msg = f"{kind}: request failed"
        if lvl == "ERROR" and kind is None and random.random() < 0.45:
            kind = random.choice(error_kinds)
            msg = f"{kind}: unexpected failure"
            latency += random.randint(80, 700)
        trace = f"tr_{random.randint(10**7,10**8-1)}"
        user = f"u_{random.randint(1,20000)}"
        endpoint = random.choice(["/pay", "/auth", "/refund", "/status", "/checkout", "/profile", "/ledger"])
        rows.append({
            "ts": t,
            "service": svc,
            "region": reg,
            "level": lvl,
            "error_kind": kind or "",
            "endpoint": endpoint,
            "latency_ms": latency,
            "trace_id": trace,
            "user_id": user,
            "message": msg
        })
    df = pd.DataFrame(rows).sort_values("ts").reset_index(drop=True)
    return df

logs_df = synth_logs()

metrics_path = "/content/metrics.csv"
logs_path = "/content/logs.csv"
metrics_df.to_csv(metrics_path, index=False)
logs_df.to_csv(logs_path, index=False)

con = duckdb.connect(database=":memory:")
con.execute("CREATE TABLE metrics AS SELECT * FROM read_csv_auto(?, HEADER=TRUE)", [metrics_path])
con.execute("CREATE TABLE logs AS SELECT * FROM read_csv_auto(?, HEADER=TRUE)", [logs_path])

We synthesize high-volume, time-distributed logs with realistic service, region, severity, and error patterns that intensify during the incident window. We persist both metrics and logs to CSV and load them into an in-memory DuckDB database for fast analytical queries. We prepare a unified, queryable observability dataset that supports correlation between latency, errors, and log-level signals. Check out the FULL CODES here.
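To illustrate the kind of ad-hoc analysis the agents will later run through the sql_investigate tool, a hand-written DuckDB query (a sketch, assuming read_csv_auto inferred ts as a timestamp column) can surface the noisiest services during the injected window.

# Illustrative ad-hoc query: WARN/ERROR volume per service inside the injected window.
preview = con.execute(
    """
    SELECT service, level, COUNT(*) AS n
    FROM logs
    WHERE level IN ('WARN', 'ERROR') AND ts BETWEEN ? AND ?
    GROUP BY service, level
    ORDER BY n DESC
    LIMIT 10
    """,
    [incident_begin, incident_end],
).df()
print(preview)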

def zscore_anomalies(series, window=60, z=3.0):
    x = series.astype(float).values
    out = np.zeros_like(x, dtype=bool)
    for i in range(len(x)):
        lo = max(0, i-window)
        hi = i
        if hi - lo < max(10, window//4):
            continue
        mu = float(np.mean(x[lo:hi]))
        sd = float(np.std(x[lo:hi])) + 1e-9
        out[i] = abs((x[i]-mu)/sd) >= z
    return out

@tool
def load_inputs(metrics_csv_path: str, logs_csv_path: str) -> dict:
    m = pd.read_csv(metrics_csv_path, parse_dates=["ts"])
    l = pd.read_csv(logs_csv_path, parse_dates=["ts"])
    return {
        "metrics_summary": {
            "rows": int(len(m)),
            "start": str(m["ts"].min()),
            "end": str(m["ts"].max()),
            "cols": list(m.columns)
        },
        "logs_summary": {
            "rows": int(len(l)),
            "start": str(l["ts"].min()),
            "end": str(l["ts"].max()),
            "cols": list(l.columns),
            "services": sorted(l["service"].unique().tolist()),
            "regions": sorted(l["region"].unique().tolist())
        }
    }

@tool
def detect_incident_window(metric: str, z_threshold: float = 3.2, min_span_minutes: int = 10) -> dict:
    if metric not in ["rps","p95_ms","error_rate"]:
        return {"error": "metric must be one of: rps, p95_ms, error_rate"}
    df = metrics_df.copy().sort_values("ts")
    flags = zscore_anomalies(df[metric], window=75, z=float(z_threshold))
    df["flag"] = flags
    idx = np.where(df["flag"].values)[0]
    if len(idx) == 0:
        return {"found": False}
    groups = []
    cur = [idx[0]]
    for i in idx[1:]:
        if i == cur[-1] + 1:
            cur.append(i)
        else:
            groups.append(cur)
            cur = [i]
    groups.append(cur)
    spans = []
    for g in groups:
        t0 = df.loc[g[0], "ts"]
        t1 = df.loc[g[-1], "ts"]
        span = (t1 - t0).total_seconds() / 60.0
        if span >= float(min_span_minutes):
            spans.append((span, t0, t1, int(len(g))))
    spans.sort(key=lambda x: (-x[0], -x[3]))
    if not spans:
        best = max(groups, key=len)
        t0 = df.loc[best[0], "ts"]
        t1 = df.loc[best[-1], "ts"]
        return {"found": True, "metric": metric, "start": str(t0), "end": str(t1), "points": int(len(best)), "note": "short anomaly span; consider lowering min_span_minutes"}
    best = spans[0]
    return {"found": True, "metric": metric, "start": str(best[1]), "end": str(best[2]), "minutes": float(best[0]), "points": int(best[3])}

We implement a rolling z-score detector to flag statistically significant deviations in key metrics over time. We expose tools that load observability inputs and summarize their structure to ground the agent’s reasoning. We detect and rank contiguous anomaly windows, returning the most meaningful incident span with clear temporal boundaries. Check out the FULL CODES here.
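To see the detector working outside the agent loop, a small illustrative check can flag anomalous error-rate minutes and count how many fall inside the window we injected; the window and threshold values simply mirror the defaults used by the detection tool.

# Illustrative check of the rolling z-score detector on the error_rate series.
flags = zscore_anomalies(metrics_df["error_rate"], window=75, z=3.2)
flagged_ts = metrics_df.loc[flags, "ts"]
in_window = flagged_ts[(flagged_ts >= incident_begin) & (flagged_ts <= incident_end)]
print(f"flagged minutes: {len(flagged_ts)} | inside injected window: {len(in_window)}")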

@tool
def sql_investigate(query: str) -> dict:
    try:
        df = con.execute(query).df()
        head = df.head(30)
        return {
            "rows": int(len(df)),
            "columns": list(df.columns),
            "preview": head.to_dict(orient="records")
        }
    except Exception as e:
        return {"error": str(e)}

@tool
def log_pattern_scan(window_start_iso: str, window_end_iso: str, top_k: int = 8) -> dict:
    ws = pd.to_datetime(window_start_iso)
    we = pd.to_datetime(window_end_iso)
    df = logs_df[(logs_df["ts"] >= ws) & (logs_df["ts"] <= we)].copy()
    if df.empty:
        return {"rows": 0, "top_error_kinds": [], "top_services": [], "top_endpoints": []}
    df["error_kind_norm"] = df["error_kind"].fillna("").replace("", "NONE")
    err = df[df["level"].isin(["WARN","ERROR"])].copy()
    top_err = err["error_kind_norm"].value_counts().head(int(top_k)).to_dict()
    top_svc = err["service"].value_counts().head(int(top_k)).to_dict()
    top_ep = err["endpoint"].value_counts().head(int(top_k)).to_dict()
    by_region = err.groupby("region").size().sort_values(ascending=False).head(int(top_k)).to_dict()
    p95_latency = float(np.percentile(df["latency_ms"].values, 95))
    return {
        "rows": int(len(df)),
        "warn_error_rows": int(len(err)),
        "p95_latency_ms": p95_latency,
        "top_error_kinds": top_err,
        "top_services": top_svc,
        "top_endpoints": top_ep,
        "error_by_region": by_region
    }

@tool
def propose_mitigations(hypothesis: str) -> dict:
    h = hypothesis.lower()
    mitigations = []
    if "conn" in h or "pool" in h or "db" in h:
        mitigations += [
            {"action": "Increase DB connection pool size (bounded) and add backpressure at db-proxy", "owner": "Platform", "eta_days": 3},
            {"action": "Add circuit breaker + adaptive timeouts between api-gateway and db-proxy", "owner": "Backend", "eta_days": 5},
            {"action": "Tune query hotspots; add indexes for top offending endpoints", "owner": "Data/DBA", "eta_days": 7},
        ]
    if "timeout" in h or "upstream" in h:
        mitigations += [
            {"action": "Implement hedged requests for idempotent calls (carefully) and tighten retry budgets", "owner": "Backend", "eta_days": 6},
            {"action": "Add upstream SLO-aware load shedding at api-gateway", "owner": "Platform", "eta_days": 7},
        ]
    if "cache" in h:
        mitigations += [
            {"action": "Add request coalescing and negative caching to prevent cache-miss storms", "owner": "Backend", "eta_days": 6},
            {"action": "Prewarm cache for top endpoints during deploys", "owner": "SRE", "eta_days": 4},
        ]
    if not mitigations:
        mitigations += [
            {"action": "Add targeted dashboards and alerts for the suspected bottleneck metric", "owner": "SRE", "eta_days": 3},
            {"action": "Run controlled load test to reproduce and validate the hypothesis", "owner": "Perf Eng", "eta_days": 5},
        ]
    mitigations = mitigations[:10]
    return {"hypothesis": hypothesis, "mitigations": mitigations}

@tool
def draft_postmortem(title: str, window_start_iso: str, window_end_iso: str, customer_impact: str, suspected_root_cause: str, key_facts_json: str, mitigations_json: str) -> dict:
    try:
        facts = json.loads(key_facts_json)
    except Exception:
        facts = {"note": "key_facts_json was not valid JSON"}
    try:
        mits = json.loads(mitigations_json)
    except Exception:
        mits = {"note": "mitigations_json was not valid JSON"}
    doc = {
        "title": title,
        "date_utc": datetime.utcnow().strftime("%Y-%m-%d"),
        "incident_window_utc": {"start": window_start_iso, "end": window_end_iso},
        "customer_impact": customer_impact,
        "suspected_root_cause": suspected_root_cause,
        "detection": {
            "how_detected": "Automated anomaly detection + error-rate spike triage",
            "gaps": ["Add earlier saturation alerting", "Improve symptom-to-cause correlation dashboards"]
        },
        "timeline": [
            {"t": window_start_iso, "event": "Symptoms begin (latency/error anomalies)"},
            {"t": "T+10m", "event": "On-call begins triage; identifies top services/endpoints"},
            {"t": "T+25m", "event": "Mitigation actions initiated (throttling/backpressure)"},
            {"t": window_end_iso, "event": "Customer impact ends; metrics stabilize"},
        ],
        "key_facts": facts,
        "corrective_actions": mits.get("mitigations", mits),
        "followups": [
            {"area": "Reliability", "task": "Add saturation signals + budget-based retries", "priority": "P1"},
            {"area": "Observability", "task": "Add golden signals per service/endpoint", "priority": "P1"},
            {"area": "Performance", "task": "Reproduce with load test and validate fix", "priority": "P2"},
        ],
        "appendix": {"notes": "Generated by a Haystack multi-agent workflow (non-RAG)."}
    }
    return {"postmortem_json": doc}

llm = OpenAIChatGenerator(model="gpt-4o-mini")

state_schema = {
    "metrics_csv_path": {"type": str},
    "logs_csv_path": {"type": str},
    "metrics_summary": {"type": dict},
    "logs_summary": {"type": dict},
    "incident_window": {"type": dict},
    "investigation_notes": {"type": list, "handler": merge_lists},
    "hypothesis": {"type": str},
    "key_facts": {"type": dict},
    "mitigation_plan": {"type": dict},
    "postmortem": {"type": dict},
}

profiler_prompt = """You are a specialist incident profiler.
Goal: turn raw metrics/log summaries into crisp, high-signal findings.
Rules:
- Prefer calling tools over guessing.
- Output must be a JSON object with keys: window, symptoms, top_contributors, hypothesis, key_facts.
- Hypothesis must be falsifiable and mention at least one specific service and mechanism.
"""

writer_prompt = """You are a specialist postmortem writer.
Goal: produce a high-quality postmortem JSON (not prose) using the provided evidence and mitigation plan.
Rules:
- Call tools only if needed.
- Keep 'suspected_root_cause' specific and not generic.
- Ensure corrective actions have owners and eta_days.
"""

coordinator_prompt = """You are an incident commander coordinating a non-RAG multi-agent workflow.
You must:
1) Load inputs
2) Find an incident window (use p95_ms or error_rate)
3) Investigate with targeted SQL and log pattern scan
4) Ask the specialist profiler to synthesize evidence
5) Propose mitigations
6) Ask the specialist writer to draft a postmortem JSON
Return a final response with:
- A short executive summary (max 10 lines)
- The postmortem JSON
- A compact runbook checklist (bulleted)
"""

profiler_agent = Agent(
    chat_generator=llm,
    tools=[load_inputs, detect_incident_window, sql_investigate, log_pattern_scan],
    system_prompt=profiler_prompt,
    exit_conditions=["text"],
    state_schema=state_schema
)

writer_agent = Agent(
    chat_generator=llm,
    tools=[draft_postmortem],
    system_prompt=writer_prompt,
    exit_conditions=["text"],
    state_schema=state_schema
)

profiler_tool = ComponentTool(
    component=profiler_agent,
    name="profiler_specialist",
    description="Synthesizes incident evidence into a falsifiable hypothesis and key facts (JSON output).",
    outputs_to_string={"source": "last_message"}
)

writer_tool = ComponentTool(
    component=writer_agent,
    name="postmortem_writer_specialist",
    description="Drafts a postmortem JSON using title/window/impact/rca/facts/mitigations.",
    outputs_to_string={"source": "last_message"}
)

coordinator_agent = Agent(
    chat_generator=llm,
    tools=[
        load_inputs,
        detect_incident_window,
        sql_investigate,
        log_pattern_scan,
        propose_mitigations,
        profiler_tool,
        writer_tool,
        draft_postmortem
    ],
    system_prompt=coordinator_prompt,
    exit_conditions=["text"],
    state_schema=state_schema
)

We define a suite of investigative, synthesis, and documentation tools that let agents query data, extract patterns, and propose concrete mitigations. We orchestrate specialist profiler and writer agents under a coordinator that drives an end-to-end, non-RAG incident workflow. We configure prompts, state schemas, and tool bridges so the system produces falsifiable hypotheses, actionable plans, and a structured postmortem. Check out the FULL CODES here.
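Before delegating everything to the coordinator, it can help to exercise the profiler specialist on its own. The sketch below is a hypothetical debugging step, not part of the original flow; it simply mirrors the warm_up/run call pattern used for the coordinator in the next step.

# Hypothetical standalone probe of the profiler specialist (mirrors the
# coordinator invocation below); useful for checking tool behavior in isolation.
profiler_agent.warm_up()
probe = profiler_agent.run(
    messages=[ChatMessage.from_user(
        f"Profile the incident using metrics at {metrics_path} and logs at {logs_path}. "
        "Return the JSON described in your rules."
    )],
    state=State(schema=state_schema, data={
        "metrics_csv_path": metrics_path,
        "logs_csv_path": logs_path,
        "investigation_notes": []
    })
)
print(probe["last_message"].text if "last_message" in probe else probe["messages"][-1].text)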

profiler_agent.warm_up()
writer_agent.warm_up()
coordinator_agent.warm_up()

initial_state = {
    "metrics_csv_path": metrics_path,
    "logs_csv_path": logs_path,
    "investigation_notes": []
}

task = """
We have an incident in the last 24h. Investigate using the provided CSVs.
Constraints:
- Do not use RAG or any document retriever/store.
- Use tools + SQL to ground conclusions.
- Produce a realistic postmortem JSON and a runbook checklist.
"""

result = coordinator_agent.run(
    messages=[ChatMessage.from_user(task)],
    state=State(schema=state_schema, data=initial_state)
)

last = result["last_message"].text if "last_message" in result else result["messages"][-1].text
print(last)

We warm up all agents to ensure tools, prompts, and state transitions are fully initialized before execution. We define the investigation task and initial state, then delegate end-to-end incident handling to the coordinator agent. We execute the workflow and surface the final executive summary, postmortem JSON, and runbook output.
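If we want to audit how the coordinator reached its answer, a small illustrative loop can print a compact trace of the conversation, assuming the result dict exposes the full "messages" list (as the fallback above already relies on); steps that carry tool activity rather than text are shown as placeholders.

# Illustrative trace of the run: print a short preview of each message.
for i, msg in enumerate(result.get("messages", [])):
    preview = (msg.text or "").replace("\n", " ")[:120] or "<non-text step (tool call / tool result)>"
    print(f"{i:02d} [{msg.role}] {preview}")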

In conclusion, we showed how Haystack supports sophisticated agentic patterns that scale in complexity without becoming fragile or hard to reason about. We demonstrated that, even within a notebook, we can express rich agent logic, maintain explicit state, and coordinate multiple components in a controlled and extensible way. By structuring the system this way, we placed ourselves in a strong position to iterate on more advanced behaviors, evaluate agent decisions, and evolve the tutorial into production-grade agentic workflows.

