formatting
This commit is contained in:
parent
ad98cfafb2
commit
c25c6edc6a
29 changed files with 886 additions and 571 deletions
|
|
@ -1,9 +1,11 @@
|
|||
use bincode;
|
||||
use bincode::{Decode, Encode};
|
||||
use bincode::config::{BigEndian, Configuration, Fixint};
|
||||
use bincode::{Decode, Encode};
|
||||
use std::mem::size_of;
|
||||
|
||||
const BIN_CONFIG: Configuration<BigEndian, Fixint> = bincode::config::standard().with_big_endian().with_fixed_int_encoding();
|
||||
const BIN_CONFIG: Configuration<BigEndian, Fixint> = bincode::config::standard()
|
||||
.with_big_endian()
|
||||
.with_fixed_int_encoding();
|
||||
|
||||
pub fn encode<T: Encode>(t: &T) -> Result<Vec<u8>, bincode::error::EncodeError> {
|
||||
bincode::encode_to_vec(t, BIN_CONFIG)
|
||||
|
|
@ -40,24 +42,27 @@ pub fn encode_sequence<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::E
|
|||
let mut result = vec![];
|
||||
for t in ts {
|
||||
result.append(&mut encode(&t)?);
|
||||
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub fn encode_sequence_with_sizes<T: Encode>(ts: &[T]) -> Result<(Vec<u8>, Vec<usize>), bincode::error::EncodeError> {
|
||||
pub fn encode_sequence_with_sizes<T: Encode>(
|
||||
ts: &[T],
|
||||
) -> Result<(Vec<u8>, Vec<usize>), bincode::error::EncodeError> {
|
||||
let mut result_bytes = vec![];
|
||||
let mut sizes = Vec::with_capacity(ts.len());
|
||||
for t in ts {
|
||||
let mut bytes = encode(&t)?;
|
||||
sizes.push(bytes.len());
|
||||
result_bytes.append(&mut bytes);
|
||||
|
||||
}
|
||||
Ok((result_bytes, sizes))
|
||||
}
|
||||
|
||||
pub fn decode_sequence<T: Decode>(len: usize, bytes: &[u8]) -> Result<Vec<T>, bincode::error::DecodeError> {
|
||||
pub fn decode_sequence<T: Decode>(
|
||||
len: usize,
|
||||
bytes: &[u8],
|
||||
) -> Result<Vec<T>, bincode::error::DecodeError> {
|
||||
let mut result: Vec<T> = Vec::with_capacity(len);
|
||||
let mut offset = 0;
|
||||
for _ in 0..len {
|
||||
|
|
@ -68,7 +73,6 @@ pub fn decode_sequence<T: Decode>(len: usize, bytes: &[u8]) -> Result<Vec<T>, bi
|
|||
Ok(result)
|
||||
}
|
||||
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
|
|||
|
|
@ -1,21 +1,24 @@
|
|||
use tokio::fs::{File, OpenOptions};
|
||||
use tokio::fs;
|
||||
use std::path::Path;
|
||||
use std::marker::PhantomData;
|
||||
use std::collections::{BTreeMap, HashSet};
|
||||
use std::marker::PhantomData;
|
||||
use std::path::Path;
|
||||
use tokio::fs;
|
||||
use tokio::fs::{File, OpenOptions};
|
||||
|
||||
use bincode;
|
||||
use bincode::{Decode, Encode};
|
||||
|
||||
use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries;
|
||||
use crate::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex};
|
||||
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite};
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use crate::index::Index;
|
||||
use crate::segments::entry::EntryDetailed;
|
||||
use crate::segments::entry_header::EntryHeader;
|
||||
use crate::segments::store_header::StoreHeader;
|
||||
use crate::store::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME};
|
||||
use crate::index::Index;
|
||||
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite};
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries;
|
||||
use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex};
|
||||
use crate::store::{
|
||||
Column, FilePosition, Result, Store, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME,
|
||||
ROWS_FILE_NAME,
|
||||
};
|
||||
|
||||
const GARBAGE_COLLECTION_TRIGGER: usize = 100;
|
||||
|
||||
|
|
@ -45,10 +48,9 @@ pub struct AppendOnlyCursor<T> {
|
|||
eof_file_position: FilePosition,
|
||||
}
|
||||
|
||||
|
||||
// ===========Implementations=============
|
||||
// ===primitive capabilities===
|
||||
impl <T>CursorCanRead<T> for ReadCursor<'_, T> {
|
||||
impl<T> CursorCanRead<T> for ReadCursor<'_, T> {
|
||||
fn file(&mut self) -> &mut File {
|
||||
&mut self.file
|
||||
}
|
||||
|
|
@ -58,7 +60,7 @@ impl <T>CursorCanRead<T> for ReadCursor<'_, T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl <T>CursorCanRead<T> for WriteCursor<'_, T> {
|
||||
impl<T> CursorCanRead<T> for WriteCursor<'_, T> {
|
||||
fn file(&mut self) -> &mut File {
|
||||
&mut self.file
|
||||
}
|
||||
|
|
@ -68,7 +70,7 @@ impl <T>CursorCanRead<T> for WriteCursor<'_, T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl <T>CursorCanRead<T> for AppendOnlyCursor<T> {
|
||||
impl<T> CursorCanRead<T> for AppendOnlyCursor<T> {
|
||||
fn file(&mut self) -> &mut File {
|
||||
&mut self.file
|
||||
}
|
||||
|
|
@ -78,108 +80,123 @@ impl <T>CursorCanRead<T> for AppendOnlyCursor<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl <T>CursorCanWrite<T> for WriteCursor<'_, T> {}
|
||||
impl <T>CursorCanWrite<T> for AppendOnlyCursor<T> {}
|
||||
|
||||
impl<T> CursorCanWrite<T> for WriteCursor<'_, T> {}
|
||||
impl<T> CursorCanWrite<T> for AppendOnlyCursor<T> {}
|
||||
|
||||
// ===capability to access header===
|
||||
impl <T>CursorCanTraverse<T> for ReadCursor<'_, T> {
|
||||
fn header(&self) -> &StoreHeader { &self.header }
|
||||
impl<T> CursorCanTraverse<T> for ReadCursor<'_, T> {
|
||||
fn header(&self) -> &StoreHeader {
|
||||
&self.header
|
||||
}
|
||||
}
|
||||
|
||||
impl <T>CursorCanTraverse<T> for WriteCursor<'_, T> {
|
||||
fn header(&self) -> &StoreHeader { &self.header }
|
||||
impl<T> CursorCanTraverse<T> for WriteCursor<'_, T> {
|
||||
fn header(&self) -> &StoreHeader {
|
||||
&self.header
|
||||
}
|
||||
}
|
||||
|
||||
impl <T>CursorCanTraverse<T> for AppendOnlyCursor<T> {
|
||||
fn header(&self) -> &StoreHeader { &self.header }
|
||||
impl<T> CursorCanTraverse<T> for AppendOnlyCursor<T> {
|
||||
fn header(&self) -> &StoreHeader {
|
||||
&self.header
|
||||
}
|
||||
}
|
||||
|
||||
impl <T>CursorCanModifyEntries<T> for WriteCursor<'_, T> {
|
||||
fn header_mut(&mut self) -> &mut StoreHeader { self.header }
|
||||
fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position }
|
||||
impl<T> CursorCanModifyEntries<T> for WriteCursor<'_, T> {
|
||||
fn header_mut(&mut self) -> &mut StoreHeader {
|
||||
self.header
|
||||
}
|
||||
fn set_eof_file_position(&mut self, new_file_position: FilePosition) {
|
||||
self.eof_file_position = new_file_position
|
||||
}
|
||||
}
|
||||
|
||||
impl <T>CursorCanModifyEntries<T> for AppendOnlyCursor<T> {
|
||||
fn header_mut(&mut self) -> &mut StoreHeader { &mut self.header }
|
||||
fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position }
|
||||
impl<T> CursorCanModifyEntries<T> for AppendOnlyCursor<T> {
|
||||
fn header_mut(&mut self) -> &mut StoreHeader {
|
||||
&mut self.header
|
||||
}
|
||||
fn set_eof_file_position(&mut self, new_file_position: FilePosition) {
|
||||
self.eof_file_position = new_file_position
|
||||
}
|
||||
}
|
||||
|
||||
// ===capability to access index===
|
||||
impl <T>CursorCanReadIndex<T> for ReadCursor<'_, T> {
|
||||
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] { &self.indexes }
|
||||
}
|
||||
|
||||
impl <T>CursorCanReadIndex<T> for WriteCursor<'_, T> {
|
||||
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] { &self.indexes }
|
||||
}
|
||||
|
||||
impl <T>CursorCanWriteToIndex<T> for WriteCursor<'_, T> {
|
||||
fn indexes_mut(&mut self) -> &mut [Option<Index<T, FilePosition>>] { self.indexes }
|
||||
}
|
||||
|
||||
|
||||
// ===Specifics===
|
||||
impl <'cursor, T> ReadCursor<'cursor, T> {
|
||||
pub async fn new<'store: 'cursor>(store: &'store Store<T>) -> Result<Self>
|
||||
where T: Send + Sync
|
||||
{
|
||||
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
|
||||
let file: File =
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.open(path_to_rows)
|
||||
.await?;
|
||||
|
||||
let mut cursor = Self {
|
||||
header: store.header.clone(),
|
||||
file,
|
||||
indexes: &store.indexes,
|
||||
|
||||
eof_file_position: 0,
|
||||
};
|
||||
let eof_file_position: FilePosition = cursor.seek_to_end().await?;
|
||||
cursor.eof_file_position = eof_file_position;
|
||||
|
||||
cursor.seek_to_start_of_data().await?;
|
||||
|
||||
Ok(cursor)
|
||||
impl<T> CursorCanReadIndex<T> for ReadCursor<'_, T> {
|
||||
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] {
|
||||
&self.indexes
|
||||
}
|
||||
}
|
||||
|
||||
impl <'cursor, T> WriteCursor<'cursor, T>
|
||||
{
|
||||
// 'store lives atleast as long as 'cursor
|
||||
pub async fn new<'store: 'cursor>(store: &'store mut Store<T>) -> Result<Self>
|
||||
where T: Send
|
||||
impl<T> CursorCanReadIndex<T> for WriteCursor<'_, T> {
|
||||
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] {
|
||||
&self.indexes
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> CursorCanWriteToIndex<T> for WriteCursor<'_, T> {
|
||||
fn indexes_mut(&mut self) -> &mut [Option<Index<T, FilePosition>>] {
|
||||
self.indexes
|
||||
}
|
||||
}
|
||||
|
||||
// ===Specifics===
|
||||
impl<'cursor, T> ReadCursor<'cursor, T> {
|
||||
pub async fn new<'store: 'cursor>(store: &'store Store<T>) -> Result<Self>
|
||||
where
|
||||
T: Send + Sync,
|
||||
{
|
||||
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
|
||||
let file: File =
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(path_to_rows)
|
||||
.await?;
|
||||
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
|
||||
let file: File = OpenOptions::new().read(true).open(path_to_rows).await?;
|
||||
|
||||
let mut cursor = Self {
|
||||
header: &mut store.header,
|
||||
file,
|
||||
indexes: &mut store.indexes,
|
||||
let mut cursor = Self {
|
||||
header: store.header.clone(),
|
||||
file,
|
||||
indexes: &store.indexes,
|
||||
|
||||
eof_file_position: 0,
|
||||
};
|
||||
let eof_file_position: FilePosition = cursor.seek_to_end().await?;
|
||||
cursor.eof_file_position = eof_file_position;
|
||||
eof_file_position: 0,
|
||||
};
|
||||
let eof_file_position: FilePosition = cursor.seek_to_end().await?;
|
||||
cursor.eof_file_position = eof_file_position;
|
||||
|
||||
cursor.seek_to_start_of_data().await?;
|
||||
cursor.seek_to_start_of_data().await?;
|
||||
|
||||
Ok(cursor)
|
||||
Ok(cursor)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'cursor, T> WriteCursor<'cursor, T> {
|
||||
// 'store lives atleast as long as 'cursor
|
||||
pub async fn new<'store: 'cursor>(store: &'store mut Store<T>) -> Result<Self>
|
||||
where
|
||||
T: Send,
|
||||
{
|
||||
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
|
||||
let file: File = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(path_to_rows)
|
||||
.await?;
|
||||
|
||||
let mut cursor = Self {
|
||||
header: &mut store.header,
|
||||
file,
|
||||
indexes: &mut store.indexes,
|
||||
|
||||
eof_file_position: 0,
|
||||
};
|
||||
let eof_file_position: FilePosition = cursor.seek_to_end().await?;
|
||||
cursor.eof_file_position = eof_file_position;
|
||||
|
||||
cursor.seek_to_start_of_data().await?;
|
||||
|
||||
Ok(cursor)
|
||||
}
|
||||
|
||||
// ===Entry Header Manipulation===
|
||||
// assumes we are at the start of valid entry.
|
||||
async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()>
|
||||
where T: Send
|
||||
async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()>
|
||||
where
|
||||
T: Send,
|
||||
{
|
||||
let bytes: Vec<u8> = entry_header.encode()?;
|
||||
self.write_bytes(&bytes).await?;
|
||||
|
|
@ -187,8 +204,13 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
|||
}
|
||||
|
||||
// ===Deletion===
|
||||
pub async fn mark_deleted_at(&mut self, file_position: FilePosition, enable_garbage_collector: bool) -> Result<()>
|
||||
where T: Encode + Decode + Ord + Send + Sync + Clone + Ord
|
||||
pub async fn mark_deleted_at(
|
||||
&mut self,
|
||||
file_position: FilePosition,
|
||||
enable_garbage_collector: bool,
|
||||
) -> Result<()>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + Clone + Ord,
|
||||
{
|
||||
self.seek_to(file_position).await?;
|
||||
let mut entry_header = self.read_entry_header().await?;
|
||||
|
|
@ -205,9 +227,7 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
|||
// Update index
|
||||
self.seek_to(file_position).await?;
|
||||
match self.next().await? {
|
||||
Some(entry) => {
|
||||
self.delete_entry_values_from_indexes(entry).await?
|
||||
},
|
||||
Some(entry) => self.delete_entry_values_from_indexes(entry).await?,
|
||||
None => {
|
||||
// SAFETY: We just modified its header, so it must exist.
|
||||
unreachable!()
|
||||
|
|
@ -221,12 +241,19 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
|||
}
|
||||
}
|
||||
|
||||
async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T, enable_garbage_collector: bool) -> Result<Option<EntryDetailed<T>>>
|
||||
where T: Encode + Decode + Ord + Send + Sync + Clone
|
||||
async fn find_first_eq_bruteforce_and_delete(
|
||||
&mut self,
|
||||
column: Column,
|
||||
t0: &T,
|
||||
enable_garbage_collector: bool,
|
||||
) -> Result<Option<EntryDetailed<T>>>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + Clone,
|
||||
{
|
||||
let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?;
|
||||
if let Some(entry) = maybe_entry {
|
||||
self.mark_deleted_at(entry.file_position, enable_garbage_collector).await?;
|
||||
self.mark_deleted_at(entry.file_position, enable_garbage_collector)
|
||||
.await?;
|
||||
Ok(Some(entry))
|
||||
} else {
|
||||
Ok(maybe_entry)
|
||||
|
|
@ -234,18 +261,23 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
|||
}
|
||||
|
||||
// Doesn't update indexes.
|
||||
async fn find_all_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result<usize>
|
||||
where T: Encode + Decode + Ord + Send + Sync + Clone
|
||||
async fn find_all_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result<usize>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + Clone,
|
||||
{
|
||||
let mut count = 0;
|
||||
while let Some(_) = self.find_first_eq_bruteforce_and_delete(column, t0, false).await? {
|
||||
while let Some(_) = self
|
||||
.find_first_eq_bruteforce_and_delete(column, t0, false)
|
||||
.await?
|
||||
{
|
||||
count += 1;
|
||||
}
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
pub async fn delete_all_entries(&mut self, enable_garbage_collector: bool) -> Result<usize>
|
||||
where T: Encode + Decode + Ord + Send + Sync + Clone
|
||||
pub async fn delete_all_entries(&mut self, enable_garbage_collector: bool) -> Result<usize>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + Clone,
|
||||
{
|
||||
let mut count = 0;
|
||||
while let Some(entry) = self.next_alive().await? {
|
||||
|
|
@ -259,21 +291,28 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
|||
Ok(count)
|
||||
}
|
||||
|
||||
pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result<usize>
|
||||
where T: Encode + Decode + Ord + Send + Sync + Clone
|
||||
pub async fn delete_entries_where_eq(
|
||||
&mut self,
|
||||
column: Column,
|
||||
value: &T,
|
||||
enable_garbage_collector: bool,
|
||||
) -> Result<usize>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + Clone,
|
||||
{
|
||||
let count =
|
||||
if self.header().is_column_indexed(column) {
|
||||
let entries = self.index_lookup(column, value).await?;
|
||||
let count = entries.len();
|
||||
for entry in entries {
|
||||
self.mark_deleted_at(entry.file_position, false).await?
|
||||
}
|
||||
count
|
||||
} else {
|
||||
let count = self.find_all_eq_bruteforce_and_delete(column, value).await?;
|
||||
count
|
||||
};
|
||||
let count = if self.header().is_column_indexed(column) {
|
||||
let entries = self.index_lookup(column, value).await?;
|
||||
let count = entries.len();
|
||||
for entry in entries {
|
||||
self.mark_deleted_at(entry.file_position, false).await?
|
||||
}
|
||||
count
|
||||
} else {
|
||||
let count = self
|
||||
.find_all_eq_bruteforce_and_delete(column, value)
|
||||
.await?;
|
||||
count
|
||||
};
|
||||
if enable_garbage_collector {
|
||||
self.attempt_garbage_collection_if_necessary().await?;
|
||||
}
|
||||
|
|
@ -282,8 +321,9 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
|||
|
||||
// ===Indexing===
|
||||
// WARNING: Assumes the column is NOT indexable.
|
||||
pub async fn attach_index(&mut self, column: Column) -> Result<()>
|
||||
where T: Ord + Decode + Encode + Send + Sync
|
||||
pub async fn attach_index(&mut self, column: Column) -> Result<()>
|
||||
where
|
||||
T: Ord + Decode + Encode + Send + Sync,
|
||||
{
|
||||
// New Index
|
||||
let index = Store::create_empty_index_at(&self.header, column).await?;
|
||||
|
|
@ -303,8 +343,9 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
|||
}
|
||||
|
||||
// ===Garbage Collection===
|
||||
async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()>
|
||||
where T: Send + Sync + Decode + Encode + Clone + Ord
|
||||
async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()>
|
||||
where
|
||||
T: Send + Sync + Decode + Encode + Clone + Ord,
|
||||
{
|
||||
if self.header.deleted_count > GARBAGE_COLLECTION_TRIGGER {
|
||||
println!("=======START GARBAGE COLLECTOR====");
|
||||
|
|
@ -314,14 +355,16 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn initiate_garbage_collection(&mut self) -> Result<usize>
|
||||
where T: Send + Sync + Decode + Encode + Clone + Ord
|
||||
pub async fn initiate_garbage_collection(&mut self) -> Result<usize>
|
||||
where
|
||||
T: Send + Sync + Decode + Encode + Clone + Ord,
|
||||
{
|
||||
let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?;
|
||||
// Since garbage collection changes FilePositions of live entries, we need to update the
|
||||
// indexes too.
|
||||
|
||||
let mut in_memory_indexes: Vec<Option<BTreeMap<T, HashSet<FilePosition>>>> = Vec::with_capacity(self.header.number_of_columns);
|
||||
let mut in_memory_indexes: Vec<Option<BTreeMap<T, HashSet<FilePosition>>>> =
|
||||
Vec::with_capacity(self.header.number_of_columns);
|
||||
for column in 0..self.header.number_of_columns {
|
||||
if self.header.is_column_indexed(column as Column) {
|
||||
let in_memory_index = BTreeMap::new();
|
||||
|
|
@ -337,12 +380,19 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
|||
{
|
||||
while let Some(live_entry) = self.next_alive().await? {
|
||||
entries_deleted += 1;
|
||||
let file_position = cursor_to_intermediate.append_entry_no_indexing(&live_entry.forget()).await?;
|
||||
let file_position = cursor_to_intermediate
|
||||
.append_entry_no_indexing(&live_entry.forget())
|
||||
.await?;
|
||||
|
||||
// Update index. (Wouldn't it be nice if we had `for let ...`?)
|
||||
for (maybe_in_memory_index, value) in in_memory_indexes.iter_mut().zip(&live_entry.data) {
|
||||
for (maybe_in_memory_index, value) in
|
||||
in_memory_indexes.iter_mut().zip(&live_entry.data)
|
||||
{
|
||||
if let Some(in_memory_index) = maybe_in_memory_index {
|
||||
in_memory_index.entry(value.clone()).or_insert_with(HashSet::new).insert(file_position);
|
||||
in_memory_index
|
||||
.entry(value.clone())
|
||||
.or_insert_with(HashSet::new)
|
||||
.insert(file_position);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -369,21 +419,24 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
|||
// current file
|
||||
let path_to_table = Path::new(&self.header.table_folder);
|
||||
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
|
||||
let path_to_intermediate_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
|
||||
let path_to_intermediate_rows =
|
||||
path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
|
||||
fs::remove_file(path_to_rows.clone()).await?;
|
||||
fs::rename(path_to_intermediate_rows, path_to_rows).await?;
|
||||
|
||||
Ok(entries_deleted)
|
||||
}
|
||||
|
||||
async fn spawn_cursor_to_intermediate_file(&self) -> Result<AppendOnlyCursor<T>>
|
||||
where T: Send
|
||||
async fn spawn_cursor_to_intermediate_file(&self) -> Result<AppendOnlyCursor<T>>
|
||||
where
|
||||
T: Send,
|
||||
{
|
||||
let table_folder = self.header.table_folder.clone();
|
||||
let path_to_table = Path::new(&table_folder);
|
||||
let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
|
||||
|
||||
let intermediate_file: File = Store::<T>::create_empty_rows_file(path_to_rows, &self.header).await?;
|
||||
let intermediate_file: File =
|
||||
Store::<T>::create_empty_rows_file(path_to_rows, &self.header).await?;
|
||||
|
||||
let intermediate_header: StoreHeader = StoreHeader {
|
||||
table_folder,
|
||||
|
|
|
|||
|
|
@ -1,14 +1,14 @@
|
|||
use async_trait::async_trait;
|
||||
|
||||
use crate::binary_coding::encode;
|
||||
use bincode;
|
||||
use bincode::Encode;
|
||||
use crate::binary_coding::encode;
|
||||
|
||||
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite};
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use crate::segments::entry::Entry;
|
||||
use crate::segments::store_header::StoreHeader;
|
||||
use crate::store::{FilePosition, Result};
|
||||
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite};
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorCanModifyEntries<T>: CursorCanTraverse<T> + CursorCanWrite<T> {
|
||||
|
|
@ -16,8 +16,9 @@ pub trait CursorCanModifyEntries<T>: CursorCanTraverse<T> + CursorCanWrite<T> {
|
|||
fn set_eof_file_position(&mut self, new_file_position: FilePosition);
|
||||
|
||||
// ===Store Header Manipulation===
|
||||
async fn increment_total_count(&mut self) -> Result<()>
|
||||
where T: Send
|
||||
async fn increment_total_count(&mut self) -> Result<()>
|
||||
where
|
||||
T: Send,
|
||||
{
|
||||
self.seek_to_start().await?;
|
||||
self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?;
|
||||
|
|
@ -26,18 +27,21 @@ pub trait CursorCanModifyEntries<T>: CursorCanTraverse<T> + CursorCanWrite<T> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn increment_deleted_count(&mut self) -> Result<()>
|
||||
where T: Send
|
||||
async fn increment_deleted_count(&mut self) -> Result<()>
|
||||
where
|
||||
T: Send,
|
||||
{
|
||||
self.seek_to_start().await?;
|
||||
self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?;
|
||||
self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64)
|
||||
.await?;
|
||||
let new_count = self.header_mut().increment_deleted_count();
|
||||
self.write_bytes(&encode::<usize>(&new_count)?).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn set_header(&mut self, header: &StoreHeader) -> Result<()>
|
||||
where T: Send
|
||||
async fn set_header(&mut self, header: &StoreHeader) -> Result<()>
|
||||
where
|
||||
T: Send,
|
||||
{
|
||||
self.seek_to_start().await?;
|
||||
let encoded_header: Vec<u8> = header.encode()?;
|
||||
|
|
@ -50,8 +54,9 @@ pub trait CursorCanModifyEntries<T>: CursorCanTraverse<T> + CursorCanWrite<T> {
|
|||
|
||||
// Moves cursor to the end.
|
||||
// Returns file position to the start of the new entry.
|
||||
async fn append_entry_no_indexing(&mut self, entry: &Entry<T>) -> Result<FilePosition>
|
||||
where T: Encode + Send + Sync
|
||||
async fn append_entry_no_indexing(&mut self, entry: &Entry<T>) -> Result<FilePosition>
|
||||
where
|
||||
T: Encode + Send + Sync,
|
||||
{
|
||||
self.increment_total_count().await?;
|
||||
|
||||
|
|
|
|||
|
|
@ -5,19 +5,20 @@ use async_trait::async_trait;
|
|||
use bincode;
|
||||
use bincode::{Decode, Encode};
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::segments::entry::{Entry, EntryDetailed};
|
||||
use crate::store::{FilePosition, Column, Result};
|
||||
use crate::index::Index;
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries;
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use crate::error::Error;
|
||||
use crate::index::Index;
|
||||
use crate::segments::entry::{Entry, EntryDetailed};
|
||||
use crate::store::{Column, FilePosition, Result};
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorCanReadIndex<T>: CursorCanTraverse<T> {
|
||||
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>];
|
||||
|
||||
async fn index_lookup(&mut self, column: Column, value: &T) -> Result<Vec<EntryDetailed<T>>>
|
||||
where T: Encode + Decode + Ord + Send + Sync
|
||||
async fn index_lookup(&mut self, column: Column, value: &T) -> Result<Vec<EntryDetailed<T>>>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync,
|
||||
{
|
||||
match &self.indexes()[column as usize] {
|
||||
Some(index) => {
|
||||
|
|
@ -25,24 +26,24 @@ pub trait CursorCanReadIndex<T>: CursorCanTraverse<T> {
|
|||
let mut entries: Vec<EntryDetailed<T>> = vec![];
|
||||
for &file_position in file_positions.iter() {
|
||||
match self.read_entry_at(file_position).await? {
|
||||
Some(entry) => {
|
||||
entries.push(entry)
|
||||
},
|
||||
None => {
|
||||
return Err(Error::IndexIsStoringEofFilePosition(column))
|
||||
}
|
||||
Some(entry) => entries.push(entry),
|
||||
None => return Err(Error::IndexIsStoringEofFilePosition(column)),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(entries)
|
||||
},
|
||||
None =>
|
||||
Err(Error::AttemptToIndexNonIndexableColumn(column))
|
||||
}
|
||||
None => Err(Error::AttemptToIndexNonIndexableColumn(column)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn select_entries_where_eq(&mut self, column: Column, value: &T) -> Result<Vec<EntryDetailed<T>>>
|
||||
where T: Encode + Decode + Ord + Send + Sync
|
||||
async fn select_entries_where_eq(
|
||||
&mut self,
|
||||
column: Column,
|
||||
value: &T,
|
||||
) -> Result<Vec<EntryDetailed<T>>>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync,
|
||||
{
|
||||
if self.header().is_column_indexed(column) {
|
||||
self.index_lookup(column, value).await
|
||||
|
|
@ -59,9 +60,7 @@ pub trait CursorCanWriteToIndex<T>: CursorCanReadIndex<T> + CursorCanModifyEntri
|
|||
// Assumes that the column is indexable.
|
||||
fn mut_index_at(&mut self, column: Column) -> &mut Index<T, FilePosition> {
|
||||
match &mut self.indexes_mut()[column as usize] {
|
||||
Some(index) => {
|
||||
index
|
||||
},
|
||||
Some(index) => index,
|
||||
None => {
|
||||
unreachable!()
|
||||
}
|
||||
|
|
@ -69,8 +68,14 @@ pub trait CursorCanWriteToIndex<T>: CursorCanReadIndex<T> + CursorCanModifyEntri
|
|||
}
|
||||
|
||||
// Assumes that the column is indexable.
|
||||
async fn insert_into_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()>
|
||||
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
||||
async fn insert_into_index(
|
||||
&mut self,
|
||||
column: Column,
|
||||
value: T,
|
||||
file_position: FilePosition,
|
||||
) -> Result<()>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + 'async_trait,
|
||||
{
|
||||
let index = self.mut_index_at(column as Column);
|
||||
index.insert(value, file_position).await?;
|
||||
|
|
@ -78,24 +83,37 @@ pub trait CursorCanWriteToIndex<T>: CursorCanReadIndex<T> + CursorCanModifyEntri
|
|||
}
|
||||
|
||||
// Assumes that the column is indexable.
|
||||
async fn delete_from_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()>
|
||||
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
||||
async fn delete_from_index(
|
||||
&mut self,
|
||||
column: Column,
|
||||
value: T,
|
||||
file_position: FilePosition,
|
||||
) -> Result<()>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + 'async_trait,
|
||||
{
|
||||
let index = self.mut_index_at(column as Column);
|
||||
index.delete(value, file_position).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn insert_entry(&mut self, entry: Entry<T>) -> Result<FilePosition>
|
||||
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
||||
|
||||
async fn insert_entry(&mut self, entry: Entry<T>) -> Result<FilePosition>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + 'async_trait,
|
||||
{
|
||||
let file_position = self.append_entry_no_indexing(&entry).await?;
|
||||
|
||||
// insert the indexable values of the entry into corresponding indexes.
|
||||
for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() {
|
||||
for (column, (value, should_index)) in entry
|
||||
.data
|
||||
.into_iter()
|
||||
.zip(self.header().indexed_columns.clone())
|
||||
.enumerate()
|
||||
{
|
||||
if should_index {
|
||||
// SAFETY: If should_index is true, then the column is indexable.
|
||||
self.insert_into_index(column as Column, value, file_position).await?
|
||||
self.insert_into_index(column as Column, value, file_position)
|
||||
.await?
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -103,12 +121,19 @@ pub trait CursorCanWriteToIndex<T>: CursorCanReadIndex<T> + CursorCanModifyEntri
|
|||
}
|
||||
|
||||
async fn delete_entry_values_from_indexes(&mut self, entry: EntryDetailed<T>) -> Result<()>
|
||||
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + 'async_trait,
|
||||
{
|
||||
for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() {
|
||||
for (column, (value, should_index)) in entry
|
||||
.data
|
||||
.into_iter()
|
||||
.zip(self.header().indexed_columns.clone())
|
||||
.enumerate()
|
||||
{
|
||||
if should_index {
|
||||
// SAFETY: If should_index is true, then the column is indexable.
|
||||
self.delete_from_index(column as Column, value, entry.file_position).await?
|
||||
self.delete_from_index(column as Column, value, entry.file_position)
|
||||
.await?
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
pub(crate) mod primitive;
|
||||
pub mod traversal;
|
||||
pub mod entry_modification;
|
||||
pub mod index_access;
|
||||
pub(crate) mod primitive;
|
||||
pub mod traversal;
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom};
|
||||
use tokio::fs::File;
|
||||
use async_trait::async_trait;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
|
||||
|
||||
use crate::store::{FilePosition, Result};
|
||||
|
||||
|
|
|
|||
|
|
@ -1,24 +1,24 @@
|
|||
use tokio::io::AsyncReadExt;
|
||||
use async_trait::async_trait;
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
use crate::binary_coding::decode;
|
||||
use bincode;
|
||||
use bincode::Decode;
|
||||
use crate::binary_coding::decode;
|
||||
|
||||
use crate::error::{Error, DecodeErrorKind};
|
||||
use crate::cursor_capabilities::primitive::CursorCanRead;
|
||||
use crate::error::{DecodeErrorKind, Error};
|
||||
use crate::segments::entry::EntryDetailed;
|
||||
use crate::segments::entry_header::EntryHeaderWithDataSize;
|
||||
use crate::segments::store_header::StoreHeader;
|
||||
use crate::store::{FilePosition, Column, Result};
|
||||
use crate::cursor_capabilities::primitive::CursorCanRead;
|
||||
|
||||
use crate::store::{Column, FilePosition, Result};
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorCanTraverse<T>: CursorCanRead<T> {
|
||||
fn header(&self) -> &StoreHeader;
|
||||
|
||||
async fn seek_to_start_of_data(&mut self) -> Result<FilePosition> {
|
||||
self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64).await
|
||||
self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn read_entry_header(&mut self) -> Result<EntryHeaderWithDataSize> {
|
||||
|
|
@ -30,14 +30,21 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
|
|||
Ok(header)
|
||||
}
|
||||
|
||||
async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result<EntryHeaderWithDataSize> {
|
||||
async fn read_entry_header_at(
|
||||
&mut self,
|
||||
file_position: FilePosition,
|
||||
) -> Result<EntryHeaderWithDataSize> {
|
||||
self.seek_to(file_position).await?;
|
||||
self.read_entry_header().await
|
||||
}
|
||||
|
||||
// Returns None when file_position == eof_file_position
|
||||
async fn read_entry_at(&mut self, file_position: FilePosition) -> Result<Option<EntryDetailed<T>>>
|
||||
where T: Decode
|
||||
async fn read_entry_at(
|
||||
&mut self,
|
||||
file_position: FilePosition,
|
||||
) -> Result<Option<EntryDetailed<T>>>
|
||||
where
|
||||
T: Decode,
|
||||
{
|
||||
self.seek_to(file_position).await?;
|
||||
self.next().await
|
||||
|
|
@ -46,12 +53,11 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
|
|||
// ===Iteration===
|
||||
// The following functions assume that the current file position is at a valid entry or EOF.
|
||||
|
||||
|
||||
// WARNING: This moves the file_position to start of the data, so you can't just call
|
||||
// next_entry_header() a bunch of times. You must move the cursor!
|
||||
async fn next_entry_header(&mut self) -> Result<Option<EntryHeaderWithDataSize>> {
|
||||
if self.is_at_eof().await? {
|
||||
return Ok(None)
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let entry_header = self.read_entry_header().await?;
|
||||
|
|
@ -60,31 +66,47 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
|
|||
}
|
||||
|
||||
// This is meant to be used after next_entry_header() is called.
|
||||
async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result<FilePosition>{
|
||||
async fn jump_from_start_of_entry_data_to_next_entry(
|
||||
&mut self,
|
||||
entry_header: &EntryHeaderWithDataSize,
|
||||
) -> Result<FilePosition> {
|
||||
let file_position = self.seek_by(entry_header.size_of_data() as i64).await?;
|
||||
Ok(file_position)
|
||||
}
|
||||
|
||||
async fn next(&mut self) -> Result<Option<EntryDetailed<T>>>
|
||||
where T: Decode
|
||||
async fn next(&mut self) -> Result<Option<EntryDetailed<T>>>
|
||||
where
|
||||
T: Decode,
|
||||
{
|
||||
let file_position = self.current_file_position().await?;
|
||||
let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
|
||||
let Some(entry_header) = self.next_entry_header().await? else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let mut data_bytes: Vec<u8> = vec![0; entry_header.size_of_data()];
|
||||
self.read_bytes(&mut data_bytes).await?;
|
||||
let entry: EntryDetailed<T> =
|
||||
EntryDetailed::decode(entry_header, file_position, self.header().number_of_columns, &mut data_bytes)?;
|
||||
let entry: EntryDetailed<T> = EntryDetailed::decode(
|
||||
entry_header,
|
||||
file_position,
|
||||
self.header().number_of_columns,
|
||||
&mut data_bytes,
|
||||
)?;
|
||||
|
||||
Ok(Some(entry))
|
||||
}
|
||||
|
||||
// Like next, but only reads the column, not the whole entry.
|
||||
async fn next_at_column(&mut self, column: Column) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
|
||||
where T: Decode + Send
|
||||
// Like next, but only reads the column, not the whole entry.
|
||||
async fn next_at_column(
|
||||
&mut self,
|
||||
column: Column,
|
||||
) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
|
||||
where
|
||||
T: Decode + Send,
|
||||
{
|
||||
let file_position = self.current_file_position().await?;
|
||||
let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
|
||||
let Some(entry_header) = self.next_entry_header().await? else {
|
||||
return Ok(None);
|
||||
};
|
||||
let file_position_at_start_of_data = self.current_file_position().await?;
|
||||
|
||||
// figuring out how much to decode
|
||||
|
|
@ -94,49 +116,59 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
|
|||
// reading and decoding
|
||||
let mut bytes: Vec<u8> = vec![0; entry_header.data_sizes[column as usize]];
|
||||
self.read_bytes(&mut bytes).await?;
|
||||
let (value, _) =
|
||||
decode::<T>(&bytes[..])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
|
||||
let (value, _) = decode::<T>(&bytes[..])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
|
||||
|
||||
// jumping to next entry
|
||||
self.seek_to(file_position_at_start_of_data).await?;
|
||||
self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?;
|
||||
self.jump_from_start_of_entry_data_to_next_entry(&entry_header)
|
||||
.await?;
|
||||
|
||||
Ok(Some((entry_header, file_position, value)))
|
||||
}
|
||||
|
||||
async fn next_alive_at_column(&mut self, column: Column) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
|
||||
where T: Decode + Send
|
||||
async fn next_alive_at_column(
|
||||
&mut self,
|
||||
column: Column,
|
||||
) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
|
||||
where
|
||||
T: Decode + Send,
|
||||
{
|
||||
while let Some((header, file_position, t)) = self.next_at_column(column).await? {
|
||||
if !header.is_deleted {
|
||||
return Ok(Some((header, file_position, t)))
|
||||
if !header.is_deleted {
|
||||
return Ok(Some((header, file_position, t)));
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
async fn next_alive(&mut self) -> Result<Option<EntryDetailed<T>>>
|
||||
where T: Decode
|
||||
async fn next_alive(&mut self) -> Result<Option<EntryDetailed<T>>>
|
||||
where
|
||||
T: Decode,
|
||||
{
|
||||
while let Some(entry) = self.next().await? {
|
||||
if !entry.header.is_deleted {
|
||||
return Ok(Some(entry))
|
||||
if !entry.header.is_deleted {
|
||||
return Ok(Some(entry));
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
// ===Search===
|
||||
async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result<Option<EntryDetailed<T>>>
|
||||
where T: Decode + PartialEq + Send + Sync
|
||||
async fn find_first_eq_bruteforce(
|
||||
&mut self,
|
||||
column: Column,
|
||||
t0: &T,
|
||||
) -> Result<Option<EntryDetailed<T>>>
|
||||
where
|
||||
T: Decode + PartialEq + Send + Sync,
|
||||
{
|
||||
let mut file_position = self.current_file_position().await?;
|
||||
while let Some((_, _, t)) = self.next_alive_at_column(column).await? {
|
||||
if &t == t0 {
|
||||
if &t == t0 {
|
||||
// go back and decode the whole entry
|
||||
self.seek_to(file_position).await?;
|
||||
return self.next().await
|
||||
return self.next().await;
|
||||
} else {
|
||||
file_position = self.current_file_position().await?;
|
||||
}
|
||||
|
|
@ -144,8 +176,13 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
|
|||
Ok(None)
|
||||
}
|
||||
|
||||
async fn find_all_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result<Vec<EntryDetailed<T>>>
|
||||
where T: Decode + PartialEq + Send + Sync
|
||||
async fn find_all_eq_bruteforce(
|
||||
&mut self,
|
||||
column: Column,
|
||||
t0: &T,
|
||||
) -> Result<Vec<EntryDetailed<T>>>
|
||||
where
|
||||
T: Decode + PartialEq + Send + Sync,
|
||||
{
|
||||
let mut entries = vec![];
|
||||
while let Some(entry) = self.find_first_eq_bruteforce(column, t0).await? {
|
||||
|
|
@ -155,8 +192,9 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
|
|||
}
|
||||
|
||||
// ===Debugging===
|
||||
async fn read_entries(&mut self) -> Result<()>
|
||||
where T: Decode + std::fmt::Debug
|
||||
async fn read_entries(&mut self) -> Result<()>
|
||||
where
|
||||
T: Decode + std::fmt::Debug,
|
||||
{
|
||||
self.seek_to_start_of_data().await?;
|
||||
while let Some(entry) = self.next().await? {
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
pub mod store;
|
||||
mod binary_coding;
|
||||
pub mod cursor;
|
||||
pub mod cursor_capabilities;
|
||||
pub mod error;
|
||||
mod index;
|
||||
pub mod cursor;
|
||||
pub mod segments;
|
||||
pub mod cursor_capabilities;
|
||||
pub mod store;
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
use bincode::{Decode, Encode};
|
||||
|
||||
use crate::binary_coding::{encode_sequence, encode_sequence_with_sizes, decode_sequence};
|
||||
use crate::store::{Result, FilePosition};
|
||||
use crate::error::{Error, DecodeErrorKind};
|
||||
use crate::binary_coding::{decode_sequence, encode_sequence, encode_sequence_with_sizes};
|
||||
use crate::error::{DecodeErrorKind, Error};
|
||||
use crate::segments::entry_header::{EntryHeader, EntryHeaderWithDataSize};
|
||||
use crate::store::{FilePosition, Result};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Entry<T> {
|
||||
|
|
@ -18,14 +18,18 @@ pub struct EntryDetailed<T> {
|
|||
pub data: Vec<T>,
|
||||
}
|
||||
|
||||
impl <T>Entry<T> {
|
||||
impl<T> Entry<T> {
|
||||
pub fn new(data: Vec<T>) -> Self {
|
||||
Self { header: EntryHeader { is_deleted: false }, data }
|
||||
Self {
|
||||
header: EntryHeader { is_deleted: false },
|
||||
data,
|
||||
}
|
||||
}
|
||||
|
||||
// FORMAT: [EntryHeaderWithDataSize, ..sequence of data]
|
||||
pub fn encode(&self) -> Result<Vec<u8>>
|
||||
where T: Encode
|
||||
pub fn encode(&self) -> Result<Vec<u8>>
|
||||
where
|
||||
T: Encode,
|
||||
{
|
||||
let mut result: Vec<u8> = self.header.encode()?;
|
||||
|
||||
|
|
@ -36,17 +40,28 @@ impl <T>Entry<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl <T>EntryDetailed<T> {
|
||||
pub fn decode(header: EntryHeaderWithDataSize, file_position: FilePosition, number_of_columns: usize, bytes: &[u8]) -> Result<Self>
|
||||
where T: Decode
|
||||
impl<T> EntryDetailed<T> {
|
||||
pub fn decode(
|
||||
header: EntryHeaderWithDataSize,
|
||||
file_position: FilePosition,
|
||||
number_of_columns: usize,
|
||||
bytes: &[u8],
|
||||
) -> Result<Self>
|
||||
where
|
||||
T: Decode,
|
||||
{
|
||||
let data = decode_sequence::<T>(number_of_columns, bytes)
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?;
|
||||
Ok(EntryDetailed { header, file_position, data })
|
||||
Ok(EntryDetailed {
|
||||
header,
|
||||
file_position,
|
||||
data,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn forget(&self) -> Entry<T>
|
||||
where T: Clone
|
||||
pub fn forget(&self) -> Entry<T>
|
||||
where
|
||||
T: Clone,
|
||||
{
|
||||
Entry {
|
||||
header: self.header.clone().into(),
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use crate::binary_coding::{decode, encode, decode_sequence};
|
||||
use crate::store::{Result, Column};
|
||||
use crate::error::{Error, DecodeErrorKind};
|
||||
use crate::binary_coding::{decode, decode_sequence, encode};
|
||||
use crate::error::{DecodeErrorKind, Error};
|
||||
use crate::store::{Column, Result};
|
||||
use std::mem::size_of;
|
||||
|
||||
#[derive(Debug)]
|
||||
|
|
@ -12,7 +12,7 @@ pub struct EntryHeader {
|
|||
pub struct EntryHeaderWithDataSize {
|
||||
pub is_deleted: bool,
|
||||
pub data_sizes: Vec<usize>, // vec![5, 6, 20] means that column 0 stores 5 bytes, column 1 stores 6
|
||||
// bytes etc
|
||||
// bytes etc
|
||||
}
|
||||
|
||||
impl EntryHeader {
|
||||
|
|
@ -24,21 +24,23 @@ impl EntryHeader {
|
|||
|
||||
impl From<EntryHeaderWithDataSize> for EntryHeader {
|
||||
fn from(entry: EntryHeaderWithDataSize) -> Self {
|
||||
Self { is_deleted: entry.is_deleted, }
|
||||
Self {
|
||||
is_deleted: entry.is_deleted,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl EntryHeaderWithDataSize {
|
||||
pub const IS_DELETED_OFFSET: usize = 0;
|
||||
pub const IS_DELETED_SIZE: usize = size_of::<bool>();
|
||||
pub const DATA_SIZES_OFFSET: usize = Self::IS_DELETED_OFFSET + Self::IS_DELETED_SIZE;
|
||||
pub const DATA_SIZES_OFFSET: usize = Self::IS_DELETED_OFFSET + Self::IS_DELETED_SIZE;
|
||||
|
||||
pub fn size(number_of_columns: usize) -> usize {
|
||||
let size_of_data_sizes: usize = number_of_columns*size_of::<usize>();
|
||||
let size_of_data_sizes: usize = number_of_columns * size_of::<usize>();
|
||||
Self::IS_DELETED_SIZE + size_of_data_sizes
|
||||
}
|
||||
|
||||
pub fn size_of_data(&self) -> usize{
|
||||
pub fn size_of_data(&self) -> usize {
|
||||
self.data_sizes.iter().sum()
|
||||
}
|
||||
|
||||
|
|
@ -48,21 +50,23 @@ impl EntryHeaderWithDataSize {
|
|||
if i < column as usize {
|
||||
sum += size;
|
||||
} else {
|
||||
break
|
||||
break;
|
||||
}
|
||||
}
|
||||
sum
|
||||
}
|
||||
|
||||
pub fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result<Self> {
|
||||
let (is_deleted, _) =
|
||||
decode::<bool>(&bytes)
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
|
||||
let (is_deleted, _) = decode::<bool>(&bytes)
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
|
||||
|
||||
let data_sizes = decode_sequence::<usize>(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?;
|
||||
let data_sizes =
|
||||
decode_sequence::<usize>(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?;
|
||||
|
||||
Ok(Self { is_deleted, data_sizes } )
|
||||
Ok(Self {
|
||||
is_deleted,
|
||||
data_sizes,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use crate::binary_coding::{encode, encode_sequence, decode, decode_sequence};
|
||||
use crate::store::{Result, Column};
|
||||
use crate::error::{Error, DecodeErrorKind};
|
||||
use crate::binary_coding::{decode, decode_sequence, encode, encode_sequence};
|
||||
use crate::error::{DecodeErrorKind, Error};
|
||||
use crate::store::{Column, Result};
|
||||
use std::mem::size_of;
|
||||
use std::path::PathBuf;
|
||||
|
||||
|
|
@ -30,14 +30,19 @@ impl StoreHeader {
|
|||
pub const DELETED_COUNT_SIZE: usize = size_of::<usize>();
|
||||
pub const TOTAL_COUNT_SIZE: usize = size_of::<usize>();
|
||||
pub const PRIMARY_COLUMN_SIZE: usize = size_of::<Column>();
|
||||
pub const FIXED_SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE + Self::PRIMARY_COLUMN_SIZE;
|
||||
pub const FIXED_SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE
|
||||
+ Self::DELETED_COUNT_SIZE
|
||||
+ Self::TOTAL_COUNT_SIZE
|
||||
+ Self::PRIMARY_COLUMN_SIZE;
|
||||
|
||||
pub const NUMBER_OF_COLUMNS_OFFSET: usize = 0;
|
||||
pub const DELETED_COUNT_OFFSET: usize = Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE;
|
||||
pub const DELETED_COUNT_OFFSET: usize =
|
||||
Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE;
|
||||
pub const TOTAL_COUNT_OFFSET: usize = Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE;
|
||||
pub const PRIMARY_COLUMN_OFFSET: usize = Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE;
|
||||
#[allow(dead_code)]
|
||||
pub const INDEXED_COLUMNS_OFFSET: usize = Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE;
|
||||
pub const INDEXED_COLUMNS_OFFSET: usize =
|
||||
Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE;
|
||||
|
||||
fn indexed_columns_size(number_of_columns: usize) -> usize {
|
||||
size_of::<bool>() * number_of_columns
|
||||
|
|
@ -64,19 +69,29 @@ impl StoreHeader {
|
|||
vec![0; Self::indexed_columns_size(header.number_of_columns)]
|
||||
}
|
||||
|
||||
pub async fn decode_fixed(table_folder: &PathBuf, result: &[u8]) -> Result<StoreHeaderFixedPart> {
|
||||
let (number_of_columns, _) =
|
||||
decode::<usize>(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?;
|
||||
let (deleted_count, _) =
|
||||
decode::<usize>(&result[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?;
|
||||
let (total_count, _) =
|
||||
decode::<usize>(&result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?;
|
||||
let (primary_column, _) =
|
||||
decode::<Column>(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?;
|
||||
pub async fn decode_fixed(
|
||||
table_folder: &PathBuf,
|
||||
result: &[u8],
|
||||
) -> Result<StoreHeaderFixedPart> {
|
||||
let (number_of_columns, _) = decode::<usize>(
|
||||
&result[Self::NUMBER_OF_COLUMNS_OFFSET
|
||||
..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE],
|
||||
)
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?;
|
||||
let (deleted_count, _) = decode::<usize>(
|
||||
&result
|
||||
[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE],
|
||||
)
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?;
|
||||
let (total_count, _) = decode::<usize>(
|
||||
&result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE],
|
||||
)
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?;
|
||||
let (primary_column, _) = decode::<Column>(
|
||||
&result[Self::PRIMARY_COLUMN_OFFSET
|
||||
..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE],
|
||||
)
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?;
|
||||
let header = StoreHeaderFixedPart {
|
||||
table_folder: table_folder.clone(),
|
||||
number_of_columns,
|
||||
|
|
@ -89,9 +104,8 @@ impl StoreHeader {
|
|||
}
|
||||
|
||||
pub async fn decode_rest(header: StoreHeaderFixedPart, result: &[u8]) -> Result<StoreHeader> {
|
||||
let indexed_columns: Vec<bool> =
|
||||
decode_sequence::<bool>(header.number_of_columns, result)
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderIndexedColumns, e))?;
|
||||
let indexed_columns: Vec<bool> = decode_sequence::<bool>(header.number_of_columns, result)
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderIndexedColumns, e))?;
|
||||
|
||||
Ok(StoreHeader {
|
||||
table_folder: header.table_folder.into(),
|
||||
|
|
@ -104,8 +118,6 @@ impl StoreHeader {
|
|||
})
|
||||
}
|
||||
|
||||
|
||||
|
||||
// returns new count
|
||||
pub fn increment_total_count(&mut self) -> usize {
|
||||
self.total_count += 1;
|
||||
|
|
|
|||
|
|
@ -1,15 +1,14 @@
|
|||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::fs::{File, OpenOptions, DirBuilder};
|
||||
use tokio::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
use bincode::{Decode, Encode};
|
||||
use std::path::{Path, PathBuf};
|
||||
use tokio::fs;
|
||||
use tokio::fs::{DirBuilder, File, OpenOptions};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::cursor::{ReadCursor, WriteCursor};
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use crate::segments::store_header::StoreHeader;
|
||||
use crate::error::Error;
|
||||
use crate::index::Index;
|
||||
|
||||
use crate::segments::store_header::StoreHeader;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
|
|
@ -25,7 +24,6 @@ pub struct Store<T> {
|
|||
|
||||
pub type StoreIndexes<T> = Vec<Option<Index<T, FilePosition>>>;
|
||||
|
||||
|
||||
//===Store===
|
||||
pub async fn store_exists(table_folder: &str) -> Result<bool> {
|
||||
Ok(fs::metadata(table_folder).await.is_ok())
|
||||
|
|
@ -34,14 +32,18 @@ pub async fn store_exists(table_folder: &str) -> Result<bool> {
|
|||
pub const ROWS_FILE_NAME: &'static str = "rows";
|
||||
pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_intermediate";
|
||||
|
||||
impl <T>Store<T> {
|
||||
impl<T> Store<T> {
|
||||
// ===Creation===
|
||||
pub async fn new(path_to_table: &Path, number_of_columns: usize, primary_column: Column) -> Result<Self>
|
||||
where T: Encode + Decode + Ord
|
||||
pub async fn new(
|
||||
path_to_table: &Path,
|
||||
number_of_columns: usize,
|
||||
primary_column: Column,
|
||||
) -> Result<Self>
|
||||
where
|
||||
T: Encode + Decode + Ord,
|
||||
{
|
||||
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
|
||||
DirBuilder::new()
|
||||
.create(path_to_table).await?;
|
||||
DirBuilder::new().create(path_to_table).await?;
|
||||
|
||||
let header = {
|
||||
let mut indexed_columns = vec![false; number_of_columns];
|
||||
|
|
@ -61,22 +63,24 @@ impl <T>Store<T> {
|
|||
|
||||
let indexes: StoreIndexes<T> = Self::create_initial_indexes(&header).await?;
|
||||
|
||||
let store = Self {
|
||||
header,
|
||||
indexes,
|
||||
};
|
||||
let store = Self { header, indexes };
|
||||
|
||||
Ok(store)
|
||||
}
|
||||
|
||||
pub fn path_to_index_file(header: &StoreHeader, column: Column) -> PathBuf {
|
||||
let path_to_table = Path::new(&header.table_folder);
|
||||
let path_to_index = path_to_table.join(&format!("{}_{}", ROWS_FILE_NAME, column.to_string()));
|
||||
let path_to_index =
|
||||
path_to_table.join(&format!("{}_{}", ROWS_FILE_NAME, column.to_string()));
|
||||
path_to_index
|
||||
}
|
||||
|
||||
pub async fn create_empty_index_at(header: &StoreHeader, column: Column) -> Result<Index<T, FilePosition>>
|
||||
where T: Encode + Decode + Ord
|
||||
pub async fn create_empty_index_at(
|
||||
header: &StoreHeader,
|
||||
column: Column,
|
||||
) -> Result<Index<T, FilePosition>>
|
||||
where
|
||||
T: Encode + Decode + Ord,
|
||||
{
|
||||
let path_to_index = Self::path_to_index_file(&header, column);
|
||||
let index = Index::new(path_to_index).await?;
|
||||
|
|
@ -84,35 +88,43 @@ impl <T>Store<T> {
|
|||
Ok(index)
|
||||
}
|
||||
|
||||
pub async fn create_initial_indexes(header: &StoreHeader) -> Result<StoreIndexes<T>>
|
||||
where T: Encode + Decode + Ord
|
||||
pub async fn create_initial_indexes(header: &StoreHeader) -> Result<StoreIndexes<T>>
|
||||
where
|
||||
T: Encode + Decode + Ord,
|
||||
{
|
||||
let mut result: StoreIndexes<T> = Vec::with_capacity(header.number_of_columns);
|
||||
for _ in 0..header.number_of_columns {
|
||||
result.push(None)
|
||||
}
|
||||
|
||||
result[header.primary_column as usize] = Some(Self::create_empty_index_at(&header, header.primary_column).await?);
|
||||
result[header.primary_column as usize] =
|
||||
Some(Self::create_empty_index_at(&header, header.primary_column).await?);
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn connect_index_at(header: &StoreHeader, column: Column) -> Result<Index<T, FilePosition>>
|
||||
where T: Encode + Decode + Ord
|
||||
pub async fn connect_index_at(
|
||||
header: &StoreHeader,
|
||||
column: Column,
|
||||
) -> Result<Index<T, FilePosition>>
|
||||
where
|
||||
T: Encode + Decode + Ord,
|
||||
{
|
||||
let path_to_index = Self::path_to_index_file(&header, column);
|
||||
let index: Index<T, FilePosition> = Index::connect(path_to_index).await?;
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
pub async fn create_empty_rows_file(path_to_rows: PathBuf, header: &StoreHeader) -> Result<File> {
|
||||
let mut file: File =
|
||||
OpenOptions::new()
|
||||
.write(true)
|
||||
.read(true)
|
||||
.create_new(true)
|
||||
.open(path_to_rows)
|
||||
.await?;
|
||||
pub async fn create_empty_rows_file(
|
||||
path_to_rows: PathBuf,
|
||||
header: &StoreHeader,
|
||||
) -> Result<File> {
|
||||
let mut file: File = OpenOptions::new()
|
||||
.write(true)
|
||||
.read(true)
|
||||
.create_new(true)
|
||||
.open(path_to_rows)
|
||||
.await?;
|
||||
|
||||
let encoded_header: Vec<u8> = header.encode()?;
|
||||
file.write(&encoded_header).await?;
|
||||
|
|
@ -121,17 +133,17 @@ impl <T>Store<T> {
|
|||
}
|
||||
|
||||
pub async fn connect(table_folder: &PathBuf) -> Result<Self>
|
||||
where T: std::fmt::Debug + Encode + Decode + Ord
|
||||
where
|
||||
T: std::fmt::Debug + Encode + Decode + Ord,
|
||||
{
|
||||
let path_to_table = Path::new(table_folder);
|
||||
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
|
||||
|
||||
let mut file: File =
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(path_to_rows)
|
||||
.await?;
|
||||
let mut file: File = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(path_to_rows)
|
||||
.await?;
|
||||
|
||||
// Unfortunately we can't yet use store.read_bytes, since it can't be created without the
|
||||
// header.
|
||||
|
|
@ -146,12 +158,13 @@ impl <T>Store<T> {
|
|||
StoreHeader::decode_rest(fixed_header, &rest_bytes).await?
|
||||
};
|
||||
|
||||
|
||||
let indexes: StoreIndexes<T> = {
|
||||
let mut result = Vec::with_capacity(header.number_of_columns);
|
||||
for (column, &is_indexed) in header.indexed_columns.iter().enumerate() {
|
||||
if is_indexed {
|
||||
result.push(Some(Self::connect_index_at(&header, column as Column).await?))
|
||||
result.push(Some(
|
||||
Self::connect_index_at(&header, column as Column).await?,
|
||||
))
|
||||
} else {
|
||||
result.push(None)
|
||||
}
|
||||
|
|
@ -160,29 +173,29 @@ impl <T>Store<T> {
|
|||
result
|
||||
};
|
||||
|
||||
let store = Self {
|
||||
header,
|
||||
indexes
|
||||
};
|
||||
let store = Self { header, indexes };
|
||||
Ok(store)
|
||||
}
|
||||
|
||||
// ===Cursors===
|
||||
pub async fn read_cursor(&self) -> Result<ReadCursor<T>>
|
||||
where T: Send + Sync
|
||||
where
|
||||
T: Send + Sync,
|
||||
{
|
||||
ReadCursor::new(self).await
|
||||
}
|
||||
|
||||
pub async fn write_cursor(&mut self) -> Result<WriteCursor<T>>
|
||||
where T: Send + Sync
|
||||
where
|
||||
T: Send + Sync,
|
||||
{
|
||||
WriteCursor::new(self).await
|
||||
}
|
||||
|
||||
// ===Indexes===
|
||||
pub async fn attach_index(&mut self, column: Column) -> Result<()>
|
||||
where T: Ord + Decode + Encode + Send + Sync
|
||||
pub async fn attach_index(&mut self, column: Column) -> Result<()>
|
||||
where
|
||||
T: Ord + Decode + Encode + Send + Sync,
|
||||
{
|
||||
if self.header.is_column_indexed(column) {
|
||||
Err(Error::ColumnAlreadyIndexed(column))
|
||||
|
|
@ -195,7 +208,8 @@ impl <T>Store<T> {
|
|||
// For debugging.
|
||||
#[allow(dead_code)]
|
||||
pub async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error>
|
||||
where T: Send + Sync
|
||||
where
|
||||
T: Send + Sync,
|
||||
{
|
||||
let mut cursor = self.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
||||
let bytes = cursor.read_all_bytes().await?;
|
||||
|
|
@ -206,11 +220,11 @@ impl <T>Store<T> {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::segments::entry::Entry;
|
||||
use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex};
|
||||
use crate::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex};
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use crate::segments::entry::Entry;
|
||||
|
||||
impl <T>Drop for Store<T> {
|
||||
impl<T> Drop for Store<T> {
|
||||
fn drop(&mut self) {
|
||||
println!("DROPPING TEST FOLDER");
|
||||
let table_folder = self.header.table_folder.clone();
|
||||
|
|
@ -219,7 +233,6 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create() {
|
||||
type Data = u32;
|
||||
|
|
@ -227,7 +240,9 @@ mod tests {
|
|||
let table_path = Path::new("test_table_0");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
let store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(store.header.number_of_columns == number_of_columns);
|
||||
assert!(store.header.total_count == 0);
|
||||
|
|
@ -242,7 +257,9 @@ mod tests {
|
|||
let table_path = Path::new("test_table_1");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
|
|
@ -264,7 +281,9 @@ mod tests {
|
|||
let table_path = Path::new("test_table_2");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
|
|
@ -284,8 +303,8 @@ mod tests {
|
|||
let entry0 = cursor.next().await.unwrap().unwrap();
|
||||
let entry1 = cursor.next().await.unwrap().unwrap();
|
||||
|
||||
assert!(entry0.data == vec![1,2,3,4,5]);
|
||||
assert!(entry1.data == vec![6,7,8,9,10]);
|
||||
assert!(entry0.data == vec![1, 2, 3, 4, 5]);
|
||||
assert!(entry1.data == vec![6, 7, 8, 9, 10]);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -296,7 +315,9 @@ mod tests {
|
|||
let table_path = Path::new("test_table_3");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
|
|
@ -319,8 +340,8 @@ mod tests {
|
|||
}
|
||||
|
||||
assert!(entries.len() == 2);
|
||||
assert!(entries[0].data == vec![1,2,3,4,5]);
|
||||
assert!(entries[1].data == vec![6,7,8,9,10]);
|
||||
assert!(entries[0].data == vec![1, 2, 3, 4, 5]);
|
||||
assert!(entries[1].data == vec![6, 7, 8, 9, 10]);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -331,7 +352,9 @@ mod tests {
|
|||
let table_path = Path::new("test_table_4");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let value = 200;
|
||||
{
|
||||
|
|
@ -356,7 +379,10 @@ mod tests {
|
|||
let mut cursor = store.read_cursor().await.unwrap();
|
||||
let column = 1;
|
||||
|
||||
let entries = cursor.select_entries_where_eq(column, &value).await.unwrap();
|
||||
let entries = cursor
|
||||
.select_entries_where_eq(column, &value)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(entries.len() == 2);
|
||||
assert!(entries[0].data == vec![1, value, 3, 4, 5]);
|
||||
|
|
@ -371,7 +397,9 @@ mod tests {
|
|||
let table_path = Path::new("test_table_5");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let column: Column = 1;
|
||||
|
||||
|
|
@ -402,7 +430,10 @@ mod tests {
|
|||
let mut cursor = store.read_cursor().await.unwrap();
|
||||
let column = 1;
|
||||
|
||||
let entries = cursor.select_entries_where_eq(column, &value).await.unwrap();
|
||||
let entries = cursor
|
||||
.select_entries_where_eq(column, &value)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(entries.len() == 2);
|
||||
// Order may be non-deterministic.
|
||||
assert!(entries[0].data[column as usize] == value);
|
||||
|
|
@ -417,7 +448,9 @@ mod tests {
|
|||
let table_path = Path::new("test_table_6");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let value = 200;
|
||||
let (_file_position0, file_position1, _file_position2, _file_position3) = {
|
||||
|
|
@ -436,7 +469,12 @@ mod tests {
|
|||
let file_position3 = cursor.insert_entry(entry3).await.unwrap();
|
||||
|
||||
assert!(store.header.total_count == 4);
|
||||
(file_position0, file_position1, file_position2, file_position3)
|
||||
(
|
||||
file_position0,
|
||||
file_position1,
|
||||
file_position2,
|
||||
file_position3,
|
||||
)
|
||||
};
|
||||
|
||||
{
|
||||
|
|
@ -454,7 +492,9 @@ mod tests {
|
|||
let table_path = Path::new("test_table_7");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let column: Column = 1;
|
||||
|
||||
|
|
@ -480,13 +520,21 @@ mod tests {
|
|||
let file_position3 = cursor.insert_entry(entry3).await.unwrap();
|
||||
|
||||
assert!(store.header.total_count == 4);
|
||||
(file_position0, file_position1, file_position2, file_position3)
|
||||
(
|
||||
file_position0,
|
||||
file_position1,
|
||||
file_position2,
|
||||
file_position3,
|
||||
)
|
||||
};
|
||||
|
||||
{
|
||||
assert!(store.header.deleted_count == 0);
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
cursor.delete_entries_where_eq(column, &value, false).await.unwrap();
|
||||
cursor
|
||||
.delete_entries_where_eq(column, &value, false)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(store.header.deleted_count == 2);
|
||||
}
|
||||
}
|
||||
|
|
@ -498,7 +546,9 @@ mod tests {
|
|||
let table_path = Path::new("test_table_8");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let column: Column = 1;
|
||||
|
||||
|
|
@ -524,13 +574,21 @@ mod tests {
|
|||
let file_position3 = cursor.insert_entry(entry3).await.unwrap();
|
||||
|
||||
assert!(store.header.total_count == 4);
|
||||
(file_position0, file_position1, file_position2, file_position3)
|
||||
(
|
||||
file_position0,
|
||||
file_position1,
|
||||
file_position2,
|
||||
file_position3,
|
||||
)
|
||||
};
|
||||
|
||||
{
|
||||
assert!(store.header.deleted_count == 0);
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
cursor.delete_entries_where_eq(column, &value, false).await.unwrap();
|
||||
cursor
|
||||
.delete_entries_where_eq(column, &value, false)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(cursor.header().deleted_count == 2);
|
||||
assert!(cursor.header().total_count == 4);
|
||||
|
||||
|
|
@ -539,5 +597,4 @@ mod tests {
|
|||
assert!(cursor.header().total_count == 2);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue