From 95e14349793e6e35b3680bddc7bb54fdb2382187 Mon Sep 17 00:00:00 2001 From: Arvin Xu Date: Thu, 14 Mar 2024 06:14:11 +0000 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20fix:=20fix=20sync=20status?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package.json | 1 - src/app/api/plugin/store/route.ts | 2 +- src/app/chat/features/SyncStatusTag/index.tsx | 77 ++++---- src/const/settings.ts | 4 +- src/database/core/sync.ts | 186 +++++++++++++----- src/hooks/useSyncData.ts | 20 +- src/services/global.ts | 8 + src/store/global/slices/common/action.ts | 31 ++- .../global/slices/common/initialState.ts | 5 +- src/types/sync.ts | 7 +- 10 files changed, 249 insertions(+), 92 deletions(-) diff --git a/package.json b/package.json index 32e61b0ea22e2..855a33f69d913 100644 --- a/package.json +++ b/package.json @@ -146,7 +146,6 @@ "use-merge-value": "^1", "utility-types": "^3", "uuid": "^9", - "y-indexeddb": "^9.0.12", "y-webrtc": "^10.3.0", "yaml": "^2", "yjs": "^13.6.14", diff --git a/src/app/api/plugin/store/route.ts b/src/app/api/plugin/store/route.ts index afe198ba99bed..a203a2467f5b3 100644 --- a/src/app/api/plugin/store/route.ts +++ b/src/app/api/plugin/store/route.ts @@ -11,7 +11,7 @@ export const GET = async (req: Request) => { let res: Response; - res = await fetch(pluginStore.getPluginIndexUrl(locale as any)); + res = await fetch(pluginStore.getPluginIndexUrl(locale as any), { next: { revalidate: 3600 } }); if (res.status === 404) { res = await fetch(pluginStore.getPluginIndexUrl(DEFAULT_LANG)); diff --git a/src/app/chat/features/SyncStatusTag/index.tsx b/src/app/chat/features/SyncStatusTag/index.tsx index eca947d8db083..3f2f68b1e6a76 100644 --- a/src/app/chat/features/SyncStatusTag/index.tsx +++ b/src/app/chat/features/SyncStatusTag/index.tsx @@ -1,10 +1,10 @@ import { Avatar, Icon, Tag } from '@lobehub/ui'; -import { Tag as ATag, Badge, Button, Input, Popover, Typography } from 'antd'; -import { createStyles, useTheme } from 'antd-style'; +import { Tag as ATag, Badge, Button, Popover, Typography } from 'antd'; +import { useTheme } from 'antd-style'; import isEqual from 'fast-deep-equal'; import { - LucideCloud, LucideCloudCog, + LucideCloudy, LucideLaptop, LucideRefreshCw, LucideSmartphone, @@ -13,46 +13,57 @@ import Link from 'next/link'; import { memo } from 'react'; import { Flexbox } from 'react-layout-kit'; +import { useSyncEvent } from '@/hooks/useSyncData'; import { useGlobalStore } from '@/store/global'; -import { settingsSelectors } from '@/store/global/slices/settings/selectors'; +import { settingsSelectors } from '@/store/global/selectors'; -export const useStyles = createStyles(({ css, token }) => ({ - logo: css` - fill: ${token.colorText}; - `, - top: css` - position: sticky; - top: 0; - `, -})); +const text = { + ready: '已连接', + synced: '已同步', + syncing: '同步中', +} as const; const SyncStatusTag = memo(() => { - const [isSyncing, enableSync, channelName, setSettings] = useGlobalStore((s) => [ + const [syncStatus, isSyncing, enableSync, channelName] = useGlobalStore((s) => [ + s.syncStatus, s.syncStatus === 'syncing', s.syncEnabled, settingsSelectors.syncConfig(s).channelName, s.setSettings, ]); const users = useGlobalStore((s) => s.syncAwareness, isEqual); + const refreshConnection = useGlobalStore((s) => s.refreshConnection); + const syncEvent = useSyncEvent(); const theme = useTheme(); + return enableSync ? ( - - 频道: -
- { - setSettings({ sync: { channelName: e.target.value } }); - }} - size={'small'} - value={channelName} - variant={'borderless'} - /> -
+ + + 频道:{channelName} + + {/*
*/} + {/* {*/} + {/* setSettings({ sync: { channelName: e.target.value } });*/} + {/* }}*/} + {/* size={'small'}*/} + {/* value={channelName}*/} + {/* variant={'borderless'}*/} + {/* />*/} + {/*
*/}
{users.map((user) => ( @@ -87,16 +98,16 @@ const SyncStatusTag = memo(() => {
} - open + // open placement={'bottomLeft'} > - } + color={syncStatus !== 'synced' ? 'blue' : 'green'} + icon={} > - {isSyncing ? '同步中' : '已同步'} - + {text[syncStatus]} +
) : ( { - // 如果之前实例化过,则断开 - if (provider) { - provider.destroy(); + startDataSync = async (params: StartDataSyncParams) => { + // 开发时由于存在 fast refresh 全局实例会缓存在运行时中 + // 因此需要在每次重新连接时清理上一次的实例 + if (window.__ONLY_USE_FOR_CLEANUP_IN_DEV) { + this.cleanProvider(window.__ONLY_USE_FOR_CLEANUP_IN_DEV); } + this.connect(params); + }; + + connect = (params: StartDataSyncParams) => { + const { + channel, + onSyncEvent, + onSyncStatusChange, + user, + onAwarenessChange, + signaling = 'wss://y-webrtc-signaling.lobehub.com', + } = params; + console.log('[YJS] start to listen sync event...'); + this.initYjsObserve(onSyncEvent, onSyncStatusChange); + + console.log(`[WebRTC] init provider... room: ${channel.name}`); // clients connected to the same room-name share document updates - provider = new WebrtcProvider(channel.name, this.ydoc, { + this.provider = new WebrtcProvider(channel.name, this.ydoc, { password: channel.password, signaling: [signaling], }); - provider.on('synced', async ({ synced }) => { - console.log('WebrtcProvider', synced, this.getYMap('messages').size); - if (synced) { - onSync?.('syncing'); - await this.initSync(); - console.log('yjs init success', this.getYMap('messages').size); - onSync?.('synced'); - } - }); + // 只在开发时解决全局实例缓存问题 + if (process.env.NODE_ENV === 'development') { + window.__ONLY_USE_FOR_CLEANUP_IN_DEV = this.provider; + } + + const provider = this.provider; + + console.log(`[WebRTC] provider init success`); + // 当本地设备正确连接到 WebRTC Provider 后,触发 status 事件 + // 当开始连接,则开始监听事件 provider.on('status', async ({ connected }) => { - console.log('status', connected); - // 当开始连接,则初始化数据 + console.log('[WebRTC] peer connected status:', connected); if (connected) { - console.log('start Observe...'); - this.initObserve(onEvent); + // this.initObserve(onSyncEvent, onSyncStatusChange); + onSyncStatusChange?.('ready'); + } + }); + + // provider.on('peers', (peers) => { + // console.log(peers); + // }); + + // 当各方的数据均完成同步后,YJS 对象之间的数据已经一致时,触发 synced 事件 + provider.on('synced', async ({ synced }) => { + console.log('[WebRTC] peer sync status:', synced); + if (synced) { + console.groupCollapsed('[WebRTC] start to init yjs data...'); + onSyncStatusChange?.('syncing'); + await this.initSync(); + onSyncStatusChange?.('synced'); + console.groupEnd(); + console.log('[WebRTC] yjs data init success'); + } else { + console.log('[WebRTC] sync failed,try to reconnect...'); + await this.reconnect(params); } }); const awareness = provider.awareness; awareness.setLocalState({ clientID: awareness.clientID, user }); + onAwarenessChange?.([{ ...user, clientID: awareness.clientID, current: true }]); awareness.on('change', () => { const state = Array.from(awareness.getStates().values()).map((s) => ({ @@ -71,6 +104,30 @@ class SyncBus { onAwarenessChange?.(state); }); + + return provider; + }; + + reconnect = async (params: StartDataSyncParams) => { + this.cleanProvider(this.provider); + + this.connect(params); + }; + + private cleanProvider(provider: WebrtcProvider | null) { + if (provider) { + console.log(`[WebRTC] clean provider...`); + provider.room?.destroy(); + provider.awareness?.destroy(); + provider.destroy(); + console.log(`[WebRTC] clean success`); + console.log(`[WebRTC] -------------------`); + } + } + + manualSync = async () => { + console.log('[WebRTC] try to manual init sync...'); + await this.initSync(); }; getYMap = (tableKey: keyof LobeDBSchemaMap) => { @@ -79,28 +136,42 @@ class SyncBus { private initSync = async () => { await Promise.all( - ['sessions', 'sessionGroups', 'topics', 'messages', 'plugins'].map(async (tableKey) => { - return this.loadDataFromDBtoYjs(tableKey as keyof LobeDBSchemaMap); - }), + ['sessions', 'sessionGroups', 'topics', 'messages', 'plugins'].map(async (tableKey) => + this.loadDataFromDBtoYjs(tableKey as keyof LobeDBSchemaMap), + ), ); }; - private initObserve = (onEvent: OnSyncEvent) => { + private initYjsObserve = (onEvent: OnSyncEvent, onSyncStatusChange: OnSyncStatusChange) => { ['sessions', 'sessionGroups', 'topics', 'messages', 'plugins'].forEach((tableKey) => { // listen yjs change - this.observeYMapChange(tableKey as keyof LobeDBSchemaMap, onEvent); + this.observeYMapChange(tableKey as keyof LobeDBSchemaMap, onEvent, onSyncStatusChange); }); }; - private observeYMapChange = (tableKey: keyof LobeDBSchemaMap, onEvent: OnSyncEvent) => { + private observeYMapChange = ( + tableKey: keyof LobeDBSchemaMap, + onEvent: OnSyncEvent, + onSyncStatusChange: OnSyncStatusChange, + ) => { const table = LocalDBInstance[tableKey]; const yItemMap = this.getYMap(tableKey); + const updateSyncEvent = throttle(onEvent, 1000); + + // 定义一个变量来保存定时器的ID + // eslint-disable-next-line no-undef + let debounceTimer: NodeJS.Timeout; yItemMap.observe(async (event) => { // abort local change if (event.transaction.local) return; - console.log(`observe ${tableKey} changes:`, event.keysChanged.size); + // 每次有变更时,都先清除之前的定时器(如果有的话),然后设置新的定时器 + clearTimeout(debounceTimer); + + onSyncStatusChange('syncing'); + + console.log(`[YJS] observe ${tableKey} changes:`, event.keysChanged.size); const pools = Array.from(event.keys).map(async ([id, payload]) => { const item: any = yItemMap.get(id); @@ -124,7 +195,13 @@ class SyncBus { }); await Promise.all(pools); - onEvent?.(tableKey); + + updateSyncEvent(tableKey); + + // 设置定时器,500ms 后更新状态为'synced' + debounceTimer = setTimeout(() => { + onSyncStatusChange('synced'); + }, 1000); }); }; @@ -133,10 +210,31 @@ class SyncBus { const items = await table.toArray(); const yItemMap = this.getYMap(tableKey); - items.forEach((item) => { - // TODO: 需要改表,所有 table 都需要有 id 字段 - yItemMap.set(item.id || (item as any).identifier, item); - }); + // 定义每批次最多包含的数据条数 + const batchSize = 50; + + // 计算总批次数 + const totalBatches = Math.ceil(items.length / batchSize); + + for (let i = 0; i < totalBatches; i++) { + // 计算当前批次的起始和结束索引 + const start = i * batchSize; + const end = start + batchSize; + + // 获取当前批次的数据 + const batchItems = items.slice(start, end); + + // 将当前批次的数据推送到 Yjs 中 + + this.ydoc.transact(() => { + batchItems.forEach((item) => { + // TODO: 需要改表,所有 table 都需要有 id 字段 + yItemMap.set(item.id || (item as any).identifier, item); + }); + }); + } + + console.log('[DB]:', tableKey, yItemMap.size); }; } diff --git a/src/hooks/useSyncData.ts b/src/hooks/useSyncData.ts index d63ff1f4b65aa..e44660575782d 100644 --- a/src/hooks/useSyncData.ts +++ b/src/hooks/useSyncData.ts @@ -1,14 +1,16 @@ +import { useCallback } from 'react'; + import { useChatStore } from '@/store/chat'; import { useGlobalStore } from '@/store/global'; import { useSessionStore } from '@/store/session'; -export const useEnabledDataSync = () => { - const [userId, useEnabledSync] = useGlobalStore((s) => [s.userId, s.useEnabledSync]); - +export const useSyncEvent = () => { const [refreshMessages, refreshTopic] = useChatStore((s) => [s.refreshMessages, s.refreshTopic]); const [refreshSessions] = useSessionStore((s) => [s.refreshSessions]); - useEnabledSync(userId, (tableKey) => { + return useCallback((tableKey: string) => { + // console.log('triggerSync Event:', tableKey); + switch (tableKey) { case 'messages': { refreshMessages(); @@ -29,5 +31,13 @@ export const useEnabledDataSync = () => { break; } } - }); + }, []); +}; + +export const useEnabledDataSync = () => { + const [userId, useEnabledSync] = useGlobalStore((s) => [s.userId, s.useEnabledSync]); + + const syncEvent = useSyncEvent(); + + useEnabledSync(userId, syncEvent); }; diff --git a/src/services/global.ts b/src/services/global.ts index 4aaa1c9fcbd61..7d6324414a1ea 100644 --- a/src/services/global.ts +++ b/src/services/global.ts @@ -29,6 +29,14 @@ class GlobalService { await syncBus.startDataSync(params); return true; }; + + reconnect = async (params: StartDataSyncParams) => { + if (typeof window === 'undefined') return false; + + await syncBus.reconnect(params); + await syncBus.manualSync(); + return true; + }; } export const globalService = new GlobalService(); diff --git a/src/store/global/slices/common/action.ts b/src/store/global/slices/common/action.ts index 143a2ea162c5a..20421b03e7721 100644 --- a/src/store/global/slices/common/action.ts +++ b/src/store/global/slices/common/action.ts @@ -26,6 +26,7 @@ const n = setNamespace('common'); * 设置操作 */ export interface CommonAction { + refreshConnection: (onEvent: OnSyncEvent) => Promise; refreshUserConfig: () => Promise; switchBackToChat: (sessionId?: string) => void; updateAvatar: (avatar: string) => Promise; @@ -44,6 +45,31 @@ export const createCommonSlice: StateCreator< [], CommonAction > = (set, get) => ({ + refreshConnection: async (onEvent) => { + const sync = settingsSelectors.syncConfig(get()); + if (!sync.channelName) return; + + await globalService.reconnect({ + channel: { + name: sync.channelName, + password: sync.channelPassword, + }, + onAwarenessChange(state) { + set({ syncAwareness: state }); + }, + onSyncEvent: onEvent, + onSyncStatusChange: (status) => { + set({ syncStatus: status }); + }, + signaling: sync.signaling, + user: { + // name: userId, + id: get().userId!, + ...browserInfo, + }, + }); + }, + refreshUserConfig: async () => { await mutate([USER_CONFIG_FETCH_KEY, true]); }, @@ -51,7 +77,6 @@ export const createCommonSlice: StateCreator< switchBackToChat: (sessionId) => { get().router?.push(SESSION_CHAT_URL(sessionId || INBOX_SESSION_ID, get().isMobile)); }, - updateAvatar: async (avatar) => { await userService.updateAvatar(avatar); await get().refreshUserConfig(); @@ -99,8 +124,8 @@ export const createCommonSlice: StateCreator< onAwarenessChange(state) { set({ syncAwareness: state }); }, - onEvent, - onSync: (status) => { + onSyncEvent: onEvent, + onSyncStatusChange: (status) => { set({ syncStatus: status }); }, signaling: sync.signaling, diff --git a/src/store/global/slices/common/initialState.ts b/src/store/global/slices/common/initialState.ts index 32fef0301afa7..72c31bf20e0e4 100644 --- a/src/store/global/slices/common/initialState.ts +++ b/src/store/global/slices/common/initialState.ts @@ -1,6 +1,6 @@ import { AppRouterInstance } from 'next/dist/shared/lib/app-router-context.shared-runtime'; -import { SyncAwarenessState } from '@/types/sync'; +import { PeerSyncStatus, SyncAwarenessState } from '@/types/sync'; export enum SidebarTabKey { Chat = 'chat', @@ -30,7 +30,7 @@ export interface GlobalCommonState { sidebarKey: SidebarTabKey; syncAwareness: SyncAwarenessState[]; syncEnabled: boolean; - syncStatus?: 'syncing' | 'synced' | 'hold'; + syncStatus: PeerSyncStatus; } export const initialCommonState: GlobalCommonState = { @@ -38,4 +38,5 @@ export const initialCommonState: GlobalCommonState = { sidebarKey: SidebarTabKey.Chat, syncAwareness: [], syncEnabled: false, + syncStatus: 'ready', }; diff --git a/src/types/sync.ts b/src/types/sync.ts index 774e66437fa5a..4d5bfed108fc9 100644 --- a/src/types/sync.ts +++ b/src/types/sync.ts @@ -1,6 +1,9 @@ import { LobeDBSchemaMap } from '@/database/core/db'; export type OnSyncEvent = (tableKey: keyof LobeDBSchemaMap) => void; +export type OnSyncStatusChange = (status: PeerSyncStatus) => void; + +export type PeerSyncStatus = 'syncing' | 'synced' | 'ready'; export interface StartDataSyncParams { channel: { @@ -8,8 +11,8 @@ export interface StartDataSyncParams { password?: string; }; onAwarenessChange: (state: SyncAwarenessState[]) => void; - onEvent: OnSyncEvent; - onSync: (status: 'syncing' | 'synced') => void; + onSyncEvent: OnSyncEvent; + onSyncStatusChange: OnSyncStatusChange; signaling?: string; user: SyncUserInfo; }