Skip to content

Commit

Permalink
feat: Add support for async proxy connection. (#2278)
Browse files Browse the repository at this point in the history
* Try to attach async to proxy trait first.

* Update proxy connection to support async.

* Add example.

* Try to fix CI.

* Remove CI for cloudflare worker example at this moment...

* Improve SQL serializer
  • Loading branch information
langyo authored and tyt2y3 committed Aug 20, 2024
1 parent c43e15e commit 9f203fc
Show file tree
Hide file tree
Showing 16 changed files with 474 additions and 88 deletions.
5 changes: 5 additions & 0 deletions examples/proxy_cloudflare_worker_example/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
target
node_modules
.wrangler
build
dist
52 changes: 52 additions & 0 deletions examples/proxy_cloudflare_worker_example/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
[package]
name = "sea-orm-proxy-cloudflare-worker-example"
version = "0.1.0"
authors = ["Langyo <[email protected]>"]
edition = "2021"
publish = false

[workspace]

[package.metadata.release]
release = false

# https://github.com/rustwasm/wasm-pack/issues/1247
[package.metadata.wasm-pack.profile.release]
wasm-opt = false

[lib]
crate-type = ["cdylib"]

[dependencies]
anyhow = "1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
once_cell = "1"
async-trait = "0.1"

worker = { version = "0.3.0", features = ['http', 'axum', "d1"] }
worker-macros = { version = "0.3.0", features = ['http'] }
axum = { version = "0.7", default-features = false, features = ["macros"] }
tower-service = "0.3.2"

chrono = "0.4"
uuid = { version = "1", features = ["v4"] }

console_error_panic_hook = { version = "0.1" }
wasm-bindgen = "0.2.92"
wasm-bindgen-futures = { version = "0.4" }
gloo = "0.11"
oneshot = "0.1"

sea-orm = { path = "../../", default-features = false, features = [
"macros",
"proxy",
"with-uuid",
"with-chrono",
"with-json",
"debug-print",
] }

[patch.crates-io]
# https://github.com/cloudflare/workers-rs/pull/591
worker = { git = "https://github.com/cloudflare/workers-rs.git", rev = "ff2e6a0fd58b7e7b4b7651aba46e04067597eb03" }
13 changes: 13 additions & 0 deletions examples/proxy_cloudflare_worker_example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# SeaORM Proxy Demo for Cloudflare Workers

This is a simple Cloudflare worker written in Rust. It uses the `sea-orm` ORM to interact with SQLite that is stored in the Cloudflare D1. It also uses `axum` as the server framework.

It's inspired by the [Cloudflare Workers Demo with Rust](https://github.com/logankeenan/full-stack-rust-cloudflare-axum).

## Run

Make sure you have `npm` and `cargo` installed. Be sure to use the latest version of `nodejs` and `rust`.

```bash
npx wrangler dev
```
12 changes: 12 additions & 0 deletions examples/proxy_cloudflare_worker_example/Wrangler.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
name = "axum"
main = "build/worker/shim.mjs"
compatibility_date = "2024-07-08"

[[d1_databases]]
binding = "test-d1"
database_name = "axumtest"
# Change it if you want to use your own database
database_id = "00000000-0000-0000-0000-000000000000"

[build]
command = "cargo install -q worker-build && worker-build --release"
17 changes: 17 additions & 0 deletions examples/proxy_cloudflare_worker_example/src/entity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Deserialize, Serialize)]
#[sea_orm(table_name = "posts")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i64,

pub title: String,
pub text: String,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}
16 changes: 16 additions & 0 deletions examples/proxy_cloudflare_worker_example/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use anyhow::Result;
use axum::{body::Body, response::Response};
use tower_service::Service;
use worker::{event, Context, Env, HttpRequest};

pub(crate) mod entity;
pub(crate) mod orm;
pub(crate) mod route;

