diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 648d964..c658213 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -59,7 +59,7 @@ trait PrimitiveCursor { } #[async_trait] -trait CursorWithStoreHeader: PrimitiveCursor { +pub trait CursorWithStoreHeader: PrimitiveCursor { fn header(&self) -> &StoreHeader; async fn read_entry_header(&mut self) -> Result { @@ -103,6 +103,7 @@ trait CursorWithStoreHeader: PrimitiveCursor { Ok(Some(entry)) } + // ===Debugging=== async fn read_entries(&mut self) -> Result<()> where T: Decode + std::fmt::Debug { @@ -113,6 +114,12 @@ trait CursorWithStoreHeader: PrimitiveCursor { println!("END of entries."); Ok(()) } + + async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error> { + let mut bytes: Vec = vec![]; + self.file().read_to_end(&mut bytes).await?; + Ok(bytes) + } } @@ -189,6 +196,10 @@ impl ReadCursor { Ok(cursor) } + + pub async fn less_than_eq(&mut self, file_position0: FilePosition, file_position1: FilePosition) -> Result { + todo!() + } } impl <'cursor, T> WriteCursor<'cursor, T> { @@ -300,3 +311,23 @@ impl <'cursor, T> WriteCursor<'cursor, T> { } } } + + + +// TODO + // pub async fn search_for(&mut self, index: T) -> Result<()> + // where T: Send + // { + // // let index = self.primary_index.borrow_mut(); + // // let x = index.lookup(self, 123).await?; + // todo!() + // } + // pub async fn search_for_entry_with_id(&mut self, id: T) -> Result>> { + // // TODO: make call to the primary index + // todo!() + // } + + // // TODO: This needs to be some sort of an iterator + // pub async fn get_all_eq(&self, column: Column, value: T) -> Result>> { + // todo!() + // } diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index 504e519..fd38d58 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -11,6 +11,7 @@ mod index; mod cursor; use crate::storage_engine::*; +use crate::cursor::*; type Data = u32; @@ -24,14 +25,6 @@ async fn create_store() -> Result> { println!("THE STORE: {:?}", store); println!("THE BYTES: {:?}", store.read_all_bytes().await?); - let mut cursor = store.cursor(AccessMode::Write).await.map_err(|e| e.to_io_or_panic())?; - let entry0: Entry = Entry::new(vec![1, 2, 3, 4, 5]); - append_entry(&mut cursor, &entry0).await?; - - let entry1: Entry = Entry::new(vec![200, 200, 5, 6, 7]); - append_entry(&mut cursor, &entry1).await?; - - println!("{:?}", store.read_all_bytes().await?); Ok(store) } @@ -53,7 +46,7 @@ async fn create_or_connect() -> Result> { } -async fn append_entry(cursor: &mut Cursor, entry: &Entry) -> Result{ +async fn append_entry(cursor: &mut WriteCursor<'_, Data>, entry: &Entry) -> Result{ println!("APPENDING"); println!("entry == {:?}", entry); let file_position: FilePosition = cursor.append_entry(&entry).await.map_err(|e| e.to_io_or_panic())?; @@ -63,9 +56,10 @@ async fn append_entry(cursor: &mut Cursor, entry: &Entry) -> Result< async fn read_entry(cursor: &mut Cursor, file_position: FilePosition) -> Result>>{ println!("READING ENTRY at file_position={}", file_position); - let entry = cursor.read_entry_at(file_position).await.map_err(|e| e.to_io_or_panic())?; - println!("ENTRY: {:?}", entry); - Ok(entry) + // let entry = cursor.read_entry_at(file_position).await.map_err(|e| e.to_io_or_panic())?; + // println!("ENTRY: {:?}", entry); + // Ok(entry) + todo!() } @@ -73,32 +67,44 @@ async fn read_entry(cursor: &mut Cursor, file_position: FilePosition) -> R async fn main() -> Result<()> { println!("STOOOOOOOOOOOORAAAAAAAAAAAGE"); - let store: Store = create_or_connect().await?; + let mut store: Store = create_or_connect().await?; + + { + let mut cursor = store.write_cursor().await.map_err(|e| e.to_io_or_panic())?; + let entry0: Entry = Entry::new(vec![1, 2, 3, 4, 5]); + append_entry(&mut cursor, &entry0).await?; + + let entry1: Entry = Entry::new(vec![200, 200, 5, 6, 7]); + append_entry(&mut cursor, &entry1).await?; + + // println!("{:?}", store.read_all_bytes().await?); + let entry2: Entry = Entry::new(vec![99, 98, 97, 96, 95]); + append_entry(&mut cursor, &entry2).await?; + + let entry3: Entry = Entry::new(vec![50,50,50,50,50]); + append_entry(&mut cursor, &entry3).await?; + } - // let entry0 = read_entry(&mut store, 16).await?; - // let entry1 = read_entry(&mut store, 45).await?; // println!("{:?}", store); // println!("{:?}", store.read_all_bytes().await?); - let mut cursor = store.cursor(AccessMode::Write).await.map_err(|e| e.to_io_or_panic())?; - let entry0: Entry = Entry::new(vec![99, 98, 97, 96, 95]); - append_entry(&mut cursor, &entry0).await?; + // let entry0: Entry = Entry::new(vec![99, 98, 97, 96, 95]); + // append_entry(&mut cursor, &entry0).await?; + + // let entry1: Entry = Entry::new(vec![50,50,50,50,50]); + // let file_position = append_entry(&mut cursor, &entry1).await?; + // println!("CURRENT FILE_POSITION = {}", file_position); - let entry1: Entry = Entry::new(vec![50,50,50,50,50]); - let file_position = append_entry(&mut cursor, &entry1).await?; - println!("CURRENT FILE_POSITION = {}", file_position); // Now file_position point to entry1. // cursor.mark_deleted_at(file_position).await.map_err(|e| e.to_io_or_panic())?; // cursor.seek_to(file_position).await.map_err(|e| e.to_io_or_panic())?; - cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?; - - // let entry2: StoreEntry = StoreEntry::new_deleted(vec![3, 2, 1]); // let cursor2 = store.append_entry(&entry2).await.map_err(|e| e.to_io_or_panic())?; // println!("cursor2 = {}", cursor2); println!("{:?}", store); + println!("{:?}", store.read_all_bytes().await?); println!("DONE"); diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs index 27e9e41..091c403 100644 --- a/storage_engine/src/storage_engine.rs +++ b/storage_engine/src/storage_engine.rs @@ -12,6 +12,7 @@ use tokio::fs; use crate::index::SomethingSupportingLeq; use crate::error::{Error, DecodeErrorKind}; +use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader}; use crate::index::Index; @@ -159,15 +160,6 @@ impl SomethingSupportingLeq for Store pub const ROWS_FILE_NAME: &'static str = "rows"; impl Store { - // For debugging. - // Moves file cursor to the end. - pub async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error>{ - let mut bytes: Vec = vec![]; - let mut cursor = self.cursor(AccessMode::Read).await.map_err(|e| e.to_io_or_panic())?; - cursor.file.read_to_end(&mut bytes).await?; - Ok(bytes) - } - // ===Creation=== pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result { let path_to_table = Path::new(table_folder); @@ -234,12 +226,26 @@ impl Store { Ok(store) } - pub async fn cursor(&self, mode: AccessMode) -> Result> { - Cursor::new(&self, mode).await + // ===Cursors=== + pub async fn read_cursor(&self) -> Result> + where T: Send + { + ReadCursor::new(self).await } - pub async fn garbage_collect(&mut self) -> Result<()> { - todo!() + pub async fn write_cursor(&mut self) -> Result> + where T: Send + { + WriteCursor::new(self).await + } + + // For debugging. + pub async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error> + where T: Send + { + let mut cursor = self.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + let bytes = cursor.read_all_bytes().await?; + Ok(bytes) } } @@ -347,251 +353,6 @@ impl EntryDetailed { } -//=================Cursor================== -impl Cursor { - pub async fn new(store: &Store, mode: AccessMode) -> Result { - let path_to_rows = Path::new(&store.table_folder).join(ROWS_FILE_NAME); - let file: File = match mode { - AccessMode::Read => - OpenOptions::new() - .read(true) - .open(path_to_rows) - .await?, - - AccessMode::Write => - OpenOptions::new() - .read(true) - .write(true) - .open(path_to_rows) - .await?, - }; - - let mut cursor = Self { - header: store.header.clone(), - file, - data_type: store.data_type, - - eof_file_position: 0, // This will be overwriten by the seek_to_start_of_data - }; - cursor.seek_to_start_of_data().await?; - - Ok(cursor) - } - - //===primitive file operations=== - // Moves the file cursor right. - async fn write_bytes(&mut self, bytes: &[u8]) -> Result { - Ok(self.file.write(bytes).await?) - } - - // Moves the file cursor right. - async fn read_bytes(&mut self, bytes: &mut [u8]) -> Result<()> { - self.file.read_exact(bytes).await?; - Ok(()) - } - - // Moves the file cursor right. - async fn get_bytes(&mut self, count: usize) -> Result> { - let mut result: Vec = Vec::with_capacity(count); - self.read_bytes(&mut result).await?; - Ok(result) - } - - pub async fn seek_to(&mut self, file_position: FilePosition) -> Result<()> { - self.file.seek(SeekFrom::Start(file_position)).await?; - Ok(()) - } - - async fn seek_to_start(&mut self) -> Result<()> { - self.file.seek(SeekFrom::Start(0)).await?; - Ok(()) - } - - async fn seek_to_end(&mut self) -> Result<()> { - self.file.seek(SeekFrom::End(0)).await?; - Ok(()) - } - - async fn seek_to_start_of_data(&mut self) -> Result<()> { - self.seek_to(StoreHeader::SIZE as u64).await - } - - pub async fn current_file_position(&mut self) -> Result { - let next_file_position: FilePosition = self.file.stream_position().await?; - Ok(next_file_position) - } - - async fn is_at_eof(&mut self) -> Result { - Ok(self.current_file_position().await? == self.eof_file_position) - } - - pub async fn less_than_eq(&mut self, file_position0: FilePosition, file_position1: FilePosition) -> Result { - todo!() - } - - // ===Iteration=== - // Assumes that the current file position is at a valid entry or EOF. - pub async fn next(&mut self) -> Result>> - where T: Decode - { - if self.is_at_eof().await? { - return Ok(None) - } - - let header = self.read_entry_header().await?; - - let mut data_bytes: Vec = vec![0; header.size_of_data()]; - self.read_bytes(&mut data_bytes).await?; - let entry: EntryDetailed = - EntryDetailed::decode(header, self.header.number_of_columns, &mut data_bytes)?; - - Ok(Some(entry)) - } - - - // ===Store Header Manipulation=== - async fn increment_total_count(&mut self) -> Result<()> { - self.seek_to_start().await?; - self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; - let new_count = self.header.increment_total_count(); - self.write_bytes(&encode::(&new_count)?).await?; - Ok(()) - } - - async fn increment_deleted_count(&mut self) -> Result<()> { - self.seek_to_start().await?; - self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?; - let new_count = self.header.increment_deleted_count(); - self.write_bytes(&encode::(&new_count)?).await?; - Ok(()) - } - - // ===Entry Header Manipulation=== - // assumes we are at the start of the valid entry. - async fn set_entry_is_deleted_to(&mut self, is_deleted: bool) -> Result<()> { - self.seek_to(EntryHeaderWithDataSize::IS_DELETED_OFFSET as u64).await?; - self.write_bytes(&encode::(&is_deleted)?).await?; - Ok(()) - } - - // ===Append Entry=== - - // Moves cursor to the end. - // Returns file position to the start of the new entry. - pub async fn append_entry(&mut self, entry: &Entry) -> Result - where T: Encode - { - self.increment_total_count().await?; - - let encoded_entry: Vec = entry.encode()?; - self.seek_to_end().await?; - let file_position: FilePosition = self.current_file_position().await?; - self.write_bytes(&encoded_entry).await?; - - let eof_file_position: FilePosition = self.current_file_position().await?; - self.eof_file_position = eof_file_position; - - Ok(file_position) - } - - // ===Deletion=== - pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()> { - self.seek_to(file_position).await?; - let entry_header = self.read_entry_header().await?; - if entry_header.is_deleted { - Ok(()) - } else { - self.increment_deleted_count().await?; - self.seek_to(file_position).await?; - self.set_entry_is_deleted_to(true).await?; - - self.attempt_garbage_collection_if_necessary().await?; - Ok(()) - } - } - - async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> { - // TODO: What should be the policy? Counting size of garbage? Counting how many entries are - // garbage? - if self.header.deleted_count > 100 { - todo!() - } else { - Ok(()) - } - } - - // ===Lookup=== - // WARNING: The cursor has to be at the start of an entry. Otherwise garbage data will be - // decoded as an entry. - async fn read_entry_header(&mut self) -> Result { - let number_of_columns: usize = self.header.number_of_columns; - let mut header_bytes: Vec = vec![0; EntryHeaderWithDataSize::size(number_of_columns)]; - self.read_bytes(&mut header_bytes).await?; - let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..], number_of_columns)?; - // TODO: Get rid of the println's - // println!("HEADER_BYTES: {:?}", header_bytes); - // println!("HEADER: {:?}", header); - - Ok(header) - } - - pub async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result { - self.seek_to(file_position).await?; - self.read_entry_header().await - } - - pub async fn search_for(&mut self, index: T) -> Result<()> - where T: Send - { - // let index = self.primary_index.borrow_mut(); - // let x = index.lookup(self, 123).await?; - todo!() - } - - - // Returns None when file_positoin == eof_file_position - pub async fn read_entry_at(&mut self, file_position: FilePosition) -> Result>> - where T: Decode - { - self.seek_to(file_position).await?; - self.next().await - } - - // TODO: This needs to be some sort of an iterator - // pub async fn entries() -> EntryIterator { - // todo!() - // } - - pub async fn read_entries(&mut self) -> Result<()> - where T: Decode + std::fmt::Debug - { - self.seek_to_start_of_data().await?; - let mut file_position: FilePosition = self.current_file_position().await?; - loop { - match self.read_entry_at(file_position).await? { - Some(entry) => { - println!("{:?}", entry); - file_position = self.current_file_position().await?; - }, - None => { - println!("END of entries."); - return Ok(()) - } - } - } - } - - pub async fn search_for_entry_with_id(&mut self, id: T) -> Result>> { - // TODO: make call to the primary index - todo!() - } - - // TODO: This needs to be some sort of an iterator - pub async fn get_all_eq(&self, column: Column, value: T) -> Result>> { - todo!() - } -} - // impl StorageEngine for ColumnStore { // async fn append(&mut self, id: Index, entry: Row) -> Result