diff --git a/Cargo.lock b/Cargo.lock index 14ae99d..d76a63c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -601,6 +601,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "storage_engine" +version = "0.1.0" +dependencies = [ + "async-trait", + "bincode", + "thiserror", + "tokio", +] + [[package]] name = "strsim" version = "0.10.0" diff --git a/Cargo.toml b/Cargo.toml index 57d0219..3e6cda3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,5 +5,6 @@ members = [ "proto", "server", "client", - "parser" + "parser", + "storage_engine" ] diff --git a/storage_engine/Cargo.toml b/storage_engine/Cargo.toml new file mode 100644 index 0000000..6658764 --- /dev/null +++ b/storage_engine/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "storage_engine" +version = "0.1.0" +edition = "2021" + +[dependencies] +bincode = "2.0.0-rc.3" +tokio = { version = "1.34.0", features = ["full"] } +async-trait = "0.1.74" +thiserror = "1.0.50" diff --git a/storage_engine/src/binary_coding.rs b/storage_engine/src/binary_coding.rs new file mode 100644 index 0000000..1b2a475 --- /dev/null +++ b/storage_engine/src/binary_coding.rs @@ -0,0 +1,87 @@ +use bincode; +use bincode::{Decode, Encode}; +use bincode::config::{BigEndian, Configuration, Fixint}; +use std::mem::size_of; + +const BIN_CONFIG: Configuration = bincode::config::standard().with_big_endian().with_fixed_int_encoding(); + +pub fn encode(t: &T) -> Result, bincode::error::EncodeError> { + bincode::encode_to_vec(t, BIN_CONFIG) +} + +pub fn decode(bytes: &[u8]) -> Result<(T, usize), bincode::error::DecodeError> { + bincode::decode_from_slice(bytes, BIN_CONFIG) +} + +pub fn encode_vector(ts: &[T]) -> Result, bincode::error::EncodeError> { + let size: usize = ts.len(); + let mut result = encode(&size)?; + for t in ts { + result.append(&mut encode(&t)?); + } + Ok(result) +} + +pub fn decode_vector(bytes: &[u8]) -> Result, bincode::error::DecodeError> { + let mut offset = size_of::(); + let result_len: usize = 
{
+        // Decode the length prefix and take the offset from the number of
+        // bytes bincode actually consumed. Using `size_of::<usize>()` as the
+        // offset (as before) silently assumes the encoded width of `usize`
+        // equals the native width — with fixed-int encoding bincode always
+        // writes 8 bytes, so that assumption breaks on 32-bit targets.
+        let (len, consumed) = decode::<usize>(bytes)?;
+        offset = consumed;
+        len
+    };
+    let mut result: Vec<T> = Vec::with_capacity(result_len);
+    for _ in 0..result_len {
+        let (x, bytes_consumed) = decode::<T>(&bytes[offset..])?;
+        offset += bytes_consumed;
+        result.push(x);
+    }
+    Ok(result)
+}
+
+// We don't care about encoding the length here (since it will be used for a row with known column
+// size)
+pub fn encode_sequence<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> {
+    let mut result = vec![];
+    for t in ts {
+        result.append(&mut encode(&t)?);
+    }
+    Ok(result)
+}
+
+// Like `encode_sequence`, but also reports the encoded byte length of every
+// element so callers can build per-column offsets.
+pub fn encode_sequence_with_sizes<T: Encode>(ts: &[T]) -> Result<(Vec<u8>, Vec<usize>), bincode::error::EncodeError> {
+    let mut result_bytes = vec![];
+    let mut sizes = Vec::with_capacity(ts.len());
+    for t in ts {
+        let mut bytes = encode(&t)?;
+        sizes.push(bytes.len());
+        result_bytes.append(&mut bytes);
+    }
+    Ok((result_bytes, sizes))
+}
+
+// Inverse of `encode_sequence`: the element count is supplied by the caller
+// because no length prefix is stored.
+pub fn decode_sequence<T: Decode>(len: usize, bytes: &[u8]) -> Result<Vec<T>, bincode::error::DecodeError> {
+    let mut result: Vec<T> = Vec::with_capacity(len);
+    let mut offset = 0;
+    for _ in 0..len {
+        let (x, bytes_consumed) = decode::<T>(&bytes[offset..])?;
+        offset += bytes_consumed;
+        result.push(x);
+    }
+    Ok(result)
+}
+
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_encoding_decoding() {
+        let xs: Vec<String> = vec!["foo".to_string(), "bar".to_string()];
+
+        let exs = encode_vector(&xs[..]).unwrap();
+
+        // WARNING: Don't forget to specify the type here
+        let dxs = decode_vector::<String>(&exs[..]).unwrap();
+
+        assert!(dxs == xs);
+    }
+}
diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs
new file mode 100644
index 0000000..3f68560
--- /dev/null
+++ b/storage_engine/src/cursor.rs
@@ -0,0 +1,394 @@
+use tokio::fs::{File, OpenOptions};
+use tokio::fs;
+use std::path::Path;
+use std::marker::PhantomData;
+use std::collections::{BTreeMap, HashSet};
+
+use bincode;
+use bincode::{Decode, Encode};
+
+use crate::segments::entry::EntryDetailed;
+use crate::segments::entry_header::EntryHeader;
+use
crate::segments::store_header::StoreHeader; +use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; +use crate::index::Index; +use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; +use crate::cursor_capabilities::header_access::{CursorCanReadHeader, CursorCanWriteHeader}; +use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex}; + +const GARBAGE_COLLECTION_TRIGGER: usize = 100; + +// ===Concrete Cursors=== +pub struct ReadCursor<'a, T> { + header: StoreHeader, + indexes: &'a [Option>], + file: File, + + eof_file_position: FilePosition, +} + +pub struct WriteCursor<'a, T> { + header: &'a mut StoreHeader, + indexes: &'a mut [Option>], + file: File, + + eof_file_position: FilePosition, +} + +// This is used as a cursor to temporary file during Garbage Collection +pub struct AppendOnlyCursor { + header: StoreHeader, + file: File, + data_type: PhantomData, + + eof_file_position: FilePosition, +} + + +// ===========Implementations============= +// ===primitive capabilities=== +impl CursorCanRead for ReadCursor<'_, T> { + fn file(&mut self) -> &mut File { + &mut self.file + } + + fn eof_file_position(&self) -> FilePosition { + self.eof_file_position + } +} + +impl CursorCanRead for WriteCursor<'_, T> { + fn file(&mut self) -> &mut File { + &mut self.file + } + + fn eof_file_position(&self) -> FilePosition { + self.eof_file_position + } +} + +impl CursorCanRead for AppendOnlyCursor { + fn file(&mut self) -> &mut File { + &mut self.file + } + + fn eof_file_position(&self) -> FilePosition { + self.eof_file_position + } +} + +impl CursorCanWrite for WriteCursor<'_, T> {} +impl CursorCanWrite for AppendOnlyCursor {} + + +// ===capability to access header=== +impl CursorCanReadHeader for ReadCursor<'_, T> { + fn header(&self) -> &StoreHeader { &self.header } +} + +impl CursorCanReadHeader for WriteCursor<'_, T> { + fn header(&self) -> &StoreHeader 
{ &self.header } +} + +impl CursorCanReadHeader for AppendOnlyCursor { + fn header(&self) -> &StoreHeader { &self.header } +} + +impl CursorCanWriteHeader for WriteCursor<'_, T> { + fn header_mut(&mut self) -> &mut StoreHeader { self.header } + fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } +} + +impl CursorCanWriteHeader for AppendOnlyCursor { + fn header_mut(&mut self) -> &mut StoreHeader { &mut self.header } + fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } +} + +// ===capability to access index=== +impl CursorCanReadIndex for ReadCursor<'_, T> { + fn indexes(&mut self) -> &[Option>] { &self.indexes } +} + +impl CursorCanReadIndex for WriteCursor<'_, T> { + fn indexes(&mut self) -> &[Option>] { &self.indexes } +} + +impl CursorCanWriteToIndex for WriteCursor<'_, T> { + fn indexes_mut(&mut self) -> &mut [Option>] { self.indexes } +} + + +// ===Specifics=== +impl <'cursor, T> ReadCursor<'cursor, T> { + pub async fn new<'store: 'cursor>(store: &'store Store) -> Result + where T: Send + Sync + { + let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); + let file: File = + OpenOptions::new() + .read(true) + .open(path_to_rows) + .await?; + + let mut cursor = Self { + header: store.header.clone(), + file, + indexes: &store.indexes, + + eof_file_position: 0, + }; + let eof_file_position: FilePosition = cursor.seek_to_end().await?; + cursor.eof_file_position = eof_file_position; + + cursor.seek_to_start_of_data().await?; + + Ok(cursor) + } +} + +impl <'cursor, T> WriteCursor<'cursor, T> +{ + // 'store lives atleast as long as 'cursor + pub async fn new<'store: 'cursor>(store: &'store mut Store) -> Result + where T: Send + { + let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); + let file: File = + OpenOptions::new() + .read(true) + .write(true) + .open(path_to_rows) + .await?; + + let 
mut cursor = Self { + header: &mut store.header, + file, + indexes: &mut store.indexes, + + eof_file_position: 0, + }; + let eof_file_position: FilePosition = cursor.seek_to_end().await?; + cursor.eof_file_position = eof_file_position; + + cursor.seek_to_start_of_data().await?; + + Ok(cursor) + } + + // ===Entry Header Manipulation=== + // assumes we are at the start of valid entry. + async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()> + where T: Send + { + let bytes: Vec = entry_header.encode()?; + self.write_bytes(&bytes).await?; + Ok(()) + } + + // ===Deletion=== + pub async fn mark_deleted_at(&mut self, file_position: FilePosition, enable_garbage_collector: bool) -> Result<()> + where T: Encode + Decode + Ord + Send + Sync + Clone + Ord + { + self.seek_to(file_position).await?; + let mut entry_header = self.read_entry_header().await?; + if entry_header.is_deleted { + Ok(()) + } else { + // Update store and entry headers + self.increment_deleted_count().await?; + self.seek_to(file_position).await?; + + entry_header.is_deleted = true; + self.set_new_entry_header(entry_header.into()).await?; + + // Update index + self.seek_to(file_position).await?; + match self.next().await? { + Some(entry) => { + self.delete_entry_values_from_indexes(entry).await? + }, + None => { + // SAFETY: We just modified its header, so it must exist. + unreachable!() + } + } + + if enable_garbage_collector { + self.attempt_garbage_collection_if_necessary().await?; + } + Ok(()) + } + } + + async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T, enable_garbage_collector: bool) -> Result>> + where T: Encode + Decode + Ord + Send + Sync + Clone + { + let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?; + if let Some(entry) = maybe_entry { + self.mark_deleted_at(entry.file_position, enable_garbage_collector).await?; + Ok(Some(entry)) + } else { + Ok(maybe_entry) + } + } + + // Doesn't update indexes. 
+ async fn find_all_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result + where T: Encode + Decode + Ord + Send + Sync + Clone + { + let mut count = 0; + while let Some(_) = self.find_first_eq_bruteforce_and_delete(column, t0, false).await? { + count += 1; + } + Ok(count) + } + + pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result + where T: Encode + Decode + Ord + Send + Sync + Clone + { + let count = + if self.header().is_column_indexed(column) { + let entries = self.index_lookup(column, value).await?; + let count = entries.len(); + for entry in entries { + self.mark_deleted_at(entry.file_position, false).await? + } + count + } else { + let count = self.find_all_eq_bruteforce_and_delete(column, value).await?; + count + }; + if enable_garbage_collector { + self.attempt_garbage_collection_if_necessary().await?; + } + Ok(count) + } + + // ===Indexing=== + // WARNING: Assumes the column is NOT indexable. + pub async fn attach_index(&mut self, column: Column) -> Result<()> + where T: Ord + Decode + Encode + Send + Sync + { + // New Index + let index = Store::create_empty_index_at(&self.header, column).await?; + self.indexes[column as usize] = Some(index); + + // Mark column as indexed + self.header.make_column_indexed(column); + self.set_header(&self.header.clone()).await?; + + // Build index + self.seek_to_start_of_data().await?; + while let Some((_, file_position, value)) = self.next_alive_at_column(column).await? { + self.insert_into_index(column, value, file_position).await? 
+ } + + Ok(()) + } + + // ===Garbage Collection=== + async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> + where T: Send + Sync + Decode + Encode + Clone + Ord + { + if self.header.deleted_count > GARBAGE_COLLECTION_TRIGGER { + println!("=======START GARBAGE COLLECTOR===="); + self.initiate_garbage_collection().await?; + println!("=======GARBAGE COLLECTOR FINISHED===="); + } + Ok(()) + } + + pub async fn initiate_garbage_collection(&mut self) -> Result + where T: Send + Sync + Decode + Encode + Clone + Ord + { + let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?; + // Since garbage collection changes FilePositions of live entries, we need to update the + // indexes too. + + let mut in_memory_indexes: Vec>>> = Vec::with_capacity(self.header.number_of_columns); + for column in 0..self.header.number_of_columns { + if self.header.is_column_indexed(column as Column) { + let in_memory_index = BTreeMap::new(); + in_memory_indexes.push(Some(in_memory_index)) + } else { + in_memory_indexes.push(None) + } + } + + // We'll dump all alive entries into a new file. + let mut entries_deleted = 0; + self.seek_to_start_of_data().await?; + { + while let Some(live_entry) = self.next_alive().await? { + entries_deleted += 1; + let file_position = cursor_to_intermediate.append_entry_no_indexing(&live_entry.forget()).await?; + + // Update index. (Wouldn't it be nice if we had `for let ...`?) + for (maybe_in_memory_index, value) in in_memory_indexes.iter_mut().zip(&live_entry.data) { + if let Some(in_memory_index) = maybe_in_memory_index { + in_memory_index.entry(value.clone()).or_insert_with(HashSet::new).insert(file_position); + } + } + } + } + + // ===swap=== + // swapping indexes + // Update indexes on disk. 
+ for (column, maybe_in_memory_index) in in_memory_indexes.into_iter().enumerate() { + if let Some(in_memory_index) = maybe_in_memory_index { + let index = self.mut_index_at(column as Column); + index.reset(in_memory_index).await?; + } + } + + // swapping headers + self.header.deleted_count = 0; + self.header.total_count = cursor_to_intermediate.header.total_count; + + self.file = cursor_to_intermediate.file; + self.eof_file_position = cursor_to_intermediate.eof_file_position; + + // swap files on disk + // current file + let path_to_table = Path::new(&self.header.table_folder); + let path_to_rows = path_to_table.join(ROWS_FILE_NAME); + let path_to_intermediate_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); + fs::remove_file(path_to_rows.clone()).await?; + fs::rename(path_to_intermediate_rows, path_to_rows).await?; + + Ok(entries_deleted) + } + + async fn spawn_cursor_to_intermediate_file(&self) -> Result> + where T: Send + { + let table_folder = self.header.table_folder.to_string(); + let path_to_table = Path::new(&table_folder); + let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); + + let intermediate_file: File = Store::::create_empty_rows_file(path_to_rows, &self.header).await?; + + let intermediate_header: StoreHeader = StoreHeader { + table_folder, + number_of_columns: self.header.number_of_columns, + deleted_count: 0, + total_count: 0, + primary_column: self.header.primary_column, + indexed_columns: self.header.indexed_columns.clone(), + }; + + // Creates a new (append) cursor to the intermediate file in which we'll dump the live entries. 
+ let mut cursor_to_intermediate = AppendOnlyCursor { + header: intermediate_header, + file: intermediate_file, + data_type: PhantomData::, + + eof_file_position: 0, + }; + let eof_file_position: FilePosition = cursor_to_intermediate.seek_to_end().await?; + cursor_to_intermediate.eof_file_position = eof_file_position; + + Ok(cursor_to_intermediate) + } +} diff --git a/storage_engine/src/cursor_capabilities/header_access.rs b/storage_engine/src/cursor_capabilities/header_access.rs new file mode 100644 index 0000000..57ccabb --- /dev/null +++ b/storage_engine/src/cursor_capabilities/header_access.rs @@ -0,0 +1,230 @@ +use tokio::io::AsyncReadExt; +use async_trait::async_trait; + +use bincode; +use bincode::{Decode, Encode}; +use crate::binary_coding::{encode, decode}; + +use crate::error::{Error, DecodeErrorKind}; +use crate::segments::entry::{Entry, EntryDetailed}; +use crate::segments::entry_header::EntryHeaderWithDataSize; +use crate::segments::store_header::StoreHeader; +use crate::storage_engine::{FilePosition, Column, Result}; +use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; + +#[async_trait] +pub trait CursorCanReadHeader: CursorCanRead { + fn header(&self) -> &StoreHeader; + + async fn seek_to_start_of_data(&mut self) -> Result { + self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64).await + } + + async fn read_entry_header(&mut self) -> Result { + let number_of_columns: usize = self.header().number_of_columns; + let mut header_bytes: Vec = vec![0; EntryHeaderWithDataSize::size(number_of_columns)]; + self.read_bytes(&mut header_bytes).await?; + let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..], number_of_columns)?; + + Ok(header) + } + + async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result { + self.seek_to(file_position).await?; + self.read_entry_header().await + } + + // Returns None when file_position == eof_file_position + async fn read_entry_at(&mut self, 
file_position: FilePosition) -> Result>> + where T: Decode + { + self.seek_to(file_position).await?; + self.next().await + } + + // ===Iteration=== + // The following functions assume that the current file position is at a valid entry or EOF. + + + // WARNING: This moves the file_position to start of the data, so you can't just call + // next_entry_header() a bunch of times. You must move the cursor! + async fn next_entry_header(&mut self) -> Result> { + if self.is_at_eof().await? { + return Ok(None) + } + + let entry_header = self.read_entry_header().await?; + + Ok(Some(entry_header)) + } + + // This is meant to be used after next_entry_header() is called. + async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result{ + let file_position = self.seek_by(entry_header.size_of_data() as i64).await?; + Ok(file_position) + } + + async fn next(&mut self) -> Result>> + where T: Decode + { + let file_position = self.current_file_position().await?; + let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; + + let mut data_bytes: Vec = vec![0; entry_header.size_of_data()]; + self.read_bytes(&mut data_bytes).await?; + let entry: EntryDetailed = + EntryDetailed::decode(entry_header, file_position, self.header().number_of_columns, &mut data_bytes)?; + + Ok(Some(entry)) + } + + // Like next, but only reads the column, not the whole entry. + async fn next_at_column(&mut self, column: Column) -> Result> + where T: Decode + Send + { + let file_position = self.current_file_position().await?; + let Some(entry_header) = self.next_entry_header().await? 
else { return Ok(None) }; + let file_position_at_start_of_data = self.current_file_position().await?; + + // figuring out how much to decode + let column_offset = entry_header.offset_of_column(column); + self.seek_by(column_offset as i64).await?; + + // reading and decoding + let mut bytes: Vec = vec![0; entry_header.data_sizes[column as usize]]; + self.read_bytes(&mut bytes).await?; + let (value, _) = + decode::(&bytes[..]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; + + // jumping to next entry + self.seek_to(file_position_at_start_of_data).await?; + self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?; + + Ok(Some((entry_header, file_position, value))) + } + + async fn next_alive_at_column(&mut self, column: Column) -> Result> + where T: Decode + Send + { + while let Some((header, file_position, t)) = self.next_at_column(column).await? { + if !header.is_deleted { + return Ok(Some((header, file_position, t))) + } + } + Ok(None) + } + + async fn next_alive(&mut self) -> Result>> + where T: Decode + { + while let Some(entry) = self.next().await? { + if !entry.header.is_deleted { + return Ok(Some(entry)) + } + } + Ok(None) + } + + // ===Search=== + async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result>> + where T: Decode + PartialEq + Send + Sync + { + let mut file_position = self.current_file_position().await?; + while let Some((_, _, t)) = self.next_alive_at_column(column).await? { + if &t == t0 { + // go back and decode the whole entry + self.seek_to(file_position).await?; + return self.next().await + } else { + file_position = self.current_file_position().await?; + } + } + Ok(None) + } + + async fn find_all_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result>> + where T: Decode + PartialEq + Send + Sync + { + let mut entries = vec![]; + while let Some(entry) = self.find_first_eq_bruteforce(column, t0).await? 
{ + entries.push(entry) + } + Ok(entries) + } + + // ===Debugging=== + async fn read_entries(&mut self) -> Result<()> + where T: Decode + std::fmt::Debug + { + self.seek_to_start_of_data().await?; + while let Some(entry) = self.next().await? { + println!("{:?}", entry); + } + println!("END of entries."); + Ok(()) + } + + async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error> { + let mut bytes: Vec = vec![]; + self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?; + self.file().read_to_end(&mut bytes).await?; + Ok(bytes) + } +} + +#[async_trait] +pub trait CursorCanWriteHeader: CursorCanReadHeader + CursorCanWrite { + fn header_mut(&mut self) -> &mut StoreHeader; + fn set_eof_file_position(&mut self, new_file_position: FilePosition); + + // ===Store Header Manipulation=== + async fn increment_total_count(&mut self) -> Result<()> + where T: Send + { + self.seek_to_start().await?; + self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; + let new_count = self.header_mut().increment_total_count(); + self.write_bytes(&encode::(&new_count)?).await?; + Ok(()) + } + + async fn increment_deleted_count(&mut self) -> Result<()> + where T: Send + { + self.seek_to_start().await?; + self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?; + let new_count = self.header_mut().increment_deleted_count(); + self.write_bytes(&encode::(&new_count)?).await?; + Ok(()) + } + + async fn set_header(&mut self, header: &StoreHeader) -> Result<()> + where T: Send + { + self.seek_to_start().await?; + let encoded_header: Vec = header.encode()?; + self.write_bytes(&encoded_header).await?; + + Ok(()) + } + + // ===Append Entry=== + + // Moves cursor to the end. + // Returns file position to the start of the new entry. 
+ async fn append_entry_no_indexing(&mut self, entry: &Entry) -> Result + where T: Encode + Send + Sync + { + self.increment_total_count().await?; + + let encoded_entry: Vec = entry.encode()?; + let file_position = self.seek_to_end().await?; + self.write_bytes(&encoded_entry).await?; + + let eof_file_position: FilePosition = self.current_file_position().await?; + self.set_eof_file_position(eof_file_position); + + Ok(file_position) + } +} diff --git a/storage_engine/src/cursor_capabilities/index_access.rs b/storage_engine/src/cursor_capabilities/index_access.rs new file mode 100644 index 0000000..d9d7fc3 --- /dev/null +++ b/storage_engine/src/cursor_capabilities/index_access.rs @@ -0,0 +1,115 @@ +use std::collections::HashSet; + +use async_trait::async_trait; + +use bincode; +use bincode::{Decode, Encode}; + +use crate::error::Error; +use crate::segments::entry::{Entry, EntryDetailed}; +use crate::storage_engine::{FilePosition, Column, Result}; +use crate::index::Index; +use crate::cursor_capabilities::header_access::{CursorCanReadHeader, CursorCanWriteHeader}; + +#[async_trait] +pub trait CursorCanReadIndex: CursorCanReadHeader { + fn indexes(&mut self) -> &[Option>]; + + async fn index_lookup(&mut self, column: Column, value: &T) -> Result>> + where T: Encode + Decode + Ord + Send + Sync + { + match &self.indexes()[column as usize] { + Some(index) => { + let file_positions = index.lookup(value).await?.unwrap_or_else(|| HashSet::new()); + let mut entries: Vec> = vec![]; + for &file_position in file_positions.iter() { + match self.read_entry_at(file_position).await? 
{ + Some(entry) => { + entries.push(entry) + }, + None => { + return Err(Error::IndexIsStoringEofFilePosition(column)) + } + } + } + + Ok(entries) + }, + None => + Err(Error::AttemptToIndexNonIndexableColumn(column)) + } + } + + async fn select_entries_where_eq(&mut self, column: Column, value: &T) -> Result>> + where T: Encode + Decode + Ord + Send + Sync + { + if self.header().is_column_indexed(column) { + self.index_lookup(column, value).await + } else { + self.find_all_eq_bruteforce(column, value).await + } + } +} + +#[async_trait] +pub trait CursorCanWriteToIndex: CursorCanReadIndex + CursorCanWriteHeader { + fn indexes_mut(&mut self) -> &mut [Option>]; + + // Assumes that the column is indexable. + fn mut_index_at(&mut self, column: Column) -> &mut Index { + match &mut self.indexes_mut()[column as usize] { + Some(index) => { + index + }, + None => { + unreachable!() + } + } + } + + // Assumes that the column is indexable. + async fn insert_into_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()> + where T: Encode + Decode + Ord + Send + Sync + 'async_trait + { + let index = self.mut_index_at(column as Column); + index.insert(value, file_position).await?; + Ok(()) + } + + // Assumes that the column is indexable. + async fn delete_from_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()> + where T: Encode + Decode + Ord + Send + Sync + 'async_trait + { + let index = self.mut_index_at(column as Column); + index.delete(value, file_position).await?; + Ok(()) + } + + async fn insert_entry(&mut self, entry: Entry) -> Result + where T: Encode + Decode + Ord + Send + Sync + 'async_trait + { + let file_position = self.append_entry_no_indexing(&entry).await?; + + // insert the indexable values of the entry into corresponding indexes. 
+ for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() { + if should_index { + // SAFETY: If should_index is true, then the column is indexable. + self.insert_into_index(column as Column, value, file_position).await? + } + } + + Ok(file_position) + } + + async fn delete_entry_values_from_indexes(&mut self, entry: EntryDetailed) -> Result<()> + where T: Encode + Decode + Ord + Send + Sync + 'async_trait + { + for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() { + if should_index { + // SAFETY: If should_index is true, then the column is indexable. + self.delete_from_index(column as Column, value, entry.file_position).await? + } + } + Ok(()) + } +} diff --git a/storage_engine/src/cursor_capabilities/mod.rs b/storage_engine/src/cursor_capabilities/mod.rs new file mode 100644 index 0000000..6d301cb --- /dev/null +++ b/storage_engine/src/cursor_capabilities/mod.rs @@ -0,0 +1,3 @@ +pub(crate) mod primitive; +pub(crate) mod header_access; +pub(crate) mod index_access; diff --git a/storage_engine/src/cursor_capabilities/primitive.rs b/storage_engine/src/cursor_capabilities/primitive.rs new file mode 100644 index 0000000..d11b79b --- /dev/null +++ b/storage_engine/src/cursor_capabilities/primitive.rs @@ -0,0 +1,62 @@ +use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; +use tokio::fs::File; +use async_trait::async_trait; + +use crate::storage_engine::{FilePosition, Result}; + +#[async_trait] +pub(crate) trait CursorCanRead { + fn file(&mut self) -> &mut File; + fn eof_file_position(&self) -> FilePosition; + + async fn read_bytes(&mut self, bytes: &mut [u8]) -> Result<()> { + self.file().read_exact(bytes).await?; + Ok(()) + } + + async fn get_bytes(&mut self, count: usize) -> Result> { + let mut result: Vec = Vec::with_capacity(count); + self.read_bytes(&mut result).await?; + Ok(result) + } + + async fn seek_to(&mut self, 
file_position: FilePosition) -> Result { + let file_position = self.file().seek(SeekFrom::Start(file_position)).await?; + Ok(file_position) + } + + // Start of the file i.e. the Header, not the entries. + async fn seek_to_start(&mut self) -> Result { + let file_position = self.file().seek(SeekFrom::Start(0)).await?; + Ok(file_position) + } + + async fn seek_to_end(&mut self) -> Result { + let file_position = self.file().seek(SeekFrom::End(0)).await?; + Ok(file_position) + } + + // Seeks from current position by offset and returns new file position + async fn seek_by(&mut self, offset: i64) -> Result { + let file_position = self.file().seek(SeekFrom::Current(offset)).await?; + Ok(file_position) + } + + async fn current_file_position(&mut self) -> Result { + let next_file_position: FilePosition = self.file().stream_position().await?; + Ok(next_file_position) + } + + async fn is_at_eof(&mut self) -> Result { + let current_file_position = self.current_file_position().await?; + let eof_file_position = self.eof_file_position(); + Ok(current_file_position == eof_file_position) + } +} + +#[async_trait] +pub(crate) trait CursorCanWrite: CursorCanRead { + async fn write_bytes(&mut self, bytes: &[u8]) -> Result { + Ok(self.file().write(bytes).await?) 
+ } +} diff --git a/storage_engine/src/error.rs b/storage_engine/src/error.rs new file mode 100644 index 0000000..47bf5cc --- /dev/null +++ b/storage_engine/src/error.rs @@ -0,0 +1,50 @@ +use crate::storage_engine::Column; + +#[derive(Debug)] +pub enum Error { + DecodeError(DecodeErrorKind, bincode::error::DecodeError), + EncodeError(bincode::error::EncodeError), + AttemptToIndexNonIndexableColumn(Column), + IndexIsStoringEofFilePosition(Column), + ColumnAlreadyIndexed(Column), + IoError(std::io::Error), +} + +#[derive(Debug)] +pub enum DecodeErrorKind { + StoreHeaderNumberOfColumns, + StoreHeaderDeletedCount, + StoreHeaderTotalCount, + StoreHeaderPrimaryColumn, + StoreHeaderIndexedColumns, + EntryData, + EntryIsDeleted, + EntryHeaderWithDataSizes, + CorruptedData, +} + +// ===Errors=== +impl Error { + pub fn to_io_or_panic(self) -> std::io::Error { + use Error::*; + match self { + IoError(err) => err, + err => { + println!("{:?}", err); + panic!(); + } + } + } +} + +impl From for Error { + fn from(err: bincode::error::EncodeError) -> Self { + Self::EncodeError(err) + } +} + +impl From for Error { + fn from(err: std::io::Error) -> Self { + Self::IoError(err) + } +} diff --git a/storage_engine/src/index.rs b/storage_engine/src/index.rs new file mode 100644 index 0000000..6f69b76 --- /dev/null +++ b/storage_engine/src/index.rs @@ -0,0 +1,290 @@ +use std::path::PathBuf; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter}; + +use std::collections::{BTreeMap, HashSet}; +use std::hash::Hash; +use std::io::SeekFrom; + +use crate::binary_coding::{decode, encode}; +use bincode; +use bincode::{Decode, Encode}; + +use crate::error::{DecodeErrorKind, Error}; + +type Result = std::result::Result; + +#[derive(Debug)] +pub struct Index { + file: File, + data: BTreeMap>, +} + +#[derive(Debug)] +pub struct IndexHeader {} + +impl Index +where + K: Encode + Decode + Ord, + V: Encode + Decode + Clone + Eq + Hash, +{ + pub async 
fn new(file_name: PathBuf) -> Result<Index<K, V>> {
        // Fresh index handle: open (or create — `create` does not truncate)
        // the backing file and start with an empty in-memory map.
        let file: File = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(file_name)
            .await?;

        let data = BTreeMap::new();

        Ok(Index { file, data })
    }

    // Open an existing index file and rebuild the in-memory map by replaying it.
    pub async fn connect(file_name: PathBuf) -> Result<Index<K, V>> {
        let file: File = OpenOptions::new()
            .read(true)
            .write(true)
            .open(file_name)
            .await?;

        let mut index = Index {
            file,
            data: BTreeMap::new(),
        };

        index.load_from_file().await?;
        Ok(index)
    }

    // Insert `v` under `k`, durably: the pair is appended to the file before
    // the in-memory map is updated.
    pub async fn insert(&mut self, k: K, v: V) -> Result<()> {
        self.append_to_file(&k, &v).await?;
        self.data.entry(k).or_insert_with(HashSet::new).insert(v);
        Ok(())
    }

    // Insert into the in-memory map only — the file is NOT touched.
    // Used while replaying the file in `load_from_file`.
    pub fn insert_desynced(&mut self, k: K, v: V) -> () {
        self.data.entry(k).or_insert_with(HashSet::new).insert(v);
    }

    // All values stored under `k`, cloned out of the map (`None` if absent).
    pub async fn lookup(&self, k: &K) -> Result<Option<HashSet<V>>> {
        let hashset = self.data.get(k).cloned();
        Ok(hashset)
    }

    // Remove `v` from `k`'s set. A deletion cannot be expressed in the
    // append-only file, so the whole file is rewritten via `sync_to_disk`.
    pub async fn delete(&mut self, k: K, v: V) -> Result<()> {
        self.data.entry(k).and_modify(|values| {
            values.remove(&v);
        });
        self.sync_to_disk().await
    }

    // Rewrite the entire file from the in-memory map, then truncate any stale
    // tail left over from a previously larger map.
    pub async fn sync_to_disk(&mut self) -> Result<()> {
        let mut writer = BufWriter::new(&mut self.file);
        writer.seek(SeekFrom::Start(0)).await?;

        let mut written: u64 = 0;
        let mut encoded = Vec::new();
        for (key, value) in &self.data {
            for v in value {
                encoded.clear();
                encoded.extend(encode(key)?);
                encoded.extend(encode(v)?);
                // FIX: was `write`, which may write only part of the buffer
                // without erroring; `write_all` guarantees the full record and
                // keeps `written` consistent with what actually hit the file.
                writer.write_all(&encoded).await?;
                written += encoded.len() as u64;
            }
        }

        writer.flush().await?;
        self.file.set_len(written).await?;
        Ok(())
    }

    // Replace the whole map and persist it.
    pub async fn reset(&mut self, data: BTreeMap<K, HashSet<V>>) -> Result<()> {
        self.data = data;
        self.sync_to_disk().await
    }

    // Append one encoded (key, value) pair at the end of the file.
    async fn append_to_file(&mut self, key: &K, value: &V) -> Result<()> {
        let mut encoded = Vec::new();
        encoded.extend(encode(key)?);
        encoded.extend(encode(value)?);

        self.file.seek(SeekFrom::End(0)).await?;
        // FIX: `write` may short-write; `write_all` persists the whole pair.
        self.file.write_all(&encoded).await?;

        Ok(())
    }

    // Replay the file into the in-memory map. Records are (key, value) pairs
    // back to back; bincode reports how many bytes each decode consumed.
    async fn load_from_file(&mut self) -> Result<()> {
        let mut bytes = vec![];

        self.file.seek(SeekFrom::Start(0)).await?;
        self.file.read_to_end(&mut bytes).await?;

        let mut cursor = 0;
        while cursor < bytes.len() {
            let (key, len) = decode(&bytes[cursor..])
                .map_err(|e| Error::DecodeError(DecodeErrorKind::CorruptedData, e))?;
            cursor += len;
            let (value, len) = decode(&bytes[cursor..])
                .map_err(|e| Error::DecodeError(DecodeErrorKind::CorruptedData, e))?;
            cursor += len;

            self.insert_desynced(key, value);
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::fs::remove_file;

    #[tokio::test]
    async fn connect_to_new() {
        let file_name = PathBuf::from("connect_to_new");
        if file_name.exists() {
            remove_file(&file_name).await.unwrap();
        }

        {
            let index = Index::<u64, u64>::new(file_name.clone()).await.unwrap();
            assert_eq!(index.data.len(), 0);
        }

        {
            let index = Index::<u64, u64>::connect(file_name.clone()).await.unwrap();
            assert_eq!(index.data.len(), 0);
        }

        remove_file(&file_name).await.unwrap();
    }

    #[tokio::test]
    async fn inserting() {
        let file_name = PathBuf::from("inserting");
        if file_name.exists() {
            remove_file(&file_name).await.unwrap();
        }

        {
            let mut index = Index::<u64, u64>::new(file_name.clone()).await.unwrap();
            index.insert(1, 2).await.unwrap();
            index.insert(1, 3).await.unwrap();
            index.insert(1, 4).await.unwrap();
            index.insert(2, 3).await.unwrap();
            index.insert(2, 4).await.unwrap();
            index.insert(2, 5).await.unwrap();

            assert_eq!(index.data.len(), 2);
            assert_eq!(index.data.get(&1).unwrap().len(), 3);
            assert_eq!(index.data.get(&2).unwrap().len(), 3);
        }

        {
            let index = Index::<u64, u64>::connect(file_name.clone()).await.unwrap();
            assert_eq!(index.data.len(), 2);
            assert_eq!(index.data.get(&1).unwrap().len(), 3);
            assert_eq!(index.data.get(&2).unwrap().len(), 3);
        }

        remove_file(&file_name).await.unwrap();
    }

    #[tokio::test]
    async fn lookuping() {
        let file_name = PathBuf::from("lookuping");
        if
file_name.exists() { + remove_file(&file_name).await.unwrap(); + } + + { + let mut index = Index::::new(file_name.clone()).await.unwrap(); + index.insert(1, 2).await.unwrap(); + index.insert(1, 3).await.unwrap(); + index.insert(1, 4).await.unwrap(); + index.insert(2, 3).await.unwrap(); + index.insert(2, 4).await.unwrap(); + index.insert(2, 5).await.unwrap(); + + assert_eq!(index.lookup(&1).await.unwrap().unwrap().len(), 3); + assert_eq!(index.lookup(&2).await.unwrap().unwrap().len(), 3); + assert_eq!(index.lookup(&3).await.unwrap(), None); + + let first = index.lookup(&1).await.unwrap().unwrap(); + assert!(first.contains(&2)); + assert!(first.contains(&3)); + assert!(first.contains(&4)); + + let second = index.lookup(&2).await.unwrap().unwrap(); + assert!(second.contains(&3)); + assert!(second.contains(&4)); + assert!(second.contains(&5)); + } + + { + let index = Index::::connect(file_name.clone()).await.unwrap(); + assert_eq!(index.lookup(&1).await.unwrap().unwrap().len(), 3); + assert_eq!(index.lookup(&2).await.unwrap().unwrap().len(), 3); + assert_eq!(index.lookup(&3).await.unwrap(), None); + + let first = index.lookup(&1).await.unwrap().unwrap(); + assert!(first.contains(&2)); + assert!(first.contains(&3)); + assert!(first.contains(&4)); + + let second = index.lookup(&2).await.unwrap().unwrap(); + assert!(second.contains(&3)); + assert!(second.contains(&4)); + assert!(second.contains(&5)); + } + + remove_file(&file_name).await.unwrap(); + } + + #[tokio::test] + async fn deleting() { + let file_name = PathBuf::from("deleting"); + if file_name.exists() { + remove_file(&file_name).await.unwrap(); + } + + { + let mut index = Index::::new(file_name.clone()).await.unwrap(); + index.insert(1, 2).await.unwrap(); + index.insert(1, 3).await.unwrap(); + index.insert(1, 4).await.unwrap(); + index.insert(2, 3).await.unwrap(); + index.insert(2, 4).await.unwrap(); + index.insert(2, 5).await.unwrap(); + + assert!(index.lookup(&1).await.unwrap().unwrap().contains(&2)); + 
index.delete(1, 2).await.unwrap(); + assert!(!index.lookup(&1).await.unwrap().unwrap().contains(&2)); + + assert!(index.lookup(&2).await.unwrap().unwrap().contains(&3)); + index.delete(2, 3).await.unwrap(); + assert!(!index.lookup(&2).await.unwrap().unwrap().contains(&3)); + } + + { + let mut index = Index::::connect(file_name.clone()).await.unwrap(); + + assert!(!index.lookup(&1).await.unwrap().unwrap().contains(&2)); + assert!(!index.lookup(&2).await.unwrap().unwrap().contains(&3)); + + assert!(index.lookup(&1).await.unwrap().unwrap().contains(&3)); + index.delete(1, 3).await.unwrap(); + assert!(!index.lookup(&1).await.unwrap().unwrap().contains(&3)); + + assert!(index.lookup(&1).await.unwrap().unwrap().contains(&4)); + assert!(index.lookup(&2).await.unwrap().unwrap().contains(&4)); + assert!(index.lookup(&2).await.unwrap().unwrap().contains(&5)); + } + + remove_file(&file_name).await.unwrap(); + } +} diff --git a/storage_engine/src/lib.rs b/storage_engine/src/lib.rs new file mode 100644 index 0000000..e7920dd --- /dev/null +++ b/storage_engine/src/lib.rs @@ -0,0 +1,7 @@ +pub mod storage_engine; +mod binary_coding; +mod error; +mod index; +mod cursor; +mod segments; +mod cursor_capabilities; diff --git a/storage_engine/src/segments/entry.rs b/storage_engine/src/segments/entry.rs new file mode 100644 index 0000000..b42c490 --- /dev/null +++ b/storage_engine/src/segments/entry.rs @@ -0,0 +1,60 @@ +use bincode::{Decode, Encode}; + +use crate::binary_coding::{encode_sequence, encode_sequence_with_sizes, decode_sequence}; +use crate::storage_engine::{Result, FilePosition}; +use crate::error::{Error, DecodeErrorKind}; +use crate::segments::entry_header::{EntryHeader, EntryHeaderWithDataSize}; + +#[derive(Debug)] +pub struct Entry { + pub header: EntryHeader, + pub data: Vec, +} + +#[derive(Debug)] +pub struct EntryDetailed { + pub header: EntryHeaderWithDataSize, + pub file_position: FilePosition, + pub data: Vec, +} + +impl Entry { + pub fn new(data: Vec) -> Self { + 
Self { header: EntryHeader { is_deleted: false }, data }
    }

    // Build an entry that is already marked deleted (a tombstone).
    pub fn new_deleted(data: Vec<T>) -> Self {
        Self { header: EntryHeader { is_deleted: true }, data }
    }

    // FORMAT: [EntryHeaderWithDataSize, ..sequence of data]
    pub fn encode(&self) -> Result<Vec<u8>>
    where T: Encode
    {
        let mut result: Vec<u8> = self.header.encode()?;

        let (mut encoded_data, sizes) = encode_sequence_with_sizes(&self.data[..])?;
        result.append(&mut encode_sequence(&sizes)?); // sizes of data (fixed by number of columns)
        result.append(&mut encoded_data); // data variable size
        Ok(result)
    }
}

impl<T> EntryDetailed<T> {
    // Decode the data section of an entry whose header (and position) were
    // already decoded by the caller.
    pub fn decode(header: EntryHeaderWithDataSize, file_position: FilePosition, number_of_columns: usize, bytes: &[u8]) -> Result<Self>
    where T: Decode
    {
        let data = decode_sequence::<T>(number_of_columns, bytes)
            .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?;
        Ok(EntryDetailed { header, file_position, data })
    }

    // Drop the position/size details, keeping just the tombstone flag + data.
    pub fn forget(&self) -> Entry<T>
    where T: Clone
    {
        Entry {
            header: self.header.clone().into(),
            data: self.data.clone(),
        }
    }
}
diff --git a/storage_engine/src/segments/entry_header.rs b/storage_engine/src/segments/entry_header.rs
new file mode 100644
index 0000000..cee5496
--- /dev/null
+++ b/storage_engine/src/segments/entry_header.rs
@@ -0,0 +1,68 @@
use crate::binary_coding::{decode, encode, decode_sequence};
use crate::storage_engine::{Result, Column};
use crate::error::{Error, DecodeErrorKind};
use std::mem::size_of;

// Minimal per-entry header: just the tombstone flag.
#[derive(Debug)]
pub struct EntryHeader {
    pub is_deleted: bool,
}

