Create table partial implementation

This commit is contained in:
Yuriy Dupyn 2024-02-05 19:27:46 +01:00
parent f973df2ca2
commit 2dd0555174
5 changed files with 98 additions and 32 deletions

View file

@ -1,13 +1,13 @@
use crate::operation::{ColumnSelection, Condition, Operation};
use crate::restricted_row::RestrictedRow;
use crate::result::DbResult;
use crate::schema::{Column, TableName, TablePosition, TableSchema};
use crate::type_system::Value;
use crate::error::RuntimeError;
use crate::response_writer::ResponseWriter;
use crate::response_writer::{ResponseWriter, CompleteStatus};
use crate::internals::row::Row;
use bimap::BiMap;
use std::path::Path;
use storage_engine::store::Store;
use storage_engine::cursor::{ReadCursor, WriteCursor};
@ -38,8 +38,23 @@ impl Default for State {
}
impl Table {
async fn new(table_schema: TableSchema, db_path: &Path) -> DbResult<Self> {
let table_folder_name = table_schema.table_name();
let path_to_table_folder = db_path.join(table_folder_name);
let number_of_columns = table_schema.number_of_columns();
let primary_column = table_schema.primary_column() as storage_engine::store::Column;
let store: Store<Value> = Store::new(&path_to_table_folder, number_of_columns, primary_column).await.unwrap();
let table = Self {
schema: table_schema,
store,
};
Ok(table)
}
async fn read(&self) -> DbResult<ReadCursor<Value>> {
let cursor = self.store.read_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.schema.table_name().to_string(), e))?;
let cursor = self.store.read_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.table_name().to_string(), e))?;
Ok(cursor)
}
@ -47,6 +62,14 @@ impl Table {
let cursor = self.store.write_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.schema.table_name().to_string(), e))?;
Ok(cursor)
}
pub fn schema(&self) -> &TableSchema {
&self.schema
}
pub fn table_name(&self) -> &TableName {
self.schema.table_name()
}
}
impl State {
@ -58,14 +81,12 @@ impl State {
}
pub fn db_schema(&self) -> DbSchema {
// let mut schema: DbSchema = Vec::new();
// for (table_name, &table_position) in &self.table_name_position_mapping {
// let table_schema = self.tables[table_position].schema();
// schema.push((table_name.clone(), table_position, table_schema));
// }
// schema
todo!()
let mut schema: DbSchema = Vec::new();
for (table_name, &table_position) in &self.table_name_position_mapping {
let table_schema: &TableSchema = self.tables[table_position].schema();
schema.push((table_name.clone(), table_position, table_schema));
}
schema
}
fn table_at(&self, table_position: TablePosition) -> &Table {
@ -76,22 +97,32 @@ impl State {
&mut self.tables[table_position]
}
async fn attach_table(&mut self, table: Table) {
// TODO: You need to update the global DB SCHEMA!
let new_table_position: TablePosition = self.tables.len();
self.table_name_position_mapping
.insert(table.schema().table_name().clone(), new_table_position);
self.tables.push(table);
}
async fn select_all_rows<Writer: ResponseWriter>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection) -> DbResult<()> {
while let Some(entry) = cursor.next_alive().await.map_err(|e| RuntimeError::StorageEngineError(table.schema.table_name().to_string(), e))? {
while let Some(entry) = cursor.next_alive().await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))? {
let row: Row = From::from(entry);
let restricted_row = row.restrict_columns(&column_selection);
response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?;
}
// TODO: You need to write COmmand complete
Ok(())
}
async fn select_eq<Writer: ResponseWriter>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut Writer, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult<()> {
let entries = cursor.select_entries_where_eq(column as storage_engine::store::Column, &value).await.map_err(|e| RuntimeError::StorageEngineError(table.schema.table_name().to_string(), e))?;
let entries = cursor.select_entries_where_eq(column as storage_engine::store::Column, &value).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?;
for entry in entries {
let row: Row = From::from(entry);
let restricted_row = row.restrict_columns(&column_selection);
response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?;
}
// TODO: You need to write COmmand complete
Ok(())
}
@ -125,7 +156,12 @@ impl State {
todo!()
}
CreateTable(table_schema) => {
todo!()
// TODO: This needs to be given from a config
let db_path = "test_db";
let table = Table::new(table_schema, Path::new(db_path)).await?;
self.attach_table(table).await;
response_writer.write_command_complete(CompleteStatus::CreateTable).await.map_err(|e| RuntimeError::AnyhowError(e))
}
CreateIndex(table_position, column) => {
todo!()
@ -140,9 +176,35 @@ impl State {
#[cfg(test)]
mod tests {
use super::*;
use crate::operation::Operation;
use crate::schema::Column;
use crate::type_system::{DbType, IndexableValue, Value};
use std::collections::HashSet;
#[tokio::test]
async fn new_state() {
}
fn users_schema() -> TableSchema {
TableSchema::new(
"users".to_string(),
"id".to_string(),
vec!["id".to_string(), "name".to_string(), "age".to_string()],
vec![DbType::Uuid, DbType::String, DbType::Int],
)
}
#[test]
fn test_table_creation() {
let mut state = State::new();
let users_schema = users_schema();
let users = users_schema.table_name().clone();
// state
// .interpret(Operation::CreateTable(users_schema))
// .unwrap();
// assert!(state.tables.len() == 1);
// let table = &state.tables[0];
// assert!(table.rows().len() == 0);
// assert!(table.table_name() == &users);
}
}

View file

@ -50,6 +50,10 @@ impl TableSchema {
&self.table_name
}
pub fn primary_column(&self) -> Column {
self.primary_key
}
pub fn column_type(&self, column: Column) -> DbType {
self.types[column].clone()
}

View file

@ -364,7 +364,7 @@ impl <'cursor, T> WriteCursor<'cursor, T>
async fn spawn_cursor_to_intermediate_file(&self) -> Result<AppendOnlyCursor<T>>
where T: Send
{
let table_folder = self.header.table_folder.to_string();
let table_folder = self.header.table_folder.clone();
let path_to_table = Path::new(&table_folder);
let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);

View file

@ -2,10 +2,11 @@ use crate::binary_coding::{encode, encode_sequence, decode, decode_sequence};
use crate::store::{Result, Column};
use crate::error::{Error, DecodeErrorKind};
use std::mem::size_of;
use std::path::PathBuf;
#[derive(Debug, Clone)]
pub struct StoreHeader {
pub table_folder: String, // This one is not encoded into the file
pub table_folder: PathBuf, // This one is not encoded into the file
pub number_of_columns: usize,
pub deleted_count: usize,
@ -93,7 +94,7 @@ impl StoreHeader {
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderIndexedColumns, e))?;
Ok(StoreHeader {
table_folder: header.table_folder,
table_folder: header.table_folder.into(),
number_of_columns: header.number_of_columns,
deleted_count: header.deleted_count,
total_count: header.total_count,

View file

@ -36,10 +36,9 @@ pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_i
impl <T>Store<T> {
// ===Creation===
pub async fn new(table_folder: &str, number_of_columns: usize, primary_column: Column) -> Result<Self>
pub async fn new(path_to_table: &Path, number_of_columns: usize, primary_column: Column) -> Result<Self>
where T: Encode + Decode + Ord
{
let path_to_table = Path::new(table_folder);
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
DirBuilder::new()
.create(path_to_table).await?;
@ -48,7 +47,7 @@ impl <T>Store<T> {
let mut indexed_columns = vec![false; number_of_columns];
indexed_columns[primary_column as usize] = true;
StoreHeader {
table_folder: table_folder.to_string(),
table_folder: path_to_table.to_path_buf(),
number_of_columns,
deleted_count: 0,
total_count: 0,
@ -225,7 +224,7 @@ mod tests {
async fn test_create() {
type Data = u32;
let table_path = "test_table_0";
let table_path = Path::new("test_table_0");
let number_of_columns = 5;
let primary_column = 0;
let store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
@ -240,7 +239,7 @@ mod tests {
async fn test_insert() {
type Data = u32;
let table_path = "test_table_1";
let table_path = Path::new("test_table_1");
let number_of_columns = 5;
let primary_column = 0;
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
@ -262,7 +261,7 @@ mod tests {
async fn test_select_next() {
type Data = u32;
let table_path = "test_table_2";
let table_path = Path::new("test_table_2");
let number_of_columns = 5;
let primary_column = 0;
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
@ -294,7 +293,7 @@ mod tests {
async fn test_select_all() {
type Data = u32;
let table_path = "test_table_3";
let table_path = Path::new("test_table_3");
let number_of_columns = 5;
let primary_column = 0;
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
@ -329,7 +328,7 @@ mod tests {
async fn test_select_eq() {
type Data = u32;
let table_path = "test_table_4";
let table_path = Path::new("test_table_4");
let number_of_columns = 5;
let primary_column = 0;
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
@ -369,7 +368,7 @@ mod tests {
async fn test_select_eq_indexed() {
type Data = u32;
let table_path = "test_table_5";
let table_path = Path::new("test_table_5");
let number_of_columns = 5;
let primary_column = 0;
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
@ -415,7 +414,7 @@ mod tests {
async fn test_delete_entry() {
type Data = u32;
let table_path = "test_table_6";
let table_path = Path::new("test_table_6");
let number_of_columns = 5;
let primary_column = 0;
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
@ -452,7 +451,7 @@ mod tests {
async fn test_delete_where_eq() {
type Data = u32;
let table_path = "test_table_7";
let table_path = Path::new("test_table_7");
let number_of_columns = 5;
let primary_column = 0;
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
@ -496,7 +495,7 @@ mod tests {
async fn test_garbage_collection() {
type Data = u32;
let table_path = "test_table_8";
let table_path = Path::new("test_table_8");
let number_of_columns = 5;
let primary_column = 0;
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();