use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; use tokio::fs::{File, OpenOptions}; use std::path::Path; use std::marker::PhantomData; use async_trait::async_trait; use bincode; use bincode::{Decode, Encode}; use crate::binary_coding::encode; use crate::entry::{Entry, EntryDetailed}; use crate::entry_header::EntryHeaderWithDataSize; use crate::store_header::StoreHeader; use crate::storage_engine::{Store, FilePosition, Result, ROWS_FILE_NAME}; #[async_trait] trait PrimitiveCursor { fn file(&mut self) -> &mut File; fn eof_file_position(&self) -> FilePosition; async fn read_bytes(&mut self, bytes: &mut [u8]) -> Result<()> { self.file().read_exact(bytes).await?; Ok(()) } async fn get_bytes(&mut self, count: usize) -> Result> { let mut result: Vec = Vec::with_capacity(count); self.read_bytes(&mut result).await?; Ok(result) } async fn seek_to(&mut self, file_position: FilePosition) -> Result<()> { self.file().seek(SeekFrom::Start(file_position)).await?; Ok(()) } async fn seek_to_start(&mut self) -> Result<()> { self.file().seek(SeekFrom::Start(0)).await?; Ok(()) } async fn seek_to_end(&mut self) -> Result<()> { self.file().seek(SeekFrom::End(0)).await?; Ok(()) } async fn seek_to_start_of_data(&mut self) -> Result<()> { self.seek_to(StoreHeader::SIZE as u64).await } async fn current_file_position(&mut self) -> Result { let next_file_position: FilePosition = self.file().stream_position().await?; Ok(next_file_position) } async fn is_at_eof(&mut self) -> Result { Ok(self.current_file_position().await? == self.eof_file_position()) } } #[async_trait] pub trait CursorWithStoreHeader: PrimitiveCursor { fn header(&self) -> &StoreHeader; async fn read_entry_header(&mut self) -> Result { let number_of_columns: usize = self.header().number_of_columns; let mut header_bytes: Vec = vec![0; EntryHeaderWithDataSize::size(number_of_columns)]; self.read_bytes(&mut header_bytes).await?; let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..], number_of_columns)?; Ok(header) } async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result { self.seek_to(file_position).await?; self.read_entry_header().await } // Returns None when file_position == eof_file_position async fn read_entry_at(&mut self, file_position: FilePosition) -> Result>> where T: Decode { self.seek_to(file_position).await?; self.next().await } // ===Iteration=== // Assumes that the current file position is at a valid entry or EOF. async fn next(&mut self) -> Result>> where T: Decode { if self.is_at_eof().await? { return Ok(None) } let entry_header = self.read_entry_header().await?; let mut data_bytes: Vec = vec![0; entry_header.size_of_data()]; self.read_bytes(&mut data_bytes).await?; let entry: EntryDetailed = EntryDetailed::decode(entry_header, self.header().number_of_columns, &mut data_bytes)?; Ok(Some(entry)) } // ===Debugging=== async fn read_entries(&mut self) -> Result<()> where T: Decode + std::fmt::Debug { self.seek_to_start_of_data().await?; while let Some(entry) = self.next().await? { println!("{:?}", entry); } println!("END of entries."); Ok(()) } async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error> { let mut bytes: Vec = vec![]; self.file().read_to_end(&mut bytes).await?; Ok(bytes) } } // ===Concrete Cursors=== pub struct ReadCursor { header: StoreHeader, file: File, data_type: PhantomData, eof_file_position: FilePosition, } pub struct WriteCursor<'a, T> { header: &'a mut StoreHeader, file: File, data_type: PhantomData, eof_file_position: FilePosition, } // ===PrimitiveCursor=== impl PrimitiveCursor for ReadCursor { fn file(&mut self) -> &mut File { &mut self.file } fn eof_file_position(&self) -> FilePosition { self.eof_file_position } } impl PrimitiveCursor for WriteCursor<'_, T> { fn file(&mut self) -> &mut File { &mut self.file } fn eof_file_position(&self) -> FilePosition { self.eof_file_position } } // ===CursorWithStoreHeader=== impl CursorWithStoreHeader for ReadCursor { fn header(&self) -> &StoreHeader { &self.header } } impl CursorWithStoreHeader for WriteCursor<'_, T> { fn header(&self) -> &StoreHeader { &self.header } } impl ReadCursor { pub async fn new(store: &Store) -> Result where T: Send { let path_to_rows = Path::new(&store.table_folder).join(ROWS_FILE_NAME); let file: File = OpenOptions::new() .read(true) .open(path_to_rows) .await?; let mut cursor = Self { header: store.header.clone(), file, data_type: store.data_type, eof_file_position: 0, // This will be overwriten by the seek_to_start_of_data }; cursor.seek_to_start_of_data().await?; Ok(cursor) } pub async fn less_than_eq(&mut self, file_position0: FilePosition, file_position1: FilePosition) -> Result { todo!() } } impl <'cursor, T> WriteCursor<'cursor, T> { // 'store lives atleast as long as 'cursor pub async fn new<'store: 'cursor>(store: &'store mut Store) -> Result where T: Send { let path_to_rows = Path::new(&store.table_folder).join(ROWS_FILE_NAME); let file: File = OpenOptions::new() .read(true) .write(true) .open(path_to_rows) .await?; let mut cursor = Self { header: &mut store.header, file, data_type: store.data_type, eof_file_position: 0, // This will be overwriten by the seek_to_start_of_data }; cursor.seek_to_start_of_data().await?; Ok(cursor) } // ===Primitive Operations=== async fn write_bytes(&mut self, bytes: &[u8]) -> Result { Ok(self.file.write(bytes).await?) } // ===Store Header Manipulation=== async fn increment_total_count(&mut self) -> Result<()> where T: Send { self.seek_to_start().await?; self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; let new_count = self.header.increment_total_count(); self.write_bytes(&encode::(&new_count)?).await?; Ok(()) } async fn increment_deleted_count(&mut self) -> Result<()> where T: Send { self.seek_to_start().await?; self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?; let new_count = self.header.increment_deleted_count(); self.write_bytes(&encode::(&new_count)?).await?; Ok(()) } // ===Entry Header Manipulation=== // assumes we are at the start of the valid entry. async fn set_entry_is_deleted_to(&mut self, is_deleted: bool) -> Result<()> where T: Send { self.seek_to(EntryHeaderWithDataSize::IS_DELETED_OFFSET as u64).await?; self.write_bytes(&encode::(&is_deleted)?).await?; Ok(()) } // ===Append Entry=== // Moves cursor to the end. // Returns file position to the start of the new entry. pub async fn append_entry(&mut self, entry: &Entry) -> Result where T: Encode + Send { self.increment_total_count().await?; let encoded_entry: Vec = entry.encode()?; self.seek_to_end().await?; let file_position: FilePosition = self.current_file_position().await?; self.write_bytes(&encoded_entry).await?; let eof_file_position: FilePosition = self.current_file_position().await?; self.eof_file_position = eof_file_position; Ok(file_position) } // ===Deletion=== pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()> where T: Send { self.seek_to(file_position).await?; let entry_header = self.read_entry_header().await?; if entry_header.is_deleted { Ok(()) } else { self.increment_deleted_count().await?; self.seek_to(file_position).await?; self.set_entry_is_deleted_to(true).await?; self.attempt_garbage_collection_if_necessary().await?; Ok(()) } } async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> { // TODO: What should be the policy? Counting size of garbage? Counting how many entries are // garbage? if self.header.deleted_count > 100 { todo!() } else { Ok(()) } } } // TODO // pub async fn search_for(&mut self, index: T) -> Result<()> // where T: Send // { // // let index = self.primary_index.borrow_mut(); // // let x = index.lookup(self, 123).await?; // todo!() // } // pub async fn search_for_entry_with_id(&mut self, id: T) -> Result>> { // // TODO: make call to the primary index // todo!() // } // // TODO: This needs to be some sort of an iterator // pub async fn get_all_eq(&self, column: Column, value: T) -> Result>> { // todo!() // }