//! minisql/storage_engine/src/cursor.rs
//!
//! Concrete cursor implementations over a store's rows file: a read-only
//! cursor, a read/write cursor, and an append-only cursor used during
//! garbage collection.

use tokio::fs::{File, OpenOptions};
use tokio::fs;
use std::path::Path;
use std::marker::PhantomData;
use std::collections::{BTreeMap, HashSet};
use bincode;
use bincode::{Decode, Encode};
use crate::segments::entry::EntryDetailed;
use crate::segments::entry_header::EntryHeader;
use crate::segments::store_header::StoreHeader;
use crate::store::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME};
use crate::index::Index;
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite};
use crate::cursor_capabilities::traversal::CursorCanTraverse;
use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries;
use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex};
const GARBAGE_COLLECTION_TRIGGER: usize = 100;
// ===Concrete Cursors===
/// Read-only cursor over a store's rows file.
///
/// Owns a clone of the store header and a shared borrow of the store's
/// per-column indexes, so many read cursors can coexist.
pub struct ReadCursor<'a, T> {
// Snapshot of the store header taken when the cursor was opened.
header: StoreHeader,
// Shared view of the store's per-column indexes (None = column not indexed).
indexes: &'a [Option<Index<T, FilePosition>>],
file: File,
// Cached end-of-file position, set once in `new` via `seek_to_end`.
eof_file_position: FilePosition,
}
/// Read/write cursor over a store's rows file.
///
/// Borrows the store's header and indexes mutably, so only one write cursor
/// can exist at a time.
pub struct WriteCursor<'a, T> {
// Exclusive borrow of the store header; mutated on delete/GC.
header: &'a mut StoreHeader,
// Exclusive borrow of the per-column indexes (None = column not indexed).
indexes: &'a mut [Option<Index<T, FilePosition>>],
file: File,
// Cached end-of-file position, maintained as entries are appended.
eof_file_position: FilePosition,
}
// This is used as a cursor to temporary file during Garbage Collection
pub struct AppendOnlyCursor<T> {
// Fresh header describing the intermediate file (counts start at zero).
header: StoreHeader,
file: File,
// Ties the cursor to the store's value type without storing one.
data_type: PhantomData<T>,
// Cached end-of-file position; appends happen at this offset.
eof_file_position: FilePosition,
}
// ===========Implementations=============
// ===primitive capabilities===
// All three cursor types expose their file handle and cached EOF position
// to the primitive read capability.
impl <T>CursorCanRead<T> for ReadCursor<'_, T> {
fn file(&mut self) -> &mut File {
&mut self.file
}
fn eof_file_position(&self) -> FilePosition {
self.eof_file_position
}
}
impl <T>CursorCanRead<T> for WriteCursor<'_, T> {
fn file(&mut self) -> &mut File {
&mut self.file
}
fn eof_file_position(&self) -> FilePosition {
self.eof_file_position
}
}
impl <T>CursorCanRead<T> for AppendOnlyCursor<T> {
fn file(&mut self) -> &mut File {
&mut self.file
}
fn eof_file_position(&self) -> FilePosition {
self.eof_file_position
}
}
// Marker impls: only the write-capable cursors get the write capability;
// ReadCursor deliberately does not implement it.
impl <T>CursorCanWrite<T> for WriteCursor<'_, T> {}
impl <T>CursorCanWrite<T> for AppendOnlyCursor<T> {}
// ===capability to access header===
impl <T>CursorCanTraverse<T> for ReadCursor<'_, T> {
/// Shared view of the header snapshot used for traversal bookkeeping.
fn header(&self) -> &StoreHeader { &self.header }
}
impl <T>CursorCanTraverse<T> for WriteCursor<'_, T> {
// `self.header` is `&mut StoreHeader`; `&self.header` is `&&mut _`, which
// deref-coerces to `&StoreHeader`.
fn header(&self) -> &StoreHeader { &self.header }
}
impl <T>CursorCanTraverse<T> for AppendOnlyCursor<T> {
fn header(&self) -> &StoreHeader { &self.header }
}
impl <T>CursorCanModifyEntries<T> for WriteCursor<'_, T> {
// `self.header` is already `&mut StoreHeader`; the compiler reborrows it.
fn header_mut(&mut self) -> &mut StoreHeader { self.header }
/// Records the new end-of-file position after an entry write.
fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position }
}
impl <T>CursorCanModifyEntries<T> for AppendOnlyCursor<T> {
fn header_mut(&mut self) -> &mut StoreHeader { &mut self.header }
/// Records the new end-of-file position after an entry write.
fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position }
}
// ===capability to access index===
impl<T> CursorCanReadIndex<T> for ReadCursor<'_, T> {
    /// Shared view of the store's per-column indexes.
    fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] {
        // `self.indexes` is already a shared slice reference (`&[_]` is
        // `Copy`), so return it directly instead of producing `&&[_]` and
        // relying on deref coercion.
        self.indexes
    }
}
impl<T> CursorCanReadIndex<T> for WriteCursor<'_, T> {
    /// Shared view of the store's per-column indexes.
    fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] {
        // Explicit shared reborrow of the `&mut` slice, instead of the
        // former `&&mut [_]` double deref coercion.
        &*self.indexes
    }
}
impl<T> CursorCanWriteToIndex<T> for WriteCursor<'_, T> {
    /// Mutable access to the per-column indexes for insert/delete updates.
    fn indexes_mut(&mut self) -> &mut [Option<Index<T, FilePosition>>] {
        self.indexes
    }
}
// ===Specifics===
impl<'cursor, T> ReadCursor<'cursor, T> {
    /// Opens a read-only cursor over the store's rows file.
    ///
    /// The store must outlive the cursor (`'store: 'cursor`) because the
    /// cursor borrows the store's indexes. On success the cursor is
    /// positioned at the start of the data region.
    pub async fn new<'store: 'cursor>(store: &'store Store<T>) -> Result<Self>
    where
        T: Send + Sync,
    {
        let rows_path = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
        let rows_file: File = OpenOptions::new().read(true).open(rows_path).await?;
        let mut reader = Self {
            header: store.header.clone(),
            indexes: &store.indexes,
            file: rows_file,
            eof_file_position: 0,
        };
        // Learn where the data ends, then rewind to where it begins.
        reader.eof_file_position = reader.seek_to_end().await?;
        reader.seek_to_start_of_data().await?;
        Ok(reader)
    }
}
impl<'cursor, T> WriteCursor<'cursor, T> {
    /// Opens a read/write cursor over the store's rows file.
    ///
    /// The store must outlive the cursor (`'store: 'cursor`) because the
    /// cursor borrows the store's header and indexes mutably. On success the
    /// cursor is positioned at the start of the data region.
    pub async fn new<'store: 'cursor>(store: &'store mut Store<T>) -> Result<Self>
    where
        T: Send,
    {
        let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
        let file: File = OpenOptions::new()
            .read(true)
            .write(true)
            .open(path_to_rows)
            .await?;
        let mut cursor = Self {
            header: &mut store.header,
            file,
            indexes: &mut store.indexes,
            eof_file_position: 0,
        };
        // Learn where the data ends, then rewind to where it begins.
        let eof_file_position: FilePosition = cursor.seek_to_end().await?;
        cursor.eof_file_position = eof_file_position;
        cursor.seek_to_start_of_data().await?;
        Ok(cursor)
    }
    // ===Entry Header Manipulation===
    /// Overwrites the entry header at the current file position.
    ///
    /// Assumes the cursor is positioned at the start of a valid entry.
    async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()>
    where
        T: Send,
    {
        let bytes: Vec<u8> = entry_header.encode()?;
        self.write_bytes(&bytes).await?;
        Ok(())
    }
    // ===Deletion===
    /// Tombstones the entry at `file_position`: flips its `is_deleted` flag,
    /// bumps the store's deleted count, and removes the entry's values from
    /// every index. A no-op if the entry is already deleted. Optionally
    /// triggers garbage collection afterwards.
    pub async fn mark_deleted_at(&mut self, file_position: FilePosition, enable_garbage_collector: bool) -> Result<()>
    where
        // NOTE: the original where clause listed `Ord` twice; the duplicate
        // bound has been removed.
        T: Encode + Decode + Ord + Send + Sync + Clone,
    {
        self.seek_to(file_position).await?;
        let mut entry_header = self.read_entry_header().await?;
        if entry_header.is_deleted {
            // Already tombstoned: nothing to do.
            Ok(())
        } else {
            // Update store and entry headers.
            self.increment_deleted_count().await?;
            self.seek_to(file_position).await?;
            entry_header.is_deleted = true;
            self.set_new_entry_header(entry_header.into()).await?;
            // Update index: re-read the (now tombstoned) entry to recover its
            // column values and drop them from every index.
            self.seek_to(file_position).await?;
            match self.next().await? {
                Some(entry) => {
                    self.delete_entry_values_from_indexes(entry).await?
                },
                None => {
                    // SAFETY: We just modified its header, so it must exist.
                    unreachable!()
                }
            }
            if enable_garbage_collector {
                self.attempt_garbage_collection_if_necessary().await?;
            }
            Ok(())
        }
    }
    /// Brute-force scans for the first live entry whose `column` equals `t0`
    /// and tombstones it. Returns the deleted entry, if one was found.
    async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T, enable_garbage_collector: bool) -> Result<Option<EntryDetailed<T>>>
    where
        T: Encode + Decode + Ord + Send + Sync + Clone,
    {
        let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?;
        if let Some(entry) = maybe_entry {
            self.mark_deleted_at(entry.file_position, enable_garbage_collector).await?;
            Ok(Some(entry))
        } else {
            Ok(maybe_entry)
        }
    }
    /// Tombstones every entry whose `column` equals `t0` via brute-force
    /// scan, returning how many were deleted. Garbage collection is deferred
    /// to the caller.
    ///
    /// NOTE(review): the old comment here said "Doesn't update indexes", but
    /// each `mark_deleted_at` call *does* remove the entry's values from the
    /// indexes — confirm which behavior was intended.
    async fn find_all_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result<usize>
    where
        T: Encode + Decode + Ord + Send + Sync + Clone,
    {
        let mut count = 0;
        while let Some(_) = self.find_first_eq_bruteforce_and_delete(column, t0, false).await? {
            count += 1;
        }
        Ok(count)
    }
    /// Tombstones every live entry in the store, returning how many were
    /// deleted. Garbage collection is attempted once at the end (not per
    /// entry) when enabled.
    pub async fn delete_all_entries(&mut self, enable_garbage_collector: bool) -> Result<usize>
    where
        T: Encode + Decode + Ord + Send + Sync + Clone,
    {
        let mut count = 0;
        while let Some(entry) = self.next_alive().await? {
            count += 1;
            self.mark_deleted_at(entry.file_position, false).await?
        }
        if enable_garbage_collector {
            self.attempt_garbage_collection_if_necessary().await?;
        }
        Ok(count)
    }
    /// Deletes all entries whose `column` equals `value`, using the column's
    /// index when one exists and falling back to a brute-force scan
    /// otherwise. Returns the number of entries deleted.
    pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result<usize>
    where
        T: Encode + Decode + Ord + Send + Sync + Clone,
    {
        let count = if self.header().is_column_indexed(column) {
            let entries = self.index_lookup(column, value).await?;
            let count = entries.len();
            for entry in entries {
                self.mark_deleted_at(entry.file_position, false).await?
            }
            count
        } else {
            self.find_all_eq_bruteforce_and_delete(column, value).await?
        };
        if enable_garbage_collector {
            self.attempt_garbage_collection_if_necessary().await?;
        }
        Ok(count)
    }
    // ===Indexing===
    /// Creates and populates an index for `column`.
    ///
    /// WARNING: Assumes the column is NOT already indexed.
    pub async fn attach_index(&mut self, column: Column) -> Result<()>
    where
        T: Ord + Decode + Encode + Send + Sync,
    {
        // New Index
        let index = Store::create_empty_index_at(&self.header, column).await?;
        self.indexes[column as usize] = Some(index);
        // Mark column as indexed and persist the header. The clone avoids
        // borrowing `self.header` across the `set_header` call on `self`.
        self.header.make_column_indexed(column);
        self.set_header(&self.header.clone()).await?;
        // Build index by scanning every live entry's value for this column.
        self.seek_to_start_of_data().await?;
        while let Some((_, file_position, value)) = self.next_alive_at_column(column).await? {
            self.insert_into_index(column, value, file_position).await?
        }
        Ok(())
    }
    // ===Garbage Collection===
    /// Runs garbage collection iff the deleted-entry count has passed
    /// `GARBAGE_COLLECTION_TRIGGER`.
    async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()>
    where
        T: Send + Sync + Decode + Encode + Clone + Ord,
    {
        if self.header.deleted_count > GARBAGE_COLLECTION_TRIGGER {
            println!("=======START GARBAGE COLLECTOR====");
            self.initiate_garbage_collection().await?;
            println!("=======GARBAGE COLLECTOR FINISHED====");
        }
        Ok(())
    }
    /// Compacts the rows file by copying every live entry into a fresh
    /// intermediate file, rebuilding the indexes (live entries get new
    /// `FilePosition`s), and swapping the intermediate file in.
    ///
    /// Returns the number of tombstoned entries that were physically removed.
    /// (BUGFIX: this previously counted the *live* entries copied over, not
    /// the deleted ones.)
    pub async fn initiate_garbage_collection(&mut self) -> Result<usize>
    where
        T: Send + Sync + Decode + Encode + Clone + Ord,
    {
        let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?;
        // Since garbage collection changes FilePositions of live entries, we need to update the
        // indexes too.
        let mut in_memory_indexes: Vec<Option<BTreeMap<T, HashSet<FilePosition>>>> = Vec::with_capacity(self.header.number_of_columns);
        for column in 0..self.header.number_of_columns {
            if self.header.is_column_indexed(column as Column) {
                in_memory_indexes.push(Some(BTreeMap::new()))
            } else {
                in_memory_indexes.push(None)
            }
        }
        // Everything we drop is exactly what was previously marked deleted.
        let entries_deleted = self.header.deleted_count;
        // We'll dump all alive entries into a new file.
        self.seek_to_start_of_data().await?;
        while let Some(live_entry) = self.next_alive().await? {
            let file_position = cursor_to_intermediate.append_entry_no_indexing(&live_entry.forget()).await?;
            // Record the entry's new position under each indexed column value.
            for (maybe_in_memory_index, value) in in_memory_indexes.iter_mut().zip(&live_entry.data) {
                if let Some(in_memory_index) = maybe_in_memory_index {
                    in_memory_index.entry(value.clone()).or_default().insert(file_position);
                }
            }
        }
        // ===swap===
        // Update indexes on disk from the rebuilt in-memory maps.
        for (column, maybe_in_memory_index) in in_memory_indexes.into_iter().enumerate() {
            if let Some(in_memory_index) = maybe_in_memory_index {
                let index = self.mut_index_at(column as Column);
                index.reset(in_memory_index).await?;
            }
        }
        // Swap headers and file handles: the cursor now points at the
        // compacted data.
        self.header.deleted_count = 0;
        self.header.total_count = cursor_to_intermediate.header.total_count;
        self.file = cursor_to_intermediate.file;
        self.eof_file_position = cursor_to_intermediate.eof_file_position;
        // Swap files on disk: drop the old rows file and move the
        // intermediate file into its place.
        let path_to_table = Path::new(&self.header.table_folder);
        let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
        let path_to_intermediate_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
        fs::remove_file(&path_to_rows).await?;
        fs::rename(path_to_intermediate_rows, path_to_rows).await?;
        Ok(entries_deleted)
    }
    /// Creates the (empty) intermediate rows file used by garbage collection
    /// and returns an append-only cursor over it, with a fresh header whose
    /// counts start at zero.
    async fn spawn_cursor_to_intermediate_file(&self) -> Result<AppendOnlyCursor<T>>
    where
        T: Send,
    {
        let table_folder = self.header.table_folder.clone();
        let path_to_table = Path::new(&table_folder);
        let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
        let intermediate_file: File = Store::<T>::create_empty_rows_file(path_to_rows, &self.header).await?;
        let intermediate_header: StoreHeader = StoreHeader {
            table_folder,
            number_of_columns: self.header.number_of_columns,
            deleted_count: 0,
            total_count: 0,
            primary_column: self.header.primary_column,
            indexed_columns: self.header.indexed_columns.clone(),
        };
        // Creates a new (append) cursor to the intermediate file in which we'll dump the live entries.
        let mut cursor_to_intermediate = AppendOnlyCursor {
            header: intermediate_header,
            file: intermediate_file,
            data_type: PhantomData::<T>,
            eof_file_position: 0,
        };
        let eof_file_position: FilePosition = cursor_to_intermediate.seek_to_end().await?;
        cursor_to_intermediate.eof_file_position = eof_file_position;
        Ok(cursor_to_intermediate)
    }
}