diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 5587d6b..f15a80a 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -173,9 +173,10 @@ pub trait CursorWithStoreHeader: PrimitiveCursor { } // Like next, but only reads the column, not the whole entry. - async fn next_at_column(&mut self, column: Column) -> Result> + async fn next_at_column(&mut self, column: Column) -> Result> where T: Decode + Send { + let file_position = self.current_file_position().await?; let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; let file_position_at_start_of_data = self.current_file_position().await?; @@ -194,9 +195,20 @@ pub trait CursorWithStoreHeader: PrimitiveCursor { self.seek_to(file_position_at_start_of_data).await?; self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?; - Ok(Some((entry_header, value))) + Ok(Some((entry_header, file_position, value))) } + async fn next_alive_at_column(&mut self, column: Column) -> Result> + where T: Decode + Send + { + while let Some((header, file_position, t)) = self.next_at_column(column).await? { + if !header.is_deleted { + return Ok(Some((header, file_position, t))) + } + } + Ok(None) + } + async fn next_alive(&mut self) -> Result>> where T: Decode { @@ -213,7 +225,7 @@ pub trait CursorWithStoreHeader: PrimitiveCursor { where T: Decode + PartialEq + Send + Sync { let mut file_position = self.current_file_position().await?; - while let Some((_, t)) = self.next_at_column(column).await? { + while let Some((_, _, t)) = self.next_alive_at_column(column).await? { if &t == t0 { // go back and decode the whole entry self.seek_to(file_position).await?; @@ -225,6 +237,16 @@ pub trait CursorWithStoreHeader: PrimitiveCursor { Ok(None) } + async fn find_all_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result>> + where T: Decode + PartialEq + Send + Sync + { + let mut entries = vec![]; + while let Some(entry) = self.find_first_eq_bruteforce(column, t0).await? { + entries.push(entry) + } + Ok(entries) + } + // ===Debugging=== async fn read_entries(&mut self) -> Result<()> where T: Decode + std::fmt::Debug @@ -271,6 +293,16 @@ pub trait CursorWithWriteStoreHeader: CursorWithStoreHeader + PrimitiveWri Ok(()) } + async fn set_header(&mut self, header: &StoreHeader) -> Result<()> + where T: Send + { + self.seek_to_start().await?; + let encoded_header: Vec = header.encode()?; + self.write_bytes(&encoded_header).await?; + + Ok(()) + } + // ===Append Entry=== // Moves cursor to the end. @@ -320,9 +352,16 @@ pub trait CursorWithAccessToIndex: CursorWithStoreHeader { } } - // TODO: I also need the global find - async fn lookup(&mut self, column: Column, k: &T) -> Result>> { - todo!() + async fn select_entries_where_eq(&mut self, column: Column, value: &T) -> Result>> + where T: Encode + Decode + Ord + Send + Sync + { + if self.header().is_column_indexed(column) { + println!("INDEXED LOOKUP"); + self.index_lookup(column, value).await + } else { + println!("BRUTE-FORCE LOOKUP"); + self.find_all_eq_bruteforce(column, value).await + } } } @@ -664,11 +703,25 @@ impl <'cursor, T> WriteCursor<'cursor, T> } // ===Indexing=== - async fn insert_to_index(&mut self, t: T, file_position: FilePosition) -> Result> - where T: Encode + Decode + Ord + Send + Sync + // WARNING: Assumes the column is NOT indexable. + pub async fn attach_index(&mut self, column: Column) -> Result<()> + where T: Ord + Decode + Encode + Send + Sync { - // let x = self.primary_index.insert(t, file_position).await?; - todo!() + // New Index + let index = Store::create_empty_index_at(&self.header, column).await?; + self.indexes[column as usize] = Some(index); + + // Mark column as indexed + self.header.make_column_indexed(column); + self.set_header(&self.header.clone()).await?; + + // Build index + self.seek_to_start_of_data().await?; + while let Some((_, file_position, value)) = self.next_alive_at_column(column).await? { + self.insert_into_index(column, value, file_position).await? + } + + Ok(()) } async fn delete_from_index(&mut self, t: T, file_position: FilePosition) -> Result> diff --git a/storage_engine/src/error.rs b/storage_engine/src/error.rs index a80937b..4adf004 100644 --- a/storage_engine/src/error.rs +++ b/storage_engine/src/error.rs @@ -6,6 +6,7 @@ pub enum Error { EncodeError(bincode::error::EncodeError), AttemptToIndexNonIndexableColumn(Column), IndexIsStoringEofFilePosition(Column), + ColumnAlreadyIndexed(Column), IoError(std::io::Error), InvalidStoreHeader, } diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index 21d560d..55bf693 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -148,11 +148,38 @@ async fn main() -> Result<()> { { let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; - let entries = cursor.index_lookup(0, &1).await.map_err(|e| e.to_io_or_panic())?; + let column = 0; + let value = 1; + let entries = cursor.select_entries_where_eq(column, &value).await.map_err(|e| e.to_io_or_panic())?; println!("ARE INDEXES WORKING???"); println!("{:?}", entries); } + { + let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + let column = 1; + let value = 2; + let entries = cursor.select_entries_where_eq(column, &value).await.map_err(|e| e.to_io_or_panic())?; + println!("ARE INDEXES WORKING???"); + println!("{:?}", entries); + } + + { + let column = 1; + println!("BUILDING AN INDEX"); + store.attach_index(column).await.map_err(|e| e.to_io_or_panic())?; + println!("INDEX BUILT!"); + + let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + let value = 2; + let entries = cursor.select_entries_where_eq(column, &value).await.map_err(|e| e.to_io_or_panic())?; + println!("ARE INDEXES WORKING???"); + println!("{:?}", entries); + } + + + + // { // let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; // let column = 3; diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs index c631228..94ac5bd 100644 --- a/storage_engine/src/storage_engine.rs +++ b/storage_engine/src/storage_engine.rs @@ -184,9 +184,16 @@ impl Store { WriteCursor::new(self).await } - pub async fn make_indexable(&mut self, column: Column) -> Result<()> { - // Creates an index from scratch at above column - todo!() + // ===Indexes=== + pub async fn attach_index(&mut self, column: Column) -> Result<()> + where T: Ord + Decode + Encode + Send + Sync + { + if self.header.is_column_indexed(column) { + Err(Error::ColumnAlreadyIndexed(column)) + } else { + let mut cursor = self.write_cursor().await?; + cursor.attach_index(column).await + } } // For debugging. diff --git a/storage_engine/src/store_header.rs b/storage_engine/src/store_header.rs index 31b23f0..004e26a 100644 --- a/storage_engine/src/store_header.rs +++ b/storage_engine/src/store_header.rs @@ -115,4 +115,12 @@ impl StoreHeader { self.deleted_count += 1; self.deleted_count } + + pub fn is_column_indexed(&self, column: Column) -> bool { + self.indexed_columns[column as usize] + } + + pub fn make_column_indexed(&mut self, column: Column) { + self.indexed_columns[column as usize] = true + } }