use tokio::fs::{File, OpenOptions};
use tokio::fs;
use std::path::Path;
use std::marker::PhantomData;
use std::collections::{BTreeMap, HashSet};
use bincode;
use bincode::{Decode, Encode};
use crate::segments::entry::EntryDetailed;
use crate::segments::entry_header::EntryHeader;
use crate::segments::store_header::StoreHeader;
use crate::store::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME};
use crate::index::Index;
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite};
use crate::cursor_capabilities::traversal::CursorCanTraverse;
use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries;
use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex};

// NOTE(review): generic argument lists (e.g. `<T>`, `<Index<T>>`) appear to have been stripped
// from this copy of the file -- see `Option>` and the bare `PhantomData` below. The tokens are
// left exactly as found; restore the generics from version control rather than guessing them.

// Once the store header's `deleted_count` exceeds this value, the deletion methods that are
// called with `enable_garbage_collector == true` run a garbage collection, which rewrites the
// rows file keeping only the live entries.
const GARBAGE_COLLECTION_TRIGGER: usize = 100;

// ===Concrete Cursors===

// Read-only cursor over a store's rows file.
pub struct ReadCursor<'a, T> {
    // A clone of the store header taken when the cursor is created; it is not written back.
    header: StoreHeader,
    // Shared view of the store's per-column index slots (`None` for columns without an index).
    indexes: &'a [Option>],
    // Rows file opened with read access only.
    file: File,
    // End-of-file position recorded (via `seek_to_end`) when the cursor was created.
    eof_file_position: FilePosition,
}

// Read/write cursor. It mutably borrows the store's header and indexes, so changes made
// through it (deleted counts, newly attached indexes, ...) land directly in the `Store`.
pub struct WriteCursor<'a, T> {
    header: &'a mut StoreHeader,
    // One slot per column, addressed as `indexes[column as usize]`.
    indexes: &'a mut [Option>],
    // Rows file opened with read + write access.
    file: File,
    // Current end of the rows file; updated through `set_eof_file_position`.
    eof_file_position: FilePosition,
}

// This is used as a cursor to the temporary (intermediate) rows file during garbage collection.
// It owns its header and has no index access: live entries are appended to it without indexing,
// and the indexes are rebuilt separately.
pub struct AppendOnlyCursor {
    header: StoreHeader,
    file: File,
    // Zero-sized marker tying the cursor to the entry value type (generic argument stripped, see note above).
    data_type: PhantomData,
    eof_file_position: FilePosition,
}

// ===========Implementations=============

// ===primitive capabilities===
// Each cursor exposes its file handle and cached end-of-file position; the actual read/write
// operations presumably live as provided methods on these traits (defined in
// `cursor_capabilities::primitive`, not visible here).

impl CursorCanRead for ReadCursor<'_, T> {
    fn file(&mut self) -> &mut File {
        &mut self.file
    }

    fn eof_file_position(&self) -> FilePosition {
        self.eof_file_position
    }
}

impl CursorCanRead for WriteCursor<'_, T> {
    fn file(&mut self) -> &mut File {
        &mut self.file
    }

    fn eof_file_position(&self) -> FilePosition {
        self.eof_file_position
    }
}

impl CursorCanRead for AppendOnlyCursor {
    fn file(&mut self) -> &mut File {
        &mut self.file
    }

    fn eof_file_position(&self) -> FilePosition {
        self.eof_file_position
    }
}

// Only the writable cursors opt into write capability; `ReadCursor` deliberately does not.
impl CursorCanWrite for WriteCursor<'_, T> {}

