Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Feb 27, 2024
1 parent e7a512b commit 47eff78
Show file tree
Hide file tree
Showing 37 changed files with 1,631 additions and 1,653 deletions.
2 changes: 1 addition & 1 deletion libsql-ffi/bundled/SQLite3MultipleCiphers/src/sqlite3.c
Original file line number Diff line number Diff line change
Expand Up @@ -136111,7 +136111,7 @@ static int xferOptimization(
}
autoIncStep(pParse, regNextRowid, regRowid);
}else if( pDest->pIndex==0 && !(db->mDbFlags & DBFLAG_VacuumInto) ){
addr1 = sqlite3VdbeAddOp2(v, OP_NewRowid, iDest, regRowid);
addr1 = sqlite3VdbeAddOp3(v, OP_NewRowid, iDest, regRowid, regNextRowid);
}else{
addr1 = sqlite3VdbeAddOp2(v, OP_Rowid, iSrc, regRowid);
assert( (pDest->tabFlags & TF_Autoincrement)==0 );
Expand Down
7 changes: 4 additions & 3 deletions libsql-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub struct RpcClientConfig<C = HttpConnector> {
}

impl<C: Connector> RpcClientConfig<C> {
pub(crate) async fn configure(self) -> anyhow::Result<(Channel, tonic::transport::Uri)> {
let uri = tonic::transport::Uri::from_maybe_shared(self.remote_url)?;
pub(crate) async fn configure(&self) -> anyhow::Result<(Channel, tonic::transport::Uri)> {
let uri = tonic::transport::Uri::from_maybe_shared(self.remote_url.clone())?;
let mut builder = Channel::builder(uri.clone());
if let Some(ref tls_config) = self.tls_config {
let cert_pem = std::fs::read_to_string(&tls_config.cert)?;
Expand All @@ -38,7 +38,8 @@ impl<C: Connector> RpcClientConfig<C> {
builder = builder.tls_config(tls_config)?;
}

let channel = builder.connect_with_connector_lazy(self.connector.map_err(Into::into));
let channel =
builder.connect_with_connector_lazy(self.connector.clone().map_err(Into::into));

Ok((channel, uri))
}
Expand Down
12 changes: 8 additions & 4 deletions libsql-server/src/connection/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::LIBSQL_PAGE_SIZE;
use crate::{namespace::NamespaceName, LIBSQL_PAGE_SIZE};
use bytesize::mb;
use url::Url;

Expand Down Expand Up @@ -28,7 +28,7 @@ pub struct DatabaseConfig {
#[serde(default)]
pub is_shared_schema: bool,
#[serde(default)]
pub shared_schema_name: Option<String>,
pub shared_schema_name: Option<NamespaceName>,
}

impl DatabaseConfig {
Expand Down Expand Up @@ -79,7 +79,11 @@ impl From<&metadata::DatabaseConfig> for DatabaseConfig {
allow_attach: value.allow_attach,
max_row_size: value.max_row_size.unwrap_or_else(default_max_row_size),
is_shared_schema: value.shared_schema.unwrap_or(false),
shared_schema_name: value.shared_schema_name.clone(),
// namespace name is coming from primary, we assume it's valid
shared_schema_name: value
.shared_schema_name
.clone()
.map(NamespaceName::new_unchecked),
}
}
}
Expand All @@ -98,7 +102,7 @@ impl From<&DatabaseConfig> for metadata::DatabaseConfig {
allow_attach: value.allow_attach,
max_row_size: Some(value.max_row_size),
shared_schema: Some(value.is_shared_schema),
shared_schema_name: value.shared_schema_name.clone(),
shared_schema_name: value.shared_schema_name.as_ref().map(|s| s.to_string()),
}
}
}
1 change: 0 additions & 1 deletion libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ where
self.make_connection().await
}
}

pub struct LibSqlConnection<T> {
inner: Arc<Mutex<Connection<T>>>,
}
Expand Down
60 changes: 38 additions & 22 deletions libsql-server/src/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use libsql_replication::rpc::replication::NAMESPACE_METADATA_KEY;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::time::{Duration, Instant};

Expand Down Expand Up @@ -210,6 +210,43 @@ pub trait MakeConnection: Send + Sync + 'static {
max_concurrent_requests,
)
}

fn map<F, T>(self, f: F) -> Map<Self, F>
where
F: Fn(Self::Connection) -> T + Send + Sync + 'static,
Self: Sized,
{
Map { inner: self, f }
}
}

pub struct Map<T, F> {
inner: T,
f: F,
}

#[async_trait::async_trait]
impl<F, T, O> MakeConnection for Map<T, F>
where
F: Fn(T::Connection) -> O + Send + Sync + 'static,
T: MakeConnection,
O: Connection,
{
type Connection = O;

async fn create(&self) -> Result<Self::Connection, Error> {
let conn = self.inner.create().await?;
Ok((self.f)(conn))
}
}

#[async_trait::async_trait]
impl<T: MakeConnection> MakeConnection for Arc<T> {
type Connection = T::Connection;

async fn create(&self) -> Result<Self::Connection, Error> {
self.as_ref().create().await
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -290,14 +327,6 @@ impl Drop for WaitersGuard<'_> {
}
}

fn now_millis() -> u64 {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64
}

#[async_trait::async_trait]
impl<F: MakeConnection> MakeConnection for MakeThrottledConnection<F> {
type Connection = TrackedConnection<F::Connection>;
Expand Down Expand Up @@ -341,7 +370,6 @@ impl<F: MakeConnection> MakeConnection for MakeThrottledConnection<F> {
Ok(TrackedConnection {
permit,
inner,
atime: AtomicU64::new(now_millis()),
created_at: Instant::now(),
})
}
Expand All @@ -352,7 +380,6 @@ pub struct TrackedConnection<DB> {
inner: DB,
#[allow(dead_code)] // just hold on to it
permit: tokio::sync::OwnedSemaphorePermit,
atime: AtomicU64,
created_at: Instant,
}

Expand All @@ -363,14 +390,6 @@ impl<T> Drop for TrackedConnection<T> {
}
}

impl<DB: Connection> TrackedConnection<DB> {
pub fn idle_time(&self) -> Duration {
let now = now_millis();
let atime = self.atime.load(Ordering::Relaxed);
Duration::from_millis(now.saturating_sub(atime))
}
}

#[async_trait::async_trait]
impl<DB: Connection> Connection for TrackedConnection<DB> {
#[inline]
Expand All @@ -381,7 +400,6 @@ impl<DB: Connection> Connection for TrackedConnection<DB> {
builder: B,
replication_index: Option<FrameNo>,
) -> crate::Result<B> {
self.atime.store(now_millis(), Ordering::Relaxed);
self.inner
.execute_program(pgm, ctx, builder, replication_index)
.await
Expand All @@ -394,7 +412,6 @@ impl<DB: Connection> Connection for TrackedConnection<DB> {
ctx: RequestContext,
replication_index: Option<FrameNo>,
) -> crate::Result<crate::Result<DescribeResponse>> {
self.atime.store(now_millis(), Ordering::Relaxed);
self.inner.describe(sql, ctx, replication_index).await
}

Expand All @@ -405,7 +422,6 @@ impl<DB: Connection> Connection for TrackedConnection<DB> {

#[inline]
async fn checkpoint(&self) -> Result<()> {
self.atime.store(now_millis(), Ordering::Relaxed);
self.inner.checkpoint().await
}

Expand Down
84 changes: 0 additions & 84 deletions libsql-server/src/database.rs

This file was deleted.

Loading

0 comments on commit 47eff78

Please sign in to comment.