use async_trait::async_trait; use rand::Rng; use rand_pcg::Pcg64; use rand_seeder::Seeder; use std::fmt; use minisql::operation::ColumnSelection; use minisql::restricted_row::RestrictedRow; use minisql::schema::{Column, TableSchema}; use proto::message::backend::{BackendMessage, ColumnDescription, CommandCompleteData, DataRowData, ErrorResponseData, ReadyForQueryData, RowDescriptionData}; use proto::message::primitive::pglist::PgList; use proto::writer::backend::BackendProtoWriter; pub enum CompleteStatus { Insert { oid: i32, rows: i32, }, Delete(usize), Select(usize), CreateTable, CreateIndex, } impl fmt::Display for CompleteStatus { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { CompleteStatus::Insert { oid, rows } => write!(f, "INSERT {} {}", oid, rows), CompleteStatus::Delete(rows) => write!(f, "DELETE {}", rows), CompleteStatus::Select(rows) => write!(f, "SELECT {}", rows), CompleteStatus::CreateTable => write!(f, "CREATE TABLE"), CompleteStatus::CreateIndex => write!(f, "CREATE INDEX"), } } } #[async_trait] pub trait ServerProto { async fn write_error_message(&mut self, error_message: &str) -> anyhow::Result<()>; async fn write_ready_for_query(&mut self) -> anyhow::Result<()>; async fn write_empty_query(&mut self) -> anyhow::Result<()>; async fn write_table_header(&mut self, table_schema: &TableSchema, columns: &ColumnSelection) -> anyhow::Result<()>; async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()>; async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()>; } #[async_trait] impl ServerProto for W where W: BackendProtoWriter + Send { async fn write_error_message(&mut self, error_message: &str) -> anyhow::Result<()> { self.write_proto(ErrorResponseData { code: b'M', message: format!("{error_message}\0").into(), }.into()).await?; Ok(()) } async fn write_ready_for_query(&mut self) -> anyhow::Result<()> { self.write_proto(ReadyForQueryData { status: b'I' }.into()).await?; Ok(()) } async fn write_empty_query(&mut self) -> anyhow::Result<()> { self.write_proto(BackendMessage::EmptyQueryResponse).await?; Ok(()) } async fn write_table_header(&mut self, table_schema: &TableSchema, columns: &ColumnSelection) -> anyhow::Result<()> { let columns = columns.iter() .map(|column| column_to_description(table_schema, *column)) .collect::>>()?; self.write_proto(RowDescriptionData { columns: columns.into() }.into()).await?; Ok(()) } async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()> { let values = row.iter() .map(|(_, value)| value.as_text_bytes().into()) .collect::>>(); self.write_proto(BackendMessage::DataRow(DataRowData { columns: values.into(), })).await?; Ok(()) } async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()> { self.write_proto(BackendMessage::CommandComplete(CommandCompleteData { tag: status.to_string().into(), })).await?; Ok(()) } } fn column_to_description(schema: &TableSchema, column: Column) -> anyhow::Result { let table_name = schema.table_name(); let table_oid = table_name_to_oid(table_name); let column_type = schema.column_type(column); let name = schema.column_name_from_column(column); let column_index = column.try_into()?; let type_oid = column_type.type_oid(); let type_size = column_type.type_size(); Ok(ColumnDescription { name: name.to_string().into(), table_oid, column_index, type_oid, type_size, type_modifier: -1, format_code: 0, // text format }) } /// Hashes the table name into single i32 used as a substitute for the table OID /// in the PostgreSQL protocol. fn table_name_to_oid(table_name: &str) -> i32 { let mut rng: Pcg64 = Seeder::from(table_name).make_rng(); rng.gen::() }