diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index b0ea017..8afe89f 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -13,7 +13,7 @@ use crate::binary_coding::{encode, decode}; use crate::entry::{Entry, EntryDetailed}; use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader}; use crate::store_header::StoreHeader; -use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME}; +use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; #[async_trait] // TODO: Make this private @@ -261,7 +261,7 @@ impl ReadCursor { pub async fn new(store: &Store) -> Result where T: Send { - let path_to_rows = Path::new(&store.table_folder).join(ROWS_FILE_NAME); + let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); let file: File = OpenOptions::new() .read(true) @@ -293,7 +293,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> { pub async fn new<'store: 'cursor>(store: &'store mut Store) -> Result where T: Send { - let path_to_rows = Path::new(&store.table_folder).join(ROWS_FILE_NAME); + let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); let file: File = OpenOptions::new() .read(true) @@ -315,7 +315,33 @@ impl <'cursor, T> WriteCursor<'cursor, T> { Ok(cursor) } - + + pub async fn connect<'header: 'cursor>(path_to_rows: &str, header: &'header mut StoreHeader) -> Result + where T: Send + { + let file: File = + OpenOptions::new() + .read(true) + .write(true) + .open(path_to_rows) + .await?; + + let mut cursor = Self { + header, + file, + data_type: PhantomData::, + + eof_file_position: 0, + }; + let eof_file_position: FilePosition = cursor.seek_to_end().await?; + cursor.eof_file_position = eof_file_position; + + cursor.seek_to_start_of_data().await?; + + Ok(cursor) + } + + // ===Primitive Operations=== async fn write_bytes(&mut self, bytes: &[u8]) -> Result { Ok(self.file.write(bytes).await?) @@ -389,6 +415,18 @@ impl <'cursor, T> WriteCursor<'cursor, T> { } } + async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result>> + where T: Decode + PartialEq + Send + Sync + { + let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?; + if let Some(entry) = maybe_entry { + self.mark_deleted_at(entry.file_position).await?; + Ok(Some(entry)) + } else { + Ok(maybe_entry) + } + } + async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> { // TODO: What should be the policy? Counting size of garbage? Counting how many entries are // garbage? @@ -398,6 +436,46 @@ impl <'cursor, T> WriteCursor<'cursor, T> { Ok(()) } } + + async fn initiate_garbage_collection(&mut self) -> Result + where T: Send + { + let table_folder = self.header.table_folder.to_string(); + let path_to_table = Path::new(&table_folder); + let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); + + let intermediate_file: File = Store::::create_empty_rows_file(path_to_rows, &self.header).await?; + + let mut intermediate_header: StoreHeader = StoreHeader { + table_folder, + number_of_columns: self.header.number_of_columns, + deleted_count: 0, + total_count: 0, + primary_column: self.header.primary_column + }; + + // Creates a new cursor to the intermediate file in which we'll dump the live entries. + // let mut cursor_to_intermediate = Self { + // header: &mut intermediate_header, + // file: intermediate_file, + // data_type: PhantomData::, + + // eof_file_position: 0, + // }; + let mut cursor_to_intermediate: Self = todo!(); + let eof_file_position: FilePosition = cursor_to_intermediate.seek_to_end().await?; + cursor_to_intermediate.eof_file_position = eof_file_position; + + + + // TODO: intermediate_header does not live long enough, so after garbage collection is + // done, we need to use it in the swap. + cursor_to_intermediate.header = todo!(); + + // In it there will be only the alive rows. + // Afterwards we swap the files, and delete the garbage. + todo!() + } } diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs index 4284f74..4c06c4d 100644 --- a/storage_engine/src/storage_engine.rs +++ b/storage_engine/src/storage_engine.rs @@ -1,7 +1,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::fs::{File, OpenOptions, DirBuilder}; use tokio::fs; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::marker::PhantomData; use async_trait::async_trait; @@ -28,7 +28,6 @@ pub struct Store { // {write: 0, read: n + 1} ~> {write:0, read: n} // destroy read // {write: 0, read: 0} ~> {write: 1, read: 0} // create write // {write: 1, read: 0} ~> {write: 0, read: 0} // destroy write - pub table_folder: String, // primary_index: Vec>>, // indexes: Vec>>>, // primary_index: Index, @@ -63,6 +62,7 @@ impl SomethingSupportingLeq for Store } pub const ROWS_FILE_NAME: &'static str = "rows"; +pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_intermediate"; impl Store { // ===Creation=== @@ -72,6 +72,31 @@ impl Store { DirBuilder::new() .create(path_to_table).await?; + let header = StoreHeader { + table_folder: table_folder.to_string(), + number_of_columns, + deleted_count: 0, + total_count: 0, + primary_column, + }; + + // We don't need the file right now. Only cursors will later open it. + Self::create_empty_rows_file(path_to_rows, &header).await?; + + // TODO: indexes + // let index: Index = Index::new( + // &format!("rows_{}", primary_column.to_string()), + // ).await?; + + let store = Self { + header, + data_type: PhantomData::, + }; + + Ok(store) + } + + pub async fn create_empty_rows_file(path_to_rows: PathBuf, header: &StoreHeader) -> Result { let mut file: File = OpenOptions::new() .write(true) @@ -80,28 +105,10 @@ impl Store { .open(path_to_rows) .await?; - let header = StoreHeader { - number_of_columns, - deleted_count: 0, - total_count: 0, - primary_column, - }; let encoded_header: Vec = header.encode()?; file.write(&encoded_header).await?; - - // TODO: indexes - // let index: Index = Index::new( - // &format!("rows_{}", primary_column.to_string()), - // ).await?; - - let store = Self { - table_folder: table_folder.to_string(), - header, - data_type: PhantomData::, - }; - - Ok(store) + Ok(file) } pub async fn connect(table_folder: &str) -> Result @@ -121,10 +128,9 @@ impl Store { // header. let mut header_bytes = StoreHeader::decode_buffer(); file.read_exact(&mut header_bytes).await?; - let header = StoreHeader::decode(&mut header_bytes).await?; + let header = StoreHeader::decode(table_folder, &mut header_bytes).await?; let store = Self { - table_folder: table_folder.to_string(), header, data_type: PhantomData::, }; diff --git a/storage_engine/src/store_header.rs b/storage_engine/src/store_header.rs index fa84478..73cbb55 100644 --- a/storage_engine/src/store_header.rs +++ b/storage_engine/src/store_header.rs @@ -5,6 +5,8 @@ use std::mem::size_of; #[derive(Debug, Clone)] pub struct StoreHeader { + pub table_folder: String, // This one is not encoded into the file + pub number_of_columns: usize, pub deleted_count: usize, pub total_count: usize, @@ -35,7 +37,7 @@ impl StoreHeader { [0; StoreHeader::SIZE] } - pub async fn decode(result: &mut [u8]) -> Result { + pub async fn decode(table_folder: &str, result: &mut [u8]) -> Result { let (number_of_columns, _) = decode::(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; @@ -49,6 +51,7 @@ impl StoreHeader { decode::(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; let header = StoreHeader { + table_folder: table_folder.to_string(), number_of_columns, deleted_count, total_count,