// https://developers.cloudflare.com/workers/languages/rust
#[event(fetch)]
async fn fetch(req: HttpRequest, env: Env, _ctx: Context) -> Result<Response<Body>> {
console_error_panic_hook::set_once();

Ok(route::router(env).call(req).await?)
}
218 changes: 218 additions & 0 deletions examples/proxy_cloudflare_worker_example/src/orm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
use anyhow::{anyhow, Context, Result};
use std::{collections::BTreeMap, sync::Arc};
use wasm_bindgen::JsValue;

use sea_orm::{
ConnectionTrait, Database, DatabaseConnection, DbBackend, DbErr, ProxyDatabaseTrait,
ProxyExecResult, ProxyRow, RuntimeErr, Schema, Statement, Value, Values,
};
use worker::{console_log, Env};

struct ProxyDb {
env: Arc<Env>,
}

impl std::fmt::Debug for ProxyDb {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ProxyDb").finish()
}
}

impl ProxyDb {
async fn do_query(env: Arc<Env>, statement: Statement) -> Result<Vec<ProxyRow>> {
let sql = statement.sql.clone();
let values = match statement.values {
Some(Values(values)) => values
.iter()
.map(|val| match &val {
Value::BigInt(Some(val)) => JsValue::from(val.to_string()),
Value::BigUnsigned(Some(val)) => JsValue::from(val.to_string()),
Value::Int(Some(val)) => JsValue::from(*val),
Value::Unsigned(Some(val)) => JsValue::from(*val),
Value::SmallInt(Some(val)) => JsValue::from(*val),
Value::SmallUnsigned(Some(val)) => JsValue::from(*val),
Value::TinyInt(Some(val)) => JsValue::from(*val),
Value::TinyUnsigned(Some(val)) => JsValue::from(*val),

Value::Float(Some(val)) => JsValue::from_f64(*val as f64),
Value::Double(Some(val)) => JsValue::from_f64(*val),

Value::Bool(Some(val)) => JsValue::from(*val),
Value::Bytes(Some(val)) => JsValue::from(format!(
"X'{}'",
val.iter()
.map(|byte| format!("{:02x}", byte))
.collect::<String>()
)),
Value::Char(Some(val)) => JsValue::from(val.to_string()),
Value::Json(Some(val)) => JsValue::from(val.to_string()),
Value::String(Some(val)) => JsValue::from(val.to_string()),

Value::ChronoDate(Some(val)) => JsValue::from(val.to_string()),
Value::ChronoDateTime(Some(val)) => JsValue::from(val.to_string()),
Value::ChronoDateTimeLocal(Some(val)) => JsValue::from(val.to_string()),
Value::ChronoDateTimeUtc(Some(val)) => JsValue::from(val.to_string()),
Value::ChronoDateTimeWithTimeZone(Some(val)) => JsValue::from(val.to_string()),

_ => JsValue::NULL,
})
.collect(),
None => Vec::new(),
};

console_log!("SQL query values: {:?}", values);
let ret = env.d1("test-d1")?.prepare(sql).bind(&values)?.all().await?;
if let Some(message) = ret.error() {
return Err(anyhow!(message.to_string()));
}

let ret = ret.results::<serde_json::Value>()?;
let ret = ret
.iter()
.map(|row| {
let mut values = BTreeMap::new();
for (key, value) in row.as_object().unwrap() {
values.insert(
key.clone(),
match &value {
serde_json::Value::Bool(val) => Value::Bool(Some(*val)),
serde_json::Value::Number(val) => {
if val.is_i64() {
Value::BigInt(Some(val.as_i64().unwrap()))
} else if val.is_u64() {
Value::BigUnsigned(Some(val.as_u64().unwrap()))
} else {
Value::Double(Some(val.as_f64().unwrap()))
}
}
serde_json::Value::String(val) => {
Value::String(Some(Box::new(val.clone())))
}
_ => unreachable!("Unsupported JSON value"),
},
);
}
ProxyRow { values }
})
.collect();
console_log!("SQL query result: {:?}", ret);

Ok(ret)
}

async fn do_execute(env: Arc<Env>, statement: Statement) -> Result<ProxyExecResult> {
let sql = statement.sql.clone();
let values = match statement.values {
Some(Values(values)) => values
.iter()
.map(|val| match &val {
Value::BigInt(Some(val)) => JsValue::from(val.to_string()),
Value::BigUnsigned(Some(val)) => JsValue::from(val.to_string()),
Value::Int(Some(val)) => JsValue::from(*val),
Value::Unsigned(Some(val)) => JsValue::from(*val),
Value::SmallInt(Some(val)) => JsValue::from(*val),
Value::SmallUnsigned(Some(val)) => JsValue::from(*val),
Value::TinyInt(Some(val)) => JsValue::from(*val),
Value::TinyUnsigned(Some(val)) => JsValue::from(*val),

Value::Float(Some(val)) => JsValue::from_f64(*val as f64),
Value::Double(Some(val)) => JsValue::from_f64(*val),

Value::Bool(Some(val)) => JsValue::from(*val),
Value::Bytes(Some(val)) => JsValue::from(format!(
"X'{}'",
val.iter()
.map(|byte| format!("{:02x}", byte))
.collect::<String>()
)),
Value::Char(Some(val)) => JsValue::from(val.to_string()),
Value::Json(Some(val)) => JsValue::from(val.to_string()),
Value::String(Some(val)) => JsValue::from(val.to_string()),

Value::ChronoDate(Some(val)) => JsValue::from(val.to_string()),
Value::ChronoDateTime(Some(val)) => JsValue::from(val.to_string()),
Value::ChronoDateTimeLocal(Some(val)) => JsValue::from(val.to_string()),
Value::ChronoDateTimeUtc(Some(val)) => JsValue::from(val.to_string()),
Value::ChronoDateTimeWithTimeZone(Some(val)) => JsValue::from(val.to_string()),

_ => JsValue::NULL,
})
.collect(),
None => Vec::new(),
};

let ret = env
.d1("test-d1")?
.prepare(sql)
.bind(&values)?
.run()
.await?
.meta()?;
console_log!("SQL execute result: {:?}", ret);

let last_insert_id = ret
.as_ref()
.map(|meta| meta.last_row_id.unwrap_or(0))
.unwrap_or(0) as u64;
let rows_affected = ret
.as_ref()
.map(|meta| meta.rows_written.unwrap_or(0))
.unwrap_or(0) as u64;

Ok(ProxyExecResult {
last_insert_id,
rows_affected,
})
}
}

