use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; use tokio::fs::{File, OpenOptions, DirBuilder}; use std::path::Path; use std::marker::PhantomData; use async_trait::async_trait; use bincode; use bincode::{Decode, Encode}; use crate::binary_coding::{encode, decode, encode_sequence, encode_sequence_with_sizes, decode_sequence}; use tokio::fs; use crate::index::SomethingSupportingLeq; use crate::error::{Error, DecodeErrorKind}; use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader}; use crate::index::Index; use std::mem::size_of; pub type Result = std::result::Result; pub type Column = u64; pub type FilePosition = u64; // TODO: Consider introducing a phantom type for the data that's used in the store. #[derive(Debug)] pub struct Store { // TODO: This needs to track how many read-write cursors there are...? // RWMutex // {write: 0, read: n} ~> {write:0, read: n + 1} // create read // {write: 0, read: n + 1} ~> {write:0, read: n} // destroy read // {write: 0, read: 0} ~> {write: 1, read: 0} // create write // {write: 1, read: 0} ~> {write: 0, read: 0} // destroy write pub table_folder: String, // primary_index: Vec>>, // indexes: Vec>>>, // primary_index: Index, // TODO: It's not good to have StoreHeader copied to all the cursors, since they may modify it. // How to sync? // All pub header: StoreHeader, pub data_type: PhantomData, // meta // location of rows file // locations of index files // // rows file // list } // Read Cursors don't modify the rows nor Store Header. // Write Cursors can modify both rows and Store Header. // Probably should split these into two types. But they will have a lot of functionality in common. pub struct Cursor { header: StoreHeader, file: File, data_type: PhantomData, eof_file_position: FilePosition, } pub enum AccessMode { Read, Write } pub type PositionOfValue = FilePosition; pub type PositionOfRow = FilePosition; #[derive(Debug, Clone)] pub struct StoreHeader { pub number_of_columns: usize, pub deleted_count: usize, pub total_count: usize, pub primary_column: Column, } impl StoreHeader { pub const NUMBER_OF_COLUMNS_SIZE: usize = size_of::(); pub const DELETED_COUNT_SIZE: usize = size_of::(); pub const TOTAL_COUNT_SIZE: usize = size_of::(); pub const PRIMARY_COLUMN_SIZE: usize = size_of::(); pub const SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE + Self::PRIMARY_COLUMN_SIZE; pub const NUMBER_OF_COLUMNS_OFFSET: usize = 0; pub const DELETED_COUNT_OFFSET: usize = Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE; pub const TOTAL_COUNT_OFFSET: usize = Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE; pub const PRIMARY_COLUMN_OFFSET: usize = Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE; } #[derive(Debug)] pub struct EntryHeader { is_deleted: bool, } #[derive(Debug)] pub struct EntryHeaderWithDataSize { pub is_deleted: bool, pub data_sizes: Vec, // vec![5, 6, 20] means that column 0 stores 5 bytes, column 1 stores 6 // bytes etc } impl EntryHeaderWithDataSize { pub const IS_DELETED_OFFSET: usize = 0; pub const IS_DELETED_SIZE: usize = size_of::(); pub const DATA_SIZES_OFFSET: usize = Self::IS_DELETED_OFFSET + Self::IS_DELETED_SIZE; pub fn size(number_of_columns: usize) -> usize { let size_of_data_sizes: usize = number_of_columns*size_of::(); Self::IS_DELETED_SIZE + size_of_data_sizes } pub fn size_of_data(&self) -> usize{ self.data_sizes.iter().sum() } } #[derive(Debug)] pub struct Entry { header: EntryHeader, data: Vec, } #[derive(Debug)] pub struct EntryDetailed { header: EntryHeaderWithDataSize, data: Vec, } pub struct EntryIterator<'a> { file: &'a mut File, current_file_position: FilePosition } //===Store=== pub async fn store_exists(table_folder: &str) -> Result { Ok(fs::metadata(table_folder).await.is_ok()) } pub async fn less_than_eq(store: &mut Store, file_position0: FilePosition, file_position1: FilePosition) -> Result { todo!() } // pub trait SomethingSupportingLeq { // async fn less_than_eq(&mut self, file_position0: FilePosition, file_position1: FilePosition) -> std::result::Result; // } #[async_trait] impl SomethingSupportingLeq for Store where T: Send { async fn less_than_eq(&mut self, file_position0: FilePosition, file_position1: FilePosition) -> std::result::Result { Ok(true) } } pub const ROWS_FILE_NAME: &'static str = "rows"; impl Store { // ===Creation=== pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result { let path_to_table = Path::new(table_folder); let path_to_rows = path_to_table.join(ROWS_FILE_NAME); DirBuilder::new() .create(path_to_table).await?; let mut file: File = OpenOptions::new() .write(true) .read(true) .create_new(true) .open(path_to_rows) .await?; let header = StoreHeader { number_of_columns, deleted_count: 0, total_count: 0, primary_column, }; let encoded_header: Vec = header.encode()?; file.write(&encoded_header).await?; // TODO: indexes // let index: Index = Index::new( // &format!("rows_{}", primary_column.to_string()), // ).await?; let store = Self { table_folder: table_folder.to_string(), header, data_type: PhantomData::, }; Ok(store) } pub async fn connect(table_folder: &str) -> Result where T: std::fmt::Debug { let path_to_table = Path::new(table_folder); let path_to_rows = path_to_table.join(ROWS_FILE_NAME); let mut file: File = OpenOptions::new() .read(true) .write(true) .open(path_to_rows) .await?; // Unfortunately we can't yet use store.read_bytes, since it can't be created without the // header. let mut header_bytes = StoreHeader::decode_buffer(); file.read_exact(&mut header_bytes).await?; let header = StoreHeader::decode(&mut header_bytes).await?; let store = Self { table_folder: table_folder.to_string(), header, data_type: PhantomData::, }; Ok(store) } // ===Cursors=== pub async fn read_cursor(&self) -> Result> where T: Send { ReadCursor::new(self).await } pub async fn write_cursor(&mut self) -> Result> where T: Send { WriteCursor::new(self).await } // For debugging. pub async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error> where T: Send { let mut cursor = self.read_cursor().await.map_err(|e| e.to_io_or_panic())?; let bytes = cursor.read_all_bytes().await?; Ok(bytes) } } // ===Store Header=== impl StoreHeader { fn encode(&self) -> Result> { let mut result = encode(&self.number_of_columns)?; result.append(&mut encode(&self.deleted_count)?); result.append(&mut encode(&self.total_count)?); result.append(&mut encode(&self.primary_column)?); Ok(result) } fn decode_buffer() -> [u8; StoreHeader::SIZE] { [0; StoreHeader::SIZE] } async fn decode(result: &mut [u8]) -> Result { let (number_of_columns, _) = decode::(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; let (deleted_count, _) = decode::(&result[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?; let (total_count, _) = decode::(&result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?; let (primary_column, _) = decode::(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; let header = StoreHeader { number_of_columns, deleted_count, total_count, primary_column, }; Ok(header) } // returns new count pub fn increment_total_count(&mut self) -> usize { self.total_count += 1; self.total_count } // returns new count pub fn increment_deleted_count(&mut self) -> usize { self.deleted_count += 1; self.deleted_count } } // ====Entry==== impl EntryHeader { fn encode(self: &EntryHeader) -> Result> { let result: Vec = encode(&self.is_deleted)?; Ok(result) } } impl EntryHeaderWithDataSize { pub fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result { let (is_deleted, _) = decode::(&bytes) .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; let data_sizes = decode_sequence::(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..]) .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?; Ok(Self { is_deleted, data_sizes } ) } } impl Entry { pub fn new(data: Vec) -> Self { Self { header: EntryHeader { is_deleted: false }, data } } pub fn new_deleted(data: Vec) -> Self { Self { header: EntryHeader { is_deleted: true}, data } } // FORMAT: [EntryHeaderWithDataSize, ..sequence of data] pub fn encode(&self) -> Result> where T: Encode { let mut result: Vec = self.header.encode()?; let (mut encoded_data, sizes) = encode_sequence_with_sizes(&self.data[..])?; result.append(&mut encode_sequence(&sizes)?); // sizes of data (fixed by number of columns) result.append(&mut encoded_data); // data variable size Ok(result) } } impl EntryDetailed { pub fn decode(header: EntryHeaderWithDataSize, number_of_columns: usize, bytes: &[u8]) -> Result where T: Decode { let data = decode_sequence::(number_of_columns, bytes) .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?; Ok(EntryDetailed { header, data }) } } // impl StorageEngine for ColumnStore { // async fn append(&mut self, id: Index, entry: Row) -> Result // async fn get_all(&self) -> ??? // async fn get_eq(&self, column: Column, value: T) -> ??? // async fn delete_all(&mut self) // async fn delete_eq(&mut self, column: Column, value: T) -> ??? // } // struct Error { // } // Selected( // &'a TableSchema, // ColumnSelection, // TODO: Don't do the Box(dyn Iterator<...>) // you'll have a concrete implementation of Iterator, and that's what you'll use // Box + 'a + Send>, // ), // #[async_trait] // trait StorageEngine // where T: Encode + Decode // { // async fn append(&mut self, id: Index, entry: Row) -> Result // async fn get_all(&self) -> ??? // async fn get_eq(&self, column: Column, value: T) -> ??? // async fn delete_all(&mut self) // async fn delete_eq(&mut self, column: Column, value: T) -> ??? // } // #[cfg(test)] // mod tests { // #[test] // fn hello_test() { // assert!(true); // } // } // let sroage_engine = STorageEngine::new("users") // let mut next_position = 0 // type FilePosition = usize; // type StoreFile = Vec; // type IndexFile = ??? // struct IndexEntry { // } // #00000 [false, u26, "Arnold", "schwarzenegger", "gettothechoppa@yahoo.com"] #5120000 [true, u27, "Arnold", "Vosloo", "avosloo@aol.com"] // #00000 [true, u27, "Arnold", "Vosloo", "avosloo@aol.com"] // at #00000 512 kb deleted, // ... // [(u26, [#00000]), (u27, [#5120000])] // [("Arnold", [#000000, #5120000]), ("Arnfsdaf", []), ("Adasdsd", []), ("Bdsad", [])] // // basically always keep indexes in memory and on write always sync on disk // CREATE INDEX usersname ON "users" (name); // INSERT INTO users (id, name, surname, email) VALUES (u26, "Arnold", "schwarzenegger", "gettothechoppa@yahoo.com"); // INSERT INTO users (id, name, surname, email) VALUES (u27, "Arnold", "Vosloo", "avosloo@aol.com"); // SELECT * FROM users WHERE id=u26; // SELECT * FROM users WHERE name="Arnold"; // SELECT * FROM cars; // DELETE FROM users WHERE name="Arnold";