Add which columns are indexable to store header

This commit is contained in:
Yuriy Dupyn 2024-02-04 19:00:50 +01:00
parent 4c0f91ad33
commit f2c17d2e66
5 changed files with 100 additions and 46 deletions

View file

@ -72,10 +72,6 @@ pub trait PrimitiveCursor<T> {
Ok(file_position) Ok(file_position)
} }
/// Seeks to the absolute byte offset `StoreHeader::SIZE` and returns whatever `seek_to` returns.
// NOTE(review): presumably this offset is where entry data begins, directly after a fixed-size store header — confirm against the on-disk layout.
async fn seek_to_start_of_data(&mut self) -> Result<FilePosition> {
self.seek_to(StoreHeader::SIZE as u64).await
}
// Seeks from current position by offset and returns new file position // Seeks from current position by offset and returns new file position
async fn seek_by(&mut self, offset: i64) -> Result<FilePosition> { async fn seek_by(&mut self, offset: i64) -> Result<FilePosition> {
let file_position = self.file().seek(SeekFrom::Current(offset)).await?; let file_position = self.file().seek(SeekFrom::Current(offset)).await?;
@ -98,6 +94,10 @@ pub trait PrimitiveCursor<T> {
pub trait CursorWithStoreHeader<T>: PrimitiveCursor<T> { pub trait CursorWithStoreHeader<T>: PrimitiveCursor<T> {
fn header(&self) -> &StoreHeader; fn header(&self) -> &StoreHeader;
/// Seeks to the position immediately past the encoded store header and returns whatever `seek_to` returns.
///
/// The target offset is `StoreHeader::size(number_of_columns)`, where `number_of_columns` is read from this cursor's header.
async fn seek_to_start_of_data(&mut self) -> Result<FilePosition> {
// The header length is a function of the column count, so the offset is computed at call time rather than taken from a constant.
self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64).await
}
async fn read_entry_header(&mut self) -> Result<EntryHeaderWithDataSize> { async fn read_entry_header(&mut self) -> Result<EntryHeaderWithDataSize> {
let number_of_columns: usize = self.header().number_of_columns; let number_of_columns: usize = self.header().number_of_columns;
let mut header_bytes: Vec<u8> = vec![0; EntryHeaderWithDataSize::size(number_of_columns)]; let mut header_bytes: Vec<u8> = vec![0; EntryHeaderWithDataSize::size(number_of_columns)];
@ -349,7 +349,7 @@ impl <'cursor, T> WriteCursor<'cursor, T>
Ok(cursor) Ok(cursor)
} }
pub async fn connect<'header: 'cursor>(path_to_rows: &str, header: &'header mut StoreHeader) -> Result<Self> pub async fn connect<'header: 'cursor, 'indexes: 'cursor>(path_to_rows: &str, header: &'header mut StoreHeader, indexes: &'indexes mut Vec<Option<Index<T, FilePosition>>>) -> Result<Self>
where T: Send where T: Send
{ {
let file: File = let file: File =
@ -363,7 +363,7 @@ impl <'cursor, T> WriteCursor<'cursor, T>
header, header,
file, file,
data_type: PhantomData::<T>, data_type: PhantomData::<T>,
indexes: todo!(), indexes,
eof_file_position: 0, eof_file_position: 0,
}; };
@ -485,7 +485,8 @@ impl <'cursor, T> WriteCursor<'cursor, T>
number_of_columns: self.header.number_of_columns, number_of_columns: self.header.number_of_columns,
deleted_count: 0, deleted_count: 0,
total_count: 0, total_count: 0,
primary_column: self.header.primary_column primary_column: self.header.primary_column,
indexed_columns: todo!()
}; };
// Creates a new cursor to the intermediate file in which we'll dump the live entries. // Creates a new cursor to the intermediate file in which we'll dump the live entries.

View file

@ -12,6 +12,7 @@ pub enum DecodeErrorKind {
StoreHeaderDeletedCount, StoreHeaderDeletedCount,
StoreHeaderTotalCount, StoreHeaderTotalCount,
StoreHeaderPrimaryColumn, StoreHeaderPrimaryColumn,
StoreHeaderIndexedColumns,
EntryData, EntryData,
EntryIsDeleted, EntryIsDeleted,
EntryHeaderWithDataSizes, EntryHeaderWithDataSizes,

View file

@ -89,9 +89,9 @@ async fn main() -> Result<()> {
} }
{ {
let mut cursor = store.write_cursor().await.map_err(|e| e.to_io_or_panic())?; // let mut cursor = store.write_cursor().await.map_err(|e| e.to_io_or_panic())?;
let entry: Entry<u32> = Entry::new(vec![60, 50, 40, 30, 20]); // let entry: Entry<u32> = Entry::new(vec![60, 50, 40, 30, 20]);
// let file_position = append_entry(&mut cursor, &entry).await?; // let file_position = append_entry(&mut cursor, &entry).await?;
// let file_position = 215; // let file_position = 215;
// cursor.seek_to(file_position).await.map_err(|e| e.to_io_or_panic())?; // cursor.seek_to(file_position).await.map_err(|e| e.to_io_or_panic())?;
@ -107,20 +107,23 @@ async fn main() -> Result<()> {
// println!("{:?}", store); // println!("{:?}", store);
// println!("{:?}", store.read_all_bytes().await?); // println!("{:?}", store.read_all_bytes().await?);
{ {
let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?; cursor.read_entries().await.map_err(|e| e.to_io_or_panic())?;
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; }
// println!("{:?}", x);
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; {
// println!("{:?}", x); let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
// println!("{:?}", x); println!("{:?}", x);
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
// println!("{:?}", x); println!("{:?}", x);
// let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?; let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
// println!("{:?}", x); println!("{:?}", x);
let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
println!("{:?}", x);
let x = cursor.next().await.map_err(|e| e.to_io_or_panic())?;
println!("{:?}", x);
} }
{ {
@ -138,13 +141,13 @@ async fn main() -> Result<()> {
println!("{:?}", x); println!("{:?}", x);
} }
{ // {
let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?; // let mut cursor = store.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
let column = 3; // let column = 3;
let t0 = 6; // let t0 = 6;
let x = cursor.find_first_eq_bruteforce(column, &t0).await.map_err(|e| e.to_io_or_panic())?; // let x = cursor.find_first_eq_bruteforce(column, &t0).await.map_err(|e| e.to_io_or_panic())?;
println!("{:?}", x); // println!("{:?}", x);
} // }

View file

@ -59,12 +59,17 @@ impl <T>Store<T> {
DirBuilder::new() DirBuilder::new()
.create(path_to_table).await?; .create(path_to_table).await?;
let header = StoreHeader { let header = {
table_folder: table_folder.to_string(), let mut indexed_columns = vec![false; number_of_columns];
number_of_columns, indexed_columns[primary_column as usize] = true;
deleted_count: 0, StoreHeader {
total_count: 0, table_folder: table_folder.to_string(),
primary_column, number_of_columns,
deleted_count: 0,
total_count: 0,
primary_column,
indexed_columns,
}
}; };
// We don't need the file right now. Only cursors will later open it. // We don't need the file right now. Only cursors will later open it.
@ -117,9 +122,16 @@ impl <T>Store<T> {
// Unfortunately we can't yet use store.read_bytes, since it can't be created without the // Unfortunately we can't yet use store.read_bytes, since it can't be created without the
// header. // header.
let mut header_bytes = StoreHeader::decode_buffer(); let header = {
file.read_exact(&mut header_bytes).await?; let mut fixed_header_bytes = StoreHeader::buffer_for_fixed_decoding();
let header = StoreHeader::decode(table_folder, &mut header_bytes).await?; file.read_exact(&mut fixed_header_bytes).await?;
let fixed_header = StoreHeader::decode_fixed(table_folder, &fixed_header_bytes).await?;
// decode the indexes
let mut rest_bytes: Vec<u8> = StoreHeader::buffer_for_rest_decoding(&fixed_header);
file.read_exact(&mut rest_bytes).await?;
StoreHeader::decode_rest(fixed_header, &rest_bytes).await?
};
// let primary_index: Index<T, FilePosition> = Index::connect( // let primary_index: Index<T, FilePosition> = Index::connect(

View file

@ -1,4 +1,4 @@
use crate::binary_coding::{encode, decode}; use crate::binary_coding::{encode, encode_sequence, decode, decode_sequence};
use crate::storage_engine::{Result, Column}; use crate::storage_engine::{Result, Column};
use crate::error::{Error, DecodeErrorKind}; use crate::error::{Error, DecodeErrorKind};
use std::mem::size_of; use std::mem::size_of;
@ -11,8 +11,17 @@ pub struct StoreHeader {
pub deleted_count: usize, pub deleted_count: usize,
pub total_count: usize, pub total_count: usize,
pub primary_column: Column, pub primary_column: Column,
// TODO pub indexed_columns: Vec<bool>,
// pub indexed_columns: Vec<bool>, }
#[derive(Debug, Clone)]
pub struct StoreHeaderFixedPart {
pub table_folder: String, // This one is not encoded into the file
pub number_of_columns: usize,
pub deleted_count: usize,
pub total_count: usize,
pub primary_column: Column,
} }
impl StoreHeader { impl StoreHeader {
@ -20,15 +29,20 @@ impl StoreHeader {
pub const DELETED_COUNT_SIZE: usize = size_of::<usize>(); pub const DELETED_COUNT_SIZE: usize = size_of::<usize>();
pub const TOTAL_COUNT_SIZE: usize = size_of::<usize>(); pub const TOTAL_COUNT_SIZE: usize = size_of::<usize>();
pub const PRIMARY_COLUMN_SIZE: usize = size_of::<Column>(); pub const PRIMARY_COLUMN_SIZE: usize = size_of::<Column>();
pub const SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE + Self::PRIMARY_COLUMN_SIZE; pub const FIXED_SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE + Self::PRIMARY_COLUMN_SIZE;
pub const NUMBER_OF_COLUMNS_OFFSET: usize = 0; pub const NUMBER_OF_COLUMNS_OFFSET: usize = 0;
pub const DELETED_COUNT_OFFSET: usize = Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE; pub const DELETED_COUNT_OFFSET: usize = Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE;
pub const TOTAL_COUNT_OFFSET: usize = Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE; pub const TOTAL_COUNT_OFFSET: usize = Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE;
pub const PRIMARY_COLUMN_OFFSET: usize = Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE; pub const PRIMARY_COLUMN_OFFSET: usize = Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE;
pub const INDEXED_COLUMNS_OFFSET: usize = Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE;
fn indexed_columns_size(&self) -> usize { fn indexed_columns_size(number_of_columns: usize) -> usize {
size_of::<bool>() * self.number_of_columns size_of::<bool>() * number_of_columns
}
pub fn size(number_of_columns: usize) -> usize {
Self::FIXED_SIZE + Self::indexed_columns_size(number_of_columns)
} }
pub fn encode(&self) -> Result<Vec<u8>> { pub fn encode(&self) -> Result<Vec<u8>> {
@ -36,14 +50,19 @@ impl StoreHeader {
result.append(&mut encode(&self.deleted_count)?); result.append(&mut encode(&self.deleted_count)?);
result.append(&mut encode(&self.total_count)?); result.append(&mut encode(&self.total_count)?);
result.append(&mut encode(&self.primary_column)?); result.append(&mut encode(&self.primary_column)?);
result.append(&mut encode_sequence(&self.indexed_columns)?);
Ok(result) Ok(result)
} }
pub fn decode_buffer() -> [u8; StoreHeader::SIZE] { pub fn buffer_for_fixed_decoding() -> [u8; Self::FIXED_SIZE] {
[0; StoreHeader::SIZE] [0; Self::FIXED_SIZE]
} }
pub async fn decode(table_folder: &str, result: &mut [u8]) -> Result<StoreHeader> { pub fn buffer_for_rest_decoding(header: &StoreHeaderFixedPart) -> Vec<u8> {
vec![0; Self::indexed_columns_size(header.number_of_columns)]
}
pub async fn decode_fixed(table_folder: &str, result: &[u8]) -> Result<StoreHeaderFixedPart> {
let (number_of_columns, _) = let (number_of_columns, _) =
decode::<usize>(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE]) decode::<usize>(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE])
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?;
@ -56,7 +75,7 @@ impl StoreHeader {
let (primary_column, _) = let (primary_column, _) =
decode::<Column>(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) decode::<Column>(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE])
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?;
let header = StoreHeader { let header = StoreHeaderFixedPart {
table_folder: table_folder.to_string(), table_folder: table_folder.to_string(),
number_of_columns, number_of_columns,
deleted_count, deleted_count,
@ -67,6 +86,24 @@ impl StoreHeader {
Ok(header) Ok(header)
} }
/// Builds a full `StoreHeader` from an already-decoded `StoreHeaderFixedPart` and the encoded indexed-column flags.
///
/// `result` is decoded as `header.number_of_columns` `bool` values via `decode_sequence`; these become `indexed_columns`.
/// Every other field is moved over from `header` unchanged.
///
/// # Errors
/// A `decode_sequence` failure is wrapped as `Error::DecodeError(DecodeErrorKind::StoreHeaderIndexedColumns, _)`.
// NOTE(review): nothing in this body awaits, so `async` appears to be kept only for symmetry with `decode_fixed` — confirm before relying on it.
pub async fn decode_rest(header: StoreHeaderFixedPart, result: &[u8]) -> Result<StoreHeader> {
// One flag per column; the expected count comes from the fixed part decoded earlier.
let indexed_columns: Vec<bool> =
decode_sequence::<bool>(header.number_of_columns, result)
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderIndexedColumns, e))?;
Ok(StoreHeader {
table_folder: header.table_folder,
number_of_columns: header.number_of_columns,
deleted_count: header.deleted_count,
total_count: header.total_count,
primary_column: header.primary_column,
indexed_columns,
})
}
// returns new count // returns new count
pub fn increment_total_count(&mut self) -> usize { pub fn increment_total_count(&mut self) -> usize {
self.total_count += 1; self.total_count += 1;