From 3d1a4c94794c951cea48f1b54995dde4960795f3 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 16:17:04 +0100 Subject: [PATCH 01/23] initial commit --- Cargo.lock | 1 + minisql/Cargo.toml | 1 + minisql/src/lib.rs | 1 + 3 files changed, 3 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index d76a63c..3f22665 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -287,6 +287,7 @@ dependencies = [ "proto", "serde", "thiserror", + "tokio", ] [[package]] diff --git a/minisql/Cargo.toml b/minisql/Cargo.toml index f44d711..03d86dd 100644 --- a/minisql/Cargo.toml +++ b/minisql/Cargo.toml @@ -9,5 +9,6 @@ rust-version = "1.74" [dependencies] bimap = { version = "0.6.3", features = ["serde"] } serde = { version = "1.0.196", features = ["derive"] } +tokio = { version = "1.34.0", features = ["full"] } thiserror = "1.0.50" proto = { path = "../proto" } diff --git a/minisql/src/lib.rs b/minisql/src/lib.rs index baa4b4b..6db1e24 100644 --- a/minisql/src/lib.rs +++ b/minisql/src/lib.rs @@ -1,6 +1,7 @@ mod error; mod internals; pub mod interpreter; +pub mod interpreter2; pub mod operation; pub mod restricted_row; mod result; From 18b8049958dea0dca6220c2d580795a4bbbacfa3 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 16:46:18 +0100 Subject: [PATCH 02/23] Forgot to include interpreter file --- minisql/src/interpreter2.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 minisql/src/interpreter2.rs diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs new file mode 100644 index 0000000..9b40b68 --- /dev/null +++ b/minisql/src/interpreter2.rs @@ -0,0 +1,10 @@ + + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn new_state() { + } +} From e0d08e758ac9499c84bcc6ce70058f56273906d3 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 17:01:49 +0100 Subject: [PATCH 03/23] Renaming storage_engine.rs ~> store.rs --- Cargo.lock | 1 + minisql/Cargo.toml | 1 + minisql/src/interpreter2.rs | 77 +++++++++++++++++++ storage_engine/src/cursor.rs | 2 +- .../src/cursor_capabilities/header_access.rs | 2 +- .../src/cursor_capabilities/index_access.rs | 2 +- .../src/cursor_capabilities/primitive.rs | 2 +- storage_engine/src/error.rs | 2 +- storage_engine/src/lib.rs | 10 +-- storage_engine/src/segments/entry.rs | 2 +- storage_engine/src/segments/entry_header.rs | 2 +- storage_engine/src/segments/store_header.rs | 2 +- .../src/{storage_engine.rs => store.rs} | 0 13 files changed, 92 insertions(+), 13 deletions(-) rename storage_engine/src/{storage_engine.rs => store.rs} (100%) diff --git a/Cargo.lock b/Cargo.lock index bb1218e..a9a9cdc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -288,6 +288,7 @@ dependencies = [ "bimap", "proto", "serde", + "storage_engine", "thiserror", "tokio", ] diff --git a/minisql/Cargo.toml b/minisql/Cargo.toml index 56ffcc1..7fdf355 100644 --- a/minisql/Cargo.toml +++ b/minisql/Cargo.toml @@ -14,3 +14,4 @@ serde = { version = "1.0.196", features = ["derive"] } tokio = { version = "1.34.0", features = ["full"] } thiserror = "1.0.50" proto = { path = "../proto" } +storage_engine = { path = "../storage_engine" } diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 9b40b68..120c113 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -1,3 +1,79 @@ +use crate::operation::{ColumnSelection, Condition, Operation}; +use crate::restricted_row::RestrictedRow; +use crate::result::DbResult; +use crate::schema::{Column, TableName, TablePosition, TableSchema}; +use crate::type_system::Value; +use bimap::BiMap; +use serde::{Deserialize, Serialize}; + +use storage_engine::store::{Store}; + +// ==============Interpreter================ +#[derive(Debug)] +pub struct State { + table_name_position_mapping: BiMap, + tables: Tables, +} + +pub type Tables = Vec; + +#[derive(Debug)] +pub struct Table { + schema: TableSchema, + store: Store +} + +pub type DbSchema<'a> = Vec<(TableName, TablePosition, &'a TableSchema)>; +// To satisfy clippy. +impl Default for State { + fn default() -> Self { + Self::new() + } +} + +impl State { + pub fn new() -> Self { + Self { + table_name_position_mapping: BiMap::new(), + tables: vec![], + } + } + + pub fn db_schema(&self) -> DbSchema { + // let mut schema: DbSchema = Vec::new(); + // for (table_name, &table_position) in &self.table_name_position_mapping { + // let table_schema = self.tables[table_position].schema(); + // schema.push((table_name.clone(), table_position, table_schema)); + // } + // schema + + todo!() + } + + pub async fn interpret(&mut self, operation: Operation) -> DbResult<()> { + use Operation::*; + + match operation { + Select(table_position, column_selection, maybe_condition) => { + todo!() + } + Insert(table_position, values) => { + todo!() + } + Delete(table_position, maybe_condition) => { + todo!() + } + CreateTable(table_schema) => { + todo!() + } + CreateIndex(table_position, column) => { + todo!() + } + } + } +} + + #[cfg(test)] @@ -8,3 +84,4 @@ mod tests { async fn new_state() { } } + diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 3f68560..80d8de8 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -10,7 +10,7 @@ use bincode::{Decode, Encode}; use crate::segments::entry::EntryDetailed; use crate::segments::entry_header::EntryHeader; use crate::segments::store_header::StoreHeader; -use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; +use crate::store::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; use crate::index::Index; use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; use crate::cursor_capabilities::header_access::{CursorCanReadHeader, CursorCanWriteHeader}; diff --git a/storage_engine/src/cursor_capabilities/header_access.rs b/storage_engine/src/cursor_capabilities/header_access.rs index 57ccabb..96bdbea 100644 --- a/storage_engine/src/cursor_capabilities/header_access.rs +++ b/storage_engine/src/cursor_capabilities/header_access.rs @@ -9,7 +9,7 @@ use crate::error::{Error, DecodeErrorKind}; use crate::segments::entry::{Entry, EntryDetailed}; use crate::segments::entry_header::EntryHeaderWithDataSize; use crate::segments::store_header::StoreHeader; -use crate::storage_engine::{FilePosition, Column, Result}; +use crate::store::{FilePosition, Column, Result}; use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; #[async_trait] diff --git a/storage_engine/src/cursor_capabilities/index_access.rs b/storage_engine/src/cursor_capabilities/index_access.rs index d9d7fc3..1eca46c 100644 --- a/storage_engine/src/cursor_capabilities/index_access.rs +++ b/storage_engine/src/cursor_capabilities/index_access.rs @@ -7,7 +7,7 @@ use bincode::{Decode, Encode}; use crate::error::Error; use crate::segments::entry::{Entry, EntryDetailed}; -use crate::storage_engine::{FilePosition, Column, Result}; +use crate::store::{FilePosition, Column, Result}; use crate::index::Index; use crate::cursor_capabilities::header_access::{CursorCanReadHeader, CursorCanWriteHeader}; diff --git a/storage_engine/src/cursor_capabilities/primitive.rs b/storage_engine/src/cursor_capabilities/primitive.rs index d11b79b..82fb23d 100644 --- a/storage_engine/src/cursor_capabilities/primitive.rs +++ b/storage_engine/src/cursor_capabilities/primitive.rs @@ -2,7 +2,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; use tokio::fs::File; use async_trait::async_trait; -use crate::storage_engine::{FilePosition, Result}; +use crate::store::{FilePosition, Result}; #[async_trait] pub(crate) trait CursorCanRead { diff --git a/storage_engine/src/error.rs b/storage_engine/src/error.rs index 47bf5cc..7b21fdb 100644 --- a/storage_engine/src/error.rs +++ b/storage_engine/src/error.rs @@ -1,4 +1,4 @@ -use crate::storage_engine::Column; +use crate::store::Column; #[derive(Debug)] pub enum Error { diff --git a/storage_engine/src/lib.rs b/storage_engine/src/lib.rs index e7920dd..00ff87c 100644 --- a/storage_engine/src/lib.rs +++ b/storage_engine/src/lib.rs @@ -1,7 +1,7 @@ -pub mod storage_engine; +pub mod store; mod binary_coding; -mod error; +pub mod error; mod index; -mod cursor; -mod segments; -mod cursor_capabilities; +pub mod cursor; +pub mod segments; +pub mod cursor_capabilities; diff --git a/storage_engine/src/segments/entry.rs b/storage_engine/src/segments/entry.rs index b42c490..64f6381 100644 --- a/storage_engine/src/segments/entry.rs +++ b/storage_engine/src/segments/entry.rs @@ -1,7 +1,7 @@ use bincode::{Decode, Encode}; use crate::binary_coding::{encode_sequence, encode_sequence_with_sizes, decode_sequence}; -use crate::storage_engine::{Result, FilePosition}; +use crate::store::{Result, FilePosition}; use crate::error::{Error, DecodeErrorKind}; use crate::segments::entry_header::{EntryHeader, EntryHeaderWithDataSize}; diff --git a/storage_engine/src/segments/entry_header.rs b/storage_engine/src/segments/entry_header.rs index cee5496..c0dc11e 100644 --- a/storage_engine/src/segments/entry_header.rs +++ b/storage_engine/src/segments/entry_header.rs @@ -1,5 +1,5 @@ use crate::binary_coding::{decode, encode, decode_sequence}; -use crate::storage_engine::{Result, Column}; +use crate::store::{Result, Column}; use crate::error::{Error, DecodeErrorKind}; use std::mem::size_of; diff --git a/storage_engine/src/segments/store_header.rs b/storage_engine/src/segments/store_header.rs index 890b198..8582402 100644 --- a/storage_engine/src/segments/store_header.rs +++ b/storage_engine/src/segments/store_header.rs @@ -1,5 +1,5 @@ use crate::binary_coding::{encode, encode_sequence, decode, decode_sequence}; -use crate::storage_engine::{Result, Column}; +use crate::store::{Result, Column}; use crate::error::{Error, DecodeErrorKind}; use std::mem::size_of; diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/store.rs similarity index 100% rename from storage_engine/src/storage_engine.rs rename to storage_engine/src/store.rs From 167028a530fdc8e9e9c7a023385452bcc6cc1950 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 17:11:53 +0100 Subject: [PATCH 04/23] Error formatting --- minisql/src/error.rs | 2 ++ storage_engine/src/error.rs | 20 ++++++++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/minisql/src/error.rs b/minisql/src/error.rs index d74af4e..a38446e 100644 --- a/minisql/src/error.rs +++ b/minisql/src/error.rs @@ -13,6 +13,8 @@ pub enum RuntimeError { AttemptToIndexNonIndexableColumn(TableName, ColumnName), #[error("table {0} already indexes column {1}")] AttemptToIndexAlreadyIndexedColumn(TableName, ColumnName), + #[error("Storage Engine error for table {0}: {1}")] + StorageEngineError(TableName, storage_engine::error::Error) } #[derive(Debug, Error)] diff --git a/storage_engine/src/error.rs b/storage_engine/src/error.rs index 7b21fdb..1ca849c 100644 --- a/storage_engine/src/error.rs +++ b/storage_engine/src/error.rs @@ -1,25 +1,41 @@ use crate::store::Column; +use thiserror::Error; -#[derive(Debug)] +#[derive(Debug, Error)] pub enum Error { + #[error("Decoding Error: {0} {1}")] DecodeError(DecodeErrorKind, bincode::error::DecodeError), + #[error("Encoding Error: {0}")] EncodeError(bincode::error::EncodeError), + #[error("Attempt to index non-indexed column {0}")] AttemptToIndexNonIndexableColumn(Column), + #[error("Index Corruption: Index is storing eof file position for column {0}")] IndexIsStoringEofFilePosition(Column), + #[error("Column {0} is already indexed")] ColumnAlreadyIndexed(Column), + #[error("File-System Error: {0}")] IoError(std::io::Error), } -#[derive(Debug)] +#[derive(Debug, Error)] pub enum DecodeErrorKind { + #[error("StoreHeaderNumberOfColumns")] StoreHeaderNumberOfColumns, + #[error("StoreHeaderDeletedCount")] StoreHeaderDeletedCount, + #[error("StoreHeaderTotalCount")] StoreHeaderTotalCount, + #[error("StoreHeaderPrimaryColumn")] StoreHeaderPrimaryColumn, + #[error("StoreHeaderIndexedColumns")] StoreHeaderIndexedColumns, + #[error("EntryData")] EntryData, + #[error("EntryIsDeleted")] EntryIsDeleted, + #[error("EntryHeaderWithDataSizes")] EntryHeaderWithDataSizes, + #[error("CorruptedData")] CorruptedData, } From 84a880f9e65ce377d644544d4dbc6b84b7279a81 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 17:39:38 +0100 Subject: [PATCH 05/23] Renaming --- minisql/src/interpreter2.rs | 49 ++++++++++++-- storage_engine/src/cursor.rs | 13 ++-- .../cursor_capabilities/entry_modification.rs | 67 +++++++++++++++++++ .../src/cursor_capabilities/index_access.rs | 7 +- storage_engine/src/cursor_capabilities/mod.rs | 3 +- .../{header_access.rs => traversal.rs} | 67 ++----------------- storage_engine/src/store.rs | 4 +- 7 files changed, 133 insertions(+), 77 deletions(-) create mode 100644 storage_engine/src/cursor_capabilities/entry_modification.rs rename storage_engine/src/cursor_capabilities/{header_access.rs => traversal.rs} (74%) diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 120c113..3a13950 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -3,10 +3,13 @@ use crate::restricted_row::RestrictedRow; use crate::result::DbResult; use crate::schema::{Column, TableName, TablePosition, TableSchema}; use crate::type_system::Value; -use bimap::BiMap; -use serde::{Deserialize, Serialize}; +use crate::error::RuntimeError; +use crate::response_writer::ResponseWriter; -use storage_engine::store::{Store}; +use bimap::BiMap; + +use storage_engine::store::Store; +use storage_engine::cursor::{ReadCursor, WriteCursor}; // ==============Interpreter================ #[derive(Debug)] @@ -31,6 +34,18 @@ impl Default for State { } } +impl Table { + async fn read(&self) -> DbResult> { + let cursor = self.store.read_cursor().await.map_err(|err| RuntimeError::StorageEngineError(self.schema.table_name().to_string(), err))?; + Ok(cursor) + } + + async fn write(&mut self) -> DbResult> { + let cursor = self.store.write_cursor().await.map_err(|err| RuntimeError::StorageEngineError(self.schema.table_name().to_string(), err))?; + Ok(cursor) + } +} + impl State { pub fn new() -> Self { Self { @@ -50,17 +65,43 @@ impl State { todo!() } - pub async fn interpret(&mut self, operation: Operation) -> DbResult<()> { + fn table_at(&self, table_position: TablePosition) -> &Table { + &self.tables[table_position] + } + + fn table_at_mut(&mut self, table_position: TablePosition) -> &mut Table { + &mut self.tables[table_position] + } + + pub async fn interpret(&mut self, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { use Operation::*; match operation { Select(table_position, column_selection, maybe_condition) => { + let table: &Table = self.table_at(table_position); + let cursor = table.read().await?; + + let selected_rows = match maybe_condition { + None => { + // select all rows + todo!() + } + + Some(Condition::Eq(eq_column, value)) => { + todo!() + } + }; + todo!() } Insert(table_position, values) => { + let table: &mut Table = self.table_at_mut(table_position); + let cursor = table.write().await?; todo!() } Delete(table_position, maybe_condition) => { + let table: &mut Table = self.table_at_mut(table_position); + let cursor = table.write().await?; todo!() } CreateTable(table_schema) => { diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 80d8de8..2aab2b6 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -13,7 +13,8 @@ use crate::segments::store_header::StoreHeader; use crate::store::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; use crate::index::Index; use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; -use crate::cursor_capabilities::header_access::{CursorCanReadHeader, CursorCanWriteHeader}; +use crate::cursor_capabilities::traversal::CursorCanTraverse; +use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries; use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex}; const GARBAGE_COLLECTION_TRIGGER: usize = 100; @@ -82,24 +83,24 @@ impl CursorCanWrite for AppendOnlyCursor {} // ===capability to access header=== -impl CursorCanReadHeader for ReadCursor<'_, T> { +impl CursorCanTraverse for ReadCursor<'_, T> { fn header(&self) -> &StoreHeader { &self.header } } -impl CursorCanReadHeader for WriteCursor<'_, T> { +impl CursorCanTraverse for WriteCursor<'_, T> { fn header(&self) -> &StoreHeader { &self.header } } -impl CursorCanReadHeader for AppendOnlyCursor { +impl CursorCanTraverse for AppendOnlyCursor { fn header(&self) -> &StoreHeader { &self.header } } -impl CursorCanWriteHeader for WriteCursor<'_, T> { +impl CursorCanModifyEntries for WriteCursor<'_, T> { fn header_mut(&mut self) -> &mut StoreHeader { self.header } fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } } -impl CursorCanWriteHeader for AppendOnlyCursor { +impl CursorCanModifyEntries for AppendOnlyCursor { fn header_mut(&mut self) -> &mut StoreHeader { &mut self.header } fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } } diff --git a/storage_engine/src/cursor_capabilities/entry_modification.rs b/storage_engine/src/cursor_capabilities/entry_modification.rs new file mode 100644 index 0000000..6622347 --- /dev/null +++ b/storage_engine/src/cursor_capabilities/entry_modification.rs @@ -0,0 +1,67 @@ +use async_trait::async_trait; + +use bincode; +use bincode::Encode; +use crate::binary_coding::encode; + +use crate::segments::entry::Entry; +use crate::segments::store_header::StoreHeader; +use crate::store::{FilePosition, Result}; +use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; +use crate::cursor_capabilities::traversal::CursorCanTraverse; + +#[async_trait] +pub trait CursorCanModifyEntries: CursorCanTraverse + CursorCanWrite { + fn header_mut(&mut self) -> &mut StoreHeader; + fn set_eof_file_position(&mut self, new_file_position: FilePosition); + + // ===Store Header Manipulation=== + async fn increment_total_count(&mut self) -> Result<()> + where T: Send + { + self.seek_to_start().await?; + self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; + let new_count = self.header_mut().increment_total_count(); + self.write_bytes(&encode::(&new_count)?).await?; + Ok(()) + } + + async fn increment_deleted_count(&mut self) -> Result<()> + where T: Send + { + self.seek_to_start().await?; + self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?; + let new_count = self.header_mut().increment_deleted_count(); + self.write_bytes(&encode::(&new_count)?).await?; + Ok(()) + } + + async fn set_header(&mut self, header: &StoreHeader) -> Result<()> + where T: Send + { + self.seek_to_start().await?; + let encoded_header: Vec = header.encode()?; + self.write_bytes(&encoded_header).await?; + + Ok(()) + } + + // ===Append Entry=== + + // Moves cursor to the end. + // Returns file position to the start of the new entry. + async fn append_entry_no_indexing(&mut self, entry: &Entry) -> Result + where T: Encode + Send + Sync + { + self.increment_total_count().await?; + + let encoded_entry: Vec = entry.encode()?; + let file_position = self.seek_to_end().await?; + self.write_bytes(&encoded_entry).await?; + + let eof_file_position: FilePosition = self.current_file_position().await?; + self.set_eof_file_position(eof_file_position); + + Ok(file_position) + } +} diff --git a/storage_engine/src/cursor_capabilities/index_access.rs b/storage_engine/src/cursor_capabilities/index_access.rs index 1eca46c..81ee38e 100644 --- a/storage_engine/src/cursor_capabilities/index_access.rs +++ b/storage_engine/src/cursor_capabilities/index_access.rs @@ -9,10 +9,11 @@ use crate::error::Error; use crate::segments::entry::{Entry, EntryDetailed}; use crate::store::{FilePosition, Column, Result}; use crate::index::Index; -use crate::cursor_capabilities::header_access::{CursorCanReadHeader, CursorCanWriteHeader}; +use crate::cursor_capabilities::traversal::CursorCanTraverse; +use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries; #[async_trait] -pub trait CursorCanReadIndex: CursorCanReadHeader { +pub trait CursorCanReadIndex: CursorCanTraverse { fn indexes(&mut self) -> &[Option>]; async fn index_lookup(&mut self, column: Column, value: &T) -> Result>> @@ -52,7 +53,7 @@ pub trait CursorCanReadIndex: CursorCanReadHeader { } #[async_trait] -pub trait CursorCanWriteToIndex: CursorCanReadIndex + CursorCanWriteHeader { +pub trait CursorCanWriteToIndex: CursorCanReadIndex + CursorCanModifyEntries { fn indexes_mut(&mut self) -> &mut [Option>]; // Assumes that the column is indexable. diff --git a/storage_engine/src/cursor_capabilities/mod.rs b/storage_engine/src/cursor_capabilities/mod.rs index 6d301cb..d3a3bcb 100644 --- a/storage_engine/src/cursor_capabilities/mod.rs +++ b/storage_engine/src/cursor_capabilities/mod.rs @@ -1,3 +1,4 @@ pub(crate) mod primitive; -pub(crate) mod header_access; +pub(crate) mod traversal; +pub(crate) mod entry_modification; pub(crate) mod index_access; diff --git a/storage_engine/src/cursor_capabilities/header_access.rs b/storage_engine/src/cursor_capabilities/traversal.rs similarity index 74% rename from storage_engine/src/cursor_capabilities/header_access.rs rename to storage_engine/src/cursor_capabilities/traversal.rs index 96bdbea..00a1da6 100644 --- a/storage_engine/src/cursor_capabilities/header_access.rs +++ b/storage_engine/src/cursor_capabilities/traversal.rs @@ -2,18 +2,19 @@ use tokio::io::AsyncReadExt; use async_trait::async_trait; use bincode; -use bincode::{Decode, Encode}; -use crate::binary_coding::{encode, decode}; +use bincode::Decode; +use crate::binary_coding::decode; use crate::error::{Error, DecodeErrorKind}; -use crate::segments::entry::{Entry, EntryDetailed}; +use crate::segments::entry::EntryDetailed; use crate::segments::entry_header::EntryHeaderWithDataSize; use crate::segments::store_header::StoreHeader; use crate::store::{FilePosition, Column, Result}; -use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; +use crate::cursor_capabilities::primitive::CursorCanRead; + #[async_trait] -pub trait CursorCanReadHeader: CursorCanRead { +pub trait CursorCanTraverse: CursorCanRead { fn header(&self) -> &StoreHeader; async fn seek_to_start_of_data(&mut self) -> Result { @@ -172,59 +173,3 @@ pub trait CursorCanReadHeader: CursorCanRead { Ok(bytes) } } - -#[async_trait] -pub trait CursorCanWriteHeader: CursorCanReadHeader + CursorCanWrite { - fn header_mut(&mut self) -> &mut StoreHeader; - fn set_eof_file_position(&mut self, new_file_position: FilePosition); - - // ===Store Header Manipulation=== - async fn increment_total_count(&mut self) -> Result<()> - where T: Send - { - self.seek_to_start().await?; - self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; - let new_count = self.header_mut().increment_total_count(); - self.write_bytes(&encode::(&new_count)?).await?; - Ok(()) - } - - async fn increment_deleted_count(&mut self) -> Result<()> - where T: Send - { - self.seek_to_start().await?; - self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?; - let new_count = self.header_mut().increment_deleted_count(); - self.write_bytes(&encode::(&new_count)?).await?; - Ok(()) - } - - async fn set_header(&mut self, header: &StoreHeader) -> Result<()> - where T: Send - { - self.seek_to_start().await?; - let encoded_header: Vec = header.encode()?; - self.write_bytes(&encoded_header).await?; - - Ok(()) - } - - // ===Append Entry=== - - // Moves cursor to the end. - // Returns file position to the start of the new entry. - async fn append_entry_no_indexing(&mut self, entry: &Entry) -> Result - where T: Encode + Send + Sync - { - self.increment_total_count().await?; - - let encoded_entry: Vec = entry.encode()?; - let file_position = self.seek_to_end().await?; - self.write_bytes(&encoded_entry).await?; - - let eof_file_position: FilePosition = self.current_file_position().await?; - self.set_eof_file_position(eof_file_position); - - Ok(file_position) - } -} diff --git a/storage_engine/src/store.rs b/storage_engine/src/store.rs index 7798e10..ec2dca1 100644 --- a/storage_engine/src/store.rs +++ b/storage_engine/src/store.rs @@ -6,7 +6,7 @@ use bincode::{Decode, Encode}; use crate::error::Error; use crate::cursor::{ReadCursor, WriteCursor}; -use crate::cursor_capabilities::header_access::CursorCanReadHeader; +use crate::cursor_capabilities::traversal::CursorCanTraverse; use crate::segments::store_header::StoreHeader; use crate::index::Index; @@ -208,8 +208,8 @@ impl Store { mod tests { use super::*; use crate::segments::entry::Entry; - use crate::cursor_capabilities::header_access::CursorCanReadHeader; use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex}; + use crate::cursor_capabilities::traversal::CursorCanTraverse; impl Drop for Store { fn drop(&mut self) { From c7166bd12eef38f0a0d58d5d1c6a6cd0985b5af0 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 18:41:40 +0100 Subject: [PATCH 06/23] select all --- Cargo.lock | 1 + minisql/Cargo.toml | 1 + minisql/src/error.rs | 11 +++++++- minisql/src/internals/row.rs | 7 ++++++ minisql/src/interpreter2.rs | 25 +++++++++++++------ minisql/src/restricted_row.rs | 4 ++- minisql/src/type_system.rs | 19 ++++++++++++-- storage_engine/src/cursor_capabilities/mod.rs | 6 ++--- 8 files changed, 59 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a9a9cdc..b653809 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -286,6 +286,7 @@ dependencies = [ "anyhow", "async-trait", "bimap", + "bincode", "proto", "serde", "storage_engine", diff --git a/minisql/Cargo.toml b/minisql/Cargo.toml index 7fdf355..ca06eb6 100644 --- a/minisql/Cargo.toml +++ b/minisql/Cargo.toml @@ -10,6 +10,7 @@ rust-version = "1.74" anyhow = "1.0.79" async-trait = "0.1.77" bimap = { version = "0.6.3", features = ["serde"] } +bincode = "2.0.0-rc.3" serde = { version = "1.0.196", features = ["derive"] } tokio = { version = "1.34.0", features = ["full"] } thiserror = "1.0.50" diff --git a/minisql/src/error.rs b/minisql/src/error.rs index a38446e..827de93 100644 --- a/minisql/src/error.rs +++ b/minisql/src/error.rs @@ -14,7 +14,9 @@ pub enum RuntimeError { #[error("table {0} already indexes column {1}")] AttemptToIndexAlreadyIndexedColumn(TableName, ColumnName), #[error("Storage Engine error for table {0}: {1}")] - StorageEngineError(TableName, storage_engine::error::Error) + StorageEngineError(TableName, storage_engine::error::Error), + #[error("runtime anyhow error: {0}")] + AnyhowError(anyhow::Error) } #[derive(Debug, Error)] @@ -28,3 +30,10 @@ pub enum TypeConversionError { #[error("unknown type with oid {oid} and size {size}")] UnknownType { oid: PgOid, size: i16 }, } + + +impl From for RuntimeError { + fn from(e: anyhow::Error) -> RuntimeError { + Self::AnyhowError(e) + } +} diff --git a/minisql/src/internals/row.rs b/minisql/src/internals/row.rs index 248f2f7..91bfd2f 100644 --- a/minisql/src/internals/row.rs +++ b/minisql/src/internals/row.rs @@ -5,6 +5,7 @@ use crate::type_system::Value; use serde::{Deserialize, Serialize}; use std::ops::{Index, IndexMut}; use std::slice::SliceIndex; +use storage_engine::segments::entry::EntryDetailed; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Row(Vec); @@ -39,6 +40,12 @@ impl FromIterator for Row { } } +impl From> for Row { + fn from(entry: EntryDetailed) -> Self { + Row(entry.data) + } +} + // To satisfy clippy. impl Default for Row { fn default() -> Self { diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 3a13950..a312e1a 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -5,11 +5,13 @@ use crate::schema::{Column, TableName, TablePosition, TableSchema}; use crate::type_system::Value; use crate::error::RuntimeError; use crate::response_writer::ResponseWriter; +use crate::internals::row::Row; use bimap::BiMap; use storage_engine::store::Store; use storage_engine::cursor::{ReadCursor, WriteCursor}; +use storage_engine::cursor_capabilities::traversal::CursorCanTraverse; // ==============Interpreter================ #[derive(Debug)] @@ -36,12 +38,12 @@ impl Default for State { impl Table { async fn read(&self) -> DbResult> { - let cursor = self.store.read_cursor().await.map_err(|err| RuntimeError::StorageEngineError(self.schema.table_name().to_string(), err))?; + let cursor = self.store.read_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.schema.table_name().to_string(), e))?; Ok(cursor) } async fn write(&mut self) -> DbResult> { - let cursor = self.store.write_cursor().await.map_err(|err| RuntimeError::StorageEngineError(self.schema.table_name().to_string(), err))?; + let cursor = self.store.write_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.schema.table_name().to_string(), e))?; Ok(cursor) } } @@ -73,6 +75,15 @@ impl State { &mut self.tables[table_position] } + async fn select_all_rows(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection) -> DbResult<()> { + while let Some(entry) = cursor.next_alive().await.map_err(|e| RuntimeError::StorageEngineError(table.schema.table_name().to_string(), e))? { + let row: Row = From::from(entry); + let restricted_row = row.restrict_columns(&column_selection); + response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; + } + Ok(()) + } + pub async fn interpret(&mut self, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { use Operation::*; @@ -81,18 +92,16 @@ impl State { let table: &Table = self.table_at(table_position); let cursor = table.read().await?; - let selected_rows = match maybe_condition { + response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?; + match maybe_condition { None => { - // select all rows - todo!() + Self::select_all_rows(&table, cursor, response_writer, column_selection).await } Some(Condition::Eq(eq_column, value)) => { todo!() } - }; - - todo!() + } } Insert(table_position, values) => { let table: &mut Table = self.table_at_mut(table_position); diff --git a/minisql/src/restricted_row.rs b/minisql/src/restricted_row.rs index 15aebba..07ad0e0 100644 --- a/minisql/src/restricted_row.rs +++ b/minisql/src/restricted_row.rs @@ -1,7 +1,9 @@ -use crate::schema::Column; +use crate::schema::{Column, TableSchema}; use crate::type_system::Value; +use crate::operation::ColumnSelection; use std::ops::Index; use std::slice::SliceIndex; +use storage_engine::segments::entry::EntryDetailed; #[derive(Debug, Clone)] pub struct RestrictedRow(Vec<(Column, Value)>); diff --git a/minisql/src/type_system.rs b/minisql/src/type_system.rs index e30edea..8aa811d 100644 --- a/minisql/src/type_system.rs +++ b/minisql/src/type_system.rs @@ -2,9 +2,12 @@ use crate::error::TypeConversionError; use proto::message::primitive::pgoid::PgOid; use serde::{Deserialize, Serialize}; use std::cmp::Ordering; +// TODO: Private??? +// use bincode::{Encode, Encoder, EncodeError, Decode, Decoder, DecodeError}; +use bincode::{Encode, Decode}; // ==============Types================ -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)] pub enum DbType { String, Int, @@ -15,7 +18,7 @@ pub enum DbType { // ==============Values================ pub type Uuid = u64; -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Encode, Decode)] #[serde(try_from = "String", into = "String")] pub enum Value { Number(f64), @@ -81,6 +84,18 @@ impl Ord for IndexableValue { } } +// impl Encode for Value { +// fn encode(&self, encoder: &mut E) -> Result<(), EncodeError> { +// todo!() +// } +// } + +// impl Decode for Value { +// fn decode(decoder: &mut D) -> Result { +// todo!() +// } +// } + impl DbType { fn new_n_option(n: usize, inside: DbType) -> DbType { if n == 0 { diff --git a/storage_engine/src/cursor_capabilities/mod.rs b/storage_engine/src/cursor_capabilities/mod.rs index d3a3bcb..9872f1c 100644 --- a/storage_engine/src/cursor_capabilities/mod.rs +++ b/storage_engine/src/cursor_capabilities/mod.rs @@ -1,4 +1,4 @@ pub(crate) mod primitive; -pub(crate) mod traversal; -pub(crate) mod entry_modification; -pub(crate) mod index_access; +pub mod traversal; +pub mod entry_modification; +pub mod index_access; From f973df2ca230655b2268f8718c89b4696ff1e377 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 18:51:47 +0100 Subject: [PATCH 07/23] Sketch select_eq --- minisql/src/interpreter2.rs | 13 ++++++++++++- minisql/src/type_system.rs | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index a312e1a..3944710 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -12,6 +12,7 @@ use bimap::BiMap; use storage_engine::store::Store; use storage_engine::cursor::{ReadCursor, WriteCursor}; use storage_engine::cursor_capabilities::traversal::CursorCanTraverse; +use storage_engine::cursor_capabilities::index_access::CursorCanReadIndex; // ==============Interpreter================ #[derive(Debug)] @@ -84,6 +85,16 @@ impl State { Ok(()) } + async fn select_eq(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult<()> { + let entries = cursor.select_entries_where_eq(column as storage_engine::store::Column, &value).await.map_err(|e| RuntimeError::StorageEngineError(table.schema.table_name().to_string(), e))?; + for entry in entries { + let row: Row = From::from(entry); + let restricted_row = row.restrict_columns(&column_selection); + response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; + } + Ok(()) + } + pub async fn interpret(&mut self, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { use Operation::*; @@ -99,7 +110,7 @@ impl State { } Some(Condition::Eq(eq_column, value)) => { - todo!() + Self::select_eq(&table, cursor, response_writer, column_selection, eq_column, value).await } } } diff --git a/minisql/src/type_system.rs b/minisql/src/type_system.rs index 8aa811d..155eb99 100644 --- a/minisql/src/type_system.rs +++ b/minisql/src/type_system.rs @@ -84,6 +84,42 @@ impl Ord for IndexableValue { } } +impl Eq for Value {} + +impl PartialOrd for Value { + fn partial_cmp(&self, other: &Self) -> Option { + todo!() + } +} + +// TODO: Make column know about indexable types +impl Ord for Value { + fn cmp(&self, other: &Self) -> Ordering { + match (self, other) { + (Value::String(s0), Value::String(s1)) => s0.cmp(s1), + (Value::Int(n0), Value::Int(n1)) => n0.cmp(n1), + (Value::Uuid(id0), Value::Uuid(id1)) => id0.cmp(id1), + (Value::None(_), Value::None(_)) => Ordering::Equal, + (Value::None(_), Value::Some(_)) => Ordering::Less, + (Value::Some(_), Value::None(_)) => Ordering::Greater, + (Value::Some(v0), Value::Some(v1)) => v0.cmp(v1), + _ => + // SAFETY: + // We are using indexable values as keys in key-value maps. + // When validation is done, it can't happen that we will be comparing two values + // of different types. + // Ofcourse another option is to artificialy order e.g. + // None < Some(...) < String < Int < Uuid + // where ... is again None < Some(...) < String < Int < Uuid + // where ... + // infinitely deep total order. But this is pointless for our usecase. + { + unreachable!() + } + } + } +} + // impl Encode for Value { // fn encode(&self, encoder: &mut E) -> Result<(), EncodeError> { // todo!() From 2dd0555174bd4d094173ad2fcbc6cd8e3a02cc90 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 19:27:46 +0100 Subject: [PATCH 08/23] Create table partial implementation --- minisql/src/interpreter2.rs | 96 +++++++++++++++++---- minisql/src/schema.rs | 4 + storage_engine/src/cursor.rs | 2 +- storage_engine/src/segments/store_header.rs | 5 +- storage_engine/src/store.rs | 23 +++-- 5 files changed, 98 insertions(+), 32 deletions(-) diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 3944710..b96a071 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -1,13 +1,13 @@ use crate::operation::{ColumnSelection, Condition, Operation}; -use crate::restricted_row::RestrictedRow; use crate::result::DbResult; use crate::schema::{Column, TableName, TablePosition, TableSchema}; use crate::type_system::Value; use crate::error::RuntimeError; -use crate::response_writer::ResponseWriter; +use crate::response_writer::{ResponseWriter, CompleteStatus}; use crate::internals::row::Row; use bimap::BiMap; +use std::path::Path; use storage_engine::store::Store; use storage_engine::cursor::{ReadCursor, WriteCursor}; @@ -38,8 +38,23 @@ impl Default for State { } impl Table { + async fn new(table_schema: TableSchema, db_path: &Path) -> DbResult { + let table_folder_name = table_schema.table_name(); + let path_to_table_folder = db_path.join(table_folder_name); + + let number_of_columns = table_schema.number_of_columns(); + let primary_column = table_schema.primary_column() as storage_engine::store::Column; + let store: Store = Store::new(&path_to_table_folder, number_of_columns, primary_column).await.unwrap(); + + let table = Self { + schema: table_schema, + store, + }; + Ok(table) + } + async fn read(&self) -> DbResult> { - let cursor = self.store.read_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.schema.table_name().to_string(), e))?; + let cursor = self.store.read_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.table_name().to_string(), e))?; Ok(cursor) } @@ -47,6 +62,14 @@ impl Table { let cursor = self.store.write_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.schema.table_name().to_string(), e))?; Ok(cursor) } + + pub fn schema(&self) -> &TableSchema { + &self.schema + } + + pub fn table_name(&self) -> &TableName { + self.schema.table_name() + } } impl State { @@ -58,14 +81,12 @@ impl State { } pub fn db_schema(&self) -> DbSchema { - // let mut schema: DbSchema = Vec::new(); - // for (table_name, &table_position) in &self.table_name_position_mapping { - // let table_schema = self.tables[table_position].schema(); - // schema.push((table_name.clone(), table_position, table_schema)); - // } - // schema - - todo!() + let mut schema: DbSchema = Vec::new(); + for (table_name, &table_position) in &self.table_name_position_mapping { + let table_schema: &TableSchema = self.tables[table_position].schema(); + schema.push((table_name.clone(), table_position, table_schema)); + } + schema } fn table_at(&self, table_position: TablePosition) -> &Table { @@ -76,22 +97,32 @@ impl State { &mut self.tables[table_position] } + async fn attach_table(&mut self, table: Table) { + // TODO: You need to update the global DB SCHEMA! + let new_table_position: TablePosition = self.tables.len(); + self.table_name_position_mapping + .insert(table.schema().table_name().clone(), new_table_position); + self.tables.push(table); + } + async fn select_all_rows(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection) -> DbResult<()> { - while let Some(entry) = cursor.next_alive().await.map_err(|e| RuntimeError::StorageEngineError(table.schema.table_name().to_string(), e))? { + while let Some(entry) = cursor.next_alive().await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))? { let row: Row = From::from(entry); let restricted_row = row.restrict_columns(&column_selection); response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; } + // TODO: You need to write COmmand complete Ok(()) } async fn select_eq(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult<()> { - let entries = cursor.select_entries_where_eq(column as storage_engine::store::Column, &value).await.map_err(|e| RuntimeError::StorageEngineError(table.schema.table_name().to_string(), e))?; + let entries = cursor.select_entries_where_eq(column as storage_engine::store::Column, &value).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; for entry in entries { let row: Row = From::from(entry); let restricted_row = row.restrict_columns(&column_selection); response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; } + // TODO: You need to write COmmand complete Ok(()) } @@ -125,7 +156,12 @@ impl State { todo!() } CreateTable(table_schema) => { - todo!() + // TODO: This needs to be given from a config + let db_path = "test_db"; + + let table = Table::new(table_schema, Path::new(db_path)).await?; + self.attach_table(table).await; + response_writer.write_command_complete(CompleteStatus::CreateTable).await.map_err(|e| RuntimeError::AnyhowError(e)) } CreateIndex(table_position, column) => { todo!() @@ -140,9 +176,35 @@ impl State { #[cfg(test)] mod tests { use super::*; + use crate::operation::Operation; + use crate::schema::Column; + use crate::type_system::{DbType, IndexableValue, Value}; + use std::collections::HashSet; - #[tokio::test] - async fn new_state() { - } + fn users_schema() -> TableSchema { + TableSchema::new( + "users".to_string(), + "id".to_string(), + vec!["id".to_string(), "name".to_string(), "age".to_string()], + vec![DbType::Uuid, DbType::String, DbType::Int], + ) + } + + #[test] + fn test_table_creation() { + let mut state = State::new(); + let users_schema = users_schema(); + let users = users_schema.table_name().clone(); + + // state + // .interpret(Operation::CreateTable(users_schema)) + // .unwrap(); + + // assert!(state.tables.len() == 1); + // let table = &state.tables[0]; + // assert!(table.rows().len() == 0); + + // assert!(table.table_name() == &users); + } } diff --git a/minisql/src/schema.rs b/minisql/src/schema.rs index 6833a3d..570d81d 100644 --- a/minisql/src/schema.rs +++ b/minisql/src/schema.rs @@ -50,6 +50,10 @@ impl TableSchema { &self.table_name } + pub fn primary_column(&self) -> Column { + self.primary_key + } + pub fn column_type(&self, column: Column) -> DbType { self.types[column].clone() } diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 2aab2b6..2bc61c1 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -364,7 +364,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> async fn spawn_cursor_to_intermediate_file(&self) -> Result> where T: Send { - let table_folder = self.header.table_folder.to_string(); + let table_folder = self.header.table_folder.clone(); let path_to_table = Path::new(&table_folder); let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); diff --git a/storage_engine/src/segments/store_header.rs b/storage_engine/src/segments/store_header.rs index 8582402..77d6de6 100644 --- a/storage_engine/src/segments/store_header.rs +++ b/storage_engine/src/segments/store_header.rs @@ -2,10 +2,11 @@ use crate::binary_coding::{encode, encode_sequence, decode, decode_sequence}; use crate::store::{Result, Column}; use crate::error::{Error, DecodeErrorKind}; use std::mem::size_of; +use std::path::PathBuf; #[derive(Debug, Clone)] pub struct StoreHeader { - pub table_folder: String, // This one is not encoded into the file + pub table_folder: PathBuf, // This one is not encoded into the file pub number_of_columns: usize, pub deleted_count: usize, @@ -93,7 +94,7 @@ impl StoreHeader { .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderIndexedColumns, e))?; Ok(StoreHeader { - table_folder: header.table_folder, + table_folder: header.table_folder.into(), number_of_columns: header.number_of_columns, deleted_count: header.deleted_count, total_count: header.total_count, diff --git a/storage_engine/src/store.rs b/storage_engine/src/store.rs index ec2dca1..cf741a8 100644 --- a/storage_engine/src/store.rs +++ b/storage_engine/src/store.rs @@ -36,10 +36,9 @@ pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_i impl Store { // ===Creation=== - pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result + pub async fn new(path_to_table: &Path, number_of_columns: usize, primary_column: Column) -> Result where T: Encode + Decode + Ord { - let path_to_table = Path::new(table_folder); let path_to_rows = path_to_table.join(ROWS_FILE_NAME); DirBuilder::new() .create(path_to_table).await?; @@ -48,7 +47,7 @@ impl Store { let mut indexed_columns = vec![false; number_of_columns]; indexed_columns[primary_column as usize] = true; StoreHeader { - table_folder: table_folder.to_string(), + table_folder: path_to_table.to_path_buf(), number_of_columns, deleted_count: 0, total_count: 0, @@ -225,7 +224,7 @@ mod tests { async fn test_create() { type Data = u32; - let table_path = "test_table_0"; + let table_path = Path::new("test_table_0"); let number_of_columns = 5; let primary_column = 0; let store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); @@ -240,7 +239,7 @@ mod tests { async fn test_insert() { type Data = u32; - let table_path = "test_table_1"; + let table_path = Path::new("test_table_1"); let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); @@ -262,7 +261,7 @@ mod tests { async fn test_select_next() { type Data = u32; - let table_path = "test_table_2"; + let table_path = Path::new("test_table_2"); let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); @@ -294,7 +293,7 @@ mod tests { async fn test_select_all() { type Data = u32; - let table_path = "test_table_3"; + let table_path = Path::new("test_table_3"); let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); @@ -329,7 +328,7 @@ mod tests { async fn test_select_eq() { type Data = u32; - let table_path = "test_table_4"; + let table_path = Path::new("test_table_4"); let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); @@ -369,7 +368,7 @@ mod tests { async fn test_select_eq_indexed() { type Data = u32; - let table_path = "test_table_5"; + let table_path = Path::new("test_table_5"); let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); @@ -415,7 +414,7 @@ mod tests { async fn test_delete_entry() { type Data = u32; - let table_path = "test_table_6"; + let table_path = Path::new("test_table_6"); let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); @@ -452,7 +451,7 @@ mod tests { async fn test_delete_where_eq() { type Data = u32; - let table_path = "test_table_7"; + let table_path = Path::new("test_table_7"); let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); @@ -496,7 +495,7 @@ mod tests { async fn test_garbage_collection() { type Data = u32; - let table_path = "test_table_8"; + let table_path = Path::new("test_table_8"); let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); From 002ac4c6485730eb9dfe2da9978be23e1a382cd4 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 19:37:10 +0100 Subject: [PATCH 09/23] Selet responses + table creation --- minisql/src/interpreter2.rs | 48 +++++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index b96a071..2ef8765 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -105,28 +105,31 @@ impl State { self.tables.push(table); } - async fn select_all_rows(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection) -> DbResult<()> { + async fn select_all_rows(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection) -> DbResult { + let mut count = 0; while let Some(entry) = cursor.next_alive().await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))? { + count += 1; let row: Row = From::from(entry); let restricted_row = row.restrict_columns(&column_selection); response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; } - // TODO: You need to write COmmand complete - Ok(()) + + Ok(count) } - async fn select_eq(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult<()> { + async fn select_eq(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult { let entries = cursor.select_entries_where_eq(column as storage_engine::store::Column, &value).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; + let count = entries.len(); for entry in entries { let row: Row = From::from(entry); let restricted_row = row.restrict_columns(&column_selection); response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; } - // TODO: You need to write COmmand complete - Ok(()) + + Ok(count) } - pub async fn interpret(&mut self, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { + pub async fn interpret(&mut self, db_path: &Path, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { use Operation::*; match operation { @@ -135,15 +138,11 @@ impl State { let cursor = table.read().await?; response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?; - match maybe_condition { - None => { - Self::select_all_rows(&table, cursor, response_writer, column_selection).await - } - - Some(Condition::Eq(eq_column, value)) => { - Self::select_eq(&table, cursor, response_writer, column_selection, eq_column, value).await - } - } + let count = match maybe_condition { + None => Self::select_all_rows(&table, cursor, response_writer, column_selection).await?, + Some(Condition::Eq(eq_column, value)) => Self::select_eq(&table, cursor, response_writer, column_selection, eq_column, value).await? + }; + response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) } Insert(table_position, values) => { let table: &mut Table = self.table_at_mut(table_position); @@ -156,9 +155,6 @@ impl State { todo!() } CreateTable(table_schema) => { - // TODO: This needs to be given from a config - let db_path = "test_db"; - let table = Table::new(table_schema, Path::new(db_path)).await?; self.attach_table(table).await; response_writer.write_command_complete(CompleteStatus::CreateTable).await.map_err(|e| RuntimeError::AnyhowError(e)) @@ -181,6 +177,16 @@ mod tests { use crate::type_system::{DbType, IndexableValue, Value}; use std::collections::HashSet; + impl Drop for State { + fn drop(&mut self) { + println!("CLEANING UP INTERPRETER STATE"); + // TODO + // let table_folder = self.header.table_folder.clone(); + // Seems no one has figured out how to do AsyncDrop yet. + // std::fs::remove_dir_all(table_folder).unwrap(); + } + } + fn users_schema() -> TableSchema { TableSchema::new( "users".to_string(), @@ -190,8 +196,8 @@ mod tests { ) } - #[test] - fn test_table_creation() { + #[tokio::test] + async fn test_table_creation() { let mut state = State::new(); let users_schema = users_schema(); let users = users_schema.table_name().clone(); From ad067156762cbff75f0d5b6a1279bab6ae90ccf8 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 19:57:42 +0100 Subject: [PATCH 10/23] Implement ResponseWriterStub for testing --- minisql/src/interpreter2.rs | 7 ++++- minisql/src/response_writer.rs | 50 ++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 2ef8765..72271e7 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -174,6 +174,7 @@ mod tests { use super::*; use crate::operation::Operation; use crate::schema::Column; + use crate::response_writer::ResponseWriterStub; use crate::type_system::{DbType, IndexableValue, Value}; use std::collections::HashSet; @@ -202,8 +203,12 @@ mod tests { let users_schema = users_schema(); let users = users_schema.table_name().clone(); + let db_path = Path::new("db-test-0"); + + let mut response_writer = ResponseWriterStub::new(); + // state - // .interpret(Operation::CreateTable(users_schema)) + // .interpret(db_path, &mut response_writer, Operation::CreateTable(users_schema)).await // .unwrap(); // assert!(state.tables.len() == 1); diff --git a/minisql/src/response_writer.rs b/minisql/src/response_writer.rs index 15bd788..30158c5 100644 --- a/minisql/src/response_writer.rs +++ b/minisql/src/response_writer.rs @@ -34,3 +34,53 @@ pub trait ResponseWriter { async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()>; async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()>; } + +// ===Stub implementation for testing=== +// +pub struct ResponseWriterStub {} + +impl ResponseWriterStub { + pub fn new() -> Self { + ResponseWriterStub {} + } +} + +#[async_trait] +impl ResponseWriter for ResponseWriterStub +{ + async fn write_table_header( + &mut self, + table_schema: &TableSchema, + columns: &ColumnSelection, + ) -> anyhow::Result<()> { + let column_names = table_schema.get_columns(); + for &column in columns { + let column_name = column_names[column]; + print!("{}, ", column_name) + } + println!(); + + Ok(()) + } + + async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()> { + for (_, value) in row.iter() { + print!("{:?}", value) + } + println!(); + + Ok(()) + } + + async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()> { + use CompleteStatus::*; + match status { + Insert { oid, rows } => println!("oid = {}, rows = {}", oid, rows), + Delete(count) => println!("Deleted {}", count), + Select(count) => println!("Selected {}", count), + CreateTable => println!("Table created"), + CreateIndex => println!("Index created"), + } + Ok(()) + } +} From b884a6286d3ad9c98c8f70fc4f0a7a5810e7ddba Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 20:04:18 +0100 Subject: [PATCH 11/23] Test table creation and select --- minisql/src/interpreter2.rs | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 72271e7..dfcd568 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -177,14 +177,17 @@ mod tests { use crate::response_writer::ResponseWriterStub; use crate::type_system::{DbType, IndexableValue, Value}; use std::collections::HashSet; + use tokio::fs::{File, OpenOptions, DirBuilder}; + use tokio::fs; impl Drop for State { fn drop(&mut self) { println!("CLEANING UP INTERPRETER STATE"); - // TODO - // let table_folder = self.header.table_folder.clone(); + + // TODO: This should be part of the state schema + let table_folder = "db-test-0"; // Seems no one has figured out how to do AsyncDrop yet. - // std::fs::remove_dir_all(table_folder).unwrap(); + std::fs::remove_dir_all(table_folder).unwrap(); } } @@ -204,12 +207,24 @@ mod tests { let users = users_schema.table_name().clone(); let db_path = Path::new("db-test-0"); + fs::create_dir(db_path).await.unwrap(); let mut response_writer = ResponseWriterStub::new(); - // state - // .interpret(db_path, &mut response_writer, Operation::CreateTable(users_schema)).await - // .unwrap(); + state + .interpret(db_path, &mut response_writer, Operation::CreateTable(users_schema.clone())).await + .unwrap(); + + let users_position: TablePosition = 0; + state + .interpret(db_path, &mut response_writer, Operation::Select( + users_position, + users_schema.all_selection(), + None, + )).await + .unwrap(); + + // assert!(false); // assert!(state.tables.len() == 1); // let table = &state.tables[0]; From 4c2e14acdcddd2726ef5026b7a2be4b477b6d648 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 20:31:43 +0100 Subject: [PATCH 12/23] Introduce a global database lock (for table creation) --- minisql/src/error.rs | 2 + minisql/src/interpreter2.rs | 77 +++++++++++++++++++++++++++++++++---- 2 files changed, 72 insertions(+), 7 deletions(-) diff --git a/minisql/src/error.rs b/minisql/src/error.rs index 827de93..db2d416 100644 --- a/minisql/src/error.rs +++ b/minisql/src/error.rs @@ -13,6 +13,8 @@ pub enum RuntimeError { AttemptToIndexNonIndexableColumn(TableName, ColumnName), #[error("table {0} already indexes column {1}")] AttemptToIndexAlreadyIndexedColumn(TableName, ColumnName), + #[error("File-System Error: {0}")] + IoError(std::io::Error), #[error("Storage Engine error for table {0}: {1}")] StorageEngineError(TableName, storage_engine::error::Error), #[error("runtime anyhow error: {0}")] diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index dfcd568..f09c4e8 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -7,7 +7,10 @@ use crate::response_writer::{ResponseWriter, CompleteStatus}; use crate::internals::row::Row; use bimap::BiMap; -use std::path::Path; +use std::path::{Path, PathBuf}; + +use tokio::sync::RwLock; +use tokio::fs; use storage_engine::store::Store; use storage_engine::cursor::{ReadCursor, WriteCursor}; @@ -21,6 +24,12 @@ pub struct State { tables: Tables, } + +pub struct StateHandler { + db_path: PathBuf, + state: RwLock, +} + pub type Tables = Vec
; #[derive(Debug)] @@ -166,6 +175,61 @@ impl State { } } +impl StateHandler { + pub async fn new(db_path: PathBuf) -> DbResult { + fs::create_dir(db_path.clone()).await.map_err(|e| RuntimeError::IoError(e))?; + + let state = Self { + db_path, + state: RwLock::new(State::new()), + }; + Ok(state) + } + + pub async fn connect(db_path: PathBuf) -> Self { + todo!() + } + + pub async fn interpret(&self, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { + use Operation::*; + + match operation { + Select(table_position, column_selection, maybe_condition) => { + let state = self.state.read().await; + + let table: &Table = state.table_at(table_position); + let cursor = table.read().await?; + + response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?; + let count = match maybe_condition { + None => State::select_all_rows(&table, cursor, response_writer, column_selection).await?, + Some(Condition::Eq(eq_column, value)) => State::select_eq(&table, cursor, response_writer, column_selection, eq_column, value).await? + }; + response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) + } + Insert(table_position, values) => { + let state = self.state.read().await; + todo!() + } + Delete(table_position, maybe_condition) => { + let state = self.state.read().await; + + todo!() + } + CreateTable(table_schema) => { + let mut state = self.state.write().await; + let table = Table::new(table_schema, &self.db_path).await?; + state.attach_table(table).await; + response_writer.write_command_complete(CompleteStatus::CreateTable).await.map_err(|e| RuntimeError::AnyhowError(e)) + } + CreateIndex(table_position, column) => { + let state = self.state.read().await; + todo!() + } + } + } +} + @@ -202,22 +266,21 @@ mod tests { #[tokio::test] async fn test_table_creation() { - let mut state = State::new(); + let db_path = Path::new("db-test-0"); + let state = StateHandler::new(db_path.to_path_buf()).await.unwrap(); + let users_schema = users_schema(); let users = users_schema.table_name().clone(); - let db_path = Path::new("db-test-0"); - fs::create_dir(db_path).await.unwrap(); - let mut response_writer = ResponseWriterStub::new(); state - .interpret(db_path, &mut response_writer, Operation::CreateTable(users_schema.clone())).await + .interpret(&mut response_writer, Operation::CreateTable(users_schema.clone())).await .unwrap(); let users_position: TablePosition = 0; state - .interpret(db_path, &mut response_writer, Operation::Select( + .interpret(&mut response_writer, Operation::Select( users_position, users_schema.all_selection(), None, From 84fc58471d229a9b2ab2d08939f5551e69f18c9f Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 21:02:30 +0100 Subject: [PATCH 13/23] Make table schemas less painful --- minisql/src/interpreter2.rs | 99 +++++++++++++++++++------------------ 1 file changed, 51 insertions(+), 48 deletions(-) diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index f09c4e8..2dfb716 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -9,7 +9,8 @@ use crate::internals::row::Row; use bimap::BiMap; use std::path::{Path, PathBuf}; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use std::rc::Rc; use tokio::fs; use storage_engine::store::Store; @@ -21,24 +22,24 @@ use storage_engine::cursor_capabilities::index_access::CursorCanReadIndex; #[derive(Debug)] pub struct State { table_name_position_mapping: BiMap, + table_schemas: Vec>, tables: Tables, } - pub struct StateHandler { db_path: PathBuf, state: RwLock, } -pub type Tables = Vec
; +pub type Tables = Vec>; #[derive(Debug)] pub struct Table { - schema: TableSchema, + schema: Rc, store: Store } -pub type DbSchema<'a> = Vec<(TableName, TablePosition, &'a TableSchema)>; +pub type DbSchema = Vec<(TableName, TablePosition, Rc)>; // To satisfy clippy. impl Default for State { fn default() -> Self { @@ -56,7 +57,7 @@ impl Table { let store: Store = Store::new(&path_to_table_folder, number_of_columns, primary_column).await.unwrap(); let table = Self { - schema: table_schema, + schema: Rc::new(table_schema), store, }; Ok(table) @@ -72,8 +73,8 @@ impl Table { Ok(cursor) } - pub fn schema(&self) -> &TableSchema { - &self.schema + pub fn schema(&self) -> Rc { + self.schema.clone() } pub fn table_name(&self) -> &TableName { @@ -85,6 +86,7 @@ impl State { pub fn new() -> Self { Self { table_name_position_mapping: BiMap::new(), + table_schemas: vec![], tables: vec![], } } @@ -92,18 +94,18 @@ impl State { pub fn db_schema(&self) -> DbSchema { let mut schema: DbSchema = Vec::new(); for (table_name, &table_position) in &self.table_name_position_mapping { - let table_schema: &TableSchema = self.tables[table_position].schema(); + let table_schema: Rc = self.table_schemas[table_position].clone(); schema.push((table_name.clone(), table_position, table_schema)); } schema } - fn table_at(&self, table_position: TablePosition) -> &Table { - &self.tables[table_position] + async fn table_at(&self, table_position: TablePosition) -> RwLockReadGuard
{ + self.tables[table_position].read().await } - fn table_at_mut(&mut self, table_position: TablePosition) -> &mut Table { - &mut self.tables[table_position] + async fn table_at_mut(&self, table_position: TablePosition) -> RwLockWriteGuard
{ + self.tables[table_position].write().await } async fn attach_table(&mut self, table: Table) { @@ -111,7 +113,8 @@ impl State { let new_table_position: TablePosition = self.tables.len(); self.table_name_position_mapping .insert(table.schema().table_name().clone(), new_table_position); - self.tables.push(table); + self.table_schemas.push(table.schema()); + self.tables.push(RwLock::new(table)); } async fn select_all_rows(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection) -> DbResult { @@ -138,41 +141,41 @@ impl State { Ok(count) } - pub async fn interpret(&mut self, db_path: &Path, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { - use Operation::*; + // pub async fn interpret(&mut self, db_path: &Path, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { + // use Operation::*; - match operation { - Select(table_position, column_selection, maybe_condition) => { - let table: &Table = self.table_at(table_position); - let cursor = table.read().await?; + // match operation { + // Select(table_position, column_selection, maybe_condition) => { + // let table: &Table = self.table_at(table_position); + // let cursor = table.read().await?; - response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?; - let count = match maybe_condition { - None => Self::select_all_rows(&table, cursor, response_writer, column_selection).await?, - Some(Condition::Eq(eq_column, value)) => Self::select_eq(&table, cursor, response_writer, column_selection, eq_column, value).await? - }; - response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) - } - Insert(table_position, values) => { - let table: &mut Table = self.table_at_mut(table_position); - let cursor = table.write().await?; - todo!() - } - Delete(table_position, maybe_condition) => { - let table: &mut Table = self.table_at_mut(table_position); - let cursor = table.write().await?; - todo!() - } - CreateTable(table_schema) => { - let table = Table::new(table_schema, Path::new(db_path)).await?; - self.attach_table(table).await; - response_writer.write_command_complete(CompleteStatus::CreateTable).await.map_err(|e| RuntimeError::AnyhowError(e)) - } - CreateIndex(table_position, column) => { - todo!() - } - } - } + // response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?; + // let count = match maybe_condition { + // None => Self::select_all_rows(&table, cursor, response_writer, column_selection).await?, + // Some(Condition::Eq(eq_column, value)) => Self::select_eq(&table, cursor, response_writer, column_selection, eq_column, value).await? + // }; + // response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) + // } + // Insert(table_position, values) => { + // let table: &mut Table = self.table_at_mut(table_position); + // let cursor = table.write().await?; + // todo!() + // } + // Delete(table_position, maybe_condition) => { + // let table: &mut Table = self.table_at_mut(table_position); + // let cursor = table.write().await?; + // todo!() + // } + // CreateTable(table_schema) => { + // let table = Table::new(table_schema, Path::new(db_path)).await?; + // self.attach_table(table).await; + // response_writer.write_command_complete(CompleteStatus::CreateTable).await.map_err(|e| RuntimeError::AnyhowError(e)) + // } + // CreateIndex(table_position, column) => { + // todo!() + // } + // } + // } } impl StateHandler { @@ -197,7 +200,7 @@ impl StateHandler { Select(table_position, column_selection, maybe_condition) => { let state = self.state.read().await; - let table: &Table = state.table_at(table_position); + let table = state.table_at(table_position).await; let cursor = table.read().await?; response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?; From 610d70378efccd790b649872bfad64693b79ae85 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 21:07:10 +0100 Subject: [PATCH 14/23] Remove redundant code --- minisql/src/interpreter2.rs | 38 ++----------------------------------- 1 file changed, 2 insertions(+), 36 deletions(-) diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 2dfb716..7004b1f 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -140,42 +140,6 @@ impl State { Ok(count) } - - // pub async fn interpret(&mut self, db_path: &Path, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { - // use Operation::*; - - // match operation { - // Select(table_position, column_selection, maybe_condition) => { - // let table: &Table = self.table_at(table_position); - // let cursor = table.read().await?; - - // response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?; - // let count = match maybe_condition { - // None => Self::select_all_rows(&table, cursor, response_writer, column_selection).await?, - // Some(Condition::Eq(eq_column, value)) => Self::select_eq(&table, cursor, response_writer, column_selection, eq_column, value).await? - // }; - // response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) - // } - // Insert(table_position, values) => { - // let table: &mut Table = self.table_at_mut(table_position); - // let cursor = table.write().await?; - // todo!() - // } - // Delete(table_position, maybe_condition) => { - // let table: &mut Table = self.table_at_mut(table_position); - // let cursor = table.write().await?; - // todo!() - // } - // CreateTable(table_schema) => { - // let table = Table::new(table_schema, Path::new(db_path)).await?; - // self.attach_table(table).await; - // response_writer.write_command_complete(CompleteStatus::CreateTable).await.map_err(|e| RuntimeError::AnyhowError(e)) - // } - // CreateIndex(table_position, column) => { - // todo!() - // } - // } - // } } impl StateHandler { @@ -216,6 +180,8 @@ impl StateHandler { } Delete(table_position, maybe_condition) => { let state = self.state.read().await; + let mut table = state.table_at_mut(table_position).await; + let cursor = table.write().await?; todo!() } From 9b9f9f16f66d034d83209b2d7e89a83545f8f91e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jind=C5=99ich=20Moravec?= Date: Mon, 5 Feb 2024 21:36:58 +0100 Subject: [PATCH 15/23] feat: metadata serialization --- Cargo.lock | 5 ++- minisql/Cargo.toml | 3 +- minisql/src/error.rs | 13 ++---- minisql/src/interpreter2.rs | 44 +++++++++++++++++++-- storage_engine/src/segments/store_header.rs | 6 +-- storage_engine/src/store.rs | 2 +- 6 files changed, 54 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b653809..42c5df8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -289,6 +289,7 @@ dependencies = [ "bincode", "proto", "serde", + "serde_json", "storage_engine", "thiserror", "tokio", @@ -557,9 +558,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.112" +version = "1.0.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d1bd37ce2324cf3bf85e5a25f96eb4baf0d5aa6eba43e7ae8958870c4ec48ed" +checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" dependencies = [ "itoa", "ryu", diff --git a/minisql/Cargo.toml b/minisql/Cargo.toml index ca06eb6..4029a5c 100644 --- a/minisql/Cargo.toml +++ b/minisql/Cargo.toml @@ -11,8 +11,9 @@ anyhow = "1.0.79" async-trait = "0.1.77" bimap = { version = "0.6.3", features = ["serde"] } bincode = "2.0.0-rc.3" -serde = { version = "1.0.196", features = ["derive"] } +serde = { version = "1.0.196", features = ["derive", "rc"] } tokio = { version = "1.34.0", features = ["full"] } thiserror = "1.0.50" proto = { path = "../proto" } storage_engine = { path = "../storage_engine" } +serde_json = "1.0.113" diff --git a/minisql/src/error.rs b/minisql/src/error.rs index db2d416..03b772f 100644 --- a/minisql/src/error.rs +++ b/minisql/src/error.rs @@ -14,11 +14,13 @@ pub enum RuntimeError { #[error("table {0} already indexes column {1}")] AttemptToIndexAlreadyIndexedColumn(TableName, ColumnName), #[error("File-System Error: {0}")] - IoError(std::io::Error), + IoError(#[from] std::io::Error), #[error("Storage Engine error for table {0}: {1}")] StorageEngineError(TableName, storage_engine::error::Error), #[error("runtime anyhow error: {0}")] - AnyhowError(anyhow::Error) + AnyhowError(#[from] anyhow::Error), + #[error("serde error: {0}")] + SerdeError(#[from] serde_json::Error), } #[derive(Debug, Error)] @@ -32,10 +34,3 @@ pub enum TypeConversionError { #[error("unknown type with oid {oid} and size {size}")] UnknownType { oid: PgOid, size: i16 }, } - - -impl From for RuntimeError { - fn from(e: anyhow::Error) -> RuntimeError { - Self::AnyhowError(e) - } -} diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 7004b1f..cb4eea1 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -11,6 +11,7 @@ use std::path::{Path, PathBuf}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::rc::Rc; +use serde::{Deserialize, Serialize}; use tokio::fs; use storage_engine::store::Store; @@ -18,11 +19,14 @@ use storage_engine::cursor::{ReadCursor, WriteCursor}; use storage_engine::cursor_capabilities::traversal::CursorCanTraverse; use storage_engine::cursor_capabilities::index_access::CursorCanReadIndex; +const METADATA_FILE: &'static str = "metadata.json"; + // ==============Interpreter================ -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct State { table_name_position_mapping: BiMap, table_schemas: Vec>, + #[serde(skip)] tables: Tables, } @@ -63,6 +67,19 @@ impl Table { Ok(table) } + async fn connect(table_schema: Rc, db_path: &Path) -> DbResult { + let table_folder_name = table_schema.table_name(); + let path_to_table_folder = db_path.join(table_folder_name); + + let store: Store = Store::connect(&path_to_table_folder).await.map_err(|e| RuntimeError::StorageEngineError(table_schema.table_name().to_string(), e))?; + + let table = Self { + schema: table_schema, + store, + }; + Ok(table) + } + async fn read(&self) -> DbResult> { let cursor = self.store.read_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.table_name().to_string(), e))?; Ok(cursor) @@ -153,8 +170,29 @@ impl StateHandler { Ok(state) } - pub async fn connect(db_path: PathBuf) -> Self { - todo!() + pub async fn connect(db_path: PathBuf) -> DbResult { + let metadata_file = db_path.join(METADATA_FILE); + let metadata_raw = fs::read_to_string(metadata_file).await?; + let mut metadata: State = serde_json::from_str(&metadata_raw)?; + + let mut tables = Vec::with_capacity(metadata.table_schemas.len()); + for table_schema in &metadata.table_schemas { + let table = Table::connect(table_schema.clone(), &db_path).await?; + tables.push(RwLock::new(table)); + } + + metadata.tables = tables; + Ok(Self { + db_path, + state: RwLock::new(metadata), + }) + } + + async fn save_metadata(&self) -> DbResult<()> { + let metadata_file = self.db_path.join(METADATA_FILE); + let state = self.state.read().await; + let metadata_raw = serde_json::to_string(&*state)?; + fs::write(metadata_file, metadata_raw).await.map_err(|e| RuntimeError::IoError(e)) } pub async fn interpret(&self, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { diff --git a/storage_engine/src/segments/store_header.rs b/storage_engine/src/segments/store_header.rs index 77d6de6..b8bdfd5 100644 --- a/storage_engine/src/segments/store_header.rs +++ b/storage_engine/src/segments/store_header.rs @@ -17,7 +17,7 @@ pub struct StoreHeader { #[derive(Debug, Clone)] pub struct StoreHeaderFixedPart { - pub table_folder: String, // This one is not encoded into the file + pub table_folder: PathBuf, // This one is not encoded into the file pub number_of_columns: usize, pub deleted_count: usize, @@ -64,7 +64,7 @@ impl StoreHeader { vec![0; Self::indexed_columns_size(header.number_of_columns)] } - pub async fn decode_fixed(table_folder: &str, result: &[u8]) -> Result { + pub async fn decode_fixed(table_folder: &PathBuf, result: &[u8]) -> Result { let (number_of_columns, _) = decode::(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; @@ -78,7 +78,7 @@ impl StoreHeader { decode::(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; let header = StoreHeaderFixedPart { - table_folder: table_folder.to_string(), + table_folder: table_folder.clone(), number_of_columns, deleted_count, total_count, diff --git a/storage_engine/src/store.rs b/storage_engine/src/store.rs index cf741a8..bb256ff 100644 --- a/storage_engine/src/store.rs +++ b/storage_engine/src/store.rs @@ -120,7 +120,7 @@ impl Store { Ok(file) } - pub async fn connect(table_folder: &str) -> Result + pub async fn connect(table_folder: &PathBuf) -> Result where T: std::fmt::Debug + Encode + Decode + Ord { let path_to_table = Path::new(table_folder); From 997162c555b31c1ae55559011272a78ddf0e1597 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 21:46:18 +0100 Subject: [PATCH 16/23] Prepare for delete --- minisql/src/interpreter2.rs | 88 ++++++++++++++++++++++++---- minisql/src/response_writer.rs | 2 +- storage_engine/src/cursor.rs | 10 ++++ storage_engine/src/segments/entry.rs | 4 -- 4 files changed, 86 insertions(+), 18 deletions(-) diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 7004b1f..9ca3de1 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -14,9 +14,10 @@ use std::rc::Rc; use tokio::fs; use storage_engine::store::Store; +use storage_engine::segments::entry::Entry; use storage_engine::cursor::{ReadCursor, WriteCursor}; use storage_engine::cursor_capabilities::traversal::CursorCanTraverse; -use storage_engine::cursor_capabilities::index_access::CursorCanReadIndex; +use storage_engine::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex}; // ==============Interpreter================ #[derive(Debug)] @@ -140,6 +141,20 @@ impl State { Ok(count) } + + async fn delete_all_rows(table_name: String, mut cursor: WriteCursor<'_, Value>, response_writer: &mut Writer) -> DbResult { + // cursor. + let count = cursor.delete_all_entries(true) + .await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?; + Ok(count) + } + + async fn delete_all_eq(table_name: String, mut cursor: WriteCursor<'_, Value>, response_writer: &mut Writer, eq_column: Column, value: Value) -> DbResult { + let count = + cursor.delete_entries_where_eq(eq_column as storage_engine::store::Column, &value, true) + .await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?; + Ok(count) + } } impl StateHandler { @@ -176,14 +191,26 @@ impl StateHandler { } Insert(table_position, values) => { let state = self.state.read().await; - todo!() + let mut table = state.table_at_mut(table_position).await; + let mut cursor = table.write().await?; + + let entry = Entry::new(values); + cursor.insert_entry(entry).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; + + response_writer.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }).await.map_err(|e| RuntimeError::AnyhowError(e)) } Delete(table_position, maybe_condition) => { let state = self.state.read().await; let mut table = state.table_at_mut(table_position).await; - let cursor = table.write().await?; + let table_name = table.table_name().clone(); + let mut cursor = table.write().await?; - todo!() + let count = match maybe_condition { + None => State::delete_all_rows(table_name, cursor, response_writer).await?, + Some(Condition::Eq(eq_column, value)) => State::delete_all_eq(table_name, cursor, response_writer, eq_column, value).await? + }; + + response_writer.write_command_complete(CompleteStatus::Delete(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) } CreateTable(table_schema) => { let mut state = self.state.write().await; @@ -247,16 +274,51 @@ mod tests { .interpret(&mut response_writer, Operation::CreateTable(users_schema.clone())).await .unwrap(); - let users_position: TablePosition = 0; - state - .interpret(&mut response_writer, Operation::Select( - users_position, - users_schema.all_selection(), - None, - )).await - .unwrap(); + { + println!("==EMPTY SELECT==="); + let users_position: TablePosition = 0; + state + .interpret(&mut response_writer, Operation::Select( + users_position, + users_schema.all_selection(), + None, + )).await + .unwrap(); + } - // assert!(false); + { + let users = 0; + let (id, name, age) = ( + Value::Uuid(0), + Value::String("Plato".to_string()), + Value::Int(64), + ); + + println!("About to insert!"); + + state + .interpret(&mut response_writer, Operation::Insert( + users, + vec![id.clone(), name.clone(), age.clone()], + )).await + .unwrap(); + } + { + println!("==SELECT==="); + let users_position: TablePosition = 0; + state + .interpret(&mut response_writer, Operation::Select( + users_position, + users_schema.all_selection(), + None, + )).await + .unwrap(); + } + + + + + assert!(false); // assert!(state.tables.len() == 1); // let table = &state.tables[0]; diff --git a/minisql/src/response_writer.rs b/minisql/src/response_writer.rs index 30158c5..240a2f8 100644 --- a/minisql/src/response_writer.rs +++ b/minisql/src/response_writer.rs @@ -65,7 +65,7 @@ impl ResponseWriter for ResponseWriterStub async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()> { for (_, value) in row.iter() { - print!("{:?}", value) + print!("{:?}, ", value) } println!(); diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 2bc61c1..43b0ca5 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -244,6 +244,16 @@ impl <'cursor, T> WriteCursor<'cursor, T> Ok(count) } + pub async fn delete_all_entries(&mut self, enable_garbage_collector: bool) -> Result + where T: Encode + Decode + Ord + Send + Sync + Clone + { + let count = todo!(); + if enable_garbage_collector { + self.attempt_garbage_collection_if_necessary().await?; + } + Ok(count) + } + pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result where T: Encode + Decode + Ord + Send + Sync + Clone { diff --git a/storage_engine/src/segments/entry.rs b/storage_engine/src/segments/entry.rs index 64f6381..a2f8bc8 100644 --- a/storage_engine/src/segments/entry.rs +++ b/storage_engine/src/segments/entry.rs @@ -23,10 +23,6 @@ impl Entry { Self { header: EntryHeader { is_deleted: false }, data } } - pub fn new_deleted(data: Vec) -> Self { - Self { header: EntryHeader { is_deleted: true}, data } - } - // FORMAT: [EntryHeaderWithDataSize, ..sequence of data] pub fn encode(&self) -> Result> where T: Encode From 9af6ad90f36faa60ffbb5e6f15432cf70f45cd8a Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 21:50:42 +0100 Subject: [PATCH 17/23] Implement delete --- minisql/src/interpreter2.rs | 9 ++++----- storage_engine/src/cursor.rs | 7 ++++++- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 9ca3de1..5105597 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -142,14 +142,13 @@ impl State { Ok(count) } - async fn delete_all_rows(table_name: String, mut cursor: WriteCursor<'_, Value>, response_writer: &mut Writer) -> DbResult { - // cursor. + async fn delete_all_rows(table_name: String, mut cursor: WriteCursor<'_, Value>) -> DbResult { let count = cursor.delete_all_entries(true) .await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?; Ok(count) } - async fn delete_all_eq(table_name: String, mut cursor: WriteCursor<'_, Value>, response_writer: &mut Writer, eq_column: Column, value: Value) -> DbResult { + async fn delete_all_eq(table_name: String, mut cursor: WriteCursor<'_, Value>, eq_column: Column, value: Value) -> DbResult { let count = cursor.delete_entries_where_eq(eq_column as storage_engine::store::Column, &value, true) .await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?; @@ -206,8 +205,8 @@ impl StateHandler { let mut cursor = table.write().await?; let count = match maybe_condition { - None => State::delete_all_rows(table_name, cursor, response_writer).await?, - Some(Condition::Eq(eq_column, value)) => State::delete_all_eq(table_name, cursor, response_writer, eq_column, value).await? + None => State::delete_all_rows(table_name, cursor).await?, + Some(Condition::Eq(eq_column, value)) => State::delete_all_eq(table_name, cursor, eq_column, value).await? }; response_writer.write_command_complete(CompleteStatus::Delete(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 43b0ca5..5c76577 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -247,7 +247,12 @@ impl <'cursor, T> WriteCursor<'cursor, T> pub async fn delete_all_entries(&mut self, enable_garbage_collector: bool) -> Result where T: Encode + Decode + Ord + Send + Sync + Clone { - let count = todo!(); + let mut count = 0; + while let Some(entry) = self.next_alive().await? { + count += 1; + self.mark_deleted_at(entry.file_position, false).await? + } + if enable_garbage_collector { self.attempt_garbage_collection_if_necessary().await?; } From 48442cdd9e2db5000151b99779d2d0d64a9fd10b Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 21:54:20 +0100 Subject: [PATCH 18/23] Attach index --- minisql/src/interpreter2.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 5105597..043e241 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -190,6 +190,7 @@ impl StateHandler { } Insert(table_position, values) => { let state = self.state.read().await; + let mut table = state.table_at_mut(table_position).await; let mut cursor = table.write().await?; @@ -200,9 +201,10 @@ impl StateHandler { } Delete(table_position, maybe_condition) => { let state = self.state.read().await; + let mut table = state.table_at_mut(table_position).await; let table_name = table.table_name().clone(); - let mut cursor = table.write().await?; + let cursor = table.write().await?; let count = match maybe_condition { None => State::delete_all_rows(table_name, cursor).await?, @@ -219,7 +221,11 @@ impl StateHandler { } CreateIndex(table_position, column) => { let state = self.state.read().await; - todo!() + + let mut table = state.table_at_mut(table_position).await; + let mut cursor = table.write().await?; + cursor.attach_index(column as storage_engine::store::Column).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; + response_writer.write_command_complete(CompleteStatus::CreateIndex).await.map_err(|e| RuntimeError::AnyhowError(e)) } } } From f5d45f6a1d3d7658ba765196dcdea788940571c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jind=C5=99ich=20Moravec?= Date: Mon, 5 Feb 2024 21:59:33 +0100 Subject: [PATCH 19/23] feat: connect server to the new interpreter --- minisql/src/interpreter2.rs | 25 +++++++--- parser/src/core.rs | 2 +- parser/src/validation.rs | 67 ++++++++++++------------- server/src/config.rs | 8 +-- server/src/main.rs | 98 ++++++------------------------------- server/src/persistence.rs | 15 ------ 6 files changed, 70 insertions(+), 145 deletions(-) delete mode 100644 server/src/persistence.rs diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index cb4eea1..81075cb 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -8,9 +8,9 @@ use crate::internals::row::Row; use bimap::BiMap; use std::path::{Path, PathBuf}; +use std::sync::Arc; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -use std::rc::Rc; use serde::{Deserialize, Serialize}; use tokio::fs; @@ -25,7 +25,7 @@ const METADATA_FILE: &'static str = "metadata.json"; #[derive(Debug, Serialize, Deserialize)] pub struct State { table_name_position_mapping: BiMap, - table_schemas: Vec>, + table_schemas: Vec>, #[serde(skip)] tables: Tables, } @@ -39,11 +39,11 @@ pub type Tables = Vec>; #[derive(Debug)] pub struct Table { - schema: Rc, + schema: Arc, store: Store } -pub type DbSchema = Vec<(TableName, TablePosition, Rc)>; +pub type DbSchema = Vec<(TableName, TablePosition, Arc)>; // To satisfy clippy. impl Default for State { fn default() -> Self { @@ -61,13 +61,13 @@ impl Table { let store: Store = Store::new(&path_to_table_folder, number_of_columns, primary_column).await.unwrap(); let table = Self { - schema: Rc::new(table_schema), + schema: Arc::new(table_schema), store, }; Ok(table) } - async fn connect(table_schema: Rc, db_path: &Path) -> DbResult { + async fn connect(table_schema: Arc, db_path: &Path) -> DbResult { let table_folder_name = table_schema.table_name(); let path_to_table_folder = db_path.join(table_folder_name); @@ -90,7 +90,7 @@ impl Table { Ok(cursor) } - pub fn schema(&self) -> Rc { + pub fn schema(&self) -> Arc { self.schema.clone() } @@ -111,7 +111,7 @@ impl State { pub fn db_schema(&self) -> DbSchema { let mut schema: DbSchema = Vec::new(); for (table_name, &table_position) in &self.table_name_position_mapping { - let table_schema: Rc = self.table_schemas[table_position].clone(); + let table_schema: Arc = self.table_schemas[table_position].clone(); schema.push((table_name.clone(), table_position, table_schema)); } schema @@ -160,6 +160,11 @@ impl State { } impl StateHandler { + pub fn is_existing_db(db_path: &PathBuf) -> bool { + db_path.exists() && db_path.is_dir() && + db_path.join(METADATA_FILE).exists() && db_path.join(METADATA_FILE).is_file() + } + pub async fn new(db_path: PathBuf) -> DbResult { fs::create_dir(db_path.clone()).await.map_err(|e| RuntimeError::IoError(e))?; @@ -195,6 +200,10 @@ impl StateHandler { fs::write(metadata_file, metadata_raw).await.map_err(|e| RuntimeError::IoError(e)) } + pub async fn read_state(&self) -> RwLockReadGuard { + self.state.read().await + } + pub async fn interpret(&self, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { use Operation::*; diff --git a/parser/src/core.rs b/parser/src/core.rs index 1782f6b..28059fe 100644 --- a/parser/src/core.rs +++ b/parser/src/core.rs @@ -1,5 +1,5 @@ use crate::syntax::RawQuerySyntax; -use minisql::{interpreter::DbSchema, operation::Operation}; +use minisql::{interpreter2::DbSchema, operation::Operation}; use nom::{branch::alt, character::complete::{multispace0, char}, multi::many1, sequence::{delimited, terminated}, IResult}; use thiserror::Error; diff --git a/parser/src/validation.rs b/parser/src/validation.rs index 741d0fd..87eec28 100644 --- a/parser/src/validation.rs +++ b/parser/src/validation.rs @@ -1,4 +1,5 @@ use std::collections::{BTreeMap, HashSet}; +use std::sync::Arc; use thiserror::Error; use crate::parsing::literal::Literal; @@ -6,7 +7,7 @@ use crate::syntax; use crate::syntax::{ColumnSchema, RawQuerySyntax, RawTableSchema}; use minisql::operation; use minisql::{ - interpreter::DbSchema, + interpreter2::DbSchema, operation::Operation, schema::{Column, ColumnName, TableName, TablePosition, TableSchema}, type_system::DbType, @@ -63,15 +64,15 @@ pub fn validate_operation( } } -fn validate_table_exists<'a>( - db_schema: &DbSchema<'a>, - table_name: &'a TableName, -) -> Result<(TablePosition, &'a TableSchema), ValidationError> { +fn validate_table_exists( + db_schema: &DbSchema, + table_name: &TableName, +) -> Result<(TablePosition, Arc), ValidationError> { db_schema .iter() .find(|(tname, _, _)| table_name.eq(tname)) .ok_or(ValidationError::TableDoesNotExist(table_name.to_string())) - .map(|(_, table_position, table_schema)| (*table_position, *table_schema)) + .map(|(_, table_position, table_schema)| (*table_position, table_schema.clone())) } fn validate_create_table( @@ -167,7 +168,7 @@ fn validate_select( .iter() .filter_map(|column_name| schema.get_column(column_name)) .collect(); - let validated_condition = validate_condition(condition, schema)?; + let validated_condition = validate_condition(condition, &schema)?; Ok(Operation::Select( table_position, selection, @@ -178,7 +179,7 @@ fn validate_select( } } syntax::ColumnSelection::All => { - let validated_condition = validate_condition(condition, schema)?; + let validated_condition = validate_condition(condition, &schema)?; Ok(Operation::Select( table_position, schema.all_selection(), @@ -267,13 +268,13 @@ fn validate_delete( db_schema: &DbSchema, ) -> Result { let (table_position, schema) = validate_table_exists(db_schema, &table_name)?; - let validated_condition = validate_condition(condition, schema)?; + let validated_condition = validate_condition(condition, &schema)?; Ok(Operation::Delete(table_position, validated_condition)) } fn validate_condition( condition: Option, - schema: &TableSchema, + schema: &Arc, ) -> Result, ValidationError> { match condition { Some(condition) => match condition { @@ -338,14 +339,14 @@ where None } -fn get_table_schema<'a>( - db_schema: &DbSchema<'a>, - table_name: &'a TableName, -) -> Option<&'a TableSchema> { +fn get_table_schema( + db_schema: &DbSchema, + table_name: &TableName, +) -> Option> { let (_, _, table_schema) = db_schema .iter() .find(|(tname, _, _)| table_name.eq(tname))?; - Some(table_schema) + Some(table_schema.clone()) } fn literal_to_value(lit: Literal, hint: &DbType) -> Value { @@ -428,11 +429,11 @@ mod tests { } } - fn db_schema(users_schema: &TableSchema) -> DbSchema { - vec![("users".to_string(), 0, users_schema)] + fn db_schema(users_schema: Arc) -> DbSchema { + vec![("users".to_string(), 0, users_schema.clone())] } - fn empty_db_schema() -> DbSchema<'static> { + fn empty_db_schema() -> DbSchema { vec![] } @@ -547,7 +548,7 @@ mod tests { #[test] fn test_create_already_exists() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = CreateTable(raw_users_schema()); let result = validate_operation(syntax, &db_schema); @@ -561,7 +562,7 @@ mod tests { #[test] fn test_select_basic() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; let id = 0; let name = 1; @@ -583,7 +584,7 @@ mod tests { #[test] fn test_select_non_existent_table() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Select("does_not_exist".to_string(), ColumnSelection::All, None); @@ -594,7 +595,7 @@ mod tests { #[test] fn test_select_eq() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; let id = 0; @@ -622,7 +623,7 @@ mod tests { #[test] fn test_select_eq_columns_selection() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; let name = 1; @@ -652,7 +653,7 @@ mod tests { #[test] fn test_select_eq_columns_selection_nonexistent_column_selected() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Select( "users".to_string(), @@ -666,7 +667,7 @@ mod tests { #[test] fn test_select_eq_non_existent_column() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Select( "users".to_string(), @@ -680,7 +681,7 @@ mod tests { #[test] fn test_select_eq_type_error() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Select( "users".to_string(), @@ -695,7 +696,7 @@ mod tests { #[test] fn test_insert() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; @@ -732,7 +733,7 @@ mod tests { #[test] fn test_insert_non_existent_column() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Insert( "users".to_string(), @@ -750,7 +751,7 @@ mod tests { #[test] fn test_insert_ill_typed_column() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Insert( "users".to_string(), @@ -768,7 +769,7 @@ mod tests { #[test] fn test_delete_all() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; @@ -786,7 +787,7 @@ mod tests { #[test] fn test_delete_eq() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; let age = 2; @@ -816,7 +817,7 @@ mod tests { #[test] fn test_create_index() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; let age = 2; @@ -836,7 +837,7 @@ mod tests { #[test] fn test_create_index_nonexistent_column() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = CreateIndex("users".to_string(), "does_not_exist".to_string()); let result = validate_operation(syntax, &db_schema); diff --git a/server/src/config.rs b/server/src/config.rs index 2dedcbc..f709511 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -17,8 +17,8 @@ pub struct Configuration { help = "Port for the server to listen on" )] port: u16, - #[arg(short, long, help = "Path to the data file")] - file: PathBuf, + #[arg(short, long, help = "Path to the folder for database data")] + folder: PathBuf, #[arg(short, long, help = "Delay between rows in milliseconds")] throttle: Option, } @@ -30,8 +30,8 @@ impl Configuration { } #[inline] - pub fn get_file_path(&self) -> &PathBuf { - &self.file + pub fn get_folder_path(&self) -> &PathBuf { + &self.folder } #[inline] diff --git a/server/src/main.rs b/server/src/main.rs index e1e6f3d..a31ae91 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,14 +1,13 @@ use std::collections::HashMap; -use std::io::ErrorKind; use std::sync::Arc; use clap::Parser; use tokio::io::{BufReader, BufWriter}; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::Mutex; -use minisql::interpreter::{Response, State}; -use minisql::response_writer::{CompleteStatus, ResponseWriter}; +use minisql::interpreter2::StateHandler; +use minisql::response_writer::ResponseWriter; use parser::parse_and_validate; use proto::handshake::errors::ServerHandshakeError; use proto::handshake::request::HandshakeRequest; @@ -22,23 +21,21 @@ use proto::writer::protowriter::{ProtoFlush, ProtoWriter}; use crate::cancellation::ResetCancelToken; use crate::config::Configuration; -use crate::persistence::state_to_file; use crate::proto_wrapper::ServerProtoWrapper; mod cancellation; mod config; -mod persistence; mod proto_wrapper; type TokenStore = Arc>>; -type SharedDbState = Arc>; +type SharedDbState = Arc; #[tokio::main] async fn main() -> anyhow::Result<()> { let config = Configuration::parse(); let config = Arc::new(config); - let state = Arc::new(RwLock::new(get_state(&config).await?)); + let state = Arc::new(get_state(&config).await?); let tokens = Arc::new(Mutex::new(HashMap::<(i32, i32), ResetCancelToken>::new())); let addr = config.get_socket_address(); @@ -59,15 +56,12 @@ async fn main() -> anyhow::Result<()> { } } -async fn get_state(config: &Configuration) -> anyhow::Result { - let result = persistence::state_from_file(config.get_file_path()).await; - match result { - Err(ref e) if e.kind() == ErrorKind::NotFound => { - println!("WARNING: No DB state file found, creating new one"); - Ok(State::new()) - } - Err(e) => Err(e)?, - Ok(state) => Ok(state), +async fn get_state(config: &Configuration) -> anyhow::Result { + let path = config.get_folder_path(); + if StateHandler::is_existing_db(&path) { + Ok(StateHandler::connect(path.clone()).await?) + } else { + Ok(StateHandler::new(path.clone()).await?) } } @@ -191,75 +185,11 @@ where token.reset(); let operation = { - let state = state.read().await; - let db_schema = state.db_schema(); + let db_schema = state.read_state().await.db_schema(); parse_and_validate(query, &db_schema)? }; - let need_write = { - let mut state = state.write().await; - let response = state.interpret(operation)?; - - match response { - Response::Deleted(i) => { - writer - .write_command_complete(CompleteStatus::Delete(i)) - .await?; - true - } - Response::Inserted => { - writer - .write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }) - .await?; - true - } - Response::Selected(schema, columns, mut rows) => { - writer.write_table_header(schema, &columns).await?; - match rows.next() { - Some(row) => { - writer.write_table_row(&row).await?; - - let mut sent_rows = 1; - for row in rows { - sent_rows += 1; - writer.write_table_row(&row).await?; - if token.is_canceled() { - token.reset(); - break; - } - } - - writer - .write_command_complete(CompleteStatus::Select(sent_rows)) - .await?; - } - _ => { - writer - .write_command_complete(CompleteStatus::Select(0)) - .await?; - } - } - false - } - Response::TableCreated => { - writer - .write_command_complete(CompleteStatus::CreateTable) - .await?; - true - } - Response::IndexCreated => { - writer - .write_command_complete(CompleteStatus::CreateIndex) - .await?; - true - } - } - }; - - if need_write { - let state = state.read().await; - state_to_file(&state, config.get_file_path()).await?; - } - + // TODO: PASS DOWN RESET CANCEL TOKEN + state.interpret(writer, operation).await?; Ok(()) } diff --git a/server/src/persistence.rs b/server/src/persistence.rs deleted file mode 100644 index 12479c2..0000000 --- a/server/src/persistence.rs +++ /dev/null @@ -1,15 +0,0 @@ -use minisql::interpreter::State; -use std::path::PathBuf; -use tokio::{fs, io}; - -pub async fn state_from_file(path: &PathBuf) -> io::Result { - let content = fs::read_to_string(path).await?; - let state = serde_json::from_str(&content)?; - Ok(state) -} - -pub async fn state_to_file(state: &State, path: &PathBuf) -> io::Result<()> { - let content = serde_json::to_string(state)?; - fs::write(path, content).await?; - Ok(()) -} From d28fc85228d3bf9c39500120ae92cb5e1ccba0d4 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 22:12:59 +0100 Subject: [PATCH 20/23] Fix ordering columns bug --- minisql/src/schema.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/minisql/src/schema.rs b/minisql/src/schema.rs index 570d81d..10e542f 100644 --- a/minisql/src/schema.rs +++ b/minisql/src/schema.rs @@ -59,10 +59,16 @@ impl TableSchema { } pub fn get_columns(&self) -> Vec<&ColumnName> { - self.column_name_position_mapping + let mut columns_in_random_order: Vec<_> = self.column_name_position_mapping .iter() - .map(|(name, _)| name) - .collect() + .collect(); + columns_in_random_order.sort_by(|&(_, column0), &(_, column1)| column0.cmp(column1)); + + let columns: Vec<_> = columns_in_random_order + .iter() + .map(|(name, _)| *name) + .collect(); + columns } pub fn does_column_exist(&self, column_name: &ColumnName) -> bool { From eeb34a51ce6594558c3c7f5f7c817f87a75455e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jind=C5=99ich=20Moravec?= Date: Mon, 5 Feb 2024 22:13:48 +0100 Subject: [PATCH 21/23] feat: pass cancellation token to interpreter --- minisql/src/cancellation.rs | 13 +++++++++++++ minisql/src/interpreter2.rs | 30 +++++++++++++++++++++++------- minisql/src/lib.rs | 1 + server/src/cancellation.rs | 11 +++++++---- server/src/main.rs | 3 +-- 5 files changed, 45 insertions(+), 13 deletions(-) create mode 100644 minisql/src/cancellation.rs diff --git a/minisql/src/cancellation.rs b/minisql/src/cancellation.rs new file mode 100644 index 0000000..d29624c --- /dev/null +++ b/minisql/src/cancellation.rs @@ -0,0 +1,13 @@ +pub trait Cancellation { + fn is_canceled(&self) -> bool; +} + +#[cfg(test)] +pub(crate) struct DummyCancellation; + +#[cfg(test)] +impl Cancellation for DummyCancellation { + fn is_canceled(&self) -> bool { + false + } +} \ No newline at end of file diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 81075cb..8a8d3bd 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -18,6 +18,7 @@ use storage_engine::store::Store; use storage_engine::cursor::{ReadCursor, WriteCursor}; use storage_engine::cursor_capabilities::traversal::CursorCanTraverse; use storage_engine::cursor_capabilities::index_access::CursorCanReadIndex; +use crate::cancellation::Cancellation; const METADATA_FILE: &'static str = "metadata.json"; @@ -134,25 +135,39 @@ impl State { self.tables.push(RwLock::new(table)); } - async fn select_all_rows(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection) -> DbResult { + async fn select_all_rows(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection) -> DbResult + where W: ResponseWriter, + C: Cancellation + { let mut count = 0; while let Some(entry) = cursor.next_alive().await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))? { count += 1; let row: Row = From::from(entry); let restricted_row = row.restrict_columns(&column_selection); response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; + + if cancellation.is_canceled() { + break; + } } Ok(count) } - async fn select_eq(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult { + async fn select_eq(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult + where W: ResponseWriter, + C: Cancellation + { let entries = cursor.select_entries_where_eq(column as storage_engine::store::Column, &value).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; let count = entries.len(); for entry in entries { let row: Row = From::from(entry); let restricted_row = row.restrict_columns(&column_selection); response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; + + if cancellation.is_canceled() { + break; + } } Ok(count) @@ -204,7 +219,7 @@ impl StateHandler { self.state.read().await } - pub async fn interpret(&self, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { + pub async fn interpret(&self, response_writer: &mut W, cancellation: &C, operation: Operation) -> DbResult<()> { use Operation::*; match operation { @@ -216,8 +231,8 @@ impl StateHandler { response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?; let count = match maybe_condition { - None => State::select_all_rows(&table, cursor, response_writer, column_selection).await?, - Some(Condition::Eq(eq_column, value)) => State::select_eq(&table, cursor, response_writer, column_selection, eq_column, value).await? + None => State::select_all_rows(&table, cursor, response_writer, cancellation, column_selection).await?, + Some(Condition::Eq(eq_column, value)) => State::select_eq(&table, cursor, response_writer, cancellation, column_selection, eq_column, value).await? }; response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) } @@ -259,6 +274,7 @@ mod tests { use std::collections::HashSet; use tokio::fs::{File, OpenOptions, DirBuilder}; use tokio::fs; + use crate::cancellation::DummyCancellation; impl Drop for State { fn drop(&mut self) { @@ -291,12 +307,12 @@ mod tests { let mut response_writer = ResponseWriterStub::new(); state - .interpret(&mut response_writer, Operation::CreateTable(users_schema.clone())).await + .interpret(&mut response_writer, &DummyCancellation, Operation::CreateTable(users_schema.clone())).await .unwrap(); let users_position: TablePosition = 0; state - .interpret(&mut response_writer, Operation::Select( + .interpret(&mut response_writer, &DummyCancellation, Operation::Select( users_position, users_schema.all_selection(), None, diff --git a/minisql/src/lib.rs b/minisql/src/lib.rs index 57953a4..97cb941 100644 --- a/minisql/src/lib.rs +++ b/minisql/src/lib.rs @@ -8,3 +8,4 @@ pub mod restricted_row; mod result; pub mod schema; pub mod type_system; +pub mod cancellation; diff --git a/server/src/cancellation.rs b/server/src/cancellation.rs index 4609f48..5519f2d 100644 --- a/server/src/cancellation.rs +++ b/server/src/cancellation.rs @@ -1,5 +1,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use minisql::cancellation::Cancellation; pub struct ResetCancelToken { is_canceled: Arc, @@ -12,10 +13,6 @@ impl ResetCancelToken { } } - pub fn is_canceled(&self) -> bool { - self.is_canceled.load(Ordering::SeqCst) - } - pub fn cancel(&self) { self.is_canceled.store(true, Ordering::SeqCst); } @@ -25,6 +22,12 @@ impl ResetCancelToken { } } +impl Cancellation for ResetCancelToken { + fn is_canceled(&self) -> bool { + self.is_canceled.load(Ordering::SeqCst) + } +} + impl Clone for ResetCancelToken { fn clone(&self) -> Self { Self { diff --git a/server/src/main.rs b/server/src/main.rs index a31ae91..c76c371 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -189,7 +189,6 @@ where parse_and_validate(query, &db_schema)? }; - // TODO: PASS DOWN RESET CANCEL TOKEN - state.interpret(writer, operation).await?; + state.interpret(writer, token, operation).await?; Ok(()) } From 4bc8539f71ce5d28fd649d05828ead5f8fe81e55 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 22:17:20 +0100 Subject: [PATCH 22/23] Make test succeed --- minisql/src/interpreter2.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 8cc7d1c..aaf4282 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -290,13 +290,11 @@ mod tests { use crate::type_system::{DbType, IndexableValue, Value}; use std::collections::HashSet; use tokio::fs::{File, OpenOptions, DirBuilder}; - use tokio::fs; impl Drop for State { fn drop(&mut self) { println!("CLEANING UP INTERPRETER STATE"); - // TODO: This should be part of the state schema let table_folder = "db-test-0"; // Seems no one has figured out how to do AsyncDrop yet. std::fs::remove_dir_all(table_folder).unwrap(); @@ -370,7 +368,7 @@ mod tests { - assert!(false); + // assert!(false); // assert!(state.tables.len() == 1); // let table = &state.tables[0]; From a4077658cfb70c0c00828f6e44c6d1fbca9db36c Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 23:07:56 +0100 Subject: [PATCH 23/23] Saving of metadata --- minisql/src/interpreter2.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index e3f9121..9426018 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -201,6 +201,8 @@ impl StateHandler { db_path, state: RwLock::new(State::new()), }; + + state.save_metadata().await?; Ok(state) } @@ -276,9 +278,13 @@ impl StateHandler { response_writer.write_command_complete(CompleteStatus::Delete(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) } CreateTable(table_schema) => { - let mut state = self.state.write().await; - let table = Table::new(table_schema, &self.db_path).await?; - state.attach_table(table).await; + { + let mut state = self.state.write().await; + let table = Table::new(table_schema, &self.db_path).await?; + state.attach_table(table).await; + // WARNING: We need to drop the write lock on state unless we want a deadlock. + } + self.save_metadata().await?; response_writer.write_command_complete(CompleteStatus::CreateTable).await.map_err(|e| RuntimeError::AnyhowError(e)) } CreateIndex(table_position, column) => {