This commit is contained in:
Yuriy Dupyn 2024-02-05 23:35:49 +01:00
parent c25c6edc6a
commit 588db169f8
8 changed files with 15 additions and 55 deletions

View file

@ -1,7 +1,5 @@
use bincode;
use bincode::config::{BigEndian, Configuration, Fixint}; use bincode::config::{BigEndian, Configuration, Fixint};
use bincode::{Decode, Encode}; use bincode::{Decode, Encode};
use std::mem::size_of;
const BIN_CONFIG: Configuration<BigEndian, Fixint> = bincode::config::standard() const BIN_CONFIG: Configuration<BigEndian, Fixint> = bincode::config::standard()
.with_big_endian() .with_big_endian()
@ -15,27 +13,6 @@ pub fn decode<T: Decode>(bytes: &[u8]) -> Result<(T, usize), bincode::error::Dec
bincode::decode_from_slice(bytes, BIN_CONFIG) bincode::decode_from_slice(bytes, BIN_CONFIG)
} }
/// Encodes a slice as a length-prefixed byte buffer: the element count
/// (as `usize`) comes first, followed by each element's encoding in order.
/// Companion of `decode_vector`.
pub fn encode_vector<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> {
    // Start the buffer with the encoded element count.
    let mut buffer = encode(&ts.len())?;
    for item in ts {
        buffer.extend(encode(item)?);
    }
    Ok(buffer)
}
/// Decodes a length-prefixed byte buffer produced by `encode_vector` back
/// into a `Vec<T>`.
///
/// Returns a `DecodeError` if the buffer is truncated or malformed.
pub fn decode_vector<T: Decode>(bytes: &[u8]) -> Result<Vec<T>, bincode::error::DecodeError> {
    // Read the element count. Use the byte count actually consumed by the
    // decoder as the starting offset instead of size_of::<usize>():
    // bincode's fixed-int config encodes `usize` as a u64 (8 bytes) on
    // every platform, so size_of::<usize>() would be wrong on 32-bit
    // targets and corrupt the offset.
    let (result_len, mut offset): (usize, usize) = decode(bytes)?;
    let mut result: Vec<T> = Vec::with_capacity(result_len);
    for _ in 0..result_len {
        let (x, bytes_consumed) = decode::<T>(&bytes[offset..])?;
        offset += bytes_consumed;
        result.push(x);
    }
    Ok(result)
}
// We don't care about encoding the length here (since it will be used for a row with known column // We don't care about encoding the length here (since it will be used for a row with known column
// size) // size)
pub fn encode_sequence<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> { pub fn encode_sequence<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> {
@ -72,20 +49,3 @@ pub fn decode_sequence<T: Decode>(
} }
Ok(result) Ok(result)
} }
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a small vector of strings through
    /// `encode_vector` / `decode_vector`.
    #[test]
    fn test_encoding_decoding() {
        let original: Vec<String> = vec!["foo".to_string(), "bar".to_string()];
        let encoded = encode_vector(&original[..]).unwrap();
        // WARNING: Don't forget to specify the type here
        let decoded: Vec<String> = decode_vector(&encoded[..]).unwrap();
        assert_eq!(decoded, original);
    }
}

View file

