use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; use tokio::fs::{File, OpenOptions, DirBuilder}; use std::path::Path; use std::marker::PhantomData; use async_trait::async_trait; use bincode; use bincode::{Decode, Encode}; use crate::binary_coding::{encode, decode, encode_sequence, encode_sequence_with_sizes, decode_sequence}; use tokio::fs; use crate::index::SomethingSupportingLeq; use crate::error::{Error, DecodeErrorKind}; use crate::index::Index; use std::mem::size_of; pub type Result = std::result::Result; pub type Column = u64; pub type FilePosition = u64; // TODO: Consider introducing a phantom type for the data that's used in the store. #[derive(Debug)] pub struct Store { // TODO: This needs to track how many read-write cursors there are...? // RWMutex // {write: 0, read: n} ~> {write:0, read: n + 1} // create read // {write: 0, read: n + 1} ~> {write:0, read: n} // destroy read // {write: 0, read: 0} ~> {write: 1, read: 0} // create write // {write: 1, read: 0} ~> {write: 0, read: 0} // destroy write pub table_folder: String, // primary_index: Vec>>, // indexes: Vec>>>, // primary_index: Index, // TODO: It's not good to have StoreHeader copied to all the cursors, since they may modify it. // How to sync? // All pub header: StoreHeader, pub data_type: PhantomData, // meta // location of rows file // locations of index files // // rows file // list } // Read Cursors don't modify the rows nor Store Header. // Write Cursors can modify both rows and Store Header. // Probably should split these into two types. But they will have a lot of functionality in common. 
pub struct Cursor { header: StoreHeader, file: File, data_type: PhantomData, eof_file_position: FilePosition, } pub enum AccessMode { Read, Write } pub type PositionOfValue = FilePosition; pub type PositionOfRow = FilePosition; #[derive(Debug, Clone)] pub struct StoreHeader { pub number_of_columns: usize, pub deleted_count: usize, pub total_count: usize, pub primary_column: Column, } impl StoreHeader { pub const NUMBER_OF_COLUMNS_SIZE: usize = size_of::(); pub const DELETED_COUNT_SIZE: usize = size_of::(); pub const TOTAL_COUNT_SIZE: usize = size_of::(); pub const PRIMARY_COLUMN_SIZE: usize = size_of::(); pub const SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE + Self::PRIMARY_COLUMN_SIZE; pub const NUMBER_OF_COLUMNS_OFFSET: usize = 0; pub const DELETED_COUNT_OFFSET: usize = Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE; pub const TOTAL_COUNT_OFFSET: usize = Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE; pub const PRIMARY_COLUMN_OFFSET: usize = Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE; } #[derive(Debug)] pub struct EntryHeader { is_deleted: bool, } #[derive(Debug)] pub struct EntryHeaderWithDataSize { pub is_deleted: bool, pub data_sizes: Vec, // vec![5, 6, 20] means that column 0 stores 5 bytes, column 1 stores 6 // bytes etc } impl EntryHeaderWithDataSize { pub const IS_DELETED_OFFSET: usize = 0; pub const IS_DELETED_SIZE: usize = size_of::(); pub const DATA_SIZES_OFFSET: usize = Self::IS_DELETED_OFFSET + Self::IS_DELETED_SIZE; pub fn size(number_of_columns: usize) -> usize { let size_of_data_sizes: usize = number_of_columns*size_of::(); Self::IS_DELETED_SIZE + size_of_data_sizes } pub fn size_of_data(&self) -> usize{ self.data_sizes.iter().sum() } } #[derive(Debug)] pub struct Entry { header: EntryHeader, data: Vec, } #[derive(Debug)] pub struct EntryDetailed { header: EntryHeaderWithDataSize, data: Vec, } pub struct EntryIterator<'a> { file: &'a mut File, current_file_position: 
FilePosition } //===Store=== pub async fn store_exists(table_folder: &str) -> Result { Ok(fs::metadata(table_folder).await.is_ok()) } pub async fn less_than_eq(store: &mut Store, file_position0: FilePosition, file_position1: FilePosition) -> Result { todo!() } // pub trait SomethingSupportingLeq { // async fn less_than_eq(&mut self, file_position0: FilePosition, file_position1: FilePosition) -> std::result::Result; // } #[async_trait] impl SomethingSupportingLeq for Store where T: Send { async fn less_than_eq(&mut self, file_position0: FilePosition, file_position1: FilePosition) -> std::result::Result { Ok(true) } } pub const ROWS_FILE_NAME: &'static str = "rows"; impl Store { // For debugging. // Moves file cursor to the end. pub async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error>{ let mut bytes: Vec = vec![]; let mut cursor = self.cursor(AccessMode::Read).await.map_err(|e| e.to_io_or_panic())?; cursor.file.read_to_end(&mut bytes).await?; Ok(bytes) } // ===Creation=== pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result { let path_to_table = Path::new(table_folder); let path_to_rows = path_to_table.join(ROWS_FILE_NAME); DirBuilder::new() .create(path_to_table).await?; let mut file: File = OpenOptions::new() .write(true) .read(true) .create_new(true) .open(path_to_rows) .await?; let header = StoreHeader { number_of_columns, deleted_count: 0, total_count: 0, primary_column, }; let encoded_header: Vec = header.encode()?; file.write(&encoded_header).await?; // TODO: indexes // let index: Index = Index::new( // &format!("rows_{}", primary_column.to_string()), // ).await?; let store = Self { table_folder: table_folder.to_string(), header, data_type: PhantomData::, }; Ok(store) } pub async fn connect(table_folder: &str) -> Result where T: std::fmt::Debug { let path_to_table = Path::new(table_folder); let path_to_rows = path_to_table.join(ROWS_FILE_NAME); let mut file: File = OpenOptions::new() .read(true) 
.write(true) .open(path_to_rows) .await?; // Unfortunately we can't yet use store.read_bytes, since it can't be created without the // header. let mut header_bytes = StoreHeader::decode_buffer(); file.read_exact(&mut header_bytes).await?; let header = StoreHeader::decode(&mut header_bytes).await?; let store = Self { table_folder: table_folder.to_string(), header, data_type: PhantomData::, }; Ok(store) } pub async fn cursor(&self, mode: AccessMode) -> Result> { Cursor::new(&self, mode).await } pub async fn garbage_collect(&mut self) -> Result<()> { todo!() } } // ===Store Header=== impl StoreHeader { fn encode(&self) -> Result> { let mut result = encode(&self.number_of_columns)?; result.append(&mut encode(&self.deleted_count)?); result.append(&mut encode(&self.total_count)?); result.append(&mut encode(&self.primary_column)?); Ok(result) } fn decode_buffer() -> [u8; StoreHeader::SIZE] { [0; StoreHeader::SIZE] } async fn decode(result: &mut [u8]) -> Result { let (number_of_columns, _) = decode::(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; let (deleted_count, _) = decode::(&result[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?; let (total_count, _) = decode::(&result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?; let (primary_column, _) = decode::(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; let header = StoreHeader { number_of_columns, deleted_count, total_count, primary_column, }; Ok(header) } // returns new count pub fn increment_total_count(&mut self) -> usize { 
self.total_count += 1; self.total_count } // returns new count pub fn increment_deleted_count(&mut self) -> usize { self.deleted_count += 1; self.deleted_count } } // ====Entry==== impl EntryHeader { fn encode(self: &EntryHeader) -> Result> { let result: Vec = encode(&self.is_deleted)?; Ok(result) } } impl EntryHeaderWithDataSize { pub fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result { let (is_deleted, _) = decode::(&bytes) .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; let data_sizes = decode_sequence::(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..]) .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?; Ok(Self { is_deleted, data_sizes } ) } } impl Entry { pub fn new(data: Vec) -> Self { Self { header: EntryHeader { is_deleted: false }, data } } pub fn new_deleted(data: Vec) -> Self { Self { header: EntryHeader { is_deleted: true}, data } } // FORMAT: [EntryHeaderWithDataSize, ..sequence of data] pub fn encode(&self) -> Result> where T: Encode { let mut result: Vec = self.header.encode()?; let (mut encoded_data, sizes) = encode_sequence_with_sizes(&self.data[..])?; result.append(&mut encode_sequence(&sizes)?); // sizes of data (fixed by number of columns) result.append(&mut encoded_data); // data variable size Ok(result) } } impl EntryDetailed { pub fn decode(header: EntryHeaderWithDataSize, number_of_columns: usize, bytes: &[u8]) -> Result where T: Decode { let data = decode_sequence::(number_of_columns, bytes) .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?; Ok(EntryDetailed { header, data }) } } //=================Cursor================== impl Cursor { pub async fn new(store: &Store, mode: AccessMode) -> Result { let path_to_rows = Path::new(&store.table_folder).join(ROWS_FILE_NAME); let file: File = match mode { AccessMode::Read => OpenOptions::new() .read(true) .open(path_to_rows) .await?, AccessMode::Write => OpenOptions::new() .read(true) .write(true) 
.open(path_to_rows) .await?, }; let mut cursor = Self { header: store.header.clone(), file, data_type: store.data_type, eof_file_position: 0, // This will be overwriten by the seek_to_start_of_data }; cursor.seek_to_start_of_data().await?; Ok(cursor) } //===primitive file operations=== // Moves the file cursor right. async fn write_bytes(&mut self, bytes: &[u8]) -> Result { Ok(self.file.write(bytes).await?) } // Moves the file cursor right. async fn read_bytes(&mut self, bytes: &mut [u8]) -> Result<()> { self.file.read_exact(bytes).await?; Ok(()) } // Moves the file cursor right. async fn get_bytes(&mut self, count: usize) -> Result> { let mut result: Vec = Vec::with_capacity(count); self.read_bytes(&mut result).await?; Ok(result) } pub async fn seek_to(&mut self, file_position: FilePosition) -> Result<()> { self.file.seek(SeekFrom::Start(file_position)).await?; Ok(()) } async fn seek_to_start(&mut self) -> Result<()> { self.file.seek(SeekFrom::Start(0)).await?; Ok(()) } async fn seek_to_end(&mut self) -> Result<()> { self.file.seek(SeekFrom::End(0)).await?; Ok(()) } async fn seek_to_start_of_data(&mut self) -> Result<()> { self.seek_to(StoreHeader::SIZE as u64).await } pub async fn current_file_position(&mut self) -> Result { let next_file_position: FilePosition = self.file.stream_position().await?; Ok(next_file_position) } async fn is_at_eof(&mut self) -> Result { Ok(self.current_file_position().await? == self.eof_file_position) } pub async fn less_than_eq(&mut self, file_position0: FilePosition, file_position1: FilePosition) -> Result { todo!() } // ===Iteration=== // Assumes that the current file position is at a valid entry or EOF. pub async fn next(&mut self) -> Result>> where T: Decode { if self.is_at_eof().await? 
{ return Ok(None) } let header = self.read_entry_header().await?; let mut data_bytes: Vec = vec![0; header.size_of_data()]; self.read_bytes(&mut data_bytes).await?; let entry: EntryDetailed = EntryDetailed::decode(header, self.header.number_of_columns, &mut data_bytes)?; Ok(Some(entry)) } // ===Store Header Manipulation=== async fn increment_total_count(&mut self) -> Result<()> { self.seek_to_start().await?; self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; let new_count = self.header.increment_total_count(); self.write_bytes(&encode::(&new_count)?).await?; Ok(()) } async fn increment_deleted_count(&mut self) -> Result<()> { self.seek_to_start().await?; self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?; let new_count = self.header.increment_deleted_count(); self.write_bytes(&encode::(&new_count)?).await?; Ok(()) } // ===Entry Header Manipulation=== // assumes we are at the start of the valid entry. async fn set_entry_is_deleted_to(&mut self, is_deleted: bool) -> Result<()> { self.seek_to(EntryHeaderWithDataSize::IS_DELETED_OFFSET as u64).await?; self.write_bytes(&encode::(&is_deleted)?).await?; Ok(()) } // ===Append Entry=== // Moves cursor to the end. // Returns file position to the start of the new entry. 
pub async fn append_entry(&mut self, entry: &Entry) -> Result where T: Encode { self.increment_total_count().await?; let encoded_entry: Vec = entry.encode()?; self.seek_to_end().await?; let file_position: FilePosition = self.current_file_position().await?; self.write_bytes(&encoded_entry).await?; let eof_file_position: FilePosition = self.current_file_position().await?; self.eof_file_position = eof_file_position; Ok(file_position) } // ===Deletion=== pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()> { self.seek_to(file_position).await?; let entry_header = self.read_entry_header().await?; if entry_header.is_deleted { Ok(()) } else { self.increment_deleted_count().await?; self.seek_to(file_position).await?; self.set_entry_is_deleted_to(true).await?; self.attempt_garbage_collection_if_necessary().await?; Ok(()) } } async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> { // TODO: What should be the policy? Counting size of garbage? Counting how many entries are // garbage? if self.header.deleted_count > 100 { todo!() } else { Ok(()) } } // ===Lookup=== // WARNING: The cursor has to be at the start of an entry. Otherwise garbage data will be // decoded as an entry. 
async fn read_entry_header(&mut self) -> Result { let number_of_columns: usize = self.header.number_of_columns; let mut header_bytes: Vec = vec![0; EntryHeaderWithDataSize::size(number_of_columns)]; self.read_bytes(&mut header_bytes).await?; let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..], number_of_columns)?; // TODO: Get rid of the println's // println!("HEADER_BYTES: {:?}", header_bytes); // println!("HEADER: {:?}", header); Ok(header) } pub async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result { self.seek_to(file_position).await?; self.read_entry_header().await } pub async fn search_for(&mut self, index: T) -> Result<()> where T: Send { // let index = self.primary_index.borrow_mut(); // let x = index.lookup(self, 123).await?; todo!() } // Returns None when file_positoin == eof_file_position pub async fn read_entry_at(&mut self, file_position: FilePosition) -> Result>> where T: Decode { self.seek_to(file_position).await?; self.next().await } // TODO: This needs to be some sort of an iterator // pub async fn entries() -> EntryIterator { // todo!() // } pub async fn read_entries(&mut self) -> Result<()> where T: Decode + std::fmt::Debug { self.seek_to_start_of_data().await?; let mut file_position: FilePosition = self.current_file_position().await?; loop { match self.read_entry_at(file_position).await? { Some(entry) => { println!("{:?}", entry); file_position = self.current_file_position().await?; }, None => { println!("END of entries."); return Ok(()) } } } } pub async fn search_for_entry_with_id(&mut self, id: T) -> Result>> { // TODO: make call to the primary index todo!() } // TODO: This needs to be some sort of an iterator pub async fn get_all_eq(&self, column: Column, value: T) -> Result>> { todo!() } } // impl StorageEngine for ColumnStore { // async fn append(&mut self, id: Index, entry: Row) -> Result // async fn get_all(&self) -> ??? // async fn get_eq(&self, column: Column, value: T) -> ??? 
//     async fn delete_all(&mut self)
//     async fn delete_eq(&mut self, column: Column, value: T) -> ???
// }

