diff --git a/minisql/src/cancellation.rs b/minisql/src/cancellation.rs index d29624c..9452084 100644 --- a/minisql/src/cancellation.rs +++ b/minisql/src/cancellation.rs @@ -10,4 +10,4 @@ impl Cancellation for DummyCancellation { fn is_canceled(&self) -> bool { false } -} \ No newline at end of file +} diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 9426018..8d73a63 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -1,25 +1,27 @@ +use crate::error::RuntimeError; +use crate::internals::row::Row; use crate::operation::{ColumnSelection, Condition, Operation}; +use crate::response_writer::{CompleteStatus, ResponseWriter}; use crate::result::DbResult; use crate::schema::{Column, TableName, TablePosition, TableSchema}; use crate::type_system::Value; -use crate::error::RuntimeError; -use crate::response_writer::{ResponseWriter, CompleteStatus}; -use crate::internals::row::Row; use bimap::BiMap; use std::path::{Path, PathBuf}; use std::sync::Arc; -use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use serde::{Deserialize, Serialize}; use tokio::fs; +use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -use storage_engine::store::Store; -use storage_engine::segments::entry::Entry; -use storage_engine::cursor::{ReadCursor, WriteCursor}; -use storage_engine::cursor_capabilities::traversal::CursorCanTraverse; -use storage_engine::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex}; use crate::cancellation::Cancellation; +use storage_engine::cursor::{ReadCursor, WriteCursor}; +use storage_engine::cursor_capabilities::index_access::{ + CursorCanReadIndex, CursorCanWriteToIndex, +}; +use storage_engine::cursor_capabilities::traversal::CursorCanTraverse; +use storage_engine::segments::entry::Entry; +use storage_engine::store::Store; const METADATA_FILE: &'static str = "metadata.json"; @@ -42,7 +44,7 @@ pub type Tables = Vec>; #[derive(Debug)] pub struct Table { schema: Arc, - store: Store + store: Store, } pub type DbSchema = Vec<(TableName, TablePosition, Arc)>; @@ -60,7 +62,10 @@ impl Table { let number_of_columns = table_schema.number_of_columns(); let primary_column = table_schema.primary_column() as storage_engine::store::Column; - let store: Store = Store::new(&path_to_table_folder, number_of_columns, primary_column).await.unwrap(); + let store: Store = + Store::new(&path_to_table_folder, number_of_columns, primary_column) + .await + .unwrap(); let table = Self { schema: Arc::new(table_schema), @@ -73,7 +78,9 @@ impl Table { let table_folder_name = table_schema.table_name(); let path_to_table_folder = db_path.join(table_folder_name); - let store: Store = Store::connect(&path_to_table_folder).await.map_err(|e| RuntimeError::StorageEngineError(table_schema.table_name().to_string(), e))?; + let store: Store = Store::connect(&path_to_table_folder).await.map_err(|e| { + RuntimeError::StorageEngineError(table_schema.table_name().to_string(), e) + })?; let table = Self { schema: table_schema, @@ -83,12 +90,18 @@ impl Table { } async fn read(&self) -> DbResult> { - let cursor = self.store.read_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.table_name().to_string(), e))?; + let cursor = self + .store + .read_cursor() + .await + .map_err(|e| RuntimeError::StorageEngineError(self.table_name().to_string(), e))?; Ok(cursor) } async fn write(&mut self) -> DbResult> { - let cursor = self.store.write_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.schema.table_name().to_string(), e))?; + let cursor = self.store.write_cursor().await.map_err(|e| { + RuntimeError::StorageEngineError(self.schema.table_name().to_string(), e) + })?; Ok(cursor) } @@ -136,16 +149,30 @@ impl State { self.tables.push(RwLock::new(table)); } - async fn select_all_rows(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection) -> DbResult - where W: ResponseWriter, - C: Cancellation + async fn select_all_rows( + table: &Table, + mut cursor: ReadCursor<'_, Value>, + response_writer: &mut W, + cancellation: &C, + column_selection: ColumnSelection, + ) -> DbResult + where + W: ResponseWriter, + C: Cancellation, { let mut count = 0; - while let Some(entry) = cursor.next_alive().await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))? { + while let Some(entry) = cursor + .next_alive() + .await + .map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))? + { count += 1; let row: Row = From::from(entry); let restricted_row = row.restrict_columns(&column_selection); - response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; + response_writer + .write_table_row(&restricted_row) + .await + .map_err(|e| RuntimeError::AnyhowError(e))?; if cancellation.is_canceled() { break; @@ -155,16 +182,31 @@ impl State { Ok(count) } - async fn select_eq(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult - where W: ResponseWriter, - C: Cancellation + async fn select_eq( + table: &Table, + mut cursor: ReadCursor<'_, Value>, + response_writer: &mut W, + cancellation: &C, + column_selection: ColumnSelection, + column: Column, + value: Value, + ) -> DbResult + where + W: ResponseWriter, + C: Cancellation, { - let entries = cursor.select_entries_where_eq(column as storage_engine::store::Column, &value).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; + let entries = cursor + .select_entries_where_eq(column as storage_engine::store::Column, &value) + .await + .map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; let count = entries.len(); for entry in entries { let row: Row = From::from(entry); let restricted_row = row.restrict_columns(&column_selection); - response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; + response_writer + .write_table_row(&restricted_row) + .await + .map_err(|e| RuntimeError::AnyhowError(e))?; if cancellation.is_canceled() { break; @@ -174,28 +216,43 @@ impl State { Ok(count) } - async fn delete_all_rows(table_name: String, mut cursor: WriteCursor<'_, Value>) -> DbResult { - let count = cursor.delete_all_entries(true) - .await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?; + async fn delete_all_rows( + table_name: String, + mut cursor: WriteCursor<'_, Value>, + ) -> DbResult { + let count = cursor + .delete_all_entries(true) + .await + .map_err(|e| RuntimeError::StorageEngineError(table_name, e))?; Ok(count) } - async fn delete_all_eq(table_name: String, mut cursor: WriteCursor<'_, Value>, eq_column: Column, value: Value) -> DbResult { - let count = - cursor.delete_entries_where_eq(eq_column as storage_engine::store::Column, &value, true) - .await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?; + async fn delete_all_eq( + table_name: String, + mut cursor: WriteCursor<'_, Value>, + eq_column: Column, + value: Value, + ) -> DbResult { + let count = cursor + .delete_entries_where_eq(eq_column as storage_engine::store::Column, &value, true) + .await + .map_err(|e| RuntimeError::StorageEngineError(table_name, e))?; Ok(count) } } impl StateHandler { pub fn is_existing_db(db_path: &PathBuf) -> bool { - db_path.exists() && db_path.is_dir() && - db_path.join(METADATA_FILE).exists() && db_path.join(METADATA_FILE).is_file() + db_path.exists() + && db_path.is_dir() + && db_path.join(METADATA_FILE).exists() + && db_path.join(METADATA_FILE).is_file() } pub async fn new(db_path: PathBuf) -> DbResult { - fs::create_dir(db_path.clone()).await.map_err(|e| RuntimeError::IoError(e))?; + fs::create_dir(db_path.clone()) + .await + .map_err(|e| RuntimeError::IoError(e))?; let state = Self { db_path, @@ -228,14 +285,21 @@ impl StateHandler { let metadata_file = self.db_path.join(METADATA_FILE); let state = self.state.read().await; let metadata_raw = serde_json::to_string(&*state)?; - fs::write(metadata_file, metadata_raw).await.map_err(|e| RuntimeError::IoError(e)) + fs::write(metadata_file, metadata_raw) + .await + .map_err(|e| RuntimeError::IoError(e)) } pub async fn read_state(&self) -> RwLockReadGuard { self.state.read().await } - pub async fn interpret(&self, response_writer: &mut W, cancellation: &C, operation: Operation) -> DbResult<()> { + pub async fn interpret( + &self, + response_writer: &mut W, + cancellation: &C, + operation: Operation, + ) -> DbResult<()> { use Operation::*; match operation { @@ -245,12 +309,38 @@ impl StateHandler { let table = state.table_at(table_position).await; let cursor = table.read().await?; - response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?; + response_writer + .write_table_header(&table.schema, &column_selection) + .await + .map_err(|e| RuntimeError::AnyhowError(e))?; let count = match maybe_condition { - None => State::select_all_rows(&table, cursor, response_writer, cancellation, column_selection).await?, - Some(Condition::Eq(eq_column, value)) => State::select_eq(&table, cursor, response_writer, cancellation, column_selection, eq_column, value).await? + None => { + State::select_all_rows( + &table, + cursor, + response_writer, + cancellation, + column_selection, + ) + .await? + } + Some(Condition::Eq(eq_column, value)) => { + State::select_eq( + &table, + cursor, + response_writer, + cancellation, + column_selection, + eq_column, + value, + ) + .await? + } }; - response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) + response_writer + .write_command_complete(CompleteStatus::Select(count)) + .await + .map_err(|e| RuntimeError::AnyhowError(e)) } Insert(table_position, values) => { let state = self.state.read().await; @@ -259,9 +349,14 @@ impl StateHandler { let mut cursor = table.write().await?; let entry = Entry::new(values); - cursor.insert_entry(entry).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; + cursor.insert_entry(entry).await.map_err(|e| { + RuntimeError::StorageEngineError(table.table_name().to_string(), e) + })?; - response_writer.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }).await.map_err(|e| RuntimeError::AnyhowError(e)) + response_writer + .write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }) + .await + .map_err(|e| RuntimeError::AnyhowError(e)) } Delete(table_position, maybe_condition) => { let state = self.state.read().await; @@ -272,10 +367,15 @@ impl StateHandler { let count = match maybe_condition { None => State::delete_all_rows(table_name, cursor).await?, - Some(Condition::Eq(eq_column, value)) => State::delete_all_eq(table_name, cursor, eq_column, value).await? + Some(Condition::Eq(eq_column, value)) => { + State::delete_all_eq(table_name, cursor, eq_column, value).await? + } }; - response_writer.write_command_complete(CompleteStatus::Delete(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) + response_writer + .write_command_complete(CompleteStatus::Delete(count)) + .await + .map_err(|e| RuntimeError::AnyhowError(e)) } CreateTable(table_schema) => { { @@ -285,34 +385,42 @@ impl StateHandler { // WARNING: We need to drop the write lock on state unless we want a deadlock. } self.save_metadata().await?; - response_writer.write_command_complete(CompleteStatus::CreateTable).await.map_err(|e| RuntimeError::AnyhowError(e)) + response_writer + .write_command_complete(CompleteStatus::CreateTable) + .await + .map_err(|e| RuntimeError::AnyhowError(e)) } CreateIndex(table_position, column) => { let state = self.state.read().await; let mut table = state.table_at_mut(table_position).await; let mut cursor = table.write().await?; - cursor.attach_index(column as storage_engine::store::Column).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; - response_writer.write_command_complete(CompleteStatus::CreateIndex).await.map_err(|e| RuntimeError::AnyhowError(e)) + cursor + .attach_index(column as storage_engine::store::Column) + .await + .map_err(|e| { + RuntimeError::StorageEngineError(table.table_name().to_string(), e) + })?; + response_writer + .write_command_complete(CompleteStatus::CreateIndex) + .await + .map_err(|e| RuntimeError::AnyhowError(e)) } } } } - - - #[cfg(test)] mod tests { use super::*; + use crate::cancellation::DummyCancellation; use crate::operation::Operation; - use crate::schema::Column; use crate::response_writer::ResponseWriterStub; + use crate::schema::Column; use crate::type_system::{DbType, IndexableValue, Value}; use std::collections::HashSet; - use tokio::fs::{File, OpenOptions, DirBuilder}; use tokio::fs; - use crate::cancellation::DummyCancellation; + use tokio::fs::{DirBuilder, File, OpenOptions}; impl Drop for State { fn drop(&mut self) { @@ -344,18 +452,24 @@ mod tests { let mut response_writer = ResponseWriterStub::new(); state - .interpret(&mut response_writer, &DummyCancellation, Operation::CreateTable(users_schema.clone())).await + .interpret( + &mut response_writer, + &DummyCancellation, + Operation::CreateTable(users_schema.clone()), + ) + .await .unwrap(); { println!("==EMPTY SELECT==="); let users_position: TablePosition = 0; state - .interpret(&mut response_writer, &DummyCancellation, Operation::Select( - users_position, - users_schema.all_selection(), - None, - )).await + .interpret( + &mut response_writer, + &DummyCancellation, + Operation::Select(users_position, users_schema.all_selection(), None), + ) + .await .unwrap(); } @@ -370,27 +484,27 @@ mod tests { println!("About to insert!"); state - .interpret(&mut response_writer, &DummyCancellation, Operation::Insert( - users, - vec![id.clone(), name.clone(), age.clone()], - )).await + .interpret( + &mut response_writer, + &DummyCancellation, + Operation::Insert(users, vec![id.clone(), name.clone(), age.clone()]), + ) + .await .unwrap(); } { println!("==SELECT==="); let users_position: TablePosition = 0; state - .interpret(&mut response_writer, &DummyCancellation, Operation::Select( - users_position, - users_schema.all_selection(), - None, - )).await + .interpret( + &mut response_writer, + &DummyCancellation, + Operation::Select(users_position, users_schema.all_selection(), None), + ) + .await .unwrap(); } - - - // assert!(false); // assert!(state.tables.len() == 1); @@ -400,4 +514,3 @@ mod tests { // assert!(table.table_name() == &users); } } - diff --git a/minisql/src/lib.rs b/minisql/src/lib.rs index 97cb941..257959d 100644 --- a/minisql/src/lib.rs +++ b/minisql/src/lib.rs @@ -1,3 +1,4 @@ +pub mod cancellation; mod error; mod internals; pub mod interpreter; @@ -8,4 +9,3 @@ pub mod restricted_row; mod result; pub mod schema; pub mod type_system; -pub mod cancellation; diff --git a/minisql/src/response_writer.rs b/minisql/src/response_writer.rs index 240a2f8..c60bcc0 100644 --- a/minisql/src/response_writer.rs +++ b/minisql/src/response_writer.rs @@ -46,8 +46,7 @@ impl ResponseWriterStub { } #[async_trait] -impl ResponseWriter for ResponseWriterStub -{ +impl ResponseWriter for ResponseWriterStub { async fn write_table_header( &mut self, table_schema: &TableSchema, diff --git a/minisql/src/restricted_row.rs b/minisql/src/restricted_row.rs index 07ad0e0..16cfa38 100644 --- a/minisql/src/restricted_row.rs +++ b/minisql/src/restricted_row.rs @@ -1,6 +1,6 @@ +use crate::operation::ColumnSelection; use crate::schema::{Column, TableSchema}; use crate::type_system::Value; -use crate::operation::ColumnSelection; use std::ops::Index; use std::slice::SliceIndex; use storage_engine::segments::entry::EntryDetailed; diff --git a/minisql/src/schema.rs b/minisql/src/schema.rs index 10e542f..637733b 100644 --- a/minisql/src/schema.rs +++ b/minisql/src/schema.rs @@ -59,11 +59,10 @@ impl TableSchema { } pub fn get_columns(&self) -> Vec<&ColumnName> { - let mut columns_in_random_order: Vec<_> = self.column_name_position_mapping - .iter() - .collect(); + let mut columns_in_random_order: Vec<_> = + self.column_name_position_mapping.iter().collect(); columns_in_random_order.sort_by(|&(_, column0), &(_, column1)| column0.cmp(column1)); - + let columns: Vec<_> = columns_in_random_order .iter() .map(|(name, _)| *name) diff --git a/minisql/src/type_system.rs b/minisql/src/type_system.rs index 155eb99..e3d6f41 100644 --- a/minisql/src/type_system.rs +++ b/minisql/src/type_system.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use std::cmp::Ordering; // TODO: Private??? // use bincode::{Encode, Encoder, EncodeError, Decode, Decoder, DecodeError}; -use bincode::{Encode, Decode}; +use bincode::{Decode, Encode}; // ==============Types================ #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)] diff --git a/parser/src/core.rs b/parser/src/core.rs index 28059fe..1fc1f3e 100644 --- a/parser/src/core.rs +++ b/parser/src/core.rs @@ -1,6 +1,12 @@ use crate::syntax::RawQuerySyntax; use minisql::{interpreter2::DbSchema, operation::Operation}; -use nom::{branch::alt, character::complete::{multispace0, char}, multi::many1, sequence::{delimited, terminated}, IResult}; +use nom::{ + branch::alt, + character::complete::{char, multispace0}, + multi::many1, + sequence::{delimited, terminated}, + IResult, +}; use thiserror::Error; use crate::{ @@ -35,7 +41,10 @@ fn parse_statement(input: &str) -> IResult<&str, RawQuerySyntax> { /// Parse one or more statements #[allow(dead_code)] fn parse_statement1(input: &str) -> IResult<&str, Vec> { - many1(terminated(parse_statement, delimited(multispace0, char(';'), multispace0)))(input) + many1(terminated( + parse_statement, + delimited(multispace0, char(';'), multispace0), + ))(input) } pub fn parse_and_validate(str_query: String, db_schema: &DbSchema) -> Result { @@ -53,11 +62,9 @@ mod test { #[test] fn test_parse_two_select() { - let (rest, sntx) = parse_statement1("SELECT * FROM users ; SELECT * FROM cities ; ").expect("should parse"); - assert_eq!( - sntx.len(), - 2 - ); + let (rest, sntx) = parse_statement1("SELECT * FROM users ; SELECT * FROM cities ; ") + .expect("should parse"); + assert_eq!(sntx.len(), 2); assert_eq!(rest, ""); } @@ -68,11 +75,10 @@ mod test { SELECT * FROM users ; INSERT INTO table1 (id, data) VALUES (u4, 30) ; INSERT INTO table1 (id, data) VALUES (u5, 40) ; - "#).expect("should parse"); - assert_eq!( - sntx.len(), - 4 - ); + "#, + ) + .expect("should parse"); + assert_eq!(sntx.len(), 4); assert_eq!(rest, ""); } } diff --git a/parser/src/parsing/common.rs b/parser/src/parsing/common.rs index 9d9aef6..f3ded31 100644 --- a/parser/src/parsing/common.rs +++ b/parser/src/parsing/common.rs @@ -1,10 +1,19 @@ use minisql::type_system::DbType; use nom::{ - branch::alt, bytes::complete::{tag, take_while}, character::{complete::{alphanumeric1, anychar, char, multispace0, multispace1}, is_alphanumeric}, combinator::peek, error::make_error, sequence::{delimited, terminated}, IResult, Parser + branch::alt, + bytes::complete::{tag, take_while}, + character::{ + complete::{alphanumeric1, anychar, char, multispace0, multispace1}, + is_alphanumeric, + }, + combinator::peek, + error::make_error, + sequence::{delimited, terminated}, + IResult, Parser, }; -use crate::syntax::Condition; use super::literal::parse_literal; +use crate::syntax::Condition; pub fn parse_table_name(input: &str) -> IResult<&str, &str> { alt(( @@ -16,11 +25,9 @@ pub fn parse_table_name(input: &str) -> IResult<&str, &str> { pub fn parse_identifier(input: &str) -> IResult<&str, &str> { let (_, first) = peek(anychar)(input)?; if first.is_alphabetic() || first == '_' { - take_while(|c: char| { - match c { - 'a'..='z' | 'A'..='Z' | '_' | '0'..='9' => true, - _ => false - } + take_while(|c: char| match c { + 'a'..='z' | 'A'..='Z' | '_' | '0'..='9' => true, + _ => false, })(input) } else { Err(nom::Err::Error(make_error( @@ -35,22 +42,13 @@ pub fn parse_column_name(input: &str) -> IResult<&str, String> { } pub fn parse_db_type(input: &str) -> IResult<&str, DbType> { - let (input, db_type) = alt( - ( - tag("STRING") - .map(|_| DbType::String), - tag("INT") - .map(|_| DbType::Int), - tag("NUMBER") - .map(|_| DbType::Number), - tag("UUID") - .map(|_| DbType::Uuid), - delimited(tag("Option("), parse_db_type, tag(")")) - .map(|ty| { - DbType::Option(Box::new(ty)) - }) - ) - )(input)?; + let (input, db_type) = alt(( + tag("STRING").map(|_| DbType::String), + tag("INT").map(|_| DbType::Int), + tag("NUMBER").map(|_| DbType::Number), + tag("UUID").map(|_| DbType::Uuid), + delimited(tag("Option("), parse_db_type, tag(")")).map(|ty| DbType::Option(Box::new(ty))), + ))(input)?; Ok((input, db_type)) } @@ -122,10 +120,7 @@ mod tests { parse_identifier("_variable__Test").expect("should parse").1, "_variable__Test" ); - assert!(matches!( - parse_identifier("123_variable__Test"), - Err(_) - )); + assert!(matches!(parse_identifier("123_variable__Test"), Err(_))); } #[test] @@ -139,8 +134,12 @@ mod tests { #[test] fn test_parse_nested_option_int_type() { assert_eq!( - parse_db_type("Option(Option(Option(INT)))").expect("should parse").1, - DbType::Option(Box::new(DbType::Option(Box::new(DbType::Option(Box::new(DbType::Int)))))) + parse_db_type("Option(Option(Option(INT)))") + .expect("should parse") + .1, + DbType::Option(Box::new(DbType::Option(Box::new(DbType::Option( + Box::new(DbType::Int) + ))))) ); } } diff --git a/parser/src/parsing/create.rs b/parser/src/parsing/create.rs index 1f4f7b1..285741f 100644 --- a/parser/src/parsing/create.rs +++ b/parser/src/parsing/create.rs @@ -77,8 +77,7 @@ mod tests { #[test] fn test_parse_create_no_quotes_table_name() { - parse_create("CREATE TABLE Table1(id UUID PRIMARY KEY,column1 INT)") - .expect("should parse"); + parse_create("CREATE TABLE Table1(id UUID PRIMARY KEY,column1 INT)").expect("should parse"); } #[test] @@ -91,8 +90,8 @@ mod tests { #[test] fn test_parse_create() { - let (_, create) = parse_create("CREATE TABLE \"Table1\"( id UUID , column1 INT )") - .expect("should parse"); + let (_, create) = + parse_create("CREATE TABLE \"Table1\"( id UUID , column1 INT )").expect("should parse"); assert!(matches!(create, RawQuerySyntax::CreateTable(_))); match create { RawQuerySyntax::CreateTable(schema) => { @@ -114,11 +113,13 @@ mod tests { _ => {} } } - + #[test] fn test_parse_create_option() { - let (_, create) = parse_create("CREATE TABLE games (id UUID PRIMARY KEY, name STRING, year Option(INT), price NUMBER)") - .expect("should parse"); + let (_, create) = parse_create( + "CREATE TABLE games (id UUID PRIMARY KEY, name STRING, year Option(INT), price NUMBER)", + ) + .expect("should parse"); assert!(matches!(create, RawQuerySyntax::CreateTable(_))); match create { RawQuerySyntax::CreateTable(schema) => { @@ -139,16 +140,12 @@ mod tests { assert_eq!(column1_column.type_, DbType::String); let column = schema.get_column(&"year".to_string()); - let Some(column) = column else { - panic!() - }; + let Some(column) = column else { panic!() }; assert_eq!(column.column_name, "year".to_string()); assert_eq!(column.type_, DbType::Option(Box::new(DbType::Int))); let column = schema.get_column(&"price".to_string()); - let Some(column) = column else { - panic!() - }; + let Some(column) = column else { panic!() }; assert_eq!(column.column_name, "price".to_string()); assert_eq!(column.type_, DbType::Number); } diff --git a/parser/src/parsing/delete.rs b/parser/src/parsing/delete.rs index 56e56f3..96c9d4d 100644 --- a/parser/src/parsing/delete.rs +++ b/parser/src/parsing/delete.rs @@ -29,22 +29,19 @@ mod tests { #[test] fn test_parse_delete() { - let (_, sntx) = - parse_delete("DELETE FROM \"T1\" WHERE id = 1").expect("should parse"); + let (_, sntx) = parse_delete("DELETE FROM \"T1\" WHERE id = 1").expect("should parse"); assert!(matches!(sntx, RawQuerySyntax::Delete(_, _))) } #[test] fn test_parse_delete_with_spaces() { - let (_, sntx) = - parse_delete("DELETE FROM T1 WHERE id = 1").expect("should parse"); + let (_, sntx) = parse_delete("DELETE FROM T1 WHERE id = 1").expect("should parse"); assert!(matches!(sntx, RawQuerySyntax::Delete(_, _))) } - + #[test] fn test_parse_delete_none() { - let (_, sntx) = - parse_delete("DELETE FROM games WHERE year = None").expect("should parse"); + let (_, sntx) = parse_delete("DELETE FROM games WHERE year = None").expect("should parse"); if let RawQuerySyntax::Delete(tname, Some(Condition::Eq(column_name, lit))) = sntx { assert_eq!(tname, "games".to_string()); assert_eq!(column_name, "year".to_string()); diff --git a/parser/src/parsing/insert.rs b/parser/src/parsing/insert.rs index e4c5328..ade6763 100644 --- a/parser/src/parsing/insert.rs +++ b/parser/src/parsing/insert.rs @@ -68,10 +68,7 @@ mod tests { insertion_values, vec![ ("id".to_string(), Literal::Int(1)), - ( - "data".to_string(), - Literal::String("Text".to_string()) - ) + ("data".to_string(), Literal::String("Text".to_string())) ] ); } @@ -83,8 +80,7 @@ mod tests { #[test] fn test_parse_insert_with_spaces() { - let sql = - "INSERT INTO \"MyTable\" ( id, data ) VALUES ( 1, \"Text\" )"; + let sql = "INSERT INTO \"MyTable\" ( id, data ) VALUES ( 1, \"Text\" )"; let operation = parse_insert(sql).expect("should parse"); match operation { ("", RawQuerySyntax::Insert(table_name, insertion_values)) => { @@ -93,10 +89,7 @@ mod tests { insertion_values, vec![ ("id".to_string(), Literal::Int(1)), - ( - "data".to_string(), - Literal::String("Text".to_string()) - ) + ("data".to_string(), Literal::String("Text".to_string())) ] ); } @@ -117,18 +110,12 @@ mod tests { insertion_values, vec![ ("id".to_string(), Literal::Uuid(12345)), - ( - "name".to_string(), - Literal::String("Doom".to_string()) - ), + ("name".to_string(), Literal::String("Doom".to_string())), ( "year".to_string(), Literal::Some(Box::new(Literal::Int(1993))) ), - ( - "price".to_string(), - Literal::Number(6.5) - ) + ("price".to_string(), Literal::Number(6.5)) ] ); } diff --git a/parser/src/parsing/literal.rs b/parser/src/parsing/literal.rs index b09b0cb..34bab14 100644 --- a/parser/src/parsing/literal.rs +++ b/parser/src/parsing/literal.rs @@ -1,5 +1,12 @@ use nom::{ - branch::alt, bytes::complete::tag, character::complete::{char, digit1, none_of, u64}, combinator::opt, error::make_error, multi::many0, sequence::{delimited, pair, preceded}, IResult, Parser + branch::alt, + bytes::complete::tag, + character::complete::{char, digit1, none_of, u64}, + combinator::opt, + error::make_error, + multi::many0, + sequence::{delimited, pair, preceded}, + IResult, Parser, }; #[derive(Debug, PartialEq)] @@ -13,7 +20,13 @@ pub enum Literal { } pub fn parse_literal(input: &str) -> IResult<&str, Literal> { - alt((parse_option, parse_string, parse_number, parse_int, parse_uuid))(input) + alt(( + parse_option, + parse_string, + parse_number, + parse_int, + parse_uuid, + ))(input) } pub fn parse_number(input: &str) -> IResult<&str, Literal> { @@ -92,16 +105,16 @@ pub fn parse_string(input: &str) -> IResult<&str, Literal> { } pub fn parse_uuid(input: &str) -> IResult<&str, Literal> { - let (input, value) = pair(char('u'), u64)(input) - .map(|(input, (_, v))| (input, Literal::Uuid(v)))?; + let (input, value) = + pair(char('u'), u64)(input).map(|(input, (_, v))| (input, Literal::Uuid(v)))?; Ok((input, value)) } pub fn parse_option(input: &str) -> IResult<&str, Literal> { - let (input, inner) = alt((tag("None") - .map(|_| Literal::None), delimited(tag("Some("), parse_literal, tag(")")).map(|v| { - Literal::Some(Box::new(v)) - })))(input)?; + let (input, inner) = alt(( + tag("None").map(|_| Literal::None), + delimited(tag("Some("), parse_literal, tag(")")).map(|v| Literal::Some(Box::new(v))), + ))(input)?; Ok((input, inner)) } @@ -114,24 +127,15 @@ mod tests { fn test_string_parser() { assert_eq!( parse_string(r#""simple""#), - Ok(( - "", - Literal::String(String::from("simple")) - )) + Ok(("", Literal::String(String::from("simple")))) ); assert_eq!( parse_string(r#""\"\t\r\n\\""#), - Ok(( - "", - Literal::String(String::from("\"\t\r\n\\")) - )) + Ok(("", Literal::String(String::from("\"\t\r\n\\")))) ); assert_eq!( parse_string(r#""name is \"John\".""#), - Ok(( - "", - Literal::String(String::from("name is \"John\".")) - )) + Ok(("", Literal::String(String::from("name is \"John\".")))) ); } @@ -147,13 +151,11 @@ mod tests { let (_, _) = parse_literal("\"STRING\"").expect("should parse"); let (input, value) = - parse_literal("\"abcdefghkjklmnopqrstuvwxyz!@#$%^&*()_+ \"").expect("should parse"); + parse_literal("\"abcdefghkjklmnopqrstuvwxyz!@#$%^&*()_+ \"").expect("should parse"); assert_eq!(input, ""); assert_eq!( value, - Literal::String( - "abcdefghkjklmnopqrstuvwxyz!@#$%^&*()_+ ".to_string() - ) + Literal::String("abcdefghkjklmnopqrstuvwxyz!@#$%^&*()_+ ".to_string()) ); } @@ -192,18 +194,12 @@ mod tests { #[test] fn test_parse_int() { - assert_eq!( - parse_literal("5134616"), - Ok(("", Literal::Int(5134616))) - ); + assert_eq!(parse_literal("5134616"), Ok(("", Literal::Int(5134616)))); } #[test] fn test_parse_uuid() { - assert_eq!( - parse_uuid("u131515"), - Ok(("", Literal::Uuid(131515))) - ) + assert_eq!(parse_uuid("u131515"), Ok(("", Literal::Uuid(131515)))) } #[test] @@ -214,7 +210,10 @@ mod tests { ); assert_eq!( parse_option("Some(Some(3))"), - Ok(("", Literal::Some(Box::new(Literal::Some(Box::new(Literal::Int(3))))))) + Ok(( + "", + Literal::Some(Box::new(Literal::Some(Box::new(Literal::Int(3))))) + )) ); assert_eq!( parse_option("Some(None)"), diff --git a/parser/src/parsing/select.rs b/parser/src/parsing/select.rs index 6d8274d..1f5e0b7 100644 --- a/parser/src/parsing/select.rs +++ b/parser/src/parsing/select.rs @@ -42,7 +42,9 @@ pub fn try_parse_column_selection(input: &str) -> IResult<&str, ColumnSelection> #[cfg(test)] mod tests { use crate::parsing::{ - common::{parse_column_name, parse_table_name}, literal::Literal, select::parse_select + common::{parse_column_name, parse_table_name}, + literal::Literal, + select::parse_select, }; use crate::syntax::{ColumnSelection, RawQuerySyntax}; @@ -137,7 +139,7 @@ mod tests { } } } - + #[test] fn test_parse_select_option_none() { use crate::syntax::Condition; diff --git a/parser/src/syntax.rs b/parser/src/syntax.rs index a0f5df3..f4c4599 100644 --- a/parser/src/syntax.rs +++ b/parser/src/syntax.rs @@ -71,4 +71,3 @@ impl RawTableSchema { .collect() } } - diff --git a/parser/src/validation.rs b/parser/src/validation.rs index 87eec28..14bc879 100644 --- a/parser/src/validation.rs +++ b/parser/src/validation.rs @@ -37,7 +37,7 @@ pub enum ValidationError { expected_type: DbType, }, #[error("Expected type {expected_type:?}, received None")] - UnexpectedNoneValue{ expected_type: DbType }, + UnexpectedNoneValue { expected_type: DbType }, #[error("values for required columns {0:?} are missing")] RequiredColumnsAreMissing(Vec), } @@ -284,7 +284,10 @@ fn validate_condition( )?; let value_type: DbType = type_from_literal_with_type_hint(&value, &expected_type)?; if expected_type.eq(&value_type) { - Ok(Some(operation::Condition::Eq(column, literal_to_value(value, &expected_type)))) + Ok(Some(operation::Condition::Eq( + column, + literal_to_value(value, &expected_type), + ))) } else { Err(ValidationError::TypeMismatch { column_name: column_name.to_string(), @@ -339,10 +342,7 @@ where None } -fn get_table_schema( - db_schema: &DbSchema, - table_name: &TableName, -) -> Option> { +fn get_table_schema(db_schema: &DbSchema, table_name: &TableName) -> Option> { let (_, _, table_schema) = db_schema .iter() .find(|(tname, _, _)| table_name.eq(tname))?; @@ -360,15 +360,18 @@ fn literal_to_value(lit: Literal, hint: &DbType) -> Value { if let DbType::Option(t) = hint { Value::None(*t.clone()) } else { - // By the time calling current function, hopefully we should be sure about the + // By the time calling current function, hopefully we should be sure about the // type we want from the literal panic!() } - }, + } } } -fn type_from_literal_with_type_hint(lit: &Literal, hint: &DbType) -> Result { +fn type_from_literal_with_type_hint( + lit: &Literal, + hint: &DbType, +) -> Result { Ok(match lit { Literal::Number(_) => DbType::Number, Literal::String(_) => DbType::String, @@ -379,7 +382,9 @@ fn type_from_literal_with_type_hint(lit: &Literal, hint: &DbType) -> Result, diff --git a/storage_engine/src/binary_coding.rs b/storage_engine/src/binary_coding.rs index 1b2a475..a607296 100644 --- a/storage_engine/src/binary_coding.rs +++ b/storage_engine/src/binary_coding.rs @@ -1,9 +1,11 @@ use bincode; -use bincode::{Decode, Encode}; use bincode::config::{BigEndian, Configuration, Fixint}; +use bincode::{Decode, Encode}; use std::mem::size_of; -const BIN_CONFIG: Configuration = bincode::config::standard().with_big_endian().with_fixed_int_encoding(); +const BIN_CONFIG: Configuration = bincode::config::standard() + .with_big_endian() + .with_fixed_int_encoding(); pub fn encode(t: &T) -> Result, bincode::error::EncodeError> { bincode::encode_to_vec(t, BIN_CONFIG) @@ -40,24 +42,27 @@ pub fn encode_sequence(ts: &[T]) -> Result, bincode::error::E let mut result = vec![]; for t in ts { result.append(&mut encode(&t)?); - } Ok(result) } -pub fn encode_sequence_with_sizes(ts: &[T]) -> Result<(Vec, Vec), bincode::error::EncodeError> { +pub fn encode_sequence_with_sizes( + ts: &[T], +) -> Result<(Vec, Vec), bincode::error::EncodeError> { let mut result_bytes = vec![]; let mut sizes = Vec::with_capacity(ts.len()); for t in ts { let mut bytes = encode(&t)?; sizes.push(bytes.len()); result_bytes.append(&mut bytes); - } Ok((result_bytes, sizes)) } -pub fn decode_sequence(len: usize, bytes: &[u8]) -> Result, bincode::error::DecodeError> { +pub fn decode_sequence( + len: usize, + bytes: &[u8], +) -> Result, bincode::error::DecodeError> { let mut result: Vec = Vec::with_capacity(len); let mut offset = 0; for _ in 0..len { @@ -68,7 +73,6 @@ pub fn decode_sequence(len: usize, bytes: &[u8]) -> Result, bi Ok(result) } - #[cfg(test)] mod tests { use super::*; diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 5c76577..c145398 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -1,21 +1,24 @@ -use tokio::fs::{File, OpenOptions}; -use tokio::fs; -use std::path::Path; -use std::marker::PhantomData; use std::collections::{BTreeMap, HashSet}; +use std::marker::PhantomData; +use std::path::Path; +use tokio::fs; +use tokio::fs::{File, OpenOptions}; use bincode; use bincode::{Decode, Encode}; +use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries; +use crate::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex}; +use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; +use crate::cursor_capabilities::traversal::CursorCanTraverse; +use crate::index::Index; use crate::segments::entry::EntryDetailed; use crate::segments::entry_header::EntryHeader; use crate::segments::store_header::StoreHeader; -use crate::store::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; -use crate::index::Index; -use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; -use crate::cursor_capabilities::traversal::CursorCanTraverse; -use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries; -use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex}; +use crate::store::{ + Column, FilePosition, Result, Store, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME, + ROWS_FILE_NAME, +}; const GARBAGE_COLLECTION_TRIGGER: usize = 100; @@ -45,10 +48,9 @@ pub struct AppendOnlyCursor { eof_file_position: FilePosition, } - // ===========Implementations============= // ===primitive capabilities=== -impl CursorCanRead for ReadCursor<'_, T> { +impl CursorCanRead for ReadCursor<'_, T> { fn file(&mut self) -> &mut File { &mut self.file } @@ -58,7 +60,7 @@ impl CursorCanRead for ReadCursor<'_, T> { } } -impl CursorCanRead for WriteCursor<'_, T> { +impl CursorCanRead for WriteCursor<'_, T> { fn file(&mut self) -> &mut File { &mut self.file } @@ -68,7 +70,7 @@ impl CursorCanRead for WriteCursor<'_, T> { } } -impl CursorCanRead for AppendOnlyCursor { +impl CursorCanRead for AppendOnlyCursor { fn file(&mut self) -> &mut File { &mut self.file } @@ -78,108 +80,123 @@ impl CursorCanRead for AppendOnlyCursor { } } -impl CursorCanWrite for WriteCursor<'_, T> {} -impl CursorCanWrite for AppendOnlyCursor {} - +impl CursorCanWrite for WriteCursor<'_, T> {} +impl CursorCanWrite for AppendOnlyCursor {} // ===capability to access header=== -impl CursorCanTraverse for ReadCursor<'_, T> { - fn header(&self) -> &StoreHeader { &self.header } +impl CursorCanTraverse for ReadCursor<'_, T> { + fn header(&self) -> &StoreHeader { + &self.header + } } -impl CursorCanTraverse for WriteCursor<'_, T> { - fn header(&self) -> &StoreHeader { &self.header } +impl CursorCanTraverse for WriteCursor<'_, T> { + fn header(&self) -> &StoreHeader { + &self.header + } } -impl CursorCanTraverse for AppendOnlyCursor { - fn header(&self) -> &StoreHeader { &self.header } +impl CursorCanTraverse for AppendOnlyCursor { + fn header(&self) -> &StoreHeader { + &self.header + } } -impl CursorCanModifyEntries for WriteCursor<'_, T> { - fn header_mut(&mut self) -> &mut StoreHeader { self.header } - fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } +impl CursorCanModifyEntries for WriteCursor<'_, T> { + fn header_mut(&mut self) -> &mut StoreHeader { + self.header + } + fn set_eof_file_position(&mut self, new_file_position: FilePosition) { + self.eof_file_position = new_file_position + } } -impl CursorCanModifyEntries for AppendOnlyCursor { - fn header_mut(&mut self) -> &mut StoreHeader { &mut self.header } - fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } +impl CursorCanModifyEntries for AppendOnlyCursor { + fn header_mut(&mut self) -> &mut StoreHeader { + &mut self.header + } + fn set_eof_file_position(&mut self, new_file_position: FilePosition) { + self.eof_file_position = new_file_position + } } // ===capability to access index=== -impl CursorCanReadIndex for ReadCursor<'_, T> { - fn indexes(&mut self) -> &[Option>] { &self.indexes } -} - -impl CursorCanReadIndex for WriteCursor<'_, T> { - fn indexes(&mut self) -> &[Option>] { &self.indexes } -} - -impl CursorCanWriteToIndex for WriteCursor<'_, T> { - fn indexes_mut(&mut self) -> &mut [Option>] { self.indexes } -} - - -// ===Specifics=== -impl <'cursor, T> ReadCursor<'cursor, T> { - pub async fn new<'store: 'cursor>(store: &'store Store) -> Result - where T: Send + Sync - { - let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); - let file: File = - OpenOptions::new() - .read(true) - .open(path_to_rows) - .await?; - - let mut cursor = Self { - header: store.header.clone(), - file, - indexes: &store.indexes, - - eof_file_position: 0, - }; - let eof_file_position: FilePosition = cursor.seek_to_end().await?; - cursor.eof_file_position = eof_file_position; - - cursor.seek_to_start_of_data().await?; - - Ok(cursor) +impl CursorCanReadIndex for ReadCursor<'_, T> { + fn indexes(&mut self) -> &[Option>] { + &self.indexes } } -impl <'cursor, T> WriteCursor<'cursor, T> -{ - // 'store lives atleast as long as 'cursor - pub async fn new<'store: 'cursor>(store: &'store mut Store) -> Result - where T: Send +impl CursorCanReadIndex for WriteCursor<'_, T> { + fn indexes(&mut self) -> &[Option>] { + &self.indexes + } +} + +impl CursorCanWriteToIndex for WriteCursor<'_, T> { + fn indexes_mut(&mut self) -> &mut [Option>] { + self.indexes + } +} + +// ===Specifics=== +impl<'cursor, T> ReadCursor<'cursor, T> { + pub async fn new<'store: 'cursor>(store: &'store Store) -> Result + where + T: Send + Sync, { - let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); - let file: File = - OpenOptions::new() - .read(true) - .write(true) - .open(path_to_rows) - .await?; + let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); + let file: File = OpenOptions::new().read(true).open(path_to_rows).await?; - let mut cursor = Self { - header: &mut store.header, - file, - indexes: &mut store.indexes, + let mut cursor = Self { + header: store.header.clone(), + file, + indexes: &store.indexes, - eof_file_position: 0, - }; - let eof_file_position: FilePosition = cursor.seek_to_end().await?; - cursor.eof_file_position = eof_file_position; + eof_file_position: 0, + }; + let eof_file_position: FilePosition = cursor.seek_to_end().await?; + cursor.eof_file_position = eof_file_position; - cursor.seek_to_start_of_data().await?; + cursor.seek_to_start_of_data().await?; - Ok(cursor) + Ok(cursor) + } +} + +impl<'cursor, T> WriteCursor<'cursor, T> { + // 'store lives atleast as long as 'cursor + pub async fn new<'store: 'cursor>(store: &'store mut Store) -> Result + where + T: Send, + { + let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); + let file: File = OpenOptions::new() + .read(true) + .write(true) + .open(path_to_rows) + .await?; + + let mut cursor = Self { + header: &mut store.header, + file, + indexes: &mut store.indexes, + + eof_file_position: 0, + }; + let eof_file_position: FilePosition = cursor.seek_to_end().await?; + cursor.eof_file_position = eof_file_position; + + cursor.seek_to_start_of_data().await?; + + Ok(cursor) } // ===Entry Header Manipulation=== // assumes we are at the start of valid entry. - async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()> - where T: Send + async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()> + where + T: Send, { let bytes: Vec = entry_header.encode()?; self.write_bytes(&bytes).await?; @@ -187,8 +204,13 @@ impl <'cursor, T> WriteCursor<'cursor, T> } // ===Deletion=== - pub async fn mark_deleted_at(&mut self, file_position: FilePosition, enable_garbage_collector: bool) -> Result<()> - where T: Encode + Decode + Ord + Send + Sync + Clone + Ord + pub async fn mark_deleted_at( + &mut self, + file_position: FilePosition, + enable_garbage_collector: bool, + ) -> Result<()> + where + T: Encode + Decode + Ord + Send + Sync + Clone + Ord, { self.seek_to(file_position).await?; let mut entry_header = self.read_entry_header().await?; @@ -205,9 +227,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> // Update index self.seek_to(file_position).await?; match self.next().await? { - Some(entry) => { - self.delete_entry_values_from_indexes(entry).await? - }, + Some(entry) => self.delete_entry_values_from_indexes(entry).await?, None => { // SAFETY: We just modified its header, so it must exist. unreachable!() @@ -221,12 +241,19 @@ impl <'cursor, T> WriteCursor<'cursor, T> } } - async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T, enable_garbage_collector: bool) -> Result>> - where T: Encode + Decode + Ord + Send + Sync + Clone + async fn find_first_eq_bruteforce_and_delete( + &mut self, + column: Column, + t0: &T, + enable_garbage_collector: bool, + ) -> Result>> + where + T: Encode + Decode + Ord + Send + Sync + Clone, { let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?; if let Some(entry) = maybe_entry { - self.mark_deleted_at(entry.file_position, enable_garbage_collector).await?; + self.mark_deleted_at(entry.file_position, enable_garbage_collector) + .await?; Ok(Some(entry)) } else { Ok(maybe_entry) @@ -234,18 +261,23 @@ impl <'cursor, T> WriteCursor<'cursor, T> } // Doesn't update indexes. - async fn find_all_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result - where T: Encode + Decode + Ord + Send + Sync + Clone + async fn find_all_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result + where + T: Encode + Decode + Ord + Send + Sync + Clone, { let mut count = 0; - while let Some(_) = self.find_first_eq_bruteforce_and_delete(column, t0, false).await? { + while let Some(_) = self + .find_first_eq_bruteforce_and_delete(column, t0, false) + .await? + { count += 1; } Ok(count) } - pub async fn delete_all_entries(&mut self, enable_garbage_collector: bool) -> Result - where T: Encode + Decode + Ord + Send + Sync + Clone + pub async fn delete_all_entries(&mut self, enable_garbage_collector: bool) -> Result + where + T: Encode + Decode + Ord + Send + Sync + Clone, { let mut count = 0; while let Some(entry) = self.next_alive().await? { @@ -259,21 +291,28 @@ impl <'cursor, T> WriteCursor<'cursor, T> Ok(count) } - pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result - where T: Encode + Decode + Ord + Send + Sync + Clone + pub async fn delete_entries_where_eq( + &mut self, + column: Column, + value: &T, + enable_garbage_collector: bool, + ) -> Result + where + T: Encode + Decode + Ord + Send + Sync + Clone, { - let count = - if self.header().is_column_indexed(column) { - let entries = self.index_lookup(column, value).await?; - let count = entries.len(); - for entry in entries { - self.mark_deleted_at(entry.file_position, false).await? - } - count - } else { - let count = self.find_all_eq_bruteforce_and_delete(column, value).await?; - count - }; + let count = if self.header().is_column_indexed(column) { + let entries = self.index_lookup(column, value).await?; + let count = entries.len(); + for entry in entries { + self.mark_deleted_at(entry.file_position, false).await? + } + count + } else { + let count = self + .find_all_eq_bruteforce_and_delete(column, value) + .await?; + count + }; if enable_garbage_collector { self.attempt_garbage_collection_if_necessary().await?; } @@ -282,8 +321,9 @@ impl <'cursor, T> WriteCursor<'cursor, T> // ===Indexing=== // WARNING: Assumes the column is NOT indexable. - pub async fn attach_index(&mut self, column: Column) -> Result<()> - where T: Ord + Decode + Encode + Send + Sync + pub async fn attach_index(&mut self, column: Column) -> Result<()> + where + T: Ord + Decode + Encode + Send + Sync, { // New Index let index = Store::create_empty_index_at(&self.header, column).await?; @@ -303,8 +343,9 @@ impl <'cursor, T> WriteCursor<'cursor, T> } // ===Garbage Collection=== - async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> - where T: Send + Sync + Decode + Encode + Clone + Ord + async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> + where + T: Send + Sync + Decode + Encode + Clone + Ord, { if self.header.deleted_count > GARBAGE_COLLECTION_TRIGGER { println!("=======START GARBAGE COLLECTOR===="); @@ -314,14 +355,16 @@ impl <'cursor, T> WriteCursor<'cursor, T> Ok(()) } - pub async fn initiate_garbage_collection(&mut self) -> Result - where T: Send + Sync + Decode + Encode + Clone + Ord + pub async fn initiate_garbage_collection(&mut self) -> Result + where + T: Send + Sync + Decode + Encode + Clone + Ord, { let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?; // Since garbage collection changes FilePositions of live entries, we need to update the // indexes too. - let mut in_memory_indexes: Vec>>> = Vec::with_capacity(self.header.number_of_columns); + let mut in_memory_indexes: Vec>>> = + Vec::with_capacity(self.header.number_of_columns); for column in 0..self.header.number_of_columns { if self.header.is_column_indexed(column as Column) { let in_memory_index = BTreeMap::new(); @@ -337,12 +380,19 @@ impl <'cursor, T> WriteCursor<'cursor, T> { while let Some(live_entry) = self.next_alive().await? { entries_deleted += 1; - let file_position = cursor_to_intermediate.append_entry_no_indexing(&live_entry.forget()).await?; + let file_position = cursor_to_intermediate + .append_entry_no_indexing(&live_entry.forget()) + .await?; // Update index. (Wouldn't it be nice if we had `for let ...`?) - for (maybe_in_memory_index, value) in in_memory_indexes.iter_mut().zip(&live_entry.data) { + for (maybe_in_memory_index, value) in + in_memory_indexes.iter_mut().zip(&live_entry.data) + { if let Some(in_memory_index) = maybe_in_memory_index { - in_memory_index.entry(value.clone()).or_insert_with(HashSet::new).insert(file_position); + in_memory_index + .entry(value.clone()) + .or_insert_with(HashSet::new) + .insert(file_position); } } } @@ -369,21 +419,24 @@ impl <'cursor, T> WriteCursor<'cursor, T> // current file let path_to_table = Path::new(&self.header.table_folder); let path_to_rows = path_to_table.join(ROWS_FILE_NAME); - let path_to_intermediate_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); + let path_to_intermediate_rows = + path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); fs::remove_file(path_to_rows.clone()).await?; fs::rename(path_to_intermediate_rows, path_to_rows).await?; Ok(entries_deleted) } - async fn spawn_cursor_to_intermediate_file(&self) -> Result> - where T: Send + async fn spawn_cursor_to_intermediate_file(&self) -> Result> + where + T: Send, { let table_folder = self.header.table_folder.clone(); let path_to_table = Path::new(&table_folder); let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); - let intermediate_file: File = Store::::create_empty_rows_file(path_to_rows, &self.header).await?; + let intermediate_file: File = + Store::::create_empty_rows_file(path_to_rows, &self.header).await?; let intermediate_header: StoreHeader = StoreHeader { table_folder, diff --git a/storage_engine/src/cursor_capabilities/entry_modification.rs b/storage_engine/src/cursor_capabilities/entry_modification.rs index 6622347..7f807e8 100644 --- a/storage_engine/src/cursor_capabilities/entry_modification.rs +++ b/storage_engine/src/cursor_capabilities/entry_modification.rs @@ -1,14 +1,14 @@ use async_trait::async_trait; +use crate::binary_coding::encode; use bincode; use bincode::Encode; -use crate::binary_coding::encode; +use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; +use crate::cursor_capabilities::traversal::CursorCanTraverse; use crate::segments::entry::Entry; use crate::segments::store_header::StoreHeader; use crate::store::{FilePosition, Result}; -use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; -use crate::cursor_capabilities::traversal::CursorCanTraverse; #[async_trait] pub trait CursorCanModifyEntries: CursorCanTraverse + CursorCanWrite { @@ -16,8 +16,9 @@ pub trait CursorCanModifyEntries: CursorCanTraverse + CursorCanWrite { fn set_eof_file_position(&mut self, new_file_position: FilePosition); // ===Store Header Manipulation=== - async fn increment_total_count(&mut self) -> Result<()> - where T: Send + async fn increment_total_count(&mut self) -> Result<()> + where + T: Send, { self.seek_to_start().await?; self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; @@ -26,18 +27,21 @@ pub trait CursorCanModifyEntries: CursorCanTraverse + CursorCanWrite { Ok(()) } - async fn increment_deleted_count(&mut self) -> Result<()> - where T: Send + async fn increment_deleted_count(&mut self) -> Result<()> + where + T: Send, { self.seek_to_start().await?; - self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?; + self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64) + .await?; let new_count = self.header_mut().increment_deleted_count(); self.write_bytes(&encode::(&new_count)?).await?; Ok(()) } - async fn set_header(&mut self, header: &StoreHeader) -> Result<()> - where T: Send + async fn set_header(&mut self, header: &StoreHeader) -> Result<()> + where + T: Send, { self.seek_to_start().await?; let encoded_header: Vec = header.encode()?; @@ -50,8 +54,9 @@ pub trait CursorCanModifyEntries: CursorCanTraverse + CursorCanWrite { // Moves cursor to the end. // Returns file position to the start of the new entry. - async fn append_entry_no_indexing(&mut self, entry: &Entry) -> Result - where T: Encode + Send + Sync + async fn append_entry_no_indexing(&mut self, entry: &Entry) -> Result + where + T: Encode + Send + Sync, { self.increment_total_count().await?; diff --git a/storage_engine/src/cursor_capabilities/index_access.rs b/storage_engine/src/cursor_capabilities/index_access.rs index 81ee38e..5210e95 100644 --- a/storage_engine/src/cursor_capabilities/index_access.rs +++ b/storage_engine/src/cursor_capabilities/index_access.rs @@ -5,19 +5,20 @@ use async_trait::async_trait; use bincode; use bincode::{Decode, Encode}; -use crate::error::Error; -use crate::segments::entry::{Entry, EntryDetailed}; -use crate::store::{FilePosition, Column, Result}; -use crate::index::Index; -use crate::cursor_capabilities::traversal::CursorCanTraverse; use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries; +use crate::cursor_capabilities::traversal::CursorCanTraverse; +use crate::error::Error; +use crate::index::Index; +use crate::segments::entry::{Entry, EntryDetailed}; +use crate::store::{Column, FilePosition, Result}; #[async_trait] pub trait CursorCanReadIndex: CursorCanTraverse { fn indexes(&mut self) -> &[Option>]; - async fn index_lookup(&mut self, column: Column, value: &T) -> Result>> - where T: Encode + Decode + Ord + Send + Sync + async fn index_lookup(&mut self, column: Column, value: &T) -> Result>> + where + T: Encode + Decode + Ord + Send + Sync, { match &self.indexes()[column as usize] { Some(index) => { @@ -25,24 +26,24 @@ pub trait CursorCanReadIndex: CursorCanTraverse { let mut entries: Vec> = vec![]; for &file_position in file_positions.iter() { match self.read_entry_at(file_position).await? { - Some(entry) => { - entries.push(entry) - }, - None => { - return Err(Error::IndexIsStoringEofFilePosition(column)) - } + Some(entry) => entries.push(entry), + None => return Err(Error::IndexIsStoringEofFilePosition(column)), } } Ok(entries) - }, - None => - Err(Error::AttemptToIndexNonIndexableColumn(column)) + } + None => Err(Error::AttemptToIndexNonIndexableColumn(column)), } } - async fn select_entries_where_eq(&mut self, column: Column, value: &T) -> Result>> - where T: Encode + Decode + Ord + Send + Sync + async fn select_entries_where_eq( + &mut self, + column: Column, + value: &T, + ) -> Result>> + where + T: Encode + Decode + Ord + Send + Sync, { if self.header().is_column_indexed(column) { self.index_lookup(column, value).await @@ -59,9 +60,7 @@ pub trait CursorCanWriteToIndex: CursorCanReadIndex + CursorCanModifyEntri // Assumes that the column is indexable. fn mut_index_at(&mut self, column: Column) -> &mut Index { match &mut self.indexes_mut()[column as usize] { - Some(index) => { - index - }, + Some(index) => index, None => { unreachable!() } @@ -69,8 +68,14 @@ pub trait CursorCanWriteToIndex: CursorCanReadIndex + CursorCanModifyEntri } // Assumes that the column is indexable. - async fn insert_into_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()> - where T: Encode + Decode + Ord + Send + Sync + 'async_trait + async fn insert_into_index( + &mut self, + column: Column, + value: T, + file_position: FilePosition, + ) -> Result<()> + where + T: Encode + Decode + Ord + Send + Sync + 'async_trait, { let index = self.mut_index_at(column as Column); index.insert(value, file_position).await?; @@ -78,24 +83,37 @@ pub trait CursorCanWriteToIndex: CursorCanReadIndex + CursorCanModifyEntri } // Assumes that the column is indexable. - async fn delete_from_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()> - where T: Encode + Decode + Ord + Send + Sync + 'async_trait + async fn delete_from_index( + &mut self, + column: Column, + value: T, + file_position: FilePosition, + ) -> Result<()> + where + T: Encode + Decode + Ord + Send + Sync + 'async_trait, { let index = self.mut_index_at(column as Column); index.delete(value, file_position).await?; Ok(()) } - - async fn insert_entry(&mut self, entry: Entry) -> Result - where T: Encode + Decode + Ord + Send + Sync + 'async_trait + + async fn insert_entry(&mut self, entry: Entry) -> Result + where + T: Encode + Decode + Ord + Send + Sync + 'async_trait, { let file_position = self.append_entry_no_indexing(&entry).await?; // insert the indexable values of the entry into corresponding indexes. - for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() { + for (column, (value, should_index)) in entry + .data + .into_iter() + .zip(self.header().indexed_columns.clone()) + .enumerate() + { if should_index { // SAFETY: If should_index is true, then the column is indexable. - self.insert_into_index(column as Column, value, file_position).await? + self.insert_into_index(column as Column, value, file_position) + .await? } } @@ -103,12 +121,19 @@ pub trait CursorCanWriteToIndex: CursorCanReadIndex + CursorCanModifyEntri } async fn delete_entry_values_from_indexes(&mut self, entry: EntryDetailed) -> Result<()> - where T: Encode + Decode + Ord + Send + Sync + 'async_trait + where + T: Encode + Decode + Ord + Send + Sync + 'async_trait, { - for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() { + for (column, (value, should_index)) in entry + .data + .into_iter() + .zip(self.header().indexed_columns.clone()) + .enumerate() + { if should_index { // SAFETY: If should_index is true, then the column is indexable. - self.delete_from_index(column as Column, value, entry.file_position).await? + self.delete_from_index(column as Column, value, entry.file_position) + .await? } } Ok(()) diff --git a/storage_engine/src/cursor_capabilities/mod.rs b/storage_engine/src/cursor_capabilities/mod.rs index 9872f1c..bea5cf7 100644 --- a/storage_engine/src/cursor_capabilities/mod.rs +++ b/storage_engine/src/cursor_capabilities/mod.rs @@ -1,4 +1,4 @@ -pub(crate) mod primitive; -pub mod traversal; pub mod entry_modification; pub mod index_access; +pub(crate) mod primitive; +pub mod traversal; diff --git a/storage_engine/src/cursor_capabilities/primitive.rs b/storage_engine/src/cursor_capabilities/primitive.rs index 82fb23d..bf4e7f5 100644 --- a/storage_engine/src/cursor_capabilities/primitive.rs +++ b/storage_engine/src/cursor_capabilities/primitive.rs @@ -1,6 +1,6 @@ -use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; -use tokio::fs::File; use async_trait::async_trait; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; use crate::store::{FilePosition, Result}; diff --git a/storage_engine/src/cursor_capabilities/traversal.rs b/storage_engine/src/cursor_capabilities/traversal.rs index 00a1da6..809d416 100644 --- a/storage_engine/src/cursor_capabilities/traversal.rs +++ b/storage_engine/src/cursor_capabilities/traversal.rs @@ -1,24 +1,24 @@ -use tokio::io::AsyncReadExt; use async_trait::async_trait; +use tokio::io::AsyncReadExt; +use crate::binary_coding::decode; use bincode; use bincode::Decode; -use crate::binary_coding::decode; -use crate::error::{Error, DecodeErrorKind}; +use crate::cursor_capabilities::primitive::CursorCanRead; +use crate::error::{DecodeErrorKind, Error}; use crate::segments::entry::EntryDetailed; use crate::segments::entry_header::EntryHeaderWithDataSize; use crate::segments::store_header::StoreHeader; -use crate::store::{FilePosition, Column, Result}; -use crate::cursor_capabilities::primitive::CursorCanRead; - +use crate::store::{Column, FilePosition, Result}; #[async_trait] pub trait CursorCanTraverse: CursorCanRead { fn header(&self) -> &StoreHeader; async fn seek_to_start_of_data(&mut self) -> Result { - self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64).await + self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64) + .await } async fn read_entry_header(&mut self) -> Result { @@ -30,14 +30,21 @@ pub trait CursorCanTraverse: CursorCanRead { Ok(header) } - async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result { + async fn read_entry_header_at( + &mut self, + file_position: FilePosition, + ) -> Result { self.seek_to(file_position).await?; self.read_entry_header().await } // Returns None when file_position == eof_file_position - async fn read_entry_at(&mut self, file_position: FilePosition) -> Result>> - where T: Decode + async fn read_entry_at( + &mut self, + file_position: FilePosition, + ) -> Result>> + where + T: Decode, { self.seek_to(file_position).await?; self.next().await @@ -46,12 +53,11 @@ pub trait CursorCanTraverse: CursorCanRead { // ===Iteration=== // The following functions assume that the current file position is at a valid entry or EOF. - // WARNING: This moves the file_position to start of the data, so you can't just call // next_entry_header() a bunch of times. You must move the cursor! async fn next_entry_header(&mut self) -> Result> { if self.is_at_eof().await? { - return Ok(None) + return Ok(None); } let entry_header = self.read_entry_header().await?; @@ -60,31 +66,47 @@ pub trait CursorCanTraverse: CursorCanRead { } // This is meant to be used after next_entry_header() is called. - async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result{ + async fn jump_from_start_of_entry_data_to_next_entry( + &mut self, + entry_header: &EntryHeaderWithDataSize, + ) -> Result { let file_position = self.seek_by(entry_header.size_of_data() as i64).await?; Ok(file_position) } - async fn next(&mut self) -> Result>> - where T: Decode + async fn next(&mut self) -> Result>> + where + T: Decode, { let file_position = self.current_file_position().await?; - let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; + let Some(entry_header) = self.next_entry_header().await? else { + return Ok(None); + }; let mut data_bytes: Vec = vec![0; entry_header.size_of_data()]; self.read_bytes(&mut data_bytes).await?; - let entry: EntryDetailed = - EntryDetailed::decode(entry_header, file_position, self.header().number_of_columns, &mut data_bytes)?; + let entry: EntryDetailed = EntryDetailed::decode( + entry_header, + file_position, + self.header().number_of_columns, + &mut data_bytes, + )?; Ok(Some(entry)) } - // Like next, but only reads the column, not the whole entry. - async fn next_at_column(&mut self, column: Column) -> Result> - where T: Decode + Send + // Like next, but only reads the column, not the whole entry. + async fn next_at_column( + &mut self, + column: Column, + ) -> Result> + where + T: Decode + Send, { let file_position = self.current_file_position().await?; - let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; + let Some(entry_header) = self.next_entry_header().await? else { + return Ok(None); + }; let file_position_at_start_of_data = self.current_file_position().await?; // figuring out how much to decode @@ -94,49 +116,59 @@ pub trait CursorCanTraverse: CursorCanRead { // reading and decoding let mut bytes: Vec = vec![0; entry_header.data_sizes[column as usize]]; self.read_bytes(&mut bytes).await?; - let (value, _) = - decode::(&bytes[..]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; + let (value, _) = decode::(&bytes[..]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; // jumping to next entry self.seek_to(file_position_at_start_of_data).await?; - self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?; + self.jump_from_start_of_entry_data_to_next_entry(&entry_header) + .await?; Ok(Some((entry_header, file_position, value))) } - async fn next_alive_at_column(&mut self, column: Column) -> Result> - where T: Decode + Send + async fn next_alive_at_column( + &mut self, + column: Column, + ) -> Result> + where + T: Decode + Send, { while let Some((header, file_position, t)) = self.next_at_column(column).await? { - if !header.is_deleted { - return Ok(Some((header, file_position, t))) + if !header.is_deleted { + return Ok(Some((header, file_position, t))); } } Ok(None) - } + } - async fn next_alive(&mut self) -> Result>> - where T: Decode + async fn next_alive(&mut self) -> Result>> + where + T: Decode, { while let Some(entry) = self.next().await? { - if !entry.header.is_deleted { - return Ok(Some(entry)) + if !entry.header.is_deleted { + return Ok(Some(entry)); } } Ok(None) } // ===Search=== - async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result>> - where T: Decode + PartialEq + Send + Sync + async fn find_first_eq_bruteforce( + &mut self, + column: Column, + t0: &T, + ) -> Result>> + where + T: Decode + PartialEq + Send + Sync, { let mut file_position = self.current_file_position().await?; while let Some((_, _, t)) = self.next_alive_at_column(column).await? { - if &t == t0 { + if &t == t0 { // go back and decode the whole entry self.seek_to(file_position).await?; - return self.next().await + return self.next().await; } else { file_position = self.current_file_position().await?; } @@ -144,8 +176,13 @@ pub trait CursorCanTraverse: CursorCanRead { Ok(None) } - async fn find_all_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result>> - where T: Decode + PartialEq + Send + Sync + async fn find_all_eq_bruteforce( + &mut self, + column: Column, + t0: &T, + ) -> Result>> + where + T: Decode + PartialEq + Send + Sync, { let mut entries = vec![]; while let Some(entry) = self.find_first_eq_bruteforce(column, t0).await? { @@ -155,8 +192,9 @@ pub trait CursorCanTraverse: CursorCanRead { } // ===Debugging=== - async fn read_entries(&mut self) -> Result<()> - where T: Decode + std::fmt::Debug + async fn read_entries(&mut self) -> Result<()> + where + T: Decode + std::fmt::Debug, { self.seek_to_start_of_data().await?; while let Some(entry) = self.next().await? { diff --git a/storage_engine/src/lib.rs b/storage_engine/src/lib.rs index 00ff87c..8a8eb4b 100644 --- a/storage_engine/src/lib.rs +++ b/storage_engine/src/lib.rs @@ -1,7 +1,7 @@ -pub mod store; mod binary_coding; +pub mod cursor; +pub mod cursor_capabilities; pub mod error; mod index; -pub mod cursor; pub mod segments; -pub mod cursor_capabilities; +pub mod store; diff --git a/storage_engine/src/segments/entry.rs b/storage_engine/src/segments/entry.rs index a2f8bc8..bafe011 100644 --- a/storage_engine/src/segments/entry.rs +++ b/storage_engine/src/segments/entry.rs @@ -1,9 +1,9 @@ use bincode::{Decode, Encode}; -use crate::binary_coding::{encode_sequence, encode_sequence_with_sizes, decode_sequence}; -use crate::store::{Result, FilePosition}; -use crate::error::{Error, DecodeErrorKind}; +use crate::binary_coding::{decode_sequence, encode_sequence, encode_sequence_with_sizes}; +use crate::error::{DecodeErrorKind, Error}; use crate::segments::entry_header::{EntryHeader, EntryHeaderWithDataSize}; +use crate::store::{FilePosition, Result}; #[derive(Debug)] pub struct Entry { @@ -18,14 +18,18 @@ pub struct EntryDetailed { pub data: Vec, } -impl Entry { +impl Entry { pub fn new(data: Vec) -> Self { - Self { header: EntryHeader { is_deleted: false }, data } + Self { + header: EntryHeader { is_deleted: false }, + data, + } } // FORMAT: [EntryHeaderWithDataSize, ..sequence of data] - pub fn encode(&self) -> Result> - where T: Encode + pub fn encode(&self) -> Result> + where + T: Encode, { let mut result: Vec = self.header.encode()?; @@ -36,17 +40,28 @@ impl Entry { } } -impl EntryDetailed { - pub fn decode(header: EntryHeaderWithDataSize, file_position: FilePosition, number_of_columns: usize, bytes: &[u8]) -> Result - where T: Decode +impl EntryDetailed { + pub fn decode( + header: EntryHeaderWithDataSize, + file_position: FilePosition, + number_of_columns: usize, + bytes: &[u8], + ) -> Result + where + T: Decode, { let data = decode_sequence::(number_of_columns, bytes) .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?; - Ok(EntryDetailed { header, file_position, data }) + Ok(EntryDetailed { + header, + file_position, + data, + }) } - pub fn forget(&self) -> Entry - where T: Clone + pub fn forget(&self) -> Entry + where + T: Clone, { Entry { header: self.header.clone().into(), diff --git a/storage_engine/src/segments/entry_header.rs b/storage_engine/src/segments/entry_header.rs index c0dc11e..e106664 100644 --- a/storage_engine/src/segments/entry_header.rs +++ b/storage_engine/src/segments/entry_header.rs @@ -1,6 +1,6 @@ -use crate::binary_coding::{decode, encode, decode_sequence}; -use crate::store::{Result, Column}; -use crate::error::{Error, DecodeErrorKind}; +use crate::binary_coding::{decode, decode_sequence, encode}; +use crate::error::{DecodeErrorKind, Error}; +use crate::store::{Column, Result}; use std::mem::size_of; #[derive(Debug)] @@ -12,7 +12,7 @@ pub struct EntryHeader { pub struct EntryHeaderWithDataSize { pub is_deleted: bool, pub data_sizes: Vec, // vec![5, 6, 20] means that column 0 stores 5 bytes, column 1 stores 6 - // bytes etc + // bytes etc } impl EntryHeader { @@ -24,21 +24,23 @@ impl EntryHeader { impl From for EntryHeader { fn from(entry: EntryHeaderWithDataSize) -> Self { - Self { is_deleted: entry.is_deleted, } + Self { + is_deleted: entry.is_deleted, + } } } impl EntryHeaderWithDataSize { pub const IS_DELETED_OFFSET: usize = 0; pub const IS_DELETED_SIZE: usize = size_of::(); - pub const DATA_SIZES_OFFSET: usize = Self::IS_DELETED_OFFSET + Self::IS_DELETED_SIZE; + pub const DATA_SIZES_OFFSET: usize = Self::IS_DELETED_OFFSET + Self::IS_DELETED_SIZE; pub fn size(number_of_columns: usize) -> usize { - let size_of_data_sizes: usize = number_of_columns*size_of::(); + let size_of_data_sizes: usize = number_of_columns * size_of::(); Self::IS_DELETED_SIZE + size_of_data_sizes } - pub fn size_of_data(&self) -> usize{ + pub fn size_of_data(&self) -> usize { self.data_sizes.iter().sum() } @@ -48,21 +50,23 @@ impl EntryHeaderWithDataSize { if i < column as usize { sum += size; } else { - break + break; } } sum } pub fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result { - let (is_deleted, _) = - decode::(&bytes) - .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; + let (is_deleted, _) = decode::(&bytes) + .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; - let data_sizes = decode_sequence::(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?; + let data_sizes = + decode_sequence::(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?; - Ok(Self { is_deleted, data_sizes } ) + Ok(Self { + is_deleted, + data_sizes, + }) } } - diff --git a/storage_engine/src/segments/store_header.rs b/storage_engine/src/segments/store_header.rs index b8bdfd5..ea2cc3f 100644 --- a/storage_engine/src/segments/store_header.rs +++ b/storage_engine/src/segments/store_header.rs @@ -1,6 +1,6 @@ -use crate::binary_coding::{encode, encode_sequence, decode, decode_sequence}; -use crate::store::{Result, Column}; -use crate::error::{Error, DecodeErrorKind}; +use crate::binary_coding::{decode, decode_sequence, encode, encode_sequence}; +use crate::error::{DecodeErrorKind, Error}; +use crate::store::{Column, Result}; use std::mem::size_of; use std::path::PathBuf; @@ -30,14 +30,19 @@ impl StoreHeader { pub const DELETED_COUNT_SIZE: usize = size_of::(); pub const TOTAL_COUNT_SIZE: usize = size_of::(); pub const PRIMARY_COLUMN_SIZE: usize = size_of::(); - pub const FIXED_SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE + Self::PRIMARY_COLUMN_SIZE; + pub const FIXED_SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + + Self::DELETED_COUNT_SIZE + + Self::TOTAL_COUNT_SIZE + + Self::PRIMARY_COLUMN_SIZE; pub const NUMBER_OF_COLUMNS_OFFSET: usize = 0; - pub const DELETED_COUNT_OFFSET: usize = Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE; + pub const DELETED_COUNT_OFFSET: usize = + Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE; pub const TOTAL_COUNT_OFFSET: usize = Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE; pub const PRIMARY_COLUMN_OFFSET: usize = Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE; #[allow(dead_code)] - pub const INDEXED_COLUMNS_OFFSET: usize = Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE; + pub const INDEXED_COLUMNS_OFFSET: usize = + Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE; fn indexed_columns_size(number_of_columns: usize) -> usize { size_of::() * number_of_columns @@ -64,19 +69,29 @@ impl StoreHeader { vec![0; Self::indexed_columns_size(header.number_of_columns)] } - pub async fn decode_fixed(table_folder: &PathBuf, result: &[u8]) -> Result { - let (number_of_columns, _) = - decode::(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; - let (deleted_count, _) = - decode::(&result[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?; - let (total_count, _) = - decode::(&result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?; - let (primary_column, _) = - decode::(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; + pub async fn decode_fixed( + table_folder: &PathBuf, + result: &[u8], + ) -> Result { + let (number_of_columns, _) = decode::( + &result[Self::NUMBER_OF_COLUMNS_OFFSET + ..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE], + ) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; + let (deleted_count, _) = decode::( + &result + [Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE], + ) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?; + let (total_count, _) = decode::( + &result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE], + ) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?; + let (primary_column, _) = decode::( + &result[Self::PRIMARY_COLUMN_OFFSET + ..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE], + ) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; let header = StoreHeaderFixedPart { table_folder: table_folder.clone(), number_of_columns, @@ -89,9 +104,8 @@ impl StoreHeader { } pub async fn decode_rest(header: StoreHeaderFixedPart, result: &[u8]) -> Result { - let indexed_columns: Vec = - decode_sequence::(header.number_of_columns, result) - .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderIndexedColumns, e))?; + let indexed_columns: Vec = decode_sequence::(header.number_of_columns, result) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderIndexedColumns, e))?; Ok(StoreHeader { table_folder: header.table_folder.into(), @@ -104,8 +118,6 @@ impl StoreHeader { }) } - - // returns new count pub fn increment_total_count(&mut self) -> usize { self.total_count += 1; diff --git a/storage_engine/src/store.rs b/storage_engine/src/store.rs index bb256ff..1a56a1f 100644 --- a/storage_engine/src/store.rs +++ b/storage_engine/src/store.rs @@ -1,15 +1,14 @@ -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::fs::{File, OpenOptions, DirBuilder}; -use tokio::fs; -use std::path::{Path, PathBuf}; use bincode::{Decode, Encode}; +use std::path::{Path, PathBuf}; +use tokio::fs; +use tokio::fs::{DirBuilder, File, OpenOptions}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use crate::error::Error; use crate::cursor::{ReadCursor, WriteCursor}; use crate::cursor_capabilities::traversal::CursorCanTraverse; -use crate::segments::store_header::StoreHeader; +use crate::error::Error; use crate::index::Index; - +use crate::segments::store_header::StoreHeader; pub type Result = std::result::Result; @@ -25,7 +24,6 @@ pub struct Store { pub type StoreIndexes = Vec>>; - //===Store=== pub async fn store_exists(table_folder: &str) -> Result { Ok(fs::metadata(table_folder).await.is_ok()) @@ -34,14 +32,18 @@ pub async fn store_exists(table_folder: &str) -> Result { pub const ROWS_FILE_NAME: &'static str = "rows"; pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_intermediate"; -impl Store { +impl Store { // ===Creation=== - pub async fn new(path_to_table: &Path, number_of_columns: usize, primary_column: Column) -> Result - where T: Encode + Decode + Ord + pub async fn new( + path_to_table: &Path, + number_of_columns: usize, + primary_column: Column, + ) -> Result + where + T: Encode + Decode + Ord, { let path_to_rows = path_to_table.join(ROWS_FILE_NAME); - DirBuilder::new() - .create(path_to_table).await?; + DirBuilder::new().create(path_to_table).await?; let header = { let mut indexed_columns = vec![false; number_of_columns]; @@ -61,22 +63,24 @@ impl Store { let indexes: StoreIndexes = Self::create_initial_indexes(&header).await?; - let store = Self { - header, - indexes, - }; + let store = Self { header, indexes }; Ok(store) } pub fn path_to_index_file(header: &StoreHeader, column: Column) -> PathBuf { let path_to_table = Path::new(&header.table_folder); - let path_to_index = path_to_table.join(&format!("{}_{}", ROWS_FILE_NAME, column.to_string())); + let path_to_index = + path_to_table.join(&format!("{}_{}", ROWS_FILE_NAME, column.to_string())); path_to_index } - pub async fn create_empty_index_at(header: &StoreHeader, column: Column) -> Result> - where T: Encode + Decode + Ord + pub async fn create_empty_index_at( + header: &StoreHeader, + column: Column, + ) -> Result> + where + T: Encode + Decode + Ord, { let path_to_index = Self::path_to_index_file(&header, column); let index = Index::new(path_to_index).await?; @@ -84,35 +88,43 @@ impl Store { Ok(index) } - pub async fn create_initial_indexes(header: &StoreHeader) -> Result> - where T: Encode + Decode + Ord + pub async fn create_initial_indexes(header: &StoreHeader) -> Result> + where + T: Encode + Decode + Ord, { let mut result: StoreIndexes = Vec::with_capacity(header.number_of_columns); for _ in 0..header.number_of_columns { result.push(None) } - result[header.primary_column as usize] = Some(Self::create_empty_index_at(&header, header.primary_column).await?); + result[header.primary_column as usize] = + Some(Self::create_empty_index_at(&header, header.primary_column).await?); Ok(result) } - pub async fn connect_index_at(header: &StoreHeader, column: Column) -> Result> - where T: Encode + Decode + Ord + pub async fn connect_index_at( + header: &StoreHeader, + column: Column, + ) -> Result> + where + T: Encode + Decode + Ord, { let path_to_index = Self::path_to_index_file(&header, column); let index: Index = Index::connect(path_to_index).await?; Ok(index) } - pub async fn create_empty_rows_file(path_to_rows: PathBuf, header: &StoreHeader) -> Result { - let mut file: File = - OpenOptions::new() - .write(true) - .read(true) - .create_new(true) - .open(path_to_rows) - .await?; + pub async fn create_empty_rows_file( + path_to_rows: PathBuf, + header: &StoreHeader, + ) -> Result { + let mut file: File = OpenOptions::new() + .write(true) + .read(true) + .create_new(true) + .open(path_to_rows) + .await?; let encoded_header: Vec = header.encode()?; file.write(&encoded_header).await?; @@ -121,17 +133,17 @@ impl Store { } pub async fn connect(table_folder: &PathBuf) -> Result - where T: std::fmt::Debug + Encode + Decode + Ord + where + T: std::fmt::Debug + Encode + Decode + Ord, { let path_to_table = Path::new(table_folder); let path_to_rows = path_to_table.join(ROWS_FILE_NAME); - let mut file: File = - OpenOptions::new() - .read(true) - .write(true) - .open(path_to_rows) - .await?; + let mut file: File = OpenOptions::new() + .read(true) + .write(true) + .open(path_to_rows) + .await?; // Unfortunately we can't yet use store.read_bytes, since it can't be created without the // header. @@ -146,12 +158,13 @@ impl Store { StoreHeader::decode_rest(fixed_header, &rest_bytes).await? }; - let indexes: StoreIndexes = { let mut result = Vec::with_capacity(header.number_of_columns); for (column, &is_indexed) in header.indexed_columns.iter().enumerate() { if is_indexed { - result.push(Some(Self::connect_index_at(&header, column as Column).await?)) + result.push(Some( + Self::connect_index_at(&header, column as Column).await?, + )) } else { result.push(None) } @@ -160,29 +173,29 @@ impl Store { result }; - let store = Self { - header, - indexes - }; + let store = Self { header, indexes }; Ok(store) } // ===Cursors=== pub async fn read_cursor(&self) -> Result> - where T: Send + Sync + where + T: Send + Sync, { ReadCursor::new(self).await } pub async fn write_cursor(&mut self) -> Result> - where T: Send + Sync + where + T: Send + Sync, { WriteCursor::new(self).await } // ===Indexes=== - pub async fn attach_index(&mut self, column: Column) -> Result<()> - where T: Ord + Decode + Encode + Send + Sync + pub async fn attach_index(&mut self, column: Column) -> Result<()> + where + T: Ord + Decode + Encode + Send + Sync, { if self.header.is_column_indexed(column) { Err(Error::ColumnAlreadyIndexed(column)) @@ -195,7 +208,8 @@ impl Store { // For debugging. #[allow(dead_code)] pub async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error> - where T: Send + Sync + where + T: Send + Sync, { let mut cursor = self.read_cursor().await.map_err(|e| e.to_io_or_panic())?; let bytes = cursor.read_all_bytes().await?; @@ -206,11 +220,11 @@ impl Store { #[cfg(test)] mod tests { use super::*; - use crate::segments::entry::Entry; - use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex}; + use crate::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex}; use crate::cursor_capabilities::traversal::CursorCanTraverse; + use crate::segments::entry::Entry; - impl Drop for Store { + impl Drop for Store { fn drop(&mut self) { println!("DROPPING TEST FOLDER"); let table_folder = self.header.table_folder.clone(); @@ -219,7 +233,6 @@ mod tests { } } - #[tokio::test] async fn test_create() { type Data = u32; @@ -227,7 +240,9 @@ mod tests { let table_path = Path::new("test_table_0"); let number_of_columns = 5; let primary_column = 0; - let store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + let store: Store = Store::new(table_path, number_of_columns, primary_column) + .await + .unwrap(); assert!(store.header.number_of_columns == number_of_columns); assert!(store.header.total_count == 0); @@ -242,7 +257,9 @@ mod tests { let table_path = Path::new("test_table_1"); let number_of_columns = 5; let primary_column = 0; - let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + let mut store: Store = Store::new(table_path, number_of_columns, primary_column) + .await + .unwrap(); { let mut cursor = store.write_cursor().await.unwrap(); @@ -264,7 +281,9 @@ mod tests { let table_path = Path::new("test_table_2"); let number_of_columns = 5; let primary_column = 0; - let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + let mut store: Store = Store::new(table_path, number_of_columns, primary_column) + .await + .unwrap(); { let mut cursor = store.write_cursor().await.unwrap(); @@ -284,8 +303,8 @@ mod tests { let entry0 = cursor.next().await.unwrap().unwrap(); let entry1 = cursor.next().await.unwrap().unwrap(); - assert!(entry0.data == vec![1,2,3,4,5]); - assert!(entry1.data == vec![6,7,8,9,10]); + assert!(entry0.data == vec![1, 2, 3, 4, 5]); + assert!(entry1.data == vec![6, 7, 8, 9, 10]); } } @@ -296,7 +315,9 @@ mod tests { let table_path = Path::new("test_table_3"); let number_of_columns = 5; let primary_column = 0; - let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + let mut store: Store = Store::new(table_path, number_of_columns, primary_column) + .await + .unwrap(); { let mut cursor = store.write_cursor().await.unwrap(); @@ -319,8 +340,8 @@ mod tests { } assert!(entries.len() == 2); - assert!(entries[0].data == vec![1,2,3,4,5]); - assert!(entries[1].data == vec![6,7,8,9,10]); + assert!(entries[0].data == vec![1, 2, 3, 4, 5]); + assert!(entries[1].data == vec![6, 7, 8, 9, 10]); } } @@ -331,7 +352,9 @@ mod tests { let table_path = Path::new("test_table_4"); let number_of_columns = 5; let primary_column = 0; - let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + let mut store: Store = Store::new(table_path, number_of_columns, primary_column) + .await + .unwrap(); let value = 200; { @@ -356,7 +379,10 @@ mod tests { let mut cursor = store.read_cursor().await.unwrap(); let column = 1; - let entries = cursor.select_entries_where_eq(column, &value).await.unwrap(); + let entries = cursor + .select_entries_where_eq(column, &value) + .await + .unwrap(); assert!(entries.len() == 2); assert!(entries[0].data == vec![1, value, 3, 4, 5]); @@ -371,7 +397,9 @@ mod tests { let table_path = Path::new("test_table_5"); let number_of_columns = 5; let primary_column = 0; - let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + let mut store: Store = Store::new(table_path, number_of_columns, primary_column) + .await + .unwrap(); let column: Column = 1; @@ -402,7 +430,10 @@ mod tests { let mut cursor = store.read_cursor().await.unwrap(); let column = 1; - let entries = cursor.select_entries_where_eq(column, &value).await.unwrap(); + let entries = cursor + .select_entries_where_eq(column, &value) + .await + .unwrap(); assert!(entries.len() == 2); // Order may be non-deterministic. assert!(entries[0].data[column as usize] == value); @@ -417,7 +448,9 @@ mod tests { let table_path = Path::new("test_table_6"); let number_of_columns = 5; let primary_column = 0; - let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + let mut store: Store = Store::new(table_path, number_of_columns, primary_column) + .await + .unwrap(); let value = 200; let (_file_position0, file_position1, _file_position2, _file_position3) = { @@ -436,7 +469,12 @@ mod tests { let file_position3 = cursor.insert_entry(entry3).await.unwrap(); assert!(store.header.total_count == 4); - (file_position0, file_position1, file_position2, file_position3) + ( + file_position0, + file_position1, + file_position2, + file_position3, + ) }; { @@ -454,7 +492,9 @@ mod tests { let table_path = Path::new("test_table_7"); let number_of_columns = 5; let primary_column = 0; - let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + let mut store: Store = Store::new(table_path, number_of_columns, primary_column) + .await + .unwrap(); let column: Column = 1; @@ -480,13 +520,21 @@ mod tests { let file_position3 = cursor.insert_entry(entry3).await.unwrap(); assert!(store.header.total_count == 4); - (file_position0, file_position1, file_position2, file_position3) + ( + file_position0, + file_position1, + file_position2, + file_position3, + ) }; { assert!(store.header.deleted_count == 0); let mut cursor = store.write_cursor().await.unwrap(); - cursor.delete_entries_where_eq(column, &value, false).await.unwrap(); + cursor + .delete_entries_where_eq(column, &value, false) + .await + .unwrap(); assert!(store.header.deleted_count == 2); } } @@ -498,7 +546,9 @@ mod tests { let table_path = Path::new("test_table_8"); let number_of_columns = 5; let primary_column = 0; - let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + let mut store: Store = Store::new(table_path, number_of_columns, primary_column) + .await + .unwrap(); let column: Column = 1; @@ -524,13 +574,21 @@ mod tests { let file_position3 = cursor.insert_entry(entry3).await.unwrap(); assert!(store.header.total_count == 4); - (file_position0, file_position1, file_position2, file_position3) + ( + file_position0, + file_position1, + file_position2, + file_position3, + ) }; { assert!(store.header.deleted_count == 0); let mut cursor = store.write_cursor().await.unwrap(); - cursor.delete_entries_where_eq(column, &value, false).await.unwrap(); + cursor + .delete_entries_where_eq(column, &value, false) + .await + .unwrap(); assert!(cursor.header().deleted_count == 2); assert!(cursor.header().total_count == 4); @@ -539,5 +597,4 @@ mod tests { assert!(cursor.header().total_count == 2); } } - }