Renaming storage_engine.rs ~> store.rs
This commit is contained in:
parent
18b8049958
commit
e0d08e758a
13 changed files with 92 additions and 13 deletions
|
|
@ -1,544 +0,0 @@
|
|||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::fs::{File, OpenOptions, DirBuilder};
|
||||
use tokio::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
use bincode::{Decode, Encode};
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::cursor::{ReadCursor, WriteCursor};
|
||||
use crate::cursor_capabilities::header_access::CursorCanReadHeader;
|
||||
use crate::segments::store_header::StoreHeader;
|
||||
use crate::index::Index;
|
||||
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
pub type Column = u64;
|
||||
pub type FilePosition = u64;
|
||||
|
||||
// TODO: Consider adding another type parameter for indexable values
|
||||
#[derive(Debug)]
|
||||
pub struct Store<T> {
|
||||
pub header: StoreHeader,
|
||||
pub indexes: StoreIndexes<T>,
|
||||
}
|
||||
|
||||
pub type StoreIndexes<T> = Vec<Option<Index<T, FilePosition>>>;
|
||||
|
||||
|
||||
//===Store===
|
||||
pub async fn store_exists(table_folder: &str) -> Result<bool> {
|
||||
Ok(fs::metadata(table_folder).await.is_ok())
|
||||
}
|
||||
|
||||
pub const ROWS_FILE_NAME: &'static str = "rows";
|
||||
pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_intermediate";
|
||||
|
||||
impl <T>Store<T> {
|
||||
// ===Creation===
|
||||
pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result<Self>
|
||||
where T: Encode + Decode + Ord
|
||||
{
|
||||
let path_to_table = Path::new(table_folder);
|
||||
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
|
||||
DirBuilder::new()
|
||||
.create(path_to_table).await?;
|
||||
|
||||
let header = {
|
||||
let mut indexed_columns = vec![false; number_of_columns];
|
||||
indexed_columns[primary_column as usize] = true;
|
||||
StoreHeader {
|
||||
table_folder: table_folder.to_string(),
|
||||
number_of_columns,
|
||||
deleted_count: 0,
|
||||
total_count: 0,
|
||||
primary_column,
|
||||
indexed_columns,
|
||||
}
|
||||
};
|
||||
|
||||
// We don't need the file right now. Only cursors will later open it.
|
||||
Self::create_empty_rows_file(path_to_rows, &header).await?;
|
||||
|
||||
let indexes: StoreIndexes<T> = Self::create_initial_indexes(&header).await?;
|
||||
|
||||
let store = Self {
|
||||
header,
|
||||
indexes,
|
||||
};
|
||||
|
||||
Ok(store)
|
||||
}
|
||||
|
||||
pub fn path_to_index_file(header: &StoreHeader, column: Column) -> PathBuf {
|
||||
let path_to_table = Path::new(&header.table_folder);
|
||||
let path_to_index = path_to_table.join(&format!("{}_{}", ROWS_FILE_NAME, column.to_string()));
|
||||
path_to_index
|
||||
}
|
||||
|
||||
pub async fn create_empty_index_at(header: &StoreHeader, column: Column) -> Result<Index<T, FilePosition>>
|
||||
where T: Encode + Decode + Ord
|
||||
{
|
||||
let path_to_index = Self::path_to_index_file(&header, column);
|
||||
let index = Index::new(path_to_index).await?;
|
||||
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
pub async fn create_initial_indexes(header: &StoreHeader) -> Result<StoreIndexes<T>>
|
||||
where T: Encode + Decode + Ord
|
||||
{
|
||||
let mut result: StoreIndexes<T> = Vec::with_capacity(header.number_of_columns);
|
||||
for _ in 0..header.number_of_columns {
|
||||
result.push(None)
|
||||
}
|
||||
|
||||
result[header.primary_column as usize] = Some(Self::create_empty_index_at(&header, header.primary_column).await?);
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn connect_index_at(header: &StoreHeader, column: Column) -> Result<Index<T, FilePosition>>
|
||||
where T: Encode + Decode + Ord
|
||||
{
|
||||
let path_to_index = Self::path_to_index_file(&header, column);
|
||||
let index: Index<T, FilePosition> = Index::connect(path_to_index).await?;
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
pub async fn create_empty_rows_file(path_to_rows: PathBuf, header: &StoreHeader) -> Result<File> {
|
||||
let mut file: File =
|
||||
OpenOptions::new()
|
||||
.write(true)
|
||||
.read(true)
|
||||
.create_new(true)
|
||||
.open(path_to_rows)
|
||||
.await?;
|
||||
|
||||
let encoded_header: Vec<u8> = header.encode()?;
|
||||
file.write(&encoded_header).await?;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
pub async fn connect(table_folder: &str) -> Result<Self>
|
||||
where T: std::fmt::Debug + Encode + Decode + Ord
|
||||
{
|
||||
let path_to_table = Path::new(table_folder);
|
||||
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
|
||||
|
||||
let mut file: File =
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(path_to_rows)
|
||||
.await?;
|
||||
|
||||
// Unfortunately we can't yet use store.read_bytes, since it can't be created without the
|
||||
// header.
|
||||
let header = {
|
||||
let mut fixed_header_bytes = StoreHeader::buffer_for_fixed_decoding();
|
||||
file.read_exact(&mut fixed_header_bytes).await?;
|
||||
let fixed_header = StoreHeader::decode_fixed(table_folder, &fixed_header_bytes).await?;
|
||||
|
||||
// decode the indexes
|
||||
let mut rest_bytes: Vec<u8> = StoreHeader::buffer_for_rest_decoding(&fixed_header);
|
||||
file.read_exact(&mut rest_bytes).await?;
|
||||
StoreHeader::decode_rest(fixed_header, &rest_bytes).await?
|
||||
};
|
||||
|
||||
|
||||
let indexes: StoreIndexes<T> = {
|
||||
let mut result = Vec::with_capacity(header.number_of_columns);
|
||||
for (column, &is_indexed) in header.indexed_columns.iter().enumerate() {
|
||||
if is_indexed {
|
||||
result.push(Some(Self::connect_index_at(&header, column as Column).await?))
|
||||
} else {
|
||||
result.push(None)
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
};
|
||||
|
||||
let store = Self {
|
||||
header,
|
||||
indexes
|
||||
};
|
||||
Ok(store)
|
||||
}
|
||||
|
||||
// ===Cursors===
|
||||
pub async fn read_cursor(&self) -> Result<ReadCursor<T>>
|
||||
where T: Send + Sync
|
||||
{
|
||||
ReadCursor::new(self).await
|
||||
}
|
||||
|
||||
pub async fn write_cursor(&mut self) -> Result<WriteCursor<T>>
|
||||
where T: Send + Sync
|
||||
{
|
||||
WriteCursor::new(self).await
|
||||
}
|
||||
|
||||
// ===Indexes===
|
||||
pub async fn attach_index(&mut self, column: Column) -> Result<()>
|
||||
where T: Ord + Decode + Encode + Send + Sync
|
||||
{
|
||||
if self.header.is_column_indexed(column) {
|
||||
Err(Error::ColumnAlreadyIndexed(column))
|
||||
} else {
|
||||
let mut cursor = self.write_cursor().await?;
|
||||
cursor.attach_index(column).await
|
||||
}
|
||||
}
|
||||
|
||||
// For debugging.
|
||||
#[allow(dead_code)]
|
||||
pub async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error>
|
||||
where T: Send + Sync
|
||||
{
|
||||
let mut cursor = self.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
||||
let bytes = cursor.read_all_bytes().await?;
|
||||
Ok(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::segments::entry::Entry;
|
||||
use crate::cursor_capabilities::header_access::CursorCanReadHeader;
|
||||
use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex};
|
||||
|
||||
impl <T>Drop for Store<T> {
|
||||
fn drop(&mut self) {
|
||||
println!("DROPPING TEST FOLDER");
|
||||
let table_folder = self.header.table_folder.clone();
|
||||
// Seems no one has figured out how to do AsyncDrop yet.
|
||||
std::fs::remove_dir_all(table_folder).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_0";
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
assert!(store.header.number_of_columns == number_of_columns);
|
||||
assert!(store.header.total_count == 0);
|
||||
assert!(store.header.deleted_count == 0);
|
||||
assert!(store.header.primary_column == primary_column);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_insert() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_1";
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
{
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
|
||||
let entry0: Entry<Data> = Entry::new(vec![1, 2, 3, 4, 5]);
|
||||
cursor.insert_entry(entry0).await.unwrap();
|
||||
|
||||
let entry1: Entry<Data> = Entry::new(vec![6, 7, 8, 9, 10]);
|
||||
cursor.insert_entry(entry1).await.unwrap();
|
||||
|
||||
assert!(store.header.total_count == 2);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_select_next() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_2";
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
{
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
|
||||
let entry0: Entry<Data> = Entry::new(vec![1, 2, 3, 4, 5]);
|
||||
cursor.insert_entry(entry0).await.unwrap();
|
||||
|
||||
let entry1: Entry<Data> = Entry::new(vec![6, 7, 8, 9, 10]);
|
||||
cursor.insert_entry(entry1).await.unwrap();
|
||||
|
||||
assert!(store.header.total_count == 2);
|
||||
}
|
||||
|
||||
{
|
||||
let mut cursor = store.read_cursor().await.unwrap();
|
||||
|
||||
let entry0 = cursor.next().await.unwrap().unwrap();
|
||||
let entry1 = cursor.next().await.unwrap().unwrap();
|
||||
|
||||
assert!(entry0.data == vec![1,2,3,4,5]);
|
||||
assert!(entry1.data == vec![6,7,8,9,10]);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_select_all() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_3";
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
{
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
|
||||
let entry0: Entry<Data> = Entry::new(vec![1, 2, 3, 4, 5]);
|
||||
cursor.insert_entry(entry0).await.unwrap();
|
||||
|
||||
let entry1: Entry<Data> = Entry::new(vec![6, 7, 8, 9, 10]);
|
||||
cursor.insert_entry(entry1).await.unwrap();
|
||||
|
||||
assert!(store.header.total_count == 2);
|
||||
}
|
||||
|
||||
{
|
||||
let mut cursor = store.read_cursor().await.unwrap();
|
||||
|
||||
let mut entries = vec![];
|
||||
while let Some(entry) = cursor.next().await.unwrap() {
|
||||
entries.push(entry)
|
||||
}
|
||||
|
||||
assert!(entries.len() == 2);
|
||||
assert!(entries[0].data == vec![1,2,3,4,5]);
|
||||
assert!(entries[1].data == vec![6,7,8,9,10]);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_select_eq() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_4";
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
let value = 200;
|
||||
{
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
|
||||
let entry0: Entry<Data> = Entry::new(vec![1, value, 3, 4, 5]);
|
||||
cursor.insert_entry(entry0).await.unwrap();
|
||||
|
||||
let entry1: Entry<Data> = Entry::new(vec![6, 7, 8, 9, 10]);
|
||||
cursor.insert_entry(entry1).await.unwrap();
|
||||
|
||||
let entry2: Entry<Data> = Entry::new(vec![11, 2, 10, 10, 10]);
|
||||
cursor.insert_entry(entry2).await.unwrap();
|
||||
|
||||
let entry3: Entry<Data> = Entry::new(vec![1, value, 100, 50, 40]);
|
||||
cursor.insert_entry(entry3).await.unwrap();
|
||||
|
||||
assert!(store.header.total_count == 4);
|
||||
}
|
||||
|
||||
{
|
||||
let mut cursor = store.read_cursor().await.unwrap();
|
||||
let column = 1;
|
||||
|
||||
let entries = cursor.select_entries_where_eq(column, &value).await.unwrap();
|
||||
|
||||
assert!(entries.len() == 2);
|
||||
assert!(entries[0].data == vec![1, value, 3, 4, 5]);
|
||||
assert!(entries[1].data == vec![1, value, 100, 50, 40]);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_select_eq_indexed() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_5";
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
let column: Column = 1;
|
||||
|
||||
assert!(store.indexes[column as usize].is_none());
|
||||
store.attach_index(column).await.unwrap();
|
||||
assert!(store.indexes[column as usize].is_some());
|
||||
|
||||
let value = 200;
|
||||
{
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
|
||||
let entry0: Entry<Data> = Entry::new(vec![1, value, 3, 4, 5]);
|
||||
cursor.insert_entry(entry0).await.unwrap();
|
||||
|
||||
let entry1: Entry<Data> = Entry::new(vec![6, 7, 8, 9, 10]);
|
||||
cursor.insert_entry(entry1).await.unwrap();
|
||||
|
||||
let entry2: Entry<Data> = Entry::new(vec![11, 2, 10, 10, 10]);
|
||||
cursor.insert_entry(entry2).await.unwrap();
|
||||
|
||||
let entry3: Entry<Data> = Entry::new(vec![1, value, 100, 50, 40]);
|
||||
cursor.insert_entry(entry3).await.unwrap();
|
||||
|
||||
assert!(store.header.total_count == 4);
|
||||
}
|
||||
|
||||
{
|
||||
let mut cursor = store.read_cursor().await.unwrap();
|
||||
let column = 1;
|
||||
|
||||
let entries = cursor.select_entries_where_eq(column, &value).await.unwrap();
|
||||
assert!(entries.len() == 2);
|
||||
// Order may be non-deterministic.
|
||||
assert!(entries[0].data[column as usize] == value);
|
||||
assert!(entries[1].data[column as usize] == value);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_entry() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_6";
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
let value = 200;
|
||||
let (_file_position0, file_position1, _file_position2, _file_position3) = {
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
|
||||
let entry0: Entry<Data> = Entry::new(vec![1, value, 3, 4, 5]);
|
||||
let file_position0 = cursor.insert_entry(entry0).await.unwrap();
|
||||
|
||||
let entry1: Entry<Data> = Entry::new(vec![6, 7, 8, 9, 10]);
|
||||
let file_position1 = cursor.insert_entry(entry1).await.unwrap();
|
||||
|
||||
let entry2: Entry<Data> = Entry::new(vec![11, 2, 10, 10, 10]);
|
||||
let file_position2 = cursor.insert_entry(entry2).await.unwrap();
|
||||
|
||||
let entry3: Entry<Data> = Entry::new(vec![1, value, 100, 50, 40]);
|
||||
let file_position3 = cursor.insert_entry(entry3).await.unwrap();
|
||||
|
||||
assert!(store.header.total_count == 4);
|
||||
(file_position0, file_position1, file_position2, file_position3)
|
||||
};
|
||||
|
||||
{
|
||||
assert!(store.header.deleted_count == 0);
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
cursor.mark_deleted_at(file_position1, false).await.unwrap();
|
||||
assert!(store.header.deleted_count == 1);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_where_eq() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_7";
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
let column: Column = 1;
|
||||
|
||||
assert!(store.indexes[column as usize].is_none());
|
||||
store.attach_index(column).await.unwrap();
|
||||
assert!(store.indexes[column as usize].is_some());
|
||||
|
||||
let value = 200;
|
||||
|
||||
let (_file_position0, _file_position1, _file_position2, _file_position3) = {
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
|
||||
let entry0: Entry<Data> = Entry::new(vec![1, value, 3, 4, 5]);
|
||||
let file_position0 = cursor.insert_entry(entry0).await.unwrap();
|
||||
|
||||
let entry1: Entry<Data> = Entry::new(vec![6, 7, 8, 9, 10]);
|
||||
let file_position1 = cursor.insert_entry(entry1).await.unwrap();
|
||||
|
||||
let entry2: Entry<Data> = Entry::new(vec![11, 2, 10, 10, 10]);
|
||||
let file_position2 = cursor.insert_entry(entry2).await.unwrap();
|
||||
|
||||
let entry3: Entry<Data> = Entry::new(vec![1, value, 100, 50, 40]);
|
||||
let file_position3 = cursor.insert_entry(entry3).await.unwrap();
|
||||
|
||||
assert!(store.header.total_count == 4);
|
||||
(file_position0, file_position1, file_position2, file_position3)
|
||||
};
|
||||
|
||||
{
|
||||
assert!(store.header.deleted_count == 0);
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
cursor.delete_entries_where_eq(column, &value, false).await.unwrap();
|
||||
assert!(store.header.deleted_count == 2);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_garbage_collection() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_8";
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
let column: Column = 1;
|
||||
|
||||
assert!(store.indexes[column as usize].is_none());
|
||||
store.attach_index(column).await.unwrap();
|
||||
assert!(store.indexes[column as usize].is_some());
|
||||
|
||||
let value = 200;
|
||||
|
||||
let (_file_position0, _file_position1, _file_position2, _file_position3) = {
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
|
||||
let entry0: Entry<Data> = Entry::new(vec![1, value, 3, 4, 5]);
|
||||
let file_position0 = cursor.insert_entry(entry0).await.unwrap();
|
||||
|
||||
let entry1: Entry<Data> = Entry::new(vec![6, 7, 8, 9, 10]);
|
||||
let file_position1 = cursor.insert_entry(entry1).await.unwrap();
|
||||
|
||||
let entry2: Entry<Data> = Entry::new(vec![11, 2, 10, 10, 10]);
|
||||
let file_position2 = cursor.insert_entry(entry2).await.unwrap();
|
||||
|
||||
let entry3: Entry<Data> = Entry::new(vec![1, value, 100, 50, 40]);
|
||||
let file_position3 = cursor.insert_entry(entry3).await.unwrap();
|
||||
|
||||
assert!(store.header.total_count == 4);
|
||||
(file_position0, file_position1, file_position2, file_position3)
|
||||
};
|
||||
|
||||
{
|
||||
assert!(store.header.deleted_count == 0);
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
cursor.delete_entries_where_eq(column, &value, false).await.unwrap();
|
||||
assert!(cursor.header().deleted_count == 2);
|
||||
assert!(cursor.header().total_count == 4);
|
||||
|
||||
cursor.initiate_garbage_collection().await.unwrap();
|
||||
assert!(cursor.header().deleted_count == 0);
|
||||
assert!(cursor.header().total_count == 2);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue