303 lines
8.7 KiB
Rust
303 lines
8.7 KiB
Rust
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|
use tokio::fs::{File, OpenOptions, DirBuilder};
|
|
use tokio::fs;
|
|
use std::path::{Path, PathBuf};
|
|
use std::marker::PhantomData;
|
|
use bincode::{Decode, Encode};
|
|
|
|
use crate::error::Error;
|
|
use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader};
|
|
use crate::store_header::StoreHeader;
|
|
use crate::index::Index;
|
|
|
|
|
|
/// Module-local result alias: every fallible operation here reports the
/// crate's own `Error` type.
pub type Result<T> = std::result::Result<T, Error>;

/// Identifier of a column within a table (zero-based).
pub type Column = u64;

/// Position inside a store file (presumably a byte offset — confirm against
/// the cursor implementation).
pub type FilePosition = u64;
|
|
|
|
// TODO: Consider adding another type parameter for indexable values
/// In-memory handle to an on-disk table: the decoded store header plus one
/// optional index slot per column.
#[derive(Debug)]
pub struct Store<T> {
    // Header persisted at the start of the rows file (see
    // `create_empty_rows_file` / `connect`).
    pub header: StoreHeader,
    // One entry per column; `Some` only for columns marked as indexed in the
    // header.
    pub indexes: StoreIndexes<T>,
}
|
|
|
|
/// Per-column index slots: `indexes[c]` is `Some` iff column `c` is indexed.
pub type StoreIndexes<T> = Vec<Option<Index<T, FilePosition>>>;

/// Position of an individual value within the rows file.
pub type PositionOfValue = FilePosition;

/// Position of a whole row within the rows file.
pub type PositionOfRow = FilePosition;
|
|
|
|
|
|
//===Store===
|
|
pub async fn store_exists(table_folder: &str) -> Result<bool> {
|
|
Ok(fs::metadata(table_folder).await.is_ok())
|
|
}
|
|
|
|
/// File name of the main rows file inside a table folder.
/// (`'static` is implied on const references — clippy `redundant_static_lifetimes`.)
pub const ROWS_FILE_NAME: &str = "rows";

/// Scratch rows file written during garbage collection (name suggests it is
/// the compaction intermediate — confirm against the GC code).
pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &str = "rows_intermediate";
|
|
|
|
impl <T>Store<T> {
|
|
// ===Creation===
|
|
pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result<Self>
|
|
where T: Encode + Decode + Ord
|
|
{
|
|
let path_to_table = Path::new(table_folder);
|
|
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
|
|
DirBuilder::new()
|
|
.create(path_to_table).await?;
|
|
|
|
let header = {
|
|
let mut indexed_columns = vec![false; number_of_columns];
|
|
indexed_columns[primary_column as usize] = true;
|
|
StoreHeader {
|
|
table_folder: table_folder.to_string(),
|
|
number_of_columns,
|
|
deleted_count: 0,
|
|
total_count: 0,
|
|
primary_column,
|
|
indexed_columns,
|
|
}
|
|
};
|
|
|
|
// We don't need the file right now. Only cursors will later open it.
|
|
Self::create_empty_rows_file(path_to_rows, &header).await?;
|
|
|
|
let indexes: StoreIndexes<T> = Self::create_initial_indexes(&header).await?;
|
|
|
|
let store = Self {
|
|
header,
|
|
indexes,
|
|
};
|
|
|
|
Ok(store)
|
|
}
|
|
|
|
pub fn path_to_index_file(header: &StoreHeader, column: Column) -> PathBuf {
|
|
let path_to_table = Path::new(&header.table_folder);
|
|
let path_to_index = path_to_table.join(&format!("{}_{}", ROWS_FILE_NAME, column.to_string()));
|
|
path_to_index
|
|
}
|
|
|
|
pub async fn create_empty_index_at(header: &StoreHeader, column: Column) -> Result<Index<T, FilePosition>>
|
|
where T: Encode + Decode + Ord
|
|
{
|
|
let path_to_index = Self::path_to_index_file(&header, column);
|
|
let index = Index::new(path_to_index).await?;
|
|
|
|
Ok(index)
|
|
}
|
|
|
|
pub async fn create_initial_indexes(header: &StoreHeader) -> Result<StoreIndexes<T>>
|
|
where T: Encode + Decode + Ord
|
|
{
|
|
let mut result: StoreIndexes<T> = Vec::with_capacity(header.number_of_columns);
|
|
for _ in 0..header.number_of_columns {
|
|
result.push(None)
|
|
}
|
|
|
|
result[header.primary_column as usize] = Some(Self::create_empty_index_at(&header, header.primary_column).await?);
|
|
|
|
Ok(result)
|
|
}
|
|
|
|
pub async fn connect_index_at(header: &StoreHeader, column: Column) -> Result<Index<T, FilePosition>>
|
|
where T: Encode + Decode + Ord
|
|
{
|
|
let path_to_index = Self::path_to_index_file(&header, column);
|
|
let index: Index<T, FilePosition> = Index::connect(path_to_index).await?;
|
|
Ok(index)
|
|
}
|
|
|
|
pub async fn create_empty_rows_file(path_to_rows: PathBuf, header: &StoreHeader) -> Result<File> {
|
|
let mut file: File =
|
|
OpenOptions::new()
|
|
.write(true)
|
|
.read(true)
|
|
.create_new(true)
|
|
.open(path_to_rows)
|
|
.await?;
|
|
|
|
let encoded_header: Vec<u8> = header.encode()?;
|
|
file.write(&encoded_header).await?;
|
|
|
|
Ok(file)
|
|
}
|
|
|
|
pub async fn connect(table_folder: &str) -> Result<Self>
|
|
where T: std::fmt::Debug + Encode + Decode + Ord
|
|
{
|
|
let path_to_table = Path::new(table_folder);
|
|
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
|
|
|
|
let mut file: File =
|
|
OpenOptions::new()
|
|
.read(true)
|
|
.write(true)
|
|
.open(path_to_rows)
|
|
.await?;
|
|
|
|
// Unfortunately we can't yet use store.read_bytes, since it can't be created without the
|
|
// header.
|
|
let header = {
|
|
let mut fixed_header_bytes = StoreHeader::buffer_for_fixed_decoding();
|
|
file.read_exact(&mut fixed_header_bytes).await?;
|
|
let fixed_header = StoreHeader::decode_fixed(table_folder, &fixed_header_bytes).await?;
|
|
|
|
// decode the indexes
|
|
let mut rest_bytes: Vec<u8> = StoreHeader::buffer_for_rest_decoding(&fixed_header);
|
|
file.read_exact(&mut rest_bytes).await?;
|
|
StoreHeader::decode_rest(fixed_header, &rest_bytes).await?
|
|
};
|
|
|
|
|
|
let indexes: StoreIndexes<T> = {
|
|
let mut result = Vec::with_capacity(header.number_of_columns);
|
|
for (column, &is_indexed) in header.indexed_columns.iter().enumerate() {
|
|
if is_indexed {
|
|
result.push(Some(Self::connect_index_at(&header, column as Column).await?))
|
|
} else {
|
|
result.push(None)
|
|
}
|
|
}
|
|
|
|
result
|
|
};
|
|
|
|
let store = Self {
|
|
header,
|
|
indexes
|
|
};
|
|
Ok(store)
|
|
}
|
|
|
|
// ===Cursors===
|
|
pub async fn read_cursor(&self) -> Result<ReadCursor<T>>
|
|
where T: Send + Sync
|
|
{
|
|
ReadCursor::new(self).await
|
|
}
|
|
|
|
pub async fn write_cursor(&mut self) -> Result<WriteCursor<T>>
|
|
where T: Send + Sync
|
|
{
|
|
WriteCursor::new(self).await
|
|
}
|
|
|
|
// ===Indexes===
|
|
pub async fn attach_index(&mut self, column: Column) -> Result<()>
|
|
where T: Ord + Decode + Encode + Send + Sync
|
|
{
|
|
if self.header.is_column_indexed(column) {
|
|
Err(Error::ColumnAlreadyIndexed(column))
|
|
} else {
|
|
let mut cursor = self.write_cursor().await?;
|
|
cursor.attach_index(column).await
|
|
}
|
|
}
|
|
|
|
// For debugging.
|
|
pub async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error>
|
|
where T: Send + Sync
|
|
{
|
|
let mut cursor = self.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
|
let bytes = cursor.read_all_bytes().await?;
|
|
Ok(bytes)
|
|
}
|
|
}
|
|
|
|
// ===Store Header===
|
|
|
|
// ====Entry====
|
|
|
|
|
|
// impl StorageEngine for ColumnStore {
|
|
// async fn append(&mut self, id: Index, entry: Row<T>) -> Result<???, Error>
|
|
|
|
// async fn get_all(&self) -> ???
|
|
// async fn get_eq(&self, column: Column, value: T) -> ???
|
|
|
|
// async fn delete_all(&mut self)
|
|
// async fn delete_eq(&mut self, column: Column, value: T) -> ???
|
|
// }
|
|
|
|
// struct Error {
|
|
// }
|
|
|
|
|
|
// Selected(
|
|
// &'a TableSchema,
|
|
// ColumnSelection,
|
|
// TODO: Don't do the Box(dyn Iterator<...>)
|
|
// you'll have a concrete implementation of Iterator, and that's what you'll use
|
|
// Box<dyn Iterator<Item = RestrictedRow> + 'a + Send>,
|
|
// ),
|
|
|
|
|
|
|
|
// #[async_trait]
|
|
// trait StorageEngine<T>
|
|
// where T: Encode + Decode
|
|
// {
|
|
// async fn append(&mut self, id: Index, entry: Row<T>) -> Result<???, Error>
|
|
|
|
// async fn get_all(&self) -> ???
|
|
// async fn get_eq(&self, column: Column, value: T) -> ???
|
|
|
|
// async fn delete_all(&mut self)
|
|
// async fn delete_eq(&mut self, column: Column, value: T) -> ???
|
|
// }
|
|
|
|
// #[cfg(test)]
|
|
// mod tests {
|
|
// #[test]
|
|
// fn hello_test() {
|
|
// assert!(true);
|
|
// }
|
|
// }
|
|
|
|
// let storage_engine = StorageEngine::new("users")
|
|
// let mut next_position = 0
|
|
|
|
|
|
// type FilePosition = usize;
|
|
|
|
|
|
// type StoreFile = Vec<Entry>;
|
|
// type IndexFile = ???
|
|
|
|
// struct IndexEntry {
|
|
|
|
// }
|
|
|
|
|
|
// #00000 [false, u26, "Arnold", "schwarzenegger", "gettothechoppa@yahoo.com"] #5120000 [true, u27, "Arnold", "Vosloo", "avosloo@aol.com"]
|
|
// #00000 [true, u27, "Arnold", "Vosloo", "avosloo@aol.com"]
|
|
|
|
|
|
// at #00000 512 kb deleted,
|
|
// ...
|
|
|
|
|
|
|
|
|
|
// [(u26, [#00000]), (u27, [#5120000])]
|
|
// [("Arnold", [#000000, #5120000]), ("Arnfsdaf", []), ("Adasdsd", []), ("Bdsad", [])]
|
|
// // basically always keep indexes in memory and on write always sync on disk
|
|
|
|
|
|
|
|
|
|
// CREATE INDEX usersname ON "users" (name);
|
|
|
|
// INSERT INTO users (id, name, surname, email) VALUES (u26, "Arnold", "schwarzenegger", "gettothechoppa@yahoo.com");
|
|
// INSERT INTO users (id, name, surname, email) VALUES (u27, "Arnold", "Vosloo", "avosloo@aol.com");
|
|
|
|
|
|
// SELECT * FROM users WHERE id=u26;
|
|
|
|
// SELECT * FROM users WHERE name="Arnold";
|
|
|
|
|
|
// SELECT * FROM cars;
|
|
// DELETE FROM users WHERE name="Arnold";
|