diff --git a/package.json b/package.json index 8cc7089e8705b..4d637453c5557 100644 --- a/package.json +++ b/package.json @@ -146,7 +146,9 @@ "use-merge-value": "^1", "utility-types": "^3", "uuid": "^9", + "y-webrtc": "^10.3.0", "yaml": "^2", + "yjs": "^13.6.14", "zod": "^3", "zustand": "^4.5.2", "zustand-utils": "^1.3.2" diff --git a/src/app/chat/(desktop)/features/SessionHeader.tsx b/src/app/chat/(desktop)/features/SessionHeader.tsx index 2c40470f6eb49..0eec8fd2c53eb 100644 --- a/src/app/chat/(desktop)/features/SessionHeader.tsx +++ b/src/app/chat/(desktop)/features/SessionHeader.tsx @@ -9,6 +9,7 @@ import { DESKTOP_HEADER_ICON_SIZE } from '@/const/layoutTokens'; import { useSessionStore } from '@/store/session'; import SessionSearchBar from '../../features/SessionSearchBar'; +import SyncStatusTag from '../../features/SyncStatusTag'; export const useStyles = createStyles(({ css, token }) => ({ logo: css` @@ -28,7 +29,10 @@ const Header = memo(() => { return ( - + + + + createSession()} diff --git a/src/app/chat/features/SyncStatusTag/index.tsx b/src/app/chat/features/SyncStatusTag/index.tsx new file mode 100644 index 0000000000000..3f2f68b1e6a76 --- /dev/null +++ b/src/app/chat/features/SyncStatusTag/index.tsx @@ -0,0 +1,136 @@ +import { Avatar, Icon, Tag } from '@lobehub/ui'; +import { Tag as ATag, Badge, Button, Popover, Typography } from 'antd'; +import { useTheme } from 'antd-style'; +import isEqual from 'fast-deep-equal'; +import { + LucideCloudCog, + LucideCloudy, + LucideLaptop, + LucideRefreshCw, + LucideSmartphone, +} from 'lucide-react'; +import Link from 'next/link'; +import { memo } from 'react'; +import { Flexbox } from 'react-layout-kit'; + +import { useSyncEvent } from '@/hooks/useSyncData'; +import { useGlobalStore } from '@/store/global'; +import { settingsSelectors } from '@/store/global/selectors'; + +const text = { + ready: '已连接', + synced: '已同步', + syncing: '同步中', +} as const; + +const SyncStatusTag = memo(() => { + const [syncStatus, isSyncing, enableSync, channelName] = useGlobalStore((s) => [ + s.syncStatus, + s.syncStatus === 'syncing', + s.syncEnabled, + settingsSelectors.syncConfig(s).channelName, + s.setSettings, + ]); + const users = useGlobalStore((s) => s.syncAwareness, isEqual); + const refreshConnection = useGlobalStore((s) => s.refreshConnection); + const syncEvent = useSyncEvent(); + + const theme = useTheme(); + + return enableSync ? ( + + + 频道:{channelName} + + {/*
*/} + {/* {*/} + {/* setSettings({ sync: { channelName: e.target.value } });*/} + {/* }}*/} + {/* size={'small'}*/} + {/* value={channelName}*/} + {/* variant={'borderless'}*/} + {/* />*/} + {/*
*/} +
+ + {users.map((user) => ( + + + } + background={theme.purple1} + shape={'square'} + /> + + + + {user.name || user.id} + {user.current && ( + + current + + )} + + + {user.device} · {user.os} · {user.browser} + + + + ))} + +
+ } + // open + placement={'bottomLeft'} + > + } + > + {text[syncStatus]} + + + ) : ( + + 会话数据仅存储于当前使用的浏览器。如果使用不同浏览器打开时,数据不会互相同步。如你需要在多个设备间同步数据,请配置并开启云端同步。 + + + +
+ } + placement={'bottomLeft'} + > +
+ + 本地 + +
+ + ); +}); + +export default SyncStatusTag; diff --git a/src/app/settings/features/SettingList/index.tsx b/src/app/settings/features/SettingList/index.tsx index 5baed54d790dd..795ff7ba2a47c 100644 --- a/src/app/settings/features/SettingList/index.tsx +++ b/src/app/settings/features/SettingList/index.tsx @@ -1,4 +1,4 @@ -import { Bot, Info, Mic2, Settings2, Webhook } from 'lucide-react'; +import { Bot, Cloudy, Info, Mic2, Settings2, Webhook } from 'lucide-react'; import Link from 'next/link'; import { memo } from 'react'; import { useTranslation } from 'react-i18next'; @@ -17,6 +17,7 @@ const SettingList = memo(({ activeTab, mobile }) => { const items = [ { icon: Settings2, label: t('tab.common'), value: SettingsTabs.Common }, + { icon: Cloudy, label: t('tab.sync'), value: SettingsTabs.Sync }, { icon: Webhook, label: t('tab.llm'), value: SettingsTabs.LLM }, { icon: Mic2, label: t('tab.tts'), value: SettingsTabs.TTS }, { icon: Bot, label: t('tab.agent'), value: SettingsTabs.Agent }, diff --git a/src/app/settings/sync/Sync.tsx b/src/app/settings/sync/Sync.tsx new file mode 100644 index 0000000000000..5964bfb8dfea5 --- /dev/null +++ b/src/app/settings/sync/Sync.tsx @@ -0,0 +1,49 @@ +import { SiWebrtc } from '@icons-pack/react-simple-icons'; +import { Form, type ItemGroup } from '@lobehub/ui'; +import { Form as AntForm, Input } from 'antd'; +import isEqual from 'fast-deep-equal'; +import { memo } from 'react'; +import { useTranslation } from 'react-i18next'; + +import { FORM_STYLE } from '@/const/layoutTokens'; +import { useGlobalStore } from '@/store/global'; +import { settingsSelectors } from '@/store/global/selectors'; + +import { useSyncSettings } from '../hooks/useSyncSettings'; + +type SettingItemGroup = ItemGroup; + +const Theme = memo(() => { + const { t } = useTranslation('setting'); + const [form] = AntForm.useForm(); + + const settings = useGlobalStore(settingsSelectors.currentSettings, isEqual); + const [setSettings] = useGlobalStore((s) => [s.setSettings]); + + useSyncSettings(form); + + const config: SettingItemGroup = { + children: [ + { + children: , + desc: t('sync.webrtc.channelName.desc'), + label: t('sync.webrtc.channelName.title'), + name: ['sync', 'channelName'], + }, + ], + icon: SiWebrtc, + title: t('sync.webrtc.title'), + }; + + return ( +
+ ); +}); + +export default Theme; diff --git a/src/app/settings/sync/layout.tsx b/src/app/settings/sync/layout.tsx new file mode 100644 index 0000000000000..4693b7724472e --- /dev/null +++ b/src/app/settings/sync/layout.tsx @@ -0,0 +1,9 @@ +import { PropsWithChildren } from 'react'; + +import { SettingsTabs } from '@/store/global/initialState'; + +import SettingLayout from '../layout.server'; + +export default ({ children }: PropsWithChildren) => { + return {children}; +}; diff --git a/src/app/settings/sync/page.tsx b/src/app/settings/sync/page.tsx new file mode 100644 index 0000000000000..5a863a9a4f061 --- /dev/null +++ b/src/app/settings/sync/page.tsx @@ -0,0 +1,18 @@ +'use client'; + +import { memo } from 'react'; +import { useTranslation } from 'react-i18next'; + +import PageTitle from '@/components/PageTitle'; + +import Sync from './Sync'; + +export default memo(() => { + const { t } = useTranslation('setting'); + return ( + <> + + + + ); +}); diff --git a/src/const/settings.ts b/src/const/settings.ts index 20d0d4f136d3e..c60be8b013794 100644 --- a/src/const/settings.ts +++ b/src/const/settings.ts @@ -6,6 +6,7 @@ import { GlobalDefaultAgent, GlobalLLMConfig, GlobalSettings, + GlobalSyncSettings, GlobalTTSConfig, } from '@/types/settings'; @@ -113,9 +114,14 @@ export const DEFAULT_TOOL_CONFIG = { }, }; +const DEFAULT_SYNC_CONFIG: GlobalSyncSettings = { + channelName: 'atc', +}; + export const DEFAULT_SETTINGS: GlobalSettings = { defaultAgent: DEFAULT_AGENT, languageModel: DEFAULT_LLM_CONFIG, + sync: DEFAULT_SYNC_CONFIG, tool: DEFAULT_TOOL_CONFIG, tts: DEFAULT_TTS_CONFIG, ...DEFAULT_BASE_SETTINGS, diff --git a/src/database/core/__tests__/model.test.ts b/src/database/core/__tests__/model.test.ts index b79c2a6d2a2f4..75be5d261da70 100644 --- a/src/database/core/__tests__/model.test.ts +++ b/src/database/core/__tests__/model.test.ts @@ -37,7 +37,7 @@ describe('BaseModel', () => { content: 'Hello, World!', }; - const result = await baseModel['_add'](validData); + const result = await baseModel['_addWithSync'](validData); expect(result).toHaveProperty('id'); expect(console.error).not.toHaveBeenCalled(); @@ -49,7 +49,7 @@ describe('BaseModel', () => { content: 'Hello, World!', }; - await expect(baseModel['_add'](invalidData)).rejects.toThrow(TypeError); + await expect(baseModel['_addWithSync'](invalidData)).rejects.toThrow(TypeError); }); }); }); diff --git a/src/database/core/db.ts b/src/database/core/db.ts index e059c6141ab69..6f292d9596b91 100644 --- a/src/database/core/db.ts +++ b/src/database/core/db.ts @@ -13,7 +13,7 @@ import { migrateSettingsToUser } from './migrations/migrateSettingsToUser'; import { dbSchemaV1, dbSchemaV2, dbSchemaV3, dbSchemaV4, dbSchemaV5, dbSchemaV6 } from './schemas'; import { DBModel, LOBE_CHAT_LOCAL_DB_NAME } from './types/db'; -interface LobeDBSchemaMap { +export interface LobeDBSchemaMap { files: DB_File; messages: DB_Message; plugins: DB_Plugin; diff --git a/src/database/core/index.ts b/src/database/core/index.ts index a344f4ab2fbc0..f686e56c091b0 100644 --- a/src/database/core/index.ts +++ b/src/database/core/index.ts @@ -1,2 +1,3 @@ export { LocalDBInstance } from './db'; export * from './model'; +export { syncBus } from './sync'; diff --git a/src/database/core/model.ts b/src/database/core/model.ts index 42d003b44c541..22edc55b02376 100644 --- a/src/database/core/model.ts +++ b/src/database/core/model.ts @@ -5,6 +5,7 @@ import { DBBaseFieldsSchema } from '@/database/core/types/db'; import { nanoid } from '@/utils/uuid'; import { LocalDB, LocalDBInstance, LocalDBSchema } from './db'; +import { syncBus } from './sync'; export class BaseModel { protected readonly db: LocalDB; @@ -21,10 +22,14 @@ export class BaseModel( + protected async _addWithSync( data: T, id: string | number = nanoid(), primaryKey: string = 'id', @@ -51,6 +56,9 @@ export class BaseModel { + await this.updateYMapItem(item.id); + }); + await Promise.all(pools); return { added: validatedData.length, @@ -144,7 +156,7 @@ export class BaseModel) { + protected async _updateWithSync(id: string, data: Partial) { // we need to check whether the data is valid // pick data related schema from the full schema const keys = Object.keys(data); @@ -162,6 +174,37 @@ export class BaseModel { + this.yMap.delete(id); + }); + } + + protected async _clearWithSync() { + const result = await this.table.clear(); + // sync clear data to yjs data map + this.yMap.clear(); + return result; + } + + private updateYMapItem = async (id: string) => { + const newData = await this.table.get(id); + this.yMap.set(id, newData); + }; } diff --git a/src/database/core/sync.ts b/src/database/core/sync.ts new file mode 100644 index 0000000000000..a4e7274fe71fb --- /dev/null +++ b/src/database/core/sync.ts @@ -0,0 +1,241 @@ +import { throttle } from 'lodash-es'; +import { WebrtcProvider } from 'y-webrtc'; +import { Doc } from 'yjs'; + +import { OnSyncEvent, OnSyncStatusChange, StartDataSyncParams } from '@/types/sync'; + +import { LobeDBSchemaMap, LocalDBInstance } from './db'; + +declare global { + interface Window { + __ONLY_USE_FOR_CLEANUP_IN_DEV?: WebrtcProvider | null; + } +} + +let ydoc: Doc; + +if (typeof window !== 'undefined') { + ydoc = new Doc(); +} + +class SyncBus { + private ydoc: Doc = ydoc; + private provider: WebrtcProvider | null = null; + + startDataSync = async (params: StartDataSyncParams) => { + // 开发时由于存在 fast refresh 全局实例会缓存在运行时中 + // 因此需要在每次重新连接时清理上一次的实例 + if (window.__ONLY_USE_FOR_CLEANUP_IN_DEV) { + this.cleanProvider(window.__ONLY_USE_FOR_CLEANUP_IN_DEV); + } + + this.connect(params); + }; + + connect = (params: StartDataSyncParams) => { + const { + channel, + onSyncEvent, + onSyncStatusChange, + user, + onAwarenessChange, + signaling = 'wss://y-webrtc-signaling.lobehub.com', + } = params; + console.log('[YJS] start to listen sync event...'); + this.initYjsObserve(onSyncEvent, onSyncStatusChange); + + console.log(`[WebRTC] init provider... room: ${channel.name}`); + // clients connected to the same room-name share document updates + this.provider = new WebrtcProvider(channel.name, this.ydoc, { + password: channel.password, + signaling: [signaling], + }); + + // 只在开发时解决全局实例缓存问题 + if (process.env.NODE_ENV === 'development') { + window.__ONLY_USE_FOR_CLEANUP_IN_DEV = this.provider; + } + + const provider = this.provider; + + console.log(`[WebRTC] provider init success`); + + // 当本地设备正确连接到 WebRTC Provider 后,触发 status 事件 + // 当开始连接,则开始监听事件 + provider.on('status', async ({ connected }) => { + console.log('[WebRTC] peer connected status:', connected); + if (connected) { + // this.initObserve(onSyncEvent, onSyncStatusChange); + onSyncStatusChange?.('ready'); + } + }); + + // provider.on('peers', (peers) => { + // console.log(peers); + // }); + + // 当各方的数据均完成同步后,YJS 对象之间的数据已经一致时,触发 synced 事件 + provider.on('synced', async ({ synced }) => { + console.log('[WebRTC] peer sync status:', synced); + if (synced) { + console.groupCollapsed('[WebRTC] start to init yjs data...'); + onSyncStatusChange?.('syncing'); + await this.initSync(); + onSyncStatusChange?.('synced'); + console.groupEnd(); + console.log('[WebRTC] yjs data init success'); + } else { + console.log('[WebRTC] sync failed,try to reconnect...'); + await this.reconnect(params); + } + }); + + const awareness = provider.awareness; + + awareness.setLocalState({ clientID: awareness.clientID, user }); + onAwarenessChange?.([{ ...user, clientID: awareness.clientID, current: true }]); + + awareness.on('change', () => { + const state = Array.from(awareness.getStates().values()).map((s) => ({ + ...s.user, + clientID: s.clientID, + current: s.clientID === awareness.clientID, + })); + + onAwarenessChange?.(state); + }); + + return provider; + }; + + reconnect = async (params: StartDataSyncParams) => { + this.cleanProvider(this.provider); + + this.connect(params); + }; + + private cleanProvider(provider: WebrtcProvider | null) { + if (provider) { + console.log(`[WebRTC] clean provider...`); + provider.room?.destroy(); + provider.awareness?.destroy(); + provider.destroy(); + console.log(`[WebRTC] clean success`); + console.log(`[WebRTC] -------------------`); + } + } + + manualSync = async () => { + console.log('[WebRTC] try to manual init sync...'); + await this.initSync(); + }; + + getYMap = (tableKey: keyof LobeDBSchemaMap) => { + return this.ydoc.getMap(tableKey); + }; + + private initSync = async () => { + await Promise.all( + ['sessions', 'sessionGroups', 'topics', 'messages', 'plugins'].map(async (tableKey) => + this.loadDataFromDBtoYjs(tableKey as keyof LobeDBSchemaMap), + ), + ); + }; + + private initYjsObserve = (onEvent: OnSyncEvent, onSyncStatusChange: OnSyncStatusChange) => { + ['sessions', 'sessionGroups', 'topics', 'messages', 'plugins'].forEach((tableKey) => { + // listen yjs change + this.observeYMapChange(tableKey as keyof LobeDBSchemaMap, onEvent, onSyncStatusChange); + }); + }; + + private observeYMapChange = ( + tableKey: keyof LobeDBSchemaMap, + onEvent: OnSyncEvent, + onSyncStatusChange: OnSyncStatusChange, + ) => { + const table = LocalDBInstance[tableKey]; + const yItemMap = this.getYMap(tableKey); + const updateSyncEvent = throttle(onEvent, 1000); + + // 定义一个变量来保存定时器的ID + // eslint-disable-next-line no-undef + let debounceTimer: NodeJS.Timeout; + + yItemMap.observe(async (event) => { + // abort local change + if (event.transaction.local) return; + + // 每次有变更时,都先清除之前的定时器(如果有的话),然后设置新的定时器 + clearTimeout(debounceTimer); + + onSyncStatusChange('syncing'); + + console.log(`[YJS] observe ${tableKey} changes:`, event.keysChanged.size); + const pools = Array.from(event.keys).map(async ([id, payload]) => { + const item: any = yItemMap.get(id); + + switch (payload.action) { + case 'add': + case 'update': { + const itemInTable = await table.get(id); + if (!itemInTable) { + await table.add(item, id); + } else { + await table.update(id, item); + } + break; + } + + case 'delete': { + await table.delete(id); + break; + } + } + }); + + await Promise.all(pools); + + updateSyncEvent(tableKey); + + // 设置定时器,500ms 后更新状态为'synced' + debounceTimer = setTimeout(() => { + onSyncStatusChange('synced'); + }, 1000); + }); + }; + + private loadDataFromDBtoYjs = async (tableKey: keyof LobeDBSchemaMap) => { + const table = LocalDBInstance[tableKey]; + const items = await table.toArray(); + const yItemMap = this.getYMap(tableKey); + + // 定义每批次最多包含的数据条数 + const batchSize = 50; + + // 计算总批次数 + const totalBatches = Math.ceil(items.length / batchSize); + + for (let i = 0; i < totalBatches; i++) { + // 计算当前批次的起始和结束索引 + const start = i * batchSize; + const end = start + batchSize; + + // 获取当前批次的数据 + const batchItems = items.slice(start, end); + + // 将当前批次的数据推送到 Yjs 中 + + this.ydoc.transact(() => { + batchItems.forEach((item) => { + // TODO: 需要改表,所有 table 都需要有 id 字段 + yItemMap.set(item.id || (item as any).identifier, item); + }); + }); + } + + console.log('[DB]:', tableKey, yItemMap.size); + }; +} + +export const syncBus = new SyncBus(); diff --git a/src/database/models/file.ts b/src/database/models/file.ts index d04766d560090..df742208f04d5 100644 --- a/src/database/models/file.ts +++ b/src/database/models/file.ts @@ -11,7 +11,7 @@ class _FileModel extends BaseModel<'files'> { async create(file: DB_File) { const id = nanoid(); - return this._add(file, `file-${id}`); + return this._addWithSync(file, `file-${id}`); } async findById(id: string) { diff --git a/src/database/models/message.ts b/src/database/models/message.ts index 143c71a3e2473..e57bcce0afadf 100644 --- a/src/database/models/message.ts +++ b/src/database/models/message.ts @@ -107,7 +107,7 @@ class _MessageModel extends BaseModel { const messageData: DB_Message = this.mapChatMessageToDBMessage(data as ChatMessage); - return this._add(messageData, id); + return this._addWithSync(messageData, id); } async batchCreate(messages: ChatMessage[]) { @@ -148,11 +148,11 @@ class _MessageModel extends BaseModel { // **************** Delete *************** // async delete(id: string) { - return this.table.delete(id); + return super._deleteWithSync(id); } async clearTable() { - return this.table.clear(); + return this._clearWithSync(); } /** @@ -178,13 +178,13 @@ class _MessageModel extends BaseModel { const messageIds = await query.primaryKeys(); // Use the bulkDelete method to delete all selected messages in bulk - return this.table.bulkDelete(messageIds); + return this._bulkDeleteWithSync(messageIds); } // **************** Update *************** // async update(id: string, data: DeepPartial) { - return this._update(id, data); + return super._updateWithSync(id, data); } async updatePluginState(id: string, key: string, value: any) { diff --git a/src/database/models/plugin.ts b/src/database/models/plugin.ts index a12042ec893ee..44f44a49607ad 100644 --- a/src/database/models/plugin.ts +++ b/src/database/models/plugin.ts @@ -12,7 +12,6 @@ class _PluginModel extends BaseModel { getList = async (): Promise => { return this.table.toArray(); }; - // **************** Create *************** // create = async (plugin: DB_Plugin) => { @@ -27,10 +26,10 @@ class _PluginModel extends BaseModel { // **************** Delete *************** // delete(id: string) { - return this.table.delete(id); + return this._deleteWithSync(id); } clear() { - return this.table.clear(); + return this._clearWithSync(); } // **************** Update *************** // diff --git a/src/database/models/session.ts b/src/database/models/session.ts index 43ba801731c42..e79e51028b3d6 100644 --- a/src/database/models/session.ts +++ b/src/database/models/session.ts @@ -173,7 +173,7 @@ class _SessionModel extends BaseModel { async create(type: 'agent' | 'group', defaultValue: Partial, id = uuid()) { const data = merge(DEFAULT_AGENT_LOBE_SESSION, { type, ...defaultValue }); const dataDB = this.mapToDB_Session(data); - return this._add(dataDB, id); + return this._addWithSync(dataDB, id); } async batchCreate(sessions: LobeAgentSession[]) { @@ -200,7 +200,7 @@ class _SessionModel extends BaseModel { const newSession = merge(session, { meta: { title: newTitle } }); - return this._add(newSession, uuid()); + return this._addWithSync(newSession, uuid()); } // **************** Delete *************** // @@ -225,7 +225,7 @@ class _SessionModel extends BaseModel { } // Finally, delete the session itself - await this.table.delete(id); + await this._deleteWithSync(id); }); } @@ -236,7 +236,7 @@ class _SessionModel extends BaseModel { // **************** Update *************** // async update(id: string, data: Partial) { - return this._update(id, data); + return super._updateWithSync(id, data); } async updatePinned(id: string, pinned: boolean) { diff --git a/src/database/models/sessionGroup.ts b/src/database/models/sessionGroup.ts index 01d3eeddbe1a7..ffce3c19e0cb5 100644 --- a/src/database/models/sessionGroup.ts +++ b/src/database/models/sessionGroup.ts @@ -42,7 +42,7 @@ class _SessionGroupModel extends BaseModel { // **************** Create *************** // async create(name: string, sort?: number, id = nanoid()) { - return this._add({ name, sort }, id); + return this._addWithSync({ name, sort }, id); } async batchCreate(groups: SessionGroups) { @@ -69,7 +69,7 @@ class _SessionGroupModel extends BaseModel { // **************** Update *************** // async update(id: string, data: Partial) { - return super._update(id, data); + return super._updateWithSync(id, data); } async updateOrder(sortMap: { id: string; sort: number }[]) { diff --git a/src/database/models/topic.ts b/src/database/models/topic.ts index addb36658bc1f..838314411e9a6 100644 --- a/src/database/models/topic.ts +++ b/src/database/models/topic.ts @@ -116,7 +116,10 @@ class _TopicModel extends BaseModel { // **************** Create *************** // async create({ title, favorite, sessionId, messages }: CreateTopicParams, id = nanoid()) { - const topic = await this._add({ favorite: favorite ? 1 : 0, sessionId, title: title }, id); + const topic = await this._addWithSync( + { favorite: favorite ? 1 : 0, sessionId, title: title }, + id, + ); // add topicId to these messages if (messages) { @@ -214,7 +217,7 @@ class _TopicModel extends BaseModel { // **************** Update *************** // async update(id: string, data: Partial) { - return this._update(id, data); + return super._updateWithSync(id, { ...data, updatedAt: Date.now() }); } async toggleFavorite(id: string, newState?: boolean) { diff --git a/src/hooks/useSyncData.ts b/src/hooks/useSyncData.ts new file mode 100644 index 0000000000000..e44660575782d --- /dev/null +++ b/src/hooks/useSyncData.ts @@ -0,0 +1,43 @@ +import { useCallback } from 'react'; + +import { useChatStore } from '@/store/chat'; +import { useGlobalStore } from '@/store/global'; +import { useSessionStore } from '@/store/session'; + +export const useSyncEvent = () => { + const [refreshMessages, refreshTopic] = useChatStore((s) => [s.refreshMessages, s.refreshTopic]); + const [refreshSessions] = useSessionStore((s) => [s.refreshSessions]); + + return useCallback((tableKey: string) => { + // console.log('triggerSync Event:', tableKey); + + switch (tableKey) { + case 'messages': { + refreshMessages(); + break; + } + + case 'topics': { + refreshTopic(); + break; + } + + case 'sessions': { + refreshSessions(); + break; + } + + default: { + break; + } + } + }, []); +}; + +export const useEnabledDataSync = () => { + const [userId, useEnabledSync] = useGlobalStore((s) => [s.userId, s.useEnabledSync]); + + const syncEvent = useSyncEvent(); + + useEnabledSync(userId, syncEvent); +}; diff --git a/src/layout/GlobalLayout/StoreHydration.tsx b/src/layout/GlobalLayout/StoreHydration.tsx index 1bb5087db523a..57b0fa47cc912 100644 --- a/src/layout/GlobalLayout/StoreHydration.tsx +++ b/src/layout/GlobalLayout/StoreHydration.tsx @@ -2,6 +2,7 @@ import { useResponsive } from 'antd-style'; import { useRouter } from 'next/navigation'; import { memo, useEffect } from 'react'; +import { useEnabledDataSync } from '@/hooks/useSyncData'; import { useGlobalStore } from '@/store/global'; import { useEffectAfterGlobalHydrated } from '@/store/global/hooks/useEffectAfterHydrated'; @@ -10,10 +11,13 @@ const StoreHydration = memo(() => { s.useFetchServerConfig, s.useFetchUserConfig, ]); + const { isLoading } = useFetchServerConfig(); useFetchUserConfig(!isLoading); + useEnabledDataSync(); + useEffect(() => { // refs: https://github.com/pmndrs/zustand/blob/main/docs/integrations/persisting-store-data.md#hashydrated useGlobalStore.persist.rehydrate(); @@ -46,6 +50,7 @@ const StoreHydration = memo(() => { router.prefetch('/market'); router.prefetch('/settings/common'); router.prefetch('/settings/agent'); + router.prefetch('/settings/sync'); }, [router]); return null; diff --git a/src/locales/default/setting.ts b/src/locales/default/setting.ts index ac9955eb75969..126f51873e27a 100644 --- a/src/locales/default/setting.ts +++ b/src/locales/default/setting.ts @@ -410,14 +410,28 @@ export default { placeholder: '请输入助手的标识符,需要是唯一的,比如 web-development', tooltips: '分享到助手市场', }, + sync: { + webrtc: { + channelName: { + desc: 'WebRTC 将使用此名创建同步频道,确保频道名称唯一', + placeholder: '请输入同步频道名称', + title: '同步频道名称', + }, + channelPassword: { + desc: '请输入一个同步频道密码', + title: '同步频道密码', + }, + title: 'WebRTC 同步', + }, + }, tab: { about: '关于', agent: '默认助手', common: '通用设置', llm: '语言模型', + sync: '云端同步', tts: '语音服务', }, - tools: { builtins: { groupName: '内置插件', diff --git a/src/services/global.ts b/src/services/global.ts index 12d6618c7de4f..7d6324414a1ea 100644 --- a/src/services/global.ts +++ b/src/services/global.ts @@ -1,4 +1,6 @@ +import { syncBus } from '@/database/core'; import { GlobalServerConfig } from '@/types/settings'; +import { StartDataSyncParams } from '@/types/sync'; import { API_ENDPOINTS } from './_url'; @@ -20,6 +22,21 @@ class GlobalService { return res.json(); }; + + enabledSync = async (params: StartDataSyncParams) => { + if (typeof window === 'undefined') return false; + + await syncBus.startDataSync(params); + return true; + }; + + reconnect = async (params: StartDataSyncParams) => { + if (typeof window === 'undefined') return false; + + await syncBus.reconnect(params); + await syncBus.manualSync(); + return true; + }; } export const globalService = new GlobalService(); diff --git a/src/store/chat/slices/topic/action.ts b/src/store/chat/slices/topic/action.ts index 319f9985f1cbb..a2b1b7ff6bfdd 100644 --- a/src/store/chat/slices/topic/action.ts +++ b/src/store/chat/slices/topic/action.ts @@ -9,6 +9,7 @@ import { StateCreator } from 'zustand/vanilla'; import { chainSummaryTitle } from '@/chains/summaryTitle'; import { LOADING_FLAT } from '@/const/message'; import { TraceNameMap } from '@/const/trace'; +import { useClientDataSWR } from '@/libs/swr'; import { chatService } from '@/services/chat'; import { messageService } from '@/services/message'; import { topicService } from '@/services/topic'; @@ -22,6 +23,9 @@ import { topicSelectors } from './selectors'; const n = setNamespace('topic'); +const SWR_USE_FETCH_TOPIC = 'SWR_USE_FETCH_TOPIC'; +const SWR_USE_SEARCH_TOPIC = 'SWR_USE_SEARCH_TOPIC'; + export interface ChatTopicAction { favoriteTopic: (id: string, favState: boolean) => Promise; openNewTopicOrSaveTopic: () => Promise; @@ -141,18 +145,25 @@ export const chatTopic: StateCreator< }, // query useFetchTopics: (sessionId) => - useSWR(sessionId, async (sessionId) => topicService.getTopics({ sessionId }), { - onSuccess: (topics) => { - set({ topics, topicsInit: true }, false, n('useFetchTopics(success)', { sessionId })); + useClientDataSWR( + [SWR_USE_FETCH_TOPIC, sessionId], + async ([, sessionId]: [string, string]) => topicService.getTopics({ sessionId }), + { + onSuccess: (topics) => { + set({ topics, topicsInit: true }, false, n('useFetchTopics(success)', { sessionId })); + }, }, - dedupingInterval: 0, - }), + ), useSearchTopics: (keywords) => - useSWR(keywords, topicService.searchTopics, { - onSuccess: (data) => { - set({ searchTopics: data }, false, n('useSearchTopics(success)', { keywords })); + useSWR( + [SWR_USE_SEARCH_TOPIC, keywords], + ([, keywords]: [string, string]) => topicService.searchTopics(keywords), + { + onSuccess: (data) => { + set({ searchTopics: data }, false, n('useSearchTopics(success)', { keywords })); + }, }, - }), + ), switchTopic: async (id) => { set({ activeTopicId: id }, false, n('toggleTopic')); @@ -213,6 +224,6 @@ export const chatTopic: StateCreator< set({ topicLoadingId: id }, false, n('updateTopicLoading')); }, refreshTopic: async () => { - await mutate(get().activeId); + await mutate([SWR_USE_FETCH_TOPIC, get().activeId]); }, }); diff --git a/src/store/global/slices/common/action.ts b/src/store/global/slices/common/action.ts index e7d52de0a0352..20421b03e7721 100644 --- a/src/store/global/slices/common/action.ts +++ b/src/store/global/slices/common/action.ts @@ -11,7 +11,9 @@ import { messageService } from '@/services/message'; import { UserConfig, userService } from '@/services/user'; import type { GlobalStore } from '@/store/global'; import type { GlobalServerConfig, GlobalSettings } from '@/types/settings'; +import { OnSyncEvent } from '@/types/sync'; import { merge } from '@/utils/merge'; +import { browserInfo } from '@/utils/platform'; import { setNamespace } from '@/utils/storeDebug'; import { switchLang } from '@/utils/switchLang'; @@ -24,11 +26,13 @@ const n = setNamespace('common'); * 设置操作 */ export interface CommonAction { + refreshConnection: (onEvent: OnSyncEvent) => Promise; refreshUserConfig: () => Promise; switchBackToChat: (sessionId?: string) => void; updateAvatar: (avatar: string) => Promise; useCheckLatestVersion: () => SWRResponse; useCheckTrace: (shouldFetch: boolean) => SWRResponse; + useEnabledSync: (userId: string | undefined, onEvent: OnSyncEvent) => SWRResponse; useFetchServerConfig: () => SWRResponse; useFetchUserConfig: (initServer: boolean) => SWRResponse; } @@ -41,13 +45,38 @@ export const createCommonSlice: StateCreator< [], CommonAction > = (set, get) => ({ + refreshConnection: async (onEvent) => { + const sync = settingsSelectors.syncConfig(get()); + if (!sync.channelName) return; + + await globalService.reconnect({ + channel: { + name: sync.channelName, + password: sync.channelPassword, + }, + onAwarenessChange(state) { + set({ syncAwareness: state }); + }, + onSyncEvent: onEvent, + onSyncStatusChange: (status) => { + set({ syncStatus: status }); + }, + signaling: sync.signaling, + user: { + // name: userId, + id: get().userId!, + ...browserInfo, + }, + }); + }, + refreshUserConfig: async () => { await mutate([USER_CONFIG_FETCH_KEY, true]); }, + switchBackToChat: (sessionId) => { get().router?.push(SESSION_CHAT_URL(sessionId || INBOX_SESSION_ID, get().isMobile)); }, - updateAvatar: async (avatar) => { await userService.updateAvatar(avatar); await get().refreshUserConfig(); @@ -78,6 +107,42 @@ export const createCommonSlice: StateCreator< revalidateOnFocus: false, }, ), + useEnabledSync: (userId, onEvent) => + useSWR( + ['enableSync', userId], + async () => { + if (!userId) return false; + + const sync = settingsSelectors.syncConfig(get()); + if (!sync.channelName) return false; + + return globalService.enabledSync({ + channel: { + name: sync.channelName, + password: sync.channelPassword, + }, + onAwarenessChange(state) { + set({ syncAwareness: state }); + }, + onSyncEvent: onEvent, + onSyncStatusChange: (status) => { + set({ syncStatus: status }); + }, + signaling: sync.signaling, + user: { + // name: userId, + id: userId, + ...browserInfo, + }, + }); + }, + { + onSuccess: (syncEnabled) => { + set({ syncEnabled }); + }, + revalidateOnFocus: false, + }, + ), useFetchServerConfig: () => useSWR('fetchGlobalConfig', globalService.getGlobalConfig, { onSuccess: (data) => { diff --git a/src/store/global/slices/common/initialState.ts b/src/store/global/slices/common/initialState.ts index dcdb2b828e3bc..72c31bf20e0e4 100644 --- a/src/store/global/slices/common/initialState.ts +++ b/src/store/global/slices/common/initialState.ts @@ -1,5 +1,7 @@ import { AppRouterInstance } from 'next/dist/shared/lib/app-router-context.shared-runtime'; +import { PeerSyncStatus, SyncAwarenessState } from '@/types/sync'; + export enum SidebarTabKey { Chat = 'chat', Market = 'market', @@ -11,6 +13,7 @@ export enum SettingsTabs { Agent = 'agent', Common = 'common', LLM = 'llm', + Sync = 'sync', TTS = 'tts', } @@ -25,9 +28,15 @@ export interface GlobalCommonState { latestVersion?: string; router?: AppRouterInstance; sidebarKey: SidebarTabKey; + syncAwareness: SyncAwarenessState[]; + syncEnabled: boolean; + syncStatus: PeerSyncStatus; } export const initialCommonState: GlobalCommonState = { isMobile: false, sidebarKey: SidebarTabKey.Chat, + syncAwareness: [], + syncEnabled: false, + syncStatus: 'ready', }; diff --git a/src/store/global/slices/settings/selectors/settings.ts b/src/store/global/slices/settings/selectors/settings.ts index aa7fa80ea8553..9ff1f656c9377 100644 --- a/src/store/global/slices/settings/selectors/settings.ts +++ b/src/store/global/slices/settings/selectors/settings.ts @@ -44,6 +44,8 @@ const currentLanguage = (s: GlobalStore) => { const dalleConfig = (s: GlobalStore) => currentSettings(s).tool?.dalle || {}; const isDalleAutoGenerating = (s: GlobalStore) => currentSettings(s).tool?.dalle?.autoGenerate; +const syncConfig = (s: GlobalStore) => currentSettings(s).sync; + export const settingsSelectors = { currentLanguage, currentSettings, @@ -55,4 +57,5 @@ export const settingsSelectors = { exportSettings, isDalleAutoGenerating, password, + syncConfig, }; diff --git a/src/types/settings/index.ts b/src/types/settings/index.ts index 3a931761dd26a..66c821da0eb58 100644 --- a/src/types/settings/index.ts +++ b/src/types/settings/index.ts @@ -28,12 +28,19 @@ export interface GlobalServerConfig { }; } +export interface GlobalSyncSettings { + channelName?: string; + channelPassword?: string; + signaling?: string; +} + /** * 配置设置 */ export interface GlobalSettings extends GlobalBaseSettings { defaultAgent: GlobalDefaultAgent; languageModel: GlobalLLMConfig; + sync: GlobalSyncSettings; tool: GlobalTool; tts: GlobalTTSConfig; } diff --git a/src/types/sync.ts b/src/types/sync.ts new file mode 100644 index 0000000000000..4d5bfed108fc9 --- /dev/null +++ b/src/types/sync.ts @@ -0,0 +1,32 @@ +import { LobeDBSchemaMap } from '@/database/core/db'; + +export type OnSyncEvent = (tableKey: keyof LobeDBSchemaMap) => void; +export type OnSyncStatusChange = (status: PeerSyncStatus) => void; + +export type PeerSyncStatus = 'syncing' | 'synced' | 'ready'; + +export interface StartDataSyncParams { + channel: { + name: string; + password?: string; + }; + onAwarenessChange: (state: SyncAwarenessState[]) => void; + onSyncEvent: OnSyncEvent; + onSyncStatusChange: OnSyncStatusChange; + signaling?: string; + user: SyncUserInfo; +} + +export interface SyncUserInfo { + browser?: string; + device?: string; + id: string; + isMobile: boolean; + name?: string; + os?: string; +} + +export interface SyncAwarenessState extends SyncUserInfo { + clientID: number; + current: boolean; +} diff --git a/src/utils/platform.ts b/src/utils/platform.ts index 44e5e8d16b119..0a5eb87044891 100644 --- a/src/utils/platform.ts +++ b/src/utils/platform.ts @@ -1,6 +1,6 @@ import UAParser from 'ua-parser-js'; -const getPaser = () => { +const getParser = () => { if (typeof window === 'undefined') return new UAParser('Node'); let ua = navigator.userAgent; @@ -8,11 +8,18 @@ const getPaser = () => { }; export const getPlatform = () => { - return getPaser().getOS().name; + return getParser().getOS().name; }; export const getBrowser = () => { - return getPaser().getResult().browser.name; + return getParser().getResult().browser.name; +}; + +export const browserInfo = { + browser: getBrowser(), + device: getParser().getDevice().vendor, + isMobile: getParser().getDevice().type === 'mobile', + os: getParser().getOS().name, }; export const isMacOS = () => getPlatform() === 'Mac OS';