minisql/minisql/src/internals/table.rs
2024-01-20 15:08:18 +01:00

265 lines
8.7 KiB
Rust

use std::collections::{BTreeMap, HashMap, HashSet};
use crate::error::Error;
use crate::internals::column_index::ColumnIndex;
use crate::internals::row::{ColumnPosition, Row};
use crate::schema::{ColumnName, TableSchema, TableName};
use crate::result::DbResult;
use crate::type_system::{IndexableValue, Uuid, Value};
/// An in-memory table: its schema, its rows keyed by id, and the column indexes attached to it.
#[derive(Debug)]
pub struct Table {
/// Schema handed to `Table::new`.
schema: TableSchema,
/// Row storage, keyed by row id.
rows: Rows, // TODO: Consider wrapping this in a lock. Also consider if we need to have the
// same lock for both rows and indexes
/// Attached indexes, keyed by the position of the column each one covers.
indexes: HashMap<ColumnPosition, ColumnIndex>,
}
/// Row storage of a table: rows keyed by their `Uuid` id. Being a `BTreeMap`, it iterates in
/// ascending key order.
pub type Rows = BTreeMap<Uuid, Row>;
impl Table {
/// Creates a table for `table_schema` that holds no rows and has no indexes attached.
pub fn new(table_schema: TableSchema) -> Self {
    let rows = BTreeMap::new();
    let indexes = HashMap::new();
    Self {
        schema: table_schema,
        rows,
        indexes,
    }
}
/// Returns a reference to the table's schema.
pub fn schema(&self) -> &TableSchema {
&self.schema
}
/// Returns a reference to the table's row storage (rows keyed by id).
pub fn rows(&self) -> &Rows {
&self.rows
}
/// Returns a reference to the attached indexes, keyed by the position of the indexed column.
pub fn indexes(&self) -> &HashMap<ColumnPosition, ColumnIndex> {
&self.indexes
}
/// Returns the name of this table, as recorded in its schema.
pub fn table_name(&self) -> &TableName {
    // `TableSchema::table_name` already hands back a reference; borrowing it again is redundant.
    self.schema.table_name()
}
// ======Selection======
/// Returns a clone of the row stored under `id`, or `None` when there is no such row.
fn get_row_by_id(&self, id: Uuid) -> Option<Row> {
    let stored = self.rows.get(&id)?;
    Some(stored.clone())
}
/// Returns clones of the rows stored under `ids`. Ids without a matching row are skipped.
fn get_rows_by_ids(&self, ids: HashSet<Uuid>) -> Vec<Row> {
    let mut found = Vec::new();
    for id in ids {
        if let Some(row) = self.get_row_by_id(id) {
            found.push(row);
        }
    }
    found
}
/// Returns clones of all rows whose value at `column_position` equals `value`.
///
/// No index is consulted: every row of the table is inspected.
fn get_rows_by_value(&self, column_position: ColumnPosition, value: &Value) -> Vec<Row> {
    self.rows
        .values()
        // A row without a value at `column_position` yields `None` and therefore never matches.
        .filter(|row| row.get(column_position) == Some(value))
        .cloned()
        .collect()
}
/// Lazily yields every row of the table, each restricted to `selected_column_positions`.
pub fn select_all_rows<'a>(&'a self, selected_column_positions: Vec<ColumnPosition>) -> impl Iterator<Item=Row> + 'a {
    // The closure owns the column list so the returned iterator only borrows from `self`.
    let project = move |row: &Row| row.restrict_columns(&selected_column_positions);
    self.rows.values().map(project)
}
/// Yields the rows whose value at `column_position` equals `value`, each restricted to
/// `selected_column_positions`.
///
/// For an indexable `value` the ids are taken from `fetch_ids_from_index` when it can provide
/// them; otherwise (and for non-indexable values) all rows are scanned. The matching rows are
/// collected before the iterator is returned.
///
/// # Errors
/// Propagates any error returned by `fetch_ids_from_index`.
pub fn select_rows_where_eq<'a>(
    &'a self,
    selected_column_positions: Vec<ColumnPosition>,
    column_position: ColumnPosition,
    value: Value,
) -> DbResult<impl Iterator<Item=Row> + 'a> {
    let matching_rows: Vec<Row> = match value {
        Value::Indexable(indexable) => {
            match self.fetch_ids_from_index(column_position, &indexable)? {
                Some(ids) => self.get_rows_by_ids(ids),
                // No ids available from an index: fall back to scanning.
                None => self.get_rows_by_value(column_position, &Value::Indexable(indexable)),
            }
        }
        other => self.get_rows_by_value(column_position, &other),
    };
    Ok(matching_rows
        .into_iter()
        .map(move |row: Row| row.restrict_columns(&selected_column_positions)))
}
// ======Insertion======
/// Inserts `row` under `id` and registers its indexable values in every attached index.
///
/// Values that are not `Value::Indexable` are stored in the row but not added to any index.
/// All checks run before anything is modified, so a failed insert leaves both the rows and the
/// indexes exactly as they were.
///
/// # Errors
/// - `Error::AttemptingToInsertAlreadyPresentId` if a row is already stored under `id`.
/// - `Error::ColumnPositionDoesNotExist` if `row` has no value at the position of an indexed
///   column.
pub fn insert_row_at(&mut self, id: Uuid, row: Row) -> DbResult<()> {
    if self.rows.contains_key(&id) {
        return Err(Error::AttemptingToInsertAlreadyPresentId(
            self.table_name().clone(),
            id,
        ));
    }

    // Validate against every index first. Doing this inside the mutating loop below would
    // leave ids of a row that is never stored in the indexes visited before the failure.
    let missing_position = self
        .indexes
        .keys()
        .find(|column_position| row.get(**column_position).is_none());
    if let Some(column_position) = missing_position {
        return Err(Error::ColumnPositionDoesNotExist(
            self.table_name().clone(),
            *column_position,
        ));
    }

    for (column_position, column_index) in &mut self.indexes {
        if let Some(Value::Indexable(val)) = row.get(*column_position) {
            column_index.add(val.clone(), id);
        }
    }

    let _ = self.rows.insert(id, row);
    Ok(())
}
// ======Deletion======
/// Removes the row stored under `id` and drops its entries from every attached index.
///
/// Returns the number of rows removed: `1` if the row existed, `0` otherwise.
fn delete_row_by_id(&mut self, id: Uuid) -> usize {
    let row = match self.rows.remove(&id) {
        Some(row) => row,
        None => return 0,
    };
    for (column_position, column_index) in &mut self.indexes {
        // Checked access instead of `row[..]`: the row is already gone from `rows`, so a panic
        // here would abort half-way and leave the remaining indexes pointing at it.
        if let Some(Value::Indexable(value)) = row.get(*column_position) {
            let _ = column_index.remove(value, id);
        }
    }
    1
}
/// Deletes the rows stored under `ids` and returns how many rows were actually removed.
fn delete_rows_by_ids(&mut self, ids: HashSet<Uuid>) -> usize {
    ids.into_iter()
        .map(|id| self.delete_row_by_id(id))
        .sum()
}
/// Deletes every row whose value at `column_position` equals `value`, scanning all rows.
///
/// Returns the number of rows removed.
fn delete_rows_by_value(&mut self, column_position: ColumnPosition, value: &Value) -> usize {
    // Collect the ids first; rows cannot be removed while `self.rows` is being iterated.
    let mut matched_ids = HashSet::new();
    for (id, row) in &self.rows {
        if row.get(column_position) == Some(value) {
            matched_ids.insert(*id);
        }
    }
    self.delete_rows_by_ids(matched_ids)
}
pub fn delete_all_rows(&mut self) -> usize {
let number_of_rows = self.rows.len();
self.rows = BTreeMap::new();
self.indexes = HashMap::new();
number_of_rows
}
/// Deletes the rows whose value at `column_position` equals `value` and returns their count.
///
/// For an indexable `value` the ids are taken from `fetch_ids_from_index` when it can provide
/// them; otherwise (and for non-indexable values) all rows are scanned.
///
/// # Errors
/// Propagates any error returned by `fetch_ids_from_index`.
pub fn delete_rows_where_eq(
    &mut self,
    column_position: ColumnPosition,
    value: Value,
) -> DbResult<usize> {
    let deleted = match value {
        Value::Indexable(indexable) => {
            match self.fetch_ids_from_index(column_position, &indexable)? {
                Some(ids) => self.delete_rows_by_ids(ids),
                None => self.delete_rows_by_value(column_position, &Value::Indexable(indexable)),
            }
        }
        other => self.delete_rows_by_value(column_position, &other),
    };
    Ok(deleted)
}
// ======Indexing======
/// Builds an index over the column at `column_position` from the rows currently in the table
/// and attaches it, replacing any index already attached at that position.
///
/// # Errors
/// Propagates any error from `update_index_from_table`; in that case no index is attached.
pub fn attach_index(&mut self, column_position: ColumnPosition) -> DbResult<()> {
    let mut fresh_index = ColumnIndex::new();
    update_index_from_table(&mut fresh_index, self, column_position)?;
    let _ = self.indexes.insert(column_position, fresh_index);
    Ok(())
}
/// Resolves the ids of the rows whose value at `column_position` equals `value` without
/// scanning the rows.
///
/// Returns:
/// - `Ok(Some(ids))` when the column is the primary column (the set holds the looked-up id
///   itself, whether or not a row is stored under it) or when an index is attached to the
///   column. The set is empty when the index has no entry for `value`.
/// - `Ok(None)` when no index is attached to the column, i.e. the caller has to scan.
///
/// # Errors
/// `Error::ValueDoesNotMatchExpectedType` if the primary column is queried with a value that is
/// not a `Uuid`; also propagates an error from resolving the column name for that message.
fn fetch_ids_from_index(
    &self,
    column_position: ColumnPosition,
    value: &IndexableValue,
) -> DbResult<Option<HashSet<Uuid>>> {
    if self.schema.is_primary(column_position) {
        return match value {
            IndexableValue::Uuid(id) => Ok(Some(HashSet::from([*id]))),
            _ => {
                let column_name: ColumnName = self
                    .schema
                    .column_name_from_column_position(column_position)?;
                let type_ = self.schema.column_type(column_position);
                Err(Error::ValueDoesNotMatchExpectedType(
                    self.table_name().clone(),
                    column_name,
                    type_,
                    Value::Indexable(value.clone()),
                ))
            }
        };
    }

    let ids = self.indexes.get(&column_position).map(|index| {
        // Note that we are cloning the ids here! This can be very wasteful in some cases.
        // It would be possible to just return a reference,
        // but this seems fairly non-trivial.
        //
        // A value missing from an existing index means "no matching rows". Reporting it as an
        // empty set keeps callers from mistaking it for "no index" and scanning the table.
        index.get(value).cloned().unwrap_or_default()
    });
    Ok(ids)
}
}
/// Fills `column_index` with the values found at `column_position` in the rows of `table`.
///
/// Should be used in the case when an index is created after the table has existed for a
/// while. In such a case you need to build the index from the already existing rows.
///
/// # Errors
/// - `Error::AttemptToIndexNonIndexableColumn` if a row holds a non-indexable value at
///   `column_position`.
/// - `Error::ColumnPositionDoesNotExist` if a row has no value at `column_position`.
///
/// On error, entries added for rows visited before the failing one remain in `column_index`.
fn update_index_from_table(
    column_index: &mut ColumnIndex,
    table: &Table,
    column_position: ColumnPosition,
) -> DbResult<()> {
    for (id, row) in table.rows.iter() {
        match row.get(column_position) {
            Some(Value::Indexable(value)) => column_index.add(value.clone(), *id),
            Some(_) => {
                let column_name = table
                    .schema
                    .column_name_from_column_position(column_position)?;
                return Err(Error::AttemptToIndexNonIndexableColumn(
                    table.table_name().to_string(),
                    column_name,
                ));
            }
            None => {
                return Err(Error::ColumnPositionDoesNotExist(
                    table.table_name().to_string(),
                    column_position,
                ));
            }
        }
    }
    Ok(())
}