Dedup (aka DISTINCT) support on PhysicalPlan

Add dedup
This commit is contained in:
Mario Alejandro Montoya Cortés
2024-10-28 11:51:51 -05:00
committed by Mario Alejandro Montoya Cortes
parent 0328a4fe4b
commit eabb007eee
13 changed files with 209 additions and 38 deletions
+35
View File
@@ -702,4 +702,39 @@ pub(crate) mod tests {
Ok(())
}
// Test `DISTINCT` keyword
#[test]
fn test_distinct() -> ResultTest<()> {
let db = TestDB::durable()?;
let table_id = db.create_table_for_test(
"T",
&[("id", AlgebraicType::U8), ("pos", AlgebraicType::U8)],
&[ColId(0)],
)?;
db.with_auto_commit(Workload::ForTests, |tx| -> Result<_, DBError> {
for i in 0..5u8 {
insert(&db, tx, table_id, &product!(i, 0u8))?;
}
Ok(())
})?;
let result = run_for_testing(&db, "SELECT DISTINCT * FROM T")?;
assert_eq!(
result,
vec![
product![0u8, 0u8],
product![1u8, 0u8],
product![2u8, 0u8],
product![3u8, 0u8],
product![4u8, 0u8]
]
);
let result = run_for_testing(&db, "SELECT DISTINCT pos FROM T WHERE id > 2")?;
assert_eq!(result, vec![product![0u8]]);
Ok(())
}
}
+28 -2
View File
@@ -1,5 +1,6 @@
use std::collections::{HashMap, HashSet};
use crate::{Datastore, DeltaScanIter, DeltaStore, Row, Tuple};
use anyhow::{anyhow, bail, Result};
use spacetimedb_lib::{query::Delta, AlgebraicValue, ProductValue};
use spacetimedb_physical_plan::plan::{
@@ -11,8 +12,6 @@ use spacetimedb_table::{
table_index::{TableIndex, TableIndexPointIter},
};
use crate::{Datastore, DeltaScanIter, DeltaStore, Row, Tuple};
/// The different iterators for evaluating query plans
pub enum PlanIter<'a> {
Table(TableScanIter<'a>),
@@ -75,6 +74,7 @@ pub enum Iter<'a> {
Row(RowRefIter<'a>),
Join(LeftDeepJoinIter<'a>),
Filter(Filter<'a, Iter<'a>>),
Dedup(Dedup<'a>),
}
impl<'a> Iterator for Iter<'a> {
@@ -85,6 +85,7 @@ impl<'a> Iterator for Iter<'a> {
Self::Row(iter) => iter.next().map(Tuple::Row),
Self::Join(iter) => iter.next(),
Self::Filter(iter) => iter.next(),
Self::Dedup(iter) => iter.next(),
}
}
}
@@ -1069,3 +1070,28 @@ impl<'a> Iterator for Filter<'a, Iter<'a>> {
self.input.find(|tuple| self.expr.eval_bool(tuple))
}
}
/// A tuple at a time deduplication iterator
pub struct Dedup<'a> {
/// The input iterator
input: Box<Iter<'a>>,
/// The set of seen row ids
seen: HashSet<Tuple<'a>>,
}
impl<'a> Dedup<'a> {
pub fn new(input: Box<Iter<'a>>) -> Self {
Self {
input,
seen: HashSet::new(),
}
}
}
impl<'a> Iterator for Dedup<'a> {
type Item = Tuple<'a>;
fn next(&mut self) -> Option<Self::Item> {
self.input.find(|tuple| self.seen.insert(tuple.clone()))
}
}
+2 -2
View File
@@ -95,7 +95,7 @@ pub trait DeltaStore {
}
}
#[derive(Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Row<'a> {
Ptr(RowRef<'a>),
Ref(&'a ProductValue),
@@ -148,7 +148,7 @@ impl ProjectField for Row<'_> {
}
/// Each query operator returns a tuple of [RowRef]s
#[derive(Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Tuple<'a> {
/// A pointer to a row in a base table
Row(Row<'a>),
+33
View File
@@ -20,6 +20,7 @@ use crate::{Datastore, DeltaStore, Row, Tuple};
pub enum ProjectListExecutor {
Name(PipelinedProject),
List(PipelinedExecutor, Vec<TupleField>),
Dedup(PipelinedDedup),
}
impl From<ProjectListPlan> for ProjectListExecutor {
@@ -27,6 +28,7 @@ impl From<ProjectListPlan> for ProjectListExecutor {
match plan {
ProjectListPlan::Name(plan) => Self::Name(plan.into()),
ProjectListPlan::List(plan, fields) => Self::List(plan.into(), fields),
ProjectListPlan::Dedup(plan) => Self::Dedup(PipelinedDedup::new(ProjectListExecutor::from(*plan))),
}
}
}
@@ -54,6 +56,9 @@ impl ProjectListExecutor {
f(ProductValue::from_iter(fields.iter().map(|field| t.project(field))))
})?;
}
Self::Dedup(plan) => {
plan.execute(tx, metrics, &mut |row| f(row))?;
}
}
metrics.rows_scanned += n;
metrics.bytes_scanned += bytes_scanned;
@@ -857,3 +862,31 @@ fn project(row: &impl ProjectField, field: &TupleField, bytes_scanned: &mut usiz
*bytes_scanned += value.size_of();
value
}
/// A pipelined executor for deduplication
pub struct PipelinedDedup {
pub input: Box<ProjectListExecutor>,
}
impl PipelinedDedup {
pub fn new(input: ProjectListExecutor) -> Self {
Self { input: Box::new(input) }
}
pub fn execute<Tx: Datastore + DeltaStore>(
&self,
tx: &Tx,
metrics: &mut ExecutionMetrics,
f: &mut dyn FnMut(ProductValue) -> Result<()>,
) -> Result<()> {
let mut seen = HashSet::new();
self.input.execute(tx, metrics, &mut |row| {
if seen.insert(row.clone()) {
f(row)?;
}
Ok(())
})?;
metrics.rows_scanned += seen.len();
Ok(())
}
}
+14 -13
View File
@@ -2,22 +2,22 @@ use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use crate::expr::{Expr, ProjectList, ProjectName, Relvar};
use crate::{expr::LeftDeepJoin, statement::Statement};
use spacetimedb_lib::AlgebraicType;
use spacetimedb_primitives::TableId;
use spacetimedb_schema::schema::TableSchema;
use spacetimedb_sql_parser::ast::BinOp;
use spacetimedb_sql_parser::{
ast::{sub::SqlSelect, SqlFrom, SqlIdent, SqlJoin},
parser::sub::parse_subscription,
};
use super::{
errors::{DuplicateName, TypingError, Unresolved, Unsupported},
expr::RelExpr,
type_expr, type_proj, type_select, StatementCtx, StatementSource,
};
use crate::expr::{Expr, ProjectList, ProjectName, Relvar};
use crate::{expr::LeftDeepJoin, statement::Statement};
use spacetimedb_lib::AlgebraicType;
use spacetimedb_primitives::TableId;
use spacetimedb_schema::schema::TableSchema;
use spacetimedb_sql_parser::ast::sql::SqlDistinct;
use spacetimedb_sql_parser::ast::BinOp;
use spacetimedb_sql_parser::{
ast::{sub::SqlSelect, SqlFrom, SqlIdent, SqlJoin},
parser::sub::parse_subscription,
};
/// The result of type checking and name resolution
pub type TypingResult<T> = core::result::Result<T, TypingError>;
@@ -140,7 +140,7 @@ impl TypeChecker for SubChecker {
filter: None,
} => {
let input = Self::type_from(from, vars, tx)?;
type_proj(input, project, vars)
type_proj(input, project, vars, SqlDistinct::No)
}
SqlSelect {
project,
@@ -148,7 +148,7 @@ impl TypeChecker for SubChecker {
filter: Some(expr),
} => {
let input = Self::type_from(from, vars, tx)?;
type_proj(type_select(input, expr, vars)?, project, vars)
type_proj(type_select(input, expr, vars)?, project, vars, SqlDistinct::No)
}
}
}
@@ -178,6 +178,7 @@ fn expect_table_type(expr: ProjectList) -> TypingResult<ProjectName> {
match expr {
ProjectList::Name(proj) => Ok(proj),
ProjectList::List(..) => Err(Unsupported::ReturnType.into()),
ProjectList::Dedup(..) => Err(Unsupported::Dedup.into()),
}
}
+2
View File
@@ -44,6 +44,8 @@ pub enum InvalidWildcard {
pub enum Unsupported {
#[error("Column projections are not supported in subscriptions; Subscriptions must return a table type")]
ReturnType,
#[error("Distinct projections are not supported in subscriptions")]
Dedup,
#[error("Unsupported expression in projection")]
ProjectExpr,
}
+6
View File
@@ -74,6 +74,7 @@ impl ProjectName {
pub enum ProjectList {
Name(ProjectName),
List(RelExpr, Vec<(Box<str>, FieldProject)>),
Dedup(Box<ProjectList>),
}
impl ProjectList {
@@ -84,6 +85,7 @@ impl ProjectList {
match self {
Self::Name(project) => project.return_table(),
Self::List(..) => None,
Self::Dedup(project) => project.return_table(),
}
}
@@ -94,6 +96,7 @@ impl ProjectList {
match self {
Self::Name(project) => project.return_table_id(),
Self::List(..) => None,
Self::Dedup(project) => project.return_table_id(),
}
}
@@ -108,6 +111,9 @@ impl ProjectList {
f(name, ty);
}
}
Self::Dedup(project) => {
project.for_each_return_field(f);
}
}
}
}
+19 -7
View File
@@ -14,6 +14,7 @@ use expr::{Expr, FieldProject, ProjectList, ProjectName, RelExpr};
use spacetimedb_lib::{from_hex_pad, AlgebraicType, AlgebraicValue, ConnectionId, Identity};
use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type;
use spacetimedb_schema::schema::ColumnSchema;
use spacetimedb_sql_parser::ast::sql::SqlDistinct;
use spacetimedb_sql_parser::ast::{self, BinOp, ProjectElem, SqlExpr, SqlIdent, SqlLiteral};
pub mod check;
@@ -30,14 +31,19 @@ pub(crate) fn type_select(input: RelExpr, expr: SqlExpr, vars: &Relvars) -> Typi
}
/// Type check and lower a [ast::Project]
pub(crate) fn type_proj(input: RelExpr, proj: ast::Project, vars: &Relvars) -> TypingResult<ProjectList> {
match proj {
ast::Project::Star(None) if input.nfields() > 1 => Err(InvalidWildcard::Join.into()),
ast::Project::Star(None) => Ok(ProjectList::Name(ProjectName::None(input))),
pub(crate) fn type_proj(
input: RelExpr,
proj: ast::Project,
vars: &Relvars,
distinct: SqlDistinct,
) -> TypingResult<ProjectList> {
let project = match proj {
ast::Project::Star(None) if input.nfields() > 1 => return Err(InvalidWildcard::Join.into()),
ast::Project::Star(None) => ProjectList::Name(ProjectName::None(input)),
ast::Project::Star(Some(SqlIdent(var))) if input.has_field(&var) => {
Ok(ProjectList::Name(ProjectName::Some(input, var)))
ProjectList::Name(ProjectName::Some(input, var))
}
ast::Project::Star(Some(SqlIdent(var))) => Err(Unresolved::var(&var).into()),
ast::Project::Star(Some(SqlIdent(var))) => return Err(Unresolved::var(&var).into()),
ast::Project::Exprs(elems) => {
let mut projections = vec![];
let mut names = HashSet::new();
@@ -52,8 +58,14 @@ pub(crate) fn type_proj(input: RelExpr, proj: ast::Project, vars: &Relvars) -> T
}
}
Ok(ProjectList::List(input, projections))
ProjectList::List(input, projections)
}
};
if distinct == SqlDistinct::Yes {
Ok(ProjectList::Dedup(Box::new(project)))
} else {
Ok(project)
}
}
+5 -2
View File
@@ -391,17 +391,19 @@ impl TypeChecker for SqlChecker {
project,
from,
filter: None,
distinct,
} => {
let input = Self::type_from(from, vars, tx)?;
type_proj(input, project, vars)
type_proj(input, project, vars, distinct)
}
SqlSelect {
project,
from,
filter: Some(expr),
distinct,
} => {
let input = Self::type_from(from, vars, tx)?;
type_proj(type_select(input, expr, vars)?, project, vars)
type_proj(type_select(input, expr, vars)?, project, vars, distinct)
}
}
}
@@ -469,6 +471,7 @@ mod tests {
"select str from t",
"select str, arr from t",
"select t.str, arr from t",
"select distinct str from t",
] {
let result = parse_and_type_sql(sql, &tx);
assert!(result.is_ok());
+1
View File
@@ -35,6 +35,7 @@ fn compile_project_list(var: &mut impl VarLabel, expr: ProjectList) -> ProjectLi
.map(|(_, expr)| compile_field_project(var, expr))
.collect(),
),
ProjectList::Dedup(proj) => ProjectListPlan::Dedup(Box::new(compile_project_list(var, *proj))),
}
}
+37
View File
@@ -101,6 +101,7 @@ impl ProjectPlan {
pub enum ProjectListPlan {
Name(ProjectPlan),
List(PhysicalPlan, Vec<TupleField>),
Dedup(Box<ProjectListPlan>),
}
impl Deref for ProjectListPlan {
@@ -110,6 +111,7 @@ impl Deref for ProjectListPlan {
match self {
Self::Name(plan) => plan,
Self::List(plan, ..) => plan,
Self::Dedup(plan) => plan,
}
}
}
@@ -122,6 +124,7 @@ impl ProjectListPlan {
plan.optimize(fields.iter().map(|TupleField { label, .. }| label).copied().collect())?,
fields,
)),
Self::Dedup(plan) => Ok(Self::Dedup(Box::new(plan.optimize()?))),
}
}
}
@@ -275,6 +278,7 @@ impl PhysicalPlan {
// Replace the input only if there is a match
Self::Filter(Box::new(input.map_if(f, ok)?), expr)
}
_ => self,
})
}
@@ -998,6 +1002,8 @@ mod tests {
use pretty_assertions::assert_eq;
use spacetimedb_expr::check::{parse_and_type_sub, SchemaView};
use spacetimedb_expr::expr::ProjectList;
use spacetimedb_expr::statement::{compile_sql_stmt, Statement};
use spacetimedb_lib::{
db::auth::{StAccess, StTableType},
AlgebraicType, AlgebraicValue,
@@ -1780,4 +1786,35 @@ mod tests {
plan => panic!("unexpected plan: {:#?}", plan),
};
}
// Test that we `DISTINCT` is pushed down to the dedup operator
#[test]
fn distinct() {
let t_id = TableId(1);
let t = Arc::new(schema(
t_id,
"t",
&[("id", AlgebraicType::U64), ("x", AlgebraicType::U64)],
&[&[0]],
&[&[0]],
Some(0),
));
let db = SchemaViewer {
schemas: vec![t.clone()],
};
let sql = "select distinct x from t";
let lp = compile_sql_stmt(sql, &db).unwrap();
match lp.statement {
Statement::Select(plan) => {
assert!(matches!(plan, ProjectList::Dedup(..)));
}
Statement::DML(_) => {
panic!("unexpected DML");
}
}
}
}
+9
View File
@@ -48,12 +48,20 @@ impl SqlAst {
}
}
/// A DISTINCT clause in a SQL SELECT statement
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum SqlDistinct {
No,
Yes,
}
/// A SELECT statement in the SQL query language
#[derive(Debug)]
pub struct SqlSelect {
pub project: Project,
pub from: SqlFrom,
pub filter: Option<SqlExpr>,
pub distinct: SqlDistinct,
}
impl SqlSelect {
@@ -63,6 +71,7 @@ impl SqlSelect {
project: self.project.qualify_vars(alias.clone()),
filter: self.filter.map(|expr| expr.qualify_vars(alias.clone())),
from: self.from,
distinct: self.distinct,
},
SqlFrom::Join(..) => self,
}
+18 -12
View File
@@ -127,6 +127,16 @@
//! ;
//! ```
use super::{
errors::SqlUnsupported, parse_expr_opt, parse_ident, parse_literal, parse_parts, parse_projection, RelParser,
SqlParseResult,
};
use crate::ast::sql::SqlDistinct;
use crate::ast::{
sql::{SqlAst, SqlDelete, SqlInsert, SqlSelect, SqlSet, SqlShow, SqlUpdate, SqlValues},
SqlIdent,
};
use sqlparser::ast::Distinct;
use sqlparser::{
ast::{
Assignment, Expr, GroupByExpr, ObjectName, Query, Select, SetExpr, Statement, TableFactor, TableWithJoins,
@@ -136,16 +146,6 @@ use sqlparser::{
parser::Parser,
};
use crate::ast::{
sql::{SqlAst, SqlDelete, SqlInsert, SqlSelect, SqlSet, SqlShow, SqlUpdate, SqlValues},
SqlIdent,
};
use super::{
errors::SqlUnsupported, parse_expr_opt, parse_ident, parse_literal, parse_parts, parse_projection, RelParser,
SqlParseResult,
};
/// Parse a SQL string
pub fn parse_sql(sql: &str) -> SqlParseResult<SqlAst> {
let mut stmts = Parser::parse_sql(&PostgreSqlDialect {}, sql)?;
@@ -362,7 +362,7 @@ fn parse_set_op(expr: SetExpr) -> SqlParseResult<SqlSelect> {
fn parse_select(select: Select) -> SqlParseResult<SqlSelect> {
match select {
Select {
distinct: None,
distinct,
top: None,
projection,
into: None,
@@ -381,12 +381,18 @@ fn parse_select(select: Select) -> SqlParseResult<SqlSelect> {
&& cluster_by.is_empty()
&& distribute_by.is_empty()
&& sort_by.is_empty()
&& named_window.is_empty() =>
&& named_window.is_empty()
&& matches!(distinct, None | Some(Distinct::Distinct)) =>
{
Ok(SqlSelect {
project: parse_projection(projection)?,
from: SqlParser::parse_from(from)?,
filter: parse_expr_opt(selection)?,
distinct: if distinct.is_some() {
SqlDistinct::Yes
} else {
SqlDistinct::No
},
})
}
_ => Err(SqlUnsupported::feature(select).into()),