SarasDB: Multi-Modal In-Memory Datastore in Rust
Disclaimer & Intro
This post is a set of my own notes. Even though I attempt to explain what I have built and how, I do not owe anyone an explanation. Do NOT expect anything.
My blog is my garden.
Rust is an absolutely fantastic language. Yes, Ada is safer, DLang provides more meta-features, and C/C++ are giants, but Rust's borrow checker is incredible, not to mention the ever-growing ecosystem.
I have been learning Rust for the past 2 months. WHY? Because I'm bored.
So what did I do? I decided to build something like fauna.com or surrealdb.com. After all, what better way to learn than to give oneself pain?
sarasDB is a stupid attempt by me to develop something that I can use for my own projects. It works, so I'm okay with that. Remember that I'm NOT a database implementation engineer, and there are probably 100+ optimizations that could be done here. It is optimized for read-heavy workloads. It is only kinda fault tolerant... so that was clickbait.
Another reason for developing this data store is bio-tech applications, but that is for future blogs 😉.
So what is sarasDB? It is a multi-modal data store/base that represents data items in a weird (or special) way.
Memory Storage
The heart of this database is a HashMap made thread-safe by wrapping it in an RwLock and an Arc. This design allows either many concurrent readers or a single exclusive writer at any moment.
#[derive(Debug, Default, Clone)]
pub struct Database {
store: Arc<RwLock<HashMap<String, DataFormat>>>,
}
Let me break down what each item does here:
- HashMap<String, DataFormat>:
- Keys are strings (like "user_profile" or "product_data")
- Values are the DataFormat enum (which can be documents (JSON), vectors, tables, or directed graphs)
- RwLock<...>:
- RwLock stands for "Read-Write Lock"
- Allows multiple threads to read data simultaneously
- Ensures only ONE thread can write at a time
- Prevents data races during concurrent access
- Arc<...>:
- Arc stands for "Atomic Reference Counted" smart pointer
- Allows safe sharing of the lock across multiple parts of the application
- Ensures thread-safe reference counting
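To see the Arc<RwLock<HashMap>> pattern in isolation, here is a minimal std-only sketch. It is an approximation under stated assumptions: it uses std::sync::RwLock and OS threads instead of async_std's RwLock and async tasks, and it stores plain strings instead of DataFormat, so it only illustrates the sharing and locking behaviour, not sarasDB itself. The run_demo helper is hypothetical.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

// Shared store: Arc for shared ownership, RwLock for many-readers/one-writer access.
type Store = Arc<RwLock<HashMap<String, String>>>;

/// Hypothetical helper: spawns `writers` threads that each insert one key,
/// then `readers` threads that each look up every key.
/// Returns the total number of successful lookups across all readers.
fn run_demo(writers: usize, readers: usize) -> usize {
    let store: Store = Arc::new(RwLock::new(HashMap::new()));

    // Writers take the exclusive write lock, one at a time.
    let write_handles: Vec<_> = (0..writers)
        .map(|i| {
            let store = Arc::clone(&store);
            thread::spawn(move || {
                let mut map = store.write().unwrap();
                map.insert(format!("key{i}"), format!("value{i}"));
            })
        })
        .collect();
    for handle in write_handles {
        handle.join().unwrap();
    }

    // Readers share the read lock and may hold it at the same time.
    let read_handles: Vec<_> = (0..readers)
        .map(|_| {
            let store = Arc::clone(&store);
            thread::spawn(move || {
                let map = store.read().unwrap();
                let found = (0..writers)
                    .filter(|i| map.contains_key(&format!("key{i}")))
                    .count();
                found
            })
        })
        .collect();
    read_handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    // 4 writers and 3 readers: every reader should see all 4 keys.
    println!("successful lookups: {}", run_demo(4, 3));
}
```

Writers are joined before readers start, so every reader is guaranteed to see all keys; in the real database, readers and writers interleave freely and the lock alone decides who goes first.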
The DataFormat enum defines the four supported data types:
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum DataFormat {
Document(serde_json::Value),
Vector(Vec<f64>),
Table(HashMap<String, Vec<String>>),
#[serde(with = "graph_serde")]
Graph(DiGraph<String, String>),
}
The #[derive(Debug, Serialize, Deserialize, Clone)] attribute automatically generates implementations for the Debug, Serialize, Deserialize, and Clone traits, making the enum easier to debug, serialize/deserialize (e.g., to/from JSON), and clone.
The variants of DataFormat include:
- Document:
- Holds a serde_json::Value, which can represent any valid JSON data.
- Makes it ideal for dynamic or unknown structures.
- Vector:
- Holds a vector of 64-bit floating-point numbers (Vec<f64>).
- Efficiently stores numerical data in a simple, ordered collection for mathematical or analytical operations.
- Table:
- Holds a HashMap where keys are Strings and values are vectors of Strings, representing a table-like structure.
- Represents tabular data with named columns, making it suitable for structured datasets like CSVs or other databases.
- Graph:
- Represents a graph using a custom serialization format (graph_serde), with nodes and edges represented by a DiGraph<String, String> from the petgraph crate.
- Enables the representation of graph data with nodes and edges, useful for network or relational data analysis.
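The Table variant is column-oriented: each key is a column name and each Vec<String> holds that column's values. The following std-only sketch shows how such a table maps back to rows. It assumes all columns have the same length, which nothing in DataFormat enforces, and table_rows is a hypothetical helper rather than part of sarasDB.

```rust
use std::collections::HashMap;

/// Hypothetical helper: converts a column-oriented table (column name -> values)
/// into a header plus rows. Columns are sorted by name because HashMap iteration
/// order is unspecified. Returns None if the columns have different lengths.
fn table_rows(table: &HashMap<String, Vec<String>>) -> Option<(Vec<String>, Vec<Vec<String>>)> {
    let mut columns: Vec<&String> = table.keys().collect();
    columns.sort();

    let len = columns.first().map(|c| table[*c].len()).unwrap_or(0);
    if columns.iter().any(|c| table[*c].len() != len) {
        return None; // ragged table: cannot be represented as rows
    }

    // Row r takes the r-th value from every column, in header order.
    let rows: Vec<Vec<String>> = (0..len)
        .map(|r| columns.iter().map(|c| table[*c][r].clone()).collect::<Vec<String>>())
        .collect();
    let header: Vec<String> = columns.into_iter().cloned().collect();
    Some((header, rows))
}

fn main() {
    let mut table = HashMap::new();
    table.insert("name".to_string(), vec!["alice".to_string(), "bob".to_string()]);
    table.insert("age".to_string(), vec!["30".to_string(), "25".to_string()]);

    let (header, rows) = table_rows(&table).expect("columns have equal length");
    println!("{header:?}");
    for row in rows {
        println!("{row:?}");
    }
}
```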
Custom Serialization for Graphs
Storing graphs in a JSON-compatible format requires careful serialization. I serialize nodes and edges as a sequence of tuples.
("node", "A", None, None) // Node "A"
("edge", "A", "B", "label") // Edge from "A" to "B" with label
Serialization is implemented using a serialize_graph function:
// Custom serialization for DiGraph
fn serialize_graph<S>(graph: &DiGraph<String, String>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut seq = serializer.serialize_seq(Some(graph.node_count() + graph.edge_count()))?;
for node in graph.node_indices() {
seq.serialize_element(&("node".to_string(), graph[node].clone(), None::<String>, None::<String>))?;
}
for edge in graph.edge_indices() {
let (source, target) = graph.edge_endpoints(edge).unwrap();
seq.serialize_element(&("edge".to_string(), graph[source].clone(), Some(graph[target].clone()), Some(graph[edge].clone())))?;
}
seq.end()
}
The code above defines a custom serialization function, serialize_graph, for a DiGraph<String, String> (a directed graph whose node weights and edge labels are both strings) using the serde crate. It writes the graph as one flat sequence: all nodes first, then all edges. Each node becomes a tuple of the string "node", the node's value, and two None values; those two slots are only filled for edges, where they hold the target node and the edge label. Each edge becomes a tuple of the string "edge", the source node's value, the target node's value, and the edge label, so edges refer to nodes by value rather than by index. serialize_seq starts the sequence (with a length hint of node_count() + edge_count()), serialize_element writes each tuple, and seq.end() finishes the sequence and returns the result.
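To make the tuple layout concrete without pulling in petgraph or serde, here is a std-only sketch. MiniGraph is a hypothetical stand-in for DiGraph<String, String> (node weights in insertion order, edges as index pairs plus a label), and flatten mirrors the order serialize_graph uses: all nodes first, then all edges. With serde_json, each of these tuples would be written as a JSON array, for example ["node","A",null,null] and ["edge","A","B","label"].

```rust
/// Hypothetical stand-in for petgraph's DiGraph<String, String>.
struct MiniGraph {
    nodes: Vec<String>,                 // node weights; the position is the node id
    edges: Vec<(usize, usize, String)>, // (source id, target id, label)
}

type Element = (String, String, Option<String>, Option<String>);

/// Flattens the graph into the same 4-tuple sequence serialize_graph emits.
fn flatten(graph: &MiniGraph) -> Vec<Element> {
    let mut out = Vec::with_capacity(graph.nodes.len() + graph.edges.len());
    // Nodes first: ("node", name, None, None)
    for name in &graph.nodes {
        out.push(("node".to_string(), name.clone(), None, None));
    }
    // Then edges: ("edge", source name, Some(target name), Some(label))
    for (src, dst, label) in &graph.edges {
        out.push((
            "edge".to_string(),
            graph.nodes[*src].clone(),
            Some(graph.nodes[*dst].clone()),
            Some(label.clone()),
        ));
    }
    out
}

fn main() {
    let g = MiniGraph {
        nodes: vec!["A".to_string(), "B".to_string()],
        edges: vec![(0, 1, "label".to_string())],
    };
    for element in flatten(&g) {
        println!("{element:?}");
    }
}
```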
Deserialization is implemented as below:
// Custom deserialization for DiGraph
fn deserialize_graph<'de, D>(deserializer: D) -> Result<DiGraph<String, String>, D::Error>
where
D: Deserializer<'de>,
{
use serde::de::{SeqAccess, Visitor};
use std::fmt;
struct GraphVisitor;
impl<'de> Visitor<'de> for GraphVisitor {
type Value = DiGraph<String, String>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a sequence of nodes and edges")
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
let mut graph = DiGraph::new();
let mut nodes = vec![];
while let Some(value) = seq.next_element::<(String, String, Option<String>, Option<String>)>()? {
match value {
(node_type, node, None, None) if node_type == "node" => {
nodes.push(graph.add_node(node));
}
(edge_type, source, Some(target), Some(edge_label)) if edge_type == "edge" => {
let source_index = nodes.iter().position(|n| graph[*n] == source).ok_or_else(|| {
serde::de::Error::custom("source node not found")
})?;
let target_index = nodes.iter().position(|n| graph[*n] == target).ok_or_else(|| {
serde::de::Error::custom("target node not found")
})?;
graph.add_edge(
NodeIndex::new(source_index),
NodeIndex::new(target_index),
edge_label,
);
}
_ => return Err(serde::de::Error::custom("unexpected item")),
}
}
Ok(graph)
}
}
deserializer.deserialize_seq(GraphVisitor)
}
The code above defines a custom deserialization function, deserialize_graph, that rebuilds a DiGraph<String, String> from that sequence. A GraphVisitor struct implements serde's Visitor trait and expects a sequence of nodes and edges. Its visit_seq method reads each 4-tuple in order: a "node" entry adds a node to the graph, and an "edge" entry looks up its source and target among the nodes added so far (by value) and connects them with the edge label. Because of this lookup, every node must appear before any edge that references it. Any other tuple shape, or an edge whose endpoints cannot be found, produces an error. Finally, deserializer.deserialize_seq(GraphVisitor) drives the whole process with this visitor.
Error handling and the re-export of the graph serialization functions look like this:
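deserialize_graph resolves each edge endpoint with nodes.iter().position(...), a linear scan, so rebuilding a graph costs roughly O(nodes × edges); it also means that when two nodes share a name, edges always attach to the first one. One possible alternative is to keep a HashMap from node name to index while reading. The sketch below is std-only and uses a hypothetical MiniGraph stand-in for DiGraph<String, String>; it is an illustration of the idea, not a drop-in replacement for the serde visitor.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for petgraph's DiGraph<String, String>.
#[derive(Debug, Default)]
struct MiniGraph {
    nodes: Vec<String>,                 // node weights; the position is the node id
    edges: Vec<(usize, usize, String)>, // (source id, target id, label)
}

type Element = (String, String, Option<String>, Option<String>);

/// Rebuilds a graph from the ("node" | "edge", ...) sequence using a
/// name -> index map, so each edge lookup is O(1) on average.
fn rebuild(elements: Vec<Element>) -> Result<MiniGraph, String> {
    let mut graph = MiniGraph::default();
    let mut index: HashMap<String, usize> = HashMap::new();

    for element in elements {
        match element {
            (kind, name, None, None) if kind == "node" => {
                // Keep the first index for a repeated name, like position() does.
                let id = graph.nodes.len();
                index.entry(name.clone()).or_insert(id);
                graph.nodes.push(name);
            }
            (kind, source, Some(target), Some(label)) if kind == "edge" => {
                let src = *index.get(&source).ok_or("source node not found")?;
                let dst = *index.get(&target).ok_or("target node not found")?;
                graph.edges.push((src, dst, label));
            }
            _ => return Err("unexpected item".to_string()),
        }
    }
    Ok(graph)
}

fn main() {
    let elements = vec![
        ("node".to_string(), "A".to_string(), None, None),
        ("node".to_string(), "B".to_string(), None, None),
        ("edge".to_string(), "A".to_string(), Some("B".to_string()), Some("knows".to_string())),
    ];
    println!("{:?}", rebuild(elements));
}
```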
#[derive(Debug, Error)]
pub enum DBError {
#[error("Data not found for key: {0}")]
NotFound(String),
#[error("Unsupported data type for operation")]
UnsupportedDataType,
#[error("Invalid format: {0}")]
InvalidFormat(String),
#[error("Serialization error: {0}")]
SerializationError(String),
#[error("Database error: {0}")]
GeneralError(String),
}
type DbResult<T> = Result<T, DBError>;
mod graph_serde {
use super::*;
pub(super) use super::serialize_graph as serialize;
pub(super) use super::deserialize_graph as deserialize;
}
The code above defines the DBError enum for database errors. The enum derives Debug and thiserror's Error, and each variant carries an #[error(...)] attribute that supplies its message. The variants are NotFound, UnsupportedDataType, InvalidFormat, SerializationError, and GeneralError. The type alias DbResult<T> stands for Result<T, DBError> and standardizes error handling for database operations. Finally, the graph_serde module re-exports serialize_graph and deserialize_graph from the parent module under the names serialize and deserialize, which is the pair of functions that #[serde(with = "graph_serde")] on the Graph variant expects to find.
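For readers new to thiserror: each #[error("...")] attribute produces a Display implementation, and #[derive(Error)] implements std::error::Error. The sketch below is a rough, hand-written std-only equivalent for a subset of the variants. It approximates the behaviour for illustration and is not the literal macro output; the lookup function is hypothetical and only shows DbResult in use.

```rust
use std::error::Error;
use std::fmt;

// Hand-written equivalent of a subset of DBError, without thiserror.
#[derive(Debug)]
enum DBError {
    NotFound(String),
    UnsupportedDataType,
    InvalidFormat(String),
}

impl fmt::Display for DBError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Same messages as the #[error(...)] attributes on DBError.
        match self {
            DBError::NotFound(key) => write!(f, "Data not found for key: {key}"),
            DBError::UnsupportedDataType => write!(f, "Unsupported data type for operation"),
            DBError::InvalidFormat(msg) => write!(f, "Invalid format: {msg}"),
        }
    }
}

// Debug + Display are all std::error::Error needs by default.
impl Error for DBError {}

type DbResult<T> = Result<T, DBError>;

// Hypothetical lookup that returns a DbResult.
fn lookup(key: &str) -> DbResult<&'static str> {
    match key {
        "user_profile" => Ok("{\"name\":\"saras\"}"),
        _ => Err(DBError::NotFound(key.to_string())),
    }
}

fn main() {
    match lookup("missing") {
        Ok(value) => println!("found {value}"),
        Err(e) => println!("error: {e}"),
    }
}
```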
Imports
Before we go any further, let us look at our current imports:
use async_std::sync::RwLock;
use petgraph::graph::{DiGraph, NodeIndex};
use serde::{Deserialize, Serialize, Serializer, Deserializer};
use serde::ser::SerializeSeq;
use std::collections::HashMap;
use std::sync::Arc;
use thiserror::Error;
use tonic_reflection::server::Builder as ReflectionBuilder;
use tonic::{transport::Server, Request, Response, Status};
use proto::kvdb_server::{Kvdb, KvdbServer};
use proto::{StoreRequest, StoreResponse, UpdateRequest, UpdateResponse, DeleteRequest, DeleteResponse, QueryRequest, QueryResponse};
These imports bring in what the server needs for asynchronous access, graph manipulation, serialization, error handling, and gRPC:
- async_std::sync::RwLock: an async-aware read-write lock (read() and write() are awaited), used for shared access to the store.
- petgraph::graph::{DiGraph, NodeIndex}: directed graphs and their node indices.
- serde::{Deserialize, Serialize, Serializer, Deserializer} and serde::ser::SerializeSeq: derive-based serialization plus the pieces needed for the custom graph sequence.
- std::collections::HashMap and std::sync::Arc: the key-value map itself and shared ownership of it across threads.
- thiserror::Error: the derive macro behind DBError.
- tonic_reflection::server::Builder: gRPC server reflection, which lets tools such as grpcurl discover services at runtime.
- tonic::{transport::Server, Request, Response, Status}: the core gRPC server types.
- proto::kvdb_server::{Kvdb, KvdbServer} and the request/response types from proto: the generated service trait, server wrapper, and messages for the key-value operations.
Looking at proto/kvdb.rs
The code below is the tonic/prost generated code for the kvdb service (what tonic::include_proto!("kvdb") pulls in). It defines the message structs plus a gRPC client and server for the key-value database. StoreRequest and UpdateRequest carry a key, the data as a JSON string, and a format ("document", "vector", "table", or "graph"); DeleteRequest carries only a key; QueryRequest carries a key and a query_type. StoreResponse, UpdateResponse, and DeleteResponse each hold a success flag, while QueryResponse holds the JSON-serialized result.
The KvdbClient struct provides async methods for connecting to the server and storing, updating, deleting, and querying data. On the server side, the Kvdb trait declares the methods a service must implement, and the KvdbServer struct wraps such an implementation, routes incoming requests to it, and manages compression and message size limits. Messages are encoded as Protocol Buffers by prost.
// proto/kvdb.rs
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StoreRequest {
#[prost(string, tag = "1")]
pub key: ::prost::alloc::string::String,
/// JSON serialized data
#[prost(string, tag = "2")]
pub data: ::prost::alloc::string::String,
/// "document", "vector", "table", "graph"
#[prost(string, tag = "3")]
pub format: ::prost::alloc::string::String,
}
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct StoreResponse {
#[prost(bool, tag = "1")]
pub success: bool,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UpdateRequest {
#[prost(string, tag = "1")]
pub key: ::prost::alloc::string::String,
/// JSON serialized data
#[prost(string, tag = "2")]
pub data: ::prost::alloc::string::String,
/// "document", "vector", "table", "graph"
#[prost(string, tag = "3")]
pub format: ::prost::alloc::string::String,
}
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct UpdateResponse {
#[prost(bool, tag = "1")]
pub success: bool,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DeleteRequest {
#[prost(string, tag = "1")]
pub key: ::prost::alloc::string::String,
}
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct DeleteResponse {
#[prost(bool, tag = "1")]
pub success: bool,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct QueryRequest {
#[prost(string, tag = "1")]
pub key: ::prost::alloc::string::String,
/// "document", "vector", "table", "graph"
#[prost(string, tag = "2")]
pub query_type: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct QueryResponse {
/// JSON serialized result
#[prost(string, tag = "1")]
pub data: ::prost::alloc::string::String,
}
/// client implementations.
pub mod kvdb_client {
#![allow(
unused_variables,
dead_code,
missing_docs,
clippy::wildcard_imports,
clippy::let_unit_value,
)]
use tonic::codegen::*;
use tonic::codegen::http::Uri;
#[derive(Debug, Clone)]
pub struct KvdbClient<T> {
inner: tonic::client::Grpc<T>,
}
impl KvdbClient<tonic::transport::Channel> {
/// Attempt to create a new client by connecting to a given endpoint.
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
where
D: TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
Ok(Self::new(conn))
}
}
impl<T> KvdbClient<T>
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
{
pub fn new(inner: T) -> Self {
let inner = tonic::client::Grpc::new(inner);
Self { inner }
}
pub fn with_origin(inner: T, origin: Uri) -> Self {
let inner = tonic::client::Grpc::with_origin(inner, origin);
Self { inner }
}
pub fn with_interceptor<F>(
inner: T,
interceptor: F,
) -> KvdbClient<InterceptedService<T, F>>
where
F: tonic::service::Interceptor,
T::ResponseBody: Default,
T: tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
Response = http::Response<
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
>,
>,
<T as tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
>>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
{
KvdbClient::new(InterceptedService::new(inner, interceptor))
}
/// Compress requests with the given encoding.
///
/// This requires the server to support it otherwise it might respond with an
/// error.
#[must_use]
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.inner = self.inner.send_compressed(encoding);
self
}
/// Enable decompressing responses.
#[must_use]
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.inner = self.inner.accept_compressed(encoding);
self
}
/// Limits the maximum size of a decoded message.
///
/// Default: `4MB`
#[must_use]
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
self.inner = self.inner.max_decoding_message_size(limit);
self
}
/// Limits the maximum size of an encoded message.
///
/// Default: `usize::MAX`
#[must_use]
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
self.inner = self.inner.max_encoding_message_size(limit);
self
}
pub async fn store_data(
&mut self,
request: impl tonic::IntoRequest<super::StoreRequest>,
) -> std::result::Result<tonic::Response<super::StoreResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/kvdb.Kvdb/StoreData");
let mut req = request.into_request();
req.extensions_mut().insert(GrpcMethod::new("kvdb.Kvdb", "StoreData"));
self.inner.unary(req, path, codec).await
}
pub async fn update_data(
&mut self,
request: impl tonic::IntoRequest<super::UpdateRequest>,
) -> std::result::Result<tonic::Response<super::UpdateResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/kvdb.Kvdb/UpdateData");
let mut req = request.into_request();
req.extensions_mut().insert(GrpcMethod::new("kvdb.Kvdb", "UpdateData"));
self.inner.unary(req, path, codec).await
}
pub async fn delete_data(
&mut self,
request: impl tonic::IntoRequest<super::DeleteRequest>,
) -> std::result::Result<tonic::Response<super::DeleteResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/kvdb.Kvdb/DeleteData");
let mut req = request.into_request();
req.extensions_mut().insert(GrpcMethod::new("kvdb.Kvdb", "DeleteData"));
self.inner.unary(req, path, codec).await
}
pub async fn query_data(
&mut self,
request: impl tonic::IntoRequest<super::QueryRequest>,
) -> std::result::Result<tonic::Response<super::QueryResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/kvdb.Kvdb/QueryData");
let mut req = request.into_request();
req.extensions_mut().insert(GrpcMethod::new("kvdb.Kvdb", "QueryData"));
self.inner.unary(req, path, codec).await
}
}
}
/// server implementations.
pub mod kvdb_server {
#![allow(
unused_variables,
dead_code,
missing_docs,
clippy::wildcard_imports,
clippy::let_unit_value,
)]
use tonic::codegen::*;
/// trait containing gRPC methods that should be implemented for use with KvdbServer.
#[async_trait]
pub trait Kvdb: std::marker::Send + std::marker::Sync + 'static {
async fn store_data(
&self,
request: tonic::Request<super::StoreRequest>,
) -> std::result::Result<tonic::Response<super::StoreResponse>, tonic::Status>;
async fn update_data(
&self,
request: tonic::Request<super::UpdateRequest>,
) -> std::result::Result<tonic::Response<super::UpdateResponse>, tonic::Status>;
async fn delete_data(
&self,
request: tonic::Request<super::DeleteRequest>,
) -> std::result::Result<tonic::Response<super::DeleteResponse>, tonic::Status>;
async fn query_data(
&self,
request: tonic::Request<super::QueryRequest>,
) -> std::result::Result<tonic::Response<super::QueryResponse>, tonic::Status>;
}
#[derive(Debug)]
pub struct KvdbServer<T> {
inner: Arc<T>,
accept_compression_encodings: EnabledCompressionEncodings,
send_compression_encodings: EnabledCompressionEncodings,
max_decoding_message_size: Option<usize>,
max_encoding_message_size: Option<usize>,
}
impl<T> KvdbServer<T> {
pub fn new(inner: T) -> Self {
Self::from_arc(Arc::new(inner))
}
pub fn from_arc(inner: Arc<T>) -> Self {
Self {
inner,
accept_compression_encodings: Default::default(),
send_compression_encodings: Default::default(),
max_decoding_message_size: None,
max_encoding_message_size: None,
}
}
pub fn with_interceptor<F>(
inner: T,
interceptor: F,
) -> InterceptedService<Self, F>
where
F: tonic::service::Interceptor,
{
InterceptedService::new(Self::new(inner), interceptor)
}
/// Enable decompressing requests with the given encoding.
#[must_use]
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.accept_compression_encodings.enable(encoding);
self
}
/// Compress responses with the given encoding, if the client supports it.
#[must_use]
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.send_compression_encodings.enable(encoding);
self
}
/// Limits the maximum size of a decoded message.
///
/// Default: `4MB`
#[must_use]
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
self.max_decoding_message_size = Some(limit);
self
}
/// Limits the maximum size of an encoded message.
///
/// Default: `usize::MAX`
#[must_use]
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
self.max_encoding_message_size = Some(limit);
self
}
}
impl<T, B> tonic::codegen::Service<http::Request<B>> for KvdbServer<T>
where
T: Kvdb,
B: Body + std::marker::Send + 'static,
B::Error: Into<StdError> + std::marker::Send + 'static,
{
type Response = http::Response<tonic::body::BoxBody>;
type Error = std::convert::Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
fn poll_ready(
&mut self,
_cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
match req.uri().path() {
"/kvdb.Kvdb/StoreData" => {
#[allow(non_camel_case_types)]
struct StoreDataSvc<T: Kvdb>(pub Arc<T>);
impl<T: Kvdb> tonic::server::UnaryService<super::StoreRequest>
for StoreDataSvc<T> {
type Response = super::StoreResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::StoreRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as Kvdb>::store_data(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = StoreDataSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/kvdb.Kvdb/UpdateData" => {
#[allow(non_camel_case_types)]
struct UpdateDataSvc<T: Kvdb>(pub Arc<T>);
impl<T: Kvdb> tonic::server::UnaryService<super::UpdateRequest>
for UpdateDataSvc<T> {
type Response = super::UpdateResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::UpdateRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as Kvdb>::update_data(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = UpdateDataSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/kvdb.Kvdb/DeleteData" => {
#[allow(non_camel_case_types)]
struct DeleteDataSvc<T: Kvdb>(pub Arc<T>);
impl<T: Kvdb> tonic::server::UnaryService<super::DeleteRequest>
for DeleteDataSvc<T> {
type Response = super::DeleteResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::DeleteRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as Kvdb>::delete_data(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = DeleteDataSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/kvdb.Kvdb/QueryData" => {
#[allow(non_camel_case_types)]
struct QueryDataSvc<T: Kvdb>(pub Arc<T>);
impl<T: Kvdb> tonic::server::UnaryService<super::QueryRequest>
for QueryDataSvc<T> {
type Response = super::QueryResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::QueryRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as Kvdb>::query_data(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = QueryDataSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => {
Box::pin(async move {
let mut response = http::Response::new(empty_body());
let headers = response.headers_mut();
headers
.insert(
tonic::Status::GRPC_STATUS,
(tonic::Code::Unimplemented as i32).into(),
);
headers
.insert(
http::header::CONTENT_TYPE,
tonic::metadata::GRPC_CONTENT_TYPE,
);
Ok(response)
})
}
}
}
}
impl<T> Clone for KvdbServer<T> {
fn clone(&self) -> Self {
let inner = self.inner.clone();
Self {
inner,
accept_compression_encodings: self.accept_compression_encodings,
send_compression_encodings: self.send_compression_encodings,
max_decoding_message_size: self.max_decoding_message_size,
max_encoding_message_size: self.max_encoding_message_size,
}
}
}
/// gRPC service name
pub const SERVICE_NAME: &str = "kvdb.Kvdb";
impl<T> tonic::server::NamedService for KvdbServer<T> {
const NAME: &'static str = SERVICE_NAME;
}
}
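The generated call() routes each request purely on its HTTP/2 path, which gRPC forms as /{package}.{Service}/{Method}; here that is kvdb.Kvdb followed by the RPC name, and any unknown path falls through to an Unimplemented status. A std-only sketch of that dispatch follows. The Rpc enum is a hypothetical stand-in for the four generated *Svc types, and "ListKeys" is an invented method name used only to show the fallback.

```rust
/// Hypothetical stand-in for the four generated *Svc handler types.
#[derive(Debug, PartialEq)]
enum Rpc {
    StoreData,
    UpdateData,
    DeleteData,
    QueryData,
}

const SERVICE_NAME: &str = "kvdb.Kvdb";

/// Builds the request path a gRPC client sends for a given method.
fn grpc_path(method: &str) -> String {
    format!("/{}/{}", SERVICE_NAME, method)
}

/// Mirrors the match in KvdbServer::call; None plays the role of the
/// Unimplemented fallback arm.
fn route(path: &str) -> Option<Rpc> {
    match path {
        "/kvdb.Kvdb/StoreData" => Some(Rpc::StoreData),
        "/kvdb.Kvdb/UpdateData" => Some(Rpc::UpdateData),
        "/kvdb.Kvdb/DeleteData" => Some(Rpc::DeleteData),
        "/kvdb.Kvdb/QueryData" => Some(Rpc::QueryData),
        _ => None,
    }
}

fn main() {
    for method in ["StoreData", "QueryData", "ListKeys"] {
        let path = grpc_path(method);
        println!("{} -> {:?}", path, route(&path));
    }
}
```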
Database struct implementation
The code below implements the Database struct. Its methods are async and work on the shared Arc<RwLock<HashMap>> store: store_data inserts or overwrites a key, update_data overwrites a key only if it already exists, delete_data removes a key, and query_data reads a key. The three modifying methods take the write lock, while query_data takes only the read lock, so queries can run concurrently. query_data also checks that the requested type (document, vector, table, or graph) matches the stored variant, and returns a clone of the value, DBError::NotFound if the key is missing, or DBError::UnsupportedDataType if the types do not match.
impl Database {
pub async fn new() -> Self {
Database {
store: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn store_data(&self, key: &str, data: DataFormat) -> DbResult<()> {
let mut store = self.store.write().await;
store.insert(key.to_string(), data);
Ok(())
}
pub async fn update_data(&self, key: &str, data: DataFormat) -> DbResult<()> {
let mut store = self.store.write().await;
if store.contains_key(key) {
store.insert(key.to_string(), data);
Ok(())
} else {
Err(DBError::NotFound(key.to_string()))
}
}
pub async fn delete_data(&self, key: &str) -> DbResult<()> {
let mut store = self.store.write().await;
if store.remove(key).is_some() {
Ok(())
} else {
Err(DBError::NotFound(key.to_string()))
}
}
pub async fn query_data(&self, key: &str, query_type: &str) -> DbResult<DataFormat> {
let store = self.store.read().await;
match store.get(key) {
Some(data) => match (query_type, data) {
("document", DataFormat::Document(doc)) => Ok(DataFormat::Document(doc.clone())),
("vector", DataFormat::Vector(vec)) => Ok(DataFormat::Vector(vec.clone())),
("table", DataFormat::Table(table)) => Ok(DataFormat::Table(table.clone())),
("graph", DataFormat::Graph(graph)) => Ok(DataFormat::Graph(graph.clone())),
_ => Err(DBError::UnsupportedDataType),
},
None => Err(DBError::NotFound(key.to_string())),
}
}
}
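The four methods give slightly different guarantees: store_data is an upsert, update_data only overwrites an existing key, delete_data reports a missing key, and query_data rejects a type mismatch. Below is a synchronous, std-only model of those rules. It is a sketch under stated assumptions: Value, DbError, and Db are hypothetical simplified stand-ins (Value has only two of the four DataFormat variants), and std::sync::RwLock replaces the async lock.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Hypothetical, simplified stand-ins for DataFormat and DBError.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Document(String),
    Vector(Vec<f64>),
}

#[derive(Debug, PartialEq)]
enum DbError {
    NotFound(String),
    UnsupportedDataType,
}

#[derive(Default)]
struct Db {
    store: RwLock<HashMap<String, Value>>,
}

impl Db {
    // Upsert: always succeeds, overwriting any existing value.
    fn store(&self, key: &str, v: Value) {
        self.store.write().unwrap().insert(key.to_string(), v);
    }

    // Overwrite only an existing key; the check and the write share one lock.
    fn update(&self, key: &str, v: Value) -> Result<(), DbError> {
        let mut map = self.store.write().unwrap();
        let Some(slot) = map.get_mut(key) else {
            return Err(DbError::NotFound(key.to_string()));
        };
        *slot = v;
        Ok(())
    }

    fn delete(&self, key: &str) -> Result<(), DbError> {
        let mut map = self.store.write().unwrap();
        map.remove(key)
            .map(|_| ())
            .ok_or_else(|| DbError::NotFound(key.to_string()))
    }

    // A missing key is checked first, then the requested type must match.
    fn query(&self, key: &str, query_type: &str) -> Result<Value, DbError> {
        let map = self.store.read().unwrap();
        let Some(v) = map.get(key) else {
            return Err(DbError::NotFound(key.to_string()));
        };
        match (query_type, v) {
            ("document", Value::Document(_)) | ("vector", Value::Vector(_)) => Ok(v.clone()),
            _ => Err(DbError::UnsupportedDataType),
        }
    }
}

fn main() {
    let db = Db::default();
    db.store("user_profile", Value::Document("{\"name\":\"saras\"}".to_string()));
    println!("{:?}", db.query("user_profile", "vector"));
    println!("{:?}", db.update("missing", Value::Vector(vec![1.0])));
}
```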
Boring Stuff
The code below defines a helper, convert_graph_to_serializable, that turns a DiGraph<String, String> into a Vec<(String, String, Option<String>, Option<String>)>. It adds one ("node", value, None, None) entry per node and one ("edge", source, Some(target), Some(label)) entry per edge, which is the same layout serialize_graph produces, so the JSON returned by a graph query has the same shape that store_data accepts for the "graph" format. The code also declares the proto module with tonic::include_proto!("kvdb"), which pulls in the Protocol Buffers (protobuf) types and gRPC code generated for the key-value database (kvdb).
// Helper function to convert a graph to a serializable structure
fn convert_graph_to_serializable(graph: &DiGraph<String, String>) -> Vec<(String, String, Option<String>, Option<String>)> {
let mut elements = Vec::new();
for node in graph.node_indices() {
elements.push(("node".to_string(), graph[node].clone(), None, None));
}
for edge in graph.edge_indices() {
let (source, target) = graph.edge_endpoints(edge).unwrap();
elements.push((
"edge".to_string(),
graph[source].clone(),
Some(graph[target].clone()),
Some(graph[edge].clone()),
));
}
elements
}
// Importing the proto module.
mod proto {
tonic::include_proto!("kvdb");
}
Cool Stuff (The Kvdb Service)
The KvdbService struct is an implementation of the key-value database service that uses the Database struct as its underlying data store. It derives Debug and Default. Through tonic::async_trait it implements the generated Kvdb trait, whose async methods store, update, delete, and query data. Each method extracts the fields it needs from the incoming gRPC request, deserializes the JSON payload into the matching DataFormat variant (document, vector, table, or graph) where needed, and calls the corresponding Database method: store_data, update_data, delete_data, or query_data.
store_data and update_data return invalid_argument if the format string is unknown or the JSON cannot be parsed into that format. Database failures are then mapped to gRPC status codes: internal for store_data, and not_found for update_data and delete_data. query_data fetches the value and serializes it back to JSON according to its type, with graphs going through convert_graph_to_serializable. Every error reaches the client as a Status carrying the error message.
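store_data and update_data repeat the same match on the format string. One way to factor that out, sketched std-only here, is a small enum with a FromStr impl so the service could call req.format.parse::<Format>() once and branch on the result. The Format name and its error text are hypothetical; in the real service each variant would still pick the matching serde_json or graph_serde call.

```rust
use std::str::FromStr;

/// Hypothetical enum for the `format` field of StoreRequest/UpdateRequest.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Format {
    Document,
    Vector,
    Table,
    Graph,
}

impl FromStr for Format {
    type Err = String;

    // Accepts exactly the four strings the service already understands.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "document" => Ok(Format::Document),
            "vector" => Ok(Format::Vector),
            "table" => Ok(Format::Table),
            "graph" => Ok(Format::Graph),
            other => Err(format!("Invalid data format: {other}")),
        }
    }
}

fn main() {
    for s in ["document", "graph", "csv"] {
        println!("{s:?} -> {:?}", s.parse::<Format>());
    }
}
```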
#[derive(Debug, Default)]
pub struct KvdbService {
db: Database,
}
#[tonic::async_trait]
impl Kvdb for KvdbService {
async fn store_data(
&self,
request: Request<StoreRequest>,
) -> Result<Response<StoreResponse>, Status> {
let req = request.into_inner();
let key = req.key;
let format = req.format.as_str();
let data: DataFormat = match format {
"document" => DataFormat::Document(
serde_json::from_str(&req.data).map_err(|e| Status::invalid_argument(e.to_string()))?,
),
"vector" => DataFormat::Vector(
serde_json::from_str(&req.data).map_err(|e| Status::invalid_argument(e.to_string()))?,
),
"table" => DataFormat::Table(
serde_json::from_str(&req.data).map_err(|e| Status::invalid_argument(e.to_string()))?,
),
"graph" => {
let graph = graph_serde::deserialize(&mut serde_json::Deserializer::from_str(&req.data))
.map_err(|e| Status::invalid_argument(e.to_string()))?;
DataFormat::Graph(graph)
}
_ => return Err(Status::invalid_argument("Invalid data format")),
};
self.db.store_data(&key, data).await.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(StoreResponse { success: true }))
}
async fn update_data(
&self,
request: Request<UpdateRequest>,
) -> Result<Response<UpdateResponse>, Status> {
let req = request.into_inner();
let key = req.key;
let format = req.format.as_str();
let data: DataFormat = match format {
"document" => DataFormat::Document(
serde_json::from_str(&req.data).map_err(|e| Status::invalid_argument(e.to_string()))?,
),
"vector" => DataFormat::Vector(
serde_json::from_str(&req.data).map_err(|e| Status::invalid_argument(e.to_string()))?,
),
"table" => DataFormat::Table(
serde_json::from_str(&req.data).map_err(|e| Status::invalid_argument(e.to_string()))?,
),
"graph" => {
let graph = graph_serde::deserialize(&mut serde_json::Deserializer::from_str(&req.data))
.map_err(|e| Status::invalid_argument(e.to_string()))?;
DataFormat::Graph(graph)
}
_ => return Err(Status::invalid_argument("Invalid data format")),
};
self.db.update_data(&key, data).await.map_err(|e| Status::not_found(e.to_string()))?;
Ok(Response::new(UpdateResponse { success: true }))
}
async fn delete_data(
&self,
request: Request<DeleteRequest>,
) -> Result<Response<DeleteResponse>, Status> {
let key = request.into_inner().key;
self.db.delete_data(&key).await.map_err(|e| Status::not_found(e.to_string()))?;
Ok(Response::new(DeleteResponse { success: true }))
}
async fn query_data(
&self,
request: Request<QueryRequest>,
) -> Result<Response<QueryResponse>, Status> {
let req = request.into_inner();
let key = req.key;
let query_type = req.query_type;
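let result = self.db.query_data(&key, &query_type).await.map_err(|e| match &e {
// Map each DBError variant to the closest gRPC status code.
DBError::NotFound(_) => Status::not_found(e.to_string()),
DBError::UnsupportedDataType => Status::invalid_argument(e.to_string()),
_ => Status::internal(e.to_string()),
})?;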
let data = match result {
DataFormat::Document(doc) => serde_json::to_string(&doc),
DataFormat::Vector(vec) => serde_json::to_string(&vec),
DataFormat::Table(table) => serde_json::to_string(&table),
DataFormat::Graph(graph) => serde_json::to_string(&convert_graph_to_serializable(&graph)),
}
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(QueryResponse { data }))
}
}
Run it!
The following is my server implementation:
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse()?;
let kvdb = KvdbService {
db: Database::new().await,
};
Server::builder()
.add_service(KvdbServer::new(kvdb)) // Ensure KvdbServer is added
.add_service(ReflectionBuilder::configure().build().unwrap()) // Add reflection
.serve(addr)
.await?;
Ok(())
}
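The listen address "[::1]:50051" is the IPv6 loopback, so the server only accepts connections from the same machine over IPv6. This std-only sketch checks what such a string parses to; describe is a hypothetical helper, and "0.0.0.0:50051" (all IPv4 interfaces) appears only as an alternative one might swap in, not as what the server uses.

```rust
use std::net::{AddrParseError, SocketAddr};

/// Hypothetical helper: parses a listen address and reports
/// (is IPv6, is loopback, port).
fn describe(addr: &str) -> Result<(bool, bool, u16), AddrParseError> {
    let parsed: SocketAddr = addr.parse()?;
    Ok((parsed.is_ipv6(), parsed.ip().is_loopback(), parsed.port()))
}

fn main() {
    for addr in ["[::1]:50051", "0.0.0.0:50051"] {
        println!("{addr}: {:?}", describe(addr));
    }
}
```

Note that SocketAddr parsing does no DNS resolution, so a string such as "localhost:50051" fails here.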
I will use grpcurl for a simple demo with JSON documents.
- Store Data
- Update Data
- Query Data
- Delete Data
- Query Data after delete
Conclusion
I hope you liked my approach. I will NOT be working on this for the time being, but you are free to use it in your org/home as long as you follow the license.