#[derive(Debug, Clone)]
pub struct EntryHeaderWithDataSize {
    pub is_deleted: bool,
    pub data_sizes: Vec<usize>, // vec![5, 6, 20] means that column 0 stores 5 bytes, column 1 stores 6
                                // bytes etc
}

impl EntryHeader {
    pub fn encode(self: &EntryHeader) -> Result<Vec<u8>> {
        let result: Vec<u8> = encode(&self.is_deleted)?;
        Ok(result)
    }
}

impl From<EntryHeaderWithDataSize> for EntryHeader {
    fn from(entry: EntryHeaderWithDataSize) -> Self {
        Self { is_deleted: entry.is_deleted }
    }
}

impl EntryHeaderWithDataSize {
    pub const IS_DELETED_OFFSET: usize = 0;
    pub const IS_DELETED_SIZE: usize = size_of::<bool>();
    pub const DATA_SIZES_OFFSET: usize = Self::IS_DELETED_OFFSET + Self::IS_DELETED_SIZE;

    // Encoded size of a header for a row with `number_of_columns` columns.
    pub fn size(number_of_columns: usize) -> usize {
        let size_of_data_sizes: usize = number_of_columns * size_of::<usize>();
        Self::IS_DELETED_SIZE + size_of_data_sizes
    }

