feat: pass cancellation token to interpreter

This commit is contained in:
Jindřich Moravec 2024-02-05 22:13:48 +01:00
parent f5d45f6a1d
commit eeb34a51ce
5 changed files with 45 additions and 13 deletions

View file

@ -0,0 +1,13 @@
pub trait Cancellation {
fn is_canceled(&self) -> bool;
}
#[cfg(test)]
pub(crate) struct DummyCancellation;
#[cfg(test)]
impl Cancellation for DummyCancellation {
fn is_canceled(&self) -> bool {
false
}
}

View file

@ -18,6 +18,7 @@ use storage_engine::store::Store;
use storage_engine::cursor::{ReadCursor, WriteCursor};
use storage_engine::cursor_capabilities::traversal::CursorCanTraverse;
use storage_engine::cursor_capabilities::index_access::CursorCanReadIndex;
use crate::cancellation::Cancellation;
const METADATA_FILE: &'static str = "metadata.json";
@ -134,25 +135,39 @@ impl State {
self.tables.push(RwLock::new(table));
}
async fn select_all_rows<Writer: ResponseWriter>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection) -> DbResult<usize> {
async fn select_all_rows<W, C>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection) -> DbResult<usize>
where W: ResponseWriter,
C: Cancellation
{
let mut count = 0;
while let Some(entry) = cursor.next_alive().await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))? {
count += 1;
let row: Row = From::from(entry);
let restricted_row = row.restrict_columns(&column_selection);
response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?;
if cancellation.is_canceled() {
break;
}
}
Ok(count)
}
async fn select_eq<Writer: ResponseWriter>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult<usize> {
async fn select_eq<W, C>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult<usize>
where W: ResponseWriter,
C: Cancellation
{
let entries = cursor.select_entries_where_eq(column as storage_engine::store::Column, &value).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?;
let count = entries.len();
for entry in entries {
let row: Row = From::from(entry);
let restricted_row = row.restrict_columns(&column_selection);
response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?;
if cancellation.is_canceled() {
break;
}
}
Ok(count)
@ -204,7 +219,7 @@ impl StateHandler {
self.state.read().await
}
pub async fn interpret<Writer: ResponseWriter>(&self, response_writer: &mut Writer, operation: Operation) -> DbResult<()> {
pub async fn interpret<W: ResponseWriter, C: Cancellation>(&self, response_writer: &mut W, cancellation: &C, operation: Operation) -> DbResult<()> {
use Operation::*;
match operation {
@ -216,8 +231,8 @@ impl StateHandler {
response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?;
let count = match maybe_condition {
None => State::select_all_rows(&table, cursor, response_writer, column_selection).await?,
Some(Condition::Eq(eq_column, value)) => State::select_eq(&table, cursor, response_writer, column_selection, eq_column, value).await?
None => State::select_all_rows(&table, cursor, response_writer, cancellation, column_selection).await?,
Some(Condition::Eq(eq_column, value)) => State::select_eq(&table, cursor, response_writer, cancellation, column_selection, eq_column, value).await?
};
response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e))
}
@ -259,6 +274,7 @@ mod tests {
use std::collections::HashSet;
use tokio::fs::{File, OpenOptions, DirBuilder};
use tokio::fs;
use crate::cancellation::DummyCancellation;
impl Drop for State {
fn drop(&mut self) {
@ -291,12 +307,12 @@ mod tests {
let mut response_writer = ResponseWriterStub::new();
state
.interpret(&mut response_writer, Operation::CreateTable(users_schema.clone())).await
.interpret(&mut response_writer, &DummyCancellation, Operation::CreateTable(users_schema.clone())).await
.unwrap();
let users_position: TablePosition = 0;
state
.interpret(&mut response_writer, Operation::Select(
.interpret(&mut response_writer, &DummyCancellation, Operation::Select(
users_position,
users_schema.all_selection(),
None,

View file

@ -8,3 +8,4 @@ pub mod restricted_row;
mod result;
pub mod schema;
pub mod type_system;
pub mod cancellation;

View file

@ -1,5 +1,6 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use minisql::cancellation::Cancellation;
pub struct ResetCancelToken {
is_canceled: Arc<AtomicBool>,
@ -12,10 +13,6 @@ impl ResetCancelToken {
}
}
pub fn is_canceled(&self) -> bool {
self.is_canceled.load(Ordering::SeqCst)
}
pub fn cancel(&self) {
self.is_canceled.store(true, Ordering::SeqCst);
}
@ -25,6 +22,12 @@ impl ResetCancelToken {
}
}
impl Cancellation for ResetCancelToken {
fn is_canceled(&self) -> bool {
self.is_canceled.load(Ordering::SeqCst)
}
}
impl Clone for ResetCancelToken {
fn clone(&self) -> Self {
Self {

View file

@ -189,7 +189,6 @@ where
parse_and_validate(query, &db_schema)?
};
// TODO: PASS DOWN RESET CANCEL TOKEN
state.interpret(writer, operation).await?;
state.interpret(writer, token, operation).await?;
Ok(())
}