refactor: create trait for writing response from interpreter
This commit is contained in:
parent
b87ff160d2
commit
b5405d7575
7 changed files with 118 additions and 80 deletions
|
|
@ -1,6 +1,7 @@
|
|||
use clap::Parser;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
const LOCAL_IPV4: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);
|
||||
|
||||
|
|
@ -34,7 +35,7 @@ impl Configuration {
|
|||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get_throttle(&self) -> Option<u64> {
|
||||
self.throttle
|
||||
pub fn get_throttle(&self) -> Option<Duration> {
|
||||
self.throttle.map(|d| Duration::from_millis(d))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ use tokio::net::{TcpListener, TcpStream};
|
|||
use tokio::sync::{Mutex, RwLock};
|
||||
|
||||
use minisql::interpreter::{Response, State};
|
||||
use minisql::response_writer::{CompleteStatus, ResponseWriter};
|
||||
use parser::parse_and_validate;
|
||||
use proto::handshake::errors::ServerHandshakeError;
|
||||
use proto::handshake::request::HandshakeRequest;
|
||||
|
|
@ -22,7 +23,7 @@ use proto::writer::protowriter::{ProtoFlush, ProtoWriter};
|
|||
use crate::cancellation::ResetCancelToken;
|
||||
use crate::config::Configuration;
|
||||
use crate::persistence::state_to_file;
|
||||
use crate::proto_wrapper::{CompleteStatus, ServerProto};
|
||||
use crate::proto_wrapper::ServerProtoWrapper;
|
||||
|
||||
mod cancellation;
|
||||
mod config;
|
||||
|
|
@ -87,8 +88,11 @@ async fn handle_stream(
|
|||
let response = HandshakeResponse::new("minisql", pid, key);
|
||||
let request = do_server_handshake(&mut writer, &mut reader, response).await;
|
||||
|
||||
let mut wrapped_writer = ServerProtoWrapper::new(writer, config.get_throttle());
|
||||
let result = match request {
|
||||
Ok(req) => handle_connection(&mut reader, &mut writer, req, state, token, config).await,
|
||||
Ok(req) => {
|
||||
handle_connection(&mut reader, &mut wrapped_writer, req, state, token, config).await
|
||||
}
|
||||
Err(ServerHandshakeError::IsCancelRequest(cancel)) => {
|
||||
handle_cancellation(cancel.pid, cancel.secret, &tokens).await
|
||||
}
|
||||
|
|
@ -139,7 +143,7 @@ async fn handle_cancellation(pid: i32, key: i32, tokens: &TokenStore) -> anyhow:
|
|||
|
||||
async fn handle_connection<R, W>(
|
||||
reader: &mut R,
|
||||
writer: &mut W,
|
||||
writer: &mut ServerProtoWrapper<W>,
|
||||
request: HandshakeRequest,
|
||||
state: SharedDbState,
|
||||
token: ResetCancelToken,
|
||||
|
|
@ -174,7 +178,7 @@ where
|
|||
}
|
||||
|
||||
async fn handle_query<W>(
|
||||
writer: &mut W,
|
||||
writer: &mut ServerProtoWrapper<W>,
|
||||
state: &SharedDbState,
|
||||
query: String,
|
||||
token: &ResetCancelToken,
|
||||
|
|
@ -223,10 +227,6 @@ where
|
|||
token.reset();
|
||||
break;
|
||||
}
|
||||
if let Some(throttle) = config.get_throttle() {
|
||||
writer.flush().await?;
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(throttle)).await;
|
||||
}
|
||||
}
|
||||
|
||||
writer
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
use async_trait::async_trait;
|
||||
use minisql::operation::ColumnSelection;
|
||||
use minisql::response_writer::{CompleteStatus, ResponseWriter};
|
||||
use minisql::restricted_row::RestrictedRow;
|
||||
use minisql::schema::{Column, TableSchema};
|
||||
use proto::message::backend::{
|
||||
|
|
@ -9,71 +10,57 @@ use proto::message::backend::{
|
|||
use proto::message::primitive::pglist::PgList;
|
||||
use proto::message::primitive::pgoid::PgOid;
|
||||
use proto::writer::backend::BackendProtoWriter;
|
||||
use std::fmt;
|
||||
use proto::writer::protowriter::ProtoFlush;
|
||||
use std::io::Error;
|
||||
use std::time::Duration;
|
||||
|
||||
pub enum CompleteStatus {
|
||||
Insert { oid: i32, rows: i32 },
|
||||
Delete(usize),
|
||||
Select(usize),
|
||||
CreateTable,
|
||||
CreateIndex,
|
||||
}
|
||||
pub struct ServerProtoWrapper<W>(W, Option<Duration>);
|
||||
|
||||
impl fmt::Display for CompleteStatus {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
CompleteStatus::Insert { oid, rows } => write!(f, "INSERT {} {}", oid, rows),
|
||||
CompleteStatus::Delete(rows) => write!(f, "DELETE {}", rows),
|
||||
CompleteStatus::Select(rows) => write!(f, "SELECT {}", rows),
|
||||
CompleteStatus::CreateTable => write!(f, "CREATE TABLE"),
|
||||
CompleteStatus::CreateIndex => write!(f, "CREATE INDEX"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait ServerProto {
|
||||
async fn write_error_message(&mut self, error_message: &str) -> anyhow::Result<()>;
|
||||
async fn write_ready_for_query(&mut self) -> anyhow::Result<()>;
|
||||
async fn write_empty_query(&mut self) -> anyhow::Result<()>;
|
||||
async fn write_table_header(
|
||||
&mut self,
|
||||
table_schema: &TableSchema,
|
||||
columns: &ColumnSelection,
|
||||
) -> anyhow::Result<()>;
|
||||
async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()>;
|
||||
async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<W> ServerProto for W
|
||||
impl<W> ServerProtoWrapper<W>
|
||||
where
|
||||
W: BackendProtoWriter + Send,
|
||||
W: BackendProtoWriter + ProtoFlush + Send,
|
||||
{
|
||||
async fn write_error_message(&mut self, error_message: &str) -> anyhow::Result<()> {
|
||||
self.write_proto(
|
||||
ErrorResponseData {
|
||||
code: b'M',
|
||||
message: format!("{error_message}\0").into(),
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await?;
|
||||
pub fn new(writer: W, throttle: Option<Duration>) -> Self {
|
||||
Self(writer, throttle)
|
||||
}
|
||||
|
||||
pub async fn write_error_message(&mut self, error_message: &str) -> anyhow::Result<()> {
|
||||
self.0
|
||||
.write_proto(
|
||||
ErrorResponseData {
|
||||
code: b'M',
|
||||
message: format!("{error_message}\0").into(),
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_ready_for_query(&mut self) -> anyhow::Result<()> {
|
||||
self.write_proto(ReadyForQueryData { status: b'I' }.into())
|
||||
pub async fn write_ready_for_query(&mut self) -> anyhow::Result<()> {
|
||||
self.0
|
||||
.write_proto(ReadyForQueryData { status: b'I' }.into())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_empty_query(&mut self) -> anyhow::Result<()> {
|
||||
self.write_proto(BackendMessage::EmptyQueryResponse).await?;
|
||||
Ok(())
|
||||
#[async_trait]
|
||||
impl<W> ProtoFlush for ServerProtoWrapper<W>
|
||||
where
|
||||
W: ProtoFlush + Send,
|
||||
{
|
||||
async fn flush(&mut self) -> Result<(), Error> {
|
||||
self.0.flush().await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<W> ResponseWriter for ServerProtoWrapper<W>
|
||||
where
|
||||
W: BackendProtoWriter + ProtoFlush + Send,
|
||||
{
|
||||
async fn write_table_header(
|
||||
&mut self,
|
||||
table_schema: &TableSchema,
|
||||
|
|
@ -84,13 +71,14 @@ where
|
|||
.map(|column| column_to_description(table_schema, *column))
|
||||
.collect::<anyhow::Result<Vec<ColumnDescription>>>()?;
|
||||
|
||||
self.write_proto(
|
||||
RowDescriptionData {
|
||||
columns: columns.into(),
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await?;
|
||||
self.0
|
||||
.write_proto(
|
||||
RowDescriptionData {
|
||||
columns: columns.into(),
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -100,18 +88,26 @@ where
|
|||
.map(|(_, value)| value.as_text_bytes().into())
|
||||
.collect::<Vec<PgList<u8, i32>>>();
|
||||
|
||||
self.write_proto(BackendMessage::DataRow(DataRowData {
|
||||
columns: values.into(),
|
||||
}))
|
||||
.await?;
|
||||
self.0
|
||||
.write_proto(BackendMessage::DataRow(DataRowData {
|
||||
columns: values.into(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
if let Some(throttle) = self.1 {
|
||||
self.0.flush().await?;
|
||||
tokio::time::sleep(throttle).await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()> {
|
||||
self.write_proto(BackendMessage::CommandComplete(CommandCompleteData {
|
||||
tag: status.to_string().into(),
|
||||
}))
|
||||
.await?;
|
||||
self.0
|
||||
.write_proto(BackendMessage::CommandComplete(CommandCompleteData {
|
||||
tag: status.to_string().into(),
|
||||
}))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue