Sketch out indexes in Store
This commit is contained in:
parent
dae012daa7
commit
89305b6126
4 changed files with 82 additions and 32 deletions
|
|
@ -14,10 +14,14 @@ use crate::entry::{Entry, EntryDetailed};
|
||||||
use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader};
|
use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader};
|
||||||
use crate::store_header::StoreHeader;
|
use crate::store_header::StoreHeader;
|
||||||
use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME};
|
use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME};
|
||||||
|
use crate::index::Index;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// ===Concrete Cursors===
|
// ===Concrete Cursors===
|
||||||
pub struct ReadCursor<T> {
|
pub struct ReadCursor<'a, T> {
|
||||||
header: StoreHeader,
|
header: StoreHeader,
|
||||||
|
indexes: Vec<Option<&'a Index<T, FilePosition>>>,
|
||||||
file: File,
|
file: File,
|
||||||
data_type: PhantomData<T>,
|
data_type: PhantomData<T>,
|
||||||
|
|
||||||
|
|
@ -26,6 +30,7 @@ pub struct ReadCursor<T> {
|
||||||
|
|
||||||
pub struct WriteCursor<'a, T> {
|
pub struct WriteCursor<'a, T> {
|
||||||
header: &'a mut StoreHeader,
|
header: &'a mut StoreHeader,
|
||||||
|
indexes: Vec<Option<&'a Index<T, FilePosition>>>,
|
||||||
file: File,
|
file: File,
|
||||||
data_type: PhantomData<T>,
|
data_type: PhantomData<T>,
|
||||||
|
|
||||||
|
|
@ -224,9 +229,20 @@ pub trait CursorWithStoreHeader<T>: PrimitiveCursor<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait CursorWithAccessToIndex<T>: CursorWithStoreHeader<T> {
|
||||||
|
fn indexes(&mut self) -> &[Option<&Index<T, FilePosition>>];
|
||||||
|
|
||||||
|
async fn find_in_index(&mut self, k: &T) -> Result<Option<FilePosition>>
|
||||||
|
where T: Encode + Decode + Ord + Send + Sync
|
||||||
|
{
|
||||||
|
// let x = self.primary_index().lookup(k).await?;
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ===PrimitiveCursor===
|
// ===PrimitiveCursor===
|
||||||
impl <T>PrimitiveCursor<T> for ReadCursor<T> {
|
impl <T>PrimitiveCursor<T> for ReadCursor<'_, T> {
|
||||||
fn file(&mut self) -> &mut File {
|
fn file(&mut self) -> &mut File {
|
||||||
&mut self.file
|
&mut self.file
|
||||||
}
|
}
|
||||||
|
|
@ -247,7 +263,7 @@ impl <T>PrimitiveCursor<T> for WriteCursor<'_, T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===CursorWithStoreHeader===
|
// ===CursorWithStoreHeader===
|
||||||
impl <T>CursorWithStoreHeader<T> for ReadCursor<T> {
|
impl <T>CursorWithStoreHeader<T> for ReadCursor<'_, T> {
|
||||||
fn header(&self) -> &StoreHeader {
|
fn header(&self) -> &StoreHeader {
|
||||||
&self.header
|
&self.header
|
||||||
}
|
}
|
||||||
|
|
@ -259,9 +275,23 @@ impl <T>CursorWithStoreHeader<T> for WriteCursor<'_, T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl <T> ReadCursor<T> {
|
// ===CursorWithAccessToIndex===
|
||||||
pub async fn new(store: &Store<T>) -> Result<Self>
|
impl <T>CursorWithAccessToIndex<T> for ReadCursor<'_, T> {
|
||||||
where T: Send
|
fn indexes(&mut self) -> &[Option<&Index<T, FilePosition>>] {
|
||||||
|
&self.indexes
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl <T>CursorWithAccessToIndex<T> for WriteCursor<'_, T> {
|
||||||
|
fn indexes(&mut self) -> &[Option<&Index<T, FilePosition>>] {
|
||||||
|
&self.indexes
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl <'cursor, T> ReadCursor<'cursor, T> {
|
||||||
|
pub async fn new<'store: 'cursor>(store: &'store Store<T>) -> Result<Self>
|
||||||
|
where T: Send + Sync
|
||||||
{
|
{
|
||||||
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
|
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
|
||||||
let file: File =
|
let file: File =
|
||||||
|
|
@ -274,6 +304,7 @@ impl <T> ReadCursor<T> {
|
||||||
header: store.header.clone(),
|
header: store.header.clone(),
|
||||||
file,
|
file,
|
||||||
data_type: store.data_type,
|
data_type: store.data_type,
|
||||||
|
indexes: todo!(),
|
||||||
|
|
||||||
eof_file_position: 0,
|
eof_file_position: 0,
|
||||||
};
|
};
|
||||||
|
|
@ -284,13 +315,12 @@ impl <T> ReadCursor<T> {
|
||||||
|
|
||||||
Ok(cursor)
|
Ok(cursor)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn less_than_eq(&mut self, file_position0: FilePosition, file_position1: FilePosition) -> Result<bool> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl <'cursor, T> WriteCursor<'cursor, T> {
|
impl <'cursor, T> WriteCursor<'cursor, T>
|
||||||
|
// TODO: Consider adding this manually to wher eit is really needed
|
||||||
|
where T: Sync
|
||||||
|
{
|
||||||
// 'store lives atleast as long as 'cursor
|
// 'store lives atleast as long as 'cursor
|
||||||
pub async fn new<'store: 'cursor>(store: &'store mut Store<T>) -> Result<Self>
|
pub async fn new<'store: 'cursor>(store: &'store mut Store<T>) -> Result<Self>
|
||||||
where T: Send
|
where T: Send
|
||||||
|
|
@ -307,6 +337,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
|
||||||
header: &mut store.header,
|
header: &mut store.header,
|
||||||
file,
|
file,
|
||||||
data_type: store.data_type,
|
data_type: store.data_type,
|
||||||
|
indexes: todo!(),
|
||||||
|
|
||||||
eof_file_position: 0,
|
eof_file_position: 0,
|
||||||
};
|
};
|
||||||
|
|
@ -332,6 +363,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
|
||||||
header,
|
header,
|
||||||
file,
|
file,
|
||||||
data_type: PhantomData::<T>,
|
data_type: PhantomData::<T>,
|
||||||
|
indexes: todo!(),
|
||||||
|
|
||||||
eof_file_position: 0,
|
eof_file_position: 0,
|
||||||
};
|
};
|
||||||
|
|
@ -478,4 +510,20 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
|
||||||
// Afterwards we swap the files, and delete the garbage.
|
// Afterwards we swap the files, and delete the garbage.
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ===Indexing===
|
||||||
|
async fn insert_to_index(&mut self, t: T, file_position: FilePosition) -> Result<Option<FilePosition>>
|
||||||
|
where T: Encode + Decode + Ord + Send + Sync
|
||||||
|
{
|
||||||
|
// let x = self.primary_index.insert(t, file_position).await?;
|
||||||
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn delete_from_index(&mut self, t: T, file_position: FilePosition) -> Result<Option<FilePosition>>
|
||||||
|
where T: Encode + Decode + Ord + Send + Sync
|
||||||
|
{
|
||||||
|
// let x = self.primary_index.delete(t, file_position).await?;
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -83,8 +83,8 @@ where
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn lookup(&self, k: K) -> Result<Option<HashSet<V>>> {
|
pub async fn lookup(&self, k: &K) -> Result<Option<HashSet<V>>> {
|
||||||
let hashset = self.data.get(&k).unwrap();
|
let hashset = self.data.get(k).unwrap();
|
||||||
Ok(Some(hashset.clone()))
|
Ok(Some(hashset.clone()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -52,7 +52,7 @@ async fn append_entry(cursor: &mut WriteCursor<'_, Data>, entry: &Entry<Data>) -
|
||||||
Ok(file_position)
|
Ok(file_position)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_entry(cursor: &mut ReadCursor<Data>, file_position: FilePosition) -> Result<Option<EntryDetailed<Data>>> {
|
async fn read_entry(cursor: &mut ReadCursor<'_, Data>, file_position: FilePosition) -> Result<Option<EntryDetailed<Data>>> {
|
||||||
println!("READING ENTRY at file_position={}", file_position);
|
println!("READING ENTRY at file_position={}", file_position);
|
||||||
// let entry = cursor.read_entry_at(file_position).await.map_err(|e| e.to_io_or_panic())?;
|
// let entry = cursor.read_entry_at(file_position).await.map_err(|e| e.to_io_or_panic())?;
|
||||||
// println!("ENTRY: {:?}", entry);
|
// println!("ENTRY: {:?}", entry);
|
||||||
|
|
|
||||||
|
|
@ -3,15 +3,12 @@ use tokio::fs::{File, OpenOptions, DirBuilder};
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use async_trait::async_trait;
|
use bincode::{Decode, Encode};
|
||||||
|
|
||||||
use crate::index::SomethingSupportingLeq;
|
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader};
|
use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader};
|
||||||
use crate::store_header::StoreHeader;
|
use crate::store_header::StoreHeader;
|
||||||
|
use crate::index::Index;
|
||||||
// TODO
|
|
||||||
// use crate::index::Index;
|
|
||||||
|
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, Error>;
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
@ -37,6 +34,7 @@ pub struct Store<T> {
|
||||||
// All
|
// All
|
||||||
pub header: StoreHeader,
|
pub header: StoreHeader,
|
||||||
pub data_type: PhantomData<T>,
|
pub data_type: PhantomData<T>,
|
||||||
|
pub primary_index: Index<T, FilePosition>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type PositionOfValue = FilePosition;
|
pub type PositionOfValue = FilePosition;
|
||||||
|
|
@ -48,16 +46,14 @@ pub async fn store_exists(table_folder: &str) -> Result<bool> {
|
||||||
Ok(fs::metadata(table_folder).await.is_ok())
|
Ok(fs::metadata(table_folder).await.is_ok())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn less_than_eq<T>(store: &mut Store<T>, file_position0: FilePosition, file_position1: FilePosition) -> Result<bool> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub const ROWS_FILE_NAME: &'static str = "rows";
|
pub const ROWS_FILE_NAME: &'static str = "rows";
|
||||||
pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_intermediate";
|
pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_intermediate";
|
||||||
|
|
||||||
impl <T>Store<T> {
|
impl <T>Store<T> {
|
||||||
// ===Creation===
|
// ===Creation===
|
||||||
pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result<Self> {
|
pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result<Self>
|
||||||
|
where T: Encode + Decode + Ord
|
||||||
|
{
|
||||||
let path_to_table = Path::new(table_folder);
|
let path_to_table = Path::new(table_folder);
|
||||||
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
|
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
|
||||||
DirBuilder::new()
|
DirBuilder::new()
|
||||||
|
|
@ -74,14 +70,14 @@ impl <T>Store<T> {
|
||||||
// We don't need the file right now. Only cursors will later open it.
|
// We don't need the file right now. Only cursors will later open it.
|
||||||
Self::create_empty_rows_file(path_to_rows, &header).await?;
|
Self::create_empty_rows_file(path_to_rows, &header).await?;
|
||||||
|
|
||||||
// TODO: indexes
|
let primary_index: Index<T, FilePosition> = Index::new(
|
||||||
// let index: Index<PositionOfValue, PositionOfRow> = Index::new(
|
&format!("rows_{}", primary_column.to_string()),
|
||||||
// &format!("rows_{}", primary_column.to_string()),
|
).await?;
|
||||||
// ).await?;
|
|
||||||
|
|
||||||
let store = Self {
|
let store = Self {
|
||||||
header,
|
header,
|
||||||
data_type: PhantomData::<T>,
|
data_type: PhantomData::<T>,
|
||||||
|
primary_index,
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(store)
|
Ok(store)
|
||||||
|
|
@ -103,7 +99,7 @@ impl <T>Store<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn connect(table_folder: &str) -> Result<Self>
|
pub async fn connect(table_folder: &str) -> Result<Self>
|
||||||
where T: std::fmt::Debug
|
where T: std::fmt::Debug + Encode + Decode + Ord
|
||||||
{
|
{
|
||||||
let path_to_table = Path::new(table_folder);
|
let path_to_table = Path::new(table_folder);
|
||||||
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
|
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
|
||||||
|
|
@ -121,29 +117,35 @@ impl <T>Store<T> {
|
||||||
file.read_exact(&mut header_bytes).await?;
|
file.read_exact(&mut header_bytes).await?;
|
||||||
let header = StoreHeader::decode(table_folder, &mut header_bytes).await?;
|
let header = StoreHeader::decode(table_folder, &mut header_bytes).await?;
|
||||||
|
|
||||||
|
|
||||||
|
let primary_index: Index<T, FilePosition> = Index::connect(
|
||||||
|
&format!("rows_{}", header.primary_column.to_string()),
|
||||||
|
).await?;
|
||||||
|
|
||||||
let store = Self {
|
let store = Self {
|
||||||
header,
|
header,
|
||||||
data_type: PhantomData::<T>,
|
data_type: PhantomData::<T>,
|
||||||
|
primary_index
|
||||||
};
|
};
|
||||||
Ok(store)
|
Ok(store)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===Cursors===
|
// ===Cursors===
|
||||||
pub async fn read_cursor(&self) -> Result<ReadCursor<T>>
|
pub async fn read_cursor(&self) -> Result<ReadCursor<T>>
|
||||||
where T: Send
|
where T: Send + Sync
|
||||||
{
|
{
|
||||||
ReadCursor::new(self).await
|
ReadCursor::new(self).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn write_cursor(&mut self) -> Result<WriteCursor<T>>
|
pub async fn write_cursor(&mut self) -> Result<WriteCursor<T>>
|
||||||
where T: Send
|
where T: Send + Sync
|
||||||
{
|
{
|
||||||
WriteCursor::new(self).await
|
WriteCursor::new(self).await
|
||||||
}
|
}
|
||||||
|
|
||||||
// For debugging.
|
// For debugging.
|
||||||
pub async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error>
|
pub async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error>
|
||||||
where T: Send
|
where T: Send + Sync
|
||||||
{
|
{
|
||||||
let mut cursor = self.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
let mut cursor = self.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
||||||
let bytes = cursor.read_all_bytes().await?;
|
let bytes = cursor.read_all_bytes().await?;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue