feat: connect server to the new interpreter
This commit is contained in:
parent
9b9f9f16f6
commit
f5d45f6a1d
6 changed files with 70 additions and 145 deletions
|
|
@ -8,9 +8,9 @@ use crate::internals::row::Row;
|
|||
|
||||
use bimap::BiMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use std::rc::Rc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::fs;
|
||||
|
||||
|
|
@ -25,7 +25,7 @@ const METADATA_FILE: &'static str = "metadata.json";
|
|||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct State {
|
||||
table_name_position_mapping: BiMap<TableName, TablePosition>,
|
||||
table_schemas: Vec<Rc<TableSchema>>,
|
||||
table_schemas: Vec<Arc<TableSchema>>,
|
||||
#[serde(skip)]
|
||||
tables: Tables,
|
||||
}
|
||||
|
|
@ -39,11 +39,11 @@ pub type Tables = Vec<RwLock<Table>>;
|
|||
|
||||
#[derive(Debug)]
|
||||
pub struct Table {
|
||||
schema: Rc<TableSchema>,
|
||||
schema: Arc<TableSchema>,
|
||||
store: Store<Value>
|
||||
}
|
||||
|
||||
pub type DbSchema = Vec<(TableName, TablePosition, Rc<TableSchema>)>;
|
||||
pub type DbSchema = Vec<(TableName, TablePosition, Arc<TableSchema>)>;
|
||||
// To satisfy clippy.
|
||||
impl Default for State {
|
||||
fn default() -> Self {
|
||||
|
|
@ -61,13 +61,13 @@ impl Table {
|
|||
let store: Store<Value> = Store::new(&path_to_table_folder, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
let table = Self {
|
||||
schema: Rc::new(table_schema),
|
||||
schema: Arc::new(table_schema),
|
||||
store,
|
||||
};
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
async fn connect(table_schema: Rc<TableSchema>, db_path: &Path) -> DbResult<Self> {
|
||||
async fn connect(table_schema: Arc<TableSchema>, db_path: &Path) -> DbResult<Self> {
|
||||
let table_folder_name = table_schema.table_name();
|
||||
let path_to_table_folder = db_path.join(table_folder_name);
|
||||
|
||||
|
|
@ -90,7 +90,7 @@ impl Table {
|
|||
Ok(cursor)
|
||||
}
|
||||
|
||||
pub fn schema(&self) -> Rc<TableSchema> {
|
||||
pub fn schema(&self) -> Arc<TableSchema> {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
|
|
@ -111,7 +111,7 @@ impl State {
|
|||
pub fn db_schema(&self) -> DbSchema {
|
||||
let mut schema: DbSchema = Vec::new();
|
||||
for (table_name, &table_position) in &self.table_name_position_mapping {
|
||||
let table_schema: Rc<TableSchema> = self.table_schemas[table_position].clone();
|
||||
let table_schema: Arc<TableSchema> = self.table_schemas[table_position].clone();
|
||||
schema.push((table_name.clone(), table_position, table_schema));
|
||||
}
|
||||
schema
|
||||
|
|
@ -160,6 +160,11 @@ impl State {
|
|||
}
|
||||
|
||||
impl StateHandler {
|
||||
pub fn is_existing_db(db_path: &PathBuf) -> bool {
|
||||
db_path.exists() && db_path.is_dir() &&
|
||||
db_path.join(METADATA_FILE).exists() && db_path.join(METADATA_FILE).is_file()
|
||||
}
|
||||
|
||||
pub async fn new(db_path: PathBuf) -> DbResult<Self> {
|
||||
fs::create_dir(db_path.clone()).await.map_err(|e| RuntimeError::IoError(e))?;
|
||||
|
||||
|
|
@ -195,6 +200,10 @@ impl StateHandler {
|
|||
fs::write(metadata_file, metadata_raw).await.map_err(|e| RuntimeError::IoError(e))
|
||||
}
|
||||
|
||||
pub async fn read_state(&self) -> RwLockReadGuard<State> {
|
||||
self.state.read().await
|
||||
}
|
||||
|
||||
pub async fn interpret<Writer: ResponseWriter>(&self, response_writer: &mut Writer, operation: Operation) -> DbResult<()> {
|
||||
use Operation::*;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
use crate::syntax::RawQuerySyntax;
|
||||
use minisql::{interpreter::DbSchema, operation::Operation};
|
||||
use minisql::{interpreter2::DbSchema, operation::Operation};
|
||||
use nom::{branch::alt, character::complete::{multispace0, char}, multi::many1, sequence::{delimited, terminated}, IResult};
|
||||
use thiserror::Error;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
use std::collections::{BTreeMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::parsing::literal::Literal;
|
||||
|
|
@ -6,7 +7,7 @@ use crate::syntax;
|
|||
use crate::syntax::{ColumnSchema, RawQuerySyntax, RawTableSchema};
|
||||
use minisql::operation;
|
||||
use minisql::{
|
||||
interpreter::DbSchema,
|
||||
interpreter2::DbSchema,
|
||||
operation::Operation,
|
||||
schema::{Column, ColumnName, TableName, TablePosition, TableSchema},
|
||||
type_system::DbType,
|
||||
|
|
@ -63,15 +64,15 @@ pub fn validate_operation(
|
|||
}
|
||||
}
|
||||
|
||||
fn validate_table_exists<'a>(
|
||||
db_schema: &DbSchema<'a>,
|
||||
table_name: &'a TableName,
|
||||
) -> Result<(TablePosition, &'a TableSchema), ValidationError> {
|
||||
fn validate_table_exists(
|
||||
db_schema: &DbSchema,
|
||||
table_name: &TableName,
|
||||
) -> Result<(TablePosition, Arc<TableSchema>), ValidationError> {
|
||||
db_schema
|
||||
.iter()
|
||||
.find(|(tname, _, _)| table_name.eq(tname))
|
||||
.ok_or(ValidationError::TableDoesNotExist(table_name.to_string()))
|
||||
.map(|(_, table_position, table_schema)| (*table_position, *table_schema))
|
||||
.map(|(_, table_position, table_schema)| (*table_position, table_schema.clone()))
|
||||
}
|
||||
|
||||
fn validate_create_table(
|
||||
|
|
@ -167,7 +168,7 @@ fn validate_select(
|
|||
.iter()
|
||||
.filter_map(|column_name| schema.get_column(column_name))
|
||||
.collect();
|
||||
let validated_condition = validate_condition(condition, schema)?;
|
||||
let validated_condition = validate_condition(condition, &schema)?;
|
||||
Ok(Operation::Select(
|
||||
table_position,
|
||||
selection,
|
||||
|
|
@ -178,7 +179,7 @@ fn validate_select(
|
|||
}
|
||||
}
|
||||
syntax::ColumnSelection::All => {
|
||||
let validated_condition = validate_condition(condition, schema)?;
|
||||
let validated_condition = validate_condition(condition, &schema)?;
|
||||
Ok(Operation::Select(
|
||||
table_position,
|
||||
schema.all_selection(),
|
||||
|
|
@ -267,13 +268,13 @@ fn validate_delete(
|
|||
db_schema: &DbSchema,
|
||||
) -> Result<Operation, ValidationError> {
|
||||
let (table_position, schema) = validate_table_exists(db_schema, &table_name)?;
|
||||
let validated_condition = validate_condition(condition, schema)?;
|
||||
let validated_condition = validate_condition(condition, &schema)?;
|
||||
Ok(Operation::Delete(table_position, validated_condition))
|
||||
}
|
||||
|
||||
fn validate_condition(
|
||||
condition: Option<syntax::Condition>,
|
||||
schema: &TableSchema,
|
||||
schema: &Arc<TableSchema>,
|
||||
) -> Result<Option<operation::Condition>, ValidationError> {
|
||||
match condition {
|
||||
Some(condition) => match condition {
|
||||
|
|
@ -338,14 +339,14 @@ where
|
|||
None
|
||||
}
|
||||
|
||||
fn get_table_schema<'a>(
|
||||
db_schema: &DbSchema<'a>,
|
||||
table_name: &'a TableName,
|
||||
) -> Option<&'a TableSchema> {
|
||||
fn get_table_schema(
|
||||
db_schema: &DbSchema,
|
||||
table_name: &TableName,
|
||||
) -> Option<Arc<TableSchema>> {
|
||||
let (_, _, table_schema) = db_schema
|
||||
.iter()
|
||||
.find(|(tname, _, _)| table_name.eq(tname))?;
|
||||
Some(table_schema)
|
||||
Some(table_schema.clone())
|
||||
}
|
||||
|
||||
fn literal_to_value(lit: Literal, hint: &DbType) -> Value {
|
||||
|
|
@ -428,11 +429,11 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
fn db_schema(users_schema: &TableSchema) -> DbSchema {
|
||||
vec![("users".to_string(), 0, users_schema)]
|
||||
fn db_schema(users_schema: Arc<TableSchema>) -> DbSchema {
|
||||
vec![("users".to_string(), 0, users_schema.clone())]
|
||||
}
|
||||
|
||||
fn empty_db_schema() -> DbSchema<'static> {
|
||||
fn empty_db_schema() -> DbSchema {
|
||||
vec![]
|
||||
}
|
||||
|
||||
|
|
@ -547,7 +548,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_create_already_exists() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let syntax: RawQuerySyntax = CreateTable(raw_users_schema());
|
||||
let result = validate_operation(syntax, &db_schema);
|
||||
|
|
@ -561,7 +562,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_select_basic() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
let users_position = 0;
|
||||
let id = 0;
|
||||
let name = 1;
|
||||
|
|
@ -583,7 +584,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_select_non_existent_table() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let syntax: RawQuerySyntax =
|
||||
Select("does_not_exist".to_string(), ColumnSelection::All, None);
|
||||
|
|
@ -594,7 +595,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_select_eq() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let users_position = 0;
|
||||
let id = 0;
|
||||
|
|
@ -622,7 +623,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_select_eq_columns_selection() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let users_position = 0;
|
||||
let name = 1;
|
||||
|
|
@ -652,7 +653,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_select_eq_columns_selection_nonexistent_column_selected() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let syntax: RawQuerySyntax = Select(
|
||||
"users".to_string(),
|
||||
|
|
@ -666,7 +667,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_select_eq_non_existent_column() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let syntax: RawQuerySyntax = Select(
|
||||
"users".to_string(),
|
||||
|
|
@ -680,7 +681,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_select_eq_type_error() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let syntax: RawQuerySyntax = Select(
|
||||
"users".to_string(),
|
||||
|
|
@ -695,7 +696,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_insert() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let users_position = 0;
|
||||
|
||||
|
|
@ -732,7 +733,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_insert_non_existent_column() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let syntax: RawQuerySyntax = Insert(
|
||||
"users".to_string(),
|
||||
|
|
@ -750,7 +751,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_insert_ill_typed_column() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let syntax: RawQuerySyntax = Insert(
|
||||
"users".to_string(),
|
||||
|
|
@ -768,7 +769,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_delete_all() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let users_position = 0;
|
||||
|
||||
|
|
@ -786,7 +787,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_delete_eq() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let users_position = 0;
|
||||
let age = 2;
|
||||
|
|
@ -816,7 +817,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_create_index() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let users_position = 0;
|
||||
let age = 2;
|
||||
|
|
@ -836,7 +837,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_create_index_nonexistent_column() {
|
||||
let users_schema: TableSchema = users_schema();
|
||||
let db_schema: DbSchema = db_schema(&users_schema);
|
||||
let db_schema: DbSchema = db_schema(Arc::new(users_schema));
|
||||
|
||||
let syntax: RawQuerySyntax = CreateIndex("users".to_string(), "does_not_exist".to_string());
|
||||
let result = validate_operation(syntax, &db_schema);
|
||||
|
|
|
|||
|
|
@ -17,8 +17,8 @@ pub struct Configuration {
|
|||
help = "Port for the server to listen on"
|
||||
)]
|
||||
port: u16,
|
||||
#[arg(short, long, help = "Path to the data file")]
|
||||
file: PathBuf,
|
||||
#[arg(short, long, help = "Path to the folder for database data")]
|
||||
folder: PathBuf,
|
||||
#[arg(short, long, help = "Delay between rows in milliseconds")]
|
||||
throttle: Option<u64>,
|
||||
}
|
||||
|
|
@ -30,8 +30,8 @@ impl Configuration {
|
|||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get_file_path(&self) -> &PathBuf {
|
||||
&self.file
|
||||
pub fn get_folder_path(&self) -> &PathBuf {
|
||||
&self.folder
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
|
|
|||
|
|
@ -1,14 +1,13 @@
|
|||
use std::collections::HashMap;
|
||||
use std::io::ErrorKind;
|
||||
use std::sync::Arc;
|
||||
|
||||
use clap::Parser;
|
||||
use tokio::io::{BufReader, BufWriter};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use minisql::interpreter::{Response, State};
|
||||
use minisql::response_writer::{CompleteStatus, ResponseWriter};
|
||||
use minisql::interpreter2::StateHandler;
|
||||
use minisql::response_writer::ResponseWriter;
|
||||
use parser::parse_and_validate;
|
||||
use proto::handshake::errors::ServerHandshakeError;
|
||||
use proto::handshake::request::HandshakeRequest;
|
||||
|
|
@ -22,23 +21,21 @@ use proto::writer::protowriter::{ProtoFlush, ProtoWriter};
|
|||
|
||||
use crate::cancellation::ResetCancelToken;
|
||||
use crate::config::Configuration;
|
||||
use crate::persistence::state_to_file;
|
||||
use crate::proto_wrapper::ServerProtoWrapper;
|
||||
|
||||
mod cancellation;
|
||||
mod config;
|
||||
mod persistence;
|
||||
mod proto_wrapper;
|
||||
|
||||
type TokenStore = Arc<Mutex<HashMap<(i32, i32), ResetCancelToken>>>;
|
||||
type SharedDbState = Arc<RwLock<State>>;
|
||||
type SharedDbState = Arc<StateHandler>;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let config = Configuration::parse();
|
||||
let config = Arc::new(config);
|
||||
|
||||
let state = Arc::new(RwLock::new(get_state(&config).await?));
|
||||
let state = Arc::new(get_state(&config).await?);
|
||||
let tokens = Arc::new(Mutex::new(HashMap::<(i32, i32), ResetCancelToken>::new()));
|
||||
|
||||
let addr = config.get_socket_address();
|
||||
|
|
@ -59,15 +56,12 @@ async fn main() -> anyhow::Result<()> {
|
|||
}
|
||||
}
|
||||
|
||||
async fn get_state(config: &Configuration) -> anyhow::Result<State> {
|
||||
let result = persistence::state_from_file(config.get_file_path()).await;
|
||||
match result {
|
||||
Err(ref e) if e.kind() == ErrorKind::NotFound => {
|
||||
println!("WARNING: No DB state file found, creating new one");
|
||||
Ok(State::new())
|
||||
}
|
||||
Err(e) => Err(e)?,
|
||||
Ok(state) => Ok(state),
|
||||
async fn get_state(config: &Configuration) -> anyhow::Result<StateHandler> {
|
||||
let path = config.get_folder_path();
|
||||
if StateHandler::is_existing_db(&path) {
|
||||
Ok(StateHandler::connect(path.clone()).await?)
|
||||
} else {
|
||||
Ok(StateHandler::new(path.clone()).await?)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -191,75 +185,11 @@ where
|
|||
token.reset();
|
||||
|
||||
let operation = {
|
||||
let state = state.read().await;
|
||||
let db_schema = state.db_schema();
|
||||
let db_schema = state.read_state().await.db_schema();
|
||||
parse_and_validate(query, &db_schema)?
|
||||
};
|
||||
|
||||
let need_write = {
|
||||
let mut state = state.write().await;
|
||||
let response = state.interpret(operation)?;
|
||||
|
||||
match response {
|
||||
Response::Deleted(i) => {
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::Delete(i))
|
||||
.await?;
|
||||
true
|
||||
}
|
||||
Response::Inserted => {
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 })
|
||||
.await?;
|
||||
true
|
||||
}
|
||||
Response::Selected(schema, columns, mut rows) => {
|
||||
writer.write_table_header(schema, &columns).await?;
|
||||
match rows.next() {
|
||||
Some(row) => {
|
||||
writer.write_table_row(&row).await?;
|
||||
|
||||
let mut sent_rows = 1;
|
||||
for row in rows {
|
||||
sent_rows += 1;
|
||||
writer.write_table_row(&row).await?;
|
||||
if token.is_canceled() {
|
||||
token.reset();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::Select(sent_rows))
|
||||
.await?;
|
||||
}
|
||||
_ => {
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::Select(0))
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
Response::TableCreated => {
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::CreateTable)
|
||||
.await?;
|
||||
true
|
||||
}
|
||||
Response::IndexCreated => {
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::CreateIndex)
|
||||
.await?;
|
||||
true
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if need_write {
|
||||
let state = state.read().await;
|
||||
state_to_file(&state, config.get_file_path()).await?;
|
||||
}
|
||||
|
||||
// TODO: PASS DOWN RESET CANCEL TOKEN
|
||||
state.interpret(writer, operation).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,15 +0,0 @@
|
|||
use minisql::interpreter::State;
|
||||
use std::path::PathBuf;
|
||||
use tokio::{fs, io};
|
||||
|
||||
pub async fn state_from_file(path: &PathBuf) -> io::Result<State> {
|
||||
let content = fs::read_to_string(path).await?;
|
||||
let state = serde_json::from_str(&content)?;
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
pub async fn state_to_file(state: &State, path: &PathBuf) -> io::Result<()> {
|
||||
let content = serde_json::to_string(state)?;
|
||||
fs::write(path, content).await?;
|
||||
Ok(())
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue