Implement Garbage Collection
This commit is contained in:
parent
1086b2fc5e
commit
f3fc67cbbc
4 changed files with 43 additions and 20 deletions
|
|
@ -602,7 +602,7 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
||||||
|
|
||||||
// ===Deletion===
|
// ===Deletion===
|
||||||
pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()>
|
pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()>
|
||||||
where T: Encode + Decode + Ord + Send + Sync
|
where T: Encode + Decode + Ord + Send + Sync + Clone + Ord
|
||||||
{
|
{
|
||||||
self.seek_to(file_position).await?;
|
self.seek_to(file_position).await?;
|
||||||
let mut entry_header = self.read_entry_header().await?;
|
let mut entry_header = self.read_entry_header().await?;
|
||||||
|
|
@ -634,7 +634,7 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result<Option<EntryDetailed<T>>>
|
async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result<Option<EntryDetailed<T>>>
|
||||||
where T: Encode + Decode + Ord + Send + Sync
|
where T: Encode + Decode + Ord + Send + Sync + Clone + Ord
|
||||||
{
|
{
|
||||||
let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?;
|
let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?;
|
||||||
if let Some(entry) = maybe_entry {
|
if let Some(entry) = maybe_entry {
|
||||||
|
|
@ -669,7 +669,7 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
||||||
|
|
||||||
// ===Garbage Collection===
|
// ===Garbage Collection===
|
||||||
async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()>
|
async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()>
|
||||||
where T: Send + Decode + Encode
|
where T: Send + Decode + Encode + Clone + Ord
|
||||||
{
|
{
|
||||||
// TODO: What should be the policy? Counting size of garbage? Counting how many entries are
|
// TODO: What should be the policy? Counting size of garbage? Counting how many entries are
|
||||||
// garbage?
|
// garbage?
|
||||||
|
|
@ -680,11 +680,21 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn initiate_garbage_collection(&mut self) -> Result<usize>
|
async fn initiate_garbage_collection(&mut self) -> Result<usize>
|
||||||
where T: Send + Decode + Encode
|
where T: Send + Decode + Encode + Clone + Ord
|
||||||
{
|
{
|
||||||
let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?;
|
let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?;
|
||||||
// This will be a vector of such BTree maps...
|
// Since garbage collection changes FilePositions of live entries, we need to update the
|
||||||
let in_memory_index: BTreeMap<T, HashSet<FilePosition>> = BTreeMap::new();
|
// indexes too.
|
||||||
|
|
||||||
|
let mut in_memory_indexes: Vec<Option<BTreeMap<T, HashSet<FilePosition>>>> = Vec::with_capacity(self.header.number_of_columns);
|
||||||
|
for column in 0..self.header.number_of_columns {
|
||||||
|
if self.header.is_column_indexed(column as Column) {
|
||||||
|
let in_memory_index = BTreeMap::new();
|
||||||
|
in_memory_indexes.push(Some(in_memory_index))
|
||||||
|
} else {
|
||||||
|
in_memory_indexes.push(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// We'll dump all alive entries into a new file.
|
// We'll dump all alive entries into a new file.
|
||||||
let mut entries_deleted = 0;
|
let mut entries_deleted = 0;
|
||||||
|
|
@ -692,25 +702,31 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
||||||
while let Some(live_entry) = self.next_alive().await? {
|
while let Some(live_entry) = self.next_alive().await? {
|
||||||
entries_deleted += 1;
|
entries_deleted += 1;
|
||||||
let file_position = cursor_to_intermediate.append_entry_no_indexing(&live_entry.forget()).await?;
|
let file_position = cursor_to_intermediate.append_entry_no_indexing(&live_entry.forget()).await?;
|
||||||
// TODO: Start indexing all of the indexable columns from scratch.
|
|
||||||
|
// Update index. (Wouldn't it be nice if we had `for let ...`?)
|
||||||
|
for (maybe_in_memory_index, value) in in_memory_indexes.iter_mut().zip(&live_entry.data) {
|
||||||
|
if let Some(in_memory_index) = maybe_in_memory_index {
|
||||||
|
in_memory_index.entry(value.clone()).or_insert_with(HashSet::new).insert(file_position);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Create a new indexes from in_memory_index.
|
// ===swap===
|
||||||
|
// swapping indexes
|
||||||
// Afterwards we swap the files, and delete the garbage.
|
// Update indexes on disk.
|
||||||
// TODO:
|
for (column, maybe_in_memory_index) in in_memory_indexes.into_iter().enumerate() {
|
||||||
// What needs to be done?
|
if let Some(in_memory_index) = maybe_in_memory_index {
|
||||||
// 1. We take self cursor and mutate it
|
let index = self.mut_index_at(column as Column);
|
||||||
|
index.reset(in_memory_index).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// swapping headers
|
// swapping headers
|
||||||
self.header.deleted_count = 0;
|
self.header.deleted_count = 0;
|
||||||
self.header.total_count = cursor_to_intermediate.header.total_count;
|
self.header.total_count = cursor_to_intermediate.header.total_count;
|
||||||
|
|
||||||
// TODO: We'll actually have to iterate through all the indexes and swap each of them.
|
|
||||||
self.indexes = todo!();
|
|
||||||
self.file = cursor_to_intermediate.file;
|
self.file = cursor_to_intermediate.file;
|
||||||
|
|
||||||
self.eof_file_position = cursor_to_intermediate.eof_file_position;
|
self.eof_file_position = cursor_to_intermediate.eof_file_position;
|
||||||
|
|
||||||
Ok(entries_deleted)
|
Ok(entries_deleted)
|
||||||
|
|
|
||||||
|
|
@ -49,10 +49,12 @@ impl <T>EntryDetailed<T> {
|
||||||
Ok(EntryDetailed { header, file_position, data })
|
Ok(EntryDetailed { header, file_position, data })
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn forget(self) -> Entry<T> {
|
pub fn forget(&self) -> Entry<T>
|
||||||
|
where T: Clone
|
||||||
|
{
|
||||||
Entry {
|
Entry {
|
||||||
header: self.header.into(),
|
header: self.header.clone().into(),
|
||||||
data: self.data,
|
data: self.data.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ pub struct EntryHeader {
|
||||||
pub is_deleted: bool,
|
pub is_deleted: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct EntryHeaderWithDataSize {
|
pub struct EntryHeaderWithDataSize {
|
||||||
pub is_deleted: bool,
|
pub is_deleted: bool,
|
||||||
pub data_sizes: Vec<usize>, // vec![5, 6, 20] means that column 0 stores 5 bytes, column 1 stores 6
|
pub data_sizes: Vec<usize>, // vec![5, 6, 20] means that column 0 stores 5 bytes, column 1 stores 6
|
||||||
|
|
|
||||||
|
|
@ -100,6 +100,11 @@ where
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn reset(&mut self, data: BTreeMap<K, HashSet<V>>) -> Result<()> {
|
||||||
|
self.data = data;
|
||||||
|
self.sync_to_disk().await
|
||||||
|
}
|
||||||
|
|
||||||
async fn append_to_file(&mut self, key: &K, value: &V) -> Result<()> {
|
async fn append_to_file(&mut self, key: &K, value: &V) -> Result<()> {
|
||||||
let mut encoded = Vec::new();
|
let mut encoded = Vec::new();
|
||||||
encoded.extend(encode(key)?);
|
encoded.extend(encode(key)?);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue