diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs
index 13054da..45b37fc 100644
--- a/storage_engine/src/cursor.rs
+++ b/storage_engine/src/cursor.rs
@@ -602,7 +602,7 @@ impl <'cursor, T> WriteCursor<'cursor, T>
     // ===Deletion===
 
     pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()>
-        where T: Encode + Decode + Ord + Send + Sync
+        where T: Encode + Decode + Ord + Send + Sync + Clone
     {
         self.seek_to(file_position).await?;
         let mut entry_header = self.read_entry_header().await?;
@@ -634,7 +634,7 @@ impl <'cursor, T> WriteCursor<'cursor, T>
     }
 
     async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result<Option<Entry<T>>>
-        where T: Encode + Decode + Ord + Send + Sync
+        where T: Encode + Decode + Ord + Send + Sync + Clone
     {
         let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?;
         if let Some(entry) = maybe_entry {
@@ -669,7 +669,7 @@ impl <'cursor, T> WriteCursor<'cursor, T>
     // ===Garbage Collection===
 
     async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()>
-        where T: Send + Decode + Encode
+        where T: Send + Decode + Encode + Clone + Ord
     {
         // TODO: What should be the policy? Counting size of garbage? Counting how many entries are
         // garbage?
@@ -680,11 +680,21 @@ impl <'cursor, T> WriteCursor<'cursor, T>
     }
 
     async fn initiate_garbage_collection(&mut self) -> Result<usize>
-        where T: Send + Decode + Encode
+        where T: Send + Decode + Encode + Clone + Ord
     {
         let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?;
-        // This will be a vector of such BTree maps...
-        let in_memory_index: BTreeMap<T, HashSet<FilePosition>> = BTreeMap::new();
+        // Since garbage collection changes FilePositions of live entries, we need to update the
+        // indexes too.
+
+        let mut in_memory_indexes: Vec<Option<BTreeMap<T, HashSet<FilePosition>>>> = Vec::with_capacity(self.header.number_of_columns);
+        for column in 0..self.header.number_of_columns {
+            if self.header.is_column_indexed(column as Column) {
+                let in_memory_index = BTreeMap::new();
+                in_memory_indexes.push(Some(in_memory_index))
+            } else {
+                in_memory_indexes.push(None)
+            }
+        }
 
         // We'll dump all alive entries into a new file.
         let mut entries_deleted = 0;
@@ -692,25 +702,31 @@
             while let Some(live_entry) = self.next_alive().await? {
                 entries_deleted += 1;
                 let file_position = cursor_to_intermediate.append_entry_no_indexing(&live_entry.forget()).await?;
-                // TODO: Start indexing all of the indexable columns from scratch.
+
+                // Update index. (Wouldn't it be nice if we had `for let ...`?)
+                for (maybe_in_memory_index, value) in in_memory_indexes.iter_mut().zip(&live_entry.data) {
+                    if let Some(in_memory_index) = maybe_in_memory_index {
+                        in_memory_index.entry(value.clone()).or_insert_with(HashSet::new).insert(file_position);
+                    }
+                }
             }
         }
 
-        // TODO: Create a new indexes from in_memory_index.
-
-        // Afterwards we swap the files, and delete the garbage.
-        // TODO:
-        // What needs to be done?
-        // 1. We take self cursor and mutate it
+        // ===swap===
+        // swapping indexes
+        // Update indexes on disk.
+        for (column, maybe_in_memory_index) in in_memory_indexes.into_iter().enumerate() {
+            if let Some(in_memory_index) = maybe_in_memory_index {
+                let index = self.mut_index_at(column as Column);
+                index.reset(in_memory_index).await?;
+            }
+        }
 
         // swapping headers
         self.header.deleted_count = 0;
         self.header.total_count = cursor_to_intermediate.header.total_count;
-        // TODO: We'll actually have to iterate through all the indexes and swap each of them.
-        self.indexes = todo!();
 
         self.file = cursor_to_intermediate.file;
-        self.eof_file_position = cursor_to_intermediate.eof_file_position;
 
         Ok(entries_deleted)
diff --git a/storage_engine/src/entry.rs b/storage_engine/src/entry.rs
index ed0c1fa..d22a46f 100644
--- a/storage_engine/src/entry.rs
+++ b/storage_engine/src/entry.rs
@@ -49,10 +49,12 @@ impl<T> EntryDetailed<T> {
         Ok(EntryDetailed { header, file_position, data })
     }
 
-    pub fn forget(self) -> Entry<T> {
+    pub fn forget(&self) -> Entry<T>
+        where T: Clone
+    {
         Entry {
-            header: self.header.into(),
-            data: self.data,
+            header: self.header.clone().into(),
+            data: self.data.clone(),
         }
     }
 }
diff --git a/storage_engine/src/entry_header.rs b/storage_engine/src/entry_header.rs
index 7c8d626..cee5496 100644
--- a/storage_engine/src/entry_header.rs
+++ b/storage_engine/src/entry_header.rs
@@ -8,7 +8,7 @@ pub struct EntryHeader {
     pub is_deleted: bool,
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct EntryHeaderWithDataSize {
     pub is_deleted: bool,
     pub data_sizes: Vec<usize>, // vec![5, 6, 20] means that column 0 stores 5 bytes, column 1 stores 6
diff --git a/storage_engine/src/index.rs b/storage_engine/src/index.rs
index b5c42c1..6f69b76 100644
--- a/storage_engine/src/index.rs
+++ b/storage_engine/src/index.rs
@@ -100,6 +100,11 @@ where
         Ok(())
     }
 
+    pub async fn reset(&mut self, data: BTreeMap<K, HashSet<V>>) -> Result<()> {
+        self.data = data;
+        self.sync_to_disk().await
+    }
+
     async fn append_to_file(&mut self, key: &K, value: &V) -> Result<()> {
         let mut encoded = Vec::new();
         encoded.extend(encode(key)?);
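For reviewers, below is a minimal standalone sketch (not part of the patch) of the per-column rebuild pattern the new garbage-collection loop relies on. It assumes simplified stand-in types: `FilePosition` is shrunk to a `u64`, rows are plain `(position, values)` pairs, and `rebuild_indexes` is a hypothetical free function rather than anything defined in the crate.

```rust
use std::collections::{BTreeMap, HashSet};

// Stand-ins for illustration only; the real crate has its own
// FilePosition, Column, and entry types.
type FilePosition = u64;

/// Rebuilds one optional BTree index per column from the surviving rows.
/// `indexed` marks which columns carry an index; each row pairs its new
/// file position with its per-column values.
fn rebuild_indexes<T: Ord + Clone>(
    indexed: &[bool],
    rows: &[(FilePosition, Vec<T>)],
) -> Vec<Option<BTreeMap<T, HashSet<FilePosition>>>> {
    // One slot per column: Some(map) for indexed columns, None otherwise.
    let mut maps: Vec<Option<BTreeMap<T, HashSet<FilePosition>>>> = indexed
        .iter()
        .map(|&is_indexed| if is_indexed { Some(BTreeMap::new()) } else { None })
        .collect();

    for (position, values) in rows {
        // Zip column values against their (optional) index, as the GC loop does.
        for (maybe_map, value) in maps.iter_mut().zip(values) {
            if let Some(map) = maybe_map {
                map.entry(value.clone())
                    .or_insert_with(HashSet::new)
                    .insert(*position);
            }
        }
    }
    maps
}

fn main() {
    // Two columns, only column 0 indexed; two live rows at new positions 0 and 64.
    let rows = vec![
        (0, vec!["alice".to_string(), "red".to_string()]),
        (64, vec!["bob".to_string(), "blue".to_string()]),
    ];
    let maps = rebuild_indexes(&[true, false], &rows);
    assert!(maps[1].is_none());
    assert!(maps[0].as_ref().unwrap()["alice"].contains(&0));
    println!("{:?}", maps[0]);
}
```

The `Vec<Option<...>>` shape keeps the slot-per-column layout stable, so non-indexed columns still line up when zipping against a row's values, which is what lets the GC loop walk every column without special-casing.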