From 1086b2fc5e21a121019dded812f1c56909f1fc14 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 01:04:10 +0100 Subject: [PATCH] Add indexing to deletion --- storage_engine/src/cursor.rs | 81 ++++++++++++++++++++++-------------- storage_engine/src/main.rs | 8 ++-- 2 files changed, 52 insertions(+), 37 deletions(-) diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index f15a80a..13054da 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -400,7 +400,6 @@ pub trait CursorWithWriteAccessToIndex: CursorWithAccessToIndex + CursorWi } async fn insert_entry(&mut self, entry: Entry) -> Result - // TODO: Why is 'async_trait necessary? where T: Encode + Decode + Ord + Send + Sync + 'async_trait { let file_position = self.append_entry_no_indexing(&entry).await?; @@ -415,6 +414,18 @@ pub trait CursorWithWriteAccessToIndex: CursorWithAccessToIndex + CursorWi Ok(file_position) } + + async fn delete_entry_values_from_indexes(&mut self, entry: EntryDetailed) -> Result<()> + where T: Encode + Decode + Ord + Send + Sync + 'async_trait + { + for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() { + if should_index { + // SAFETY: If should_index is true, then the column is indexable. + self.delete_from_index(column as Column, value, entry.file_position).await? + } + } + Ok(()) + } } @@ -591,26 +602,39 @@ impl <'cursor, T> WriteCursor<'cursor, T> // ===Deletion=== pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()> - where T: Send + Decode + Encode + where T: Encode + Decode + Ord + Send + Sync { self.seek_to(file_position).await?; let mut entry_header = self.read_entry_header().await?; if entry_header.is_deleted { Ok(()) } else { + // Update store and entry headers self.increment_deleted_count().await?; self.seek_to(file_position).await?; entry_header.is_deleted = true; self.set_new_entry_header(entry_header.into()).await?; + // Update index + self.seek_to(file_position).await?; + match self.next().await? { + Some(entry) => { + self.delete_entry_values_from_indexes(entry).await? + }, + None => { + // SAFETY: We just modified its header, so it must exist. + unreachable!() + } + } + self.attempt_garbage_collection_if_necessary().await?; Ok(()) } } async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result>> - where T: Decode + Encode + PartialEq + Send + Sync + where T: Encode + Decode + Ord + Send + Sync { let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?; if let Some(entry) = maybe_entry { @@ -621,6 +645,28 @@ impl <'cursor, T> WriteCursor<'cursor, T> } } + // ===Indexing=== + // WARNING: Assumes the column is NOT indexable. + pub async fn attach_index(&mut self, column: Column) -> Result<()> + where T: Ord + Decode + Encode + Send + Sync + { + // New Index + let index = Store::create_empty_index_at(&self.header, column).await?; + self.indexes[column as usize] = Some(index); + + // Mark column as indexed + self.header.make_column_indexed(column); + self.set_header(&self.header.clone()).await?; + + // Build index + self.seek_to_start_of_data().await?; + while let Some((_, file_position, value)) = self.next_alive_at_column(column).await? { + self.insert_into_index(column, value, file_position).await? + } + + Ok(()) + } + // ===Garbage Collection=== async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> where T: Send + Decode + Encode @@ -701,34 +747,5 @@ impl <'cursor, T> WriteCursor<'cursor, T> Ok(cursor_to_intermediate) } - - // ===Indexing=== - // WARNING: Assumes the column is NOT indexable. - pub async fn attach_index(&mut self, column: Column) -> Result<()> - where T: Ord + Decode + Encode + Send + Sync - { - // New Index - let index = Store::create_empty_index_at(&self.header, column).await?; - self.indexes[column as usize] = Some(index); - - // Mark column as indexed - self.header.make_column_indexed(column); - self.set_header(&self.header.clone()).await?; - - // Build index - self.seek_to_start_of_data().await?; - while let Some((_, file_position, value)) = self.next_alive_at_column(column).await? { - self.insert_into_index(column, value, file_position).await? - } - - Ok(()) - } - - async fn delete_from_index(&mut self, t: T, file_position: FilePosition) -> Result> - where T: Encode + Decode + Ord + Send + Sync - { - // let x = self.primary_index.delete(t, file_position).await?; - todo!() - } } diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index 55bf693..46e6552 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -166,9 +166,9 @@ async fn main() -> Result<()> { { let column = 1; - println!("BUILDING AN INDEX"); - store.attach_index(column).await.map_err(|e| e.to_io_or_panic())?; - println!("INDEX BUILT!"); + // println!("BUILDING AN INDEX"); + // store.attach_index(column).await.map_err(|e| e.to_io_or_panic())?; + // println!("INDEX BUILT!"); let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; let value = 2; @@ -178,8 +178,6 @@ async fn main() -> Result<()> { } - - // { // let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; // let column = 3;