From b46baabde9b01481a4f524ee5a27f39b30ba1a16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dawid=20M=C4=99drek?= Date: Fri, 26 Jun 2026 15:51:15 +0200 Subject: [PATCH 1/5] Negotiate the TABLETS_ROUTING_V2 protocol extension Add per-connection negotiation of the TABLETS_ROUTING_V2 extension, the successor to TABLETS_ROUTING_V1. When the server advertises it in the SUPPORTED response, the driver echoes it back during STARTUP to opt in; a driver that negotiates v2 does not negotiate v1. While the feature is experimental the wire name carries the `_EXPERIMENTAL` suffix (TABLETS_ROUTING_V2_EXPERIMENTAL), and the server only advertises it when started with the `strongly-consistent-tables` experimental feature enabled. Also add the optional trailing tablet_version_block byte to the EXECUTE message body, written only when set, so later commits can carry the cached tablet version to the server. --- cassandra/protocol.py | 6 +++- cassandra/protocol_features.py | 22 ++++++++++++-- tests/unit/test_protocol_features.py | 44 +++++++++++++++++++++++++++- 3 files changed, 67 insertions(+), 5 deletions(-) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index 4628c7ee0e..6826374525 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -629,9 +629,11 @@ class ExecuteMessage(_QueryMessage): def __init__(self, query_id, query_params, consistency_level, serial_consistency_level=None, fetch_size=None, paging_state=None, timestamp=None, skip_meta=False, - continuous_paging_options=None, result_metadata_id=None): + continuous_paging_options=None, result_metadata_id=None, + tablet_version_block=None): self.query_id = query_id self.result_metadata_id = result_metadata_id + self.tablet_version_block = tablet_version_block super(ExecuteMessage, self).__init__(query_params, consistency_level, serial_consistency_level, fetch_size, paging_state, timestamp, skip_meta, continuous_paging_options) @@ -643,6 +645,8 @@ def send_body(self, f, protocol_version): if ProtocolVersion.uses_prepared_metadata(protocol_version): write_string(f, self.result_metadata_id) self._write_query_params(f, protocol_version) + if self.tablet_version_block is not None: + write_byte(f, self.tablet_version_block) CUSTOM_TYPE = object() diff --git a/cassandra/protocol_features.py b/cassandra/protocol_features.py index 877998be7d..af954f05e4 100644 --- a/cassandra/protocol_features.py +++ b/cassandra/protocol_features.py @@ -10,19 +10,26 @@ LWT_OPTIMIZATION_META_BIT_MASK = "LWT_OPTIMIZATION_META_BIT_MASK" RATE_LIMIT_ERROR_EXTENSION = "SCYLLA_RATE_LIMIT_ERROR" TABLETS_ROUTING_V1 = "TABLETS_ROUTING_V1" +# The server advertises and expects this exact extension name in SUPPORTED/STARTUP +# (see scylladb transport/cql_protocol_extension.cc). While the feature is gated +# behind the server's `strongly-consistent-tables` experimental flag, the wire +# name carries the `_EXPERIMENTAL` suffix. +TABLETS_ROUTING_V2 = "TABLETS_ROUTING_V2_EXPERIMENTAL" class ProtocolFeatures(object): rate_limit_error = None shard_id = 0 sharding_info = None tablets_routing_v1 = False + tablets_routing_v2 = False lwt_info = None - def __init__(self, rate_limit_error=None, shard_id=0, sharding_info=None, tablets_routing_v1=False, lwt_info=None): + def __init__(self, rate_limit_error=None, shard_id=0, sharding_info=None, tablets_routing_v1=False, tablets_routing_v2=False, lwt_info=None): self.rate_limit_error = rate_limit_error self.shard_id = shard_id self.sharding_info = sharding_info self.tablets_routing_v1 = tablets_routing_v1 + self.tablets_routing_v2 = tablets_routing_v2 self.lwt_info = lwt_info @staticmethod @@ -30,8 +37,9 @@ def parse_from_supported(supported): rate_limit_error = ProtocolFeatures.maybe_parse_rate_limit_error(supported) shard_id, sharding_info = ProtocolFeatures.parse_sharding_info(supported) tablets_routing_v1 = ProtocolFeatures.parse_tablets_info(supported) + tablets_routing_v2 = ProtocolFeatures.parse_tablets_v2_info(supported) lwt_info = ProtocolFeatures.parse_lwt_info(supported) - return ProtocolFeatures(rate_limit_error, shard_id, sharding_info, tablets_routing_v1, lwt_info) + return ProtocolFeatures(rate_limit_error, shard_id, sharding_info, tablets_routing_v1, tablets_routing_v2, lwt_info) @staticmethod def maybe_parse_rate_limit_error(supported): @@ -53,7 +61,11 @@ def get_cql_extension_field(vals, key): def add_startup_options(self, options): if self.rate_limit_error is not None: options[RATE_LIMIT_ERROR_EXTENSION] = "" - if self.tablets_routing_v1: + # Only one of TABLETS_ROUTING_V{1,2} should be negotiated + # per connection. Hence the if-else branch. + if self.tablets_routing_v2: + options[TABLETS_ROUTING_V2] = "" + elif self.tablets_routing_v1: options[TABLETS_ROUTING_V1] = "" if self.lwt_info is not None: options[LWT_ADD_METADATA_MARK] = str(self.lwt_info.lwt_meta_bit_mask) @@ -81,6 +93,10 @@ def parse_sharding_info(options): def parse_tablets_info(options): return TABLETS_ROUTING_V1 in options + @staticmethod + def parse_tablets_v2_info(options): + return TABLETS_ROUTING_V2 in options + @staticmethod def parse_lwt_info(options): value_list = options.get(LWT_ADD_METADATA_MARK, [None]) diff --git a/tests/unit/test_protocol_features.py b/tests/unit/test_protocol_features.py index 895c384f7e..f754a677fe 100644 --- a/tests/unit/test_protocol_features.py +++ b/tests/unit/test_protocol_features.py @@ -2,7 +2,7 @@ import logging -from cassandra.protocol_features import ProtocolFeatures +from cassandra.protocol_features import ProtocolFeatures, TABLETS_ROUTING_V1, TABLETS_ROUTING_V2 LOGGER = logging.getLogger(__name__) @@ -22,3 +22,45 @@ class OptionsHolder(object): assert protocol_features.rate_limit_error == 123 assert protocol_features.shard_id == 0 assert protocol_features.sharding_info is None + + def test_tablets_routing_v2_negotiation(self): + """V2 is detected from SUPPORTED and subsumes V1 in STARTUP options.""" + options = { + TABLETS_ROUTING_V1: [''], + TABLETS_ROUTING_V2: [''], + } + features = ProtocolFeatures.parse_from_supported(options) + assert features.tablets_routing_v1 is True + assert features.tablets_routing_v2 is True + + # V2 subsumes V1: only TABLETS_ROUTING_V2 should appear in startup. + startup = {} + features.add_startup_options(startup) + assert TABLETS_ROUTING_V2 in startup + assert TABLETS_ROUTING_V1 not in startup + + def test_tablets_routing_v1_only(self): + """When server only advertises V1, only V1 is negotiated.""" + options = { + TABLETS_ROUTING_V1: [''], + } + features = ProtocolFeatures.parse_from_supported(options) + assert features.tablets_routing_v1 is True + assert features.tablets_routing_v2 is False + + startup = {} + features.add_startup_options(startup) + assert TABLETS_ROUTING_V1 in startup + assert TABLETS_ROUTING_V2 not in startup + + def test_no_tablets_routing(self): + """When server advertises neither V1 nor V2.""" + options = {} + features = ProtocolFeatures.parse_from_supported(options) + assert features.tablets_routing_v1 is False + assert features.tablets_routing_v2 is False + + startup = {} + features.add_startup_options(startup) + assert TABLETS_ROUTING_V1 not in startup + assert TABLETS_ROUTING_V2 not in startup From a40979f0bb7c0065fcf41e4e8f6014e6a74dd3a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dawid=20M=C4=99drek?= Date: Fri, 26 Jun 2026 15:52:26 +0200 Subject: [PATCH 2/5] Track tablet_version and encode the tablet_version_block Store the server-provided 64-bit tablet_version on each cached Tablet (None until learned) and add helpers to encode it into the one-byte tablet_version_block exchanged on the wire: * choose_tablet_version_block() packs a randomly chosen block index in the high nibble and that block's value in the low nibble, matching the server's locator::compare_tablet_version_block layout. Blocks are indexed from the least significant bits, so block i covers bits [i*4, i*4 + 4) of the version. A random index avoids any shared mutable counter on the hot path while still probing every nibble often enough to detect a server-side version change quickly. * random_tablet_version_block() returns a random byte for cold start, when no version is cached yet. --- cassandra/tablets.py | 45 ++++++++++++++++++++++++--- tests/unit/test_tablets.py | 62 +++++++++++++++++++++++++++++++++++++- 2 files changed, 101 insertions(+), 6 deletions(-) diff --git a/cassandra/tablets.py b/cassandra/tablets.py index 96e61a50c2..a3ecee5160 100644 --- a/cassandra/tablets.py +++ b/cassandra/tablets.py @@ -1,5 +1,6 @@ from bisect import bisect_left from operator import attrgetter +from random import getrandbits from threading import Lock from typing import Optional from uuid import UUID @@ -9,6 +10,38 @@ _get_last_token = attrgetter("last_token") +def choose_tablet_version_block(tablet_version): + """ + Encode a tablet_version_block byte from a cached tablet_version. + Picks a block index at random across calls. + Returns an int in [0, 255]. + + The byte layout matches the server (see locator::compare_tablet_version_block): + the high nibble is the block index, the low nibble is the value of that block. + Blocks are indexed from the least significant bits to the most significant ones, + so block `idx` occupies bits [idx*4, idx*4 + 4). + + The index is chosen randomly rather than round-robin on purpose: this runs on + the hot path of every V2 request, so we avoid any shared mutable counter (which + would be a cross-core contention point) and only need each of the 16 nibbles to + be probed often enough that a server-side version change is detected quickly. + """ + # Pick the block index in [0, 15]; getrandbits(4) is a fast C call with no + # application-level shared state. + idx = getrandbits(4) + # Extract the 4-bit nibble at block index `idx` (0 = least significant). + shift = idx * 4 + nibble = (tablet_version >> shift) & 0xF + return (idx << 4) | nibble + + +def random_tablet_version_block(): + """ + Generate a random tablet_version_block byte for cold start. + """ + return getrandbits(8) + + class Tablet(object): """ Represents a single ScyllaDB tablet. @@ -18,15 +51,17 @@ class Tablet(object): first_token = 0 last_token = 0 replicas = None + tablet_version = None # uint64 hash; None means unknown (cold start) - def __init__(self, first_token=0, last_token=0, replicas=None): + def __init__(self, first_token=0, last_token=0, replicas=None, tablet_version=None): self.first_token = first_token self.last_token = last_token self.replicas = replicas + self.tablet_version = tablet_version def __str__(self): - return "" \ - % (self.first_token, self.last_token, self.replicas) + return "" \ + % (self.first_token, self.last_token, self.replicas, self.tablet_version) __repr__ = __str__ @staticmethod @@ -34,9 +69,9 @@ def _is_valid_tablet(replicas): return replicas is not None and len(replicas) != 0 @staticmethod - def from_row(first_token, last_token, replicas): + def from_row(first_token, last_token, replicas, tablet_version=None): if Tablet._is_valid_tablet(replicas): - tablet = Tablet(first_token, last_token, replicas) + tablet = Tablet(first_token, last_token, replicas, tablet_version) return tablet return None diff --git a/tests/unit/test_tablets.py b/tests/unit/test_tablets.py index 7a40e7de4d..7e45f35039 100644 --- a/tests/unit/test_tablets.py +++ b/tests/unit/test_tablets.py @@ -1,6 +1,6 @@ import unittest -from cassandra.tablets import Tablets, Tablet +from cassandra.tablets import Tablets, Tablet, choose_tablet_version_block, random_tablet_version_block class TabletsTest(unittest.TestCase): def compare_ranges(self, tablets, ranges): @@ -124,3 +124,63 @@ def __init__(self, v): # Token value 50 is not > first_token (100) of the tablet whose # last_token (200) is >= 50, so no match. self.assertIsNone(tablets.get_tablet_for_key("ks", "tb", Token(50))) + + +class TabletVersionBlockTest(unittest.TestCase): + """Tests for tablet_version_block encoding used by TABLETS_ROUTING_V2.""" + + def _server_block_matches(self, version, block): + """Reimplements the server's locator::compare_tablet_version_block.""" + block_value = block & 0x0F + block_index = (block & 0xF0) >> 4 + hash_block = (version >> (block_index * 4)) & 0x0F + return hash_block == block_value + + def test_choose_tablet_version_block_matches_server(self): + """Every block produced by the driver must match the server's check.""" + version = 0x0123456789ABCDEF + # The index is chosen randomly; sample enough times to exercise many indices. + for _ in range(256): + block = choose_tablet_version_block(version) + self.assertTrue(self._server_block_matches(version, block), + f"Block 0x{block:02X} did not match server check for version 0x{version:016X}") + + def test_choose_tablet_version_block_index_in_range_and_value_correct(self): + """The block index must be in [0, 15] and its value nibble must equal the + corresponding nibble of the version (regardless of which index is picked).""" + version = 0x0123456789ABCDEF + for _ in range(256): + block = choose_tablet_version_block(version) + idx = (block >> 4) & 0xF + value = block & 0xF + self.assertTrue(0 <= idx <= 15) + self.assertEqual(value, (version >> (idx * 4)) & 0xF) + + def test_choose_tablet_version_block_covers_all_indices(self): + """Over many calls the random index selection should probe every block + index, so that any server-side version change is eventually detected.""" + version = 0xFFFFFFFFFFFFFFFF # All nibbles are 0xF + seen_indices = set() + # 16 indices; 1000 draws makes a missing index astronomically unlikely. + for _ in range(1000): + block = choose_tablet_version_block(version) + seen_indices.add((block >> 4) & 0xF) + self.assertEqual(seen_indices, set(range(16))) + + def test_random_tablet_version_block_returns_byte(self): + """Verify random_tablet_version_block returns a value in [0, 255].""" + for _ in range(100): + block = random_tablet_version_block() + self.assertIsInstance(block, int) + self.assertGreaterEqual(block, 0) + self.assertLessEqual(block, 255) + + def test_from_row_stores_tablet_version(self): + """Tablet.from_row stores the tablet_version it is given (the V2 payload field).""" + version = 0xDEADBEEFCAFEBABE + tablet = Tablet.from_row(-100, 100, [("host1", 0), ("host2", 1)], tablet_version=version) + self.assertIsNotNone(tablet) + self.assertEqual(tablet.tablet_version, version) + self.assertEqual(tablet.first_token, -100) + self.assertEqual(tablet.last_token, 100) + self.assertEqual(len(tablet.replicas), 2) From 5d2d53824ea0696598280afc3b275f346568de2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dawid=20M=C4=99drek?= Date: Fri, 26 Jun 2026 15:52:26 +0200 Subject: [PATCH 3/5] Expose a strongly_consistent flag on keyspace metadata Add KeyspaceMetadata.strongly_consistent, derived from the per-keyspace `consistency` option in system_schema.scylla_keyspaces. A keyspace is strongly consistent when its consistency is `local` or `global` (null/`eventual` means eventually consistent). The lookup is cached per schema refresh and degrades to "eventually consistent" on non-Scylla clusters or older Scylla versions that lack the table or column. This flag lets the leader-aware routing logic tell which tablet tables actually have a Raft leader. --- cassandra/metadata.py | 63 ++++++++ .../standard/test_tablets_routing_v2.py | 140 ++++++++++++++++++ 2 files changed, 203 insertions(+) create mode 100644 tests/integration/standard/test_tablets_routing_v2.py diff --git a/cassandra/metadata.py b/cassandra/metadata.py index 43399b7152..0563d56a20 100644 --- a/cassandra/metadata.py +++ b/cassandra/metadata.py @@ -801,6 +801,14 @@ class KeyspaceMetadata(object): A string indicating whether a graph engine is enabled for this keyspace (Core/Classic). """ + strongly_consistent = False + """ + A boolean indicating whether this keyspace uses strongly-consistent (Raft-based) + tablets. ``True`` only for ScyllaDB keyspaces whose ``consistency`` option is + not eventual (i.e. ``local`` or ``global``). ``False`` for eventually-consistent + keyspaces and for non-ScyllaDB clusters. + """ + _exc_info = None """ set if metadata parsing failed """ @@ -815,6 +823,7 @@ def __init__(self, name, durable_writes, strategy_class, strategy_options, graph self.aggregates = {} self.views = {} self.graph_engine = graph_engine + self.strongly_consistent = False @property def is_graph_enabled(self): @@ -2577,6 +2586,11 @@ class SchemaParserV3(SchemaParserV22): _SELECT_AGGREGATES = "SELECT * FROM system_schema.aggregates" _SELECT_VIEWS = "SELECT * FROM system_schema.views" + # ScyllaDB-only: per-keyspace consistency option. The column is null for + # eventually-consistent keyspaces (and the whole table is absent on Cassandra + # and on Scylla versions without strongly-consistent tablets). + _SELECT_SCYLLA_KEYSPACES = "SELECT keyspace_name, consistency FROM system_schema.scylla_keyspaces" + def _is_not_scylla(self): """Check if NOT connected to ScyllaDB by checking for shard awareness.""" return getattr(getattr(self.connection, 'features', None), 'shard_id', None) is None @@ -2610,12 +2624,61 @@ def __init__(self, connection, timeout, fetch_size, metadata_request_timeout): self.indexes_result = [] self.keyspace_table_index_rows = defaultdict(lambda: defaultdict(list)) self.keyspace_view_rows = defaultdict(list) + self._scylla_consistency_cache = None + + @staticmethod + def _is_strongly_consistent(consistency): + # The server stores the keyspace consistency option as 'eventual', + # 'local', or 'global' (null == eventual). Only non-eventual keyspaces + # have a leader, so treat those as strongly consistent. + return consistency not in (None, "eventual") + + def _get_scylla_keyspaces_consistency(self): + """ + Return a ``{keyspace_name: consistency}`` map read from + ``system_schema.scylla_keyspaces``. + + Only ScyllaDB has this table, and only for keyspaces with a non-default + consistency option. Returns ``{}`` on non-Scylla clusters or when the + table/column is unavailable (older Scylla), in which case every keyspace + is treated as eventually consistent. The result is cached per parser + instance so it is fetched at most once per schema refresh. + """ + if self._scylla_consistency_cache is not None: + return self._scylla_consistency_cache + + if self._is_not_scylla(): + self._scylla_consistency_cache = {} + return self._scylla_consistency_cache + + consistency_by_ks = {} + try: + rows = self._query_build_rows(self._SELECT_SCYLLA_KEYSPACES, lambda row: row) + consistency_by_ks = {row["keyspace_name"]: row.get("consistency") for row in rows} + except Exception: + log.debug("Could not read system_schema.scylla_keyspaces; treating all " + "keyspaces as eventually consistent", exc_info=True) + + self._scylla_consistency_cache = consistency_by_ks + return consistency_by_ks + + def _set_strong_consistency(self, keyspace_meta): + consistency = self._get_scylla_keyspaces_consistency().get(keyspace_meta.name) + keyspace_meta.strongly_consistent = self._is_strongly_consistent(consistency) + return keyspace_meta + + def get_keyspace(self, keyspaces, keyspace): + keyspace_meta = super(SchemaParserV3, self).get_keyspace(keyspaces, keyspace) + if keyspace_meta is not None: + self._set_strong_consistency(keyspace_meta) + return keyspace_meta def get_all_keyspaces(self): for keyspace_meta in super(SchemaParserV3, self).get_all_keyspaces(): for row in self.keyspace_view_rows[keyspace_meta.name]: view_meta = self._build_view_metadata(row) keyspace_meta._add_view_metadata(view_meta) + self._set_strong_consistency(keyspace_meta) yield keyspace_meta def get_table(self, keyspaces, keyspace, table): diff --git a/tests/integration/standard/test_tablets_routing_v2.py b/tests/integration/standard/test_tablets_routing_v2.py new file mode 100644 index 0000000000..5ccd2ec430 --- /dev/null +++ b/tests/integration/standard/test_tablets_routing_v2.py @@ -0,0 +1,140 @@ +""" +End-to-end tests for TABLETS_ROUTING_V2 against a V2-capable Scylla build. + +Unlike the unit tests in tests/unit/test_tablets.py and tests/unit/test_policies.py, +these tests cross the driver<->server boundary: they validate that the driver +negotiates the extension, parses the server's `tablets-routing-v2` payload with +the correct field layout, and that the tablet_version_block it sends actually +matches the server's encoding. + +The whole module is opt-in: the server only advertises the extension when started +with the `strongly-consistent-tables` experimental feature, and it is exchanged on +the wire under the name `TABLETS_ROUTING_V2_EXPERIMENTAL`. When run against a +server that does not advertise it (e.g. a released Scylla), every test skips. +""" + +import pytest + +from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT +from cassandra.policies import ConstantReconnectionPolicy, RoundRobinPolicy, TokenAwarePolicy + +from tests.integration import PROTOCOL_VERSION, use_cluster + + +def setup_module(module): + try: + use_cluster('tablets_routing_v2', [3], start=True, set_keyspace=False, + configuration_options={ + # `strongly-consistent-tables` is what gates the server's + # advertisement of TABLETS_ROUTING_V2_EXPERIMENTAL + # (see scylladb transport/controller.cc). + 'experimental_features': ['lwt', 'udf', 'strongly-consistent-tables'], + # When V2 is negotiated but a request omits the mandatory + # tablet_version_block, the server calls on_internal_error + # (cql3/statements/select_statement.cc). With this set to + # False that raises a catchable error returned to the client + # instead of aborting the node (see test_no_block_*). + 'abort_on_internal_error': False, + }) + except Exception as exc: + pytest.skip("Could not start a Scylla cluster with the " + "'strongly-consistent-tables' experimental feature: {}".format(exc), + allow_module_level=True) + + +class TestTabletsRoutingV2Integration: + @classmethod + def setup_class(cls): + cls.cluster = Cluster(contact_points=["127.0.0.1", "127.0.0.2", "127.0.0.3"], + protocol_version=PROTOCOL_VERSION, + execution_profiles={ + EXEC_PROFILE_DEFAULT: ExecutionProfile( + load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) + }, + reconnection_policy=ConstantReconnectionPolicy(1)) + cls.session = cls.cluster.connect() + + @classmethod + def teardown_class(cls): + cls.cluster.shutdown() + + def _v2_negotiated(self): + return bool(self.session.cluster.control_connection._tablets_routing_v2) + + def _skip_if_no_v2(self): + if not self._v2_negotiated(): + pytest.skip("Server did not advertise TABLETS_ROUTING_V2_EXPERIMENTAL; " + "needs a build started with the 'strongly-consistent-tables' feature") + + def test_strongly_consistent_flag_tracks_dynamic_keyspace_changes(self): + """ + The driver reads system_schema.scylla_keyspaces on every schema refresh + and sets KeyspaceMetadata.strongly_consistent for each keyspace: True when + its `consistency` option is non-eventual ('global' or 'local'), and False + otherwise. Crucially, an eventually-consistent keyspace is *absent* from + scylla_keyspaces, so "not in the map" is exactly what maps to + strongly_consistent == False (see SchemaParserV3._set_strong_consistency / + _is_strongly_consistent). + + The flag is not a one-off computed at connect time -- it has to track the + live schema. Because the driver refreshes its metadata synchronously in + response to a DDL it executes (ResponseFuture handles + RESULT_KIND_SCHEMA_CHANGE by refreshing before returning), a keyspace + created or dropped *after* connecting is immediately reflected in + cluster.metadata.keyspaces, with no sleep or manual refresh required. A + keyspace created by some *other* client is picked up through the very same + refresh path, driven by the control connection's schema-change events + (subject to the schema refresh window); this test drives the changes + through the connected session so the assertions stay deterministic. + """ + self._skip_if_no_v2() + + metadata = self.session.cluster.metadata + ec_ks = "dyn_ec_ks" # eventually consistent (no consistency clause) + sc_ks = "dyn_sc_ks" # strongly consistent (Raft, consistency='global') + + # Start from a known-clean slate so the test is repeatable. + self.session.execute("DROP KEYSPACE IF EXISTS {0}".format(ec_ks)) + self.session.execute("DROP KEYSPACE IF EXISTS {0}".format(sc_ks)) + try: + assert ec_ks not in metadata.keyspaces + assert sc_ks not in metadata.keyspaces + + # Create an eventually-consistent keyspace after connecting: it shows + # up in the map with strongly_consistent == False. This exercises the + # "absent from scylla_keyspaces -> eventual" path. + self.session.execute( + "CREATE KEYSPACE {0} WITH replication = " + "{{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} " + "AND tablets = {{'initial': 1}}".format(ec_ks)) + assert ec_ks in metadata.keyspaces + assert metadata.keyspaces[ec_ks].strongly_consistent is False + + # Create a strongly-consistent keyspace: same map, flag now True. + # RF=3 across the 3 nodes gives a well-formed Raft group. + self.session.execute( + "CREATE KEYSPACE {0} WITH replication = " + "{{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} " + "AND tablets = {{'initial': 1}} AND consistency = 'global'".format(sc_ks)) + assert sc_ks in metadata.keyspaces + assert metadata.keyspaces[sc_ks].strongly_consistent is True + # The previously-created keyspace keeps its (False) flag. + assert metadata.keyspaces[ec_ks].strongly_consistent is False + + # A full refresh (the bulk get_all_keyspaces path, as opposed to the + # single-keyspace get_keyspace path the DDL above exercised) agrees. + self.session.cluster.refresh_schema_metadata() + assert metadata.keyspaces[ec_ks].strongly_consistent is False + assert metadata.keyspaces[sc_ks].strongly_consistent is True + + # Dropping a keyspace removes it from the map and leaves the other + # keyspace's flag untouched. + self.session.execute("DROP KEYSPACE {0}".format(sc_ks)) + assert sc_ks not in metadata.keyspaces + assert metadata.keyspaces[ec_ks].strongly_consistent is False + + self.session.execute("DROP KEYSPACE {0}".format(ec_ks)) + assert ec_ks not in metadata.keyspaces + finally: + self.session.execute("DROP KEYSPACE IF EXISTS {0}".format(ec_ks)) + self.session.execute("DROP KEYSPACE IF EXISTS {0}".format(sc_ks)) From 4be88bbfbe637c82a925854b8806705e6e8b341d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dawid=20M=C4=99drek?= Date: Fri, 26 Jun 2026 15:52:26 +0200 Subject: [PATCH 4/5] Route requests to the tablet leader for strongly-consistent tables With TABLETS_ROUTING_V2 the server returns, on a tablet_version mismatch, the tablet's replica set with the Raft leader first plus the new tablet_version. Use this to route strongly-consistent reads and writes straight to the leader: * Every EXECUTE on a V2 connection carries a tablet_version_block computed from the cached version (or a random byte on cold start and for non-single-partition requests). The block is attached per connection, keyed off the connection that actually serves the request, so a rolling upgrade that mixes v1/v2 connections never desyncs the frame. * On the response, the routing payload is parsed according to what the serving connection negotiated; the v2 tuple additionally carries the tablet_version, which is stored back on the tablet. * TokenAwarePolicy yields the leader (the first replica) first, but only for strongly-consistent keyspaces -- a tablet_version is assigned to eventually-consistent tablet tables too and must not be mistaken for a leader hint. To keep this hot path cheap, the ring token is memoized once per statement (Statement.routing_token) and reused by the load balancing policy, the shard selection in the pool, and the tablet_version_block computation, instead of re-hashing the routing key three times. --- cassandra/cluster.py | 120 ++++++++++++++++--- cassandra/policies.py | 34 +++++- cassandra/pool.py | 33 ++++-- cassandra/query.py | 25 ++++ tests/unit/test_policies.py | 227 ++++++++++++++++++++++++++++++++++++ 5 files changed, 409 insertions(+), 30 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 1181c6f686..db18efa049 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -85,7 +85,7 @@ named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET, HostTargetingStatement) from cassandra.marshal import int64_pack -from cassandra.tablets import Tablet, Tablets +from cassandra.tablets import Tablet, Tablets, choose_tablet_version_block, random_tablet_version_block from cassandra.timestamps import MonotonicTimestampGenerator from cassandra.util import _resolve_contact_points_to_string_map, Version, maybe_add_timeout_to_query @@ -3059,6 +3059,8 @@ def _create_response_future(self, query, parameters, trace, custom_payload, continuous_paging_options, statement_keyspace) elif isinstance(query, BoundStatement): prepared_statement = query.prepared_statement + # The tablet_version_block is filled in per-target-host at send time + # (see ResponseFuture._query), because V2 is negotiated per connection. message = ExecuteMessage( prepared_statement.query_id, query.values, cl, serial_cl, fetch_size, paging_state, timestamp, @@ -3093,6 +3095,37 @@ def _create_response_future(self, query, parameters, trace, custom_payload, load_balancer=load_balancing_policy, start_time=start_time, speculative_execution_plan=spec_exec_plan, continuous_paging_state=None, host=host) + def _compute_tablet_version_block(self, query): + """ + Compute the tablet_version_block byte for a BoundStatement. + Returns an int in [0, 255] or None if V2 should not be used. + """ + routing_key = query.routing_key + if routing_key is None: + return random_tablet_version_block() + + keyspace = query.keyspace or self.keyspace + table = query.table + if not keyspace or not table: + return random_tablet_version_block() + + # Skip the Murmur3 token hash + tablet lookup when we have no cached + # tablets for this table (vnode tables, or tablet tables on cold start); + # both correctly fall back to a random block below. + if not self.cluster.metadata._tablets.table_has_tablets(keyspace, table): + return random_tablet_version_block() + + token_map = self.cluster.metadata.token_map + if token_map is None: + return random_tablet_version_block() + + t = query.routing_token(token_map.token_class) + tablet = self.cluster.metadata._tablets.get_tablet_for_key(keyspace, table, t) + if tablet is None or tablet.tablet_version is None: + return random_tablet_version_block() + + return choose_tablet_version_block(tablet.tablet_version) + def get_execution_profile(self, name): """ Returns the execution profile associated with the provided ``name``. @@ -3765,6 +3798,7 @@ class PeersQueryType(object): _uses_peers_v2 = True _tablets_routing_v1 = False + _tablets_routing_v2 = False # for testing purposes _time = time @@ -3899,6 +3933,7 @@ def _try_connect(self, endpoint): else datetime.timedelta(seconds=self._cluster.metadata_request_timeout) self._tablets_routing_v1 = connection.features.tablets_routing_v1 + self._tablets_routing_v2 = connection.features.tablets_routing_v2 # use weak references in both directions # _clear_watcher will be called when this ControlConnection is about to be finalized @@ -4713,6 +4748,7 @@ class ResponseFuture(object): _host = None _control_connection_query_attempted = False _TABLET_ROUTING_CTYPE = None + _TABLET_ROUTING_V2_CTYPE = None _warned_timeout = False @@ -5002,7 +5038,12 @@ def _query(self, host, message=None, cb=None): try: # TODO get connectTimeout from cluster settings if self.query: - connection, request_id = pool.borrow_connection(timeout=2.0, routing_key=self.query.routing_key, keyspace=self.query.keyspace, table=self.query.table) + # Pass the statement so the pool can reuse the ring token it + # memoized for this request instead of re-hashing the routing key. + connection, request_id = pool.borrow_connection( + timeout=2.0, routing_key=self.query.routing_key, + keyspace=self.query.keyspace, table=self.query.table, + query=self.query) else: connection, request_id = pool.borrow_connection(timeout=2.0) self._connection = connection @@ -5011,6 +5052,28 @@ def _query(self, host, message=None, cb=None): if cb is None: cb = partial(self._set_result, host, connection, pool) + if isinstance(message, ExecuteMessage): + # Attach the tablet_version_block only when the *specific connection* + # we are about to send on negotiated TABLETS_ROUTING_V2. The server + # reads the trailing tablet_version_block byte only on connections + # that negotiated V2 (gated on the cluster-wide feature), so keying + # off the borrowed connection -- which is already in hand here, since + # borrow_connection() ran above -- is both necessary and sufficient: + # * a V2 connection always gets the block, even if this pool was + # created (and any cached flag latched) before the cluster + # feature was enabled, e.g. mid rolling-upgrade; + # * a non-V2 connection never gets it, even if a sibling shard + # connection in the same pool already negotiated V2 -- which can + # happen transiently while connections opened before and after + # the feature flip coexist. Attaching the block to a non-V2 + # connection would leave an unread trailing byte and desync the + # frame, so a pool-level flag cannot get this right regardless of + # how it is latched. + if connection.features.tablets_routing_v2: + message.tablet_version_block = self.session._compute_tablet_version_block(self.query) + else: + message.tablet_version_block = None + self.request_encoded_size = connection.send_msg(message, request_id, cb=cb, encoder=self._protocol_handler.encode_message, decoder=self._protocol_handler.decode_message, @@ -5128,21 +5191,44 @@ def _set_result(self, host, connection, pool, response): self._warnings = getattr(response, 'warnings', None) self._custom_payload = getattr(response, 'custom_payload', None) - if self._custom_payload and self.session.cluster.control_connection._tablets_routing_v1 and 'tablets-routing-v1' in self._custom_payload: - protocol = self.session.cluster.protocol_version - info = self._custom_payload.get('tablets-routing-v1') - ctype = ResponseFuture._TABLET_ROUTING_CTYPE - if ctype is None: - ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)))') - ResponseFuture._TABLET_ROUTING_CTYPE = ctype - tablet_routing_info = ctype.from_binary(info, protocol) - first_token = tablet_routing_info[0] - last_token = tablet_routing_info[1] - tablet_replicas = tablet_routing_info[2] - tablet = Tablet.from_row(first_token, last_token, tablet_replicas) - keyspace = self.query.keyspace - table = self.query.table - self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet) + if self._custom_payload and connection is not None: + # Parse the routing payload according to what the connection that + # *served this request* negotiated, not the control connection: + # during a rolling upgrade connections may differ, and each + # payload key matches the extension its own connection negotiated. + if connection.features.tablets_routing_v2 and 'tablets-routing-v2' in self._custom_payload: + protocol = self.session.cluster.protocol_version + info = self._custom_payload.get('tablets-routing-v2') + ctype = ResponseFuture._TABLET_ROUTING_V2_CTYPE + if ctype is None: + ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)), LongType)') + ResponseFuture._TABLET_ROUTING_V2_CTYPE = ctype + tablet_routing_info = ctype.from_binary(info, protocol) + first_token = tablet_routing_info[0] + last_token = tablet_routing_info[1] + tablet_replicas = tablet_routing_info[2] + tablet_version = tablet_routing_info[3] + tablet = Tablet.from_row(first_token, last_token, tablet_replicas, tablet_version) + keyspace = self.query.keyspace + table = self.query.table + if tablet: + self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet) + elif connection.features.tablets_routing_v1 and 'tablets-routing-v1' in self._custom_payload: + protocol = self.session.cluster.protocol_version + info = self._custom_payload.get('tablets-routing-v1') + ctype = ResponseFuture._TABLET_ROUTING_CTYPE + if ctype is None: + ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)))') + ResponseFuture._TABLET_ROUTING_CTYPE = ctype + tablet_routing_info = ctype.from_binary(info, protocol) + first_token = tablet_routing_info[0] + last_token = tablet_routing_info[1] + tablet_replicas = tablet_routing_info[2] + tablet = Tablet.from_row(first_token, last_token, tablet_replicas) + keyspace = self.query.keyspace + table = self.query.table + if tablet: + self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet) if isinstance(response, ResultMessage): if response.kind == RESULT_KIND_SET_KEYSPACE: diff --git a/cassandra/policies.py b/cassandra/policies.py index ceb5ebdc45..e92181e0ee 100644 --- a/cassandra/policies.py +++ b/cassandra/policies.py @@ -503,14 +503,30 @@ def make_query_plan(self, working_keyspace=None, query=None): return replicas = [] - tablet = self._cluster_metadata._tablets.get_tablet_for_key( - keyspace, query.table, self._cluster_metadata.token_map.token_class.from_key(query.routing_key)) + leader_host = None + token = query.routing_token(self._cluster_metadata.token_map.token_class) + tablet = self._cluster_metadata._tablets.get_tablet_for_key(keyspace, query.table, token) if tablet is not None: replicas_mapped = set(map(lambda r: r[0], tablet.replicas)) child_plan = child.make_query_plan(keyspace, query) replicas = [host for host in child_plan if host.host_id in replicas_mapped] + + # The leader concept only exists for strongly-consistent keyspaces. + # TABLETS_ROUTING_V2 assigns a tablet_version to *every* tablet table + # (eventually- and strongly-consistent alike), so the version alone + # must not be used to infer a leader. For strongly-consistent + # keyspaces the first replica is the leader; yield it first for + # leader-aware routing. Eventually-consistent keyspaces keep normal + # token-aware/shuffled ordering. + ks_meta = self._cluster_metadata.keyspaces.get(keyspace) + if ks_meta is not None and ks_meta.strongly_consistent and tablet.replicas: + leader_host_id = tablet.replicas[0][0] + for host in replicas: + if host.host_id == leader_host_id: + leader_host = host + break else: replicas = self._cluster_metadata.get_replicas(keyspace, query.routing_key) @@ -523,10 +539,18 @@ def yield_in_order(hosts): if replica.is_up and child.distance(replica) == distance: yield replica - # yield replicas: local_rack, local, remote - yield from yield_in_order(replicas) + # If we have a leader hint, yield it first unconditionally. + if leader_host is not None and leader_host.is_up: + yield leader_host + + # yield replicas: local_rack, local, remote (skipping leader already yielded) + for host in yield_in_order(replicas): + if host is not leader_host: + yield host # yield rest of the cluster: local_rack, local, remote - yield from yield_in_order([host for host in child.make_query_plan(keyspace, query) if host not in replicas]) + for host in yield_in_order([host for host in child.make_query_plan(keyspace, query) if host not in replicas]): + if host is not leader_host: + yield host def on_up(self, *args, **kwargs): return self._child_policy.on_up(*args, **kwargs) diff --git a/cassandra/pool.py b/cassandra/pool.py index 9e949c342c..8ba3815ff7 100644 --- a/cassandra/pool.py +++ b/cassandra/pool.py @@ -442,7 +442,17 @@ def __init__(self, host, host_distance, session): log.debug("Finished initializing connection for host %s", self.host) - def _get_connection_for_routing_key(self, routing_key=None, keyspace=None, table=None): + # TABLETS_ROUTING_V2 capability is a per-host property gated on the cluster + # feature, so it can flip from False to True after the pool is created + # (e.g. once the last node finishes a rolling upgrade). Derive it from the + # live connections instead of latching a value at init time, which would + # otherwise leave the pool stuck on the stale-low value until it is + # recreated. any() short-circuits, so the common (enabled) case is cheap. + @property + def tablets_routing_v2(self): + return any(c.features.tablets_routing_v2 for c in self._connections.values()) + + def _get_connection_for_routing_key(self, routing_key=None, keyspace=None, table=None, query=None): if self.is_shutdown: raise ConnectionException( "Pool for %s is shutdown" % (self.host,), self.host) @@ -452,15 +462,22 @@ def _get_connection_for_routing_key(self, routing_key=None, keyspace=None, table shard_id = None if not self._session.cluster.shard_aware_options.disable and self.host.sharding_info and routing_key: - t = self._session.cluster.metadata.token_map.token_class.from_key(routing_key) - - shard_id = None - if self.tablets_routing_v1 and table is not None: + token_class = self._session.cluster.metadata.token_map.token_class + # Reuse the token the statement already memoized for this request when + # available, so the routing-key hash runs once per request instead of + # again here; fall back to hashing the routing key directly otherwise. + t = query.routing_token(token_class) if query is not None else None + if t is None: + t = token_class.from_key(routing_key) + if (self.tablets_routing_v2 or self.tablets_routing_v1) and table is not None: if keyspace is None: keyspace = self._keyspace tablet = self._session.cluster.metadata._tablets.get_tablet_for_key(keyspace, table, t) + # In both V1 and V2 the request is sent to this host, so we pick + # the shard that this host owns for the tablet. Leader-aware host + # selection (V2) happens earlier, in the load balancing policy. if tablet is not None: for replica in tablet.replicas: if replica[0] == self.host.host_id: @@ -508,15 +525,15 @@ def _get_connection_for_routing_key(self, routing_key=None, keyspace=None, table return random.choice(active_connections) return random.choice(list(self._connections.values())) - def borrow_connection(self, timeout, routing_key=None, keyspace=None, table=None): - conn = self._get_connection_for_routing_key(routing_key, keyspace, table) + def borrow_connection(self, timeout, routing_key=None, keyspace=None, table=None, query=None): + conn = self._get_connection_for_routing_key(routing_key, keyspace, table, query) start = time.time() remaining = timeout last_retry = False while True: if conn.is_closed: # The connection might have been closed in the meantime - if so, try again - conn = self._get_connection_for_routing_key(routing_key, keyspace, table) + conn = self._get_connection_for_routing_key(routing_key, keyspace, table, query) with conn.lock: if (not conn.is_closed or last_retry) and conn.in_flight < conn.max_request_id: # On last retry we ignore connection status, since it is better to return closed connection than diff --git a/cassandra/query.py b/cassandra/query.py index 6c6878fdb4..862c601a03 100644 --- a/cassandra/query.py +++ b/cassandra/query.py @@ -275,6 +275,7 @@ class Statement(object): _serial_consistency_level = None _routing_key = None + _routing_token = None def __init__(self, retry_policy=None, consistency_level=None, routing_key=None, serial_consistency_level=None, fetch_size=FETCH_SIZE_UNSET, keyspace=None, custom_payload=None, @@ -314,9 +315,12 @@ def _set_routing_key(self, key): self._routing_key = b"".join(self._key_parts_packed(key)) else: self._routing_key = key + # The memoized ring token is derived from the routing key; invalidate it. + self._routing_token = None def _del_routing_key(self): self._routing_key = None + self._routing_token = None routing_key = property( _get_routing_key, @@ -331,6 +335,27 @@ def _del_routing_key(self): components should be strings. """) + def routing_token(self, token_class): + """ + Return the ring token for this statement's :attr:`routing_key`, computed + at most once per statement. + + The token is a pure function of the routing key and the cluster's + partitioner (``token_class``), both stable for a given statement, so the + (Murmur3) hash is memoized. A single request would otherwise recompute it + three times -- in the load balancing policy, when selecting the target + shard, and when building the tablet_version_block -- so caching keeps that + hot path to one hash. Returns ``None`` when there is no routing key. + """ + routing_key = self.routing_key + if routing_key is None: + return None + token = self._routing_token + if token is None: + token = token_class.from_key(routing_key) + self._routing_token = token + return token + def _get_serial_consistency_level(self): return self._serial_consistency_level diff --git a/tests/unit/test_policies.py b/tests/unit/test_policies.py index 6142af1aa1..229d766432 100644 --- a/tests/unit/test_policies.py +++ b/tests/unit/test_policies.py @@ -943,6 +943,233 @@ def _assert_shuffle(self, patched_shuffle, cluster, keyspace, routing_key): child_policy.make_query_plan.assert_called_once_with(keyspace, query) assert patched_shuffle.call_count == 1 + def test_leader_aware_routing_with_tablet_version(self): + """ + For a strongly-consistent keyspace, the leader (first replica in the + tablet's replica list) must be yielded first in the query plan, even + when it is not the closest replica. + """ + hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy, host_id=uuid.uuid4()) for i in range(4)] + for i, host in enumerate(hosts): + host.set_up() + host.set_location_info("dc1", f"rack{i + 1}") + + # The leader is hosts[2] (first in tablet.replicas). + leader = hosts[2] + other_replica = hosts[3] + tablet = Tablet( + first_token=-100, last_token=100, + replicas=[(leader.host_id, 0), (other_replica.host_id, 1)], + tablet_version=0xDEADBEEF12345678 + ) + + cluster = Mock(spec=Cluster) + cluster.metadata = Mock(spec=Metadata) + cluster.metadata._tablets = Mock(spec=Tablets) + cluster.metadata._tablets.get_tablet_for_key.return_value = tablet + cluster.metadata.get_replicas.return_value = [leader, other_replica] + cluster.metadata.keyspaces = {'ks': Mock(strongly_consistent=True)} + + child_policy = Mock() + # Put the leader last in the child plan and make it the farther replica + # by distance (LOCAL vs LOCAL_RACK). Without leader-first routing, + # other_replica would be yielded before the leader. + child_policy.make_query_plan.return_value = [hosts[0], hosts[1], other_replica, leader] + distances = { + leader: HostDistance.LOCAL, + other_replica: HostDistance.LOCAL_RACK, + hosts[0]: HostDistance.LOCAL, + hosts[1]: HostDistance.LOCAL, + } + child_policy.distance.side_effect = lambda host: distances.get(host, HostDistance.LOCAL) + + # shuffle_replicas=False keeps replica ordering deterministic so the only + # thing that can pull the leader to the front is the leader-first logic. + policy = TokenAwarePolicy(child_policy, shuffle_replicas=False) + policy.populate(cluster, hosts) + + query = Statement(routing_key=b'\x00\x00\x00\x01', keyspace='ks', table='tbl') + qplan = list(policy.make_query_plan(None, query)) + + # Leader must be first, even though other_replica is closer (LOCAL_RACK + # vs LOCAL) and the leader is last in the child plan. + self.assertEqual(qplan[0], leader) + # The closer replica follows, and the leader appears exactly once. + self.assertEqual(qplan[1], other_replica) + self.assertEqual(qplan.count(leader), 1) + + def test_leader_fallback_when_leader_is_down(self): + """ + When the leader host is down, the driver should fall back to other + replicas without crashing. The leader should NOT appear in the plan. + """ + hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy, host_id=uuid.uuid4()) for i in range(4)] + for i, host in enumerate(hosts): + host.set_up() + host.set_location_info("dc1", f"rack{i + 1}") + + leader = hosts[2] + other_replica = hosts[3] + leader.set_down() # Simulate leader being unreachable. + + tablet = Tablet( + first_token=-100, last_token=100, + replicas=[(leader.host_id, 0), (other_replica.host_id, 1)], + tablet_version=0xCAFEBABE00000001 + ) + + cluster = Mock(spec=Cluster) + cluster.metadata = Mock(spec=Metadata) + cluster.metadata._tablets = Mock(spec=Tablets) + cluster.metadata._tablets.get_tablet_for_key.return_value = tablet + cluster.metadata.get_replicas.return_value = [leader, other_replica] + cluster.metadata.keyspaces = {'ks': Mock(strongly_consistent=True)} + + child_policy = Mock() + child_policy.make_query_plan.return_value = hosts + child_policy.distance.return_value = HostDistance.LOCAL + + policy = TokenAwarePolicy(child_policy) + policy.populate(cluster, hosts) + + query = Statement(routing_key=b'\x00\x00\x00\x01', keyspace='ks', table='tbl') + qplan = list(policy.make_query_plan(None, query)) + + # Leader is down, should not appear in the plan. + self.assertNotIn(leader, qplan) + # Other replica should be first. + self.assertEqual(qplan[0], other_replica) + + def test_no_leader_routing_without_tablet_version(self): + """ + When a tablet has no tablet_version (V1 behavior), the leader-first + optimization should NOT apply. Replicas should follow normal ordering. + """ + hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy, host_id=uuid.uuid4()) for i in range(4)] + for i, host in enumerate(hosts): + host.set_up() + host.set_location_info("dc1", f"rack{i + 1}") + + # Tablet without version (V1-style). + tablet = Tablet( + first_token=-100, last_token=100, + replicas=[(hosts[2].host_id, 0), (hosts[3].host_id, 1)], + tablet_version=None + ) + + cluster = Mock(spec=Cluster) + cluster.metadata = Mock(spec=Metadata) + cluster.metadata._tablets = Mock(spec=Tablets) + cluster.metadata._tablets.get_tablet_for_key.return_value = tablet + cluster.metadata.get_replicas.return_value = [hosts[2], hosts[3]] + cluster.metadata.keyspaces = {'ks': Mock(strongly_consistent=False)} + + child_policy = Mock() + child_policy.make_query_plan.return_value = hosts + child_policy.distance.return_value = HostDistance.LOCAL + + policy = TokenAwarePolicy(child_policy) + policy.populate(cluster, hosts) + + query = Statement(routing_key=b'\x00\x00\x00\x01', keyspace='ks', table='tbl') + qplan = list(policy.make_query_plan(None, query)) + + # Without tablet_version, no leader-first logic. Both replicas appear + # but not necessarily with hosts[2] first (depends on child policy order). + # Just verify all hosts appear and no crash. + self.assertEqual(len(qplan), 4) + self.assertIn(hosts[2], qplan) + self.assertIn(hosts[3], qplan) + + def test_no_leader_routing_for_eventually_consistent_keyspace(self): + """ + A tablet_version is assigned to eventually-consistent tablet tables too + (TABLETS_ROUTING_V2), but the leader concept only exists for + strongly-consistent keyspaces. For an eventually-consistent keyspace the + leader-first optimization must NOT apply even when a tablet_version is + present. + """ + hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy, host_id=uuid.uuid4()) for i in range(4)] + for i, host in enumerate(hosts): + host.set_up() + host.set_location_info("dc1", f"rack{i + 1}") + + first_replica = hosts[2] + second_replica = hosts[3] + tablet = Tablet( + first_token=-100, last_token=100, + replicas=[(first_replica.host_id, 0), (second_replica.host_id, 1)], + tablet_version=0xDEADBEEF12345678 + ) + + cluster = Mock(spec=Cluster) + cluster.metadata = Mock(spec=Metadata) + cluster.metadata._tablets = Mock(spec=Tablets) + cluster.metadata._tablets.get_tablet_for_key.return_value = tablet + cluster.metadata.get_replicas.return_value = [first_replica, second_replica] + cluster.metadata.keyspaces = {'ks': Mock(strongly_consistent=False)} + + child_policy = Mock() + # Order the child plan so the second replica comes before the first; if + # leader-first logic wrongly triggered, first_replica would be forced to + # the front instead. + child_policy.make_query_plan.return_value = [second_replica, first_replica, hosts[0], hosts[1]] + child_policy.distance.return_value = HostDistance.LOCAL + + # shuffle_replicas=False keeps replica ordering deterministic so we can + # assert that no leader is forced to the front. + policy = TokenAwarePolicy(child_policy, shuffle_replicas=False) + policy.populate(cluster, hosts) + + query = Statement(routing_key=b'\x00\x00\x00\x01', keyspace='ks', table='tbl') + qplan = list(policy.make_query_plan(None, query)) + + # Leader-first must NOT apply: ordering follows the child plan, so the + # second replica (not replicas[0]) stays first. + self.assertEqual(qplan[0], second_replica) + self.assertEqual(len(qplan), 4) + + def test_no_leader_routing_when_keyspace_metadata_missing(self): + """ + If keyspace metadata is unavailable (e.g. schema refresh disabled), the + policy must safely fall back to no leader-first routing rather than + crashing or guessing. + """ + hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy, host_id=uuid.uuid4()) for i in range(4)] + for i, host in enumerate(hosts): + host.set_up() + host.set_location_info("dc1", f"rack{i + 1}") + + first_replica = hosts[2] + second_replica = hosts[3] + tablet = Tablet( + first_token=-100, last_token=100, + replicas=[(first_replica.host_id, 0), (second_replica.host_id, 1)], + tablet_version=0xDEADBEEF12345678 + ) + + cluster = Mock(spec=Cluster) + cluster.metadata = Mock(spec=Metadata) + cluster.metadata._tablets = Mock(spec=Tablets) + cluster.metadata._tablets.get_tablet_for_key.return_value = tablet + cluster.metadata.get_replicas.return_value = [first_replica, second_replica] + cluster.metadata.keyspaces = {} # no metadata for 'ks' + + child_policy = Mock() + child_policy.make_query_plan.return_value = [second_replica, first_replica, hosts[0], hosts[1]] + child_policy.distance.return_value = HostDistance.LOCAL + + # shuffle_replicas=False keeps replica ordering deterministic so we can + # assert that no leader is forced to the front. + policy = TokenAwarePolicy(child_policy, shuffle_replicas=False) + policy.populate(cluster, hosts) + + query = Statement(routing_key=b'\x00\x00\x00\x01', keyspace='ks', table='tbl') + qplan = list(policy.make_query_plan(None, query)) + + self.assertEqual(qplan[0], second_replica) + self.assertEqual(len(qplan), 4) + class ConvictionPolicyTest(unittest.TestCase): def test_not_implemented(self): From 7cbb8a9f35c3147586f972cea313269672ebbafe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dawid=20M=C4=99drek?= Date: Fri, 26 Jun 2026 15:52:26 +0200 Subject: [PATCH 5/5] Add integration tests for TABLETS_ROUTING_V2 Cover the end-to-end behaviour against a live ScyllaDB started with the `strongly-consistent-tables` experimental feature: v2 negotiation, payload-driven cache population, the tablet_version_block matching rules (no payload on a matching block, exactly one matching value per index, precedence over v1), the protocol error on a missing block, and leader-aware routing targeting the Raft leader for a strongly-consistent keyspace. --- .../standard/test_tablets_routing_v2.py | 418 ++++++++++++++++++ 1 file changed, 418 insertions(+) diff --git a/tests/integration/standard/test_tablets_routing_v2.py b/tests/integration/standard/test_tablets_routing_v2.py index 5ccd2ec430..7e981625b1 100644 --- a/tests/integration/standard/test_tablets_routing_v2.py +++ b/tests/integration/standard/test_tablets_routing_v2.py @@ -13,10 +13,19 @@ server that does not advertise it (e.g. a released Scylla), every test skips. """ +from contextlib import contextmanager + import pytest +import cassandra.cqltypes as types +from cassandra import ConsistencyLevel from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT from cassandra.policies import ConstantReconnectionPolicy, RoundRobinPolicy, TokenAwarePolicy +from cassandra.protocol import ExecuteMessage, ProtocolException +from cassandra.protocol_features import ( + ProtocolFeatures, RATE_LIMIT_ERROR_EXTENSION, LWT_ADD_METADATA_MARK, + TABLETS_ROUTING_V1, TABLETS_ROUTING_V2, +) from tests.integration import PROTOCOL_VERSION, use_cluster @@ -42,6 +51,25 @@ def setup_module(module): allow_module_level=True) +def _startup_with_both_extensions(self, options): + """ + Drop-in replacement for ProtocolFeatures.add_startup_options that negotiates + BOTH tablets_routing_v1 and tablets_routing_v2 on the same connection. + + The real driver makes the two mutually exclusive (V2 wins). Forcing both lets + us prove the server-side precedence rules: scylla checks V2 first and only + falls back to V1 when V2 is not set (cql3/statements/select_statement.cc). + """ + if self.rate_limit_error is not None: + options[RATE_LIMIT_ERROR_EXTENSION] = "" + if self.tablets_routing_v2: + options[TABLETS_ROUTING_V2] = "" + if self.tablets_routing_v1: + options[TABLETS_ROUTING_V1] = "" + if self.lwt_info is not None: + options[LWT_ADD_METADATA_MARK] = str(self.lwt_info.lwt_meta_bit_mask) + + class TestTabletsRoutingV2Integration: @classmethod def setup_class(cls): @@ -53,11 +81,52 @@ def setup_class(cls): }, reconnection_policy=ConstantReconnectionPolicy(1)) cls.session = cls.cluster.connect() + cls._create_schema(cls.session) @classmethod def teardown_class(cls): cls.cluster.shutdown() + @classmethod + def _create_schema(cls, session): + session.execute("DROP KEYSPACE IF EXISTS test_v2") + session.execute( + """ + CREATE KEYSPACE test_v2 + WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} + AND tablets = {'initial': 8} + """) + session.execute("CREATE TABLE test_v2.t (pk int PRIMARY KEY, v int)") + prepared = session.prepare("INSERT INTO test_v2.t (pk, v) VALUES (?, ?)") + for i in range(50): + session.execute(prepared.bind((i, i))) + + # A strongly-consistent (Raft) keyspace, used to exercise leader-aware + # routing. `consistency = 'global'` is currently the only strongly + # consistent mode the server implements; it requires the + # 'strongly-consistent-tables' feature that the module-level setup already + # enabled. RF=3 on the 3-node cluster makes every node a replica, so each + # tablet has a single, well-defined Raft leader to route to. + session.execute("DROP KEYSPACE IF EXISTS test_v2_sc") + session.execute( + """ + CREATE KEYSPACE test_v2_sc + WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} + AND tablets = {'initial': 8} + AND consistency = 'global' + """) + session.execute("CREATE TABLE test_v2_sc.t (pk int PRIMARY KEY, v int)") + prepared_sc = session.prepare("INSERT INTO test_v2_sc.t (pk, v) VALUES (?, ?)") + # Writes to a strongly-consistent (Raft) table are rejected unless they + # use QUORUM/LOCAL_QUORUM (strong_consistency/modification_statement.cc). + # The session default is LOCAL_ONE, so request LOCAL_QUORUM for these + # inserts; it propagates to every statement bound from this prepared one. + prepared_sc.consistency_level = ConsistencyLevel.LOCAL_QUORUM + for i in range(50): + session.execute(prepared_sc.bind((i, i))) + + # -- helpers ---------------------------------------------------------------- + def _v2_negotiated(self): return bool(self.session.cluster.control_connection._tablets_routing_v2) @@ -66,6 +135,321 @@ def _skip_if_no_v2(self): pytest.skip("Server did not advertise TABLETS_ROUTING_V2_EXPERIMENTAL; " "needs a build started with the 'strongly-consistent-tables' feature") + def _cached_tablet(self, bound): + md = self.session.cluster.metadata + token = md.token_map.token_class.from_key(bound.routing_key) + tablet = md._tablets.get_tablet_for_key(bound.keyspace, bound.table, token) + return tablet, token + + def _ensure_cached(self, bound, attempts=30): + """ + Drive requests until the V2 routing cache is populated for `bound`. + + On a cold start the driver sends a *random* tablet_version_block, which + only matches the server ~1/16 of the time; on a mismatch the server + returns routing info and the cache is filled. We retry until that happens. + """ + for _ in range(attempts): + self.session.execute(bound) + tablet, _token = self._cached_tablet(bound) + if tablet is not None and tablet.tablet_version is not None: + return tablet + raise AssertionError("V2 routing cache was never populated; the server " + "never returned a 'tablets-routing-v2' payload") + + # -- tests ------------------------------------------------------------------ + + def test_v2_is_negotiated(self): + self._skip_if_no_v2() + # Every per-host pool must also have negotiated V2 (it gates the request byte). + for pool in self.session._pools.values(): + assert getattr(pool, 'tablets_routing_v2', False) is True + + def test_v2_payload_populates_cache_with_valid_fields(self): + """Regression guard for the payload tuple field order (bug #1).""" + self._skip_if_no_v2() + + select = self.session.prepare("SELECT v FROM test_v2.t WHERE pk = ?") + bound = select.bind([2]) + + tablet = self._ensure_cached(bound) + _, token = self._cached_tablet(bound) + + # If the tuple were decoded in the wrong order, first_token/last_token + # would actually carry the version / replica list and these invariants + # would not hold. + assert tablet.tablet_version is not None + assert tablet.first_token <= tablet.last_token + # get_tablet_for_key matches first_token < token <= last_token. + assert tablet.first_token < token.value <= tablet.last_token + + # Replicas must be real hosts known to the cluster with sane shard ids. + known_host_ids = {h.host_id for h in self.session.cluster.metadata.all_hosts()} + assert tablet.replicas, "tablet has no replicas" + for host_id, shard in tablet.replicas: + assert host_id in known_host_ids, \ + "replica host_id {} is not a known host (corrupt payload?)".format(host_id) + assert isinstance(shard, int) and shard >= 0 + + def test_matching_block_yields_no_payload(self): + """Regression guard for the tablet_version_block encoding (bug #2).""" + self._skip_if_no_v2() + + select = self.session.prepare("SELECT v FROM test_v2.t WHERE pk = ?") + bound = select.bind([7]) + + # Populate the cache so the driver knows the current tablet_version. + self._ensure_cached(bound) + + # The next request carries a block derived from the cached version. If the + # driver's encoding agrees with the server, the versions match and NO + # routing payload is returned. A wrong shift would mismatch and the server + # would keep returning routing info. + result = self.session.execute(bound) + assert result.one() is not None + payload = result.response_future.custom_payload + assert not (payload and 'tablets-routing-v2' in payload), ( + "Server returned routing info despite a cached, up-to-date " + "tablet_version; the driver's tablet_version_block encoding likely " + "disagrees with the server (locator::compare_tablet_version_block)") + + # -- low-level helpers ------------------------------------------------------ + + @staticmethod + def _right_block(version, idx=0): + """ + Build a tablet_version_block that the server will accept as a match for + block `idx` of `version` (high nibble = index, low nibble = that nibble + of the version). Mirrors locator::compare_tablet_version_block. + """ + idx &= 0xF + return (idx << 4) | ((version >> (idx * 4)) & 0xF) + + def _send_raw_execute(self, conn, bound, tablet_version_block): + """ + Send an EXECUTE directly on a specific shard connection with a chosen + tablet_version_block (or None to omit the byte entirely, i.e. behave like + the pre-V2 protocol), and return the decoded response message. + + This bypasses ResponseFuture/load balancing so we control exactly which + node+shard the request hits and which byte is on the wire; it also avoids + polluting the driver's tablet cache. + """ + ps = bound.prepared_statement + msg = ExecuteMessage( + ps.query_id, bound.values, ConsistencyLevel.LOCAL_ONE, + serial_consistency_level=None, fetch_size=None, paging_state=None, + timestamp=None, skip_meta=False, + result_metadata_id=ps.result_metadata_id, + tablet_version_block=tablet_version_block) + return conn.wait_for_response(msg, timeout=30) + + def _decode_v2_payload(self, payload): + ctype = types.lookup_casstype( + 'TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)), LongType)') + info = ctype.from_binary(payload['tablets-routing-v2'], self.cluster.protocol_version) + return {'first_token': info[0], 'last_token': info[1], + 'replicas': info[2], 'tablet_version': info[3]} + + def _any_connection(self): + for pool in self.session._pools.values(): + for conn in pool._connections.values(): + return conn + raise AssertionError("no shard connections available") + + @staticmethod + def _all_shard_connections(session): + for host, pool in session._pools.items(): + for shard, conn in pool._connections.items(): + yield host, shard, conn + + @staticmethod + def _wait_for_shard_connections(session, timeout=30): + """Wait until each pool has filled its shard-aware connections (background).""" + import time + deadline = time.time() + timeout + while time.time() < deadline: + if all( + len(pool._connections) >= (min(host.sharding_info.shards_count, 2) + if host.sharding_info else 1) + for host, pool in session._pools.items() + ): + return + time.sleep(0.05) + + @contextmanager + def _cluster_with_v1_and_v2(self): + """ + Yield a (cluster, session) whose connections negotiated BOTH V1 and V2. + Restores the original startup behaviour and shuts the cluster down on exit. + """ + original = ProtocolFeatures.add_startup_options + ProtocolFeatures.add_startup_options = _startup_with_both_extensions + cluster = Cluster(contact_points=["127.0.0.1", "127.0.0.2", "127.0.0.3"], + protocol_version=PROTOCOL_VERSION, + execution_profiles={ + EXEC_PROFILE_DEFAULT: ExecutionProfile( + load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy())) + }, + reconnection_policy=ConstantReconnectionPolicy(1)) + try: + session = cluster.connect('test_v2') + self._wait_for_shard_connections(session) + yield cluster, session + finally: + cluster.shutdown() + ProtocolFeatures.add_startup_options = original + + @staticmethod + def _find_replica_wrong_shard(session, tablet): + """ + Find a connection to a host that *is* a replica of `tablet` but on a shard + that the host does NOT own for it ("right node, wrong shard"). Returns + (host, owner_shard, wrong_shard, conn) or None if no host has >=2 shards. + """ + replica_shard = {host_id: shard for host_id, shard in tablet.replicas} + for host, pool in session._pools.items(): + owner = replica_shard.get(host.host_id) + if owner is None: + continue + for shard, conn in pool._connections.items(): + if shard != owner: + return host, owner, shard, conn + return None + + # -- scenario tests --------------------------------------------------------- + + def test_index0_all_block_values_exactly_one_match(self): + """ + Scenario 1: for block index 0, exactly one of the 16 possible values + matches the server's tablet_version; every other value is reported as a + mismatch carrying that same tablet_version, whose nibble 0 equals the + value that matched. + """ + self._skip_if_no_v2() + select = self.session.prepare("SELECT v FROM test_v2.t WHERE pk = ?") + bound = select.bind([11]) + tablet = self._ensure_cached(bound) + version = tablet.tablet_version + + conn = self._any_connection() + + matched_values = [] + reported_versions = [] + for value in range(16): + block = value # index 0 -> high nibble 0, low nibble = value + resp = self._send_raw_execute(conn, bound, block) + payload = resp.custom_payload or {} + if 'tablets-routing-v2' in payload: + reported_versions.append(self._decode_v2_payload(payload)['tablet_version']) + else: + matched_values.append(value) + + # Exactly one value matches: the low nibble of the version. + assert matched_values == [version & 0xF], \ + "expected exactly one matching block value, got {}".format(matched_values) + # All 15 mismatches report the same tablet_version ... + assert len(reported_versions) == 15 + assert set(reported_versions) == {version} + # ... and that version's block-0 nibble is the value that matched. + assert (version & 0xF) == matched_values[0] + + def test_right_block_to_all_nodes_and_shards_never_returns_payload(self): + """ + Scenario 2: a correct tablet_version_block matches on every node and every + shard (the server's V2 check ignores shard), so no routing payload is ever + returned. + """ + self._skip_if_no_v2() + select = self.session.prepare("SELECT v FROM test_v2.t WHERE pk = ?") + bound = select.bind([13]) + tablet = self._ensure_cached(bound) + version = tablet.tablet_version + + sent = 0 + for host, shard, conn in self._all_shard_connections(self.session): + # Vary the block index per shard to also exercise non-zero indices. + block = self._right_block(version, idx=shard) + resp = self._send_raw_execute(conn, bound, block) + payload = resp.custom_payload or {} + assert 'tablets-routing-v2' not in payload, ( + "host {} shard {} returned a routing payload for a correct " + "tablet_version_block".format(host, shard)) + sent += 1 + assert sent >= 1, "no shard connections were exercised" + + def test_v2_takes_precedence_over_v1_no_v1_payload_on_wrong_shard(self): + """ + Scenario 3: with BOTH V1 and V2 negotiated, send a correct V2 block to the + wrong shard. The server checks V2 first; since the block matches there is + no payload at all -- crucially no `tablets-routing-v1`, which V1 would have + emitted for a wrong-shard request. + """ + self._skip_if_no_v2() + select = self.session.prepare("SELECT v FROM test_v2.t WHERE pk = ?") + bound = select.bind([17]) + tablet = self._ensure_cached(bound) + version = tablet.tablet_version + + with self._cluster_with_v1_and_v2() as (_cluster, session): + target = self._find_replica_wrong_shard(session, tablet) + if target is None: + pytest.skip("need a replica host with >=2 shards to target a wrong shard") + _host, _owner_shard, _wrong_shard, conn = target + assert conn.features.tablets_routing_v1 and conn.features.tablets_routing_v2, \ + "test setup failed: connection did not negotiate both V1 and V2" + + resp = self._send_raw_execute(conn, bound, self._right_block(version)) + payload = resp.custom_payload or {} + assert 'tablets-routing-v1' not in payload, ( + "server emitted V1 routing info despite V2 being negotiated; " + "V2 must take precedence (select_statement.cc)") + # The correct V2 block also means no V2 payload. + assert 'tablets-routing-v2' not in payload + + def test_v2_negotiated_missing_block_returns_protocol_error(self): + """ + Scenario 4: same setup as scenario 3 (both V1 and V2 negotiated, wrong + shard) but follow the old V1 protocol and send NO tablet_version_block. + + Because V2 is negotiated the server's request parser unconditionally reads + the mandatory trailing byte (transport/server.cc). Omitting it leaves the + frame one byte short, so the server rejects the request with a protocol + error (code 0x000A) rather than silently falling back to V1. + """ + self._skip_if_no_v2() + select = self.session.prepare("SELECT v FROM test_v2.t WHERE pk = ?") + bound = select.bind([19]) + tablet = self._ensure_cached(bound) + + with self._cluster_with_v1_and_v2() as (_cluster, session): + target = self._find_replica_wrong_shard(session, tablet) + if target is None: + pytest.skip("need a replica host with >=2 shards to target a wrong shard") + _host, _owner_shard, _wrong_shard, conn = target + assert conn.features.tablets_routing_v1 and conn.features.tablets_routing_v2, \ + "test setup failed: connection did not negotiate both V1 and V2" + + with pytest.raises(ProtocolException) as exc_info: + self._send_raw_execute(conn, bound, None) + assert exc_info.value.code == 0x000A, \ + "expected a protocol error, got: {!r}".format(exc_info.value) + + # -- strongly-consistent (leader-aware) routing ----------------------------- + + def test_strongly_consistent_keyspace_metadata(self): + """ + The driver must learn from system_schema.scylla_keyspaces which keyspaces + are strongly consistent: test_v2_sc (consistency='global') is, test_v2 + (no consistency clause) is not. This flag is the precondition for + leader-aware routing in TokenAwarePolicy.make_query_plan. + """ + self._skip_if_no_v2() + self.session.cluster.refresh_schema_metadata() + keyspaces = self.session.cluster.metadata.keyspaces + assert keyspaces['test_v2_sc'].strongly_consistent is True + assert keyspaces['test_v2'].strongly_consistent is False + def test_strongly_consistent_flag_tracks_dynamic_keyspace_changes(self): """ The driver reads system_schema.scylla_keyspaces on every schema refresh @@ -138,3 +522,37 @@ def test_strongly_consistent_flag_tracks_dynamic_keyspace_changes(self): finally: self.session.execute("DROP KEYSPACE IF EXISTS {0}".format(ec_ks)) self.session.execute("DROP KEYSPACE IF EXISTS {0}".format(sc_ks)) + + def test_leader_aware_routing_targets_the_raft_leader(self): + """ + For a strongly-consistent table the server orders the replica list with + the Raft leader first + (groups_manager.cc::prepare_replicas_for_sc_tablet_version). Once that + payload is cached, a TokenAwarePolicy must route every request for the + tablet to replicas[0] (the leader), saving the extra coordinator->leader + hop. This is the strongly-consistent counterpart to the eventually + consistent test_v2 tests above, which never assert *which* replica is hit. + """ + self._skip_if_no_v2() + select = self.session.prepare("SELECT v FROM test_v2_sc.t WHERE pk = ?") + bound = select.bind([2]) + + tablet = self._ensure_cached(bound) + assert tablet.replicas, "strongly-consistent tablet has no replicas" + leader_host_id = tablet.replicas[0][0] + + # Leader-first routing only triggers when the keyspace is known to be + # strongly consistent. + ks_meta = self.session.cluster.metadata.keyspaces['test_v2_sc'] + assert ks_meta.strongly_consistent is True + + # With an up-to-date cache the block always matches, so the server returns + # no further payload and replicas[0] stays the leader; every request must + # therefore be coordinated by that leader. + for _ in range(10): + result = self.session.execute(bound) + coordinator = result.response_future.coordinator_host + assert coordinator is not None and coordinator.host_id == leader_host_id, ( + "request coordinated by {} but the Raft leader is replicas[0]={}; " + "leader-aware routing did not target the leader".format( + getattr(coordinator, 'host_id', None), leader_host_id))