From 813911293431a0078ee42726aa22a6639837706e Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Sun, 4 Feb 2024 23:54:22 +0100 Subject: [PATCH] Make use of indexes --- storage_engine/src/cursor.rs | 108 +++++++++++++++++++++++---- storage_engine/src/entry.rs | 4 +- storage_engine/src/error.rs | 4 + storage_engine/src/main.rs | 26 +++++-- storage_engine/src/storage_engine.rs | 5 +- 5 files changed, 120 insertions(+), 27 deletions(-) diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 72a6a4c..5587d6b 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -245,18 +245,6 @@ pub trait CursorWithStoreHeader: PrimitiveCursor { } } -#[async_trait] -pub trait CursorWithAccessToIndex: CursorWithStoreHeader { - fn indexes(&mut self) -> &[Option>]; - - async fn find_in_index(&mut self, k: &T) -> Result> - where T: Encode + Decode + Ord + Send + Sync - { - // let x = self.primary_index().lookup(k).await?; - todo!() - } -} - #[async_trait] pub trait CursorWithWriteStoreHeader: CursorWithStoreHeader + PrimitiveWriteCursor { fn header_mut(&mut self) -> &mut StoreHeader; @@ -287,7 +275,7 @@ pub trait CursorWithWriteStoreHeader: CursorWithStoreHeader + PrimitiveWri // Moves cursor to the end. // Returns file position to the start of the new entry. - async fn append_entry(&mut self, entry: &Entry) -> Result + async fn append_entry_no_indexing(&mut self, entry: &Entry) -> Result where T: Encode + Send + Sync { self.increment_total_count().await?; @@ -303,6 +291,94 @@ pub trait CursorWithWriteStoreHeader: CursorWithStoreHeader + PrimitiveWri } } +#[async_trait] +pub trait CursorWithAccessToIndex: CursorWithStoreHeader { + fn indexes(&mut self) -> &[Option>]; + + async fn index_lookup(&mut self, column: Column, k: &T) -> Result>> + where T: Encode + Decode + Ord + Send + Sync + { + match &self.indexes()[column as usize] { + Some(index) => { + let file_positions = index.lookup(k).await?.unwrap_or_else(|| HashSet::new()); + let mut entries: Vec> = vec![]; + for &file_position in file_positions.iter() { + match self.read_entry_at(file_position).await? { + Some(entry) => { + entries.push(entry) + }, + None => { + return Err(Error::IndexIsStoringEofFilePosition(column)) + } + } + } + + Ok(entries) + }, + None => + Err(Error::AttemptToIndexNonIndexableColumn(column)) + } + } + + // TODO: I also need the global find + async fn lookup(&mut self, column: Column, k: &T) -> Result>> { + todo!() + } +} + +#[async_trait] +pub trait CursorWithWriteAccessToIndex: CursorWithAccessToIndex + CursorWithWriteStoreHeader { + fn indexes_mut(&mut self) -> &mut [Option>]; + + // Assumes that the column is indexable. + fn mut_index_at(&mut self, column: Column) -> &mut Index { + match &mut self.indexes_mut()[column as usize] { + Some(index) => { + index + }, + None => { + unreachable!() + } + } + } + + // Assumes that the column is indexable. + async fn insert_into_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()> + where T: Encode + Decode + Ord + Send + Sync + 'async_trait + { + let index = self.mut_index_at(column as Column); + index.insert(value, file_position).await?; + Ok(()) + } + + // Assumes that the column is indexable. + async fn delete_from_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()> + where T: Encode + Decode + Ord + Send + Sync + 'async_trait + { + let index = self.mut_index_at(column as Column); + index.delete(value, file_position).await?; + Ok(()) + } + + async fn insert_entry(&mut self, entry: Entry) -> Result + // TODO: Why is 'async_trait necessary? + where T: Encode + Decode + Ord + Send + Sync + 'async_trait + { + let file_position = self.append_entry_no_indexing(&entry).await?; + + // insert the indexable values of the entry into corresponding indexes. + for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() { + if should_index { + // SAFETY: If should_index is true, then the column is indexable. + self.insert_into_index(column as Column, value, file_position).await? + } + } + + Ok(file_position) + } +} + + // ===========Implementations============= // ===PrimitiveCursor=== impl PrimitiveCursor for ReadCursor<'_, T> { @@ -373,6 +449,10 @@ impl CursorWithAccessToIndex for WriteCursor<'_, T> { fn indexes(&mut self) -> &[Option>] { &self.indexes } } +// ===CursorWithWriteAccessToIndex=== +impl CursorWithWriteAccessToIndex for WriteCursor<'_, T> { + fn indexes_mut(&mut self) -> &mut [Option>] { self.indexes } +} impl <'cursor, T> ReadCursor<'cursor, T> { @@ -526,7 +606,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> { while let Some(live_entry) = self.next_alive().await? { entries_deleted += 1; - let file_position = cursor_to_intermediate.append_entry(&live_entry.forget()).await?; + let file_position = cursor_to_intermediate.append_entry_no_indexing(&live_entry.forget()).await?; // TODO: Start indexing all of the indexable columns from scratch. } } diff --git a/storage_engine/src/entry.rs b/storage_engine/src/entry.rs index b292051..ed0c1fa 100644 --- a/storage_engine/src/entry.rs +++ b/storage_engine/src/entry.rs @@ -7,8 +7,8 @@ use crate::entry_header::{EntryHeader, EntryHeaderWithDataSize}; #[derive(Debug)] pub struct Entry { - header: EntryHeader, - data: Vec, + pub header: EntryHeader, + pub data: Vec, } #[derive(Debug)] diff --git a/storage_engine/src/error.rs b/storage_engine/src/error.rs index 951f167..a80937b 100644 --- a/storage_engine/src/error.rs +++ b/storage_engine/src/error.rs @@ -1,7 +1,11 @@ +use crate::storage_engine::Column; + #[derive(Debug)] pub enum Error { DecodeError(DecodeErrorKind, bincode::error::DecodeError), EncodeError(bincode::error::EncodeError), + AttemptToIndexNonIndexableColumn(Column), + IndexIsStoringEofFilePosition(Column), IoError(std::io::Error), InvalidStoreHeader, } diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index d8df150..21d560d 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -9,7 +9,7 @@ mod store_header; use crate::entry::{Entry, EntryDetailed}; use crate::storage_engine::{Store, FilePosition}; -use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader, CursorWithWriteStoreHeader}; +use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader, CursorWithWriteStoreHeader, CursorWithWriteAccessToIndex, CursorWithAccessToIndex}; type Data = u32; @@ -44,10 +44,12 @@ async fn create_or_connect() -> Result> { } -async fn append_entry(cursor: &mut WriteCursor<'_, Data>, entry: &Entry) -> Result { +async fn append_entry(cursor: &mut WriteCursor<'_, Data>, entry: Entry) -> Result { println!("APPENDING"); println!("entry == {:?}", entry); - let file_position: FilePosition = cursor.append_entry(&entry).await.map_err(|e| e.to_io_or_panic())?; + + // let file_position: FilePosition = cursor.append_entry(&entry).await.map_err(|e| e.to_io_or_panic())?; + let file_position: FilePosition = cursor.insert_entry(entry).await.map_err(|e| e.to_io_or_panic())?; println!("file_position == {:?}", file_position); Ok(file_position) } @@ -63,17 +65,20 @@ async fn read_entry(cursor: &mut ReadCursor<'_, Data>, file_position: FilePositi async fn append_bunch_of_entries(store: &mut Store) -> Result<()> { let mut cursor = store.write_cursor().await.map_err(|e| e.to_io_or_panic())?; let entry0: Entry = Entry::new(vec![1, 2, 3, 4, 5]); - append_entry(&mut cursor, &entry0).await?; + append_entry(&mut cursor, entry0).await?; let entry1: Entry = Entry::new(vec![200, 200, 5, 6, 7]); - append_entry(&mut cursor, &entry1).await?; + append_entry(&mut cursor, entry1).await?; // println!("{:?}", store.read_all_bytes().await?); let entry2: Entry = Entry::new(vec![99, 98, 97, 96, 95]); - append_entry(&mut cursor, &entry2).await?; + append_entry(&mut cursor, entry2).await?; let entry3: Entry = Entry::new(vec![50,50,50,50,50]); - append_entry(&mut cursor, &entry3).await?; + append_entry(&mut cursor, entry3).await?; + + let entry4: Entry = Entry::new(vec![1,50,50,50,50]); // same 0-th column as entry0 + append_entry(&mut cursor, entry4).await?; Ok(()) } @@ -141,6 +146,13 @@ async fn main() -> Result<()> { println!("{:?}", x); } + { + let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + let entries = cursor.index_lookup(0, &1).await.map_err(|e| e.to_io_or_panic())?; + println!("ARE INDEXES WORKING???"); + println!("{:?}", entries); + } + // { // let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; // let column = 3; diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs index 6113c88..c631228 100644 --- a/storage_engine/src/storage_engine.rs +++ b/storage_engine/src/storage_engine.rs @@ -155,10 +155,7 @@ impl Store { let mut result = Vec::with_capacity(header.number_of_columns); for (column, &is_indexed) in header.indexed_columns.iter().enumerate() { if is_indexed { - result.push(None) - // TODO: Once index connect is working, uncomment this line (and remove the - // above .push line - // result.push(Some(Self::connect_index_at(&header, column as Column).await?)) + result.push(Some(Self::connect_index_at(&header, column as Column).await?)) } else { result.push(None) }