From 770339dafba695b4e02a1d6bfe5960ef80fa264f Mon Sep 17 00:00:00 2001 From: Mario Alejandro Montoya Cortes Date: Wed, 11 Jun 2025 11:19:37 -0500 Subject: [PATCH] Lift restriction that multi-col indices only allow = constraints #1317 benches --- crates/core/src/estimation.rs | 2 + crates/core/src/sql/compiler.rs | 37 +- crates/core/src/subscription/subscription.rs | 22 +- crates/execution/src/iter.rs | 22 + crates/execution/src/lib.rs | 2 +- crates/execution/src/pipelined.rs | 82 ++- crates/physical-plan/src/plan.rs | 503 ++++++++++++-- crates/physical-plan/src/printer.rs | 13 +- crates/physical-plan/src/rules.rs | 685 +++++++------------ crates/sql-parser/src/ast/mod.rs | 2 +- 10 files changed, 841 insertions(+), 529 deletions(-) diff --git a/crates/core/src/estimation.rs b/crates/core/src/estimation.rs index 64892e9b9b..5271c0904f 100644 --- a/crates/core/src/estimation.rs +++ b/crates/core/src/estimation.rs @@ -14,6 +14,7 @@ pub fn num_rows(tx: &Tx, expr: &QueryExpr) -> u64 { pub fn estimate_rows_scanned(tx: &Tx, plan: &PhysicalPlan) -> u64 { match plan { PhysicalPlan::TableScan(..) | PhysicalPlan::IxScan(..) => row_estimate(tx, plan), + PhysicalPlan::IxScansAnd(plans) => plans.iter().map(|p| estimate_rows_scanned(tx, p)).sum(), PhysicalPlan::Filter(input, _) => estimate_rows_scanned(tx, input).saturating_add(row_estimate(tx, input)), PhysicalPlan::NLJoin(lhs, rhs) => estimate_rows_scanned(tx, lhs) .saturating_add(estimate_rows_scanned(tx, rhs)) @@ -51,6 +52,7 @@ pub fn row_estimate(tx: &Tx, plan: &PhysicalPlan) -> u64 { // Use a row limit as the estimate if present PhysicalPlan::TableScan(TableScan { limit: Some(n), .. }, _) | PhysicalPlan::IxScan(IxScan { limit: Some(n), .. }, _) => *n, + PhysicalPlan::IxScansAnd(idx) => idx.iter().map(|plan| row_estimate(tx, plan)).sum(), // Table scans return the number of rows in the table PhysicalPlan::TableScan( TableScan { diff --git a/crates/core/src/sql/compiler.rs b/crates/core/src/sql/compiler.rs index ff0513f79c..0d17c40800 100644 --- a/crates/core/src/sql/compiler.rs +++ b/crates/core/src/sql/compiler.rs @@ -507,15 +507,14 @@ Seq Scan on test db.create_table_for_test("test", schema, indexes)?; let tx = begin_tx(&db); - // TODO: Need support for index range scans. expect_query( &tx, "select * from test where b > 2", expect![ r#" -Seq Scan on test - Output: test.a, test.b - -> Filter: (test.b > U64(2))"# +Index Scan using Index test_b_idx_btree (test.b) on test + Index Cond: (test.b > U64(2)) + Output: test.a, test.b"# ], ); @@ -532,15 +531,15 @@ Seq Scan on test db.create_table_for_test("test", schema, indexes)?; let tx = begin_tx(&db); - //TODO(sql): Need support for index scans for ranges expect_query( &tx, "select * from test where b > 2 and b < 5", expect![ r#" -Seq Scan on test +Index Scan using Index test_b_idx_btree (test.b) on test + Index Cond: (test.b > U64(2)) Output: test.a, test.b - -> Filter: (test.b > U64(2) AND test.b < U64(5))"# + -> Filter: (test.b < U64(5))"# ], ); @@ -557,17 +556,20 @@ Seq Scan on test db.create_table_for_test("test", schema, indexes)?; let tx = begin_tx(&db); - // Note, order matters - the equality condition occurs first which - // means an index scan will be generated rather than the range condition. expect_query( &tx, "select * from test where a = 3 and b > 2 and b < 5", expect![ r#" -Index Scan using Index test_a_idx_btree (test.a) on test - Index Cond: (test.a = U64(3)) +Union + -> Index Scan using Index test_a_idx_btree (test.a) on test + Index Cond: (test.a = U64(3)) + Output: test.a, test.b + -> Index Scan using Index test_b_idx_btree (test.b) on test + Index Cond: (test.b > U64(2)) + Output: test.a, test.b Output: test.a, test.b - -> Filter: (test.b < U64(5) AND test.b > U64(2))"# + -> Filter: (test.b < U64(5) AND test.a = U64(3) AND test.b > U64(2))"# ], ); @@ -710,9 +712,9 @@ Hash Join: Lhs Index Cond: (lhs.a = U64(3)) Output: lhs.a, lhs.b -> Hash Build: rhs.b - -> Seq Scan on rhs - Output: rhs.b, rhs.c - -> Filter: (rhs.c < U64(4))"# + -> Index Scan using Index rhs_c_idx_btree (rhs.c) on rhs + Index Cond: (rhs.c < U64(4)) + Output: rhs.b, rhs.c"# ], ); @@ -749,9 +751,10 @@ Index Join: Rhs on lhs Inner Unique: false Join Cond: (rhs.b = lhs.b) Output: lhs.a, lhs.b - -> Seq Scan on rhs + -> Index Scan using Index rhs_c_idx_btree (rhs.c) on rhs + Index Cond: (rhs.c > U64(2)) Output: rhs.b, rhs.c, rhs.d - -> Filter: (rhs.c > U64(2) AND rhs.c < U64(4) AND rhs.d = U64(3))"# + -> Filter: (rhs.c < U64(4) AND rhs.d = U64(3))"# ], ); diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index 40e34f4d77..271473ad35 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -710,7 +710,6 @@ mod tests { panic!("expected an index join, but got {join:#?}"); }; - //TODO(sql): Remove manual checks to just `EXPLAIN` the query. expect_sub( &tx, sql, @@ -720,9 +719,10 @@ Index Join: Rhs on lhs Inner Unique: false Join Cond: (rhs.b = lhs.b) Output: lhs.a, lhs.b - -> Seq Scan on rhs + -> Index Scan using Index rhs_c_idx_btree (rhs.c) on rhs + Index Cond: (rhs.c > U64(2)) Output: rhs.b, rhs.c, rhs.d - -> Filter: (rhs.c > U64(2) AND rhs.c < U64(4) AND rhs.d = U64(3))"# + -> Filter: (rhs.c < U64(4) AND rhs.d = U64(3))"# ], ); @@ -806,8 +806,7 @@ Index Join: Rhs on lhs panic!("expected an index join, but got {join:#?}"); }; - //TODO(sql): Remove manual checks to just `EXPLAIN` the query. - // Why this generate same plan than the previous test? 'compile_incremental_index_join_index_side' + //TODO(sql): Why this generate same plan than the previous test? 'compile_incremental_index_join_index_side' expect_sub( &tx, sql, @@ -817,9 +816,10 @@ Index Join: Rhs on lhs Inner Unique: false Join Cond: (rhs.b = lhs.b) Output: lhs.a, lhs.b - -> Seq Scan on rhs + -> Index Scan using Index rhs_c_idx_btree (rhs.c) on rhs + Index Cond: (rhs.c > U64(2)) Output: rhs.b, rhs.c, rhs.d - -> Filter: (rhs.c > U64(2) AND rhs.c < U64(4) AND rhs.d = U64(3))"# + -> Filter: (rhs.c < U64(4) AND rhs.d = U64(3))"# ], ); @@ -909,8 +909,7 @@ Index Join: Rhs on lhs "expected an index join, but got {src_join:#?}" ); - //TODO(sql): Remove manual checks to just `EXPLAIN` the query. - // Why this generate same plan than the previous test? 'compile_incremental_index_join_index_side' + //TODO(sql): Why this generate same plan than the previous test? 'compile_incremental_index_join_index_side' expect_sub( &tx, sql, @@ -920,9 +919,10 @@ Index Join: Rhs on lhs Inner Unique: false Join Cond: (rhs.b = lhs.b) Output: lhs.a, lhs.b - -> Seq Scan on rhs + -> Index Scan using Index rhs_c_idx_btree (rhs.c) on rhs + Index Cond: (rhs.c > U64(2)) Output: rhs.b, rhs.c, rhs.d - -> Filter: (rhs.c > U64(2) AND rhs.c < U64(4) AND rhs.d = U64(3))"# + -> Filter: (rhs.c < U64(4) AND rhs.d = U64(3))"# ], ); diff --git a/crates/execution/src/iter.rs b/crates/execution/src/iter.rs index 0dee161ca5..00a727b5c0 100644 --- a/crates/execution/src/iter.rs +++ b/crates/execution/src/iter.rs @@ -75,6 +75,7 @@ pub enum Iter<'a> { Row(RowRefIter<'a>), Join(LeftDeepJoinIter<'a>), Filter(Filter<'a, Iter<'a>>), + FilterIdx(FilterIdx<'a>), } impl<'a> Iterator for Iter<'a> { @@ -85,6 +86,7 @@ impl<'a> Iterator for Iter<'a> { Self::Row(iter) => iter.next().map(Tuple::Row), Self::Join(iter) => iter.next(), Self::Filter(iter) => iter.next(), + Self::FilterIdx(iter) => iter.next(), } } } @@ -96,6 +98,12 @@ impl<'a> Iter<'a> { { match plan { PhysicalPlan::TableScan(..) | PhysicalPlan::IxScan(..) => RowRefIter::build(plan, tx).map(Self::Row), + PhysicalPlan::IxScansAnd(input) => { + let input: Vec<_> = input.iter().map(|plan| Iter::build(plan, tx)).collect::>()?; + Ok(Iter::FilterIdx(FilterIdx { + input: input.into_boxed_slice(), + })) + } PhysicalPlan::Filter(input, expr) => { // Build a filter iterator Iter::build(input, tx) @@ -1050,3 +1058,17 @@ impl<'a> Iterator for Filter<'a, Iter<'a>> { self.input.find(|tuple| self.expr.eval_bool(tuple)) } } + +/// A tuple-at-a-time filter iterator based on check multiple indexes +pub struct FilterIdx<'a> { + input: Box<[Iter<'a>]>, +} + +impl<'a> Iterator for FilterIdx<'a> { + type Item = Tuple<'a>; + + fn next(&mut self) -> Option { + // Find the row that matches all the indexes + self.input.iter_mut().find_map(|iter| iter.next()) + } +} diff --git a/crates/execution/src/lib.rs b/crates/execution/src/lib.rs index 8fb8e50e59..dd35570d40 100644 --- a/crates/execution/src/lib.rs +++ b/crates/execution/src/lib.rs @@ -190,7 +190,7 @@ impl ProjectField for Row<'_> { } /// Each query operator returns a tuple of [RowRef]s -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq, Hash)] pub enum Tuple<'a> { /// A pointer to a row in a base table Row(Row<'a>), diff --git a/crates/execution/src/pipelined.rs b/crates/execution/src/pipelined.rs index 2a43f0382d..aa84de82e2 100644 --- a/crates/execution/src/pipelined.rs +++ b/crates/execution/src/pipelined.rs @@ -186,6 +186,7 @@ impl PipelinedProject { pub enum PipelinedExecutor { TableScan(PipelinedScan), IxScan(PipelinedIxScan), + IxScansAnd(PipelinedIxScanAnd), IxJoin(PipelinedIxJoin), IxDeltaScan(PipelinedIxDeltaScan), IxDeltaJoin(PipelinedIxDeltaJoin), @@ -205,6 +206,12 @@ impl From for PipelinedExecutor { }), PhysicalPlan::IxScan(scan @ IxScan { delta: None, .. }, _) => Self::IxScan(scan.into()), PhysicalPlan::IxScan(scan, _) => Self::IxDeltaScan(scan.into()), + PhysicalPlan::IxScansAnd(scans) => { + // Apply the filter to each scan to form an intersection + Self::IxScansAnd(PipelinedIxScanAnd { + scans: scans.into_iter().map(PipelinedExecutor::from).collect(), + }) + } PhysicalPlan::IxJoin( IxJoin { lhs, @@ -292,7 +299,7 @@ impl PipelinedExecutor { lhs.visit(f); rhs.visit(f); } - Self::TableScan(..) | Self::IxScan(..) | Self::IxDeltaScan(..) => {} + Self::TableScan(..) | Self::IxScan(..) | Self::IxDeltaScan(..) | Self::IxScansAnd(..) => {} } } @@ -301,6 +308,7 @@ impl PipelinedExecutor { match self { Self::TableScan(scan) => scan.is_empty(tx), Self::IxScan(scan) => scan.is_empty(tx), + Self::IxScansAnd(scan) => scan.is_empty(tx), Self::IxDeltaScan(scan) => scan.is_empty(tx), Self::IxJoin(join) => join.is_empty(tx), Self::IxDeltaJoin(join) => join.is_empty(tx), @@ -320,6 +328,7 @@ impl PipelinedExecutor { match self { Self::TableScan(scan) => scan.execute(tx, metrics, f), Self::IxScan(scan) => scan.execute(tx, metrics, f), + Self::IxScansAnd(scan) => scan.execute(tx, metrics, f), Self::IxDeltaScan(scan) => scan.execute(tx, metrics, f), Self::IxJoin(join) => join.execute(tx, metrics, f), Self::IxDeltaJoin(join) => join.execute(tx, metrics, f), @@ -567,6 +576,7 @@ pub struct PipelinedIxScan { pub table_id: TableId, /// The index id pub index_id: IndexId, + pub index_is_multi_column: bool, pub limit: Option, /// An equality prefix for multi-column scans pub prefix: Vec, @@ -578,6 +588,12 @@ pub struct PipelinedIxScan { impl From for PipelinedIxScan { fn from(scan: IxScan) -> Self { + let index_is_multi_column = scan + .schema + .indexes + .iter() + .find(|i| i.index_id == scan.index_id) + .is_some_and(|index| index.index_algorithm.columns().iter().count() > 1); match scan { IxScan { schema, @@ -589,6 +605,7 @@ impl From for PipelinedIxScan { } => Self { table_id: schema.table_id, index_id, + index_is_multi_column, limit, prefix: prefix.into_iter().map(|(_, v)| v).collect(), lower: Bound::Included(v.clone()), @@ -604,6 +621,7 @@ impl From for PipelinedIxScan { } => Self { table_id: schema.table_id, index_id, + index_is_multi_column, limit, prefix: prefix.into_iter().map(|(_, v)| v).collect(), lower, @@ -672,7 +690,7 @@ impl PipelinedIxScan { f(t) }; match self.prefix.as_slice() { - [] => { + [] if !self.index_is_multi_column => { for ptr in single_col_limit_scan(self.limit.map(|n| n as usize))? .map(Row::Ptr) .map(Tuple::Row) @@ -695,6 +713,66 @@ impl PipelinedIxScan { } } +/// A pipelined executor for scanning multiple indexes +#[derive(Debug)] +pub struct PipelinedIxScanAnd { + scans: Vec, +} + +impl PipelinedIxScanAnd { + /// Does this operation contain an empty scan? + pub fn is_empty(&self, tx: &impl DeltaStore) -> bool { + self.scans.iter().all(|scan| scan.is_empty(tx)) + } + + /// Executes the pipelined index scans, producing tuples when all + pub fn execute<'a, Tx: Datastore + DeltaStore>( + &self, + tx: &'a Tx, + metrics: &mut ExecutionMetrics, + f: &mut dyn FnMut(Tuple<'a>) -> Result<()>, + ) -> Result<()> { + let scans = match self.scans.as_slice() { + [] => return Ok(()), // No scans to execute + [scan] => { + // Single scan, execute directly + return scan.execute(tx, metrics, f); + } + scans => scans, + }; + + // Execute first scan and materialize all rows + let mut first_rows = HashSet::new(); + scans[0].execute(tx, metrics, &mut |t| { + first_rows.insert(t); + Ok(()) + })?; + + // Intersect with remaining scans + for scan in &scans[1..] { + let mut current_rows = HashSet::new(); + scan.execute(tx, metrics, &mut |t| { + if first_rows.contains(&t) { + current_rows.insert(t); + } + Ok(()) + })?; + first_rows = current_rows; + if first_rows.is_empty() { + break; + } + } + + // Emit results + for t in first_rows { + metrics.rows_scanned += 1; + f(t)?; + } + + Ok(()) + } +} + /// A pipelined index join executor #[derive(Debug)] pub struct PipelinedIxJoin { diff --git a/crates/physical-plan/src/plan.rs b/crates/physical-plan/src/plan.rs index 8406497b22..16703044c2 100644 --- a/crates/physical-plan/src/plan.rs +++ b/crates/physical-plan/src/plan.rs @@ -15,9 +15,8 @@ use spacetimedb_sql_parser::ast::{BinOp, LogOp}; use spacetimedb_table::table::RowRef; use crate::rules::{ - ComputePositions, HashToIxJoin, IxScanAnd, IxScanEq, IxScanEq2Col, IxScanEq3Col, PullFilterAboveHashJoin, - PushConstAnd, PushConstEq, PushLimit, ReorderDeltaJoinRhs, ReorderHashJoin, RewriteRule, UniqueHashJoinRule, - UniqueIxJoinRule, + ComputePositions, HashToIxJoin, IxScanBinOp, IxScanOpMultiCol, PullFilterAboveHashJoin, PushConstAnd, PushConstEq, + PushLimit, ReorderDeltaJoinRhs, ReorderHashJoin, RewriteRule, UniqueHashJoinRule, UniqueIxJoinRule, }; /// Table aliases are replaced with labels in the physical plan @@ -201,6 +200,8 @@ pub enum PhysicalPlan { TableScan(TableScan, Label), /// Fetch row ids from an index IxScan(IxScan, Label), + /// Fetch rows from using an intersection from index scans + IxScansAnd(Vec), /// An index join + projection IxJoin(IxJoin, Semi), /// A hash join + projection @@ -223,7 +224,7 @@ impl PhysicalPlan { lhs.visit(f); rhs.visit(f); } - Self::TableScan(..) | Self::IxScan(..) => {} + Self::TableScan(..) | Self::IxScan(..) | Self::IxScansAnd(..) => {} } } @@ -238,7 +239,7 @@ impl PhysicalPlan { lhs.visit_mut(f); rhs.visit_mut(f); } - Self::TableScan(..) | Self::IxScan(..) => {} + Self::TableScan(..) | Self::IxScan(..) | Self::IxScansAnd(..) => {} } } @@ -271,7 +272,7 @@ impl PhysicalPlan { }, semi, ), - plan @ Self::TableScan(..) | plan @ Self::IxScan(..) => plan, + plan @ Self::TableScan(..) | plan @ Self::IxScan(..) | plan @ Self::IxScansAnd(..) => plan, } } @@ -290,7 +291,7 @@ impl PhysicalPlan { plan.any(&|plan| ok(plan).is_some()) }; Ok(match self { - Self::TableScan(..) | Self::IxScan(..) => self, + Self::TableScan(..) | Self::IxScan(..) | Self::IxScansAnd(..) => self, Self::NLJoin(lhs, rhs) => { if matches(&lhs) { return Ok(Self::NLJoin(Box::new(lhs.map_if(f, ok)?), rhs)); @@ -393,10 +394,8 @@ impl PhysicalPlan { .apply_rec::()? .apply_rec::()? .apply_rec::()? - .apply_rec::()? - .apply_rec::()? - .apply_rec::()? - .apply_rec::()? + .apply_rec::()? + .apply_rec::()? .apply_rec::()? .apply_rec::()? .apply_rec::()? @@ -831,6 +830,7 @@ impl PhysicalPlan { fn nfields(&self) -> usize { match self { Self::TableScan(..) | Self::IxScan(..) | Self::IxJoin(_, Semi::Rhs) => 1, + Self::IxScansAnd(plans) => plans.iter().map(PhysicalPlan::nfields).sum(), Self::Filter(input, _) => input.nfields(), Self::IxJoin(join, Semi::Lhs) => join.lhs.nfields(), Self::IxJoin(join, Semi::All) => join.lhs.nfields() + 1, @@ -859,6 +859,7 @@ impl PhysicalPlan { | PhysicalPlan::IxJoin(IxJoin { rhs_label: alias, .. }, Semi::Rhs) => { labels.push(*alias); } + PhysicalPlan::IxScansAnd(idx) => labels.extend(idx.iter().flat_map(|plan| plan.labels())), PhysicalPlan::Filter(input, _) | PhysicalPlan::IxJoin(IxJoin { lhs: input, .. }, Semi::Lhs) | PhysicalPlan::HashJoin(HashJoin { lhs: input, .. }, Semi::Lhs) @@ -983,6 +984,17 @@ pub enum Sarg { } impl Sarg { + pub fn from_op(op: BinOp, col: ColId, value: AlgebraicValue) -> Self { + match op { + BinOp::Eq => Sarg::Eq(col, value), + BinOp::Ne => unreachable!("Cannot create a search argument for inequality"), + BinOp::Lt => Sarg::Range(col, Bound::Unbounded, Bound::Excluded(value)), + BinOp::Gt => Sarg::Range(col, Bound::Excluded(value), Bound::Unbounded), + BinOp::Lte => Sarg::Range(col, Bound::Unbounded, Bound::Included(value)), + BinOp::Gte => Sarg::Range(col, Bound::Included(value), Bound::Unbounded), + } + } + /// Decodes the sarg into a binary operator pub fn to_op(&self) -> BinOp { match self { @@ -1227,7 +1239,7 @@ pub mod tests_utils { Ok(compile(plan)) } - fn query<'a>(db: &'a impl SchemaView, auth: &AuthCtx, sql: &'a str) -> TypingResult> { + pub fn query<'a>(db: &'a impl SchemaView, auth: &AuthCtx, sql: &'a str) -> TypingResult> { let plan = compile_sql_stmt_with_ctx(sql, db, auth, true)?; Ok(compile(plan)) } @@ -1241,6 +1253,16 @@ pub mod tests_utils { let explain = Explain::new(&plan).with_options(options).build(); expect.assert_eq(&explain.to_string()); } + #[cfg(test)] + fn check_assert(plan: PhysicalCtx, options: ExplainOptions, expect: String) { + let plan = if options.optimize { + plan.optimize().unwrap() + } else { + plan + }; + let explain = Explain::new(&plan).with_options(options).build(); + pretty_assertions::assert_eq!(explain.to_string(), expect, "{}", plan.sql) + } pub fn check_sub( db: &impl SchemaView, @@ -1253,6 +1275,11 @@ pub mod tests_utils { check(plan, options, expect); Ok(()) } + #[cfg(test)] + pub fn check_sub_assert(db: &impl SchemaView, options: ExplainOptions, auth: &AuthCtx, sql: &str, expect: String) { + let plan = sub(db, auth, sql).unwrap(); + check_assert(plan, options, expect); + } pub fn check_query( db: &impl SchemaView, @@ -1265,13 +1292,25 @@ pub mod tests_utils { check(plan, options, expect); Ok(()) } + #[cfg(test)] + pub fn check_query_assert( + db: &impl SchemaView, + options: ExplainOptions, + auth: &AuthCtx, + sql: &str, + expect: String, + ) { + let plan = query(db, auth, sql).unwrap(); + check_assert(plan, options, expect); + } } #[cfg(test)] mod tests { use super::*; - use crate::printer::ExplainOptions; + use crate::plan::tests_utils::query; + use crate::printer::{Explain, ExplainOptions}; use expect_test::{expect, Expect}; use spacetimedb_expr::check::SchemaView; use spacetimedb_lib::identity::AuthCtx; @@ -1384,10 +1423,18 @@ mod tests { tests_utils::check_sub(db, db.options, &AuthCtx::for_testing(), sql, expect).unwrap(); } + fn check_sub_assert(db: &SchemaViewer, sql: &str, expect: String) { + tests_utils::check_sub_assert(db, db.options, &AuthCtx::for_testing(), sql, expect); + } + fn check_query(db: &SchemaViewer, sql: &str, expect: Expect) { tests_utils::check_query(db, db.options, &AuthCtx::for_testing(), sql, expect).unwrap(); } + fn check_query_assert(db: &SchemaViewer, sql: &str, expect: String) { + tests_utils::check_query_assert(db, db.options, &AuthCtx::for_testing(), sql, expect); + } + fn data() -> SchemaViewer { let m_id = TableId(1); let w_id = TableId(2); @@ -1420,7 +1467,16 @@ mod tests { Some(0), )); - SchemaViewer::new(vec![m.clone(), w.clone(), p.clone()]).with_options(ExplainOptions::default().optimize(false)) + let x = Arc::new(schema( + m_id, + "test", + &[("x", AlgebraicType::I32), ("y", AlgebraicType::I32)], + &[&[0], &[1]], + &[&[0]], + Some(0), + )); + SchemaViewer::new(vec![m.clone(), w.clone(), p.clone(), x.clone()]) + .with_options(ExplainOptions::default().optimize(false)) } #[test] @@ -1591,26 +1647,234 @@ Seq Scan on t ); } - /// Test single and multi-column index selections - #[test] - fn index_scans() { - let t_id = TableId(1); - + fn make_table_index() -> SchemaViewer { let t = Arc::new(schema( - t_id, + TableId(1), "t", &[ ("w", AlgebraicType::U8), ("x", AlgebraicType::U8), ("y", AlgebraicType::U8), ("z", AlgebraicType::U8), + ("id", AlgebraicType::U64), ], - &[&[1], &[2, 3], &[1, 2, 3]], + &[&[1], &[2, 3], &[1, 2, 3], &[0, 1, 2, 3], &[0]], &[], None, )); - let db = SchemaViewer::new(vec![t.clone()]).optimize(true); + SchemaViewer::new(vec![t]).optimize(true) + } + + // We optimize with the following rules in mind: + // - When all the comparisons are`!=` is always a full table scan + // - Multi-column indexes are only used if the query has a prefix match (ie all operators are `=`) + // - Else are converted to a single column index scan on the leftmost column and a filter on the rest + + /// Test index selections on 1 column + #[test] + fn index_scans_and() { + let t = Arc::new(schema( + TableId(1), + "t", + &[ + ("a", AlgebraicType::U8), + ("b", AlgebraicType::U8), + ("c", AlgebraicType::U8), + ], + &[&[1], &[2], &[0, 1, 2]], + &[], + None, + )); + + let db = SchemaViewer::new(vec![t]).optimize(true); + + check_sub( + &db, + "select * from t where a >= 3 and a <= 5", + expect![ + r#" +Index Scan using Index id 2 (t.a, t.b, t.c) on t + Index Cond: (t.a >= U8(3)) + Output: t.a, t.b, t.c + -> Filter: (t.a <= U8(5))"# + ], + ); + } + /// Test index selections on 1 column + #[test] + fn index_scans_1_col() { + let db = make_table_index(); + + for op in ["=", ">", "<", ">=", "<="] { + check_sub_assert( + &db, + &format!("SELECT * FROM t WHERE x {op} 4"), + format!( + "Index Scan using Index id 0 (t.x) on t + Index Cond: (t.x {op} U8(4)) + Output: t.w, t.x, t.y, t.z, t.id", + ), + ) + } + // `!=` is not supported in index scans + check_query( + &db, + "select * from t where x != 4", + expect![ + r#" +Seq Scan on t + Output: t.w, t.x, t.y, t.z, t.id + -> Filter: (t.x <> U8(4))"# + ], + ); + + // Select index on x + check_query( + &db, + "select * from t where x = 5 and id = 4", + expect![ + r#" +Index Scan using Index id 0 (t.x) on t + Index Cond: (t.x = U8(5)) + Output: t.w, t.x, t.y, t.z, t.id + -> Filter: (t.id = U64(4))"# + ], + ); + + // Do not select index on (y, z) + check_query( + &db, + "select * from t where y = 1", + expect![ + r#" +Seq Scan on t + Output: t.w, t.x, t.y, t.z, t.id + -> Filter: (t.y = U8(1))"# + ], + ); + + //Query multiple times the same index + check_query( + &db, + "select * from t where x = 5 and x = 6 and x = 4", + expect![ + r#" +Union + -> Index Scan using Index id 0 (t.x) on t + Index Cond: (t.x = U8(6)) + Output: t.w, t.x, t.y, t.z, t.id + -> Index Scan using Index id 0 (t.x) on t + Index Cond: (t.x = U8(4)) + Output: t.w, t.x, t.y, t.z, t.id + -> Index Scan using Index id 0 (t.x) on t + Index Cond: (t.x = U8(5)) + Output: t.w, t.x, t.y, t.z, t.id + Output: t.w, t.x, t.y, t.z, t.id + -> Filter: (t.x = U8(6) AND t.x = U8(4) AND t.x = U8(5))"# + ], + ); + + check_query( + &db, + "select * from t where x = 5 or x = 6 or x = 4", + expect![ + r#" +Seq Scan on t + Output: t.w, t.x, t.y, t.z, t.id + -> Filter: (t.x = U8(5) OR t.x = U8(6) OR t.x = U8(4))"# + ], + ); + } + + /// Test index selections on 2 columns + #[test] + fn index_scans_2_col() { + let db = make_table_index(); + + // Select index on [y, z] + check_query( + &db, + "select * from t where y = 1 and z = 2", + expect![ + r#" +Index Scan using Index id 1 (t.y, t.z) on t + Index Cond: (t.y = U8(1), t.z = U8(2)) + Output: t.w, t.x, t.y, t.z, t.id"# + ], + ); + + for op in [">", "<", ">=", "<="] { + check_query_assert( + &db, + &format!("select * from t where y {op} 1 and z {op} 2"), + format!( + r#"Index Scan using Index id 1 (t.y, t.z) on t + Index Cond: (t.y {op} U8(1)) + Output: t.w, t.x, t.y, t.z, t.id + -> Filter: (t.z {op} U8(2))"# + ), + ); + + // Check permutations of the same query + check_query_assert( + &db, + &format!("select * from t where z {op} 2 and y {op} 1"), + format!( + r#"Index Scan using Index id 1 (t.y, t.z) on t + Index Cond: (t.y {op} U8(1)) + Output: t.w, t.x, t.y, t.z, t.id + -> Filter: (t.z {op} U8(2))"# + ), + ); + + check_query_assert( + &db, + &format!("select * from t where z != 2 and y {op} 1"), + format!( + r#"Index Scan using Index id 1 (t.y, t.z) on t + Index Cond: (t.y {op} U8(1)) + Output: t.w, t.x, t.y, t.z, t.id + -> Filter: (t.z <> U8(2))"# + ), + ); + } + + // Select index on (y, z), (w) and filter on (id) + check_query( + &db, + "select * from t where w = 1 and y = 2 and z = 3 and id = 4", + expect![ + r#" +Union + -> Index Scan using Index id 1 (t.y, t.z) on t + Index Cond: (t.y = U8(2), t.z = U8(3)) + Output: t.w, t.x, t.y, t.z, t.id + -> Index Scan using Index id 3 (t.w, t.x, t.y, t.z) on t + Index Cond: (t.w = U8(1)) + Output: t.w, t.x, t.y, t.z, t.id + Output: t.w, t.x, t.y, t.z, t.id + -> Filter: (t.id = U64(4) AND t.y = U8(2) AND t.z = U8(3) AND t.w = U8(1))"# + ], + ); + + // `!=` is not supported in index scans + check_query( + &db, + "select * from t where y != 1 and z != 2", + expect![ + r#" +Seq Scan on t + Output: t.w, t.x, t.y, t.z, t.id + -> Filter: (t.y <> U8(1) AND t.z <> U8(2))"# + ], + ); + } + + /// Test index selections on 3 columns + #[test] + fn index_scans_3_col() { + let db = make_table_index(); // Select index on (x, y, z) check_sub( &db, @@ -1619,7 +1883,7 @@ Seq Scan on t r#" Index Scan using Index id 2 (t.x, t.y, t.z) on t Index Cond: (x.x = U8(3), x.y = U8(4), x.z = U8(5)) - Output: x.w, x.x, x.y, x.z"# + Output: x.w, x.x, x.y, x.z, x.id"# ], ); @@ -1631,78 +1895,130 @@ Index Scan using Index id 2 (t.x, t.y, t.z) on t r#" Index Scan using Index id 2 (t.x, t.y, t.z) on t Index Cond: (t.x = U8(3), t.y = U8(4), t.z = U8(5)) - Output: t.w, t.x, t.y, t.z"# + Output: t.w, t.x, t.y, t.z, t.id"# ], ); - // Select index on x - check_sub( - &db, - "select * from t where x = 3 and y = 4", - expect![ - r#" -Index Scan using Index id 0 (t.x) on t - Index Cond: (t.x = U8(3)) - Output: t.w, t.x, t.y, t.z - -> Filter: (t.y = U8(4))"# - ], - ); - // Select index on x + + for op in [">", "<", ">=", "<="] { + check_sub_assert( + &db, + &format!("select * from t where x {op} 3 and y {op} 4 and z {op} 5"), + format!( + r#"Union + -> Index Scan using Index id 0 (t.x) on t + Index Cond: (t.x {op} U8(3)) + Output: t.w, t.x, t.y, t.z, t.id + -> Index Scan using Index id 1 (t.y, t.z) on t + Index Cond: (t.y {op} U8(4)) + Output: t.w, t.x, t.y, t.z, t.id + Output: t.w, t.x, t.y, t.z, t.id + -> Filter: (t.z {op} U8(5) AND t.x {op} U8(3) AND t.y {op} U8(4))"# + ), + ); + + // Check permutations of the same query + check_sub_assert( + &db, + &format!("select * from t where z {op} 5 and y {op} 4 and x {op} 3"), + format!( + r#"Union + -> Index Scan using Index id 0 (t.x) on t + Index Cond: (t.x {op} U8(3)) + Output: t.w, t.x, t.y, t.z, t.id + -> Index Scan using Index id 1 (t.y, t.z) on t + Index Cond: (t.y {op} U8(4)) + Output: t.w, t.x, t.y, t.z, t.id + Output: t.w, t.x, t.y, t.z, t.id + -> Filter: (t.z {op} U8(5) AND t.x {op} U8(3) AND t.y {op} U8(4))"# + ), + ); + + check_sub_assert( + &db, + &format!("select * from t where x {op} 3 and y != 4 and z {op} 5"), + format!( + r#"Index Scan using Index id 0 (t.x) on t + Index Cond: (t.x {op} U8(3)) + Output: t.w, t.x, t.y, t.z, t.id + -> Filter: (t.y <> U8(4) AND t.z {op} U8(5))"# + ), + ); + } + + // `!=` is not supported in index scans check_query( &db, - "select * from t where w = 5 and x = 4", - expect![ - r#" -Index Scan using Index id 0 (t.x) on t - Index Cond: (t.x = U8(4)) - Output: t.w, t.x, t.y, t.z - -> Filter: (t.w = U8(5))"# - ], - ); - // Do not select index on (y, z) - check_query( - &db, - "select * from t where y = 1", + "select * from t where x != 3 and y != 4 and z != 5", expect![ r#" Seq Scan on t - Output: t.w, t.x, t.y, t.z - -> Filter: (t.y = U8(1))"# + Output: t.w, t.x, t.y, t.z, t.id + -> Filter: (t.x <> U8(3) AND t.y <> U8(4) AND t.z <> U8(5))"# ], ); - // Select index on [y, z] - check_query( + // Select index on (y, z) with multiple conditions on y + check_sub( &db, - "select * from t where y = 1 and z = 2", + "select * from t as x where y = 4 and y < 5 and id = 6", expect![ r#" -Index Scan using Index id 1 (t.y, t.z) on t - Index Cond: (t.y = U8(1), t.z = U8(2)) - Output: t.w, t.x, t.y, t.z"# +Union + -> Index Scan using Index id 1 (t.y, t.z) on t + Index Cond: (x.y = U8(4)) + Output: x.w, x.x, x.y, x.z, x.id + -> Index Scan using Index id 1 (t.y, t.z) on t + Index Cond: (x.y < U8(5)) + Output: x.w, x.x, x.y, x.z, x.id + Output: x.w, x.x, x.y, x.z, x.id + -> Filter: (x.id = U64(6) AND x.y < U8(5) AND x.y = U8(4))"# + ], + ); + } + + /// Test index selections above 3 columns + #[test] + fn index_scans_after_3_col() { + let db = make_table_index(); + // Select index on (x, y, z) + check_sub( + &db, + "select * from t as x where x = 3 and y = 4 and z = 5 and w = 6", + expect![ + r#" +Index Scan using Index id 3 (t.w, t.x, t.y, t.z) on t + Index Cond: (x.w = U8(6), x.x = U8(3), x.y = U8(4), x.z = U8(5)) + Output: x.w, x.x, x.y, x.z, x.id"# ], ); - // Check permutations of the same query - check_query( + // Test permutations of the same query + check_sub( &db, - "select * from t where z = 2 and y = 1", + "select * from t where z = 5 and y = 4 and w = 6 and x = 3", expect![ r#" -Index Scan using Index id 1 (t.y, t.z) on t - Index Cond: (t.y = U8(1), t.z = U8(2)) - Output: t.w, t.x, t.y, t.z"# +Index Scan using Index id 3 (t.w, t.x, t.y, t.z) on t + Index Cond: (t.w = U8(6), t.x = U8(3), t.y = U8(4), t.z = U8(5)) + Output: t.w, t.x, t.y, t.z, t.id"# ], ); - // Select index on (y, z) and filter on (w) - check_query( + } + + /// Test index selections select the shorter index when multiple indexes match + #[test] + fn index_scans_pick_shorter() { + let db = make_table_index(); + // Select index on (x) instead of (x, y) + check_sub( &db, - "select * from t where w = 1 and y = 2 and z = 3", + "select * from t as x where x = 3 and id > 4", expect![ r#" -Index Scan using Index id 1 (t.y, t.z) on t - Index Cond: (t.y = U8(2), t.z = U8(3)) - Output: t.w, t.x, t.y, t.z - -> Filter: (t.w = U8(1))"# +Index Scan using Index id 0 (t.x) on t + Index Cond: (x.x = U8(3)) + Output: x.w, x.x, x.y, x.z, x.id + -> Filter: (x.id > U64(4))"# ], ); } @@ -2121,4 +2437,49 @@ Limit: 10 Output: p.id, p.name"#]], ); } + + #[test] + fn overflow() { + let db = data().with_options(ExplainOptions::default().optimize(true)); + + let build_query = |total| { + let mut sql = "select * from m where ".to_string(); + for x in 1..total { + let fragment = format!("(manager = {x}) or "); + sql.push_str(&fragment.repeat((total - 1) as usize)); + } + sql.push_str("(employee = 0)"); + sql + }; + let run = |sep: char, sql: &str| { + query(&db, &AuthCtx::for_testing(), sql) + .map(|plan| { + // Check that the plan can be explained without overflow + let explain = Explain::new(&plan) + .with_options(ExplainOptions::default().optimize(true)) + .build(); + let out = explain.to_string(); + !out.is_empty() + }) + .map_err(|e| e.to_string().split(sep).next().unwrap_or_default().to_string()) + }; + let sql = build_query(1_000); + assert_eq!( + run(':', &sql), + Err("SQL query exceeds maximum allowed length".to_string()) + ); + + let sql = build_query(41); + assert_eq!(run(',', &sql), Err("Recursion limit exceeded".to_string())); + + let sql = build_query(40); + assert_eq!(run(',', &sql), Ok(true), "Query should not overflow"); + + // Check no overflow with lot of joins + let mut sql = "SELECT m.* FROM m ".to_string(); + for i in 0..1_000 { + sql.push_str(&format!("JOIN m AS m{i} ON m.employee = m{i}.manager ")); + } + assert_eq!(run(',', &sql), Ok(true), "Query with 1_000 joins should not overflow"); + } } diff --git a/crates/physical-plan/src/printer.rs b/crates/physical-plan/src/printer.rs index f876c845b0..e9ff278d79 100644 --- a/crates/physical-plan/src/printer.rs +++ b/crates/physical-plan/src/printer.rs @@ -544,6 +544,17 @@ fn scan_tables<'a>(lines: &mut Lines<'a>, plan: &'a PhysicalPlan) -> PrinterPlan output, } } + PhysicalPlan::IxScansAnd(idx) => { + let mut plans = Vec::new(); + for plan in idx { + plans.push(scan_tables(lines, plan)); + } + let output = plans.last().map(|x| x.output()).unwrap_or(Output::Empty); + PrinterPlan::Union { + output: output.clone(), + plans, + } + } PhysicalPlan::IxJoin(idx, semi) => { lines.add_table(idx.rhs_label, &idx.rhs); let plan = scan_tables(lines, &idx.lhs); @@ -909,7 +920,7 @@ fn eval_plan<'a>(lines: &mut Lines<'a>, plan: PrinterPlan<'a>, ident: u16) { /// - Showing the schema of the tables /// - Showing the planning time pub struct Explain<'a> { - ctx: &'a PhysicalCtx<'a>, + pub ctx: &'a PhysicalCtx<'a>, lines: Vec>, labels: Labels<'a>, options: ExplainOptions, diff --git a/crates/physical-plan/src/rules.rs b/crates/physical-plan/src/rules.rs index 4a45257cab..bdb590339c 100644 --- a/crates/physical-plan/src/rules.rs +++ b/crates/physical-plan/src/rules.rs @@ -5,36 +5,36 @@ //! * [PushConstEq] //! Push down predicates of the form `x=1` //! * [PushConstAnd] -//! Push down predicates of the form `x=1 and y=2` -//! * [IxScanEq] +//! * [IxScanBinOp] //! Generate 1-column index scan for `x=1` -//! * [IxScanAnd] -//! Generate 1-column index scan for `x=1 and y=2` -//! * [IxScanEq2Col] -//! Generate 2-column index scan -//! * [IxScanEq3Col] -//! Generate 3-column index scan -//! * [ReorderHashJoin] -//! Reorder the sides of a hash join //! * [ReorderDeltaJoinRhs] //! Reorder the sides of a hash join with delta tables //! * [PullFilterAboveHashJoin] //! Pull a filter above a hash join with delta tables -//! * [HashToIxJoin] +//! * [HashToIxJoin] //! Convert hash join to index join -//! * [UniqueIxJoinRule] +//! * [UniqueIxJoinRule] //! Mark index join as unique -//! * [UniqueHashJoinRule] +//! * [UniqueHashJoinRule] //! Mark hash join as unique -use anyhow::{bail, Result}; -use spacetimedb_primitives::{ColId, ColSet, IndexId}; -use spacetimedb_schema::schema::IndexSchema; -use spacetimedb_sql_parser::ast::{BinOp, LogOp}; +//! +//! We optimize with the following rules in mind: +//! - When all the comparisons are`!=` is always a full table scan +//! - Multi-column indexes are only used if the query has a prefix match (ie all operators are `=`) +//! - Else are converted to a single column index scan on the leftmost column after `=` and a filter on the rest use crate::plan::{ HashJoin, IxJoin, IxScan, Label, PhysicalExpr, PhysicalPlan, ProjectListPlan, ProjectPlan, Sarg, Semi, TableScan, TupleField, }; +use anyhow::{bail, Result}; +use either::Either; +use itertools::Itertools; +use spacetimedb_lib::AlgebraicValue; +use spacetimedb_primitives::{ColId, ColSet, IndexId}; +use spacetimedb_schema::schema::IndexSchema; +use spacetimedb_sql_parser::ast::{BinOp, LogOp}; +use std::collections::{HashMap, HashSet}; /// A rewrite will only fail due to an internal logic bug. /// However we don't want to panic in such a situation. @@ -400,7 +400,7 @@ impl RewriteRule for PushConstAnd { } } -/// Match single field equality predicates such as: +/// Match single field for [`BinOp`] predicates such as: /// /// ```sql /// select * from t where x = 1 @@ -408,20 +408,18 @@ impl RewriteRule for PushConstAnd { /// /// Rewrite as an index scan if applicable. /// -/// NOTE: This rule does not consider multi-column indexes. -pub(crate) struct IxScanEq; +/// NOTE: This rule does not consider multi-column indexes, or [`BinOp::Ne`] predicates. +pub(crate) struct IxScanBinOp; -pub(crate) struct IxScanInfo { - index_id: IndexId, - cols: Vec<(usize, ColId)>, -} - -impl RewriteRule for IxScanEq { +impl RewriteRule for IxScanBinOp { type Plan = PhysicalPlan; type Info = (IndexId, ColId); fn matches(plan: &PhysicalPlan) -> Option { - if let PhysicalPlan::Filter(input, PhysicalExpr::BinOp(BinOp::Eq, expr, value)) = plan { + if let PhysicalPlan::Filter(input, PhysicalExpr::BinOp(op, expr, value)) = plan { + if *op == BinOp::Ne { + return None; + } if let PhysicalPlan::TableScan( TableScan { schema, @@ -455,7 +453,7 @@ impl RewriteRule for IxScanEq { } fn rewrite(plan: PhysicalPlan, (index_id, col_id): Self::Info) -> Result { - if let PhysicalPlan::Filter(input, PhysicalExpr::BinOp(BinOp::Eq, _, value)) = plan { + if let PhysicalPlan::Filter(input, PhysicalExpr::BinOp(op, _, value)) = plan { if let PhysicalPlan::TableScan(TableScan { schema, limit, delta }, var) = *input { if let PhysicalExpr::Value(v) = *value { return Ok(PhysicalPlan::IxScan( @@ -465,7 +463,7 @@ impl RewriteRule for IxScanEq { delta, index_id, prefix: vec![], - arg: Sarg::Eq(col_id, v), + arg: Sarg::from_op(op, col_id, v), }, var, )); @@ -476,115 +474,56 @@ impl RewriteRule for IxScanEq { } } -/// Match multi-field equality predicates such as: +/// Match `1...N` multi-field [`BinOp`] predicates such as: /// /// ```sql -/// select * from t where x = 1 and y = 1 -/// ``` -/// -/// Create an index scan for one of the equality conditions. -/// -/// NOTE: This rule does not consider multi-column indexes. -pub(crate) struct IxScanAnd; - -impl RewriteRule for IxScanAnd { - type Plan = PhysicalPlan; - type Info = (IndexId, usize, ColId); - - fn matches(plan: &PhysicalPlan) -> Option { - if let PhysicalPlan::Filter(input, PhysicalExpr::LogOp(LogOp::And, exprs)) = plan { - if let PhysicalPlan::TableScan( - TableScan { - schema, - limit: None, - delta: _, - }, - _, - ) = &**input - { - return exprs.iter().enumerate().find_map(|(i, expr)| { - if let PhysicalExpr::BinOp(BinOp::Eq, lhs, value) = expr { - if let (PhysicalExpr::Field(TupleField { field_pos: pos, .. }), PhysicalExpr::Value(_)) = - (&**lhs, &**value) - { - return schema.indexes.iter().find_map( - |IndexSchema { - index_id, - index_algorithm, - .. - }| { - index_algorithm - .columns() - // TODO: Support prefix scans - .as_singleton() - .filter(|col_id| col_id.idx() == *pos) - .map(|col_id| (*index_id, i, col_id)) - }, - ); - } - } - None - }); - } - } - None - } - - fn rewrite(plan: PhysicalPlan, (index_id, i, col_id): Self::Info) -> Result { - if let PhysicalPlan::Filter(input, PhysicalExpr::LogOp(LogOp::And, mut exprs)) = plan { - if let PhysicalPlan::TableScan(TableScan { schema, limit, delta }, label) = *input { - if let PhysicalExpr::BinOp(BinOp::Eq, _, value) = exprs.swap_remove(i) { - if let PhysicalExpr::Value(v) = *value { - return Ok(PhysicalPlan::Filter( - Box::new(PhysicalPlan::IxScan( - IxScan { - schema, - limit, - delta, - index_id, - prefix: vec![], - arg: Sarg::Eq(col_id, v), - }, - label, - )), - match exprs.len() { - 1 => exprs.swap_remove(0), - _ => PhysicalExpr::LogOp(LogOp::And, exprs), - }, - )); - } - } - } - } - bail!("{INVARIANT_VIOLATION}: Failed to create single column index scan from conjunction") - } -} - -/// Match multi-field equality predicates such as: -/// -/// ```sql -/// select * from t where x = 1 and y = 1 +/// select * from t where x = 1 and y = 1 // Full match +/// select * from t where x = 1 and y > 1 // Partial match /// ``` /// /// Rewrite as a multi-column index scan if applicable. /// -/// NOTE: This rule does not consider indexes on 3 or more columns. -pub(crate) struct IxScanEq2Col; +/// NOTE: This rule does not consider [`BinOp::Ne`] predicates. +pub(crate) struct IxScanOpMultiCol; -impl RewriteRule for IxScanEq2Col { +#[derive(Debug, Clone)] +struct ColInfo { + op: BinOp, + field: TupleField, + value: AlgebraicValue, +} + +#[derive(Debug)] +struct IndexMatch { + index_id: IndexId, + index_name: Box, + matched_columns: Vec, +} + +#[derive(Debug)] +pub(crate) struct IxScanInfo { + matched: Vec, + unmatched: Vec, + unmatched_filter: Vec, +} + +impl RewriteRule for IxScanOpMultiCol { type Plan = PhysicalPlan; type Info = IxScanInfo; - - fn matches(plan: &PhysicalPlan) -> Option { + // We need to match the conditions against the index columns, so that: + // 1. We match the columns in the same order as the index + // 2. Build a prefix on the `columns - N` with `BinOp::Eq` + // 3. Build a [Sarg] on the first column after the prefix (Note: it must always succeed) + // [a Eq 1, b Eq 2, c Lte 3] -> [(a, 1), (b, 2), Sarg::from_op(Lte, c, 3)] + // 4. All the others become unmatched and turns into a filter scan` + fn matches(plan: &Self::Plan) -> Option { + // Match a filter with a conjunction of binary expressions let PhysicalPlan::Filter(input, PhysicalExpr::LogOp(LogOp::And, exprs)) = plan else { return None; }; - let PhysicalPlan::TableScan( TableScan { - schema, - limit: None, - delta: _, + schema, limit: None, .. }, _, ) = &**input @@ -592,316 +531,212 @@ impl RewriteRule for IxScanEq2Col { return None; }; - for (i, a) in exprs.iter().enumerate() { - for (j, b) in exprs.iter().enumerate().filter(|(j, _)| i != *j) { - let (PhysicalExpr::BinOp(BinOp::Eq, a, u), PhysicalExpr::BinOp(BinOp::Eq, b, v)) = (a, b) else { - continue; - }; - - let (PhysicalExpr::Field(u), PhysicalExpr::Value(_), PhysicalExpr::Field(v), PhysicalExpr::Value(_)) = - (&**a, &**u, &**b, &**v) - else { - continue; - }; - - if let Some(scan) = schema - .indexes - .iter() - .filter(|idx| idx.index_algorithm.columns().len() == 2) // TODO: Support prefix scans - .map(|idx| (idx.index_id, idx.index_algorithm.columns())) - .find_map(|(index_id, columns)| { - let mut columns = columns.iter(); - let x = columns.next()?; - if x.idx() != u.field_pos { - return None; - } - let y = columns.next()?; - if y.idx() != v.field_pos { - return None; - } - Some(IxScanInfo { - index_id, - cols: vec![(i, x), (j, y)], - }) - }) + if schema.indexes.is_empty() { + return None; + } + // Partition expressions into indexable and non-indexable + let (expr_cols, unmatched): (Vec<_>, Vec<_>) = + exprs.iter().enumerate().partition_map(|(expr_idx, e)| match e { + PhysicalExpr::BinOp(op, lhs, rhs) + if matches!(op, BinOp::Eq | BinOp::Lt | BinOp::Gt | BinOp::Lte | BinOp::Gte) => { - return Some(scan); - } - } - } - - None - } - - fn rewrite(plan: PhysicalPlan, info: Self::Info) -> Result { - match info.cols.as_slice() { - [(i, a), (j, b)] => { - if let PhysicalPlan::Filter(input, PhysicalExpr::LogOp(LogOp::And, exprs)) = plan { - if let PhysicalPlan::TableScan(TableScan { schema, limit, delta }, label) = *input { - if let ( - Some(PhysicalExpr::BinOp(BinOp::Eq, _, u)), - Some(PhysicalExpr::BinOp(BinOp::Eq, _, v)), - ) = (exprs.get(*i), exprs.get(*j)) - { - if let (PhysicalExpr::Value(u), PhysicalExpr::Value(v)) = (&**u, &**v) { - return Ok(match exprs.len() { - n @ 0 | n @ 1 => { - bail!("{INVARIANT_VIOLATION}: Cannot create 2-column index scan from {n} conditions") - } - // If there are only 2 conditions in this filter, - // we replace the filter with an index scan. - 2 => PhysicalPlan::IxScan( - IxScan { - schema, - limit, - delta, - index_id: info.index_id, - prefix: vec![(*a, u.clone())], - arg: Sarg::Eq(*b, v.clone()), - }, - label, - ), - // If there are 3 conditions in this filter, - // we create an index scan from 2 of them. - // The original conjunction is no longer well defined, - // because it only has a single operand now. - // Hence we must replace it with its operand. - 3 => PhysicalPlan::Filter( - Box::new(PhysicalPlan::IxScan( - IxScan { - schema, - limit, - delta, - index_id: info.index_id, - prefix: vec![(*a, u.clone())], - arg: Sarg::Eq(*b, v.clone()), - }, - label, - )), - exprs - .into_iter() - .enumerate() - .find(|(pos, _)| pos != i && pos != j) - .map(|(_, expr)| expr) - .unwrap(), - ), - // If there are more than 3 conditions in this filter, - // we remove the 2 conditions used in the index scan. - // The remaining conditions still form a conjunction. - _ => PhysicalPlan::Filter( - Box::new(PhysicalPlan::IxScan( - IxScan { - schema, - limit, - delta, - index_id: info.index_id, - prefix: vec![(*a, u.clone())], - arg: Sarg::Eq(*b, v.clone()), - }, - label, - )), - PhysicalExpr::LogOp( - LogOp::And, - exprs - .into_iter() - .enumerate() - .filter(|(pos, _)| pos != i && pos != j) - .map(|(_, expr)| expr) - .collect(), - ), - ), - }); - } - } - } - } - bail!("{INVARIANT_VIOLATION}: Failed to create 2-column index scan") - } - _ => Ok(plan), - } - } -} - -/// Match multi-field equality predicates such as: -/// -/// ```sql -/// select * from t where x = 1 and y = 1 and z = 1 -/// ``` -/// -/// Rewrite as a multi-column index scan if applicable. -/// -/// NOTE: This rule does not consider indexes on 4 or more columns. -pub(crate) struct IxScanEq3Col; - -impl RewriteRule for IxScanEq3Col { - type Plan = PhysicalPlan; - type Info = IxScanInfo; - - fn matches(plan: &PhysicalPlan) -> Option { - // Match outer plan structure - let PhysicalPlan::Filter(input, PhysicalExpr::LogOp(LogOp::And, exprs)) = plan else { - return None; - }; - - let PhysicalPlan::TableScan( - TableScan { - schema, - limit: None, - delta: _, - }, - _, - ) = &**input - else { - return None; - }; - - for (i, a) in exprs.iter().enumerate() { - for (j, b) in exprs.iter().enumerate().filter(|(j, _)| i != *j) { - for (k, c) in exprs.iter().enumerate().filter(|(k, _)| i != *k && j != *k) { - let ( - PhysicalExpr::BinOp(BinOp::Eq, a, u), - PhysicalExpr::BinOp(BinOp::Eq, b, v), - PhysicalExpr::BinOp(BinOp::Eq, c, w), - ) = (a, b, c) - else { - continue; - }; - - let ( - PhysicalExpr::Field(u), - PhysicalExpr::Value(_), - PhysicalExpr::Field(v), - PhysicalExpr::Value(_), - PhysicalExpr::Field(w), - PhysicalExpr::Value(_), - ) = (&**a, &**u, &**b, &**v, &**c, &**w) - else { - continue; - }; - - if let Some(scan) = schema - .indexes - .iter() - .filter(|idx| idx.index_algorithm.columns().len() == 3) - .map(|idx| (idx.index_id, idx.index_algorithm.columns())) - .find_map(|(index_id, columns)| { - let mut columns = columns.iter(); - let x = columns.next()?; - if x.idx() != u.field_pos { - return None; - } - let y = columns.next()?; - if y.idx() != v.field_pos { - return None; - } - let z = columns.next()?; - if z.idx() != w.field_pos { - return None; - } - Some(IxScanInfo { - index_id, - cols: vec![(i, x), (j, y), (k, z)], - }) + if let (PhysicalExpr::Field(field), PhysicalExpr::Value(value)) = (&**lhs, &**rhs) { + Either::Left(ColInfo { + op: *op, + field: field.clone(), + value: value.clone(), }) - { - return Some(scan); + } else { + Either::Right(expr_idx) } } - } + _ => Either::Right(expr_idx), + }); + + if expr_cols.is_empty() { + return None; } - None + // We should allow for same column to be used multiple times, like `x = 1 and x > 2`. + let mut cond_map: HashMap> = HashMap::new(); + let binding = expr_cols.clone(); + for c in &binding { + cond_map.entry(c.field.field_pos.into()).or_default().push(c); + } + + let mut candidates = vec![]; + + for idx in &schema.indexes { + let mut prefix_buf: Vec<&ColInfo> = vec![]; + let mut has_range = false; + + for col_id in idx.index_algorithm.columns().iter() { + let Some(conds) = cond_map.get(&col_id) else { break }; + let mut pushed = false; + + for cond in conds { + // If we have duplicate fields like in `x = 1 and x = 2 and y = 3`, then split: + // prefix: [x = 1, y = 3] + // extra : [x = 2] + if prefix_buf.iter().any(|c| c.field.field_pos == cond.field.field_pos) { + if cond.op == BinOp::Eq || !has_range { + candidates.push((idx.index_id, idx.index_name.clone(), vec![*cond])); + } + + continue; + } + match cond.op { + BinOp::Eq => { + prefix_buf.push(*cond); + pushed = true; + } + // Only take the first range condition + BinOp::Lt | BinOp::Gt | BinOp::Lte | BinOp::Gte if !has_range => { + prefix_buf.push(*cond); + has_range = true; + break; + } + _ => {} + } + } + + if !pushed { + break; + } + } + + if !prefix_buf.is_empty() { + candidates.push((idx.index_id, idx.index_name.clone(), prefix_buf)); + } + } + if candidates.is_empty() { + return None; + } + // Match the index with the longest prefix first + candidates.sort_by_key(|(_, _, cols)| -(cols.len() as isize)); + + let mut matched = vec![]; + let mut covered = HashSet::new(); + + for (index_id, index_name, prefix) in candidates { + if prefix + .iter() + .all(|c| covered.contains(&(c.op, c.field.field_pos, &c.value))) + { + continue; + } + covered.extend(prefix.iter().map(|c| (c.op, c.field.field_pos, &c.value))); + + matched.push(IndexMatch { + index_id, + index_name, + matched_columns: prefix.into_iter().cloned().collect(), + }); + } + + if matched.is_empty() { + return None; + } + + matched.sort_by_key(|m| m.index_name.clone()); + + let unmatched_filter = expr_cols + .into_iter() + .filter(|c| !covered.contains(&(c.op, c.field.field_pos, &c.value))) + .collect(); + + Some(IxScanInfo { + matched, + unmatched, + unmatched_filter, + }) } - fn rewrite(plan: PhysicalPlan, info: Self::Info) -> Result { - match info.cols.as_slice() { - [(i, a), (j, b), (k, c)] => { - if let PhysicalPlan::Filter(input, PhysicalExpr::LogOp(LogOp::And, exprs)) = plan { - if let PhysicalPlan::TableScan(TableScan { schema, limit, delta }, label) = *input { - if let ( - Some(PhysicalExpr::BinOp(BinOp::Eq, _, u)), - Some(PhysicalExpr::BinOp(BinOp::Eq, _, v)), - Some(PhysicalExpr::BinOp(BinOp::Eq, _, w)), - ) = (exprs.get(*i), exprs.get(*j), exprs.get(*k)) - { - if let (PhysicalExpr::Value(u), PhysicalExpr::Value(v), PhysicalExpr::Value(w)) = - (&**u, &**v, &**w) - { - return Ok(match exprs.len() { - n @ 0 | n @ 1 | n @ 2 => { - bail!("{INVARIANT_VIOLATION}: Cannot create 3-column index scan from {n} conditions") - } - // If there are only 3 conditions in this filter, - // we replace the filter with an index scan. - 3 => PhysicalPlan::IxScan( - IxScan { - schema, - limit, - delta, - index_id: info.index_id, - prefix: vec![(*a, u.clone()), (*b, v.clone())], - arg: Sarg::Eq(*c, w.clone()), - }, - label, - ), - // If there are 4 conditions in this filter, - // we create an index scan from 3 of them. - // The original conjunction is no longer well defined, - // because it only has a single operand now. - // Hence we must replace it with its operand. - 4 => PhysicalPlan::Filter( - Box::new(PhysicalPlan::IxScan( - IxScan { - schema, - limit, - delta, - index_id: info.index_id, - prefix: vec![(*a, u.clone()), (*b, v.clone())], - arg: Sarg::Eq(*c, w.clone()), - }, - label, - )), - exprs - .into_iter() - .enumerate() - .find(|(pos, _)| pos != i && pos != j && pos != k) - .map(|(_, expr)| expr) - .unwrap(), - ), - // If there are more than 4 conditions in this filter, - // we remove the 3 conditions used in the index scan. - // The remaining conditions still form a conjunction. - _ => PhysicalPlan::Filter( - Box::new(PhysicalPlan::IxScan( - IxScan { - schema, - limit, - delta, - index_id: info.index_id, - prefix: vec![(*a, u.clone()), (*b, v.clone())], - arg: Sarg::Eq(*c, w.clone()), - }, - label, - )), - PhysicalExpr::LogOp( - LogOp::And, - exprs - .into_iter() - .enumerate() - .filter(|(pos, _)| pos != i && pos != j && pos != k) - .map(|(_, expr)| expr) - .collect(), - ), - ), - }); - } - } - } - } - bail!("{INVARIANT_VIOLATION}: Failed to create 3-column index scan") - } - _ => Ok(plan), + fn rewrite(plan: Self::Plan, info: Self::Info) -> Result { + let PhysicalPlan::Filter(input, PhysicalExpr::LogOp(LogOp::And, exprs)) = plan else { + bail!("{INVARIANT_VIOLATION}: Expected Filter(LogOp::And)") + }; + + let PhysicalPlan::TableScan(scan, label) = *input else { + bail!("{INVARIANT_VIOLATION}: Expected TableScan") + }; + + let mut plans = Vec::with_capacity(info.matched.len()); + for m in &info.matched { + let (prefix, arg) = { + let take = m.matched_columns.len() - 1; + + let mut it = m.matched_columns.clone().into_iter(); + let prefix: Vec<_> = it + .by_ref() + .take(take) + .map(|c| (c.field.field_pos.into(), c.value.clone())) + .collect(); + + let arg = it + .next() + .map(|c| Sarg::from_op(c.op, c.field.field_pos.into(), c.value)) + .unwrap(); + + (prefix, arg) + }; + + plans.push(( + m, + PhysicalPlan::IxScan( + IxScan { + schema: scan.schema.clone(), + limit: scan.limit, + delta: scan.delta, + index_id: m.index_id, + prefix, + arg, + }, + label, + ), + )); } + let mut unmatched_filter = info.unmatched_filter; + let index_plan = if plans.len() == 1 { + plans.pop().unwrap().1 + } else { + // Must add back the filter expression because the index matches need to form an intersection + // of the index scans, like in `x > 1 and y < 2` each index scan output partial matches. + unmatched_filter.extend(info.matched.iter().flat_map(|m| m.matched_columns.iter()).cloned()); + + // Sort by the number of matched columns by `=` operator, so it scan less rows later + let plans = plans + .into_iter() + .sorted_by_key(|(m, _)| -(m.matched_columns.iter().filter(|c| c.op == BinOp::Eq).count() as isize)) + .map(|(_, plan)| plan) + .collect(); + PhysicalPlan::IxScansAnd(plans) + }; + + // No unmatched filters: done + if info.unmatched.is_empty() && unmatched_filter.is_empty() { + return Ok(index_plan); + } + + // Reconstruct remaining filter expressions + let remaining = exprs + .into_iter() + .enumerate() + .filter(|(i, _)| info.unmatched.contains(i)) + .map(|(_, e)| e) + .chain(unmatched_filter.into_iter().map(|c| { + PhysicalExpr::BinOp( + c.op, + Box::new(PhysicalExpr::Field(c.field)), + Box::new(PhysicalExpr::Value(c.value)), + ) + })) + .collect(); + + Ok(PhysicalPlan::Filter( + Box::new(index_plan), + PhysicalExpr::LogOp(LogOp::And, remaining), + )) } } diff --git a/crates/sql-parser/src/ast/mod.rs b/crates/sql-parser/src/ast/mod.rs index 776d4fc500..470e95d67b 100644 --- a/crates/sql-parser/src/ast/mod.rs +++ b/crates/sql-parser/src/ast/mod.rs @@ -210,7 +210,7 @@ pub enum SqlLiteral { } /// Binary infix operators -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum BinOp { Eq, Ne,