/// Cancellation signal that an operation can poll while it is running.
///
/// The interpreter's row-streaming loops call [`Cancellation::is_canceled`]
/// after each row and stop early once it reports `true`.
pub trait Cancellation {
    /// Returns `true` when the current operation should stop.
    #[must_use]
    fn is_canceled(&self) -> bool;
}

/// Test double that never requests cancellation.
#[cfg(test)]
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct DummyCancellation;

#[cfg(test)]
impl Cancellation for DummyCancellation {
    fn is_canceled(&self) -> bool {
        false
    }
}
ColumnName), + #[error("File-System Error: {0}")] + IoError(#[from] std::io::Error), + #[error("Storage Engine error for table {0}: {1}")] + StorageEngineError(TableName, storage_engine::error::Error), + #[error("runtime anyhow error: {0}")] + AnyhowError(#[from] anyhow::Error), + #[error("serde error: {0}")] + SerdeError(#[from] serde_json::Error), } #[derive(Debug, Error)] diff --git a/minisql/src/internals/row.rs b/minisql/src/internals/row.rs index 248f2f7..91bfd2f 100644 --- a/minisql/src/internals/row.rs +++ b/minisql/src/internals/row.rs @@ -5,6 +5,7 @@ use crate::type_system::Value; use serde::{Deserialize, Serialize}; use std::ops::{Index, IndexMut}; use std::slice::SliceIndex; +use storage_engine::segments::entry::EntryDetailed; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Row(Vec); @@ -39,6 +40,12 @@ impl FromIterator for Row { } } +impl From> for Row { + fn from(entry: EntryDetailed) -> Self { + Row(entry.data) + } +} + // To satisfy clippy. impl Default for Row { fn default() -> Self { diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs new file mode 100644 index 0000000..9426018 --- /dev/null +++ b/minisql/src/interpreter2.rs @@ -0,0 +1,403 @@ +use crate::operation::{ColumnSelection, Condition, Operation}; +use crate::result::DbResult; +use crate::schema::{Column, TableName, TablePosition, TableSchema}; +use crate::type_system::Value; +use crate::error::RuntimeError; +use crate::response_writer::{ResponseWriter, CompleteStatus}; +use crate::internals::row::Row; + +use bimap::BiMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use serde::{Deserialize, Serialize}; +use tokio::fs; + +use storage_engine::store::Store; +use storage_engine::segments::entry::Entry; +use storage_engine::cursor::{ReadCursor, WriteCursor}; +use storage_engine::cursor_capabilities::traversal::CursorCanTraverse; +use 
storage_engine::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex}; +use crate::cancellation::Cancellation; + +const METADATA_FILE: &'static str = "metadata.json"; + +// ==============Interpreter================ +#[derive(Debug, Serialize, Deserialize)] +pub struct State { + table_name_position_mapping: BiMap, + table_schemas: Vec>, + #[serde(skip)] + tables: Tables, +} + +pub struct StateHandler { + db_path: PathBuf, + state: RwLock, +} + +pub type Tables = Vec>; + +#[derive(Debug)] +pub struct Table { + schema: Arc, + store: Store +} + +pub type DbSchema = Vec<(TableName, TablePosition, Arc)>; +// To satisfy clippy. +impl Default for State { + fn default() -> Self { + Self::new() + } +} + +impl Table { + async fn new(table_schema: TableSchema, db_path: &Path) -> DbResult { + let table_folder_name = table_schema.table_name(); + let path_to_table_folder = db_path.join(table_folder_name); + + let number_of_columns = table_schema.number_of_columns(); + let primary_column = table_schema.primary_column() as storage_engine::store::Column; + let store: Store = Store::new(&path_to_table_folder, number_of_columns, primary_column).await.unwrap(); + + let table = Self { + schema: Arc::new(table_schema), + store, + }; + Ok(table) + } + + async fn connect(table_schema: Arc, db_path: &Path) -> DbResult { + let table_folder_name = table_schema.table_name(); + let path_to_table_folder = db_path.join(table_folder_name); + + let store: Store = Store::connect(&path_to_table_folder).await.map_err(|e| RuntimeError::StorageEngineError(table_schema.table_name().to_string(), e))?; + + let table = Self { + schema: table_schema, + store, + }; + Ok(table) + } + + async fn read(&self) -> DbResult> { + let cursor = self.store.read_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.table_name().to_string(), e))?; + Ok(cursor) + } + + async fn write(&mut self) -> DbResult> { + let cursor = self.store.write_cursor().await.map_err(|e| 
RuntimeError::StorageEngineError(self.schema.table_name().to_string(), e))?; + Ok(cursor) + } + + pub fn schema(&self) -> Arc { + self.schema.clone() + } + + pub fn table_name(&self) -> &TableName { + self.schema.table_name() + } +} + +impl State { + pub fn new() -> Self { + Self { + table_name_position_mapping: BiMap::new(), + table_schemas: vec![], + tables: vec![], + } + } + + pub fn db_schema(&self) -> DbSchema { + let mut schema: DbSchema = Vec::new(); + for (table_name, &table_position) in &self.table_name_position_mapping { + let table_schema: Arc = self.table_schemas[table_position].clone(); + schema.push((table_name.clone(), table_position, table_schema)); + } + schema + } + + async fn table_at(&self, table_position: TablePosition) -> RwLockReadGuard { + self.tables[table_position].read().await + } + + async fn table_at_mut(&self, table_position: TablePosition) -> RwLockWriteGuard
{ + self.tables[table_position].write().await + } + + async fn attach_table(&mut self, table: Table) { + // TODO: You need to update the global DB SCHEMA! + let new_table_position: TablePosition = self.tables.len(); + self.table_name_position_mapping + .insert(table.schema().table_name().clone(), new_table_position); + self.table_schemas.push(table.schema()); + self.tables.push(RwLock::new(table)); + } + + async fn select_all_rows(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection) -> DbResult + where W: ResponseWriter, + C: Cancellation + { + let mut count = 0; + while let Some(entry) = cursor.next_alive().await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))? { + count += 1; + let row: Row = From::from(entry); + let restricted_row = row.restrict_columns(&column_selection); + response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; + + if cancellation.is_canceled() { + break; + } + } + + Ok(count) + } + + async fn select_eq(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult + where W: ResponseWriter, + C: Cancellation + { + let entries = cursor.select_entries_where_eq(column as storage_engine::store::Column, &value).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; + let count = entries.len(); + for entry in entries { + let row: Row = From::from(entry); + let restricted_row = row.restrict_columns(&column_selection); + response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; + + if cancellation.is_canceled() { + break; + } + } + + Ok(count) + } + + async fn delete_all_rows(table_name: String, mut cursor: WriteCursor<'_, Value>) -> DbResult { + let count = cursor.delete_all_entries(true) + .await.map_err(|e| 
RuntimeError::StorageEngineError(table_name, e))?; + Ok(count) + } + + async fn delete_all_eq(table_name: String, mut cursor: WriteCursor<'_, Value>, eq_column: Column, value: Value) -> DbResult { + let count = + cursor.delete_entries_where_eq(eq_column as storage_engine::store::Column, &value, true) + .await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?; + Ok(count) + } +} + +impl StateHandler { + pub fn is_existing_db(db_path: &PathBuf) -> bool { + db_path.exists() && db_path.is_dir() && + db_path.join(METADATA_FILE).exists() && db_path.join(METADATA_FILE).is_file() + } + + pub async fn new(db_path: PathBuf) -> DbResult { + fs::create_dir(db_path.clone()).await.map_err(|e| RuntimeError::IoError(e))?; + + let state = Self { + db_path, + state: RwLock::new(State::new()), + }; + + state.save_metadata().await?; + Ok(state) + } + + pub async fn connect(db_path: PathBuf) -> DbResult { + let metadata_file = db_path.join(METADATA_FILE); + let metadata_raw = fs::read_to_string(metadata_file).await?; + let mut metadata: State = serde_json::from_str(&metadata_raw)?; + + let mut tables = Vec::with_capacity(metadata.table_schemas.len()); + for table_schema in &metadata.table_schemas { + let table = Table::connect(table_schema.clone(), &db_path).await?; + tables.push(RwLock::new(table)); + } + + metadata.tables = tables; + Ok(Self { + db_path, + state: RwLock::new(metadata), + }) + } + + async fn save_metadata(&self) -> DbResult<()> { + let metadata_file = self.db_path.join(METADATA_FILE); + let state = self.state.read().await; + let metadata_raw = serde_json::to_string(&*state)?; + fs::write(metadata_file, metadata_raw).await.map_err(|e| RuntimeError::IoError(e)) + } + + pub async fn read_state(&self) -> RwLockReadGuard { + self.state.read().await + } + + pub async fn interpret(&self, response_writer: &mut W, cancellation: &C, operation: Operation) -> DbResult<()> { + use Operation::*; + + match operation { + Select(table_position, column_selection, 
maybe_condition) => { + let state = self.state.read().await; + + let table = state.table_at(table_position).await; + let cursor = table.read().await?; + + response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?; + let count = match maybe_condition { + None => State::select_all_rows(&table, cursor, response_writer, cancellation, column_selection).await?, + Some(Condition::Eq(eq_column, value)) => State::select_eq(&table, cursor, response_writer, cancellation, column_selection, eq_column, value).await? + }; + response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) + } + Insert(table_position, values) => { + let state = self.state.read().await; + + let mut table = state.table_at_mut(table_position).await; + let mut cursor = table.write().await?; + + let entry = Entry::new(values); + cursor.insert_entry(entry).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; + + response_writer.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }).await.map_err(|e| RuntimeError::AnyhowError(e)) + } + Delete(table_position, maybe_condition) => { + let state = self.state.read().await; + + let mut table = state.table_at_mut(table_position).await; + let table_name = table.table_name().clone(); + let cursor = table.write().await?; + + let count = match maybe_condition { + None => State::delete_all_rows(table_name, cursor).await?, + Some(Condition::Eq(eq_column, value)) => State::delete_all_eq(table_name, cursor, eq_column, value).await? 
+ }; + + response_writer.write_command_complete(CompleteStatus::Delete(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) + } + CreateTable(table_schema) => { + { + let mut state = self.state.write().await; + let table = Table::new(table_schema, &self.db_path).await?; + state.attach_table(table).await; + // WARNING: We need to drop the write lock on state unless we want a deadlock. + } + self.save_metadata().await?; + response_writer.write_command_complete(CompleteStatus::CreateTable).await.map_err(|e| RuntimeError::AnyhowError(e)) + } + CreateIndex(table_position, column) => { + let state = self.state.read().await; + + let mut table = state.table_at_mut(table_position).await; + let mut cursor = table.write().await?; + cursor.attach_index(column as storage_engine::store::Column).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; + response_writer.write_command_complete(CompleteStatus::CreateIndex).await.map_err(|e| RuntimeError::AnyhowError(e)) + } + } + } +} + + + + +#[cfg(test)] +mod tests { + use super::*; + use crate::operation::Operation; + use crate::schema::Column; + use crate::response_writer::ResponseWriterStub; + use crate::type_system::{DbType, IndexableValue, Value}; + use std::collections::HashSet; + use tokio::fs::{File, OpenOptions, DirBuilder}; + use tokio::fs; + use crate::cancellation::DummyCancellation; + + impl Drop for State { + fn drop(&mut self) { + println!("CLEANING UP INTERPRETER STATE"); + + let table_folder = "db-test-0"; + // Seems no one has figured out how to do AsyncDrop yet. 
+ std::fs::remove_dir_all(table_folder).unwrap(); + } + } + + fn users_schema() -> TableSchema { + TableSchema::new( + "users".to_string(), + "id".to_string(), + vec!["id".to_string(), "name".to_string(), "age".to_string()], + vec![DbType::Uuid, DbType::String, DbType::Int], + ) + } + + #[tokio::test] + async fn test_table_creation() { + let db_path = Path::new("db-test-0"); + let state = StateHandler::new(db_path.to_path_buf()).await.unwrap(); + + let users_schema = users_schema(); + let users = users_schema.table_name().clone(); + + let mut response_writer = ResponseWriterStub::new(); + + state + .interpret(&mut response_writer, &DummyCancellation, Operation::CreateTable(users_schema.clone())).await + .unwrap(); + + { + println!("==EMPTY SELECT==="); + let users_position: TablePosition = 0; + state + .interpret(&mut response_writer, &DummyCancellation, Operation::Select( + users_position, + users_schema.all_selection(), + None, + )).await + .unwrap(); + } + + { + let users = 0; + let (id, name, age) = ( + Value::Uuid(0), + Value::String("Plato".to_string()), + Value::Int(64), + ); + + println!("About to insert!"); + + state + .interpret(&mut response_writer, &DummyCancellation, Operation::Insert( + users, + vec![id.clone(), name.clone(), age.clone()], + )).await + .unwrap(); + } + { + println!("==SELECT==="); + let users_position: TablePosition = 0; + state + .interpret(&mut response_writer, &DummyCancellation, Operation::Select( + users_position, + users_schema.all_selection(), + None, + )).await + .unwrap(); + } + + + + + // assert!(false); + + // assert!(state.tables.len() == 1); + // let table = &state.tables[0]; + // assert!(table.rows().len() == 0); + + // assert!(table.table_name() == &users); + } +} + diff --git a/minisql/src/lib.rs b/minisql/src/lib.rs index 9d27314..97cb941 100644 --- a/minisql/src/lib.rs +++ b/minisql/src/lib.rs @@ -1,9 +1,11 @@ mod error; mod internals; pub mod interpreter; +pub mod interpreter2; pub mod operation; pub mod 
response_writer; pub mod restricted_row; mod result; pub mod schema; pub mod type_system; +pub mod cancellation; diff --git a/minisql/src/response_writer.rs b/minisql/src/response_writer.rs index 15bd788..240a2f8 100644 --- a/minisql/src/response_writer.rs +++ b/minisql/src/response_writer.rs @@ -34,3 +34,53 @@ pub trait ResponseWriter { async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()>; async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()>; } + +// ===Stub implementation for testing=== +// +pub struct ResponseWriterStub {} + +impl ResponseWriterStub { + pub fn new() -> Self { + ResponseWriterStub {} + } +} + +#[async_trait] +impl ResponseWriter for ResponseWriterStub +{ + async fn write_table_header( + &mut self, + table_schema: &TableSchema, + columns: &ColumnSelection, + ) -> anyhow::Result<()> { + let column_names = table_schema.get_columns(); + for &column in columns { + let column_name = column_names[column]; + print!("{}, ", column_name) + } + println!(); + + Ok(()) + } + + async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()> { + for (_, value) in row.iter() { + print!("{:?}, ", value) + } + println!(); + + Ok(()) + } + + async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()> { + use CompleteStatus::*; + match status { + Insert { oid, rows } => println!("oid = {}, rows = {}", oid, rows), + Delete(count) => println!("Deleted {}", count), + Select(count) => println!("Selected {}", count), + CreateTable => println!("Table created"), + CreateIndex => println!("Index created"), + } + Ok(()) + } +} diff --git a/minisql/src/restricted_row.rs b/minisql/src/restricted_row.rs index 15aebba..07ad0e0 100644 --- a/minisql/src/restricted_row.rs +++ b/minisql/src/restricted_row.rs @@ -1,7 +1,9 @@ -use crate::schema::Column; +use crate::schema::{Column, TableSchema}; use crate::type_system::Value; +use crate::operation::ColumnSelection; use 
std::ops::Index; use std::slice::SliceIndex; +use storage_engine::segments::entry::EntryDetailed; #[derive(Debug, Clone)] pub struct RestrictedRow(Vec<(Column, Value)>); diff --git a/minisql/src/schema.rs b/minisql/src/schema.rs index 6833a3d..10e542f 100644 --- a/minisql/src/schema.rs +++ b/minisql/src/schema.rs @@ -50,15 +50,25 @@ impl TableSchema { &self.table_name } + pub fn primary_column(&self) -> Column { + self.primary_key + } + pub fn column_type(&self, column: Column) -> DbType { self.types[column].clone() } pub fn get_columns(&self) -> Vec<&ColumnName> { - self.column_name_position_mapping + let mut columns_in_random_order: Vec<_> = self.column_name_position_mapping .iter() - .map(|(name, _)| name) - .collect() + .collect(); + columns_in_random_order.sort_by(|&(_, column0), &(_, column1)| column0.cmp(column1)); + + let columns: Vec<_> = columns_in_random_order + .iter() + .map(|(name, _)| *name) + .collect(); + columns } pub fn does_column_exist(&self, column_name: &ColumnName) -> bool { diff --git a/minisql/src/type_system.rs b/minisql/src/type_system.rs index e30edea..155eb99 100644 --- a/minisql/src/type_system.rs +++ b/minisql/src/type_system.rs @@ -2,9 +2,12 @@ use crate::error::TypeConversionError; use proto::message::primitive::pgoid::PgOid; use serde::{Deserialize, Serialize}; use std::cmp::Ordering; +// TODO: Private??? 
+// use bincode::{Encode, Encoder, EncodeError, Decode, Decoder, DecodeError}; +use bincode::{Encode, Decode}; // ==============Types================ -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)] pub enum DbType { String, Int, @@ -15,7 +18,7 @@ pub enum DbType { // ==============Values================ pub type Uuid = u64; -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Encode, Decode)] #[serde(try_from = "String", into = "String")] pub enum Value { Number(f64), @@ -81,6 +84,54 @@ impl Ord for IndexableValue { } } +impl Eq for Value {} + +impl PartialOrd for Value { + fn partial_cmp(&self, other: &Self) -> Option { + todo!() + } +} + +// TODO: Make column know about indexable types +impl Ord for Value { + fn cmp(&self, other: &Self) -> Ordering { + match (self, other) { + (Value::String(s0), Value::String(s1)) => s0.cmp(s1), + (Value::Int(n0), Value::Int(n1)) => n0.cmp(n1), + (Value::Uuid(id0), Value::Uuid(id1)) => id0.cmp(id1), + (Value::None(_), Value::None(_)) => Ordering::Equal, + (Value::None(_), Value::Some(_)) => Ordering::Less, + (Value::Some(_), Value::None(_)) => Ordering::Greater, + (Value::Some(v0), Value::Some(v1)) => v0.cmp(v1), + _ => + // SAFETY: + // We are using indexable values as keys in key-value maps. + // When validation is done, it can't happen that we will be comparing two values + // of different types. + // Ofcourse another option is to artificialy order e.g. + // None < Some(...) < String < Int < Uuid + // where ... is again None < Some(...) < String < Int < Uuid + // where ... + // infinitely deep total order. But this is pointless for our usecase. 
+ { + unreachable!() + } + } + } +} + +// impl Encode for Value { +// fn encode(&self, encoder: &mut E) -> Result<(), EncodeError> { +// todo!() +// } +// } + +// impl Decode for Value { +// fn decode(decoder: &mut D) -> Result { +// todo!() +// } +// } + impl DbType { fn new_n_option(n: usize, inside: DbType) -> DbType { if n == 0 { diff --git a/parser/src/core.rs b/parser/src/core.rs index 1782f6b..28059fe 100644 --- a/parser/src/core.rs +++ b/parser/src/core.rs @@ -1,5 +1,5 @@ use crate::syntax::RawQuerySyntax; -use minisql::{interpreter::DbSchema, operation::Operation}; +use minisql::{interpreter2::DbSchema, operation::Operation}; use nom::{branch::alt, character::complete::{multispace0, char}, multi::many1, sequence::{delimited, terminated}, IResult}; use thiserror::Error; diff --git a/parser/src/validation.rs b/parser/src/validation.rs index 741d0fd..87eec28 100644 --- a/parser/src/validation.rs +++ b/parser/src/validation.rs @@ -1,4 +1,5 @@ use std::collections::{BTreeMap, HashSet}; +use std::sync::Arc; use thiserror::Error; use crate::parsing::literal::Literal; @@ -6,7 +7,7 @@ use crate::syntax; use crate::syntax::{ColumnSchema, RawQuerySyntax, RawTableSchema}; use minisql::operation; use minisql::{ - interpreter::DbSchema, + interpreter2::DbSchema, operation::Operation, schema::{Column, ColumnName, TableName, TablePosition, TableSchema}, type_system::DbType, @@ -63,15 +64,15 @@ pub fn validate_operation( } } -fn validate_table_exists<'a>( - db_schema: &DbSchema<'a>, - table_name: &'a TableName, -) -> Result<(TablePosition, &'a TableSchema), ValidationError> { +fn validate_table_exists( + db_schema: &DbSchema, + table_name: &TableName, +) -> Result<(TablePosition, Arc), ValidationError> { db_schema .iter() .find(|(tname, _, _)| table_name.eq(tname)) .ok_or(ValidationError::TableDoesNotExist(table_name.to_string())) - .map(|(_, table_position, table_schema)| (*table_position, *table_schema)) + .map(|(_, table_position, table_schema)| (*table_position, 
table_schema.clone())) } fn validate_create_table( @@ -167,7 +168,7 @@ fn validate_select( .iter() .filter_map(|column_name| schema.get_column(column_name)) .collect(); - let validated_condition = validate_condition(condition, schema)?; + let validated_condition = validate_condition(condition, &schema)?; Ok(Operation::Select( table_position, selection, @@ -178,7 +179,7 @@ fn validate_select( } } syntax::ColumnSelection::All => { - let validated_condition = validate_condition(condition, schema)?; + let validated_condition = validate_condition(condition, &schema)?; Ok(Operation::Select( table_position, schema.all_selection(), @@ -267,13 +268,13 @@ fn validate_delete( db_schema: &DbSchema, ) -> Result { let (table_position, schema) = validate_table_exists(db_schema, &table_name)?; - let validated_condition = validate_condition(condition, schema)?; + let validated_condition = validate_condition(condition, &schema)?; Ok(Operation::Delete(table_position, validated_condition)) } fn validate_condition( condition: Option, - schema: &TableSchema, + schema: &Arc, ) -> Result, ValidationError> { match condition { Some(condition) => match condition { @@ -338,14 +339,14 @@ where None } -fn get_table_schema<'a>( - db_schema: &DbSchema<'a>, - table_name: &'a TableName, -) -> Option<&'a TableSchema> { +fn get_table_schema( + db_schema: &DbSchema, + table_name: &TableName, +) -> Option> { let (_, _, table_schema) = db_schema .iter() .find(|(tname, _, _)| table_name.eq(tname))?; - Some(table_schema) + Some(table_schema.clone()) } fn literal_to_value(lit: Literal, hint: &DbType) -> Value { @@ -428,11 +429,11 @@ mod tests { } } - fn db_schema(users_schema: &TableSchema) -> DbSchema { - vec![("users".to_string(), 0, users_schema)] + fn db_schema(users_schema: Arc) -> DbSchema { + vec![("users".to_string(), 0, users_schema.clone())] } - fn empty_db_schema() -> DbSchema<'static> { + fn empty_db_schema() -> DbSchema { vec![] } @@ -547,7 +548,7 @@ mod tests { #[test] fn 
test_create_already_exists() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = CreateTable(raw_users_schema()); let result = validate_operation(syntax, &db_schema); @@ -561,7 +562,7 @@ mod tests { #[test] fn test_select_basic() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; let id = 0; let name = 1; @@ -583,7 +584,7 @@ mod tests { #[test] fn test_select_non_existent_table() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Select("does_not_exist".to_string(), ColumnSelection::All, None); @@ -594,7 +595,7 @@ mod tests { #[test] fn test_select_eq() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; let id = 0; @@ -622,7 +623,7 @@ mod tests { #[test] fn test_select_eq_columns_selection() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; let name = 1; @@ -652,7 +653,7 @@ mod tests { #[test] fn test_select_eq_columns_selection_nonexistent_column_selected() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Select( "users".to_string(), @@ -666,7 +667,7 @@ mod tests { #[test] fn test_select_eq_non_existent_column() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let 
db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Select( "users".to_string(), @@ -680,7 +681,7 @@ mod tests { #[test] fn test_select_eq_type_error() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Select( "users".to_string(), @@ -695,7 +696,7 @@ mod tests { #[test] fn test_insert() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; @@ -732,7 +733,7 @@ mod tests { #[test] fn test_insert_non_existent_column() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Insert( "users".to_string(), @@ -750,7 +751,7 @@ mod tests { #[test] fn test_insert_ill_typed_column() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Insert( "users".to_string(), @@ -768,7 +769,7 @@ mod tests { #[test] fn test_delete_all() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; @@ -786,7 +787,7 @@ mod tests { #[test] fn test_delete_eq() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; let age = 2; @@ -816,7 +817,7 @@ mod tests { #[test] fn test_create_index() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = 
db_schema(Arc::new(users_schema)); let users_position = 0; let age = 2; @@ -836,7 +837,7 @@ mod tests { #[test] fn test_create_index_nonexistent_column() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = CreateIndex("users".to_string(), "does_not_exist".to_string()); let result = validate_operation(syntax, &db_schema); diff --git a/server/src/cancellation.rs b/server/src/cancellation.rs index 4609f48..5519f2d 100644 --- a/server/src/cancellation.rs +++ b/server/src/cancellation.rs @@ -1,5 +1,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use minisql::cancellation::Cancellation; pub struct ResetCancelToken { is_canceled: Arc, @@ -12,10 +13,6 @@ impl ResetCancelToken { } } - pub fn is_canceled(&self) -> bool { - self.is_canceled.load(Ordering::SeqCst) - } - pub fn cancel(&self) { self.is_canceled.store(true, Ordering::SeqCst); } @@ -25,6 +22,12 @@ impl ResetCancelToken { } } +impl Cancellation for ResetCancelToken { + fn is_canceled(&self) -> bool { + self.is_canceled.load(Ordering::SeqCst) + } +} + impl Clone for ResetCancelToken { fn clone(&self) -> Self { Self { diff --git a/server/src/config.rs b/server/src/config.rs index 2dedcbc..f709511 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -17,8 +17,8 @@ pub struct Configuration { help = "Port for the server to listen on" )] port: u16, - #[arg(short, long, help = "Path to the data file")] - file: PathBuf, + #[arg(short, long, help = "Path to the folder for database data")] + folder: PathBuf, #[arg(short, long, help = "Delay between rows in milliseconds")] throttle: Option, } @@ -30,8 +30,8 @@ impl Configuration { } #[inline] - pub fn get_file_path(&self) -> &PathBuf { - &self.file + pub fn get_folder_path(&self) -> &PathBuf { + &self.folder } #[inline] diff --git a/server/src/main.rs b/server/src/main.rs index e1e6f3d..c76c371 
100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,14 +1,13 @@ use std::collections::HashMap; -use std::io::ErrorKind; use std::sync::Arc; use clap::Parser; use tokio::io::{BufReader, BufWriter}; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::Mutex; -use minisql::interpreter::{Response, State}; -use minisql::response_writer::{CompleteStatus, ResponseWriter}; +use minisql::interpreter2::StateHandler; +use minisql::response_writer::ResponseWriter; use parser::parse_and_validate; use proto::handshake::errors::ServerHandshakeError; use proto::handshake::request::HandshakeRequest; @@ -22,23 +21,21 @@ use proto::writer::protowriter::{ProtoFlush, ProtoWriter}; use crate::cancellation::ResetCancelToken; use crate::config::Configuration; -use crate::persistence::state_to_file; use crate::proto_wrapper::ServerProtoWrapper; mod cancellation; mod config; -mod persistence; mod proto_wrapper; type TokenStore = Arc>>; -type SharedDbState = Arc>; +type SharedDbState = Arc; #[tokio::main] async fn main() -> anyhow::Result<()> { let config = Configuration::parse(); let config = Arc::new(config); - let state = Arc::new(RwLock::new(get_state(&config).await?)); + let state = Arc::new(get_state(&config).await?); let tokens = Arc::new(Mutex::new(HashMap::<(i32, i32), ResetCancelToken>::new())); let addr = config.get_socket_address(); @@ -59,15 +56,12 @@ async fn main() -> anyhow::Result<()> { } } -async fn get_state(config: &Configuration) -> anyhow::Result { - let result = persistence::state_from_file(config.get_file_path()).await; - match result { - Err(ref e) if e.kind() == ErrorKind::NotFound => { - println!("WARNING: No DB state file found, creating new one"); - Ok(State::new()) - } - Err(e) => Err(e)?, - Ok(state) => Ok(state), +async fn get_state(config: &Configuration) -> anyhow::Result { + let path = config.get_folder_path(); + if StateHandler::is_existing_db(&path) { + Ok(StateHandler::connect(path.clone()).await?) 
+ } else { + Ok(StateHandler::new(path.clone()).await?) } } @@ -191,75 +185,10 @@ where token.reset(); let operation = { - let state = state.read().await; - let db_schema = state.db_schema(); + let db_schema = state.read_state().await.db_schema(); parse_and_validate(query, &db_schema)? }; - let need_write = { - let mut state = state.write().await; - let response = state.interpret(operation)?; - - match response { - Response::Deleted(i) => { - writer - .write_command_complete(CompleteStatus::Delete(i)) - .await?; - true - } - Response::Inserted => { - writer - .write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }) - .await?; - true - } - Response::Selected(schema, columns, mut rows) => { - writer.write_table_header(schema, &columns).await?; - match rows.next() { - Some(row) => { - writer.write_table_row(&row).await?; - - let mut sent_rows = 1; - for row in rows { - sent_rows += 1; - writer.write_table_row(&row).await?; - if token.is_canceled() { - token.reset(); - break; - } - } - - writer - .write_command_complete(CompleteStatus::Select(sent_rows)) - .await?; - } - _ => { - writer - .write_command_complete(CompleteStatus::Select(0)) - .await?; - } - } - false - } - Response::TableCreated => { - writer - .write_command_complete(CompleteStatus::CreateTable) - .await?; - true - } - Response::IndexCreated => { - writer - .write_command_complete(CompleteStatus::CreateIndex) - .await?; - true - } - } - }; - - if need_write { - let state = state.read().await; - state_to_file(&state, config.get_file_path()).await?; - } - + state.interpret(writer, token, operation).await?; Ok(()) } diff --git a/server/src/persistence.rs b/server/src/persistence.rs deleted file mode 100644 index 12479c2..0000000 --- a/server/src/persistence.rs +++ /dev/null @@ -1,15 +0,0 @@ -use minisql::interpreter::State; -use std::path::PathBuf; -use tokio::{fs, io}; - -pub async fn state_from_file(path: &PathBuf) -> io::Result { - let content = fs::read_to_string(path).await?; - let state = 
serde_json::from_str(&content)?; - Ok(state) -} - -pub async fn state_to_file(state: &State, path: &PathBuf) -> io::Result<()> { - let content = serde_json::to_string(state)?; - fs::write(path, content).await?; - Ok(()) -} diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 3f68560..5c76577 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -10,10 +10,11 @@ use bincode::{Decode, Encode}; use crate::segments::entry::EntryDetailed; use crate::segments::entry_header::EntryHeader; use crate::segments::store_header::StoreHeader; -use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; +use crate::store::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; use crate::index::Index; use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; -use crate::cursor_capabilities::header_access::{CursorCanReadHeader, CursorCanWriteHeader}; +use crate::cursor_capabilities::traversal::CursorCanTraverse; +use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries; use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex}; const GARBAGE_COLLECTION_TRIGGER: usize = 100; @@ -82,24 +83,24 @@ impl CursorCanWrite for AppendOnlyCursor {} // ===capability to access header=== -impl CursorCanReadHeader for ReadCursor<'_, T> { +impl CursorCanTraverse for ReadCursor<'_, T> { fn header(&self) -> &StoreHeader { &self.header } } -impl CursorCanReadHeader for WriteCursor<'_, T> { +impl CursorCanTraverse for WriteCursor<'_, T> { fn header(&self) -> &StoreHeader { &self.header } } -impl CursorCanReadHeader for AppendOnlyCursor { +impl CursorCanTraverse for AppendOnlyCursor { fn header(&self) -> &StoreHeader { &self.header } } -impl CursorCanWriteHeader for WriteCursor<'_, T> { +impl CursorCanModifyEntries for WriteCursor<'_, T> { fn header_mut(&mut self) -> 
&mut StoreHeader { self.header } fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } } -impl CursorCanWriteHeader for AppendOnlyCursor { +impl CursorCanModifyEntries for AppendOnlyCursor { fn header_mut(&mut self) -> &mut StoreHeader { &mut self.header } fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } } @@ -243,6 +244,21 @@ impl <'cursor, T> WriteCursor<'cursor, T> Ok(count) } + pub async fn delete_all_entries(&mut self, enable_garbage_collector: bool) -> Result + where T: Encode + Decode + Ord + Send + Sync + Clone + { + let mut count = 0; + while let Some(entry) = self.next_alive().await? { + count += 1; + self.mark_deleted_at(entry.file_position, false).await? + } + + if enable_garbage_collector { + self.attempt_garbage_collection_if_necessary().await?; + } + Ok(count) + } + pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result where T: Encode + Decode + Ord + Send + Sync + Clone { @@ -363,7 +379,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> async fn spawn_cursor_to_intermediate_file(&self) -> Result> where T: Send { - let table_folder = self.header.table_folder.to_string(); + let table_folder = self.header.table_folder.clone(); let path_to_table = Path::new(&table_folder); let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); diff --git a/storage_engine/src/cursor_capabilities/entry_modification.rs b/storage_engine/src/cursor_capabilities/entry_modification.rs new file mode 100644 index 0000000..6622347 --- /dev/null +++ b/storage_engine/src/cursor_capabilities/entry_modification.rs @@ -0,0 +1,67 @@ +use async_trait::async_trait; + +use bincode; +use bincode::Encode; +use crate::binary_coding::encode; + +use crate::segments::entry::Entry; +use crate::segments::store_header::StoreHeader; +use crate::store::{FilePosition, Result}; +use 
crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; +use crate::cursor_capabilities::traversal::CursorCanTraverse; + +#[async_trait] +pub trait CursorCanModifyEntries: CursorCanTraverse + CursorCanWrite { + fn header_mut(&mut self) -> &mut StoreHeader; + fn set_eof_file_position(&mut self, new_file_position: FilePosition); + + // ===Store Header Manipulation=== + async fn increment_total_count(&mut self) -> Result<()> + where T: Send + { + self.seek_to_start().await?; + self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; + let new_count = self.header_mut().increment_total_count(); + self.write_bytes(&encode::(&new_count)?).await?; + Ok(()) + } + + async fn increment_deleted_count(&mut self) -> Result<()> + where T: Send + { + self.seek_to_start().await?; + self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?; + let new_count = self.header_mut().increment_deleted_count(); + self.write_bytes(&encode::(&new_count)?).await?; + Ok(()) + } + + async fn set_header(&mut self, header: &StoreHeader) -> Result<()> + where T: Send + { + self.seek_to_start().await?; + let encoded_header: Vec = header.encode()?; + self.write_bytes(&encoded_header).await?; + + Ok(()) + } + + // ===Append Entry=== + + // Moves cursor to the end. + // Returns file position to the start of the new entry. 
+ async fn append_entry_no_indexing(&mut self, entry: &Entry) -> Result + where T: Encode + Send + Sync + { + self.increment_total_count().await?; + + let encoded_entry: Vec = entry.encode()?; + let file_position = self.seek_to_end().await?; + self.write_bytes(&encoded_entry).await?; + + let eof_file_position: FilePosition = self.current_file_position().await?; + self.set_eof_file_position(eof_file_position); + + Ok(file_position) + } +} diff --git a/storage_engine/src/cursor_capabilities/index_access.rs b/storage_engine/src/cursor_capabilities/index_access.rs index d9d7fc3..81ee38e 100644 --- a/storage_engine/src/cursor_capabilities/index_access.rs +++ b/storage_engine/src/cursor_capabilities/index_access.rs @@ -7,12 +7,13 @@ use bincode::{Decode, Encode}; use crate::error::Error; use crate::segments::entry::{Entry, EntryDetailed}; -use crate::storage_engine::{FilePosition, Column, Result}; +use crate::store::{FilePosition, Column, Result}; use crate::index::Index; -use crate::cursor_capabilities::header_access::{CursorCanReadHeader, CursorCanWriteHeader}; +use crate::cursor_capabilities::traversal::CursorCanTraverse; +use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries; #[async_trait] -pub trait CursorCanReadIndex: CursorCanReadHeader { +pub trait CursorCanReadIndex: CursorCanTraverse { fn indexes(&mut self) -> &[Option>]; async fn index_lookup(&mut self, column: Column, value: &T) -> Result>> @@ -52,7 +53,7 @@ pub trait CursorCanReadIndex: CursorCanReadHeader { } #[async_trait] -pub trait CursorCanWriteToIndex: CursorCanReadIndex + CursorCanWriteHeader { +pub trait CursorCanWriteToIndex: CursorCanReadIndex + CursorCanModifyEntries { fn indexes_mut(&mut self) -> &mut [Option>]; // Assumes that the column is indexable. 
diff --git a/storage_engine/src/cursor_capabilities/mod.rs b/storage_engine/src/cursor_capabilities/mod.rs index 6d301cb..9872f1c 100644 --- a/storage_engine/src/cursor_capabilities/mod.rs +++ b/storage_engine/src/cursor_capabilities/mod.rs @@ -1,3 +1,4 @@ pub(crate) mod primitive; -pub(crate) mod header_access; -pub(crate) mod index_access; +pub mod traversal; +pub mod entry_modification; +pub mod index_access; diff --git a/storage_engine/src/cursor_capabilities/primitive.rs b/storage_engine/src/cursor_capabilities/primitive.rs index d11b79b..82fb23d 100644 --- a/storage_engine/src/cursor_capabilities/primitive.rs +++ b/storage_engine/src/cursor_capabilities/primitive.rs @@ -2,7 +2,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; use tokio::fs::File; use async_trait::async_trait; -use crate::storage_engine::{FilePosition, Result}; +use crate::store::{FilePosition, Result}; #[async_trait] pub(crate) trait CursorCanRead { diff --git a/storage_engine/src/cursor_capabilities/header_access.rs b/storage_engine/src/cursor_capabilities/traversal.rs similarity index 74% rename from storage_engine/src/cursor_capabilities/header_access.rs rename to storage_engine/src/cursor_capabilities/traversal.rs index 57ccabb..00a1da6 100644 --- a/storage_engine/src/cursor_capabilities/header_access.rs +++ b/storage_engine/src/cursor_capabilities/traversal.rs @@ -2,18 +2,19 @@ use tokio::io::AsyncReadExt; use async_trait::async_trait; use bincode; -use bincode::{Decode, Encode}; -use crate::binary_coding::{encode, decode}; +use bincode::Decode; +use crate::binary_coding::decode; use crate::error::{Error, DecodeErrorKind}; -use crate::segments::entry::{Entry, EntryDetailed}; +use crate::segments::entry::EntryDetailed; use crate::segments::entry_header::EntryHeaderWithDataSize; use crate::segments::store_header::StoreHeader; -use crate::storage_engine::{FilePosition, Column, Result}; -use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; +use 
crate::store::{FilePosition, Column, Result}; +use crate::cursor_capabilities::primitive::CursorCanRead; + #[async_trait] -pub trait CursorCanReadHeader: CursorCanRead { +pub trait CursorCanTraverse: CursorCanRead { fn header(&self) -> &StoreHeader; async fn seek_to_start_of_data(&mut self) -> Result { @@ -172,59 +173,3 @@ pub trait CursorCanReadHeader: CursorCanRead { Ok(bytes) } } - -#[async_trait] -pub trait CursorCanWriteHeader: CursorCanReadHeader + CursorCanWrite { - fn header_mut(&mut self) -> &mut StoreHeader; - fn set_eof_file_position(&mut self, new_file_position: FilePosition); - - // ===Store Header Manipulation=== - async fn increment_total_count(&mut self) -> Result<()> - where T: Send - { - self.seek_to_start().await?; - self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; - let new_count = self.header_mut().increment_total_count(); - self.write_bytes(&encode::(&new_count)?).await?; - Ok(()) - } - - async fn increment_deleted_count(&mut self) -> Result<()> - where T: Send - { - self.seek_to_start().await?; - self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?; - let new_count = self.header_mut().increment_deleted_count(); - self.write_bytes(&encode::(&new_count)?).await?; - Ok(()) - } - - async fn set_header(&mut self, header: &StoreHeader) -> Result<()> - where T: Send - { - self.seek_to_start().await?; - let encoded_header: Vec = header.encode()?; - self.write_bytes(&encoded_header).await?; - - Ok(()) - } - - // ===Append Entry=== - - // Moves cursor to the end. - // Returns file position to the start of the new entry. 
- async fn append_entry_no_indexing(&mut self, entry: &Entry) -> Result - where T: Encode + Send + Sync - { - self.increment_total_count().await?; - - let encoded_entry: Vec = entry.encode()?; - let file_position = self.seek_to_end().await?; - self.write_bytes(&encoded_entry).await?; - - let eof_file_position: FilePosition = self.current_file_position().await?; - self.set_eof_file_position(eof_file_position); - - Ok(file_position) - } -} diff --git a/storage_engine/src/error.rs b/storage_engine/src/error.rs index 47bf5cc..1ca849c 100644 --- a/storage_engine/src/error.rs +++ b/storage_engine/src/error.rs @@ -1,25 +1,41 @@ -use crate::storage_engine::Column; +use crate::store::Column; +use thiserror::Error; -#[derive(Debug)] +#[derive(Debug, Error)] pub enum Error { + #[error("Decoding Error: {0} {1}")] DecodeError(DecodeErrorKind, bincode::error::DecodeError), + #[error("Encoding Error: {0}")] EncodeError(bincode::error::EncodeError), + #[error("Attempt to index non-indexed column {0}")] AttemptToIndexNonIndexableColumn(Column), + #[error("Index Corruption: Index is storing eof file position for column {0}")] IndexIsStoringEofFilePosition(Column), + #[error("Column {0} is already indexed")] ColumnAlreadyIndexed(Column), + #[error("File-System Error: {0}")] IoError(std::io::Error), } -#[derive(Debug)] +#[derive(Debug, Error)] pub enum DecodeErrorKind { + #[error("StoreHeaderNumberOfColumns")] StoreHeaderNumberOfColumns, + #[error("StoreHeaderDeletedCount")] StoreHeaderDeletedCount, + #[error("StoreHeaderTotalCount")] StoreHeaderTotalCount, + #[error("StoreHeaderPrimaryColumn")] StoreHeaderPrimaryColumn, + #[error("StoreHeaderIndexedColumns")] StoreHeaderIndexedColumns, + #[error("EntryData")] EntryData, + #[error("EntryIsDeleted")] EntryIsDeleted, + #[error("EntryHeaderWithDataSizes")] EntryHeaderWithDataSizes, + #[error("CorruptedData")] CorruptedData, } diff --git a/storage_engine/src/lib.rs b/storage_engine/src/lib.rs index e7920dd..00ff87c 100644 --- 
a/storage_engine/src/lib.rs +++ b/storage_engine/src/lib.rs @@ -1,7 +1,7 @@ -pub mod storage_engine; +pub mod store; mod binary_coding; -mod error; +pub mod error; mod index; -mod cursor; -mod segments; -mod cursor_capabilities; +pub mod cursor; +pub mod segments; +pub mod cursor_capabilities; diff --git a/storage_engine/src/segments/entry.rs b/storage_engine/src/segments/entry.rs index b42c490..a2f8bc8 100644 --- a/storage_engine/src/segments/entry.rs +++ b/storage_engine/src/segments/entry.rs @@ -1,7 +1,7 @@ use bincode::{Decode, Encode}; use crate::binary_coding::{encode_sequence, encode_sequence_with_sizes, decode_sequence}; -use crate::storage_engine::{Result, FilePosition}; +use crate::store::{Result, FilePosition}; use crate::error::{Error, DecodeErrorKind}; use crate::segments::entry_header::{EntryHeader, EntryHeaderWithDataSize}; @@ -23,10 +23,6 @@ impl Entry { Self { header: EntryHeader { is_deleted: false }, data } } - pub fn new_deleted(data: Vec) -> Self { - Self { header: EntryHeader { is_deleted: true}, data } - } - // FORMAT: [EntryHeaderWithDataSize, ..sequence of data] pub fn encode(&self) -> Result> where T: Encode diff --git a/storage_engine/src/segments/entry_header.rs b/storage_engine/src/segments/entry_header.rs index cee5496..c0dc11e 100644 --- a/storage_engine/src/segments/entry_header.rs +++ b/storage_engine/src/segments/entry_header.rs @@ -1,5 +1,5 @@ use crate::binary_coding::{decode, encode, decode_sequence}; -use crate::storage_engine::{Result, Column}; +use crate::store::{Result, Column}; use crate::error::{Error, DecodeErrorKind}; use std::mem::size_of; diff --git a/storage_engine/src/segments/store_header.rs b/storage_engine/src/segments/store_header.rs index 890b198..b8bdfd5 100644 --- a/storage_engine/src/segments/store_header.rs +++ b/storage_engine/src/segments/store_header.rs @@ -1,11 +1,12 @@ use crate::binary_coding::{encode, encode_sequence, decode, decode_sequence}; -use crate::storage_engine::{Result, Column}; +use 
crate::store::{Result, Column}; use crate::error::{Error, DecodeErrorKind}; use std::mem::size_of; +use std::path::PathBuf; #[derive(Debug, Clone)] pub struct StoreHeader { - pub table_folder: String, // This one is not encoded into the file + pub table_folder: PathBuf, // This one is not encoded into the file pub number_of_columns: usize, pub deleted_count: usize, @@ -16,7 +17,7 @@ pub struct StoreHeader { #[derive(Debug, Clone)] pub struct StoreHeaderFixedPart { - pub table_folder: String, // This one is not encoded into the file + pub table_folder: PathBuf, // This one is not encoded into the file pub number_of_columns: usize, pub deleted_count: usize, @@ -63,7 +64,7 @@ impl StoreHeader { vec![0; Self::indexed_columns_size(header.number_of_columns)] } - pub async fn decode_fixed(table_folder: &str, result: &[u8]) -> Result { + pub async fn decode_fixed(table_folder: &PathBuf, result: &[u8]) -> Result { let (number_of_columns, _) = decode::(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; @@ -77,7 +78,7 @@ impl StoreHeader { decode::(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; let header = StoreHeaderFixedPart { - table_folder: table_folder.to_string(), + table_folder: table_folder.clone(), number_of_columns, deleted_count, total_count, @@ -93,7 +94,7 @@ impl StoreHeader { .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderIndexedColumns, e))?; Ok(StoreHeader { - table_folder: header.table_folder, + table_folder: header.table_folder.into(), number_of_columns: header.number_of_columns, deleted_count: header.deleted_count, total_count: header.total_count, diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/store.rs similarity index 95% rename from 
storage_engine/src/storage_engine.rs rename to storage_engine/src/store.rs index 7798e10..bb256ff 100644 --- a/storage_engine/src/storage_engine.rs +++ b/storage_engine/src/store.rs @@ -6,7 +6,7 @@ use bincode::{Decode, Encode}; use crate::error::Error; use crate::cursor::{ReadCursor, WriteCursor}; -use crate::cursor_capabilities::header_access::CursorCanReadHeader; +use crate::cursor_capabilities::traversal::CursorCanTraverse; use crate::segments::store_header::StoreHeader; use crate::index::Index; @@ -36,10 +36,9 @@ pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_i impl Store { // ===Creation=== - pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result + pub async fn new(path_to_table: &Path, number_of_columns: usize, primary_column: Column) -> Result where T: Encode + Decode + Ord { - let path_to_table = Path::new(table_folder); let path_to_rows = path_to_table.join(ROWS_FILE_NAME); DirBuilder::new() .create(path_to_table).await?; @@ -48,7 +47,7 @@ impl Store { let mut indexed_columns = vec![false; number_of_columns]; indexed_columns[primary_column as usize] = true; StoreHeader { - table_folder: table_folder.to_string(), + table_folder: path_to_table.to_path_buf(), number_of_columns, deleted_count: 0, total_count: 0, @@ -121,7 +120,7 @@ impl Store { Ok(file) } - pub async fn connect(table_folder: &str) -> Result + pub async fn connect(table_folder: &PathBuf) -> Result where T: std::fmt::Debug + Encode + Decode + Ord { let path_to_table = Path::new(table_folder); @@ -208,8 +207,8 @@ impl Store { mod tests { use super::*; use crate::segments::entry::Entry; - use crate::cursor_capabilities::header_access::CursorCanReadHeader; use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex}; + use crate::cursor_capabilities::traversal::CursorCanTraverse; impl Drop for Store { fn drop(&mut self) { @@ -225,7 +224,7 @@ mod tests { async fn test_create() { type Data = u32; - 
let table_path = "test_table_0"; + let table_path = Path::new("test_table_0"); let number_of_columns = 5; let primary_column = 0; let store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); @@ -240,7 +239,7 @@ mod tests { async fn test_insert() { type Data = u32; - let table_path = "test_table_1"; + let table_path = Path::new("test_table_1"); let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); @@ -262,7 +261,7 @@ mod tests { async fn test_select_next() { type Data = u32; - let table_path = "test_table_2"; + let table_path = Path::new("test_table_2"); let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); @@ -294,7 +293,7 @@ mod tests { async fn test_select_all() { type Data = u32; - let table_path = "test_table_3"; + let table_path = Path::new("test_table_3"); let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); @@ -329,7 +328,7 @@ mod tests { async fn test_select_eq() { type Data = u32; - let table_path = "test_table_4"; + let table_path = Path::new("test_table_4"); let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); @@ -369,7 +368,7 @@ mod tests { async fn test_select_eq_indexed() { type Data = u32; - let table_path = "test_table_5"; + let table_path = Path::new("test_table_5"); let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); @@ -415,7 +414,7 @@ mod tests { async fn test_delete_entry() { type Data = u32; - let table_path = "test_table_6"; + let table_path = Path::new("test_table_6"); let number_of_columns = 5; let primary_column = 0; let mut store: Store = 
Store::new(table_path, number_of_columns, primary_column).await.unwrap(); @@ -452,7 +451,7 @@ mod tests { async fn test_delete_where_eq() { type Data = u32; - let table_path = "test_table_7"; + let table_path = Path::new("test_table_7"); let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); @@ -496,7 +495,7 @@ mod tests { async fn test_garbage_collection() { type Data = u32; - let table_path = "test_table_8"; + let table_path = Path::new("test_table_8"); let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap();