From 3bf04ae2d6d6698f6acf9f001ed03955afd8b047 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Sat, 3 Feb 2024 19:32:50 +0100 Subject: [PATCH] Cleanup --- storage_engine/src/cursor.rs | 11 +- storage_engine/src/entry.rs | 57 ++++++++ storage_engine/src/entry_header.rs | 42 ++++++ storage_engine/src/lib.rs | 3 + storage_engine/src/main.rs | 18 ++- storage_engine/src/storage_engine.rs | 206 +-------------------------- storage_engine/src/store_header.rs | 72 ++++++++++ 7 files changed, 194 insertions(+), 215 deletions(-) create mode 100644 storage_engine/src/entry.rs create mode 100644 storage_engine/src/entry_header.rs create mode 100644 storage_engine/src/store_header.rs diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index c658213..716abf7 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -1,5 +1,5 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; -use tokio::fs::{File, OpenOptions, DirBuilder}; +use tokio::fs::{File, OpenOptions}; use std::path::Path; use std::marker::PhantomData; @@ -7,11 +7,12 @@ use async_trait::async_trait; use bincode; use bincode::{Decode, Encode}; -use crate::binary_coding::{encode, decode, encode_sequence, encode_sequence_with_sizes, decode_sequence}; -use tokio::fs; +use crate::binary_coding::encode; -use crate::error::{Error, DecodeErrorKind}; -use crate::storage_engine::{Store, StoreHeader, FilePosition, Result, ROWS_FILE_NAME, EntryDetailed, EntryHeaderWithDataSize, Entry}; +use crate::entry::{Entry, EntryDetailed}; +use crate::entry_header::EntryHeaderWithDataSize; +use crate::store_header::StoreHeader; +use crate::storage_engine::{Store, FilePosition, Result, ROWS_FILE_NAME}; #[async_trait] trait PrimitiveCursor { diff --git a/storage_engine/src/entry.rs b/storage_engine/src/entry.rs new file mode 100644 index 0000000..84d15f9 --- /dev/null +++ b/storage_engine/src/entry.rs @@ -0,0 +1,57 @@ +use bincode::{Decode, Encode}; + +use crate::binary_coding::{encode, encode_sequence, encode_sequence_with_sizes, decode_sequence}; +use crate::storage_engine::Result; +use crate::error::{Error, DecodeErrorKind}; +use crate::entry_header::{EntryHeader, EntryHeaderWithDataSize}; + +#[derive(Debug)] +pub struct Entry { + header: EntryHeader, + data: Vec, +} + +#[derive(Debug)] +pub struct EntryDetailed { + header: EntryHeaderWithDataSize, + data: Vec, +} + +impl EntryHeader { + fn encode(self: &EntryHeader) -> Result> { + let result: Vec = encode(&self.is_deleted)?; + Ok(result) + } +} + +impl Entry { + pub fn new(data: Vec) -> Self { + Self { header: EntryHeader { is_deleted: false }, data } + } + + pub fn new_deleted(data: Vec) -> Self { + Self { header: EntryHeader { is_deleted: true}, data } + } + + // FORMAT: [EntryHeaderWithDataSize, ..sequence of data] + pub fn encode(&self) -> Result> + where T: Encode + { + let mut result: Vec = self.header.encode()?; + + let (mut encoded_data, sizes) = encode_sequence_with_sizes(&self.data[..])?; + result.append(&mut encode_sequence(&sizes)?); // sizes of data (fixed by number of columns) + result.append(&mut encoded_data); // data variable size + Ok(result) + } +} + +impl EntryDetailed { + pub fn decode(header: EntryHeaderWithDataSize, number_of_columns: usize, bytes: &[u8]) -> Result + where T: Decode + { + let data = decode_sequence::(number_of_columns, bytes) + .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?; + Ok(EntryDetailed { header, data }) + } +} diff --git a/storage_engine/src/entry_header.rs b/storage_engine/src/entry_header.rs new file mode 100644 index 0000000..b1a9f7d --- /dev/null +++ b/storage_engine/src/entry_header.rs @@ -0,0 +1,42 @@ +use crate::binary_coding::{decode, decode_sequence}; +use crate::storage_engine::Result; +use crate::error::{Error, DecodeErrorKind}; +use std::mem::size_of; + +#[derive(Debug)] +pub struct EntryHeader { + pub is_deleted: bool, +} + +#[derive(Debug)] +pub struct EntryHeaderWithDataSize { + pub is_deleted: bool, + pub data_sizes: Vec, // vec![5, 6, 20] means that column 0 stores 5 bytes, column 1 stores 6 + // bytes etc +} +impl EntryHeaderWithDataSize { + pub const IS_DELETED_OFFSET: usize = 0; + pub const IS_DELETED_SIZE: usize = size_of::(); + pub const DATA_SIZES_OFFSET: usize = Self::IS_DELETED_OFFSET + Self::IS_DELETED_SIZE; + + pub fn size(number_of_columns: usize) -> usize { + let size_of_data_sizes: usize = number_of_columns*size_of::(); + Self::IS_DELETED_SIZE + size_of_data_sizes + } + + pub fn size_of_data(&self) -> usize{ + self.data_sizes.iter().sum() + } + + pub fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result { + let (is_deleted, _) = + decode::(&bytes) + .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; + + let data_sizes = decode_sequence::(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?; + + Ok(Self { is_deleted, data_sizes } ) + } +} + diff --git a/storage_engine/src/lib.rs b/storage_engine/src/lib.rs index 164e164..65f1a06 100644 --- a/storage_engine/src/lib.rs +++ b/storage_engine/src/lib.rs @@ -3,3 +3,6 @@ mod binary_coding; mod error; mod index; mod cursor; +mod entry; +mod entry_header; +mod store_header; diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index fd38d58..e09e481 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -1,17 +1,15 @@ -use tokio::sync::{Mutex, RwLock}; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{BufReader, BufWriter, AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; -use tokio::fs; -use std::path::Path; - mod storage_engine; mod binary_coding; mod error; mod index; mod cursor; +mod entry; +mod entry_header; +mod store_header; -use crate::storage_engine::*; -use crate::cursor::*; +use crate::entry::{Entry, EntryDetailed}; +use crate::storage_engine::{Store, FilePosition}; +use crate::cursor::{ReadCursor, WriteCursor}; type Data = u32; @@ -46,7 +44,7 @@ async fn create_or_connect() -> Result> { } -async fn append_entry(cursor: &mut WriteCursor<'_, Data>, entry: &Entry) -> Result{ +async fn append_entry(cursor: &mut WriteCursor<'_, Data>, entry: &Entry) -> Result { println!("APPENDING"); println!("entry == {:?}", entry); let file_position: FilePosition = cursor.append_entry(&entry).await.map_err(|e| e.to_io_or_panic())?; @@ -54,7 +52,7 @@ async fn append_entry(cursor: &mut WriteCursor<'_, Data>, entry: &Entry) - Ok(file_position) } -async fn read_entry(cursor: &mut Cursor, file_position: FilePosition) -> Result>>{ +async fn read_entry(cursor: &mut ReadCursor, file_position: FilePosition) -> Result>> { println!("READING ENTRY at file_position={}", file_position); // let entry = cursor.read_entry_at(file_position).await.map_err(|e| e.to_io_or_panic())?; // println!("ENTRY: {:?}", entry); diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs index 091c403..4284f74 100644 --- a/storage_engine/src/storage_engine.rs +++ b/storage_engine/src/storage_engine.rs @@ -1,22 +1,18 @@ -use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::fs::{File, OpenOptions, DirBuilder}; +use tokio::fs; use std::path::Path; use std::marker::PhantomData; - use async_trait::async_trait; -use bincode; -use bincode::{Decode, Encode}; -use crate::binary_coding::{encode, decode, encode_sequence, encode_sequence_with_sizes, decode_sequence}; -use tokio::fs; - use crate::index::SomethingSupportingLeq; -use crate::error::{Error, DecodeErrorKind}; +use crate::error::Error; use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader}; +use crate::store_header::StoreHeader; -use crate::index::Index; +// TODO +// use crate::index::Index; -use std::mem::size_of; pub type Result = std::result::Result; @@ -42,99 +38,12 @@ pub struct Store { // All pub header: StoreHeader, pub data_type: PhantomData, - - // meta - // location of rows file - // locations of index files - // - // rows file - // list -} - -// Read Cursors don't modify the rows nor Store Header. -// Write Cursors can modify both rows and Store Header. -// Probably should split these into two types. But they will have a lot of functionality in common. -pub struct Cursor { - header: StoreHeader, - file: File, - data_type: PhantomData, - - eof_file_position: FilePosition, -} - -pub enum AccessMode { - Read, - Write } pub type PositionOfValue = FilePosition; pub type PositionOfRow = FilePosition; -#[derive(Debug, Clone)] -pub struct StoreHeader { - pub number_of_columns: usize, - pub deleted_count: usize, - pub total_count: usize, - pub primary_column: Column, -} -impl StoreHeader { - pub const NUMBER_OF_COLUMNS_SIZE: usize = size_of::(); - pub const DELETED_COUNT_SIZE: usize = size_of::(); - pub const TOTAL_COUNT_SIZE: usize = size_of::(); - pub const PRIMARY_COLUMN_SIZE: usize = size_of::(); - pub const SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE + Self::PRIMARY_COLUMN_SIZE; - - pub const NUMBER_OF_COLUMNS_OFFSET: usize = 0; - pub const DELETED_COUNT_OFFSET: usize = Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE; - pub const TOTAL_COUNT_OFFSET: usize = Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE; - pub const PRIMARY_COLUMN_OFFSET: usize = Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE; -} - -#[derive(Debug)] -pub struct EntryHeader { - is_deleted: bool, -} - -#[derive(Debug)] -pub struct EntryHeaderWithDataSize { - pub is_deleted: bool, - pub data_sizes: Vec, // vec![5, 6, 20] means that column 0 stores 5 bytes, column 1 stores 6 - // bytes etc -} -impl EntryHeaderWithDataSize { - pub const IS_DELETED_OFFSET: usize = 0; - pub const IS_DELETED_SIZE: usize = size_of::(); - pub const DATA_SIZES_OFFSET: usize = Self::IS_DELETED_OFFSET + Self::IS_DELETED_SIZE; - - pub fn size(number_of_columns: usize) -> usize { - let size_of_data_sizes: usize = number_of_columns*size_of::(); - Self::IS_DELETED_SIZE + size_of_data_sizes - } - - pub fn size_of_data(&self) -> usize{ - self.data_sizes.iter().sum() - } -} - -#[derive(Debug)] -pub struct Entry { - header: EntryHeader, - data: Vec, -} - -#[derive(Debug)] -pub struct EntryDetailed { - header: EntryHeaderWithDataSize, - data: Vec, -} - - -pub struct EntryIterator<'a> { - file: &'a mut File, - current_file_position: FilePosition -} - //===Store=== pub async fn store_exists(table_folder: &str) -> Result { Ok(fs::metadata(table_folder).await.is_ok()) @@ -144,10 +53,6 @@ pub async fn less_than_eq(store: &mut Store, file_position0: FilePosition, todo!() } -// pub trait SomethingSupportingLeq { -// async fn less_than_eq(&mut self, file_position0: FilePosition, file_position1: FilePosition) -> std::result::Result; -// } - #[async_trait] impl SomethingSupportingLeq for Store where T: Send @@ -250,107 +155,8 @@ impl Store { } // ===Store Header=== -impl StoreHeader { - fn encode(&self) -> Result> { - let mut result = encode(&self.number_of_columns)?; - result.append(&mut encode(&self.deleted_count)?); - result.append(&mut encode(&self.total_count)?); - result.append(&mut encode(&self.primary_column)?); - Ok(result) - } - - fn decode_buffer() -> [u8; StoreHeader::SIZE] { - [0; StoreHeader::SIZE] - } - - async fn decode(result: &mut [u8]) -> Result { - let (number_of_columns, _) = - decode::(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; - let (deleted_count, _) = - decode::(&result[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?; - let (total_count, _) = - decode::(&result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?; - let (primary_column, _) = - decode::(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; - let header = StoreHeader { - number_of_columns, - deleted_count, - total_count, - primary_column, - }; - - Ok(header) - } - - // returns new count - pub fn increment_total_count(&mut self) -> usize { - self.total_count += 1; - self.total_count - } - - // returns new count - pub fn increment_deleted_count(&mut self) -> usize { - self.deleted_count += 1; - self.deleted_count - } -} // ====Entry==== -impl EntryHeader { - fn encode(self: &EntryHeader) -> Result> { - let result: Vec = encode(&self.is_deleted)?; - Ok(result) - } -} - -impl EntryHeaderWithDataSize { - pub fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result { - let (is_deleted, _) = - decode::(&bytes) - .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; - - let data_sizes = decode_sequence::(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?; - - Ok(Self { is_deleted, data_sizes } ) - } -} - -impl Entry { - pub fn new(data: Vec) -> Self { - Self { header: EntryHeader { is_deleted: false }, data } - } - - pub fn new_deleted(data: Vec) -> Self { - Self { header: EntryHeader { is_deleted: true}, data } - } - - // FORMAT: [EntryHeaderWithDataSize, ..sequence of data] - pub fn encode(&self) -> Result> - where T: Encode - { - let mut result: Vec = self.header.encode()?; - - let (mut encoded_data, sizes) = encode_sequence_with_sizes(&self.data[..])?; - result.append(&mut encode_sequence(&sizes)?); // sizes of data (fixed by number of columns) - result.append(&mut encoded_data); // data variable size - Ok(result) - } -} - -impl EntryDetailed { - pub fn decode(header: EntryHeaderWithDataSize, number_of_columns: usize, bytes: &[u8]) -> Result - where T: Decode - { - let data = decode_sequence::(number_of_columns, bytes) - .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?; - Ok(EntryDetailed { header, data }) - } -} // impl StorageEngine for ColumnStore { diff --git a/storage_engine/src/store_header.rs b/storage_engine/src/store_header.rs new file mode 100644 index 0000000..fa84478 --- /dev/null +++ b/storage_engine/src/store_header.rs @@ -0,0 +1,72 @@ +use crate::binary_coding::{encode, decode}; +use crate::storage_engine::{Result, Column}; +use crate::error::{Error, DecodeErrorKind}; +use std::mem::size_of; + +#[derive(Debug, Clone)] +pub struct StoreHeader { + pub number_of_columns: usize, + pub deleted_count: usize, + pub total_count: usize, + pub primary_column: Column, +} + +impl StoreHeader { + pub const NUMBER_OF_COLUMNS_SIZE: usize = size_of::(); + pub const DELETED_COUNT_SIZE: usize = size_of::(); + pub const TOTAL_COUNT_SIZE: usize = size_of::(); + pub const PRIMARY_COLUMN_SIZE: usize = size_of::(); + pub const SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE + Self::PRIMARY_COLUMN_SIZE; + + pub const NUMBER_OF_COLUMNS_OFFSET: usize = 0; + pub const DELETED_COUNT_OFFSET: usize = Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE; + pub const TOTAL_COUNT_OFFSET: usize = Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE; + pub const PRIMARY_COLUMN_OFFSET: usize = Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE; + + pub fn encode(&self) -> Result> { + let mut result = encode(&self.number_of_columns)?; + result.append(&mut encode(&self.deleted_count)?); + result.append(&mut encode(&self.total_count)?); + result.append(&mut encode(&self.primary_column)?); + Ok(result) + } + + pub fn decode_buffer() -> [u8; StoreHeader::SIZE] { + [0; StoreHeader::SIZE] + } + + pub async fn decode(result: &mut [u8]) -> Result { + let (number_of_columns, _) = + decode::(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; + let (deleted_count, _) = + decode::(&result[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?; + let (total_count, _) = + decode::(&result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?; + let (primary_column, _) = + decode::(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; + let header = StoreHeader { + number_of_columns, + deleted_count, + total_count, + primary_column, + }; + + Ok(header) + } + + // returns new count + pub fn increment_total_count(&mut self) -> usize { + self.total_count += 1; + self.total_count + } + + // returns new count + pub fn increment_deleted_count(&mut self) -> usize { + self.deleted_count += 1; + self.deleted_count + } +}