From cad4ba82155d89632f2d43b1e7079b2e899ef8d7 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Fri, 2 Feb 2024 13:56:37 +0100 Subject: [PATCH] Print first n entries --- storage_engine/src/binary_coding.rs | 78 ++++ storage_engine/src/error.rs | 44 ++ storage_engine/src/lib.rs | 3 +- storage_engine/src/main.rs | 94 +++-- storage_engine/src/storage_engine.rs | 578 ++++++++++++++------------- 5 files changed, 481 insertions(+), 316 deletions(-) create mode 100644 storage_engine/src/binary_coding.rs create mode 100644 storage_engine/src/error.rs diff --git a/storage_engine/src/binary_coding.rs b/storage_engine/src/binary_coding.rs new file mode 100644 index 0000000..b6fe132 --- /dev/null +++ b/storage_engine/src/binary_coding.rs @@ -0,0 +1,78 @@ +use bincode; +use bincode::{Decode, Encode}; +use bincode::config::{BigEndian, Configuration, Fixint}; +use std::mem::size_of; + +const BIN_CONFIG: Configuration = bincode::config::standard().with_big_endian().with_fixed_int_encoding(); + +pub fn encode(t: &T) -> Result, bincode::error::EncodeError> { + bincode::encode_to_vec(t, BIN_CONFIG) +} + +pub fn decode(bytes: &[u8]) -> Result<(T, usize), bincode::error::DecodeError> { + bincode::decode_from_slice(bytes, BIN_CONFIG) +} + +pub fn encode_vector(ts: &[T]) -> Result, bincode::error::EncodeError> { + let size: usize = ts.len(); + let mut result = encode(&size)?; + for t in ts { + result.append(&mut encode(&t)?); + } + Ok(result) +} + +pub fn decode_vector(bytes: &[u8]) -> Result, bincode::error::DecodeError> { + let mut offset = size_of::(); + let result_len: usize = decode(&bytes[..offset])?.0; + let mut result: Vec = Vec::with_capacity(result_len); + for _ in 0..result_len { + let (x, bytes_consumed) = decode::(&bytes[offset..])?; + offset += bytes_consumed; + result.push(x); + } + Ok(result) +} + +// We don't care about encoding the length here (since it will be used for a row with known column +// size) +pub fn encode_sequence(ts: &[T]) -> Result, bincode::error::EncodeError> { + let mut result = vec![]; + for t in ts { + result.append(&mut encode(&t)?); + + } + Ok(result) +} + +pub fn decode_sequence(len: usize, bytes: &[u8]) -> Result, bincode::error::DecodeError> { + let mut result: Vec = Vec::with_capacity(len); + let mut offset = 0; + for _ in 0..len { + let (x, bytes_consumed) = decode::(&bytes[offset..])?; + offset += bytes_consumed; + result.push(x); + } + Ok(result) +} + + +fn example_encoding_decoding() { + let xs: Vec = vec![123, 250, 256, 123, 123, 123]; + let xs: Vec = vec![]; + let xs: Vec = vec![123]; + let xs: Vec = vec![123, 250]; + + let xs: Vec = vec!["foo".to_string(), "bar".to_string()]; + + + println!("original {:?}", xs); + let exs = encode_vector(&xs[..]).unwrap(); + println!("encoded {:?}", exs); + + // WARNING: Don't forget to specify the type here + // let dxs = decode_vector::(&exs[..]).unwrap(); + let dxs = decode_vector::(&exs[..]).unwrap(); + println!("decoded {:?}", dxs); +} + diff --git a/storage_engine/src/error.rs b/storage_engine/src/error.rs new file mode 100644 index 0000000..7c213f3 --- /dev/null +++ b/storage_engine/src/error.rs @@ -0,0 +1,44 @@ +#[derive(Debug)] +pub enum Error { + DecodeError(DecodeErrorKind, bincode::error::DecodeError), + EncodeError(bincode::error::EncodeError), + IoError(std::io::Error), + InvalidStoreHeader, +} + +#[derive(Debug)] +pub enum DecodeErrorKind { + StoreHeaderNumberOfColumns, + StoreHeaderDeletedCount, + EntryData, + EntryIsDeleted, + EntryDataSize +} + +// ===Errors=== +impl Error { + pub fn to_io_or_panic(self) -> std::io::Error { + use Error::*; + match self { + IoError(err) => err, + err => { + println!("{:?}", err); + panic!(); + } + } + } +} + +impl From for Error { + fn from(err: bincode::error::EncodeError) -> Self { + Self::EncodeError(err) + } +} + +impl From for Error { + fn from(err: std::io::Error) -> Self { + Self::IoError(err) + } +} + + diff --git a/storage_engine/src/lib.rs b/storage_engine/src/lib.rs index 0e2eb79..7ce85ce 100644 --- a/storage_engine/src/lib.rs +++ b/storage_engine/src/lib.rs @@ -1,2 +1,3 @@ - pub mod storage_engine; +mod binary_coding; +mod error; diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index fffde93..ac6fa88 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -2,47 +2,85 @@ use tokio::sync::{Mutex, RwLock}; use tokio::fs::{File, OpenOptions}; use tokio::io::{BufReader, BufWriter, AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; use tokio::fs; +use std::path::Path; mod storage_engine; +mod binary_coding; +mod error; use crate::storage_engine::*; +type Data = u32; + +const TABLE_PATH: &'static str = "test_table"; + +type Result = std::result::Result; + +async fn create_store() -> Result> { + let mut store: Store = Store::new(TABLE_PATH, 5).await.map_err(|e| e.to_io_or_panic())?; + println!("CREATED"); + println!("{:?}", store.read_all_bytes().await?); + + let entry0: Entry = Entry::new_deleted(vec![1, 2, 3, 4, 5]); + append_entry(&mut store, &entry0).await?; + + let entry1: Entry = Entry::new_deleted(vec![200, 200, 5, 6, 7]); + append_entry(&mut store, &entry1).await?; + + println!("{:?}", store.read_all_bytes().await?); + Ok(store) +} +async fn connect_store() -> Result> { + let mut store: Store = Store::connect(TABLE_PATH).await.map_err(|e| e.to_io_or_panic())?; + println!("CONNECTED"); + println!("{:?}", store.read_all_bytes().await?); + Ok(store) +} + +async fn create_or_connect() -> Result> { + let exists = storage_engine::store_exists(TABLE_PATH).await.map_err(|e| e.to_io_or_panic())?; + if exists { + connect_store().await + } else { + create_store().await + } +} + + +async fn append_entry(store: &mut Store, entry: &Entry) -> Result{ + println!("APPENDING"); + println!("entry == {:?}", entry); + let cursor: Cursor = store.append_entry(&entry).await.map_err(|e| e.to_io_or_panic())?; + println!("cursor == {:?}", cursor); + Ok(cursor) +} + +async fn read_entry(store: &mut Store, cursor: Cursor) -> Result>{ + println!("READING ENTRY at cursor={}", cursor); + let entry = store.read_entry_at(cursor).await.map_err(|e| e.to_io_or_panic())?; + println!("ENTRY: {:?}", entry); + Ok(entry) +} + + #[tokio::main] -async fn main() -> Result<(), std::io::Error> { +async fn main() -> Result<()> { println!("STOOOOOOOOOOOORAAAAAAAAAAAGE"); - let blob_name = "blob10.minisql"; + let mut store: Store = create_or_connect().await?; - // WARNING: Number of columns is 5????? - - let mut store = Store::new(blob_name, 5).await.map_err(|e| e.to_io_or_panic())?; - - // let store_bytes = store.get_all_bytes().await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", store_bytes); - - let mut buff: Vec = vec![0;1]; - let x = store.file.read_exact(&mut buff[..]).await?; - println!("{:?}", buff); - - - // let entry0: Entry = Entry::new_deleted(vec![1, 2, 3, 4, 5]); - // let entry1: Entry = Entry::new_deleted(vec![200,200,5,6,7]); - // let cursor0 = store.append_entry(&entry0).await.map_err(|e| e.to_io_or_panic())?; - // // println!("cursor0 = {}", cursor0); - - // let cursor1 = store.append_entry(&entry1).await.map_err(|e| e.to_io_or_panic())?; - // println!("cursor0 = {}, cursor1 = {}", cursor0, cursor1); - - // let mut store = Store::connect(blob_name).await.map_err(|e| e.to_io_or_panic())?; + // let entry0 = read_entry(&mut store, 16).await?; + // let entry1 = read_entry(&mut store, 45).await?; // println!("{:?}", store); + // println!("{:?}", store.read_all_bytes().await?); + + // let entry0: Entry = Entry::new(vec![99, 98, 97, 96, 95]); + // append_entry(&mut store, &entry0).await?; + + store.read_entries(4).await.map_err(|e| e.to_io_or_panic())?; - // let x = store.entry_at::(16).await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", x); - // let store_bytes = store.get_all_bytes().await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", store_bytes); - // let mut store = ColumnStore::connect("blob08.minisql").await.map_err(|e| e.to_io_or_panic())?; // let entry2: StoreEntry = StoreEntry::new_deleted(vec![3, 2, 1]); // let cursor2 = store.append_entry(&entry2).await.map_err(|e| e.to_io_or_panic())?; // println!("cursor2 = {}", cursor2); diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs index 802444e..d947d14 100644 --- a/storage_engine/src/storage_engine.rs +++ b/storage_engine/src/storage_engine.rs @@ -1,101 +1,29 @@ -use tokio::io::{BufReader, BufWriter, AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; -use tokio::sync::{Mutex, RwLock}; -use tokio::fs::{File, OpenOptions}; -use tokio::fs; +use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; +use tokio::fs::{File, OpenOptions, DirBuilder}; +use std::path::Path; +use std::marker::PhantomData; use bincode; -use bincode::de::Decoder; -use bincode::enc::write::Writer; -use bincode::enc::Encoder; -use bincode::{BorrowDecode, Decode, Encode}; -use bincode::config::{BigEndian, Configuration, Fixint}; +use bincode::{Decode, Encode}; +use crate::binary_coding::{encode, decode, encode_sequence, decode_sequence}; +use tokio::fs; + +use crate::error::{Error, DecodeErrorKind}; use std::mem::size_of; +type Result = std::result::Result; -// =============Byte encoding/decoding============ -const BIN_CONFIG: Configuration = bincode::config::standard().with_big_endian().with_fixed_int_encoding(); +pub type Column = u64; +pub type Cursor = u64; -fn encode(t: &T) -> Result, bincode::error::EncodeError> { - bincode::encode_to_vec(t, BIN_CONFIG) -} - -fn decode(bytes: &[u8]) -> Result<(T, usize), bincode::error::DecodeError> { - bincode::decode_from_slice(bytes, BIN_CONFIG) -} - -fn encode_vector(ts: &[T]) -> Result, bincode::error::EncodeError> { - let size: usize = ts.len(); - let mut result = encode(&size)?; - for t in ts { - result.append(&mut encode(&t)?); - } - Ok(result) -} - -fn decode_vector(bytes: &[u8]) -> Result, bincode::error::DecodeError> { - let mut offset = size_of::(); - let result_len: usize = decode(&bytes[..offset])?.0; - let mut result: Vec = Vec::with_capacity(result_len); - for _ in 0..result_len { - let (x, bytes_consumed) = decode::(&bytes[offset..])?; - offset += bytes_consumed; - result.push(x); - } - Ok(result) -} - -// We don't care about encoding the length here (since it will be used for a row with known column -// size) -fn encode_sequence(ts: &[T]) -> Result, bincode::error::EncodeError> { - let mut result = vec![]; - for t in ts { - result.append(&mut encode(&t)?); - - } - Ok(result) -} - -fn decode_sequence(len: usize, bytes: &[u8]) -> Result, bincode::error::DecodeError> { - let mut result: Vec = Vec::with_capacity(len); - let mut offset = 0; - for _ in 0..len { - let (x, bytes_consumed) = decode::(&bytes[offset..])?; - offset += bytes_consumed; - result.push(x); - } - Ok(result) -} - - -fn example_encoding_decoding() { - let xs: Vec = vec![123, 250, 256, 123, 123, 123]; - let xs: Vec = vec![]; - let xs: Vec = vec![123]; - let xs: Vec = vec![123, 250]; - - let xs: Vec = vec!["foo".to_string(), "bar".to_string()]; - - - println!("original {:?}", xs); - let exs = encode_vector(&xs[..]).unwrap(); - println!("encoded {:?}", exs); - - // WARNING: Don't forget to specify the type here - // let dxs = decode_vector::(&exs[..]).unwrap(); - let dxs = decode_vector::(&exs[..]).unwrap(); - println!("decoded {:?}", dxs); -} - -// ============Column Store=============== - -// ColumnStore +// TODO: Consider introducing a phantom type for the data that's used in the store. #[derive(Debug)] -pub struct Store { - column_file_name: String, - // TODO: This should be private - pub file: File, - header: StoreHeader +pub struct Store { + table_folder: String, + file: File, + header: StoreHeader, + data_type: PhantomData, // meta // location of rows file // locations of index files @@ -104,246 +32,322 @@ pub struct Store { // list } -// These type aliases are here because they make writing decoders easier. -type NumberOfColumns = usize; -type DeletedCount = usize; #[derive(Debug)] pub struct StoreHeader { - number_of_columns: NumberOfColumns, - deleted_count: DeletedCount, + number_of_columns: usize, + deleted_count: usize, +} +impl StoreHeader { + const NUMBER_OF_COLUMNS_SIZE: usize = size_of::(); + const DELETED_COUNT_SIZE: usize = size_of::(); + const SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE; +} + +#[derive(Debug)] +pub struct EntryHeader { + is_deleted: bool, +} +impl EntryHeader { + const IS_DELETED_SIZE: usize = size_of::(); + const HEADER_SIZE: usize = Self::IS_DELETED_SIZE; +} + +#[derive(Debug)] +pub struct EntryHeaderWithDataSize { + is_deleted: bool, + data_size: usize, // in bytes +} +impl EntryHeaderWithDataSize { + const IS_DELETED_SIZE: usize = size_of::(); + const DATA_SIZE_SIZE: usize = size_of::(); + const SIZE: usize = Self::IS_DELETED_SIZE + Self::DATA_SIZE_SIZE; } #[derive(Debug)] pub struct Entry { - is_deleted: bool, - // file_position: FilePosition, + header: EntryHeader, data: Vec, } #[derive(Debug)] -pub enum Error { - DecodeError(DecodeErrorKind, bincode::error::DecodeError), - EncodeError(bincode::error::EncodeError), - IoError(std::io::Error), - InvalidStoreHeader, +pub struct EntryDetailed { + header: EntryHeaderWithDataSize, + data: Vec, } -#[derive(Debug)] -pub enum DecodeErrorKind { - StoreHeaderNumberOfColumns, - StoreHeaderDeletedCount, - EntryData, - EntryIsDeleted, - EntryDataSize +//===Store=== +pub async fn store_exists(table_folder: &str) -> Result { + Ok(fs::metadata(table_folder).await.is_ok()) } -// ===Errors=== -impl Error { - pub fn to_io_or_panic(self) -> std::io::Error { - use Error::*; - match self { - IoError(err) => err, - err => { - println!("{:?}", err); - panic!(); - } - } - } -} - -impl From for Error { - fn from(err: bincode::error::EncodeError) -> Self { - Self::EncodeError(err) - } -} - -impl From for Error { - fn from(err: std::io::Error) -> Self { - Self::IoError(err) - } -} - - -// ====Entry==== -impl Entry { - pub fn new(data: Vec) -> Self { - Self { is_deleted: false, data } +impl Store { + //===primitive file operations=== + // Moves the cursor right. + async fn write_bytes(&mut self, bytes: &[u8]) -> Result { + Ok(self.file.write(bytes).await?) } - pub fn new_deleted(data: Vec) -> Self { - Self { is_deleted: true, data } + // Moves the cursor right. + async fn read_bytes(&mut self, bytes: &mut [u8]) -> Result<()> { + self.file.read_exact(bytes).await?; + Ok(()) } - // FORMAT: [HEADER, ..sequence of data] - // HEADER: [Boolean (one byte), number of bytes in the data (not including the boolean)] - fn encode(self: &Entry) -> Result, Error> - where T: Encode - { - let mut result: Vec = encode(&self.is_deleted)?; // bool 1 byte - let mut encoded_data = encode_sequence(&self.data[..])?; - let encoded_data_len = encoded_data.len(); - result.append(&mut encode(&encoded_data_len)?); // usize 8 bytes - println!("enc data len == {}", encoded_data_len); - println!("encoded_data == {:?} ", encoded_data); - result.append(&mut encoded_data); // data variable size + // Moves the cursor right. + async fn get_bytes(&mut self, count: usize) -> Result> { + let mut result: Vec = Vec::with_capacity(count); + self.read_bytes(&mut result).await?; Ok(result) } - // in bytes - pub fn header_size() -> usize { - size_of::() + size_of::() + async fn seek_to(&mut self, cursor: Cursor) -> Result<()>{ + self.file.seek(SeekFrom::Start(cursor)).await?; + Ok(()) } - // TODO: Maybe introduce an EntryHeader as a separate type? - pub fn decode_header(header_bytes: Vec) -> Result<(bool, usize), Error> { - let (is_deleted, offset) = - decode::(&header_bytes) - .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; + async fn seek_to_start(&mut self) -> Result<()> { + self.file.seek(SeekFrom::Start(0)).await?; + Ok(()) + } - let (data_size, _) = - decode::(&header_bytes[offset..]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryDataSize, e))?; - Ok((is_deleted, data_size)) + async fn seek_to_end(&mut self) -> Result<()> { + self.file.seek(SeekFrom::End(0)).await?; + Ok(()) + } + + async fn seek_to_start_of_data(&mut self) -> Result<()> { + self.seek_to(StoreHeader::SIZE as u64).await + } + + async fn current_cursor(&mut self) -> Result { + let next_cursor: Cursor = self.file.stream_position().await?; + Ok(next_cursor) + } + + // For debugging. + // Moves cursor to the end. + pub async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error>{ + let mut bytes: Vec = vec![]; + self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?; + self.file.read_to_end(&mut bytes).await?; + Ok(bytes) + } + + const ROWS_FILE_NAME: &'static str = "rows"; + + // ===Creation=== + pub async fn new(table_folder: &str, number_of_columns: usize) -> Result { + let path_to_table = Path::new(table_folder); + let path_to_rows = path_to_table.join(Self::ROWS_FILE_NAME); + DirBuilder::new() + .create(path_to_table).await?; + + let file: File = + OpenOptions::new() + .write(true) + .read(true) + .create_new(true) + .open(path_to_rows) + .await?; + + let header = StoreHeader { + number_of_columns, + deleted_count: 0, + }; + let encoded_header: Vec = header.encode()?; + + let mut store = Self { + table_folder: table_folder.to_string(), + file, + header, + data_type: PhantomData::, + }; + store.write_bytes(&encoded_header).await?; + Ok(store) + } + + pub async fn connect(table_folder: &str) -> Result { + let path_to_table = Path::new(table_folder); + let path_to_rows = path_to_table.join(Self::ROWS_FILE_NAME); + + let mut file: File = + OpenOptions::new() + .read(true) + .write(true) + .open(path_to_rows) + .await?; + + // Unfortunately we can't yet use store.read_bytes, since it can't be created without the + // header. + let mut header_bytes = StoreHeader::decode_buffer(); + file.read_exact(&mut header_bytes).await?; + let header = StoreHeader::decode(&mut header_bytes).await?; + + Ok(Self { + table_folder: table_folder.to_string(), + file, + header, + data_type: PhantomData::, + }) + } + + // ===Append Entry=== + // Moves cursor to the end. + pub async fn append_entry(&mut self, entry: &Entry) -> Result + where T: Encode + { + let encoded_entry: Vec = entry.encode()?; + self.seek_to_end().await?; + let cursor: Cursor = self.current_cursor().await?; + self.write_bytes(&encoded_entry).await?; + Ok(cursor) + } + + + // ===Lookup=== + // WARNING: The cursor has to be at the start of an entry. Otherwise garbage data will be + // decoded as an entry. + pub async fn read_entry_header_at(&mut self, cursor: Cursor) -> Result { + self.seek_to(cursor).await?; + self.file.seek(SeekFrom::Start(cursor)).await?; + + let mut header_bytes: Vec = vec![0; EntryHeaderWithDataSize::SIZE]; + self.read_bytes(&mut header_bytes).await?; + let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..])?; + // TODO: Get rid of the println's + // println!("HEADER_BYTES: {:?}", header_bytes); + // println!("HEADER: {:?}", header); + + Ok(header) + } + + pub async fn read_entry_data(&mut self, header: &EntryHeaderWithDataSize) -> Result> { + let mut data_bytes: Vec = vec![0; header.data_size]; + // TODO: Get rid of the println's + // println!("HEADER_BYTES: {:?}", header_bytes); + // println!("PREPARED_DATA_BYTES: {:?}", data_bytes); + self.read_bytes(&mut data_bytes).await?; + todo!() + } + + pub async fn read_entry_at(&mut self, cursor: Cursor) -> Result> + where T: Decode + { + let header = self.read_entry_header_at(cursor).await?; + + let mut data_bytes: Vec = vec![0; header.data_size]; + // TODO: Get rid of the println's + // println!("PREPARED_DATA_BYTES: {:?}", data_bytes); + self.read_bytes(&mut data_bytes).await?; + // println!("DATA_BYTES: {:?}", data_bytes); + let entry: EntryDetailed = + EntryDetailed::decode(header, self.header.number_of_columns, &mut data_bytes)?; + + Ok(entry) + } + + pub async fn read_entries(&mut self, n: usize) -> Result<()> + where T: Decode + std::fmt::Debug + { + self.seek_to_start_of_data().await?; + let mut cursor: Cursor = self.current_cursor().await?; + for i in 0..n { + let entry = self.read_entry_at(cursor).await?; + println!("({}, {:?})", i, entry); + cursor = self.current_cursor().await?; + } + Ok(()) } } -pub type Column = u64; -pub type Cursor = u64; - +// ===Store Header=== impl StoreHeader { - fn encode(&self) -> Result, Error> { + fn encode(&self) -> Result> { // FORMAT: First Number of Columns, Then Deleted Count. let mut result = encode(&self.number_of_columns)?; result.append(&mut encode(&self.deleted_count)?); Ok(result) } - async fn decode(file: &mut File) -> Result { - let number_of_columns_size = size_of::(); - let deleted_count_size = size_of::(); - let header_size: usize = number_of_columns_size + deleted_count_size; - let mut header_bytes: Vec = vec![0; header_size]; - // TODO: Why do we need to have mutable reference for the file when we are reading it? - match file.read_exact(&mut header_bytes).await { - Ok(_) => { - let offset = 0; - let (number_of_columns, offset) = - decode::(&header_bytes[offset..offset + number_of_columns_size]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; - let (deleted_count, _) = - decode::(&header_bytes[offset..offset + deleted_count_size]) - .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?; - let header = StoreHeader { - number_of_columns, - deleted_count, - }; - Ok(header) - }, - Err(err) => { - // TODO: When err is of the kind UnexpectedEof, return InvalidStoreHeader - println!("ARE WE HERE?"); - Err(Error::from(err)) - } - } - } -} - - -impl Store { - // For debugging - pub async fn get_all_bytes(mut self) -> Result, Error>{ - let mut bytes: Vec = vec![]; - use std::io::Read; - // for byte in self.file.bytes() { - // } - self.file.read_exact(&mut bytes[..]).await?; - Ok(bytes) + fn decode_buffer() -> [u8; StoreHeader::SIZE] { + [0; StoreHeader::SIZE] } - pub async fn new(column_file_name: &str, number_of_columns: usize) -> Result { - let mut file = - OpenOptions::new() - .write(true) - .read(true) - .create_new(true) - .open(column_file_name) - .await?; - + async fn decode(result: &mut [u8]) -> Result { + let offset = 0; + let (number_of_columns, offset) = + decode::(&result[offset..offset + Self::NUMBER_OF_COLUMNS_SIZE]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; + let (deleted_count, _) = + decode::(&result[offset..offset + Self::DELETED_COUNT_SIZE]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?; let header = StoreHeader { number_of_columns, - deleted_count: 0, - } ; - let encoded_header = header.encode()?; - file.write(&encoded_header).await?; - println!("is something being encoded? {:?}", encoded_header); - let store = Self { - column_file_name: column_file_name.to_string(), - file, - header, + deleted_count, }; - Ok(store) - } - pub async fn connect(column_file_name: &str) -> Result { - let mut file = OpenOptions::new().read(true).write(true).open(column_file_name).await?; - - let header = StoreHeader::decode(&mut file).await?; - Ok(Self { - column_file_name: column_file_name.to_string(), - file, - header - }) - } - - pub async fn entry_at(&mut self, cursor: Cursor) -> Result, Error> { - self.file.seek(SeekFrom::Start(cursor)).await?; - - // 1. read header bytes (fixed number of bytes). - // 2. decode header - // 3. read entry data bytes. - // 4. decode data - // That will tell us how much data there is. - let entry_header_size = Entry::::header_size(); - let mut header_bytes: Vec = vec![0; entry_header_size]; - self.file.read_exact(&mut header_bytes).await?; - - println!("cursor == {}", cursor); - println!("header_bytes == {:?}", header_bytes); - - let (is_deleted, data_size) = Entry::::decode_header(header_bytes)?; - - self.file.seek(SeekFrom::Current(entry_header_size as i64)).await?; - let mut data_bytes: Vec = vec![0; data_size]; - - println!("(is_delted, data_size) = ({}, {})", is_deleted, data_size); - let data = - decode_sequence::(self.header.number_of_columns, &mut data_bytes) - .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?; - Ok(Entry { - is_deleted, - data - }) - } - - pub async fn append_entry(&mut self, entry: &Entry) -> Result { - // On linux when opening a file in append mode, the seek is set to 0 - // and only updated after a write. That's why we do the cursor gymnastic at the end. - let encoded_entry: Vec = entry.encode()?; - println!("encoded_entry == {:?}", encoded_entry); - println!("bool size == {}", size_of::()); - println!("usize size == {}", size_of::()); - self.file.write(&encoded_entry).await?; - let next_cursor: Cursor = self.file.stream_position().await?; - let cursor: Cursor = next_cursor - encoded_entry.len() as u64; - Ok(cursor) - } - - pub async fn iterate_all(&mut self) -> Result { - // Loop through the rows and print them out - todo!() + Ok(header) } } +// ====Entry==== +impl EntryHeader { + fn encode(self: &EntryHeader) -> Result> { + let result: Vec = encode(&self.is_deleted)?; + Ok(result) + } +} + +impl EntryHeaderWithDataSize { + fn decode(bytes: &mut [u8]) -> Result { + let (is_deleted, offset) = + decode::(&bytes) + .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; + + let (data_size, _) = + decode::(&bytes[offset..]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryDataSize, e))?; + Ok(Self { is_deleted, data_size} ) + } +} + +impl Entry { + pub fn new(data: Vec) -> Self { + Self { header: EntryHeader { is_deleted: false }, data } + } + + pub fn new_deleted(data: Vec) -> Self { + Self { header: EntryHeader { is_deleted: true}, data } + } + + // FORMAT: [HEADER, ..sequence of data] + // HEADER: [Boolean (one byte), number of bytes in the data (not including the boolean)] + fn encode(self: &Entry) -> Result> + where T: Encode + { + let mut result: Vec = self.header.encode()?; + let mut encoded_data = encode_sequence(&self.data[..])?; + let encoded_data_len: usize = encoded_data.len(); + result.append(&mut encode(&encoded_data_len)?); // usize 8 bytes + result.append(&mut encoded_data); // data variable size + Ok(result) + } + +} + +impl EntryDetailed { + fn decode(header: EntryHeaderWithDataSize, number_of_columns: usize, bytes: &[u8]) -> Result + where T: Decode + { + let data = decode_sequence::(number_of_columns, bytes) + .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?; + Ok(EntryDetailed { header, data }) + } +} + + + + // impl StorageEngine for ColumnStore { // async fn append(&mut self, id: Index, entry: Row) -> Result