diff --git a/storage_engine/src/binary_coding.rs b/storage_engine/src/binary_coding.rs index b6fe132..5e6378d 100644 --- a/storage_engine/src/binary_coding.rs +++ b/storage_engine/src/binary_coding.rs @@ -45,6 +45,18 @@ pub fn encode_sequence(ts: &[T]) -> Result, bincode::error::E Ok(result) } +pub fn encode_sequence_with_sizes(ts: &[T]) -> Result<(Vec, Vec), bincode::error::EncodeError> { + let mut result_bytes = vec![]; + let mut sizes = Vec::with_capacity(ts.len()); + for t in ts { + let mut bytes = encode(&t)?; + sizes.push(bytes.len()); + result_bytes.append(&mut bytes); + + } + Ok((result_bytes, sizes)) +} + pub fn decode_sequence(len: usize, bytes: &[u8]) -> Result, bincode::error::DecodeError> { let mut result: Vec = Vec::with_capacity(len); let mut offset = 0; diff --git a/storage_engine/src/error.rs b/storage_engine/src/error.rs index 898be3a..10a74ae 100644 --- a/storage_engine/src/error.rs +++ b/storage_engine/src/error.rs @@ -11,9 +11,10 @@ pub enum DecodeErrorKind { StoreHeaderNumberOfColumns, StoreHeaderDeletedCount, StoreHeaderTotalCount, + StoreHeaderPrimaryColumn, EntryData, EntryIsDeleted, - EntryDataSize + EntryHeaderWithDataSizes, } // ===Errors=== diff --git a/storage_engine/src/index.rs b/storage_engine/src/index.rs new file mode 100644 index 0000000..b7e48ab --- /dev/null +++ b/storage_engine/src/index.rs @@ -0,0 +1,68 @@ +use std::marker::PhantomData; +use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; +use tokio::fs::{File, OpenOptions, DirBuilder}; +use std::path::Path; + +use std::collections::{BTreeMap}; + +use bincode; +use bincode::{Decode, Encode}; +use crate::binary_coding::{encode, decode, encode_sequence, decode_sequence}; +use tokio::fs; + +use crate::error::{Error, DecodeErrorKind}; + +use std::mem::size_of; + +type Result = std::result::Result; + +// Implements a persistant self-balancing Binary Search Tree. Nope. +// We need fixed-size nodes. But we want to index Strings which are variable length. + +pub struct Index { + file: File, + // None means index is asleep on disk. + in_memory: Option>, + header: IndexHeader, + key_type: PhantomData, + value_type: PhantomData, +} + +pub struct IndexHeader { +} + +impl Index { + pub async fn new(file_name: &str) -> Result> { + todo!() + } + + pub async fn connect(file_name: &str) -> Result> { + todo!() + } + + // Saves the in-memory index to disk and deallocates. + pub async fn sleep() -> Result> { + todo!() + } + + // Loads the index into memory + pub async fn wake() -> Result> { + todo!() + } + + pub async fn insert() -> Result<()> + where I: Encode, V: Encode + { + todo!() + } + + pub async fn lookup(&mut self, k: I) -> Result> + where I: Encode + Decode, + { + todo!() + } + + pub async fn delete(&mut self, k: I) -> Result> { + todo!() + } +} diff --git a/storage_engine/src/lib.rs b/storage_engine/src/lib.rs index 7ce85ce..192f3db 100644 --- a/storage_engine/src/lib.rs +++ b/storage_engine/src/lib.rs @@ -1,3 +1,4 @@ pub mod storage_engine; mod binary_coding; mod error; +mod index; diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index 6518649..0bdd82c 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -7,6 +7,7 @@ use std::path::Path; mod storage_engine; mod binary_coding; mod error; +mod index; use crate::storage_engine::*; @@ -17,7 +18,7 @@ const TABLE_PATH: &'static str = "test_table"; type Result = std::result::Result; async fn create_store() -> Result> { - let mut store: Store = Store::new(TABLE_PATH, 5).await.map_err(|e| e.to_io_or_panic())?; + let mut store: Store = Store::new(TABLE_PATH, 5, 0).await.map_err(|e| e.to_io_or_panic())?; println!("CREATED"); println!("{:?}", store.read_all_bytes().await?); @@ -77,7 +78,7 @@ async fn main() -> Result<()> { // let entry0: Entry = Entry::new(vec![99, 98, 97, 96, 95]); // append_entry(&mut store, &entry0).await?; - store.read_entries(3).await.map_err(|e| e.to_io_or_panic())?; + store.read_entries(2).await.map_err(|e| e.to_io_or_panic())?; // let entry2: StoreEntry = StoreEntry::new_deleted(vec![3, 2, 1]); diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs index 9e12041..2555939 100644 --- a/storage_engine/src/storage_engine.rs +++ b/storage_engine/src/storage_engine.rs @@ -5,7 +5,7 @@ use std::marker::PhantomData; use bincode; use bincode::{Decode, Encode}; -use crate::binary_coding::{encode, decode, encode_sequence, decode_sequence}; +use crate::binary_coding::{encode, decode, encode_sequence, encode_sequence_with_sizes, decode_sequence}; use tokio::fs; use crate::error::{Error, DecodeErrorKind}; @@ -37,16 +37,19 @@ pub struct StoreHeader { number_of_columns: usize, deleted_count: usize, total_count: usize, + primary_column: Column, } impl StoreHeader { const NUMBER_OF_COLUMNS_SIZE: usize = size_of::(); const DELETED_COUNT_SIZE: usize = size_of::(); const TOTAL_COUNT_SIZE: usize = size_of::(); - const SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE; + const PRIMARY_COLUMN_SIZE: usize = size_of::(); + const SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE + Self::PRIMARY_COLUMN_SIZE; const NUMBER_OF_COLUMNS_OFFSET: usize = 0; const DELETED_COUNT_OFFSET: usize = Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE; const TOTAL_COUNT_OFFSET: usize = Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE; + const PRIMARY_COLUMN_OFFSET: usize = Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE; } #[derive(Debug)] @@ -61,12 +64,22 @@ impl EntryHeader { #[derive(Debug)] pub struct EntryHeaderWithDataSize { is_deleted: bool, - data_size: usize, // in bytes + data_sizes: Vec, // vec![5, 6, 20] means that column 0 stores 5 bytes, column 1 stores 6 + // bytes etc } impl EntryHeaderWithDataSize { + const IS_DELETED_OFFSET: usize = 0; const IS_DELETED_SIZE: usize = size_of::(); - const DATA_SIZE_SIZE: usize = size_of::(); - const SIZE: usize = Self::IS_DELETED_SIZE + Self::DATA_SIZE_SIZE; + const DATA_SIZES_OFFSET: usize = Self::IS_DELETED_OFFSET + Self::IS_DELETED_SIZE; + + fn size(number_of_columns: usize) -> usize { + let size_of_data_sizes: usize = number_of_columns*size_of::(); + Self::IS_DELETED_SIZE + size_of_data_sizes + } + + fn size_of_data(&self) -> usize{ + self.data_sizes.iter().sum() + } } #[derive(Debug)] @@ -142,7 +155,7 @@ impl Store { const ROWS_FILE_NAME: &'static str = "rows"; // ===Creation=== - pub async fn new(table_folder: &str, number_of_columns: usize) -> Result { + pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result { let path_to_table = Path::new(table_folder); let path_to_rows = path_to_table.join(Self::ROWS_FILE_NAME); DirBuilder::new() @@ -160,9 +173,12 @@ impl Store { number_of_columns, deleted_count: 0, total_count: 0, + primary_column, }; let encoded_header: Vec = header.encode()?; + println!("ENCODED_HEADER: {:?}", encoded_header); + let mut store = Self { table_folder: table_folder.to_string(), file, @@ -173,7 +189,9 @@ impl Store { Ok(store) } - pub async fn connect(table_folder: &str) -> Result { + pub async fn connect(table_folder: &str) -> Result + where T: std::fmt::Debug + { let path_to_table = Path::new(table_folder); let path_to_rows = path_to_table.join(Self::ROWS_FILE_NAME); @@ -190,12 +208,14 @@ impl Store { file.read_exact(&mut header_bytes).await?; let header = StoreHeader::decode(&mut header_bytes).await?; - Ok(Self { + let store = Self { table_folder: table_folder.to_string(), file, header, data_type: PhantomData::, - }) + }; + println!("just connected TOOOOO {:?}", store); + Ok(store) } // ===Append Entry=== @@ -247,9 +267,10 @@ impl Store { self.seek_to(cursor).await?; self.file.seek(SeekFrom::Start(cursor)).await?; - let mut header_bytes: Vec = vec![0; EntryHeaderWithDataSize::SIZE]; + let number_of_columns: usize = self.header.number_of_columns; + let mut header_bytes: Vec = vec![0; EntryHeaderWithDataSize::size(number_of_columns)]; self.read_bytes(&mut header_bytes).await?; - let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..])?; + let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..], number_of_columns)?; // TODO: Get rid of the println's // println!("HEADER_BYTES: {:?}", header_bytes); // println!("HEADER: {:?}", header); @@ -257,21 +278,12 @@ impl Store { Ok(header) } - pub async fn read_entry_data(&mut self, header: &EntryHeaderWithDataSize) -> Result> { - let mut data_bytes: Vec = vec![0; header.data_size]; - // TODO: Get rid of the println's - // println!("HEADER_BYTES: {:?}", header_bytes); - // println!("PREPARED_DATA_BYTES: {:?}", data_bytes); - self.read_bytes(&mut data_bytes).await?; - todo!() - } - pub async fn read_entry_at(&mut self, cursor: Cursor) -> Result> where T: Decode { let header = self.read_entry_header_at(cursor).await?; - let mut data_bytes: Vec = vec![0; header.data_size]; + let mut data_bytes: Vec = vec![0; header.size_of_data()]; // TODO: Get rid of the println's // println!("PREPARED_DATA_BYTES: {:?}", data_bytes); self.read_bytes(&mut data_bytes).await?; @@ -294,15 +306,29 @@ impl Store { } Ok(()) } + + pub async fn search_for_entry_with_id(&mut self, id: T) -> Result>> { + // TODO: make call to the primary index + todo!() + } + + // TODO: This needs to be some sort of an iterator + pub async fn get_eq(&self, column: Column, value: T) -> Result>> { + todo!() + } + + pub async fn garbage_collect(&mut self) -> Result<()> { + todo!() + } } // ===Store Header=== impl StoreHeader { fn encode(&self) -> Result> { - // FORMAT: First Number of Columns, Then Deleted Count. let mut result = encode(&self.number_of_columns)?; result.append(&mut encode(&self.deleted_count)?); result.append(&mut encode(&self.total_count)?); + result.append(&mut encode(&self.primary_column)?); Ok(result) } @@ -317,13 +343,17 @@ impl StoreHeader { let (deleted_count, _) = decode::(&result[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?; - let (total_count, _offset) = + let (total_count, _) = decode::(&result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?; + let (primary_column, _) = + decode::(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; let header = StoreHeader { number_of_columns, deleted_count, total_count, + primary_column, }; Ok(header) @@ -351,15 +381,15 @@ impl EntryHeader { } impl EntryHeaderWithDataSize { - fn decode(bytes: &mut [u8]) -> Result { - let (is_deleted, offset) = + fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result { + let (is_deleted, _) = decode::(&bytes) .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; - let (data_size, _) = - decode::(&bytes[offset..]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryDataSize, e))?; - Ok(Self { is_deleted, data_size} ) + let data_sizes = decode_sequence::(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?; + + Ok(Self { is_deleted, data_sizes } ) } } @@ -372,19 +402,17 @@ impl Entry { Self { header: EntryHeader { is_deleted: true}, data } } - // FORMAT: [HEADER, ..sequence of data] - // HEADER: [Boolean (one byte), number of bytes in the data (not including the boolean)] - fn encode(self: &Entry) -> Result> + // FORMAT: [EntryHeaderWithDataSize, ..sequence of data] + fn encode(&self) -> Result> where T: Encode { let mut result: Vec = self.header.encode()?; - let mut encoded_data = encode_sequence(&self.data[..])?; - let encoded_data_len: usize = encoded_data.len(); - result.append(&mut encode(&encoded_data_len)?); // usize 8 bytes + + let (mut encoded_data, sizes) = encode_sequence_with_sizes(&self.data[..])?; + result.append(&mut encode_sequence(&sizes)?); // sizes of data (fixed by number of columns) result.append(&mut encoded_data); // data variable size Ok(result) } - } impl EntryDetailed {