{"id":497,"date":"2026-03-02T03:20:17","date_gmt":"2026-03-01T19:20:17","guid":{"rendered":"https:\/\/connectword.dpdns.org\/?p=497"},"modified":"2026-03-02T03:20:17","modified_gmt":"2026-03-01T19:20:17","slug":"how-to-design-a-production-grade-multi-agent-communication-system-using-langgraph-structured-message-bus-acp-logging-and-persistent-shared-state-architecture","status":"publish","type":"post","link":"https:\/\/connectword.dpdns.org\/?p=497","title":{"rendered":"How to Design a Production-Grade Multi-Agent Communication System Using LangGraph Structured Message Bus, ACP Logging, and Persistent Shared State Architecture"},"content":{"rendered":"<p>In this tutorial, we build an advanced multi-agent communication system using a structured message bus architecture powered by LangGraph and Pydantic. We define a strict ACP-style message schema that lets agents communicate through a shared state rather than by calling each other directly, enabling modularity, traceability, and production-grade orchestration. We implement three specialized agents (a Planner, an Executor, and a Validator) that coordinate through structured messages, persistent state, and routing logic. 
We also integrate SQLite-based persistence to provide durable memory across executions and visualize the agent communication flow to understand how messages propagate through the system.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">!pip -q install -U \"pydantic==2.12.3\"\n!pip -q install -U langgraph langchain-core networkx matplotlib\n!pip -q install -U langgraph-checkpoint-sqlite\n\n\nimport os\nimport json\nimport uuid\nimport sqlite3\nfrom datetime import datetime, timezone\nfrom typing import Any, Dict, List, Literal, Optional, Tuple\n\n\nfrom pydantic import BaseModel, Field\n\n\nimport networkx as nx\nimport matplotlib.pyplot as plt\n\n\nfrom langgraph.graph import StateGraph, END\nfrom langgraph.checkpoint.sqlite import SqliteSaver\n\n\n\n\nRole = Literal[\"planner\", \"executor\", \"validator\", \"user\", \"system\"]\nMsgType = Literal[\"task\", \"plan\", \"result\", \"validation\", \"error\", \"control\"]\n\n\nclass ACPMessage(BaseModel):\n   msg_id: str = Field(default_factory=lambda: str(uuid.uuid4()))\n   ts: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat().replace(\"+00:00\", \"Z\"))\n   sender: Role\n   receiver: Role\n   msg_type: MsgType\n   content: str\n   meta: Dict[str, Any] = Field(default_factory=dict)\n   trace: Dict[str, Any] = Field(default_factory=dict)\n\n\ndef acp_log_path() -&gt; str:\n   os.makedirs(\"acp_logs\", 
exist_ok=True)\n   return os.path.join(\"acp_logs\", \"acp_messages.jsonl\")\n\n\ndef append_acp_log(m: ACPMessage) -&gt; None:\n   with open(acp_log_path(), \"a\", encoding=\"utf-8\") as f:\n       f.write(m.model_dump_json() + \"\\n\")<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We install and import the libraries needed to build a structured multi-agent communication system. We define the ACP-style message schema with Pydantic: every message carries an auto-generated msg_id and UTC timestamp, a sender and receiver role, a message type, its content, and optional meta and trace dictionaries, and any role or message type outside the allowed Literal values is rejected with a validation error. We also implement structured logging that appends every message exchanged between agents to a JSON Lines file (one JSON object per line), enabling traceability and observability of the system.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">class BusState(BaseModel):\n   goal: str = \"\"\n   done: bool = False\n   errors: List[str] = Field(default_factory=list)\n   mailbox: List[ACPMessage] = Field(default_factory=list)\n   edges: List[Tuple[str, str, str]] = Field(default_factory=list)\n   active_role: Role = \"user\"\n   step: int = 0\n\n\n\n\ndef bus_update(\n   state: BusState,\n   sender: Role,\n   receiver: Role,\n   msg_type: MsgType,\n   content: str,\n   meta: Optional[Dict[str, Any]] = None,\n   trace: Optional[Dict[str, Any]] = None,\n) -&gt; Dict[str, Any]:\n   m = ACPMessage(\n       sender=sender,\n       receiver=receiver,\n       msg_type=msg_type,\n       content=content,\n       meta=meta or {},\n  
     trace=trace or {},\n   )\n   append_acp_log(m)\n   return {\n       \"goal\": state.goal,\n       \"done\": state.done,\n       \"errors\": state.errors,\n       \"mailbox\": state.mailbox + [m],\n       \"edges\": state.edges + [(sender, receiver, msg_type)],\n       \"active_role\": receiver,\n       \"step\": state.step + 1,\n   }<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We define the shared state that acts as the centralized message bus for all agents. The BusState class holds the goal, the mailbox of ACPMessage objects, the list of (sender, receiver, msg_type) edges recorded for later visualization, the active_role that decides which agent runs next, a step counter, the done flag, and any accumulated errors. The bus_update helper builds an ACPMessage, appends it to the JSONL log, and returns a new state dictionary in which the message is added to the mailbox, the edge is recorded, active_role is set to the receiver, and step is incremented, without mutating the input state.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">def planner_agent(state_dict: Dict[str, Any]) -&gt; Dict[str, Any]:\n   state = BusState.model_validate(state_dict)\n   goal = state.goal.strip()\n   if not goal:\n       return bus_update(state, \"planner\", \"validator\", \"error\", \"No goal provided.\", meta={\"reason\": \"empty_goal\"})\n   plan = [\n       \"Interpret the goal and extract requirements.\",\n       \"Decide an execution strategy with clear outputs.\",\n       \"Ask Executor to produce the result.\",\n       \"Ask Validator to check correctness + completeness.\",\n   ]\n   plan_text = \"\\n\".join([f\"{i+1}. 
{p}\" for i, p in enumerate(plan)])\n   return bus_update(\n       state,\n       \"planner\",\n       \"executor\",\n       \"plan\",\n       plan_text,\n       meta={\"goal\": goal, \"plan_steps\": len(plan)},\n       trace={\"policy\": \"deterministic_planner_v1\"},\n   )\n\n\n\n\ndef executor_agent(state_dict: Dict[str, Any]) -&gt; Dict[str, Any]:\n   state = BusState.model_validate(state_dict)\n   goal = state.goal.strip()\n   latest_plan = None\n   for m in reversed(state.mailbox):\n       if m.receiver == \"executor\" and m.msg_type == \"plan\":\n           latest_plan = m.content\n           break\n   result = {\n       \"goal\": goal,\n       \"assumptions\": [\n           \"We can produce a concise, actionable output.\",\n           \"We can validate via rule-based checks.\",\n       ],\n       \"output\": f\"Executed task for goal: {goal}\",\n       \"deliverables\": [\n           \"A clear summary\",\n           \"A step-by-step action list\",\n           \"Any constraints and edge cases\",\n       ],\n       \"plan_seen\": bool(latest_plan),\n   }\n   result_text = json.dumps(result, indent=2)\n   return bus_update(\n       state,\n       \"executor\",\n       \"validator\",\n       \"result\",\n       result_text,\n       meta={\"artifact_type\": \"json\", \"bytes\": len(result_text.encode(\"utf-8\"))},\n       trace={\"policy\": \"deterministic_executor_v1\"},\n   )<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We implement the Planner and Executor agents, which handle planning and execution. The Planner checks that a goal is present (posting an error to the Validator if it is empty) and otherwise sends a fixed, deterministic four-step plan to the Executor through the message bus. 
In this demo the Executor does not carry out the plan steps itself: it looks up the most recent plan addressed to it, builds a templated JSON result artifact that echoes the goal, lists deliverables, and records in plan_seen whether a plan was found, and then sends that artifact to the Validator.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">def validator_agent(state_dict: Dict[str, Any]) -&gt; Dict[str, Any]:\n   state = BusState.model_validate(state_dict)\n   goal = state.goal.strip()\n   latest_result = None\n   for m in reversed(state.mailbox):\n       if m.receiver == \"validator\" and m.msg_type in (\"result\", \"error\"):\n           latest_result = m\n           break\n   if latest_result is None:\n       upd = bus_update(state, \"validator\", \"planner\", \"error\", \"No result to validate.\", meta={\"reason\": \"missing_result\"})\n       upd[\"done\"] = True\n       upd[\"errors\"] = state.errors + [\"missing_result\"]\n       return upd\n   if latest_result.msg_type == \"error\":\n       upd = bus_update(\n           state,\n           \"validator\",\n           \"planner\",\n           \"validation\",\n           f\"Validation failed because upstream error occurred: {latest_result.content}\",\n           meta={\"status\": \"fail\"},\n       )\n       upd[\"done\"] = True\n       upd[\"errors\"] = state.errors + [latest_result.content]\n       return upd\n   try:\n       parsed = json.loads(latest_result.content)\n   except Exception as e:\n       upd = bus_update(\n           state,\n           \"validator\",\n          
 \"planner\",\n           \"validation\",\n           f\"Result is not valid JSON: {e}\",\n           meta={\"status\": \"fail\"},\n       )\n       upd[\"done\"] = True\n       upd[\"errors\"] = state.errors + [f\"invalid_json: {e}\"]\n       return upd\n   issues = []\n   if parsed.get(\"goal\") != goal:\n       issues.append(\"Result.goal does not match input goal.\")\n   if \"deliverables\" not in parsed or not isinstance(parsed[\"deliverables\"], list) or len(parsed[\"deliverables\"]) == 0:\n       issues.append(\"Missing or empty deliverables list.\")\n   if issues:\n       upd = bus_update(\n           state,\n           \"validator\",\n           \"planner\",\n           \"validation\",\n           \"Validation failed:\\n- \" + \"\\n- \".join(issues),\n           meta={\"status\": \"fail\", \"issues\": issues},\n       )\n       upd[\"done\"] = True\n       upd[\"errors\"] = state.errors + issues\n       return upd\n   upd = bus_update(\n       state,\n       \"validator\",\n       \"user\",\n       \"validation\",\n       \"Validation passed \u2705 Result looks consistent and complete.\",\n       meta={\"status\": \"pass\"},\n   )\n   upd[\"done\"] = True\n   upd[\"errors\"] = state.errors\n   return upd\n\n\n\n\ndef route_next(state_dict: Dict[str, Any]) -&gt; str:\n   if state_dict.get(\"done\", False):\n       return END\n   role = state_dict.get(\"active_role\", \"user\")\n   if role == \"planner\":\n       return \"planner\"\n   if role == \"executor\":\n       return \"executor\"\n   if role == \"validator\":\n       return \"validator\"\n   return END<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We implement the Validator agent and the routing logic that controls agent execution flow. The Validator takes the most recent result or error addressed to it, parses the result as JSON, and applies rule-based checks: the goal in the result must match the input goal, and the deliverables list must be present and non-empty. 
Every outcome, pass or fail, sets done to True, so the run always ends after validation. 
We also implement route_next, which ends the run once done is True and otherwise dispatches to the node named in active_role; because bus_update sets active_role to the receiver of each message, control flow follows the messages on the bus.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">graph = StateGraph(dict)\n\n\ngraph.add_node(\"planner\", planner_agent)\ngraph.add_node(\"executor\", executor_agent)\ngraph.add_node(\"validator\", validator_agent)\n\n\ngraph.set_entry_point(\"planner\")\n\n\ngraph.add_conditional_edges(\"planner\", route_next, {\"planner\": \"planner\", \"executor\": \"executor\", \"validator\": \"validator\", END: END})\ngraph.add_conditional_edges(\"executor\", route_next, {\"planner\": \"planner\", \"executor\": \"executor\", \"validator\": \"validator\", END: END})\ngraph.add_conditional_edges(\"validator\", route_next, {\"planner\": \"planner\", \"executor\": \"executor\", \"validator\": \"validator\", END: END})\n\n\nos.makedirs(\"checkpoints\", exist_ok=True)\ndb_path = \"checkpoints\/langgraph_bus.sqlite\"\nconn = sqlite3.connect(db_path, check_same_thread=False)\ncheckpointer = SqliteSaver(conn)\n\n\napp = graph.compile(checkpointer=checkpointer)\n\n\n\n\ndef run_thread(goal: str, thread_id: str) -&gt; BusState:\n   init = BusState(goal=goal, active_role=\"planner\", done=False).model_dump()\n   final_state_dict = app.invoke(init, config={\"configurable\": {\"thread_id\": thread_id}})\n   return 
BusState.model_validate(final_state_dict)\n\n\n\n\nthread_id = \"demo-thread-001\"\ngoal = \"Design an ACP-style message bus where planner\/executor\/validator coordinate through shared state.\"\n\n\nfinal_state = run_thread(goal, thread_id)\nprint(\"Done:\", final_state.done)\nprint(\"Steps:\", final_state.step)\nprint(\"Errors:\", final_state.errors)\n\n\nprint(\"\\nLast 5 messages:\")\nfor m in final_state.mailbox[-5:]:\n   print(f\"- [{m.msg_type}] {m.sender} -&gt; {m.receiver}: {m.content[:80]}\")\n\n\nsnapshot = checkpointer.get_tuple({\"configurable\": {\"thread_id\": thread_id}})\ncp = snapshot.checkpoint or {}\ncv = cp.get(\"channel_values\", {}) or {}\nsv = cp.get(\"state\", {}) or {}\nvals = cv if isinstance(cv, dict) and len(cv) else sv if isinstance(sv, dict) else {}\n\n\nprint(\"\\nCheckpoint keys:\", list(cp.keys()))\nif isinstance(cv, dict):\n   print(\"channel_values keys:\", list(cv.keys())[:30])\nif isinstance(sv, dict):\n   print(\"state keys:\", list(sv.keys())[:30])\n\n\nprint(\"\\nPersisted step (best-effort):\", vals.get(\"step\", \"NOT_FOUND\"))\nprint(\"Persisted active_role (best-effort):\", vals.get(\"active_role\", \"NOT_FOUND\"))\n\n\nprint(\"\\nACP logs:\", acp_log_path())\nprint(\"Checkpoint DB:\", db_path)\n\n\n\n\nG = nx.DiGraph()\nG.add_edge(\"planner\", \"executor\")\nG.add_edge(\"executor\", \"validator\")\nG.add_edge(\"validator\", \"user\")\n\n\nplt.figure(figsize=(6, 4))\npos = nx.spring_layout(G, seed=7)\nnx.draw(G, pos, with_labels=True, node_size=1800, font_size=10, arrows=True)\nplt.title(\"Orchestration Graph: Planner \u2192 Executor \u2192 Validator\")\nplt.show()\n\n\n\n\ncomm = nx.MultiDiGraph()\nfor (s, r, t) in final_state.edges:\n   comm.add_edge(s, r, label=t)\n\n\nplt.figure(figsize=(8, 5))\npos2 = nx.spring_layout(comm, seed=11)\nnx.draw(comm, pos2, with_labels=True, node_size=1800, font_size=10, arrows=True)\nplt.title(\"Communication Graph from Structured Message Bus (Runtime Edges)\")\nplt.show()\n\n\n\n\ndef 
tail_jsonl(path: str, n: int = 8) -&gt; List[Dict[str, Any]]:\n   if not os.path.exists(path):\n       return []\n   with open(path, \"r\", encoding=\"utf-8\") as f:\n       lines = f.readlines()[-n:]\n   return [json.loads(x) for x in lines]\n\n\n\n\nprint(\"\\nLast ACP log entries:\")\nfor row in tail_jsonl(acp_log_path(), 6):\n   print(f\"{row['msg_type']:&gt;10} | {row['sender']} -&gt; {row['receiver']} | {row['ts']}\")<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We construct the LangGraph state graph, enable SQLite-based persistence, and execute the multi-agent workflow. Passing a thread_id in the config makes the SqliteSaver checkpointer store the graph state under that thread, so the state can be retrieved after the run (here with checkpointer.get_tuple) and recovered across executions. We then draw the static orchestration graph and a runtime communication graph built from the recorded edges, and print the tail of the JSONL log to see how messages moved through the structured message bus.<\/p>\n<p>In this tutorial, we designed and implemented a structured multi-agent communication framework using LangGraph\u2019s shared-state architecture and ACP-style message-bus principles. The agents never call each other directly; they communicate only through structured, persisted messages, which makes every interaction traceable and lets each agent be replaced or extended independently. We logged every interaction, persisted agent state across executions, and visualized communication patterns to see how the agents coordinate. 
This architecture allows us to build robust, modular, and production-ready multi-agent systems that can be extended with additional agents, LLM reasoning, memory systems, and complex routing strategies.<\/p>\n<hr class=\"wp-block-separator has-alpha-channel-opacity\" \/>\n<p>Check out the\u00a0<strong><a href=\"https:\/\/github.com\/Marktechpost\/AI-Tutorial-Codes-Included\/blob\/main\/Agent%20Communication%20Protocol\/Getting%20Started\/langgraph_acp_structured_message_bus_multi_agent_system_Marktechpost.ipynb\" target=\"_blank\" rel=\"noreferrer noopener\">Full Codes here<\/a>.<\/strong><\/p>\n<p>The post <a href=\"https:\/\/www.marktechpost.com\/2026\/03\/01\/how-to-design-a-production-grade-multi-agent-communication-system-using-langgraph-structured-message-bus-acp-logging-and-persistent-shared-state-architecture\/\">How to Design a Production-Grade Multi-Agent Communication System Using LangGraph Structured Message Bus, ACP Logging, and Persistent Shared State Architecture<\/a> appeared first on <a href=\"https:\/\/www.marktechpost.com\/\">MarkTechPost<\/a>.<\/p>","protected":false},"excerpt":{"rendered":"<p>In this tutorial, we build an &hellip;<\/p>\n","protected":false},"author":1,"featured_media":29,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-497","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-uncategorized"],"_links":{"self":[{"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/posts\/497","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=497"}],"version-history":[{"count":0,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/posts\/497\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/media\/29"}],"wp:attachment":[{"href":"https:\/\/connectword.dpdns.org\/index.php?rest_rou
te=%2Fwp%2Fv2%2Fmedia&parent=497"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=497"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=497"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}