feat: metadata serialization
This commit is contained in:
parent
610d70378e
commit
9b9f9f16f6
6 changed files with 54 additions and 19 deletions
5
Cargo.lock
generated
5
Cargo.lock
generated
|
|
@ -289,6 +289,7 @@ dependencies = [
|
|||
"bincode",
|
||||
"proto",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"storage_engine",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
|
|
@ -557,9 +558,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "serde_json"
|
||||
version = "1.0.112"
|
||||
version = "1.0.113"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4d1bd37ce2324cf3bf85e5a25f96eb4baf0d5aa6eba43e7ae8958870c4ec48ed"
|
||||
checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79"
|
||||
dependencies = [
|
||||
"itoa",
|
||||
"ryu",
|
||||
|
|
|
|||
|
|
@ -11,8 +11,9 @@ anyhow = "1.0.79"
|
|||
async-trait = "0.1.77"
|
||||
bimap = { version = "0.6.3", features = ["serde"] }
|
||||
bincode = "2.0.0-rc.3"
|
||||
serde = { version = "1.0.196", features = ["derive"] }
|
||||
serde = { version = "1.0.196", features = ["derive", "rc"] }
|
||||
tokio = { version = "1.34.0", features = ["full"] }
|
||||
thiserror = "1.0.50"
|
||||
proto = { path = "../proto" }
|
||||
storage_engine = { path = "../storage_engine" }
|
||||
serde_json = "1.0.113"
|
||||
|
|
|
|||
|
|
@ -14,11 +14,13 @@ pub enum RuntimeError {
|
|||
#[error("table {0} already indexes column {1}")]
|
||||
AttemptToIndexAlreadyIndexedColumn(TableName, ColumnName),
|
||||
#[error("File-System Error: {0}")]
|
||||
IoError(std::io::Error),
|
||||
IoError(#[from] std::io::Error),
|
||||
#[error("Storage Engine error for table {0}: {1}")]
|
||||
StorageEngineError(TableName, storage_engine::error::Error),
|
||||
#[error("runtime anyhow error: {0}")]
|
||||
AnyhowError(anyhow::Error)
|
||||
AnyhowError(#[from] anyhow::Error),
|
||||
#[error("serde error: {0}")]
|
||||
SerdeError(#[from] serde_json::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
|
|
@ -32,10 +34,3 @@ pub enum TypeConversionError {
|
|||
#[error("unknown type with oid {oid} and size {size}")]
|
||||
UnknownType { oid: PgOid, size: i16 },
|
||||
}
|
||||
|
||||
|
||||
impl From<anyhow::Error> for RuntimeError {
|
||||
fn from(e: anyhow::Error) -> RuntimeError {
|
||||
Self::AnyhowError(e)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ use std::path::{Path, PathBuf};
|
|||
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use std::rc::Rc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::fs;
|
||||
|
||||
use storage_engine::store::Store;
|
||||
|
|
@ -18,11 +19,14 @@ use storage_engine::cursor::{ReadCursor, WriteCursor};
|
|||
use storage_engine::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use storage_engine::cursor_capabilities::index_access::CursorCanReadIndex;
|
||||
|
||||
const METADATA_FILE: &'static str = "metadata.json";
|
||||
|
||||
// ==============Interpreter================
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct State {
|
||||
table_name_position_mapping: BiMap<TableName, TablePosition>,
|
||||
table_schemas: Vec<Rc<TableSchema>>,
|
||||
#[serde(skip)]
|
||||
tables: Tables,
|
||||
}
|
||||
|
||||
|
|
@ -63,6 +67,19 @@ impl Table {
|
|||
Ok(table)
|
||||
}
|
||||
|
||||
async fn connect(table_schema: Rc<TableSchema>, db_path: &Path) -> DbResult<Self> {
|
||||
let table_folder_name = table_schema.table_name();
|
||||
let path_to_table_folder = db_path.join(table_folder_name);
|
||||
|
||||
let store: Store<Value> = Store::connect(&path_to_table_folder).await.map_err(|e| RuntimeError::StorageEngineError(table_schema.table_name().to_string(), e))?;
|
||||
|
||||
let table = Self {
|
||||
schema: table_schema,
|
||||
store,
|
||||
};
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
async fn read(&self) -> DbResult<ReadCursor<Value>> {
|
||||
let cursor = self.store.read_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.table_name().to_string(), e))?;
|
||||
Ok(cursor)
|
||||
|
|
@ -153,8 +170,29 @@ impl StateHandler {
|
|||
Ok(state)
|
||||
}
|
||||
|
||||
pub async fn connect(db_path: PathBuf) -> Self {
|
||||
todo!()
|
||||
pub async fn connect(db_path: PathBuf) -> DbResult<Self> {
|
||||
let metadata_file = db_path.join(METADATA_FILE);
|
||||
let metadata_raw = fs::read_to_string(metadata_file).await?;
|
||||
let mut metadata: State = serde_json::from_str(&metadata_raw)?;
|
||||
|
||||
let mut tables = Vec::with_capacity(metadata.table_schemas.len());
|
||||
for table_schema in &metadata.table_schemas {
|
||||
let table = Table::connect(table_schema.clone(), &db_path).await?;
|
||||
tables.push(RwLock::new(table));
|
||||
}
|
||||
|
||||
metadata.tables = tables;
|
||||
Ok(Self {
|
||||
db_path,
|
||||
state: RwLock::new(metadata),
|
||||
})
|
||||
}
|
||||
|
||||
async fn save_metadata(&self) -> DbResult<()> {
|
||||
let metadata_file = self.db_path.join(METADATA_FILE);
|
||||
let state = self.state.read().await;
|
||||
let metadata_raw = serde_json::to_string(&*state)?;
|
||||
fs::write(metadata_file, metadata_raw).await.map_err(|e| RuntimeError::IoError(e))
|
||||
}
|
||||
|
||||
pub async fn interpret<Writer: ResponseWriter>(&self, response_writer: &mut Writer, operation: Operation) -> DbResult<()> {
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ pub struct StoreHeader {
|
|||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct StoreHeaderFixedPart {
|
||||
pub table_folder: String, // This one is not encoded into the file
|
||||
pub table_folder: PathBuf, // This one is not encoded into the file
|
||||
|
||||
pub number_of_columns: usize,
|
||||
pub deleted_count: usize,
|
||||
|
|
@ -64,7 +64,7 @@ impl StoreHeader {
|
|||
vec![0; Self::indexed_columns_size(header.number_of_columns)]
|
||||
}
|
||||
|
||||
pub async fn decode_fixed(table_folder: &str, result: &[u8]) -> Result<StoreHeaderFixedPart> {
|
||||
pub async fn decode_fixed(table_folder: &PathBuf, result: &[u8]) -> Result<StoreHeaderFixedPart> {
|
||||
let (number_of_columns, _) =
|
||||
decode::<usize>(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?;
|
||||
|
|
@ -78,7 +78,7 @@ impl StoreHeader {
|
|||
decode::<Column>(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?;
|
||||
let header = StoreHeaderFixedPart {
|
||||
table_folder: table_folder.to_string(),
|
||||
table_folder: table_folder.clone(),
|
||||
number_of_columns,
|
||||
deleted_count,
|
||||
total_count,
|
||||
|
|
|
|||
|
|
@ -120,7 +120,7 @@ impl <T>Store<T> {
|
|||
Ok(file)
|
||||
}
|
||||
|
||||
pub async fn connect(table_folder: &str) -> Result<Self>
|
||||
pub async fn connect(table_folder: &PathBuf) -> Result<Self>
|
||||
where T: std::fmt::Debug + Encode + Decode + Ord
|
||||
{
|
||||
let path_to_table = Path::new(table_folder);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue