diff --git a/src/datajoint/__init__.py b/src/datajoint/__init__.py index 4970b19d4..5ec72afdb 100644 --- a/src/datajoint/__init__.py +++ b/src/datajoint/__init__.py @@ -51,6 +51,8 @@ "get_codec", "ObjectRef", "NpyRef", + # SparkAdapter Codec Protocol + "SparkAdapter", # Storage Adapter API "StorageAdapter", "get_storage_adapter", @@ -85,6 +87,7 @@ from .instance import Instance, _ConfigProxy, _get_singleton_connection, _global_config, _check_thread_safe from .logging import logger from .objectref import ObjectRef +from .spark import SparkAdapter from .storage_adapter import StorageAdapter, get_storage_adapter from .schemas import _Schema, VirtualModule, list_schemas, virtual_schema from .autopopulate import AutoPopulate diff --git a/src/datajoint/spark.py b/src/datajoint/spark.py new file mode 100644 index 000000000..29397b64f --- /dev/null +++ b/src/datajoint/spark.py @@ -0,0 +1,92 @@ +""" +SparkAdapter Codec Protocol. + +Opt-in contract for codecs that adapt their decoded values to Spark-native +types — primitives, lists, dicts, and nested combinations. + +Codecs implement this method when they want their column eligible for +downstream typed-query systems (Spark SQL, Delta Sharing, BI tools). +Generic codecs like ``<blob>`` and ``<hash>`` deliberately do not +implement it: their decoded values can be arbitrary Python objects with +no fixed Spark-native shape. + +The contract is intentionally a Protocol rather than an abstract method +on :class:`datajoint.Codec`: + +- Generic codecs need no acknowledgement (no ``NotImplementedError`` stubs). +- Existing plugin codecs continue to work unchanged. +- Codec authors opt in by adding the method on their own release cadence. +- Consumers detect support structurally via ``isinstance(codec, SparkAdapter)``. + +See ``datajoint-docs/src/reference/specs/spark-adapter.md`` for the +normative specification (signature, return-value shape constraints, +worked codec examples).
+""" + +from __future__ import annotations + +from typing import Any, Protocol, runtime_checkable + + +@runtime_checkable +class SparkAdapter(Protocol): + """ + A codec that adapts its decoded values to Spark-native types. + + Opt-in. Codecs implementing this method declare that their decoded + values can be expressed as primitives, lists, or dicts of the same — + i.e., shapes that map cleanly to Spark's ``StructType`` / + ``ArrayType`` / ``MapType``. + + Consumers (e.g., a Databricks silver-layer publish pipeline) check + ``isinstance(codec, SparkAdapter)`` per column to determine eligibility. + + Allowed return-value shapes: + + - Primitives: ``bool``, ``int``, ``float``, ``str``, ``bytes``, + ``None``, ``datetime.date``, ``datetime.datetime``. + - ``list[T]`` where ``T`` is any allowed shape (→ Spark ``ArrayType``). + - ``dict[str, T]`` where ``T`` is any allowed shape (→ Spark + ``StructType`` or ``MapType``, consumer-decided). + + NumPy arrays must be converted to lists; no tuples, sets, or custom + objects in the return value. + + Examples + -------- + A 1D float-array codec (shipped as a plugin, not in datajoint-python):: + + class FloatArrayCodec(dj.Codec): + name = "float_array" + + def encode(self, value, *, key=None, store_name=None): ... + def decode(self, stored, *, key=None) -> np.ndarray: ... + + def to_spark(self, decoded: np.ndarray, *, key=None) -> list[float]: + return decoded.tolist() # → Spark ARRAY + + Eligibility check:: + + from datajoint import SparkAdapter + isinstance(FloatArrayCodec(), SparkAdapter) # True + """ + + def to_spark(self, decoded: Any, *, key: dict | None = None) -> Any: + """ + Adapt a decoded codec value to a Spark-native shape. + + Parameters + ---------- + decoded : Any + The Python value produced by the codec's ``decode()``. + key : dict, optional + Optional context dict — same shape as ``Codec.encode``'s + ``key`` parameter. Most codecs ignore it. 
+ + Returns + ------- + Any + A value composed entirely of allowed Spark-native shapes + (see class docstring). + """ + ... diff --git a/tests/unit/test_spark.py b/tests/unit/test_spark.py new file mode 100644 index 000000000..854d554a6 --- /dev/null +++ b/tests/unit/test_spark.py @@ -0,0 +1,105 @@ +""" +Unit tests for the SparkAdapter Codec Protocol (#1458). + +The Protocol is a structural-typing contract — codecs opt in by +implementing ``to_spark`` and consumers detect support via +``isinstance(codec, SparkAdapter)``. These tests cover the detection +behavior, not specific rendering implementations (which live downstream). +""" + +from __future__ import annotations + +import datajoint as dj +from datajoint.spark import SparkAdapter + + +class _SparkAdapterCodec: + """A minimal codec-like object that opts into the protocol.""" + + name = "fake_spark_adapter" + + def to_spark(self, decoded, *, key=None): + return list(decoded) if hasattr(decoded, "__iter__") else decoded + + +class _OpaqueCodec: + """A minimal codec-like object that does NOT opt into the protocol.""" + + name = "fake_opaque" + + def encode(self, value, *, key=None, store_name=None): + return bytes(value) + + def decode(self, stored, *, key=None): + return stored + + +def test_protocol_detects_opt_in(): + """A class implementing ``to_spark`` is detected as a SparkAdapter.""" + assert isinstance(_SparkAdapterCodec(), SparkAdapter) + + +def test_protocol_rejects_non_opt_in(): + """A class without ``to_spark`` is not detected as a SparkAdapter.""" + assert not isinstance(_OpaqueCodec(), SparkAdapter) + + +def test_protocol_exported_at_top_level(): + """``dj.SparkAdapter`` is accessible at the top level.""" + assert dj.SparkAdapter is SparkAdapter + + +def test_protocol_is_runtime_checkable(): + """The Protocol is decorated with @runtime_checkable (the test fixtures + above rely on this).""" + # Direct assertion: classes lacking runtime_checkable would raise TypeError + # on isinstance(). 
The previous tests would error rather than fail. + try: + isinstance(object(), SparkAdapter) + except TypeError: + raise AssertionError("SparkAdapter must be @runtime_checkable") + + +def test_blob_codec_is_not_spark_adapter(): + """The built-in ``<blob>`` codec is intentionally non-adapting per the spec.""" + from datajoint.builtin_codecs.blob import BlobCodec + + assert not isinstance(BlobCodec(), SparkAdapter) + + +def test_hash_codec_is_not_spark_adapter(): + """The built-in ``<hash>`` codec is intentionally non-adapting per the spec.""" + from datajoint.builtin_codecs.hash import HashCodec + + assert not isinstance(HashCodec(), SparkAdapter) + + +def test_to_spark_invocation_passes_through(): + """A codec implementing the method can be invoked and returns its result.""" + codec = _SparkAdapterCodec() + assert codec.to_spark([1, 2, 3]) == [1, 2, 3] + assert codec.to_spark(42) == 42 + + +def test_to_spark_method_accepts_key_kwarg(): + """The method signature accepts the optional ``key`` keyword argument.""" + codec = _SparkAdapterCodec() + # Should not raise + codec.to_spark([1, 2, 3], key={"some_pk": 1}) + + +def test_subclass_adding_to_spark_becomes_adapter(): + """A subclass of an opaque codec that adds the method becomes a SparkAdapter.""" + + class _OpaqueBase: + name = "base" + + def encode(self, value, *, key=None, store_name=None): + return b"" + + class _TypedSubclass(_OpaqueBase): + def to_spark(self, decoded, *, key=None): + return decoded + + assert not isinstance(_OpaqueBase(), SparkAdapter) + assert isinstance(_TypedSubclass(), SparkAdapter)