From 9b9f9f16f66d034d83209b2d7e89a83545f8f91e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jind=C5=99ich=20Moravec?= Date: Mon, 5 Feb 2024 21:36:58 +0100 Subject: [PATCH 1/2] feat: metadata serialization --- Cargo.lock | 5 ++- minisql/Cargo.toml | 3 +- minisql/src/error.rs | 13 ++---- minisql/src/interpreter2.rs | 44 +++++++++++++++++++-- storage_engine/src/segments/store_header.rs | 6 +-- storage_engine/src/store.rs | 2 +- 6 files changed, 54 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b653809..42c5df8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -289,6 +289,7 @@ dependencies = [ "bincode", "proto", "serde", + "serde_json", "storage_engine", "thiserror", "tokio", @@ -557,9 +558,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.112" +version = "1.0.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d1bd37ce2324cf3bf85e5a25f96eb4baf0d5aa6eba43e7ae8958870c4ec48ed" +checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" dependencies = [ "itoa", "ryu", diff --git a/minisql/Cargo.toml b/minisql/Cargo.toml index ca06eb6..4029a5c 100644 --- a/minisql/Cargo.toml +++ b/minisql/Cargo.toml @@ -11,8 +11,9 @@ anyhow = "1.0.79" async-trait = "0.1.77" bimap = { version = "0.6.3", features = ["serde"] } bincode = "2.0.0-rc.3" -serde = { version = "1.0.196", features = ["derive"] } +serde = { version = "1.0.196", features = ["derive", "rc"] } tokio = { version = "1.34.0", features = ["full"] } thiserror = "1.0.50" proto = { path = "../proto" } storage_engine = { path = "../storage_engine" } +serde_json = "1.0.113" diff --git a/minisql/src/error.rs b/minisql/src/error.rs index db2d416..03b772f 100644 --- a/minisql/src/error.rs +++ b/minisql/src/error.rs @@ -14,11 +14,13 @@ pub enum RuntimeError { #[error("table {0} already indexes column {1}")] AttemptToIndexAlreadyIndexedColumn(TableName, ColumnName), #[error("File-System Error: {0}")] - IoError(std::io::Error), + IoError(#[from] std::io::Error), #[error("Storage Engine error for table {0}: {1}")] StorageEngineError(TableName, storage_engine::error::Error), #[error("runtime anyhow error: {0}")] - AnyhowError(anyhow::Error) + AnyhowError(#[from] anyhow::Error), + #[error("serde error: {0}")] + SerdeError(#[from] serde_json::Error), } #[derive(Debug, Error)] @@ -32,10 +34,3 @@ pub enum TypeConversionError { #[error("unknown type with oid {oid} and size {size}")] UnknownType { oid: PgOid, size: i16 }, } - - -impl From for RuntimeError { - fn from(e: anyhow::Error) -> RuntimeError { - Self::AnyhowError(e) - } -} diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 7004b1f..cb4eea1 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -11,6 +11,7 @@ use std::path::{Path, PathBuf}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::rc::Rc; +use serde::{Deserialize, Serialize}; use tokio::fs; use storage_engine::store::Store; @@ -18,11 +19,14 @@ use storage_engine::cursor::{ReadCursor, WriteCursor}; use storage_engine::cursor_capabilities::traversal::CursorCanTraverse; use storage_engine::cursor_capabilities::index_access::CursorCanReadIndex; +const METADATA_FILE: &'static str = "metadata.json"; + // ==============Interpreter================ -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct State { table_name_position_mapping: BiMap, table_schemas: Vec>, + #[serde(skip)] tables: Tables, } @@ -63,6 +67,19 @@ impl Table { Ok(table) } + async fn connect(table_schema: Rc, db_path: &Path) -> DbResult { + let table_folder_name = table_schema.table_name(); + let path_to_table_folder = db_path.join(table_folder_name); + + let store: Store = Store::connect(&path_to_table_folder).await.map_err(|e| RuntimeError::StorageEngineError(table_schema.table_name().to_string(), e))?; + + let table = Self { + schema: table_schema, + store, + }; + Ok(table) + } + async fn read(&self) -> DbResult> { let cursor = self.store.read_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.table_name().to_string(), e))?; Ok(cursor) @@ -153,8 +170,29 @@ impl StateHandler { Ok(state) } - pub async fn connect(db_path: PathBuf) -> Self { - todo!() + pub async fn connect(db_path: PathBuf) -> DbResult { + let metadata_file = db_path.join(METADATA_FILE); + let metadata_raw = fs::read_to_string(metadata_file).await?; + let mut metadata: State = serde_json::from_str(&metadata_raw)?; + + let mut tables = Vec::with_capacity(metadata.table_schemas.len()); + for table_schema in &metadata.table_schemas { + let table = Table::connect(table_schema.clone(), &db_path).await?; + tables.push(RwLock::new(table)); + } + + metadata.tables = tables; + Ok(Self { + db_path, + state: RwLock::new(metadata), + }) + } + + async fn save_metadata(&self) -> DbResult<()> { + let metadata_file = self.db_path.join(METADATA_FILE); + let state = self.state.read().await; + let metadata_raw = serde_json::to_string(&*state)?; + fs::write(metadata_file, metadata_raw).await.map_err(|e| RuntimeError::IoError(e)) } pub async fn interpret(&self, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { diff --git a/storage_engine/src/segments/store_header.rs b/storage_engine/src/segments/store_header.rs index 77d6de6..b8bdfd5 100644 --- a/storage_engine/src/segments/store_header.rs +++ b/storage_engine/src/segments/store_header.rs @@ -17,7 +17,7 @@ pub struct StoreHeader { #[derive(Debug, Clone)] pub struct StoreHeaderFixedPart { - pub table_folder: String, // This one is not encoded into the file + pub table_folder: PathBuf, // This one is not encoded into the file pub number_of_columns: usize, pub deleted_count: usize, @@ -64,7 +64,7 @@ impl StoreHeader { vec![0; Self::indexed_columns_size(header.number_of_columns)] } - pub async fn decode_fixed(table_folder: &str, result: &[u8]) -> Result { + pub async fn decode_fixed(table_folder: &PathBuf, result: &[u8]) -> Result { let (number_of_columns, _) = decode::(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; @@ -78,7 +78,7 @@ impl StoreHeader { decode::(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; let header = StoreHeaderFixedPart { - table_folder: table_folder.to_string(), + table_folder: table_folder.clone(), number_of_columns, deleted_count, total_count, diff --git a/storage_engine/src/store.rs b/storage_engine/src/store.rs index cf741a8..bb256ff 100644 --- a/storage_engine/src/store.rs +++ b/storage_engine/src/store.rs @@ -120,7 +120,7 @@ impl Store { Ok(file) } - pub async fn connect(table_folder: &str) -> Result + pub async fn connect(table_folder: &PathBuf) -> Result where T: std::fmt::Debug + Encode + Decode + Ord { let path_to_table = Path::new(table_folder); From f5d45f6a1d3d7658ba765196dcdea788940571c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jind=C5=99ich=20Moravec?= Date: Mon, 5 Feb 2024 21:59:33 +0100 Subject: [PATCH 2/2] feat: connect server to the new interpreter --- minisql/src/interpreter2.rs | 25 +++++++--- parser/src/core.rs | 2 +- parser/src/validation.rs | 67 ++++++++++++------------- server/src/config.rs | 8 +-- server/src/main.rs | 98 ++++++------------------------------- server/src/persistence.rs | 15 ------ 6 files changed, 70 insertions(+), 145 deletions(-) delete mode 100644 server/src/persistence.rs diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index cb4eea1..81075cb 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -8,9 +8,9 @@ use crate::internals::row::Row; use bimap::BiMap; use std::path::{Path, PathBuf}; +use std::sync::Arc; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -use std::rc::Rc; use serde::{Deserialize, Serialize}; use tokio::fs; @@ -25,7 +25,7 @@ const METADATA_FILE: &'static str = "metadata.json"; #[derive(Debug, Serialize, Deserialize)] pub struct State { table_name_position_mapping: BiMap, - table_schemas: Vec>, + table_schemas: Vec>, #[serde(skip)] tables: Tables, } @@ -39,11 +39,11 @@ pub type Tables = Vec>; #[derive(Debug)] pub struct Table { - schema: Rc, + schema: Arc, store: Store } -pub type DbSchema = Vec<(TableName, TablePosition, Rc)>; +pub type DbSchema = Vec<(TableName, TablePosition, Arc)>; // To satisfy clippy. impl Default for State { fn default() -> Self { @@ -61,13 +61,13 @@ impl Table { let store: Store = Store::new(&path_to_table_folder, number_of_columns, primary_column).await.unwrap(); let table = Self { - schema: Rc::new(table_schema), + schema: Arc::new(table_schema), store, }; Ok(table) } - async fn connect(table_schema: Rc, db_path: &Path) -> DbResult { + async fn connect(table_schema: Arc, db_path: &Path) -> DbResult { let table_folder_name = table_schema.table_name(); let path_to_table_folder = db_path.join(table_folder_name); @@ -90,7 +90,7 @@ impl Table { Ok(cursor) } - pub fn schema(&self) -> Rc { + pub fn schema(&self) -> Arc { self.schema.clone() } @@ -111,7 +111,7 @@ impl State { pub fn db_schema(&self) -> DbSchema { let mut schema: DbSchema = Vec::new(); for (table_name, &table_position) in &self.table_name_position_mapping { - let table_schema: Rc = self.table_schemas[table_position].clone(); + let table_schema: Arc = self.table_schemas[table_position].clone(); schema.push((table_name.clone(), table_position, table_schema)); } schema @@ -160,6 +160,11 @@ impl State { } impl StateHandler { + pub fn is_existing_db(db_path: &PathBuf) -> bool { + db_path.exists() && db_path.is_dir() && + db_path.join(METADATA_FILE).exists() && db_path.join(METADATA_FILE).is_file() + } + pub async fn new(db_path: PathBuf) -> DbResult { fs::create_dir(db_path.clone()).await.map_err(|e| RuntimeError::IoError(e))?; @@ -195,6 +200,10 @@ impl StateHandler { fs::write(metadata_file, metadata_raw).await.map_err(|e| RuntimeError::IoError(e)) } + pub async fn read_state(&self) -> RwLockReadGuard { + self.state.read().await + } + pub async fn interpret(&self, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { use Operation::*; diff --git a/parser/src/core.rs b/parser/src/core.rs index 1782f6b..28059fe 100644 --- a/parser/src/core.rs +++ b/parser/src/core.rs @@ -1,5 +1,5 @@ use crate::syntax::RawQuerySyntax; -use minisql::{interpreter::DbSchema, operation::Operation}; +use minisql::{interpreter2::DbSchema, operation::Operation}; use nom::{branch::alt, character::complete::{multispace0, char}, multi::many1, sequence::{delimited, terminated}, IResult}; use thiserror::Error; diff --git a/parser/src/validation.rs b/parser/src/validation.rs index 741d0fd..87eec28 100644 --- a/parser/src/validation.rs +++ b/parser/src/validation.rs @@ -1,4 +1,5 @@ use std::collections::{BTreeMap, HashSet}; +use std::sync::Arc; use thiserror::Error; use crate::parsing::literal::Literal; @@ -6,7 +7,7 @@ use crate::syntax; use crate::syntax::{ColumnSchema, RawQuerySyntax, RawTableSchema}; use minisql::operation; use minisql::{ - interpreter::DbSchema, + interpreter2::DbSchema, operation::Operation, schema::{Column, ColumnName, TableName, TablePosition, TableSchema}, type_system::DbType, @@ -63,15 +64,15 @@ pub fn validate_operation( } } -fn validate_table_exists<'a>( - db_schema: &DbSchema<'a>, - table_name: &'a TableName, -) -> Result<(TablePosition, &'a TableSchema), ValidationError> { +fn validate_table_exists( + db_schema: &DbSchema, + table_name: &TableName, +) -> Result<(TablePosition, Arc), ValidationError> { db_schema .iter() .find(|(tname, _, _)| table_name.eq(tname)) .ok_or(ValidationError::TableDoesNotExist(table_name.to_string())) - .map(|(_, table_position, table_schema)| (*table_position, *table_schema)) + .map(|(_, table_position, table_schema)| (*table_position, table_schema.clone())) } fn validate_create_table( @@ -167,7 +168,7 @@ fn validate_select( .iter() .filter_map(|column_name| schema.get_column(column_name)) .collect(); - let validated_condition = validate_condition(condition, schema)?; + let validated_condition = validate_condition(condition, &schema)?; Ok(Operation::Select( table_position, selection, @@ -178,7 +179,7 @@ fn validate_select( } } syntax::ColumnSelection::All => { - let validated_condition = validate_condition(condition, schema)?; + let validated_condition = validate_condition(condition, &schema)?; Ok(Operation::Select( table_position, schema.all_selection(), @@ -267,13 +268,13 @@ fn validate_delete( db_schema: &DbSchema, ) -> Result { let (table_position, schema) = validate_table_exists(db_schema, &table_name)?; - let validated_condition = validate_condition(condition, schema)?; + let validated_condition = validate_condition(condition, &schema)?; Ok(Operation::Delete(table_position, validated_condition)) } fn validate_condition( condition: Option, - schema: &TableSchema, + schema: &Arc, ) -> Result, ValidationError> { match condition { Some(condition) => match condition { @@ -338,14 +339,14 @@ where None } -fn get_table_schema<'a>( - db_schema: &DbSchema<'a>, - table_name: &'a TableName, -) -> Option<&'a TableSchema> { +fn get_table_schema( + db_schema: &DbSchema, + table_name: &TableName, +) -> Option> { let (_, _, table_schema) = db_schema .iter() .find(|(tname, _, _)| table_name.eq(tname))?; - Some(table_schema) + Some(table_schema.clone()) } fn literal_to_value(lit: Literal, hint: &DbType) -> Value { @@ -428,11 +429,11 @@ mod tests { } } - fn db_schema(users_schema: &TableSchema) -> DbSchema { - vec![("users".to_string(), 0, users_schema)] + fn db_schema(users_schema: Arc) -> DbSchema { + vec![("users".to_string(), 0, users_schema.clone())] } - fn empty_db_schema() -> DbSchema<'static> { + fn empty_db_schema() -> DbSchema { vec![] } @@ -547,7 +548,7 @@ mod tests { #[test] fn test_create_already_exists() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = CreateTable(raw_users_schema()); let result = validate_operation(syntax, &db_schema); @@ -561,7 +562,7 @@ mod tests { #[test] fn test_select_basic() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; let id = 0; let name = 1; @@ -583,7 +584,7 @@ mod tests { #[test] fn test_select_non_existent_table() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Select("does_not_exist".to_string(), ColumnSelection::All, None); @@ -594,7 +595,7 @@ mod tests { #[test] fn test_select_eq() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; let id = 0; @@ -622,7 +623,7 @@ mod tests { #[test] fn test_select_eq_columns_selection() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; let name = 1; @@ -652,7 +653,7 @@ mod tests { #[test] fn test_select_eq_columns_selection_nonexistent_column_selected() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Select( "users".to_string(), @@ -666,7 +667,7 @@ mod tests { #[test] fn test_select_eq_non_existent_column() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Select( "users".to_string(), @@ -680,7 +681,7 @@ mod tests { #[test] fn test_select_eq_type_error() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Select( "users".to_string(), @@ -695,7 +696,7 @@ mod tests { #[test] fn test_insert() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; @@ -732,7 +733,7 @@ mod tests { #[test] fn test_insert_non_existent_column() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Insert( "users".to_string(), @@ -750,7 +751,7 @@ mod tests { #[test] fn test_insert_ill_typed_column() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = Insert( "users".to_string(), @@ -768,7 +769,7 @@ mod tests { #[test] fn test_delete_all() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; @@ -786,7 +787,7 @@ mod tests { #[test] fn test_delete_eq() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; let age = 2; @@ -816,7 +817,7 @@ mod tests { #[test] fn test_create_index() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let users_position = 0; let age = 2; @@ -836,7 +837,7 @@ mod tests { #[test] fn test_create_index_nonexistent_column() { let users_schema: TableSchema = users_schema(); - let db_schema: DbSchema = db_schema(&users_schema); + let db_schema: DbSchema = db_schema(Arc::new(users_schema)); let syntax: RawQuerySyntax = CreateIndex("users".to_string(), "does_not_exist".to_string()); let result = validate_operation(syntax, &db_schema); diff --git a/server/src/config.rs b/server/src/config.rs index 2dedcbc..f709511 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -17,8 +17,8 @@ pub struct Configuration { help = "Port for the server to listen on" )] port: u16, - #[arg(short, long, help = "Path to the data file")] - file: PathBuf, + #[arg(short, long, help = "Path to the folder for database data")] + folder: PathBuf, #[arg(short, long, help = "Delay between rows in milliseconds")] throttle: Option, } @@ -30,8 +30,8 @@ impl Configuration { } #[inline] - pub fn get_file_path(&self) -> &PathBuf { - &self.file + pub fn get_folder_path(&self) -> &PathBuf { + &self.folder } #[inline] diff --git a/server/src/main.rs b/server/src/main.rs index e1e6f3d..a31ae91 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,14 +1,13 @@ use std::collections::HashMap; -use std::io::ErrorKind; use std::sync::Arc; use clap::Parser; use tokio::io::{BufReader, BufWriter}; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::Mutex; -use minisql::interpreter::{Response, State}; -use minisql::response_writer::{CompleteStatus, ResponseWriter}; +use minisql::interpreter2::StateHandler; +use minisql::response_writer::ResponseWriter; use parser::parse_and_validate; use proto::handshake::errors::ServerHandshakeError; use proto::handshake::request::HandshakeRequest; @@ -22,23 +21,21 @@ use proto::writer::protowriter::{ProtoFlush, ProtoWriter}; use crate::cancellation::ResetCancelToken; use crate::config::Configuration; -use crate::persistence::state_to_file; use crate::proto_wrapper::ServerProtoWrapper; mod cancellation; mod config; -mod persistence; mod proto_wrapper; type TokenStore = Arc>>; -type SharedDbState = Arc>; +type SharedDbState = Arc; #[tokio::main] async fn main() -> anyhow::Result<()> { let config = Configuration::parse(); let config = Arc::new(config); - let state = Arc::new(RwLock::new(get_state(&config).await?)); + let state = Arc::new(get_state(&config).await?); let tokens = Arc::new(Mutex::new(HashMap::<(i32, i32), ResetCancelToken>::new())); let addr = config.get_socket_address(); @@ -59,15 +56,12 @@ async fn main() -> anyhow::Result<()> { } } -async fn get_state(config: &Configuration) -> anyhow::Result { - let result = persistence::state_from_file(config.get_file_path()).await; - match result { - Err(ref e) if e.kind() == ErrorKind::NotFound => { - println!("WARNING: No DB state file found, creating new one"); - Ok(State::new()) - } - Err(e) => Err(e)?, - Ok(state) => Ok(state), +async fn get_state(config: &Configuration) -> anyhow::Result { + let path = config.get_folder_path(); + if StateHandler::is_existing_db(&path) { + Ok(StateHandler::connect(path.clone()).await?) + } else { + Ok(StateHandler::new(path.clone()).await?) } } @@ -191,75 +185,11 @@ where token.reset(); let operation = { - let state = state.read().await; - let db_schema = state.db_schema(); + let db_schema = state.read_state().await.db_schema(); parse_and_validate(query, &db_schema)? }; - let need_write = { - let mut state = state.write().await; - let response = state.interpret(operation)?; - - match response { - Response::Deleted(i) => { - writer - .write_command_complete(CompleteStatus::Delete(i)) - .await?; - true - } - Response::Inserted => { - writer - .write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }) - .await?; - true - } - Response::Selected(schema, columns, mut rows) => { - writer.write_table_header(schema, &columns).await?; - match rows.next() { - Some(row) => { - writer.write_table_row(&row).await?; - - let mut sent_rows = 1; - for row in rows { - sent_rows += 1; - writer.write_table_row(&row).await?; - if token.is_canceled() { - token.reset(); - break; - } - } - - writer - .write_command_complete(CompleteStatus::Select(sent_rows)) - .await?; - } - _ => { - writer - .write_command_complete(CompleteStatus::Select(0)) - .await?; - } - } - false - } - Response::TableCreated => { - writer - .write_command_complete(CompleteStatus::CreateTable) - .await?; - true - } - Response::IndexCreated => { - writer - .write_command_complete(CompleteStatus::CreateIndex) - .await?; - true - } - } - }; - - if need_write { - let state = state.read().await; - state_to_file(&state, config.get_file_path()).await?; - } - + // TODO: PASS DOWN RESET CANCEL TOKEN + state.interpret(writer, operation).await?; Ok(()) } diff --git a/server/src/persistence.rs b/server/src/persistence.rs deleted file mode 100644 index 12479c2..0000000 --- a/server/src/persistence.rs +++ /dev/null @@ -1,15 +0,0 @@ -use minisql::interpreter::State; -use std::path::PathBuf; -use tokio::{fs, io}; - -pub async fn state_from_file(path: &PathBuf) -> io::Result { - let content = fs::read_to_string(path).await?; - let state = serde_json::from_str(&content)?; - Ok(state) -} - -pub async fn state_to_file(state: &State, path: &PathBuf) -> io::Result<()> { - let content = serde_json::to_string(state)?; - fs::write(path, content).await?; - Ok(()) -}