From ff378b1dd83bd06a4168c97685e7d95c3f4b3033 Mon Sep 17 00:00:00 2001 From: Yuriy Dupyn <2153100+omedusyo@users.noreply.github.com> Date: Sat, 3 Feb 2024 22:47:29 +0100 Subject: [PATCH] Add brute-force search --- storage_engine/src/cursor.rs | 100 ++++++++++++++++++++++++----- storage_engine/src/entry.rs | 4 +- storage_engine/src/entry_header.rs | 14 +++- storage_engine/src/main.rs | 48 ++++++++++---- 4 files changed, 135 insertions(+), 31 deletions(-) diff --git a/storage_engine/src/cursor.rs b/storage_engine/src/cursor.rs index 763db68..42584b7 100644 --- a/storage_engine/src/cursor.rs +++ b/storage_engine/src/cursor.rs @@ -2,17 +2,18 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom}; use tokio::fs::{File, OpenOptions}; use std::path::Path; use std::marker::PhantomData; +use crate::error::{Error, DecodeErrorKind}; use async_trait::async_trait; use bincode; use bincode::{Decode, Encode}; -use crate::binary_coding::encode; +use crate::binary_coding::{encode, decode}; use crate::entry::{Entry, EntryDetailed}; use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader}; use crate::store_header::StoreHeader; -use crate::storage_engine::{Store, FilePosition, Result, ROWS_FILE_NAME}; +use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME}; #[async_trait] // TODO: Make this private @@ -51,6 +52,12 @@ pub trait PrimitiveCursor { self.seek_to(StoreHeader::SIZE as u64).await } + // Seeks from current position by offset and returns new file position + async fn seek_by(&mut self, offset: i64) -> Result { + let file_position = self.file().seek(SeekFrom::Current(offset)).await?; + Ok(file_position) + } + async fn current_file_position(&mut self) -> Result { let next_file_position: FilePosition = self.file().stream_position().await?; Ok(next_file_position) @@ -59,7 +66,6 @@ pub trait PrimitiveCursor { async fn is_at_eof(&mut self) -> Result { let current_file_position = self.current_file_position().await?; let eof_file_position = self.eof_file_position(); - println!("IN EOF: current={}, eof_file_position={}", current_file_position, eof_file_position); Ok(current_file_position == eof_file_position) } } @@ -91,18 +97,31 @@ pub trait CursorWithStoreHeader: PrimitiveCursor { } // ===Iteration=== - // Assumes that the current file position is at a valid entry or EOF. + // The following functions assume that the current file position is at a valid entry or EOF. + + + // WARNING: This moves the file_position to start of the data, so you can't just call + // next_entry_header() a bunch of times. You must move the cursor! + async fn next_entry_header(&mut self) -> Result> { + if self.is_at_eof().await? { + return Ok(None) + } + + let entry_header = self.read_entry_header().await?; + + Ok(Some(entry_header)) + } + + // This is meant to be used after next_entry_header() is called. + async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result{ + let file_position = self.seek_by(entry_header.size_of_data() as i64).await?; + Ok(file_position) + } + async fn next(&mut self) -> Result>> where T: Decode { - println!("are we at eof?"); - if self.is_at_eof().await? { - println!("YES"); - return Ok(None) - } - println!("NO"); - - let entry_header = self.read_entry_header().await?; + let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; let mut data_bytes: Vec = vec![0; entry_header.size_of_data()]; self.read_bytes(&mut data_bytes).await?; @@ -112,6 +131,59 @@ pub trait CursorWithStoreHeader: PrimitiveCursor { Ok(Some(entry)) } + // Like next, but only reads the column, not the whole entry. + async fn next_at_column(&mut self, column: Column) -> Result> + where T: Decode + Send + { + let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; + let file_position_at_start_of_data = self.current_file_position().await?; + + // figuring out how much to decode + let column_offset = entry_header.offset_of_column(column); + self.seek_by(column_offset as i64).await?; + + // reading and decoding + let mut bytes: Vec = vec![0; entry_header.data_sizes[column as usize]]; + self.read_bytes(&mut bytes).await?; + let (value, _) = + decode::(&bytes[..]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; + + // jumping to next entry + self.seek_to(file_position_at_start_of_data).await?; + self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?; + + Ok(Some((entry_header, value))) + } + + async fn next_alive(&mut self) -> Result>> + where T: Decode + { + while let Some(entry) = self.next().await? { + if !entry.header.is_deleted { + return Ok(Some(entry)) + } + } + Ok(None) + } + + // ===Search=== + async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result>> + where T: Decode + PartialEq + Send + Sync + { + let mut file_position = self.current_file_position().await?; + while let Some((_, t)) = self.next_at_column(column).await? { + if &t == t0 { + // go back and decode the whole entry + self.seek_to(file_position).await?; + return self.next().await + } else { + file_position = self.current_file_position().await?; + } + } + Ok(None) + } + // ===Debugging=== async fn read_entries(&mut self) -> Result<()> where T: Decode + std::fmt::Debug @@ -301,14 +373,10 @@ impl <'cursor, T> WriteCursor<'cursor, T> { where T: Send { self.seek_to(file_position).await?; - println!("Attempting to read the entry"); let mut entry_header = self.read_entry_header().await?; - println!("Entry Header == {:?}", entry_header); if entry_header.is_deleted { - println!("Already deleted"); Ok(()) } else { - println!("Marking as deleted"); self.increment_deleted_count().await?; self.seek_to(file_position).await?; diff --git a/storage_engine/src/entry.rs b/storage_engine/src/entry.rs index 9d9de38..90d3dca 100644 --- a/storage_engine/src/entry.rs +++ b/storage_engine/src/entry.rs @@ -13,8 +13,8 @@ pub struct Entry { #[derive(Debug)] pub struct EntryDetailed { - header: EntryHeaderWithDataSize, - data: Vec, + pub header: EntryHeaderWithDataSize, + pub data: Vec, } impl Entry { diff --git a/storage_engine/src/entry_header.rs b/storage_engine/src/entry_header.rs index 4aa904c..7c8d626 100644 --- a/storage_engine/src/entry_header.rs +++ b/storage_engine/src/entry_header.rs @@ -1,5 +1,5 @@ use crate::binary_coding::{decode, encode, decode_sequence}; -use crate::storage_engine::Result; +use crate::storage_engine::{Result, Column}; use crate::error::{Error, DecodeErrorKind}; use std::mem::size_of; @@ -42,6 +42,18 @@ impl EntryHeaderWithDataSize { self.data_sizes.iter().sum() } + pub fn offset_of_column(&self, column: Column) -> usize { + let mut sum = 0; + for (i, size) in self.data_sizes.iter().enumerate() { + if i < column as usize { + sum += size; + } else { + break + } + } + sum + } + pub fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result { let (is_deleted, _) = decode::(&bytes) diff --git a/storage_engine/src/main.rs b/storage_engine/src/main.rs index d530bd1..cf32a88 100644 --- a/storage_engine/src/main.rs +++ b/storage_engine/src/main.rs @@ -108,20 +108,44 @@ async fn main() -> Result<()> { // println!("{:?}", store); // println!("{:?}", store.read_all_bytes().await?); - let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + { + let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?; + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; + // println!("{:?}", x); + } - // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", x); - // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", x); - // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", x); - // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", x); - // let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; - // println!("{:?}", x); + { + let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + let column = 2; + let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; + println!("{:?}", x); + let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; + println!("{:?}", x); + let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; + println!("{:?}", x); + let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; + println!("{:?}", x); + let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?; + println!("{:?}", x); + } + + { + let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; + let column = 3; + let t0 = 6; + let x = cursor.find_first_eq_bruteforce(column, &t0).await.map_err(|e| e.to_io_or_panic())?; + println!("{:?}", x); + } - cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?;