{"id":243,"date":"2026-01-08T05:08:53","date_gmt":"2026-01-07T21:08:53","guid":{"rendered":"https:\/\/connectword.dpdns.org\/?p=243"},"modified":"2026-01-08T05:08:53","modified_gmt":"2026-01-07T21:08:53","slug":"a-coding-implementation-to-build-a-unified-apache-beam-pipeline-demonstrating-batch-and-stream-processing-with-event-time-windowing-using-directrunner","status":"publish","type":"post","link":"https:\/\/connectword.dpdns.org\/?p=243","title":{"rendered":"A Coding Implementation to Build a Unified Apache Beam Pipeline Demonstrating Batch and Stream Processing with Event-Time Windowing Using DirectRunner"},"content":{"rendered":"<p>In this tutorial, we demonstrate how to build a unified <a href=\"https:\/\/github.com\/apache\/beam\"><strong>Apache Beam<\/strong><\/a> pipeline that works seamlessly in both batch and stream-like modes using the DirectRunner. We generate synthetic, event-time\u2013aware data and apply fixed windowing with triggers and allowed lateness to demonstrate how Apache Beam consistently handles both on-time and late events. By switching only the input source, we keep the core aggregation logic identical, which helps us clearly understand how Beam\u2019s event-time model, windows, and panes behave without relying on external streaming infrastructure. 
Check out the\u00a0<strong><a href=\"https:\/\/github.com\/Marktechpost\/AI-Tutorial-Codes-Included\/blob\/main\/Distributed%20Systems\/apache_beam_batch_and_stream_windowing_Marktechpost.ipynb\" target=\"_blank\" rel=\"noreferrer noopener\">FULL CODES here<\/a><\/strong>.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">!pip -q install -U \"grpcio&gt;=1.71.2\" \"grpcio-status&gt;=1.71.2\"\n!pip -q install -U apache-beam crcmod\n\n\nimport apache_beam as beam\nfrom apache_beam.options.pipeline_options import PipelineOptions, StandardOptions\nfrom apache_beam.transforms.window import FixedWindows\nfrom apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode\nfrom apache_beam.testing.test_stream import TestStream\nimport json\nfrom datetime import datetime, timezone<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We install the required dependencies, requiring grpcio and grpcio-status 1.71.2 or newer for version compatibility with Apache Beam. We import the core Beam APIs along with the windowing, trigger, and TestStream utilities needed later in the pipeline. We also bring in standard Python modules for time handling and JSON formatting. 
<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">MODE = \"stream\"\nWINDOW_SIZE_SECS = 60\nALLOWED_LATENESS_SECS = 120\n\n\ndef make_event(user_id, event_type, amount, event_time_epoch_s):\n   return {\"user_id\": user_id, \"event_type\": event_type, \"amount\": float(amount), \"event_time\": int(event_time_epoch_s)}\n\n\n# Align t0 to a minute boundary so every run maps events to the same 60-second windows.\nbase = datetime.now(timezone.utc).replace(second=0, microsecond=0)\nt0 = int(base.timestamp())\n\n\nBATCH_EVENTS = [\n   make_event(\"u1\", \"purchase\", 20, t0 + 5),\n   make_event(\"u1\", \"purchase\", 15, t0 + 20),\n   make_event(\"u2\", \"purchase\",  8, t0 + 35),\n   make_event(\"u1\", \"refund\",   -5, t0 + 62),\n   make_event(\"u2\", \"purchase\", 12, t0 + 70),\n   make_event(\"u3\", \"purchase\",  9, t0 + 75),\n   make_event(\"u2\", \"purchase\",  3, t0 + 50),\n]\n<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We define the global configuration that controls window size, lateness, and execution mode. We create synthetic events with explicit event-time timestamps, anchored to a minute boundary, so that windowing behavior is deterministic and easy to reason about. 
We prepare a small dataset whose final event (t0 + 50) is deliberately out of order, so the stream-like run can deliver it as late data and expose Beam\u2019s event-time semantics.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">def format_joined_record(kv):\n   user_id, d = kv\n   return {\n       \"user_id\": user_id,\n       \"count\": int(d[\"count\"][0]) if d[\"count\"] else 0,\n       \"sum_amount\": float(d[\"sum_amount\"][0]) if d[\"sum_amount\"] else 0.0,\n   }\n\n\nclass WindowedUserAgg(beam.PTransform):\n   def expand(self, pcoll):\n       stamped = pcoll | beam.Map(lambda e: beam.window.TimestampedValue(e, e[\"event_time\"]))\n       windowed = stamped | beam.WindowInto(\n           FixedWindows(WINDOW_SIZE_SECS),\n           allowed_lateness=ALLOWED_LATENESS_SECS,\n           trigger=AfterWatermark(\n               early=AfterProcessingTime(10),\n               late=AfterProcessingTime(10),\n           ),\n           accumulation_mode=AccumulationMode.ACCUMULATING,\n       )\n       keyed = windowed | beam.Map(lambda e: (e[\"user_id\"], e[\"amount\"]))\n       counts = keyed | beam.combiners.Count.PerKey()\n       sums = keyed | 
beam.CombinePerKey(sum)\n       return (\n           {\"count\": counts, \"sum_amount\": sums}\n           | beam.CoGroupByKey()\n           | beam.Map(format_joined_record)\n       )<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We build a reusable Beam PTransform that encapsulates all windowed aggregation logic. We apply fixed windows, triggers, and accumulation rules, then group events by user and compute counts and sums. We keep this transform independent of the data source, so the same logic applies to both batch and streaming inputs.<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">class AddWindowInfo(beam.DoFn):\n   def process(self, element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam):\n       ws = float(window.start)\n       we = float(window.end)\n       yield {\n           **element,\n           \"window_start_utc\": datetime.fromtimestamp(ws, tz=timezone.utc).strftime(\"%H:%M:%S\"),\n           \"window_end_utc\": datetime.fromtimestamp(we, tz=timezone.utc).strftime(\"%H:%M:%S\"),\n           \"pane_timing\": str(pane_info.timing),\n           \"pane_is_first\": pane_info.is_first,\n      
     \"pane_is_last\": pane_info.is_last,\n       }\n\n\ndef build_test_stream():\n   return (\n       TestStream()\n       .advance_watermark_to(t0)\n       .add_elements([\n           beam.window.TimestampedValue(make_event(\"u1\", \"purchase\", 20, t0 + 5), t0 + 5),\n           beam.window.TimestampedValue(make_event(\"u1\", \"purchase\", 15, t0 + 20), t0 + 20),\n           beam.window.TimestampedValue(make_event(\"u2\", \"purchase\", 8, t0 + 35), t0 + 35),\n       ])\n       .advance_processing_time(5)\n       .advance_watermark_to(t0 + 61)\n       .add_elements([\n           beam.window.TimestampedValue(make_event(\"u1\", \"refund\", -5, t0 + 62), t0 + 62),\n           beam.window.TimestampedValue(make_event(\"u2\", \"purchase\", 12, t0 + 70), t0 + 70),\n           beam.window.TimestampedValue(make_event(\"u3\", \"purchase\", 9, t0 + 75), t0 + 75),\n       ])\n       .advance_processing_time(5)\n       .add_elements([\n           beam.window.TimestampedValue(make_event(\"u2\", \"purchase\", 3, t0 + 50), t0 + 50),\n       ])\n       .advance_watermark_to(t0 + 121)\n       .advance_watermark_to_infinity()\n   )<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We enrich each aggregated record with window and pane metadata so we can clearly see when and why results are emitted. We convert Beam\u2019s internal timestamps into human-readable UTC times for clarity. We also define a TestStream that simulates real streaming behavior using watermarks, processing-time advances, and late data. 
<\/p>\n<div class=\"dm-code-snippet dark dm-normal-version default no-background-mobile\">\n<div class=\"control-language\">\n<div class=\"dm-buttons\">\n<div class=\"dm-buttons-left\">\n<div class=\"dm-button-snippet red-button\"><\/div>\n<div class=\"dm-button-snippet orange-button\"><\/div>\n<div class=\"dm-button-snippet green-button\"><\/div>\n<\/div>\n<div class=\"dm-buttons-right\"><a><span class=\"dm-copy-text\">Copy Code<\/span><span class=\"dm-copy-confirmed\">Copied<\/span><span class=\"dm-error-message\">Use a different Browser<\/span><\/a><\/div>\n<\/div>\n<pre class=\" no-line-numbers\"><code class=\" no-wrap language-php\">def run_batch():\n   with beam.Pipeline(options=PipelineOptions([])) as p:\n       (\n           p\n           | beam.Create(BATCH_EVENTS)\n           | WindowedUserAgg()\n           | beam.ParDo(AddWindowInfo())\n           | beam.Map(json.dumps)\n           | beam.Map(print)\n       )\n\n\ndef run_stream():\n   opts = PipelineOptions([])\n   opts.view_as(StandardOptions).streaming = True\n   with beam.Pipeline(options=opts) as p:\n       (\n           p\n           | build_test_stream()\n           | WindowedUserAgg()\n           | beam.ParDo(AddWindowInfo())\n           | beam.Map(json.dumps)\n           | beam.Map(print)\n       )\n\n\nrun_stream() if MODE == \"stream\" else run_batch()<\/code><\/pre>\n<\/div>\n<\/div>\n<p>We wire everything together into executable batch and stream-like pipelines. We toggle between modes by changing a single flag while reusing the same aggregation transform. 
We run the pipeline and print the windowed results directly, making the execution flow and outputs easy to inspect.<\/p>\n<p>In conclusion, we demonstrated that the same Beam pipeline can process both bounded batch data and unbounded, stream-like data while preserving identical windowing and aggregation semantics. We observed how watermarks, triggers, and accumulation modes influence when results are emitted and how late data updates previously computed windows. Also, we focused on the conceptual foundations of Beam\u2019s unified model, providing a solid base for later scaling the same design to real streaming runners and production environments.<\/p>\n<hr class=\"wp-block-separator has-alpha-channel-opacity\" \/>\n<p>The post <a href=\"https:\/\/www.marktechpost.com\/2026\/01\/07\/a-coding-implementation-to-build-a-unified-apache-beam-pipeline-demonstrating-batch-and-stream-processing-with-event-time-windowing-using-directrunner\/\">A Coding Implementation to Build a Unified Apache Beam Pipeline Demonstrating Batch and Stream Processing with Event-Time Windowing Using DirectRunner<\/a> appeared first on <a href=\"https:\/\/www.marktechpost.com\/\">MarkTechPost<\/a>.<\/p>","protected":false},"excerpt":{"rendered":"<p>In this tutorial, we 
demonstra&hellip;<\/p>\n","protected":false},"author":1,"featured_media":29,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-243","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-uncategorized"],"_links":{"self":[{"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/posts\/243","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=243"}],"version-history":[{"count":0,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/posts\/243\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=\/wp\/v2\/media\/29"}],"wp:attachment":[{"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=243"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=243"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/connectword.dpdns.org\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=243"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}