Lift restriction that multi-col indices only allow = constraints #1317

benches
This commit is contained in:
Mario Alejandro Montoya Cortes
2025-06-11 11:19:37 -05:00
parent ca10cd0b60
commit 770339dafb
10 changed files with 841 additions and 529 deletions
+2
View File
@@ -14,6 +14,7 @@ pub fn num_rows(tx: &Tx, expr: &QueryExpr) -> u64 {
pub fn estimate_rows_scanned(tx: &Tx, plan: &PhysicalPlan) -> u64 {
match plan {
PhysicalPlan::TableScan(..) | PhysicalPlan::IxScan(..) => row_estimate(tx, plan),
PhysicalPlan::IxScansAnd(plans) => plans.iter().map(|p| estimate_rows_scanned(tx, p)).sum(),
PhysicalPlan::Filter(input, _) => estimate_rows_scanned(tx, input).saturating_add(row_estimate(tx, input)),
PhysicalPlan::NLJoin(lhs, rhs) => estimate_rows_scanned(tx, lhs)
.saturating_add(estimate_rows_scanned(tx, rhs))
@@ -51,6 +52,7 @@ pub fn row_estimate(tx: &Tx, plan: &PhysicalPlan) -> u64 {
// Use a row limit as the estimate if present
PhysicalPlan::TableScan(TableScan { limit: Some(n), .. }, _)
| PhysicalPlan::IxScan(IxScan { limit: Some(n), .. }, _) => *n,
PhysicalPlan::IxScansAnd(idx) => idx.iter().map(|plan| row_estimate(tx, plan)).sum(),
// Table scans return the number of rows in the table
PhysicalPlan::TableScan(
TableScan {
+20 -17
View File
@@ -507,15 +507,14 @@ Seq Scan on test
db.create_table_for_test("test", schema, indexes)?;
let tx = begin_tx(&db);
// TODO: Need support for index range scans.
expect_query(
&tx,
"select * from test where b > 2",
expect![
r#"
Seq Scan on test
Output: test.a, test.b
-> Filter: (test.b > U64(2))"#
Index Scan using Index test_b_idx_btree (test.b) on test
Index Cond: (test.b > U64(2))
Output: test.a, test.b"#
],
);
@@ -532,15 +531,15 @@ Seq Scan on test
db.create_table_for_test("test", schema, indexes)?;
let tx = begin_tx(&db);
//TODO(sql): Need support for index scans for ranges
expect_query(
&tx,
"select * from test where b > 2 and b < 5",
expect![
r#"
Seq Scan on test
Index Scan using Index test_b_idx_btree (test.b) on test
Index Cond: (test.b > U64(2))
Output: test.a, test.b
-> Filter: (test.b > U64(2) AND test.b < U64(5))"#
-> Filter: (test.b < U64(5))"#
],
);
@@ -557,17 +556,20 @@ Seq Scan on test
db.create_table_for_test("test", schema, indexes)?;
let tx = begin_tx(&db);
// Note, order matters - the equality condition occurs first which
// means an index scan will be generated rather than the range condition.
expect_query(
&tx,
"select * from test where a = 3 and b > 2 and b < 5",
expect![
r#"
Index Scan using Index test_a_idx_btree (test.a) on test
Index Cond: (test.a = U64(3))
Union
-> Index Scan using Index test_a_idx_btree (test.a) on test
Index Cond: (test.a = U64(3))
Output: test.a, test.b
-> Index Scan using Index test_b_idx_btree (test.b) on test
Index Cond: (test.b > U64(2))
Output: test.a, test.b
Output: test.a, test.b
-> Filter: (test.b < U64(5) AND test.b > U64(2))"#
-> Filter: (test.b < U64(5) AND test.a = U64(3) AND test.b > U64(2))"#
],
);
@@ -710,9 +712,9 @@ Hash Join: Lhs
Index Cond: (lhs.a = U64(3))
Output: lhs.a, lhs.b
-> Hash Build: rhs.b
-> Seq Scan on rhs
Output: rhs.b, rhs.c
-> Filter: (rhs.c < U64(4))"#
-> Index Scan using Index rhs_c_idx_btree (rhs.c) on rhs
Index Cond: (rhs.c < U64(4))
Output: rhs.b, rhs.c"#
],
);
@@ -749,9 +751,10 @@ Index Join: Rhs on lhs
Inner Unique: false
Join Cond: (rhs.b = lhs.b)
Output: lhs.a, lhs.b
-> Seq Scan on rhs
-> Index Scan using Index rhs_c_idx_btree (rhs.c) on rhs
Index Cond: (rhs.c > U64(2))
Output: rhs.b, rhs.c, rhs.d
-> Filter: (rhs.c > U64(2) AND rhs.c < U64(4) AND rhs.d = U64(3))"#
-> Filter: (rhs.c < U64(4) AND rhs.d = U64(3))"#
],
);
+11 -11
View File
@@ -710,7 +710,6 @@ mod tests {
panic!("expected an index join, but got {join:#?}");
};
//TODO(sql): Remove manual checks to just `EXPLAIN` the query.
expect_sub(
&tx,
sql,
@@ -720,9 +719,10 @@ Index Join: Rhs on lhs
Inner Unique: false
Join Cond: (rhs.b = lhs.b)
Output: lhs.a, lhs.b
-> Seq Scan on rhs
-> Index Scan using Index rhs_c_idx_btree (rhs.c) on rhs
Index Cond: (rhs.c > U64(2))
Output: rhs.b, rhs.c, rhs.d
-> Filter: (rhs.c > U64(2) AND rhs.c < U64(4) AND rhs.d = U64(3))"#
-> Filter: (rhs.c < U64(4) AND rhs.d = U64(3))"#
],
);
@@ -806,8 +806,7 @@ Index Join: Rhs on lhs
panic!("expected an index join, but got {join:#?}");
};
//TODO(sql): Remove manual checks to just `EXPLAIN` the query.
// Why this generate same plan than the previous test? 'compile_incremental_index_join_index_side'
//TODO(sql): Why this generate same plan than the previous test? 'compile_incremental_index_join_index_side'
expect_sub(
&tx,
sql,
@@ -817,9 +816,10 @@ Index Join: Rhs on lhs
Inner Unique: false
Join Cond: (rhs.b = lhs.b)
Output: lhs.a, lhs.b
-> Seq Scan on rhs
-> Index Scan using Index rhs_c_idx_btree (rhs.c) on rhs
Index Cond: (rhs.c > U64(2))
Output: rhs.b, rhs.c, rhs.d
-> Filter: (rhs.c > U64(2) AND rhs.c < U64(4) AND rhs.d = U64(3))"#
-> Filter: (rhs.c < U64(4) AND rhs.d = U64(3))"#
],
);
@@ -909,8 +909,7 @@ Index Join: Rhs on lhs
"expected an index join, but got {src_join:#?}"
);
//TODO(sql): Remove manual checks to just `EXPLAIN` the query.
// Why this generate same plan than the previous test? 'compile_incremental_index_join_index_side'
//TODO(sql): Why this generate same plan than the previous test? 'compile_incremental_index_join_index_side'
expect_sub(
&tx,
sql,
@@ -920,9 +919,10 @@ Index Join: Rhs on lhs
Inner Unique: false
Join Cond: (rhs.b = lhs.b)
Output: lhs.a, lhs.b
-> Seq Scan on rhs
-> Index Scan using Index rhs_c_idx_btree (rhs.c) on rhs
Index Cond: (rhs.c > U64(2))
Output: rhs.b, rhs.c, rhs.d
-> Filter: (rhs.c > U64(2) AND rhs.c < U64(4) AND rhs.d = U64(3))"#
-> Filter: (rhs.c < U64(4) AND rhs.d = U64(3))"#
],
);
+22
View File
@@ -75,6 +75,7 @@ pub enum Iter<'a> {
Row(RowRefIter<'a>),
Join(LeftDeepJoinIter<'a>),
Filter(Filter<'a, Iter<'a>>),
FilterIdx(FilterIdx<'a>),
}
impl<'a> Iterator for Iter<'a> {
@@ -85,6 +86,7 @@ impl<'a> Iterator for Iter<'a> {
Self::Row(iter) => iter.next().map(Tuple::Row),
Self::Join(iter) => iter.next(),
Self::Filter(iter) => iter.next(),
Self::FilterIdx(iter) => iter.next(),
}
}
}
@@ -96,6 +98,12 @@ impl<'a> Iter<'a> {
{
match plan {
PhysicalPlan::TableScan(..) | PhysicalPlan::IxScan(..) => RowRefIter::build(plan, tx).map(Self::Row),
PhysicalPlan::IxScansAnd(input) => {
let input: Vec<_> = input.iter().map(|plan| Iter::build(plan, tx)).collect::<Result<_>>()?;
Ok(Iter::FilterIdx(FilterIdx {
input: input.into_boxed_slice(),
}))
}
PhysicalPlan::Filter(input, expr) => {
// Build a filter iterator
Iter::build(input, tx)
@@ -1050,3 +1058,17 @@ impl<'a> Iterator for Filter<'a, Iter<'a>> {
self.input.find(|tuple| self.expr.eval_bool(tuple))
}
}
/// A tuple-at-a-time filter iterator based on check multiple indexes
pub struct FilterIdx<'a> {
input: Box<[Iter<'a>]>,
}
impl<'a> Iterator for FilterIdx<'a> {
type Item = Tuple<'a>;
fn next(&mut self) -> Option<Self::Item> {
// Find the row that matches all the indexes
self.input.iter_mut().find_map(|iter| iter.next())
}
}
+1 -1
View File
@@ -190,7 +190,7 @@ impl ProjectField for Row<'_> {
}
/// Each query operator returns a tuple of [RowRef]s
#[derive(Clone)]
#[derive(Clone, PartialEq, Eq, Hash)]
pub enum Tuple<'a> {
/// A pointer to a row in a base table
Row(Row<'a>),
+80 -2
View File
@@ -186,6 +186,7 @@ impl PipelinedProject {
pub enum PipelinedExecutor {
TableScan(PipelinedScan),
IxScan(PipelinedIxScan),
IxScansAnd(PipelinedIxScanAnd),
IxJoin(PipelinedIxJoin),
IxDeltaScan(PipelinedIxDeltaScan),
IxDeltaJoin(PipelinedIxDeltaJoin),
@@ -205,6 +206,12 @@ impl From<PhysicalPlan> for PipelinedExecutor {
}),
PhysicalPlan::IxScan(scan @ IxScan { delta: None, .. }, _) => Self::IxScan(scan.into()),
PhysicalPlan::IxScan(scan, _) => Self::IxDeltaScan(scan.into()),
PhysicalPlan::IxScansAnd(scans) => {
// Apply the filter to each scan to form an intersection
Self::IxScansAnd(PipelinedIxScanAnd {
scans: scans.into_iter().map(PipelinedExecutor::from).collect(),
})
}
PhysicalPlan::IxJoin(
IxJoin {
lhs,
@@ -292,7 +299,7 @@ impl PipelinedExecutor {
lhs.visit(f);
rhs.visit(f);
}
Self::TableScan(..) | Self::IxScan(..) | Self::IxDeltaScan(..) => {}
Self::TableScan(..) | Self::IxScan(..) | Self::IxDeltaScan(..) | Self::IxScansAnd(..) => {}
}
}
@@ -301,6 +308,7 @@ impl PipelinedExecutor {
match self {
Self::TableScan(scan) => scan.is_empty(tx),
Self::IxScan(scan) => scan.is_empty(tx),
Self::IxScansAnd(scan) => scan.is_empty(tx),
Self::IxDeltaScan(scan) => scan.is_empty(tx),
Self::IxJoin(join) => join.is_empty(tx),
Self::IxDeltaJoin(join) => join.is_empty(tx),
@@ -320,6 +328,7 @@ impl PipelinedExecutor {
match self {
Self::TableScan(scan) => scan.execute(tx, metrics, f),
Self::IxScan(scan) => scan.execute(tx, metrics, f),
Self::IxScansAnd(scan) => scan.execute(tx, metrics, f),
Self::IxDeltaScan(scan) => scan.execute(tx, metrics, f),
Self::IxJoin(join) => join.execute(tx, metrics, f),
Self::IxDeltaJoin(join) => join.execute(tx, metrics, f),
@@ -567,6 +576,7 @@ pub struct PipelinedIxScan {
pub table_id: TableId,
/// The index id
pub index_id: IndexId,
pub index_is_multi_column: bool,
pub limit: Option<u64>,
/// An equality prefix for multi-column scans
pub prefix: Vec<AlgebraicValue>,
@@ -578,6 +588,12 @@ pub struct PipelinedIxScan {
impl From<IxScan> for PipelinedIxScan {
fn from(scan: IxScan) -> Self {
let index_is_multi_column = scan
.schema
.indexes
.iter()
.find(|i| i.index_id == scan.index_id)
.is_some_and(|index| index.index_algorithm.columns().iter().count() > 1);
match scan {
IxScan {
schema,
@@ -589,6 +605,7 @@ impl From<IxScan> for PipelinedIxScan {
} => Self {
table_id: schema.table_id,
index_id,
index_is_multi_column,
limit,
prefix: prefix.into_iter().map(|(_, v)| v).collect(),
lower: Bound::Included(v.clone()),
@@ -604,6 +621,7 @@ impl From<IxScan> for PipelinedIxScan {
} => Self {
table_id: schema.table_id,
index_id,
index_is_multi_column,
limit,
prefix: prefix.into_iter().map(|(_, v)| v).collect(),
lower,
@@ -672,7 +690,7 @@ impl PipelinedIxScan {
f(t)
};
match self.prefix.as_slice() {
[] => {
[] if !self.index_is_multi_column => {
for ptr in single_col_limit_scan(self.limit.map(|n| n as usize))?
.map(Row::Ptr)
.map(Tuple::Row)
@@ -695,6 +713,66 @@ impl PipelinedIxScan {
}
}
/// A pipelined executor for scanning multiple indexes
#[derive(Debug)]
pub struct PipelinedIxScanAnd {
scans: Vec<PipelinedExecutor>,
}
impl PipelinedIxScanAnd {
/// Does this operation contain an empty scan?
pub fn is_empty(&self, tx: &impl DeltaStore) -> bool {
self.scans.iter().all(|scan| scan.is_empty(tx))
}
/// Executes the pipelined index scans, producing tuples when all
pub fn execute<'a, Tx: Datastore + DeltaStore>(
&self,
tx: &'a Tx,
metrics: &mut ExecutionMetrics,
f: &mut dyn FnMut(Tuple<'a>) -> Result<()>,
) -> Result<()> {
let scans = match self.scans.as_slice() {
[] => return Ok(()), // No scans to execute
[scan] => {
// Single scan, execute directly
return scan.execute(tx, metrics, f);
}
scans => scans,
};
// Execute first scan and materialize all rows
let mut first_rows = HashSet::new();
scans[0].execute(tx, metrics, &mut |t| {
first_rows.insert(t);
Ok(())
})?;
// Intersect with remaining scans
for scan in &scans[1..] {
let mut current_rows = HashSet::new();
scan.execute(tx, metrics, &mut |t| {
if first_rows.contains(&t) {
current_rows.insert(t);
}
Ok(())
})?;
first_rows = current_rows;
if first_rows.is_empty() {
break;
}
}
// Emit results
for t in first_rows {
metrics.rows_scanned += 1;
f(t)?;
}
Ok(())
}
}
/// A pipelined index join executor
#[derive(Debug)]
pub struct PipelinedIxJoin {
+432 -71
View File
@@ -15,9 +15,8 @@ use spacetimedb_sql_parser::ast::{BinOp, LogOp};
use spacetimedb_table::table::RowRef;
use crate::rules::{
ComputePositions, HashToIxJoin, IxScanAnd, IxScanEq, IxScanEq2Col, IxScanEq3Col, PullFilterAboveHashJoin,
PushConstAnd, PushConstEq, PushLimit, ReorderDeltaJoinRhs, ReorderHashJoin, RewriteRule, UniqueHashJoinRule,
UniqueIxJoinRule,
ComputePositions, HashToIxJoin, IxScanBinOp, IxScanOpMultiCol, PullFilterAboveHashJoin, PushConstAnd, PushConstEq,
PushLimit, ReorderDeltaJoinRhs, ReorderHashJoin, RewriteRule, UniqueHashJoinRule, UniqueIxJoinRule,
};
/// Table aliases are replaced with labels in the physical plan
@@ -201,6 +200,8 @@ pub enum PhysicalPlan {
TableScan(TableScan, Label),
/// Fetch row ids from an index
IxScan(IxScan, Label),
/// Fetch rows from using an intersection from index scans
IxScansAnd(Vec<PhysicalPlan>),
/// An index join + projection
IxJoin(IxJoin, Semi),
/// A hash join + projection
@@ -223,7 +224,7 @@ impl PhysicalPlan {
lhs.visit(f);
rhs.visit(f);
}
Self::TableScan(..) | Self::IxScan(..) => {}
Self::TableScan(..) | Self::IxScan(..) | Self::IxScansAnd(..) => {}
}
}
@@ -238,7 +239,7 @@ impl PhysicalPlan {
lhs.visit_mut(f);
rhs.visit_mut(f);
}
Self::TableScan(..) | Self::IxScan(..) => {}
Self::TableScan(..) | Self::IxScan(..) | Self::IxScansAnd(..) => {}
}
}
@@ -271,7 +272,7 @@ impl PhysicalPlan {
},
semi,
),
plan @ Self::TableScan(..) | plan @ Self::IxScan(..) => plan,
plan @ Self::TableScan(..) | plan @ Self::IxScan(..) | plan @ Self::IxScansAnd(..) => plan,
}
}
@@ -290,7 +291,7 @@ impl PhysicalPlan {
plan.any(&|plan| ok(plan).is_some())
};
Ok(match self {
Self::TableScan(..) | Self::IxScan(..) => self,
Self::TableScan(..) | Self::IxScan(..) | Self::IxScansAnd(..) => self,
Self::NLJoin(lhs, rhs) => {
if matches(&lhs) {
return Ok(Self::NLJoin(Box::new(lhs.map_if(f, ok)?), rhs));
@@ -393,10 +394,8 @@ impl PhysicalPlan {
.apply_rec::<PushConstEq>()?
.apply_rec::<ReorderDeltaJoinRhs>()?
.apply_rec::<PullFilterAboveHashJoin>()?
.apply_rec::<IxScanEq3Col>()?
.apply_rec::<IxScanEq2Col>()?
.apply_rec::<IxScanEq>()?
.apply_rec::<IxScanAnd>()?
.apply_rec::<IxScanBinOp>()?
.apply_rec::<IxScanOpMultiCol>()?
.apply_rec::<ReorderHashJoin>()?
.apply_rec::<HashToIxJoin>()?
.apply_rec::<UniqueIxJoinRule>()?
@@ -831,6 +830,7 @@ impl PhysicalPlan {
fn nfields(&self) -> usize {
match self {
Self::TableScan(..) | Self::IxScan(..) | Self::IxJoin(_, Semi::Rhs) => 1,
Self::IxScansAnd(plans) => plans.iter().map(PhysicalPlan::nfields).sum(),
Self::Filter(input, _) => input.nfields(),
Self::IxJoin(join, Semi::Lhs) => join.lhs.nfields(),
Self::IxJoin(join, Semi::All) => join.lhs.nfields() + 1,
@@ -859,6 +859,7 @@ impl PhysicalPlan {
| PhysicalPlan::IxJoin(IxJoin { rhs_label: alias, .. }, Semi::Rhs) => {
labels.push(*alias);
}
PhysicalPlan::IxScansAnd(idx) => labels.extend(idx.iter().flat_map(|plan| plan.labels())),
PhysicalPlan::Filter(input, _)
| PhysicalPlan::IxJoin(IxJoin { lhs: input, .. }, Semi::Lhs)
| PhysicalPlan::HashJoin(HashJoin { lhs: input, .. }, Semi::Lhs)
@@ -983,6 +984,17 @@ pub enum Sarg {
}
impl Sarg {
pub fn from_op(op: BinOp, col: ColId, value: AlgebraicValue) -> Self {
match op {
BinOp::Eq => Sarg::Eq(col, value),
BinOp::Ne => unreachable!("Cannot create a search argument for inequality"),
BinOp::Lt => Sarg::Range(col, Bound::Unbounded, Bound::Excluded(value)),
BinOp::Gt => Sarg::Range(col, Bound::Excluded(value), Bound::Unbounded),
BinOp::Lte => Sarg::Range(col, Bound::Unbounded, Bound::Included(value)),
BinOp::Gte => Sarg::Range(col, Bound::Included(value), Bound::Unbounded),
}
}
/// Decodes the sarg into a binary operator
pub fn to_op(&self) -> BinOp {
match self {
@@ -1227,7 +1239,7 @@ pub mod tests_utils {
Ok(compile(plan))
}
fn query<'a>(db: &'a impl SchemaView, auth: &AuthCtx, sql: &'a str) -> TypingResult<PhysicalCtx<'a>> {
pub fn query<'a>(db: &'a impl SchemaView, auth: &AuthCtx, sql: &'a str) -> TypingResult<PhysicalCtx<'a>> {
let plan = compile_sql_stmt_with_ctx(sql, db, auth, true)?;
Ok(compile(plan))
}
@@ -1241,6 +1253,16 @@ pub mod tests_utils {
let explain = Explain::new(&plan).with_options(options).build();
expect.assert_eq(&explain.to_string());
}
#[cfg(test)]
fn check_assert(plan: PhysicalCtx, options: ExplainOptions, expect: String) {
let plan = if options.optimize {
plan.optimize().unwrap()
} else {
plan
};
let explain = Explain::new(&plan).with_options(options).build();
pretty_assertions::assert_eq!(explain.to_string(), expect, "{}", plan.sql)
}
pub fn check_sub(
db: &impl SchemaView,
@@ -1253,6 +1275,11 @@ pub mod tests_utils {
check(plan, options, expect);
Ok(())
}
#[cfg(test)]
pub fn check_sub_assert(db: &impl SchemaView, options: ExplainOptions, auth: &AuthCtx, sql: &str, expect: String) {
let plan = sub(db, auth, sql).unwrap();
check_assert(plan, options, expect);
}
pub fn check_query(
db: &impl SchemaView,
@@ -1265,13 +1292,25 @@ pub mod tests_utils {
check(plan, options, expect);
Ok(())
}
#[cfg(test)]
pub fn check_query_assert(
db: &impl SchemaView,
options: ExplainOptions,
auth: &AuthCtx,
sql: &str,
expect: String,
) {
let plan = query(db, auth, sql).unwrap();
check_assert(plan, options, expect);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::printer::ExplainOptions;
use crate::plan::tests_utils::query;
use crate::printer::{Explain, ExplainOptions};
use expect_test::{expect, Expect};
use spacetimedb_expr::check::SchemaView;
use spacetimedb_lib::identity::AuthCtx;
@@ -1384,10 +1423,18 @@ mod tests {
tests_utils::check_sub(db, db.options, &AuthCtx::for_testing(), sql, expect).unwrap();
}
fn check_sub_assert(db: &SchemaViewer, sql: &str, expect: String) {
tests_utils::check_sub_assert(db, db.options, &AuthCtx::for_testing(), sql, expect);
}
fn check_query(db: &SchemaViewer, sql: &str, expect: Expect) {
tests_utils::check_query(db, db.options, &AuthCtx::for_testing(), sql, expect).unwrap();
}
fn check_query_assert(db: &SchemaViewer, sql: &str, expect: String) {
tests_utils::check_query_assert(db, db.options, &AuthCtx::for_testing(), sql, expect);
}
fn data() -> SchemaViewer {
let m_id = TableId(1);
let w_id = TableId(2);
@@ -1420,7 +1467,16 @@ mod tests {
Some(0),
));
SchemaViewer::new(vec![m.clone(), w.clone(), p.clone()]).with_options(ExplainOptions::default().optimize(false))
let x = Arc::new(schema(
m_id,
"test",
&[("x", AlgebraicType::I32), ("y", AlgebraicType::I32)],
&[&[0], &[1]],
&[&[0]],
Some(0),
));
SchemaViewer::new(vec![m.clone(), w.clone(), p.clone(), x.clone()])
.with_options(ExplainOptions::default().optimize(false))
}
#[test]
@@ -1591,26 +1647,234 @@ Seq Scan on t
);
}
/// Test single and multi-column index selections
#[test]
fn index_scans() {
let t_id = TableId(1);
fn make_table_index() -> SchemaViewer {
let t = Arc::new(schema(
t_id,
TableId(1),
"t",
&[
("w", AlgebraicType::U8),
("x", AlgebraicType::U8),
("y", AlgebraicType::U8),
("z", AlgebraicType::U8),
("id", AlgebraicType::U64),
],
&[&[1], &[2, 3], &[1, 2, 3]],
&[&[1], &[2, 3], &[1, 2, 3], &[0, 1, 2, 3], &[0]],
&[],
None,
));
let db = SchemaViewer::new(vec![t.clone()]).optimize(true);
SchemaViewer::new(vec![t]).optimize(true)
}
// We optimize with the following rules in mind:
// - When all the comparisons are`!=` is always a full table scan
// - Multi-column indexes are only used if the query has a prefix match (ie all operators are `=`)
// - Else are converted to a single column index scan on the leftmost column and a filter on the rest
/// Test index selections on 1 column
#[test]
fn index_scans_and() {
let t = Arc::new(schema(
TableId(1),
"t",
&[
("a", AlgebraicType::U8),
("b", AlgebraicType::U8),
("c", AlgebraicType::U8),
],
&[&[1], &[2], &[0, 1, 2]],
&[],
None,
));
let db = SchemaViewer::new(vec![t]).optimize(true);
check_sub(
&db,
"select * from t where a >= 3 and a <= 5",
expect![
r#"
Index Scan using Index id 2 (t.a, t.b, t.c) on t
Index Cond: (t.a >= U8(3))
Output: t.a, t.b, t.c
-> Filter: (t.a <= U8(5))"#
],
);
}
/// Test index selections on 1 column
#[test]
fn index_scans_1_col() {
let db = make_table_index();
for op in ["=", ">", "<", ">=", "<="] {
check_sub_assert(
&db,
&format!("SELECT * FROM t WHERE x {op} 4"),
format!(
"Index Scan using Index id 0 (t.x) on t
Index Cond: (t.x {op} U8(4))
Output: t.w, t.x, t.y, t.z, t.id",
),
)
}
// `!=` is not supported in index scans
check_query(
&db,
"select * from t where x != 4",
expect![
r#"
Seq Scan on t
Output: t.w, t.x, t.y, t.z, t.id
-> Filter: (t.x <> U8(4))"#
],
);
// Select index on x
check_query(
&db,
"select * from t where x = 5 and id = 4",
expect![
r#"
Index Scan using Index id 0 (t.x) on t
Index Cond: (t.x = U8(5))
Output: t.w, t.x, t.y, t.z, t.id
-> Filter: (t.id = U64(4))"#
],
);
// Do not select index on (y, z)
check_query(
&db,
"select * from t where y = 1",
expect![
r#"
Seq Scan on t
Output: t.w, t.x, t.y, t.z, t.id
-> Filter: (t.y = U8(1))"#
],
);
//Query multiple times the same index
check_query(
&db,
"select * from t where x = 5 and x = 6 and x = 4",
expect![
r#"
Union
-> Index Scan using Index id 0 (t.x) on t
Index Cond: (t.x = U8(6))
Output: t.w, t.x, t.y, t.z, t.id
-> Index Scan using Index id 0 (t.x) on t
Index Cond: (t.x = U8(4))
Output: t.w, t.x, t.y, t.z, t.id
-> Index Scan using Index id 0 (t.x) on t
Index Cond: (t.x = U8(5))
Output: t.w, t.x, t.y, t.z, t.id
Output: t.w, t.x, t.y, t.z, t.id
-> Filter: (t.x = U8(6) AND t.x = U8(4) AND t.x = U8(5))"#
],
);
check_query(
&db,
"select * from t where x = 5 or x = 6 or x = 4",
expect![
r#"
Seq Scan on t
Output: t.w, t.x, t.y, t.z, t.id
-> Filter: (t.x = U8(5) OR t.x = U8(6) OR t.x = U8(4))"#
],
);
}
/// Test index selections on 2 columns
#[test]
fn index_scans_2_col() {
let db = make_table_index();
// Select index on [y, z]
check_query(
&db,
"select * from t where y = 1 and z = 2",
expect![
r#"
Index Scan using Index id 1 (t.y, t.z) on t
Index Cond: (t.y = U8(1), t.z = U8(2))
Output: t.w, t.x, t.y, t.z, t.id"#
],
);
for op in [">", "<", ">=", "<="] {
check_query_assert(
&db,
&format!("select * from t where y {op} 1 and z {op} 2"),
format!(
r#"Index Scan using Index id 1 (t.y, t.z) on t
Index Cond: (t.y {op} U8(1))
Output: t.w, t.x, t.y, t.z, t.id
-> Filter: (t.z {op} U8(2))"#
),
);
// Check permutations of the same query
check_query_assert(
&db,
&format!("select * from t where z {op} 2 and y {op} 1"),
format!(
r#"Index Scan using Index id 1 (t.y, t.z) on t
Index Cond: (t.y {op} U8(1))
Output: t.w, t.x, t.y, t.z, t.id
-> Filter: (t.z {op} U8(2))"#
),
);
check_query_assert(
&db,
&format!("select * from t where z != 2 and y {op} 1"),
format!(
r#"Index Scan using Index id 1 (t.y, t.z) on t
Index Cond: (t.y {op} U8(1))
Output: t.w, t.x, t.y, t.z, t.id
-> Filter: (t.z <> U8(2))"#
),
);
}
// Select index on (y, z), (w) and filter on (id)
check_query(
&db,
"select * from t where w = 1 and y = 2 and z = 3 and id = 4",
expect![
r#"
Union
-> Index Scan using Index id 1 (t.y, t.z) on t
Index Cond: (t.y = U8(2), t.z = U8(3))
Output: t.w, t.x, t.y, t.z, t.id
-> Index Scan using Index id 3 (t.w, t.x, t.y, t.z) on t
Index Cond: (t.w = U8(1))
Output: t.w, t.x, t.y, t.z, t.id
Output: t.w, t.x, t.y, t.z, t.id
-> Filter: (t.id = U64(4) AND t.y = U8(2) AND t.z = U8(3) AND t.w = U8(1))"#
],
);
// `!=` is not supported in index scans
check_query(
&db,
"select * from t where y != 1 and z != 2",
expect![
r#"
Seq Scan on t
Output: t.w, t.x, t.y, t.z, t.id
-> Filter: (t.y <> U8(1) AND t.z <> U8(2))"#
],
);
}
/// Test index selections on 3 columns
#[test]
fn index_scans_3_col() {
let db = make_table_index();
// Select index on (x, y, z)
check_sub(
&db,
@@ -1619,7 +1883,7 @@ Seq Scan on t
r#"
Index Scan using Index id 2 (t.x, t.y, t.z) on t
Index Cond: (x.x = U8(3), x.y = U8(4), x.z = U8(5))
Output: x.w, x.x, x.y, x.z"#
Output: x.w, x.x, x.y, x.z, x.id"#
],
);
@@ -1631,78 +1895,130 @@ Index Scan using Index id 2 (t.x, t.y, t.z) on t
r#"
Index Scan using Index id 2 (t.x, t.y, t.z) on t
Index Cond: (t.x = U8(3), t.y = U8(4), t.z = U8(5))
Output: t.w, t.x, t.y, t.z"#
Output: t.w, t.x, t.y, t.z, t.id"#
],
);
// Select index on x
check_sub(
&db,
"select * from t where x = 3 and y = 4",
expect![
r#"
Index Scan using Index id 0 (t.x) on t
Index Cond: (t.x = U8(3))
Output: t.w, t.x, t.y, t.z
-> Filter: (t.y = U8(4))"#
],
);
// Select index on x
for op in [">", "<", ">=", "<="] {
check_sub_assert(
&db,
&format!("select * from t where x {op} 3 and y {op} 4 and z {op} 5"),
format!(
r#"Union
-> Index Scan using Index id 0 (t.x) on t
Index Cond: (t.x {op} U8(3))
Output: t.w, t.x, t.y, t.z, t.id
-> Index Scan using Index id 1 (t.y, t.z) on t
Index Cond: (t.y {op} U8(4))
Output: t.w, t.x, t.y, t.z, t.id
Output: t.w, t.x, t.y, t.z, t.id
-> Filter: (t.z {op} U8(5) AND t.x {op} U8(3) AND t.y {op} U8(4))"#
),
);
// Check permutations of the same query
check_sub_assert(
&db,
&format!("select * from t where z {op} 5 and y {op} 4 and x {op} 3"),
format!(
r#"Union
-> Index Scan using Index id 0 (t.x) on t
Index Cond: (t.x {op} U8(3))
Output: t.w, t.x, t.y, t.z, t.id
-> Index Scan using Index id 1 (t.y, t.z) on t
Index Cond: (t.y {op} U8(4))
Output: t.w, t.x, t.y, t.z, t.id
Output: t.w, t.x, t.y, t.z, t.id
-> Filter: (t.z {op} U8(5) AND t.x {op} U8(3) AND t.y {op} U8(4))"#
),
);
check_sub_assert(
&db,
&format!("select * from t where x {op} 3 and y != 4 and z {op} 5"),
format!(
r#"Index Scan using Index id 0 (t.x) on t
Index Cond: (t.x {op} U8(3))
Output: t.w, t.x, t.y, t.z, t.id
-> Filter: (t.y <> U8(4) AND t.z {op} U8(5))"#
),
);
}
// `!=` is not supported in index scans
check_query(
&db,
"select * from t where w = 5 and x = 4",
expect![
r#"
Index Scan using Index id 0 (t.x) on t
Index Cond: (t.x = U8(4))
Output: t.w, t.x, t.y, t.z
-> Filter: (t.w = U8(5))"#
],
);
// Do not select index on (y, z)
check_query(
&db,
"select * from t where y = 1",
"select * from t where x != 3 and y != 4 and z != 5",
expect![
r#"
Seq Scan on t
Output: t.w, t.x, t.y, t.z
-> Filter: (t.y = U8(1))"#
Output: t.w, t.x, t.y, t.z, t.id
-> Filter: (t.x <> U8(3) AND t.y <> U8(4) AND t.z <> U8(5))"#
],
);
// Select index on [y, z]
check_query(
// Select index on (y, z) with multiple conditions on y
check_sub(
&db,
"select * from t where y = 1 and z = 2",
"select * from t as x where y = 4 and y < 5 and id = 6",
expect![
r#"
Index Scan using Index id 1 (t.y, t.z) on t
Index Cond: (t.y = U8(1), t.z = U8(2))
Output: t.w, t.x, t.y, t.z"#
Union
-> Index Scan using Index id 1 (t.y, t.z) on t
Index Cond: (x.y = U8(4))
Output: x.w, x.x, x.y, x.z, x.id
-> Index Scan using Index id 1 (t.y, t.z) on t
Index Cond: (x.y < U8(5))
Output: x.w, x.x, x.y, x.z, x.id
Output: x.w, x.x, x.y, x.z, x.id
-> Filter: (x.id = U64(6) AND x.y < U8(5) AND x.y = U8(4))"#
],
);
}
/// Test index selections above 3 columns
#[test]
fn index_scans_after_3_col() {
let db = make_table_index();
// Select index on (x, y, z)
check_sub(
&db,
"select * from t as x where x = 3 and y = 4 and z = 5 and w = 6",
expect![
r#"
Index Scan using Index id 3 (t.w, t.x, t.y, t.z) on t
Index Cond: (x.w = U8(6), x.x = U8(3), x.y = U8(4), x.z = U8(5))
Output: x.w, x.x, x.y, x.z, x.id"#
],
);
// Check permutations of the same query
check_query(
// Test permutations of the same query
check_sub(
&db,
"select * from t where z = 2 and y = 1",
"select * from t where z = 5 and y = 4 and w = 6 and x = 3",
expect![
r#"
Index Scan using Index id 1 (t.y, t.z) on t
Index Cond: (t.y = U8(1), t.z = U8(2))
Output: t.w, t.x, t.y, t.z"#
Index Scan using Index id 3 (t.w, t.x, t.y, t.z) on t
Index Cond: (t.w = U8(6), t.x = U8(3), t.y = U8(4), t.z = U8(5))
Output: t.w, t.x, t.y, t.z, t.id"#
],
);
// Select index on (y, z) and filter on (w)
check_query(
}
/// Test index selections select the shorter index when multiple indexes match
#[test]
fn index_scans_pick_shorter() {
let db = make_table_index();
// Select index on (x) instead of (x, y)
check_sub(
&db,
"select * from t where w = 1 and y = 2 and z = 3",
"select * from t as x where x = 3 and id > 4",
expect![
r#"
Index Scan using Index id 1 (t.y, t.z) on t
Index Cond: (t.y = U8(2), t.z = U8(3))
Output: t.w, t.x, t.y, t.z
-> Filter: (t.w = U8(1))"#
Index Scan using Index id 0 (t.x) on t
Index Cond: (x.x = U8(3))
Output: x.w, x.x, x.y, x.z, x.id
-> Filter: (x.id > U64(4))"#
],
);
}
@@ -2121,4 +2437,49 @@ Limit: 10
Output: p.id, p.name"#]],
);
}
#[test]
fn overflow() {
let db = data().with_options(ExplainOptions::default().optimize(true));
let build_query = |total| {
let mut sql = "select * from m where ".to_string();
for x in 1..total {
let fragment = format!("(manager = {x}) or ");
sql.push_str(&fragment.repeat((total - 1) as usize));
}
sql.push_str("(employee = 0)");
sql
};
let run = |sep: char, sql: &str| {
query(&db, &AuthCtx::for_testing(), sql)
.map(|plan| {
// Check that the plan can be explained without overflow
let explain = Explain::new(&plan)
.with_options(ExplainOptions::default().optimize(true))
.build();
let out = explain.to_string();
!out.is_empty()
})
.map_err(|e| e.to_string().split(sep).next().unwrap_or_default().to_string())
};
let sql = build_query(1_000);
assert_eq!(
run(':', &sql),
Err("SQL query exceeds maximum allowed length".to_string())
);
let sql = build_query(41);
assert_eq!(run(',', &sql), Err("Recursion limit exceeded".to_string()));
let sql = build_query(40);
assert_eq!(run(',', &sql), Ok(true), "Query should not overflow");
// Check no overflow with lot of joins
let mut sql = "SELECT m.* FROM m ".to_string();
for i in 0..1_000 {
sql.push_str(&format!("JOIN m AS m{i} ON m.employee = m{i}.manager "));
}
assert_eq!(run(',', &sql), Ok(true), "Query with 1_000 joins should not overflow");
}
}
+12 -1
View File
@@ -544,6 +544,17 @@ fn scan_tables<'a>(lines: &mut Lines<'a>, plan: &'a PhysicalPlan) -> PrinterPlan
output,
}
}
PhysicalPlan::IxScansAnd(idx) => {
let mut plans = Vec::new();
for plan in idx {
plans.push(scan_tables(lines, plan));
}
let output = plans.last().map(|x| x.output()).unwrap_or(Output::Empty);
PrinterPlan::Union {
output: output.clone(),
plans,
}
}
PhysicalPlan::IxJoin(idx, semi) => {
lines.add_table(idx.rhs_label, &idx.rhs);
let plan = scan_tables(lines, &idx.lhs);
@@ -909,7 +920,7 @@ fn eval_plan<'a>(lines: &mut Lines<'a>, plan: PrinterPlan<'a>, ident: u16) {
/// - Showing the schema of the tables
/// - Showing the planning time
pub struct Explain<'a> {
ctx: &'a PhysicalCtx<'a>,
pub ctx: &'a PhysicalCtx<'a>,
lines: Vec<Line<'a>>,
labels: Labels<'a>,
options: ExplainOptions,
+260 -425
View File
@@ -5,36 +5,36 @@
//! * [PushConstEq]
//! Push down predicates of the form `x=1`
//! * [PushConstAnd]
//! Push down predicates of the form `x=1 and y=2`
//! * [IxScanEq]
//! * [IxScanBinOp]
//! Generate 1-column index scan for `x=1`
//! * [IxScanAnd]
//! Generate 1-column index scan for `x=1 and y=2`
//! * [IxScanEq2Col]
//! Generate 2-column index scan
//! * [IxScanEq3Col]
//! Generate 3-column index scan
//! * [ReorderHashJoin]
//! Reorder the sides of a hash join
//! * [ReorderDeltaJoinRhs]
//! Reorder the sides of a hash join with delta tables
//! * [PullFilterAboveHashJoin]
//! Pull a filter above a hash join with delta tables
//! * [HashToIxJoin]
//! * [HashToIxJoin]
//! Convert hash join to index join
//! * [UniqueIxJoinRule]
//! * [UniqueIxJoinRule]
//! Mark index join as unique
//! * [UniqueHashJoinRule]
//! * [UniqueHashJoinRule]
//! Mark hash join as unique
use anyhow::{bail, Result};
use spacetimedb_primitives::{ColId, ColSet, IndexId};
use spacetimedb_schema::schema::IndexSchema;
use spacetimedb_sql_parser::ast::{BinOp, LogOp};
//!
//! We optimize with the following rules in mind:
//! - When all the comparisons are`!=` is always a full table scan
//! - Multi-column indexes are only used if the query has a prefix match (ie all operators are `=`)
//! - Else are converted to a single column index scan on the leftmost column after `=` and a filter on the rest
use crate::plan::{
HashJoin, IxJoin, IxScan, Label, PhysicalExpr, PhysicalPlan, ProjectListPlan, ProjectPlan, Sarg, Semi, TableScan,
TupleField,
};
use anyhow::{bail, Result};
use either::Either;
use itertools::Itertools;
use spacetimedb_lib::AlgebraicValue;
use spacetimedb_primitives::{ColId, ColSet, IndexId};
use spacetimedb_schema::schema::IndexSchema;
use spacetimedb_sql_parser::ast::{BinOp, LogOp};
use std::collections::{HashMap, HashSet};
/// A rewrite will only fail due to an internal logic bug.
/// However we don't want to panic in such a situation.
@@ -400,7 +400,7 @@ impl RewriteRule for PushConstAnd {
}
}
/// Match single field equality predicates such as:
/// Match single field for [`BinOp`] predicates such as:
///
/// ```sql
/// select * from t where x = 1
@@ -408,20 +408,18 @@ impl RewriteRule for PushConstAnd {
///
/// Rewrite as an index scan if applicable.
///
/// NOTE: This rule does not consider multi-column indexes.
pub(crate) struct IxScanEq;
/// NOTE: This rule does not consider multi-column indexes, or [`BinOp::Ne`] predicates.
pub(crate) struct IxScanBinOp;
pub(crate) struct IxScanInfo {
index_id: IndexId,
cols: Vec<(usize, ColId)>,
}
impl RewriteRule for IxScanEq {
impl RewriteRule for IxScanBinOp {
type Plan = PhysicalPlan;
type Info = (IndexId, ColId);
fn matches(plan: &PhysicalPlan) -> Option<Self::Info> {
if let PhysicalPlan::Filter(input, PhysicalExpr::BinOp(BinOp::Eq, expr, value)) = plan {
if let PhysicalPlan::Filter(input, PhysicalExpr::BinOp(op, expr, value)) = plan {
if *op == BinOp::Ne {
return None;
}
if let PhysicalPlan::TableScan(
TableScan {
schema,
@@ -455,7 +453,7 @@ impl RewriteRule for IxScanEq {
}
fn rewrite(plan: PhysicalPlan, (index_id, col_id): Self::Info) -> Result<PhysicalPlan> {
if let PhysicalPlan::Filter(input, PhysicalExpr::BinOp(BinOp::Eq, _, value)) = plan {
if let PhysicalPlan::Filter(input, PhysicalExpr::BinOp(op, _, value)) = plan {
if let PhysicalPlan::TableScan(TableScan { schema, limit, delta }, var) = *input {
if let PhysicalExpr::Value(v) = *value {
return Ok(PhysicalPlan::IxScan(
@@ -465,7 +463,7 @@ impl RewriteRule for IxScanEq {
delta,
index_id,
prefix: vec![],
arg: Sarg::Eq(col_id, v),
arg: Sarg::from_op(op, col_id, v),
},
var,
));
@@ -476,115 +474,56 @@ impl RewriteRule for IxScanEq {
}
}
/// Match multi-field equality predicates such as:
/// Match `1...N` multi-field [`BinOp`] predicates such as:
///
/// ```sql
/// select * from t where x = 1 and y = 1
/// ```
///
/// Create an index scan for one of the equality conditions.
///
/// NOTE: This rule does not consider multi-column indexes.
pub(crate) struct IxScanAnd;
impl RewriteRule for IxScanAnd {
type Plan = PhysicalPlan;
type Info = (IndexId, usize, ColId);
fn matches(plan: &PhysicalPlan) -> Option<Self::Info> {
if let PhysicalPlan::Filter(input, PhysicalExpr::LogOp(LogOp::And, exprs)) = plan {
if let PhysicalPlan::TableScan(
TableScan {
schema,
limit: None,
delta: _,
},
_,
) = &**input
{
return exprs.iter().enumerate().find_map(|(i, expr)| {
if let PhysicalExpr::BinOp(BinOp::Eq, lhs, value) = expr {
if let (PhysicalExpr::Field(TupleField { field_pos: pos, .. }), PhysicalExpr::Value(_)) =
(&**lhs, &**value)
{
return schema.indexes.iter().find_map(
|IndexSchema {
index_id,
index_algorithm,
..
}| {
index_algorithm
.columns()
// TODO: Support prefix scans
.as_singleton()
.filter(|col_id| col_id.idx() == *pos)
.map(|col_id| (*index_id, i, col_id))
},
);
}
}
None
});
}
}
None
}
fn rewrite(plan: PhysicalPlan, (index_id, i, col_id): Self::Info) -> Result<PhysicalPlan> {
if let PhysicalPlan::Filter(input, PhysicalExpr::LogOp(LogOp::And, mut exprs)) = plan {
if let PhysicalPlan::TableScan(TableScan { schema, limit, delta }, label) = *input {
if let PhysicalExpr::BinOp(BinOp::Eq, _, value) = exprs.swap_remove(i) {
if let PhysicalExpr::Value(v) = *value {
return Ok(PhysicalPlan::Filter(
Box::new(PhysicalPlan::IxScan(
IxScan {
schema,
limit,
delta,
index_id,
prefix: vec![],
arg: Sarg::Eq(col_id, v),
},
label,
)),
match exprs.len() {
1 => exprs.swap_remove(0),
_ => PhysicalExpr::LogOp(LogOp::And, exprs),
},
));
}
}
}
}
bail!("{INVARIANT_VIOLATION}: Failed to create single column index scan from conjunction")
}
}
/// Match multi-field equality predicates such as:
///
/// ```sql
/// select * from t where x = 1 and y = 1
/// select * from t where x = 1 and y = 1 // Full match
/// select * from t where x = 1 and y > 1 // Partial match
/// ```
///
/// Rewrite as a multi-column index scan if applicable.
///
/// NOTE: This rule does not consider indexes on 3 or more columns.
pub(crate) struct IxScanEq2Col;
/// NOTE: This rule does not consider [`BinOp::Ne`] predicates.
pub(crate) struct IxScanOpMultiCol;
impl RewriteRule for IxScanEq2Col {
#[derive(Debug, Clone)]
struct ColInfo {
op: BinOp,
field: TupleField,
value: AlgebraicValue,
}
#[derive(Debug)]
struct IndexMatch {
index_id: IndexId,
index_name: Box<str>,
matched_columns: Vec<ColInfo>,
}
#[derive(Debug)]
pub(crate) struct IxScanInfo {
matched: Vec<IndexMatch>,
unmatched: Vec<usize>,
unmatched_filter: Vec<ColInfo>,
}
impl RewriteRule for IxScanOpMultiCol {
type Plan = PhysicalPlan;
type Info = IxScanInfo;
fn matches(plan: &PhysicalPlan) -> Option<Self::Info> {
// We need to match the conditions against the index columns, so that:
// 1. We match the columns in the same order as the index
// 2. Build a prefix on the `columns - N` with `BinOp::Eq`
// 3. Build a [Sarg] on the first column after the prefix (Note: it must always succeed)
// [a Eq 1, b Eq 2, c Lte 3] -> [(a, 1), (b, 2), Sarg::from_op(Lte, c, 3)]
// 4. All the others become unmatched and turns into a filter scan`
fn matches(plan: &Self::Plan) -> Option<Self::Info> {
// Match a filter with a conjunction of binary expressions
let PhysicalPlan::Filter(input, PhysicalExpr::LogOp(LogOp::And, exprs)) = plan else {
return None;
};
let PhysicalPlan::TableScan(
TableScan {
schema,
limit: None,
delta: _,
schema, limit: None, ..
},
_,
) = &**input
@@ -592,316 +531,212 @@ impl RewriteRule for IxScanEq2Col {
return None;
};
for (i, a) in exprs.iter().enumerate() {
for (j, b) in exprs.iter().enumerate().filter(|(j, _)| i != *j) {
let (PhysicalExpr::BinOp(BinOp::Eq, a, u), PhysicalExpr::BinOp(BinOp::Eq, b, v)) = (a, b) else {
continue;
};
let (PhysicalExpr::Field(u), PhysicalExpr::Value(_), PhysicalExpr::Field(v), PhysicalExpr::Value(_)) =
(&**a, &**u, &**b, &**v)
else {
continue;
};
if let Some(scan) = schema
.indexes
.iter()
.filter(|idx| idx.index_algorithm.columns().len() == 2) // TODO: Support prefix scans
.map(|idx| (idx.index_id, idx.index_algorithm.columns()))
.find_map(|(index_id, columns)| {
let mut columns = columns.iter();
let x = columns.next()?;
if x.idx() != u.field_pos {
return None;
}
let y = columns.next()?;
if y.idx() != v.field_pos {
return None;
}
Some(IxScanInfo {
index_id,
cols: vec![(i, x), (j, y)],
})
})
if schema.indexes.is_empty() {
return None;
}
// Partition expressions into indexable and non-indexable
let (expr_cols, unmatched): (Vec<_>, Vec<_>) =
exprs.iter().enumerate().partition_map(|(expr_idx, e)| match e {
PhysicalExpr::BinOp(op, lhs, rhs)
if matches!(op, BinOp::Eq | BinOp::Lt | BinOp::Gt | BinOp::Lte | BinOp::Gte) =>
{
return Some(scan);
}
}
}
None
}
fn rewrite(plan: PhysicalPlan, info: Self::Info) -> Result<PhysicalPlan> {
match info.cols.as_slice() {
[(i, a), (j, b)] => {
if let PhysicalPlan::Filter(input, PhysicalExpr::LogOp(LogOp::And, exprs)) = plan {
if let PhysicalPlan::TableScan(TableScan { schema, limit, delta }, label) = *input {
if let (
Some(PhysicalExpr::BinOp(BinOp::Eq, _, u)),
Some(PhysicalExpr::BinOp(BinOp::Eq, _, v)),
) = (exprs.get(*i), exprs.get(*j))
{
if let (PhysicalExpr::Value(u), PhysicalExpr::Value(v)) = (&**u, &**v) {
return Ok(match exprs.len() {
n @ 0 | n @ 1 => {
bail!("{INVARIANT_VIOLATION}: Cannot create 2-column index scan from {n} conditions")
}
// If there are only 2 conditions in this filter,
// we replace the filter with an index scan.
2 => PhysicalPlan::IxScan(
IxScan {
schema,
limit,
delta,
index_id: info.index_id,
prefix: vec![(*a, u.clone())],
arg: Sarg::Eq(*b, v.clone()),
},
label,
),
// If there are 3 conditions in this filter,
// we create an index scan from 2 of them.
// The original conjunction is no longer well defined,
// because it only has a single operand now.
// Hence we must replace it with its operand.
3 => PhysicalPlan::Filter(
Box::new(PhysicalPlan::IxScan(
IxScan {
schema,
limit,
delta,
index_id: info.index_id,
prefix: vec![(*a, u.clone())],
arg: Sarg::Eq(*b, v.clone()),
},
label,
)),
exprs
.into_iter()
.enumerate()
.find(|(pos, _)| pos != i && pos != j)
.map(|(_, expr)| expr)
.unwrap(),
),
// If there are more than 3 conditions in this filter,
// we remove the 2 conditions used in the index scan.
// The remaining conditions still form a conjunction.
_ => PhysicalPlan::Filter(
Box::new(PhysicalPlan::IxScan(
IxScan {
schema,
limit,
delta,
index_id: info.index_id,
prefix: vec![(*a, u.clone())],
arg: Sarg::Eq(*b, v.clone()),
},
label,
)),
PhysicalExpr::LogOp(
LogOp::And,
exprs
.into_iter()
.enumerate()
.filter(|(pos, _)| pos != i && pos != j)
.map(|(_, expr)| expr)
.collect(),
),
),
});
}
}
}
}
bail!("{INVARIANT_VIOLATION}: Failed to create 2-column index scan")
}
_ => Ok(plan),
}
}
}
/// Match multi-field equality predicates such as:
///
/// ```sql
/// select * from t where x = 1 and y = 1 and z = 1
/// ```
///
/// Rewrite as a multi-column index scan if applicable.
///
/// NOTE: This rule does not consider indexes on 4 or more columns.
pub(crate) struct IxScanEq3Col;
impl RewriteRule for IxScanEq3Col {
type Plan = PhysicalPlan;
type Info = IxScanInfo;
fn matches(plan: &PhysicalPlan) -> Option<Self::Info> {
// Match outer plan structure
let PhysicalPlan::Filter(input, PhysicalExpr::LogOp(LogOp::And, exprs)) = plan else {
return None;
};
let PhysicalPlan::TableScan(
TableScan {
schema,
limit: None,
delta: _,
},
_,
) = &**input
else {
return None;
};
for (i, a) in exprs.iter().enumerate() {
for (j, b) in exprs.iter().enumerate().filter(|(j, _)| i != *j) {
for (k, c) in exprs.iter().enumerate().filter(|(k, _)| i != *k && j != *k) {
let (
PhysicalExpr::BinOp(BinOp::Eq, a, u),
PhysicalExpr::BinOp(BinOp::Eq, b, v),
PhysicalExpr::BinOp(BinOp::Eq, c, w),
) = (a, b, c)
else {
continue;
};
let (
PhysicalExpr::Field(u),
PhysicalExpr::Value(_),
PhysicalExpr::Field(v),
PhysicalExpr::Value(_),
PhysicalExpr::Field(w),
PhysicalExpr::Value(_),
) = (&**a, &**u, &**b, &**v, &**c, &**w)
else {
continue;
};
if let Some(scan) = schema
.indexes
.iter()
.filter(|idx| idx.index_algorithm.columns().len() == 3)
.map(|idx| (idx.index_id, idx.index_algorithm.columns()))
.find_map(|(index_id, columns)| {
let mut columns = columns.iter();
let x = columns.next()?;
if x.idx() != u.field_pos {
return None;
}
let y = columns.next()?;
if y.idx() != v.field_pos {
return None;
}
let z = columns.next()?;
if z.idx() != w.field_pos {
return None;
}
Some(IxScanInfo {
index_id,
cols: vec![(i, x), (j, y), (k, z)],
})
if let (PhysicalExpr::Field(field), PhysicalExpr::Value(value)) = (&**lhs, &**rhs) {
Either::Left(ColInfo {
op: *op,
field: field.clone(),
value: value.clone(),
})
{
return Some(scan);
} else {
Either::Right(expr_idx)
}
}
}
_ => Either::Right(expr_idx),
});
if expr_cols.is_empty() {
return None;
}
None
// We should allow for same column to be used multiple times, like `x = 1 and x > 2`.
let mut cond_map: HashMap<ColId, Vec<&ColInfo>> = HashMap::new();
let binding = expr_cols.clone();
for c in &binding {
cond_map.entry(c.field.field_pos.into()).or_default().push(c);
}
let mut candidates = vec![];
for idx in &schema.indexes {
let mut prefix_buf: Vec<&ColInfo> = vec![];
let mut has_range = false;
for col_id in idx.index_algorithm.columns().iter() {
let Some(conds) = cond_map.get(&col_id) else { break };
let mut pushed = false;
for cond in conds {
// If we have duplicate fields like in `x = 1 and x = 2 and y = 3`, then split:
// prefix: [x = 1, y = 3]
// extra : [x = 2]
if prefix_buf.iter().any(|c| c.field.field_pos == cond.field.field_pos) {
if cond.op == BinOp::Eq || !has_range {
candidates.push((idx.index_id, idx.index_name.clone(), vec![*cond]));
}
continue;
}
match cond.op {
BinOp::Eq => {
prefix_buf.push(*cond);
pushed = true;
}
// Only take the first range condition
BinOp::Lt | BinOp::Gt | BinOp::Lte | BinOp::Gte if !has_range => {
prefix_buf.push(*cond);
has_range = true;
break;
}
_ => {}
}
}
if !pushed {
break;
}
}
if !prefix_buf.is_empty() {
candidates.push((idx.index_id, idx.index_name.clone(), prefix_buf));
}
}
if candidates.is_empty() {
return None;
}
// Match the index with the longest prefix first
candidates.sort_by_key(|(_, _, cols)| -(cols.len() as isize));
let mut matched = vec![];
let mut covered = HashSet::new();
for (index_id, index_name, prefix) in candidates {
if prefix
.iter()
.all(|c| covered.contains(&(c.op, c.field.field_pos, &c.value)))
{
continue;
}
covered.extend(prefix.iter().map(|c| (c.op, c.field.field_pos, &c.value)));
matched.push(IndexMatch {
index_id,
index_name,
matched_columns: prefix.into_iter().cloned().collect(),
});
}
if matched.is_empty() {
return None;
}
matched.sort_by_key(|m| m.index_name.clone());
let unmatched_filter = expr_cols
.into_iter()
.filter(|c| !covered.contains(&(c.op, c.field.field_pos, &c.value)))
.collect();
Some(IxScanInfo {
matched,
unmatched,
unmatched_filter,
})
}
fn rewrite(plan: PhysicalPlan, info: Self::Info) -> Result<PhysicalPlan> {
match info.cols.as_slice() {
[(i, a), (j, b), (k, c)] => {
if let PhysicalPlan::Filter(input, PhysicalExpr::LogOp(LogOp::And, exprs)) = plan {
if let PhysicalPlan::TableScan(TableScan { schema, limit, delta }, label) = *input {
if let (
Some(PhysicalExpr::BinOp(BinOp::Eq, _, u)),
Some(PhysicalExpr::BinOp(BinOp::Eq, _, v)),
Some(PhysicalExpr::BinOp(BinOp::Eq, _, w)),
) = (exprs.get(*i), exprs.get(*j), exprs.get(*k))
{
if let (PhysicalExpr::Value(u), PhysicalExpr::Value(v), PhysicalExpr::Value(w)) =
(&**u, &**v, &**w)
{
return Ok(match exprs.len() {
n @ 0 | n @ 1 | n @ 2 => {
bail!("{INVARIANT_VIOLATION}: Cannot create 3-column index scan from {n} conditions")
}
// If there are only 3 conditions in this filter,
// we replace the filter with an index scan.
3 => PhysicalPlan::IxScan(
IxScan {
schema,
limit,
delta,
index_id: info.index_id,
prefix: vec![(*a, u.clone()), (*b, v.clone())],
arg: Sarg::Eq(*c, w.clone()),
},
label,
),
// If there are 4 conditions in this filter,
// we create an index scan from 3 of them.
// The original conjunction is no longer well defined,
// because it only has a single operand now.
// Hence we must replace it with its operand.
4 => PhysicalPlan::Filter(
Box::new(PhysicalPlan::IxScan(
IxScan {
schema,
limit,
delta,
index_id: info.index_id,
prefix: vec![(*a, u.clone()), (*b, v.clone())],
arg: Sarg::Eq(*c, w.clone()),
},
label,
)),
exprs
.into_iter()
.enumerate()
.find(|(pos, _)| pos != i && pos != j && pos != k)
.map(|(_, expr)| expr)
.unwrap(),
),
// If there are more than 4 conditions in this filter,
// we remove the 3 conditions used in the index scan.
// The remaining conditions still form a conjunction.
_ => PhysicalPlan::Filter(
Box::new(PhysicalPlan::IxScan(
IxScan {
schema,
limit,
delta,
index_id: info.index_id,
prefix: vec![(*a, u.clone()), (*b, v.clone())],
arg: Sarg::Eq(*c, w.clone()),
},
label,
)),
PhysicalExpr::LogOp(
LogOp::And,
exprs
.into_iter()
.enumerate()
.filter(|(pos, _)| pos != i && pos != j && pos != k)
.map(|(_, expr)| expr)
.collect(),
),
),
});
}
}
}
}
bail!("{INVARIANT_VIOLATION}: Failed to create 3-column index scan")
}
_ => Ok(plan),
fn rewrite(plan: Self::Plan, info: Self::Info) -> Result<Self::Plan> {
let PhysicalPlan::Filter(input, PhysicalExpr::LogOp(LogOp::And, exprs)) = plan else {
bail!("{INVARIANT_VIOLATION}: Expected Filter(LogOp::And)")
};
let PhysicalPlan::TableScan(scan, label) = *input else {
bail!("{INVARIANT_VIOLATION}: Expected TableScan")
};
let mut plans = Vec::with_capacity(info.matched.len());
for m in &info.matched {
let (prefix, arg) = {
let take = m.matched_columns.len() - 1;
let mut it = m.matched_columns.clone().into_iter();
let prefix: Vec<_> = it
.by_ref()
.take(take)
.map(|c| (c.field.field_pos.into(), c.value.clone()))
.collect();
let arg = it
.next()
.map(|c| Sarg::from_op(c.op, c.field.field_pos.into(), c.value))
.unwrap();
(prefix, arg)
};
plans.push((
m,
PhysicalPlan::IxScan(
IxScan {
schema: scan.schema.clone(),
limit: scan.limit,
delta: scan.delta,
index_id: m.index_id,
prefix,
arg,
},
label,
),
));
}
let mut unmatched_filter = info.unmatched_filter;
let index_plan = if plans.len() == 1 {
plans.pop().unwrap().1
} else {
// Must add back the filter expression because the index matches need to form an intersection
// of the index scans, like in `x > 1 and y < 2` each index scan output partial matches.
unmatched_filter.extend(info.matched.iter().flat_map(|m| m.matched_columns.iter()).cloned());
// Sort by the number of matched columns by `=` operator, so it scan less rows later
let plans = plans
.into_iter()
.sorted_by_key(|(m, _)| -(m.matched_columns.iter().filter(|c| c.op == BinOp::Eq).count() as isize))
.map(|(_, plan)| plan)
.collect();
PhysicalPlan::IxScansAnd(plans)
};
// No unmatched filters: done
if info.unmatched.is_empty() && unmatched_filter.is_empty() {
return Ok(index_plan);
}
// Reconstruct remaining filter expressions
let remaining = exprs
.into_iter()
.enumerate()
.filter(|(i, _)| info.unmatched.contains(i))
.map(|(_, e)| e)
.chain(unmatched_filter.into_iter().map(|c| {
PhysicalExpr::BinOp(
c.op,
Box::new(PhysicalExpr::Field(c.field)),
Box::new(PhysicalExpr::Value(c.value)),
)
}))
.collect();
Ok(PhysicalPlan::Filter(
Box::new(index_plan),
PhysicalExpr::LogOp(LogOp::And, remaining),
))
}
}
+1 -1
View File
@@ -210,7 +210,7 @@ pub enum SqlLiteral {
}
/// Binary infix operators
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BinOp {
Eq,
Ne,