Skip to content

Commit

Permalink
refactor: crate folder
Browse files Browse the repository at this point in the history
  • Loading branch information
appflowy committed Mar 28, 2024
1 parent d30cf93 commit be3607c
Show file tree
Hide file tree
Showing 26 changed files with 890 additions and 969 deletions.
2 changes: 0 additions & 2 deletions libs/client-api/src/collab_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,3 @@ pub use plugin::*;
pub use sink::*;
pub use sink_config::*;
pub use sync_control::*;

pub use collab_rt_entity::collab_msg;
2 changes: 1 addition & 1 deletion libs/client-api/src/collab_sync/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use collab::core::collab_state::SyncState;
use collab::core::origin::CollabOrigin;
use collab::preclude::{Collab, CollabPlugin};
use collab_entity::{CollabObject, CollabType};
use collab_rt_entity::collab_msg::{ClientCollabMessage, ServerCollabMessage, UpdateSync};
use collab_rt_entity::{ClientCollabMessage, ServerCollabMessage, UpdateSync};
use collab_rt_protocol::{Message, SyncMessage};
use futures_util::SinkExt;
use tokio_stream::StreamExt;
Expand Down
8 changes: 4 additions & 4 deletions libs/client-api/src/collab_sync/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::collab_sync::sink_queue::{QueueItem, SinkQueue};
use crate::collab_sync::{check_update_contiguous, SyncError, SyncObject};
use futures_util::SinkExt;

use collab_rt_entity::collab_msg::{CollabSinkMessage, MsgId, ServerCollabMessage};
use collab_rt_entity::{MsgId, ServerCollabMessage, SinkMessage};
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
Expand Down Expand Up @@ -78,7 +78,7 @@ impl<E, Sink, Msg> CollabSink<Sink, Msg>
where
E: Into<anyhow::Error> + Send + Sync + 'static,
Sink: SinkExt<Vec<Msg>, Error = E> + Send + Sync + Unpin + 'static,
Msg: CollabSinkMessage,
Msg: SinkMessage,
{
pub fn new(
uid: i64,
Expand Down Expand Up @@ -416,7 +416,7 @@ fn get_next_batch_item<Msg>(
msg_queue: &mut SinkQueue<Msg>,
) -> Vec<QueueItem<Msg>>
where
Msg: CollabSinkMessage,
Msg: SinkMessage,
{
let mut next_sending_items = vec![];
let mut requeue_items = vec![];
Expand Down Expand Up @@ -485,7 +485,7 @@ impl<Msg> CollabSinkRunner<Msg> {
) where
E: Into<anyhow::Error> + Send + Sync + 'static,
Sink: SinkExt<Vec<Msg>, Error = E> + Send + Sync + Unpin + 'static,
Msg: CollabSinkMessage,
Msg: SinkMessage,
{
if let Some(sink) = weak_sink.upgrade() {
sink.notify();
Expand Down
15 changes: 6 additions & 9 deletions libs/client-api/src/collab_sync/sink_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::{Deref, DerefMut};

use collab_rt_entity::collab_msg::{CollabSinkMessage, MsgId};
use collab_rt_entity::{MsgId, SinkMessage};

pub(crate) struct SinkQueue<Msg> {
#[allow(dead_code)]
Expand All @@ -13,7 +13,7 @@ pub(crate) struct SinkQueue<Msg> {

impl<Msg> SinkQueue<Msg>
where
Msg: CollabSinkMessage,
Msg: SinkMessage,
{
pub(crate) fn new(uid: i64) -> Self {
Self {
Expand All @@ -29,7 +29,7 @@ where

impl<Msg> Deref for SinkQueue<Msg>
where
Msg: CollabSinkMessage,
Msg: SinkMessage,
{
type Target = BinaryHeap<QueueItem<Msg>>;

Expand All @@ -40,7 +40,7 @@ where

impl<Msg> DerefMut for SinkQueue<Msg>
where
Msg: CollabSinkMessage,
Msg: SinkMessage,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.queue
Expand All @@ -56,7 +56,7 @@ pub(crate) struct QueueItem<Msg> {

impl<Msg> QueueItem<Msg>
where
Msg: CollabSinkMessage,
Msg: SinkMessage,
{
pub fn new(msg: Msg, msg_id: MsgId) -> Self {
Self { inner: msg, msg_id }
Expand All @@ -77,11 +77,8 @@ where

impl<Msg> QueueItem<Msg>
where
Msg: CollabSinkMessage,
Msg: SinkMessage,
{
pub fn mergeable(&self) -> bool {
self.inner.mergeable()
}
pub fn merge(&mut self, other: &Self, max_size: &usize) -> Result<bool, Error> {
self.inner.merge(other.message(), max_size)
}
Expand Down
2 changes: 1 addition & 1 deletion libs/client-api/src/collab_sync/sync_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use collab::core::origin::CollabOrigin;
use collab::preclude::Collab;

use anyhow::anyhow;
use collab_rt_entity::collab_msg::{
use collab_rt_entity::{
AckCode, BroadcastSync, ClientCollabMessage, InitSync, ServerCollabMessage, ServerInit,
UpdateSync,
};
Expand Down
2 changes: 1 addition & 1 deletion libs/client-api/src/native/http_native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{RefreshTokenAction, RefreshTokenRetryCondition};
use anyhow::anyhow;
use app_error::AppError;
use async_trait::async_trait;
use collab_rt_entity::realtime_proto::HttpRealtimeMessage;
use collab_rt_entity::HttpRealtimeMessage;
use database_entity::dto::CollabParams;
use futures_util::stream;
use prost::Message;
Expand Down
10 changes: 5 additions & 5 deletions libs/client-api/src/ws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::broadcast::{channel, Receiver, Sender};

use crate::ws::msg_queue::{AggregateMessageQueue, AggregateMessagesReceiver};
use crate::ws::{ConnectState, ConnectStateNotify, WSError, WebSocketChannel};
use crate::ServerFixIntervalPing;
use crate::{af_spawn, retry_connect};
use collab_rt_entity::collab_msg::{ClientCollabMessage, ServerCollabMessage};
use collab_rt_entity::message::{RealtimeMessage, SystemMessage};
use collab_rt_entity::user::UserMessage;

use crate::ws::msg_queue::{AggregateMessageQueue, AggregateMessagesReceiver};
use client_websocket::{CloseCode, CloseFrame, Message, WebSocketStream};
use collab_rt_entity::user::UserMessage;
use collab_rt_entity::ClientCollabMessage;
use collab_rt_entity::ServerCollabMessage;
use collab_rt_entity::{RealtimeMessage, SystemMessage};
use tokio::sync::{oneshot, Mutex};
use tracing::{error, info, trace, warn};

Expand Down
4 changes: 2 additions & 2 deletions libs/client-api/src/ws/handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::af_spawn;
use collab_rt_entity::collab_msg::ClientCollabMessage;
use collab_rt_entity::message::RealtimeMessage;
use collab_rt_entity::ClientCollabMessage;
use collab_rt_entity::RealtimeMessage;
use futures_util::Sink;
use std::fmt::Debug;
use std::pin::Pin;
Expand Down
4 changes: 2 additions & 2 deletions libs/client-api/src/ws/msg_queue.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use client_websocket::Message;
use collab_rt_entity::collab_msg::{ClientCollabMessage, MsgId};
use collab_rt_entity::message::RealtimeMessage;
use collab_rt_entity::RealtimeMessage;
use collab_rt_entity::{ClientCollabMessage, MsgId};
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::{Arc, Weak};
use std::time::Duration;
Expand Down
Loading

0 comments on commit be3607c

Please sign in to comment.