Merge branch 'redesign-tables' into 'main'

Storage Engine

See merge request x433485/minisql!26
This commit is contained in:
Yuriy Dupyn 2024-02-05 16:13:32 +01:00
commit efad0cc6bd
17 changed files with 2062 additions and 1 deletion

10
Cargo.lock generated
View file

@ -601,6 +601,16 @@ dependencies = [
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
[[package]]
name = "storage_engine"
version = "0.1.0"
dependencies = [
"async-trait",
"bincode",
"thiserror",
"tokio",
]
[[package]] [[package]]
name = "strsim" name = "strsim"
version = "0.10.0" version = "0.10.0"

View file

@ -5,5 +5,6 @@ members = [
"proto", "proto",
"server", "server",
"client", "client",
"parser" "parser",
"storage_engine"
] ]

10
storage_engine/Cargo.toml Normal file
View file

@ -0,0 +1,10 @@
[package]
name = "storage_engine"
version = "0.1.0"
edition = "2021"
[dependencies]
bincode = "2.0.0-rc.3"
tokio = { version = "1.34.0", features = ["full"] }
async-trait = "0.1.74"
thiserror = "1.0.50"

View file

@ -0,0 +1,87 @@
use bincode;
use bincode::{Decode, Encode};
use bincode::config::{BigEndian, Configuration, Fixint};
use std::mem::size_of;
// Store-wide bincode configuration: big-endian with fixed-size integer
// encoding, so encoded values have a deterministic, platform-independent
// byte layout suitable for on-disk storage.
// NOTE(review): with Fixint, bincode encodes `usize` as a u64 (8 bytes) on
// every platform — decoders must not assume `size_of::<usize>()`.
const BIN_CONFIG: Configuration<BigEndian, Fixint> = bincode::config::standard().with_big_endian().with_fixed_int_encoding();
/// Serializes `t` into a byte vector using the store-wide `BIN_CONFIG`.
pub fn encode<T: Encode>(t: &T) -> Result<Vec<u8>, bincode::error::EncodeError> {
    let bytes = bincode::encode_to_vec(t, BIN_CONFIG)?;
    Ok(bytes)
}
/// Deserializes a `T` from the front of `bytes` using `BIN_CONFIG`.
/// Returns the value together with the number of bytes consumed.
pub fn decode<T: Decode>(bytes: &[u8]) -> Result<(T, usize), bincode::error::DecodeError> {
    let decoded = bincode::decode_from_slice(bytes, BIN_CONFIG)?;
    Ok(decoded)
}
/// Encodes a slice as a `usize` length prefix followed by each element.
/// The counterpart of [`decode_vector`].
pub fn encode_vector<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> {
    let mut buffer = encode(&ts.len())?;
    for element in ts {
        buffer.extend(encode(element)?);
    }
    Ok(buffer)
}
/// Decodes a vector previously written by [`encode_vector`]: a `usize`
/// length prefix followed by that many encoded `T`s.
///
/// Fix: the offset of the first element is now taken from the number of
/// bytes the length prefix actually consumed, instead of assuming it equals
/// `size_of::<usize>()`. With `Fixint` encoding bincode writes `usize` as a
/// u64 (8 bytes) on every platform, so the old assumption was wrong on
/// 32-bit targets, and slicing `&bytes[..offset]` panicked on inputs shorter
/// than the prefix; a short input now yields a `DecodeError` instead.
pub fn decode_vector<T: Decode>(bytes: &[u8]) -> Result<Vec<T>, bincode::error::DecodeError> {
    let (result_len, mut offset): (usize, usize) = decode(bytes)?;
    let mut result: Vec<T> = Vec::with_capacity(result_len);
    for _ in 0..result_len {
        let (x, bytes_consumed) = decode::<T>(&bytes[offset..])?;
        offset += bytes_consumed;
        result.push(x);
    }
    Ok(result)
}
// We don't care about encoding the length here (since it will be used for a row with known column
// size)
pub fn encode_sequence<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> {
    let mut buffer = Vec::new();
    for element in ts {
        buffer.extend(encode(element)?);
    }
    Ok(buffer)
}
/// Like [`encode_sequence`], but also reports the encoded byte size of each
/// element (callers use the sizes to seek to individual columns later).
pub fn encode_sequence_with_sizes<T: Encode>(ts: &[T]) -> Result<(Vec<u8>, Vec<usize>), bincode::error::EncodeError> {
    let mut buffer = Vec::new();
    let mut sizes = Vec::with_capacity(ts.len());
    for element in ts {
        let encoded = encode(element)?;
        sizes.push(encoded.len());
        buffer.extend(encoded);
    }
    Ok((buffer, sizes))
}
/// Decodes `len` consecutive `T`s from `bytes`; no length prefix is read
/// (the counterpart of [`encode_sequence`]).
pub fn decode_sequence<T: Decode>(len: usize, bytes: &[u8]) -> Result<Vec<T>, bincode::error::DecodeError> {
    let mut items: Vec<T> = Vec::with_capacity(len);
    let mut cursor = 0;
    for _ in 0..len {
        let (item, consumed) = decode::<T>(&bytes[cursor..])?;
        cursor += consumed;
        items.push(item);
    }
    Ok(items)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a vector of strings through encode_vector/decode_vector.
    #[test]
    fn test_encoding_decoding() {
        let original: Vec<String> = vec!["foo".to_string(), "bar".to_string()];
        let encoded = encode_vector(&original[..]).unwrap();
        // WARNING: Don't forget to specify the type here
        let decoded = decode_vector::<String>(&encoded[..]).unwrap();
        assert_eq!(decoded, original);
    }
}

View file