#[async_trait::async_trait]
impl ProxyDatabaseTrait for ProxyDb {
async fn query(&self, statement: Statement) -> Result<Vec<ProxyRow>, DbErr> {
console_log!("SQL query: {:?}", statement);

let env = self.env.clone();
let (tx, rx) = oneshot::channel();
wasm_bindgen_futures::spawn_local(async move {
let ret = Self::do_query(env, statement).await;
tx.send(ret).unwrap();
});

let ret = rx.await.unwrap();
ret.map_err(|err| DbErr::Conn(RuntimeErr::Internal(err.to_string())))
}

async fn execute(&self, statement: Statement) -> Result<ProxyExecResult, DbErr> {
console_log!("SQL execute: {:?}", statement);

let env = self.env.clone();
let (tx, rx) = oneshot::channel();
wasm_bindgen_futures::spawn_local(async move {
let ret = Self::do_execute(env, statement).await;
tx.send(ret).unwrap();
});

let ret = rx.await.unwrap();
ret.map_err(|err| DbErr::Conn(RuntimeErr::Internal(err.to_string())))
}
}

pub async fn init_db(env: Arc<Env>) -> Result<DatabaseConnection> {
let db = Database::connect_proxy(DbBackend::Sqlite, Arc::new(Box::new(ProxyDb { env })))
.await
.context("Failed to connect to database")?;
let builder = db.get_database_backend();

console_log!("Connected to database");

db.execute(
builder.build(
Schema::new(builder)
.create_table_from_entity(crate::entity::Entity)
.if_not_exists(),
),
)
.await?;

Ok(db)
}
Loading

0 comments on commit 9f203fc

Please sign in to comment.