From a37c3a5e772f0f3d4bf09f1b44b462162d095499 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Sat, 3 Feb 2024 19:00:00 +0100 Subject: [PATCH] Split Cursor into ReadCursor and WriteCursor --- storage_engine/src/cursor.rs | 302 +++++++++++++++++++++++++++ storage_engine/src/lib.rs | 1 + storage_engine/src/main.rs | 1 + storage_engine/src/storage_engine.rs | 70 +++---- 4 files changed, 334 insertions(+), 40 deletions(-) create mode 100644 storage_engine/src/cursor.rs diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs new file mode 100644 index 0000000..648d964 --- /dev/null +++ b/storage_engine/src/cursor.rs @@ -0,0 +1,302 @@ +use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; +use tokio::fs::{File, OpenOptions, DirBuilder}; +use std::path::Path; +use std::marker::PhantomData; + +use async_trait::async_trait; + +use bincode; +use bincode::{Decode, Encode}; +use crate::binary_coding::{encode, decode, encode_sequence, encode_sequence_with_sizes, decode_sequence}; +use tokio::fs; + +use crate::error::{Error, DecodeErrorKind}; +use crate::storage_engine::{Store, StoreHeader, FilePosition, Result, ROWS_FILE_NAME, EntryDetailed, EntryHeaderWithDataSize, Entry}; + +#[async_trait] +trait PrimitiveCursor { + fn file(&mut self) -> &mut File; + fn eof_file_position(&self) -> FilePosition; + + async fn read_bytes(&mut self, bytes: &mut [u8]) -> Result<()> { + self.file().read_exact(bytes).await?; + Ok(()) + } + + async fn get_bytes(&mut self, count: usize) -> Result> { + let mut result: Vec = Vec::with_capacity(count); + self.read_bytes(&mut result).await?; + Ok(result) + } + + async fn seek_to(&mut self, file_position: FilePosition) -> Result<()> { + self.file().seek(SeekFrom::Start(file_position)).await?; + Ok(()) + } + + async fn seek_to_start(&mut self) -> Result<()> { + self.file().seek(SeekFrom::Start(0)).await?; + Ok(()) + } + + async fn seek_to_end(&mut self) -> Result<()> { + 
self.file().seek(SeekFrom::End(0)).await?; + Ok(()) + } + + async fn seek_to_start_of_data(&mut self) -> Result<()> { + self.seek_to(StoreHeader::SIZE as u64).await + } + + async fn current_file_position(&mut self) -> Result { + let next_file_position: FilePosition = self.file().stream_position().await?; + Ok(next_file_position) + } + + async fn is_at_eof(&mut self) -> Result { + Ok(self.current_file_position().await? == self.eof_file_position()) + } +} + +#[async_trait] +trait CursorWithStoreHeader: PrimitiveCursor { + fn header(&self) -> &StoreHeader; + + async fn read_entry_header(&mut self) -> Result { + let number_of_columns: usize = self.header().number_of_columns; + let mut header_bytes: Vec = vec![0; EntryHeaderWithDataSize::size(number_of_columns)]; + self.read_bytes(&mut header_bytes).await?; + let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..], number_of_columns)?; + + Ok(header) + } + + async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result { + self.seek_to(file_position).await?; + self.read_entry_header().await + } + + // Returns None when file_position == eof_file_position + async fn read_entry_at(&mut self, file_position: FilePosition) -> Result>> + where T: Decode + { + self.seek_to(file_position).await?; + self.next().await + } + + // ===Iteration=== + // Assumes that the current file position is at a valid entry or EOF. + async fn next(&mut self) -> Result>> + where T: Decode + { + if self.is_at_eof().await? { + return Ok(None) + } + + let entry_header = self.read_entry_header().await?; + + let mut data_bytes: Vec = vec![0; entry_header.size_of_data()]; + self.read_bytes(&mut data_bytes).await?; + let entry: EntryDetailed = + EntryDetailed::decode(entry_header, self.header().number_of_columns, &mut data_bytes)?; + + Ok(Some(entry)) + } + + async fn read_entries(&mut self) -> Result<()> + where T: Decode + std::fmt::Debug + { + self.seek_to_start_of_data().await?; + while let Some(entry) = self.next().await? 
{ + println!("{:?}", entry); + } + println!("END of entries."); + Ok(()) + } +} + + +// ===Concrete Cursors=== +pub struct ReadCursor { + header: StoreHeader, + file: File, + data_type: PhantomData, + + eof_file_position: FilePosition, +} + +pub struct WriteCursor<'a, T> { + header: &'a mut StoreHeader, + file: File, + data_type: PhantomData, + + eof_file_position: FilePosition, +} + +// ===PrimitiveCursor=== +impl PrimitiveCursor for ReadCursor { + fn file(&mut self) -> &mut File { + &mut self.file + } + + fn eof_file_position(&self) -> FilePosition { + self.eof_file_position + } +} + +impl PrimitiveCursor for WriteCursor<'_, T> { + fn file(&mut self) -> &mut File { + &mut self.file + } + + fn eof_file_position(&self) -> FilePosition { + self.eof_file_position + } +} + +// ===CursorWithStoreHeader=== +impl CursorWithStoreHeader for ReadCursor { + fn header(&self) -> &StoreHeader { + &self.header + } +} + +impl CursorWithStoreHeader for WriteCursor<'_, T> { + fn header(&self) -> &StoreHeader { + &self.header + } +} + +impl ReadCursor { + pub async fn new(store: &Store) -> Result + where T: Send + { + let path_to_rows = Path::new(&store.table_folder).join(ROWS_FILE_NAME); + let file: File = + OpenOptions::new() + .read(true) + .open(path_to_rows) + .await?; + + let mut cursor = Self { + header: store.header.clone(), + file, + data_type: store.data_type, + + eof_file_position: 0, // This will be overwritten by the seek_to_start_of_data + }; + cursor.seek_to_start_of_data().await?; + + Ok(cursor) + } +} + +impl <'cursor, T> WriteCursor<'cursor, T> { + // 'store lives at least as long as 'cursor + pub async fn new<'store: 'cursor>(store: &'store mut Store) -> Result + where T: Send + { + let path_to_rows = Path::new(&store.table_folder).join(ROWS_FILE_NAME); + let file: File = + OpenOptions::new() + .read(true) + .write(true) + .open(path_to_rows) + .await?; + + let mut cursor = Self { + header: &mut store.header, + file, + data_type: store.data_type, + + 
eof_file_position: 0, // This will be overwritten by the seek_to_start_of_data + }; + cursor.seek_to_start_of_data().await?; + + Ok(cursor) + } + + // ===Primitive Operations=== + async fn write_bytes(&mut self, bytes: &[u8]) -> Result { + Ok(self.file.write(bytes).await?) + } + + // ===Store Header Manipulation=== + async fn increment_total_count(&mut self) -> Result<()> + where T: Send + { + self.seek_to_start().await?; + self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; + let new_count = self.header.increment_total_count(); + self.write_bytes(&encode::(&new_count)?).await?; + Ok(()) + } + + async fn increment_deleted_count(&mut self) -> Result<()> + where T: Send + { + self.seek_to_start().await?; + self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?; + let new_count = self.header.increment_deleted_count(); + self.write_bytes(&encode::(&new_count)?).await?; + Ok(()) + } + + // ===Entry Header Manipulation=== + // assumes we are at the start of the valid entry. + async fn set_entry_is_deleted_to(&mut self, is_deleted: bool) -> Result<()> + where T: Send + { + self.seek_to(EntryHeaderWithDataSize::IS_DELETED_OFFSET as u64).await?; + self.write_bytes(&encode::(&is_deleted)?).await?; + Ok(()) + } + + // ===Append Entry=== + + // Moves cursor to the end. + // Returns file position to the start of the new entry. 
+ pub async fn append_entry(&mut self, entry: &Entry) -> Result + where T: Encode + Send + { + self.increment_total_count().await?; + + let encoded_entry: Vec = entry.encode()?; + self.seek_to_end().await?; + let file_position: FilePosition = self.current_file_position().await?; + self.write_bytes(&encoded_entry).await?; + + let eof_file_position: FilePosition = self.current_file_position().await?; + self.eof_file_position = eof_file_position; + + Ok(file_position) + } + + // ===Deletion=== + pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()> + where T: Send + { + self.seek_to(file_position).await?; + let entry_header = self.read_entry_header().await?; + if entry_header.is_deleted { + Ok(()) + } else { + self.increment_deleted_count().await?; + self.seek_to(file_position).await?; + self.set_entry_is_deleted_to(true).await?; + + self.attempt_garbage_collection_if_necessary().await?; + Ok(()) + } + } + + async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> { + // TODO: What should be the policy? Counting size of garbage? Counting how many entries are + // garbage? 
+ if self.header.deleted_count > 100 { + todo!() + } else { + Ok(()) + } + } +} diff --git a/storage_engine/src/lib.rs b/storage_engine/src/lib.rs index 192f3db..164e164 100644 --- a/storage_engine/src/lib.rs +++ b/storage_engine/src/lib.rs @@ -2,3 +2,4 @@ pub mod storage_engine; mod binary_coding; mod error; mod index; +mod cursor; diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index 117ac4a..504e519 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -8,6 +8,7 @@ mod storage_engine; mod binary_coding; mod error; mod index; +mod cursor; use crate::storage_engine::*; diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs index ddc0bb2..27e9e41 100644 --- a/storage_engine/src/storage_engine.rs +++ b/storage_engine/src/storage_engine.rs @@ -17,7 +17,7 @@ use crate::index::Index; use std::mem::size_of; -type Result = std::result::Result; +pub type Result = std::result::Result; pub type Column = u64; pub type FilePosition = u64; @@ -31,7 +31,7 @@ pub struct Store { // {write: 0, read: n + 1} ~> {write:0, read: n} // destroy read // {write: 0, read: 0} ~> {write: 1, read: 0} // create write // {write: 1, read: 0} ~> {write: 0, read: 0} // destroy write - table_folder: String, + pub table_folder: String, // primary_index: Vec>>, // indexes: Vec>>>, // primary_index: Index, @@ -39,8 +39,8 @@ pub struct Store { // TODO: It's not good to have StoreHeader copied to all the cursors, since they may modify it. // How to sync? 
// All - header: StoreHeader, - data_type: PhantomData, + pub header: StoreHeader, + pub data_type: PhantomData, // meta // location of rows file @@ -61,14 +61,6 @@ pub struct Cursor { eof_file_position: FilePosition, } -pub struct WriteCursor<'a, T> { - header: &'a mut StoreHeader, - file: File, - data_type: PhantomData, - - eof_file_position: FilePosition, -} - pub enum AccessMode { Read, Write @@ -80,22 +72,22 @@ pub type PositionOfRow = FilePosition; #[derive(Debug, Clone)] pub struct StoreHeader { - number_of_columns: usize, - deleted_count: usize, - total_count: usize, - primary_column: Column, + pub number_of_columns: usize, + pub deleted_count: usize, + pub total_count: usize, + pub primary_column: Column, } impl StoreHeader { - const NUMBER_OF_COLUMNS_SIZE: usize = size_of::(); - const DELETED_COUNT_SIZE: usize = size_of::(); - const TOTAL_COUNT_SIZE: usize = size_of::(); - const PRIMARY_COLUMN_SIZE: usize = size_of::(); - const SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE + Self::PRIMARY_COLUMN_SIZE; + pub const NUMBER_OF_COLUMNS_SIZE: usize = size_of::(); + pub const DELETED_COUNT_SIZE: usize = size_of::(); + pub const TOTAL_COUNT_SIZE: usize = size_of::(); + pub const PRIMARY_COLUMN_SIZE: usize = size_of::(); + pub const SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE + Self::PRIMARY_COLUMN_SIZE; - const NUMBER_OF_COLUMNS_OFFSET: usize = 0; - const DELETED_COUNT_OFFSET: usize = Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE; - const TOTAL_COUNT_OFFSET: usize = Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE; - const PRIMARY_COLUMN_OFFSET: usize = Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE; + pub const NUMBER_OF_COLUMNS_OFFSET: usize = 0; + pub const DELETED_COUNT_OFFSET: usize = Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE; + pub const TOTAL_COUNT_OFFSET: usize = Self::DELETED_COUNT_OFFSET + 
Self::DELETED_COUNT_SIZE; + pub const PRIMARY_COLUMN_OFFSET: usize = Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE; } #[derive(Debug)] @@ -105,21 +97,21 @@ pub struct EntryHeader { #[derive(Debug)] pub struct EntryHeaderWithDataSize { - is_deleted: bool, - data_sizes: Vec, // vec![5, 6, 20] means that column 0 stores 5 bytes, column 1 stores 6 + pub is_deleted: bool, + pub data_sizes: Vec, // vec![5, 6, 20] means that column 0 stores 5 bytes, column 1 stores 6 // bytes etc } impl EntryHeaderWithDataSize { - const IS_DELETED_OFFSET: usize = 0; - const IS_DELETED_SIZE: usize = size_of::(); - const DATA_SIZES_OFFSET: usize = Self::IS_DELETED_OFFSET + Self::IS_DELETED_SIZE; + pub const IS_DELETED_OFFSET: usize = 0; + pub const IS_DELETED_SIZE: usize = size_of::(); + pub const DATA_SIZES_OFFSET: usize = Self::IS_DELETED_OFFSET + Self::IS_DELETED_SIZE; - fn size(number_of_columns: usize) -> usize { + pub fn size(number_of_columns: usize) -> usize { let size_of_data_sizes: usize = number_of_columns*size_of::(); Self::IS_DELETED_SIZE + size_of_data_sizes } - fn size_of_data(&self) -> usize{ + pub fn size_of_data(&self) -> usize{ self.data_sizes.iter().sum() } } @@ -164,7 +156,7 @@ impl SomethingSupportingLeq for Store } } -const ROWS_FILE_NAME: &'static str = "rows"; +pub const ROWS_FILE_NAME: &'static str = "rows"; impl Store { // For debugging. 
@@ -289,13 +281,13 @@ impl StoreHeader { } // returns new count - fn increment_total_count(&mut self) -> usize { + pub fn increment_total_count(&mut self) -> usize { self.total_count += 1; self.total_count } // returns new count - fn increment_deleted_count(&mut self) -> usize { + pub fn increment_deleted_count(&mut self) -> usize { self.deleted_count += 1; self.deleted_count } @@ -310,7 +302,7 @@ impl EntryHeader { } impl EntryHeaderWithDataSize { - fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result { + pub fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result { let (is_deleted, _) = decode::(&bytes) .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; @@ -332,7 +324,7 @@ impl Entry { } // FORMAT: [EntryHeaderWithDataSize, ..sequence of data] - fn encode(&self) -> Result> + pub fn encode(&self) -> Result> where T: Encode { let mut result: Vec = self.header.encode()?; @@ -345,7 +337,7 @@ impl Entry { } impl EntryDetailed { - fn decode(header: EntryHeaderWithDataSize, number_of_columns: usize, bytes: &[u8]) -> Result + pub fn decode(header: EntryHeaderWithDataSize, number_of_columns: usize, bytes: &[u8]) -> Result where T: Decode { let data = decode_sequence::(number_of_columns, bytes) @@ -405,7 +397,6 @@ impl Cursor { Ok(result) } - // TODO: make private pub async fn seek_to(&mut self, file_position: FilePosition) -> Result<()> { self.file.seek(SeekFrom::Start(file_position)).await?; Ok(()) @@ -425,7 +416,6 @@ impl Cursor { self.seek_to(StoreHeader::SIZE as u64).await } - // TODO: Make private pub async fn current_file_position(&mut self) -> Result { let next_file_position: FilePosition = self.file.stream_position().await?; Ok(next_file_position)