From 82300039fc27c59094c4740a5a1d251b0d507f76 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Sun, 4 Feb 2024 20:45:57 +0100 Subject: [PATCH] Split cursor functionality further into traits. Prep for garbage collection. --- storage_engine/src/cursor.rs | 215 +++++++++++++++++++++-------------- storage_engine/src/main.rs | 2 +- 2 files changed, 133 insertions(+), 84 deletions(-) diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 05f9d05..cf01190 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -37,6 +37,15 @@ pub struct WriteCursor<'a, T> { eof_file_position: FilePosition, } +// This is used as a cursor to temporary file during Garbage Collection +pub struct AppendOnlyCursor { + header: StoreHeader, + file: File, + data_type: PhantomData, + + eof_file_position: FilePosition, +} + // ===Traits=== #[async_trait] @@ -90,6 +99,14 @@ pub trait PrimitiveCursor { } } +#[async_trait] +pub trait PrimitiveWriteCursor: PrimitiveCursor { + async fn write_bytes(&mut self, bytes: &[u8]) -> Result { + Ok(self.file().write(bytes).await?) + } + +} + #[async_trait] pub trait CursorWithStoreHeader: PrimitiveCursor { fn header(&self) -> &StoreHeader; @@ -241,6 +258,53 @@ pub trait CursorWithAccessToIndex: CursorWithStoreHeader { } } +#[async_trait] +pub trait CursorWithWriteStoreHeader: CursorWithStoreHeader + PrimitiveWriteCursor { + fn header_mut(&mut self) -> &mut StoreHeader; + fn set_eof_file_position(&mut self, new_file_position: FilePosition); + + // ===Store Header Manipulation=== + async fn increment_total_count(&mut self) -> Result<()> + where T: Send + { + self.seek_to_start().await?; + self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; + let new_count = self.header_mut().increment_total_count(); + self.write_bytes(&encode::(&new_count)?).await?; + Ok(()) + } + + async fn increment_deleted_count(&mut self) -> Result<()> + where T: Send + { + self.seek_to_start().await?; + self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?; + let new_count = self.header_mut().increment_deleted_count(); + self.write_bytes(&encode::(&new_count)?).await?; + Ok(()) + } + + // ===Append Entry=== + + // Moves cursor to the end. + // Returns file position to the start of the new entry. + async fn append_entry(&mut self, entry: &Entry) -> Result + where T: Encode + Send + Sync + { + self.increment_total_count().await?; + + let encoded_entry: Vec = entry.encode()?; + let file_position = self.seek_to_end().await?; + self.write_bytes(&encoded_entry).await?; + + let eof_file_position: FilePosition = self.current_file_position().await?; + self.set_eof_file_position(eof_file_position); + + Ok(file_position) + } +} + +// ===========Implementations============= // ===PrimitiveCursor=== impl PrimitiveCursor for ReadCursor<'_, T> { fn file(&mut self) -> &mut File { @@ -262,33 +326,56 @@ impl PrimitiveCursor for WriteCursor<'_, T> { } } -// ===CursorWithStoreHeader=== -impl CursorWithStoreHeader for ReadCursor<'_, T> { - fn header(&self) -> &StoreHeader { - &self.header +impl PrimitiveCursor for AppendOnlyCursor { + fn file(&mut self) -> &mut File { + &mut self.file + } + + fn eof_file_position(&self) -> FilePosition { + self.eof_file_position } } +// ===PrimitiveCursor=== +impl PrimitiveWriteCursor for WriteCursor<'_, T> {} +impl PrimitiveWriteCursor for AppendOnlyCursor {} + + +// ===CursorWithStoreHeader=== +impl CursorWithStoreHeader for ReadCursor<'_, T> { + fn header(&self) -> &StoreHeader { &self.header } +} + impl CursorWithStoreHeader for WriteCursor<'_, T> { - fn header(&self) -> &StoreHeader { - &self.header - } + fn header(&self) -> &StoreHeader { &self.header } +} + +impl CursorWithStoreHeader for AppendOnlyCursor { + fn header(&self) -> &StoreHeader { &self.header } +} + +// ===CursorWithWriteStoreHeader=== +impl CursorWithWriteStoreHeader for WriteCursor<'_, T> { + fn header_mut(&mut self) -> &mut StoreHeader { self.header } + fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } +} + +impl CursorWithWriteStoreHeader for AppendOnlyCursor { + fn header_mut(&mut self) -> &mut StoreHeader { &mut self.header } + fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } } // ===CursorWithAccessToIndex=== impl CursorWithAccessToIndex for ReadCursor<'_, T> { - fn indexes(&mut self) -> &[Option>] { - &self.indexes - } + fn indexes(&mut self) -> &[Option>] { &self.indexes } } impl CursorWithAccessToIndex for WriteCursor<'_, T> { - fn indexes(&mut self) -> &[Option>] { - &self.indexes - } + fn indexes(&mut self) -> &[Option>] { &self.indexes } } + impl <'cursor, T> ReadCursor<'cursor, T> { pub async fn new<'store: 'cursor>(store: &'store Store) -> Result where T: Send + Sync @@ -317,6 +404,8 @@ impl <'cursor, T> ReadCursor<'cursor, T> { } } + + impl <'cursor, T> WriteCursor<'cursor, T> // TODO: Consider adding this manually to wher eit is really needed where T: Sync @@ -375,60 +464,16 @@ impl <'cursor, T> WriteCursor<'cursor, T> Ok(cursor) } - - // ===Primitive Operations=== - async fn write_bytes(&mut self, bytes: &[u8]) -> Result { - Ok(self.file.write(bytes).await?) - } - - // ===Store Header Manipulation=== - async fn increment_total_count(&mut self) -> Result<()> - where T: Send - { - self.seek_to_start().await?; - self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; - let new_count = self.header.increment_total_count(); - self.write_bytes(&encode::(&new_count)?).await?; - Ok(()) - } - - async fn increment_deleted_count(&mut self) -> Result<()> - where T: Send - { - self.seek_to_start().await?; - self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?; - let new_count = self.header.increment_deleted_count(); - self.write_bytes(&encode::(&new_count)?).await?; - Ok(()) - } - // ===Entry Header Manipulation=== // assumes we are at the start of valid entry. - async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()> { + async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()> + where T: Send + { let bytes: Vec = entry_header.encode()?; self.write_bytes(&bytes).await?; Ok(()) } - // ===Append Entry=== - - // Moves cursor to the end. - // Returns file position to the start of the new entry. - pub async fn append_entry(&mut self, entry: &Entry) -> Result - where T: Encode + Send - { - self.increment_total_count().await?; - - let encoded_entry: Vec = entry.encode()?; - let file_position = self.seek_to_end().await?; - self.write_bytes(&encoded_entry).await?; - - let eof_file_position: FilePosition = self.current_file_position().await?; - self.eof_file_position = eof_file_position; - - Ok(file_position) - } - // ===Deletion=== pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()> where T: Send @@ -461,18 +506,31 @@ impl <'cursor, T> WriteCursor<'cursor, T> } } - async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> { + // ===Garbage Collection=== + async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> + where T: Send + { // TODO: What should be the policy? Counting size of garbage? Counting how many entries are // garbage? if self.header.deleted_count > 100 { - todo!() - } else { - Ok(()) + self.initiate_garbage_collection().await?; } + Ok(()) } async fn initiate_garbage_collection(&mut self) -> Result where T: Send + { + // We'll dump all alive entries into a new file. + let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?; + + // In it there will be only the alive rows. + // Afterwards we swap the files, and delete the garbage. + todo!() + } + + async fn spawn_cursor_to_intermediate_file(&self) -> Result> + where T: Send { let table_folder = self.header.table_folder.to_string(); let path_to_table = Path::new(&table_folder); @@ -480,36 +538,27 @@ impl <'cursor, T> WriteCursor<'cursor, T> let intermediate_file: File = Store::::create_empty_rows_file(path_to_rows, &self.header).await?; - let mut intermediate_header: StoreHeader = StoreHeader { + let intermediate_header: StoreHeader = StoreHeader { table_folder, number_of_columns: self.header.number_of_columns, deleted_count: 0, total_count: 0, primary_column: self.header.primary_column, - indexed_columns: todo!() + indexed_columns: self.header.indexed_columns.clone(), }; - // Creates a new cursor to the intermediate file in which we'll dump the live entries. - // let mut cursor_to_intermediate = Self { - // header: &mut intermediate_header, - // file: intermediate_file, - // data_type: PhantomData::, + // Creates a new (append) cursor to the intermediate file in which we'll dump the live entries. + let mut cursor_to_intermediate = AppendOnlyCursor { + header: intermediate_header, + file: intermediate_file, + data_type: PhantomData::, - // eof_file_position: 0, - // }; - let mut cursor_to_intermediate: Self = todo!(); + eof_file_position: 0, + }; let eof_file_position: FilePosition = cursor_to_intermediate.seek_to_end().await?; cursor_to_intermediate.eof_file_position = eof_file_position; - - - // TODO: intermediate_header does not live long enough, so after garbage collection is - // done, we need to use it in the swap. - cursor_to_intermediate.header = todo!(); - - // In it there will be only the alive rows. - // Afterwards we swap the files, and delete the garbage. - todo!() + Ok(cursor_to_intermediate) } // ===Indexing=== diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index 5f4bb69..d8df150 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -9,7 +9,7 @@ mod store_header; use crate::entry::{Entry, EntryDetailed}; use crate::storage_engine::{Store, FilePosition}; -use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader, PrimitiveCursor}; +use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader, CursorWithWriteStoreHeader}; type Data = u32;