Make table schemas less painful

This commit is contained in:
Yuriy Dupyn 2024-02-05 21:02:30 +01:00
parent 4c2e14acdc
commit 84fc58471d

View file

@ -9,7 +9,8 @@ use crate::internals::row::Row;
use bimap::BiMap; use bimap::BiMap;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use tokio::sync::RwLock; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::rc::Rc;
use tokio::fs; use tokio::fs;
use storage_engine::store::Store; use storage_engine::store::Store;
@ -21,24 +22,24 @@ use storage_engine::cursor_capabilities::index_access::CursorCanReadIndex;
#[derive(Debug)] #[derive(Debug)]
pub struct State { pub struct State {
table_name_position_mapping: BiMap<TableName, TablePosition>, table_name_position_mapping: BiMap<TableName, TablePosition>,
table_schemas: Vec<Rc<TableSchema>>,
tables: Tables, tables: Tables,
} }
pub struct StateHandler { pub struct StateHandler {
db_path: PathBuf, db_path: PathBuf,
state: RwLock<State>, state: RwLock<State>,
} }
pub type Tables = Vec<Table>; pub type Tables = Vec<RwLock<Table>>;
#[derive(Debug)] #[derive(Debug)]
pub struct Table { pub struct Table {
schema: TableSchema, schema: Rc<TableSchema>,
store: Store<Value> store: Store<Value>
} }
pub type DbSchema<'a> = Vec<(TableName, TablePosition, &'a TableSchema)>; pub type DbSchema = Vec<(TableName, TablePosition, Rc<TableSchema>)>;
// To satisfy clippy. // To satisfy clippy.
impl Default for State { impl Default for State {
fn default() -> Self { fn default() -> Self {
@ -56,7 +57,7 @@ impl Table {
let store: Store<Value> = Store::new(&path_to_table_folder, number_of_columns, primary_column).await.unwrap(); let store: Store<Value> = Store::new(&path_to_table_folder, number_of_columns, primary_column).await.unwrap();
let table = Self { let table = Self {
schema: table_schema, schema: Rc::new(table_schema),
store, store,
}; };
Ok(table) Ok(table)
@ -72,8 +73,8 @@ impl Table {
Ok(cursor) Ok(cursor)
} }
pub fn schema(&self) -> &TableSchema { pub fn schema(&self) -> Rc<TableSchema> {
&self.schema self.schema.clone()
} }
pub fn table_name(&self) -> &TableName { pub fn table_name(&self) -> &TableName {
@ -85,6 +86,7 @@ impl State {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
table_name_position_mapping: BiMap::new(), table_name_position_mapping: BiMap::new(),
table_schemas: vec![],
tables: vec![], tables: vec![],
} }
} }
@ -92,18 +94,18 @@ impl State {
pub fn db_schema(&self) -> DbSchema { pub fn db_schema(&self) -> DbSchema {
let mut schema: DbSchema = Vec::new(); let mut schema: DbSchema = Vec::new();
for (table_name, &table_position) in &self.table_name_position_mapping { for (table_name, &table_position) in &self.table_name_position_mapping {
let table_schema: &TableSchema = self.tables[table_position].schema(); let table_schema: Rc<TableSchema> = self.table_schemas[table_position].clone();
schema.push((table_name.clone(), table_position, table_schema)); schema.push((table_name.clone(), table_position, table_schema));
} }
schema schema
} }
fn table_at(&self, table_position: TablePosition) -> &Table { async fn table_at(&self, table_position: TablePosition) -> RwLockReadGuard<Table> {
&self.tables[table_position] self.tables[table_position].read().await
} }
fn table_at_mut(&mut self, table_position: TablePosition) -> &mut Table { async fn table_at_mut(&self, table_position: TablePosition) -> RwLockWriteGuard<Table> {
&mut self.tables[table_position] self.tables[table_position].write().await
} }
async fn attach_table(&mut self, table: Table) { async fn attach_table(&mut self, table: Table) {
@ -111,7 +113,8 @@ impl State {
let new_table_position: TablePosition = self.tables.len(); let new_table_position: TablePosition = self.tables.len();
self.table_name_position_mapping self.table_name_position_mapping
.insert(table.schema().table_name().clone(), new_table_position); .insert(table.schema().table_name().clone(), new_table_position);
self.tables.push(table); self.table_schemas.push(table.schema());
self.tables.push(RwLock::new(table));
} }
async fn select_all_rows<Writer: ResponseWriter>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection) -> DbResult<usize> { async fn select_all_rows<Writer: ResponseWriter>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection) -> DbResult<usize> {
@ -138,41 +141,41 @@ impl State {
Ok(count) Ok(count)
} }
pub async fn interpret<Writer: ResponseWriter>(&mut self, db_path: &Path, response_writer: &mut Writer, operation: Operation) -> DbResult<()> { // pub async fn interpret<Writer: ResponseWriter>(&mut self, db_path: &Path, response_writer: &mut Writer, operation: Operation) -> DbResult<()> {
use Operation::*; // use Operation::*;
match operation { // match operation {
Select(table_position, column_selection, maybe_condition) => { // Select(table_position, column_selection, maybe_condition) => {
let table: &Table = self.table_at(table_position); // let table: &Table = self.table_at(table_position);
let cursor = table.read().await?; // let cursor = table.read().await?;
response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?; // response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?;
let count = match maybe_condition { // let count = match maybe_condition {
None => Self::select_all_rows(&table, cursor, response_writer, column_selection).await?, // None => Self::select_all_rows(&table, cursor, response_writer, column_selection).await?,
Some(Condition::Eq(eq_column, value)) => Self::select_eq(&table, cursor, response_writer, column_selection, eq_column, value).await? // Some(Condition::Eq(eq_column, value)) => Self::select_eq(&table, cursor, response_writer, column_selection, eq_column, value).await?
}; // };
response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) // response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e))
} // }
Insert(table_position, values) => { // Insert(table_position, values) => {
let table: &mut Table = self.table_at_mut(table_position); // let table: &mut Table = self.table_at_mut(table_position);
let cursor = table.write().await?; // let cursor = table.write().await?;
todo!() // todo!()
} // }
Delete(table_position, maybe_condition) => { // Delete(table_position, maybe_condition) => {
let table: &mut Table = self.table_at_mut(table_position); // let table: &mut Table = self.table_at_mut(table_position);
let cursor = table.write().await?; // let cursor = table.write().await?;
todo!() // todo!()
} // }
CreateTable(table_schema) => { // CreateTable(table_schema) => {
let table = Table::new(table_schema, Path::new(db_path)).await?; // let table = Table::new(table_schema, Path::new(db_path)).await?;
self.attach_table(table).await; // self.attach_table(table).await;
response_writer.write_command_complete(CompleteStatus::CreateTable).await.map_err(|e| RuntimeError::AnyhowError(e)) // response_writer.write_command_complete(CompleteStatus::CreateTable).await.map_err(|e| RuntimeError::AnyhowError(e))
} // }
CreateIndex(table_position, column) => { // CreateIndex(table_position, column) => {
todo!() // todo!()
} // }
} // }
} // }
} }
impl StateHandler { impl StateHandler {
@ -197,7 +200,7 @@ impl StateHandler {
Select(table_position, column_selection, maybe_condition) => { Select(table_position, column_selection, maybe_condition) => {
let state = self.state.read().await; let state = self.state.read().await;
let table: &Table = state.table_at(table_position); let table = state.table_at(table_position).await;
let cursor = table.read().await?; let cursor = table.read().await?;
response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?; response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?;