Compare commits
No commits in common. "72013ac9d3423f551e852584169141f3b627e095" and "09fa9829130ba7a24a19a4e758017125de847149" have entirely different histories.
72013ac9d3
...
09fa982913
41 changed files with 665 additions and 11069 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
|
@ -1,3 +1,3 @@
|
|||
.idea
|
||||
target
|
||||
/target
|
||||
tmp_repl.txt
|
||||
|
|
|
|||
16
.gitlab-ci.yml
Normal file
16
.gitlab-ci.yml
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
stages:
|
||||
- primary
|
||||
|
||||
build-and-test:
|
||||
stage: primary
|
||||
image: rust:1.74.0
|
||||
tags:
|
||||
- shared-fi
|
||||
rules:
|
||||
- if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
|
||||
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
|
||||
script:
|
||||
# build and test in single job for faster execution
|
||||
# because shared runner doesn't allow caching
|
||||
- cargo build --verbose
|
||||
- cargo test --verbose
|
||||
26
README.md
26
README.md
|
|
@ -15,29 +15,28 @@
|
|||
|
||||
## Running the server
|
||||
```bash
|
||||
cargo run -p server -- [OPTIONS] --folder <FOLDER>
|
||||
cargo run -p server -- [OPTIONS] --file <FILE>
|
||||
```
|
||||
```
|
||||
Options:
|
||||
-a, --address <ADDRESS> IP address for the server to listen on [default: 127.0.0.1]
|
||||
-p, --port <PORT> Port for the server to listen on [default: 5432]
|
||||
-f, --folder <FOLDER> Path to the folder for database data
|
||||
-t, --throttle <DELAY> Delay between rows in milliseconds
|
||||
-f, --file <FILE> Path to the data file
|
||||
-h, --help Print help
|
||||
```
|
||||
|
||||
This will start the server listening on `<ADDRESS>:<PORT>` and load the state from `<FOLDER>`.
|
||||
If the `<FOLDER>` does not exist, it will be created.
|
||||
This will start the server listening on `<ADDRESS>:<PORT>` and load the state from `<FILE>`.
|
||||
If the `<FILE>` does not exist, it will be created.
|
||||
|
||||
### Demo Database
|
||||
Commands that set up a database with demo data are available in `demo-1.sql`, `demo-2.sql`, and `demo-3.sql`.
|
||||
A database with demo data is available in `demo.json`. To run the server with this database, use:
|
||||
```bash
|
||||
cargo run -p server -- --file demo.json
|
||||
```
|
||||
|
||||
These files showcase the following things:
|
||||
- `demo-1.sql`: small tables that handle Unicode and make use of the Optional type
|
||||
- `demo-2.sql`: a bigger (1000 rows) table with realistic data
|
||||
- `demo-3.sql`: a big (10,000 rows) table with simple data (was easier to generate)
|
||||
|
||||
See comments in these files for details.
|
||||
This database contains two tables:
|
||||
- Table `users` with columns `id`, `name`, `surname`, `email`
|
||||
- Table `cars` with columns `id`, `vid`, `brand`, `model`, `year`
|
||||
|
||||
## Running the client
|
||||
```bash
|
||||
|
|
@ -57,7 +56,7 @@ SQL queries can be entered line by line. The client will print the result of eac
|
|||
To exit the REPL client, enter `exit` or `quit`.
|
||||
|
||||
# Features
|
||||
- SQL must be on single line, in **UPPERCASE**, as it should be. The `;` at EOL is optional.
|
||||
- SQL must be on single line, in **UPPERCASE** and end with `;`, as it should be
|
||||
- Supported operations: `CREATE TABLE`, `CREATE INDEX`, `SELECT`, `INSERT`, `DELETE`
|
||||
- Supported data types: `UUID`, `STRING`, `INT`, `NUMBER`
|
||||
- Supported subset of PostgreSQL protocol, without authentication and simple query flow
|
||||
|
|
@ -130,7 +129,6 @@ DELETE FROM users WHERE name = "Christina";
|
|||
- `STRING` - string enclosed in double quotes, e.g. `"Hello World"`
|
||||
- `INT` - integer, e.g. `12345`
|
||||
- `NUMBER` - floating point number, e.g. `123.45`
|
||||
- `Option(Something)` - optional type, e.g. `Some("james.stuart@gmail.com")` or `None`
|
||||
|
||||
## Testing with `psql`
|
||||
Thanks to the subset of PostgreSQL protocol implemented in `proto`, the server can be tested with `psql`,
|
||||
|
|
|
|||
17
demo-1.sql
17
demo-1.sql
|
|
@ -1,13 +1,10 @@
|
|||
-- Create the users table
|
||||
|
||||
CREATE TABLE users (id UUID PRIMARY KEY, name STRING, surname STRING);
|
||||
|
||||
-- Create the cars table with vid as an optional type
|
||||
|
||||
CREATE TABLE cars (id UUID PRIMARY KEY, vid Option(STRING), brand STRING, model STRING);
|
||||
|
||||
-- Insert entries into users
|
||||
|
||||
INSERT INTO users (id, name, surname) VALUES (u1001, "Jiří", "Novák");
|
||||
INSERT INTO users (id, name, surname) VALUES (u1002, "Petr", "Svoboda");
|
||||
INSERT INTO users (id, name, surname) VALUES (u1003, "Marek", "Dvořák");
|
||||
|
|
@ -20,7 +17,6 @@ INSERT INTO users (id, name, surname) VALUES (u1009, "ゆうた", "わたなべ"
|
|||
INSERT INTO users (id, name, surname) VALUES (u1010, "あやか", "おかもと");
|
||||
|
||||
-- Insert entries into cars
|
||||
|
||||
INSERT INTO cars (id, vid, brand, model) VALUES (u2001, None, "Toyota", "Corolla");
|
||||
INSERT INTO cars (id, vid, brand, model) VALUES (u2002, None, "Ford", "Fiesta");
|
||||
INSERT INTO cars (id, vid, brand, model) VALUES (u2003, Some("WBAKB0C52AC111480"), "Volkswagen", "Golf");
|
||||
|
|
@ -33,29 +29,16 @@ INSERT INTO cars (id, vid, brand, model) VALUES (u2009, None, "Kia", "Optima");
|
|||
INSERT INTO cars (id, vid, brand, model) VALUES (u2010, Some("1G4GG5E35DF715445"), "Audi", "A4");
|
||||
|
||||
-- SELECT and DELETE commands for users and cars
|
||||
|
||||
SELECT * FROM users;
|
||||
|
||||
SELECT name FROM users;
|
||||
|
||||
SELECT name, surname FROM users;
|
||||
|
||||
SELECT * FROM users WHERE name = "さくら";
|
||||
|
||||
DELETE FROM users WHERE surname = "Novák";
|
||||
|
||||
SELECT * FROM users;
|
||||
|
||||
|
||||
|
||||
SELECT * FROM cars;
|
||||
|
||||
SELECT brand FROM cars;
|
||||
|
||||
SELECT brand, model FROM cars;
|
||||
|
||||
SELECT * FROM cars WHERE brand = "Ford";
|
||||
|
||||
DELETE FROM cars WHERE brand = "Ford";
|
||||
|
||||
SELECT * FROM cars;
|
||||
|
|
|
|||
19
demo-2.sql
19
demo-2.sql
|
|
@ -1,13 +1,9 @@
|
|||
-- Create the people table
|
||||
-- (not using the "users" name so that you can use this file in the same
|
||||
-- DB instance as demo-3.sql)
|
||||
|
||||
CREATE TABLE people (id UUID PRIMARY KEY, first_name STRING, surname STRING);
|
||||
|
||||
-- Insert entries into people
|
||||
|
||||
SELECT * FROM people;
|
||||
|
||||
INSERT INTO people (id, first_name, surname) VALUES (u1, "Ranice", "Hardman");
|
||||
INSERT INTO people (id, first_name, surname) VALUES (u2, "Amara", "Fieldsend");
|
||||
INSERT INTO people (id, first_name, surname) VALUES (u3, "Stillmann", "Metzing");
|
||||
|
|
@ -1009,22 +1005,7 @@ INSERT INTO people (id, first_name, surname) VALUES (u998, "James", "Dinning");
|
|||
INSERT INTO people (id, first_name, surname) VALUES (u999, "Alvina", "Varcoe");
|
||||
INSERT INTO people (id, first_name, surname) VALUES (u1000, "James", "Postance");
|
||||
|
||||
|
||||
INSERT INTO people (id, first_name, surname) VALUES (u1001, "James", "Postance");
|
||||
|
||||
-- SELECT commands for people
|
||||
|
||||
SELECT * FROM people;
|
||||
|
||||
INSERT INTO people (id, first_name, surname) VALUES (u1001, "Foo", "Bar");
|
||||
|
||||
SELECT * FROM people WHERE first_name = "James";
|
||||
|
||||
CREATE INDEX PeopleName ON people (first_name);
|
||||
|
||||
SELECT * FROM people WHERE first_name = "James";
|
||||
|
||||
|
||||
DELETE FROM people;
|
||||
|
||||
|
||||
|
|
|
|||
10023
demo-3.sql
10023
demo-3.sql
File diff suppressed because it is too large
Load diff
103
demo.txt
103
demo.txt
|
|
@ -1,103 +0,0 @@
|
|||
|
||||
:kill-pane
|
||||
|
||||
cargo build
|
||||
|
||||
cargo test
|
||||
|
||||
|
||||
rm -rf db_demo
|
||||
|
||||
cargo run --bin server --
|
||||
|
||||
cargo run --bin server -- --folder db_demo
|
||||
|
||||
rlwrap cargo run --bin client --
|
||||
|
||||
|
||||
======================Games Table========================================
|
||||
|
||||
SELECT * FROM games;
|
||||
|
||||
|
||||
REATE TABLE games (id UUID PRIMARY KEY, name STRING, year INT, price NUMBER);
|
||||
|
||||
INSERT INTO games (id, name, year, price) VALUES (u1, "skyrim", 2011, 1024.5);
|
||||
|
||||
INSERT INTO games (id, name, year, price) VALUES (u2, "DOOM 2", 1994, 350.0);
|
||||
|
||||
DELETE FROM games;
|
||||
|
||||
WHERE name = "Christina";
|
||||
|
||||
SELECT * FROM games;
|
||||
|
||||
SELECT * FROM games WHERE id=u2;
|
||||
|
||||
SELECT * FROM games WHERE name="skyrim";
|
||||
|
||||
SELECT name, year, year, year, name FROM games;
|
||||
|
||||
|
||||
SELECT * FROM games WHERE id=u3;
|
||||
|
||||
===================Users Table======================
|
||||
// Optionals
|
||||
|
||||
|
||||
CREATE TABLE users (id UUID PRIMARY KEY, name STRING, surname STRING, email Option(STRING));
|
||||
|
||||
|
||||
INSERT INTO users (id, name, surname, email) VALUES (u1, "Hero", "Protagonist 😊", Some("snow_crash@gmail.com"));
|
||||
|
||||
|
||||
|
||||
INSERT INTO users (id, name, surname, email) VALUES (u26, "Arnold", "schwarzenegger", Some("gettothechoppa@yahoo.com"));
|
||||
|
||||
INSERT INTO users (id, name, surname, email) VALUES (u27, "Arnold", "Vosloo", None);
|
||||
|
||||
|
||||
INSERT INTO users (id, name, surname, email) VALUES (u29, "New", "Guy", None);
|
||||
|
||||
|
||||
"hello" : String
|
||||
|
||||
Some("hello") : Option(String)
|
||||
|
||||
None : Option(String)
|
||||
|
||||
|
||||
SELECT * FROM users;
|
||||
|
||||
|
||||
SELECT * FROM users WHERE email=None;
|
||||
|
||||
SELECT * FROM users WHERE email=Some("gettothechoppa@yahoo.com");
|
||||
|
||||
|
||||
|
||||
DELETE FROM users WHERE id=u1;
|
||||
|
||||
DELETE FROM users WHERE id=u27;
|
||||
|
||||
|
||||
CREATE TABLE users1 (id UUID PRIMARY KEY, name STRING, surname STRING, email Option(Option(STRING)));
|
||||
|
||||
INSERT INTO users1 (id, name, surname, email) VALUES (u1, "Hero", "Protagonist 😊", None);
|
||||
|
||||
INSERT INTO users1 (id, name, surname, email) VALUES (u26, "Arnold", "schwarzenegger", Some(Some("gettothechoppa@yahoo.com")));
|
||||
|
||||
SELECT * FROM users1;
|
||||
|
||||
SELECT * FROM users1 WHERE email=None;
|
||||
|
||||
DELETE FROM users;
|
||||
|
||||
// ==============Indexes, Concurrency, Garbage Collection================
|
||||
|
||||
|
||||
|
||||
CREATE INDEX CarsYear ON cars (year);
|
||||
|
||||
|
||||
|
||||
|
|
@ -14,7 +14,7 @@ bincode = { workspace = true }
|
|||
serde = { workspace = true, features = ["derive", "rc"] }
|
||||
serde_json = "1.0.113"
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros", "sync"] }
|
||||
tokio = { workspace = true, features = ["macros"] }
|
||||
|
||||
proto = { path = "../proto" }
|
||||
storage_engine = { path = "../storage_engine" }
|
||||
|
|
|
|||
|
|
@ -1,27 +1,25 @@
|
|||
use crate::error::RuntimeError;
|
||||
use crate::internals::row::Row;
|
||||
use crate::operation::{ColumnSelection, Condition, Operation};
|
||||
use crate::response_writer::{CompleteStatus, ResponseWriter};
|
||||
use crate::result::DbResult;
|
||||
use crate::schema::{Column, TableName, TablePosition, TableSchema};
|
||||
use crate::type_system::Value;
|
||||
use crate::error::RuntimeError;
|
||||
use crate::response_writer::{ResponseWriter, CompleteStatus};
|
||||
use crate::internals::row::Row;
|
||||
|
||||
use bimap::BiMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::fs;
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
|
||||
use crate::cancellation::Cancellation;
|
||||
use storage_engine::cursor::{ReadCursor, WriteCursor};
|
||||
use storage_engine::cursor_capabilities::index_access::{
|
||||
CursorCanReadIndex, CursorCanWriteToIndex,
|
||||
};
|
||||
use storage_engine::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use storage_engine::segments::entry::Entry;
|
||||
use storage_engine::store::Store;
|
||||
use storage_engine::segments::entry::Entry;
|
||||
use storage_engine::cursor::{ReadCursor, WriteCursor};
|
||||
use storage_engine::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use storage_engine::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex};
|
||||
use crate::cancellation::Cancellation;
|
||||
|
||||
const METADATA_FILE: &'static str = "metadata.json";
|
||||
|
||||
|
|
@ -44,7 +42,7 @@ pub type Tables = Vec<RwLock<Table>>;
|
|||
#[derive(Debug)]
|
||||
pub struct Table {
|
||||
schema: Arc<TableSchema>,
|
||||
store: Store<Value>,
|
||||
store: Store<Value>
|
||||
}
|
||||
|
||||
pub type DbSchema = Vec<(TableName, TablePosition, Arc<TableSchema>)>;
|
||||
|
|
@ -62,10 +60,7 @@ impl Table {
|
|||
|
||||
let number_of_columns = table_schema.number_of_columns();
|
||||
let primary_column = table_schema.primary_column() as storage_engine::store::Column;
|
||||
let store: Store<Value> =
|
||||
Store::new(&path_to_table_folder, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
let store: Store<Value> = Store::new(&path_to_table_folder, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
let table = Self {
|
||||
schema: Arc::new(table_schema),
|
||||
|
|
@ -78,9 +73,7 @@ impl Table {
|
|||
let table_folder_name = table_schema.table_name();
|
||||
let path_to_table_folder = db_path.join(table_folder_name);
|
||||
|
||||
let store: Store<Value> = Store::connect(&path_to_table_folder).await.map_err(|e| {
|
||||
RuntimeError::StorageEngineError(table_schema.table_name().to_string(), e)
|
||||
})?;
|
||||
let store: Store<Value> = Store::connect(&path_to_table_folder).await.map_err(|e| RuntimeError::StorageEngineError(table_schema.table_name().to_string(), e))?;
|
||||
|
||||
let table = Self {
|
||||
schema: table_schema,
|
||||
|
|
@ -90,18 +83,12 @@ impl Table {
|
|||
}
|
||||
|
||||
async fn read(&self) -> DbResult<ReadCursor<Value>> {
|
||||
let cursor = self
|
||||
.store
|
||||
.read_cursor()
|
||||
.await
|
||||
.map_err(|e| RuntimeError::StorageEngineError(self.table_name().to_string(), e))?;
|
||||
let cursor = self.store.read_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.table_name().to_string(), e))?;
|
||||
Ok(cursor)
|
||||
}
|
||||
|
||||
async fn write(&mut self) -> DbResult<WriteCursor<Value>> {
|
||||
let cursor = self.store.write_cursor().await.map_err(|e| {
|
||||
RuntimeError::StorageEngineError(self.schema.table_name().to_string(), e)
|
||||
})?;
|
||||
let cursor = self.store.write_cursor().await.map_err(|e| RuntimeError::StorageEngineError(self.schema.table_name().to_string(), e))?;
|
||||
Ok(cursor)
|
||||
}
|
||||
|
||||
|
|
@ -141,6 +128,7 @@ impl State {
|
|||
}
|
||||
|
||||
async fn attach_table(&mut self, table: Table) {
|
||||
// TODO: You need to update the global DB SCHEMA!
|
||||
let new_table_position: TablePosition = self.tables.len();
|
||||
self.table_name_position_mapping
|
||||
.insert(table.schema().table_name().clone(), new_table_position);
|
||||
|
|
@ -148,30 +136,16 @@ impl State {
|
|||
self.tables.push(RwLock::new(table));
|
||||
}
|
||||
|
||||
async fn select_all_rows<W, C>(
|
||||
table: &Table,
|
||||
mut cursor: ReadCursor<'_, Value>,
|
||||
response_writer: &mut W,
|
||||
cancellation: &C,
|
||||
column_selection: ColumnSelection,
|
||||
) -> DbResult<usize>
|
||||
where
|
||||
W: ResponseWriter,
|
||||
C: Cancellation,
|
||||
async fn select_all_rows<W, C>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection) -> DbResult<usize>
|
||||
where W: ResponseWriter,
|
||||
C: Cancellation
|
||||
{
|
||||
let mut count = 0;
|
||||
while let Some(entry) = cursor
|
||||
.next_alive()
|
||||
.await
|
||||
.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?
|
||||
{
|
||||
while let Some(entry) = cursor.next_alive().await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))? {
|
||||
count += 1;
|
||||
let row: Row = From::from(entry);
|
||||
let restricted_row = row.restrict_columns(&column_selection);
|
||||
response_writer
|
||||
.write_table_row(&restricted_row)
|
||||
.await
|
||||
.map_err(|e| RuntimeError::AnyhowError(e))?;
|
||||
response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?;
|
||||
|
||||
if cancellation.is_canceled() {
|
||||
break;
|
||||
|
|
@ -181,31 +155,16 @@ impl State {
|
|||
Ok(count)
|
||||
}
|
||||
|
||||
async fn select_eq<W, C>(
|
||||
table: &Table,
|
||||
mut cursor: ReadCursor<'_, Value>,
|
||||
response_writer: &mut W,
|
||||
cancellation: &C,
|
||||
column_selection: ColumnSelection,
|
||||
column: Column,
|
||||
value: Value,
|
||||
) -> DbResult<usize>
|
||||
where
|
||||
W: ResponseWriter,
|
||||
C: Cancellation,
|
||||
async fn select_eq<W, C>(table: &Table, mut cursor: ReadCursor<'_, Value>, response_writer: &mut W, cancellation: &C, column_selection: ColumnSelection, column: Column, value: Value) -> DbResult<usize>
|
||||
where W: ResponseWriter,
|
||||
C: Cancellation
|
||||
{
|
||||
let entries = cursor
|
||||
.select_entries_where_eq(column as storage_engine::store::Column, &value)
|
||||
.await
|
||||
.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?;
|
||||
let entries = cursor.select_entries_where_eq(column as storage_engine::store::Column, &value).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?;
|
||||
let count = entries.len();
|
||||
for entry in entries {
|
||||
let row: Row = From::from(entry);
|
||||
let restricted_row = row.restrict_columns(&column_selection);
|
||||
response_writer
|
||||
.write_table_row(&restricted_row)
|
||||
.await
|
||||
.map_err(|e| RuntimeError::AnyhowError(e))?;
|
||||
response_writer.write_table_row(&restricted_row).await.map_err(|e| RuntimeError::AnyhowError(e))?;
|
||||
|
||||
if cancellation.is_canceled() {
|
||||
break;
|
||||
|
|
@ -215,43 +174,28 @@ impl State {
|
|||
Ok(count)
|
||||
}
|
||||
|
||||
async fn delete_all_rows(
|
||||
table_name: String,
|
||||
mut cursor: WriteCursor<'_, Value>,
|
||||
) -> DbResult<usize> {
|
||||
let count = cursor
|
||||
.delete_all_entries(true)
|
||||
.await
|
||||
.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?;
|
||||
async fn delete_all_rows(table_name: String, mut cursor: WriteCursor<'_, Value>) -> DbResult<usize> {
|
||||
let count = cursor.delete_all_entries(true)
|
||||
.await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?;
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
async fn delete_all_eq(
|
||||
table_name: String,
|
||||
mut cursor: WriteCursor<'_, Value>,
|
||||
eq_column: Column,
|
||||
value: Value,
|
||||
) -> DbResult<usize> {
|
||||
let count = cursor
|
||||
.delete_entries_where_eq(eq_column as storage_engine::store::Column, &value, true)
|
||||
.await
|
||||
.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?;
|
||||
async fn delete_all_eq(table_name: String, mut cursor: WriteCursor<'_, Value>, eq_column: Column, value: Value) -> DbResult<usize> {
|
||||
let count =
|
||||
cursor.delete_entries_where_eq(eq_column as storage_engine::store::Column, &value, true)
|
||||
.await.map_err(|e| RuntimeError::StorageEngineError(table_name, e))?;
|
||||
Ok(count)
|
||||
}
|
||||
}
|
||||
|
||||
impl StateHandler {
|
||||
pub fn is_existing_db(db_path: &PathBuf) -> bool {
|
||||
db_path.exists()
|
||||
&& db_path.is_dir()
|
||||
&& db_path.join(METADATA_FILE).exists()
|
||||
&& db_path.join(METADATA_FILE).is_file()
|
||||
db_path.exists() && db_path.is_dir() &&
|
||||
db_path.join(METADATA_FILE).exists() && db_path.join(METADATA_FILE).is_file()
|
||||
}
|
||||
|
||||
pub async fn new(db_path: PathBuf) -> DbResult<Self> {
|
||||
fs::create_dir(db_path.clone())
|
||||
.await
|
||||
.map_err(|e| RuntimeError::IoError(e))?;
|
||||
fs::create_dir(db_path.clone()).await.map_err(|e| RuntimeError::IoError(e))?;
|
||||
|
||||
let state = Self {
|
||||
db_path,
|
||||
|
|
@ -284,21 +228,14 @@ impl StateHandler {
|
|||
let metadata_file = self.db_path.join(METADATA_FILE);
|
||||
let state = self.state.read().await;
|
||||
let metadata_raw = serde_json::to_string(&*state)?;
|
||||
fs::write(metadata_file, metadata_raw)
|
||||
.await
|
||||
.map_err(|e| RuntimeError::IoError(e))
|
||||
fs::write(metadata_file, metadata_raw).await.map_err(|e| RuntimeError::IoError(e))
|
||||
}
|
||||
|
||||
pub async fn read_state(&self) -> RwLockReadGuard<State> {
|
||||
self.state.read().await
|
||||
}
|
||||
|
||||
pub async fn interpret<W: ResponseWriter, C: Cancellation>(
|
||||
&self,
|
||||
response_writer: &mut W,
|
||||
cancellation: &C,
|
||||
operation: Operation,
|
||||
) -> DbResult<()> {
|
||||
pub async fn interpret<W: ResponseWriter, C: Cancellation>(&self, response_writer: &mut W, cancellation: &C, operation: Operation) -> DbResult<()> {
|
||||
use Operation::*;
|
||||
|
||||
match operation {
|
||||
|
|
@ -308,38 +245,12 @@ impl StateHandler {
|
|||
let table = state.table_at(table_position).await;
|
||||
let cursor = table.read().await?;
|
||||
|
||||
response_writer
|
||||
.write_table_header(&table.schema, &column_selection)
|
||||
.await
|
||||
.map_err(|e| RuntimeError::AnyhowError(e))?;
|
||||
response_writer.write_table_header(&table.schema, &column_selection).await.map_err(|e| RuntimeError::AnyhowError(e))?;
|
||||
let count = match maybe_condition {
|
||||
None => {
|
||||
State::select_all_rows(
|
||||
&table,
|
||||
cursor,
|
||||
response_writer,
|
||||
cancellation,
|
||||
column_selection,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
Some(Condition::Eq(eq_column, value)) => {
|
||||
State::select_eq(
|
||||
&table,
|
||||
cursor,
|
||||
response_writer,
|
||||
cancellation,
|
||||
column_selection,
|
||||
eq_column,
|
||||
value,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
None => State::select_all_rows(&table, cursor, response_writer, cancellation, column_selection).await?,
|
||||
Some(Condition::Eq(eq_column, value)) => State::select_eq(&table, cursor, response_writer, cancellation, column_selection, eq_column, value).await?
|
||||
};
|
||||
response_writer
|
||||
.write_command_complete(CompleteStatus::Select(count))
|
||||
.await
|
||||
.map_err(|e| RuntimeError::AnyhowError(e))
|
||||
response_writer.write_command_complete(CompleteStatus::Select(count)).await.map_err(|e| RuntimeError::AnyhowError(e))
|
||||
}
|
||||
Insert(table_position, values) => {
|
||||
let state = self.state.read().await;
|
||||
|
|
@ -348,14 +259,9 @@ impl StateHandler {
|
|||
let mut cursor = table.write().await?;
|
||||
|
||||
let entry = Entry::new(values);
|
||||
cursor.insert_entry(entry).await.map_err(|e| {
|
||||
RuntimeError::StorageEngineError(table.table_name().to_string(), e)
|
||||
})?;
|
||||
cursor.insert_entry(entry).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?;
|
||||
|
||||
response_writer
|
||||
.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 })
|
||||
.await
|
||||
.map_err(|e| RuntimeError::AnyhowError(e))
|
||||
response_writer.write_command_complete(CompleteStatus::Insert { oid: 0, rows: 1 }).await.map_err(|e| RuntimeError::AnyhowError(e))
|
||||
}
|
||||
Delete(table_position, maybe_condition) => {
|
||||
let state = self.state.read().await;
|
||||
|
|
@ -366,15 +272,10 @@ impl StateHandler {
|
|||
|
||||
let count = match maybe_condition {
|
||||
None => State::delete_all_rows(table_name, cursor).await?,
|
||||
Some(Condition::Eq(eq_column, value)) => {
|
||||
State::delete_all_eq(table_name, cursor, eq_column, value).await?
|
||||
}
|
||||
Some(Condition::Eq(eq_column, value)) => State::delete_all_eq(table_name, cursor, eq_column, value).await?
|
||||
};
|
||||
|
||||
response_writer
|
||||
.write_command_complete(CompleteStatus::Delete(count))
|
||||
.await
|
||||
.map_err(|e| RuntimeError::AnyhowError(e))
|
||||
response_writer.write_command_complete(CompleteStatus::Delete(count)).await.map_err(|e| RuntimeError::AnyhowError(e))
|
||||
}
|
||||
CreateTable(table_schema) => {
|
||||
{
|
||||
|
|
@ -384,42 +285,34 @@ impl StateHandler {
|
|||
// WARNING: We need to drop the write lock on state unless we want a deadlock.
|
||||
}
|
||||
self.save_metadata().await?;
|
||||
response_writer
|
||||
.write_command_complete(CompleteStatus::CreateTable)
|
||||
.await
|
||||
.map_err(|e| RuntimeError::AnyhowError(e))
|
||||
response_writer.write_command_complete(CompleteStatus::CreateTable).await.map_err(|e| RuntimeError::AnyhowError(e))
|
||||
}
|
||||
CreateIndex(table_position, column) => {
|
||||
let state = self.state.read().await;
|
||||
|
||||
let mut table = state.table_at_mut(table_position).await;
|
||||
let mut cursor = table.write().await?;
|
||||
cursor
|
||||
.attach_index(column as storage_engine::store::Column)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
RuntimeError::StorageEngineError(table.table_name().to_string(), e)
|
||||
})?;
|
||||
response_writer
|
||||
.write_command_complete(CompleteStatus::CreateIndex)
|
||||
.await
|
||||
.map_err(|e| RuntimeError::AnyhowError(e))
|
||||
cursor.attach_index(column as storage_engine::store::Column).await.map_err(|e| RuntimeError::StorageEngineError(table.table_name().to_string(), e))?;
|
||||
response_writer.write_command_complete(CompleteStatus::CreateIndex).await.map_err(|e| RuntimeError::AnyhowError(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::cancellation::DummyCancellation;
|
||||
use crate::operation::Operation;
|
||||
use crate::response_writer::ResponseWriterStub;
|
||||
use crate::schema::Column;
|
||||
use crate::response_writer::ResponseWriterStub;
|
||||
use crate::type_system::{DbType, IndexableValue, Value};
|
||||
use std::collections::HashSet;
|
||||
use tokio::fs::{File, OpenOptions, DirBuilder};
|
||||
use tokio::fs;
|
||||
use tokio::fs::{DirBuilder, File, OpenOptions};
|
||||
use crate::cancellation::DummyCancellation;
|
||||
|
||||
impl Drop for State {
|
||||
fn drop(&mut self) {
|
||||
|
|
@ -451,24 +344,18 @@ mod tests {
|
|||
let mut response_writer = ResponseWriterStub::new();
|
||||
|
||||
state
|
||||
.interpret(
|
||||
&mut response_writer,
|
||||
&DummyCancellation,
|
||||
Operation::CreateTable(users_schema.clone()),
|
||||
)
|
||||
.await
|
||||
.interpret(&mut response_writer, &DummyCancellation, Operation::CreateTable(users_schema.clone())).await
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
println!("==EMPTY SELECT===");
|
||||
let users_position: TablePosition = 0;
|
||||
state
|
||||
.interpret(
|
||||
&mut response_writer,
|
||||
&DummyCancellation,
|
||||
Operation::Select(users_position, users_schema.all_selection(), None),
|
||||
)
|
||||
.await
|
||||
.interpret(&mut response_writer, &DummyCancellation, Operation::Select(
|
||||
users_position,
|
||||
users_schema.all_selection(),
|
||||
None,
|
||||
)).await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
|
|
@ -483,27 +370,27 @@ mod tests {
|
|||
println!("About to insert!");
|
||||
|
||||
state
|
||||
.interpret(
|
||||
&mut response_writer,
|
||||
&DummyCancellation,
|
||||
Operation::Insert(users, vec![id.clone(), name.clone(), age.clone()]),
|
||||
)
|
||||
.await
|
||||
.interpret(&mut response_writer, &DummyCancellation, Operation::Insert(
|
||||
users,
|
||||
vec![id.clone(), name.clone(), age.clone()],
|
||||
)).await
|
||||
.unwrap();
|
||||
}
|
||||
{
|
||||
println!("==SELECT===");
|
||||
let users_position: TablePosition = 0;
|
||||
state
|
||||
.interpret(
|
||||
&mut response_writer,
|
||||
&DummyCancellation,
|
||||
Operation::Select(users_position, users_schema.all_selection(), None),
|
||||
)
|
||||
.await
|
||||
.interpret(&mut response_writer, &DummyCancellation, Operation::Select(
|
||||
users_position,
|
||||
users_schema.all_selection(),
|
||||
None,
|
||||
)).await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// assert!(false);
|
||||
|
||||
// assert!(state.tables.len() == 1);
|
||||
|
|
@ -513,3 +400,4 @@ mod tests {
|
|||
// assert!(table.table_name() == &users);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
pub mod cancellation;
|
||||
mod error;
|
||||
mod internals;
|
||||
pub mod interpreter;
|
||||
|
|
@ -9,3 +8,4 @@ pub mod restricted_row;
|
|||
mod result;
|
||||
pub mod schema;
|
||||
pub mod type_system;
|
||||
pub mod cancellation;
|
||||
|
|
|
|||
|
|
@ -46,7 +46,8 @@ impl ResponseWriterStub {
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ResponseWriter for ResponseWriterStub {
|
||||
impl ResponseWriter for ResponseWriterStub
|
||||
{
|
||||
async fn write_table_header(
|
||||
&mut self,
|
||||
table_schema: &TableSchema,
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use crate::operation::ColumnSelection;
|
||||
use crate::schema::{Column, TableSchema};
|
||||
use crate::type_system::Value;
|
||||
use crate::operation::ColumnSelection;
|
||||
use std::ops::Index;
|
||||
use std::slice::SliceIndex;
|
||||
use storage_engine::segments::entry::EntryDetailed;
|
||||
|
|
|
|||
|
|
@ -59,8 +59,9 @@ impl TableSchema {
|
|||
}
|
||||
|
||||
pub fn get_columns(&self) -> Vec<&ColumnName> {
|
||||
let mut columns_in_random_order: Vec<_> =
|
||||
self.column_name_position_mapping.iter().collect();
|
||||
let mut columns_in_random_order: Vec<_> = self.column_name_position_mapping
|
||||
.iter()
|
||||
.collect();
|
||||
columns_in_random_order.sort_by(|&(_, column0), &(_, column1)| column0.cmp(column1));
|
||||
|
||||
let columns: Vec<_> = columns_in_random_order
|
||||
|
|
|
|||
|
|
@ -2,7 +2,9 @@ use crate::error::TypeConversionError;
|
|||
use proto::message::primitive::pgoid::PgOid;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::cmp::Ordering;
|
||||
use bincode::{Decode, Encode};
|
||||
// TODO: Private???
|
||||
// use bincode::{Encode, Encoder, EncodeError, Decode, Decoder, DecodeError};
|
||||
use bincode::{Encode, Decode};
|
||||
|
||||
// ==============Types================
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
|
||||
|
|
@ -90,6 +92,7 @@ impl PartialOrd for Value {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: Make column know about indexable types
|
||||
impl Ord for Value {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
match (self, other) {
|
||||
|
|
@ -117,6 +120,18 @@ impl Ord for Value {
|
|||
}
|
||||
}
|
||||
|
||||
// impl Encode for Value {
|
||||
// fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
|
||||
// todo!()
|
||||
// }
|
||||
// }
|
||||
|
||||
// impl Decode for Value {
|
||||
// fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, DecodeError> {
|
||||
// todo!()
|
||||
// }
|
||||
// }
|
||||
|
||||
impl DbType {
|
||||
fn new_n_option(n: usize, inside: DbType) -> DbType {
|
||||
if n == 0 {
|
||||
|
|
|
|||
|
|
@ -1,12 +1,6 @@
|
|||
use crate::syntax::RawQuerySyntax;
|
||||
use minisql::{interpreter2::DbSchema, operation::Operation};
|
||||
use nom::{
|
||||
branch::alt,
|
||||
character::complete::{char, multispace0},
|
||||
multi::many1,
|
||||
sequence::{delimited, terminated},
|
||||
IResult,
|
||||
};
|
||||
use nom::{branch::alt, character::complete::{multispace0, char}, multi::many1, sequence::{delimited, terminated}, IResult};
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::{
|
||||
|
|
@ -41,10 +35,7 @@ fn parse_statement(input: &str) -> IResult<&str, RawQuerySyntax> {
|
|||
/// Parse one or more statements
|
||||
#[allow(dead_code)]
|
||||
fn parse_statement1(input: &str) -> IResult<&str, Vec<RawQuerySyntax>> {
|
||||
many1(terminated(
|
||||
parse_statement,
|
||||
delimited(multispace0, char(';'), multispace0),
|
||||
))(input)
|
||||
many1(terminated(parse_statement, delimited(multispace0, char(';'), multispace0)))(input)
|
||||
}
|
||||
|
||||
pub fn parse_and_validate(str_query: String, db_schema: &DbSchema) -> Result<Operation, Error> {
|
||||
|
|
@ -62,9 +53,11 @@ mod test {
|
|||
|
||||
#[test]
|
||||
fn test_parse_two_select() {
|
||||
let (rest, sntx) = parse_statement1("SELECT * FROM users ; SELECT * FROM cities ; ")
|
||||
.expect("should parse");
|
||||
assert_eq!(sntx.len(), 2);
|
||||
let (rest, sntx) = parse_statement1("SELECT * FROM users ; SELECT * FROM cities ; ").expect("should parse");
|
||||
assert_eq!(
|
||||
sntx.len(),
|
||||
2
|
||||
);
|
||||
assert_eq!(rest, "");
|
||||
}
|
||||
|
||||
|
|
@ -75,10 +68,11 @@ mod test {
|
|||
SELECT * FROM users ;
|
||||
INSERT INTO table1 (id, data) VALUES (u4, 30) ;
|
||||
INSERT INTO table1 (id, data) VALUES (u5, 40) ;
|
||||
"#,
|
||||
)
|
||||
.expect("should parse");
|
||||
assert_eq!(sntx.len(), 4);
|
||||
"#).expect("should parse");
|
||||
assert_eq!(
|
||||
sntx.len(),
|
||||
4
|
||||
);
|
||||
assert_eq!(rest, "");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,19 +1,10 @@
|
|||
use minisql::type_system::DbType;
|
||||
use nom::{
|
||||
branch::alt,
|
||||
bytes::complete::{tag, take_while},
|
||||
character::{
|
||||
complete::{alphanumeric1, anychar, char, multispace0, multispace1},
|
||||
is_alphanumeric,
|
||||
},
|
||||
combinator::peek,
|
||||
error::make_error,
|
||||
sequence::{delimited, terminated},
|
||||
IResult, Parser,
|
||||
branch::alt, bytes::complete::{tag, take_while}, character::{complete::{alphanumeric1, anychar, char, multispace0, multispace1}, is_alphanumeric}, combinator::peek, error::make_error, sequence::{delimited, terminated}, IResult, Parser
|
||||
};
|
||||
|
||||
use super::literal::parse_literal;
|
||||
use crate::syntax::Condition;
|
||||
use super::literal::parse_literal;
|
||||
|
||||
pub fn parse_table_name(input: &str) -> IResult<&str, &str> {
|
||||
alt((
|
||||
|
|
@ -25,9 +16,11 @@ pub fn parse_table_name(input: &str) -> IResult<&str, &str> {
|
|||
pub fn parse_identifier(input: &str) -> IResult<&str, &str> {
|
||||
let (_, first) = peek(anychar)(input)?;
|
||||
if first.is_alphabetic() || first == '_' {
|
||||
take_while(|c: char| match c {
|
||||
take_while(|c: char| {
|
||||
match c {
|
||||
'a'..='z' | 'A'..='Z' | '_' | '0'..='9' => true,
|
||||
_ => false,
|
||||
_ => false
|
||||
}
|
||||
})(input)
|
||||
} else {
|
||||
Err(nom::Err::Error(make_error(
|
||||
|
|
@ -42,13 +35,22 @@ pub fn parse_column_name(input: &str) -> IResult<&str, String> {
|
|||
}
|
||||
|
||||
pub fn parse_db_type(input: &str) -> IResult<&str, DbType> {
|
||||
let (input, db_type) = alt((
|
||||
tag("STRING").map(|_| DbType::String),
|
||||
tag("INT").map(|_| DbType::Int),
|
||||
tag("NUMBER").map(|_| DbType::Number),
|
||||
tag("UUID").map(|_| DbType::Uuid),
|
||||
delimited(tag("Option("), parse_db_type, tag(")")).map(|ty| DbType::Option(Box::new(ty))),
|
||||
))(input)?;
|
||||
let (input, db_type) = alt(
|
||||
(
|
||||
tag("STRING")
|
||||
.map(|_| DbType::String),
|
||||
tag("INT")
|
||||
.map(|_| DbType::Int),
|
||||
tag("NUMBER")
|
||||
.map(|_| DbType::Number),
|
||||
tag("UUID")
|
||||
.map(|_| DbType::Uuid),
|
||||
delimited(tag("Option("), parse_db_type, tag(")"))
|
||||
.map(|ty| {
|
||||
DbType::Option(Box::new(ty))
|
||||
})
|
||||
)
|
||||
)(input)?;
|
||||
Ok((input, db_type))
|
||||
}
|
||||
|
||||
|
|
@ -120,7 +122,10 @@ mod tests {
|
|||
parse_identifier("_variable__Test").expect("should parse").1,
|
||||
"_variable__Test"
|
||||
);
|
||||
assert!(matches!(parse_identifier("123_variable__Test"), Err(_)));
|
||||
assert!(matches!(
|
||||
parse_identifier("123_variable__Test"),
|
||||
Err(_)
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -134,12 +139,8 @@ mod tests {
|
|||
#[test]
|
||||
fn test_parse_nested_option_int_type() {
|
||||
assert_eq!(
|
||||
parse_db_type("Option(Option(Option(INT)))")
|
||||
.expect("should parse")
|
||||
.1,
|
||||
DbType::Option(Box::new(DbType::Option(Box::new(DbType::Option(
|
||||
Box::new(DbType::Int)
|
||||
)))))
|
||||
parse_db_type("Option(Option(Option(INT)))").expect("should parse").1,
|
||||
DbType::Option(Box::new(DbType::Option(Box::new(DbType::Option(Box::new(DbType::Int))))))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -77,7 +77,8 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_parse_create_no_quotes_table_name() {
|
||||
parse_create("CREATE TABLE Table1(id UUID PRIMARY KEY,column1 INT)").expect("should parse");
|
||||
parse_create("CREATE TABLE Table1(id UUID PRIMARY KEY,column1 INT)")
|
||||
.expect("should parse");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -90,8 +91,8 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_parse_create() {
|
||||
let (_, create) =
|
||||
parse_create("CREATE TABLE \"Table1\"( id UUID , column1 INT )").expect("should parse");
|
||||
let (_, create) = parse_create("CREATE TABLE \"Table1\"( id UUID , column1 INT )")
|
||||
.expect("should parse");
|
||||
assert!(matches!(create, RawQuerySyntax::CreateTable(_)));
|
||||
match create {
|
||||
RawQuerySyntax::CreateTable(schema) => {
|
||||
|
|
@ -116,9 +117,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_parse_create_option() {
|
||||
let (_, create) = parse_create(
|
||||
"CREATE TABLE games (id UUID PRIMARY KEY, name STRING, year Option(INT), price NUMBER)",
|
||||
)
|
||||
let (_, create) = parse_create("CREATE TABLE games (id UUID PRIMARY KEY, name STRING, year Option(INT), price NUMBER)")
|
||||
.expect("should parse");
|
||||
assert!(matches!(create, RawQuerySyntax::CreateTable(_)));
|
||||
match create {
|
||||
|
|
@ -140,12 +139,16 @@ mod tests {
|
|||
assert_eq!(column1_column.type_, DbType::String);
|
||||
|
||||
let column = schema.get_column(&"year".to_string());
|
||||
let Some(column) = column else { panic!() };
|
||||
let Some(column) = column else {
|
||||
panic!()
|
||||
};
|
||||
assert_eq!(column.column_name, "year".to_string());
|
||||
assert_eq!(column.type_, DbType::Option(Box::new(DbType::Int)));
|
||||
|
||||
let column = schema.get_column(&"price".to_string());
|
||||
let Some(column) = column else { panic!() };
|
||||
let Some(column) = column else {
|
||||
panic!()
|
||||
};
|
||||
assert_eq!(column.column_name, "price".to_string());
|
||||
assert_eq!(column.type_, DbType::Number);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,19 +29,22 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_parse_delete() {
|
||||
let (_, sntx) = parse_delete("DELETE FROM \"T1\" WHERE id = 1").expect("should parse");
|
||||
let (_, sntx) =
|
||||
parse_delete("DELETE FROM \"T1\" WHERE id = 1").expect("should parse");
|
||||
assert!(matches!(sntx, RawQuerySyntax::Delete(_, _)))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_delete_with_spaces() {
|
||||
let (_, sntx) = parse_delete("DELETE FROM T1 WHERE id = 1").expect("should parse");
|
||||
let (_, sntx) =
|
||||
parse_delete("DELETE FROM T1 WHERE id = 1").expect("should parse");
|
||||
assert!(matches!(sntx, RawQuerySyntax::Delete(_, _)))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_delete_none() {
|
||||
let (_, sntx) = parse_delete("DELETE FROM games WHERE year = None").expect("should parse");
|
||||
let (_, sntx) =
|
||||
parse_delete("DELETE FROM games WHERE year = None").expect("should parse");
|
||||
if let RawQuerySyntax::Delete(tname, Some(Condition::Eq(column_name, lit))) = sntx {
|
||||
assert_eq!(tname, "games".to_string());
|
||||
assert_eq!(column_name, "year".to_string());
|
||||
|
|
|
|||
|
|
@ -68,7 +68,10 @@ mod tests {
|
|||
insertion_values,
|
||||
vec![
|
||||
("id".to_string(), Literal::Int(1)),
|
||||
("data".to_string(), Literal::String("Text".to_string()))
|
||||
(
|
||||
"data".to_string(),
|
||||
Literal::String("Text".to_string())
|
||||
)
|
||||
]
|
||||
);
|
||||
}
|
||||
|
|
@ -80,7 +83,8 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_parse_insert_with_spaces() {
|
||||
let sql = "INSERT INTO \"MyTable\" ( id, data ) VALUES ( 1, \"Text\" )";
|
||||
let sql =
|
||||
"INSERT INTO \"MyTable\" ( id, data ) VALUES ( 1, \"Text\" )";
|
||||
let operation = parse_insert(sql).expect("should parse");
|
||||
match operation {
|
||||
("", RawQuerySyntax::Insert(table_name, insertion_values)) => {
|
||||
|
|
@ -89,7 +93,10 @@ mod tests {
|
|||
insertion_values,
|
||||
vec![
|
||||
("id".to_string(), Literal::Int(1)),
|
||||
("data".to_string(), Literal::String("Text".to_string()))
|
||||
(
|
||||
"data".to_string(),
|
||||
Literal::String("Text".to_string())
|
||||
)
|
||||
]
|
||||
);
|
||||
}
|
||||
|
|
@ -110,12 +117,18 @@ mod tests {
|
|||
insertion_values,
|
||||
vec![
|
||||
("id".to_string(), Literal::Uuid(12345)),
|
||||
("name".to_string(), Literal::String("Doom".to_string())),
|
||||
(
|
||||
"name".to_string(),
|
||||
Literal::String("Doom".to_string())
|
||||
),
|
||||
(
|
||||
"year".to_string(),
|
||||
Literal::Some(Box::new(Literal::Int(1993)))
|
||||
),
|
||||
("price".to_string(), Literal::Number(6.5))
|
||||
(
|
||||
"price".to_string(),
|
||||
Literal::Number(6.5)
|
||||
)
|
||||
]
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,12 +1,5 @@
|
|||
use nom::{
|
||||
branch::alt,
|
||||
bytes::complete::tag,
|
||||
character::complete::{char, digit1, none_of, u64},
|
||||
combinator::opt,
|
||||
error::make_error,
|
||||
multi::many0,
|
||||
sequence::{delimited, pair, preceded},
|
||||
IResult, Parser,
|
||||
branch::alt, bytes::complete::tag, character::complete::{char, digit1, none_of, u64}, combinator::opt, error::make_error, multi::many0, sequence::{delimited, pair, preceded}, IResult, Parser
|
||||
};
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
|
|
@ -20,13 +13,7 @@ pub enum Literal {
|
|||
}
|
||||
|
||||
pub fn parse_literal(input: &str) -> IResult<&str, Literal> {
|
||||
alt((
|
||||
parse_option,
|
||||
parse_string,
|
||||
parse_number,
|
||||
parse_int,
|
||||
parse_uuid,
|
||||
))(input)
|
||||
alt((parse_option, parse_string, parse_number, parse_int, parse_uuid))(input)
|
||||
}
|
||||
|
||||
pub fn parse_number(input: &str) -> IResult<&str, Literal> {
|
||||
|
|
@ -105,16 +92,16 @@ pub fn parse_string(input: &str) -> IResult<&str, Literal> {
|
|||
}
|
||||
|
||||
pub fn parse_uuid(input: &str) -> IResult<&str, Literal> {
|
||||
let (input, value) =
|
||||
pair(char('u'), u64)(input).map(|(input, (_, v))| (input, Literal::Uuid(v)))?;
|
||||
let (input, value) = pair(char('u'), u64)(input)
|
||||
.map(|(input, (_, v))| (input, Literal::Uuid(v)))?;
|
||||
Ok((input, value))
|
||||
}
|
||||
|
||||
pub fn parse_option(input: &str) -> IResult<&str, Literal> {
|
||||
let (input, inner) = alt((
|
||||
tag("None").map(|_| Literal::None),
|
||||
delimited(tag("Some("), parse_literal, tag(")")).map(|v| Literal::Some(Box::new(v))),
|
||||
))(input)?;
|
||||
let (input, inner) = alt((tag("None")
|
||||
.map(|_| Literal::None), delimited(tag("Some("), parse_literal, tag(")")).map(|v| {
|
||||
Literal::Some(Box::new(v))
|
||||
})))(input)?;
|
||||
Ok((input, inner))
|
||||
}
|
||||
|
||||
|
|
@ -127,15 +114,24 @@ mod tests {
|
|||
fn test_string_parser() {
|
||||
assert_eq!(
|
||||
parse_string(r#""simple""#),
|
||||
Ok(("", Literal::String(String::from("simple"))))
|
||||
Ok((
|
||||
"",
|
||||
Literal::String(String::from("simple"))
|
||||
))
|
||||
);
|
||||
assert_eq!(
|
||||
parse_string(r#""\"\t\r\n\\""#),
|
||||
Ok(("", Literal::String(String::from("\"\t\r\n\\"))))
|
||||
Ok((
|
||||
"",
|
||||
Literal::String(String::from("\"\t\r\n\\"))
|
||||
))
|
||||
);
|
||||
assert_eq!(
|
||||
parse_string(r#""name is \"John\".""#),
|
||||
Ok(("", Literal::String(String::from("name is \"John\"."))))
|
||||
Ok((
|
||||
"",
|
||||
Literal::String(String::from("name is \"John\"."))
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -155,7 +151,9 @@ mod tests {
|
|||
assert_eq!(input, "");
|
||||
assert_eq!(
|
||||
value,
|
||||
Literal::String("abcdefghkjklmnopqrstuvwxyz!@#$%^&*()_+ ".to_string())
|
||||
Literal::String(
|
||||
"abcdefghkjklmnopqrstuvwxyz!@#$%^&*()_+ ".to_string()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -194,12 +192,18 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_parse_int() {
|
||||
assert_eq!(parse_literal("5134616"), Ok(("", Literal::Int(5134616))));
|
||||
assert_eq!(
|
||||
parse_literal("5134616"),
|
||||
Ok(("", Literal::Int(5134616)))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_uuid() {
|
||||
assert_eq!(parse_uuid("u131515"), Ok(("", Literal::Uuid(131515))))
|
||||
assert_eq!(
|
||||
parse_uuid("u131515"),
|
||||
Ok(("", Literal::Uuid(131515)))
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -210,10 +214,7 @@ mod tests {
|
|||
);
|
||||
assert_eq!(
|
||||
parse_option("Some(Some(3))"),
|
||||
Ok((
|
||||
"",
|
||||
Literal::Some(Box::new(Literal::Some(Box::new(Literal::Int(3)))))
|
||||
))
|
||||
Ok(("", Literal::Some(Box::new(Literal::Some(Box::new(Literal::Int(3)))))))
|
||||
);
|
||||
assert_eq!(
|
||||
parse_option("Some(None)"),
|
||||
|
|
|
|||
|
|
@ -42,9 +42,7 @@ pub fn try_parse_column_selection(input: &str) -> IResult<&str, ColumnSelection>
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::parsing::{
|
||||
common::{parse_column_name, parse_table_name},
|
||||
literal::Literal,
|
||||
select::parse_select,
|
||||
common::{parse_column_name, parse_table_name}, literal::Literal, select::parse_select
|
||||
};
|
||||
use crate::syntax::{ColumnSelection, RawQuerySyntax};
|
||||
|
||||
|
|
|
|||
|
|
@ -71,3 +71,4 @@ impl RawTableSchema {
|
|||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -284,10 +284,7 @@ fn validate_condition(
|
|||
)?;
|
||||
let value_type: DbType = type_from_literal_with_type_hint(&value, &expected_type)?;
|
||||
if expected_type.eq(&value_type) {
|
||||
Ok(Some(operation::Condition::Eq(
|
||||
column,
|
||||
literal_to_value(value, &expected_type),
|
||||
)))
|
||||
Ok(Some(operation::Condition::Eq(column, literal_to_value(value, &expected_type))))
|
||||
} else {
|
||||
Err(ValidationError::TypeMismatch {
|
||||
column_name: column_name.to_string(),
|
||||
|
|
@ -342,7 +339,10 @@ where
|
|||
None
|
||||
}
|
||||
|
||||
fn get_table_schema(db_schema: &DbSchema, table_name: &TableName) -> Option<Arc<TableSchema>> {
|
||||
fn get_table_schema(
|
||||
db_schema: &DbSchema,
|
||||
table_name: &TableName,
|
||||
) -> Option<Arc<TableSchema>> {
|
||||
let (_, _, table_schema) = db_schema
|
||||
.iter()
|
||||
.find(|(tname, _, _)| table_name.eq(tname))?;
|
||||
|
|
@ -364,14 +364,11 @@ fn literal_to_value(lit: Literal, hint: &DbType) -> Value {
|
|||
// type we want from the literal
|
||||
panic!()
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn type_from_literal_with_type_hint(
|
||||
lit: &Literal,
|
||||
hint: &DbType,
|
||||
) -> Result<DbType, ValidationError> {
|
||||
fn type_from_literal_with_type_hint(lit: &Literal, hint: &DbType) -> Result<DbType, ValidationError> {
|
||||
Ok(match lit {
|
||||
Literal::Number(_) => DbType::Number,
|
||||
Literal::String(_) => DbType::String,
|
||||
|
|
@ -382,9 +379,7 @@ fn type_from_literal_with_type_hint(
|
|||
if matches!(hint, DbType::Option(_)) {
|
||||
hint.clone()
|
||||
} else {
|
||||
return Err(ValidationError::UnexpectedNoneValue {
|
||||
expected_type: hint.clone(),
|
||||
});
|
||||
return Err(ValidationError::UnexpectedNoneValue { expected_type: hint.clone() })
|
||||
}
|
||||
}
|
||||
})
|
||||
|
|
|
|||
|
|
@ -10,4 +10,4 @@ rand = { workspace = true }
|
|||
rand_pcg = "0.3.1"
|
||||
rand_seeder = "0.2.3"
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["io-util", "macros"] }
|
||||
tokio = { workspace = true, features = ["io-util", "macros", "test-util"] }
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ anyhow = { workspace = true }
|
|||
async-trait = { workspace = true }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
rand = { workspace = true }
|
||||
tokio = { workspace = true, features = ["io-util", "macros", "net", "rt-multi-thread", "time"] }
|
||||
tokio = { workspace = true, features = ["io-util", "macros", "net", "rt-multi-thread"] }
|
||||
|
||||
minisql = { path = "../minisql" }
|
||||
parser = { path = "../parser" }
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use minisql::cancellation::Cancellation;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use minisql::cancellation::Cancellation;
|
||||
|
||||
pub struct ResetCancelToken {
|
||||
is_canceled: Arc<AtomicBool>,
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ async fn handle_stream(
|
|||
let mut wrapped_writer = ServerProtoWrapper::new(writer, config.get_throttle());
|
||||
let result = match request {
|
||||
Ok(req) => {
|
||||
handle_connection(&mut reader, &mut wrapped_writer, req, state, token).await
|
||||
handle_connection(&mut reader, &mut wrapped_writer, req, state, token, config).await
|
||||
}
|
||||
Err(ServerHandshakeError::IsCancelRequest(cancel)) => {
|
||||
handle_cancellation(cancel.pid, cancel.secret, &tokens).await
|
||||
|
|
@ -141,6 +141,7 @@ async fn handle_connection<R, W>(
|
|||
request: HandshakeRequest,
|
||||
state: SharedDbState,
|
||||
token: ResetCancelToken,
|
||||
config: Arc<Configuration>,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
R: FrontendProtoReader + Send,
|
||||
|
|
@ -156,7 +157,7 @@ where
|
|||
break;
|
||||
}
|
||||
FrontendMessage::Query(data) => {
|
||||
let result = handle_query(writer, &state, data.query.into(), &token).await;
|
||||
let result = handle_query(writer, &state, data.query.into(), &token, &config).await;
|
||||
match result {
|
||||
Ok(_) => {}
|
||||
Err(e) => writer.write_error_message(&e.to_string()).await?,
|
||||
|
|
@ -175,6 +176,7 @@ async fn handle_query<W>(
|
|||
state: &SharedDbState,
|
||||
query: String,
|
||||
token: &ResetCancelToken,
|
||||
config: &Arc<Configuration>,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
W: BackendProtoWriter + ProtoFlush + Send,
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
use bincode::config::{BigEndian, Configuration, Fixint};
|
||||
use bincode;
|
||||
use bincode::{Decode, Encode};
|
||||
use bincode::config::{BigEndian, Configuration, Fixint};
|
||||
use std::mem::size_of;
|
||||
|
||||
const BIN_CONFIG: Configuration<BigEndian, Fixint> = bincode::config::standard()
|
||||
.with_big_endian()
|
||||
.with_fixed_int_encoding();
|
||||
const BIN_CONFIG: Configuration<BigEndian, Fixint> = bincode::config::standard().with_big_endian().with_fixed_int_encoding();
|
||||
|
||||
pub fn encode<T: Encode>(t: &T) -> Result<Vec<u8>, bincode::error::EncodeError> {
|
||||
bincode::encode_to_vec(t, BIN_CONFIG)
|
||||
|
|
@ -13,33 +13,51 @@ pub fn decode<T: Decode>(bytes: &[u8]) -> Result<(T, usize), bincode::error::Dec
|
|||
bincode::decode_from_slice(bytes, BIN_CONFIG)
|
||||
}
|
||||
|
||||
// We don't care about encoding the length here (since it will be used for a row with known column
|
||||
// size)
|
||||
pub fn encode_sequence<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> {
|
||||
let mut result = vec![];
|
||||
pub fn encode_vector<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> {
|
||||
let size: usize = ts.len();
|
||||
let mut result = encode(&size)?;
|
||||
for t in ts {
|
||||
result.append(&mut encode(&t)?);
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub fn encode_sequence_with_sizes<T: Encode>(
|
||||
ts: &[T],
|
||||
) -> Result<(Vec<u8>, Vec<usize>), bincode::error::EncodeError> {
|
||||
pub fn decode_vector<T: Decode>(bytes: &[u8]) -> Result<Vec<T>, bincode::error::DecodeError> {
|
||||
let mut offset = size_of::<usize>();
|
||||
let result_len: usize = decode(&bytes[..offset])?.0;
|
||||
let mut result: Vec<T> = Vec::with_capacity(result_len);
|
||||
for _ in 0..result_len {
|
||||
let (x, bytes_consumed) = decode::<T>(&bytes[offset..])?;
|
||||
offset += bytes_consumed;
|
||||
result.push(x);
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
// We don't care about encoding the length here (since it will be used for a row with known column
|
||||
// size)
|
||||
pub fn encode_sequence<T: Encode>(ts: &[T]) -> Result<Vec<u8>, bincode::error::EncodeError> {
|
||||
let mut result = vec![];
|
||||
for t in ts {
|
||||
result.append(&mut encode(&t)?);
|
||||
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub fn encode_sequence_with_sizes<T: Encode>(ts: &[T]) -> Result<(Vec<u8>, Vec<usize>), bincode::error::EncodeError> {
|
||||
let mut result_bytes = vec![];
|
||||
let mut sizes = Vec::with_capacity(ts.len());
|
||||
for t in ts {
|
||||
let mut bytes = encode(&t)?;
|
||||
sizes.push(bytes.len());
|
||||
result_bytes.append(&mut bytes);
|
||||
|
||||
}
|
||||
Ok((result_bytes, sizes))
|
||||
}
|
||||
|
||||
pub fn decode_sequence<T: Decode>(
|
||||
len: usize,
|
||||
bytes: &[u8],
|
||||
) -> Result<Vec<T>, bincode::error::DecodeError> {
|
||||
pub fn decode_sequence<T: Decode>(len: usize, bytes: &[u8]) -> Result<Vec<T>, bincode::error::DecodeError> {
|
||||
let mut result: Vec<T> = Vec::with_capacity(len);
|
||||
let mut offset = 0;
|
||||
for _ in 0..len {
|
||||
|
|
@ -49,3 +67,21 @@ pub fn decode_sequence<T: Decode>(
|
|||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// A non-empty vector survives an `encode_vector` / `decode_vector` round trip.
    #[test]
    fn test_encoding_decoding() {
        let xs: Vec<String> = vec!["foo".to_string(), "bar".to_string()];

        let exs = encode_vector(&xs[..]).unwrap();

        // WARNING: Don't forget to specify the element type here.
        let dxs = decode_vector::<String>(&exs[..]).unwrap();

        // `assert_eq!` prints both sides on failure, unlike `assert!(a == b)`.
        assert_eq!(dxs, xs);
    }

    /// The empty slice is a boundary case: only the length prefix is written and read back.
    #[test]
    fn test_encoding_decoding_empty() {
        let xs: Vec<String> = Vec::new();

        let exs = encode_vector(&xs[..]).unwrap();
        let dxs = decode_vector::<String>(&exs[..]).unwrap();

        assert_eq!(dxs, xs);
    }
}
|
||||
|
|
|
|||
|
|
@ -1,24 +1,21 @@
|
|||
use std::collections::{BTreeMap, HashSet};
|
||||
use std::marker::PhantomData;
|
||||
use std::path::Path;
|
||||
use tokio::fs;
|
||||
use tokio::fs::{File, OpenOptions};
|
||||
use tokio::fs;
|
||||
use std::path::Path;
|
||||
use std::marker::PhantomData;
|
||||
use std::collections::{BTreeMap, HashSet};
|
||||
|
||||
use bincode;
|
||||
use bincode::{Decode, Encode};
|
||||
|
||||
use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries;
|
||||
use crate::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex};
|
||||
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite};
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use crate::index::Index;
|
||||
use crate::segments::entry::EntryDetailed;
|
||||
use crate::segments::entry_header::EntryHeader;
|
||||
use crate::segments::store_header::StoreHeader;
|
||||
use crate::store::{
|
||||
Column, FilePosition, Result, Store, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME,
|
||||
ROWS_FILE_NAME,
|
||||
};
|
||||
use crate::store::{Store, FilePosition, Column, Result, ROWS_FILE_NAME, GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME};
|
||||
use crate::index::Index;
|
||||
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite};
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries;
|
||||
use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex};
|
||||
|
||||
const GARBAGE_COLLECTION_TRIGGER: usize = 100;
|
||||
|
||||
|
|
@ -48,6 +45,7 @@ pub struct AppendOnlyCursor<T> {
|
|||
eof_file_position: FilePosition,
|
||||
}
|
||||
|
||||
|
||||
// ===========Implementations=============
|
||||
// ===primitive capabilities===
|
||||
impl <T>CursorCanRead<T> for ReadCursor<'_, T> {
|
||||
|
|
@ -83,70 +81,55 @@ impl<T> CursorCanRead<T> for AppendOnlyCursor<T> {
|
|||
impl <T>CursorCanWrite<T> for WriteCursor<'_, T> {}
|
||||
impl <T>CursorCanWrite<T> for AppendOnlyCursor<T> {}
|
||||
|
||||
|
||||
// ===capability to access header===
|
||||
impl <T>CursorCanTraverse<T> for ReadCursor<'_, T> {
|
||||
fn header(&self) -> &StoreHeader {
|
||||
&self.header
|
||||
}
|
||||
fn header(&self) -> &StoreHeader { &self.header }
|
||||
}
|
||||
|
||||
impl <T>CursorCanTraverse<T> for WriteCursor<'_, T> {
|
||||
fn header(&self) -> &StoreHeader {
|
||||
self.header
|
||||
}
|
||||
fn header(&self) -> &StoreHeader { &self.header }
|
||||
}
|
||||
|
||||
impl <T>CursorCanTraverse<T> for AppendOnlyCursor<T> {
|
||||
fn header(&self) -> &StoreHeader {
|
||||
&self.header
|
||||
}
|
||||
fn header(&self) -> &StoreHeader { &self.header }
|
||||
}
|
||||
|
||||
impl <T>CursorCanModifyEntries<T> for WriteCursor<'_, T> {
|
||||
fn header_mut(&mut self) -> &mut StoreHeader {
|
||||
self.header
|
||||
}
|
||||
fn set_eof_file_position(&mut self, new_file_position: FilePosition) {
|
||||
self.eof_file_position = new_file_position
|
||||
}
|
||||
fn header_mut(&mut self) -> &mut StoreHeader { self.header }
|
||||
fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position }
|
||||
}
|
||||
|
||||
impl <T>CursorCanModifyEntries<T> for AppendOnlyCursor<T> {
|
||||
fn header_mut(&mut self) -> &mut StoreHeader {
|
||||
&mut self.header
|
||||
}
|
||||
fn set_eof_file_position(&mut self, new_file_position: FilePosition) {
|
||||
self.eof_file_position = new_file_position
|
||||
}
|
||||
fn header_mut(&mut self) -> &mut StoreHeader { &mut self.header }
|
||||
fn set_eof_file_position(&mut self, new_file_position: FilePosition) { self.eof_file_position = new_file_position }
|
||||
}
|
||||
|
||||
// ===capability to access index===
|
||||
impl <T>CursorCanReadIndex<T> for ReadCursor<'_, T> {
|
||||
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] {
|
||||
self.indexes
|
||||
}
|
||||
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] { &self.indexes }
|
||||
}
|
||||
|
||||
impl <T>CursorCanReadIndex<T> for WriteCursor<'_, T> {
|
||||
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] {
|
||||
self.indexes
|
||||
}
|
||||
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>] { &self.indexes }
|
||||
}
|
||||
|
||||
impl <T>CursorCanWriteToIndex<T> for WriteCursor<'_, T> {
|
||||
fn indexes_mut(&mut self) -> &mut [Option<Index<T, FilePosition>>] {
|
||||
self.indexes
|
||||
}
|
||||
fn indexes_mut(&mut self) -> &mut [Option<Index<T, FilePosition>>] { self.indexes }
|
||||
}
|
||||
|
||||
|
||||
// ===Specifics===
|
||||
impl <'cursor, T> ReadCursor<'cursor, T> {
|
||||
pub async fn new<'store: 'cursor>(store: &'store Store<T>) -> Result<Self>
|
||||
where
|
||||
T: Send + Sync,
|
||||
where T: Send + Sync
|
||||
{
|
||||
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
|
||||
let file: File = OpenOptions::new().read(true).open(path_to_rows).await?;
|
||||
let file: File =
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.open(path_to_rows)
|
||||
.await?;
|
||||
|
||||
let mut cursor = Self {
|
||||
header: store.header.clone(),
|
||||
|
|
@ -164,14 +147,15 @@ impl<'cursor, T> ReadCursor<'cursor, T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<'cursor, T> WriteCursor<'cursor, T> {
|
||||
impl <'cursor, T> WriteCursor<'cursor, T>
|
||||
{
|
||||
// 'store lives at least as long as 'cursor
|
||||
pub async fn new<'store: 'cursor>(store: &'store mut Store<T>) -> Result<Self>
|
||||
where
|
||||
T: Send,
|
||||
where T: Send
|
||||
{
|
||||
let path_to_rows = Path::new(&store.header.table_folder).join(ROWS_FILE_NAME);
|
||||
let file: File = OpenOptions::new()
|
||||
let file: File =
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(path_to_rows)
|
||||
|
|
@ -195,8 +179,7 @@ impl<'cursor, T> WriteCursor<'cursor, T> {
|
|||
// ===Entry Header Manipulation===
|
||||
// Assumes we are at the start of a valid entry.
|
||||
async fn set_new_entry_header(&mut self, entry_header: EntryHeader) -> Result<()>
|
||||
where
|
||||
T: Send,
|
||||
where T: Send
|
||||
{
|
||||
let bytes: Vec<u8> = entry_header.encode()?;
|
||||
self.write_bytes(&bytes).await?;
|
||||
|
|
@ -204,13 +187,8 @@ impl<'cursor, T> WriteCursor<'cursor, T> {
|
|||
}
|
||||
|
||||
// ===Deletion===
|
||||
pub async fn mark_deleted_at(
|
||||
&mut self,
|
||||
file_position: FilePosition,
|
||||
enable_garbage_collector: bool,
|
||||
) -> Result<()>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + Clone + Ord,
|
||||
pub async fn mark_deleted_at(&mut self, file_position: FilePosition, enable_garbage_collector: bool) -> Result<()>
|
||||
where T: Encode + Decode + Ord + Send + Sync + Clone + Ord
|
||||
{
|
||||
self.seek_to(file_position).await?;
|
||||
let mut entry_header = self.read_entry_header().await?;
|
||||
|
|
@ -227,7 +205,9 @@ impl<'cursor, T> WriteCursor<'cursor, T> {
|
|||
// Update index
|
||||
self.seek_to(file_position).await?;
|
||||
match self.next().await? {
|
||||
Some(entry) => self.delete_entry_values_from_indexes(entry).await?,
|
||||
Some(entry) => {
|
||||
self.delete_entry_values_from_indexes(entry).await?
|
||||
},
|
||||
None => {
|
||||
// SAFETY: We just modified its header, so it must exist.
|
||||
unreachable!()
|
||||
|
|
@ -241,19 +221,12 @@ impl<'cursor, T> WriteCursor<'cursor, T> {
|
|||
}
|
||||
}
|
||||
|
||||
async fn find_first_eq_bruteforce_and_delete(
|
||||
&mut self,
|
||||
column: Column,
|
||||
t0: &T,
|
||||
enable_garbage_collector: bool,
|
||||
) -> Result<Option<EntryDetailed<T>>>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + Clone,
|
||||
async fn find_first_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T, enable_garbage_collector: bool) -> Result<Option<EntryDetailed<T>>>
|
||||
where T: Encode + Decode + Ord + Send + Sync + Clone
|
||||
{
|
||||
let maybe_entry = self.find_first_eq_bruteforce(column, t0).await?;
|
||||
if let Some(entry) = maybe_entry {
|
||||
self.mark_deleted_at(entry.file_position, enable_garbage_collector)
|
||||
.await?;
|
||||
self.mark_deleted_at(entry.file_position, enable_garbage_collector).await?;
|
||||
Ok(Some(entry))
|
||||
} else {
|
||||
Ok(maybe_entry)
|
||||
|
|
@ -262,23 +235,17 @@ impl<'cursor, T> WriteCursor<'cursor, T> {
|
|||
|
||||
// Doesn't update indexes.
|
||||
async fn find_all_eq_bruteforce_and_delete(&mut self, column: Column, t0: &T) -> Result<usize>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + Clone,
|
||||
where T: Encode + Decode + Ord + Send + Sync + Clone
|
||||
{
|
||||
let mut count = 0;
|
||||
while self
|
||||
.find_first_eq_bruteforce_and_delete(column, t0, false)
|
||||
.await?
|
||||
.is_some()
|
||||
{
|
||||
while let Some(_) = self.find_first_eq_bruteforce_and_delete(column, t0, false).await? {
|
||||
count += 1;
|
||||
}
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
pub async fn delete_all_entries(&mut self, enable_garbage_collector: bool) -> Result<usize>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + Clone,
|
||||
where T: Encode + Decode + Ord + Send + Sync + Clone
|
||||
{
|
||||
let mut count = 0;
|
||||
while let Some(entry) = self.next_alive().await? {
|
||||
|
|
@ -292,16 +259,11 @@ impl<'cursor, T> WriteCursor<'cursor, T> {
|
|||
Ok(count)
|
||||
}
|
||||
|
||||
pub async fn delete_entries_where_eq(
|
||||
&mut self,
|
||||
column: Column,
|
||||
value: &T,
|
||||
enable_garbage_collector: bool,
|
||||
) -> Result<usize>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + Clone,
|
||||
pub async fn delete_entries_where_eq(&mut self, column: Column, value: &T, enable_garbage_collector: bool) -> Result<usize>
|
||||
where T: Encode + Decode + Ord + Send + Sync + Clone
|
||||
{
|
||||
let count = if self.header().is_column_indexed(column) {
|
||||
let count =
|
||||
if self.header().is_column_indexed(column) {
|
||||
let entries = self.index_lookup(column, value).await?;
|
||||
let count = entries.len();
|
||||
for entry in entries {
|
||||
|
|
@ -309,9 +271,7 @@ impl<'cursor, T> WriteCursor<'cursor, T> {
|
|||
}
|
||||
count
|
||||
} else {
|
||||
let count = self
|
||||
.find_all_eq_bruteforce_and_delete(column, value)
|
||||
.await?;
|
||||
let count = self.find_all_eq_bruteforce_and_delete(column, value).await?;
|
||||
count
|
||||
};
|
||||
if enable_garbage_collector {
|
||||
|
|
@ -323,11 +283,10 @@ impl<'cursor, T> WriteCursor<'cursor, T> {
|
|||
// ===Indexing===
|
||||
// WARNING: Assumes the column is NOT indexable.
|
||||
pub async fn attach_index(&mut self, column: Column) -> Result<()>
|
||||
where
|
||||
T: Ord + Decode + Encode + Send + Sync,
|
||||
where T: Ord + Decode + Encode + Send + Sync
|
||||
{
|
||||
// New Index
|
||||
let index = Store::create_empty_index_at(self.header, column).await?;
|
||||
let index = Store::create_empty_index_at(&self.header, column).await?;
|
||||
self.indexes[column as usize] = Some(index);
|
||||
|
||||
// Mark column as indexed
|
||||
|
|
@ -345,8 +304,7 @@ impl<'cursor, T> WriteCursor<'cursor, T> {
|
|||
|
||||
// ===Garbage Collection===
|
||||
async fn attempt_garbage_collection_if_necessary(&mut self) -> Result<()>
|
||||
where
|
||||
T: Send + Sync + Decode + Encode + Clone + Ord,
|
||||
where T: Send + Sync + Decode + Encode + Clone + Ord
|
||||
{
|
||||
if self.header.deleted_count > GARBAGE_COLLECTION_TRIGGER {
|
||||
println!("=======START GARBAGE COLLECTOR====");
|
||||
|
|
@ -357,15 +315,13 @@ impl<'cursor, T> WriteCursor<'cursor, T> {
|
|||
}
|
||||
|
||||
pub async fn initiate_garbage_collection(&mut self) -> Result<usize>
|
||||
where
|
||||
T: Send + Sync + Decode + Encode + Clone + Ord,
|
||||
where T: Send + Sync + Decode + Encode + Clone + Ord
|
||||
{
|
||||
let mut cursor_to_intermediate = self.spawn_cursor_to_intermediate_file().await?;
|
||||
// Since garbage collection changes FilePositions of live entries, we need to update the
|
||||
// indexes too.
|
||||
|
||||
let mut in_memory_indexes: Vec<Option<BTreeMap<T, HashSet<FilePosition>>>> =
|
||||
Vec::with_capacity(self.header.number_of_columns);
|
||||
let mut in_memory_indexes: Vec<Option<BTreeMap<T, HashSet<FilePosition>>>> = Vec::with_capacity(self.header.number_of_columns);
|
||||
for column in 0..self.header.number_of_columns {
|
||||
if self.header.is_column_indexed(column as Column) {
|
||||
let in_memory_index = BTreeMap::new();
|
||||
|
|
@ -381,19 +337,12 @@ impl<'cursor, T> WriteCursor<'cursor, T> {
|
|||
{
|
||||
while let Some(live_entry) = self.next_alive().await? {
|
||||
entries_deleted += 1;
|
||||
let file_position = cursor_to_intermediate
|
||||
.append_entry_no_indexing(&live_entry.forget())
|
||||
.await?;
|
||||
let file_position = cursor_to_intermediate.append_entry_no_indexing(&live_entry.forget()).await?;
|
||||
|
||||
// Update index. (Wouldn't it be nice if we had `for let ...`?)
|
||||
for (maybe_in_memory_index, value) in
|
||||
in_memory_indexes.iter_mut().zip(&live_entry.data)
|
||||
{
|
||||
for (maybe_in_memory_index, value) in in_memory_indexes.iter_mut().zip(&live_entry.data) {
|
||||
if let Some(in_memory_index) = maybe_in_memory_index {
|
||||
in_memory_index
|
||||
.entry(value.clone())
|
||||
.or_insert_with(HashSet::new)
|
||||
.insert(file_position);
|
||||
in_memory_index.entry(value.clone()).or_insert_with(HashSet::new).insert(file_position);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -420,8 +369,7 @@ impl<'cursor, T> WriteCursor<'cursor, T> {
|
|||
// current file
|
||||
let path_to_table = Path::new(&self.header.table_folder);
|
||||
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
|
||||
let path_to_intermediate_rows =
|
||||
path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
|
||||
let path_to_intermediate_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
|
||||
fs::remove_file(path_to_rows.clone()).await?;
|
||||
fs::rename(path_to_intermediate_rows, path_to_rows).await?;
|
||||
|
||||
|
|
@ -429,15 +377,13 @@ impl<'cursor, T> WriteCursor<'cursor, T> {
|
|||
}
|
||||
|
||||
async fn spawn_cursor_to_intermediate_file(&self) -> Result<AppendOnlyCursor<T>>
|
||||
where
|
||||
T: Send,
|
||||
where T: Send
|
||||
{
|
||||
let table_folder = self.header.table_folder.clone();
|
||||
let path_to_table = Path::new(&table_folder);
|
||||
let path_to_rows = path_to_table.join(GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME);
|
||||
|
||||
let intermediate_file: File =
|
||||
Store::<T>::create_empty_rows_file(path_to_rows, self.header).await?;
|
||||
let intermediate_file: File = Store::<T>::create_empty_rows_file(path_to_rows, &self.header).await?;
|
||||
|
||||
let intermediate_header: StoreHeader = StoreHeader {
|
||||
table_folder,
|
||||
|
|
|
|||
|
|
@ -1,14 +1,14 @@
|
|||
use async_trait::async_trait;
|
||||
|
||||
use crate::binary_coding::encode;
|
||||
use bincode;
|
||||
use bincode::Encode;
|
||||
use crate::binary_coding::encode;
|
||||
|
||||
use crate::cursor_capabilities::primitive::CursorCanWrite;
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use crate::segments::entry::Entry;
|
||||
use crate::segments::store_header::StoreHeader;
|
||||
use crate::store::{FilePosition, Result};
|
||||
use crate::cursor_capabilities::primitive::{CursorCanRead, CursorCanWrite};
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorCanModifyEntries<T>: CursorCanTraverse<T> + CursorCanWrite<T> {
|
||||
|
|
@ -17,8 +17,7 @@ pub trait CursorCanModifyEntries<T>: CursorCanTraverse<T> + CursorCanWrite<T> {
|
|||
|
||||
// ===Store Header Manipulation===
|
||||
async fn increment_total_count(&mut self) -> Result<()>
|
||||
where
|
||||
T: Send,
|
||||
where T: Send
|
||||
{
|
||||
self.seek_to_start().await?;
|
||||
self.seek_to(StoreHeader::TOTAL_COUNT_OFFSET as u64).await?;
|
||||
|
|
@ -28,20 +27,17 @@ pub trait CursorCanModifyEntries<T>: CursorCanTraverse<T> + CursorCanWrite<T> {
|
|||
}
|
||||
|
||||
async fn increment_deleted_count(&mut self) -> Result<()>
|
||||
where
|
||||
T: Send,
|
||||
where T: Send
|
||||
{
|
||||
self.seek_to_start().await?;
|
||||
self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64)
|
||||
.await?;
|
||||
self.seek_to(StoreHeader::DELETED_COUNT_OFFSET as u64).await?;
|
||||
let new_count = self.header_mut().increment_deleted_count();
|
||||
self.write_bytes(&encode::<usize>(&new_count)?).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn set_header(&mut self, header: &StoreHeader) -> Result<()>
|
||||
where
|
||||
T: Send,
|
||||
where T: Send
|
||||
{
|
||||
self.seek_to_start().await?;
|
||||
let encoded_header: Vec<u8> = header.encode()?;
|
||||
|
|
@ -55,8 +51,7 @@ pub trait CursorCanModifyEntries<T>: CursorCanTraverse<T> + CursorCanWrite<T> {
|
|||
// Moves cursor to the end.
|
||||
// Returns file position to the start of the new entry.
|
||||
async fn append_entry_no_indexing(&mut self, entry: &Entry<T>) -> Result<FilePosition>
|
||||
where
|
||||
T: Encode + Send + Sync,
|
||||
where T: Encode + Send + Sync
|
||||
{
|
||||
self.increment_total_count().await?;
|
||||
|
||||
|
|
|
|||
|
|
@ -5,45 +5,44 @@ use async_trait::async_trait;
|
|||
use bincode;
|
||||
use bincode::{Decode, Encode};
|
||||
|
||||
use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries;
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use crate::error::Error;
|
||||
use crate::index::Index;
|
||||
use crate::segments::entry::{Entry, EntryDetailed};
|
||||
use crate::store::{Column, FilePosition, Result};
|
||||
use crate::store::{FilePosition, Column, Result};
|
||||
use crate::index::Index;
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use crate::cursor_capabilities::entry_modification::CursorCanModifyEntries;
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorCanReadIndex<T>: CursorCanTraverse<T> {
|
||||
fn indexes(&mut self) -> &[Option<Index<T, FilePosition>>];
|
||||
|
||||
async fn index_lookup(&mut self, column: Column, value: &T) -> Result<Vec<EntryDetailed<T>>>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync,
|
||||
where T: Encode + Decode + Ord + Send + Sync
|
||||
{
|
||||
match &self.indexes()[column as usize] {
|
||||
Some(index) => {
|
||||
let file_positions = index.lookup(value).await?.unwrap_or_else(HashSet::new);
|
||||
let file_positions = index.lookup(value).await?.unwrap_or_else(|| HashSet::new());
|
||||
let mut entries: Vec<EntryDetailed<T>> = vec![];
|
||||
for &file_position in file_positions.iter() {
|
||||
match self.read_entry_at(file_position).await? {
|
||||
Some(entry) => entries.push(entry),
|
||||
None => return Err(Error::IndexIsStoringEofFilePosition(column)),
|
||||
Some(entry) => {
|
||||
entries.push(entry)
|
||||
},
|
||||
None => {
|
||||
return Err(Error::IndexIsStoringEofFilePosition(column))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(entries)
|
||||
}
|
||||
None => Err(Error::AttemptToIndexNonIndexableColumn(column)),
|
||||
},
|
||||
None =>
|
||||
Err(Error::AttemptToIndexNonIndexableColumn(column))
|
||||
}
|
||||
}
|
||||
|
||||
async fn select_entries_where_eq(
|
||||
&mut self,
|
||||
column: Column,
|
||||
value: &T,
|
||||
) -> Result<Vec<EntryDetailed<T>>>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync,
|
||||
async fn select_entries_where_eq(&mut self, column: Column, value: &T) -> Result<Vec<EntryDetailed<T>>>
|
||||
where T: Encode + Decode + Ord + Send + Sync
|
||||
{
|
||||
if self.header().is_column_indexed(column) {
|
||||
self.index_lookup(column, value).await
|
||||
|
|
@ -60,7 +59,9 @@ pub trait CursorCanWriteToIndex<T>: CursorCanReadIndex<T> + CursorCanModifyEntri
|
|||
// Assumes that the column is indexable.
|
||||
fn mut_index_at(&mut self, column: Column) -> &mut Index<T, FilePosition> {
|
||||
match &mut self.indexes_mut()[column as usize] {
|
||||
Some(index) => index,
|
||||
Some(index) => {
|
||||
index
|
||||
},
|
||||
None => {
|
||||
unreachable!()
|
||||
}
|
||||
|
|
@ -68,14 +69,8 @@ pub trait CursorCanWriteToIndex<T>: CursorCanReadIndex<T> + CursorCanModifyEntri
|
|||
}
|
||||
|
||||
// Assumes that the column is indexable.
|
||||
async fn insert_into_index(
|
||||
&mut self,
|
||||
column: Column,
|
||||
value: T,
|
||||
file_position: FilePosition,
|
||||
) -> Result<()>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + 'async_trait,
|
||||
async fn insert_into_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()>
|
||||
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
||||
{
|
||||
let index = self.mut_index_at(column as Column);
|
||||
index.insert(value, file_position).await?;
|
||||
|
|
@ -83,14 +78,8 @@ pub trait CursorCanWriteToIndex<T>: CursorCanReadIndex<T> + CursorCanModifyEntri
|
|||
}
|
||||
|
||||
// Assumes that the column is indexable.
|
||||
async fn delete_from_index(
|
||||
&mut self,
|
||||
column: Column,
|
||||
value: T,
|
||||
file_position: FilePosition,
|
||||
) -> Result<()>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + 'async_trait,
|
||||
async fn delete_from_index(&mut self, column: Column, value: T, file_position: FilePosition) -> Result<()>
|
||||
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
||||
{
|
||||
let index = self.mut_index_at(column as Column);
|
||||
index.delete(value, file_position).await?;
|
||||
|
|
@ -98,22 +87,15 @@ pub trait CursorCanWriteToIndex<T>: CursorCanReadIndex<T> + CursorCanModifyEntri
|
|||
}
|
||||
|
||||
async fn insert_entry(&mut self, entry: Entry<T>) -> Result<FilePosition>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + 'async_trait,
|
||||
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
||||
{
|
||||
let file_position = self.append_entry_no_indexing(&entry).await?;
|
||||
|
||||
// insert the indexable values of the entry into corresponding indexes.
|
||||
for (column, (value, should_index)) in entry
|
||||
.data
|
||||
.into_iter()
|
||||
.zip(self.header().indexed_columns.clone())
|
||||
.enumerate()
|
||||
{
|
||||
for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() {
|
||||
if should_index {
|
||||
// SAFETY: If should_index is true, then the column is indexable.
|
||||
self.insert_into_index(column as Column, value, file_position)
|
||||
.await?
|
||||
self.insert_into_index(column as Column, value, file_position).await?
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -121,19 +103,12 @@ pub trait CursorCanWriteToIndex<T>: CursorCanReadIndex<T> + CursorCanModifyEntri
|
|||
}
|
||||
|
||||
async fn delete_entry_values_from_indexes(&mut self, entry: EntryDetailed<T>) -> Result<()>
|
||||
where
|
||||
T: Encode + Decode + Ord + Send + Sync + 'async_trait,
|
||||
{
|
||||
for (column, (value, should_index)) in entry
|
||||
.data
|
||||
.into_iter()
|
||||
.zip(self.header().indexed_columns.clone())
|
||||
.enumerate()
|
||||
where T: Encode + Decode + Ord + Send + Sync + 'async_trait
|
||||
{
|
||||
for (column, (value, should_index)) in entry.data.into_iter().zip(self.header().indexed_columns.clone()).enumerate() {
|
||||
if should_index {
|
||||
// SAFETY: If should_index is true, then the column is indexable.
|
||||
self.delete_from_index(column as Column, value, entry.file_position)
|
||||
.await?
|
||||
self.delete_from_index(column as Column, value, entry.file_position).await?
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
pub mod entry_modification;
|
||||
pub mod index_access;
|
||||
pub(crate) mod primitive;
|
||||
pub mod traversal;
|
||||
pub mod entry_modification;
|
||||
pub mod index_access;
|
||||
|
|
|
|||
|
|
@ -1,11 +1,11 @@
|
|||
use async_trait::async_trait;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, SeekFrom};
|
||||
use tokio::fs::File;
|
||||
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::store::{FilePosition, Result};
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorCanRead<T> {
|
||||
pub(crate) trait CursorCanRead<T> {
|
||||
fn file(&mut self) -> &mut File;
|
||||
fn eof_file_position(&self) -> FilePosition;
|
||||
|
||||
|
|
@ -55,7 +55,7 @@ pub trait CursorCanRead<T> {
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorCanWrite<T>: CursorCanRead<T> {
|
||||
pub(crate) trait CursorCanWrite<T>: CursorCanRead<T> {
|
||||
async fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize> {
|
||||
Ok(self.file().write(bytes).await?)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,24 +1,24 @@
|
|||
use async_trait::async_trait;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::binary_coding::decode;
|
||||
use bincode;
|
||||
use bincode::Decode;
|
||||
use crate::binary_coding::decode;
|
||||
|
||||
use crate::cursor_capabilities::primitive::CursorCanRead;
|
||||
use crate::error::{DecodeErrorKind, Error};
|
||||
use crate::error::{Error, DecodeErrorKind};
|
||||
use crate::segments::entry::EntryDetailed;
|
||||
use crate::segments::entry_header::EntryHeaderWithDataSize;
|
||||
use crate::segments::store_header::StoreHeader;
|
||||
use crate::store::{Column, FilePosition, Result};
|
||||
use crate::store::{FilePosition, Column, Result};
|
||||
use crate::cursor_capabilities::primitive::CursorCanRead;
|
||||
|
||||
|
||||
#[async_trait]
|
||||
pub trait CursorCanTraverse<T>: CursorCanRead<T> {
|
||||
fn header(&self) -> &StoreHeader;
|
||||
|
||||
async fn seek_to_start_of_data(&mut self) -> Result<FilePosition> {
|
||||
self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64)
|
||||
.await
|
||||
self.seek_to(StoreHeader::size(self.header().number_of_columns) as u64).await
|
||||
}
|
||||
|
||||
async fn read_entry_header(&mut self) -> Result<EntryHeaderWithDataSize> {
|
||||
|
|
@ -30,21 +30,14 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
|
|||
Ok(header)
|
||||
}
|
||||
|
||||
async fn read_entry_header_at(
|
||||
&mut self,
|
||||
file_position: FilePosition,
|
||||
) -> Result<EntryHeaderWithDataSize> {
|
||||
async fn read_entry_header_at(&mut self, file_position: FilePosition) -> Result<EntryHeaderWithDataSize> {
|
||||
self.seek_to(file_position).await?;
|
||||
self.read_entry_header().await
|
||||
}
|
||||
|
||||
// Returns None when file_position == eof_file_position
|
||||
async fn read_entry_at(
|
||||
&mut self,
|
||||
file_position: FilePosition,
|
||||
) -> Result<Option<EntryDetailed<T>>>
|
||||
where
|
||||
T: Decode,
|
||||
async fn read_entry_at(&mut self, file_position: FilePosition) -> Result<Option<EntryDetailed<T>>>
|
||||
where T: Decode
|
||||
{
|
||||
self.seek_to(file_position).await?;
|
||||
self.next().await
|
||||
|
|
@ -53,11 +46,12 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
|
|||
// ===Iteration===
|
||||
// The following functions assume that the current file position is at a valid entry or EOF.
|
||||
|
||||
|
||||
// WARNING: This moves the file_position to start of the data, so you can't just call
|
||||
// next_entry_header() a bunch of times. You must move the cursor!
|
||||
async fn next_entry_header(&mut self) -> Result<Option<EntryHeaderWithDataSize>> {
|
||||
if self.is_at_eof().await? {
|
||||
return Ok(None);
|
||||
return Ok(None)
|
||||
}
|
||||
|
||||
let entry_header = self.read_entry_header().await?;
|
||||
|
|
@ -66,47 +60,31 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
|
|||
}
|
||||
|
||||
// This is meant to be used after next_entry_header() is called.
|
||||
async fn jump_from_start_of_entry_data_to_next_entry(
|
||||
&mut self,
|
||||
entry_header: &EntryHeaderWithDataSize,
|
||||
) -> Result<FilePosition> {
|
||||
async fn jump_from_start_of_entry_data_to_next_entry(&mut self, entry_header: &EntryHeaderWithDataSize) -> Result<FilePosition>{
|
||||
let file_position = self.seek_by(entry_header.size_of_data() as i64).await?;
|
||||
Ok(file_position)
|
||||
}
|
||||
|
||||
async fn next(&mut self) -> Result<Option<EntryDetailed<T>>>
|
||||
where
|
||||
T: Decode,
|
||||
where T: Decode
|
||||
{
|
||||
let file_position = self.current_file_position().await?;
|
||||
let Some(entry_header) = self.next_entry_header().await? else {
|
||||
return Ok(None);
|
||||
};
|
||||
let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
|
||||
|
||||
let mut data_bytes: Vec<u8> = vec![0; entry_header.size_of_data()];
|
||||
self.read_bytes(&mut data_bytes).await?;
|
||||
let entry: EntryDetailed<T> = EntryDetailed::decode(
|
||||
entry_header,
|
||||
file_position,
|
||||
self.header().number_of_columns,
|
||||
&data_bytes,
|
||||
)?;
|
||||
let entry: EntryDetailed<T> =
|
||||
EntryDetailed::decode(entry_header, file_position, self.header().number_of_columns, &mut data_bytes)?;
|
||||
|
||||
Ok(Some(entry))
|
||||
}
|
||||
|
||||
// Like next, but only reads the column, not the whole entry.
|
||||
async fn next_at_column(
|
||||
&mut self,
|
||||
column: Column,
|
||||
) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
|
||||
where
|
||||
T: Decode + Send,
|
||||
async fn next_at_column(&mut self, column: Column) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
|
||||
where T: Decode + Send
|
||||
{
|
||||
let file_position = self.current_file_position().await?;
|
||||
let Some(entry_header) = self.next_entry_header().await? else {
|
||||
return Ok(None);
|
||||
};
|
||||
let Some(entry_header) = self.next_entry_header().await? else { return Ok(None) };
|
||||
let file_position_at_start_of_data = self.current_file_position().await?;
|
||||
|
||||
// figuring out how much to decode
|
||||
|
|
@ -116,59 +94,49 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
|
|||
// reading and decoding
|
||||
let mut bytes: Vec<u8> = vec![0; entry_header.data_sizes[column as usize]];
|
||||
self.read_bytes(&mut bytes).await?;
|
||||
let (value, _) = decode::<T>(&bytes[..])
|
||||
let (value, _) =
|
||||
decode::<T>(&bytes[..])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
|
||||
|
||||
// jumping to next entry
|
||||
self.seek_to(file_position_at_start_of_data).await?;
|
||||
self.jump_from_start_of_entry_data_to_next_entry(&entry_header)
|
||||
.await?;
|
||||
self.jump_from_start_of_entry_data_to_next_entry(&entry_header).await?;
|
||||
|
||||
Ok(Some((entry_header, file_position, value)))
|
||||
}
|
||||
|
||||
async fn next_alive_at_column(
|
||||
&mut self,
|
||||
column: Column,
|
||||
) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
|
||||
where
|
||||
T: Decode + Send,
|
||||
async fn next_alive_at_column(&mut self, column: Column) -> Result<Option<(EntryHeaderWithDataSize, FilePosition, T)>>
|
||||
where T: Decode + Send
|
||||
{
|
||||
while let Some((header, file_position, t)) = self.next_at_column(column).await? {
|
||||
if !header.is_deleted {
|
||||
return Ok(Some((header, file_position, t)));
|
||||
return Ok(Some((header, file_position, t)))
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn next_alive(&mut self) -> Result<Option<EntryDetailed<T>>>
|
||||
where
|
||||
T: Decode,
|
||||
where T: Decode
|
||||
{
|
||||
while let Some(entry) = self.next().await? {
|
||||
if !entry.header.is_deleted {
|
||||
return Ok(Some(entry));
|
||||
return Ok(Some(entry))
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
// ===Search===
|
||||
async fn find_first_eq_bruteforce(
|
||||
&mut self,
|
||||
column: Column,
|
||||
t0: &T,
|
||||
) -> Result<Option<EntryDetailed<T>>>
|
||||
where
|
||||
T: Decode + PartialEq + Send + Sync,
|
||||
async fn find_first_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result<Option<EntryDetailed<T>>>
|
||||
where T: Decode + PartialEq + Send + Sync
|
||||
{
|
||||
let mut file_position = self.current_file_position().await?;
|
||||
while let Some((_, _, t)) = self.next_alive_at_column(column).await? {
|
||||
if &t == t0 {
|
||||
// go back and decode the whole entry
|
||||
self.seek_to(file_position).await?;
|
||||
return self.next().await;
|
||||
return self.next().await
|
||||
} else {
|
||||
file_position = self.current_file_position().await?;
|
||||
}
|
||||
|
|
@ -176,13 +144,8 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
|
|||
Ok(None)
|
||||
}
|
||||
|
||||
async fn find_all_eq_bruteforce(
|
||||
&mut self,
|
||||
column: Column,
|
||||
t0: &T,
|
||||
) -> Result<Vec<EntryDetailed<T>>>
|
||||
where
|
||||
T: Decode + PartialEq + Send + Sync,
|
||||
async fn find_all_eq_bruteforce(&mut self, column: Column, t0: &T) -> Result<Vec<EntryDetailed<T>>>
|
||||
where T: Decode + PartialEq + Send + Sync
|
||||
{
|
||||
let mut entries = vec![];
|
||||
while let Some(entry) = self.find_first_eq_bruteforce(column, t0).await? {
|
||||
|
|
@ -193,8 +156,7 @@ pub trait CursorCanTraverse<T>: CursorCanRead<T> {
|
|||
|
||||
// ===Debugging===
|
||||
async fn read_entries(&mut self) -> Result<()>
|
||||
where
|
||||
T: Decode + std::fmt::Debug,
|
||||
where T: Decode + std::fmt::Debug
|
||||
{
|
||||
self.seek_to_start_of_data().await?;
|
||||
while let Some(entry) = self.next().await? {
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ use std::hash::Hash;
|
|||
use std::io::SeekFrom;
|
||||
|
||||
use crate::binary_coding::{decode, encode};
|
||||
use bincode;
|
||||
use bincode::{Decode, Encode};
|
||||
|
||||
use crate::error::{DecodeErrorKind, Error};
|
||||
|
|
@ -62,7 +63,7 @@ where
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn insert_desynced(&mut self, k: K, v: V) {
|
||||
pub fn insert_desynced(&mut self, k: K, v: V) -> () {
|
||||
self.data.entry(k).or_insert_with(HashSet::new).insert(v);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
pub mod store;
|
||||
mod binary_coding;
|
||||
pub mod cursor;
|
||||
pub mod cursor_capabilities;
|
||||
pub mod error;
|
||||
mod index;
|
||||
pub mod cursor;
|
||||
pub mod segments;
|
||||
pub mod store;
|
||||
pub mod cursor_capabilities;
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
use bincode::{Decode, Encode};
|
||||
|
||||
use crate::binary_coding::{decode_sequence, encode_sequence, encode_sequence_with_sizes};
|
||||
use crate::error::{DecodeErrorKind, Error};
|
||||
use crate::binary_coding::{encode_sequence, encode_sequence_with_sizes, decode_sequence};
|
||||
use crate::store::{Result, FilePosition};
|
||||
use crate::error::{Error, DecodeErrorKind};
|
||||
use crate::segments::entry_header::{EntryHeader, EntryHeaderWithDataSize};
|
||||
use crate::store::{FilePosition, Result};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Entry<T> {
|
||||
|
|
@ -20,16 +20,12 @@ pub struct EntryDetailed<T> {
|
|||
|
||||
impl <T>Entry<T> {
|
||||
pub fn new(data: Vec<T>) -> Self {
|
||||
Self {
|
||||
header: EntryHeader { is_deleted: false },
|
||||
data,
|
||||
}
|
||||
Self { header: EntryHeader { is_deleted: false }, data }
|
||||
}
|
||||
|
||||
// FORMAT: [EntryHeaderWithDataSize, ..sequence of data]
|
||||
pub fn encode(&self) -> Result<Vec<u8>>
|
||||
where
|
||||
T: Encode,
|
||||
where T: Encode
|
||||
{
|
||||
let mut result: Vec<u8> = self.header.encode()?;
|
||||
|
||||
|
|
@ -41,27 +37,16 @@ impl<T> Entry<T> {
|
|||
}
|
||||
|
||||
impl <T>EntryDetailed<T> {
|
||||
pub fn decode(
|
||||
header: EntryHeaderWithDataSize,
|
||||
file_position: FilePosition,
|
||||
number_of_columns: usize,
|
||||
bytes: &[u8],
|
||||
) -> Result<Self>
|
||||
where
|
||||
T: Decode,
|
||||
pub fn decode(header: EntryHeaderWithDataSize, file_position: FilePosition, number_of_columns: usize, bytes: &[u8]) -> Result<Self>
|
||||
where T: Decode
|
||||
{
|
||||
let data = decode_sequence::<T>(number_of_columns, bytes)
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryData, e))?;
|
||||
Ok(EntryDetailed {
|
||||
header,
|
||||
file_position,
|
||||
data,
|
||||
})
|
||||
Ok(EntryDetailed { header, file_position, data })
|
||||
}
|
||||
|
||||
pub fn forget(&self) -> Entry<T>
|
||||
where
|
||||
T: Clone,
|
||||
where T: Clone
|
||||
{
|
||||
Entry {
|
||||
header: self.header.clone().into(),
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use crate::binary_coding::{decode, decode_sequence, encode};
|
||||
use crate::error::{DecodeErrorKind, Error};
|
||||
use crate::store::{Column, Result};
|
||||
use crate::binary_coding::{decode, encode, decode_sequence};
|
||||
use crate::store::{Result, Column};
|
||||
use crate::error::{Error, DecodeErrorKind};
|
||||
use std::mem::size_of;
|
||||
|
||||
#[derive(Debug)]
|
||||
|
|
@ -24,9 +24,7 @@ impl EntryHeader {
|
|||
|
||||
impl From<EntryHeaderWithDataSize> for EntryHeader {
|
||||
fn from(entry: EntryHeaderWithDataSize) -> Self {
|
||||
Self {
|
||||
is_deleted: entry.is_deleted,
|
||||
}
|
||||
Self { is_deleted: entry.is_deleted, }
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -50,23 +48,21 @@ impl EntryHeaderWithDataSize {
|
|||
if i < column as usize {
|
||||
sum += size;
|
||||
} else {
|
||||
break;
|
||||
break
|
||||
}
|
||||
}
|
||||
sum
|
||||
}
|
||||
|
||||
pub fn decode(bytes: &mut [u8], number_of_columns: usize) -> Result<Self> {
|
||||
let (is_deleted, _) = decode::<bool>(bytes)
|
||||
let (is_deleted, _) =
|
||||
decode::<bool>(&bytes)
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryIsDeleted, e))?;
|
||||
|
||||
let data_sizes =
|
||||
decode_sequence::<usize>(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..])
|
||||
let data_sizes = decode_sequence::<usize>(number_of_columns, &bytes[Self::DATA_SIZES_OFFSET..])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::EntryHeaderWithDataSizes, e))?;
|
||||
|
||||
Ok(Self {
|
||||
is_deleted,
|
||||
data_sizes,
|
||||
})
|
||||
Ok(Self { is_deleted, data_sizes } )
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use crate::binary_coding::{decode, decode_sequence, encode, encode_sequence};
|
||||
use crate::error::{DecodeErrorKind, Error};
|
||||
use crate::store::{Column, Result};
|
||||
use crate::binary_coding::{encode, encode_sequence, decode, decode_sequence};
|
||||
use crate::store::{Result, Column};
|
||||
use crate::error::{Error, DecodeErrorKind};
|
||||
use std::mem::size_of;
|
||||
use std::path::PathBuf;
|
||||
|
||||
|
|
@ -30,19 +30,14 @@ impl StoreHeader {
|
|||
pub const DELETED_COUNT_SIZE: usize = size_of::<usize>();
|
||||
pub const TOTAL_COUNT_SIZE: usize = size_of::<usize>();
|
||||
pub const PRIMARY_COLUMN_SIZE: usize = size_of::<Column>();
|
||||
pub const FIXED_SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE
|
||||
+ Self::DELETED_COUNT_SIZE
|
||||
+ Self::TOTAL_COUNT_SIZE
|
||||
+ Self::PRIMARY_COLUMN_SIZE;
|
||||
pub const FIXED_SIZE: usize = Self::NUMBER_OF_COLUMNS_SIZE + Self::DELETED_COUNT_SIZE + Self::TOTAL_COUNT_SIZE + Self::PRIMARY_COLUMN_SIZE;
|
||||
|
||||
pub const NUMBER_OF_COLUMNS_OFFSET: usize = 0;
|
||||
pub const DELETED_COUNT_OFFSET: usize =
|
||||
Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE;
|
||||
pub const DELETED_COUNT_OFFSET: usize = Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE;
|
||||
pub const TOTAL_COUNT_OFFSET: usize = Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE;
|
||||
pub const PRIMARY_COLUMN_OFFSET: usize = Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE;
|
||||
#[allow(dead_code)]
|
||||
pub const INDEXED_COLUMNS_OFFSET: usize =
|
||||
Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE;
|
||||
pub const INDEXED_COLUMNS_OFFSET: usize = Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE;
|
||||
|
||||
fn indexed_columns_size(number_of_columns: usize) -> usize {
|
||||
size_of::<bool>() * number_of_columns
|
||||
|
|
@ -69,28 +64,18 @@ impl StoreHeader {
|
|||
vec![0; Self::indexed_columns_size(header.number_of_columns)]
|
||||
}
|
||||
|
||||
pub async fn decode_fixed(
|
||||
table_folder: &PathBuf,
|
||||
result: &[u8],
|
||||
) -> Result<StoreHeaderFixedPart> {
|
||||
let (number_of_columns, _) = decode::<usize>(
|
||||
&result[Self::NUMBER_OF_COLUMNS_OFFSET
|
||||
..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE],
|
||||
)
|
||||
pub async fn decode_fixed(table_folder: &PathBuf, result: &[u8]) -> Result<StoreHeaderFixedPart> {
|
||||
let (number_of_columns, _) =
|
||||
decode::<usize>(&result[Self::NUMBER_OF_COLUMNS_OFFSET..Self::NUMBER_OF_COLUMNS_OFFSET + Self::NUMBER_OF_COLUMNS_SIZE])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderNumberOfColumns, e))?;
|
||||
let (deleted_count, _) = decode::<usize>(
|
||||
&result
|
||||
[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE],
|
||||
)
|
||||
let (deleted_count, _) =
|
||||
decode::<usize>(&result[Self::DELETED_COUNT_OFFSET..Self::DELETED_COUNT_OFFSET + Self::DELETED_COUNT_SIZE])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderDeletedCount, e))?;
|
||||
let (total_count, _) = decode::<usize>(
|
||||
&result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE],
|
||||
)
|
||||
let (total_count, _) =
|
||||
decode::<usize>(&result[Self::TOTAL_COUNT_OFFSET..Self::TOTAL_COUNT_OFFSET + Self::TOTAL_COUNT_SIZE])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderTotalCount, e))?;
|
||||
let (primary_column, _) = decode::<Column>(
|
||||
&result[Self::PRIMARY_COLUMN_OFFSET
|
||||
..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE],
|
||||
)
|
||||
let (primary_column, _) =
|
||||
decode::<Column>(&result[Self::PRIMARY_COLUMN_OFFSET..Self::PRIMARY_COLUMN_OFFSET + Self::PRIMARY_COLUMN_SIZE])
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderPrimaryColumn, e))?;
|
||||
let header = StoreHeaderFixedPart {
|
||||
table_folder: table_folder.clone(),
|
||||
|
|
@ -104,11 +89,12 @@ impl StoreHeader {
|
|||
}
|
||||
|
||||
pub async fn decode_rest(header: StoreHeaderFixedPart, result: &[u8]) -> Result<StoreHeader> {
|
||||
let indexed_columns: Vec<bool> = decode_sequence::<bool>(header.number_of_columns, result)
|
||||
let indexed_columns: Vec<bool> =
|
||||
decode_sequence::<bool>(header.number_of_columns, result)
|
||||
.map_err(|e| Error::DecodeError(DecodeErrorKind::StoreHeaderIndexedColumns, e))?;
|
||||
|
||||
Ok(StoreHeader {
|
||||
table_folder: header.table_folder,
|
||||
table_folder: header.table_folder.into(),
|
||||
number_of_columns: header.number_of_columns,
|
||||
deleted_count: header.deleted_count,
|
||||
total_count: header.total_count,
|
||||
|
|
@ -118,6 +104,8 @@ impl StoreHeader {
|
|||
})
|
||||
}
|
||||
|
||||
|
||||
|
||||
// returns new count
|
||||
pub fn increment_total_count(&mut self) -> usize {
|
||||
self.total_count += 1;
|
||||
|
|
|
|||
|
|
@ -1,20 +1,22 @@
|
|||
use bincode::{Decode, Encode};
|
||||
use std::path::{Path, PathBuf};
|
||||
use tokio::fs;
|
||||
use tokio::fs::{DirBuilder, File, OpenOptions};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::fs::{File, OpenOptions, DirBuilder};
|
||||
use tokio::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
use bincode::{Decode, Encode};
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::cursor::{ReadCursor, WriteCursor};
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use crate::error::Error;
|
||||
use crate::index::Index;
|
||||
use crate::segments::store_header::StoreHeader;
|
||||
use crate::index::Index;
|
||||
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
pub type Column = u64;
|
||||
pub type FilePosition = u64;
|
||||
|
||||
// TODO: Consider adding another type parameter for indexable values
|
||||
#[derive(Debug)]
|
||||
pub struct Store<T> {
|
||||
pub header: StoreHeader,
|
||||
|
|
@ -23,26 +25,23 @@ pub struct Store<T> {
|
|||
|
||||
pub type StoreIndexes<T> = Vec<Option<Index<T, FilePosition>>>;
|
||||
|
||||
|
||||
//===Store===
|
||||
pub async fn store_exists(table_folder: &str) -> Result<bool> {
|
||||
Ok(fs::metadata(table_folder).await.is_ok())
|
||||
}
|
||||
|
||||
pub const ROWS_FILE_NAME: &str = "rows";
|
||||
pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &str = "rows_intermediate";
|
||||
pub const ROWS_FILE_NAME: &'static str = "rows";
|
||||
pub const GARBAGE_COLLECTION_INTERMEDIATE_ROWS_FILE_NAME: &'static str = "rows_intermediate";
|
||||
|
||||
impl <T>Store<T> {
|
||||
// ===Creation===
|
||||
pub async fn new(
|
||||
path_to_table: &Path,
|
||||
number_of_columns: usize,
|
||||
primary_column: Column,
|
||||
) -> Result<Self>
|
||||
where
|
||||
T: Encode + Decode + Ord,
|
||||
pub async fn new(path_to_table: &Path, number_of_columns: usize, primary_column: Column) -> Result<Self>
|
||||
where T: Encode + Decode + Ord
|
||||
{
|
||||
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
|
||||
DirBuilder::new().create(path_to_table).await?;
|
||||
DirBuilder::new()
|
||||
.create(path_to_table).await?;
|
||||
|
||||
let header = {
|
||||
let mut indexed_columns = vec![false; number_of_columns];
|
||||
|
|
@ -62,63 +61,53 @@ impl<T> Store<T> {
|
|||
|
||||
let indexes: StoreIndexes<T> = Self::create_initial_indexes(&header).await?;
|
||||
|
||||
let store = Self { header, indexes };
|
||||
let store = Self {
|
||||
header,
|
||||
indexes,
|
||||
};
|
||||
|
||||
Ok(store)
|
||||
}
|
||||
|
||||
pub fn path_to_index_file(header: &StoreHeader, column: Column) -> PathBuf {
|
||||
let path_to_table = Path::new(&header.table_folder);
|
||||
let path_to_index =
|
||||
path_to_table.join(&format!("{}_{}", ROWS_FILE_NAME, column));
|
||||
let path_to_index = path_to_table.join(&format!("{}_{}", ROWS_FILE_NAME, column.to_string()));
|
||||
path_to_index
|
||||
}
|
||||
|
||||
pub async fn create_empty_index_at(
|
||||
header: &StoreHeader,
|
||||
column: Column,
|
||||
) -> Result<Index<T, FilePosition>>
|
||||
where
|
||||
T: Encode + Decode + Ord,
|
||||
pub async fn create_empty_index_at(header: &StoreHeader, column: Column) -> Result<Index<T, FilePosition>>
|
||||
where T: Encode + Decode + Ord
|
||||
{
|
||||
let path_to_index = Self::path_to_index_file(header, column);
|
||||
let path_to_index = Self::path_to_index_file(&header, column);
|
||||
let index = Index::new(path_to_index).await?;
|
||||
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
pub async fn create_initial_indexes(header: &StoreHeader) -> Result<StoreIndexes<T>>
|
||||
where
|
||||
T: Encode + Decode + Ord,
|
||||
where T: Encode + Decode + Ord
|
||||
{
|
||||
let mut result: StoreIndexes<T> = Vec::with_capacity(header.number_of_columns);
|
||||
for _ in 0..header.number_of_columns {
|
||||
result.push(None)
|
||||
}
|
||||
|
||||
result[header.primary_column as usize] =
|
||||
Some(Self::create_empty_index_at(header, header.primary_column).await?);
|
||||
result[header.primary_column as usize] = Some(Self::create_empty_index_at(&header, header.primary_column).await?);
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn connect_index_at(
|
||||
header: &StoreHeader,
|
||||
column: Column,
|
||||
) -> Result<Index<T, FilePosition>>
|
||||
where
|
||||
T: Encode + Decode + Ord,
|
||||
pub async fn connect_index_at(header: &StoreHeader, column: Column) -> Result<Index<T, FilePosition>>
|
||||
where T: Encode + Decode + Ord
|
||||
{
|
||||
let path_to_index = Self::path_to_index_file(header, column);
|
||||
let path_to_index = Self::path_to_index_file(&header, column);
|
||||
let index: Index<T, FilePosition> = Index::connect(path_to_index).await?;
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
pub async fn create_empty_rows_file(
|
||||
path_to_rows: PathBuf,
|
||||
header: &StoreHeader,
|
||||
) -> Result<File> {
|
||||
let mut file: File = OpenOptions::new()
|
||||
pub async fn create_empty_rows_file(path_to_rows: PathBuf, header: &StoreHeader) -> Result<File> {
|
||||
let mut file: File =
|
||||
OpenOptions::new()
|
||||
.write(true)
|
||||
.read(true)
|
||||
.create_new(true)
|
||||
|
|
@ -132,13 +121,13 @@ impl<T> Store<T> {
|
|||
}
|
||||
|
||||
pub async fn connect(table_folder: &PathBuf) -> Result<Self>
|
||||
where
|
||||
T: std::fmt::Debug + Encode + Decode + Ord,
|
||||
where T: std::fmt::Debug + Encode + Decode + Ord
|
||||
{
|
||||
let path_to_table = Path::new(table_folder);
|
||||
let path_to_rows = path_to_table.join(ROWS_FILE_NAME);
|
||||
|
||||
let mut file: File = OpenOptions::new()
|
||||
let mut file: File =
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(path_to_rows)
|
||||
|
|
@ -157,13 +146,12 @@ impl<T> Store<T> {
|
|||
StoreHeader::decode_rest(fixed_header, &rest_bytes).await?
|
||||
};
|
||||
|
||||
|
||||
let indexes: StoreIndexes<T> = {
|
||||
let mut result = Vec::with_capacity(header.number_of_columns);
|
||||
for (column, &is_indexed) in header.indexed_columns.iter().enumerate() {
|
||||
if is_indexed {
|
||||
result.push(Some(
|
||||
Self::connect_index_at(&header, column as Column).await?,
|
||||
))
|
||||
result.push(Some(Self::connect_index_at(&header, column as Column).await?))
|
||||
} else {
|
||||
result.push(None)
|
||||
}
|
||||
|
|
@ -172,29 +160,29 @@ impl<T> Store<T> {
|
|||
result
|
||||
};
|
||||
|
||||
let store = Self { header, indexes };
|
||||
let store = Self {
|
||||
header,
|
||||
indexes
|
||||
};
|
||||
Ok(store)
|
||||
}
|
||||
|
||||
// ===Cursors===
|
||||
pub async fn read_cursor(&self) -> Result<ReadCursor<T>>
|
||||
where
|
||||
T: Send + Sync,
|
||||
where T: Send + Sync
|
||||
{
|
||||
ReadCursor::new(self).await
|
||||
}
|
||||
|
||||
pub async fn write_cursor(&mut self) -> Result<WriteCursor<T>>
|
||||
where
|
||||
T: Send + Sync,
|
||||
where T: Send + Sync
|
||||
{
|
||||
WriteCursor::new(self).await
|
||||
}
|
||||
|
||||
// ===Indexes===
|
||||
pub async fn attach_index(&mut self, column: Column) -> Result<()>
|
||||
where
|
||||
T: Ord + Decode + Encode + Send + Sync,
|
||||
where T: Ord + Decode + Encode + Send + Sync
|
||||
{
|
||||
if self.header.is_column_indexed(column) {
|
||||
Err(Error::ColumnAlreadyIndexed(column))
|
||||
|
|
@ -207,8 +195,7 @@ impl<T> Store<T> {
|
|||
// For debugging.
|
||||
#[allow(dead_code)]
|
||||
pub async fn read_all_bytes(&mut self) -> std::result::Result<Vec<u8>, std::io::Error>
|
||||
where
|
||||
T: Send + Sync,
|
||||
where T: Send + Sync
|
||||
{
|
||||
let mut cursor = self.read_cursor().await.map_err(|e| e.to_io_or_panic())?;
|
||||
let bytes = cursor.read_all_bytes().await?;
|
||||
|
|
@ -219,9 +206,9 @@ impl<T> Store<T> {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::cursor_capabilities::index_access::{CursorCanReadIndex, CursorCanWriteToIndex};
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
use crate::segments::entry::Entry;
|
||||
use crate::cursor_capabilities::index_access::{CursorCanWriteToIndex, CursorCanReadIndex};
|
||||
use crate::cursor_capabilities::traversal::CursorCanTraverse;
|
||||
|
||||
impl <T>Drop for Store<T> {
|
||||
fn drop(&mut self) {
|
||||
|
|
@ -232,6 +219,7 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create() {
|
||||
type Data = u32;
|
||||
|
|
@ -239,9 +227,7 @@ mod tests {
|
|||
let table_path = Path::new("test_table_0");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
let store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
assert!(store.header.number_of_columns == number_of_columns);
|
||||
assert!(store.header.total_count == 0);
|
||||
|
|
@ -256,9 +242,7 @@ mod tests {
|
|||
let table_path = Path::new("test_table_1");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
{
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
|
|
@ -280,9 +264,7 @@ mod tests {
|
|||
let table_path = Path::new("test_table_2");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
{
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
|
|
@ -314,9 +296,7 @@ mod tests {
|
|||
let table_path = Path::new("test_table_3");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
{
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
|
|
@ -351,9 +331,7 @@ mod tests {
|
|||
let table_path = Path::new("test_table_4");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
let value = 200;
|
||||
{
|
||||
|
|
@ -378,10 +356,7 @@ mod tests {
|
|||
let mut cursor = store.read_cursor().await.unwrap();
|
||||
let column = 1;
|
||||
|
||||
let entries = cursor
|
||||
.select_entries_where_eq(column, &value)
|
||||
.await
|
||||
.unwrap();
|
||||
let entries = cursor.select_entries_where_eq(column, &value).await.unwrap();
|
||||
|
||||
assert!(entries.len() == 2);
|
||||
assert!(entries[0].data == vec![1, value, 3, 4, 5]);
|
||||
|
|
@ -396,9 +371,7 @@ mod tests {
|
|||
let table_path = Path::new("test_table_5");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
let column: Column = 1;
|
||||
|
||||
|
|
@ -429,10 +402,7 @@ mod tests {
|
|||
let mut cursor = store.read_cursor().await.unwrap();
|
||||
let column = 1;
|
||||
|
||||
let entries = cursor
|
||||
.select_entries_where_eq(column, &value)
|
||||
.await
|
||||
.unwrap();
|
||||
let entries = cursor.select_entries_where_eq(column, &value).await.unwrap();
|
||||
assert!(entries.len() == 2);
|
||||
// Order may be non-deterministic.
|
||||
assert!(entries[0].data[column as usize] == value);
|
||||
|
|
@ -447,9 +417,7 @@ mod tests {
|
|||
let table_path = Path::new("test_table_6");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
let value = 200;
|
||||
let (_file_position0, file_position1, _file_position2, _file_position3) = {
|
||||
|
|
@ -468,12 +436,7 @@ mod tests {
|
|||
let file_position3 = cursor.insert_entry(entry3).await.unwrap();
|
||||
|
||||
assert!(store.header.total_count == 4);
|
||||
(
|
||||
file_position0,
|
||||
file_position1,
|
||||
file_position2,
|
||||
file_position3,
|
||||
)
|
||||
(file_position0, file_position1, file_position2, file_position3)
|
||||
};
|
||||
|
||||
{
|
||||
|
|
@ -491,9 +454,7 @@ mod tests {
|
|||
let table_path = Path::new("test_table_7");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
let column: Column = 1;
|
||||
|
||||
|
|
@ -519,21 +480,13 @@ mod tests {
|
|||
let file_position3 = cursor.insert_entry(entry3).await.unwrap();
|
||||
|
||||
assert!(store.header.total_count == 4);
|
||||
(
|
||||
file_position0,
|
||||
file_position1,
|
||||
file_position2,
|
||||
file_position3,
|
||||
)
|
||||
(file_position0, file_position1, file_position2, file_position3)
|
||||
};
|
||||
|
||||
{
|
||||
assert!(store.header.deleted_count == 0);
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
cursor
|
||||
.delete_entries_where_eq(column, &value, false)
|
||||
.await
|
||||
.unwrap();
|
||||
cursor.delete_entries_where_eq(column, &value, false).await.unwrap();
|
||||
assert!(store.header.deleted_count == 2);
|
||||
}
|
||||
}
|
||||
|
|
@ -545,9 +498,7 @@ mod tests {
|
|||
let table_path = Path::new("test_table_8");
|
||||
let number_of_columns = 5;
|
||||
let primary_column = 0;
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut store: Store<Data> = Store::new(table_path, number_of_columns, primary_column).await.unwrap();
|
||||
|
||||
let column: Column = 1;
|
||||
|
||||
|
|
@ -573,21 +524,13 @@ mod tests {
|
|||
let file_position3 = cursor.insert_entry(entry3).await.unwrap();
|
||||
|
||||
assert!(store.header.total_count == 4);
|
||||
(
|
||||
file_position0,
|
||||
file_position1,
|
||||
file_position2,
|
||||
file_position3,
|
||||
)
|
||||
(file_position0, file_position1, file_position2, file_position3)
|
||||
};
|
||||
|
||||
{
|
||||
assert!(store.header.deleted_count == 0);
|
||||
let mut cursor = store.write_cursor().await.unwrap();
|
||||
cursor
|
||||
.delete_entries_where_eq(column, &value, false)
|
||||
.await
|
||||
.unwrap();
|
||||
cursor.delete_entries_where_eq(column, &value, false).await.unwrap();
|
||||
assert!(cursor.header().deleted_count == 2);
|
||||
assert!(cursor.header().total_count == 4);
|
||||
|
||||
|
|
@ -596,4 +539,5 @@ mod tests {
|
|||
assert!(cursor.header().total_count == 2);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue