Attempt at delete
This commit is contained in:
parent
3e7e8665fd
commit
53aa5a0127
2 changed files with 93 additions and 41 deletions
|
|
@ -24,10 +24,10 @@ async fn create_store() -> Result<Store<Data>> {
|
||||||
println!("THE BYTES: {:?}", store.read_all_bytes().await?);
|
println!("THE BYTES: {:?}", store.read_all_bytes().await?);
|
||||||
|
|
||||||
let mut cursor = store.cursor(AccessMode::Write).await.map_err(|e| e.to_io_or_panic())?;
|
let mut cursor = store.cursor(AccessMode::Write).await.map_err(|e| e.to_io_or_panic())?;
|
||||||
let entry0: Entry<u32> = Entry::new_deleted(vec![1, 2, 3, 4, 5]);
|
let entry0: Entry<u32> = Entry::new(vec![1, 2, 3, 4, 5]);
|
||||||
append_entry(&mut cursor, &entry0).await?;
|
append_entry(&mut cursor, &entry0).await?;
|
||||||
|
|
||||||
let entry1: Entry<u32> = Entry::new_deleted(vec![200, 200, 5, 6, 7]);
|
let entry1: Entry<u32> = Entry::new(vec![200, 200, 5, 6, 7]);
|
||||||
append_entry(&mut cursor, &entry1).await?;
|
append_entry(&mut cursor, &entry1).await?;
|
||||||
|
|
||||||
println!("{:?}", store.read_all_bytes().await?);
|
println!("{:?}", store.read_all_bytes().await?);
|
||||||
|
|
@ -83,6 +83,13 @@ async fn main() -> Result<()> {
|
||||||
let entry0: Entry<u32> = Entry::new(vec![99, 98, 97, 96, 95]);
|
let entry0: Entry<u32> = Entry::new(vec![99, 98, 97, 96, 95]);
|
||||||
append_entry(&mut cursor, &entry0).await?;
|
append_entry(&mut cursor, &entry0).await?;
|
||||||
|
|
||||||
|
let entry1: Entry<u32> = Entry::new(vec![50,50,50,50,50]);
|
||||||
|
let file_position = append_entry(&mut cursor, &entry1).await?;
|
||||||
|
println!("CURRENT FILE_POSITION = {}", file_position);
|
||||||
|
// Now file_position point to entry1.
|
||||||
|
// cursor.mark_deleted_at(file_position).await.map_err(|e| e.to_io_or_panic())?;
|
||||||
|
// cursor.seek_to(file_position).await.map_err(|e| e.to_io_or_panic())?;
|
||||||
|
|
||||||
cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?;
|
cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,10 @@ pub struct Store<T> {
|
||||||
// primary_index: Vec<Index<T, FilePosition>>>,
|
// primary_index: Vec<Index<T, FilePosition>>>,
|
||||||
// indexes: Vec<Option<Index<T, HashSet<FilePosition>>>>,
|
// indexes: Vec<Option<Index<T, HashSet<FilePosition>>>>,
|
||||||
// primary_index: Index<PositionOfValue, PositionOfRow>,
|
// primary_index: Index<PositionOfValue, PositionOfRow>,
|
||||||
|
|
||||||
|
// TODO: It's not good to have StoreHeader copied to all the cursors, since they may modify it.
|
||||||
|
// How to sync?
|
||||||
|
// All
|
||||||
header: StoreHeader,
|
header: StoreHeader,
|
||||||
data_type: PhantomData<T>,
|
data_type: PhantomData<T>,
|
||||||
|
|
||||||
|
|
@ -46,6 +50,9 @@ pub struct Store<T> {
|
||||||
// list
|
// list
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Read Cursors don't modify the rows nor Store Header.
|
||||||
|
// Write Cursors can modify both rows and Store Header.
|
||||||
|
// Probably should split these into two types. But they will have a lot of functionality in common.
|
||||||
pub struct Cursor<T> {
|
pub struct Cursor<T> {
|
||||||
header: StoreHeader,
|
header: StoreHeader,
|
||||||
file: File,
|
file: File,
|
||||||
|
|
@ -54,6 +61,14 @@ pub struct Cursor<T> {
|
||||||
eof_file_position: FilePosition,
|
eof_file_position: FilePosition,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct WriteCursor<'a, T> {
|
||||||
|
header: &'a mut StoreHeader,
|
||||||
|
file: File,
|
||||||
|
data_type: PhantomData<T>,
|
||||||
|
|
||||||
|
eof_file_position: FilePosition,
|
||||||
|
}
|
||||||
|
|
||||||
pub enum AccessMode {
|
pub enum AccessMode {
|
||||||
Read,
|
Read,
|
||||||
Write
|
Write
|
||||||
|
|
@ -87,10 +102,6 @@ impl StoreHeader {
|
||||||
pub struct EntryHeader {
|
pub struct EntryHeader {
|
||||||
is_deleted: bool,
|
is_deleted: bool,
|
||||||
}
|
}
|
||||||
impl EntryHeader {
|
|
||||||
const IS_DELETED_SIZE: usize = size_of::<bool>();
|
|
||||||
const HEADER_SIZE: usize = Self::IS_DELETED_SIZE;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct EntryHeaderWithDataSize {
|
pub struct EntryHeaderWithDataSize {
|
||||||
|
|
@ -363,13 +374,15 @@ impl <T>Cursor<T> {
|
||||||
.await?,
|
.await?,
|
||||||
};
|
};
|
||||||
|
|
||||||
let cursor = Self {
|
let mut cursor = Self {
|
||||||
header: store.header.clone(),
|
header: store.header.clone(),
|
||||||
file,
|
file,
|
||||||
data_type: store.data_type,
|
data_type: store.data_type,
|
||||||
|
|
||||||
eof_file_position: 0,
|
eof_file_position: 0, // This will be overwriten by the seek_to_start_of_data
|
||||||
};
|
};
|
||||||
|
cursor.seek_to_start_of_data().await?;
|
||||||
|
|
||||||
Ok(cursor)
|
Ok(cursor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -392,7 +405,8 @@ impl <T>Cursor<T> {
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn seek_to(&mut self, file_position: FilePosition) -> Result<()> {
|
// TODO: make private
|
||||||
|
pub async fn seek_to(&mut self, file_position: FilePosition) -> Result<()> {
|
||||||
self.file.seek(SeekFrom::Start(file_position)).await?;
|
self.file.seek(SeekFrom::Start(file_position)).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
@ -411,27 +425,41 @@ impl <T>Cursor<T> {
|
||||||
self.seek_to(StoreHeader::SIZE as u64).await
|
self.seek_to(StoreHeader::SIZE as u64).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn current_file_position(&mut self) -> Result<FilePosition> {
|
// TODO: Make private
|
||||||
|
pub async fn current_file_position(&mut self) -> Result<FilePosition> {
|
||||||
let next_file_position: FilePosition = self.file.stream_position().await?;
|
let next_file_position: FilePosition = self.file.stream_position().await?;
|
||||||
Ok(next_file_position)
|
Ok(next_file_position)
|
||||||
}
|
}
|
||||||
|
|
||||||
// For debugging.
|
async fn is_at_eof(&mut self) -> Result<bool> {
|
||||||
// Moves file cursor to the end.
|
Ok(self.current_file_position().await? == self.eof_file_position)
|
||||||
pub async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error>{
|
|
||||||
let mut bytes: Vec<u8> = vec![];
|
|
||||||
self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?;
|
|
||||||
self.file.read_to_end(&mut bytes).await?;
|
|
||||||
Ok(bytes)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn less_than_eq(&mut self, file_position0: FilePosition, file_position1: FilePosition) -> Result<bool> {
|
pub async fn less_than_eq(&mut self, file_position0: FilePosition, file_position1: FilePosition) -> Result<bool> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===Creation===
|
// ===Iteration===
|
||||||
|
// Assumes that the current file position is at a valid entry or EOF.
|
||||||
|
pub async fn next(&mut self) -> Result<Option<EntryDetailed<T>>>
|
||||||
|
where T: Decode
|
||||||
|
{
|
||||||
|
if self.is_at_eof().await? {
|
||||||
|
return Ok(None)
|
||||||
|
}
|
||||||
|
|
||||||
// ===Append Entry===
|
let header = self.read_entry_header().await?;
|
||||||
|
|
||||||
|
let mut data_bytes: Vec<u8> = vec![0; header.size_of_data()];
|
||||||
|
self.read_bytes(&mut data_bytes).await?;
|
||||||
|
let entry: EntryDetailed<T> =
|
||||||
|
EntryDetailed::decode(header, self.header.number_of_columns, &mut data_bytes)?;
|
||||||
|
|
||||||
|
Ok(Some(entry))
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// ===Store Header Manipulation===
|
||||||
async fn increment_total_count(&mut self) -> Result<()> {
|
async fn increment_total_count(&mut self) -> Result<()> {
|
||||||
self.seek_to_start().await?;
|
self.seek_to_start().await?;
|
||||||
self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?;
|
self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?;
|
||||||
|
|
@ -448,7 +476,18 @@ impl <T>Cursor<T> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ===Entry Header Manipulation===
|
||||||
|
// assumes we are at the start of the valid entry.
|
||||||
|
async fn set_entry_is_deleted_to(&mut self, is_deleted: bool) -> Result<()> {
|
||||||
|
self.seek_to(EntryHeaderWithDataSize::IS_DELETED_OFFSET as u64).await?;
|
||||||
|
self.write_bytes(&encode::<bool>(&is_deleted)?).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===Append Entry===
|
||||||
|
|
||||||
// Moves cursor to the end.
|
// Moves cursor to the end.
|
||||||
|
// Returns file position to the start of the new entry.
|
||||||
pub async fn append_entry(&mut self, entry: &Entry<T>) -> Result<FilePosition>
|
pub async fn append_entry(&mut self, entry: &Entry<T>) -> Result<FilePosition>
|
||||||
where T: Encode
|
where T: Encode
|
||||||
{
|
{
|
||||||
|
|
@ -467,21 +506,34 @@ impl <T>Cursor<T> {
|
||||||
|
|
||||||
// ===Deletion===
|
// ===Deletion===
|
||||||
pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()> {
|
pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()> {
|
||||||
self.increment_deleted_count().await?;
|
|
||||||
|
|
||||||
self.seek_to(file_position).await?;
|
self.seek_to(file_position).await?;
|
||||||
|
let entry_header = self.read_entry_header().await?;
|
||||||
|
if entry_header.is_deleted {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
self.increment_deleted_count().await?;
|
||||||
|
self.seek_to(file_position).await?;
|
||||||
|
self.set_entry_is_deleted_to(true).await?;
|
||||||
|
|
||||||
// TODO: Now you need to mutate the entry itself
|
self.attempt_garbage_collection_if_necessary().await?;
|
||||||
todo!()
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> {
|
||||||
|
// TODO: What should be the policy? Counting size of garbage? Counting how many entries are
|
||||||
|
// garbage?
|
||||||
|
if self.header.deleted_count > 100 {
|
||||||
|
todo!()
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ===Lookup===
|
// ===Lookup===
|
||||||
// WARNING: The cursor has to be at the start of an entry. Otherwise garbage data will be
|
// WARNING: The cursor has to be at the start of an entry. Otherwise garbage data will be
|
||||||
// decoded as an entry.
|
// decoded as an entry.
|
||||||
pub async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result<EntryHeaderWithDataSize> {
|
async fn read_entry_header(&mut self) -> Result<EntryHeaderWithDataSize> {
|
||||||
self.seek_to(file_position).await?;
|
|
||||||
|
|
||||||
let number_of_columns: usize = self.header.number_of_columns;
|
let number_of_columns: usize = self.header.number_of_columns;
|
||||||
let mut header_bytes: Vec<u8> = vec![0; EntryHeaderWithDataSize::size(number_of_columns)];
|
let mut header_bytes: Vec<u8> = vec![0; EntryHeaderWithDataSize::size(number_of_columns)];
|
||||||
self.read_bytes(&mut header_bytes).await?;
|
self.read_bytes(&mut header_bytes).await?;
|
||||||
|
|
@ -493,6 +545,11 @@ impl <T>Cursor<T> {
|
||||||
Ok(header)
|
Ok(header)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result<EntryHeaderWithDataSize> {
|
||||||
|
self.seek_to(file_position).await?;
|
||||||
|
self.read_entry_header().await
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn search_for(&mut self, index: T) -> Result<()>
|
pub async fn search_for(&mut self, index: T) -> Result<()>
|
||||||
where T: Send
|
where T: Send
|
||||||
{
|
{
|
||||||
|
|
@ -501,25 +558,13 @@ impl <T>Cursor<T> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Returns None when file_positoin == eof_file_position
|
// Returns None when file_positoin == eof_file_position
|
||||||
pub async fn read_entry_at(&mut self, file_position: FilePosition) -> Result<Option<EntryDetailed<T>>>
|
pub async fn read_entry_at(&mut self, file_position: FilePosition) -> Result<Option<EntryDetailed<T>>>
|
||||||
where T: Decode
|
where T: Decode
|
||||||
{
|
{
|
||||||
if file_position == self.eof_file_position {
|
self.seek_to(file_position).await?;
|
||||||
return Ok(None)
|
self.next().await
|
||||||
}
|
|
||||||
|
|
||||||
let header = self.read_entry_header_at(file_position).await?;
|
|
||||||
|
|
||||||
let mut data_bytes: Vec<u8> = vec![0; header.size_of_data()];
|
|
||||||
// TODO: Get rid of the println's
|
|
||||||
// println!("PREPARED_DATA_BYTES: {:?}", data_bytes);
|
|
||||||
self.read_bytes(&mut data_bytes).await?;
|
|
||||||
// println!("DATA_BYTES: {:?}", data_bytes);
|
|
||||||
let entry: EntryDetailed<T> =
|
|
||||||
EntryDetailed::decode(header, self.header.number_of_columns, &mut data_bytes)?;
|
|
||||||
|
|
||||||
Ok(Some(entry))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: This needs to be some sort of an iterator
|
// TODO: This needs to be some sort of an iterator
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue