Split cursor capabilities
This commit is contained in:
parent
f18fd3a796
commit
edfecfa8d6
7 changed files with 438 additions and 411 deletions
230
storage_engine/src/cursor_capabilities/header_access.rs
Normal file
230
storage_engine/src/cursor_capabilities/header_access.rs
Normal file
|
|
@ -0,0 +1,230 @@
|
|||
use tokio::io::AsyncReadExt;
|
||||
use async_trait::async_trait;
|
||||
|
||||
use bincode;
|
||||
use bincode::{Decode, Encode};
|
||||
use crate::binary_coding::{encode, decode};
|
||||
|
||||
use crate::error::{Error, DecodeErrorKind};
|
||||
use crate::segments::entry::{Entry, EntryDetailed};
|
||||
use crate::segments::entry_header::EntryHeaderWithDataSize;
|
||||
use crate::segments::store_header::StoreHeader;
|
||||
use crate::storage_engine::{FilePosition, Column, Result};
|
||||
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite};
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorCanReadHeader<T>: CursorCanRead<T> {
|
||||
fn header(&self) -> &StoreHeader;
|
||||
|
||||
async fn seek_to_start_of_data(&mut self) -> Result<FilePosition> {
|
||||
self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64).await
|
||||
}
|
||||
|
||||
async fn read_entry_header(&mut self) -> Result<EntryHeaderWithDataSize> {
|
||||
let number_of_columns: usize = self.header().number_of_columns;
|
||||
let mut header_bytes: Vec<u8> = vec![0; EntryHeaderWithDataSize::size(number_of_columns)];
|
||||
self.read_bytes(&mut header_bytes).await?;
|
||||
let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..], number_of_columns)?;
|
||||
|
||||
Ok(header)
|
||||
}
|
||||
|
||||
async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result<EntryHeaderWithDataSize> {
|
||||
self.seek_to(file_position).await?;
|
||||
self.read_entry_header().await
|
||||
}
|
||||
|
||||
// Returns None when file_position == eof_file_position
|
||||
async fn read_entry_at(&mut self, file_position: FilePosition) -> Result<Option<EntryDetailed<T>>>
|
||||
where T: Decode
|
||||
{
|
||||
self.seek_to(file_position).await?;
|
||||
self.next().await
|
||||
}
|
||||
|
||||
// ===Iteration===
|
||||
// The following functions assume that the current file position is at a valid entry or EOF.
|
||||
|
||||
|
||||
// WARNING: This moves the file_position to start of the data, so you can't just call
|
||||
// next_entry_header() a bunch of times. You must move the cursor!
|
||||
async fn next_entry_header(&mut self) -> Result<Option<EntryHeaderWithDataSize>> {
|
||||
if self.is_at_eof().await? {
|
||||
return Ok(None)
|
||||
}
|
||||
|
||||
let entry_header = self.read_entry_header().await?;
|
||||
|
||||
Ok(Some(entry_header))
|
||||
}
|
||||
|
||||
// This is meant to be used after next_entry_header() is called.
|
||||
async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result<FilePosition>{
|
||||
let file_position = self.seek_by(entry_header.size_of_data() as i64).await?;
|
||||
Ok(file_position)
|
||||
}
|
||||
|
||||
async fn next(&mut self) -> Result<Option<EntryDetailed<T>>>
|
||||
where T: Decode
|
||||
{
|
||||
let file_position = self.current_file_position().await?;
|
||||
let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
|
||||
|
||||
let mut data_bytes: Vec<u8> = vec![0; entry_header.size_of_data()];
|
||||
self.read_bytes(&mut data_bytes).await?;
|
||||
let entry: EntryDetailed<T> =
|
||||
EntryDetailed::decode(entry_header, file_position, self.header().number_of_columns, &mut data_bytes)?;
|
||||
|
||||
Ok(Some(entry))
|
||||
}
|
||||
|
||||
// Like next, but only reads the column, not the whole entry.
|
||||
async fn next_at_column(&mut self, column: Column) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
|
||||
where T: Decode + Send
|
||||
{
|
||||
let file_position = self.current_file_position().await?;
|
||||
let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
|
||||
let file_position_at_start_of_data = self.current_file_position().await?;
|
||||
|
||||
// figuring out how much to decode
|
||||
let column_offset = entry_header.offset_of_column(column);
|
||||
self.seek_by(column_offset as i64).await?;
|
||||
|
||||
// reading and decoding
|
||||
let mut bytes: Vec<u8> = vec![0; entry_header.data_sizes[column as usize]];
|
||||
self.read_bytes(&mut bytes).await?;
|
||||
let (value, _) =
|
||||
decode::<T>(&bytes[..])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
|
||||
|
||||
// jumping to next entry
|
||||
self.seek_to(file_position_at_start_of_data).await?;
|
||||
self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?;
|
||||
|
||||
Ok(Some((entry_header, file_position, value)))
|
||||
}
|
||||
|
||||
async fn next_alive_at_column(&mut self, column: Column) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
|
||||
where T: Decode + Send
|
||||
{
|
||||
while let Some((header, file_position, t)) = self.next_at_column(column).await? {
|
||||
if !header.is_deleted {
|
||||
return Ok(Some((header, file_position, t)))
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn next_alive(&mut self) -> Result<Option<EntryDetailed<T>>>
|
||||
where T: Decode
|
||||
{
|
||||
while let Some(entry) = self.next().await? {
|
||||
if !entry.header.is_deleted {
|
||||
return Ok(Some(entry))
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
// ===Search===
|
||||
async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result<Option<EntryDetailed<T>>>
|
||||
where T: Decode + PartialEq + Send + Sync
|
||||
{
|
||||
let mut file_position = self.current_file_position().await?;
|
||||
while let Some((_, _, t)) = self.next_alive_at_column(column).await? {
|
||||
if &t == t0 {
|
||||
// go back and decode the whole entry
|
||||
self.seek_to(file_position).await?;
|
||||
return self.next().await
|
||||
} else {
|
||||
file_position = self.current_file_position().await?;
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn find_all_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result<Vec<EntryDetailed<T>>>
|
||||
where T: Decode + PartialEq + Send + Sync
|
||||
{
|
||||
let mut entries = vec![];
|
||||
while let Some(entry) = self.find_first_eq_bruteforce(column, t0).await? {
|
||||
entries.push(entry)
|
||||
}
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
// ===Debugging===
|
||||
async fn read_entries(&mut self) -> Result<()>
|
||||
where T: Decode + std::fmt::Debug
|
||||
{
|
||||
self.seek_to_start_of_data().await?;
|
||||
while let Some(entry) = self.next().await? {
|
||||
println!("{:?}", entry);
|
||||
}
|
||||
println!("END of entries.");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error> {
|
||||
let mut bytes: Vec<u8> = vec![];
|
||||
self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?;
|
||||
self.file().read_to_end(&mut bytes).await?;
|
||||
Ok(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorCanWriteHeader<T>: CursorCanReadHeader<T> + CursorCanWrite<T> {
|
||||
fn header_mut(&mut self) -> &mut StoreHeader;
|
||||
fn set_eof_file_position(&mut self, new_file_position: FilePosition);
|
||||
|
||||
// ===Store Header Manipulation===
|
||||
async fn increment_total_count(&mut self) -> Result<()>
|
||||
where T: Send
|
||||
{
|
||||
self.seek_to_start().await?;
|
||||
self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?;
|
||||
let new_count = self.header_mut().increment_total_count();
|
||||
self.write_bytes(&encode::<usize>(&new_count)?).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn increment_deleted_count(&mut self) -> Result<()>
|
||||
where T: Send
|
||||
{
|
||||
self.seek_to_start().await?;
|
||||
self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?;
|
||||
let new_count = self.header_mut().increment_deleted_count();
|
||||
self.write_bytes(&encode::<usize>(&new_count)?).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn set_header(&mut self, header: &StoreHeader) -> Result<()>
|
||||
where T: Send
|
||||
{
|
||||
self.seek_to_start().await?;
|
||||
let encoded_header: Vec<u8> = header.encode()?;
|
||||
self.write_bytes(&encoded_header).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ===Append Entry===
|
||||
|
||||
// Moves cursor to the end.
|
||||
// Returns file position to the start of the new entry.
|
||||
async fn append_entry_no_indexing(&mut self, entry: &Entry<T>) -> Result<FilePosition>
|
||||
where T: Encode + Send + Sync
|
||||
{
|
||||
self.increment_total_count().await?;
|
||||
|
||||
let encoded_entry: Vec<u8> = entry.encode()?;
|
||||
let file_position = self.seek_to_end().await?;
|
||||
self.write_bytes(&encoded_entry).await?;
|
||||
|
||||
let eof_file_position: FilePosition = self.current_file_position().await?;
|
||||
self.set_eof_file_position(eof_file_position);
|
||||
|
||||
Ok(file_position)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue