diff --git a/packages/pynumaflow/pynumaflow/accumulator/_dtypes.py b/packages/pynumaflow/pynumaflow/accumulator/_dtypes.py index c6899529..e829ecdc 100644 --- a/packages/pynumaflow/pynumaflow/accumulator/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/accumulator/_dtypes.py @@ -1,6 +1,6 @@ from abc import ABCMeta, abstractmethod from asyncio import Task -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from enum import IntEnum from typing import TypeAlias, TypeVar @@ -223,25 +223,19 @@ def keys(self) -> list[str]: return self._keys -@dataclass +@dataclass(slots=True) class AccumulatorResult: """Defines the object to hold the result of accumulator computation.""" - __slots__ = ( - "_future", - "_iterator", - "_key", - "_result_queue", - "_consumer_future", - "_latest_watermark", - ) - _future: Task _iterator: NonBlockingIterator _key: list[str] _result_queue: NonBlockingIterator _consumer_future: Task _latest_watermark: datetime + # The CLOSE request's keyed window is only known when the task is closed, so it is set + # later (via the close_window setter) rather than passed to the constructor. + _close_window: KeyedWindow | None = field(default=None, init=False) @property def future(self) -> Task: @@ -310,6 +304,30 @@ def update_watermark(self, new_watermark: datetime): raise TypeError("new_watermark must be a datetime object") self._latest_watermark = new_watermark + @property + def close_window(self) -> KeyedWindow | None: + """Returns the keyed window from the CLOSE request, if the task was closed. + + Returns: + KeyedWindow | None: The window carried by the CLOSE request, echoed back in + the EOF response; None if the task has not received a CLOSE. + """ + return self._close_window + + @close_window.setter + def close_window(self, window: KeyedWindow): + """Stashes the CLOSE request's keyed window so the EOF response can echo it. + + Args: + window (KeyedWindow): The keyed window from the CLOSE request. + + Raises: + TypeError: If window is not a KeyedWindow object. + """ + if not isinstance(window, KeyedWindow): + raise TypeError("window must be a KeyedWindow object") + self._close_window = window + @dataclass class AccumulatorRequest: diff --git a/packages/pynumaflow/pynumaflow/accumulator/servicer/async_servicer.py b/packages/pynumaflow/pynumaflow/accumulator/servicer/async_servicer.py index 886f0e66..0668a938 100644 --- a/packages/pynumaflow/pynumaflow/accumulator/servicer/async_servicer.py +++ b/packages/pynumaflow/pynumaflow/accumulator/servicer/async_servicer.py @@ -1,5 +1,6 @@ import asyncio from collections.abc import AsyncIterable +from datetime import datetime from google.protobuf import empty_pb2 as _empty_pb2 @@ -16,6 +17,25 @@ from pynumaflow.shared.server import update_context_err from pynumaflow.types import NumaflowServicerContext +# protobuf Timestamp.ToDatetime() is only valid for years 1..9999, i.e. seconds in +# [-62135596800, 253402300799]. Accumulator windows are global per key, so core sends the +# window end as an "infinite" sentinel (chrono DateTime::::MAX_UTC, ~year 262137) whose +# seconds exceed that range and make ToDatetime() raise. Clamp out-of-range timestamps to the +# representable datetime bounds instead of crashing the UDF. +_TIMESTAMP_MAX_SECONDS = 253402300799 +_TIMESTAMP_MIN_SECONDS = -62135596800 + + +def _to_datetime(ts) -> datetime: + """Convert a protobuf Timestamp to a datetime, clamping values outside the representable + range (year 1..9999) to datetime.max / datetime.min so an infinite window bound does not + crash decoding.""" + if ts.seconds > _TIMESTAMP_MAX_SECONDS: + return datetime.max + if ts.seconds < _TIMESTAMP_MIN_SECONDS: + return datetime.min + return ts.ToDatetime() + async def datum_generator( request_iterator: AsyncIterable[accumulator_pb2.AccumulatorRequest], @@ -24,8 +44,8 @@ async def datum_generator( async for d in request_iterator: # Convert protobuf KeyedWindow to our KeyedWindow dataclass keyed_window = KeyedWindow( - start=d.operation.keyedWindow.start.ToDatetime(), - end=d.operation.keyedWindow.end.ToDatetime(), + start=_to_datetime(d.operation.keyedWindow.start), + end=_to_datetime(d.operation.keyedWindow.end), slot=d.operation.keyedWindow.slot, keys=list(d.operation.keyedWindow.keys), ) diff --git a/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py b/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py index ee14e3ea..5acd8433 100644 --- a/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py +++ b/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py @@ -1,6 +1,6 @@ import asyncio from collections.abc import AsyncIterable -from datetime import datetime +from datetime import datetime, timezone from google.protobuf import timestamp_pb2 from pynumaflow._constants import ( @@ -112,6 +112,10 @@ async def close_task(self, req: AccumulatorRequest): curr_task = self.tasks.get(unified_key, None) if curr_task: + # Stash the CLOSE request's keyed window BEFORE signalling EOF so the task's + # consumer (write_to_global_queue) can echo it back in the EOF response. Core + # uses the echoed window to identify and garbage-collect the closed window. + curr_task.close_window = req.keyed_window await self.tasks[unified_key].iterator.put(STREAM_EOF) await curr_task.future await curr_task.consumer_future @@ -318,7 +322,7 @@ async def write_to_global_queue( watermark_pb.FromDatetime(msg.watermark) start_dt_pb = timestamp_pb2.Timestamp() - start_dt_pb.FromDatetime(datetime.fromtimestamp(0)) + start_dt_pb.FromDatetime(datetime.fromtimestamp(0, tz=timezone.utc)) end_dt_pb = timestamp_pb2.Timestamp() end_dt_pb.FromDatetime(wm) @@ -339,17 +343,39 @@ async def write_to_global_queue( tags=msg.tags, ) await output_queue.put(res) - # send EOF - start_eof_pb = timestamp_pb2.Timestamp() - start_eof_pb.FromDatetime(datetime.fromtimestamp(0)) + # Send EOF. Echo the CLOSE request's keyed window (start/end/slot/keys) so core can + # match the EOF to the window it is closing and garbage-collect it. Mirrors the + # numaflow-rs accumulator behavior (PR #177). + close_window = task.close_window + if close_window is not None: + start_eof_pb = timestamp_pb2.Timestamp() + start_eof_pb.FromDatetime(close_window.start) + + end_eof_pb = timestamp_pb2.Timestamp() + end_eof_pb.FromDatetime(close_window.end) + + eof_window = accumulator_pb2.KeyedWindow( + start=start_eof_pb, + end=end_eof_pb, + slot=close_window.slot, + keys=close_window.keys, + ) + else: + # Fallback for the stream-close/shutdown path (no CLOSE request, e.g. + # stream_send_eof on SIGTERM): synthesize the window from epoch(0) and the + # latest watermark, preserving prior behavior. + start_eof_pb = timestamp_pb2.Timestamp() + start_eof_pb.FromDatetime(datetime.fromtimestamp(0, tz=timezone.utc)) - end_eof_pb = timestamp_pb2.Timestamp() - end_eof_pb.FromDatetime(wm) + end_eof_pb = timestamp_pb2.Timestamp() + end_eof_pb.FromDatetime(wm) - res = accumulator_pb2.AccumulatorResponse( - window=accumulator_pb2.KeyedWindow( + eof_window = accumulator_pb2.KeyedWindow( start=start_eof_pb, end=end_eof_pb, slot="slot-0", keys=task.keys - ), + ) + + res = accumulator_pb2.AccumulatorResponse( + window=eof_window, EOF=True, ) await output_queue.put(res) diff --git a/packages/pynumaflow/tests/accumulator/test_async_accumulator.py b/packages/pynumaflow/tests/accumulator/test_async_accumulator.py index eb499cdf..20e57894 100644 --- a/packages/pynumaflow/tests/accumulator/test_async_accumulator.py +++ b/packages/pynumaflow/tests/accumulator/test_async_accumulator.py @@ -4,6 +4,7 @@ import grpc import pytest from google.protobuf import empty_pb2 as _empty_pb2 +from google.protobuf import timestamp_pb2 from pynumaflow import setup_logging from pynumaflow.accumulator import ( @@ -87,6 +88,69 @@ def request_generator_mixed(count, request, resetkey: bool = False): yield request +# Distinct, recognizable close-window values used to prove the EOF echoes the +# CLOSE request's window rather than the synthesized fallback window. +CLOSE_WINDOW_START_SECONDS = 1000000000 +CLOSE_WINDOW_END_SECONDS = 2000000000 +CLOSE_WINDOW_SLOT = "slot-7" + +# The accumulator's global window carries an "infinite" end (chrono MAX_UTC, ~year 262137) +# whose seconds exceed Python datetime's range. Core sends this on OPEN/APPEND. +GLOBAL_WINDOW_END_SECONDS = 8210266876799 + + +def request_generator_custom_close(count, request): + """Yields OPEN + APPEND requests, then a CLOSE whose keyed window carries + distinct start/end/slot values (mirroring core sending a real + max_event_time + timeout window on close).""" + for i in range(count): + if i == 0: + request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.OPEN + else: + request.operation.event = ( + accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND + ) + yield request + + # CLOSE carrying a distinct keyed window to be echoed back in the EOF response. + request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.CLOSE + request.operation.keyedWindow.start.CopyFrom( + timestamp_pb2.Timestamp(seconds=CLOSE_WINDOW_START_SECONDS) + ) + request.operation.keyedWindow.end.CopyFrom( + timestamp_pb2.Timestamp(seconds=CLOSE_WINDOW_END_SECONDS) + ) + request.operation.keyedWindow.slot = CLOSE_WINDOW_SLOT + yield request + + +def request_generator_infinite_then_close(count, request): + """OPEN + APPEND requests whose window end is the global 'infinite' sentinel + (chrono MAX_UTC, out of Python datetime range), then a CLOSE carrying a concrete, + representable window. Mirrors what real core sends to an accumulator.""" + request.operation.keyedWindow.end.CopyFrom( + timestamp_pb2.Timestamp(seconds=GLOBAL_WINDOW_END_SECONDS) + ) + for i in range(count): + if i == 0: + request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.OPEN + else: + request.operation.event = ( + accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND + ) + yield request + + request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.CLOSE + request.operation.keyedWindow.start.CopyFrom( + timestamp_pb2.Timestamp(seconds=CLOSE_WINDOW_START_SECONDS) + ) + request.operation.keyedWindow.end.CopyFrom( + timestamp_pb2.Timestamp(seconds=CLOSE_WINDOW_END_SECONDS) + ) + request.operation.keyedWindow.slot = CLOSE_WINDOW_SLOT + yield request + + def start_request() -> accumulator_pb2.AccumulatorRequest: event_time_timestamp, watermark_timestamp = get_time_args() window = accumulator_pb2.KeyedWindow( @@ -289,6 +353,68 @@ def test_accumulate_with_close(accumulator_stub) -> None: assert 1 == eof_count +def test_accumulate_close_echoes_eof_window(accumulator_stub) -> None: + """The EOF response must echo the exact KeyedWindow from the CLOSE request.""" + request = start_request() + generator_response = accumulator_stub.AccumulateFn( + request_iterator=request_generator_custom_close(count=5, request=request) + ) + + eof_count = 0 + for r in generator_response: + if r.EOF: + eof_count += 1 + assert r.window.start.seconds == CLOSE_WINDOW_START_SECONDS + assert r.window.end.seconds == CLOSE_WINDOW_END_SECONDS + assert r.window.slot == CLOSE_WINDOW_SLOT + assert list(r.window.keys) == ["test_key"] + + assert 1 == eof_count + + +def test_accumulate_infinite_window_end_does_not_crash(accumulator_stub) -> None: + """A global window with an 'infinite' end (out of Python datetime range) on OPEN/APPEND + must not crash decoding; the stream completes and the EOF echoes the CLOSE window.""" + request = start_request() + generator_response = accumulator_stub.AccumulateFn( + request_iterator=request_generator_infinite_then_close(count=5, request=request) + ) + + count = 0 + eof_count = 0 + for r in generator_response: + if r.EOF: + eof_count += 1 + assert r.window.start.seconds == CLOSE_WINDOW_START_SECONDS + assert r.window.end.seconds == CLOSE_WINDOW_END_SECONDS + assert r.window.slot == CLOSE_WINDOW_SLOT + elif r.payload.value: + count += 1 + + # All 5 datums were processed and exactly one EOF was emitted (no crash). + assert 5 == count + assert 1 == eof_count + + +def test_accumulate_eof_window_fallback_without_close(accumulator_stub) -> None: + """When the stream closes without a CLOSE (e.g. shutdown), the EOF window falls + back to the synthesized window (start=epoch(0), slot='slot-0').""" + request = start_request() + generator_response = accumulator_stub.AccumulateFn( + request_iterator=request_generator(count=5, request=request, send_close=False) + ) + + eof_count = 0 + for r in generator_response: + if r.EOF: + eof_count += 1 + assert r.window.start.seconds == 0 + assert r.window.slot == "slot-0" + assert list(r.window.keys) == ["test_key"] + + assert 1 == eof_count + + def test_accumulate_append_without_open(accumulator_stub) -> None: request = start_request_without_open() generator_response = None diff --git a/packages/pynumaflow/tests/accumulator/test_datatypes.py b/packages/pynumaflow/tests/accumulator/test_datatypes.py index 0f829024..ababcf3c 100644 --- a/packages/pynumaflow/tests/accumulator/test_datatypes.py +++ b/packages/pynumaflow/tests/accumulator/test_datatypes.py @@ -176,6 +176,30 @@ def test_accumulator_result_update_watermark_invalid_type(): result.update_watermark("not a datetime") +def test_accumulator_result_close_window_setter(): + result = AccumulatorResult( + None, None, [], None, None, datetime.fromtimestamp(1662998400, timezone.utc) + ) + # Not set until a CLOSE arrives. + assert result.close_window is None + window = KeyedWindow( + start=datetime.fromtimestamp(1662998400, timezone.utc), + end=datetime.fromtimestamp(1662998460, timezone.utc), + slot="slot-0", + keys=["key1"], + ) + result.close_window = window + assert result.close_window is window + + +def test_accumulator_result_close_window_setter_invalid_type(): + result = AccumulatorResult( + None, None, [], None, None, datetime.fromtimestamp(1662998400, timezone.utc) + ) + with pytest.raises(TypeError): + result.close_window = "not a keyed window" + + # --- TestAccumulatorRequest ---