Merge branch 'interpreter-to-storage-engine' into 'main'
Connect interpreter to Storage Engine See merge request x433485/minisql!34
This commit is contained in:
commit
ad98cfafb2
29 changed files with 776 additions and 262 deletions
8
Cargo.lock
generated
8
Cargo.lock
generated
|
|
@ -286,9 +286,13 @@ dependencies = [
|
|||
"anyhow",
|
||||
"async-trait",
|
||||
"bimap",
|
||||
"bincode",
|
||||
"proto",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"storage_engine",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
@ -554,9 +558,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "serde_json"
|
||||
version = "1.0.112"
|
||||
version = "1.0.113"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4d1bd37ce2324cf3bf85e5a25f96eb4baf0d5aa6eba43e7ae8958870c4ec48ed"
|
||||
checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79"
|
||||
dependencies = [
|
||||
"itoa",
|
||||
"ryu",
|
||||
|
|
|
|||
|
|
@ -10,6 +10,10 @@ rust-version = "1.74"
|
|||
anyhow = "1.0.79"
|
||||
async-trait = "0.1.77"
|
||||
bimap = { version = "0.6.3", features = ["serde"] }
|
||||
serde = { version = "1.0.196", features = ["derive"] }
|
||||
bincode = "2.0.0-rc.3"
|
||||
serde = { version = "1.0.196", features = ["derive", "rc"] }
|
||||
tokio = { version = "1.34.0", features = ["full"] }
|
||||
thiserror = "1.0.50"
|
||||
proto = { path = "../proto" }
|
||||
storage_engine = { path = "../storage_engine" }
|
||||
serde_json = "1.0.113"
|
||||
|
|
|
|||
13
minisql/src/cancellation.rs
Normal file
13
minisql/src/cancellation.rs
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
pub trait Cancellation {
|
||||
fn is_canceled(&self) -> bool;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) struct DummyCancellation;
|
||||
|
||||
#[cfg(test)]
|
||||
impl Cancellation for DummyCancellation {
|
||||
fn is_canceled(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
|
@ -13,6 +13,14 @@ pub enum RuntimeError {
|
|||
AttemptToIndexNonIndexableColumn(TableName, ColumnName),
|
||||
#[error("table {0} already indexes column {1}")]
|
||||
AttemptToIndexAlreadyIndexedColumn(TableName, ColumnName),
|
||||
#[error("File-System Error: {0}")]
|
||||
IoError(#[from] std::io::Error),
|
||||
#[error("Storage Engine error for table {0}: {1}")]
|
||||
StorageEngineError(TableName, storage_engine::error::Error),
|
||||
#[error("runtime anyhow error: {0}")]
|
||||
AnyhowError(#[from] anyhow::Error),
|
||||
#[error("serde error: {0}")]
|
||||
SerdeError(#[from] serde_json::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ use crate::type_system::Value;
|
|||
use serde::{Deserialize, Serialize};
|
||||
use std::ops::{Index, IndexMut};
|
||||
use std::slice::SliceIndex;
|
||||
use storage_engine::segments::entry::EntryDetailed;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Row(Vec<Value>);
|
||||
|
|
@ -39,6 +40,12 @@ impl FromIterator<Value> for Row {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<EntryDetailed<Value>> for Row {
|
||||
fn from(entry: EntryDetailed<Value>) -> Self {
|
||||
Row(entry.data)
|
||||
}
|
||||
}
|
||||
|
||||
// To satisfy clippy.
|
||||
impl Default for Row {
|
||||
fn default() -> Self {
|
||||
|
|
|
|||
403
minisql/src/interpreter2.rs
Normal file
403
minisql/src/interpreter2.rs
Normal file
|
|
@ -0,0 +1,403 @@
|
|||
use crate::operation::{ColumnSelection, Condition, Operation};
|
||||
use crate::result::DbResult;
|
||||
use crate::schema::{Column, TableName, TablePosition, TableSchema};
|
||||
use crate::type_system::Value;
|
||||
use crate::error::RuntimeError;
|
||||
use crate::response_writer::{ResponseWriter, CompleteStatus};
|
||||
use crate::internals::row::Row;
|
||||
|
||||
use bimap::BiMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::fs;
|
||||
|
||||
use storage_engine::store::Store;
|
||||
use storage_engine::segments::entry::Entry;
|
||||
use storage_engine::cursor::{ReadCursor, WriteCursor};
|
||||
use storage_engine::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use storage_engine::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex};
|
||||
use crate::cancellation::Cancellation;
|
||||
|
||||
const METADATA_FILE: &'static str = "metadata.json";
|
||||
|
||||
// ==============Interpreter================
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct State {
|
||||
table_name_position_mapping: BiMap<TableName, TablePosition>,
|
||||
table_schemas: Vec<Arc<TableSchema>>,
|
||||
#[serde(skip)]
|
||||
tables: Tables,
|
||||
}
|
||||
|
||||
pub struct StateHandler {
|
||||
db_path: PathBuf,
|
||||
state: RwLock<State>,
|
||||
}
|
||||
|
||||
pub type Tables = Vec<RwLock<Table>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Table {
|
||||
schema: Arc<TableSchema>,
|
||||
store: Store<Value>
|
||||
}
|
||||
|
||||
pub type DbSchema = Vec<(TableName, TablePosition, Arc<TableSchema>)>;
|
||||
// To satisfy clippy.
|
||||
impl Default for State {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl Table {
|
||||
async fn new(table_schema: TableSchema, db_path: &Path) -> DbResult<Self> {
|
||||
let table_folder_name = table_schema.table_name();
|
||||
let path_to_table_folder = db_path.join(table_folder_name);
|
||||
|
||||
let number_of_columns = table_schema.number_of_columns();
|
||||
let primary_column = table_schema.primary_column() as storage_engine::store::Column;
|
||||
let store: Store<Value> = Store::new(&path_to_table_folder, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
let table = Self {
|
||||
schema: Arc::new(table_schema),
|
||||
store,
|
||||
};
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
async fn connect(table_schema: Arc<TableSchema>, db_path: &Path) -> DbResult<Self> {
|
||||
let table_folder_name = table_schema.table_name();
|
||||
let path_to_table_folder = db_path.join(table_folder_name);
|
||||
|
||||
let store: Store<Value> = Store::connect(&path_to_table_folder).await.map_err(|e| RuntimeError::StorageEngineError(table_schema.table_name().to_string(), e))?;
|
||||
|
||||
let table = Self {
|
||||
schema: table_schema,
|
||||
store,
|
||||
};
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
async fn read(&self) -> DbResult<ReadCursor<Value>> {
|
||||
let cursor = self.store.read_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.table_name().to_string(), e))?;
|
||||
Ok(cursor)
|
||||
}
|
||||
|
||||
async fn write(&mut self) -> DbResult<WriteCursor<Value>> {
|
||||
let cursor = self.store.write_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.schema.table_name().to_string(), e))?;
|
||||
Ok(cursor)
|
||||
}
|
||||
|
||||
pub fn schema(&self) -> Arc<TableSchema> {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
pub fn table_name(&self) -> &TableName {
|
||||
self.schema.table_name()
|
||||
}
|
||||
}
|
||||
|
||||
impl State {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
table_name_position_mapping: BiMap::new(),
|
||||
table_schemas: vec![],
|
||||
tables: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
pub fn db_schema(&self) -> DbSchema {
|
||||
let mut schema: DbSchema = Vec::new();
|
||||
for (table_name, &table_position) in &self.table_name_position_mapping {
|
||||
let table_schema: Arc<TableSchema> = self.table_schemas[table_position].clone();
|
||||
schema.push((table_name.clone(), table_position, table_schema));
|
||||
}
|
||||
schema
|
||||
}
|
||||
|
||||
async fn table_at(&self, table_position: TablePosition) -> RwLockReadGuard<Table> {
|
||||
self.tables[table_position].read().await
|
||||
}
|
||||
|
||||
async fn table_at_mut(&self, table_position: TablePosition) -> RwLockWriteGuard<Table> {
|
||||
self.tables[table_position].write().await
|
||||
}
|
||||
|
||||
async fn attach_table(&mut self, table: Table) {
|
||||
// TODO: You need to update the global DB SCHEMA!
|
||||
let new_table_position: TablePosition = self.tables.len();
|
||||
self.table_name_position_mapping
|
||||
.insert(table.schema().table_name().clone(), new_table_position);
|
||||
self.table_schemas.push(table.schema());
|
||||
self.tables.push(RwLock::new(table));
|
||||
}
|
||||
|
||||
async fn select_all_rows<W, C>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection) -> DbResult<usize>
|
||||
where W: ResponseWriter,
|
||||
C: Cancellation
|
||||
{
|
||||
let mut count = 0;
|
||||
while let Some(entry) = cursor.next_alive().await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))? {
|
||||
count += 1;
|
||||
let row: Row = From::from(entry);
|
||||
let restricted_row = row.restrict_columns(&column_selection);
|
||||
response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?;
|
||||
|
||||
if cancellation.is_canceled() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
async fn select_eq<W, C>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult<usize>
|
||||
where W: ResponseWriter,
|
||||
C: Cancellation
|
||||
{
|
||||
let entries = cursor.select_entries_where_eq(column as storage_engine::store::Column, &value).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?;
|
||||
let count = entries.len();
|
||||
for entry in entries {
|
||||
let row: Row = From::from(entry);
|
||||
let restricted_row = row.restrict_columns(&column_selection);
|
||||
response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?;
|
||||
|
||||
if cancellation.is_canceled() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
async fn delete_all_rows(table_name: String, mut cursor: WriteCursor<'_, Value>) -> DbResult<usize> {
|
||||
let count = cursor.delete_all_entries(true)
|
||||
.await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?;
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
async fn delete_all_eq(table_name: String, mut cursor: WriteCursor<'_, Value>, eq_column: Column, value: Value) -> DbResult<usize> {
|
||||
let count =
|
||||
cursor.delete_entries_where_eq(eq_column as storage_engine::store::Column, &value, true)
|
||||
.await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?;
|
||||
Ok(count)
|
||||
}
|
||||
}
|
||||
|
||||
impl StateHandler {
|
||||
pub fn is_existing_db(db_path: &PathBuf) -> bool {
|
||||
db_path.exists() && db_path.is_dir() &&
|
||||
db_path.join(METADATA_FILE).exists() && db_path.join(METADATA_FILE).is_file()
|
||||
}
|
||||
|
||||
pub async fn new(db_path: PathBuf) -> DbResult<Self> {
|
||||
fs::create_dir(db_path.clone()).await.map_err(|e| RuntimeError::IoError(e))?;
|
||||
|
||||
let state = Self {
|
||||
db_path,
|
||||
state: RwLock::new(State::new()),
|
||||
};
|
||||
|
||||
state.save_metadata().await?;
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
pub async fn connect(db_path: PathBuf) -> DbResult<Self> {
|
||||
let metadata_file = db_path.join(METADATA_FILE);
|
||||
let metadata_raw = fs::read_to_string(metadata_file).await?;
|
||||
let mut metadata: State = serde_json::from_str(&metadata_raw)?;
|
||||
|
||||
let mut tables = Vec::with_capacity(metadata.table_schemas.len());
|
||||
for table_schema in &metadata.table_schemas {
|
||||
let table = Table::connect(table_schema.clone(), &db_path).await?;
|
||||
tables.push(RwLock::new(table));
|
||||
}
|
||||
|
||||
metadata.tables = tables;
|
||||
Ok(Self {
|
||||
db_path,
|
||||
state: RwLock::new(metadata),
|
||||
})
|
||||
}
|
||||
|
||||
async fn save_metadata(&self) -> DbResult<()> {
|
||||
let metadata_file = self.db_path.join(METADATA_FILE);
|
||||
let state = self.state.read().await;
|
||||
let metadata_raw = serde_json::to_string(&*state)?;
|
||||
fs::write(metadata_file, metadata_raw).await.map_err(|e| RuntimeError::IoError(e))
|
||||
}
|
||||
|
||||
pub async fn read_state(&self) -> RwLockReadGuard<State> {
|
||||
self.state.read().await
|
||||
}
|
||||
|
||||
pub async fn interpret<W: ResponseWriter, C: Cancellation>(&self, response_writer: &mut W, cancellation: &C, operation: Operation) -> DbResult<()> {
|
||||
use Operation::*;
|
||||
|
||||
match operation {
|
||||
Select(table_position, column_selection, maybe_condition) => {
|
||||
let state = self.state.read().await;
|
||||
|
||||
let table = state.table_at(table_position).await;
|
||||
let cursor = table.read().await?;
|
||||
|
||||
response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?;
|
||||
let count = match maybe_condition {
|
||||
None => State::select_all_rows(&table, cursor, response_writer, cancellation, column_selection).await?,
|
||||
Some(Condition::Eq(eq_column, value)) => State::select_eq(&table, cursor, response_writer, cancellation, column_selection, eq_column, value).await?
|
||||
};
|
||||
response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e))
|
||||
}
|
||||
Insert(table_position, values) => {
|
||||
let state = self.state.read().await;
|
||||
|
||||
let mut table = state.table_at_mut(table_position).await;
|
||||
let mut cursor = table.write().await?;
|
||||
|
||||
let entry = Entry::new(values);
|
||||
cursor.insert_entry(entry).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?;
|
||||
|
||||
response_writer.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }).await.map_err(|e| RuntimeError::AnyhowError(e))
|
||||
}
|
||||
Delete(table_position, maybe_condition) => {
|
||||
let state = self.state.read().await;
|
||||
|
||||
let mut table = state.table_at_mut(table_position).await;
|
||||
let table_name = table.table_name().clone();
|
||||
let cursor = table.write().await?;
|
||||
|
||||
let count = match maybe_condition {
|
||||
None => State::delete_all_rows(table_name, cursor).await?,
|
||||
Some(Condition::Eq(eq_column, value)) => State::delete_all_eq(table_name, cursor, eq_column, value).await?
|
||||
};
|
||||
|
||||
response_writer.write_command_complete(CompleteStatus::Delete(count)).await.map_err(|e| RuntimeError::AnyhowError(e))
|
||||
}
|
||||
CreateTable(table_schema) => {
|
||||
{
|
||||
let mut state = self.state.write().await;
|
||||
let table = Table::new(table_schema, &self.db_path).await?;
|
||||
state.attach_table(table).await;
|
||||
// WARNING: We need to drop the write lock on state unless we want a deadlock.
|
||||
}
|
||||
self.save_metadata().await?;
|
||||
response_writer.write_command_complete(CompleteStatus::CreateTable).await.map_err(|e| RuntimeError::AnyhowError(e))
|
||||
}
|
||||
CreateIndex(table_position, column) => {
|
||||
let state = self.state.read().await;
|
||||
|
||||
let mut table = state.table_at_mut(table_position).await;
|
||||
let mut cursor = table.write().await?;
|
||||
cursor.attach_index(column as storage_engine::store::Column).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?;
|
||||
response_writer.write_command_complete(CompleteStatus::CreateIndex).await.map_err(|e| RuntimeError::AnyhowError(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::operation::Operation;
|
||||
use crate::schema::Column;
|
||||
use crate::response_writer::ResponseWriterStub;
|
||||
use crate::type_system::{DbType, IndexableValue, Value};
|
||||
use std::collections::HashSet;
|
||||
use tokio::fs::{File, OpenOptions, DirBuilder};
|
||||
use tokio::fs;
|
||||
use crate::cancellation::DummyCancellation;
|
||||
|
||||
impl Drop for State {
|
||||
fn drop(&mut self) {
|
||||
println!("CLEANING UP INTERPRETER STATE");
|
||||
|
||||
let table_folder = "db-test-0";
|
||||
// Seems no one has figured out how to do AsyncDrop yet.
|
||||
std::fs::remove_dir_all(table_folder).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
fn users_schema() -> TableSchema {
|
||||
TableSchema::new(
|
||||
"users".to_string(),
|
||||
"id".to_string(),
|
||||
vec!["id".to_string(), "name".to_string(), "age".to_string()],
|
||||
vec![DbType::Uuid, DbType::String, DbType::Int],
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_table_creation() {
|
||||
let db_path = Path::new("db-test-0");
|
||||
let state = StateHandler::new(db_path.to_path_buf()).await.unwrap();
|
||||
|
||||
let users_schema = users_schema();
|
||||
let users = users_schema.table_name().clone();
|
||||
|
||||
let mut response_writer = ResponseWriterStub::new();
|
||||
|
||||
state
|
||||
.interpret(&mut response_writer, &DummyCancellation, Operation::CreateTable(users_schema.clone())).await
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
println!("==EMPTY SELECT===");
|
||||
let users_position: TablePosition = 0;
|
||||
state
|
||||
.interpret(&mut response_writer, &DummyCancellation, Operation::Select(
|
||||
users_position,
|
||||
users_schema.all_selection(),
|
||||
None,
|
||||
)).await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
{
|
||||
let users = 0;
|
||||
let (id, name, age) = (
|
||||
Value::Uuid(0),
|
||||
Value::String("Plato".to_string()),
|
||||
Value::Int(64),
|
||||
);
|
||||
|
||||
println!("About to insert!");
|
||||
|
||||
state
|
||||
.interpret(&mut response_writer, &DummyCancellation, Operation::Insert(
|
||||
users,
|
||||
vec![id.clone(), name.clone(), age.clone()],
|
||||
)).await
|
||||
.unwrap();
|
||||
}
|
||||
{
|
||||
println!("==SELECT===");
|
||||
let users_position: TablePosition = 0;
|
||||
state
|
||||
.interpret(&mut response_writer, &DummyCancellation, Operation::Select(
|
||||
users_position,
|
||||
users_schema.all_selection(),
|
||||
None,
|
||||
)).await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// assert!(false);
|
||||
|
||||
// assert!(state.tables.len() == 1);
|
||||
// let table = &state.tables[0];
|
||||
// assert!(table.rows().len() == 0);
|
||||
|
||||
// assert!(table.table_name() == &users);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1,9 +1,11 @@
|
|||
mod error;
|
||||
mod internals;
|
||||
pub mod interpreter;
|
||||
pub mod interpreter2;
|
||||
pub mod operation;
|
||||
pub mod response_writer;
|
||||
pub mod restricted_row;
|
||||
mod result;
|
||||
pub mod schema;
|
||||
pub mod type_system;
|
||||
pub mod cancellation;
|
||||
|
|
|
|||
|
|
@ -34,3 +34,53 @@ pub trait ResponseWriter {
|
|||
async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()>;
|
||||
async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
// ===Stub implementation for testing===
|
||||
//
|
||||
pub struct ResponseWriterStub {}
|
||||
|
||||
impl ResponseWriterStub {
|
||||
pub fn new() -> Self {
|
||||
ResponseWriterStub {}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ResponseWriter for ResponseWriterStub
|
||||
{
|
||||
async fn write_table_header(
|
||||
&mut self,
|
||||
table_schema: &TableSchema,
|
||||
columns: &ColumnSelection,
|
||||
) -> anyhow::Result<()> {
|
||||
let column_names = table_schema.get_columns();
|
||||
for &column in columns {
|
||||
let column_name = column_names[column];
|
||||
print!("{}, ", column_name)
|
||||
}
|
||||
println!();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()> {
|
||||
for (_, value) in row.iter() {
|
||||
print!("{:?}, ", value)
|
||||
}
|
||||
println!();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()> {
|
||||
use CompleteStatus::*;
|
||||
match status {
|
||||
Insert { oid, rows } => println!("oid = {}, rows = {}", oid, rows),
|
||||
Delete(count) => println!("Deleted {}", count),
|
||||
Select(count) => println!("Selected {}", count),
|
||||
CreateTable => println!("Table created"),
|
||||
CreateIndex => println!("Index created"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,9 @@
|
|||
use crate::schema::Column;
|
||||
use crate::schema::{Column, TableSchema};
|
||||
use crate::type_system::Value;
|
||||
use crate::operation::ColumnSelection;
|
||||
use std::ops::Index;
|
||||
use std::slice::SliceIndex;
|
||||
use storage_engine::segments::entry::EntryDetailed;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RestrictedRow(Vec<(Column, Value)>);
|
||||
|
|
|
|||
|
|
@ -50,15 +50,25 @@ impl TableSchema {
|
|||
&self.table_name
|
||||
}
|
||||
|
||||
pub fn primary_column(&self) -> Column {
|
||||
self.primary_key
|
||||
}
|
||||
|
||||
pub fn column_type(&self, column: Column) -> DbType {
|
||||
self.types[column].clone()
|
||||
}
|
||||
|
||||
pub fn get_columns(&self) -> Vec<&ColumnName> {
|
||||
self.column_name_position_mapping
|
||||
let mut columns_in_random_order: Vec<_> = self.column_name_position_mapping
|
||||
.iter()
|
||||
.map(|(name, _)| name)
|
||||
.collect()
|
||||
.collect();
|
||||
columns_in_random_order.sort_by(|&(_, column0), &(_, column1)| column0.cmp(column1));
|
||||
|
||||
let columns: Vec<_> = columns_in_random_order
|
||||
.iter()
|
||||
.map(|(name, _)| *name)
|
||||
.collect();
|
||||
columns
|
||||
}
|
||||
|
||||
pub fn does_column_exist(&self, column_name: &ColumnName) -> bool {
|
||||
|
|
|
|||
|
|
@ -2,9 +2,12 @@ use crate::error::TypeConversionError;
|
|||
use proto::message::primitive::pgoid::PgOid;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::cmp::Ordering;
|
||||
// TODO: Private???
|
||||
// use bincode::{Encode, Encoder, EncodeError, Decode, Decoder, DecodeError};
|
||||
use bincode::{Encode, Decode};
|
||||
|
||||
// ==============Types================
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
|
||||
pub enum DbType {
|
||||
String,
|
||||
Int,
|
||||
|
|
@ -15,7 +18,7 @@ pub enum DbType {
|
|||
|
||||
// ==============Values================
|
||||
pub type Uuid = u64;
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Encode, Decode)]
|
||||
#[serde(try_from = "String", into = "String")]
|
||||
pub enum Value {
|
||||
Number(f64),
|
||||
|
|
@ -81,6 +84,54 @@ impl Ord for IndexableValue {
|
|||
}
|
||||
}
|
||||
|
||||
impl Eq for Value {}
|
||||
|
||||
impl PartialOrd for Value {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Make column know about indexable types
|
||||
impl Ord for Value {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
match (self, other) {
|
||||
(Value::String(s0), Value::String(s1)) => s0.cmp(s1),
|
||||
(Value::Int(n0), Value::Int(n1)) => n0.cmp(n1),
|
||||
(Value::Uuid(id0), Value::Uuid(id1)) => id0.cmp(id1),
|
||||
(Value::None(_), Value::None(_)) => Ordering::Equal,
|
||||
(Value::None(_), Value::Some(_)) => Ordering::Less,
|
||||
(Value::Some(_), Value::None(_)) => Ordering::Greater,
|
||||
(Value::Some(v0), Value::Some(v1)) => v0.cmp(v1),
|
||||
_ =>
|
||||
// SAFETY:
|
||||
// We are using indexable values as keys in key-value maps.
|
||||
// When validation is done, it can't happen that we will be comparing two values
|
||||
// of different types.
|
||||
// Ofcourse another option is to artificialy order e.g.
|
||||
// None < Some(...) < String < Int < Uuid
|
||||
// where ... is again None < Some(...) < String < Int < Uuid
|
||||
// where ...
|
||||
// infinitely deep total order. But this is pointless for our usecase.
|
||||
{
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// impl Encode for Value {
|
||||
// fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
|
||||
// todo!()
|
||||
// }
|
||||
// }
|
||||
|
||||
// impl Decode for Value {
|
||||
// fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, DecodeError> {
|
||||
// todo!()
|
||||
// }
|
||||
// }
|
||||
|
||||
impl DbType {
|
||||
fn new_n_option(n: usize, inside: DbType) -> DbType {
|
||||
if n == 0 {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
use crate::syntax::RawQuerySyntax;
|
||||
use minisql::{interpreter::DbSchema, operation::Operation};
|
||||
use minisql::{interpreter2::DbSchema, operation::Operation};
|
||||
use nom::{branch::alt, character::complete::{multispace0, char}, multi::many1, sequence::{delimited, terminated}, IResult};
|
||||
use thiserror::Error;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
use std::collections::{BTreeMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::parsing::literal::Literal;
|
||||
|
|
@ -6,7 +7,7 @@ use crate::syntax;
|
|||
use crate::syntax::{ColumnSchema, RawQuerySyntax, RawTableSchema};
|
||||
use minisql::operation;
|
||||
use minisql::{
|
||||
interpreter::DbSchema,
|
||||
interpreter2::DbSchema,
|
||||
operation::Operation,
|
||||
schema::{Column, ColumnName, TableName, TablePosition, TableSchema},
|
||||
type_system::DbType,
|
||||
|
|
@ -63,15 +64,15 @@ pub fn validate_operation(
|
|||
}
|
||||
}
|
||||
|
||||
fn validate_table_exists<'a>(
|
||||
db_schema: &DbSchema<'a>,
|
||||
table_name: &'a TableName,
|
||||
) -> Result<(TablePosition, &'a TableSchema), ValidationError> {
|
||||
fn validate_table_exists(
|
||||
db_schema: &DbSchema,
|
||||
table_name: &TableName,
|
||||
) -> Result<(TablePosition, Arc<TableSchema>), ValidationError> {
|
||||
db_schema
|
||||
.iter()
|
||||
.find(|(tname, _, _)| table_name.eq(tname))
|
||||
.ok_or(ValidationError::TableDoesNotExist(table_name.to_string()))
|
||||
.map(|(_, table_position, table_schema)| (*table_position, *table_schema))
|
||||
.map(|(_, table_position, table_schema)| (*table_position, table_schema.clone()))
|
||||
}
|
||||
|
||||
fn validate_create_table(
|
||||
|
|
@ -167,7 +168,7 @@ fn validate_select(
|
|||
.iter()
|
||||
.filter_map(|column_name| schema.get_column(column_name))
|
||||
.collect();
|
||||
let validated_condition = validate_condition(condition, schema)?;
|
||||
let validated_condition = validate_condition(condition, &schema)?;
|
||||
Ok(Operation::Select(
|
||||
table_position,
|
||||
selection,
|
||||
|
|
@ -178,7 +179,7 @@ fn validate_select(
|
|||
}
|
||||
}
|
||||
syntax::ColumnSelection::All => {
|
||||
let validated_condition = validate_condition(condition, schema)?;
|
||||
let validated_condition = validate_condition(condition, &schema)?;
|
||||
Ok(Operation::Select(
|
||||
table_position,
|
||||
schema.all_selection(),
|
||||
|
|
@ -267,13 +268,13 @@ fn validate_delete(
|
|||
db_schema: &DbSchema,
|
||||
) -> Result<Operation, ValidationError> {
|
||||
let (table_position, schema) = validate_table_exists(db_schema, &table_name)?;
|
||||
let validated_condition = validate_condition(condition, schema)?;
|
||||
let validated_condition = validate_condition(condition, &schema)?;
|
||||
Ok(Operation::Delete(table_position, validated_condition))
|
||||
}
|
||||
|
||||
fn validate_condition(
|
||||
condition: Option<syntax::Condition>,
|
||||
schema: &TableSchema,
|
||||
schema: &Arc<TableSchema>,
|
||||
) -> Result<Option<operation::Condition>, ValidationError> {
|
||||
match condition {
|
||||
Some(condition) => match condition {
|
||||
|
|
@ -338,14 +339,14 @@ where
|
|||
None
|
||||
}
|
||||
|
||||
fn get_table_schema<'a>(
|
||||
db_schema: &DbSchema<'a>,
|
||||
table_name: &'a TableName,
|
||||
) -> Option<&'a TableSchema> {
|
||||
fn get_table_schema(
|
||||
db_schema: &DbSchema,
|
||||
table_name: &TableName,
|
||||
) -> Option<Arc<TableSchema>> {
|
||||
let (_, _, table_schema) = db_schema
|
||||
.iter()
|
||||
.find(|(tname, _, _)| table_name.eq(tname))?;
|
||||
Some(table_schema)
|
||||
Some(table_schema.clone())
|
||||
}
|
||||
|
||||
fn literal_to_value(lit: Literal, hint: &DbType) -> Value {
|
||||
|
|
@ -428,11 +429,11 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
fn db_schema(users_schema: &TableSchema) -> DbSchema {
|
||||
vec![("users".to_string(), 0, users_schema)]
|
||||
fn db_schema(users_schema: Arc<TableSchema>) -> DbSchema {
|
||||
vec![("users".to_string(), 0, users_schema.clone())]
|
||||
}
|
||||
|
||||
fn empty_db_schema() -> DbSchema<'static> {
|
||||
fn empty_db_schema() -> DbSchema {
|
||||
vec![]
|
||||
}
|
||||
|
||||
|
|
@ -547,7 +548,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_create_already_exists() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let syntax: RawQuerySyntax = CreateTable(raw_users_schema());
|
||||
let result = validate_operation(syntax, &db_schema);
|
||||
|
|
@ -561,7 +562,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_select_basic() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
let users_position = 0;
|
||||
let id = 0;
|
||||
let name = 1;
|
||||
|
|
@ -583,7 +584,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_select_non_existent_table() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let syntax: RawQuerySyntax =
|
||||
Select("does_not_exist".to_string(), ColumnSelection::All, None);
|
||||
|
|
@ -594,7 +595,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_select_eq() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let users_position = 0;
|
||||
let id = 0;
|
||||
|
|
@ -622,7 +623,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_select_eq_columns_selection() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let users_position = 0;
|
||||
let name = 1;
|
||||
|
|
@ -652,7 +653,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_select_eq_columns_selection_nonexistent_column_selected() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let syntax: RawQuerySyntax = Select(
|
||||
"users".to_string(),
|
||||
|
|
@ -666,7 +667,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_select_eq_non_existent_column() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let syntax: RawQuerySyntax = Select(
|
||||
"users".to_string(),
|
||||
|
|
@ -680,7 +681,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_select_eq_type_error() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let syntax: RawQuerySyntax = Select(
|
||||
"users".to_string(),
|
||||
|
|
@ -695,7 +696,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_insert() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let users_position = 0;
|
||||
|
||||
|
|
@ -732,7 +733,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_insert_non_existent_column() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let syntax: RawQuerySyntax = Insert(
|
||||
"users".to_string(),
|
||||
|
|
@ -750,7 +751,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_insert_ill_typed_column() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let syntax: RawQuerySyntax = Insert(
|
||||
"users".to_string(),
|
||||
|
|
@ -768,7 +769,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_delete_all() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let users_position = 0;
|
||||
|
||||
|
|
@ -786,7 +787,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_delete_eq() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let users_position = 0;
|
||||
let age = 2;
|
||||
|
|
@ -816,7 +817,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_create_index() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let users_position = 0;
|
||||
let age = 2;
|
||||
|
|
@ -836,7 +837,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_create_index_nonexistent_column() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let syntax: RawQuerySyntax = CreateIndex("users".to_string(), "does_not_exist".to_string());
|
||||
let result = validate_operation(syntax, &db_schema);
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use minisql::cancellation::Cancellation;
|
||||
|
||||
pub struct ResetCancelToken {
|
||||
is_canceled: Arc<AtomicBool>,
|
||||
|
|
@ -12,10 +13,6 @@ impl ResetCancelToken {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn is_canceled(&self) -> bool {
|
||||
self.is_canceled.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
pub fn cancel(&self) {
|
||||
self.is_canceled.store(true, Ordering::SeqCst);
|
||||
}
|
||||
|
|
@ -25,6 +22,12 @@ impl ResetCancelToken {
|
|||
}
|
||||
}
|
||||
|
||||
impl Cancellation for ResetCancelToken {
|
||||
fn is_canceled(&self) -> bool {
|
||||
self.is_canceled.load(Ordering::SeqCst)
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for ResetCancelToken {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
|
|
|
|||
|
|
@ -17,8 +17,8 @@ pub struct Configuration {
|
|||
help = "Port for the server to listen on"
|
||||
)]
|
||||
port: u16,
|
||||
#[arg(short, long, help = "Path to the data file")]
|
||||
file: PathBuf,
|
||||
#[arg(short, long, help = "Path to the folder for database data")]
|
||||
folder: PathBuf,
|
||||
#[arg(short, long, help = "Delay between rows in milliseconds")]
|
||||
throttle: Option<u64>,
|
||||
}
|
||||
|
|
@ -30,8 +30,8 @@ impl Configuration {
|
|||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get_file_path(&self) -> &PathBuf {
|
||||
&self.file
|
||||
pub fn get_folder_path(&self) -> &PathBuf {
|
||||
&self.folder
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
|
|
|||
|
|
@ -1,14 +1,13 @@
|
|||
use std::collections::HashMap;
|
||||
use std::io::ErrorKind;
|
||||
use std::sync::Arc;
|
||||
|
||||
use clap::Parser;
|
||||
use tokio::io::{BufReader, BufWriter};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use minisql::interpreter::{Response, State};
|
||||
use minisql::response_writer::{CompleteStatus, ResponseWriter};
|
||||
use minisql::interpreter2::StateHandler;
|
||||
use minisql::response_writer::ResponseWriter;
|
||||
use parser::parse_and_validate;
|
||||
use proto::handshake::errors::ServerHandshakeError;
|
||||
use proto::handshake::request::HandshakeRequest;
|
||||
|
|
@ -22,23 +21,21 @@ use proto::writer::protowriter::{ProtoFlush, ProtoWriter};
|
|||
|
||||
use crate::cancellation::ResetCancelToken;
|
||||
use crate::config::Configuration;
|
||||
use crate::persistence::state_to_file;
|
||||
use crate::proto_wrapper::ServerProtoWrapper;
|
||||
|
||||
mod cancellation;
|
||||
mod config;
|
||||
mod persistence;
|
||||
mod proto_wrapper;
|
||||
|
||||
type TokenStore = Arc<Mutex<HashMap<(i32, i32), ResetCancelToken>>>;
|
||||
type SharedDbState = Arc<RwLock<State>>;
|
||||
type SharedDbState = Arc<StateHandler>;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let config = Configuration::parse();
|
||||
let config = Arc::new(config);
|
||||
|
||||
let state = Arc::new(RwLock::new(get_state(&config).await?));
|
||||
let state = Arc::new(get_state(&config).await?);
|
||||
let tokens = Arc::new(Mutex::new(HashMap::<(i32, i32), ResetCancelToken>::new()));
|
||||
|
||||
let addr = config.get_socket_address();
|
||||
|
|
@ -59,15 +56,12 @@ async fn main() -> anyhow::Result<()> {
|
|||
}
|
||||
}
|
||||
|
||||
async fn get_state(config: &Configuration) -> anyhow::Result<State> {
|
||||
let result = persistence::state_from_file(config.get_file_path()).await;
|
||||
match result {
|
||||
Err(ref e) if e.kind() == ErrorKind::NotFound => {
|
||||
println!("WARNING: No DB state file found, creating new one");
|
||||
Ok(State::new())
|
||||
}
|
||||
Err(e) => Err(e)?,
|
||||
Ok(state) => Ok(state),
|
||||
async fn get_state(config: &Configuration) -> anyhow::Result<StateHandler> {
|
||||
let path = config.get_folder_path();
|
||||
if StateHandler::is_existing_db(&path) {
|
||||
Ok(StateHandler::connect(path.clone()).await?)
|
||||
} else {
|
||||
Ok(StateHandler::new(path.clone()).await?)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -191,75 +185,10 @@ where
|
|||
token.reset();
|
||||
|
||||
let operation = {
|
||||
let state = state.read().await;
|
||||
let db_schema = state.db_schema();
|
||||
let db_schema = state.read_state().await.db_schema();
|
||||
parse_and_validate(query, &db_schema)?
|
||||
};
|
||||
|
||||
let need_write = {
|
||||
let mut state = state.write().await;
|
||||
let response = state.interpret(operation)?;
|
||||
|
||||
match response {
|
||||
Response::Deleted(i) => {
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::Delete(i))
|
||||
.await?;
|
||||
true
|
||||
}
|
||||
Response::Inserted => {
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 })
|
||||
.await?;
|
||||
true
|
||||
}
|
||||
Response::Selected(schema, columns, mut rows) => {
|
||||
writer.write_table_header(schema, &columns).await?;
|
||||
match rows.next() {
|
||||
Some(row) => {
|
||||
writer.write_table_row(&row).await?;
|
||||
|
||||
let mut sent_rows = 1;
|
||||
for row in rows {
|
||||
sent_rows += 1;
|
||||
writer.write_table_row(&row).await?;
|
||||
if token.is_canceled() {
|
||||
token.reset();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::Select(sent_rows))
|
||||
.await?;
|
||||
}
|
||||
_ => {
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::Select(0))
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
Response::TableCreated => {
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::CreateTable)
|
||||
.await?;
|
||||
true
|
||||
}
|
||||
Response::IndexCreated => {
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::CreateIndex)
|
||||
.await?;
|
||||
true
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if need_write {
|
||||
let state = state.read().await;
|
||||
state_to_file(&state, config.get_file_path()).await?;
|
||||
}
|
||||
|
||||
state.interpret(writer, token, operation).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,15 +0,0 @@
|
|||
use minisql::interpreter::State;
|
||||
use std::path::PathBuf;
|
||||
use tokio::{fs, io};
|
||||
|
||||
pub async fn state_from_file(path: &PathBuf) -> io::Result<State> {
|
||||
let content = fs::read_to_string(path).await?;
|
||||
let state = serde_json::from_str(&content)?;
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
pub async fn state_to_file(state: &State, path: &PathBuf) -> io::Result<()> {
|
||||
let content = serde_json::to_string(state)?;
|
||||
fs::write(path, content).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -10,10 +10,11 @@ use bincode::{Decode, Encode};
|
|||
use crate::segments::entry::EntryDetailed;
|
||||
use crate::segments::entry_header::EntryHeader;
|
||||
use crate::segments::store_header::StoreHeader;
|
||||
use crate::storage_engine::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME};
|
||||
use crate::store::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME};
|
||||
use crate::index::Index;
|
||||
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite};
|
||||
use crate::cursor_capabilities::header_access::{CursorCanReadHeader, CursorCanWriteHeader};
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries;
|
||||
use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex};
|
||||
|
||||
const GARBAGE_COLLECTION_TRIGGER: usize = 100;
|
||||
|
|
@ -82,24 +83,24 @@ impl <T>CursorCanWrite<T> for AppendOnlyCursor<T> {}
|
|||
|
||||
|
||||
// ===capability to access header===
|
||||
impl <T>CursorCanReadHeader<T> for ReadCursor<'_, T> {
|
||||
impl <T>CursorCanTraverse<T> for ReadCursor<'_, T> {
|
||||
fn header(&self) -> &StoreHeader { &self.header }
|
||||
}
|
||||
|
||||
impl <T>CursorCanReadHeader<T> for WriteCursor<'_, T> {
|
||||
impl <T>CursorCanTraverse<T> for WriteCursor<'_, T> {
|
||||
fn header(&self) -> &StoreHeader { &self.header }
|
||||
}
|
||||
|
||||
impl <T>CursorCanReadHeader<T> for AppendOnlyCursor<T> {
|
||||
impl <T>CursorCanTraverse<T> for AppendOnlyCursor<T> {
|
||||
fn header(&self) -> &StoreHeader { &self.header }
|
||||
}
|
||||
|
||||
impl <T>CursorCanWriteHeader<T> for WriteCursor<'_, T> {
|
||||
impl <T>CursorCanModifyEntries<T> for WriteCursor<'_, T> {
|
||||
fn header_mut(&mut self) -> &mut StoreHeader { self.header }
|
||||
fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position }
|
||||
}
|
||||
|
||||
impl <T>CursorCanWriteHeader<T> for AppendOnlyCursor<T> {
|
||||
impl <T>CursorCanModifyEntries<T> for AppendOnlyCursor<T> {
|
||||
fn header_mut(&mut self) -> &mut StoreHeader { &mut self.header }
|
||||
fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position }
|
||||
}
|
||||
|
|
@ -243,6 +244,21 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
|||
Ok(count)
|
||||
}
|
||||
|
||||
pub async fn delete_all_entries(&mut self, enable_garbage_collector: bool) -> Result<usize>
|
||||
where T: Encode + Decode + Ord + Send + Sync + Clone
|
||||
{
|
||||
let mut count = 0;
|
||||
while let Some(entry) = self.next_alive().await? {
|
||||
count += 1;
|
||||
self.mark_deleted_at(entry.file_position, false).await?
|
||||
}
|
||||
|
||||
if enable_garbage_collector {
|
||||
self.attempt_garbage_collection_if_necessary().await?;
|
||||
}
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result<usize>
|
||||
where T: Encode + Decode + Ord + Send + Sync + Clone
|
||||
{
|
||||
|
|
@ -363,7 +379,7 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
|||
async fn spawn_cursor_to_intermediate_file(&self) -> Result<AppendOnlyCursor<T>>
|
||||
where T: Send
|
||||
{
|
||||
let table_folder = self.header.table_folder.to_string();
|
||||
let table_folder = self.header.table_folder.clone();
|
||||
let path_to_table = Path::new(&table_folder);
|
||||
let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
|
||||
|
||||
|
|
|
|||
67
storage_engine/src/cursor_capabilities/entry_modification.rs
Normal file
67
storage_engine/src/cursor_capabilities/entry_modification.rs
Normal file
|
|
@ -0,0 +1,67 @@
|
|||
use async_trait::async_trait;
|
||||
|
||||
use bincode;
|
||||
use bincode::Encode;
|
||||
use crate::binary_coding::encode;
|
||||
|
||||
use crate::segments::entry::Entry;
|
||||
use crate::segments::store_header::StoreHeader;
|
||||
use crate::store::{FilePosition, Result};
|
||||
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite};
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorCanModifyEntries<T>: CursorCanTraverse<T> + CursorCanWrite<T> {
|
||||
fn header_mut(&mut self) -> &mut StoreHeader;
|
||||
fn set_eof_file_position(&mut self, new_file_position: FilePosition);
|
||||
|
||||
// ===Store Header Manipulation===
|
||||
async fn increment_total_count(&mut self) -> Result<()>
|
||||
where T: Send
|
||||
{
|
||||
self.seek_to_start().await?;
|
||||
self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?;
|
||||
let new_count = self.header_mut().increment_total_count();
|
||||
self.write_bytes(&encode::<usize>(&new_count)?).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn increment_deleted_count(&mut self) -> Result<()>
|
||||
where T: Send
|
||||
{
|
||||
self.seek_to_start().await?;
|
||||
self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?;
|
||||
let new_count = self.header_mut().increment_deleted_count();
|
||||
self.write_bytes(&encode::<usize>(&new_count)?).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn set_header(&mut self, header: &StoreHeader) -> Result<()>
|
||||
where T: Send
|
||||
{
|
||||
self.seek_to_start().await?;
|
||||
let encoded_header: Vec<u8> = header.encode()?;
|
||||
self.write_bytes(&encoded_header).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ===Append Entry===
|
||||
|
||||
// Moves cursor to the end.
|
||||
// Returns file position to the start of the new entry.
|
||||
async fn append_entry_no_indexing(&mut self, entry: &Entry<T>) -> Result<FilePosition>
|
||||
where T: Encode + Send + Sync
|
||||
{
|
||||
self.increment_total_count().await?;
|
||||
|
||||
let encoded_entry: Vec<u8> = entry.encode()?;
|
||||
let file_position = self.seek_to_end().await?;
|
||||
self.write_bytes(&encoded_entry).await?;
|
||||
|
||||
let eof_file_position: FilePosition = self.current_file_position().await?;
|
||||
self.set_eof_file_position(eof_file_position);
|
||||
|
||||
Ok(file_position)
|
||||
}
|
||||
}
|
||||
|
|
@ -7,12 +7,13 @@ use bincode::{Decode, Encode};
|
|||
|
||||
use crate::error::Error;
|
||||
use crate::segments::entry::{Entry, EntryDetailed};
|
||||
use crate::storage_engine::{FilePosition, Column, Result};
|
||||
use crate::store::{FilePosition, Column, Result};
|
||||
use crate::index::Index;
|
||||
use crate::cursor_capabilities::header_access::{CursorCanReadHeader, CursorCanWriteHeader};
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries;
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorCanReadIndex<T>: CursorCanReadHeader<T> {
|
||||
pub trait CursorCanReadIndex<T>: CursorCanTraverse<T> {
|
||||
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>];
|
||||
|
||||
async fn index_lookup(&mut self, column: Column, value: &T) -> Result<Vec<EntryDetailed<T>>>
|
||||
|
|
@ -52,7 +53,7 @@ pub trait CursorCanReadIndex<T>: CursorCanReadHeader<T> {
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorCanWriteToIndex<T>: CursorCanReadIndex<T> + CursorCanWriteHeader<T> {
|
||||
pub trait CursorCanWriteToIndex<T>: CursorCanReadIndex<T> + CursorCanModifyEntries<T> {
|
||||
fn indexes_mut(&mut self) -> &mut [Option<Index<T, FilePosition>>];
|
||||
|
||||
// Assumes that the column is indexable.
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
pub(crate) mod primitive;
|
||||
pub(crate) mod header_access;
|
||||
pub(crate) mod index_access;
|
||||
pub mod traversal;
|
||||
pub mod entry_modification;
|
||||
pub mod index_access;
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom};
|
|||
use tokio::fs::File;
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::storage_engine::{FilePosition, Result};
|
||||
use crate::store::{FilePosition, Result};
|
||||
|
||||
#[async_trait]
|
||||
pub(crate) trait CursorCanRead<T> {
|
||||
|
|
|
|||
|
|
@ -2,18 +2,19 @@ use tokio::io::AsyncReadExt;
|
|||
use async_trait::async_trait;
|
||||
|
||||
use bincode;
|
||||
use bincode::{Decode, Encode};
|
||||
use crate::binary_coding::{encode, decode};
|
||||
use bincode::Decode;
|
||||
use crate::binary_coding::decode;
|
||||
|
||||
use crate::error::{Error, DecodeErrorKind};
|
||||
use crate::segments::entry::{Entry, EntryDetailed};
|
||||
use crate::segments::entry::EntryDetailed;
|
||||
use crate::segments::entry_header::EntryHeaderWithDataSize;
|
||||
use crate::segments::store_header::StoreHeader;
|
||||
use crate::storage_engine::{FilePosition, Column, Result};
|
||||
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite};
|
||||
use crate::store::{FilePosition, Column, Result};
|
||||
use crate::cursor_capabilities::primitive::CursorCanRead;
|
||||
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorCanReadHeader<T>: CursorCanRead<T> {
|
||||
pub trait CursorCanTraverse<T>: CursorCanRead<T> {
|
||||
fn header(&self) -> &StoreHeader;
|
||||
|
||||
async fn seek_to_start_of_data(&mut self) -> Result<FilePosition> {
|
||||
|
|
@ -172,59 +173,3 @@ pub trait CursorCanReadHeader<T>: CursorCanRead<T> {
|
|||
Ok(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorCanWriteHeader<T>: CursorCanReadHeader<T> + CursorCanWrite<T> {
|
||||
fn header_mut(&mut self) -> &mut StoreHeader;
|
||||
fn set_eof_file_position(&mut self, new_file_position: FilePosition);
|
||||
|
||||
// ===Store Header Manipulation===
|
||||
async fn increment_total_count(&mut self) -> Result<()>
|
||||
where T: Send
|
||||
{
|
||||
self.seek_to_start().await?;
|
||||
self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?;
|
||||
let new_count = self.header_mut().increment_total_count();
|
||||
self.write_bytes(&encode::<usize>(&new_count)?).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn increment_deleted_count(&mut self) -> Result<()>
|
||||
where T: Send
|
||||
{
|
||||
self.seek_to_start().await?;
|
||||
self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?;
|
||||
let new_count = self.header_mut().increment_deleted_count();
|
||||
self.write_bytes(&encode::<usize>(&new_count)?).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn set_header(&mut self, header: &StoreHeader) -> Result<()>
|
||||
where T: Send
|
||||
{
|
||||
self.seek_to_start().await?;
|
||||
let encoded_header: Vec<u8> = header.encode()?;
|
||||
self.write_bytes(&encoded_header).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ===Append Entry===
|
||||
|
||||
// Moves cursor to the end.
|
||||
// Returns file position to the start of the new entry.
|
||||
async fn append_entry_no_indexing(&mut self, entry: &Entry<T>) -> Result<FilePosition>
|
||||
where T: Encode + Send + Sync
|
||||
{
|
||||
self.increment_total_count().await?;
|
||||
|
||||
let encoded_entry: Vec<u8> = entry.encode()?;
|
||||
let file_position = self.seek_to_end().await?;
|
||||
self.write_bytes(&encoded_entry).await?;
|
||||
|
||||
let eof_file_position: FilePosition = self.current_file_position().await?;
|
||||
self.set_eof_file_position(eof_file_position);
|
||||
|
||||
Ok(file_position)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,25 +1,41 @@
|
|||
use crate::storage_engine::Column;
|
||||
use crate::store::Column;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("Decoding Error: {0} {1}")]
|
||||
DecodeError(DecodeErrorKind, bincode::error::DecodeError),
|
||||
#[error("Encoding Error: {0}")]
|
||||
EncodeError(bincode::error::EncodeError),
|
||||
#[error("Attempt to index non-indexed column {0}")]
|
||||
AttemptToIndexNonIndexableColumn(Column),
|
||||
#[error("Index Corruption: Index is storing eof file position for column {0}")]
|
||||
IndexIsStoringEofFilePosition(Column),
|
||||
#[error("Column {0} is already indexed")]
|
||||
ColumnAlreadyIndexed(Column),
|
||||
#[error("File-System Error: {0}")]
|
||||
IoError(std::io::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Error)]
|
||||
pub enum DecodeErrorKind {
|
||||
#[error("StoreHeaderNumberOfColumns")]
|
||||
StoreHeaderNumberOfColumns,
|
||||
#[error("StoreHeaderDeletedCount")]
|
||||
StoreHeaderDeletedCount,
|
||||
#[error("StoreHeaderTotalCount")]
|
||||
StoreHeaderTotalCount,
|
||||
#[error("StoreHeaderPrimaryColumn")]
|
||||
StoreHeaderPrimaryColumn,
|
||||
#[error("StoreHeaderIndexedColumns")]
|
||||
StoreHeaderIndexedColumns,
|
||||
#[error("EntryData")]
|
||||
EntryData,
|
||||
#[error("EntryIsDeleted")]
|
||||
EntryIsDeleted,
|
||||
#[error("EntryHeaderWithDataSizes")]
|
||||
EntryHeaderWithDataSizes,
|
||||
#[error("CorruptedData")]
|
||||
CorruptedData,
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
pub mod storage_engine;
|
||||
pub mod store;
|
||||
mod binary_coding;
|
||||
mod error;
|
||||
pub mod error;
|
||||
mod index;
|
||||
mod cursor;
|
||||
mod segments;
|
||||
mod cursor_capabilities;
|
||||
pub mod cursor;
|
||||
pub mod segments;
|
||||
pub mod cursor_capabilities;
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
use bincode::{Decode, Encode};
|
||||
|
||||
use crate::binary_coding::{encode_sequence, encode_sequence_with_sizes, decode_sequence};
|
||||
use crate::storage_engine::{Result, FilePosition};
|
||||
use crate::store::{Result, FilePosition};
|
||||
use crate::error::{Error, DecodeErrorKind};
|
||||
use crate::segments::entry_header::{EntryHeader, EntryHeaderWithDataSize};
|
||||
|
||||
|
|
@ -23,10 +23,6 @@ impl <T>Entry<T> {
|
|||
Self { header: EntryHeader { is_deleted: false }, data }
|
||||
}
|
||||
|
||||
pub fn new_deleted(data: Vec<T>) -> Self {
|
||||
Self { header: EntryHeader { is_deleted: true}, data }
|
||||
}
|
||||
|
||||
// FORMAT: [EntryHeaderWithDataSize, ..sequence of data]
|
||||
pub fn encode(&self) -> Result<Vec<u8>>
|
||||
where T: Encode
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
use crate::binary_coding::{decode, encode, decode_sequence};
|
||||
use crate::storage_engine::{Result, Column};
|
||||
use crate::store::{Result, Column};
|
||||
use crate::error::{Error, DecodeErrorKind};
|
||||
use std::mem::size_of;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,11 +1,12 @@
|
|||
use crate::binary_coding::{encode, encode_sequence, decode, decode_sequence};
|
||||
use crate::storage_engine::{Result, Column};
|
||||
use crate::store::{Result, Column};
|
||||
use crate::error::{Error, DecodeErrorKind};
|
||||
use std::mem::size_of;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct StoreHeader {
|
||||
pub table_folder: String, // This one is not encoded into the file
|
||||
pub table_folder: PathBuf, // This one is not encoded into the file
|
||||
|
||||
pub number_of_columns: usize,
|
||||
pub deleted_count: usize,
|
||||
|
|
@ -16,7 +17,7 @@ pub struct StoreHeader {
|
|||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct StoreHeaderFixedPart {
|
||||
pub table_folder: String, // This one is not encoded into the file
|
||||
pub table_folder: PathBuf, // This one is not encoded into the file
|
||||
|
||||
pub number_of_columns: usize,
|
||||
pub deleted_count: usize,
|
||||
|
|
@ -63,7 +64,7 @@ impl StoreHeader {
|
|||
vec![0; Self::indexed_columns_size(header.number_of_columns)]
|
||||
}
|
||||
|
||||
pub async fn decode_fixed(table_folder: &str, result: &[u8]) -> Result<StoreHeaderFixedPart> {
|
||||
pub async fn decode_fixed(table_folder: &PathBuf, result: &[u8]) -> Result<StoreHeaderFixedPart> {
|
||||
let (number_of_columns, _) =
|
||||
decode::<usize>(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?;
|
||||
|
|
@ -77,7 +78,7 @@ impl StoreHeader {
|
|||
decode::<Column>(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?;
|
||||
let header = StoreHeaderFixedPart {
|
||||
table_folder: table_folder.to_string(),
|
||||
table_folder: table_folder.clone(),
|
||||
number_of_columns,
|
||||
deleted_count,
|
||||
total_count,
|
||||
|
|
@ -93,7 +94,7 @@ impl StoreHeader {
|
|||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderIndexedColumns, e))?;
|
||||
|
||||
Ok(StoreHeader {
|
||||
table_folder: header.table_folder,
|
||||
table_folder: header.table_folder.into(),
|
||||
number_of_columns: header.number_of_columns,
|
||||
deleted_count: header.deleted_count,
|
||||
total_count: header.total_count,
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ use bincode::{Decode, Encode};
|
|||
|
||||
use crate::error::Error;
|
||||
use crate::cursor::{ReadCursor, WriteCursor};
|
||||
use crate::cursor_capabilities::header_access::CursorCanReadHeader;
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use crate::segments::store_header::StoreHeader;
|
||||
use crate::index::Index;
|
||||
|
||||
|
|
@ -36,10 +36,9 @@ pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_i
|
|||
|
||||
impl <T>Store<T> {
|
||||
// ===Creation===
|
||||
pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result<Self>
|
||||
pub async fn new(path_to_table: &Path, number_of_columns: usize, primary_column: Column) -> Result<Self>
|
||||
where T: Encode + Decode + Ord
|
||||
{
|
||||
let path_to_table = Path::new(table_folder);
|
||||
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
|
||||
DirBuilder::new()
|
||||
.create(path_to_table).await?;
|
||||
|
|
@ -48,7 +47,7 @@ impl <T>Store<T> {
|
|||
let mut indexed_columns = vec![false; number_of_columns];
|
||||
indexed_columns[primary_column as usize] = true;
|
||||
StoreHeader {
|
||||
table_folder: table_folder.to_string(),
|
||||
table_folder: path_to_table.to_path_buf(),
|
||||
number_of_columns,
|
||||
deleted_count: 0,
|
||||
total_count: 0,
|
||||
|
|
@ -121,7 +120,7 @@ impl <T>Store<T> {
|
|||
Ok(file)
|
||||
}
|
||||
|
||||
pub async fn connect(table_folder: &str) -> Result<Self>
|
||||
pub async fn connect(table_folder: &PathBuf) -> Result<Self>
|
||||
where T: std::fmt::Debug + Encode + Decode + Ord
|
||||
{
|
||||
let path_to_table = Path::new(table_folder);
|
||||
|
|
@ -208,8 +207,8 @@ impl <T>Store<T> {
|
|||
mod tests {
|
||||
use super::*;
|
||||
use crate::segments::entry::Entry;
|
||||
use crate::cursor_capabilities::header_access::CursorCanReadHeader;
|
||||
use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex};
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
|
||||
impl <T>Drop for Store<T> {
|
||||
fn drop(&mut self) {
|
||||
|
|
@ -225,7 +224,7 @@ mod tests {
|
|||
async fn test_create() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_0";
|
||||
let table_path = Path::new("test_table_0");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
|
@ -240,7 +239,7 @@ mod tests {
|
|||
async fn test_insert() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_1";
|
||||
let table_path = Path::new("test_table_1");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
|
@ -262,7 +261,7 @@ mod tests {
|
|||
async fn test_select_next() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_2";
|
||||
let table_path = Path::new("test_table_2");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
|
@ -294,7 +293,7 @@ mod tests {
|
|||
async fn test_select_all() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_3";
|
||||
let table_path = Path::new("test_table_3");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
|
@ -329,7 +328,7 @@ mod tests {
|
|||
async fn test_select_eq() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_4";
|
||||
let table_path = Path::new("test_table_4");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
|
@ -369,7 +368,7 @@ mod tests {
|
|||
async fn test_select_eq_indexed() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_5";
|
||||
let table_path = Path::new("test_table_5");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
|
@ -415,7 +414,7 @@ mod tests {
|
|||
async fn test_delete_entry() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_6";
|
||||
let table_path = Path::new("test_table_6");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
|
@ -452,7 +451,7 @@ mod tests {
|
|||
async fn test_delete_where_eq() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_7";
|
||||
let table_path = Path::new("test_table_7");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
|
@ -496,7 +495,7 @@ mod tests {
|
|||
async fn test_garbage_collection() {
|
||||
type Data = u32;
|
||||
|
||||
let table_path = "test_table_8";
|
||||
let table_path = Path::new("test_table_8");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
Loading…
Add table
Add a link
Reference in a new issue