{"id":902,"date":"2026-05-14T10:31:47","date_gmt":"2026-05-14T02:31:47","guid":{"rendered":"https:\/\/connectword.dpdns.org\/?p=902"},"modified":"2026-05-14T10:31:47","modified_gmt":"2026-05-14T02:31:47","slug":"how-to-build-a-dynamic-zero-trust-network-simulation-with-graph-based-micro-segmentation-adaptive-policy-engine-and-insider-threat-detection","status":"publish","type":"post","link":"https:\/\/connectword.dpdns.org\/?p=902","title":{"rendered":"How to Build a Dynamic Zero-Trust Network Simulation with Graph-Based Micro-Segmentation, Adaptive Policy Engine, and Insider Threat Detection"},"content":{"rendered":"<p>In this tutorial, we build a realistic Zero-Trust network simulation by modeling a micro-segmented environment as a directed graph and forcing every request to earn access through continuous verification. We implement a dynamic policy engine that blends ABAC-style permissions with device posture, MFA, path reachability, zone sensitivity, and live risk signals such as anomaly and data-volume indicators. We then operationalize the model through a Flask API and run mixed traffic, including insider-lateral movement and exfiltration attempts, to show how trust scoring, adaptive controls, and automated quarantines block malicious flows in real time.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">!pip -q install networkx flask\n\n\nimport math\nimport json\nimport time\nimport random\nimport hashlib\nfrom dataclasses import dataclass, field\nfrom typing import Dict, Any, List, Tuple, Optional\n\n\nimport networkx as nx\nfrom flask import Flask, request, jsonify\n\n\nimport matplotlib.pyplot as plt\n\n\n\n\ndef _sigmoid(x: float) -&gt; float:\n   return 1.0 \/ (1.0 + math.exp(-x))\n\n\ndef _clamp(x: float, lo: float = 0.0, hi: float = 1.0) -&gt; float:\n   return max(lo, min(hi, x))\n\n\ndef _now_ts() -&gt; float:\n   return time.time()\n\n\ndef _stable_hash(s: str) -&gt; int:\n   h = hashlib.sha256(s.encode(\"utf-8\")).hexdigest()\n   return int(h[:10], 16)\n\n\ndef _rand_choice_weighted(items: List[Any], weights: List[float]) -&gt; Any:\n   return random.choices(items, weights=weights, k=1)[0]\n\n\ndef _pretty(obj: Any) -&gt; str:\n   return json.dumps(obj, indent=2, sort_keys=False)<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We set up the environment by installing the required libraries and importing all dependencies needed for graph modeling, risk scoring, and API handling. We define utility functions for trust normalization, hashing, timestamping, and weighted sampling to support deterministic simulations. We prepare helper functions that simplify logging and structured output formatting throughout the tutorial.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">ZONES = [\"public\", \"dmz\", \"app\", \"data\", \"admin\"]\nSENSITIVITY = {\"public\": 0.15, \"dmz\": 0.35, \"app\": 0.6, \"data\": 0.85, \"admin\": 0.95}\n\n\nASSETS = {\n   \"public\": [\"cdn\", \"landing\", \"status\"],\n   \"dmz\": [\"api_gateway\", \"waf\", \"vpn\"],\n   \"app\": [\"orders_svc\", \"billing_svc\", \"ml_inference\", \"inventory_svc\"],\n   \"data\": [\"customer_db\", \"ledger_db\", \"feature_store\"],\n   \"admin\": [\"iam\", \"siem\", \"backup_vault\"]\n}      \n\n\nACTIONS = [\"read\", \"write\", \"deploy\", \"admin\", \"exfiltrate\"]\n\n\nROLES = [\"customer\", \"employee\", \"analyst\", \"engineer\", \"admin\", \"secops\"]\n\n\nDEVICE_TYPES = [\"managed_laptop\", \"managed_server\", \"byod_phone\", \"unknown_iot\"]\nNETWORK_CONTEXT = [\"corp_lan\", \"corp_vpn\", \"public_wifi\", \"tor_exit\"]\n\n\n@dataclass\nclass RequestContext:\n   user: str\n   role: str\n   device_id: str\n   device_type: str\n   device_posture: float\n   mfa: bool\n   source: str\n   src_node: str\n   dst_node: str\n   action: str\n   time_bucket: str\n   geo_risk: float\n   behavior_anomaly: float\n   data_volume: float\n   reason: str = \"\"\n\n\n@dataclass\nclass Decision:\n   allowed: bool\n   trust_score: float\n   rule_hits: List[str] = field(default_factory=list)\n   controls: Dict[str, Any] = field(default_factory=dict)\n   explanation: str = \"\"\n   ts: float = field(default_factory=_now_ts)\n\n\n@dataclass\nclass PrincipalState:\n   user: str\n   role: str\n   base_risk: float\n   last_seen_ts: float\n   rolling_denies: int = 0\n   rolling_allows: int = 0\n   quarantined: bool = False\n   compromise_score: float = 0.0\n\n\n@dataclass\nclass DeviceState:\n   device_id: str\n   device_type: str\n   owner: str\n   posture: float\n   attested: bool\n   quarantined: bool = False\n\n\n@dataclass\nclass FlowRecord:\n   ts: float\n   ctx: Dict[str, Any]\n   decision: Dict[str, Any]<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We define the core domain schema including zones, assets, roles, device types, and contextual signals that shape our Zero-Trust environment. We formalize request, decision, principal, device, and flow record structures using dataclasses to maintain clarity and state integrity. We establish the foundational data model that enables continuous trust evaluation across identities, devices, and network paths.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">def build_microsegmented_graph(seed: int = 7) -&gt; nx.DiGraph:\n   random.seed(seed)\n   G = nx.DiGraph()\n\n\n   for z in ZONES:\n       G.add_node(f\"zone:{z}\", kind=\"zone\", zone=z, sensitivity=SENSITIVITY[z])\n\n\n   for z, assets in ASSETS.items():\n       for a in assets:\n           node = f\"{z}:{a}\"\n           G.add_node(node, kind=\"asset\", zone=z, sensitivity=SENSITIVITY[z] + random.uniform(-0.05, 0.05))\n           G.add_edge(f\"zone:{z}\", node, kind=\"contains\")\n\n\n   allowed_paths = [\n       (\"public\", \"dmz\"),\n       (\"dmz\", \"app\"),\n       (\"app\", \"data\"),\n       (\"admin\", \"app\"),\n       (\"admin\", \"data\"),\n       (\"admin\", \"dmz\"),\n       (\"dmz\", \"admin\")\n   ]\n\n\n   for src_z, dst_z in allowed_paths:\n       G.add_edge(f\"zone:{src_z}\", f\"zone:{dst_z}\", kind=\"zone_route\", base_allowed=True)\n\n\n   for src_z, dst_z in allowed_paths:\n       for src_a in ASSETS[src_z]:\n           for dst_a in ASSETS[dst_z]:\n               if random.random() &lt; 0.45:\n                   G.add_edge(f\"{src_z}:{src_a}\", f\"{dst_z}:{dst_a}\", kind=\"service_call\", base_allowed=True)\n\n\n   for z in ZONES:\n       for a in ASSETS[z]:\n           if random.random() &lt; 0.35:\n               G.add_edge(f\"{z}:{a}\", f\"{z}:{a}\", kind=\"self\", base_allowed=True)\n\n\n   return G\n\n\ndef draw_graph(G: nx.DiGraph, title: str = \"Zero-Trust Microsegmented Network Graph\") -&gt; None:\n   plt.figure(figsize=(14, 9))\n   pos = nx.spring_layout(G, seed=42, k=0.35)\n   kinds = nx.get_node_attributes(G, \"kind\")\n   node_colors = []\n   for n in G.nodes():\n       if kinds.get(n) == \"zone\":\n           node_colors.append(0.85)\n       else:\n           node_colors.append(G.nodes[n].get(\"sensitivity\", 0.5))\n   nx.draw_networkx_nodes(G, pos, node_size=350, node_color=node_colors)\n   nx.draw_networkx_edges(G, pos, arrows=True, alpha=0.25)\n   nx.draw_networkx_labels(G, pos, font_size=8)\n   plt.title(title)\n   plt.axis(\"off\")\n   plt.show()<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We construct a micro-segmented directed network graph where zones and assets are explicitly modeled with sensitivity attributes. We programmatically generate inter-zone and service-level communication paths to simulate realistic enterprise traffic patterns. We visualize the network topology to clearly observe segmentation boundaries and potential lateral movement routes.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">class ZeroTrustPolicyEngine:\n   def __init__(self, G: nx.DiGraph):\n       self.G = G\n       self.principals: Dict[str, PrincipalState] = {}\n       self.devices: Dict[str, DeviceState] = {}\n       self.flow_log: List[FlowRecord] = []\n       self.blocked_edges: set = set()\n       self.policy_version = \"ztpe-v1.3\"\n\n\n       self.role_perms = {\n           \"customer\": {\"public\": {\"read\"}, \"dmz\": {\"read\"}},\n           \"employee\": {\"public\": {\"read\"}, \"dmz\": {\"read\"}, \"app\": {\"read\", \"write\"}},\n           \"analyst\": {\"public\": {\"read\"}, \"dmz\": {\"read\"}, \"app\": {\"read\"}, \"data\": {\"read\"}},\n           \"engineer\": {\"public\": {\"read\"}, \"dmz\": {\"read\"}, \"app\": {\"read\", \"write\", \"deploy\"}, \"data\": {\"read\"}},\n           \"admin\": {\"public\": {\"read\"}, \"dmz\": {\"read\", \"write\"}, \"app\": {\"read\", \"write\", \"deploy\", \"admin\"}, \"data\": {\"read\", \"write\", \"admin\"}, \"admin\": {\"read\", \"write\", \"admin\"}},\n           \"secops\": {\"public\": {\"read\"}, \"dmz\": {\"read\", \"write\"}, \"app\": {\"read\", \"admin\"}, \"data\": {\"read\", \"admin\"}, \"admin\": {\"read\", \"admin\"}},\n       }\n\n\n       self.w = {\n           \"role_fit\": 1.4,\n           \"device_posture\": 1.8,\n           \"mfa\": 1.0,\n           \"network_context\": 1.2,\n           \"time\": 0.6,\n           \"geo_risk\": 1.2,\n           \"behavior_anomaly\": 2.2,\n           \"data_volume\": 1.4,\n           \"principal_base_risk\": 1.3,\n           \"principal_compromise\": 2.0,\n           \"asset_sensitivity\": 1.6,\n           \"path_validity\": 1.5,\n           \"quarantine\": 4.0,\n       }\n\n\n       self.thresholds = {\n           \"allow\": 0.72,\n           \"step_up\": 0.62,\n           \"rate_limit\": 0.55,\n           \"deny\": 0.0\n       }\n\n\n   def register_principal(self, user: str, role: str, base_risk: float) -&gt; None:\n       self.principals[user] = PrincipalState(\n           user=user,\n           role=role,\n           base_risk=_clamp(base_risk),\n           last_seen_ts=_now_ts()\n       )\n\n\n   def register_device(self, device_id: str, device_type: str, owner: str, posture: float, attested: bool) -&gt; None:\n       self.devices[device_id] = DeviceState(\n           device_id=device_id,\n           device_type=device_type,\n           owner=owner,\n           posture=_clamp(posture),\n           attested=bool(attested)\n       )\n\n\n   def _asset_zone_and_sensitivity(self, node: str) -&gt; Tuple[str, float]:\n       if node.startswith(\"zone:\"):\n           z = node.split(\":\", 1)[1]\n           return z, SENSITIVITY.get(z, 0.5)\n       z = self.G.nodes[node].get(\"zone\", \"public\")\n       sens = float(self.G.nodes[node].get(\"sensitivity\", SENSITIVITY.get(z, 0.5)))\n       return z, _clamp(sens)\n\n\n   def _base_abac_check(self, role: str, dst_zone: str, action: str) -&gt; bool:\n       return action in self.role_perms.get(role, {}).get(dst_zone, set())\n\n\n   def _path_is_valid(self, src: str, dst: str) -&gt; bool:\n       if (src, dst) in self.blocked_edges:\n           return False\n       try:\n           return nx.has_path(self.G, src, dst)\n       except nx.NetworkXError:\n           return False\n\n\n   def _network_context_risk(self, source: str) -&gt; float:\n       table = {\"corp_lan\": 0.1, \"corp_vpn\": 0.25, \"public_wifi\": 0.65, \"tor_exit\": 0.9}\n       return table.get(source, 0.6)\n\n\n   def _time_risk(self, time_bucket: str) -&gt; float:\n       return 0.15 if time_bucket == \"business_hours\" else 0.55\n\n\n   def _compute_trust_score(self, ctx: RequestContext) -&gt; Tuple[float, List[str], Dict[str, Any]]:\n       rule_hits = []\n       controls: Dict[str, Any] = {}\n\n\n       principal = self.principals.get(ctx.user)\n       device = self.devices.get(ctx.device_id)\n\n\n       if principal is None:\n           rule_hits.append(\"unknown_principal\")\n           principal = PrincipalState(ctx.user, ctx.role, base_risk=0.85, last_seen_ts=_now_ts())\n\n\n       if device is None:\n           rule_hits.append(\"unknown_device\")\n           device = DeviceState(ctx.device_id, ctx.device_type, owner=ctx.user, posture=0.25, attested=False)\n\n\n       src_zone, src_sens = self._asset_zone_and_sensitivity(ctx.src_node)\n       dst_zone, dst_sens = self._asset_zone_and_sensitivity(ctx.dst_node)\n\n\n       abac_ok = self._base_abac_check(ctx.role, dst_zone, ctx.action)\n       if not abac_ok:\n           rule_hits.append(\"abac_denied\")\n\n\n       path_ok = self._path_is_valid(ctx.src_node, ctx.dst_node)\n       if not path_ok:\n           rule_hits.append(\"invalid_path_or_blocked\")\n\n\n       if principal.quarantined or device.quarantined:\n           rule_hits.append(\"quarantined\")\n           controls[\"auto_response\"] = \"deny_quarantine\"\n\n\n       if ctx.action == \"exfiltrate\":\n           rule_hits.append(\"exfil_attempt\")\n\n\n       if dst_zone in [\"admin\", \"data\"] and not ctx.mfa:\n           rule_hits.append(\"mfa_required_for_sensitive_zone\")\n           controls[\"step_up_mfa\"] = True\n\n\n       if device.owner != ctx.user:\n           rule_hits.append(\"device_owner_mismatch\")\n\n\n       net_r = self._network_context_risk(ctx.source)\n       t_r = self._time_risk(ctx.time_bucket)\n\n\n       role_fit = 1.0 if abac_ok else 0.0\n       posture = _clamp(device.posture if device.attested else device.posture * 0.75)\n       mfa = 1.0 if ctx.mfa else 0.0\n       path_valid = 1.0 if path_ok else 0.0\n       sens = _clamp(dst_sens)\n\n\n       principal_risk = _clamp(principal.base_risk)\n       compromise = _clamp(principal.compromise_score)\n       anomaly = _clamp(ctx.behavior_anomaly)\n       geo = _clamp(ctx.geo_risk)\n       data_vol = _clamp(ctx.data_volume)\n\n\n       quarantine_penalty = 1.0 if (principal.quarantined or device.quarantined) else 0.0\n       owner_mismatch_penalty = 1.0 if (device.owner != ctx.user) else 0.0\n       exfil_penalty = 1.0 if (ctx.action == \"exfiltrate\") else 0.0\n\n\n       z = 0.0\n       z += self.w[\"role_fit\"] * (role_fit - 0.5)\n       z += self.w[\"device_posture\"] * (posture - 0.5)\n       z += self.w[\"mfa\"] * (mfa - 0.5)\n       z += self.w[\"path_validity\"] * (path_valid - 0.5)\n\n\n       z -= self.w[\"asset_sensitivity\"] * (sens - 0.35)\n\n\n       z -= self.w[\"network_context\"] * (net_r - 0.25)\n       z -= self.w[\"time\"] * (t_r - 0.15)\n       z -= self.w[\"geo_risk\"] * (geo - 0.2)\n\n\n       z -= self.w[\"behavior_anomaly\"] * (anomaly - 0.1)\n       z -= self.w[\"data_volume\"] * (data_vol - 0.15)\n\n\n       z -= self.w[\"principal_base_risk\"] * (principal_risk - 0.2)\n       z -= self.w[\"principal_compromise\"] * (compromise - 0.0)\n\n\n       z -= 2.0 * owner_mismatch_penalty\n       z -= 2.5 * exfil_penalty\n       z -= self.w[\"quarantine\"] * quarantine_penalty\n\n\n       trust = _sigmoid(z)\n\n\n       if trust &lt; self.thresholds[\"rate_limit\"]:\n           controls[\"rate_limit\"] = True\n       if trust &lt; self.thresholds[\"step_up\"]:\n           controls[\"step_up\"] = bool(controls.get(\"step_up_mfa\", False) or dst_zone in [\"admin\", \"data\"])\n       if trust &lt; self.thresholds[\"allow\"]:\n           controls[\"continuous_auth\"] = True\n\n\n       if \"abac_denied\" in rule_hits or \"invalid_path_or_blocked\" in rule_hits or \"exfil_attempt\" in rule_hits:\n           controls[\"risk_signal\"] = \"policy_violation\"\n\n\n       if anomaly &gt; 0.75 and sens &gt; 0.75:\n           controls[\"auto_response\"] = \"quarantine_candidate\"\n\n\n       return _clamp(trust), rule_hits, controls\n\n\n   def evaluate(self, ctx: RequestContext) -&gt; Decision:\n       trust, rule_hits, controls = self._compute_trust_score(ctx)\n       allowed = trust &gt;= self.thresholds[\"allow\"]\n\n\n       if controls.get(\"step_up\"):\n           if not ctx.mfa:\n               allowed = False\n               rule_hits.append(\"step_up_failed_no_mfa\")\n           else:\n               allowed = allowed or (trust &gt;= self.thresholds[\"step_up\"])\n\n\n       if controls.get(\"rate_limit\") and trust &lt; 0.5:\n           allowed = False\n           rule_hits.append(\"rate_limited_denied\")\n\n\n       explanation = self._explain(ctx, trust, allowed, rule_hits, controls)\n       dec = Decision(allowed=allowed, trust_score=trust, rule_hits=rule_hits, controls=controls, explanation=explanation)\n\n\n       self._post_decision_updates(ctx, dec)\n\n\n       self.flow_log.append(\n           FlowRecord(\n               ts=dec.ts,\n               ctx=ctx.__dict__.copy(),\n               decision={\n                   \"allowed\": dec.allowed,\n                   \"trust_score\": dec.trust_score,\n                   \"rule_hits\": dec.rule_hits,\n                   \"controls\": dec.controls,\n                   \"explanation\": dec.explanation\n               }\n           )\n       )\n       return dec\n\n\n   def _explain(self, ctx: RequestContext, trust: float, allowed: bool, hits: List[str], controls: Dict[str, Any]) -&gt; str:\n       src_z, _ = self._asset_zone_and_sensitivity(ctx.src_node)\n       dst_z, dst_s = self._asset_zone_and_sensitivity(ctx.dst_node)\n       bits = []\n       bits.append(f\"Decision={'ALLOW' if allowed else 'DENY'} | trust={trust:.3f} | {ctx.user}({ctx.role}) {ctx.action} {ctx.src_node}-&gt;{ctx.dst_node}\")\n       bits.append(f\"Context: source={ctx.source}, time={ctx.time_bucket}, geo_risk={ctx.geo_risk:.2f}, anomaly={ctx.behavior_anomaly:.2f}, data_vol={ctx.data_volume:.2f}\")\n       bits.append(f\"Zones: {src_z} -&gt; {dst_z} (dst_sensitivity={dst_s:.2f}) | MFA={'yes' if ctx.mfa else 'no'} | posture={ctx.device_posture:.2f}\")\n       if hits:\n           bits.append(f\"Rule hits: {', '.join(hits)}\")\n       if controls:\n           bits.append(f\"Controls: {controls}\")\n       return \" | \".join(bits)\n\n\n   def _post_decision_updates(self, ctx: RequestContext, dec: Decision) -&gt; None:\n       p = self.principals.get(ctx.user)\n       d = self.devices.get(ctx.device_id)\n\n\n       if p is None:\n           self.register_principal(ctx.user, ctx.role, base_risk=0.65)\n           p = self.principals[ctx.user]\n       if d is None:\n           self.register_device(ctx.device_id, ctx.device_type, ctx.user, ctx.device_posture, attested=(ctx.device_type.startswith(\"managed\")))\n           d = self.devices[ctx.device_id]\n\n\n       p.last_seen_ts = dec.ts\n\n\n       if dec.allowed:\n           p.rolling_allows += 1\n           p.rolling_denies = max(0, p.rolling_denies - 1)\n           p.compromise_score = _clamp(p.compromise_score - 0.02)\n       else:\n           p.rolling_denies += 1\n           p.compromise_score = _clamp(p.compromise_score + 0.06 + 0.10 * (1.0 if \"exfil_attempt\" in dec.rule_hits else 0.0))\n\n\n       if dec.controls.get(\"auto_response\") == \"quarantine_candidate\" or p.rolling_denies &gt;= 4 or p.compromise_score &gt; 0.78:\n           p.quarantined = True\n           if d:\n               d.quarantined = True\n\n\n       if (\"invalid_path_or_blocked\" in dec.rule_hits) or (\"exfil_attempt\" in dec.rule_hits) or (\"abac_denied\" in dec.rule_hits):\n           self.blocked_edges.add((ctx.src_node, ctx.dst_node))\n\n\n   def stats(self) -&gt; Dict[str, Any]:\n       total = len(self.flow_log)\n       allows = sum(1 for r in self.flow_log if r.decision[\"allowed\"])\n       denies = total - allows\n       top_denies = {}\n       for r in self.flow_log:\n           if not r.decision[\"allowed\"]:\n               for h in r.decision[\"rule_hits\"]:\n                   top_denies[h] = top_denies.get(h, 0) + 1\n       principals = {\n           u: {\n               \"role\": p.role,\n               \"base_risk\": round(p.base_risk, 3),\n               \"compromise_score\": round(p.compromise_score, 3),\n               \"rolling_denies\": p.rolling_denies,\n               \"rolling_allows\": p.rolling_allows,\n               \"quarantined\": p.quarantined\n           }\n           for u, p in self.principals.items()\n       }\n       devices = {\n           did: {\n               \"owner\": d.owner,\n               \"type\": d.device_type,\n               \"posture\": round(d.posture, 3),\n               \"attested\": d.attested,\n               \"quarantined\": d.quarantined\n           }\n           for did, d in self.devices.items()\n       }\n       return {\n           \"policy_version\": self.policy_version,\n           \"flows_total\": total,\n           \"flows_allow\": allows,\n           \"flows_deny\": denies,\n           \"deny_reasons_top\": dict(sorted(top_denies.items(), key=lambda kv: kv[1], reverse=True)[:10]),\n           \"blocked_edges_count\": len(self.blocked_edges),\n           \"principals\": principals,\n           \"devices\": devices\n       }<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We implement the dynamic Zero-Trust Policy Engine that evaluates every request using ABAC, contextual risk signals, behavioral anomaly scores, and path validation. We compute a continuous trust score through a weighted risk model and trigger adaptive controls such as step-up authentication, rate limiting, and quarantine. We update the principal and device state after each decision to simulate continuous verification and evolving risk posture.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">def make_world(engine: ZeroTrustPolicyEngine, seed: int = 13) -&gt; Dict[str, Any]:\n   random.seed(seed)\n\n\n   users = [\n       (\"alice\", \"employee\", 0.18),\n       (\"bob\", \"engineer\", 0.22),\n       (\"cathy\", \"analyst\", 0.25),\n       (\"dan\", \"admin\", 0.15),\n       (\"eve\", \"secops\", 0.10),\n       (\"mallory\", \"employee\", 0.55)\n   ]\n   for u, r, br in users:\n       engine.register_principal(u, r, br)\n\n\n   devices = [\n       (\"dev-alice-lt\", \"managed_laptop\", \"alice\", 0.82, True),\n       (\"dev-bob-lt\", \"managed_laptop\", \"bob\", 0.77, True),\n       (\"dev-cathy-lt\", \"managed_laptop\", \"cathy\", 0.74, True),\n       (\"dev-dan-lt\", \"managed_laptop\", \"dan\", 0.88, True),\n       (\"dev-eve-lt\", \"managed_laptop\", \"eve\", 0.90, True),\n       (\"dev-mallory-byod\", \"byod_phone\", \"mallory\", 0.42, False),\n       (\"unknown-iot-7\", \"unknown_iot\", \"unknown\", 0.20, False),\n   ]\n   for did, dt, owner, posture, attested in devices:\n       engine.register_device(did, dt, owner, posture, attested)\n\n\n   all_assets = [n for n in engine.G.nodes() if engine.G.nodes[n].get(\"kind\") == \"asset\"]\n   by_zone = {z: [a for a in all_assets if engine.G.nodes[a].get(\"zone\") == z] for z in ZONES}\n\n\n   return {\"users\": users, \"devices\": devices, \"assets\": all_assets, \"by_zone\": by_zone}\n\n\ndef gen_request(engine: ZeroTrustPolicyEngine, world: Dict[str, Any], kind: str = \"normal\", seed_salt: str = \"\") -&gt; RequestContext:\n   rnd = random.Random(_stable_hash(kind + seed_salt + str(_now_ts())[:6]))\n\n\n   users = world[\"users\"]\n   by_zone = world[\"by_zone\"]\n\n\n   def pick_user(role_bias: Optional[str] = None) -&gt; Tuple[str, str]:\n       if role_bias:\n           filtered = [u for u in users if u[1] == role_bias]\n           if filtered:\n               u, r, _ = rnd.choice(filtered)\n               return u, r\n       u, r, _ = rnd.choice(users)\n       return u, r\n\n\n   def user_device(u: str) -&gt; Tuple[str, str, float]:\n       candidates = [d for d in engine.devices.values() if d.owner == u]\n       if candidates:\n           d = rnd.choice(candidates)\n       else:\n           d = rnd.choice(list(engine.devices.values()))\n       return d.device_id, d.device_type, d.posture\n\n\n   def time_bucket():\n       return \"business_hours\" if rnd.random() &lt; 0.75 else \"after_hours\"\n\n\n   source = _rand_choice_weighted(NETWORK_CONTEXT, [0.45, 0.25, 0.22, 0.08])\n   geo_risk = _clamp(rnd.uniform(0.05, 0.35) + (0.25 if source in [\"public_wifi\", \"tor_exit\"] else 0.0))\n   behavior_anomaly = _clamp(rnd.uniform(0.02, 0.25))\n   data_volume = _clamp(rnd.uniform(0.02, 0.25))\n\n\n   if kind == \"normal\":\n       u, r = pick_user()\n       did, dt, posture = user_device(u)\n\n\n       src_zone = _rand_choice_weighted([\"public\", \"dmz\", \"app\"], [0.15, 0.55, 0.30])\n       dst_zone = _rand_choice_weighted([\"dmz\", \"app\", \"data\"], [0.35, 0.45, 0.20])\n       action = _rand_choice_weighted(ACTIONS, [0.55, 0.28, 0.07, 0.08, 0.02])\n\n\n       src = rnd.choice(by_zone[src_zone])\n       dst = rnd.choice(by_zone[dst_zone])\n\n\n       mfa = True if dst_zone in [\"data\", \"admin\"] else (rnd.random() &lt; 0.55)\n\n\n       return RequestContext(\n           user=u, role=r,\n           device_id=did, device_type=dt, device_posture=posture,\n           mfa=mfa, source=source,\n           src_node=src, dst_node=dst,\n           action=action,\n           time_bucket=time_bucket(),\n           geo_risk=geo_risk,\n           behavior_anomaly=behavior_anomaly,\n           data_volume=data_volume,\n           reason=\"routine_access\"\n       )\n\n\n   if kind == \"malicious_flow\":\n       u, r = (\"unknown_actor\", \"customer\")\n       did, dt, posture = (\"unknown-dev\", \"unknown_iot\", 0.18)\n\n\n       source = _rand_choice_weighted([\"tor_exit\", \"public_wifi\"], [0.65, 0.35])\n       geo_risk = _clamp(rnd.uniform(0.6, 0.95))\n       behavior_anomaly = _clamp(rnd.uniform(0.75, 0.98))\n       data_volume = _clamp(rnd.uniform(0.75, 0.98))\n\n\n       src = rnd.choice(by_zone[\"public\"] + by_zone[\"dmz\"])\n       dst = rnd.choice(by_zone[\"data\"] + by_zone[\"admin\"])\n       action = _rand_choice_weighted([\"write\", \"admin\", \"exfiltrate\"], [0.25, 0.25, 0.50])\n       mfa = False\n\n\n       return RequestContext(\n           user=u, role=r,\n           device_id=did, device_type=dt, device_posture=posture,\n           mfa=mfa, source=source,\n           src_node=src, dst_node=dst,\n           action=action,\n           time_bucket=\"after_hours\",\n           geo_risk=geo_risk,\n           behavior_anomaly=behavior_anomaly,\n           data_volume=data_volume,\n           reason=\"external_malicious_attempt\"\n       )\n\n\n   if kind == \"insider_threat\":\n       u, r = (\"mallory\", \"employee\")\n       did, dt, posture = user_device(u)\n\n\n       source = _rand_choice_weighted([\"corp_vpn\", \"public_wifi\"], [0.55, 0.45])\n       geo_risk = _clamp(rnd.uniform(0.25, 0.65))\n       behavior_anomaly = _clamp(rnd.uniform(0.55, 0.95))\n       data_volume = _clamp(rnd.uniform(0.55, 0.95))\n\n\n       src = rnd.choice(by_zone[\"app\"] + by_zone[\"dmz\"])\n       dst = rnd.choice(by_zone[\"data\"] + by_zone[\"admin\"])\n       action = _rand_choice_weighted([\"read\", \"write\", \"exfiltrate\", \"admin\"], [0.18, 0.22, 0.45, 0.15])\n\n\n       mfa = rnd.random() &lt; 0.25\n\n\n       return RequestContext(\n           user=u, role=r,\n           device_id=did, device_type=dt, device_posture=posture,\n           mfa=mfa, source=source,\n           src_node=src, dst_node=dst,\n           action=action,\n           time_bucket=\"after_hours\",\n           geo_risk=geo_risk,\n           behavior_anomaly=behavior_anomaly,\n           data_volume=data_volume,\n           reason=\"insider_lateral_and_exfil\"\n       )\n\n\n   raise ValueError(f\"Unknown kind={kind}\")\n\n\n\n\ndef run_simulation(engine: ZeroTrustPolicyEngine, world: Dict[str, Any], steps: int = 60, seed: int = 99) -&gt; Dict[str, Any]:\n   random.seed(seed)\n   results = {\"allowed\": 0, \"denied\": 0, \"samples\": []}\n\n\n   for i in range(steps):\n       if i in [12, 13, 14, 28, 29]:\n           ctx = gen_request(engine, world, kind=\"malicious_flow\", seed_salt=str(i))\n       elif i in [18, 19, 20, 34, 35, 36, 50, 51]:\n           ctx = gen_request(engine, world, kind=\"insider_threat\", seed_salt=str(i))\n       else:\n           ctx = gen_request(engine, world, kind=\"normal\", seed_salt=str(i))\n\n\n       dec = engine.evaluate(ctx)\n       if dec.allowed:\n           results[\"allowed\"] += 1\n       else:\n           results[\"denied\"] += 1\n\n\n       if i &lt; 10 or (not dec.allowed and len(results[\"samples\"]) &lt; 18):\n           results[\"samples\"].append({\"ctx\": ctx.__dict__, \"decision\": dec.__dict__})\n\n\n   return results<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We generate realistic traffic scenarios, including normal business activity, malicious external flows, and insider lateral-movement attempts. We simulate contextual variables, including geo-risk, anomaly scores, and data volume, to stress-test the policy engine. We run multi-step simulations to observe how trust scores shift and how the engine progressively blocks risky behavior.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">def make_app(engine: ZeroTrustPolicyEngine, world: Dict[str, Any]) -&gt; Flask:\n   app = Flask(__name__)\n\n\n   @app.get(\"\/health\")\n   def health():\n       return jsonify({\"ok\": True, \"policy_version\": engine.policy_version})\n\n\n   @app.get(\"\/graph\")\n   def graph():\n       nodes = [{\"id\": n, **engine.G.nodes[n]} for n in engine.G.nodes()]\n       edges = [{\"src\": u, \"dst\": v, **engine.G.edges[u, v]} for u, v in engine.G.edges()]\n       return jsonify({\"nodes\": nodes, \"edges\": edges, \"blocked_edges\": list(map(list, engine.blocked_edges))})\n\n\n   @app.post(\"\/request\")\n   def evaluate_request():\n       payload = request.get_json(force=True)\n       ctx = RequestContext(**payload)\n       dec = engine.evaluate(ctx)\n       return jsonify({\"allowed\": dec.allowed, \"trust_score\": dec.trust_score, \"rule_hits\": dec.rule_hits, \"controls\": dec.controls, \"explanation\": dec.explanation})\n\n\n   @app.post(\"\/simulate\")\n   def simulate():\n       payload = request.get_json(force=True) if request.data else {}\n       steps = int(payload.get(\"steps\", 50))\n       res = run_simulation(engine, world, steps=steps, seed=int(payload.get(\"seed\", 123)))\n       return jsonify({\"steps\": steps, \"allowed\": res[\"allowed\"], \"denied\": res[\"denied\"], \"stats\": engine.stats()})\n\n\n   @app.get(\"\/stats\")\n   def stats():\n       return jsonify(engine.stats())\n\n\n   return app\n\n\n\n\nG = build_microsegmented_graph(seed=7)\nengine = ZeroTrustPolicyEngine(G)\nworld = make_world(engine, seed=13)\n\n\ndraw_graph(G, title=\"Zero-Trust Microsegmented Network (Zones + Assets + Directed Flows)\")\n\n\napp = make_app(engine, world)\nclient = app.test_client()\n\n\nprint(\"== Health ==\")\nprint(client.get(\"\/health\").json)\n\n\nprint(\"n== Run simulation (mixture: normal + malicious flows + insider threat) ==\")\nsim_out = client.post(\"\/simulate\", json={\"steps\": 70, \"seed\": 2026}).json\nprint(_pretty({\"allowed\": sim_out[\"allowed\"], \"denied\": sim_out[\"denied\"], \"blocked_edges_count\": sim_out[\"stats\"][\"blocked_edges_count\"]}))\n\n\nprint(\"n== Top deny reasons ==\")\nprint(_pretty(sim_out[\"stats\"][\"deny_reasons_top\"]))\n\n\nprint(\"n== Principal risk snapshot (watch mallory) ==\")\nprincipals = sim_out[\"stats\"][\"principals\"]\nfocus = {k: principals[k] for k in sorted(principals.keys()) if k in [\"alice\",\"bob\",\"cathy\",\"dan\",\"eve\",\"mallory\",\"unknown_actor\"]}\nprint(_pretty(focus))\n\n\nprint(\"n== Example: send a direct insider exfil request via the policy API ==\")\ninsider_ctx = gen_request(engine, world, kind=\"insider_threat\", seed_salt=\"manual-1\")\ninsider_ctx.action = \"exfiltrate\"\ninsider_ctx.mfa = False\ninsider_ctx.behavior_anomaly = 0.92\ninsider_ctx.data_volume = 0.88\ninsider_ctx.geo_risk = 0.62\n\n\nresp = client.post(\"\/request\", json=insider_ctx.__dict__).json\nprint(_pretty(resp))\n\n\nprint(\"n== Example: a legitimate admin read with MFA from corp_lan ==\")\nadmin_ctx = RequestContext(\n   user=\"dan\", role=\"admin\",\n   device_id=\"dev-dan-lt\", device_type=\"managed_laptop\", device_posture=engine.devices[\"dev-dan-lt\"].posture,\n   mfa=True, source=\"corp_lan\",\n   src_node=random.choice(world[\"by_zone\"][\"admin\"]),\n   dst_node=random.choice(world[\"by_zone\"][\"data\"]),\n   action=\"read\",\n   time_bucket=\"business_hours\",\n   geo_risk=0.08,\n   behavior_anomaly=0.06,\n   data_volume=0.10,\n   reason=\"admin_operational_access\"\n)\nresp2 = client.post(\"\/request\", json=admin_ctx.__dict__).json\nprint(_pretty(resp2))\n\n\nprint(\"n== Final stats ==\")\nfinal_stats = client.get(\"\/stats\").json\nprint(_pretty({\n   \"flows_total\": final_stats[\"flows_total\"],\n   \"flows_allow\": final_stats[\"flows_allow\"],\n   \"flows_deny\": final_stats[\"flows_deny\"],\n   \"blocked_edges_count\": final_stats[\"blocked_edges_count\"],\n   \"deny_reasons_top\": final_stats[\"deny_reasons_top\"]\n}))\n\n\nscores = [r.decision[\"trust_score\"] for r in engine.flow_log]\nplt.figure(figsize=(9, 4))\nplt.hist(scores, bins=18)\nplt.title(\"Trust Score Distribution Across Simulated Flows\")\nplt.xlabel(\"trust_score\")\nplt.ylabel(\"count\")\nplt.show()\n\n\ndenied = [r for r in engine.flow_log if not r.decision[\"allowed\"]]\nprint(\"n== Recent denied explanations (last 6) ==\")\nfor r in denied[-6:]:\n   print(\"-\", r.decision[\"explanation\"])<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We expose the policy engine through a Flask API and interact with it using a test client to keep the notebook self-contained. We run simulations, inspect trust distributions, analyze denial reasons, and observe quarantine and edge-blocking behavior. We conclude by visualizing trust score patterns and examining denied explanations to validate the Zero-Trust enforcement logic in action.<\/p>\n<p>In conclusion, we demonstrated how Zero Trust becomes a measurable, programmable system when identity, device state, network context, and behavior signals are evaluated together for every interaction. We saw the policy engine deny or step up risky requests, rate-limit low-trust activity, and dynamically block abusive edges to prevent repeated lateral movement and data theft. By combining graph-based segmentation with an evolving trust score and automated responses, we ended with a repeatable framework that we can extend with richer telemetry, better anomaly models, and environment-specific policies while keeping the core \u201cnever trust, always verify\u201d loop intact.<\/p>\n<hr class=\"wp-block-separator has-alpha-channel-opacity\" \/>\n<p>Check out\u00a0the\u00a0<strong><a href=\"https:\/\/github.com\/Marktechpost\/AI-Agents-Projects-Tutorials\/blob\/main\/Security\/zero_trust_network_simulation_policy_engine_Marktechpost.ipynb\" target=\"_blank\" rel=\"noreferrer noopener\">Full Codes with Notebook<\/a>.\u00a0<\/strong>Also,\u00a0feel free to follow us on\u00a0<strong><a href=\"https:\/\/x.com\/intent\/follow?screen_name=marktechpost\" target=\"_blank\" rel=\"noreferrer noopener\"><mark>Twitter<\/mark><\/a><\/strong>\u00a0and don\u2019t forget to join our\u00a0<strong><a href=\"https:\/\/www.reddit.com\/r\/machinelearningnews\/\" target=\"_blank\" rel=\"noreferrer noopener\">150k+ ML SubReddit<\/a><\/strong>\u00a0and Subscribe to\u00a0<strong><a href=\"https:\/\/www.aidevsignals.com\/\" target=\"_blank\" rel=\"noreferrer noopener\">our Newsletter<\/a><\/strong>. Wait! are you on telegram?\u00a0<strong><a href=\"https:\/\/t.me\/machinelearningresearchnews\" target=\"_blank\" rel=\"noreferrer noopener\">now you can join us on telegram as well.<\/a><\/strong><\/p>\n<p>Need to partner with us for promoting your GitHub Repo OR Hugging Face Page OR Product Release OR Webinar etc.?\u00a0<strong><a href=\"https:\/\/forms.gle\/MTNLpmJtsFA3VRVd9\" target=\"_blank\" rel=\"noreferrer noopener\"><mark>Connect with us<\/mark><\/a><\/strong><\/p>\n<p>The post <a href=\"https:\/\/www.marktechpost.com\/2026\/05\/13\/how-to-build-a-dynamic-zero-trust-network-simulation-with-graph-based-micro-segmentation-adaptive-policy-engine-and-insider-threat-detection\/\">How to Build a Dynamic Zero-Trust Network Simulation with Graph-Based Micro-Segmentation, Adaptive Policy Engine, and Insider Threat Detection<\/a> appeared first on <a href=\"https:\/\/www.marktechpost.com\/\">MarkTechPost<\/a>.<\/p>","protected":false},"excerpt":{"rendered":"<p>In this tutorial, we build a r&hellip;<\/p>\n","protected":false},"author":1,"featured_media":29,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-902","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-uncategorized"],"_links":{"self":[{"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/posts\/902","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=902"}],"version-history":[{"count":0,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/posts\/902\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/media\/29"}],"wp:attachment":[{"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=902"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=902"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=902"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}