@ -0,0 +1,394 @@
use tokio::fs::{File, OpenOptions};
use tokio::fs;
use std::path::Path;
use std::marker::PhantomData;
use std::collections::{BTreeMap, HashSet};
use bincode;
use bincode::{Decode, Encode};
use crate::segments::entry::EntryDetailed;
use crate::segments::entry_header::EntryHeader;
use crate::segments::store_header::StoreHeader;
use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME};
use crate::index::Index;
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite};
use crate::cursor_capabilities::header_access::{CursorCanReadHeader, CursorCanWriteHeader};
use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex};
// Number of deleted entries that, once exceeded, makes a deleting write
// trigger compaction (see attempt_garbage_collection_if_necessary).
const GARBAGE_COLLECTION_TRIGGER: usize = 100;
// ===Concrete Cursors===
// Read-only cursor over a store's rows file: owns a snapshot of the store
// header and borrows the store's indexes immutably.
pub struct ReadCursor<'a, T> {
header: StoreHeader,
indexes: &'a [Option<Index<T, FilePosition>>],
file: File,
// Cached end-of-file position, used by is_at_eof() during iteration.
eof_file_position: FilePosition,
}
// Read-write cursor: mutably borrows the store's header and indexes so
// every mutation stays visible to the owning Store.
pub struct WriteCursor<'a, T> {
header: &'a mut StoreHeader,
indexes: &'a mut [Option<Index<T, FilePosition>>],
file: File,
eof_file_position: FilePosition,
}
// This is used as a cursor to temporary file during Garbage Collection
// (append-only, no index access; PhantomData keeps the row type parameter).
pub struct AppendOnlyCursor<T> {
header: StoreHeader,
file: File,
data_type: PhantomData<T>,
eof_file_position: FilePosition,
}
// ===========Implementations=============
// ===primitive capabilities===
// All three cursors expose their file handle and cached EOF position,
// which is everything CursorCanRead's default methods need.
impl <T>CursorCanRead<T> for ReadCursor<'_, T> {
fn file(&mut self) -> &mut File {
&mut self.file
}
fn eof_file_position(&self) -> FilePosition {
self.eof_file_position
}
}
impl <T>CursorCanRead<T> for WriteCursor<'_, T> {
fn file(&mut self) -> &mut File {
&mut self.file
}
fn eof_file_position(&self) -> FilePosition {
self.eof_file_position
}
}
impl <T>CursorCanRead<T> for AppendOnlyCursor<T> {
fn file(&mut self) -> &mut File {
&mut self.file
}
fn eof_file_position(&self) -> FilePosition {
self.eof_file_position
}
}
// Only the two mutating cursors may write bytes.
impl <T>CursorCanWrite<T> for WriteCursor<'_, T> {}
impl <T>CursorCanWrite<T> for AppendOnlyCursor<T> {}
// ===capability to access header===
impl <T>CursorCanReadHeader<T> for ReadCursor<'_, T> {
fn header(&self) -> &StoreHeader { &self.header }
}
impl <T>CursorCanReadHeader<T> for WriteCursor<'_, T> {
fn header(&self) -> &StoreHeader { &self.header }
}
impl <T>CursorCanReadHeader<T> for AppendOnlyCursor<T> {
fn header(&self) -> &StoreHeader { &self.header }
}
impl <T>CursorCanWriteHeader<T> for WriteCursor<'_, T> {
fn header_mut(&mut self) -> &mut StoreHeader { self.header }
fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position }
}
impl <T>CursorCanWriteHeader<T> for AppendOnlyCursor<T> {
fn header_mut(&mut self) -> &mut StoreHeader { &mut self.header }
fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position }
}
// ===capability to access index===
// NOTE(review): AppendOnlyCursor deliberately has no index capabilities; it
// is only used to dump rows into the intermediate GC file.
impl <T>CursorCanReadIndex<T> for ReadCursor<'_, T> {
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] { &self.indexes }
}
impl <T>CursorCanReadIndex<T> for WriteCursor<'_, T> {
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] { &self.indexes }
}
impl <T>CursorCanWriteToIndex<T> for WriteCursor<'_, T> {
fn indexes_mut(&mut self) -> &mut [Option<Index<T, FilePosition>>] { self.indexes }
}
// ===Specifics===
impl <'cursor, T> ReadCursor<'cursor, T> {
// Opens the store's rows file read-only, snapshots the header, records the
// real EOF position, and leaves the cursor at the first entry.
pub async fn new<'store: 'cursor>(store: &'store Store<T>) -> Result<Self>
where T: Send + Sync
{
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
let file: File =
OpenOptions::new()
.read(true)
.open(path_to_rows)
.await?;
let mut cursor = Self {
header: store.header.clone(),
file,
indexes: &store.indexes,
// Placeholder; fixed up from the actual file length just below.
eof_file_position: 0,
};
let eof_file_position: FilePosition = cursor.seek_to_end().await?;
cursor.eof_file_position = eof_file_position;
cursor.seek_to_start_of_data().await?;
Ok(cursor)
}
}
impl <'cursor, T> WriteCursor<'cursor, T>
{
// 'store lives at least as long as 'cursor
// Opens the rows file for read+write, mutably borrowing the store's header
// and indexes, then leaves the cursor positioned at the first entry.
pub async fn new<'store: 'cursor>(store: &'store mut Store<T>) -> Result<Self>
where T: Send
{
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
let file: File =
OpenOptions::new()
.read(true)
.write(true)
.open(path_to_rows)
.await?;
let mut cursor = Self {
header: &mut store.header,
file,
indexes: &mut store.indexes,
// Placeholder; fixed up from the actual file length just below.
eof_file_position: 0,
};
let eof_file_position: FilePosition = cursor.seek_to_end().await?;
cursor.eof_file_position = eof_file_position;
cursor.seek_to_start_of_data().await?;
Ok(cursor)
}
// ===Entry Header Manipulation===
// assumes we are at the start of valid entry.
// Overwrites the entry header at the current file position with the
// encoding of `entry_header`; leaves the cursor just past the header.
async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()>
where T: Send
{
let bytes: Vec<u8> = entry_header.encode()?;
self.write_bytes(&bytes).await?;
Ok(())
}
// ===Deletion===
/// Marks the entry at `file_position` as deleted (idempotent).
///
/// Bumps the store header's deleted count, rewrites the entry header with
/// `is_deleted = true`, removes the entry's values from all indexes, and —
/// when `enable_garbage_collector` is set — may trigger compaction.
///
/// Fix: the `where` clause previously listed the `Ord` bound twice.
pub async fn mark_deleted_at(&mut self, file_position: FilePosition, enable_garbage_collector: bool) -> Result<()>
where T: Encode + Decode + Ord + Send + Sync + Clone
{
    self.seek_to(file_position).await?;
    let mut entry_header = self.read_entry_header().await?;
    if entry_header.is_deleted {
        // Already tombstoned: nothing to do.
        Ok(())
    } else {
        // Update store and entry headers
        self.increment_deleted_count().await?;
        self.seek_to(file_position).await?;
        entry_header.is_deleted = true;
        self.set_new_entry_header(entry_header.into()).await?;
        // Update index: re-read the full entry to learn its values.
        self.seek_to(file_position).await?;
        match self.next().await? {
            Some(entry) => {
                self.delete_entry_values_from_indexes(entry).await?
            },
            None => {
                // SAFETY: We just modified its header, so it must exist.
                unreachable!()
            }
        }
        if enable_garbage_collector {
            self.attempt_garbage_collection_if_necessary().await?;
        }
        Ok(())
    }
}
// Scans forward for the first live entry whose `column` equals `t0` and
// tombstones it; returns the entry that was deleted, if any.
async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T, enable_garbage_collector: bool) -> Result<Option<EntryDetailed<T>>>
where T: Encode + Decode + Ord + Send + Sync + Clone
{
    match self.find_first_eq_bruteforce(column, t0).await? {
        Some(entry) => {
            self.mark_deleted_at(entry.file_position, enable_garbage_collector).await?;
            Ok(Some(entry))
        }
        None => Ok(None),
    }
}
// Doesn't update indexes.
// NOTE(review): mark_deleted_at (called via find_first_eq_bruteforce_and_delete)
// does remove the entry's values from the indexes — confirm what this
// comment was meant to say (possibly: "doesn't trigger GC").
/// Tombstones every live entry whose `column` equals `t0`; returns how many
/// entries were deleted. Garbage collection is deferred to the caller.
///
/// Fix: `while let Some(_) = …` with an unused binding replaced by
/// `.is_some()` (clippy: redundant_pattern_matching).
async fn find_all_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result<usize>
where T: Encode + Decode + Ord + Send + Sync + Clone
{
    let mut count = 0;
    while self.find_first_eq_bruteforce_and_delete(column, t0, false).await?.is_some() {
        count += 1;
    }
    Ok(count)
}
/// Deletes every entry whose `column` equals `value`, using the index when
/// the column is indexed and a brute-force scan otherwise. Returns the
/// number of entries deleted.
///
/// Fix: the else branch bound the scan's result to a local and immediately
/// returned it (`let count = …; count`); the call is now the branch value.
pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result<usize>
where T: Encode + Decode + Ord + Send + Sync + Clone
{
    let count =
        if self.header().is_column_indexed(column) {
            let entries = self.index_lookup(column, value).await?;
            let count = entries.len();
            for entry in entries {
                // GC is disabled per-delete so the looked-up positions stay
                // valid for the whole batch.
                self.mark_deleted_at(entry.file_position, false).await?
            }
            count
        } else {
            self.find_all_eq_bruteforce_and_delete(column, value).await?
        };
    if enable_garbage_collector {
        self.attempt_garbage_collection_if_necessary().await?;
    }
    Ok(count)
}
// ===Indexing===
// WARNING: Assumes the column is NOT indexable.
// NOTE(review): "NOT indexable" likely means "not yet indexed" — this
// creates a brand-new index and would clobber an existing one; confirm.
pub async fn attach_index(&mut self, column: Column) -> Result<()>
where T: Ord + Decode + Encode + Send + Sync
{
// New Index
let index = Store::create_empty_index_at(&self.header, column).await?;
self.indexes[column as usize] = Some(index);
// Mark column as indexed
self.header.make_column_indexed(column);
// Clone avoids holding a borrow of self.header across set_header(&mut self).
self.set_header(&self.header.clone()).await?;
// Build index: scan every live entry's value in this column.
self.seek_to_start_of_data().await?;
while let Some((_, file_position, value)) = self.next_alive_at_column(column).await? {
self.insert_into_index(column, value, file_position).await?
}
Ok(())
}
// ===Garbage Collection===
// Compacts the rows file once the deleted-entry count exceeds
// GARBAGE_COLLECTION_TRIGGER; otherwise a no-op.
// NOTE(review): the println!s are debug output to stdout — consider a
// logging facility before production use.
async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()>
where T: Send + Sync + Decode + Encode + Clone + Ord
{
if self.header.deleted_count > GARBAGE_COLLECTION_TRIGGER {
println!("=======START GARBAGE COLLECTOR====");
self.initiate_garbage_collection().await?;
println!("=======GARBAGE COLLECTOR FINISHED====");
}
Ok(())
}
// Compaction: copies every live entry into a fresh intermediate file,
// rebuilds the indexes (file positions change), then swaps the new file in
// place of the old rows file.
pub async fn initiate_garbage_collection(&mut self) -> Result<usize>
where T: Send + Sync + Decode + Encode + Clone + Ord
{
let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?;
// Since garbage collection changes FilePositions of live entries, we need to update the
// indexes too.
// One in-memory index per column; None for non-indexed columns.
let mut in_memory_indexes: Vec<Option<BTreeMap<T, HashSet<FilePosition>>>> = Vec::with_capacity(self.header.number_of_columns);
for column in 0..self.header.number_of_columns {
if self.header.is_column_indexed(column as Column) {
let in_memory_index = BTreeMap::new();
in_memory_indexes.push(Some(in_memory_index))
} else {
in_memory_indexes.push(None)
}
}
// We'll dump all alive entries into a new file.
// NOTE(review): this counter increments once per *live* entry that is
// kept, yet the name and return value suggest a count of deleted
// entries — confirm which count callers expect.
let mut entries_deleted = 0;
self.seek_to_start_of_data().await?;
{
while let Some(live_entry) = self.next_alive().await? {
entries_deleted += 1;
let file_position = cursor_to_intermediate.append_entry_no_indexing(&live_entry.forget()).await?;
// Update index. (Wouldn't it be nice if we had `for let ...`?)
for (maybe_in_memory_index, value) in in_memory_indexes.iter_mut().zip(&live_entry.data) {
if let Some(in_memory_index) = maybe_in_memory_index {
in_memory_index.entry(value.clone()).or_insert_with(HashSet::new).insert(file_position);
}
}
}
}
// ===swap===
// swapping indexes
// Update indexes on disk.
for (column, maybe_in_memory_index) in in_memory_indexes.into_iter().enumerate() {
if let Some(in_memory_index) = maybe_in_memory_index {
let index = self.mut_index_at(column as Column);
index.reset(in_memory_index).await?;
}
}
// swapping headers
// NOTE(review): the updated counts are only changed in memory here; confirm
// that the intermediate file's on-disk header already carries the right
// values (via create_empty_rows_file / append_entry_no_indexing).
self.header.deleted_count = 0;
self.header.total_count = cursor_to_intermediate.header.total_count;
self.file = cursor_to_intermediate.file;
self.eof_file_position = cursor_to_intermediate.eof_file_position;
// swap files on disk
// current file
let path_to_table = Path::new(&self.header.table_folder);
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
let path_to_intermediate_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
fs::remove_file(path_to_rows.clone()).await?;
fs::rename(path_to_intermediate_rows, path_to_rows).await?;
Ok(entries_deleted)
}
// Creates the intermediate rows file used by GC plus an append-only cursor
// to it: schema fields are copied from the live header, counts start at 0.
async fn spawn_cursor_to_intermediate_file(&self) -> Result<AppendOnlyCursor<T>>
where T: Send
{
let table_folder = self.header.table_folder.to_string();
let path_to_table = Path::new(&table_folder);
let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
let intermediate_file: File = Store::<T>::create_empty_rows_file(path_to_rows, &self.header).await?;
let intermediate_header: StoreHeader = StoreHeader {
table_folder,
number_of_columns: self.header.number_of_columns,
deleted_count: 0,
total_count: 0,
primary_column: self.header.primary_column,
indexed_columns: self.header.indexed_columns.clone(),
};
// Creates a new (append) cursor to the intermediate file in which we'll dump the live entries.
let mut cursor_to_intermediate = AppendOnlyCursor {
header: intermediate_header,
file: intermediate_file,
data_type: PhantomData::<T>,
// Placeholder; fixed up from the actual file length just below.
eof_file_position: 0,
};
let eof_file_position: FilePosition = cursor_to_intermediate.seek_to_end().await?;
cursor_to_intermediate.eof_file_position = eof_file_position;
Ok(cursor_to_intermediate)
}
}

View file

@ -0,0 +1,230 @@
use tokio::io::AsyncReadExt;
use async_trait::async_trait;
use bincode;
use bincode::{Decode, Encode};
use crate::binary_coding::{encode, decode};
use crate::error::{Error, DecodeErrorKind};
use crate::segments::entry::{Entry, EntryDetailed};
use crate::segments::entry_header::EntryHeaderWithDataSize;
use crate::segments::store_header::StoreHeader;
use crate::storage_engine::{FilePosition, Column, Result};
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite};
#[async_trait]
pub trait CursorCanReadHeader<T>: CursorCanRead<T> {
/// The store header this cursor operates under.
fn header(&self) -> &StoreHeader;
/// Seeks past the store header to the first entry.
async fn seek_to_start_of_data(&mut self) -> Result<FilePosition> {
self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64).await
}
/// Reads an entry header (with per-column data sizes) at the current
/// position; leaves the cursor at the start of the entry's data.
async fn read_entry_header(&mut self) -> Result<EntryHeaderWithDataSize> {
let number_of_columns: usize = self.header().number_of_columns;
let mut header_bytes: Vec<u8> = vec![0; EntryHeaderWithDataSize::size(number_of_columns)];
self.read_bytes(&mut header_bytes).await?;
let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..], number_of_columns)?;
Ok(header)
}
/// Like read_entry_header, but seeks to `file_position` first.
async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result<EntryHeaderWithDataSize> {
self.seek_to(file_position).await?;
self.read_entry_header().await
}
// Returns None when file_position == eof_file_position
async fn read_entry_at(&mut self, file_position: FilePosition) -> Result<Option<EntryDetailed<T>>>
where T: Decode
{
self.seek_to(file_position).await?;
self.next().await
}
// ===Iteration===
// The following functions assume that the current file position is at a valid entry or EOF.
// WARNING: This moves the file_position to start of the data, so you can't just call
// next_entry_header() a bunch of times. You must move the cursor!
async fn next_entry_header(&mut self) -> Result<Option<EntryHeaderWithDataSize>> {
if self.is_at_eof().await? {
return Ok(None)
}
let entry_header = self.read_entry_header().await?;
Ok(Some(entry_header))
}
// This is meant to be used after next_entry_header() is called.
// Skips over the entry's data, landing on the next entry's header.
async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result<FilePosition>{
let file_position = self.seek_by(entry_header.size_of_data() as i64).await?;
Ok(file_position)
}
/// Reads the whole entry (header + data) at the cursor, or None at EOF.
async fn next(&mut self) -> Result<Option<EntryDetailed<T>>>
where T: Decode
{
let file_position = self.current_file_position().await?;
let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
let mut data_bytes: Vec<u8> = vec![0; entry_header.size_of_data()];
self.read_bytes(&mut data_bytes).await?;
let entry: EntryDetailed<T> =
EntryDetailed::decode(entry_header, file_position, self.header().number_of_columns, &mut data_bytes)?;
Ok(Some(entry))
}
// Like next, but only reads the column, not the whole entry.
async fn next_at_column(&mut self, column: Column) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
where T: Decode + Send
{
let file_position = self.current_file_position().await?;
let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
let file_position_at_start_of_data = self.current_file_position().await?;
// figuring out how much to decode
let column_offset = entry_header.offset_of_column(column);
self.seek_by(column_offset as i64).await?;
// reading and decoding
let mut bytes: Vec<u8> = vec![0; entry_header.data_sizes[column as usize]];
self.read_bytes(&mut bytes).await?;
// NOTE(review): DecodeErrorKind::EntryIsDeleted looks mismatched for a
// column-data decode failure (EntryData seems intended) — confirm.
let (value, _) =
decode::<T>(&bytes[..])
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
// jumping to next entry
self.seek_to(file_position_at_start_of_data).await?;
self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?;
Ok(Some((entry_header, file_position, value)))
}
/// Like next_at_column, but skips entries marked deleted.
async fn next_alive_at_column(&mut self, column: Column) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
where T: Decode + Send
{
while let Some((header, file_position, t)) = self.next_at_column(column).await? {
if !header.is_deleted {
return Ok(Some((header, file_position, t)))
}
}
Ok(None)
}
/// Like next, but skips entries marked deleted.
async fn next_alive(&mut self) -> Result<Option<EntryDetailed<T>>>
where T: Decode
{
while let Some(entry) = self.next().await? {
if !entry.header.is_deleted {
return Ok(Some(entry))
}
}
Ok(None)
}
// ===Search===
// Linear scan from the current position for the first live entry whose
// `column` value equals `t0`; on a hit, re-reads and returns the full entry.
// NOTE(review): next_alive_at_column silently skips deleted entries, so
// `file_position` may point at a skipped (deleted) entry preceding the
// match; the seek-back + next() would then decode the wrong entry — verify.
async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result<Option<EntryDetailed<T>>>
where T: Decode + PartialEq + Send + Sync
{
let mut file_position = self.current_file_position().await?;
while let Some((_, _, t)) = self.next_alive_at_column(column).await? {
if &t == t0 {
// go back and decode the whole entry
self.seek_to(file_position).await?;
return self.next().await
} else {
file_position = self.current_file_position().await?;
}
}
Ok(None)
}
// Repeats find_first_eq_bruteforce until EOF; relies on next() leaving the
// cursor just past each match.
async fn find_all_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result<Vec<EntryDetailed<T>>>
where T: Decode + PartialEq + Send + Sync
{
let mut entries = vec![];
while let Some(entry) = self.find_first_eq_bruteforce(column, t0).await? {
entries.push(entry)
}
Ok(entries)
}
// ===Debugging===
// Dumps every entry (deleted ones included) to stdout.
async fn read_entries(&mut self) -> Result<()>
where T: Decode + std::fmt::Debug
{
self.seek_to_start_of_data().await?;
while let Some(entry) = self.next().await? {
println!("{:?}", entry);
}
println!("END of entries.");
Ok(())
}
// Reads the whole file (store header included) as raw bytes.
async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error> {
let mut bytes: Vec<u8> = vec![];
self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?;
self.file().read_to_end(&mut bytes).await?;
Ok(bytes)
}
}
#[async_trait]
pub trait CursorCanWriteHeader<T>: CursorCanReadHeader<T> + CursorCanWrite<T> {
/// Mutable access to the in-memory store header.
fn header_mut(&mut self) -> &mut StoreHeader;
/// Updates the cached EOF position after an append.
fn set_eof_file_position(&mut self, new_file_position: FilePosition);
// ===Store Header Manipulation===
// Bumps total_count in memory and patches the counter field on disk.
// NOTE(review): seek_to_start() is redundant here — seek_to() already uses
// an absolute SeekFrom::Start offset.
async fn increment_total_count(&mut self) -> Result<()>
where T: Send
{
self.seek_to_start().await?;
self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?;
let new_count = self.header_mut().increment_total_count();
self.write_bytes(&encode::<usize>(&new_count)?).await?;
Ok(())
}
// Bumps deleted_count in memory and patches the counter field on disk.
async fn increment_deleted_count(&mut self) -> Result<()>
where T: Send
{
self.seek_to_start().await?;
self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?;
let new_count = self.header_mut().increment_deleted_count();
self.write_bytes(&encode::<usize>(&new_count)?).await?;
Ok(())
}
// Rewrites the whole store header at the start of the file.
async fn set_header(&mut self, header: &StoreHeader) -> Result<()>
where T: Send
{
self.seek_to_start().await?;
let encoded_header: Vec<u8> = header.encode()?;
self.write_bytes(&encoded_header).await?;
Ok(())
}
// ===Append Entry===
// Moves cursor to the end.
// Returns file position to the start of the new entry.
async fn append_entry_no_indexing(&mut self, entry: &Entry<T>) -> Result<FilePosition>
where T: Encode + Send + Sync
{
self.increment_total_count().await?;
let encoded_entry: Vec<u8> = entry.encode()?;
let file_position = self.seek_to_end().await?;
self.write_bytes(&encoded_entry).await?;
let eof_file_position: FilePosition = self.current_file_position().await?;
self.set_eof_file_position(eof_file_position);
Ok(file_position)
}
}

View file

@ -0,0 +1,115 @@
use std::collections::HashSet;
use async_trait::async_trait;
use bincode;
use bincode::{Decode, Encode};
use crate::error::Error;
use crate::segments::entry::{Entry, EntryDetailed};
use crate::storage_engine::{FilePosition, Column, Result};
use crate::index::Index;
use crate::cursor_capabilities::header_access::{CursorCanReadHeader, CursorCanWriteHeader};
#[async_trait]
pub trait CursorCanReadIndex<T>: CursorCanReadHeader<T> {
/// Read access to the per-column indexes (None = column not indexed).
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>];
/// Looks `value` up in `column`'s index and reads each hit's full entry.
/// Errors if the column has no index, or if the index stores a position
/// at/after EOF (index out of sync with the rows file).
async fn index_lookup(&mut self, column: Column, value: &T) -> Result<Vec<EntryDetailed<T>>>
where T: Encode + Decode + Ord + Send + Sync
{
match &self.indexes()[column as usize] {
Some(index) => {
let file_positions = index.lookup(value).await?.unwrap_or_else(|| HashSet::new());
let mut entries: Vec<EntryDetailed<T>> = vec![];
for &file_position in file_positions.iter() {
match self.read_entry_at(file_position).await? {
Some(entry) => {
entries.push(entry)
},
None => {
return Err(Error::IndexIsStoringEofFilePosition(column))
}
}
}
Ok(entries)
},
None =>
Err(Error::AttemptToIndexNonIndexableColumn(column))
}
}
// Point query: uses the index when available, otherwise falls back to a
// brute-force scan from the current cursor position.
async fn select_entries_where_eq(&mut self, column: Column, value: &T) -> Result<Vec<EntryDetailed<T>>>
where T: Encode + Decode + Ord + Send + Sync
{
if self.header().is_column_indexed(column) {
self.index_lookup(column, value).await
} else {
self.find_all_eq_bruteforce(column, value).await
}
}
}
#[async_trait]
pub trait CursorCanWriteToIndex<T>: CursorCanReadIndex<T> + CursorCanWriteHeader<T> {
/// Mutable access to the per-column indexes.
fn indexes_mut(&mut self) -> &mut [Option<Index<T, FilePosition>>];
// Assumes that the column is indexable.
fn mut_index_at(&mut self, column: Column) -> &mut Index<T, FilePosition> {
match &mut self.indexes_mut()[column as usize] {
Some(index) => {
index
},
None => {
// Callers must only pass indexed columns; reaching here is a bug.
unreachable!()
}
}
}
// Assumes that the column is indexable.
// Records value -> file_position in the column's index.
async fn insert_into_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()>
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
{
let index = self.mut_index_at(column as Column);
index.insert(value, file_position).await?;
Ok(())
}
// Assumes that the column is indexable.
// Removes value -> file_position from the column's index.
async fn delete_from_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()>
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
{
let index = self.mut_index_at(column as Column);
index.delete(value, file_position).await?;
Ok(())
}
// Appends the entry to the rows file, then indexes each value whose column
// is marked indexed in the store header.
async fn insert_entry(&mut self, entry: Entry<T>) -> Result<FilePosition>
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
{
let file_position = self.append_entry_no_indexing(&entry).await?;
// insert the indexable values of the entry into corresponding indexes.
for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() {
if should_index {
// SAFETY: If should_index is true, then the column is indexable.
self.insert_into_index(column as Column, value, file_position).await?
}
}
Ok(file_position)
}
// Removes each of the entry's indexed values from its column's index.
async fn delete_entry_values_from_indexes(&mut self, entry: EntryDetailed<T>) -> Result<()>
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
{
for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() {
if should_index {
// SAFETY: If should_index is true, then the column is indexable.
self.delete_from_index(column as Column, value, entry.file_position).await?
}
}
Ok(())
}
}

View file

@ -0,0 +1,3 @@
//! Cursor capabilities, layered as traits: `primitive` byte/seek access,
//! `header_access` built on top of it, and `index_access` on top of that.
pub(crate) mod primitive;
pub(crate) mod header_access;
pub(crate) mod index_access;

View file

@ -0,0 +1,62 @@
use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom};
use tokio::fs::File;
use async_trait::async_trait;
use crate::storage_engine::{FilePosition, Result};
#[async_trait]
pub(crate) trait CursorCanRead<T> {
    /// The underlying file handle all default methods operate on.
    fn file(&mut self) -> &mut File;
    /// Cached end-of-file position, set by the concrete cursor on creation.
    fn eof_file_position(&self) -> FilePosition;
    /// Fills `bytes` completely from the current position; errors if the
    /// file ends first.
    async fn read_bytes(&mut self, bytes: &mut [u8]) -> Result<()> {
        self.file().read_exact(bytes).await?;
        Ok(())
    }
    /// Reads exactly `count` bytes from the current position.
    ///
    /// Fix: the buffer used to be `Vec::with_capacity(count)`, whose length
    /// is 0 — `read_exact` on a zero-length slice reads nothing, so this
    /// always returned an empty vector. The buffer must have *length*
    /// `count`.
    async fn get_bytes(&mut self, count: usize) -> Result<Vec<u8>> {
        let mut result: Vec<u8> = vec![0u8; count];
        self.read_bytes(&mut result).await?;
        Ok(result)
    }
    /// Seeks to an absolute position and returns the resulting position.
    async fn seek_to(&mut self, file_position: FilePosition) -> Result<FilePosition> {
        let file_position = self.file().seek(SeekFrom::Start(file_position)).await?;
        Ok(file_position)
    }
    // Start of the file i.e. the Header, not the entries.
    async fn seek_to_start(&mut self) -> Result<FilePosition> {
        let file_position = self.file().seek(SeekFrom::Start(0)).await?;
        Ok(file_position)
    }
    /// Seeks to the end of the file and returns the resulting position.
    async fn seek_to_end(&mut self) -> Result<FilePosition> {
        let file_position = self.file().seek(SeekFrom::End(0)).await?;
        Ok(file_position)
    }
    // Seeks from current position by offset and returns new file position
    async fn seek_by(&mut self, offset: i64) -> Result<FilePosition> {
        let file_position = self.file().seek(SeekFrom::Current(offset)).await?;
        Ok(file_position)
    }
    /// Current absolute position of the cursor.
    async fn current_file_position(&mut self) -> Result<FilePosition> {
        let next_file_position: FilePosition = self.file().stream_position().await?;
        Ok(next_file_position)
    }
    /// True when the cursor sits exactly at the cached EOF position.
    /// NOTE(review): the cached value goes stale if another cursor appends.
    async fn is_at_eof(&mut self) -> Result<bool> {
        let current_file_position = self.current_file_position().await?;
        let eof_file_position = self.eof_file_position();
        Ok(current_file_position == eof_file_position)
    }
}
#[async_trait]
pub(crate) trait CursorCanWrite<T>: CursorCanRead<T> {
    /// Writes the whole of `bytes` at the current position; returns the
    /// number of bytes written (always `bytes.len()` on success).
    ///
    /// Fix: `write()` is allowed to perform a short write; the callers
    /// (entry/header serializers) assume the full buffer lands on disk, so
    /// `write_all` must be used.
    async fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize> {
        self.file().write_all(bytes).await?;
        Ok(bytes.len())
    }
}

View file

@ -0,0 +1,50 @@
use crate::storage_engine::Column;
#[derive(Debug)]
// Top-level error type for the storage engine.
pub enum Error {
// A bincode decode failure, tagged with which structure was being decoded.
DecodeError(DecodeErrorKind, bincode::error::DecodeError),
// A bincode encode failure.
EncodeError(bincode::error::EncodeError),
// An index operation was attempted on a column that has no index.
AttemptToIndexNonIndexableColumn(Column),
// An index pointed at the EOF position — index out of sync with rows file.
IndexIsStoringEofFilePosition(Column),
ColumnAlreadyIndexed(Column),
IoError(std::io::Error),
}
#[derive(Debug)]
// Identifies which structure failed to decode (for DecodeError above).
pub enum DecodeErrorKind {
StoreHeaderNumberOfColumns,
StoreHeaderDeletedCount,
StoreHeaderTotalCount,
StoreHeaderPrimaryColumn,
StoreHeaderIndexedColumns,
EntryData,
EntryIsDeleted,
EntryHeaderWithDataSizes,
CorruptedData,
}
// ===Errors===
impl Error {
    /// Unwraps an `IoError` into the underlying `std::io::Error`.
    ///
    /// # Panics
    /// Panics on any other variant. The panic message now carries the
    /// offending variant (previously it was printed to stdout followed by a
    /// bare `panic!()`, losing the information from the panic itself).
    pub fn to_io_or_panic(self) -> std::io::Error {
        use Error::*;
        match self {
            IoError(err) => err,
            err => panic!("expected Error::IoError, got {err:?}"),
        }
    }
}
// Allow `?` to lift bincode encode errors into Error.
impl From<bincode::error::EncodeError> for Error {
fn from(err: bincode::error::EncodeError) -> Self {
Self::EncodeError(err)
}
}
// Allow `?` to lift std I/O errors into Error.
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
Self::IoError(err)
}
}

290
storage_engine/src/index.rs Normal file
View file

@ -0,0 +1,290 @@
use std::path::PathBuf;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter};
use std::collections::{BTreeMap, HashSet};
use std::hash::Hash;
use std::io::SeekFrom;
use crate::binary_coding::{decode, encode};
use bincode;
use bincode::{Decode, Encode};
use crate::error::{DecodeErrorKind, Error};
// Local Result alias over this crate's Error type.
type Result<T> = std::result::Result<T, Error>;
// An in-memory multimap (K -> set of V) persisted to a file of consecutive
// bincode-encoded (key, value) records.
#[derive(Debug)]
pub struct Index<K, V> {
file: File,
data: BTreeMap<K, HashSet<V>>,
}
// Placeholder: the index file currently has no header.
#[derive(Debug)]
pub struct IndexHeader {}
impl<K, V> Index<K, V>
where
K: Encode + Decode + Ord,
V: Encode + Decode + Clone + Eq + Hash,
{
// Creates (or opens without truncating) the index file and starts with an
// empty in-memory map. Existing file contents are NOT loaded — use
// `connect` for that.
pub async fn new(file_name: PathBuf) -> Result<Index<K, V>> {
let file: File = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(file_name)
.await?;
let data = BTreeMap::new();
Ok(Index { file, data })
}
// Opens an existing index file and replays its records into the in-memory
// map. Errors if the file does not exist (no `create(true)` here).
pub async fn connect(file_name: PathBuf) -> Result<Index<K, V>> {
let file: File = OpenOptions::new()
.read(true)
.write(true)
.open(file_name)
.await?;
let mut index = Index {
file,
data: BTreeMap::new(),
};
index.load_from_file().await?;
Ok(index)
}
// Inserts (k, v), appending the pair to the index file first so disk and
// memory stay in sync.
pub async fn insert(&mut self, k: K, v: V) -> Result<()> {
self.append_to_file(&k, &v).await?;
self.data.entry(k).or_insert_with(HashSet::new).insert(v);
Ok(())
}
/// Inserts (k, v) into the in-memory map only; the file is NOT updated,
/// hence "desynced". Used while replaying the file in `load_from_file`.
///
/// Fix: removed the unidiomatic explicit `-> ()` return type.
pub fn insert_desynced(&mut self, k: K, v: V) {
    self.data.entry(k).or_insert_with(HashSet::new).insert(v);
}
/// Returns a cloned set of the values stored under `k`, or `None` if the
/// key is absent. (Async for interface symmetry; purely in-memory today.)
pub async fn lookup(&self, k: &K) -> Result<Option<HashSet<V>>> {
    Ok(self.data.get(k).cloned())
}
// Removes v from k's set, then rewrites the entire index file.
// NOTE(review): a key whose set becomes empty stays in the map (lookup
// returns Some(empty)) but nothing is written for it on disk, so after a
// reconnect lookup returns None — confirm whether that asymmetry matters.
pub async fn delete(&mut self, k: K, v: V) -> Result<()> {
self.data.entry(k).and_modify(|values| {
values.remove(&v);
});
self.sync_to_disk().await
}
/// Rewrites the entire index file from the in-memory map, then truncates
/// any leftover bytes from a previously longer file.
///
/// Fix: `write()` may perform a short write, silently dropping part of a
/// record; `write_all` guarantees every record is fully persisted.
pub async fn sync_to_disk(&mut self) -> Result<()> {
    let mut writer = BufWriter::new(&mut self.file);
    writer.seek(SeekFrom::Start(0)).await?;
    let mut written: u64 = 0;
    // Single scratch buffer reused for every (key, value) record.
    let mut encoded = Vec::new();
    for (key, value) in &self.data {
        for v in value {
            encoded.clear();
            encoded.extend(encode(key)?);
            encoded.extend(encode(v)?);
            writer.write_all(&encoded).await?;
            written += encoded.len() as u64;
        }
    }
    writer.flush().await?;
    // Drop any stale tail if the new contents are shorter than the old.
    self.file.set_len(written).await?;
    Ok(())
}
// Replaces the whole in-memory map and persists it; used after garbage
// collection rebuilds the index with fresh file positions.
pub async fn reset(&mut self, data: BTreeMap<K, HashSet<V>>) -> Result<()> {
self.data = data;
self.sync_to_disk().await
}
/// Appends one encoded (key, value) record to the end of the index file.
///
/// Fix: `write()` may perform a short write, corrupting the record stream;
/// `write_all` writes the entire buffer.
async fn append_to_file(&mut self, key: &K, value: &V) -> Result<()> {
    let mut encoded = Vec::new();
    encoded.extend(encode(key)?);
    encoded.extend(encode(value)?);
    self.file.seek(SeekFrom::End(0)).await?;
    self.file.write_all(&encoded).await?;
    Ok(())
}
// Replays the on-disk record stream into the in-memory map. Offsets advance
// by each decode's consumed-byte count, so records may be variable-length.
async fn load_from_file(&mut self) -> Result<()> {
let mut bytes = vec![];
self.file.seek(SeekFrom::Start(0)).await?;
self.file.read_to_end(&mut bytes).await?;
let mut cursor = 0;
while cursor < bytes.len() {
let (key, len) = decode(&bytes[cursor..])
.map_err(|e| Error::DecodeError(DecodeErrorKind::CorruptedData, e))?;
cursor += len;
let (value, len) = decode(&bytes[cursor..])
.map_err(|e| Error::DecodeError(DecodeErrorKind::CorruptedData, e))?;
cursor += len;
self.insert_desynced(key, value);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::fs::remove_file;
// A freshly created index file is empty, and reconnecting to it is too.
#[tokio::test]
async fn connect_to_new() {
let file_name = PathBuf::from("connect_to_new");
// Clean up leftovers from a previous aborted run.
if file_name.exists() {
remove_file(&file_name).await.unwrap();
}
{
let index = Index::<u64, u64>::new(file_name.clone()).await.unwrap();
assert_eq!(index.data.len(), 0);
}
{
let index = Index::<u64, u64>::connect(file_name.clone()).await.unwrap();
assert_eq!(index.data.len(), 0);
}
remove_file(&file_name).await.unwrap();
}
// Inserted pairs survive in memory and are re-loaded on reconnect.
#[tokio::test]
async fn inserting() {
let file_name = PathBuf::from("inserting");
// Clean up leftovers from a previous aborted run.
if file_name.exists() {
remove_file(&file_name).await.unwrap();
}
{
let mut index = Index::<u64, u64>::new(file_name.clone()).await.unwrap();
index.insert(1, 2).await.unwrap();
index.insert(1, 3).await.unwrap();
index.insert(1, 4).await.unwrap();
index.insert(2, 3).await.unwrap();
index.insert(2, 4).await.unwrap();
index.insert(2, 5).await.unwrap();
assert_eq!(index.data.len(), 2);
assert_eq!(index.data.get(&1).unwrap().len(), 3);
assert_eq!(index.data.get(&2).unwrap().len(), 3);
}
{
let index = Index::<u64, u64>::connect(file_name.clone()).await.unwrap();
assert_eq!(index.data.len(), 2);
assert_eq!(index.data.get(&1).unwrap().len(), 3);
assert_eq!(index.data.get(&2).unwrap().len(), 3);
}
remove_file(&file_name).await.unwrap();
}
#[tokio::test]
async fn lookuping() {
let file_name = PathBuf::from("lookuping");
if file_name.exists() {
remove_file(&file_name).await.unwrap();
}
{
let mut index = Index::<u64, u64>::new(file_name.clone()).await.unwrap();
index.insert(1, 2).await.unwrap();
index.insert(1, 3).await.unwrap();
index.insert(1, 4).await.unwrap();
index.insert(2, 3).await.unwrap();
index.insert(2, 4).await.unwrap();
index.insert(2, 5).await.unwrap();
assert_eq!(index.lookup(&1).await.unwrap().unwrap().len(), 3);
assert_eq!(index.lookup(&2).await.unwrap().unwrap().len(), 3);
assert_eq!(index.lookup(&3).await.unwrap(), None);
let first = index.lookup(&1).await.unwrap().unwrap();
assert!(first.contains(&2));
assert!(first.contains(&3));
assert!(first.contains(&4));
let second = index.lookup(&2).await.unwrap().unwrap();
assert!(second.contains(&3));
assert!(second.contains(&4));
assert!(second.contains(&5));
}
{
let index = Index::<u64, u64>::connect(file_name.clone()).await.unwrap();
assert_eq!(index.lookup(&1).await.unwrap().unwrap().len(), 3);
assert_eq!(index.lookup(&2).await.unwrap().unwrap().len(), 3);
assert_eq!(index.lookup(&3).await.unwrap(), None);
let first = index.lookup(&1).await.unwrap().unwrap();
assert!(first.contains(&2));
assert!(first.contains(&3));
assert!(first.contains(&4));
let second = index.lookup(&2).await.unwrap().unwrap();
assert!(second.contains(&3));
assert!(second.contains(&4));
assert!(second.contains(&5));
}
remove_file(&file_name).await.unwrap();
}
#[tokio::test]
async fn deleting() {
let file_name = PathBuf::from("deleting");
if file_name.exists() {
remove_file(&file_name).await.unwrap();
}
{
let mut index = Index::<u64, u64>::new(file_name.clone()).await.unwrap();
index.insert(1, 2).await.unwrap();
index.insert(1, 3).await.unwrap();
index.insert(1, 4).await.unwrap();
index.insert(2, 3).await.unwrap();
index.insert(2, 4).await.unwrap();
index.insert(2, 5).await.unwrap();
assert!(index.lookup(&1).await.unwrap().unwrap().contains(&2));
index.delete(1, 2).await.unwrap();
assert!(!index.lookup(&1).await.unwrap().unwrap().contains(&2));
assert!(index.lookup(&2).await.unwrap().unwrap().contains(&3));
index.delete(2, 3).await.unwrap();
assert!(!index.lookup(&2).await.unwrap().unwrap().contains(&3));
}
{
let mut index = Index::<u64, u64>::connect(file_name.clone()).await.unwrap();
assert!(!index.lookup(&1).await.unwrap().unwrap().contains(&2));
assert!(!index.lookup(&2).await.unwrap().unwrap().contains(&3));
assert!(index.lookup(&1).await.unwrap().unwrap().contains(&3));
index.delete(1, 3).await.unwrap();
assert!(!index.lookup(&1).await.unwrap().unwrap().contains(&3));
assert!(index.lookup(&1).await.unwrap().unwrap().contains(&4));
assert!(index.lookup(&2).await.unwrap().unwrap().contains(&4));
assert!(index.lookup(&2).await.unwrap().unwrap().contains(&5));
}
remove_file(&file_name).await.unwrap();
}
}

View file

@ -0,0 +1,7 @@
// Public API surface of the crate: the `Store` type and shared aliases.
pub mod storage_engine;
// bincode-based encode/decode helpers shared by all on-disk formats.
mod binary_coding;
// Crate-wide `Error` type and decode-error kinds.
mod error;
// Persistent key -> set-of-values index backed by a file.
mod index;
// Read/write cursors over a store's rows file.
mod cursor;
// On-disk layout pieces: entries, entry headers, store header.
mod segments;
// Capability traits implemented by cursors (header/index access).
mod cursor_capabilities;

View file

@ -0,0 +1,60 @@
use bincode::{Decode, Encode};
use crate::binary_coding::{encode_sequence, encode_sequence_with_sizes, decode_sequence};
use crate::storage_engine::{Result, FilePosition};
use crate::error::{Error, DecodeErrorKind};
use crate::segments::entry_header::{EntryHeader, EntryHeaderWithDataSize};
/// A row as supplied by callers: a minimal header (tombstone flag) plus
/// one value per column.
#[derive(Debug)]
pub struct Entry<T> {
    pub header: EntryHeader,
    pub data: Vec<T>,
}
/// A row as read back from disk: the full header (with per-column byte
/// sizes), the entry's position in the rows file, and the decoded data.
#[derive(Debug)]
pub struct EntryDetailed<T> {
    pub header: EntryHeaderWithDataSize,
    pub file_position: FilePosition,
    pub data: Vec<T>,
}
impl<T> Entry<T> {
    /// Builds a live (non-deleted) entry wrapping `data`.
    pub fn new(data: Vec<T>) -> Self {
        Self {
            header: EntryHeader { is_deleted: false },
            data,
        }
    }

    /// Builds an entry that is already marked as deleted (a tombstone).
    pub fn new_deleted(data: Vec<T>) -> Self {
        Self {
            header: EntryHeader { is_deleted: true },
            data,
        }
    }

    // FORMAT: [EntryHeaderWithDataSize, ..sequence of data]
    pub fn encode(&self) -> Result<Vec<u8>>
    where
        T: Encode,
    {
        let mut buffer: Vec<u8> = self.header.encode()?;
        let (mut payload, column_sizes) = encode_sequence_with_sizes(&self.data[..])?;
        // Per-column byte sizes first (fixed count = number of columns)...
        buffer.append(&mut encode_sequence(&column_sizes)?);
        // ...followed by the variable-size column data itself.
        buffer.append(&mut payload);
        Ok(buffer)
    }
}
impl<T> EntryDetailed<T> {
    /// Decodes the data section of an entry whose header has already been
    /// read, combining header, file position and decoded columns.
    pub fn decode(
        header: EntryHeaderWithDataSize,
        file_position: FilePosition,
        number_of_columns: usize,
        bytes: &[u8],
    ) -> Result<Self>
    where
        T: Decode,
    {
        let data = decode_sequence::<T>(number_of_columns, bytes)
            .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?;
        Ok(Self {
            header,
            file_position,
            data,
        })
    }

    /// Downgrades to a plain `Entry`, discarding the per-column sizes and
    /// the file position. Clones the data; `self` is left untouched.
    pub fn forget(&self) -> Entry<T>
    where
        T: Clone,
    {
        Entry {
            header: self.header.clone().into(),
            data: self.data.clone(),
        }
    }
}

View file

@ -0,0 +1,68 @@
use crate::binary_coding::{decode, encode, decode_sequence};
use crate::storage_engine::{Result, Column};
use crate::error::{Error, DecodeErrorKind};
use std::mem::size_of;
/// Minimal per-entry header: just the tombstone flag.
#[derive(Debug)]
pub struct EntryHeader {
    pub is_deleted: bool,
}
/// Per-entry header as stored on disk: the tombstone flag plus the
/// encoded byte size of every column's payload.
#[derive(Debug, Clone)]
pub struct EntryHeaderWithDataSize {
    pub is_deleted: bool,
    pub data_sizes: Vec<usize>, // vec![5, 6, 20] means that column 0 stores 5 bytes, column 1 stores 6
                                // bytes etc
}
impl EntryHeader {
    /// Serializes the header — currently only the `is_deleted` flag.
    // Idiomatic `&self` receiver instead of the explicit
    // `self: &EntryHeader` form; identical ABI and call syntax.
    pub fn encode(&self) -> Result<Vec<u8>> {
        let result: Vec<u8> = encode(&self.is_deleted)?;
        Ok(result)
    }
}
impl From<EntryHeaderWithDataSize> for EntryHeader {
fn from(entry: EntryHeaderWithDataSize) -> Self {
Self { is_deleted: entry.is_deleted, }
}
}
impl EntryHeaderWithDataSize {
    // Byte layout of the encoded header: [is_deleted][data_sizes...].
    pub const IS_DELETED_OFFSET: usize = 0;
    pub const IS_DELETED_SIZE: usize = size_of::<bool>();
    pub const DATA_SIZES_OFFSET: usize = Self::IS_DELETED_OFFSET + Self::IS_DELETED_SIZE;

    /// Encoded size of a header for a table with `number_of_columns` columns.
    pub fn size(number_of_columns: usize) -> usize {
        let size_of_data_sizes: usize = number_of_columns * size_of::<usize>();
        Self::IS_DELETED_SIZE + size_of_data_sizes
    }

    /// Total byte size of the entry's data section (sum over all columns).
    pub fn size_of_data(&self) -> usize {
        self.data_sizes.iter().sum()
    }

    /// Byte offset of `column`'s payload relative to the start of the
    /// data section: the sum of the sizes of all preceding columns.
    pub fn offset_of_column(&self, column: Column) -> usize {
        // Iterator form of the former manual accumulate-and-break loop.
        self.data_sizes.iter().take(column as usize).sum()
    }

    /// Decodes a header from `bytes` (must hold at least
    /// `Self::size(number_of_columns)` bytes).
    ///
    /// Takes `&[u8]` instead of the former `&mut [u8]`: the buffer is only
    /// read, and existing `&mut` call sites still compile via the implicit
    /// `&mut T -> &T` reborrow coercion.
    pub fn decode(bytes: &[u8], number_of_columns: usize) -> Result<Self> {
        let (is_deleted, _) =
            decode::<bool>(bytes)
                .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
        let data_sizes = decode_sequence::<usize>(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..])
            .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?;
        Ok(Self { is_deleted, data_sizes })
    }
}

View file

@ -0,0 +1,3 @@
// Row entries as written/read by cursors.
pub mod entry;
// Per-entry header layout (tombstone flag, per-column sizes).
pub mod entry_header;
// Table-level header persisted at the front of the rows file.
pub mod store_header;

View file

@ -0,0 +1,127 @@
use crate::binary_coding::{encode, encode_sequence, decode, decode_sequence};
use crate::storage_engine::{Result, Column};
use crate::error::{Error, DecodeErrorKind};
use std::mem::size_of;
/// Table-level metadata persisted at the front of the rows file.
#[derive(Debug, Clone)]
pub struct StoreHeader {
    pub table_folder: String, // This one is not encoded into the file
    pub number_of_columns: usize,
    // Entries marked deleted but not yet garbage-collected.
    pub deleted_count: usize,
    // All entries written so far, deleted ones included.
    pub total_count: usize,
    pub primary_column: Column,
    // One flag per column; `true` means the column has an index file.
    pub indexed_columns: Vec<bool>,
}
/// The fixed-size front portion of `StoreHeader`, decodable before the
/// column count — and hence the size of the variable part — is known.
#[derive(Debug, Clone)]
pub struct StoreHeaderFixedPart {
    pub table_folder: String, // This one is not encoded into the file
    pub number_of_columns: usize,
    pub deleted_count: usize,
    pub total_count: usize,
    pub primary_column: Column,
}
impl StoreHeader {
    // Encoded byte sizes of the fixed-width fields...
    pub const NUMBER_OF_COLUMNS_SIZE: usize = size_of::<usize>();
    pub const DELETED_COUNT_SIZE: usize = size_of::<usize>();
    pub const TOTAL_COUNT_SIZE: usize = size_of::<usize>();
    pub const PRIMARY_COLUMN_SIZE: usize = size_of::<Column>();
    pub const FIXED_SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE + Self::PRIMARY_COLUMN_SIZE;
    // ...and their offsets within the encoded header (fields are laid out
    // back-to-back in declaration order).
    pub const NUMBER_OF_COLUMNS_OFFSET: usize = 0;
    pub const DELETED_COUNT_OFFSET: usize = Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE;
    pub const TOTAL_COUNT_OFFSET: usize = Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE;
    pub const PRIMARY_COLUMN_OFFSET: usize = Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE;
    #[allow(dead_code)]
    pub const INDEXED_COLUMNS_OFFSET: usize = Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE;

    // Encoded size of the variable part: one bool per column.
    fn indexed_columns_size(number_of_columns: usize) -> usize {
        size_of::<bool>() * number_of_columns
    }

    /// Total encoded header size for a table with `number_of_columns` columns.
    pub fn size(number_of_columns: usize) -> usize {
        Self::FIXED_SIZE + Self::indexed_columns_size(number_of_columns)
    }

    /// Serializes all fields except `table_folder`, in offset order.
    pub fn encode(&self) -> Result<Vec<u8>> {
        let mut result = encode(&self.number_of_columns)?;
        result.append(&mut encode(&self.deleted_count)?);
        result.append(&mut encode(&self.total_count)?);
        result.append(&mut encode(&self.primary_column)?);
        result.append(&mut encode_sequence(&self.indexed_columns)?);
        Ok(result)
    }

    /// Zeroed buffer sized for the fixed part (for `read_exact`).
    pub fn buffer_for_fixed_decoding() -> [u8; Self::FIXED_SIZE] {
        [0; Self::FIXED_SIZE]
    }

    /// Zeroed buffer sized for the variable part, whose length is only
    /// known once the fixed part has been decoded.
    pub fn buffer_for_rest_decoding(header: &StoreHeaderFixedPart) -> Vec<u8> {
        vec![0; Self::indexed_columns_size(header.number_of_columns)]
    }

    /// Decodes the fixed-size front part of the header.
    // NOTE(review): contains no `.await`; `async` is kept because existing
    // callers `.await` this — confirm before changing the signature.
    pub async fn decode_fixed(table_folder: &str, result: &[u8]) -> Result<StoreHeaderFixedPart> {
        let (number_of_columns, _) =
            decode::<usize>(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE])
                .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?;
        let (deleted_count, _) =
            decode::<usize>(&result[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE])
                .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?;
        let (total_count, _) =
            decode::<usize>(&result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE])
                .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?;
        let (primary_column, _) =
            decode::<Column>(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE])
                .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?;
        let header = StoreHeaderFixedPart {
            // `table_folder` is supplied by the caller — it is not encoded.
            table_folder: table_folder.to_string(),
            number_of_columns,
            deleted_count,
            total_count,
            primary_column,
        };
        Ok(header)
    }

    /// Decodes the variable part (`indexed_columns`) and combines it with
    /// the previously decoded fixed part into a complete header.
    // NOTE(review): same as above — no `.await` inside, callers await it.
    pub async fn decode_rest(header: StoreHeaderFixedPart, result: &[u8]) -> Result<StoreHeader> {
        let indexed_columns: Vec<bool> =
            decode_sequence::<bool>(header.number_of_columns, result)
                .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderIndexedColumns, e))?;
        Ok(StoreHeader {
            table_folder: header.table_folder,
            number_of_columns: header.number_of_columns,
            deleted_count: header.deleted_count,
            total_count: header.total_count,
            primary_column: header.primary_column,
            indexed_columns,
        })
    }

    // returns new count
    pub fn increment_total_count(&mut self) -> usize {
        self.total_count += 1;
        self.total_count
    }

    // returns new count
    pub fn increment_deleted_count(&mut self) -> usize {
        self.deleted_count += 1;
        self.deleted_count
    }

    /// Whether `column` currently has an index.
    pub fn is_column_indexed(&self, column: Column) -> bool {
        self.indexed_columns[column as usize]
    }

    /// Marks `column` as indexed (in memory only; persisting the header
    /// is the caller's responsibility).
    pub fn make_column_indexed(&mut self, column: Column) {
        self.indexed_columns[column as usize] = true
    }
}

View file

@ -0,0 +1,544 @@
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::fs::{File, OpenOptions, DirBuilder};
use tokio::fs;
use std::path::{Path, PathBuf};
use bincode::{Decode, Encode};
use crate::error::Error;
use crate::cursor::{ReadCursor, WriteCursor};
use crate::cursor_capabilities::header_access::CursorCanReadHeader;
use crate::segments::store_header::StoreHeader;
use crate::index::Index;
/// Crate-wide result alias over the storage engine's `Error`.
pub type Result<T> = std::result::Result<T, Error>;
/// Column identifier: the zero-based position of a column in a row.
pub type Column = u64;
/// Position of an entry within the rows file.
pub type FilePosition = u64;
// TODO: Consider adding another type parameter for indexable values
/// A table: its persisted header plus one optional index per column.
#[derive(Debug)]
pub struct Store<T> {
    pub header: StoreHeader,
    pub indexes: StoreIndexes<T>,
}
/// One slot per column; `Some` only for columns that are indexed.
pub type StoreIndexes<T> = Vec<Option<Index<T, FilePosition>>>;
//===Store===
/// Returns `true` when the table folder is present on disk.
pub async fn store_exists(table_folder: &str) -> Result<bool> {
    // A successful metadata lookup means the path exists; any failure
    // (including e.g. permission errors) is treated as "absent".
    let present = fs::metadata(table_folder).await.is_ok();
    Ok(present)
}
// File inside a table folder holding the header and all row entries.
// (`'static` dropped: it is implied on `&str` consts — clippy
// `redundant_static_lifetimes`.)
pub const ROWS_FILE_NAME: &str = "rows";
// Scratch file used while rewriting rows during garbage collection.
pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &str = "rows_intermediate";
impl<T> Store<T> {
    // ===Creation===

