diff --git a/Cargo.lock b/Cargo.lock index 5a3a6e1..ed45328 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -520,6 +520,7 @@ dependencies = [ "async-trait", "clap", "minisql", + "parser", "proto", "rand", "tokio", diff --git a/minisql/src/interpreter.rs b/minisql/src/interpreter.rs index 90e4f02..0be4310 100644 --- a/minisql/src/interpreter.rs +++ b/minisql/src/interpreter.rs @@ -23,7 +23,7 @@ pub struct State { // #[derive(Debug)] pub enum Response<'a> { - Selected(&'a TableSchema, Arc + 'a + Send>>), + Selected(&'a TableSchema, Box + 'a + Send>), Inserted, Deleted(usize), // how many were deleted TableCreated, @@ -112,7 +112,7 @@ impl State { let selected_rows = match maybe_condition { None => { let x = table.select_all_rows(selected_column_positions); - Arc::new(Mutex::new(x)) as Arc + 'a + Send>> + Box::new(x) as Box + 'a + Send> }, Some(Condition::Eq(eq_column_name, value)) => { @@ -125,7 +125,7 @@ impl State { eq_column_position, value, )?; - Arc::new(Mutex::new(x)) as Arc + 'a + Send>> + Box::new(x) as Box + 'a + Send> } }; diff --git a/server/Cargo.toml b/server/Cargo.toml index 8d8c52c..6a511f6 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -13,3 +13,4 @@ async-trait = "0.1.74" rand = "0.8.5" minisql = { path = "../minisql" } proto = { path = "../proto" } +parser = { path = "../parser" } \ No newline at end of file diff --git a/server/src/main.rs b/server/src/main.rs index b35c738..3588cd1 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -7,6 +7,8 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{Mutex, RwLock}; use minisql::interpreter::{Response, State}; +use minisql::operation; +use parser::parse_and_validate; use proto::handshake::errors::ServerHandshakeError; use proto::handshake::request::HandshakeRequest; use proto::handshake::response::HandshakeResponse; @@ -19,13 +21,11 @@ use proto::writer::protowriter::{ProtoFlush, ProtoWriter}; use crate::cancellation::ResetCancelToken; use crate::config::Configuration; -use crate::parser_stub::parse_query; use crate::proto_wrapper::{CompleteStatus, ServerProto}; mod config; mod proto_wrapper; mod cancellation; -mod parser_stub; type TokenStore = Arc>>; type DbState = Arc>; @@ -127,53 +127,14 @@ where break; } FrontendMessage::Query(data) => { - println!("Received Query: {:?}", data); - let operation = parse_query(data.query.as_str()); - println!("Parsed query: {:?}", operation); - let mut state = state.write().await; - let result = state.interpret(operation); - println!("Result: {:?}", result); - + let result = handle_query(writer, &mut state, data.query.into(), &token).await; match result { + Ok(_) => {} Err(e) => { - writer.write_error_message(&format!("Error: {:?}", e)).await?; - } - Ok(res) => { - match res { - Response::Deleted(i) => writer.write_command_complete(CompleteStatus::Delete(i)).await?, - Response::Inserted => writer.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }).await?, - Response::Selected(schema, rows) => { - let mut rows = rows.lock().await; - let first_row = rows.next(); - match first_row { - Some(row) => { - writer.write_table_header(&schema, &row).await?; - writer.write_table_row(&row).await?; - - let mut sent_rows = 1; - while let Some(row) = rows.next() { - writer.write_table_row(&row).await?; - sent_rows += 1; - - if token.is_canceled() { - token.reset(); - break; - } - } - - writer.write_command_complete(CompleteStatus::Select(sent_rows)).await?; - } - None => { - writer.write_command_complete(CompleteStatus::Select(0)).await?; - } - } - } - _ => {} - } + writer.write_error_message(&e.to_string()).await? } } - writer.write_ready_for_query().await?; } } @@ -182,3 +143,73 @@ where Ok(()) } + +async fn handle_query(writer: &mut W, state: &mut State, query: String, token: &ResetCancelToken) -> anyhow::Result<()> +where + W: BackendProtoWriter + ProtoFlush + Send, +{ + let metadata = state.metadata(); + let operation = parse_and_validate(query, &metadata)?; + println!("Parsed query: {:?}", operation); + + let Response = state.interpret(operation)?; + println!("Result: {:?}", Response); + + match Response { + Response::Deleted(i) => writer.write_command_complete(CompleteStatus::Delete(i)).await?, + Response::Inserted => writer.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }).await?, + Response::Selected(schema, mut rows) => { + match rows.next() { + Some(row) => { + writer.write_table_header(&schema, &row).await?; + writer.write_table_row(&row).await?; + + let mut sent_rows = 1; + for row in rows { + sent_rows += 1; + writer.write_table_row(&row).await?; + if token.is_canceled() { + token.reset(); + break; + } + } + + writer.write_command_complete(CompleteStatus::Select(sent_rows)).await?; + } + _ => { + writer.write_command_complete(CompleteStatus::Select(0)).await?; + } + } + + + + // let mut rows = rows.lock().await; + // let first_row = rows.next(); + // match first_row { + // Some(row) => { + // writer.write_table_header(&schema, &row).await?; + // writer.write_table_row(&row).await?; + // + // let mut sent_rows = 1; + // while let Some(row) = rows.next() { + // writer.write_table_row(&row).await?; + // sent_rows += 1; + // + // if token.is_canceled() { + // token.reset(); + // break; + // } + // } + // + // writer.write_command_complete(CompleteStatus::Select(sent_rows)).await?; + // } + // None => { + // writer.write_command_complete(CompleteStatus::Select(0)).await?; + // } + // } + } + _ => {} + } + + Ok(()) +} diff --git a/server/src/parser_stub.rs b/server/src/parser_stub.rs deleted file mode 100644 index c8f3e6a..0000000 --- a/server/src/parser_stub.rs +++ /dev/null @@ -1,63 +0,0 @@ -use minisql::operation::{ColumnSelection, Operation}; -use minisql::schema::TableSchema; -use minisql::type_system::{DbType, IndexableValue, Value}; - -const TABLE_NAME: &'static str = "tablus"; - -static mut ID_COUNTER: u64 = 0; - -pub fn parse_query(query: &str) -> Operation { - if query.contains("select") { - if query.contains("*") { - Operation::Select(TABLE_NAME.to_string(), ColumnSelection::All, None) - } else { - Operation::Select(TABLE_NAME.to_string(), ColumnSelection::Columns(vec![ - "name".to_string(), - "price".to_string(), - ]), None) - } - } else if query.contains("insert") { - - let id = unsafe { - ID_COUNTER += 1; - ID_COUNTER - }; - - let rand_rak = rand::random::(); - let rand_price = rand::random::(); - - Operation::Insert(TABLE_NAME.to_string(), vec![ - ("id".to_string(), Value::Indexable(IndexableValue::Uuid(id))), - ("name".to_string(), Value::Indexable(IndexableValue::String(format!("Car {}", rand_rak)))), - ("price".to_string(), Value::Number(rand_price)), - ("mileage".to_string(), Value::Indexable(IndexableValue::Int(1234))), - ]) - } else if query.contains("delete") { - Operation::Delete(TABLE_NAME.to_string(), None) - } else if query.contains("create table") { - Operation::CreateTable(TABLE_NAME.to_string(), get_cars_schema()) - } else if query.contains("create index") { - Operation::CreateIndex(TABLE_NAME.to_string(), "price".to_string()) - } else { - panic!("Unknown query: {}", query); - } -} - -fn get_cars_schema() -> TableSchema { - TableSchema::new( - "cars".to_string(), - 0, - vec![ - ("id".to_string(), 0), - ("name".to_string(), 1), - ("price".to_string(), 2), - ("mileage".to_string(), 3), - ], - vec![ - DbType::Uuid, - DbType::String, - DbType::Number, - DbType::Int, - ] - ) -} \ No newline at end of file