@ -92,7 +92,7 @@ impl<T> CursorCanTraverse<T> for ReadCursor<'_, T> {
impl<T> CursorCanTraverse<T> for WriteCursor<'_, T> { impl<T> CursorCanTraverse<T> for WriteCursor<'_, T> {
fn header(&self) -> &StoreHeader { fn header(&self) -> &StoreHeader {
&self.header self.header
} }
} }
@ -123,13 +123,13 @@ impl<T> CursorCanModifyEntries<T> for AppendOnlyCursor<T> {
// ===capability to access index=== // ===capability to access index===
impl<T> CursorCanReadIndex<T> for ReadCursor<'_, T> { impl<T> CursorCanReadIndex<T> for ReadCursor<'_, T> {
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] { fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] {
&self.indexes self.indexes
} }
} }
impl<T> CursorCanReadIndex<T> for WriteCursor<'_, T> { impl<T> CursorCanReadIndex<T> for WriteCursor<'_, T> {
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] { fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] {
&self.indexes self.indexes
} }
} }
@ -266,9 +266,10 @@ impl<'cursor, T> WriteCursor<'cursor, T> {
T: Encode + Decode + Ord + Send + Sync + Clone, T: Encode + Decode + Ord + Send + Sync + Clone,
{ {
let mut count = 0; let mut count = 0;
while let Some(_) = self while self
.find_first_eq_bruteforce_and_delete(column, t0, false) .find_first_eq_bruteforce_and_delete(column, t0, false)
.await? .await?
.is_some()
{ {
count += 1; count += 1;
} }
@ -326,7 +327,7 @@ impl<'cursor, T> WriteCursor<'cursor, T> {
T: Ord + Decode + Encode + Send + Sync, T: Ord + Decode + Encode + Send + Sync,
{ {
// New Index // New Index
let index = Store::create_empty_index_at(&self.header, column).await?; let index = Store::create_empty_index_at(self.header, column).await?;
self.indexes[column as usize] = Some(index); self.indexes[column as usize] = Some(index);
// Mark column as indexed // Mark column as indexed
@ -436,7 +437,7 @@ impl<'cursor, T> WriteCursor<'cursor, T> {
let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
let intermediate_file: File = let intermediate_file: File =
Store::<T>::create_empty_rows_file(path_to_rows, &self.header).await?; Store::<T>::create_empty_rows_file(path_to_rows, self.header).await?;
let intermediate_header: StoreHeader = StoreHeader { let intermediate_header: StoreHeader = StoreHeader {
table_folder, table_folder,

View file

@ -4,7 +4,7 @@ use crate::binary_coding::encode;
use bincode; use bincode;
use bincode::Encode; use bincode::Encode;
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; use crate::cursor_capabilities::primitive::CursorCanWrite;
use crate::cursor_capabilities::traversal::CursorCanTraverse; use crate::cursor_capabilities::traversal::CursorCanTraverse;
use crate::segments::entry::Entry; use crate::segments::entry::Entry;
use crate::segments::store_header::StoreHeader; use crate::segments::store_header::StoreHeader;

View file

@ -22,7 +22,7 @@ pub trait CursorCanReadIndex<T>: CursorCanTraverse<T> {
{ {
match &self.indexes()[column as usize] { match &self.indexes()[column as usize] {
Some(index) => { Some(index) => {
let file_positions = index.lookup(value).await?.unwrap_or_else(|| HashSet::new()); let file_positions = index.lookup(value).await?.unwrap_or_else(HashSet::new);
let mut entries: Vec<EntryDetailed<T>> = vec![]; let mut entries: Vec<EntryDetailed<T>> = vec![];
for &file_position in file_positions.iter() { for &file_position in file_positions.iter() {
match self.read_entry_at(file_position).await? { match self.read_entry_at(file_position).await? {

View file

@ -5,7 +5,7 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
use crate::store::{FilePosition, Result}; use crate::store::{FilePosition, Result};
#[async_trait] #[async_trait]
pub(crate) trait CursorCanRead<T> { pub trait CursorCanRead<T> {
fn file(&mut self) -> &mut File; fn file(&mut self) -> &mut File;
fn eof_file_position(&self) -> FilePosition; fn eof_file_position(&self) -> FilePosition;
@ -55,7 +55,7 @@ pub(crate) trait CursorCanRead<T> {
} }
#[async_trait] #[async_trait]
pub(crate) trait CursorCanWrite<T>: CursorCanRead<T> { pub trait CursorCanWrite<T>: CursorCanRead<T> {
async fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize> { async fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize> {
Ok(self.file().write(bytes).await?) Ok(self.file().write(bytes).await?)
} }

View file

@ -89,7 +89,7 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
entry_header, entry_header,
file_position, file_position,
self.header().number_of_columns, self.header().number_of_columns,
&mut data_bytes, &data_bytes,
)?; )?;
Ok(Some(entry)) Ok(Some(entry))

View file

@ -7,7 +7,6 @@ use std::hash::Hash;
use std::io::SeekFrom; use std::io::SeekFrom;
use crate::binary_coding::{decode, encode}; use crate::binary_coding::{decode, encode};
use bincode;
use bincode::{Decode, Encode}; use bincode::{Decode, Encode};
use crate::error::{DecodeErrorKind, Error}; use crate::error::{DecodeErrorKind, Error};
@ -63,7 +62,7 @@ where
Ok(()) Ok(())
} }
pub fn insert_desynced(&mut self, k: K, v: V) -> () { pub fn insert_desynced(&mut self, k: K, v: V) {
self.data.entry(k).or_insert_with(HashSet::new).insert(v); self.data.entry(k).or_insert_with(HashSet::new).insert(v);
} }

View file

@ -29,8 +29,8 @@ pub async fn store_exists(table_folder: &str) -> Result<bool> {
Ok(fs::metadata(table_folder).await.is_ok()) Ok(fs::metadata(table_folder).await.is_ok())
} }
pub const ROWS_FILE_NAME: &'static str = "rows"; pub const ROWS_FILE_NAME: &str = "rows";
pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_intermediate"; pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &str = "rows_intermediate";
impl<T> Store<T> { impl<T> Store<T> {
// ===Creation=== // ===Creation===