Prepare for garbage collection

This commit is contained in:
Yuriy Dupyn 2024-02-03 23:45:55 +01:00
parent 0f98903759
commit daa39850f0
3 changed files with 115 additions and 28 deletions

View file

@ -13,7 +13,7 @@ use crate::binary_coding::{encode, decode};
use crate::entry::{Entry, EntryDetailed};
use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader};
use crate::store_header::StoreHeader;
use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME};
use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME};
#[async_trait]
// TODO: Make this private
@ -261,7 +261,7 @@ impl <T> ReadCursor<T> {
pub async fn new(store: &Store<T>) -> Result<Self>
where T: Send
{
let path_to_rows = Path::new(&store.table_folder).join(ROWS_FILE_NAME);
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
let file: File =
OpenOptions::new()
.read(true)
@ -293,7 +293,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
pub async fn new<'store: 'cursor>(store: &'store mut Store<T>) -> Result<Self>
where T: Send
{
let path_to_rows = Path::new(&store.table_folder).join(ROWS_FILE_NAME);
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
let file: File =
OpenOptions::new()
.read(true)
@ -315,7 +315,33 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
Ok(cursor)
}
/// Attaches a write cursor to an already-existing rows file.
///
/// Unlike `new`, the caller supplies the path and a mutable borrow of the
/// store header directly; the header borrow must outlive the cursor
/// (`'header: 'cursor`). The file is opened read+write, the end-of-file
/// position is captured, and the cursor is left positioned at the start of
/// the data region, ready for scans.
pub async fn connect<'header: 'cursor>(path_to_rows: &str, header: &'header mut StoreHeader) -> Result<Self>
where T: Send
{
    let rows_file: File = OpenOptions::new()
        .read(true)
        .write(true)
        .open(path_to_rows)
        .await?;
    let mut connected = Self {
        header,
        file: rows_file,
        data_type: PhantomData::<T>,
        eof_file_position: 0,
    };
    // Record where the file currently ends, then rewind to where entries begin.
    connected.eof_file_position = connected.seek_to_end().await?;
    connected.seek_to_start_of_data().await?;
    Ok(connected)
}
// ===Primitive Operations===
async fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize> {
Ok(self.file.write(bytes).await?)
@ -389,6 +415,18 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
}
}
/// Scans `column` for the first entry whose value equals `t0`; if one is
/// found, marks it deleted at its recorded file position and returns its
/// details. Returns `Ok(None)` when no matching entry exists.
async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result<Option<EntryDetailed<T>>>
where T: Decode + PartialEq + Send + Sync
{
    match self.find_first_eq_bruteforce(column, t0).await? {
        Some(found) => {
            // Mark-then-return: the caller still gets the entry it deleted.
            self.mark_deleted_at(found.file_position).await?;
            Ok(Some(found))
        }
        None => Ok(None),
    }
}
async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> {
// TODO: What should be the policy? Counting size of garbage? Counting how many entries are
// garbage?
@ -398,6 +436,46 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
Ok(())
}
}
/// Work-in-progress compaction: copies live entries into an intermediate rows
/// file, after which the files are meant to be swapped so garbage is dropped.
/// NOTE(review): this is unfinished — construction of the intermediate cursor
/// and the final swap are still `todo!()`, so calling this will panic.
/// Return value is presumably the number of collected/copied entries —
/// TODO confirm once implemented.
async fn initiate_garbage_collection(&mut self) -> Result<usize>
where T: Send
{
// Resolve the path for the intermediate rows file inside this table's folder.
let table_folder = self.header.table_folder.to_string();
let path_to_table = Path::new(&table_folder);
let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
// Create an empty intermediate file carrying a copy of the current header layout.
let intermediate_file: File = Store::<T>::create_empty_rows_file(path_to_rows, &self.header).await?;
// Fresh header for the compacted file: same schema, zeroed counters since
// only live rows will be written into it.
let mut intermediate_header: StoreHeader = StoreHeader {
table_folder,
number_of_columns: self.header.number_of_columns,
deleted_count: 0,
total_count: 0,
primary_column: self.header.primary_column
};
// Creates a new cursor to the intermediate file in which we'll dump the live entries.
// NOTE(review): the commented-out construction below is the intended shape, but
// borrowing the local `intermediate_header` into `Self` (whose header borrow must
// live for 'cursor) does not type-check — see the lifetime TODO further down.
// let mut cursor_to_intermediate = Self {
// header: &mut intermediate_header,
// file: intermediate_file,
// data_type: PhantomData::<T>,
// eof_file_position: 0,
// };
let mut cursor_to_intermediate: Self = todo!();
let eof_file_position: FilePosition = cursor_to_intermediate.seek_to_end().await?;
cursor_to_intermediate.eof_file_position = eof_file_position;
// TODO: intermediate_header does not live long enough, so after garbage collection is
// done, we need to use it in the swap.
cursor_to_intermediate.header = todo!();
// In it there will be only the alive rows.
// Afterwards we swap the files, and delete the garbage.
todo!()
}
}

View file

@ -1,7 +1,7 @@
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::fs::{File, OpenOptions, DirBuilder};
use tokio::fs;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::marker::PhantomData;
use async_trait::async_trait;
@ -28,7 +28,6 @@ pub struct Store<T> {
// {write: 0, read: n + 1} ~> {write:0, read: n} // destroy read
// {write: 0, read: 0} ~> {write: 1, read: 0} // create write
// {write: 1, read: 0} ~> {write: 0, read: 0} // destroy write
pub table_folder: String,
// primary_index: Vec<Index<T, FilePosition>>>,
// indexes: Vec<Option<Index<T, HashSet<FilePosition>>>>,
// primary_index: Index<PositionOfValue, PositionOfRow>,
@ -63,6 +62,7 @@ impl <T>SomethingSupportingLeq for Store<T>
}
pub const ROWS_FILE_NAME: &'static str = "rows";
pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_intermediate";
impl <T>Store<T> {
// ===Creation===
@ -72,6 +72,31 @@ impl <T>Store<T> {
DirBuilder::new()
.create(path_to_table).await?;
let header = StoreHeader {
table_folder: table_folder.to_string(),
number_of_columns,
deleted_count: 0,
total_count: 0,
primary_column,
};
// We don't need the file right now. Only cursors will later open it.
Self::create_empty_rows_file(path_to_rows, &header).await?;
// TODO: indexes
// let index: Index<PositionOfValue, PositionOfRow> = Index::new(
// &format!("rows_{}", primary_column.to_string()),
// ).await?;
let store = Self {
header,
data_type: PhantomData::<T>,
};
Ok(store)
}
pub async fn create_empty_rows_file(path_to_rows: PathBuf, header: &StoreHeader) -> Result<File> {
let mut file: File =
OpenOptions::new()
.write(true)
@ -80,28 +105,10 @@ impl <T>Store<T> {
.open(path_to_rows)
.await?;
let header = StoreHeader {
number_of_columns,
deleted_count: 0,
total_count: 0,
primary_column,
};
let encoded_header: Vec<u8> = header.encode()?;
file.write(&encoded_header).await?;
// TODO: indexes
// let index: Index<PositionOfValue, PositionOfRow> = Index::new(
// &format!("rows_{}", primary_column.to_string()),
// ).await?;
let store = Self {
table_folder: table_folder.to_string(),
header,
data_type: PhantomData::<T>,
};
Ok(store)
Ok(file)
}
pub async fn connect(table_folder: &str) -> Result<Self>
@ -121,10 +128,9 @@ impl <T>Store<T> {
// header.
let mut header_bytes = StoreHeader::decode_buffer();
file.read_exact(&mut header_bytes).await?;
let header = StoreHeader::decode(&mut header_bytes).await?;
let header = StoreHeader::decode(table_folder, &mut header_bytes).await?;
let store = Self {
table_folder: table_folder.to_string(),
header,
data_type: PhantomData::<T>,
};

View file

@ -5,6 +5,8 @@ use std::mem::size_of;
#[derive(Debug, Clone)]
pub struct StoreHeader {
pub table_folder: String, // This one is not encoded into the file
pub number_of_columns: usize,
pub deleted_count: usize,
pub total_count: usize,
@ -35,7 +37,7 @@ impl StoreHeader {
[0; StoreHeader::SIZE]
}
pub async fn decode(result: &mut [u8]) -> Result<StoreHeader> {
pub async fn decode(table_folder: &str, result: &mut [u8]) -> Result<StoreHeader> {
let (number_of_columns, _) =
decode::<usize>(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE])
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?;
@ -49,6 +51,7 @@ impl StoreHeader {
decode::<Column>(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE])
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?;
let header = StoreHeader {
table_folder: table_folder.to_string(),
number_of_columns,
deleted_count,
total_count,