feat: server improvements
This commit is contained in:
parent
595b47dc06
commit
04311ebe48
5 changed files with 80 additions and 110 deletions
|
|
@ -7,6 +7,8 @@ use tokio::net::{TcpListener, TcpStream};
|
|||
use tokio::sync::{Mutex, RwLock};
|
||||
|
||||
use minisql::interpreter::{Response, State};
|
||||
use minisql::operation;
|
||||
use parser::parse_and_validate;
|
||||
use proto::handshake::errors::ServerHandshakeError;
|
||||
use proto::handshake::request::HandshakeRequest;
|
||||
use proto::handshake::response::HandshakeResponse;
|
||||
|
|
@ -19,13 +21,11 @@ use proto::writer::protowriter::{ProtoFlush, ProtoWriter};
|
|||
|
||||
use crate::cancellation::ResetCancelToken;
|
||||
use crate::config::Configuration;
|
||||
use crate::parser_stub::parse_query;
|
||||
use crate::proto_wrapper::{CompleteStatus, ServerProto};
|
||||
|
||||
mod config;
|
||||
mod proto_wrapper;
|
||||
mod cancellation;
|
||||
mod parser_stub;
|
||||
|
||||
type TokenStore = Arc<Mutex<HashMap<(i32, i32), ResetCancelToken>>>;
|
||||
type DbState = Arc<RwLock<State>>;
|
||||
|
|
@ -127,53 +127,14 @@ where
|
|||
break;
|
||||
}
|
||||
FrontendMessage::Query(data) => {
|
||||
println!("Received Query: {:?}", data);
|
||||
let operation = parse_query(data.query.as_str());
|
||||
println!("Parsed query: {:?}", operation);
|
||||
|
||||
let mut state = state.write().await;
|
||||
let result = state.interpret(operation);
|
||||
println!("Result: {:?}", result);
|
||||
|
||||
let result = handle_query(writer, &mut state, data.query.into(), &token).await;
|
||||
match result {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
writer.write_error_message(&format!("Error: {:?}", e)).await?;
|
||||
}
|
||||
Ok(res) => {
|
||||
match res {
|
||||
Response::Deleted(i) => writer.write_command_complete(CompleteStatus::Delete(i)).await?,
|
||||
Response::Inserted => writer.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }).await?,
|
||||
Response::Selected(schema, rows) => {
|
||||
let mut rows = rows.lock().await;
|
||||
let first_row = rows.next();
|
||||
match first_row {
|
||||
Some(row) => {
|
||||
writer.write_table_header(&schema, &row).await?;
|
||||
writer.write_table_row(&row).await?;
|
||||
|
||||
let mut sent_rows = 1;
|
||||
while let Some(row) = rows.next() {
|
||||
writer.write_table_row(&row).await?;
|
||||
sent_rows += 1;
|
||||
|
||||
if token.is_canceled() {
|
||||
token.reset();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
writer.write_command_complete(CompleteStatus::Select(sent_rows)).await?;
|
||||
}
|
||||
None => {
|
||||
writer.write_command_complete(CompleteStatus::Select(0)).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
writer.write_error_message(&e.to_string()).await?
|
||||
}
|
||||
}
|
||||
|
||||
writer.write_ready_for_query().await?;
|
||||
}
|
||||
}
|
||||
|
|
@ -182,3 +143,73 @@ where
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_query<W>(writer: &mut W, state: &mut State, query: String, token: &ResetCancelToken) -> anyhow::Result<()>
|
||||
where
|
||||
W: BackendProtoWriter + ProtoFlush + Send,
|
||||
{
|
||||
let metadata = state.metadata();
|
||||
let operation = parse_and_validate(query, &metadata)?;
|
||||
println!("Parsed query: {:?}", operation);
|
||||
|
||||
let Response = state.interpret(operation)?;
|
||||
println!("Result: {:?}", Response);
|
||||
|
||||
match Response {
|
||||
Response::Deleted(i) => writer.write_command_complete(CompleteStatus::Delete(i)).await?,
|
||||
Response::Inserted => writer.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }).await?,
|
||||
Response::Selected(schema, mut rows) => {
|
||||
match rows.next() {
|
||||
Some(row) => {
|
||||
writer.write_table_header(&schema, &row).await?;
|
||||
writer.write_table_row(&row).await?;
|
||||
|
||||
let mut sent_rows = 1;
|
||||
for row in rows {
|
||||
sent_rows += 1;
|
||||
writer.write_table_row(&row).await?;
|
||||
if token.is_canceled() {
|
||||
token.reset();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
writer.write_command_complete(CompleteStatus::Select(sent_rows)).await?;
|
||||
}
|
||||
_ => {
|
||||
writer.write_command_complete(CompleteStatus::Select(0)).await?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
// let mut rows = rows.lock().await;
|
||||
// let first_row = rows.next();
|
||||
// match first_row {
|
||||
// Some(row) => {
|
||||
// writer.write_table_header(&schema, &row).await?;
|
||||
// writer.write_table_row(&row).await?;
|
||||
//
|
||||
// let mut sent_rows = 1;
|
||||
// while let Some(row) = rows.next() {
|
||||
// writer.write_table_row(&row).await?;
|
||||
// sent_rows += 1;
|
||||
//
|
||||
// if token.is_canceled() {
|
||||
// token.reset();
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// writer.write_command_complete(CompleteStatus::Select(sent_rows)).await?;
|
||||
// }
|
||||
// None => {
|
||||
// writer.write_command_complete(CompleteStatus::Select(0)).await?;
|
||||
// }
|
||||
// }
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue