Compare commits

...

14 commits

Author SHA1 Message Date
Yura Dupyn
72013ac9d3 Demo 2026-05-03 16:18:49 +02:00
Yuriy Dupyn
e6002df18e Merge branch 'lint-fix' into 'main'
Lint fix

See merge request x433485/minisql!40
2024-02-05 23:55:34 +01:00
Yuriy Dupyn
a3bc0b07e2 Merge remote-tracking branch 'origin/main' into lint-fix 2024-02-05 23:54:38 +01:00
Yuriy Dupyn
fa34eac409 Merge branch 'dependencies' into 'main'
Cleanup dependencies

See merge request x433485/minisql!39
2024-02-05 23:53:10 +01:00
Jindřich Moravec
0702f4e3e1 Merge branch 'update-readme' into 'main'
Update README.md

See merge request x433485/minisql!41
2024-02-05 23:52:28 +01:00
Jindřich Moravec
25824d78f9 chore: cleanup todos and unused parameter 2024-02-05 23:50:25 +01:00
Maxim Svistunov
b7c16b4073 Update README.md 2024-02-05 23:49:05 +01:00
Maxim Svistunov
3499dbcdf7 Merge branch 'demo-data-10k' into 'main'
Add a 10k demo data table

See merge request x433485/minisql!38
2024-02-05 23:48:26 +01:00
Yuriy Dupyn
ae03d52c0c clippy 2024-02-05 23:44:33 +01:00
Jindřich Moravec
d36d3300d0 fix: add missing features 2024-02-05 23:42:06 +01:00
Yuriy Dupyn
588db169f8 Linting 2024-02-05 23:35:49 +01:00
Maxim Svistunov
9ea0a8ff18 Make two more entries have value "42" in the table 2024-02-05 23:26:24 +01:00
Maxim Svistunov
b1641876df Add a 10k demo data table 2024-02-05 23:20:58 +01:00
Yuriy Dupyn
c25c6edc6a formatting 2024-02-05 23:11:38 +01:00
41 changed files with 11084 additions and 680 deletions

2
.gitignore vendored
View file

@ -1,3 +1,3 @@
.idea .idea
/target target
tmp_repl.txt tmp_repl.txt

View file

@ -1,16 +0,0 @@
stages:
- primary
build-and-test:
stage: primary
image: rust:1.74.0
tags:
- shared-fi
rules:
- if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
script:
# build and test in single job for faster execution
# because shared runner doesn't allow caching
- cargo build --verbose
- cargo test --verbose

View file

@ -15,28 +15,29 @@
## Running the server ## Running the server
```bash ```bash
cargo run -p server -- [OPTIONS] --file <FILE> cargo run -p server -- [OPTIONS] --folder <FOLDER>
``` ```
``` ```
Options: Options:
-a, --address <ADDRESS> IP address for the server to listen on [default: 127.0.0.1] -a, --address <ADDRESS> IP address for the server to listen on [default: 127.0.0.1]
-p, --port <PORT> Port for the server to listen on [default: 5432] -p, --port <PORT> Port for the server to listen on [default: 5432]
-f, --file <FILE> Path to the data file -f, --folder <FOLDER> Path to the folder for database data
-t, --throttle <DELAY> Delay between rows in milliseconds
-h, --help Print help -h, --help Print help
``` ```
This will start the server listening on `<ADDRESS>:<PORT>` and load the state from `<FILE>`. This will start the server listening on `<ADDRESS>:<PORT>` and load the state from `<FOLDER>`.
If the `<FILE>` does not exist, it will be created. If the `<FOLDER>` does not exist, it will be created.
### Demo Database ### Demo Database
A database with demo data is available in `demo.json`. To run the server with this database, use: Commands that set up a database with demo data are available in `demo-1.sql`, `demo-2.sql`, and `demo-3.sql`.
```bash
cargo run -p server -- --file demo.json
```
This database contains two tables: These files showcase the following things:
- Table `users` with columns `id`, `name`, `surname`, `email` - `demo-1.sql`: small tables that handle Unicode and make use of the Optional type
- Table `cars` with columns `id`, `vid`, `brand`, `model`, `year` - `demo-2.sql`: a bigger (1000 rows) table with realistic data
- `demo-3.sql`: a big (10,000 rows) table with simple data (was easier to generate)
See comments in these files for details.
## Running the client ## Running the client
```bash ```bash
@ -56,7 +57,7 @@ SQL queries can be entered line by line. The client will print the result of eac
To exit the REPL client, enter `exit` or `quit`. To exit the REPL client, enter `exit` or `quit`.
# Features # Features
- SQL must be on single line, in **UPPERCASE** and end with `;`, as it should be - SQL must be on single line, in **UPPERCASE**, as it should be. The `;` at EOL is optional.
- Supported operations: `CREATE TABLE`, `CREATE INDEX`, `SELECT`, `INSERT`, `DELETE` - Supported operations: `CREATE TABLE`, `CREATE INDEX`, `SELECT`, `INSERT`, `DELETE`
- Supported data types: `UUID`, `STRING`, `INT`, `NUMBER` - Supported data types: `UUID`, `STRING`, `INT`, `NUMBER`
- Supported subset of PostgreSQL protocol, without authentication and simple query flow - Supported subset of PostgreSQL protocol, without authentication and simple query flow
@ -129,6 +130,7 @@ DELETE FROM users WHERE name = "Christina";
- `STRING` - string enclosed in double quotes, e.g. `"Hello World"` - `STRING` - string enclosed in double quotes, e.g. `"Hello World"`
- `INT` - integer, e.g. `12345` - `INT` - integer, e.g. `12345`
- `NUMBER` - floating point number, e.g. `123.45` - `NUMBER` - floating point number, e.g. `123.45`
- `Option(Something)` - optional type, e.g. `Some("james.stuart@gmail.com")` or `None`
## Testing with `psql` ## Testing with `psql`
Thanks to the subset of PostgreSQL protocol implemented in `proto`, the server can be tested with `psql`, Thanks to the subset of PostgreSQL protocol implemented in `proto`, the server can be tested with `psql`,

View file

@ -1,10 +1,13 @@
-- Create the users table -- Create the users table
CREATE TABLE users (id UUID PRIMARY KEY, name STRING, surname STRING); CREATE TABLE users (id UUID PRIMARY KEY, name STRING, surname STRING);
-- Create the cars table with vid as an optional type -- Create the cars table with vid as an optional type
CREATE TABLE cars (id UUID PRIMARY KEY, vid Option(STRING), brand STRING, model STRING); CREATE TABLE cars (id UUID PRIMARY KEY, vid Option(STRING), brand STRING, model STRING);
-- Insert entries into users -- Insert entries into users
INSERT INTO users (id, name, surname) VALUES (u1001, "Jiří", "Novák"); INSERT INTO users (id, name, surname) VALUES (u1001, "Jiří", "Novák");
INSERT INTO users (id, name, surname) VALUES (u1002, "Petr", "Svoboda"); INSERT INTO users (id, name, surname) VALUES (u1002, "Petr", "Svoboda");
INSERT INTO users (id, name, surname) VALUES (u1003, "Marek", "Dvořák"); INSERT INTO users (id, name, surname) VALUES (u1003, "Marek", "Dvořák");
@ -17,6 +20,7 @@ INSERT INTO users (id, name, surname) VALUES (u1009, "ゆうた", "わたなべ"
INSERT INTO users (id, name, surname) VALUES (u1010, "あやか", "おかもと"); INSERT INTO users (id, name, surname) VALUES (u1010, "あやか", "おかもと");
-- Insert entries into cars -- Insert entries into cars
INSERT INTO cars (id, vid, brand, model) VALUES (u2001, None, "Toyota", "Corolla"); INSERT INTO cars (id, vid, brand, model) VALUES (u2001, None, "Toyota", "Corolla");
INSERT INTO cars (id, vid, brand, model) VALUES (u2002, None, "Ford", "Fiesta"); INSERT INTO cars (id, vid, brand, model) VALUES (u2002, None, "Ford", "Fiesta");
INSERT INTO cars (id, vid, brand, model) VALUES (u2003, Some("WBAKB0C52AC111480"), "Volkswagen", "Golf"); INSERT INTO cars (id, vid, brand, model) VALUES (u2003, Some("WBAKB0C52AC111480"), "Volkswagen", "Golf");
@ -29,16 +33,29 @@ INSERT INTO cars (id, vid, brand, model) VALUES (u2009, None, "Kia", "Optima");
INSERT INTO cars (id, vid, brand, model) VALUES (u2010, Some("1G4GG5E35DF715445"), "Audi", "A4"); INSERT INTO cars (id, vid, brand, model) VALUES (u2010, Some("1G4GG5E35DF715445"), "Audi", "A4");
-- SELECT and DELETE commands for users and cars -- SELECT and DELETE commands for users and cars
SELECT * FROM users;
SELECT name FROM users;
SELECT name, surname FROM users;
SELECT * FROM users WHERE name = "さくら";
DELETE FROM users WHERE surname = "Novák";
SELECT * FROM users; SELECT * FROM users;
SELECT name FROM users;
SELECT name, surname FROM users;
SELECT * FROM users WHERE name = "さくら";
DELETE FROM users WHERE surname = "Novák";
SELECT * FROM users;
SELECT * FROM cars; SELECT * FROM cars;
SELECT brand FROM cars; SELECT brand FROM cars;
SELECT brand, model FROM cars; SELECT brand, model FROM cars;
SELECT * FROM cars WHERE brand = "Ford"; SELECT * FROM cars WHERE brand = "Ford";
DELETE FROM cars WHERE brand = "Ford"; DELETE FROM cars WHERE brand = "Ford";
SELECT * FROM cars; SELECT * FROM cars;

View file

@ -1,9 +1,13 @@
-- Create the people table -- Create the people table
-- (not using the "users" name so that you can use this file in the same -- (not using the "users" name so that you can use this file in the same
-- DB instance as demo-3.sql) -- DB instance as demo-3.sql)
CREATE TABLE people (id UUID PRIMARY KEY, first_name STRING, surname STRING); CREATE TABLE people (id UUID PRIMARY KEY, first_name STRING, surname STRING);
-- Insert entries into people -- Insert entries into people
SELECT * FROM people;
INSERT INTO people (id, first_name, surname) VALUES (u1, "Ranice", "Hardman"); INSERT INTO people (id, first_name, surname) VALUES (u1, "Ranice", "Hardman");
INSERT INTO people (id, first_name, surname) VALUES (u2, "Amara", "Fieldsend"); INSERT INTO people (id, first_name, surname) VALUES (u2, "Amara", "Fieldsend");
INSERT INTO people (id, first_name, surname) VALUES (u3, "Stillmann", "Metzing"); INSERT INTO people (id, first_name, surname) VALUES (u3, "Stillmann", "Metzing");
@ -1005,7 +1009,22 @@ INSERT INTO people (id, first_name, surname) VALUES (u998, "James", "Dinning");
INSERT INTO people (id, first_name, surname) VALUES (u999, "Alvina", "Varcoe"); INSERT INTO people (id, first_name, surname) VALUES (u999, "Alvina", "Varcoe");
INSERT INTO people (id, first_name, surname) VALUES (u1000, "James", "Postance"); INSERT INTO people (id, first_name, surname) VALUES (u1000, "James", "Postance");
INSERT INTO people (id, first_name, surname) VALUES (u1001, "James", "Postance");
-- SELECT commands for people -- SELECT commands for people
SELECT * FROM people;
INSERT INTO people (id, first_name, surname) VALUES (u1001, "Foo", "Bar");
SELECT * FROM people WHERE first_name = "James"; SELECT * FROM people WHERE first_name = "James";
CREATE INDEX PeopleName ON people (first_name); CREATE INDEX PeopleName ON people (first_name);
SELECT * FROM people WHERE first_name = "James"; SELECT * FROM people WHERE first_name = "James";
DELETE FROM people;

10023
demo-3.sql Normal file

File diff suppressed because it is too large Load diff

103
demo.txt Normal file
View file

@ -0,0 +1,103 @@
:kill-pane
cargo build
cargo test
rm -rf db_demo
cargo run --bin server --
cargo run --bin server -- --folder db_demo
rlwrap cargo run --bin client --
======================Games Table========================================
SELECT * FROM games;
CREATE TABLE games (id UUID PRIMARY KEY, name STRING, year INT, price NUMBER);
INSERT INTO games (id, name, year, price) VALUES (u1, "skyrim", 2011, 1024.5);
INSERT INTO games (id, name, year, price) VALUES (u2, "DOOM 2", 1994, 350.0);
DELETE FROM games;
WHERE name = "Christina";
SELECT * FROM games;
SELECT * FROM games WHERE id=u2;
SELECT * FROM games WHERE name="skyrim";
SELECT name, year, year, year, name FROM games;
SELECT * FROM games WHERE id=u3;
===================Users Table======================
// Optionals
CREATE TABLE users (id UUID PRIMARY KEY, name STRING, surname STRING, email Option(STRING));
INSERT INTO users (id, name, surname, email) VALUES (u1, "Hero", "Protagonist 😊", Some("snow_crash@gmail.com"));
INSERT INTO users (id, name, surname, email) VALUES (u26, "Arnold", "schwarzenegger", Some("gettothechoppa@yahoo.com"));
INSERT INTO users (id, name, surname, email) VALUES (u27, "Arnold", "Vosloo", None);
INSERT INTO users (id, name, surname, email) VALUES (u29, "New", "Guy", None);
"hello" : String
Some("hello") : Option(String)
None : Option(String)
SELECT * FROM users;
SELECT * FROM users WHERE email=None;
SELECT * FROM users WHERE email=Some("gettothechoppa@yahoo.com");
DELETE FROM users WHERE id=u1;
DELETE FROM users WHERE id=u27;
CREATE TABLE users1 (id UUID PRIMARY KEY, name STRING, surname STRING, email Option(Option(STRING)));
INSERT INTO users1 (id, name, surname, email) VALUES (u1, "Hero", "Protagonist 😊", None);
INSERT INTO users1 (id, name, surname, email) VALUES (u26, "Arnold", "schwarzenegger", Some(Some("gettothechoppa@yahoo.com")));
SELECT * FROM users1;
SELECT * FROM users1 WHERE email=None;
DELETE FROM users;
// ==============Indexes, Concurrency, Garbage Collection================
CREATE INDEX CarsYear ON cars (year);

View file

@ -14,7 +14,7 @@ bincode = { workspace = true }
serde = { workspace = true, features = ["derive", "rc"] } serde = { workspace = true, features = ["derive", "rc"] }
serde_json = "1.0.113" serde_json = "1.0.113"
thiserror = { workspace = true } thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros"] } tokio = { workspace = true, features = ["macros", "sync"] }
proto = { path = "../proto" } proto = { path = "../proto" }
storage_engine = { path = "../storage_engine" } storage_engine = { path = "../storage_engine" }

View file

@ -1,25 +1,27 @@
use crate::error::RuntimeError;
use crate::internals::row::Row;
use crate::operation::{ColumnSelection, Condition, Operation}; use crate::operation::{ColumnSelection, Condition, Operation};
use crate::response_writer::{CompleteStatus, ResponseWriter};
use crate::result::DbResult; use crate::result::DbResult;
use crate::schema::{Column, TableName, TablePosition, TableSchema}; use crate::schema::{Column, TableName, TablePosition, TableSchema};
use crate::type_system::Value; use crate::type_system::Value;
use crate::error::RuntimeError;
use crate::response_writer::{ResponseWriter, CompleteStatus};
use crate::internals::row::Row;
use bimap::BiMap; use bimap::BiMap;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::fs; use tokio::fs;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use storage_engine::store::Store;
use storage_engine::segments::entry::Entry;
use storage_engine::cursor::{ReadCursor, WriteCursor};
use storage_engine::cursor_capabilities::traversal::CursorCanTraverse;
use storage_engine::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex};
use crate::cancellation::Cancellation; use crate::cancellation::Cancellation;
use storage_engine::cursor::{ReadCursor, WriteCursor};
use storage_engine::cursor_capabilities::index_access::{
CursorCanReadIndex, CursorCanWriteToIndex,
};
use storage_engine::cursor_capabilities::traversal::CursorCanTraverse;
use storage_engine::segments::entry::Entry;
use storage_engine::store::Store;
const METADATA_FILE: &'static str = "metadata.json"; const METADATA_FILE: &'static str = "metadata.json";
@ -42,7 +44,7 @@ pub type Tables = Vec<RwLock<Table>>;
#[derive(Debug)] #[derive(Debug)]
pub struct Table { pub struct Table {
schema: Arc<TableSchema>, schema: Arc<TableSchema>,
store: Store<Value> store: Store<Value>,
} }
pub type DbSchema = Vec<(TableName, TablePosition, Arc<TableSchema>)>; pub type DbSchema = Vec<(TableName, TablePosition, Arc<TableSchema>)>;
@ -60,7 +62,10 @@ impl Table {
let number_of_columns = table_schema.number_of_columns(); let number_of_columns = table_schema.number_of_columns();
let primary_column = table_schema.primary_column() as storage_engine::store::Column; let primary_column = table_schema.primary_column() as storage_engine::store::Column;
let store: Store<Value> = Store::new(&path_to_table_folder, number_of_columns, primary_column).await.unwrap(); let store: Store<Value> =
Store::new(&path_to_table_folder, number_of_columns, primary_column)
.await
.unwrap();
let table = Self { let table = Self {
schema: Arc::new(table_schema), schema: Arc::new(table_schema),
@ -73,7 +78,9 @@ impl Table {
let table_folder_name = table_schema.table_name(); let table_folder_name = table_schema.table_name();
let path_to_table_folder = db_path.join(table_folder_name); let path_to_table_folder = db_path.join(table_folder_name);
let store: Store<Value> = Store::connect(&path_to_table_folder).await.map_err(|e| RuntimeError::StorageEngineError(table_schema.table_name().to_string(), e))?; let store: Store<Value> = Store::connect(&path_to_table_folder).await.map_err(|e| {
RuntimeError::StorageEngineError(table_schema.table_name().to_string(), e)
})?;
let table = Self { let table = Self {
schema: table_schema, schema: table_schema,
@ -83,12 +90,18 @@ impl Table {
} }
async fn read(&self) -> DbResult<ReadCursor<Value>> { async fn read(&self) -> DbResult<ReadCursor<Value>> {
let cursor = self.store.read_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.table_name().to_string(), e))?; let cursor = self
.store
.read_cursor()
.await
.map_err(|e| RuntimeError::StorageEngineError(self.table_name().to_string(), e))?;
Ok(cursor) Ok(cursor)
} }
async fn write(&mut self) -> DbResult<WriteCursor<Value>> { async fn write(&mut self) -> DbResult<WriteCursor<Value>> {
let cursor = self.store.write_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.schema.table_name().to_string(), e))?; let cursor = self.store.write_cursor().await.map_err(|e| {
RuntimeError::StorageEngineError(self.schema.table_name().to_string(), e)
})?;
Ok(cursor) Ok(cursor)
} }
@ -128,7 +141,6 @@ impl State {
} }
async fn attach_table(&mut self, table: Table) { async fn attach_table(&mut self, table: Table) {
// TODO: You need to update the global DB SCHEMA!
let new_table_position: TablePosition = self.tables.len(); let new_table_position: TablePosition = self.tables.len();
self.table_name_position_mapping self.table_name_position_mapping
.insert(table.schema().table_name().clone(), new_table_position); .insert(table.schema().table_name().clone(), new_table_position);
@ -136,16 +148,30 @@ impl State {
self.tables.push(RwLock::new(table)); self.tables.push(RwLock::new(table));
} }
async fn select_all_rows<W, C>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection) -> DbResult<usize> async fn select_all_rows<W, C>(
where W: ResponseWriter, table: &Table,
C: Cancellation mut cursor: ReadCursor<'_, Value>,
response_writer: &mut W,
cancellation: &C,
column_selection: ColumnSelection,
) -> DbResult<usize>
where
W: ResponseWriter,
C: Cancellation,
{ {
let mut count = 0; let mut count = 0;
while let Some(entry) = cursor.next_alive().await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))? { while let Some(entry) = cursor
.next_alive()
.await
.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?
{
count += 1; count += 1;
let row: Row = From::from(entry); let row: Row = From::from(entry);
let restricted_row = row.restrict_columns(&column_selection); let restricted_row = row.restrict_columns(&column_selection);
response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; response_writer
.write_table_row(&restricted_row)
.await
.map_err(|e| RuntimeError::AnyhowError(e))?;
if cancellation.is_canceled() { if cancellation.is_canceled() {
break; break;
@ -155,16 +181,31 @@ impl State {
Ok(count) Ok(count)
} }
async fn select_eq<W, C>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult<usize> async fn select_eq<W, C>(
where W: ResponseWriter, table: &Table,
C: Cancellation mut cursor: ReadCursor<'_, Value>,
response_writer: &mut W,
cancellation: &C,
column_selection: ColumnSelection,
column: Column,
value: Value,
) -> DbResult<usize>
where
W: ResponseWriter,
C: Cancellation,
{ {
let entries = cursor.select_entries_where_eq(column as storage_engine::store::Column, &value).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; let entries = cursor
.select_entries_where_eq(column as storage_engine::store::Column, &value)
.await
.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?;
let count = entries.len(); let count = entries.len();
for entry in entries { for entry in entries {
let row: Row = From::from(entry); let row: Row = From::from(entry);
let restricted_row = row.restrict_columns(&column_selection); let restricted_row = row.restrict_columns(&column_selection);
response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?; response_writer
.write_table_row(&restricted_row)
.await
.map_err(|e| RuntimeError::AnyhowError(e))?;
if cancellation.is_canceled() { if cancellation.is_canceled() {
break; break;
@ -174,28 +215,43 @@ impl State {
Ok(count) Ok(count)
} }
async fn delete_all_rows(table_name: String, mut cursor: WriteCursor<'_, Value>) -> DbResult<usize> { async fn delete_all_rows(
let count = cursor.delete_all_entries(true) table_name: String,
.await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?; mut cursor: WriteCursor<'_, Value>,
) -> DbResult<usize> {
let count = cursor
.delete_all_entries(true)
.await
.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?;
Ok(count) Ok(count)
} }
async fn delete_all_eq(table_name: String, mut cursor: WriteCursor<'_, Value>, eq_column: Column, value: Value) -> DbResult<usize> { async fn delete_all_eq(
let count = table_name: String,
cursor.delete_entries_where_eq(eq_column as storage_engine::store::Column, &value, true) mut cursor: WriteCursor<'_, Value>,
.await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?; eq_column: Column,
value: Value,
) -> DbResult<usize> {
let count = cursor
.delete_entries_where_eq(eq_column as storage_engine::store::Column, &value, true)
.await
.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?;
Ok(count) Ok(count)
} }
} }
impl StateHandler { impl StateHandler {
pub fn is_existing_db(db_path: &PathBuf) -> bool { pub fn is_existing_db(db_path: &PathBuf) -> bool {
db_path.exists() && db_path.is_dir() && db_path.exists()
db_path.join(METADATA_FILE).exists() && db_path.join(METADATA_FILE).is_file() && db_path.is_dir()
&& db_path.join(METADATA_FILE).exists()
&& db_path.join(METADATA_FILE).is_file()
} }
pub async fn new(db_path: PathBuf) -> DbResult<Self> { pub async fn new(db_path: PathBuf) -> DbResult<Self> {
fs::create_dir(db_path.clone()).await.map_err(|e| RuntimeError::IoError(e))?; fs::create_dir(db_path.clone())
.await
.map_err(|e| RuntimeError::IoError(e))?;
let state = Self { let state = Self {
db_path, db_path,
@ -228,14 +284,21 @@ impl StateHandler {
let metadata_file = self.db_path.join(METADATA_FILE); let metadata_file = self.db_path.join(METADATA_FILE);
let state = self.state.read().await; let state = self.state.read().await;
let metadata_raw = serde_json::to_string(&*state)?; let metadata_raw = serde_json::to_string(&*state)?;
fs::write(metadata_file, metadata_raw).await.map_err(|e| RuntimeError::IoError(e)) fs::write(metadata_file, metadata_raw)
.await
.map_err(|e| RuntimeError::IoError(e))
} }
pub async fn read_state(&self) -> RwLockReadGuard<State> { pub async fn read_state(&self) -> RwLockReadGuard<State> {
self.state.read().await self.state.read().await
} }
pub async fn interpret<W: ResponseWriter, C: Cancellation>(&self, response_writer: &mut W, cancellation: &C, operation: Operation) -> DbResult<()> { pub async fn interpret<W: ResponseWriter, C: Cancellation>(
&self,
response_writer: &mut W,
cancellation: &C,
operation: Operation,
) -> DbResult<()> {
use Operation::*; use Operation::*;
match operation { match operation {
@ -245,12 +308,38 @@ impl StateHandler {
let table = state.table_at(table_position).await; let table = state.table_at(table_position).await;
let cursor = table.read().await?; let cursor = table.read().await?;
response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?; response_writer
.write_table_header(&table.schema, &column_selection)
.await
.map_err(|e| RuntimeError::AnyhowError(e))?;
let count = match maybe_condition { let count = match maybe_condition {
None => State::select_all_rows(&table, cursor, response_writer, cancellation, column_selection).await?, None => {
Some(Condition::Eq(eq_column, value)) => State::select_eq(&table, cursor, response_writer, cancellation, column_selection, eq_column, value).await? State::select_all_rows(
&table,
cursor,
response_writer,
cancellation,
column_selection,
)
.await?
}
Some(Condition::Eq(eq_column, value)) => {
State::select_eq(
&table,
cursor,
response_writer,
cancellation,
column_selection,
eq_column,
value,
)
.await?
}
}; };
response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) response_writer
.write_command_complete(CompleteStatus::Select(count))
.await
.map_err(|e| RuntimeError::AnyhowError(e))
} }
Insert(table_position, values) => { Insert(table_position, values) => {
let state = self.state.read().await; let state = self.state.read().await;
@ -259,9 +348,14 @@ impl StateHandler {
let mut cursor = table.write().await?; let mut cursor = table.write().await?;
let entry = Entry::new(values); let entry = Entry::new(values);
cursor.insert_entry(entry).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; cursor.insert_entry(entry).await.map_err(|e| {
RuntimeError::StorageEngineError(table.table_name().to_string(), e)
})?;
response_writer.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }).await.map_err(|e| RuntimeError::AnyhowError(e)) response_writer
.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 })
.await
.map_err(|e| RuntimeError::AnyhowError(e))
} }
Delete(table_position, maybe_condition) => { Delete(table_position, maybe_condition) => {
let state = self.state.read().await; let state = self.state.read().await;
@ -272,10 +366,15 @@ impl StateHandler {
let count = match maybe_condition { let count = match maybe_condition {
None => State::delete_all_rows(table_name, cursor).await?, None => State::delete_all_rows(table_name, cursor).await?,
Some(Condition::Eq(eq_column, value)) => State::delete_all_eq(table_name, cursor, eq_column, value).await? Some(Condition::Eq(eq_column, value)) => {
State::delete_all_eq(table_name, cursor, eq_column, value).await?
}
}; };
response_writer.write_command_complete(CompleteStatus::Delete(count)).await.map_err(|e| RuntimeError::AnyhowError(e)) response_writer
.write_command_complete(CompleteStatus::Delete(count))
.await
.map_err(|e| RuntimeError::AnyhowError(e))
} }
CreateTable(table_schema) => { CreateTable(table_schema) => {
{ {
@ -285,34 +384,42 @@ impl StateHandler {
// WARNING: We need to drop the write lock on state unless we want a deadlock. // WARNING: We need to drop the write lock on state unless we want a deadlock.
} }
self.save_metadata().await?; self.save_metadata().await?;
response_writer.write_command_complete(CompleteStatus::CreateTable).await.map_err(|e| RuntimeError::AnyhowError(e)) response_writer
.write_command_complete(CompleteStatus::CreateTable)
.await
.map_err(|e| RuntimeError::AnyhowError(e))
} }
CreateIndex(table_position, column) => { CreateIndex(table_position, column) => {
let state = self.state.read().await; let state = self.state.read().await;
let mut table = state.table_at_mut(table_position).await; let mut table = state.table_at_mut(table_position).await;
let mut cursor = table.write().await?; let mut cursor = table.write().await?;
cursor.attach_index(column as storage_engine::store::Column).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?; cursor
response_writer.write_command_complete(CompleteStatus::CreateIndex).await.map_err(|e| RuntimeError::AnyhowError(e)) .attach_index(column as storage_engine::store::Column)
.await
.map_err(|e| {
RuntimeError::StorageEngineError(table.table_name().to_string(), e)
})?;
response_writer
.write_command_complete(CompleteStatus::CreateIndex)
.await
.map_err(|e| RuntimeError::AnyhowError(e))
} }
} }
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::cancellation::DummyCancellation;
use crate::operation::Operation; use crate::operation::Operation;
use crate::schema::Column;
use crate::response_writer::ResponseWriterStub; use crate::response_writer::ResponseWriterStub;
use crate::schema::Column;
use crate::type_system::{DbType, IndexableValue, Value}; use crate::type_system::{DbType, IndexableValue, Value};
use std::collections::HashSet; use std::collections::HashSet;
use tokio::fs::{File, OpenOptions, DirBuilder};
use tokio::fs; use tokio::fs;
use crate::cancellation::DummyCancellation; use tokio::fs::{DirBuilder, File, OpenOptions};
impl Drop for State { impl Drop for State {
fn drop(&mut self) { fn drop(&mut self) {
@ -344,18 +451,24 @@ mod tests {
let mut response_writer = ResponseWriterStub::new(); let mut response_writer = ResponseWriterStub::new();
state state
.interpret(&mut response_writer, &DummyCancellation, Operation::CreateTable(users_schema.clone())).await .interpret(
&mut response_writer,
&DummyCancellation,
Operation::CreateTable(users_schema.clone()),
)
.await
.unwrap(); .unwrap();
{ {
println!("==EMPTY SELECT==="); println!("==EMPTY SELECT===");
let users_position: TablePosition = 0; let users_position: TablePosition = 0;
state state
.interpret(&mut response_writer, &DummyCancellation, Operation::Select( .interpret(
users_position, &mut response_writer,
users_schema.all_selection(), &DummyCancellation,
None, Operation::Select(users_position, users_schema.all_selection(), None),
)).await )
.await
.unwrap(); .unwrap();
} }
@ -370,27 +483,27 @@ mod tests {
println!("About to insert!"); println!("About to insert!");
state state
.interpret(&mut response_writer, &DummyCancellation, Operation::Insert( .interpret(
users, &mut response_writer,
vec![id.clone(), name.clone(), age.clone()], &DummyCancellation,
)).await Operation::Insert(users, vec![id.clone(), name.clone(), age.clone()]),
)
.await
.unwrap(); .unwrap();
} }
{ {
println!("==SELECT==="); println!("==SELECT===");
let users_position: TablePosition = 0; let users_position: TablePosition = 0;
state state
.interpret(&mut response_writer, &DummyCancellation, Operation::Select( .interpret(
users_position, &mut response_writer,
users_schema.all_selection(), &DummyCancellation,
None, Operation::Select(users_position, users_schema.all_selection(), None),
)).await )
.await
.unwrap(); .unwrap();
} }
// assert!(false); // assert!(false);
// assert!(state.tables.len() == 1); // assert!(state.tables.len() == 1);
@ -400,4 +513,3 @@ mod tests {
// assert!(table.table_name() == &users); // assert!(table.table_name() == &users);
} }
} }

View file

@ -1,3 +1,4 @@
pub mod cancellation;
mod error; mod error;
mod internals; mod internals;
pub mod interpreter; pub mod interpreter;
@ -8,4 +9,3 @@ pub mod restricted_row;
mod result; mod result;
pub mod schema; pub mod schema;
pub mod type_system; pub mod type_system;
pub mod cancellation;

View file

@ -46,8 +46,7 @@ impl ResponseWriterStub {
} }
#[async_trait] #[async_trait]
impl ResponseWriter for ResponseWriterStub impl ResponseWriter for ResponseWriterStub {
{
async fn write_table_header( async fn write_table_header(
&mut self, &mut self,
table_schema: &TableSchema, table_schema: &TableSchema,

View file

@ -1,6 +1,6 @@
use crate::operation::ColumnSelection;
use crate::schema::{Column, TableSchema}; use crate::schema::{Column, TableSchema};
use crate::type_system::Value; use crate::type_system::Value;
use crate::operation::ColumnSelection;
use std::ops::Index; use std::ops::Index;
use std::slice::SliceIndex; use std::slice::SliceIndex;
use storage_engine::segments::entry::EntryDetailed; use storage_engine::segments::entry::EntryDetailed;

View file

@ -59,9 +59,8 @@ impl TableSchema {
} }
pub fn get_columns(&self) -> Vec<&ColumnName> { pub fn get_columns(&self) -> Vec<&ColumnName> {
let mut columns_in_random_order: Vec<_> = self.column_name_position_mapping let mut columns_in_random_order: Vec<_> =
.iter() self.column_name_position_mapping.iter().collect();
.collect();
columns_in_random_order.sort_by(|&(_, column0), &(_, column1)| column0.cmp(column1)); columns_in_random_order.sort_by(|&(_, column0), &(_, column1)| column0.cmp(column1));
let columns: Vec<_> = columns_in_random_order let columns: Vec<_> = columns_in_random_order

View file

@ -2,9 +2,7 @@ use crate::error::TypeConversionError;
use proto::message::primitive::pgoid::PgOid; use proto::message::primitive::pgoid::PgOid;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::cmp::Ordering; use std::cmp::Ordering;
// TODO: Private??? use bincode::{Decode, Encode};
// use bincode::{Encode, Encoder, EncodeError, Decode, Decoder, DecodeError};
use bincode::{Encode, Decode};
// ==============Types================ // ==============Types================
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
@ -92,7 +90,6 @@ impl PartialOrd for Value {
} }
} }
// TODO: Make column know about indexable types
impl Ord for Value { impl Ord for Value {
fn cmp(&self, other: &Self) -> Ordering { fn cmp(&self, other: &Self) -> Ordering {
match (self, other) { match (self, other) {
@ -120,18 +117,6 @@ impl Ord for Value {
} }
} }
// impl Encode for Value {
// fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
// todo!()
// }
// }
// impl Decode for Value {
// fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, DecodeError> {
// todo!()
// }
// }
impl DbType { impl DbType {
fn new_n_option(n: usize, inside: DbType) -> DbType { fn new_n_option(n: usize, inside: DbType) -> DbType {
if n == 0 { if n == 0 {

View file

@ -1,6 +1,12 @@
use crate::syntax::RawQuerySyntax; use crate::syntax::RawQuerySyntax;
use minisql::{interpreter2::DbSchema, operation::Operation}; use minisql::{interpreter2::DbSchema, operation::Operation};
use nom::{branch::alt, character::complete::{multispace0, char}, multi::many1, sequence::{delimited, terminated}, IResult}; use nom::{
branch::alt,
character::complete::{char, multispace0},
multi::many1,
sequence::{delimited, terminated},
IResult,
};
use thiserror::Error; use thiserror::Error;
use crate::{ use crate::{
@ -35,7 +41,10 @@ fn parse_statement(input: &str) -> IResult<&str, RawQuerySyntax> {
/// Parse one or more statements /// Parse one or more statements
#[allow(dead_code)] #[allow(dead_code)]
fn parse_statement1(input: &str) -> IResult<&str, Vec<RawQuerySyntax>> { fn parse_statement1(input: &str) -> IResult<&str, Vec<RawQuerySyntax>> {
many1(terminated(parse_statement, delimited(multispace0, char(';'), multispace0)))(input) many1(terminated(
parse_statement,
delimited(multispace0, char(';'), multispace0),
))(input)
} }
pub fn parse_and_validate(str_query: String, db_schema: &DbSchema) -> Result<Operation, Error> { pub fn parse_and_validate(str_query: String, db_schema: &DbSchema) -> Result<Operation, Error> {
@ -53,11 +62,9 @@ mod test {
#[test] #[test]
fn test_parse_two_select() { fn test_parse_two_select() {
let (rest, sntx) = parse_statement1("SELECT * FROM users ; SELECT * FROM cities ; ").expect("should parse"); let (rest, sntx) = parse_statement1("SELECT * FROM users ; SELECT * FROM cities ; ")
assert_eq!( .expect("should parse");
sntx.len(), assert_eq!(sntx.len(), 2);
2
);
assert_eq!(rest, ""); assert_eq!(rest, "");
} }
@ -68,11 +75,10 @@ mod test {
SELECT * FROM users ; SELECT * FROM users ;
INSERT INTO table1 (id, data) VALUES (u4, 30) ; INSERT INTO table1 (id, data) VALUES (u4, 30) ;
INSERT INTO table1 (id, data) VALUES (u5, 40) ; INSERT INTO table1 (id, data) VALUES (u5, 40) ;
"#).expect("should parse"); "#,
assert_eq!( )
sntx.len(), .expect("should parse");
4 assert_eq!(sntx.len(), 4);
);
assert_eq!(rest, ""); assert_eq!(rest, "");
} }
} }

View file

@ -1,10 +1,19 @@
use minisql::type_system::DbType; use minisql::type_system::DbType;
use nom::{ use nom::{
branch::alt, bytes::complete::{tag, take_while}, character::{complete::{alphanumeric1, anychar, char, multispace0, multispace1}, is_alphanumeric}, combinator::peek, error::make_error, sequence::{delimited, terminated}, IResult, Parser branch::alt,
bytes::complete::{tag, take_while},
character::{
complete::{alphanumeric1, anychar, char, multispace0, multispace1},
is_alphanumeric,
},
combinator::peek,
error::make_error,
sequence::{delimited, terminated},
IResult, Parser,
}; };
use crate::syntax::Condition;
use super::literal::parse_literal; use super::literal::parse_literal;
use crate::syntax::Condition;
pub fn parse_table_name(input: &str) -> IResult<&str, &str> { pub fn parse_table_name(input: &str) -> IResult<&str, &str> {
alt(( alt((
@ -16,11 +25,9 @@ pub fn parse_table_name(input: &str) -> IResult<&str, &str> {
pub fn parse_identifier(input: &str) -> IResult<&str, &str> { pub fn parse_identifier(input: &str) -> IResult<&str, &str> {
let (_, first) = peek(anychar)(input)?; let (_, first) = peek(anychar)(input)?;
if first.is_alphabetic() || first == '_' { if first.is_alphabetic() || first == '_' {
take_while(|c: char| { take_while(|c: char| match c {
match c {
'a'..='z' | 'A'..='Z' | '_' | '0'..='9' => true, 'a'..='z' | 'A'..='Z' | '_' | '0'..='9' => true,
_ => false _ => false,
}
})(input) })(input)
} else { } else {
Err(nom::Err::Error(make_error( Err(nom::Err::Error(make_error(
@ -35,22 +42,13 @@ pub fn parse_column_name(input: &str) -> IResult<&str, String> {
} }
pub fn parse_db_type(input: &str) -> IResult<&str, DbType> { pub fn parse_db_type(input: &str) -> IResult<&str, DbType> {
let (input, db_type) = alt( let (input, db_type) = alt((
( tag("STRING").map(|_| DbType::String),
tag("STRING") tag("INT").map(|_| DbType::Int),
.map(|_| DbType::String), tag("NUMBER").map(|_| DbType::Number),
tag("INT") tag("UUID").map(|_| DbType::Uuid),
.map(|_| DbType::Int), delimited(tag("Option("), parse_db_type, tag(")")).map(|ty| DbType::Option(Box::new(ty))),
tag("NUMBER") ))(input)?;
.map(|_| DbType::Number),
tag("UUID")
.map(|_| DbType::Uuid),
delimited(tag("Option("), parse_db_type, tag(")"))
.map(|ty| {
DbType::Option(Box::new(ty))
})
)
)(input)?;
Ok((input, db_type)) Ok((input, db_type))
} }
@ -122,10 +120,7 @@ mod tests {
parse_identifier("_variable__Test").expect("should parse").1, parse_identifier("_variable__Test").expect("should parse").1,
"_variable__Test" "_variable__Test"
); );
assert!(matches!( assert!(matches!(parse_identifier("123_variable__Test"), Err(_)));
parse_identifier("123_variable__Test"),
Err(_)
));
} }
#[test] #[test]
@ -139,8 +134,12 @@ mod tests {
#[test] #[test]
fn test_parse_nested_option_int_type() { fn test_parse_nested_option_int_type() {
assert_eq!( assert_eq!(
parse_db_type("Option(Option(Option(INT)))").expect("should parse").1, parse_db_type("Option(Option(Option(INT)))")
DbType::Option(Box::new(DbType::Option(Box::new(DbType::Option(Box::new(DbType::Int)))))) .expect("should parse")
.1,
DbType::Option(Box::new(DbType::Option(Box::new(DbType::Option(
Box::new(DbType::Int)
)))))
); );
} }
} }

View file

@ -77,8 +77,7 @@ mod tests {
#[test] #[test]
fn test_parse_create_no_quotes_table_name() { fn test_parse_create_no_quotes_table_name() {
parse_create("CREATE TABLE Table1(id UUID PRIMARY KEY,column1 INT)") parse_create("CREATE TABLE Table1(id UUID PRIMARY KEY,column1 INT)").expect("should parse");
.expect("should parse");
} }
#[test] #[test]
@ -91,8 +90,8 @@ mod tests {
#[test] #[test]
fn test_parse_create() { fn test_parse_create() {
let (_, create) = parse_create("CREATE TABLE \"Table1\"( id UUID , column1 INT )") let (_, create) =
.expect("should parse"); parse_create("CREATE TABLE \"Table1\"( id UUID , column1 INT )").expect("should parse");
assert!(matches!(create, RawQuerySyntax::CreateTable(_))); assert!(matches!(create, RawQuerySyntax::CreateTable(_)));
match create { match create {
RawQuerySyntax::CreateTable(schema) => { RawQuerySyntax::CreateTable(schema) => {
@ -117,7 +116,9 @@ mod tests {
#[test] #[test]
fn test_parse_create_option() { fn test_parse_create_option() {
let (_, create) = parse_create("CREATE TABLE games (id UUID PRIMARY KEY, name STRING, year Option(INT), price NUMBER)") let (_, create) = parse_create(
"CREATE TABLE games (id UUID PRIMARY KEY, name STRING, year Option(INT), price NUMBER)",
)
.expect("should parse"); .expect("should parse");
assert!(matches!(create, RawQuerySyntax::CreateTable(_))); assert!(matches!(create, RawQuerySyntax::CreateTable(_)));
match create { match create {
@ -139,16 +140,12 @@ mod tests {
assert_eq!(column1_column.type_, DbType::String); assert_eq!(column1_column.type_, DbType::String);
let column = schema.get_column(&"year".to_string()); let column = schema.get_column(&"year".to_string());
let Some(column) = column else { let Some(column) = column else { panic!() };
panic!()
};
assert_eq!(column.column_name, "year".to_string()); assert_eq!(column.column_name, "year".to_string());
assert_eq!(column.type_, DbType::Option(Box::new(DbType::Int))); assert_eq!(column.type_, DbType::Option(Box::new(DbType::Int)));
let column = schema.get_column(&"price".to_string()); let column = schema.get_column(&"price".to_string());
let Some(column) = column else { let Some(column) = column else { panic!() };
panic!()
};
assert_eq!(column.column_name, "price".to_string()); assert_eq!(column.column_name, "price".to_string());
assert_eq!(column.type_, DbType::Number); assert_eq!(column.type_, DbType::Number);
} }

View file

@ -29,22 +29,19 @@ mod tests {
#[test] #[test]
fn test_parse_delete() { fn test_parse_delete() {
let (_, sntx) = let (_, sntx) = parse_delete("DELETE FROM \"T1\" WHERE id = 1").expect("should parse");
parse_delete("DELETE FROM \"T1\" WHERE id = 1").expect("should parse");
assert!(matches!(sntx, RawQuerySyntax::Delete(_, _))) assert!(matches!(sntx, RawQuerySyntax::Delete(_, _)))
} }
#[test] #[test]
fn test_parse_delete_with_spaces() { fn test_parse_delete_with_spaces() {
let (_, sntx) = let (_, sntx) = parse_delete("DELETE FROM T1 WHERE id = 1").expect("should parse");
parse_delete("DELETE FROM T1 WHERE id = 1").expect("should parse");
assert!(matches!(sntx, RawQuerySyntax::Delete(_, _))) assert!(matches!(sntx, RawQuerySyntax::Delete(_, _)))
} }
#[test] #[test]
fn test_parse_delete_none() { fn test_parse_delete_none() {
let (_, sntx) = let (_, sntx) = parse_delete("DELETE FROM games WHERE year = None").expect("should parse");
parse_delete("DELETE FROM games WHERE year = None").expect("should parse");
if let RawQuerySyntax::Delete(tname, Some(Condition::Eq(column_name, lit))) = sntx { if let RawQuerySyntax::Delete(tname, Some(Condition::Eq(column_name, lit))) = sntx {
assert_eq!(tname, "games".to_string()); assert_eq!(tname, "games".to_string());
assert_eq!(column_name, "year".to_string()); assert_eq!(column_name, "year".to_string());

View file

@ -68,10 +68,7 @@ mod tests {
insertion_values, insertion_values,
vec![ vec![
("id".to_string(), Literal::Int(1)), ("id".to_string(), Literal::Int(1)),
( ("data".to_string(), Literal::String("Text".to_string()))
"data".to_string(),
Literal::String("Text".to_string())
)
] ]
); );
} }
@ -83,8 +80,7 @@ mod tests {
#[test] #[test]
fn test_parse_insert_with_spaces() { fn test_parse_insert_with_spaces() {
let sql = let sql = "INSERT INTO \"MyTable\" ( id, data ) VALUES ( 1, \"Text\" )";
"INSERT INTO \"MyTable\" ( id, data ) VALUES ( 1, \"Text\" )";
let operation = parse_insert(sql).expect("should parse"); let operation = parse_insert(sql).expect("should parse");
match operation { match operation {
("", RawQuerySyntax::Insert(table_name, insertion_values)) => { ("", RawQuerySyntax::Insert(table_name, insertion_values)) => {
@ -93,10 +89,7 @@ mod tests {
insertion_values, insertion_values,
vec![ vec![
("id".to_string(), Literal::Int(1)), ("id".to_string(), Literal::Int(1)),
( ("data".to_string(), Literal::String("Text".to_string()))
"data".to_string(),
Literal::String("Text".to_string())
)
] ]
); );
} }
@ -117,18 +110,12 @@ mod tests {
insertion_values, insertion_values,
vec![ vec![
("id".to_string(), Literal::Uuid(12345)), ("id".to_string(), Literal::Uuid(12345)),
( ("name".to_string(), Literal::String("Doom".to_string())),
"name".to_string(),
Literal::String("Doom".to_string())
),
( (
"year".to_string(), "year".to_string(),
Literal::Some(Box::new(Literal::Int(1993))) Literal::Some(Box::new(Literal::Int(1993)))
), ),
( ("price".to_string(), Literal::Number(6.5))
"price".to_string(),
Literal::Number(6.5)
)
] ]
); );
} }

View file

@ -1,5 +1,12 @@
use nom::{ use nom::{
branch::alt, bytes::complete::tag, character::complete::{char, digit1, none_of, u64}, combinator::opt, error::make_error, multi::many0, sequence::{delimited, pair, preceded}, IResult, Parser branch::alt,
bytes::complete::tag,
character::complete::{char, digit1, none_of, u64},
combinator::opt,
error::make_error,
multi::many0,
sequence::{delimited, pair, preceded},
IResult, Parser,
}; };
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
@ -13,7 +20,13 @@ pub enum Literal {
} }
pub fn parse_literal(input: &str) -> IResult<&str, Literal> { pub fn parse_literal(input: &str) -> IResult<&str, Literal> {
alt((parse_option, parse_string, parse_number, parse_int, parse_uuid))(input) alt((
parse_option,
parse_string,
parse_number,
parse_int,
parse_uuid,
))(input)
} }
pub fn parse_number(input: &str) -> IResult<&str, Literal> { pub fn parse_number(input: &str) -> IResult<&str, Literal> {
@ -92,16 +105,16 @@ pub fn parse_string(input: &str) -> IResult<&str, Literal> {
} }
pub fn parse_uuid(input: &str) -> IResult<&str, Literal> { pub fn parse_uuid(input: &str) -> IResult<&str, Literal> {
let (input, value) = pair(char('u'), u64)(input) let (input, value) =
.map(|(input, (_, v))| (input, Literal::Uuid(v)))?; pair(char('u'), u64)(input).map(|(input, (_, v))| (input, Literal::Uuid(v)))?;
Ok((input, value)) Ok((input, value))
} }
pub fn parse_option(input: &str) -> IResult<&str, Literal> { pub fn parse_option(input: &str) -> IResult<&str, Literal> {
let (input, inner) = alt((tag("None") let (input, inner) = alt((
.map(|_| Literal::None), delimited(tag("Some("), parse_literal, tag(")")).map(|v| { tag("None").map(|_| Literal::None),
Literal::Some(Box::new(v)) delimited(tag("Some("), parse_literal, tag(")")).map(|v| Literal::Some(Box::new(v))),
})))(input)?; ))(input)?;
Ok((input, inner)) Ok((input, inner))
} }
@ -114,24 +127,15 @@ mod tests {
fn test_string_parser() { fn test_string_parser() {
assert_eq!( assert_eq!(
parse_string(r#""simple""#), parse_string(r#""simple""#),
Ok(( Ok(("", Literal::String(String::from("simple"))))
"",
Literal::String(String::from("simple"))
))
); );
assert_eq!( assert_eq!(
parse_string(r#""\"\t\r\n\\""#), parse_string(r#""\"\t\r\n\\""#),
Ok(( Ok(("", Literal::String(String::from("\"\t\r\n\\"))))
"",
Literal::String(String::from("\"\t\r\n\\"))
))
); );
assert_eq!( assert_eq!(
parse_string(r#""name is \"John\".""#), parse_string(r#""name is \"John\".""#),
Ok(( Ok(("", Literal::String(String::from("name is \"John\"."))))
"",
Literal::String(String::from("name is \"John\"."))
))
); );
} }
@ -151,9 +155,7 @@ mod tests {
assert_eq!(input, ""); assert_eq!(input, "");
assert_eq!( assert_eq!(
value, value,
Literal::String( Literal::String("abcdefghkjklmnopqrstuvwxyz!@#$%^&*()_+ ".to_string())
"abcdefghkjklmnopqrstuvwxyz!@#$%^&*()_+ ".to_string()
)
); );
} }
@ -192,18 +194,12 @@ mod tests {
#[test] #[test]
fn test_parse_int() { fn test_parse_int() {
assert_eq!( assert_eq!(parse_literal("5134616"), Ok(("", Literal::Int(5134616))));
parse_literal("5134616"),
Ok(("", Literal::Int(5134616)))
);
} }
#[test] #[test]
fn test_parse_uuid() { fn test_parse_uuid() {
assert_eq!( assert_eq!(parse_uuid("u131515"), Ok(("", Literal::Uuid(131515))))
parse_uuid("u131515"),
Ok(("", Literal::Uuid(131515)))
)
} }
#[test] #[test]
@ -214,7 +210,10 @@ mod tests {
); );
assert_eq!( assert_eq!(
parse_option("Some(Some(3))"), parse_option("Some(Some(3))"),
Ok(("", Literal::Some(Box::new(Literal::Some(Box::new(Literal::Int(3))))))) Ok((
"",
Literal::Some(Box::new(Literal::Some(Box::new(Literal::Int(3)))))
))
); );
assert_eq!( assert_eq!(
parse_option("Some(None)"), parse_option("Some(None)"),

View file

@ -42,7 +42,9 @@ pub fn try_parse_column_selection(input: &str) -> IResult<&str, ColumnSelection>
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::parsing::{ use crate::parsing::{
common::{parse_column_name, parse_table_name}, literal::Literal, select::parse_select common::{parse_column_name, parse_table_name},
literal::Literal,
select::parse_select,
}; };
use crate::syntax::{ColumnSelection, RawQuerySyntax}; use crate::syntax::{ColumnSelection, RawQuerySyntax};

View file

@ -71,4 +71,3 @@ impl RawTableSchema {
.collect() .collect()
} }
} }

View file

@ -37,7 +37,7 @@ pub enum ValidationError {
expected_type: DbType, expected_type: DbType,
}, },
#[error("Expected type {expected_type:?}, received None")] #[error("Expected type {expected_type:?}, received None")]
UnexpectedNoneValue{ expected_type: DbType }, UnexpectedNoneValue { expected_type: DbType },
#[error("values for required columns {0:?} are missing")] #[error("values for required columns {0:?} are missing")]
RequiredColumnsAreMissing(Vec<ColumnName>), RequiredColumnsAreMissing(Vec<ColumnName>),
} }
@ -284,7 +284,10 @@ fn validate_condition(
)?; )?;
let value_type: DbType = type_from_literal_with_type_hint(&value, &expected_type)?; let value_type: DbType = type_from_literal_with_type_hint(&value, &expected_type)?;
if expected_type.eq(&value_type) { if expected_type.eq(&value_type) {
Ok(Some(operation::Condition::Eq(column, literal_to_value(value, &expected_type)))) Ok(Some(operation::Condition::Eq(
column,
literal_to_value(value, &expected_type),
)))
} else { } else {
Err(ValidationError::TypeMismatch { Err(ValidationError::TypeMismatch {
column_name: column_name.to_string(), column_name: column_name.to_string(),
@ -339,10 +342,7 @@ where
None None
} }
fn get_table_schema( fn get_table_schema(db_schema: &DbSchema, table_name: &TableName) -> Option<Arc<TableSchema>> {
db_schema: &DbSchema,
table_name: &TableName,
) -> Option<Arc<TableSchema>> {
let (_, _, table_schema) = db_schema let (_, _, table_schema) = db_schema
.iter() .iter()
.find(|(tname, _, _)| table_name.eq(tname))?; .find(|(tname, _, _)| table_name.eq(tname))?;
@ -364,11 +364,14 @@ fn literal_to_value(lit: Literal, hint: &DbType) -> Value {
// type we want from the literal // type we want from the literal
panic!() panic!()
} }
}, }
} }
} }
fn type_from_literal_with_type_hint(lit: &Literal, hint: &DbType) -> Result<DbType, ValidationError> { fn type_from_literal_with_type_hint(
lit: &Literal,
hint: &DbType,
) -> Result<DbType, ValidationError> {
Ok(match lit { Ok(match lit {
Literal::Number(_) => DbType::Number, Literal::Number(_) => DbType::Number,
Literal::String(_) => DbType::String, Literal::String(_) => DbType::String,
@ -379,7 +382,9 @@ fn type_from_literal_with_type_hint(lit: &Literal, hint: &DbType) -> Result<DbTy
if matches!(hint, DbType::Option(_)) { if matches!(hint, DbType::Option(_)) {
hint.clone() hint.clone()
} else { } else {
return Err(ValidationError::UnexpectedNoneValue { expected_type: hint.clone() }) return Err(ValidationError::UnexpectedNoneValue {
expected_type: hint.clone(),
});
} }
} }
}) })

View file

@ -10,4 +10,4 @@ rand = { workspace = true }
rand_pcg = "0.3.1" rand_pcg = "0.3.1"
rand_seeder = "0.2.3" rand_seeder = "0.2.3"
thiserror = { workspace = true } thiserror = { workspace = true }
tokio = { workspace = true, features = ["io-util", "macros", "test-util"] } tokio = { workspace = true, features = ["io-util", "macros"] }

View file

@ -10,7 +10,7 @@ anyhow = { workspace = true }
async-trait = { workspace = true } async-trait = { workspace = true }
clap = { workspace = true, features = ["derive"] } clap = { workspace = true, features = ["derive"] }
rand = { workspace = true } rand = { workspace = true }
tokio = { workspace = true, features = ["io-util", "macros", "net", "rt-multi-thread"] } tokio = { workspace = true, features = ["io-util", "macros", "net", "rt-multi-thread", "time"] }
minisql = { path = "../minisql" } minisql = { path = "../minisql" }
parser = { path = "../parser" } parser = { path = "../parser" }

View file

@ -1,6 +1,6 @@
use minisql::cancellation::Cancellation;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use minisql::cancellation::Cancellation;
pub struct ResetCancelToken { pub struct ResetCancelToken {
is_canceled: Arc<AtomicBool>, is_canceled: Arc<AtomicBool>,

View file

@ -85,7 +85,7 @@ async fn handle_stream(
let mut wrapped_writer = ServerProtoWrapper::new(writer, config.get_throttle()); let mut wrapped_writer = ServerProtoWrapper::new(writer, config.get_throttle());
let result = match request { let result = match request {
Ok(req) => { Ok(req) => {
handle_connection(&mut reader, &mut wrapped_writer, req, state, token, config).await handle_connection(&mut reader, &mut wrapped_writer, req, state, token).await
} }
Err(ServerHandshakeError::IsCancelRequest(cancel)) => { Err(ServerHandshakeError::IsCancelRequest(cancel)) => {
handle_cancellation(cancel.pid, cancel.secret, &tokens).await handle_cancellation(cancel.pid, cancel.secret, &tokens).await
@ -141,7 +141,6 @@ async fn handle_connection<R, W>(
request: HandshakeRequest, request: HandshakeRequest,
state: SharedDbState, state: SharedDbState,
token: ResetCancelToken, token: ResetCancelToken,
config: Arc<Configuration>,
) -> anyhow::Result<()> ) -> anyhow::Result<()>
where where
R: FrontendProtoReader + Send, R: FrontendProtoReader + Send,
@ -157,7 +156,7 @@ where
break; break;
} }
FrontendMessage::Query(data) => { FrontendMessage::Query(data) => {
let result = handle_query(writer, &state, data.query.into(), &token, &config).await; let result = handle_query(writer, &state, data.query.into(), &token).await;
match result { match result {
Ok(_) => {} Ok(_) => {}
Err(e) => writer.write_error_message(&e.to_string()).await?, Err(e) => writer.write_error_message(&e.to_string()).await?,
@ -176,7 +175,6 @@ async fn handle_query<W>(
state: &SharedDbState, state: &SharedDbState,
query: String, query: String,
token: &ResetCancelToken, token: &ResetCancelToken,
config: &Arc<Configuration>,
) -> anyhow::Result<()> ) -> anyhow::Result<()>
where where
W: BackendProtoWriter + ProtoFlush + Send, W: BackendProtoWriter + ProtoFlush + Send,

View file

@ -1,9 +1,9 @@
use bincode;
use bincode::{Decode, Encode};
use bincode::config::{BigEndian, Configuration, Fixint}; use bincode::config::{BigEndian, Configuration, Fixint};
use std::mem::size_of; use bincode::{Decode, Encode};
const BIN_CONFIG: Configuration<BigEndian, Fixint> = bincode::config::standard().with_big_endian().with_fixed_int_encoding(); const BIN_CONFIG: Configuration<BigEndian, Fixint> = bincode::config::standard()
.with_big_endian()
.with_fixed_int_encoding();
pub fn encode<T: Encode>(t: &T) -> Result<Vec<u8>, bincode::error::EncodeError> { pub fn encode<T: Encode>(t: &T) -> Result<Vec<u8>, bincode::error::EncodeError> {
bincode::encode_to_vec(t, BIN_CONFIG) bincode::encode_to_vec(t, BIN_CONFIG)
@ -13,51 +13,33 @@ pub fn decode<T: Decode>(bytes: &[u8]) -> Result<(T, usize), bincode::error::Dec
bincode::decode_from_slice(bytes, BIN_CONFIG) bincode::decode_from_slice(bytes, BIN_CONFIG)
} }
pub fn encode_vector<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> {
let size: usize = ts.len();
let mut result = encode(&size)?;
for t in ts {
result.append(&mut encode(&t)?);
}
Ok(result)
}
pub fn decode_vector<T: Decode>(bytes: &[u8]) -> Result<Vec<T>, bincode::error::DecodeError> {
let mut offset = size_of::<usize>();
let result_len: usize = decode(&bytes[..offset])?.0;
let mut result: Vec<T> = Vec::with_capacity(result_len);
for _ in 0..result_len {
let (x, bytes_consumed) = decode::<T>(&bytes[offset..])?;
offset += bytes_consumed;
result.push(x);
}
Ok(result)
}
// We don't care about encoding the length here (since it will be used for a row with known column // We don't care about encoding the length here (since it will be used for a row with known column
// size) // size)
pub fn encode_sequence<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> { pub fn encode_sequence<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> {
let mut result = vec![]; let mut result = vec![];
for t in ts { for t in ts {
result.append(&mut encode(&t)?); result.append(&mut encode(&t)?);
} }
Ok(result) Ok(result)
} }
pub fn encode_sequence_with_sizes<T: Encode>(ts: &[T]) -> Result<(Vec<u8>, Vec<usize>), bincode::error::EncodeError> { pub fn encode_sequence_with_sizes<T: Encode>(
ts: &[T],
) -> Result<(Vec<u8>, Vec<usize>), bincode::error::EncodeError> {
let mut result_bytes = vec![]; let mut result_bytes = vec![];
let mut sizes = Vec::with_capacity(ts.len()); let mut sizes = Vec::with_capacity(ts.len());
for t in ts { for t in ts {
let mut bytes = encode(&t)?; let mut bytes = encode(&t)?;
sizes.push(bytes.len()); sizes.push(bytes.len());
result_bytes.append(&mut bytes); result_bytes.append(&mut bytes);
} }
Ok((result_bytes, sizes)) Ok((result_bytes, sizes))
} }
pub fn decode_sequence<T: Decode>(len: usize, bytes: &[u8]) -> Result<Vec<T>, bincode::error::DecodeError> { pub fn decode_sequence<T: Decode>(
len: usize,
bytes: &[u8],
) -> Result<Vec<T>, bincode::error::DecodeError> {
let mut result: Vec<T> = Vec::with_capacity(len); let mut result: Vec<T> = Vec::with_capacity(len);
let mut offset = 0; let mut offset = 0;
for _ in 0..len { for _ in 0..len {
@ -67,21 +49,3 @@ pub fn decode_sequence<T: Decode>(len: usize, bytes: &[u8]) -> Result<Vec<T>, bi
} }
Ok(result) Ok(result)
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_encoding_decoding() {
let xs: Vec<String> = vec!["foo".to_string(), "bar".to_string()];
let exs = encode_vector(&xs[..]).unwrap();
// WARNING: Don't forget to specify the type here
let dxs = decode_vector::<String>(&exs[..]).unwrap();
assert!(dxs == xs);
}
}

View file

@ -1,21 +1,24 @@
use tokio::fs::{File, OpenOptions};
use tokio::fs;
use std::path::Path;
use std::marker::PhantomData;
use std::collections::{BTreeMap, HashSet}; use std::collections::{BTreeMap, HashSet};
use std::marker::PhantomData;
use std::path::Path;
use tokio::fs;
use tokio::fs::{File, OpenOptions};
use bincode; use bincode;
use bincode::{Decode, Encode}; use bincode::{Decode, Encode};
use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries;
use crate::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex};
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite};
use crate::cursor_capabilities::traversal::CursorCanTraverse;
use crate::index::Index;
use crate::segments::entry::EntryDetailed; use crate::segments::entry::EntryDetailed;
use crate::segments::entry_header::EntryHeader; use crate::segments::entry_header::EntryHeader;
use crate::segments::store_header::StoreHeader; use crate::segments::store_header::StoreHeader;
use crate::store::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME}; use crate::store::{
use crate::index::Index; Column, FilePosition, Result, Store, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME,
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite}; ROWS_FILE_NAME,
use crate::cursor_capabilities::traversal::CursorCanTraverse; };
use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries;
use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex};
const GARBAGE_COLLECTION_TRIGGER: usize = 100; const GARBAGE_COLLECTION_TRIGGER: usize = 100;
@ -45,10 +48,9 @@ pub struct AppendOnlyCursor<T> {
eof_file_position: FilePosition, eof_file_position: FilePosition,
} }
// ===========Implementations============= // ===========Implementations=============
// ===primitive capabilities=== // ===primitive capabilities===
impl <T>CursorCanRead<T> for ReadCursor<'_, T> { impl<T> CursorCanRead<T> for ReadCursor<'_, T> {
fn file(&mut self) -> &mut File { fn file(&mut self) -> &mut File {
&mut self.file &mut self.file
} }
@ -58,7 +60,7 @@ impl <T>CursorCanRead<T> for ReadCursor<'_, T> {
} }
} }
impl <T>CursorCanRead<T> for WriteCursor<'_, T> { impl<T> CursorCanRead<T> for WriteCursor<'_, T> {
fn file(&mut self) -> &mut File { fn file(&mut self) -> &mut File {
&mut self.file &mut self.file
} }
@ -68,7 +70,7 @@ impl <T>CursorCanRead<T> for WriteCursor<'_, T> {
} }
} }
impl <T>CursorCanRead<T> for AppendOnlyCursor<T> { impl<T> CursorCanRead<T> for AppendOnlyCursor<T> {
fn file(&mut self) -> &mut File { fn file(&mut self) -> &mut File {
&mut self.file &mut self.file
} }
@ -78,58 +80,73 @@ impl <T>CursorCanRead<T> for AppendOnlyCursor<T> {
} }
} }
impl <T>CursorCanWrite<T> for WriteCursor<'_, T> {} impl<T> CursorCanWrite<T> for WriteCursor<'_, T> {}
impl <T>CursorCanWrite<T> for AppendOnlyCursor<T> {} impl<T> CursorCanWrite<T> for AppendOnlyCursor<T> {}
// ===capability to access header=== // ===capability to access header===
impl <T>CursorCanTraverse<T> for ReadCursor<'_, T> { impl<T> CursorCanTraverse<T> for ReadCursor<'_, T> {
fn header(&self) -> &StoreHeader { &self.header } fn header(&self) -> &StoreHeader {
&self.header
}
} }
impl <T>CursorCanTraverse<T> for WriteCursor<'_, T> { impl<T> CursorCanTraverse<T> for WriteCursor<'_, T> {
fn header(&self) -> &StoreHeader { &self.header } fn header(&self) -> &StoreHeader {
self.header
}
} }
impl <T>CursorCanTraverse<T> for AppendOnlyCursor<T> { impl<T> CursorCanTraverse<T> for AppendOnlyCursor<T> {
fn header(&self) -> &StoreHeader { &self.header } fn header(&self) -> &StoreHeader {
&self.header
}
} }
impl <T>CursorCanModifyEntries<T> for WriteCursor<'_, T> { impl<T> CursorCanModifyEntries<T> for WriteCursor<'_, T> {
fn header_mut(&mut self) -> &mut StoreHeader { self.header } fn header_mut(&mut self) -> &mut StoreHeader {
fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } self.header
}
fn set_eof_file_position(&mut self, new_file_position: FilePosition) {
self.eof_file_position = new_file_position
}
} }
impl <T>CursorCanModifyEntries<T> for AppendOnlyCursor<T> { impl<T> CursorCanModifyEntries<T> for AppendOnlyCursor<T> {
fn header_mut(&mut self) -> &mut StoreHeader { &mut self.header } fn header_mut(&mut self) -> &mut StoreHeader {
fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position } &mut self.header
}
fn set_eof_file_position(&mut self, new_file_position: FilePosition) {
self.eof_file_position = new_file_position
}
} }
// ===capability to access index=== // ===capability to access index===
impl <T>CursorCanReadIndex<T> for ReadCursor<'_, T> { impl<T> CursorCanReadIndex<T> for ReadCursor<'_, T> {
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] { &self.indexes } fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] {
self.indexes
}
} }
impl <T>CursorCanReadIndex<T> for WriteCursor<'_, T> { impl<T> CursorCanReadIndex<T> for WriteCursor<'_, T> {
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] { &self.indexes } fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] {
self.indexes
}
} }
impl <T>CursorCanWriteToIndex<T> for WriteCursor<'_, T> { impl<T> CursorCanWriteToIndex<T> for WriteCursor<'_, T> {
fn indexes_mut(&mut self) -> &mut [Option<Index<T, FilePosition>>] { self.indexes } fn indexes_mut(&mut self) -> &mut [Option<Index<T, FilePosition>>] {
self.indexes
}
} }
// ===Specifics=== // ===Specifics===
impl <'cursor, T> ReadCursor<'cursor, T> { impl<'cursor, T> ReadCursor<'cursor, T> {
pub async fn new<'store: 'cursor>(store: &'store Store<T>) -> Result<Self> pub async fn new<'store: 'cursor>(store: &'store Store<T>) -> Result<Self>
where T: Send + Sync where
T: Send + Sync,
{ {
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
let file: File = let file: File = OpenOptions::new().read(true).open(path_to_rows).await?;
OpenOptions::new()
.read(true)
.open(path_to_rows)
.await?;
let mut cursor = Self { let mut cursor = Self {
header: store.header.clone(), header: store.header.clone(),
@ -147,15 +164,14 @@ impl <'cursor, T> ReadCursor<'cursor, T> {
} }
} }
impl <'cursor, T> WriteCursor<'cursor, T> impl<'cursor, T> WriteCursor<'cursor, T> {
{
// 'store lives atleast as long as 'cursor // 'store lives atleast as long as 'cursor
pub async fn new<'store: 'cursor>(store: &'store mut Store<T>) -> Result<Self> pub async fn new<'store: 'cursor>(store: &'store mut Store<T>) -> Result<Self>
where T: Send where
T: Send,
{ {
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME); let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
let file: File = let file: File = OpenOptions::new()
OpenOptions::new()
.read(true) .read(true)
.write(true) .write(true)
.open(path_to_rows) .open(path_to_rows)
@ -179,7 +195,8 @@ impl <'cursor, T> WriteCursor<'cursor, T>
// ===Entry Header Manipulation=== // ===Entry Header Manipulation===
// assumes we are at the start of valid entry. // assumes we are at the start of valid entry.
async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()> async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()>
where T: Send where
T: Send,
{ {
let bytes: Vec<u8> = entry_header.encode()?; let bytes: Vec<u8> = entry_header.encode()?;
self.write_bytes(&bytes).await?; self.write_bytes(&bytes).await?;
@ -187,8 +204,13 @@ impl <'cursor, T> WriteCursor<'cursor, T>
} }
// ===Deletion=== // ===Deletion===
pub async fn mark_deleted_at(&mut self, file_position: FilePosition, enable_garbage_collector: bool) -> Result<()> pub async fn mark_deleted_at(
where T: Encode + Decode + Ord + Send + Sync + Clone + Ord &mut self,
file_position: FilePosition,
enable_garbage_collector: bool,
) -> Result<()>
where
T: Encode + Decode + Ord + Send + Sync + Clone + Ord,
{ {
self.seek_to(file_position).await?; self.seek_to(file_position).await?;
let mut entry_header = self.read_entry_header().await?; let mut entry_header = self.read_entry_header().await?;
@ -205,9 +227,7 @@ impl <'cursor, T> WriteCursor<'cursor, T>
// Update index // Update index
self.seek_to(file_position).await?; self.seek_to(file_position).await?;
match self.next().await? { match self.next().await? {
Some(entry) => { Some(entry) => self.delete_entry_values_from_indexes(entry).await?,
self.delete_entry_values_from_indexes(entry).await?
},
None => { None => {
// SAFETY: We just modified its header, so it must exist. // SAFETY: We just modified its header, so it must exist.
unreachable!() unreachable!()
@ -221,12 +241,19 @@ impl <'cursor, T> WriteCursor<'cursor, T>
} }
} }
async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T, enable_garbage_collector: bool) -> Result<Option<EntryDetailed<T>>> async fn find_first_eq_bruteforce_and_delete(
where T: Encode + Decode + Ord + Send + Sync + Clone &mut self,
column: Column,
t0: &T,
enable_garbage_collector: bool,
) -> Result<Option<EntryDetailed<T>>>
where
T: Encode + Decode + Ord + Send + Sync + Clone,
{ {
let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?; let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?;
if let Some(entry) = maybe_entry { if let Some(entry) = maybe_entry {
self.mark_deleted_at(entry.file_position, enable_garbage_collector).await?; self.mark_deleted_at(entry.file_position, enable_garbage_collector)
.await?;
Ok(Some(entry)) Ok(Some(entry))
} else { } else {
Ok(maybe_entry) Ok(maybe_entry)
@ -235,17 +262,23 @@ impl <'cursor, T> WriteCursor<'cursor, T>
// Doesn't update indexes. // Doesn't update indexes.
async fn find_all_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result<usize> async fn find_all_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result<usize>
where T: Encode + Decode + Ord + Send + Sync + Clone where
T: Encode + Decode + Ord + Send + Sync + Clone,
{ {
let mut count = 0; let mut count = 0;
while let Some(_) = self.find_first_eq_bruteforce_and_delete(column, t0, false).await? { while self
.find_first_eq_bruteforce_and_delete(column, t0, false)
.await?
.is_some()
{
count += 1; count += 1;
} }
Ok(count) Ok(count)
} }
pub async fn delete_all_entries(&mut self, enable_garbage_collector: bool) -> Result<usize> pub async fn delete_all_entries(&mut self, enable_garbage_collector: bool) -> Result<usize>
where T: Encode + Decode + Ord + Send + Sync + Clone where
T: Encode + Decode + Ord + Send + Sync + Clone,
{ {
let mut count = 0; let mut count = 0;
while let Some(entry) = self.next_alive().await? { while let Some(entry) = self.next_alive().await? {
@ -259,11 +292,16 @@ impl <'cursor, T> WriteCursor<'cursor, T>
Ok(count) Ok(count)
} }
pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result<usize> pub async fn delete_entries_where_eq(
where T: Encode + Decode + Ord + Send + Sync + Clone &mut self,
column: Column,
value: &T,
enable_garbage_collector: bool,
) -> Result<usize>
where
T: Encode + Decode + Ord + Send + Sync + Clone,
{ {
let count = let count = if self.header().is_column_indexed(column) {
if self.header().is_column_indexed(column) {
let entries = self.index_lookup(column, value).await?; let entries = self.index_lookup(column, value).await?;
let count = entries.len(); let count = entries.len();
for entry in entries { for entry in entries {
@ -271,7 +309,9 @@ impl <'cursor, T> WriteCursor<'cursor, T>
} }
count count
} else { } else {
let count = self.find_all_eq_bruteforce_and_delete(column, value).await?; let count = self
.find_all_eq_bruteforce_and_delete(column, value)
.await?;
count count
}; };
if enable_garbage_collector { if enable_garbage_collector {
@ -283,10 +323,11 @@ impl <'cursor, T> WriteCursor<'cursor, T>
// ===Indexing=== // ===Indexing===
// WARNING: Assumes the column is NOT indexable. // WARNING: Assumes the column is NOT indexable.
pub async fn attach_index(&mut self, column: Column) -> Result<()> pub async fn attach_index(&mut self, column: Column) -> Result<()>
where T: Ord + Decode + Encode + Send + Sync where
T: Ord + Decode + Encode + Send + Sync,
{ {
// New Index // New Index
let index = Store::create_empty_index_at(&self.header, column).await?; let index = Store::create_empty_index_at(self.header, column).await?;
self.indexes[column as usize] = Some(index); self.indexes[column as usize] = Some(index);
// Mark column as indexed // Mark column as indexed
@ -304,7 +345,8 @@ impl <'cursor, T> WriteCursor<'cursor, T>
// ===Garbage Collection=== // ===Garbage Collection===
async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()> async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()>
where T: Send + Sync + Decode + Encode + Clone + Ord where
T: Send + Sync + Decode + Encode + Clone + Ord,
{ {
if self.header.deleted_count > GARBAGE_COLLECTION_TRIGGER { if self.header.deleted_count > GARBAGE_COLLECTION_TRIGGER {
println!("=======START GARBAGE COLLECTOR===="); println!("=======START GARBAGE COLLECTOR====");
@ -315,13 +357,15 @@ impl <'cursor, T> WriteCursor<'cursor, T>
} }
pub async fn initiate_garbage_collection(&mut self) -> Result<usize> pub async fn initiate_garbage_collection(&mut self) -> Result<usize>
where T: Send + Sync + Decode + Encode + Clone + Ord where
T: Send + Sync + Decode + Encode + Clone + Ord,
{ {
let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?; let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?;
// Since garbage collection changes FilePositions of live entries, we need to update the // Since garbage collection changes FilePositions of live entries, we need to update the
// indexes too. // indexes too.
let mut in_memory_indexes: Vec<Option<BTreeMap<T, HashSet<FilePosition>>>> = Vec::with_capacity(self.header.number_of_columns); let mut in_memory_indexes: Vec<Option<BTreeMap<T, HashSet<FilePosition>>>> =
Vec::with_capacity(self.header.number_of_columns);
for column in 0..self.header.number_of_columns { for column in 0..self.header.number_of_columns {
if self.header.is_column_indexed(column as Column) { if self.header.is_column_indexed(column as Column) {
let in_memory_index = BTreeMap::new(); let in_memory_index = BTreeMap::new();
@ -337,12 +381,19 @@ impl <'cursor, T> WriteCursor<'cursor, T>
{ {
while let Some(live_entry) = self.next_alive().await? { while let Some(live_entry) = self.next_alive().await? {
entries_deleted += 1; entries_deleted += 1;
let file_position = cursor_to_intermediate.append_entry_no_indexing(&live_entry.forget()).await?; let file_position = cursor_to_intermediate
.append_entry_no_indexing(&live_entry.forget())
.await?;
// Update index. (Wouldn't it be nice if we had `for let ...`?) // Update index. (Wouldn't it be nice if we had `for let ...`?)
for (maybe_in_memory_index, value) in in_memory_indexes.iter_mut().zip(&live_entry.data) { for (maybe_in_memory_index, value) in
in_memory_indexes.iter_mut().zip(&live_entry.data)
{
if let Some(in_memory_index) = maybe_in_memory_index { if let Some(in_memory_index) = maybe_in_memory_index {
in_memory_index.entry(value.clone()).or_insert_with(HashSet::new).insert(file_position); in_memory_index
.entry(value.clone())
.or_insert_with(HashSet::new)
.insert(file_position);
} }
} }
} }
@ -369,7 +420,8 @@ impl <'cursor, T> WriteCursor<'cursor, T>
// current file // current file
let path_to_table = Path::new(&self.header.table_folder); let path_to_table = Path::new(&self.header.table_folder);
let path_to_rows = path_to_table.join(ROWS_FILE_NAME); let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
let path_to_intermediate_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); let path_to_intermediate_rows =
path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
fs::remove_file(path_to_rows.clone()).await?; fs::remove_file(path_to_rows.clone()).await?;
fs::rename(path_to_intermediate_rows, path_to_rows).await?; fs::rename(path_to_intermediate_rows, path_to_rows).await?;
@ -377,13 +429,15 @@ impl <'cursor, T> WriteCursor<'cursor, T>
} }
async fn spawn_cursor_to_intermediate_file(&self) -> Result<AppendOnlyCursor<T>> async fn spawn_cursor_to_intermediate_file(&self) -> Result<AppendOnlyCursor<T>>
where T: Send where
T: Send,
{ {
let table_folder = self.header.table_folder.clone(); let table_folder = self.header.table_folder.clone();
let path_to_table = Path::new(&table_folder); let path_to_table = Path::new(&table_folder);
let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME); let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
let intermediate_file: File = Store::<T>::create_empty_rows_file(path_to_rows, &self.header).await?; let intermediate_file: File =
Store::<T>::create_empty_rows_file(path_to_rows, self.header).await?;
let intermediate_header: StoreHeader = StoreHeader { let intermediate_header: StoreHeader = StoreHeader {
table_folder, table_folder,

View file

@ -1,14 +1,14 @@
use async_trait::async_trait; use async_trait::async_trait;
use crate::binary_coding::encode;
use bincode; use bincode;
use bincode::Encode; use bincode::Encode;
use crate::binary_coding::encode;
use crate::cursor_capabilities::primitive::CursorCanWrite;
use crate::cursor_capabilities::traversal::CursorCanTraverse;
use crate::segments::entry::Entry; use crate::segments::entry::Entry;
use crate::segments::store_header::StoreHeader; use crate::segments::store_header::StoreHeader;
use crate::store::{FilePosition, Result}; use crate::store::{FilePosition, Result};
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite};
use crate::cursor_capabilities::traversal::CursorCanTraverse;
#[async_trait] #[async_trait]
pub trait CursorCanModifyEntries<T>: CursorCanTraverse<T> + CursorCanWrite<T> { pub trait CursorCanModifyEntries<T>: CursorCanTraverse<T> + CursorCanWrite<T> {
@ -17,7 +17,8 @@ pub trait CursorCanModifyEntries<T>: CursorCanTraverse<T> + CursorCanWrite<T> {
// ===Store Header Manipulation=== // ===Store Header Manipulation===
async fn increment_total_count(&mut self) -> Result<()> async fn increment_total_count(&mut self) -> Result<()>
where T: Send where
T: Send,
{ {
self.seek_to_start().await?; self.seek_to_start().await?;
self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?; self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?;
@ -27,17 +28,20 @@ pub trait CursorCanModifyEntries<T>: CursorCanTraverse<T> + CursorCanWrite<T> {
} }
async fn increment_deleted_count(&mut self) -> Result<()> async fn increment_deleted_count(&mut self) -> Result<()>
where T: Send where
T: Send,
{ {
self.seek_to_start().await?; self.seek_to_start().await?;
self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?; self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64)
.await?;
let new_count = self.header_mut().increment_deleted_count(); let new_count = self.header_mut().increment_deleted_count();
self.write_bytes(&encode::<usize>(&new_count)?).await?; self.write_bytes(&encode::<usize>(&new_count)?).await?;
Ok(()) Ok(())
} }
async fn set_header(&mut self, header: &StoreHeader) -> Result<()> async fn set_header(&mut self, header: &StoreHeader) -> Result<()>
where T: Send where
T: Send,
{ {
self.seek_to_start().await?; self.seek_to_start().await?;
let encoded_header: Vec<u8> = header.encode()?; let encoded_header: Vec<u8> = header.encode()?;
@ -51,7 +55,8 @@ pub trait CursorCanModifyEntries<T>: CursorCanTraverse<T> + CursorCanWrite<T> {
// Moves cursor to the end. // Moves cursor to the end.
// Returns file position to the start of the new entry. // Returns file position to the start of the new entry.
async fn append_entry_no_indexing(&mut self, entry: &Entry<T>) -> Result<FilePosition> async fn append_entry_no_indexing(&mut self, entry: &Entry<T>) -> Result<FilePosition>
where T: Encode + Send + Sync where
T: Encode + Send + Sync,
{ {
self.increment_total_count().await?; self.increment_total_count().await?;

View file

@ -5,44 +5,45 @@ use async_trait::async_trait;
use bincode; use bincode;
use bincode::{Decode, Encode}; use bincode::{Decode, Encode};
use crate::error::Error;
use crate::segments::entry::{Entry, EntryDetailed};
use crate::store::{FilePosition, Column, Result};
use crate::index::Index;
use crate::cursor_capabilities::traversal::CursorCanTraverse;
use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries; use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries;
use crate::cursor_capabilities::traversal::CursorCanTraverse;
use crate::error::Error;
use crate::index::Index;
use crate::segments::entry::{Entry, EntryDetailed};
use crate::store::{Column, FilePosition, Result};
#[async_trait] #[async_trait]
pub trait CursorCanReadIndex<T>: CursorCanTraverse<T> { pub trait CursorCanReadIndex<T>: CursorCanTraverse<T> {
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>]; fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>];
async fn index_lookup(&mut self, column: Column, value: &T) -> Result<Vec<EntryDetailed<T>>> async fn index_lookup(&mut self, column: Column, value: &T) -> Result<Vec<EntryDetailed<T>>>
where T: Encode + Decode + Ord + Send + Sync where
T: Encode + Decode + Ord + Send + Sync,
{ {
match &self.indexes()[column as usize] { match &self.indexes()[column as usize] {
Some(index) => { Some(index) => {
let file_positions = index.lookup(value).await?.unwrap_or_else(|| HashSet::new()); let file_positions = index.lookup(value).await?.unwrap_or_else(HashSet::new);
let mut entries: Vec<EntryDetailed<T>> = vec![]; let mut entries: Vec<EntryDetailed<T>> = vec![];
for &file_position in file_positions.iter() { for &file_position in file_positions.iter() {
match self.read_entry_at(file_position).await? { match self.read_entry_at(file_position).await? {
Some(entry) => { Some(entry) => entries.push(entry),
entries.push(entry) None => return Err(Error::IndexIsStoringEofFilePosition(column)),
},
None => {
return Err(Error::IndexIsStoringEofFilePosition(column))
}
} }
} }
Ok(entries) Ok(entries)
}, }
None => None => Err(Error::AttemptToIndexNonIndexableColumn(column)),
Err(Error::AttemptToIndexNonIndexableColumn(column))
} }
} }
async fn select_entries_where_eq(&mut self, column: Column, value: &T) -> Result<Vec<EntryDetailed<T>>> async fn select_entries_where_eq(
where T: Encode + Decode + Ord + Send + Sync &mut self,
column: Column,
value: &T,
) -> Result<Vec<EntryDetailed<T>>>
where
T: Encode + Decode + Ord + Send + Sync,
{ {
if self.header().is_column_indexed(column) { if self.header().is_column_indexed(column) {
self.index_lookup(column, value).await self.index_lookup(column, value).await
@ -59,9 +60,7 @@ pub trait CursorCanWriteToIndex<T>: CursorCanReadIndex<T> + CursorCanModifyEntri
// Assumes that the column is indexable. // Assumes that the column is indexable.
fn mut_index_at(&mut self, column: Column) -> &mut Index<T, FilePosition> { fn mut_index_at(&mut self, column: Column) -> &mut Index<T, FilePosition> {
match &mut self.indexes_mut()[column as usize] { match &mut self.indexes_mut()[column as usize] {
Some(index) => { Some(index) => index,
index
},
None => { None => {
unreachable!() unreachable!()
} }
@ -69,8 +68,14 @@ pub trait CursorCanWriteToIndex<T>: CursorCanReadIndex<T> + CursorCanModifyEntri
} }
// Assumes that the column is indexable. // Assumes that the column is indexable.
async fn insert_into_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()> async fn insert_into_index(
where T: Encode + Decode + Ord + Send + Sync + 'async_trait &mut self,
column: Column,
value: T,
file_position: FilePosition,
) -> Result<()>
where
T: Encode + Decode + Ord + Send + Sync + 'async_trait,
{ {
let index = self.mut_index_at(column as Column); let index = self.mut_index_at(column as Column);
index.insert(value, file_position).await?; index.insert(value, file_position).await?;
@ -78,8 +83,14 @@ pub trait CursorCanWriteToIndex<T>: CursorCanReadIndex<T> + CursorCanModifyEntri
} }
// Assumes that the column is indexable. // Assumes that the column is indexable.
async fn delete_from_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()> async fn delete_from_index(
where T: Encode + Decode + Ord + Send + Sync + 'async_trait &mut self,
column: Column,
value: T,
file_position: FilePosition,
) -> Result<()>
where
T: Encode + Decode + Ord + Send + Sync + 'async_trait,
{ {
let index = self.mut_index_at(column as Column); let index = self.mut_index_at(column as Column);
index.delete(value, file_position).await?; index.delete(value, file_position).await?;
@ -87,15 +98,22 @@ pub trait CursorCanWriteToIndex<T>: CursorCanReadIndex<T> + CursorCanModifyEntri
} }
async fn insert_entry(&mut self, entry: Entry<T>) -> Result<FilePosition> async fn insert_entry(&mut self, entry: Entry<T>) -> Result<FilePosition>
where T: Encode + Decode + Ord + Send + Sync + 'async_trait where
T: Encode + Decode + Ord + Send + Sync + 'async_trait,
{ {
let file_position = self.append_entry_no_indexing(&entry).await?; let file_position = self.append_entry_no_indexing(&entry).await?;
// insert the indexable values of the entry into corresponding indexes. // insert the indexable values of the entry into corresponding indexes.
for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() { for (column, (value, should_index)) in entry
.data
.into_iter()
.zip(self.header().indexed_columns.clone())
.enumerate()
{
if should_index { if should_index {
// SAFETY: If should_index is true, then the column is indexable. // SAFETY: If should_index is true, then the column is indexable.
self.insert_into_index(column as Column, value, file_position).await? self.insert_into_index(column as Column, value, file_position)
.await?
} }
} }
@ -103,12 +121,19 @@ pub trait CursorCanWriteToIndex<T>: CursorCanReadIndex<T> + CursorCanModifyEntri
} }
async fn delete_entry_values_from_indexes(&mut self, entry: EntryDetailed<T>) -> Result<()> async fn delete_entry_values_from_indexes(&mut self, entry: EntryDetailed<T>) -> Result<()>
where T: Encode + Decode + Ord + Send + Sync + 'async_trait where
T: Encode + Decode + Ord + Send + Sync + 'async_trait,
{
for (column, (value, should_index)) in entry
.data
.into_iter()
.zip(self.header().indexed_columns.clone())
.enumerate()
{ {
for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() {
if should_index { if should_index {
// SAFETY: If should_index is true, then the column is indexable. // SAFETY: If should_index is true, then the column is indexable.
self.delete_from_index(column as Column, value, entry.file_position).await? self.delete_from_index(column as Column, value, entry.file_position)
.await?
} }
} }
Ok(()) Ok(())

View file

@ -1,4 +1,4 @@
pub(crate) mod primitive;
pub mod traversal;
pub mod entry_modification; pub mod entry_modification;
pub mod index_access; pub mod index_access;
pub(crate) mod primitive;
pub mod traversal;

View file

@ -1,11 +1,11 @@
use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom};
use tokio::fs::File;
use async_trait::async_trait; use async_trait::async_trait;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
use crate::store::{FilePosition, Result}; use crate::store::{FilePosition, Result};
#[async_trait] #[async_trait]
pub(crate) trait CursorCanRead<T> { pub trait CursorCanRead<T> {
fn file(&mut self) -> &mut File; fn file(&mut self) -> &mut File;
fn eof_file_position(&self) -> FilePosition; fn eof_file_position(&self) -> FilePosition;
@ -55,7 +55,7 @@ pub(crate) trait CursorCanRead<T> {
} }
#[async_trait] #[async_trait]
pub(crate) trait CursorCanWrite<T>: CursorCanRead<T> { pub trait CursorCanWrite<T>: CursorCanRead<T> {
async fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize> { async fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize> {
Ok(self.file().write(bytes).await?) Ok(self.file().write(bytes).await?)
} }

View file

@ -1,24 +1,24 @@
use tokio::io::AsyncReadExt;
use async_trait::async_trait; use async_trait::async_trait;
use tokio::io::AsyncReadExt;
use crate::binary_coding::decode;
use bincode; use bincode;
use bincode::Decode; use bincode::Decode;
use crate::binary_coding::decode;
use crate::error::{Error, DecodeErrorKind}; use crate::cursor_capabilities::primitive::CursorCanRead;
use crate::error::{DecodeErrorKind, Error};
use crate::segments::entry::EntryDetailed; use crate::segments::entry::EntryDetailed;
use crate::segments::entry_header::EntryHeaderWithDataSize; use crate::segments::entry_header::EntryHeaderWithDataSize;
use crate::segments::store_header::StoreHeader; use crate::segments::store_header::StoreHeader;
use crate::store::{FilePosition, Column, Result}; use crate::store::{Column, FilePosition, Result};
use crate::cursor_capabilities::primitive::CursorCanRead;
#[async_trait] #[async_trait]
pub trait CursorCanTraverse<T>: CursorCanRead<T> { pub trait CursorCanTraverse<T>: CursorCanRead<T> {
fn header(&self) -> &StoreHeader; fn header(&self) -> &StoreHeader;
async fn seek_to_start_of_data(&mut self) -> Result<FilePosition> { async fn seek_to_start_of_data(&mut self) -> Result<FilePosition> {
self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64).await self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64)
.await
} }
async fn read_entry_header(&mut self) -> Result<EntryHeaderWithDataSize> { async fn read_entry_header(&mut self) -> Result<EntryHeaderWithDataSize> {
@ -30,14 +30,21 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
Ok(header) Ok(header)
} }
async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result<EntryHeaderWithDataSize> { async fn read_entry_header_at(
&mut self,
file_position: FilePosition,
) -> Result<EntryHeaderWithDataSize> {
self.seek_to(file_position).await?; self.seek_to(file_position).await?;
self.read_entry_header().await self.read_entry_header().await
} }
// Returns None when file_position == eof_file_position // Returns None when file_position == eof_file_position
async fn read_entry_at(&mut self, file_position: FilePosition) -> Result<Option<EntryDetailed<T>>> async fn read_entry_at(
where T: Decode &mut self,
file_position: FilePosition,
) -> Result<Option<EntryDetailed<T>>>
where
T: Decode,
{ {
self.seek_to(file_position).await?; self.seek_to(file_position).await?;
self.next().await self.next().await
@ -46,12 +53,11 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
// ===Iteration=== // ===Iteration===
// The following functions assume that the current file position is at a valid entry or EOF. // The following functions assume that the current file position is at a valid entry or EOF.
// WARNING: This moves the file_position to start of the data, so you can't just call // WARNING: This moves the file_position to start of the data, so you can't just call
// next_entry_header() a bunch of times. You must move the cursor! // next_entry_header() a bunch of times. You must move the cursor!
async fn next_entry_header(&mut self) -> Result<Option<EntryHeaderWithDataSize>> { async fn next_entry_header(&mut self) -> Result<Option<EntryHeaderWithDataSize>> {
if self.is_at_eof().await? { if self.is_at_eof().await? {
return Ok(None) return Ok(None);
} }
let entry_header = self.read_entry_header().await?; let entry_header = self.read_entry_header().await?;
@ -60,31 +66,47 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
} }
// This is meant to be used after next_entry_header() is called. // This is meant to be used after next_entry_header() is called.
async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result<FilePosition>{ async fn jump_from_start_of_entry_data_to_next_entry(
&mut self,
entry_header: &EntryHeaderWithDataSize,
) -> Result<FilePosition> {
let file_position = self.seek_by(entry_header.size_of_data() as i64).await?; let file_position = self.seek_by(entry_header.size_of_data() as i64).await?;
Ok(file_position) Ok(file_position)
} }
async fn next(&mut self) -> Result<Option<EntryDetailed<T>>> async fn next(&mut self) -> Result<Option<EntryDetailed<T>>>
where T: Decode where
T: Decode,
{ {
let file_position = self.current_file_position().await?; let file_position = self.current_file_position().await?;
let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; let Some(entry_header) = self.next_entry_header().await? else {
return Ok(None);
};
let mut data_bytes: Vec<u8> = vec![0; entry_header.size_of_data()]; let mut data_bytes: Vec<u8> = vec![0; entry_header.size_of_data()];
self.read_bytes(&mut data_bytes).await?; self.read_bytes(&mut data_bytes).await?;
let entry: EntryDetailed<T> = let entry: EntryDetailed<T> = EntryDetailed::decode(
EntryDetailed::decode(entry_header, file_position, self.header().number_of_columns, &mut data_bytes)?; entry_header,
file_position,
self.header().number_of_columns,
&data_bytes,
)?;
Ok(Some(entry)) Ok(Some(entry))
} }
// Like next, but only reads the column, not the whole entry. // Like next, but only reads the column, not the whole entry.
async fn next_at_column(&mut self, column: Column) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>> async fn next_at_column(
where T: Decode + Send &mut self,
column: Column,
) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
where
T: Decode + Send,
{ {
let file_position = self.current_file_position().await?; let file_position = self.current_file_position().await?;
let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) }; let Some(entry_header) = self.next_entry_header().await? else {
return Ok(None);
};
let file_position_at_start_of_data = self.current_file_position().await?; let file_position_at_start_of_data = self.current_file_position().await?;
// figuring out how much to decode // figuring out how much to decode
@ -94,49 +116,59 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
// reading and decoding // reading and decoding
let mut bytes: Vec<u8> = vec![0; entry_header.data_sizes[column as usize]]; let mut bytes: Vec<u8> = vec![0; entry_header.data_sizes[column as usize]];
self.read_bytes(&mut bytes).await?; self.read_bytes(&mut bytes).await?;
let (value, _) = let (value, _) = decode::<T>(&bytes[..])
decode::<T>(&bytes[..])
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
// jumping to next entry // jumping to next entry
self.seek_to(file_position_at_start_of_data).await?; self.seek_to(file_position_at_start_of_data).await?;
self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?; self.jump_from_start_of_entry_data_to_next_entry(&entry_header)
.await?;
Ok(Some((entry_header, file_position, value))) Ok(Some((entry_header, file_position, value)))
} }
async fn next_alive_at_column(&mut self, column: Column) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>> async fn next_alive_at_column(
where T: Decode + Send &mut self,
column: Column,
) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
where
T: Decode + Send,
{ {
while let Some((header, file_position, t)) = self.next_at_column(column).await? { while let Some((header, file_position, t)) = self.next_at_column(column).await? {
if !header.is_deleted { if !header.is_deleted {
return Ok(Some((header, file_position, t))) return Ok(Some((header, file_position, t)));
} }
} }
Ok(None) Ok(None)
} }
async fn next_alive(&mut self) -> Result<Option<EntryDetailed<T>>> async fn next_alive(&mut self) -> Result<Option<EntryDetailed<T>>>
where T: Decode where
T: Decode,
{ {
while let Some(entry) = self.next().await? { while let Some(entry) = self.next().await? {
if !entry.header.is_deleted { if !entry.header.is_deleted {
return Ok(Some(entry)) return Ok(Some(entry));
} }
} }
Ok(None) Ok(None)
} }
// ===Search=== // ===Search===
async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result<Option<EntryDetailed<T>>> async fn find_first_eq_bruteforce(
where T: Decode + PartialEq + Send + Sync &mut self,
column: Column,
t0: &T,
) -> Result<Option<EntryDetailed<T>>>
where
T: Decode + PartialEq + Send + Sync,
{ {
let mut file_position = self.current_file_position().await?; let mut file_position = self.current_file_position().await?;
while let Some((_, _, t)) = self.next_alive_at_column(column).await? { while let Some((_, _, t)) = self.next_alive_at_column(column).await? {
if &t == t0 { if &t == t0 {
// go back and decode the whole entry // go back and decode the whole entry
self.seek_to(file_position).await?; self.seek_to(file_position).await?;
return self.next().await return self.next().await;
} else { } else {
file_position = self.current_file_position().await?; file_position = self.current_file_position().await?;
} }
@ -144,8 +176,13 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
Ok(None) Ok(None)
} }
async fn find_all_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result<Vec<EntryDetailed<T>>> async fn find_all_eq_bruteforce(
where T: Decode + PartialEq + Send + Sync &mut self,
column: Column,
t0: &T,
) -> Result<Vec<EntryDetailed<T>>>
where
T: Decode + PartialEq + Send + Sync,
{ {
let mut entries = vec![]; let mut entries = vec![];
while let Some(entry) = self.find_first_eq_bruteforce(column, t0).await? { while let Some(entry) = self.find_first_eq_bruteforce(column, t0).await? {
@ -156,7 +193,8 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
// ===Debugging=== // ===Debugging===
async fn read_entries(&mut self) -> Result<()> async fn read_entries(&mut self) -> Result<()>
where T: Decode + std::fmt::Debug where
T: Decode + std::fmt::Debug,
{ {
self.seek_to_start_of_data().await?; self.seek_to_start_of_data().await?;
while let Some(entry) = self.next().await? { while let Some(entry) = self.next().await? {

View file

@ -7,7 +7,6 @@ use std::hash::Hash;
use std::io::SeekFrom; use std::io::SeekFrom;
use crate::binary_coding::{decode, encode}; use crate::binary_coding::{decode, encode};
use bincode;
use bincode::{Decode, Encode}; use bincode::{Decode, Encode};
use crate::error::{DecodeErrorKind, Error}; use crate::error::{DecodeErrorKind, Error};
@ -63,7 +62,7 @@ where
Ok(()) Ok(())
} }
pub fn insert_desynced(&mut self, k: K, v: V) -> () { pub fn insert_desynced(&mut self, k: K, v: V) {
self.data.entry(k).or_insert_with(HashSet::new).insert(v); self.data.entry(k).or_insert_with(HashSet::new).insert(v);
} }

View file

@ -1,7 +1,7 @@
pub mod store;
mod binary_coding; mod binary_coding;
pub mod cursor;
pub mod cursor_capabilities;
pub mod error; pub mod error;
mod index; mod index;
pub mod cursor;
pub mod segments; pub mod segments;
pub mod cursor_capabilities; pub mod store;

View file

@ -1,9 +1,9 @@
use bincode::{Decode, Encode}; use bincode::{Decode, Encode};
use crate::binary_coding::{encode_sequence, encode_sequence_with_sizes, decode_sequence}; use crate::binary_coding::{decode_sequence, encode_sequence, encode_sequence_with_sizes};
use crate::store::{Result, FilePosition}; use crate::error::{DecodeErrorKind, Error};
use crate::error::{Error, DecodeErrorKind};
use crate::segments::entry_header::{EntryHeader, EntryHeaderWithDataSize}; use crate::segments::entry_header::{EntryHeader, EntryHeaderWithDataSize};
use crate::store::{FilePosition, Result};
#[derive(Debug)] #[derive(Debug)]
pub struct Entry<T> { pub struct Entry<T> {
@ -18,14 +18,18 @@ pub struct EntryDetailed<T> {
pub data: Vec<T>, pub data: Vec<T>,
} }
impl <T>Entry<T> { impl<T> Entry<T> {
pub fn new(data: Vec<T>) -> Self { pub fn new(data: Vec<T>) -> Self {
Self { header: EntryHeader { is_deleted: false }, data } Self {
header: EntryHeader { is_deleted: false },
data,
}
} }
// FORMAT: [EntryHeaderWithDataSize, ..sequence of data] // FORMAT: [EntryHeaderWithDataSize, ..sequence of data]
pub fn encode(&self) -> Result<Vec<u8>> pub fn encode(&self) -> Result<Vec<u8>>
where T: Encode where
T: Encode,
{ {
let mut result: Vec<u8> = self.header.encode()?; let mut result: Vec<u8> = self.header.encode()?;
@ -36,17 +40,28 @@ impl <T>Entry<T> {
} }
} }
impl <T>EntryDetailed<T> { impl<T> EntryDetailed<T> {
pub fn decode(header: EntryHeaderWithDataSize, file_position: FilePosition, number_of_columns: usize, bytes: &[u8]) -> Result<Self> pub fn decode(
where T: Decode header: EntryHeaderWithDataSize,
file_position: FilePosition,
number_of_columns: usize,
bytes: &[u8],
) -> Result<Self>
where
T: Decode,
{ {
let data = decode_sequence::<T>(number_of_columns, bytes) let data = decode_sequence::<T>(number_of_columns, bytes)
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?; .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?;
Ok(EntryDetailed { header, file_position, data }) Ok(EntryDetailed {
header,
file_position,
data,
})
} }
pub fn forget(&self) -> Entry<T> pub fn forget(&self) -> Entry<T>
where T: Clone where
T: Clone,
{ {
Entry { Entry {
header: self.header.clone().into(), header: self.header.clone().into(),

View file

@ -1,6 +1,6 @@
use crate::binary_coding::{decode, encode, decode_sequence}; use crate::binary_coding::{decode, decode_sequence, encode};
use crate::store::{Result, Column}; use crate::error::{DecodeErrorKind, Error};
use crate::error::{Error, DecodeErrorKind}; use crate::store::{Column, Result};
use std::mem::size_of; use std::mem::size_of;
#[derive(Debug)] #[derive(Debug)]
@ -24,7 +24,9 @@ impl EntryHeader {
impl From<EntryHeaderWithDataSize> for EntryHeader { impl From<EntryHeaderWithDataSize> for EntryHeader {
fn from(entry: EntryHeaderWithDataSize) -> Self { fn from(entry: EntryHeaderWithDataSize) -> Self {
Self { is_deleted: entry.is_deleted, } Self {
is_deleted: entry.is_deleted,
}
} }
} }
@ -34,11 +36,11 @@ impl EntryHeaderWithDataSize {
pub const DATA_SIZES_OFFSET: usize = Self::IS_DELETED_OFFSET + Self::IS_DELETED_SIZE; pub const DATA_SIZES_OFFSET: usize = Self::IS_DELETED_OFFSET + Self::IS_DELETED_SIZE;
pub fn size(number_of_columns: usize) -> usize { pub fn size(number_of_columns: usize) -> usize {
let size_of_data_sizes: usize = number_of_columns*size_of::<usize>(); let size_of_data_sizes: usize = number_of_columns * size_of::<usize>();
Self::IS_DELETED_SIZE + size_of_data_sizes Self::IS_DELETED_SIZE + size_of_data_sizes
} }
pub fn size_of_data(&self) -> usize{ pub fn size_of_data(&self) -> usize {
self.data_sizes.iter().sum() self.data_sizes.iter().sum()
} }
@ -48,21 +50,23 @@ impl EntryHeaderWithDataSize {
if i < column as usize { if i < column as usize {
sum += size; sum += size;
} else { } else {
break break;
} }
} }
sum sum
} }
pub fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result<Self> { pub fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result<Self> {
let (is_deleted, _) = let (is_deleted, _) = decode::<bool>(bytes)
decode::<bool>(&bytes)
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?; .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
let data_sizes = decode_sequence::<usize>(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..]) let data_sizes =
decode_sequence::<usize>(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..])
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?; .map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?;
Ok(Self { is_deleted, data_sizes } ) Ok(Self {
is_deleted,
data_sizes,
})
} }
} }

View file

@ -1,6 +1,6 @@
use crate::binary_coding::{encode, encode_sequence, decode, decode_sequence}; use crate::binary_coding::{decode, decode_sequence, encode, encode_sequence};
use crate::store::{Result, Column}; use crate::error::{DecodeErrorKind, Error};
use crate::error::{Error, DecodeErrorKind}; use crate::store::{Column, Result};
use std::mem::size_of; use std::mem::size_of;
use std::path::PathBuf; use std::path::PathBuf;
@ -30,14 +30,19 @@ impl StoreHeader {
pub const DELETED_COUNT_SIZE: usize = size_of::<usize>(); pub const DELETED_COUNT_SIZE: usize = size_of::<usize>();
pub const TOTAL_COUNT_SIZE: usize = size_of::<usize>(); pub const TOTAL_COUNT_SIZE: usize = size_of::<usize>();
pub const PRIMARY_COLUMN_SIZE: usize = size_of::<Column>(); pub const PRIMARY_COLUMN_SIZE: usize = size_of::<Column>();
pub const FIXED_SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE + Self::PRIMARY_COLUMN_SIZE; pub const FIXED_SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE
+ Self::DELETED_COUNT_SIZE
+ Self::TOTAL_COUNT_SIZE
+ Self::PRIMARY_COLUMN_SIZE;
pub const NUMBER_OF_COLUMNS_OFFSET: usize = 0; pub const NUMBER_OF_COLUMNS_OFFSET: usize = 0;
pub const DELETED_COUNT_OFFSET: usize = Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE; pub const DELETED_COUNT_OFFSET: usize =
Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE;
pub const TOTAL_COUNT_OFFSET: usize = Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE; pub const TOTAL_COUNT_OFFSET: usize = Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE;
pub const PRIMARY_COLUMN_OFFSET: usize = Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE; pub const PRIMARY_COLUMN_OFFSET: usize = Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE;
#[allow(dead_code)] #[allow(dead_code)]
pub const INDEXED_COLUMNS_OFFSET: usize = Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE; pub const INDEXED_COLUMNS_OFFSET: usize =
Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE;
fn indexed_columns_size(number_of_columns: usize) -> usize { fn indexed_columns_size(number_of_columns: usize) -> usize {
size_of::<bool>() * number_of_columns size_of::<bool>() * number_of_columns
@ -64,18 +69,28 @@ impl StoreHeader {
vec![0; Self::indexed_columns_size(header.number_of_columns)] vec![0; Self::indexed_columns_size(header.number_of_columns)]
} }
pub async fn decode_fixed(table_folder: &PathBuf, result: &[u8]) -> Result<StoreHeaderFixedPart> { pub async fn decode_fixed(
let (number_of_columns, _) = table_folder: &PathBuf,
decode::<usize>(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE]) result: &[u8],
) -> Result<StoreHeaderFixedPart> {
let (number_of_columns, _) = decode::<usize>(
&result[Self::NUMBER_OF_COLUMNS_OFFSET
..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE],
)
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?; .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?;
let (deleted_count, _) = let (deleted_count, _) = decode::<usize>(
decode::<usize>(&result[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE]) &result
[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE],
)
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?; .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?;
let (total_count, _) = let (total_count, _) = decode::<usize>(
decode::<usize>(&result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE]) &result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE],
)
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?; .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?;
let (primary_column, _) = let (primary_column, _) = decode::<Column>(
decode::<Column>(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE]) &result[Self::PRIMARY_COLUMN_OFFSET
..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE],
)
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?; .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?;
let header = StoreHeaderFixedPart { let header = StoreHeaderFixedPart {
table_folder: table_folder.clone(), table_folder: table_folder.clone(),
@ -89,12 +104,11 @@ impl StoreHeader {
} }
pub async fn decode_rest(header: StoreHeaderFixedPart, result: &[u8]) -> Result<StoreHeader> { pub async fn decode_rest(header: StoreHeaderFixedPart, result: &[u8]) -> Result<StoreHeader> {
let indexed_columns: Vec<bool> = let indexed_columns: Vec<bool> = decode_sequence::<bool>(header.number_of_columns, result)
decode_sequence::<bool>(header.number_of_columns, result)
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderIndexedColumns, e))?; .map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderIndexedColumns, e))?;
Ok(StoreHeader { Ok(StoreHeader {
table_folder: header.table_folder.into(), table_folder: header.table_folder,
number_of_columns: header.number_of_columns, number_of_columns: header.number_of_columns,
deleted_count: header.deleted_count, deleted_count: header.deleted_count,
total_count: header.total_count, total_count: header.total_count,
@ -104,8 +118,6 @@ impl StoreHeader {
}) })
} }
// returns new count // returns new count
pub fn increment_total_count(&mut self) -> usize { pub fn increment_total_count(&mut self) -> usize {
self.total_count += 1; self.total_count += 1;

View file

@ -1,22 +1,20 @@
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::fs::{File, OpenOptions, DirBuilder};
use tokio::fs;
use std::path::{Path, PathBuf};
use bincode::{Decode, Encode}; use bincode::{Decode, Encode};
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::fs::{DirBuilder, File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::error::Error;
use crate::cursor::{ReadCursor, WriteCursor}; use crate::cursor::{ReadCursor, WriteCursor};
use crate::cursor_capabilities::traversal::CursorCanTraverse; use crate::cursor_capabilities::traversal::CursorCanTraverse;
use crate::segments::store_header::StoreHeader; use crate::error::Error;
use crate::index::Index; use crate::index::Index;
use crate::segments::store_header::StoreHeader;
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
pub type Column = u64; pub type Column = u64;
pub type FilePosition = u64; pub type FilePosition = u64;
// TODO: Consider adding another type parameter for indexable values
#[derive(Debug)] #[derive(Debug)]
pub struct Store<T> { pub struct Store<T> {
pub header: StoreHeader, pub header: StoreHeader,
@ -25,23 +23,26 @@ pub struct Store<T> {
pub type StoreIndexes<T> = Vec<Option<Index<T, FilePosition>>>; pub type StoreIndexes<T> = Vec<Option<Index<T, FilePosition>>>;
//===Store=== //===Store===
pub async fn store_exists(table_folder: &str) -> Result<bool> { pub async fn store_exists(table_folder: &str) -> Result<bool> {
Ok(fs::metadata(table_folder).await.is_ok()) Ok(fs::metadata(table_folder).await.is_ok())
} }
pub const ROWS_FILE_NAME: &'static str = "rows"; pub const ROWS_FILE_NAME: &str = "rows";
pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_intermediate"; pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &str = "rows_intermediate";
impl <T>Store<T> { impl<T> Store<T> {
// ===Creation=== // ===Creation===
pub async fn new(path_to_table: &Path, number_of_columns: usize, primary_column: Column) -> Result<Self> pub async fn new(
where T: Encode + Decode + Ord path_to_table: &Path,
number_of_columns: usize,
primary_column: Column,
) -> Result<Self>
where
T: Encode + Decode + Ord,
{ {
let path_to_rows = path_to_table.join(ROWS_FILE_NAME); let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
DirBuilder::new() DirBuilder::new().create(path_to_table).await?;
.create(path_to_table).await?;
let header = { let header = {
let mut indexed_columns = vec![false; number_of_columns]; let mut indexed_columns = vec![false; number_of_columns];
@ -61,53 +62,63 @@ impl <T>Store<T> {
let indexes: StoreIndexes<T> = Self::create_initial_indexes(&header).await?; let indexes: StoreIndexes<T> = Self::create_initial_indexes(&header).await?;
let store = Self { let store = Self { header, indexes };
header,
indexes,
};
Ok(store) Ok(store)
} }
pub fn path_to_index_file(header: &StoreHeader, column: Column) -> PathBuf { pub fn path_to_index_file(header: &StoreHeader, column: Column) -> PathBuf {
let path_to_table = Path::new(&header.table_folder); let path_to_table = Path::new(&header.table_folder);
let path_to_index = path_to_table.join(&format!("{}_{}", ROWS_FILE_NAME, column.to_string())); let path_to_index =
path_to_table.join(&format!("{}_{}", ROWS_FILE_NAME, column));
path_to_index path_to_index
} }
pub async fn create_empty_index_at(header: &StoreHeader, column: Column) -> Result<Index<T, FilePosition>> pub async fn create_empty_index_at(
where T: Encode + Decode + Ord header: &StoreHeader,
column: Column,
) -> Result<Index<T, FilePosition>>
where
T: Encode + Decode + Ord,
{ {
let path_to_index = Self::path_to_index_file(&header, column); let path_to_index = Self::path_to_index_file(header, column);
let index = Index::new(path_to_index).await?; let index = Index::new(path_to_index).await?;
Ok(index) Ok(index)
} }
pub async fn create_initial_indexes(header: &StoreHeader) -> Result<StoreIndexes<T>> pub async fn create_initial_indexes(header: &StoreHeader) -> Result<StoreIndexes<T>>
where T: Encode + Decode + Ord where
T: Encode + Decode + Ord,
{ {
let mut result: StoreIndexes<T> = Vec::with_capacity(header.number_of_columns); let mut result: StoreIndexes<T> = Vec::with_capacity(header.number_of_columns);
for _ in 0..header.number_of_columns { for _ in 0..header.number_of_columns {
result.push(None) result.push(None)
} }
result[header.primary_column as usize] = Some(Self::create_empty_index_at(&header, header.primary_column).await?); result[header.primary_column as usize] =
Some(Self::create_empty_index_at(header, header.primary_column).await?);
Ok(result) Ok(result)
} }
pub async fn connect_index_at(header: &StoreHeader, column: Column) -> Result<Index<T, FilePosition>> pub async fn connect_index_at(
where T: Encode + Decode + Ord header: &StoreHeader,
column: Column,
) -> Result<Index<T, FilePosition>>
where
T: Encode + Decode + Ord,
{ {
let path_to_index = Self::path_to_index_file(&header, column); let path_to_index = Self::path_to_index_file(header, column);
let index: Index<T, FilePosition> = Index::connect(path_to_index).await?; let index: Index<T, FilePosition> = Index::connect(path_to_index).await?;
Ok(index) Ok(index)
} }
pub async fn create_empty_rows_file(path_to_rows: PathBuf, header: &StoreHeader) -> Result<File> { pub async fn create_empty_rows_file(
let mut file: File = path_to_rows: PathBuf,
OpenOptions::new() header: &StoreHeader,
) -> Result<File> {
let mut file: File = OpenOptions::new()
.write(true) .write(true)
.read(true) .read(true)
.create_new(true) .create_new(true)
@ -121,13 +132,13 @@ impl <T>Store<T> {
} }
pub async fn connect(table_folder: &PathBuf) -> Result<Self> pub async fn connect(table_folder: &PathBuf) -> Result<Self>
where T: std::fmt::Debug + Encode + Decode + Ord where
T: std::fmt::Debug + Encode + Decode + Ord,
{ {
let path_to_table = Path::new(table_folder); let path_to_table = Path::new(table_folder);
let path_to_rows = path_to_table.join(ROWS_FILE_NAME); let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
let mut file: File = let mut file: File = OpenOptions::new()
OpenOptions::new()
.read(true) .read(true)
.write(true) .write(true)
.open(path_to_rows) .open(path_to_rows)
@ -146,12 +157,13 @@ impl <T>Store<T> {
StoreHeader::decode_rest(fixed_header, &rest_bytes).await? StoreHeader::decode_rest(fixed_header, &rest_bytes).await?
}; };
let indexes: StoreIndexes<T> = { let indexes: StoreIndexes<T> = {
let mut result = Vec::with_capacity(header.number_of_columns); let mut result = Vec::with_capacity(header.number_of_columns);
for (column, &is_indexed) in header.indexed_columns.iter().enumerate() { for (column, &is_indexed) in header.indexed_columns.iter().enumerate() {
if is_indexed { if is_indexed {
result.push(Some(Self::connect_index_at(&header, column as Column).await?)) result.push(Some(
Self::connect_index_at(&header, column as Column).await?,
))
} else { } else {
result.push(None) result.push(None)
} }
@ -160,29 +172,29 @@ impl <T>Store<T> {
result result
}; };
let store = Self { let store = Self { header, indexes };
header,
indexes
};
Ok(store) Ok(store)
} }
// ===Cursors=== // ===Cursors===
pub async fn read_cursor(&self) -> Result<ReadCursor<T>> pub async fn read_cursor(&self) -> Result<ReadCursor<T>>
where T: Send + Sync where
T: Send + Sync,
{ {
ReadCursor::new(self).await ReadCursor::new(self).await
} }
pub async fn write_cursor(&mut self) -> Result<WriteCursor<T>> pub async fn write_cursor(&mut self) -> Result<WriteCursor<T>>
where T: Send + Sync where
T: Send + Sync,
{ {
WriteCursor::new(self).await WriteCursor::new(self).await
} }
// ===Indexes=== // ===Indexes===
pub async fn attach_index(&mut self, column: Column) -> Result<()> pub async fn attach_index(&mut self, column: Column) -> Result<()>
where T: Ord + Decode + Encode + Send + Sync where
T: Ord + Decode + Encode + Send + Sync,
{ {
if self.header.is_column_indexed(column) { if self.header.is_column_indexed(column) {
Err(Error::ColumnAlreadyIndexed(column)) Err(Error::ColumnAlreadyIndexed(column))
@ -195,7 +207,8 @@ impl <T>Store<T> {
// For debugging. // For debugging.
#[allow(dead_code)] #[allow(dead_code)]
pub async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error> pub async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error>
where T: Send + Sync where
T: Send + Sync,
{ {
let mut cursor = self.read_cursor().await.map_err(|e| e.to_io_or_panic())?; let mut cursor = self.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
let bytes = cursor.read_all_bytes().await?; let bytes = cursor.read_all_bytes().await?;
@ -206,11 +219,11 @@ impl <T>Store<T> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::segments::entry::Entry; use crate::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex};
use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex};
use crate::cursor_capabilities::traversal::CursorCanTraverse; use crate::cursor_capabilities::traversal::CursorCanTraverse;
use crate::segments::entry::Entry;
impl <T>Drop for Store<T> { impl<T> Drop for Store<T> {
fn drop(&mut self) { fn drop(&mut self) {
println!("DROPPING TEST FOLDER"); println!("DROPPING TEST FOLDER");
let table_folder = self.header.table_folder.clone(); let table_folder = self.header.table_folder.clone();
@ -219,7 +232,6 @@ mod tests {
} }
} }
#[tokio::test] #[tokio::test]
async fn test_create() { async fn test_create() {
type Data = u32; type Data = u32;
@ -227,7 +239,9 @@ mod tests {
let table_path = Path::new("test_table_0"); let table_path = Path::new("test_table_0");
let number_of_columns = 5; let number_of_columns = 5;
let primary_column = 0; let primary_column = 0;
let store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); let store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
.await
.unwrap();
assert!(store.header.number_of_columns == number_of_columns); assert!(store.header.number_of_columns == number_of_columns);
assert!(store.header.total_count == 0); assert!(store.header.total_count == 0);
@ -242,7 +256,9 @@ mod tests {
let table_path = Path::new("test_table_1"); let table_path = Path::new("test_table_1");
let number_of_columns = 5; let number_of_columns = 5;
let primary_column = 0; let primary_column = 0;
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
.await
.unwrap();
{ {
let mut cursor = store.write_cursor().await.unwrap(); let mut cursor = store.write_cursor().await.unwrap();
@ -264,7 +280,9 @@ mod tests {
let table_path = Path::new("test_table_2"); let table_path = Path::new("test_table_2");
let number_of_columns = 5; let number_of_columns = 5;
let primary_column = 0; let primary_column = 0;
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
.await
.unwrap();
{ {
let mut cursor = store.write_cursor().await.unwrap(); let mut cursor = store.write_cursor().await.unwrap();
@ -284,8 +302,8 @@ mod tests {
let entry0 = cursor.next().await.unwrap().unwrap(); let entry0 = cursor.next().await.unwrap().unwrap();
let entry1 = cursor.next().await.unwrap().unwrap(); let entry1 = cursor.next().await.unwrap().unwrap();
assert!(entry0.data == vec![1,2,3,4,5]); assert!(entry0.data == vec![1, 2, 3, 4, 5]);
assert!(entry1.data == vec![6,7,8,9,10]); assert!(entry1.data == vec![6, 7, 8, 9, 10]);
} }
} }
@ -296,7 +314,9 @@ mod tests {
let table_path = Path::new("test_table_3"); let table_path = Path::new("test_table_3");
let number_of_columns = 5; let number_of_columns = 5;
let primary_column = 0; let primary_column = 0;
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
.await
.unwrap();
{ {
let mut cursor = store.write_cursor().await.unwrap(); let mut cursor = store.write_cursor().await.unwrap();
@ -319,8 +339,8 @@ mod tests {
} }
assert!(entries.len() == 2); assert!(entries.len() == 2);
assert!(entries[0].data == vec![1,2,3,4,5]); assert!(entries[0].data == vec![1, 2, 3, 4, 5]);
assert!(entries[1].data == vec![6,7,8,9,10]); assert!(entries[1].data == vec![6, 7, 8, 9, 10]);
} }
} }
@ -331,7 +351,9 @@ mod tests {
let table_path = Path::new("test_table_4"); let table_path = Path::new("test_table_4");
let number_of_columns = 5; let number_of_columns = 5;
let primary_column = 0; let primary_column = 0;
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
.await
.unwrap();
let value = 200; let value = 200;
{ {
@ -356,7 +378,10 @@ mod tests {
let mut cursor = store.read_cursor().await.unwrap(); let mut cursor = store.read_cursor().await.unwrap();
let column = 1; let column = 1;
let entries = cursor.select_entries_where_eq(column, &value).await.unwrap(); let entries = cursor
.select_entries_where_eq(column, &value)
.await
.unwrap();
assert!(entries.len() == 2); assert!(entries.len() == 2);
assert!(entries[0].data == vec![1, value, 3, 4, 5]); assert!(entries[0].data == vec![1, value, 3, 4, 5]);
@ -371,7 +396,9 @@ mod tests {
let table_path = Path::new("test_table_5"); let table_path = Path::new("test_table_5");
let number_of_columns = 5; let number_of_columns = 5;
let primary_column = 0; let primary_column = 0;
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
.await
.unwrap();
let column: Column = 1; let column: Column = 1;
@ -402,7 +429,10 @@ mod tests {
let mut cursor = store.read_cursor().await.unwrap(); let mut cursor = store.read_cursor().await.unwrap();
let column = 1; let column = 1;
let entries = cursor.select_entries_where_eq(column, &value).await.unwrap(); let entries = cursor
.select_entries_where_eq(column, &value)
.await
.unwrap();
assert!(entries.len() == 2); assert!(entries.len() == 2);
// Order may be non-deterministic. // Order may be non-deterministic.
assert!(entries[0].data[column as usize] == value); assert!(entries[0].data[column as usize] == value);
@ -417,7 +447,9 @@ mod tests {
let table_path = Path::new("test_table_6"); let table_path = Path::new("test_table_6");
let number_of_columns = 5; let number_of_columns = 5;
let primary_column = 0; let primary_column = 0;
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
.await
.unwrap();
let value = 200; let value = 200;
let (_file_position0, file_position1, _file_position2, _file_position3) = { let (_file_position0, file_position1, _file_position2, _file_position3) = {
@ -436,7 +468,12 @@ mod tests {
let file_position3 = cursor.insert_entry(entry3).await.unwrap(); let file_position3 = cursor.insert_entry(entry3).await.unwrap();
assert!(store.header.total_count == 4); assert!(store.header.total_count == 4);
(file_position0, file_position1, file_position2, file_position3) (
file_position0,
file_position1,
file_position2,
file_position3,
)
}; };
{ {
@ -454,7 +491,9 @@ mod tests {
let table_path = Path::new("test_table_7"); let table_path = Path::new("test_table_7");
let number_of_columns = 5; let number_of_columns = 5;
let primary_column = 0; let primary_column = 0;
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
.await
.unwrap();
let column: Column = 1; let column: Column = 1;
@ -480,13 +519,21 @@ mod tests {
let file_position3 = cursor.insert_entry(entry3).await.unwrap(); let file_position3 = cursor.insert_entry(entry3).await.unwrap();
assert!(store.header.total_count == 4); assert!(store.header.total_count == 4);
(file_position0, file_position1, file_position2, file_position3) (
file_position0,
file_position1,
file_position2,
file_position3,
)
}; };
{ {
assert!(store.header.deleted_count == 0); assert!(store.header.deleted_count == 0);
let mut cursor = store.write_cursor().await.unwrap(); let mut cursor = store.write_cursor().await.unwrap();
cursor.delete_entries_where_eq(column, &value, false).await.unwrap(); cursor
.delete_entries_where_eq(column, &value, false)
.await
.unwrap();
assert!(store.header.deleted_count == 2); assert!(store.header.deleted_count == 2);
} }
} }
@ -498,7 +545,9 @@ mod tests {
let table_path = Path::new("test_table_8"); let table_path = Path::new("test_table_8");
let number_of_columns = 5; let number_of_columns = 5;
let primary_column = 0; let primary_column = 0;
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap(); let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
.await
.unwrap();
let column: Column = 1; let column: Column = 1;
@ -524,13 +573,21 @@ mod tests {
let file_position3 = cursor.insert_entry(entry3).await.unwrap(); let file_position3 = cursor.insert_entry(entry3).await.unwrap();
assert!(store.header.total_count == 4); assert!(store.header.total_count == 4);
(file_position0, file_position1, file_position2, file_position3) (
file_position0,
file_position1,
file_position2,
file_position3,
)
}; };
{ {
assert!(store.header.deleted_count == 0); assert!(store.header.deleted_count == 0);
let mut cursor = store.write_cursor().await.unwrap(); let mut cursor = store.write_cursor().await.unwrap();
cursor.delete_entries_where_eq(column, &value, false).await.unwrap(); cursor
.delete_entries_where_eq(column, &value, false)
.await
.unwrap();
assert!(cursor.header().deleted_count == 2); assert!(cursor.header().deleted_count == 2);
assert!(cursor.header().total_count == 4); assert!(cursor.header().total_count == 4);
@ -539,5 +596,4 @@ mod tests {
assert!(cursor.header().total_count == 2); assert!(cursor.header().total_count == 2);
} }
} }
} }