This commit is contained in:
Yuriy Dupyn 2024-02-05 17:39:38 +01:00
parent 167028a530
commit 84a880f9e6
7 changed files with 133 additions and 77 deletions

View file

@ -0,0 +1,175 @@
use tokio::io::AsyncReadExt;
use async_trait::async_trait;
use bincode;
use bincode::Decode;
use crate::binary_coding::decode;
use crate::error::{Error, DecodeErrorKind};
use crate::segments::entry::EntryDetailed;
use crate::segments::entry_header::EntryHeaderWithDataSize;
use crate::segments::store_header::StoreHeader;
use crate::store::{FilePosition, Column, Result};
use crate::cursor_capabilities::primitive::CursorCanRead;
#[async_trait]
pub trait CursorCanTraverse<T>: CursorCanRead<T> {
fn header(&self) -> &StoreHeader;
async fn seek_to_start_of_data(&mut self) -> Result<FilePosition> {
self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64).await
}
async fn read_entry_header(&mut self) -> Result<EntryHeaderWithDataSize> {
let number_of_columns: usize = self.header().number_of_columns;
let mut header_bytes: Vec<u8> = vec![0; EntryHeaderWithDataSize::size(number_of_columns)];
self.read_bytes(&mut header_bytes).await?;
let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..], number_of_columns)?;
Ok(header)
}
async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result<EntryHeaderWithDataSize> {
self.seek_to(file_position).await?;
self.read_entry_header().await
}
// Returns None when file_position == eof_file_position
async fn read_entry_at(&mut self, file_position: FilePosition) -> Result<Option<EntryDetailed<T>>>
where T: Decode
{
self.seek_to(file_position).await?;
self.next().await
}
// ===Iteration===
// The following functions assume that the current file position is at a valid entry or EOF.
// WARNING: This moves the file_position to start of the data, so you can't just call
// next_entry_header() a bunch of times. You must move the cursor!
async fn next_entry_header(&mut self) -> Result<Option<EntryHeaderWithDataSize>> {
if self.is_at_eof().await? {
return Ok(None)
}
let entry_header = self.read_entry_header().await?;
Ok(Some(entry_header))
}
// This is meant to be used after next_entry_header() is called.
async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result<FilePosition>{
let file_position = self.seek_by(entry_header.size_of_data() as i64).await?;
Ok(file_position)
}
async fn next(&mut self) -> Result<Option<EntryDetailed<T>>>
where T: Decode
{
let file_position = self.current_file_position().await?;
let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
let mut data_bytes: Vec<u8> = vec![0; entry_header.size_of_data()];
self.read_bytes(&mut data_bytes).await?;
let entry: EntryDetailed<T> =
EntryDetailed::decode(entry_header, file_position, self.header().number_of_columns, &mut data_bytes)?;
Ok(Some(entry))
}
// Like next, but only reads the column, not the whole entry.
async fn next_at_column(&mut self, column: Column) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
where T: Decode + Send
{
let file_position = self.current_file_position().await?;
let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
let file_position_at_start_of_data = self.current_file_position().await?;
// figuring out how much to decode
let column_offset = entry_header.offset_of_column(column);
self.seek_by(column_offset as i64).await?;
// reading and decoding
let mut bytes: Vec<u8> = vec![0; entry_header.data_sizes[column as usize]];
self.read_bytes(&mut bytes).await?;
let (value, _) =
decode::<T>(&bytes[..])
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
// jumping to next entry
self.seek_to(file_position_at_start_of_data).await?;
self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?;
Ok(Some((entry_header, file_position, value)))
}
async fn next_alive_at_column(&mut self, column: Column) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
where T: Decode + Send
{
while let Some((header, file_position, t)) = self.next_at_column(column).await? {
if !header.is_deleted {
return Ok(Some((header, file_position, t)))
}
}
Ok(None)
}
async fn next_alive(&mut self) -> Result<Option<EntryDetailed<T>>>
where T: Decode
{
while let Some(entry) = self.next().await? {
if !entry.header.is_deleted {
return Ok(Some(entry))
}
}
Ok(None)
}
// ===Search===
async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result<Option<EntryDetailed<T>>>
where T: Decode + PartialEq + Send + Sync
{
let mut file_position = self.current_file_position().await?;
while let Some((_, _, t)) = self.next_alive_at_column(column).await? {
if &t == t0 {
// go back and decode the whole entry
self.seek_to(file_position).await?;
return self.next().await
} else {
file_position = self.current_file_position().await?;
}
}
Ok(None)
}
async fn find_all_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result<Vec<EntryDetailed<T>>>
where T: Decode + PartialEq + Send + Sync
{
let mut entries = vec![];
while let Some(entry) = self.find_first_eq_bruteforce(column, t0).await? {
entries.push(entry)
}
Ok(entries)
}
// ===Debugging===
async fn read_entries(&mut self) -> Result<()>
where T: Decode + std::fmt::Debug
{
self.seek_to_start_of_data().await?;
while let Some(entry) = self.next().await? {
println!("{:?}", entry);
}
println!("END of entries.");
Ok(())
}
async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error> {
let mut bytes: Vec<u8> = vec![];
self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?;
self.file().read_to_end(&mut bytes).await?;
Ok(bytes)
}
}