Merge branch 'index-persistency' into 'redesign-tables'

Index persistency

See merge request x433485/minisql!31
This commit is contained in:
Jindřich Moravec 2024-02-04 21:07:25 +01:00
commit 59e2759afe

View file

@ -1,33 +1,23 @@
use std::marker::PhantomData;
use std::path::PathBuf; use std::path::PathBuf;
use tokio::fs::{File, OpenOptions}; use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter};
use async_trait::async_trait;
use std::collections::{BTreeMap, HashSet}; use std::collections::{BTreeMap, HashSet};
use std::hash::Hash; use std::hash::Hash;
use std::io::SeekFrom;
use crate::binary_coding::{decode, decode_sequence, encode, encode_sequence}; use crate::binary_coding::{decode, encode};
use bincode; use bincode;
use bincode::{Decode, Encode}; use bincode::{Decode, Encode};
use tokio::fs;
use crate::error::{DecodeErrorKind, Error}; use crate::error::{DecodeErrorKind, Error};
use bincode::error::DecodeError;
use std::mem::size_of;
type Result<T> = std::result::Result<T, Error>; type Result<T> = std::result::Result<T, Error>;
// Implements a persistant self-balancing Binary Search Tree. Nope.
// We need fixed-size nodes. But we want to index Strings which are variable length.
#[derive(Debug)] #[derive(Debug)]
pub struct Index<K, V> { pub struct Index<K, V> {
file: File, file: File,
data: BTreeMap<K, HashSet<V>>, data: BTreeMap<K, HashSet<V>>,
key_type: PhantomData<K>,
value_type: PhantomData<V>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -48,103 +38,248 @@ where
let data = BTreeMap::new(); let data = BTreeMap::new();
Ok(Index { Ok(Index { file, data })
file,
data,
key_type: PhantomData::<K>,
value_type: PhantomData::<V>,
})
} }
pub async fn connect(file_name: PathBuf) -> Result<Index<K, V>> { pub async fn connect(file_name: PathBuf) -> Result<Index<K, V>> {
let mut file: File = OpenOptions::new() let file: File = OpenOptions::new()
.read(true) .read(true)
.write(true) .write(true)
.create(true)
.open(file_name) .open(file_name)
.await?; .await?;
let mut bytes = vec![]; let mut index = Index {
file.read_to_end(&mut bytes).await?;
let data = Index::decode_tree(&bytes)
.map_err(|e| Error::DecodeError(DecodeErrorKind::CorruptedData, e))?;
Ok(Index {
file, file,
data, data: BTreeMap::new(),
key_type: PhantomData::<K>, };
value_type: PhantomData::<V>,
}) index.load_from_file().await?;
Ok(index)
} }
pub async fn insert(&mut self, k: K, v: V) -> Result<()> { pub async fn insert(&mut self, k: K, v: V) -> Result<()> {
self.append_to_file(&k, &v).await?;
self.data.entry(k).or_insert_with(HashSet::new).insert(v); self.data.entry(k).or_insert_with(HashSet::new).insert(v);
Ok(()) Ok(())
} }
pub fn insert_desynced(&mut self, k: K, v: V) -> () {
self.data.entry(k).or_insert_with(HashSet::new).insert(v);
}
pub async fn lookup(&self, k: &K) -> Result<Option<HashSet<V>>> { pub async fn lookup(&self, k: &K) -> Result<Option<HashSet<V>>> {
let hashset = self.data.get(k).unwrap(); let hashset = self.data.get(k).cloned();
Ok(Some(hashset.clone())) Ok(hashset)
} }
pub async fn delete(&mut self, k: K, v: V) -> Result<Option<bool>> { pub async fn delete(&mut self, k: K, v: V) -> Result<()> {
Ok(Some( self.data.entry(k).and_modify(|values| {
self.data.entry(k).or_insert_with(HashSet::new).remove(&v), values.remove(&v);
)) });
self.sync_to_disk().await
} }
fn encode(&self) -> Result<Vec<u8>> { pub async fn sync_to_disk(&mut self) -> Result<()> {
let mut writer = BufWriter::new(&mut self.file);
writer.seek(SeekFrom::Start(0)).await?;
let mut written: u64 = 0;
let mut encoded = Vec::new(); let mut encoded = Vec::new();
encoded.extend(encode(&self.data)?); for (key, value) in &self.data {
Ok(encoded) for v in value {
encoded.clear();
encoded.extend(encode(key)?);
encoded.extend(encode(v)?);
writer.write(&encoded).await?;
written += encoded.len() as u64;
}
}
writer.flush().await?;
self.file.set_len(written).await?;
Ok(())
} }
fn decode_tree(data: &[u8]) -> std::result::Result<BTreeMap<K, HashSet<V>>, DecodeError> { async fn append_to_file(&mut self, key: &K, value: &V) -> Result<()> {
let data: BTreeMap<K, HashSet<V>> = decode(data)?.0; let mut encoded = Vec::new();
Ok(data) encoded.extend(encode(key)?);
encoded.extend(encode(value)?);
self.file.seek(SeekFrom::End(0)).await?;
self.file.write(&encoded).await?;
Ok(())
}
async fn load_from_file(&mut self) -> Result<()> {
let mut bytes = vec![];
self.file.seek(SeekFrom::Start(0)).await?;
self.file.read_to_end(&mut bytes).await?;
let mut cursor = 0;
while cursor < bytes.len() {
let (key, len) = decode(&bytes[cursor..])
.map_err(|e| Error::DecodeError(DecodeErrorKind::CorruptedData, e))?;
cursor += len;
let (value, len) = decode(&bytes[cursor..])
.map_err(|e| Error::DecodeError(DecodeErrorKind::CorruptedData, e))?;
cursor += len;
self.insert_desynced(key, value);
}
Ok(())
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use tokio::fs::remove_file;
#[tokio::test] #[tokio::test]
async fn encode_decode() { async fn connect_to_new() {
let mut index: Index<String, u32> = Index { let file_name = PathBuf::from("connect_to_new");
file: File::from_std(std::fs::File::create("test").unwrap()), if file_name.exists() {
data: BTreeMap::new(), remove_file(&file_name).await.unwrap();
key_type: PhantomData::<String>, }
value_type: PhantomData::<u32>,
};
index.insert("foo".to_string(), 123).await.unwrap(); {
index.insert("foo".to_string(), 124).await.unwrap(); let index = Index::<u64, u64>::new(file_name.clone()).await.unwrap();
index.insert("bar".to_string(), 125).await.unwrap(); assert_eq!(index.data.len(), 0);
index.insert("bar".to_string(), 126).await.unwrap(); }
let lookup = index.lookup(&"foo".to_string()).await.unwrap().unwrap(); {
assert_eq!(lookup.len(), 2); let index = Index::<u64, u64>::connect(file_name.clone()).await.unwrap();
assert!(lookup.contains(&123)); assert_eq!(index.data.len(), 0);
assert!(lookup.contains(&124)); }
println!("lookup {:?}", lookup);
let encoded = index.encode().unwrap(); remove_file(&file_name).await.unwrap();
let decoded = Index::<String, u32>::decode_tree(&encoded).unwrap(); }
let decoded = Index {
file: File::from_std(std::fs::File::create("test").unwrap()),
data: decoded,
key_type: PhantomData::<String>,
value_type: PhantomData::<u32>,
};
let lookup = decoded.lookup(&"foo".to_string()).await.unwrap().unwrap(); #[tokio::test]
assert_eq!(lookup.len(), 2); async fn inserting() {
assert!(lookup.contains(&123)); let file_name = PathBuf::from("inserting");
assert!(lookup.contains(&124)); if file_name.exists() {
println!("lookup {:?}", lookup); remove_file(&file_name).await.unwrap();
}
std::fs::remove_file("test").unwrap(); {
let mut index = Index::<u64, u64>::new(file_name.clone()).await.unwrap();
index.insert(1, 2).await.unwrap();
index.insert(1, 3).await.unwrap();
index.insert(1, 4).await.unwrap();
index.insert(2, 3).await.unwrap();
index.insert(2, 4).await.unwrap();
index.insert(2, 5).await.unwrap();
assert_eq!(index.data.len(), 2);
assert_eq!(index.data.get(&1).unwrap().len(), 3);
assert_eq!(index.data.get(&2).unwrap().len(), 3);
}
{
let index = Index::<u64, u64>::connect(file_name.clone()).await.unwrap();
assert_eq!(index.data.len(), 2);
assert_eq!(index.data.get(&1).unwrap().len(), 3);
assert_eq!(index.data.get(&2).unwrap().len(), 3);
}
remove_file(&file_name).await.unwrap();
}
#[tokio::test]
async fn lookuping() {
let file_name = PathBuf::from("lookuping");
if file_name.exists() {
remove_file(&file_name).await.unwrap();
}
{
let mut index = Index::<u64, u64>::new(file_name.clone()).await.unwrap();
index.insert(1, 2).await.unwrap();
index.insert(1, 3).await.unwrap();
index.insert(1, 4).await.unwrap();
index.insert(2, 3).await.unwrap();
index.insert(2, 4).await.unwrap();
index.insert(2, 5).await.unwrap();
assert_eq!(index.lookup(&1).await.unwrap().unwrap().len(), 3);
assert_eq!(index.lookup(&2).await.unwrap().unwrap().len(), 3);
assert_eq!(index.lookup(&3).await.unwrap(), None);
let first = index.lookup(&1).await.unwrap().unwrap();
assert!(first.contains(&2));
assert!(first.contains(&3));
assert!(first.contains(&4));
let second = index.lookup(&2).await.unwrap().unwrap();
assert!(second.contains(&3));
assert!(second.contains(&4));
assert!(second.contains(&5));
}
{
let index = Index::<u64, u64>::connect(file_name.clone()).await.unwrap();
assert_eq!(index.lookup(&1).await.unwrap().unwrap().len(), 3);
assert_eq!(index.lookup(&2).await.unwrap().unwrap().len(), 3);
assert_eq!(index.lookup(&3).await.unwrap(), None);
let first = index.lookup(&1).await.unwrap().unwrap();
assert!(first.contains(&2));
assert!(first.contains(&3));
assert!(first.contains(&4));
let second = index.lookup(&2).await.unwrap().unwrap();
assert!(second.contains(&3));
assert!(second.contains(&4));
assert!(second.contains(&5));
}
remove_file(&file_name).await.unwrap();
}
#[tokio::test]
async fn deleting() {
let file_name = PathBuf::from("deleting");
if file_name.exists() {
remove_file(&file_name).await.unwrap();
}
{
let mut index = Index::<u64, u64>::new(file_name.clone()).await.unwrap();
index.insert(1, 2).await.unwrap();
index.insert(1, 3).await.unwrap();
index.insert(1, 4).await.unwrap();
index.insert(2, 3).await.unwrap();
index.insert(2, 4).await.unwrap();
index.insert(2, 5).await.unwrap();
assert!(index.lookup(&1).await.unwrap().unwrap().contains(&2));
index.delete(1, 2).await.unwrap();
assert!(!index.lookup(&1).await.unwrap().unwrap().contains(&2));
assert!(index.lookup(&2).await.unwrap().unwrap().contains(&3));
index.delete(2, 3).await.unwrap();
assert!(!index.lookup(&2).await.unwrap().unwrap().contains(&3));
}
{
let mut index = Index::<u64, u64>::connect(file_name.clone()).await.unwrap();
assert!(!index.lookup(&1).await.unwrap().unwrap().contains(&2));
assert!(!index.lookup(&2).await.unwrap().unwrap().contains(&3));
assert!(index.lookup(&1).await.unwrap().unwrap().contains(&3));
index.delete(1, 3).await.unwrap();
assert!(!index.lookup(&1).await.unwrap().unwrap().contains(&3));
assert!(index.lookup(&1).await.unwrap().unwrap().contains(&4));
assert!(index.lookup(&2).await.unwrap().unwrap().contains(&4));
assert!(index.lookup(&2).await.unwrap().unwrap().contains(&5));
}
remove_file(&file_name).await.unwrap();
} }
} }