From 53aa5a01278a89849c41ccaab967f2a285338d7a Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Sat, 3 Feb 2024 17:56:08 +0100 Subject: [PATCH] Attempt at delete --- storage_engine/src/main.rs | 11 ++- storage_engine/src/storage_engine.rs | 123 ++++++++++++++++++--------- 2 files changed, 93 insertions(+), 41 deletions(-) diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index c5b47de..117ac4a 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -24,10 +24,10 @@ async fn create_store() -> Result> { println!("THE BYTES: {:?}", store.read_all_bytes().await?); let mut cursor = store.cursor(AccessMode::Write).await.map_err(|e| e.to_io_or_panic())?; - let entry0: Entry = Entry::new_deleted(vec![1, 2, 3, 4, 5]); + let entry0: Entry = Entry::new(vec![1, 2, 3, 4, 5]); append_entry(&mut cursor, &entry0).await?; - let entry1: Entry = Entry::new_deleted(vec![200, 200, 5, 6, 7]); + let entry1: Entry = Entry::new(vec![200, 200, 5, 6, 7]); append_entry(&mut cursor, &entry1).await?; println!("{:?}", store.read_all_bytes().await?); @@ -83,6 +83,13 @@ async fn main() -> Result<()> { let entry0: Entry = Entry::new(vec![99, 98, 97, 96, 95]); append_entry(&mut cursor, &entry0).await?; + let entry1: Entry = Entry::new(vec![50,50,50,50,50]); + let file_position = append_entry(&mut cursor, &entry1).await?; + println!("CURRENT FILE_POSITION = {}", file_position); + // Now file_position point to entry1. + // cursor.mark_deleted_at(file_position).await.map_err(|e| e.to_io_or_panic())?; + // cursor.seek_to(file_position).await.map_err(|e| e.to_io_or_panic())?; + cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?; diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs index d96a42d..ddc0bb2 100644 --- a/storage_engine/src/storage_engine.rs +++ b/storage_engine/src/storage_engine.rs @@ -35,6 +35,10 @@ pub struct Store { // primary_index: Vec>>, // indexes: Vec>>>, // primary_index: Index, + + // TODO: It's not good to have StoreHeader copied to all the cursors, since they may modify it. + // How to sync? + // All header: StoreHeader, data_type: PhantomData, @@ -46,6 +50,9 @@ pub struct Store { // list } +// Read Cursors don't modify the rows nor Store Header. +// Write Cursors can modify both rows and Store Header. +// Probably should split these into two types. But they will have a lot of functionality in common. pub struct Cursor { header: StoreHeader, file: File, @@ -54,6 +61,14 @@ pub struct Cursor { eof_file_position: FilePosition, } +pub struct WriteCursor<'a, T> { + header: &'a mut StoreHeader, + file: File, + data_type: PhantomData, + + eof_file_position: FilePosition, +} + pub enum AccessMode { Read, Write @@ -87,10 +102,6 @@ impl StoreHeader { pub struct EntryHeader { is_deleted: bool, } -impl EntryHeader { - const IS_DELETED_SIZE: usize = size_of::(); - const HEADER_SIZE: usize = Self::IS_DELETED_SIZE; -} #[derive(Debug)] pub struct EntryHeaderWithDataSize { @@ -363,13 +374,15 @@ impl Cursor { .await?, }; - let cursor = Self { + let mut cursor = Self { header: store.header.clone(), file, data_type: store.data_type, - eof_file_position: 0, + eof_file_position: 0, // This will be overwriten by the seek_to_start_of_data }; + cursor.seek_to_start_of_data().await?; + Ok(cursor) } @@ -392,7 +405,8 @@ impl Cursor { Ok(result) } - async fn seek_to(&mut self, file_position: FilePosition) -> Result<()> { + // TODO: make private + pub async fn seek_to(&mut self, file_position: FilePosition) -> Result<()> { self.file.seek(SeekFrom::Start(file_position)).await?; Ok(()) } @@ -411,27 +425,41 @@ impl Cursor { self.seek_to(StoreHeader::SIZE as u64).await } - async fn current_file_position(&mut self) -> Result { + // TODO: Make private + pub async fn current_file_position(&mut self) -> Result { let next_file_position: FilePosition = self.file.stream_position().await?; Ok(next_file_position) } - // For debugging. - // Moves file cursor to the end. - pub async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error>{ - let mut bytes: Vec = vec![]; - self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?; - self.file.read_to_end(&mut bytes).await?; - Ok(bytes) + async fn is_at_eof(&mut self) -> Result { + Ok(self.current_file_position().await? == self.eof_file_position) } pub async fn less_than_eq(&mut self, file_position0: FilePosition, file_position1: FilePosition) -> Result { todo!() } - // ===Creation=== + // ===Iteration=== + // Assumes that the current file position is at a valid entry or EOF. + pub async fn next(&mut self) -> Result>> + where T: Decode + { + if self.is_at_eof().await? { + return Ok(None) + } - // ===Append Entry=== + let header = self.read_entry_header().await?; + + let mut data_bytes: Vec = vec![0; header.size_of_data()]; + self.read_bytes(&mut data_bytes).await?; + let entry: EntryDetailed = + EntryDetailed::decode(header, self.header.number_of_columns, &mut data_bytes)?; + + Ok(Some(entry)) + } + + + // ===Store Header Manipulation=== async fn increment_total_count(&mut self) -> Result<()> { self.seek_to_start().await?; self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; @@ -448,7 +476,18 @@ impl Cursor { Ok(()) } + // ===Entry Header Manipulation=== + // assumes we are at the start of the valid entry. + async fn set_entry_is_deleted_to(&mut self, is_deleted: bool) -> Result<()> { + self.seek_to(EntryHeaderWithDataSize::IS_DELETED_OFFSET as u64).await?; + self.write_bytes(&encode::(&is_deleted)?).await?; + Ok(()) + } + + // ===Append Entry=== + // Moves cursor to the end. + // Returns file position to the start of the new entry. pub async fn append_entry(&mut self, entry: &Entry) -> Result where T: Encode { @@ -467,21 +506,34 @@ impl Cursor { // ===Deletion=== pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()> { - self.increment_deleted_count().await?; - self.seek_to(file_position).await?; + let entry_header = self.read_entry_header().await?; + if entry_header.is_deleted { + Ok(()) + } else { + self.increment_deleted_count().await?; + self.seek_to(file_position).await?; + self.set_entry_is_deleted_to(true).await?; - // TODO: Now you need to mutate the entry itself - todo!() + self.attempt_garbage_collection_if_necessary().await?; + Ok(()) + } } + async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> { + // TODO: What should be the policy? Counting size of garbage? Counting how many entries are + // garbage? + if self.header.deleted_count > 100 { + todo!() + } else { + Ok(()) + } + } // ===Lookup=== // WARNING: The cursor has to be at the start of an entry. Otherwise garbage data will be // decoded as an entry. - pub async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result { - self.seek_to(file_position).await?; - + async fn read_entry_header(&mut self) -> Result { let number_of_columns: usize = self.header.number_of_columns; let mut header_bytes: Vec = vec![0; EntryHeaderWithDataSize::size(number_of_columns)]; self.read_bytes(&mut header_bytes).await?; @@ -493,6 +545,11 @@ impl Cursor { Ok(header) } + pub async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result { + self.seek_to(file_position).await?; + self.read_entry_header().await + } + pub async fn search_for(&mut self, index: T) -> Result<()> where T: Send { @@ -501,25 +558,13 @@ impl Cursor { todo!() } + // Returns None when file_positoin == eof_file_position pub async fn read_entry_at(&mut self, file_position: FilePosition) -> Result>> where T: Decode { - if file_position == self.eof_file_position { - return Ok(None) - } - - let header = self.read_entry_header_at(file_position).await?; - - let mut data_bytes: Vec = vec![0; header.size_of_data()]; - // TODO: Get rid of the println's - // println!("PREPARED_DATA_BYTES: {:?}", data_bytes); - self.read_bytes(&mut data_bytes).await?; - // println!("DATA_BYTES: {:?}", data_bytes); - let entry: EntryDetailed = - EntryDetailed::decode(header, self.header.number_of_columns, &mut data_bytes)?; - - Ok(Some(entry)) + self.seek_to(file_position).await?; + self.next().await } // TODO: This needs to be some sort of an iterator