Merge branch 'interpreter-to-storage-engine' into interpreter2-misc
# Conflicts: # minisql/src/interpreter2.rs
This commit is contained in:
commit
d5140ea814
5 changed files with 104 additions and 21 deletions
|
|
@ -15,9 +15,10 @@ use serde::{Deserialize, Serialize};
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
|
|
||||||
use storage_engine::store::Store;
|
use storage_engine::store::Store;
|
||||||
|
use storage_engine::segments::entry::Entry;
|
||||||
use storage_engine::cursor::{ReadCursor, WriteCursor};
|
use storage_engine::cursor::{ReadCursor, WriteCursor};
|
||||||
use storage_engine::cursor_capabilities::traversal::CursorCanTraverse;
|
use storage_engine::cursor_capabilities::traversal::CursorCanTraverse;
|
||||||
use storage_engine::cursor_capabilities::index_access::CursorCanReadIndex;
|
use storage_engine::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex};
|
||||||
use crate::cancellation::Cancellation;
|
use crate::cancellation::Cancellation;
|
||||||
|
|
||||||
const METADATA_FILE: &'static str = "metadata.json";
|
const METADATA_FILE: &'static str = "metadata.json";
|
||||||
|
|
@ -172,6 +173,19 @@ impl State {
|
||||||
|
|
||||||
Ok(count)
|
Ok(count)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn delete_all_rows(table_name: String, mut cursor: WriteCursor<'_, Value>) -> DbResult<usize> {
|
||||||
|
let count = cursor.delete_all_entries(true)
|
||||||
|
.await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?;
|
||||||
|
Ok(count)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn delete_all_eq(table_name: String, mut cursor: WriteCursor<'_, Value>, eq_column: Column, value: Value) -> DbResult<usize> {
|
||||||
|
let count =
|
||||||
|
cursor.delete_entries_where_eq(eq_column as storage_engine::store::Column, &value, true)
|
||||||
|
.await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?;
|
||||||
|
Ok(count)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StateHandler {
|
impl StateHandler {
|
||||||
|
|
@ -238,14 +252,28 @@ impl StateHandler {
|
||||||
}
|
}
|
||||||
Insert(table_position, values) => {
|
Insert(table_position, values) => {
|
||||||
let state = self.state.read().await;
|
let state = self.state.read().await;
|
||||||
todo!()
|
|
||||||
|
let mut table = state.table_at_mut(table_position).await;
|
||||||
|
let mut cursor = table.write().await?;
|
||||||
|
|
||||||
|
let entry = Entry::new(values);
|
||||||
|
cursor.insert_entry(entry).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?;
|
||||||
|
|
||||||
|
response_writer.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }).await.map_err(|e| RuntimeError::AnyhowError(e))
|
||||||
}
|
}
|
||||||
Delete(table_position, maybe_condition) => {
|
Delete(table_position, maybe_condition) => {
|
||||||
let state = self.state.read().await;
|
let state = self.state.read().await;
|
||||||
|
|
||||||
let mut table = state.table_at_mut(table_position).await;
|
let mut table = state.table_at_mut(table_position).await;
|
||||||
|
let table_name = table.table_name().clone();
|
||||||
let cursor = table.write().await?;
|
let cursor = table.write().await?;
|
||||||
|
|
||||||
todo!()
|
let count = match maybe_condition {
|
||||||
|
None => State::delete_all_rows(table_name, cursor).await?,
|
||||||
|
Some(Condition::Eq(eq_column, value)) => State::delete_all_eq(table_name, cursor, eq_column, value).await?
|
||||||
|
};
|
||||||
|
|
||||||
|
response_writer.write_command_complete(CompleteStatus::Delete(count)).await.map_err(|e| RuntimeError::AnyhowError(e))
|
||||||
}
|
}
|
||||||
CreateTable(table_schema) => {
|
CreateTable(table_schema) => {
|
||||||
let mut state = self.state.write().await;
|
let mut state = self.state.write().await;
|
||||||
|
|
@ -255,7 +283,11 @@ impl StateHandler {
|
||||||
}
|
}
|
||||||
CreateIndex(table_position, column) => {
|
CreateIndex(table_position, column) => {
|
||||||
let state = self.state.read().await;
|
let state = self.state.read().await;
|
||||||
todo!()
|
|
||||||
|
let mut table = state.table_at_mut(table_position).await;
|
||||||
|
let mut cursor = table.write().await?;
|
||||||
|
cursor.attach_index(column as storage_engine::store::Column).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?;
|
||||||
|
response_writer.write_command_complete(CompleteStatus::CreateIndex).await.map_err(|e| RuntimeError::AnyhowError(e))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -280,7 +312,6 @@ mod tests {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
println!("CLEANING UP INTERPRETER STATE");
|
println!("CLEANING UP INTERPRETER STATE");
|
||||||
|
|
||||||
// TODO: This should be part of the state schema
|
|
||||||
let table_folder = "db-test-0";
|
let table_folder = "db-test-0";
|
||||||
// Seems no one has figured out how to do AsyncDrop yet.
|
// Seems no one has figured out how to do AsyncDrop yet.
|
||||||
std::fs::remove_dir_all(table_folder).unwrap();
|
std::fs::remove_dir_all(table_folder).unwrap();
|
||||||
|
|
@ -310,6 +341,8 @@ mod tests {
|
||||||
.interpret(&mut response_writer, &DummyCancellation, Operation::CreateTable(users_schema.clone())).await
|
.interpret(&mut response_writer, &DummyCancellation, Operation::CreateTable(users_schema.clone())).await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
{
|
||||||
|
println!("==EMPTY SELECT===");
|
||||||
let users_position: TablePosition = 0;
|
let users_position: TablePosition = 0;
|
||||||
state
|
state
|
||||||
.interpret(&mut response_writer, &DummyCancellation, Operation::Select(
|
.interpret(&mut response_writer, &DummyCancellation, Operation::Select(
|
||||||
|
|
@ -318,6 +351,39 @@ mod tests {
|
||||||
None,
|
None,
|
||||||
)).await
|
)).await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let users = 0;
|
||||||
|
let (id, name, age) = (
|
||||||
|
Value::Uuid(0),
|
||||||
|
Value::String("Plato".to_string()),
|
||||||
|
Value::Int(64),
|
||||||
|
);
|
||||||
|
|
||||||
|
println!("About to insert!");
|
||||||
|
|
||||||
|
state
|
||||||
|
.interpret(&mut response_writer, &DummyCancellation, Operation::Insert(
|
||||||
|
users,
|
||||||
|
vec![id.clone(), name.clone(), age.clone()],
|
||||||
|
)).await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
{
|
||||||
|
println!("==SELECT===");
|
||||||
|
let users_position: TablePosition = 0;
|
||||||
|
state
|
||||||
|
.interpret(&mut response_writer, &DummyCancellation, Operation::Select(
|
||||||
|
users_position,
|
||||||
|
users_schema.all_selection(),
|
||||||
|
None,
|
||||||
|
)).await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// assert!(false);
|
// assert!(false);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -65,7 +65,7 @@ impl ResponseWriter for ResponseWriterStub
|
||||||
|
|
||||||
async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()> {
|
async fn write_table_row(&mut self, row: &RestrictedRow) -> anyhow::Result<()> {
|
||||||
for (_, value) in row.iter() {
|
for (_, value) in row.iter() {
|
||||||
print!("{:?}", value)
|
print!("{:?}, ", value)
|
||||||
}
|
}
|
||||||
println!();
|
println!();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -59,10 +59,16 @@ impl TableSchema {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_columns(&self) -> Vec<&ColumnName> {
|
pub fn get_columns(&self) -> Vec<&ColumnName> {
|
||||||
self.column_name_position_mapping
|
let mut columns_in_random_order: Vec<_> = self.column_name_position_mapping
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(name, _)| name)
|
.collect();
|
||||||
.collect()
|
columns_in_random_order.sort_by(|&(_, column0), &(_, column1)| column0.cmp(column1));
|
||||||
|
|
||||||
|
let columns: Vec<_> = columns_in_random_order
|
||||||
|
.iter()
|
||||||
|
.map(|(name, _)| *name)
|
||||||
|
.collect();
|
||||||
|
columns
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn does_column_exist(&self, column_name: &ColumnName) -> bool {
|
pub fn does_column_exist(&self, column_name: &ColumnName) -> bool {
|
||||||
|
|
|
||||||
|
|
@ -244,6 +244,21 @@ impl <'cursor, T> WriteCursor<'cursor, T>
|
||||||
Ok(count)
|
Ok(count)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn delete_all_entries(&mut self, enable_garbage_collector: bool) -> Result<usize>
|
||||||
|
where T: Encode + Decode + Ord + Send + Sync + Clone
|
||||||
|
{
|
||||||
|
let mut count = 0;
|
||||||
|
while let Some(entry) = self.next_alive().await? {
|
||||||
|
count += 1;
|
||||||
|
self.mark_deleted_at(entry.file_position, false).await?
|
||||||
|
}
|
||||||
|
|
||||||
|
if enable_garbage_collector {
|
||||||
|
self.attempt_garbage_collection_if_necessary().await?;
|
||||||
|
}
|
||||||
|
Ok(count)
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result<usize>
|
pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result<usize>
|
||||||
where T: Encode + Decode + Ord + Send + Sync + Clone
|
where T: Encode + Decode + Ord + Send + Sync + Clone
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -23,10 +23,6 @@ impl <T>Entry<T> {
|
||||||
Self { header: EntryHeader { is_deleted: false }, data }
|
Self { header: EntryHeader { is_deleted: false }, data }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_deleted(data: Vec<T>) -> Self {
|
|
||||||
Self { header: EntryHeader { is_deleted: true}, data }
|
|
||||||
}
|
|
||||||
|
|
||||||
// FORMAT: [EntryHeaderWithDataSize, ..sequence of data]
|
// FORMAT: [EntryHeaderWithDataSize, ..sequence of data]
|
||||||
pub fn encode(&self) -> Result<Vec<u8>>
|
pub fn encode(&self) -> Result<Vec<u8>>
|
||||||
where T: Encode
|
where T: Encode
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue