Merge branch 'interpreter2-misc' into 'interpreter-to-storage-engine'

Misc 2: Electring Boogaloo **Cancellation Token**

See merge request x433485/minisql!36
This commit is contained in:
Jindřich Moravec 2024-02-05 22:24:54 +01:00
commit b2b99f7614
5 changed files with 49 additions and 16 deletions

View file

@ -0,0 +1,13 @@
pub trait Cancellation {
fn is_canceled(&self) -> bool;
}
#[cfg(test)]
pub(crate) struct DummyCancellation;
#[cfg(test)]
impl Cancellation for DummyCancellation {
fn is_canceled(&self) -> bool {
false
}
}

View file

@ -19,6 +19,7 @@ use storage_engine::segments::entry::Entry;
use storage_engine::cursor::{ReadCursor, WriteCursor}; use storage_engine::cursor::{ReadCursor, WriteCursor};
use storage_engine::cursor_capabilities::traversal::CursorCanTraverse; use storage_engine::cursor_capabilities::traversal::CursorCanTraverse;
use storage_engine::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex}; use storage_engine::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex};
use crate::cancellation::Cancellation;
const METADATA_FILE: &'static str = "metadata.json"; const METADATA_FILE: &'static str = "metadata.json";
@ -135,25 +136,39 @@ impl State {
self.tables.push(RwLock::new(table)); self.tables.push(RwLock::new(table));
} }
async fn select_all_rows<Writer: ResponseWriter>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection) -> DbResult<usize> { async fn select_all_rows<W, C>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection) -> DbResult<usize>
where W: ResponseWriter,
C: Cancellation
{
let mut count = 0; let mut count = 0;
while let Some(entry) = cursor.next_alive().await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))? { while let Some(entry) = cursor.next_alive().await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))? {
count += 1; count += 1;
let row: Row = From::from(entry); let row: Row = From::from(entry);
let restricted_row = row.restrict_columns(&column_selection); let restricted_row = row.restrict_columns(&column_selection);
response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?;
if cancellation.is_canceled() {
break;
}
} }
Ok(count) Ok(count)
} }
async fn select_eq<Writer: ResponseWriter>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult<usize> { async fn select_eq<W, C>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult<usize>
where W: ResponseWriter,
C: Cancellation
{
let entries = cursor.select_entries_where_eq(column as storage_engine::store::Column, &value).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; let entries = cursor.select_entries_where_eq(column as storage_engine::store::Column, &value).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?;
let count = entries.len(); let count = entries.len();
for entry in entries { for entry in entries {
let row: Row = From::from(entry); let row: Row = From::from(entry);
let restricted_row = row.restrict_columns(&column_selection); let restricted_row = row.restrict_columns(&column_selection);
response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?;
if cancellation.is_canceled() {
break;
}
} }
Ok(count) Ok(count)
@ -218,7 +233,7 @@ impl StateHandler {
self.state.read().await self.state.read().await
} }
pub async fn interpret<Writer: ResponseWriter>(&self, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { pub async fn interpret<W: ResponseWriter, C: Cancellation>(&self, response_writer: &mut W, cancellation: &C, operation: Operation) -> DbResult<()> {
use Operation::*; use Operation::*;
match operation { match operation {
@ -230,8 +245,8 @@ impl StateHandler {
response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?; response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?;
let count = match maybe_condition { let count = match maybe_condition {
None => State::select_all_rows(&table, cursor, response_writer, column_selection).await?, None => State::select_all_rows(&table, cursor, response_writer, cancellation, column_selection).await?,
Some(Condition::Eq(eq_column, value)) => State::select_eq(&table, cursor, response_writer, column_selection, eq_column, value).await? Some(Condition::Eq(eq_column, value)) => State::select_eq(&table, cursor, response_writer, cancellation, column_selection, eq_column, value).await?
}; };
response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e))
} }
@ -290,6 +305,8 @@ mod tests {
use crate::type_system::{DbType, IndexableValue, Value}; use crate::type_system::{DbType, IndexableValue, Value};
use std::collections::HashSet; use std::collections::HashSet;
use tokio::fs::{File, OpenOptions, DirBuilder}; use tokio::fs::{File, OpenOptions, DirBuilder};
use tokio::fs;
use crate::cancellation::DummyCancellation;
impl Drop for State { impl Drop for State {
fn drop(&mut self) { fn drop(&mut self) {
@ -321,14 +338,14 @@ mod tests {
let mut response_writer = ResponseWriterStub::new(); let mut response_writer = ResponseWriterStub::new();
state state
.interpret(&mut response_writer, Operation::CreateTable(users_schema.clone())).await .interpret(&mut response_writer, &DummyCancellation, Operation::CreateTable(users_schema.clone())).await
.unwrap(); .unwrap();
{ {
println!("==EMPTY SELECT==="); println!("==EMPTY SELECT===");
let users_position: TablePosition = 0; let users_position: TablePosition = 0;
state state
.interpret(&mut response_writer, Operation::Select( .interpret(&mut response_writer, &DummyCancellation, Operation::Select(
users_position, users_position,
users_schema.all_selection(), users_schema.all_selection(),
None, None,
@ -347,7 +364,7 @@ mod tests {
println!("About to insert!"); println!("About to insert!");
state state
.interpret(&mut response_writer, Operation::Insert( .interpret(&mut response_writer, &DummyCancellation, Operation::Insert(
users, users,
vec![id.clone(), name.clone(), age.clone()], vec![id.clone(), name.clone(), age.clone()],
)).await )).await
@ -357,7 +374,7 @@ mod tests {
println!("==SELECT==="); println!("==SELECT===");
let users_position: TablePosition = 0; let users_position: TablePosition = 0;
state state
.interpret(&mut response_writer, Operation::Select( .interpret(&mut response_writer, &DummyCancellation, Operation::Select(
users_position, users_position,
users_schema.all_selection(), users_schema.all_selection(),
None, None,

View file

@ -8,3 +8,4 @@ pub mod restricted_row;
mod result; mod result;
pub mod schema; pub mod schema;
pub mod type_system; pub mod type_system;
pub mod cancellation;

View file

@ -1,5 +1,6 @@
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use minisql::cancellation::Cancellation;
pub struct ResetCancelToken { pub struct ResetCancelToken {
is_canceled: Arc<AtomicBool>, is_canceled: Arc<AtomicBool>,
@ -12,10 +13,6 @@ impl ResetCancelToken {
} }
} }
pub fn is_canceled(&self) -> bool {
self.is_canceled.load(Ordering::SeqCst)
}
pub fn cancel(&self) { pub fn cancel(&self) {
self.is_canceled.store(true, Ordering::SeqCst); self.is_canceled.store(true, Ordering::SeqCst);
} }
@ -25,6 +22,12 @@ impl ResetCancelToken {
} }
} }
impl Cancellation for ResetCancelToken {
fn is_canceled(&self) -> bool {
self.is_canceled.load(Ordering::SeqCst)
}
}
impl Clone for ResetCancelToken { impl Clone for ResetCancelToken {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {

View file

@ -189,7 +189,6 @@ where
parse_and_validate(query, &db_schema)? parse_and_validate(query, &db_schema)?
}; };
// TODO: PASS DOWN RESET CANCEL TOKEN state.interpret(writer, token, operation).await?;
state.interpret(writer, operation).await?;
Ok(()) Ok(())
} }