cargo format
This commit is contained in:
parent
4d45da0cd1
commit
845db102c2
33 changed files with 885 additions and 530 deletions
|
|
@ -1,5 +1,5 @@
|
|||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct ResetCancelToken {
|
||||
is_canceled: Arc<AtomicBool>,
|
||||
|
|
@ -31,4 +31,4 @@ impl Clone for ResetCancelToken {
|
|||
is_canceled: self.is_canceled.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use clap::Parser;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::path::PathBuf;
|
||||
use clap::Parser;
|
||||
|
||||
const LOCAL_IPV4: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);
|
||||
|
||||
|
|
@ -9,7 +9,12 @@ const LOCAL_IPV4: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);
|
|||
pub struct Configuration {
|
||||
#[arg(short, long, default_value_t = LOCAL_IPV4, help = "IP address for the server to listen on")]
|
||||
address: IpAddr,
|
||||
#[arg(short, long, default_value = "5432", help = "Port for the server to listen on")]
|
||||
#[arg(
|
||||
short,
|
||||
long,
|
||||
default_value = "5432",
|
||||
help = "Port for the server to listen on"
|
||||
)]
|
||||
port: u16,
|
||||
#[arg(short, long, help = "Path to the data file")]
|
||||
file: PathBuf,
|
||||
|
|
@ -25,4 +30,4 @@ impl Configuration {
|
|||
pub fn get_file_path(&self) -> &PathBuf {
|
||||
&self.file
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,10 +24,10 @@ use crate::config::Configuration;
|
|||
use crate::persistence::state_to_file;
|
||||
use crate::proto_wrapper::{CompleteStatus, ServerProto};
|
||||
|
||||
mod config;
|
||||
mod proto_wrapper;
|
||||
mod cancellation;
|
||||
mod config;
|
||||
mod persistence;
|
||||
mod proto_wrapper;
|
||||
|
||||
type TokenStore = Arc<Mutex<HashMap<(i32, i32), ResetCancelToken>>>;
|
||||
type SharedDbState = Arc<RwLock<State>>;
|
||||
|
|
@ -65,16 +65,17 @@ async fn get_state(config: &Configuration) -> anyhow::Result<State> {
|
|||
println!("WARNING: No DB state file found, creating new one");
|
||||
Ok(State::new())
|
||||
}
|
||||
Err(e) => {
|
||||
Err(e)?
|
||||
}
|
||||
Ok(state) => {
|
||||
Ok(state)
|
||||
}
|
||||
Err(e) => Err(e)?,
|
||||
Ok(state) => Ok(state),
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_stream(mut stream: TcpStream, state: SharedDbState, tokens: TokenStore, config: Arc<Configuration>) -> anyhow::Result<()> {
|
||||
async fn handle_stream(
|
||||
mut stream: TcpStream,
|
||||
state: SharedDbState,
|
||||
tokens: TokenStore,
|
||||
config: Arc<Configuration>,
|
||||
) -> anyhow::Result<()> {
|
||||
let (reader, writer) = stream.split();
|
||||
let mut writer = ProtoWriter::new(BufWriter::new(writer));
|
||||
let mut reader = ProtoReader::new(BufReader::new(reader), 1024);
|
||||
|
|
@ -88,7 +89,9 @@ async fn handle_stream(mut stream: TcpStream, state: SharedDbState, tokens: Toke
|
|||
|
||||
let result = match request {
|
||||
Ok(req) => handle_connection(&mut reader, &mut writer, req, state, token, config).await,
|
||||
Err(ServerHandshakeError::IsCancelRequest(cancel)) => handle_cancellation(cancel.pid, cancel.secret, &tokens).await,
|
||||
Err(ServerHandshakeError::IsCancelRequest(cancel)) => {
|
||||
handle_cancellation(cancel.pid, cancel.secret, &tokens).await
|
||||
}
|
||||
Err(e) => Err(anyhow::anyhow!("Error during handshake: {:?}", e)),
|
||||
};
|
||||
|
||||
|
|
@ -134,10 +137,17 @@ async fn handle_cancellation(pid: i32, key: i32, tokens: &TokenStore) -> anyhow:
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_connection<R, W>(reader: &mut R, writer: &mut W, request: HandshakeRequest, state: SharedDbState, token: ResetCancelToken, config: Arc<Configuration>) -> anyhow::Result<()>
|
||||
where
|
||||
R: FrontendProtoReader + Send,
|
||||
W: BackendProtoWriter + ProtoFlush + Send,
|
||||
async fn handle_connection<R, W>(
|
||||
reader: &mut R,
|
||||
writer: &mut W,
|
||||
request: HandshakeRequest,
|
||||
state: SharedDbState,
|
||||
token: ResetCancelToken,
|
||||
config: Arc<Configuration>,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
R: FrontendProtoReader + Send,
|
||||
W: BackendProtoWriter + ProtoFlush + Send,
|
||||
{
|
||||
println!("Client connected: {:?}", request);
|
||||
|
||||
|
|
@ -152,9 +162,7 @@ async fn handle_connection<R, W>(reader: &mut R, writer: &mut W, request: Handsh
|
|||
let result = handle_query(writer, &state, data.query.into(), &token, &config).await;
|
||||
match result {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
writer.write_error_message(&e.to_string()).await?
|
||||
}
|
||||
Err(e) => writer.write_error_message(&e.to_string()).await?,
|
||||
}
|
||||
writer.write_ready_for_query().await?;
|
||||
}
|
||||
|
|
@ -165,9 +173,15 @@ async fn handle_connection<R, W>(reader: &mut R, writer: &mut W, request: Handsh
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_query<W>(writer: &mut W, state: &SharedDbState, query: String, token: &ResetCancelToken, config: &Arc<Configuration>) -> anyhow::Result<()>
|
||||
where
|
||||
W: BackendProtoWriter + ProtoFlush + Send,
|
||||
async fn handle_query<W>(
|
||||
writer: &mut W,
|
||||
state: &SharedDbState,
|
||||
query: String,
|
||||
token: &ResetCancelToken,
|
||||
config: &Arc<Configuration>,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
W: BackendProtoWriter + ProtoFlush + Send,
|
||||
{
|
||||
// Make sure token is reset before next query
|
||||
token.reset();
|
||||
|
|
@ -184,11 +198,15 @@ async fn handle_query<W>(writer: &mut W, state: &SharedDbState, query: String, t
|
|||
|
||||
match response {
|
||||
Response::Deleted(i) => {
|
||||
writer.write_command_complete(CompleteStatus::Delete(i)).await?;
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::Delete(i))
|
||||
.await?;
|
||||
true
|
||||
}
|
||||
Response::Inserted => {
|
||||
writer.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }).await?;
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 })
|
||||
.await?;
|
||||
true
|
||||
}
|
||||
Response::Selected(schema, columns, mut rows) => {
|
||||
|
|
@ -207,22 +225,30 @@ async fn handle_query<W>(writer: &mut W, state: &SharedDbState, query: String, t
|
|||
}
|
||||
}
|
||||
|
||||
writer.write_command_complete(CompleteStatus::Select(sent_rows)).await?;
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::Select(sent_rows))
|
||||
.await?;
|
||||
}
|
||||
_ => {
|
||||
writer.write_command_complete(CompleteStatus::Select(0)).await?;
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::Select(0))
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
false
|
||||
},
|
||||
}
|
||||
Response::TableCreated => {
|
||||
writer.write_command_complete(CompleteStatus::CreateTable).await?;
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::CreateTable)
|
||||
.await?;
|
||||
true
|
||||
},
|
||||
}
|
||||
Response::IndexCreated => {
|
||||
writer.write_command_complete(CompleteStatus::CreateIndex).await?;
|
||||
writer
|
||||
.write_command_complete(CompleteStatus::CreateIndex)
|
||||
.await?;
|
||||
true
|
||||
},
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use minisql::interpreter::State;
|
||||
use std::path::PathBuf;
|
||||
use tokio::{fs, io};
|
||||
use minisql::interpreter::State;
|
||||
|
||||
pub async fn state_from_file(path: &PathBuf) -> io::Result<State> {
|
||||
let content = fs::read_to_string(path).await?;
|
||||
|
|
|
|||
|
|
@ -1,20 +1,20 @@
|
|||
use async_trait::async_trait;
|
||||
use minisql::operation::ColumnSelection;
|
||||
use minisql::restricted_row::RestrictedRow;
|
||||
use minisql::schema::{Column, TableSchema};
|
||||
use proto::message::backend::{
|
||||
BackendMessage, ColumnDescription, CommandCompleteData, DataRowData, ErrorResponseData,
|
||||
ReadyForQueryData, RowDescriptionData,
|
||||
};
|
||||
use proto::message::primitive::pglist::PgList;
|
||||
use proto::writer::backend::BackendProtoWriter;
|
||||
use rand::Rng;
|
||||
use rand_pcg::Pcg64;
|
||||
use rand_seeder::Seeder;
|
||||
use std::fmt;
|
||||
use minisql::operation::ColumnSelection;
|
||||
use minisql::restricted_row::RestrictedRow;
|
||||
use minisql::schema::{Column, TableSchema};
|
||||
use proto::message::backend::{BackendMessage, ColumnDescription, CommandCompleteData, DataRowData, ErrorResponseData, ReadyForQueryData, RowDescriptionData};
|
||||
use proto::message::primitive::pglist::PgList;
|
||||
use proto::writer::backend::BackendProtoWriter;
|
||||
|
||||
pub enum CompleteStatus {
|
||||
Insert {
|
||||
oid: i32,
|
||||
rows: i32,
|
||||
},
|
||||
Insert { oid: i32, rows: i32 },
|
||||
Delete(usize),
|
||||
Select(usize),
|
||||
CreateTable,
|
||||
|
|
@ -38,24 +38,36 @@ pub trait ServerProto {
|
|||
async fn write_error_message(&mut self, error_message: &str) -> anyhow::Result<()>;
|
||||
async fn write_ready_for_query(&mut self) -> anyhow::Result<()>;
|
||||
async fn write_empty_query(&mut self) -> anyhow::Result<()>;
|
||||
async fn write_table_header(&mut self, table_schema: &TableSchema, columns: &ColumnSelection) -> anyhow::Result<()>;
|
||||
async fn write_table_header(
|
||||
&mut self,
|
||||
table_schema: &TableSchema,
|
||||
columns: &ColumnSelection,
|
||||
) -> anyhow::Result<()>;
|
||||
async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()>;
|
||||
async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<W> ServerProto for W where W: BackendProtoWriter + Send {
|
||||
impl<W> ServerProto for W
|
||||
where
|
||||
W: BackendProtoWriter + Send,
|
||||
{
|
||||
async fn write_error_message(&mut self, error_message: &str) -> anyhow::Result<()> {
|
||||
self.write_proto(ErrorResponseData {
|
||||
code: b'M',
|
||||
message: format!("{error_message}\0").into(),
|
||||
}.into()).await?;
|
||||
self.write_proto(
|
||||
ErrorResponseData {
|
||||
code: b'M',
|
||||
message: format!("{error_message}\0").into(),
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_ready_for_query(&mut self) -> anyhow::Result<()> {
|
||||
self.write_proto(ReadyForQueryData { status: b'I' }.into()).await?;
|
||||
self.write_proto(ReadyForQueryData { status: b'I' }.into())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -64,35 +76,52 @@ impl<W> ServerProto for W where W: BackendProtoWriter + Send {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_table_header(&mut self, table_schema: &TableSchema, columns: &ColumnSelection) -> anyhow::Result<()> {
|
||||
let columns = columns.iter()
|
||||
async fn write_table_header(
|
||||
&mut self,
|
||||
table_schema: &TableSchema,
|
||||
columns: &ColumnSelection,
|
||||
) -> anyhow::Result<()> {
|
||||
let columns = columns
|
||||
.iter()
|
||||
.map(|column| column_to_description(table_schema, *column))
|
||||
.collect::<anyhow::Result<Vec<ColumnDescription>>>()?;
|
||||
|
||||
self.write_proto(RowDescriptionData { columns: columns.into() }.into()).await?;
|
||||
self.write_proto(
|
||||
RowDescriptionData {
|
||||
columns: columns.into(),
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()> {
|
||||
let values = row.iter()
|
||||
let values = row
|
||||
.iter()
|
||||
.map(|(_, value)| value.as_text_bytes().into())
|
||||
.collect::<Vec<PgList<u8, i32>>>();
|
||||
|
||||
self.write_proto(BackendMessage::DataRow(DataRowData {
|
||||
columns: values.into(),
|
||||
})).await?;
|
||||
}))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_command_complete(&mut self, status: CompleteStatus) -> anyhow::Result<()> {
|
||||
self.write_proto(BackendMessage::CommandComplete(CommandCompleteData {
|
||||
tag: status.to_string().into(),
|
||||
})).await?;
|
||||
}))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn column_to_description(schema: &TableSchema, column: Column) -> anyhow::Result<ColumnDescription> {
|
||||
fn column_to_description(
|
||||
schema: &TableSchema,
|
||||
column: Column,
|
||||
) -> anyhow::Result<ColumnDescription> {
|
||||
let table_name = schema.table_name();
|
||||
let table_oid = table_name_to_oid(table_name);
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue