Split cursor functionality further into traits. Prep for garbage collection.

This commit is contained in:
Yuriy Dupyn 2024-02-04 20:45:57 +01:00
parent 8fd2d4ebf3
commit 82300039fc
2 changed files with 133 additions and 84 deletions

View file

@ -37,6 +37,15 @@ pub struct WriteCursor<'a, T> {
eof_file_position: FilePosition,
}
// This is used as a cursor to temporary file during Garbage Collection
pub struct AppendOnlyCursor<T> {
header: StoreHeader,
file: File,
data_type: PhantomData<T>,
eof_file_position: FilePosition,
}
// ===Traits===
#[async_trait]
@ -90,6 +99,14 @@ pub trait PrimitiveCursor<T> {
}
}
#[async_trait]
pub trait PrimitiveWriteCursor<T>: PrimitiveCursor<T> {
async fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize> {
Ok(self.file().write(bytes).await?)
}
}
#[async_trait]
pub trait CursorWithStoreHeader<T>: PrimitiveCursor<T> {
fn header(&self) -> &StoreHeader;
@ -241,6 +258,53 @@ pub trait CursorWithAccessToIndex<T>: CursorWithStoreHeader<T> {
}
}
#[async_trait]
pub trait CursorWithWriteStoreHeader<T>: CursorWithStoreHeader<T> + PrimitiveWriteCursor<T> {
fn header_mut(&mut self) -> &mut StoreHeader;
fn set_eof_file_position(&mut self, new_file_position: FilePosition);
// ===Store Header Manipulation===
async fn increment_total_count(&mut self) -> Result<()>
where T: Send
{
self.seek_to_start().await?;
self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?;
let new_count = self.header_mut().increment_total_count();
self.write_bytes(&encode::<usize>(&new_count)?).await?;
Ok(())
}
async fn increment_deleted_count(&mut self) -> Result<()>
where T: Send
{
self.seek_to_start().await?;
self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?;
let new_count = self.header_mut().increment_deleted_count();
self.write_bytes(&encode::<usize>(&new_count)?).await?;
Ok(())
}
// ===Append Entry===
// Moves cursor to the end.
// Returns file position to the start of the new entry.
async fn append_entry(&mut self, entry: &Entry<T>) -> Result<FilePosition>
where T: Encode + Send + Sync
{
self.increment_total_count().await?;
let encoded_entry: Vec<u8> = entry.encode()?;
let file_position = self.seek_to_end().await?;
self.write_bytes(&encoded_entry).await?;
let eof_file_position: FilePosition = self.current_file_position().await?;
self.set_eof_file_position(eof_file_position);
Ok(file_position)
}
}
// ===========Implementations=============
// ===PrimitiveCursor===
impl <T>PrimitiveCursor<T> for ReadCursor<'_, T> {
fn file(&mut self) -> &mut File {
@ -262,33 +326,56 @@ impl <T>PrimitiveCursor<T> for WriteCursor<'_, T> {
}
}
// ===CursorWithStoreHeader===
impl <T>CursorWithStoreHeader<T> for ReadCursor<'_, T> {
fn header(&self) -> &StoreHeader {
&self.header
impl <T>PrimitiveCursor<T> for AppendOnlyCursor<T> {
fn file(&mut self) -> &mut File {
&mut self.file
}
fn eof_file_position(&self) -> FilePosition {
self.eof_file_position
}
}
// ===PrimitiveCursor===
impl <T>PrimitiveWriteCursor<T> for WriteCursor<'_, T> {}
impl <T>PrimitiveWriteCursor<T> for AppendOnlyCursor<T> {}
// ===CursorWithStoreHeader===
impl <T>CursorWithStoreHeader<T> for ReadCursor<'_, T> {
fn header(&self) -> &StoreHeader { &self.header }
}
impl <T>CursorWithStoreHeader<T> for WriteCursor<'_, T> {
fn header(&self) -> &StoreHeader {
&self.header
}
fn header(&self) -> &StoreHeader { &self.header }
}
impl <T>CursorWithStoreHeader<T> for AppendOnlyCursor<T> {
fn header(&self) -> &StoreHeader { &self.header }
}
// ===CursorWithWriteStoreHeader===
impl <T>CursorWithWriteStoreHeader<T> for WriteCursor<'_, T> {
fn header_mut(&mut self) -> &mut StoreHeader { self.header }
fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position }
}
impl <T>CursorWithWriteStoreHeader<T> for AppendOnlyCursor<T> {
fn header_mut(&mut self) -> &mut StoreHeader { &mut self.header }
fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position }
}
// ===CursorWithAccessToIndex===
impl <T>CursorWithAccessToIndex<T> for ReadCursor<'_, T> {
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] {
&self.indexes
}
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] { &self.indexes }
}
impl <T>CursorWithAccessToIndex<T> for WriteCursor<'_, T> {
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] {
&self.indexes
}
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] { &self.indexes }
}
impl <'cursor, T> ReadCursor<'cursor, T> {
pub async fn new<'store: 'cursor>(store: &'store Store<T>) -> Result<Self>
where T: Send + Sync
@ -317,6 +404,8 @@ impl <'cursor, T> ReadCursor<'cursor, T> {
}
}
impl <'cursor, T> WriteCursor<'cursor, T>
// TODO: Consider adding this manually to wher eit is really needed
where T: Sync
@ -375,60 +464,16 @@ impl <'cursor, T> WriteCursor<'cursor, T>
Ok(cursor)
}
// ===Primitive Operations===
async fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize> {
Ok(self.file.write(bytes).await?)
}
// ===Store Header Manipulation===
async fn increment_total_count(&mut self) -> Result<()>
where T: Send
{
self.seek_to_start().await?;
self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?;
let new_count = self.header.increment_total_count();
self.write_bytes(&encode::<usize>(&new_count)?).await?;
Ok(())
}
async fn increment_deleted_count(&mut self) -> Result<()>
where T: Send
{
self.seek_to_start().await?;
self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?;
let new_count = self.header.increment_deleted_count();
self.write_bytes(&encode::<usize>(&new_count)?).await?;
Ok(())
}
// ===Entry Header Manipulation===
// assumes we are at the start of valid entry.
async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()> {
async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()>
where T: Send
{
let bytes: Vec<u8> = entry_header.encode()?;
self.write_bytes(&bytes).await?;
Ok(())
}
// ===Append Entry===
// Moves cursor to the end.
// Returns file position to the start of the new entry.
pub async fn append_entry(&mut self, entry: &Entry<T>) -> Result<FilePosition>
where T: Encode + Send
{
self.increment_total_count().await?;
let encoded_entry: Vec<u8> = entry.encode()?;
let file_position = self.seek_to_end().await?;
self.write_bytes(&encoded_entry).await?;
let eof_file_position: FilePosition = self.current_file_position().await?;
self.eof_file_position = eof_file_position;
Ok(file_position)
}
// ===Deletion===
pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()>
where T: Send
@ -461,18 +506,31 @@ impl <'cursor, T> WriteCursor<'cursor, T>
}
}
async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> {
// ===Garbage Collection===
async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()>
where T: Send
{
// TODO: What should be the policy? Counting size of garbage? Counting how many entries are
// garbage?
if self.header.deleted_count > 100 {
todo!()
} else {
Ok(())
self.initiate_garbage_collection().await?;
}
Ok(())
}
async fn initiate_garbage_collection(&mut self) -> Result<usize>
where T: Send
{
// We'll dump all alive entries into a new file.
let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?;
// In it there will be only the alive rows.
// Afterwards we swap the files, and delete the garbage.
todo!()
}
async fn spawn_cursor_to_intermediate_file(&self) -> Result<AppendOnlyCursor<T>>
where T: Send
{
let table_folder = self.header.table_folder.to_string();
let path_to_table = Path::new(&table_folder);
@ -480,36 +538,27 @@ impl <'cursor, T> WriteCursor<'cursor, T>
let intermediate_file: File = Store::<T>::create_empty_rows_file(path_to_rows, &self.header).await?;
let mut intermediate_header: StoreHeader = StoreHeader {
let intermediate_header: StoreHeader = StoreHeader {
table_folder,
number_of_columns: self.header.number_of_columns,
deleted_count: 0,
total_count: 0,
primary_column: self.header.primary_column,
indexed_columns: todo!()
indexed_columns: self.header.indexed_columns.clone(),
};
// Creates a new cursor to the intermediate file in which we'll dump the live entries.
// let mut cursor_to_intermediate = Self {
// header: &mut intermediate_header,
// file: intermediate_file,
// data_type: PhantomData::<T>,
// Creates a new (append) cursor to the intermediate file in which we'll dump the live entries.
let mut cursor_to_intermediate = AppendOnlyCursor {
header: intermediate_header,
file: intermediate_file,
data_type: PhantomData::<T>,
// eof_file_position: 0,
// };
let mut cursor_to_intermediate: Self = todo!();
eof_file_position: 0,
};
let eof_file_position: FilePosition = cursor_to_intermediate.seek_to_end().await?;
cursor_to_intermediate.eof_file_position = eof_file_position;
// TODO: intermediate_header does not live long enough, so after garbage collection is
// done, we need to use it in the swap.
cursor_to_intermediate.header = todo!();
// In it there will be only the alive rows.
// Afterwards we swap the files, and delete the garbage.
todo!()
Ok(cursor_to_intermediate)
}
// ===Indexing===

View file

@ -9,7 +9,7 @@ mod store_header;
use crate::entry::{Entry, EntryDetailed};
use crate::storage_engine::{Store, FilePosition};
use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader, PrimitiveCursor};
use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader, CursorWithWriteStoreHeader};
type Data = u32;