Fix delete bug

This commit is contained in:
Yuriy Dupyn 2024-02-03 21:34:50 +01:00
parent 3bf04ae2d6
commit a345bf99c6
4 changed files with 107 additions and 62 deletions

View file

@ -10,12 +10,13 @@ use bincode::{Decode, Encode};
use crate::binary_coding::encode;
use crate::entry::{Entry, EntryDetailed};
use crate::entry_header::EntryHeaderWithDataSize;
use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader};
use crate::store_header::StoreHeader;
use crate::storage_engine::{Store, FilePosition, Result, ROWS_FILE_NAME};
#[async_trait]
trait PrimitiveCursor<T> {
// TODO: Make this private
pub trait PrimitiveCursor<T> {
fn file(&mut self) -> &mut File;
fn eof_file_position(&self) -> FilePosition;
@ -30,22 +31,23 @@ trait PrimitiveCursor<T> {
Ok(result)
}
async fn seek_to(&mut self, file_position: FilePosition) -> Result<()> {
self.file().seek(SeekFrom::Start(file_position)).await?;
Ok(())
async fn seek_to(&mut self, file_position: FilePosition) -> Result<FilePosition> {
let file_position = self.file().seek(SeekFrom::Start(file_position)).await?;
Ok(file_position)
}
async fn seek_to_start(&mut self) -> Result<()> {
self.file().seek(SeekFrom::Start(0)).await?;
Ok(())
// Start of the file i.e. the Header, not the entries.
async fn seek_to_start(&mut self) -> Result<FilePosition> {
let file_position = self.file().seek(SeekFrom::Start(0)).await?;
Ok(file_position)
}
async fn seek_to_end(&mut self) -> Result<()> {
self.file().seek(SeekFrom::End(0)).await?;
Ok(())
async fn seek_to_end(&mut self) -> Result<FilePosition> {
let file_position = self.file().seek(SeekFrom::End(0)).await?;
Ok(file_position)
}
async fn seek_to_start_of_data(&mut self) -> Result<()> {
async fn seek_to_start_of_data(&mut self) -> Result<FilePosition> {
self.seek_to(StoreHeader::SIZE as u64).await
}
@ -55,7 +57,10 @@ trait PrimitiveCursor<T> {
}
async fn is_at_eof(&mut self) -> Result<bool> {
Ok(self.current_file_position().await? == self.eof_file_position())
let current_file_position = self.current_file_position().await?;
let eof_file_position = self.eof_file_position();
println!("IN EOF: current={}, eof_file_position={}", current_file_position, eof_file_position);
Ok(current_file_position == eof_file_position)
}
}
@ -90,9 +95,12 @@ pub trait CursorWithStoreHeader<T>: PrimitiveCursor<T> {
async fn next(&mut self) -> Result<Option<EntryDetailed<T>>>
where T: Decode
{
println!("are we at eof?");
if self.is_at_eof().await? {
println!("YES");
return Ok(None)
}
println!("NO");
let entry_header = self.read_entry_header().await?;
@ -118,6 +126,7 @@ pub trait CursorWithStoreHeader<T>: PrimitiveCursor<T> {
async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error> {
let mut bytes: Vec<u8> = vec![];
self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?;
self.file().read_to_end(&mut bytes).await?;
Ok(bytes)
}
@ -191,8 +200,11 @@ impl <T> ReadCursor<T> {
file,
data_type: store.data_type,
eof_file_position: 0, // This will be overwriten by the seek_to_start_of_data
eof_file_position: 0,
};
let eof_file_position: FilePosition = cursor.seek_to_end().await?;
cursor.eof_file_position = eof_file_position;
cursor.seek_to_start_of_data().await?;
Ok(cursor)
@ -221,8 +233,11 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
file,
data_type: store.data_type,
eof_file_position: 0, // This will be overwriten by the seek_to_start_of_data
eof_file_position: 0,
};
let eof_file_position: FilePosition = cursor.seek_to_end().await?;
cursor.eof_file_position = eof_file_position;
cursor.seek_to_start_of_data().await?;
Ok(cursor)
@ -255,12 +270,10 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
}
// ===Entry Header Manipulation===
// assumes we are at the start of the valid entry.
async fn set_entry_is_deleted_to(&mut self, is_deleted: bool) -> Result<()>
where T: Send
{
self.seek_to(EntryHeaderWithDataSize::IS_DELETED_OFFSET as u64).await?;
self.write_bytes(&encode::<bool>(&is_deleted)?).await?;
// assumes we are at the start of valid entry.
async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()> {
let bytes: Vec<u8> = entry_header.encode()?;
self.write_bytes(&bytes).await?;
Ok(())
}
@ -274,8 +287,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
self.increment_total_count().await?;
let encoded_entry: Vec<u8> = entry.encode()?;
self.seek_to_end().await?;
let file_position: FilePosition = self.current_file_position().await?;
let file_position = self.seek_to_end().await?;
self.write_bytes(&encoded_entry).await?;
let eof_file_position: FilePosition = self.current_file_position().await?;
@ -289,13 +301,19 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
where T: Send
{
self.seek_to(file_position).await?;
let entry_header = self.read_entry_header().await?;
println!("Attempting to read the entry");
let mut entry_header = self.read_entry_header().await?;
println!("Entry Header == {:?}", entry_header);
if entry_header.is_deleted {
println!("Already deleted");
Ok(())
} else {
println!("Marking as deleted");
self.increment_deleted_count().await?;
self.seek_to(file_position).await?;
self.set_entry_is_deleted_to(true).await?;
entry_header.is_deleted = true;
self.set_new_entry_header(entry_header.into()).await?;
self.attempt_garbage_collection_if_necessary().await?;
Ok(())