Print first n entries

This commit is contained in:
Yuriy Dupyn 2024-02-02 13:56:37 +01:00
parent eb034592fa
commit cad4ba8215
5 changed files with 481 additions and 316 deletions

View file

@@ -1,101 +1,29 @@
use tokio::io::{BufReader, BufWriter, AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom};
use tokio::sync::{Mutex, RwLock};
use tokio::fs::{File, OpenOptions};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom};
use tokio::fs::{File, OpenOptions, DirBuilder};
use std::path::Path;
use std::marker::PhantomData;
use bincode;
use bincode::de::Decoder;
use bincode::enc::write::Writer;
use bincode::enc::Encoder;
use bincode::{BorrowDecode, Decode, Encode};
use bincode::config::{BigEndian, Configuration, Fixint};
use bincode::{Decode, Encode};
use crate::binary_coding::{encode, decode, encode_sequence, decode_sequence};
use tokio::fs;
use crate::error::{Error, DecodeErrorKind};
use std::mem::size_of;
// Module-local Result alias; the error is this module's `Error` enum.
type Result<T> = std::result::Result<T, Error>;
// =============Byte encoding/decoding============
// Big-endian with fixed-width integers, so every encoded usize has a predictable size.
const BIN_CONFIG: Configuration<BigEndian, Fixint> = bincode::config::standard().with_big_endian().with_fixed_int_encoding();
// A Cursor is a byte offset into the rows file (used with SeekFrom::Start below).
pub type Column = u64;
pub type Cursor = u64;
// NOTE(review): diff interleave — these helpers use the two-parameter std
// Result, which conflicts with the single-parameter `Result` alias above;
// only one side of the diff would compile.
// Encodes one value with the fixed BIN_CONFIG.
fn encode<T: Encode>(t: &T) -> Result<Vec<u8>, bincode::error::EncodeError> {
bincode::encode_to_vec(t, BIN_CONFIG)
}
// Decodes one value from the front of `bytes`; also returns the bytes consumed.
fn decode<T: Decode>(bytes: &[u8]) -> Result<(T, usize), bincode::error::DecodeError> {
bincode::decode_from_slice(bytes, BIN_CONFIG)
}
// Encodes a slice as: [element count (usize)] followed by each element in order.
fn encode_vector<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> {
    let mut buffer = encode(&ts.len())?;
    for item in ts {
        buffer.append(&mut encode(item)?);
    }
    Ok(buffer)
}
// Decodes a vector encoded by `encode_vector`: a usize count followed by
// that many elements back-to-back.
fn decode_vector<T: Decode>(bytes: &[u8]) -> Result<Vec<T>, bincode::error::DecodeError> {
    // Use the byte count reported by `decode` for the length prefix instead
    // of hard-coding `size_of::<usize>()` — correct even if BIN_CONFIG ever
    // switches away from fixed-int encoding.
    let (result_len, mut offset) = decode::<usize>(bytes)?;
    let mut result: Vec<T> = Vec::with_capacity(result_len);
    for _ in 0..result_len {
        let (x, bytes_consumed) = decode::<T>(&bytes[offset..])?;
        offset += bytes_consumed;
        result.push(x);
    }
    Ok(result)
}
// We don't care about encoding the length here (since it will be used for a row with known column
// size)
fn encode_sequence<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> {
    let mut buffer: Vec<u8> = Vec::new();
    for item in ts {
        buffer.extend(encode(item)?);
    }
    Ok(buffer)
}
// Decodes exactly `len` back-to-back values from `bytes` (no length prefix —
// the caller supplies the count, e.g. the store's number of columns).
fn decode_sequence<T: Decode>(len: usize, bytes: &[u8]) -> Result<Vec<T>, bincode::error::DecodeError> {
    let mut values: Vec<T> = Vec::with_capacity(len);
    let mut consumed = 0;
    while values.len() < len {
        let (value, size) = decode::<T>(&bytes[consumed..])?;
        consumed += size;
        values.push(value);
    }
    Ok(values)
}
// Demonstrates round-tripping a vector through encode_vector/decode_vector.
// The previous chain of shadowed `xs` bindings was dead code — only the last
// binding ever survived — so the unused ones are removed.
fn example_encoding_decoding() {
    let xs: Vec<String> = vec!["foo".to_string(), "bar".to_string()];
    println!("original {:?}", xs);
    let exs = encode_vector(&xs[..]).unwrap();
    println!("encoded {:?}", exs);
    // WARNING: Don't forget to specify the type here
    // let dxs = decode_vector::<u32>(&exs[..]).unwrap();
    let dxs = decode_vector::<String>(&exs[..]).unwrap();
    println!("decoded {:?}", dxs);
}
// ============Column Store===============
// ColumnStore
// TODO: Consider introducing a phantom type for the data that's used in the store.
// NOTE(review): diff interleave — the removed `Store` (file-per-column, old)
// and the added `Store<T>` (folder-per-table, new) overlap here; neither
// struct is complete as printed.
#[derive(Debug)]
pub struct Store {
column_file_name: String,
// TODO: This should be private
pub file: File,
header: StoreHeader
pub struct Store<T> {
table_folder: String,
file: File,
header: StoreHeader,
// Marks the element type handled by this store without storing a T.
data_type: PhantomData<T>,
// meta
// location of rows file
// locations of index files
@ -104,246 +32,322 @@ pub struct Store {
// list
}
// These type aliases are here because they make writing decoders easier.
type NumberOfColumns = usize;
type DeletedCount = usize;
// Fixed-size header written at the start of the rows file.
#[derive(Debug)]
pub struct StoreHeader {
// NOTE(review): diff interleave — the aliased fields (old) and the plain
// usize fields (new) are both shown; only one pair exists per version.
number_of_columns: NumberOfColumns,
deleted_count: DeletedCount,
number_of_columns: usize,
deleted_count: usize,
}
impl StoreHeader {
// On-disk sizes of the two fields under the fixed-int encoding.
const NUMBER_OF_COLUMNS_SIZE: usize = size_of::<usize>();
const DELETED_COUNT_SIZE: usize = size_of::<usize>();
const SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE;
}
// Per-entry header as written to disk: currently just the tombstone flag.
#[derive(Debug)]
pub struct EntryHeader {
is_deleted: bool,
}
impl EntryHeader {
// On-disk size of the flag (one byte).
const IS_DELETED_SIZE: usize = size_of::<bool>();
const HEADER_SIZE: usize = Self::IS_DELETED_SIZE;
}
// Entry header plus the byte length of the data that follows it; this is the
// fixed-size prefix actually read back from disk (see read_entry_header_at).
#[derive(Debug)]
pub struct EntryHeaderWithDataSize {
is_deleted: bool,
data_size: usize, // in bytes
}
impl EntryHeaderWithDataSize {
const IS_DELETED_SIZE: usize = size_of::<bool>();
const DATA_SIZE_SIZE: usize = size_of::<usize>();
// Total on-disk size of this header.
const SIZE: usize = Self::IS_DELETED_SIZE + Self::DATA_SIZE_SIZE;
}
// NOTE(review): diff interleave — the removed `Error` enum (moved to
// crate::error in the new version) is spliced into the middle of the `Entry`
// and `EntryDetailed` definitions; braces do not balance as printed.
#[derive(Debug)]
pub struct Entry<T> {
is_deleted: bool,
// file_position: FilePosition,
header: EntryHeader,
data: Vec<T>,
}
#[derive(Debug)]
pub enum Error {
DecodeError(DecodeErrorKind, bincode::error::DecodeError),
EncodeError(bincode::error::EncodeError),
IoError(std::io::Error),
InvalidStoreHeader,
// An entry as read back from disk, keeping the decoded on-disk header.
pub struct EntryDetailed<T> {
header: EntryHeaderWithDataSize,
data: Vec<T>,
}
#[derive(Debug)]
pub enum DecodeErrorKind {
StoreHeaderNumberOfColumns,
StoreHeaderDeletedCount,
EntryData,
EntryIsDeleted,
EntryDataSize
//===Store===
// Reports whether the table folder already exists on disk.
pub async fn store_exists(table_folder: &str) -> Result<bool> {
    let metadata = fs::metadata(table_folder).await;
    Ok(metadata.is_ok())
}
// ===Errors===
impl Error {
    /// Extracts the wrapped `std::io::Error`, panicking (with the error's
    /// debug representation) on any other variant.
    pub fn to_io_or_panic(self) -> std::io::Error {
        use Error::*;
        match self {
            IoError(err) => err,
            // panic! carries the message itself instead of println! followed
            // by a bare panic!(), so the reason survives in the panic payload.
            err => panic!("expected Error::IoError, got {:?}", err),
        }
    }
}
// Lets `?` lift bincode encode failures into this module's Error.
impl From<bincode::error::EncodeError> for Error {
fn from(err: bincode::error::EncodeError) -> Self {
Self::EncodeError(err)
}
}
// Lets `?` lift std I/O failures into this module's Error.
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
Self::IoError(err)
}
}
// ====Entry====
// NOTE(review): diff interleave — the removed `impl <T>Entry<T>` (old) and
// the added `impl <T>Store<T>` (new) overlap here; `Entry::new` is truncated.
impl <T>Entry<T> {
pub fn new(data: Vec<T>) -> Self {
Self { is_deleted: false, data }
impl <T>Store<T> {
//===primitive file operations===
// Moves the cursor right.
// Writes all of `bytes` at the current cursor and returns how many were written.
async fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize> {
    // `write` may write only a prefix of the buffer; `write_all` retries
    // until the whole buffer is written, so headers/entries are never
    // silently truncated on disk.
    self.file.write_all(bytes).await?;
    Ok(bytes.len())
}
pub fn new_deleted(data: Vec<T>) -> Self {
Self { is_deleted: true, data }
// NOTE(review): the two lines above are the removed-side tail of the old
// `Entry::new_deleted`; `read_bytes` below belongs to the new Store impl.
// Moves the cursor right.
// Fills `bytes` completely via read_exact, or fails — partial reads error out.
async fn read_bytes(&mut self, bytes: &mut [u8]) -> Result<()> {
self.file.read_exact(bytes).await?;
Ok(())
}
// NOTE(review): removed-side fragment of the old `Entry::encode`; the diff
// cuts it off mid-body, so its braces do not close here.
// FORMAT: [HEADER, ..sequence of data]
// HEADER: [Boolean (one byte), number of bytes in the data (not including the boolean)]
fn encode(self: &Entry<T>) -> Result<Vec<u8>, Error>
where T: Encode
{
let mut result: Vec<u8> = encode(&self.is_deleted)?; // bool 1 byte
let mut encoded_data = encode_sequence(&self.data[..])?;
let encoded_data_len = encoded_data.len();
result.append(&mut encode(&encoded_data_len)?); // usize 8 bytes
println!("enc data len == {}", encoded_data_len);
println!("encoded_data == {:?} ", encoded_data);
result.append(&mut encoded_data); // data variable size
// Moves the cursor right.
// Reads exactly `count` bytes from the current cursor position.
async fn get_bytes(&mut self, count: usize) -> Result<Vec<u8>> {
    // BUG FIX: `Vec::with_capacity(count)` produces a vector of LENGTH 0, so
    // `read_exact(&mut result)` would read zero bytes and return an empty
    // vec. The buffer must actually hold `count` elements to be filled.
    let mut result: Vec<u8> = vec![0; count];
    self.read_bytes(&mut result).await?;
    Ok(result)
}
// NOTE(review): `header_size`/`decode_header` pieces here are removed-side
// (old Entry API); the new Store helper `seek_to` is spliced between them.
// in bytes
pub fn header_size() -> usize {
size_of::<bool>() + size_of::<usize>()
// Absolute seek to `cursor` (byte offset from the start of the rows file).
async fn seek_to(&mut self, cursor: Cursor) -> Result<()>{
self.file.seek(SeekFrom::Start(cursor)).await?;
Ok(())
}
// TODO: Maybe introduce an EntryHeader as a separate type?
pub fn decode_header(header_bytes: Vec<u8>) -> Result<(bool, usize), Error> {
let (is_deleted, offset) =
decode::<bool>(&header_bytes)
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
async fn seek_to_start(&mut self) -> Result<()> {
self.file.seek(SeekFrom::Start(0)).await?;
Ok(())
}
// NOTE(review): the first four lines are the removed-side tail of the old
// `decode_header`; `seek_to_end` belongs to the new Store impl.
let (data_size, _) =
decode::<usize>(&header_bytes[offset..])
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryDataSize, e))?;
Ok((is_deleted, data_size))
// Absolute seek to the end of the rows file.
async fn seek_to_end(&mut self) -> Result<()> {
self.file.seek(SeekFrom::End(0)).await?;
Ok(())
}
// Positions the cursor on the first entry, immediately past the store header.
async fn seek_to_start_of_data(&mut self) -> Result<()> {
    let first_entry = StoreHeader::SIZE as u64;
    self.seek_to(first_entry).await
}
// Reports the cursor's current byte offset within the rows file.
async fn current_cursor(&mut self) -> Result<Cursor> {
    let position: Cursor = self.file.stream_position().await?;
    Ok(position)
}
// For debugging.
// Moves cursor to the end.
// Rewinds to byte 0 and reads the entire rows file into memory; non-I/O
// errors from seeking panic via to_io_or_panic.
pub async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error>{
let mut bytes: Vec<u8> = vec![];
self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?;
self.file.read_to_end(&mut bytes).await?;
Ok(bytes)
}
// Name of the file inside the table folder that holds row data.
const ROWS_FILE_NAME: &'static str = "rows";
// ===Creation===
// Creates a brand-new store: makes the table folder, creates the rows file
// (both fail if they already exist — DirBuilder::create / create_new(true)),
// and writes the initial StoreHeader as the file's first bytes.
pub async fn new(table_folder: &str, number_of_columns: usize) -> Result<Self> {
let path_to_table = Path::new(table_folder);
let path_to_rows = path_to_table.join(Self::ROWS_FILE_NAME);
DirBuilder::new()
.create(path_to_table).await?;
let file: File =
OpenOptions::new()
.write(true)
.read(true)
.create_new(true)
.open(path_to_rows)
.await?;
let header = StoreHeader {
number_of_columns,
deleted_count: 0,
};
let encoded_header: Vec<u8> = header.encode()?;
let mut store = Self {
table_folder: table_folder.to_string(),
file,
header,
data_type: PhantomData::<T>,
};
// Persist the header; the cursor is at 0 on a freshly created file.
store.write_bytes(&encoded_header).await?;
Ok(store)
}
// Opens an existing store's rows file and decodes its StoreHeader.
pub async fn connect(table_folder: &str) -> Result<Self> {
let path_to_table = Path::new(table_folder);
let path_to_rows = path_to_table.join(Self::ROWS_FILE_NAME);
let mut file: File =
OpenOptions::new()
.read(true)
.write(true)
.open(path_to_rows)
.await?;
// Unfortunately we can't yet use store.read_bytes, since it can't be created without the
// header.
let mut header_bytes = StoreHeader::decode_buffer();
file.read_exact(&mut header_bytes).await?;
let header = StoreHeader::decode(&mut header_bytes).await?;
Ok(Self {
table_folder: table_folder.to_string(),
file,
header,
data_type: PhantomData::<T>,
})
}
// ===Append Entry===
// Moves cursor to the end.
// Appends `entry` at the end of the rows file and returns the byte offset
// where it was written.
pub async fn append_entry(&mut self, entry: &Entry<T>) -> Result<Cursor>
where T: Encode
{
    let bytes: Vec<u8> = entry.encode()?;
    self.seek_to_end().await?;
    let entry_start: Cursor = self.current_cursor().await?;
    self.write_bytes(&bytes).await?;
    Ok(entry_start)
}
// ===Lookup===
// WARNING: The cursor has to be at the start of an entry. Otherwise garbage data will be
// decoded as an entry.
// Reads and decodes the fixed-size entry header at `cursor`, leaving the
// file cursor on the first byte of the entry's data.
pub async fn read_entry_header_at(&mut self, cursor: Cursor) -> Result<EntryHeaderWithDataSize> {
    // Seek exactly once — the previous version also called
    // `self.file.seek(SeekFrom::Start(cursor))` immediately after
    // `seek_to(cursor)`, performing the identical seek twice.
    self.seek_to(cursor).await?;
    let mut header_bytes: Vec<u8> = vec![0; EntryHeaderWithDataSize::SIZE];
    self.read_bytes(&mut header_bytes).await?;
    let header = EntryHeaderWithDataSize::decode(&mut header_bytes[..])?;
    Ok(header)
}
pub async fn read_entry_data(&mut self, header: &EntryHeaderWithDataSize) -> Result<Vec<T>> {
let mut data_bytes: Vec<u8> = vec![0; header.data_size];
// TODO: Get rid of the println's
// println!("HEADER_BYTES: {:?}", header_bytes);
// println!("PREPARED_DATA_BYTES: {:?}", data_bytes);
self.read_bytes(&mut data_bytes).await?;
todo!()
}
// Reads and decodes the complete entry (header + data) starting at `cursor`.
pub async fn read_entry_at(&mut self, cursor: Cursor) -> Result<EntryDetailed<T>>
where T: Decode
{
let header = self.read_entry_header_at(cursor).await?;
let mut data_bytes: Vec<u8> = vec![0; header.data_size];
// TODO: Get rid of the println's
// println!("PREPARED_DATA_BYTES: {:?}", data_bytes);
self.read_bytes(&mut data_bytes).await?;
// println!("DATA_BYTES: {:?}", data_bytes);
let entry: EntryDetailed<T> =
EntryDetailed::decode(header, self.header.number_of_columns, &mut data_bytes)?;
Ok(entry)
}
// Debug aid: prints the first `n` entries to stdout, one `(index, entry)`
// pair per line.
// NOTE(review): if `n` exceeds the number of stored entries, the underlying
// read_exact presumably fails with UnexpectedEof — confirm intended behavior.
pub async fn read_entries(&mut self, n: usize) -> Result<()>
where T: Decode + std::fmt::Debug
{
self.seek_to_start_of_data().await?;
let mut cursor: Cursor = self.current_cursor().await?;
for i in 0..n {
let entry = self.read_entry_at(cursor).await?;
println!("({}, {:?})", i, entry);
// read_entry_at leaves the cursor at the next entry's first byte.
cursor = self.current_cursor().await?;
}
Ok(())
}
}
// NOTE(review): diff interleave — these aliases duplicate the ones already
// near the top of the file (removed-side placement).
pub type Column = u64;
pub type Cursor = u64;
// ===Store Header===
impl StoreHeader {
// NOTE(review): two `encode` signatures back-to-back: the first (two-param
// Result) is the removed side, the second uses the new Result alias.
fn encode(&self) -> Result<Vec<u8>, Error> {
fn encode(&self) -> Result<Vec<u8>> {
// FORMAT: First Number of Columns, Then Deleted Count.
let mut result = encode(&self.number_of_columns)?;
result.append(&mut encode(&self.deleted_count)?);
Ok(result)
}
// NOTE(review): removed-side decoder that read straight from the File; the
// new version decodes from a caller-supplied buffer instead.
async fn decode(file: &mut File) -> Result<StoreHeader, Error> {
let number_of_columns_size = size_of::<NumberOfColumns>();
let deleted_count_size = size_of::<DeletedCount>();
let header_size: usize = number_of_columns_size + deleted_count_size;
let mut header_bytes: Vec<u8> = vec![0; header_size];
// TODO: Why do we need to have mutable reference for the file when we are reading it?
match file.read_exact(&mut header_bytes).await {
Ok(_) => {
let offset = 0;
let (number_of_columns, offset) =
decode::<NumberOfColumns>(&header_bytes[offset..offset + number_of_columns_size])
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?;
let (deleted_count, _) =
decode::<DeletedCount>(&header_bytes[offset..offset + deleted_count_size])
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?;
let header = StoreHeader {
number_of_columns,
deleted_count,
};
Ok(header)
},
Err(err) => {
// TODO: When err is of the kind UnexpectedEof, return InvalidStoreHeader
println!("ARE WE HERE?");
Err(Error::from(err))
}
}
}
}
// NOTE(review): diff interleave — this whole region is the removed-side
// `impl Store` (file-per-column API) with pieces of the NEW
// `StoreHeader::decode_buffer`/`StoreHeader::decode` spliced into it; braces
// and bodies do not line up as printed.
impl Store {
// For debugging
pub async fn get_all_bytes(mut self) -> Result<Vec<u8>, Error>{
let mut bytes: Vec<u8> = vec![];
use std::io::Read;
// for byte in self.file.bytes() {
// }
self.file.read_exact(&mut bytes[..]).await?;
Ok(bytes)
// NOTE(review): added-side helper — a zeroed, correctly sized buffer for
// reading the store header.
fn decode_buffer() -> [u8; StoreHeader::SIZE] {
[0; StoreHeader::SIZE]
}
pub async fn new(column_file_name: &str, number_of_columns: usize) -> Result<Self, Error> {
let mut file =
OpenOptions::new()
.write(true)
.read(true)
.create_new(true)
.open(column_file_name)
.await?;
// NOTE(review): added-side StoreHeader::decode (buffer-based) interleaved
// into the middle of the old Store::new.
async fn decode(result: &mut [u8]) -> Result<StoreHeader> {
let offset = 0;
let (number_of_columns, offset) =
decode::<usize>(&result[offset..offset + Self::NUMBER_OF_COLUMNS_SIZE])
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?;
let (deleted_count, _) =
decode::<usize>(&result[offset..offset + Self::DELETED_COUNT_SIZE])
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?;
let header = StoreHeader {
number_of_columns,
deleted_count: 0,
} ;
let encoded_header = header.encode()?;
file.write(&encoded_header).await?;
println!("is something being encoded? {:?}", encoded_header);
let store = Self {
column_file_name: column_file_name.to_string(),
file,
header,
deleted_count,
};
Ok(store)
}
pub async fn connect(column_file_name: &str) -> Result<Self, Error> {
let mut file = OpenOptions::new().read(true).write(true).open(column_file_name).await?;
let header = StoreHeader::decode(&mut file).await?;
Ok(Self {
column_file_name: column_file_name.to_string(),
file,
header
})
}
pub async fn entry_at<T: Decode>(&mut self, cursor: Cursor) -> Result<Entry<T>, Error> {
self.file.seek(SeekFrom::Start(cursor)).await?;
// 1. read header bytes (fixed number of bytes).
// 2. decode header
// 3. read entry data bytes.
// 4. decode data
// That will tell us how much data there is.
let entry_header_size = Entry::<T>::header_size();
let mut header_bytes: Vec<u8> = vec![0; entry_header_size];
self.file.read_exact(&mut header_bytes).await?;
println!("cursor == {}", cursor);
println!("header_bytes == {:?}", header_bytes);
let (is_deleted, data_size) = Entry::<T>::decode_header(header_bytes)?;
self.file.seek(SeekFrom::Current(entry_header_size as i64)).await?;
let mut data_bytes: Vec<u8> = vec![0; data_size];
println!("(is_delted, data_size) = ({}, {})", is_deleted, data_size);
let data =
decode_sequence::<T>(self.header.number_of_columns, &mut data_bytes)
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?;
Ok(Entry {
is_deleted,
data
})
}
pub async fn append_entry<T: Encode>(&mut self, entry: &Entry<T>) -> Result<Cursor, Error> {
// On linux when opening a file in append mode, the seek is set to 0
// and only updated after a write. That's why we do the cursor gymnastic at the end.
let encoded_entry: Vec<u8> = entry.encode()?;
println!("encoded_entry == {:?}", encoded_entry);
println!("bool size == {}", size_of::<bool>());
println!("usize size == {}", size_of::<usize>());
self.file.write(&encoded_entry).await?;
let next_cursor: Cursor = self.file.stream_position().await?;
let cursor: Cursor = next_cursor - encoded_entry.len() as u64;
Ok(cursor)
}
pub async fn iterate_all<T>(&mut self) -> Result<Cursor, Error> {
// Loop through the rows and print them out
todo!()
// NOTE(review): `Ok(header)` is the added-side tail of the new
// StoreHeader::decode, spliced after the removed iterate_all body.
Ok(header)
}
}
// ====Entry====
impl EntryHeader {
    /// Serializes the header: just the one-byte `is_deleted` flag.
    fn encode(&self) -> Result<Vec<u8>> {
        let encoded: Vec<u8> = encode(&self.is_deleted)?;
        Ok(encoded)
    }
}
impl EntryHeaderWithDataSize {
    /// Parses `[is_deleted (1 byte), data_size (usize)]` from the front of `bytes`.
    fn decode(bytes: &mut [u8]) -> Result<Self> {
        let (is_deleted, consumed) = decode::<bool>(&bytes[..])
            .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
        let (data_size, _) = decode::<usize>(&bytes[consumed..])
            .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryDataSize, e))?;
        Ok(Self { is_deleted, data_size })
    }
}
impl <T>Entry<T> {
    /// Builds a live (non-deleted) entry around `data`.
    pub fn new(data: Vec<T>) -> Self {
        let header = EntryHeader { is_deleted: false };
        Self { header, data }
    }
    /// Builds an entry that is already marked as deleted.
    pub fn new_deleted(data: Vec<T>) -> Self {
        let header = EntryHeader { is_deleted: true };
        Self { header, data }
    }
    // FORMAT: [HEADER, ..sequence of data]
    // HEADER: [Boolean (one byte), number of bytes in the data (not including the boolean)]
    fn encode(&self) -> Result<Vec<u8>>
    where T: Encode
    {
        let mut bytes = self.header.encode()?;
        let mut payload = encode_sequence(&self.data[..])?;
        let payload_len: usize = payload.len();
        bytes.append(&mut encode(&payload_len)?); // usize, fixed 8 bytes
        bytes.append(&mut payload); // variable-size data
        Ok(bytes)
    }
}
impl <T>EntryDetailed<T> {
fn decode(header: EntryHeaderWithDataSize, number_of_columns: usize, bytes: &[u8]) -> Result<Self>
where T: Decode
{
let data = decode_sequence::<T>(number_of_columns, bytes)
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?;
Ok(EntryDetailed { header, data })
}
}
// impl StorageEngine for ColumnStore {
// async fn append(&mut self, id: Index, entry: Row<T>) -> Result<???, Error>