- Ensure identifiers start with alphabetical character
- Rename parse_variable_name -> parse_column_name
- Add DB value parsers and condition parser placeholder
- Fix number parser, basic condition parser
- Move select parser to select module
- Add create statement parser
- Move condition parser to common; add delete statement parser
- Add drop statement parser
- Add insert parser
- Add update parser, combine operation parsers into one
- Add initial validation, fix compiler warnings
- Validation WIP
- Allow more spaces in create statement, update TableSchema struct
- Add create index parser and validator
- Add todo in parse_identifier
- Rework the new structure, many other changes
197 lines
5.7 KiB
Rust
197 lines
5.7 KiB
Rust
use minisql::interpreter::State;
|
|
use parser::{parse_and_validate, Error};
|
|
use proto::handshake::response::HandshakeResponse;
|
|
use proto::handshake::server::do_server_handshake;
|
|
use proto::message::backend::{
|
|
BackendMessage, ColumnDescription, CommandCompleteData, DataRowData, ErrorResponseData,
|
|
ReadyForQueryData, RowDescriptionData,
|
|
};
|
|
use proto::message::frontend::FrontendMessage;
|
|
use proto::reader::oneway::OneWayProtoReader;
|
|
use proto::reader::protoreader::ProtoReader;
|
|
use proto::writer::backend::BackendProtoWriter;
|
|
use proto::writer::protowriter::{ProtoFlush, ProtoWriter};
|
|
use tokio::io::{BufReader, BufWriter};
|
|
use tokio::net::{TcpListener, TcpStream};
|
|
|
|
#[tokio::main]
|
|
async fn main() -> anyhow::Result<()> {
|
|
let addr = "0.0.0.0:5432";
|
|
let listener = TcpListener::bind(&addr).await?;
|
|
println!("Server started at {addr}");
|
|
|
|
loop {
|
|
let (socket, _) = listener.accept().await?;
|
|
println!("New client connected: {}", socket.peer_addr()?);
|
|
tokio::spawn(async move {
|
|
let reason = handle_stream(socket).await;
|
|
println!("Client disconnected: {reason:?}");
|
|
});
|
|
}
|
|
}
|
|
|
|
async fn handle_stream(mut stream: TcpStream) -> anyhow::Result<()> {
|
|
let (reader, writer) = stream.split();
|
|
let mut writer = ProtoWriter::new(BufWriter::new(writer));
|
|
let mut reader = ProtoReader::new(BufReader::new(reader), 1024);
|
|
|
|
let response = HandshakeResponse::new("minisql", 123, 123);
|
|
|
|
let request = do_server_handshake(&mut writer, &mut reader, response).await?;
|
|
|
|
println!("Handshake complete:\n{request:?}");
|
|
let mut state = State::new();
|
|
|
|
loop {
|
|
println!("Waiting for next message");
|
|
let next: FrontendMessage = reader.read_proto().await?;
|
|
|
|
match next {
|
|
FrontendMessage::Terminate => {
|
|
println!("Received Terminate");
|
|
break;
|
|
}
|
|
FrontendMessage::Query(data) => {
|
|
println!("Received Query: {:?}", data);
|
|
let metadata = state.metadata();
|
|
match parse_and_validate(data.query.as_str().to_string(), &metadata) {
|
|
Ok(operation) => {
|
|
match state.interpret(operation) {
|
|
Ok(_) => {
|
|
send_query_response(&mut writer).await?;
|
|
}
|
|
Err(err) => {
|
|
send_error_response(&mut writer, &format!("error interpreting: {:?}", err)).await?;
|
|
}
|
|
}
|
|
},
|
|
Err(Error::ParsingError(err)) => {
|
|
send_error_response(&mut writer, &format!("parsing error: {:?}", err)).await?;
|
|
}
|
|
Err(Error::ValidationError(v)) => {
|
|
send_error_response(&mut writer, &format!("validation error: {:?}", v)).await?;
|
|
}
|
|
};
|
|
send_ready_for_query(&mut writer).await?;
|
|
}
|
|
}
|
|
writer.flush().await?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn send_error_response(
|
|
writer: &mut impl BackendProtoWriter,
|
|
error_message: &str,
|
|
) -> anyhow::Result<()> {
|
|
writer
|
|
.write_proto(
|
|
ErrorResponseData {
|
|
code: b'M',
|
|
message: error_message.to_string().into(),
|
|
}
|
|
.into(),
|
|
)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn send_ready_for_query(writer: &mut impl BackendProtoWriter) -> anyhow::Result<()> {
|
|
writer
|
|
.write_proto(BackendMessage::from(ReadyForQueryData { status: b'I' }))
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn send_empty_query(writer: &mut impl BackendProtoWriter) -> anyhow::Result<()> {
|
|
writer
|
|
.write_proto(BackendMessage::EmptyQueryResponse)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn send_row_description(writer: &mut impl BackendProtoWriter) -> anyhow::Result<()> {
|
|
let columns = vec![
|
|
ColumnDescription {
|
|
name: "id".to_string().into(),
|
|
table_oid: 123,
|
|
column_index: 1,
|
|
type_oid: 23,
|
|
type_size: 4,
|
|
type_modifier: -1,
|
|
format_code: 0,
|
|
},
|
|
ColumnDescription {
|
|
name: "argument".to_string().into(),
|
|
table_oid: 123,
|
|
column_index: 2,
|
|
type_oid: 23,
|
|
type_size: 4,
|
|
type_modifier: -1,
|
|
format_code: 0,
|
|
},
|
|
ColumnDescription {
|
|
name: "description".to_string().into(),
|
|
table_oid: 123,
|
|
column_index: 3,
|
|
type_oid: 1043,
|
|
type_size: 32,
|
|
type_modifier: -1,
|
|
format_code: 0,
|
|
},
|
|
];
|
|
|
|
writer
|
|
.write_proto(
|
|
RowDescriptionData {
|
|
columns: columns.into(),
|
|
}
|
|
.into(),
|
|
)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn send_query_response(writer: &mut impl BackendProtoWriter) -> anyhow::Result<()> {
|
|
send_row_description(writer).await?;
|
|
|
|
write_row(writer, b"0", b"1337", b"auto").await?;
|
|
write_row(writer, b"1", b"69", b"bus").await?;
|
|
write_row(writer, b"2", b"420", b"kolo").await?;
|
|
|
|
writer
|
|
.write_proto(
|
|
CommandCompleteData {
|
|
tag: "SELECT 3".to_string().into(),
|
|
}
|
|
.into(),
|
|
)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn write_row(
|
|
writer: &mut impl BackendProtoWriter,
|
|
first: &[u8],
|
|
second: &[u8],
|
|
third: &[u8],
|
|
) -> anyhow::Result<()> {
|
|
let row_data = vec![
|
|
first.to_vec().into(),
|
|
second.to_vec().into(),
|
|
third.to_vec().into(),
|
|
]
|
|
.into();
|
|
|
|
writer
|
|
.write_proto(DataRowData { columns: row_data }.into())
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|