First attempt at storage engine

This commit is contained in:
Yuriy Dupyn 2024-02-01 03:02:12 +01:00
parent ee757c7ca2
commit bd3fdb266f
6 changed files with 514 additions and 1 deletions

View file

@ -0,0 +1,436 @@
use tokio::io::{BufReader, BufWriter, AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom};
use tokio::sync::{Mutex, RwLock};
use tokio::fs::{File, OpenOptions};
use tokio::fs;
use bincode;
use bincode::de::Decoder;
use bincode::enc::write::Writer;
use bincode::enc::Encoder;
use bincode::{BorrowDecode, Decode, Encode};
use bincode::config::{BigEndian, Configuration, Fixint};
use std::mem::size_of;
// =============Byte encoding/decoding============
// Store-wide bincode configuration: big-endian with FIXED-width integer
// encoding, so every usize/bool has a predictable on-disk size. The fixed
// widths are what make header_size()/seek arithmetic below possible.
const BIN_CONFIG: Configuration<BigEndian, Fixint> = bincode::config::standard().with_big_endian().with_fixed_int_encoding();
/// Serialize one value to bytes using the store-wide `BIN_CONFIG`.
fn encode<T: Encode>(t: &T) -> Result<Vec<u8>, bincode::error::EncodeError> {
    let buffer = bincode::encode_to_vec(t, BIN_CONFIG)?;
    Ok(buffer)
}
/// Deserialize one value from the front of `bytes` using `BIN_CONFIG`.
/// Returns the decoded value together with the number of bytes consumed.
fn decode<T: Decode>(bytes: &[u8]) -> Result<(T, usize), bincode::error::DecodeError> {
    let decoded = bincode::decode_from_slice(bytes, BIN_CONFIG)?;
    Ok(decoded)
}
/// Encode a slice as a length-prefixed byte buffer:
/// first the element count (fixed-width usize), then each element in order.
fn encode_vector<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> {
    let mut buffer = encode(&ts.len())?;
    for item in ts {
        buffer.extend(encode(item)?);
    }
    Ok(buffer)
}
/// Inverse of `encode_vector`: read the fixed-width element count from the
/// front of `bytes`, then decode exactly that many elements one after another.
fn decode_vector<T: Decode>(bytes: &[u8]) -> Result<Vec<T>, bincode::error::DecodeError> {
    let len_width = size_of::<usize>();
    let (count, _) = decode::<usize>(&bytes[..len_width])?;
    let mut items: Vec<T> = Vec::with_capacity(count);
    let mut cursor = len_width;
    for _ in 0..count {
        let (item, consumed) = decode::<T>(&bytes[cursor..])?;
        cursor += consumed;
        items.push(item);
    }
    Ok(items)
}
// We don't care about encoding the length here (since it will be used for a row with known column
// size)
/// Encode a slice back-to-back WITHOUT a length prefix. Used for rows whose
/// element count is known from the store header (number_of_columns).
fn encode_sequence<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> {
    let mut buffer: Vec<u8> = Vec::new();
    for item in ts {
        buffer.extend(encode(item)?);
    }
    Ok(buffer)
}
/// Inverse of `encode_sequence`: decode exactly `len` elements back-to-back.
/// The caller supplies the count, since the bytes carry no length prefix.
fn decode_sequence<T: Decode>(len: usize, bytes: &[u8]) -> Result<Vec<T>, bincode::error::DecodeError> {
    let mut items: Vec<T> = Vec::with_capacity(len);
    let mut cursor = 0;
    for _ in 0..len {
        let (item, consumed) = decode::<T>(&bytes[cursor..])?;
        cursor += consumed;
        items.push(item);
    }
    Ok(items)
}
/// Demonstrates round-tripping a vector through `encode_vector`/`decode_vector`.
///
/// NOTE: the decoder cannot infer the element type from the bytes, so the
/// caller must spell it out explicitly (`decode_vector::<String>`) and it
/// must match the type that was encoded.
fn example_encoding_decoding() {
    // The previous version shadowed `xs` five times; only the last binding
    // was ever used, so the dead ones are removed.
    let xs: Vec<String> = vec!["foo".to_string(), "bar".to_string()];
    println!("original {:?}", xs);
    let exs = encode_vector(&xs[..]).unwrap();
    println!("encoded {:?}", exs);
    let dxs = decode_vector::<String>(&exs[..]).unwrap();
    println!("decoded {:?}", dxs);
}
// ============Column Store===============
// ColumnStore
// Handle to one on-disk column store: the file plus its decoded header.
#[derive(Debug)]
pub struct Store {
// Path this store was opened from (kept for diagnostics/reopening).
column_file_name: String,
// TODO: This should be private
pub file: File,
// Header decoded from (or written to) the start of `file`.
header: StoreHeader
// meta
// location of rows file
// locations of index files
//
// rows file
// list
}
// These type aliases are here because they make writing decoders easier.
type NumberOfColumns = usize;
type DeletedCount = usize;
// Fixed-size header stored at the beginning of every store file.
// Both fields are fixed-width usizes on disk (see BIN_CONFIG).
#[derive(Debug)]
pub struct StoreHeader {
// How many column values each entry's data sequence holds.
number_of_columns: NumberOfColumns,
// Count of entries marked deleted (tombstoned) in this file.
deleted_count: DeletedCount,
}
// One row of the store: a tombstone flag plus one value per column.
#[derive(Debug)]
pub struct Entry<T> {
// true means this entry is logically deleted (tombstone).
is_deleted: bool,
// file_position: FilePosition,
// The column values; length should match StoreHeader.number_of_columns.
data: Vec<T>,
}
// All failure modes of the storage layer.
#[derive(Debug)]
pub enum Error {
// A bincode decode failed; the kind records which field was being decoded.
DecodeError(DecodeErrorKind, bincode::error::DecodeError),
EncodeError(bincode::error::EncodeError),
IoError(std::io::Error),
// The file does not contain a well-formed store header.
InvalidStoreHeader,
}
// Identifies which on-disk field was being decoded when a DecodeError arose,
// so the failure site is visible without a backtrace.
#[derive(Debug)]
pub enum DecodeErrorKind {
StoreHeaderNumberOfColumns,
StoreHeaderDeletedCount,
EntryData,
EntryIsDeleted,
EntryDataSize
}
// ===Errors===
impl Error {
    /// Unwrap the underlying I/O error; panic on any non-I/O variant.
    ///
    /// Previously this printed the error and then called a bare `panic!()`,
    /// which dropped the error details from the panic message itself. Now the
    /// offending error is carried in the panic payload.
    pub fn to_io_or_panic(self) -> std::io::Error {
        use Error::*;
        match self {
            IoError(err) => err,
            err => panic!("expected an I/O error, got: {:?}", err),
        }
    }
}
impl From<bincode::error::EncodeError> for Error {
fn from(err: bincode::error::EncodeError) -> Self {
Self::EncodeError(err)
}
}
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
Self::IoError(err)
}
}
// ====Entry====
impl<T> Entry<T> {
    /// A live (not deleted) entry wrapping one row of column values.
    pub fn new(data: Vec<T>) -> Self {
        Self { is_deleted: false, data }
    }
    /// An entry that is already marked deleted (tombstone).
    pub fn new_deleted(data: Vec<T>) -> Self {
        Self { is_deleted: true, data }
    }
    // FORMAT: [HEADER, ..sequence of data]
    // HEADER: [Boolean (one byte), number of bytes in the data (not including the boolean)]
    fn encode(self: &Entry<T>) -> Result<Vec<u8>, Error>
    where T: Encode
    {
        // Leftover debug println!s removed — encode is on the write hot path.
        let mut result: Vec<u8> = encode(&self.is_deleted)?; // bool, 1 byte
        let mut encoded_data = encode_sequence(&self.data[..])?;
        let encoded_data_len = encoded_data.len();
        result.append(&mut encode(&encoded_data_len)?); // usize, 8 bytes (fixed-int config)
        result.append(&mut encoded_data); // payload, variable size
        Ok(result)
    }
    /// Size of the fixed entry header in bytes (is_deleted flag + payload length).
    pub fn header_size() -> usize {
        size_of::<bool>() + size_of::<usize>()
    }
    // TODO: Maybe introduce an EntryHeader as a separate type?
    /// Decode the fixed header, returning `(is_deleted, payload size in bytes)`.
    pub fn decode_header(header_bytes: Vec<u8>) -> Result<(bool, usize), Error> {
        let (is_deleted, offset) =
            decode::<bool>(&header_bytes)
                .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
        let (data_size, _) =
            decode::<usize>(&header_bytes[offset..])
                .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryDataSize, e))?;
        Ok((is_deleted, data_size))
    }
}
// Index of a column within an entry's data sequence.
pub type Column = u64;
// Byte offset of an entry within the store file (what seek/stream_position use).
pub type Cursor = u64;
impl StoreHeader {
fn encode(&self) -> Result<Vec<u8>, Error> {
// FORMAT: First Number of Columns, Then Deleted Count.
let mut result = encode(&self.number_of_columns)?;
result.append(&mut encode(&self.deleted_count)?);
Ok(result)
}
async fn decode(file: &mut File) -> Result<StoreHeader, Error> {
let number_of_columns_size = size_of::<NumberOfColumns>();
let deleted_count_size = size_of::<DeletedCount>();
let header_size: usize = number_of_columns_size + deleted_count_size;
let mut header_bytes: Vec<u8> = vec![0; header_size];
// TODO: Why do we need to have mutable reference for the file when we are reading it?
match file.read_exact(&mut header_bytes).await {
Ok(_) => {
let offset = 0;
let (number_of_columns, offset) =
decode::<NumberOfColumns>(&header_bytes[offset..offset + number_of_columns_size])
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?;
let (deleted_count, _) =
decode::<DeletedCount>(&header_bytes[offset..offset + deleted_count_size])
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?;
let header = StoreHeader {
number_of_columns,
deleted_count,
};
Ok(header)
},
Err(err) => {
// TODO: When err is of the kind UnexpectedEof, return InvalidStoreHeader
println!("ARE WE HERE?");
Err(Error::from(err))
}
}
}
}
impl Store {
// For debugging
pub async fn get_all_bytes(mut self) -> Result<Vec<u8>, Error>{
let mut bytes: Vec<u8> = vec![];
use std::io::Read;
// for byte in self.file.bytes() {
// }
self.file.read_exact(&mut bytes[..]).await?;
Ok(bytes)
}
pub async fn new(column_file_name: &str, number_of_columns: usize) -> Result<Self, Error> {
let mut file =
OpenOptions::new()
.write(true)
.read(true)
.create_new(true)
.open(column_file_name)
.await?;
let header = StoreHeader {
number_of_columns,
deleted_count: 0,
} ;
let encoded_header = header.encode()?;
file.write(&encoded_header).await?;
println!("is something being encoded? {:?}", encoded_header);
let store = Self {
column_file_name: column_file_name.to_string(),
file,
header,
};
Ok(store)
}
pub async fn connect(column_file_name: &str) -> Result<Self, Error> {
let mut file = OpenOptions::new().read(true).write(true).open(column_file_name).await?;
let header = StoreHeader::decode(&mut file).await?;
Ok(Self {
column_file_name: column_file_name.to_string(),
file,
header
})
}
pub async fn entry_at<T: Decode>(&mut self, cursor: Cursor) -> Result<Entry<T>, Error> {
self.file.seek(SeekFrom::Start(cursor)).await?;
// 1. read header bytes (fixed number of bytes).
// 2. decode header
// 3. read entry data bytes.
// 4. decode data
// That will tell us how much data there is.
let entry_header_size = Entry::<T>::header_size();
let mut header_bytes: Vec<u8> = vec![0; entry_header_size];
self.file.read_exact(&mut header_bytes).await?;
println!("cursor == {}", cursor);
println!("header_bytes == {:?}", header_bytes);
let (is_deleted, data_size) = Entry::<T>::decode_header(header_bytes)?;
self.file.seek(SeekFrom::Current(entry_header_size as i64)).await?;
let mut data_bytes: Vec<u8> = vec![0; data_size];
println!("(is_delted, data_size) = ({}, {})", is_deleted, data_size);
let data =
decode_sequence::<T>(self.header.number_of_columns, &mut data_bytes)
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?;
Ok(Entry {
is_deleted,
data
})
}
pub async fn append_entry<T: Encode>(&mut self, entry: &Entry<T>) -> Result<Cursor, Error> {
// On linux when opening a file in append mode, the seek is set to 0
// and only updated after a write. That's why we do the cursor gymnastic at the end.
let encoded_entry: Vec<u8> = entry.encode()?;
println!("encoded_entry == {:?}", encoded_entry);
println!("bool size == {}", size_of::<bool>());
println!("usize size == {}", size_of::<usize>());
self.file.write(&encoded_entry).await?;
let next_cursor: Cursor = self.file.stream_position().await?;
let cursor: Cursor = next_cursor - encoded_entry.len() as u64;
Ok(cursor)
}
pub async fn iterate_all<T>(&mut self) -> Result<Cursor, Error> {
// Loop through the rows and print them out
todo!()
}
}
// impl StorageEngine for ColumnStore {
// async fn append(&mut self, id: Index, entry: Row<T>) -> Result<???, Error>
// async fn get_all(&self) -> ???
// async fn get_eq(&self, column: Column, value: T) -> ???
// async fn delete_all(&mut self)
// async fn delete_eq(&mut self, column: Column, value: T) -> ???
// }
// struct Error {
// }
// Selected(
// &'a TableSchema,
// ColumnSelection,
// TODO: Don't do the Box(dyn Iterator<...>)
// you'll have a concrete implementation of Iterator, and that's what you'll use
// Box<dyn Iterator<Item = RestrictedRow> + 'a + Send>,
// ),
// #[async_trait]
// trait StorageEngine<T>
// where T: Encode + Decode
// {
// async fn append(&mut self, id: Index, entry: Row<T>) -> Result<???, Error>
// async fn get_all(&self) -> ???
// async fn get_eq(&self, column: Column, value: T) -> ???
// async fn delete_all(&mut self)
// async fn delete_eq(&mut self, column: Column, value: T) -> ???
// }
// #[cfg(test)]
// mod tests {
// #[test]
// fn hello_test() {
// assert!(true);
// }
// }
// let storage_engine = StorageEngine::new("users")
// let mut next_position = 0
// type FilePosition = usize;
// type StoreFile = Vec<Entry>;
// type IndexFile = ???
// struct IndexEntry {
// }
// #00000 [false, u26, "Arnold", "schwarzenegger", "gettothechoppa@yahoo.com"] #5120000 [true, u27, "Arnold", "Vosloo", "avosloo@aol.com"]
// #00000 [true, u27, "Arnold", "Vosloo", "avosloo@aol.com"]
// at #00000 512 kb deleted,
// ...
// [(u26, [#00000]), (u27, [#5120000])]
// [("Arnold", [#000000, #5120000]), ("Arnfsdaf", []), ("Adasdsd", []), ("Bdsad", [])]
// // basically always keep indexes in memory and on write always sync on disk
// CREATE INDEX usersname ON "users" (name);
// INSERT INTO users (id, name, surname, email) VALUES (u26, "Arnold", "schwarzenegger", "gettothechoppa@yahoo.com");
// INSERT INTO users (id, name, surname, email) VALUES (u27, "Arnold", "Vosloo", "avosloo@aol.com");
// SELECT * FROM users WHERE id=u26;
// SELECT * FROM users WHERE name="Arnold";
// SELECT * FROM cars;
// DELETE FROM users WHERE name="Arnold";