From 9b9f9f16f66d034d83209b2d7e89a83545f8f91e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jind=C5=99ich=20Moravec?= Date: Mon, 5 Feb 2024 21:36:58 +0100 Subject: [PATCH] feat: metadata serialization --- Cargo.lock | 5 ++- minisql/Cargo.toml | 3 +- minisql/src/error.rs | 13 ++---- minisql/src/interpreter2.rs | 44 +++++++++++++++++++-- storage_engine/src/segments/store_header.rs | 6 +-- storage_engine/src/store.rs | 2 +- 6 files changed, 54 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b653809..42c5df8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -289,6 +289,7 @@ dependencies = [ "bincode", "proto", "serde", + "serde_json", "storage_engine", "thiserror", "tokio", @@ -557,9 +558,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.112" +version = "1.0.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d1bd37ce2324cf3bf85e5a25f96eb4baf0d5aa6eba43e7ae8958870c4ec48ed" +checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" dependencies = [ "itoa", "ryu", diff --git a/minisql/Cargo.toml b/minisql/Cargo.toml index ca06eb6..4029a5c 100644 --- a/minisql/Cargo.toml +++ b/minisql/Cargo.toml @@ -11,8 +11,9 @@ anyhow = "1.0.79" async-trait = "0.1.77" bimap = { version = "0.6.3", features = ["serde"] } bincode = "2.0.0-rc.3" -serde = { version = "1.0.196", features = ["derive"] } +serde = { version = "1.0.196", features = ["derive", "rc"] } tokio = { version = "1.34.0", features = ["full"] } thiserror = "1.0.50" proto = { path = "../proto" } storage_engine = { path = "../storage_engine" } +serde_json = "1.0.113" diff --git a/minisql/src/error.rs b/minisql/src/error.rs index db2d416..03b772f 100644 --- a/minisql/src/error.rs +++ b/minisql/src/error.rs @@ -14,11 +14,13 @@ pub enum RuntimeError { #[error("table {0} already indexes column {1}")] AttemptToIndexAlreadyIndexedColumn(TableName, ColumnName), #[error("File-System Error: {0}")] - IoError(std::io::Error), + IoError(#[from] std::io::Error), #[error("Storage Engine error for table {0}: {1}")] StorageEngineError(TableName, storage_engine::error::Error), #[error("runtime anyhow error: {0}")] - AnyhowError(anyhow::Error) + AnyhowError(#[from] anyhow::Error), + #[error("serde error: {0}")] + SerdeError(#[from] serde_json::Error), } #[derive(Debug, Error)] @@ -32,10 +34,3 @@ pub enum TypeConversionError { #[error("unknown type with oid {oid} and size {size}")] UnknownType { oid: PgOid, size: i16 }, } - - -impl From for RuntimeError { - fn from(e: anyhow::Error) -> RuntimeError { - Self::AnyhowError(e) - } -} diff --git a/minisql/src/interpreter2.rs b/minisql/src/interpreter2.rs index 7004b1f..cb4eea1 100644 --- a/minisql/src/interpreter2.rs +++ b/minisql/src/interpreter2.rs @@ -11,6 +11,7 @@ use std::path::{Path, PathBuf}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::rc::Rc; +use serde::{Deserialize, Serialize}; use tokio::fs; use storage_engine::store::Store; @@ -18,11 +19,14 @@ use storage_engine::cursor::{ReadCursor, WriteCursor}; use storage_engine::cursor_capabilities::traversal::CursorCanTraverse; use storage_engine::cursor_capabilities::index_access::CursorCanReadIndex; +const METADATA_FILE: &'static str = "metadata.json"; + // ==============Interpreter================ -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct State { table_name_position_mapping: BiMap, table_schemas: Vec>, + #[serde(skip)] tables: Tables, } @@ -63,6 +67,19 @@ impl Table { Ok(table) } + async fn connect(table_schema: Rc, db_path: &Path) -> DbResult { + let table_folder_name = table_schema.table_name(); + let path_to_table_folder = db_path.join(table_folder_name); + + let store: Store = Store::connect(&path_to_table_folder).await.map_err(|e| RuntimeError::StorageEngineError(table_schema.table_name().to_string(), e))?; + + let table = Self { + schema: table_schema, + store, + }; + Ok(table) + } + async fn read(&self) -> DbResult> { let cursor = self.store.read_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.table_name().to_string(), e))?; Ok(cursor) @@ -153,8 +170,29 @@ impl StateHandler { Ok(state) } - pub async fn connect(db_path: PathBuf) -> Self { - todo!() + pub async fn connect(db_path: PathBuf) -> DbResult { + let metadata_file = db_path.join(METADATA_FILE); + let metadata_raw = fs::read_to_string(metadata_file).await?; + let mut metadata: State = serde_json::from_str(&metadata_raw)?; + + let mut tables = Vec::with_capacity(metadata.table_schemas.len()); + for table_schema in &metadata.table_schemas { + let table = Table::connect(table_schema.clone(), &db_path).await?; + tables.push(RwLock::new(table)); + } + + metadata.tables = tables; + Ok(Self { + db_path, + state: RwLock::new(metadata), + }) + } + + async fn save_metadata(&self) -> DbResult<()> { + let metadata_file = self.db_path.join(METADATA_FILE); + let state = self.state.read().await; + let metadata_raw = serde_json::to_string(&*state)?; + fs::write(metadata_file, metadata_raw).await.map_err(|e| RuntimeError::IoError(e)) } pub async fn interpret(&self, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { diff --git a/storage_engine/src/segments/store_header.rs b/storage_engine/src/segments/store_header.rs index 77d6de6..b8bdfd5 100644 --- a/storage_engine/src/segments/store_header.rs +++ b/storage_engine/src/segments/store_header.rs @@ -17,7 +17,7 @@ pub struct StoreHeader { #[derive(Debug, Clone)] pub struct StoreHeaderFixedPart { - pub table_folder: String, // This one is not encoded into the file + pub table_folder: PathBuf, // This one is not encoded into the file pub number_of_columns: usize, pub deleted_count: usize, @@ -64,7 +64,7 @@ impl StoreHeader { vec![0; Self::indexed_columns_size(header.number_of_columns)] } - pub async fn decode_fixed(table_folder: &str, result: &[u8]) -> Result { + pub async fn decode_fixed(table_folder: &PathBuf, result: &[u8]) -> Result { let (number_of_columns, _) = decode::(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; @@ -78,7 +78,7 @@ impl StoreHeader { decode::(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; let header = StoreHeaderFixedPart { - table_folder: table_folder.to_string(), + table_folder: table_folder.clone(), number_of_columns, deleted_count, total_count, diff --git a/storage_engine/src/store.rs b/storage_engine/src/store.rs index cf741a8..bb256ff 100644 --- a/storage_engine/src/store.rs +++ b/storage_engine/src/store.rs @@ -120,7 +120,7 @@ impl Store { Ok(file) } - pub async fn connect(table_folder: &str) -> Result + pub async fn connect(table_folder: &PathBuf) -> Result where T: std::fmt::Debug + Encode + Decode + Ord { let path_to_table = Path::new(table_folder);