From d1eb86c2a58fe377ff7396f13de68e908bc63f26 Mon Sep 17 00:00:00 2001 From: arvinxx Date: Tue, 12 Mar 2024 00:26:43 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=A7=20wip:=20sync=20with=20local?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../chat/(desktop)/features/SessionHeader.tsx | 13 +- src/database/core/db.ts | 2 +- src/libs/sync/index.ts | 120 ++++++++++++++++++ src/services/global.ts | 5 + src/store/global/slices/common/action.ts | 9 +- .../global/slices/common/initialState.ts | 2 + 6 files changed, 148 insertions(+), 3 deletions(-) create mode 100644 src/libs/sync/index.ts diff --git a/src/app/chat/(desktop)/features/SessionHeader.tsx b/src/app/chat/(desktop)/features/SessionHeader.tsx index 2c40470f6eb49..504d1d1a26650 100644 --- a/src/app/chat/(desktop)/features/SessionHeader.tsx +++ b/src/app/chat/(desktop)/features/SessionHeader.tsx @@ -1,11 +1,14 @@ import { ActionIcon, Logo } from '@lobehub/ui'; +import { Tag } from 'antd'; import { createStyles } from 'antd-style'; import { MessageSquarePlus } from 'lucide-react'; import { memo } from 'react'; import { useTranslation } from 'react-i18next'; import { Flexbox } from 'react-layout-kit'; +import useSWR from 'swr'; import { DESKTOP_HEADER_ICON_SIZE } from '@/const/layoutTokens'; +import { useGlobalStore } from '@/store/global'; import { useSessionStore } from '@/store/session'; import SessionSearchBar from '../../features/SessionSearchBar'; @@ -24,11 +27,19 @@ const Header = memo(() => { const { styles } = useStyles(); const { t } = useTranslation('chat'); const [createSession] = useSessionStore((s) => [s.createSession]); + const [syncEnabled, enabledSync] = useGlobalStore((s) => [s.syncEnabled, s.enabledSync]); + + useSWR('enableSync', enabledSync, { revalidateOnFocus: false }); return ( - + + + + 同步 + + createSession()} diff --git a/src/database/core/db.ts b/src/database/core/db.ts index e059c6141ab69..6f292d9596b91 100644 --- a/src/database/core/db.ts +++ b/src/database/core/db.ts @@ -13,7 +13,7 @@ import { migrateSettingsToUser } from './migrations/migrateSettingsToUser'; import { dbSchemaV1, dbSchemaV2, dbSchemaV3, dbSchemaV4, dbSchemaV5, dbSchemaV6 } from './schemas'; import { DBModel, LOBE_CHAT_LOCAL_DB_NAME } from './types/db'; -interface LobeDBSchemaMap { +export interface LobeDBSchemaMap { files: DB_File; messages: DB_Message; plugins: DB_Plugin; diff --git a/src/libs/sync/index.ts b/src/libs/sync/index.ts new file mode 100644 index 0000000000000..d573ad4279200 --- /dev/null +++ b/src/libs/sync/index.ts @@ -0,0 +1,120 @@ +import isEqual from 'fast-deep-equal'; +import { IndexeddbPersistence } from 'y-indexeddb'; +import { WebrtcProvider } from 'y-webrtc'; +import * as Y from 'yjs'; +import { Doc, Map } from 'yjs'; + +import { LocalDBInstance } from '@/database/core'; +import { LobeDBSchemaMap } from '@/database/core/db'; + +interface StartDataSyncParams { + name?: string; + onInit?: () => void; + password?: string; +} +class SyncBus { + private ydoc: Doc; + + constructor() { + this.ydoc = new Y.Doc(); + } + + startDataSync = async ({ name, password, onInit }: StartDataSyncParams) => { + // this.loadDataFromDBtoYjs('users'); + // if need file should dependon the file module + // this.loadDataFromDBtoYjs('files'); + + console.log('start init yjs...'); + await Promise.all([ + this.loadDataFromDBtoYjs('sessions'), + this.loadDataFromDBtoYjs('sessionGroups'), + this.loadDataFromDBtoYjs('topics'), + this.loadDataFromDBtoYjs('messages'), + this.loadDataFromDBtoYjs('plugins'), + ]); + onInit?.(); + console.log('yjs init success'); + + // clients connected to the same room-name share document updates + const provider = new WebrtcProvider(name || 'abc', this.ydoc, { + password: password, + }); + + const persistence = new IndexeddbPersistence('lobechat-data-sync', this.ydoc); + + provider.on('synced', () => { + console.log('WebrtcProvider: synced'); + }); + + persistence.on('synced', () => { + console.log('IndexeddbPersistence: synced'); + }); + }; + + internalUpdateYMap = (ymap: Map, key: string, item: any) => { + ymap.set(key, { ...item, _internalUpdate: true }); + }; + + loadDataFromDBtoYjs = async (tableKey: keyof LobeDBSchemaMap) => { + const table = LocalDBInstance[tableKey]; + const items = await table.toArray(); + const yItemMap = this.ydoc.getMap(tableKey); + items.forEach((item) => { + this.internalUpdateYMap(yItemMap, item.id, item); + }); + + table.hook('creating', (primaryKey, item) => { + console.log(tableKey, 'creating', primaryKey, item); + yItemMap.set(primaryKey, item); + }); + table.hook('updating', (item, primaryKey) => { + console.log('[DB]', tableKey, 'updating', primaryKey, item); + yItemMap.set(primaryKey, item); + }); + table.hook('deleting', (primaryKey) => { + console.log(tableKey, 'deleting', primaryKey); + yItemMap.delete(primaryKey); + }); + + yItemMap.observe(async (event) => { + // abort local change + if (event.transaction.local) return; + + console.log(tableKey, ':', event.keysChanged); + const pools = Array.from(event.keys).map(async ([id, payload]) => { + const item: any = yItemMap.get(id); + + if (item._internalUpdate) { + return; + } + + switch (payload.action) { + case 'add': { + console.log('新增:', payload); + + break; + } + case 'update': { + console.log(id, payload.newValue, payload.oldValue); + const item: any = yItemMap.get(id); + console.log('nextValue', item); + const current = await table.get(id); + + // 如果相等则不更新 + if (isEqual(item, current)) return; + + await table.update(id, item); + break; + } + case 'delete': { + break; + } + } + }); + + await Promise.all(pools); + }); + }; +} + +export const syncBus = new SyncBus(); diff --git a/src/services/global.ts b/src/services/global.ts index 12d6618c7de4f..62e252d1b5ad1 100644 --- a/src/services/global.ts +++ b/src/services/global.ts @@ -1,3 +1,4 @@ +import { syncBus } from '@/libs/sync'; import { GlobalServerConfig } from '@/types/settings'; import { API_ENDPOINTS } from './_url'; @@ -20,6 +21,10 @@ class GlobalService { return res.json(); }; + + enabledSync = async () => { + await syncBus.startDataSync({ name: 'abc' }); + }; } export const globalService = new GlobalService(); diff --git a/src/store/global/slices/common/action.ts b/src/store/global/slices/common/action.ts index e7d52de0a0352..58058b362398c 100644 --- a/src/store/global/slices/common/action.ts +++ b/src/store/global/slices/common/action.ts @@ -24,6 +24,7 @@ const n = setNamespace('common'); * 设置操作 */ export interface CommonAction { + enabledSync: () => Promise; refreshUserConfig: () => Promise; switchBackToChat: (sessionId?: string) => void; updateAvatar: (avatar: string) => Promise; @@ -41,13 +42,19 @@ export const createCommonSlice: StateCreator< [], CommonAction > = (set, get) => ({ + enabledSync: async () => { + set({ syncEnabled: false }); + await globalService.enabledSync(); + set({ syncEnabled: true }); + }, + refreshUserConfig: async () => { await mutate([USER_CONFIG_FETCH_KEY, true]); }, + switchBackToChat: (sessionId) => { get().router?.push(SESSION_CHAT_URL(sessionId || INBOX_SESSION_ID, get().isMobile)); }, - updateAvatar: async (avatar) => { await userService.updateAvatar(avatar); await get().refreshUserConfig(); diff --git a/src/store/global/slices/common/initialState.ts b/src/store/global/slices/common/initialState.ts index dcdb2b828e3bc..514d6be9fc6be 100644 --- a/src/store/global/slices/common/initialState.ts +++ b/src/store/global/slices/common/initialState.ts @@ -25,9 +25,11 @@ export interface GlobalCommonState { latestVersion?: string; router?: AppRouterInstance; sidebarKey: SidebarTabKey; + syncEnabled: boolean; } export const initialCommonState: GlobalCommonState = { isMobile: false, sidebarKey: SidebarTabKey.Chat, + syncEnabled: false, };