Make use of indexes
This commit is contained in:
parent
2357ea8230
commit
8139112934
5 changed files with 120 additions and 27 deletions
|
|
@ -245,18 +245,6 @@ pub trait CursorWithStoreHeader<T>: PrimitiveCursor<T> {
|
|||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorWithAccessToIndex<T>: CursorWithStoreHeader<T> {
|
||||
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>];
|
||||
|
||||
async fn find_in_index(&mut self, k: &T) -> Result<Option<FilePosition>>
|
||||
where T: Encode + Decode + Ord + Send + Sync
|
||||
{
|
||||
// let x = self.primary_index().lookup(k).await?;
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorWithWriteStoreHeader<T>: CursorWithStoreHeader<T> + PrimitiveWriteCursor<T> {
|
||||
fn header_mut(&mut self) -> &mut StoreHeader;
|
||||
|
|
@ -287,7 +275,7 @@ pub trait CursorWithWriteStoreHeader<T>: CursorWithStoreHeader<T> + PrimitiveWri
|
|||
|
||||
// Moves cursor to the end.
|
||||
// Returns file position to the start of the new entry.
|
||||
async fn append_entry(&mut self, entry: &Entry<T>) -> Result<FilePosition>
|
||||
async fn append_entry_no_indexing(&mut self, entry: &Entry<T>) -> Result<FilePosition>
|
||||
where T: Encode + Send + Sync
|
||||
{
|
||||
self.increment_total_count().await?;
|
||||
|
|
@ -303,6 +291,94 @@ pub trait CursorWithWriteStoreHeader<T>: CursorWithStoreHeader<T> + PrimitiveWri
|
|||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorWithAccessToIndex<T>: CursorWithStoreHeader<T> {
|
||||
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>];
|
||||
|
||||
async fn index_lookup(&mut self, column: Column, k: &T) -> Result<Vec<EntryDetailed<T>>>
|
||||
where T: Encode + Decode + Ord + Send + Sync
|
||||
{
|
||||
match &self.indexes()[column as usize] {
|
||||
Some(index) => {
|
||||
let file_positions = index.lookup(k).await?.unwrap_or_else(|| HashSet::new());
|
||||
let mut entries: Vec<EntryDetailed<T>> = vec![];
|
||||
for &file_position in file_positions.iter() {
|
||||
match self.read_entry_at(file_position).await? {
|
||||
Some(entry) => {
|
||||
entries.push(entry)
|
||||
},
|
||||
None => {
|
||||
return Err(Error::IndexIsStoringEofFilePosition(column))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(entries)
|
||||
},
|
||||
None =>
|
||||
Err(Error::AttemptToIndexNonIndexableColumn(column))
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: I also need the global find
|
||||
async fn lookup(&mut self, column: Column, k: &T) -> Result<Vec<EntryDetailed<T>>> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorWithWriteAccessToIndex<T>: CursorWithAccessToIndex<T> + CursorWithWriteStoreHeader<T> {
|
||||
fn indexes_mut(&mut self) -> &mut [Option<Index<T, FilePosition>>];
|
||||
|
||||
// Assumes that the column is indexable.
|
||||
fn mut_index_at(&mut self, column: Column) -> &mut Index<T, FilePosition> {
|
||||
match &mut self.indexes_mut()[column as usize] {
|
||||
Some(index) => {
|
||||
index
|
||||
},
|
||||
None => {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Assumes that the column is indexable.
|
||||
async fn insert_into_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()>
|
||||
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
||||
{
|
||||
let index = self.mut_index_at(column as Column);
|
||||
index.insert(value, file_position).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Assumes that the column is indexable.
|
||||
async fn delete_from_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()>
|
||||
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
||||
{
|
||||
let index = self.mut_index_at(column as Column);
|
||||
index.delete(value, file_position).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn insert_entry(&mut self, entry: Entry<T>) -> Result<FilePosition>
|
||||
// TODO: Why is 'async_trait necessary?
|
||||
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
||||
{
|
||||
let file_position = self.append_entry_no_indexing(&entry).await?;
|
||||
|
||||
// insert the indexable values of the entry into corresponding indexes.
|
||||
for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() {
|
||||
if should_index {
|
||||
// SAFETY: If should_index is true, then the column is indexable.
|
||||
self.insert_into_index(column as Column, value, file_position).await?
|
||||
}
|
||||
}
|
||||
|
||||
Ok(file_position)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ===========Implementations=============
|
||||
// ===PrimitiveCursor===
|
||||
impl <T>PrimitiveCursor<T> for ReadCursor<'_, T> {
|
||||
|
|
@ -373,6 +449,10 @@ impl <T>CursorWithAccessToIndex<T> for WriteCursor<'_, T> {
|
|||
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] { &self.indexes }
|
||||
}
|
||||
|
||||
// ===CursorWithWriteAccessToIndex===
|
||||
impl <T>CursorWithWriteAccessToIndex<T> for WriteCursor<'_, T> {
|
||||
fn indexes_mut(&mut self) -> &mut [Option<Index<T, FilePosition>>] { self.indexes }
|
||||
}
|
||||
|
||||
|
||||
impl <'cursor, T> ReadCursor<'cursor, T> {
|
||||
|
|
@ -526,7 +606,7 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
|||
{
|
||||
while let Some(live_entry) = self.next_alive().await? {
|
||||
entries_deleted += 1;
|
||||
let file_position = cursor_to_intermediate.append_entry(&live_entry.forget()).await?;
|
||||
let file_position = cursor_to_intermediate.append_entry_no_indexing(&live_entry.forget()).await?;
|
||||
// TODO: Start indexing all of the indexable columns from scratch.
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,8 +7,8 @@ use crate::entry_header::{EntryHeader, EntryHeaderWithDataSize};
|
|||
|
||||
#[derive(Debug)]
|
||||
pub struct Entry<T> {
|
||||
header: EntryHeader,
|
||||
data: Vec<T>,
|
||||
pub header: EntryHeader,
|
||||
pub data: Vec<T>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
|
|
|||
|
|
@ -1,7 +1,11 @@
|
|||
use crate::storage_engine::Column;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
DecodeError(DecodeErrorKind, bincode::error::DecodeError),
|
||||
EncodeError(bincode::error::EncodeError),
|
||||
AttemptToIndexNonIndexableColumn(Column),
|
||||
IndexIsStoringEofFilePosition(Column),
|
||||
IoError(std::io::Error),
|
||||
InvalidStoreHeader,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ mod store_header;
|
|||
|
||||
use crate::entry::{Entry, EntryDetailed};
|
||||
use crate::storage_engine::{Store, FilePosition};
|
||||
use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader, CursorWithWriteStoreHeader};
|
||||
use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader, CursorWithWriteStoreHeader, CursorWithWriteAccessToIndex, CursorWithAccessToIndex};
|
||||
|
||||
type Data = u32;
|
||||
|
||||
|
|
@ -44,10 +44,12 @@ async fn create_or_connect() -> Result<Store<Data>> {
|
|||
}
|
||||
|
||||
|
||||
async fn append_entry(cursor: &mut WriteCursor<'_, Data>, entry: &Entry<Data>) -> Result<FilePosition> {
|
||||
async fn append_entry(cursor: &mut WriteCursor<'_, Data>, entry: Entry<Data>) -> Result<FilePosition> {
|
||||
println!("APPENDING");
|
||||
println!("entry == {:?}", entry);
|
||||
let file_position: FilePosition = cursor.append_entry(&entry).await.map_err(|e| e.to_io_or_panic())?;
|
||||
|
||||
// let file_position: FilePosition = cursor.append_entry(&entry).await.map_err(|e| e.to_io_or_panic())?;
|
||||
let file_position: FilePosition = cursor.insert_entry(entry).await.map_err(|e| e.to_io_or_panic())?;
|
||||
println!("file_position == {:?}", file_position);
|
||||
Ok(file_position)
|
||||
}
|
||||
|
|
@ -63,17 +65,20 @@ async fn read_entry(cursor: &mut ReadCursor<'_, Data>, file_position: FilePositi
|
|||
async fn append_bunch_of_entries(store: &mut Store<Data>) -> Result<()> {
|
||||
let mut cursor = store.write_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
||||
let entry0: Entry<u32> = Entry::new(vec![1, 2, 3, 4, 5]);
|
||||
append_entry(&mut cursor, &entry0).await?;
|
||||
append_entry(&mut cursor, entry0).await?;
|
||||
|
||||
let entry1: Entry<u32> = Entry::new(vec![200, 200, 5, 6, 7]);
|
||||
append_entry(&mut cursor, &entry1).await?;
|
||||
append_entry(&mut cursor, entry1).await?;
|
||||
|
||||
// println!("{:?}", store.read_all_bytes().await?);
|
||||
let entry2: Entry<u32> = Entry::new(vec![99, 98, 97, 96, 95]);
|
||||
append_entry(&mut cursor, &entry2).await?;
|
||||
append_entry(&mut cursor, entry2).await?;
|
||||
|
||||
let entry3: Entry<u32> = Entry::new(vec![50,50,50,50,50]);
|
||||
append_entry(&mut cursor, &entry3).await?;
|
||||
append_entry(&mut cursor, entry3).await?;
|
||||
|
||||
let entry4: Entry<u32> = Entry::new(vec![1,50,50,50,50]); // same 0-th column as entry0
|
||||
append_entry(&mut cursor, entry4).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -141,6 +146,13 @@ async fn main() -> Result<()> {
|
|||
println!("{:?}", x);
|
||||
}
|
||||
|
||||
{
|
||||
let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
||||
let entries = cursor.index_lookup(0, &1).await.map_err(|e| e.to_io_or_panic())?;
|
||||
println!("ARE INDEXES WORKING???");
|
||||
println!("{:?}", entries);
|
||||
}
|
||||
|
||||
// {
|
||||
// let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
||||
// let column = 3;
|
||||
|
|
|
|||
|
|
@ -155,10 +155,7 @@ impl <T>Store<T> {
|
|||
let mut result = Vec::with_capacity(header.number_of_columns);
|
||||
for (column, &is_indexed) in header.indexed_columns.iter().enumerate() {
|
||||
if is_indexed {
|
||||
result.push(None)
|
||||
// TODO: Once index connect is working, uncomment this line (and remove the
|
||||
// above .push line
|
||||
// result.push(Some(Self::connect_index_at(&header, column as Column).await?))
|
||||
result.push(Some(Self::connect_index_at(&header, column as Column).await?))
|
||||
} else {
|
||||
result.push(None)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue