diff --git a/minisql/src/cancellation.rs b/minisql/src/cancellation.rs new file mode 100644 index 0000000..d29624c --- /dev/null +++ b/minisql/src/cancellation.rs @@ -0,0 +1,13 @@ +pub trait Cancellation { + fn is_canceled(&self) -> bool; +} + +#[cfg(test)] +pub(crate) struct DummyCancellation; + +#[cfg(test)] +impl Cancellation for DummyCancellation { + fn is_canceled(&self) -> bool { + false + } +} \ No newline at end of file diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index aaf4282..e3f9121 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -19,6 +19,7 @@ use storage_engine::segments::entry::Entry; use storage_engine::cursor::{ReadCursor, WriteCursor}; use storage_engine::cursor_capabilities::traversal::CursorCanTraverse; use storage_engine::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex}; +use crate::cancellation::Cancellation; const METADATA_FILE: &'static str = "metadata.json"; @@ -135,25 +136,39 @@ impl State { self.tables.push(RwLock::new(table)); } - async fn select_all_rows(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection) -> DbResult { + async fn select_all_rows(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection) -> DbResult + where W: ResponseWriter, + C: Cancellation + { let mut count = 0; while let Some(entry) = cursor.next_alive().await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))? { count += 1; let row: Row = From::from(entry); let restricted_row = row.restrict_columns(&column_selection); response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; + + if cancellation.is_canceled() { + break; + } } Ok(count) } - async fn select_eq(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult { + async fn select_eq(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult + where W: ResponseWriter, + C: Cancellation + { let entries = cursor.select_entries_where_eq(column as storage_engine::store::Column, &value).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; let count = entries.len(); for entry in entries { let row: Row = From::from(entry); let restricted_row = row.restrict_columns(&column_selection); response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; + + if cancellation.is_canceled() { + break; + } } Ok(count) @@ -218,7 +233,7 @@ impl StateHandler { self.state.read().await } - pub async fn interpret(&self, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { + pub async fn interpret(&self, response_writer: &mut W, cancellation: &C, operation: Operation) -> DbResult<()> { use Operation::*; match operation { @@ -230,8 +245,8 @@ impl StateHandler { response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?; let count = match maybe_condition { - None => State::select_all_rows(&table, cursor, response_writer, column_selection).await?, - Some(Condition::Eq(eq_column, value)) => State::select_eq(&table, cursor, response_writer, column_selection, eq_column, value).await? + None => State::select_all_rows(&table, cursor, response_writer, cancellation, column_selection).await?, + Some(Condition::Eq(eq_column, value)) => State::select_eq(&table, cursor, response_writer, cancellation, column_selection, eq_column, value).await? }; response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) } @@ -240,7 +255,7 @@ impl StateHandler { let mut table = state.table_at_mut(table_position).await; let mut cursor = table.write().await?; - + let entry = Entry::new(values); cursor.insert_entry(entry).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; @@ -290,6 +305,8 @@ mod tests { use crate::type_system::{DbType, IndexableValue, Value}; use std::collections::HashSet; use tokio::fs::{File, OpenOptions, DirBuilder}; + use tokio::fs; + use crate::cancellation::DummyCancellation; impl Drop for State { fn drop(&mut self) { @@ -321,14 +338,14 @@ mod tests { let mut response_writer = ResponseWriterStub::new(); state - .interpret(&mut response_writer, Operation::CreateTable(users_schema.clone())).await + .interpret(&mut response_writer, &DummyCancellation, Operation::CreateTable(users_schema.clone())).await .unwrap(); { println!("==EMPTY SELECT==="); let users_position: TablePosition = 0; state - .interpret(&mut response_writer, Operation::Select( + .interpret(&mut response_writer, &DummyCancellation, Operation::Select( users_position, users_schema.all_selection(), None, @@ -347,7 +364,7 @@ mod tests { println!("About to insert!"); state - .interpret(&mut response_writer, Operation::Insert( + .interpret(&mut response_writer, &DummyCancellation, Operation::Insert( users, vec![id.clone(), name.clone(), age.clone()], )).await @@ -357,7 +374,7 @@ mod tests { println!("==SELECT==="); let users_position: TablePosition = 0; state - .interpret(&mut response_writer, Operation::Select( + .interpret(&mut response_writer, &DummyCancellation, Operation::Select( users_position, users_schema.all_selection(), None, diff --git a/minisql/src/lib.rs b/minisql/src/lib.rs index 57953a4..97cb941 100644 --- a/minisql/src/lib.rs +++ b/minisql/src/lib.rs @@ -8,3 +8,4 @@ pub mod restricted_row; mod result; pub mod schema; pub mod type_system; +pub mod cancellation; diff --git a/server/src/cancellation.rs b/server/src/cancellation.rs index 4609f48..5519f2d 100644 --- a/server/src/cancellation.rs +++ b/server/src/cancellation.rs @@ -1,5 +1,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use minisql::cancellation::Cancellation; pub struct ResetCancelToken { is_canceled: Arc, @@ -12,10 +13,6 @@ impl ResetCancelToken { } } - pub fn is_canceled(&self) -> bool { - self.is_canceled.load(Ordering::SeqCst) - } - pub fn cancel(&self) { self.is_canceled.store(true, Ordering::SeqCst); } @@ -25,6 +22,12 @@ impl ResetCancelToken { } } +impl Cancellation for ResetCancelToken { + fn is_canceled(&self) -> bool { + self.is_canceled.load(Ordering::SeqCst) + } +} + impl Clone for ResetCancelToken { fn clone(&self) -> Self { Self { diff --git a/server/src/main.rs b/server/src/main.rs index a31ae91..c76c371 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -189,7 +189,6 @@ where parse_and_validate(query, &db_schema)? }; - // TODO: PASS DOWN RESET CANCEL TOKEN - state.interpret(writer, operation).await?; + state.interpret(writer, token, operation).await?; Ok(()) }