From edfecfa8d676d40d56b29406c6489039077de203 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Mon, 5 Feb 2024 16:03:19 +0100 Subject: [PATCH] Split cursor capabilities --- storage_engine/src/cursor.rs | 431 +----------------- .../src/cursor_capabilities/header_access.rs | 230 ++++++++++ .../src/cursor_capabilities/index_access.rs | 115 +++++ storage_engine/src/cursor_capabilities/mod.rs | 3 + .../src/cursor_capabilities/primitive.rs | 62 +++ storage_engine/src/lib.rs | 1 + storage_engine/src/storage_engine.rs | 7 +- 7 files changed, 438 insertions(+), 411 deletions(-) create mode 100644 storage_engine/src/cursor_capabilities/header_access.rs create mode 100644 storage_engine/src/cursor_capabilities/index_access.rs create mode 100644 storage_engine/src/cursor_capabilities/mod.rs create mode 100644 storage_engine/src/cursor_capabilities/primitive.rs diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 0811bfe..3f68560 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -1,22 +1,20 @@ -use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; use tokio::fs::{File, OpenOptions}; use tokio::fs; use std::path::Path; use std::marker::PhantomData; use std::collections::{BTreeMap, HashSet}; -use async_trait::async_trait; - use bincode; use bincode::{Decode, Encode}; -use crate::binary_coding::{encode, decode}; -use crate::error::{Error, DecodeErrorKind}; -use crate::segments::entry::{Entry, EntryDetailed}; -use crate::segments::entry_header::{EntryHeaderWithDataSize, EntryHeader}; +use crate::segments::entry::EntryDetailed; +use crate::segments::entry_header::EntryHeader; use crate::segments::store_header::StoreHeader; use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; use crate::index::Index; +use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; +use crate::cursor_capabilities::header_access::{CursorCanReadHeader, CursorCanWriteHeader}; +use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex}; const GARBAGE_COLLECTION_TRIGGER: usize = 100; @@ -47,389 +45,9 @@ pub struct AppendOnlyCursor { } -// ===Traits=== -#[async_trait] -pub(crate) trait PrimitiveCursor { - fn file(&mut self) -> &mut File; - fn eof_file_position(&self) -> FilePosition; - - async fn read_bytes(&mut self, bytes: &mut [u8]) -> Result<()> { - self.file().read_exact(bytes).await?; - Ok(()) - } - - async fn get_bytes(&mut self, count: usize) -> Result> { - let mut result: Vec = Vec::with_capacity(count); - self.read_bytes(&mut result).await?; - Ok(result) - } - - async fn seek_to(&mut self, file_position: FilePosition) -> Result { - let file_position = self.file().seek(SeekFrom::Start(file_position)).await?; - Ok(file_position) - } - - // Start of the file i.e. the Header, not the entries. - async fn seek_to_start(&mut self) -> Result { - let file_position = self.file().seek(SeekFrom::Start(0)).await?; - Ok(file_position) - } - - async fn seek_to_end(&mut self) -> Result { - let file_position = self.file().seek(SeekFrom::End(0)).await?; - Ok(file_position) - } - - // Seeks from current position by offset and returns new file position - async fn seek_by(&mut self, offset: i64) -> Result { - let file_position = self.file().seek(SeekFrom::Current(offset)).await?; - Ok(file_position) - } - - async fn current_file_position(&mut self) -> Result { - let next_file_position: FilePosition = self.file().stream_position().await?; - Ok(next_file_position) - } - - async fn is_at_eof(&mut self) -> Result { - let current_file_position = self.current_file_position().await?; - let eof_file_position = self.eof_file_position(); - Ok(current_file_position == eof_file_position) - } -} - -#[async_trait] -pub(crate) trait PrimitiveWriteCursor: PrimitiveCursor { - async fn write_bytes(&mut self, bytes: &[u8]) -> Result { - Ok(self.file().write(bytes).await?) - } - -} - -#[async_trait] -pub trait CursorWithStoreHeader: PrimitiveCursor { - fn header(&self) -> &StoreHeader; - - async fn seek_to_start_of_data(&mut self) -> Result { - self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64).await - } - - async fn read_entry_header(&mut self) -> Result { - let number_of_columns: usize = self.header().number_of_columns; - let mut header_bytes: Vec = vec![0; EntryHeaderWithDataSize::size(number_of_columns)]; - self.read_bytes(&mut header_bytes).await?; - let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..], number_of_columns)?; - - Ok(header) - } - - async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result { - self.seek_to(file_position).await?; - self.read_entry_header().await - } - - // Returns None when file_position == eof_file_position - async fn read_entry_at(&mut self, file_position: FilePosition) -> Result>> - where T: Decode - { - self.seek_to(file_position).await?; - self.next().await - } - - // ===Iteration=== - // The following functions assume that the current file position is at a valid entry or EOF. - - - // WARNING: This moves the file_position to start of the data, so you can't just call - // next_entry_header() a bunch of times. You must move the cursor! - async fn next_entry_header(&mut self) -> Result> { - if self.is_at_eof().await? { - return Ok(None) - } - - let entry_header = self.read_entry_header().await?; - - Ok(Some(entry_header)) - } - - // This is meant to be used after next_entry_header() is called. - async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result{ - let file_position = self.seek_by(entry_header.size_of_data() as i64).await?; - Ok(file_position) - } - - async fn next(&mut self) -> Result>> - where T: Decode - { - let file_position = self.current_file_position().await?; - let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; - - let mut data_bytes: Vec = vec![0; entry_header.size_of_data()]; - self.read_bytes(&mut data_bytes).await?; - let entry: EntryDetailed = - EntryDetailed::decode(entry_header, file_position, self.header().number_of_columns, &mut data_bytes)?; - - Ok(Some(entry)) - } - - // Like next, but only reads the column, not the whole entry. - async fn next_at_column(&mut self, column: Column) -> Result> - where T: Decode + Send - { - let file_position = self.current_file_position().await?; - let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; - let file_position_at_start_of_data = self.current_file_position().await?; - - // figuring out how much to decode - let column_offset = entry_header.offset_of_column(column); - self.seek_by(column_offset as i64).await?; - - // reading and decoding - let mut bytes: Vec = vec![0; entry_header.data_sizes[column as usize]]; - self.read_bytes(&mut bytes).await?; - let (value, _) = - decode::(&bytes[..]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; - - // jumping to next entry - self.seek_to(file_position_at_start_of_data).await?; - self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?; - - Ok(Some((entry_header, file_position, value))) - } - - async fn next_alive_at_column(&mut self, column: Column) -> Result> - where T: Decode + Send - { - while let Some((header, file_position, t)) = self.next_at_column(column).await? { - if !header.is_deleted { - return Ok(Some((header, file_position, t))) - } - } - Ok(None) - } - - async fn next_alive(&mut self) -> Result>> - where T: Decode - { - while let Some(entry) = self.next().await? { - if !entry.header.is_deleted { - return Ok(Some(entry)) - } - } - Ok(None) - } - - // ===Search=== - async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result>> - where T: Decode + PartialEq + Send + Sync - { - let mut file_position = self.current_file_position().await?; - while let Some((_, _, t)) = self.next_alive_at_column(column).await? { - if &t == t0 { - // go back and decode the whole entry - self.seek_to(file_position).await?; - return self.next().await - } else { - file_position = self.current_file_position().await?; - } - } - Ok(None) - } - - async fn find_all_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result>> - where T: Decode + PartialEq + Send + Sync - { - let mut entries = vec![]; - while let Some(entry) = self.find_first_eq_bruteforce(column, t0).await? { - entries.push(entry) - } - Ok(entries) - } - - // ===Debugging=== - async fn read_entries(&mut self) -> Result<()> - where T: Decode + std::fmt::Debug - { - self.seek_to_start_of_data().await?; - while let Some(entry) = self.next().await? { - println!("{:?}", entry); - } - println!("END of entries."); - Ok(()) - } - - async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error> { - let mut bytes: Vec = vec![]; - self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?; - self.file().read_to_end(&mut bytes).await?; - Ok(bytes) - } -} - -#[async_trait] -pub trait CursorWithWriteStoreHeader: CursorWithStoreHeader + PrimitiveWriteCursor { - fn header_mut(&mut self) -> &mut StoreHeader; - fn set_eof_file_position(&mut self, new_file_position: FilePosition); - - // ===Store Header Manipulation=== - async fn increment_total_count(&mut self) -> Result<()> - where T: Send - { - self.seek_to_start().await?; - self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; - let new_count = self.header_mut().increment_total_count(); - self.write_bytes(&encode::(&new_count)?).await?; - Ok(()) - } - - async fn increment_deleted_count(&mut self) -> Result<()> - where T: Send - { - self.seek_to_start().await?; - self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?; - let new_count = self.header_mut().increment_deleted_count(); - self.write_bytes(&encode::(&new_count)?).await?; - Ok(()) - } - - async fn set_header(&mut self, header: &StoreHeader) -> Result<()> - where T: Send - { - self.seek_to_start().await?; - let encoded_header: Vec = header.encode()?; - self.write_bytes(&encoded_header).await?; - - Ok(()) - } - - // ===Append Entry=== - - // Moves cursor to the end. - // Returns file position to the start of the new entry. - async fn append_entry_no_indexing(&mut self, entry: &Entry) -> Result - where T: Encode + Send + Sync - { - self.increment_total_count().await?; - - let encoded_entry: Vec = entry.encode()?; - let file_position = self.seek_to_end().await?; - self.write_bytes(&encoded_entry).await?; - - let eof_file_position: FilePosition = self.current_file_position().await?; - self.set_eof_file_position(eof_file_position); - - Ok(file_position) - } -} - -#[async_trait] -pub trait CursorWithAccessToIndex: CursorWithStoreHeader { - fn indexes(&mut self) -> &[Option>]; - - async fn index_lookup(&mut self, column: Column, value: &T) -> Result>> - where T: Encode + Decode + Ord + Send + Sync - { - match &self.indexes()[column as usize] { - Some(index) => { - let file_positions = index.lookup(value).await?.unwrap_or_else(|| HashSet::new()); - let mut entries: Vec> = vec![]; - for &file_position in file_positions.iter() { - match self.read_entry_at(file_position).await? { - Some(entry) => { - entries.push(entry) - }, - None => { - return Err(Error::IndexIsStoringEofFilePosition(column)) - } - } - } - - Ok(entries) - }, - None => - Err(Error::AttemptToIndexNonIndexableColumn(column)) - } - } - - async fn select_entries_where_eq(&mut self, column: Column, value: &T) -> Result>> - where T: Encode + Decode + Ord + Send + Sync - { - if self.header().is_column_indexed(column) { - self.index_lookup(column, value).await - } else { - self.find_all_eq_bruteforce(column, value).await - } - } -} - -#[async_trait] -pub trait CursorWithWriteAccessToIndex: CursorWithAccessToIndex + CursorWithWriteStoreHeader { - fn indexes_mut(&mut self) -> &mut [Option>]; - - // Assumes that the column is indexable. - fn mut_index_at(&mut self, column: Column) -> &mut Index { - match &mut self.indexes_mut()[column as usize] { - Some(index) => { - index - }, - None => { - unreachable!() - } - } - } - - // Assumes that the column is indexable. - async fn insert_into_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()> - where T: Encode + Decode + Ord + Send + Sync + 'async_trait - { - let index = self.mut_index_at(column as Column); - index.insert(value, file_position).await?; - Ok(()) - } - - // Assumes that the column is indexable. - async fn delete_from_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()> - where T: Encode + Decode + Ord + Send + Sync + 'async_trait - { - let index = self.mut_index_at(column as Column); - index.delete(value, file_position).await?; - Ok(()) - } - - async fn insert_entry(&mut self, entry: Entry) -> Result - where T: Encode + Decode + Ord + Send + Sync + 'async_trait - { - let file_position = self.append_entry_no_indexing(&entry).await?; - - // insert the indexable values of the entry into corresponding indexes. - for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() { - if should_index { - // SAFETY: If should_index is true, then the column is indexable. - self.insert_into_index(column as Column, value, file_position).await? - } - } - - Ok(file_position) - } - - async fn delete_entry_values_from_indexes(&mut self, entry: EntryDetailed) -> Result<()> - where T: Encode + Decode + Ord + Send + Sync + 'async_trait - { - for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() { - if should_index { - // SAFETY: If should_index is true, then the column is indexable. - self.delete_from_index(column as Column, value, entry.file_position).await? - } - } - Ok(()) - } -} - - // ===========Implementations============= -// ===PrimitiveCursor=== -impl PrimitiveCursor for ReadCursor<'_, T> { +// ===primitive capabilities=== +impl CursorCanRead for ReadCursor<'_, T> { fn file(&mut self) -> &mut File { &mut self.file } @@ -439,7 +57,7 @@ impl PrimitiveCursor for ReadCursor<'_, T> { } } -impl PrimitiveCursor for WriteCursor<'_, T> { +impl CursorCanRead for WriteCursor<'_, T> { fn file(&mut self) -> &mut File { &mut self.file } @@ -449,7 +67,7 @@ impl PrimitiveCursor for WriteCursor<'_, T> { } } -impl PrimitiveCursor for AppendOnlyCursor { +impl CursorCanRead for AppendOnlyCursor { fn file(&mut self) -> &mut File { &mut self.file } @@ -459,50 +77,48 @@ impl PrimitiveCursor for AppendOnlyCursor { } } -// ===PrimitiveCursor=== -impl PrimitiveWriteCursor for WriteCursor<'_, T> {} -impl PrimitiveWriteCursor for AppendOnlyCursor {} +impl CursorCanWrite for WriteCursor<'_, T> {} +impl CursorCanWrite for AppendOnlyCursor {} -// ===CursorWithStoreHeader=== -impl CursorWithStoreHeader for ReadCursor<'_, T> { +// ===capability to access header=== +impl CursorCanReadHeader for ReadCursor<'_, T> { fn header(&self) -> &StoreHeader { &self.header } } -impl CursorWithStoreHeader for WriteCursor<'_, T> { +impl CursorCanReadHeader for WriteCursor<'_, T> { fn header(&self) -> &StoreHeader { &self.header } } -impl CursorWithStoreHeader for AppendOnlyCursor { +impl CursorCanReadHeader for AppendOnlyCursor { fn header(&self) -> &StoreHeader { &self.header } } -// ===CursorWithWriteStoreHeader=== -impl CursorWithWriteStoreHeader for WriteCursor<'_, T> { +impl CursorCanWriteHeader for WriteCursor<'_, T> { fn header_mut(&mut self) -> &mut StoreHeader { self.header } fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } } -impl CursorWithWriteStoreHeader for AppendOnlyCursor { +impl CursorCanWriteHeader for AppendOnlyCursor { fn header_mut(&mut self) -> &mut StoreHeader { &mut self.header } fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } } -// ===CursorWithAccessToIndex=== -impl CursorWithAccessToIndex for ReadCursor<'_, T> { +// ===capability to access index=== +impl CursorCanReadIndex for ReadCursor<'_, T> { fn indexes(&mut self) -> &[Option>] { &self.indexes } } -impl CursorWithAccessToIndex for WriteCursor<'_, T> { +impl CursorCanReadIndex for WriteCursor<'_, T> { fn indexes(&mut self) -> &[Option>] { &self.indexes } } -// ===CursorWithWriteAccessToIndex=== -impl CursorWithWriteAccessToIndex for WriteCursor<'_, T> { +impl CursorCanWriteToIndex for WriteCursor<'_, T> { fn indexes_mut(&mut self) -> &mut [Option>] { self.indexes } } +// ===Specifics=== impl <'cursor, T> ReadCursor<'cursor, T> { pub async fn new<'store: 'cursor>(store: &'store Store) -> Result where T: Send + Sync @@ -530,8 +146,6 @@ impl <'cursor, T> ReadCursor<'cursor, T> { } } - - impl <'cursor, T> WriteCursor<'cursor, T> { // 'store lives atleast as long as 'cursor @@ -778,4 +392,3 @@ impl <'cursor, T> WriteCursor<'cursor, T> Ok(cursor_to_intermediate) } } - diff --git a/storage_engine/src/cursor_capabilities/header_access.rs b/storage_engine/src/cursor_capabilities/header_access.rs new file mode 100644 index 0000000..57ccabb --- /dev/null +++ b/storage_engine/src/cursor_capabilities/header_access.rs @@ -0,0 +1,230 @@ +use tokio::io::AsyncReadExt; +use async_trait::async_trait; + +use bincode; +use bincode::{Decode, Encode}; +use crate::binary_coding::{encode, decode}; + +use crate::error::{Error, DecodeErrorKind}; +use crate::segments::entry::{Entry, EntryDetailed}; +use crate::segments::entry_header::EntryHeaderWithDataSize; +use crate::segments::store_header::StoreHeader; +use crate::storage_engine::{FilePosition, Column, Result}; +use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; + +#[async_trait] +pub trait CursorCanReadHeader: CursorCanRead { + fn header(&self) -> &StoreHeader; + + async fn seek_to_start_of_data(&mut self) -> Result { + self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64).await + } + + async fn read_entry_header(&mut self) -> Result { + let number_of_columns: usize = self.header().number_of_columns; + let mut header_bytes: Vec = vec![0; EntryHeaderWithDataSize::size(number_of_columns)]; + self.read_bytes(&mut header_bytes).await?; + let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..], number_of_columns)?; + + Ok(header) + } + + async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result { + self.seek_to(file_position).await?; + self.read_entry_header().await + } + + // Returns None when file_position == eof_file_position + async fn read_entry_at(&mut self, file_position: FilePosition) -> Result>> + where T: Decode + { + self.seek_to(file_position).await?; + self.next().await + } + + // ===Iteration=== + // The following functions assume that the current file position is at a valid entry or EOF. + + + // WARNING: This moves the file_position to start of the data, so you can't just call + // next_entry_header() a bunch of times. You must move the cursor! + async fn next_entry_header(&mut self) -> Result> { + if self.is_at_eof().await? { + return Ok(None) + } + + let entry_header = self.read_entry_header().await?; + + Ok(Some(entry_header)) + } + + // This is meant to be used after next_entry_header() is called. + async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result{ + let file_position = self.seek_by(entry_header.size_of_data() as i64).await?; + Ok(file_position) + } + + async fn next(&mut self) -> Result>> + where T: Decode + { + let file_position = self.current_file_position().await?; + let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; + + let mut data_bytes: Vec = vec![0; entry_header.size_of_data()]; + self.read_bytes(&mut data_bytes).await?; + let entry: EntryDetailed = + EntryDetailed::decode(entry_header, file_position, self.header().number_of_columns, &mut data_bytes)?; + + Ok(Some(entry)) + } + + // Like next, but only reads the column, not the whole entry. + async fn next_at_column(&mut self, column: Column) -> Result> + where T: Decode + Send + { + let file_position = self.current_file_position().await?; + let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; + let file_position_at_start_of_data = self.current_file_position().await?; + + // figuring out how much to decode + let column_offset = entry_header.offset_of_column(column); + self.seek_by(column_offset as i64).await?; + + // reading and decoding + let mut bytes: Vec = vec![0; entry_header.data_sizes[column as usize]]; + self.read_bytes(&mut bytes).await?; + let (value, _) = + decode::(&bytes[..]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; + + // jumping to next entry + self.seek_to(file_position_at_start_of_data).await?; + self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?; + + Ok(Some((entry_header, file_position, value))) + } + + async fn next_alive_at_column(&mut self, column: Column) -> Result> + where T: Decode + Send + { + while let Some((header, file_position, t)) = self.next_at_column(column).await? { + if !header.is_deleted { + return Ok(Some((header, file_position, t))) + } + } + Ok(None) + } + + async fn next_alive(&mut self) -> Result>> + where T: Decode + { + while let Some(entry) = self.next().await? { + if !entry.header.is_deleted { + return Ok(Some(entry)) + } + } + Ok(None) + } + + // ===Search=== + async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result>> + where T: Decode + PartialEq + Send + Sync + { + let mut file_position = self.current_file_position().await?; + while let Some((_, _, t)) = self.next_alive_at_column(column).await? { + if &t == t0 { + // go back and decode the whole entry + self.seek_to(file_position).await?; + return self.next().await + } else { + file_position = self.current_file_position().await?; + } + } + Ok(None) + } + + async fn find_all_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result>> + where T: Decode + PartialEq + Send + Sync + { + let mut entries = vec![]; + while let Some(entry) = self.find_first_eq_bruteforce(column, t0).await? { + entries.push(entry) + } + Ok(entries) + } + + // ===Debugging=== + async fn read_entries(&mut self) -> Result<()> + where T: Decode + std::fmt::Debug + { + self.seek_to_start_of_data().await?; + while let Some(entry) = self.next().await? { + println!("{:?}", entry); + } + println!("END of entries."); + Ok(()) + } + + async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error> { + let mut bytes: Vec = vec![]; + self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?; + self.file().read_to_end(&mut bytes).await?; + Ok(bytes) + } +} + +#[async_trait] +pub trait CursorCanWriteHeader: CursorCanReadHeader + CursorCanWrite { + fn header_mut(&mut self) -> &mut StoreHeader; + fn set_eof_file_position(&mut self, new_file_position: FilePosition); + + // ===Store Header Manipulation=== + async fn increment_total_count(&mut self) -> Result<()> + where T: Send + { + self.seek_to_start().await?; + self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; + let new_count = self.header_mut().increment_total_count(); + self.write_bytes(&encode::(&new_count)?).await?; + Ok(()) + } + + async fn increment_deleted_count(&mut self) -> Result<()> + where T: Send + { + self.seek_to_start().await?; + self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?; + let new_count = self.header_mut().increment_deleted_count(); + self.write_bytes(&encode::(&new_count)?).await?; + Ok(()) + } + + async fn set_header(&mut self, header: &StoreHeader) -> Result<()> + where T: Send + { + self.seek_to_start().await?; + let encoded_header: Vec = header.encode()?; + self.write_bytes(&encoded_header).await?; + + Ok(()) + } + + // ===Append Entry=== + + // Moves cursor to the end. + // Returns file position to the start of the new entry. + async fn append_entry_no_indexing(&mut self, entry: &Entry) -> Result + where T: Encode + Send + Sync + { + self.increment_total_count().await?; + + let encoded_entry: Vec = entry.encode()?; + let file_position = self.seek_to_end().await?; + self.write_bytes(&encoded_entry).await?; + + let eof_file_position: FilePosition = self.current_file_position().await?; + self.set_eof_file_position(eof_file_position); + + Ok(file_position) + } +} diff --git a/storage_engine/src/cursor_capabilities/index_access.rs b/storage_engine/src/cursor_capabilities/index_access.rs new file mode 100644 index 0000000..d9d7fc3 --- /dev/null +++ b/storage_engine/src/cursor_capabilities/index_access.rs @@ -0,0 +1,115 @@ +use std::collections::HashSet; + +use async_trait::async_trait; + +use bincode; +use bincode::{Decode, Encode}; + +use crate::error::Error; +use crate::segments::entry::{Entry, EntryDetailed}; +use crate::storage_engine::{FilePosition, Column, Result}; +use crate::index::Index; +use crate::cursor_capabilities::header_access::{CursorCanReadHeader, CursorCanWriteHeader}; + +#[async_trait] +pub trait CursorCanReadIndex: CursorCanReadHeader { + fn indexes(&mut self) -> &[Option>]; + + async fn index_lookup(&mut self, column: Column, value: &T) -> Result>> + where T: Encode + Decode + Ord + Send + Sync + { + match &self.indexes()[column as usize] { + Some(index) => { + let file_positions = index.lookup(value).await?.unwrap_or_else(|| HashSet::new()); + let mut entries: Vec> = vec![]; + for &file_position in file_positions.iter() { + match self.read_entry_at(file_position).await? { + Some(entry) => { + entries.push(entry) + }, + None => { + return Err(Error::IndexIsStoringEofFilePosition(column)) + } + } + } + + Ok(entries) + }, + None => + Err(Error::AttemptToIndexNonIndexableColumn(column)) + } + } + + async fn select_entries_where_eq(&mut self, column: Column, value: &T) -> Result>> + where T: Encode + Decode + Ord + Send + Sync + { + if self.header().is_column_indexed(column) { + self.index_lookup(column, value).await + } else { + self.find_all_eq_bruteforce(column, value).await + } + } +} + +#[async_trait] +pub trait CursorCanWriteToIndex: CursorCanReadIndex + CursorCanWriteHeader { + fn indexes_mut(&mut self) -> &mut [Option>]; + + // Assumes that the column is indexable. + fn mut_index_at(&mut self, column: Column) -> &mut Index { + match &mut self.indexes_mut()[column as usize] { + Some(index) => { + index + }, + None => { + unreachable!() + } + } + } + + // Assumes that the column is indexable. + async fn insert_into_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()> + where T: Encode + Decode + Ord + Send + Sync + 'async_trait + { + let index = self.mut_index_at(column as Column); + index.insert(value, file_position).await?; + Ok(()) + } + + // Assumes that the column is indexable. + async fn delete_from_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()> + where T: Encode + Decode + Ord + Send + Sync + 'async_trait + { + let index = self.mut_index_at(column as Column); + index.delete(value, file_position).await?; + Ok(()) + } + + async fn insert_entry(&mut self, entry: Entry) -> Result + where T: Encode + Decode + Ord + Send + Sync + 'async_trait + { + let file_position = self.append_entry_no_indexing(&entry).await?; + + // insert the indexable values of the entry into corresponding indexes. + for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() { + if should_index { + // SAFETY: If should_index is true, then the column is indexable. + self.insert_into_index(column as Column, value, file_position).await? + } + } + + Ok(file_position) + } + + async fn delete_entry_values_from_indexes(&mut self, entry: EntryDetailed) -> Result<()> + where T: Encode + Decode + Ord + Send + Sync + 'async_trait + { + for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() { + if should_index { + // SAFETY: If should_index is true, then the column is indexable. + self.delete_from_index(column as Column, value, entry.file_position).await? + } + } + Ok(()) + } +} diff --git a/storage_engine/src/cursor_capabilities/mod.rs b/storage_engine/src/cursor_capabilities/mod.rs new file mode 100644 index 0000000..6d301cb --- /dev/null +++ b/storage_engine/src/cursor_capabilities/mod.rs @@ -0,0 +1,3 @@ +pub(crate) mod primitive; +pub(crate) mod header_access; +pub(crate) mod index_access; diff --git a/storage_engine/src/cursor_capabilities/primitive.rs b/storage_engine/src/cursor_capabilities/primitive.rs new file mode 100644 index 0000000..d11b79b --- /dev/null +++ b/storage_engine/src/cursor_capabilities/primitive.rs @@ -0,0 +1,62 @@ +use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; +use tokio::fs::File; +use async_trait::async_trait; + +use crate::storage_engine::{FilePosition, Result}; + +#[async_trait] +pub(crate) trait CursorCanRead { + fn file(&mut self) -> &mut File; + fn eof_file_position(&self) -> FilePosition; + + async fn read_bytes(&mut self, bytes: &mut [u8]) -> Result<()> { + self.file().read_exact(bytes).await?; + Ok(()) + } + + async fn get_bytes(&mut self, count: usize) -> Result> { + let mut result: Vec = Vec::with_capacity(count); + self.read_bytes(&mut result).await?; + Ok(result) + } + + async fn seek_to(&mut self, file_position: FilePosition) -> Result { + let file_position = self.file().seek(SeekFrom::Start(file_position)).await?; + Ok(file_position) + } + + // Start of the file i.e. the Header, not the entries. + async fn seek_to_start(&mut self) -> Result { + let file_position = self.file().seek(SeekFrom::Start(0)).await?; + Ok(file_position) + } + + async fn seek_to_end(&mut self) -> Result { + let file_position = self.file().seek(SeekFrom::End(0)).await?; + Ok(file_position) + } + + // Seeks from current position by offset and returns new file position + async fn seek_by(&mut self, offset: i64) -> Result { + let file_position = self.file().seek(SeekFrom::Current(offset)).await?; + Ok(file_position) + } + + async fn current_file_position(&mut self) -> Result { + let next_file_position: FilePosition = self.file().stream_position().await?; + Ok(next_file_position) + } + + async fn is_at_eof(&mut self) -> Result { + let current_file_position = self.current_file_position().await?; + let eof_file_position = self.eof_file_position(); + Ok(current_file_position == eof_file_position) + } +} + +#[async_trait] +pub(crate) trait CursorCanWrite: CursorCanRead { + async fn write_bytes(&mut self, bytes: &[u8]) -> Result { + Ok(self.file().write(bytes).await?) + } +} diff --git a/storage_engine/src/lib.rs b/storage_engine/src/lib.rs index cae01f5..e7920dd 100644 --- a/storage_engine/src/lib.rs +++ b/storage_engine/src/lib.rs @@ -4,3 +4,4 @@ mod error; mod index; mod cursor; mod segments; +mod cursor_capabilities; diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs index 01f87d0..d31327c 100644 --- a/storage_engine/src/storage_engine.rs +++ b/storage_engine/src/storage_engine.rs @@ -5,7 +5,8 @@ use std::path::{Path, PathBuf}; use bincode::{Decode, Encode}; use crate::error::Error; -use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader}; +use crate::cursor::{ReadCursor, WriteCursor}; +use crate::cursor_capabilities::header_access::CursorCanReadHeader; use crate::segments::store_header::StoreHeader; use crate::index::Index; @@ -207,7 +208,9 @@ impl Store { mod tests { use super::*; use crate::segments::entry::{Entry, EntryDetailed}; - use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader, CursorWithWriteAccessToIndex, CursorWithAccessToIndex}; + use crate::cursor::{ReadCursor, WriteCursor}; + use crate::cursor_capabilities::header_access::CursorCanReadHeader; + use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex}; impl Drop for Store { fn drop(&mut self) {