diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 45b37fc..8b6bfbb 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -1,5 +1,6 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; use tokio::fs::{File, OpenOptions}; +use tokio::fs; use std::path::Path; use std::marker::PhantomData; use std::collections::{BTreeMap, HashSet}; @@ -17,7 +18,7 @@ use crate::store_header::StoreHeader; use crate::storage_engine::{Store, FilePosition, Column, Result, StoreIndexes, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; use crate::index::Index; - +const GARBAGE_COLLECTION_TRIGGER: usize = 100; // ===Concrete Cursors=== pub struct ReadCursor<'a, T> { @@ -327,12 +328,12 @@ pub trait CursorWithWriteStoreHeader: CursorWithStoreHeader + PrimitiveWri pub trait CursorWithAccessToIndex: CursorWithStoreHeader { fn indexes(&mut self) -> &[Option>]; - async fn index_lookup(&mut self, column: Column, k: &T) -> Result>> + async fn index_lookup(&mut self, column: Column, value: &T) -> Result>> where T: Encode + Decode + Ord + Send + Sync { match &self.indexes()[column as usize] { Some(index) => { - let file_positions = index.lookup(k).await?.unwrap_or_else(|| HashSet::new()); + let file_positions = index.lookup(value).await?.unwrap_or_else(|| HashSet::new()); let mut entries: Vec> = vec![]; for &file_position in file_positions.iter() { match self.read_entry_at(file_position).await? { @@ -601,9 +602,10 @@ impl <'cursor, T> WriteCursor<'cursor, T> } // ===Deletion=== - pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()> + pub async fn mark_deleted_at(&mut self, file_position: FilePosition, enable_garbage_collector: bool) -> Result<()> where T: Encode + Decode + Ord + Send + Sync + Clone + Ord { + println!("MARKING {} as DELETED", file_position); self.seek_to(file_position).await?; let mut entry_header = self.read_entry_header().await?; if entry_header.is_deleted { @@ -628,23 +630,59 @@ impl <'cursor, T> WriteCursor<'cursor, T> } } - self.attempt_garbage_collection_if_necessary().await?; + if enable_garbage_collector { + self.attempt_garbage_collection_if_necessary().await?; + } Ok(()) } } - async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result>> - where T: Encode + Decode + Ord + Send + Sync + Clone + Ord + async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T, enable_garbage_collector: bool) -> Result>> + where T: Encode + Decode + Ord + Send + Sync + Clone { let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?; if let Some(entry) = maybe_entry { - self.mark_deleted_at(entry.file_position).await?; + self.mark_deleted_at(entry.file_position, enable_garbage_collector).await?; Ok(Some(entry)) } else { Ok(maybe_entry) } } + // Doesn't update indexes. + async fn find_all_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result + where T: Encode + Decode + Ord + Send + Sync + Clone + { + let mut count = 0; + while let Some(_) = self.find_first_eq_bruteforce_and_delete(column, t0, false).await? { + count += 1; + } + Ok(count) + } + + pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result + where T: Encode + Decode + Ord + Send + Sync + Clone + { + let count = + if self.header().is_column_indexed(column) { + println!("DELETION: INDEXED LOOKUP"); + let entries = self.index_lookup(column, value).await?; + let count = entries.len(); + for entry in entries { + self.mark_deleted_at(entry.file_position, false).await? + } + count + } else { + println!("DELETION: BRUTE-FORCE LOOKUP"); + let count = self.find_all_eq_bruteforce_and_delete(column, value).await?; + count + }; + if enable_garbage_collector { + self.attempt_garbage_collection_if_necessary().await?; + } + Ok(count) + } + // ===Indexing=== // WARNING: Assumes the column is NOT indexable. pub async fn attach_index(&mut self, column: Column) -> Result<()> @@ -671,15 +709,14 @@ impl <'cursor, T> WriteCursor<'cursor, T> async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> where T: Send + Decode + Encode + Clone + Ord { - // TODO: What should be the policy? Counting size of garbage? Counting how many entries are - // garbage? - if self.header.deleted_count > 100 { + if self.header.deleted_count > GARBAGE_COLLECTION_TRIGGER { + println!("=======START GARBAGE COLLETOR===="); self.initiate_garbage_collection().await?; } Ok(()) } - async fn initiate_garbage_collection(&mut self) -> Result + pub async fn initiate_garbage_collection(&mut self) -> Result where T: Send + Decode + Encode + Clone + Ord { let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?; @@ -698,6 +735,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> // We'll dump all alive entries into a new file. let mut entries_deleted = 0; + self.seek_to_start_of_data().await?; { while let Some(live_entry) = self.next_alive().await? { entries_deleted += 1; @@ -729,6 +767,14 @@ impl <'cursor, T> WriteCursor<'cursor, T> self.file = cursor_to_intermediate.file; self.eof_file_position = cursor_to_intermediate.eof_file_position; + // swap files on disk + // current file + let path_to_table = Path::new(&self.header.table_folder); + let path_to_rows = path_to_table.join(ROWS_FILE_NAME); + let path_to_intermediate_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); + fs::remove_file(path_to_rows.clone()).await?; + fs::rename(path_to_intermediate_rows, path_to_rows).await?; + Ok(entries_deleted) } diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index 46e6552..f3a32e0 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -82,6 +82,20 @@ async fn append_bunch_of_entries(store: &mut Store) -> Result<()> { Ok(()) } +async fn test_garbage_collection(store: &mut Store) -> Result<()> { + let mut cursor = store.write_cursor().await.map_err(|e| e.to_io_or_panic())?; + // cursor.delete_entries_where_eq() + // 1. mark a bunch of entries as deleted + let column = 0; + let value = 1; + // cursor.delete_entries_where_eq(column, &value, true).await.map_err(|e| e.to_io_or_panic())?; + // let value = 50; + // cursor.delete_entries_where_eq(column, &value).await.map_err(|e| e.to_io_or_panic())?; + + // cursor.initiate_garbage_collection().await.map_err(|e| e.to_io_or_panic())?; + Ok(()) +} + #[tokio::main] async fn main() -> Result<()> { println!("STOOOOOOOOOOOORAAAAAAAAAAAGE"); @@ -117,65 +131,73 @@ async fn main() -> Result<()> { cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?; } - { - let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; - let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - println!("{:?}", x); - let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - println!("{:?}", x); - let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - println!("{:?}", x); - let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - println!("{:?}", x); - let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - println!("{:?}", x); - } + test_garbage_collection(&mut store).await?; { let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; - let column = 2; - let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; - println!("{:?}", x); - let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; - println!("{:?}", x); - let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; - println!("{:?}", x); - let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; - println!("{:?}", x); - let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; - println!("{:?}", x); + cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?; } - { - let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; - let column = 0; - let value = 1; - let entries = cursor.select_entries_where_eq(column, &value).await.map_err(|e| e.to_io_or_panic())?; - println!("ARE INDEXES WORKING???"); - println!("{:?}", entries); - } - { - let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; - let column = 1; - let value = 2; - let entries = cursor.select_entries_where_eq(column, &value).await.map_err(|e| e.to_io_or_panic())?; - println!("ARE INDEXES WORKING???"); - println!("{:?}", entries); - } + // { + // let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // } - { - let column = 1; - // println!("BUILDING AN INDEX"); - // store.attach_index(column).await.map_err(|e| e.to_io_or_panic())?; - // println!("INDEX BUILT!"); + // { + // let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + // let column = 2; + // let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // } - let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; - let value = 2; - let entries = cursor.select_entries_where_eq(column, &value).await.map_err(|e| e.to_io_or_panic())?; - println!("ARE INDEXES WORKING???"); - println!("{:?}", entries); - } + // { + // let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + // let column = 0; + // let value = 1; + // let entries = cursor.select_entries_where_eq(column, &value).await.map_err(|e| e.to_io_or_panic())?; + // println!("ARE INDEXES WORKING???"); + // println!("{:?}", entries); + // } + + // { + // let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + // let column = 1; + // let value = 2; + // let entries = cursor.select_entries_where_eq(column, &value).await.map_err(|e| e.to_io_or_panic())?; + // println!("ARE INDEXES WORKING???"); + // println!("{:?}", entries); + // } + + // { + // let column = 1; + // // println!("BUILDING AN INDEX"); + // // store.attach_index(column).await.map_err(|e| e.to_io_or_panic())?; + // // println!("INDEX BUILT!"); + + // let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + // let value = 2; + // let entries = cursor.select_entries_where_eq(column, &value).await.map_err(|e| e.to_io_or_panic())?; + // println!("ARE INDEXES WORKING???"); + // println!("{:?}", entries); + // } // {