    /// Creates a new table: makes the table folder, writes a rows file
    /// containing only the encoded header, and creates an empty index for
    /// the primary column. Fails if the folder already exists.
    pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result<Self>
    where T: Encode + Decode + Ord
    {
        let path_to_table = Path::new(table_folder);
        let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
        DirBuilder::new()
            .create(path_to_table).await?;
        let header = {
            // Only the primary column is indexed at creation time.
            let mut indexed_columns = vec![false; number_of_columns];
            indexed_columns[primary_column as usize] = true;
            StoreHeader {
                table_folder: table_folder.to_string(),
                number_of_columns,
                deleted_count: 0,
                total_count: 0,
                primary_column,
                indexed_columns,
            }
        };
        // We don't need the file right now. Only cursors will later open it.
        Self::create_empty_rows_file(path_to_rows, &header).await?;
        let indexes: StoreIndexes<T> = Self::create_initial_indexes(&header).await?;
        let store = Self {
            header,
            indexes,
        };
        Ok(store)
    }

    /// Path of the index file for `column`: `<table_folder>/rows_<column>`.
    pub fn path_to_index_file(header: &StoreHeader, column: Column) -> PathBuf {
        let path_to_table = Path::new(&header.table_folder);
        // `column` is Display, so format it directly — no `to_string` needed.
        path_to_table.join(format!("{}_{}", ROWS_FILE_NAME, column))
    }

    /// Creates a fresh, empty index file for `column`.
    pub async fn create_empty_index_at(header: &StoreHeader, column: Column) -> Result<Index<T, FilePosition>>
    where T: Encode + Decode + Ord
    {
        let path_to_index = Self::path_to_index_file(header, column);
        let index = Index::new(path_to_index).await?;
        Ok(index)
    }

    /// Builds the index-slot vector for a brand-new store: one `None` per
    /// column, with only the primary column's slot populated.
    pub async fn create_initial_indexes(header: &StoreHeader) -> Result<StoreIndexes<T>>
    where T: Encode + Decode + Ord
    {
        // Iterator form of the former push-in-a-loop initialization.
        let mut result: StoreIndexes<T> = (0..header.number_of_columns).map(|_| None).collect();
        result[header.primary_column as usize] = Some(Self::create_empty_index_at(header, header.primary_column).await?);
        Ok(result)
    }

    /// Reconnects to the existing index file of `column`.
    pub async fn connect_index_at(header: &StoreHeader, column: Column) -> Result<Index<T, FilePosition>>
    where T: Encode + Decode + Ord
    {
        let path_to_index = Self::path_to_index_file(header, column);
        let index: Index<T, FilePosition> = Index::connect(path_to_index).await?;
        Ok(index)
    }

    /// Creates the rows file (failing if it exists) and writes the encoded
    /// header at its start.
    pub async fn create_empty_rows_file(path_to_rows: PathBuf, header: &StoreHeader) -> Result<File> {
        let mut file: File =
            OpenOptions::new()
                .write(true)
                .read(true)
                .create_new(true)
                .open(path_to_rows)
                .await?;
        let encoded_header: Vec<u8> = header.encode()?;
        // BUGFIX: `write` may perform a partial write, leaving a truncated
        // header on disk; `write_all` guarantees the full header is written.
        file.write_all(&encoded_header).await?;
        Ok(file)
    }

    /// Opens an existing table: decodes the header from the rows file and
    /// reconnects an index for every column flagged in `indexed_columns`.
    pub async fn connect(table_folder: &str) -> Result<Self>
    where T: std::fmt::Debug + Encode + Decode + Ord
    {
        let path_to_table = Path::new(table_folder);
        let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
        let mut file: File =
            OpenOptions::new()
                .read(true)
                .write(true)
                .open(path_to_rows)
                .await?;
        // Unfortunately we can't yet use store.read_bytes, since it can't be created without the
        // header.
        let header = {
            // Two-phase read: the fixed front part tells us how large the
            // variable part (indexed-column flags) is.
            let mut fixed_header_bytes = StoreHeader::buffer_for_fixed_decoding();
            file.read_exact(&mut fixed_header_bytes).await?;
            let fixed_header = StoreHeader::decode_fixed(table_folder, &fixed_header_bytes).await?;
            // decode the indexes
            let mut rest_bytes: Vec<u8> = StoreHeader::buffer_for_rest_decoding(&fixed_header);
            file.read_exact(&mut rest_bytes).await?;
            StoreHeader::decode_rest(fixed_header, &rest_bytes).await?
        };
        let indexes: StoreIndexes<T> = {
            let mut result = Vec::with_capacity(header.number_of_columns);
            for (column, &is_indexed) in header.indexed_columns.iter().enumerate() {
                if is_indexed {
                    result.push(Some(Self::connect_index_at(&header, column as Column).await?))
                } else {
                    result.push(None)
                }
            }
            result
        };
        let store = Self {
            header,
            indexes
        };
        Ok(store)
    }

    // ===Cursors===

    /// Read-only cursor over this store's rows file.
    pub async fn read_cursor(&self) -> Result<ReadCursor<T>>
    where T: Send + Sync
    {
        ReadCursor::new(self).await
    }

    /// Read/write cursor over this store's rows file.
    pub async fn write_cursor(&mut self) -> Result<WriteCursor<T>>
    where T: Send + Sync
    {
        WriteCursor::new(self).await
    }

    // ===Indexes===

    /// Builds an index for `column`; errors if one already exists.
    pub async fn attach_index(&mut self, column: Column) -> Result<()>
    where T: Ord + Decode + Encode + Send + Sync
    {
        if self.header.is_column_indexed(column) {
            Err(Error::ColumnAlreadyIndexed(column))
        } else {
            let mut cursor = self.write_cursor().await?;
            cursor.attach_index(column).await
        }
    }

    // For debugging.
    #[allow(dead_code)]
    pub async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error>
    where T: Send + Sync
    {
        let mut cursor = self.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
        let bytes = cursor.read_all_bytes().await?;
        Ok(bytes)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::segments::entry::Entry;
    use crate::cursor_capabilities::header_access::CursorCanReadHeader;
    use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex};

    // Test-only Drop: each test uses a distinct on-disk table folder, and
    // dropping the store at scope end removes that folder.
    impl <T>Drop for Store<T> {
        fn drop(&mut self) {
            println!("DROPPING TEST FOLDER");
            let table_folder = self.header.table_folder.clone();
            // Seems no one has figured out how to do AsyncDrop yet.
            std::fs::remove_dir_all(table_folder).unwrap();
        }
    }

    /// A new store starts with zero counters and the requested layout.
    #[tokio::test]
    async fn test_create() {
        type Data = u32;
        let table_path = "test_table_0";
        let number_of_columns = 5;
        let primary_column = 0;
        let store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
        assert!(store.header.number_of_columns == number_of_columns);
        assert!(store.header.total_count == 0);
        assert!(store.header.deleted_count == 0);
        assert!(store.header.primary_column == primary_column);
    }

    /// Inserting entries bumps `total_count`.
    #[tokio::test]
    async fn test_insert() {
        type Data = u32;
        let table_path = "test_table_1";
        let number_of_columns = 5;
        let primary_column = 0;
        let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
        {
            let mut cursor = store.write_cursor().await.unwrap();
            let entry0: Entry<Data> = Entry::new(vec![1, 2, 3, 4, 5]);
            cursor.insert_entry(entry0).await.unwrap();
            let entry1: Entry<Data> = Entry::new(vec![6, 7, 8, 9, 10]);
            cursor.insert_entry(entry1).await.unwrap();
            assert!(store.header.total_count == 2);
        }
    }

