diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 8a8d3bd..e3f9121 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -15,9 +15,10 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use storage_engine::store::Store; +use storage_engine::segments::entry::Entry; use storage_engine::cursor::{ReadCursor, WriteCursor}; use storage_engine::cursor_capabilities::traversal::CursorCanTraverse; -use storage_engine::cursor_capabilities::index_access::CursorCanReadIndex; +use storage_engine::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex}; use crate::cancellation::Cancellation; const METADATA_FILE: &'static str = "metadata.json"; @@ -172,6 +173,19 @@ impl State { Ok(count) } + + async fn delete_all_rows(table_name: String, mut cursor: WriteCursor<'_, Value>) -> DbResult { + let count = cursor.delete_all_entries(true) + .await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?; + Ok(count) + } + + async fn delete_all_eq(table_name: String, mut cursor: WriteCursor<'_, Value>, eq_column: Column, value: Value) -> DbResult { + let count = + cursor.delete_entries_where_eq(eq_column as storage_engine::store::Column, &value, true) + .await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?; + Ok(count) + } } impl StateHandler { @@ -238,14 +252,28 @@ impl StateHandler { } Insert(table_position, values) => { let state = self.state.read().await; - todo!() + + let mut table = state.table_at_mut(table_position).await; + let mut cursor = table.write().await?; + + let entry = Entry::new(values); + cursor.insert_entry(entry).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; + + response_writer.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }).await.map_err(|e| RuntimeError::AnyhowError(e)) } Delete(table_position, maybe_condition) => { let state = self.state.read().await; + let mut table = state.table_at_mut(table_position).await; + let table_name = table.table_name().clone(); let cursor = table.write().await?; - todo!() + let count = match maybe_condition { + None => State::delete_all_rows(table_name, cursor).await?, + Some(Condition::Eq(eq_column, value)) => State::delete_all_eq(table_name, cursor, eq_column, value).await? + }; + + response_writer.write_command_complete(CompleteStatus::Delete(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) } CreateTable(table_schema) => { let mut state = self.state.write().await; @@ -255,7 +283,11 @@ impl StateHandler { } CreateIndex(table_position, column) => { let state = self.state.read().await; - todo!() + + let mut table = state.table_at_mut(table_position).await; + let mut cursor = table.write().await?; + cursor.attach_index(column as storage_engine::store::Column).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; + response_writer.write_command_complete(CompleteStatus::CreateIndex).await.map_err(|e| RuntimeError::AnyhowError(e)) } } } @@ -280,7 +312,6 @@ mod tests { fn drop(&mut self) { println!("CLEANING UP INTERPRETER STATE"); - // TODO: This should be part of the state schema let table_folder = "db-test-0"; // Seems no one has figured out how to do AsyncDrop yet. std::fs::remove_dir_all(table_folder).unwrap(); @@ -310,14 +341,49 @@ mod tests { .interpret(&mut response_writer, &DummyCancellation, Operation::CreateTable(users_schema.clone())).await .unwrap(); - let users_position: TablePosition = 0; - state - .interpret(&mut response_writer, &DummyCancellation, Operation::Select( - users_position, - users_schema.all_selection(), - None, - )).await - .unwrap(); + { + println!("==EMPTY SELECT==="); + let users_position: TablePosition = 0; + state + .interpret(&mut response_writer, &DummyCancellation, Operation::Select( + users_position, + users_schema.all_selection(), + None, + )).await + .unwrap(); + } + + { + let users = 0; + let (id, name, age) = ( + Value::Uuid(0), + Value::String("Plato".to_string()), + Value::Int(64), + ); + + println!("About to insert!"); + + state + .interpret(&mut response_writer, &DummyCancellation, Operation::Insert( + users, + vec![id.clone(), name.clone(), age.clone()], + )).await + .unwrap(); + } + { + println!("==SELECT==="); + let users_position: TablePosition = 0; + state + .interpret(&mut response_writer, &DummyCancellation, Operation::Select( + users_position, + users_schema.all_selection(), + None, + )).await + .unwrap(); + } + + + // assert!(false); diff --git a/minisql/src/response_writer.rs b/minisql/src/response_writer.rs index 30158c5..240a2f8 100644 --- a/minisql/src/response_writer.rs +++ b/minisql/src/response_writer.rs @@ -65,7 +65,7 @@ impl ResponseWriter for ResponseWriterStub async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()> { for (_, value) in row.iter() { - print!("{:?}", value) + print!("{:?}, ", value) } println!(); diff --git a/minisql/src/schema.rs b/minisql/src/schema.rs index 570d81d..10e542f 100644 --- a/minisql/src/schema.rs +++ b/minisql/src/schema.rs @@ -59,10 +59,16 @@ impl TableSchema { } pub fn get_columns(&self) -> Vec<&ColumnName> { - self.column_name_position_mapping + let mut columns_in_random_order: Vec<_> = self.column_name_position_mapping .iter() - .map(|(name, _)| name) - .collect() + .collect(); + columns_in_random_order.sort_by(|&(_, column0), &(_, column1)| column0.cmp(column1)); + + let columns: Vec<_> = columns_in_random_order + .iter() + .map(|(name, _)| *name) + .collect(); + columns } pub fn does_column_exist(&self, column_name: &ColumnName) -> bool { diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 2bc61c1..5c76577 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -244,6 +244,21 @@ impl <'cursor, T> WriteCursor<'cursor, T> Ok(count) } + pub async fn delete_all_entries(&mut self, enable_garbage_collector: bool) -> Result + where T: Encode + Decode + Ord + Send + Sync + Clone + { + let mut count = 0; + while let Some(entry) = self.next_alive().await? { + count += 1; + self.mark_deleted_at(entry.file_position, false).await? + } + + if enable_garbage_collector { + self.attempt_garbage_collection_if_necessary().await?; + } + Ok(count) + } + pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result where T: Encode + Decode + Ord + Send + Sync + Clone { diff --git a/storage_engine/src/segments/entry.rs b/storage_engine/src/segments/entry.rs index 64f6381..a2f8bc8 100644 --- a/storage_engine/src/segments/entry.rs +++ b/storage_engine/src/segments/entry.rs @@ -23,10 +23,6 @@ impl Entry { Self { header: EntryHeader { is_deleted: false }, data } } - pub fn new_deleted(data: Vec) -> Self { - Self { header: EntryHeader { is_deleted: true}, data } - } - // FORMAT: [EntryHeaderWithDataSize, ..sequence of data] pub fn encode(&self) -> Result> where T: Encode