{"id":374,"date":"2026-02-07T13:59:59","date_gmt":"2026-02-07T05:59:59","guid":{"rendered":"https:\/\/connectword.dpdns.org\/?p=374"},"modified":"2026-02-07T13:59:59","modified_gmt":"2026-02-07T05:59:59","slug":"how-to-build-a-production-grade-agentic-ai-system-with-hybrid-retrieval-provenance-first-citations-repair-loops-and-episodic-memory","status":"publish","type":"post","link":"https:\/\/connectword.dpdns.org\/?p=374","title":{"rendered":"How to Build a Production-Grade Agentic AI System with Hybrid Retrieval, Provenance-First Citations, Repair Loops, and Episodic Memory"},"content":{"rendered":"<p>In this tutorial, we build an ultra-advanced agentic AI workflow that behaves like a production-grade research and reasoning system rather than a single prompt call. We ingest real web sources asynchronously, split them into provenance-tracked chunks, and run hybrid retrieval using both TF-IDF (sparse) and OpenAI embeddings (dense), then fuse results for higher recall and stability. We orchestrate multiple agents, planning, synthesis, and repair, while enforcing strict guardrails so every major claim is grounded in retrieved evidence, and we persist episodic memory. Hence, the system improves its strategy over time. Check out the\u00a0<strong><a href=\"https:\/\/github.com\/Marktechpost\/AI-Tutorial-Codes-Included\/blob\/main\/Agentic%20AI%20Codes\/Ultra_Agentic_AI_Hybrid_Retrieval_Guardrails_Episodic_Memory_Marktechpost.ipynb\" target=\"_blank\" rel=\"noreferrer noopener\">FULL CODES here<\/a><\/strong>.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">!pip -q install openai openai-agents pydantic httpx beautifulsoup4 lxml scikit-learn numpy\n\n\nimport os, re, json, time, getpass, asyncio, sqlite3, hashlib\nfrom typing import List, Dict, Tuple, Optional, Any\n\n\nimport numpy as np\nimport httpx\nfrom bs4 import BeautifulSoup\nfrom pydantic import BaseModel, Field\n\n\nfrom sklearn.feature_extraction.text import TfidfVectorizer\nfrom sklearn.metrics.pairwise import cosine_similarity\n\n\nfrom openai import AsyncOpenAI\nfrom agents import Agent, Runner, SQLiteSession\n\n\nif not os.environ.get(\"OPENAI_API_KEY\"):\n   os.environ[\"OPENAI_API_KEY\"] = getpass.getpass(\"Enter your OpenAI API key: \")\nif not os.environ.get(\"OPENAI_API_KEY\"):\n   raise RuntimeError(\"OPENAI_API_KEY not provided.\")\nprint(\"<img decoding=\"async\" src=\"https:\/\/s.w.org\/images\/core\/emoji\/17.0.2\/72x72\/2705.png\" alt=\"\u2705\" class=\"wp-smiley\" \/> OpenAI API key loaded securely.\")\noa = AsyncOpenAI(api_key=os.environ[\"OPENAI_API_KEY\"])\n\n\ndef sha1(s: str) -&gt; str:\n   return hashlib.sha1(s.encode(\"utf-8\", errors=\"ignore\")).hexdigest()\n\n\ndef normalize_url(u: str) -&gt; str:\n   u = (u or \"\").strip()\n   return u.rstrip(\").,]\"'\")\n\n\ndef clean_html_to_text(html: str) -&gt; str:\n   soup = BeautifulSoup(html, \"lxml\")\n   for tag in soup([\"script\", \"style\", \"noscript\"]):\n       tag.decompose()\n   txt = soup.get_text(\"n\")\n   txt = re.sub(r\"n{3,}\", \"nn\", txt).strip()\n   txt = re.sub(r\"[ t]+\", \" \", txt)\n   return txt\n\n\ndef chunk_text(text: str, chunk_chars: int = 1600, overlap_chars: int = 320) -&gt; List[str]:\n   if not text:\n       return []\n   text = re.sub(r\"s+\", \" \", text).strip()\n   n = len(text)\n   step = max(1, chunk_chars - overlap_chars)\n   chunks = []\n   i = 0\n   while i &lt; n:\n       chunks.append(text[i:i + chunk_chars])\n       i += step\n   return chunks\n\n\ndef canonical_chunk_id(s: str) -&gt; str:\n   if s is None:\n       return \"\"\n   s = str(s).strip()\n   s = s.strip(\"&lt;&gt;\"'()[]{}\")\n   s = s.rstrip(\".,;:\")\n   return s\n\n\ndef inject_exec_summary_citations(exec_summary: str, citations: List[str], allowed_chunk_ids: List[str]) -&gt; str:\n   exec_summary = exec_summary or \"\"\n   cset = []\n   for c in citations:\n       c = canonical_chunk_id(c)\n       if c and c in allowed_chunk_ids and c not in cset:\n           cset.append(c)\n       if len(cset) &gt;= 2:\n           break\n   if len(cset) &lt; 2:\n       for c in allowed_chunk_ids:\n           if c not in cset:\n               cset.append(c)\n           if len(cset) &gt;= 2:\n               break\n   if len(cset) &gt;= 2:\n       needed = [c for c in cset if c not in exec_summary]\n       if needed:\n           exec_summary = exec_summary.strip()\n           if exec_summary and not exec_summary.endswith(\".\"):\n               exec_summary += \".\"\n           exec_summary += f\" (cite: {cset[0]}) (cite: {cset[1]})\"\n   return exec_summary<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We set up the environment, securely load the OpenAI API key, and initialize core utilities that everything else depends on. We define hashing, URL normalization, HTML cleaning, and chunking so all downstream steps operate on clean, consistent text. We also add deterministic helpers to normalize and inject citations, ensuring guardrails are always satisfied. Check out the\u00a0<strong><a href=\"https:\/\/github.com\/Marktechpost\/AI-Tutorial-Codes-Included\/blob\/main\/Agentic%20AI%20Codes\/Ultra_Agentic_AI_Hybrid_Retrieval_Guardrails_Episodic_Memory_Marktechpost.ipynb\" target=\"_blank\" rel=\"noreferrer noopener\">FULL CODES here<\/a><\/strong>.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">async def fetch_many(urls: List[str], timeout_s: float = 25.0, per_url_char_limit: int = 60000) -&gt; Dict[str, str]:\n   headers = {\"User-Agent\": \"Mozilla\/5.0 (AgenticAI\/4.2)\"}\n   urls = [normalize_url(u) for u in urls]\n   urls = [u for u in urls if u.startswith(\"http\")]\n   urls = list(dict.fromkeys(urls))\n   out: Dict[str, str] = {}\n   async with httpx.AsyncClient(timeout=timeout_s, follow_redirects=True, headers=headers) as client:\n       async def _one(url: str):\n           try:\n               r = await client.get(url)\n               r.raise_for_status()\n               out[url] = clean_html_to_text(r.text)[:per_url_char_limit]\n           except Exception as e:\n               out[url] = f\"__FETCH_ERROR__ {type(e).__name__}: {e}\"\n       await asyncio.gather(*[_one(u) for u in urls])\n   return out\n\n\ndef dedupe_texts(sources: Dict[str, str]) -&gt; Dict[str, str]:\n   seen = set()\n   out = {}\n   for url, txt in sources.items():\n       if not isinstance(txt, str) or txt.startswith(\"__FETCH_ERROR__\"):\n           continue\n       h = sha1(txt[:25000])\n       if h in seen:\n           continue\n       seen.add(h)\n       out[url] = txt\n   return out\n\n\nclass ChunkRecord(BaseModel):\n   chunk_id: str\n   url: str\n   chunk_index: int\n   text: str\n\n\nclass RetrievalHit(BaseModel):\n   chunk_id: str\n   url: str\n   chunk_index: int\n   score_sparse: float = 0.0\n   score_dense: float = 0.0\n   score_fused: float = 0.0\n   text: str\n\n\nclass EvidencePack(BaseModel):\n   query: str\n   hits: List[RetrievalHit]<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We asynchronously fetch multiple web sources in parallel and aggressively deduplicate content to avoid redundant evidence. We convert raw pages into structured text and define the core data models that represent chunks and retrieval hits. We ensure every piece of text is traceable back to a specific source and chunk index. Check out the\u00a0<strong><a href=\"https:\/\/github.com\/Marktechpost\/AI-Tutorial-Codes-Included\/blob\/main\/Agentic%20AI%20Codes\/Ultra_Agentic_AI_Hybrid_Retrieval_Guardrails_Episodic_Memory_Marktechpost.ipynb\" target=\"_blank\" rel=\"noreferrer noopener\">FULL CODES here<\/a><\/strong>.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">EPISODE_DB = \"agentic_episode_memory.db\"\n\n\ndef episode_db_init():\n   con = sqlite3.connect(EPISODE_DB)\n   cur = con.cursor()\n   cur.execute(\"\"\"\n   CREATE TABLE IF NOT EXISTS episodes (\n       id INTEGER PRIMARY KEY AUTOINCREMENT,\n       ts INTEGER NOT NULL,\n       question TEXT NOT NULL,\n       urls_json TEXT NOT NULL,\n       retrieval_queries_json TEXT NOT NULL,\n       useful_sources_json TEXT NOT NULL\n   )\n   \"\"\")\n   con.commit()\n   con.close()\n\n\ndef episode_store(question: str, urls: List[str], retrieval_queries: List[str], useful_sources: List[str]):\n   con = sqlite3.connect(EPISODE_DB)\n   cur = con.cursor()\n   cur.execute(\n       \"INSERT INTO episodes(ts, question, urls_json, retrieval_queries_json, useful_sources_json) VALUES(?,?,?,?,?)\",\n       (int(time.time()), question, json.dumps(urls), json.dumps(retrieval_queries), json.dumps(useful_sources)),\n   )\n   con.commit()\n   con.close()\n\n\ndef episode_recall(question: str, top_k: int = 2) -&gt; List[Dict[str, Any]]:\n   con = sqlite3.connect(EPISODE_DB)\n   cur = con.cursor()\n   cur.execute(\"SELECT ts, question, urls_json, retrieval_queries_json, useful_sources_json FROM episodes ORDER BY ts DESC LIMIT 200\")\n   rows = cur.fetchall()\n   con.close()\n   q_tokens = set(re.findall(r\"[A-Za-z]{3,}\", (question or \"\").lower()))\n   scored = []\n   for ts, q2, u, rq, us in rows:\n       t2 = set(re.findall(r\"[A-Za-z]{3,}\", (q2 or \"\").lower()))\n       if not t2:\n           continue\n       score = len(q_tokens &amp; t2) \/ max(1, len(q_tokens))\n       if score &gt; 0:\n           scored.append((score, {\n               \"ts\": ts,\n               \"question\": q2,\n               \"urls\": json.loads(u),\n               \"retrieval_queries\": json.loads(rq),\n               \"useful_sources\": json.loads(us),\n           }))\n   scored.sort(key=lambda x: x[0], reverse=True)\n   return [x[1] for x in scored[:top_k]]\n\n\nepisode_db_init()<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We introduce episodic memory backed by SQLite so the system can recall what worked in previous runs. We store questions, retrieval strategies, and useful sources to guide future planning. We also implement lightweight similarity-based recall to bias the system toward historically effective patterns. Check out the\u00a0<strong><a href=\"https:\/\/github.com\/Marktechpost\/AI-Tutorial-Codes-Included\/blob\/main\/Agentic%20AI%20Codes\/Ultra_Agentic_AI_Hybrid_Retrieval_Guardrails_Episodic_Memory_Marktechpost.ipynb\" target=\"_blank\" rel=\"noreferrer noopener\">FULL CODES here<\/a><\/strong>.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">class HybridIndex:\n   def __init__(self):\n       self.records: List[ChunkRecord] = []\n       self.tfidf: Optional[TfidfVectorizer] = None\n       self.tfidf_mat = None\n       self.emb_mat: Optional[np.ndarray] = None\n\n\n   def build_sparse(self):\n       corpus = [r.text for r in self.records] if self.records else [\"\"]\n       self.tfidf = TfidfVectorizer(stop_words=\"english\", ngram_range=(1, 2), max_features=80000)\n       self.tfidf_mat = self.tfidf.fit_transform(corpus)\n\n\n   def search_sparse(self, query: str, k: int) -&gt; List[Tuple[int, float]]:\n       if not self.records or self.tfidf is None or self.tfidf_mat is None:\n           return []\n       qv = self.tfidf.transform([query])\n       sims = cosine_similarity(qv, self.tfidf_mat).flatten()\n       top = np.argsort(-sims)[:k]\n       return [(int(i), float(sims[i])) for i in top]\n\n\n   def set_dense(self, mat: np.ndarray):\n       self.emb_mat = mat.astype(np.float32)\n\n\n   def search_dense(self, q_emb: np.ndarray, k: int) -&gt; List[Tuple[int, float]]:\n       if self.emb_mat is None or not self.records:\n           return []\n       M = self.emb_mat\n       q = q_emb.astype(np.float32).reshape(1, -1)\n       M_norm = M \/ (np.linalg.norm(M, axis=1, keepdims=True) + 1e-9)\n       q_norm = q \/ (np.linalg.norm(q) + 1e-9)\n       sims = (M_norm @ q_norm.T).flatten()\n       top = np.argsort(-sims)[:k]\n       return [(int(i), float(sims[i])) for i in top]\n\n\ndef rrf_fuse(rankings: List[List[int]], k: int = 60) -&gt; Dict[int, float]:\n   scores: Dict[int, float] = {}\n   for r in rankings:\n       for pos, idx in enumerate(r, start=1):\n           scores[idx] = scores.get(idx, 0.0) + 1.0 \/ (k + pos)\n   return scores\n\n\nHYBRID = HybridIndex()\nALLOWED_URLS: List[str] = []\n\n\nEMBED_MODEL = \"text-embedding-3-small\"\n\n\nasync def embed_batch(texts: List[str]) -&gt; np.ndarray:\n   resp = await oa.embeddings.create(model=EMBED_MODEL, input=texts, encoding_format=\"float\")\n   vecs = [np.array(item.embedding, dtype=np.float32) for item in resp.data]\n   return np.vstack(vecs) if vecs else np.zeros((0, 0), dtype=np.float32)\n\n\nasync def embed_texts(texts: List[str], batch_size: int = 96, max_concurrency: int = 3) -&gt; np.ndarray:\n   sem = asyncio.Semaphore(max_concurrency)\n   mats: List[Tuple[int, np.ndarray]] = []\n\n\n   async def _one(start: int, batch: List[str]):\n       async with sem:\n           m = await embed_batch(batch)\n           mats.append((start, m))\n\n\n   tasks = []\n   for start in range(0, len(texts), batch_size):\n       batch = [t[:7000] for t in texts[start:start + batch_size]]\n       tasks.append(_one(start, batch))\n   await asyncio.gather(*tasks)\n\n\n   mats.sort(key=lambda x: x[0])\n   emb = np.vstack([m for _, m in mats]) if mats else np.zeros((len(texts), 0), dtype=np.float32)\n   if emb.shape[0] != len(texts):\n       raise RuntimeError(f\"Embedding rows mismatch: got {emb.shape[0]} expected {len(texts)}\")\n   return emb\n\n\nasync def embed_query(query: str) -&gt; np.ndarray:\n   m = await embed_batch([query[:7000]])\n   return m[0] if m.shape[0] else np.zeros((0,), dtype=np.float32)\n\n\nasync def build_index(urls: List[str], max_chunks_per_url: int = 60):\n   global ALLOWED_URLS\n   fetched = await fetch_many(urls)\n   fetched = dedupe_texts(fetched)\n\n\n   records: List[ChunkRecord] = []\n   allowed: List[str] = []\n\n\n   for url, txt in fetched.items():\n       if not isinstance(txt, str) or txt.startswith(\"__FETCH_ERROR__\"):\n           continue\n       allowed.append(url)\n       chunks = chunk_text(txt)[:max_chunks_per_url]\n       for i, ch in enumerate(chunks):\n           cid = f\"{sha1(url)}:{i}\"\n           records.append(ChunkRecord(chunk_id=cid, url=url, chunk_index=i, text=ch))\n\n\n   if not records:\n       err_view = {normalize_url(u): fetched.get(normalize_url(u), \"\") for u in urls}\n       raise RuntimeError(\"No sources fetched successfully.n\" + json.dumps(err_view, indent=2)[:4000])\n\n\n   ALLOWED_URLS = allowed\n   HYBRID.records = records\n   HYBRID.build_sparse()\n\n\n   texts = [r.text for r in HYBRID.records]\n   emb = await embed_texts(texts, batch_size=96, max_concurrency=3)\n   HYBRID.set_dense(emb)<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We build a hybrid retrieval index that combines sparse TF-IDF search with dense OpenAI embeddings. We enable reciprocal rank fusion, so that sparse and dense signals complement each other rather than compete. We construct the index once per run and reuse it across all retrieval queries for efficiency. Check out the\u00a0<strong><a href=\"https:\/\/github.com\/Marktechpost\/AI-Tutorial-Codes-Included\/blob\/main\/Agentic%20AI%20Codes\/Ultra_Agentic_AI_Hybrid_Retrieval_Guardrails_Episodic_Memory_Marktechpost.ipynb\" target=\"_blank\" rel=\"noreferrer noopener\">FULL CODES here<\/a><\/strong>.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">def build_evidence_pack(query: str, sparse: List[Tuple[int,float]], dense: List[Tuple[int,float]], k: int = 10) -&gt; EvidencePack:\n   sparse_rank = [i for i,_ in sparse]\n   dense_rank  = [i for i,_ in dense]\n   sparse_scores = {i:s for i,s in sparse}\n   dense_scores  = {i:s for i,s in dense}\n   fused = rrf_fuse([sparse_rank, dense_rank], k=60) if dense_rank else rrf_fuse([sparse_rank], k=60)\n   top = sorted(fused.keys(), key=lambda i: fused[i], reverse=True)[:k]\n\n\n   hits: List[RetrievalHit] = []\n   for idx in top:\n       r = HYBRID.records[idx]\n       hits.append(RetrievalHit(\n           chunk_id=r.chunk_id, url=r.url, chunk_index=r.chunk_index,\n           score_sparse=float(sparse_scores.get(idx, 0.0)),\n           score_dense=float(dense_scores.get(idx, 0.0)),\n           score_fused=float(fused.get(idx, 0.0)),\n           text=r.text\n       ))\n   return EvidencePack(query=query, hits=hits)\n\n\nasync def gather_evidence(queries: List[str], per_query_k: int = 10, sparse_k: int = 60, dense_k: int = 60):\n   evidence: List[EvidencePack] = []\n   useful_sources_count: Dict[str, int] = {}\n   all_chunk_ids: List[str] = []\n\n\n   for q in queries:\n       sparse = HYBRID.search_sparse(q, k=sparse_k)\n       q_emb = await embed_query(q)\n       dense = HYBRID.search_dense(q_emb, k=dense_k)\n       pack = build_evidence_pack(q, sparse, dense, k=per_query_k)\n       evidence.append(pack)\n       for h in pack.hits[:6]:\n           useful_sources_count[h.url] = useful_sources_count.get(h.url, 0) + 1\n       for h in pack.hits:\n           all_chunk_ids.append(h.chunk_id)\n\n\n   useful_sources = sorted(useful_sources_count.keys(), key=lambda u: useful_sources_count[u], reverse=True)\n   all_chunk_ids = sorted(list(dict.fromkeys(all_chunk_ids)))\n   return evidence, useful_sources[:8], all_chunk_ids\n\n\nclass Plan(BaseModel):\n   objective: str\n   subtasks: List[str]\n   retrieval_queries: List[str]\n   acceptance_checks: List[str]\n\n\nclass UltraAnswer(BaseModel):\n   title: str\n   executive_summary: str\n   architecture: List[str]\n   retrieval_strategy: List[str]\n   agent_graph: List[str]\n   implementation_notes: List[str]\n   risks_and_limits: List[str]\n   citations: List[str]\n   sources: List[str]\n\n\ndef normalize_answer(ans: UltraAnswer, allowed_chunk_ids: List[str]) -&gt; UltraAnswer:\n   data = ans.model_dump()\n   data[\"citations\"] = [canonical_chunk_id(x) for x in (data.get(\"citations\") or [])]\n   data[\"citations\"] = [x for x in data[\"citations\"] if x in allowed_chunk_ids]\n   data[\"executive_summary\"] = inject_exec_summary_citations(data.get(\"executive_summary\",\"\"), data[\"citations\"], allowed_chunk_ids)\n   return UltraAnswer(**data)\n\n\ndef validate_ultra(ans: UltraAnswer, allowed_chunk_ids: List[str]) -&gt; None:\n   extras = [u for u in ans.sources if u not in ALLOWED_URLS]\n   if extras:\n       raise ValueError(f\"Non-allowed sources in output: {extras}\")\n\n\n   cset = set(ans.citations or [])\n   missing = [cid for cid in cset if cid not in set(allowed_chunk_ids)]\n   if missing:\n       raise ValueError(f\"Citations reference unknown chunk_ids (not retrieved): {missing}\")\n\n\n   if len(cset) &lt; 6:\n       raise ValueError(\"Need at least 6 distinct chunk_id citations in ultra mode.\")\n\n\n   es_text = ans.executive_summary or \"\"\n   es_count = sum(1 for cid in cset if cid in es_text)\n   if es_count &lt; 2:\n       raise ValueError(\"Executive summary must include at least 2 chunk_id citations verbatim.\")\n\n\nPLANNER = Agent(\n   name=\"Planner\",\n   model=\"gpt-4o-mini\",\n   instructions=(\n       \"Return a technical Plan schema.n\"\n       \"Make 10-16 retrieval_queries.n\"\n       \"Acceptance must include: at least 6 citations and exec_summary contains at least 2 citations verbatim.\"\n   ),\n   output_type=Plan,\n)\n\n\nSYNTHESIZER = Agent(\n   name=\"Synthesizer\",\n   model=\"gpt-4o-mini\",\n   instructions=(\n       \"Return UltraAnswer schema.n\"\n       \"Hard constraints:n\"\n       \"- executive_summary MUST include at least TWO citations verbatim as: (cite: &lt;chunk_id&gt;).n\"\n       \"- citations must be chosen ONLY from ALLOWED_CHUNK_IDS list.n\"\n       \"- citations list must include at least 6 unique chunk_ids.n\"\n       \"- sources must be subset of allowed URLs.n\"\n   ),\n   output_type=UltraAnswer,\n)\n\n\nFIXER = Agent(\n   name=\"Fixer\",\n   model=\"gpt-4o-mini\",\n   instructions=(\n       \"Repair to satisfy guardrails.n\"\n       \"Ensure executive_summary includes at least TWO citations verbatim.n\"\n       \"Choose citations ONLY from ALLOWED_CHUNK_IDS list.n\"\n       \"Return UltraAnswer schema.\"\n   ),\n   output_type=UltraAnswer,\n)\n\n\nsession = SQLiteSession(\"ultra_agentic_user\", \"ultra_agentic_session.db\")<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We gather evidence by running multiple targeted queries, fusing sparse and dense results, and assembling evidence packs with scores and provenance. We define strict schemas for plans and final answers, then normalize and validate citations against retrieved chunk IDs. We enforce hard guardrails so every answer remains grounded and auditable. Check out the\u00a0<strong><a href=\"https:\/\/github.com\/Marktechpost\/AI-Tutorial-Codes-Included\/blob\/main\/Agentic%20AI%20Codes\/Ultra_Agentic_AI_Hybrid_Retrieval_Guardrails_Episodic_Memory_Marktechpost.ipynb\" target=\"_blank\" rel=\"noreferrer noopener\">FULL CODES here<\/a><\/strong>.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">async def run_ultra_agentic(question: str, urls: List[str], max_repairs: int = 2) -&gt; UltraAnswer:\n   await build_index(urls)\n   recall_hint = json.dumps(episode_recall(question, top_k=2), indent=2)[:2000]\n\n\n   plan_res = await Runner.run(\n       PLANNER,\n       f\"Question:n{question}nnAllowed URLs:n{json.dumps(ALLOWED_URLS, indent=2)}nnRecall:n{recall_hint}n\",\n       session=session\n   )\n   plan: Plan = plan_res.final_output\n   queries = (plan.retrieval_queries or [])[:16]\n\n\n   evidence_packs, useful_sources, allowed_chunk_ids = await gather_evidence(queries)\n\n\n   evidence_json = json.dumps([p.model_dump() for p in evidence_packs], indent=2)[:16000]\n   allowed_chunk_ids_json = json.dumps(allowed_chunk_ids[:200], indent=2)\n\n\n   draft_res = await Runner.run(\n       SYNTHESIZER,\n       f\"Question:n{question}nnAllowed URLs:n{json.dumps(ALLOWED_URLS, indent=2)}nn\"\n       f\"ALLOWED_CHUNK_IDS:n{allowed_chunk_ids_json}nn\"\n       f\"Evidence packs:n{evidence_json}nn\"\n       \"Return UltraAnswer.\",\n       session=session\n   )\n   draft = normalize_answer(draft_res.final_output, allowed_chunk_ids)\n\n\n   last_err = None\n   for i in range(max_repairs + 1):\n       try:\n           validate_ultra(draft, allowed_chunk_ids)\n           episode_store(question, ALLOWED_URLS, plan.retrieval_queries, useful_sources)\n           return draft\n       except Exception as e:\n           last_err = str(e)\n           if i &gt;= max_repairs:\n               draft = normalize_answer(draft, allowed_chunk_ids)\n               validate_ultra(draft, allowed_chunk_ids)\n               return draft\n\n\n           fixer_res = await Runner.run(\n               FIXER,\n               f\"Question:n{question}nnAllowed URLs:n{json.dumps(ALLOWED_URLS, indent=2)}nn\"\n               f\"ALLOWED_CHUNK_IDS:n{allowed_chunk_ids_json}nn\"\n               f\"Guardrail error:n{last_err}nn\"\n               f\"Draft:n{json.dumps(draft.model_dump(), indent=2)[:12000]}nn\"\n               f\"Evidence packs:n{evidence_json}nn\"\n               \"Return corrected UltraAnswer that passes guardrails.\",\n               session=session\n           )\n           draft = normalize_answer(fixer_res.final_output, allowed_chunk_ids)\n\n\n   raise RuntimeError(f\"Unexpected failure: {last_err}\")\n\n\nquestion = (\n   \"Design a production-lean but advanced agentic AI workflow in Python with hybrid retrieval, \"\n   \"provenance-first citations, critique-and-repair loops, and episodic memory. \"\n   \"Explain why each layer matters, failure modes, and evaluation.\"\n)\n\n\nurls = [\n   \"https:\/\/openai.github.io\/openai-agents-python\/\",\n   \"https:\/\/openai.github.io\/openai-agents-python\/agents\/\",\n   \"https:\/\/openai.github.io\/openai-agents-python\/running_agents\/\",\n   \"https:\/\/github.com\/openai\/openai-agents-python\",\n]\n\n\nans = await run_ultra_agentic(question, urls, max_repairs=2)\n\n\nprint(\"nTITLE:n\", ans.title)\nprint(\"nEXECUTIVE SUMMARY:n\", ans.executive_summary)\nprint(\"nARCHITECTURE:\")\nfor x in ans.architecture:\n   print(\"-\", x)\nprint(\"nRETRIEVAL STRATEGY:\")\nfor x in ans.retrieval_strategy:\n   print(\"-\", x)\nprint(\"nAGENT GRAPH:\")\nfor x in ans.agent_graph:\n   print(\"-\", x)\nprint(\"nIMPLEMENTATION NOTES:\")\nfor x in ans.implementation_notes:\n   print(\"-\", x)\nprint(\"nRISKS &amp; LIMITS:\")\nfor x in ans.risks_and_limits:\n   print(\"-\", x)\nprint(\"nCITATIONS (chunk_ids):\")\nfor c in ans.citations:\n   print(\"-\", c)\nprint(\"nSOURCES:\")\nfor s in ans.sources:\n   print(\"-\", s)<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We orchestrate the full agentic loop by chaining planning, synthesis, validation, and repair in an async-safe pipeline. We automatically retry and fix outputs until they pass all constraints without human intervention. We finish by running a full example and printing a fully grounded, production-ready agentic response.<\/p>\n<p>In conclusion, we developed a comprehensive agentic pipeline robust to common failure modes: unstable embedding shapes, citation drift, and missing grounding in executive summaries. We validated outputs against allowlisted sources, retrieved chunk IDs, automatically normalized citations, and injected deterministic citations when needed to guarantee compliance without sacrificing correctness. By combining hybrid retrieval, critique-and-repair loops, and episodic memory, we created a reusable foundation we can extend with stronger evaluations (claim-to-evidence coverage scoring, adversarial red-teaming, and regression tests) to continuously harden the system as it scales to new domains and larger corpora.<\/p>\n<hr class=\"wp-block-separator has-alpha-channel-opacity\" \/>\n<p>Check out the\u00a0<strong><a href=\"https:\/\/github.com\/Marktechpost\/AI-Tutorial-Codes-Included\/blob\/main\/Agentic%20AI%20Codes\/Ultra_Agentic_AI_Hybrid_Retrieval_Guardrails_Episodic_Memory_Marktechpost.ipynb\" target=\"_blank\" rel=\"noreferrer noopener\">FULL CODES here<\/a><\/strong>.\u00a0Also,\u00a0feel free to follow us on\u00a0<strong><a href=\"https:\/\/x.com\/intent\/follow?screen_name=marktechpost\" target=\"_blank\" rel=\"noreferrer noopener\"><mark>Twitter<\/mark><\/a><\/strong>\u00a0and don\u2019t forget to join our\u00a0<strong><a href=\"https:\/\/www.reddit.com\/r\/machinelearningnews\/\" target=\"_blank\" rel=\"noreferrer noopener\">100k+ ML SubReddit<\/a><\/strong>\u00a0and Subscribe to\u00a0<strong><a href=\"https:\/\/www.aidevsignals.com\/\" target=\"_blank\" rel=\"noreferrer noopener\">our Newsletter<\/a><\/strong>. Wait! are you on telegram?\u00a0<strong><a href=\"https:\/\/t.me\/machinelearningresearchnews\" target=\"_blank\" rel=\"noreferrer noopener\">now you can join us on telegram as well.<\/a><\/strong><\/p>\n<p>The post <a href=\"https:\/\/www.marktechpost.com\/2026\/02\/06\/how-to-build-a-production-grade-agentic-ai-system-with-hybrid-retrieval-provenance-first-citations-repair-loops-and-episodic-memory\/\">How to Build a Production-Grade Agentic AI System with Hybrid Retrieval, Provenance-First Citations, Repair Loops, and Episodic Memory<\/a> appeared first on <a href=\"https:\/\/www.marktechpost.com\/\">MarkTechPost<\/a>.<\/p>","protected":false},"excerpt":{"rendered":"<p>In this tutorial, we build an &hellip;<\/p>\n","protected":false},"author":1,"featured_media":29,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-374","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-uncategorized"],"_links":{"self":[{"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/posts\/374","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=374"}],"version-history":[{"count":0,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/posts\/374\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/media\/29"}],"wp:attachment":[{"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=374"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=374"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=374"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}