Files
Shubham Mishra e4098f98d9 Rust: macro change name -> accessor (#4264)
## Description of Changes

This PR primarily affects the `bindings-macro` and `schema` crates to
review:

### Core changes

1. Replaces the `name` macro with `accessor` for **Tables, Views,
Procedures, and Reducers** in Rust modules.
2. Extends `RawModuleDefV10` with a new section for:

   * case conversion policies
   * explicit names
    New sections are not validated in this PR so not functional.
3. Updates index behavior:

* Index names are now always **system-generated** for clients. Which
will be fixed in follow-up PR when we start validating RawModuleDef with
explicit names.
   * The `accessor` name for an index is used only inside the module.


## Breaking changes (API/ABI)

1. **Rust modules**

   * The `name` macro must be replaced with `accessor`.

2. **Client bindings (all languages)**

* Index names are now system-generated instead of using explicitly
provided names.


**Complexity:** 3

A follow-up PR will reintroduce explicit names with support for case
conversion.

---------

Co-authored-by: rekhoff <r.ekhoff@clockworklabs.io>
Co-authored-by: clockwork-labs-bot <clockwork-labs-bot@users.noreply.github.com>
Co-authored-by: clockwork-labs-bot <bot@clockworklabs.com>
2026-02-16 15:23:50 +00:00

508 lines
18 KiB
Python

import time
import unittest
from typing import Callable
import json
from .. import COMPOSE_FILE, Smoketest, random_string, requires_docker, spacetime, parse_sql_result
from ..docker import DockerManager
def retry(func: Callable, max_retries: int = 3, retry_delay: int = 2):
"""Retry a function on failure with delay."""
for attempt in range(1, max_retries + 1):
try:
return func()
except Exception as e:
if attempt < max_retries:
print(f"Attempt {attempt} failed: {e}. Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
else:
print("Max retries reached. Skipping the exception.")
return False
def int_vals(rows: list[dict]) -> list[dict]:
"""For all dicts in list, cast all values in dict to int."""
return [{k: int(v) for k, v in row.items()} for row in rows]
class Cluster:
"""Manages leader-related operations and state for SpaceTime database cluster."""
def __init__(self, docker_manager, smoketest: Smoketest):
self.docker = docker_manager
self.test = smoketest
# Ensure all containers are up.
self.docker.compose("up", "-d")
def sql(self, sql: str) -> list[dict]:
"""Query the test database."""
res = self.test.sql(sql)
return parse_sql_result(str(res))
def read_controldb(self, sql: str) -> list[dict]:
"""Query the control database."""
res = self.test.spacetime("sql", "spacetime-control", sql)
return parse_sql_result(str(res))
def get_db_id(self):
"""Query database ID."""
sql = f"select id from database where database_identity=0x{self.test.database_identity}"
res = self.read_controldb(sql)
return int(res[0]['id'])
def get_all_replicas(self):
"""Get all replica nodes in the cluster."""
database_id = self.get_db_id()
sql = f"select id, node_id from replica where database_id={database_id}"
return int_vals(self.read_controldb(sql))
def get_leader_info(self):
"""Get current leader's node information including ID, hostname, and container ID."""
database_id = self.get_db_id()
sql = f""" \
select node_v2.id, node_v2.network_addr from node_v2 \
join replica on replica.node_id=node_v2.id \
join replication_state on replication_state.leader=replica.id \
where replication_state.database_id={database_id} \
"""
rows = self.read_controldb(sql)
if not rows:
raise Exception("Could not find current leader's node")
leader_node_id = int(rows[0]['id'])
hostname = ""
if "(some =" in rows[0]['network_addr']:
address = rows[0]['network_addr'].split('"')[1]
hostname = address.split(':')[0]
# Find container ID
container_id = ""
containers = self.docker.list_containers()
for container in containers:
if hostname in container.name:
container_id = container.id
break
return {
'node_id': leader_node_id,
'hostname': hostname,
'container_id': container_id
}
def wait_for_leader_change(self, previous_leader_node, max_attempts=10, delay=2):
"""Wait for leader to change and return new leader node_id."""
for _ in range(max_attempts):
try:
current_leader_node = self.get_leader_info()['node_id']
if current_leader_node != previous_leader_node:
return current_leader_node
except Exception:
print("No current leader")
time.sleep(delay)
return None
def ensure_leader_health(self, id):
"""Verify leader is healthy by inserting a row."""
retry(lambda: self.test.call("start", id, 1))
rows = self.sql(f"select id from counter where id={id}")
if len(rows) < 1 or int(rows[0]['id']) != id:
raise ValueError(f"Could not find {id} in counter table")
# Wait for at least one tick to ensure buffers are flushed.
# TODO: Replace with confirmed read.
time.sleep(0.6)
def wait_counter_value(self, id, value, max_attempts=10, delay=1):
"""Wait for the value for `id` in the counter table to reach `value`"""
for _ in range(max_attempts):
rows = self.sql(f"select * from counter where id={id}")
if len(rows) >= 1 and int(rows[0]['value']) >= value:
return
else:
time.sleep(delay)
raise ValueError(f"Counter {id} below {value}")
def fail_leader(self, action='kill'):
"""Force leader failure through either killing or network disconnect."""
leader_info = self.get_leader_info()
container_id = leader_info['container_id']
if not container_id:
raise ValueError("Could not find leader container")
if action == 'kill':
self.docker.kill_container(container_id)
elif action == 'disconnect':
self.docker.disconnect_container(container_id)
else:
raise ValueError(f"Unknown action: {action}")
return container_id
def restore_leader(self, container_id, action='start'):
"""Restore failed leader through either starting or network reconnect."""
if action == 'start':
self.docker.start_container(container_id)
elif action == 'connect':
self.docker.connect_container(container_id)
else:
raise ValueError(f"Unknown action: {action}")
@requires_docker
class ReplicationTest(Smoketest):
MODULE_CODE = """
use spacetimedb::{duration, ReducerContext, Table};
#[spacetimedb::table(accessor = counter, public)]
pub struct Counter {
#[primary_key]
#[auto_inc]
id: u64,
#[index(btree)]
value: u64,
}
#[spacetimedb::table(accessor = schedule_counter, public, scheduled(increment, at = sched_at))]
pub struct ScheduledCounter {
#[primary_key]
#[auto_inc]
scheduled_id: u64,
sched_at: spacetimedb::ScheduleAt,
count: u64,
}
#[spacetimedb::reducer]
fn increment(ctx: &ReducerContext, arg: ScheduledCounter) {
// if the counter exists, increment it
if let Some(counter) = ctx.db.counter().id().find(arg.scheduled_id) {
if counter.value == arg.count {
ctx.db.schedule_counter().delete(arg);
return;
}
// update counter
ctx.db.counter().id().update(Counter {
id: arg.scheduled_id,
value: counter.value + 1,
});
} else {
// insert fresh counter
ctx.db.counter().insert(Counter {
id: arg.scheduled_id,
value: 1,
});
}
}
#[spacetimedb::reducer]
fn start(ctx: &ReducerContext, id: u64, count: u64) {
ctx.db.schedule_counter().insert(ScheduledCounter {
scheduled_id: id,
sched_at: duration!(0ms).into(),
count,
});
}
#[spacetimedb::table(accessor = message, public)]
pub struct Message {
#[primary_key]
#[auto_inc]
id: u64,
text: String
}
#[spacetimedb::reducer]
fn send_message(ctx: &ReducerContext, text: String) {
ctx.db.message().insert(Message { id: 0, text });
}
"""
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.root_config = cls.project_path / "root_config"
spacetime("--config-path", cls.root_config, "server", "set-default", "local")
def setUp(self):
self.docker = DockerManager(COMPOSE_FILE)
self.root_token = self.docker.generate_root_token()
self.cluster = Cluster(self.docker, self)
def tearDown(self):
# Ensure containers that were brought down during a test are back up.
self.docker.compose("up", "-d")
super().tearDown()
def add_me_as_admin(self):
"""Add the current user as an admin account"""
db_owner_id = str(self.spacetime("login", "show")).split()[-1]
spacetime("--config-path", self.root_config, "login", "--token", self.root_token)
spacetime("--config-path", self.root_config, "call", "spacetime-control", "create_admin_account", f"0x{db_owner_id}")
def start(self, id: int, count: int):
"""Send a message to the database."""
retry(lambda: self.call("start", id, count))
def collect_counter_rows(self):
return int_vals(self.cluster.sql("select * from counter"))
def call_control(self, reducer, *args):
self.spacetime("call", "spacetime-control", reducer, *map(json.dumps, args))
class LeaderElection(ReplicationTest):
def test_leader_election_in_loop(self):
"""This test fails a leader, wait for new leader to be elected and verify if commits replicated to new leader"""
iterations = 5
row_ids = [101 + i for i in range(iterations * 2)]
for (first_id, second_id) in zip(row_ids[::2], row_ids[1::2]):
cur_leader = self.cluster.wait_for_leader_change(None)
print(f"ensure leader health {first_id}")
self.cluster.ensure_leader_health(first_id)
print(f"killing current leader: {cur_leader}")
container_id = self.cluster.fail_leader()
self.assertIsNotNone(container_id)
next_leader = self.cluster.wait_for_leader_change(cur_leader)
self.assertNotEqual(cur_leader, next_leader)
# this check if leader election happened
print(f"ensure_leader_health {second_id}")
self.cluster.ensure_leader_health(second_id)
# restart the old leader, so that we can maintain quorum for next iteration
print(f"reconnect leader {container_id}")
self.cluster.restore_leader(container_id, 'start')
# Ensure we have a current leader
last_row_id = row_ids[-1] + 1
self.cluster.ensure_leader_health(row_ids[-1] + 1)
row_ids.append(last_row_id)
# Verify that all inserted rows are present
stored_row_ids = [row['id'] for row in self.collect_counter_rows()]
self.assertEqual(set(stored_row_ids), set(row_ids))
class LeaderDisconnect(ReplicationTest):
def test_leader_c_disconnect_in_loop(self):
"""This test disconnects a leader, wait for new leader to be elected and verify if commits replicated to new leader"""
iterations = 5
row_ids = [201 + i for i in range(iterations * 2)]
for (first_id, second_id) in zip(row_ids[::2], row_ids[1::2]):
print(f"first={first_id} second={second_id}")
cur_leader = self.cluster.wait_for_leader_change(None)
print(f"ensure leader health {first_id}")
self.cluster.ensure_leader_health(first_id)
print("disconnect current leader")
container_id = self.cluster.fail_leader('disconnect')
self.assertIsNotNone(container_id)
print(f"disconnected leader's container is {container_id}")
next_leader = self.cluster.wait_for_leader_change(cur_leader)
self.assertNotEqual(cur_leader, next_leader)
# this check if leader election happened
print(f"ensure_leader_health {second_id}")
self.cluster.ensure_leader_health(second_id)
# restart the old leader, so that we can maintain quorum for next iteration
print(f"reconnect leader {container_id}")
self.cluster.restore_leader(container_id, 'connect')
# Ensure we have a current leader
last_row_id = row_ids[-1] + 1
self.cluster.ensure_leader_health(last_row_id)
row_ids.append(last_row_id)
# Verify that all inserted rows are present
stored_row_ids = [row['id'] for row in self.collect_counter_rows()]
self.assertEqual(set(stored_row_ids), set(row_ids))
@unittest.skip("drain_node not yet supported")
class DrainLeader(ReplicationTest):
def test_drain_leader_node(self):
"""This test moves leader replica to different node"""
self.add_me_as_admin()
cur_leader_node_id = self.cluster.wait_for_leader_change(None)
self.cluster.ensure_leader_health(301)
replicas = self.cluster.get_all_replicas()
empty_node_id = 14
for replica in replicas:
empty_node_id = empty_node_id - replica['node_id']
self.spacetime("call", "spacetime-control", "drain_node", f"{cur_leader_node_id}", f"{empty_node_id}")
time.sleep(5)
self.cluster.ensure_leader_health(302)
replicas = self.cluster.get_all_replicas()
for replica in replicas:
self.assertNotEqual(replica['node_id'], cur_leader_node_id)
class PreferLeader(ReplicationTest):
def test_prefer_leader(self):
"""This test moves leader replica to different node"""
self.add_me_as_admin()
cur_leader_node_id = self.cluster.wait_for_leader_change(None)
self.cluster.ensure_leader_health(401)
replicas = self.cluster.get_all_replicas()
prefer_replica = {}
for replica in replicas:
if replica['node_id'] != cur_leader_node_id:
prefer_replica = replica
break
prefer_replica_id = prefer_replica['id']
self.spacetime("call", "spacetime-control", "prefer_leader", f"{prefer_replica_id}")
next_leader_node_id = self.cluster.wait_for_leader_change(cur_leader_node_id)
self.cluster.ensure_leader_health(402)
self.assertEqual(prefer_replica['node_id'], next_leader_node_id)
# verify if all past rows are present in new leader
stored_row_ids = [row['id'] for row in self.collect_counter_rows()]
self.assertEqual(set(stored_row_ids), set([401, 402]))
class ManyTransactions(ReplicationTest):
def test_a_many_transactions(self):
"""This test sends many messages to the database and verifies that they are all present"""
self.cluster.wait_for_leader_change(None)
num_messages = 10000
sub = self.subscribe("SELECT * FROM counter", n = num_messages)
self.start(1, num_messages)
message_table = sub()[-1:]
self.assertIn({
'counter': {
'deletes': [{'id': 1, 'value': num_messages - 1}],
'inserts': [{'id': 1, 'value': num_messages}]
}
}, message_table)
class QuorumLoss(ReplicationTest):
def test_quorum_loss(self):
"""This test makes cluster to lose majority of followers to verify if leader eventually stop accepting writes"""
for i in range(11):
self.call("send_message", f"{i}")
leader = self.cluster.get_leader_info()
containers = self.docker.list_containers()
for container in containers:
if leader['container_id'] != container.id and "worker" in container.name:
self.docker.kill_container(container.id)
time.sleep(2)
with self.assertRaises(Exception):
for i in range(1001):
self.call("send_message", "terminal")
class EnableReplicationTest(ReplicationTest):
AUTOPUBLISH = False
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.expected_counter_rows = []
def run_counter(self, id, n = 100):
self.start(id, n)
self.cluster.wait_counter_value(id, n)
self.expected_counter_rows.append({"id": id, "value": n})
self.assertEqual(self.collect_counter_rows(), self.expected_counter_rows)
def subscribe_to_enable_replication_events(self):
id = self.cluster.get_db_id()
return self.subscribe(
f"select * from staged_enable_replication_event where database_id={id}",
n = 2,
database = "spacetime-control"
)
def assert_bootstrap_complete(self, sub):
events = sub()
self.assertEqual(
events[-1]['staged_enable_replication_event']['inserts'][0]['message'],
'bootstrap complete',
)
def enable_replication(self, database_name):
sub = self.subscribe_to_enable_replication_events()
self.call_control("enable_replication", {"Name": database_name}, 3)
self.assert_bootstrap_complete(sub)
class EnableReplicationUnsuspended(EnableReplicationTest):
def test_enable_replication_fails_if_not_suspended(self):
"""Tests that the database to enable replication on must be suspended"""
self.add_me_as_admin()
name = random_string()
self.publish_module(name, num_replicas = 1)
self.cluster.wait_for_leader_change(None)
with self.assertRaises(Exception):
self.call_control("enable_replication", {"Name": name}, 3)
class EnableReplicationSuspended(EnableReplicationTest):
def test_enable_replication_on_suspended_database(self):
"""Tests that we can enable replication on a suspended database"""
self.add_me_as_admin()
name = random_string()
self.publish_module(name, num_replicas = 1)
self.cluster.wait_for_leader_change(None)
self.cluster.ensure_leader_health(1)
self.call_control("suspend_database", {"Name": name})
# Database is now unreachable.
with self.assertRaises(Exception):
self.call("send_message", "hi")
self.enable_replication(name)
# Still unreachable until we call unsuspend.
with self.assertRaises(Exception):
self.call("send_message", "hi")
self.call_control("unsuspend_database", {"Name": name})
self.cluster.wait_for_leader_change(None)
self.cluster.ensure_leader_health(2)
class EnableDisableReplication(EnableReplicationTest):
def test_enable_disable_replication(self):
"""Tests that we can enable then disable replication"""
self.add_me_as_admin()
name = random_string()
self.publish_module(name, num_replicas = 1)
# ensure database is up and commitlog ends up non-empty
self.run_counter(1, 100)
# suspend first
self.call_control("suspend_database", {"Name": name})
# enable replication and wait for it to complete
self.enable_replication(name)
# unsuspend
self.call_control("unsuspend_database", {"Name": name})
self.cluster.wait_for_leader_change(None)
self.run_counter(2, 100)
self.call_control("disable_replication", {"Name": name})
self.run_counter(3, 100)