// NOTE(review): removed page-scrape metadata ("781 lines / 27 KiB / Rust")
// that is not part of the source and would not compile.
use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom};
|
|
use tokio::fs::{File, OpenOptions};
|
|
use tokio::fs;
|
|
use std::path::Path;
|
|
use std::marker::PhantomData;
|
|
use std::collections::{BTreeMap, HashSet};
|
|
|
|
use async_trait::async_trait;
|
|
|
|
use bincode;
|
|
use bincode::{Decode, Encode};
|
|
use crate::binary_coding::{encode, decode};
|
|
|
|
use crate::error::{Error, DecodeErrorKind};
|
|
use crate::segments::entry::{Entry, EntryDetailed};
|
|
use crate::segments::entry_header::{EntryHeaderWithDataSize, EntryHeader};
|
|
use crate::segments::store_header::StoreHeader;
|
|
use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME};
|
|
use crate::index::Index;
|
|
|
|
const GARBAGE_COLLECTION_TRIGGER: usize = 100;
|
|
|
|
// ===Concrete Cursors===
|
|
/// Read-only cursor over a store's rows file.
/// Holds a cloned copy of the store header and a shared borrow of the
/// store's indexes.
pub struct ReadCursor<'a, T> {
    // Snapshot of the store header taken when the cursor was opened.
    header: StoreHeader,
    // Shared view of the store's per-column indexes (None = column not indexed).
    indexes: &'a [Option<Index<T, FilePosition>>],
    // Handle to the rows file, opened read-only.
    file: File,

    // Position of end-of-file, recorded when the cursor was created.
    eof_file_position: FilePosition,
}
|
|
|
|
/// Read+write cursor over a store's rows file.
/// Mutably borrows the store's header and indexes so that header updates and
/// index mutations propagate back to the owning Store.
pub struct WriteCursor<'a, T> {
    // Mutable borrow of the store's header; count updates are written through here.
    header: &'a mut StoreHeader,
    // Mutable view of the store's per-column indexes (None = column not indexed).
    indexes: &'a mut [Option<Index<T, FilePosition>>],
    // Handle to the rows file, opened read+write.
    file: File,

    // Position of end-of-file; updated after appends.
    eof_file_position: FilePosition,
}
|
|
|
|
// This is used as a cursor to temporary file during Garbage Collection
pub struct AppendOnlyCursor<T> {
    // Owned header for the intermediate file (counts start at zero).
    header: StoreHeader,
    // Handle to the intermediate rows file.
    file: File,
    // Marker: this cursor owns no T values but is parameterized by the row type.
    data_type: PhantomData<T>,

    // Position of end-of-file; updated after appends.
    eof_file_position: FilePosition,
}
|
|
|
|
|
|
// ===Traits===
|
|
#[async_trait]
|
|
pub(crate) trait PrimitiveCursor<T> {
|
|
fn file(&mut self) -> &mut File;
|
|
fn eof_file_position(&self) -> FilePosition;
|
|
|
|
async fn read_bytes(&mut self, bytes: &mut [u8]) -> Result<()> {
|
|
self.file().read_exact(bytes).await?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn get_bytes(&mut self, count: usize) -> Result<Vec<u8>> {
|
|
let mut result: Vec<u8> = Vec::with_capacity(count);
|
|
self.read_bytes(&mut result).await?;
|
|
Ok(result)
|
|
}
|
|
|
|
async fn seek_to(&mut self, file_position: FilePosition) -> Result<FilePosition> {
|
|
let file_position = self.file().seek(SeekFrom::Start(file_position)).await?;
|
|
Ok(file_position)
|
|
}
|
|
|
|
// Start of the file i.e. the Header, not the entries.
|
|
async fn seek_to_start(&mut self) -> Result<FilePosition> {
|
|
let file_position = self.file().seek(SeekFrom::Start(0)).await?;
|
|
Ok(file_position)
|
|
}
|
|
|
|
async fn seek_to_end(&mut self) -> Result<FilePosition> {
|
|
let file_position = self.file().seek(SeekFrom::End(0)).await?;
|
|
Ok(file_position)
|
|
}
|
|
|
|
// Seeks from current position by offset and returns new file position
|
|
async fn seek_by(&mut self, offset: i64) -> Result<FilePosition> {
|
|
let file_position = self.file().seek(SeekFrom::Current(offset)).await?;
|
|
Ok(file_position)
|
|
}
|
|
|
|
async fn current_file_position(&mut self) -> Result<FilePosition> {
|
|
let next_file_position: FilePosition = self.file().stream_position().await?;
|
|
Ok(next_file_position)
|
|
}
|
|
|
|
async fn is_at_eof(&mut self) -> Result<bool> {
|
|
let current_file_position = self.current_file_position().await?;
|
|
let eof_file_position = self.eof_file_position();
|
|
Ok(current_file_position == eof_file_position)
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
pub(crate) trait PrimitiveWriteCursor<T>: PrimitiveCursor<T> {
|
|
async fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize> {
|
|
Ok(self.file().write(bytes).await?)
|
|
}
|
|
|
|
}
|
|
|
|
#[async_trait]
|
|
pub trait CursorWithStoreHeader<T>: PrimitiveCursor<T> {
|
|
fn header(&self) -> &StoreHeader;
|
|
|
|
async fn seek_to_start_of_data(&mut self) -> Result<FilePosition> {
|
|
self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64).await
|
|
}
|
|
|
|
async fn read_entry_header(&mut self) -> Result<EntryHeaderWithDataSize> {
|
|
let number_of_columns: usize = self.header().number_of_columns;
|
|
let mut header_bytes: Vec<u8> = vec![0; EntryHeaderWithDataSize::size(number_of_columns)];
|
|
self.read_bytes(&mut header_bytes).await?;
|
|
let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..], number_of_columns)?;
|
|
|
|
Ok(header)
|
|
}
|
|
|
|
async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result<EntryHeaderWithDataSize> {
|
|
self.seek_to(file_position).await?;
|
|
self.read_entry_header().await
|
|
}
|
|
|
|
// Returns None when file_position == eof_file_position
|
|
async fn read_entry_at(&mut self, file_position: FilePosition) -> Result<Option<EntryDetailed<T>>>
|
|
where T: Decode
|
|
{
|
|
self.seek_to(file_position).await?;
|
|
self.next().await
|
|
}
|
|
|
|
// ===Iteration===
|
|
// The following functions assume that the current file position is at a valid entry or EOF.
|
|
|
|
|
|
// WARNING: This moves the file_position to start of the data, so you can't just call
|
|
// next_entry_header() a bunch of times. You must move the cursor!
|
|
async fn next_entry_header(&mut self) -> Result<Option<EntryHeaderWithDataSize>> {
|
|
if self.is_at_eof().await? {
|
|
return Ok(None)
|
|
}
|
|
|
|
let entry_header = self.read_entry_header().await?;
|
|
|
|
Ok(Some(entry_header))
|
|
}
|
|
|
|
// This is meant to be used after next_entry_header() is called.
|
|
async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result<FilePosition>{
|
|
let file_position = self.seek_by(entry_header.size_of_data() as i64).await?;
|
|
Ok(file_position)
|
|
}
|
|
|
|
async fn next(&mut self) -> Result<Option<EntryDetailed<T>>>
|
|
where T: Decode
|
|
{
|
|
let file_position = self.current_file_position().await?;
|
|
let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
|
|
|
|
let mut data_bytes: Vec<u8> = vec![0; entry_header.size_of_data()];
|
|
self.read_bytes(&mut data_bytes).await?;
|
|
let entry: EntryDetailed<T> =
|
|
EntryDetailed::decode(entry_header, file_position, self.header().number_of_columns, &mut data_bytes)?;
|
|
|
|
Ok(Some(entry))
|
|
}
|
|
|
|
// Like next, but only reads the column, not the whole entry.
|
|
async fn next_at_column(&mut self, column: Column) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
|
|
where T: Decode + Send
|
|
{
|
|
let file_position = self.current_file_position().await?;
|
|
let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
|
|
let file_position_at_start_of_data = self.current_file_position().await?;
|
|
|
|
// figuring out how much to decode
|
|
let column_offset = entry_header.offset_of_column(column);
|
|
self.seek_by(column_offset as i64).await?;
|
|
|
|
// reading and decoding
|
|
let mut bytes: Vec<u8> = vec![0; entry_header.data_sizes[column as usize]];
|
|
self.read_bytes(&mut bytes).await?;
|
|
let (value, _) =
|
|
decode::<T>(&bytes[..])
|
|
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
|
|
|
|
// jumping to next entry
|
|
self.seek_to(file_position_at_start_of_data).await?;
|
|
self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?;
|
|
|
|
Ok(Some((entry_header, file_position, value)))
|
|
}
|
|
|
|
async fn next_alive_at_column(&mut self, column: Column) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
|
|
where T: Decode + Send
|
|
{
|
|
while let Some((header, file_position, t)) = self.next_at_column(column).await? {
|
|
if !header.is_deleted {
|
|
return Ok(Some((header, file_position, t)))
|
|
}
|
|
}
|
|
Ok(None)
|
|
}
|
|
|
|
async fn next_alive(&mut self) -> Result<Option<EntryDetailed<T>>>
|
|
where T: Decode
|
|
{
|
|
while let Some(entry) = self.next().await? {
|
|
if !entry.header.is_deleted {
|
|
return Ok(Some(entry))
|
|
}
|
|
}
|
|
Ok(None)
|
|
}
|
|
|
|
// ===Search===
|
|
async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result<Option<EntryDetailed<T>>>
|
|
where T: Decode + PartialEq + Send + Sync
|
|
{
|
|
let mut file_position = self.current_file_position().await?;
|
|
while let Some((_, _, t)) = self.next_alive_at_column(column).await? {
|
|
if &t == t0 {
|
|
// go back and decode the whole entry
|
|
self.seek_to(file_position).await?;
|
|
return self.next().await
|
|
} else {
|
|
file_position = self.current_file_position().await?;
|
|
}
|
|
}
|
|
Ok(None)
|
|
}
|
|
|
|
async fn find_all_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result<Vec<EntryDetailed<T>>>
|
|
where T: Decode + PartialEq + Send + Sync
|
|
{
|
|
let mut entries = vec![];
|
|
while let Some(entry) = self.find_first_eq_bruteforce(column, t0).await? {
|
|
entries.push(entry)
|
|
}
|
|
Ok(entries)
|
|
}
|
|
|
|
// ===Debugging===
|
|
async fn read_entries(&mut self) -> Result<()>
|
|
where T: Decode + std::fmt::Debug
|
|
{
|
|
self.seek_to_start_of_data().await?;
|
|
while let Some(entry) = self.next().await? {
|
|
println!("{:?}", entry);
|
|
}
|
|
println!("END of entries.");
|
|
Ok(())
|
|
}
|
|
|
|
async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error> {
|
|
let mut bytes: Vec<u8> = vec![];
|
|
self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?;
|
|
self.file().read_to_end(&mut bytes).await?;
|
|
Ok(bytes)
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
pub trait CursorWithWriteStoreHeader<T>: CursorWithStoreHeader<T> + PrimitiveWriteCursor<T> {
|
|
fn header_mut(&mut self) -> &mut StoreHeader;
|
|
fn set_eof_file_position(&mut self, new_file_position: FilePosition);
|
|
|
|
// ===Store Header Manipulation===
|
|
async fn increment_total_count(&mut self) -> Result<()>
|
|
where T: Send
|
|
{
|
|
self.seek_to_start().await?;
|
|
self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?;
|
|
let new_count = self.header_mut().increment_total_count();
|
|
self.write_bytes(&encode::<usize>(&new_count)?).await?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn increment_deleted_count(&mut self) -> Result<()>
|
|
where T: Send
|
|
{
|
|
self.seek_to_start().await?;
|
|
self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?;
|
|
let new_count = self.header_mut().increment_deleted_count();
|
|
self.write_bytes(&encode::<usize>(&new_count)?).await?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn set_header(&mut self, header: &StoreHeader) -> Result<()>
|
|
where T: Send
|
|
{
|
|
self.seek_to_start().await?;
|
|
let encoded_header: Vec<u8> = header.encode()?;
|
|
self.write_bytes(&encoded_header).await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// ===Append Entry===
|
|
|
|
// Moves cursor to the end.
|
|
// Returns file position to the start of the new entry.
|
|
async fn append_entry_no_indexing(&mut self, entry: &Entry<T>) -> Result<FilePosition>
|
|
where T: Encode + Send + Sync
|
|
{
|
|
self.increment_total_count().await?;
|
|
|
|
let encoded_entry: Vec<u8> = entry.encode()?;
|
|
let file_position = self.seek_to_end().await?;
|
|
self.write_bytes(&encoded_entry).await?;
|
|
|
|
let eof_file_position: FilePosition = self.current_file_position().await?;
|
|
self.set_eof_file_position(eof_file_position);
|
|
|
|
Ok(file_position)
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
pub trait CursorWithAccessToIndex<T>: CursorWithStoreHeader<T> {
|
|
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>];
|
|
|
|
async fn index_lookup(&mut self, column: Column, value: &T) -> Result<Vec<EntryDetailed<T>>>
|
|
where T: Encode + Decode + Ord + Send + Sync
|
|
{
|
|
match &self.indexes()[column as usize] {
|
|
Some(index) => {
|
|
let file_positions = index.lookup(value).await?.unwrap_or_else(|| HashSet::new());
|
|
let mut entries: Vec<EntryDetailed<T>> = vec![];
|
|
for &file_position in file_positions.iter() {
|
|
match self.read_entry_at(file_position).await? {
|
|
Some(entry) => {
|
|
entries.push(entry)
|
|
},
|
|
None => {
|
|
return Err(Error::IndexIsStoringEofFilePosition(column))
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(entries)
|
|
},
|
|
None =>
|
|
Err(Error::AttemptToIndexNonIndexableColumn(column))
|
|
}
|
|
}
|
|
|
|
async fn select_entries_where_eq(&mut self, column: Column, value: &T) -> Result<Vec<EntryDetailed<T>>>
|
|
where T: Encode + Decode + Ord + Send + Sync
|
|
{
|
|
if self.header().is_column_indexed(column) {
|
|
self.index_lookup(column, value).await
|
|
} else {
|
|
self.find_all_eq_bruteforce(column, value).await
|
|
}
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
pub trait CursorWithWriteAccessToIndex<T>: CursorWithAccessToIndex<T> + CursorWithWriteStoreHeader<T> {
|
|
fn indexes_mut(&mut self) -> &mut [Option<Index<T, FilePosition>>];
|
|
|
|
// Assumes that the column is indexable.
|
|
fn mut_index_at(&mut self, column: Column) -> &mut Index<T, FilePosition> {
|
|
match &mut self.indexes_mut()[column as usize] {
|
|
Some(index) => {
|
|
index
|
|
},
|
|
None => {
|
|
unreachable!()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Assumes that the column is indexable.
|
|
async fn insert_into_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()>
|
|
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
|
{
|
|
let index = self.mut_index_at(column as Column);
|
|
index.insert(value, file_position).await?;
|
|
Ok(())
|
|
}
|
|
|
|
// Assumes that the column is indexable.
|
|
async fn delete_from_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()>
|
|
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
|
{
|
|
let index = self.mut_index_at(column as Column);
|
|
index.delete(value, file_position).await?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn insert_entry(&mut self, entry: Entry<T>) -> Result<FilePosition>
|
|
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
|
{
|
|
let file_position = self.append_entry_no_indexing(&entry).await?;
|
|
|
|
// insert the indexable values of the entry into corresponding indexes.
|
|
for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() {
|
|
if should_index {
|
|
// SAFETY: If should_index is true, then the column is indexable.
|
|
self.insert_into_index(column as Column, value, file_position).await?
|
|
}
|
|
}
|
|
|
|
Ok(file_position)
|
|
}
|
|
|
|
async fn delete_entry_values_from_indexes(&mut self, entry: EntryDetailed<T>) -> Result<()>
|
|
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
|
{
|
|
for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() {
|
|
if should_index {
|
|
// SAFETY: If should_index is true, then the column is indexable.
|
|
self.delete_from_index(column as Column, value, entry.file_position).await?
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
|
|
// ===========Implementations=============
|
|
// ===PrimitiveCursor===
|
|
// ReadCursor reads through its owned file handle; EOF was recorded at creation.
impl <T>PrimitiveCursor<T> for ReadCursor<'_, T> {
    fn file(&mut self) -> &mut File {
        &mut self.file
    }

    fn eof_file_position(&self) -> FilePosition {
        self.eof_file_position
    }
}
|
|
|
|
// WriteCursor reads/writes through its owned file handle; EOF is refreshed on append.
impl <T>PrimitiveCursor<T> for WriteCursor<'_, T> {
    fn file(&mut self) -> &mut File {
        &mut self.file
    }

    fn eof_file_position(&self) -> FilePosition {
        self.eof_file_position
    }
}
|
|
|
|
// AppendOnlyCursor (GC intermediate file) uses its owned file handle.
impl <T>PrimitiveCursor<T> for AppendOnlyCursor<T> {
    fn file(&mut self) -> &mut File {
        &mut self.file
    }

    fn eof_file_position(&self) -> FilePosition {
        self.eof_file_position
    }
}
|
|
|
|
// ===PrimitiveCursor===
|
|
// Marker impls: both writable cursors use the trait's default write_bytes.
impl <T>PrimitiveWriteCursor<T> for WriteCursor<'_, T> {}
|
|
impl <T>PrimitiveWriteCursor<T> for AppendOnlyCursor<T> {}
|
|
|
|
|
|
// ===CursorWithStoreHeader===
|
|
// ReadCursor holds its own cloned header snapshot.
impl <T>CursorWithStoreHeader<T> for ReadCursor<'_, T> {
    fn header(&self) -> &StoreHeader { &self.header }
}
|
|
|
|
// WriteCursor's header is a &mut borrow of the store's header.
impl <T>CursorWithStoreHeader<T> for WriteCursor<'_, T> {
    fn header(&self) -> &StoreHeader { &self.header }
}
|
|
|
|
// AppendOnlyCursor owns the header of the GC intermediate file.
impl <T>CursorWithStoreHeader<T> for AppendOnlyCursor<T> {
    fn header(&self) -> &StoreHeader { &self.header }
}
|
|
|
|
// ===CursorWithWriteStoreHeader===
|
|
// Header mutations write through the &mut borrow back into the owning Store.
impl <T>CursorWithWriteStoreHeader<T> for WriteCursor<'_, T> {
    fn header_mut(&mut self) -> &mut StoreHeader { self.header }
    fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position }
}
|
|
|
|
// AppendOnlyCursor mutates its own (intermediate-file) header.
impl <T>CursorWithWriteStoreHeader<T> for AppendOnlyCursor<T> {
    fn header_mut(&mut self) -> &mut StoreHeader { &mut self.header }
    fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position }
}
|
|
|
|
// ===CursorWithAccessToIndex===
|
|
// ReadCursor exposes the store's indexes read-only.
impl <T>CursorWithAccessToIndex<T> for ReadCursor<'_, T> {
    fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] { &self.indexes }
}
|
|
|
|
// WriteCursor exposes read access to its mutably borrowed indexes.
impl <T>CursorWithAccessToIndex<T> for WriteCursor<'_, T> {
    fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] { &self.indexes }
}
|
|
|
|
// ===CursorWithWriteAccessToIndex===
|
|
// Only WriteCursor can mutate indexes; AppendOnlyCursor deliberately cannot.
impl <T>CursorWithWriteAccessToIndex<T> for WriteCursor<'_, T> {
    fn indexes_mut(&mut self) -> &mut [Option<Index<T, FilePosition>>] { self.indexes }
}
|
|
|
|
|
|
impl <'cursor, T> ReadCursor<'cursor, T> {
|
|
pub async fn new<'store: 'cursor>(store: &'store Store<T>) -> Result<Self>
|
|
where T: Send + Sync
|
|
{
|
|
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
|
|
let file: File =
|
|
OpenOptions::new()
|
|
.read(true)
|
|
.open(path_to_rows)
|
|
.await?;
|
|
|
|
let mut cursor = Self {
|
|
header: store.header.clone(),
|
|
file,
|
|
indexes: &store.indexes,
|
|
|
|
eof_file_position: 0,
|
|
};
|
|
let eof_file_position: FilePosition = cursor.seek_to_end().await?;
|
|
cursor.eof_file_position = eof_file_position;
|
|
|
|
cursor.seek_to_start_of_data().await?;
|
|
|
|
Ok(cursor)
|
|
}
|
|
}
|
|
|
|
|
|
|
|
impl <'cursor, T> WriteCursor<'cursor, T>
{
    // 'store lives at least as long as 'cursor
    /// Opens a read+write cursor over the store's rows file, records the EOF
    /// position, and leaves the cursor at the first entry.
    pub async fn new<'store: 'cursor>(store: &'store mut Store<T>) -> Result<Self>
    where T: Send
    {
        let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
        let file: File =
            OpenOptions::new()
            .read(true)
            .write(true)
            .open(path_to_rows)
            .await?;

        let mut cursor = Self {
            header: &mut store.header,
            file,
            indexes: &mut store.indexes,

            eof_file_position: 0,
        };
        let eof_file_position: FilePosition = cursor.seek_to_end().await?;
        cursor.eof_file_position = eof_file_position;

        cursor.seek_to_start_of_data().await?;

        Ok(cursor)
    }

    // ===Entry Header Manipulation===
    // assumes we are at the start of valid entry.
    // Overwrites the entry header at the current position with `entry_header`.
    async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()>
    where T: Send
    {
        let bytes: Vec<u8> = entry_header.encode()?;
        self.write_bytes(&bytes).await?;
        Ok(())
    }

    // ===Deletion===
    /// Marks the entry at `file_position` as deleted: bumps the store's deleted
    /// count, rewrites the entry header with is_deleted = true, and removes the
    /// entry's values from the indexes. No-op if already deleted.
    /// May trigger garbage collection when `enable_garbage_collector` is set.
    // NOTE(review): the `Ord` bound appears twice in this where-clause; harmless.
    pub async fn mark_deleted_at(&mut self, file_position: FilePosition, enable_garbage_collector: bool) -> Result<()>
    where T: Encode + Decode + Ord + Send + Sync + Clone + Ord
    {
        self.seek_to(file_position).await?;
        let mut entry_header = self.read_entry_header().await?;
        if entry_header.is_deleted {
            Ok(())
        } else {
            // Update store and entry headers
            self.increment_deleted_count().await?;
            self.seek_to(file_position).await?;

            entry_header.is_deleted = true;
            self.set_new_entry_header(entry_header.into()).await?;

            // Update index
            self.seek_to(file_position).await?;
            match self.next().await? {
                Some(entry) => {
                    self.delete_entry_values_from_indexes(entry).await?
                },
                None => {
                    // SAFETY: We just modified its header, so it must exist.
                    unreachable!()
                }
            }

            if enable_garbage_collector {
                self.attempt_garbage_collection_if_necessary().await?;
            }
            Ok(())
        }
    }

    // Finds the first live entry (from the current position) whose `column`
    // equals `t0`, marks it deleted, and returns the (pre-deletion) entry.
    async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T, enable_garbage_collector: bool) -> Result<Option<EntryDetailed<T>>>
    where T: Encode + Decode + Ord + Send + Sync + Clone
    {
        let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?;
        if let Some(entry) = maybe_entry {
            self.mark_deleted_at(entry.file_position, enable_garbage_collector).await?;
            Ok(Some(entry))
        } else {
            Ok(maybe_entry)
        }
    }

    // Doesn't update indexes.
    // NOTE(review): the comment above looks inaccurate — this delegates to
    // mark_deleted_at, which does remove values from indexes; confirm intent.
    // Deletes every matching entry (GC suppressed per deletion); returns count.
    async fn find_all_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result<usize>
    where T: Encode + Decode + Ord + Send + Sync + Clone
    {
        let mut count = 0;
        while let Some(_) = self.find_first_eq_bruteforce_and_delete(column, t0, false).await? {
            count += 1;
        }
        Ok(count)
    }

    /// Deletes every entry whose `column` value equals `value`, via the index
    /// when available and a linear scan otherwise. GC is deferred until all
    /// deletions are done, then attempted once if enabled. Returns the number
    /// of entries deleted.
    pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result<usize>
    where T: Encode + Decode + Ord + Send + Sync + Clone
    {
        let count =
            if self.header().is_column_indexed(column) {
                let entries = self.index_lookup(column, value).await?;
                let count = entries.len();
                for entry in entries {
                    // GC disabled per-entry; attempted once at the end instead.
                    self.mark_deleted_at(entry.file_position, false).await?
                }
                count
            } else {
                let count = self.find_all_eq_bruteforce_and_delete(column, value).await?;
                count
            };
        if enable_garbage_collector {
            self.attempt_garbage_collection_if_necessary().await?;
        }
        Ok(count)
    }

    // ===Indexing===
    // WARNING: Assumes the column is NOT indexable.
    /// Creates an empty index for `column`, marks the column indexed in the
    /// persisted header, then scans all live entries to populate the index.
    pub async fn attach_index(&mut self, column: Column) -> Result<()>
    where T: Ord + Decode + Encode + Send + Sync
    {
        // New Index
        let index = Store::create_empty_index_at(&self.header, column).await?;
        self.indexes[column as usize] = Some(index);

        // Mark column as indexed
        self.header.make_column_indexed(column);
        self.set_header(&self.header.clone()).await?;

        // Build index
        self.seek_to_start_of_data().await?;
        while let Some((_, file_position, value)) = self.next_alive_at_column(column).await? {
            self.insert_into_index(column, value, file_position).await?
        }

        Ok(())
    }

    // ===Garbage Collection===
    // Runs GC when the deleted count exceeds GARBAGE_COLLECTION_TRIGGER.
    async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()>
    where T: Send + Sync + Decode + Encode + Clone + Ord
    {
        if self.header.deleted_count > GARBAGE_COLLECTION_TRIGGER {
            println!("=======START GARBAGE COLLECTOR====");
            self.initiate_garbage_collection().await?;
            println!("=======GARBAGE COLLECTOR FINISHED====");
        }
        Ok(())
    }

    /// Compacts the rows file: copies every live entry into an intermediate
    /// file, rebuilds the indexes around the entries' new positions, swaps the
    /// in-memory header/file/EOF state over to the intermediate cursor's, and
    /// finally replaces the rows file on disk with the intermediate file.
    // NOTE(review): the returned counter is incremented once per *live* entry
    // copied, so it reports surviving entries, not entries reclaimed — the
    // name `entries_deleted` is misleading; the only caller ignores the value.
    pub async fn initiate_garbage_collection(&mut self) -> Result<usize>
    where T: Send + Sync + Decode + Encode + Clone + Ord
    {
        let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?;
        // Since garbage collection changes FilePositions of live entries, we need to update the
        // indexes too.

        // One in-memory index per indexed column; None for unindexed columns.
        let mut in_memory_indexes: Vec<Option<BTreeMap<T, HashSet<FilePosition>>>> = Vec::with_capacity(self.header.number_of_columns);
        for column in 0..self.header.number_of_columns {
            if self.header.is_column_indexed(column as Column) {
                let in_memory_index = BTreeMap::new();
                in_memory_indexes.push(Some(in_memory_index))
            } else {
                in_memory_indexes.push(None)
            }
        }

        // We'll dump all alive entries into a new file.
        let mut entries_deleted = 0;
        self.seek_to_start_of_data().await?;
        {
            while let Some(live_entry) = self.next_alive().await? {
                entries_deleted += 1;
                let file_position = cursor_to_intermediate.append_entry_no_indexing(&live_entry.forget()).await?;

                // Update index. (Wouldn't it be nice if we had `for let ...`?)
                for (maybe_in_memory_index, value) in in_memory_indexes.iter_mut().zip(&live_entry.data) {
                    if let Some(in_memory_index) = maybe_in_memory_index {
                        in_memory_index.entry(value.clone()).or_insert_with(HashSet::new).insert(file_position);
                    }
                }
            }
        }

        // ===swap===
        // swapping indexes
        // Update indexes on disk.
        for (column, maybe_in_memory_index) in in_memory_indexes.into_iter().enumerate() {
            if let Some(in_memory_index) = maybe_in_memory_index {
                let index = self.mut_index_at(column as Column);
                index.reset(in_memory_index).await?;
            }
        }

        // swapping headers
        self.header.deleted_count = 0;
        self.header.total_count = cursor_to_intermediate.header.total_count;

        // Adopt the intermediate cursor's file handle and EOF position.
        self.file = cursor_to_intermediate.file;
        self.eof_file_position = cursor_to_intermediate.eof_file_position;

        // swap files on disk
        // current file
        let path_to_table = Path::new(&self.header.table_folder);
        let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
        let path_to_intermediate_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
        fs::remove_file(path_to_rows.clone()).await?;
        fs::rename(path_to_intermediate_rows, path_to_rows).await?;

        Ok(entries_deleted)
    }

    // Creates the GC intermediate rows file with a fresh header (zero counts)
    // and returns an append-only cursor positioned at its end.
    async fn spawn_cursor_to_intermediate_file(&self) -> Result<AppendOnlyCursor<T>>
    where T: Send
    {
        let table_folder = self.header.table_folder.to_string();
        let path_to_table = Path::new(&table_folder);
        let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);

        let intermediate_file: File = Store::<T>::create_empty_rows_file(path_to_rows, &self.header).await?;

        // Fresh header: same schema as the source, but counts reset to zero.
        let intermediate_header: StoreHeader = StoreHeader {
            table_folder,
            number_of_columns: self.header.number_of_columns,
            deleted_count: 0,
            total_count: 0,
            primary_column: self.header.primary_column,
            indexed_columns: self.header.indexed_columns.clone(),
        };

        // Creates a new (append) cursor to the intermediate file in which we'll dump the live entries.
        let mut cursor_to_intermediate = AppendOnlyCursor {
            header: intermediate_header,
            file: intermediate_file,
            data_type: PhantomData::<T>,

            eof_file_position: 0,
        };
        let eof_file_position: FilePosition = cursor_to_intermediate.seek_to_end().await?;
        cursor_to_intermediate.eof_file_position = eof_file_position;

        Ok(cursor_to_intermediate)
    }
}
|
|
|