Add brute-force search

This commit is contained in:
Yuriy Dupyn 2024-02-03 22:47:29 +01:00
parent a345bf99c6
commit ff378b1dd8
4 changed files with 135 additions and 31 deletions

View file

@ -2,17 +2,18 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom};
use tokio::fs::{File, OpenOptions}; use tokio::fs::{File, OpenOptions};
use std::path::Path; use std::path::Path;
use std::marker::PhantomData; use std::marker::PhantomData;
use crate::error::{Error, DecodeErrorKind};
use async_trait::async_trait; use async_trait::async_trait;
use bincode; use bincode;
use bincode::{Decode, Encode}; use bincode::{Decode, Encode};
use crate::binary_coding::encode; use crate::binary_coding::{encode, decode};
use crate::entry::{Entry, EntryDetailed}; use crate::entry::{Entry, EntryDetailed};
use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader}; use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader};
use crate::store_header::StoreHeader; use crate::store_header::StoreHeader;
use crate::storage_engine::{Store, FilePosition, Result, ROWS_FILE_NAME}; use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME};
#[async_trait] #[async_trait]
// TODO: Make this private // TODO: Make this private
@ -51,6 +52,12 @@ pub trait PrimitiveCursor<T> {
self.seek_to(StoreHeader::SIZE as u64).await self.seek_to(StoreHeader::SIZE as u64).await
} }
// Seeks from current position by offset and returns new file position
async fn seek_by(&mut self, offset: i64) -> Result<FilePosition> {
let file_position = self.file().seek(SeekFrom::Current(offset)).await?;
Ok(file_position)
}
async fn current_file_position(&mut self) -> Result<FilePosition> { async fn current_file_position(&mut self) -> Result<FilePosition> {
let next_file_position: FilePosition = self.file().stream_position().await?; let next_file_position: FilePosition = self.file().stream_position().await?;
Ok(next_file_position) Ok(next_file_position)
@ -59,7 +66,6 @@ pub trait PrimitiveCursor<T> {
async fn is_at_eof(&mut self) -> Result<bool> { async fn is_at_eof(&mut self) -> Result<bool> {
let current_file_position = self.current_file_position().await?; let current_file_position = self.current_file_position().await?;
let eof_file_position = self.eof_file_position(); let eof_file_position = self.eof_file_position();
println!("IN EOF: current={}, eof_file_position={}", current_file_position, eof_file_position);
Ok(current_file_position == eof_file_position) Ok(current_file_position == eof_file_position)
} }
} }
@ -91,18 +97,31 @@ pub trait CursorWithStoreHeader<T>: PrimitiveCursor<T> {
} }
// ===Iteration=== // ===Iteration===
// Assumes that the current file position is at a valid entry or EOF. // The following functions assume that the current file position is at a valid entry or EOF.
// WARNING: This moves the file_position to start of the data, so you can't just call
// next_entry_header() a bunch of times. You must move the cursor!
async fn next_entry_header(&mut self) -> Result<Option<EntryHeaderWithDataSize>> {
if self.is_at_eof().await? {
return Ok(None)
}
let entry_header = self.read_entry_header().await?;
Ok(Some(entry_header))
}
// This is meant to be used after next_entry_header() is called.
async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result<FilePosition>{
let file_position = self.seek_by(entry_header.size_of_data() as i64).await?;
Ok(file_position)
}
async fn next(&mut self) -> Result<Option<EntryDetailed<T>>> async fn next(&mut self) -> Result<Option<EntryDetailed<T>>>
where T: Decode where T: Decode
{ {
println!("are we at eof?"); let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
if self.is_at_eof().await? {
println!("YES");
return Ok(None)
}
println!("NO");
let entry_header = self.read_entry_header().await?;
let mut data_bytes: Vec<u8> = vec![0; entry_header.size_of_data()]; let mut data_bytes: Vec<u8> = vec![0; entry_header.size_of_data()];
self.read_bytes(&mut data_bytes).await?; self.read_bytes(&mut data_bytes).await?;
@ -112,6 +131,59 @@ pub trait CursorWithStoreHeader<T>: PrimitiveCursor<T> {
Ok(Some(entry)) Ok(Some(entry))
} }
// Like next, but only reads the column, not the whole entry.
async fn next_at_column(&mut self, column: Column) -> Result<Option<(EntryHeaderWithDataSize, T)>>
where T: Decode + Send
{
let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
let file_position_at_start_of_data = self.current_file_position().await?;
// figuring out how much to decode
let column_offset = entry_header.offset_of_column(column);
self.seek_by(column_offset as i64).await?;
// reading and decoding
let mut bytes: Vec<u8> = vec![0; entry_header.data_sizes[column as usize]];
self.read_bytes(&mut bytes).await?;
let (value, _) =
decode::<T>(&bytes[..])
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
// jumping to next entry
self.seek_to(file_position_at_start_of_data).await?;
self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?;
Ok(Some((entry_header, value)))
}
async fn next_alive(&mut self) -> Result<Option<EntryDetailed<T>>>
where T: Decode
{
while let Some(entry) = self.next().await? {
if !entry.header.is_deleted {
return Ok(Some(entry))
}
}
Ok(None)
}
// ===Search===
async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result<Option<EntryDetailed<T>>>
where T: Decode + PartialEq + Send + Sync
{
let mut file_position = self.current_file_position().await?;
while let Some((_, t)) = self.next_at_column(column).await? {
if &t == t0 {
// go back and decode the whole entry
self.seek_to(file_position).await?;
return self.next().await
} else {
file_position = self.current_file_position().await?;
}
}
Ok(None)
}
// ===Debugging=== // ===Debugging===
async fn read_entries(&mut self) -> Result<()> async fn read_entries(&mut self) -> Result<()>
where T: Decode + std::fmt::Debug where T: Decode + std::fmt::Debug
@ -301,14 +373,10 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
where T: Send where T: Send
{ {
self.seek_to(file_position).await?; self.seek_to(file_position).await?;
println!("Attempting to read the entry");
let mut entry_header = self.read_entry_header().await?; let mut entry_header = self.read_entry_header().await?;
println!("Entry Header == {:?}", entry_header);
if entry_header.is_deleted { if entry_header.is_deleted {
println!("Already deleted");
Ok(()) Ok(())
} else { } else {
println!("Marking as deleted");
self.increment_deleted_count().await?; self.increment_deleted_count().await?;
self.seek_to(file_position).await?; self.seek_to(file_position).await?;

View file

@ -13,8 +13,8 @@ pub struct Entry<T> {
#[derive(Debug)] #[derive(Debug)]
pub struct EntryDetailed<T> { pub struct EntryDetailed<T> {
header: EntryHeaderWithDataSize, pub header: EntryHeaderWithDataSize,
data: Vec<T>, pub data: Vec<T>,
} }
impl <T>Entry<T> { impl <T>Entry<T> {

View file

@ -1,5 +1,5 @@
use crate::binary_coding::{decode, encode, decode_sequence}; use crate::binary_coding::{decode, encode, decode_sequence};
use crate::storage_engine::Result; use crate::storage_engine::{Result, Column};
use crate::error::{Error, DecodeErrorKind}; use crate::error::{Error, DecodeErrorKind};
use std::mem::size_of; use std::mem::size_of;
@ -42,6 +42,18 @@ impl EntryHeaderWithDataSize {
self.data_sizes.iter().sum() self.data_sizes.iter().sum()
} }
pub fn offset_of_column(&self, column: Column) -> usize {
let mut sum = 0;
for (i, size) in self.data_sizes.iter().enumerate() {
if i < column as usize {
sum += size;
} else {
break
}
}
sum
}
pub fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result<Self> { pub fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result<Self> {
let (is_deleted, _) = let (is_deleted, _) =
decode::<bool>(&bytes) decode::<bool>(&bytes)

View file

@ -108,20 +108,44 @@ async fn main() -> Result<()> {
// println!("{:?}", store); // println!("{:?}", store);
// println!("{:?}", store.read_all_bytes().await?); // println!("{:?}", store.read_all_bytes().await?);
{
let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
// println!("{:?}", x);
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
// println!("{:?}", x);
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
// println!("{:?}", x);
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
// println!("{:?}", x);
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
// println!("{:?}", x);
cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?; cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?;
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
// println!("{:?}", x);
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
// println!("{:?}", x);
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
// println!("{:?}", x);
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
// println!("{:?}", x);
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
// println!("{:?}", x);
}
{
let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
let column = 2;
let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?;
println!("{:?}", x);
let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?;
println!("{:?}", x);
let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?;
println!("{:?}", x);
let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?;
println!("{:?}", x);
let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?;
println!("{:?}", x);
}
{
let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
let column = 3;
let t0 = 6;
let x = cursor.find_first_eq_bruteforce(column, &t0).await.map_err(|e| e.to_io_or_panic())?;
println!("{:?}", x);
}