use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; use tokio::fs::{File, OpenOptions}; use std::path::Path; use std::marker::PhantomData; use crate::error::{Error, DecodeErrorKind}; use async_trait::async_trait; use bincode; use bincode::{Decode, Encode}; use crate::binary_coding::{encode, decode}; use crate::entry::{Entry, EntryDetailed}; use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader}; use crate::store_header::StoreHeader; use crate::storage_engine::{Store, FilePosition, Column, Result, StoreIndexes, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; use crate::index::Index; // ===Concrete Cursors=== pub struct ReadCursor<'a, T> { header: StoreHeader, indexes: &'a [Option>], file: File, data_type: PhantomData, eof_file_position: FilePosition, } pub struct WriteCursor<'a, T> { header: &'a mut StoreHeader, indexes: &'a mut [Option>], file: File, data_type: PhantomData, eof_file_position: FilePosition, } // ===Traits=== #[async_trait] // TODO: Make this private pub trait PrimitiveCursor { fn file(&mut self) -> &mut File; fn eof_file_position(&self) -> FilePosition; async fn read_bytes(&mut self, bytes: &mut [u8]) -> Result<()> { self.file().read_exact(bytes).await?; Ok(()) } async fn get_bytes(&mut self, count: usize) -> Result> { let mut result: Vec = Vec::with_capacity(count); self.read_bytes(&mut result).await?; Ok(result) } async fn seek_to(&mut self, file_position: FilePosition) -> Result { let file_position = self.file().seek(SeekFrom::Start(file_position)).await?; Ok(file_position) } // Start of the file i.e. the Header, not the entries. async fn seek_to_start(&mut self) -> Result { let file_position = self.file().seek(SeekFrom::Start(0)).await?; Ok(file_position) } async fn seek_to_end(&mut self) -> Result { let file_position = self.file().seek(SeekFrom::End(0)).await?; Ok(file_position) } // Seeks from current position by offset and returns new file position async fn seek_by(&mut self, offset: i64) -> Result { let file_position = self.file().seek(SeekFrom::Current(offset)).await?; Ok(file_position) } async fn current_file_position(&mut self) -> Result { let next_file_position: FilePosition = self.file().stream_position().await?; Ok(next_file_position) } async fn is_at_eof(&mut self) -> Result { let current_file_position = self.current_file_position().await?; let eof_file_position = self.eof_file_position(); Ok(current_file_position == eof_file_position) } } #[async_trait] pub trait CursorWithStoreHeader: PrimitiveCursor { fn header(&self) -> &StoreHeader; async fn seek_to_start_of_data(&mut self) -> Result { self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64).await } async fn read_entry_header(&mut self) -> Result { let number_of_columns: usize = self.header().number_of_columns; let mut header_bytes: Vec = vec![0; EntryHeaderWithDataSize::size(number_of_columns)]; self.read_bytes(&mut header_bytes).await?; let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..], number_of_columns)?; Ok(header) } async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result { self.seek_to(file_position).await?; self.read_entry_header().await } // Returns None when file_position == eof_file_position async fn read_entry_at(&mut self, file_position: FilePosition) -> Result>> where T: Decode { self.seek_to(file_position).await?; self.next().await } // ===Iteration=== // The following functions assume that the current file position is at a valid entry or EOF. // WARNING: This moves the file_position to start of the data, so you can't just call // next_entry_header() a bunch of times. You must move the cursor! async fn next_entry_header(&mut self) -> Result> { if self.is_at_eof().await? { return Ok(None) } let entry_header = self.read_entry_header().await?; Ok(Some(entry_header)) } // This is meant to be used after next_entry_header() is called. async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result{ let file_position = self.seek_by(entry_header.size_of_data() as i64).await?; Ok(file_position) } async fn next(&mut self) -> Result>> where T: Decode { let file_position = self.current_file_position().await?; let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; let mut data_bytes: Vec = vec![0; entry_header.size_of_data()]; self.read_bytes(&mut data_bytes).await?; let entry: EntryDetailed = EntryDetailed::decode(entry_header, file_position, self.header().number_of_columns, &mut data_bytes)?; Ok(Some(entry)) } // Like next, but only reads the column, not the whole entry. async fn next_at_column(&mut self, column: Column) -> Result> where T: Decode + Send { let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; let file_position_at_start_of_data = self.current_file_position().await?; // figuring out how much to decode let column_offset = entry_header.offset_of_column(column); self.seek_by(column_offset as i64).await?; // reading and decoding let mut bytes: Vec = vec![0; entry_header.data_sizes[column as usize]]; self.read_bytes(&mut bytes).await?; let (value, _) = decode::(&bytes[..]) .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; // jumping to next entry self.seek_to(file_position_at_start_of_data).await?; self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?; Ok(Some((entry_header, value))) } async fn next_alive(&mut self) -> Result>> where T: Decode { while let Some(entry) = self.next().await? { if !entry.header.is_deleted { return Ok(Some(entry)) } } Ok(None) } // ===Search=== async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result>> where T: Decode + PartialEq + Send + Sync { let mut file_position = self.current_file_position().await?; while let Some((_, t)) = self.next_at_column(column).await? { if &t == t0 { // go back and decode the whole entry self.seek_to(file_position).await?; return self.next().await } else { file_position = self.current_file_position().await?; } } Ok(None) } // ===Debugging=== async fn read_entries(&mut self) -> Result<()> where T: Decode + std::fmt::Debug { self.seek_to_start_of_data().await?; while let Some(entry) = self.next().await? { println!("{:?}", entry); } println!("END of entries."); Ok(()) } async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error> { let mut bytes: Vec = vec![]; self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?; self.file().read_to_end(&mut bytes).await?; Ok(bytes) } } #[async_trait] pub trait CursorWithAccessToIndex: CursorWithStoreHeader { fn indexes(&mut self) -> &[Option>]; async fn find_in_index(&mut self, k: &T) -> Result> where T: Encode + Decode + Ord + Send + Sync { // let x = self.primary_index().lookup(k).await?; todo!() } } // ===PrimitiveCursor=== impl PrimitiveCursor for ReadCursor<'_, T> { fn file(&mut self) -> &mut File { &mut self.file } fn eof_file_position(&self) -> FilePosition { self.eof_file_position } } impl PrimitiveCursor for WriteCursor<'_, T> { fn file(&mut self) -> &mut File { &mut self.file } fn eof_file_position(&self) -> FilePosition { self.eof_file_position } } // ===CursorWithStoreHeader=== impl CursorWithStoreHeader for ReadCursor<'_, T> { fn header(&self) -> &StoreHeader { &self.header } } impl CursorWithStoreHeader for WriteCursor<'_, T> { fn header(&self) -> &StoreHeader { &self.header } } // ===CursorWithAccessToIndex=== impl CursorWithAccessToIndex for ReadCursor<'_, T> { fn indexes(&mut self) -> &[Option>] { &self.indexes } } impl CursorWithAccessToIndex for WriteCursor<'_, T> { fn indexes(&mut self) -> &[Option>] { &self.indexes } } impl <'cursor, T> ReadCursor<'cursor, T> { pub async fn new<'store: 'cursor>(store: &'store Store) -> Result where T: Send + Sync { let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); let file: File = OpenOptions::new() .read(true) .open(path_to_rows) .await?; let mut cursor = Self { header: store.header.clone(), file, data_type: store.data_type, indexes: &store.indexes, eof_file_position: 0, }; let eof_file_position: FilePosition = cursor.seek_to_end().await?; cursor.eof_file_position = eof_file_position; cursor.seek_to_start_of_data().await?; Ok(cursor) } } impl <'cursor, T> WriteCursor<'cursor, T> // TODO: Consider adding this manually to wher eit is really needed where T: Sync { // 'store lives atleast as long as 'cursor pub async fn new<'store: 'cursor>(store: &'store mut Store) -> Result where T: Send { let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); let file: File = OpenOptions::new() .read(true) .write(true) .open(path_to_rows) .await?; let mut cursor = Self { header: &mut store.header, file, data_type: store.data_type, indexes: &mut store.indexes, eof_file_position: 0, }; let eof_file_position: FilePosition = cursor.seek_to_end().await?; cursor.eof_file_position = eof_file_position; cursor.seek_to_start_of_data().await?; Ok(cursor) } pub async fn connect<'header: 'cursor, 'indexes: 'cursor>(path_to_rows: &str, header: &'header mut StoreHeader, indexes: &'indexes mut StoreIndexes) -> Result where T: Send { let file: File = OpenOptions::new() .read(true) .write(true) .open(path_to_rows) .await?; let mut cursor = Self { header, file, data_type: PhantomData::, indexes, eof_file_position: 0, }; let eof_file_position: FilePosition = cursor.seek_to_end().await?; cursor.eof_file_position = eof_file_position; cursor.seek_to_start_of_data().await?; Ok(cursor) } // ===Primitive Operations=== async fn write_bytes(&mut self, bytes: &[u8]) -> Result { Ok(self.file.write(bytes).await?) } // ===Store Header Manipulation=== async fn increment_total_count(&mut self) -> Result<()> where T: Send { self.seek_to_start().await?; self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; let new_count = self.header.increment_total_count(); self.write_bytes(&encode::(&new_count)?).await?; Ok(()) } async fn increment_deleted_count(&mut self) -> Result<()> where T: Send { self.seek_to_start().await?; self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?; let new_count = self.header.increment_deleted_count(); self.write_bytes(&encode::(&new_count)?).await?; Ok(()) } // ===Entry Header Manipulation=== // assumes we are at the start of valid entry. async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()> { let bytes: Vec = entry_header.encode()?; self.write_bytes(&bytes).await?; Ok(()) } // ===Append Entry=== // Moves cursor to the end. // Returns file position to the start of the new entry. pub async fn append_entry(&mut self, entry: &Entry) -> Result where T: Encode + Send { self.increment_total_count().await?; let encoded_entry: Vec = entry.encode()?; let file_position = self.seek_to_end().await?; self.write_bytes(&encoded_entry).await?; let eof_file_position: FilePosition = self.current_file_position().await?; self.eof_file_position = eof_file_position; Ok(file_position) } // ===Deletion=== pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()> where T: Send { self.seek_to(file_position).await?; let mut entry_header = self.read_entry_header().await?; if entry_header.is_deleted { Ok(()) } else { self.increment_deleted_count().await?; self.seek_to(file_position).await?; entry_header.is_deleted = true; self.set_new_entry_header(entry_header.into()).await?; self.attempt_garbage_collection_if_necessary().await?; Ok(()) } } async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result>> where T: Decode + PartialEq + Send + Sync { let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?; if let Some(entry) = maybe_entry { self.mark_deleted_at(entry.file_position).await?; Ok(Some(entry)) } else { Ok(maybe_entry) } } async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> { // TODO: What should be the policy? Counting size of garbage? Counting how many entries are // garbage? if self.header.deleted_count > 100 { todo!() } else { Ok(()) } } async fn initiate_garbage_collection(&mut self) -> Result where T: Send { let table_folder = self.header.table_folder.to_string(); let path_to_table = Path::new(&table_folder); let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); let intermediate_file: File = Store::::create_empty_rows_file(path_to_rows, &self.header).await?; let mut intermediate_header: StoreHeader = StoreHeader { table_folder, number_of_columns: self.header.number_of_columns, deleted_count: 0, total_count: 0, primary_column: self.header.primary_column, indexed_columns: todo!() }; // Creates a new cursor to the intermediate file in which we'll dump the live entries. // let mut cursor_to_intermediate = Self { // header: &mut intermediate_header, // file: intermediate_file, // data_type: PhantomData::, // eof_file_position: 0, // }; let mut cursor_to_intermediate: Self = todo!(); let eof_file_position: FilePosition = cursor_to_intermediate.seek_to_end().await?; cursor_to_intermediate.eof_file_position = eof_file_position; // TODO: intermediate_header does not live long enough, so after garbage collection is // done, we need to use it in the swap. cursor_to_intermediate.header = todo!(); // In it there will be only the alive rows. // Afterwards we swap the files, and delete the garbage. todo!() } // ===Indexing=== async fn insert_to_index(&mut self, t: T, file_position: FilePosition) -> Result> where T: Encode + Decode + Ord + Send + Sync { // let x = self.primary_index.insert(t, file_position).await?; todo!() } async fn delete_from_index(&mut self, t: T, file_position: FilePosition) -> Result> where T: Encode + Decode + Ord + Send + Sync { // let x = self.primary_index.delete(t, file_position).await?; todo!() } }