diff --git a/Cargo.lock b/Cargo.lock
index 5ea9fdc..2944fae 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -599,6 +599,16 @@ dependencies = [
  "windows-sys 0.48.0",
 ]
 
+[[package]]
+name = "storage_engine"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "bincode",
+ "thiserror",
+ "tokio",
+]
+
 [[package]]
 name = "strsim"
 version = "0.10.0"
diff --git a/Cargo.toml b/Cargo.toml
index 57d0219..3e6cda3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,5 +5,6 @@ members = [
     "proto",
     "server",
     "client",
-    "parser"
+    "parser",
+    "storage_engine"
 ]
diff --git a/storage_engine/Cargo.toml b/storage_engine/Cargo.toml
new file mode 100644
index 0000000..6658764
--- /dev/null
+++ b/storage_engine/Cargo.toml
@@ -0,0 +1,10 @@
+[package]
+name = "storage_engine"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+bincode = "2.0.0-rc.3"
+tokio = { version = "1.34.0", features = ["full"] }
+async-trait = "0.1.74"
+thiserror = "1.0.50"
diff --git a/storage_engine/src/lib.rs b/storage_engine/src/lib.rs
new file mode 100644
index 0000000..0e2eb79
--- /dev/null
+++ b/storage_engine/src/lib.rs
@@ -0,0 +1,2 @@
+
+pub mod storage_engine;
diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs
new file mode 100644
index 0000000..fffde93
--- /dev/null
+++ b/storage_engine/src/main.rs
@@ -0,0 +1,54 @@
+use tokio::sync::{Mutex, RwLock};
+use tokio::fs::{File, OpenOptions};
+use tokio::io::{BufReader, BufWriter, AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom};
+use tokio::fs;
+
+mod storage_engine;
+
+use crate::storage_engine::*;
+
+#[tokio::main]
+async fn main() -> Result<(), std::io::Error> {
+    println!("STOOOOOOOOOOOORAAAAAAAAAAAGE");
+
+    let blob_name = "blob10.minisql";
+
+    // WARNING: Number of columns is 5?????
+
+    let mut store = Store::new(blob_name, 5).await.map_err(|e| e.to_io_or_panic())?;
+
+    // let store_bytes = store.get_all_bytes().await.map_err(|e| e.to_io_or_panic())?;
+    // println!("{:?}", store_bytes);
+
+    let mut buff: Vec<u8> = vec![0; 1];
+    let x = store.file.read_exact(&mut buff[..]).await?;
+    println!("{:?}", buff);
+
+
+    // let entry0: Entry = Entry::new_deleted(vec![1, 2, 3, 4, 5]);
+    // let entry1: Entry = Entry::new_deleted(vec![200, 200, 5, 6, 7]);
+    // let cursor0 = store.append_entry(&entry0).await.map_err(|e| e.to_io_or_panic())?;
+    // // println!("cursor0 = {}", cursor0);
+
+    // let cursor1 = store.append_entry(&entry1).await.map_err(|e| e.to_io_or_panic())?;
+    // println!("cursor0 = {}, cursor1 = {}", cursor0, cursor1);
+
+    // let mut store = Store::connect(blob_name).await.map_err(|e| e.to_io_or_panic())?;
+    // println!("{:?}", store);
+
+    // let x = store.entry_at::(16).await.map_err(|e| e.to_io_or_panic())?;
+    // println!("{:?}", x);
+
+    // let store_bytes = store.get_all_bytes().await.map_err(|e| e.to_io_or_panic())?;
+    // println!("{:?}", store_bytes);
+
+    // let mut store = ColumnStore::connect("blob08.minisql").await.map_err(|e| e.to_io_or_panic())?;
+    // let entry2: StoreEntry = StoreEntry::new_deleted(vec![3, 2, 1]);
+    // let cursor2 = store.append_entry(&entry2).await.map_err(|e| e.to_io_or_panic())?;
+    // println!("cursor2 = {}", cursor2);
+    // println!("{:?}", store);
+
+
+    println!("DONE");
+    Ok(())
+}
diff --git a/storage_engine/src/storage_engine.rs b/storage_engine/src/storage_engine.rs
new file mode 100644
index 0000000..802444e
--- /dev/null
+++ b/storage_engine/src/storage_engine.rs
@@ -0,0 +1,436 @@
+use tokio::io::{BufReader, BufWriter, AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom};
+use tokio::sync::{Mutex, RwLock};
+use tokio::fs::{File, OpenOptions};
+use tokio::fs;
+
+use bincode;
+use bincode::de::Decoder;
+use bincode::enc::write::Writer;
+use bincode::enc::Encoder;
+use bincode::{BorrowDecode, Decode, Encode};
+use bincode::config::{BigEndian, Configuration, Fixint};
+
+use std::mem::size_of;
+
+
+// =============Byte encoding/decoding============
+const BIN_CONFIG: Configuration<BigEndian, Fixint> = bincode::config::standard().with_big_endian().with_fixed_int_encoding();
+
+fn encode<T: Encode>(t: &T) -> Result<Vec<u8>, bincode::error::EncodeError> {
+    bincode::encode_to_vec(t, BIN_CONFIG)
+}
+
+fn decode<T: Decode>(bytes: &[u8]) -> Result<(T, usize), bincode::error::DecodeError> {
+    bincode::decode_from_slice(bytes, BIN_CONFIG)
+}
+
+fn encode_vector<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> {
+    let size: usize = ts.len();
+    let mut result = encode(&size)?;
+    for t in ts {
+        result.append(&mut encode(&t)?);
+    }
+    Ok(result)
+}
+
+fn decode_vector<T: Decode>(bytes: &[u8]) -> Result<Vec<T>, bincode::error::DecodeError> {
+    let mut offset = size_of::<usize>();
+    let result_len: usize = decode(&bytes[..offset])?.0;
+    let mut result: Vec<T> = Vec::with_capacity(result_len);
+    for _ in 0..result_len {
+        let (x, bytes_consumed) = decode::<T>(&bytes[offset..])?;
+        offset += bytes_consumed;
+        result.push(x);
+    }
+    Ok(result)
+}
+
+// We don't care about encoding the length here (since it will be used for a row with a known
+// column count)
+fn encode_sequence<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> {
+    let mut result = vec![];
+    for t in ts {
+        result.append(&mut encode(&t)?);
+    }
+    Ok(result)
+}
+
+fn decode_sequence<T: Decode>(len: usize, bytes: &[u8]) -> Result<Vec<T>, bincode::error::DecodeError> {
+    let mut result: Vec<T> = Vec::with_capacity(len);
+    let mut offset = 0;
+    for _ in 0..len {
+        let (x, bytes_consumed) = decode::<T>(&bytes[offset..])?;
+        offset += bytes_consumed;
+        result.push(x);
+    }
+    Ok(result)
+}
+
+
+fn example_encoding_decoding() {
+    let xs: Vec<u64> = vec![123, 250, 256, 123, 123, 123];
+    let xs: Vec<u64> = vec![];
+    let xs: Vec<u64> = vec![123];
+    let xs: Vec<u64> = vec![123, 250];
+
+    let xs: Vec<String> = vec!["foo".to_string(), "bar".to_string()];
+
+
+    println!("original {:?}", xs);
+    let exs = encode_vector(&xs[..]).unwrap();
+    println!("encoded {:?}", exs);
+
+    // WARNING: Don't forget to specify the type here
+    // let dxs = decode_vector::<u64>(&exs[..]).unwrap();
+    let dxs = decode_vector::<String>(&exs[..]).unwrap();
+    println!("decoded {:?}", dxs);
+}
+
+// ============Column Store===============
+
+// ColumnStore
+#[derive(Debug)]
+pub struct Store {
+    column_file_name: String,
+    // TODO: This should be private
+    pub file: File,
+    header: StoreHeader
+    // meta
+    //   location of rows file
+    //   locations of index files
+    //
+    // rows file
+    //   list
+}
+
+// These type aliases are here because they make writing decoders easier.
+type NumberOfColumns = usize;
+type DeletedCount = usize;
+#[derive(Debug)]
+pub struct StoreHeader {
+    number_of_columns: NumberOfColumns,
+    deleted_count: DeletedCount,
+}
+
+#[derive(Debug)]
+pub struct Entry<T> {
+    is_deleted: bool,
+    // file_position: FilePosition,
+    data: Vec<T>,
+}
+
+#[derive(Debug)]
+pub enum Error {
+    DecodeError(DecodeErrorKind, bincode::error::DecodeError),
+    EncodeError(bincode::error::EncodeError),
+    IoError(std::io::Error),
+    InvalidStoreHeader,
+}
+
+#[derive(Debug)]
+pub enum DecodeErrorKind {
+    StoreHeaderNumberOfColumns,
+    StoreHeaderDeletedCount,
+    EntryData,
+    EntryIsDeleted,
+    EntryDataSize
+}
+
+// ===Errors===
+impl Error {
+    pub fn to_io_or_panic(self) -> std::io::Error {
+        use Error::*;
+        match self {
+            IoError(err) => err,
+            err => {
+                println!("{:?}", err);
+                panic!();
+            }
+        }
+    }
+}
+
+impl From<bincode::error::EncodeError> for Error {
+    fn from(err: bincode::error::EncodeError) -> Self {
+        Self::EncodeError(err)
+    }
+}
+
+impl From<std::io::Error> for Error {
+    fn from(err: std::io::Error) -> Self {
+        Self::IoError(err)
+    }
+}
+
+
+// ====Entry====
+impl<T> Entry<T> {
+    pub fn new(data: Vec<T>) -> Self {
+        Self { is_deleted: false, data }
+    }
+
+    pub fn new_deleted(data: Vec<T>) -> Self {
+        Self { is_deleted: true, data }
+    }
+
+    // FORMAT: [HEADER, ..sequence of data]
+    // HEADER: [Boolean (one byte), number of bytes in the data (not including the boolean)]
+    fn encode(self: &Entry<T>) -> Result<Vec<u8>, Error>
+        where T: Encode
+    {
+        let mut result: Vec<u8> = encode(&self.is_deleted)?; // bool 1 byte
+        let mut encoded_data = encode_sequence(&self.data[..])?;
+        let encoded_data_len = encoded_data.len();
+        result.append(&mut encode(&encoded_data_len)?); // usize 8 bytes
+        println!("enc data len == {}", encoded_data_len);
+        println!("encoded_data == {:?} ", encoded_data);
+        result.append(&mut encoded_data); // data variable size
+        Ok(result)
+    }
+
+    // in bytes
+    pub fn header_size() -> usize {
+        size_of::<bool>() + size_of::<usize>()
+    }
+
+    // TODO: Maybe introduce an EntryHeader as a separate type?
+    pub fn decode_header(header_bytes: Vec<u8>) -> Result<(bool, usize), Error> {
+        let (is_deleted, offset) =
+            decode::<bool>(&header_bytes)
+                .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
+
+        let (data_size, _) =
+            decode::<usize>(&header_bytes[offset..])
+                .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryDataSize, e))?;
+        Ok((is_deleted, data_size))
+    }
+}
+
+pub type Column = u64;
+pub type Cursor = u64;
+
+impl StoreHeader {
+    fn encode(&self) -> Result<Vec<u8>, Error> {
+        // FORMAT: First Number of Columns, Then Deleted Count.
+        let mut result = encode(&self.number_of_columns)?;
+        result.append(&mut encode(&self.deleted_count)?);
+        Ok(result)
+    }
+
+    async fn decode(file: &mut File) -> Result<Self, Error> {
+        let number_of_columns_size = size_of::<NumberOfColumns>();
+        let deleted_count_size = size_of::<DeletedCount>();
+        let header_size: usize = number_of_columns_size + deleted_count_size;
+        let mut header_bytes: Vec<u8> = vec![0; header_size];
+        // TODO: Why do we need a mutable reference to the file when we are only reading from it?
+        match file.read_exact(&mut header_bytes).await {
+            Ok(_) => {
+                let offset = 0;
+                let (number_of_columns, offset) =
+                    decode::<NumberOfColumns>(&header_bytes[offset..offset + number_of_columns_size])
+                        .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?;
+                let (deleted_count, _) =
+                    decode::<DeletedCount>(&header_bytes[offset..offset + deleted_count_size])
+                        .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?;
+                let header = StoreHeader {
+                    number_of_columns,
+                    deleted_count,
+                };
+                Ok(header)
+            },
+            Err(err) => {
+                // TODO: When err is of the kind UnexpectedEof, return InvalidStoreHeader
+                println!("ARE WE HERE?");
+                Err(Error::from(err))
+            }
+        }
+    }
+}
+
+
+impl Store {
+    // For debugging
+    pub async fn get_all_bytes(mut self) -> Result<Vec<u8>, Error> {
+        let mut bytes: Vec<u8> = vec![];
+        use std::io::Read;
+        // for byte in self.file.bytes() {
+        // }
+        self.file.read_to_end(&mut bytes).await?;
+        Ok(bytes)
+    }
+
+    pub async fn new(column_file_name: &str, number_of_columns: usize) -> Result<Self, Error> {
+        let mut file =
+            OpenOptions::new()
+                .write(true)
+                .read(true)
+                .create_new(true)
+                .open(column_file_name)
+                .await?;
+
+        let header = StoreHeader {
+            number_of_columns,
+            deleted_count: 0,
+        };
+        let encoded_header = header.encode()?;
+        file.write(&encoded_header).await?;
+        println!("is something being encoded? {:?}", encoded_header);
+        let store = Self {
+            column_file_name: column_file_name.to_string(),
+            file,
+            header,
+        };
+        Ok(store)
+    }
+
+    pub async fn connect(column_file_name: &str) -> Result<Self, Error> {
+        let mut file = OpenOptions::new().read(true).write(true).open(column_file_name).await?;
+
+        let header = StoreHeader::decode(&mut file).await?;
+        Ok(Self {
+            column_file_name: column_file_name.to_string(),
+            file,
+            header
+        })
+    }
+
+    pub async fn entry_at<T: Decode>(&mut self, cursor: Cursor) -> Result<Entry<T>, Error> {
+        self.file.seek(SeekFrom::Start(cursor)).await?;
+
+        // 1. read header bytes (fixed number of bytes).
+        // 2. decode header
+        //    That will tell us how much data there is.
+        // 3. read entry data bytes.
+        // 4. decode data
+        let entry_header_size = Entry::<T>::header_size();
+        let mut header_bytes: Vec<u8> = vec![0; entry_header_size];
+        self.file.read_exact(&mut header_bytes).await?;
+
+        println!("cursor == {}", cursor);
+        println!("header_bytes == {:?}", header_bytes);
+
+        let (is_deleted, data_size) = Entry::<T>::decode_header(header_bytes)?;
+
+        // read_exact has already advanced the file position past the header, so the data follows directly.
+        let mut data_bytes: Vec<u8> = vec![0; data_size];
+        self.file.read_exact(&mut data_bytes).await?;
+
+        println!("(is_deleted, data_size) = ({}, {})", is_deleted, data_size);
+        let data =
+            decode_sequence::<T>(self.header.number_of_columns, &data_bytes)
+                .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?;
+        Ok(Entry {
+            is_deleted,
+            data
+        })
+    }
+
+    pub async fn append_entry<T: Encode>(&mut self, entry: &Entry<T>) -> Result<Cursor, Error> {
+        // On Linux, when a file is opened in append mode, the seek position is reported as 0
+        // and only updated after a write. That's why we do the cursor gymnastics at the end.
+        let encoded_entry: Vec<u8> = entry.encode()?;
+        println!("encoded_entry == {:?}", encoded_entry);
+        println!("bool size == {}", size_of::<bool>());
+        println!("usize size == {}", size_of::<usize>());
+        self.file.write(&encoded_entry).await?;
+        let next_cursor: Cursor = self.file.stream_position().await?;
+        let cursor: Cursor = next_cursor - encoded_entry.len() as u64;
+        Ok(cursor)
+    }
+
+    pub async fn iterate_all(&mut self) -> Result<(), Error> {
+        // Loop through the rows and print them out
+        todo!()
+    }
+}
+
+// impl StorageEngine for ColumnStore {
+//     async fn append(&mut self, id: Index, entry: Row) -> Result
+
+//     async fn get_all(&self) -> ???
+//     async fn get_eq(&self, column: Column, value: T) -> ???
+
+//     async fn delete_all(&mut self)
+//     async fn delete_eq(&mut self, column: Column, value: T) -> ???
+// }
+
+// struct Error {
+// }
+
+
+// Selected(
+//     &'a TableSchema,
+//     ColumnSelection,
+//     TODO: Don't do the Box(dyn Iterator<...>)
+//     you'll have a concrete implementation of Iterator, and that's what you'll use
+//     Box + 'a + Send>,
+// ),
+
+
+
+// #[async_trait]
+// trait StorageEngine
+//     where T: Encode + Decode
+// {
+//     async fn append(&mut self, id: Index, entry: Row) -> Result
+
+//     async fn get_all(&self) -> ???
+//     async fn get_eq(&self, column: Column, value: T) -> ???
+
+//     async fn delete_all(&mut self)
+//     async fn delete_eq(&mut self, column: Column, value: T) -> ???
+// }
+
+// #[cfg(test)]
+// mod tests {
+//     #[test]
+//     fn hello_test() {
+//         assert!(true);
+//     }
+// }
+
+// let storage_engine = StorageEngine::new("users")
+// let mut next_position = 0
+
+
+// type FilePosition = usize;
+
+
+// type StoreFile = Vec;
+// type IndexFile = ???
+
+// struct IndexEntry {
+
+// }
+
+
+// #00000 [false, u26, "Arnold", "schwarzenegger", "gettothechoppa@yahoo.com"] #5120000 [true, u27, "Arnold", "Vosloo", "avosloo@aol.com"]
+// #00000 [true, u27, "Arnold", "Vosloo", "avosloo@aol.com"]
+
+
+// at #00000 512 kb deleted,
+// ...
+
+
+
+
+// [(u26, [#00000]), (u27, [#5120000])]
+// [("Arnold", [#000000, #5120000]), ("Arnfsdaf", []), ("Adasdsd", []), ("Bdsad", [])]
+// // basically always keep indexes in memory and on write always sync on disk
+
+
+
+
+// CREATE INDEX usersname ON "users" (name);
+
+// INSERT INTO users (id, name, surname, email) VALUES (u26, "Arnold", "schwarzenegger", "gettothechoppa@yahoo.com");
+// INSERT INTO users (id, name, surname, email) VALUES (u27, "Arnold", "Vosloo", "avosloo@aol.com");
+
+
+// SELECT * FROM users WHERE id=u26;
+
+// SELECT * FROM users WHERE name="Arnold";
+
+
+// SELECT * FROM cars;
+// DELETE FROM users WHERE name="Arnold";
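A rough usage sketch of the Store/Entry API added above, not part of the patch: the file name example.minisql, the 3-column layout, and the u64 column type are illustrative, and it assumes the crate is consumed as the storage_engine library exposed by lib.rs.

use storage_engine::storage_engine::{Entry, Store};

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    // Create a fresh blob with 3 columns per row; Store::new uses create_new, so it
    // refuses to overwrite an existing file.
    let mut store = Store::new("example.minisql", 3).await.map_err(|e| e.to_io_or_panic())?;

    // Append one live row and remember the cursor where it starts.
    let row = Entry::new(vec![1u64, 2, 3]);
    let cursor = store.append_entry(&row).await.map_err(|e| e.to_io_or_panic())?;

    // Read the same row back from its cursor.
    let read_back = store.entry_at::<u64>(cursor).await.map_err(|e| e.to_io_or_panic())?;
    println!("row at {} = {:?}", cursor, read_back);
    Ok(())
}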