impl CursorCanWrite for AppendOnlyCursor {}
header=== impl CursorCanTraverse for ReadCursor<'_, T> { fn header(&self) -> &StoreHeader { &self.header } } impl CursorCanTraverse for WriteCursor<'_, T> { fn header(&self) -> &StoreHeader { &self.header } } impl CursorCanTraverse for AppendOnlyCursor { fn header(&self) -> &StoreHeader { &self.header } } impl CursorCanModifyEntries for WriteCursor<'_, T> { fn header_mut(&mut self) -> &mut StoreHeader { self.header } fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } } impl CursorCanModifyEntries for AppendOnlyCursor { fn header_mut(&mut self) -> &mut StoreHeader { &mut self.header } fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } } // ===capability to access index=== impl CursorCanReadIndex for ReadCursor<'_, T> { fn indexes(&mut self) -> &[Option>] { &self.indexes } } impl CursorCanReadIndex for WriteCursor<'_, T> { fn indexes(&mut self) -> &[Option>] { &self.indexes } } impl CursorCanWriteToIndex for WriteCursor<'_, T> { fn indexes_mut(&mut self) -> &mut [Option>] { self.indexes } } // ===Specifics=== impl <'cursor, T> ReadCursor<'cursor, T> { pub async fn new<'store: 'cursor>(store: &'store Store) -> Result where T: Send + Sync { let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); let file: File = OpenOptions::new() .read(true) .open(path_to_rows) .await?; let mut cursor = Self { header: store.header.clone(), file, indexes: &store.indexes, eof_file_position: 0, }; let eof_file_position: FilePosition = cursor.seek_to_end().await?; cursor.eof_file_position = eof_file_position; cursor.seek_to_start_of_data().await?; Ok(cursor) } } impl <'cursor, T> WriteCursor<'cursor, T> { // 'store lives atleast as long as 'cursor pub async fn new<'store: 'cursor>(store: &'store mut Store) -> Result where T: Send { let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); let file: File = 
OpenOptions::new() .read(true) .write(true) .open(path_to_rows) .await?; let mut cursor = Self { header: &mut store.header, file, indexes: &mut store.indexes, eof_file_position: 0, }; let eof_file_position: FilePosition = cursor.seek_to_end().await?; cursor.eof_file_position = eof_file_position; cursor.seek_to_start_of_data().await?; Ok(cursor) } // ===Entry Header Manipulation=== // assumes we are at the start of valid entry. async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()> where T: Send { let bytes: Vec = entry_header.encode()?; self.write_bytes(&bytes).await?; Ok(()) } // ===Deletion=== pub async fn mark_deleted_at(&mut self, file_position: FilePosition, enable_garbage_collector: bool) -> Result<()> where T: Encode + Decode + Ord + Send + Sync + Clone + Ord { self.seek_to(file_position).await?; let mut entry_header = self.read_entry_header().await?; if entry_header.is_deleted { Ok(()) } else { // Update store and entry headers self.increment_deleted_count().await?; self.seek_to(file_position).await?; entry_header.is_deleted = true; self.set_new_entry_header(entry_header.into()).await?; // Update index self.seek_to(file_position).await?; match self.next().await? { Some(entry) => { self.delete_entry_values_from_indexes(entry).await? }, None => { // SAFETY: We just modified its header, so it must exist. unreachable!() } } if enable_garbage_collector { self.attempt_garbage_collection_if_necessary().await?; } Ok(()) } } async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T, enable_garbage_collector: bool) -> Result>> where T: Encode + Decode + Ord + Send + Sync + Clone { let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?; if let Some(entry) = maybe_entry { self.mark_deleted_at(entry.file_position, enable_garbage_collector).await?; Ok(Some(entry)) } else { Ok(maybe_entry) } } // Doesn't update indexes. 
async fn find_all_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result where T: Encode + Decode + Ord + Send + Sync + Clone { let mut count = 0; while let Some(_) = self.find_first_eq_bruteforce_and_delete(column, t0, false).await? { count += 1; } Ok(count) } pub async fn delete_all_entries(&mut self, enable_garbage_collector: bool) -> Result where T: Encode + Decode + Ord + Send + Sync + Clone { let mut count = 0; while let Some(entry) = self.next_alive().await? { count += 1; self.mark_deleted_at(entry.file_position, false).await? } if enable_garbage_collector { self.attempt_garbage_collection_if_necessary().await?; } Ok(count) } pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result where T: Encode + Decode + Ord + Send + Sync + Clone { let count = if self.header().is_column_indexed(column) { let entries = self.index_lookup(column, value).await?; let count = entries.len(); for entry in entries { self.mark_deleted_at(entry.file_position, false).await? } count } else { let count = self.find_all_eq_bruteforce_and_delete(column, value).await?; count }; if enable_garbage_collector { self.attempt_garbage_collection_if_necessary().await?; } Ok(count) } // ===Indexing=== // WARNING: Assumes the column is NOT indexable. pub async fn attach_index(&mut self, column: Column) -> Result<()> where T: Ord + Decode + Encode + Send + Sync { // New Index let index = Store::create_empty_index_at(&self.header, column).await?; self.indexes[column as usize] = Some(index); // Mark column as indexed self.header.make_column_indexed(column); self.set_header(&self.header.clone()).await?; // Build index self.seek_to_start_of_data().await?; while let Some((_, file_position, value)) = self.next_alive_at_column(column).await? { self.insert_into_index(column, value, file_position).await? 
} Ok(()) } // ===Garbage Collection=== async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> where T: Send + Sync + Decode + Encode + Clone + Ord { if self.header.deleted_count > GARBAGE_COLLECTION_TRIGGER { println!("=======START GARBAGE COLLECTOR===="); self.initiate_garbage_collection().await?; println!("=======GARBAGE COLLECTOR FINISHED===="); } Ok(()) } pub async fn initiate_garbage_collection(&mut self) -> Result where T: Send + Sync + Decode + Encode + Clone + Ord { let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?; // Since garbage collection changes FilePositions of live entries, we need to update the // indexes too. let mut in_memory_indexes: Vec>>> = Vec::with_capacity(self.header.number_of_columns); for column in 0..self.header.number_of_columns { if self.header.is_column_indexed(column as Column) { let in_memory_index = BTreeMap::new(); in_memory_indexes.push(Some(in_memory_index)) } else { in_memory_indexes.push(None) } } // We'll dump all alive entries into a new file. let mut entries_deleted = 0; self.seek_to_start_of_data().await?; { while let Some(live_entry) = self.next_alive().await? { entries_deleted += 1; let file_position = cursor_to_intermediate.append_entry_no_indexing(&live_entry.forget()).await?; // Update index. (Wouldn't it be nice if we had `for let ...`?) for (maybe_in_memory_index, value) in in_memory_indexes.iter_mut().zip(&live_entry.data) { if let Some(in_memory_index) = maybe_in_memory_index { in_memory_index.entry(value.clone()).or_insert_with(HashSet::new).insert(file_position); } } } } // ===swap=== // swapping indexes // Update indexes on disk. 
for (column, maybe_in_memory_index) in in_memory_indexes.into_iter().enumerate() { if let Some(in_memory_index) = maybe_in_memory_index { let index = self.mut_index_at(column as Column); index.reset(in_memory_index).await?; } } // swapping headers self.header.deleted_count = 0; self.header.total_count = cursor_to_intermediate.header.total_count; self.file = cursor_to_intermediate.file; self.eof_file_position = cursor_to_intermediate.eof_file_position; // swap files on disk // current file let path_to_table = Path::new(&self.header.table_folder); let path_to_rows = path_to_table.join(ROWS_FILE_NAME); let path_to_intermediate_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); fs::remove_file(path_to_rows.clone()).await?; fs::rename(path_to_intermediate_rows, path_to_rows).await?; Ok(entries_deleted) } async fn spawn_cursor_to_intermediate_file(&self) -> Result> where T: Send { let table_folder = self.header.table_folder.clone(); let path_to_table = Path::new(&table_folder); let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); let intermediate_file: File = Store::::create_empty_rows_file(path_to_rows, &self.header).await?; let intermediate_header: StoreHeader = StoreHeader { table_folder, number_of_columns: self.header.number_of_columns, deleted_count: 0, total_count: 0, primary_column: self.header.primary_column, indexed_columns: self.header.indexed_columns.clone(), }; // Creates a new (append) cursor to the intermediate file in which we'll dump the live entries. let mut cursor_to_intermediate = AppendOnlyCursor { header: intermediate_header, file: intermediate_file, data_type: PhantomData::, eof_file_position: 0, }; let eof_file_position: FilePosition = cursor_to_intermediate.seek_to_end().await?; cursor_to_intermediate.eof_file_position = eof_file_position; Ok(cursor_to_intermediate) } }