feat(proto): add example server
This commit is contained in:
parent
0318169876
commit
84d9fa2d50
4 changed files with 388 additions and 3 deletions
184
server/src/main.rs
Normal file
184
server/src/main.rs
Normal file
|
|
@ -0,0 +1,184 @@
|
|||
use proto::handshake::response::HandshakeResponse;
|
||||
use proto::handshake::server::do_server_handshake;
|
||||
use proto::message::backend::{
|
||||
BackendMessage, ColumnDescription, CommandCompleteData, DataRowData, ErrorResponseData,
|
||||
ReadyForQueryData, RowDescriptionData,
|
||||
};
|
||||
use proto::message::frontend::FrontendMessage;
|
||||
use proto::reader::oneway::OneWayProtoReader;
|
||||
use proto::reader::protoreader::ProtoReader;
|
||||
use proto::writer::backend::BackendProtoWriter;
|
||||
use proto::writer::protowriter::{ProtoFlush, ProtoWriter};
|
||||
use tokio::io::{BufReader, BufWriter};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let addr = "0.0.0.0:5432";
|
||||
let listener = TcpListener::bind(&addr).await?;
|
||||
println!("Server started at {addr}");
|
||||
|
||||
loop {
|
||||
let (socket, _) = listener.accept().await?;
|
||||
println!("New client connected: {}", socket.peer_addr()?);
|
||||
tokio::spawn(async move {
|
||||
let reason = handle_stream(socket).await;
|
||||
println!("Client disconnected: {reason:?}");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_stream(mut stream: TcpStream) -> anyhow::Result<()> {
|
||||
let (reader, writer) = stream.split();
|
||||
let mut writer = ProtoWriter::new(BufWriter::new(writer));
|
||||
let mut reader = ProtoReader::new(BufReader::new(reader), 1024);
|
||||
|
||||
let response = HandshakeResponse::new("minisql", 123, 123);
|
||||
let request = do_server_handshake(&mut writer, &mut reader, response).await?;
|
||||
|
||||
println!("Handshake complete:\n{request:?}");
|
||||
|
||||
loop {
|
||||
println!("Waiting for next message");
|
||||
let next: FrontendMessage = reader.read_proto().await?;
|
||||
|
||||
match next {
|
||||
FrontendMessage::Terminate => {
|
||||
println!("Received Terminate");
|
||||
break;
|
||||
}
|
||||
FrontendMessage::Query(data) => {
|
||||
println!("Received Query: {:?}", data);
|
||||
if data.query.as_str().contains("car") {
|
||||
println!("Sending error message");
|
||||
send_error_response(&mut writer, "Car not found").await?;
|
||||
} else if data.query.as_str().to_lowercase().contains("select") {
|
||||
println!("Sending table");
|
||||
send_query_repsonse(&mut writer).await?;
|
||||
} else {
|
||||
println!("Sending empty query");
|
||||
send_empty_query(&mut writer).await?;
|
||||
}
|
||||
send_ready_for_query(&mut writer).await?;
|
||||
}
|
||||
}
|
||||
writer.flush().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_error_response(
|
||||
writer: &mut impl BackendProtoWriter,
|
||||
error_message: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
writer
|
||||
.write_proto(
|
||||
ErrorResponseData {
|
||||
code: b'M',
|
||||
message: error_message.to_string().into(),
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_ready_for_query(writer: &mut impl BackendProtoWriter) -> anyhow::Result<()> {
|
||||
writer
|
||||
.write_proto(BackendMessage::from(ReadyForQueryData { status: b'I' }))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_empty_query(writer: &mut impl BackendProtoWriter) -> anyhow::Result<()> {
|
||||
writer
|
||||
.write_proto(BackendMessage::EmptyQueryResponse)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_row_description(writer: &mut impl BackendProtoWriter) -> anyhow::Result<()> {
|
||||
let columns = vec![
|
||||
ColumnDescription {
|
||||
name: "id".to_string().into(),
|
||||
table_oid: 123,
|
||||
column_index: 1,
|
||||
type_oid: 23,
|
||||
type_size: 4,
|
||||
type_modifier: -1,
|
||||
format_code: 0,
|
||||
},
|
||||
ColumnDescription {
|
||||
name: "argument".to_string().into(),
|
||||
table_oid: 123,
|
||||
column_index: 2,
|
||||
type_oid: 23,
|
||||
type_size: 4,
|
||||
type_modifier: -1,
|
||||
format_code: 0,
|
||||
},
|
||||
ColumnDescription {
|
||||
name: "description".to_string().into(),
|
||||
table_oid: 123,
|
||||
column_index: 3,
|
||||
type_oid: 1043,
|
||||
type_size: 32,
|
||||
type_modifier: -1,
|
||||
format_code: 0,
|
||||
},
|
||||
];
|
||||
|
||||
writer
|
||||
.write_proto(
|
||||
RowDescriptionData {
|
||||
columns: columns.into(),
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_query_repsonse(writer: &mut impl BackendProtoWriter) -> anyhow::Result<()> {
|
||||
send_row_description(writer).await?;
|
||||
|
||||
write_row(writer, b"0", b"1337", b"auto").await?;
|
||||
write_row(writer, b"1", b"69", b"bus").await?;
|
||||
write_row(writer, b"2", b"420", b"kolo").await?;
|
||||
|
||||
writer
|
||||
.write_proto(
|
||||
CommandCompleteData {
|
||||
tag: "SELECT 3".to_string().into(),
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_row(
|
||||
writer: &mut impl BackendProtoWriter,
|
||||
first: &[u8],
|
||||
second: &[u8],
|
||||
third: &[u8],
|
||||
) -> anyhow::Result<()> {
|
||||
let row_data = vec![
|
||||
first.to_vec().into(),
|
||||
second.to_vec().into(),
|
||||
third.to_vec().into(),
|
||||
]
|
||||
.into();
|
||||
|
||||
writer
|
||||
.write_proto(DataRowData { columns: row_data }.into())
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue