Merge branch 'data-sender' into 'main'
Interface for sending responses from interpreter See merge request x433485/minisql!33
This commit is contained in:
commit
ad061036db
7 changed files with 118 additions and 80 deletions
10
Cargo.lock
generated
10
Cargo.lock
generated
|
|
@ -67,15 +67,15 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "anyhow"
|
name = "anyhow"
|
||||||
version = "1.0.76"
|
version = "1.0.79"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "59d2a3357dde987206219e78ecfbbb6e8dad06cbb65292758d3270e6254f7355"
|
checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-trait"
|
name = "async-trait"
|
||||||
version = "0.1.74"
|
version = "0.1.77"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9"
|
checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2 1.0.78",
|
"proc-macro2 1.0.78",
|
||||||
"quote 1.0.35",
|
"quote 1.0.35",
|
||||||
|
|
@ -283,6 +283,8 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
|
||||||
name = "minisql"
|
name = "minisql"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
|
"async-trait",
|
||||||
"bimap",
|
"bimap",
|
||||||
"proto",
|
"proto",
|
||||||
"serde",
|
"serde",
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,8 @@ rust-version = "1.74"
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
anyhow = "1.0.79"
|
||||||
|
async-trait = "0.1.77"
|
||||||
bimap = { version = "0.6.3", features = ["serde"] }
|
bimap = { version = "0.6.3", features = ["serde"] }
|
||||||
serde = { version = "1.0.196", features = ["derive"] }
|
serde = { version = "1.0.196", features = ["derive"] }
|
||||||
thiserror = "1.0.50"
|
thiserror = "1.0.50"
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ mod error;
|
||||||
mod internals;
|
mod internals;
|
||||||
pub mod interpreter;
|
pub mod interpreter;
|
||||||
pub mod operation;
|
pub mod operation;
|
||||||
|
pub mod response_writer;
|
||||||
pub mod restricted_row;
|
pub mod restricted_row;
|
||||||
mod result;
|
mod result;
|
||||||
pub mod schema;
|
pub mod schema;
|
||||||
|
|
|
||||||
36
minisql/src/response_writer.rs
Normal file
36
minisql/src/response_writer.rs
Normal file
|
|
@ -0,0 +1,36 @@
|
||||||
|
use crate::operation::ColumnSelection;
|
||||||
|
use crate::restricted_row::RestrictedRow;
|
||||||
|
use crate::schema::TableSchema;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
|
pub enum CompleteStatus {
|
||||||
|
Insert { oid: i32, rows: i32 },
|
||||||
|
Delete(usize),
|
||||||
|
Select(usize),
|
||||||
|
CreateTable,
|
||||||
|
CreateIndex,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for CompleteStatus {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
CompleteStatus::Insert { oid, rows } => write!(f, "INSERT {} {}", oid, rows),
|
||||||
|
CompleteStatus::Delete(rows) => write!(f, "DELETE {}", rows),
|
||||||
|
CompleteStatus::Select(rows) => write!(f, "SELECT {}", rows),
|
||||||
|
CompleteStatus::CreateTable => write!(f, "CREATE TABLE"),
|
||||||
|
CompleteStatus::CreateIndex => write!(f, "CREATE INDEX"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait ResponseWriter {
|
||||||
|
async fn write_table_header(
|
||||||
|
&mut self,
|
||||||
|
table_schema: &TableSchema,
|
||||||
|
columns: &ColumnSelection,
|
||||||
|
) -> anyhow::Result<()>;
|
||||||
|
async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()>;
|
||||||
|
async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()>;
|
||||||
|
}
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
const LOCAL_IPV4: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);
|
const LOCAL_IPV4: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);
|
||||||
|
|
||||||
|
|
@ -34,7 +35,7 @@ impl Configuration {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn get_throttle(&self) -> Option<u64> {
|
pub fn get_throttle(&self) -> Option<Duration> {
|
||||||
self.throttle
|
self.throttle.map(|d| Duration::from_millis(d))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ use tokio::net::{TcpListener, TcpStream};
|
||||||
use tokio::sync::{Mutex, RwLock};
|
use tokio::sync::{Mutex, RwLock};
|
||||||
|
|
||||||
use minisql::interpreter::{Response, State};
|
use minisql::interpreter::{Response, State};
|
||||||
|
use minisql::response_writer::{CompleteStatus, ResponseWriter};
|
||||||
use parser::parse_and_validate;
|
use parser::parse_and_validate;
|
||||||
use proto::handshake::errors::ServerHandshakeError;
|
use proto::handshake::errors::ServerHandshakeError;
|
||||||
use proto::handshake::request::HandshakeRequest;
|
use proto::handshake::request::HandshakeRequest;
|
||||||
|
|
@ -22,7 +23,7 @@ use proto::writer::protowriter::{ProtoFlush, ProtoWriter};
|
||||||
use crate::cancellation::ResetCancelToken;
|
use crate::cancellation::ResetCancelToken;
|
||||||
use crate::config::Configuration;
|
use crate::config::Configuration;
|
||||||
use crate::persistence::state_to_file;
|
use crate::persistence::state_to_file;
|
||||||
use crate::proto_wrapper::{CompleteStatus, ServerProto};
|
use crate::proto_wrapper::ServerProtoWrapper;
|
||||||
|
|
||||||
mod cancellation;
|
mod cancellation;
|
||||||
mod config;
|
mod config;
|
||||||
|
|
@ -87,8 +88,11 @@ async fn handle_stream(
|
||||||
let response = HandshakeResponse::new("minisql", pid, key);
|
let response = HandshakeResponse::new("minisql", pid, key);
|
||||||
let request = do_server_handshake(&mut writer, &mut reader, response).await;
|
let request = do_server_handshake(&mut writer, &mut reader, response).await;
|
||||||
|
|
||||||
|
let mut wrapped_writer = ServerProtoWrapper::new(writer, config.get_throttle());
|
||||||
let result = match request {
|
let result = match request {
|
||||||
Ok(req) => handle_connection(&mut reader, &mut writer, req, state, token, config).await,
|
Ok(req) => {
|
||||||
|
handle_connection(&mut reader, &mut wrapped_writer, req, state, token, config).await
|
||||||
|
}
|
||||||
Err(ServerHandshakeError::IsCancelRequest(cancel)) => {
|
Err(ServerHandshakeError::IsCancelRequest(cancel)) => {
|
||||||
handle_cancellation(cancel.pid, cancel.secret, &tokens).await
|
handle_cancellation(cancel.pid, cancel.secret, &tokens).await
|
||||||
}
|
}
|
||||||
|
|
@ -139,7 +143,7 @@ async fn handle_cancellation(pid: i32, key: i32, tokens: &TokenStore) -> anyhow:
|
||||||
|
|
||||||
async fn handle_connection<R, W>(
|
async fn handle_connection<R, W>(
|
||||||
reader: &mut R,
|
reader: &mut R,
|
||||||
writer: &mut W,
|
writer: &mut ServerProtoWrapper<W>,
|
||||||
request: HandshakeRequest,
|
request: HandshakeRequest,
|
||||||
state: SharedDbState,
|
state: SharedDbState,
|
||||||
token: ResetCancelToken,
|
token: ResetCancelToken,
|
||||||
|
|
@ -174,7 +178,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_query<W>(
|
async fn handle_query<W>(
|
||||||
writer: &mut W,
|
writer: &mut ServerProtoWrapper<W>,
|
||||||
state: &SharedDbState,
|
state: &SharedDbState,
|
||||||
query: String,
|
query: String,
|
||||||
token: &ResetCancelToken,
|
token: &ResetCancelToken,
|
||||||
|
|
@ -223,10 +227,6 @@ where
|
||||||
token.reset();
|
token.reset();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if let Some(throttle) = config.get_throttle() {
|
|
||||||
writer.flush().await?;
|
|
||||||
tokio::time::sleep(tokio::time::Duration::from_millis(throttle)).await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
writer
|
writer
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use minisql::operation::ColumnSelection;
|
use minisql::operation::ColumnSelection;
|
||||||
|
use minisql::response_writer::{CompleteStatus, ResponseWriter};
|
||||||
use minisql::restricted_row::RestrictedRow;
|
use minisql::restricted_row::RestrictedRow;
|
||||||
use minisql::schema::{Column, TableSchema};
|
use minisql::schema::{Column, TableSchema};
|
||||||
use proto::message::backend::{
|
use proto::message::backend::{
|
||||||
|
|
@ -9,71 +10,57 @@ use proto::message::backend::{
|
||||||
use proto::message::primitive::pglist::PgList;
|
use proto::message::primitive::pglist::PgList;
|
||||||
use proto::message::primitive::pgoid::PgOid;
|
use proto::message::primitive::pgoid::PgOid;
|
||||||
use proto::writer::backend::BackendProtoWriter;
|
use proto::writer::backend::BackendProtoWriter;
|
||||||
use std::fmt;
|
use proto::writer::protowriter::ProtoFlush;
|
||||||
|
use std::io::Error;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
pub enum CompleteStatus {
|
pub struct ServerProtoWrapper<W>(W, Option<Duration>);
|
||||||
Insert { oid: i32, rows: i32 },
|
|
||||||
Delete(usize),
|
|
||||||
Select(usize),
|
|
||||||
CreateTable,
|
|
||||||
CreateIndex,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for CompleteStatus {
|
impl<W> ServerProtoWrapper<W>
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
match self {
|
|
||||||
CompleteStatus::Insert { oid, rows } => write!(f, "INSERT {} {}", oid, rows),
|
|
||||||
CompleteStatus::Delete(rows) => write!(f, "DELETE {}", rows),
|
|
||||||
CompleteStatus::Select(rows) => write!(f, "SELECT {}", rows),
|
|
||||||
CompleteStatus::CreateTable => write!(f, "CREATE TABLE"),
|
|
||||||
CompleteStatus::CreateIndex => write!(f, "CREATE INDEX"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
pub trait ServerProto {
|
|
||||||
async fn write_error_message(&mut self, error_message: &str) -> anyhow::Result<()>;
|
|
||||||
async fn write_ready_for_query(&mut self) -> anyhow::Result<()>;
|
|
||||||
async fn write_empty_query(&mut self) -> anyhow::Result<()>;
|
|
||||||
async fn write_table_header(
|
|
||||||
&mut self,
|
|
||||||
table_schema: &TableSchema,
|
|
||||||
columns: &ColumnSelection,
|
|
||||||
) -> anyhow::Result<()>;
|
|
||||||
async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()>;
|
|
||||||
async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()>;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<W> ServerProto for W
|
|
||||||
where
|
where
|
||||||
W: BackendProtoWriter + Send,
|
W: BackendProtoWriter + ProtoFlush + Send,
|
||||||
{
|
{
|
||||||
async fn write_error_message(&mut self, error_message: &str) -> anyhow::Result<()> {
|
pub fn new(writer: W, throttle: Option<Duration>) -> Self {
|
||||||
self.write_proto(
|
Self(writer, throttle)
|
||||||
ErrorResponseData {
|
}
|
||||||
code: b'M',
|
|
||||||
message: format!("{error_message}\0").into(),
|
pub async fn write_error_message(&mut self, error_message: &str) -> anyhow::Result<()> {
|
||||||
}
|
self.0
|
||||||
.into(),
|
.write_proto(
|
||||||
)
|
ErrorResponseData {
|
||||||
.await?;
|
code: b'M',
|
||||||
|
message: format!("{error_message}\0").into(),
|
||||||
|
}
|
||||||
|
.into(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn write_ready_for_query(&mut self) -> anyhow::Result<()> {
|
pub async fn write_ready_for_query(&mut self) -> anyhow::Result<()> {
|
||||||
self.write_proto(ReadyForQueryData { status: b'I' }.into())
|
self.0
|
||||||
|
.write_proto(ReadyForQueryData { status: b'I' }.into())
|
||||||
.await?;
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn write_empty_query(&mut self) -> anyhow::Result<()> {
|
#[async_trait]
|
||||||
self.write_proto(BackendMessage::EmptyQueryResponse).await?;
|
impl<W> ProtoFlush for ServerProtoWrapper<W>
|
||||||
Ok(())
|
where
|
||||||
|
W: ProtoFlush + Send,
|
||||||
|
{
|
||||||
|
async fn flush(&mut self) -> Result<(), Error> {
|
||||||
|
self.0.flush().await
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<W> ResponseWriter for ServerProtoWrapper<W>
|
||||||
|
where
|
||||||
|
W: BackendProtoWriter + ProtoFlush + Send,
|
||||||
|
{
|
||||||
async fn write_table_header(
|
async fn write_table_header(
|
||||||
&mut self,
|
&mut self,
|
||||||
table_schema: &TableSchema,
|
table_schema: &TableSchema,
|
||||||
|
|
@ -84,13 +71,14 @@ where
|
||||||
.map(|column| column_to_description(table_schema, *column))
|
.map(|column| column_to_description(table_schema, *column))
|
||||||
.collect::<anyhow::Result<Vec<ColumnDescription>>>()?;
|
.collect::<anyhow::Result<Vec<ColumnDescription>>>()?;
|
||||||
|
|
||||||
self.write_proto(
|
self.0
|
||||||
RowDescriptionData {
|
.write_proto(
|
||||||
columns: columns.into(),
|
RowDescriptionData {
|
||||||
}
|
columns: columns.into(),
|
||||||
.into(),
|
}
|
||||||
)
|
.into(),
|
||||||
.await?;
|
)
|
||||||
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -100,18 +88,26 @@ where
|
||||||
.map(|(_, value)| value.as_text_bytes().into())
|
.map(|(_, value)| value.as_text_bytes().into())
|
||||||
.collect::<Vec<PgList<u8, i32>>>();
|
.collect::<Vec<PgList<u8, i32>>>();
|
||||||
|
|
||||||
self.write_proto(BackendMessage::DataRow(DataRowData {
|
self.0
|
||||||
columns: values.into(),
|
.write_proto(BackendMessage::DataRow(DataRowData {
|
||||||
}))
|
columns: values.into(),
|
||||||
.await?;
|
}))
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if let Some(throttle) = self.1 {
|
||||||
|
self.0.flush().await?;
|
||||||
|
tokio::time::sleep(throttle).await;
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()> {
|
async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()> {
|
||||||
self.write_proto(BackendMessage::CommandComplete(CommandCompleteData {
|
self.0
|
||||||
tag: status.to_string().into(),
|
.write_proto(BackendMessage::CommandComplete(CommandCompleteData {
|
||||||
}))
|
tag: status.to_string().into(),
|
||||||
.await?;
|
}))
|
||||||
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue