Split Store into Store and Cursor
This commit is contained in:
parent
dbd2ba9946
commit
3e7e8665fd
2 changed files with 252 additions and 203 deletions
|
|
@ -23,11 +23,12 @@ async fn create_store() -> Result<Store<Data>> {
|
||||||
println!("THE STORE: {:?}", store);
|
println!("THE STORE: {:?}", store);
|
||||||
println!("THE BYTES: {:?}", store.read_all_bytes().await?);
|
println!("THE BYTES: {:?}", store.read_all_bytes().await?);
|
||||||
|
|
||||||
|
let mut cursor = store.cursor(AccessMode::Write).await.map_err(|e| e.to_io_or_panic())?;
|
||||||
let entry0: Entry<u32> = Entry::new_deleted(vec![1, 2, 3, 4, 5]);
|
let entry0: Entry<u32> = Entry::new_deleted(vec![1, 2, 3, 4, 5]);
|
||||||
append_entry(&mut store, &entry0).await?;
|
append_entry(&mut cursor, &entry0).await?;
|
||||||
|
|
||||||
let entry1: Entry<u32> = Entry::new_deleted(vec![200, 200, 5, 6, 7]);
|
let entry1: Entry<u32> = Entry::new_deleted(vec![200, 200, 5, 6, 7]);
|
||||||
append_entry(&mut store, &entry1).await?;
|
append_entry(&mut cursor, &entry1).await?;
|
||||||
|
|
||||||
println!("{:?}", store.read_all_bytes().await?);
|
println!("{:?}", store.read_all_bytes().await?);
|
||||||
Ok(store)
|
Ok(store)
|
||||||
|
|
@ -51,17 +52,17 @@ async fn create_or_connect() -> Result<Store<Data>> {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async fn append_entry(store: &mut Store<Data>, entry: &Entry<Data>) -> Result<FilePosition>{
|
async fn append_entry(cursor: &mut Cursor<Data>, entry: &Entry<Data>) -> Result<FilePosition>{
|
||||||
println!("APPENDING");
|
println!("APPENDING");
|
||||||
println!("entry == {:?}", entry);
|
println!("entry == {:?}", entry);
|
||||||
let file_position: FilePosition = store.append_entry(&entry).await.map_err(|e| e.to_io_or_panic())?;
|
let file_position: FilePosition = cursor.append_entry(&entry).await.map_err(|e| e.to_io_or_panic())?;
|
||||||
println!("file_position == {:?}", file_position);
|
println!("file_position == {:?}", file_position);
|
||||||
Ok(file_position)
|
Ok(file_position)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_entry(store: &mut Store<Data>, file_position: FilePosition) -> Result<Option<EntryDetailed<Data>>>{
|
async fn read_entry(cursor: &mut Cursor<Data>, file_position: FilePosition) -> Result<Option<EntryDetailed<Data>>>{
|
||||||
println!("READING ENTRY at file_position={}", file_position);
|
println!("READING ENTRY at file_position={}", file_position);
|
||||||
let entry = store.read_entry_at(file_position).await.map_err(|e| e.to_io_or_panic())?;
|
let entry = cursor.read_entry_at(file_position).await.map_err(|e| e.to_io_or_panic())?;
|
||||||
println!("ENTRY: {:?}", entry);
|
println!("ENTRY: {:?}", entry);
|
||||||
Ok(entry)
|
Ok(entry)
|
||||||
}
|
}
|
||||||
|
|
@ -71,17 +72,18 @@ async fn read_entry(store: &mut Store<Data>, file_position: FilePosition) -> Res
|
||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
println!("STOOOOOOOOOOOORAAAAAAAAAAAGE");
|
println!("STOOOOOOOOOOOORAAAAAAAAAAAGE");
|
||||||
|
|
||||||
let mut store: Store<Data> = create_or_connect().await?;
|
let store: Store<Data> = create_or_connect().await?;
|
||||||
|
|
||||||
// let entry0 = read_entry(&mut store, 16).await?;
|
// let entry0 = read_entry(&mut store, 16).await?;
|
||||||
// let entry1 = read_entry(&mut store, 45).await?;
|
// let entry1 = read_entry(&mut store, 45).await?;
|
||||||
// println!("{:?}", store);
|
// println!("{:?}", store);
|
||||||
// println!("{:?}", store.read_all_bytes().await?);
|
// println!("{:?}", store.read_all_bytes().await?);
|
||||||
|
|
||||||
|
let mut cursor = store.cursor(AccessMode::Write).await.map_err(|e| e.to_io_or_panic())?;
|
||||||
let entry0: Entry<u32> = Entry::new(vec![99, 98, 97, 96, 95]);
|
let entry0: Entry<u32> = Entry::new(vec![99, 98, 97, 96, 95]);
|
||||||
append_entry(&mut store, &entry0).await?;
|
append_entry(&mut cursor, &entry0).await?;
|
||||||
|
|
||||||
store.read_entries().await.map_err(|e| e.to_io_or_panic())?;
|
cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?;
|
||||||
|
|
||||||
|
|
||||||
// let entry2: StoreEntry<u32> = StoreEntry::new_deleted(vec![3, 2, 1]);
|
// let entry2: StoreEntry<u32> = StoreEntry::new_deleted(vec![3, 2, 1]);
|
||||||
|
|
|
||||||
|
|
@ -15,8 +15,6 @@ use crate::error::{Error, DecodeErrorKind};
|
||||||
|
|
||||||
use crate::index::Index;
|
use crate::index::Index;
|
||||||
|
|
||||||
use std::cell::RefCell;
|
|
||||||
|
|
||||||
use std::mem::size_of;
|
use std::mem::size_of;
|
||||||
|
|
||||||
type Result<T> = std::result::Result<T, Error>;
|
type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
@ -27,15 +25,19 @@ pub type FilePosition = u64;
|
||||||
// TODO: Consider introducing a phantom type for the data that's used in the store.
|
// TODO: Consider introducing a phantom type for the data that's used in the store.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Store<T> {
|
pub struct Store<T> {
|
||||||
|
// TODO: This needs to track how many read-write cursors there are...?
|
||||||
|
// RWMutex
|
||||||
|
// {write: 0, read: n} ~> {write:0, read: n + 1} // create read
|
||||||
|
// {write: 0, read: n + 1} ~> {write:0, read: n} // destroy read
|
||||||
|
// {write: 0, read: 0} ~> {write: 1, read: 0} // create write
|
||||||
|
// {write: 1, read: 0} ~> {write: 0, read: 0} // destroy write
|
||||||
table_folder: String,
|
table_folder: String,
|
||||||
file: File,
|
|
||||||
// primary_index: Vec<Index<T, FilePosition>>>,
|
// primary_index: Vec<Index<T, FilePosition>>>,
|
||||||
// indexes: Vec<Option<Index<T, HashSet<FilePosition>>>>,
|
// indexes: Vec<Option<Index<T, HashSet<FilePosition>>>>,
|
||||||
// primary_index: Index<PositionOfValue, PositionOfRow>,
|
// primary_index: Index<PositionOfValue, PositionOfRow>,
|
||||||
header: StoreHeader,
|
header: StoreHeader,
|
||||||
data_type: PhantomData<T>,
|
data_type: PhantomData<T>,
|
||||||
|
|
||||||
eof_file_position: FilePosition,
|
|
||||||
// meta
|
// meta
|
||||||
// location of rows file
|
// location of rows file
|
||||||
// locations of index files
|
// locations of index files
|
||||||
|
|
@ -44,16 +46,24 @@ pub struct Store<T> {
|
||||||
// list
|
// list
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct Cursor<T> {
|
||||||
|
header: StoreHeader,
|
||||||
|
file: File,
|
||||||
|
data_type: PhantomData<T>,
|
||||||
|
|
||||||
|
eof_file_position: FilePosition,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum AccessMode {
|
||||||
|
Read,
|
||||||
|
Write
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type PositionOfValue = FilePosition;
|
||||||
|
pub type PositionOfRow = FilePosition;
|
||||||
|
|
||||||
|
|
||||||
type PositionOfValue = FilePosition;
|
#[derive(Debug, Clone)]
|
||||||
type PositionOfRow = FilePosition;
|
|
||||||
|
|
||||||
// TODO: Basically a pointer to Store + its own file position
|
|
||||||
// pub struct Cursor<'a, T> {
|
|
||||||
// }
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct StoreHeader {
|
pub struct StoreHeader {
|
||||||
number_of_columns: usize,
|
number_of_columns: usize,
|
||||||
deleted_count: usize,
|
deleted_count: usize,
|
||||||
|
|
@ -143,8 +153,225 @@ impl <T>SomethingSupportingLeq for Store<T>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const ROWS_FILE_NAME: &'static str = "rows";
|
||||||
|
|
||||||
impl <T>Store<T> {
|
impl <T>Store<T> {
|
||||||
const ROWS_FILE_NAME: &'static str = "rows";
|
// For debugging.
|
||||||
|
// Moves file cursor to the end.
|
||||||
|
pub async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error>{
|
||||||
|
let mut bytes: Vec<u8> = vec![];
|
||||||
|
let mut cursor = self.cursor(AccessMode::Read).await.map_err(|e| e.to_io_or_panic())?;
|
||||||
|
cursor.file.read_to_end(&mut bytes).await?;
|
||||||
|
Ok(bytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===Creation===
|
||||||
|
pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result<Self> {
|
||||||
|
let path_to_table = Path::new(table_folder);
|
||||||
|
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
|
||||||
|
DirBuilder::new()
|
||||||
|
.create(path_to_table).await?;
|
||||||
|
|
||||||
|
let mut file: File =
|
||||||
|
OpenOptions::new()
|
||||||
|
.write(true)
|
||||||
|
.read(true)
|
||||||
|
.create_new(true)
|
||||||
|
.open(path_to_rows)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let header = StoreHeader {
|
||||||
|
number_of_columns,
|
||||||
|
deleted_count: 0,
|
||||||
|
total_count: 0,
|
||||||
|
primary_column,
|
||||||
|
};
|
||||||
|
let encoded_header: Vec<u8> = header.encode()?;
|
||||||
|
file.write(&encoded_header).await?;
|
||||||
|
|
||||||
|
|
||||||
|
// TODO: indexes
|
||||||
|
// let index: Index<PositionOfValue, PositionOfRow> = Index::new(
|
||||||
|
// &format!("rows_{}", primary_column.to_string()),
|
||||||
|
// ).await?;
|
||||||
|
|
||||||
|
let store = Self {
|
||||||
|
table_folder: table_folder.to_string(),
|
||||||
|
header,
|
||||||
|
data_type: PhantomData::<T>,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(store)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn connect(table_folder: &str) -> Result<Self>
|
||||||
|
where T: std::fmt::Debug
|
||||||
|
{
|
||||||
|
let path_to_table = Path::new(table_folder);
|
||||||
|
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
|
||||||
|
|
||||||
|
let mut file: File =
|
||||||
|
OpenOptions::new()
|
||||||
|
.read(true)
|
||||||
|
.write(true)
|
||||||
|
.open(path_to_rows)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Unfortunately we can't yet use store.read_bytes, since it can't be created without the
|
||||||
|
// header.
|
||||||
|
let mut header_bytes = StoreHeader::decode_buffer();
|
||||||
|
file.read_exact(&mut header_bytes).await?;
|
||||||
|
let header = StoreHeader::decode(&mut header_bytes).await?;
|
||||||
|
|
||||||
|
let store = Self {
|
||||||
|
table_folder: table_folder.to_string(),
|
||||||
|
header,
|
||||||
|
data_type: PhantomData::<T>,
|
||||||
|
};
|
||||||
|
Ok(store)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn cursor(&self, mode: AccessMode) -> Result<Cursor<T>> {
|
||||||
|
Cursor::new(&self, mode).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn garbage_collect(&mut self) -> Result<()> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===Store Header===
|
||||||
|
impl StoreHeader {
|
||||||
|
fn encode(&self) -> Result<Vec<u8>> {
|
||||||
|
let mut result = encode(&self.number_of_columns)?;
|
||||||
|
result.append(&mut encode(&self.deleted_count)?);
|
||||||
|
result.append(&mut encode(&self.total_count)?);
|
||||||
|
result.append(&mut encode(&self.primary_column)?);
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn decode_buffer() -> [u8; StoreHeader::SIZE] {
|
||||||
|
[0; StoreHeader::SIZE]
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn decode(result: &mut [u8]) -> Result<StoreHeader> {
|
||||||
|
let (number_of_columns, _) =
|
||||||
|
decode::<usize>(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE])
|
||||||
|
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?;
|
||||||
|
let (deleted_count, _) =
|
||||||
|
decode::<usize>(&result[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE])
|
||||||
|
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?;
|
||||||
|
let (total_count, _) =
|
||||||
|
decode::<usize>(&result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE])
|
||||||
|
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?;
|
||||||
|
let (primary_column, _) =
|
||||||
|
decode::<Column>(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE])
|
||||||
|
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?;
|
||||||
|
let header = StoreHeader {
|
||||||
|
number_of_columns,
|
||||||
|
deleted_count,
|
||||||
|
total_count,
|
||||||
|
primary_column,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(header)
|
||||||
|
}
|
||||||
|
|
||||||
|
// returns new count
|
||||||
|
fn increment_total_count(&mut self) -> usize {
|
||||||
|
self.total_count += 1;
|
||||||
|
self.total_count
|
||||||
|
}
|
||||||
|
|
||||||
|
// returns new count
|
||||||
|
fn increment_deleted_count(&mut self) -> usize {
|
||||||
|
self.deleted_count += 1;
|
||||||
|
self.deleted_count
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ====Entry====
|
||||||
|
impl EntryHeader {
|
||||||
|
fn encode(self: &EntryHeader) -> Result<Vec<u8>> {
|
||||||
|
let result: Vec<u8> = encode(&self.is_deleted)?;
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EntryHeaderWithDataSize {
|
||||||
|
fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result<Self> {
|
||||||
|
let (is_deleted, _) =
|
||||||
|
decode::<bool>(&bytes)
|
||||||
|
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
|
||||||
|
|
||||||
|
let data_sizes = decode_sequence::<usize>(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..])
|
||||||
|
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?;
|
||||||
|
|
||||||
|
Ok(Self { is_deleted, data_sizes } )
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl <T>Entry<T> {
|
||||||
|
pub fn new(data: Vec<T>) -> Self {
|
||||||
|
Self { header: EntryHeader { is_deleted: false }, data }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new_deleted(data: Vec<T>) -> Self {
|
||||||
|
Self { header: EntryHeader { is_deleted: true}, data }
|
||||||
|
}
|
||||||
|
|
||||||
|
// FORMAT: [EntryHeaderWithDataSize, ..sequence of data]
|
||||||
|
fn encode(&self) -> Result<Vec<u8>>
|
||||||
|
where T: Encode
|
||||||
|
{
|
||||||
|
let mut result: Vec<u8> = self.header.encode()?;
|
||||||
|
|
||||||
|
let (mut encoded_data, sizes) = encode_sequence_with_sizes(&self.data[..])?;
|
||||||
|
result.append(&mut encode_sequence(&sizes)?); // sizes of data (fixed by number of columns)
|
||||||
|
result.append(&mut encoded_data); // data variable size
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl <T>EntryDetailed<T> {
|
||||||
|
fn decode(header: EntryHeaderWithDataSize, number_of_columns: usize, bytes: &[u8]) -> Result<Self>
|
||||||
|
where T: Decode
|
||||||
|
{
|
||||||
|
let data = decode_sequence::<T>(number_of_columns, bytes)
|
||||||
|
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?;
|
||||||
|
Ok(EntryDetailed { header, data })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
//=================Cursor==================
|
||||||
|
impl <T>Cursor<T> {
|
||||||
|
pub async fn new(store: &Store<T>, mode: AccessMode) -> Result<Self> {
|
||||||
|
let path_to_rows = Path::new(&store.table_folder).join(ROWS_FILE_NAME);
|
||||||
|
let file: File = match mode {
|
||||||
|
AccessMode::Read =>
|
||||||
|
OpenOptions::new()
|
||||||
|
.read(true)
|
||||||
|
.open(path_to_rows)
|
||||||
|
.await?,
|
||||||
|
|
||||||
|
AccessMode::Write =>
|
||||||
|
OpenOptions::new()
|
||||||
|
.read(true)
|
||||||
|
.write(true)
|
||||||
|
.open(path_to_rows)
|
||||||
|
.await?,
|
||||||
|
};
|
||||||
|
|
||||||
|
let cursor = Self {
|
||||||
|
header: store.header.clone(),
|
||||||
|
file,
|
||||||
|
data_type: store.data_type,
|
||||||
|
|
||||||
|
eof_file_position: 0,
|
||||||
|
};
|
||||||
|
Ok(cursor)
|
||||||
|
}
|
||||||
|
|
||||||
//===primitive file operations===
|
//===primitive file operations===
|
||||||
// Moves the file cursor right.
|
// Moves the file cursor right.
|
||||||
|
|
@ -165,7 +392,7 @@ impl <T>Store<T> {
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn seek_to(&mut self, file_position: FilePosition) -> Result<()>{
|
async fn seek_to(&mut self, file_position: FilePosition) -> Result<()> {
|
||||||
self.file.seek(SeekFrom::Start(file_position)).await?;
|
self.file.seek(SeekFrom::Start(file_position)).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
@ -203,76 +430,6 @@ impl <T>Store<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===Creation===
|
// ===Creation===
|
||||||
pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result<Self> {
|
|
||||||
let path_to_table = Path::new(table_folder);
|
|
||||||
let path_to_rows = path_to_table.join(Self::ROWS_FILE_NAME);
|
|
||||||
DirBuilder::new()
|
|
||||||
.create(path_to_table).await?;
|
|
||||||
|
|
||||||
let file: File =
|
|
||||||
OpenOptions::new()
|
|
||||||
.write(true)
|
|
||||||
.read(true)
|
|
||||||
.create_new(true)
|
|
||||||
.open(path_to_rows)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let header = StoreHeader {
|
|
||||||
number_of_columns,
|
|
||||||
deleted_count: 0,
|
|
||||||
total_count: 0,
|
|
||||||
primary_column,
|
|
||||||
};
|
|
||||||
let encoded_header: Vec<u8> = header.encode()?;
|
|
||||||
|
|
||||||
|
|
||||||
// let index: Index<PositionOfValue, PositionOfRow> = Index::new(
|
|
||||||
// &format!("rows_{}", primary_column.to_string()),
|
|
||||||
// ).await?;
|
|
||||||
|
|
||||||
let mut store = Self {
|
|
||||||
table_folder: table_folder.to_string(),
|
|
||||||
file,
|
|
||||||
header,
|
|
||||||
data_type: PhantomData::<T>,
|
|
||||||
eof_file_position: 0,
|
|
||||||
};
|
|
||||||
store.write_bytes(&encoded_header).await?;
|
|
||||||
store.eof_file_position = store.current_file_position().await?;
|
|
||||||
|
|
||||||
Ok(store)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn connect(table_folder: &str) -> Result<Self>
|
|
||||||
where T: std::fmt::Debug
|
|
||||||
{
|
|
||||||
let path_to_table = Path::new(table_folder);
|
|
||||||
let path_to_rows = path_to_table.join(Self::ROWS_FILE_NAME);
|
|
||||||
|
|
||||||
let mut file: File =
|
|
||||||
OpenOptions::new()
|
|
||||||
.read(true)
|
|
||||||
.write(true)
|
|
||||||
.open(path_to_rows)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// Unfortunately we can't yet use store.read_bytes, since it can't be created without the
|
|
||||||
// header.
|
|
||||||
let mut header_bytes = StoreHeader::decode_buffer();
|
|
||||||
file.read_exact(&mut header_bytes).await?;
|
|
||||||
let header = StoreHeader::decode(&mut header_bytes).await?;
|
|
||||||
|
|
||||||
let eof_file_position = file.seek(SeekFrom::End(0)).await?;
|
|
||||||
|
|
||||||
let store = Self {
|
|
||||||
table_folder: table_folder.to_string(),
|
|
||||||
file,
|
|
||||||
header,
|
|
||||||
data_type: PhantomData::<T>,
|
|
||||||
eof_file_position,
|
|
||||||
};
|
|
||||||
Ok(store)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ===Append Entry===
|
// ===Append Entry===
|
||||||
async fn increment_total_count(&mut self) -> Result<()> {
|
async fn increment_total_count(&mut self) -> Result<()> {
|
||||||
|
|
@ -398,118 +555,8 @@ impl <T>Store<T> {
|
||||||
pub async fn get_all_eq(&self, column: Column, value: T) -> Result<Option<EntryDetailed<T>>> {
|
pub async fn get_all_eq(&self, column: Column, value: T) -> Result<Option<EntryDetailed<T>>> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn garbage_collect(&mut self) -> Result<()> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===Store Header===
|
|
||||||
impl StoreHeader {
|
|
||||||
fn encode(&self) -> Result<Vec<u8>> {
|
|
||||||
let mut result = encode(&self.number_of_columns)?;
|
|
||||||
result.append(&mut encode(&self.deleted_count)?);
|
|
||||||
result.append(&mut encode(&self.total_count)?);
|
|
||||||
result.append(&mut encode(&self.primary_column)?);
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn decode_buffer() -> [u8; StoreHeader::SIZE] {
|
|
||||||
[0; StoreHeader::SIZE]
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn decode(result: &mut [u8]) -> Result<StoreHeader> {
|
|
||||||
let (number_of_columns, _) =
|
|
||||||
decode::<usize>(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE])
|
|
||||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?;
|
|
||||||
let (deleted_count, _) =
|
|
||||||
decode::<usize>(&result[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE])
|
|
||||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?;
|
|
||||||
let (total_count, _) =
|
|
||||||
decode::<usize>(&result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE])
|
|
||||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?;
|
|
||||||
let (primary_column, _) =
|
|
||||||
decode::<Column>(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE])
|
|
||||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?;
|
|
||||||
let header = StoreHeader {
|
|
||||||
number_of_columns,
|
|
||||||
deleted_count,
|
|
||||||
total_count,
|
|
||||||
primary_column,
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(header)
|
|
||||||
}
|
|
||||||
|
|
||||||
// returns new count
|
|
||||||
fn increment_total_count(&mut self) -> usize {
|
|
||||||
self.total_count += 1;
|
|
||||||
self.total_count
|
|
||||||
}
|
|
||||||
|
|
||||||
// returns new count
|
|
||||||
fn increment_deleted_count(&mut self) -> usize {
|
|
||||||
self.deleted_count += 1;
|
|
||||||
self.deleted_count
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ====Entry====
|
|
||||||
impl EntryHeader {
|
|
||||||
fn encode(self: &EntryHeader) -> Result<Vec<u8>> {
|
|
||||||
let result: Vec<u8> = encode(&self.is_deleted)?;
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl EntryHeaderWithDataSize {
|
|
||||||
fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result<Self> {
|
|
||||||
let (is_deleted, _) =
|
|
||||||
decode::<bool>(&bytes)
|
|
||||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
|
|
||||||
|
|
||||||
let data_sizes = decode_sequence::<usize>(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..])
|
|
||||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?;
|
|
||||||
|
|
||||||
Ok(Self { is_deleted, data_sizes } )
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl <T>Entry<T> {
|
|
||||||
pub fn new(data: Vec<T>) -> Self {
|
|
||||||
Self { header: EntryHeader { is_deleted: false }, data }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn new_deleted(data: Vec<T>) -> Self {
|
|
||||||
Self { header: EntryHeader { is_deleted: true}, data }
|
|
||||||
}
|
|
||||||
|
|
||||||
// FORMAT: [EntryHeaderWithDataSize, ..sequence of data]
|
|
||||||
fn encode(&self) -> Result<Vec<u8>>
|
|
||||||
where T: Encode
|
|
||||||
{
|
|
||||||
let mut result: Vec<u8> = self.header.encode()?;
|
|
||||||
|
|
||||||
let (mut encoded_data, sizes) = encode_sequence_with_sizes(&self.data[..])?;
|
|
||||||
result.append(&mut encode_sequence(&sizes)?); // sizes of data (fixed by number of columns)
|
|
||||||
result.append(&mut encoded_data); // data variable size
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl <T>EntryDetailed<T> {
|
|
||||||
fn decode(header: EntryHeaderWithDataSize, number_of_columns: usize, bytes: &[u8]) -> Result<Self>
|
|
||||||
where T: Decode
|
|
||||||
{
|
|
||||||
let data = decode_sequence::<T>(number_of_columns, bytes)
|
|
||||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?;
|
|
||||||
Ok(EntryDetailed { header, data })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// impl StorageEngine for ColumnStore {
|
// impl StorageEngine for ColumnStore {
|
||||||
// async fn append(&mut self, id: Index, entry: Row<T>) -> Result<???, Error>
|
// async fn append(&mut self, id: Index, entry: Row<T>) -> Result<???, Error>
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue