mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-06 07:26:43 -04:00
Endpoint for pretty print migration plan (#3137)
# Description of Changes - Adds endpoint for for pretty printing migration plan. - It also changes current `publish` endpoint to optionally provide `MigrationToken` and `MigrationPolicy` to allow migration with breaking clients. # API and ABI breaking changes Backward compatible change to existing API and new Api # Expected complexity level and risk 2 # Testing - Existing smoketest should cover changes for `publish` endpoint. - For pretty print endpoint, smoketests can be written only after cli changes. --------- Signed-off-by: Shubham Mishra <shivam828787@gmail.com> Co-authored-by: James Gilles <jameshgilles@gmail.com> Co-authored-by: Phoebe Goldman <phoebe@clockworklabs.io>
This commit is contained in:
Generated
+1
@@ -6207,6 +6207,7 @@ dependencies = [
|
||||
"spacetimedb-datastore",
|
||||
"spacetimedb-lib 1.4.0",
|
||||
"spacetimedb-paths",
|
||||
"spacetimedb-schema",
|
||||
"spacetimedb-table",
|
||||
"tempfile",
|
||||
"thiserror 1.0.69",
|
||||
|
||||
@@ -106,6 +106,27 @@ pub enum PublishResult {
|
||||
PermissionDenied { name: DatabaseName },
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize, Debug, Default)]
|
||||
pub enum MigrationPolicy {
|
||||
#[default]
|
||||
Compatible,
|
||||
BreakClients,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize, Debug, Default)]
|
||||
pub enum PrettyPrintStyle {
|
||||
#[default]
|
||||
AnsiColor,
|
||||
NoColor,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize, Debug)]
|
||||
pub struct PrintPlanResult {
|
||||
pub migrate_plan: Box<str>,
|
||||
pub break_clients: bool,
|
||||
pub token: spacetimedb_lib::Hash,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub enum DnsLookupResponse {
|
||||
/// The lookup was successful and the domain and identity are returned.
|
||||
|
||||
@@ -7,7 +7,7 @@ use http::StatusCode;
|
||||
|
||||
use spacetimedb::client::ClientActorIndex;
|
||||
use spacetimedb::energy::{EnergyBalance, EnergyQuanta};
|
||||
use spacetimedb::host::{HostController, ModuleHost, NoSuchModule, UpdateDatabaseResult};
|
||||
use spacetimedb::host::{HostController, MigratePlanResult, ModuleHost, NoSuchModule, UpdateDatabaseResult};
|
||||
use spacetimedb::identity::{AuthCtx, Identity};
|
||||
use spacetimedb::messages::control_db::{Database, HostType, Node, Replica};
|
||||
use spacetimedb::sql;
|
||||
@@ -15,6 +15,7 @@ use spacetimedb_client_api_messages::http::{SqlStmtResult, SqlStmtStats};
|
||||
use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult, RegisterTldResult, SetDomainsResult, Tld};
|
||||
use spacetimedb_lib::{ProductTypeElement, ProductValue};
|
||||
use spacetimedb_paths::server::ModuleLogsDir;
|
||||
use spacetimedb_schema::auto_migrate::{MigrationPolicy, PrettyPrintStyle};
|
||||
use tokio::sync::watch;
|
||||
|
||||
pub mod auth;
|
||||
@@ -146,9 +147,10 @@ impl Host {
|
||||
database: Database,
|
||||
host_type: HostType,
|
||||
program_bytes: Box<[u8]>,
|
||||
policy: MigrationPolicy,
|
||||
) -> anyhow::Result<UpdateDatabaseResult> {
|
||||
self.host_controller
|
||||
.update_module_host(database, host_type, self.replica_id, program_bytes)
|
||||
.update_module_host(database, host_type, self.replica_id, program_bytes, policy)
|
||||
.await
|
||||
}
|
||||
}
|
||||
@@ -231,8 +233,11 @@ pub trait ControlStateWriteAccess: Send + Sync {
|
||||
&self,
|
||||
publisher: &Identity,
|
||||
spec: DatabaseDef,
|
||||
policy: MigrationPolicy,
|
||||
) -> anyhow::Result<Option<UpdateDatabaseResult>>;
|
||||
|
||||
async fn migrate_plan(&self, spec: DatabaseDef, style: PrettyPrintStyle) -> anyhow::Result<MigratePlanResult>;
|
||||
|
||||
async fn delete_database(&self, caller_identity: &Identity, database_identity: &Identity) -> anyhow::Result<()>;
|
||||
|
||||
// Energy
|
||||
@@ -321,8 +326,13 @@ impl<T: ControlStateWriteAccess + ?Sized> ControlStateWriteAccess for Arc<T> {
|
||||
&self,
|
||||
identity: &Identity,
|
||||
spec: DatabaseDef,
|
||||
policy: MigrationPolicy,
|
||||
) -> anyhow::Result<Option<UpdateDatabaseResult>> {
|
||||
(**self).publish_database(identity, spec).await
|
||||
(**self).publish_database(identity, spec, policy).await
|
||||
}
|
||||
|
||||
async fn migrate_plan(&self, spec: DatabaseDef, style: PrettyPrintStyle) -> anyhow::Result<MigratePlanResult> {
|
||||
(**self).migrate_plan(spec, style).await
|
||||
}
|
||||
|
||||
async fn delete_database(&self, caller_identity: &Identity, database_identity: &Identity) -> anyhow::Result<()> {
|
||||
|
||||
@@ -20,16 +20,21 @@ use http::StatusCode;
|
||||
use serde::Deserialize;
|
||||
use spacetimedb::database_logger::DatabaseLogger;
|
||||
use spacetimedb::host::module_host::ClientConnectedError;
|
||||
use spacetimedb::host::ReducerArgs;
|
||||
use spacetimedb::host::ReducerCallError;
|
||||
use spacetimedb::host::ReducerOutcome;
|
||||
use spacetimedb::host::UpdateDatabaseResult;
|
||||
use spacetimedb::host::{MigratePlanResult, ReducerArgs};
|
||||
use spacetimedb::identity::Identity;
|
||||
use spacetimedb::messages::control_db::{Database, HostType};
|
||||
use spacetimedb_client_api_messages::name::{self, DatabaseName, DomainName, PublishOp, PublishResult};
|
||||
use spacetimedb_client_api_messages::name::{
|
||||
self, DatabaseName, DomainName, MigrationPolicy, PrettyPrintStyle, PrintPlanResult, PublishOp, PublishResult,
|
||||
};
|
||||
use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9;
|
||||
use spacetimedb_lib::identity::AuthCtx;
|
||||
use spacetimedb_lib::{sats, Timestamp};
|
||||
use spacetimedb_schema::auto_migrate::{
|
||||
MigrationPolicy as SchemaMigrationPolicy, MigrationToken, PrettyPrintStyle as AutoMigratePrettyPrintStyle,
|
||||
};
|
||||
|
||||
use super::subscribe::{handle_websocket, HasWebSocketOptions};
|
||||
|
||||
@@ -474,6 +479,13 @@ pub struct PublishDatabaseQueryParams {
|
||||
#[serde(default)]
|
||||
clear: bool,
|
||||
num_replicas: Option<usize>,
|
||||
/// [`Hash`] of [`MigrationToken`]` to be checked if `MigrationPolicy::BreakClients` is set.
|
||||
///
|
||||
/// Users obtain such a hash via the `/database/:name_or_identity/pre-publish POST` route.
|
||||
/// This is a safeguard to require explicit approval for updates which will break clients.
|
||||
token: Option<spacetimedb_lib::Hash>,
|
||||
#[serde(default)]
|
||||
policy: MigrationPolicy,
|
||||
}
|
||||
|
||||
use std::env;
|
||||
@@ -501,7 +513,12 @@ fn allow_creation(auth: &SpacetimeAuth) -> Result<(), ErrorResponse> {
|
||||
pub async fn publish<S: NodeDelegate + ControlStateDelegate>(
|
||||
State(ctx): State<S>,
|
||||
Path(PublishDatabaseParams { name_or_identity }): Path<PublishDatabaseParams>,
|
||||
Query(PublishDatabaseQueryParams { clear, num_replicas }): Query<PublishDatabaseQueryParams>,
|
||||
Query(PublishDatabaseQueryParams {
|
||||
clear,
|
||||
num_replicas,
|
||||
token,
|
||||
policy,
|
||||
}): Query<PublishDatabaseQueryParams>,
|
||||
Extension(auth): Extension<SpacetimeAuth>,
|
||||
body: Bytes,
|
||||
) -> axum::response::Result<axum::Json<PublishResult>> {
|
||||
@@ -551,6 +568,21 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate>(
|
||||
}
|
||||
};
|
||||
|
||||
let policy: SchemaMigrationPolicy = match policy {
|
||||
MigrationPolicy::BreakClients => {
|
||||
if let Some(token) = token {
|
||||
Ok(SchemaMigrationPolicy::BreakClients(token))
|
||||
} else {
|
||||
Err((
|
||||
StatusCode::BAD_REQUEST,
|
||||
"Migration policy is set to `BreakClients`, but no migration token was provided.",
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
MigrationPolicy::Compatible => Ok(SchemaMigrationPolicy::Compatible),
|
||||
}?;
|
||||
|
||||
log::trace!("Publishing to the identity: {}", database_identity.to_hex());
|
||||
|
||||
let op = {
|
||||
@@ -592,6 +624,7 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate>(
|
||||
num_replicas,
|
||||
host_type: HostType::Wasm,
|
||||
},
|
||||
policy,
|
||||
)
|
||||
.await
|
||||
.map_err(log_and_500)?;
|
||||
@@ -619,6 +652,101 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate>(
|
||||
}))
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
pub struct PrePublishParams {
|
||||
name_or_identity: NameOrIdentity,
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
pub struct PrePublishQueryParams {
|
||||
#[serde(default)]
|
||||
style: PrettyPrintStyle,
|
||||
}
|
||||
|
||||
pub async fn pre_publish<S: NodeDelegate + ControlStateDelegate>(
|
||||
State(ctx): State<S>,
|
||||
Path(PrePublishParams { name_or_identity }): Path<PrePublishParams>,
|
||||
Query(PrePublishQueryParams { style }): Query<PrePublishQueryParams>,
|
||||
Extension(auth): Extension<SpacetimeAuth>,
|
||||
body: Bytes,
|
||||
) -> axum::response::Result<axum::Json<PrintPlanResult>> {
|
||||
// User should not be able to print migration plans for a database that they do not own
|
||||
let database_identity = resolve_and_authenticate(&ctx, &name_or_identity, &auth).await?;
|
||||
let style = match style {
|
||||
PrettyPrintStyle::NoColor => AutoMigratePrettyPrintStyle::NoColor,
|
||||
PrettyPrintStyle::AnsiColor => AutoMigratePrettyPrintStyle::AnsiColor,
|
||||
};
|
||||
|
||||
let migrate_plan = ctx
|
||||
.migrate_plan(
|
||||
DatabaseDef {
|
||||
database_identity,
|
||||
program_bytes: body.into(),
|
||||
num_replicas: None,
|
||||
host_type: HostType::Wasm,
|
||||
},
|
||||
style,
|
||||
)
|
||||
.await
|
||||
.map_err(log_and_500)?;
|
||||
|
||||
match migrate_plan {
|
||||
MigratePlanResult::Success {
|
||||
old_module_hash,
|
||||
new_module_hash,
|
||||
breaks_client,
|
||||
plan,
|
||||
} => {
|
||||
let token = MigrationToken {
|
||||
database_identity,
|
||||
old_module_hash,
|
||||
new_module_hash,
|
||||
}
|
||||
.hash();
|
||||
|
||||
Ok(PrintPlanResult {
|
||||
token,
|
||||
migrate_plan: plan,
|
||||
break_clients: breaks_client,
|
||||
})
|
||||
}
|
||||
MigratePlanResult::AutoMigrationError(e) => Err((
|
||||
StatusCode::BAD_REQUEST,
|
||||
format!("Automatic migration is not possible: {e}"),
|
||||
)
|
||||
.into()),
|
||||
}
|
||||
.map(axum::Json)
|
||||
}
|
||||
|
||||
/// Resolves the [`NameOrIdentity`] to a database identity and checks if the
|
||||
/// `auth` identity owns the database.
|
||||
async fn resolve_and_authenticate<S: ControlStateDelegate>(
|
||||
ctx: &S,
|
||||
name_or_identity: &NameOrIdentity,
|
||||
auth: &SpacetimeAuth,
|
||||
) -> axum::response::Result<Identity> {
|
||||
let database_identity = name_or_identity.resolve(ctx).await?;
|
||||
|
||||
let database = worker_ctx_find_database(ctx, &database_identity)
|
||||
.await?
|
||||
.ok_or(NO_SUCH_DATABASE)?;
|
||||
|
||||
if database.owner_identity != auth.identity {
|
||||
return Err((
|
||||
StatusCode::UNAUTHORIZED,
|
||||
format!(
|
||||
"Identity does not own database, expected: {} got: {}",
|
||||
database.owner_identity.to_hex(),
|
||||
auth.identity.to_hex()
|
||||
),
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
Ok(database_identity)
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct DeleteDatabaseParams {
|
||||
name_or_identity: NameOrIdentity,
|
||||
@@ -788,7 +916,8 @@ pub struct DatabaseRoutes<S> {
|
||||
pub logs_get: MethodRouter<S>,
|
||||
/// POST: /database/:name_or_identity/sql
|
||||
pub sql_post: MethodRouter<S>,
|
||||
|
||||
/// POST: /database/:name_or_identity/pre-publish
|
||||
pub pre_publish: MethodRouter<S>,
|
||||
/// GET: /database/: name_or_identity/unstable/timestamp
|
||||
pub timestamp_get: MethodRouter<S>,
|
||||
}
|
||||
@@ -813,6 +942,7 @@ where
|
||||
schema_get: get(schema::<S>),
|
||||
logs_get: get(logs::<S>),
|
||||
sql_post: post(sql::<S>),
|
||||
pre_publish: post(pre_publish::<S>),
|
||||
timestamp_get: get(get_timestamp::<S>),
|
||||
}
|
||||
}
|
||||
@@ -836,7 +966,8 @@ where
|
||||
.route("/schema", self.schema_get)
|
||||
.route("/logs", self.logs_get)
|
||||
.route("/sql", self.sql_post)
|
||||
.route("/unstable/timestamp", self.timestamp_get);
|
||||
.route("/unstable/timestamp", self.timestamp_get)
|
||||
.route("/pre-publish", self.pre_publish);
|
||||
|
||||
axum::Router::new()
|
||||
.route("/", self.root_post)
|
||||
|
||||
@@ -21,6 +21,7 @@ use async_trait::async_trait;
|
||||
use durability::{Durability, EmptyHistory};
|
||||
use log::{info, trace, warn};
|
||||
use parking_lot::Mutex;
|
||||
use spacetimedb_data_structures::error_stream::ErrorStream;
|
||||
use spacetimedb_data_structures::map::IntMap;
|
||||
use spacetimedb_datastore::db_metrics::data_size::DATA_SIZE_METRICS;
|
||||
use spacetimedb_datastore::db_metrics::DB_METRICS;
|
||||
@@ -30,6 +31,7 @@ use spacetimedb_lib::{hash_bytes, Identity};
|
||||
use spacetimedb_paths::server::{ReplicaDir, ServerDataDir};
|
||||
use spacetimedb_paths::FromPathUnchecked;
|
||||
use spacetimedb_sats::hash::Hash;
|
||||
use spacetimedb_schema::auto_migrate::{ponder_migrate, AutoMigrateError, MigrationPolicy, PrettyPrintStyle};
|
||||
use spacetimedb_schema::def::ModuleDef;
|
||||
use spacetimedb_table::page_pool::PagePool;
|
||||
use std::future::Future;
|
||||
@@ -355,6 +357,7 @@ impl HostController {
|
||||
host_type: HostType,
|
||||
replica_id: u64,
|
||||
program_bytes: Box<[u8]>,
|
||||
policy: MigrationPolicy,
|
||||
) -> anyhow::Result<UpdateDatabaseResult> {
|
||||
let program = Program {
|
||||
hash: hash_bytes(&program_bytes),
|
||||
@@ -404,6 +407,7 @@ impl HostController {
|
||||
this.runtimes.clone(),
|
||||
host_type,
|
||||
program,
|
||||
policy,
|
||||
this.energy_monitor.clone(),
|
||||
this.unregister_fn(replica_id),
|
||||
this.db_cores.take(),
|
||||
@@ -418,6 +422,32 @@ impl HostController {
|
||||
Ok(update_result)
|
||||
}
|
||||
|
||||
pub async fn migrate_plan(
|
||||
&self,
|
||||
database: Database,
|
||||
host_type: HostType,
|
||||
replica_id: u64,
|
||||
program_bytes: Box<[u8]>,
|
||||
style: PrettyPrintStyle,
|
||||
) -> anyhow::Result<MigratePlanResult> {
|
||||
let program = Program {
|
||||
hash: hash_bytes(&program_bytes),
|
||||
bytes: program_bytes,
|
||||
};
|
||||
trace!(
|
||||
"migrate plan {}/{}: genesis={} update-to={}",
|
||||
database.database_identity,
|
||||
replica_id,
|
||||
database.initial_program,
|
||||
program.hash
|
||||
);
|
||||
|
||||
let guard = self.acquire_read_lock(replica_id).await;
|
||||
let host = guard.as_ref().ok_or(NoSuchModule)?;
|
||||
|
||||
host.migrate_plan(host_type, program, style).await
|
||||
}
|
||||
|
||||
/// Start the host `replica_id` and conditionally update it.
|
||||
///
|
||||
/// If the host was not initialized before, it is initialized with the
|
||||
@@ -503,6 +533,7 @@ impl HostController {
|
||||
this.runtimes.clone(),
|
||||
host_type,
|
||||
program,
|
||||
MigrationPolicy::Compatible,
|
||||
this.energy_monitor.clone(),
|
||||
this.unregister_fn(replica_id),
|
||||
this.db_cores.take(),
|
||||
@@ -773,6 +804,7 @@ async fn update_module(
|
||||
module: &ModuleHost,
|
||||
program: Program,
|
||||
old_module_info: Arc<ModuleInfo>,
|
||||
policy: MigrationPolicy,
|
||||
) -> anyhow::Result<UpdateDatabaseResult> {
|
||||
let addr = db.database_identity();
|
||||
match stored_program_hash(db)? {
|
||||
@@ -783,7 +815,7 @@ async fn update_module(
|
||||
UpdateDatabaseResult::NoUpdateNeeded
|
||||
} else {
|
||||
info!("updating `{}` from {} to {}", addr, stored, program.hash);
|
||||
module.update_database(program, old_module_info).await?
|
||||
module.update_database(program, old_module_info, policy).await?
|
||||
};
|
||||
|
||||
Ok(res)
|
||||
@@ -1016,11 +1048,13 @@ impl Host {
|
||||
/// otherwise it stays the same.
|
||||
///
|
||||
/// Either way, the [`UpdateDatabaseResult`] is returned.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn update_module(
|
||||
&mut self,
|
||||
runtimes: Arc<HostRuntimes>,
|
||||
host_type: HostType,
|
||||
program: Program,
|
||||
policy: MigrationPolicy,
|
||||
energy_monitor: Arc<dyn EnergyMonitor>,
|
||||
on_panic: impl Fn() + Send + Sync + 'static,
|
||||
core: JobCore,
|
||||
@@ -1043,7 +1077,8 @@ impl Host {
|
||||
// Get the old module info to diff against when building a migration plan.
|
||||
let old_module_info = self.module.borrow().info.clone();
|
||||
|
||||
let update_result = update_module(&replica_ctx.relational_db, &module, program, old_module_info).await?;
|
||||
let update_result =
|
||||
update_module(&replica_ctx.relational_db, &module, program, old_module_info, policy).await?;
|
||||
trace!("update result: {update_result:?}");
|
||||
// Only replace the module + scheduler if the update succeeded.
|
||||
// Otherwise, we want the database to continue running with the old state.
|
||||
@@ -1057,6 +1092,30 @@ impl Host {
|
||||
Ok(update_result)
|
||||
}
|
||||
|
||||
/// Generate a migration plan for the given `program`.
|
||||
async fn migrate_plan(
|
||||
&self,
|
||||
host_type: HostType,
|
||||
program: Program,
|
||||
style: PrettyPrintStyle,
|
||||
) -> anyhow::Result<MigratePlanResult> {
|
||||
let old_module = self.module.borrow().info.clone();
|
||||
|
||||
let module_def = extract_schema(program.bytes, host_type).await?;
|
||||
|
||||
let res = match ponder_migrate(&old_module.module_def, &module_def) {
|
||||
Ok(plan) => MigratePlanResult::Success {
|
||||
old_module_hash: old_module.module_hash,
|
||||
new_module_hash: program.hash,
|
||||
breaks_client: plan.breaks_client(),
|
||||
plan: plan.pretty_print(style)?.into(),
|
||||
},
|
||||
Err(e) => MigratePlanResult::AutoMigrationError(e),
|
||||
};
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
fn db(&self) -> &RelationalDB {
|
||||
&self.replica_ctx.relational_db
|
||||
}
|
||||
@@ -1069,6 +1128,16 @@ impl Drop for Host {
|
||||
}
|
||||
}
|
||||
|
||||
pub enum MigratePlanResult {
|
||||
Success {
|
||||
old_module_hash: Hash,
|
||||
new_module_hash: Hash,
|
||||
plan: Box<str>,
|
||||
breaks_client: bool,
|
||||
},
|
||||
AutoMigrationError(ErrorStream<AutoMigrateError>),
|
||||
}
|
||||
|
||||
const STORAGE_METERING_INTERVAL: Duration = Duration::from_secs(15);
|
||||
|
||||
/// Periodically collect gauge stats and update prometheus metrics.
|
||||
|
||||
@@ -25,8 +25,8 @@ mod wasm_common;
|
||||
|
||||
pub use disk_storage::DiskStorage;
|
||||
pub use host_controller::{
|
||||
extract_schema, DurabilityProvider, ExternalDurability, ExternalStorage, HostController, ProgramStorage,
|
||||
ReducerCallResult, ReducerOutcome, StartSnapshotWatcher,
|
||||
extract_schema, DurabilityProvider, ExternalDurability, ExternalStorage, HostController, MigratePlanResult,
|
||||
ProgramStorage, ReducerCallResult, ReducerOutcome, StartSnapshotWatcher,
|
||||
};
|
||||
pub use module_host::{ModuleHost, NoSuchModule, ReducerCallError, UpdateDatabaseResult};
|
||||
pub use scheduler::Scheduler;
|
||||
|
||||
@@ -41,7 +41,7 @@ use spacetimedb_lib::Timestamp;
|
||||
use spacetimedb_primitives::TableId;
|
||||
use spacetimedb_query::compile_subscription;
|
||||
use spacetimedb_sats::ProductValue;
|
||||
use spacetimedb_schema::auto_migrate::AutoMigrateError;
|
||||
use spacetimedb_schema::auto_migrate::{AutoMigrateError, MigrationPolicy};
|
||||
use spacetimedb_schema::def::deserialize::ReducerArgsDeserializeSeed;
|
||||
use spacetimedb_schema::def::{ModuleDef, ReducerDef, TableDef};
|
||||
use spacetimedb_schema::schema::{Schema, TableSchema};
|
||||
@@ -335,6 +335,7 @@ pub trait ModuleInstance: Send + 'static {
|
||||
&mut self,
|
||||
program: Program,
|
||||
old_module_info: Arc<ModuleInfo>,
|
||||
policy: MigrationPolicy,
|
||||
) -> anyhow::Result<UpdateDatabaseResult>;
|
||||
|
||||
fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult;
|
||||
@@ -459,8 +460,9 @@ impl<T: Module> ModuleInstance for AutoReplacingModuleInstance<T> {
|
||||
&mut self,
|
||||
program: Program,
|
||||
old_module_info: Arc<ModuleInfo>,
|
||||
policy: MigrationPolicy,
|
||||
) -> anyhow::Result<UpdateDatabaseResult> {
|
||||
let ret = self.inst.update_database(program, old_module_info);
|
||||
let ret = self.inst.update_database(program, old_module_info, policy);
|
||||
self.check_trap();
|
||||
ret
|
||||
}
|
||||
@@ -1058,9 +1060,10 @@ impl ModuleHost {
|
||||
&self,
|
||||
program: Program,
|
||||
old_module_info: Arc<ModuleInfo>,
|
||||
policy: MigrationPolicy,
|
||||
) -> Result<UpdateDatabaseResult, anyhow::Error> {
|
||||
self.call("<update_database>", move |inst| {
|
||||
inst.update_database(program, old_module_info)
|
||||
inst.update_database(program, old_module_info, policy)
|
||||
})
|
||||
.await?
|
||||
}
|
||||
|
||||
@@ -19,7 +19,9 @@ use ser::serialize_to_js;
|
||||
use spacetimedb_client_api_messages::energy::{EnergyQuanta, ReducerBudget};
|
||||
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
|
||||
use spacetimedb_datastore::traits::Program;
|
||||
use spacetimedb_lib::{ConnectionId, Identity, RawModuleDef};
|
||||
use spacetimedb_lib::RawModuleDef;
|
||||
use spacetimedb_lib::{ConnectionId, Identity};
|
||||
use spacetimedb_schema::auto_migrate::MigrationPolicy;
|
||||
use std::sync::{Arc, LazyLock};
|
||||
use v8::{Context, ContextOptions, ContextScope, Function, HandleScope, Isolate, Local, Value};
|
||||
|
||||
@@ -137,9 +139,11 @@ impl ModuleInstance for JsInstance {
|
||||
&mut self,
|
||||
program: Program,
|
||||
old_module_info: Arc<ModuleInfo>,
|
||||
policy: MigrationPolicy,
|
||||
) -> anyhow::Result<UpdateDatabaseResult> {
|
||||
let replica_ctx = &self.replica_ctx;
|
||||
self.common.update_database(replica_ctx, program, old_module_info)
|
||||
self.common
|
||||
.update_database(replica_ctx, program, old_module_info, policy)
|
||||
}
|
||||
|
||||
fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> super::ReducerCallResult {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use prometheus::{Histogram, IntCounter, IntGauge};
|
||||
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
|
||||
use spacetimedb_schema::auto_migrate::ponder_migrate;
|
||||
use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tracing::span::EnteredSpan;
|
||||
@@ -233,9 +233,11 @@ impl<T: WasmInstance> ModuleInstance for WasmModuleInstance<T> {
|
||||
&mut self,
|
||||
program: Program,
|
||||
old_module_info: Arc<ModuleInfo>,
|
||||
policy: MigrationPolicy,
|
||||
) -> anyhow::Result<UpdateDatabaseResult> {
|
||||
let replica_ctx = &self.instance.instance_env().replica_ctx;
|
||||
self.common.update_database(replica_ctx, program, old_module_info)
|
||||
self.common
|
||||
.update_database(replica_ctx, program, old_module_info, policy)
|
||||
}
|
||||
|
||||
fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> ReducerCallResult {
|
||||
@@ -277,15 +279,24 @@ impl InstanceCommon {
|
||||
replica_ctx: &ReplicaContext,
|
||||
program: Program,
|
||||
old_module_info: Arc<ModuleInfo>,
|
||||
policy: MigrationPolicy,
|
||||
) -> Result<UpdateDatabaseResult, anyhow::Error> {
|
||||
let system_logger = replica_ctx.logger.system_logger();
|
||||
let stdb = &replica_ctx.relational_db;
|
||||
|
||||
let plan = ponder_migrate(&old_module_info.module_def, &self.info.module_def);
|
||||
let plan = match plan {
|
||||
let plan: MigratePlan = match policy.try_migrate(
|
||||
self.info.database_identity,
|
||||
old_module_info.module_hash,
|
||||
&old_module_info.module_def,
|
||||
self.info.module_hash,
|
||||
&self.info.module_def,
|
||||
) {
|
||||
Ok(plan) => plan,
|
||||
Err(errs) => {
|
||||
return Ok(UpdateDatabaseResult::AutoMigrateError(errs));
|
||||
Err(e) => {
|
||||
return match e {
|
||||
MigrationPolicyError::AutoMigrateFailure(e) => Ok(UpdateDatabaseResult::AutoMigrateError(e)),
|
||||
_ => Ok(UpdateDatabaseResult::ErrorExecutingMigration(e.into())),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -8,13 +8,14 @@ use spacetimedb_data_structures::{
|
||||
};
|
||||
use spacetimedb_lib::{
|
||||
db::raw_def::v9::{RawRowLevelSecurityDefV9, TableType},
|
||||
AlgebraicType,
|
||||
hash_bytes, AlgebraicType, Identity,
|
||||
};
|
||||
use spacetimedb_sats::{
|
||||
layout::{HasLayout, SumTypeLayout},
|
||||
WithTypespace,
|
||||
};
|
||||
use termcolor_formatter::{ColorScheme, TermColorFormatter};
|
||||
use thiserror::Error;
|
||||
mod formatter;
|
||||
mod termcolor_formatter;
|
||||
|
||||
@@ -50,9 +51,19 @@ impl<'def> MigratePlan<'def> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn breaks_client(&self) -> bool {
|
||||
match self {
|
||||
//TODO: fix it when support for manual migration plans is added.
|
||||
MigratePlan::Manual(_) => true,
|
||||
MigratePlan::Auto(plan) => plan
|
||||
.steps
|
||||
.iter()
|
||||
.any(|step| matches!(step, AutoMigrateStep::DisconnectAllUsers)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn pretty_print(&self, style: PrettyPrintStyle) -> anyhow::Result<String> {
|
||||
use PrettyPrintStyle::*;
|
||||
|
||||
match self {
|
||||
MigratePlan::Manual(_) => {
|
||||
anyhow::bail!("Manual migration plans are not yet supported for pretty printing.")
|
||||
@@ -73,6 +84,101 @@ impl<'def> MigratePlan<'def> {
|
||||
}
|
||||
}
|
||||
|
||||
/// A migration policy that determines whether a module update is allowed to break client compatibility.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum MigrationPolicy {
|
||||
/// Migration must maintain backward compatibility with existing clients.
|
||||
Compatible,
|
||||
/// To use this, a valid [`MigrationToken`] must be provided.
|
||||
/// The token is issued through the pre-publish API (see the `client-api` crate)
|
||||
/// and proves that the publisher explicitly acknowledged the breaking change.
|
||||
BreakClients(spacetimedb_lib::Hash),
|
||||
}
|
||||
|
||||
impl MigrationPolicy {
|
||||
/// Verifies whether the given migration plan is allowed under the current policy.
|
||||
///
|
||||
/// Returns `Ok(())` if allowed, otherwise an appropriate `MigrationPolicyError`
|
||||
fn permits_plan(&self, plan: &MigratePlan<'_>, token: &MigrationToken) -> anyhow::Result<(), MigrationPolicyError> {
|
||||
match self {
|
||||
MigrationPolicy::Compatible => {
|
||||
if plan.breaks_client() {
|
||||
Err(MigrationPolicyError::ClientBreakingChangeDisallowed)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
MigrationPolicy::BreakClients(expected_hash) => {
|
||||
if token.hash() == *expected_hash {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(MigrationPolicyError::InvalidToken)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to generate a migration plan and validate it under this policy.
|
||||
///
|
||||
/// Fails if migration is not permitted by the policy or migration planning fails.
|
||||
pub fn try_migrate<'def>(
|
||||
&self,
|
||||
database_identity: Identity,
|
||||
old_module_hash: spacetimedb_lib::Hash,
|
||||
old_module_def: &'def ModuleDef,
|
||||
new_module_hash: spacetimedb_lib::Hash,
|
||||
new_module_def: &'def ModuleDef,
|
||||
) -> anyhow::Result<MigratePlan<'def>, MigrationPolicyError> {
|
||||
let plan = ponder_migrate(old_module_def, new_module_def).map_err(MigrationPolicyError::AutoMigrateFailure)?;
|
||||
|
||||
let token = MigrationToken {
|
||||
database_identity,
|
||||
old_module_hash,
|
||||
new_module_hash,
|
||||
};
|
||||
self.permits_plan(&plan, &token)?;
|
||||
Ok(plan)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum MigrationPolicyError {
|
||||
#[error("Automatic migration planning failed")]
|
||||
AutoMigrateFailure(ErrorStream<AutoMigrateError>),
|
||||
|
||||
#[error("Token provided is invalid or does not match expected hash")]
|
||||
InvalidToken,
|
||||
|
||||
#[error("Migration plan contains a client-breaking change which is disallowed under current policy")]
|
||||
ClientBreakingChangeDisallowed,
|
||||
}
|
||||
|
||||
/// A token acknowledging a breaking migration.
|
||||
///
|
||||
/// Note: This token is only intended as a UX safeguard, not as a security measure.
|
||||
/// No secret is used in its generation, which means anyone can reproduce it given
|
||||
/// the inputs. That is acceptable for our purposes since it only signals user intent,
|
||||
/// not authorization.
|
||||
pub struct MigrationToken {
|
||||
pub database_identity: Identity,
|
||||
pub old_module_hash: spacetimedb_lib::Hash,
|
||||
pub new_module_hash: spacetimedb_lib::Hash,
|
||||
}
|
||||
|
||||
impl MigrationToken {
|
||||
pub fn hash(&self) -> spacetimedb_lib::Hash {
|
||||
hash_bytes(
|
||||
format!(
|
||||
"{}{}{}",
|
||||
self.database_identity.to_hex(),
|
||||
self.old_module_hash.to_hex(),
|
||||
self.new_module_hash.to_hex()
|
||||
)
|
||||
.as_str(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// A plan for a manual migration.
|
||||
/// `new` must have a reducer marked with `Lifecycle::Update`.
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -28,6 +28,7 @@ spacetimedb-datastore.workspace = true
|
||||
spacetimedb-lib.workspace = true
|
||||
spacetimedb-paths.workspace = true
|
||||
spacetimedb-table.workspace = true
|
||||
spacetimedb-schema.workspace = true
|
||||
|
||||
anyhow.workspace = true
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -5,7 +5,7 @@ pub mod version;
|
||||
|
||||
use crate::control_db::ControlDb;
|
||||
use crate::subcommands::{extract_schema, start};
|
||||
use anyhow::{ensure, Context, Ok};
|
||||
use anyhow::{ensure, Context as _, Ok};
|
||||
use async_trait::async_trait;
|
||||
use clap::{ArgMatches, Command};
|
||||
use spacetimedb::client::ClientActorIndex;
|
||||
@@ -13,7 +13,8 @@ use spacetimedb::config::{CertificateAuthority, MetadataFile};
|
||||
use spacetimedb::db::{self, relational_db};
|
||||
use spacetimedb::energy::{EnergyBalance, EnergyQuanta, NullEnergyMonitor};
|
||||
use spacetimedb::host::{
|
||||
DiskStorage, DurabilityProvider, ExternalDurability, HostController, StartSnapshotWatcher, UpdateDatabaseResult,
|
||||
DiskStorage, DurabilityProvider, ExternalDurability, HostController, MigratePlanResult, StartSnapshotWatcher,
|
||||
UpdateDatabaseResult,
|
||||
};
|
||||
use spacetimedb::identity::Identity;
|
||||
use spacetimedb::messages::control_db::{Database, Node, Replica};
|
||||
@@ -28,6 +29,7 @@ use spacetimedb_datastore::db_metrics::DB_METRICS;
|
||||
use spacetimedb_datastore::traits::Program;
|
||||
use spacetimedb_paths::server::{ModuleLogsDir, PidFile, ServerDataDir};
|
||||
use spacetimedb_paths::standalone::StandaloneDataDirExt;
|
||||
use spacetimedb_schema::auto_migrate::{MigrationPolicy, PrettyPrintStyle};
|
||||
use spacetimedb_table::page_pool::PagePool;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -239,6 +241,7 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv {
|
||||
&self,
|
||||
publisher: &Identity,
|
||||
spec: spacetimedb_client_api::DatabaseDef,
|
||||
policy: MigrationPolicy,
|
||||
) -> anyhow::Result<Option<UpdateDatabaseResult>> {
|
||||
let existing_db = self.control_db.get_database_by_identity(&spec.database_identity)?;
|
||||
|
||||
@@ -295,7 +298,7 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv {
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("No leader for database"))?;
|
||||
let update_result = leader
|
||||
.update(database, spec.host_type, spec.program_bytes.into())
|
||||
.update(database, spec.host_type, spec.program_bytes.into(), policy)
|
||||
.await?;
|
||||
if update_result.was_successful() {
|
||||
let replicas = self.control_db.get_replicas_by_database(database_id)?;
|
||||
@@ -345,6 +348,30 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv {
|
||||
}
|
||||
}
|
||||
|
||||
async fn migrate_plan(
|
||||
&self,
|
||||
spec: spacetimedb_client_api::DatabaseDef,
|
||||
style: PrettyPrintStyle,
|
||||
) -> anyhow::Result<MigratePlanResult> {
|
||||
let existing_db = self.control_db.get_database_by_identity(&spec.database_identity)?;
|
||||
|
||||
match existing_db {
|
||||
Some(db) => {
|
||||
let host = self
|
||||
.leader(db.id)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("No leader for database"))?;
|
||||
self.host_controller
|
||||
.migrate_plan(db, spec.host_type, host.replica_id, spec.program_bytes.into(), style)
|
||||
.await
|
||||
}
|
||||
None => anyhow::bail!(
|
||||
"Database `{}` does not exist",
|
||||
spec.database_identity.to_abbreviated_hex()
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete_database(&self, caller_identity: &Identity, database_identity: &Identity) -> anyhow::Result<()> {
|
||||
let Some(database) = self.control_db.get_database_by_identity(database_identity)? else {
|
||||
return Ok(());
|
||||
|
||||
@@ -12,6 +12,7 @@ use spacetimedb::Identity;
|
||||
use spacetimedb_client_api::auth::SpacetimeAuth;
|
||||
use spacetimedb_client_api::routes::subscribe::{generate_random_connection_id, WebSocketOptions};
|
||||
use spacetimedb_paths::{RootDir, SpacetimePaths};
|
||||
use spacetimedb_schema::auto_migrate::MigrationPolicy;
|
||||
use spacetimedb_schema::def::ModuleDef;
|
||||
use tokio::runtime::{Builder, Runtime};
|
||||
|
||||
@@ -205,6 +206,7 @@ impl CompiledModule {
|
||||
num_replicas: None,
|
||||
host_type: HostType::Wasm,
|
||||
},
|
||||
MigrationPolicy::Compatible,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
Reference in New Issue
Block a user