    // Total byte size of the data section described by this header.
    pub fn size_of_data(&self) -> usize {
        self.data_sizes.iter().sum()
    }

    // Byte offset of `column`'s value inside the data section:
    // the sum of the sizes of all preceding columns.
    pub fn offset_of_column(&self, column: Column) -> usize {
        // IMPROVED: iterator form of the old early-exit index loop — same result.
        self.data_sizes.iter().take(column as usize).sum()
    }

    // Decode a header from `bytes`, which must hold at least
    // `Self::size(number_of_columns)` bytes.
    //
    // IMPROVED: takes `&[u8]` instead of `&mut [u8]` — decoding never mutates
    // the buffer, and `&mut` arguments at call sites still coerce to `&`.
    pub fn decode(bytes: &[u8], number_of_columns: usize) -> Result<Self> {
        let (is_deleted, _) =
            decode::<bool>(bytes)
                .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;

        let data_sizes = decode_sequence::<usize>(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..])
            .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?;

        Ok(Self { is_deleted, data_sizes })
    }
}

diff --git a/storage_engine/src/segments/mod.rs b/storage_engine/src/segments/mod.rs
new file mode 100644
index 0000000..b78a646
--- /dev/null
+++ b/storage_engine/src/segments/mod.rs
@@ -0,0 +1,3 @@
pub mod entry;
pub mod entry_header;
pub mod store_header;
diff --git a/storage_engine/src/segments/store_header.rs b/storage_engine/src/segments/store_header.rs
new file mode 100644
index 0000000..890b198
--- /dev/null
+++ b/storage_engine/src/segments/store_header.rs
@@ -0,0 +1,127 @@
use crate::binary_coding::{encode, encode_sequence, decode, decode_sequence};
use crate::storage_engine::{Result, Column};
use crate::error::{Error, DecodeErrorKind};
use std::mem::size_of;

