From 6db62c42d76b497146f678f677492b8de864f86f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jind=C5=99ich=20Moravec?= Date: Sun, 4 Feb 2024 20:20:20 +0100 Subject: [PATCH] feat: index append encoding --- storage_engine/src/index.rs | 153 ++++++++++++++++++------------------ 1 file changed, 75 insertions(+), 78 deletions(-) diff --git a/storage_engine/src/index.rs b/storage_engine/src/index.rs index 0cc01de..3cb84d2 100644 --- a/storage_engine/src/index.rs +++ b/storage_engine/src/index.rs @@ -1,33 +1,22 @@ -use std::marker::PhantomData; use std::path::PathBuf; use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter}; -use async_trait::async_trait; use std::collections::{BTreeMap, HashSet}; use std::hash::Hash; -use crate::binary_coding::{decode, decode_sequence, encode, encode_sequence}; +use crate::binary_coding::{decode, encode}; use bincode; use bincode::{Decode, Encode}; -use tokio::fs; use crate::error::{DecodeErrorKind, Error}; -use bincode::error::DecodeError; -use std::mem::size_of; - type Result = std::result::Result; -// Implements a persistant self-balancing Binary Search Tree. Nope. -// We need fixed-size nodes. But we want to index Strings which are variable length. - #[derive(Debug)] pub struct Index { file: File, data: BTreeMap>, - key_type: PhantomData, - value_type: PhantomData, } #[derive(Debug)] @@ -48,61 +37,102 @@ where let data = BTreeMap::new(); - Ok(Index { - file, - data, - key_type: PhantomData::, - value_type: PhantomData::, - }) + Ok(Index { file, data }) } pub async fn connect(file_name: PathBuf) -> Result> { - let mut file: File = OpenOptions::new() + let file: File = OpenOptions::new() .read(true) .write(true) - .create(true) .open(file_name) .await?; - let mut bytes = vec![]; - file.read_to_end(&mut bytes).await?; - - let data = Index::decode_tree(&bytes) - .map_err(|e| Error::DecodeError(DecodeErrorKind::CorruptedData, e))?; - - Ok(Index { + let mut index = Index { file, - data, - key_type: PhantomData::, - value_type: PhantomData::, - }) + data: BTreeMap::new(), + }; + + index.load_from_file().await?; + Ok(index) } pub async fn insert(&mut self, k: K, v: V) -> Result<()> { + self.append_to_file(&k, &v).await?; self.data.entry(k).or_insert_with(HashSet::new).insert(v); Ok(()) } + pub fn insert_desynced(&mut self, k: K, v: V) -> () { + self.data.entry(k).or_insert_with(HashSet::new).insert(v); + } + pub async fn lookup(&self, k: &K) -> Result>> { - let hashset = self.data.get(k).unwrap(); - Ok(Some(hashset.clone())) + let hashset = self.data.get(k).cloned(); + Ok(hashset) } - pub async fn delete(&mut self, k: K, v: V) -> Result> { - Ok(Some( - self.data.entry(k).or_insert_with(HashSet::new).remove(&v), - )) + pub async fn delete(&mut self, k: K, v: V) -> Result<()> { + self.data.entry(k).and_modify(|values| { + values.remove(&v); + }); + self.dump_to_file().await } - fn encode(&self) -> Result> { + pub async fn sync_to_disk(&mut self) -> Result<()> { + self.dump_to_file().await + } + + async fn append_to_file(&mut self, key: &K, value: &V) -> Result<()> { let mut encoded = Vec::new(); - encoded.extend(encode(&self.data)?); - Ok(encoded) + encoded.extend(encode(key)?); + encoded.extend(encode(value)?); + + self.file.seek(std::io::SeekFrom::End(0)).await?; + self.file.write(&encoded).await?; + + Ok(()) } - fn decode_tree(data: &[u8]) -> std::result::Result>, DecodeError> { - let data: BTreeMap> = decode(data)?.0; - Ok(data) + async fn dump_to_file(&mut self) -> Result<()> { + let mut writer = BufWriter::new(&mut self.file); + writer.seek(std::io::SeekFrom::Start(0)).await?; + + let mut written: u64 = 0; + let mut encoded = Vec::new(); + for (key, value) in &self.data { + for v in value { + encoded.clear(); + encoded.extend(encode(key)?); + encoded.extend(encode(v)?); + writer.write(&encoded).await?; + written += encoded.len() as u64; + } + } + + writer.flush().await?; + self.file.set_len(written).await?; + Ok(()) + } + + async fn load_from_file(&mut self) -> Result<()> { + let mut bytes = vec![]; + + self.file.seek(std::io::SeekFrom::Start(0)).await?; + self.file.read_to_end(&mut bytes).await?; + + let mut cursor = 0; + while cursor < bytes.len() { + let (key, len) = decode(&bytes[cursor..]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::CorruptedData, e))?; + cursor += len; + let (value, len) = decode(&bytes[cursor..]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::CorruptedData, e))?; + cursor += len; + + self.insert_desynced(key, value); + } + + Ok(()) } } @@ -112,39 +142,6 @@ mod tests { #[tokio::test] async fn encode_decode() { - let mut index: Index = Index { - file: File::from_std(std::fs::File::create("test").unwrap()), - data: BTreeMap::new(), - key_type: PhantomData::, - value_type: PhantomData::, - }; - - index.insert("foo".to_string(), 123).await.unwrap(); - index.insert("foo".to_string(), 124).await.unwrap(); - index.insert("bar".to_string(), 125).await.unwrap(); - index.insert("bar".to_string(), 126).await.unwrap(); - - let lookup = index.lookup(&"foo".to_string()).await.unwrap().unwrap(); - assert_eq!(lookup.len(), 2); - assert!(lookup.contains(&123)); - assert!(lookup.contains(&124)); - println!("lookup {:?}", lookup); - - let encoded = index.encode().unwrap(); - let decoded = Index::::decode_tree(&encoded).unwrap(); - let decoded = Index { - file: File::from_std(std::fs::File::create("test").unwrap()), - data: decoded, - key_type: PhantomData::, - value_type: PhantomData::, - }; - - let lookup = decoded.lookup(&"foo".to_string()).await.unwrap().unwrap(); - assert_eq!(lookup.len(), 2); - assert!(lookup.contains(&123)); - assert!(lookup.contains(&124)); - println!("lookup {:?}", lookup); - - std::fs::remove_file("test").unwrap(); + todo!(); } }