Introduce a global database lock (for table creation)

This commit is contained in:
Yuriy Dupyn 2024-02-05 20:31:43 +01:00
parent b884a6286d
commit 4c2e14acdc
2 changed files with 72 additions and 7 deletions

View file

@ -13,6 +13,8 @@ pub enum RuntimeError {
AttemptToIndexNonIndexableColumn(TableName, ColumnName),
#[error("table {0} already indexes column {1}")]
AttemptToIndexAlreadyIndexedColumn(TableName, ColumnName),
#[error("File-System Error: {0}")]
IoError(std::io::Error),
#[error("Storage Engine error for table {0}: {1}")]
StorageEngineError(TableName, storage_engine::error::Error),
#[error("runtime anyhow error: {0}")]

View file

@ -7,7 +7,10 @@ use crate::response_writer::{ResponseWriter, CompleteStatus};
use crate::internals::row::Row;
use bimap::BiMap;
use std::path::Path;
use std::path::{Path, PathBuf};
use tokio::sync::RwLock;
use tokio::fs;
use storage_engine::store::Store;
use storage_engine::cursor::{ReadCursor, WriteCursor};
@ -21,6 +24,12 @@ pub struct State {
tables: Tables,
}
pub struct StateHandler {
db_path: PathBuf,
state: RwLock<State>,
}
pub type Tables = Vec<Table>;
#[derive(Debug)]
@ -166,6 +175,61 @@ impl State {
}
}
impl StateHandler {
pub async fn new(db_path: PathBuf) -> DbResult<Self> {
fs::create_dir(db_path.clone()).await.map_err(|e| RuntimeError::IoError(e))?;
let state = Self {
db_path,
state: RwLock::new(State::new()),
};
Ok(state)
}
pub async fn connect(db_path: PathBuf) -> Self {
todo!()
}
pub async fn interpret<Writer: ResponseWriter>(&self, response_writer: &mut Writer, operation: Operation) -> DbResult<()> {
use Operation::*;
match operation {
Select(table_position, column_selection, maybe_condition) => {
let state = self.state.read().await;
let table: &Table = state.table_at(table_position);
let cursor = table.read().await?;
response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?;
let count = match maybe_condition {
None => State::select_all_rows(&table, cursor, response_writer, column_selection).await?,
Some(Condition::Eq(eq_column, value)) => State::select_eq(&table, cursor, response_writer, column_selection, eq_column, value).await?
};
response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e))
}
Insert(table_position, values) => {
let state = self.state.read().await;
todo!()
}
Delete(table_position, maybe_condition) => {
let state = self.state.read().await;
todo!()
}
CreateTable(table_schema) => {
let mut state = self.state.write().await;
let table = Table::new(table_schema, &self.db_path).await?;
state.attach_table(table).await;
response_writer.write_command_complete(CompleteStatus::CreateTable).await.map_err(|e| RuntimeError::AnyhowError(e))
}
CreateIndex(table_position, column) => {
let state = self.state.read().await;
todo!()
}
}
}
}
@ -202,22 +266,21 @@ mod tests {
#[tokio::test]
async fn test_table_creation() {
let mut state = State::new();
let db_path = Path::new("db-test-0");
let state = StateHandler::new(db_path.to_path_buf()).await.unwrap();
let users_schema = users_schema();
let users = users_schema.table_name().clone();
let db_path = Path::new("db-test-0");
fs::create_dir(db_path).await.unwrap();
let mut response_writer = ResponseWriterStub::new();
state
.interpret(db_path, &mut response_writer, Operation::CreateTable(users_schema.clone())).await
.interpret(&mut response_writer, Operation::CreateTable(users_schema.clone())).await
.unwrap();
let users_position: TablePosition = 0;
state
.interpret(db_path, &mut response_writer, Operation::Select(
.interpret(&mut response_writer, Operation::Select(
users_position,
users_schema.all_selection(),
None,