// On-disk header of a store's rows file. `table_folder` is runtime-only and
// never encoded into the file.
#[derive(Debug, Clone)]
pub struct StoreHeader {
    pub table_folder: String, // This one is not encoded into the file

    pub number_of_columns: usize,
    pub deleted_count: usize,
pub total_count: usize, + pub primary_column: Column, + pub indexed_columns: Vec, +} + +#[derive(Debug, Clone)] +pub struct StoreHeaderFixedPart { + pub table_folder: String, // This one is not encoded into the file + + pub number_of_columns: usize, + pub deleted_count: usize, + pub total_count: usize, + pub primary_column: Column, +} + +impl StoreHeader { + pub const NUMBER_OF_COLUMNS_SIZE: usize = size_of::(); + pub const DELETED_COUNT_SIZE: usize = size_of::(); + pub const TOTAL_COUNT_SIZE: usize = size_of::(); + pub const PRIMARY_COLUMN_SIZE: usize = size_of::(); + pub const FIXED_SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE + Self::PRIMARY_COLUMN_SIZE; + + pub const NUMBER_OF_COLUMNS_OFFSET: usize = 0; + pub const DELETED_COUNT_OFFSET: usize = Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE; + pub const TOTAL_COUNT_OFFSET: usize = Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE; + pub const PRIMARY_COLUMN_OFFSET: usize = Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE; + #[allow(dead_code)] + pub const INDEXED_COLUMNS_OFFSET: usize = Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE; + + fn indexed_columns_size(number_of_columns: usize) -> usize { + size_of::() * number_of_columns + } + + pub fn size(number_of_columns: usize) -> usize { + Self::FIXED_SIZE + Self::indexed_columns_size(number_of_columns) + } + + pub fn encode(&self) -> Result> { + let mut result = encode(&self.number_of_columns)?; + result.append(&mut encode(&self.deleted_count)?); + result.append(&mut encode(&self.total_count)?); + result.append(&mut encode(&self.primary_column)?); + result.append(&mut encode_sequence(&self.indexed_columns)?); + Ok(result) + } + + pub fn buffer_for_fixed_decoding() -> [u8; Self::FIXED_SIZE] { + [0; Self::FIXED_SIZE] + } + + pub fn buffer_for_rest_decoding(header: &StoreHeaderFixedPart) -> Vec { + vec![0; Self::indexed_columns_size(header.number_of_columns)] + } + + pub async fn 
decode_fixed(table_folder: &str, result: &[u8]) -> Result { + let (number_of_columns, _) = + decode::(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; + let (deleted_count, _) = + decode::(&result[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?; + let (total_count, _) = + decode::(&result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?; + let (primary_column, _) = + decode::(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; + let header = StoreHeaderFixedPart { + table_folder: table_folder.to_string(), + number_of_columns, + deleted_count, + total_count, + primary_column, + }; + + Ok(header) + } + + pub async fn decode_rest(header: StoreHeaderFixedPart, result: &[u8]) -> Result { + let indexed_columns: Vec = + decode_sequence::(header.number_of_columns, result) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderIndexedColumns, e))?; + + Ok(StoreHeader { + table_folder: header.table_folder, + number_of_columns: header.number_of_columns, + deleted_count: header.deleted_count, + total_count: header.total_count, + primary_column: header.primary_column, + + indexed_columns, + }) + } + + + + // returns new count + pub fn increment_total_count(&mut self) -> usize { + self.total_count += 1; + self.total_count + } + + // returns new count + pub fn increment_deleted_count(&mut self) -> usize { + self.deleted_count += 1; + self.deleted_count + } + + pub fn is_column_indexed(&self, column: Column) -> bool { + self.indexed_columns[column as usize] + } + + pub fn 
make_column_indexed(&mut self, column: Column) { + self.indexed_columns[column as usize] = true + } +} diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs new file mode 100644 index 0000000..7798e10 --- /dev/null +++ b/storage_engine/src/storage_engine.rs @@ -0,0 +1,544 @@ +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::fs::{File, OpenOptions, DirBuilder}; +use tokio::fs; +use std::path::{Path, PathBuf}; +use bincode::{Decode, Encode}; + +use crate::error::Error; +use crate::cursor::{ReadCursor, WriteCursor}; +use crate::cursor_capabilities::header_access::CursorCanReadHeader; +use crate::segments::store_header::StoreHeader; +use crate::index::Index; + + +pub type Result = std::result::Result; + +pub type Column = u64; +pub type FilePosition = u64; + +// TODO: Consider adding another type parameter for indexable values +#[derive(Debug)] +pub struct Store { + pub header: StoreHeader, + pub indexes: StoreIndexes, +} + +pub type StoreIndexes = Vec>>; + + +//===Store=== +pub async fn store_exists(table_folder: &str) -> Result { + Ok(fs::metadata(table_folder).await.is_ok()) +} + +pub const ROWS_FILE_NAME: &'static str = "rows"; +pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_intermediate"; + +impl Store { + // ===Creation=== + pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result + where T: Encode + Decode + Ord + { + let path_to_table = Path::new(table_folder); + let path_to_rows = path_to_table.join(ROWS_FILE_NAME); + DirBuilder::new() + .create(path_to_table).await?; + + let header = { + let mut indexed_columns = vec![false; number_of_columns]; + indexed_columns[primary_column as usize] = true; + StoreHeader { + table_folder: table_folder.to_string(), + number_of_columns, + deleted_count: 0, + total_count: 0, + primary_column, + indexed_columns, + } + }; + + // We don't need the file right now. Only cursors will later open it. 
+ Self::create_empty_rows_file(path_to_rows, &header).await?; + + let indexes: StoreIndexes = Self::create_initial_indexes(&header).await?; + + let store = Self { + header, + indexes, + }; + + Ok(store) + } + + pub fn path_to_index_file(header: &StoreHeader, column: Column) -> PathBuf { + let path_to_table = Path::new(&header.table_folder); + let path_to_index = path_to_table.join(&format!("{}_{}", ROWS_FILE_NAME, column.to_string())); + path_to_index + } + + pub async fn create_empty_index_at(header: &StoreHeader, column: Column) -> Result> + where T: Encode + Decode + Ord + { + let path_to_index = Self::path_to_index_file(&header, column); + let index = Index::new(path_to_index).await?; + + Ok(index) + } + + pub async fn create_initial_indexes(header: &StoreHeader) -> Result> + where T: Encode + Decode + Ord + { + let mut result: StoreIndexes = Vec::with_capacity(header.number_of_columns); + for _ in 0..header.number_of_columns { + result.push(None) + } + + result[header.primary_column as usize] = Some(Self::create_empty_index_at(&header, header.primary_column).await?); + + Ok(result) + } + + pub async fn connect_index_at(header: &StoreHeader, column: Column) -> Result> + where T: Encode + Decode + Ord + { + let path_to_index = Self::path_to_index_file(&header, column); + let index: Index = Index::connect(path_to_index).await?; + Ok(index) + } + + pub async fn create_empty_rows_file(path_to_rows: PathBuf, header: &StoreHeader) -> Result { + let mut file: File = + OpenOptions::new() + .write(true) + .read(true) + .create_new(true) + .open(path_to_rows) + .await?; + + let encoded_header: Vec = header.encode()?; + file.write(&encoded_header).await?; + + Ok(file) + } + + pub async fn connect(table_folder: &str) -> Result + where T: std::fmt::Debug + Encode + Decode + Ord + { + let path_to_table = Path::new(table_folder); + let path_to_rows = path_to_table.join(ROWS_FILE_NAME); + + let mut file: File = + OpenOptions::new() + .read(true) + .write(true) + 
.open(path_to_rows) + .await?; + + // Unfortunately we can't yet use store.read_bytes, since it can't be created without the + // header. + let header = { + let mut fixed_header_bytes = StoreHeader::buffer_for_fixed_decoding(); + file.read_exact(&mut fixed_header_bytes).await?; + let fixed_header = StoreHeader::decode_fixed(table_folder, &fixed_header_bytes).await?; + + // decode the indexes + let mut rest_bytes: Vec = StoreHeader::buffer_for_rest_decoding(&fixed_header); + file.read_exact(&mut rest_bytes).await?; + StoreHeader::decode_rest(fixed_header, &rest_bytes).await? + }; + + + let indexes: StoreIndexes = { + let mut result = Vec::with_capacity(header.number_of_columns); + for (column, &is_indexed) in header.indexed_columns.iter().enumerate() { + if is_indexed { + result.push(Some(Self::connect_index_at(&header, column as Column).await?)) + } else { + result.push(None) + } + } + + result + }; + + let store = Self { + header, + indexes + }; + Ok(store) + } + + // ===Cursors=== + pub async fn read_cursor(&self) -> Result> + where T: Send + Sync + { + ReadCursor::new(self).await + } + + pub async fn write_cursor(&mut self) -> Result> + where T: Send + Sync + { + WriteCursor::new(self).await + } + + // ===Indexes=== + pub async fn attach_index(&mut self, column: Column) -> Result<()> + where T: Ord + Decode + Encode + Send + Sync + { + if self.header.is_column_indexed(column) { + Err(Error::ColumnAlreadyIndexed(column)) + } else { + let mut cursor = self.write_cursor().await?; + cursor.attach_index(column).await + } + } + + // For debugging. 
+ #[allow(dead_code)] + pub async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error> + where T: Send + Sync + { + let mut cursor = self.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + let bytes = cursor.read_all_bytes().await?; + Ok(bytes) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::segments::entry::Entry; + use crate::cursor_capabilities::header_access::CursorCanReadHeader; + use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex}; + + impl Drop for Store { + fn drop(&mut self) { + println!("DROPPING TEST FOLDER"); + let table_folder = self.header.table_folder.clone(); + // Seems no one has figured out how to do AsyncDrop yet. + std::fs::remove_dir_all(table_folder).unwrap(); + } + } + + + #[tokio::test] + async fn test_create() { + type Data = u32; + + let table_path = "test_table_0"; + let number_of_columns = 5; + let primary_column = 0; + let store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + + assert!(store.header.number_of_columns == number_of_columns); + assert!(store.header.total_count == 0); + assert!(store.header.deleted_count == 0); + assert!(store.header.primary_column == primary_column); + } + + #[tokio::test] + async fn test_insert() { + type Data = u32; + + let table_path = "test_table_1"; + let number_of_columns = 5; + let primary_column = 0; + let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + + { + let mut cursor = store.write_cursor().await.unwrap(); + + let entry0: Entry = Entry::new(vec![1, 2, 3, 4, 5]); + cursor.insert_entry(entry0).await.unwrap(); + + let entry1: Entry = Entry::new(vec![6, 7, 8, 9, 10]); + cursor.insert_entry(entry1).await.unwrap(); + + assert!(store.header.total_count == 2); + } + } + + #[tokio::test] + async fn test_select_next() { + type Data = u32; + + let table_path = "test_table_2"; + let number_of_columns = 5; + let primary_column = 0; + let mut store: 
Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + + { + let mut cursor = store.write_cursor().await.unwrap(); + + let entry0: Entry = Entry::new(vec![1, 2, 3, 4, 5]); + cursor.insert_entry(entry0).await.unwrap(); + + let entry1: Entry = Entry::new(vec![6, 7, 8, 9, 10]); + cursor.insert_entry(entry1).await.unwrap(); + + assert!(store.header.total_count == 2); + } + + { + let mut cursor = store.read_cursor().await.unwrap(); + + let entry0 = cursor.next().await.unwrap().unwrap(); + let entry1 = cursor.next().await.unwrap().unwrap(); + + assert!(entry0.data == vec![1,2,3,4,5]); + assert!(entry1.data == vec![6,7,8,9,10]); + } + } + + #[tokio::test] + async fn test_select_all() { + type Data = u32; + + let table_path = "test_table_3"; + let number_of_columns = 5; + let primary_column = 0; + let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + + { + let mut cursor = store.write_cursor().await.unwrap(); + + let entry0: Entry = Entry::new(vec![1, 2, 3, 4, 5]); + cursor.insert_entry(entry0).await.unwrap(); + + let entry1: Entry = Entry::new(vec![6, 7, 8, 9, 10]); + cursor.insert_entry(entry1).await.unwrap(); + + assert!(store.header.total_count == 2); + } + + { + let mut cursor = store.read_cursor().await.unwrap(); + + let mut entries = vec![]; + while let Some(entry) = cursor.next().await.unwrap() { + entries.push(entry) + } + + assert!(entries.len() == 2); + assert!(entries[0].data == vec![1,2,3,4,5]); + assert!(entries[1].data == vec![6,7,8,9,10]); + } + } + + #[tokio::test] + async fn test_select_eq() { + type Data = u32; + + let table_path = "test_table_4"; + let number_of_columns = 5; + let primary_column = 0; + let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + + let value = 200; + { + let mut cursor = store.write_cursor().await.unwrap(); + + let entry0: Entry = Entry::new(vec![1, value, 3, 4, 5]); + 
cursor.insert_entry(entry0).await.unwrap(); + + let entry1: Entry = Entry::new(vec![6, 7, 8, 9, 10]); + cursor.insert_entry(entry1).await.unwrap(); + + let entry2: Entry = Entry::new(vec![11, 2, 10, 10, 10]); + cursor.insert_entry(entry2).await.unwrap(); + + let entry3: Entry = Entry::new(vec![1, value, 100, 50, 40]); + cursor.insert_entry(entry3).await.unwrap(); + + assert!(store.header.total_count == 4); + } + + { + let mut cursor = store.read_cursor().await.unwrap(); + let column = 1; + + let entries = cursor.select_entries_where_eq(column, &value).await.unwrap(); + + assert!(entries.len() == 2); + assert!(entries[0].data == vec![1, value, 3, 4, 5]); + assert!(entries[1].data == vec![1, value, 100, 50, 40]); + } + } + + #[tokio::test] + async fn test_select_eq_indexed() { + type Data = u32; + + let table_path = "test_table_5"; + let number_of_columns = 5; + let primary_column = 0; + let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + + let column: Column = 1; + + assert!(store.indexes[column as usize].is_none()); + store.attach_index(column).await.unwrap(); + assert!(store.indexes[column as usize].is_some()); + + let value = 200; + { + let mut cursor = store.write_cursor().await.unwrap(); + + let entry0: Entry = Entry::new(vec![1, value, 3, 4, 5]); + cursor.insert_entry(entry0).await.unwrap(); + + let entry1: Entry = Entry::new(vec![6, 7, 8, 9, 10]); + cursor.insert_entry(entry1).await.unwrap(); + + let entry2: Entry = Entry::new(vec![11, 2, 10, 10, 10]); + cursor.insert_entry(entry2).await.unwrap(); + + let entry3: Entry = Entry::new(vec![1, value, 100, 50, 40]); + cursor.insert_entry(entry3).await.unwrap(); + + assert!(store.header.total_count == 4); + } + + { + let mut cursor = store.read_cursor().await.unwrap(); + let column = 1; + + let entries = cursor.select_entries_where_eq(column, &value).await.unwrap(); + assert!(entries.len() == 2); + // Order may be non-deterministic. 
+ assert!(entries[0].data[column as usize] == value); + assert!(entries[1].data[column as usize] == value); + } + } + + #[tokio::test] + async fn test_delete_entry() { + type Data = u32; + + let table_path = "test_table_6"; + let number_of_columns = 5; + let primary_column = 0; + let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + + let value = 200; + let (_file_position0, file_position1, _file_position2, _file_position3) = { + let mut cursor = store.write_cursor().await.unwrap(); + + let entry0: Entry = Entry::new(vec![1, value, 3, 4, 5]); + let file_position0 = cursor.insert_entry(entry0).await.unwrap(); + + let entry1: Entry = Entry::new(vec![6, 7, 8, 9, 10]); + let file_position1 = cursor.insert_entry(entry1).await.unwrap(); + + let entry2: Entry = Entry::new(vec![11, 2, 10, 10, 10]); + let file_position2 = cursor.insert_entry(entry2).await.unwrap(); + + let entry3: Entry = Entry::new(vec![1, value, 100, 50, 40]); + let file_position3 = cursor.insert_entry(entry3).await.unwrap(); + + assert!(store.header.total_count == 4); + (file_position0, file_position1, file_position2, file_position3) + }; + + { + assert!(store.header.deleted_count == 0); + let mut cursor = store.write_cursor().await.unwrap(); + cursor.mark_deleted_at(file_position1, false).await.unwrap(); + assert!(store.header.deleted_count == 1); + } + } + + #[tokio::test] + async fn test_delete_where_eq() { + type Data = u32; + + let table_path = "test_table_7"; + let number_of_columns = 5; + let primary_column = 0; + let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + + let column: Column = 1; + + assert!(store.indexes[column as usize].is_none()); + store.attach_index(column).await.unwrap(); + assert!(store.indexes[column as usize].is_some()); + + let value = 200; + + let (_file_position0, _file_position1, _file_position2, _file_position3) = { + let mut cursor = store.write_cursor().await.unwrap(); + + let 
entry0: Entry = Entry::new(vec![1, value, 3, 4, 5]); + let file_position0 = cursor.insert_entry(entry0).await.unwrap(); + + let entry1: Entry = Entry::new(vec![6, 7, 8, 9, 10]); + let file_position1 = cursor.insert_entry(entry1).await.unwrap(); + + let entry2: Entry = Entry::new(vec![11, 2, 10, 10, 10]); + let file_position2 = cursor.insert_entry(entry2).await.unwrap(); + + let entry3: Entry = Entry::new(vec![1, value, 100, 50, 40]); + let file_position3 = cursor.insert_entry(entry3).await.unwrap(); + + assert!(store.header.total_count == 4); + (file_position0, file_position1, file_position2, file_position3) + }; + + { + assert!(store.header.deleted_count == 0); + let mut cursor = store.write_cursor().await.unwrap(); + cursor.delete_entries_where_eq(column, &value, false).await.unwrap(); + assert!(store.header.deleted_count == 2); + } + } + + #[tokio::test] + async fn test_garbage_collection() { + type Data = u32; + + let table_path = "test_table_8"; + let number_of_columns = 5; + let primary_column = 0; + let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); + + let column: Column = 1; + + assert!(store.indexes[column as usize].is_none()); + store.attach_index(column).await.unwrap(); + assert!(store.indexes[column as usize].is_some()); + + let value = 200; + + let (_file_position0, _file_position1, _file_position2, _file_position3) = { + let mut cursor = store.write_cursor().await.unwrap(); + + let entry0: Entry = Entry::new(vec![1, value, 3, 4, 5]); + let file_position0 = cursor.insert_entry(entry0).await.unwrap(); + + let entry1: Entry = Entry::new(vec![6, 7, 8, 9, 10]); + let file_position1 = cursor.insert_entry(entry1).await.unwrap(); + + let entry2: Entry = Entry::new(vec![11, 2, 10, 10, 10]); + let file_position2 = cursor.insert_entry(entry2).await.unwrap(); + + let entry3: Entry = Entry::new(vec![1, value, 100, 50, 40]); + let file_position3 = cursor.insert_entry(entry3).await.unwrap(); + + 
assert!(store.header.total_count == 4); + (file_position0, file_position1, file_position2, file_position3) + }; + + { + assert!(store.header.deleted_count == 0); + let mut cursor = store.write_cursor().await.unwrap(); + cursor.delete_entries_where_eq(column, &value, false).await.unwrap(); + assert!(cursor.header().deleted_count == 2); + assert!(cursor.header().total_count == 4); + + cursor.initiate_garbage_collection().await.unwrap(); + assert!(cursor.header().deleted_count == 0); + assert!(cursor.header().total_count == 2); + } + } + +}