// struct Error {
// }

// Selected(
//     &'a TableSchema,
//     ColumnSelection,
//     // TODO: Don't do the Box(dyn Iterator<...>)
//     // you'll have a concrete implementation of Iterator, and that's what you'll use
//     Box + 'a + Send>,
// ),

// #[async_trait]
// trait StorageEngine
// where T: Encode + Decode
// {
//     async fn append(&mut self, id: Index, entry: Row) -> Result
//     async fn get_all(&self) -> ???
//     async fn get_eq(&self, column: Column, value: T) -> ???
//     async fn delete_all(&mut self)
//     async fn delete_eq(&mut self, column: Column, value: T) -> ???
// }

// #[cfg(test)]
// mod tests {
//     #[test]
//     fn hello_test() {
//         assert!(true);
//     }
// }

// let storage_engine = StorageEngine::new("users")
// let mut next_position = 0
// type FilePosition = usize;
// type StoreFile = Vec;
// type IndexFile = ???
// struct IndexEntry {
// }

// #00000 [false, u26, "Arnold", "schwarzenegger", "gettothechoppa@yahoo.com"] #5120000 [true, u27, "Arnold", "Vosloo", "avosloo@aol.com"]
// #00000 [true, u27, "Arnold", "Vosloo", "avosloo@aol.com"]
// at #00000 512 kb deleted,
// ...
// [(u26, [#00000]), (u27, [#5120000])]
// [("Arnold", [#000000, #5120000]), ("Arnfsdaf", []), ("Adasdsd", []), ("Bdsad", [])]
//
// basically always keep indexes in memory and on write always sync on disk
// CREATE INDEX usersname ON "users" (name);
// INSERT INTO users (id, name, surname, email) VALUES (u26, "Arnold", "schwarzenegger", "gettothechoppa@yahoo.com");
// INSERT INTO users (id, name, surname, email) VALUES (u27, "Arnold", "Vosloo", "avosloo@aol.com");
// SELECT * FROM users WHERE id=u26;
// SELECT * FROM users WHERE name="Arnold";
// SELECT * FROM cars;
// DELETE FROM users WHERE name="Arnold";