This commit is contained in:
Yuriy Dupyn 2024-02-05 03:29:42 +01:00
parent c0a3ee08b8
commit 1618bffb85
6 changed files with 22 additions and 152 deletions

View file

@ -49,8 +49,7 @@ pub struct AppendOnlyCursor<T> {
// ===Traits===
#[async_trait]
// TODO: Make this private
pub trait PrimitiveCursor<T> {
pub(crate) trait PrimitiveCursor<T> {
fn file(&mut self) -> &mut File;
fn eof_file_position(&self) -> FilePosition;
@ -100,7 +99,7 @@ pub trait PrimitiveCursor<T> {
}
#[async_trait]
pub trait PrimitiveWriteCursor<T>: PrimitiveCursor<T> {
pub(crate) trait PrimitiveWriteCursor<T>: PrimitiveCursor<T> {
async fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize> {
Ok(self.file().write(bytes).await?)
}
@ -536,8 +535,6 @@ impl <'cursor, T> ReadCursor<'cursor, T> {
impl <'cursor, T> WriteCursor<'cursor, T>
// TODO: Consider adding this manually to wher eit is really needed
where T: Sync
{
// 'store lives atleast as long as 'cursor
pub async fn new<'store: 'cursor>(store: &'store mut Store<T>) -> Result<Self>
@ -566,31 +563,6 @@ impl <'cursor, T> WriteCursor<'cursor, T>
Ok(cursor)
}
pub async fn connect<'header: 'cursor, 'indexes: 'cursor>(path_to_rows: &str, header: &'header mut StoreHeader, indexes: &'indexes mut StoreIndexes<T>) -> Result<Self>
where T: Send
{
let file: File =
OpenOptions::new()
.read(true)
.write(true)
.open(path_to_rows)
.await?;
let mut cursor = Self {
header,
file,
indexes,
eof_file_position: 0,
};
let eof_file_position: FilePosition = cursor.seek_to_end().await?;
cursor.eof_file_position = eof_file_position;
cursor.seek_to_start_of_data().await?;
Ok(cursor)
}
// ===Entry Header Manipulation===
// assumes we are at the start of valid entry.
async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()>
@ -707,7 +679,7 @@ impl <'cursor, T> WriteCursor<'cursor, T>
// ===Garbage Collection===
async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()>
where T: Send + Decode + Encode + Clone + Ord
where T: Send + Sync + Decode + Encode + Clone + Ord
{
if self.header.deleted_count > GARBAGE_COLLECTION_TRIGGER {
println!("=======START GARBAGE COLLETOR====");
@ -717,7 +689,7 @@ impl <'cursor, T> WriteCursor<'cursor, T>
}
pub async fn initiate_garbage_collection(&mut self) -> Result<usize>
where T: Send + Decode + Encode + Clone + Ord
where T: Send + Sync + Decode + Encode + Clone + Ord
{
let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?;
// Since garbage collection changes FilePositions of live entries, we need to update the