use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::fs::{File, OpenOptions, DirBuilder}; use tokio::fs; use std::path::{Path, PathBuf}; use bincode::{Decode, Encode}; use crate::error::Error; use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader}; use crate::segments::store_header::StoreHeader; use crate::index::Index; pub type Result = std::result::Result; pub type Column = u64; pub type FilePosition = u64; // TODO: Consider adding another type parameter for indexable values #[derive(Debug)] pub struct Store { pub header: StoreHeader, pub indexes: StoreIndexes, } pub type StoreIndexes = Vec>>; //===Store=== pub async fn store_exists(table_folder: &str) -> Result { Ok(fs::metadata(table_folder).await.is_ok()) } pub const ROWS_FILE_NAME: &'static str = "rows"; pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_intermediate"; impl Store { // ===Creation=== pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result where T: Encode + Decode + Ord { let path_to_table = Path::new(table_folder); let path_to_rows = path_to_table.join(ROWS_FILE_NAME); DirBuilder::new() .create(path_to_table).await?; let header = { let mut indexed_columns = vec![false; number_of_columns]; indexed_columns[primary_column as usize] = true; StoreHeader { table_folder: table_folder.to_string(), number_of_columns, deleted_count: 0, total_count: 0, primary_column, indexed_columns, } }; // We don't need the file right now. Only cursors will later open it. Self::create_empty_rows_file(path_to_rows, &header).await?; let indexes: StoreIndexes = Self::create_initial_indexes(&header).await?; let store = Self { header, indexes, }; Ok(store) } pub fn path_to_index_file(header: &StoreHeader, column: Column) -> PathBuf { let path_to_table = Path::new(&header.table_folder); let path_to_index = path_to_table.join(&format!("{}_{}", ROWS_FILE_NAME, column.to_string())); path_to_index } pub async fn create_empty_index_at(header: &StoreHeader, column: Column) -> Result> where T: Encode + Decode + Ord { let path_to_index = Self::path_to_index_file(&header, column); let index = Index::new(path_to_index).await?; Ok(index) } pub async fn create_initial_indexes(header: &StoreHeader) -> Result> where T: Encode + Decode + Ord { let mut result: StoreIndexes = Vec::with_capacity(header.number_of_columns); for _ in 0..header.number_of_columns { result.push(None) } result[header.primary_column as usize] = Some(Self::create_empty_index_at(&header, header.primary_column).await?); Ok(result) } pub async fn connect_index_at(header: &StoreHeader, column: Column) -> Result> where T: Encode + Decode + Ord { let path_to_index = Self::path_to_index_file(&header, column); let index: Index = Index::connect(path_to_index).await?; Ok(index) } pub async fn create_empty_rows_file(path_to_rows: PathBuf, header: &StoreHeader) -> Result { let mut file: File = OpenOptions::new() .write(true) .read(true) .create_new(true) .open(path_to_rows) .await?; let encoded_header: Vec = header.encode()?; file.write(&encoded_header).await?; Ok(file) } pub async fn connect(table_folder: &str) -> Result where T: std::fmt::Debug + Encode + Decode + Ord { let path_to_table = Path::new(table_folder); let path_to_rows = path_to_table.join(ROWS_FILE_NAME); let mut file: File = OpenOptions::new() .read(true) .write(true) .open(path_to_rows) .await?; // Unfortunately we can't yet use store.read_bytes, since it can't be created without the // header. let header = { let mut fixed_header_bytes = StoreHeader::buffer_for_fixed_decoding(); file.read_exact(&mut fixed_header_bytes).await?; let fixed_header = StoreHeader::decode_fixed(table_folder, &fixed_header_bytes).await?; // decode the indexes let mut rest_bytes: Vec = StoreHeader::buffer_for_rest_decoding(&fixed_header); file.read_exact(&mut rest_bytes).await?; StoreHeader::decode_rest(fixed_header, &rest_bytes).await? }; let indexes: StoreIndexes = { let mut result = Vec::with_capacity(header.number_of_columns); for (column, &is_indexed) in header.indexed_columns.iter().enumerate() { if is_indexed { result.push(Some(Self::connect_index_at(&header, column as Column).await?)) } else { result.push(None) } } result }; let store = Self { header, indexes }; Ok(store) } // ===Cursors=== pub async fn read_cursor(&self) -> Result> where T: Send + Sync { ReadCursor::new(self).await } pub async fn write_cursor(&mut self) -> Result> where T: Send + Sync { WriteCursor::new(self).await } // ===Indexes=== pub async fn attach_index(&mut self, column: Column) -> Result<()> where T: Ord + Decode + Encode + Send + Sync { if self.header.is_column_indexed(column) { Err(Error::ColumnAlreadyIndexed(column)) } else { let mut cursor = self.write_cursor().await?; cursor.attach_index(column).await } } // For debugging. #[allow(dead_code)] pub async fn read_all_bytes(&mut self) -> std::result::Result, std::io::Error> where T: Send + Sync { let mut cursor = self.read_cursor().await.map_err(|e| e.to_io_or_panic())?; let bytes = cursor.read_all_bytes().await?; Ok(bytes) } } #[cfg(test)] mod tests { use super::*; use crate::segments::entry::{Entry, EntryDetailed}; use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader, CursorWithWriteAccessToIndex, CursorWithAccessToIndex}; impl Drop for Store { fn drop(&mut self) { println!("DROPPING TEST FOLDER"); let table_folder = self.header.table_folder.clone(); // Seems no one has figured out how to do AsyncDrop yet. std::fs::remove_dir_all(table_folder).unwrap(); } } #[tokio::test] async fn test_create() { type Data = u32; let table_path = "test_table_0"; let number_of_columns = 5; let primary_column = 0; let store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); assert!(store.header.number_of_columns == number_of_columns); assert!(store.header.total_count == 0); assert!(store.header.deleted_count == 0); assert!(store.header.primary_column == primary_column); } #[tokio::test] async fn test_insert() { type Data = u32; let table_path = "test_table_1"; let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); { let mut cursor = store.write_cursor().await.unwrap(); let entry0: Entry = Entry::new(vec![1, 2, 3, 4, 5]); cursor.insert_entry(entry0).await.unwrap(); let entry1: Entry = Entry::new(vec![6, 7, 8, 9, 10]); cursor.insert_entry(entry1).await.unwrap(); assert!(store.header.total_count == 2); } } #[tokio::test] async fn test_select_next() { type Data = u32; let table_path = "test_table_2"; let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); { let mut cursor = store.write_cursor().await.unwrap(); let entry0: Entry = Entry::new(vec![1, 2, 3, 4, 5]); cursor.insert_entry(entry0).await.unwrap(); let entry1: Entry = Entry::new(vec![6, 7, 8, 9, 10]); cursor.insert_entry(entry1).await.unwrap(); assert!(store.header.total_count == 2); } { let mut cursor = store.read_cursor().await.unwrap(); let entry0 = cursor.next().await.unwrap().unwrap(); let entry1 = cursor.next().await.unwrap().unwrap(); assert!(entry0.data == vec![1,2,3,4,5]); assert!(entry1.data == vec![6,7,8,9,10]); } } #[tokio::test] async fn test_select_all() { type Data = u32; let table_path = "test_table_3"; let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); { let mut cursor = store.write_cursor().await.unwrap(); let entry0: Entry = Entry::new(vec![1, 2, 3, 4, 5]); cursor.insert_entry(entry0).await.unwrap(); let entry1: Entry = Entry::new(vec![6, 7, 8, 9, 10]); cursor.insert_entry(entry1).await.unwrap(); assert!(store.header.total_count == 2); } { let mut cursor = store.read_cursor().await.unwrap(); let mut entries = vec![]; while let Some(entry) = cursor.next().await.unwrap() { entries.push(entry) } assert!(entries.len() == 2); assert!(entries[0].data == vec![1,2,3,4,5]); assert!(entries[1].data == vec![6,7,8,9,10]); } } #[tokio::test] async fn test_select_eq() { type Data = u32; let table_path = "test_table_4"; let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); let value = 200; { let mut cursor = store.write_cursor().await.unwrap(); let entry0: Entry = Entry::new(vec![1, value, 3, 4, 5]); cursor.insert_entry(entry0).await.unwrap(); let entry1: Entry = Entry::new(vec![6, 7, 8, 9, 10]); cursor.insert_entry(entry1).await.unwrap(); let entry2: Entry = Entry::new(vec![11, 2, 10, 10, 10]); cursor.insert_entry(entry2).await.unwrap(); let entry3: Entry = Entry::new(vec![1, value, 100, 50, 40]); cursor.insert_entry(entry3).await.unwrap(); assert!(store.header.total_count == 4); } { let mut cursor = store.read_cursor().await.unwrap(); let column = 1; let entries = cursor.select_entries_where_eq(column, &value).await.unwrap(); assert!(entries.len() == 2); assert!(entries[0].data == vec![1, value, 3, 4, 5]); assert!(entries[1].data == vec![1, value, 100, 50, 40]); } } #[tokio::test] async fn test_select_eq_indexed() { type Data = u32; let table_path = "test_table_5"; let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); let column: Column = 1; assert!(store.indexes[column as usize].is_none()); store.attach_index(column).await.unwrap(); assert!(store.indexes[column as usize].is_some()); let value = 200; { let mut cursor = store.write_cursor().await.unwrap(); let entry0: Entry = Entry::new(vec![1, value, 3, 4, 5]); cursor.insert_entry(entry0).await.unwrap(); let entry1: Entry = Entry::new(vec![6, 7, 8, 9, 10]); cursor.insert_entry(entry1).await.unwrap(); let entry2: Entry = Entry::new(vec![11, 2, 10, 10, 10]); cursor.insert_entry(entry2).await.unwrap(); let entry3: Entry = Entry::new(vec![1, value, 100, 50, 40]); cursor.insert_entry(entry3).await.unwrap(); assert!(store.header.total_count == 4); } { let mut cursor = store.read_cursor().await.unwrap(); let column = 1; let entries = cursor.select_entries_where_eq(column, &value).await.unwrap(); assert!(entries.len() == 2); // Order may be non-deterministic. assert!(entries[0].data[column as usize] == value); assert!(entries[1].data[column as usize] == value); } } #[tokio::test] async fn test_delete_entry() { type Data = u32; let table_path = "test_table_6"; let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); let value = 200; let (_file_position0, file_position1, _file_position2, _file_position3) = { let mut cursor = store.write_cursor().await.unwrap(); let entry0: Entry = Entry::new(vec![1, value, 3, 4, 5]); let file_position0 = cursor.insert_entry(entry0).await.unwrap(); let entry1: Entry = Entry::new(vec![6, 7, 8, 9, 10]); let file_position1 = cursor.insert_entry(entry1).await.unwrap(); let entry2: Entry = Entry::new(vec![11, 2, 10, 10, 10]); let file_position2 = cursor.insert_entry(entry2).await.unwrap(); let entry3: Entry = Entry::new(vec![1, value, 100, 50, 40]); let file_position3 = cursor.insert_entry(entry3).await.unwrap(); assert!(store.header.total_count == 4); (file_position0, file_position1, file_position2, file_position3) }; { assert!(store.header.deleted_count == 0); let mut cursor = store.write_cursor().await.unwrap(); cursor.mark_deleted_at(file_position1, false).await.unwrap(); assert!(store.header.deleted_count == 1); } } #[tokio::test] async fn test_delete_where_eq() { type Data = u32; let table_path = "test_table_7"; let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); let column: Column = 1; assert!(store.indexes[column as usize].is_none()); store.attach_index(column).await.unwrap(); assert!(store.indexes[column as usize].is_some()); let value = 200; let (_file_position0, _file_position1, _file_position2, _file_position3) = { let mut cursor = store.write_cursor().await.unwrap(); let entry0: Entry = Entry::new(vec![1, value, 3, 4, 5]); let file_position0 = cursor.insert_entry(entry0).await.unwrap(); let entry1: Entry = Entry::new(vec![6, 7, 8, 9, 10]); let file_position1 = cursor.insert_entry(entry1).await.unwrap(); let entry2: Entry = Entry::new(vec![11, 2, 10, 10, 10]); let file_position2 = cursor.insert_entry(entry2).await.unwrap(); let entry3: Entry = Entry::new(vec![1, value, 100, 50, 40]); let file_position3 = cursor.insert_entry(entry3).await.unwrap(); assert!(store.header.total_count == 4); (file_position0, file_position1, file_position2, file_position3) }; { assert!(store.header.deleted_count == 0); let mut cursor = store.write_cursor().await.unwrap(); cursor.delete_entries_where_eq(column, &value, false).await.unwrap(); assert!(store.header.deleted_count == 2); } } #[tokio::test] async fn test_garbage_collection() { type Data = u32; let table_path = "test_table_8"; let number_of_columns = 5; let primary_column = 0; let mut store: Store = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); let column: Column = 1; assert!(store.indexes[column as usize].is_none()); store.attach_index(column).await.unwrap(); assert!(store.indexes[column as usize].is_some()); let value = 200; let (_file_position0, _file_position1, _file_position2, _file_position3) = { let mut cursor = store.write_cursor().await.unwrap(); let entry0: Entry = Entry::new(vec![1, value, 3, 4, 5]); let file_position0 = cursor.insert_entry(entry0).await.unwrap(); let entry1: Entry = Entry::new(vec![6, 7, 8, 9, 10]); let file_position1 = cursor.insert_entry(entry1).await.unwrap(); let entry2: Entry = Entry::new(vec![11, 2, 10, 10, 10]); let file_position2 = cursor.insert_entry(entry2).await.unwrap(); let entry3: Entry = Entry::new(vec![1, value, 100, 50, 40]); let file_position3 = cursor.insert_entry(entry3).await.unwrap(); assert!(store.header.total_count == 4); (file_position0, file_position1, file_position2, file_position3) }; { assert!(store.header.deleted_count == 0); let mut cursor = store.write_cursor().await.unwrap(); cursor.delete_entries_where_eq(column, &value, false).await.unwrap(); assert!(cursor.header().deleted_count == 2); assert!(cursor.header().total_count == 4); cursor.initiate_garbage_collection().await.unwrap(); assert!(cursor.header().deleted_count == 0); assert!(cursor.header().total_count == 2); } } }