197 lines
5.7 KiB
Rust
197 lines
5.7 KiB
Rust
use minisql::interpreter::State;
|
|
use parser::{parse_and_validate, Error};
|
|
use proto::handshake::response::HandshakeResponse;
|
|
use proto::handshake::server::do_server_handshake;
|
|
use proto::message::backend::{
|
|
BackendMessage, ColumnDescription, CommandCompleteData, DataRowData, ErrorResponseData,
|
|
ReadyForQueryData, RowDescriptionData,
|
|
};
|
|
use proto::message::frontend::FrontendMessage;
|
|
use proto::reader::oneway::OneWayProtoReader;
|
|
use proto::reader::protoreader::ProtoReader;
|
|
use proto::writer::backend::BackendProtoWriter;
|
|
use proto::writer::protowriter::{ProtoFlush, ProtoWriter};
|
|
use tokio::io::{BufReader, BufWriter};
|
|
use tokio::net::{TcpListener, TcpStream};
|
|
|
|
#[tokio::main]
|
|
async fn main() -> anyhow::Result<()> {
|
|
let addr = "0.0.0.0:5432";
|
|
let listener = TcpListener::bind(&addr).await?;
|
|
println!("Server started at {addr}");
|
|
|
|
loop {
|
|
let (socket, _) = listener.accept().await?;
|
|
println!("New client connected: {}", socket.peer_addr()?);
|
|
tokio::spawn(async move {
|
|
let reason = handle_stream(socket).await;
|
|
println!("Client disconnected: {reason:?}");
|
|
});
|
|
}
|
|
}
|
|
|
|
async fn handle_stream(mut stream: TcpStream) -> anyhow::Result<()> {
|
|
let (reader, writer) = stream.split();
|
|
let mut writer = ProtoWriter::new(BufWriter::new(writer));
|
|
let mut reader = ProtoReader::new(BufReader::new(reader), 1024);
|
|
|
|
let response = HandshakeResponse::new("minisql", 123, 123);
|
|
|
|
let request = do_server_handshake(&mut writer, &mut reader, response).await?;
|
|
|
|
println!("Handshake complete:\n{request:?}");
|
|
let mut state = State::new();
|
|
|
|
loop {
|
|
println!("Waiting for next message");
|
|
let next: FrontendMessage = reader.read_proto().await?;
|
|
|
|
match next {
|
|
FrontendMessage::Terminate => {
|
|
println!("Received Terminate");
|
|
break;
|
|
}
|
|
FrontendMessage::Query(data) => {
|
|
println!("Received Query: {:?}", data);
|
|
let db_schema = state.db_schema();
|
|
match parse_and_validate(data.query.as_str().to_string(), &db_schema) {
|
|
Ok(operation) => {
|
|
match state.interpret(operation) {
|
|
Ok(_) => {
|
|
send_query_response(&mut writer).await?;
|
|
}
|
|
Err(err) => {
|
|
send_error_response(&mut writer, &format!("error interpreting: {:?}", err)).await?;
|
|
}
|
|
}
|
|
},
|
|
Err(Error::ParsingError(err)) => {
|
|
send_error_response(&mut writer, &format!("parsing error: {:?}", err)).await?;
|
|
}
|
|
Err(Error::ValidationError(v)) => {
|
|
send_error_response(&mut writer, &format!("validation error: {:?}", v)).await?;
|
|
}
|
|
};
|
|
send_ready_for_query(&mut writer).await?;
|
|
}
|
|
}
|
|
writer.flush().await?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn send_error_response(
|
|
writer: &mut impl BackendProtoWriter,
|
|
error_message: &str,
|
|
) -> anyhow::Result<()> {
|
|
writer
|
|
.write_proto(
|
|
ErrorResponseData {
|
|
code: b'M',
|
|
message: error_message.to_string().into(),
|
|
}
|
|
.into(),
|
|
)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn send_ready_for_query(writer: &mut impl BackendProtoWriter) -> anyhow::Result<()> {
|
|
writer
|
|
.write_proto(BackendMessage::from(ReadyForQueryData { status: b'I' }))
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn send_empty_query(writer: &mut impl BackendProtoWriter) -> anyhow::Result<()> {
|
|
writer
|
|
.write_proto(BackendMessage::EmptyQueryResponse)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn send_row_description(writer: &mut impl BackendProtoWriter) -> anyhow::Result<()> {
|
|
let columns = vec![
|
|
ColumnDescription {
|
|
name: "id".to_string().into(),
|
|
table_oid: 123,
|
|
column_index: 1,
|
|
type_oid: 23,
|
|
type_size: 4,
|
|
type_modifier: -1,
|
|
format_code: 0,
|
|
},
|
|
ColumnDescription {
|
|
name: "argument".to_string().into(),
|
|
table_oid: 123,
|
|
column_index: 2,
|
|
type_oid: 23,
|
|
type_size: 4,
|
|
type_modifier: -1,
|
|
format_code: 0,
|
|
},
|
|
ColumnDescription {
|
|
name: "description".to_string().into(),
|
|
table_oid: 123,
|
|
column_index: 3,
|
|
type_oid: 1043,
|
|
type_size: 32,
|
|
type_modifier: -1,
|
|
format_code: 0,
|
|
},
|
|
];
|
|
|
|
writer
|
|
.write_proto(
|
|
RowDescriptionData {
|
|
columns: columns.into(),
|
|
}
|
|
.into(),
|
|
)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn send_query_response(writer: &mut impl BackendProtoWriter) -> anyhow::Result<()> {
|
|
send_row_description(writer).await?;
|
|
|
|
write_row(writer, b"0", b"1337", b"auto").await?;
|
|
write_row(writer, b"1", b"69", b"bus").await?;
|
|
write_row(writer, b"2", b"420", b"kolo").await?;
|
|
|
|
writer
|
|
.write_proto(
|
|
CommandCompleteData {
|
|
tag: "SELECT 3".to_string().into(),
|
|
}
|
|
.into(),
|
|
)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn write_row(
|
|
writer: &mut impl BackendProtoWriter,
|
|
first: &[u8],
|
|
second: &[u8],
|
|
third: &[u8],
|
|
) -> anyhow::Result<()> {
|
|
let row_data = vec![
|
|
first.to_vec().into(),
|
|
second.to_vec().into(),
|
|
third.to_vec().into(),
|
|
]
|
|
.into();
|
|
|
|
writer
|
|
.write_proto(DataRowData { columns: row_data }.into())
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|