From 3e7e8665fd6909f3f0c7a4adc788bac4ada9b2d3 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Sat, 3 Feb 2024 16:39:40 +0100 Subject: [PATCH] Split Store into Store and Cursor --- storage_engine/src/main.rs | 20 +- storage_engine/src/storage_engine.rs | 435 +++++++++++++++------------ 2 files changed, 252 insertions(+), 203 deletions(-) diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index a286df5..c5b47de 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -23,11 +23,12 @@ async fn create_store() -> Result> { println!("THE STORE: {:?}", store); println!("THE BYTES: {:?}", store.read_all_bytes().await?); + let mut cursor = store.cursor(AccessMode::Write).await.map_err(|e| e.to_io_or_panic())?; let entry0: Entry = Entry::new_deleted(vec![1, 2, 3, 4, 5]); - append_entry(&mut store, &entry0).await?; + append_entry(&mut cursor, &entry0).await?; let entry1: Entry = Entry::new_deleted(vec![200, 200, 5, 6, 7]); - append_entry(&mut store, &entry1).await?; + append_entry(&mut cursor, &entry1).await?; println!("{:?}", store.read_all_bytes().await?); Ok(store) @@ -51,17 +52,17 @@ async fn create_or_connect() -> Result> { } -async fn append_entry(store: &mut Store, entry: &Entry) -> Result{ +async fn append_entry(cursor: &mut Cursor, entry: &Entry) -> Result{ println!("APPENDING"); println!("entry == {:?}", entry); - let file_position: FilePosition = store.append_entry(&entry).await.map_err(|e| e.to_io_or_panic())?; + let file_position: FilePosition = cursor.append_entry(&entry).await.map_err(|e| e.to_io_or_panic())?; println!("file_position == {:?}", file_position); Ok(file_position) } -async fn read_entry(store: &mut Store, file_position: FilePosition) -> Result>>{ +async fn read_entry(cursor: &mut Cursor, file_position: FilePosition) -> Result>>{ println!("READING ENTRY at file_position={}", file_position); - let entry = store.read_entry_at(file_position).await.map_err(|e| e.to_io_or_panic())?; + let entry = cursor.read_entry_at(file_position).await.map_err(|e| e.to_io_or_panic())?; println!("ENTRY: {:?}", entry); Ok(entry) } @@ -71,17 +72,18 @@ async fn read_entry(store: &mut Store, file_position: FilePosition) -> Res async fn main() -> Result<()> { println!("STOOOOOOOOOOOORAAAAAAAAAAAGE"); - let mut store: Store = create_or_connect().await?; + let store: Store = create_or_connect().await?; // let entry0 = read_entry(&mut store, 16).await?; // let entry1 = read_entry(&mut store, 45).await?; // println!("{:?}", store); // println!("{:?}", store.read_all_bytes().await?); + let mut cursor = store.cursor(AccessMode::Write).await.map_err(|e| e.to_io_or_panic())?; let entry0: Entry = Entry::new(vec![99, 98, 97, 96, 95]); - append_entry(&mut store, &entry0).await?; + append_entry(&mut cursor, &entry0).await?; - store.read_entries().await.map_err(|e| e.to_io_or_panic())?; + cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?; // let entry2: StoreEntry = StoreEntry::new_deleted(vec![3, 2, 1]); diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs index 00054b1..d96a42d 100644 --- a/storage_engine/src/storage_engine.rs +++ b/storage_engine/src/storage_engine.rs @@ -15,8 +15,6 @@ use crate::error::{Error, DecodeErrorKind}; use crate::index::Index; -use std::cell::RefCell; - use std::mem::size_of; type Result = std::result::Result; @@ -27,15 +25,19 @@ pub type FilePosition = u64; // TODO: Consider introducing a phantom type for the data that's used in the store. #[derive(Debug)] pub struct Store { + // TODO: This needs to track how many read-write cursors there are...? + // RWMutex + // {write: 0, read: n} ~> {write:0, read: n + 1} // create read + // {write: 0, read: n + 1} ~> {write:0, read: n} // destroy read + // {write: 0, read: 0} ~> {write: 1, read: 0} // create write + // {write: 1, read: 0} ~> {write: 0, read: 0} // destroy write table_folder: String, - file: File, // primary_index: Vec>>, // indexes: Vec>>>, // primary_index: Index, header: StoreHeader, data_type: PhantomData, - eof_file_position: FilePosition, // meta // location of rows file // locations of index files @@ -44,16 +46,24 @@ pub struct Store { // list } +pub struct Cursor { + header: StoreHeader, + file: File, + data_type: PhantomData, + + eof_file_position: FilePosition, +} + +pub enum AccessMode { + Read, + Write +} + +pub type PositionOfValue = FilePosition; +pub type PositionOfRow = FilePosition; -type PositionOfValue = FilePosition; -type PositionOfRow = FilePosition; - -// TODO: Basically a pointer to Store + its own file position -// pub struct Cursor<'a, T> { -// } - -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct StoreHeader { number_of_columns: usize, deleted_count: usize, @@ -143,8 +153,225 @@ impl SomethingSupportingLeq for Store } } +const ROWS_FILE_NAME: &'static str = "rows"; + impl Store { - const ROWS_FILE_NAME: &'static str = "rows"; + // For debugging. + // Moves file cursor to the end. + pub async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error>{ + let mut bytes: Vec = vec![]; + let mut cursor = self.cursor(AccessMode::Read).await.map_err(|e| e.to_io_or_panic())?; + cursor.file.read_to_end(&mut bytes).await?; + Ok(bytes) + } + + // ===Creation=== + pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result { + let path_to_table = Path::new(table_folder); + let path_to_rows = path_to_table.join(ROWS_FILE_NAME); + DirBuilder::new() + .create(path_to_table).await?; + + let mut file: File = + OpenOptions::new() + .write(true) + .read(true) + .create_new(true) + .open(path_to_rows) + .await?; + + let header = StoreHeader { + number_of_columns, + deleted_count: 0, + total_count: 0, + primary_column, + }; + let encoded_header: Vec = header.encode()?; + file.write(&encoded_header).await?; + + + // TODO: indexes + // let index: Index = Index::new( + // &format!("rows_{}", primary_column.to_string()), + // ).await?; + + let store = Self { + table_folder: table_folder.to_string(), + header, + data_type: PhantomData::, + }; + + Ok(store) + } + + pub async fn connect(table_folder: &str) -> Result + where T: std::fmt::Debug + { + let path_to_table = Path::new(table_folder); + let path_to_rows = path_to_table.join(ROWS_FILE_NAME); + + let mut file: File = + OpenOptions::new() + .read(true) + .write(true) + .open(path_to_rows) + .await?; + + // Unfortunately we can't yet use store.read_bytes, since it can't be created without the + // header. + let mut header_bytes = StoreHeader::decode_buffer(); + file.read_exact(&mut header_bytes).await?; + let header = StoreHeader::decode(&mut header_bytes).await?; + + let store = Self { + table_folder: table_folder.to_string(), + header, + data_type: PhantomData::, + }; + Ok(store) + } + + pub async fn cursor(&self, mode: AccessMode) -> Result> { + Cursor::new(&self, mode).await + } + + pub async fn garbage_collect(&mut self) -> Result<()> { + todo!() + } +} + +// ===Store Header=== +impl StoreHeader { + fn encode(&self) -> Result> { + let mut result = encode(&self.number_of_columns)?; + result.append(&mut encode(&self.deleted_count)?); + result.append(&mut encode(&self.total_count)?); + result.append(&mut encode(&self.primary_column)?); + Ok(result) + } + + fn decode_buffer() -> [u8; StoreHeader::SIZE] { + [0; StoreHeader::SIZE] + } + + async fn decode(result: &mut [u8]) -> Result { + let (number_of_columns, _) = + decode::(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; + let (deleted_count, _) = + decode::(&result[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?; + let (total_count, _) = + decode::(&result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?; + let (primary_column, _) = + decode::(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; + let header = StoreHeader { + number_of_columns, + deleted_count, + total_count, + primary_column, + }; + + Ok(header) + } + + // returns new count + fn increment_total_count(&mut self) -> usize { + self.total_count += 1; + self.total_count + } + + // returns new count + fn increment_deleted_count(&mut self) -> usize { + self.deleted_count += 1; + self.deleted_count + } +} + +// ====Entry==== +impl EntryHeader { + fn encode(self: &EntryHeader) -> Result> { + let result: Vec = encode(&self.is_deleted)?; + Ok(result) + } +} + +impl EntryHeaderWithDataSize { + fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result { + let (is_deleted, _) = + decode::(&bytes) + .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; + + let data_sizes = decode_sequence::(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?; + + Ok(Self { is_deleted, data_sizes } ) + } +} + +impl Entry { + pub fn new(data: Vec) -> Self { + Self { header: EntryHeader { is_deleted: false }, data } + } + + pub fn new_deleted(data: Vec) -> Self { + Self { header: EntryHeader { is_deleted: true}, data } + } + + // FORMAT: [EntryHeaderWithDataSize, ..sequence of data] + fn encode(&self) -> Result> + where T: Encode + { + let mut result: Vec = self.header.encode()?; + + let (mut encoded_data, sizes) = encode_sequence_with_sizes(&self.data[..])?; + result.append(&mut encode_sequence(&sizes)?); // sizes of data (fixed by number of columns) + result.append(&mut encoded_data); // data variable size + Ok(result) + } +} + +impl EntryDetailed { + fn decode(header: EntryHeaderWithDataSize, number_of_columns: usize, bytes: &[u8]) -> Result + where T: Decode + { + let data = decode_sequence::(number_of_columns, bytes) + .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?; + Ok(EntryDetailed { header, data }) + } +} + + +//=================Cursor================== +impl Cursor { + pub async fn new(store: &Store, mode: AccessMode) -> Result { + let path_to_rows = Path::new(&store.table_folder).join(ROWS_FILE_NAME); + let file: File = match mode { + AccessMode::Read => + OpenOptions::new() + .read(true) + .open(path_to_rows) + .await?, + + AccessMode::Write => + OpenOptions::new() + .read(true) + .write(true) + .open(path_to_rows) + .await?, + }; + + let cursor = Self { + header: store.header.clone(), + file, + data_type: store.data_type, + + eof_file_position: 0, + }; + Ok(cursor) + } //===primitive file operations=== // Moves the file cursor right. @@ -165,7 +392,7 @@ impl Store { Ok(result) } - async fn seek_to(&mut self, file_position: FilePosition) -> Result<()>{ + async fn seek_to(&mut self, file_position: FilePosition) -> Result<()> { self.file.seek(SeekFrom::Start(file_position)).await?; Ok(()) } @@ -203,76 +430,6 @@ impl Store { } // ===Creation=== - pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result { - let path_to_table = Path::new(table_folder); - let path_to_rows = path_to_table.join(Self::ROWS_FILE_NAME); - DirBuilder::new() - .create(path_to_table).await?; - - let file: File = - OpenOptions::new() - .write(true) - .read(true) - .create_new(true) - .open(path_to_rows) - .await?; - - let header = StoreHeader { - number_of_columns, - deleted_count: 0, - total_count: 0, - primary_column, - }; - let encoded_header: Vec = header.encode()?; - - - // let index: Index = Index::new( - // &format!("rows_{}", primary_column.to_string()), - // ).await?; - - let mut store = Self { - table_folder: table_folder.to_string(), - file, - header, - data_type: PhantomData::, - eof_file_position: 0, - }; - store.write_bytes(&encoded_header).await?; - store.eof_file_position = store.current_file_position().await?; - - Ok(store) - } - - pub async fn connect(table_folder: &str) -> Result - where T: std::fmt::Debug - { - let path_to_table = Path::new(table_folder); - let path_to_rows = path_to_table.join(Self::ROWS_FILE_NAME); - - let mut file: File = - OpenOptions::new() - .read(true) - .write(true) - .open(path_to_rows) - .await?; - - // Unfortunately we can't yet use store.read_bytes, since it can't be created without the - // header. - let mut header_bytes = StoreHeader::decode_buffer(); - file.read_exact(&mut header_bytes).await?; - let header = StoreHeader::decode(&mut header_bytes).await?; - - let eof_file_position = file.seek(SeekFrom::End(0)).await?; - - let store = Self { - table_folder: table_folder.to_string(), - file, - header, - data_type: PhantomData::, - eof_file_position, - }; - Ok(store) - } // ===Append Entry=== async fn increment_total_count(&mut self) -> Result<()> { @@ -398,118 +555,8 @@ impl Store { pub async fn get_all_eq(&self, column: Column, value: T) -> Result>> { todo!() } - - pub async fn garbage_collect(&mut self) -> Result<()> { - todo!() - } } -// ===Store Header=== -impl StoreHeader { - fn encode(&self) -> Result> { - let mut result = encode(&self.number_of_columns)?; - result.append(&mut encode(&self.deleted_count)?); - result.append(&mut encode(&self.total_count)?); - result.append(&mut encode(&self.primary_column)?); - Ok(result) - } - - fn decode_buffer() -> [u8; StoreHeader::SIZE] { - [0; StoreHeader::SIZE] - } - - async fn decode(result: &mut [u8]) -> Result { - let (number_of_columns, _) = - decode::(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; - let (deleted_count, _) = - decode::(&result[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?; - let (total_count, _) = - decode::(&result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?; - let (primary_column, _) = - decode::(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; - let header = StoreHeader { - number_of_columns, - deleted_count, - total_count, - primary_column, - }; - - Ok(header) - } - - // returns new count - fn increment_total_count(&mut self) -> usize { - self.total_count += 1; - self.total_count - } - - // returns new count - fn increment_deleted_count(&mut self) -> usize { - self.deleted_count += 1; - self.deleted_count - } -} - -// ====Entry==== -impl EntryHeader { - fn encode(self: &EntryHeader) -> Result> { - let result: Vec = encode(&self.is_deleted)?; - Ok(result) - } -} - -impl EntryHeaderWithDataSize { - fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result { - let (is_deleted, _) = - decode::(&bytes) - .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; - - let data_sizes = decode_sequence::(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?; - - Ok(Self { is_deleted, data_sizes } ) - } -} - -impl Entry { - pub fn new(data: Vec) -> Self { - Self { header: EntryHeader { is_deleted: false }, data } - } - - pub fn new_deleted(data: Vec) -> Self { - Self { header: EntryHeader { is_deleted: true}, data } - } - - // FORMAT: [EntryHeaderWithDataSize, ..sequence of data] - fn encode(&self) -> Result> - where T: Encode - { - let mut result: Vec = self.header.encode()?; - - let (mut encoded_data, sizes) = encode_sequence_with_sizes(&self.data[..])?; - result.append(&mut encode_sequence(&sizes)?); // sizes of data (fixed by number of columns) - result.append(&mut encoded_data); // data variable size - Ok(result) - } -} - -impl EntryDetailed { - fn decode(header: EntryHeaderWithDataSize, number_of_columns: usize, bytes: &[u8]) -> Result - where T: Decode - { - let data = decode_sequence::(number_of_columns, bytes) - .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?; - Ok(EntryDetailed { header, data }) - } -} - - - - // impl StorageEngine for ColumnStore { // async fn append(&mut self, id: Index, entry: Row) -> Result