diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 0c3a5df..05f9d05 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -13,7 +13,7 @@ use crate::binary_coding::{encode, decode}; use crate::entry::{Entry, EntryDetailed}; use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader}; use crate::store_header::StoreHeader; -use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; +use crate::storage_engine::{Store, FilePosition, Column, Result, StoreIndexes, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; use crate::index::Index; @@ -349,7 +349,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> Ok(cursor) } - pub async fn connect<'header: 'cursor, 'indexes: 'cursor>(path_to_rows: &str, header: &'header mut StoreHeader, indexes: &'indexes mut Vec>>) -> Result + pub async fn connect<'header: 'cursor, 'indexes: 'cursor>(path_to_rows: &str, header: &'header mut StoreHeader, indexes: &'indexes mut StoreIndexes) -> Result where T: Send { let file: File = diff --git a/storage_engine/src/index.rs b/storage_engine/src/index.rs index 97da926..0cc01de 100644 --- a/storage_engine/src/index.rs +++ b/storage_engine/src/index.rs @@ -1,6 +1,6 @@ use std::marker::PhantomData; -use std::path::Path; -use tokio::fs::{DirBuilder, File, OpenOptions}; +use std::path::PathBuf; +use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; use async_trait::async_trait; @@ -38,7 +38,7 @@ where K: Encode + Decode + Ord, V: Encode + Decode + Clone + Eq + Hash, { - pub async fn new(file_name: &str) -> Result> { + pub async fn new(file_name: PathBuf) -> Result> { let file: File = OpenOptions::new() .read(true) .write(true) @@ -56,7 +56,7 @@ where }) } - pub async fn connect(file_name: &str) -> Result> { + pub async fn connect(file_name: PathBuf) -> Result> { let mut file: File = OpenOptions::new() .read(true) .write(true) @@ -124,7 +124,7 @@ mod tests { index.insert("bar".to_string(), 125).await.unwrap(); index.insert("bar".to_string(), 126).await.unwrap(); - let lookup = index.lookup("foo".to_string()).await.unwrap().unwrap(); + let lookup = index.lookup(&"foo".to_string()).await.unwrap().unwrap(); assert_eq!(lookup.len(), 2); assert!(lookup.contains(&123)); assert!(lookup.contains(&124)); @@ -139,7 +139,7 @@ mod tests { value_type: PhantomData::, }; - let lookup = decoded.lookup("foo".to_string()).await.unwrap().unwrap(); + let lookup = decoded.lookup(&"foo".to_string()).await.unwrap().unwrap(); assert_eq!(lookup.len(), 2); assert!(lookup.contains(&123)); assert!(lookup.contains(&124)); diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs index d696e38..e2df22e 100644 --- a/storage_engine/src/storage_engine.rs +++ b/storage_engine/src/storage_engine.rs @@ -16,27 +16,16 @@ pub type Result = std::result::Result; pub type Column = u64; pub type FilePosition = u64; -// TODO: Consider introducing a phantom type for the data that's used in the store. +// TODO: Consider adding another type parameter for indexable values #[derive(Debug)] pub struct Store { - // TODO: This needs to track how many read-write cursors there are...? - // RWMutex - // {write: 0, read: n} ~> {write:0, read: n + 1} // create read - // {write: 0, read: n + 1} ~> {write:0, read: n} // destroy read - // {write: 0, read: 0} ~> {write: 1, read: 0} // create write - // {write: 1, read: 0} ~> {write: 0, read: 0} // destroy write - // primary_index: Vec>>, - // indexes: Vec>>>, - // primary_index: Index, - - // TODO: It's not good to have StoreHeader copied to all the cursors, since they may modify it. - // How to sync? - // All pub header: StoreHeader, pub data_type: PhantomData, - pub indexes: Vec>>, + pub indexes: StoreIndexes, } +pub type StoreIndexes = Vec>>; + pub type PositionOfValue = FilePosition; pub type PositionOfRow = FilePosition; @@ -75,13 +64,7 @@ impl Store { // We don't need the file right now. Only cursors will later open it. Self::create_empty_rows_file(path_to_rows, &header).await?; - // TODO: I need to construct indexes - // let primary_index: Index = Index::new( - // &format!("rows_{}", primary_column.to_string()), - // ).await?; - - // TODO - let indexes = vec![]; + let indexes: StoreIndexes = Self::create_initial_indexes(&header).await?; let store = Self { header, @@ -92,6 +75,42 @@ impl Store { Ok(store) } + pub fn path_to_index_file(header: &StoreHeader, column: Column) -> PathBuf { + let path_to_table = Path::new(&header.table_folder); + let path_to_index = path_to_table.join(&format!("{}_{}", ROWS_FILE_NAME, column.to_string())); + path_to_index + } + + pub async fn create_empty_index_at(header: &StoreHeader, column: Column) -> Result> + where T: Encode + Decode + Ord + { + let path_to_index = Self::path_to_index_file(&header, column); + let index = Index::new(path_to_index).await?; + + Ok(index) + } + + pub async fn create_initial_indexes(header: &StoreHeader) -> Result> + where T: Encode + Decode + Ord + { + let mut result: StoreIndexes = Vec::with_capacity(header.number_of_columns); + for _ in 0..header.number_of_columns { + result.push(None) + } + + result[header.primary_column as usize] = Some(Self::create_empty_index_at(&header, header.primary_column).await?); + + Ok(result) + } + + pub async fn connect_index_at(header: &StoreHeader, column: Column) -> Result> + where T: Encode + Decode + Ord + { + let path_to_index = Self::path_to_index_file(&header, column); + let index: Index = Index::connect(path_to_index).await?; + Ok(index) + } + pub async fn create_empty_rows_file(path_to_rows: PathBuf, header: &StoreHeader) -> Result { let mut file: File = OpenOptions::new() @@ -134,17 +153,26 @@ impl Store { }; - // let primary_index: Index = Index::connect( - // &format!("rows_{}", header.primary_column.to_string()), - // ).await?; + let indexes: StoreIndexes = { + let mut result = Vec::with_capacity(header.number_of_columns); + for (column, &is_indexed) in header.indexed_columns.iter().enumerate() { + if is_indexed { + result.push(None) + // TODO: Once index connect is working, uncomment this line (and remove the + // above .push line + // result.push(Some(Self::connect_index_at(&header, column as Column).await?)) + } else { + result.push(None) + } + } - // TODO - let indexes = vec![]; + result + }; let store = Self { header, data_type: PhantomData::, - indexes, + indexes }; Ok(store) }