use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; use tokio::fs::{File, OpenOptions}; use tokio::fs; use std::path::Path; use std::marker::PhantomData; use std::collections::{BTreeMap, HashSet}; use async_trait::async_trait; use bincode; use bincode::{Decode, Encode}; use crate::binary_coding::{encode, decode}; use crate::error::{Error, DecodeErrorKind}; use crate::segments::entry::{Entry, EntryDetailed}; use crate::segments::entry_header::{EntryHeaderWithDataSize, EntryHeader}; use crate::segments::store_header::StoreHeader; use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; use crate::index::Index; const GARBAGE_COLLECTION_TRIGGER: usize = 100; // ===Concrete Cursors=== pub struct ReadCursor<'a, T> { header: StoreHeader, indexes: &'a [Option>], file: File, eof_file_position: FilePosition, } pub struct WriteCursor<'a, T> { header: &'a mut StoreHeader, indexes: &'a mut [Option>], file: File, eof_file_position: FilePosition, } // This is used as a cursor to temporary file during Garbage Collection pub struct AppendOnlyCursor { header: StoreHeader, file: File, data_type: PhantomData, eof_file_position: FilePosition, } // ===Traits=== #[async_trait] pub(crate) trait PrimitiveCursor { fn file(&mut self) -> &mut File; fn eof_file_position(&self) -> FilePosition; async fn read_bytes(&mut self, bytes: &mut [u8]) -> Result<()> { self.file().read_exact(bytes).await?; Ok(()) } async fn get_bytes(&mut self, count: usize) -> Result> { let mut result: Vec = Vec::with_capacity(count); self.read_bytes(&mut result).await?; Ok(result) } async fn seek_to(&mut self, file_position: FilePosition) -> Result { let file_position = self.file().seek(SeekFrom::Start(file_position)).await?; Ok(file_position) } // Start of the file i.e. the Header, not the entries. async fn seek_to_start(&mut self) -> Result { let file_position = self.file().seek(SeekFrom::Start(0)).await?; Ok(file_position) } async fn seek_to_end(&mut self) -> Result { let file_position = self.file().seek(SeekFrom::End(0)).await?; Ok(file_position) } // Seeks from current position by offset and returns new file position async fn seek_by(&mut self, offset: i64) -> Result { let file_position = self.file().seek(SeekFrom::Current(offset)).await?; Ok(file_position) } async fn current_file_position(&mut self) -> Result { let next_file_position: FilePosition = self.file().stream_position().await?; Ok(next_file_position) } async fn is_at_eof(&mut self) -> Result { let current_file_position = self.current_file_position().await?; let eof_file_position = self.eof_file_position(); Ok(current_file_position == eof_file_position) } } #[async_trait] pub(crate) trait PrimitiveWriteCursor: PrimitiveCursor { async fn write_bytes(&mut self, bytes: &[u8]) -> Result { Ok(self.file().write(bytes).await?) } } #[async_trait] pub trait CursorWithStoreHeader: PrimitiveCursor { fn header(&self) -> &StoreHeader; async fn seek_to_start_of_data(&mut self) -> Result { self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64).await } async fn read_entry_header(&mut self) -> Result { let number_of_columns: usize = self.header().number_of_columns; let mut header_bytes: Vec = vec![0; EntryHeaderWithDataSize::size(number_of_columns)]; self.read_bytes(&mut header_bytes).await?; let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..], number_of_columns)?; Ok(header) } async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result { self.seek_to(file_position).await?; self.read_entry_header().await } // Returns None when file_position == eof_file_position async fn read_entry_at(&mut self, file_position: FilePosition) -> Result>> where T: Decode { self.seek_to(file_position).await?; self.next().await } // ===Iteration=== // The following functions assume that the current file position is at a valid entry or EOF. // WARNING: This moves the file_position to start of the data, so you can't just call // next_entry_header() a bunch of times. You must move the cursor! async fn next_entry_header(&mut self) -> Result> { if self.is_at_eof().await? { return Ok(None) } let entry_header = self.read_entry_header().await?; Ok(Some(entry_header)) } // This is meant to be used after next_entry_header() is called. async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result{ let file_position = self.seek_by(entry_header.size_of_data() as i64).await?; Ok(file_position) } async fn next(&mut self) -> Result>> where T: Decode { let file_position = self.current_file_position().await?; let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; let mut data_bytes: Vec = vec![0; entry_header.size_of_data()]; self.read_bytes(&mut data_bytes).await?; let entry: EntryDetailed = EntryDetailed::decode(entry_header, file_position, self.header().number_of_columns, &mut data_bytes)?; Ok(Some(entry)) } // Like next, but only reads the column, not the whole entry. async fn next_at_column(&mut self, column: Column) -> Result> where T: Decode + Send { let file_position = self.current_file_position().await?; let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; let file_position_at_start_of_data = self.current_file_position().await?; // figuring out how much to decode let column_offset = entry_header.offset_of_column(column); self.seek_by(column_offset as i64).await?; // reading and decoding let mut bytes: Vec = vec![0; entry_header.data_sizes[column as usize]]; self.read_bytes(&mut bytes).await?; let (value, _) = decode::(&bytes[..]) .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; // jumping to next entry self.seek_to(file_position_at_start_of_data).await?; self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?; Ok(Some((entry_header, file_position, value))) } async fn next_alive_at_column(&mut self, column: Column) -> Result> where T: Decode + Send { while let Some((header, file_position, t)) = self.next_at_column(column).await? { if !header.is_deleted { return Ok(Some((header, file_position, t))) } } Ok(None) } async fn next_alive(&mut self) -> Result>> where T: Decode { while let Some(entry) = self.next().await? { if !entry.header.is_deleted { return Ok(Some(entry)) } } Ok(None) } // ===Search=== async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result>> where T: Decode + PartialEq + Send + Sync { let mut file_position = self.current_file_position().await?; while let Some((_, _, t)) = self.next_alive_at_column(column).await? { if &t == t0 { // go back and decode the whole entry self.seek_to(file_position).await?; return self.next().await } else { file_position = self.current_file_position().await?; } } Ok(None) } async fn find_all_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result>> where T: Decode + PartialEq + Send + Sync { let mut entries = vec![]; while let Some(entry) = self.find_first_eq_bruteforce(column, t0).await? { entries.push(entry) } Ok(entries) } // ===Debugging=== async fn read_entries(&mut self) -> Result<()> where T: Decode + std::fmt::Debug { self.seek_to_start_of_data().await?; while let Some(entry) = self.next().await? { println!("{:?}", entry); } println!("END of entries."); Ok(()) } async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error> { let mut bytes: Vec = vec![]; self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?; self.file().read_to_end(&mut bytes).await?; Ok(bytes) } } #[async_trait] pub trait CursorWithWriteStoreHeader: CursorWithStoreHeader + PrimitiveWriteCursor { fn header_mut(&mut self) -> &mut StoreHeader; fn set_eof_file_position(&mut self, new_file_position: FilePosition); // ===Store Header Manipulation=== async fn increment_total_count(&mut self) -> Result<()> where T: Send { self.seek_to_start().await?; self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; let new_count = self.header_mut().increment_total_count(); self.write_bytes(&encode::(&new_count)?).await?; Ok(()) } async fn increment_deleted_count(&mut self) -> Result<()> where T: Send { self.seek_to_start().await?; self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?; let new_count = self.header_mut().increment_deleted_count(); self.write_bytes(&encode::(&new_count)?).await?; Ok(()) } async fn set_header(&mut self, header: &StoreHeader) -> Result<()> where T: Send { self.seek_to_start().await?; let encoded_header: Vec = header.encode()?; self.write_bytes(&encoded_header).await?; Ok(()) } // ===Append Entry=== // Moves cursor to the end. // Returns file position to the start of the new entry. async fn append_entry_no_indexing(&mut self, entry: &Entry) -> Result where T: Encode + Send + Sync { self.increment_total_count().await?; let encoded_entry: Vec = entry.encode()?; let file_position = self.seek_to_end().await?; self.write_bytes(&encoded_entry).await?; let eof_file_position: FilePosition = self.current_file_position().await?; self.set_eof_file_position(eof_file_position); Ok(file_position) } } #[async_trait] pub trait CursorWithAccessToIndex: CursorWithStoreHeader { fn indexes(&mut self) -> &[Option>]; async fn index_lookup(&mut self, column: Column, value: &T) -> Result>> where T: Encode + Decode + Ord + Send + Sync { match &self.indexes()[column as usize] { Some(index) => { let file_positions = index.lookup(value).await?.unwrap_or_else(|| HashSet::new()); let mut entries: Vec> = vec![]; for &file_position in file_positions.iter() { match self.read_entry_at(file_position).await? { Some(entry) => { entries.push(entry) }, None => { return Err(Error::IndexIsStoringEofFilePosition(column)) } } } Ok(entries) }, None => Err(Error::AttemptToIndexNonIndexableColumn(column)) } } async fn select_entries_where_eq(&mut self, column: Column, value: &T) -> Result>> where T: Encode + Decode + Ord + Send + Sync { if self.header().is_column_indexed(column) { println!("INDEXED LOOKUP"); self.index_lookup(column, value).await } else { println!("BRUTE-FORCE LOOKUP"); self.find_all_eq_bruteforce(column, value).await } } } #[async_trait] pub trait CursorWithWriteAccessToIndex: CursorWithAccessToIndex + CursorWithWriteStoreHeader { fn indexes_mut(&mut self) -> &mut [Option>]; // Assumes that the column is indexable. fn mut_index_at(&mut self, column: Column) -> &mut Index { match &mut self.indexes_mut()[column as usize] { Some(index) => { index }, None => { unreachable!() } } } // Assumes that the column is indexable. async fn insert_into_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()> where T: Encode + Decode + Ord + Send + Sync + 'async_trait { let index = self.mut_index_at(column as Column); index.insert(value, file_position).await?; Ok(()) } // Assumes that the column is indexable. async fn delete_from_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()> where T: Encode + Decode + Ord + Send + Sync + 'async_trait { let index = self.mut_index_at(column as Column); index.delete(value, file_position).await?; Ok(()) } async fn insert_entry(&mut self, entry: Entry) -> Result where T: Encode + Decode + Ord + Send + Sync + 'async_trait { let file_position = self.append_entry_no_indexing(&entry).await?; // insert the indexable values of the entry into corresponding indexes. for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() { if should_index { // SAFETY: If should_index is true, then the column is indexable. self.insert_into_index(column as Column, value, file_position).await? } } Ok(file_position) } async fn delete_entry_values_from_indexes(&mut self, entry: EntryDetailed) -> Result<()> where T: Encode + Decode + Ord + Send + Sync + 'async_trait { for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() { if should_index { // SAFETY: If should_index is true, then the column is indexable. self.delete_from_index(column as Column, value, entry.file_position).await? } } Ok(()) } } // ===========Implementations============= // ===PrimitiveCursor=== impl PrimitiveCursor for ReadCursor<'_, T> { fn file(&mut self) -> &mut File { &mut self.file } fn eof_file_position(&self) -> FilePosition { self.eof_file_position } } impl PrimitiveCursor for WriteCursor<'_, T> { fn file(&mut self) -> &mut File { &mut self.file } fn eof_file_position(&self) -> FilePosition { self.eof_file_position } } impl PrimitiveCursor for AppendOnlyCursor { fn file(&mut self) -> &mut File { &mut self.file } fn eof_file_position(&self) -> FilePosition { self.eof_file_position } } // ===PrimitiveCursor=== impl PrimitiveWriteCursor for WriteCursor<'_, T> {} impl PrimitiveWriteCursor for AppendOnlyCursor {} // ===CursorWithStoreHeader=== impl CursorWithStoreHeader for ReadCursor<'_, T> { fn header(&self) -> &StoreHeader { &self.header } } impl CursorWithStoreHeader for WriteCursor<'_, T> { fn header(&self) -> &StoreHeader { &self.header } } impl CursorWithStoreHeader for AppendOnlyCursor { fn header(&self) -> &StoreHeader { &self.header } } // ===CursorWithWriteStoreHeader=== impl CursorWithWriteStoreHeader for WriteCursor<'_, T> { fn header_mut(&mut self) -> &mut StoreHeader { self.header } fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } } impl CursorWithWriteStoreHeader for AppendOnlyCursor { fn header_mut(&mut self) -> &mut StoreHeader { &mut self.header } fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } } // ===CursorWithAccessToIndex=== impl CursorWithAccessToIndex for ReadCursor<'_, T> { fn indexes(&mut self) -> &[Option>] { &self.indexes } } impl CursorWithAccessToIndex for WriteCursor<'_, T> { fn indexes(&mut self) -> &[Option>] { &self.indexes } } // ===CursorWithWriteAccessToIndex=== impl CursorWithWriteAccessToIndex for WriteCursor<'_, T> { fn indexes_mut(&mut self) -> &mut [Option>] { self.indexes } } impl <'cursor, T> ReadCursor<'cursor, T> { pub async fn new<'store: 'cursor>(store: &'store Store) -> Result where T: Send + Sync { let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); let file: File = OpenOptions::new() .read(true) .open(path_to_rows) .await?; let mut cursor = Self { header: store.header.clone(), file, indexes: &store.indexes, eof_file_position: 0, }; let eof_file_position: FilePosition = cursor.seek_to_end().await?; cursor.eof_file_position = eof_file_position; cursor.seek_to_start_of_data().await?; Ok(cursor) } } impl <'cursor, T> WriteCursor<'cursor, T> { // 'store lives atleast as long as 'cursor pub async fn new<'store: 'cursor>(store: &'store mut Store) -> Result where T: Send { let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); let file: File = OpenOptions::new() .read(true) .write(true) .open(path_to_rows) .await?; let mut cursor = Self { header: &mut store.header, file, indexes: &mut store.indexes, eof_file_position: 0, }; let eof_file_position: FilePosition = cursor.seek_to_end().await?; cursor.eof_file_position = eof_file_position; cursor.seek_to_start_of_data().await?; Ok(cursor) } // ===Entry Header Manipulation=== // assumes we are at the start of valid entry. async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()> where T: Send { let bytes: Vec = entry_header.encode()?; self.write_bytes(&bytes).await?; Ok(()) } // ===Deletion=== pub async fn mark_deleted_at(&mut self, file_position: FilePosition, enable_garbage_collector: bool) -> Result<()> where T: Encode + Decode + Ord + Send + Sync + Clone + Ord { println!("MARKING {} as DELETED", file_position); self.seek_to(file_position).await?; let mut entry_header = self.read_entry_header().await?; if entry_header.is_deleted { Ok(()) } else { // Update store and entry headers self.increment_deleted_count().await?; self.seek_to(file_position).await?; entry_header.is_deleted = true; self.set_new_entry_header(entry_header.into()).await?; // Update index self.seek_to(file_position).await?; match self.next().await? { Some(entry) => { self.delete_entry_values_from_indexes(entry).await? }, None => { // SAFETY: We just modified its header, so it must exist. unreachable!() } } if enable_garbage_collector { self.attempt_garbage_collection_if_necessary().await?; } Ok(()) } } async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T, enable_garbage_collector: bool) -> Result>> where T: Encode + Decode + Ord + Send + Sync + Clone { let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?; if let Some(entry) = maybe_entry { self.mark_deleted_at(entry.file_position, enable_garbage_collector).await?; Ok(Some(entry)) } else { Ok(maybe_entry) } } // Doesn't update indexes. async fn find_all_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result where T: Encode + Decode + Ord + Send + Sync + Clone { let mut count = 0; while let Some(_) = self.find_first_eq_bruteforce_and_delete(column, t0, false).await? { count += 1; } Ok(count) } pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result where T: Encode + Decode + Ord + Send + Sync + Clone { let count = if self.header().is_column_indexed(column) { println!("DELETION: INDEXED LOOKUP"); let entries = self.index_lookup(column, value).await?; let count = entries.len(); for entry in entries { self.mark_deleted_at(entry.file_position, false).await? } count } else { println!("DELETION: BRUTE-FORCE LOOKUP"); let count = self.find_all_eq_bruteforce_and_delete(column, value).await?; count }; if enable_garbage_collector { self.attempt_garbage_collection_if_necessary().await?; } Ok(count) } // ===Indexing=== // WARNING: Assumes the column is NOT indexable. pub async fn attach_index(&mut self, column: Column) -> Result<()> where T: Ord + Decode + Encode + Send + Sync { // New Index let index = Store::create_empty_index_at(&self.header, column).await?; self.indexes[column as usize] = Some(index); // Mark column as indexed self.header.make_column_indexed(column); self.set_header(&self.header.clone()).await?; // Build index self.seek_to_start_of_data().await?; while let Some((_, file_position, value)) = self.next_alive_at_column(column).await? { self.insert_into_index(column, value, file_position).await? } Ok(()) } // ===Garbage Collection=== async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> where T: Send + Sync + Decode + Encode + Clone + Ord { if self.header.deleted_count > GARBAGE_COLLECTION_TRIGGER { println!("=======START GARBAGE COLLETOR===="); self.initiate_garbage_collection().await?; } Ok(()) } pub async fn initiate_garbage_collection(&mut self) -> Result where T: Send + Sync + Decode + Encode + Clone + Ord { let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?; // Since garbage collection changes FilePositions of live entries, we need to update the // indexes too. let mut in_memory_indexes: Vec>>> = Vec::with_capacity(self.header.number_of_columns); for column in 0..self.header.number_of_columns { if self.header.is_column_indexed(column as Column) { let in_memory_index = BTreeMap::new(); in_memory_indexes.push(Some(in_memory_index)) } else { in_memory_indexes.push(None) } } // We'll dump all alive entries into a new file. let mut entries_deleted = 0; self.seek_to_start_of_data().await?; { while let Some(live_entry) = self.next_alive().await? { entries_deleted += 1; let file_position = cursor_to_intermediate.append_entry_no_indexing(&live_entry.forget()).await?; // Update index. (Wouldn't it be nice if we had `for let ...`?) for (maybe_in_memory_index, value) in in_memory_indexes.iter_mut().zip(&live_entry.data) { if let Some(in_memory_index) = maybe_in_memory_index { in_memory_index.entry(value.clone()).or_insert_with(HashSet::new).insert(file_position); } } } } // ===swap=== // swapping indexes // Update indexes on disk. for (column, maybe_in_memory_index) in in_memory_indexes.into_iter().enumerate() { if let Some(in_memory_index) = maybe_in_memory_index { let index = self.mut_index_at(column as Column); index.reset(in_memory_index).await?; } } // swapping headers self.header.deleted_count = 0; self.header.total_count = cursor_to_intermediate.header.total_count; self.file = cursor_to_intermediate.file; self.eof_file_position = cursor_to_intermediate.eof_file_position; // swap files on disk // current file let path_to_table = Path::new(&self.header.table_folder); let path_to_rows = path_to_table.join(ROWS_FILE_NAME); let path_to_intermediate_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); fs::remove_file(path_to_rows.clone()).await?; fs::rename(path_to_intermediate_rows, path_to_rows).await?; Ok(entries_deleted) } async fn spawn_cursor_to_intermediate_file(&self) -> Result> where T: Send { let table_folder = self.header.table_folder.to_string(); let path_to_table = Path::new(&table_folder); let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); let intermediate_file: File = Store::::create_empty_rows_file(path_to_rows, &self.header).await?; let intermediate_header: StoreHeader = StoreHeader { table_folder, number_of_columns: self.header.number_of_columns, deleted_count: 0, total_count: 0, primary_column: self.header.primary_column, indexed_columns: self.header.indexed_columns.clone(), }; // Creates a new (append) cursor to the intermediate file in which we'll dump the live entries. let mut cursor_to_intermediate = AppendOnlyCursor { header: intermediate_header, file: intermediate_file, data_type: PhantomData::, eof_file_position: 0, }; let eof_file_position: FilePosition = cursor_to_intermediate.seek_to_end().await?; cursor_to_intermediate.eof_file_position = eof_file_position; Ok(cursor_to_intermediate) } }