From a345bf99c6b72db71f67a039675e28cc582d6238 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Sat, 3 Feb 2024 21:34:50 +0100 Subject: [PATCH 1/6] Fix delete bug --- storage_engine/src/cursor.rs | 68 ++++++++++++++++---------- storage_engine/src/entry.rs | 9 +--- storage_engine/src/entry_header.rs | 16 ++++++- storage_engine/src/main.rs | 76 +++++++++++++++++++----------- 4 files changed, 107 insertions(+), 62 deletions(-) diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 716abf7..763db68 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -10,12 +10,13 @@ use bincode::{Decode, Encode}; use crate::binary_coding::encode; use crate::entry::{Entry, EntryDetailed}; -use crate::entry_header::EntryHeaderWithDataSize; +use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader}; use crate::store_header::StoreHeader; use crate::storage_engine::{Store, FilePosition, Result, ROWS_FILE_NAME}; #[async_trait] -trait PrimitiveCursor { +// TODO: Make this private +pub trait PrimitiveCursor { fn file(&mut self) -> &mut File; fn eof_file_position(&self) -> FilePosition; @@ -30,22 +31,23 @@ trait PrimitiveCursor { Ok(result) } - async fn seek_to(&mut self, file_position: FilePosition) -> Result<()> { - self.file().seek(SeekFrom::Start(file_position)).await?; - Ok(()) + async fn seek_to(&mut self, file_position: FilePosition) -> Result { + let file_position = self.file().seek(SeekFrom::Start(file_position)).await?; + Ok(file_position) } - async fn seek_to_start(&mut self) -> Result<()> { - self.file().seek(SeekFrom::Start(0)).await?; - Ok(()) + // Start of the file i.e. the Header, not the entries. + async fn seek_to_start(&mut self) -> Result { + let file_position = self.file().seek(SeekFrom::Start(0)).await?; + Ok(file_position) } - async fn seek_to_end(&mut self) -> Result<()> { - self.file().seek(SeekFrom::End(0)).await?; - Ok(()) + async fn seek_to_end(&mut self) -> Result { + let file_position = self.file().seek(SeekFrom::End(0)).await?; + Ok(file_position) } - async fn seek_to_start_of_data(&mut self) -> Result<()> { + async fn seek_to_start_of_data(&mut self) -> Result { self.seek_to(StoreHeader::SIZE as u64).await } @@ -55,7 +57,10 @@ trait PrimitiveCursor { } async fn is_at_eof(&mut self) -> Result { - Ok(self.current_file_position().await? == self.eof_file_position()) + let current_file_position = self.current_file_position().await?; + let eof_file_position = self.eof_file_position(); + println!("IN EOF: current={}, eof_file_position={}", current_file_position, eof_file_position); + Ok(current_file_position == eof_file_position) } } @@ -90,9 +95,12 @@ pub trait CursorWithStoreHeader: PrimitiveCursor { async fn next(&mut self) -> Result>> where T: Decode { + println!("are we at eof?"); if self.is_at_eof().await? { + println!("YES"); return Ok(None) } + println!("NO"); let entry_header = self.read_entry_header().await?; @@ -118,6 +126,7 @@ pub trait CursorWithStoreHeader: PrimitiveCursor { async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error> { let mut bytes: Vec = vec![]; + self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?; self.file().read_to_end(&mut bytes).await?; Ok(bytes) } @@ -191,8 +200,11 @@ impl ReadCursor { file, data_type: store.data_type, - eof_file_position: 0, // This will be overwriten by the seek_to_start_of_data + eof_file_position: 0, }; + let eof_file_position: FilePosition = cursor.seek_to_end().await?; + cursor.eof_file_position = eof_file_position; + cursor.seek_to_start_of_data().await?; Ok(cursor) @@ -221,8 +233,11 @@ impl <'cursor, T> WriteCursor<'cursor, T> { file, data_type: store.data_type, - eof_file_position: 0, // This will be overwriten by the seek_to_start_of_data + eof_file_position: 0, }; + let eof_file_position: FilePosition = cursor.seek_to_end().await?; + cursor.eof_file_position = eof_file_position; + cursor.seek_to_start_of_data().await?; Ok(cursor) @@ -255,12 +270,10 @@ impl <'cursor, T> WriteCursor<'cursor, T> { } // ===Entry Header Manipulation=== - // assumes we are at the start of the valid entry. - async fn set_entry_is_deleted_to(&mut self, is_deleted: bool) -> Result<()> - where T: Send - { - self.seek_to(EntryHeaderWithDataSize::IS_DELETED_OFFSET as u64).await?; - self.write_bytes(&encode::(&is_deleted)?).await?; + // assumes we are at the start of valid entry. + async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()> { + let bytes: Vec = entry_header.encode()?; + self.write_bytes(&bytes).await?; Ok(()) } @@ -274,8 +287,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> { self.increment_total_count().await?; let encoded_entry: Vec = entry.encode()?; - self.seek_to_end().await?; - let file_position: FilePosition = self.current_file_position().await?; + let file_position = self.seek_to_end().await?; self.write_bytes(&encoded_entry).await?; let eof_file_position: FilePosition = self.current_file_position().await?; @@ -289,13 +301,19 @@ impl <'cursor, T> WriteCursor<'cursor, T> { where T: Send { self.seek_to(file_position).await?; - let entry_header = self.read_entry_header().await?; + println!("Attempting to read the entry"); + let mut entry_header = self.read_entry_header().await?; + println!("Entry Header == {:?}", entry_header); if entry_header.is_deleted { + println!("Already deleted"); Ok(()) } else { + println!("Marking as deleted"); self.increment_deleted_count().await?; self.seek_to(file_position).await?; - self.set_entry_is_deleted_to(true).await?; + + entry_header.is_deleted = true; + self.set_new_entry_header(entry_header.into()).await?; self.attempt_garbage_collection_if_necessary().await?; Ok(()) diff --git a/storage_engine/src/entry.rs b/storage_engine/src/entry.rs index 84d15f9..9d9de38 100644 --- a/storage_engine/src/entry.rs +++ b/storage_engine/src/entry.rs @@ -1,6 +1,6 @@ use bincode::{Decode, Encode}; -use crate::binary_coding::{encode, encode_sequence, encode_sequence_with_sizes, decode_sequence}; +use crate::binary_coding::{encode_sequence, encode_sequence_with_sizes, decode_sequence}; use crate::storage_engine::Result; use crate::error::{Error, DecodeErrorKind}; use crate::entry_header::{EntryHeader, EntryHeaderWithDataSize}; @@ -17,13 +17,6 @@ pub struct EntryDetailed { data: Vec, } -impl EntryHeader { - fn encode(self: &EntryHeader) -> Result> { - let result: Vec = encode(&self.is_deleted)?; - Ok(result) - } -} - impl Entry { pub fn new(data: Vec) -> Self { Self { header: EntryHeader { is_deleted: false }, data } diff --git a/storage_engine/src/entry_header.rs b/storage_engine/src/entry_header.rs index b1a9f7d..4aa904c 100644 --- a/storage_engine/src/entry_header.rs +++ b/storage_engine/src/entry_header.rs @@ -1,4 +1,4 @@ -use crate::binary_coding::{decode, decode_sequence}; +use crate::binary_coding::{decode, encode, decode_sequence}; use crate::storage_engine::Result; use crate::error::{Error, DecodeErrorKind}; use std::mem::size_of; @@ -14,6 +14,20 @@ pub struct EntryHeaderWithDataSize { pub data_sizes: Vec, // vec![5, 6, 20] means that column 0 stores 5 bytes, column 1 stores 6 // bytes etc } + +impl EntryHeader { + pub fn encode(self: &EntryHeader) -> Result> { + let result: Vec = encode(&self.is_deleted)?; + Ok(result) + } +} + +impl From for EntryHeader { + fn from(entry: EntryHeaderWithDataSize) -> Self { + Self { is_deleted: entry.is_deleted, } + } +} + impl EntryHeaderWithDataSize { pub const IS_DELETED_OFFSET: usize = 0; pub const IS_DELETED_SIZE: usize = size_of::(); diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index e09e481..d530bd1 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -9,7 +9,7 @@ mod store_header; use crate::entry::{Entry, EntryDetailed}; use crate::storage_engine::{Store, FilePosition}; -use crate::cursor::{ReadCursor, WriteCursor}; +use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader, PrimitiveCursor}; type Data = u32; @@ -20,8 +20,8 @@ type Result = std::result::Result; async fn create_store() -> Result> { let mut store: Store = Store::new(TABLE_PATH, 5, 0).await.map_err(|e| e.to_io_or_panic())?; println!("CREATED"); - println!("THE STORE: {:?}", store); - println!("THE BYTES: {:?}", store.read_all_bytes().await?); + // println!("THE STORE: {:?}", store); + // println!("THE BYTES: {:?}", store.read_all_bytes().await?); Ok(store) } @@ -29,8 +29,8 @@ async fn create_store() -> Result> { async fn connect_store() -> Result> { let mut store: Store = Store::connect(TABLE_PATH).await.map_err(|e| e.to_io_or_panic())?; println!("CONNECTED"); - println!("THE STORE: {:?}", store); - println!("THE BYTES: {:?}", store.read_all_bytes().await?); + // println!("THE STORE: {:?}", store); + // println!("THE BYTES: {:?}", store.read_all_bytes().await?); Ok(store) } @@ -60,6 +60,22 @@ async fn read_entry(cursor: &mut ReadCursor, file_position: FilePosition) todo!() } +async fn append_bunch_of_entries(store: &mut Store) -> Result<()> { + let mut cursor = store.write_cursor().await.map_err(|e| e.to_io_or_panic())?; + let entry0: Entry = Entry::new(vec![1, 2, 3, 4, 5]); + append_entry(&mut cursor, &entry0).await?; + + let entry1: Entry = Entry::new(vec![200, 200, 5, 6, 7]); + append_entry(&mut cursor, &entry1).await?; + + // println!("{:?}", store.read_all_bytes().await?); + let entry2: Entry = Entry::new(vec![99, 98, 97, 96, 95]); + append_entry(&mut cursor, &entry2).await?; + + let entry3: Entry = Entry::new(vec![50,50,50,50,50]); + append_entry(&mut cursor, &entry3).await?; + Ok(()) +} #[tokio::main] async fn main() -> Result<()> { @@ -67,42 +83,46 @@ async fn main() -> Result<()> { let mut store: Store = create_or_connect().await?; + if store.header.total_count == 0 { + println!("INSERTING!"); + append_bunch_of_entries(&mut store).await?; + } + { let mut cursor = store.write_cursor().await.map_err(|e| e.to_io_or_panic())?; - let entry0: Entry = Entry::new(vec![1, 2, 3, 4, 5]); - append_entry(&mut cursor, &entry0).await?; - let entry1: Entry = Entry::new(vec![200, 200, 5, 6, 7]); - append_entry(&mut cursor, &entry1).await?; + let entry: Entry = Entry::new(vec![60, 50, 40, 30, 20]); + // let file_position = append_entry(&mut cursor, &entry).await?; + // let file_position = 215; + // cursor.seek_to(file_position).await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", store.read_all_bytes().await?); - let entry2: Entry = Entry::new(vec![99, 98, 97, 96, 95]); - append_entry(&mut cursor, &entry2).await?; + // let entry_header = cursor.read_entry_header().await.map_err(|e| e.to_io_or_panic())?; + // println!("entry header = {:?}", entry_header); - let entry3: Entry = Entry::new(vec![50,50,50,50,50]); - append_entry(&mut cursor, &entry3).await?; + // println!("FILE POSITION == {}", file_position); + // cursor.mark_deleted_at(file_position).await.map_err(|e| e.to_io_or_panic())?; + // let entry_header = cursor.read_entry_header().await.map_err(|e| e.to_io_or_panic())?; + // println!("entry header after delete = {:?}", entry_header); } // println!("{:?}", store); // println!("{:?}", store.read_all_bytes().await?); - // let entry0: Entry = Entry::new(vec![99, 98, 97, 96, 95]); - // append_entry(&mut cursor, &entry0).await?; + let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; - // let entry1: Entry = Entry::new(vec![50,50,50,50,50]); - // let file_position = append_entry(&mut cursor, &entry1).await?; - // println!("CURRENT FILE_POSITION = {}", file_position); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); - // Now file_position point to entry1. - // cursor.mark_deleted_at(file_position).await.map_err(|e| e.to_io_or_panic())?; - // cursor.seek_to(file_position).await.map_err(|e| e.to_io_or_panic())?; + cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?; - // let entry2: StoreEntry = StoreEntry::new_deleted(vec![3, 2, 1]); - // let cursor2 = store.append_entry(&entry2).await.map_err(|e| e.to_io_or_panic())?; - // println!("cursor2 = {}", cursor2); - - println!("{:?}", store); - println!("{:?}", store.read_all_bytes().await?); println!("DONE"); From ff378b1dd83bd06a4168c97685e7d95c3f4b3033 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Sat, 3 Feb 2024 22:47:29 +0100 Subject: [PATCH 2/6] Add brute-force search --- storage_engine/src/cursor.rs | 100 ++++++++++++++++++++++++----- storage_engine/src/entry.rs | 4 +- storage_engine/src/entry_header.rs | 14 +++- storage_engine/src/main.rs | 48 ++++++++++---- 4 files changed, 135 insertions(+), 31 deletions(-) diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 763db68..42584b7 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -2,17 +2,18 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; use tokio::fs::{File, OpenOptions}; use std::path::Path; use std::marker::PhantomData; +use crate::error::{Error, DecodeErrorKind}; use async_trait::async_trait; use bincode; use bincode::{Decode, Encode}; -use crate::binary_coding::encode; +use crate::binary_coding::{encode, decode}; use crate::entry::{Entry, EntryDetailed}; use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader}; use crate::store_header::StoreHeader; -use crate::storage_engine::{Store, FilePosition, Result, ROWS_FILE_NAME}; +use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME}; #[async_trait] // TODO: Make this private @@ -51,6 +52,12 @@ pub trait PrimitiveCursor { self.seek_to(StoreHeader::SIZE as u64).await } + // Seeks from current position by offset and returns new file position + async fn seek_by(&mut self, offset: i64) -> Result { + let file_position = self.file().seek(SeekFrom::Current(offset)).await?; + Ok(file_position) + } + async fn current_file_position(&mut self) -> Result { let next_file_position: FilePosition = self.file().stream_position().await?; Ok(next_file_position) @@ -59,7 +66,6 @@ pub trait PrimitiveCursor { async fn is_at_eof(&mut self) -> Result { let current_file_position = self.current_file_position().await?; let eof_file_position = self.eof_file_position(); - println!("IN EOF: current={}, eof_file_position={}", current_file_position, eof_file_position); Ok(current_file_position == eof_file_position) } } @@ -91,18 +97,31 @@ pub trait CursorWithStoreHeader: PrimitiveCursor { } // ===Iteration=== - // Assumes that the current file position is at a valid entry or EOF. + // The following functions assume that the current file position is at a valid entry or EOF. + + + // WARNING: This moves the file_position to start of the data, so you can't just call + // next_entry_header() a bunch of times. You must move the cursor! + async fn next_entry_header(&mut self) -> Result> { + if self.is_at_eof().await? { + return Ok(None) + } + + let entry_header = self.read_entry_header().await?; + + Ok(Some(entry_header)) + } + + // This is meant to be used after next_entry_header() is called. + async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result{ + let file_position = self.seek_by(entry_header.size_of_data() as i64).await?; + Ok(file_position) + } + async fn next(&mut self) -> Result>> where T: Decode { - println!("are we at eof?"); - if self.is_at_eof().await? { - println!("YES"); - return Ok(None) - } - println!("NO"); - - let entry_header = self.read_entry_header().await?; + let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; let mut data_bytes: Vec = vec![0; entry_header.size_of_data()]; self.read_bytes(&mut data_bytes).await?; @@ -112,6 +131,59 @@ pub trait CursorWithStoreHeader: PrimitiveCursor { Ok(Some(entry)) } + // Like next, but only reads the column, not the whole entry. + async fn next_at_column(&mut self, column: Column) -> Result> + where T: Decode + Send + { + let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; + let file_position_at_start_of_data = self.current_file_position().await?; + + // figuring out how much to decode + let column_offset = entry_header.offset_of_column(column); + self.seek_by(column_offset as i64).await?; + + // reading and decoding + let mut bytes: Vec = vec![0; entry_header.data_sizes[column as usize]]; + self.read_bytes(&mut bytes).await?; + let (value, _) = + decode::(&bytes[..]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; + + // jumping to next entry + self.seek_to(file_position_at_start_of_data).await?; + self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?; + + Ok(Some((entry_header, value))) + } + + async fn next_alive(&mut self) -> Result>> + where T: Decode + { + while let Some(entry) = self.next().await? { + if !entry.header.is_deleted { + return Ok(Some(entry)) + } + } + Ok(None) + } + + // ===Search=== + async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result>> + where T: Decode + PartialEq + Send + Sync + { + let mut file_position = self.current_file_position().await?; + while let Some((_, t)) = self.next_at_column(column).await? { + if &t == t0 { + // go back and decode the whole entry + self.seek_to(file_position).await?; + return self.next().await + } else { + file_position = self.current_file_position().await?; + } + } + Ok(None) + } + // ===Debugging=== async fn read_entries(&mut self) -> Result<()> where T: Decode + std::fmt::Debug @@ -301,14 +373,10 @@ impl <'cursor, T> WriteCursor<'cursor, T> { where T: Send { self.seek_to(file_position).await?; - println!("Attempting to read the entry"); let mut entry_header = self.read_entry_header().await?; - println!("Entry Header == {:?}", entry_header); if entry_header.is_deleted { - println!("Already deleted"); Ok(()) } else { - println!("Marking as deleted"); self.increment_deleted_count().await?; self.seek_to(file_position).await?; diff --git a/storage_engine/src/entry.rs b/storage_engine/src/entry.rs index 9d9de38..90d3dca 100644 --- a/storage_engine/src/entry.rs +++ b/storage_engine/src/entry.rs @@ -13,8 +13,8 @@ pub struct Entry { #[derive(Debug)] pub struct EntryDetailed { - header: EntryHeaderWithDataSize, - data: Vec, + pub header: EntryHeaderWithDataSize, + pub data: Vec, } impl Entry { diff --git a/storage_engine/src/entry_header.rs b/storage_engine/src/entry_header.rs index 4aa904c..7c8d626 100644 --- a/storage_engine/src/entry_header.rs +++ b/storage_engine/src/entry_header.rs @@ -1,5 +1,5 @@ use crate::binary_coding::{decode, encode, decode_sequence}; -use crate::storage_engine::Result; +use crate::storage_engine::{Result, Column}; use crate::error::{Error, DecodeErrorKind}; use std::mem::size_of; @@ -42,6 +42,18 @@ impl EntryHeaderWithDataSize { self.data_sizes.iter().sum() } + pub fn offset_of_column(&self, column: Column) -> usize { + let mut sum = 0; + for (i, size) in self.data_sizes.iter().enumerate() { + if i < column as usize { + sum += size; + } else { + break + } + } + sum + } + pub fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result { let (is_deleted, _) = decode::(&bytes) diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index d530bd1..cf32a88 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -108,20 +108,44 @@ async fn main() -> Result<()> { // println!("{:?}", store); // println!("{:?}", store.read_all_bytes().await?); - let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + { + let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?; + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + } - // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", x); - // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", x); - // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", x); - // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", x); - // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", x); + { + let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + let column = 2; + let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; + println!("{:?}", x); + let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; + println!("{:?}", x); + let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; + println!("{:?}", x); + let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; + println!("{:?}", x); + let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; + println!("{:?}", x); + } + + { + let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + let column = 3; + let t0 = 6; + let x = cursor.find_first_eq_bruteforce(column, &t0).await.map_err(|e| e.to_io_or_panic())?; + println!("{:?}", x); + } - cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?; From 0f98903759bedd148c1d823e2beb4aef9e16f7cf Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Sat, 3 Feb 2024 22:54:55 +0100 Subject: [PATCH 3/6] Add file_position to EntryDetailed --- storage_engine/src/cursor.rs | 3 ++- storage_engine/src/entry.rs | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 42584b7..b0ea017 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -121,12 +121,13 @@ pub trait CursorWithStoreHeader: PrimitiveCursor { async fn next(&mut self) -> Result>> where T: Decode { + let file_position = self.current_file_position().await?; let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; let mut data_bytes: Vec = vec![0; entry_header.size_of_data()]; self.read_bytes(&mut data_bytes).await?; let entry: EntryDetailed = - EntryDetailed::decode(entry_header, self.header().number_of_columns, &mut data_bytes)?; + EntryDetailed::decode(entry_header, file_position, self.header().number_of_columns, &mut data_bytes)?; Ok(Some(entry)) } diff --git a/storage_engine/src/entry.rs b/storage_engine/src/entry.rs index 90d3dca..c628979 100644 --- a/storage_engine/src/entry.rs +++ b/storage_engine/src/entry.rs @@ -1,7 +1,7 @@ use bincode::{Decode, Encode}; use crate::binary_coding::{encode_sequence, encode_sequence_with_sizes, decode_sequence}; -use crate::storage_engine::Result; +use crate::storage_engine::{Result, FilePosition}; use crate::error::{Error, DecodeErrorKind}; use crate::entry_header::{EntryHeader, EntryHeaderWithDataSize}; @@ -14,6 +14,7 @@ pub struct Entry { #[derive(Debug)] pub struct EntryDetailed { pub header: EntryHeaderWithDataSize, + pub file_position: FilePosition, pub data: Vec, } @@ -40,11 +41,11 @@ impl Entry { } impl EntryDetailed { - pub fn decode(header: EntryHeaderWithDataSize, number_of_columns: usize, bytes: &[u8]) -> Result + pub fn decode(header: EntryHeaderWithDataSize, file_position: FilePosition, number_of_columns: usize, bytes: &[u8]) -> Result where T: Decode { let data = decode_sequence::(number_of_columns, bytes) .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?; - Ok(EntryDetailed { header, data }) + Ok(EntryDetailed { header, file_position, data }) } } From daa39850f0c54b553c82bcff4439ff25ed7d807c Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Sat, 3 Feb 2024 23:45:55 +0100 Subject: [PATCH 4/6] Prepare for garbage collection --- storage_engine/src/cursor.rs | 86 ++++++++++++++++++++++++++-- storage_engine/src/storage_engine.rs | 52 +++++++++-------- storage_engine/src/store_header.rs | 5 +- 3 files changed, 115 insertions(+), 28 deletions(-) diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index b0ea017..8afe89f 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -13,7 +13,7 @@ use crate::binary_coding::{encode, decode}; use crate::entry::{Entry, EntryDetailed}; use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader}; use crate::store_header::StoreHeader; -use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME}; +use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; #[async_trait] // TODO: Make this private @@ -261,7 +261,7 @@ impl ReadCursor { pub async fn new(store: &Store) -> Result where T: Send { - let path_to_rows = Path::new(&store.table_folder).join(ROWS_FILE_NAME); + let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); let file: File = OpenOptions::new() .read(true) @@ -293,7 +293,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> { pub async fn new<'store: 'cursor>(store: &'store mut Store) -> Result where T: Send { - let path_to_rows = Path::new(&store.table_folder).join(ROWS_FILE_NAME); + let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); let file: File = OpenOptions::new() .read(true) @@ -315,7 +315,33 @@ impl <'cursor, T> WriteCursor<'cursor, T> { Ok(cursor) } - + + pub async fn connect<'header: 'cursor>(path_to_rows: &str, header: &'header mut StoreHeader) -> Result + where T: Send + { + let file: File = + OpenOptions::new() + .read(true) + .write(true) + .open(path_to_rows) + .await?; + + let mut cursor = Self { + header, + file, + data_type: PhantomData::, + + eof_file_position: 0, + }; + let eof_file_position: FilePosition = cursor.seek_to_end().await?; + cursor.eof_file_position = eof_file_position; + + cursor.seek_to_start_of_data().await?; + + Ok(cursor) + } + + // ===Primitive Operations=== async fn write_bytes(&mut self, bytes: &[u8]) -> Result { Ok(self.file.write(bytes).await?) @@ -389,6 +415,18 @@ impl <'cursor, T> WriteCursor<'cursor, T> { } } + async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result>> + where T: Decode + PartialEq + Send + Sync + { + let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?; + if let Some(entry) = maybe_entry { + self.mark_deleted_at(entry.file_position).await?; + Ok(Some(entry)) + } else { + Ok(maybe_entry) + } + } + async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> { // TODO: What should be the policy? Counting size of garbage? Counting how many entries are // garbage? @@ -398,6 +436,46 @@ impl <'cursor, T> WriteCursor<'cursor, T> { Ok(()) } } + + async fn initiate_garbage_collection(&mut self) -> Result + where T: Send + { + let table_folder = self.header.table_folder.to_string(); + let path_to_table = Path::new(&table_folder); + let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); + + let intermediate_file: File = Store::::create_empty_rows_file(path_to_rows, &self.header).await?; + + let mut intermediate_header: StoreHeader = StoreHeader { + table_folder, + number_of_columns: self.header.number_of_columns, + deleted_count: 0, + total_count: 0, + primary_column: self.header.primary_column + }; + + // Creates a new cursor to the intermediate file in which we'll dump the live entries. + // let mut cursor_to_intermediate = Self { + // header: &mut intermediate_header, + // file: intermediate_file, + // data_type: PhantomData::, + + // eof_file_position: 0, + // }; + let mut cursor_to_intermediate: Self = todo!(); + let eof_file_position: FilePosition = cursor_to_intermediate.seek_to_end().await?; + cursor_to_intermediate.eof_file_position = eof_file_position; + + + + // TODO: intermediate_header does not live long enough, so after garbage collection is + // done, we need to use it in the swap. + cursor_to_intermediate.header = todo!(); + + // In it there will be only the alive rows. + // Afterwards we swap the files, and delete the garbage. + todo!() + } } diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs index 4284f74..4c06c4d 100644 --- a/storage_engine/src/storage_engine.rs +++ b/storage_engine/src/storage_engine.rs @@ -1,7 +1,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::fs::{File, OpenOptions, DirBuilder}; use tokio::fs; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::marker::PhantomData; use async_trait::async_trait; @@ -28,7 +28,6 @@ pub struct Store { // {write: 0, read: n + 1} ~> {write:0, read: n} // destroy read // {write: 0, read: 0} ~> {write: 1, read: 0} // create write // {write: 1, read: 0} ~> {write: 0, read: 0} // destroy write - pub table_folder: String, // primary_index: Vec>>, // indexes: Vec>>>, // primary_index: Index, @@ -63,6 +62,7 @@ impl SomethingSupportingLeq for Store } pub const ROWS_FILE_NAME: &'static str = "rows"; +pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_intermediate"; impl Store { // ===Creation=== @@ -72,6 +72,31 @@ impl Store { DirBuilder::new() .create(path_to_table).await?; + let header = StoreHeader { + table_folder: table_folder.to_string(), + number_of_columns, + deleted_count: 0, + total_count: 0, + primary_column, + }; + + // We don't need the file right now. Only cursors will later open it. + Self::create_empty_rows_file(path_to_rows, &header).await?; + + // TODO: indexes + // let index: Index = Index::new( + // &format!("rows_{}", primary_column.to_string()), + // ).await?; + + let store = Self { + header, + data_type: PhantomData::, + }; + + Ok(store) + } + + pub async fn create_empty_rows_file(path_to_rows: PathBuf, header: &StoreHeader) -> Result { let mut file: File = OpenOptions::new() .write(true) @@ -80,28 +105,10 @@ impl Store { .open(path_to_rows) .await?; - let header = StoreHeader { - number_of_columns, - deleted_count: 0, - total_count: 0, - primary_column, - }; let encoded_header: Vec = header.encode()?; file.write(&encoded_header).await?; - - // TODO: indexes - // let index: Index = Index::new( - // &format!("rows_{}", primary_column.to_string()), - // ).await?; - - let store = Self { - table_folder: table_folder.to_string(), - header, - data_type: PhantomData::, - }; - - Ok(store) + Ok(file) } pub async fn connect(table_folder: &str) -> Result @@ -121,10 +128,9 @@ impl Store { // header. let mut header_bytes = StoreHeader::decode_buffer(); file.read_exact(&mut header_bytes).await?; - let header = StoreHeader::decode(&mut header_bytes).await?; + let header = StoreHeader::decode(table_folder, &mut header_bytes).await?; let store = Self { - table_folder: table_folder.to_string(), header, data_type: PhantomData::, }; diff --git a/storage_engine/src/store_header.rs b/storage_engine/src/store_header.rs index fa84478..73cbb55 100644 --- a/storage_engine/src/store_header.rs +++ b/storage_engine/src/store_header.rs @@ -5,6 +5,8 @@ use std::mem::size_of; #[derive(Debug, Clone)] pub struct StoreHeader { + pub table_folder: String, // This one is not encoded into the file + pub number_of_columns: usize, pub deleted_count: usize, pub total_count: usize, @@ -35,7 +37,7 @@ impl StoreHeader { [0; StoreHeader::SIZE] } - pub async fn decode(result: &mut [u8]) -> Result { + pub async fn decode(table_folder: &str, result: &mut [u8]) -> Result { let (number_of_columns, _) = decode::(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; @@ -49,6 +51,7 @@ impl StoreHeader { decode::(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; let header = StoreHeader { + table_folder: table_folder.to_string(), number_of_columns, deleted_count, total_count, From 017f34bafa2049bb53cd5e0809a385ea9fecace2 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Sun, 4 Feb 2024 14:07:29 +0100 Subject: [PATCH 5/6] Remove redundant comment --- storage_engine/src/cursor.rs | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 8afe89f..c9913b9 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -477,23 +477,3 @@ impl <'cursor, T> WriteCursor<'cursor, T> { todo!() } } - - - -// TODO - // pub async fn search_for(&mut self, index: T) -> Result<()> - // where T: Send - // { - // // let index = self.primary_index.borrow_mut(); - // // let x = index.lookup(self, 123).await?; - // todo!() - // } - // pub async fn search_for_entry_with_id(&mut self, id: T) -> Result>> { - // // TODO: make call to the primary index - // todo!() - // } - - // // TODO: This needs to be some sort of an iterator - // pub async fn get_all_eq(&self, column: Column, value: T) -> Result>> { - // todo!() - // } From dac888dc51254a6b1115ea63dfc2cd03389fb03b Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Sun, 4 Feb 2024 15:46:43 +0100 Subject: [PATCH 6/6] Move concrete Cursor definitions on top --- storage_engine/src/cursor.rs | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index c9913b9..05f14db 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -15,6 +15,25 @@ use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader}; use crate::store_header::StoreHeader; use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; +// ===Concrete Cursors=== +pub struct ReadCursor { + header: StoreHeader, + file: File, + data_type: PhantomData, + + eof_file_position: FilePosition, +} + +pub struct WriteCursor<'a, T> { + header: &'a mut StoreHeader, + file: File, + data_type: PhantomData, + + eof_file_position: FilePosition, +} + + +// ===Traits=== #[async_trait] // TODO: Make this private pub trait PrimitiveCursor { @@ -206,23 +225,6 @@ pub trait CursorWithStoreHeader: PrimitiveCursor { } -// ===Concrete Cursors=== -pub struct ReadCursor { - header: StoreHeader, - file: File, - data_type: PhantomData, - - eof_file_position: FilePosition, -} - -pub struct WriteCursor<'a, T> { - header: &'a mut StoreHeader, - file: File, - data_type: PhantomData, - - eof_file_position: FilePosition, -} - // ===PrimitiveCursor=== impl PrimitiveCursor for ReadCursor { fn file(&mut self) -> &mut File {