feat: server cleanup
This commit is contained in:
parent
e87b11f19f
commit
76a5be0b79
1 changed files with 15 additions and 39 deletions
|
|
@ -7,6 +7,7 @@ use tokio::net::{TcpListener, TcpStream};
|
||||||
use tokio::sync::{Mutex, RwLock};
|
use tokio::sync::{Mutex, RwLock};
|
||||||
|
|
||||||
use minisql::interpreter::{Response, State};
|
use minisql::interpreter::{Response, State};
|
||||||
|
use minisql::operation::Operation;
|
||||||
use parser::parse_and_validate;
|
use parser::parse_and_validate;
|
||||||
use proto::handshake::errors::ServerHandshakeError;
|
use proto::handshake::errors::ServerHandshakeError;
|
||||||
use proto::handshake::request::HandshakeRequest;
|
use proto::handshake::request::HandshakeRequest;
|
||||||
|
|
@ -27,7 +28,7 @@ mod proto_wrapper;
|
||||||
mod cancellation;
|
mod cancellation;
|
||||||
|
|
||||||
type TokenStore = Arc<Mutex<HashMap<(i32, i32), ResetCancelToken>>>;
|
type TokenStore = Arc<Mutex<HashMap<(i32, i32), ResetCancelToken>>>;
|
||||||
type DbState = Arc<RwLock<State>>;
|
type SharedDbState = Arc<RwLock<State>>;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
|
@ -53,7 +54,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_stream(mut stream: TcpStream, state: DbState, tokens: TokenStore) -> anyhow::Result<()> {
|
async fn handle_stream(mut stream: TcpStream, state: SharedDbState, tokens: TokenStore) -> anyhow::Result<()> {
|
||||||
let (reader, writer) = stream.split();
|
let (reader, writer) = stream.split();
|
||||||
let mut writer = ProtoWriter::new(BufWriter::new(writer));
|
let mut writer = ProtoWriter::new(BufWriter::new(writer));
|
||||||
let mut reader = ProtoReader::new(BufReader::new(reader), 1024);
|
let mut reader = ProtoReader::new(BufReader::new(reader), 1024);
|
||||||
|
|
@ -109,7 +110,7 @@ async fn handle_cancellation(pid: i32, key: i32, tokens: &TokenStore) -> anyhow:
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_connection<R, W>(reader: &mut R, writer: &mut W, request: HandshakeRequest, state: DbState, token: ResetCancelToken) -> anyhow::Result<()>
|
async fn handle_connection<R, W>(reader: &mut R, writer: &mut W, request: HandshakeRequest, state: SharedDbState, token: ResetCancelToken) -> anyhow::Result<()>
|
||||||
where
|
where
|
||||||
R: FrontendProtoReader + Send,
|
R: FrontendProtoReader + Send,
|
||||||
W: BackendProtoWriter + ProtoFlush + Send,
|
W: BackendProtoWriter + ProtoFlush + Send,
|
||||||
|
|
@ -126,8 +127,8 @@ where
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
FrontendMessage::Query(data) => {
|
FrontendMessage::Query(data) => {
|
||||||
let mut state = state.write().await;
|
println!("Received Query: {:?}", data.query);
|
||||||
let result = handle_query(writer, &mut state, data.query.into(), &token).await;
|
let result = handle_query(writer, &state, data.query.into(), &token).await;
|
||||||
match result {
|
match result {
|
||||||
Ok(_) => {}
|
Ok(_) => {}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
@ -143,18 +144,20 @@ where
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_query<W>(writer: &mut W, state: &mut State, query: String, token: &ResetCancelToken) -> anyhow::Result<()>
|
async fn handle_query<W>(writer: &mut W, state: &SharedDbState, query: String, token: &ResetCancelToken) -> anyhow::Result<()>
|
||||||
where
|
where
|
||||||
W: BackendProtoWriter + ProtoFlush + Send,
|
W: BackendProtoWriter + ProtoFlush + Send,
|
||||||
{
|
{
|
||||||
|
let operation = {
|
||||||
|
let state = state.read().await;
|
||||||
let metadata = state.metadata();
|
let metadata = state.metadata();
|
||||||
let operation = parse_and_validate(query, &metadata)?;
|
parse_and_validate(query, &metadata)?
|
||||||
println!("Parsed query: {:?}", operation);
|
};
|
||||||
|
|
||||||
let Response = state.interpret(operation)?;
|
let mut state = state.write().await;
|
||||||
println!("Result: {:?}", Response);
|
let response = state.interpret(operation)?;
|
||||||
|
|
||||||
match Response {
|
match response {
|
||||||
Response::Deleted(i) => writer.write_command_complete(CompleteStatus::Delete(i)).await?,
|
Response::Deleted(i) => writer.write_command_complete(CompleteStatus::Delete(i)).await?,
|
||||||
Response::Inserted => writer.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }).await?,
|
Response::Inserted => writer.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }).await?,
|
||||||
Response::Selected(schema, mut rows) => {
|
Response::Selected(schema, mut rows) => {
|
||||||
|
|
@ -179,33 +182,6 @@ where
|
||||||
writer.write_command_complete(CompleteStatus::Select(0)).await?;
|
writer.write_command_complete(CompleteStatus::Select(0)).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// let mut rows = rows.lock().await;
|
|
||||||
// let first_row = rows.next();
|
|
||||||
// match first_row {
|
|
||||||
// Some(row) => {
|
|
||||||
// writer.write_table_header(&schema, &row).await?;
|
|
||||||
// writer.write_table_row(&row).await?;
|
|
||||||
//
|
|
||||||
// let mut sent_rows = 1;
|
|
||||||
// while let Some(row) = rows.next() {
|
|
||||||
// writer.write_table_row(&row).await?;
|
|
||||||
// sent_rows += 1;
|
|
||||||
//
|
|
||||||
// if token.is_canceled() {
|
|
||||||
// token.reset();
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// writer.write_command_complete(CompleteStatus::Select(sent_rows)).await?;
|
|
||||||
// }
|
|
||||||
// None => {
|
|
||||||
// writer.write_command_complete(CompleteStatus::Select(0)).await?;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue