diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 3b7b9c90d7..b33be4386d 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -702,4 +702,39 @@ pub(crate) mod tests { Ok(()) } + + // Test `DISTINCT` keyword + #[test] + fn test_distinct() -> ResultTest<()> { + let db = TestDB::durable()?; + + let table_id = db.create_table_for_test( + "T", + &[("id", AlgebraicType::U8), ("pos", AlgebraicType::U8)], + &[ColId(0)], + )?; + db.with_auto_commit(Workload::ForTests, |tx| -> Result<_, DBError> { + for i in 0..5u8 { + insert(&db, tx, table_id, &product!(i, 0u8))?; + } + Ok(()) + })?; + + let result = run_for_testing(&db, "SELECT DISTINCT * FROM T")?; + assert_eq!( + result, + vec![ + product![0u8, 0u8], + product![1u8, 0u8], + product![2u8, 0u8], + product![3u8, 0u8], + product![4u8, 0u8] + ] + ); + + let result = run_for_testing(&db, "SELECT DISTINCT pos FROM T WHERE id > 2")?; + assert_eq!(result, vec![product![0u8]]); + + Ok(()) + } } diff --git a/crates/execution/src/iter.rs b/crates/execution/src/iter.rs index a6fc8f4846..66b31a7f9a 100644 --- a/crates/execution/src/iter.rs +++ b/crates/execution/src/iter.rs @@ -1,5 +1,6 @@ use std::collections::{HashMap, HashSet}; +use crate::{Datastore, DeltaScanIter, DeltaStore, Row, Tuple}; use anyhow::{anyhow, bail, Result}; use spacetimedb_lib::{query::Delta, AlgebraicValue, ProductValue}; use spacetimedb_physical_plan::plan::{ @@ -11,8 +12,6 @@ use spacetimedb_table::{ table_index::{TableIndex, TableIndexPointIter}, }; -use crate::{Datastore, DeltaScanIter, DeltaStore, Row, Tuple}; - /// The different iterators for evaluating query plans pub enum PlanIter<'a> { Table(TableScanIter<'a>), @@ -75,6 +74,7 @@ pub enum Iter<'a> { Row(RowRefIter<'a>), Join(LeftDeepJoinIter<'a>), Filter(Filter<'a, Iter<'a>>), + Dedup(Dedup<'a>), } impl<'a> Iterator for Iter<'a> { @@ -85,6 +85,7 @@ impl<'a> Iterator for Iter<'a> { Self::Row(iter) => iter.next().map(Tuple::Row), Self::Join(iter) => iter.next(), Self::Filter(iter) => iter.next(), + Self::Dedup(iter) => iter.next(), } } } @@ -1069,3 +1070,28 @@ impl<'a> Iterator for Filter<'a, Iter<'a>> { self.input.find(|tuple| self.expr.eval_bool(tuple)) } } + +/// A tuple at a time deduplication iterator +pub struct Dedup<'a> { + /// The input iterator + input: Box>, + /// The set of seen row ids + seen: HashSet>, +} + +impl<'a> Dedup<'a> { + pub fn new(input: Box>) -> Self { + Self { + input, + seen: HashSet::new(), + } + } +} + +impl<'a> Iterator for Dedup<'a> { + type Item = Tuple<'a>; + + fn next(&mut self) -> Option { + self.input.find(|tuple| self.seen.insert(tuple.clone())) + } +} diff --git a/crates/execution/src/lib.rs b/crates/execution/src/lib.rs index 62ea8dcca9..679ac3eb6e 100644 --- a/crates/execution/src/lib.rs +++ b/crates/execution/src/lib.rs @@ -95,7 +95,7 @@ pub trait DeltaStore { } } -#[derive(Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum Row<'a> { Ptr(RowRef<'a>), Ref(&'a ProductValue), @@ -148,7 +148,7 @@ impl ProjectField for Row<'_> { } /// Each query operator returns a tuple of [RowRef]s -#[derive(Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum Tuple<'a> { /// A pointer to a row in a base table Row(Row<'a>), diff --git a/crates/execution/src/pipelined.rs b/crates/execution/src/pipelined.rs index c6142ae574..b439c27d88 100644 --- a/crates/execution/src/pipelined.rs +++ b/crates/execution/src/pipelined.rs @@ -20,6 +20,7 @@ use crate::{Datastore, DeltaStore, Row, Tuple}; pub enum ProjectListExecutor { Name(PipelinedProject), List(PipelinedExecutor, Vec), + Dedup(PipelinedDedup), } impl From for ProjectListExecutor { @@ -27,6 +28,7 @@ impl From for ProjectListExecutor { match plan { ProjectListPlan::Name(plan) => Self::Name(plan.into()), ProjectListPlan::List(plan, fields) => Self::List(plan.into(), fields), + ProjectListPlan::Dedup(plan) => Self::Dedup(PipelinedDedup::new(ProjectListExecutor::from(*plan))), } } } @@ -54,6 +56,9 @@ impl ProjectListExecutor { f(ProductValue::from_iter(fields.iter().map(|field| t.project(field)))) })?; } + Self::Dedup(plan) => { + plan.execute(tx, metrics, &mut |row| f(row))?; + } } metrics.rows_scanned += n; metrics.bytes_scanned += bytes_scanned; @@ -857,3 +862,31 @@ fn project(row: &impl ProjectField, field: &TupleField, bytes_scanned: &mut usiz *bytes_scanned += value.size_of(); value } + +/// A pipelined executor for deduplication +pub struct PipelinedDedup { + pub input: Box, +} + +impl PipelinedDedup { + pub fn new(input: ProjectListExecutor) -> Self { + Self { input: Box::new(input) } + } + + pub fn execute( + &self, + tx: &Tx, + metrics: &mut ExecutionMetrics, + f: &mut dyn FnMut(ProductValue) -> Result<()>, + ) -> Result<()> { + let mut seen = HashSet::new(); + self.input.execute(tx, metrics, &mut |row| { + if seen.insert(row.clone()) { + f(row)?; + } + Ok(()) + })?; + metrics.rows_scanned += seen.len(); + Ok(()) + } +} diff --git a/crates/expr/src/check.rs b/crates/expr/src/check.rs index c147112174..68fe62ca35 100644 --- a/crates/expr/src/check.rs +++ b/crates/expr/src/check.rs @@ -2,22 +2,22 @@ use std::collections::HashMap; use std::ops::{Deref, DerefMut}; use std::sync::Arc; -use crate::expr::{Expr, ProjectList, ProjectName, Relvar}; -use crate::{expr::LeftDeepJoin, statement::Statement}; -use spacetimedb_lib::AlgebraicType; -use spacetimedb_primitives::TableId; -use spacetimedb_schema::schema::TableSchema; -use spacetimedb_sql_parser::ast::BinOp; -use spacetimedb_sql_parser::{ - ast::{sub::SqlSelect, SqlFrom, SqlIdent, SqlJoin}, - parser::sub::parse_subscription, -}; - use super::{ errors::{DuplicateName, TypingError, Unresolved, Unsupported}, expr::RelExpr, type_expr, type_proj, type_select, StatementCtx, StatementSource, }; +use crate::expr::{Expr, ProjectList, ProjectName, Relvar}; +use crate::{expr::LeftDeepJoin, statement::Statement}; +use spacetimedb_lib::AlgebraicType; +use spacetimedb_primitives::TableId; +use spacetimedb_schema::schema::TableSchema; +use spacetimedb_sql_parser::ast::sql::SqlDistinct; +use spacetimedb_sql_parser::ast::BinOp; +use spacetimedb_sql_parser::{ + ast::{sub::SqlSelect, SqlFrom, SqlIdent, SqlJoin}, + parser::sub::parse_subscription, +}; /// The result of type checking and name resolution pub type TypingResult = core::result::Result; @@ -140,7 +140,7 @@ impl TypeChecker for SubChecker { filter: None, } => { let input = Self::type_from(from, vars, tx)?; - type_proj(input, project, vars) + type_proj(input, project, vars, SqlDistinct::No) } SqlSelect { project, @@ -148,7 +148,7 @@ impl TypeChecker for SubChecker { filter: Some(expr), } => { let input = Self::type_from(from, vars, tx)?; - type_proj(type_select(input, expr, vars)?, project, vars) + type_proj(type_select(input, expr, vars)?, project, vars, SqlDistinct::No) } } } @@ -178,6 +178,7 @@ fn expect_table_type(expr: ProjectList) -> TypingResult { match expr { ProjectList::Name(proj) => Ok(proj), ProjectList::List(..) => Err(Unsupported::ReturnType.into()), + ProjectList::Dedup(..) => Err(Unsupported::Dedup.into()), } } diff --git a/crates/expr/src/errors.rs b/crates/expr/src/errors.rs index c523fb9452..40e797825f 100644 --- a/crates/expr/src/errors.rs +++ b/crates/expr/src/errors.rs @@ -44,6 +44,8 @@ pub enum InvalidWildcard { pub enum Unsupported { #[error("Column projections are not supported in subscriptions; Subscriptions must return a table type")] ReturnType, + #[error("Distinct projections are not supported in subscriptions")] + Dedup, #[error("Unsupported expression in projection")] ProjectExpr, } diff --git a/crates/expr/src/expr.rs b/crates/expr/src/expr.rs index 5578650c7e..c1f76e3381 100644 --- a/crates/expr/src/expr.rs +++ b/crates/expr/src/expr.rs @@ -74,6 +74,7 @@ impl ProjectName { pub enum ProjectList { Name(ProjectName), List(RelExpr, Vec<(Box, FieldProject)>), + Dedup(Box), } impl ProjectList { @@ -84,6 +85,7 @@ impl ProjectList { match self { Self::Name(project) => project.return_table(), Self::List(..) => None, + Self::Dedup(project) => project.return_table(), } } @@ -94,6 +96,7 @@ impl ProjectList { match self { Self::Name(project) => project.return_table_id(), Self::List(..) => None, + Self::Dedup(project) => project.return_table_id(), } } @@ -108,6 +111,9 @@ impl ProjectList { f(name, ty); } } + Self::Dedup(project) => { + project.for_each_return_field(f); + } } } } diff --git a/crates/expr/src/lib.rs b/crates/expr/src/lib.rs index 3a45221db4..e49e1f3139 100644 --- a/crates/expr/src/lib.rs +++ b/crates/expr/src/lib.rs @@ -14,6 +14,7 @@ use expr::{Expr, FieldProject, ProjectList, ProjectName, RelExpr}; use spacetimedb_lib::{from_hex_pad, AlgebraicType, AlgebraicValue, ConnectionId, Identity}; use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type; use spacetimedb_schema::schema::ColumnSchema; +use spacetimedb_sql_parser::ast::sql::SqlDistinct; use spacetimedb_sql_parser::ast::{self, BinOp, ProjectElem, SqlExpr, SqlIdent, SqlLiteral}; pub mod check; @@ -30,14 +31,19 @@ pub(crate) fn type_select(input: RelExpr, expr: SqlExpr, vars: &Relvars) -> Typi } /// Type check and lower a [ast::Project] -pub(crate) fn type_proj(input: RelExpr, proj: ast::Project, vars: &Relvars) -> TypingResult { - match proj { - ast::Project::Star(None) if input.nfields() > 1 => Err(InvalidWildcard::Join.into()), - ast::Project::Star(None) => Ok(ProjectList::Name(ProjectName::None(input))), +pub(crate) fn type_proj( + input: RelExpr, + proj: ast::Project, + vars: &Relvars, + distinct: SqlDistinct, +) -> TypingResult { + let project = match proj { + ast::Project::Star(None) if input.nfields() > 1 => return Err(InvalidWildcard::Join.into()), + ast::Project::Star(None) => ProjectList::Name(ProjectName::None(input)), ast::Project::Star(Some(SqlIdent(var))) if input.has_field(&var) => { - Ok(ProjectList::Name(ProjectName::Some(input, var))) + ProjectList::Name(ProjectName::Some(input, var)) } - ast::Project::Star(Some(SqlIdent(var))) => Err(Unresolved::var(&var).into()), + ast::Project::Star(Some(SqlIdent(var))) => return Err(Unresolved::var(&var).into()), ast::Project::Exprs(elems) => { let mut projections = vec![]; let mut names = HashSet::new(); @@ -52,8 +58,14 @@ pub(crate) fn type_proj(input: RelExpr, proj: ast::Project, vars: &Relvars) -> T } } - Ok(ProjectList::List(input, projections)) + ProjectList::List(input, projections) } + }; + + if distinct == SqlDistinct::Yes { + Ok(ProjectList::Dedup(Box::new(project))) + } else { + Ok(project) } } diff --git a/crates/expr/src/statement.rs b/crates/expr/src/statement.rs index 90e3f61ecd..862ea8b081 100644 --- a/crates/expr/src/statement.rs +++ b/crates/expr/src/statement.rs @@ -391,17 +391,19 @@ impl TypeChecker for SqlChecker { project, from, filter: None, + distinct, } => { let input = Self::type_from(from, vars, tx)?; - type_proj(input, project, vars) + type_proj(input, project, vars, distinct) } SqlSelect { project, from, filter: Some(expr), + distinct, } => { let input = Self::type_from(from, vars, tx)?; - type_proj(type_select(input, expr, vars)?, project, vars) + type_proj(type_select(input, expr, vars)?, project, vars, distinct) } } } @@ -469,6 +471,7 @@ mod tests { "select str from t", "select str, arr from t", "select t.str, arr from t", + "select distinct str from t", ] { let result = parse_and_type_sql(sql, &tx); assert!(result.is_ok()); diff --git a/crates/physical-plan/src/compile.rs b/crates/physical-plan/src/compile.rs index bcc211739e..92449e74b0 100644 --- a/crates/physical-plan/src/compile.rs +++ b/crates/physical-plan/src/compile.rs @@ -35,6 +35,7 @@ fn compile_project_list(var: &mut impl VarLabel, expr: ProjectList) -> ProjectLi .map(|(_, expr)| compile_field_project(var, expr)) .collect(), ), + ProjectList::Dedup(proj) => ProjectListPlan::Dedup(Box::new(compile_project_list(var, *proj))), } } diff --git a/crates/physical-plan/src/plan.rs b/crates/physical-plan/src/plan.rs index 027eb47d4d..3ca50ce24a 100644 --- a/crates/physical-plan/src/plan.rs +++ b/crates/physical-plan/src/plan.rs @@ -101,6 +101,7 @@ impl ProjectPlan { pub enum ProjectListPlan { Name(ProjectPlan), List(PhysicalPlan, Vec), + Dedup(Box), } impl Deref for ProjectListPlan { @@ -110,6 +111,7 @@ impl Deref for ProjectListPlan { match self { Self::Name(plan) => plan, Self::List(plan, ..) => plan, + Self::Dedup(plan) => plan, } } } @@ -122,6 +124,7 @@ impl ProjectListPlan { plan.optimize(fields.iter().map(|TupleField { label, .. }| label).copied().collect())?, fields, )), + Self::Dedup(plan) => Ok(Self::Dedup(Box::new(plan.optimize()?))), } } } @@ -275,6 +278,7 @@ impl PhysicalPlan { // Replace the input only if there is a match Self::Filter(Box::new(input.map_if(f, ok)?), expr) } + _ => self, }) } @@ -998,6 +1002,8 @@ mod tests { use pretty_assertions::assert_eq; use spacetimedb_expr::check::{parse_and_type_sub, SchemaView}; + use spacetimedb_expr::expr::ProjectList; + use spacetimedb_expr::statement::{compile_sql_stmt, Statement}; use spacetimedb_lib::{ db::auth::{StAccess, StTableType}, AlgebraicType, AlgebraicValue, @@ -1780,4 +1786,35 @@ mod tests { plan => panic!("unexpected plan: {:#?}", plan), }; } + + // Test that we `DISTINCT` is pushed down to the dedup operator + #[test] + fn distinct() { + let t_id = TableId(1); + + let t = Arc::new(schema( + t_id, + "t", + &[("id", AlgebraicType::U64), ("x", AlgebraicType::U64)], + &[&[0]], + &[&[0]], + Some(0), + )); + + let db = SchemaViewer { + schemas: vec![t.clone()], + }; + + let sql = "select distinct x from t"; + let lp = compile_sql_stmt(sql, &db).unwrap(); + + match lp.statement { + Statement::Select(plan) => { + assert!(matches!(plan, ProjectList::Dedup(..))); + } + Statement::DML(_) => { + panic!("unexpected DML"); + } + } + } } diff --git a/crates/sql-parser/src/ast/sql.rs b/crates/sql-parser/src/ast/sql.rs index 0b74708049..cf38fac678 100644 --- a/crates/sql-parser/src/ast/sql.rs +++ b/crates/sql-parser/src/ast/sql.rs @@ -48,12 +48,20 @@ impl SqlAst { } } +/// A DISTINCT clause in a SQL SELECT statement +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum SqlDistinct { + No, + Yes, +} + /// A SELECT statement in the SQL query language #[derive(Debug)] pub struct SqlSelect { pub project: Project, pub from: SqlFrom, pub filter: Option, + pub distinct: SqlDistinct, } impl SqlSelect { @@ -63,6 +71,7 @@ impl SqlSelect { project: self.project.qualify_vars(alias.clone()), filter: self.filter.map(|expr| expr.qualify_vars(alias.clone())), from: self.from, + distinct: self.distinct, }, SqlFrom::Join(..) => self, } diff --git a/crates/sql-parser/src/parser/sql.rs b/crates/sql-parser/src/parser/sql.rs index 3c93063fd1..d5ef9e3a8a 100644 --- a/crates/sql-parser/src/parser/sql.rs +++ b/crates/sql-parser/src/parser/sql.rs @@ -127,6 +127,16 @@ //! ; //! ``` +use super::{ + errors::SqlUnsupported, parse_expr_opt, parse_ident, parse_literal, parse_parts, parse_projection, RelParser, + SqlParseResult, +}; +use crate::ast::sql::SqlDistinct; +use crate::ast::{ + sql::{SqlAst, SqlDelete, SqlInsert, SqlSelect, SqlSet, SqlShow, SqlUpdate, SqlValues}, + SqlIdent, +}; +use sqlparser::ast::Distinct; use sqlparser::{ ast::{ Assignment, Expr, GroupByExpr, ObjectName, Query, Select, SetExpr, Statement, TableFactor, TableWithJoins, @@ -136,16 +146,6 @@ use sqlparser::{ parser::Parser, }; -use crate::ast::{ - sql::{SqlAst, SqlDelete, SqlInsert, SqlSelect, SqlSet, SqlShow, SqlUpdate, SqlValues}, - SqlIdent, -}; - -use super::{ - errors::SqlUnsupported, parse_expr_opt, parse_ident, parse_literal, parse_parts, parse_projection, RelParser, - SqlParseResult, -}; - /// Parse a SQL string pub fn parse_sql(sql: &str) -> SqlParseResult { let mut stmts = Parser::parse_sql(&PostgreSqlDialect {}, sql)?; @@ -362,7 +362,7 @@ fn parse_set_op(expr: SetExpr) -> SqlParseResult { fn parse_select(select: Select) -> SqlParseResult { match select { Select { - distinct: None, + distinct, top: None, projection, into: None, @@ -381,12 +381,18 @@ fn parse_select(select: Select) -> SqlParseResult { && cluster_by.is_empty() && distribute_by.is_empty() && sort_by.is_empty() - && named_window.is_empty() => + && named_window.is_empty() + && matches!(distinct, None | Some(Distinct::Distinct)) => { Ok(SqlSelect { project: parse_projection(projection)?, from: SqlParser::parse_from(from)?, filter: parse_expr_opt(selection)?, + distinct: if distinct.is_some() { + SqlDistinct::Yes + } else { + SqlDistinct::No + }, }) } _ => Err(SqlUnsupported::feature(select).into()),