Merge remote-tracking branch 'origin/redesign-tables' into redesign-tables
This commit is contained in:
commit
3a50328e51
1 changed files with 209 additions and 74 deletions
|
|
@ -1,33 +1,23 @@
|
||||||
use std::marker::PhantomData;
|
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use tokio::fs::{File, OpenOptions};
|
use tokio::fs::{File, OpenOptions};
|
||||||
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
|
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use std::collections::{BTreeMap, HashSet};
|
use std::collections::{BTreeMap, HashSet};
|
||||||
use std::hash::Hash;
|
use std::hash::Hash;
|
||||||
|
use std::io::SeekFrom;
|
||||||
|
|
||||||
use crate::binary_coding::{decode, decode_sequence, encode, encode_sequence};
|
use crate::binary_coding::{decode, encode};
|
||||||
use bincode;
|
use bincode;
|
||||||
use bincode::{Decode, Encode};
|
use bincode::{Decode, Encode};
|
||||||
use tokio::fs;
|
|
||||||
|
|
||||||
use crate::error::{DecodeErrorKind, Error};
|
use crate::error::{DecodeErrorKind, Error};
|
||||||
|
|
||||||
use bincode::error::DecodeError;
|
|
||||||
use std::mem::size_of;
|
|
||||||
|
|
||||||
type Result<T> = std::result::Result<T, Error>;
|
type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
// Implements a persistant self-balancing Binary Search Tree. Nope.
|
|
||||||
// We need fixed-size nodes. But we want to index Strings which are variable length.
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Index<K, V> {
|
pub struct Index<K, V> {
|
||||||
file: File,
|
file: File,
|
||||||
data: BTreeMap<K, HashSet<V>>,
|
data: BTreeMap<K, HashSet<V>>,
|
||||||
key_type: PhantomData<K>,
|
|
||||||
value_type: PhantomData<V>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|
@ -48,103 +38,248 @@ where
|
||||||
|
|
||||||
let data = BTreeMap::new();
|
let data = BTreeMap::new();
|
||||||
|
|
||||||
Ok(Index {
|
Ok(Index { file, data })
|
||||||
file,
|
|
||||||
data,
|
|
||||||
key_type: PhantomData::<K>,
|
|
||||||
value_type: PhantomData::<V>,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn connect(file_name: PathBuf) -> Result<Index<K, V>> {
|
pub async fn connect(file_name: PathBuf) -> Result<Index<K, V>> {
|
||||||
let mut file: File = OpenOptions::new()
|
let file: File = OpenOptions::new()
|
||||||
.read(true)
|
.read(true)
|
||||||
.write(true)
|
.write(true)
|
||||||
.create(true)
|
|
||||||
.open(file_name)
|
.open(file_name)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let mut bytes = vec![];
|
let mut index = Index {
|
||||||
file.read_to_end(&mut bytes).await?;
|
|
||||||
|
|
||||||
let data = Index::decode_tree(&bytes)
|
|
||||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::CorruptedData, e))?;
|
|
||||||
|
|
||||||
Ok(Index {
|
|
||||||
file,
|
file,
|
||||||
data,
|
data: BTreeMap::new(),
|
||||||
key_type: PhantomData::<K>,
|
};
|
||||||
value_type: PhantomData::<V>,
|
|
||||||
})
|
index.load_from_file().await?;
|
||||||
|
Ok(index)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn insert(&mut self, k: K, v: V) -> Result<()> {
|
pub async fn insert(&mut self, k: K, v: V) -> Result<()> {
|
||||||
|
self.append_to_file(&k, &v).await?;
|
||||||
self.data.entry(k).or_insert_with(HashSet::new).insert(v);
|
self.data.entry(k).or_insert_with(HashSet::new).insert(v);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn insert_desynced(&mut self, k: K, v: V) -> () {
|
||||||
|
self.data.entry(k).or_insert_with(HashSet::new).insert(v);
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn lookup(&self, k: &K) -> Result<Option<HashSet<V>>> {
|
pub async fn lookup(&self, k: &K) -> Result<Option<HashSet<V>>> {
|
||||||
let hashset = self.data.get(k).unwrap();
|
let hashset = self.data.get(k).cloned();
|
||||||
Ok(Some(hashset.clone()))
|
Ok(hashset)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn delete(&mut self, k: K, v: V) -> Result<Option<bool>> {
|
pub async fn delete(&mut self, k: K, v: V) -> Result<()> {
|
||||||
Ok(Some(
|
self.data.entry(k).and_modify(|values| {
|
||||||
self.data.entry(k).or_insert_with(HashSet::new).remove(&v),
|
values.remove(&v);
|
||||||
))
|
});
|
||||||
|
self.sync_to_disk().await
|
||||||
}
|
}
|
||||||
|
|
||||||
fn encode(&self) -> Result<Vec<u8>> {
|
pub async fn sync_to_disk(&mut self) -> Result<()> {
|
||||||
|
let mut writer = BufWriter::new(&mut self.file);
|
||||||
|
writer.seek(SeekFrom::Start(0)).await?;
|
||||||
|
|
||||||
|
let mut written: u64 = 0;
|
||||||
let mut encoded = Vec::new();
|
let mut encoded = Vec::new();
|
||||||
encoded.extend(encode(&self.data)?);
|
for (key, value) in &self.data {
|
||||||
Ok(encoded)
|
for v in value {
|
||||||
|
encoded.clear();
|
||||||
|
encoded.extend(encode(key)?);
|
||||||
|
encoded.extend(encode(v)?);
|
||||||
|
writer.write(&encoded).await?;
|
||||||
|
written += encoded.len() as u64;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
writer.flush().await?;
|
||||||
|
self.file.set_len(written).await?;
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn decode_tree(data: &[u8]) -> std::result::Result<BTreeMap<K, HashSet<V>>, DecodeError> {
|
async fn append_to_file(&mut self, key: &K, value: &V) -> Result<()> {
|
||||||
let data: BTreeMap<K, HashSet<V>> = decode(data)?.0;
|
let mut encoded = Vec::new();
|
||||||
Ok(data)
|
encoded.extend(encode(key)?);
|
||||||
|
encoded.extend(encode(value)?);
|
||||||
|
|
||||||
|
self.file.seek(SeekFrom::End(0)).await?;
|
||||||
|
self.file.write(&encoded).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn load_from_file(&mut self) -> Result<()> {
|
||||||
|
let mut bytes = vec![];
|
||||||
|
|
||||||
|
self.file.seek(SeekFrom::Start(0)).await?;
|
||||||
|
self.file.read_to_end(&mut bytes).await?;
|
||||||
|
|
||||||
|
let mut cursor = 0;
|
||||||
|
while cursor < bytes.len() {
|
||||||
|
let (key, len) = decode(&bytes[cursor..])
|
||||||
|
.map_err(|e| Error::DecodeError(DecodeErrorKind::CorruptedData, e))?;
|
||||||
|
cursor += len;
|
||||||
|
let (value, len) = decode(&bytes[cursor..])
|
||||||
|
.map_err(|e| Error::DecodeError(DecodeErrorKind::CorruptedData, e))?;
|
||||||
|
cursor += len;
|
||||||
|
|
||||||
|
self.insert_desynced(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use tokio::fs::remove_file;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn encode_decode() {
|
async fn connect_to_new() {
|
||||||
let mut index: Index<String, u32> = Index {
|
let file_name = PathBuf::from("connect_to_new");
|
||||||
file: File::from_std(std::fs::File::create("test").unwrap()),
|
if file_name.exists() {
|
||||||
data: BTreeMap::new(),
|
remove_file(&file_name).await.unwrap();
|
||||||
key_type: PhantomData::<String>,
|
}
|
||||||
value_type: PhantomData::<u32>,
|
|
||||||
};
|
|
||||||
|
|
||||||
index.insert("foo".to_string(), 123).await.unwrap();
|
{
|
||||||
index.insert("foo".to_string(), 124).await.unwrap();
|
let index = Index::<u64, u64>::new(file_name.clone()).await.unwrap();
|
||||||
index.insert("bar".to_string(), 125).await.unwrap();
|
assert_eq!(index.data.len(), 0);
|
||||||
index.insert("bar".to_string(), 126).await.unwrap();
|
}
|
||||||
|
|
||||||
let lookup = index.lookup(&"foo".to_string()).await.unwrap().unwrap();
|
{
|
||||||
assert_eq!(lookup.len(), 2);
|
let index = Index::<u64, u64>::connect(file_name.clone()).await.unwrap();
|
||||||
assert!(lookup.contains(&123));
|
assert_eq!(index.data.len(), 0);
|
||||||
assert!(lookup.contains(&124));
|
}
|
||||||
println!("lookup {:?}", lookup);
|
|
||||||
|
|
||||||
let encoded = index.encode().unwrap();
|
remove_file(&file_name).await.unwrap();
|
||||||
let decoded = Index::<String, u32>::decode_tree(&encoded).unwrap();
|
}
|
||||||
let decoded = Index {
|
|
||||||
file: File::from_std(std::fs::File::create("test").unwrap()),
|
|
||||||
data: decoded,
|
|
||||||
key_type: PhantomData::<String>,
|
|
||||||
value_type: PhantomData::<u32>,
|
|
||||||
};
|
|
||||||
|
|
||||||
let lookup = decoded.lookup(&"foo".to_string()).await.unwrap().unwrap();
|
#[tokio::test]
|
||||||
assert_eq!(lookup.len(), 2);
|
async fn inserting() {
|
||||||
assert!(lookup.contains(&123));
|
let file_name = PathBuf::from("inserting");
|
||||||
assert!(lookup.contains(&124));
|
if file_name.exists() {
|
||||||
println!("lookup {:?}", lookup);
|
remove_file(&file_name).await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
std::fs::remove_file("test").unwrap();
|
{
|
||||||
|
let mut index = Index::<u64, u64>::new(file_name.clone()).await.unwrap();
|
||||||
|
index.insert(1, 2).await.unwrap();
|
||||||
|
index.insert(1, 3).await.unwrap();
|
||||||
|
index.insert(1, 4).await.unwrap();
|
||||||
|
index.insert(2, 3).await.unwrap();
|
||||||
|
index.insert(2, 4).await.unwrap();
|
||||||
|
index.insert(2, 5).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(index.data.len(), 2);
|
||||||
|
assert_eq!(index.data.get(&1).unwrap().len(), 3);
|
||||||
|
assert_eq!(index.data.get(&2).unwrap().len(), 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let index = Index::<u64, u64>::connect(file_name.clone()).await.unwrap();
|
||||||
|
assert_eq!(index.data.len(), 2);
|
||||||
|
assert_eq!(index.data.get(&1).unwrap().len(), 3);
|
||||||
|
assert_eq!(index.data.get(&2).unwrap().len(), 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
remove_file(&file_name).await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn lookuping() {
|
||||||
|
let file_name = PathBuf::from("lookuping");
|
||||||
|
if file_name.exists() {
|
||||||
|
remove_file(&file_name).await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut index = Index::<u64, u64>::new(file_name.clone()).await.unwrap();
|
||||||
|
index.insert(1, 2).await.unwrap();
|
||||||
|
index.insert(1, 3).await.unwrap();
|
||||||
|
index.insert(1, 4).await.unwrap();
|
||||||
|
index.insert(2, 3).await.unwrap();
|
||||||
|
index.insert(2, 4).await.unwrap();
|
||||||
|
index.insert(2, 5).await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(index.lookup(&1).await.unwrap().unwrap().len(), 3);
|
||||||
|
assert_eq!(index.lookup(&2).await.unwrap().unwrap().len(), 3);
|
||||||
|
assert_eq!(index.lookup(&3).await.unwrap(), None);
|
||||||
|
|
||||||
|
let first = index.lookup(&1).await.unwrap().unwrap();
|
||||||
|
assert!(first.contains(&2));
|
||||||
|
assert!(first.contains(&3));
|
||||||
|
assert!(first.contains(&4));
|
||||||
|
|
||||||
|
let second = index.lookup(&2).await.unwrap().unwrap();
|
||||||
|
assert!(second.contains(&3));
|
||||||
|
assert!(second.contains(&4));
|
||||||
|
assert!(second.contains(&5));
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let index = Index::<u64, u64>::connect(file_name.clone()).await.unwrap();
|
||||||
|
assert_eq!(index.lookup(&1).await.unwrap().unwrap().len(), 3);
|
||||||
|
assert_eq!(index.lookup(&2).await.unwrap().unwrap().len(), 3);
|
||||||
|
assert_eq!(index.lookup(&3).await.unwrap(), None);
|
||||||
|
|
||||||
|
let first = index.lookup(&1).await.unwrap().unwrap();
|
||||||
|
assert!(first.contains(&2));
|
||||||
|
assert!(first.contains(&3));
|
||||||
|
assert!(first.contains(&4));
|
||||||
|
|
||||||
|
let second = index.lookup(&2).await.unwrap().unwrap();
|
||||||
|
assert!(second.contains(&3));
|
||||||
|
assert!(second.contains(&4));
|
||||||
|
assert!(second.contains(&5));
|
||||||
|
}
|
||||||
|
|
||||||
|
remove_file(&file_name).await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn deleting() {
|
||||||
|
let file_name = PathBuf::from("deleting");
|
||||||
|
if file_name.exists() {
|
||||||
|
remove_file(&file_name).await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut index = Index::<u64, u64>::new(file_name.clone()).await.unwrap();
|
||||||
|
index.insert(1, 2).await.unwrap();
|
||||||
|
index.insert(1, 3).await.unwrap();
|
||||||
|
index.insert(1, 4).await.unwrap();
|
||||||
|
index.insert(2, 3).await.unwrap();
|
||||||
|
index.insert(2, 4).await.unwrap();
|
||||||
|
index.insert(2, 5).await.unwrap();
|
||||||
|
|
||||||
|
assert!(index.lookup(&1).await.unwrap().unwrap().contains(&2));
|
||||||
|
index.delete(1, 2).await.unwrap();
|
||||||
|
assert!(!index.lookup(&1).await.unwrap().unwrap().contains(&2));
|
||||||
|
|
||||||
|
assert!(index.lookup(&2).await.unwrap().unwrap().contains(&3));
|
||||||
|
index.delete(2, 3).await.unwrap();
|
||||||
|
assert!(!index.lookup(&2).await.unwrap().unwrap().contains(&3));
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut index = Index::<u64, u64>::connect(file_name.clone()).await.unwrap();
|
||||||
|
|
||||||
|
assert!(!index.lookup(&1).await.unwrap().unwrap().contains(&2));
|
||||||
|
assert!(!index.lookup(&2).await.unwrap().unwrap().contains(&3));
|
||||||
|
|
||||||
|
assert!(index.lookup(&1).await.unwrap().unwrap().contains(&3));
|
||||||
|
index.delete(1, 3).await.unwrap();
|
||||||
|
assert!(!index.lookup(&1).await.unwrap().unwrap().contains(&3));
|
||||||
|
|
||||||
|
assert!(index.lookup(&1).await.unwrap().unwrap().contains(&4));
|
||||||
|
assert!(index.lookup(&2).await.unwrap().unwrap().contains(&4));
|
||||||
|
assert!(index.lookup(&2).await.unwrap().unwrap().contains(&5));
|
||||||
|
}
|
||||||
|
|
||||||
|
remove_file(&file_name).await.unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue