From 6db62c42d76b497146f678f677492b8de864f86f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jind=C5=99ich=20Moravec?= Date: Sun, 4 Feb 2024 20:20:20 +0100 Subject: [PATCH 1/3] feat: index append encoding --- storage_engine/src/index.rs | 153 ++++++++++++++++++------------------ 1 file changed, 75 insertions(+), 78 deletions(-) diff --git a/storage_engine/src/index.rs b/storage_engine/src/index.rs index 0cc01de..3cb84d2 100644 --- a/storage_engine/src/index.rs +++ b/storage_engine/src/index.rs @@ -1,33 +1,22 @@ -use std::marker::PhantomData; use std::path::PathBuf; use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter}; -use async_trait::async_trait; use std::collections::{BTreeMap, HashSet}; use std::hash::Hash; -use crate::binary_coding::{decode, decode_sequence, encode, encode_sequence}; +use crate::binary_coding::{decode, encode}; use bincode; use bincode::{Decode, Encode}; -use tokio::fs; use crate::error::{DecodeErrorKind, Error}; -use bincode::error::DecodeError; -use std::mem::size_of; - type Result = std::result::Result; -// Implements a persistant self-balancing Binary Search Tree. Nope. -// We need fixed-size nodes. But we want to index Strings which are variable length. - #[derive(Debug)] pub struct Index { file: File, data: BTreeMap>, - key_type: PhantomData, - value_type: PhantomData, } #[derive(Debug)] @@ -48,61 +37,102 @@ where let data = BTreeMap::new(); - Ok(Index { - file, - data, - key_type: PhantomData::, - value_type: PhantomData::, - }) + Ok(Index { file, data }) } pub async fn connect(file_name: PathBuf) -> Result> { - let mut file: File = OpenOptions::new() + let file: File = OpenOptions::new() .read(true) .write(true) - .create(true) .open(file_name) .await?; - let mut bytes = vec![]; - file.read_to_end(&mut bytes).await?; - - let data = Index::decode_tree(&bytes) - .map_err(|e| Error::DecodeError(DecodeErrorKind::CorruptedData, e))?; - - Ok(Index { + let mut index = Index { file, - data, - key_type: PhantomData::, - value_type: PhantomData::, - }) + data: BTreeMap::new(), + }; + + index.load_from_file().await?; + Ok(index) } pub async fn insert(&mut self, k: K, v: V) -> Result<()> { + self.append_to_file(&k, &v).await?; self.data.entry(k).or_insert_with(HashSet::new).insert(v); Ok(()) } + pub fn insert_desynced(&mut self, k: K, v: V) -> () { + self.data.entry(k).or_insert_with(HashSet::new).insert(v); + } + pub async fn lookup(&self, k: &K) -> Result>> { - let hashset = self.data.get(k).unwrap(); - Ok(Some(hashset.clone())) + let hashset = self.data.get(k).cloned(); + Ok(hashset) } - pub async fn delete(&mut self, k: K, v: V) -> Result> { - Ok(Some( - self.data.entry(k).or_insert_with(HashSet::new).remove(&v), - )) + pub async fn delete(&mut self, k: K, v: V) -> Result<()> { + self.data.entry(k).and_modify(|values| { + values.remove(&v); + }); + self.dump_to_file().await } - fn encode(&self) -> Result> { + pub async fn sync_to_disk(&mut self) -> Result<()> { + self.dump_to_file().await + } + + async fn append_to_file(&mut self, key: &K, value: &V) -> Result<()> { let mut encoded = Vec::new(); - encoded.extend(encode(&self.data)?); - Ok(encoded) + encoded.extend(encode(key)?); + encoded.extend(encode(value)?); + + self.file.seek(std::io::SeekFrom::End(0)).await?; + self.file.write(&encoded).await?; + + Ok(()) } - fn decode_tree(data: &[u8]) -> std::result::Result>, DecodeError> { - let data: BTreeMap> = decode(data)?.0; - Ok(data) + async fn dump_to_file(&mut self) -> Result<()> { + let mut writer = BufWriter::new(&mut self.file); + writer.seek(std::io::SeekFrom::Start(0)).await?; + + let mut written: u64 = 0; + let mut encoded = Vec::new(); + for (key, value) in &self.data { + for v in value { + encoded.clear(); + encoded.extend(encode(key)?); + encoded.extend(encode(v)?); + writer.write(&encoded).await?; + written += encoded.len() as u64; + } + } + + writer.flush().await?; + self.file.set_len(written).await?; + Ok(()) + } + + async fn load_from_file(&mut self) -> Result<()> { + let mut bytes = vec![]; + + self.file.seek(std::io::SeekFrom::Start(0)).await?; + self.file.read_to_end(&mut bytes).await?; + + let mut cursor = 0; + while cursor < bytes.len() { + let (key, len) = decode(&bytes[cursor..]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::CorruptedData, e))?; + cursor += len; + let (value, len) = decode(&bytes[cursor..]) + .map_err(|e| Error::DecodeError(DecodeErrorKind::CorruptedData, e))?; + cursor += len; + + self.insert_desynced(key, value); + } + + Ok(()) } } @@ -112,39 +142,6 @@ mod tests { #[tokio::test] async fn encode_decode() { - let mut index: Index = Index { - file: File::from_std(std::fs::File::create("test").unwrap()), - data: BTreeMap::new(), - key_type: PhantomData::, - value_type: PhantomData::, - }; - - index.insert("foo".to_string(), 123).await.unwrap(); - index.insert("foo".to_string(), 124).await.unwrap(); - index.insert("bar".to_string(), 125).await.unwrap(); - index.insert("bar".to_string(), 126).await.unwrap(); - - let lookup = index.lookup(&"foo".to_string()).await.unwrap().unwrap(); - assert_eq!(lookup.len(), 2); - assert!(lookup.contains(&123)); - assert!(lookup.contains(&124)); - println!("lookup {:?}", lookup); - - let encoded = index.encode().unwrap(); - let decoded = Index::::decode_tree(&encoded).unwrap(); - let decoded = Index { - file: File::from_std(std::fs::File::create("test").unwrap()), - data: decoded, - key_type: PhantomData::, - value_type: PhantomData::, - }; - - let lookup = decoded.lookup(&"foo".to_string()).await.unwrap().unwrap(); - assert_eq!(lookup.len(), 2); - assert!(lookup.contains(&123)); - assert!(lookup.contains(&124)); - println!("lookup {:?}", lookup); - - std::fs::remove_file("test").unwrap(); + todo!(); } } From a5c7306b90f3055861c0728c385d3292a4268d22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jind=C5=99ich=20Moravec?= Date: Sun, 4 Feb 2024 20:48:10 +0100 Subject: [PATCH 2/3] tests: index testing --- storage_engine/src/index.rs | 172 ++++++++++++++++++++++++++++++++---- 1 file changed, 154 insertions(+), 18 deletions(-) diff --git a/storage_engine/src/index.rs b/storage_engine/src/index.rs index 3cb84d2..5d2187a 100644 --- a/storage_engine/src/index.rs +++ b/storage_engine/src/index.rs @@ -75,25 +75,10 @@ where self.data.entry(k).and_modify(|values| { values.remove(&v); }); - self.dump_to_file().await + self.sync_to_disk().await } pub async fn sync_to_disk(&mut self) -> Result<()> { - self.dump_to_file().await - } - - async fn append_to_file(&mut self, key: &K, value: &V) -> Result<()> { - let mut encoded = Vec::new(); - encoded.extend(encode(key)?); - encoded.extend(encode(value)?); - - self.file.seek(std::io::SeekFrom::End(0)).await?; - self.file.write(&encoded).await?; - - Ok(()) - } - - async fn dump_to_file(&mut self) -> Result<()> { let mut writer = BufWriter::new(&mut self.file); writer.seek(std::io::SeekFrom::Start(0)).await?; @@ -114,6 +99,17 @@ where Ok(()) } + async fn append_to_file(&mut self, key: &K, value: &V) -> Result<()> { + let mut encoded = Vec::new(); + encoded.extend(encode(key)?); + encoded.extend(encode(value)?); + + self.file.seek(std::io::SeekFrom::End(0)).await?; + self.file.write(&encoded).await?; + + Ok(()) + } + async fn load_from_file(&mut self) -> Result<()> { let mut bytes = vec![]; @@ -141,7 +137,147 @@ mod tests { use super::*; #[tokio::test] - async fn encode_decode() { - todo!(); + async fn connect_to_new() { + let file_name = PathBuf::from("connect_to_new"); + if file_name.exists() { + tokio::fs::remove_file(&file_name).await.unwrap(); + } + + { + let index = Index::::new(file_name.clone()).await.unwrap(); + assert_eq!(index.data.len(), 0); + } + + { + let index = Index::::connect(file_name.clone()).await.unwrap(); + assert_eq!(index.data.len(), 0); + } + + tokio::fs::remove_file(&file_name).await.unwrap(); + } + + #[tokio::test] + async fn inserting() { + let file_name = PathBuf::from("inserting"); + if file_name.exists() { + tokio::fs::remove_file(&file_name).await.unwrap(); + } + + { + let mut index = Index::::new(file_name.clone()).await.unwrap(); + index.insert(1, 2).await.unwrap(); + index.insert(1, 3).await.unwrap(); + index.insert(1, 4).await.unwrap(); + index.insert(2, 3).await.unwrap(); + index.insert(2, 4).await.unwrap(); + index.insert(2, 5).await.unwrap(); + + assert_eq!(index.data.len(), 2); + assert_eq!(index.data.get(&1).unwrap().len(), 3); + assert_eq!(index.data.get(&2).unwrap().len(), 3); + } + + { + let index = Index::::connect(file_name.clone()).await.unwrap(); + assert_eq!(index.data.len(), 2); + assert_eq!(index.data.get(&1).unwrap().len(), 3); + assert_eq!(index.data.get(&2).unwrap().len(), 3); + } + + tokio::fs::remove_file(&file_name).await.unwrap(); + } + + #[tokio::test] + async fn lookuping() { + let file_name = PathBuf::from("lookuping"); + if file_name.exists() { + tokio::fs::remove_file(&file_name).await.unwrap(); + } + + { + let mut index = Index::::new(file_name.clone()).await.unwrap(); + index.insert(1, 2).await.unwrap(); + index.insert(1, 3).await.unwrap(); + index.insert(1, 4).await.unwrap(); + index.insert(2, 3).await.unwrap(); + index.insert(2, 4).await.unwrap(); + index.insert(2, 5).await.unwrap(); + + assert_eq!(index.lookup(&1).await.unwrap().unwrap().len(), 3); + assert_eq!(index.lookup(&2).await.unwrap().unwrap().len(), 3); + assert_eq!(index.lookup(&3).await.unwrap(), None); + + let first = index.lookup(&1).await.unwrap().unwrap(); + assert!(first.contains(&2)); + assert!(first.contains(&3)); + assert!(first.contains(&4)); + + let second = index.lookup(&2).await.unwrap().unwrap(); + assert!(second.contains(&3)); + assert!(second.contains(&4)); + assert!(second.contains(&5)); + } + + { + let index = Index::::connect(file_name.clone()).await.unwrap(); + assert_eq!(index.lookup(&1).await.unwrap().unwrap().len(), 3); + assert_eq!(index.lookup(&2).await.unwrap().unwrap().len(), 3); + assert_eq!(index.lookup(&3).await.unwrap(), None); + + let first = index.lookup(&1).await.unwrap().unwrap(); + assert!(first.contains(&2)); + assert!(first.contains(&3)); + assert!(first.contains(&4)); + + let second = index.lookup(&2).await.unwrap().unwrap(); + assert!(second.contains(&3)); + assert!(second.contains(&4)); + assert!(second.contains(&5)); + } + + tokio::fs::remove_file(&file_name).await.unwrap(); + } + + #[tokio::test] + async fn deleting() { + let file_name = PathBuf::from("deleting"); + if file_name.exists() { + tokio::fs::remove_file(&file_name).await.unwrap(); + } + + { + let mut index = Index::::new(file_name.clone()).await.unwrap(); + index.insert(1, 2).await.unwrap(); + index.insert(1, 3).await.unwrap(); + index.insert(1, 4).await.unwrap(); + index.insert(2, 3).await.unwrap(); + index.insert(2, 4).await.unwrap(); + index.insert(2, 5).await.unwrap(); + + assert!(index.lookup(&1).await.unwrap().unwrap().contains(&2)); + index.delete(1, 2).await.unwrap(); + assert!(!index.lookup(&1).await.unwrap().unwrap().contains(&2)); + + assert!(index.lookup(&2).await.unwrap().unwrap().contains(&3)); + index.delete(2, 3).await.unwrap(); + assert!(!index.lookup(&2).await.unwrap().unwrap().contains(&3)); + } + + { + let mut index = Index::::connect(file_name.clone()).await.unwrap(); + + assert!(!index.lookup(&1).await.unwrap().unwrap().contains(&2)); + assert!(!index.lookup(&2).await.unwrap().unwrap().contains(&3)); + + assert!(index.lookup(&1).await.unwrap().unwrap().contains(&3)); + index.delete(1, 3).await.unwrap(); + assert!(!index.lookup(&1).await.unwrap().unwrap().contains(&3)); + + assert!(index.lookup(&1).await.unwrap().unwrap().contains(&4)); + assert!(index.lookup(&2).await.unwrap().unwrap().contains(&4)); + assert!(index.lookup(&2).await.unwrap().unwrap().contains(&5)); + } + + tokio::fs::remove_file(&file_name).await.unwrap(); } } From 28f182c4b197a9bfc6e24e17ca8de2eb9d9b645f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jind=C5=99ich=20Moravec?= Date: Sun, 4 Feb 2024 20:52:17 +0100 Subject: [PATCH 3/3] refactor: remove long uses --- storage_engine/src/index.rs | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/storage_engine/src/index.rs b/storage_engine/src/index.rs index 5d2187a..b5c42c1 100644 --- a/storage_engine/src/index.rs +++ b/storage_engine/src/index.rs @@ -4,6 +4,7 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter}; use std::collections::{BTreeMap, HashSet}; use std::hash::Hash; +use std::io::SeekFrom; use crate::binary_coding::{decode, encode}; use bincode; @@ -80,7 +81,7 @@ where pub async fn sync_to_disk(&mut self) -> Result<()> { let mut writer = BufWriter::new(&mut self.file); - writer.seek(std::io::SeekFrom::Start(0)).await?; + writer.seek(SeekFrom::Start(0)).await?; let mut written: u64 = 0; let mut encoded = Vec::new(); @@ -104,7 +105,7 @@ where encoded.extend(encode(key)?); encoded.extend(encode(value)?); - self.file.seek(std::io::SeekFrom::End(0)).await?; + self.file.seek(SeekFrom::End(0)).await?; self.file.write(&encoded).await?; Ok(()) @@ -113,7 +114,7 @@ where async fn load_from_file(&mut self) -> Result<()> { let mut bytes = vec![]; - self.file.seek(std::io::SeekFrom::Start(0)).await?; + self.file.seek(SeekFrom::Start(0)).await?; self.file.read_to_end(&mut bytes).await?; let mut cursor = 0; @@ -135,12 +136,13 @@ where #[cfg(test)] mod tests { use super::*; + use tokio::fs::remove_file; #[tokio::test] async fn connect_to_new() { let file_name = PathBuf::from("connect_to_new"); if file_name.exists() { - tokio::fs::remove_file(&file_name).await.unwrap(); + remove_file(&file_name).await.unwrap(); } { @@ -153,14 +155,14 @@ mod tests { assert_eq!(index.data.len(), 0); } - tokio::fs::remove_file(&file_name).await.unwrap(); + remove_file(&file_name).await.unwrap(); } #[tokio::test] async fn inserting() { let file_name = PathBuf::from("inserting"); if file_name.exists() { - tokio::fs::remove_file(&file_name).await.unwrap(); + remove_file(&file_name).await.unwrap(); } { @@ -184,14 +186,14 @@ mod tests { assert_eq!(index.data.get(&2).unwrap().len(), 3); } - tokio::fs::remove_file(&file_name).await.unwrap(); + remove_file(&file_name).await.unwrap(); } #[tokio::test] async fn lookuping() { let file_name = PathBuf::from("lookuping"); if file_name.exists() { - tokio::fs::remove_file(&file_name).await.unwrap(); + remove_file(&file_name).await.unwrap(); } { @@ -235,14 +237,14 @@ mod tests { assert!(second.contains(&5)); } - tokio::fs::remove_file(&file_name).await.unwrap(); + remove_file(&file_name).await.unwrap(); } #[tokio::test] async fn deleting() { let file_name = PathBuf::from("deleting"); if file_name.exists() { - tokio::fs::remove_file(&file_name).await.unwrap(); + remove_file(&file_name).await.unwrap(); } { @@ -278,6 +280,6 @@ mod tests { assert!(index.lookup(&2).await.unwrap().unwrap().contains(&5)); } - tokio::fs::remove_file(&file_name).await.unwrap(); + remove_file(&file_name).await.unwrap(); } }