Merge branch 'redesign-tables' into redesign-indexes
This commit is contained in:
commit
dae012daa7
6 changed files with 358 additions and 138 deletions
|
|
@ -2,20 +2,41 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom};
|
|||
use tokio::fs::{File, OpenOptions};
|
||||
use std::path::Path;
|
||||
use std::marker::PhantomData;
|
||||
use crate::error::{Error, DecodeErrorKind};
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
||||
use bincode;
|
||||
use bincode::{Decode, Encode};
|
||||
use crate::binary_coding::encode;
|
||||
use crate::binary_coding::{encode, decode};
|
||||
|
||||
use crate::entry::{Entry, EntryDetailed};
|
||||
use crate::entry_header::EntryHeaderWithDataSize;
|
||||
use crate::entry_header::{EntryHeaderWithDataSize, EntryHeader};
|
||||
use crate::store_header::StoreHeader;
|
||||
use crate::storage_engine::{Store, FilePosition, Result, ROWS_FILE_NAME};
|
||||
use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME};
|
||||
|
||||
// ===Concrete Cursors===
|
||||
pub struct ReadCursor<T> {
|
||||
header: StoreHeader,
|
||||
file: File,
|
||||
data_type: PhantomData<T>,
|
||||
|
||||
eof_file_position: FilePosition,
|
||||
}
|
||||
|
||||
pub struct WriteCursor<'a, T> {
|
||||
header: &'a mut StoreHeader,
|
||||
file: File,
|
||||
data_type: PhantomData<T>,
|
||||
|
||||
eof_file_position: FilePosition,
|
||||
}
|
||||
|
||||
|
||||
// ===Traits===
|
||||
#[async_trait]
|
||||
trait PrimitiveCursor<T> {
|
||||
// TODO: Make this private
|
||||
pub trait PrimitiveCursor<T> {
|
||||
fn file(&mut self) -> &mut File;
|
||||
fn eof_file_position(&self) -> FilePosition;
|
||||
|
||||
|
|
@ -30,32 +51,41 @@ trait PrimitiveCursor<T> {
|
|||
Ok(result)
|
||||
}
|
||||
|
||||
async fn seek_to(&mut self, file_position: FilePosition) -> Result<()> {
|
||||
self.file().seek(SeekFrom::Start(file_position)).await?;
|
||||
Ok(())
|
||||
async fn seek_to(&mut self, file_position: FilePosition) -> Result<FilePosition> {
|
||||
let file_position = self.file().seek(SeekFrom::Start(file_position)).await?;
|
||||
Ok(file_position)
|
||||
}
|
||||
|
||||
async fn seek_to_start(&mut self) -> Result<()> {
|
||||
self.file().seek(SeekFrom::Start(0)).await?;
|
||||
Ok(())
|
||||
// Start of the file i.e. the Header, not the entries.
|
||||
async fn seek_to_start(&mut self) -> Result<FilePosition> {
|
||||
let file_position = self.file().seek(SeekFrom::Start(0)).await?;
|
||||
Ok(file_position)
|
||||
}
|
||||
|
||||
async fn seek_to_end(&mut self) -> Result<()> {
|
||||
self.file().seek(SeekFrom::End(0)).await?;
|
||||
Ok(())
|
||||
async fn seek_to_end(&mut self) -> Result<FilePosition> {
|
||||
let file_position = self.file().seek(SeekFrom::End(0)).await?;
|
||||
Ok(file_position)
|
||||
}
|
||||
|
||||
async fn seek_to_start_of_data(&mut self) -> Result<()> {
|
||||
async fn seek_to_start_of_data(&mut self) -> Result<FilePosition> {
|
||||
self.seek_to(StoreHeader::SIZE as u64).await
|
||||
}
|
||||
|
||||
// Seeks from current position by offset and returns new file position
|
||||
async fn seek_by(&mut self, offset: i64) -> Result<FilePosition> {
|
||||
let file_position = self.file().seek(SeekFrom::Current(offset)).await?;
|
||||
Ok(file_position)
|
||||
}
|
||||
|
||||
async fn current_file_position(&mut self) -> Result<FilePosition> {
|
||||
let next_file_position: FilePosition = self.file().stream_position().await?;
|
||||
Ok(next_file_position)
|
||||
}
|
||||
|
||||
async fn is_at_eof(&mut self) -> Result<bool> {
|
||||
Ok(self.current_file_position().await? == self.eof_file_position())
|
||||
let current_file_position = self.current_file_position().await?;
|
||||
let eof_file_position = self.eof_file_position();
|
||||
Ok(current_file_position == eof_file_position)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -86,24 +116,94 @@ pub trait CursorWithStoreHeader<T>: PrimitiveCursor<T> {
|
|||
}
|
||||
|
||||
// ===Iteration===
|
||||
// Assumes that the current file position is at a valid entry or EOF.
|
||||
async fn next(&mut self) -> Result<Option<EntryDetailed<T>>>
|
||||
where T: Decode
|
||||
{
|
||||
// The following functions assume that the current file position is at a valid entry or EOF.
|
||||
|
||||
|
||||
// WARNING: This moves the file_position to start of the data, so you can't just call
|
||||
// next_entry_header() a bunch of times. You must move the cursor!
|
||||
async fn next_entry_header(&mut self) -> Result<Option<EntryHeaderWithDataSize>> {
|
||||
if self.is_at_eof().await? {
|
||||
return Ok(None)
|
||||
}
|
||||
|
||||
let entry_header = self.read_entry_header().await?;
|
||||
|
||||
Ok(Some(entry_header))
|
||||
}
|
||||
|
||||
// This is meant to be used after next_entry_header() is called.
|
||||
async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result<FilePosition>{
|
||||
let file_position = self.seek_by(entry_header.size_of_data() as i64).await?;
|
||||
Ok(file_position)
|
||||
}
|
||||
|
||||
async fn next(&mut self) -> Result<Option<EntryDetailed<T>>>
|
||||
where T: Decode
|
||||
{
|
||||
let file_position = self.current_file_position().await?;
|
||||
let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
|
||||
|
||||
let mut data_bytes: Vec<u8> = vec![0; entry_header.size_of_data()];
|
||||
self.read_bytes(&mut data_bytes).await?;
|
||||
let entry: EntryDetailed<T> =
|
||||
EntryDetailed::decode(entry_header, self.header().number_of_columns, &mut data_bytes)?;
|
||||
EntryDetailed::decode(entry_header, file_position, self.header().number_of_columns, &mut data_bytes)?;
|
||||
|
||||
Ok(Some(entry))
|
||||
}
|
||||
|
||||
// Like next, but only reads the column, not the whole entry.
|
||||
async fn next_at_column(&mut self, column: Column) -> Result<Option<(EntryHeaderWithDataSize, T)>>
|
||||
where T: Decode + Send
|
||||
{
|
||||
let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
|
||||
let file_position_at_start_of_data = self.current_file_position().await?;
|
||||
|
||||
// figuring out how much to decode
|
||||
let column_offset = entry_header.offset_of_column(column);
|
||||
self.seek_by(column_offset as i64).await?;
|
||||
|
||||
// reading and decoding
|
||||
let mut bytes: Vec<u8> = vec![0; entry_header.data_sizes[column as usize]];
|
||||
self.read_bytes(&mut bytes).await?;
|
||||
let (value, _) =
|
||||
decode::<T>(&bytes[..])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
|
||||
|
||||
// jumping to next entry
|
||||
self.seek_to(file_position_at_start_of_data).await?;
|
||||
self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?;
|
||||
|
||||
Ok(Some((entry_header, value)))
|
||||
}
|
||||
|
||||
async fn next_alive(&mut self) -> Result<Option<EntryDetailed<T>>>
|
||||
where T: Decode
|
||||
{
|
||||
while let Some(entry) = self.next().await? {
|
||||
if !entry.header.is_deleted {
|
||||
return Ok(Some(entry))
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
// ===Search===
|
||||
async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result<Option<EntryDetailed<T>>>
|
||||
where T: Decode + PartialEq + Send + Sync
|
||||
{
|
||||
let mut file_position = self.current_file_position().await?;
|
||||
while let Some((_, t)) = self.next_at_column(column).await? {
|
||||
if &t == t0 {
|
||||
// go back and decode the whole entry
|
||||
self.seek_to(file_position).await?;
|
||||
return self.next().await
|
||||
} else {
|
||||
file_position = self.current_file_position().await?;
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
// ===Debugging===
|
||||
async fn read_entries(&mut self) -> Result<()>
|
||||
where T: Decode + std::fmt::Debug
|
||||
|
|
@ -118,29 +218,13 @@ pub trait CursorWithStoreHeader<T>: PrimitiveCursor<T> {
|
|||
|
||||
async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error> {
|
||||
let mut bytes: Vec<u8> = vec![];
|
||||
self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?;
|
||||
self.file().read_to_end(&mut bytes).await?;
|
||||
Ok(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ===Concrete Cursors===
|
||||
pub struct ReadCursor<T> {
|
||||
header: StoreHeader,
|
||||
file: File,
|
||||
data_type: PhantomData<T>,
|
||||
|
||||
eof_file_position: FilePosition,
|
||||
}
|
||||
|
||||
pub struct WriteCursor<'a, T> {
|
||||
header: &'a mut StoreHeader,
|
||||
file: File,
|
||||
data_type: PhantomData<T>,
|
||||
|
||||
eof_file_position: FilePosition,
|
||||
}
|
||||
|
||||
// ===PrimitiveCursor===
|
||||
impl <T>PrimitiveCursor<T> for ReadCursor<T> {
|
||||
fn file(&mut self) -> &mut File {
|
||||
|
|
@ -179,7 +263,7 @@ impl <T> ReadCursor<T> {
|
|||
pub async fn new(store: &Store<T>) -> Result<Self>
|
||||
where T: Send
|
||||
{
|
||||
let path_to_rows = Path::new(&store.table_folder).join(ROWS_FILE_NAME);
|
||||
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
|
||||
let file: File =
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
|
|
@ -191,8 +275,11 @@ impl <T> ReadCursor<T> {
|
|||
file,
|
||||
data_type: store.data_type,
|
||||
|
||||
eof_file_position: 0, // This will be overwriten by the seek_to_start_of_data
|
||||
eof_file_position: 0,
|
||||
};
|
||||
let eof_file_position: FilePosition = cursor.seek_to_end().await?;
|
||||
cursor.eof_file_position = eof_file_position;
|
||||
|
||||
cursor.seek_to_start_of_data().await?;
|
||||
|
||||
Ok(cursor)
|
||||
|
|
@ -208,7 +295,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
|
|||
pub async fn new<'store: 'cursor>(store: &'store mut Store<T>) -> Result<Self>
|
||||
where T: Send
|
||||
{
|
||||
let path_to_rows = Path::new(&store.table_folder).join(ROWS_FILE_NAME);
|
||||
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
|
||||
let file: File =
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
|
|
@ -221,13 +308,42 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
|
|||
file,
|
||||
data_type: store.data_type,
|
||||
|
||||
eof_file_position: 0, // This will be overwriten by the seek_to_start_of_data
|
||||
eof_file_position: 0,
|
||||
};
|
||||
let eof_file_position: FilePosition = cursor.seek_to_end().await?;
|
||||
cursor.eof_file_position = eof_file_position;
|
||||
|
||||
cursor.seek_to_start_of_data().await?;
|
||||
|
||||
Ok(cursor)
|
||||
}
|
||||
|
||||
|
||||
pub async fn connect<'header: 'cursor>(path_to_rows: &str, header: &'header mut StoreHeader) -> Result<Self>
|
||||
where T: Send
|
||||
{
|
||||
let file: File =
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(path_to_rows)
|
||||
.await?;
|
||||
|
||||
let mut cursor = Self {
|
||||
header,
|
||||
file,
|
||||
data_type: PhantomData::<T>,
|
||||
|
||||
eof_file_position: 0,
|
||||
};
|
||||
let eof_file_position: FilePosition = cursor.seek_to_end().await?;
|
||||
cursor.eof_file_position = eof_file_position;
|
||||
|
||||
cursor.seek_to_start_of_data().await?;
|
||||
|
||||
Ok(cursor)
|
||||
}
|
||||
|
||||
|
||||
// ===Primitive Operations===
|
||||
async fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize> {
|
||||
Ok(self.file.write(bytes).await?)
|
||||
|
|
@ -255,12 +371,10 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
|
|||
}
|
||||
|
||||
// ===Entry Header Manipulation===
|
||||
// assumes we are at the start of the valid entry.
|
||||
async fn set_entry_is_deleted_to(&mut self, is_deleted: bool) -> Result<()>
|
||||
where T: Send
|
||||
{
|
||||
self.seek_to(EntryHeaderWithDataSize::IS_DELETED_OFFSET as u64).await?;
|
||||
self.write_bytes(&encode::<bool>(&is_deleted)?).await?;
|
||||
// assumes we are at the start of valid entry.
|
||||
async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()> {
|
||||
let bytes: Vec<u8> = entry_header.encode()?;
|
||||
self.write_bytes(&bytes).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -274,8 +388,7 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
|
|||
self.increment_total_count().await?;
|
||||
|
||||
let encoded_entry: Vec<u8> = entry.encode()?;
|
||||
self.seek_to_end().await?;
|
||||
let file_position: FilePosition = self.current_file_position().await?;
|
||||
let file_position = self.seek_to_end().await?;
|
||||
self.write_bytes(&encoded_entry).await?;
|
||||
|
||||
let eof_file_position: FilePosition = self.current_file_position().await?;
|
||||
|
|
@ -289,19 +402,33 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
|
|||
where T: Send
|
||||
{
|
||||
self.seek_to(file_position).await?;
|
||||
let entry_header = self.read_entry_header().await?;
|
||||
let mut entry_header = self.read_entry_header().await?;
|
||||
if entry_header.is_deleted {
|
||||
Ok(())
|
||||
} else {
|
||||
self.increment_deleted_count().await?;
|
||||
self.seek_to(file_position).await?;
|
||||
self.set_entry_is_deleted_to(true).await?;
|
||||
|
||||
entry_header.is_deleted = true;
|
||||
self.set_new_entry_header(entry_header.into()).await?;
|
||||
|
||||
self.attempt_garbage_collection_if_necessary().await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result<Option<EntryDetailed<T>>>
|
||||
where T: Decode + PartialEq + Send + Sync
|
||||
{
|
||||
let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?;
|
||||
if let Some(entry) = maybe_entry {
|
||||
self.mark_deleted_at(entry.file_position).await?;
|
||||
Ok(Some(entry))
|
||||
} else {
|
||||
Ok(maybe_entry)
|
||||
}
|
||||
}
|
||||
|
||||
async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> {
|
||||
// TODO: What should be the policy? Counting size of garbage? Counting how many entries are
|
||||
// garbage?
|
||||
|
|
@ -311,24 +438,44 @@ impl <'cursor, T> WriteCursor<'cursor, T> {
|
|||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn initiate_garbage_collection(&mut self) -> Result<usize>
|
||||
where T: Send
|
||||
{
|
||||
let table_folder = self.header.table_folder.to_string();
|
||||
let path_to_table = Path::new(&table_folder);
|
||||
let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
|
||||
|
||||
let intermediate_file: File = Store::<T>::create_empty_rows_file(path_to_rows, &self.header).await?;
|
||||
|
||||
let mut intermediate_header: StoreHeader = StoreHeader {
|
||||
table_folder,
|
||||
number_of_columns: self.header.number_of_columns,
|
||||
deleted_count: 0,
|
||||
total_count: 0,
|
||||
primary_column: self.header.primary_column
|
||||
};
|
||||
|
||||
// Creates a new cursor to the intermediate file in which we'll dump the live entries.
|
||||
// let mut cursor_to_intermediate = Self {
|
||||
// header: &mut intermediate_header,
|
||||
// file: intermediate_file,
|
||||
// data_type: PhantomData::<T>,
|
||||
|
||||
// eof_file_position: 0,
|
||||
// };
|
||||
let mut cursor_to_intermediate: Self = todo!();
|
||||
let eof_file_position: FilePosition = cursor_to_intermediate.seek_to_end().await?;
|
||||
cursor_to_intermediate.eof_file_position = eof_file_position;
|
||||
|
||||
|
||||
|
||||
// TODO: intermediate_header does not live long enough, so after garbage collection is
|
||||
// done, we need to use it in the swap.
|
||||
cursor_to_intermediate.header = todo!();
|
||||
|
||||
// In it there will be only the alive rows.
|
||||
// Afterwards we swap the files, and delete the garbage.
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
// TODO
|
||||
// pub async fn search_for(&mut self, index: T) -> Result<()>
|
||||
// where T: Send
|
||||
// {
|
||||
// // let index = self.primary_index.borrow_mut();
|
||||
// // let x = index.lookup(self, 123).await?;
|
||||
// todo!()
|
||||
// }
|
||||
// pub async fn search_for_entry_with_id(&mut self, id: T) -> Result<Option<EntryDetailed<T>>> {
|
||||
// // TODO: make call to the primary index
|
||||
// todo!()
|
||||
// }
|
||||
|
||||
// // TODO: This needs to be some sort of an iterator
|
||||
// pub async fn get_all_eq(&self, column: Column, value: T) -> Result<Option<EntryDetailed<T>>> {
|
||||
// todo!()
|
||||
// }
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
use bincode::{Decode, Encode};
|
||||
|
||||
use crate::binary_coding::{encode, encode_sequence, encode_sequence_with_sizes, decode_sequence};
|
||||
use crate::storage_engine::Result;
|
||||
use crate::binary_coding::{encode_sequence, encode_sequence_with_sizes, decode_sequence};
|
||||
use crate::storage_engine::{Result, FilePosition};
|
||||
use crate::error::{Error, DecodeErrorKind};
|
||||
use crate::entry_header::{EntryHeader, EntryHeaderWithDataSize};
|
||||
|
||||
|
|
@ -13,15 +13,9 @@ pub struct Entry<T> {
|
|||
|
||||
#[derive(Debug)]
|
||||
pub struct EntryDetailed<T> {
|
||||
header: EntryHeaderWithDataSize,
|
||||
data: Vec<T>,
|
||||
}
|
||||
|
||||
impl EntryHeader {
|
||||
fn encode(self: &EntryHeader) -> Result<Vec<u8>> {
|
||||
let result: Vec<u8> = encode(&self.is_deleted)?;
|
||||
Ok(result)
|
||||
}
|
||||
pub header: EntryHeaderWithDataSize,
|
||||
pub file_position: FilePosition,
|
||||
pub data: Vec<T>,
|
||||
}
|
||||
|
||||
impl <T>Entry<T> {
|
||||
|
|
@ -47,11 +41,11 @@ impl <T>Entry<T> {
|
|||
}
|
||||
|
||||
impl <T>EntryDetailed<T> {
|
||||
pub fn decode(header: EntryHeaderWithDataSize, number_of_columns: usize, bytes: &[u8]) -> Result<Self>
|
||||
pub fn decode(header: EntryHeaderWithDataSize, file_position: FilePosition, number_of_columns: usize, bytes: &[u8]) -> Result<Self>
|
||||
where T: Decode
|
||||
{
|
||||
let data = decode_sequence::<T>(number_of_columns, bytes)
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?;
|
||||
Ok(EntryDetailed { header, data })
|
||||
Ok(EntryDetailed { header, file_position, data })
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
use crate::binary_coding::{decode, decode_sequence};
|
||||
use crate::storage_engine::Result;
|
||||
use crate::binary_coding::{decode, encode, decode_sequence};
|
||||
use crate::storage_engine::{Result, Column};
|
||||
use crate::error::{Error, DecodeErrorKind};
|
||||
use std::mem::size_of;
|
||||
|
||||
|
|
@ -14,6 +14,20 @@ pub struct EntryHeaderWithDataSize {
|
|||
pub data_sizes: Vec<usize>, // vec![5, 6, 20] means that column 0 stores 5 bytes, column 1 stores 6
|
||||
// bytes etc
|
||||
}
|
||||
|
||||
impl EntryHeader {
|
||||
pub fn encode(self: &EntryHeader) -> Result<Vec<u8>> {
|
||||
let result: Vec<u8> = encode(&self.is_deleted)?;
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<EntryHeaderWithDataSize> for EntryHeader {
|
||||
fn from(entry: EntryHeaderWithDataSize) -> Self {
|
||||
Self { is_deleted: entry.is_deleted, }
|
||||
}
|
||||
}
|
||||
|
||||
impl EntryHeaderWithDataSize {
|
||||
pub const IS_DELETED_OFFSET: usize = 0;
|
||||
pub const IS_DELETED_SIZE: usize = size_of::<bool>();
|
||||
|
|
@ -28,6 +42,18 @@ impl EntryHeaderWithDataSize {
|
|||
self.data_sizes.iter().sum()
|
||||
}
|
||||
|
||||
pub fn offset_of_column(&self, column: Column) -> usize {
|
||||
let mut sum = 0;
|
||||
for (i, size) in self.data_sizes.iter().enumerate() {
|
||||
if i < column as usize {
|
||||
sum += size;
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
sum
|
||||
}
|
||||
|
||||
pub fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result<Self> {
|
||||
let (is_deleted, _) =
|
||||
decode::<bool>(&bytes)
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ mod store_header;
|
|||
|
||||
use crate::entry::{Entry, EntryDetailed};
|
||||
use crate::storage_engine::{Store, FilePosition};
|
||||
use crate::cursor::{ReadCursor, WriteCursor};
|
||||
use crate::cursor::{ReadCursor, WriteCursor, CursorWithStoreHeader, PrimitiveCursor};
|
||||
|
||||
type Data = u32;
|
||||
|
||||
|
|
@ -20,8 +20,8 @@ type Result<T> = std::result::Result<T, std::io::Error>;
|
|||
async fn create_store() -> Result<Store<Data>> {
|
||||
let mut store: Store<Data> = Store::new(TABLE_PATH, 5, 0).await.map_err(|e| e.to_io_or_panic())?;
|
||||
println!("CREATED");
|
||||
println!("THE STORE: {:?}", store);
|
||||
println!("THE BYTES: {:?}", store.read_all_bytes().await?);
|
||||
// println!("THE STORE: {:?}", store);
|
||||
// println!("THE BYTES: {:?}", store.read_all_bytes().await?);
|
||||
|
||||
Ok(store)
|
||||
}
|
||||
|
|
@ -29,8 +29,8 @@ async fn create_store() -> Result<Store<Data>> {
|
|||
async fn connect_store() -> Result<Store<Data>> {
|
||||
let mut store: Store<Data> = Store::connect(TABLE_PATH).await.map_err(|e| e.to_io_or_panic())?;
|
||||
println!("CONNECTED");
|
||||
println!("THE STORE: {:?}", store);
|
||||
println!("THE BYTES: {:?}", store.read_all_bytes().await?);
|
||||
// println!("THE STORE: {:?}", store);
|
||||
// println!("THE BYTES: {:?}", store.read_all_bytes().await?);
|
||||
Ok(store)
|
||||
}
|
||||
|
||||
|
|
@ -60,6 +60,22 @@ async fn read_entry(cursor: &mut ReadCursor<Data>, file_position: FilePosition)
|
|||
todo!()
|
||||
}
|
||||
|
||||
async fn append_bunch_of_entries(store: &mut Store<Data>) -> Result<()> {
|
||||
let mut cursor = store.write_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
||||
let entry0: Entry<u32> = Entry::new(vec![1, 2, 3, 4, 5]);
|
||||
append_entry(&mut cursor, &entry0).await?;
|
||||
|
||||
let entry1: Entry<u32> = Entry::new(vec![200, 200, 5, 6, 7]);
|
||||
append_entry(&mut cursor, &entry1).await?;
|
||||
|
||||
// println!("{:?}", store.read_all_bytes().await?);
|
||||
let entry2: Entry<u32> = Entry::new(vec![99, 98, 97, 96, 95]);
|
||||
append_entry(&mut cursor, &entry2).await?;
|
||||
|
||||
let entry3: Entry<u32> = Entry::new(vec![50,50,50,50,50]);
|
||||
append_entry(&mut cursor, &entry3).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
|
|
@ -67,42 +83,70 @@ async fn main() -> Result<()> {
|
|||
|
||||
let mut store: Store<Data> = create_or_connect().await?;
|
||||
|
||||
if store.header.total_count == 0 {
|
||||
println!("INSERTING!");
|
||||
append_bunch_of_entries(&mut store).await?;
|
||||
}
|
||||
|
||||
{
|
||||
let mut cursor = store.write_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
||||
let entry0: Entry<u32> = Entry::new(vec![1, 2, 3, 4, 5]);
|
||||
append_entry(&mut cursor, &entry0).await?;
|
||||
|
||||
let entry1: Entry<u32> = Entry::new(vec![200, 200, 5, 6, 7]);
|
||||
append_entry(&mut cursor, &entry1).await?;
|
||||
let entry: Entry<u32> = Entry::new(vec![60, 50, 40, 30, 20]);
|
||||
// let file_position = append_entry(&mut cursor, &entry).await?;
|
||||
// let file_position = 215;
|
||||
// cursor.seek_to(file_position).await.map_err(|e| e.to_io_or_panic())?;
|
||||
|
||||
// println!("{:?}", store.read_all_bytes().await?);
|
||||
let entry2: Entry<u32> = Entry::new(vec![99, 98, 97, 96, 95]);
|
||||
append_entry(&mut cursor, &entry2).await?;
|
||||
// let entry_header = cursor.read_entry_header().await.map_err(|e| e.to_io_or_panic())?;
|
||||
// println!("entry header = {:?}", entry_header);
|
||||
|
||||
let entry3: Entry<u32> = Entry::new(vec![50,50,50,50,50]);
|
||||
append_entry(&mut cursor, &entry3).await?;
|
||||
// println!("FILE POSITION == {}", file_position);
|
||||
// cursor.mark_deleted_at(file_position).await.map_err(|e| e.to_io_or_panic())?;
|
||||
// let entry_header = cursor.read_entry_header().await.map_err(|e| e.to_io_or_panic())?;
|
||||
// println!("entry header after delete = {:?}", entry_header);
|
||||
}
|
||||
|
||||
// println!("{:?}", store);
|
||||
// println!("{:?}", store.read_all_bytes().await?);
|
||||
|
||||
// let entry0: Entry<u32> = Entry::new(vec![99, 98, 97, 96, 95]);
|
||||
// append_entry(&mut cursor, &entry0).await?;
|
||||
{
|
||||
let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
||||
cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?;
|
||||
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
|
||||
// println!("{:?}", x);
|
||||
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
|
||||
// println!("{:?}", x);
|
||||
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
|
||||
// println!("{:?}", x);
|
||||
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
|
||||
// println!("{:?}", x);
|
||||
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
|
||||
// println!("{:?}", x);
|
||||
}
|
||||
|
||||
// let entry1: Entry<u32> = Entry::new(vec![50,50,50,50,50]);
|
||||
// let file_position = append_entry(&mut cursor, &entry1).await?;
|
||||
// println!("CURRENT FILE_POSITION = {}", file_position);
|
||||
{
|
||||
let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
||||
let column = 2;
|
||||
let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?;
|
||||
println!("{:?}", x);
|
||||
let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?;
|
||||
println!("{:?}", x);
|
||||
let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?;
|
||||
println!("{:?}", x);
|
||||
let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?;
|
||||
println!("{:?}", x);
|
||||
let x = cursor.next_at_column(column).await.map_err(|e| e.to_io_or_panic())?;
|
||||
println!("{:?}", x);
|
||||
}
|
||||
|
||||
// Now file_position point to entry1.
|
||||
// cursor.mark_deleted_at(file_position).await.map_err(|e| e.to_io_or_panic())?;
|
||||
// cursor.seek_to(file_position).await.map_err(|e| e.to_io_or_panic())?;
|
||||
{
|
||||
let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
||||
let column = 3;
|
||||
let t0 = 6;
|
||||
let x = cursor.find_first_eq_bruteforce(column, &t0).await.map_err(|e| e.to_io_or_panic())?;
|
||||
println!("{:?}", x);
|
||||
}
|
||||
|
||||
// let entry2: StoreEntry<u32> = StoreEntry::new_deleted(vec![3, 2, 1]);
|
||||
// let cursor2 = store.append_entry(&entry2).await.map_err(|e| e.to_io_or_panic())?;
|
||||
// println!("cursor2 = {}", cursor2);
|
||||
|
||||
println!("{:?}", store);
|
||||
println!("{:?}", store.read_all_bytes().await?);
|
||||
|
||||
|
||||
println!("DONE");
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::fs::{File, OpenOptions, DirBuilder};
|
||||
use tokio::fs;
|
||||
use std::path::Path;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::marker::PhantomData;
|
||||
use async_trait::async_trait;
|
||||
|
||||
|
|
@ -28,7 +28,6 @@ pub struct Store<T> {
|
|||
// {write: 0, read: n + 1} ~> {write:0, read: n} // destroy read
|
||||
// {write: 0, read: 0} ~> {write: 1, read: 0} // create write
|
||||
// {write: 1, read: 0} ~> {write: 0, read: 0} // destroy write
|
||||
pub table_folder: String,
|
||||
// primary_index: Vec<Index<T, FilePosition>>>,
|
||||
// indexes: Vec<Option<Index<T, HashSet<FilePosition>>>>,
|
||||
// primary_index: Index<PositionOfValue, PositionOfRow>,
|
||||
|
|
@ -54,6 +53,7 @@ pub async fn less_than_eq<T>(store: &mut Store<T>, file_position0: FilePosition,
|
|||
}
|
||||
|
||||
pub const ROWS_FILE_NAME: &'static str = "rows";
|
||||
pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_intermediate";
|
||||
|
||||
impl <T>Store<T> {
|
||||
// ===Creation===
|
||||
|
|
@ -63,6 +63,31 @@ impl <T>Store<T> {
|
|||
DirBuilder::new()
|
||||
.create(path_to_table).await?;
|
||||
|
||||
let header = StoreHeader {
|
||||
table_folder: table_folder.to_string(),
|
||||
number_of_columns,
|
||||
deleted_count: 0,
|
||||
total_count: 0,
|
||||
primary_column,
|
||||
};
|
||||
|
||||
// We don't need the file right now. Only cursors will later open it.
|
||||
Self::create_empty_rows_file(path_to_rows, &header).await?;
|
||||
|
||||
// TODO: indexes
|
||||
// let index: Index<PositionOfValue, PositionOfRow> = Index::new(
|
||||
// &format!("rows_{}", primary_column.to_string()),
|
||||
// ).await?;
|
||||
|
||||
let store = Self {
|
||||
header,
|
||||
data_type: PhantomData::<T>,
|
||||
};
|
||||
|
||||
Ok(store)
|
||||
}
|
||||
|
||||
pub async fn create_empty_rows_file(path_to_rows: PathBuf, header: &StoreHeader) -> Result<File> {
|
||||
let mut file: File =
|
||||
OpenOptions::new()
|
||||
.write(true)
|
||||
|
|
@ -71,28 +96,10 @@ impl <T>Store<T> {
|
|||
.open(path_to_rows)
|
||||
.await?;
|
||||
|
||||
let header = StoreHeader {
|
||||
number_of_columns,
|
||||
deleted_count: 0,
|
||||
total_count: 0,
|
||||
primary_column,
|
||||
};
|
||||
let encoded_header: Vec<u8> = header.encode()?;
|
||||
file.write(&encoded_header).await?;
|
||||
|
||||
|
||||
// TODO: indexes
|
||||
// let index: Index<PositionOfValue, PositionOfRow> = Index::new(
|
||||
// &format!("rows_{}", primary_column.to_string()),
|
||||
// ).await?;
|
||||
|
||||
let store = Self {
|
||||
table_folder: table_folder.to_string(),
|
||||
header,
|
||||
data_type: PhantomData::<T>,
|
||||
};
|
||||
|
||||
Ok(store)
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
pub async fn connect(table_folder: &str) -> Result<Self>
|
||||
|
|
@ -112,10 +119,9 @@ impl <T>Store<T> {
|
|||
// header.
|
||||
let mut header_bytes = StoreHeader::decode_buffer();
|
||||
file.read_exact(&mut header_bytes).await?;
|
||||
let header = StoreHeader::decode(&mut header_bytes).await?;
|
||||
let header = StoreHeader::decode(table_folder, &mut header_bytes).await?;
|
||||
|
||||
let store = Self {
|
||||
table_folder: table_folder.to_string(),
|
||||
header,
|
||||
data_type: PhantomData::<T>,
|
||||
};
|
||||
|
|
|
|||
|
|
@ -5,6 +5,8 @@ use std::mem::size_of;
|
|||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct StoreHeader {
|
||||
pub table_folder: String, // This one is not encoded into the file
|
||||
|
||||
pub number_of_columns: usize,
|
||||
pub deleted_count: usize,
|
||||
pub total_count: usize,
|
||||
|
|
@ -35,7 +37,7 @@ impl StoreHeader {
|
|||
[0; StoreHeader::SIZE]
|
||||
}
|
||||
|
||||
pub async fn decode(result: &mut [u8]) -> Result<StoreHeader> {
|
||||
pub async fn decode(table_folder: &str, result: &mut [u8]) -> Result<StoreHeader> {
|
||||
let (number_of_columns, _) =
|
||||
decode::<usize>(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?;
|
||||
|
|
@ -49,6 +51,7 @@ impl StoreHeader {
|
|||
decode::<Column>(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?;
|
||||
let header = StoreHeader {
|
||||
table_folder: table_folder.to_string(),
|
||||
number_of_columns,
|
||||
deleted_count,
|
||||
total_count,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue