Add indexing to deletion
This commit is contained in:
parent
b0f05f36f2
commit
1086b2fc5e
2 changed files with 52 additions and 37 deletions
|
|
@ -400,7 +400,6 @@ pub trait CursorWithWriteAccessToIndex<T>: CursorWithAccessToIndex<T> + CursorWi
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn insert_entry(&mut self, entry: Entry<T>) -> Result<FilePosition>
|
async fn insert_entry(&mut self, entry: Entry<T>) -> Result<FilePosition>
|
||||||
// TODO: Why is 'async_trait necessary?
|
|
||||||
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
||||||
{
|
{
|
||||||
let file_position = self.append_entry_no_indexing(&entry).await?;
|
let file_position = self.append_entry_no_indexing(&entry).await?;
|
||||||
|
|
@ -415,6 +414,18 @@ pub trait CursorWithWriteAccessToIndex<T>: CursorWithAccessToIndex<T> + CursorWi
|
||||||
|
|
||||||
Ok(file_position)
|
Ok(file_position)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn delete_entry_values_from_indexes(&mut self, entry: EntryDetailed<T>) -> Result<()>
|
||||||
|
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
||||||
|
{
|
||||||
|
for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() {
|
||||||
|
if should_index {
|
||||||
|
// SAFETY: If should_index is true, then the column is indexable.
|
||||||
|
self.delete_from_index(column as Column, value, entry.file_position).await?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -591,26 +602,39 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
||||||
|
|
||||||
// ===Deletion===
|
// ===Deletion===
|
||||||
pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()>
|
pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()>
|
||||||
where T: Send + Decode + Encode
|
where T: Encode + Decode + Ord + Send + Sync
|
||||||
{
|
{
|
||||||
self.seek_to(file_position).await?;
|
self.seek_to(file_position).await?;
|
||||||
let mut entry_header = self.read_entry_header().await?;
|
let mut entry_header = self.read_entry_header().await?;
|
||||||
if entry_header.is_deleted {
|
if entry_header.is_deleted {
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
|
// Update store and entry headers
|
||||||
self.increment_deleted_count().await?;
|
self.increment_deleted_count().await?;
|
||||||
self.seek_to(file_position).await?;
|
self.seek_to(file_position).await?;
|
||||||
|
|
||||||
entry_header.is_deleted = true;
|
entry_header.is_deleted = true;
|
||||||
self.set_new_entry_header(entry_header.into()).await?;
|
self.set_new_entry_header(entry_header.into()).await?;
|
||||||
|
|
||||||
|
// Update index
|
||||||
|
self.seek_to(file_position).await?;
|
||||||
|
match self.next().await? {
|
||||||
|
Some(entry) => {
|
||||||
|
self.delete_entry_values_from_indexes(entry).await?
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
// SAFETY: We just modified its header, so it must exist.
|
||||||
|
unreachable!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
self.attempt_garbage_collection_if_necessary().await?;
|
self.attempt_garbage_collection_if_necessary().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result<Option<EntryDetailed<T>>>
|
async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result<Option<EntryDetailed<T>>>
|
||||||
where T: Decode + Encode + PartialEq + Send + Sync
|
where T: Encode + Decode + Ord + Send + Sync
|
||||||
{
|
{
|
||||||
let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?;
|
let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?;
|
||||||
if let Some(entry) = maybe_entry {
|
if let Some(entry) = maybe_entry {
|
||||||
|
|
@ -621,6 +645,28 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ===Indexing===
|
||||||
|
// WARNING: Assumes the column is NOT indexable.
|
||||||
|
pub async fn attach_index(&mut self, column: Column) -> Result<()>
|
||||||
|
where T: Ord + Decode + Encode + Send + Sync
|
||||||
|
{
|
||||||
|
// New Index
|
||||||
|
let index = Store::create_empty_index_at(&self.header, column).await?;
|
||||||
|
self.indexes[column as usize] = Some(index);
|
||||||
|
|
||||||
|
// Mark column as indexed
|
||||||
|
self.header.make_column_indexed(column);
|
||||||
|
self.set_header(&self.header.clone()).await?;
|
||||||
|
|
||||||
|
// Build index
|
||||||
|
self.seek_to_start_of_data().await?;
|
||||||
|
while let Some((_, file_position, value)) = self.next_alive_at_column(column).await? {
|
||||||
|
self.insert_into_index(column, value, file_position).await?
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
// ===Garbage Collection===
|
// ===Garbage Collection===
|
||||||
async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()>
|
async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()>
|
||||||
where T: Send + Decode + Encode
|
where T: Send + Decode + Encode
|
||||||
|
|
@ -701,34 +747,5 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
||||||
|
|
||||||
Ok(cursor_to_intermediate)
|
Ok(cursor_to_intermediate)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===Indexing===
|
|
||||||
// WARNING: Assumes the column is NOT indexable.
|
|
||||||
pub async fn attach_index(&mut self, column: Column) -> Result<()>
|
|
||||||
where T: Ord + Decode + Encode + Send + Sync
|
|
||||||
{
|
|
||||||
// New Index
|
|
||||||
let index = Store::create_empty_index_at(&self.header, column).await?;
|
|
||||||
self.indexes[column as usize] = Some(index);
|
|
||||||
|
|
||||||
// Mark column as indexed
|
|
||||||
self.header.make_column_indexed(column);
|
|
||||||
self.set_header(&self.header.clone()).await?;
|
|
||||||
|
|
||||||
// Build index
|
|
||||||
self.seek_to_start_of_data().await?;
|
|
||||||
while let Some((_, file_position, value)) = self.next_alive_at_column(column).await? {
|
|
||||||
self.insert_into_index(column, value, file_position).await?
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn delete_from_index(&mut self, t: T, file_position: FilePosition) -> Result<Option<FilePosition>>
|
|
||||||
where T: Encode + Decode + Ord + Send + Sync
|
|
||||||
{
|
|
||||||
// let x = self.primary_index.delete(t, file_position).await?;
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -166,9 +166,9 @@ async fn main() -> Result<()> {
|
||||||
|
|
||||||
{
|
{
|
||||||
let column = 1;
|
let column = 1;
|
||||||
println!("BUILDING AN INDEX");
|
// println!("BUILDING AN INDEX");
|
||||||
store.attach_index(column).await.map_err(|e| e.to_io_or_panic())?;
|
// store.attach_index(column).await.map_err(|e| e.to_io_or_panic())?;
|
||||||
println!("INDEX BUILT!");
|
// println!("INDEX BUILT!");
|
||||||
|
|
||||||
let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
||||||
let value = 2;
|
let value = 2;
|
||||||
|
|
@ -178,8 +178,6 @@ async fn main() -> Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// {
|
// {
|
||||||
// let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
// let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
||||||
// let column = 3;
|
// let column = 3;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue