Rename cursor ~> file_position

This commit is contained in:
Yuriy Dupyn 2024-02-02 19:04:51 +01:00
parent cac34d95e0
commit 28741006e7
2 changed files with 81 additions and 46 deletions

View file

@ -20,7 +20,8 @@ type Result<T> = std::result::Result<T, std::io::Error>;
async fn create_store() -> Result<Store<Data>> { async fn create_store() -> Result<Store<Data>> {
let mut store: Store<Data> = Store::new(TABLE_PATH, 5, 0).await.map_err(|e| e.to_io_or_panic())?; let mut store: Store<Data> = Store::new(TABLE_PATH, 5, 0).await.map_err(|e| e.to_io_or_panic())?;
println!("CREATED"); println!("CREATED");
println!("{:?}", store.read_all_bytes().await?); println!("THE STORE: {:?}", store);
println!("THE BYTES: {:?}", store.read_all_bytes().await?);
let entry0: Entry<u32> = Entry::new_deleted(vec![1, 2, 3, 4, 5]); let entry0: Entry<u32> = Entry::new_deleted(vec![1, 2, 3, 4, 5]);
append_entry(&mut store, &entry0).await?; append_entry(&mut store, &entry0).await?;
@ -31,10 +32,12 @@ async fn create_store() -> Result<Store<Data>> {
println!("{:?}", store.read_all_bytes().await?); println!("{:?}", store.read_all_bytes().await?);
Ok(store) Ok(store)
} }
async fn connect_store() -> Result<Store<Data>> { async fn connect_store() -> Result<Store<Data>> {
let mut store: Store<Data> = Store::connect(TABLE_PATH).await.map_err(|e| e.to_io_or_panic())?; let mut store: Store<Data> = Store::connect(TABLE_PATH).await.map_err(|e| e.to_io_or_panic())?;
println!("CONNECTED"); println!("CONNECTED");
println!("{:?}", store.read_all_bytes().await?); println!("THE STORE: {:?}", store);
println!("THE BYTES: {:?}", store.read_all_bytes().await?);
Ok(store) Ok(store)
} }
@ -48,17 +51,17 @@ async fn create_or_connect() -> Result<Store<Data>> {
} }
async fn append_entry(store: &mut Store<Data>, entry: &Entry<Data>) -> Result<Cursor>{ async fn append_entry(store: &mut Store<Data>, entry: &Entry<Data>) -> Result<FilePosition>{
println!("APPENDING"); println!("APPENDING");
println!("entry == {:?}", entry); println!("entry == {:?}", entry);
let cursor: Cursor = store.append_entry(&entry).await.map_err(|e| e.to_io_or_panic())?; let file_position: FilePosition = store.append_entry(&entry).await.map_err(|e| e.to_io_or_panic())?;
println!("cursor == {:?}", cursor); println!("file_position == {:?}", file_position);
Ok(cursor) Ok(file_position)
} }
async fn read_entry(store: &mut Store<Data>, cursor: Cursor) -> Result<EntryDetailed<Data>>{ async fn read_entry(store: &mut Store<Data>, file_position: FilePosition) -> Result<Option<EntryDetailed<Data>>>{
println!("READING ENTRY at cursor={}", cursor); println!("READING ENTRY at file_position={}", file_position);
let entry = store.read_entry_at(cursor).await.map_err(|e| e.to_io_or_panic())?; let entry = store.read_entry_at(file_position).await.map_err(|e| e.to_io_or_panic())?;
println!("ENTRY: {:?}", entry); println!("ENTRY: {:?}", entry);
Ok(entry) Ok(entry)
} }
@ -75,10 +78,10 @@ async fn main() -> Result<()> {
// println!("{:?}", store); // println!("{:?}", store);
// println!("{:?}", store.read_all_bytes().await?); // println!("{:?}", store.read_all_bytes().await?);
// let entry0: Entry<u32> = Entry::new(vec![99, 98, 97, 96, 95]); let entry0: Entry<u32> = Entry::new(vec![99, 98, 97, 96, 95]);
// append_entry(&mut store, &entry0).await?; append_entry(&mut store, &entry0).await?;
store.read_entries(2).await.map_err(|e| e.to_io_or_panic())?; store.read_entries().await.map_err(|e| e.to_io_or_panic())?;
// let entry2: StoreEntry<u32> = StoreEntry::new_deleted(vec![3, 2, 1]); // let entry2: StoreEntry<u32> = StoreEntry::new_deleted(vec![3, 2, 1]);

View file

@ -15,7 +15,7 @@ use std::mem::size_of;
type Result<T> = std::result::Result<T, Error>; type Result<T> = std::result::Result<T, Error>;
pub type Column = u64; pub type Column = u64;
pub type Cursor = u64; pub type FilePosition = u64;
// TODO: Consider introducing a phantom type for the data that's used in the store. // TODO: Consider introducing a phantom type for the data that's used in the store.
#[derive(Debug)] #[derive(Debug)]
@ -24,6 +24,8 @@ pub struct Store<T> {
file: File, file: File,
header: StoreHeader, header: StoreHeader,
data_type: PhantomData<T>, data_type: PhantomData<T>,
eof_file_position: FilePosition,
// meta // meta
// location of rows file // location of rows file
// locations of index files // locations of index files
@ -32,6 +34,10 @@ pub struct Store<T> {
// list // list
} }
// TODO: Basically a pointer to Store + its own file position
// pub struct Cursor<'a, T> {
// }
#[derive(Debug)] #[derive(Debug)]
pub struct StoreHeader { pub struct StoreHeader {
number_of_columns: usize, number_of_columns: usize,
@ -94,33 +100,41 @@ pub struct EntryDetailed<T> {
data: Vec<T>, data: Vec<T>,
} }
pub struct EntryIterator<'a> {
file: &'a mut File,
current_file_position: FilePosition
}
//===Store=== //===Store===
pub async fn store_exists(table_folder: &str) -> Result<bool> { pub async fn store_exists(table_folder: &str) -> Result<bool> {
Ok(fs::metadata(table_folder).await.is_ok()) Ok(fs::metadata(table_folder).await.is_ok())
} }
impl <T>Store<T> { impl <T>Store<T> {
const ROWS_FILE_NAME: &'static str = "rows";
//===primitive file operations=== //===primitive file operations===
// Moves the cursor right. // Moves the file cursor right.
async fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize> { async fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize> {
Ok(self.file.write(bytes).await?) Ok(self.file.write(bytes).await?)
} }
// Moves the cursor right. // Moves the file cursor right.
async fn read_bytes(&mut self, bytes: &mut [u8]) -> Result<()> { async fn read_bytes(&mut self, bytes: &mut [u8]) -> Result<()> {
self.file.read_exact(bytes).await?; self.file.read_exact(bytes).await?;
Ok(()) Ok(())
} }
// Moves the cursor right. // Moves the file cursor right.
async fn get_bytes(&mut self, count: usize) -> Result<Vec<u8>> { async fn get_bytes(&mut self, count: usize) -> Result<Vec<u8>> {
let mut result: Vec<u8> = Vec::with_capacity(count); let mut result: Vec<u8> = Vec::with_capacity(count);
self.read_bytes(&mut result).await?; self.read_bytes(&mut result).await?;
Ok(result) Ok(result)
} }
async fn seek_to(&mut self, cursor: Cursor) -> Result<()>{ async fn seek_to(&mut self, file_position: FilePosition) -> Result<()>{
self.file.seek(SeekFrom::Start(cursor)).await?; self.file.seek(SeekFrom::Start(file_position)).await?;
Ok(()) Ok(())
} }
@ -138,13 +152,13 @@ impl <T>Store<T> {
self.seek_to(StoreHeader::SIZE as u64).await self.seek_to(StoreHeader::SIZE as u64).await
} }
async fn current_cursor(&mut self) -> Result<Cursor> { async fn current_file_position(&mut self) -> Result<FilePosition> {
let next_cursor: Cursor = self.file.stream_position().await?; let next_file_position: FilePosition = self.file.stream_position().await?;
Ok(next_cursor) Ok(next_file_position)
} }
// For debugging. // For debugging.
// Moves cursor to the end. // Moves file cursor to the end.
pub async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error>{ pub async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error>{
let mut bytes: Vec<u8> = vec![]; let mut bytes: Vec<u8> = vec![];
self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?; self.seek_to_start().await.map_err(|e| e.to_io_or_panic())?;
@ -152,8 +166,6 @@ impl <T>Store<T> {
Ok(bytes) Ok(bytes)
} }
const ROWS_FILE_NAME: &'static str = "rows";
// ===Creation=== // ===Creation===
pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result<Self> { pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result<Self> {
let path_to_table = Path::new(table_folder); let path_to_table = Path::new(table_folder);
@ -177,15 +189,15 @@ impl <T>Store<T> {
}; };
let encoded_header: Vec<u8> = header.encode()?; let encoded_header: Vec<u8> = header.encode()?;
println!("ENCODED_HEADER: {:?}", encoded_header);
let mut store = Self { let mut store = Self {
table_folder: table_folder.to_string(), table_folder: table_folder.to_string(),
file, file,
header, header,
data_type: PhantomData::<T>, data_type: PhantomData::<T>,
eof_file_position: 0,
}; };
store.write_bytes(&encoded_header).await?; store.write_bytes(&encoded_header).await?;
store.eof_file_position = store.current_file_position().await?;
Ok(store) Ok(store)
} }
@ -208,13 +220,15 @@ impl <T>Store<T> {
file.read_exact(&mut header_bytes).await?; file.read_exact(&mut header_bytes).await?;
let header = StoreHeader::decode(&mut header_bytes).await?; let header = StoreHeader::decode(&mut header_bytes).await?;
let eof_file_position = file.seek(SeekFrom::End(0)).await?;
let store = Self { let store = Self {
table_folder: table_folder.to_string(), table_folder: table_folder.to_string(),
file, file,
header, header,
data_type: PhantomData::<T>, data_type: PhantomData::<T>,
eof_file_position
}; };
println!("just connected TOOOOO {:?}", store);
Ok(store) Ok(store)
} }
@ -236,24 +250,27 @@ impl <T>Store<T> {
} }
// Moves cursor to the end. // Moves cursor to the end.
pub async fn append_entry(&mut self, entry: &Entry<T>) -> Result<Cursor> pub async fn append_entry(&mut self, entry: &Entry<T>) -> Result<FilePosition>
where T: Encode where T: Encode
{ {
self.increment_total_count().await?; self.increment_total_count().await?;
let encoded_entry: Vec<u8> = entry.encode()?; let encoded_entry: Vec<u8> = entry.encode()?;
self.seek_to_end().await?; self.seek_to_end().await?;
let cursor: Cursor = self.current_cursor().await?; let file_position: FilePosition = self.current_file_position().await?;
self.write_bytes(&encoded_entry).await?; self.write_bytes(&encoded_entry).await?;
Ok(cursor) let eof_file_position: FilePosition = self.current_file_position().await?;
self.eof_file_position = eof_file_position;
Ok(file_position)
} }
// ===Deletion=== // ===Deletion===
pub async fn mark_deleted_at(&mut self, cursor: Cursor) -> Result<()> { pub async fn mark_deleted_at(&mut self, file_position: FilePosition) -> Result<()> {
self.increment_deleted_count().await?; self.increment_deleted_count().await?;
self.seek_to(cursor).await?; self.seek_to(file_position).await?;
// TODO: Now you need to mutate the entry itself // TODO: Now you need to mutate the entry itself
todo!() todo!()
@ -263,9 +280,8 @@ impl <T>Store<T> {
// ===Lookup=== // ===Lookup===
// WARNING: The cursor has to be at the start of an entry. Otherwise garbage data will be // WARNING: The cursor has to be at the start of an entry. Otherwise garbage data will be
// decoded as an entry. // decoded as an entry.
pub async fn read_entry_header_at(&mut self, cursor: Cursor) -> Result<EntryHeaderWithDataSize> { pub async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result<EntryHeaderWithDataSize> {
self.seek_to(cursor).await?; self.seek_to(file_position).await?;
self.file.seek(SeekFrom::Start(cursor)).await?;
let number_of_columns: usize = self.header.number_of_columns; let number_of_columns: usize = self.header.number_of_columns;
let mut header_bytes: Vec<u8> = vec![0; EntryHeaderWithDataSize::size(number_of_columns)]; let mut header_bytes: Vec<u8> = vec![0; EntryHeaderWithDataSize::size(number_of_columns)];
@ -278,10 +294,15 @@ impl <T>Store<T> {
Ok(header) Ok(header)
} }
pub async fn read_entry_at(&mut self, cursor: Cursor) -> Result<EntryDetailed<T>> // Returns None when file_position == eof_file_position
pub async fn read_entry_at(&mut self, file_position: FilePosition) -> Result<Option<EntryDetailed<T>>>
where T: Decode where T: Decode
{ {
let header = self.read_entry_header_at(cursor).await?; if file_position == self.eof_file_position {
return Ok(None)
}
let header = self.read_entry_header_at(file_position).await?;
let mut data_bytes: Vec<u8> = vec![0; header.size_of_data()]; let mut data_bytes: Vec<u8> = vec![0; header.size_of_data()];
// TODO: Get rid of the println's // TODO: Get rid of the println's
@ -291,20 +312,31 @@ impl <T>Store<T> {
let entry: EntryDetailed<T> = let entry: EntryDetailed<T> =
EntryDetailed::decode(header, self.header.number_of_columns, &mut data_bytes)?; EntryDetailed::decode(header, self.header.number_of_columns, &mut data_bytes)?;
Ok(entry) Ok(Some(entry))
} }
pub async fn read_entries(&mut self, n: usize) -> Result<()> // TODO: This needs to be some sort of an iterator
// pub async fn entries() -> EntryIterator<T> {
// todo!()
// }
pub async fn read_entries(&mut self) -> Result<()>
where T: Decode + std::fmt::Debug where T: Decode + std::fmt::Debug
{ {
self.seek_to_start_of_data().await?; self.seek_to_start_of_data().await?;
let mut cursor: Cursor = self.current_cursor().await?; let mut file_position: FilePosition = self.current_file_position().await?;
for i in 0..n { loop {
let entry = self.read_entry_at(cursor).await?; match self.read_entry_at(file_position).await? {
println!("({}, {:?})", i, entry); Some(entry) => {
cursor = self.current_cursor().await?; println!("{:?}", entry);
file_position = self.current_file_position().await?;
},
None => {
println!("END of entries.");
return Ok(())
}
}
} }
Ok(())
} }
pub async fn search_for_entry_with_id(&mut self, id: T) -> Result<Option<EntryDetailed<T>>> { pub async fn search_for_entry_with_id(&mut self, id: T) -> Result<Option<EntryDetailed<T>>> {
@ -313,7 +345,7 @@ impl <T>Store<T> {
} }
// TODO: This needs to be some sort of an iterator // TODO: This needs to be some sort of an iterator
pub async fn get_eq(&self, column: Column, value: T) -> Result<Option<EntryDetailed<T>>> { pub async fn get_all_eq(&self, column: Column, value: T) -> Result<Option<EntryDetailed<T>>> {
todo!() todo!()
} }