104 lines
3.6 KiB
Rust
104 lines
3.6 KiB
Rust
use async_trait::async_trait;
|
|
use minisql::restricted_row::RestrictedRow;
|
|
use minisql::schema::TableSchema;
|
|
use minisql::type_system::{Value};
|
|
use proto::message::backend::{BackendMessage, ColumnDescription, CommandCompleteData, DataRowData, ErrorResponseData, ReadyForQueryData, RowDescriptionData};
|
|
use proto::message::primitive::pglist::PgList;
|
|
use proto::writer::backend::BackendProtoWriter;
|
|
|
|
pub enum CompleteStatus {
|
|
Insert {
|
|
oid: i32,
|
|
rows: i32,
|
|
},
|
|
Delete(usize),
|
|
Select(usize),
|
|
}
|
|
|
|
impl CompleteStatus {
|
|
fn to_string(&self) -> String {
|
|
match self {
|
|
CompleteStatus::Insert { oid, rows } => format!("INSERT {} {}", oid, rows),
|
|
CompleteStatus::Delete(rows) => format!("DELETE {}", rows),
|
|
CompleteStatus::Select(rows) => format!("SELECT {}", rows),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
pub trait ServerProto {
|
|
async fn write_error_message(&mut self, error_message: &str) -> anyhow::Result<()>;
|
|
async fn write_ready_for_query(&mut self) -> anyhow::Result<()>;
|
|
async fn write_empty_query(&mut self) -> anyhow::Result<()>;
|
|
async fn write_table_header(&mut self, table_schema: &TableSchema, row: &RestrictedRow) -> anyhow::Result<()>;
|
|
async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()>;
|
|
async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()>;
|
|
}
|
|
|
|
#[async_trait]
|
|
impl<W> ServerProto for W where W: BackendProtoWriter + Send {
|
|
async fn write_error_message(&mut self, error_message: &str) -> anyhow::Result<()> {
|
|
self.write_proto(ErrorResponseData {
|
|
code: b'M',
|
|
message: format!("{error_message}\0").into(),
|
|
}.into()).await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn write_ready_for_query(&mut self) -> anyhow::Result<()> {
|
|
self.write_proto(ReadyForQueryData { status: b'I' }.into()).await?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn write_empty_query(&mut self) -> anyhow::Result<()> {
|
|
self.write_proto(BackendMessage::EmptyQueryResponse).await?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn write_table_header(&mut self, table_schema: &TableSchema, row: &RestrictedRow) -> anyhow::Result<()> {
|
|
let columns = row.iter()
|
|
.map(|(index, value)| value_to_column_description(table_schema, value, index))
|
|
.collect::<anyhow::Result<Vec<ColumnDescription>>>()?;
|
|
|
|
self.write_proto(RowDescriptionData { columns: columns.into() }.into()).await?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()> {
|
|
let values = row.iter()
|
|
.map(|(_, value)| value.as_text_bytes().into())
|
|
.collect::<Vec<PgList<u8, i32>>>();
|
|
|
|
self.write_proto(BackendMessage::DataRow(DataRowData {
|
|
columns: values.into(),
|
|
})).await?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()> {
|
|
self.write_proto(BackendMessage::CommandComplete(CommandCompleteData {
|
|
tag: status.to_string().into(),
|
|
})).await?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
fn value_to_column_description(schema: &TableSchema, value: &Value, index: &usize) -> anyhow::Result<ColumnDescription> {
|
|
let name = schema.column_name_from_column_position(*index)?;
|
|
|
|
let table_oid = schema.table_name().as_bytes().as_ptr() as i32;
|
|
let column_index = (*index).try_into()?;
|
|
let type_oid = value.type_oid();
|
|
let type_size = value.type_size();
|
|
|
|
Ok(ColumnDescription {
|
|
name: name.to_string().into(),
|
|
table_oid,
|
|
column_index,
|
|
type_oid,
|
|
type_size,
|
|
type_modifier: -1,
|
|
format_code: 0, // text format
|
|
})
|
|
}
|