    /// `next` on a read cursor yields entries in insertion order.
    #[tokio::test]
    async fn test_select_next() {
        type Data = u32;
        let table_path = "test_table_2";
        let number_of_columns = 5;
        let primary_column = 0;
        let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
        {
            let mut cursor = store.write_cursor().await.unwrap();
            let entry0: Entry<Data> = Entry::new(vec![1, 2, 3, 4, 5]);
            cursor.insert_entry(entry0).await.unwrap();
            let entry1: Entry<Data> = Entry::new(vec![6, 7, 8, 9, 10]);
            cursor.insert_entry(entry1).await.unwrap();
            assert!(store.header.total_count == 2);
        }
        {
            let mut cursor = store.read_cursor().await.unwrap();
            let entry0 = cursor.next().await.unwrap().unwrap();
            let entry1 = cursor.next().await.unwrap().unwrap();
            assert!(entry0.data == vec![1,2,3,4,5]);
            assert!(entry1.data == vec![6,7,8,9,10]);
        }
    }

    /// Draining the cursor with `next` returns every inserted entry.
    #[tokio::test]
    async fn test_select_all() {
        type Data = u32;
        let table_path = "test_table_3";
        let number_of_columns = 5;
        let primary_column = 0;
        let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
        {
            let mut cursor = store.write_cursor().await.unwrap();
            let entry0: Entry<Data> = Entry::new(vec![1, 2, 3, 4, 5]);
            cursor.insert_entry(entry0).await.unwrap();
            let entry1: Entry<Data> = Entry::new(vec![6, 7, 8, 9, 10]);
            cursor.insert_entry(entry1).await.unwrap();
            assert!(store.header.total_count == 2);
        }
        {
            let mut cursor = store.read_cursor().await.unwrap();
            let mut entries = vec![];
            while let Some(entry) = cursor.next().await.unwrap() {
                entries.push(entry)
            }
            assert!(entries.len() == 2);
            assert!(entries[0].data == vec![1,2,3,4,5]);
            assert!(entries[1].data == vec![6,7,8,9,10]);
        }
    }

    /// Equality selection on a non-indexed column scans and returns all
    /// matching rows, in insertion order.
    #[tokio::test]
    async fn test_select_eq() {
        type Data = u32;
        let table_path = "test_table_4";
        let number_of_columns = 5;
        let primary_column = 0;
        let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
        let value = 200;
        {
            let mut cursor = store.write_cursor().await.unwrap();
            let entry0: Entry<Data> = Entry::new(vec![1, value, 3, 4, 5]);
            cursor.insert_entry(entry0).await.unwrap();
            let entry1: Entry<Data> = Entry::new(vec![6, 7, 8, 9, 10]);
            cursor.insert_entry(entry1).await.unwrap();
            let entry2: Entry<Data> = Entry::new(vec![11, 2, 10, 10, 10]);
            cursor.insert_entry(entry2).await.unwrap();
            let entry3: Entry<Data> = Entry::new(vec![1, value, 100, 50, 40]);
            cursor.insert_entry(entry3).await.unwrap();
            assert!(store.header.total_count == 4);
        }
        {
            let mut cursor = store.read_cursor().await.unwrap();
            let column = 1;
            let entries = cursor.select_entries_where_eq(column, &value).await.unwrap();
            assert!(entries.len() == 2);
            assert!(entries[0].data == vec![1, value, 3, 4, 5]);
            assert!(entries[1].data == vec![1, value, 100, 50, 40]);
        }
    }

    /// Same equality selection, but through an attached index.
    #[tokio::test]
    async fn test_select_eq_indexed() {
        type Data = u32;
        let table_path = "test_table_5";
        let number_of_columns = 5;
        let primary_column = 0;
        let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
        let column: Column = 1;
        assert!(store.indexes[column as usize].is_none());
        store.attach_index(column).await.unwrap();
        assert!(store.indexes[column as usize].is_some());
        let value = 200;
        {
            let mut cursor = store.write_cursor().await.unwrap();
            let entry0: Entry<Data> = Entry::new(vec![1, value, 3, 4, 5]);
            cursor.insert_entry(entry0).await.unwrap();
            let entry1: Entry<Data> = Entry::new(vec![6, 7, 8, 9, 10]);
            cursor.insert_entry(entry1).await.unwrap();
            let entry2: Entry<Data> = Entry::new(vec![11, 2, 10, 10, 10]);
            cursor.insert_entry(entry2).await.unwrap();
            let entry3: Entry<Data> = Entry::new(vec![1, value, 100, 50, 40]);
            cursor.insert_entry(entry3).await.unwrap();
            assert!(store.header.total_count == 4);
        }
        {
            let mut cursor = store.read_cursor().await.unwrap();
            let column = 1;
            let entries = cursor.select_entries_where_eq(column, &value).await.unwrap();
            assert!(entries.len() == 2);
            // Order may be non-deterministic.
            assert!(entries[0].data[column as usize] == value);
            assert!(entries[1].data[column as usize] == value);
        }
    }

    /// Marking one entry deleted by file position bumps `deleted_count`.
    #[tokio::test]
    async fn test_delete_entry() {
        type Data = u32;
        let table_path = "test_table_6";
        let number_of_columns = 5;
        let primary_column = 0;
        let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
        let value = 200;
        let (_file_position0, file_position1, _file_position2, _file_position3) = {
            let mut cursor = store.write_cursor().await.unwrap();
            let entry0: Entry<Data> = Entry::new(vec![1, value, 3, 4, 5]);
            let file_position0 = cursor.insert_entry(entry0).await.unwrap();
            let entry1: Entry<Data> = Entry::new(vec![6, 7, 8, 9, 10]);
            let file_position1 = cursor.insert_entry(entry1).await.unwrap();
            let entry2: Entry<Data> = Entry::new(vec![11, 2, 10, 10, 10]);
            let file_position2 = cursor.insert_entry(entry2).await.unwrap();
            let entry3: Entry<Data> = Entry::new(vec![1, value, 100, 50, 40]);
            let file_position3 = cursor.insert_entry(entry3).await.unwrap();
            assert!(store.header.total_count == 4);
            (file_position0, file_position1, file_position2, file_position3)
        };
        {
            assert!(store.header.deleted_count == 0);
            let mut cursor = store.write_cursor().await.unwrap();
            cursor.mark_deleted_at(file_position1, false).await.unwrap();
            assert!(store.header.deleted_count == 1);
        }
    }

    /// Deleting by equality on an indexed column marks every match.
    #[tokio::test]
    async fn test_delete_where_eq() {
        type Data = u32;
        let table_path = "test_table_7";
        let number_of_columns = 5;
        let primary_column = 0;
        let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
        let column: Column = 1;
        assert!(store.indexes[column as usize].is_none());
        store.attach_index(column).await.unwrap();
        assert!(store.indexes[column as usize].is_some());
        let value = 200;
        let (_file_position0, _file_position1, _file_position2, _file_position3) = {
            let mut cursor = store.write_cursor().await.unwrap();
            let entry0: Entry<Data> = Entry::new(vec![1, value, 3, 4, 5]);
            let file_position0 = cursor.insert_entry(entry0).await.unwrap();
            let entry1: Entry<Data> = Entry::new(vec![6, 7, 8, 9, 10]);
            let file_position1 = cursor.insert_entry(entry1).await.unwrap();
            let entry2: Entry<Data> = Entry::new(vec![11, 2, 10, 10, 10]);
            let file_position2 = cursor.insert_entry(entry2).await.unwrap();
            let entry3: Entry<Data> = Entry::new(vec![1, value, 100, 50, 40]);
            let file_position3 = cursor.insert_entry(entry3).await.unwrap();
            assert!(store.header.total_count == 4);
            (file_position0, file_position1, file_position2, file_position3)
        };
        {
            assert!(store.header.deleted_count == 0);
            let mut cursor = store.write_cursor().await.unwrap();
            cursor.delete_entries_where_eq(column, &value, false).await.unwrap();
            assert!(store.header.deleted_count == 2);
        }
    }

    /// Garbage collection purges deleted entries: `deleted_count` returns
    /// to zero and `total_count` shrinks to the surviving rows.
    #[tokio::test]
    async fn test_garbage_collection() {
        type Data = u32;
        let table_path = "test_table_8";
        let number_of_columns = 5;
        let primary_column = 0;
        let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
        let column: Column = 1;
        assert!(store.indexes[column as usize].is_none());
        store.attach_index(column).await.unwrap();
        assert!(store.indexes[column as usize].is_some());
        let value = 200;
        let (_file_position0, _file_position1, _file_position2, _file_position3) = {
            let mut cursor = store.write_cursor().await.unwrap();
            let entry0: Entry<Data> = Entry::new(vec![1, value, 3, 4, 5]);
            let file_position0 = cursor.insert_entry(entry0).await.unwrap();
            let entry1: Entry<Data> = Entry::new(vec![6, 7, 8, 9, 10]);
            let file_position1 = cursor.insert_entry(entry1).await.unwrap();
            let entry2: Entry<Data> = Entry::new(vec![11, 2, 10, 10, 10]);
            let file_position2 = cursor.insert_entry(entry2).await.unwrap();
            let entry3: Entry<Data> = Entry::new(vec![1, value, 100, 50, 40]);
            let file_position3 = cursor.insert_entry(entry3).await.unwrap();
            assert!(store.header.total_count == 4);
            (file_position0, file_position1, file_position2, file_position3)
        };
        {
            assert!(store.header.deleted_count == 0);
            let mut cursor = store.write_cursor().await.unwrap();
            cursor.delete_entries_where_eq(column, &value, false).await.unwrap();
            assert!(cursor.header().deleted_count == 2);
            assert!(cursor.header().total_count == 4);
            cursor.initiate_garbage_collection().await.unwrap();
            assert!(cursor.header().deleted_count == 0);
            assert!(cursor.header().total_count == 2);
        }
    }
}
}