// minisql/storage_engine/src/cursor.rs
// Last modified: 2024-02-04 15:46:43 +01:00
// (Viewer metadata — 481 lines, 16 KiB, Rust — kept as a comment so the file compiles.)
use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom};
use tokio::fs::{File, OpenOptions};
use std::path::Path;
use std::marker::PhantomData;
use crate::error::{Error, DecodeErrorKind};
use async_trait::async_trait;
use bincode;
use bincode::{Decode, Encode};
use crate::binary_coding::{encode, decode};
use crate::entry::{Entry, EntryDetailed};
use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader};
use crate::store_header::StoreHeader;
use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME};
// ===Concrete Cursors===
/// Read-only cursor over a store's rows file.
///
/// Holds its own file handle and a clone of the store header, so it lives
/// independently of the `Store` it was created from.
pub struct ReadCursor<T> {
    /// Snapshot (clone) of the store header, taken at construction time.
    header: StoreHeader,
    /// Rows file, opened read-only (see `ReadCursor::new`).
    file: File,
    /// Marks the value type this cursor decodes; no `T` is actually stored.
    data_type: PhantomData<T>,
    /// EOF position captured when the cursor was opened.
    eof_file_position: FilePosition,
}
/// Read-write cursor over a store's rows file.
///
/// Mutably borrows the store's header for its whole lifetime so that count
/// updates made while writing are reflected back into the owning `Store`.
pub struct WriteCursor<'a, T> {
    /// Live, mutably borrowed store header (counts are updated through it).
    header: &'a mut StoreHeader,
    /// Rows file, opened read+write.
    file: File,
    /// Marks the value type this cursor encodes/decodes; no `T` is stored.
    data_type: PhantomData<T>,
    /// Cached EOF position; refreshed after appends (see `append_entry`).
    eof_file_position: FilePosition,
}
// ===Traits===
#[async_trait]
// TODO: Make this private
/// Lowest-level cursor operations: raw byte reads and absolute/relative
/// seeking over the backing rows file. Implementors only provide `file()`
/// and `eof_file_position()`; every other method is a derived default.
pub trait PrimitiveCursor<T> {
    /// Mutable handle to the underlying rows file.
    fn file(&mut self) -> &mut File;

    /// Cached EOF position; writers keep it up to date when appending.
    fn eof_file_position(&self) -> FilePosition;

    /// Fills `bytes` completely from the current file position.
    async fn read_bytes(&mut self, bytes: &mut [u8]) -> Result<()> {
        self.file().read_exact(bytes).await?;
        Ok(())
    }

    /// Reads exactly `count` bytes from the current file position.
    async fn get_bytes(&mut self, count: usize) -> Result<Vec<u8>> {
        // BUG FIX: `Vec::with_capacity(count)` creates a vector of LENGTH 0,
        // so `read_exact` (via `read_bytes`) filled nothing and this always
        // returned an empty Vec. The buffer must have `count` initialized
        // elements for `read_exact` to fill.
        let mut result: Vec<u8> = vec![0u8; count];
        self.read_bytes(&mut result).await?;
        Ok(result)
    }

    /// Seeks to an absolute file position; returns the resulting position.
    async fn seek_to(&mut self, file_position: FilePosition) -> Result<FilePosition> {
        let file_position = self.file().seek(SeekFrom::Start(file_position)).await?;
        Ok(file_position)
    }

    // Start of the file i.e. the Header, not the entries.
    async fn seek_to_start(&mut self) -> Result<FilePosition> {
        let file_position = self.file().seek(SeekFrom::Start(0)).await?;
        Ok(file_position)
    }

    /// Seeks to the physical end of the file; returns the EOF position.
    async fn seek_to_end(&mut self) -> Result<FilePosition> {
        let file_position = self.file().seek(SeekFrom::End(0)).await?;
        Ok(file_position)
    }

    /// Seeks just past the store header, i.e. to the first entry.
    async fn seek_to_start_of_data(&mut self) -> Result<FilePosition> {
        self.seek_to(StoreHeader::SIZE as u64).await
    }

    // Seeks from current position by offset and returns new file position
    async fn seek_by(&mut self, offset: i64) -> Result<FilePosition> {
        let file_position = self.file().seek(SeekFrom::Current(offset)).await?;
        Ok(file_position)
    }

    /// Current absolute position of the cursor within the file.
    async fn current_file_position(&mut self) -> Result<FilePosition> {
        let next_file_position: FilePosition = self.file().stream_position().await?;
        Ok(next_file_position)
    }

    /// True when the cursor sits exactly at the cached EOF position.
    async fn is_at_eof(&mut self) -> Result<bool> {
        let current_file_position = self.current_file_position().await?;
        let eof_file_position = self.eof_file_position();
        Ok(current_file_position == eof_file_position)
    }
}
#[async_trait]
/// Entry-level cursor operations layered on `PrimitiveCursor`.
/// Requires the store header, whose `number_of_columns` fixes the size of
/// every entry header on disk.
pub trait CursorWithStoreHeader<T>: PrimitiveCursor<T> {
    /// The store header this cursor operates under.
    fn header(&self) -> &StoreHeader;

    /// Reads the entry header at the current position, leaving the cursor
    /// at the start of that entry's data.
    async fn read_entry_header(&mut self) -> Result<EntryHeaderWithDataSize> {
        let number_of_columns: usize = self.header().number_of_columns;
        let mut header_bytes: Vec<u8> = vec![0; EntryHeaderWithDataSize::size(number_of_columns)];
        self.read_bytes(&mut header_bytes).await?;
        let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..], number_of_columns)?;
        Ok(header)
    }

    /// Seeks to `file_position`, then reads the entry header there.
    async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result<EntryHeaderWithDataSize> {
        self.seek_to(file_position).await?;
        self.read_entry_header().await
    }

    // Returns None when file_position == eof_file_position
    /// Seeks to `file_position` and decodes the whole entry there.
    async fn read_entry_at(&mut self, file_position: FilePosition) -> Result<Option<EntryDetailed<T>>>
    where T: Decode
    {
        self.seek_to(file_position).await?;
        self.next().await
    }

    // ===Iteration===
    // The following functions assume that the current file position is at a valid entry or EOF.
    // WARNING: This moves the file_position to start of the data, so you can't just call
    // next_entry_header() a bunch of times. You must move the cursor!
    /// Reads the next entry header, or `None` if the cursor is at EOF.
    async fn next_entry_header(&mut self) -> Result<Option<EntryHeaderWithDataSize>> {
        if self.is_at_eof().await? {
            return Ok(None)
        }
        let entry_header = self.read_entry_header().await?;
        Ok(Some(entry_header))
    }

    // This is meant to be used after next_entry_header() is called.
    /// Skips over the current entry's data, landing on the next entry.
    async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result<FilePosition>{
        let file_position = self.seek_by(entry_header.size_of_data() as i64).await?;
        Ok(file_position)
    }

    /// Decodes the entire entry (header + all column data) at the current
    /// position and advances the cursor past it. `None` at EOF.
    async fn next(&mut self) -> Result<Option<EntryDetailed<T>>>
    where T: Decode
    {
        // Remember where the entry starts; it is embedded in the decoded entry.
        let file_position = self.current_file_position().await?;
        let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
        let mut data_bytes: Vec<u8> = vec![0; entry_header.size_of_data()];
        self.read_bytes(&mut data_bytes).await?;
        let entry: EntryDetailed<T> =
            EntryDetailed::decode(entry_header, file_position, self.header().number_of_columns, &mut data_bytes)?;
        Ok(Some(entry))
    }

    // Like next, but only reads the column, not the whole entry.
    async fn next_at_column(&mut self, column: Column) -> Result<Option<(EntryHeaderWithDataSize, T)>>
    where T: Decode + Send
    {
        let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
        let file_position_at_start_of_data = self.current_file_position().await?;
        // figuring out how much to decode
        let column_offset = entry_header.offset_of_column(column);
        self.seek_by(column_offset as i64).await?;
        // reading and decoding
        let mut bytes: Vec<u8> = vec![0; entry_header.data_sizes[column as usize]];
        self.read_bytes(&mut bytes).await?;
        let (value, _) =
            decode::<T>(&bytes[..])
                // NOTE(review): `DecodeErrorKind::EntryIsDeleted` looks mismatched
                // for a plain column-decode failure — confirm the intended kind.
                .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
        // jumping to next entry
        self.seek_to(file_position_at_start_of_data).await?;
        self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?;
        Ok(Some((entry_header, value)))
    }

    /// Like `next`, but skips entries whose header marks them deleted.
    async fn next_alive(&mut self) -> Result<Option<EntryDetailed<T>>>
    where T: Decode
    {
        while let Some(entry) = self.next().await? {
            if !entry.header.is_deleted {
                return Ok(Some(entry))
            }
        }
        Ok(None)
    }

    // ===Search===
    /// Linear scan from the current position for the first entry whose
    /// `column` value equals `t0`; returns the fully decoded entry.
    /// Note: deleted entries are NOT skipped here.
    async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result<Option<EntryDetailed<T>>>
    where T: Decode + PartialEq + Send + Sync
    {
        // Tracks the start of the entry that `next_at_column` is about to consume.
        let mut file_position = self.current_file_position().await?;
        while let Some((_, t)) = self.next_at_column(column).await? {
            if &t == t0 {
                // go back and decode the whole entry
                self.seek_to(file_position).await?;
                return self.next().await
            } else {
                file_position = self.current_file_position().await?;
            }
        }
        Ok(None)
    }

    // ===Debugging===
    /// Debug helper: prints every entry (deleted ones included) from the
    /// start of the data section.
    async fn read_entries(&mut self) -> Result<()>
    where T: Decode + std::fmt::Debug
    {
        self.seek_to_start_of_data().await?;
        while let Some(entry) = self.next().await? {
            println!("{:?}", entry);
        }
        println!("END of entries.");
        Ok(())
    }

    /// Debug helper: dumps the whole file (store header included) as raw bytes.
    async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error> {
        let mut bytes: Vec<u8> = vec![];
        self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?;
        self.file().read_to_end(&mut bytes).await?;
        Ok(bytes)
    }
}
// ===PrimitiveCursor===
// Trivial accessors: expose the read cursor's file handle and cached EOF.
impl <T>PrimitiveCursor<T> for ReadCursor<T> {
    fn file(&mut self) -> &mut File {
        &mut self.file
    }
    fn eof_file_position(&self) -> FilePosition {
        self.eof_file_position
    }
}
// Trivial accessors: expose the write cursor's file handle and cached EOF.
impl <T>PrimitiveCursor<T> for WriteCursor<'_, T> {
    fn file(&mut self) -> &mut File {
        &mut self.file
    }
    fn eof_file_position(&self) -> FilePosition {
        self.eof_file_position
    }
}
// ===CursorWithStoreHeader===
impl <T>CursorWithStoreHeader<T> for ReadCursor<T> {
    /// Returns the header snapshot cloned at construction time.
    fn header(&self) -> &StoreHeader {
        &self.header
    }
}
impl <T>CursorWithStoreHeader<T> for WriteCursor<'_, T> {
    /// Returns (a reborrow of) the live, mutably borrowed store header.
    fn header(&self) -> &StoreHeader {
        &self.header
    }
}
impl <T> ReadCursor<T> {
pub async fn new(store: &Store<T>) -> Result<Self>
where T: Send
{
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
let file: File =
OpenOptions::new()
.read(true)
.open(path_to_rows)
.await?;
let mut cursor = Self {
header: store.header.clone(),
file,
data_type: store.data_type,
eof_file_position: 0,
};
let eof_file_position: FilePosition = cursor.seek_to_end().await?;
cursor.eof_file_position = eof_file_position;
cursor.seek_to_start_of_data().await?;
Ok(cursor)
}
pub async fn less_than_eq(&mut self, file_position0: FilePosition, file_position1: FilePosition) -> Result<bool> {
todo!()
}
}
impl <'cursor, T> WriteCursor<'cursor, T> {
// 'store lives atleast as long as 'cursor
pub async fn new<'store: 'cursor>(store: &'store mut Store<T>) -> Result<Self>
where T: Send
{
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
let file: File =
OpenOptions::new()
.read(true)
.write(true)
.open(path_to_rows)
.await?;
let mut cursor = Self {
header: &mut store.header,
file,
data_type: store.data_type,
eof_file_position: 0,
};
let eof_file_position: FilePosition = cursor.seek_to_end().await?;
cursor.eof_file_position = eof_file_position;
cursor.seek_to_start_of_data().await?;
Ok(cursor)
}
pub async fn connect<'header: 'cursor>(path_to_rows: &str, header: &'header mut StoreHeader) -> Result<Self>
where T: Send
{
let file: File =
OpenOptions::new()
.read(true)
.write(true)
.open(path_to_rows)
.await?;
let mut cursor = Self {
header,
file,
data_type: PhantomData::<T>,
eof_file_position: 0,
};
let eof_file_position: FilePosition = cursor.seek_to_end().await?;
cursor.eof_file_position = eof_file_position;
cursor.seek_to_start_of_data().await?;
Ok(cursor)
}
// ===Primitive Operations===
async fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize> {
Ok(self.file.write(bytes).await?)
}
// ===Store Header Manipulation===
async fn increment_total_count(&mut self) -> Result<()>
where T: Send
{
self.seek_to_start().await?;
self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?;
let new_count = self.header.increment_total_count();
self.write_bytes(&encode::<usize>(&new_count)?).await?;
Ok(())
}
async fn increment_deleted_count(&mut self) -> Result<()>
where T: Send
{
self.seek_to_start().await?;
self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?;
let new_count = self.header.increment_deleted_count();
self.write_bytes(&encode::<usize>(&new_count)?).await?;
Ok(())
}
// ===Entry Header Manipulation===
// assumes we are at the start of valid entry.
async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()> {
let bytes: Vec<u8> = entry_header.encode()?;
self.write_bytes(&bytes).await?;
Ok(())
}
// ===Append Entry===
// Moves cursor to the end.
// Returns file position to the start of the new entry.
pub async fn append_entry(&mut self, entry: &Entry<T>) -> Result<FilePosition>
where T: Encode + Send
{
self.increment_total_count().await?;
let encoded_entry: Vec<u8> = entry.encode()?;
let file_position = self.seek_to_end().await?;
self.write_bytes(&encoded_entry).await?;
let eof_file_position: FilePosition = self.current_file_position().await?;
self.eof_file_position = eof_file_position;
Ok(file_position)
}
// ===Deletion===
pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()>
where T: Send
{
self.seek_to(file_position).await?;
let mut entry_header = self.read_entry_header().await?;
if entry_header.is_deleted {
Ok(())
} else {
self.increment_deleted_count().await?;
self.seek_to(file_position).await?;
entry_header.is_deleted = true;
self.set_new_entry_header(entry_header.into()).await?;
self.attempt_garbage_collection_if_necessary().await?;
Ok(())
}
}
async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result<Option<EntryDetailed<T>>>
where T: Decode + PartialEq + Send + Sync
{
let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?;
if let Some(entry) = maybe_entry {
self.mark_deleted_at(entry.file_position).await?;
Ok(Some(entry))
} else {
Ok(maybe_entry)
}
}
async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> {
// TODO: What should be the policy? Counting size of garbage? Counting how many entries are
// garbage?
if self.header.deleted_count > 100 {
todo!()
} else {
Ok(())
}
}
async fn initiate_garbage_collection(&mut self) -> Result<usize>
where T: Send
{
let table_folder = self.header.table_folder.to_string();
let path_to_table = Path::new(&table_folder);
let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
let intermediate_file: File = Store::<T>::create_empty_rows_file(path_to_rows, &self.header).await?;
let mut intermediate_header: StoreHeader = StoreHeader {
table_folder,
number_of_columns: self.header.number_of_columns,
deleted_count: 0,
total_count: 0,
primary_column: self.header.primary_column
};
// Creates a new cursor to the intermediate file in which we'll dump the live entries.
// let mut cursor_to_intermediate = Self {
// header: &mut intermediate_header,
// file: intermediate_file,
// data_type: PhantomData::<T>,
// eof_file_position: 0,
// };
let mut cursor_to_intermediate: Self = todo!();
let eof_file_position: FilePosition = cursor_to_intermediate.seek_to_end().await?;
cursor_to_intermediate.eof_file_position = eof_file_position;
// TODO: intermediate_header does not live long enough, so after garbage collection is
// done, we need to use it in the swap.
cursor_to_intermediate.header = todo!();
// In it there will be only the alive rows.
// Afterwards we swap the files, and delete the garbage.
todo!()
}
}