diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 716abf7..763db68 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -10,12 +10,13 @@ use bincode::{Decode, Encode}; use crate::binary_coding::encode; use crate::entry::{Entry, EntryDetailed}; -use crate::entry_header::EntryHeaderWithDataSize; +use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader}; use crate::store_header::StoreHeader; use crate::storage_engine::{Store, FilePosition, Result, ROWS_FILE_NAME}; #[async_trait] -trait PrimitiveCursor { +// TODO: Make this private +pub trait PrimitiveCursor { fn file(&mut self) -> &mut File; fn eof_file_position(&self) -> FilePosition; @@ -30,22 +31,23 @@ trait PrimitiveCursor { Ok(result) } - async fn seek_to(&mut self, file_position: FilePosition) -> Result<()> { - self.file().seek(SeekFrom::Start(file_position)).await?; - Ok(()) + async fn seek_to(&mut self, file_position: FilePosition) -> Result { + let file_position = self.file().seek(SeekFrom::Start(file_position)).await?; + Ok(file_position) } - async fn seek_to_start(&mut self) -> Result<()> { - self.file().seek(SeekFrom::Start(0)).await?; - Ok(()) + // Start of the file i.e. the Header, not the entries. + async fn seek_to_start(&mut self) -> Result { + let file_position = self.file().seek(SeekFrom::Start(0)).await?; + Ok(file_position) } - async fn seek_to_end(&mut self) -> Result<()> { - self.file().seek(SeekFrom::End(0)).await?; - Ok(()) + async fn seek_to_end(&mut self) -> Result { + let file_position = self.file().seek(SeekFrom::End(0)).await?; + Ok(file_position) } - async fn seek_to_start_of_data(&mut self) -> Result<()> { + async fn seek_to_start_of_data(&mut self) -> Result { self.seek_to(StoreHeader::SIZE as u64).await } @@ -55,7 +57,10 @@ trait PrimitiveCursor { } async fn is_at_eof(&mut self) -> Result { - Ok(self.current_file_position().await? == self.eof_file_position()) + let current_file_position = self.current_file_position().await?; + let eof_file_position = self.eof_file_position(); + println!("IN EOF: current={}, eof_file_position={}", current_file_position, eof_file_position); + Ok(current_file_position == eof_file_position) } } @@ -90,9 +95,12 @@ pub trait CursorWithStoreHeader: PrimitiveCursor { async fn next(&mut self) -> Result>> where T: Decode { + println!("are we at eof?"); if self.is_at_eof().await? { + println!("YES"); return Ok(None) } + println!("NO"); let entry_header = self.read_entry_header().await?; @@ -118,6 +126,7 @@ pub trait CursorWithStoreHeader: PrimitiveCursor { async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error> { let mut bytes: Vec = vec![]; + self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?; self.file().read_to_end(&mut bytes).await?; Ok(bytes) } @@ -191,8 +200,11 @@ impl ReadCursor { file, data_type: store.data_type, - eof_file_position: 0, // This will be overwriten by the seek_to_start_of_data + eof_file_position: 0, }; + let eof_file_position: FilePosition = cursor.seek_to_end().await?; + cursor.eof_file_position = eof_file_position; + cursor.seek_to_start_of_data().await?; Ok(cursor) @@ -221,8 +233,11 @@ impl <'cursor, T> WriteCursor<'cursor, T> { file, data_type: store.data_type, - eof_file_position: 0, // This will be overwriten by the seek_to_start_of_data + eof_file_position: 0, }; + let eof_file_position: FilePosition = cursor.seek_to_end().await?; + cursor.eof_file_position = eof_file_position; + cursor.seek_to_start_of_data().await?; Ok(cursor) @@ -255,12 +270,10 @@ impl <'cursor, T> WriteCursor<'cursor, T> { } // ===Entry Header Manipulation=== - // assumes we are at the start of the valid entry. - async fn set_entry_is_deleted_to(&mut self, is_deleted: bool) -> Result<()> - where T: Send - { - self.seek_to(EntryHeaderWithDataSize::IS_DELETED_OFFSET as u64).await?; - self.write_bytes(&encode::(&is_deleted)?).await?; + // assumes we are at the start of valid entry. + async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()> { + let bytes: Vec = entry_header.encode()?; + self.write_bytes(&bytes).await?; Ok(()) } @@ -274,8 +287,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> { self.increment_total_count().await?; let encoded_entry: Vec = entry.encode()?; - self.seek_to_end().await?; - let file_position: FilePosition = self.current_file_position().await?; + let file_position = self.seek_to_end().await?; self.write_bytes(&encoded_entry).await?; let eof_file_position: FilePosition = self.current_file_position().await?; @@ -289,13 +301,19 @@ impl <'cursor, T> WriteCursor<'cursor, T> { where T: Send { self.seek_to(file_position).await?; - let entry_header = self.read_entry_header().await?; + println!("Attempting to read the entry"); + let mut entry_header = self.read_entry_header().await?; + println!("Entry Header == {:?}", entry_header); if entry_header.is_deleted { + println!("Already deleted"); Ok(()) } else { + println!("Marking as deleted"); self.increment_deleted_count().await?; self.seek_to(file_position).await?; - self.set_entry_is_deleted_to(true).await?; + + entry_header.is_deleted = true; + self.set_new_entry_header(entry_header.into()).await?; self.attempt_garbage_collection_if_necessary().await?; Ok(()) diff --git a/storage_engine/src/entry.rs b/storage_engine/src/entry.rs index 84d15f9..9d9de38 100644 --- a/storage_engine/src/entry.rs +++ b/storage_engine/src/entry.rs @@ -1,6 +1,6 @@ use bincode::{Decode, Encode}; -use crate::binary_coding::{encode, encode_sequence, encode_sequence_with_sizes, decode_sequence}; +use crate::binary_coding::{encode_sequence, encode_sequence_with_sizes, decode_sequence}; use crate::storage_engine::Result; use crate::error::{Error, DecodeErrorKind}; use crate::entry_header::{EntryHeader, EntryHeaderWithDataSize}; @@ -17,13 +17,6 @@ pub struct EntryDetailed { data: Vec, } -impl EntryHeader { - fn encode(self: &EntryHeader) -> Result> { - let result: Vec = encode(&self.is_deleted)?; - Ok(result) - } -} - impl Entry { pub fn new(data: Vec) -> Self { Self { header: EntryHeader { is_deleted: false }, data } diff --git a/storage_engine/src/entry_header.rs b/storage_engine/src/entry_header.rs index b1a9f7d..4aa904c 100644 --- a/storage_engine/src/entry_header.rs +++ b/storage_engine/src/entry_header.rs @@ -1,4 +1,4 @@ -use crate::binary_coding::{decode, decode_sequence}; +use crate::binary_coding::{decode, encode, decode_sequence}; use crate::storage_engine::Result; use crate::error::{Error, DecodeErrorKind}; use std::mem::size_of; @@ -14,6 +14,20 @@ pub struct EntryHeaderWithDataSize { pub data_sizes: Vec, // vec![5, 6, 20] means that column 0 stores 5 bytes, column 1 stores 6 // bytes etc } + +impl EntryHeader { + pub fn encode(self: &EntryHeader) -> Result> { + let result: Vec = encode(&self.is_deleted)?; + Ok(result) + } +} + +impl From for EntryHeader { + fn from(entry: EntryHeaderWithDataSize) -> Self { + Self { is_deleted: entry.is_deleted, } + } +} + impl EntryHeaderWithDataSize { pub const IS_DELETED_OFFSET: usize = 0; pub const IS_DELETED_SIZE: usize = size_of::(); diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index e09e481..d530bd1 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -9,7 +9,7 @@ mod store_header; use crate::entry::{Entry, EntryDetailed}; use crate::storage_engine::{Store, FilePosition}; -use crate::cursor::{ReadCursor, WriteCursor}; +use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader, PrimitiveCursor}; type Data = u32; @@ -20,8 +20,8 @@ type Result = std::result::Result; async fn create_store() -> Result> { let mut store: Store = Store::new(TABLE_PATH, 5, 0).await.map_err(|e| e.to_io_or_panic())?; println!("CREATED"); - println!("THE STORE: {:?}", store); - println!("THE BYTES: {:?}", store.read_all_bytes().await?); + // println!("THE STORE: {:?}", store); + // println!("THE BYTES: {:?}", store.read_all_bytes().await?); Ok(store) } @@ -29,8 +29,8 @@ async fn create_store() -> Result> { async fn connect_store() -> Result> { let mut store: Store = Store::connect(TABLE_PATH).await.map_err(|e| e.to_io_or_panic())?; println!("CONNECTED"); - println!("THE STORE: {:?}", store); - println!("THE BYTES: {:?}", store.read_all_bytes().await?); + // println!("THE STORE: {:?}", store); + // println!("THE BYTES: {:?}", store.read_all_bytes().await?); Ok(store) } @@ -60,6 +60,22 @@ async fn read_entry(cursor: &mut ReadCursor, file_position: FilePosition) todo!() } +async fn append_bunch_of_entries(store: &mut Store) -> Result<()> { + let mut cursor = store.write_cursor().await.map_err(|e| e.to_io_or_panic())?; + let entry0: Entry = Entry::new(vec![1, 2, 3, 4, 5]); + append_entry(&mut cursor, &entry0).await?; + + let entry1: Entry = Entry::new(vec![200, 200, 5, 6, 7]); + append_entry(&mut cursor, &entry1).await?; + + // println!("{:?}", store.read_all_bytes().await?); + let entry2: Entry = Entry::new(vec![99, 98, 97, 96, 95]); + append_entry(&mut cursor, &entry2).await?; + + let entry3: Entry = Entry::new(vec![50,50,50,50,50]); + append_entry(&mut cursor, &entry3).await?; + Ok(()) +} #[tokio::main] async fn main() -> Result<()> { @@ -67,42 +83,46 @@ async fn main() -> Result<()> { let mut store: Store = create_or_connect().await?; + if store.header.total_count == 0 { + println!("INSERTING!"); + append_bunch_of_entries(&mut store).await?; + } + { let mut cursor = store.write_cursor().await.map_err(|e| e.to_io_or_panic())?; - let entry0: Entry = Entry::new(vec![1, 2, 3, 4, 5]); - append_entry(&mut cursor, &entry0).await?; - let entry1: Entry = Entry::new(vec![200, 200, 5, 6, 7]); - append_entry(&mut cursor, &entry1).await?; + let entry: Entry = Entry::new(vec![60, 50, 40, 30, 20]); + // let file_position = append_entry(&mut cursor, &entry).await?; + // let file_position = 215; + // cursor.seek_to(file_position).await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", store.read_all_bytes().await?); - let entry2: Entry = Entry::new(vec![99, 98, 97, 96, 95]); - append_entry(&mut cursor, &entry2).await?; + // let entry_header = cursor.read_entry_header().await.map_err(|e| e.to_io_or_panic())?; + // println!("entry header = {:?}", entry_header); - let entry3: Entry = Entry::new(vec![50,50,50,50,50]); - append_entry(&mut cursor, &entry3).await?; + // println!("FILE POSITION == {}", file_position); + // cursor.mark_deleted_at(file_position).await.map_err(|e| e.to_io_or_panic())?; + // let entry_header = cursor.read_entry_header().await.map_err(|e| e.to_io_or_panic())?; + // println!("entry header after delete = {:?}", entry_header); } // println!("{:?}", store); // println!("{:?}", store.read_all_bytes().await?); - // let entry0: Entry = Entry::new(vec![99, 98, 97, 96, 95]); - // append_entry(&mut cursor, &entry0).await?; + let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; - // let entry1: Entry = Entry::new(vec![50,50,50,50,50]); - // let file_position = append_entry(&mut cursor, &entry1).await?; - // println!("CURRENT FILE_POSITION = {}", file_position); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); - // Now file_position point to entry1. - // cursor.mark_deleted_at(file_position).await.map_err(|e| e.to_io_or_panic())?; - // cursor.seek_to(file_position).await.map_err(|e| e.to_io_or_panic())?; + cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?; - // let entry2: StoreEntry = StoreEntry::new_deleted(vec![3, 2, 1]); - // let cursor2 = store.append_entry(&entry2).await.map_err(|e| e.to_io_or_panic())?; - // println!("cursor2 = {}", cursor2); - - println!("{:?}", store); - println!("{:?}", store.read_all_bytes().await?); println!("DONE");