diff --git a/server/src/main.rs b/server/src/main.rs index bcee08d..a71be68 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -7,6 +7,7 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{Mutex, RwLock}; use minisql::interpreter::{Response, State}; +use minisql::operation::Operation; use parser::parse_and_validate; use proto::handshake::errors::ServerHandshakeError; use proto::handshake::request::HandshakeRequest; @@ -27,7 +28,7 @@ mod proto_wrapper; mod cancellation; type TokenStore = Arc>>; -type DbState = Arc>; +type SharedDbState = Arc>; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -53,7 +54,7 @@ async fn main() -> anyhow::Result<()> { } } -async fn handle_stream(mut stream: TcpStream, state: DbState, tokens: TokenStore) -> anyhow::Result<()> { +async fn handle_stream(mut stream: TcpStream, state: SharedDbState, tokens: TokenStore) -> anyhow::Result<()> { let (reader, writer) = stream.split(); let mut writer = ProtoWriter::new(BufWriter::new(writer)); let mut reader = ProtoReader::new(BufReader::new(reader), 1024); @@ -109,7 +110,7 @@ async fn handle_cancellation(pid: i32, key: i32, tokens: &TokenStore) -> anyhow: Ok(()) } -async fn handle_connection(reader: &mut R, writer: &mut W, request: HandshakeRequest, state: DbState, token: ResetCancelToken) -> anyhow::Result<()> +async fn handle_connection(reader: &mut R, writer: &mut W, request: HandshakeRequest, state: SharedDbState, token: ResetCancelToken) -> anyhow::Result<()> where R: FrontendProtoReader + Send, W: BackendProtoWriter + ProtoFlush + Send, @@ -126,8 +127,8 @@ where break; } FrontendMessage::Query(data) => { - let mut state = state.write().await; - let result = handle_query(writer, &mut state, data.query.into(), &token).await; + println!("Received Query: {:?}", data.query); + let result = handle_query(writer, &state, data.query.into(), &token).await; match result { Ok(_) => {} Err(e) => { @@ -143,18 +144,20 @@ where Ok(()) } -async fn handle_query(writer: &mut W, state: &mut State, query: String, token: &ResetCancelToken) -> anyhow::Result<()> +async fn handle_query(writer: &mut W, state: &SharedDbState, query: String, token: &ResetCancelToken) -> anyhow::Result<()> where W: BackendProtoWriter + ProtoFlush + Send, { - let metadata = state.metadata(); - let operation = parse_and_validate(query, &metadata)?; - println!("Parsed query: {:?}", operation); + let operation = { + let state = state.read().await; + let metadata = state.metadata(); + parse_and_validate(query, &metadata)? + }; - let Response = state.interpret(operation)?; - println!("Result: {:?}", Response); + let mut state = state.write().await; + let response = state.interpret(operation)?; - match Response { + match response { Response::Deleted(i) => writer.write_command_complete(CompleteStatus::Delete(i)).await?, Response::Inserted => writer.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }).await?, Response::Selected(schema, mut rows) => { @@ -179,33 +182,6 @@ where writer.write_command_complete(CompleteStatus::Select(0)).await?; } } - - - - // let mut rows = rows.lock().await; - // let first_row = rows.next(); - // match first_row { - // Some(row) => { - // writer.write_table_header(&schema, &row).await?; - // writer.write_table_row(&row).await?; - // - // let mut sent_rows = 1; - // while let Some(row) = rows.next() { - // writer.write_table_row(&row).await?; - // sent_rows += 1; - // - // if token.is_canceled() { - // token.reset(); - // break; - // } - // } - // - // writer.write_command_complete(CompleteStatus::Select(sent_rows)).await?; - // } - // None => { - // writer.write_command_complete(CompleteStatus::Select(0)).await?; - // } - // } } _ => {} }