diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 8fefaa6..0c3a5df 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -72,10 +72,6 @@ pub trait PrimitiveCursor { Ok(file_position) } - async fn seek_to_start_of_data(&mut self) -> Result { - self.seek_to(StoreHeader::SIZE as u64).await - } - // Seeks from current position by offset and returns new file position async fn seek_by(&mut self, offset: i64) -> Result { let file_position = self.file().seek(SeekFrom::Current(offset)).await?; @@ -98,6 +94,10 @@ pub trait PrimitiveCursor { pub trait CursorWithStoreHeader: PrimitiveCursor { fn header(&self) -> &StoreHeader; + async fn seek_to_start_of_data(&mut self) -> Result { + self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64).await + } + async fn read_entry_header(&mut self) -> Result { let number_of_columns: usize = self.header().number_of_columns; let mut header_bytes: Vec = vec![0; EntryHeaderWithDataSize::size(number_of_columns)]; @@ -349,7 +349,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> Ok(cursor) } - pub async fn connect<'header: 'cursor>(path_to_rows: &str, header: &'header mut StoreHeader) -> Result + pub async fn connect<'header: 'cursor, 'indexes: 'cursor>(path_to_rows: &str, header: &'header mut StoreHeader, indexes: &'indexes mut Vec>>) -> Result where T: Send { let file: File = @@ -363,7 +363,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> header, file, data_type: PhantomData::, - indexes: todo!(), + indexes, eof_file_position: 0, }; @@ -485,7 +485,8 @@ impl <'cursor, T> WriteCursor<'cursor, T> number_of_columns: self.header.number_of_columns, deleted_count: 0, total_count: 0, - primary_column: self.header.primary_column + primary_column: self.header.primary_column, + indexed_columns: todo!() }; // Creates a new cursor to the intermediate file in which we'll dump the live entries. diff --git a/storage_engine/src/error.rs b/storage_engine/src/error.rs index 2c7817c..951f167 100644 --- a/storage_engine/src/error.rs +++ b/storage_engine/src/error.rs @@ -12,6 +12,7 @@ pub enum DecodeErrorKind { StoreHeaderDeletedCount, StoreHeaderTotalCount, StoreHeaderPrimaryColumn, + StoreHeaderIndexedColumns, EntryData, EntryIsDeleted, EntryHeaderWithDataSizes, diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index d9b2562..5f4bb69 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -89,9 +89,9 @@ async fn main() -> Result<()> { } { - let mut cursor = store.write_cursor().await.map_err(|e| e.to_io_or_panic())?; + // let mut cursor = store.write_cursor().await.map_err(|e| e.to_io_or_panic())?; - let entry: Entry = Entry::new(vec![60, 50, 40, 30, 20]); + // let entry: Entry = Entry::new(vec![60, 50, 40, 30, 20]); // let file_position = append_entry(&mut cursor, &entry).await?; // let file_position = 215; // cursor.seek_to(file_position).await.map_err(|e| e.to_io_or_panic())?; @@ -107,20 +107,23 @@ async fn main() -> Result<()> { // println!("{:?}", store); // println!("{:?}", store.read_all_bytes().await?); - { let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?; - // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", x); - // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", x); - // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", x); - // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", x); - // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", x); + } + + { + let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + println!("{:?}", x); + let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + println!("{:?}", x); + let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + println!("{:?}", x); + let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + println!("{:?}", x); + let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + println!("{:?}", x); } { @@ -138,13 +141,13 @@ async fn main() -> Result<()> { println!("{:?}", x); } - { - let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; - let column = 3; - let t0 = 6; - let x = cursor.find_first_eq_bruteforce(column, &t0).await.map_err(|e| e.to_io_or_panic())?; - println!("{:?}", x); - } + // { + // let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + // let column = 3; + // let t0 = 6; + // let x = cursor.find_first_eq_bruteforce(column, &t0).await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // } diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs index a753a3d..d696e38 100644 --- a/storage_engine/src/storage_engine.rs +++ b/storage_engine/src/storage_engine.rs @@ -59,12 +59,17 @@ impl Store { DirBuilder::new() .create(path_to_table).await?; - let header = StoreHeader { - table_folder: table_folder.to_string(), - number_of_columns, - deleted_count: 0, - total_count: 0, - primary_column, + let header = { + let mut indexed_columns = vec![false; number_of_columns]; + indexed_columns[primary_column as usize] = true; + StoreHeader { + table_folder: table_folder.to_string(), + number_of_columns, + deleted_count: 0, + total_count: 0, + primary_column, + indexed_columns, + } }; // We don't need the file right now. Only cursors will later open it. @@ -117,9 +122,16 @@ impl Store { // Unfortunately we can't yet use store.read_bytes, since it can't be created without the // header. - let mut header_bytes = StoreHeader::decode_buffer(); - file.read_exact(&mut header_bytes).await?; - let header = StoreHeader::decode(table_folder, &mut header_bytes).await?; + let header = { + let mut fixed_header_bytes = StoreHeader::buffer_for_fixed_decoding(); + file.read_exact(&mut fixed_header_bytes).await?; + let fixed_header = StoreHeader::decode_fixed(table_folder, &fixed_header_bytes).await?; + + // decode the indexes + let mut rest_bytes: Vec = StoreHeader::buffer_for_rest_decoding(&fixed_header); + file.read_exact(&mut rest_bytes).await?; + StoreHeader::decode_rest(fixed_header, &rest_bytes).await? + }; // let primary_index: Index = Index::connect( diff --git a/storage_engine/src/store_header.rs b/storage_engine/src/store_header.rs index fc571a4..31b23f0 100644 --- a/storage_engine/src/store_header.rs +++ b/storage_engine/src/store_header.rs @@ -1,4 +1,4 @@ -use crate::binary_coding::{encode, decode}; +use crate::binary_coding::{encode, encode_sequence, decode, decode_sequence}; use crate::storage_engine::{Result, Column}; use crate::error::{Error, DecodeErrorKind}; use std::mem::size_of; @@ -11,8 +11,17 @@ pub struct StoreHeader { pub deleted_count: usize, pub total_count: usize, pub primary_column: Column, - // TODO - // pub indexed_columns: Vec, + pub indexed_columns: Vec, +} + +#[derive(Debug, Clone)] +pub struct StoreHeaderFixedPart { + pub table_folder: String, // This one is not encoded into the file + + pub number_of_columns: usize, + pub deleted_count: usize, + pub total_count: usize, + pub primary_column: Column, } impl StoreHeader { @@ -20,15 +29,20 @@ impl StoreHeader { pub const DELETED_COUNT_SIZE: usize = size_of::(); pub const TOTAL_COUNT_SIZE: usize = size_of::(); pub const PRIMARY_COLUMN_SIZE: usize = size_of::(); - pub const SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE + Self::PRIMARY_COLUMN_SIZE; + pub const FIXED_SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE + Self::PRIMARY_COLUMN_SIZE; pub const NUMBER_OF_COLUMNS_OFFSET: usize = 0; pub const DELETED_COUNT_OFFSET: usize = Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE; pub const TOTAL_COUNT_OFFSET: usize = Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE; pub const PRIMARY_COLUMN_OFFSET: usize = Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE; + pub const INDEXED_COLUMNS_OFFSET: usize = Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE; - fn indexed_columns_size(&self) -> usize { - size_of::() * self.number_of_columns + fn indexed_columns_size(number_of_columns: usize) -> usize { + size_of::() * number_of_columns + } + + pub fn size(number_of_columns: usize) -> usize { + Self::FIXED_SIZE + Self::indexed_columns_size(number_of_columns) } pub fn encode(&self) -> Result> { @@ -36,14 +50,19 @@ impl StoreHeader { result.append(&mut encode(&self.deleted_count)?); result.append(&mut encode(&self.total_count)?); result.append(&mut encode(&self.primary_column)?); + result.append(&mut encode_sequence(&self.indexed_columns)?); Ok(result) } - pub fn decode_buffer() -> [u8; StoreHeader::SIZE] { - [0; StoreHeader::SIZE] + pub fn buffer_for_fixed_decoding() -> [u8; Self::FIXED_SIZE] { + [0; Self::FIXED_SIZE] } - pub async fn decode(table_folder: &str, result: &mut [u8]) -> Result { + pub fn buffer_for_rest_decoding(header: &StoreHeaderFixedPart) -> Vec { + vec![0; Self::indexed_columns_size(header.number_of_columns)] + } + + pub async fn decode_fixed(table_folder: &str, result: &[u8]) -> Result { let (number_of_columns, _) = decode::(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; @@ -56,7 +75,7 @@ impl StoreHeader { let (primary_column, _) = decode::(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; - let header = StoreHeader { + let header = StoreHeaderFixedPart { table_folder: table_folder.to_string(), number_of_columns, deleted_count, @@ -67,6 +86,24 @@ impl StoreHeader { Ok(header) } + pub async fn decode_rest(header: StoreHeaderFixedPart, result: &[u8]) -> Result { + let indexed_columns: Vec = + decode_sequence::(header.number_of_columns, result) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderIndexedColumns, e))?; + + Ok(StoreHeader { + table_folder: header.table_folder, + number_of_columns: header.number_of_columns, + deleted_count: header.deleted_count, + total_count: header.total_count, + primary_column: header.primary_column, + + indexed_columns, + }) + } + + + // returns new count pub fn increment_total_count(&mut self) -> usize { self.total_count += 1;