From 28741006e73dfe640eacdd308b99d148fac11254 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Fri, 2 Feb 2024 19:04:51 +0100 Subject: [PATCH] Rename cursor ~> file_position --- storage_engine/src/main.rs | 27 ++++---- storage_engine/src/storage_engine.rs | 100 ++++++++++++++++++--------- 2 files changed, 81 insertions(+), 46 deletions(-) diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index 0bdd82c..a286df5 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -20,7 +20,8 @@ type Result = std::result::Result; async fn create_store() -> Result> { let mut store: Store = Store::new(TABLE_PATH, 5, 0).await.map_err(|e| e.to_io_or_panic())?; println!("CREATED"); - println!("{:?}", store.read_all_bytes().await?); + println!("THE STORE: {:?}", store); + println!("THE BYTES: {:?}", store.read_all_bytes().await?); let entry0: Entry = Entry::new_deleted(vec![1, 2, 3, 4, 5]); append_entry(&mut store, &entry0).await?; @@ -31,10 +32,12 @@ async fn create_store() -> Result> { println!("{:?}", store.read_all_bytes().await?); Ok(store) } + async fn connect_store() -> Result> { let mut store: Store = Store::connect(TABLE_PATH).await.map_err(|e| e.to_io_or_panic())?; println!("CONNECTED"); - println!("{:?}", store.read_all_bytes().await?); + println!("THE STORE: {:?}", store); + println!("THE BYTES: {:?}", store.read_all_bytes().await?); Ok(store) } @@ -48,17 +51,17 @@ async fn create_or_connect() -> Result> { } -async fn append_entry(store: &mut Store, entry: &Entry) -> Result{ +async fn append_entry(store: &mut Store, entry: &Entry) -> Result{ println!("APPENDING"); println!("entry == {:?}", entry); - let cursor: Cursor = store.append_entry(&entry).await.map_err(|e| e.to_io_or_panic())?; - println!("cursor == {:?}", cursor); - Ok(cursor) + let file_position: FilePosition = store.append_entry(&entry).await.map_err(|e| e.to_io_or_panic())?; + println!("file_position == {:?}", file_position); + Ok(file_position) } -async fn read_entry(store: &mut Store, cursor: Cursor) -> Result>{ - println!("READING ENTRY at cursor={}", cursor); - let entry = store.read_entry_at(cursor).await.map_err(|e| e.to_io_or_panic())?; +async fn read_entry(store: &mut Store, file_position: FilePosition) -> Result>>{ + println!("READING ENTRY at file_position={}", file_position); + let entry = store.read_entry_at(file_position).await.map_err(|e| e.to_io_or_panic())?; println!("ENTRY: {:?}", entry); Ok(entry) } @@ -75,10 +78,10 @@ async fn main() -> Result<()> { // println!("{:?}", store); // println!("{:?}", store.read_all_bytes().await?); - // let entry0: Entry = Entry::new(vec![99, 98, 97, 96, 95]); - // append_entry(&mut store, &entry0).await?; + let entry0: Entry = Entry::new(vec![99, 98, 97, 96, 95]); + append_entry(&mut store, &entry0).await?; - store.read_entries(2).await.map_err(|e| e.to_io_or_panic())?; + store.read_entries().await.map_err(|e| e.to_io_or_panic())?; // let entry2: StoreEntry = StoreEntry::new_deleted(vec![3, 2, 1]); diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs index 2555939..113982a 100644 --- a/storage_engine/src/storage_engine.rs +++ b/storage_engine/src/storage_engine.rs @@ -15,7 +15,7 @@ use std::mem::size_of; type Result = std::result::Result; pub type Column = u64; -pub type Cursor = u64; +pub type FilePosition = u64; // TODO: Consider introducing a phantom type for the data that's used in the store. #[derive(Debug)] @@ -24,6 +24,8 @@ pub struct Store { file: File, header: StoreHeader, data_type: PhantomData, + + eof_file_position: FilePosition, // meta // location of rows file // locations of index files @@ -32,6 +34,10 @@ pub struct Store { // list } +// TODO: Basically a pointer to Store + its own file position +// pub struct Cursor<'a, T> { +// } + #[derive(Debug)] pub struct StoreHeader { number_of_columns: usize, @@ -94,33 +100,41 @@ pub struct EntryDetailed { data: Vec, } + +pub struct EntryIterator<'a> { + file: &'a mut File, + current_file_position: FilePosition +} + //===Store=== pub async fn store_exists(table_folder: &str) -> Result { Ok(fs::metadata(table_folder).await.is_ok()) } impl Store { + const ROWS_FILE_NAME: &'static str = "rows"; + //===primitive file operations=== - // Moves the cursor right. + // Moves the file cursor right. async fn write_bytes(&mut self, bytes: &[u8]) -> Result { Ok(self.file.write(bytes).await?) } - // Moves the cursor right. + // Moves the file cursor right. async fn read_bytes(&mut self, bytes: &mut [u8]) -> Result<()> { self.file.read_exact(bytes).await?; Ok(()) } - // Moves the cursor right. + // Moves the file cursor right. async fn get_bytes(&mut self, count: usize) -> Result> { let mut result: Vec = Vec::with_capacity(count); self.read_bytes(&mut result).await?; Ok(result) } - async fn seek_to(&mut self, cursor: Cursor) -> Result<()>{ - self.file.seek(SeekFrom::Start(cursor)).await?; + async fn seek_to(&mut self, file_position: FilePosition) -> Result<()>{ + self.file.seek(SeekFrom::Start(file_position)).await?; Ok(()) } @@ -138,13 +152,13 @@ impl Store { self.seek_to(StoreHeader::SIZE as u64).await } - async fn current_cursor(&mut self) -> Result { - let next_cursor: Cursor = self.file.stream_position().await?; - Ok(next_cursor) + async fn current_file_position(&mut self) -> Result { + let next_file_position: FilePosition = self.file.stream_position().await?; + Ok(next_file_position) } // For debugging. - // Moves cursor to the end. + // Moves file cursor to the end. pub async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error>{ let mut bytes: Vec = vec![]; self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?; @@ -152,8 +166,6 @@ impl Store { Ok(bytes) } - const ROWS_FILE_NAME: &'static str = "rows"; - // ===Creation=== pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result { let path_to_table = Path::new(table_folder); @@ -177,15 +189,15 @@ impl Store { }; let encoded_header: Vec = header.encode()?; - println!("ENCODED_HEADER: {:?}", encoded_header); - let mut store = Self { table_folder: table_folder.to_string(), file, header, data_type: PhantomData::, + eof_file_position: 0, }; store.write_bytes(&encoded_header).await?; + store.eof_file_position = store.current_file_position().await?; Ok(store) } @@ -208,13 +220,15 @@ impl Store { file.read_exact(&mut header_bytes).await?; let header = StoreHeader::decode(&mut header_bytes).await?; + let eof_file_position = file.seek(SeekFrom::End(0)).await?; + let store = Self { table_folder: table_folder.to_string(), file, header, data_type: PhantomData::, + eof_file_position }; - println!("just connected TOOOOO {:?}", store); Ok(store) } @@ -236,24 +250,27 @@ impl Store { } // Moves cursor to the end. - pub async fn append_entry(&mut self, entry: &Entry) -> Result + pub async fn append_entry(&mut self, entry: &Entry) -> Result where T: Encode { self.increment_total_count().await?; let encoded_entry: Vec = entry.encode()?; self.seek_to_end().await?; - let cursor: Cursor = self.current_cursor().await?; + let file_position: FilePosition = self.current_file_position().await?; self.write_bytes(&encoded_entry).await?; - Ok(cursor) + let eof_file_position: FilePosition = self.current_file_position().await?; + self.eof_file_position = eof_file_position; + + Ok(file_position) } // ===Deletion=== - pub async fn mark_deleted_at(&mut self, cursor: Cursor) -> Result<()> { + pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()> { self.increment_deleted_count().await?; - self.seek_to(cursor).await?; + self.seek_to(file_position).await?; // TODO: Now you need to mutate the entry itself todo!() @@ -263,9 +280,8 @@ impl Store { // ===Lookup=== // WARNING: The cursor has to be at the start of an entry. Otherwise garbage data will be // decoded as an entry. - pub async fn read_entry_header_at(&mut self, cursor: Cursor) -> Result { - self.seek_to(cursor).await?; - self.file.seek(SeekFrom::Start(cursor)).await?; + pub async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result { + self.seek_to(file_position).await?; let number_of_columns: usize = self.header.number_of_columns; let mut header_bytes: Vec = vec![0; EntryHeaderWithDataSize::size(number_of_columns)]; @@ -278,10 +294,15 @@ impl Store { Ok(header) } - pub async fn read_entry_at(&mut self, cursor: Cursor) -> Result> + // Returns None when file_positoin == eof_file_position + pub async fn read_entry_at(&mut self, file_position: FilePosition) -> Result>> where T: Decode { - let header = self.read_entry_header_at(cursor).await?; + if file_position == self.eof_file_position { + return Ok(None) + } + + let header = self.read_entry_header_at(file_position).await?; let mut data_bytes: Vec = vec![0; header.size_of_data()]; // TODO: Get rid of the println's @@ -291,20 +312,31 @@ impl Store { let entry: EntryDetailed = EntryDetailed::decode(header, self.header.number_of_columns, &mut data_bytes)?; - Ok(entry) + Ok(Some(entry)) } - pub async fn read_entries(&mut self, n: usize) -> Result<()> + // TODO: This needs to be some sort of an iterator + // pub async fn entries() -> EntryIterator { + // todo!() + // } + + pub async fn read_entries(&mut self) -> Result<()> where T: Decode + std::fmt::Debug { self.seek_to_start_of_data().await?; - let mut cursor: Cursor = self.current_cursor().await?; - for i in 0..n { - let entry = self.read_entry_at(cursor).await?; - println!("({}, {:?})", i, entry); - cursor = self.current_cursor().await?; + let mut file_position: FilePosition = self.current_file_position().await?; + loop { + match self.read_entry_at(file_position).await? { + Some(entry) => { + println!("{:?}", entry); + file_position = self.current_file_position().await?; + }, + None => { + println!("END of entries."); + return Ok(()) + } + } } - Ok(()) } pub async fn search_for_entry_with_id(&mut self, id: T) -> Result>> { @@ -313,7 +345,7 @@ impl Store { } // TODO: This needs to be some sort of an iterator - pub async fn get_eq(&self, column: Column, value: T) -> Result>> { + pub async fn get_all_eq(&self, column: Column, value: T) -> Result>> { todo!() }