From 89305b6126b3b61faaa960bcf0ed080855a748d7 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Sun, 4 Feb 2024 16:57:19 +0100 Subject: [PATCH] Sketch out indexes in Store --- storage_engine/src/cursor.rs | 70 +++++++++++++++++++++++----- storage_engine/src/index.rs | 4 +- storage_engine/src/main.rs | 2 +- storage_engine/src/storage_engine.rs | 38 ++++++++------- 4 files changed, 82 insertions(+), 32 deletions(-) diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 05f14db..a0c1671 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -14,10 +14,14 @@ use crate::entry::{Entry, EntryDetailed}; use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader}; use crate::store_header::StoreHeader; use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; +use crate::index::Index; + + // ===Concrete Cursors=== -pub struct ReadCursor { +pub struct ReadCursor<'a, T> { header: StoreHeader, + indexes: Vec>>, file: File, data_type: PhantomData, @@ -26,6 +30,7 @@ pub struct ReadCursor { pub struct WriteCursor<'a, T> { header: &'a mut StoreHeader, + indexes: Vec>>, file: File, data_type: PhantomData, @@ -224,9 +229,20 @@ pub trait CursorWithStoreHeader: PrimitiveCursor { } } +#[async_trait] +pub trait CursorWithAccessToIndex: CursorWithStoreHeader { + fn indexes(&mut self) -> &[Option<&Index>]; + + async fn find_in_index(&mut self, k: &T) -> Result> + where T: Encode + Decode + Ord + Send + Sync + { + // let x = self.primary_index().lookup(k).await?; + todo!() + } +} // ===PrimitiveCursor=== -impl PrimitiveCursor for ReadCursor { +impl PrimitiveCursor for ReadCursor<'_, T> { fn file(&mut self) -> &mut File { &mut self.file } @@ -247,7 +263,7 @@ impl PrimitiveCursor for WriteCursor<'_, T> { } // ===CursorWithStoreHeader=== -impl CursorWithStoreHeader for ReadCursor { +impl CursorWithStoreHeader for ReadCursor<'_, T> { fn header(&self) -> &StoreHeader { &self.header } @@ -259,9 +275,23 @@ impl CursorWithStoreHeader for WriteCursor<'_, T> { } } -impl ReadCursor { - pub async fn new(store: &Store) -> Result - where T: Send +// ===CursorWithAccessToIndex=== +impl CursorWithAccessToIndex for ReadCursor<'_, T> { + fn indexes(&mut self) -> &[Option<&Index>] { + &self.indexes + } +} + +impl CursorWithAccessToIndex for WriteCursor<'_, T> { + fn indexes(&mut self) -> &[Option<&Index>] { + &self.indexes + } +} + + +impl <'cursor, T> ReadCursor<'cursor, T> { + pub async fn new<'store: 'cursor>(store: &'store Store) -> Result + where T: Send + Sync { let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); let file: File = @@ -274,6 +304,7 @@ impl ReadCursor { header: store.header.clone(), file, data_type: store.data_type, + indexes: todo!(), eof_file_position: 0, }; @@ -284,13 +315,12 @@ impl ReadCursor { Ok(cursor) } - - pub async fn less_than_eq(&mut self, file_position0: FilePosition, file_position1: FilePosition) -> Result { - todo!() - } } -impl <'cursor, T> WriteCursor<'cursor, T> { +impl <'cursor, T> WriteCursor<'cursor, T> +// TODO: Consider adding this manually to wher eit is really needed + where T: Sync +{ // 'store lives atleast as long as 'cursor pub async fn new<'store: 'cursor>(store: &'store mut Store) -> Result where T: Send @@ -307,6 +337,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> { header: &mut store.header, file, data_type: store.data_type, + indexes: todo!(), eof_file_position: 0, }; @@ -332,6 +363,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> { header, file, data_type: PhantomData::, + indexes: todo!(), eof_file_position: 0, }; @@ -478,4 +510,20 @@ impl <'cursor, T> WriteCursor<'cursor, T> { // Afterwards we swap the files, and delete the garbage. todo!() } + + // ===Indexing=== + async fn insert_to_index(&mut self, t: T, file_position: FilePosition) -> Result> + where T: Encode + Decode + Ord + Send + Sync + { + // let x = self.primary_index.insert(t, file_position).await?; + todo!() + } + + async fn delete_from_index(&mut self, t: T, file_position: FilePosition) -> Result> + where T: Encode + Decode + Ord + Send + Sync + { + // let x = self.primary_index.delete(t, file_position).await?; + todo!() + } } + diff --git a/storage_engine/src/index.rs b/storage_engine/src/index.rs index f0a1391..97da926 100644 --- a/storage_engine/src/index.rs +++ b/storage_engine/src/index.rs @@ -83,8 +83,8 @@ where Ok(()) } - pub async fn lookup(&self, k: K) -> Result>> { - let hashset = self.data.get(&k).unwrap(); + pub async fn lookup(&self, k: &K) -> Result>> { + let hashset = self.data.get(k).unwrap(); Ok(Some(hashset.clone())) } diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index cf32a88..d9b2562 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -52,7 +52,7 @@ async fn append_entry(cursor: &mut WriteCursor<'_, Data>, entry: &Entry) - Ok(file_position) } -async fn read_entry(cursor: &mut ReadCursor, file_position: FilePosition) -> Result>> { +async fn read_entry(cursor: &mut ReadCursor<'_, Data>, file_position: FilePosition) -> Result>> { println!("READING ENTRY at file_position={}", file_position); // let entry = cursor.read_entry_at(file_position).await.map_err(|e| e.to_io_or_panic())?; // println!("ENTRY: {:?}", entry); diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs index 8fdebb0..1ff53ad 100644 --- a/storage_engine/src/storage_engine.rs +++ b/storage_engine/src/storage_engine.rs @@ -3,15 +3,12 @@ use tokio::fs::{File, OpenOptions, DirBuilder}; use tokio::fs; use std::path::{Path, PathBuf}; use std::marker::PhantomData; -use async_trait::async_trait; +use bincode::{Decode, Encode}; -use crate::index::SomethingSupportingLeq; use crate::error::Error; use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader}; use crate::store_header::StoreHeader; - -// TODO -// use crate::index::Index; +use crate::index::Index; pub type Result = std::result::Result; @@ -37,6 +34,7 @@ pub struct Store { // All pub header: StoreHeader, pub data_type: PhantomData, + pub primary_index: Index, } pub type PositionOfValue = FilePosition; @@ -48,16 +46,14 @@ pub async fn store_exists(table_folder: &str) -> Result { Ok(fs::metadata(table_folder).await.is_ok()) } -pub async fn less_than_eq(store: &mut Store, file_position0: FilePosition, file_position1: FilePosition) -> Result { - todo!() -} - pub const ROWS_FILE_NAME: &'static str = "rows"; pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_intermediate"; impl Store { // ===Creation=== - pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result { + pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result + where T: Encode + Decode + Ord + { let path_to_table = Path::new(table_folder); let path_to_rows = path_to_table.join(ROWS_FILE_NAME); DirBuilder::new() @@ -74,14 +70,14 @@ impl Store { // We don't need the file right now. Only cursors will later open it. Self::create_empty_rows_file(path_to_rows, &header).await?; - // TODO: indexes - // let index: Index = Index::new( - // &format!("rows_{}", primary_column.to_string()), - // ).await?; + let primary_index: Index = Index::new( + &format!("rows_{}", primary_column.to_string()), + ).await?; let store = Self { header, data_type: PhantomData::, + primary_index, }; Ok(store) @@ -103,7 +99,7 @@ impl Store { } pub async fn connect(table_folder: &str) -> Result - where T: std::fmt::Debug + where T: std::fmt::Debug + Encode + Decode + Ord { let path_to_table = Path::new(table_folder); let path_to_rows = path_to_table.join(ROWS_FILE_NAME); @@ -121,29 +117,35 @@ impl Store { file.read_exact(&mut header_bytes).await?; let header = StoreHeader::decode(table_folder, &mut header_bytes).await?; + + let primary_index: Index = Index::connect( + &format!("rows_{}", header.primary_column.to_string()), + ).await?; + let store = Self { header, data_type: PhantomData::, + primary_index }; Ok(store) } // ===Cursors=== pub async fn read_cursor(&self) -> Result> - where T: Send + where T: Send + Sync { ReadCursor::new(self).await } pub async fn write_cursor(&mut self) -> Result> - where T: Send + where T: Send + Sync { WriteCursor::new(self).await } // For debugging. pub async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error> - where T: Send + where T: Send + Sync { let mut cursor = self.read_cursor().await.map_err(|e| e.to_io_or_panic())?; let bytes = cursor.read_all_bytes().await?;