From 997162c555b31c1ae55559011272a78ddf0e1597 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 21:46:18 +0100 Subject: [PATCH 1/5] Prepare for delete --- minisql/src/interpreter2.rs | 88 ++++++++++++++++++++++++---- minisql/src/response_writer.rs | 2 +- storage_engine/src/cursor.rs | 10 ++++ storage_engine/src/segments/entry.rs | 4 -- 4 files changed, 86 insertions(+), 18 deletions(-) diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 7004b1f..9ca3de1 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -14,9 +14,10 @@ use std::rc::Rc; use tokio::fs; use storage_engine::store::Store; +use storage_engine::segments::entry::Entry; use storage_engine::cursor::{ReadCursor, WriteCursor}; use storage_engine::cursor_capabilities::traversal::CursorCanTraverse; -use storage_engine::cursor_capabilities::index_access::CursorCanReadIndex; +use storage_engine::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex}; // ==============Interpreter================ #[derive(Debug)] @@ -140,6 +141,20 @@ impl State { Ok(count) } + + async fn delete_all_rows(table_name: String, mut cursor: WriteCursor<'_, Value>, response_writer: &mut Writer) -> DbResult { + // cursor. + let count = cursor.delete_all_entries(true) + .await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?; + Ok(count) + } + + async fn delete_all_eq(table_name: String, mut cursor: WriteCursor<'_, Value>, response_writer: &mut Writer, eq_column: Column, value: Value) -> DbResult { + let count = + cursor.delete_entries_where_eq(eq_column as storage_engine::store::Column, &value, true) + .await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?; + Ok(count) + } } impl StateHandler { @@ -176,14 +191,26 @@ impl StateHandler { } Insert(table_position, values) => { let state = self.state.read().await; - todo!() + let mut table = state.table_at_mut(table_position).await; + let mut cursor = table.write().await?; + + let entry = Entry::new(values); + cursor.insert_entry(entry).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; + + response_writer.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }).await.map_err(|e| RuntimeError::AnyhowError(e)) } Delete(table_position, maybe_condition) => { let state = self.state.read().await; let mut table = state.table_at_mut(table_position).await; - let cursor = table.write().await?; + let table_name = table.table_name().clone(); + let mut cursor = table.write().await?; - todo!() + let count = match maybe_condition { + None => State::delete_all_rows(table_name, cursor, response_writer).await?, + Some(Condition::Eq(eq_column, value)) => State::delete_all_eq(table_name, cursor, response_writer, eq_column, value).await? + }; + + response_writer.write_command_complete(CompleteStatus::Delete(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) } CreateTable(table_schema) => { let mut state = self.state.write().await; @@ -247,16 +274,51 @@ mod tests { .interpret(&mut response_writer, Operation::CreateTable(users_schema.clone())).await .unwrap(); - let users_position: TablePosition = 0; - state - .interpret(&mut response_writer, Operation::Select( - users_position, - users_schema.all_selection(), - None, - )).await - .unwrap(); + { + println!("==EMPTY SELECT==="); + let users_position: TablePosition = 0; + state + .interpret(&mut response_writer, Operation::Select( + users_position, + users_schema.all_selection(), + None, + )).await + .unwrap(); + } - // assert!(false); + { + let users = 0; + let (id, name, age) = ( + Value::Uuid(0), + Value::String("Plato".to_string()), + Value::Int(64), + ); + + println!("About to insert!"); + + state + .interpret(&mut response_writer, Operation::Insert( + users, + vec![id.clone(), name.clone(), age.clone()], + )).await + .unwrap(); + } + { + println!("==SELECT==="); + let users_position: TablePosition = 0; + state + .interpret(&mut response_writer, Operation::Select( + users_position, + users_schema.all_selection(), + None, + )).await + .unwrap(); + } + + + + + assert!(false); // assert!(state.tables.len() == 1); // let table = &state.tables[0]; diff --git a/minisql/src/response_writer.rs b/minisql/src/response_writer.rs index 30158c5..240a2f8 100644 --- a/minisql/src/response_writer.rs +++ b/minisql/src/response_writer.rs @@ -65,7 +65,7 @@ impl ResponseWriter for ResponseWriterStub async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()> { for (_, value) in row.iter() { - print!("{:?}", value) + print!("{:?}, ", value) } println!(); diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 2bc61c1..43b0ca5 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -244,6 +244,16 @@ impl <'cursor, T> WriteCursor<'cursor, T> Ok(count) } + pub async fn delete_all_entries(&mut self, enable_garbage_collector: bool) -> Result + where T: Encode + Decode + Ord + Send + Sync + Clone + { + let count = todo!(); + if enable_garbage_collector { + self.attempt_garbage_collection_if_necessary().await?; + } + Ok(count) + } + pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result where T: Encode + Decode + Ord + Send + Sync + Clone { diff --git a/storage_engine/src/segments/entry.rs b/storage_engine/src/segments/entry.rs index 64f6381..a2f8bc8 100644 --- a/storage_engine/src/segments/entry.rs +++ b/storage_engine/src/segments/entry.rs @@ -23,10 +23,6 @@ impl Entry { Self { header: EntryHeader { is_deleted: false }, data } } - pub fn new_deleted(data: Vec) -> Self { - Self { header: EntryHeader { is_deleted: true}, data } - } - // FORMAT: [EntryHeaderWithDataSize, ..sequence of data] pub fn encode(&self) -> Result> where T: Encode From 9af6ad90f36faa60ffbb5e6f15432cf70f45cd8a Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 21:50:42 +0100 Subject: [PATCH 2/5] Implement delete --- minisql/src/interpreter2.rs | 9 ++++----- storage_engine/src/cursor.rs | 7 ++++++- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 9ca3de1..5105597 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -142,14 +142,13 @@ impl State { Ok(count) } - async fn delete_all_rows(table_name: String, mut cursor: WriteCursor<'_, Value>, response_writer: &mut Writer) -> DbResult { - // cursor. + async fn delete_all_rows(table_name: String, mut cursor: WriteCursor<'_, Value>) -> DbResult { let count = cursor.delete_all_entries(true) .await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?; Ok(count) } - async fn delete_all_eq(table_name: String, mut cursor: WriteCursor<'_, Value>, response_writer: &mut Writer, eq_column: Column, value: Value) -> DbResult { + async fn delete_all_eq(table_name: String, mut cursor: WriteCursor<'_, Value>, eq_column: Column, value: Value) -> DbResult { let count = cursor.delete_entries_where_eq(eq_column as storage_engine::store::Column, &value, true) .await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?; @@ -206,8 +205,8 @@ impl StateHandler { let mut cursor = table.write().await?; let count = match maybe_condition { - None => State::delete_all_rows(table_name, cursor, response_writer).await?, - Some(Condition::Eq(eq_column, value)) => State::delete_all_eq(table_name, cursor, response_writer, eq_column, value).await? + None => State::delete_all_rows(table_name, cursor).await?, + Some(Condition::Eq(eq_column, value)) => State::delete_all_eq(table_name, cursor, eq_column, value).await? }; response_writer.write_command_complete(CompleteStatus::Delete(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 43b0ca5..5c76577 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -247,7 +247,12 @@ impl <'cursor, T> WriteCursor<'cursor, T> pub async fn delete_all_entries(&mut self, enable_garbage_collector: bool) -> Result where T: Encode + Decode + Ord + Send + Sync + Clone { - let count = todo!(); + let mut count = 0; + while let Some(entry) = self.next_alive().await? { + count += 1; + self.mark_deleted_at(entry.file_position, false).await? + } + if enable_garbage_collector { self.attempt_garbage_collection_if_necessary().await?; } From 48442cdd9e2db5000151b99779d2d0d64a9fd10b Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 21:54:20 +0100 Subject: [PATCH 3/5] Attach index --- minisql/src/interpreter2.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 5105597..043e241 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -190,6 +190,7 @@ impl StateHandler { } Insert(table_position, values) => { let state = self.state.read().await; + let mut table = state.table_at_mut(table_position).await; let mut cursor = table.write().await?; @@ -200,9 +201,10 @@ impl StateHandler { } Delete(table_position, maybe_condition) => { let state = self.state.read().await; + let mut table = state.table_at_mut(table_position).await; let table_name = table.table_name().clone(); - let mut cursor = table.write().await?; + let cursor = table.write().await?; let count = match maybe_condition { None => State::delete_all_rows(table_name, cursor).await?, @@ -219,7 +221,11 @@ impl StateHandler { } CreateIndex(table_position, column) => { let state = self.state.read().await; - todo!() + + let mut table = state.table_at_mut(table_position).await; + let mut cursor = table.write().await?; + cursor.attach_index(column as storage_engine::store::Column).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; + response_writer.write_command_complete(CompleteStatus::CreateIndex).await.map_err(|e| RuntimeError::AnyhowError(e)) } } } From d28fc85228d3bf9c39500120ae92cb5e1ccba0d4 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 22:12:59 +0100 Subject: [PATCH 4/5] Fix ordering columns bug --- minisql/src/schema.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/minisql/src/schema.rs b/minisql/src/schema.rs index 570d81d..10e542f 100644 --- a/minisql/src/schema.rs +++ b/minisql/src/schema.rs @@ -59,10 +59,16 @@ impl TableSchema { } pub fn get_columns(&self) -> Vec<&ColumnName> { - self.column_name_position_mapping + let mut columns_in_random_order: Vec<_> = self.column_name_position_mapping .iter() - .map(|(name, _)| name) - .collect() + .collect(); + columns_in_random_order.sort_by(|&(_, column0), &(_, column1)| column0.cmp(column1)); + + let columns: Vec<_> = columns_in_random_order + .iter() + .map(|(name, _)| *name) + .collect(); + columns } pub fn does_column_exist(&self, column_name: &ColumnName) -> bool { From 4bc8539f71ce5d28fd649d05828ead5f8fe81e55 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 22:17:20 +0100 Subject: [PATCH 5/5] Make test succeed --- minisql/src/interpreter2.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 8cc7d1c..aaf4282 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -290,13 +290,11 @@ mod tests { use crate::type_system::{DbType, IndexableValue, Value}; use std::collections::HashSet; use tokio::fs::{File, OpenOptions, DirBuilder}; - use tokio::fs; impl Drop for State { fn drop(&mut self) { println!("CLEANING UP INTERPRETER STATE"); - // TODO: This should be part of the state schema let table_folder = "db-test-0"; // Seems no one has figured out how to do AsyncDrop yet. std::fs::remove_dir_all(table_folder).unwrap(); @@ -370,7 +368,7 @@ mod tests { - assert!(false); + // assert!(false); // assert!(state.tables.len() == 1); // let table = &state.tables[0];